feat(ai, ai-client): add SessionAdapter for durable session support. #286
Widen TextMessageStartEvent.role to accept all message roles and add optional parentMessageId to ToolCallStartEvent. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace single-message instance variables with a Map<string, MessageStreamState> keyed by messageId. Add explicit handlers for TEXT_MESSAGE_START, TEXT_MESSAGE_END, and STATE_SNAPSHOT events. Route tool calls via toolCallToMessage mapping. Maintains backward compat: startAssistantMessage() sets pendingManualMessageId which TEXT_MESSAGE_START associates with. ensureAssistantMessage() auto-creates state for streams without TEXT_MESSAGE_START. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add MessagesSnapshotEvent as a first-class AG-UI event type for conversation hydration. Replace the previous STATE_SNAPSHOT handler (which extracted messages from arbitrary state) with a dedicated MESSAGES_SNAPSHOT handler that accepts a typed messages array.

- Add MessagesSnapshotEvent type to AGUIEventType and AGUIEvent unions
- Add MESSAGES_SNAPSHOT case in StreamProcessor.processChunk()
- Remove STATE_SNAPSHOT handler (falls through to default no-op)
- Fix onStreamEnd to fire per-message (not only when no active messages remain)
- Fix getActiveAssistantMessageId to return on first reverse match
- Fix ensureAssistantMessage to emit onStreamStart and onMessagesChange
- Add proposal docs for resumeable session support

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…on model Replace direct ConnectionAdapter usage in ChatClient with a SessionAdapter-based subscription loop. When only a ConnectionAdapter is provided, it is wrapped in a DefaultSessionAdapter internally. This enables persistent session support while preserving existing timing semantics and backwards compatibility. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…unter reload() now cancels the active stream (abort controllers, subscription, processing promise) before starting a new one. A stream generation counter prevents a superseded stream's async cleanup from clobbering the new stream's state (abortController, isLoading, processor). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Guard against double onStreamEnd when RUN_FINISHED arrives before TEXT_MESSAGE_END
- Clear dead waiters on subscribe exit to prevent chunk loss on reconnection
- Reset transient processor state (messageStates, activeMessageIds, etc.) on MESSAGES_SNAPSHOT
- Remove optimistic startAssistantMessage() from streamResponse(); let stream events create the message naturally via TEXT_MESSAGE_START or ensureAssistantMessage()
- Clean up abort listeners on normal waiter resolution to prevent listener accumulation
- Make handleStepFinishedEvent use ensureAssistantMessage() for backward compat with streams that lack TEXT_MESSAGE_START

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…r race Reset processor stream state (prepareAssistantMessage) in streamResponse() before the subscription loop, preventing stale messageStates from blocking new assistant message creation on reload. Rewrite createDefaultSession with per-subscribe queue isolation: each subscribe() synchronously installs fresh buffer/waiters, drains pre-buffered chunks via splice(0), and removes async cleanup that raced with new subscription cycles. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
📝 Walkthrough

Migrates ChatClient to a session-backed streaming model via a new SessionAdapter and createDefaultSession; reworks streaming lifecycle and background subscription handling; adds per-message stream state in StreamProcessor and new snapshot/role/tool-call types; updates framework bindings and adds tests.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client as ChatClient
    participant SA as SessionAdapter
    participant Q as InternalQueue
    participant CA as ConnectionAdapter
    rect rgba(100,150,255,0.5)
        Note over Client,SA: Subscription start
        Client->>SA: subscribe(signal)
        activate SA
        SA->>Q: swap in fresh buffer / wait
        SA-->>Client: AsyncIterable yields chunks
        deactivate SA
    end
    rect rgba(150,200,100,0.5)
        Note over Client,CA: send() -> connection stream via session
        Client->>SA: send(messages, data, signal)
        activate SA
        SA->>CA: connect(messages, data, signal)
        activate CA
        CA-->>SA: StreamChunk (streaming)
        loop per chunk
            SA->>Q: push chunk
            Q-->>SA: deliver to subscriber
            SA-->>Client: yield chunk
        end
        CA-->>SA: complete / error
        deactivate CA
        SA-->>Client: send() resolves/rejects
        deactivate SA
    end
    rect rgba(200,100,150,0.5)
        Note over SA,Q: Subscription loop / abort handling
        SA->>Q: wait for chunk or abort
        Q-->>SA: chunk or abort
        SA-->>Client: yield or stop
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
packages/typescript/ai/src/activities/chat/stream/processor.ts (2)
936-961: ⚠️ Potential issue | 🟡 Minor — Falsy check on `chunk.result` skips empty-string results.

`if (chunk.result)` evaluates to `false` for `''`, which means a tool returning an empty string as a valid result would be silently ignored. Use a strict `undefined` check instead.

🐛 Proposed fix

```diff
- if (chunk.result) {
+ if (chunk.result !== undefined) {
```
1050-1099: ⚠️ Potential issue | 🟡 Minor — `handleCustomEvent` silently drops approval updates when no active assistant message exists.

At line 1081, if `messageId` is null (no active assistant), the `updateToolCallApproval` call is skipped but `onApprovalRequest` still fires. The consumer receives the approval callback but the internal state isn't updated, leading to inconsistency. Consider logging a warning when `messageId` is null.
🤖 Fix all issues with AI agents
In `@packages/typescript/ai-client/src/chat-client.ts`:
- Around line 14-15: The import ordering is wrong: the value import
createDefaultSession from './session-adapter' must come before the type-only
import SessionAdapter; move the value import (import { createDefaultSession }
from './session-adapter') above the type import (import type { SessionAdapter }
...) and ensure the value import is grouped with other value imports at the top
of the file so ESLint's ordering rule is satisfied.
In `@packages/typescript/ai-client/src/session-adapter.ts`:
- Around line 97-102: The send() function can throw from connection.connect() or
during async iteration and currently subscribers via subscribe() never learn
about these errors; wrap the connect/iteration in try/catch and on any exception
push a clear error sentinel (e.g., an object with an error flag/message) or call
the same queue/notify mechanism used by push() to deliver the error to
subscribers, then ensure the stream/queue is closed/marked complete so
subscribe() stops waiting; update send(), connection.connect() error handling,
and the consumer-side subscribe() to recognize and surface that sentinel/error
object.
🧹 Nitpick comments (7)
packages/typescript/ai-client/src/session-adapter.ts (1)
16-36: Use function property signatures per ESLint rule.

The `@typescript-eslint/method-signature-style` rule requires function property syntax instead of shorthand method signatures.

♻️ Proposed fix

```diff
 export interface SessionAdapter {
-  subscribe(signal?: AbortSignal): AsyncIterable<StreamChunk>
-
-  send(
-    messages: Array<UIMessage>,
-    data?: Record<string, any>,
-    signal?: AbortSignal,
-  ): Promise<void>
+  subscribe: (signal?: AbortSignal) => AsyncIterable<StreamChunk>
+
+  send: (
+    messages: Array<UIMessage>,
+    data?: Record<string, any>,
+    signal?: AbortSignal,
+  ) => Promise<void>
 }
```

packages/typescript/ai/src/activities/chat/stream/processor.ts (1)
974-981: `handleRunFinishedEvent` calls `finalizeStream()`, which will run again from `process()`.

`handleRunFinishedEvent` (line 980) calls `finalizeStream()`, and `process()` (line 409) also calls `finalizeStream()` after the stream ends. The second call is a no-op since `activeMessageIds` is cleared, but `onStreamEnd` could fire twice if a message is created between the two calls (unlikely but possible in edge cases with concurrent chunk processing).

The current guard — `activeMessageIds` being empty on the second call — makes this safe in practice, but consider adding an explicit `isDone` or `isFinalized` guard to `finalizeStream()` for clarity.

packages/typescript/ai/tests/stream-processor.test.ts (1)
2309-2313: Test chunk missing required `toolName` field, masked by `as StreamChunk` cast.

The `TOOL_CALL_END` chunk constructed here omits the required `toolName` field. While this doesn't cause a runtime failure (the handler doesn't use `toolName` in this path), it's a test accuracy issue. Consider using the `ev.toolEnd` helper for consistency:

```diff
- processor.processChunk({
-   type: 'TOOL_CALL_END',
-   toolCallId: 'tc-1',
-   timestamp: Date.now(),
- } as StreamChunk)
+ processor.processChunk(ev.toolEnd('tc-1', 'myTool'))
```

packages/typescript/ai-client/src/types.ts (1)
182-193: JSDoc says "not both" but providing both is silently accepted.

The constructor (in `chat-client.ts`) checks `options.session` first, then falls back to `options.connection`, so providing both just silently ignores `connection`. The JSDoc guidance on lines 184 and 191 says "Provide either `connection` or `session`, not both" — but there's no runtime enforcement of the "not both" part.

Consider either:
- Adding a runtime warning/error when both are provided, or
- Softening the JSDoc to say `session` takes priority when both are supplied.

packages/typescript/ai-client/tests/session-adapter.test.ts (1)
258-261: Minor: sync generator used where async generator is expected.

Line 259 uses a synchronous `function*` with `as any` to override `connect`. While this works because `for await...of` accepts sync iterables, it diverges from the `AsyncIterable<StreamChunk>` contract of `ConnectionAdapter.connect`. Consider using `async function*` for consistency with the other test overrides (e.g., lines 58–66).

Suggested fix

```diff
- connection.connect = function* () {
-   yield testChunk
- } as any
+ connection.connect = async function* () {
+   yield testChunk
+ }
```

packages/typescript/ai-client/src/chat-client.ts (2)
306-316: `waitForProcessing` preemptively resolves stale promises — confirm this is safe.

Line 312 resolves any existing `processingResolve` before creating a new one. This is necessary to prevent old `streamResponse` calls from hanging, but it means the old `streamResponse`'s `await processingComplete` will resolve successfully (not with an error), even though its processing didn't actually complete. The generation guard on line 487 catches this:

```ts
await processingComplete // resolves because waitForProcessing resolved stale
if (generation !== this.streamGeneration) return // bails out
```

This works correctly, but the coupling between `waitForProcessing`'s preemptive resolve and `streamGeneration`'s guard is non-obvious. A brief comment on line 312 explaining why it's safe (i.e., "the generation check in streamResponse will catch this") would help future readers.
774-780: `updateOptions` aborts old subscription but doesn't null the controller.

When updating `session` or `connection`, the old subscription is aborted (lines 775, 778), but `subscriptionAbortController` is not set to `null`. This is functionally fine since `ensureSubscription()` checks `signal.aborted`, but differs from `stop()` and `reload()` which null the reference. Consider nulling for consistency.

Suggested fix

```diff
 if (options.session !== undefined) {
   this.subscriptionAbortController?.abort()
+  this.subscriptionAbortController = null
   this.session = options.session
 } else if (options.connection !== undefined) {
   this.subscriptionAbortController?.abort()
+  this.subscriptionAbortController = null
   this.session = createDefaultSession(options.connection)
 }
```
Remove unnecessary `chunk.delta !== undefined` condition (delta is always a string on TextMessageContentEvent) and remove redundant `!` non-null assertion inside an already-narrowed `if` block. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Fix import ordering: move value import `createDefaultSession` above type-only imports. Convert shorthand method signatures to function property style in the SessionAdapter interface. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Wrap createDefaultSession's send() in try/catch and push a RUN_ERROR AG-UI event to the queue before re-throwing, so subscribe() consumers learn about connection failures through the standard protocol. Also resolve processingResolve on RUN_ERROR in consumeSubscription (same as RUN_FINISHED) to prevent hangs. Tests updated: error assertions now check message content rather than referential identity, since errors flowing through RUN_ERROR create new Error instances from the message string. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
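The error-propagation shape this commit describes can be sketched in isolation. The names here (`ErrorChunk`, `sendWithErrorPropagation`, `push`, `doSend`) are illustrative stand-ins, not the PR's actual code:

```typescript
// Illustrative sketch: when send() fails, push a RUN_ERROR event into the
// session queue before re-throwing, so the subscriber learns of the failure
// through the normal event stream and the caller still gets a rejection.
type ErrorChunk = { type: 'RUN_ERROR'; message: string }

async function sendWithErrorPropagation(
  push: (chunk: ErrorChunk) => void,
  doSend: () => Promise<void>,
): Promise<void> {
  try {
    await doSend()
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err)
    push({ type: 'RUN_ERROR', message }) // subscribers see the failure as an event
    throw err // the caller still observes the rejection
  }
}
```

Routing failures through the event queue means a subscriber that only consumes `subscribe()` (and never awaits `send()`) still observes the error via the standard protocol.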
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@packages/typescript/ai/src/activities/chat/stream/processor.ts`:
- Around line 648-667: The message state is created with the original role
('tool') while the UIMessage uses 'assistant', causing lookups in
getActiveAssistantMessageId() and getCurrentAssistantMessageId() to miss the
message; fix by passing the mapped uiRole ('assistant' when role === 'tool')
into createMessageState instead of the original role so the persisted state.role
matches the UIMessage role and subsequent stream events attach to the existing
message (update the call to createMessageState(messageId, ...) to use uiRole and
ensure any related state/collections like activeMessageIds remain consistent).
🧹 Nitpick comments (3)
packages/typescript/ai-client/src/session-adapter.ts (1)
56-94: Potential hang if signal is aborted between the `while` check and `addEventListener`.

There's a TOCTOU window: if the signal is aborted after the `while (!signal?.aborted)` check at line 76 but before `addEventListener` at line 87, the `abort` event may already have been dispatched. In modern runtimes (Node ≥ 16.14, modern browsers), late `abort` listeners fire immediately so this is safe; in older runtimes the Promise would never resolve.

If you need to support older runtimes, add a guard after registering the listener:

🛡️ Defensive guard for older runtimes

```diff
   myWaiters.push((c) => {
     signal?.removeEventListener('abort', onAbort)
     resolve(c)
   })
   signal?.addEventListener('abort', onAbort, { once: true })
+  // Guard: if signal was aborted in the window between the while
+  // check and addEventListener, the event already fired. Resolve now.
+  if (signal?.aborted) resolve(null)
 })
```

packages/typescript/ai-client/src/chat-client.ts (1)
277-292: `onError` may fire twice for the same failure.

When `session.send()` throws (e.g., connection error), `createDefaultSession` pushes a `RUN_ERROR` chunk and re-throws. The subscription loop processes the `RUN_ERROR` chunk, so the processor fires `onError`. Then `streamResponse`'s catch block (lines 507-511) also calls `setError` + `onError`. This results in the consumer's `onError` callback being invoked twice for the same error.

Consider guarding the `streamResponse` catch so it skips `onError` when the processor already surfaced it (e.g., by checking `this.error` before setting it again), or suppressing the duplicate in the processor path for `RUN_ERROR` events originating from the default session's error propagation.

packages/typescript/ai/src/activities/chat/stream/processor.ts (1)
1301-1317: Content aggregation across message states may produce unexpected results in multi-message scenarios.

`getResult()` and `getState()` concatenate `totalTextContent` and `thinkingContent` across all `messageStates` entries. In a multi-message session, this could mix content from different messages (e.g., two assistant messages). If `getResult()` is used for anything beyond basic backward-compat (e.g., single-shot `process()` calls), the concatenation could be surprising.

Consider whether these methods should scope to the last assistant message or return a structured per-message result. Not urgent since the current usage appears to be limited to single-stream scenarios.
The stream processor mapped 'tool' to 'assistant' for UIMessage but stored the raw 'tool' role in MessageStreamState. This caused getActiveAssistantMessageId() and getCurrentAssistantMessageId() to miss tool-role messages, so subsequent stream events couldn't attach to the existing message. Now the uiRole mapping is applied consistently across all three cases in handleTextMessageStartEvent. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
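The role mapping this commit makes consistent can be shown with a minimal illustration (`MessageRole` and `toUiRole` are illustrative names, not the processor's actual helpers):

```typescript
// Illustrative uiRole mapping: AG-UI streams may replay 'tool'-role message
// events, but the UI renders them as assistant messages. The same mapped role
// must also be stored in per-message stream state, so that role-based lookups
// (e.g., "find the active assistant message") find the message later.
type MessageRole = 'user' | 'assistant' | 'system' | 'tool'

function toUiRole(role: MessageRole): Exclude<MessageRole, 'tool'> {
  return role === 'tool' ? 'assistant' : role
}
```

The bug fixed here was applying this mapping to the `UIMessage` but persisting the raw role in `MessageStreamState`, so the two disagreed.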
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
packages/typescript/ai/src/activities/chat/stream/processor.ts (1)
1074-1101: ⚠️ Potential issue | 🟡 Minor — `onApprovalRequest` fires even when `messageId` is null and the tool-call part update is skipped.

When `getActiveAssistantMessageId()` returns null (line 1083), `updateToolCallApproval` is skipped, but `onApprovalRequest` at line 1094 still fires. The client receives an approval request for a tool call whose UIMessage part was never updated to reflect the approval state, which could cause UI inconsistencies.

Consider either logging a warning / returning early when `messageId` is null, or using `toolCallToMessage` to resolve the correct message for the approval update (similar to `handleToolCallArgsEvent`).
🤖 Fix all issues with AI agents
In `@packages/typescript/ai/src/activities/chat/stream/processor.ts`:
- Around line 746-759: The check for chunk.delta currently uses `chunk.delta !==
''`, which treats undefined as not equal to empty string and causes "undefined"
to be concatenated into `nextText`; change the logic in the stream processor to
normalize `delta` before use (e.g., `const delta = chunk.delta ?? ''` or use
`chunk.delta || ''`) and then use `if (delta !== '') { nextText = currentText +
delta }` so `chunk.delta`, `chunk.content`, `currentText`, and `nextText` are
safely handled (mirror the safe pattern used in `handleToolCallArgsEvent`).
🧹 Nitpick comments (2)
packages/typescript/ai/src/activities/chat/stream/processor.ts (2)
976-983: `finalizeStream()` can be called twice when `RUN_FINISHED` is processed via `process()`.

`handleRunFinishedEvent` calls `finalizeStream()` at line 982, and then `process()` calls it again at line 409 after the stream loop ends. The second call is a no-op because `activeMessageIds` was already cleared, so no duplicate `onStreamEnd` fires — but the double `completeAllToolCalls()` (line 981) followed by the one inside `finalizeStream()` is also redundant.

Consider guarding with an early return in `finalizeStream()` if `activeMessageIds` is empty, or removing the explicit call from `handleRunFinishedEvent`.
1303-1319: Aggregating `content` and `thinking` across all message states concatenates without separators.

In a multi-message session, `getResult()` joins `totalTextContent` from every `MessageStreamState` entry without any delimiter. If two assistant messages each produce text (e.g., `"Hello"` and `"World"`), the result is `"HelloWorld"` rather than something distinguishable. The same applies to `thinking`. `getState()` (line 1324) has the identical pattern.

If multi-message `ProcessorResult` is expected to be meaningful, consider adding a newline or other separator, or returning per-message results.
When chunk.delta was undefined, the check `chunk.delta !== ''` evaluated to true, causing "undefined" to be concatenated into nextText. Use `chunk.delta ?? ''` to normalize before comparison, matching the safe pattern already used in handleToolCallArgsEvent. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
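The hazard this commit fixes is easy to reproduce in isolation. This is a minimal illustration — `appendDelta` is not the processor's actual helper:

```typescript
// With delta possibly undefined, the comparison `delta !== ''` is true for
// undefined, and string concatenation would then append the literal text
// "undefined". Normalizing before the empty-string check avoids that.
function appendDelta(currentText: string, delta: string | undefined): string {
  const normalized = delta ?? '' // normalize before comparing
  return normalized !== '' ? currentText + normalized : currentText
}
```

The same normalize-then-compare shape is what the commit applies inside the stream processor's text-content handler.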
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
packages/typescript/ai/src/activities/chat/stream/processor.ts (1)
1074-1101: ⚠️ Potential issue | 🟡 Minor — `onApprovalRequest` fires even when `messageId` is null, skipping the state update.

When `getActiveAssistantMessageId()` returns null (line 1056), the `updateToolCallApproval` call is correctly guarded (line 1084), but `onApprovalRequest` at line 1095 fires unconditionally. The client receives an approval request event for a tool call whose message state was never updated with the approval metadata. This could leave the UI in an inconsistent state where an approval dialog is shown but the underlying message part hasn't been marked.

Proposed fix: move onApprovalRequest inside the guard

```diff
 if (messageId) {
   this.messages = updateToolCallApproval(
     this.messages,
     messageId,
     toolCallId,
     approval.id,
   )
   this.emitMessagesChange()
-}
-// Emit approval request event
-this.events.onApprovalRequest?.({
-  toolCallId,
-  toolName,
-  input,
-  approvalId: approval.id,
-})
+  // Emit approval request event
+  this.events.onApprovalRequest?.({
+    toolCallId,
+    toolName,
+    input,
+    approvalId: approval.id,
+  })
+}
```
🧹 Nitpick comments (5)
packages/typescript/ai/src/activities/chat/stream/processor.ts (5)
243-254: `getCurrentAssistantMessageId` has an O(n) scan over all message states.

This iterates the full `messageStates` map on every call. For typical use this is fine, but consider that `getActiveAssistantMessageId()` (line 528) does the same pattern over `activeMessageIds`. The asymmetry is intentional (active vs all states, surviving finalize vs not), but a brief inline comment explaining why this scans `messageStates` rather than `activeMessageIds` would help maintainers.
694-703: MESSAGES_SNAPSHOT clears per-message state but does not rebuild it for snapshot messages.

After handling `MESSAGES_SNAPSHOT`, `messageStates` is empty while `this.messages` has the snapshot contents. If subsequent content events (e.g., `TEXT_MESSAGE_CONTENT`) reference a message ID from the snapshot, `ensureAssistantMessage` will recreate state on demand, so this works. However, any in-progress accumulated text or tool-call state from before the snapshot is silently discarded — this is presumably intentional (full replace semantics), but worth a brief doc comment confirming that expectation.
977-984: `handleRunFinishedEvent` calls `finalizeStream()`, which is also called by `process()` after the loop.

When using `process()`, if `RUN_FINISHED` is in the stream, `finalizeStream()` runs twice — once from here and once at line 409. The second invocation is effectively a no-op (`activeMessageIds` is already cleared), so no functional bug. However, any chunks arriving after `RUN_FINISHED` in the same stream are still dispatched through `processChunk` with no active message state, which could silently drop them or trigger `ensureAssistantMessage` to create a spurious new message.

Consider guarding `processChunk` with an early return when `isDone` is true, or documenting that `RUN_FINISHED` must be the terminal event.

Proposed guard in processChunk

```diff
 processChunk(chunk: StreamChunk): void {
+  // RUN_FINISHED already finalized the stream; ignore trailing chunks
+  if (this.isDone && chunk.type !== 'RUN_ERROR') return
+
   // Record chunk if enabled
   if (this.recording) {
```
1304-1348: `getResult`/`getState` aggregate content from all message roles without filtering.

Both methods concatenate `totalTextContent` and `thinkingContent` across all entries in `messageStates`, regardless of role. If a stream includes non-assistant messages (e.g., a `TEXT_MESSAGE_START` with `role: 'user'`), their text would be mixed into the result. Additionally, `getState` merges tool call maps by ID — if two messages happen to share a `toolCallId`, the later entry silently overwrites the earlier one.

This is unlikely in practice (streams typically contain only assistant messages), but filtering to `state.role === 'assistant'` would make the aggregation semantics explicit and defensive.
545-577: `ensureAssistantMessage` sets `pendingManualMessageId` on auto-creation.

Line 573 sets `pendingManualMessageId` whenever a message is auto-created, not just when called from `startAssistantMessage()`. This means if a `TEXT_MESSAGE_CONTENT` arrives before `TEXT_MESSAGE_START`, the auto-created message will be reconciled with the subsequent `TEXT_MESSAGE_START`. This is a reasonable backward-compat behavior, but it conflates "manually started" with "auto-created" messages. A comment clarifying this intent would help.
The no-unnecessary-condition rule flags ?? since TypeScript types delta as string. Using || preserves runtime safety and matches existing patterns. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
packages/typescript/ai/src/activities/chat/stream/processor.ts (2)
1053-1102: ⚠️ Potential issue | 🟡 Minor — `handleCustomEvent` retrieves `messageId` via `getActiveAssistantMessageId()`, which searches only `activeMessageIds` — it will return null after `finalizeStream()`.

In session-based flows where `processChunk` is called individually, if a `CUSTOM` event (e.g., `approval-requested`) arrives after `RUN_FINISHED` has finalized the stream, `getActiveAssistantMessageId()` returns null because `activeMessageIds` was cleared. The `if (messageId)` guard at line 1084 prevents a crash, but the approval state update is silently dropped.

This is related to the `finalizeStream()` timing issue noted on `handleRunFinishedEvent`, but worth calling out: custom events that carry tool-call context should ideally include a `messageId` in their payload rather than relying on active-message lookup.
977-984: ⚠️ Potential issue | 🟡 Minor — Remove redundant `finalizeStream()` call in `handleRunFinishedEvent`, or make it idempotent.

`handleRunFinishedEvent()` (line 983) calls `finalizeStream()`, and `process()` (line 409) calls it again after the chunk loop. The second call is harmless since `activeMessageIds` is already cleared, but redundant.

More importantly, if `processChunk()` is used outside of `process()` (as shown in tests), `RUN_FINISHED` finalizes immediately. While tests show `RUN_FINISHED` as a terminal event, making `finalizeStream()` idempotent is a defensive improvement:

```diff
 finalizeStream(): void {
+  if (this.activeMessageIds.size === 0) return
   let lastAssistantMessage: UIMessage | undefined
   ...
```
🤖 Fix all issues with AI agents
In `@packages/typescript/ai/src/activities/chat/stream/processor.ts`:
- Around line 694-703: handleMessagesSnapshotEvent currently replaces messages
but leaves shared stream flags (isDone, hasError, finishReason) stale; update it
to call resetStreamState() before applying the snapshot (or explicitly clear the
flags) so the stream-wide state is reset when a MESSAGES_SNAPSHOT arrives.
Locate the method handleMessagesSnapshotEvent and either invoke
resetStreamState() at the start of that method or set isDone = false, hasError =
false, finishReason = null prior to setting this.messages and clearing maps so
finalizeStream() and other logic see a fresh state.
🧹 Nitpick comments (2)
packages/typescript/ai/src/activities/chat/stream/processor.ts (2)
1304-1320: `getResult()` concatenates text from all messages without separators — multi-message streams will produce garbled content.

Lines 1309-1311 simply `+=` each message's `totalTextContent` and `thinkingContent`. With multiple assistant messages (the scenario this refactor enables), two messages containing `"Hello"` and `"World"` produce `"HelloWorld"`.

If `ProcessorResult.content` is expected to be a single coherent string, consider joining with `\n\n` or documenting that `getResult()` is only meaningful for single-message flows. The same applies to `getState()` at lines 1331-1333.
1234-1277: `finalizeStream` emits `onStreamEnd` only for the last assistant message — multi-message sessions won't receive per-message end signals.

If multiple assistant messages are active (the primary use case for this refactor), only the last one triggers `onStreamEnd`. Earlier assistant messages are finalized silently. The `onStreamEnd` callback signature `(message: UIMessage) => void` accepts a single message, so callers tracking per-message loading state would miss completion of earlier messages.

This may be intentional for now (the PR notes "per-message `isLoading` tracking" as follow-up work), but worth documenting.
handleMessagesSnapshotEvent was clearing maps but not resetting isDone, hasError, and finishReason. Use resetStreamState() which handles all of these, ensuring finalizeStream() sees fresh state after a snapshot. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This PR introduces a `SessionAdapter` interface that decouples sending messages from receiving chunks, and refactors `ChatClient` to support it. This enables durable sessions with support for resumability, re-connection, multi-tab, multi-device, multi-user, and multi-agent use.

Existing `ConnectionAdapter` usage and support is unchanged. Users choosing to opt in to the session pattern can do so using the new `SessionAdapter` as an alternative.

Problem

Currently, `ChatClient` is built around request-response: `sendMessage` fires an HTTP request and streams back a single assistant response. Request-response couples three things that should be independent. This means there's no way to subscribe to a persistent stream, receive messages from multiple sources, or resume consumption from an offset.

The `StreamProcessor` compounds this by tracking a single "current assistant message" with flat instance state. It can't handle interleaved messages from different participants.

Solution

- Refactor `ChatClient` to remove baked-in assumptions around the request <> response interaction paradigm
- Introduce a `SessionAdapter` interface that supports message sending and session subscriptions
- Wrap the existing `ConnectionAdapter` interface into a default `SessionAdapter`

1. `ChatClient` refactored

The constructor resolves a `SessionAdapter` — either provided directly via `session`, or by wrapping `connection` with `createDefaultSession`. A background subscription loop reads from `subscribe()` and feeds chunks to the processor. `streamResponse()` calls `session.send()`, then awaits a promise that resolves when `onStreamEnd` fires.

2. `SessionAdapter` interface

`subscribe()` returns a long-lived async iterable of AG-UI events. `send()` dispatches messages — responses arrive through `subscribe()`, not as a return value. This inversion is what enables session backends to replay events, inject snapshots, and manage stream lifecycle independently. `SessionAdapter` is intentionally minimal, so implementations can be backed by a variety of session backends.

3. `createDefaultSession(connection)`

Wraps an existing `ConnectionAdapter` using an async queue: `send()` calls `connection.connect()` and pushes chunks; `subscribe()` yields them. Existing users get the new architecture transparently.

4. Per-message stream state in `StreamProcessor`

The processor now maintains a `Map<string, MessageStreamState>` keyed by message ID, with a routing map from tool call IDs to parent messages. This allows multiple concurrent message lifecycles (`TEXT_MESSAGE_START` / content / `TEXT_MESSAGE_END`), reconciliation of manual `startAssistantMessage()` calls, and per-message completion (`isComplete`, `onStreamEnd` per message).

Backward compatibility: `startAssistantMessage()` still works via a `pendingManualMessageId` that reconciles when `TEXT_MESSAGE_START` arrives.

5. AG-UI type alignment

Three additive type changes:

- `TextMessageStartEvent.role` — widened from `'assistant'` to `'user' | 'assistant' | 'system' | 'tool'` (session backends replay messages of any role)
- `ToolCallStartEvent.parentMessageId` — routes tool calls to the correct message without implicit "current message" state
- `MessagesSnapshotEvent` — first-class event for hydrating the full conversation transcript on connect/reconnect

6. Framework hooks

All framework integrations (`ai-react`, `ai-solid`, `ai-vue`, `ai-svelte`, `ai-preact`) thread the new `session` option to `ChatClient`. No additional framework-level changes needed.

Backward compatibility

Existing usage is unchanged. `ChatClientOptions` accepts either `connection` or `session`. The `connection` path works identically to before — `createDefaultSession` makes the wrapping invisible.

Follow-up work

This PR establishes the transport and processing foundation. Envisaged follow-up work includes:

- Durable `SessionAdapter` implementations (e.g., `@durable-streams/sessions/tanstack-ai`)
- `STATE_SNAPSHOT` / `STATE_DELTA` handling with managed `sessionState` on `ChatClient`
- `onSessionStateChange` and `onCustomEvent` callbacks
- Per-message `isLoading` tracking
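The async-queue pattern behind `createDefaultSession` (section 3 above) can be sketched roughly as follows. This is an illustrative miniature with simplified types — `Chunk`, `MiniSession`, and `makeQueueSession` are not the PR's actual APIs, which operate on AG-UI `StreamChunk` events and accept abort signals:

```typescript
// Minimal sketch of the send/subscribe inversion: send() pushes events into
// an internal queue, and subscribe() yields them as an async iterable.
type Chunk = { type: string; delta?: string }

interface MiniSession {
  subscribe: () => AsyncIterable<Chunk>
  send: (text: string) => Promise<void>
}

function makeQueueSession(): MiniSession {
  const buffer: Chunk[] = []
  let waiter: ((c: Chunk) => void) | null = null

  // Deliver to a waiting subscriber if there is one, else buffer.
  const push = (c: Chunk): void => {
    if (waiter) {
      const w = waiter
      waiter = null
      w(c)
    } else {
      buffer.push(c)
    }
  }

  return {
    async send(text: string) {
      // Stand-in for connection.connect(): emit a start/content/end run.
      push({ type: 'TEXT_MESSAGE_START' })
      push({ type: 'TEXT_MESSAGE_CONTENT', delta: text })
      push({ type: 'TEXT_MESSAGE_END' })
    },
    async *subscribe() {
      // Drain buffered chunks first, then wait for new pushes.
      while (true) {
        const next =
          buffer.length > 0
            ? buffer.shift()!
            : await new Promise<Chunk>((resolve) => {
                waiter = resolve
              })
        yield next
        if (next.type === 'TEXT_MESSAGE_END') return // simplified termination
      }
    },
  }
}
```

A subscriber that attaches before `send()` receives chunks as they are pushed; one that attaches after drains the buffer first — which is the property that makes replay and late attachment possible in the real adapter.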