Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 27 additions & 5 deletions airflow-core/src/airflow/cli/commands/dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,12 @@ def dag_list_dags(args, session: Session = NEW_SESSION) -> None:
file=sys.stderr,
)

# Reserialize DAGs by parsing the DagBag files
if args.bundle_name:
_reserialize_dags(args.bundle_name, session=session)
else:
_reserialize_dags(session=session)

dagbag = DagBag(read_dags_from_db=True)
dagbag.collect_dags_from_db()

Expand Down Expand Up @@ -479,6 +485,12 @@ def dag_list_import_errors(args, session: Session = NEW_SESSION) -> None:
"""Display dags with import errors on the command line."""
data = []

# Reserialize DAGs by parsing the DagBag files
if args.bundle_name:
_reserialize_dags(args.bundle_name, session=session)
else:
_reserialize_dags(session=session)

# Get import errors from the DB
query = select(ParseImportError)
if args.bundle_name:
Expand Down Expand Up @@ -687,19 +699,18 @@ def dag_test(args, dag: DAG | None = None, session: Session = NEW_SESSION) -> No
raise SystemExit("DagRun failed")


@cli_utils.action_cli
@providers_configuration_loaded
@provide_session
def dag_reserialize(args, session: Session = NEW_SESSION) -> None:
def _reserialize_dags(bundle_name: str | None = None, session: Session = NEW_SESSION) -> None:
"""Serialize a DAG instance."""
manager = DagBundlesManager()
manager.sync_bundles_to_db(session=session)
session.commit()

all_bundles = list(manager.get_all_dag_bundles())
if args.bundle_name:
validate_dag_bundle_arg(args.bundle_name)
bundles_to_reserialize = set(args.bundle_name)
if bundle_name:
validate_dag_bundle_arg(bundle_name)
bundles_to_reserialize = set(bundle_name)
else:
bundles_to_reserialize = {b.name for b in all_bundles}

Expand All @@ -709,3 +720,14 @@ def dag_reserialize(args, session: Session = NEW_SESSION) -> None:
bundle.initialize()
dag_bag = DagBag(bundle.path, bundle_path=bundle.path, include_examples=False)
dag_bag.sync_to_db(bundle.name, bundle_version=bundle.get_current_version(), session=session)


@cli_utils.action_cli
@providers_configuration_loaded
@provide_session
def dag_reserialize(args, session: Session = NEW_SESSION) -> None:
"""Serialize a DAG instance."""
if args.bundle_name:
_reserialize_dags(args.bundle_name, session=session)
else:
_reserialize_dags(session=session)
Loading