From 7d15065cac5f96775714bfafa7059a0d581b96df Mon Sep 17 00:00:00 2001 From: vincbeck Date: Wed, 26 Nov 2025 10:31:58 -0500 Subject: [PATCH] Fix mypy errors in models --- .../api_fastapi/execution_api/datamodels/taskinstance.py | 2 +- airflow-core/src/airflow/models/connection.py | 3 ++- airflow-core/src/airflow/models/dag.py | 7 ++++--- airflow-core/src/airflow/models/serialized_dag.py | 2 +- airflow-core/src/airflow/models/taskinstance.py | 4 +--- task-sdk/src/airflow/sdk/api/datamodels/_generated.py | 2 +- 6 files changed, 10 insertions(+), 10 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py index e576ac13740d3..56656bc722218 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py @@ -140,7 +140,7 @@ class TIDeferredStatePayload(StrictBaseModel): trigger_timeout: timedelta | None = None next_method: str """The name of the method on the operator to call in the worker after the trigger has fired.""" - next_kwargs: Annotated[dict[str, Any] | str, Field(default_factory=dict)] + next_kwargs: Annotated[dict[str, Any], Field(default_factory=dict)] """ Kwargs to pass to the above method, either a plain dict or an encrypted string. diff --git a/airflow-core/src/airflow/models/connection.py b/airflow-core/src/airflow/models/connection.py index f64c38b5efe43..5c57f53922e0c 100644 --- a/airflow-core/src/airflow/models/connection.py +++ b/airflow-core/src/airflow/models/connection.py @@ -363,8 +363,9 @@ def password(cls): """Password. The value is decrypted/encrypted when reading/setting the value.""" return synonym("_password", descriptor=property(cls.get_password, cls.set_password)) - def get_extra(self) -> str: + def get_extra(self) -> str | None: """Return encrypted extra-data.""" + extra_val: str | None if self._extra and self.is_extra_encrypted: fernet = get_fernet() if not fernet.is_encrypted: diff --git a/airflow-core/src/airflow/models/dag.py b/airflow-core/src/airflow/models/dag.py index e43be246c155f..16ce63aab9eed 100644 --- a/airflow-core/src/airflow/models/dag.py +++ b/airflow-core/src/airflow/models/dag.py @@ -141,6 +141,8 @@ def get_run_data_interval(timetable: Timetable, run: DagRun) -> DataInterval: # Compatibility: runs created before AIP-39 implementation don't have an # explicit data interval. Try to infer from the logical date. + if TYPE_CHECKING: + assert run.logical_date is not None return infer_automated_data_interval(timetable, run.logical_date) @@ -521,14 +523,13 @@ def get_paused_dag_ids(dag_ids: list[str], session: Session = NEW_SESSION) -> se :param session: ORM Session :return: Paused Dag_ids """ - paused_dag_ids = session.execute( + paused_dag_ids = session.scalars( select(DagModel.dag_id) .where(DagModel.is_paused == expression.true()) .where(DagModel.dag_id.in_(dag_ids)) ) - paused_dag_ids = {paused_dag_id for (paused_dag_id,) in paused_dag_ids} - return paused_dag_ids + return set(paused_dag_ids) @property def safe_dag_id(self): diff --git a/airflow-core/src/airflow/models/serialized_dag.py b/airflow-core/src/airflow/models/serialized_dag.py index 191d36c673723..a5cf8a21a07cd 100644 --- a/airflow-core/src/airflow/models/serialized_dag.py +++ b/airflow-core/src/airflow/models/serialized_dag.py @@ -335,7 +335,7 @@ def __init__(self, dag: LazyDeserializedDAG) -> None: # serve as cache so no need to decompress and load, when accessing data field # when COMPRESS_SERIALIZED_DAGS is True - self.__data_cache = dag_data + self.__data_cache: dict[Any, Any] | None = dag_data def __repr__(self) -> str: return f"" diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index 5e873096712d5..6d11d1472af66 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -438,9 +438,7 @@ class TaskInstance(Base, LoggingMixin): # The method to call next, and any extra arguments to pass to it. # Usually used when resuming from DEFERRED. next_method: Mapped[str | None] = mapped_column(String(1000), nullable=True) - next_kwargs: Mapped[dict | str | None] = mapped_column( - MutableDict.as_mutable(ExtendedJSON), nullable=True - ) + next_kwargs: Mapped[dict | None] = mapped_column(MutableDict.as_mutable(ExtendedJSON), nullable=True) _task_display_property_value: Mapped[str | None] = mapped_column( "task_display_name", String(2000), nullable=True diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py index 3b26341612759..4b38ff8bb50f7 100644 --- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py +++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py @@ -184,7 +184,7 @@ class TIDeferredStatePayload(BaseModel): trigger_kwargs: Annotated[dict[str, Any] | str | None, Field(title="Trigger Kwargs")] = None trigger_timeout: Annotated[timedelta | None, Field(title="Trigger Timeout")] = None next_method: Annotated[str, Field(title="Next Method")] - next_kwargs: Annotated[dict[str, Any] | str | None, Field(title="Next Kwargs")] = None + next_kwargs: Annotated[dict[str, Any] | None, Field(title="Next Kwargs")] = None rendered_map_index: Annotated[str | None, Field(title="Rendered Map Index")] = None