From 19c8c592dfd40e0ac7c7ab871e5382fc27bd299e Mon Sep 17 00:00:00 2001 From: Kevin Yang Date: Sat, 24 Aug 2019 09:53:26 -0700 Subject: [PATCH 1/2] Revert "[AIRFLOW-4797] Improve performance and behaviour of zombie detection (#5511)" This reverts commit 2bdb053db618de7064b527e6e3ebe29f220d857b. --- airflow/jobs/scheduler_job.py | 26 ++++++++--- airflow/models/dagbag.py | 56 ++++++++++------------- airflow/utils/dag_processing.py | 52 ++++++++++++++++++++- tests/models/test_dagbag.py | 73 +++--------------------------- tests/utils/test_dag_processing.py | 71 +++++++++++++++++++++++------ 5 files changed, 156 insertions(+), 122 deletions(-) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 6ce6cb498f888..1cfb34fc25b34 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -62,18 +62,21 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin): :type pickle_dags: bool :param dag_id_white_list: If specified, only look at these DAG ID's :type dag_id_white_list: list[unicode] + :param zombies: zombie task instances to kill + :type zombies: list[airflow.utils.dag_processing.SimpleTaskInstance] """ # Counter that increments every time an instance of this class is created class_creation_counter = 0 - def __init__(self, file_path, pickle_dags, dag_id_white_list): + def __init__(self, file_path, pickle_dags, dag_id_white_list, zombies): self._file_path = file_path # The process that was launched to process the given . self._process = None self._dag_id_white_list = dag_id_white_list self._pickle_dags = pickle_dags + self._zombies = zombies # The result of Scheduler.process_file(file_path). self._result = None # Whether the process is done running. @@ -94,7 +97,8 @@ def _run_file_processor(result_channel, file_path, pickle_dags, dag_id_white_list, - thread_name): + thread_name, + zombies): """ Process the given file. @@ -112,6 +116,8 @@ def _run_file_processor(result_channel, :type thread_name: unicode :return: the process that was launched :rtype: multiprocessing.Process + :param zombies: zombie task instances to kill + :type zombies: list[airflow.utils.dag_processing.SimpleTaskInstance] """ # This helper runs in the newly created process log = logging.getLogger("airflow.processor") @@ -139,7 +145,9 @@ def _run_file_processor(result_channel, log.info("Started process (PID=%s) to work on %s", os.getpid(), file_path) scheduler_job = SchedulerJob(dag_ids=dag_id_white_list, log=log) - result = scheduler_job.process_file(file_path, pickle_dags) + result = scheduler_job.process_file(file_path, + zombies, + pickle_dags) result_channel.send(result) end_time = time.time() log.info( @@ -170,6 +178,7 @@ def start(self): self._pickle_dags, self._dag_id_white_list, "DagFileProcessor{}".format(self._instance_id), + self._zombies ), name="DagFileProcessor{}-Process".format(self._instance_id) ) @@ -1285,10 +1294,11 @@ def _execute(self): known_file_paths = list_py_file_paths(self.subdir) self.log.info("There are %s files in %s", len(known_file_paths), self.subdir) - def processor_factory(file_path): + def processor_factory(file_path, zombies): return DagFileProcessor(file_path, pickle_dags, - self.dag_ids) + self.dag_ids, + zombies) # When using sqlite, we do not use async_mode # so the scheduler job and DAG parser don't access the DB at the same time. 
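To make the hand-off in the hunks above concrete: the change threads a pre-computed zombie list from the DagFileProcessorManager into every DagFileProcessor it launches (via processor_factory(file_path, zombies)), and the processor forwards it through SchedulerJob.process_file(file_path, zombies, pickle_dags) down to dagbag.kill_zombies(zombies). The sketch below is not Airflow code; it uses stand-in classes purely to illustrate that contract.

from dataclasses import dataclass
from typing import List


@dataclass
class SimpleTaskInstanceStub:
    # Stand-in for airflow.utils.dag_processing.SimpleTaskInstance.
    dag_id: str
    task_id: str
    execution_date: str


class FileProcessorStub:
    # Stand-in for DagFileProcessor: it only records what it was given.
    def __init__(self, file_path: str, zombies: List[SimpleTaskInstanceStub]):
        self.file_path = file_path
        self.zombies = zombies

    def start(self):
        # The real processor forwards `zombies` to
        # SchedulerJob.process_file(file_path, zombies, pickle_dags),
        # which ends in dagbag.kill_zombies(zombies).
        print("processing %s with %d known zombies" % (self.file_path, len(self.zombies)))


def heartbeat(file_paths, zombies):
    # The manager computes `zombies` once per detection interval and hands
    # the same list to every processor started in this heartbeat.
    processors = [FileProcessorStub(path, zombies) for path in file_paths]
    for processor in processors:
        processor.start()
    return processors


heartbeat(["dags/a.py", "dags/b.py"],
          [SimpleTaskInstanceStub("example_dag", "stuck_task", "2019-09-07T00:00:00")])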
@@ -1465,7 +1475,7 @@ def _find_dags_to_process(self, dags: List[DAG], paused_dag_ids: Set[str]): return dags @provide_session - def process_file(self, file_path, pickle_dags=False, session=None): + def process_file(self, file_path, zombies, pickle_dags=False, session=None): """ Process a Python file containing Airflow DAGs. @@ -1484,6 +1494,8 @@ def process_file(self, file_path, pickle_dags=False, session=None): :param file_path: the path to the Python file that should be executed :type file_path: unicode + :param zombies: zombie task instances to kill. + :type zombies: list[airflow.utils.dag_processing.SimpleTaskInstance] :param pickle_dags: whether serialize the DAGs found in the file and save them to the db :type pickle_dags: bool @@ -1567,7 +1579,7 @@ def process_file(self, file_path, pickle_dags=False, session=None): except Exception: self.log.exception("Error logging import errors!") try: - dagbag.kill_zombies() + dagbag.kill_zombies(zombies) except Exception: self.log.exception("Error killing zombies!") diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index b09f47f095ec2..64195fbb9b192 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -25,7 +25,7 @@ import textwrap import zipfile from collections import namedtuple -from datetime import datetime, timedelta +from datetime import datetime from croniter import CroniterBadCronError, CroniterBadDateError, CroniterNotAlphaError, croniter from sqlalchemy import or_ @@ -41,7 +41,6 @@ from airflow.utils.db import provide_session from airflow.utils.helpers import pprinttable from airflow.utils.log.logging_mixin import LoggingMixin -from airflow.utils.state import State from airflow.utils.timeout import timeout @@ -273,43 +272,34 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True): return found_dags @provide_session - def kill_zombies(self, session=None): + def kill_zombies(self, zombies, session=None): """ - Fail zombie tasks, which are tasks that haven't + Fail given zombie tasks, which are tasks that haven't had a heartbeat for too long, in the current DagBag. + :param zombies: zombie task instances to kill. + :type zombies: airflow.utils.dag_processing.SimpleTaskInstance :param session: DB session. :type session: sqlalchemy.orm.session.Session """ - # Avoid circular import - from airflow.models.taskinstance import TaskInstance as TI - from airflow.jobs import LocalTaskJob as LJ - - # How many seconds do we wait for tasks to heartbeat before mark them as zombies. 
- limit_dttm = timezone.utcnow() - timedelta(seconds=self.SCHEDULER_ZOMBIE_TASK_THRESHOLD) - self.log.debug("Failing jobs without heartbeat after %s", limit_dttm) - - tis = ( - session.query(TI) - .join(LJ, TI.job_id == LJ.id) - .filter(TI.state == State.RUNNING) - .filter(TI.dag_id.in_(self.dags)) - .filter( - or_( - LJ.state != State.RUNNING, - LJ.latest_heartbeat < limit_dttm, - ) - ).all() - ) - for ti in tis: - self.log.info("Detected zombie job with dag_id %s, task_id %s, and execution date %s", - ti.dag_id, ti.task_id, ti.execution_date.isoformat()) - ti.test_mode = self.UNIT_TEST_MODE - ti.task = self.dags[ti.dag_id].get_task(ti.task_id) - ti.handle_failure("{} detected as zombie".format(ti), - ti.test_mode, ti.get_template_context()) - self.log.info('Marked zombie job %s as %s', ti, ti.state) - Stats.incr('zombies_killed') + from airflow.models.taskinstance import TaskInstance # Avoid circular import + + for zombie in zombies: + if zombie.dag_id in self.dags: + dag = self.dags[zombie.dag_id] + if zombie.task_id in dag.task_ids: + task = dag.get_task(zombie.task_id) + ti = TaskInstance(task, zombie.execution_date) + # Get properties needed for failure handling from SimpleTaskInstance. + ti.start_date = zombie.start_date + ti.end_date = zombie.end_date + ti.try_number = zombie.try_number + ti.state = zombie.state + ti.test_mode = self.UNIT_TEST_MODE + ti.handle_failure("{} detected as zombie".format(ti), + ti.test_mode, ti.get_template_context()) + self.log.info('Marked zombie job %s as %s', ti, ti.state) + Stats.incr('zombies_killed') session.commit() def bag_dag(self, dag, parent_dag, root_dag): diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py index aadbac9d4d69e..c0d98dfe211a5 100644 --- a/airflow/utils/dag_processing.py +++ b/airflow/utils/dag_processing.py @@ -28,7 +28,9 @@ import time import zipfile from abc import ABCMeta, abstractmethod -from datetime import datetime +from collections import defaultdict +from collections import namedtuple +from datetime import datetime, timedelta from importlib import import_module from typing import Iterable, NamedTuple, Optional @@ -47,6 +49,8 @@ from airflow.utils.db import provide_session from airflow.utils.helpers import reap_process_group from airflow.utils.log.logging_mixin import LoggingMixin +from airflow.utils.state import State +from sqlalchemy import or_ class SimpleDag(BaseDag): @@ -751,6 +755,9 @@ def __init__(self, # 30 seconds. self.print_stats_interval = conf.getint('scheduler', 'print_stats_interval') + # How many seconds do we wait for tasks to heartbeat before mark them as zombies. + self._zombie_threshold_secs = ( + conf.getint('scheduler', 'scheduler_zombie_task_threshold')) # Map from file path to the processor self._processors = {} @@ -1230,11 +1237,13 @@ def heartbeat(self): self._file_path_queue.extend(files_paths_to_queue) + zombies = self._find_zombies() + # Start more processors if we have enough slots and files to process while (self._parallelism - len(self._processors) > 0 and len(self._file_path_queue) > 0): file_path = self._file_path_queue.pop(0) - processor = self._processor_factory(file_path) + processor = self._processor_factory(file_path, zombies) Stats.incr('dag_processing.processes') processor.start() @@ -1249,6 +1258,45 @@ def heartbeat(self): return simple_dags + @provide_session + def _find_zombies(self, session): + """ + Find zombie task instances, which are tasks haven't heartbeated for too long. + :return: Zombie task instances in SimpleTaskInstance format. 
+ """ + now = timezone.utcnow() + zombies = [] + if (now - self._last_zombie_query_time).total_seconds() \ + > self._zombie_query_interval: + # to avoid circular imports + from airflow.jobs import LocalTaskJob as LJ + self.log.info("Finding 'running' jobs without a recent heartbeat") + TI = airflow.models.TaskInstance + limit_dttm = timezone.utcnow() - timedelta( + seconds=self._zombie_threshold_secs) + self.log.info("Failing jobs without heartbeat after %s", limit_dttm) + + tis = ( + session.query(TI) + .join(LJ, TI.job_id == LJ.id) + .filter(TI.state == State.RUNNING) + .filter( + or_( + LJ.state != State.RUNNING, + LJ.latest_heartbeat < limit_dttm, + ) + ).all() + ) + self._last_zombie_query_time = timezone.utcnow() + for ti in tis: + sti = SimpleTaskInstance(ti) + self.log.info( + "Detected zombie job with dag_id %s, task_id %s, and execution date %s", + sti.dag_id, sti.task_id, sti.execution_date.isoformat()) + zombies.append(sti) + + return zombies + def _kill_timed_out_processors(self): """ Kill any file processors that timeout to defend against process hangs. diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py index e3ab7874495a4..99ba9473199c9 100644 --- a/tests/models/test_dagbag.py +++ b/tests/models/test_dagbag.py @@ -17,6 +17,7 @@ # specific language governing permissions and limitations # under the License. +from datetime import datetime, timezone import inspect import os import shutil @@ -29,6 +30,7 @@ import airflow.example_dags from airflow import models from airflow.configuration import conf +from airflow.utils.dag_processing import SimpleTaskInstance from airflow.jobs import LocalTaskJob as LJ from airflow.models import DagBag, DagModel, TaskInstance as TI from airflow.utils.db import create_session @@ -605,92 +607,29 @@ def test_process_file_with_none(self): self.assertEqual([], dagbag.process_file(None)) @patch.object(TI, 'handle_failure') - def test_kill_zombies_when_job_state_is_not_running(self, mock_ti_handle_failure): + def test_kill_zombies(self, mock_ti_handle_failure): """ - Test that kill zombies calls TI's failure handler with proper context + Test that kill zombies call TIs failure handler with proper context """ dagbag = models.DagBag(dag_folder=self.empty_dir, include_examples=True) with create_session() as session: session.query(TI).delete() - session.query(LJ).delete() dag = dagbag.get_dag('example_branch_operator') task = dag.get_task(task_id='run_this_first') ti = TI(task, DEFAULT_DATE, State.RUNNING) - lj = LJ(ti) - lj.state = State.SHUTDOWN - lj.id = 1 - ti.job_id = lj.id - session.add(lj) session.add(ti) session.commit() - dagbag.kill_zombies() + zombies = [SimpleTaskInstance(ti)] + dagbag.kill_zombies(zombies) mock_ti_handle_failure.assert_called_once_with( ANY, conf.getboolean('core', 'unit_test_mode'), ANY ) - @patch.object(TI, 'handle_failure') - def test_kill_zombie_when_job_received_no_heartbeat(self, mock_ti_handle_failure): - """ - Test that kill zombies calls TI's failure handler with proper context - """ - zombie_threshold_secs = ( - conf.getint('scheduler', 'scheduler_zombie_task_threshold')) - dagbag = models.DagBag(dag_folder=self.empty_dir, include_examples=True) - with create_session() as session: - session.query(TI).delete() - session.query(LJ).delete() - dag = dagbag.get_dag('example_branch_operator') - task = dag.get_task(task_id='run_this_first') - - ti = TI(task, DEFAULT_DATE, State.RUNNING) - lj = LJ(ti) - lj.latest_heartbeat = utcnow() - timedelta(seconds=zombie_threshold_secs) - lj.state = State.RUNNING - 
lj.id = 1 - ti.job_id = lj.id - - session.add(lj) - session.add(ti) - session.commit() - - dagbag.kill_zombies() - mock_ti_handle_failure.assert_called_once_with( - ANY, - conf.getboolean('core', 'unit_test_mode'), - ANY - ) - - @patch.object(TI, 'handle_failure') - def test_kill_zombies_doesn_nothing(self, mock_ti_handle_failure): - """ - Test that kill zombies does nothing when job is running and received heartbeat - """ - dagbag = models.DagBag(dag_folder=self.empty_dir, include_examples=True) - with create_session() as session: - session.query(TI).delete() - session.query(LJ).delete() - dag = dagbag.get_dag('example_branch_operator') - task = dag.get_task(task_id='run_this_first') - - ti = TI(task, DEFAULT_DATE, State.RUNNING) - lj = LJ(ti) - lj.latest_heartbeat = utcnow() - lj.state = State.RUNNING - lj.id = 1 - ti.job_id = lj.id - - session.add(lj) - session.add(ti) - session.commit() - - dagbag.kill_zombies() - mock_ti_handle_failure.assert_not_called() - def test_deactivate_unknown_dags(self): """ Test that dag_ids not passed into deactivate_unknown_dags diff --git a/tests/utils/test_dag_processing.py b/tests/utils/test_dag_processing.py index b1e20ff52f479..ac5ade5d1c228 100644 --- a/tests/utils/test_dag_processing.py +++ b/tests/utils/test_dag_processing.py @@ -18,20 +18,24 @@ # under the License. import os -import pathlib import sys import tempfile import unittest from datetime import datetime, timedelta from unittest import mock -from unittest.mock import MagicMock, PropertyMock + +import pathlib +from unittest.mock import (MagicMock, PropertyMock) from airflow.configuration import conf from airflow.jobs import DagFileProcessor +from airflow.jobs import LocalTaskJob as LJ +from airflow.models import DagBag, TaskInstance as TI from airflow.utils import timezone -from airflow.utils.dag_processing import ( - DagFileProcessorAgent, DagFileProcessorManager, DagFileStat, correct_maybe_zipped, -) +from airflow.utils.dag_processing import (DagFileProcessorAgent, DagFileProcessorManager, DagFileStat, + SimpleTaskInstance, correct_maybe_zipped) +from airflow.utils.db import create_session +from airflow.utils.state import State TEST_DAG_FOLDER = os.path.join( os.path.dirname(os.path.realpath(__file__)), os.pardir, 'dags') @@ -171,6 +175,44 @@ def test_set_file_paths_when_processor_file_path_is_in_new_file_paths(self): manager.set_file_paths(['abc.txt']) self.assertDictEqual(manager._processors, {'abc.txt': mock_processor}) + def test_find_zombies(self): + manager = DagFileProcessorManager( + dag_directory='directory', + file_paths=['abc.txt'], + max_runs=1, + processor_factory=MagicMock().return_value, + processor_timeout=timedelta.max, + signal_conn=MagicMock(), + async_mode=True) + + dagbag = DagBag(TEST_DAG_FOLDER) + with create_session() as session: + session.query(LJ).delete() + dag = dagbag.get_dag('example_branch_operator') + task = dag.get_task(task_id='run_this_first') + + ti = TI(task, DEFAULT_DATE, State.RUNNING) + lj = LJ(ti) + lj.state = State.SHUTDOWN + lj.id = 1 + ti.job_id = lj.id + + session.add(lj) + session.add(ti) + session.commit() + + manager._last_zombie_query_time = timezone.utcnow() - timedelta( + seconds=manager._zombie_threshold_secs + 1) + zombies = manager._find_zombies() + self.assertEqual(1, len(zombies)) + self.assertIsInstance(zombies[0], SimpleTaskInstance) + self.assertEqual(ti.dag_id, zombies[0].dag_id) + self.assertEqual(ti.task_id, zombies[0].task_id) + self.assertEqual(ti.execution_date, zombies[0].execution_date) + + session.query(TI).delete() + 
session.query(LJ).delete() + @mock.patch("airflow.jobs.DagFileProcessor.pid", new_callable=PropertyMock) @mock.patch("airflow.jobs.DagFileProcessor.kill") def test_kill_timed_out_processors_kill(self, mock_kill, mock_pid): @@ -184,7 +226,7 @@ def test_kill_timed_out_processors_kill(self, mock_kill, mock_pid): signal_conn=MagicMock(), async_mode=True) - processor = DagFileProcessor('abc.txt', False, []) + processor = DagFileProcessor('abc.txt', False, [], []) processor._start_time = timezone.make_aware(datetime.min) manager._processors = {'abc.txt': processor} manager._kill_timed_out_processors() @@ -203,7 +245,7 @@ def test_kill_timed_out_processors_no_kill(self, mock_dag_file_processor, mock_p signal_conn=MagicMock(), async_mode=True) - processor = DagFileProcessor('abc.txt', False, []) + processor = DagFileProcessor('abc.txt', False, [], []) processor._start_time = timezone.make_aware(datetime.max) manager._processors = {'abc.txt': processor} manager._kill_timed_out_processors() @@ -230,10 +272,11 @@ class path, thus when reloading logging module the airflow.processor_manager with settings_context(SETTINGS_FILE_VALID): # Launch a process through DagFileProcessorAgent, which will try # reload the logging module. - def processor_factory(file_path): + def processor_factory(file_path, zombies): return DagFileProcessor(file_path, False, - []) + [], + zombies) test_dag_path = os.path.join(TEST_DAG_FOLDER, 'test_scheduler_dags.py') async_mode = 'sqlite' not in conf.get('core', 'sql_alchemy_conn') @@ -262,10 +305,11 @@ def processor_factory(file_path): self.assertFalse(os.path.isfile(log_file_loc)) def test_parse_once(self): - def processor_factory(file_path): + def processor_factory(file_path, zombies): return DagFileProcessor(file_path, False, - []) + [], + zombies) test_dag_path = os.path.join(TEST_DAG_FOLDER, 'test_scheduler_dags.py') async_mode = 'sqlite' not in conf.get('core', 'sql_alchemy_conn') @@ -288,10 +332,11 @@ def processor_factory(file_path): self.assertEqual(dag_ids.count('test_start_date_scheduling'), 1) def test_launch_process(self): - def processor_factory(file_path): + def processor_factory(file_path, zombies): return DagFileProcessor(file_path, False, - []) + [], + zombies) test_dag_path = os.path.join(TEST_DAG_FOLDER, 'test_scheduler_dags.py') async_mode = 'sqlite' not in conf.get('core', 'sql_alchemy_conn') From 4ad08eccef59fa62871849fc2a66ac3a6bcad50c Mon Sep 17 00:00:00 2001 From: Kevin Yang Date: Sat, 7 Sep 2019 01:51:15 -0700 Subject: [PATCH 2/2] [AIRFLOW-4797] Use same zombies in all DAG file processors --- airflow/models/dagbag.py | 1 - airflow/utils/dag_processing.py | 22 +++---- tests/models/test_dagbag.py | 7 +- tests/utils/test_dag_processing.py | 100 ++++++++++++++++++++++++++--- 4 files changed, 104 insertions(+), 26 deletions(-) diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index 64195fbb9b192..ff3dea32ba236 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -28,7 +28,6 @@ from datetime import datetime from croniter import CroniterBadCronError, CroniterBadDateError, CroniterNotAlphaError, croniter -from sqlalchemy import or_ from airflow import settings from airflow.configuration import conf diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py index c0d98dfe211a5..70254bb63a988 100644 --- a/airflow/utils/dag_processing.py +++ b/airflow/utils/dag_processing.py @@ -28,14 +28,13 @@ import time import zipfile from abc import ABCMeta, abstractmethod -from collections import defaultdict -from 
collections import namedtuple from datetime import datetime, timedelta from importlib import import_module from typing import Iterable, NamedTuple, Optional import psutil from setproctitle import setproctitle +from sqlalchemy import or_ from tabulate import tabulate # To avoid circular imports @@ -50,7 +49,6 @@ from airflow.utils.helpers import reap_process_group from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.state import State -from sqlalchemy import or_ class SimpleDag(BaseDag): @@ -766,13 +764,14 @@ def __init__(self, # Map from file path to stats about the file self._file_stats = {} # type: dict(str, DagFileStat) - self._last_zombie_query_time = timezone.utcnow() + self._last_zombie_query_time = None # Last time that the DAG dir was traversed to look for files self.last_dag_dir_refresh_time = timezone.utcnow() # Last time stats were printed self.last_stat_print_time = timezone.datetime(2000, 1, 1) # TODO: Remove magic number self._zombie_query_interval = 10 + self._zombies = [] # How long to wait before timing out a process to parse a DAG file self._processor_timeout = processor_timeout @@ -842,6 +841,7 @@ def start(self): continue self._refresh_dag_dir() + self._find_zombies() simple_dags = self.heartbeat() for simple_dag in simple_dags: @@ -1237,13 +1237,11 @@ def heartbeat(self): self._file_path_queue.extend(files_paths_to_queue) - zombies = self._find_zombies() - # Start more processors if we have enough slots and files to process while (self._parallelism - len(self._processors) > 0 and len(self._file_path_queue) > 0): file_path = self._file_path_queue.pop(0) - processor = self._processor_factory(file_path, zombies) + processor = self._processor_factory(file_path, self._zombies) Stats.incr('dag_processing.processes') processor.start() @@ -1261,13 +1259,13 @@ def heartbeat(self): @provide_session def _find_zombies(self, session): """ - Find zombie task instances, which are tasks haven't heartbeated for too long. - :return: Zombie task instances in SimpleTaskInstance format. + Find zombie task instances, which are tasks haven't heartbeated for too long + and update the current zombie list. """ now = timezone.utcnow() zombies = [] - if (now - self._last_zombie_query_time).total_seconds() \ - > self._zombie_query_interval: + if not self._last_zombie_query_time or \ + (now - self._last_zombie_query_time).total_seconds() > self._zombie_query_interval: # to avoid circular imports from airflow.jobs import LocalTaskJob as LJ self.log.info("Finding 'running' jobs without a recent heartbeat") @@ -1295,7 +1293,7 @@ def _find_zombies(self, session): sti.dag_id, sti.task_id, sti.execution_date.isoformat()) zombies.append(sti) - return zombies + self._zombies = zombies def _kill_timed_out_processors(self): """ diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py index 99ba9473199c9..67dd9f16955a9 100644 --- a/tests/models/test_dagbag.py +++ b/tests/models/test_dagbag.py @@ -17,25 +17,22 @@ # specific language governing permissions and limitations # under the License. 
-from datetime import datetime, timezone import inspect import os import shutil import textwrap import unittest -from datetime import datetime, timedelta, timezone +from datetime import datetime, timezone from tempfile import NamedTemporaryFile, mkdtemp from unittest.mock import ANY, patch import airflow.example_dags from airflow import models from airflow.configuration import conf -from airflow.utils.dag_processing import SimpleTaskInstance -from airflow.jobs import LocalTaskJob as LJ from airflow.models import DagBag, DagModel, TaskInstance as TI +from airflow.utils.dag_processing import SimpleTaskInstance from airflow.utils.db import create_session from airflow.utils.state import State -from airflow.utils.timezone import utcnow from tests.models import DEFAULT_DATE, TEST_DAGS_FOLDER from tests.test_utils.config import conf_vars diff --git a/tests/utils/test_dag_processing.py b/tests/utils/test_dag_processing.py index ac5ade5d1c228..c34810925d461 100644 --- a/tests/utils/test_dag_processing.py +++ b/tests/utils/test_dag_processing.py @@ -18,24 +18,25 @@ # under the License. import os +import pathlib import sys import tempfile import unittest from datetime import datetime, timedelta from unittest import mock - -import pathlib -from unittest.mock import (MagicMock, PropertyMock) +from unittest.mock import MagicMock, PropertyMock from airflow.configuration import conf -from airflow.jobs import DagFileProcessor -from airflow.jobs import LocalTaskJob as LJ +from airflow.jobs import DagFileProcessor, LocalTaskJob as LJ from airflow.models import DagBag, TaskInstance as TI from airflow.utils import timezone -from airflow.utils.dag_processing import (DagFileProcessorAgent, DagFileProcessorManager, DagFileStat, - SimpleTaskInstance, correct_maybe_zipped) +from airflow.utils.dag_processing import ( + DagFileProcessorAgent, DagFileProcessorManager, DagFileStat, SimpleTaskInstance, correct_maybe_zipped, +) from airflow.utils.db import create_session from airflow.utils.state import State +from tests.test_utils.config import conf_vars +from tests.test_utils.db import clear_db_runs TEST_DAG_FOLDER = os.path.join( os.path.dirname(os.path.realpath(__file__)), os.pardir, 'dags') @@ -134,6 +135,9 @@ def __exit__(self, *exc_info): class TestDagFileProcessorManager(unittest.TestCase): + def setUp(self): + clear_db_runs() + def test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self): manager = DagFileProcessorManager( dag_directory='directory', @@ -203,7 +207,8 @@ def test_find_zombies(self): manager._last_zombie_query_time = timezone.utcnow() - timedelta( seconds=manager._zombie_threshold_secs + 1) - zombies = manager._find_zombies() + manager._find_zombies() + zombies = manager._zombies self.assertEqual(1, len(zombies)) self.assertIsInstance(zombies[0], SimpleTaskInstance) self.assertEqual(ti.dag_id, zombies[0].dag_id) @@ -213,6 +218,85 @@ def test_find_zombies(self): session.query(TI).delete() session.query(LJ).delete() + def test_zombies_are_correctly_passed_to_dag_file_processor(self): + """ + Check that the same set of zombies are passed to the dag + file processors until the next zombie detection logic is invoked. 
+ """ + with conf_vars({('scheduler', 'max_threads'): '1', + ('core', 'load_examples'): 'False'}): + dagbag = DagBag(os.path.join(TEST_DAG_FOLDER, 'test_example_bash_operator.py')) + with create_session() as session: + session.query(LJ).delete() + dag = dagbag.get_dag('test_example_bash_operator') + task = dag.get_task(task_id='run_this_last') + + ti = TI(task, DEFAULT_DATE, State.RUNNING) + lj = LJ(ti) + lj.state = State.SHUTDOWN + lj.id = 1 + ti.job_id = lj.id + + session.add(lj) + session.add(ti) + session.commit() + fake_zombies = [SimpleTaskInstance(ti)] + + class FakeDagFIleProcessor(DagFileProcessor): + # This fake processor will return the zombies it received in constructor + # as its processing result w/o actually parsing anything. + def __init__(self, file_path, pickle_dags, dag_id_white_list, zombies): + super().__init__(file_path, pickle_dags, dag_id_white_list, zombies) + + self._result = zombies, 0 + + def start(self): + pass + + @property + def start_time(self): + return DEFAULT_DATE + + @property + def pid(self): + return 1234 + + @property + def done(self): + return True + + @property + def result(self): + return self._result + + def processor_factory(file_path, zombies): + return FakeDagFIleProcessor(file_path, + False, + [], + zombies) + + test_dag_path = os.path.join(TEST_DAG_FOLDER, + 'test_example_bash_operator.py') + async_mode = 'sqlite' not in conf.get('core', 'sql_alchemy_conn') + processor_agent = DagFileProcessorAgent(test_dag_path, + [], + 1, + processor_factory, + timedelta.max, + async_mode) + processor_agent.start() + parsing_result = [] + if not async_mode: + processor_agent.heartbeat() + while not processor_agent.done: + if not async_mode: + processor_agent.wait_until_finished() + parsing_result.extend(processor_agent.harvest_simple_dags()) + + self.assertEqual(len(fake_zombies), len(parsing_result)) + self.assertEqual(set([zombie.key for zombie in fake_zombies]), + set([result.key for result in parsing_result])) + @mock.patch("airflow.jobs.DagFileProcessor.pid", new_callable=PropertyMock) @mock.patch("airflow.jobs.DagFileProcessor.kill") def test_kill_timed_out_processors_kill(self, mock_kill, mock_pid):