Changes from all commits (20 commits)
f0b81bb  feat(ai): align start event types with AG-UI (thruflo, Feb 10, 2026)
30d00e1  feat(ai): add MessageStreamState type for per-message stream tracking (thruflo, Feb 10, 2026)
f03493b  feat(ai): refactor StreamProcessor to per-message state (thruflo, Feb 10, 2026)
f036f55  feat(ai): replace STATE_SNAPSHOT with MESSAGES_SNAPSHOT event (thruflo, Feb 10, 2026)
3f1ccb5  feat(ai-client): add SessionAdapter interface and createDefaultSession (thruflo, Feb 10, 2026)
202244a  feat(ai-client): refactor ChatClient to use SessionAdapter subscripti… (thruflo, Feb 10, 2026)
d4cc2b1  fix(ai-preact): thread option through. (thruflo, Feb 10, 2026)
f45dd77  fix(ai): finalizeStream when RUN_FINISHED. (thruflo, Feb 10, 2026)
6d1c733  fix(ai-client): handle reload during active stream with generation co… (thruflo, Feb 10, 2026)
a217426  docs: remove proposal docs. (thruflo, Feb 10, 2026)
fd1c50c  fix(ai, ai-client): address stream lifecycle edge cases from PR review (thruflo, Feb 10, 2026)
8c628ee  fix(ai-client): fix reload failures from stale stream state and waite… (thruflo, Feb 11, 2026)
36d6a93  ci: apply automated fixes (autofix-ci[bot], Feb 11, 2026)
2abc6c7  fix(ai): resolve eslint errors in stream processor (thruflo, Feb 11, 2026)
c5e1aa3  fix(ai-client): resolve eslint errors in chat-client and session-adapter (thruflo, Feb 11, 2026)
537b73c  fix(ai-client): propagate send() errors to subscribe() consumers (thruflo, Feb 11, 2026)
fc52ef7  fix(ai): map 'tool' role to 'assistant' in message state to fix lookups (thruflo, Feb 11, 2026)
ed1cddb  fix(ai): normalize chunk.delta to avoid "undefined" string concatenation (thruflo, Feb 12, 2026)
fd7c226  fix(ai): use || instead of ?? for chunk.delta fallback to satisfy eslint (thruflo, Feb 12, 2026)
64f5517  fix(ai): reset stream flags on MESSAGES_SNAPSHOT to avoid stale state (thruflo, Feb 12, 2026)
254 changes: 172 additions & 82 deletions packages/typescript/ai-client/src/chat-client.ts
@@ -4,6 +4,7 @@ import {
normalizeToUIMessage,
} from '@tanstack/ai'
import { DefaultChatClientEventEmitter } from './events'
import { createDefaultSession } from './session-adapter'
import type {
AnyClientTool,
ContentPart,
@@ -12,6 +13,7 @@ import type {
} from '@tanstack/ai'
import type { ConnectionAdapter } from './connection-adapters'
import type { ChatClientEventEmitter } from './events'
import type { SessionAdapter } from './session-adapter'
import type {
ChatClientOptions,
ChatClientState,
@@ -23,7 +25,7 @@ import type {

export class ChatClient {
private processor: StreamProcessor
private connection: ConnectionAdapter
private session!: SessionAdapter
private uniqueId: string
private body: Record<string, any> = {}
private pendingMessageBody: Record<string, any> | undefined = undefined
@@ -40,6 +42,9 @@ export class ChatClient {
private pendingToolExecutions: Map<string, Promise<void>> = new Map()
// Flag to deduplicate continuation checks during action draining
private continuationPending = false
private subscriptionAbortController: AbortController | null = null
private processingResolve: (() => void) | null = null
private streamGeneration = 0

private callbacksRef: {
current: {
@@ -57,7 +62,15 @@ export class ChatClient {
constructor(options: ChatClientOptions) {
this.uniqueId = options.id || this.generateUniqueId('chat')
this.body = options.body || {}
this.connection = options.connection

// Resolve session adapter
if (options.session) {
this.session = options.session
} else if (options.connection) {
this.session = createDefaultSession(options.connection)
} else {
throw new Error('Either connection or session must be provided')
}
this.events = new DefaultChatClientEventEmitter(this.uniqueId)

// Build client tools map
@@ -91,10 +104,24 @@
},
onStreamStart: () => {
this.setStatus('streaming')
const messages = this.processor.getMessages()
const lastAssistant = messages.findLast(
(m: UIMessage) => m.role === 'assistant',
)
if (lastAssistant) {
this.currentMessageId = lastAssistant.id
this.events.messageAppended(
lastAssistant,
this.currentStreamId || undefined,
)
}
},
onStreamEnd: (message: UIMessage) => {
this.callbacksRef.current.onFinish(message)
this.setStatus('ready')
// Resolve the processing-complete promise so streamResponse can continue
this.processingResolve?.()
this.processingResolve = null
},
onError: (error: Error) => {
this.setError(error)
@@ -226,68 +253,66 @@
}

/**
* Process a stream through the StreamProcessor
* Start the background subscription loop.
*/
private async processStream(
source: AsyncIterable<StreamChunk>,
): Promise<UIMessage | null> {
// Generate a stream ID for this streaming operation
this.currentStreamId = this.generateUniqueId('stream')
private startSubscription(): void {
this.subscriptionAbortController = new AbortController()
const signal = this.subscriptionAbortController.signal

// Prepare for a new assistant message (created lazily on first content)
this.processor.prepareAssistantMessage()
this.consumeSubscription(signal).catch((err) => {
if (err instanceof Error && err.name !== 'AbortError') {
this.setError(err)
this.setStatus('error')
this.callbacksRef.current.onError(err)
}
// Resolve pending processing so streamResponse doesn't hang
this.processingResolve?.()
this.processingResolve = null
})
}

// Process each chunk
for await (const chunk of source) {
/**
* Consume chunks from the session subscription.
*/
private async consumeSubscription(signal: AbortSignal): Promise<void> {
const stream = this.session.subscribe(signal)
for await (const chunk of stream) {
if (signal.aborted) break
this.callbacksRef.current.onChunk(chunk)
this.processor.processChunk(chunk)

// Track the message ID once the processor lazily creates it
if (!this.currentMessageId) {
const newMessageId =
this.processor.getCurrentAssistantMessageId() ?? null
if (newMessageId) {
this.currentMessageId = newMessageId
// Emit message appended event now that the assistant message exists
const assistantMessage = this.processor
.getMessages()
.find((m: UIMessage) => m.id === newMessageId)
if (assistantMessage) {
this.events.messageAppended(
assistantMessage,
this.currentStreamId || undefined,
)
}
}
// RUN_FINISHED / RUN_ERROR signal run completion — resolve processing
// (redundant if onStreamEnd already resolved it, harmless)
if (chunk.type === 'RUN_FINISHED' || chunk.type === 'RUN_ERROR') {
this.processingResolve?.()
this.processingResolve = null
}

// Yield control back to event loop to allow UI updates
// Yield control back to event loop for UI updates
await new Promise((resolve) => setTimeout(resolve, 0))
}
}

// Wait for all pending tool executions to complete before finalizing
// This ensures client tools finish before we check for continuation
if (this.pendingToolExecutions.size > 0) {
await Promise.all(this.pendingToolExecutions.values())
}

// Finalize the stream
this.processor.finalizeStream()

// Get the message ID (may be null if no content arrived)
const messageId = this.processor.getCurrentAssistantMessageId()

// Clear the current stream and message IDs
this.currentStreamId = null
this.currentMessageId = null

// Return the assistant message if one was created
if (messageId) {
const messages = this.processor.getMessages()
return messages.find((m: UIMessage) => m.id === messageId) || null
/**
* Ensure subscription loop is running, starting it if needed.
*/
private ensureSubscription(): void {
if (
!this.subscriptionAbortController ||
this.subscriptionAbortController.signal.aborted
) {
this.startSubscription()
}
}

return null
/**
* Create a promise that resolves when onStreamEnd fires.
* Used by streamResponse to await processing completion.
*/
private waitForProcessing(): Promise<void> {
// Resolve any stale promise (e.g., from a previous aborted request)
this.processingResolve?.()
return new Promise<void>((resolve) => {
this.processingResolve = resolve
})
}

/**
@@ -407,6 +432,9 @@ export class ChatClient {
return
}

// Track generation so a superseded stream's cleanup doesn't clobber the new one
const generation = ++this.streamGeneration

this.setIsLoading(true)
this.setStatus('submitted')
this.setError(undefined)
@@ -433,42 +461,78 @@
// Clear the pending message body after use
this.pendingMessageBody = undefined

// Connect and stream
const stream = this.connection.connect(
messages,
mergedBody,
this.abortController.signal,
)
// Generate stream ID — assistant message will be created by stream events
this.currentStreamId = this.generateUniqueId('stream')
this.currentMessageId = null

// Reset processor stream state for new response — prevents stale
// messageStates entries (from a previous stream) from blocking
// creation of a new assistant message (e.g. after reload).
this.processor.prepareAssistantMessage()

// Ensure subscription loop is running
this.ensureSubscription()

// Set up promise that resolves when onStreamEnd fires
const processingComplete = this.waitForProcessing()

// Send through session adapter (pushes chunks to subscription queue)
await this.session.send(messages, mergedBody, this.abortController.signal)

// Wait for subscription loop to finish processing all chunks
await processingComplete

await this.processStream(stream)
// If this stream was superseded (e.g. by reload()), bail out —
// the new stream owns the processor and processingResolve now.
if (generation !== this.streamGeneration) {
return
}

// Wait for pending client tool executions
if (this.pendingToolExecutions.size > 0) {
await Promise.all(this.pendingToolExecutions.values())
}

// Finalize (idempotent — may already be done by RUN_FINISHED handler)
this.processor.finalizeStream()

this.currentStreamId = null
this.currentMessageId = null
streamCompletedSuccessfully = true
} catch (err) {
if (err instanceof Error) {
if (err.name === 'AbortError') {
return
}
this.setError(err)
this.setStatus('error')
this.callbacksRef.current.onError(err)
if (generation === this.streamGeneration) {
this.setError(err)
this.setStatus('error')
this.callbacksRef.current.onError(err)
}
}
} finally {
this.abortController = null
this.setIsLoading(false)
this.pendingMessageBody = undefined // Ensure it's cleared even on error

// Drain any actions that were queued while the stream was in progress
await this.drainPostStreamActions()

// Continue conversation if the stream ended with a tool result (server tool completed)
if (streamCompletedSuccessfully) {
const messages = this.processor.getMessages()
const lastPart = messages.at(-1)?.parts.at(-1)

if (lastPart?.type === 'tool-result' && this.shouldAutoSend()) {
try {
await this.checkForContinuation()
} catch (error) {
console.error('Failed to continue flow after tool result:', error)
// Only clean up if this is still the active stream.
// A superseded stream (e.g. reload() started a new one) must not
// clobber the new stream's abortController or isLoading state.
if (generation === this.streamGeneration) {
this.abortController = null
this.setIsLoading(false)
this.pendingMessageBody = undefined // Ensure it's cleared even on error

// Drain any actions that were queued while the stream was in progress
await this.drainPostStreamActions()

// Continue conversation if the stream ended with a tool result (server tool completed)
if (streamCompletedSuccessfully) {
const messages = this.processor.getMessages()
const lastPart = messages.at(-1)?.parts.at(-1)

if (lastPart?.type === 'tool-result' && this.shouldAutoSend()) {
try {
await this.checkForContinuation()
} catch (error) {
console.error('Failed to continue flow after tool result:', error)
}
}
}
}
Expand All @@ -489,6 +553,17 @@ export class ChatClient {

if (lastUserMessageIndex === -1) return

// Cancel any active stream before reloading
if (this.isLoading) {
this.abortController?.abort()
this.abortController = null
this.subscriptionAbortController?.abort()
this.subscriptionAbortController = null
this.processingResolve?.()
this.processingResolve = null
this.setIsLoading(false)
}

this.events.reloaded(lastUserMessageIndex)

// Remove all messages after the last user message
@@ -502,10 +577,20 @@
* Stop the current stream
*/
stop(): void {
// Abort any in-flight send
if (this.abortController) {
this.abortController.abort()
this.abortController = null
}

// Abort the subscription loop
this.subscriptionAbortController?.abort()
this.subscriptionAbortController = null

// Resolve any pending processing promise (unblock streamResponse)
this.processingResolve?.()
this.processingResolve = null

this.setIsLoading(false)
this.setStatus('ready')
this.events.stopped()
@@ -678,15 +763,20 @@
*/
updateOptions(options: {
connection?: ConnectionAdapter
session?: SessionAdapter
body?: Record<string, any>
tools?: ReadonlyArray<AnyClientTool>
onResponse?: (response?: Response) => void | Promise<void>
onChunk?: (chunk: StreamChunk) => void
onFinish?: (message: UIMessage) => void
onError?: (error: Error) => void
}): void {
if (options.connection !== undefined) {
this.connection = options.connection
if (options.session !== undefined) {
this.subscriptionAbortController?.abort()
this.session = options.session
} else if (options.connection !== undefined) {
this.subscriptionAbortController?.abort()
this.session = createDefaultSession(options.connection)
}
if (options.body !== undefined) {
this.body = options.body
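
For orientation, here is one way a ConnectionAdapter could be wrapped to satisfy the contract the subscription loop above relies on. This is not the PR's actual createDefaultSession (session-adapter.ts is not shown in this diff); it is a minimal queue-backed sketch under the inferred interface: send() forwards chunks from connection.connect() into a shared queue, and subscribe() drains that queue until its signal aborts. The real adapter also propagates send() errors to subscribe() consumers (commit 537b73c), which this sketch omits for brevity.

// Hypothetical queue-backed session; illustrates the send()/subscribe()
// contract only, not the actual createDefaultSession implementation.
import type { StreamChunk, UIMessage } from '@tanstack/ai'
import type { ConnectionAdapter, SessionAdapter } from '@tanstack/ai-client'

export function createQueueBackedSession(
  connection: ConnectionAdapter,
): SessionAdapter {
  const queue: Array<StreamChunk> = []
  let wake: (() => void) | null = null

  // Wake a parked subscriber, if any.
  const notify = () => {
    wake?.()
    wake = null
  }

  return {
    async send(
      messages: Array<UIMessage>,
      body: Record<string, any>,
      signal: AbortSignal,
    ) {
      // Forward every chunk from the one-shot connection into the shared queue.
      for await (const chunk of connection.connect(messages, body, signal)) {
        queue.push(chunk)
        notify()
      }
    },

    async *subscribe(signal: AbortSignal) {
      // Also wake the consumer on abort so the loop can exit promptly.
      signal.addEventListener('abort', notify, { once: true })
      try {
        while (!signal.aborted) {
          const next = queue.shift()
          if (next !== undefined) {
            yield next
          } else {
            // Park until send() pushes the next chunk or the signal aborts.
            await new Promise<void>((resolve) => {
              wake = resolve
            })
          }
        }
      } finally {
        signal.removeEventListener('abort', notify)
      }
    },
  }
}
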
1 change: 1 addition & 0 deletions packages/typescript/ai-client/src/index.ts
@@ -30,6 +30,7 @@ export {
type ConnectionAdapter,
type FetchConnectionOptions,
} from './connection-adapters'
export { createDefaultSession, type SessionAdapter } from './session-adapter'

// Re-export message converters from @tanstack/ai
export {
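
Taken together with the constructor change, a consumer can keep passing a connection or opt into a custom session. A rough usage sketch, assuming ChatClient is exported from the package entry point and that ChatClientOptions now accepts session as an alternative to connection:

import { ChatClient } from '@tanstack/ai-client'
import type { ConnectionAdapter, SessionAdapter } from '@tanstack/ai-client'

declare const connection: ConnectionAdapter // e.g. an HTTP streaming connection
declare const liveSession: SessionAdapter // e.g. a WebSocket-backed session

// Existing call sites are unchanged: a bare connection is wrapped internally
// by createDefaultSession() in the constructor.
const client = new ChatClient({ connection })

// New in this PR: hand the client a session adapter directly.
const sessionClient = new ChatClient({ session: liveSession })

// Swapping the session later aborts the current subscription loop before
// adopting the new adapter (see updateOptions above).
sessionClient.updateOptions({ session: liveSession })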