diff --git a/airflow-core/src/airflow/cli/commands/dag_command.py b/airflow-core/src/airflow/cli/commands/dag_command.py index e70127358f5e5..ff956f2a476d2 100644 --- a/airflow-core/src/airflow/cli/commands/dag_command.py +++ b/airflow-core/src/airflow/cli/commands/dag_command.py @@ -404,6 +404,12 @@ def dag_list_dags(args, session: Session = NEW_SESSION) -> None: file=sys.stderr, ) + # Reserialize DAGs by parsing the DagBag files + if args.bundle_name: + _reserialize_dags(args.bundle_name, session=session) + else: + _reserialize_dags(session=session) + dagbag = DagBag(read_dags_from_db=True) dagbag.collect_dags_from_db() @@ -479,6 +485,12 @@ def dag_list_import_errors(args, session: Session = NEW_SESSION) -> None: """Display dags with import errors on the command line.""" data = [] + # Reserialize DAGs by parsing the DagBag files + if args.bundle_name: + _reserialize_dags(args.bundle_name, session=session) + else: + _reserialize_dags(session=session) + # Get import errors from the DB query = select(ParseImportError) if args.bundle_name: @@ -687,19 +699,18 @@ def dag_test(args, dag: DAG | None = None, session: Session = NEW_SESSION) -> No raise SystemExit("DagRun failed") -@cli_utils.action_cli @providers_configuration_loaded @provide_session -def dag_reserialize(args, session: Session = NEW_SESSION) -> None: +def _reserialize_dags(bundle_name: str | None = None, session: Session = NEW_SESSION) -> None: """Serialize a DAG instance.""" manager = DagBundlesManager() manager.sync_bundles_to_db(session=session) session.commit() all_bundles = list(manager.get_all_dag_bundles()) - if args.bundle_name: - validate_dag_bundle_arg(args.bundle_name) - bundles_to_reserialize = set(args.bundle_name) + if bundle_name: + validate_dag_bundle_arg(bundle_name) + bundles_to_reserialize = set(bundle_name) else: bundles_to_reserialize = {b.name for b in all_bundles} @@ -709,3 +720,14 @@ def dag_reserialize(args, session: Session = NEW_SESSION) -> None: bundle.initialize() dag_bag = DagBag(bundle.path, bundle_path=bundle.path, include_examples=False) dag_bag.sync_to_db(bundle.name, bundle_version=bundle.get_current_version(), session=session) + + +@cli_utils.action_cli +@providers_configuration_loaded +@provide_session +def dag_reserialize(args, session: Session = NEW_SESSION) -> None: + """Serialize a DAG instance.""" + if args.bundle_name: + _reserialize_dags(args.bundle_name, session=session) + else: + _reserialize_dags(session=session)