Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ Parity catch-up with upstream `4.26.0`. No upstream version change.
- **`RedisStateAdapter(token_prefix=...)`**: new `token_prefix` kwarg
(default `"redis"`). Parameterizes the lock-token prefix for observability
and interop.
- **`StreamingPlan` / `StreamingPlanOptions`** (`chat_sdk.plan`): a
`PostableObject` wrapping an async iterable with platform-specific
streaming options (`group_tasks`, `end_with`, `update_interval_ms`).
Mirrors upstream `streaming-plan.ts`. Issue #56.

### Upstream parity

Expand All @@ -57,12 +61,22 @@ Parity catch-up with upstream `4.26.0`. No upstream version change.
`[thread]` factory tests from `chat.test.ts` (existing-behavior coverage
for `Chat.thread(id)`). Closes 8 fidelity gaps.
- Ported 19 `[post with Plan]` tests from `thread.test.ts` — closes #55.
- Ported 6 `[Streaming]` StreamingPlan option-variant tests from upstream
`thread.test.ts` — closes #56.

### Fixes (parity with upstream Plan semantics)
### Fixes

- **`Plan.update_task(input)` / `StreamingPlan.update_task(input)` now honor `input.id`** — previously only worked on the last in-progress task; with `id` set, targets that specific task and returns `None` for unknown IDs. Matches upstream `UpdateTaskInput` semantics.
- **`Plan.add_task()` / `update_task()` now propagate `adapter.edit_object` errors** — previously swallowed and logged; upstream returns the chained promise so callers see failures.
- **Plan edit queue is now actually sequential under concurrency** — previously racy under `asyncio.gather`; rewrote `_enqueue_edit` to build the chain synchronously before awaiting, matching upstream TS's `.then`-based chain. Fixes out-of-order edits when multiple `add_task`/`update_task` calls interleave.
- **`StreamingPlan` options now wired through `Thread.post()`** — the Python
port was missing the `StreamingPlan` class entirely, so `group_tasks` /
`end_with` / `update_interval_ms` were silently dropped (a plain async
iterable was the only way to stream, and options went nowhere). Upstream
already had the `kind === "stream"` branch that maps
`groupTasks → taskDisplayMode`, `endWith → stopBlocks`, and
`updateIntervalMs → updateIntervalMs` onto `StreamOptions` before invoking
`adapter.stream(...)` or the fallback `post+edit` path. Issue #56.

### Test hygiene

