Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions airflow/models/renderedtifields.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,16 @@
from typing import TYPE_CHECKING

import sqlalchemy_jsonfield
from sqlalchemy import Column, ForeignKeyConstraint, Integer, PrimaryKeyConstraint, delete, select, text
from sqlalchemy import (
Column,
ForeignKeyConstraint,
Integer,
PrimaryKeyConstraint,
delete,
exists,
select,
text,
)
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.orm import Session, relationship

Expand All @@ -33,7 +42,6 @@
from airflow.settings import json
from airflow.utils.retries import retry_db_transaction
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import tuple_not_in_condition

if TYPE_CHECKING:
from sqlalchemy.sql import FromClause
Expand Down Expand Up @@ -201,11 +209,11 @@ def delete_old_records(
:param num_to_keep: Number of Records to keep
:param session: SqlAlchemy Session
"""
from airflow.models.dagrun import DagRun

if num_to_keep <= 0:
return

from airflow.models.dagrun import DagRun
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don’t think this actually matters, at this point it’s likely DagRun is imported anyway and the import would be basically free. But either way works.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Side comment. I've learned recently to use more and more local imports. I think basically that when not really needed, the advice of "use top level imports by default" is less and less relevant for "heavy" imports.

The lazy import PEP is years away and it does not seem that it will be enabled by default - you will have to specify an option when you launch interpreter, and I think it's simply inevitable that people will use more and more local imports and the tooling (static checks IDE support etc.) will start supporting it better.

That's my bet actually that rather than switching to lazy impors we will switch to "local imports for heavy things by default". You can see it all over Airflow's code already.

The only reason why you would like to use top-level imports for "heavy" packages is when you use the same import in multiple methods in the same module - but this also can be solved by simply local importing that partiular module.

I bet some tooling for "detect heavy top-level imports" will soon show up in one of the static check tooling.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(and of course the problem is with type annotations and TYPE_CHECKING - would also be nice to solve that one as the TYPE_CHECKING is really a hack as opposed to local imports)


tis_to_keep_query = (
select(cls.dag_id, cls.task_id, cls.run_id, DagRun.execution_date)
.where(cls.dag_id == dag_id, cls.task_id == task_id)
Expand Down Expand Up @@ -234,17 +242,19 @@ def _do_delete_old_records(
session: Session,
) -> None:
# This query might deadlock occasionally and it should be retried if fails (see decorator)

stmt = (
delete(cls)
.where(
cls.dag_id == dag_id,
cls.task_id == task_id,
tuple_not_in_condition(
(cls.dag_id, cls.task_id, cls.run_id),
select(ti_clause.c.dag_id, ti_clause.c.task_id, ti_clause.c.run_id),
session=session,
~exists(1).where(
ti_clause.c.dag_id == cls.dag_id,
ti_clause.c.task_id == cls.task_id,
ti_clause.c.run_id == cls.run_id,
),
)
.execution_options(synchronize_session=False)
)

session.execute(stmt)
4 changes: 3 additions & 1 deletion airflow/utils/sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,9 @@ def tuple_not_in_condition(

:meta private:
"""
if settings.engine.dialect.name != "mssql":
dialect = session.bind.dialect if session else settings.engine.dialect

if dialect.name != "mssql":
return tuple_(*columns).not_in(collection)
if not isinstance(collection, Select):
rows = collection
Expand Down