diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py index 780cf20b95671..5611004f83dff 100644 --- a/airflow/cli/cli_config.py +++ b/airflow/cli/cli_config.py @@ -967,13 +967,6 @@ def string_lower_type(val): help="The maximum number of triggers that a Triggerer will run at one time.", ) -# reserialize -ARG_CLEAR_ONLY = Arg( - ("--clear-only",), - action="store_true", - help="If passed, serialized DAGs will be cleared but not reserialized.", -) - ARG_DAG_LIST_COLUMNS = Arg( ("--columns",), type=string_list_type, @@ -1243,15 +1236,14 @@ class GroupCommand(NamedTuple): ), ActionCommand( name="reserialize", - help="Reserialize all DAGs by parsing the DagBag files", + help="Reserialize DAGs by parsing the DagBag files", description=( - "Drop all serialized dags from the metadata DB. This will cause all DAGs to be reserialized " - "from the DagBag folder. This can be helpful if your serialized DAGs get out of sync with the " - "version of Airflow that you are running." + "Reserialize DAGs in the metadata DB. This can be " + "particularly useful if your serialized DAGs become out of sync with the Airflow " + "version you are using." ), func=lazy_load_command("airflow.cli.commands.dag_command.dag_reserialize"), args=( - ARG_CLEAR_ONLY, ARG_SUBDIR, ARG_VERBOSE, ), diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py index 0b4d3192d6eda..669a075a6db2c 100644 --- a/airflow/cli/commands/dag_command.py +++ b/airflow/cli/commands/dag_command.py @@ -28,7 +28,7 @@ from typing import TYPE_CHECKING import re2 -from sqlalchemy import delete, select +from sqlalchemy import select from airflow.api.client import get_current_api_client from airflow.api_connexion.schemas.dag_schema import dag_schema @@ -537,8 +537,5 @@ def dag_test(args, dag: DAG | None = None, session: Session = NEW_SESSION) -> No @provide_session def dag_reserialize(args, session: Session = NEW_SESSION) -> None: """Serialize a DAG instance.""" - session.execute(delete(SerializedDagModel).execution_options(synchronize_session=False)) - - if not args.clear_only: - dagbag = DagBag(process_subdir(args.subdir)) - dagbag.sync_to_db(session=session) + dagbag = DagBag(process_subdir(args.subdir)) + dagbag.sync_to_db(session=session) diff --git a/airflow/utils/db.py b/airflow/utils/db.py index 3c657fd447fd1..8b348034f6c49 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -41,7 +41,6 @@ import attrs from sqlalchemy import ( Table, - delete, exc, func, inspect, @@ -924,9 +923,7 @@ def check_and_run_migrations(): def _reserialize_dags(*, session: Session) -> None: from airflow.models.dagbag import DagBag - from airflow.models.serialized_dag import SerializedDagModel - session.execute(delete(SerializedDagModel).execution_options(synchronize_session=False)) dagbag = DagBag(collect_dags=False) dagbag.collect_dags(only_if_updated=False) dagbag.sync_to_db(session=session) diff --git a/newsfragments/43949.significant.rst b/newsfragments/43949.significant.rst new file mode 100644 index 0000000000000..2d1cf53797e5b --- /dev/null +++ b/newsfragments/43949.significant.rst @@ -0,0 +1,5 @@ +The ``--clear-only`` option of ``airflow dags reserialize`` command is now removed. + +The ``--clear-only`` option was added to clear the serialized DAGs without reserializing them. +This option has been removed as it is no longer needed. We have implemented DAG versioning and can +no longer delete serialized dag without going through ``airflow db-clean`` command. This command is now only for reserializing DAGs. diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py index 7b9dd1e63bd18..1932321b5fcb2 100644 --- a/tests/cli/commands/test_dag_command.py +++ b/tests/cli/commands/test_dag_command.py @@ -39,6 +39,7 @@ from airflow.models import DagBag, DagModel, DagRun from airflow.models.baseoperator import BaseOperator from airflow.models.dag import _run_inline_trigger +from airflow.models.dag_version import DagVersion from airflow.models.serialized_dag import SerializedDagModel from airflow.triggers.base import TriggerEvent from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger @@ -79,34 +80,31 @@ def teardown_class(cls) -> None: def setup_method(self): clear_db_runs() # clean-up all dag run before start each test - def test_reserialize(self): + def test_reserialize(self, session): # Assert that there are serialized Dags - with create_session() as session: - serialized_dags_before_command = session.query(SerializedDagModel).all() + serialized_dags_before_command = session.query(SerializedDagModel).all() assert len(serialized_dags_before_command) # There are serialized DAGs to delete - - # Run clear of serialized dags - dag_command.dag_reserialize(self.parser.parse_args(["dags", "reserialize", "--clear-only"])) - # Assert no serialized Dags - with create_session() as session: - serialized_dags_after_clear = session.query(SerializedDagModel).all() - assert not len(serialized_dags_after_clear) - - # Serialize manually + # delete all versioning + session.query(DagVersion).delete() + + serialized_dags_before_command = session.query(SerializedDagModel).all() + assert not len(serialized_dags_before_command) # There are no more serialized dag + dag_version_before_command = session.query(DagVersion).all() + assert not len(dag_version_before_command) + # Serialize the dags dag_command.dag_reserialize(self.parser.parse_args(["dags", "reserialize"])) + # Assert serialized Dags + serialized_dags_after_clear = session.query(SerializedDagModel).all() + assert len(serialized_dags_after_clear) + dag_version_after_command = session.query(DagVersion).all() + assert len(dag_version_after_command) - # Check serialized DAGs are back - with create_session() as session: - serialized_dags_after_reserialize = session.query(SerializedDagModel).all() - assert len(serialized_dags_after_reserialize) >= 40 # Serialized DAGs back - - def test_reserialize_should_support_subdir_argument(self): + def test_reserialize_should_support_subdir_argument(self, session): # Run clear of serialized dags - dag_command.dag_reserialize(self.parser.parse_args(["dags", "reserialize", "--clear-only"])) + session.query(DagVersion).delete() # Assert no serialized Dags - with create_session() as session: - serialized_dags_after_clear = session.query(SerializedDagModel).all() + serialized_dags_after_clear = session.query(SerializedDagModel).all() assert len(serialized_dags_after_clear) == 0 # Serialize manually @@ -120,8 +118,7 @@ def test_reserialize_should_support_subdir_argument(self): dag_command.dag_reserialize(self.parser.parse_args(["dags", "reserialize", "--subdir", dag_path])) # Check serialized DAG are back - with create_session() as session: - serialized_dags_after_reserialize = session.query(SerializedDagModel).all() + serialized_dags_after_reserialize = session.query(SerializedDagModel).all() assert len(serialized_dags_after_reserialize) == 1 # Serialized DAG back def test_show_dag_dependencies_print(self):