Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,17 +394,6 @@ def sensitive_config_values(self) -> Set[tuple[str, str]]: # noqa: UP006
("fab", "auth_rate_limit"): ("webserver", "auth_rate_limit", "2.9.0"),
}

# A mapping of new configurations to a list of old configurations for when one configuration
# deprecates more than one other deprecation. The deprecation logic for these configurations
# is defined in SchedulerJobRunner.
many_to_one_deprecated_options: dict[tuple[str, str], list[tuple[str, str, str]]] = {
("scheduler", "task_queued_timeout"): [
("celery", "stalled_task_timeout", "2.6.0"),
("celery", "task_adoption_timeout", "2.6.0"),
("kubernetes_executor", "worker_pods_pending_timeout", "2.6.0"),
]
}

# A mapping of new section -> (old section, since_version).
deprecated_sections: dict[str, tuple[str, str]] = {"kubernetes_executor": ("kubernetes", "2.5.0")}

Expand Down
41 changes: 1 addition & 40 deletions airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import signal
import sys
import time
import warnings
from collections import Counter, defaultdict, deque
from dataclasses import dataclass
from datetime import timedelta
Expand Down Expand Up @@ -178,45 +177,7 @@ def __init__(
self._zombie_threshold_secs = conf.getint("scheduler", "scheduler_zombie_task_threshold")
self._standalone_dag_processor = conf.getboolean("scheduler", "standalone_dag_processor")
self._dag_stale_not_seen_duration = conf.getint("scheduler", "dag_stale_not_seen_duration")

# Since the functionality for stalled_task_timeout, task_adoption_timeout, and
# worker_pods_pending_timeout are now handled by a single config (task_queued_timeout),
# we can't deprecate them as we normally would. So, we'll read each config and take
# the max value in order to ensure we're not undercutting a legitimate
# use of any of these configs.
stalled_task_timeout = conf.getfloat("celery", "stalled_task_timeout", fallback=0)
if stalled_task_timeout:
# TODO: Remove in Airflow 3.0
warnings.warn(
"The '[celery] stalled_task_timeout' config option is deprecated. "
"Please update your config to use '[scheduler] task_queued_timeout' instead.",
DeprecationWarning,
stacklevel=2,
)
task_adoption_timeout = conf.getfloat("celery", "task_adoption_timeout", fallback=0)
if task_adoption_timeout:
# TODO: Remove in Airflow 3.0
warnings.warn(
"The '[celery] task_adoption_timeout' config option is deprecated. "
"Please update your config to use '[scheduler] task_queued_timeout' instead.",
DeprecationWarning,
stacklevel=2,
)
worker_pods_pending_timeout = conf.getfloat(
"kubernetes_executor", "worker_pods_pending_timeout", fallback=0
)
if worker_pods_pending_timeout:
# TODO: Remove in Airflow 3.0
warnings.warn(
"The '[kubernetes_executor] worker_pods_pending_timeout' config option is deprecated. "
"Please update your config to use '[scheduler] task_queued_timeout' instead.",
DeprecationWarning,
stacklevel=2,
)
task_queued_timeout = conf.getfloat("scheduler", "task_queued_timeout")
self._task_queued_timeout = max(
stalled_task_timeout, task_adoption_timeout, worker_pods_pending_timeout, task_queued_timeout
)
self._task_queued_timeout = conf.getfloat("scheduler", "task_queued_timeout")

self.do_pickle = do_pickle

Expand Down
4 changes: 0 additions & 4 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,10 +438,6 @@ def get_configs_and_deprecations(
) in AirflowConfigParser.deprecated_options.items():
deprecated_options[deprecated_section][deprecated_key] = section, key, since_version

for (section, key), deprecated in AirflowConfigParser.many_to_one_deprecated_options.items():
for deprecated_section, deprecated_key, since_version in deprecated:
deprecated_options[deprecated_section][deprecated_key] = section, key, since_version

if package_name == "apache-airflow":
configs = retrieve_configuration_description(include_providers=False)
else:
Expand Down
1 change: 1 addition & 0 deletions newsfragments/42060.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Removed deprecated configuration ``stalled_task_timeout`` from ``celery``, ``task_adoption_timeout`` from ``celery`` and ``worker_pods_pending_timeout`` from ``worker_pods_pending_timeout``. Please use ``task_queued_timeout`` from ``scheduler`` instead.