Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/sdk/langs/python/superdoc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@
get_tool_catalog,
list_tools,
)
from .transport import DEFAULT_STDOUT_BUFFER_LIMIT_BYTES

__all__ = [
"SuperDocClient",
"AsyncSuperDocClient",
"SuperDocDocument",
"AsyncSuperDocDocument",
"SuperDocError",
"DEFAULT_STDOUT_BUFFER_LIMIT_BYTES",
"get_skill",
"install_skill",
"list_skills",
Expand Down
5 changes: 5 additions & 0 deletions packages/sdk/langs/python/superdoc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
DocOpenResult as GeneratedDocOpenResult,
)
from .runtime import SuperDocAsyncRuntime, SuperDocSyncRuntime
from .transport import DEFAULT_STDOUT_BUFFER_LIMIT_BYTES

UserIdentity = Dict[str, str]

Expand Down Expand Up @@ -340,6 +341,9 @@ def __init__(
request_timeout_ms: int | None = None,
watchdog_timeout_ms: int = 30_000,
max_queue_depth: int = 100,
# Increase this limit if a single host response can exceed it (e.g. when
# reading very large documents); otherwise the default is safe.
stdout_buffer_limit_bytes: int = DEFAULT_STDOUT_BUFFER_LIMIT_BYTES,
default_change_mode: Literal['direct', 'tracked'] | None = None,
user: UserIdentity | None = None,
) -> None:
Expand All @@ -350,6 +354,7 @@ def __init__(
request_timeout_ms=request_timeout_ms,
watchdog_timeout_ms=watchdog_timeout_ms,
max_queue_depth=max_queue_depth,
stdout_buffer_limit_bytes=stdout_buffer_limit_bytes,
default_change_mode=default_change_mode,
user=user,
)
Expand Down
8 changes: 7 additions & 1 deletion packages/sdk/langs/python/superdoc/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@
from .embedded_cli import resolve_embedded_cli_path
from .generated.contract import OPERATION_INDEX
from .protocol import normalize_default_change_mode
from .transport import AsyncHostTransport, SyncHostTransport
from .transport import (
DEFAULT_STDOUT_BUFFER_LIMIT_BYTES,
AsyncHostTransport,
SyncHostTransport,
)


class SuperDocSyncRuntime:
Expand Down Expand Up @@ -79,6 +83,7 @@ def __init__(
request_timeout_ms: Optional[int] = None,
watchdog_timeout_ms: int = 30_000,
max_queue_depth: int = 100,
stdout_buffer_limit_bytes: int = DEFAULT_STDOUT_BUFFER_LIMIT_BYTES,
default_change_mode: Optional[str] = None,
user: Optional[Dict[str, str]] = None,
) -> None:
Expand All @@ -93,6 +98,7 @@ def __init__(
request_timeout_ms=request_timeout_ms,
watchdog_timeout_ms=watchdog_timeout_ms,
max_queue_depth=max_queue_depth,
stdout_buffer_limit_bytes=stdout_buffer_limit_bytes,
default_change_mode=self._default_change_mode,
user=user,
)
Expand Down
214 changes: 179 additions & 35 deletions packages/sdk/langs/python/superdoc/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@

logger = logging.getLogger('superdoc.transport')

# Default stdout StreamReader buffer for the async transport. Host responses
# are single newline-delimited JSON lines, so this caps the largest individual
# response a caller can receive. Raise it if your workload routinely produces
# responses above this size (e.g. whole-document reads on very large docs).
DEFAULT_STDOUT_BUFFER_LIMIT_BYTES = 64 * 1024 * 1024

