diff --git a/CHANGELOG.md b/CHANGELOG.md index cbbce6b..8db74fb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,10 @@ Parity catch-up with upstream `4.26.0`. No upstream version change. - **`RedisStateAdapter(token_prefix=...)`**: new `token_prefix` kwarg (default `"redis"`). Parameterizes the lock-token prefix for observability and interop. +- **`StreamingPlan` / `StreamingPlanOptions`** (`chat_sdk.plan`): a + `PostableObject` wrapping an async iterable with platform-specific + streaming options (`group_tasks`, `end_with`, `update_interval_ms`). + Mirrors upstream `streaming-plan.ts`. Issue #56. ### Upstream parity @@ -57,12 +61,22 @@ Parity catch-up with upstream `4.26.0`. No upstream version change. `[thread]` factory tests from `chat.test.ts` (existing-behavior coverage for `Chat.thread(id)`). Closes 8 fidelity gaps. - Ported 19 `[post with Plan]` tests from `thread.test.ts` — closes #55. +- Ported 6 `[Streaming]` StreamingPlan option-variant tests from upstream + `thread.test.ts` — closes #56. -### Fixes (parity with upstream Plan semantics) +### Fixes - **`Plan.update_task(input)` / `StreamingPlan.update_task(input)` now honor `input.id`** — previously only worked on the last in-progress task; with `id` set, targets that specific task and returns `None` for unknown IDs. Matches upstream `UpdateTaskInput` semantics. - **`Plan.add_task()` / `update_task()` now propagate `adapter.edit_object` errors** — previously swallowed and logged; upstream returns the chained promise so callers see failures. - **Plan edit queue is now actually sequential under concurrency** — previously racy under `asyncio.gather`; rewrote `_enqueue_edit` to build the chain synchronously before awaiting, matching upstream TS's `.then`-based chain. Fixes out-of-order edits when multiple `add_task`/`update_task` calls interleave. +- **`StreamingPlan` options now wired through `Thread.post()`** — the Python + port was missing the `StreamingPlan` class entirely, so `group_tasks` / + `end_with` / `update_interval_ms` were silently dropped (a plain async + iterable was the only way to stream, and options went nowhere). Upstream + already had the `kind === "stream"` branch that maps + `groupTasks → taskDisplayMode`, `endWith → stopBlocks`, and + `updateIntervalMs → updateIntervalMs` onto `StreamOptions` before invoking + `adapter.stream(...)` or the fallback `post+edit` path. Issue #56. ### Test hygiene diff --git a/docs/UPSTREAM_SYNC.md b/docs/UPSTREAM_SYNC.md index 9ea2e47..4db0bbf 100644 --- a/docs/UPSTREAM_SYNC.md +++ b/docs/UPSTREAM_SYNC.md @@ -459,6 +459,7 @@ stay explicit instead of being rediscovered in code review. | `SlackAdapter.current_token` / `current_client` | Public `@property` accessors that return the request-context-bound token and a preconfigured `AsyncWebClient` | Not exposed (`getToken()` is private on the TS `SlackAdapter`) | Python-only addition (issue #47). Downstream code that calls Slack Web APIs from inside a handler — email resolution, user profile fetches, reaction bookkeeping — otherwise depends on underscore-prefixed helpers. | | `ConcurrencyConfig.max_concurrent` | Enforced via `asyncio.Semaphore` in the `"concurrent"` strategy path; rejects non-integer or `<= 0` values, and rejects any non-`None` `max_concurrent` paired with a non-`"concurrent"` strategy | Accepted into the config type with docstring "Default: Infinity" but never read (3 writes, 0 reads) | Silent correctness bug upstream — consumers setting `max_concurrent=N` with `strategy="concurrent"` reasonably expect an N-way bound on in-flight handlers. We honor the documented contract via a semaphore and fail-fast on misconfiguration so it's never silent. `max_concurrent=None` stays compatible with every strategy (unbounded default). | | Redis lock token format | `{token_prefix}_{ms}_{secrets.token_hex(16)}` — always 32 hex chars, CSPRNG-sourced | `ioredis_${Date.now()}_${Math.random().toString(36).substring(2, 15)}` — base36, ≤13 chars, **not** CSPRNG | Interop via `IoRedisStateAdapter(token_prefix="ioredis")` still works for lock-release (release/extend compare by full-string equality, and each runtime only releases what it issued), but the token byte-shape diverges. Intentional — CSPRNG should not be regressed to `Math.random()` for cosmetic byte-for-byte compatibility. | +| `StreamingPlan.is_supported()` / `get_fallback_text()` | Raise `RuntimeError` to fail loudly if a generic posting path (e.g. `ChannelImpl.post`, `post_postable_object`) tries to consume a `StreamingPlan` as a normal `PostableObject` | Silently return `True` / `""` — `ChannelImpl.post` would route through `postPostableObject` and post an empty-string fallback | Prevents `StreamingPlan` being silently routed through non-stream-aware posting paths where upstream would post a blank message or attempt a wrong-shape `adapter.post_object("stream", ...)` call. Internal dispatch is guarded by the `kind == "stream"` short-circuit in `post_postable_object` / `Thread.post`; this also protects third-party code that duck-types PostableObjects. | ### Platform-specific gaps diff --git a/src/chat_sdk/__init__.py b/src/chat_sdk/__init__.py index b3ca66e..0e42337 100644 --- a/src/chat_sdk/__init__.py +++ b/src/chat_sdk/__init__.py @@ -97,6 +97,8 @@ PlanTaskStatus, PostableObjectContext, StartPlanOptions, + StreamingPlan, + StreamingPlanOptions, UpdateTaskInput, is_postable_object, post_postable_object, @@ -202,6 +204,8 @@ "AddTaskOptions", "UpdateTaskInput", "CompletePlanOptions", + "StreamingPlan", + "StreamingPlanOptions", "PostableObjectContext", "is_postable_object", "post_postable_object", diff --git a/src/chat_sdk/plan.py b/src/chat_sdk/plan.py index 798f799..6a8f422 100644 --- a/src/chat_sdk/plan.py +++ b/src/chat_sdk/plan.py @@ -1,9 +1,11 @@ """Plan implementation for chat-sdk. -Python port of Vercel Chat SDK plan.ts and postable-object.ts. -Provides the Plan class (a PostableObject that manages a task list), -and the ``post_postable_object`` helper used by Thread/Channel to post -any PostableObject. +Python port of Vercel Chat SDK plan.ts, streaming-plan.ts, and +postable-object.ts. Provides the Plan class (a PostableObject that +manages a task list), the ``StreamingPlan`` PostableObject that wraps +an async iterable with platform-specific streaming options, and the +``post_postable_object`` helper used by Thread/Channel to post any +PostableObject. """ from __future__ import annotations @@ -11,6 +13,7 @@ import asyncio import contextlib import uuid +from collections.abc import AsyncIterable from dataclasses import dataclass, field from typing import Any, Literal @@ -175,6 +178,20 @@ async def post_postable_object( Optional logger for error reporting. """ + # StreamingPlan (kind == "stream") is a nominal PostableObject that only + # Thread.post() knows how to consume (via its native-or-fallback streaming + # path). Reject it here so callers get a clear error instead of blank + # posts or a wrong-shape adapter.post_object("stream", ...) call. Diverges + # from upstream postable-object.ts, which posts ``getFallbackText() == ""`` + # as an empty message. + if getattr(obj, "kind", None) == "stream": + raise RuntimeError( + "StreamingPlan cannot be posted via post_postable_object / " + "Channel.post -- its stream is consumed only by Thread.post(), " + "which special-cases kind=='stream' for native or fallback " + "streaming. Use thread.post(streaming_plan) instead." + ) + def _make_context(raw: Any) -> PostableObjectContext: return PostableObjectContext( adapter=adapter, @@ -500,3 +517,133 @@ async def _absorb_for_chain() -> None: # ``chained`` preserves upstream semantics: exceptions from the # adapter edit propagate to the caller. await chained + + +# ============================================================================= +# StreamingPlan -- PostableObject that wraps an async iterable with options +# ============================================================================= + + +@dataclass +class StreamingPlanOptions: + """Options for a :class:`StreamingPlan`. + + Mirrors upstream ``StreamingPlanOptions`` (streaming-plan.ts). + Python uses snake_case at the public boundary while still accepting + ``group_tasks``/``end_with``/``update_interval_ms``. + + Attributes + ---------- + group_tasks: + Controls how ``task_update`` chunks are displayed (Slack only). + - ``"plan"`` -- all tasks grouped into a single plan block. + - ``"timeline"`` -- individual task cards shown inline (default). + end_with: + Block Kit elements to attach when the stream stops (Slack only). + Useful for adding feedback buttons after a streamed response. + update_interval_ms: + Minimum interval between updates in ms (default: 500). + Used for fallback mode (post + edit on adapters without native + streaming). + """ + + group_tasks: Literal["plan", "timeline"] | None = None + end_with: list[Any] | None = None + update_interval_ms: int | None = None + + +@dataclass +class _StreamingPlanData: + """Internal post-data payload exposed via ``get_post_data``.""" + + stream: AsyncIterable[Any] + options: StreamingPlanOptions + + +class StreamingPlan: + """A ``PostableObject`` wrapping an async iterable with streaming options. + + Use this when you need to pass options like task grouping or stop + blocks to the streaming API. For simple streaming without options, + pass the async iterable directly to :meth:`Thread.post`. + + Example:: + + stream = StreamingPlan( + result.full_stream, + StreamingPlanOptions(group_tasks="plan", end_with=[feedback_block]), + ) + await thread.post(stream) + """ + + kind: str = "stream" + + def __init__( + self, + stream: AsyncIterable[Any], + options: StreamingPlanOptions | None = None, + ) -> None: + self._stream = stream + self._options = options if options is not None else StreamingPlanOptions() + + @property + def stream(self) -> AsyncIterable[Any]: + """The wrapped async iterable of chunks.""" + return self._stream + + @property + def options(self) -> StreamingPlanOptions: + """The streaming options supplied at construction time.""" + return self._options + + # -- PostableObject protocol ------------------------------------------------ + # + # StreamingPlan is a "nominal" PostableObject: it satisfies the duck-typing + # protocol (so ``is_postable_object()`` detects it and ``Thread.post``'s + # ``kind == "stream"`` branch fires), but it cannot actually round-trip + # through the generic ``post_postable_object`` helper -- there is no static + # fallback text to post and no meaningful ``adapter.post_object("stream", + # ...)`` shape. + # + # Upstream TS has the same latent gap: ``ChannelImpl.post`` routes any + # PostableObject through ``postPostableObject``, which would post an empty + # string for StreamingPlan. We diverge by failing loudly rather than + # silently posting blanks, per CLAUDE.md adversarial-review discipline. + + def get_fallback_text(self) -> str: + """StreamingPlan has no static fallback text. + + Raises ``RuntimeError`` to fail loudly if a generic posting path + (e.g. ``Channel.post`` or ``post_postable_object``) tries to + consume a StreamingPlan as a normal PostableObject. StreamingPlan + must be posted via :meth:`Thread.post`, which special-cases + ``kind == "stream"`` and consumes the wrapped async iterable. + """ + raise RuntimeError( + "StreamingPlan cannot be posted via the generic PostableObject " + "path (no static fallback text). Post it with Thread.post(), " + "which routes kind=='stream' to native or fallback streaming." + ) + + def get_post_data(self) -> _StreamingPlanData: + """Return the underlying stream + options for Thread.post to route.""" + return _StreamingPlanData(stream=self._stream, options=self._options) + + def is_supported(self, _adapter: Adapter) -> bool: + """StreamingPlan is not generically postable -- see + :meth:`get_fallback_text`. + + Raises ``RuntimeError`` so misroutes through + ``post_postable_object`` fail loudly rather than silently trying + ``adapter.post_object("stream", ...)`` on adapters that don't + understand the shape. + """ + raise RuntimeError( + "StreamingPlan cannot be posted via the generic PostableObject " + "path. Post it with Thread.post(), which routes kind=='stream' " + "to native or fallback streaming." + ) + + def on_posted(self, _context: PostableObjectContext) -> None: + """Streams are one-shot, no lifecycle binding needed.""" + return None diff --git a/src/chat_sdk/thread.py b/src/chat_sdk/thread.py index 3494ca0..24eba7b 100644 --- a/src/chat_sdk/thread.py +++ b/src/chat_sdk/thread.py @@ -481,8 +481,34 @@ async def post( or a PostableObject (e.g. Plan). PostableObjects are returned directly after posting so the caller can continue to mutate them. """ - # Handle PostableObject (e.g. Plan) + # Handle PostableObject (e.g. Plan, StreamingPlan) if is_postable_object(message): + # StreamingPlan PostableObject -- route through streaming with + # options mapped to StreamOptions. Mirrors upstream thread.ts + # `if (message.kind === "stream")` branch. + if getattr(message, "kind", None) == "stream": + postable: Any = message + data = postable.get_post_data() + stream_iter = getattr(data, "stream", None) + plan_options = getattr(data, "options", None) + extra = StreamOptions() + if plan_options is not None: + group_tasks = getattr(plan_options, "group_tasks", None) + end_with = getattr(plan_options, "end_with", None) + update_interval_ms = getattr(plan_options, "update_interval_ms", None) + # Port Rule #1: use `is not None` so explicit falsy values + # (``end_with=[]``, ``update_interval_ms=0``) still + # propagate to the adapter/fallback instead of being + # silently dropped by a truthiness check. Diverges from + # upstream thread.ts, which has the same latent bug. + if group_tasks is not None: + extra.task_display_mode = group_tasks + if end_with is not None: + extra.stop_blocks = end_with + if update_interval_ms is not None: + extra.update_interval_ms = update_interval_ms + await self._handle_stream(stream_iter, extra_options=extra) + return message raw = await self._handle_postable_object(message) # Cache in history with the real message ID (upstream skips this, # but that's a gap — posted messages should appear in history). @@ -567,10 +593,20 @@ async def schedule( async def _handle_stream( self, raw_stream: Any, + *, + extra_options: StreamOptions | None = None, ) -> SentMessage: """Handle streaming from an AsyncIterable. Uses adapter's native streaming if available, otherwise falls back to post+edit. + + ``extra_options`` carries caller-supplied fields (e.g. from a + :class:`StreamingPlan`: ``task_display_mode``, ``stop_blocks``, + ``update_interval_ms``). They are merged on top of the + message-context defaults so the adapter and the fallback path see + them. Matches upstream thread.ts where ``StreamingPlan`` options + are built into ``StreamOptions`` before both ``adapter.stream`` and + ``fallbackStream`` are invoked. """ # Build text-only stream from raw_stream text_stream = _from_full_stream(raw_stream) @@ -583,6 +619,15 @@ async def _handle_stream( if isinstance(raw, dict): options.recipient_team_id = raw.get("team_id") or raw.get("team") + # Merge caller-supplied StreamingPlan options on top. Explicit fields win. + if extra_options is not None: + if extra_options.task_display_mode is not None: + options.task_display_mode = extra_options.task_display_mode + if extra_options.stop_blocks is not None: + options.stop_blocks = extra_options.stop_blocks + if extra_options.update_interval_ms is not None: + options.update_interval_ms = extra_options.update_interval_ms + # Use native streaming if adapter supports it if hasattr(self.adapter, "stream") and self.adapter.stream: # type: ignore[union-attr] accumulated = "" @@ -634,8 +679,13 @@ async def _fallback_stream( Posts an initial placeholder, then edits the message at intervals as new text arrives from the stream. """ + # ``is not None`` so explicit ``update_interval_ms=0`` (edit-on-every- + # chunk) from ``StreamingPlan`` is honored rather than silently reset + # to the thread default by a truthiness check. interval_ms = ( - options.update_interval_ms if options and options.update_interval_ms else self._streaming_update_interval_ms + options.update_interval_ms + if options is not None and options.update_interval_ms is not None + else self._streaming_update_interval_ms ) interval_s = interval_ms / 1000.0 placeholder_text = self._fallback_streaming_placeholder_text diff --git a/tests/test_thread_faithful.py b/tests/test_thread_faithful.py index 43cda40..91bd375 100644 --- a/tests/test_thread_faithful.py +++ b/tests/test_thread_faithful.py @@ -20,6 +20,7 @@ from chat_sdk.channel import derive_channel_id from chat_sdk.errors import ChatNotImplementedError +from chat_sdk.plan import StreamingPlan, StreamingPlanOptions from chat_sdk.shared.mock_adapter import MockLogger from chat_sdk.testing import ( MockAdapter, @@ -826,6 +827,263 @@ async def mock_stream(thread_id: str, text_stream: Any, options: Any = None) -> assert options.recipient_user_id == "U456" assert options.recipient_team_id == "T123" + # it("should pass StreamingPlan PostableObject options to adapter.stream") + @pytest.mark.asyncio + async def test_should_pass_streamingplan_postableobject_options_to_adapterstream(self): + adapter = create_mock_adapter() + state = create_mock_state() + + stream_call_args: list[Any] = [] + + async def mock_stream(thread_id: str, text_stream: Any, options: Any = None) -> RawMessage: + stream_call_args.append((thread_id, text_stream, options)) + async for _ in text_stream: + pass + return RawMessage(id="msg-stream", thread_id="t1", raw="Hello") + + adapter.stream = mock_stream # type: ignore[attr-defined] + + thread = _make_thread(adapter, state) + text_stream = _create_text_stream(["Hello"]) + stream_msg = StreamingPlan( + text_stream, + StreamingPlanOptions( + group_tasks="plan", + end_with=[{"type": "actions"}], + update_interval_ms=1000, + ), + ) + await thread.post(stream_msg) + + assert len(stream_call_args) == 1 + thread_id, _iterable, options = stream_call_args[0] + assert thread_id == "slack:C123:1234.5678" + # Upstream maps groupTasks->taskDisplayMode, endWith->stopBlocks + assert options.task_display_mode == "plan" + assert options.stop_blocks == [{"type": "actions"}] + assert options.update_interval_ms == 1000 + + # it("should pass StreamingPlan with only groupTasks") + @pytest.mark.asyncio + async def test_should_pass_streamingplan_with_only_grouptasks(self): + adapter = create_mock_adapter() + state = create_mock_state() + + stream_call_args: list[Any] = [] + + async def mock_stream(thread_id: str, text_stream: Any, options: Any = None) -> RawMessage: + stream_call_args.append((thread_id, text_stream, options)) + async for _ in text_stream: + pass + return RawMessage(id="msg-stream", thread_id="t1", raw="Hello") + + adapter.stream = mock_stream # type: ignore[attr-defined] + + thread = _make_thread(adapter, state) + text_stream = _create_text_stream(["Hello"]) + await thread.post(StreamingPlan(text_stream, StreamingPlanOptions(group_tasks="timeline"))) + + assert len(stream_call_args) == 1 + options = stream_call_args[0][2] + assert options.task_display_mode == "timeline" + # Omitted options must not be set + assert options.stop_blocks is None + + # it("should pass StreamingPlan with only endWith") + @pytest.mark.asyncio + async def test_should_pass_streamingplan_with_only_endwith(self): + adapter = create_mock_adapter() + state = create_mock_state() + + stream_call_args: list[Any] = [] + + async def mock_stream(thread_id: str, text_stream: Any, options: Any = None) -> RawMessage: + stream_call_args.append((thread_id, text_stream, options)) + async for _ in text_stream: + pass + return RawMessage(id="msg-stream", thread_id="t1", raw="Hello") + + adapter.stream = mock_stream # type: ignore[attr-defined] + + thread = _make_thread(adapter, state) + text_stream = _create_text_stream(["Hello"]) + await thread.post(StreamingPlan(text_stream, StreamingPlanOptions(end_with=[{"type": "actions"}]))) + + assert len(stream_call_args) == 1 + options = stream_call_args[0][2] + assert options.stop_blocks == [{"type": "actions"}] + assert options.task_display_mode is None + + # it("should pass StreamingPlan with only updateIntervalMs") + @pytest.mark.asyncio + async def test_should_pass_streamingplan_with_only_updateintervalms(self): + adapter = create_mock_adapter() + state = create_mock_state() + + stream_call_args: list[Any] = [] + + async def mock_stream(thread_id: str, text_stream: Any, options: Any = None) -> RawMessage: + stream_call_args.append((thread_id, text_stream, options)) + async for _ in text_stream: + pass + return RawMessage(id="msg-stream", thread_id="t1", raw="Hello") + + adapter.stream = mock_stream # type: ignore[attr-defined] + + thread = _make_thread(adapter, state) + text_stream = _create_text_stream(["Hello"]) + await thread.post(StreamingPlan(text_stream, StreamingPlanOptions(update_interval_ms=2000))) + + assert len(stream_call_args) == 1 + options = stream_call_args[0][2] + assert options.update_interval_ms == 2000 + assert options.task_display_mode is None + assert options.stop_blocks is None + + # it("should route StreamingPlan through fallback when adapter has no native streaming") + @pytest.mark.asyncio + async def test_should_route_streamingplan_through_fallback_when_adapter_has_no_native_streaming(self): + adapter = create_mock_adapter() + state = create_mock_state() + # Ensure no stream method + assert not hasattr(adapter, "stream") or getattr(adapter, "stream", None) is None + + thread = _make_thread(adapter, state) + + # Capture StreamOptions at the fallback entry point so we can prove + # that StreamingPlanOptions actually reach the fallback path -- this + # guards issue #56 against silent truthiness drops that would leave + # observable streaming behavior unchanged. + captured_options: list[Any] = [] + original_fallback = thread._fallback_stream + + async def _spy_fallback(text_stream: Any, options: Any = None) -> Any: + captured_options.append(options) + return await original_fallback(text_stream, options) + + thread._fallback_stream = _spy_fallback # type: ignore[method-assign] + + text_stream = _create_text_stream(["Hello", " ", "World"]) + await thread.post( + StreamingPlan( + text_stream, + StreamingPlanOptions( + group_tasks="plan", + end_with=[{"type": "actions"}], + update_interval_ms=2000, + ), + ) + ) + + # Should post initial placeholder and edit with final content + assert len(adapter._post_calls) >= 1 + assert adapter._post_calls[0] == ("slack:C123:1234.5678", "...") + assert len(adapter._edit_calls) >= 1 + last_edit = adapter._edit_calls[-1] + assert last_edit[0] == "slack:C123:1234.5678" + assert last_edit[1] == "msg-1" + assert isinstance(last_edit[2], PostableMarkdown) + assert last_edit[2].markdown == "Hello World" + + # All three StreamingPlanOptions fields must reach the fallback path. + # Before the truthiness->`is not None` fix, end_with=[] and + # update_interval_ms=0 would be dropped; asserting full propagation + # here catches both the mapping bug and any future regression that + # forgets to thread options through to _fallback_stream. + assert len(captured_options) == 1 + options = captured_options[0] + assert options is not None + assert options.task_display_mode == "plan" + assert options.stop_blocks == [{"type": "actions"}] + assert options.update_interval_ms == 2000 + + # Python-only regression lock: the truthiness fix in thread.py (post + # dispatcher + `_fallback_stream` interval guard) replaced `if x:` with + # `if x is not None:`. The propagation test above uses truthy values + # (`end_with=[{...}]`, `update_interval_ms=2000`), which would pass even + # under the old truthy check. Pin the specifically falsy-but-explicit + # values that motivated the fix so a regression to `if x:` resets them + # to `StreamOptions` defaults (None) and fails loudly here. + @pytest.mark.asyncio + async def test_streamingplan_falsy_options_still_propagate_to_fallback(self): + adapter = create_mock_adapter() + state = create_mock_state() + # Ensure no native stream — forces the fallback path. + assert not hasattr(adapter, "stream") or getattr(adapter, "stream", None) is None + + thread = _make_thread(adapter, state) + + captured_options: list[Any] = [] + original_fallback = thread._fallback_stream + + async def _spy_fallback(text_stream: Any, options: Any = None) -> Any: + captured_options.append(options) + return await original_fallback(text_stream, options) + + thread._fallback_stream = _spy_fallback # type: ignore[method-assign] + + text_stream = _create_text_stream(["Hello", " ", "World"]) + await thread.post( + StreamingPlan( + text_stream, + StreamingPlanOptions( + # Falsy-but-explicit: edit on every chunk. Under a truthy + # check (`if update_interval_ms:`), 0 is dropped and the + # default (500) wins — silently changing streaming + # cadence. `is not None` preserves the caller's choice. + update_interval_ms=0, + # Falsy-but-explicit: caller opts in to "no stop blocks". + # Under `if end_with:`, [] would be dropped and + # `StreamOptions.stop_blocks` would stay None, which is + # indistinguishable from the default. Lock in that the + # explicit empty list makes it through to StreamOptions. + end_with=[], + group_tasks="plan", + ), + ) + ) + + assert len(captured_options) == 1 + options = captured_options[0] + assert options is not None + # Falsy values explicitly set by the caller must survive the mapping. + assert options.update_interval_ms == 0, ( + "update_interval_ms=0 was dropped — likely a regression to " + "truthiness-based option mapping in thread.py post dispatcher." + ) + assert options.stop_blocks == [], ( + "end_with=[] was dropped — likely a regression to truthiness-" + "based option mapping in thread.py post dispatcher." + ) + # Sanity: truthy option still propagates alongside the falsy ones. + assert options.task_display_mode == "plan" + + # it("should still work without options (backward compat)") + @pytest.mark.asyncio + async def test_should_still_work_without_options_backward_compat(self): + adapter = create_mock_adapter() + state = create_mock_state() + + stream_call_args: list[Any] = [] + + async def mock_stream(thread_id: str, text_stream: Any, options: Any = None) -> RawMessage: + stream_call_args.append((thread_id, text_stream, options)) + async for _ in text_stream: + pass + return RawMessage(id="msg-stream", thread_id="t1", raw="Hello") + + adapter.stream = mock_stream # type: ignore[attr-defined] + + thread = _make_thread(adapter, state) + text_stream = _create_text_stream(["Hello"]) + # Plain async iterable — no StreamingPlan wrapper. + await thread.post(text_stream) + + assert len(stream_call_args) == 1 + options = stream_call_args[0][2] + assert options.task_display_mode is None + assert options.stop_blocks is None + # =========================================================================== # Fallback streaming error logging