Expand Down
1 change: 1 addition & 0 deletions docs/UPSTREAM_SYNC.md
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ stay explicit instead of being rediscovered in code review.
| `SlackAdapter.current_token` / `current_client` | Public `@property` accessors that return the request-context-bound token and a preconfigured `AsyncWebClient` | Not exposed (`getToken()` is private on the TS `SlackAdapter`) | Python-only addition (issue #47). Downstream code that calls Slack Web APIs from inside a handler — email resolution, user profile fetches, reaction bookkeeping — otherwise depends on underscore-prefixed helpers. |
| `ConcurrencyConfig.max_concurrent` | Enforced via `asyncio.Semaphore` in the `"concurrent"` strategy path; rejects non-integer or `<= 0` values, and rejects any non-`None` `max_concurrent` paired with a non-`"concurrent"` strategy | Accepted into the config type with docstring "Default: Infinity" but never read (3 writes, 0 reads) | Silent correctness bug upstream — consumers setting `max_concurrent=N` with `strategy="concurrent"` reasonably expect an N-way bound on in-flight handlers. We honor the documented contract via a semaphore and fail-fast on misconfiguration so it's never silent. `max_concurrent=None` stays compatible with every strategy (unbounded default). |
| Redis lock token format | `{token_prefix}_{ms}_{secrets.token_hex(16)}` — always 32 hex chars, CSPRNG-sourced | `ioredis_${Date.now()}_${Math.random().toString(36).substring(2, 15)}` — base36, ≤13 chars, **not** CSPRNG | Interop via `IoRedisStateAdapter(token_prefix="ioredis")` still works for lock-release (release/extend compare by full-string equality, and each runtime only releases what it issued), but the token byte-shape diverges. Intentional — CSPRNG should not be regressed to `Math.random()` for cosmetic byte-for-byte compatibility. |
| `StreamingPlan.is_supported()` / `get_fallback_text()` | Raise `RuntimeError` to fail loudly if a generic posting path (e.g. `ChannelImpl.post`, `post_postable_object`) tries to consume a `StreamingPlan` as a normal `PostableObject` | Silently return `True` / `""` — `ChannelImpl.post` would route through `postPostableObject` and post an empty-string fallback | Prevents `StreamingPlan` being silently routed through non-stream-aware posting paths where upstream would post a blank message or attempt a wrong-shape `adapter.post_object("stream", ...)` call. Internal dispatch is guarded by the `kind == "stream"` short-circuit in `post_postable_object` / `Thread.post`; this also protects third-party code that duck-types PostableObjects. |

### Platform-specific gaps

Expand Down
4 changes: 4 additions & 0 deletions src/chat_sdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@
PlanTaskStatus,
PostableObjectContext,
StartPlanOptions,
StreamingPlan,
StreamingPlanOptions,
UpdateTaskInput,
is_postable_object,
post_postable_object,
Expand Down Expand Up @@ -202,6 +204,8 @@
"AddTaskOptions",
"UpdateTaskInput",
"CompletePlanOptions",
"StreamingPlan",
"StreamingPlanOptions",
"PostableObjectContext",
"is_postable_object",
"post_postable_object",
Expand Down
155 changes: 151 additions & 4 deletions src/chat_sdk/plan.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
"""Plan implementation for chat-sdk.

Python port of Vercel Chat SDK plan.ts and postable-object.ts.
Provides the Plan class (a PostableObject that manages a task list),
and the ``post_postable_object`` helper used by Thread/Channel to post
any PostableObject.
Python port of Vercel Chat SDK plan.ts, streaming-plan.ts, and
postable-object.ts. Provides the Plan class (a PostableObject that
manages a task list), the ``StreamingPlan`` PostableObject that wraps
an async iterable with platform-specific streaming options, and the
``post_postable_object`` helper used by Thread/Channel to post any
PostableObject.
"""

from __future__ import annotations

import asyncio
import contextlib
import uuid
from collections.abc import AsyncIterable
from dataclasses import dataclass, field
from typing import Any, Literal

Expand Down Expand Up @@ -175,6 +178,20 @@ async def post_postable_object(
Optional logger for error reporting.
"""

# StreamingPlan (kind == "stream") is a nominal PostableObject that only
# Thread.post() knows how to consume (via its native-or-fallback streaming
# path). Reject it here so callers get a clear error instead of blank
# posts or a wrong-shape adapter.post_object("stream", ...) call. Diverges
# from upstream postable-object.ts, which posts ``getFallbackText() == ""``
# as an empty message.
if getattr(obj, "kind", None) == "stream":
raise RuntimeError(
"StreamingPlan cannot be posted via post_postable_object / "
"Channel.post -- its stream is consumed only by Thread.post(), "
"which special-cases kind=='stream' for native or fallback "
"streaming. Use thread.post(streaming_plan) instead."
)

def _make_context(raw: Any) -> PostableObjectContext:
return PostableObjectContext(
adapter=adapter,
Expand Down Expand Up @@ -500,3 +517,133 @@ async def _absorb_for_chain() -> None:
# ``chained`` preserves upstream semantics: exceptions from the
# adapter edit propagate to the caller.
await chained


# =============================================================================
# StreamingPlan -- PostableObject that wraps an async iterable with options
# =============================================================================


@dataclass
class StreamingPlanOptions:
"""Options for a :class:`StreamingPlan`.

Mirrors upstream ``StreamingPlanOptions`` (streaming-plan.ts).
Python uses snake_case at the public boundary while still accepting
``group_tasks``/``end_with``/``update_interval_ms``.

Attributes
----------
group_tasks:
Controls how ``task_update`` chunks are displayed (Slack only).
- ``"plan"`` -- all tasks grouped into a single plan block.
- ``"timeline"`` -- individual task cards shown inline (default).
end_with:
Block Kit elements to attach when the stream stops (Slack only).
Useful for adding feedback buttons after a streamed response.
update_interval_ms:
Minimum interval between updates in ms (default: 500).
Used for fallback mode (post + edit on adapters without native
streaming).
"""

group_tasks: Literal["plan", "timeline"] | None = None
end_with: list[Any] | None = None
update_interval_ms: int | None = None


@dataclass
class _StreamingPlanData:
"""Internal post-data payload exposed via ``get_post_data``."""

stream: AsyncIterable[Any]
options: StreamingPlanOptions


class StreamingPlan:
"""A ``PostableObject`` wrapping an async iterable with streaming options.

Use this when you need to pass options like task grouping or stop
blocks to the streaming API. For simple streaming without options,
pass the async iterable directly to :meth:`Thread.post`.

Example::

stream = StreamingPlan(
result.full_stream,
StreamingPlanOptions(group_tasks="plan", end_with=[feedback_block]),
)
await thread.post(stream)
"""

kind: str = "stream"

def __init__(
self,
stream: AsyncIterable[Any],
options: StreamingPlanOptions | None = None,
) -> None:
self._stream = stream
self._options = options if options is not None else StreamingPlanOptions()

@property
def stream(self) -> AsyncIterable[Any]:
"""The wrapped async iterable of chunks."""
return self._stream

@property
def options(self) -> StreamingPlanOptions:
"""The streaming options supplied at construction time."""
return self._options

# -- PostableObject protocol ------------------------------------------------
#
# StreamingPlan is a "nominal" PostableObject: it satisfies the duck-typing
# protocol (so ``is_postable_object()`` detects it and ``Thread.post``'s
# ``kind == "stream"`` branch fires), but it cannot actually round-trip
# through the generic ``post_postable_object`` helper -- there is no static
# fallback text to post and no meaningful ``adapter.post_object("stream",
# ...)`` shape.
#
# Upstream TS has the same latent gap: ``ChannelImpl.post`` routes any
# PostableObject through ``postPostableObject``, which would post an empty
# string for StreamingPlan. We diverge by failing loudly rather than
# silently posting blanks, per CLAUDE.md adversarial-review discipline.

def get_fallback_text(self) -> str:
"""StreamingPlan has no static fallback text.

Raises ``RuntimeError`` to fail loudly if a generic posting path
(e.g. ``Channel.post`` or ``post_postable_object``) tries to
consume a StreamingPlan as a normal PostableObject. StreamingPlan
must be posted via :meth:`Thread.post`, which special-cases
``kind == "stream"`` and consumes the wrapped async iterable.
"""
raise RuntimeError(
"StreamingPlan cannot be posted via the generic PostableObject "
"path (no static fallback text). Post it with Thread.post(), "
"which routes kind=='stream' to native or fallback streaming."
)

def get_post_data(self) -> _StreamingPlanData:
"""Return the underlying stream + options for Thread.post to route."""
return _StreamingPlanData(stream=self._stream, options=self._options)

def is_supported(self, _adapter: Adapter) -> bool:
"""StreamingPlan is not generically postable -- see
:meth:`get_fallback_text`.

Raises ``RuntimeError`` so misroutes through
``post_postable_object`` fail loudly rather than silently trying
``adapter.post_object("stream", ...)`` on adapters that don't
understand the shape.
"""
raise RuntimeError(
"StreamingPlan cannot be posted via the generic PostableObject "
"path. Post it with Thread.post(), which routes kind=='stream' "
"to native or fallback streaming."
)

def on_posted(self, _context: PostableObjectContext) -> None:
"""Streams are one-shot, no lifecycle binding needed."""
return None
Comment thread
coderabbitai[bot] marked this conversation as resolved.
54 changes: 52 additions & 2 deletions src/chat_sdk/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,8 +481,34 @@ async def post(
or a PostableObject (e.g. Plan). PostableObjects are returned directly
after posting so the caller can continue to mutate them.
"""
# Handle PostableObject (e.g. Plan)
# Handle PostableObject (e.g. Plan, StreamingPlan)
if is_postable_object(message):
# StreamingPlan PostableObject -- route through streaming with
# options mapped to StreamOptions. Mirrors upstream thread.ts
# `if (message.kind === "stream")` branch.
if getattr(message, "kind", None) == "stream":
postable: Any = message
data = postable.get_post_data()
stream_iter = getattr(data, "stream", None)
plan_options = getattr(data, "options", None)
extra = StreamOptions()
if plan_options is not None:
group_tasks = getattr(plan_options, "group_tasks", None)
end_with = getattr(plan_options, "end_with", None)
update_interval_ms = getattr(plan_options, "update_interval_ms", None)
# Port Rule #1: use `is not None` so explicit falsy values
# (``end_with=[]``, ``update_interval_ms=0``) still
# propagate to the adapter/fallback instead of being
# silently dropped by a truthiness check. Diverges from
# upstream thread.ts, which has the same latent bug.
if group_tasks is not None:
extra.task_display_mode = group_tasks
if end_with is not None:
extra.stop_blocks = end_with
if update_interval_ms is not None:
extra.update_interval_ms = update_interval_ms
Comment thread
coderabbitai[bot] marked this conversation as resolved.
await self._handle_stream(stream_iter, extra_options=extra)
return message
raw = await self._handle_postable_object(message)
# Cache in history with the real message ID (upstream skips this,
# but that's a gap — posted messages should appear in history).
Expand Down Expand Up @@ -567,10 +593,20 @@ async def schedule(
async def _handle_stream(
self,
raw_stream: Any,
*,
extra_options: StreamOptions | None = None,
) -> SentMessage:
"""Handle streaming from an AsyncIterable.

Uses adapter's native streaming if available, otherwise falls back to post+edit.

``extra_options`` carries caller-supplied fields (e.g. from a
:class:`StreamingPlan`: ``task_display_mode``, ``stop_blocks``,
``update_interval_ms``). They are merged on top of the
message-context defaults so the adapter and the fallback path see
them. Matches upstream thread.ts where ``StreamingPlan`` options
are built into ``StreamOptions`` before both ``adapter.stream`` and
``fallbackStream`` are invoked.
"""
# Build text-only stream from raw_stream
text_stream = _from_full_stream(raw_stream)
Expand All @@ -583,6 +619,15 @@ async def _handle_stream(
if isinstance(raw, dict):
options.recipient_team_id = raw.get("team_id") or raw.get("team")

# Merge caller-supplied StreamingPlan options on top. Explicit fields win.
if extra_options is not None:
if extra_options.task_display_mode is not None:
options.task_display_mode = extra_options.task_display_mode
if extra_options.stop_blocks is not None:
options.stop_blocks = extra_options.stop_blocks
if extra_options.update_interval_ms is not None:
options.update_interval_ms = extra_options.update_interval_ms

# Use native streaming if adapter supports it
if hasattr(self.adapter, "stream") and self.adapter.stream: # type: ignore[union-attr]
accumulated = ""
Expand Down Expand Up @@ -634,8 +679,13 @@ async def _fallback_stream(
Posts an initial placeholder, then edits the message at intervals as
new text arrives from the stream.
"""
# ``is not None`` so explicit ``update_interval_ms=0`` (edit-on-every-
# chunk) from ``StreamingPlan`` is honored rather than silently reset
# to the thread default by a truthiness check.
interval_ms = (
options.update_interval_ms if options and options.update_interval_ms else self._streaming_update_interval_ms
options.update_interval_ms
if options is not None and options.update_interval_ms is not None
else self._streaming_update_interval_ms
)
interval_s = interval_ms / 1000.0
placeholder_text = self._fallback_streaming_placeholder_text
Expand Down
Loading
Loading