From a040587c448a05f406dfb46b5aa94327c259f4be Mon Sep 17 00:00:00 2001 From: Andrey Anshin Date: Sat, 19 Aug 2023 17:54:48 +0400 Subject: [PATCH 1/2] Use `NOT EXISTS` subquery instead of `tuple_not_in_condition` --- airflow/models/renderedtifields.py | 26 ++++++++++++++++++-------- airflow/utils/sqlalchemy.py | 14 +++++++++++++- 2 files changed, 31 insertions(+), 9 deletions(-) diff --git a/airflow/models/renderedtifields.py b/airflow/models/renderedtifields.py index 269af8276abed..22bb536d8140f 100644 --- a/airflow/models/renderedtifields.py +++ b/airflow/models/renderedtifields.py @@ -22,7 +22,16 @@ from typing import TYPE_CHECKING import sqlalchemy_jsonfield -from sqlalchemy import Column, ForeignKeyConstraint, Integer, PrimaryKeyConstraint, delete, select, text +from sqlalchemy import ( + Column, + ForeignKeyConstraint, + Integer, + PrimaryKeyConstraint, + delete, + exists, + select, + text, +) from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.orm import Session, relationship @@ -33,7 +42,6 @@ from airflow.settings import json from airflow.utils.retries import retry_db_transaction from airflow.utils.session import NEW_SESSION, provide_session -from airflow.utils.sqlalchemy import tuple_not_in_condition if TYPE_CHECKING: from sqlalchemy.sql import FromClause @@ -201,11 +209,11 @@ def delete_old_records( :param num_to_keep: Number of Records to keep :param session: SqlAlchemy Session """ - from airflow.models.dagrun import DagRun - if num_to_keep <= 0: return + from airflow.models.dagrun import DagRun + tis_to_keep_query = ( select(cls.dag_id, cls.task_id, cls.run_id, DagRun.execution_date) .where(cls.dag_id == dag_id, cls.task_id == task_id) @@ -234,17 +242,19 @@ def _do_delete_old_records( session: Session, ) -> None: # This query might deadlock occasionally and it should be retried if fails (see decorator) + stmt = ( delete(cls) .where( cls.dag_id == dag_id, cls.task_id == task_id, - tuple_not_in_condition( - (cls.dag_id, 
cls.task_id, cls.run_id), - select(ti_clause.c.dag_id, ti_clause.c.task_id, ti_clause.c.run_id), - session=session, + ~exists(1).where( + ti_clause.c.dag_id == cls.dag_id, + ti_clause.c.task_id == cls.task_id, + ti_clause.c.run_id == cls.run_id, ), ) .execution_options(synchronize_session=False) ) + session.execute(stmt) diff --git a/airflow/utils/sqlalchemy.py b/airflow/utils/sqlalchemy.py index b4b726ca327ab..c21d2a3ee5fac 100644 --- a/airflow/utils/sqlalchemy.py +++ b/airflow/utils/sqlalchemy.py @@ -22,6 +22,7 @@ import datetime import json import logging +import warnings from typing import TYPE_CHECKING, Any, Generator, Iterable, overload import pendulum @@ -34,6 +35,7 @@ from airflow import settings from airflow.configuration import conf +from airflow.exceptions import RemovedInAirflow3Warning from airflow.serialization.enums import Encoding from airflow.utils.timezone import make_naive @@ -590,7 +592,17 @@ def tuple_not_in_condition( :meta private: """ - if settings.engine.dialect.name != "mssql": + warnings.warn( + f"`{__name__}.tuple_not_in_condition` is deprecated. 
" + "Please consider to use `sqlalchemy.sql.expression.exists` instead, see: " + "https://docs.sqlalchemy.org/en/latest/core/selectable.html#sqlalchemy.sql.expression.exists", + RemovedInAirflow3Warning, + stacklevel=2, + ) + + dialect = session.bind.dialect if session else settings.engine.dialect + + if dialect.name != "mssql": return tuple_(*columns).not_in(collection) if not isinstance(collection, Select): rows = collection From f403d43e52a9c6ebd5343083ea6e10af5e9f9027 Mon Sep 17 00:00:00 2001 From: Andrey Anshin Date: Sun, 20 Aug 2023 12:14:39 +0400 Subject: [PATCH 2/2] Remove deprecation from `tuple_not_in_condition` --- airflow/utils/sqlalchemy.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/airflow/utils/sqlalchemy.py b/airflow/utils/sqlalchemy.py index c21d2a3ee5fac..6ea2005efab71 100644 --- a/airflow/utils/sqlalchemy.py +++ b/airflow/utils/sqlalchemy.py @@ -22,7 +22,6 @@ import datetime import json import logging -import warnings from typing import TYPE_CHECKING, Any, Generator, Iterable, overload import pendulum @@ -35,7 +34,6 @@ from airflow import settings from airflow.configuration import conf -from airflow.exceptions import RemovedInAirflow3Warning from airflow.serialization.enums import Encoding from airflow.utils.timezone import make_naive @@ -592,14 +590,6 @@ def tuple_not_in_condition( :meta private: """ - warnings.warn( - f"`{__name__}.tuple_not_in_condition` is deprecated. " - "Please consider to use `sqlalchemy.sql.expression.exists` instead, see: " - "https://docs.sqlalchemy.org/en/latest/core/selectable.html#sqlalchemy.sql.expression.exists", - RemovedInAirflow3Warning, - stacklevel=2, - ) - dialect = session.bind.dialect if session else settings.engine.dialect if dialect.name != "mssql":