6 changes: 6 additions & 0 deletions .changeset/nine-pillows-hug.md
@@ -0,0 +1,6 @@
---
'@ai-sdk/ui-utils': patch
'@ai-sdk/react': patch
---

feat (ui/react): support resuming an ongoing stream
162 changes: 162 additions & 0 deletions content/docs/04-ai-sdk-ui/03-chatbot-message-persistence.mdx
@@ -325,3 +325,165 @@ When the client reloads the page after a disconnect, the chat will be restored f
the case where the client reloads the page after a disconnection, but the
streaming is not yet complete.
</Note>

## Resuming ongoing streams

<Note>This feature is experimental and may change in future versions.</Note>

The `useChat` hook has experimental support for resuming an ongoing chat generation stream from any client, either after a network disconnect or after reloading the chat page. This can be useful for building applications with long-running conversations or for ensuring that messages are not lost in case of network failures.

The following are the prerequisites for your chat application to support resumable streams:

- Installing the [`resumable-stream`](https://www.npmjs.com/package/resumable-stream) package (`npm install resumable-stream`), which provides the publisher/subscriber mechanism for the streams.
- Creating a [Redis](https://vercel.com/marketplace/redis) instance to store the stream state.
- Creating a table that tracks the stream IDs associated with a chat (a minimal sketch of such helpers is shown below).
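
The handlers below use `loadStreams` and `appendStreamId` helpers from `@/util/chat-store`. Their implementation is not prescribed by the SDK; the following is a minimal file-based sketch (the `.streams` directory and JSON layout are assumptions, any database table keyed by the chat ID works just as well):

```ts filename="util/chat-store.ts"
import { existsSync } from 'fs';
import { mkdir, readFile, writeFile } from 'fs/promises';
import path from 'path';

const STREAM_DIR = path.join(process.cwd(), '.streams');

function streamFile(chatId: string) {
  return path.join(STREAM_DIR, `${chatId}.json`);
}

// record a newly created stream ID for this chat
export async function appendStreamId({
  chatId,
  streamId,
}: {
  chatId: string;
  streamId: string;
}) {
  if (!existsSync(STREAM_DIR)) {
    await mkdir(STREAM_DIR, { recursive: true });
  }

  const streamIds = await loadStreams(chatId);
  await writeFile(
    streamFile(chatId),
    JSON.stringify([...streamIds, streamId]),
  );
}

// load all stream IDs recorded for this chat (most recent last)
export async function loadStreams(chatId: string): Promise<string[]> {
  if (!existsSync(streamFile(chatId))) {
    return [];
  }

  return JSON.parse(await readFile(streamFile(chatId), 'utf8'));
}
```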

To resume a chat stream, you will use the `experimental_resume` function returned by the `useChat` hook. You will call this function during the initial mount of the hook inside the main chat component.

```tsx filename="app/components/chat.tsx"
'use client';

import { useChat } from '@ai-sdk/react';
import { useEffect } from 'react';
import { Input } from '@/components/input';
import { Messages } from '@/components/messages';

export function Chat({ id }: { id: string }) {
  const { experimental_resume } = useChat({ id });

  useEffect(() => {
    experimental_resume();

    // we use an empty dependency array to
    // ensure this effect runs only once
  }, []);

  return (
    <div>
      <Messages />
      <Input />
    </div>
  );
}
```

The `experimental_resume` function makes a `GET` request to your configured chat endpoint (or `/api/chat` by default) whenever your client calls it. If there is an active stream, it picks up where it left off; otherwise, it simply finishes without error.

The `GET` request automatically appends the `chatId` query parameter to the URL to help identify the chat the request belongs to. Using the `chatId`, you can look up the most recent stream ID from the database and resume the stream.

```bash
GET /api/chat?chatId=<your-chat-id>
```

You will already have implemented the `POST` handler for the `/api/chat` route to create new chat generations. When using `experimental_resume`, you must also implement a `GET` handler for the `/api/chat` route to resume streams.

### 1. Implement the GET handler

Add a `GET` method to `/api/chat` that:

1. Reads `chatId` from the query string
2. Validates it’s present
3. Loads any stored stream IDs for that chat
4. Passes the most recent one to `streamContext.resumableStream()`
5. Falls back to an empty stream if it’s already closed

```ts filename="app/api/chat/route.ts"
import { loadStreams } from '@/util/chat-store';
import { createDataStream } from 'ai';
import { after } from 'next/server';
import { createResumableStreamContext } from 'resumable-stream';

const streamContext = createResumableStreamContext({
  waitUntil: after,
});

export async function GET(request: Request) {
  const { searchParams } = new URL(request.url);
  const chatId = searchParams.get('chatId');

  if (!chatId) {
    return new Response('id is required', { status: 400 });
  }

  const streamIds = await loadStreams(chatId);

  if (!streamIds.length) {
    return new Response('No streams found', { status: 404 });
  }

  const recentStreamId = streamIds.at(-1);

  if (!recentStreamId) {
    return new Response('No recent stream found', { status: 404 });
  }

  // fall back to an empty data stream if the original stream has already closed
  const emptyDataStream = createDataStream({
    execute: () => {},
  });

  return new Response(
    await streamContext.resumableStream(recentStreamId, () => emptyDataStream),
  );
}
```

After you've implemented the `GET` handler, you can update the `POST` handler to handle the creation of resumable streams.

### 2. Update the POST handler

When you create a brand-new chat completion, you must:

1. Generate a fresh `streamId`
2. Persist it alongside your `chatId`
3. Kick off a `createDataStream` that pipes tokens as they arrive
4. Hand that new stream to `streamContext.resumableStream()`

```ts filename="@/app/api/chat/route.ts"
import { openai } from '@ai-sdk/openai';
import {
  appendResponseMessages,
  createDataStream,
  generateId,
  streamText,
} from 'ai';
import { after } from 'next/server';
import { appendStreamId, saveChat } from '@/util/chat-store';
import { createResumableStreamContext } from 'resumable-stream';

const streamContext = createResumableStreamContext({
  waitUntil: after,
});

export async function POST(req: Request) {
  const { id, messages } = await req.json();
  const streamId = generateId();

  // Record this new stream so we can resume later
  await appendStreamId({ chatId: id, streamId });

  // Build the data stream that will emit tokens
  const stream = createDataStream({
    execute: dataStream => {
      const result = streamText({
        model: openai('gpt-4o'),
        messages,
        onFinish: async ({ response }) => {
          await saveChat({
            id,
            messages: appendResponseMessages({
              messages,
              responseMessages: response.messages,
            }),
          });
        },
      });

      // Merge the model output into the data stream
      result.mergeIntoDataStream(dataStream);
    },
  });

  // Return a resumable stream to the client
  return new Response(
    await streamContext.resumableStream(streamId, () => stream),
  );
}
```

With both handlers, your clients can now gracefully resume ongoing streams.
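
A server page can then mount the chat component with its id. This is a minimal sketch (the route segment is an assumption; in a persistence setup you would typically also load the stored messages and pass them as `initialMessages`):

```tsx filename="app/chat/[id]/page.tsx"
import { Chat } from '@/components/chat';

export default async function Page({
  params,
}: {
  params: Promise<{ id: string }>;
}) {
  const { id } = await params;

  // The Chat component calls experimental_resume() on mount,
  // so re-opening this page resumes any active stream for the chat.
  return <Chat id={id} />;
}
```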
2 changes: 2 additions & 0 deletions examples/next-openai/.env.local.example
@@ -13,3 +13,5 @@ BLOB_READ_WRITE_TOKEN=xxxxxxx
# Required for reasoning example
DEEPSEEK_API_KEY=xxxxxxx

# Required for resumable streams. You can create a Redis store here: https://vercel.com/marketplace/redis
REDIS_URL=xxxxxx
3 changes: 2 additions & 1 deletion examples/next-openai/.gitignore
@@ -34,5 +34,6 @@ yarn-error.log*
*.tsbuildinfo
next-env.d.ts

# chat persistence
# persistence
.chats
.streams
99 changes: 99 additions & 0 deletions examples/next-openai/app/api/use-chat-resume/route.ts
@@ -0,0 +1,99 @@
import {
  appendMessageToChat,
  appendStreamId,
  loadStreams,
  saveChat,
} from '@/util/chat-store';
import { openai } from '@ai-sdk/openai';
import {
  appendResponseMessages,
  createDataStream,
  generateId,
  Message,
  streamText,
} from 'ai';
import { after } from 'next/server';
import { createResumableStreamContext } from 'resumable-stream';

// Allow streaming responses up to 30 seconds
export const maxDuration = 30;

export async function POST(req: Request) {
  const streamContext = createResumableStreamContext({
    waitUntil: after,
  });

  const { id, messages }: { id: string; messages: Message[] } =
    await req.json();

  const streamId = generateId();

  const recentUserMessage = messages
    .filter(message => message.role === 'user')
    .at(-1);

  if (!recentUserMessage) {
    throw new Error('No recent user message found');
  }

  await appendMessageToChat({ chatId: id, message: recentUserMessage });

  await appendStreamId({ chatId: id, streamId });

  const stream = createDataStream({
    execute: dataStream => {
      const result = streamText({
        model: openai('gpt-4o'),
        messages,
        onFinish: async ({ response }) => {
          await saveChat({
            id,
            messages: appendResponseMessages({
              messages,
              responseMessages: response.messages,
            }),
          });
        },
      });

      result.mergeIntoDataStream(dataStream);
    },
  });

  return new Response(
    await streamContext.resumableStream(streamId, () => stream),
  );
}

export async function GET(request: Request) {
  const streamContext = createResumableStreamContext({
    waitUntil: after,
  });

  const { searchParams } = new URL(request.url);
  const chatId = searchParams.get('chatId');

  if (!chatId) {
    return new Response('id is required', { status: 400 });
  }

  const streamIds = await loadStreams(chatId);

  if (!streamIds.length) {
    return new Response('No streams found', { status: 404 });
  }

  const recentStreamId = streamIds.at(-1);

  if (!recentStreamId) {
    return new Response('No recent stream found', { status: 404 });
  }

  const emptyDataStream = createDataStream({
    execute: () => {},
  });

  return new Response(
    await streamContext.resumableStream(recentStreamId, () => emptyDataStream),
  );
}
14 changes: 14 additions & 0 deletions examples/next-openai/app/use-chat-resume/[id]/page.tsx
@@ -0,0 +1,14 @@
import { loadChat } from '@/util/chat-store';
import { Chat } from '../chat';

export default async function Page({
  params,
}: {
  params: Promise<{ id: string }>;
}) {
  const { id } = await params;

  const messages = await loadChat(id);

  return <Chat chatId={id} autoResume={true} initialMessages={messages} />;
}
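
The `Chat` component imported from `'../chat'` is not part of this diff. A plausible sketch, assuming `autoResume` gates the call to `experimental_resume` and the component targets the example's `/api/use-chat-resume` route:

```tsx filename="examples/next-openai/app/use-chat-resume/chat.tsx"
'use client';

import { useChat } from '@ai-sdk/react';
import { Message } from 'ai';
import { useEffect } from 'react';

export function Chat({
  chatId,
  autoResume,
  initialMessages,
}: {
  chatId: string;
  autoResume: boolean;
  initialMessages: Message[];
}) {
  const {
    experimental_resume,
    messages,
    input,
    handleInputChange,
    handleSubmit,
  } = useChat({
    id: chatId,
    initialMessages,
    api: '/api/use-chat-resume',
  });

  useEffect(() => {
    if (autoResume) {
      // resume any stream that is still running for this chat
      experimental_resume();
    }
    // run once on mount
  }, []);

  return (
    <div>
      {messages.map(message => (
        <div key={message.id}>
          {message.role}: {message.content}
        </div>
      ))}
      <form onSubmit={handleSubmit}>
        <input value={input} onChange={handleInputChange} />
      </form>
    </div>
  );
}
```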