From 74e28b876f4f920531565371ea24f6f9207d705c Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Tue, 17 Jun 2025 09:35:14 +0100 Subject: [PATCH] Delete import error when a dag bundle becomes inactive (#51800) * Delete import error when a dag bundle becomes inactive Currently, when a bundle is marked as inactive, its associated import errors continue to appear. To resolve this, we delete the import errors at the moment the bundle is deactivated. * Update airflow-core/src/airflow/dag_processing/bundles/manager.py Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> --------- Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> (cherry picked from commit 073fe2ea1977770d87ad72db01d5c61a007488e6) --- .../airflow/dag_processing/bundles/manager.py | 6 +++++ .../bundles/test_dag_bundle_manager.py | 23 ++++++++++++------- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/airflow-core/src/airflow/dag_processing/bundles/manager.py b/airflow-core/src/airflow/dag_processing/bundles/manager.py index 9760da617a1d9..b1ae38a4f6449 100644 --- a/airflow-core/src/airflow/dag_processing/bundles/manager.py +++ b/airflow-core/src/airflow/dag_processing/bundles/manager.py @@ -18,6 +18,8 @@ from typing import TYPE_CHECKING +from sqlalchemy import delete + from airflow.configuration import conf from airflow.exceptions import AirflowConfigException from airflow.models.dag_version import DagVersion @@ -136,6 +138,10 @@ def sync_bundles_to_db(self, *, session: Session = NEW_SESSION) -> None: bundle.active = False inactive_bundle_names.append(name) self.log.warning("DAG bundle %s is no longer found in config and has been disabled", name) + from airflow.models.errors import ParseImportError + + session.execute(delete(ParseImportError).where(ParseImportError.bundle_name == name)) + self.log.info("Deleted import errors for bundle %s which is no longer configured", name) if inactive_bundle_names and active_bundle_names: new_bundle_name = sorted(active_bundle_names)[0] diff --git a/airflow-core/tests/unit/dag_processing/bundles/test_dag_bundle_manager.py b/airflow-core/tests/unit/dag_processing/bundles/test_dag_bundle_manager.py index 9f20ed49f8dda..80b37bdea1fff 100644 --- a/airflow-core/tests/unit/dag_processing/bundles/test_dag_bundle_manager.py +++ b/airflow-core/tests/unit/dag_processing/bundles/test_dag_bundle_manager.py @@ -29,8 +29,8 @@ from airflow.exceptions import AirflowConfigException from airflow.models.dag_version import DagVersion from airflow.models.dagbundle import DagBundleModel +from airflow.models.errors import ParseImportError from airflow.providers.standard.operators.empty import EmptyOperator -from airflow.utils.session import create_session from tests_common.test_utils.config import conf_vars from tests_common.test_utils.db import clear_db_dag_bundles @@ -141,16 +141,12 @@ def clear_db(): @pytest.mark.db_test @conf_vars({("core", "LOAD_EXAMPLES"): "False"}) -def test_sync_bundles_to_db(clear_db, dag_maker): +def test_sync_bundles_to_db(clear_db, dag_maker, session): def _get_bundle_names_and_active(): - with create_session() as session: - return ( - session.query(DagBundleModel.name, DagBundleModel.active).order_by(DagBundleModel.name).all() - ) + return session.query(DagBundleModel.name, DagBundleModel.active).order_by(DagBundleModel.name).all() def _get_dag_version_bundle_names(): - with create_session() as session: - return session.query(DagVersion.dag_id, DagVersion.bundle_name).all() + return session.query(DagVersion.dag_id, DagVersion.bundle_name).all() # Initial add with patch.dict( @@ -164,6 +160,15 @@ def _get_dag_version_bundle_names(): with dag_maker(dag_id="test_dag", schedule=None): EmptyOperator(task_id="mytask") + session.add( + ParseImportError( + bundle_name="my-test-bundle", # simulate import error for this bundle + filename="some_file.py", + stacktrace="some error", + ) + ) + session.flush() + # simulate bundle config change (now 'dags-folder' is active, 'my-test-bundle' becomes inactive) manager = DagBundlesManager() manager.sync_bundles_to_db() @@ -172,6 +177,8 @@ def _get_dag_version_bundle_names(): ("dags-folder", True), ("my-test-bundle", False), ] + # Since my-test-bundle is inactive, the associated import errors should be deleted + assert session.query(ParseImportError).count() == 0 # Check that the DAG version bundle_name got auto-updated to active one assert _get_dag_version_bundle_names() == [("test_dag", "dags-folder")]