Commit d5e10f9

msumit, Sumit Maheshwari, and potiuk authored and committed
[v3-1-test] Fix dag-processor crashing due to MySql deadlock errors (#60166)
MySQL may throw deadlock errors when multiple DAG-Processor instances are running. The cause is a fetch sub-query used within a delete query, which sometimes deadlocks in MySQL.

(cherry picked from commit dfcd049)
Co-authored-by: Sumit Maheshwari <msumit@users.noreply.github.com>
Co-authored-by: Sumit Maheshwari <sumitm@uber.com>
Co-authored-by: Jarek Potiuk <jarek@potiuk.com>
1 parent d2dfb94 commit d5e10f9

1 file changed: +11 -6 lines changed

airflow-core/src/airflow/models/asset.py

Lines changed: 11 additions & 6 deletions
@@ -108,12 +108,17 @@ def remove_references_to_deleted_dags(session: Session):
         DagScheduleAssetAliasReference,
         TaskOutletAssetReference,
     ]
-    for model in models_to_check:
-        session.execute(
-            delete(model)
-            .where(model.dag_id.in_(select(DagModel.dag_id).where(DagModel.is_stale)))
-            .execution_options(synchronize_session="fetch")
-        )
+
+    # The queries need to be done in separate steps, because in the case of multiple
+    # dag processors on MySQL, there could be a deadlock caused by acquiring both an
+    # exclusive lock for deletion and shared lock for query in reverse sequence
+    if stale_dag_ids := session.scalars(select(DagModel.dag_id).where(DagModel.is_stale)).all():
+        for model in models_to_check:
+            session.execute(
+                delete(model)
+                .where(model.dag_id.in_(stale_dag_ids))
+                .execution_options(synchronize_session="fetch")
+            )
 
 
 alias_association_table = Table(
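
The two-step pattern in this change can be sketched outside Airflow. The example below is a minimal illustration only, not the project's code: it uses the standard-library `sqlite3` module instead of SQLAlchemy and MySQL, and the table names (`dag`, `task_outlet_ref`, `dag_schedule_ref`) and the helper `remove_stale_references` are hypothetical. SQLite will not reproduce the MySQL deadlock; the sketch only shows the query shape, where the stale IDs are read first and the deletes then use a literal ID list rather than a sub-query.

```python
import sqlite3


def remove_stale_references(conn, reference_tables):
    """Delete reference rows for stale DAGs in two separate steps.

    Hypothetical helper for illustration; table names are assumed to be
    trusted constants, since they are interpolated into the SQL text.
    """
    # Step 1: a plain read, materialized into a Python list.
    rows = conn.execute("SELECT dag_id FROM dag WHERE is_stale = 1")
    stale_dag_ids = [row[0] for row in rows]
    if not stale_dag_ids:
        return 0  # nothing is stale, so no DELETE is issued at all

    # Step 2: each DELETE filters on bound ID values, not on a sub-query
    # against the dag table.
    placeholders = ", ".join("?" for _ in stale_dag_ids)
    deleted = 0
    for table in reference_tables:
        cursor = conn.execute(
            f"DELETE FROM {table} WHERE dag_id IN ({placeholders})",
            stale_dag_ids,
        )
        deleted += cursor.rowcount
    return deleted


# Small in-memory fixture (hypothetical schema and data).
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dag (dag_id TEXT PRIMARY KEY, is_stale INTEGER NOT NULL);
    CREATE TABLE task_outlet_ref (dag_id TEXT, asset_id INTEGER);
    CREATE TABLE dag_schedule_ref (dag_id TEXT, asset_id INTEGER);
    INSERT INTO dag VALUES ('live_dag', 0), ('stale_dag', 1);
    INSERT INTO task_outlet_ref VALUES ('live_dag', 1), ('stale_dag', 1), ('stale_dag', 2);
    INSERT INTO dag_schedule_ref VALUES ('live_dag', 1), ('stale_dag', 3);
    """
)

deleted = remove_stale_references(conn, ["task_outlet_ref", "dag_schedule_ref"])
remaining = conn.execute("SELECT dag_id FROM task_outlet_ref").fetchall()
```

One trade-off of this shape is that the ID list is a snapshot: a DAG that becomes stale between the read and the deletes is picked up on the next run rather than the current one.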
