Skip to content

Commit b0ec871

Browse files
KevinYang21 authored and ashb committed
[AIRFLOW-4797] Use same zombies in all DAG file processors
(cherry picked from commit cb0dbe3)
1 parent 8af58da commit b0ec871

File tree

4 files changed

+105
-18
lines changed

4 files changed

+105
-18
lines changed

airflow/models/dagbag.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@
3030
from collections import namedtuple
3131
from datetime import datetime
3232

33+
from croniter import CroniterBadCronError, CroniterBadDateError, CroniterNotAlphaError, croniter
3334
import six
34-
from croniter import croniter, CroniterBadCronError, CroniterBadDateError, CroniterNotAlphaError
3535

3636
from airflow import settings
3737
from airflow.configuration import conf

airflow/utils/dag_processing.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
from setproctitle import setproctitle
4141
import six
4242
from six.moves import reload_module
43+
from sqlalchemy import or_
4344
from tabulate import tabulate
4445

4546
# To avoid circular imports
@@ -54,7 +55,6 @@
5455
from airflow.utils.db import provide_session
5556
from airflow.utils.log.logging_mixin import LoggingMixin
5657
from airflow.utils.state import State
57-
from sqlalchemy import or_
5858

5959
if six.PY2:
6060
ConnectionError = IOError
@@ -776,13 +776,14 @@ def __init__(self,
776776
# Map from file path to stats about the file
777777
self._file_stats = {} # type: dict(str, DagFileStat)
778778

779-
self._last_zombie_query_time = timezone.utcnow()
779+
self._last_zombie_query_time = None
780780
# Last time that the DAG dir was traversed to look for files
781781
self.last_dag_dir_refresh_time = timezone.utcnow()
782782
# Last time stats were printed
783783
self.last_stat_print_time = timezone.datetime(2000, 1, 1)
784784
# TODO: Remove magic number
785785
self._zombie_query_interval = 10
786+
self._zombies = []
786787
# How long to wait before timing out a process to parse a DAG file
787788
self._processor_timeout = processor_timeout
788789

@@ -852,6 +853,7 @@ def start(self):
852853
continue
853854

854855
self._refresh_dag_dir()
856+
self._find_zombies()
855857

856858
simple_dags = self.heartbeat()
857859
for simple_dag in simple_dags:
@@ -1247,13 +1249,11 @@ def heartbeat(self):
12471249

12481250
self._file_path_queue.extend(files_paths_to_queue)
12491251

1250-
zombies = self._find_zombies()
1251-
12521252
# Start more processors if we have enough slots and files to process
12531253
while (self._parallelism - len(self._processors) > 0 and
12541254
len(self._file_path_queue) > 0):
12551255
file_path = self._file_path_queue.pop(0)
1256-
processor = self._processor_factory(file_path, zombies)
1256+
processor = self._processor_factory(file_path, self._zombies)
12571257
Stats.incr('dag_processing.processes')
12581258

12591259
processor.start()
@@ -1271,13 +1271,13 @@ def heartbeat(self):
12711271
@provide_session
12721272
def _find_zombies(self, session):
12731273
"""
1274-
Find zombie task instances, which are tasks haven't heartbeated for too long.
1275-
:return: Zombie task instances in SimpleTaskInstance format.
1274+
Find zombie task instances, which are tasks haven't heartbeated for too long
1275+
and update the current zombie list.
12761276
"""
12771277
now = timezone.utcnow()
12781278
zombies = []
1279-
if (now - self._last_zombie_query_time).total_seconds() \
1280-
> self._zombie_query_interval:
1279+
if not self._last_zombie_query_time or \
1280+
(now - self._last_zombie_query_time).total_seconds() > self._zombie_query_interval:
12811281
# to avoid circular imports
12821282
from airflow.jobs import LocalTaskJob as LJ
12831283
self.log.info("Finding 'running' jobs without a recent heartbeat")
@@ -1305,7 +1305,7 @@ def _find_zombies(self, session):
13051305
sti.dag_id, sti.task_id, sti.execution_date.isoformat())
13061306
zombies.append(sti)
13071307

1308-
return zombies
1308+
self._zombies = zombies
13091309

