Skip to content

fix(ai-chat): serialize chat turns and saveMessages#1142

Merged
threepointone merged 3 commits into
mainfrom
fix/ai-chat-stream-serialization
Mar 22, 2026
Merged

fix(ai-chat): serialize chat turns and saveMessages#1142
threepointone merged 3 commits into
mainfrom
fix/ai-chat-stream-serialization

Conversation

@whoiskatrin
Copy link
Copy Markdown
Contributor

@whoiskatrin whoiskatrin commented Mar 20, 2026

Fixes #1110

Summary

  • serialize onChatMessage() + _reply() behind a private turn queue so websocket requests, tool auto-continuations, and saveMessages() cannot stream concurrently
  • add protected isChatTurnActive(), waitForIdle(), abortActiveTurn(), hasPendingInteraction(), and waitForPendingInteractionResolution() helpers for subclass code that needs to inspect active turns and defer injected work until pending tool interactions resolve
  • skip queued continuations and saveMessages() calls that were enqueued before a chat clear (epoch guard)
  • capture saveMessages() context (_lastClientTools, _lastBody) at enqueue time so a later request cannot overwrite it before execution
  • fix _queueAutoContinuation to use the caller-provided clientTools directly instead of re-reading this._lastClientTools at execution time
  • enqueue tool-result and tool-approval continuations synchronously (with the apply promise as a prerequisite) instead of from a .then() microtask, closing a race where waitForIdle() could resolve before the continuation was queued
  • prevent unhandled promise rejections from _applyToolResult / _applyToolApproval
  • add regression coverage for queued websocket turns, saveMessages(), abort behavior, continuation body isolation, pending-interaction coordination, clear-during-active-turn, and clear-during-queued-saveMessages

Design Notes

  • AIChatAgent already assumes a single active resumable stream, so this PR keeps that model and serializes the higher-level chat turn (onChatMessage() + _reply()) instead of introducing concurrent stream support.
  • I intentionally did not expose _streamCompletionPromise: it resolved too early for this use case because it did not guarantee final assistant-message persistence.
  • Auto-continuations now capture request body context at enqueue time so a later request cannot overwrite continuation state before it runs.
  • A _chatEpoch counter is incremented on chat clear. Queued turns check the epoch before executing and skip if it changed, preventing stale continuations from writing into a cleared conversation. The counter is in-memory only — it doesn't need to survive hibernation because the queue itself (_chatTurnQueue) is in-memory and resets on wake.
  • Pending-interaction helpers are additive and opt-in. saveMessages() itself still only solves turn serialization; callers that need Claude Code-style queued/scheduled behavior can combine waitForIdle() with waitForPendingInteractionResolution() before injecting a synthetic message.

Behavior Changes

  • saveMessages() now waits for any active turn, runs its own turn exclusively, and resolves only after the reply finishes.
  • WebSocket requests, tool-result continuations, and tool-approval continuations now execute in order instead of overlapping.
  • Chat clear now skips any queued continuations and saveMessages() calls that were enqueued before the clear.
  • New protected helpers (isChatTurnActive(), waitForIdle(), abortActiveTurn()) support turn coordination.
  • New protected helpers (hasPendingInteraction(), waitForPendingInteractionResolution()) let subclasses detect unresolved client-tool input/approval state before starting a follow-up turn.

Non-Goals / Limitations

  • This PR does not add support for multiple concurrent active streams.
  • abortActiveTurn() aborts the active request or stream, but it does not interrupt unrelated pre-stream setup such as waitForMcpConnections().
  • Pending-interaction waiting is explicit and opt-in; saveMessages() does not block on user interactions by default.

Review Guide

  • packages/ai-chat/src/index.ts: queueing model, turn helpers, epoch guard, and pending-interaction helpers
  • packages/ai-chat/src/tests/chat-turn-serialization.test.ts: websocket/saveMessages/abort ordering, clear-during-turn, tool-continuation coverage
  • packages/ai-chat/src/tests/custom-body-continuation.test.ts: continuation context isolation
  • packages/ai-chat/src/tests/pending-interaction.test.ts: pending interaction detection and waiting

Testing

  • npm install
  • npm run build
  • npm run test:workers --workspace @cloudflare/ai-chat -- src/tests/chat-turn-serialization.test.ts src/tests/custom-body-continuation.test.ts src/tests/pending-interaction.test.ts
  • npm run check

@changeset-bot
Copy link
Copy Markdown

changeset-bot Bot commented Mar 20, 2026

🦋 Changeset detected

Latest commit: 78332f2

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 1 package
Name Type
@cloudflare/ai-chat Patch

Not sure what this means? Click here to learn what changesets are.

Click here if you're a maintainer who wants to add another changeset to this PR

@pkg-pr-new
Copy link
Copy Markdown

pkg-pr-new Bot commented Mar 20, 2026

Open in StackBlitz

agents

npm i https://pkg.pr.new/agents@1142

@cloudflare/ai-chat

npm i https://pkg.pr.new/@cloudflare/ai-chat@1142

@cloudflare/codemode

npm i https://pkg.pr.new/@cloudflare/codemode@1142

hono-agents

npm i https://pkg.pr.new/hono-agents@1142

@cloudflare/shell

npm i https://pkg.pr.new/@cloudflare/shell@1142

@cloudflare/think

npm i https://pkg.pr.new/@cloudflare/think@1142

@cloudflare/voice

npm i https://pkg.pr.new/@cloudflare/voice@1142

@cloudflare/worker-bundler

npm i https://pkg.pr.new/@cloudflare/worker-bundler@1142

commit: 78332f2

Copy link
Copy Markdown
Contributor

@devin-ai-integration devin-ai-integration Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ Devin Review: No Issues Found

Devin Review analyzed this PR and found no potential bugs to report.

View in Devin Review to see 5 additional findings.

Open in Devin Review

@whoiskatrin whoiskatrin marked this pull request as ready for review March 20, 2026 19:45
whoiskatrin and others added 3 commits March 22, 2026 10:30
Introduce a monotonic _chatEpoch (incremented on CF_AGENT_CHAT_CLEAR) so queued continuations or save operations enqueued under an older epoch are skipped after a chat clear. Wire apply/approval promises as prerequisites for auto-continuations (catching rejections) so continuations only run if the tool result/approval was applied, and check epoch before running exclusive chat turns. Snap clientTools/body in saveMessages and avoid running them if the epoch changed. Minor cleanup: remove an unnecessary _getAbortSignal call in cancel flow. Add tests and test helpers (isChatTurnActiveForTest, waitForIdleForTest, persistToolCallMessage, getMessageCount) to cover tool-result continuations, chat-clear skipping, and saveMessages behavior; replace some fixed delays with waitForIdleForTest.
@threepointone threepointone force-pushed the fix/ai-chat-stream-serialization branch from 10f6a03 to 78332f2 Compare March 22, 2026 10:55
Copy link
Copy Markdown
Contributor

@threepointone threepointone left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good. added some checks for an edge cases and added some tests. landing after ci passes.

@threepointone threepointone merged commit 5651ece into main Mar 22, 2026
2 checks passed
@threepointone threepointone deleted the fix/ai-chat-stream-serialization branch March 22, 2026 11:01
@github-actions github-actions Bot mentioned this pull request Mar 22, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

No public API to detect or wait for active streams — needed for schedule() + saveMessages() safety

2 participants