2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -232,7 +232,7 @@ repos:
name: Run pydocstyle
args:
- --convention=pep257
- --add-ignore=D100,D102,D103,D104,D105,D107,D205,D400,D401
- --add-ignore=D100,D102,D103,D104,D105,D107,D202,D205,D400,D401
exclude: |
(?x)
^tests/.*\.py$|
7 changes: 7 additions & 0 deletions airflow/config_templates/config.yml
@@ -1850,6 +1850,13 @@
type: string
example: ~
default: "300"
- name: zombie_detection_interval
description: |
How often (in seconds) should the scheduler check for zombie tasks.
version_added: 2.3.0
type: float
example: ~
default: "10.0"
- name: catchup_by_default
description: |
Turn off scheduler catchup by setting this to ``False``.
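Review note: the new ``zombie_detection_interval`` option above follows the usual Airflow config conventions, so it should also be reachable through the standard ``AIRFLOW__{SECTION}__{KEY}`` environment-variable override. A minimal sketch, with an illustrative value of ``30.0``:

```python
# Hedged sketch: override the new option without editing airflow.cfg.
# Assumes the standard AIRFLOW__{SECTION}__{KEY} override mechanism applies here.
import os

os.environ["AIRFLOW__SCHEDULER__ZOMBIE_DETECTION_INTERVAL"] = "30.0"

from airflow.configuration import conf

# The fallback mirrors the default used in scheduler_job.py further down.
interval = conf.getfloat("scheduler", "zombie_detection_interval", fallback=10.0)
print(interval)  # 30.0
```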
3 changes: 3 additions & 0 deletions airflow/config_templates/default_airflow.cfg
@@ -923,6 +923,9 @@ child_process_log_directory = {AIRFLOW_HOME}/logs/scheduler
# associated task instance as failed and will re-schedule the task.
scheduler_zombie_task_threshold = 300

# How often (in seconds) should the scheduler check for zombie tasks.
zombie_detection_interval = 10.0

# Turn off scheduler catchup by setting this to ``False``.
# Default behavior is unchanged and
# Command Line Backfills still work, but the scheduler
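Review note: to make the intent of ``zombie_detection_interval`` concrete, here is a small, self-contained sketch of a check that fires on a fixed float interval. It uses the stdlib ``sched`` module rather than the scheduler's real timer machinery; ``check_zombies`` is a hypothetical stand-in for the ``_find_zombies`` method added to ``SchedulerJob`` below.

```python
# Illustrative only: a periodic check driven by a float interval, the role
# zombie_detection_interval plays for the scheduler's zombie detection.
import sched
import time

ZOMBIE_DETECTION_INTERVAL = 10.0  # seconds, mirrors the new default


def check_zombies() -> None:
    # Hypothetical stand-in for SchedulerJob._find_zombies.
    print("checking for zombie tasks")


def repeat(timer: sched.scheduler, interval: float) -> None:
    check_zombies()
    # Re-arm so the check fires again after `interval` seconds.
    timer.enter(interval, 1, repeat, (timer, interval))


timer = sched.scheduler(time.monotonic, time.sleep)
timer.enter(ZOMBIE_DETECTION_INTERVAL, 1, repeat, (timer, ZOMBIE_DETECTION_INTERVAL))
# timer.run()  # blocks forever; left commented so the sketch is import-safe
```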
58 changes: 1 addition & 57 deletions airflow/dag_processing/manager.py
@@ -34,25 +34,22 @@
from typing import TYPE_CHECKING, Any, Dict, List, NamedTuple, Optional, Union, cast

from setproctitle import setproctitle
from sqlalchemy import or_
from tabulate import tabulate

import airflow.models
from airflow.configuration import conf
from airflow.dag_processing.processor import DagFileProcessorProcess
from airflow.models import DagModel, errors
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import SimpleTaskInstance
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.callback_requests import CallbackRequest, SlaCallbackRequest, TaskCallbackRequest
from airflow.utils.callback_requests import CallbackRequest, SlaCallbackRequest
from airflow.utils.file import list_py_file_paths, might_contain_dag
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.mixins import MultiprocessingStartMethodMixin
from airflow.utils.net import get_hostname
from airflow.utils.process_utils import kill_child_processes_by_pids, reap_process_group
from airflow.utils.session import provide_session
from airflow.utils.state import State

if TYPE_CHECKING:
import pathlib
@@ -434,8 +431,6 @@ def __init__(
# How often to print out DAG file processing stats to the log. Default to
# 30 seconds.
self.print_stats_interval = conf.getint('scheduler', 'print_stats_interval')
# How many seconds do we wait for tasks to heartbeat before mark them as zombies.
self._zombie_threshold_secs = conf.getint('scheduler', 'scheduler_zombie_task_threshold')

# Map from file path to the processor
self._processors: Dict[str, DagFileProcessorProcess] = {}
@@ -445,13 +440,10 @@ def __init__(
# Map from file path to stats about the file
self._file_stats: Dict[str, DagFileStat] = {}

self._last_zombie_query_time = None
# Last time that the DAG dir was traversed to look for files
self.last_dag_dir_refresh_time = timezone.make_aware(datetime.fromtimestamp(0))
# Last time stats were printed
self.last_stat_print_time = 0
# TODO: Remove magic number
self._zombie_query_interval = 10
# How long to wait before timing out a process to parse a DAG file
self._processor_timeout = processor_timeout

@@ -566,7 +558,6 @@ def _run_parsing_loop(self):
self._processors.pop(processor.file_path)

self._refresh_dag_dir()
self._find_zombies()

self._kill_timed_out_processors()

@@ -1023,53 +1014,6 @@ def prepare_file_path_queue(self):

self._file_path_queue.extend(files_paths_to_queue)

@provide_session
def _find_zombies(self, session):
"""
Find zombie task instances, which are tasks haven't heartbeated for too long
and update the current zombie list.
"""
now = timezone.utcnow()
if (
not self._last_zombie_query_time
or (now - self._last_zombie_query_time).total_seconds() > self._zombie_query_interval
):
# to avoid circular imports
from airflow.jobs.local_task_job import LocalTaskJob as LJ

self.log.info("Finding 'running' jobs without a recent heartbeat")
TI = airflow.models.TaskInstance
DM = airflow.models.DagModel
limit_dttm = timezone.utcnow() - timedelta(seconds=self._zombie_threshold_secs)

zombies = (
session.query(TI, DM.fileloc)
.join(LJ, TI.job_id == LJ.id)
.join(DM, TI.dag_id == DM.dag_id)
.filter(TI.state == State.RUNNING)
.filter(
or_(
LJ.state != State.RUNNING,
LJ.latest_heartbeat < limit_dttm,
)
)
.all()
)

if zombies:
self.log.warning("Failing (%s) jobs without heartbeat after %s", len(zombies), limit_dttm)

self._last_zombie_query_time = timezone.utcnow()
for ti, file_loc in zombies:
request = TaskCallbackRequest(
full_filepath=file_loc,
simple_task_instance=SimpleTaskInstance(ti),
msg=f"Detected {ti} as zombie",
)
self.log.error("Detected zombie job: %s", request)
self._add_callback_to_queue(request)
Stats.incr('zombies_killed')

def _kill_timed_out_processors(self):
"""Kill any file processors that timeout to defend against process hangs."""
now = timezone.utcnow()
92 changes: 75 additions & 17 deletions airflow/decorators/base.py
@@ -280,30 +280,88 @@ def _validate_arg_names(self, funcname: str, kwargs: Dict[str, Any], valid_names
names = ", ".join(repr(n) for n in unknown_args)
raise TypeError(f'{funcname} got unexpected keyword arguments {names}')

def map(
self, *, dag: Optional["DAG"] = None, task_group: Optional["TaskGroup"] = None, **kwargs
) -> XComArg:
def map(self, *args, **kwargs) -> XComArg:
self._validate_arg_names("map", kwargs)
dag = dag or DagContext.get_current_dag()
task_group = task_group or TaskGroupContext.get_current_task_group(dag)
task_id = get_unique_task_id(self.kwargs['task_id'], dag, task_group)

operator = MappedOperator.from_decorator(
decorator=self,
partial_kwargs = self.kwargs.copy()
dag = partial_kwargs.pop("dag", DagContext.get_current_dag())
task_group = partial_kwargs.pop("task_group", TaskGroupContext.get_current_task_group(dag))
task_id = get_unique_task_id(partial_kwargs.pop("task_id"), dag, task_group)

# Unfortunately attrs's type hinting support does not work well with
# subclassing; it complains that arguments forwarded to the superclass
# are "unexpected" (they are fine at runtime).
operator = cast(Any, DecoratedMappedOperator)(
operator_class=self.operator_class,
partial_kwargs=partial_kwargs,
mapped_kwargs={},
task_id=task_id,
dag=dag,
task_group=task_group,
task_id=task_id,
mapped_kwargs=kwargs,
deps=MappedOperator._deps(self.operator_class.deps),
multiple_outputs=self.multiple_outputs,
python_callable=self.function,
)

operator.mapped_kwargs["op_args"] = list(args)
operator.mapped_kwargs["op_kwargs"] = kwargs

for arg in itertools.chain(args, kwargs.values()):
XComArg.apply_upstream_relationship(operator, arg)
return XComArg(operator=operator)

def partial(
self, *, dag: Optional["DAG"] = None, task_group: Optional["TaskGroup"] = None, **kwargs
) -> "_TaskDecorator[Function, OperatorSubclass]":
self._validate_arg_names("partial", kwargs, {'task_id'})
partial_kwargs = self.kwargs.copy()
partial_kwargs.update(kwargs)
return attr.evolve(self, kwargs=partial_kwargs)
def partial(self, *args, **kwargs) -> "_TaskDecorator[Function, OperatorSubclass]":
self._validate_arg_names("partial", kwargs)

op_args = self.kwargs.get("op_args", [])
op_args.extend(args)

op_kwargs = self.kwargs.get("op_kwargs", {})
op_kwargs = _merge_kwargs(op_kwargs, kwargs, fail_reason="duplicate partial")

return attr.evolve(self, kwargs={**self.kwargs, "op_args": op_args, "op_kwargs": op_kwargs})


def _merge_kwargs(
kwargs1: Dict[str, XComArg],
kwargs2: Dict[str, XComArg],
*,
fail_reason: str,
) -> Dict[str, XComArg]:
duplicated_keys = set(kwargs1).intersection(kwargs2)
if len(duplicated_keys) == 1:
raise TypeError(f"{fail_reason} argument: {duplicated_keys.pop()}")
elif duplicated_keys:
duplicated_keys_display = ", ".join(sorted(duplicated_keys))
raise TypeError(f"{fail_reason} arguments: {duplicated_keys_display}")
return {**kwargs1, **kwargs2}


@attr.define(kw_only=True)
class DecoratedMappedOperator(MappedOperator):
"""MappedOperator implementation for @task-decorated task function."""

multiple_outputs: bool
python_callable: Callable

def create_unmapped_operator(self, dag: "DAG") -> BaseOperator:
assert not isinstance(self.operator_class, str)
op_args = self.partial_kwargs.pop("op_args", []) + self.mapped_kwargs.pop("op_args", [])
op_kwargs = _merge_kwargs(
self.partial_kwargs.pop("op_kwargs", {}),
self.mapped_kwargs.pop("op_kwargs", {}),
fail_reason="mapping already partial",
)
return self.operator_class(
dag=dag,
task_id=self.task_id,
op_args=op_args,
op_kwargs=op_kwargs,
multiple_outputs=self.multiple_outputs,
python_callable=self.python_callable,
**self.partial_kwargs,
**self.mapped_kwargs,
)


class Task(Generic[Function]):
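Review note: since ``partial()`` and ``map()`` now take positional and keyword call arguments directly (routed to ``op_args``/``op_kwargs``), a usage sketch may help reviewers. This is hedged against the API as it stands in this branch; the DAG id, schedule, and callables are made up.

```python
from airflow.decorators import dag, task
from airflow.utils import timezone


@task
def add(base, bonus):
    return base + bonus


@dag(dag_id="mapping_sketch", start_date=timezone.datetime(2022, 1, 1), schedule_interval=None)
def mapping_sketch():
    # partial() pins `base` into op_kwargs; map() expands over the `bonus` values
    # and returns an XComArg backed by a DecoratedMappedOperator.
    add.partial(base=10).map(bonus=[1, 2, 3])

    # Supplying the same keyword twice is rejected by _merge_kwargs, e.g.
    # add.partial(base=10).partial(base=20) raises
    # TypeError("duplicate partial argument: base") at parse time.


the_dag = mapping_sketch()
```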
44 changes: 44 additions & 0 deletions airflow/jobs/scheduler_job.py
@@ -38,6 +38,7 @@
from airflow.dag_processing.manager import DagFileProcessorAgent
from airflow.executors.executor_loader import UNPICKLEABLE_EXECUTORS
from airflow.jobs.base_job import BaseJob
from airflow.jobs.local_task_job import LocalTaskJob
from airflow.models import DAG
from airflow.models.dag import DagModel
from airflow.models.dagbag import DagBag
@@ -123,6 +124,8 @@ def __init__(
)
scheduler_idle_sleep_time = processor_poll_interval
self._scheduler_idle_sleep_time = scheduler_idle_sleep_time
# How many seconds do we wait for tasks to heartbeat before marking them as zombies.
self._zombie_threshold_secs = conf.getint('scheduler', 'scheduler_zombie_task_threshold')

self.do_pickle = do_pickle
super().__init__(*args, **kwargs)
@@ -739,6 +742,11 @@ def _run_scheduler_loop(self) -> None:
self._emit_pool_metrics,
)

timers.call_regular_interval(
conf.getfloat('scheduler', 'zombie_detection_interval', fallback=10.0),
self._find_zombies,
)

for loop_count in itertools.count(start=1):
with Stats.timer() as timer:

@@ -1259,3 +1267,39 @@ def check_trigger_timeouts(self, session: Session = None):
)
if num_timed_out_tasks:
self.log.info("Timed out %i deferred tasks without fired triggers", num_timed_out_tasks)

@provide_session
def _find_zombies(self, session):
"""
Find zombie task instances, which are tasks that haven't heartbeated for too long,
and send a TaskCallbackRequest for each one so it can be failed.
"""
self.log.debug("Finding 'running' jobs without a recent heartbeat")
limit_dttm = timezone.utcnow() - timedelta(seconds=self._zombie_threshold_secs)

zombies = (
session.query(TaskInstance, DagModel.fileloc)
.join(LocalTaskJob, TaskInstance.job_id == LocalTaskJob.id)
.join(DagModel, TaskInstance.dag_id == DagModel.dag_id)
.filter(TaskInstance.state == State.RUNNING)
.filter(
or_(
LocalTaskJob.state != State.RUNNING,
LocalTaskJob.latest_heartbeat < limit_dttm,
)
)
.all()
)

if zombies:
self.log.warning("Failing (%s) jobs without heartbeat after %s", len(zombies), limit_dttm)

for ti, file_loc in zombies:
request = TaskCallbackRequest(
full_filepath=file_loc,
simple_task_instance=SimpleTaskInstance(ti),
msg=f"Detected {ti} as zombie",
)
self.log.error("Detected zombie job: %s", request)
self.processor_agent.send_callback_to_execute(request)
Stats.incr('zombies_killed')
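Review note: to make the zombie criteria above easy to eyeball, here is a restatement in plain Python, detached from the ORM. ``JobSnapshot`` and ``is_zombie`` are illustrative stand-ins, not Airflow models.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class JobSnapshot:
    ti_state: str               # state of the task instance
    job_state: str              # state of its LocalTaskJob
    latest_heartbeat: datetime  # last heartbeat recorded by the job


def is_zombie(snapshot: JobSnapshot, threshold_secs: float = 300.0) -> bool:
    """Mirror the query filter: a RUNNING TI whose job died or stopped heartbeating."""
    limit_dttm = datetime.now(timezone.utc) - timedelta(seconds=threshold_secs)
    return snapshot.ti_state == "running" and (
        snapshot.job_state != "running" or snapshot.latest_heartbeat < limit_dttm
    )


stale = JobSnapshot("running", "running", datetime.now(timezone.utc) - timedelta(seconds=600))
assert is_zombie(stale)  # heartbeat older than scheduler_zombie_task_threshold (default 300s)
```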