From 60e7c71b7f767ff466e239c8207f2d31518369f4 Mon Sep 17 00:00:00 2001
From: Ash Berlin-Taylor
Date: Wed, 11 Dec 2024 11:12:46 +0000
Subject: [PATCH 1/2] Remove "single process" restrictions on SQLite in favour of using WAL mode

Since 2010(!) SQLite has had WAL, or Write-Ahead Log, journalling mode, which allows multiple concurrent readers and one writer. That is more than good enough for our "local" use.

The primary driver for this change was the realisation that it is possible, and that it reduces the amount of code and complexity in DagProcessorManager before reworking it for AIP-72 support: we have a lot of code in the DagProcessorManager to support `if async_mode` that makes the flow hard to follow.

Some useful docs and articles about this mode:

- [The official docs](https://sqlite.org/wal.html)
- [Simon Willison's TIL](https://til.simonwillison.net/sqlite/enabling-wal-mode)
- [fly.io article about scaling read concurrency](https://fly.io/blog/sqlite-internals-wal/)

This still keeps the warning against using SQLite in production, but it greatly reduces the restrictions on which combinations and settings can use it. In short, when using an SQLite DB it is now possible to:

- use LocalExecutor, including with more than 1 concurrent worker slot
- have multiple DAG parsing processes (even before the AIP-72/TaskSDK changes to that)

We execute `PRAGMA journal_mode` every time we connect, which is more often than strictly needed (it is one of the few pragmas that is persistent and a property of the DB file), purely for ease and to ensure the DB is in the mode we want.

I have tested this with `breeze -b sqlite start_airflow` and kicking off a lot of tasks concurrently. Will this be entirely without problems? No, but with the scheduler + webserver + API server processes we have _already_ got multiple processes operating on the DB file. This change just makes the best use of that, following the guidance of the SQLite project: ensuring that only a single process accesses the DB concurrently is no longer a requirement!
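
As an illustration of the pragma-on-connect approach described above, here is a minimal standalone sketch using SQLAlchemy's `connect` event, the same mechanism the `airflow/utils/orm_event_handlers.py` hunk below relies on. The engine URL, handler name, and `__main__` check here are made-up example details, not anything Airflow configures:

```python
# Sketch only (not the Airflow code itself): enable WAL on every new SQLite
# connection via SQLAlchemy's "connect" event, mirroring the pragma this
# patch adds to Airflow's ORM event handlers. The DB path is an arbitrary
# example.
from sqlalchemy import create_engine, event, text

engine = create_engine("sqlite:////tmp/wal_example.db")


@event.listens_for(engine, "connect")
def _set_sqlite_pragmas(dbapi_connection, connection_record):
    cursor = dbapi_connection.cursor()
    # Enforce foreign keys (off by default in SQLite).
    cursor.execute("PRAGMA foreign_keys=ON")
    # Switch the journal to Write-Ahead Logging. The setting is persistent
    # (stored in the DB file), so re-issuing it on every connect is redundant
    # but harmless, and guarantees the mode we expect.
    cursor.execute("PRAGMA journal_mode=WAL")
    cursor.close()


if __name__ == "__main__":
    with engine.connect() as conn:
        # Prints "wal" once the handler above has run for this connection.
        print(conn.execute(text("PRAGMA journal_mode")).scalar())
```

Because the mode is a property of the DB file, any other process that opens the same file (scheduler, webserver, API server, DAG processor) gets WAL behaviour too, which is what lets us drop the single-process restrictions.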
--- .../local_commands/scheduler_command.py | 1 - airflow/dag_processing/manager.py | 119 ++------------ airflow/executors/base_executor.py | 1 - airflow/executors/debug_executor.py | 1 - airflow/executors/executor_loader.py | 49 +----- airflow/executors/sequential_executor.py | 1 - airflow/jobs/scheduler_job_runner.py | 15 -- airflow/sentry.py | 2 +- airflow/utils/orm_event_handlers.py | 1 + .../utils/docker_command_utils.py | 11 -- .../test_celery_kubernetes_executor.py | 3 - .../test_local_kubernetes_executor.py | 3 - tests/dag_processing/test_manager.py | 146 +++--------------- tests/dag_processing/test_processor.py | 48 ------ tests/executors/test_base_executor.py | 4 - tests/executors/test_debug_executor.py | 3 - tests/executors/test_executor_loader.py | 32 +--- tests/executors/test_sequential_executor.py | 3 - tests_common/pytest_plugin.py | 11 +- 19 files changed, 43 insertions(+), 411 deletions(-) diff --git a/airflow/cli/commands/local_commands/scheduler_command.py b/airflow/cli/commands/local_commands/scheduler_command.py index fb436b22b4e23..61362c497a044 100644 --- a/airflow/cli/commands/local_commands/scheduler_command.py +++ b/airflow/cli/commands/local_commands/scheduler_command.py @@ -40,7 +40,6 @@ def _run_scheduler_job(args) -> None: job_runner = SchedulerJobRunner(job=Job(), subdir=process_subdir(args.subdir), num_runs=args.num_runs) - ExecutorLoader.validate_database_executor_compatibility(job_runner.job.executor.__class__) enable_health_check = conf.getboolean("scheduler", "ENABLE_HEALTH_CHECK") with _serve_logs(args.skip_serve_logs), _serve_health_check(enable_health_check): run_job(job=job_runner.job, execute_callable=job_runner._execute) diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index 2df7890a48f5d..7d9c9298a996e 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -97,7 +97,6 @@ class DagFileStat: class DagParsingSignal(enum.Enum): """All signals sent to parser.""" - AGENT_RUN_ONCE = "agent_run_once" TERMINATE_MANAGER = "terminate_manager" END_MANAGER = "end_manager" @@ -118,7 +117,6 @@ class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin): for unlimited. :param processor_timeout: How long to wait before timing out a DAG file processor :param dag_ids: if specified, only schedule tasks with these DAG IDs - :param async_mode: Whether to start agent in async mode """ def __init__( @@ -127,14 +125,12 @@ def __init__( max_runs: int, processor_timeout: timedelta, dag_ids: list[str] | None, - async_mode: bool, ): super().__init__() self._dag_directory: os.PathLike = dag_directory self._max_runs = max_runs self._processor_timeout = processor_timeout self._dag_ids = dag_ids - self._async_mode = async_mode # Map from file path to the processor self._processors: dict[str, DagFileProcessorProcess] = {} # Pipe for communicating signals @@ -161,7 +157,6 @@ def start(self) -> None: self._processor_timeout, child_signal_conn, self._dag_ids, - self._async_mode, ), ) self._process = process @@ -170,49 +165,12 @@ def start(self) -> None: self.log.info("Launched DagFileProcessorManager with pid: %s", process.pid) - def run_single_parsing_loop(self) -> None: - """ - Send agent heartbeat signal to the manager, requesting that it runs one processing "loop". - - Should only be used when launched DAG file processor manager in sync mode. - - Call wait_until_finished to ensure that any launched processors have finished before continuing. 
- """ - if not self._parent_signal_conn or not self._process: - raise ValueError("Process not started.") - if not self._process.is_alive(): - return - - try: - self._parent_signal_conn.send(DagParsingSignal.AGENT_RUN_ONCE) - except ConnectionError: - # If this died cos of an error then we will noticed and restarted - # when harvest_serialized_dags calls _heartbeat_manager. - pass - def get_callbacks_pipe(self) -> MultiprocessingConnection: """Return the pipe for sending Callbacks to DagProcessorManager.""" if not self._parent_signal_conn: raise ValueError("Process not started.") return self._parent_signal_conn - def wait_until_finished(self) -> None: - """Wait until DAG parsing is finished.""" - if not self._parent_signal_conn: - raise ValueError("Process not started.") - if self._async_mode: - raise RuntimeError("wait_until_finished should only be called in sync_mode") - while self._parent_signal_conn.poll(timeout=None): - try: - result = self._parent_signal_conn.recv() - except EOFError: - return - self._process_message(result) - if isinstance(result, DagParsingStat): - # In sync mode (which is the only time we call this function) we don't send this message from - # the Manager until all the running processors have finished - return - @staticmethod def _run_processor_manager( dag_directory: os.PathLike, @@ -220,7 +178,6 @@ def _run_processor_manager( processor_timeout: timedelta, signal_conn: MultiprocessingConnection, dag_ids: list[str] | None, - async_mode: bool, ) -> None: # Make this process start as a new process group - that makes it easy # to kill all sub-process of this at the OS-level, rather than having @@ -241,7 +198,6 @@ def _run_processor_manager( processor_timeout=processor_timeout, dag_ids=dag_ids, signal_conn=signal_conn, - async_mode=async_mode, ) processor_manager.start() @@ -352,7 +308,6 @@ class DagFileProcessorManager(LoggingMixin): :param processor_timeout: How long to wait before timing out a DAG file processor :param signal_conn: connection to communicate signal with processor agent. :param dag_ids: if specified, only schedule tasks with these DAG IDs - :param async_mode: whether to start the manager in async mode """ def __init__( @@ -362,7 +317,6 @@ def __init__( processor_timeout: timedelta, dag_ids: list[str] | None, signal_conn: MultiprocessingConnection | None = None, - async_mode: bool = True, ): super().__init__() # known files; this will be updated every `dag_dir_list_interval` and stuff added/removed accordingly @@ -372,30 +326,16 @@ def __init__( # signal_conn is None for dag_processor_standalone mode. self._direct_scheduler_conn = signal_conn self._dag_ids = dag_ids - self._async_mode = async_mode self._parsing_start_time: float | None = None self._dag_directory = dag_directory # Set the signal conn in to non-blocking mode, so that attempting to # send when the buffer is full errors, rather than hangs for-ever # attempting to send (this is to avoid deadlocks!) 
- # - # Don't do this in sync_mode, as we _need_ the DagParsingStat sent to - # continue the scheduler - if self._async_mode and self._direct_scheduler_conn is not None: + if self._direct_scheduler_conn: os.set_blocking(self._direct_scheduler_conn.fileno(), False) self.standalone_dag_processor = conf.getboolean("scheduler", "standalone_dag_processor") self._parallelism = conf.getint("scheduler", "parsing_processes") - if ( - conf.get_mandatory_value("database", "sql_alchemy_conn").startswith("sqlite") - and self._parallelism > 1 - ): - self.log.warning( - "Because we cannot use more than 1 thread (parsing_processes = " - "%d) when using sqlite. So we set parallelism to 1.", - self._parallelism, - ) - self._parallelism = 1 # Parse and schedule each file no faster than this interval. self._file_process_interval = conf.getint("scheduler", "min_file_process_interval") @@ -531,20 +471,13 @@ def deactivate_stale_dags( cls.logger().info("Deactivated %i DAGs which are no longer present in file.", deactivated) def _run_parsing_loop(self): - # In sync mode we want timeout=None -- wait forever until a message is received - if self._async_mode: - poll_time = 0.0 - else: - poll_time = None + poll_time = 0.0 self._refresh_dag_dir() self.prepare_file_path_queue() max_callbacks_per_loop = conf.getint("scheduler", "max_callbacks_per_loop") - if self._async_mode: - # If we're in async mode, we can start up straight away. If we're - # in sync mode we need to be told to start a "loop" - self.start_new_processes() + self.start_new_processes() while True: with Trace.start_span(span_name="dag_parsing_loop", component="DagFileProcessorManager") as span: loop_start_time = time.monotonic() @@ -557,36 +490,16 @@ def _run_parsing_loop(self): self.log.debug("Received %s signal from DagFileProcessorAgent", agent_signal) if agent_signal == DagParsingSignal.TERMINATE_MANAGER: - if span.is_recording(): - span.add_event(name="terminate") self.terminate() break elif agent_signal == DagParsingSignal.END_MANAGER: - if span.is_recording(): - span.add_event(name="end") self.end() sys.exit(os.EX_OK) - elif agent_signal == DagParsingSignal.AGENT_RUN_ONCE: - # continue the loop to parse dags - pass elif isinstance(agent_signal, CallbackRequest): self._add_callback_to_queue(agent_signal) else: raise ValueError(f"Invalid message {type(agent_signal)}") - if not ready and not self._async_mode: - # In "sync" mode we don't want to parse the DAGs until we - # are told to (as that would open another connection to the - # SQLite DB which isn't a good practice - - # This shouldn't happen, as in sync mode poll should block for - # ever. Lets be defensive about that. - self.log.warning( - "wait() unexpectedly returned nothing ready after infinite timeout (%r)!", poll_time - ) - - continue - for sentinel in ready: if sentinel is not self._direct_scheduler_conn: processor = self.waitables.get(sentinel) @@ -631,14 +544,6 @@ def _run_parsing_loop(self): # Update number of loop iteration. self._num_run += 1 - if not self._async_mode: - self.log.debug("Waiting for processors to finish since we're using sqlite") - # Wait until the running DAG processors are finished before - # sending a DagParsingStat message back. 
This means the Agent - # can tell we've got to the end of this iteration when it sees - # this type of message - self.wait_until_finished() - # Collect anything else that has finished, but don't kick off any more processors if span.is_recording(): span.add_event(name="collect_results") @@ -664,10 +569,9 @@ def _run_parsing_loop(self): except BlockingIOError: # Try again next time around the loop! - # It is better to fail, than it is deadlock. This should - # "almost never happen" since the DagParsingStat object is - # small, and in async mode this stat is not actually _required_ - # for normal operation (It only drives "max runs") + # It is better to fail, than it is deadlock. This should "almost never happen" since the + # DagParsingStat object is small, and is not actually _required_ for normal operation (It + # only drives "max runs") self.log.debug("BlockingIOError received trying to send DagParsingStat, ignoring") if max_runs_reached: @@ -683,12 +587,11 @@ def _run_parsing_loop(self): ) break - if self._async_mode: - loop_duration = time.monotonic() - loop_start_time - if loop_duration < 1: - poll_time = 1 - loop_duration - else: - poll_time = 0.0 + loop_duration = time.monotonic() - loop_start_time + if loop_duration < 1: + poll_time = 1 - loop_duration + else: + poll_time = 0.0 @classmethod @provide_session diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index b57f7458bbb97..1ba0ed15567ba 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -119,7 +119,6 @@ class BaseExecutor(LoggingMixin): supports_sentry: bool = False is_local: bool = False - is_single_threaded: bool = False is_production: bool = True change_sensor_mode_to_reschedule: bool = False diff --git a/airflow/executors/debug_executor.py b/airflow/executors/debug_executor.py index 525c80791e37a..9d86f5e72703c 100644 --- a/airflow/executors/debug_executor.py +++ b/airflow/executors/debug_executor.py @@ -47,7 +47,6 @@ class DebugExecutor(BaseExecutor): _terminated = threading.Event() - is_single_threaded: bool = True is_production: bool = False change_sensor_mode_to_reschedule: bool = True diff --git a/airflow/executors/executor_loader.py b/airflow/executors/executor_loader.py index 8093566ab5abe..cd025e38c8d65 100644 --- a/airflow/executors/executor_loader.py +++ b/airflow/executors/executor_loader.py @@ -18,9 +18,7 @@ from __future__ import annotations -import functools import logging -import os from typing import TYPE_CHECKING from airflow.exceptions import AirflowConfigException, UnknownExecutorException @@ -236,68 +234,29 @@ def load_executor(cls, executor_name: ExecutorName | str | None) -> BaseExecutor return executor @classmethod - def import_executor_cls( - cls, executor_name: ExecutorName, validate: bool = True - ) -> tuple[type[BaseExecutor], ConnectorSource]: + def import_executor_cls(cls, executor_name: ExecutorName) -> tuple[type[BaseExecutor], ConnectorSource]: """ Import the executor class. Supports the same formats as ExecutorLoader.load_executor. :param executor_name: Name of core executor or module path to executor. 
- :param validate: Whether or not to validate the executor before returning :return: executor class via executor_name and executor import source """ - - def _import_and_validate(path: str) -> type[BaseExecutor]: - executor = import_string(path) - if validate: - cls.validate_database_executor_compatibility(executor) - return executor - - return _import_and_validate(executor_name.module_path), executor_name.connector_source + return import_string(executor_name.module_path), executor_name.connector_source @classmethod - def import_default_executor_cls(cls, validate: bool = True) -> tuple[type[BaseExecutor], ConnectorSource]: + def import_default_executor_cls(cls) -> tuple[type[BaseExecutor], ConnectorSource]: """ Import the default executor class. - :param validate: Whether or not to validate the executor before returning - :return: executor class and executor import source """ executor_name = cls.get_default_executor_name() - executor, source = cls.import_executor_cls(executor_name, validate=validate) + executor, source = cls.import_executor_cls(executor_name) return executor, source - @classmethod - @functools.cache - def validate_database_executor_compatibility(cls, executor: type[BaseExecutor]) -> None: - """ - Validate database and executor compatibility. - - Most of the databases work universally, but SQLite can only work with - single-threaded executors (e.g. Sequential). - - This is NOT done in ``airflow.configuration`` (when configuration is - initialized) because loading the executor class is heavy work we want to - avoid unless needed. - """ - # Single threaded executors can run with any backend. - if executor.is_single_threaded: - return - - # This is set in tests when we want to be able to use SQLite. - if os.environ.get("_AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK") == "1": - return - - from airflow.settings import engine - - # SQLite only works with single threaded executors - if engine.dialect.name == "sqlite": - raise AirflowConfigException(f"error: cannot use SQLite with the {executor.__name__}") - @classmethod def __load_celery_kubernetes_executor(cls) -> BaseExecutor: celery_executor = import_string(cls.executors[CELERY_EXECUTOR])() diff --git a/airflow/executors/sequential_executor.py b/airflow/executors/sequential_executor.py index 7ac8047ed5d53..e3dce51e1ebde 100644 --- a/airflow/executors/sequential_executor.py +++ b/airflow/executors/sequential_executor.py @@ -49,7 +49,6 @@ class SequentialExecutor(BaseExecutor): """ is_local: bool = True - is_single_threaded: bool = True is_production: bool = False serve_logs: bool = True diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 0dd6b32f74143..21fa41aa2c592 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -209,9 +209,6 @@ def __init__( if log: self._log = log - # Check what SQL backend we use - sql_conn: str = conf.get_mandatory_value("database", "sql_alchemy_conn").lower() - self.using_sqlite = sql_conn.startswith("sqlite") # Dag Processor agent - not used in Dag Processor standalone mode. self.processor_agent: DagFileProcessorAgent | None = None @@ -930,10 +927,6 @@ def _execute(self) -> int | None: self.log.info("Processing each file at most %s times", self.num_times_parse_dags) - # When using sqlite, we do not use async_mode - # so the scheduler job and DAG parser don't access the DB at the same time. 
- async_mode = not self.using_sqlite - processor_timeout_seconds: int = conf.getint("core", "dag_file_processor_timeout") processor_timeout = timedelta(seconds=processor_timeout_seconds) if not self._standalone_dag_processor and not self.processor_agent: @@ -942,7 +935,6 @@ def _execute(self) -> int | None: max_runs=self.num_times_parse_dags, processor_timeout=processor_timeout, dag_ids=[], - async_mode=async_mode, ) reset_signals = self.register_signals() @@ -1106,13 +1098,6 @@ def _run_scheduler_loop(self) -> None: } ) - if self.using_sqlite and self.processor_agent: - self.processor_agent.run_single_parsing_loop() - # For the sqlite case w/ 1 thread, wait until the processor - # is finished to avoid concurrent access to the DB. - self.log.debug("Waiting for processors to finish since we're using sqlite") - self.processor_agent.wait_until_finished() - with create_session() as session: # This will schedule for as many executors as possible. num_queued_tis = self._do_scheduling(session) diff --git a/airflow/sentry.py b/airflow/sentry.py index 22261bd99fd2d..7a7b1df2e118e 100644 --- a/airflow/sentry.py +++ b/airflow/sentry.py @@ -85,7 +85,7 @@ def __init__(self): # LoggingIntegration is set by default. integrations = [sentry_flask] - executor_class, _ = ExecutorLoader.import_default_executor_cls(validate=False) + executor_class, _ = ExecutorLoader.import_default_executor_cls() if executor_class.supports_sentry: from sentry_sdk.integrations.celery import CeleryIntegration diff --git a/airflow/utils/orm_event_handlers.py b/airflow/utils/orm_event_handlers.py index 575f7ac939167..a62e21af33ea2 100644 --- a/airflow/utils/orm_event_handlers.py +++ b/airflow/utils/orm_event_handlers.py @@ -46,6 +46,7 @@ def connect(dbapi_connection, connection_record): def set_sqlite_pragma(dbapi_connection, connection_record): cursor = dbapi_connection.cursor() cursor.execute("PRAGMA foreign_keys=ON") + cursor.execute("PRAGMA journal_mode=WAL") cursor.close() # this ensures coherence in mysql when storing datetimes (not required for postgres) diff --git a/dev/breeze/src/airflow_breeze/utils/docker_command_utils.py b/dev/breeze/src/airflow_breeze/utils/docker_command_utils.py index 2352bcda533a2..d97706b59e7cc 100644 --- a/dev/breeze/src/airflow_breeze/utils/docker_command_utils.py +++ b/dev/breeze/src/airflow_breeze/utils/docker_command_utils.py @@ -52,7 +52,6 @@ DOCKER_DEFAULT_PLATFORM, MIN_DOCKER_COMPOSE_VERSION, MIN_DOCKER_VERSION, - SEQUENTIAL_EXECUTOR, ) from airflow_breeze.utils.console import Output, get_console from airflow_breeze.utils.run_utils import ( @@ -724,16 +723,13 @@ def execute_command_in_shell( :param command: """ shell_params.backend = "sqlite" - shell_params.executor = SEQUENTIAL_EXECUTOR shell_params.forward_ports = False shell_params.project_name = project_name shell_params.quiet = True shell_params.skip_environment_initialization = True shell_params.skip_image_upgrade_check = True if get_verbose(): - get_console().print(f"[warning]Backend forced to: sqlite and {SEQUENTIAL_EXECUTOR}[/]") get_console().print("[warning]Sqlite DB is cleaned[/]") - get_console().print(f"[warning]Executor forced to {SEQUENTIAL_EXECUTOR}[/]") get_console().print("[warning]Disabled port forwarding[/]") get_console().print(f"[warning]Project name set to: {project_name}[/]") get_console().print("[warning]Forced quiet mode[/]") @@ -775,13 +771,6 @@ def enter_shell(shell_params: ShellParams, output: Output | None = None) -> RunC ) bring_compose_project_down(preserve_volumes=False, shell_params=shell_params) - if 
shell_params.backend == "sqlite" and shell_params.executor != SEQUENTIAL_EXECUTOR: - get_console().print( - f"\n[warning]backend: sqlite is not " - f"compatible with executor: {shell_params.executor}. " - f"Changing the executor to {SEQUENTIAL_EXECUTOR}.\n" - ) - shell_params.executor = SEQUENTIAL_EXECUTOR if shell_params.restart: bring_compose_project_down(preserve_volumes=False, shell_params=shell_params) if shell_params.include_mypy_volume: diff --git a/providers/tests/celery/executors/test_celery_kubernetes_executor.py b/providers/tests/celery/executors/test_celery_kubernetes_executor.py index 6c3857912b5ce..ce51afea4e8fb 100644 --- a/providers/tests/celery/executors/test_celery_kubernetes_executor.py +++ b/providers/tests/celery/executors/test_celery_kubernetes_executor.py @@ -46,9 +46,6 @@ def test_is_production_default_value(self): def test_serve_logs_default_value(self): assert not CeleryKubernetesExecutor.serve_logs - def test_is_single_threaded_default_value(self): - assert not CeleryKubernetesExecutor.is_single_threaded - def test_cli_commands_vended(self): assert CeleryKubernetesExecutor.get_cli_commands() diff --git a/providers/tests/cncf/kubernetes/executors/test_local_kubernetes_executor.py b/providers/tests/cncf/kubernetes/executors/test_local_kubernetes_executor.py index 2b1817ac89853..1a745ac139315 100644 --- a/providers/tests/cncf/kubernetes/executors/test_local_kubernetes_executor.py +++ b/providers/tests/cncf/kubernetes/executors/test_local_kubernetes_executor.py @@ -43,9 +43,6 @@ def test_is_production_default_value(self): def test_serve_logs_default_value(self): assert LocalKubernetesExecutor.serve_logs - def test_is_single_threaded_default_value(self): - assert not LocalKubernetesExecutor.is_single_threaded - def test_cli_commands_vended(self): assert LocalKubernetesExecutor.get_cli_commands() diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py index 3154f3a49dfb4..4a338e164d64b 100644 --- a/tests/dag_processing/test_manager.py +++ b/tests/dag_processing/test_manager.py @@ -137,9 +137,6 @@ def teardown_class(self): clear_db_callbacks() def run_processor_manager_one_loop(self, processor, parent_pipe): - if not processor._async_mode: - parent_pipe.send(DagParsingSignal.AGENT_RUN_ONCE) - results = [] while True: @@ -167,14 +164,12 @@ def test_remove_file_clears_import_error(self, tmp_path): child_pipe, parent_pipe = multiprocessing.Pipe() - async_mode = "sqlite" not in conf.get("database", "sql_alchemy_conn") manager = DagFileProcessorManager( dag_directory=path_to_parse.parent, max_runs=1, processor_timeout=timedelta(days=365), signal_conn=child_pipe, dag_ids=[], - async_mode=async_mode, ) with create_session() as session: @@ -185,7 +180,7 @@ def test_remove_file_clears_import_error(self, tmp_path): path_to_parse.unlink() - # Rerun the scheduler once the dag file has been removed + # Rerun the parser once the dag file has been removed self.run_processor_manager_one_loop(manager, parent_pipe) import_errors = session.query(ParseImportError).all() @@ -199,21 +194,18 @@ def test_remove_file_clears_import_error(self, tmp_path): def test_max_runs_when_no_files(self, tmp_path): child_pipe, parent_pipe = multiprocessing.Pipe() - async_mode = "sqlite" not in conf.get("database", "sql_alchemy_conn") manager = DagFileProcessorManager( dag_directory=os.fspath(tmp_path), max_runs=1, processor_timeout=timedelta(days=365), signal_conn=child_pipe, dag_ids=[], - async_mode=async_mode, ) self.run_processor_manager_one_loop(manager, parent_pipe) 
child_pipe.close() parent_pipe.close() - @pytest.mark.backend("mysql", "postgres") @mock.patch("airflow.dag_processing.processor.iter_airflow_imports") def test_start_new_processes_with_same_filepath(self, _): """ @@ -226,7 +218,6 @@ def test_start_new_processes_with_same_filepath(self, _): processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - async_mode=True, ) file_1 = "file_1.py" @@ -256,7 +247,6 @@ def test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self): processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - async_mode=True, ) mock_processor = MagicMock() @@ -277,7 +267,6 @@ def test_set_file_paths_when_processor_file_path_is_in_new_file_paths(self): processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - async_mode=True, ) mock_processor = MagicMock() @@ -307,7 +296,6 @@ def test_file_paths_in_queue_sorted_alphabetically( processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - async_mode=True, ) manager.set_file_paths(dag_files) @@ -333,7 +321,6 @@ def test_file_paths_in_queue_sorted_random_seeded_by_host( processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - async_mode=True, ) manager.set_file_paths(dag_files) @@ -394,7 +381,6 @@ def test_file_paths_in_queue_sorted_by_modified_time( processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - async_mode=True, ) manager.set_file_paths(dag_files) @@ -428,7 +414,6 @@ def test_file_paths_in_queue_excludes_missing_file( processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - async_mode=True, ) manager.set_file_paths(dag_files) @@ -461,7 +446,6 @@ def test_add_new_file_to_parsing_queue( processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - async_mode=True, ) manager.set_file_paths(dag_files) @@ -503,7 +487,6 @@ def test_recently_modified_file_is_parsed_with_mtime_mode( processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - async_mode=True, ) # let's say the DAG was just parsed 10 seconds before the Freezed time @@ -560,7 +543,6 @@ def test_file_paths_in_queue_sorted_by_priority( processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - async_mode=True, ) manager.set_file_paths(dag_files) @@ -582,7 +564,6 @@ def test_scan_stale_dags(self): processor_timeout=timedelta(minutes=10), signal_conn=MagicMock(), dag_ids=[], - async_mode=True, ) test_dag_path = str(TEST_DAG_FOLDER / "test_example_bash_operator.py") @@ -650,7 +631,6 @@ def test_scan_stale_dags_standalone_mode(self): processor_timeout=timedelta(minutes=10), signal_conn=MagicMock(), dag_ids=[], - async_mode=True, ) test_dag_path = str(TEST_DAG_FOLDER / "test_example_bash_operator.py") @@ -703,7 +683,6 @@ def test_kill_timed_out_processors_kill(self, mock_kill, mock_pid, mock_waitable processor_timeout=timedelta(seconds=5), signal_conn=MagicMock(), dag_ids=[], - async_mode=True, ) processor = DagFileProcessorProcess( @@ -731,7 +710,6 @@ def test_kill_timed_out_processors_no_kill(self, mock_dag_file_processor, mock_p processor_timeout=timedelta(seconds=5), signal_conn=MagicMock(), dag_ids=[], - async_mode=True, ) processor = DagFileProcessorProcess( @@ -767,7 +745,6 @@ def test_dag_with_system_exit(self): max_runs=1, processor_timeout=timedelta(seconds=5), signal_conn=child_pipe, - async_mode=True, ) manager._run_parsing_loop() @@ -809,7 +786,6 @@ def test_import_error_with_dag_directory(self, tmp_path): max_runs=1, signal_conn=child_pipe, 
processor_timeout=timedelta(seconds=5), - async_mode=False, ) self.run_processor_manager_one_loop(manager, parent_pipe) @@ -826,7 +802,6 @@ def test_import_error_with_dag_directory(self, tmp_path): max_runs=1, signal_conn=child_pipe, processor_timeout=timedelta(seconds=5), - async_mode=True, ) self.run_processor_manager_one_loop(manager, parent_pipe) @@ -839,7 +814,6 @@ def test_import_error_with_dag_directory(self, tmp_path): session.rollback() @conf_vars({("core", "load_examples"): "False"}) - @pytest.mark.backend("mysql", "postgres") @pytest.mark.execution_timeout(30) @mock.patch("airflow.dag_processing.manager.DagFileProcessorProcess") def test_pipe_full_deadlock(self, mock_processor): @@ -894,7 +868,6 @@ def fake_processor_(*args, **kwargs): max_runs=100, processor_timeout=timedelta(seconds=5), signal_conn=child_pipe, - async_mode=True, ) try: @@ -926,14 +899,12 @@ def test_send_file_processing_statsd_timing(self, statsd_timing_mock, tmp_path): child_pipe, parent_pipe = multiprocessing.Pipe() - async_mode = "sqlite" not in conf.get("database", "sql_alchemy_conn") manager = DagFileProcessorManager( dag_directory=path_to_parse.parent, max_runs=1, processor_timeout=timedelta(days=365), signal_conn=child_pipe, dag_ids=[], - async_mode=async_mode, ) self.run_processor_manager_one_loop(manager, parent_pipe) @@ -963,7 +934,6 @@ def test_refresh_dags_dir_doesnt_delete_zipped_dags(self, tmp_path): processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - async_mode=True, ) dagbag = DagBag(dag_folder=tmp_path, include_examples=False) zipped_dag_path = os.path.join(TEST_DAGS_FOLDER, "test_zip.zip") @@ -988,7 +958,6 @@ def test_refresh_dags_dir_deactivates_deleted_zipped_dags(self, tmp_path): processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - async_mode=True, ) dagbag = DagBag(dag_folder=tmp_path, include_examples=False) zipped_dag_path = os.path.join(TEST_DAGS_FOLDER, "test_zip.zip") @@ -1031,7 +1000,6 @@ def test_refresh_dags_dir_does_not_interfer_with_dags_outside_its_subdir(self, t processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - async_mode=True, ) manager.last_dag_dir_refresh_time = timezone.utcnow() - timedelta(minutes=10) @@ -1077,7 +1045,6 @@ def test_fetch_callbacks_from_database(self, tmp_path): processor_timeout=timedelta(days=365), signal_conn=child_pipe, dag_ids=[], - async_mode=False, ) with create_session() as session: @@ -1120,7 +1087,6 @@ def test_fetch_callbacks_for_current_dag_directory_only(self, tmp_path): processor_timeout=timedelta(days=365), signal_conn=child_pipe, dag_ids=[], - async_mode=False, ) with create_session() as session: @@ -1156,7 +1122,6 @@ def test_fetch_callbacks_from_database_max_per_loop(self, tmp_path): processor_timeout=timedelta(days=365), signal_conn=child_pipe, dag_ids=[], - async_mode=False, ) with create_session() as session: @@ -1193,7 +1158,6 @@ def test_fetch_callbacks_from_database_not_standalone(self, tmp_path): processor_timeout=timedelta(days=365), signal_conn=child_pipe, dag_ids=[], - async_mode=False, ) with create_session() as session: @@ -1212,7 +1176,6 @@ def test_callback_queue(self, tmp_path): processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - async_mode=True, ) dag1_req1 = DagCallbackRequest( @@ -1267,7 +1230,7 @@ def test_callback_queue(self, tmp_path): ] -def _wait_for_processor_agent_to_complete_in_async_mode(processor_agent: DagFileProcessorAgent): +def _wait_for_processor_agent_to_complete(processor_agent: 
DagFileProcessorAgent): start_timer = time.monotonic() while time.monotonic() - start_timer < 10: if processor_agent.done and all( @@ -1304,17 +1267,14 @@ class path, thus when reloading logging module the airflow.processor_manager # Launch a process through DagFileProcessorAgent, which will try # reload the logging module. test_dag_path = TEST_DAG_FOLDER / "test_scheduler_dags.py" - async_mode = "sqlite" not in conf.get("database", "sql_alchemy_conn") log_file_loc = conf.get("logging", "DAG_PROCESSOR_MANAGER_LOG_LOCATION") with contextlib.suppress(OSError): os.remove(log_file_loc) # Starting dag processing with 0 max_runs to avoid redundant operations. - processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365), [], async_mode) + processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365), []) processor_agent.start() - if not async_mode: - processor_agent.run_single_parsing_loop() processor_agent._process.join() # Since we are reloading logging config not creating this file, @@ -1328,14 +1288,9 @@ def test_parse_once(self): clear_db_dags() test_dag_path = TEST_DAG_FOLDER / "test_scheduler_dags.py" - async_mode = "sqlite" not in conf.get("database", "sql_alchemy_conn") - processor_agent = DagFileProcessorAgent(test_dag_path, 1, timedelta(days=365), [], async_mode) + processor_agent = DagFileProcessorAgent(test_dag_path, 1, timedelta(days=365), []) processor_agent.start() - if not async_mode: - processor_agent.run_single_parsing_loop() while not processor_agent.done: - if not async_mode: - processor_agent.wait_until_finished() processor_agent.heartbeat() assert processor_agent.all_files_processed @@ -1350,88 +1305,39 @@ def test_parse_once(self): def test_launch_process(self): test_dag_path = TEST_DAG_FOLDER / "test_scheduler_dags.py" - async_mode = "sqlite" not in conf.get("database", "sql_alchemy_conn") log_file_loc = conf.get("logging", "DAG_PROCESSOR_MANAGER_LOG_LOCATION") with contextlib.suppress(OSError): os.remove(log_file_loc) # Starting dag processing with 0 max_runs to avoid redundant operations. 
- processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365), [], async_mode) + processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365), []) processor_agent.start() - if not async_mode: - processor_agent.run_single_parsing_loop() processor_agent._process.join() assert os.path.isfile(log_file_loc) - def test_single_parsing_loop_no_parent_signal_conn(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) - processor_agent._process = Mock() - processor_agent._parent_signal_conn = None - with pytest.raises(ValueError, match="Process not started"): - processor_agent.run_single_parsing_loop() - - def test_single_parsing_loop_no_process(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) - processor_agent._parent_signal_conn = Mock() - processor_agent._process = None - with pytest.raises(ValueError, match="Process not started"): - processor_agent.run_single_parsing_loop() - - def test_single_parsing_loop_process_isnt_alive(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) - processor_agent._process = Mock() - processor_agent._parent_signal_conn = Mock() - processor_agent._process.is_alive.return_value = False - ret_val = processor_agent.run_single_parsing_loop() - assert not ret_val - - def test_single_parsing_loop_process_conn_error(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) - processor_agent._process = Mock() - processor_agent._parent_signal_conn = Mock() - processor_agent._process.is_alive.return_value = True - processor_agent._parent_signal_conn.send.side_effect = ConnectionError - ret_val = processor_agent.run_single_parsing_loop() - assert not ret_val - def test_get_callbacks_pipe(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), []) processor_agent._parent_signal_conn = Mock() retval = processor_agent.get_callbacks_pipe() assert retval == processor_agent._parent_signal_conn def test_get_callbacks_pipe_no_parent_signal_conn(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), []) processor_agent._parent_signal_conn = None with pytest.raises(ValueError, match="Process not started"): processor_agent.get_callbacks_pipe() - def test_wait_until_finished_no_parent_signal_conn(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) - processor_agent._parent_signal_conn = None - with pytest.raises(ValueError, match="Process not started"): - processor_agent.wait_until_finished() - - def test_wait_until_finished_poll_eof_error(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) - processor_agent._parent_signal_conn = Mock() - processor_agent._parent_signal_conn.poll.return_value = True - processor_agent._parent_signal_conn.recv = Mock() - processor_agent._parent_signal_conn.recv.side_effect = EOFError - ret_val = processor_agent.wait_until_finished() - assert ret_val is None - def test_heartbeat_no_parent_signal_conn(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), []) processor_agent._parent_signal_conn = None with pytest.raises(ValueError, match="Process not started"): processor_agent.heartbeat() def 
test_heartbeat_poll_eof_error(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), []) processor_agent._parent_signal_conn = Mock() processor_agent._parent_signal_conn.poll.return_value = True processor_agent._parent_signal_conn.recv = Mock() @@ -1440,7 +1346,7 @@ def test_heartbeat_poll_eof_error(self): assert ret_val is None def test_heartbeat_poll_connection_error(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), []) processor_agent._parent_signal_conn = Mock() processor_agent._parent_signal_conn.poll.return_value = True processor_agent._parent_signal_conn.recv = Mock() @@ -1449,7 +1355,7 @@ def test_heartbeat_poll_connection_error(self): assert ret_val is None def test_heartbeat_poll_process_message(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), []) processor_agent._parent_signal_conn = Mock() processor_agent._parent_signal_conn.poll.side_effect = [True, False] processor_agent._parent_signal_conn.recv = Mock() @@ -1460,19 +1366,19 @@ def test_heartbeat_poll_process_message(self): def test_process_message_invalid_type(self): message = "xyz" - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), []) with pytest.raises(RuntimeError, match="Unexpected message received of type str"): processor_agent._process_message(message) def test_heartbeat_manager(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), []) processor_agent._parent_signal_conn = None with pytest.raises(ValueError, match="Process not started"): processor_agent._heartbeat_manager() @mock.patch("airflow.utils.process_utils.reap_process_group") def test_heartbeat_manager_process_restart(self, mock_pg): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), []) processor_agent._parent_signal_conn = Mock() processor_agent._process = MagicMock() processor_agent.start = Mock() @@ -1486,7 +1392,7 @@ def test_heartbeat_manager_process_restart(self, mock_pg): @mock.patch("time.monotonic") @mock.patch("airflow.dag_processing.manager.reap_process_group") def test_heartbeat_manager_process_reap(self, mock_pg, mock_time_monotonic, mock_stats): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), []) processor_agent._parent_signal_conn = Mock() processor_agent._process = Mock() processor_agent._process.pid = 12345 @@ -1507,7 +1413,7 @@ def test_heartbeat_manager_process_reap(self, mock_pg, mock_time_monotonic, mock processor_agent.start.assert_called() def test_heartbeat_manager_terminate(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), []) processor_agent._parent_signal_conn = Mock() processor_agent._process = Mock() processor_agent._process.is_alive.return_value = True @@ -1517,7 +1423,7 @@ def test_heartbeat_manager_terminate(self): 
processor_agent._parent_signal_conn.send.assert_called_with(DagParsingSignal.TERMINATE_MANAGER) def test_heartbeat_manager_terminate_conn_err(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), []) processor_agent._process = Mock() processor_agent._process.is_alive.return_value = True processor_agent._parent_signal_conn = Mock() @@ -1528,7 +1434,7 @@ def test_heartbeat_manager_terminate_conn_err(self): processor_agent._parent_signal_conn.send.assert_called_with(DagParsingSignal.TERMINATE_MANAGER) def test_heartbeat_manager_end_no_process(self): - processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False) + processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), []) processor_agent._process = Mock() processor_agent._process.__bool__ = Mock(return_value=False) processor_agent._process.side_effect = [None] @@ -1541,17 +1447,13 @@ def test_heartbeat_manager_end_no_process(self): @conf_vars({("logging", "dag_processor_manager_log_stdout"): "True"}) def test_log_to_stdout(self, capfd): test_dag_path = TEST_DAG_FOLDER / "test_scheduler_dags.py" - async_mode = "sqlite" not in conf.get("database", "sql_alchemy_conn") # Starting dag processing with 0 max_runs to avoid redundant operations. - processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365), [], async_mode) + processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365), []) processor_agent.start() - if not async_mode: - processor_agent.run_single_parsing_loop() processor_agent._process.join() - if async_mode: - _wait_for_processor_agent_to_complete_in_async_mode(processor_agent) + _wait_for_processor_agent_to_complete(processor_agent) # Capture the stdout and stderr out, _ = capfd.readouterr() @@ -1560,17 +1462,13 @@ def test_log_to_stdout(self, capfd): @conf_vars({("logging", "dag_processor_manager_log_stdout"): "False"}) def test_not_log_to_stdout(self, capfd): test_dag_path = TEST_DAG_FOLDER / "test_scheduler_dags.py" - async_mode = "sqlite" not in conf.get("database", "sql_alchemy_conn") # Starting dag processing with 0 max_runs to avoid redundant operations. - processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365), [], async_mode) + processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365), []) processor_agent.start() - if not async_mode: - processor_agent.run_single_parsing_loop() processor_agent._process.join() - if async_mode: - _wait_for_processor_agent_to_complete_in_async_mode(processor_agent) + _wait_for_processor_agent_to_complete(processor_agent) # Capture the stdout and stderr out, _ = capfd.readouterr() diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py index 336e2ab242c7c..c2962ea04115c 100644 --- a/tests/dag_processing/test_processor.py +++ b/tests/dag_processing/test_processor.py @@ -17,7 +17,6 @@ # under the License. 
from __future__ import annotations -import datetime import os import pathlib import sys @@ -30,7 +29,6 @@ from airflow import settings from airflow.callbacks.callback_requests import TaskCallbackRequest from airflow.configuration import TEST_DAGS_FOLDER, conf -from airflow.dag_processing.manager import DagFileProcessorAgent from airflow.dag_processing.processor import DagFileProcessor, DagFileProcessorProcess from airflow.models import DagBag, DagModel, TaskInstance from airflow.models.serialized_dag import SerializedDagModel @@ -656,49 +654,3 @@ def test_counter_for_last_num_of_db_queries(self): session=session, ): self._process_file(dag_filepath, TEST_DAG_FOLDER, session) - - -class TestProcessorAgent: - @pytest.fixture(autouse=True) - def per_test(self): - self.processor_agent = None - yield - if self.processor_agent: - self.processor_agent.end() - - def test_error_when_waiting_in_async_mode(self, tmp_path): - self.processor_agent = DagFileProcessorAgent( - dag_directory=tmp_path, - max_runs=1, - processor_timeout=datetime.timedelta(1), - dag_ids=[], - async_mode=True, - ) - self.processor_agent.start() - with pytest.raises(RuntimeError, match="wait_until_finished should only be called in sync_mode"): - self.processor_agent.wait_until_finished() - - def test_default_multiprocessing_behaviour(self, tmp_path): - self.processor_agent = DagFileProcessorAgent( - dag_directory=tmp_path, - max_runs=1, - processor_timeout=datetime.timedelta(1), - dag_ids=[], - async_mode=False, - ) - self.processor_agent.start() - self.processor_agent.run_single_parsing_loop() - self.processor_agent.wait_until_finished() - - @conf_vars({("core", "mp_start_method"): "spawn"}) - def test_spawn_multiprocessing_behaviour(self, tmp_path): - self.processor_agent = DagFileProcessorAgent( - dag_directory=tmp_path, - max_runs=1, - processor_timeout=datetime.timedelta(1), - dag_ids=[], - async_mode=False, - ) - self.processor_agent.start() - self.processor_agent.run_single_parsing_loop() - self.processor_agent.wait_until_finished() diff --git a/tests/executors/test_base_executor.py b/tests/executors/test_base_executor.py index 6d69a90d4f870..1c144eb844540 100644 --- a/tests/executors/test_base_executor.py +++ b/tests/executors/test_base_executor.py @@ -46,10 +46,6 @@ def test_is_local_default_value(): assert not BaseExecutor.is_local -def test_is_single_threaded_default_value(): - assert not BaseExecutor.is_single_threaded - - def test_is_production_default_value(): assert BaseExecutor.is_production diff --git a/tests/executors/test_debug_executor.py b/tests/executors/test_debug_executor.py index a8ad667957677..fdafa68ad5aa6 100644 --- a/tests/executors/test_debug_executor.py +++ b/tests/executors/test_debug_executor.py @@ -119,9 +119,6 @@ def test_fail_fast(self, change_state_mock): def test_reschedule_mode(self): assert DebugExecutor.change_sensor_mode_to_reschedule - def test_is_single_threaded(self): - assert DebugExecutor.is_single_threaded - def test_is_production_default_value(self): assert not DebugExecutor.is_production diff --git a/tests/executors/test_executor_loader.py b/tests/executors/test_executor_loader.py index 7532c878dc51f..48c531c5cb942 100644 --- a/tests/executors/test_executor_loader.py +++ b/tests/executors/test_executor_loader.py @@ -16,7 +16,6 @@ # under the License. 
from __future__ import annotations -from contextlib import nullcontext from importlib import reload from unittest import mock @@ -33,11 +32,7 @@ class FakeExecutor: - is_single_threaded = False - - -class FakeSingleThreadedExecutor: - is_single_threaded = True + pass class TestExecutorLoader: @@ -219,31 +214,6 @@ def test_should_support_import_custom_path(self, executor_config): assert executor.__name__ == "FakeExecutor" assert import_source == ConnectorSource.CUSTOM_PATH - @pytest.mark.db_test - @pytest.mark.backend("mysql", "postgres") - @pytest.mark.parametrize("executor", [FakeExecutor, FakeSingleThreadedExecutor]) - def test_validate_database_executor_compatibility_general(self, monkeypatch, executor): - monkeypatch.delenv("_AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK") - ExecutorLoader.validate_database_executor_compatibility(executor) - - @pytest.mark.db_test - @pytest.mark.backend("sqlite") - @pytest.mark.parametrize( - ["executor", "expectation"], - [ - pytest.param(FakeSingleThreadedExecutor, nullcontext(), id="single-threaded"), - pytest.param( - FakeExecutor, - pytest.raises(AirflowConfigException, match=r"^error: cannot use SQLite with the .+"), - id="multi-threaded", - ), - ], - ) - def test_validate_database_executor_compatibility_sqlite(self, monkeypatch, executor, expectation): - monkeypatch.delenv("_AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK") - with expectation: - ExecutorLoader.validate_database_executor_compatibility(executor) - def test_load_executor(self): with conf_vars({("core", "executor"): "LocalExecutor"}): ExecutorLoader.init_executors() diff --git a/tests/executors/test_sequential_executor.py b/tests/executors/test_sequential_executor.py index f6cb7aae575b5..ea7ab6aad96ef 100644 --- a/tests/executors/test_sequential_executor.py +++ b/tests/executors/test_sequential_executor.py @@ -35,9 +35,6 @@ def test_is_production_default_value(self): def test_serve_logs_default_value(self): assert SequentialExecutor.serve_logs - def test_is_single_threaded_default_value(self): - assert SequentialExecutor.is_single_threaded - @mock.patch("airflow.executors.sequential_executor.SequentialExecutor.sync") @mock.patch("airflow.executors.base_executor.BaseExecutor.trigger_tasks") @mock.patch("airflow.executors.base_executor.Stats.gauge") diff --git a/tests_common/pytest_plugin.py b/tests_common/pytest_plugin.py index 85097f43d9c4f..963c0a0bd5d7f 100644 --- a/tests_common/pytest_plugin.py +++ b/tests_common/pytest_plugin.py @@ -1438,18 +1438,13 @@ def _clear_db(request): @pytest.fixture(autouse=True) def clear_lru_cache(): - from airflow.executors.executor_loader import ExecutorLoader from airflow.utils.entry_points import _get_grouped_entry_points - ExecutorLoader.validate_database_executor_compatibility.cache_clear() + _get_grouped_entry_points.cache_clear() try: - _get_grouped_entry_points.cache_clear() - try: - yield - finally: - _get_grouped_entry_points.cache_clear() + yield finally: - ExecutorLoader.validate_database_executor_compatibility.cache_clear() + _get_grouped_entry_points.cache_clear() @pytest.fixture(autouse=True) From 335036a311253eea61694b196442344458762afa Mon Sep 17 00:00:00 2001 From: Ash Berlin-Taylor Date: Wed, 11 Dec 2024 12:04:31 +0000 Subject: [PATCH 2/2] fixup! 
--- .../local_commands/test_scheduler_command.py | 75 ++----------------- 1 file changed, 8 insertions(+), 67 deletions(-) diff --git a/tests/cli/commands/local_commands/test_scheduler_command.py b/tests/cli/commands/local_commands/test_scheduler_command.py index c1969aa9a715b..2dfce8edde6c6 100644 --- a/tests/cli/commands/local_commands/test_scheduler_command.py +++ b/tests/cli/commands/local_commands/test_scheduler_command.py @@ -48,20 +48,9 @@ def setup_class(cls): ("LocalKubernetesExecutor", True), ], ) - @mock.patch( - "airflow.cli.commands.local_commands.scheduler_command.ExecutorLoader.validate_database_executor_compatibility", - side_effect=None, - ) @mock.patch("airflow.cli.commands.local_commands.scheduler_command.SchedulerJobRunner") @mock.patch("airflow.cli.commands.local_commands.scheduler_command.Process") - def test_serve_logs_on_scheduler( - self, - mock_process, - mock_scheduler_job, - mock_validate, - executor, - expect_serve_logs, - ): + def test_serve_logs_on_scheduler(self, mock_process, mock_scheduler_job, executor, expect_serve_logs): mock_scheduler_job.return_value.job_type = "SchedulerJob" args = self.parser.parse_args(["scheduler"]) @@ -74,14 +63,10 @@ def test_serve_logs_on_scheduler( with pytest.raises(AssertionError): mock_process.assert_has_calls([mock.call(target=serve_logs)]) - @mock.patch( - "airflow.cli.commands.local_commands.scheduler_command.ExecutorLoader.validate_database_executor_compatibility", - side_effect=None, - ) @mock.patch("airflow.cli.commands.local_commands.scheduler_command.SchedulerJobRunner") @mock.patch("airflow.cli.commands.local_commands.scheduler_command.Process") @pytest.mark.parametrize("executor", ["LocalExecutor", "SequentialExecutor"]) - def test_skip_serve_logs(self, mock_process, mock_scheduler_job, mock_validate, executor): + def test_skip_serve_logs(self, mock_process, mock_scheduler_job, executor): mock_scheduler_job.return_value.job_type = "SchedulerJob" args = self.parser.parse_args(["scheduler", "--skip-serve-logs"]) with conf_vars({("core", "executor"): executor}): @@ -90,17 +75,11 @@ def test_skip_serve_logs(self, mock_process, mock_scheduler_job, mock_validate, with pytest.raises(AssertionError): mock_process.assert_has_calls([mock.call(target=serve_logs)]) - @mock.patch( - "airflow.cli.commands.local_commands.scheduler_command.ExecutorLoader.validate_database_executor_compatibility", - side_effect=None, - ) @mock.patch("airflow.utils.db.check_and_run_migrations") @mock.patch("airflow.utils.db.synchronize_log_template") @mock.patch("airflow.cli.commands.local_commands.scheduler_command.SchedulerJobRunner") @mock.patch("airflow.cli.commands.local_commands.scheduler_command.Process") - def test_check_migrations_is_false( - self, mock_process, mock_scheduler_job, mock_log, mock_run_migration, mock_validate - ): + def test_check_migrations_is_false(self, mock_process, mock_scheduler_job, mock_log, mock_run_migration): mock_scheduler_job.return_value.job_type = "SchedulerJob" args = self.parser.parse_args(["scheduler"]) with conf_vars({("database", "check_migrations"): "False"}): @@ -108,17 +87,11 @@ def test_check_migrations_is_false( mock_run_migration.assert_not_called() mock_log.assert_called_once() - @mock.patch( - "airflow.cli.commands.local_commands.scheduler_command.ExecutorLoader.validate_database_executor_compatibility", - side_effect=None, - ) @mock.patch("airflow.utils.db.check_and_run_migrations") @mock.patch("airflow.utils.db.synchronize_log_template") 
@mock.patch("airflow.cli.commands.local_commands.scheduler_command.SchedulerJobRunner") @mock.patch("airflow.cli.commands.local_commands.scheduler_command.Process") - def test_check_migrations_is_true( - self, mock_process, mock_scheduler_job, mock_log, mock_run_migration, mock_validate - ): + def test_check_migrations_is_true(self, mock_process, mock_scheduler_job, mock_log, mock_run_migration): mock_scheduler_job.return_value.job_type = "SchedulerJob" args = self.parser.parse_args(["scheduler"]) with conf_vars({("database", "check_migrations"): "True"}): @@ -126,14 +99,10 @@ def test_check_migrations_is_true( mock_run_migration.assert_called_once() mock_log.assert_called_once() - @mock.patch( - "airflow.cli.commands.local_commands.scheduler_command.ExecutorLoader.validate_database_executor_compatibility", - side_effect=None, - ) @mock.patch("airflow.cli.commands.local_commands.scheduler_command.SchedulerJobRunner") @mock.patch("airflow.cli.commands.local_commands.scheduler_command.Process") @pytest.mark.parametrize("executor", ["LocalExecutor", "SequentialExecutor"]) - def test_graceful_shutdown(self, mock_process, mock_scheduler_job, mock_validate, executor): + def test_graceful_shutdown(self, mock_process, mock_scheduler_job, executor): mock_scheduler_job.return_value.job_type = "SchedulerJob" args = self.parser.parse_args(["scheduler"]) with conf_vars({("core", "executor"): executor}): @@ -144,36 +113,18 @@ def test_graceful_shutdown(self, mock_process, mock_scheduler_job, mock_validate finally: mock_process().terminate.assert_called() - @mock.patch( - "airflow.cli.commands.local_commands.scheduler_command.ExecutorLoader.validate_database_executor_compatibility", - side_effect=None, - ) @mock.patch("airflow.cli.commands.local_commands.scheduler_command.SchedulerJobRunner") @mock.patch("airflow.cli.commands.local_commands.scheduler_command.Process") - def test_enable_scheduler_health( - self, - mock_process, - mock_scheduler_job, - mock_validate, - ): + def test_enable_scheduler_health(self, mock_process, mock_scheduler_job): with conf_vars({("scheduler", "enable_health_check"): "True"}): mock_scheduler_job.return_value.job_type = "SchedulerJob" args = self.parser.parse_args(["scheduler"]) scheduler_command.scheduler(args) mock_process.assert_has_calls([mock.call(target=serve_health_check)]) - @mock.patch( - "airflow.cli.commands.local_commands.scheduler_command.ExecutorLoader.validate_database_executor_compatibility", - side_effect=None, - ) @mock.patch("airflow.cli.commands.local_commands.scheduler_command.SchedulerJobRunner") @mock.patch("airflow.cli.commands.local_commands.scheduler_command.Process") - def test_disable_scheduler_health( - self, - mock_process, - mock_scheduler_job, - mock_validate, - ): + def test_disable_scheduler_health(self, mock_process, mock_scheduler_job): mock_scheduler_job.return_value.job_type = "SchedulerJob" args = self.parser.parse_args(["scheduler"]) scheduler_command.scheduler(args) @@ -196,23 +147,13 @@ def test_scheduler_health_host( serve_health_check() assert http_server_mock.call_args.args[0] == (health_check_host, health_check_port) - @mock.patch( - "airflow.cli.commands.local_commands.scheduler_command.ExecutorLoader.validate_database_executor_compatibility", - side_effect=None, - ) @mock.patch("airflow.cli.commands.local_commands.scheduler_command.SchedulerJobRunner") @mock.patch("airflow.cli.commands.local_commands.scheduler_command.Process") @mock.patch( "airflow.cli.commands.local_commands.scheduler_command.run_job", 
side_effect=Exception("run_job failed"), ) - def test_run_job_exception_handling( - self, - mock_run_job, - mock_process, - mock_scheduler_job, - mock_validate, - ): + def test_run_job_exception_handling(self, mock_run_job, mock_process, mock_scheduler_job): args = self.parser.parse_args(["scheduler"]) with pytest.raises(Exception, match="run_job failed"): scheduler_command.scheduler(args)