From b91446bbb812787659bbf05b7c103439aea35664 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Sun, 1 Dec 2024 18:38:02 +0530 Subject: [PATCH 01/16] remove pickled data from dag run table --- ...0_remove_pickled_data_from_dagrun_table.py | 195 ++++++++++++++++++ airflow/models/dagrun.py | 5 +- airflow/utils/db.py | 2 +- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- docs/apache-airflow/img/airflow_erd.svg | 2 +- docs/apache-airflow/migrations-ref.rst | 4 +- 6 files changed, 204 insertions(+), 6 deletions(-) create mode 100644 airflow/migrations/versions/0050_3_0_0_remove_pickled_data_from_dagrun_table.py diff --git a/airflow/migrations/versions/0050_3_0_0_remove_pickled_data_from_dagrun_table.py b/airflow/migrations/versions/0050_3_0_0_remove_pickled_data_from_dagrun_table.py new file mode 100644 index 0000000000000..50f6a069c9c59 --- /dev/null +++ b/airflow/migrations/versions/0050_3_0_0_remove_pickled_data_from_dagrun_table.py @@ -0,0 +1,195 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +remove pickled data from dagrun table. + +Revision ID: e39a26ac59f6 +Revises: eed27faa34e3 +Create Date: 2024-12-01 08:33:15.425141 + +""" + +from __future__ import annotations + +import sqlalchemy as sa +from alembic import op +from sqlalchemy import text +from sqlalchemy.dialects.mysql import LONGBLOB +from sqlalchemy_utils import UUIDType + +# revision identifiers, used by Alembic. +revision = "e39a26ac59f6" +down_revision = "eed27faa34e3" +branch_labels = None +depends_on = None +airflow_version = "3.0.0" + + +def upgrade(): + """Apply remove pickled data from dagrun table.""" + # Summary of the change: + # 1. Create an archived table (`_dag_run_archive`) to store the current "pickled" data in the dag_run table + # 2. Extract and archive the pickled data using the condition + # 3. Delete the pickled data from the dag_run table so that we can update the column type + # 4. 
Update the dag_run.conf column type to JSON from bytea + + conn = op.get_bind() + dialect = conn.dialect.name + + # Create an archived table to store the current data + # Create the dag_run table + op.create_table( + "_dag_run_archive", + sa.Column("id", sa.Integer(), nullable=False, primary_key=True, autoincrement=True), + sa.Column("dag_id", sa.String(length=250), nullable=False), + sa.Column("queued_at", sa.TIMESTAMP(timezone=True), nullable=True), + sa.Column("logical_date", sa.TIMESTAMP(timezone=True), nullable=False), + sa.Column("start_date", sa.TIMESTAMP(timezone=True), nullable=True), + sa.Column("end_date", sa.TIMESTAMP(timezone=True), nullable=True), + sa.Column("state", sa.String(length=50), nullable=True), + sa.Column("run_id", sa.String(length=250), nullable=False), + sa.Column("creating_job_id", sa.Integer(), nullable=True), + sa.Column("external_trigger", sa.Boolean(), nullable=True), + sa.Column("run_type", sa.String(length=50), nullable=False), + sa.Column("triggered_by", sa.String(length=50), nullable=True), + sa.Column("conf", sa.LargeBinary().with_variant(LONGBLOB, "mysql"), nullable=True), + sa.Column("data_interval_start", sa.TIMESTAMP(timezone=True), nullable=True), + sa.Column("data_interval_end", sa.TIMESTAMP(timezone=True), nullable=True), + sa.Column("last_scheduling_decision", sa.TIMESTAMP(timezone=True), nullable=True), + sa.Column("log_template_id", sa.Integer(), nullable=True), + sa.Column("updated_at", sa.TIMESTAMP(timezone=True), nullable=True), + sa.Column("clear_number", sa.Integer(), nullable=False, server_default=sa.text("0")), + sa.Column("backfill_id", sa.Integer(), nullable=True), + sa.Column("dag_version_id", UUIDType(binary=False), nullable=True), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint("dag_id", "run_id"), + if_not_exists=True, + ) + + # Condition to detect pickled data for different databases + condition_templates = { + "postgresql": "get_byte(conf, 0) = 128", + "mysql": "HEX(SUBSTRING(conf, 1, 1)) = '80'", + "sqlite": "substr(conf, 1, 1) = char(128)", + } + + condition = condition_templates.get(dialect) + if not condition: + raise RuntimeError(f"Unsupported dialect: {dialect}") + + # Archive pickled data using the condition + conn.execute( + text( + f""" + INSERT INTO _dag_run_archive (dag_id, run_id, queued_at, logical_date, start_date, end_date, state, creating_job_id, external_trigger, run_type, triggered_by, conf, data_interval_start, data_interval_end, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, dag_version_id) + SELECT dag_id, run_id, queued_at, logical_date, start_date, end_date, state, creating_job_id, external_trigger, run_type, triggered_by, conf, data_interval_start, data_interval_end, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, dag_version_id + FROM dag_run + WHERE conf IS NOT NULL AND {condition}; + + """ + ) + ) + + # Delete the pickled data from the dag_run table so that we can update the column type + conn.execute(text(f"DELETE FROM dag_run WHERE conf IS NOT NULL AND {condition}")) + + # Update the conf column type to JSON + if dialect == "postgresql": + op.execute( + """ + ALTER TABLE dag_run + ALTER COLUMN conf TYPE JSONB + USING CASE + WHEN conf IS NOT NULL THEN CAST(CONVERT_FROM(conf, 'UTF8') AS JSONB) + ELSE NULL + END + """ + ) + elif dialect == "mysql": + op.add_column("dag_run", sa.Column("conf_json", sa.JSON(), nullable=True)) + op.execute("UPDATE dag_run SET conf_json = CAST(conf AS CHAR CHARACTER SET utf8mb4)") + 
op.drop_column("dag_run", "conf") + op.alter_column("dag_run", "conf_json", existing_type=sa.JSON(), new_column_name="conf") + elif dialect == "sqlite": + # Rename the existing `conf` column to `conf_old` + with op.batch_alter_table("dag_run", schema=None) as batch_op: + batch_op.alter_column("conf", new_column_name="conf_old") + + # Add the new `conf` column with JSON type + with op.batch_alter_table("dag_run", schema=None) as batch_op: + batch_op.add_column(sa.Column("conf", sa.JSON(), nullable=True)) + + # Migrate data from `conf_old` to `conf` + conn.execute( + text( + """ + UPDATE dag_run + SET conf = json(CAST(conf_old AS TEXT)) + WHERE conf_old IS NOT NULL + """ + ) + ) + + # Drop the old `conf_old` column + with op.batch_alter_table("dag_run", schema=None) as batch_op: + batch_op.drop_column("conf_old") + + +def downgrade(): + """Unapply Remove pickled data from dagrun table.""" + conn = op.get_bind() + dialect = conn.dialect.name + + # Revert the conf column back to LargeBinary + if dialect == "postgresql": + op.execute( + """ + ALTER TABLE dag_run + ALTER COLUMN conf TYPE BYTEA + USING CASE + WHEN conf IS NOT NULL THEN CONVERT_TO(conf::TEXT, 'UTF8') + ELSE NULL + END + """ + ) + elif dialect == "mysql": + op.add_column("dag_run", sa.Column("conf_blob", LONGBLOB, nullable=True)) + op.execute("UPDATE dag_run SET conf_blob = CAST(conf AS BINARY);") + op.drop_column("dag_run", "conf") + op.alter_column("dag_run", "conf_blob", existing_type=LONGBLOB, new_column_name="conf") + + elif dialect == "sqlite": + with op.batch_alter_table("dag_run", schema=None) as batch_op: + batch_op.alter_column("conf", new_column_name="conf_old") + + with op.batch_alter_table("dag_run", schema=None) as batch_op: + batch_op.add_column(sa.Column("conf", sa.LargeBinary, nullable=True)) + + conn.execute( + text( + """ + UPDATE dag_run + SET conf = CAST(conf_old AS BLOB) + WHERE conf IS NOT NULL + """ + ) + ) + + with op.batch_alter_table("dag_run", schema=None) as batch_op: + batch_op.drop_column("conf_old") diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index b28d2fbf02fc1..563a4e33f3da3 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -25,6 +25,7 @@ import re2 from sqlalchemy import ( + JSON, Boolean, Column, Enum, @@ -32,7 +33,6 @@ ForeignKeyConstraint, Index, Integer, - PickleType, PrimaryKeyConstraint, String, Text, @@ -44,6 +44,7 @@ text, update, ) +from sqlalchemy.dialects import postgresql from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.orm import declared_attr, joinedload, relationship, synonym, validates @@ -137,7 +138,7 @@ class DagRun(Base, LoggingMixin): triggered_by = Column( Enum(DagRunTriggeredByType, native_enum=False, length=50) ) # Airflow component that triggered the run. - conf = Column(PickleType) + conf = Column(JSON().with_variant(postgresql.JSONB, "postgresql")) # These two must be either both NULL or both datetime. 
data_interval_start = Column(UtcDateTime) data_interval_end = Column(UtcDateTime) diff --git a/airflow/utils/db.py b/airflow/utils/db.py index 5748adf0e46e6..fe13a496ff2cc 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -94,7 +94,7 @@ class MappedClassProtocol(Protocol): "2.9.2": "686269002441", "2.10.0": "22ed7efa9da2", "2.10.3": "5f2621c13b39", - "3.0.0": "eed27faa34e3", + "3.0.0": "e39a26ac59f6", } diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index 6b004f91298b9..58eabb233cd3d 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -aa9e2e5b2a52af1e92bc876727ad5e8958e291315096fc5249a9afa2c21a5d06 \ No newline at end of file +bfbabfa8ed9fb1365f99b715e4c1a361e12f71743a31fef4c1deb4d02714e1f6 \ No newline at end of file diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg index fd06e1810f731..440f8d7eed734 100644 --- a/docs/apache-airflow/img/airflow_erd.svg +++ b/docs/apache-airflow/img/airflow_erd.svg @@ -1719,7 +1719,7 @@ conf - [BYTEA] + [JSONB] creating_job_id diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst index 88a6079d6b6c5..1664df20008af 100644 --- a/docs/apache-airflow/migrations-ref.rst +++ b/docs/apache-airflow/migrations-ref.rst @@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | Revision ID | Revises ID | Airflow Version | Description | +=========================+==================+===================+==============================================================+ -| ``eed27faa34e3`` (head) | ``9fc3fc5de720`` | ``3.0.0`` | Remove pickled data from xcom table. | +| ``e39a26ac59f6`` (head) | ``eed27faa34e3`` | ``3.0.0`` | remove pickled data from dagrun table. | ++-------------------------+------------------+-------------------+--------------------------------------------------------------+ +| ``eed27faa34e3`` | ``9fc3fc5de720`` | ``3.0.0`` | Remove pickled data from xcom table. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | ``9fc3fc5de720`` | ``2b47dc6bc8df`` | ``3.0.0`` | Add references between assets and triggers. 
| +-------------------------+------------------+-------------------+--------------------------------------------------------------+ From e69876992a3c4093b01de4b9eec5a61ea3a49c51 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Tue, 3 Dec 2024 15:44:33 +0530 Subject: [PATCH 02/16] fix downgrade + add news fragment --- ...0_remove_pickled_data_from_dagrun_table.py | 26 ++++++++++++++++++- airflow/www/views.py | 2 ++ docs/apache-airflow/img/airflow_erd.sha256 | 2 +- newsfragments/44533.significant.rst | 21 +++++++++++++++ 4 files changed, 49 insertions(+), 2 deletions(-) create mode 100644 newsfragments/44533.significant.rst diff --git a/airflow/migrations/versions/0050_3_0_0_remove_pickled_data_from_dagrun_table.py b/airflow/migrations/versions/0050_3_0_0_remove_pickled_data_from_dagrun_table.py index 50f6a069c9c59..57813c9c4c3cd 100644 --- a/airflow/migrations/versions/0050_3_0_0_remove_pickled_data_from_dagrun_table.py +++ b/airflow/migrations/versions/0050_3_0_0_remove_pickled_data_from_dagrun_table.py @@ -86,7 +86,7 @@ def upgrade(): condition_templates = { "postgresql": "get_byte(conf, 0) = 128", "mysql": "HEX(SUBSTRING(conf, 1, 1)) = '80'", - "sqlite": "substr(conf, 1, 1) = char(128)", + "sqlite": "hex(substr(conf, 1, 1)) = '80'", } condition = condition_templates.get(dialect) @@ -155,9 +155,24 @@ def downgrade(): """Unapply Remove pickled data from dagrun table.""" conn = op.get_bind() dialect = conn.dialect.name + inspector = sa.inspect(conn) + + insert_query = text( + """ + INSERT INTO dag_run (dag_id, run_id, queued_at, logical_date, start_date, end_date, state, creating_job_id, external_trigger, run_type, triggered_by, conf, data_interval_start, data_interval_end, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, dag_version_id) + SELECT dag_id, run_id, queued_at, logical_date, start_date, end_date, state, creating_job_id, external_trigger, run_type, triggered_by, conf, data_interval_start, data_interval_end, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, dag_version_id + FROM _dag_run_archive + WHERE conf IS NOT NULL; + + """ + ) + + conn.execute(text("DELETE FROM dag_run WHERE conf IS NOT NULL")) # Revert the conf column back to LargeBinary if dialect == "postgresql": + # Delete the pickled data from the dag_run table so that we can update the column type + op.execute( """ ALTER TABLE dag_run @@ -168,11 +183,17 @@ def downgrade(): END """ ) + + if "_dag_run_archive" in inspector.get_table_names(): + conn.execute(insert_query) + elif dialect == "mysql": op.add_column("dag_run", sa.Column("conf_blob", LONGBLOB, nullable=True)) op.execute("UPDATE dag_run SET conf_blob = CAST(conf AS BINARY);") op.drop_column("dag_run", "conf") op.alter_column("dag_run", "conf_blob", existing_type=LONGBLOB, new_column_name="conf") + if "_dag_run_archive" in inspector.get_table_names(): + conn.execute(insert_query) elif dialect == "sqlite": with op.batch_alter_table("dag_run", schema=None) as batch_op: @@ -193,3 +214,6 @@ def downgrade(): with op.batch_alter_table("dag_run", schema=None) as batch_op: batch_op.drop_column("conf_old") + + if "_dag_run_archive" in inspector.get_table_names(): + conn.execute(insert_query) diff --git a/airflow/www/views.py b/airflow/www/views.py index 5fc189800f652..d4c44753bffc2 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -4819,6 +4819,8 @@ class DagRunModelView(AirflowModelView): edit_form = DagRunEditForm + add_exclude_columns = edit_exclude_columns = ["conf"] + def 
duration_f(self): """Duration calculation.""" end_date = self.get("end_date") diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index 4987e63b9bbed..4df4ec4542119 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -b42b04b6cc47650cb9e7a37258a6e8e99bdca2677253715505b8ad287192bf72 \ No newline at end of file +7b6b9b6b6ed57730b6aff64a1a1e89a3eb560bf848d519b1d3100a80af5c9fea \ No newline at end of file diff --git a/newsfragments/44533.significant.rst b/newsfragments/44533.significant.rst new file mode 100644 index 0000000000000..597e0f2b31c2a --- /dev/null +++ b/newsfragments/44533.significant.rst @@ -0,0 +1,21 @@ +Update conf column in dag_run table type from byte ( that store a python pickle ) to JSON + +.. Provide additional contextual information + +Column conf of the table dag_run is using the type byte ( and storing a python pickle ) on the database , since airflow only support postgres 12+ and mysql 8+ , we updated it to json type . + +.. Check the type of change that applies to this change + +* Types of change + + * [ ] DAG changes + * [ ] Config changes + * [ ] API changes + * [ ] CLI changes + * [x] Behaviour changes + * [ ] Plugin changes + * [ ] Dependency change + +.. List the migration rules needed for this change (see https://github.com/apache/airflow/issues/41641) + +* Migrations rules needed From 18c03fbfabff10c6db3469af2ebe48d2550b68e5 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Tue, 3 Dec 2024 17:42:39 +0530 Subject: [PATCH 03/16] remove archive table if exists after downgrade --- .../0050_3_0_0_remove_pickled_data_from_dagrun_table.py | 2 ++ airflow/www/views.py | 2 -- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/airflow/migrations/versions/0050_3_0_0_remove_pickled_data_from_dagrun_table.py b/airflow/migrations/versions/0050_3_0_0_remove_pickled_data_from_dagrun_table.py index 57813c9c4c3cd..f559a7cca0195 100644 --- a/airflow/migrations/versions/0050_3_0_0_remove_pickled_data_from_dagrun_table.py +++ b/airflow/migrations/versions/0050_3_0_0_remove_pickled_data_from_dagrun_table.py @@ -217,3 +217,5 @@ def downgrade(): if "_dag_run_archive" in inspector.get_table_names(): conn.execute(insert_query) + + op.execute(sa.text("DROP TABLE IF EXISTS _dag_run_archive")) diff --git a/airflow/www/views.py b/airflow/www/views.py index d4c44753bffc2..5fc189800f652 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -4819,8 +4819,6 @@ class DagRunModelView(AirflowModelView): edit_form = DagRunEditForm - add_exclude_columns = edit_exclude_columns = ["conf"] - def duration_f(self): """Duration calculation.""" end_date = self.get("end_date") diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index 4df4ec4542119..d6d39316be652 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -7b6b9b6b6ed57730b6aff64a1a1e89a3eb560bf848d519b1d3100a80af5c9fea \ No newline at end of file +d3a0a7c9decf6f3b976596c45a9fb5f548808c71d97dd107f7049a60f6b06335 \ No newline at end of file From a54c4358a5ca2e38773cf129189e9695ac1d3a89 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Wed, 4 Dec 2024 17:12:43 +0530 Subject: [PATCH 04/16] removing archiving data --- ...0_remove_pickled_data_from_dagrun_table.py | 113 +----------------- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- 2 files changed, 6 
insertions(+), 109 deletions(-) diff --git a/airflow/migrations/versions/0050_3_0_0_remove_pickled_data_from_dagrun_table.py b/airflow/migrations/versions/0050_3_0_0_remove_pickled_data_from_dagrun_table.py index f559a7cca0195..9bf48e4d6db37 100644 --- a/airflow/migrations/versions/0050_3_0_0_remove_pickled_data_from_dagrun_table.py +++ b/airflow/migrations/versions/0050_3_0_0_remove_pickled_data_from_dagrun_table.py @@ -31,7 +31,6 @@ from alembic import op from sqlalchemy import text from sqlalchemy.dialects.mysql import LONGBLOB -from sqlalchemy_utils import UUIDType # revision identifiers, used by Alembic. revision = "e39a26ac59f6" @@ -44,70 +43,14 @@ def upgrade(): """Apply remove pickled data from dagrun table.""" # Summary of the change: - # 1. Create an archived table (`_dag_run_archive`) to store the current "pickled" data in the dag_run table - # 2. Extract and archive the pickled data using the condition - # 3. Delete the pickled data from the dag_run table so that we can update the column type - # 4. Update the dag_run.conf column type to JSON from bytea + # 1. Updating dag_run.conf column value to NULL. + # 2. Update the dag_run.conf column type to JSON from bytea conn = op.get_bind() dialect = conn.dialect.name - # Create an archived table to store the current data - # Create the dag_run table - op.create_table( - "_dag_run_archive", - sa.Column("id", sa.Integer(), nullable=False, primary_key=True, autoincrement=True), - sa.Column("dag_id", sa.String(length=250), nullable=False), - sa.Column("queued_at", sa.TIMESTAMP(timezone=True), nullable=True), - sa.Column("logical_date", sa.TIMESTAMP(timezone=True), nullable=False), - sa.Column("start_date", sa.TIMESTAMP(timezone=True), nullable=True), - sa.Column("end_date", sa.TIMESTAMP(timezone=True), nullable=True), - sa.Column("state", sa.String(length=50), nullable=True), - sa.Column("run_id", sa.String(length=250), nullable=False), - sa.Column("creating_job_id", sa.Integer(), nullable=True), - sa.Column("external_trigger", sa.Boolean(), nullable=True), - sa.Column("run_type", sa.String(length=50), nullable=False), - sa.Column("triggered_by", sa.String(length=50), nullable=True), - sa.Column("conf", sa.LargeBinary().with_variant(LONGBLOB, "mysql"), nullable=True), - sa.Column("data_interval_start", sa.TIMESTAMP(timezone=True), nullable=True), - sa.Column("data_interval_end", sa.TIMESTAMP(timezone=True), nullable=True), - sa.Column("last_scheduling_decision", sa.TIMESTAMP(timezone=True), nullable=True), - sa.Column("log_template_id", sa.Integer(), nullable=True), - sa.Column("updated_at", sa.TIMESTAMP(timezone=True), nullable=True), - sa.Column("clear_number", sa.Integer(), nullable=False, server_default=sa.text("0")), - sa.Column("backfill_id", sa.Integer(), nullable=True), - sa.Column("dag_version_id", UUIDType(binary=False), nullable=True), - sa.PrimaryKeyConstraint("id"), - sa.UniqueConstraint("dag_id", "run_id"), - if_not_exists=True, - ) - - # Condition to detect pickled data for different databases - condition_templates = { - "postgresql": "get_byte(conf, 0) = 128", - "mysql": "HEX(SUBSTRING(conf, 1, 1)) = '80'", - "sqlite": "hex(substr(conf, 1, 1)) = '80'", - } - - condition = condition_templates.get(dialect) - if not condition: - raise RuntimeError(f"Unsupported dialect: {dialect}") - - # Archive pickled data using the condition - conn.execute( - text( - f""" - INSERT INTO _dag_run_archive (dag_id, run_id, queued_at, logical_date, start_date, end_date, state, creating_job_id, external_trigger, run_type, triggered_by, conf, 
data_interval_start, data_interval_end, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, dag_version_id) - SELECT dag_id, run_id, queued_at, logical_date, start_date, end_date, state, creating_job_id, external_trigger, run_type, triggered_by, conf, data_interval_start, data_interval_end, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, dag_version_id - FROM dag_run - WHERE conf IS NOT NULL AND {condition}; - - """ - ) - ) - - # Delete the pickled data from the dag_run table so that we can update the column type - conn.execute(text(f"DELETE FROM dag_run WHERE conf IS NOT NULL AND {condition}")) + # Update the dag_run.conf column value to NULL + conn.execute(text("UPDATE dag_run set conf=null WHERE conf IS NOT NULL")) # Update the conf column type to JSON if dialect == "postgresql": @@ -123,7 +66,6 @@ def upgrade(): ) elif dialect == "mysql": op.add_column("dag_run", sa.Column("conf_json", sa.JSON(), nullable=True)) - op.execute("UPDATE dag_run SET conf_json = CAST(conf AS CHAR CHARACTER SET utf8mb4)") op.drop_column("dag_run", "conf") op.alter_column("dag_run", "conf_json", existing_type=sa.JSON(), new_column_name="conf") elif dialect == "sqlite": @@ -135,17 +77,6 @@ def upgrade(): with op.batch_alter_table("dag_run", schema=None) as batch_op: batch_op.add_column(sa.Column("conf", sa.JSON(), nullable=True)) - # Migrate data from `conf_old` to `conf` - conn.execute( - text( - """ - UPDATE dag_run - SET conf = json(CAST(conf_old AS TEXT)) - WHERE conf_old IS NOT NULL - """ - ) - ) - # Drop the old `conf_old` column with op.batch_alter_table("dag_run", schema=None) as batch_op: batch_op.drop_column("conf_old") @@ -155,24 +86,11 @@ def downgrade(): """Unapply Remove pickled data from dagrun table.""" conn = op.get_bind() dialect = conn.dialect.name - inspector = sa.inspect(conn) - - insert_query = text( - """ - INSERT INTO dag_run (dag_id, run_id, queued_at, logical_date, start_date, end_date, state, creating_job_id, external_trigger, run_type, triggered_by, conf, data_interval_start, data_interval_end, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, dag_version_id) - SELECT dag_id, run_id, queued_at, logical_date, start_date, end_date, state, creating_job_id, external_trigger, run_type, triggered_by, conf, data_interval_start, data_interval_end, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, dag_version_id - FROM _dag_run_archive - WHERE conf IS NOT NULL; - - """ - ) - conn.execute(text("DELETE FROM dag_run WHERE conf IS NOT NULL")) + conn.execute(text("UPDATE dag_run set conf=null WHERE conf IS NOT NULL")) # Revert the conf column back to LargeBinary if dialect == "postgresql": - # Delete the pickled data from the dag_run table so that we can update the column type - op.execute( """ ALTER TABLE dag_run @@ -184,16 +102,10 @@ def downgrade(): """ ) - if "_dag_run_archive" in inspector.get_table_names(): - conn.execute(insert_query) - elif dialect == "mysql": op.add_column("dag_run", sa.Column("conf_blob", LONGBLOB, nullable=True)) - op.execute("UPDATE dag_run SET conf_blob = CAST(conf AS BINARY);") op.drop_column("dag_run", "conf") op.alter_column("dag_run", "conf_blob", existing_type=LONGBLOB, new_column_name="conf") - if "_dag_run_archive" in inspector.get_table_names(): - conn.execute(insert_query) elif dialect == "sqlite": with op.batch_alter_table("dag_run", schema=None) as batch_op: @@ -202,20 +114,5 @@ def downgrade(): with op.batch_alter_table("dag_run", 
schema=None) as batch_op: batch_op.add_column(sa.Column("conf", sa.LargeBinary, nullable=True)) - conn.execute( - text( - """ - UPDATE dag_run - SET conf = CAST(conf_old AS BLOB) - WHERE conf IS NOT NULL - """ - ) - ) - with op.batch_alter_table("dag_run", schema=None) as batch_op: batch_op.drop_column("conf_old") - - if "_dag_run_archive" in inspector.get_table_names(): - conn.execute(insert_query) - - op.execute(sa.text("DROP TABLE IF EXISTS _dag_run_archive")) diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index d6d39316be652..0802f9d65590a 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -d3a0a7c9decf6f3b976596c45a9fb5f548808c71d97dd107f7049a60f6b06335 \ No newline at end of file +70f79abb590d0e71cda51088f60cee69814ba7e41bb321230e05356e2789d9c1 \ No newline at end of file From 8b88d633329e1c2606e4f5825284ef6225b9cb8a Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Tue, 10 Dec 2024 16:24:00 +0530 Subject: [PATCH 05/16] fixing static check --- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index e1bb8bb19319a..4282d080762f4 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -95b59583f58fa1dad1ae07cac699b0a72143abae9beaa33f79c3ff0f24e52c7d \ No newline at end of file +f1d78ff4cc6cbb10e23c96e67d90da3655dc908057442d2d259a563eb75ef2a7 From c2e0d096c5b8fc92e52b20c71e46d324cb8bda56 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Tue, 10 Dec 2024 17:27:23 +0530 Subject: [PATCH 06/16] fixing static checks --- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index 4282d080762f4..341f5e03d2b89 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -f1d78ff4cc6cbb10e23c96e67d90da3655dc908057442d2d259a563eb75ef2a7 +f1d78ff4cc6cbb10e23c96e67d90da3655dc908057442d2d259a563eb75ef2a7 \ No newline at end of file From d27b7edf47c78159ef87df824eda6529e46ca587 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Fri, 13 Dec 2024 21:00:42 +0530 Subject: [PATCH 07/16] simplifying upgrade and downgrade as per review --- ...0_remove_pickled_data_from_dagrun_table.py | 54 ++++++------------- airflow/models/dagrun.py | 6 +-- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- docs/apache-airflow/img/airflow_erd.svg | 2 +- 4 files changed, 21 insertions(+), 43 deletions(-) diff --git a/airflow/migrations/versions/0052_3_0_0_remove_pickled_data_from_dagrun_table.py b/airflow/migrations/versions/0052_3_0_0_remove_pickled_data_from_dagrun_table.py index df0bebc9444c7..70f6937d674db 100644 --- a/airflow/migrations/versions/0052_3_0_0_remove_pickled_data_from_dagrun_table.py +++ b/airflow/migrations/versions/0052_3_0_0_remove_pickled_data_from_dagrun_table.py @@ -42,54 +42,32 @@ def upgrade(): """Apply remove pickled data from dagrun table.""" - # Summary of the change: - # 1. Updating dag_run.conf column value to NULL. - # 2. 
Update the dag_run.conf column type to JSON from bytea - conn = op.get_bind() - dialect = conn.dialect.name # Update the dag_run.conf column value to NULL conn.execute(text("UPDATE dag_run set conf=null WHERE conf IS NOT NULL")) - if dialect == "postgresql": - op.alter_column( - "dag_run", - "conf", - existing_type=sa.dialects.postgresql.BYTEA, - type_=postgresql.JSONB, - existing_nullable=True, - ) - - else: - op.alter_column( - "dag_run", - "conf", - existing_type=sa.LargeBinary, - type_=sa.JSON(), - existing_nullable=True, - ) + conf_type = sa.JSON().with_variant(postgresql.JSONB, "postgresql") + + with op.batch_alter_table("dag_run", schema=None) as batch_op: + # Drop the existing column for SQLite (due to its limitations) + batch_op.drop_column("conf") + # Add the new column with the correct type for the all dialect + batch_op.add_column(sa.Column("conf", conf_type, nullable=True)) def downgrade(): """Unapply Remove pickled data from dagrun table.""" conn = op.get_bind() - dialect = conn.dialect.name + conn.execute(text("UPDATE dag_run set conf=null WHERE conf IS NOT NULL")) + # Update the dag_run.conf column value to NULL to avoid issues during the type change conn.execute(text("UPDATE dag_run set conf=null WHERE conf IS NOT NULL")) - if dialect == "postgresql": - op.alter_column( - "dag_run", - "conf", - existing_type=postgresql.JSONB, - type_=sa.dialects.postgresql.BYTEA, - postgresql_using="encode(conf::TEXT, 'escape')", - ) - else: - op.alter_column( - "dag_run", - "conf", - existing_type=sa.JSON(), - type_=sa.LargeBinary, - ) + conf_type = sa.LargeBinary().with_variant(postgresql.BYTEA, "postgresql") + + # Apply the same logic for all dialects, including SQLite + with op.batch_alter_table("dag_run", schema=None) as batch_op: + # Drop the existing column for SQLite (due to its limitations) + batch_op.drop_column("conf") + batch_op.add_column(sa.Column("conf", conf_type, nullable=True)) diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 563a4e33f3da3..fbbc2eab32d0b 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -25,7 +25,6 @@ import re2 from sqlalchemy import ( - JSON, Boolean, Column, Enum, @@ -33,6 +32,7 @@ ForeignKeyConstraint, Index, Integer, + PickleType, PrimaryKeyConstraint, String, Text, @@ -44,7 +44,6 @@ text, update, ) -from sqlalchemy.dialects import postgresql from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.orm import declared_attr, joinedload, relationship, synonym, validates @@ -138,7 +137,8 @@ class DagRun(Base, LoggingMixin): triggered_by = Column( Enum(DagRunTriggeredByType, native_enum=False, length=50) ) # Airflow component that triggered the run. - conf = Column(JSON().with_variant(postgresql.JSONB, "postgresql")) + # conf = Column(JSON().with_variant(postgresql.JSONB, "postgresql")) + conf = Column(PickleType) # These two must be either both NULL or both datetime. 
data_interval_start = Column(UtcDateTime) data_interval_end = Column(UtcDateTime) diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index 8acfcb299a33a..268e864c231e7 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -2b58fe5fb405b3b4bfc4b438a54572aa5663a9fc54b789f499f821321f1efc58 \ No newline at end of file +3c7e69f4098d4078941590dd3a21b78473e8586683595a592dafc08decf4d534 \ No newline at end of file diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg index 2c6adaf39f90a..bdbd97db8efc8 100644 --- a/docs/apache-airflow/img/airflow_erd.svg +++ b/docs/apache-airflow/img/airflow_erd.svg @@ -1771,7 +1771,7 @@ conf - [JSONB] + [BYTEA] creating_job_id From 940670244c4fcf6243233e1408c8112674aef7d2 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Fri, 13 Dec 2024 22:19:07 +0530 Subject: [PATCH 08/16] fixing failures --- airflow/models/dagrun.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index fbbc2eab32d0b..563a4e33f3da3 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -25,6 +25,7 @@ import re2 from sqlalchemy import ( + JSON, Boolean, Column, Enum, @@ -32,7 +33,6 @@ ForeignKeyConstraint, Index, Integer, - PickleType, PrimaryKeyConstraint, String, Text, @@ -44,6 +44,7 @@ text, update, ) +from sqlalchemy.dialects import postgresql from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.orm import declared_attr, joinedload, relationship, synonym, validates @@ -137,8 +138,7 @@ class DagRun(Base, LoggingMixin): triggered_by = Column( Enum(DagRunTriggeredByType, native_enum=False, length=50) ) # Airflow component that triggered the run. - # conf = Column(JSON().with_variant(postgresql.JSONB, "postgresql")) - conf = Column(PickleType) + conf = Column(JSON().with_variant(postgresql.JSONB, "postgresql")) # These two must be either both NULL or both datetime. data_interval_start = Column(UtcDateTime) data_interval_end = Column(UtcDateTime) From 22a5dbac64b5844dc4f2a4b431f25ae96ffc2956 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Sat, 14 Dec 2024 08:22:50 +0530 Subject: [PATCH 09/16] removing setting conf to null --- ...52_3_0_0_remove_pickled_data_from_dagrun_table.py | 12 ------------ docs/apache-airflow/img/airflow_erd.sha256 | 2 +- docs/apache-airflow/img/airflow_erd.svg | 2 +- 3 files changed, 2 insertions(+), 14 deletions(-) diff --git a/airflow/migrations/versions/0052_3_0_0_remove_pickled_data_from_dagrun_table.py b/airflow/migrations/versions/0052_3_0_0_remove_pickled_data_from_dagrun_table.py index 70f6937d674db..ad5168c069ceb 100644 --- a/airflow/migrations/versions/0052_3_0_0_remove_pickled_data_from_dagrun_table.py +++ b/airflow/migrations/versions/0052_3_0_0_remove_pickled_data_from_dagrun_table.py @@ -29,7 +29,6 @@ import sqlalchemy as sa from alembic import op -from sqlalchemy import text from sqlalchemy.dialects import postgresql # revision identifiers, used by Alembic. 
@@ -42,11 +41,6 @@ def upgrade(): """Apply remove pickled data from dagrun table.""" - conn = op.get_bind() - - # Update the dag_run.conf column value to NULL - conn.execute(text("UPDATE dag_run set conf=null WHERE conf IS NOT NULL")) - conf_type = sa.JSON().with_variant(postgresql.JSONB, "postgresql") with op.batch_alter_table("dag_run", schema=None) as batch_op: @@ -58,12 +52,6 @@ def upgrade(): def downgrade(): """Unapply Remove pickled data from dagrun table.""" - conn = op.get_bind() - conn.execute(text("UPDATE dag_run set conf=null WHERE conf IS NOT NULL")) - - # Update the dag_run.conf column value to NULL to avoid issues during the type change - conn.execute(text("UPDATE dag_run set conf=null WHERE conf IS NOT NULL")) - conf_type = sa.LargeBinary().with_variant(postgresql.BYTEA, "postgresql") # Apply the same logic for all dialects, including SQLite diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index 6ee5972cc1624..feeaefb5df311 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -ccb8ef5583b2a6b3ee3ab4212139c112b92953675655010a6775fffb4945b206 \ No newline at end of file +36ed3adccc0986daa8f13307996a3fc6dcae23a55a1b6c4935f507723bf6b31a \ No newline at end of file diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg index 9c37f5c320686..9af32a9296781 100644 --- a/docs/apache-airflow/img/airflow_erd.svg +++ b/docs/apache-airflow/img/airflow_erd.svg @@ -1723,7 +1723,7 @@ conf - [BYTEA] + [JSONB] creating_job_id From 70ce5dfad2dfacaec42387a8c306567b82843a52 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Tue, 17 Dec 2024 00:14:12 +0530 Subject: [PATCH 10/16] refactor approach to migrate values in conf --- ...0_remove_pickled_data_from_dagrun_table.py | 111 ++++++++++++++++-- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- 2 files changed, 101 insertions(+), 12 deletions(-) diff --git a/airflow/migrations/versions/0052_3_0_0_remove_pickled_data_from_dagrun_table.py b/airflow/migrations/versions/0052_3_0_0_remove_pickled_data_from_dagrun_table.py index ad5168c069ceb..1b6c2823f6869 100644 --- a/airflow/migrations/versions/0052_3_0_0_remove_pickled_data_from_dagrun_table.py +++ b/airflow/migrations/versions/0052_3_0_0_remove_pickled_data_from_dagrun_table.py @@ -27,8 +27,13 @@ from __future__ import annotations +import json +import pickle +from textwrap import dedent + import sqlalchemy as sa -from alembic import op +from alembic import context, op +from sqlalchemy import text from sqlalchemy.dialects import postgresql # revision identifiers, used by Alembic. @@ -41,21 +46,105 @@ def upgrade(): """Apply remove pickled data from dagrun table.""" + conn = op.get_bind() conf_type = sa.JSON().with_variant(postgresql.JSONB, "postgresql") + op.add_column("dag_run", sa.Column("conf_json", conf_type, nullable=True)) + + if context.is_offline_mode(): + # Update the dag_run.conf column value to NULL + print( + dedent(""" + ------------ + -- WARNING: Unable to migrate data of conf column in offline mode! + -- Column conf will be set to NULL in offline mode!. + -- DO not use Offline mode if data if you want to keep conf values. 
+ ------------ + """) + ) + conn.execute(text("UPDATE dag_run set conf=null WHERE conf IS NOT NULL")) + else: + BATCH_SIZE = 2 + total_processed = 0 + offset = 0 + while True: + rows = conn.execute( + text( + f"SELECT id,conf FROM dag_run WHERE conf IS not NULL order by id LIMIT {BATCH_SIZE} OFFSET {offset}" + ) + ).fetchall() + if not rows: + break + for row in rows: + row_id, pickle_data = row + + try: + original_data = pickle.loads(pickle_data) + json_data = json.dumps(original_data) + conn.execute(text(f"UPDATE dag_run SET conf_json ='{json_data}' WHERE id = {row_id}")) + except Exception as e: + print(f"Error processing row ID {row_id}: {e}") + continue + offset += BATCH_SIZE + + # Update total processed count + total_processed += len(rows) + + op.drop_column("dag_run", "conf") - with op.batch_alter_table("dag_run", schema=None) as batch_op: - # Drop the existing column for SQLite (due to its limitations) - batch_op.drop_column("conf") - # Add the new column with the correct type for the all dialect - batch_op.add_column(sa.Column("conf", conf_type, nullable=True)) + op.alter_column("dag_run", "conf_json", existing_type=conf_type, new_column_name="conf") def downgrade(): """Unapply Remove pickled data from dagrun table.""" + conn = op.get_bind() conf_type = sa.LargeBinary().with_variant(postgresql.BYTEA, "postgresql") + op.add_column("dag_run", sa.Column("conf_pickle", conf_type, nullable=True)) + + if context.is_offline_mode(): + # Update the dag_run.conf column value to NULL + print( + dedent(""" + ------------ + -- WARNING: Unable to migrate data of conf column in offline mode! + -- Column conf will be set to NULL in offline mode!. + -- DO not use Offline mode if data if you want to keep conf values. + ------------ + """) + ) + conn.execute(text("UPDATE dag_run set conf=null WHERE conf IS NOT NULL")) + else: + BATCH_SIZE = 2 + total_processed = 0 + offset = 0 + while True: + rows = conn.execute( + text( + f"SELECT id,conf FROM dag_run WHERE conf IS not NULL order by id LIMIT {BATCH_SIZE} OFFSET {offset}" + ) + ).fetchall() + if not rows: + break # + for row in rows: + row_id, json_data = row + + try: + pickled_data = pickle.dumps(json_data, protocol=pickle.HIGHEST_PROTOCOL) + conn.execute( + text(""" + UPDATE dag_run + SET conf_pickle = :pickle_data + WHERE id = :id + """), + {"pickle_data": pickled_data, "id": row_id}, + ) + except Exception as e: + print(f"Error processing row ID {row_id}: {e}") + continue + offset += BATCH_SIZE + + # Update total processed count + total_processed += len(rows) + + op.drop_column("dag_run", "conf") - # Apply the same logic for all dialects, including SQLite - with op.batch_alter_table("dag_run", schema=None) as batch_op: - # Drop the existing column for SQLite (due to its limitations) - batch_op.drop_column("conf") - batch_op.add_column(sa.Column("conf", conf_type, nullable=True)) + op.alter_column("dag_run", "conf_pickle", existing_type=conf_type, new_column_name="conf") diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index feeaefb5df311..f7e38505150b4 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -36ed3adccc0986daa8f13307996a3fc6dcae23a55a1b6c4935f507723bf6b31a \ No newline at end of file +45416198ffd7571cf11293b7f5e165236ccc1eceea672a009e143cc39820089e \ No newline at end of file From cce341a07c9d5bedff37ea562eca9a13272def5b Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Tue, 17 Dec 2024 11:31:52 +0530 Subject: [PATCH 
11/16] update offline warning --- ...0_remove_pickled_data_from_dagrun_table.py | 36 ++++++++----------- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- 2 files changed, 16 insertions(+), 22 deletions(-) diff --git a/airflow/migrations/versions/0052_3_0_0_remove_pickled_data_from_dagrun_table.py b/airflow/migrations/versions/0052_3_0_0_remove_pickled_data_from_dagrun_table.py index 1b6c2823f6869..e62f1fdde7249 100644 --- a/airflow/migrations/versions/0052_3_0_0_remove_pickled_data_from_dagrun_table.py +++ b/airflow/migrations/versions/0052_3_0_0_remove_pickled_data_from_dagrun_table.py @@ -54,17 +54,17 @@ def upgrade(): # Update the dag_run.conf column value to NULL print( dedent(""" - ------------ - -- WARNING: Unable to migrate data of conf column in offline mode! - -- Column conf will be set to NULL in offline mode!. - -- DO not use Offline mode if data if you want to keep conf values. - ------------ - """) + ------------ + -- WARNING: Unable to migrate the data in the 'conf' column while in offline mode! + -- The 'conf' column will be set to NULL in offline mode. + -- Avoid using offline mode if you need to retain 'conf' values. + ------------ + """) ) + conn.execute(text("UPDATE dag_run set conf=null WHERE conf IS NOT NULL")) else: BATCH_SIZE = 2 - total_processed = 0 offset = 0 while True: rows = conn.execute( @@ -86,9 +86,6 @@ def upgrade(): continue offset += BATCH_SIZE - # Update total processed count - total_processed += len(rows) - op.drop_column("dag_run", "conf") op.alter_column("dag_run", "conf_json", existing_type=conf_type, new_column_name="conf") @@ -104,17 +101,17 @@ def downgrade(): # Update the dag_run.conf column value to NULL print( dedent(""" - ------------ - -- WARNING: Unable to migrate data of conf column in offline mode! - -- Column conf will be set to NULL in offline mode!. - -- DO not use Offline mode if data if you want to keep conf values. - ------------ - """) + ------------ + -- WARNING: Unable to migrate the data in the 'conf' column while in offline mode! + -- The 'conf' column will be set to NULL in offline mode. + -- Avoid using offline mode if you need to retain 'conf' values. 
+ ------------ + """) ) + conn.execute(text("UPDATE dag_run set conf=null WHERE conf IS NOT NULL")) else: BATCH_SIZE = 2 - total_processed = 0 offset = 0 while True: rows = conn.execute( @@ -123,7 +120,7 @@ def downgrade(): ) ).fetchall() if not rows: - break # + break for row in rows: row_id, json_data = row @@ -142,9 +139,6 @@ def downgrade(): continue offset += BATCH_SIZE - # Update total processed count - total_processed += len(rows) - op.drop_column("dag_run", "conf") op.alter_column("dag_run", "conf_pickle", existing_type=conf_type, new_column_name="conf") diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index f7e38505150b4..198128196b645 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -45416198ffd7571cf11293b7f5e165236ccc1eceea672a009e143cc39820089e \ No newline at end of file +998df061dfd42deca983b9569fdf57025a18f1fdd2d9be3a331318c34a0bb281 \ No newline at end of file From 8261e0565e39207bea289dc75bf2f466a8c3904f Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Tue, 31 Dec 2024 21:27:08 +0530 Subject: [PATCH 12/16] resolving conflicts --- .../0055_3_0_0_remove_pickled_data_from_dagrun_table.py | 4 +--- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- newsfragments/44533.significant.rst | 2 +- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/airflow/migrations/versions/0055_3_0_0_remove_pickled_data_from_dagrun_table.py b/airflow/migrations/versions/0055_3_0_0_remove_pickled_data_from_dagrun_table.py index b55b2a3df6409..c3a6dcb38421a 100644 --- a/airflow/migrations/versions/0055_3_0_0_remove_pickled_data_from_dagrun_table.py +++ b/airflow/migrations/versions/0055_3_0_0_remove_pickled_data_from_dagrun_table.py @@ -61,8 +61,6 @@ def upgrade(): ------------ """) ) - - conn.execute(text("UPDATE dag_run set conf=null WHERE conf IS NOT NULL")) else: BATCH_SIZE = 2 offset = 0 @@ -82,7 +80,7 @@ def upgrade(): json_data = json.dumps(original_data) conn.execute(text(f"UPDATE dag_run SET conf_json ='{json_data}' WHERE id = {row_id}")) except Exception as e: - print(f"Error processing row ID {row_id}: {e}") + print(f"Error converting dagrun conf to json for dagrun ID {row_id}: {e}") continue offset += BATCH_SIZE diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index f364188af58f8..bf0ab8976ad7a 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -ec9031db4552d850565a983df9af27307885b074663d98192567b20cfeb7533e \ No newline at end of file +eb062a932fc314f038a64dddc1d1dead14706259f06f4cf1a017b1739fe79c8c \ No newline at end of file diff --git a/newsfragments/44533.significant.rst b/newsfragments/44533.significant.rst index 597e0f2b31c2a..070730fb3a874 100644 --- a/newsfragments/44533.significant.rst +++ b/newsfragments/44533.significant.rst @@ -1,4 +1,4 @@ -Update conf column in dag_run table type from byte ( that store a python pickle ) to JSON +Update conf column in dag_run table type from byte ( that store a python pickle ) to JSON. It is important to note that existing dagrun records will lose their conf data if an offline migration is performed .. 
Provide additional contextual information From 7fb6509fa27076afb68f2ff2c382a192793f8883 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Tue, 31 Dec 2024 22:20:09 +0530 Subject: [PATCH 13/16] resolving conflicts --- ...0_remove_pickled_data_from_dagrun_table.py | 12 ++++++++--- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- newsfragments/44533.significant.rst | 20 ++----------------- 3 files changed, 12 insertions(+), 22 deletions(-) diff --git a/airflow/migrations/versions/0055_3_0_0_remove_pickled_data_from_dagrun_table.py b/airflow/migrations/versions/0055_3_0_0_remove_pickled_data_from_dagrun_table.py index c3a6dcb38421a..e068943489e26 100644 --- a/airflow/migrations/versions/0055_3_0_0_remove_pickled_data_from_dagrun_table.py +++ b/airflow/migrations/versions/0055_3_0_0_remove_pickled_data_from_dagrun_table.py @@ -78,7 +78,14 @@ def upgrade(): try: original_data = pickle.loads(pickle_data) json_data = json.dumps(original_data) - conn.execute(text(f"UPDATE dag_run SET conf_json ='{json_data}' WHERE id = {row_id}")) + conn.execute( + text(""" + UPDATE dag_run + SET conf_json = :json_data + WHERE id = :id + """), + {"json_data": json_data, "id": row_id}, + ) except Exception as e: print(f"Error converting dagrun conf to json for dagrun ID {row_id}: {e}") continue @@ -107,7 +114,6 @@ def downgrade(): """) ) - conn.execute(text("UPDATE dag_run set conf=null WHERE conf IS NOT NULL")) else: BATCH_SIZE = 2 offset = 0 @@ -133,7 +139,7 @@ def downgrade(): {"pickle_data": pickled_data, "id": row_id}, ) except Exception as e: - print(f"Error processing row ID {row_id}: {e}") + print(f"Error pickling dagrun conf for dagrun ID {row_id}: {e}") continue offset += BATCH_SIZE diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index bf0ab8976ad7a..94cce915e5ad3 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -eb062a932fc314f038a64dddc1d1dead14706259f06f4cf1a017b1739fe79c8c \ No newline at end of file +d899fd893119389509f4798e730b580f5bba1921f120e0010108444dd44226eb \ No newline at end of file diff --git a/newsfragments/44533.significant.rst b/newsfragments/44533.significant.rst index 070730fb3a874..55619c244f5ef 100644 --- a/newsfragments/44533.significant.rst +++ b/newsfragments/44533.significant.rst @@ -1,21 +1,5 @@ -Update conf column in dag_run table type from byte ( that store a python pickle ) to JSON. It is important to note that existing dagrun records will lose their conf data if an offline migration is performed +During offline migration, ``DagRun.conf`` is cleared .. Provide additional contextual information -Column conf of the table dag_run is using the type byte ( and storing a python pickle ) on the database , since airflow only support postgres 12+ and mysql 8+ , we updated it to json type . - -.. Check the type of change that applies to this change - -* Types of change - - * [ ] DAG changes - * [ ] Config changes - * [ ] API changes - * [ ] CLI changes - * [x] Behaviour changes - * [ ] Plugin changes - * [ ] Dependency change - -.. List the migration rules needed for this change (see https://github.com/apache/airflow/issues/41641) - -* Migrations rules needed +The ``conf`` column is changing from pickle to JSON, so the values in that column cannot be migrated during offline migrations. If you want to retain ``conf`` values for existing DagRuns, you must run a normal (non-offline) migration.
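
A note on the conversion finalized above: the per-row pickle-to-JSON round trip is the reason this migration needs Python and cannot preserve ``conf`` in offline (SQL-only) mode. A minimal, self-contained sketch of what ``upgrade()`` does for each row (illustrative only, not part of the patch; the sample dict is an assumed stand-in for a real DagRun conf):

    import json
    import pickle

    # Stand-in for a user-supplied DagRun conf payload.
    conf = {"env": "prod", "retries": 3}

    # What the old BYTEA/LONGBLOB column stored: a protocol-2+ pickle,
    # which always begins with byte 0x80 -- the same marker the earlier
    # archive-table revision tested with get_byte(conf, 0) = 128.
    pickled = pickle.dumps(conf, protocol=pickle.HIGHEST_PROTOCOL)
    assert pickled[0] == 0x80

    # What upgrade() does per row: unpickle in Python, then re-serialize
    # as JSON text for the new conf_json column.
    json_value = json.dumps(pickle.loads(pickled))
    assert json.loads(json_value) == conf

Values that fail this round trip (for example, pickles of objects that are not JSON-serializable) are skipped with a logged error, matching the ``except`` branch in the migration.
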
From 6b23d8a1676f00e39bb1c62a6b0a294ba287a9b6 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Tue, 31 Dec 2024 22:24:06 +0530 Subject: [PATCH 14/16] resolving conflicts --- .../0055_3_0_0_remove_pickled_data_from_dagrun_table.py | 2 -- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/airflow/migrations/versions/0055_3_0_0_remove_pickled_data_from_dagrun_table.py b/airflow/migrations/versions/0055_3_0_0_remove_pickled_data_from_dagrun_table.py index e068943489e26..790236ab271d5 100644 --- a/airflow/migrations/versions/0055_3_0_0_remove_pickled_data_from_dagrun_table.py +++ b/airflow/migrations/versions/0055_3_0_0_remove_pickled_data_from_dagrun_table.py @@ -51,7 +51,6 @@ def upgrade(): op.add_column("dag_run", sa.Column("conf_json", conf_type, nullable=True)) if context.is_offline_mode(): - # Update the dag_run.conf column value to NULL print( dedent(""" ------------ @@ -103,7 +102,6 @@ def downgrade(): op.add_column("dag_run", sa.Column("conf_pickle", conf_type, nullable=True)) if context.is_offline_mode(): - # Update the dag_run.conf column value to NULL print( dedent(""" ------------ diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index 94cce915e5ad3..4a4a39fee7725 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -d899fd893119389509f4798e730b580f5bba1921f120e0010108444dd44226eb \ No newline at end of file +a78bedd5d2f54614a920db77044f6013b86de9792ee8fafc6a7e3f30fbd4c602 \ No newline at end of file From 2b91c124151ce0e74f6f657a550f0c1829955459 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Tue, 7 Jan 2025 20:54:52 +0530 Subject: [PATCH 15/16] updating batch size --- .../0055_3_0_0_remove_pickled_data_from_dagrun_table.py | 4 ++-- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/airflow/migrations/versions/0055_3_0_0_remove_pickled_data_from_dagrun_table.py b/airflow/migrations/versions/0055_3_0_0_remove_pickled_data_from_dagrun_table.py index 790236ab271d5..b9e17cf30331f 100644 --- a/airflow/migrations/versions/0055_3_0_0_remove_pickled_data_from_dagrun_table.py +++ b/airflow/migrations/versions/0055_3_0_0_remove_pickled_data_from_dagrun_table.py @@ -61,7 +61,7 @@ def upgrade(): """) ) else: - BATCH_SIZE = 2 + BATCH_SIZE = 100 offset = 0 while True: rows = conn.execute( @@ -113,7 +113,7 @@ def downgrade(): ) else: - BATCH_SIZE = 2 + BATCH_SIZE = 100 offset = 0 while True: rows = conn.execute( diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index 4a4a39fee7725..a38eb228aab56 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -a78bedd5d2f54614a920db77044f6013b86de9792ee8fafc6a7e3f30fbd4c602 \ No newline at end of file +6c69ef5b1d861062cb00a1b9dd0b134608af079b5831af4212c364275062c891 \ No newline at end of file From a8ec33b8dbb79e72dad15024dc5e1cbf48ad9dae Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Tue, 7 Jan 2025 21:47:21 +0530 Subject: [PATCH 16/16] updating conf type --- .../0055_3_0_0_remove_pickled_data_from_dagrun_table.py | 5 ++--- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/airflow/migrations/versions/0055_3_0_0_remove_pickled_data_from_dagrun_table.py b/airflow/migrations/versions/0055_3_0_0_remove_pickled_data_from_dagrun_table.py index 
b9e17cf30331f..07d012ddf5719 100644 --- a/airflow/migrations/versions/0055_3_0_0_remove_pickled_data_from_dagrun_table.py +++ b/airflow/migrations/versions/0055_3_0_0_remove_pickled_data_from_dagrun_table.py @@ -98,8 +98,7 @@ def upgrade(): def downgrade(): """Unapply Remove pickled data from dagrun table.""" conn = op.get_bind() - conf_type = sa.LargeBinary().with_variant(postgresql.BYTEA, "postgresql") - op.add_column("dag_run", sa.Column("conf_pickle", conf_type, nullable=True)) + op.add_column("dag_run", sa.Column("conf_pickle", sa.PickleType(), nullable=True)) if context.is_offline_mode(): print( @@ -143,4 +142,4 @@ def downgrade(): op.drop_column("dag_run", "conf") - op.alter_column("dag_run", "conf_pickle", existing_type=conf_type, new_column_name="conf") + op.alter_column("dag_run", "conf_pickle", existing_type=sa.PickleType(), new_column_name="conf") diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index a38eb228aab56..3616222fd880c 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -6c69ef5b1d861062cb00a1b9dd0b134608af079b5831af4212c364275062c891 \ No newline at end of file +ca59d711e6304f8bfdb25f49339d455602430dd6b880e420869fc892faef0596 \ No newline at end of file
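
Where the series ends up: the ORM maps ``DagRun.conf`` as generic ``JSON`` with a ``JSONB`` variant on PostgreSQL, ``upgrade()`` converts surviving pickles in batches of 100, and ``downgrade()`` re-pickles into an ``sa.PickleType()`` column. A small runnable sketch of the ``with_variant`` column pattern the PR lands on (the model and table names here are illustrative, not Airflow code):

    import sqlalchemy as sa
    from sqlalchemy.dialects import postgresql
    from sqlalchemy.orm import Session, declarative_base

    Base = declarative_base()

    class DemoRun(Base):
        __tablename__ = "demo_run"
        id = sa.Column(sa.Integer, primary_key=True)
        # Generic JSON on SQLite/MySQL; compiles to JSONB on PostgreSQL.
        conf = sa.Column(sa.JSON().with_variant(postgresql.JSONB, "postgresql"))

    engine = sa.create_engine("sqlite://")
    Base.metadata.create_all(engine)

    with Session(engine) as session:
        session.add(DemoRun(conf={"param": 1}))
        session.commit()
        assert session.query(DemoRun).one().conf == {"param": 1}

On SQLite and MySQL the value is stored as serialized JSON text; on PostgreSQL the JSONB variant additionally supports indexing and containment queries, which the old pickled BYTEA column could not.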