diff --git a/packages/sdk/langs/python/superdoc/__init__.py b/packages/sdk/langs/python/superdoc/__init__.py
index 9682645771..fa0b628a52 100644
--- a/packages/sdk/langs/python/superdoc/__init__.py
+++ b/packages/sdk/langs/python/superdoc/__init__.py
@@ -10,6 +10,7 @@
     get_tool_catalog,
     list_tools,
 )
+from .transport import DEFAULT_STDOUT_BUFFER_LIMIT_BYTES
 
 __all__ = [
     "SuperDocClient",
@@ -17,6 +18,7 @@
     "SuperDocDocument",
     "AsyncSuperDocDocument",
     "SuperDocError",
+    "DEFAULT_STDOUT_BUFFER_LIMIT_BYTES",
     "get_skill",
     "install_skill",
     "list_skills",
diff --git a/packages/sdk/langs/python/superdoc/client.py b/packages/sdk/langs/python/superdoc/client.py
index f187895b04..9291399518 100644
--- a/packages/sdk/langs/python/superdoc/client.py
+++ b/packages/sdk/langs/python/superdoc/client.py
@@ -22,6 +22,7 @@
     DocOpenResult as GeneratedDocOpenResult,
 )
 from .runtime import SuperDocAsyncRuntime, SuperDocSyncRuntime
+from .transport import DEFAULT_STDOUT_BUFFER_LIMIT_BYTES
 
 UserIdentity = Dict[str, str]
 
@@ -340,6 +341,9 @@ def __init__(
         request_timeout_ms: int | None = None,
         watchdog_timeout_ms: int = 30_000,
         max_queue_depth: int = 100,
+        # Raise this limit if a single host response can exceed it (e.g. when
+        # reading very large documents); otherwise the default is safe.
+        stdout_buffer_limit_bytes: int = DEFAULT_STDOUT_BUFFER_LIMIT_BYTES,
         default_change_mode: Literal['direct', 'tracked'] | None = None,
         user: UserIdentity | None = None,
     ) -> None:
@@ -350,6 +354,7 @@
             request_timeout_ms=request_timeout_ms,
             watchdog_timeout_ms=watchdog_timeout_ms,
             max_queue_depth=max_queue_depth,
+            stdout_buffer_limit_bytes=stdout_buffer_limit_bytes,
             default_change_mode=default_change_mode,
             user=user,
         )
diff --git a/packages/sdk/langs/python/superdoc/runtime.py b/packages/sdk/langs/python/superdoc/runtime.py
index e303754502..25dc3727e0 100644
--- a/packages/sdk/langs/python/superdoc/runtime.py
+++ b/packages/sdk/langs/python/superdoc/runtime.py
@@ -14,7 +14,11 @@
 from .embedded_cli import resolve_embedded_cli_path
 from .generated.contract import OPERATION_INDEX
 from .protocol import normalize_default_change_mode
-from .transport import AsyncHostTransport, SyncHostTransport
+from .transport import (
+    DEFAULT_STDOUT_BUFFER_LIMIT_BYTES,
+    AsyncHostTransport,
+    SyncHostTransport,
+)
 
 
 class SuperDocSyncRuntime:
@@ -79,6 +83,7 @@ def __init__(
         request_timeout_ms: Optional[int] = None,
         watchdog_timeout_ms: int = 30_000,
         max_queue_depth: int = 100,
+        stdout_buffer_limit_bytes: int = DEFAULT_STDOUT_BUFFER_LIMIT_BYTES,
         default_change_mode: Optional[str] = None,
         user: Optional[Dict[str, str]] = None,
     ) -> None:
@@ -93,6 +98,7 @@
             request_timeout_ms=request_timeout_ms,
             watchdog_timeout_ms=watchdog_timeout_ms,
             max_queue_depth=max_queue_depth,
+            stdout_buffer_limit_bytes=stdout_buffer_limit_bytes,
             default_change_mode=self._default_change_mode,
             user=user,
         )
diff --git a/packages/sdk/langs/python/superdoc/transport.py b/packages/sdk/langs/python/superdoc/transport.py
index fec79e9c8e..d7c4a9ca0e 100644
--- a/packages/sdk/langs/python/superdoc/transport.py
+++ b/packages/sdk/langs/python/superdoc/transport.py
@@ -43,6 +43,12 @@
 
 logger = logging.getLogger('superdoc.transport')
 
+# Default stdout StreamReader buffer for the async transport. Host responses
+# are single newline-delimited JSON lines, so this caps the largest individual
+# response a caller can receive. Raise it if your workload routinely produces
+# responses above this size (e.g. whole-document reads on very large docs).
+DEFAULT_STDOUT_BUFFER_LIMIT_BYTES = 64 * 1024 * 1024
+
 # Opt-in debug logging via SUPERDOC_DEBUG=1 or SUPERDOC_LOG_LEVEL=debug.
 # Only configures the named logger — never mutates root logging config.
 _log_level = os.environ.get('SUPERDOC_LOG_LEVEL', '').lower()
@@ -399,6 +405,7 @@ def __init__(
         request_timeout_ms: Optional[int] = None,
         watchdog_timeout_ms: int = 30_000,
         max_queue_depth: int = 100,
+        stdout_buffer_limit_bytes: int = DEFAULT_STDOUT_BUFFER_LIMIT_BYTES,
         default_change_mode: Optional[ChangeMode] = None,
         user: Optional[Dict[str, str]] = None,
     ) -> None:
@@ -409,11 +416,13 @@ def __init__(
         self._request_timeout_ms = request_timeout_ms
         self._watchdog_timeout_ms = watchdog_timeout_ms
         self._max_queue_depth = max_queue_depth
+        self._stdout_buffer_limit_bytes = stdout_buffer_limit_bytes
         self._default_change_mode = default_change_mode
         self._user = user
 
         self._process: Optional[asyncio.subprocess.Process] = None
         self._reader_task: Optional[asyncio.Task] = None
+        self._cleanup_task: Optional[asyncio.Task] = None
         self._pending: Dict[int, asyncio.Future] = {}
         self._state = _State.DISCONNECTED
         self._next_request_id = 1
@@ -428,7 +437,22 @@ async def connect(self) -> None:
 
     async def dispose(self) -> None:
         """Gracefully shut down the host process."""
-        if self._state == _State.DISCONNECTED or self._state == _State.DISPOSING:
+        if self._state == _State.DISCONNECTED:
+            return
+        if self._state == _State.DISPOSING:
+            # A reader-triggered cleanup is in flight (or an earlier teardown
+            # left state in DISPOSING briefly). Wait for it so the caller
+            # observes "host fully torn down" by the time dispose() returns.
+            # shield() so a cancelled dispose() doesn't interrupt _cleanup
+            # mid-flight and leak the host process.
+            existing = self._cleanup_task
+            if existing and not existing.done():
+                try:
+                    await asyncio.shield(existing)
+                except asyncio.CancelledError:
+                    raise
+                except Exception:
+                    pass
             return
 
         self._stopping = True
@@ -507,6 +531,20 @@ async def invoke(
 
     async def _ensure_connected(self) -> None:
         """Lazy connect: spawn and handshake if not already connected."""
+        # Drain any in-flight teardown before spawning a new host. Without
+        # this, a concurrent reader-triggered cleanup would still be running
+        # when _start_host reassigns self._process / self._reader_task; the
+        # cleanup task would then cancel the fresh reader and kill the fresh
+        # process. shield() so the cleanup survives if our caller is cancelled.
+        cleanup = self._cleanup_task
+        if cleanup and not cleanup.done():
+            try:
+                await asyncio.shield(cleanup)
+            except asyncio.CancelledError:
+                raise
+            except Exception:
+                pass
+
         if self._state == _State.CONNECTED and self._process and self._process.returncode is None:
             return
 
@@ -531,12 +569,15 @@ async def _start_host(self) -> None:
         args = [*prefix_args, 'host', '--stdio']
 
         try:
+            # ``limit`` raises asyncio's StreamReader buffer above its 64 KiB
+            # default; host responses are single JSON lines and can exceed it.
             self._process = await asyncio.create_subprocess_exec(
                 command,
                 *args,
                 stdin=asyncio.subprocess.PIPE,
                 stdout=asyncio.subprocess.PIPE,
                 stderr=asyncio.subprocess.DEVNULL,
                 env={**os.environ, **self._env},
+                limit=self._stdout_buffer_limit_bytes,
             )
             logger.debug('Host spawned (pid=%s, bin=%s).', self._process.pid, self._cli_bin)
         except Exception as exc:
@@ -582,7 +623,29 @@ async def _reader_loop(self) -> None:
 
         try:
             while True:
-                raw = await process.stdout.readline()
+                try:
+                    raw = await process.stdout.readline()
+                except ValueError as exc:
+                    # asyncio.StreamReader.readline() re-raises LimitOverrunError
+                    # from readuntil() as ValueError when a single line exceeds
+                    # `limit` (see CPython asyncio/streams.py). The host is still
+                    # alive — schedule cleanup so a later dispose() doesn't
+                    # short-circuit on DISCONNECTED state. Scoped to readline()
+                    # only so unrelated ValueErrors from dispatch aren't
+                    # reclassified as a buffer-limit error. _schedule_cleanup
+                    # is a no-op when _stopping is set (graceful dispose path).
+                    logger.debug('Reader loop buffer overflow: %s', exc)
+                    self._schedule_cleanup(SuperDocError(
+                        'Host response exceeded stdout buffer limit. '
+                        'Raise stdout_buffer_limit_bytes to accommodate larger responses.',
+                        code=HOST_PROTOCOL_ERROR,
+                        details={
+                            'message': str(exc),
+                            'stdout_buffer_limit_bytes': self._stdout_buffer_limit_bytes,
+                        },
+                    ))
+                    return
+
                 if not raw:
                     # EOF — process died.
                     break
@@ -614,16 +677,16 @@
         except Exception as exc:
             logger.debug('Reader loop error: %s', exc)
 
-        # Reader exited (EOF or error) — reject all pending futures.
-        if not self._stopping:
-            exit_code = process.returncode
-            error = SuperDocError(
-                'Host process disconnected.',
-                code=HOST_DISCONNECTED,
-                details={'exit_code': exit_code, 'signal': None},
-            )
-            self._reject_all_pending(error)
-            self._state = _State.DISCONNECTED
+        # Reader exited (EOF or unexpected error) — tear down the process so
+        # no orphaned host is left running, then reject pending futures.
+        # _schedule_cleanup is a no-op when _stopping is set (graceful
+        # dispose path) so we don't race the dispose teardown.
+        exit_code = process.returncode
+        self._schedule_cleanup(SuperDocError(
+            'Host process disconnected.',
+            code=HOST_DISCONNECTED,
+            details={'exit_code': exit_code, 'signal': None},
+        ))
 
     async def _send_request(self, method: str, params: Any, watchdog_ms: int) -> Any:
         """Send a JSON-RPC request and await the matching response future."""
@@ -687,34 +750,88 @@ def _reject_all_pending(self, error: SuperDocError) -> None:
             future.set_exception(error)
 
     async def _kill_and_reset(self) -> None:
-        """Kill the host process and reset to DISCONNECTED."""
-        await self._cleanup(
-            SuperDocError('Host process disconnected.', code=HOST_DISCONNECTED),
-        )
-
-    async def _cleanup(self, error: Optional[SuperDocError]) -> None:
-        """Cancel reader, kill process, reject pending, reset state."""
-        if self._reader_task and not self._reader_task.done():
-            self._reader_task.cancel()
+        """Kill the host process and reset to DISCONNECTED.
+
+        Coordinates with `_schedule_cleanup` so callers (e.g. `_send_request`
+        on watchdog timeout or stdin write failure) don't run a parallel
+        `_cleanup` that races a reader-triggered cleanup on
+        `_reject_all_pending` and `process.kill`. If a cleanup is already in
+        flight, await it; otherwise own a fresh task in the same slot so a
+        later concurrent caller sees us instead of starting its own.
+
+        shield() the await so caller cancellation (e.g. an `invoke()` task
+        that times out and is then cancelled by the user) does NOT propagate
+        into `_cleanup` — interrupting cleanup mid-flight would leak the
+        subprocess and wedge state in DISPOSING.
+        """
+        existing = self._cleanup_task
+        if existing and not existing.done():
             try:
-                await self._reader_task
-            except (asyncio.CancelledError, Exception):
+                await asyncio.shield(existing)
+            except asyncio.CancelledError:
+                raise
+            except Exception:
                 pass
-        self._reader_task = None
+            return
+        self._state = _State.DISPOSING
+        task = asyncio.create_task(self._cleanup(
+            SuperDocError('Host process disconnected.', code=HOST_DISCONNECTED),
+        ))
+        self._cleanup_task = task
+        try:
+            await asyncio.shield(task)
+        except asyncio.CancelledError:
+            raise
+        except Exception:
+            pass
 
+    def _schedule_cleanup(self, error: SuperDocError) -> None:
+        """Fire-and-forget teardown from inside the reader task.
+
+        Why a separate task: `_cleanup` cancels and awaits `self._reader_task`.
+        Awaiting it from inside the reader itself would deadlock — so we punt
+        to a fresh task, and by the time it runs the reader has already
+        returned (so cancel+await is a no-op).
+
+        Synchronously flips state to DISPOSING so concurrent `invoke()` callers
+        observe the failed transport immediately rather than taking the
+        CONNECTED fast path and blocking on a future the dead reader can never
+        resolve until `watchdog_timeout_ms`.
+
+        Skips when `_stopping` is set: a graceful `dispose()` is already
+        tearing down, and a parallel cleanup task would race on
+        `_reject_all_pending` and `process.kill`.
+
+        Idempotent: if a cleanup is already in flight, subsequent errors are
+        dropped — the first one wins. Callers may observe completion via
+        `self._cleanup_task`.
+        """
+        if self._stopping:
+            return
+        if self._cleanup_task and not self._cleanup_task.done():
+            return
+        self._state = _State.DISPOSING
+        self._cleanup_task = asyncio.create_task(self._cleanup(error))
+
+    async def _cleanup(self, error: Optional[SuperDocError]) -> None:
+        """Cancel reader, kill process, reject pending, reset state.
+
+        Capture handles and flip user-visible state SYNCHRONOUSLY at the top
+        before any awaits. That way, even if cancellation arrives during
+        `process.wait()`, observers see a consistent "torn down" transport
+        (state DISCONNECTED, _process None, pending futures rejected) rather
+        than a half-disposed one. The async work below is best-effort
+        process reaping.
+        """
+        # Snapshot and clear before any await so concurrent callers see a
+        # fully torn-down transport from this point on.
+        reader_task = self._reader_task
         process = self._process
-        if process:
-            try:
-                process.kill()
-            except Exception:
-                pass
-            try:
-                await asyncio.wait_for(process.wait(), timeout=2)
-            except (asyncio.TimeoutError, Exception):
-                pass
+        self._reader_task = None
         self._process = None
+        self._state = _State.DISCONNECTED
 
-        if error:
+        if error is not None:
             self._reject_all_pending(error)
         else:
             # Dispose path — reject remaining with generic disconnect.
@@ -722,4 +839,31 @@ async def _cleanup(self, error: Optional[SuperDocError]) -> None:
                 SuperDocError('Host process was disposed.', code=HOST_DISCONNECTED),
             )
 
-        self._state = _State.DISCONNECTED
+        try:
+            if reader_task and not reader_task.done():
+                reader_task.cancel()
+                try:
+                    await reader_task
+                except (asyncio.CancelledError, Exception):
+                    pass
+
+            if process:
+                try:
+                    process.kill()
+                except Exception:
+                    pass
+                try:
+                    await asyncio.wait_for(process.wait(), timeout=2)
+                except (asyncio.TimeoutError, asyncio.CancelledError, Exception):
+                    pass
+        finally:
+            # Release the task handle if we are the in-flight cleanup task,
+            # so introspection doesn't surface a stale done handle and the
+            # next teardown gets a fresh slot. Skip when called inline (e.g.
+            # from dispose) — the current task there is not our cleanup task.
+            try:
+                current = asyncio.current_task()
+            except RuntimeError:
+                current = None
+            if current is not None and self._cleanup_task is current:
+                self._cleanup_task = None
diff --git a/packages/sdk/langs/python/tests/test_transport.py b/packages/sdk/langs/python/tests/test_transport.py
index d71bafa186..f54c0c7a71 100644
--- a/packages/sdk/langs/python/tests/test_transport.py
+++ b/packages/sdk/langs/python/tests/test_transport.py
@@ -24,7 +24,12 @@
+    HOST_PROTOCOL_ERROR,
     HOST_TIMEOUT,
     SuperDocError,
 )
-from superdoc.transport import AsyncHostTransport, SyncHostTransport
+from superdoc.transport import (
+    DEFAULT_STDOUT_BUFFER_LIMIT_BYTES,
+    AsyncHostTransport,
+    SyncHostTransport,
+)
 
 MOCK_HOST = os.path.join(os.path.dirname(__file__), 'mock_host.py')
 
@@ -447,9 +452,24 @@ async def test_host_crash(self):
         try:
             transport = AsyncHostTransport(cli, startup_timeout_ms=5_000)
             await transport.connect()
+            process = transport._process
+            assert process is not None
             with pytest.raises(SuperDocError) as exc_info:
                 await transport.invoke(_TEST_OP, {'query': 'test'})
-            assert exc_info.value.code in (HOST_DISCONNECTED, HOST_TIMEOUT)
+            # The reader-loop EOF branch now goes through _schedule_cleanup,
+            # which rejects the pending future synchronously enough that the
+            # invoke() never has to fall back to the watchdog timeout.
+            assert exc_info.value.code == HOST_DISCONNECTED
+
+            # Cleanup must tear the process down — pre-fix, the inline
+            # _reject_all_pending + state flip left the process orphaned.
+            cleanup_task = transport._cleanup_task
+            if cleanup_task is not None:
+                await cleanup_task
+            assert transport._process is None
+            assert transport.state == 'DISCONNECTED'
+            await process.wait()
+            assert process.returncode is not None
         finally:
             _cleanup_wrapper(cli)
 
@@ -493,3 +513,430 @@ async def test_reuse_after_dispose(self):
             await transport.dispose()
         finally:
             _cleanup_wrapper(cli2)
+
+
+class TestAsyncLargeResponse:
+    """Responses larger than the StreamReader buffer must not crash the reader."""
+
+    @pytest.mark.asyncio
+    async def test_response_above_asyncio_default_streamreader_limit(self):
+        big_payload = 'x' * (200 * 1024)
+        cli = _mock_cli_bin({
+            'handshake': 'ok',
+            'responses': [{'data': {'content': big_payload}}],
+        })
+        try:
+            transport = AsyncHostTransport(cli, startup_timeout_ms=5_000)
+            await transport.connect()
+            result = await transport.invoke(_TEST_OP, {'query': 'big'})
+            assert result == {'content': big_payload}
+            assert transport.state == 'CONNECTED'
+            await transport.dispose()
+        finally:
+            _cleanup_wrapper(cli)
+
+    @pytest.mark.asyncio
+    async def test_response_above_custom_buffer_limit_raises_protocol_error(self):
+        # Setting stdout_buffer_limit_bytes below the response size should
+        # surface HOST_PROTOCOL_ERROR (actionable) rather than
+        # HOST_DISCONNECTED (misleading — the host is still alive), and the
+        # error should carry a hint to raise the buffer limit.
+        big_payload = 'x' * (200 * 1024)
+        cli = _mock_cli_bin({
+            'handshake': 'ok',
+            'responses': [{'data': {'content': big_payload}}],
+        })
+        try:
+            transport = AsyncHostTransport(
+                cli,
+                startup_timeout_ms=5_000,
+                stdout_buffer_limit_bytes=64 * 1024,
+            )
+            await transport.connect()
+            process = transport._process
+            assert process is not None
+            with pytest.raises(SuperDocError) as exc_info:
+                await transport.invoke(_TEST_OP, {'query': 'big'})
+            assert exc_info.value.code == HOST_PROTOCOL_ERROR
+            assert 'stdout_buffer_limit_bytes' in str(exc_info.value)
+
+            # The host process must be torn down — not just the transport
+            # state flipped to DISCONNECTED. Otherwise dispose() short-circuits
+            # and leaves an orphaned host running.
+            cleanup_task = transport._cleanup_task
+            if cleanup_task is not None:
+                await cleanup_task
+            assert transport._process is None
+            assert transport.state == 'DISCONNECTED'
+            # The captured handle should be reaped by _cleanup; await wait()
+            # rather than reading returncode to avoid a CI-timing flake if the
+            # 2 s wait inside _cleanup didn't finish reaping in time.
+            await process.wait()
+            assert process.returncode is not None
+
+            # dispose() after an overflow must be a safe no-op: state and
+            # process stay as cleanup left them, no exception is raised, and
+            # a second dispose() is also safe.
+            await transport.dispose()
+            assert transport.state == 'DISCONNECTED'
+            assert transport._process is None
+            await transport.dispose()
+            assert transport.state == 'DISCONNECTED'
+        finally:
+            _cleanup_wrapper(cli)
+
+    @pytest.mark.asyncio
+    async def test_client_threads_stdout_buffer_limit_to_transport(self):
+        # End-to-end wiring check: the public AsyncSuperDocClient constructor
+        # must thread stdout_buffer_limit_bytes through SuperDocAsyncRuntime
+        # into AsyncHostTransport. Without this, a silent drop in client.py
+        # or runtime.py would leave the existing overflow test passing while
+        # the public API silently falls back to the transport-level default.
+        from superdoc.client import AsyncSuperDocClient
+
+        cli = _mock_cli_bin({'handshake': 'ok'})
+        try:
+            client = AsyncSuperDocClient(
+                env={'SUPERDOC_CLI_BIN': cli},
+                stdout_buffer_limit_bytes=64 * 1024,
+            )
+            transport = client._runtime._transport
+            assert transport._stdout_buffer_limit_bytes == 64 * 1024
+        finally:
+            _cleanup_wrapper(cli)
+
+
+class TestAsyncCleanupLifecycle:
+    """Lock down the cleanup-task slot so its load-bearing invariants don't
+    silently regress: the dedupe guard, the _stopping suppression branch,
+    the _kill_and_reset coordination with reader-triggered cleanup, and the
+    _ensure_connected drain that prevents stale cleanup from killing a
+    freshly-spawned host.
+    """
+
+    @pytest.mark.asyncio
+    async def test_schedule_cleanup_dedupe_guard_drops_reentrant_call(self):
+        # If a cleanup task is already in flight, a second _schedule_cleanup
+        # must NOT replace it — that would abandon the in-flight teardown
+        # and start a second, racing one that could leak the host process.
+        cli = _mock_cli_bin({'handshake': 'ok'})
+        try:
+            transport = AsyncHostTransport(cli, startup_timeout_ms=5_000)
+            await transport.connect()
+
+            slow = asyncio.create_task(asyncio.sleep(0.5))
+            transport._cleanup_task = slow
+
+            transport._schedule_cleanup(
+                SuperDocError('second', code=HOST_DISCONNECTED),
+            )
+            # Slot must still point at the original task — second call dropped.
+            assert transport._cleanup_task is slow
+
+            slow.cancel()
+            try:
+                await slow
+            except (asyncio.CancelledError, Exception):
+                pass
+            transport._cleanup_task = None
+            await transport.dispose()
+        finally:
+            _cleanup_wrapper(cli)
+
+    @pytest.mark.asyncio
+    async def test_schedule_cleanup_skipped_when_stopping(self):
+        # When `dispose()` is in progress, `_stopping` is set; the production
+        # guard inside `_schedule_cleanup` must short-circuit so a reader
+        # overflow doesn't race the graceful teardown. (Earlier iterations
+        # of this test were tautological because the test re-checked
+        # `_stopping` before calling `_schedule_cleanup`. This version calls
+        # it unconditionally and asserts the production guard fires.)
+        cli = _mock_cli_bin({'handshake': 'ok'})
+        try:
+            transport = AsyncHostTransport(cli, startup_timeout_ms=5_000)
+            await transport.connect()
+            transport._stopping = True
+            assert transport._cleanup_task is None
+
+            transport._schedule_cleanup(
+                SuperDocError('overflow', code=HOST_PROTOCOL_ERROR),
+            )
+            assert transport._cleanup_task is None
+            assert transport.state == 'CONNECTED'
+
+            transport._stopping = False
+            await transport.dispose()
+        finally:
+            _cleanup_wrapper(cli)
+
+    @pytest.mark.asyncio
+    async def test_kill_and_reset_awaits_in_flight_cleanup(self):
+        # If a reader-triggered cleanup is already running, _kill_and_reset
+        # must await it rather than spin up a parallel _cleanup that would
+        # race on _reject_all_pending and process.kill.
+        cli = _mock_cli_bin({'handshake': 'ok'})
+        try:
+            transport = AsyncHostTransport(cli, startup_timeout_ms=5_000)
+            await transport.connect()
+
+            # Replace _cleanup with a tracking stub so we can count entries
+            # and verify the second call observes the first task instead of
+            # creating a fresh one. Use Events for deterministic ordering
+            # rather than asyncio.sleep(0) (which is implementation-defined
+            # under uvloop / Python scheduling changes).
+            entry_count = 0
+            started = asyncio.Event()
+            release = asyncio.Event()
+            real_cleanup = transport._cleanup
+
+            async def tracking_cleanup(error):
+                nonlocal entry_count
+                entry_count += 1
+                started.set()
+                # First entry blocks until the test releases it; subsequent
+                # entries (if any) would race past — failure mode for the bug.
+                await release.wait()
+                await real_cleanup(error)
+
+            transport._cleanup = tracking_cleanup  # type: ignore[assignment]
+
+            transport._schedule_cleanup(
+                SuperDocError('reader-overflow', code=HOST_PROTOCOL_ERROR),
+            )
+            await asyncio.wait_for(started.wait(), timeout=2.0)
+            assert entry_count == 1
+            assert transport._cleanup_task is not None
+            assert not transport._cleanup_task.done()
+
+            kill_task = asyncio.create_task(transport._kill_and_reset())
+            # Give kill_task a chance to enter — but it must NOT start a
+            # second _cleanup (which would re-fire `started`).
+            await asyncio.sleep(0.05)
+            assert entry_count == 1
+            assert not kill_task.done()
+
+            release.set()
+            await kill_task
+            assert entry_count == 1
+            assert transport.state == 'DISCONNECTED'
+            assert transport._cleanup_task is None
+        finally:
+            _cleanup_wrapper(cli)
+
+    @pytest.mark.asyncio
+    async def test_dispose_waits_for_in_flight_cleanup(self):
+        # `dispose()` called while a reader-triggered cleanup is in flight
+        # must wait for it to finish, so the caller observes "fully torn
+        # down" by the time dispose returns.
+        cli = _mock_cli_bin({'handshake': 'ok'})
+        try:
+            transport = AsyncHostTransport(cli, startup_timeout_ms=5_000)
+            await transport.connect()
+
+            started = asyncio.Event()
+            release = asyncio.Event()
+            real_cleanup = transport._cleanup
+
+            async def slow_cleanup(error):
+                started.set()
+                await release.wait()
+                await real_cleanup(error)
+
+            transport._cleanup = slow_cleanup  # type: ignore[assignment]
+
+            transport._schedule_cleanup(
+                SuperDocError('reader-overflow', code=HOST_PROTOCOL_ERROR),
+            )
+            await asyncio.wait_for(started.wait(), timeout=2.0)
+            assert transport.state == 'DISPOSING'
+
+            dispose_task = asyncio.create_task(transport.dispose())
+            await asyncio.sleep(0.05)
+            # dispose must still be waiting on the cleanup task.
+            assert not dispose_task.done()
+
+            release.set()
+            await dispose_task
+            assert transport.state == 'DISCONNECTED'
+            assert transport._process is None
+            assert transport._cleanup_task is None
+        finally:
+            _cleanup_wrapper(cli)
+
+    @pytest.mark.asyncio
+    async def test_ensure_connected_drains_in_flight_cleanup_before_spawn(self):
+        # Regression guard: without this drain, `_start_host` reassigns
+        # `self._process` while a stale `_cleanup` task is still scheduled;
+        # the cleanup then kills the freshly-spawned process.
+        cli = _mock_cli_bin({'handshake': 'ok'})
+        try:
+            transport = AsyncHostTransport(cli, startup_timeout_ms=5_000)
+            await transport.connect()
+            old_process = transport._process
+            assert old_process is not None
+
+            started = asyncio.Event()
+            release = asyncio.Event()
+            real_cleanup = transport._cleanup
+
+            async def slow_cleanup(error):
+                started.set()
+                await release.wait()
+                await real_cleanup(error)
+
+            transport._cleanup = slow_cleanup  # type: ignore[assignment]
+
+            transport._schedule_cleanup(
+                SuperDocError('reader-overflow', code=HOST_PROTOCOL_ERROR),
+            )
+            await asyncio.wait_for(started.wait(), timeout=2.0)
+
+            connect_task = asyncio.create_task(transport.connect())
+            await asyncio.sleep(0.05)
+            # connect() must be blocked on the in-flight cleanup, not racing
+            # ahead to spawn a fresh process the cleanup would then kill.
+            assert not connect_task.done()
+
+            release.set()
+            await connect_task
+            new_process = transport._process
+            assert new_process is not None
+            assert new_process is not old_process
+            # The fresh process must NOT have been killed by the stale cleanup.
+            assert new_process.returncode is None
+            assert transport.state == 'CONNECTED'
+            await transport.dispose()
+        finally:
+            _cleanup_wrapper(cli)
+
+    @pytest.mark.asyncio
+    async def test_kill_and_reset_caller_cancellation_does_not_cancel_cleanup(self):
+        # Regression guard: without `asyncio.shield`, cancelling the
+        # awaiter of `_kill_and_reset` propagates into the cleanup task,
+        # interrupting it mid-flight before `_process` is fully reaped and
+        # leaving state wedged in DISPOSING.
+        cli = _mock_cli_bin({'handshake': 'ok'})
+        try:
+            transport = AsyncHostTransport(cli, startup_timeout_ms=5_000)
+            await transport.connect()
+
+            started = asyncio.Event()
+            release = asyncio.Event()
+            real_cleanup = transport._cleanup
+
+            async def slow_cleanup(error):
+                started.set()
+                try:
+                    await release.wait()
+                except asyncio.CancelledError:
+                    # If shield works, this should NOT fire. Re-raise so the
+                    # test's assertion catches the regression.
+                    raise
+                await real_cleanup(error)
+
+            transport._cleanup = slow_cleanup  # type: ignore[assignment]
+
+            kill_task = asyncio.create_task(transport._kill_and_reset())
+            await asyncio.wait_for(started.wait(), timeout=2.0)
+
+            kill_task.cancel()
+            with pytest.raises(asyncio.CancelledError):
+                await kill_task
+
+            # Cleanup must keep running despite kill_task being cancelled.
+            assert transport._cleanup_task is not None
+            assert not transport._cleanup_task.done()
+
+            release.set()
+            await transport._cleanup_task
+            assert transport.state == 'DISCONNECTED'
+            assert transport._process is None
+            assert transport._cleanup_task is None
+        finally:
+            _cleanup_wrapper(cli)
+
+
+class TestAsyncOverflowConcurrency:
+    """Concurrency scenarios for the buffer-overflow path."""
+
+    @pytest.mark.asyncio
+    async def test_overflow_rejects_all_pending_invokes(self):
+        # Every pending future must be rejected with HOST_PROTOCOL_ERROR —
+        # not just the one whose response overflowed. A regression where
+        # _reject_all_pending only rejects pending[msg.id] would silently
+        # leave concurrent callers hanging until the watchdog fires.
+        big_payload = 'x' * (200 * 1024)
+        cli = _mock_cli_bin({
+            'handshake': 'ok',
+            'responses': [
+                {'data': {'content': big_payload}},
+                {'data': {'v': 2}},
+                {'data': {'v': 3}},
+            ],
+        })
+        try:
+            transport = AsyncHostTransport(
+                cli,
+                startup_timeout_ms=5_000,
+                stdout_buffer_limit_bytes=64 * 1024,
+                watchdog_timeout_ms=10_000,
+            )
+            await transport.connect()
+            tasks = [
+                asyncio.ensure_future(transport.invoke(_TEST_OP, {'query': f'q{i}'}))
+                for i in range(3)
+            ]
+            results = await asyncio.gather(*tasks, return_exceptions=True)
+            assert all(isinstance(r, SuperDocError) for r in results), results
+            assert all(r.code == HOST_PROTOCOL_ERROR for r in results)
+            # Every error must carry the actionable hint, not just the first.
+            assert all('stdout_buffer_limit_bytes' in str(r) for r in results)
+            assert transport._pending == {}
+            assert transport.state == 'DISCONNECTED'
+            await transport.dispose()
+        finally:
+            _cleanup_wrapper(cli)
+
+    @pytest.mark.asyncio
+    async def test_reconnect_after_buffer_overflow(self):
+        # Sync transport has test_reconnect_after_failure; async previously
+        # only had reconnect-after-explicit-dispose. After reader-triggered
+        # cleanup the transport must be reusable for a fresh invoke without
+        # leaving _cleanup_task / _connecting / _process in a wedged state.
+        cli1 = _mock_cli_bin({
+            'handshake': 'ok',
+            'responses': [{'data': {'content': 'x' * (200 * 1024)}}],
+        })
+        transport = None
+        try:
+            transport = AsyncHostTransport(
+                cli1,
+                startup_timeout_ms=5_000,
+                stdout_buffer_limit_bytes=64 * 1024,
+            )
+            await transport.connect()
+            with pytest.raises(SuperDocError) as exc_info:
+                await transport.invoke(_TEST_OP, {'query': 'big'})
+            assert exc_info.value.code == HOST_PROTOCOL_ERROR
+            cleanup_task = transport._cleanup_task
+            if cleanup_task is not None:
+                await cleanup_task
+            assert transport.state == 'DISCONNECTED'
+            assert transport._cleanup_task is None
+        finally:
+            _cleanup_wrapper(cli1)
+
+        cli2 = _mock_cli_bin({
+            'handshake': 'ok',
+            'responses': [{'data': {'v': 'reconnected'}}],
+        })
+        try:
+            # Reuse the transport — point at a healthy host with default buffer.
+            transport._cli_bin = cli2
+            transport._stdout_buffer_limit_bytes = DEFAULT_STDOUT_BUFFER_LIMIT_BYTES
+            result = await transport.invoke(_TEST_OP, {'query': 'again'})
+            assert result == {'v': 'reconnected'}
+            assert transport.state == 'CONNECTED'
+            await transport.dispose()
+        finally:
+            _cleanup_wrapper(cli2)
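
A minimal usage sketch of the knob this diff adds, assuming only the surface shown above (the `AsyncSuperDocClient(stdout_buffer_limit_bytes=...)` constructor, `SuperDocError.code`, and `HOST_PROTOCOL_ERROR` on overflow). The 256 MiB value and the elided document work are illustrative, not defaults or APIs from this changeset:

    import asyncio

    from superdoc import SuperDocError
    from superdoc.client import AsyncSuperDocClient
    from superdoc.errors import HOST_PROTOCOL_ERROR

    async def main() -> None:
        # The shipped default is DEFAULT_STDOUT_BUFFER_LIMIT_BYTES (64 MiB).
        # Raise it when a single host response (one newline-delimited JSON
        # line) can be bigger, e.g. whole-document reads on very large files;
        # 256 MiB here is purely illustrative.
        client = AsyncSuperDocClient(stdout_buffer_limit_bytes=256 * 1024 * 1024)
        try:
            ...  # document operations elided; out of scope for this sketch
        except SuperDocError as exc:
            if exc.code == HOST_PROTOCOL_ERROR:
                # With this diff, an oversized response fails fast with a
                # message that names stdout_buffer_limit_bytes, instead of a
                # generic disconnect after the watchdog timeout.
                print('Response exceeded the stdout buffer limit:', exc)
            raise

    asyncio.run(main())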