Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 4 additions & 12 deletions airflow/cli/cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -967,13 +967,6 @@ def string_lower_type(val):
help="The maximum number of triggers that a Triggerer will run at one time.",
)

# reserialize
ARG_CLEAR_ONLY = Arg(
("--clear-only",),
action="store_true",
help="If passed, serialized DAGs will be cleared but not reserialized.",
)

ARG_DAG_LIST_COLUMNS = Arg(
("--columns",),
type=string_list_type,
Expand Down Expand Up @@ -1243,15 +1236,14 @@ class GroupCommand(NamedTuple):
),
ActionCommand(
name="reserialize",
help="Reserialize all DAGs by parsing the DagBag files",
help="Reserialize DAGs by parsing the DagBag files",
description=(
"Drop all serialized dags from the metadata DB. This will cause all DAGs to be reserialized "
"from the DagBag folder. This can be helpful if your serialized DAGs get out of sync with the "
"version of Airflow that you are running."
"Reserialize DAGs in the metadata DB. This can be "
"particularly useful if your serialized DAGs become out of sync with the Airflow "
"version you are using."
),
func=lazy_load_command("airflow.cli.commands.dag_command.dag_reserialize"),
args=(
ARG_CLEAR_ONLY,
ARG_SUBDIR,
ARG_VERBOSE,
),
Expand Down
9 changes: 3 additions & 6 deletions airflow/cli/commands/dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from typing import TYPE_CHECKING

import re2
from sqlalchemy import delete, select
from sqlalchemy import select

from airflow.api.client import get_current_api_client
from airflow.api_connexion.schemas.dag_schema import dag_schema
Expand Down Expand Up @@ -537,8 +537,5 @@ def dag_test(args, dag: DAG | None = None, session: Session = NEW_SESSION) -> No
@provide_session
def dag_reserialize(args, session: Session = NEW_SESSION) -> None:
"""Serialize a DAG instance."""
session.execute(delete(SerializedDagModel).execution_options(synchronize_session=False))

if not args.clear_only:
dagbag = DagBag(process_subdir(args.subdir))
dagbag.sync_to_db(session=session)
dagbag = DagBag(process_subdir(args.subdir))
dagbag.sync_to_db(session=session)
3 changes: 0 additions & 3 deletions airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import attrs
from sqlalchemy import (
Table,
delete,
exc,
func,
inspect,
Expand Down Expand Up @@ -924,9 +923,7 @@ def check_and_run_migrations():

def _reserialize_dags(*, session: Session) -> None:
from airflow.models.dagbag import DagBag
from airflow.models.serialized_dag import SerializedDagModel

session.execute(delete(SerializedDagModel).execution_options(synchronize_session=False))
dagbag = DagBag(collect_dags=False)
dagbag.collect_dags(only_if_updated=False)
dagbag.sync_to_db(session=session)
Expand Down
5 changes: 5 additions & 0 deletions newsfragments/43949.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
The ``--clear-only`` option of the ``airflow dags reserialize`` command has been removed.

The ``--clear-only`` option was added to clear the serialized DAGs without reserializing them.
This option has been removed as it is no longer needed. Now that DAG versioning is implemented, serialized DAGs
can no longer be deleted except through the ``airflow db clean`` command. The ``airflow dags reserialize`` command is now only for reserializing DAGs.
43 changes: 20 additions & 23 deletions tests/cli/commands/test_dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from airflow.models import DagBag, DagModel, DagRun
from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import _run_inline_trigger
from airflow.models.dag_version import DagVersion
from airflow.models.serialized_dag import SerializedDagModel
from airflow.triggers.base import TriggerEvent
from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger
Expand Down Expand Up @@ -79,34 +80,31 @@ def teardown_class(cls) -> None:
def setup_method(self):
clear_db_runs() # clean-up all dag run before start each test

def test_reserialize(self):
def test_reserialize(self, session):
# Assert that there are serialized Dags
with create_session() as session:
serialized_dags_before_command = session.query(SerializedDagModel).all()
serialized_dags_before_command = session.query(SerializedDagModel).all()
assert len(serialized_dags_before_command) # There are serialized DAGs to delete

# Run clear of serialized dags
dag_command.dag_reserialize(self.parser.parse_args(["dags", "reserialize", "--clear-only"]))
# Assert no serialized Dags
with create_session() as session:
serialized_dags_after_clear = session.query(SerializedDagModel).all()
assert not len(serialized_dags_after_clear)

# Serialize manually
# delete all versioning
session.query(DagVersion).delete()

serialized_dags_before_command = session.query(SerializedDagModel).all()
assert not len(serialized_dags_before_command) # There are no more serialized dag
dag_version_before_command = session.query(DagVersion).all()
assert not len(dag_version_before_command)
# Serialize the dags
dag_command.dag_reserialize(self.parser.parse_args(["dags", "reserialize"]))
# Assert serialized Dags
serialized_dags_after_clear = session.query(SerializedDagModel).all()
assert len(serialized_dags_after_clear)
dag_version_after_command = session.query(DagVersion).all()
assert len(dag_version_after_command)

# Check serialized DAGs are back
with create_session() as session:
serialized_dags_after_reserialize = session.query(SerializedDagModel).all()
assert len(serialized_dags_after_reserialize) >= 40 # Serialized DAGs back

def test_reserialize_should_support_subdir_argument(self):
def test_reserialize_should_support_subdir_argument(self, session):
# Run clear of serialized dags
dag_command.dag_reserialize(self.parser.parse_args(["dags", "reserialize", "--clear-only"]))
session.query(DagVersion).delete()

# Assert no serialized Dags
with create_session() as session:
serialized_dags_after_clear = session.query(SerializedDagModel).all()
serialized_dags_after_clear = session.query(SerializedDagModel).all()
assert len(serialized_dags_after_clear) == 0

# Serialize manually
Expand All @@ -120,8 +118,7 @@ def test_reserialize_should_support_subdir_argument(self):
dag_command.dag_reserialize(self.parser.parse_args(["dags", "reserialize", "--subdir", dag_path]))

# Check serialized DAG are back
with create_session() as session:
serialized_dags_after_reserialize = session.query(SerializedDagModel).all()
serialized_dags_after_reserialize = session.query(SerializedDagModel).all()
assert len(serialized_dags_after_reserialize) == 1 # Serialized DAG back

def test_show_dag_dependencies_print(self):
Expand Down