From 40dbdce21ff0dd4323bff15c9d4ca54b8d1787f1 Mon Sep 17 00:00:00 2001 From: Dheeraj Turaga Date: Tue, 15 Apr 2025 22:54:57 -0500 Subject: [PATCH 1/2] Reserialize before dags list/list-import-errors --- airflow-core/src/airflow/cli/commands/dag_command.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/airflow-core/src/airflow/cli/commands/dag_command.py b/airflow-core/src/airflow/cli/commands/dag_command.py index ee75ef34aa170..af393de0f3b53 100644 --- a/airflow-core/src/airflow/cli/commands/dag_command.py +++ b/airflow-core/src/airflow/cli/commands/dag_command.py @@ -404,6 +404,9 @@ def dag_list_dags(args, session: Session = NEW_SESSION) -> None: file=sys.stderr, ) + # Reserialize DAGs by parsing the DagBag files + dag_reserialize(args, session) + dagbag = DagBag(read_dags_from_db=True) dagbag.collect_dags_from_db() @@ -479,6 +482,9 @@ def dag_list_import_errors(args, session: Session = NEW_SESSION) -> None: """Display dags with import errors on the command line.""" data = [] + # Reserialize DAGs by parsing the DagBag files + dag_reserialize(args, session) + # Get import errors from the DB query = select(ParseImportError) if args.bundle_name: From 0edbca2b498d3e863838893d8608a3aa6bd8b897 Mon Sep 17 00:00:00 2001 From: Dheeraj Turaga Date: Wed, 16 Apr 2025 12:31:51 -0500 Subject: [PATCH 2/2] common method to reserialise the dags --- .../src/airflow/cli/commands/dag_command.py | 30 ++++++++++++++----- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/airflow-core/src/airflow/cli/commands/dag_command.py b/airflow-core/src/airflow/cli/commands/dag_command.py index af393de0f3b53..6c471f8724022 100644 --- a/airflow-core/src/airflow/cli/commands/dag_command.py +++ b/airflow-core/src/airflow/cli/commands/dag_command.py @@ -405,7 +405,10 @@ def dag_list_dags(args, session: Session = NEW_SESSION) -> None: ) # Reserialize DAGs by parsing the DagBag files - dag_reserialize(args, session) + if args.bundle_name: + _reserialize_dags(args.bundle_name, session=session) + else: + _reserialize_dags(session=session) dagbag = DagBag(read_dags_from_db=True) dagbag.collect_dags_from_db() @@ -483,7 +486,10 @@ def dag_list_import_errors(args, session: Session = NEW_SESSION) -> None: data = [] # Reserialize DAGs by parsing the DagBag files - dag_reserialize(args, session) + if args.bundle_name: + _reserialize_dags(args.bundle_name, session=session) + else: + _reserialize_dags(session=session) # Get import errors from the DB query = select(ParseImportError) @@ -693,19 +699,18 @@ def dag_test(args, dag: DAG | None = None, session: Session = NEW_SESSION) -> No raise SystemExit("DagRun failed") -@cli_utils.action_cli @providers_configuration_loaded @provide_session -def dag_reserialize(args, session: Session = NEW_SESSION) -> None: +def _reserialize_dags(bundle_name: str | None = None, session: Session = NEW_SESSION) -> None: """Serialize a DAG instance.""" manager = DagBundlesManager() manager.sync_bundles_to_db(session=session) session.commit() all_bundles = list(manager.get_all_dag_bundles()) - if args.bundle_name: - validate_dag_bundle_arg(args.bundle_name) - bundles_to_reserialize = set(args.bundle_name) + if bundle_name: + validate_dag_bundle_arg(bundle_name) + bundles_to_reserialize = set(bundle_name) else: bundles_to_reserialize = {b.name for b in all_bundles} @@ -715,3 +720,14 @@ def dag_reserialize(args, session: Session = NEW_SESSION) -> None: bundle.initialize() dag_bag = DagBag(bundle.path, bundle_path=bundle.path, include_examples=False) dag_bag.sync_to_db(bundle.name, bundle_version=bundle.get_current_version(), session=session) + + +@cli_utils.action_cli +@providers_configuration_loaded +@provide_session +def dag_reserialize(args, session: Session = NEW_SESSION) -> None: + """Serialize a DAG instance.""" + if args.bundle_name: + _reserialize_dags(args.bundle_name, session=session) + else: + _reserialize_dags(session=session)