Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ Parity catch-up with upstream `4.26.0`. No upstream version change.
- Ported the 4 `[getParticipants]` tests from `thread.test.ts` and the 4
`[thread]` factory tests from `chat.test.ts` (existing-behavior coverage
for `Chat.thread(id)`). Closes 8 fidelity gaps.
- Ported 19 `[post with Plan]` tests from `thread.test.ts` — closes #55.

### Fixes (parity with upstream Plan semantics)

- **`Plan.update_task(input)` / `StreamingPlan.update_task(input)` now honor `input.id`** — previously an update could only target the last in-progress task; with `id` set, the call now targets that specific task and returns `None` for unknown IDs. Matches upstream `UpdateTaskInput` semantics.
- **`Plan.add_task()` / `update_task()` now propagate `adapter.edit_object` errors** — previously swallowed and logged; upstream returns the chained promise so callers see failures.
- **Plan edit queue is now actually sequential under concurrency** — previously racy under `asyncio.gather`; rewrote `_enqueue_edit` to build the chain synchronously before awaiting, matching upstream TS's `.then`-based chain. Fixes out-of-order edits when multiple `add_task`/`update_task` calls interleave.

### Test hygiene

Expand Down
121 changes: 93 additions & 28 deletions src/chat_sdk/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from __future__ import annotations

import asyncio
import contextlib
import uuid
from dataclasses import dataclass, field
from typing import Any, Literal
Expand Down Expand Up @@ -72,8 +73,17 @@ class AddTaskOptions:

@dataclass
class UpdateTaskInput:
"""Structured update input with optional output and status override."""
"""Structured update input targeting a task by ``id`` (or the last
in-progress task when ``id`` is omitted) with optional output and
status override.

Mirrors upstream ``UpdateTaskInput`` shape (`plan.ts`):
``{ id?: string; output?: PlanContent; status?: PlanTaskStatus }``.
When ``id`` is set but no matching task exists, ``update_task``
returns ``None`` (matching upstream).
"""

id: str | None = None
output: PlanContent | None = None
status: PlanTaskStatus | None = None

Expand Down Expand Up @@ -194,7 +204,10 @@ class _BoundState:
message_id: str
thread_id: str
logger: Logger | None = None
update_chain: asyncio.Future[None] | None = None
# Tail of the synchronously-built edit chain. Each ``_enqueue_edit``
# reads this, chains a new task after it, and assigns the new tail
# — all before yielding — so concurrent callers see FIFO ordering.
update_chain: asyncio.Task[None] | None = None


# =============================================================================
Expand Down Expand Up @@ -329,24 +342,38 @@ async def add_task(self, options: AddTaskOptions) -> PlanTask | None:
return PlanTask(id=next_task.id, title=next_task.title, status=next_task.status)

async def update_task(self, update: PlanContent | UpdateTaskInput | None = None) -> PlanTask | None:
"""Update the current in-progress task.
"""Update a task on this plan.

``update`` can be:
- ``PlanContent`` (str, list, dict) -- sets the task output
- ``UpdateTaskInput`` -- sets output and/or status
- ``None`` -- just triggers a re-render
- ``PlanContent`` (str, list, dict) -- sets the output on the last
in-progress task (falling back to the last task).
- ``UpdateTaskInput`` -- sets output and/or status. When
``update.id`` is set, targets that specific task and returns
``None`` if no task matches. When ``id`` is omitted, behaves
like the PlanContent path (last in-progress task).
- ``None`` -- just triggers a re-render of the current state.
"""
if not self._can_mutate():
return None
current: PlanModelTask | None = None
for t in reversed(self._model.tasks):
if t.status == "in_progress":
current = t
break
if current is None and self._model.tasks:
current = self._model.tasks[-1]
if current is None:
return None
if isinstance(update, UpdateTaskInput) and update.id is not None:
for t in self._model.tasks:
if t.id == update.id:
current = t
break
# Upstream returns null for a non-existent id rather than
# silently falling back to "last in-progress".
if current is None:
return None
else:
for t in reversed(self._model.tasks):
if t.status == "in_progress":
current = t
break
if current is None and self._model.tasks:
current = self._model.tasks[-1]
if current is None:
return None

if update is not None:
if isinstance(update, UpdateTaskInput):
Expand Down Expand Up @@ -396,12 +423,27 @@ def _can_mutate(self) -> bool:
async def _enqueue_edit(self) -> None:
"""Edit the posted message with the current plan state.

Chains edits sequentially to avoid race conditions.
Chains edits sequentially to avoid race conditions. Mirrors the
upstream TS pattern (`plan.ts`):

```ts
const chained = bound.updateChain.then(doEdit, doEdit);
bound.updateChain = chained.then(() => undefined, (err) => log);
return chained;
```

