From 3f831daebf743d2bcd602114b1b5692879a75889 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Wed, 13 Nov 2024 07:26:16 +0100 Subject: [PATCH 1/7] Update dag reserialize command Now that we have versioning, users must be sure they want to use the dags reserialize command, as it deletes dag history. I updated the command to ensure users know it will delete DAG history and answer yes for it to continue. The DagVersion is deleted and since it has foreignkey to SerializedDagModel and DagCode, those will also get deleted. Also updated the _reserialize function at DB upgrade so that it doesn't delete the serializedDag since that won't be necessary Updated the test to use session fixture instead of create_session --- airflow/cli/cli_config.py | 1 + airflow/cli/commands/dag_command.py | 7 ++++++- airflow/utils/db.py | 3 --- tests/cli/commands/test_dag_command.py | 29 ++++++++++++-------------- 4 files changed, 20 insertions(+), 20 deletions(-) diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py index 780cf20b95671..e2df71e0ad18f 100644 --- a/airflow/cli/cli_config.py +++ b/airflow/cli/cli_config.py @@ -1254,6 +1254,7 @@ class GroupCommand(NamedTuple): ARG_CLEAR_ONLY, ARG_SUBDIR, ARG_VERBOSE, + ARG_YES, ), ), ) diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py index 0b4d3192d6eda..f26f9ef9bbdf7 100644 --- a/airflow/cli/commands/dag_command.py +++ b/airflow/cli/commands/dag_command.py @@ -36,6 +36,7 @@ from airflow.exceptions import AirflowException from airflow.jobs.job import Job from airflow.models import DagBag, DagModel, DagRun, TaskInstance +from airflow.models.dag_version import DagVersion from airflow.models.serialized_dag import SerializedDagModel from airflow.utils import cli as cli_utils, timezone from airflow.utils.cli import get_dag, process_subdir, suppress_logs_and_warning @@ -537,7 +538,11 @@ def dag_test(args, dag: DAG | None = None, session: Session = NEW_SESSION) -> No @provide_session def 
dag_reserialize(args, session: Session = NEW_SESSION) -> None: """Serialize a DAG instance.""" - session.execute(delete(SerializedDagModel).execution_options(synchronize_session=False)) + if not ( + args.yes or input("This will clean out all DAG versioning history. Proceed? (y/n)").upper() == "Y" + ): + raise SystemExit("Cancelled") + session.execute(delete(DagVersion).execution_options(synchronize_session=False)) if not args.clear_only: dagbag = DagBag(process_subdir(args.subdir)) diff --git a/airflow/utils/db.py b/airflow/utils/db.py index 3c657fd447fd1..8b348034f6c49 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -41,7 +41,6 @@ import attrs from sqlalchemy import ( Table, - delete, exc, func, inspect, @@ -924,9 +923,7 @@ def check_and_run_migrations(): def _reserialize_dags(*, session: Session) -> None: from airflow.models.dagbag import DagBag - from airflow.models.serialized_dag import SerializedDagModel - session.execute(delete(SerializedDagModel).execution_options(synchronize_session=False)) dagbag = DagBag(collect_dags=False) dagbag.collect_dags(only_if_updated=False) dagbag.sync_to_db(session=session) diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py index 7b9dd1e63bd18..1a8b9f3cc1bf2 100644 --- a/tests/cli/commands/test_dag_command.py +++ b/tests/cli/commands/test_dag_command.py @@ -79,34 +79,30 @@ def teardown_class(cls) -> None: def setup_method(self): clear_db_runs() # clean-up all dag run before start each test - def test_reserialize(self): + def test_reserialize(self, session): # Assert that there are serialized Dags - with create_session() as session: - serialized_dags_before_command = session.query(SerializedDagModel).all() + serialized_dags_before_command = session.query(SerializedDagModel).all() assert len(serialized_dags_before_command) # There are serialized DAGs to delete # Run clear of serialized dags - dag_command.dag_reserialize(self.parser.parse_args(["dags", "reserialize", 
"--clear-only"])) + dag_command.dag_reserialize(self.parser.parse_args(["dags", "reserialize", "--clear-only", "--yes"])) # Assert no serialized Dags - with create_session() as session: - serialized_dags_after_clear = session.query(SerializedDagModel).all() + serialized_dags_after_clear = session.query(SerializedDagModel).all() assert not len(serialized_dags_after_clear) # Serialize manually - dag_command.dag_reserialize(self.parser.parse_args(["dags", "reserialize"])) + dag_command.dag_reserialize(self.parser.parse_args(["dags", "reserialize", "--yes"])) # Check serialized DAGs are back - with create_session() as session: - serialized_dags_after_reserialize = session.query(SerializedDagModel).all() + serialized_dags_after_reserialize = session.query(SerializedDagModel).all() assert len(serialized_dags_after_reserialize) >= 40 # Serialized DAGs back - def test_reserialize_should_support_subdir_argument(self): + def test_reserialize_should_support_subdir_argument(self, session): # Run clear of serialized dags - dag_command.dag_reserialize(self.parser.parse_args(["dags", "reserialize", "--clear-only"])) + dag_command.dag_reserialize(self.parser.parse_args(["dags", "reserialize", "--clear-only", "--yes"])) # Assert no serialized Dags - with create_session() as session: - serialized_dags_after_clear = session.query(SerializedDagModel).all() + serialized_dags_after_clear = session.query(SerializedDagModel).all() assert len(serialized_dags_after_clear) == 0 # Serialize manually @@ -117,11 +113,12 @@ def test_reserialize_should_support_subdir_argument(self): with mock.patch( "airflow.cli.commands.dag_command.DagBag.__init__.__defaults__", tuple(dagbag_default) ): - dag_command.dag_reserialize(self.parser.parse_args(["dags", "reserialize", "--subdir", dag_path])) + dag_command.dag_reserialize( + self.parser.parse_args(["dags", "reserialize", "--subdir", dag_path, "--yes"]) + ) # Check serialized DAG are back - with create_session() as session: - 
serialized_dags_after_reserialize = session.query(SerializedDagModel).all() + serialized_dags_after_reserialize = session.query(SerializedDagModel).all() assert len(serialized_dags_after_reserialize) == 1 # Serialized DAG back def test_show_dag_dependencies_print(self): From 9706207fcd6fc859d77ec6d38c2c06ee82ba7f43 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Thu, 14 Nov 2024 13:43:56 +0100 Subject: [PATCH 2/7] add --clear-history to dag reserialize command --- airflow/cli/cli_config.py | 9 ++++---- airflow/cli/commands/dag_command.py | 12 ++++------- tests/cli/commands/test_dag_command.py | 30 +++++++++++++++++--------- 3 files changed, 28 insertions(+), 23 deletions(-) diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py index e2df71e0ad18f..5586b716be13a 100644 --- a/airflow/cli/cli_config.py +++ b/airflow/cli/cli_config.py @@ -968,10 +968,10 @@ def string_lower_type(val): ) # reserialize -ARG_CLEAR_ONLY = Arg( - ("--clear-only",), +ARG_CLEAR_HISTORY = Arg( + ("--clear-history",), action="store_true", - help="If passed, serialized DAGs will be cleared but not reserialized.", + help="If passed, DAGs history will be cleared.", ) ARG_DAG_LIST_COLUMNS = Arg( @@ -1251,10 +1251,9 @@ class GroupCommand(NamedTuple): ), func=lazy_load_command("airflow.cli.commands.dag_command.dag_reserialize"), args=( - ARG_CLEAR_ONLY, + ARG_CLEAR_HISTORY, ARG_SUBDIR, ARG_VERBOSE, - ARG_YES, ), ), ) diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py index f26f9ef9bbdf7..34248e35a4900 100644 --- a/airflow/cli/commands/dag_command.py +++ b/airflow/cli/commands/dag_command.py @@ -538,12 +538,8 @@ def dag_test(args, dag: DAG | None = None, session: Session = NEW_SESSION) -> No @provide_session def dag_reserialize(args, session: Session = NEW_SESSION) -> None: """Serialize a DAG instance.""" - if not ( - args.yes or input("This will clean out all DAG versioning history. Proceed? 
(y/n)").upper() == "Y" - ): - raise SystemExit("Cancelled") - session.execute(delete(DagVersion).execution_options(synchronize_session=False)) + if args.clear_history: + session.execute(delete(DagVersion).execution_options(synchronize_session=False)) - if not args.clear_only: - dagbag = DagBag(process_subdir(args.subdir)) - dagbag.sync_to_db(session=session) + dagbag = DagBag(process_subdir(args.subdir)) + dagbag.sync_to_db(session=session) diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py index 1a8b9f3cc1bf2..60888204db5dc 100644 --- a/tests/cli/commands/test_dag_command.py +++ b/tests/cli/commands/test_dag_command.py @@ -39,6 +39,7 @@ from airflow.models import DagBag, DagModel, DagRun from airflow.models.baseoperator import BaseOperator from airflow.models.dag import _run_inline_trigger +from airflow.models.dag_version import DagVersion from airflow.models.serialized_dag import SerializedDagModel from airflow.triggers.base import TriggerEvent from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger @@ -83,23 +84,34 @@ def test_reserialize(self, session): # Assert that there are serialized Dags serialized_dags_before_command = session.query(SerializedDagModel).all() assert len(serialized_dags_before_command) # There are serialized DAGs to delete + # delete all versioning + session.query(DagVersion).delete() - # Run clear of serialized dags - dag_command.dag_reserialize(self.parser.parse_args(["dags", "reserialize", "--clear-only", "--yes"])) - # Assert no serialized Dags + serialized_dags_before_command = session.query(SerializedDagModel).all() + assert not len(serialized_dags_before_command) # There are no more serialized dag + dag_version_before_command = session.query(DagVersion).all() + assert not len(dag_version_before_command) + # Serialize the dags + dag_command.dag_reserialize(self.parser.parse_args(["dags", "reserialize"])) + # Assert serialized Dags serialized_dags_after_clear = 
session.query(SerializedDagModel).all() - assert not len(serialized_dags_after_clear) + assert len(serialized_dags_after_clear) + dag_version_after_command = session.query(DagVersion).all() + assert len(dag_version_after_command) - # Serialize manually - dag_command.dag_reserialize(self.parser.parse_args(["dags", "reserialize", "--yes"])) + # Serialize the dag with clear history + dag_command.dag_reserialize(self.parser.parse_args(["dags", "reserialize", "--clear-history"])) # Check serialized DAGs are back + dagv = session.query(DagVersion).all() + assert len(dagv) serialized_dags_after_reserialize = session.query(SerializedDagModel).all() assert len(serialized_dags_after_reserialize) >= 40 # Serialized DAGs back + assert len(dagv) == len(serialized_dags_after_reserialize) def test_reserialize_should_support_subdir_argument(self, session): # Run clear of serialized dags - dag_command.dag_reserialize(self.parser.parse_args(["dags", "reserialize", "--clear-only", "--yes"])) + session.query(DagVersion).delete() # Assert no serialized Dags serialized_dags_after_clear = session.query(SerializedDagModel).all() @@ -113,9 +125,7 @@ def test_reserialize_should_support_subdir_argument(self, session): with mock.patch( "airflow.cli.commands.dag_command.DagBag.__init__.__defaults__", tuple(dagbag_default) ): - dag_command.dag_reserialize( - self.parser.parse_args(["dags", "reserialize", "--subdir", dag_path, "--yes"]) - ) + dag_command.dag_reserialize(self.parser.parse_args(["dags", "reserialize", "--subdir", dag_path])) # Check serialized DAG are back serialized_dags_after_reserialize = session.query(SerializedDagModel).all() From 3111c065b84d1bf81e22308f19382f5dc6d449e5 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Thu, 14 Nov 2024 13:58:24 +0100 Subject: [PATCH 3/7] Add news fragment item --- newsfragments/43949.significant.rst | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 newsfragments/43949.significant.rst diff --git 
a/newsfragments/43949.significant.rst b/newsfragments/43949.significant.rst new file mode 100644 index 0000000000000..26b2cba081a47 --- /dev/null +++ b/newsfragments/43949.significant.rst @@ -0,0 +1,5 @@ +The ``--clear-only`` option of ``airflow dags reserialize`` command is now removed. + +The ``--clear-only`` option was added to clear the serialized DAGs without reserializing them. +This option is now removed as it is no longer needed. By default, the command only reserializes dags. If you want +to clear dags serialization history, you can use the ``--clear-history`` option. From 6da8d37b5df27551bbae38f2f30001b13893e245 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Tue, 19 Nov 2024 10:28:58 +0100 Subject: [PATCH 4/7] Remove clear-history --- airflow/cli/cli_config.py | 17 +++++------------ airflow/cli/commands/dag_command.py | 6 +----- newsfragments/43949.significant.rst | 3 +-- 3 files changed, 7 insertions(+), 19 deletions(-) diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py index 5586b716be13a..b6c1dc603e075 100644 --- a/airflow/cli/cli_config.py +++ b/airflow/cli/cli_config.py @@ -967,13 +967,6 @@ def string_lower_type(val): help="The maximum number of triggers that a Triggerer will run at one time.", ) -# reserialize -ARG_CLEAR_HISTORY = Arg( - ("--clear-history",), - action="store_true", - help="If passed, DAGs history will be cleared.", -) - ARG_DAG_LIST_COLUMNS = Arg( ("--columns",), type=string_list_type, @@ -1243,15 +1236,15 @@ class GroupCommand(NamedTuple): ), ActionCommand( name="reserialize", - help="Reserialize all DAGs by parsing the DagBag files", + help="Re-version DAGs by parsing the DagBag files", description=( - "Drop all serialized dags from the metadata DB. This will cause all DAGs to be reserialized " - "from the DagBag folder. This can be helpful if your serialized DAGs get out of sync with the " - "version of Airflow that you are running." + "Manually initiate re-versioning of DAGs. 
Airflow will detect any changes in " + "your DAG's structure and re-version those that have been modified. This can be " + "particularly useful if your serialized DAGs become out of sync with the Airflow " + "version you are using." ), func=lazy_load_command("airflow.cli.commands.dag_command.dag_reserialize"), args=( - ARG_CLEAR_HISTORY, ARG_SUBDIR, ARG_VERBOSE, ), diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py index 34248e35a4900..669a075a6db2c 100644 --- a/airflow/cli/commands/dag_command.py +++ b/airflow/cli/commands/dag_command.py @@ -28,7 +28,7 @@ from typing import TYPE_CHECKING import re2 -from sqlalchemy import delete, select +from sqlalchemy import select from airflow.api.client import get_current_api_client from airflow.api_connexion.schemas.dag_schema import dag_schema @@ -36,7 +36,6 @@ from airflow.exceptions import AirflowException from airflow.jobs.job import Job from airflow.models import DagBag, DagModel, DagRun, TaskInstance -from airflow.models.dag_version import DagVersion from airflow.models.serialized_dag import SerializedDagModel from airflow.utils import cli as cli_utils, timezone from airflow.utils.cli import get_dag, process_subdir, suppress_logs_and_warning @@ -538,8 +537,5 @@ def dag_test(args, dag: DAG | None = None, session: Session = NEW_SESSION) -> No @provide_session def dag_reserialize(args, session: Session = NEW_SESSION) -> None: """Serialize a DAG instance.""" - if args.clear_history: - session.execute(delete(DagVersion).execution_options(synchronize_session=False)) - dagbag = DagBag(process_subdir(args.subdir)) dagbag.sync_to_db(session=session) diff --git a/newsfragments/43949.significant.rst b/newsfragments/43949.significant.rst index 26b2cba081a47..44e9e59b99ec6 100644 --- a/newsfragments/43949.significant.rst +++ b/newsfragments/43949.significant.rst @@ -1,5 +1,4 @@ The ``--clear-only`` option of ``airflow dags reserialize`` command is now removed. 
The ``--clear-only`` option was added to clear the serialized DAGs without reserializing them. -This option is now removed as it is no longer needed. By default, the command only reserializes dags. If you want -to clear dags serialization history, you can use the ``--clear-history`` option. +This option is now removed as it is no longer needed. The command is now only for re-versioning DAGs. From c1d6d63bd5c4eb981df3e8704e64021d477adafc Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Tue, 19 Nov 2024 12:05:35 +0100 Subject: [PATCH 5/7] fix test --- tests/cli/commands/test_dag_command.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py index 60888204db5dc..1932321b5fcb2 100644 --- a/tests/cli/commands/test_dag_command.py +++ b/tests/cli/commands/test_dag_command.py @@ -99,16 +99,6 @@ def test_reserialize(self, session): dag_version_after_command = session.query(DagVersion).all() assert len(dag_version_after_command) - # Serialize the dag with clear history - dag_command.dag_reserialize(self.parser.parse_args(["dags", "reserialize", "--clear-history"])) - - # Check serialized DAGs are back - dagv = session.query(DagVersion).all() - assert len(dagv) - serialized_dags_after_reserialize = session.query(SerializedDagModel).all() - assert len(serialized_dags_after_reserialize) >= 40 # Serialized DAGs back - assert len(dagv) == len(serialized_dags_after_reserialize) - def test_reserialize_should_support_subdir_argument(self, session): # Run clear of serialized dags session.query(DagVersion).delete() From 967c70650c856627c3c3e19192e6a367f6fab88d Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Wed, 20 Nov 2024 09:25:44 +0100 Subject: [PATCH 6/7] fixup! 
fix test --- airflow/cli/cli_config.py | 5 ++--- newsfragments/43949.significant.rst | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py index b6c1dc603e075..5611004f83dff 100644 --- a/airflow/cli/cli_config.py +++ b/airflow/cli/cli_config.py @@ -1236,10 +1236,9 @@ class GroupCommand(NamedTuple): ), ActionCommand( name="reserialize", - help="Re-version DAGs by parsing the DagBag files", + help="Reserialize DAGs by parsing the DagBag files", description=( - "Manually initiate re-versioning of DAGs. Airflow will detect any changes in " - "your DAG's structure and re-version those that have been modified. This can be " + "Reserialize DAGs in the metadata DB. This can be " "particularly useful if your serialized DAGs become out of sync with the Airflow " "version you are using." ), diff --git a/newsfragments/43949.significant.rst b/newsfragments/43949.significant.rst index 44e9e59b99ec6..6e19f22228444 100644 --- a/newsfragments/43949.significant.rst +++ b/newsfragments/43949.significant.rst @@ -1,4 +1,4 @@ The ``--clear-only`` option of ``airflow dags reserialize`` command is now removed. The ``--clear-only`` option was added to clear the serialized DAGs without reserializing them. -This option is now removed as it is no longer needed. The command is now only for re-versioning DAGs. +This option is now removed as it is no longer needed. The command is now only for reserializing DAGs. 
From 34842af7f03b7bdbd54997693d4b833d926ba98a Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Sun, 24 Nov 2024 09:02:09 +0100 Subject: [PATCH 7/7] Update newsfragments/43949.significant.rst --- newsfragments/43949.significant.rst | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/newsfragments/43949.significant.rst b/newsfragments/43949.significant.rst index 6e19f22228444..2d1cf53797e5b 100644 --- a/newsfragments/43949.significant.rst +++ b/newsfragments/43949.significant.rst @@ -1,4 +1,5 @@ The ``--clear-only`` option of ``airflow dags reserialize`` command is now removed. The ``--clear-only`` option was added to clear the serialized DAGs without reserializing them. -This option is now removed as it is no longer needed. The command is now only for reserializing DAGs. +This option has been removed as it is no longer needed. We have implemented DAG versioning and can +no longer delete serialized DAGs without going through the ``airflow db clean`` command. This command is now only for reserializing DAGs.