From 30b2fc0a99f2a6ce84e49f19c981db949b14486f Mon Sep 17 00:00:00 2001
From: Kaxil Naik
Date: Sat, 24 May 2025 04:01:42 +0530
Subject: [PATCH 1/3] Fix lingering task supervisors when ``EOF`` is missed

closes https://github.com/apache/airflow/issues/50500

Adds a new safeguard for cases where the task subprocess closes before all pipe
sockets send EOF. The supervisor now records the process exit time and forcibly
closes any sockets still open after `workers.socket_cleanup_timeout`. This stops
the supervisor loop from hanging indefinitely and allows the process to exit
cleanly.
---
 airflow-core/docs/troubleshooting.rst         | 10 ++++
 .../src/airflow/config_templates/config.yml   |  9 ++++
 .../airflow/sdk/execution_time/supervisor.py  | 53 +++++++++++++++++++
 .../airflow/sdk/execution_time/task_runner.py |  7 +++
 .../execution_time/test_supervisor.py         | 38 +++++++++++++
 5 files changed, 117 insertions(+)

diff --git a/airflow-core/docs/troubleshooting.rst b/airflow-core/docs/troubleshooting.rst
index f636b87a42c47..f354ea1a2ff6e 100644
--- a/airflow-core/docs/troubleshooting.rst
+++ b/airflow-core/docs/troubleshooting.rst
@@ -46,3 +46,13 @@ Here are some examples that could cause such an event:
 - A DAG run timeout, specified by ``dagrun_timeout`` in the DAG's definition.
 - An Airflow worker running out of memory - Usually, Airflow workers that run out of memory receive a SIGKILL, and the scheduler will fail the corresponding task instance for not having a heartbeat. However, in some scenarios, Airflow kills the task before that happens.
+
+Lingering task supervisor processes
+-----------------------------------
+
+Under very high concurrency the socket handlers inside the task supervisor may
+miss the final EOF events from the task process. When this occurs the supervisor
+believes sockets are still open and will not exit. The
+:ref:`workers.socket_cleanup_timeout ` option controls how long the supervisor
+waits after the task finishes before force-closing any remaining sockets. If you
+observe leftover ``supervisor`` processes, consider increasing this delay.
diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml
index 950844bcfee4a..ca1b1cf0eb12b 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -1547,6 +1547,15 @@ workers:
       type: float
      example: ~
       default: "90.0"
+    socket_cleanup_timeout:
+      description: |
+        Seconds to wait after a task process exits before forcibly closing any
+        remaining communication sockets. Missed EOF events can otherwise leave
+        the task supervisor running indefinitely.
+      version_added: 3.0.2
+      type: float
+      example: ~
+      default: "60.0"
 api_auth:
   description: Settings relating to authentication on the Airflow APIs
   options:
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 81fd945cdbc61..1006b861378d0 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -124,6 +124,7 @@
 MIN_HEARTBEAT_INTERVAL: int = conf.getint("workers", "min_heartbeat_interval")
 MAX_FAILED_HEARTBEATS: int = conf.getint("workers", "max_failed_heartbeats")
+SOCKET_CLEANUP_TIMEOUT: float = conf.getfloat("workers", "socket_cleanup_timeout")

 SERVER_TERMINATED = "SERVER_TERMINATED"
@@ -351,6 +352,13 @@ def exit(n: int) -> NoReturn:
         sys.stderr.flush()
         with suppress(ValueError, OSError):
             last_chance_stderr.flush()
+
+        # Explicitly close the child-end of our supervisor sockets so
+        # the parent sees EOF on both "requests" and "logs" channels.
+        with suppress(OSError):
+            os.close(log_fd)
+        with suppress(OSError):
+            os.close(child_stdin.fileno())
         os._exit(n)

     if hasattr(atexit, "_clear"):
@@ -423,6 +431,8 @@ class WatchedSubprocess:
     _num_open_sockets: int = 4
     _exit_code: int | None = attrs.field(default=None, init=False)
+    _process_exit_monotonic: float | None = attrs.field(default=None, init=False)
+    _fd_to_socket_type: dict[int, str] = attrs.field(factory=dict, init=False)

     selector: selectors.BaseSelector = attrs.field(factory=selectors.DefaultSelector, repr=False)
