diff --git a/UPDATING.md b/UPDATING.md
index e03243f552f4f..2bb4134cde99f 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -186,6 +186,9 @@ indefinitely. This is only available on the command line.
 #### min_file_process_interval
 After how much time should an updated DAG be picked up from the filesystem.
 
+#### min_file_parsing_loop_time
+How many seconds to wait between file-parsing loops to prevent the logs from being spammed.
+
 #### dag_dir_list_interval
 How often the scheduler should relist the contents of the DAG directory. If you experience that while developing your dags are not being picked up, have a look at this number and decrease it when necessary.
 
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 5356af79b61d6..d000e1a29707d 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -372,6 +372,9 @@ run_duration = -1
 # after how much time a new DAGs should be picked up from the filesystem
 min_file_process_interval = 0
 
+# How many seconds to wait between file-parsing loops to prevent the logs from being spammed.
+min_file_parsing_loop_time = 1
+
 dag_dir_list_interval = 300
 
 # How often should stats be printed to the logs
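How the scheduler consumes the new option is shown in the airflow/jobs.py hunks that follow. For orientation, a minimal sketch of reading it directly, assuming the Airflow 1.x-era module-level helpers in `airflow.configuration` and the defaults shipped above:

```python
# Minimal sketch, not part of this patch: reading the new [scheduler] option.
# Assumes the Airflow 1.x-era module-level helpers in airflow.configuration.
from airflow import configuration

configuration.load_test_config()  # use the bundled defaults for illustration

# With the default_airflow.cfg above, this returns 1 (second).
loop_time = configuration.getint('scheduler', 'min_file_parsing_loop_time')
print(loop_time)
```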
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 5a2ab5ba350e8..67bb827268dac 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -512,7 +512,8 @@ def __init__(
             num_runs=-1,
             file_process_interval=conf.getint('scheduler',
                                               'min_file_process_interval'),
-            processor_poll_interval=1.0,
+            min_file_parsing_loop_time=conf.getint('scheduler',
+                                                   'min_file_parsing_loop_time'),
             run_duration=None,
             do_pickle=False,
             log=None,
@@ -527,8 +528,6 @@ def __init__(
         :type subdir: unicode
         :param num_runs: The number of times to try to schedule each DAG file.
             -1 for unlimited within the run_duration.
-        :param processor_poll_interval: The number of seconds to wait between
-            polls of running processors
         :param run_duration: how long to run (in seconds) before exiting
         :type run_duration: int
         :param do_pickle: once a DAG object is obtained by executing the Python
@@ -545,7 +544,6 @@ def __init__(
 
         self.num_runs = num_runs
         self.run_duration = run_duration
-        self._processor_poll_interval = processor_poll_interval
         self.do_pickle = do_pickle
         super(SchedulerJob, self).__init__(*args, **kwargs)
 
@@ -574,6 +572,10 @@ def __init__(
         # to 3 minutes.
         self.file_process_interval = file_process_interval
 
+        # Wait until at least this many seconds have passed before parsing files once all
+        # files have finished parsing.
+        self.min_file_parsing_loop_time = min_file_parsing_loop_time
+
         self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
         if run_duration is None:
             self.run_duration = conf.getint('scheduler',
@@ -1521,11 +1523,16 @@ def _execute(self):
         # DAGs in parallel. By processing them in separate processes,
         # we can get parallelism and isolation from potentially harmful
         # user code.
-        self.log.info("Processing files using up to %s processes at a time", self.max_threads)
+        self.log.info("Processing files using up to %s processes at a time",
+                      self.max_threads)
         self.log.info("Running execute loop for %s seconds", self.run_duration)
         self.log.info("Processing each file at most %s times", self.num_runs)
-        self.log.info("Process each file at most once every %s seconds", self.file_process_interval)
-        self.log.info("Checking for new files in %s every %s seconds", self.subdir, self.dag_dir_list_interval)
+        self.log.info("Process each file at most once every %s seconds",
+                      self.file_process_interval)
+        self.log.info("Wait until at least %s seconds have passed between file parsing "
+                      "loops", self.min_file_parsing_loop_time)
+        self.log.info("Checking for new files in %s every %s seconds",
+                      self.subdir, self.dag_dir_list_interval)
 
         # Build up a list of Python files that could contain DAGs
         self.log.info("Searching for files in %s", self.subdir)
@@ -1541,6 +1548,7 @@ def processor_factory(file_path):
                                                known_file_paths,
                                                self.max_threads,
                                                self.file_process_interval,
+                                               self.min_file_parsing_loop_time,
                                                self.num_runs,
                                                processor_factory)
 
@@ -1684,13 +1692,13 @@ def _execute_helper(self, processor_manager):
                 last_stat_print_time = timezone.utcnow()
 
             loop_end_time = time.time()
-            self.log.debug("Ran scheduling loop in %.2f seconds", loop_end_time - loop_start_time)
-            self.log.debug("Sleeping for %.2f seconds", self._processor_poll_interval)
-            time.sleep(self._processor_poll_interval)
+            self.log.debug("Ran scheduling loop in %.2f seconds",
+                           loop_end_time - loop_start_time)
 
             # Exit early for a test mode
             if processor_manager.max_runs_reached():
-                self.log.info("Exiting loop as all files have been processed %s times", self.num_runs)
+                self.log.info("Exiting loop as all files have been processed %s times",
+                              self.num_runs)
                 break
 
             # Stop any processors
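With `processor_poll_interval` removed from the constructor, callers that previously tuned the fixed per-iteration sleep now set the loop floor instead. A hypothetical construction sketch; the DAG id is a stand-in, and `run()` is only indicated since it needs a working metadata database:

```python
# Hypothetical sketch, not part of this patch: constructing a SchedulerJob
# after this change. 'example_dag' is a stand-in DAG id.
from airflow.jobs import SchedulerJob

job = SchedulerJob(dag_id='example_dag',
                   num_runs=1,
                   # Replaces the removed processor_poll_interval: the file
                   # processor manager now enforces this floor between loops.
                   min_file_parsing_loop_time=2)
# job.run()  # would start scheduling; requires an initialized metadata DB
```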
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index b4abd34fa7bb9..ed2bafc07a827 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -304,6 +304,7 @@ def __init__(self,
                  file_paths,
                  parallelism,
                  process_file_interval,
+                 min_file_parsing_loop_time,
                  max_runs,
                  processor_factory):
         """
@@ -317,6 +318,9 @@ def __init__(self,
         :param process_file_interval: process a file at most once every this
             many seconds
         :type process_file_interval: float
+        :param min_file_parsing_loop_time: wait until at least this many seconds have
+            passed before parsing files once all files have finished parsing.
+        :type min_file_parsing_loop_time: float
         :param max_runs: The number of times to parse and schedule each file. -1
             for unlimited.
         :type max_runs: int
@@ -332,6 +336,7 @@ def __init__(self,
         self._dag_directory = dag_directory
         self._max_runs = max_runs
         self._process_file_interval = process_file_interval
+        self._min_file_parsing_loop_time = min_file_parsing_loop_time
         self._processor_factory = processor_factory
         # Map from file path to the processor
         self._processors = {}
@@ -502,12 +507,24 @@ def heartbeat(self):
         file_paths_in_progress = self._processors.keys()
         now = timezone.utcnow()
         file_paths_recently_processed = []
+
+        longest_parse_duration = 0
         for file_path in self._file_paths:
             last_finish_time = self.get_last_finish_time(file_path)
-            if (last_finish_time is not None and
-                (now - last_finish_time).total_seconds() <
-                    self._process_file_interval):
-                file_paths_recently_processed.append(file_path)
+            if last_finish_time is not None:
+                duration = now - last_finish_time
+                longest_parse_duration = max(duration.total_seconds(),
+                                             longest_parse_duration)
+                if duration.total_seconds() < self._process_file_interval:
+                    file_paths_recently_processed.append(file_path)
+
+        sleep_length = max(self._min_file_parsing_loop_time - longest_parse_duration,
+                           0)
+        if sleep_length > 0:
+            self.log.debug("Sleeping for %.2f seconds to prevent excessive "
+                           "logging",
+                           sleep_length)
+            time.sleep(sleep_length)
 
         files_paths_at_run_limit = [file_path
                                     for file_path, num_runs in self._run_count.items()
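The core of the change is the throttle at the end of `heartbeat()`: a fast pass over all files is padded up to the configured floor, while a slow pass adds no extra delay. A self-contained sketch of that arithmetic with illustrative numbers:

```python
# Self-contained sketch of the new heartbeat throttle, not part of this patch.
import time


def loop_sleep(min_file_parsing_loop_time, longest_parse_duration):
    """Sleep only for whatever remains of the minimum loop time."""
    sleep_length = max(min_file_parsing_loop_time - longest_parse_duration, 0)
    if sleep_length > 0:
        time.sleep(sleep_length)
    return sleep_length


print(loop_sleep(1.0, 0.25))  # ~0.75: a fast loop is padded up to 1 second
print(loop_sleep(1.0, 3.00))  # 0: a slow loop is never delayed further
```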
diff --git a/tests/core.py b/tests/core.py
index 45a6e3472fe21..a2887a8240a3d 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -108,10 +108,7 @@ def execute(*args, **kwargs):
 
 
 class CoreTest(unittest.TestCase):
-    # These defaults make the test faster to run
-    default_scheduler_args = {"file_process_interval": 0,
-                              "processor_poll_interval": 0.5,
-                              "num_runs": 1}
+    default_scheduler_args = {"num_runs": 1}
 
     def setUp(self):
         configuration.load_test_config()
diff --git a/tests/jobs.py b/tests/jobs.py
index ace593a032af6..d721d380877af 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -868,11 +868,11 @@ def test_mark_success_no_kill(self):
         session = settings.Session()
 
         dag.clear()
-        dr = dag.create_dagrun(run_id="test",
-                               state=State.RUNNING,
-                               execution_date=DEFAULT_DATE,
-                               start_date=DEFAULT_DATE,
-                               session=session)
+        dag.create_dagrun(run_id="test",
+                          state=State.RUNNING,
+                          execution_date=DEFAULT_DATE,
+                          start_date=DEFAULT_DATE,
+                          session=session)
         ti = TI(task=task, execution_date=DEFAULT_DATE)
         ti.refresh_from_db()
         job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True)
@@ -932,9 +932,6 @@ def test_localtaskjob_double_trigger(self):
 
 
 class SchedulerJobTest(unittest.TestCase):
-    # These defaults make the test faster to run
-    default_scheduler_args = {"file_process_interval": 0,
-                              "processor_poll_interval": 0.5}
 
     def setUp(self):
         self.dagbag = DagBag()
@@ -977,7 +974,7 @@ def test_process_executor_events(self):
         dagbag1 = self._make_simple_dag_bag([dag])
         dagbag2 = self._make_simple_dag_bag([dag2])
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
 
         ti1 = TI(task1, DEFAULT_DATE)
@@ -1017,7 +1014,7 @@ def test_execute_task_instances_is_paused_wont_execute(self):
         task1 = DummyOperator(dag=dag, task_id=task_id_1)
         dagbag = self._make_simple_dag_bag([dag])
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
 
         dr1 = scheduler.create_dag_run(dag)
@@ -1047,10 +1044,10 @@ def test_execute_task_instances_no_dagrun_task_will_execute(self):
         task1 = DummyOperator(dag=dag, task_id=task_id_1)
         dagbag = self._make_simple_dag_bag([dag])
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
 
-        dr1 = scheduler.create_dag_run(dag)
+        scheduler.create_dag_run(dag)
         ti1 = TI(task1, DEFAULT_DATE)
         ti1.state = State.SCHEDULED
         ti1.execution_date = ti1.execution_date + datetime.timedelta(days=1)
@@ -1072,7 +1069,7 @@ def test_execute_task_instances_backfill_tasks_wont_execute(self):
         task1 = DummyOperator(dag=dag, task_id=task_id_1)
         dagbag = self._make_simple_dag_bag([dag])
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
 
         dr1 = scheduler.create_dag_run(dag)
@@ -1097,7 +1094,7 @@ def test_find_executable_task_instances_backfill_nodagrun(self):
         task1 = DummyOperator(dag=dag, task_id=task_id_1)
         dagbag = self._make_simple_dag_bag([dag])
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
 
         dr1 = scheduler.create_dag_run(dag)
@@ -1137,7 +1134,7 @@ def test_find_executable_task_instances_pool(self):
         task2 = DummyOperator(dag=dag, task_id=task_id_2, pool='b')
         dagbag = self._make_simple_dag_bag([dag])
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
 
         dr1 = scheduler.create_dag_run(dag)
@@ -1178,7 +1175,7 @@ def test_nonexistent_pool(self):
         task = DummyOperator(dag=dag, task_id=task_id, pool="this_pool_doesnt_exist")
         dagbag = self._make_simple_dag_bag([dag])
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
 
         dr = scheduler.create_dag_run(dag)
@@ -1199,13 +1196,13 @@ def test_find_executable_task_instances_none(self):
         dag_id = 'SchedulerJobTest.test_find_executable_task_instances_none'
         task_id_1 = 'dummy'
         dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16)
-        task1 = DummyOperator(dag=dag, task_id=task_id_1)
+        DummyOperator(dag=dag, task_id=task_id_1)
         dagbag = self._make_simple_dag_bag([dag])
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
 
-        dr1 = scheduler.create_dag_run(dag)
+        scheduler.create_dag_run(dag)
         session.commit()
 
         self.assertEqual(0, len(scheduler._find_executable_task_instances(
@@ -1220,7 +1217,7 @@ def test_find_executable_task_instances_concurrency(self):
         task1 = DummyOperator(dag=dag, task_id=task_id_1)
         dagbag = self._make_simple_dag_bag([dag])
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
 
         dr1 = scheduler.create_dag_run(dag)
@@ -1268,7 +1265,7 @@ def test_find_executable_task_instances_task_concurrency(self):
         task2 = DummyOperator(dag=dag, task_id=task_id_2)
         dagbag = self._make_simple_dag_bag([dag])
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
 
         dr1 = scheduler.create_dag_run(dag)
@@ -1352,9 +1349,10 @@ def test_find_executable_task_instances_task_concurrency(self):
         self.assertEqual(1, len(res))
 
     def test_change_state_for_executable_task_instances_no_tis(self):
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
-        res = scheduler._change_state_for_executable_task_instances([], [State.NONE], session)
+        res = scheduler._change_state_for_executable_task_instances(
+            [], [State.NONE], session)
         self.assertEqual(0, len(res))
 
@@ -1362,9 +1360,9 @@ def test_change_state_for_executable_task_instances_no_tis_with_state(self):
         task_id_1 = 'dummy'
         dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=2)
         task1 = DummyOperator(dag=dag, task_id=task_id_1)
-        dagbag = self._make_simple_dag_bag([dag])
+        self._make_simple_dag_bag([dag])
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
 
         dr1 = scheduler.create_dag_run(dag)
@@ -1394,9 +1392,9 @@ def test_change_state_for_executable_task_instances_none_state(self):
         task_id_1 = 'dummy'
         dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=2)
         task1 = DummyOperator(dag=dag, task_id=task_id_1)
-        dagbag = self._make_simple_dag_bag([dag])
+        self._make_simple_dag_bag([dag])
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
 
         dr1 = scheduler.create_dag_run(dag)
@@ -1432,7 +1430,7 @@ def test_enqueue_task_instances_with_queued_state(self):
         task1 = DummyOperator(dag=dag, task_id=task_id_1)
         dagbag = self._make_simple_dag_bag([dag])
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
 
         dr1 = scheduler.create_dag_run(dag)
@@ -1453,7 +1451,7 @@ def test_execute_task_instances_nothing(self):
         task1 = DummyOperator(dag=dag, task_id=task_id_1)
         dagbag = SimpleDagBag([])
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
 
         dr1 = scheduler.create_dag_run(dag)
@@ -1477,7 +1475,7 @@ def test_execute_task_instances(self):
         task2 = DummyOperator(dag=dag, task_id=task_id_2)
         dagbag = self._make_simple_dag_bag([dag])
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
 
         # create first dag run with 1 running and 1 queued
@@ -1538,7 +1536,7 @@ def test_execute_task_instances_limit(self):
         task2 = DummyOperator(dag=dag, task_id=task_id_2)
         dagbag = self._make_simple_dag_bag([dag])
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         scheduler.max_tis_per_query = 3
         session = settings.Session()
 
@@ -1714,7 +1712,7 @@ def evaluate_dagrun(
         if run_kwargs is None:
             run_kwargs = {}
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         dag = self.dagbag.get_dag(dag_id)
         dag.clear()
         dr = scheduler.create_dag_run(dag)
@@ -1784,7 +1782,7 @@ def test_dagrun_root_fail_unfinished(self):
         DagRuns with one unfinished and one failed root task -> RUNNING
         """
         # Run both the failed and successful tasks
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         dag_id = 'test_dagrun_states_root_fail_unfinished'
         dag = self.dagbag.get_dag(dag_id)
         dag.clear()
@@ -1848,8 +1846,7 @@ def test_scheduler_start_date(self):
         self.assertTrue(dag.start_date > DEFAULT_DATE)
 
         scheduler = SchedulerJob(dag_id,
-                                 num_runs=2,
-                                 **self.default_scheduler_args)
+                                 num_runs=2)
         scheduler.run()
 
         # zero tasks ran
@@ -1873,8 +1870,7 @@ def test_scheduler_start_date(self):
             len(session.query(TI).filter(TI.dag_id == dag_id).all()), 1)
 
         scheduler = SchedulerJob(dag_id,
-                                 num_runs=2,
-                                 **self.default_scheduler_args)
+                                 num_runs=2)
         scheduler.run()
 
         # still one task
@@ -1892,8 +1888,6 @@ def test_scheduler_multiprocessing(self):
             dag.clear()
 
         scheduler = SchedulerJob(dag_ids=dag_ids,
-                                 file_process_interval=0,
-                                 processor_poll_interval=0.5,
                                  num_runs=2)
         scheduler.run()
 
@@ -2411,8 +2405,7 @@ def test_scheduler_sla_miss_callback(self):
 
         # Now call manage_slas and see if the sla_miss callback gets called
         scheduler = SchedulerJob(dag_id='test_sla_miss',
-                                 num_runs=1,
-                                 **self.default_scheduler_args)
+                                 num_runs=1)
         scheduler.manage_slas(dag=dag, session=session)
 
         sla_callback.assert_not_called()
@@ -2533,8 +2526,7 @@ def test_scheduler_run_duration(self):
         expected_run_duration = 5
         start_time = timezone.utcnow()
 
         scheduler = SchedulerJob(dag_id,
-                                 run_duration=expected_run_duration,
-                                 **self.default_scheduler_args)
+                                 run_duration=expected_run_duration)
         scheduler.run()
         end_time = timezone.utcnow()
 
@@ -2563,9 +2555,8 @@ def test_dag_with_system_exit(self):
         dag.clear()
 
         scheduler = SchedulerJob(dag_ids=dag_ids,
-                                 subdir= dag_directory,
-                                 num_runs=1,
-                                 **self.default_scheduler_args)
+                                 subdir=dag_directory,
+                                 num_runs=1)
         scheduler.run()
         session = settings.Session()
         self.assertEqual(
@@ -2907,17 +2898,18 @@ def test_list_py_file_paths(self):
 
     def test_reset_orphaned_tasks_nothing(self):
         """Try with nothing. """
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
-        self.assertEqual(0, len(scheduler.reset_state_for_orphaned_tasks(session=session)))
+        self.assertEqual(
+            0, len(scheduler.reset_state_for_orphaned_tasks(session=session)))
 
     def test_reset_orphaned_tasks_external_triggered_dag(self):
         dag_id = 'test_reset_orphaned_tasks_external_triggered_dag'
         dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily')
         task_id = dag_id + '_task'
-        task = DummyOperator(task_id=task_id, dag=dag)
+        DummyOperator(task_id=task_id, dag=dag)
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
 
         dr1 = scheduler.create_dag_run(dag, session=session)
@@ -2936,9 +2928,9 @@ def test_reset_orphaned_tasks_backfill_dag(self):
         dag_id = 'test_reset_orphaned_tasks_backfill_dag'
         dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily')
         task_id = dag_id + '_task'
-        task = DummyOperator(task_id=task_id, dag=dag)
+        DummyOperator(task_id=task_id, dag=dag)
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
 
         dr1 = scheduler.create_dag_run(dag, session=session)
@@ -2958,9 +2950,9 @@ def test_reset_orphaned_tasks_specified_dagrun(self):
         dag_id = 'test_reset_orphaned_tasks_specified_dagrun'
         dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily')
         task_id = dag_id + '_task'
-        task = DummyOperator(task_id=task_id, dag=dag)
+        DummyOperator(task_id=task_id, dag=dag)
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
         # make two dagruns, only reset for one
         dr1 = scheduler.create_dag_run(dag)
@@ -2992,7 +2984,7 @@ def test_reset_orphaned_tasks_nonexistent_dagrun(self):
         task_id = dag_id + '_task'
         task = DummyOperator(task_id=task_id, dag=dag)
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
 
         ti = models.TaskInstance(task=task, execution_date=DEFAULT_DATE)
@@ -3010,9 +3002,9 @@ def test_reset_orphaned_tasks_no_orphans(self):
         dag_id = 'test_reset_orphaned_tasks_no_orphans'
         dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily')
         task_id = dag_id + '_task'
-        task = DummyOperator(task_id=task_id, dag=dag)
+        DummyOperator(task_id=task_id, dag=dag)
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
 
         dr1 = scheduler.create_dag_run(dag)
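Because `min_file_parsing_loop_time` is now a required constructor argument of `DagFileProcessorManager`, every caller has to supply it, as the updated tests in tests/utils/test_dag_processing.py below show. A minimal construction sketch, assuming the `mock` backport import style used in this test suite:

```python
# Minimal sketch mirroring the updated tests below, not part of this patch.
from mock import MagicMock  # assumes the mock backport used by this suite

from airflow.utils.dag_processing import DagFileProcessorManager

manager = DagFileProcessorManager(
    dag_directory='directory',
    file_paths=['abc.txt'],
    parallelism=1,
    process_file_interval=1,
    max_runs=1,
    min_file_parsing_loop_time=0,  # 0 disables the inter-loop sleep in tests
    processor_factory=MagicMock().return_value)
```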
@@ -3032,9 +3024,9 @@ def test_reset_orphaned_tasks_non_running_dagruns(self):
         dag_id = 'test_reset_orphaned_tasks_non_running_dagruns'
         dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily')
         task_id = dag_id + '_task'
-        task = DummyOperator(task_id=task_id, dag=dag)
+        DummyOperator(task_id=task_id, dag=dag)
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
         session = settings.Session()
 
         dr1 = scheduler.create_dag_run(dag)
@@ -3063,7 +3055,7 @@ def test_reset_orphaned_tasks_with_orphans(self):
             task = DummyOperator(task_id=task_id, dag=dag)
             tasks.append(task)
 
-        scheduler = SchedulerJob(**self.default_scheduler_args)
+        scheduler = SchedulerJob()
 
         session = settings.Session()
diff --git a/tests/utils/test_dag_processing.py b/tests/utils/test_dag_processing.py
index 2b60cd0ca7649..f784f0457da2c 100644
--- a/tests/utils/test_dag_processing.py
+++ b/tests/utils/test_dag_processing.py
@@ -21,9 +21,14 @@ class TestDagFileProcessorManager(unittest.TestCase):
     def test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self):
-        manager = DagFileProcessorManager(dag_directory='directory', file_paths=['abc.txt'],
-                                          parallelism=1, process_file_interval=1,
-                                          max_runs=1, processor_factory=MagicMock().return_value)
+        manager = DagFileProcessorManager(
+            dag_directory='directory',
+            file_paths=['abc.txt'],
+            parallelism=1,
+            process_file_interval=1,
+            max_runs=1,
+            min_file_parsing_loop_time=0,
+            processor_factory=MagicMock().return_value)
 
         mock_processor = MagicMock()
         mock_processor.stop.side_effect = AttributeError(
             'DagFileProcessor object has no attribute stop')
@@ -36,9 +41,14 @@ def test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self):
         self.assertDictEqual(manager._processors, {})
 
     def test_set_file_paths_when_processor_file_path_is_in_new_file_paths(self):
-        manager = DagFileProcessorManager(dag_directory='directory', file_paths=['abc.txt'],
-                                          parallelism=1, process_file_interval=1,
-                                          max_runs=1, processor_factory=MagicMock().return_value)
+        manager = DagFileProcessorManager(
+            dag_directory='directory',
+            file_paths=['abc.txt'],
+            parallelism=1,
+            process_file_interval=1,
+            max_runs=1,
+            min_file_parsing_loop_time=0,
+            processor_factory=MagicMock().return_value)
 
         mock_processor = MagicMock()
         mock_processor.stop.side_effect = AttributeError(