# Opt-in debug logging via SUPERDOC_DEBUG=1 or SUPERDOC_LOG_LEVEL=debug.
# Only configures the named logger — never mutates root logging config.
_log_level = os.environ.get('SUPERDOC_LOG_LEVEL', '').lower()
Expand Down Expand Up @@ -399,6 +405,7 @@ def __init__(
request_timeout_ms: Optional[int] = None,
watchdog_timeout_ms: int = 30_000,
max_queue_depth: int = 100,
stdout_buffer_limit_bytes: int = DEFAULT_STDOUT_BUFFER_LIMIT_BYTES,
default_change_mode: Optional[ChangeMode] = None,
user: Optional[Dict[str, str]] = None,
) -> None:
Expand All @@ -409,11 +416,13 @@ def __init__(
self._request_timeout_ms = request_timeout_ms
self._watchdog_timeout_ms = watchdog_timeout_ms
self._max_queue_depth = max_queue_depth
self._stdout_buffer_limit_bytes = stdout_buffer_limit_bytes
self._default_change_mode = default_change_mode
self._user = user

self._process: Optional[asyncio.subprocess.Process] = None
self._reader_task: Optional[asyncio.Task] = None
self._cleanup_task: Optional[asyncio.Task] = None
self._pending: Dict[int, asyncio.Future] = {}
self._state = _State.DISCONNECTED
self._next_request_id = 1
Expand All @@ -428,7 +437,22 @@ async def connect(self) -> None:

async def dispose(self) -> None:
"""Gracefully shut down the host process."""
if self._state == _State.DISCONNECTED or self._state == _State.DISPOSING:
if self._state == _State.DISCONNECTED:
return
if self._state == _State.DISPOSING:
# A reader-triggered cleanup is in flight (or an earlier teardown
# left state in DISPOSING briefly). Wait for it so the caller
# observes "host fully torn down" by the time dispose() returns.
# shield() so a cancelled dispose() doesn't interrupt _cleanup
# mid-flight and leak the host process.
existing = self._cleanup_task
if existing and not existing.done():
try:
await asyncio.shield(existing)
except asyncio.CancelledError:
raise
except Exception:
pass
return

self._stopping = True
Expand Down Expand Up @@ -507,6 +531,20 @@ async def invoke(

async def _ensure_connected(self) -> None:
"""Lazy connect: spawn and handshake if not already connected."""
# Drain any in-flight teardown before spawning a new host. Without
# this, a concurrent reader-triggered cleanup would still be running
# when _start_host reassigns self._process / self._reader_task; the
# cleanup task would then cancel the fresh reader and kill the fresh
# process. shield() so we don't cancel the cleanup if our caller is.
cleanup = self._cleanup_task
if cleanup and not cleanup.done():
try:
await asyncio.shield(cleanup)
except asyncio.CancelledError:
raise
except Exception:
pass

if self._state == _State.CONNECTED and self._process and self._process.returncode is None:
return

Expand All @@ -531,12 +569,15 @@ async def _start_host(self) -> None:
args = [*prefix_args, 'host', '--stdio']

try:
# ``limit`` raises asyncio's StreamReader buffer above its 64 KiB
# default; host responses are single JSON lines and can exceed it.
self._process = await asyncio.create_subprocess_exec(
command, *args,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.DEVNULL,
env={**os.environ, **self._env},
limit=self._stdout_buffer_limit_bytes,
Comment thread
caio-pizzol marked this conversation as resolved.
)
logger.debug('Host spawned (pid=%s, bin=%s).', self._process.pid, self._cli_bin)
except Exception as exc:
Expand Down Expand Up @@ -582,7 +623,29 @@ async def _reader_loop(self) -> None:

try:
while True:
raw = await process.stdout.readline()
try:
raw = await process.stdout.readline()
except ValueError as exc:
# asyncio.StreamReader.readline() re-raises LimitOverrunError
# from readuntil() as ValueError when a single line exceeds
# `limit` (see CPython asyncio/streams.py). The host is still
# alive — schedule cleanup so a later dispose() doesn't
# short-circuit on DISCONNECTED state. Scoped to readline()
# only so unrelated ValueErrors from dispatch aren't
# reclassified as a buffer-limit error. _schedule_cleanup
# is a no-op when _stopping is set (graceful dispose path).
logger.debug('Reader loop buffer overflow: %s', exc)
self._schedule_cleanup(SuperDocError(
'Host response exceeded stdout buffer limit. '
'Raise stdout_buffer_limit_bytes to accommodate larger responses.',
code=HOST_PROTOCOL_ERROR,
details={
'message': str(exc),
'stdout_buffer_limit_bytes': self._stdout_buffer_limit_bytes,
},
))
return

if not raw:
# EOF — process died.
break
Expand Down Expand Up @@ -614,16 +677,16 @@ async def _reader_loop(self) -> None:
except Exception as exc:
logger.debug('Reader loop error: %s', exc)

# Reader exited (EOF or error) — reject all pending futures.
if not self._stopping:
exit_code = process.returncode
error = SuperDocError(
'Host process disconnected.',
code=HOST_DISCONNECTED,
details={'exit_code': exit_code, 'signal': None},
)
self._reject_all_pending(error)
self._state = _State.DISCONNECTED
# Reader exited (EOF or unexpected error) — tear down the process so
# no orphaned host is left running, then reject pending futures.
# _schedule_cleanup is a no-op when _stopping is set (graceful
# dispose path) so we don't race the dispose teardown.
exit_code = process.returncode
self._schedule_cleanup(SuperDocError(
'Host process disconnected.',
code=HOST_DISCONNECTED,
details={'exit_code': exit_code, 'signal': None},
))

async def _send_request(self, method: str, params: Any, watchdog_ms: int) -> Any:
"""Send a JSON-RPC request and await the matching response future."""
Expand Down Expand Up @@ -687,39 +750,120 @@ def _reject_all_pending(self, error: SuperDocError) -> None:
future.set_exception(error)

async def _kill_and_reset(self) -> None:
"""Kill the host process and reset to DISCONNECTED."""
await self._cleanup(
SuperDocError('Host process disconnected.', code=HOST_DISCONNECTED),
)

async def _cleanup(self, error: Optional[SuperDocError]) -> None:
"""Cancel reader, kill process, reject pending, reset state."""
if self._reader_task and not self._reader_task.done():
self._reader_task.cancel()
"""Kill the host process and reset to DISCONNECTED.

Coordinates with `_schedule_cleanup` so callers (e.g. `_send_request`
on watchdog timeout or stdin write failure) don't run a parallel
`_cleanup` that races a reader-triggered cleanup on
`_reject_all_pending` and `process.kill`. If a cleanup is already in
flight, await it; otherwise own a fresh task in the same slot so a
later concurrent caller sees us instead of starting its own.

shield() the await so caller cancellation (e.g. an `invoke()` task
that times out and is then cancelled by the user) does NOT propagate
into `_cleanup` — interrupting cleanup mid-flight would leak the
subprocess and wedge state in DISPOSING.
"""
existing = self._cleanup_task
if existing and not existing.done():
try:
await self._reader_task
except (asyncio.CancelledError, Exception):
await asyncio.shield(existing)
except asyncio.CancelledError:
raise
except Exception:
pass
self._reader_task = None
return
self._state = _State.DISPOSING
task = asyncio.create_task(self._cleanup(
SuperDocError('Host process disconnected.', code=HOST_DISCONNECTED),
))
self._cleanup_task = task
try:
await asyncio.shield(task)
except asyncio.CancelledError:
raise
except Exception:
pass

def _schedule_cleanup(self, error: SuperDocError) -> None:
    """Start a background teardown on behalf of the reader task.

    `_cleanup` cancels and then awaits `self._reader_task`, so the reader
    cannot await `_cleanup` directly without waiting on itself. Instead the
    teardown is handed to a new task, which this method does not await.

    Behaviour:

    * Does nothing while `_stopping` is set, so it never runs alongside a
      graceful `dispose()` that is already tearing the transport down.
    * Does nothing if a previously scheduled cleanup task has not finished;
      the first scheduled error wins and later ones are dropped.
    * Otherwise sets the state to DISPOSING before returning, so callers
      checking the state see the transport as no longer connected right
      away, and stores the new task in `self._cleanup_task` so other code
      can wait for the teardown to complete.

    Args:
        error: Passed through to `_cleanup`, which uses it to reject the
            requests that are still pending.
    """
    # A graceful shutdown owns the teardown; stay out of its way.
    if self._stopping:
        return

    # Only one cleanup may be in flight at a time.
    in_flight = self._cleanup_task
    if in_flight and not in_flight.done():
        return

    # Flip the state synchronously, before the task gets a chance to run.
    self._state = _State.DISPOSING
    teardown = asyncio.create_task(self._cleanup(error))
    self._cleanup_task = teardown

async def _cleanup(self, error: Optional[SuperDocError]) -> None:
"""Cancel reader, kill process, reject pending, reset state.

Capture handles and flip user-visible state SYNCHRONOUSLY at the top
before any awaits. That way, even if cancellation arrives during
`process.wait()`, observers see a consistent "torn down" transport
(state DISCONNECTED, _process None, pending futures rejected) rather
than a half-disposed one. The async work below is best-effort
process reaping.
"""
# Snapshot and clear before any await so concurrent callers see a
# fully torn-down transport from this point on.
reader_task = self._reader_task
process = self._process
if process:
try:
process.kill()
except Exception:
pass
try:
await asyncio.wait_for(process.wait(), timeout=2)
except (asyncio.TimeoutError, Exception):
pass
self._reader_task = None
self._process = None
self._state = _State.DISCONNECTED

if error:
if error is not None:
self._reject_all_pending(error)
else:
# Dispose path — reject remaining with generic disconnect.
self._reject_all_pending(
SuperDocError('Host process was disposed.', code=HOST_DISCONNECTED),
)

self._state = _State.DISCONNECTED
try:
if reader_task and not reader_task.done():
reader_task.cancel()
try:
await reader_task
except (asyncio.CancelledError, Exception):
pass

if process:
try:
process.kill()
except Exception:
pass
try:
await asyncio.wait_for(process.wait(), timeout=2)
except (asyncio.TimeoutError, asyncio.CancelledError, Exception):
pass
finally:
# Release the task handle if we are the in-flight cleanup task,
# so introspection doesn't surface a stale done handle and the
# next teardown gets a fresh slot. Skip when called inline (e.g.
# from dispose) — that current task is not our cleanup task.
try:
current = asyncio.current_task()
except RuntimeError:
current = None
if current is not None and self._cleanup_task is current:
self._cleanup_task = None
Loading
Loading