From 56a3e065e0492253b200c62ac1df4f947d099287 Mon Sep 17 00:00:00 2001 From: michaelreavant Date: Thu, 9 Apr 2026 14:28:54 +0200 Subject: [PATCH 1/6] fix(sdk): raise asyncio StreamReader buffer in Python AsyncHostTransport MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Python async transport spawned the host CLI without passing a `limit=` to `asyncio.create_subprocess_exec`, so its stdout `StreamReader` inherited asyncio's default 64 KiB buffer. Every host response is written as a single newline-delimited JSON line, so any `cli.invoke` whose serialized result exceeds 64 KiB (e.g. `superdoc_get_content` on larger documents) caused `readline()` to raise `ValueError: Separator is not found, and chunk exceed the limit` inside `_reader_loop`. The exception was caught by the generic reader-loop handler and pending requests were rejected with the misleading `HOST_DISCONNECTED` error — even though the host process was still alive and healthy. Pass `limit=` to `create_subprocess_exec` and expose it as a new `stdout_buffer_limit_bytes` constructor option on `AsyncHostTransport`, threaded through `SuperDocAsyncRuntime` and `AsyncSuperDocClient`. The default of 64 MiB safely covers the host's own 32 MiB `DEFAULT_MAX_STDIN_BYTES` input cap with room for ~2x JSON expansion. `SyncHostTransport` is unaffected — it uses raw blocking `subprocess.Popen` which has no asyncio buffer limit. Adds a `TestAsyncLargeResponse` regression suite that: 1. Round-trips a 200 KB response through the default-configured transport. 2. Pins that an explicitly tightened `stdout_buffer_limit_bytes` still reproduces the original failure mode, guaranteeing the option is wired through to `create_subprocess_exec`. --- packages/sdk/langs/python/superdoc/client.py | 2 + packages/sdk/langs/python/superdoc/runtime.py | 2 + .../sdk/langs/python/superdoc/transport.py | 5 +++ .../sdk/langs/python/tests/test_transport.py | 43 +++++++++++++++++++ 4 files changed, 52 insertions(+) diff --git a/packages/sdk/langs/python/superdoc/client.py b/packages/sdk/langs/python/superdoc/client.py index f187895b04..122e437a78 100644 --- a/packages/sdk/langs/python/superdoc/client.py +++ b/packages/sdk/langs/python/superdoc/client.py @@ -340,6 +340,7 @@ def __init__( request_timeout_ms: int | None = None, watchdog_timeout_ms: int = 30_000, max_queue_depth: int = 100, + stdout_buffer_limit_bytes: int = 64 * 1024 * 1024, default_change_mode: Literal['direct', 'tracked'] | None = None, user: UserIdentity | None = None, ) -> None: @@ -350,6 +351,7 @@ def __init__( request_timeout_ms=request_timeout_ms, watchdog_timeout_ms=watchdog_timeout_ms, max_queue_depth=max_queue_depth, + stdout_buffer_limit_bytes=stdout_buffer_limit_bytes, default_change_mode=default_change_mode, user=user, ) diff --git a/packages/sdk/langs/python/superdoc/runtime.py b/packages/sdk/langs/python/superdoc/runtime.py index e303754502..a725578e0d 100644 --- a/packages/sdk/langs/python/superdoc/runtime.py +++ b/packages/sdk/langs/python/superdoc/runtime.py @@ -79,6 +79,7 @@ def __init__( request_timeout_ms: Optional[int] = None, watchdog_timeout_ms: int = 30_000, max_queue_depth: int = 100, + stdout_buffer_limit_bytes: int = 64 * 1024 * 1024, default_change_mode: Optional[str] = None, user: Optional[Dict[str, str]] = None, ) -> None: @@ -93,6 +94,7 @@ def __init__( request_timeout_ms=request_timeout_ms, watchdog_timeout_ms=watchdog_timeout_ms, max_queue_depth=max_queue_depth, + stdout_buffer_limit_bytes=stdout_buffer_limit_bytes, 
default_change_mode=self._default_change_mode, user=user, ) diff --git a/packages/sdk/langs/python/superdoc/transport.py b/packages/sdk/langs/python/superdoc/transport.py index fec79e9c8e..93945c501e 100644 --- a/packages/sdk/langs/python/superdoc/transport.py +++ b/packages/sdk/langs/python/superdoc/transport.py @@ -399,6 +399,7 @@ def __init__( request_timeout_ms: Optional[int] = None, watchdog_timeout_ms: int = 30_000, max_queue_depth: int = 100, + stdout_buffer_limit_bytes: int = 64 * 1024 * 1024, default_change_mode: Optional[ChangeMode] = None, user: Optional[Dict[str, str]] = None, ) -> None: @@ -409,6 +410,7 @@ def __init__( self._request_timeout_ms = request_timeout_ms self._watchdog_timeout_ms = watchdog_timeout_ms self._max_queue_depth = max_queue_depth + self._stdout_buffer_limit_bytes = stdout_buffer_limit_bytes self._default_change_mode = default_change_mode self._user = user @@ -531,12 +533,15 @@ async def _start_host(self) -> None: args = [*prefix_args, 'host', '--stdio'] try: + # ``limit`` raises asyncio's StreamReader buffer above its 64 KiB + # default; host responses are single JSON lines and can exceed it. self._process = await asyncio.create_subprocess_exec( command, *args, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.DEVNULL, env={**os.environ, **self._env}, + limit=self._stdout_buffer_limit_bytes, ) logger.debug('Host spawned (pid=%s, bin=%s).', self._process.pid, self._cli_bin) except Exception as exc: diff --git a/packages/sdk/langs/python/tests/test_transport.py b/packages/sdk/langs/python/tests/test_transport.py index d71bafa186..bbda79ad5a 100644 --- a/packages/sdk/langs/python/tests/test_transport.py +++ b/packages/sdk/langs/python/tests/test_transport.py @@ -493,3 +493,46 @@ async def test_reuse_after_dispose(self): await transport.dispose() finally: _cleanup_wrapper(cli2) + + +class TestAsyncLargeResponse: + """Responses larger than the StreamReader buffer must not crash the reader.""" + + @pytest.mark.asyncio + async def test_response_above_default_64kb_buffer(self): + big_payload = 'x' * (200 * 1024) + cli = _mock_cli_bin({ + 'handshake': 'ok', + 'responses': [{'data': {'content': big_payload}}], + }) + try: + transport = AsyncHostTransport(cli, startup_timeout_ms=5_000) + await transport.connect() + result = await transport.invoke(_TEST_OP, {'query': 'big'}) + assert result == {'content': big_payload} + assert transport.state == 'CONNECTED' + await transport.dispose() + finally: + _cleanup_wrapper(cli) + + @pytest.mark.asyncio + async def test_response_above_custom_buffer_limit_disconnects(self): + # Pins that stdout_buffer_limit_bytes is wired through to the spawn: + # setting it below the response size reproduces the original failure. 
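+        # The 64 KiB chosen below equals asyncio's default StreamReader limit.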
+ big_payload = 'x' * (200 * 1024) + cli = _mock_cli_bin({ + 'handshake': 'ok', + 'responses': [{'data': {'content': big_payload}}], + }) + try: + transport = AsyncHostTransport( + cli, + startup_timeout_ms=5_000, + stdout_buffer_limit_bytes=64 * 1024, + ) + await transport.connect() + with pytest.raises(SuperDocError) as exc_info: + await transport.invoke(_TEST_OP, {'query': 'big'}) + assert exc_info.value.code == HOST_DISCONNECTED + finally: + _cleanup_wrapper(cli) From ba392e67a787088d558894ef70026aca1d74c0d3 Mon Sep 17 00:00:00 2001 From: michaelreavant Date: Tue, 14 Apr 2026 09:36:03 +0200 Subject: [PATCH 2/6] fix(sdk): tear down host process on async reader-loop failure MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit AsyncHostTransport._reader_loop caught reader exceptions by rejecting pending futures and flipping state to DISCONNECTED, but never killed self._process. Because dispose() early-returns on DISCONNECTED, any reader-loop failure left an orphaned host subprocess running with no public API to reap it. This is a pre-existing bug, but the previous commit made it easier to trip by exposing stdout_buffer_limit_bytes: any caller who sets it below their real response size hits the orphan path. Route both the buffer-overflow and generic-error branches through a new _schedule_cleanup helper that fires _cleanup() as a separate task (it can't be awaited inline — _cleanup cancels and awaits the reader task itself). _cleanup kills the process, waits on it, rejects pending, and only then transitions to DISCONNECTED, so a subsequent dispose() is a safe no-op instead of leaking the host. Also catch asyncio.LimitOverrunError / ValueError separately and surface HOST_PROTOCOL_ERROR with a "raise stdout_buffer_limit_bytes" hint plus the current limit in details. The previous HOST_DISCONNECTED code pointed users at the wrong problem since the host was still alive. Extends TestAsyncLargeResponse to assert HOST_PROTOCOL_ERROR, verify the hint is in the message, confirm the subprocess is actually reaped (returncode set, _process cleared), and that dispose() after an overflow is a safe no-op. --- .../sdk/langs/python/superdoc/transport.py | 39 ++++++++++++++++--- .../sdk/langs/python/tests/test_transport.py | 27 +++++++++++-- 2 files changed, 57 insertions(+), 9 deletions(-) diff --git a/packages/sdk/langs/python/superdoc/transport.py b/packages/sdk/langs/python/superdoc/transport.py index 93945c501e..d43ce04aac 100644 --- a/packages/sdk/langs/python/superdoc/transport.py +++ b/packages/sdk/langs/python/superdoc/transport.py @@ -416,6 +416,7 @@ def __init__( self._process: Optional[asyncio.subprocess.Process] = None self._reader_task: Optional[asyncio.Task] = None + self._cleanup_task: Optional[asyncio.Task] = None self._pending: Dict[int, asyncio.Future] = {} self._state = _State.DISCONNECTED self._next_request_id = 1 @@ -616,19 +617,35 @@ async def _reader_loop(self) -> None: except asyncio.CancelledError: return + except (asyncio.LimitOverrunError, ValueError) as exc: + # StreamReader raises LimitOverrunError (a ValueError subclass on + # some Python versions) when a single line exceeds the stdout + # buffer limit. The host is still alive — we must kill it so a + # later dispose() doesn't short-circuit on DISCONNECTED state. + logger.debug('Reader loop buffer overflow: %s', exc) + if not self._stopping: + self._schedule_cleanup(SuperDocError( + 'Host response exceeded stdout buffer limit. 
' + 'Raise stdout_buffer_limit_bytes to accommodate larger responses.', + code=HOST_PROTOCOL_ERROR, + details={ + 'message': str(exc), + 'stdout_buffer_limit_bytes': self._stdout_buffer_limit_bytes, + }, + )) + return except Exception as exc: logger.debug('Reader loop error: %s', exc) - # Reader exited (EOF or error) — reject all pending futures. + # Reader exited (EOF or unexpected error) — tear down the process so + # no orphaned host is left running, then reject pending futures. if not self._stopping: exit_code = process.returncode - error = SuperDocError( + self._schedule_cleanup(SuperDocError( 'Host process disconnected.', code=HOST_DISCONNECTED, details={'exit_code': exit_code, 'signal': None}, - ) - self._reject_all_pending(error) - self._state = _State.DISCONNECTED + )) async def _send_request(self, method: str, params: Any, watchdog_ms: int) -> Any: """Send a JSON-RPC request and await the matching response future.""" @@ -697,6 +714,18 @@ async def _kill_and_reset(self) -> None: SuperDocError('Host process disconnected.', code=HOST_DISCONNECTED), ) + def _schedule_cleanup(self, error: SuperDocError) -> None: + """Fire-and-forget cleanup from inside the reader task. + + Must not be awaited from `_reader_loop` itself — `_cleanup` cancels + and awaits the reader task, which would deadlock. Scheduling it as a + separate task lets the reader return first; by the time cleanup runs + the reader task is already done and the cancel/await is a no-op. + """ + if self._cleanup_task and not self._cleanup_task.done(): + return + self._cleanup_task = asyncio.create_task(self._cleanup(error)) + async def _cleanup(self, error: Optional[SuperDocError]) -> None: """Cancel reader, kill process, reject pending, reset state.""" if self._reader_task and not self._reader_task.done(): diff --git a/packages/sdk/langs/python/tests/test_transport.py b/packages/sdk/langs/python/tests/test_transport.py index bbda79ad5a..f0de75f20e 100644 --- a/packages/sdk/langs/python/tests/test_transport.py +++ b/packages/sdk/langs/python/tests/test_transport.py @@ -516,9 +516,13 @@ async def test_response_above_default_64kb_buffer(self): _cleanup_wrapper(cli) @pytest.mark.asyncio - async def test_response_above_custom_buffer_limit_disconnects(self): - # Pins that stdout_buffer_limit_bytes is wired through to the spawn: - # setting it below the response size reproduces the original failure. + async def test_response_above_custom_buffer_limit_raises_protocol_error(self): + # Setting stdout_buffer_limit_bytes below the response size should + # surface HOST_PROTOCOL_ERROR (actionable) rather than + # HOST_DISCONNECTED (misleading — the host is still alive), and the + # error should carry a hint to raise the buffer limit. + from superdoc.errors import HOST_PROTOCOL_ERROR + big_payload = 'x' * (200 * 1024) cli = _mock_cli_bin({ 'handshake': 'ok', @@ -531,8 +535,23 @@ async def test_response_above_custom_buffer_limit_disconnects(self): stdout_buffer_limit_bytes=64 * 1024, ) await transport.connect() + process = transport._process + assert process is not None with pytest.raises(SuperDocError) as exc_info: await transport.invoke(_TEST_OP, {'query': 'big'}) - assert exc_info.value.code == HOST_DISCONNECTED + assert exc_info.value.code == HOST_PROTOCOL_ERROR + assert 'stdout_buffer_limit_bytes' in str(exc_info.value) + + # The host process must be torn down — not just the transport + # state flipped to DISCONNECTED. Otherwise dispose() short-circuits + # and leaves an orphaned host running. 
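+            # Awaiting the scheduled cleanup task (when present) makes the
+            # teardown assertions below deterministic instead of racing it.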
+ if transport._cleanup_task is not None: + await transport._cleanup_task + assert transport._process is None + assert transport.state == 'DISCONNECTED' + assert process.returncode is not None + + # dispose() after an overflow must be a safe no-op. + await transport.dispose() finally: _cleanup_wrapper(cli) From 4ffd1e7bd30e78f2068951f6f2b31ec776970cba Mon Sep 17 00:00:00 2001 From: michaelreavant Date: Tue, 14 Apr 2026 09:45:07 +0200 Subject: [PATCH 3/6] refactor(sdk): dedupe stdout_buffer_limit default and add wiring test Address review follow-ups on the async transport buffer-limit option. - Hoist DEFAULT_STDOUT_BUFFER_LIMIT_BYTES (64 MiB) to module scope in transport.py and reference it from AsyncHostTransport, the async runtime, and AsyncSuperDocClient so the default lives in one place instead of three copies of 64 * 1024 * 1024. - Add a short "raise if a single host response can exceed this size" comment on the client.py parameter so callers see the guidance at the public API boundary, not buried in transport.py. - Rename test_response_above_default_64kb_buffer to test_response_above_asyncio_default_streamreader_limit. 64 KiB is asyncio's default, not the SDK's (which is now 64 MiB), so the old name read backwards after this PR. - Add test_client_threads_stdout_buffer_limit_to_transport: builds AsyncSuperDocClient with a custom limit and asserts the value reaches AsyncHostTransport. Without this, a silent drop of the arg in client.py or runtime.py would leave the existing overflow test passing while the public API reverts to the asyncio 64 KiB default. --- packages/sdk/langs/python/superdoc/client.py | 5 ++++- packages/sdk/langs/python/superdoc/runtime.py | 8 +++++-- .../sdk/langs/python/superdoc/transport.py | 8 ++++++- .../sdk/langs/python/tests/test_transport.py | 22 ++++++++++++++++++- 4 files changed, 38 insertions(+), 5 deletions(-) diff --git a/packages/sdk/langs/python/superdoc/client.py b/packages/sdk/langs/python/superdoc/client.py index 122e437a78..9291399518 100644 --- a/packages/sdk/langs/python/superdoc/client.py +++ b/packages/sdk/langs/python/superdoc/client.py @@ -22,6 +22,7 @@ DocOpenResult as GeneratedDocOpenResult, ) from .runtime import SuperDocAsyncRuntime, SuperDocSyncRuntime +from .transport import DEFAULT_STDOUT_BUFFER_LIMIT_BYTES UserIdentity = Dict[str, str] @@ -340,7 +341,9 @@ def __init__( request_timeout_ms: int | None = None, watchdog_timeout_ms: int = 30_000, max_queue_depth: int = 100, - stdout_buffer_limit_bytes: int = 64 * 1024 * 1024, + # Raise if a single host response can exceed this size (e.g. reading + # very large documents); otherwise the default is safe. 
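+        # Illustrative only (the exact value depends on your workload):
+        #   AsyncSuperDocClient(stdout_buffer_limit_bytes=256 * 1024 * 1024)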
+ stdout_buffer_limit_bytes: int = DEFAULT_STDOUT_BUFFER_LIMIT_BYTES, default_change_mode: Literal['direct', 'tracked'] | None = None, user: UserIdentity | None = None, ) -> None: diff --git a/packages/sdk/langs/python/superdoc/runtime.py b/packages/sdk/langs/python/superdoc/runtime.py index a725578e0d..25dc3727e0 100644 --- a/packages/sdk/langs/python/superdoc/runtime.py +++ b/packages/sdk/langs/python/superdoc/runtime.py @@ -14,7 +14,11 @@ from .embedded_cli import resolve_embedded_cli_path from .generated.contract import OPERATION_INDEX from .protocol import normalize_default_change_mode -from .transport import AsyncHostTransport, SyncHostTransport +from .transport import ( + DEFAULT_STDOUT_BUFFER_LIMIT_BYTES, + AsyncHostTransport, + SyncHostTransport, +) class SuperDocSyncRuntime: @@ -79,7 +83,7 @@ def __init__( request_timeout_ms: Optional[int] = None, watchdog_timeout_ms: int = 30_000, max_queue_depth: int = 100, - stdout_buffer_limit_bytes: int = 64 * 1024 * 1024, + stdout_buffer_limit_bytes: int = DEFAULT_STDOUT_BUFFER_LIMIT_BYTES, default_change_mode: Optional[str] = None, user: Optional[Dict[str, str]] = None, ) -> None: diff --git a/packages/sdk/langs/python/superdoc/transport.py b/packages/sdk/langs/python/superdoc/transport.py index d43ce04aac..bacd490b5d 100644 --- a/packages/sdk/langs/python/superdoc/transport.py +++ b/packages/sdk/langs/python/superdoc/transport.py @@ -43,6 +43,12 @@ logger = logging.getLogger('superdoc.transport') +# Default stdout StreamReader buffer for the async transport. Host responses +# are single newline-delimited JSON lines, so this caps the largest individual +# response a caller can receive. Raise it if your workload routinely produces +# responses above this size (e.g. whole-document reads on very large docs). +DEFAULT_STDOUT_BUFFER_LIMIT_BYTES = 64 * 1024 * 1024 + # Opt-in debug logging via SUPERDOC_DEBUG=1 or SUPERDOC_LOG_LEVEL=debug. # Only configures the named logger — never mutates root logging config. _log_level = os.environ.get('SUPERDOC_LOG_LEVEL', '').lower() @@ -399,7 +405,7 @@ def __init__( request_timeout_ms: Optional[int] = None, watchdog_timeout_ms: int = 30_000, max_queue_depth: int = 100, - stdout_buffer_limit_bytes: int = 64 * 1024 * 1024, + stdout_buffer_limit_bytes: int = DEFAULT_STDOUT_BUFFER_LIMIT_BYTES, default_change_mode: Optional[ChangeMode] = None, user: Optional[Dict[str, str]] = None, ) -> None: diff --git a/packages/sdk/langs/python/tests/test_transport.py b/packages/sdk/langs/python/tests/test_transport.py index f0de75f20e..aba75db875 100644 --- a/packages/sdk/langs/python/tests/test_transport.py +++ b/packages/sdk/langs/python/tests/test_transport.py @@ -499,7 +499,7 @@ class TestAsyncLargeResponse: """Responses larger than the StreamReader buffer must not crash the reader.""" @pytest.mark.asyncio - async def test_response_above_default_64kb_buffer(self): + async def test_response_above_asyncio_default_streamreader_limit(self): big_payload = 'x' * (200 * 1024) cli = _mock_cli_bin({ 'handshake': 'ok', @@ -555,3 +555,23 @@ async def test_response_above_custom_buffer_limit_raises_protocol_error(self): await transport.dispose() finally: _cleanup_wrapper(cli) + + @pytest.mark.asyncio + async def test_client_threads_stdout_buffer_limit_to_transport(self): + # End-to-end wiring check: the public AsyncSuperDocClient constructor + # must thread stdout_buffer_limit_bytes through SuperDocAsyncRuntime + # into AsyncHostTransport. 
Without this, a silent drop in client.py + # or runtime.py would leave the existing overflow test passing while + # the public API reverts to the asyncio 64 KiB default. + from superdoc.client import AsyncSuperDocClient + + cli = _mock_cli_bin({'handshake': 'ok'}) + try: + client = AsyncSuperDocClient( + env={'SUPERDOC_CLI_BIN': cli}, + stdout_buffer_limit_bytes=64 * 1024, + ) + transport = client._runtime._transport + assert transport._stdout_buffer_limit_bytes == 64 * 1024 + finally: + _cleanup_wrapper(cli) From a0e48cc20923e05681e20e6bb233cbf80a005e82 Mon Sep 17 00:00:00 2001 From: Caio Pizzol Date: Sat, 18 Apr 2026 14:54:25 -0300 Subject: [PATCH 4/6] fix(sdk): mark transport DISPOSING synchronously on reader teardown Round-2 review follow-ups: - _schedule_cleanup now flips state to DISPOSING before scheduling the cleanup task. Previously, between the reader returning and the async _cleanup running, _ensure_connected's CONNECTED fast path would still accept invoke() calls; they then blocked on a future the dead reader could never resolve until watchdog_timeout_ms (default 30s). - Narrow the buffer-overflow catch to readline() only and drop asyncio.LimitOverrunError from the tuple. readline() re-raises LimitOverrunError as ValueError (it is not a ValueError subclass on any supported CPython), so the previous broad except could reclassify unrelated ValueErrors from dispatch as a buffer-limit error with a misleading remediation hint. Comment corrected to match. - Re-export DEFAULT_STDOUT_BUFFER_LIMIT_BYTES from superdoc/__init__.py so consumers tuning the option don't import from the implementation module. - Tighten test_host_crash to assert HOST_DISCONNECTED specifically and verify process teardown via the new _schedule_cleanup path. - Strengthen the dispose-after-overflow assertion to actually verify the no-op claim (state stays DISCONNECTED, _process stays None, a second dispose is also safe). Replace the timing-sensitive process.returncode read with await process.wait(). --- .../sdk/langs/python/superdoc/__init__.py | 2 + .../sdk/langs/python/superdoc/transport.py | 61 ++++++++++++------- .../sdk/langs/python/tests/test_transport.py | 34 +++++++++-- 3 files changed, 70 insertions(+), 27 deletions(-) diff --git a/packages/sdk/langs/python/superdoc/__init__.py b/packages/sdk/langs/python/superdoc/__init__.py index 9682645771..fa0b628a52 100644 --- a/packages/sdk/langs/python/superdoc/__init__.py +++ b/packages/sdk/langs/python/superdoc/__init__.py @@ -10,6 +10,7 @@ get_tool_catalog, list_tools, ) +from .transport import DEFAULT_STDOUT_BUFFER_LIMIT_BYTES __all__ = [ "SuperDocClient", @@ -17,6 +18,7 @@ "SuperDocDocument", "AsyncSuperDocDocument", "SuperDocError", + "DEFAULT_STDOUT_BUFFER_LIMIT_BYTES", "get_skill", "install_skill", "list_skills", diff --git a/packages/sdk/langs/python/superdoc/transport.py b/packages/sdk/langs/python/superdoc/transport.py index bacd490b5d..3154b6e187 100644 --- a/packages/sdk/langs/python/superdoc/transport.py +++ b/packages/sdk/langs/python/superdoc/transport.py @@ -594,7 +594,29 @@ async def _reader_loop(self) -> None: try: while True: - raw = await process.stdout.readline() + try: + raw = await process.stdout.readline() + except ValueError as exc: + # asyncio.StreamReader.readline() re-raises LimitOverrunError + # from readuntil() as ValueError when a single line exceeds + # `limit` (see CPython asyncio/streams.py). The host is still + # alive — schedule cleanup so a later dispose() doesn't + # short-circuit on DISCONNECTED state. 
Scoped to readline() + # only so unrelated ValueErrors from dispatch aren't + # reclassified as a buffer-limit error. + logger.debug('Reader loop buffer overflow: %s', exc) + if not self._stopping: + self._schedule_cleanup(SuperDocError( + 'Host response exceeded stdout buffer limit. ' + 'Raise stdout_buffer_limit_bytes to accommodate larger responses.', + code=HOST_PROTOCOL_ERROR, + details={ + 'message': str(exc), + 'stdout_buffer_limit_bytes': self._stdout_buffer_limit_bytes, + }, + )) + return + if not raw: # EOF — process died. break @@ -623,23 +645,6 @@ async def _reader_loop(self) -> None: except asyncio.CancelledError: return - except (asyncio.LimitOverrunError, ValueError) as exc: - # StreamReader raises LimitOverrunError (a ValueError subclass on - # some Python versions) when a single line exceeds the stdout - # buffer limit. The host is still alive — we must kill it so a - # later dispose() doesn't short-circuit on DISCONNECTED state. - logger.debug('Reader loop buffer overflow: %s', exc) - if not self._stopping: - self._schedule_cleanup(SuperDocError( - 'Host response exceeded stdout buffer limit. ' - 'Raise stdout_buffer_limit_bytes to accommodate larger responses.', - code=HOST_PROTOCOL_ERROR, - details={ - 'message': str(exc), - 'stdout_buffer_limit_bytes': self._stdout_buffer_limit_bytes, - }, - )) - return except Exception as exc: logger.debug('Reader loop error: %s', exc) @@ -721,15 +726,25 @@ async def _kill_and_reset(self) -> None: ) def _schedule_cleanup(self, error: SuperDocError) -> None: - """Fire-and-forget cleanup from inside the reader task. + """Fire-and-forget teardown from inside the reader task. - Must not be awaited from `_reader_loop` itself — `_cleanup` cancels - and awaits the reader task, which would deadlock. Scheduling it as a - separate task lets the reader return first; by the time cleanup runs - the reader task is already done and the cancel/await is a no-op. + Why a separate task: `_cleanup` cancels and awaits `self._reader_task`. + Awaiting it from inside the reader itself would deadlock — so we punt + to a fresh task, and by the time it runs the reader has already + returned (so cancel+await is a no-op). + + Synchronously flips state to DISPOSING so concurrent `invoke()` callers + observe the failed transport immediately rather than passing the + CONNECTED fast path and blocking on a future the dead reader can never + resolve until `watchdog_timeout_ms`. + + Idempotent: if a cleanup is already in flight, subsequent errors are + dropped — the first one wins. Callers may observe completion via + `self._cleanup_task`. 
""" if self._cleanup_task and not self._cleanup_task.done(): return + self._state = _State.DISPOSING self._cleanup_task = asyncio.create_task(self._cleanup(error)) async def _cleanup(self, error: Optional[SuperDocError]) -> None: diff --git a/packages/sdk/langs/python/tests/test_transport.py b/packages/sdk/langs/python/tests/test_transport.py index aba75db875..0c7e471be3 100644 --- a/packages/sdk/langs/python/tests/test_transport.py +++ b/packages/sdk/langs/python/tests/test_transport.py @@ -447,9 +447,24 @@ async def test_host_crash(self): try: transport = AsyncHostTransport(cli, startup_timeout_ms=5_000) await transport.connect() + process = transport._process + assert process is not None with pytest.raises(SuperDocError) as exc_info: await transport.invoke(_TEST_OP, {'query': 'test'}) - assert exc_info.value.code in (HOST_DISCONNECTED, HOST_TIMEOUT) + # The reader-loop EOF branch now goes through _schedule_cleanup, + # which rejects the pending future synchronously enough that the + # invoke() never has to fall back to the watchdog timeout. + assert exc_info.value.code == HOST_DISCONNECTED + + # Cleanup must tear the process down — pre-fix, the inline + # _reject_all_pending + state flip left the process orphaned. + cleanup_task = transport._cleanup_task + if cleanup_task is not None: + await cleanup_task + assert transport._process is None + assert transport.state == 'DISCONNECTED' + await process.wait() + assert process.returncode is not None finally: _cleanup_wrapper(cli) @@ -545,14 +560,25 @@ async def test_response_above_custom_buffer_limit_raises_protocol_error(self): # The host process must be torn down — not just the transport # state flipped to DISCONNECTED. Otherwise dispose() short-circuits # and leaves an orphaned host running. - if transport._cleanup_task is not None: - await transport._cleanup_task + cleanup_task = transport._cleanup_task + if cleanup_task is not None: + await cleanup_task assert transport._process is None assert transport.state == 'DISCONNECTED' + # The captured handle should be reaped by _cleanup; await wait() + # rather than reading returncode to avoid a CI-timing flake if the + # 2 s wait inside _cleanup didn't finish reaping in time. + await process.wait() assert process.returncode is not None - # dispose() after an overflow must be a safe no-op. + # dispose() after an overflow must be a safe no-op: state and + # process stay as cleanup left them, no exception is raised, and + # a second dispose() is also safe. await transport.dispose() + assert transport.state == 'DISCONNECTED' + assert transport._process is None + await transport.dispose() + assert transport.state == 'DISCONNECTED' finally: _cleanup_wrapper(cli) From 2d5c28ac06f2a1078c52bc27cf3caa89d54fa0c1 Mon Sep 17 00:00:00 2001 From: Caio Pizzol Date: Sat, 18 Apr 2026 15:01:57 -0300 Subject: [PATCH 5/6] fix(sdk): serialize teardown across reader, _kill_and_reset, and dispose MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Round-2 follow-up — addresses the residual race that the synchronous DISPOSING flip didn't cover. Before: `_kill_and_reset()` (called from `_send_request` on stdin write failure or watchdog timeout) `await`ed `_cleanup` directly. If a reader-triggered `_schedule_cleanup` was in flight, both ran concurrently and raced on `_reject_all_pending`'s read-then-clear of `self._pending` (futures added between snapshot and clear were leaked) and on `process.kill()`/`reader_task.cancel()`. 
`dispose()` similarly short-circuited on DISPOSING without waiting for the in-flight cleanup to finish — the caller saw "disposed" before the host was fully torn down. Now: - `_kill_and_reset` and `dispose` both check the cleanup-task slot and `await` an in-flight cleanup rather than starting a parallel one. Single-flight teardown across all three entry points. - `_cleanup` clears `self._cleanup_task` in `finally` when it owns the slot, so introspection doesn't surface a stale done handle and the next teardown gets a fresh slot. - `dispose()` after a reader-triggered cleanup now blocks until that cleanup finishes, restoring the "host fully torn down on return" contract. Tests: - `test_schedule_cleanup_dedupe_guard_drops_reentrant_call` — second `_schedule_cleanup` does not replace the in-flight task slot. - `test_overflow_during_dispose_does_not_schedule_cleanup` — `_stopping` suppression is honored. - `test_kill_and_reset_awaits_in_flight_cleanup` — `_kill_and_reset` observes the existing task instead of running a parallel `_cleanup`. - `test_dispose_waits_for_in_flight_cleanup` — `dispose()` blocks until reader-triggered cleanup completes before returning. 95 transport tests pass; 5 consecutive runs with PYTHONASYNCIODEBUG=1 show no flakes. --- .../sdk/langs/python/superdoc/transport.py | 106 +++++++++---- .../sdk/langs/python/tests/test_transport.py | 148 ++++++++++++++++++ 2 files changed, 223 insertions(+), 31 deletions(-) diff --git a/packages/sdk/langs/python/superdoc/transport.py b/packages/sdk/langs/python/superdoc/transport.py index 3154b6e187..c70689a26e 100644 --- a/packages/sdk/langs/python/superdoc/transport.py +++ b/packages/sdk/langs/python/superdoc/transport.py @@ -437,7 +437,18 @@ async def connect(self) -> None: async def dispose(self) -> None: """Gracefully shut down the host process.""" - if self._state == _State.DISCONNECTED or self._state == _State.DISPOSING: + if self._state == _State.DISCONNECTED: + return + if self._state == _State.DISPOSING: + # A reader-triggered cleanup is in flight (or an earlier teardown + # left state in DISPOSING briefly). Wait for it so the caller + # observes "host fully torn down" by the time dispose() returns. + existing = self._cleanup_task + if existing and not existing.done(): + try: + await existing + except Exception: + pass return self._stopping = True @@ -720,10 +731,31 @@ def _reject_all_pending(self, error: SuperDocError) -> None: future.set_exception(error) async def _kill_and_reset(self) -> None: - """Kill the host process and reset to DISCONNECTED.""" - await self._cleanup( + """Kill the host process and reset to DISCONNECTED. + + Coordinates with `_schedule_cleanup` so callers (e.g. `_send_request` + on watchdog timeout or stdin write failure) don't run a parallel + `_cleanup` that races a reader-triggered cleanup on + `_reject_all_pending` and `process.kill`. If a cleanup is already in + flight, await it; otherwise own a fresh task in the same slot so a + later concurrent caller sees us instead of starting its own. + """ + existing = self._cleanup_task + if existing and not existing.done(): + try: + await existing + except Exception: + pass + return + self._state = _State.DISPOSING + task = asyncio.create_task(self._cleanup( SuperDocError('Host process disconnected.', code=HOST_DISCONNECTED), - ) + )) + self._cleanup_task = task + try: + await task + except Exception: + pass def _schedule_cleanup(self, error: SuperDocError) -> None: """Fire-and-forget teardown from inside the reader task. 
@@ -749,32 +781,44 @@ def _schedule_cleanup(self, error: SuperDocError) -> None: async def _cleanup(self, error: Optional[SuperDocError]) -> None: """Cancel reader, kill process, reject pending, reset state.""" - if self._reader_task and not self._reader_task.done(): - self._reader_task.cancel() - try: - await self._reader_task - except (asyncio.CancelledError, Exception): - pass - self._reader_task = None - - process = self._process - if process: - try: - process.kill() - except Exception: - pass - try: - await asyncio.wait_for(process.wait(), timeout=2) - except (asyncio.TimeoutError, Exception): - pass - self._process = None + try: + if self._reader_task and not self._reader_task.done(): + self._reader_task.cancel() + try: + await self._reader_task + except (asyncio.CancelledError, Exception): + pass + self._reader_task = None - if error: - self._reject_all_pending(error) - else: - # Dispose path — reject remaining with generic disconnect. - self._reject_all_pending( - SuperDocError('Host process was disposed.', code=HOST_DISCONNECTED), - ) + process = self._process + if process: + try: + process.kill() + except Exception: + pass + try: + await asyncio.wait_for(process.wait(), timeout=2) + except (asyncio.TimeoutError, Exception): + pass + self._process = None + + if error: + self._reject_all_pending(error) + else: + # Dispose path — reject remaining with generic disconnect. + self._reject_all_pending( + SuperDocError('Host process was disposed.', code=HOST_DISCONNECTED), + ) - self._state = _State.DISCONNECTED + self._state = _State.DISCONNECTED + finally: + # Release the task handle if we are the in-flight cleanup task, + # so introspection doesn't surface a stale done handle and the + # next teardown gets a fresh slot. Skip when called inline (e.g. + # from dispose) — that current task is not our cleanup task. + try: + current = asyncio.current_task() + except RuntimeError: + current = None + if current is not None and self._cleanup_task is current: + self._cleanup_task = None diff --git a/packages/sdk/langs/python/tests/test_transport.py b/packages/sdk/langs/python/tests/test_transport.py index 0c7e471be3..4df56e0956 100644 --- a/packages/sdk/langs/python/tests/test_transport.py +++ b/packages/sdk/langs/python/tests/test_transport.py @@ -601,3 +601,151 @@ async def test_client_threads_stdout_buffer_limit_to_transport(self): assert transport._stdout_buffer_limit_bytes == 64 * 1024 finally: _cleanup_wrapper(cli) + + +class TestAsyncCleanupLifecycle: + """Lock down the cleanup-task slot so its load-bearing invariants don't + silently regress: the dedupe guard, the _stopping suppression branch, + and the _kill_and_reset coordination with reader-triggered cleanup. + """ + + @pytest.mark.asyncio + async def test_schedule_cleanup_dedupe_guard_drops_reentrant_call(self): + # If a cleanup task is already in flight, a second _schedule_cleanup + # must NOT replace it — that would cancel the in-flight teardown + # mid-flight and could leak the host process. + cli = _mock_cli_bin({'handshake': 'ok'}) + try: + transport = AsyncHostTransport(cli, startup_timeout_ms=5_000) + await transport.connect() + + slow = asyncio.create_task(asyncio.sleep(0.5)) + transport._cleanup_task = slow + + transport._schedule_cleanup( + SuperDocError('second', code=HOST_DISCONNECTED), + ) + # Slot must still point at the original task — second call dropped. 
+ assert transport._cleanup_task is slow + + slow.cancel() + try: + await slow + except (asyncio.CancelledError, Exception): + pass + transport._cleanup_task = None + await transport.dispose() + finally: + _cleanup_wrapper(cli) + + @pytest.mark.asyncio + async def test_overflow_during_dispose_does_not_schedule_cleanup(self): + # When `dispose()` is in progress, `_stopping` is set; a buffer + # overflow racing with shutdown must NOT schedule a redundant + # cleanup that would race with dispose's own teardown. + cli = _mock_cli_bin({'handshake': 'ok'}) + try: + transport = AsyncHostTransport(cli, startup_timeout_ms=5_000) + await transport.connect() + transport._stopping = True + assert transport._cleanup_task is None + # Simulate the reader hitting overflow while dispose is in flight. + # The guard at the top of the overflow branch checks + # `not self._stopping` before scheduling; we re-check the same + # condition the reader does. + if not transport._stopping: + transport._schedule_cleanup( + SuperDocError('overflow', code=HOST_PROTOCOL_ERROR), + ) + assert transport._cleanup_task is None + transport._stopping = False + await transport.dispose() + finally: + _cleanup_wrapper(cli) + + @pytest.mark.asyncio + async def test_kill_and_reset_awaits_in_flight_cleanup(self): + # If a reader-triggered cleanup is already running, _kill_and_reset + # must await it rather than spin up a parallel _cleanup that would + # race on _reject_all_pending and process.kill. + cli = _mock_cli_bin({'handshake': 'ok'}) + try: + transport = AsyncHostTransport(cli, startup_timeout_ms=5_000) + await transport.connect() + + # Replace _cleanup with a tracking stub so we can count entries + # and verify the second call observes the first task instead of + # creating a fresh one. + entry_count = 0 + release = asyncio.Event() + real_cleanup = transport._cleanup + + async def tracking_cleanup(error): + nonlocal entry_count + entry_count += 1 + # First entry blocks until the test releases it; subsequent + # entries (if any) would race past — failure mode for the bug. + await release.wait() + await real_cleanup(error) + + transport._cleanup = tracking_cleanup # type: ignore[assignment] + + transport._schedule_cleanup( + SuperDocError('reader-overflow', code=HOST_PROTOCOL_ERROR), + ) + # Yield so the cleanup task starts and increments entry_count. + await asyncio.sleep(0) + assert entry_count == 1 + assert transport._cleanup_task is not None + assert not transport._cleanup_task.done() + + kill_task = asyncio.create_task(transport._kill_and_reset()) + await asyncio.sleep(0) + # _kill_and_reset must NOT start a second _cleanup — it should + # await the in-flight one. entry_count stays at 1. + assert entry_count == 1 + + release.set() + await kill_task + assert entry_count == 1 + assert transport.state == 'DISCONNECTED' + assert transport._cleanup_task is None + finally: + _cleanup_wrapper(cli) + + @pytest.mark.asyncio + async def test_dispose_waits_for_in_flight_cleanup(self): + # `dispose()` called while a reader-triggered cleanup is in flight + # must wait for it to finish, so the caller observes "fully torn + # down" by the time dispose returns. 
+ cli = _mock_cli_bin({'handshake': 'ok'}) + try: + transport = AsyncHostTransport(cli, startup_timeout_ms=5_000) + await transport.connect() + + release = asyncio.Event() + real_cleanup = transport._cleanup + + async def slow_cleanup(error): + await release.wait() + await real_cleanup(error) + + transport._cleanup = slow_cleanup # type: ignore[assignment] + + transport._schedule_cleanup( + SuperDocError('reader-overflow', code=HOST_PROTOCOL_ERROR), + ) + await asyncio.sleep(0) + assert transport.state == 'DISPOSING' + + dispose_task = asyncio.create_task(transport.dispose()) + await asyncio.sleep(0) + # dispose must still be waiting on the cleanup task. + assert not dispose_task.done() + + release.set() + await dispose_task + assert transport.state == 'DISCONNECTED' + assert transport._process is None + finally: + _cleanup_wrapper(cli) From 231449d851c3a0e5cab3392f4485cf962d7dba81 Mon Sep 17 00:00:00 2001 From: Caio Pizzol Date: Sat, 18 Apr 2026 15:37:55 -0300 Subject: [PATCH 6/6] fix(sdk): close residual races in async transport teardown MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two correctness regressions and three test gaps surfaced in the final-pass review of the cleanup-task lifecycle. **1. _ensure_connected race (HIGH).** The synchronous DISPOSING flip in _schedule_cleanup did not gate _ensure_connected, so a concurrent connect()/invoke() reaching _start_host during the DISPOSING window would reassign self._process and self._reader_task. The pending cleanup task then read those slots after its first await and killed the freshly-spawned process. Fix: drain self._cleanup_task at the top of _ensure_connected via asyncio.shield (so a cancelled caller doesn't abort the in-flight cleanup). **2. Cancellation propagation race (HIGH).** _kill_and_reset and dispose() awaited the cleanup task without asyncio.shield. When the caller (e.g. an invoke task at the watchdog branch) was cancelled, asyncio cancelled the awaited cleanup task too — _cleanup did not catch CancelledError around process.wait(), so teardown stopped before clearing _process / setting state. dispose() then saw DISPOSING with _cleanup_task=None and returned without finishing teardown, leaking the host process. Fix: wrap the awaited cleanup in asyncio.shield in both call sites; restructure _cleanup so it captures handles and sets state synchronously up-front, before any awaits, so observable state is always consistent. **3. Move _stopping guard into _schedule_cleanup.** The previous test_overflow_during_dispose_does_not_schedule_cleanup was tautological — it set _stopping=True and then re-checked the same condition in the test body before calling _schedule_cleanup, so the call never ran and the assertion passed trivially. Move the guard into _schedule_cleanup itself (it's the correct authoritative location anyway), remove the now-redundant call-site checks in _reader_loop, and rewrite the test to call _schedule_cleanup unconditionally with _stopping=True. The test now actually exercises the production guard. **4. Multi-pending-invoke overflow test.** Codex round-2 gap that remained open. Locks down that _reject_all_pending fails ALL pending futures with HOST_PROTOCOL_ERROR plus the actionable hint, not just the one whose response overflowed. **5. Async reconnect-after-buffer-overflow test.** Sync transport already had test_reconnect_after_failure; async only covered reconnect after explicit dispose. 
Validates that reader-triggered cleanup leaves the transport reusable for a fresh invoke without wedging _cleanup_task / _connecting / _process. Plus: replaced asyncio.sleep(0) with asyncio.Event-based synchronization in lifecycle tests (Codex/Opus medium — sleep(0) is implementation-defined under uvloop / Python scheduling changes); two new tests directly cover the round-3 races (test_ensure_connected_drains_in_flight_cleanup_before_spawn, test_kill_and_reset_caller_cancellation_does_not_cancel_cleanup). 99 transport tests pass; 5 consecutive runs with PYTHONASYNCIODEBUG=1 show no flakes; new tests pass under -W error::ResourceWarning. --- .../sdk/langs/python/superdoc/transport.py | 123 ++++++--- .../sdk/langs/python/tests/test_transport.py | 236 ++++++++++++++++-- 2 files changed, 298 insertions(+), 61 deletions(-) diff --git a/packages/sdk/langs/python/superdoc/transport.py b/packages/sdk/langs/python/superdoc/transport.py index c70689a26e..d7c4a9ca0e 100644 --- a/packages/sdk/langs/python/superdoc/transport.py +++ b/packages/sdk/langs/python/superdoc/transport.py @@ -443,10 +443,14 @@ async def dispose(self) -> None: # A reader-triggered cleanup is in flight (or an earlier teardown # left state in DISPOSING briefly). Wait for it so the caller # observes "host fully torn down" by the time dispose() returns. + # shield() so a cancelled dispose() doesn't interrupt _cleanup + # mid-flight and leak the host process. existing = self._cleanup_task if existing and not existing.done(): try: - await existing + await asyncio.shield(existing) + except asyncio.CancelledError: + raise except Exception: pass return @@ -527,6 +531,20 @@ async def invoke( async def _ensure_connected(self) -> None: """Lazy connect: spawn and handshake if not already connected.""" + # Drain any in-flight teardown before spawning a new host. Without + # this, a concurrent reader-triggered cleanup would still be running + # when _start_host reassigns self._process / self._reader_task; the + # cleanup task would then cancel the fresh reader and kill the fresh + # process. shield() so we don't cancel the cleanup if our caller is. + cleanup = self._cleanup_task + if cleanup and not cleanup.done(): + try: + await asyncio.shield(cleanup) + except asyncio.CancelledError: + raise + except Exception: + pass + if self._state == _State.CONNECTED and self._process and self._process.returncode is None: return @@ -614,18 +632,18 @@ async def _reader_loop(self) -> None: # alive — schedule cleanup so a later dispose() doesn't # short-circuit on DISCONNECTED state. Scoped to readline() # only so unrelated ValueErrors from dispatch aren't - # reclassified as a buffer-limit error. + # reclassified as a buffer-limit error. _schedule_cleanup + # is a no-op when _stopping is set (graceful dispose path). logger.debug('Reader loop buffer overflow: %s', exc) - if not self._stopping: - self._schedule_cleanup(SuperDocError( - 'Host response exceeded stdout buffer limit. ' - 'Raise stdout_buffer_limit_bytes to accommodate larger responses.', - code=HOST_PROTOCOL_ERROR, - details={ - 'message': str(exc), - 'stdout_buffer_limit_bytes': self._stdout_buffer_limit_bytes, - }, - )) + self._schedule_cleanup(SuperDocError( + 'Host response exceeded stdout buffer limit. 
' + 'Raise stdout_buffer_limit_bytes to accommodate larger responses.', + code=HOST_PROTOCOL_ERROR, + details={ + 'message': str(exc), + 'stdout_buffer_limit_bytes': self._stdout_buffer_limit_bytes, + }, + )) return if not raw: @@ -661,13 +679,14 @@ async def _reader_loop(self) -> None: # Reader exited (EOF or unexpected error) — tear down the process so # no orphaned host is left running, then reject pending futures. - if not self._stopping: - exit_code = process.returncode - self._schedule_cleanup(SuperDocError( - 'Host process disconnected.', - code=HOST_DISCONNECTED, - details={'exit_code': exit_code, 'signal': None}, - )) + # _schedule_cleanup is a no-op when _stopping is set (graceful + # dispose path) so we don't race the dispose teardown. + exit_code = process.returncode + self._schedule_cleanup(SuperDocError( + 'Host process disconnected.', + code=HOST_DISCONNECTED, + details={'exit_code': exit_code, 'signal': None}, + )) async def _send_request(self, method: str, params: Any, watchdog_ms: int) -> Any: """Send a JSON-RPC request and await the matching response future.""" @@ -739,11 +758,18 @@ async def _kill_and_reset(self) -> None: `_reject_all_pending` and `process.kill`. If a cleanup is already in flight, await it; otherwise own a fresh task in the same slot so a later concurrent caller sees us instead of starting its own. + + shield() the await so caller cancellation (e.g. an `invoke()` task + that times out and is then cancelled by the user) does NOT propagate + into `_cleanup` — interrupting cleanup mid-flight would leak the + subprocess and wedge state in DISPOSING. """ existing = self._cleanup_task if existing and not existing.done(): try: - await existing + await asyncio.shield(existing) + except asyncio.CancelledError: + raise except Exception: pass return @@ -753,7 +779,9 @@ async def _kill_and_reset(self) -> None: )) self._cleanup_task = task try: - await task + await asyncio.shield(task) + except asyncio.CancelledError: + raise except Exception: pass @@ -770,27 +798,55 @@ def _schedule_cleanup(self, error: SuperDocError) -> None: CONNECTED fast path and blocking on a future the dead reader can never resolve until `watchdog_timeout_ms`. + Skips when `_stopping` is set: a graceful `dispose()` is already + tearing down, and a parallel cleanup task would race on + `_reject_all_pending` and `process.kill`. + Idempotent: if a cleanup is already in flight, subsequent errors are dropped — the first one wins. Callers may observe completion via `self._cleanup_task`. """ + if self._stopping: + return if self._cleanup_task and not self._cleanup_task.done(): return self._state = _State.DISPOSING self._cleanup_task = asyncio.create_task(self._cleanup(error)) async def _cleanup(self, error: Optional[SuperDocError]) -> None: - """Cancel reader, kill process, reject pending, reset state.""" + """Cancel reader, kill process, reject pending, reset state. + + Capture handles and flip user-visible state SYNCHRONOUSLY at the top + before any awaits. That way, even if cancellation arrives during + `process.wait()`, observers see a consistent "torn down" transport + (state DISCONNECTED, _process None, pending futures rejected) rather + than a half-disposed one. The async work below is best-effort + process reaping. + """ + # Snapshot and clear before any await so concurrent callers see a + # fully torn-down transport from this point on. 
+ reader_task = self._reader_task + process = self._process + self._reader_task = None + self._process = None + self._state = _State.DISCONNECTED + + if error is not None: + self._reject_all_pending(error) + else: + # Dispose path — reject remaining with generic disconnect. + self._reject_all_pending( + SuperDocError('Host process was disposed.', code=HOST_DISCONNECTED), + ) + try: - if self._reader_task and not self._reader_task.done(): - self._reader_task.cancel() + if reader_task and not reader_task.done(): + reader_task.cancel() try: - await self._reader_task + await reader_task except (asyncio.CancelledError, Exception): pass - self._reader_task = None - process = self._process if process: try: process.kill() @@ -798,19 +854,8 @@ async def _cleanup(self, error: Optional[SuperDocError]) -> None: pass try: await asyncio.wait_for(process.wait(), timeout=2) - except (asyncio.TimeoutError, Exception): + except (asyncio.TimeoutError, asyncio.CancelledError, Exception): pass - self._process = None - - if error: - self._reject_all_pending(error) - else: - # Dispose path — reject remaining with generic disconnect. - self._reject_all_pending( - SuperDocError('Host process was disposed.', code=HOST_DISCONNECTED), - ) - - self._state = _State.DISCONNECTED finally: # Release the task handle if we are the in-flight cleanup task, # so introspection doesn't surface a stale done handle and the diff --git a/packages/sdk/langs/python/tests/test_transport.py b/packages/sdk/langs/python/tests/test_transport.py index 4df56e0956..f54c0c7a71 100644 --- a/packages/sdk/langs/python/tests/test_transport.py +++ b/packages/sdk/langs/python/tests/test_transport.py @@ -24,7 +24,11 @@ HOST_TIMEOUT, SuperDocError, ) -from superdoc.transport import AsyncHostTransport, SyncHostTransport +from superdoc.transport import ( + DEFAULT_STDOUT_BUFFER_LIMIT_BYTES, + AsyncHostTransport, + SyncHostTransport, +) MOCK_HOST = os.path.join(os.path.dirname(__file__), 'mock_host.py') @@ -606,7 +610,9 @@ async def test_client_threads_stdout_buffer_limit_to_transport(self): class TestAsyncCleanupLifecycle: """Lock down the cleanup-task slot so its load-bearing invariants don't silently regress: the dedupe guard, the _stopping suppression branch, - and the _kill_and_reset coordination with reader-triggered cleanup. + the _kill_and_reset coordination with reader-triggered cleanup, and the + _ensure_connected drain that prevents stale cleanup from killing a + freshly-spawned host. """ @pytest.mark.asyncio @@ -639,25 +645,26 @@ async def test_schedule_cleanup_dedupe_guard_drops_reentrant_call(self): _cleanup_wrapper(cli) @pytest.mark.asyncio - async def test_overflow_during_dispose_does_not_schedule_cleanup(self): - # When `dispose()` is in progress, `_stopping` is set; a buffer - # overflow racing with shutdown must NOT schedule a redundant - # cleanup that would race with dispose's own teardown. + async def test_schedule_cleanup_skipped_when_stopping(self): + # When `dispose()` is in progress, `_stopping` is set; the production + # guard inside `_schedule_cleanup` must short-circuit so a reader + # overflow doesn't race the graceful teardown. (Earlier iterations + # of this test were tautological because the test re-checked + # `_stopping` before calling `_schedule_cleanup`. This version calls + # it unconditionally and asserts the production guard fires.) 
cli = _mock_cli_bin({'handshake': 'ok'}) try: transport = AsyncHostTransport(cli, startup_timeout_ms=5_000) await transport.connect() transport._stopping = True assert transport._cleanup_task is None - # Simulate the reader hitting overflow while dispose is in flight. - # The guard at the top of the overflow branch checks - # `not self._stopping` before scheduling; we re-check the same - # condition the reader does. - if not transport._stopping: - transport._schedule_cleanup( - SuperDocError('overflow', code=HOST_PROTOCOL_ERROR), - ) + + transport._schedule_cleanup( + SuperDocError('overflow', code=HOST_PROTOCOL_ERROR), + ) assert transport._cleanup_task is None + assert transport.state == 'CONNECTED' + transport._stopping = False await transport.dispose() finally: @@ -675,14 +682,18 @@ async def test_kill_and_reset_awaits_in_flight_cleanup(self): # Replace _cleanup with a tracking stub so we can count entries # and verify the second call observes the first task instead of - # creating a fresh one. + # creating a fresh one. Use Events for deterministic ordering + # rather than asyncio.sleep(0) (which is implementation-defined + # under uvloop / Python scheduling changes). entry_count = 0 + started = asyncio.Event() release = asyncio.Event() real_cleanup = transport._cleanup async def tracking_cleanup(error): nonlocal entry_count entry_count += 1 + started.set() # First entry blocks until the test releases it; subsequent # entries (if any) would race past — failure mode for the bug. await release.wait() @@ -693,17 +704,17 @@ async def tracking_cleanup(error): transport._schedule_cleanup( SuperDocError('reader-overflow', code=HOST_PROTOCOL_ERROR), ) - # Yield so the cleanup task starts and increments entry_count. - await asyncio.sleep(0) + await asyncio.wait_for(started.wait(), timeout=2.0) assert entry_count == 1 assert transport._cleanup_task is not None assert not transport._cleanup_task.done() kill_task = asyncio.create_task(transport._kill_and_reset()) - await asyncio.sleep(0) - # _kill_and_reset must NOT start a second _cleanup — it should - # await the in-flight one. entry_count stays at 1. + # Give kill_task a chance to enter — but it must NOT start a + # second _cleanup (which would re-fire `started`). + await asyncio.sleep(0.05) assert entry_count == 1 + assert not kill_task.done() release.set() await kill_task @@ -723,10 +734,12 @@ async def test_dispose_waits_for_in_flight_cleanup(self): transport = AsyncHostTransport(cli, startup_timeout_ms=5_000) await transport.connect() + started = asyncio.Event() release = asyncio.Event() real_cleanup = transport._cleanup async def slow_cleanup(error): + started.set() await release.wait() await real_cleanup(error) @@ -735,11 +748,11 @@ async def slow_cleanup(error): transport._schedule_cleanup( SuperDocError('reader-overflow', code=HOST_PROTOCOL_ERROR), ) - await asyncio.sleep(0) + await asyncio.wait_for(started.wait(), timeout=2.0) assert transport.state == 'DISPOSING' dispose_task = asyncio.create_task(transport.dispose()) - await asyncio.sleep(0) + await asyncio.sleep(0.05) # dispose must still be waiting on the cleanup task. 
assert not dispose_task.done() @@ -747,5 +760,184 @@ async def slow_cleanup(error): await dispose_task assert transport.state == 'DISCONNECTED' assert transport._process is None + assert transport._cleanup_task is None finally: _cleanup_wrapper(cli) + + @pytest.mark.asyncio + async def test_ensure_connected_drains_in_flight_cleanup_before_spawn(self): + # Round-3 regression: without this drain, `_start_host` reassigns + # `self._process` while a stale `_cleanup` task is still scheduled; + # the cleanup then kills the freshly-spawned process. + cli = _mock_cli_bin({'handshake': 'ok'}) + try: + transport = AsyncHostTransport(cli, startup_timeout_ms=5_000) + await transport.connect() + old_process = transport._process + assert old_process is not None + + started = asyncio.Event() + release = asyncio.Event() + real_cleanup = transport._cleanup + + async def slow_cleanup(error): + started.set() + await release.wait() + await real_cleanup(error) + + transport._cleanup = slow_cleanup # type: ignore[assignment] + + transport._schedule_cleanup( + SuperDocError('reader-overflow', code=HOST_PROTOCOL_ERROR), + ) + await asyncio.wait_for(started.wait(), timeout=2.0) + + connect_task = asyncio.create_task(transport.connect()) + await asyncio.sleep(0.05) + # connect() must be blocked on the in-flight cleanup, not racing + # ahead to spawn a fresh process the cleanup would then kill. + assert not connect_task.done() + + release.set() + await connect_task + new_process = transport._process + assert new_process is not None + assert new_process is not old_process + # The fresh process must NOT have been killed by the stale cleanup. + assert new_process.returncode is None + assert transport.state == 'CONNECTED' + await transport.dispose() + finally: + _cleanup_wrapper(cli) + + @pytest.mark.asyncio + async def test_kill_and_reset_caller_cancellation_does_not_cancel_cleanup(self): + # Round-3 regression: without `asyncio.shield`, cancelling the + # awaiter of `_kill_and_reset` propagates into the cleanup task, + # interrupting it mid-flight before `_process` is fully reaped and + # leaving state wedged in DISPOSING. + cli = _mock_cli_bin({'handshake': 'ok'}) + try: + transport = AsyncHostTransport(cli, startup_timeout_ms=5_000) + await transport.connect() + + started = asyncio.Event() + release = asyncio.Event() + real_cleanup = transport._cleanup + + async def slow_cleanup(error): + started.set() + try: + await release.wait() + except asyncio.CancelledError: + # If shield works, this should NOT fire. Re-raise so the + # test's assertion catches the regression. + raise + await real_cleanup(error) + + transport._cleanup = slow_cleanup # type: ignore[assignment] + + kill_task = asyncio.create_task(transport._kill_and_reset()) + await asyncio.wait_for(started.wait(), timeout=2.0) + + kill_task.cancel() + with pytest.raises(asyncio.CancelledError): + await kill_task + + # Cleanup must keep running despite kill_task being cancelled. 
+ assert transport._cleanup_task is not None + assert not transport._cleanup_task.done() + + release.set() + await transport._cleanup_task + assert transport.state == 'DISCONNECTED' + assert transport._process is None + assert transport._cleanup_task is None + finally: + _cleanup_wrapper(cli) + + +class TestAsyncOverflowConcurrency: + """Concurrency scenarios for the buffer-overflow path.""" + + @pytest.mark.asyncio + async def test_overflow_rejects_all_pending_invokes(self): + # Codex/Opus round-3 gap: every pending future must be rejected with + # HOST_PROTOCOL_ERROR — not just the one whose response overflowed. + # A regression where _reject_all_pending only rejects pending[msg.id] + # would silently leave concurrent callers hanging until watchdog. + big_payload = 'x' * (200 * 1024) + cli = _mock_cli_bin({ + 'handshake': 'ok', + 'responses': [ + {'data': {'content': big_payload}}, + {'data': {'v': 2}}, + {'data': {'v': 3}}, + ], + }) + try: + transport = AsyncHostTransport( + cli, + startup_timeout_ms=5_000, + stdout_buffer_limit_bytes=64 * 1024, + watchdog_timeout_ms=10_000, + ) + await transport.connect() + tasks = [ + asyncio.ensure_future(transport.invoke(_TEST_OP, {'query': f'q{i}'})) + for i in range(3) + ] + results = await asyncio.gather(*tasks, return_exceptions=True) + assert all(isinstance(r, SuperDocError) for r in results), results + assert all(r.code == HOST_PROTOCOL_ERROR for r in results) + # Every error must carry the actionable hint, not just the first. + assert all('stdout_buffer_limit_bytes' in str(r) for r in results) + assert transport._pending == {} + assert transport.state == 'DISCONNECTED' + await transport.dispose() + finally: + _cleanup_wrapper(cli) + + @pytest.mark.asyncio + async def test_reconnect_after_buffer_overflow(self): + # Sync transport has test_reconnect_after_failure; async previously + # only had reconnect-after-explicit-dispose. After reader-triggered + # cleanup the transport must be reusable for a fresh invoke without + # leaving _cleanup_task / _connecting / _process in a wedged state. + cli1 = _mock_cli_bin({ + 'handshake': 'ok', + 'responses': [{'data': {'content': 'x' * (200 * 1024)}}], + }) + transport = None + try: + transport = AsyncHostTransport( + cli1, + startup_timeout_ms=5_000, + stdout_buffer_limit_bytes=64 * 1024, + ) + await transport.connect() + with pytest.raises(SuperDocError) as exc_info: + await transport.invoke(_TEST_OP, {'query': 'big'}) + assert exc_info.value.code == HOST_PROTOCOL_ERROR + cleanup_task = transport._cleanup_task + if cleanup_task is not None: + await cleanup_task + assert transport.state == 'DISCONNECTED' + assert transport._cleanup_task is None + finally: + _cleanup_wrapper(cli1) + + cli2 = _mock_cli_bin({ + 'handshake': 'ok', + 'responses': [{'data': {'v': 'reconnected'}}], + }) + try: + # Reuse the transport — point at a healthy host with default buffer. + transport._cli_bin = cli2 + transport._stdout_buffer_limit_bytes = DEFAULT_STDOUT_BUFFER_LIMIT_BYTES + result = await transport.invoke(_TEST_OP, {'query': 'again'}) + assert result == {'v': 'reconnected'} + assert transport.state == 'CONNECTED' + await transport.dispose() + finally: + _cleanup_wrapper(cli2)