Merged
35 changes: 29 additions & 6 deletions airflow/dag_processing/manager.py
@@ -949,6 +949,12 @@ def set_file_paths(self, new_file_paths):
Stats.decr("dag_processing.processes")
processor.terminate()
self._file_stats.pop(file_path)

to_remove = set(self._file_stats.keys()) - set(self._file_paths)
for key in to_remove:
# Remove the stats for any dag files that don't exist anymore
del self._file_stats[key]

self._processors = filtered_processors

def wait_until_finished(self):
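As a reading aid, here is a minimal, self-contained sketch of the stale-stat cleanup added in this hunk. The StatsPruner class and the example paths are hypothetical; only the set-difference logic mirrors the diff.

# Hypothetical, simplified stand-in for the manager's bookkeeping: prune
# per-file stats for any path that is no longer in the watched file list.
class StatsPruner:
    def __init__(self, file_paths, file_stats):
        self._file_paths = list(file_paths)
        self._file_stats = dict(file_stats)

    def set_file_paths(self, new_file_paths):
        self._file_paths = list(new_file_paths)
        # Remove the stats for any dag files that don't exist anymore.
        to_remove = set(self._file_stats.keys()) - set(self._file_paths)
        for key in to_remove:
            del self._file_stats[key]


pruner = StatsPruner(["a.py", "b.py"], {"a.py": "stat-a", "b.py": "stat-b"})
pruner.set_file_paths(["a.py"])
assert "b.py" not in pruner._file_stats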
@@ -1064,13 +1070,16 @@ def prepare_file_path_queue(self):
is_mtime_mode = list_mode == "modified_time"

file_paths_recently_processed = []
file_paths_to_stop_watching = set()
for file_path in self._file_paths:

if is_mtime_mode:
try:
files_with_mtime[file_path] = os.path.getmtime(file_path)
except FileNotFoundError:
self.log.warning("Skipping processing of missing file: %s", file_path)
self._file_stats.pop(file_path, None)
file_paths_to_stop_watching.add(file_path)
continue
file_modified_time = timezone.make_aware(datetime.fromtimestamp(files_with_mtime[file_path]))
else:
@@ -1099,12 +1108,18 @@ def prepare_file_path_queue(self):
# set of files. Since we set the seed, the sort order will remain same per host
random.Random(get_hostname()).shuffle(file_paths)

if file_paths_to_stop_watching:
self.set_file_paths(
[path for path in self._file_paths if path not in file_paths_to_stop_watching]
)

files_paths_at_run_limit = [
file_path for file_path, stat in self._file_stats.items() if stat.run_count == self._max_runs
]

file_paths_to_exclude = set(file_paths_in_progress).union(
file_paths_recently_processed, files_paths_at_run_limit
file_paths_recently_processed,
files_paths_at_run_limit,
)

# Do not convert the following list to set as set does not preserve the order
@@ -1122,12 +1137,11 @@ def prepare_file_path_queue(self):

self.log.debug("Queuing the following files for processing:\n\t%s", "\n\t".join(files_paths_to_queue))

default = DagFileStat(
num_dags=0, import_errors=0, last_finish_time=None, last_duration=None, run_count=0
)
for file_path in files_paths_to_queue:
if file_path not in self._file_stats:
self._file_stats[file_path] = DagFileStat(
num_dags=0, import_errors=0, last_finish_time=None, last_duration=None, run_count=0
)

self._file_stats.setdefault(file_path, default)
self._file_path_queue.extend(files_paths_to_queue)

def _kill_timed_out_processors(self):
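A hedged sketch of the missing-file handling added to prepare_file_path_queue above: files whose mtime lookup raises FileNotFoundError are dropped from the stats and from the watched list. The helper below is illustrative only and is not part of Airflow.

import os
import tempfile

def split_watchable(file_paths):
    """Return (mtimes, missing) for a list of candidate dag file paths."""
    mtimes = {}
    missing = set()
    for path in file_paths:
        try:
            mtimes[path] = os.path.getmtime(path)
        except FileNotFoundError:
            # Stop watching files that have disappeared from disk.
            missing.add(path)
    return mtimes, missing


with tempfile.TemporaryDirectory() as d:
    missing_path = os.path.join(d, "gone.py")
    mtimes, missing = split_watchable([missing_path])
    assert missing == {missing_path} and mtimes == {}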
@@ -1153,6 +1167,15 @@ def _kill_timed_out_processors(self):
self.waitables.pop(processor.waitable_handle)
processors_to_remove.append(file_path)

stat = DagFileStat(
num_dags=0,
import_errors=1,
last_finish_time=now,
last_duration=duration,
run_count=self.get_run_count(file_path) + 1,
)
self._file_stats[processor.file_path] = stat

# Clean up `self._processors` after iterating over it
for proc in processors_to_remove:
self._processors.pop(proc)
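For context, a sketch of what the new timeout bookkeeping records, assuming DagFileStat carries the five fields shown in the hunk above; the namedtuple stand-in and helper function below are illustrative, not Airflow's actual definitions.

from collections import namedtuple
from datetime import datetime, timedelta, timezone

# Stand-in mirroring the five DagFileStat fields used in the hunk above.
DagFileStat = namedtuple(
    "DagFileStat",
    ["num_dags", "import_errors", "last_finish_time", "last_duration", "run_count"],
)

def stat_for_timed_out_parse(previous_run_count, start_time, now):
    # A killed, timed-out parse is recorded as one import error and still
    # counts as a run, so it shows up in stats and respects run limits.
    return DagFileStat(
        num_dags=0,
        import_errors=1,
        last_finish_time=now,
        last_duration=(now - start_time).total_seconds(),
        run_count=previous_run_count + 1,
    )


now = datetime.now(timezone.utc)
stat = stat_for_timed_out_parse(2, now - timedelta(seconds=50), now)
assert stat.import_errors == 1 and stat.run_count == 3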
1 change: 1 addition & 0 deletions tests/dag_processing/test_manager.py
@@ -257,6 +257,7 @@ def test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self):

manager.set_file_paths(["abc.txt"])
assert manager._processors == {}
assert "missing_file.txt" not in manager._file_stats

def test_set_file_paths_when_processor_file_path_is_in_new_file_paths(self):
manager = DagFileProcessorManager(