Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions airflow-core/src/airflow/dag_processing/bundles/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

from typing import TYPE_CHECKING

from sqlalchemy import delete

from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException
from airflow.models.dag_version import DagVersion
Expand Down Expand Up @@ -136,6 +138,10 @@ def sync_bundles_to_db(self, *, session: Session = NEW_SESSION) -> None:
bundle.active = False
inactive_bundle_names.append(name)
self.log.warning("DAG bundle %s is no longer found in config and has been disabled", name)
from airflow.models.errors import ParseImportError

session.execute(delete(ParseImportError).where(ParseImportError.bundle_name == name))
self.log.info("Deleted import errors for bundle %s which is no longer configured", name)

if inactive_bundle_names and active_bundle_names:
new_bundle_name = sorted(active_bundle_names)[0]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
from airflow.exceptions import AirflowConfigException
from airflow.models.dag_version import DagVersion
from airflow.models.dagbundle import DagBundleModel
from airflow.models.errors import ParseImportError
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.utils.session import create_session

from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.db import clear_db_dag_bundles
Expand Down Expand Up @@ -141,16 +141,12 @@ def clear_db():

@pytest.mark.db_test
@conf_vars({("core", "LOAD_EXAMPLES"): "False"})
def test_sync_bundles_to_db(clear_db, dag_maker):
def test_sync_bundles_to_db(clear_db, dag_maker, session):
def _get_bundle_names_and_active():
with create_session() as session:
return (
session.query(DagBundleModel.name, DagBundleModel.active).order_by(DagBundleModel.name).all()
)
return session.query(DagBundleModel.name, DagBundleModel.active).order_by(DagBundleModel.name).all()

def _get_dag_version_bundle_names():
with create_session() as session:
return session.query(DagVersion.dag_id, DagVersion.bundle_name).all()
return session.query(DagVersion.dag_id, DagVersion.bundle_name).all()

# Initial add
with patch.dict(
Expand All @@ -164,6 +160,15 @@ def _get_dag_version_bundle_names():
with dag_maker(dag_id="test_dag", schedule=None):
EmptyOperator(task_id="mytask")

session.add(
ParseImportError(
bundle_name="my-test-bundle", # simulate import error for this bundle
filename="some_file.py",
stacktrace="some error",
)
)
session.flush()

# simulate bundle config change (now 'dags-folder' is active, 'my-test-bundle' becomes inactive)
manager = DagBundlesManager()
manager.sync_bundles_to_db()
Expand All @@ -172,6 +177,8 @@ def _get_dag_version_bundle_names():
("dags-folder", True),
("my-test-bundle", False),
]
# Since my-test-bundle is inactive, the associated import errors should be deleted
assert session.query(ParseImportError).count() == 0

# Check that the DAG version bundle_name got auto-updated to active one
assert _get_dag_version_bundle_names() == [("test_dag", "dags-folder")]
Expand Down