diff --git a/airflow/cli/commands/local_commands/scheduler_command.py b/airflow/cli/commands/local_commands/scheduler_command.py index fb436b22b4e23..61362c497a044 100644 --- a/airflow/cli/commands/local_commands/scheduler_command.py +++ b/airflow/cli/commands/local_commands/scheduler_command.py @@ -40,7 +40,6 @@ def _run_scheduler_job(args) -> None: job_runner = SchedulerJobRunner(job=Job(), subdir=process_subdir(args.subdir), num_runs=args.num_runs) - ExecutorLoader.validate_database_executor_compatibility(job_runner.job.executor.__class__) enable_health_check = conf.getboolean("scheduler", "ENABLE_HEALTH_CHECK") with _serve_logs(args.skip_serve_logs), _serve_health_check(enable_health_check): run_job(job=job_runner.job, execute_callable=job_runner._execute) diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index 2df7890a48f5d..7d9c9298a996e 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -97,7 +97,6 @@ class DagFileStat: class DagParsingSignal(enum.Enum): """All signals sent to parser.""" - AGENT_RUN_ONCE = "agent_run_once" TERMINATE_MANAGER = "terminate_manager" END_MANAGER = "end_manager" @@ -118,7 +117,6 @@ class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin): for unlimited. :param processor_timeout: How long to wait before timing out a DAG file processor :param dag_ids: if specified, only schedule tasks with these DAG IDs - :param async_mode: Whether to start agent in async mode """ def __init__( @@ -127,14 +125,12 @@ def __init__( max_runs: int, processor_timeout: timedelta, dag_ids: list[str] | None, - async_mode: bool, ): super().__init__() self._dag_directory: os.PathLike = dag_directory self._max_runs = max_runs self._processor_timeout = processor_timeout self._dag_ids = dag_ids - self._async_mode = async_mode # Map from file path to the processor self._processors: dict[str, DagFileProcessorProcess] = {} # Pipe for communicating signals @@ -161,7 +157,6 @@ def start(self) -> None: self._processor_timeout, child_signal_conn, self._dag_ids, - self._async_mode, ), ) self._process = process @@ -170,49 +165,12 @@ def start(self) -> None: self.log.info("Launched DagFileProcessorManager with pid: %s", process.pid) - def run_single_parsing_loop(self) -> None: - """ - Send agent heartbeat signal to the manager, requesting that it runs one processing "loop". - - Should only be used when launched DAG file processor manager in sync mode. - - Call wait_until_finished to ensure that any launched processors have finished before continuing. - """ - if not self._parent_signal_conn or not self._process: - raise ValueError("Process not started.") - if not self._process.is_alive(): - return - - try: - self._parent_signal_conn.send(DagParsingSignal.AGENT_RUN_ONCE) - except ConnectionError: - # If this died cos of an error then we will noticed and restarted - # when harvest_serialized_dags calls _heartbeat_manager. 
- pass - def get_callbacks_pipe(self) -> MultiprocessingConnection: """Return the pipe for sending Callbacks to DagProcessorManager.""" if not self._parent_signal_conn: raise ValueError("Process not started.") return self._parent_signal_conn - def wait_until_finished(self) -> None: - """Wait until DAG parsing is finished.""" - if not self._parent_signal_conn: - raise ValueError("Process not started.") - if self._async_mode: - raise RuntimeError("wait_until_finished should only be called in sync_mode") - while self._parent_signal_conn.poll(timeout=None): - try: - result = self._parent_signal_conn.recv() - except EOFError: - return - self._process_message(result) - if isinstance(result, DagParsingStat): - # In sync mode (which is the only time we call this function) we don't send this message from - # the Manager until all the running processors have finished - return - @staticmethod def _run_processor_manager( dag_directory: os.PathLike, @@ -220,7 +178,6 @@ def _run_processor_manager( processor_timeout: timedelta, signal_conn: MultiprocessingConnection, dag_ids: list[str] | None, - async_mode: bool, ) -> None: # Make this process start as a new process group - that makes it easy # to kill all sub-process of this at the OS-level, rather than having @@ -241,7 +198,6 @@ def _run_processor_manager( processor_timeout=processor_timeout, dag_ids=dag_ids, signal_conn=signal_conn, - async_mode=async_mode, ) processor_manager.start() @@ -352,7 +308,6 @@ class DagFileProcessorManager(LoggingMixin): :param processor_timeout: How long to wait before timing out a DAG file processor :param signal_conn: connection to communicate signal with processor agent. :param dag_ids: if specified, only schedule tasks with these DAG IDs - :param async_mode: whether to start the manager in async mode """ def __init__( @@ -362,7 +317,6 @@ def __init__( processor_timeout: timedelta, dag_ids: list[str] | None, signal_conn: MultiprocessingConnection | None = None, - async_mode: bool = True, ): super().__init__() # known files; this will be updated every `dag_dir_list_interval` and stuff added/removed accordingly @@ -372,30 +326,16 @@ def __init__( # signal_conn is None for dag_processor_standalone mode. self._direct_scheduler_conn = signal_conn self._dag_ids = dag_ids - self._async_mode = async_mode self._parsing_start_time: float | None = None self._dag_directory = dag_directory # Set the signal conn in to non-blocking mode, so that attempting to # send when the buffer is full errors, rather than hangs for-ever # attempting to send (this is to avoid deadlocks!) - # - # Don't do this in sync_mode, as we _need_ the DagParsingStat sent to - # continue the scheduler - if self._async_mode and self._direct_scheduler_conn is not None: + if self._direct_scheduler_conn: os.set_blocking(self._direct_scheduler_conn.fileno(), False) self.standalone_dag_processor = conf.getboolean("scheduler", "standalone_dag_processor") self._parallelism = conf.getint("scheduler", "parsing_processes") - if ( - conf.get_mandatory_value("database", "sql_alchemy_conn").startswith("sqlite") - and self._parallelism > 1 - ): - self.log.warning( - "Because we cannot use more than 1 thread (parsing_processes = " - "%d) when using sqlite. So we set parallelism to 1.", - self._parallelism, - ) - self._parallelism = 1 # Parse and schedule each file no faster than this interval. 
self._file_process_interval = conf.getint("scheduler", "min_file_process_interval") @@ -531,20 +471,13 @@ def deactivate_stale_dags( cls.logger().info("Deactivated %i DAGs which are no longer present in file.", deactivated) def _run_parsing_loop(self): - # In sync mode we want timeout=None -- wait forever until a message is received - if self._async_mode: - poll_time = 0.0 - else: - poll_time = None + poll_time = 0.0 self._refresh_dag_dir() self.prepare_file_path_queue() max_callbacks_per_loop = conf.getint("scheduler", "max_callbacks_per_loop") - if self._async_mode: - # If we're in async mode, we can start up straight away. If we're - # in sync mode we need to be told to start a "loop" - self.start_new_processes() + self.start_new_processes() while True: with Trace.start_span(span_name="dag_parsing_loop", component="DagFileProcessorManager") as span: loop_start_time = time.monotonic() @@ -557,36 +490,16 @@ def _run_parsing_loop(self): self.log.debug("Received %s signal from DagFileProcessorAgent", agent_signal) if agent_signal == DagParsingSignal.TERMINATE_MANAGER: - if span.is_recording(): - span.add_event(name="terminate") self.terminate() break elif agent_signal == DagParsingSignal.END_MANAGER: - if span.is_recording(): - span.add_event(name="end") self.end() sys.exit(os.EX_OK) - elif agent_signal == DagParsingSignal.AGENT_RUN_ONCE: - # continue the loop to parse dags - pass elif isinstance(agent_signal, CallbackRequest): self._add_callback_to_queue(agent_signal) else: raise ValueError(f"Invalid message {type(agent_signal)}") - if not ready and not self._async_mode: - # In "sync" mode we don't want to parse the DAGs until we - # are told to (as that would open another connection to the - # SQLite DB which isn't a good practice - - # This shouldn't happen, as in sync mode poll should block for - # ever. Lets be defensive about that. - self.log.warning( - "wait() unexpectedly returned nothing ready after infinite timeout (%r)!", poll_time - ) - - continue - for sentinel in ready: if sentinel is not self._direct_scheduler_conn: processor = self.waitables.get(sentinel) @@ -631,14 +544,6 @@ def _run_parsing_loop(self): # Update number of loop iteration. self._num_run += 1 - if not self._async_mode: - self.log.debug("Waiting for processors to finish since we're using sqlite") - # Wait until the running DAG processors are finished before - # sending a DagParsingStat message back. This means the Agent - # can tell we've got to the end of this iteration when it sees - # this type of message - self.wait_until_finished() - # Collect anything else that has finished, but don't kick off any more processors if span.is_recording(): span.add_event(name="collect_results") @@ -664,10 +569,9 @@ def _run_parsing_loop(self): except BlockingIOError: # Try again next time around the loop! - # It is better to fail, than it is deadlock. This should - # "almost never happen" since the DagParsingStat object is - # small, and in async mode this stat is not actually _required_ - # for normal operation (It only drives "max runs") + # It is better to fail, than it is deadlock. 
This should "almost never happen" since the + # DagParsingStat object is small, and is not actually _required_ for normal operation (It + # only drives "max runs") self.log.debug("BlockingIOError received trying to send DagParsingStat, ignoring") if max_runs_reached: @@ -683,12 +587,11 @@ def _run_parsing_loop(self): ) break - if self._async_mode: - loop_duration = time.monotonic() - loop_start_time - if loop_duration < 1: - poll_time = 1 - loop_duration - else: - poll_time = 0.0 + loop_duration = time.monotonic() - loop_start_time + if loop_duration < 1: + poll_time = 1 - loop_duration + else: + poll_time = 0.0 @classmethod @provide_session diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index b57f7458bbb97..1ba0ed15567ba 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -119,7 +119,6 @@ class BaseExecutor(LoggingMixin): supports_sentry: bool = False is_local: bool = False - is_single_threaded: bool = False is_production: bool = True change_sensor_mode_to_reschedule: bool = False diff --git a/airflow/executors/debug_executor.py b/airflow/executors/debug_executor.py index 525c80791e37a..9d86f5e72703c 100644 --- a/airflow/executors/debug_executor.py +++ b/airflow/executors/debug_executor.py @@ -47,7 +47,6 @@ class DebugExecutor(BaseExecutor): _terminated = threading.Event() - is_single_threaded: bool = True is_production: bool = False change_sensor_mode_to_reschedule: bool = True diff --git a/airflow/executors/executor_loader.py b/airflow/executors/executor_loader.py index 8093566ab5abe..cd025e38c8d65 100644 --- a/airflow/executors/executor_loader.py +++ b/airflow/executors/executor_loader.py @@ -18,9 +18,7 @@ from __future__ import annotations -import functools import logging -import os from typing import TYPE_CHECKING from airflow.exceptions import AirflowConfigException, UnknownExecutorException @@ -236,68 +234,29 @@ def load_executor(cls, executor_name: ExecutorName | str | None) -> BaseExecutor return executor @classmethod - def import_executor_cls( - cls, executor_name: ExecutorName, validate: bool = True - ) -> tuple[type[BaseExecutor], ConnectorSource]: + def import_executor_cls(cls, executor_name: ExecutorName) -> tuple[type[BaseExecutor], ConnectorSource]: """ Import the executor class. Supports the same formats as ExecutorLoader.load_executor. :param executor_name: Name of core executor or module path to executor. - :param validate: Whether or not to validate the executor before returning :return: executor class via executor_name and executor import source """ - - def _import_and_validate(path: str) -> type[BaseExecutor]: - executor = import_string(path) - if validate: - cls.validate_database_executor_compatibility(executor) - return executor - - return _import_and_validate(executor_name.module_path), executor_name.connector_source + return import_string(executor_name.module_path), executor_name.connector_source @classmethod - def import_default_executor_cls(cls, validate: bool = True) -> tuple[type[BaseExecutor], ConnectorSource]: + def import_default_executor_cls(cls) -> tuple[type[BaseExecutor], ConnectorSource]: """ Import the default executor class. 
- :param validate: Whether or not to validate the executor before returning - :return: executor class and executor import source """ executor_name = cls.get_default_executor_name() - executor, source = cls.import_executor_cls(executor_name, validate=validate) + executor, source = cls.import_executor_cls(executor_name) return executor, source - @classmethod - @functools.cache - def validate_database_executor_compatibility(cls, executor: type[BaseExecutor]) -> None: - """ - Validate database and executor compatibility. - - Most of the databases work universally, but SQLite can only work with - single-threaded executors (e.g. Sequential). - - This is NOT done in ``airflow.configuration`` (when configuration is - initialized) because loading the executor class is heavy work we want to - avoid unless needed. - """ - # Single threaded executors can run with any backend. - if executor.is_single_threaded: - return - - # This is set in tests when we want to be able to use SQLite. - if os.environ.get("_AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK") == "1": - return - - from airflow.settings import engine - - # SQLite only works with single threaded executors - if engine.dialect.name == "sqlite": - raise AirflowConfigException(f"error: cannot use SQLite with the {executor.__name__}") - @classmethod def __load_celery_kubernetes_executor(cls) -> BaseExecutor: celery_executor = import_string(cls.executors[CELERY_EXECUTOR])() diff --git a/airflow/executors/sequential_executor.py b/airflow/executors/sequential_executor.py index 7ac8047ed5d53..e3dce51e1ebde 100644 --- a/airflow/executors/sequential_executor.py +++ b/airflow/executors/sequential_executor.py @@ -49,7 +49,6 @@ class SequentialExecutor(BaseExecutor): """ is_local: bool = True - is_single_threaded: bool = True is_production: bool = False serve_logs: bool = True diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 0dd6b32f74143..21fa41aa2c592 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -209,9 +209,6 @@ def __init__( if log: self._log = log - # Check what SQL backend we use - sql_conn: str = conf.get_mandatory_value("database", "sql_alchemy_conn").lower() - self.using_sqlite = sql_conn.startswith("sqlite") # Dag Processor agent - not used in Dag Processor standalone mode. self.processor_agent: DagFileProcessorAgent | None = None @@ -930,10 +927,6 @@ def _execute(self) -> int | None: self.log.info("Processing each file at most %s times", self.num_times_parse_dags) - # When using sqlite, we do not use async_mode - # so the scheduler job and DAG parser don't access the DB at the same time. - async_mode = not self.using_sqlite - processor_timeout_seconds: int = conf.getint("core", "dag_file_processor_timeout") processor_timeout = timedelta(seconds=processor_timeout_seconds) if not self._standalone_dag_processor and not self.processor_agent: @@ -942,7 +935,6 @@ def _execute(self) -> int | None: max_runs=self.num_times_parse_dags, processor_timeout=processor_timeout, dag_ids=[], - async_mode=async_mode, ) reset_signals = self.register_signals() @@ -1106,13 +1098,6 @@ def _run_scheduler_loop(self) -> None: } ) - if self.using_sqlite and self.processor_agent: - self.processor_agent.run_single_parsing_loop() - # For the sqlite case w/ 1 thread, wait until the processor - # is finished to avoid concurrent access to the DB. 
- self.log.debug("Waiting for processors to finish since we're using sqlite") - self.processor_agent.wait_until_finished() - with create_session() as session: # This will schedule for as many executors as possible. num_queued_tis = self._do_scheduling(session) diff --git a/airflow/sentry.py b/airflow/sentry.py index 22261bd99fd2d..7a7b1df2e118e 100644 --- a/airflow/sentry.py +++ b/airflow/sentry.py @@ -85,7 +85,7 @@ def __init__(self): # LoggingIntegration is set by default. integrations = [sentry_flask] - executor_class, _ = ExecutorLoader.import_default_executor_cls(validate=False) + executor_class, _ = ExecutorLoader.import_default_executor_cls() if executor_class.supports_sentry: from sentry_sdk.integrations.celery import CeleryIntegration diff --git a/airflow/utils/orm_event_handlers.py b/airflow/utils/orm_event_handlers.py index 575f7ac939167..a62e21af33ea2 100644 --- a/airflow/utils/orm_event_handlers.py +++ b/airflow/utils/orm_event_handlers.py @@ -46,6 +46,7 @@ def connect(dbapi_connection, connection_record): def set_sqlite_pragma(dbapi_connection, connection_record): cursor = dbapi_connection.cursor() cursor.execute("PRAGMA foreign_keys=ON") + cursor.execute("PRAGMA journal_mode=WAL") cursor.close() # this ensures coherence in mysql when storing datetimes (not required for postgres) diff --git a/dev/breeze/src/airflow_breeze/utils/docker_command_utils.py b/dev/breeze/src/airflow_breeze/utils/docker_command_utils.py index 2352bcda533a2..d97706b59e7cc 100644 --- a/dev/breeze/src/airflow_breeze/utils/docker_command_utils.py +++ b/dev/breeze/src/airflow_breeze/utils/docker_command_utils.py @@ -52,7 +52,6 @@ DOCKER_DEFAULT_PLATFORM, MIN_DOCKER_COMPOSE_VERSION, MIN_DOCKER_VERSION, - SEQUENTIAL_EXECUTOR, ) from airflow_breeze.utils.console import Output, get_console from airflow_breeze.utils.run_utils import ( @@ -724,16 +723,13 @@ def execute_command_in_shell( :param command: """ shell_params.backend = "sqlite" - shell_params.executor = SEQUENTIAL_EXECUTOR shell_params.forward_ports = False shell_params.project_name = project_name shell_params.quiet = True shell_params.skip_environment_initialization = True shell_params.skip_image_upgrade_check = True if get_verbose(): - get_console().print(f"[warning]Backend forced to: sqlite and {SEQUENTIAL_EXECUTOR}[/]") get_console().print("[warning]Sqlite DB is cleaned[/]") - get_console().print(f"[warning]Executor forced to {SEQUENTIAL_EXECUTOR}[/]") get_console().print("[warning]Disabled port forwarding[/]") get_console().print(f"[warning]Project name set to: {project_name}[/]") get_console().print("[warning]Forced quiet mode[/]") @@ -775,13 +771,6 @@ def enter_shell(shell_params: ShellParams, output: Output | None = None) -> RunC ) bring_compose_project_down(preserve_volumes=False, shell_params=shell_params) - if shell_params.backend == "sqlite" and shell_params.executor != SEQUENTIAL_EXECUTOR: - get_console().print( - f"\n[warning]backend: sqlite is not " - f"compatible with executor: {shell_params.executor}. 
" - f"Changing the executor to {SEQUENTIAL_EXECUTOR}.\n" - ) - shell_params.executor = SEQUENTIAL_EXECUTOR if shell_params.restart: bring_compose_project_down(preserve_volumes=False, shell_params=shell_params) if shell_params.include_mypy_volume: diff --git a/providers/tests/celery/executors/test_celery_kubernetes_executor.py b/providers/tests/celery/executors/test_celery_kubernetes_executor.py index 6c3857912b5ce..ce51afea4e8fb 100644 --- a/providers/tests/celery/executors/test_celery_kubernetes_executor.py +++ b/providers/tests/celery/executors/test_celery_kubernetes_executor.py @@ -46,9 +46,6 @@ def test_is_production_default_value(self): def test_serve_logs_default_value(self): assert not CeleryKubernetesExecutor.serve_logs - def test_is_single_threaded_default_value(self): - assert not CeleryKubernetesExecutor.is_single_threaded - def test_cli_commands_vended(self): assert CeleryKubernetesExecutor.get_cli_commands() diff --git a/providers/tests/cncf/kubernetes/executors/test_local_kubernetes_executor.py b/providers/tests/cncf/kubernetes/executors/test_local_kubernetes_executor.py index 2b1817ac89853..1a745ac139315 100644 --- a/providers/tests/cncf/kubernetes/executors/test_local_kubernetes_executor.py +++ b/providers/tests/cncf/kubernetes/executors/test_local_kubernetes_executor.py @@ -43,9 +43,6 @@ def test_is_production_default_value(self): def test_serve_logs_default_value(self): assert LocalKubernetesExecutor.serve_logs - def test_is_single_threaded_default_value(self): - assert not LocalKubernetesExecutor.is_single_threaded - def test_cli_commands_vended(self): assert LocalKubernetesExecutor.get_cli_commands() diff --git a/tests/cli/commands/local_commands/test_scheduler_command.py b/tests/cli/commands/local_commands/test_scheduler_command.py index c1969aa9a715b..2dfce8edde6c6 100644 --- a/tests/cli/commands/local_commands/test_scheduler_command.py +++ b/tests/cli/commands/local_commands/test_scheduler_command.py @@ -48,20 +48,9 @@ def setup_class(cls): ("LocalKubernetesExecutor", True), ], ) - @mock.patch( - "airflow.cli.commands.local_commands.scheduler_command.ExecutorLoader.validate_database_executor_compatibility", - side_effect=None, - ) @mock.patch("airflow.cli.commands.local_commands.scheduler_command.SchedulerJobRunner") @mock.patch("airflow.cli.commands.local_commands.scheduler_command.Process") - def test_serve_logs_on_scheduler( - self, - mock_process, - mock_scheduler_job, - mock_validate, - executor, - expect_serve_logs, - ): + def test_serve_logs_on_scheduler(self, mock_process, mock_scheduler_job, executor, expect_serve_logs): mock_scheduler_job.return_value.job_type = "SchedulerJob" args = self.parser.parse_args(["scheduler"]) @@ -74,14 +63,10 @@ def test_serve_logs_on_scheduler( with pytest.raises(AssertionError): mock_process.assert_has_calls([mock.call(target=serve_logs)]) - @mock.patch( - "airflow.cli.commands.local_commands.scheduler_command.ExecutorLoader.validate_database_executor_compatibility", - side_effect=None, - ) @mock.patch("airflow.cli.commands.local_commands.scheduler_command.SchedulerJobRunner") @mock.patch("airflow.cli.commands.local_commands.scheduler_command.Process") @pytest.mark.parametrize("executor", ["LocalExecutor", "SequentialExecutor"]) - def test_skip_serve_logs(self, mock_process, mock_scheduler_job, mock_validate, executor): + def test_skip_serve_logs(self, mock_process, mock_scheduler_job, executor): mock_scheduler_job.return_value.job_type = "SchedulerJob" args = self.parser.parse_args(["scheduler", "--skip-serve-logs"]) with 
conf_vars({("core", "executor"): executor}): @@ -90,17 +75,11 @@ def test_skip_serve_logs(self, mock_process, mock_scheduler_job, mock_validate, with pytest.raises(AssertionError): mock_process.assert_has_calls([mock.call(target=serve_logs)]) - @mock.patch( - "airflow.cli.commands.local_commands.scheduler_command.ExecutorLoader.validate_database_executor_compatibility", - side_effect=None, - ) @mock.patch("airflow.utils.db.check_and_run_migrations") @mock.patch("airflow.utils.db.synchronize_log_template") @mock.patch("airflow.cli.commands.local_commands.scheduler_command.SchedulerJobRunner") @mock.patch("airflow.cli.commands.local_commands.scheduler_command.Process") - def test_check_migrations_is_false( - self, mock_process, mock_scheduler_job, mock_log, mock_run_migration, mock_validate - ): + def test_check_migrations_is_false(self, mock_process, mock_scheduler_job, mock_log, mock_run_migration): mock_scheduler_job.return_value.job_type = "SchedulerJob" args = self.parser.parse_args(["scheduler"]) with conf_vars({("database", "check_migrations"): "False"}): @@ -108,17 +87,11 @@ def test_check_migrations_is_false( mock_run_migration.assert_not_called() mock_log.assert_called_once() - @mock.patch( - "airflow.cli.commands.local_commands.scheduler_command.ExecutorLoader.validate_database_executor_compatibility", - side_effect=None, - ) @mock.patch("airflow.utils.db.check_and_run_migrations") @mock.patch("airflow.utils.db.synchronize_log_template") @mock.patch("airflow.cli.commands.local_commands.scheduler_command.SchedulerJobRunner") @mock.patch("airflow.cli.commands.local_commands.scheduler_command.Process") - def test_check_migrations_is_true( - self, mock_process, mock_scheduler_job, mock_log, mock_run_migration, mock_validate - ): + def test_check_migrations_is_true(self, mock_process, mock_scheduler_job, mock_log, mock_run_migration): mock_scheduler_job.return_value.job_type = "SchedulerJob" args = self.parser.parse_args(["scheduler"]) with conf_vars({("database", "check_migrations"): "True"}): @@ -126,14 +99,10 @@ def test_check_migrations_is_true( mock_run_migration.assert_called_once() mock_log.assert_called_once() - @mock.patch( - "airflow.cli.commands.local_commands.scheduler_command.ExecutorLoader.validate_database_executor_compatibility", - side_effect=None, - ) @mock.patch("airflow.cli.commands.local_commands.scheduler_command.SchedulerJobRunner") @mock.patch("airflow.cli.commands.local_commands.scheduler_command.Process") @pytest.mark.parametrize("executor", ["LocalExecutor", "SequentialExecutor"]) - def test_graceful_shutdown(self, mock_process, mock_scheduler_job, mock_validate, executor): + def test_graceful_shutdown(self, mock_process, mock_scheduler_job, executor): mock_scheduler_job.return_value.job_type = "SchedulerJob" args = self.parser.parse_args(["scheduler"]) with conf_vars({("core", "executor"): executor}): @@ -144,36 +113,18 @@ def test_graceful_shutdown(self, mock_process, mock_scheduler_job, mock_validate finally: mock_process().terminate.assert_called() - @mock.patch( - "airflow.cli.commands.local_commands.scheduler_command.ExecutorLoader.validate_database_executor_compatibility", - side_effect=None, - ) @mock.patch("airflow.cli.commands.local_commands.scheduler_command.SchedulerJobRunner") @mock.patch("airflow.cli.commands.local_commands.scheduler_command.Process") - def test_enable_scheduler_health( - self, - mock_process, - mock_scheduler_job, - mock_validate, - ): + def test_enable_scheduler_health(self, mock_process, mock_scheduler_job): with 
conf_vars({("scheduler", "enable_health_check"): "True"}): mock_scheduler_job.return_value.job_type = "SchedulerJob" args = self.parser.parse_args(["scheduler"]) scheduler_command.scheduler(args) mock_process.assert_has_calls([mock.call(target=serve_health_check)]) - @mock.patch( - "airflow.cli.commands.local_commands.scheduler_command.ExecutorLoader.validate_database_executor_compatibility", - side_effect=None, - ) @mock.patch("airflow.cli.commands.local_commands.scheduler_command.SchedulerJobRunner") @mock.patch("airflow.cli.commands.local_commands.scheduler_command.Process") - def test_disable_scheduler_health( - self, - mock_process, - mock_scheduler_job, - mock_validate, - ): + def test_disable_scheduler_health(self, mock_process, mock_scheduler_job): mock_scheduler_job.return_value.job_type = "SchedulerJob" args = self.parser.parse_args(["scheduler"]) scheduler_command.scheduler(args) @@ -196,23 +147,13 @@ def test_scheduler_health_host( serve_health_check() assert http_server_mock.call_args.args[0] == (health_check_host, health_check_port) - @mock.patch( - "airflow.cli.commands.local_commands.scheduler_command.ExecutorLoader.validate_database_executor_compatibility", - side_effect=None, - ) @mock.patch("airflow.cli.commands.local_commands.scheduler_command.SchedulerJobRunner") @mock.patch("airflow.cli.commands.local_commands.scheduler_command.Process") @mock.patch( "airflow.cli.commands.local_commands.scheduler_command.run_job", side_effect=Exception("run_job failed"), ) - def test_run_job_exception_handling( - self, - mock_run_job, - mock_process, - mock_scheduler_job, - mock_validate, - ): + def test_run_job_exception_handling(self, mock_run_job, mock_process, mock_scheduler_job): args = self.parser.parse_args(["scheduler"]) with pytest.raises(Exception, match="run_job failed"): scheduler_command.scheduler(args) diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py index 3154f3a49dfb4..4a338e164d64b 100644 --- a/tests/dag_processing/test_manager.py +++ b/tests/dag_processing/test_manager.py @@ -137,9 +137,6 @@ def teardown_class(self): clear_db_callbacks() def run_processor_manager_one_loop(self, processor, parent_pipe): - if not processor._async_mode: - parent_pipe.send(DagParsingSignal.AGENT_RUN_ONCE) - results = [] while True: @@ -167,14 +164,12 @@ def test_remove_file_clears_import_error(self, tmp_path): child_pipe, parent_pipe = multiprocessing.Pipe() - async_mode = "sqlite" not in conf.get("database", "sql_alchemy_conn") manager = DagFileProcessorManager( dag_directory=path_to_parse.parent, max_runs=1, processor_timeout=timedelta(days=365), signal_conn=child_pipe, dag_ids=[], - async_mode=async_mode, ) with create_session() as session: @@ -185,7 +180,7 @@ def test_remove_file_clears_import_error(self, tmp_path): path_to_parse.unlink() - # Rerun the scheduler once the dag file has been removed + # Rerun the parser once the dag file has been removed self.run_processor_manager_one_loop(manager, parent_pipe) import_errors = session.query(ParseImportError).all() @@ -199,21 +194,18 @@ def test_remove_file_clears_import_error(self, tmp_path): def test_max_runs_when_no_files(self, tmp_path): child_pipe, parent_pipe = multiprocessing.Pipe() - async_mode = "sqlite" not in conf.get("database", "sql_alchemy_conn") manager = DagFileProcessorManager( dag_directory=os.fspath(tmp_path), max_runs=1, processor_timeout=timedelta(days=365), signal_conn=child_pipe, dag_ids=[], - async_mode=async_mode, ) self.run_processor_manager_one_loop(manager, parent_pipe) 
child_pipe.close() parent_pipe.close() - @pytest.mark.backend("mysql", "postgres") @mock.patch("airflow.dag_processing.processor.iter_airflow_imports") def test_start_new_processes_with_same_filepath(self, _): """ @@ -226,7 +218,6 @@ def test_start_new_processes_with_same_filepath(self, _): processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - async_mode=True, ) file_1 = "file_1.py" @@ -256,7 +247,6 @@ def test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self): processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - async_mode=True, ) mock_processor = MagicMock() @@ -277,7 +267,6 @@ def test_set_file_paths_when_processor_file_path_is_in_new_file_paths(self): processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - async_mode=True, ) mock_processor = MagicMock() @@ -307,7 +296,6 @@ def test_file_paths_in_queue_sorted_alphabetically( processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - async_mode=True, ) manager.set_file_paths(dag_files) @@ -333,7 +321,6 @@ def test_file_paths_in_queue_sorted_random_seeded_by_host( processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - async_mode=True, ) manager.set_file_paths(dag_files) @@ -394,7 +381,6 @@ def test_file_paths_in_queue_sorted_by_modified_time( processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - async_mode=True, ) manager.set_file_paths(dag_files) @@ -428,7 +414,6 @@ def test_file_paths_in_queue_excludes_missing_file( processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - async_mode=True, ) manager.set_file_paths(dag_files) @@ -461,7 +446,6 @@ def test_add_new_file_to_parsing_queue( processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - async_mode=True, ) manager.set_file_paths(dag_files) @@ -503,7 +487,6 @@ def test_recently_modified_file_is_parsed_with_mtime_mode( processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - async_mode=True, ) # let's say the DAG was just parsed 10 seconds before the Freezed time @@ -560,7 +543,6 @@ def test_file_paths_in_queue_sorted_by_priority( processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - async_mode=True, ) manager.set_file_paths(dag_files) @@ -582,7 +564,6 @@ def test_scan_stale_dags(self): processor_timeout=timedelta(minutes=10), signal_conn=MagicMock(), dag_ids=[], - async_mode=True, ) test_dag_path = str(TEST_DAG_FOLDER / "test_example_bash_operator.py") @@ -650,7 +631,6 @@ def test_scan_stale_dags_standalone_mode(self): processor_timeout=timedelta(minutes=10), signal_conn=MagicMock(), dag_ids=[], - async_mode=True, ) test_dag_path = str(TEST_DAG_FOLDER / "test_example_bash_operator.py") @@ -703,7 +683,6 @@ def test_kill_timed_out_processors_kill(self, mock_kill, mock_pid, mock_waitable processor_timeout=timedelta(seconds=5), signal_conn=MagicMock(), dag_ids=[], - async_mode=True, ) processor = DagFileProcessorProcess( @@ -731,7 +710,6 @@ def test_kill_timed_out_processors_no_kill(self, mock_dag_file_processor, mock_p processor_timeout=timedelta(seconds=5), signal_conn=MagicMock(), dag_ids=[], - async_mode=True, ) processor = DagFileProcessorProcess( @@ -767,7 +745,6 @@ def test_dag_with_system_exit(self): max_runs=1, processor_timeout=timedelta(seconds=5), signal_conn=child_pipe, - async_mode=True, ) manager._run_parsing_loop() @@ -809,7 +786,6 @@ def test_import_error_with_dag_directory(self, tmp_path): max_runs=1, signal_conn=child_pipe, 
processor_timeout=timedelta(seconds=5), - async_mode=False, ) self.run_processor_manager_one_loop(manager, parent_pipe) @@ -826,7 +802,6 @@ def test_import_error_with_dag_directory(self, tmp_path): max_runs=1, signal_conn=child_pipe, processor_timeout=timedelta(seconds=5), - async_mode=True, ) self.run_processor_manager_one_loop(manager, parent_pipe) @@ -839,7 +814,6 @@ def test_import_error_with_dag_directory(self, tmp_path): session.rollback() @conf_vars({("core", "load_examples"): "False"}) - @pytest.mark.backend("mysql", "postgres") @pytest.mark.execution_timeout(30) @mock.patch("airflow.dag_processing.manager.DagFileProcessorProcess") def test_pipe_full_deadlock(self, mock_processor): @@ -894,7 +868,6 @@ def fake_processor_(*args, **kwargs): max_runs=100, processor_timeout=timedelta(seconds=5), signal_conn=child_pipe, - async_mode=True, ) try: @@ -926,14 +899,12 @@ def test_send_file_processing_statsd_timing(self, statsd_timing_mock, tmp_path): child_pipe, parent_pipe = multiprocessing.Pipe() - async_mode = "sqlite" not in conf.get("database", "sql_alchemy_conn") manager = DagFileProcessorManager( dag_directory=path_to_parse.parent, max_runs=1, processor_timeout=timedelta(days=365), signal_conn=child_pipe, dag_ids=[], - async_mode=async_mode, ) self.run_processor_manager_one_loop(manager, parent_pipe) @@ -963,7 +934,6 @@ def test_refresh_dags_dir_doesnt_delete_zipped_dags(self, tmp_path): processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - async_mode=True, ) dagbag = DagBag(dag_folder=tmp_path, include_examples=False) zipped_dag_path = os.path.join(TEST_DAGS_FOLDER, "test_zip.zip") @@ -988,7 +958,6 @@ def test_refresh_dags_dir_deactivates_deleted_zipped_dags(self, tmp_path): processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - async_mode=True, ) dagbag = DagBag(dag_folder=tmp_path, include_examples=False) zipped_dag_path = os.path.join(TEST_DAGS_FOLDER, "test_zip.zip") @@ -1031,7 +1000,6 @@ def test_refresh_dags_dir_does_not_interfer_with_dags_outside_its_subdir(self, t processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - async_mode=True, ) manager.last_dag_dir_refresh_time = timezone.utcnow() - timedelta(minutes=10) @@ -1077,7 +1045,6 @@ def test_fetch_callbacks_from_database(self, tmp_path): processor_timeout=timedelta(days=365), signal_conn=child_pipe, dag_ids=[], - async_mode=False, ) with create_session() as session: @@ -1120,7 +1087,6 @@ def test_fetch_callbacks_for_current_dag_directory_only(self, tmp_path): processor_timeout=timedelta(days=365), signal_conn=child_pipe, dag_ids=[], - async_mode=False, ) with create_session() as session: @@ -1156,7 +1122,6 @@ def test_fetch_callbacks_from_database_max_per_loop(self, tmp_path): processor_timeout=timedelta(days=365), signal_conn=child_pipe, dag_ids=[], - async_mode=False, ) with create_session() as session: @@ -1193,7 +1158,6 @@ def test_fetch_callbacks_from_database_not_standalone(self, tmp_path): processor_timeout=timedelta(days=365), signal_conn=child_pipe, dag_ids=[], - async_mode=False, ) with create_session() as session: @@ -1212,7 +1176,6 @@ def test_callback_queue(self, tmp_path): processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - async_mode=True, ) dag1_req1 = DagCallbackRequest( @@ -1267,7 +1230,7 @@ def test_callback_queue(self, tmp_path): ] -def _wait_for_processor_agent_to_complete_in_async_mode(processor_agent: DagFileProcessorAgent): +def _wait_for_processor_agent_to_complete(processor_agent: 
DagFileProcessorAgent): start_timer = time.monotonic() while time.monotonic() - start_timer < 10: if processor_agent.done and all( @@ -1304,17 +1267,14 @@ class path, thus when reloading logging module the airflow.processor_manager # Launch a process through DagFileProcessorAgent, which will try # reload the logging module. test_dag_path = TEST_DAG_FOLDER / "test_scheduler_dags.py" - async_mode = "sqlite" not in conf.get("database", "sql_alchemy_conn") log_file_loc = conf.get("logging", "DAG_PROCESSOR_MANAGER_LOG_LOCATION") with contextlib.suppress(OSError): os.remove(log_file_loc) # Starting dag processing with 0 max_runs to avoid redundant operations. - processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365), [], async_mode) + processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365), []) processor_agent.start() - if not async_mode: - processor_agent.run_single_parsing_loop() processor_agent._process.join() # Since we are reloading logging config not creating this file, @@ -1328,14 +1288,9 @@ def test_parse_once(self): clear_db_dags() test_dag_path = TEST_DAG_FOLDER / "test_scheduler_dags.py" - async_mode = "sqlite" not in conf.get("database", "sql_alchemy_conn") - processor_agent = DagFileProcessorAgent(test_dag_path, 1, timedelta(days=365), [], async_mode) + processor_agent = DagFileProcessorAgent(test_dag_path, 1, timedelta(days=365), []) processor_agent.start() - if not async_mode: - processor_agent.run_single_parsing_loop() while not processor_agent.done: - if not async_mode: - processor_agent.wait_until_finished() processor_agent.heartbeat() assert processor_agent.all_files_processed @@ -1350,88 +1305,39 @@ def test_parse_once(self): def test_launch_process(self): test_dag_path = TEST_DAG_FOLDER / "test_scheduler_dags.py" - async_mode = "sqlite" not in conf.get("database", "sql_alchemy_conn") log_file_loc = conf.get("logging", "DAG_PROCESSOR_MANAGER_LOG_LOCATION") with contextlib.suppress(OSError): os.remove(log_file_loc) # Starting dag processing with 0 max_runs to avoid redundant operations. 
- processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365), [], async_mode) + processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365), []) processor_agent.start() - if not async_mode: - processor_agent.run_single_parsing_loop() processor_agent._process.join() assert os.path.isfile(log_file_loc) - def test_single_parsing_loop_no_parent_signal_conn(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) - processor_agent._process = Mock() - processor_agent._parent_signal_conn = None - with pytest.raises(ValueError, match="Process not started"): - processor_agent.run_single_parsing_loop() - - def test_single_parsing_loop_no_process(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) - processor_agent._parent_signal_conn = Mock() - processor_agent._process = None - with pytest.raises(ValueError, match="Process not started"): - processor_agent.run_single_parsing_loop() - - def test_single_parsing_loop_process_isnt_alive(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) - processor_agent._process = Mock() - processor_agent._parent_signal_conn = Mock() - processor_agent._process.is_alive.return_value = False - ret_val = processor_agent.run_single_parsing_loop() - assert not ret_val - - def test_single_parsing_loop_process_conn_error(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) - processor_agent._process = Mock() - processor_agent._parent_signal_conn = Mock() - processor_agent._process.is_alive.return_value = True - processor_agent._parent_signal_conn.send.side_effect = ConnectionError - ret_val = processor_agent.run_single_parsing_loop() - assert not ret_val - def test_get_callbacks_pipe(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), []) processor_agent._parent_signal_conn = Mock() retval = processor_agent.get_callbacks_pipe() assert retval == processor_agent._parent_signal_conn def test_get_callbacks_pipe_no_parent_signal_conn(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), []) processor_agent._parent_signal_conn = None with pytest.raises(ValueError, match="Process not started"): processor_agent.get_callbacks_pipe() - def test_wait_until_finished_no_parent_signal_conn(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) - processor_agent._parent_signal_conn = None - with pytest.raises(ValueError, match="Process not started"): - processor_agent.wait_until_finished() - - def test_wait_until_finished_poll_eof_error(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) - processor_agent._parent_signal_conn = Mock() - processor_agent._parent_signal_conn.poll.return_value = True - processor_agent._parent_signal_conn.recv = Mock() - processor_agent._parent_signal_conn.recv.side_effect = EOFError - ret_val = processor_agent.wait_until_finished() - assert ret_val is None - def test_heartbeat_no_parent_signal_conn(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), []) processor_agent._parent_signal_conn = None with pytest.raises(ValueError, match="Process not started"): processor_agent.heartbeat() def 
test_heartbeat_poll_eof_error(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), []) processor_agent._parent_signal_conn = Mock() processor_agent._parent_signal_conn.poll.return_value = True processor_agent._parent_signal_conn.recv = Mock() @@ -1440,7 +1346,7 @@ def test_heartbeat_poll_eof_error(self): assert ret_val is None def test_heartbeat_poll_connection_error(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), []) processor_agent._parent_signal_conn = Mock() processor_agent._parent_signal_conn.poll.return_value = True processor_agent._parent_signal_conn.recv = Mock() @@ -1449,7 +1355,7 @@ def test_heartbeat_poll_connection_error(self): assert ret_val is None def test_heartbeat_poll_process_message(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), []) processor_agent._parent_signal_conn = Mock() processor_agent._parent_signal_conn.poll.side_effect = [True, False] processor_agent._parent_signal_conn.recv = Mock() @@ -1460,19 +1366,19 @@ def test_heartbeat_poll_process_message(self): def test_process_message_invalid_type(self): message = "xyz" - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), []) with pytest.raises(RuntimeError, match="Unexpected message received of type str"): processor_agent._process_message(message) def test_heartbeat_manager(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), []) processor_agent._parent_signal_conn = None with pytest.raises(ValueError, match="Process not started"): processor_agent._heartbeat_manager() @mock.patch("airflow.utils.process_utils.reap_process_group") def test_heartbeat_manager_process_restart(self, mock_pg): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), []) processor_agent._parent_signal_conn = Mock() processor_agent._process = MagicMock() processor_agent.start = Mock() @@ -1486,7 +1392,7 @@ def test_heartbeat_manager_process_restart(self, mock_pg): @mock.patch("time.monotonic") @mock.patch("airflow.dag_processing.manager.reap_process_group") def test_heartbeat_manager_process_reap(self, mock_pg, mock_time_monotonic, mock_stats): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), []) processor_agent._parent_signal_conn = Mock() processor_agent._process = Mock() processor_agent._process.pid = 12345 @@ -1507,7 +1413,7 @@ def test_heartbeat_manager_process_reap(self, mock_pg, mock_time_monotonic, mock processor_agent.start.assert_called() def test_heartbeat_manager_terminate(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), []) processor_agent._parent_signal_conn = Mock() processor_agent._process = Mock() processor_agent._process.is_alive.return_value = True @@ -1517,7 +1423,7 @@ def test_heartbeat_manager_terminate(self): 
processor_agent._parent_signal_conn.send.assert_called_with(DagParsingSignal.TERMINATE_MANAGER) def test_heartbeat_manager_terminate_conn_err(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), []) processor_agent._process = Mock() processor_agent._process.is_alive.return_value = True processor_agent._parent_signal_conn = Mock() @@ -1528,7 +1434,7 @@ def test_heartbeat_manager_terminate_conn_err(self): processor_agent._parent_signal_conn.send.assert_called_with(DagParsingSignal.TERMINATE_MANAGER) def test_heartbeat_manager_end_no_process(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), []) processor_agent._process = Mock() processor_agent._process.__bool__ = Mock(return_value=False) processor_agent._process.side_effect = [None] @@ -1541,17 +1447,13 @@ def test_heartbeat_manager_end_no_process(self): @conf_vars({("logging", "dag_processor_manager_log_stdout"): "True"}) def test_log_to_stdout(self, capfd): test_dag_path = TEST_DAG_FOLDER / "test_scheduler_dags.py" - async_mode = "sqlite" not in conf.get("database", "sql_alchemy_conn") # Starting dag processing with 0 max_runs to avoid redundant operations. - processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365), [], async_mode) + processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365), []) processor_agent.start() - if not async_mode: - processor_agent.run_single_parsing_loop() processor_agent._process.join() - if async_mode: - _wait_for_processor_agent_to_complete_in_async_mode(processor_agent) + _wait_for_processor_agent_to_complete(processor_agent) # Capture the stdout and stderr out, _ = capfd.readouterr() @@ -1560,17 +1462,13 @@ def test_log_to_stdout(self, capfd): @conf_vars({("logging", "dag_processor_manager_log_stdout"): "False"}) def test_not_log_to_stdout(self, capfd): test_dag_path = TEST_DAG_FOLDER / "test_scheduler_dags.py" - async_mode = "sqlite" not in conf.get("database", "sql_alchemy_conn") # Starting dag processing with 0 max_runs to avoid redundant operations. - processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365), [], async_mode) + processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365), []) processor_agent.start() - if not async_mode: - processor_agent.run_single_parsing_loop() processor_agent._process.join() - if async_mode: - _wait_for_processor_agent_to_complete_in_async_mode(processor_agent) + _wait_for_processor_agent_to_complete(processor_agent) # Capture the stdout and stderr out, _ = capfd.readouterr() diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py index 336e2ab242c7c..c2962ea04115c 100644 --- a/tests/dag_processing/test_processor.py +++ b/tests/dag_processing/test_processor.py @@ -17,7 +17,6 @@ # under the License. 
from __future__ import annotations -import datetime import os import pathlib import sys @@ -30,7 +29,6 @@ from airflow import settings from airflow.callbacks.callback_requests import TaskCallbackRequest from airflow.configuration import TEST_DAGS_FOLDER, conf -from airflow.dag_processing.manager import DagFileProcessorAgent from airflow.dag_processing.processor import DagFileProcessor, DagFileProcessorProcess from airflow.models import DagBag, DagModel, TaskInstance from airflow.models.serialized_dag import SerializedDagModel @@ -656,49 +654,3 @@ def test_counter_for_last_num_of_db_queries(self): session=session, ): self._process_file(dag_filepath, TEST_DAG_FOLDER, session) - - -class TestProcessorAgent: - @pytest.fixture(autouse=True) - def per_test(self): - self.processor_agent = None - yield - if self.processor_agent: - self.processor_agent.end() - - def test_error_when_waiting_in_async_mode(self, tmp_path): - self.processor_agent = DagFileProcessorAgent( - dag_directory=tmp_path, - max_runs=1, - processor_timeout=datetime.timedelta(1), - dag_ids=[], - async_mode=True, - ) - self.processor_agent.start() - with pytest.raises(RuntimeError, match="wait_until_finished should only be called in sync_mode"): - self.processor_agent.wait_until_finished() - - def test_default_multiprocessing_behaviour(self, tmp_path): - self.processor_agent = DagFileProcessorAgent( - dag_directory=tmp_path, - max_runs=1, - processor_timeout=datetime.timedelta(1), - dag_ids=[], - async_mode=False, - ) - self.processor_agent.start() - self.processor_agent.run_single_parsing_loop() - self.processor_agent.wait_until_finished() - - @conf_vars({("core", "mp_start_method"): "spawn"}) - def test_spawn_multiprocessing_behaviour(self, tmp_path): - self.processor_agent = DagFileProcessorAgent( - dag_directory=tmp_path, - max_runs=1, - processor_timeout=datetime.timedelta(1), - dag_ids=[], - async_mode=False, - ) - self.processor_agent.start() - self.processor_agent.run_single_parsing_loop() - self.processor_agent.wait_until_finished() diff --git a/tests/executors/test_base_executor.py b/tests/executors/test_base_executor.py index 6d69a90d4f870..1c144eb844540 100644 --- a/tests/executors/test_base_executor.py +++ b/tests/executors/test_base_executor.py @@ -46,10 +46,6 @@ def test_is_local_default_value(): assert not BaseExecutor.is_local -def test_is_single_threaded_default_value(): - assert not BaseExecutor.is_single_threaded - - def test_is_production_default_value(): assert BaseExecutor.is_production diff --git a/tests/executors/test_debug_executor.py b/tests/executors/test_debug_executor.py index a8ad667957677..fdafa68ad5aa6 100644 --- a/tests/executors/test_debug_executor.py +++ b/tests/executors/test_debug_executor.py @@ -119,9 +119,6 @@ def test_fail_fast(self, change_state_mock): def test_reschedule_mode(self): assert DebugExecutor.change_sensor_mode_to_reschedule - def test_is_single_threaded(self): - assert DebugExecutor.is_single_threaded - def test_is_production_default_value(self): assert not DebugExecutor.is_production diff --git a/tests/executors/test_executor_loader.py b/tests/executors/test_executor_loader.py index 7532c878dc51f..48c531c5cb942 100644 --- a/tests/executors/test_executor_loader.py +++ b/tests/executors/test_executor_loader.py @@ -16,7 +16,6 @@ # under the License. 
from __future__ import annotations -from contextlib import nullcontext from importlib import reload from unittest import mock @@ -33,11 +32,7 @@ class FakeExecutor: - is_single_threaded = False - - -class FakeSingleThreadedExecutor: - is_single_threaded = True + pass class TestExecutorLoader: @@ -219,31 +214,6 @@ def test_should_support_import_custom_path(self, executor_config): assert executor.__name__ == "FakeExecutor" assert import_source == ConnectorSource.CUSTOM_PATH - @pytest.mark.db_test - @pytest.mark.backend("mysql", "postgres") - @pytest.mark.parametrize("executor", [FakeExecutor, FakeSingleThreadedExecutor]) - def test_validate_database_executor_compatibility_general(self, monkeypatch, executor): - monkeypatch.delenv("_AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK") - ExecutorLoader.validate_database_executor_compatibility(executor) - - @pytest.mark.db_test - @pytest.mark.backend("sqlite") - @pytest.mark.parametrize( - ["executor", "expectation"], - [ - pytest.param(FakeSingleThreadedExecutor, nullcontext(), id="single-threaded"), - pytest.param( - FakeExecutor, - pytest.raises(AirflowConfigException, match=r"^error: cannot use SQLite with the .+"), - id="multi-threaded", - ), - ], - ) - def test_validate_database_executor_compatibility_sqlite(self, monkeypatch, executor, expectation): - monkeypatch.delenv("_AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK") - with expectation: - ExecutorLoader.validate_database_executor_compatibility(executor) - def test_load_executor(self): with conf_vars({("core", "executor"): "LocalExecutor"}): ExecutorLoader.init_executors() diff --git a/tests/executors/test_sequential_executor.py b/tests/executors/test_sequential_executor.py index f6cb7aae575b5..ea7ab6aad96ef 100644 --- a/tests/executors/test_sequential_executor.py +++ b/tests/executors/test_sequential_executor.py @@ -35,9 +35,6 @@ def test_is_production_default_value(self): def test_serve_logs_default_value(self): assert SequentialExecutor.serve_logs - def test_is_single_threaded_default_value(self): - assert SequentialExecutor.is_single_threaded - @mock.patch("airflow.executors.sequential_executor.SequentialExecutor.sync") @mock.patch("airflow.executors.base_executor.BaseExecutor.trigger_tasks") @mock.patch("airflow.executors.base_executor.Stats.gauge") diff --git a/tests_common/pytest_plugin.py b/tests_common/pytest_plugin.py index 85097f43d9c4f..963c0a0bd5d7f 100644 --- a/tests_common/pytest_plugin.py +++ b/tests_common/pytest_plugin.py @@ -1438,18 +1438,13 @@ def _clear_db(request): @pytest.fixture(autouse=True) def clear_lru_cache(): - from airflow.executors.executor_loader import ExecutorLoader from airflow.utils.entry_points import _get_grouped_entry_points - ExecutorLoader.validate_database_executor_compatibility.cache_clear() + _get_grouped_entry_points.cache_clear() try: - _get_grouped_entry_points.cache_clear() - try: - yield - finally: - _get_grouped_entry_points.cache_clear() + yield finally: - ExecutorLoader.validate_database_executor_compatibility.cache_clear() + _get_grouped_entry_points.cache_clear() @pytest.fixture(autouse=True)
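Illustrative sketch (not part of the patch): the one functional addition above — "PRAGMA journal_mode=WAL" in airflow/utils/orm_event_handlers.py — is what lets the scheduler and the DAG file processors read and write a SQLite metadata database concurrently, which in turn is why the sync-mode plumbing, the is_single_threaded flag, and validate_database_executor_compatibility can all be deleted. Below is a minimal standalone sketch of that pattern, assuming a throwaway engine URL and a __main__ check purely for demonstration; only the two PRAGMA statements mirror the patched handler.

from __future__ import annotations

from sqlalchemy import create_engine, event, text

# Hypothetical engine for illustration; Airflow builds its engine from
# [database] sql_alchemy_conn instead of a hard-coded URL.
engine = create_engine("sqlite:///example_airflow.db")


@event.listens_for(engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record):
    # Runs for every new DBAPI connection: enforce foreign keys and switch the
    # journal to write-ahead logging so readers no longer block the writer.
    cursor = dbapi_connection.cursor()
    cursor.execute("PRAGMA foreign_keys=ON")
    cursor.execute("PRAGMA journal_mode=WAL")
    cursor.close()


if __name__ == "__main__":
    with engine.connect() as conn:
        # journal_mode is persisted in the database file, so every process that
        # opens the same file afterwards also runs in WAL mode.
        print(conn.execute(text("PRAGMA journal_mode")).scalar())  # expected: "wal"

Because WAL is a persistent property of the database file rather than of a single connection, setting it once on connect is enough for the scheduler, the DAG processor manager, and its child processors to share the same SQLite file without the old "one parsing loop at a time" hand-off.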