Update conf column in dag_run table type from bytes to JSON #44533
Merged: ephraimbuddy merged 56 commits into apache:main from astronomer:dag_run_migrate_conf_column_as_json on Jan 13, 2025
b91446b
remove pickled data from dag run table
vatsrahul1001 cbde2ed
Merge branch 'main' of github.com:astronomer/airflow into dag_run_mig…
vatsrahul1001 e698769
fix downgrade + add news fragement
vatsrahul1001 f7f9155
Merge branch 'main' into dag_run_migrate_conf_column_as_json
vatsrahul1001 18c03fb
remove archive table if exits after downgrade
vatsrahul1001 2b6abf7
Merge branch 'dag_run_migrate_conf_column_as_json' of github.com:astr…
vatsrahul1001 a54c435
removing archiving data
vatsrahul1001 6b3789d
implementing review comments
vatsrahul1001 07258f5
Merge branch 'main' of github.com:astronomer/airflow into dag_run_mig…
vatsrahul1001 8b88d63
fixing static check
vatsrahul1001 730470b
Merge branch 'main' into dag_run_migrate_conf_column_as_json
vatsrahul1001 c2e0d09
fixing static checks
vatsrahul1001 c2b8dbd
Merge branch 'main' into dag_run_migrate_conf_column_as_json
vatsrahul1001 c8fa27d
simplying upgrade and downgrade as per review
vatsrahul1001 d27b7ed
simplying upgrade and downgrade as per review
vatsrahul1001 603ca17
Merge branch 'dag_run_migrate_conf_column_as_json' of github.com:astr…
vatsrahul1001 9406702
fixing failures
vatsrahul1001 01c453c
Merge branch 'main' of github.com:astronomer/airflow into dag_run_mig…
vatsrahul1001 219b694
Merge branch 'main' of github.com:astronomer/airflow into dag_run_mig…
vatsrahul1001 22a5dba
removing setting conf to null
vatsrahul1001 70ce5df
refactor approach to migrate values in conf
vatsrahul1001 0cade3a
Merge branch 'main' into dag_run_migrate_conf_column_as_json
vatsrahul1001 cce341a
update offline warning
vatsrahul1001 c3604d9
Merge branch 'dag_run_migrate_conf_column_as_json' of github.com:astr…
vatsrahul1001 1eb120a
resolving conflicts
vatsrahul1001 1bdc5cb
Merge branch 'main' into dag_run_migrate_conf_column_as_json
vatsrahul1001 8261e05
resolving conflicts
vatsrahul1001 9101a05
Merge branch 'main' into dag_run_migrate_conf_column_as_json
vatsrahul1001 a6ebc01
Merge branch 'dag_run_migrate_conf_column_as_json' of github.com:astr…
vatsrahul1001 7fb6509
resolving conflicts
vatsrahul1001 33e4c6e
Merge branch 'main' into dag_run_migrate_conf_column_as_json
vatsrahul1001 6b23d8a
resolving conflicts
vatsrahul1001 249aa7e
Merge branch 'dag_run_migrate_conf_column_as_json' of github.com:astr…
vatsrahul1001 fa28acd
Merge branch 'main' into dag_run_migrate_conf_column_as_json
vatsrahul1001 dcd8534
Merge branch 'main' into dag_run_migrate_conf_column_as_json
vatsrahul1001 71078d7
Merge branch 'main' into dag_run_migrate_conf_column_as_json
vatsrahul1001 2ee9e7b
Merge branch 'main' into dag_run_migrate_conf_column_as_json
vatsrahul1001 d2f2b0f
Merge branch 'main' of github.com:astronomer/airflow into dag_run_mig…
vatsrahul1001 4845c61
Merge branch 'main' of github.com:astronomer/airflow into dag_run_mig…
vatsrahul1001 141abfd
Merge branch 'main' into dag_run_migrate_conf_column_as_json
jedcunningham 1276951
Merge branch 'main' of github.com:astronomer/airflow into dag_run_mig…
vatsrahul1001 8488cdf
Merge branch 'main' into dag_run_migrate_conf_column_as_json
vatsrahul1001 fbdfd66
Merge branch 'main' into dag_run_migrate_conf_column_as_json
vatsrahul1001 2b91c12
updating batch size
vatsrahul1001 a946d19
Merge branch 'dag_run_migrate_conf_column_as_json' of github.com:astr…
vatsrahul1001 4409c11
Merge branch 'main' into dag_run_migrate_conf_column_as_json
vatsrahul1001 a8ec33b
updaing conf type
vatsrahul1001 95da082
Merge branch 'main' into dag_run_migrate_conf_column_as_json
vatsrahul1001 8e1dc76
Merge branch 'main' of github.com:astronomer/airflow into dag_run_mig…
vatsrahul1001 0484923
Merge branch 'main' into dag_run_migrate_conf_column_as_json
vatsrahul1001 630e5f6
Merge branch 'main' of github.com:astronomer/airflow into dag_run_mig…
vatsrahul1001 ccf7c2f
Merge branch 'dag_run_migrate_conf_column_as_json' of github.com:astr…
vatsrahul1001 b30f8c1
Merge branch 'main' of github.com:astronomer/airflow into dag_run_mig…
vatsrahul1001 331455f
Merge branch 'main' into dag_run_migrate_conf_column_as_json
vatsrahul1001 80ca6ba
Merge branch 'main' into dag_run_migrate_conf_column_as_json
vatsrahul1001 56abe3a
Merge branch 'main' into dag_run_migrate_conf_column_as_json
vatsrahul1001
145 changes: 145 additions & 0 deletions
airflow/migrations/versions/0055_3_0_0_remove_pickled_data_from_dagrun_table.py
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,145 @@ | ||
| # | ||
| # Licensed to the Apache Software Foundation (ASF) under one | ||
| # or more contributor license agreements. See the NOTICE file | ||
| # distributed with this work for additional information | ||
| # regarding copyright ownership. The ASF licenses this file | ||
| # to you under the Apache License, Version 2.0 (the | ||
| # "License"); you may not use this file except in compliance | ||
| # with the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, | ||
| # software distributed under the License is distributed on an | ||
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| # KIND, either express or implied. See the License for the | ||
| # specific language governing permissions and limitations | ||
| # under the License. | ||
| | ||
| """ | ||
| remove pickled data from dagrun table. | ||
| | ||
| Revision ID: e39a26ac59f6 | ||
| Revises: 38770795785f | ||
| Create Date: 2024-12-01 08:33:15.425141 | ||
| | ||
| """ | ||
| | ||
| from __future__ import annotations | ||
| | ||
| import json | ||
| import pickle | ||
| from textwrap import dedent | ||
| | ||
| import sqlalchemy as sa | ||
| from alembic import context, op | ||
| from sqlalchemy import text | ||
| from sqlalchemy.dialects import postgresql | ||
| | ||
| # revision identifiers, used by Alembic. | ||
| revision = "e39a26ac59f6" | ||
| down_revision = "38770795785f" | ||
| branch_labels = None | ||
| depends_on = None | ||
| airflow_version = "3.0.0" | ||
| | ||
| | ||
| def upgrade(): | ||
|     """Apply remove pickled data from dagrun table.""" | ||
|     conn = op.get_bind() | ||
|     conf_type = sa.JSON().with_variant(postgresql.JSONB, "postgresql") | ||
|     op.add_column("dag_run", sa.Column("conf_json", conf_type, nullable=True)) | ||
| | ||
|     if context.is_offline_mode(): | ||
|         print( | ||
|             dedent(""" | ||
|             ------------ | ||
|             -- WARNING: Unable to migrate the data in the 'conf' column while in offline mode! | ||
|             -- The 'conf' column will be set to NULL in offline mode. | ||
|             -- Avoid using offline mode if you need to retain 'conf' values. | ||
|             ------------ | ||
|             """) | ||
|         ) | ||
|     else: | ||
|         BATCH_SIZE = 100 | ||
|         offset = 0 | ||
|         while True: | ||
|             rows = conn.execute( | ||
|                 text( | ||
|                     f"SELECT id,conf FROM dag_run WHERE conf IS not NULL order by id LIMIT {BATCH_SIZE} OFFSET {offset}" | ||
|                 ) | ||
|             ).fetchall() | ||
|             if not rows: | ||
|                 break | ||
|             for row in rows: | ||
|                 row_id, pickle_data = row | ||
| | ||
|                 try: | ||
|                     original_data = pickle.loads(pickle_data) | ||
|                     json_data = json.dumps(original_data) | ||
|                     conn.execute( | ||
|                         text(""" | ||
|                             UPDATE dag_run | ||
|                             SET conf_json = :json_data | ||
|                             WHERE id = :id | ||
|                         """), | ||
|                         {"json_data": json_data, "id": row_id}, | ||
|                     ) | ||
|                 except Exception as e: | ||
|                     print(f"Error converting dagrun conf to json for dagrun ID {row_id}: {e}") | ||
|                     continue | ||
|             offset += BATCH_SIZE | ||
| | ||
|     op.drop_column("dag_run", "conf") | ||
| | ||
|     op.alter_column("dag_run", "conf_json", existing_type=conf_type, new_column_name="conf") | ||
| | ||
| | ||
| def downgrade(): | ||
|     """Unapply Remove pickled data from dagrun table.""" | ||
|     conn = op.get_bind() | ||
|     op.add_column("dag_run", sa.Column("conf_pickle", sa.PickleType(), nullable=True)) | ||
| | ||
|     if context.is_offline_mode(): | ||
|         print( | ||
|             dedent(""" | ||
|             ------------ | ||
|             -- WARNING: Unable to migrate the data in the 'conf' column while in offline mode! | ||
|             -- The 'conf' column will be set to NULL in offline mode. | ||
|             -- Avoid using offline mode if you need to retain 'conf' values. | ||
|             ------------ | ||
|             """) | ||
|         ) | ||
| | ||
|     else: | ||
|         BATCH_SIZE = 100 | ||
|         offset = 0 | ||
|         while True: | ||
|             rows = conn.execute( | ||
|                 text( | ||
|                     f"SELECT id,conf FROM dag_run WHERE conf IS not NULL order by id LIMIT {BATCH_SIZE} OFFSET {offset}" | ||
|                 ) | ||
|             ).fetchall() | ||
|             if not rows: | ||
|                 break | ||
|             for row in rows: | ||
|                 row_id, json_data = row | ||
| | ||
|                 try: | ||
|                     pickled_data = pickle.dumps(json_data, protocol=pickle.HIGHEST_PROTOCOL) | ||
| | ||
|                     conn.execute( | ||
|                         text(""" | ||
|                             UPDATE dag_run | ||
|                             SET conf_pickle = :pickle_data | ||
|                             WHERE id = :id | ||
|                         """), | ||
|                         {"pickle_data": pickled_data, "id": row_id}, | ||
|                     ) | ||
|                 except Exception as e: | ||
|                     print(f"Error pickling dagrun conf for dagrun ID {row_id}: {e}") | ||
|                     continue | ||
|             offset += BATCH_SIZE | ||
| | ||
|     op.drop_column("dag_run", "conf") | ||
| | ||
|     op.alter_column("dag_run", "conf_pickle", existing_type=sa.PickleType(), new_column_name="conf") | ||
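
The batched copy in `upgrade()` can be tried outside Alembic. What follows is a minimal sketch, not the migration itself: it assumes an in-memory SQLite table with only the `id`, `conf`, and `conf_json` columns, uses the standard-library `sqlite3` module in place of the Alembic connection, and the helper name `migrate_conf` and the sample values are hypothetical. It shows the same steps: page through non-NULL `conf` rows ordered by `id`, unpickle, serialize to JSON, and skip rows that cannot be converted.

```python
import json
import pickle
import sqlite3

BATCH_SIZE = 2  # deliberately small so the loop runs more than once


def migrate_conf(conn: sqlite3.Connection) -> list[int]:
    """Copy pickled ``conf`` values into ``conf_json``; return the ids that failed."""
    failed = []
    offset = 0
    while True:
        # OFFSET paging stays stable here because the loop only writes
        # conf_json, so the set of rows matching "conf IS NOT NULL" never changes.
        rows = conn.execute(
            "SELECT id, conf FROM dag_run WHERE conf IS NOT NULL "
            "ORDER BY id LIMIT ? OFFSET ?",
            (BATCH_SIZE, offset),
        ).fetchall()
        if not rows:
            break
        for row_id, pickled in rows:
            try:
                # Only unpickle data you trust; these blobs come from our own table.
                value = json.dumps(pickle.loads(pickled))
            except Exception:
                failed.append(row_id)  # leave conf_json NULL for this row
                continue
            conn.execute(
                "UPDATE dag_run SET conf_json = ? WHERE id = ?", (value, row_id)
            )
        offset += BATCH_SIZE
    return failed


# Hypothetical sample data: id 2 has no conf, id 3 holds a set (not JSON serializable).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag_run (id INTEGER PRIMARY KEY, conf BLOB, conf_json TEXT)")
samples = {1: {"a": 1}, 3: {"tags": {1, 2}}, 4: ["x", 2], 5: {"nested": {"k": True}}}
for row_id, value in samples.items():
    conn.execute(
        "INSERT INTO dag_run (id, conf) VALUES (?, ?)", (row_id, pickle.dumps(value))
    )
conn.execute("INSERT INTO dag_run (id, conf) VALUES (2, NULL)")

failed = migrate_conf(conn)
stored = dict(conn.execute("SELECT id, conf_json FROM dag_run").fetchall())
print(failed)
print(stored)
```

In this sketch the row holding a set ends up with a NULL `conf_json`, which mirrors how the migration logs the error and moves on to the next row.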
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1 +1 @@ | ||
| f4ad824c8d9ff45e86002506edd83b540a88dab45bb292b1af96cd86dec5ecab | ||
| ca59d711e6304f8bfdb25f49339d455602430dd6b880e420869fc892faef0596 |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| During offline migration, ``DagRun.conf`` is cleared | ||
| | ||
| .. Provide additional contextual information | ||
| | ||
| The ``conf`` column is changing from pickle to json, thus, the values in that column cannot be migrated during offline migrations. If you want to retain ``conf`` values for existing DagRuns, you must do a normal, non-offline, migration. |
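
Because pickle and JSON do not represent the same set of Python values, some `conf` payloads come back in a different shape after the switch even when conversion succeeds. The sketch below illustrates general `json` module behavior only; it makes no claim about how Airflow handles such values, and the helper names `json_roundtrip` and `changed_by_json` and the sample dictionary are hypothetical.

```python
import json


def json_roundtrip(conf):
    """Return ``conf`` as it would look after being stored as JSON and read back."""
    return json.loads(json.dumps(conf))


def changed_by_json(conf) -> bool:
    """True if a JSON round trip alters the value (for example tuples or int keys)."""
    return json_roundtrip(conf) != conf


# Hypothetical conf payload: a tuple value and a non-string key.
before = {"retries": 3, "window": (1, 2), 7: "int key"}
after = json_roundtrip(before)

print(after)
print(changed_by_json(before))
print(changed_by_json({"retries": 3}))
```

Tuples come back as lists and non-string keys come back as strings, so code that reads `conf` may need to tolerate both shapes.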