diff --git a/airflow/models/renderedtifields.py b/airflow/models/renderedtifields.py index 269af8276abed..22bb536d8140f 100644 --- a/airflow/models/renderedtifields.py +++ b/airflow/models/renderedtifields.py @@ -22,7 +22,16 @@ from typing import TYPE_CHECKING import sqlalchemy_jsonfield -from sqlalchemy import Column, ForeignKeyConstraint, Integer, PrimaryKeyConstraint, delete, select, text +from sqlalchemy import ( + Column, + ForeignKeyConstraint, + Integer, + PrimaryKeyConstraint, + delete, + exists, + select, + text, +) from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.orm import Session, relationship @@ -33,7 +42,6 @@ from airflow.settings import json from airflow.utils.retries import retry_db_transaction from airflow.utils.session import NEW_SESSION, provide_session -from airflow.utils.sqlalchemy import tuple_not_in_condition if TYPE_CHECKING: from sqlalchemy.sql import FromClause @@ -201,11 +209,11 @@ def delete_old_records( :param num_to_keep: Number of Records to keep :param session: SqlAlchemy Session """ - from airflow.models.dagrun import DagRun - if num_to_keep <= 0: return + from airflow.models.dagrun import DagRun + tis_to_keep_query = ( select(cls.dag_id, cls.task_id, cls.run_id, DagRun.execution_date) .where(cls.dag_id == dag_id, cls.task_id == task_id) @@ -234,17 +242,19 @@ def _do_delete_old_records( session: Session, ) -> None: # This query might deadlock occasionally and it should be retried if fails (see decorator) + stmt = ( delete(cls) .where( cls.dag_id == dag_id, cls.task_id == task_id, - tuple_not_in_condition( - (cls.dag_id, cls.task_id, cls.run_id), - select(ti_clause.c.dag_id, ti_clause.c.task_id, ti_clause.c.run_id), - session=session, + ~exists(1).where( + ti_clause.c.dag_id == cls.dag_id, + ti_clause.c.task_id == cls.task_id, + ti_clause.c.run_id == cls.run_id, ), ) .execution_options(synchronize_session=False) ) + session.execute(stmt) diff --git a/airflow/utils/sqlalchemy.py b/airflow/utils/sqlalchemy.py index b4b726ca327ab..6ea2005efab71 100644 --- a/airflow/utils/sqlalchemy.py +++ b/airflow/utils/sqlalchemy.py @@ -590,7 +590,9 @@ def tuple_not_in_condition( :meta private: """ - if settings.engine.dialect.name != "mssql": + dialect = session.bind.dialect if session else settings.engine.dialect + + if dialect.name != "mssql": return tuple_(*columns).not_in(collection) if not isinstance(collection, Select): rows = collection