From dcedeffc27ceed258cb50c1556ef53c3755e6880 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Fri, 4 Jul 2025 17:38:37 +0530 Subject: [PATCH] Fix task configuration defaults for AbstractOperator Some defaults weren't being taken from configuration -- this is now fixed. --- airflow-core/src/airflow/models/__init__.py | 8 +++++ .../src/airflow/models/abstractoperator.py | 34 ------------------- .../serialization/test_dag_serialization.py | 2 ++ .../edge3/executors/edge_executor.py | 6 +++- .../definitions/_internal/abstractoperator.py | 20 +++++++---- .../sdk/definitions/decorators/__init__.pyi | 4 +-- .../airflow/sdk/definitions/mappedoperator.py | 7 ++-- 7 files changed, 33 insertions(+), 48 deletions(-) delete mode 100644 airflow-core/src/airflow/models/abstractoperator.py diff --git a/airflow-core/src/airflow/models/__init__.py b/airflow-core/src/airflow/models/__init__.py index be9541484d059..b28c3f2b0639a 100644 --- a/airflow-core/src/airflow/models/__init__.py +++ b/airflow-core/src/airflow/models/__init__.py @@ -146,6 +146,14 @@ def __getattr__(name): __deprecated_classes = { + "abstractoperator": { + "AbstractOperator": "airflow.sdk.definitions._internal.abstractoperator.AbstractOperator", + "NotMapped": "airflow.sdk.definitions._internal.abstractoperator.NotMapped", + "TaskStateChangeCallback": "airflow.sdk.definitions._internal.abstractoperator.TaskStateChangeCallback", + "DEFAULT_OWNER": "airflow.sdk.definitions._internal.abstractoperator.DEFAULT_OWNER", + "DEFAULT_QUEUE": "airflow.sdk.definitions._internal.abstractoperator.DEFAULT_QUEUE", + "DEFAULT_TASK_EXECUTION_TIMEOUT": "airflow.sdk.definitions._internal.abstractoperator.DEFAULT_TASK_EXECUTION_TIMEOUT", + }, "param": { "Param": "airflow.sdk.definitions.param.Param", "ParamsDict": "airflow.sdk.definitions.param.ParamsDict", diff --git a/airflow-core/src/airflow/models/abstractoperator.py b/airflow-core/src/airflow/models/abstractoperator.py deleted file mode 100644 index e5b5f7dc81f45..0000000000000 --- a/airflow-core/src/airflow/models/abstractoperator.py +++ /dev/null @@ -1,34 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -import datetime - -from airflow.configuration import conf -from airflow.sdk.definitions._internal.abstractoperator import ( - AbstractOperator as AbstractOperator, - NotMapped as NotMapped, # Re-export this for compat - TaskStateChangeCallback as TaskStateChangeCallback, -) - -DEFAULT_OWNER: str = conf.get_mandatory_value("operators", "default_owner") -DEFAULT_QUEUE: str = conf.get_mandatory_value("operators", "default_queue") - -DEFAULT_TASK_EXECUTION_TIMEOUT: datetime.timedelta | None = conf.gettimedelta( - "core", "default_task_execution_timeout" -) diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py b/airflow-core/tests/unit/serialization/test_dag_serialization.py index 143f2c8acdd51..ad15025bc7036 100644 --- a/airflow-core/tests/unit/serialization/test_dag_serialization.py +++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py @@ -187,6 +187,7 @@ "bash_command": "echo {{ task.task_id }}", "task_type": "BashOperator", "_task_module": "airflow.providers.standard.operators.bash", + "owner": "airflow", "pool": "default_pool", "is_setup": False, "is_teardown": False, @@ -3162,6 +3163,7 @@ def test_handle_v1_serdag(): "_task_type": "BashOperator", # Slightly difference from v2-10-stable here, we manually changed this path "_task_module": "airflow.providers.standard.operators.bash", + "owner": "airflow", "pool": "default_pool", "is_setup": False, "is_teardown": False, diff --git a/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py b/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py index e2e380fe7e285..19e99a4572580 100644 --- a/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py +++ b/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py @@ -30,7 +30,11 @@ from airflow.cli.cli_config import GroupCommand from airflow.configuration import conf from airflow.executors.base_executor import BaseExecutor -from airflow.models.abstractoperator import DEFAULT_QUEUE + +try: + from airflow.models.abstractoperator import DEFAULT_QUEUE +except (ImportError, AttributeError): + from airflow.sdk.definitions._internal.abstractoperator import DEFAULT_QUEUE from airflow.models.taskinstance import TaskInstance, TaskInstanceState from airflow.providers.edge3.cli.edge_command import EDGE_COMMANDS from airflow.providers.edge3.models.edge_job import EdgeJobModel diff --git a/task-sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py b/task-sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py index 95c8d9d8287cb..8934cd0e4532c 100644 --- a/task-sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py +++ b/task-sdk/src/airflow/sdk/definitions/_internal/abstractoperator.py @@ -51,7 +51,7 @@ TaskStateChangeCallback = Callable[[Context], None] -DEFAULT_OWNER: str = "airflow" +DEFAULT_OWNER: str = conf.get_mandatory_value("operators", "default_owner") DEFAULT_POOL_SLOTS: int = 1 DEFAULT_POOL_NAME = "default_pool" DEFAULT_PRIORITY_WEIGHT: int = 1 @@ -62,17 +62,23 @@ MINIMUM_PRIORITY_WEIGHT: int = -2147483648 MAXIMUM_PRIORITY_WEIGHT: int = 2147483647 DEFAULT_EXECUTOR: str | None = None -DEFAULT_QUEUE: str = conf.get("operators", "default_queue", "default") +DEFAULT_QUEUE: str = conf.get_mandatory_value("operators", "default_queue") DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST: bool = False DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING: bool = False -DEFAULT_RETRIES: int = 0 -DEFAULT_RETRY_DELAY: datetime.timedelta = datetime.timedelta(seconds=300) -MAX_RETRY_DELAY: int = 24 * 60 * 60 +DEFAULT_RETRIES: int = conf.getint("core", "default_task_retries", fallback=0) +DEFAULT_RETRY_DELAY: datetime.timedelta = datetime.timedelta( + seconds=conf.getint("core", "default_task_retry_delay", fallback=300) +) +MAX_RETRY_DELAY: int = conf.getint("core", "max_task_retry_delay", fallback=24 * 60 * 60) # TODO: Task-SDK -- these defaults should be overridable from the Airflow config DEFAULT_TRIGGER_RULE: TriggerRule = TriggerRule.ALL_SUCCESS -DEFAULT_WEIGHT_RULE: WeightRule = WeightRule.DOWNSTREAM -DEFAULT_TASK_EXECUTION_TIMEOUT: datetime.timedelta | None = None +DEFAULT_WEIGHT_RULE: WeightRule = WeightRule( + conf.get("core", "default_task_weight_rule", fallback=WeightRule.DOWNSTREAM) +) +DEFAULT_TASK_EXECUTION_TIMEOUT: datetime.timedelta | None = conf.gettimedelta( + "core", "default_task_execution_timeout" +) log = logging.getLogger(__name__) diff --git a/task-sdk/src/airflow/sdk/definitions/decorators/__init__.pyi b/task-sdk/src/airflow/sdk/definitions/decorators/__init__.pyi index e60852a3f02d9..c9a6d9956bb64 100644 --- a/task-sdk/src/airflow/sdk/definitions/decorators/__init__.pyi +++ b/task-sdk/src/airflow/sdk/definitions/decorators/__init__.pyi @@ -496,7 +496,7 @@ class TaskDecoratorCollection: """ # [END decorator_signature] @overload - def kubernetes( + def kubernetes( # type: ignore[misc] self, *, multiple_outputs: bool | None = None, @@ -670,7 +670,7 @@ class TaskDecoratorCollection: @overload def kubernetes(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ... @overload - def kubernetes_cmd( + def kubernetes_cmd( # type: ignore[misc] self, *, args_only: bool = False, # Added by _KubernetesCmdDecoratedOperator. diff --git a/task-sdk/src/airflow/sdk/definitions/mappedoperator.py b/task-sdk/src/airflow/sdk/definitions/mappedoperator.py index 7fddf969137fb..7d4093d401868 100644 --- a/task-sdk/src/airflow/sdk/definitions/mappedoperator.py +++ b/task-sdk/src/airflow/sdk/definitions/mappedoperator.py @@ -26,7 +26,6 @@ import attrs import methodtools -from airflow.models.abstractoperator import TaskStateChangeCallback from airflow.sdk.definitions._internal.abstractoperator import ( DEFAULT_EXECUTOR, DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST, @@ -42,6 +41,7 @@ DEFAULT_WEIGHT_RULE, AbstractOperator, NotMapped, + TaskStateChangeCallback, ) from airflow.sdk.definitions._internal.expandinput import ( DictOfListsExpandInput, @@ -51,7 +51,7 @@ from airflow.sdk.definitions._internal.types import NOTSET from airflow.serialization.enums import DagAttributeTypes from airflow.task.priority_strategy import PriorityWeightStrategy, validate_and_load_priority_weight_strategy -from airflow.typing_compat import Literal +from airflow.typing_compat import Literal, TypeAlias, TypeGuard from airflow.utils.helpers import is_container, prevent_duplicates from airflow.utils.xcom import XCOM_RETURN_KEY @@ -73,13 +73,12 @@ from airflow.sdk.definitions.xcom_arg import XComArg from airflow.ti_deps.deps.base_ti_dep import BaseTIDep from airflow.triggers.base import StartTriggerArgs - from airflow.typing_compat import TypeGuard from airflow.utils.context import Context from airflow.utils.operator_resources import Resources from airflow.utils.task_group import TaskGroup from airflow.utils.trigger_rule import TriggerRule -TaskStateChangeCallbackAttrType = TaskStateChangeCallback | list[TaskStateChangeCallback] | None +TaskStateChangeCallbackAttrType: TypeAlias = TaskStateChangeCallback | list[TaskStateChangeCallback] | None ValidationSource = Literal["expand"] | Literal["partial"]