Skip to content

Commit 8af58da

Browse files
KevinYang21ashb
authored andcommitted
Revert "[AIRFLOW-4797] Improve performance and behaviour of zombie detection (#5511)"
This reverts commit 5842247. (cherry picked from commit 8979607)
1 parent e2bec69 commit 8af58da

File tree

5 files changed

+150
-121
lines changed

5 files changed

+150
-121
lines changed

airflow/jobs/scheduler_job.py

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -69,18 +69,21 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin):
6969
:type pickle_dags: bool
7070
:param dag_id_white_list: If specified, only look at these DAG ID's
7171
:type dag_id_white_list: list[unicode]
72+
:param zombies: zombie task instances to kill
73+
:type zombies: list[airflow.utils.dag_processing.SimpleTaskInstance]
7274
"""
7375

7476
# Counter that increments every time an instance of this class is created
7577
class_creation_counter = 0
7678

77-
def __init__(self, file_path, pickle_dags, dag_id_white_list):
79+
def __init__(self, file_path, pickle_dags, dag_id_white_list, zombies):
7880
self._file_path = file_path
7981

8082
# The process that was launched to process the given .
8183
self._process = None
8284
self._dag_id_white_list = dag_id_white_list
8385
self._pickle_dags = pickle_dags
86+
self._zombies = zombies
8487
# The result of Scheduler.process_file(file_path).
8588
self._result = None
8689
# Whether the process is done running.
@@ -101,7 +104,8 @@ def _run_file_processor(result_channel,
101104
file_path,
102105
pickle_dags,
103106
dag_id_white_list,
104-
thread_name):
107+
thread_name,
108+
zombies):
105109
"""
106110
Process the given file.
107111
@@ -119,6 +123,8 @@ def _run_file_processor(result_channel,
119123
:type thread_name: unicode
120124
:return: the process that was launched
121125
:rtype: multiprocessing.Process
126+
:param zombies: zombie task instances to kill
127+
:type zombies: list[airflow.utils.dag_processing.SimpleTaskInstance]
122128
"""
123129
# This helper runs in the newly created process
124130
log = logging.getLogger("airflow.processor")
@@ -146,7 +152,9 @@ def _run_file_processor(result_channel,
146152
log.info("Started process (PID=%s) to work on %s",
147153
os.getpid(), file_path)
148154
scheduler_job = SchedulerJob(dag_ids=dag_id_white_list, log=log)
149-
result = scheduler_job.process_file(file_path, pickle_dags)
155+
result = scheduler_job.process_file(file_path,
156+
zombies,
157+
pickle_dags)
150158
result_channel.send(result)
151159
end_time = time.time()
152160
log.info(
@@ -177,6 +185,7 @@ def start(self):
177185
self._pickle_dags,
178186
self._dag_id_white_list,
179187
"DagFileProcessor{}".format(self._instance_id),
188+
self._zombies
180189
),
181190
name="DagFileProcessor{}-Process".format(self._instance_id)
182191
)
@@ -1324,10 +1333,11 @@ def _execute(self):
13241333
known_file_paths = list_py_file_paths(self.subdir)
13251334
self.log.info("There are %s files in %s", len(known_file_paths), self.subdir)
13261335

1327-
def processor_factory(file_path):
1336+
def processor_factory(file_path, zombies):
13281337
return DagFileProcessor(file_path,
13291338
pickle_dags,
1330-
self.dag_ids)
1339+
self.dag_ids,
1340+
zombies)
13311341

13321342
# When using sqlite, we do not use async_mode
13331343
# so the scheduler job and DAG parser don't access the DB at the same time.
@@ -1488,7 +1498,7 @@ def _execute_helper(self):
14881498
models.DAG.deactivate_stale_dags(execute_start_time)
14891499

14901500
@provide_session
1491-
def process_file(self, file_path, pickle_dags=False, session=None):
1501+
def process_file(self, file_path, zombies, pickle_dags=False, session=None):
14921502
"""
14931503
Process a Python file containing Airflow DAGs.
14941504
@@ -1507,6 +1517,8 @@ def process_file(self, file_path, pickle_dags=False, session=None):
15071517
15081518
:param file_path: the path to the Python file that should be executed
15091519
:type file_path: unicode
1520+
:param zombies: zombie task instances to kill.
1521+
:type zombies: list[airflow.utils.dag_processing.SimpleTaskInstance]
15101522
:param pickle_dags: whether serialize the DAGs found in the file and
15111523
save them to the db
15121524
:type pickle_dags: bool
@@ -1598,7 +1610,7 @@ def process_file(self, file_path, pickle_dags=False, session=None):
15981610
except Exception:
15991611
self.log.exception("Error logging import errors!")
16001612
try:
1601-
dagbag.kill_zombies()
1613+
dagbag.kill_zombies(zombies)
16021614
except Exception:
16031615
self.log.exception("Error killing zombies!")
16041616

airflow/models/dagbag.py

Lines changed: 22 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,10 @@
2828
import textwrap
2929
import zipfile
3030
from collections import namedtuple
31-
from datetime import datetime, timedelta
31+
from datetime import datetime
3232

3333
import six
3434
from croniter import croniter, CroniterBadCronError, CroniterBadDateError, CroniterNotAlphaError
35-
from sqlalchemy import or_
3635

3736
from airflow import settings
3837
from airflow.configuration import conf
@@ -45,7 +44,6 @@
4544
from airflow.utils.db import provide_session
4645
from airflow.utils.helpers import pprinttable
4746
from airflow.utils.log.logging_mixin import LoggingMixin
48-
from airflow.utils.state import State
4947
from airflow.utils.timeout import timeout
5048

5149

@@ -277,43 +275,33 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
277275
return found_dags
278276

279277
@provide_session
280-
def kill_zombies(self, session=None):
278+
def kill_zombies(self, zombies, session=None):
281279
"""
282-
Fail zombie tasks, which are tasks that haven't
280+
Fail given zombie tasks, which are tasks that haven't
283281
had a heartbeat for too long, in the current DagBag.
284282
283+
:param zombies: zombie task instances to kill.
284+
:type zombies: airflow.utils.dag_processing.SimpleTaskInstance
285285
:param session: DB session.
286286
:type session: sqlalchemy.orm.session.Session
287287
"""
288-
# Avoid circular import
289-
from airflow.models.taskinstance import TaskInstance as TI
290-
from airflow.jobs import LocalTaskJob as LJ
291-
292-
# How many seconds do we wait for tasks to heartbeat before mark them as zombies.
293-
limit_dttm = timezone.utcnow() - timedelta(seconds=self.SCHEDULER_ZOMBIE_TASK_THRESHOLD)
294-
self.log.debug("Failing jobs without heartbeat after %s", limit_dttm)
295-
296-
tis = (
297-
session.query(TI)
298-
.join(LJ, TI.job_id == LJ.id)
299-
.filter(TI.state == State.RUNNING)
300-
.filter(TI.dag_id.in_(self.dags))
301-
.filter(
302-
or_(
303-
LJ.state != State.RUNNING,
304-
LJ.latest_heartbeat < limit_dttm,
305-
)
306-
).all()
307-
)
308-
for ti in tis:
309-
self.log.info("Detected zombie job with dag_id %s, task_id %s, and execution date %s",
310-
ti.dag_id, ti.task_id, ti.execution_date.isoformat())
311-
ti.test_mode = self.UNIT_TEST_MODE
312-
ti.task = self.dags[ti.dag_id].get_task(ti.task_id)
313-
ti.handle_failure("{} detected as zombie".format(ti),
314-
ti.test_mode, ti.get_template_context())
315-
self.log.info('Marked zombie job %s as %s', ti, ti.state)
316-
Stats.incr('zombies_killed')
288+
from airflow.models.taskinstance import TaskInstance # Avoid circular import
289+
290+
for zombie in zombies:
291+
if zombie.dag_id in self.dags:
292+
dag = self.dags[zombie.dag_id]
293+
if zombie.task_id in dag.task_ids:
294+
task = dag.get_task(zombie.task_id)
295+
ti = TaskInstance(task, zombie.execution_date)
296+
# Get properties needed for failure handling from SimpleTaskInstance.
297+
ti.start_date = zombie.start_date
298+
ti.end_date = zombie.end_date
299+
ti.try_number = zombie.try_number
300+
ti.state = zombie.state
301+
ti.test_mode = self.UNIT_TEST_MODE
302+
ti.handle_failure("{} detected as zombie".format(ti),
303+
ti.test_mode, ti.get_template_context())
304+
self.log.info('Marked zombie job %s as %s', ti, ti.state)
317305
session.commit()
318306

319307
def bag_dag(self, dag, parent_dag, root_dag):

airflow/utils/dag_processing.py

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import time
3232
import zipfile
3333
from abc import ABCMeta, abstractmethod
34-
from datetime import datetime
34+
from datetime import datetime, timedelta
3535
from importlib import import_module
3636
import enum
3737
from typing import Optional, NamedTuple, Iterable
@@ -53,6 +53,8 @@
5353
from airflow.utils.helpers import reap_process_group
5454
from airflow.utils.db import provide_session
5555
from airflow.utils.log.logging_mixin import LoggingMixin
56+
from airflow.utils.state import State
57+
from sqlalchemy import or_
5658

5759
if six.PY2:
5860
ConnectionError = IOError
@@ -763,6 +765,9 @@ def __init__(self,
763765
# 30 seconds.
764766
self.print_stats_interval = conf.getint('scheduler',
765767
'print_stats_interval')
768+
# How many seconds do we wait for tasks to heartbeat before mark them as zombies.
769+
self._zombie_threshold_secs = (
770+
conf.getint('scheduler', 'scheduler_zombie_task_threshold'))
766771
# Map from file path to the processor
767772
self._processors = {}
768773

@@ -1242,11 +1247,13 @@ def heartbeat(self):
12421247

12431248
self._file_path_queue.extend(files_paths_to_queue)
12441249

1250+
zombies = self._find_zombies()
1251+
12451252
# Start more processors if we have enough slots and files to process
12461253
while (self._parallelism - len(self._processors) > 0 and
12471254
len(self._file_path_queue) > 0):
12481255
file_path = self._file_path_queue.pop(0)
1249-
processor = self._processor_factory(file_path)
1256+
processor = self._processor_factory(file_path, zombies)
12501257
Stats.incr('dag_processing.processes')
12511258

12521259
processor.start()
@@ -1261,6 +1268,45 @@ def heartbeat(self):
12611268

12621269
return simple_dags
12631270

1271+
@provide_session
1272+
def _find_zombies(self, session):
1273+
"""
1274+
Find zombie task instances, which are tasks haven't heartbeated for too long.
1275+
:return: Zombie task instances in SimpleTaskInstance format.
1276+
"""
1277+
now = timezone.utcnow()
1278+
zombies = []
1279+
if (now - self._last_zombie_query_time).total_seconds() \
1280+
> self._zombie_query_interval:
1281+
# to avoid circular imports
1282+
from airflow.jobs import LocalTaskJob as LJ
1283+
self.log.info("Finding 'running' jobs without a recent heartbeat")
1284+
TI = airflow.models.TaskInstance
1285+
limit_dttm = timezone.utcnow() - timedelta(
1286+
seconds=self._zombie_threshold_secs)
1287+
self.log.info("Failing jobs without heartbeat after %s", limit_dttm)
1288+
1289+
tis = (
1290+
session.query(TI)
1291+
.join(LJ, TI.job_id == LJ.id)
1292+
.filter(TI.state == State.RUNNING)
1293+
.filter(
1294+
or_(
1295+
LJ.state != State.RUNNING,
1296+
LJ.latest_heartbeat < limit_dttm,
1297+
)
1298+
).all()
1299+
)
1300+
self._last_zombie_query_time = timezone.utcnow()
1301+
for ti in tis:
1302+
sti = SimpleTaskInstance(ti)
1303+
self.log.info(
1304+
"Detected zombie job with dag_id %s, task_id %s, and execution date %s",
1305+
sti.dag_id, sti.task_id, sti.execution_date.isoformat())
1306+
zombies.append(sti)
1307+
1308+
return zombies
1309+
12641310
def _kill_timed_out_processors(self):
12651311
"""
12661312
Kill any file processors that timeout to defend against process hangs.

tests/models/test_dagbag.py

Lines changed: 7 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
# specific language governing permissions and limitations
1818
# under the License.
1919

20-
from datetime import datetime, timedelta
20+
from datetime import datetime
2121
import inspect
2222
import os
2323
import shutil
@@ -29,11 +29,11 @@
2929

3030
from airflow import models
3131
from airflow.configuration import conf
32-
from airflow.jobs import LocalTaskJob as LJ
32+
from airflow.utils.dag_processing import SimpleTaskInstance
3333
from airflow.models import DagModel, DagBag, TaskInstance as TI
3434
from airflow.utils.db import create_session
3535
from airflow.utils.state import State
36-
from airflow.utils.timezone import utc, utcnow
36+
from airflow.utils.timezone import utc
3737
from tests.models import TEST_DAGS_FOLDER, DEFAULT_DATE
3838
from tests.test_utils.config import conf_vars
3939
import airflow.example_dags
@@ -606,90 +606,28 @@ def test_process_file_with_none(self):
606606
self.assertEqual([], dagbag.process_file(None))
607607

608608
@patch.object(TI, 'handle_failure')
609-
def test_kill_zombies_when_job_state_is_not_running(self, mock_ti_handle_failure):
609+
def test_kill_zombies(self, mock_ti_handle_failure):
610610
"""
611-
Test that kill zombies calls TI's failure handler with proper context
611+
Test that kill zombies call TIs failure handler with proper context
612612
"""
613613
dagbag = models.DagBag(dag_folder=self.empty_dir, include_examples=True)
614614
with create_session() as session:
615615
session.query(TI).delete()
616-
session.query(LJ).delete()
617616
dag = dagbag.get_dag('example_branch_operator')
618617
task = dag.get_task(task_id='run_this_first')
619618

620619
ti = TI(task, DEFAULT_DATE, State.RUNNING)
621-
lj = LJ(ti)
622-
lj.state = State.SHUTDOWN
623-
lj.id = 1
624-
ti.job_id = lj.id
625620

626-
session.add(lj)
627621
session.add(ti)
628622
session.commit()
629623

630-
dagbag.kill_zombies()
624+
zombies = [SimpleTaskInstance(ti)]
625+
dagbag.kill_zombies(zombies)
631626
mock_ti_handle_failure \
632627
.assert_called_with(ANY,
633628
conf.getboolean('core', 'unit_test_mode'),
634629
ANY)
635630

636-
@patch.object(TI, 'handle_failure')
637-
def test_kill_zombie_when_job_received_no_heartbeat(self, mock_ti_handle_failure):
638-
"""
639-
Test that kill zombies calls TI's failure handler with proper context
640-
"""
641-
zombie_threshold_secs = (
642-
conf.getint('scheduler', 'scheduler_zombie_task_threshold'))
643-
dagbag = models.DagBag(dag_folder=self.empty_dir, include_examples=True)
644-
with create_session() as session:
645-
session.query(TI).delete()
646-
session.query(LJ).delete()
647-
dag = dagbag.get_dag('example_branch_operator')
648-
task = dag.get_task(task_id='run_this_first')
649-
650-
ti = TI(task, DEFAULT_DATE, State.RUNNING)
651-
lj = LJ(ti)
652-
lj.latest_heartbeat = utcnow() - timedelta(seconds=zombie_threshold_secs)
653-
lj.state = State.RUNNING
654-
lj.id = 1
655-
ti.job_id = lj.id
656-
657-
session.add(lj)
658-
session.add(ti)
659-
session.commit()
660-
661-
dagbag.kill_zombies()
662-
mock_ti_handle_failure \
663-
.assert_called_with(ANY,
664-
conf.getboolean('core', 'unit_test_mode'),
665-
ANY)
666-
667-
@patch.object(TI, 'handle_failure')
668-
def test_kill_zombies_doesn_nothing(self, mock_ti_handle_failure):
669-
"""
670-
Test that kill zombies does nothing when job is running and received heartbeat
671-
"""
672-
dagbag = models.DagBag(dag_folder=self.empty_dir, include_examples=True)
673-
with create_session() as session:
674-
session.query(TI).delete()
675-
session.query(LJ).delete()
676-
dag = dagbag.get_dag('example_branch_operator')
677-
task = dag.get_task(task_id='run_this_first')
678-
679-
ti = TI(task, DEFAULT_DATE, State.RUNNING)
680-
lj = LJ(ti)
681-
lj.latest_heartbeat = utcnow()
682-
lj.state = State.RUNNING
683-
lj.id = 1
684-
ti.job_id = lj.id
685-
686-
session.add(lj)
687-
session.add(ti)
688-
session.commit()
689-
690-
dagbag.kill_zombies()
691-
mock_ti_handle_failure.assert_not_called()
692-
693631
def test_deactivate_unknown_dags(self):
694632
"""
695633
Test that dag_ids not passed into deactivate_unknown_dags

0 commit comments

Comments
 (0)