13101310
def _kill_timed_out_processors(self):
13111311
"""

tests/models/test_dagbag.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,13 @@
1717
# specific language governing permissions and limitations
1818
# under the License.
1919

20-
from datetime import datetime
2120
import inspect
2221
import os
2322
import shutil
2423
import textwrap
2524
import unittest
26-
from tempfile import mkdtemp, NamedTemporaryFile
25+
from datetime import datetime
26+
from tempfile import NamedTemporaryFile, mkdtemp
2727

2828
from mock import patch, ANY
2929

tests/utils/test_dag_processing.py

Lines changed: 92 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,16 @@
2727
from mock import MagicMock, PropertyMock
2828

2929
from airflow.configuration import conf, mkdir_p
30-
from airflow.jobs import DagFileProcessor
31-
from airflow.jobs import LocalTaskJob as LJ
30+
from airflow.jobs import DagFileProcessor, LocalTaskJob as LJ
3231
from airflow.models import DagBag, TaskInstance as TI
3332
from airflow.utils import timezone
34-
from airflow.utils.dag_processing import (DagFileProcessorAgent, DagFileProcessorManager,
35-
DagFileStat, SimpleTaskInstance, correct_maybe_zipped)
33+
from airflow.utils.dag_processing import (
34+
DagFileProcessorAgent, DagFileProcessorManager, DagFileStat, SimpleTaskInstance, correct_maybe_zipped,
35+
)
3636
from airflow.utils.db import create_session
3737
from airflow.utils.state import State
38+
from tests.test_utils.config import conf_vars
39+
from tests.test_utils.db import clear_db_runs
3840

3941
TEST_DAG_FOLDER = os.path.join(
4042
os.path.dirname(os.path.realpath(__file__)), os.pardir, 'dags')
@@ -133,6 +135,9 @@ def __exit__(self, *exc_info):
133135

134136

135137
class TestDagFileProcessorManager(unittest.TestCase):
138+
def setUp(self):
139+
clear_db_runs()
140+
136141
def test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self):
137142
manager = DagFileProcessorManager(
138143
dag_directory='directory',
@@ -202,7 +207,8 @@ def test_find_zombies(self):
202207

203208
manager._last_zombie_query_time = timezone.utcnow() - timedelta(
204209
seconds=manager._zombie_threshold_secs + 1)
205-
zombies = manager._find_zombies()
210+
manager._find_zombies()
211+
zombies = manager._zombies
206212
self.assertEqual(1, len(zombies))
207213
self.assertIsInstance(zombies[0], SimpleTaskInstance)
208214
self.assertEqual(ti.dag_id, zombies[0].dag_id)
@@ -212,6 +218,87 @@ def test_find_zombies(self):
212218
session.query(TI).delete()
213219
session.query(LJ).delete()
214220

221+
def test_zombies_are_correctly_passed_to_dag_file_processor(self):
222+
"""
223+
Check that the same set of zombies are passed to the dag
224+
file processors until the next zombie detection logic is invoked.
225+
"""
226+
with conf_vars({('scheduler', 'max_threads'): '1',
227+
('core', 'load_examples'): 'False'}):
228+
dagbag = DagBag(os.path.join(TEST_DAG_FOLDER, 'test_example_bash_operator.py'))
229+
with create_session() as session:
230+
session.query(LJ).delete()
231+
dag = dagbag.get_dag('test_example_bash_operator')
232+
task = dag.get_task(task_id='run_this_last')
233+
234+
ti = TI(task, DEFAULT_DATE, State.RUNNING)
235+
lj = LJ(ti)
236+
lj.state = State.SHUTDOWN
237+
lj.id = 1
238+
ti.job_id = lj.id
239+
240+
session.add(lj)
241+
session.add(ti)
242+
session.commit()
243+
fake_zombies = [SimpleTaskInstance(ti)]
244+
245+
class FakeDagFIleProcessor(DagFileProcessor):
246+
# This fake processor will return the zombies it received in constructor
247+
# as its processing result w/o actually parsing anything.
248+
def __init__(self, file_path, pickle_dags, dag_id_white_list, zombies):
249+
super(FakeDagFIleProcessor, self).__init__(
250+
file_path, pickle_dags, dag_id_white_list, zombies
251+
)
252+
253+
self._result = zombies, 0
254+
255+
def start(self):
256+
pass
257+
258+
@property
259+
def start_time(self):
260+
return DEFAULT_DATE
261+
262+
@property
263+
def pid(self):
264+
return 1234
265+
266+
@property
267+
def done(self):
268+
return True
269+
270+
@property
271+
def result(self):
272+
return self._result
273+
274+
def processor_factory(file_path, zombies):
275+
return FakeDagFIleProcessor(file_path,
276+
False,
277+
[],
278+
zombies)
279+
280+
test_dag_path = os.path.join(TEST_DAG_FOLDER,
281+
'test_example_bash_operator.py')
282+
async_mode = 'sqlite' not in conf.get('core', 'sql_alchemy_conn')
283+
processor_agent = DagFileProcessorAgent(test_dag_path,
284+
[],
285+
1,
286+
processor_factory,
287+
timedelta.max,
288+
async_mode)
289+
processor_agent.start()
290+
parsing_result = []
291+
if not async_mode:
292+
processor_agent.heartbeat()
293+
while not processor_agent.done:
294+
if not async_mode:
295+
processor_agent.wait_until_finished()
296+
parsing_result.extend(processor_agent.harvest_simple_dags())
297+
298+
self.assertEqual(len(fake_zombies), len(parsing_result))
299+
self.assertEqual(set([zombie.key for zombie in fake_zombies]),
300+
set([result.key for result in parsing_result]))
301+
215302
@mock.patch("airflow.jobs.DagFileProcessor.pid", new_callable=PropertyMock)
216303
@mock.patch("airflow.jobs.DagFileProcessor.kill")
217304
def test_kill_timed_out_processors_kill(self, mock_kill, mock_pid):

0 commit comments

Comments (0)