3 changes: 3 additions & 0 deletions UPDATING.md
@@ -186,6 +186,9 @@ indefinitely. This is only available on the command line.
#### min_file_process_interval
How much time should pass before an updated DAG is picked up from the filesystem.

#### min_file_parsing_loop_time
How many seconds to wait between file-parsing loops to prevent the logs from being spammed.

#### dag_dir_list_interval
How often the scheduler should relist the contents of the DAG directory. If you find that your
dags are not being picked up while you are developing them, have a look at this number and decrease it if necessary.
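
For reference, here is a minimal sketch of how the scheduler reads these three options; it uses the same `conf.getint` calls that appear in `jobs.py`, and assumes a standard install where the keys live under the `[scheduler]` section of `airflow.cfg`.

```python
# Minimal sketch (assumptions noted above): read the three scheduler knobs.
from airflow import configuration as conf

min_file_process_interval = conf.getint('scheduler', 'min_file_process_interval')
min_file_parsing_loop_time = conf.getint('scheduler', 'min_file_parsing_loop_time')
dag_dir_list_interval = conf.getint('scheduler', 'dag_dir_list_interval')

print("process interval: %ss, parsing loop time: %ss, dir list interval: %ss"
      % (min_file_process_interval, min_file_parsing_loop_time, dag_dir_list_interval))
```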
3 changes: 3 additions & 0 deletions airflow/config_templates/default_airflow.cfg
@@ -372,6 +372,9 @@ run_duration = -1
# after how much time a new DAG should be picked up from the filesystem
min_file_process_interval = 0

# How many seconds to wait between file-parsing loops to prevent the logs from being spammed.
min_file_parsing_loop_time = 1

dag_dir_list_interval = 300

# How often should stats be printed to the logs
30 changes: 19 additions & 11 deletions airflow/jobs.py
@@ -512,7 +512,8 @@ def __init__(
num_runs=-1,
file_process_interval=conf.getint('scheduler',
'min_file_process_interval'),
processor_poll_interval=1.0,
min_file_parsing_loop_time=conf.getint('scheduler',
'min_file_parsing_loop_time'),
run_duration=None,
do_pickle=False,
log=None,
@@ -527,8 +528,6 @@
:type subdir: unicode
:param num_runs: The number of times to try to schedule each DAG file.
-1 for unlimited within the run_duration.
:param processor_poll_interval: The number of seconds to wait between
polls of running processors
:param run_duration: how long to run (in seconds) before exiting
:type run_duration: int
:param do_pickle: once a DAG object is obtained by executing the Python
@@ -545,7 +544,6 @@

self.num_runs = num_runs
self.run_duration = run_duration
self._processor_poll_interval = processor_poll_interval

self.do_pickle = do_pickle
super(SchedulerJob, self).__init__(*args, **kwargs)
@@ -574,6 +572,10 @@ def __init__(
# to 3 minutes.
self.file_process_interval = file_process_interval

# Wait until at least this many seconds have passed before parsing files once all
# files have finished parsing.
self.min_file_parsing_loop_time = min_file_parsing_loop_time

self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
if run_duration is None:
self.run_duration = conf.getint('scheduler',
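
Putting the constructor changes together, `min_file_parsing_loop_time` becomes an ordinary keyword argument on `SchedulerJob` that defaults to the config value. A hypothetical usage sketch follows; the DAGs path and the numbers are made up, and `run()` is inherited from `BaseJob`.

```python
# Hypothetical sketch: start a scheduler with the new throttle set explicitly.
from airflow.jobs import SchedulerJob

job = SchedulerJob(
    subdir='/path/to/dags',          # made-up DAGs folder
    num_runs=1,                      # parse and schedule each file once
    file_process_interval=0,         # re-process files as soon as allowed
    min_file_parsing_loop_time=5,    # wait at least 5s between parsing loops
)
job.run()  # run() comes from BaseJob
```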
@@ -1521,11 +1523,16 @@ def _execute(self):
# DAGs in parallel. By processing them in separate processes,
# we can get parallelism and isolation from potentially harmful
# user code.
self.log.info("Processing files using up to %s processes at a time", self.max_threads)
self.log.info("Processing files using up to %s processes at a time",
self.max_threads)
self.log.info("Running execute loop for %s seconds", self.run_duration)
self.log.info("Processing each file at most %s times", self.num_runs)
self.log.info("Process each file at most once every %s seconds", self.file_process_interval)
self.log.info("Checking for new files in %s every %s seconds", self.subdir, self.dag_dir_list_interval)
self.log.info("Process each file at most once every %s seconds",
self.file_process_interval)
self.log.info("Wait until at least %s seconds have passed between file parsing "
"loops", self.min_file_parsing_loop_time)
self.log.info("Checking for new files in %s every %s seconds",
self.subdir, self.dag_dir_list_interval)

# Build up a list of Python files that could contain DAGs
self.log.info("Searching for files in %s", self.subdir)
@@ -1541,6 +1548,7 @@ def processor_factory(file_path):
known_file_paths,
self.max_threads,
self.file_process_interval,
self.min_file_parsing_loop_time,
self.num_runs,
processor_factory)

@@ -1684,13 +1692,13 @@ def _execute_helper(self, processor_manager):
last_stat_print_time = timezone.utcnow()

loop_end_time = time.time()
self.log.debug("Ran scheduling loop in %.2f seconds", loop_end_time - loop_start_time)
self.log.debug("Sleeping for %.2f seconds", self._processor_poll_interval)
time.sleep(self._processor_poll_interval)
self.log.debug("Ran scheduling loop in %.2f seconds",
loop_end_time - loop_start_time)

# Exit early for a test mode
if processor_manager.max_runs_reached():
self.log.info("Exiting loop as all files have been processed %s times", self.num_runs)
self.log.info("Exiting loop as all files have been processed %s times",
self.num_runs)
break

# Stop any processors
25 changes: 21 additions & 4 deletions airflow/utils/dag_processing.py
@@ -304,6 +304,7 @@ def __init__(self,
file_paths,
parallelism,
process_file_interval,
min_file_parsing_loop_time,
max_runs,
processor_factory):
"""
@@ -317,6 +318,9 @@
:param process_file_interval: process a file at most once every this
many seconds
:type process_file_interval: float
:param min_file_parsing_loop_time: wait until at least this many seconds have
passed before parsing files once all files have finished parsing.
:type min_file_parsing_loop_time: float
:param max_runs: The number of times to parse and schedule each file. -1
for unlimited.
:type max_runs: int
@@ -332,6 +336,7 @@
self._dag_directory = dag_directory
self._max_runs = max_runs
self._process_file_interval = process_file_interval
self._min_file_parsing_loop_time = min_file_parsing_loop_time
self._processor_factory = processor_factory
# Map from file path to the processor
self._processors = {}
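
For clarity, here is a hypothetical sketch of wiring `DagFileProcessorManager` with the new argument, using the parameter names visible above; the paths are placeholders and the factory is a stub (the scheduler's real factory returns a `DagFileProcessor`).

```python
# Hypothetical wiring sketch; paths and values are placeholders.
from airflow.utils.dag_processing import DagFileProcessorManager

def processor_factory(file_path):
    # Stub for illustration; the scheduler's real factory builds a DagFileProcessor.
    raise NotImplementedError(file_path)

manager = DagFileProcessorManager(
    dag_directory='/path/to/dags',
    file_paths=['/path/to/dags/example_dag.py'],
    parallelism=2,                     # how many files to process concurrently
    process_file_interval=30,          # seconds before the same file is reparsed
    min_file_parsing_loop_time=1,      # the new lower bound on a full parsing loop
    max_runs=-1,                       # unlimited
    processor_factory=processor_factory)
```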
@@ -502,12 +507,24 @@ def heartbeat(self):
file_paths_in_progress = self._processors.keys()
now = timezone.utcnow()
file_paths_recently_processed = []

longest_parse_duration = 0
for file_path in self._file_paths:
last_finish_time = self.get_last_finish_time(file_path)
if (last_finish_time is not None and
(now - last_finish_time).total_seconds() <
self._process_file_interval):
file_paths_recently_processed.append(file_path)
if last_finish_time is not None:
duration = now - last_finish_time
longest_parse_duration = max(duration.total_seconds(),
longest_parse_duration)
if duration.total_seconds() < self._process_file_interval:
file_paths_recently_processed.append(file_path)

sleep_length = max(self._min_file_parsing_loop_time - longest_parse_duration,
0)
if sleep_length > 0:
self.log.debug("Sleeping for %.2f seconds to prevent excessive "
"logging",
sleep_length)
time.sleep(sleep_length)

files_paths_at_run_limit = [file_path
for file_path, num_runs in self._run_count.items()
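
The heart of the change is the sleep computation above: `heartbeat()` now measures the longest time since any file last finished parsing and sleeps away whatever remains of `min_file_parsing_loop_time`. Below is a self-contained restatement of that arithmetic with made-up timestamps.

```python
import time
from datetime import datetime, timedelta

def compute_sleep_length(last_finish_times, now, min_file_parsing_loop_time):
    """Mirror of the throttle logic: find the longest gap since a file last
    finished parsing, then sleep for whatever is left of the minimum loop time."""
    longest_parse_duration = 0.0
    for last_finish_time in last_finish_times:
        if last_finish_time is not None:
            duration = (now - last_finish_time).total_seconds()
            longest_parse_duration = max(duration, longest_parse_duration)
    return max(min_file_parsing_loop_time - longest_parse_duration, 0)

# Made-up example: two files finished 0.2s and 0.7s ago. With a 1-second
# minimum loop time, the longest gap is 0.7s, so the scheduler sleeps ~0.3s.
now = datetime.utcnow()
finish_times = [now - timedelta(seconds=0.2), now - timedelta(seconds=0.7)]
sleep_length = compute_sleep_length(finish_times, now, min_file_parsing_loop_time=1.0)
if sleep_length > 0:
    time.sleep(sleep_length)
```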
5 changes: 1 addition & 4 deletions tests/core.py
@@ -108,10 +108,7 @@ def execute(*args, **kwargs):


class CoreTest(unittest.TestCase):
# These defaults make the test faster to run
default_scheduler_args = {"file_process_interval": 0,
"processor_poll_interval": 0.5,
"num_runs": 1}
default_scheduler_args = {"num_runs": 1}

def setUp(self):
configuration.load_test_config()