26 changes: 19 additions & 7 deletions airflow/jobs/scheduler_job.py
@@ -62,18 +62,21 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin):
:type pickle_dags: bool
:param dag_id_white_list: If specified, only look at these DAG ID's
:type dag_id_white_list: list[unicode]
:param zombies: zombie task instances to kill
:type zombies: list[airflow.utils.dag_processing.SimpleTaskInstance]
"""

# Counter that increments every time an instance of this class is created
class_creation_counter = 0

def __init__(self, file_path, pickle_dags, dag_id_white_list):
def __init__(self, file_path, pickle_dags, dag_id_white_list, zombies):
self._file_path = file_path

# The process that was launched to process the given file.
self._process = None
self._dag_id_white_list = dag_id_white_list
self._pickle_dags = pickle_dags
self._zombies = zombies
# The result of Scheduler.process_file(file_path).
self._result = None
# Whether the process is done running.
@@ -94,7 +97,8 @@ def _run_file_processor(result_channel,
file_path,
pickle_dags,
dag_id_white_list,
thread_name):
thread_name,
zombies):
"""
Process the given file.

@@ -112,6 +116,8 @@ def _run_file_processor(result_channel,
:type thread_name: unicode
:return: the process that was launched
:rtype: multiprocessing.Process
:param zombies: zombie task instances to kill
:type zombies: list[airflow.utils.dag_processing.SimpleTaskInstance]
"""
# This helper runs in the newly created process
log = logging.getLogger("airflow.processor")
@@ -139,7 +145,9 @@ def _run_file_processor(result_channel,
log.info("Started process (PID=%s) to work on %s",
os.getpid(), file_path)
scheduler_job = SchedulerJob(dag_ids=dag_id_white_list, log=log)
result = scheduler_job.process_file(file_path, pickle_dags)
result = scheduler_job.process_file(file_path,
zombies,
pickle_dags)
result_channel.send(result)
end_time = time.time()
log.info(
@@ -170,6 +178,7 @@ def start(self):
self._pickle_dags,
self._dag_id_white_list,
"DagFileProcessor{}".format(self._instance_id),
self._zombies
),
name="DagFileProcessor{}-Process".format(self._instance_id)
)
@@ -1285,10 +1294,11 @@ def _execute(self):
known_file_paths = list_py_file_paths(self.subdir)
self.log.info("There are %s files in %s", len(known_file_paths), self.subdir)

def processor_factory(file_path):
def processor_factory(file_path, zombies):
return DagFileProcessor(file_path,
pickle_dags,
self.dag_ids)
self.dag_ids,
zombies)

# When using sqlite, we do not use async_mode
# so the scheduler job and DAG parser don't access the DB at the same time.
@@ -1465,7 +1475,7 @@ def _find_dags_to_process(self, dags: List[DAG], paused_dag_ids: Set[str]):
return dags

@provide_session
def process_file(self, file_path, pickle_dags=False, session=None):
def process_file(self, file_path, zombies, pickle_dags=False, session=None):
"""
Process a Python file containing Airflow DAGs.

@@ -1484,6 +1494,8 @@ def process_file(self, file_path, pickle_dags=False, session=None):

:param file_path: the path to the Python file that should be executed
:type file_path: unicode
:param zombies: zombie task instances to kill.
:type zombies: list[airflow.utils.dag_processing.SimpleTaskInstance]
:param pickle_dags: whether to serialize the DAGs found in the file and
save them to the db
:type pickle_dags: bool
@@ -1567,7 +1579,7 @@ def process_file(self, file_path, pickle_dags=False, session=None):
except Exception:
self.log.exception("Error logging import errors!")
try:
dagbag.kill_zombies()
dagbag.kill_zombies(zombies)
except Exception:
self.log.exception("Error killing zombies!")

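For orientation, here is a minimal sketch (not part of this PR) of how the new zombies argument flows: DagFileProcessorManager hands its current zombie list to the processor factory, the resulting DagFileProcessor forwards it into the child process, and process_file() finally passes it to DagBag.kill_zombies(). The pickle_dags and dag_id_white_list values below are illustrative placeholders.

from airflow.jobs.scheduler_job import DagFileProcessor

def processor_factory(file_path, zombies):
    # In SchedulerJob._execute() the factory closes over the scheduler's own
    # settings; False and [] are stand-in values for this sketch.
    return DagFileProcessor(file_path,
                            pickle_dags=False,
                            dag_id_white_list=[],
                            zombies=zombies)

# The manager invokes the factory from its heartbeat loop, roughly:
#     processor = processor_factory(file_path, self._zombies)
#     processor.start()
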
57 changes: 23 additions & 34 deletions airflow/models/dagbag.py
@@ -25,10 +25,9 @@
import textwrap
import zipfile
from collections import namedtuple
from datetime import datetime, timedelta
from datetime import datetime

from croniter import CroniterBadCronError, CroniterBadDateError, CroniterNotAlphaError, croniter
from sqlalchemy import or_

from airflow import settings
from airflow.configuration import conf
@@ -41,7 +40,6 @@
from airflow.utils.db import provide_session
from airflow.utils.helpers import pprinttable
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import State
from airflow.utils.timeout import timeout


@@ -273,43 +271,34 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
return found_dags

@provide_session
def kill_zombies(self, session=None):
def kill_zombies(self, zombies, session=None):
"""
Fail zombie tasks, which are tasks that haven't
Fail given zombie tasks, which are tasks that haven't
had a heartbeat for too long, in the current DagBag.

:param zombies: zombie task instances to kill.
:type zombies: list[airflow.utils.dag_processing.SimpleTaskInstance]
:param session: DB session.
:type session: sqlalchemy.orm.session.Session
"""
# Avoid circular import
from airflow.models.taskinstance import TaskInstance as TI
from airflow.jobs import LocalTaskJob as LJ

# How many seconds do we wait for tasks to heartbeat before mark them as zombies.
limit_dttm = timezone.utcnow() - timedelta(seconds=self.SCHEDULER_ZOMBIE_TASK_THRESHOLD)
self.log.debug("Failing jobs without heartbeat after %s", limit_dttm)

tis = (
session.query(TI)
.join(LJ, TI.job_id == LJ.id)
.filter(TI.state == State.RUNNING)
.filter(TI.dag_id.in_(self.dags))
.filter(
or_(
LJ.state != State.RUNNING,
LJ.latest_heartbeat < limit_dttm,
)
).all()
)
for ti in tis:
self.log.info("Detected zombie job with dag_id %s, task_id %s, and execution date %s",
ti.dag_id, ti.task_id, ti.execution_date.isoformat())
ti.test_mode = self.UNIT_TEST_MODE
ti.task = self.dags[ti.dag_id].get_task(ti.task_id)
ti.handle_failure("{} detected as zombie".format(ti),
ti.test_mode, ti.get_template_context())
self.log.info('Marked zombie job %s as %s', ti, ti.state)
Stats.incr('zombies_killed')
from airflow.models.taskinstance import TaskInstance # Avoid circular import

for zombie in zombies:
if zombie.dag_id in self.dags:
dag = self.dags[zombie.dag_id]
if zombie.task_id in dag.task_ids:
task = dag.get_task(zombie.task_id)
ti = TaskInstance(task, zombie.execution_date)
# Get properties needed for failure handling from SimpleTaskInstance.
ti.start_date = zombie.start_date
ti.end_date = zombie.end_date
ti.try_number = zombie.try_number
ti.state = zombie.state
ti.test_mode = self.UNIT_TEST_MODE
ti.handle_failure("{} detected as zombie".format(ti),
ti.test_mode, ti.get_template_context())
self.log.info('Marked zombie job %s as %s', ti, ti.state)
Stats.incr('zombies_killed')
session.commit()

def bag_dag(self, dag, parent_dag, root_dag):
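A short usage sketch of the new contract (the DAG folder path and the empty placeholder list are illustrative, not from the PR): kill_zombies() no longer queries the metadata database itself; the caller supplies SimpleTaskInstance objects, and the DagBag only fails the ones whose dag_id and task_id it actually contains.

from airflow.models.dagbag import DagBag
from airflow.utils.dag_processing import SimpleTaskInstance

dagbag = DagBag(dag_folder="/path/to/dags")  # illustrative path

# Normally DagFileProcessorManager._find_zombies() supplies these task instances;
# an empty placeholder keeps the sketch self-contained.
running_tis = []
zombies = [SimpleTaskInstance(ti) for ti in running_tis]

dagbag.kill_zombies(zombies)  # zombies outside this bag's DAGs are skipped
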
52 changes: 49 additions & 3 deletions airflow/utils/dag_processing.py
@@ -28,12 +28,13 @@
import time
import zipfile
from abc import ABCMeta, abstractmethod
from datetime import datetime
from datetime import datetime, timedelta
from importlib import import_module
from typing import Iterable, NamedTuple, Optional

import psutil
from setproctitle import setproctitle
from sqlalchemy import or_
from tabulate import tabulate

# To avoid circular imports
@@ -47,6 +48,7 @@
from airflow.utils.db import provide_session
from airflow.utils.helpers import reap_process_group
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import State


class SimpleDag(BaseDag):
@@ -751,6 +753,9 @@ def __init__(self,
# 30 seconds.
self.print_stats_interval = conf.getint('scheduler',
'print_stats_interval')
# How many seconds do we wait for tasks to heartbeat before marking them as zombies.
self._zombie_threshold_secs = (
conf.getint('scheduler', 'scheduler_zombie_task_threshold'))
# Map from file path to the processor
self._processors = {}

@@ -759,13 +764,14 @@ def __init__(self,
# Map from file path to stats about the file
self._file_stats = {} # type: dict(str, DagFileStat)

self._last_zombie_query_time = timezone.utcnow()
self._last_zombie_query_time = None
# Last time that the DAG dir was traversed to look for files
self.last_dag_dir_refresh_time = timezone.utcnow()
# Last time stats were printed
self.last_stat_print_time = timezone.datetime(2000, 1, 1)
# TODO: Remove magic number
self._zombie_query_interval = 10
self._zombies = []
# How long to wait before timing out a process to parse a DAG file
self._processor_timeout = processor_timeout

@@ -835,6 +841,7 @@ def start(self):
continue

self._refresh_dag_dir()
self._find_zombies()

simple_dags = self.heartbeat()
for simple_dag in simple_dags:
@@ -1234,7 +1241,7 @@ def heartbeat(self):
while (self._parallelism - len(self._processors) > 0 and
len(self._file_path_queue) > 0):
file_path = self._file_path_queue.pop(0)
processor = self._processor_factory(file_path)
processor = self._processor_factory(file_path, self._zombies)
Stats.incr('dag_processing.processes')

processor.start()
@@ -1249,6 +1256,45 @@

return simple_dags

@provide_session
def _find_zombies(self, session):
"""
Find zombie task instances, which are tasks that haven't heartbeated for too long,
and update the current zombie list.
"""
now = timezone.utcnow()
zombies = []
if not self._last_zombie_query_time or \
(now - self._last_zombie_query_time).total_seconds() > self._zombie_query_interval:
# to avoid circular imports
from airflow.jobs import LocalTaskJob as LJ
self.log.info("Finding 'running' jobs without a recent heartbeat")
TI = airflow.models.TaskInstance
limit_dttm = timezone.utcnow() - timedelta(
seconds=self._zombie_threshold_secs)
self.log.info("Failing jobs without heartbeat after %s", limit_dttm)

tis = (
session.query(TI)
.join(LJ, TI.job_id == LJ.id)
.filter(TI.state == State.RUNNING)
.filter(
or_(
LJ.state != State.RUNNING,
LJ.latest_heartbeat < limit_dttm,
)
).all()
)
self._last_zombie_query_time = timezone.utcnow()
for ti in tis:
sti = SimpleTaskInstance(ti)
self.log.info(
"Detected zombie job with dag_id %s, task_id %s, and execution date %s",
sti.dag_id, sti.task_id, sti.execution_date.isoformat())
zombies.append(sti)

self._zombies = zombies

def _kill_timed_out_processors(self):
"""
Kill any file processors that timeout to defend against process hangs.
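Putting the pieces together: a task instance counts as a zombie when it is still in the RUNNING state but its LocalTaskJob has either stopped RUNNING or has not heartbeated within scheduler_zombie_task_threshold seconds, and the manager re-runs this query at most once per _zombie_query_interval (10 seconds). Below is a plain-Python restatement of that predicate, assuming naive UTC datetimes and Airflow's lower-case state strings; the manager itself evaluates this as a SQLAlchemy filter, not in Python.

from datetime import datetime, timedelta

def looks_like_zombie(ti_state, job_state, latest_heartbeat, threshold_secs, now=None):
    # Mirrors the criterion used by _find_zombies() above.
    now = now or datetime.utcnow()
    limit_dttm = now - timedelta(seconds=threshold_secs)
    return ti_state == "running" and (
        job_state != "running" or latest_heartbeat < limit_dttm
    )

# Example: a task still marked running whose job last heartbeated 10 minutes ago,
# checked against a 300-second threshold, is flagged as a zombie.
print(looks_like_zombie("running", "running",
                        datetime.utcnow() - timedelta(minutes=10), 300))  # True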