From f0834e6ae1302223b485df2251aa5f32a6953fb2 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 1 Nov 2025 13:08:54 +0000 Subject: [PATCH 01/14] Add watchfiles dependency and hot-reload utility with --dev flag support Co-authored-by: jason810496 <68415893+jason810496@users.noreply.github.com> --- airflow-core/pyproject.toml | 1 + airflow-core/src/airflow/cli/cli_config.py | 2 + .../cli/commands/api_server_command.py | 24 +-- .../airflow/cli/commands/scheduler_command.py | 7 + .../airflow/cli/commands/triggerer_command.py | 10 + airflow-core/src/airflow/utils/hot_reload.py | 171 ++++++++++++++++++ 6 files changed, 204 insertions(+), 11 deletions(-) create mode 100644 airflow-core/src/airflow/utils/hot_reload.py diff --git a/airflow-core/pyproject.toml b/airflow-core/pyproject.toml index 4f51b6b66d667..d0a16f45e505c 100644 --- a/airflow-core/pyproject.toml +++ b/airflow-core/pyproject.toml @@ -140,6 +140,7 @@ dependencies = [ # https://github.com/apache/airflow/issues/56369 , rework universal-pathlib usage "universal-pathlib>=0.2.6,<0.3.0", "uuid6>=2024.7.10", + "watchfiles>=0.20.0", "apache-airflow-task-sdk<1.3.0,>=1.1.0", # pre-installed providers "apache-airflow-providers-common-compat>=1.7.4", diff --git a/airflow-core/src/airflow/cli/cli_config.py b/airflow-core/src/airflow/cli/cli_config.py index 31bfc6b90f0d9..f269e8aa706b7 100644 --- a/airflow-core/src/airflow/cli/cli_config.py +++ b/airflow-core/src/airflow/cli/cli_config.py @@ -1923,6 +1923,7 @@ class GroupCommand(NamedTuple): ARG_LOG_FILE, ARG_SKIP_SERVE_LOGS, ARG_VERBOSE, + ARG_DEV, ), epilog=( "Signals:\n" @@ -1946,6 +1947,7 @@ class GroupCommand(NamedTuple): ARG_CAPACITY, ARG_VERBOSE, ARG_SKIP_SERVE_LOGS, + ARG_DEV, ), ), ActionCommand( diff --git a/airflow-core/src/airflow/cli/commands/api_server_command.py b/airflow-core/src/airflow/cli/commands/api_server_command.py index 01844398f79b2..bb42037dd3468 100644 --- a/airflow-core/src/airflow/cli/commands/api_server_command.py +++ b/airflow-core/src/airflow/cli/commands/api_server_command.py @@ -141,17 +141,19 @@ def api_server(args: Namespace): if args.dev: print(f"Starting the API server on port {args.port} and host {args.host} in development mode.") - log.warning("Running in dev mode, ignoring uvicorn args") - from fastapi_cli.cli import _run - - _run( - entrypoint="airflow.api_fastapi.main:app", - port=args.port, - host=args.host, - reload=True, - proxy_headers=args.proxy_headers, - command="dev", - ) + log.info("Running in dev mode with hot-reload enabled") + from airflow.utils.hot_reload import run_with_reloader + + def _run_dev_server(): + _run_api_server( + args=args, + apps=apps, + num_workers=1, # Use single worker in dev mode + worker_timeout=worker_timeout, + proxy_headers=proxy_headers, + ) + + run_with_reloader(_run_dev_server) return run_command_with_daemon_option( diff --git a/airflow-core/src/airflow/cli/commands/scheduler_command.py b/airflow-core/src/airflow/cli/commands/scheduler_command.py index fcdd9b2b78b4a..3f072ba3453f0 100644 --- a/airflow-core/src/airflow/cli/commands/scheduler_command.py +++ b/airflow-core/src/airflow/cli/commands/scheduler_command.py @@ -51,6 +51,13 @@ def scheduler(args: Namespace): """Start Airflow Scheduler.""" print(settings.HEADER) + if hasattr(args, "dev") and args.dev: + log.info("Starting scheduler in development mode with hot-reload enabled") + from airflow.utils.hot_reload import run_with_reloader + + run_with_reloader(lambda: _run_scheduler_job(args)) + return + run_command_with_daemon_option( args=args, process_name="scheduler", diff --git a/airflow-core/src/airflow/cli/commands/triggerer_command.py b/airflow-core/src/airflow/cli/commands/triggerer_command.py index eedc4a49e50c2..c91b13a3d8f09 100644 --- a/airflow-core/src/airflow/cli/commands/triggerer_command.py +++ b/airflow-core/src/airflow/cli/commands/triggerer_command.py @@ -59,6 +59,8 @@ def triggerer_run(skip_serve_logs: bool, capacity: int, triggerer_heartrate: flo @providers_configuration_loaded def triggerer(args): """Start Airflow Triggerer.""" + import logging + from airflow.sdk._shared.secrets_masker import SecretsMasker SecretsMasker.enable_log_masking() @@ -66,6 +68,14 @@ def triggerer(args): print(settings.HEADER) triggerer_heartrate = conf.getfloat("triggerer", "JOB_HEARTBEAT_SEC") + if hasattr(args, "dev") and args.dev: + log = logging.getLogger(__name__) + log.info("Starting triggerer in development mode with hot-reload enabled") + from airflow.utils.hot_reload import run_with_reloader + + run_with_reloader(lambda: triggerer_run(args.skip_serve_logs, args.capacity, triggerer_heartrate)) + return + run_command_with_daemon_option( args=args, process_name="triggerer", diff --git a/airflow-core/src/airflow/utils/hot_reload.py b/airflow-core/src/airflow/utils/hot_reload.py new file mode 100644 index 0000000000000..dcddade8b9f10 --- /dev/null +++ b/airflow-core/src/airflow/utils/hot_reload.py @@ -0,0 +1,171 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Hot reload utilities for development mode.""" + +from __future__ import annotations + +import logging +import os +import signal +import sys +from collections.abc import Callable +from pathlib import Path + +log = logging.getLogger(__name__) + + +def run_with_reloader( + callback: Callable, + watch_paths: list[str | Path] | None = None, + exclude_patterns: list[str] | None = None, +): + """ + Run a callback function with automatic reloading on file changes. + + This function monitors specified paths for changes and restarts the process + when changes are detected. Useful for development mode hot-reloading. + + :param callback: The function to run. This should be the main entry point + of the command that needs hot-reload support. + :param watch_paths: List of paths to watch for changes. If None, watches + the Airflow source directory. + :param exclude_patterns: List of glob patterns to exclude from watching. + Common patterns like __pycache__, .git, etc. are excluded by default. + """ + try: + import watchfiles + except ImportError: + log.error( + "watchfiles is not installed. Install it with: pip install 'apache-airflow[dev]' " + "or pip install watchfiles" + ) + sys.exit(1) + + # Default watch paths - watch the airflow source directory + if watch_paths is None: + import airflow + + airflow_root = Path(airflow.__file__).parent + watch_paths = [airflow_root] + + # Default exclude patterns + default_excludes = [ + "**/__pycache__/**", + "**/*.pyc", + "**/*.pyo", + "**/.git/**", + "**/.venv/**", + "**/venv/**", + "**/node_modules/**", + "**/.tox/**", + "**/build/**", + "**/dist/**", + "**/*.egg-info/**", + "**/logs/**", + "**/.pytest_cache/**", + "**/.mypy_cache/**", + "**/.ruff_cache/**", + ] + + if exclude_patterns: + default_excludes.extend(exclude_patterns) + + log.info("Starting in development mode with hot-reload enabled") + log.info("Watching paths: %s", watch_paths) + log.info("Excluding patterns: %s", default_excludes) + + # Check if we're the main process or a reloaded child + reloader_pid = os.environ.get("AIRFLOW_DEV_RELOADER_PID") + if reloader_pid is None: + # We're the main process - set up the reloader + os.environ["AIRFLOW_DEV_RELOADER_PID"] = str(os.getpid()) + _run_reloader(callback, watch_paths, default_excludes) + else: + # We're a child process - just run the callback + callback() + + +def _run_reloader(callback: Callable, watch_paths: list[str | Path], exclude_patterns: list[str]): + """ + Internal function to run the reloader loop. + + This function watches for changes and restarts the process by re-executing + the Python interpreter with the same arguments. + """ + import subprocess + import threading + + from watchfiles import watch + + process = None + should_exit = False + + def start_process(): + """Start or restart the subprocess.""" + nonlocal process + if process is not None: + log.info("Stopping process...") + process.terminate() + try: + process.wait(timeout=5) + except subprocess.TimeoutExpired: + log.warning("Process did not terminate gracefully, killing...") + process.kill() + process.wait() + + log.info("Starting process...") + # Restart the process by re-executing Python with the same arguments + process = subprocess.Popen([sys.executable] + sys.argv) + return process + + def signal_handler(signum, frame): + """Handle termination signals.""" + nonlocal should_exit, process + should_exit = True + log.info("Received signal %s, shutting down...", signum) + if process: + process.terminate() + process.wait() + sys.exit(0) + + # Set up signal handlers + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + # Start the initial process + process = start_process() + + log.info("Hot-reload enabled. Watching for file changes...") + log.info("Press Ctrl+C to stop") + + try: + for changes in watch(*watch_paths, watch_filter=None, ignore_patterns=exclude_patterns): + if should_exit: + break + + log.info("Detected changes: %s", changes) + log.info("Reloading...") + + # Restart the process + process = start_process() + + except KeyboardInterrupt: + log.info("Shutting down...") + if process: + process.terminate() + process.wait() From ae403d2a0e0852647b5faa927c1f10d38386cbb3 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 1 Nov 2025 13:10:49 +0000 Subject: [PATCH 02/14] Add tests for --dev flag and hot-reload functionality Co-authored-by: jason810496 <68415893+jason810496@users.noreply.github.com> --- .../cli/commands/test_api_server_command.py | 16 ++- .../cli/commands/test_scheduler_command.py | 10 ++ .../cli/commands/test_triggerer_command.py | 11 ++ .../tests/unit/utils/test_hot_reload.py | 100 ++++++++++++++++++ 4 files changed, 127 insertions(+), 10 deletions(-) create mode 100644 airflow-core/tests/unit/utils/test_hot_reload.py diff --git a/airflow-core/tests/unit/cli/commands/test_api_server_command.py b/airflow-core/tests/unit/cli/commands/test_api_server_command.py index d633a6791c03b..a9bb0ed9fabec 100644 --- a/airflow-core/tests/unit/cli/commands/test_api_server_command.py +++ b/airflow-core/tests/unit/cli/commands/test_api_server_command.py @@ -62,19 +62,15 @@ class TestCliApiServer(_CommonCLIUvicornTestClass): ) def test_dev_arg(self, args): with ( - mock.patch("fastapi_cli.cli._run") as mock_run, + mock.patch("airflow.utils.hot_reload.run_with_reloader") as mock_reloader, ): args = self.parser.parse_args(args) api_server_command.api_server(args) - mock_run.assert_called_with( - entrypoint="airflow.api_fastapi.main:app", - port=args.port, - host=args.host, - reload=True, - proxy_headers=args.proxy_headers, - command="dev", - ) + # Verify that run_with_reloader was called + mock_reloader.assert_called_once() + # The callback function should be callable + assert callable(mock_reloader.call_args[0][0]) @pytest.mark.parametrize( "args", @@ -111,7 +107,7 @@ def test_api_apps_env(self, args, dev_mode, original_env): with ( mock.patch("os.environ", autospec=True) as mock_environ, mock.patch("uvicorn.run"), - mock.patch("fastapi_cli.cli._run"), + mock.patch("airflow.utils.hot_reload.run_with_reloader"), ): # Mock the environment variable with initial value or None mock_environ.get.return_value = original_env diff --git a/airflow-core/tests/unit/cli/commands/test_scheduler_command.py b/airflow-core/tests/unit/cli/commands/test_scheduler_command.py index 1a9e7cdc24144..4e52f2a54d1f7 100644 --- a/airflow-core/tests/unit/cli/commands/test_scheduler_command.py +++ b/airflow-core/tests/unit/cli/commands/test_scheduler_command.py @@ -163,3 +163,13 @@ def test_run_job_exception_handling(self, mock_run_job, mock_process, mock_sched ) mock_process.assert_called_once_with(target=serve_logs) mock_process().terminate.assert_called_once_with() + + @mock.patch("airflow.utils.hot_reload.run_with_reloader") + def test_scheduler_with_dev_flag(self, mock_reloader): + args = self.parser.parse_args(["scheduler", "--dev"]) + scheduler_command.scheduler(args) + + # Verify that run_with_reloader was called + mock_reloader.assert_called_once() + # The callback function should be callable + assert callable(mock_reloader.call_args[0][0]) diff --git a/airflow-core/tests/unit/cli/commands/test_triggerer_command.py b/airflow-core/tests/unit/cli/commands/test_triggerer_command.py index b5222038f2de5..1cfdb498acaf0 100644 --- a/airflow-core/tests/unit/cli/commands/test_triggerer_command.py +++ b/airflow-core/tests/unit/cli/commands/test_triggerer_command.py @@ -63,3 +63,14 @@ def test_trigger_run_serve_logs(self, mock_process, mock_run_job, mock_trigger_j job=mock_trigger_job_runner.return_value.job, execute_callable=mock_trigger_job_runner.return_value._execute, ) + + @mock.patch("airflow.utils.hot_reload.run_with_reloader") + def test_triggerer_with_dev_flag(self, mock_reloader): + """Ensure that triggerer with --dev flag uses hot-reload""" + args = self.parser.parse_args(["triggerer", "--dev"]) + triggerer_command.triggerer(args) + + # Verify that run_with_reloader was called + mock_reloader.assert_called_once() + # The callback function should be callable + assert callable(mock_reloader.call_args[0][0]) diff --git a/airflow-core/tests/unit/utils/test_hot_reload.py b/airflow-core/tests/unit/utils/test_hot_reload.py new file mode 100644 index 0000000000000..b9429d15ffbf6 --- /dev/null +++ b/airflow-core/tests/unit/utils/test_hot_reload.py @@ -0,0 +1,100 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import os +import sys +from unittest import mock + +import pytest + +from airflow.utils import hot_reload + + +class TestHotReload: + """Tests for hot reload utilities.""" + + def test_run_with_reloader_missing_watchfiles(self): + """Test that run_with_reloader exits gracefully when watchfiles is not installed.""" + with mock.patch.dict(sys.modules, {"watchfiles": None}): + with pytest.raises(SystemExit): + hot_reload.run_with_reloader(lambda: None) + + @mock.patch("airflow.utils.hot_reload._run_reloader") + def test_run_with_reloader_main_process(self, mock_run_reloader): + """Test run_with_reloader as the main process.""" + # Clear the reloader PID env var to simulate being the main process + with mock.patch.dict(os.environ, {}, clear=True): + callback = mock.Mock() + watch_paths = ["/tmp/test"] + exclude_patterns = ["*.pyc"] + + hot_reload.run_with_reloader(callback, watch_paths, exclude_patterns) + + # Should set the env var and call _run_reloader + assert "AIRFLOW_DEV_RELOADER_PID" in os.environ + mock_run_reloader.assert_called_once() + + def test_run_with_reloader_child_process(self): + """Test run_with_reloader as a child process.""" + # Set the reloader PID env var to simulate being a child process + with mock.patch.dict(os.environ, {"AIRFLOW_DEV_RELOADER_PID": "12345"}): + callback = mock.Mock() + hot_reload.run_with_reloader(callback) + + # Should just call the callback directly + callback.assert_called_once() + + @mock.patch("subprocess.Popen") + @mock.patch("airflow.utils.hot_reload.watch") + def test_run_reloader_starts_process(self, mock_watch, mock_popen): + """Test that _run_reloader starts a subprocess.""" + mock_process = mock.Mock() + mock_popen.return_value = mock_process + mock_watch.return_value = [] # Empty iterator, will exit immediately + + callback = lambda: None + watch_paths = ["/tmp/test"] + exclude_patterns = ["*.pyc"] + + hot_reload._run_reloader(callback, watch_paths, exclude_patterns) + + # Should have started a process + mock_popen.assert_called_once() + assert mock_popen.call_args[0][0] == [sys.executable] + sys.argv + + @mock.patch("subprocess.Popen") + @mock.patch("airflow.utils.hot_reload.watch") + def test_run_reloader_restarts_on_changes(self, mock_watch, mock_popen): + """Test that _run_reloader restarts the process on file changes.""" + mock_process = mock.Mock() + mock_popen.return_value = mock_process + + # Simulate one file change and then exit + mock_watch.return_value = iter([[("change", "/tmp/test/file.py")]]) + + callback = lambda: None + watch_paths = ["/tmp/test"] + exclude_patterns = ["*.pyc"] + + hot_reload._run_reloader(callback, watch_paths, exclude_patterns) + + # Should have started process twice (initial + restart) + assert mock_popen.call_count == 2 + # Should have terminated the first process + mock_process.terminate.assert_called() From 9c4d10918a8b6fed6111eb3b95607074384fab4b Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 1 Nov 2025 13:14:49 +0000 Subject: [PATCH 03/14] Fix watchfiles API usage with proper DefaultFilter Co-authored-by: jason810496 <68415893+jason810496@users.noreply.github.com> --- airflow-core/src/airflow/utils/hot_reload.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/airflow-core/src/airflow/utils/hot_reload.py b/airflow-core/src/airflow/utils/hot_reload.py index dcddade8b9f10..8cc8df4c9a19a 100644 --- a/airflow-core/src/airflow/utils/hot_reload.py +++ b/airflow-core/src/airflow/utils/hot_reload.py @@ -108,9 +108,8 @@ def _run_reloader(callback: Callable, watch_paths: list[str | Path], exclude_pat the Python interpreter with the same arguments. """ import subprocess - import threading - from watchfiles import watch + from watchfiles import DefaultFilter, watch process = None should_exit = False @@ -153,8 +152,11 @@ def signal_handler(signum, frame): log.info("Hot-reload enabled. Watching for file changes...") log.info("Press Ctrl+C to stop") + # Create a custom filter that excludes specified patterns + watch_filter = DefaultFilter(ignore_paths=exclude_patterns) + try: - for changes in watch(*watch_paths, watch_filter=None, ignore_patterns=exclude_patterns): + for changes in watch(*watch_paths, watch_filter=watch_filter): if should_exit: break From 3fe79e8e73027bd541eccf8040e0108fa03cc9b5 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 1 Nov 2025 13:17:40 +0000 Subject: [PATCH 04/14] Address code review feedback: fix help text, API usage, and improve error messages Co-authored-by: jason810496 <68415893+jason810496@users.noreply.github.com> --- airflow-core/src/airflow/cli/cli_config.py | 2 +- airflow-core/src/airflow/utils/hot_reload.py | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/airflow-core/src/airflow/cli/cli_config.py b/airflow-core/src/airflow/cli/cli_config.py index f269e8aa706b7..66db5feaa4ec5 100644 --- a/airflow-core/src/airflow/cli/cli_config.py +++ b/airflow-core/src/airflow/cli/cli_config.py @@ -669,7 +669,7 @@ def string_lower_type(val): default=conf.get("api", "ssl_key"), help="Path to the key to use with the SSL certificate", ) -ARG_DEV = Arg(("-d", "--dev"), help="Start FastAPI in development mode", action="store_true") +ARG_DEV = Arg(("-d", "--dev"), help="Start in development mode with hot-reload enabled", action="store_true") # scheduler ARG_NUM_RUNS = Arg( diff --git a/airflow-core/src/airflow/utils/hot_reload.py b/airflow-core/src/airflow/utils/hot_reload.py index 8cc8df4c9a19a..dd76b589619f1 100644 --- a/airflow-core/src/airflow/utils/hot_reload.py +++ b/airflow-core/src/airflow/utils/hot_reload.py @@ -51,8 +51,8 @@ def run_with_reloader( import watchfiles except ImportError: log.error( - "watchfiles is not installed. Install it with: pip install 'apache-airflow[dev]' " - "or pip install watchfiles" + "watchfiles is not installed. This is a required dependency for --dev mode. " + "Please reinstall Airflow or install watchfiles separately: pip install watchfiles" ) sys.exit(1) @@ -129,6 +129,8 @@ def start_process(): log.info("Starting process...") # Restart the process by re-executing Python with the same arguments + # Note: sys.argv is safe here as it comes from the original CLI invocation + # and is only used in development mode for hot-reloading the same process process = subprocess.Popen([sys.executable] + sys.argv) return process @@ -153,7 +155,7 @@ def signal_handler(signum, frame): log.info("Press Ctrl+C to stop") # Create a custom filter that excludes specified patterns - watch_filter = DefaultFilter(ignore_paths=exclude_patterns) + watch_filter = DefaultFilter(ignore_patterns=exclude_patterns) try: for changes in watch(*watch_paths, watch_filter=watch_filter): From 922179a53bdcac64a24bef62aa106bfdc2b3bb2a Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 1 Nov 2025 13:20:16 +0000 Subject: [PATCH 05/14] Final code review fixes: improve logging and test clarity Co-authored-by: jason810496 <68415893+jason810496@users.noreply.github.com> --- .../src/airflow/cli/commands/triggerer_command.py | 6 +++--- airflow-core/tests/unit/utils/test_hot_reload.py | 14 ++++++++++---- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/airflow-core/src/airflow/cli/commands/triggerer_command.py b/airflow-core/src/airflow/cli/commands/triggerer_command.py index c91b13a3d8f09..dce44b9c2267f 100644 --- a/airflow-core/src/airflow/cli/commands/triggerer_command.py +++ b/airflow-core/src/airflow/cli/commands/triggerer_command.py @@ -18,6 +18,7 @@ from __future__ import annotations +import logging from collections.abc import Generator from contextlib import contextmanager from functools import partial @@ -31,6 +32,8 @@ from airflow.utils import cli as cli_utils from airflow.utils.providers_configuration_loader import providers_configuration_loaded +log = logging.getLogger(__name__) + @contextmanager def _serve_logs(skip_serve_logs: bool = False) -> Generator[None, None, None]: @@ -59,8 +62,6 @@ def triggerer_run(skip_serve_logs: bool, capacity: int, triggerer_heartrate: flo @providers_configuration_loaded def triggerer(args): """Start Airflow Triggerer.""" - import logging - from airflow.sdk._shared.secrets_masker import SecretsMasker SecretsMasker.enable_log_masking() @@ -69,7 +70,6 @@ def triggerer(args): triggerer_heartrate = conf.getfloat("triggerer", "JOB_HEARTBEAT_SEC") if hasattr(args, "dev") and args.dev: - log = logging.getLogger(__name__) log.info("Starting triggerer in development mode with hot-reload enabled") from airflow.utils.hot_reload import run_with_reloader diff --git a/airflow-core/tests/unit/utils/test_hot_reload.py b/airflow-core/tests/unit/utils/test_hot_reload.py index b9429d15ffbf6..34a01d52b9376 100644 --- a/airflow-core/tests/unit/utils/test_hot_reload.py +++ b/airflow-core/tests/unit/utils/test_hot_reload.py @@ -68,11 +68,14 @@ def test_run_reloader_starts_process(self, mock_watch, mock_popen): mock_popen.return_value = mock_process mock_watch.return_value = [] # Empty iterator, will exit immediately - callback = lambda: None + def test_callback(): + """Test callback function.""" + pass + watch_paths = ["/tmp/test"] exclude_patterns = ["*.pyc"] - hot_reload._run_reloader(callback, watch_paths, exclude_patterns) + hot_reload._run_reloader(test_callback, watch_paths, exclude_patterns) # Should have started a process mock_popen.assert_called_once() @@ -88,11 +91,14 @@ def test_run_reloader_restarts_on_changes(self, mock_watch, mock_popen): # Simulate one file change and then exit mock_watch.return_value = iter([[("change", "/tmp/test/file.py")]]) - callback = lambda: None + def test_callback(): + """Test callback function.""" + pass + watch_paths = ["/tmp/test"] exclude_patterns = ["*.pyc"] - hot_reload._run_reloader(callback, watch_paths, exclude_patterns) + hot_reload._run_reloader(test_callback, watch_paths, exclude_patterns) # Should have started process twice (initial + restart) assert mock_popen.call_count == 2 From 5d03f6107db629bb5a820ca5e7d15088100ef3a4 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Mon, 3 Nov 2025 11:42:29 +0800 Subject: [PATCH 06/14] Refactor hot_reload utils --- airflow-core/src/airflow/utils/hot_reload.py | 71 +++++--------------- 1 file changed, 16 insertions(+), 55 deletions(-) diff --git a/airflow-core/src/airflow/utils/hot_reload.py b/airflow-core/src/airflow/utils/hot_reload.py index dd76b589619f1..dd9027d676351 100644 --- a/airflow-core/src/airflow/utils/hot_reload.py +++ b/airflow-core/src/airflow/utils/hot_reload.py @@ -19,20 +19,19 @@ from __future__ import annotations -import logging import os import signal import sys from collections.abc import Callable from pathlib import Path -log = logging.getLogger(__name__) +import structlog + +log = structlog.getLogger(__name__) def run_with_reloader( callback: Callable, - watch_paths: list[str | Path] | None = None, - exclude_patterns: list[str] | None = None, ): """ Run a callback function with automatic reloading on file changes. @@ -42,74 +41,39 @@ def run_with_reloader( :param callback: The function to run. This should be the main entry point of the command that needs hot-reload support. - :param watch_paths: List of paths to watch for changes. If None, watches - the Airflow source directory. - :param exclude_patterns: List of glob patterns to exclude from watching. - Common patterns like __pycache__, .git, etc. are excluded by default. """ - try: - import watchfiles - except ImportError: - log.error( - "watchfiles is not installed. This is a required dependency for --dev mode. " - "Please reinstall Airflow or install watchfiles separately: pip install watchfiles" - ) - sys.exit(1) - # Default watch paths - watch the airflow source directory - if watch_paths is None: - import airflow - - airflow_root = Path(airflow.__file__).parent - watch_paths = [airflow_root] - - # Default exclude patterns - default_excludes = [ - "**/__pycache__/**", - "**/*.pyc", - "**/*.pyo", - "**/.git/**", - "**/.venv/**", - "**/venv/**", - "**/node_modules/**", - "**/.tox/**", - "**/build/**", - "**/dist/**", - "**/*.egg-info/**", - "**/logs/**", - "**/.pytest_cache/**", - "**/.mypy_cache/**", - "**/.ruff_cache/**", - ] - - if exclude_patterns: - default_excludes.extend(exclude_patterns) + import airflow + + airflow_root = Path(airflow.__file__).parent + watch_paths = [airflow_root] log.info("Starting in development mode with hot-reload enabled") log.info("Watching paths: %s", watch_paths) - log.info("Excluding patterns: %s", default_excludes) # Check if we're the main process or a reloaded child reloader_pid = os.environ.get("AIRFLOW_DEV_RELOADER_PID") if reloader_pid is None: # We're the main process - set up the reloader os.environ["AIRFLOW_DEV_RELOADER_PID"] = str(os.getpid()) - _run_reloader(callback, watch_paths, default_excludes) + _run_reloader(watch_paths) else: # We're a child process - just run the callback callback() -def _run_reloader(callback: Callable, watch_paths: list[str | Path], exclude_patterns: list[str]): +def _run_reloader(watch_paths: list[str]): """ - Internal function to run the reloader loop. + Watch for changes and restart the process. + + Watches the provided paths and restarts the process by re-executing the + Python interpreter with the same arguments. - This function watches for changes and restarts the process by re-executing - the Python interpreter with the same arguments. + :param watch_paths: List of paths to watch for changes. """ import subprocess - from watchfiles import DefaultFilter, watch + from watchfiles import watch process = None should_exit = False @@ -154,11 +118,8 @@ def signal_handler(signum, frame): log.info("Hot-reload enabled. Watching for file changes...") log.info("Press Ctrl+C to stop") - # Create a custom filter that excludes specified patterns - watch_filter = DefaultFilter(ignore_patterns=exclude_patterns) - try: - for changes in watch(*watch_paths, watch_filter=watch_filter): + for changes in watch(*watch_paths): if should_exit: break From cf9d063c03b34792b90ade7c1a2cf639cb0745b1 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 3 Nov 2025 05:15:06 +0000 Subject: [PATCH 07/14] Refactor hot-reload: move to cli module, remove pyproject change, add dag-processor support Co-authored-by: jason810496 <68415893+jason810496@users.noreply.github.com> --- airflow-core/pyproject.toml | 1 - airflow-core/src/airflow/cli/cli_config.py | 1 + .../cli/commands/api_server_command.py | 24 +++++++++---------- .../cli/commands/dag_processor_command.py | 7 ++++++ .../airflow/cli/commands/scheduler_command.py | 2 +- .../airflow/cli/commands/triggerer_command.py | 2 +- .../src/airflow/{utils => cli}/hot_reload.py | 0 .../cli/commands/test_api_server_command.py | 16 ++++++++----- .../commands/test_dag_processor_command.py | 11 +++++++++ .../cli/commands/test_scheduler_command.py | 2 +- .../cli/commands/test_triggerer_command.py | 2 +- .../unit/{utils => cli}/test_hot_reload.py | 12 ++++------ 12 files changed, 49 insertions(+), 31 deletions(-) rename airflow-core/src/airflow/{utils => cli}/hot_reload.py (100%) rename airflow-core/tests/unit/{utils => cli}/test_hot_reload.py (91%) diff --git a/airflow-core/pyproject.toml b/airflow-core/pyproject.toml index d0a16f45e505c..4f51b6b66d667 100644 --- a/airflow-core/pyproject.toml +++ b/airflow-core/pyproject.toml @@ -140,7 +140,6 @@ dependencies = [ # https://github.com/apache/airflow/issues/56369 , rework universal-pathlib usage "universal-pathlib>=0.2.6,<0.3.0", "uuid6>=2024.7.10", - "watchfiles>=0.20.0", "apache-airflow-task-sdk<1.3.0,>=1.1.0", # pre-installed providers "apache-airflow-providers-common-compat>=1.7.4", diff --git a/airflow-core/src/airflow/cli/cli_config.py b/airflow-core/src/airflow/cli/cli_config.py index 66db5feaa4ec5..0e0bb1f44088d 100644 --- a/airflow-core/src/airflow/cli/cli_config.py +++ b/airflow-core/src/airflow/cli/cli_config.py @@ -1963,6 +1963,7 @@ class GroupCommand(NamedTuple): ARG_STDERR, ARG_LOG_FILE, ARG_VERBOSE, + ARG_DEV, ), ), ActionCommand( diff --git a/airflow-core/src/airflow/cli/commands/api_server_command.py b/airflow-core/src/airflow/cli/commands/api_server_command.py index bb42037dd3468..01844398f79b2 100644 --- a/airflow-core/src/airflow/cli/commands/api_server_command.py +++ b/airflow-core/src/airflow/cli/commands/api_server_command.py @@ -141,19 +141,17 @@ def api_server(args: Namespace): if args.dev: print(f"Starting the API server on port {args.port} and host {args.host} in development mode.") - log.info("Running in dev mode with hot-reload enabled") - from airflow.utils.hot_reload import run_with_reloader - - def _run_dev_server(): - _run_api_server( - args=args, - apps=apps, - num_workers=1, # Use single worker in dev mode - worker_timeout=worker_timeout, - proxy_headers=proxy_headers, - ) - - run_with_reloader(_run_dev_server) + log.warning("Running in dev mode, ignoring uvicorn args") + from fastapi_cli.cli import _run + + _run( + entrypoint="airflow.api_fastapi.main:app", + port=args.port, + host=args.host, + reload=True, + proxy_headers=args.proxy_headers, + command="dev", + ) return run_command_with_daemon_option( diff --git a/airflow-core/src/airflow/cli/commands/dag_processor_command.py b/airflow-core/src/airflow/cli/commands/dag_processor_command.py index 64e36a864fc3f..f67ba764e79ad 100644 --- a/airflow-core/src/airflow/cli/commands/dag_processor_command.py +++ b/airflow-core/src/airflow/cli/commands/dag_processor_command.py @@ -52,6 +52,13 @@ def dag_processor(args): """Start Airflow Dag Processor Job.""" job_runner = _create_dag_processor_job_runner(args) + if hasattr(args, "dev") and args.dev: + log.info("Starting dag-processor in development mode with hot-reload enabled") + from airflow.cli.hot_reload import run_with_reloader + + run_with_reloader(lambda: run_job(job=job_runner.job, execute_callable=job_runner._execute)) + return + run_command_with_daemon_option( args=args, process_name="dag-processor", diff --git a/airflow-core/src/airflow/cli/commands/scheduler_command.py b/airflow-core/src/airflow/cli/commands/scheduler_command.py index 3f072ba3453f0..e4870aeb4640a 100644 --- a/airflow-core/src/airflow/cli/commands/scheduler_command.py +++ b/airflow-core/src/airflow/cli/commands/scheduler_command.py @@ -53,7 +53,7 @@ def scheduler(args: Namespace): if hasattr(args, "dev") and args.dev: log.info("Starting scheduler in development mode with hot-reload enabled") - from airflow.utils.hot_reload import run_with_reloader + from airflow.cli.hot_reload import run_with_reloader run_with_reloader(lambda: _run_scheduler_job(args)) return diff --git a/airflow-core/src/airflow/cli/commands/triggerer_command.py b/airflow-core/src/airflow/cli/commands/triggerer_command.py index dce44b9c2267f..4cbfea0db151d 100644 --- a/airflow-core/src/airflow/cli/commands/triggerer_command.py +++ b/airflow-core/src/airflow/cli/commands/triggerer_command.py @@ -71,7 +71,7 @@ def triggerer(args): if hasattr(args, "dev") and args.dev: log.info("Starting triggerer in development mode with hot-reload enabled") - from airflow.utils.hot_reload import run_with_reloader + from airflow.cli.hot_reload import run_with_reloader run_with_reloader(lambda: triggerer_run(args.skip_serve_logs, args.capacity, triggerer_heartrate)) return diff --git a/airflow-core/src/airflow/utils/hot_reload.py b/airflow-core/src/airflow/cli/hot_reload.py similarity index 100% rename from airflow-core/src/airflow/utils/hot_reload.py rename to airflow-core/src/airflow/cli/hot_reload.py diff --git a/airflow-core/tests/unit/cli/commands/test_api_server_command.py b/airflow-core/tests/unit/cli/commands/test_api_server_command.py index a9bb0ed9fabec..d633a6791c03b 100644 --- a/airflow-core/tests/unit/cli/commands/test_api_server_command.py +++ b/airflow-core/tests/unit/cli/commands/test_api_server_command.py @@ -62,15 +62,19 @@ class TestCliApiServer(_CommonCLIUvicornTestClass): ) def test_dev_arg(self, args): with ( - mock.patch("airflow.utils.hot_reload.run_with_reloader") as mock_reloader, + mock.patch("fastapi_cli.cli._run") as mock_run, ): args = self.parser.parse_args(args) api_server_command.api_server(args) - # Verify that run_with_reloader was called - mock_reloader.assert_called_once() - # The callback function should be callable - assert callable(mock_reloader.call_args[0][0]) + mock_run.assert_called_with( + entrypoint="airflow.api_fastapi.main:app", + port=args.port, + host=args.host, + reload=True, + proxy_headers=args.proxy_headers, + command="dev", + ) @pytest.mark.parametrize( "args", @@ -107,7 +111,7 @@ def test_api_apps_env(self, args, dev_mode, original_env): with ( mock.patch("os.environ", autospec=True) as mock_environ, mock.patch("uvicorn.run"), - mock.patch("airflow.utils.hot_reload.run_with_reloader"), + mock.patch("fastapi_cli.cli._run"), ): # Mock the environment variable with initial value or None mock_environ.get.return_value = original_env diff --git a/airflow-core/tests/unit/cli/commands/test_dag_processor_command.py b/airflow-core/tests/unit/cli/commands/test_dag_processor_command.py index 3e0e931587cd3..6224be2c5d59a 100644 --- a/airflow-core/tests/unit/cli/commands/test_dag_processor_command.py +++ b/airflow-core/tests/unit/cli/commands/test_dag_processor_command.py @@ -56,3 +56,14 @@ def test_bundle_names_passed(self, mock_runner, configure_testing_dag_bundle): with configure_testing_dag_bundle(os.devnull): dag_processor_command.dag_processor(args) assert mock_runner.call_args.kwargs["processor"].bundle_names_to_parse == ["testing"] + + @mock.patch("airflow.cli.hot_reload.run_with_reloader") + def test_dag_processor_with_dev_flag(self, mock_reloader): + """Ensure that dag-processor with --dev flag uses hot-reload""" + args = self.parser.parse_args(["dag-processor", "--dev"]) + dag_processor_command.dag_processor(args) + + # Verify that run_with_reloader was called + mock_reloader.assert_called_once() + # The callback function should be callable + assert callable(mock_reloader.call_args[0][0]) diff --git a/airflow-core/tests/unit/cli/commands/test_scheduler_command.py b/airflow-core/tests/unit/cli/commands/test_scheduler_command.py index 4e52f2a54d1f7..e381f9a754f0e 100644 --- a/airflow-core/tests/unit/cli/commands/test_scheduler_command.py +++ b/airflow-core/tests/unit/cli/commands/test_scheduler_command.py @@ -164,7 +164,7 @@ def test_run_job_exception_handling(self, mock_run_job, mock_process, mock_sched mock_process.assert_called_once_with(target=serve_logs) mock_process().terminate.assert_called_once_with() - @mock.patch("airflow.utils.hot_reload.run_with_reloader") + @mock.patch("airflow.cli.hot_reload.run_with_reloader") def test_scheduler_with_dev_flag(self, mock_reloader): args = self.parser.parse_args(["scheduler", "--dev"]) scheduler_command.scheduler(args) diff --git a/airflow-core/tests/unit/cli/commands/test_triggerer_command.py b/airflow-core/tests/unit/cli/commands/test_triggerer_command.py index 1cfdb498acaf0..44120f27fc981 100644 --- a/airflow-core/tests/unit/cli/commands/test_triggerer_command.py +++ b/airflow-core/tests/unit/cli/commands/test_triggerer_command.py @@ -64,7 +64,7 @@ def test_trigger_run_serve_logs(self, mock_process, mock_run_job, mock_trigger_j execute_callable=mock_trigger_job_runner.return_value._execute, ) - @mock.patch("airflow.utils.hot_reload.run_with_reloader") + @mock.patch("airflow.cli.hot_reload.run_with_reloader") def test_triggerer_with_dev_flag(self, mock_reloader): """Ensure that triggerer with --dev flag uses hot-reload""" args = self.parser.parse_args(["triggerer", "--dev"]) diff --git a/airflow-core/tests/unit/utils/test_hot_reload.py b/airflow-core/tests/unit/cli/test_hot_reload.py similarity index 91% rename from airflow-core/tests/unit/utils/test_hot_reload.py rename to airflow-core/tests/unit/cli/test_hot_reload.py index 34a01d52b9376..8401c5a38a736 100644 --- a/airflow-core/tests/unit/utils/test_hot_reload.py +++ b/airflow-core/tests/unit/cli/test_hot_reload.py @@ -23,7 +23,7 @@ import pytest -from airflow.utils import hot_reload +from airflow.cli import hot_reload class TestHotReload: @@ -35,16 +35,14 @@ def test_run_with_reloader_missing_watchfiles(self): with pytest.raises(SystemExit): hot_reload.run_with_reloader(lambda: None) - @mock.patch("airflow.utils.hot_reload._run_reloader") + @mock.patch("airflow.cli.hot_reload._run_reloader") def test_run_with_reloader_main_process(self, mock_run_reloader): """Test run_with_reloader as the main process.""" # Clear the reloader PID env var to simulate being the main process with mock.patch.dict(os.environ, {}, clear=True): callback = mock.Mock() - watch_paths = ["/tmp/test"] - exclude_patterns = ["*.pyc"] - hot_reload.run_with_reloader(callback, watch_paths, exclude_patterns) + hot_reload.run_with_reloader(callback) # Should set the env var and call _run_reloader assert "AIRFLOW_DEV_RELOADER_PID" in os.environ @@ -61,7 +59,7 @@ def test_run_with_reloader_child_process(self): callback.assert_called_once() @mock.patch("subprocess.Popen") - @mock.patch("airflow.utils.hot_reload.watch") + @mock.patch("airflow.cli.hot_reload.watch") def test_run_reloader_starts_process(self, mock_watch, mock_popen): """Test that _run_reloader starts a subprocess.""" mock_process = mock.Mock() @@ -82,7 +80,7 @@ def test_callback(): assert mock_popen.call_args[0][0] == [sys.executable] + sys.argv @mock.patch("subprocess.Popen") - @mock.patch("airflow.utils.hot_reload.watch") + @mock.patch("airflow.cli.hot_reload.watch") def test_run_reloader_restarts_on_changes(self, mock_watch, mock_popen): """Test that _run_reloader restarts the process on file changes.""" mock_process = mock.Mock() From 6938ea4ee82cfd832f88ee1330aeaee4624e88aa Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 3 Nov 2025 05:50:02 +0000 Subject: [PATCH 08/14] Fix serve-log teardown issue using psutil to terminate process tree Co-authored-by: jason810496 <68415893+jason810496@users.noreply.github.com> --- airflow-core/src/airflow/cli/hot_reload.py | 77 +++++++++++++++++++--- 1 file changed, 68 insertions(+), 9 deletions(-) diff --git a/airflow-core/src/airflow/cli/hot_reload.py b/airflow-core/src/airflow/cli/hot_reload.py index dd9027d676351..28274f3107bc1 100644 --- a/airflow-core/src/airflow/cli/hot_reload.py +++ b/airflow-core/src/airflow/cli/hot_reload.py @@ -82,14 +82,49 @@ def start_process(): """Start or restart the subprocess.""" nonlocal process if process is not None: - log.info("Stopping process...") - process.terminate() + log.info("Stopping process and all its children...") + # Use psutil to terminate the entire process tree try: - process.wait(timeout=5) - except subprocess.TimeoutExpired: - log.warning("Process did not terminate gracefully, killing...") - process.kill() - process.wait() + import psutil + + parent = psutil.Process(process.pid) + # Get all child processes recursively + children = parent.children(recursive=True) + + # Terminate all children first + for child in children: + try: + child.terminate() + except (psutil.NoSuchProcess, psutil.AccessDenied): + pass + + # Terminate the parent + parent.terminate() + + # Wait for all processes to terminate + gone, alive = psutil.wait_procs(children + [parent], timeout=5) + + # Force kill any remaining processes + for proc in alive: + try: + log.warning("Force killing process %s", proc.pid) + proc.kill() + except (psutil.NoSuchProcess, psutil.AccessDenied): + pass + + except (psutil.NoSuchProcess, psutil.AccessDenied): + # Process already terminated + pass + except Exception as e: + log.warning("Error terminating process tree: %s", e) + # Fallback to simple termination + try: + process.terminate() + process.wait(timeout=5) + except subprocess.TimeoutExpired: + log.warning("Process did not terminate gracefully, killing...") + process.kill() + process.wait() log.info("Starting process...") # Restart the process by re-executing Python with the same arguments @@ -104,8 +139,32 @@ def signal_handler(signum, frame): should_exit = True log.info("Received signal %s, shutting down...", signum) if process: - process.terminate() - process.wait() + # Terminate the entire process tree + try: + import psutil + + parent = psutil.Process(process.pid) + children = parent.children(recursive=True) + + # Terminate all children + for child in children: + try: + child.terminate() + except (psutil.NoSuchProcess, psutil.AccessDenied): + pass + + # Terminate parent + parent.terminate() + + # Wait for processes to terminate + psutil.wait_procs(children + [parent], timeout=5) + except Exception: + # Fallback to simple termination + try: + process.terminate() + process.wait() + except Exception: + pass sys.exit(0) # Set up signal handlers From b536b499c26c0c5800b98211064afb97717e2a0e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 3 Nov 2025 08:24:19 +0000 Subject: [PATCH 09/14] Refactor hot_reload: extract _terminate_process_tree helper function Co-authored-by: jason810496 <68415893+jason810496@users.noreply.github.com> --- airflow-core/src/airflow/cli/hot_reload.py | 128 ++++++++++----------- 1 file changed, 60 insertions(+), 68 deletions(-) diff --git a/airflow-core/src/airflow/cli/hot_reload.py b/airflow-core/src/airflow/cli/hot_reload.py index 28274f3107bc1..6e49143c3a65a 100644 --- a/airflow-core/src/airflow/cli/hot_reload.py +++ b/airflow-core/src/airflow/cli/hot_reload.py @@ -62,6 +62,64 @@ def run_with_reloader( callback() +def _terminate_process_tree(process, timeout: int = 5, force_kill_remaining: bool = True): + """ + Terminate a process and all its children recursively. + + Uses psutil to ensure all child processes are properly terminated, + which is important for cleaning up subprocesses like serve-log servers. + + :param process: The subprocess.Popen process to terminate + :param timeout: Timeout in seconds to wait for graceful termination + :param force_kill_remaining: If True, force kill processes that don't terminate gracefully + """ + import subprocess + + try: + import psutil + + parent = psutil.Process(process.pid) + # Get all child processes recursively + children = parent.children(recursive=True) + + # Terminate all children first + for child in children: + try: + child.terminate() + except (psutil.NoSuchProcess, psutil.AccessDenied): + pass + + # Terminate the parent + parent.terminate() + + # Wait for all processes to terminate + gone, alive = psutil.wait_procs(children + [parent], timeout=timeout) + + # Force kill any remaining processes if requested + if force_kill_remaining: + for proc in alive: + try: + log.warning("Force killing process %s", proc.pid) + proc.kill() + except (psutil.NoSuchProcess, psutil.AccessDenied): + pass + + except (psutil.NoSuchProcess, psutil.AccessDenied): + # Process already terminated + pass + except Exception as e: + log.warning("Error terminating process tree: %s", e) + # Fallback to simple termination + try: + process.terminate() + process.wait(timeout=timeout) + except subprocess.TimeoutExpired: + if force_kill_remaining: + log.warning("Process did not terminate gracefully, killing...") + process.kill() + process.wait() + + def _run_reloader(watch_paths: list[str]): """ Watch for changes and restart the process. @@ -83,48 +141,7 @@ def start_process(): nonlocal process if process is not None: log.info("Stopping process and all its children...") - # Use psutil to terminate the entire process tree - try: - import psutil - - parent = psutil.Process(process.pid) - # Get all child processes recursively - children = parent.children(recursive=True) - - # Terminate all children first - for child in children: - try: - child.terminate() - except (psutil.NoSuchProcess, psutil.AccessDenied): - pass - - # Terminate the parent - parent.terminate() - - # Wait for all processes to terminate - gone, alive = psutil.wait_procs(children + [parent], timeout=5) - - # Force kill any remaining processes - for proc in alive: - try: - log.warning("Force killing process %s", proc.pid) - proc.kill() - except (psutil.NoSuchProcess, psutil.AccessDenied): - pass - - except (psutil.NoSuchProcess, psutil.AccessDenied): - # Process already terminated - pass - except Exception as e: - log.warning("Error terminating process tree: %s", e) - # Fallback to simple termination - try: - process.terminate() - process.wait(timeout=5) - except subprocess.TimeoutExpired: - log.warning("Process did not terminate gracefully, killing...") - process.kill() - process.wait() + _terminate_process_tree(process, timeout=5, force_kill_remaining=True) log.info("Starting process...") # Restart the process by re-executing Python with the same arguments @@ -139,32 +156,7 @@ def signal_handler(signum, frame): should_exit = True log.info("Received signal %s, shutting down...", signum) if process: - # Terminate the entire process tree - try: - import psutil - - parent = psutil.Process(process.pid) - children = parent.children(recursive=True) - - # Terminate all children - for child in children: - try: - child.terminate() - except (psutil.NoSuchProcess, psutil.AccessDenied): - pass - - # Terminate parent - parent.terminate() - - # Wait for processes to terminate - psutil.wait_procs(children + [parent], timeout=5) - except Exception: - # Fallback to simple termination - try: - process.terminate() - process.wait() - except Exception: - pass + _terminate_process_tree(process, timeout=5, force_kill_remaining=False) sys.exit(0) # Set up signal handlers From 7da7ce9261778d18af84f5ffbd0d1be29282af3c Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 3 Nov 2025 10:58:56 +0000 Subject: [PATCH 10/14] Add type annotations and process_name parameter to hot_reload Co-authored-by: jason810496 <68415893+jason810496@users.noreply.github.com> --- .../airflow/cli/commands/dag_processor_command.py | 6 ++++-- .../src/airflow/cli/commands/scheduler_command.py | 3 +-- .../src/airflow/cli/commands/triggerer_command.py | 6 ++++-- airflow-core/src/airflow/cli/hot_reload.py | 12 +++++++++--- 4 files changed, 18 insertions(+), 9 deletions(-) diff --git a/airflow-core/src/airflow/cli/commands/dag_processor_command.py b/airflow-core/src/airflow/cli/commands/dag_processor_command.py index f67ba764e79ad..84785de190b1d 100644 --- a/airflow-core/src/airflow/cli/commands/dag_processor_command.py +++ b/airflow-core/src/airflow/cli/commands/dag_processor_command.py @@ -53,10 +53,12 @@ def dag_processor(args): job_runner = _create_dag_processor_job_runner(args) if hasattr(args, "dev") and args.dev: - log.info("Starting dag-processor in development mode with hot-reload enabled") from airflow.cli.hot_reload import run_with_reloader - run_with_reloader(lambda: run_job(job=job_runner.job, execute_callable=job_runner._execute)) + run_with_reloader( + lambda: run_job(job=job_runner.job, execute_callable=job_runner._execute), + process_name="dag-processor", + ) return run_command_with_daemon_option( diff --git a/airflow-core/src/airflow/cli/commands/scheduler_command.py b/airflow-core/src/airflow/cli/commands/scheduler_command.py index e4870aeb4640a..83e187e2adf2e 100644 --- a/airflow-core/src/airflow/cli/commands/scheduler_command.py +++ b/airflow-core/src/airflow/cli/commands/scheduler_command.py @@ -52,10 +52,9 @@ def scheduler(args: Namespace): print(settings.HEADER) if hasattr(args, "dev") and args.dev: - log.info("Starting scheduler in development mode with hot-reload enabled") from airflow.cli.hot_reload import run_with_reloader - run_with_reloader(lambda: _run_scheduler_job(args)) + run_with_reloader(lambda: _run_scheduler_job(args), process_name="scheduler") return run_command_with_daemon_option( diff --git a/airflow-core/src/airflow/cli/commands/triggerer_command.py b/airflow-core/src/airflow/cli/commands/triggerer_command.py index 4cbfea0db151d..be94dd0478ff5 100644 --- a/airflow-core/src/airflow/cli/commands/triggerer_command.py +++ b/airflow-core/src/airflow/cli/commands/triggerer_command.py @@ -70,10 +70,12 @@ def triggerer(args): triggerer_heartrate = conf.getfloat("triggerer", "JOB_HEARTBEAT_SEC") if hasattr(args, "dev") and args.dev: - log.info("Starting triggerer in development mode with hot-reload enabled") from airflow.cli.hot_reload import run_with_reloader - run_with_reloader(lambda: triggerer_run(args.skip_serve_logs, args.capacity, triggerer_heartrate)) + run_with_reloader( + lambda: triggerer_run(args.skip_serve_logs, args.capacity, triggerer_heartrate), + process_name="triggerer", + ) return run_command_with_daemon_option( diff --git a/airflow-core/src/airflow/cli/hot_reload.py b/airflow-core/src/airflow/cli/hot_reload.py index 6e49143c3a65a..d5a6228960c34 100644 --- a/airflow-core/src/airflow/cli/hot_reload.py +++ b/airflow-core/src/airflow/cli/hot_reload.py @@ -32,6 +32,7 @@ def run_with_reloader( callback: Callable, + process_name: str = "process", ): """ Run a callback function with automatic reloading on file changes. @@ -41,6 +42,7 @@ def run_with_reloader( :param callback: The function to run. This should be the main entry point of the command that needs hot-reload support. + :param process_name: Name of the process being run (for logging purposes) """ # Default watch paths - watch the airflow source directory import airflow @@ -48,7 +50,7 @@ def run_with_reloader( airflow_root = Path(airflow.__file__).parent watch_paths = [airflow_root] - log.info("Starting in development mode with hot-reload enabled") + log.info("Starting %s in development mode with hot-reload enabled", process_name) log.info("Watching paths: %s", watch_paths) # Check if we're the main process or a reloaded child @@ -62,7 +64,11 @@ def run_with_reloader( callback() -def _terminate_process_tree(process, timeout: int = 5, force_kill_remaining: bool = True): +def _terminate_process_tree( + process: "subprocess.Popen[bytes]", + timeout: int = 5, + force_kill_remaining: bool = True, +) -> None: """ Terminate a process and all its children recursively. @@ -120,7 +126,7 @@ def _terminate_process_tree(process, timeout: int = 5, force_kill_remaining: boo process.wait() -def _run_reloader(watch_paths: list[str]): +def _run_reloader(watch_paths: list[str | Path]) -> None: """ Watch for changes and restart the process. From 8964a94f5b1a4b591e350b29ce3757b49c294543 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Mon, 3 Nov 2025 19:09:01 +0800 Subject: [PATCH 11/14] Refactor final nits - remove logger in triggerer command - ensure type annotation in hot-reload module - fix test hot-reload --- .../airflow/cli/commands/triggerer_command.py | 3 -- airflow-core/src/airflow/cli/hot_reload.py | 18 ++++++---- .../tests/unit/cli/test_hot_reload.py | 36 +++++++++---------- 3 files changed, 27 insertions(+), 30 deletions(-) diff --git a/airflow-core/src/airflow/cli/commands/triggerer_command.py b/airflow-core/src/airflow/cli/commands/triggerer_command.py index be94dd0478ff5..fe1adbe26c6f7 100644 --- a/airflow-core/src/airflow/cli/commands/triggerer_command.py +++ b/airflow-core/src/airflow/cli/commands/triggerer_command.py @@ -18,7 +18,6 @@ from __future__ import annotations -import logging from collections.abc import Generator from contextlib import contextmanager from functools import partial @@ -32,8 +31,6 @@ from airflow.utils import cli as cli_utils from airflow.utils.providers_configuration_loader import providers_configuration_loaded -log = logging.getLogger(__name__) - @contextmanager def _serve_logs(skip_serve_logs: bool = False) -> Generator[None, None, None]: diff --git a/airflow-core/src/airflow/cli/hot_reload.py b/airflow-core/src/airflow/cli/hot_reload.py index d5a6228960c34..c70a3b1945272 100644 --- a/airflow-core/src/airflow/cli/hot_reload.py +++ b/airflow-core/src/airflow/cli/hot_reload.py @@ -24,9 +24,13 @@ import sys from collections.abc import Callable from pathlib import Path +from typing import TYPE_CHECKING import structlog +if TYPE_CHECKING: + import subprocess + log = structlog.getLogger(__name__) @@ -65,7 +69,7 @@ def run_with_reloader( def _terminate_process_tree( - process: "subprocess.Popen[bytes]", + process: subprocess.Popen[bytes], timeout: int = 5, force_kill_remaining: bool = True, ) -> None: @@ -83,24 +87,24 @@ def _terminate_process_tree( try: import psutil - + parent = psutil.Process(process.pid) # Get all child processes recursively children = parent.children(recursive=True) - + # Terminate all children first for child in children: try: child.terminate() except (psutil.NoSuchProcess, psutil.AccessDenied): pass - + # Terminate the parent parent.terminate() - + # Wait for all processes to terminate gone, alive = psutil.wait_procs(children + [parent], timeout=timeout) - + # Force kill any remaining processes if requested if force_kill_remaining: for proc in alive: @@ -109,7 +113,7 @@ def _terminate_process_tree( proc.kill() except (psutil.NoSuchProcess, psutil.AccessDenied): pass - + except (psutil.NoSuchProcess, psutil.AccessDenied): # Process already terminated pass diff --git a/airflow-core/tests/unit/cli/test_hot_reload.py b/airflow-core/tests/unit/cli/test_hot_reload.py index 8401c5a38a736..952671ab2367e 100644 --- a/airflow-core/tests/unit/cli/test_hot_reload.py +++ b/airflow-core/tests/unit/cli/test_hot_reload.py @@ -29,10 +29,15 @@ class TestHotReload: """Tests for hot reload utilities.""" - def test_run_with_reloader_missing_watchfiles(self): - """Test that run_with_reloader exits gracefully when watchfiles is not installed.""" - with mock.patch.dict(sys.modules, {"watchfiles": None}): - with pytest.raises(SystemExit): + @mock.patch("airflow.cli.hot_reload._run_reloader") + def test_run_with_reloader_missing_watchfiles(self, mock_run_reloader): + """Test that run_with_reloader handles missing watchfiles by raising ImportError.""" + # Simulate watchfiles not being available when _run_reloader tries to import it + mock_run_reloader.side_effect = ImportError("No module named 'watchfiles'") + + # Clear the reloader PID env var to simulate being the main process + with mock.patch.dict(os.environ, {}, clear=True): + with pytest.raises(ImportError): hot_reload.run_with_reloader(lambda: None) @mock.patch("airflow.cli.hot_reload._run_reloader") @@ -59,29 +64,25 @@ def test_run_with_reloader_child_process(self): callback.assert_called_once() @mock.patch("subprocess.Popen") - @mock.patch("airflow.cli.hot_reload.watch") + @mock.patch("watchfiles.watch") def test_run_reloader_starts_process(self, mock_watch, mock_popen): """Test that _run_reloader starts a subprocess.""" mock_process = mock.Mock() mock_popen.return_value = mock_process mock_watch.return_value = [] # Empty iterator, will exit immediately - def test_callback(): - """Test callback function.""" - pass - watch_paths = ["/tmp/test"] - exclude_patterns = ["*.pyc"] - hot_reload._run_reloader(test_callback, watch_paths, exclude_patterns) + hot_reload._run_reloader(watch_paths) # Should have started a process mock_popen.assert_called_once() assert mock_popen.call_args[0][0] == [sys.executable] + sys.argv + @mock.patch("airflow.cli.hot_reload._terminate_process_tree") @mock.patch("subprocess.Popen") - @mock.patch("airflow.cli.hot_reload.watch") - def test_run_reloader_restarts_on_changes(self, mock_watch, mock_popen): + @mock.patch("watchfiles.watch") + def test_run_reloader_restarts_on_changes(self, mock_watch, mock_popen, mock_terminate): """Test that _run_reloader restarts the process on file changes.""" mock_process = mock.Mock() mock_popen.return_value = mock_process @@ -89,16 +90,11 @@ def test_run_reloader_restarts_on_changes(self, mock_watch, mock_popen): # Simulate one file change and then exit mock_watch.return_value = iter([[("change", "/tmp/test/file.py")]]) - def test_callback(): - """Test callback function.""" - pass - watch_paths = ["/tmp/test"] - exclude_patterns = ["*.pyc"] - hot_reload._run_reloader(test_callback, watch_paths, exclude_patterns) + hot_reload._run_reloader(watch_paths) # Should have started process twice (initial + restart) assert mock_popen.call_count == 2 # Should have terminated the first process - mock_process.terminate.assert_called() + mock_terminate.assert_called() From abaa0d2ad75531910e689a361c4e3e041e53206c Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Mon, 3 Nov 2025 20:56:18 +0800 Subject: [PATCH 12/14] Fix mypy error --- airflow-core/src/airflow/cli/hot_reload.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/cli/hot_reload.py b/airflow-core/src/airflow/cli/hot_reload.py index c70a3b1945272..7012ae4f87705 100644 --- a/airflow-core/src/airflow/cli/hot_reload.py +++ b/airflow-core/src/airflow/cli/hot_reload.py @@ -22,7 +22,7 @@ import os import signal import sys -from collections.abc import Callable +from collections.abc import Callable, Sequence from pathlib import Path from typing import TYPE_CHECKING @@ -130,7 +130,7 @@ def _terminate_process_tree( process.wait() -def _run_reloader(watch_paths: list[str | Path]) -> None: +def _run_reloader(watch_paths: Sequence[str | Path]) -> None: """ Watch for changes and restart the process. From bece8596bde08dce2c2ded033a06d25e326f2376 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Tue, 4 Nov 2025 23:50:26 +0800 Subject: [PATCH 13/14] Respect DEV_MODE env var --- .../cli/commands/api_server_command.py | 2 +- .../cli/commands/dag_processor_command.py | 2 +- .../airflow/cli/commands/scheduler_command.py | 2 +- .../airflow/cli/commands/triggerer_command.py | 2 +- airflow-core/src/airflow/utils/cli.py | 7 +++++ .../tests/unit/utils/test_cli_util.py | 28 +++++++++++++++++++ 6 files changed, 39 insertions(+), 4 deletions(-) diff --git a/airflow-core/src/airflow/cli/commands/api_server_command.py b/airflow-core/src/airflow/cli/commands/api_server_command.py index 01844398f79b2..cdecbea9b8ea0 100644 --- a/airflow-core/src/airflow/cli/commands/api_server_command.py +++ b/airflow-core/src/airflow/cli/commands/api_server_command.py @@ -139,7 +139,7 @@ def api_server(args: Namespace): get_signing_args() - if args.dev: + if cli_utils.should_enable_hot_reload(args): print(f"Starting the API server on port {args.port} and host {args.host} in development mode.") log.warning("Running in dev mode, ignoring uvicorn args") from fastapi_cli.cli import _run diff --git a/airflow-core/src/airflow/cli/commands/dag_processor_command.py b/airflow-core/src/airflow/cli/commands/dag_processor_command.py index 84785de190b1d..f4c303c278dbb 100644 --- a/airflow-core/src/airflow/cli/commands/dag_processor_command.py +++ b/airflow-core/src/airflow/cli/commands/dag_processor_command.py @@ -52,7 +52,7 @@ def dag_processor(args): """Start Airflow Dag Processor Job.""" job_runner = _create_dag_processor_job_runner(args) - if hasattr(args, "dev") and args.dev: + if cli_utils.should_enable_hot_reload(args): from airflow.cli.hot_reload import run_with_reloader run_with_reloader( diff --git a/airflow-core/src/airflow/cli/commands/scheduler_command.py b/airflow-core/src/airflow/cli/commands/scheduler_command.py index 83e187e2adf2e..089d7dd51967d 100644 --- a/airflow-core/src/airflow/cli/commands/scheduler_command.py +++ b/airflow-core/src/airflow/cli/commands/scheduler_command.py @@ -51,7 +51,7 @@ def scheduler(args: Namespace): """Start Airflow Scheduler.""" print(settings.HEADER) - if hasattr(args, "dev") and args.dev: + if cli_utils.should_enable_hot_reload(args): from airflow.cli.hot_reload import run_with_reloader run_with_reloader(lambda: _run_scheduler_job(args), process_name="scheduler") diff --git a/airflow-core/src/airflow/cli/commands/triggerer_command.py b/airflow-core/src/airflow/cli/commands/triggerer_command.py index fe1adbe26c6f7..a293d44603051 100644 --- a/airflow-core/src/airflow/cli/commands/triggerer_command.py +++ b/airflow-core/src/airflow/cli/commands/triggerer_command.py @@ -66,7 +66,7 @@ def triggerer(args): print(settings.HEADER) triggerer_heartrate = conf.getfloat("triggerer", "JOB_HEARTBEAT_SEC") - if hasattr(args, "dev") and args.dev: + if cli_utils.should_enable_hot_reload(args): from airflow.cli.hot_reload import run_with_reloader run_with_reloader( diff --git a/airflow-core/src/airflow/utils/cli.py b/airflow-core/src/airflow/utils/cli.py index 17cad26771c1d..92ed36201f22b 100644 --- a/airflow-core/src/airflow/utils/cli.py +++ b/airflow-core/src/airflow/utils/cli.py @@ -472,3 +472,10 @@ def validate_dag_bundle_arg(bundle_names: list[str]) -> None: unknown_bundles: set[str] = set(bundle_names) - known_bundles if unknown_bundles: raise SystemExit(f"Bundles not found: {', '.join(unknown_bundles)}") + + +def should_enable_hot_reload(args) -> bool: + """Check whether hot-reload should be enabled based on --dev flag or DEV_MODE env var.""" + if hasattr(args, "dev") and args.dev: + return True + return os.getenv("DEV_MODE", "false").lower() == "true" diff --git a/airflow-core/tests/unit/utils/test_cli_util.py b/airflow-core/tests/unit/utils/test_cli_util.py index 40ab3080c52b6..49200510f6844 100644 --- a/airflow-core/tests/unit/utils/test_cli_util.py +++ b/airflow-core/tests/unit/utils/test_cli_util.py @@ -289,3 +289,31 @@ def test_validate_dag_bundle_arg(): # doesn't raise cli.validate_dag_bundle_arg(["dags-folder"]) + + +@pytest.mark.parametrize( + ["dev_flag", "env_var", "expected"], + [ + # --dev flag tests + (True, None, True), + (False, None, False), + (None, None, False), # no dev flag attribute + # DEV_MODE env var tests + (False, "true", True), + (False, "false", False), + (False, "TRUE", True), + (False, "True", True), + # --dev flag takes precedence + (True, "false", True), + # Invalid env var values + (False, "yes", False), + (False, "1", False), + ], +) +def test_should_enable_hot_reload(dev_flag, env_var, expected): + """Test should_enable_hot_reload with various --dev flag and DEV_MODE env var combinations.""" + args = Namespace() if dev_flag is None else Namespace(dev=dev_flag) + env = {} if env_var is None else {"DEV_MODE": env_var} + + with mock.patch.dict(os.environ, env, clear=True): + assert cli.should_enable_hot_reload(args) is expected From dc41c5cd3f49148b0c9fdc73c0cf74a2488c494e Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Mon, 10 Nov 2025 19:46:53 +0800 Subject: [PATCH 14/14] Fix nits --- airflow-core/src/airflow/cli/hot_reload.py | 6 +++--- airflow-core/src/airflow/utils/cli.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/airflow-core/src/airflow/cli/hot_reload.py b/airflow-core/src/airflow/cli/hot_reload.py index 7012ae4f87705..ecf64c2781dc2 100644 --- a/airflow-core/src/airflow/cli/hot_reload.py +++ b/airflow-core/src/airflow/cli/hot_reload.py @@ -37,7 +37,7 @@ def run_with_reloader( callback: Callable, process_name: str = "process", -): +) -> None: """ Run a callback function with automatic reloading on file changes. @@ -85,9 +85,9 @@ def _terminate_process_tree( """ import subprocess - try: - import psutil + import psutil + try: parent = psutil.Process(process.pid) # Get all child processes recursively children = parent.children(recursive=True) diff --git a/airflow-core/src/airflow/utils/cli.py b/airflow-core/src/airflow/utils/cli.py index 92ed36201f22b..ed9eb7e84f633 100644 --- a/airflow-core/src/airflow/utils/cli.py +++ b/airflow-core/src/airflow/utils/cli.py @@ -476,6 +476,6 @@ def validate_dag_bundle_arg(bundle_names: list[str]) -> None: def should_enable_hot_reload(args) -> bool: """Check whether hot-reload should be enabled based on --dev flag or DEV_MODE env var.""" - if hasattr(args, "dev") and args.dev: + if getattr(args, "dev", False): return True return os.getenv("DEV_MODE", "false").lower() == "true"