Crucially, the new chain tail (``update_chain``) is registered
**synchronously** — before any ``await`` — so that concurrent
callers racing through ``asyncio.gather`` observe a strict FIFO
ordering. Errors from the adapter edit propagate to the caller
via the returned awaitable (``chained``); the internal chain
absorbs them so the next enqueued edit still runs.
"""
if self._bound is None:
return

bound = self._bound
prev = bound.update_chain # synchronous read — must not await first

async def _do_edit() -> None:
if bound.fallback:
Expand All @@ -421,17 +463,40 @@ async def _do_edit() -> None:
self._model,
)

# Chain edits: wait for previous edit to finish before starting new one
if bound.update_chain is not None:
async def _run_after_prev() -> None:
if prev is not None:
# Upstream ``.then(doEdit, doEdit)`` runs doEdit whether
# the previous edit resolved or rejected; mirror that by
# absorbing any exception from the previous step here.
# (Note: the internal chain tail already absorbs errors, so in
# practice ``await prev`` only raises if someone replaced the
# chain with a rejecting future — the suppression is purely
# defensive and keeps parity with upstream.)
with contextlib.suppress(Exception):
await prev
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

False positive — `await prev` under `contextlib.suppress(...)` has a real effect: `prev` is the previous edit's task, so awaiting it makes this edit wait for the previous one to finish before running.

await _do_edit()

loop = asyncio.get_running_loop()
chained = loop.create_task(_run_after_prev())

async def _absorb_for_chain() -> None:
# The internal chain tail must not propagate errors — otherwise
# the next enqueued edit would await a rejected future and be
# treated as a previous-failure chain that still runs doEdit,
# but we'd also lose the ability to recover cleanly. Upstream
# uses ``chained.then(() => undefined, (err) => logger.warn)``.
#
# Catch ``Exception`` (not ``BaseException``) so that
# ``asyncio.CancelledError``, ``KeyboardInterrupt``, and
# ``SystemExit`` propagate — only regular failures need to be
# absorbed here so the next enqueued edit can still run.
try:
await bound.update_chain
except Exception as prev_exc:
if bound.logger:
bound.logger.warn("Previous plan edit failed", prev_exc)

try:
bound.update_chain = asyncio.get_running_loop().create_task(_do_edit())
await bound.update_chain
except Exception as exc:
if bound.logger:
bound.logger.warn("Failed to edit plan", exc)
await chained
except Exception as exc: # noqa: BLE001 — log and swallow for queue
if bound.logger is not None:
bound.logger.warn("Failed to edit plan", exc)

bound.update_chain = loop.create_task(_absorb_for_chain())
# ``chained`` preserves upstream semantics: exceptions from the
# adapter edit propagate to the caller.
await chained
Comment thread
github-code-quality[bot] marked this conversation as resolved.
Fixed
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

False positive — same as #502 above; await chained has real effect.

23 changes: 13 additions & 10 deletions tests/test_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -543,9 +543,13 @@ def error(self, message: str, *args: Any) -> None:

class TestEditErrorPath:
@pytest.mark.asyncio
async def test_edit_failure_is_logged_and_plan_continues(self) -> None:
"""When adapter.edit_message raises, the error is logged and
the next mutation still fires successfully."""
async def test_edit_failure_propagates_and_plan_continues(self) -> None:
"""When ``adapter.edit_message`` raises, the caller sees the
exception (mirroring upstream TS ``enqueueEdit``), the failure
is logged by the internal chain tail, and the next mutation
still fires successfully without the previous rejection
poisoning the queue.
"""
adapter = _FailingEditAdapter()
logger = _SpyLogger()
thread = _make_thread(adapter=adapter)
Expand All @@ -556,20 +560,19 @@ async def test_edit_failure_is_logged_and_plan_continues(self) -> None:
assert plan._bound is not None
plan._bound.logger = logger

# First mutation: edit will fail
# First mutation: edit will fail — caller observes the error.
adapter.fail_edit = True
await plan.add_task(AddTaskOptions(title="Step 2"))
with pytest.raises(RuntimeError, match="simulated edit failure"):
await plan.add_task(AddTaskOptions(title="Step 2"))

# The error should have been logged
# The internal chain absorbs the error and logs it so the queue
# is not poisoned for the next edit.
assert any("Failed to edit plan" in w[0] for w in logger.warnings)

# Second mutation: edit succeeds -- plan is still usable
# Second mutation: edit succeeds -- plan is still usable.
adapter.fail_edit = False
logger.warnings.clear()
task = await plan.add_task(AddTaskOptions(title="Step 3"))
assert task is not None
assert task.title == "Step 3"
assert len(plan.tasks) == 3

# The previous-chain failure should also be logged
assert any("Previous plan edit failed" in w[0] for w in logger.warnings)
Loading
Loading