From 045440e20c679bff193ffaf2f40c91988ce22ef6 Mon Sep 17 00:00:00 2001 From: Mateusz Henc Date: Mon, 4 Apr 2022 11:37:32 +0200 Subject: [PATCH] Fix starting dag processor when running as a daemon + move reading [scheduler]standalone_dag_processor outside of the loop See https://github.com/apache/airflow/pull/22305#discussion_r841004257 --- airflow/cli/commands/dag_processor_command.py | 1 + airflow/dag_processing/manager.py | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/airflow/cli/commands/dag_processor_command.py b/airflow/cli/commands/dag_processor_command.py index f842d6e36ca2a..a07035be506dc 100644 --- a/airflow/cli/commands/dag_processor_command.py +++ b/airflow/cli/commands/dag_processor_command.py @@ -71,6 +71,7 @@ def dag_processor(args): with ctx: try: manager.register_exit_signals() + manager.start() finally: manager.terminate() manager.end() diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index a47af36508f6c..b7a3d540a95e6 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -527,7 +527,7 @@ def _run_parsing_loop(self): self._refresh_dag_dir() self.prepare_file_path_queue() max_callbacks_per_loop = conf.getint("scheduler", "max_callbacks_per_loop") - + standalone_dag_processor = conf.getboolean("scheduler", "standalone_dag_processor") if self._async_mode: # If we're in async mode, we can start up straight away. If we're # in sync mode we need to be told to start a "loop" @@ -578,7 +578,7 @@ def _run_parsing_loop(self): self.waitables.pop(sentinel) self._processors.pop(processor.file_path) - if conf.getboolean("scheduler", "standalone_dag_processor"): + if standalone_dag_processor: self._fetch_callbacks(max_callbacks_per_loop) self._deactivate_stale_dags() self._refresh_dag_dir()