From f916e02427cba9cf48ed5f5fee2ea2f652544599 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Mon, 12 Aug 2024 15:14:21 +0800 Subject: [PATCH 1/3] fix(datasets/manager): fix DagPriorityParsingRequest unique constraint error when dataset aliases are resolved into new datasets this happens when dynamic task mapping is used --- airflow/datasets/manager.py | 37 +++++++++++++++++++++++++++++++++---- 1 file changed, 33 insertions(+), 4 deletions(-) diff --git a/airflow/datasets/manager.py b/airflow/datasets/manager.py index 29f95ef4c742a..4d46ced1d80e4 100644 --- a/airflow/datasets/manager.py +++ b/airflow/datasets/manager.py @@ -140,10 +140,8 @@ def register_dataset_change( dags_to_reparse = dags_to_queue_from_dataset_alias - dags_to_queue_from_dataset if dags_to_reparse: - session.add_all( - DagPriorityParsingRequest(fileloc=fileloc) - for fileloc in {dag.fileloc for dag in dags_to_reparse} - ) + file_locs = {dag.fileloc for dag in dags_to_reparse} + cls._send_dag_priority_parsing_request(file_locs, session) session.flush() cls.notify_dataset_changed(dataset=dataset) @@ -208,6 +206,37 @@ def _postgres_queue_dagruns(cls, dataset_id: int, dags_to_queue: set[DagModel], stmt = insert(DatasetDagRunQueue).values(dataset_id=dataset_id).on_conflict_do_nothing() session.execute(stmt, values) + @classmethod + def _send_dag_priority_parsing_request(cls, file_locs: Iterable[str], session: Session) -> None: + if session.bind.dialect.name == "postgresql": + return cls._postgres_send_dag_priority_parsing_request(file_locs, session) + return cls._slow_path_send_dag_priority_parsing_request(file_locs, session) + + @classmethod + def _slow_path_send_dag_priority_parsing_request(cls, file_locs: Iterable[str], session: Session) -> None: + def _send_dag_priority_parsing_request_if_needed(fileloc: str) -> str | None: + # Don't error whole transaction when a single DagPriorityParsingRequest item conflicts. + # https://docs.sqlalchemy.org/en/14/orm/session_transaction.html#using-savepoint + req = DagPriorityParsingRequest(fileloc=fileloc) + try: + with session.begin_nested(): + session.merge(req) + except exc.IntegrityError: + cls.logger().debug("Skipping request %s", req, exc_info=True) + return None + return req.fileloc + + results = (_send_dag_priority_parsing_request_if_needed(fileloc) for fileloc in file_locs) + if filelocs_to_parse := [result for result in results if result is not None]: + cls.logger().debug("parse DAGs in %s", filelocs_to_parse) + + @classmethod + def _postgres_send_dag_priority_parsing_request(cls, file_locs: Iterable[str], session: Session) -> None: + from sqlalchemy.dialects.postgresql import insert + + stmt = insert(DagPriorityParsingRequest).on_conflict_do_nothing() + session.execute(stmt, {"fileloc": fileloc for fileloc in file_locs}) + def resolve_dataset_manager() -> DatasetManager: """Retrieve the dataset manager.""" From 9a587ccab499598c9498a58ad43b64016bbb6884 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Mon, 12 Aug 2024 16:18:11 +0800 Subject: [PATCH 2/3] refactor(dataset/manager): reword debug log Co-authored-by: Ephraim Anierobi --- airflow/datasets/manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/datasets/manager.py b/airflow/datasets/manager.py index 4d46ced1d80e4..efcf1639c4f97 100644 --- a/airflow/datasets/manager.py +++ b/airflow/datasets/manager.py @@ -222,7 +222,7 @@ def _send_dag_priority_parsing_request_if_needed(fileloc: str) -> str | None: with session.begin_nested(): session.merge(req) except exc.IntegrityError: - cls.logger().debug("Skipping request %s", req, exc_info=True) + cls.logger().debug("Skipping request %s, already present", req, exc_info=True) return None return req.fileloc From 82d86f067dbd7a5b349d3004a546f49d5db034ba Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Mon, 12 Aug 2024 16:18:41 +0800 Subject: [PATCH 3/3] refactor(dataset/manager): remove unnecessary logging Co-authored-by: Ephraim Anierobi --- airflow/datasets/manager.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/airflow/datasets/manager.py b/airflow/datasets/manager.py index efcf1639c4f97..058eef6ab8922 100644 --- a/airflow/datasets/manager.py +++ b/airflow/datasets/manager.py @@ -226,9 +226,7 @@ def _send_dag_priority_parsing_request_if_needed(fileloc: str) -> str | None: return None return req.fileloc - results = (_send_dag_priority_parsing_request_if_needed(fileloc) for fileloc in file_locs) - if filelocs_to_parse := [result for result in results if result is not None]: - cls.logger().debug("parse DAGs in %s", filelocs_to_parse) + (_send_dag_priority_parsing_request_if_needed(fileloc) for fileloc in file_locs) @classmethod def _postgres_send_dag_priority_parsing_request(cls, file_locs: Iterable[str], session: Session) -> None: