diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index e3358a3e1dd2b..106f1cdca6795 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -949,6 +949,12 @@ def set_file_paths(self, new_file_paths): Stats.decr("dag_processing.processes") processor.terminate() self._file_stats.pop(file_path) + + to_remove = set(self._file_stats.keys()) - set(self._file_paths) + for key in to_remove: + # Remove the stats for any dag files that don't exist anymore + del self._file_stats[key] + self._processors = filtered_processors def wait_until_finished(self): @@ -1064,6 +1070,7 @@ def prepare_file_path_queue(self): is_mtime_mode = list_mode == "modified_time" file_paths_recently_processed = [] + file_paths_to_stop_watching = set() for file_path in self._file_paths: if is_mtime_mode: @@ -1071,6 +1078,8 @@ def prepare_file_path_queue(self): files_with_mtime[file_path] = os.path.getmtime(file_path) except FileNotFoundError: self.log.warning("Skipping processing of missing file: %s", file_path) + self._file_stats.pop(file_path, None) + file_paths_to_stop_watching.add(file_path) continue file_modified_time = timezone.make_aware(datetime.fromtimestamp(files_with_mtime[file_path])) else: @@ -1099,12 +1108,18 @@ def prepare_file_path_queue(self): # set of files. Since we set the seed, the sort order will remain same per host random.Random(get_hostname()).shuffle(file_paths) + if file_paths_to_stop_watching: + self.set_file_paths( + [path for path in self._file_paths if path not in file_paths_to_stop_watching] + ) + files_paths_at_run_limit = [ file_path for file_path, stat in self._file_stats.items() if stat.run_count == self._max_runs ] file_paths_to_exclude = set(file_paths_in_progress).union( - file_paths_recently_processed, files_paths_at_run_limit + file_paths_recently_processed, + files_paths_at_run_limit, ) # Do not convert the following list to set as set does not preserve the order @@ -1122,12 +1137,11 @@ def prepare_file_path_queue(self): self.log.debug("Queuing the following files for processing:\n\t%s", "\n\t".join(files_paths_to_queue)) + default = DagFileStat( + num_dags=0, import_errors=0, last_finish_time=None, last_duration=None, run_count=0 + ) for file_path in files_paths_to_queue: - if file_path not in self._file_stats: - self._file_stats[file_path] = DagFileStat( - num_dags=0, import_errors=0, last_finish_time=None, last_duration=None, run_count=0 - ) - + self._file_stats.setdefault(file_path, default) self._file_path_queue.extend(files_paths_to_queue) def _kill_timed_out_processors(self): @@ -1153,6 +1167,15 @@ def _kill_timed_out_processors(self): self.waitables.pop(processor.waitable_handle) processors_to_remove.append(file_path) + stat = DagFileStat( + num_dags=0, + import_errors=1, + last_finish_time=now, + last_duration=duration, + run_count=self.get_run_count(file_path) + 1, + ) + self._file_stats[processor.file_path] = stat + # Clean up `self._processors` after iterating over it for proc in processors_to_remove: self._processors.pop(proc) diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py index 7df0e78840cd0..2e5c0c2345c5b 100644 --- a/tests/dag_processing/test_manager.py +++ b/tests/dag_processing/test_manager.py @@ -257,6 +257,7 @@ def test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self): manager.set_file_paths(["abc.txt"]) assert manager._processors == {} + assert "missing_file.txt" not in manager._file_stats def test_set_file_paths_when_processor_file_path_is_in_new_file_paths(self): manager = DagFileProcessorManager(