@@ -507,6 +517,14 @@ def _register_pipe_readers(self, stdout: socket, stderr: socket, requests: socke
         # alternatives are used automatically) -- this is a way of having "event-based" code, but without
         # needing full async, to read and process output from each socket as it is received.
+
+        # Track socket types for debugging
+        self._fd_to_socket_type = {
+            stdout.fileno(): "stdout",
+            stderr.fileno(): "stderr",
+            requests.fileno(): "requests",
+            logs.fileno(): "logs",
+        }

         target_loggers: tuple[FilteringBoundLogger, ...] = (self.process_log,)
         if self.subprocess_logs_to_stdout:
             target_loggers += (log,)
@@ -593,6 +611,28 @@ def _close_unused_sockets(*sockets):
                 sock._sock.close()
             sock.close()

+    def _cleanup_open_sockets(self):
+        """Force-close any sockets that never reported EOF."""
+        # In extremely busy environments the selector can fail to deliver a
+        # final read event before the subprocess exits. Without closing these
+        # sockets the supervisor would wait forever thinking they are still
+        # active. This cleanup ensures we always release resources and exit.
+        stuck_sockets = []
+        for key in list(self.selector.get_map().values()):
+            socket_type = self._fd_to_socket_type.get(key.fd, f"unknown-{key.fd}")
+            stuck_sockets.append(f"{socket_type}({key.fd})")
+            with suppress(Exception):
+                self.selector.unregister(key.fileobj)
+            with suppress(Exception):
+                key.fileobj.close()  # type: ignore[union-attr]
+
+        if stuck_sockets:
+            log.warning("Force-closed stuck sockets", pid=self.pid, sockets=stuck_sockets)
+
+        self.selector.close()
+        self._close_unused_sockets(self.stdin)
+        self._num_open_sockets = 0
+
     def kill(
         self,
         signal_to_send: signal.Signals = signal.SIGINT,
@@ -726,6 +766,7 @@ def _check_subprocess_exit(
             if raise_on_timeout:
                 raise
         else:
+            self._process_exit_monotonic = time.monotonic()
             self._close_unused_sockets(self.stdin)

             # Put a message in the viewable task logs
@@ -899,6 +940,18 @@ def _monitor_subprocess(self):
             # This listens for activity (e.g., subprocess output) on registered file objects
             alive = self._service_subprocess(max_wait_time=max_wait_time) is None
+            if self._exit_code is not None and self._num_open_sockets > 0:
+                if (
+                    self._process_exit_monotonic
+                    and time.monotonic() - self._process_exit_monotonic > SOCKET_CLEANUP_TIMEOUT
+                ):
+                    log.debug(
+                        "Forcefully closing remaining sockets",
+                        open_sockets=self._num_open_sockets,
+                        pid=self.pid,
+                    )
+                    self._cleanup_open_sockets()
+
             if alive:
                 # We don't need to heartbeat if the process has shutdown, as we are just finishing of reading the
                 # logs
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index cb8040278c474..7d2e6b65fe917 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -26,6 +26,7 @@
 import sys
 import time
 from collections.abc import Callable, Iterable, Iterator, Mapping
+from contextlib import suppress
 from datetime import datetime, timezone
 from io import FileIO
 from itertools import product
@@ -1297,6 +1298,12 @@ def main():
         log = structlog.get_logger(logger_name="task")
         log.exception("Top level error")
         exit(1)
+    finally:
+        # Ensure the request socket is closed on the child side in all circumstances
+        # before the process fully terminates.
+        if SUPERVISOR_COMMS and SUPERVISOR_COMMS.request_socket:
+            with suppress(Exception):
+                SUPERVISOR_COMMS.request_socket.close()


 if __name__ == "__main__":
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 7861a9d599712..4696908bae72f 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -768,6 +768,44 @@ def subprocess_main():
         } in cap_structlog
         assert rc == -signal_to_raise
+
+    @pytest.mark.execution_timeout(3)
+    def test_cleanup_sockets_after_delay(self, monkeypatch, mocker, time_machine):
+        """Supervisor should close sockets if EOF events are missed."""
+
+        monkeypatch.setattr("airflow.sdk.execution_time.supervisor.SOCKET_CLEANUP_TIMEOUT", 1.0)
+
+        mock_process = mocker.Mock(pid=12345)
+
+        time_machine.move_to(time.monotonic(), tick=False)
+
+        proc = ActivitySubprocess(
+            process_log=mocker.MagicMock(),
+            id=TI_ID,
+            pid=mock_process.pid,
+            stdin=mocker.MagicMock(),
+            client=mocker.MagicMock(),
+            process=mock_process,
+            requests_fd=-1,
+        )
+
+        proc.selector = mocker.MagicMock()
+        proc.selector.select.return_value = []
+
+        proc._exit_code = 0
+        proc._num_open_sockets = 1
+        proc._process_exit_monotonic = time.monotonic()
+
+        mocker.patch.object(
+            ActivitySubprocess,
+            "_cleanup_open_sockets",
+            side_effect=lambda: setattr(proc, "_num_open_sockets", 0),
+        )
+
+        time_machine.shift(2)
+
+        proc._monitor_subprocess()
+        assert proc._num_open_sockets == 0


 class TestWatchedSubprocessKill:
     @pytest.fixture

From 33e85469976d7155fba37826ac817b2c4ff667b9 Mon Sep 17 00:00:00 2001
From: Kaxil Naik
Date: Thu, 29 May 2025 13:08:57 +0530
Subject: [PATCH 2/3] Update config.yml

Co-authored-by: Amogh Desai
---
 airflow-core/src/airflow/config_templates/config.yml | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml
index ca1b1cf0eb12b..2cb22f7c7244a 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -1549,9 +1549,9 @@ workers:
       default: "90.0"
     socket_cleanup_timeout:
       description: |
-        Seconds to wait after a task process exits before forcibly closing any
-        remaining communication sockets. Missed EOF events can otherwise leave
-        the task supervisor running indefinitely.
+        Number of seconds to wait after a task process exits before forcibly closing any
+        remaining communication sockets. This helps prevent the task supervisor from hanging
+        indefinitely due to missed EOF signals.
       version_added: 3.0.2
       type: float
       example: ~

From d896bb6c16717d4fddae93355616f8f1cfadfd9f Mon Sep 17 00:00:00 2001
From: Kaxil Naik
Date: Thu, 29 May 2025 13:45:47 +0530
Subject: [PATCH 3/3] Update config.yml

---
 airflow-core/src/airflow/config_templates/config.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml
index 2cb22f7c7244a..bda494eed5dcc 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -1550,7 +1550,7 @@ workers:
     socket_cleanup_timeout:
       description: |
         Number of seconds to wait after a task process exits before forcibly closing any
         remaining communication sockets. This helps prevent the task supervisor from hanging
         indefinitely due to missed EOF signals.
       version_added: 3.0.2
       type: float