diff --git a/airflow/cli/commands/dag_processor_command.py b/airflow/cli/commands/dag_processor_command.py
index f8ce65663bd40..d96fb4f06f0dc 100644
--- a/airflow/cli/commands/dag_processor_command.py
+++ b/airflow/cli/commands/dag_processor_command.py
@@ -25,21 +25,21 @@
 
 from airflow import settings
 from airflow.configuration import conf
-from airflow.dag_processing.manager import DagFileProcessorManager
+from airflow.jobs.dag_processor_job import DagProcessorJob
 from airflow.utils import cli as cli_utils
 from airflow.utils.cli import setup_locations, setup_logging
 
 log = logging.getLogger(__name__)
 
 
-def _create_dag_processor_manager(args) -> DagFileProcessorManager:
+def _create_dag_processor_job(args) -> DagProcessorJob:
     """Creates DagFileProcessorProcess instance."""
     processor_timeout_seconds: int = conf.getint("core", "dag_file_processor_timeout")
     processor_timeout = timedelta(seconds=processor_timeout_seconds)
-    return DagFileProcessorManager(
+    return DagProcessorJob(
+        processor_timeout=processor_timeout,
         dag_directory=args.subdir,
         max_runs=args.num_runs,
-        processor_timeout=processor_timeout,
         dag_ids=[],
         pickle_dags=args.do_pickle,
     )
@@ -55,7 +55,7 @@ def dag_processor(args):
     if sql_conn.startswith("sqlite"):
         raise SystemExit("Standalone DagProcessor is not supported when using sqlite.")
 
-    manager = _create_dag_processor_manager(args)
+    job = _create_dag_processor_job(args)
 
     if args.daemon:
         pid, stdout, stderr, log_file = setup_locations(
@@ -74,10 +74,6 @@ def dag_processor(args):
                 umask=int(settings.DAEMON_UMASK, 8),
             )
             with ctx:
-                try:
-                    manager.start()
-                finally:
-                    manager.terminate()
-                    manager.end()
+                job.run()
     else:
-        manager.start()
+        job.run()
diff --git a/airflow/jobs/dag_processor_job.py b/airflow/jobs/dag_processor_job.py
new file mode 100644
index 0000000000000..70690a78db647
--- /dev/null
+++ b/airflow/jobs/dag_processor_job.py
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import os
+from datetime import timedelta
+
+from airflow.dag_processing.manager import DagFileProcessorManager
+from airflow.jobs.base_job import BaseJob
+
+
+class DagProcessorJob(BaseJob):
+    """
+    :param dag_directory: Directory where DAG definitions are kept. All
+        files in file_paths should be under this directory
+    :param max_runs: The number of times to parse and schedule each file. -1
+        for unlimited.
+    :param processor_timeout: How long to wait before timing out a DAG file processor
+    :param dag_ids: if specified, only schedule tasks with these DAG IDs
+    :param pickle_dags: whether to pickle DAGs.
+    :param async_mode: Whether to start agent in async mode
+    """
+
+    __mapper_args__ = {"polymorphic_identity": "DagProcessorJob"}
+
+    def __init__(
+        self,
+        dag_directory: os.PathLike,
+        max_runs: int,
+        processor_timeout: timedelta,
+        dag_ids: list[str] | None,
+        pickle_dags: bool,
+        *args,
+        **kwargs,
+    ):
+        self.processor = DagFileProcessorManager(
+            dag_directory=dag_directory,
+            max_runs=max_runs,
+            processor_timeout=processor_timeout,
+            dag_ids=dag_ids,
+            pickle_dags=pickle_dags,
+        )
+        super().__init__(*args, **kwargs)
+
+    def _execute(self) -> None:
+        self.log.info("Starting the Dag Processor Job")
+        try:
+            self.processor.start()
+        except Exception:
+            self.log.exception("Exception when executing DagProcessorJob")
+            raise
+        finally:
+            self.processor.terminate()
+            self.processor.end()
diff --git a/tests/cli/commands/test_dag_processor_command.py b/tests/cli/commands/test_dag_processor_command.py
index b3ff84531f74a..69d0dcdb024fe 100644
--- a/tests/cli/commands/test_dag_processor_command.py
+++ b/tests/cli/commands/test_dag_processor_command.py
@@ -42,16 +42,17 @@ def setup_class(cls):
             ("core", "load_examples"): "False",
         }
     )
-    @mock.patch("airflow.cli.commands.dag_processor_command.DagFileProcessorManager")
+    @mock.patch("airflow.cli.commands.dag_processor_command.DagProcessorJob")
     @pytest.mark.skipif(
         conf.get_mandatory_value("database", "sql_alchemy_conn").lower().startswith("sqlite"),
         reason="Standalone Dag Processor doesn't support sqlite.",
     )
-    def test_start_manager(
+    def test_start_job(
         self,
-        mock_dag_manager,
+        mock_dag_job,
     ):
         """Ensure that DagFileProcessorManager is started"""
-        args = self.parser.parse_args(["dag-processor"])
-        dag_processor_command.dag_processor(args)
-        mock_dag_manager.return_value.start.assert_called()
+        with conf_vars({("scheduler", "standalone_dag_processor"): "True"}):
+            args = self.parser.parse_args(["dag-processor"])
+            dag_processor_command.dag_processor(args)
+            mock_dag_job.return_value.run.assert_called()