feat (ui/react): support resuming an ongoing stream #6052
New changeset (`@@ -0,0 +1,6 @@`):

```
---
'@ai-sdk/ui-utils': patch
'@ai-sdk/react': patch
---

feat (ui/react): support resuming an ongoing stream
```
Docs (`@@ -325,3 +325,165 @@ When the client reloads the page after a disconnect, the chat will be restored f`), unchanged context:

```
the case where the client reloads the page after a disconnection, but the
streaming is not yet complete.
</Note>
```

The following section is added:
## Resuming ongoing streams

<Note>This feature is experimental and may change in future versions.</Note>

The `useChat` hook has experimental support for resuming an ongoing chat generation stream from any client, either after a network disconnect or after reloading the chat page. This is useful for building applications with long-running conversations and for ensuring that messages are not lost in case of network failures.

The following are the prerequisites for your chat application to support resumable streams:

- Install the [`resumable-stream`](https://www.npmjs.com/package/resumable-stream) package, which creates and manages the publisher/subscriber mechanism of the streams.
- Create a [Redis](https://vercel.com/marketplace/redis) instance to store the stream state.
- Create a table that tracks the stream IDs associated with a chat.
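The last prerequisite, tracking stream IDs per chat, is what the `appendStreamId` and `loadStreams` helpers used later in this guide read and write. A minimal in-memory sketch (hypothetical; a real app would back this with Redis or a database):

```typescript
// Hypothetical in-memory stand-in for the chat-store helpers used by the
// route handlers in this guide. Maps each chatId to the stream IDs created
// for it, in creation order.
const streamsByChat = new Map<string, string[]>();

// Record a new stream ID for a chat so it can be resumed later.
export async function appendStreamId({
  chatId,
  streamId,
}: {
  chatId: string;
  streamId: string;
}): Promise<void> {
  const ids = streamsByChat.get(chatId) ?? [];
  ids.push(streamId);
  streamsByChat.set(chatId, ids);
}

// Load all stream IDs recorded for a chat (oldest first).
export async function loadStreams(chatId: string): Promise<string[]> {
  return streamsByChat.get(chatId) ?? [];
}
```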
To resume a chat stream, use the `experimental_resume` function returned by the `useChat` hook. Call this function during the initial mount of the hook inside the main chat component.

```tsx filename="app/components/chat.tsx"
'use client';

import { useChat } from '@ai-sdk/react';
import { useEffect } from 'react';
import { Input } from '@/components/input';
import { Messages } from '@/components/messages';

export function Chat({ id }: { id: string }) {
  const { experimental_resume } = useChat({ id });

  useEffect(() => {
    experimental_resume();

    // we use an empty dependency array to
    // ensure this effect runs only once
  }, []);

  return (
    <div>
      <Messages />
      <Input />
    </div>
  );
}
```
The `experimental_resume` function makes a `GET` request to your configured chat endpoint (or `/api/chat` by default) whenever your client calls it. If there is an active stream, it picks up where it left off; otherwise it simply finishes without error.

The `GET` request automatically appends the `chatId` query parameter to the URL to identify the chat the request belongs to. Using the `chatId`, you can look up the most recent stream ID from the database and resume the stream.

```bash
GET /api/chat?chatId=<your-chat-id>
```
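The shape of that request can also be constructed by hand. This illustrative helper (not part of the SDK) shows the URL that a resume request targets, assuming a local dev origin:

```typescript
// Illustrative helper (not an SDK API): build the URL for a resume request,
// i.e. the configured chat endpoint with the chatId query parameter appended.
function buildResumeUrl(
  api: string,
  chatId: string,
  origin = 'http://localhost:3000', // assumed origin for illustration
): string {
  const url = new URL(api, origin);
  url.searchParams.set('chatId', chatId);
  return url.toString();
}
```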
You should already have implemented the `POST` handler for the `/api/chat` route to create new chat generations. When using `experimental_resume`, you must also implement a `GET` handler for the `/api/chat` route to resume streams.

### 1. Implement the GET handler

Add a `GET` method to `/api/chat` that:

1. Reads `chatId` from the query string
2. Validates that it is present
3. Loads any stored stream IDs for that chat
4. Passes the latest one to `streamContext.resumableStream()`
5. Falls back to an empty stream if the stream is already closed
```ts filename="app/api/chat/route.ts"
import { loadStreams } from '@/util/chat-store';
import { createDataStream } from 'ai';
import { after } from 'next/server';
import { createResumableStreamContext } from 'resumable-stream';

const streamContext = createResumableStreamContext({
  waitUntil: after,
});

export async function GET(request: Request) {
  const { searchParams } = new URL(request.url);
  const chatId = searchParams.get('chatId');

  if (!chatId) {
    return new Response('id is required', { status: 400 });
  }

  const streamIds = await loadStreams(chatId);

  if (!streamIds.length) {
    return new Response('No streams found', { status: 404 });
  }

  const recentStreamId = streamIds.at(-1);

  if (!recentStreamId) {
    return new Response('No recent stream found', { status: 404 });
  }

  const emptyDataStream = createDataStream({
    execute: () => {},
  });

  return new Response(
    await streamContext.resumableStream(recentStreamId, () => emptyDataStream),
  );
}
```

> **Review (Collaborator):** The highlighting is broken; wonder if this will lead to issues with the docs rendering. Did you check it in the docs deployment?
>
> **Reply (Contributor, Author):** Hmm, not sure why GitHub isn't highlighting the code snippet; fwiw I did try building this in the docs deployment and it works!
After you've implemented the `GET` handler, update the `POST` handler to handle the creation of resumable streams.

### 2. Update the POST handler

When you create a brand-new chat completion, you must:

1. Generate a fresh `streamId`
2. Persist it alongside your `chatId`
3. Kick off a `createDataStream` that pipes tokens as they arrive
4. Hand the new stream to `streamContext.resumableStream()`
```ts filename="app/api/chat/route.ts"
import {
  appendResponseMessages,
  createDataStream,
  generateId,
  streamText,
} from 'ai';
import { appendStreamId, saveChat } from '@/util/chat-store';
import { openai } from '@ai-sdk/openai';
import { after } from 'next/server';
import { createResumableStreamContext } from 'resumable-stream';

const streamContext = createResumableStreamContext({
  waitUntil: after,
});

export async function POST(req: Request) {
  const { id, messages } = await req.json();
  const streamId = generateId();

  // Record this new stream so we can resume later
  await appendStreamId({ chatId: id, streamId });

  // Build the data stream that will emit tokens
  const stream = createDataStream({
    execute: dataStream => {
      const result = streamText({
        model: openai('gpt-4o'),
        messages,
        onFinish: async ({ response }) => {
          await saveChat({
            id,
            messages: appendResponseMessages({
              messages,
              responseMessages: response.messages,
            }),
          });
        },
      });

      result.mergeIntoDataStream(dataStream);
    },
  });

  // Return a resumable stream to the client
  return new Response(
    await streamContext.resumableStream(streamId, () => stream),
  );
}
```

With both handlers, your clients can now gracefully resume ongoing streams.
`.gitignore` (`@@ -34,5 +34,6 @@ yarn-error.log*`):

```diff
 *.tsbuildinfo
 next-env.d.ts

-# chat persistence
+# persistence
 .chats
+.streams
```
New file (`@@ -0,0 +1,99 @@`), the example app's full chat route handler:

```ts
import {
  appendMessageToChat,
  appendStreamId,
  loadStreams,
  saveChat,
} from '@/util/chat-store';
import { openai } from '@ai-sdk/openai';
import {
  appendResponseMessages,
  createDataStream,
  generateId,
  Message,
  streamText,
} from 'ai';
import { after } from 'next/server';
import { createResumableStreamContext } from 'resumable-stream';

// Allow streaming responses up to 30 seconds
export const maxDuration = 30;

export async function POST(req: Request) {
  const streamContext = createResumableStreamContext({
    waitUntil: after,
  });

  const { id, messages }: { id: string; messages: Message[] } =
    await req.json();

  const streamId = generateId();

  const recentUserMessage = messages
    .filter(message => message.role === 'user')
    .at(-1);

  if (!recentUserMessage) {
    throw new Error('No recent user message found');
  }

  await appendMessageToChat({ chatId: id, message: recentUserMessage });

  await appendStreamId({ chatId: id, streamId });

  const stream = createDataStream({
    execute: dataStream => {
      const result = streamText({
        model: openai('gpt-4o'),
        messages,
        onFinish: async ({ response }) => {
          await saveChat({
            id,
            messages: appendResponseMessages({
              messages,
              responseMessages: response.messages,
            }),
          });
        },
      });

      result.mergeIntoDataStream(dataStream);
    },
  });

  return new Response(
    await streamContext.resumableStream(streamId, () => stream),
  );
}

export async function GET(request: Request) {
  const streamContext = createResumableStreamContext({
    waitUntil: after,
  });

  const { searchParams } = new URL(request.url);
  const chatId = searchParams.get('chatId');

  if (!chatId) {
    return new Response('id is required', { status: 400 });
  }

  const streamIds = await loadStreams(chatId);

  if (!streamIds.length) {
    return new Response('No streams found', { status: 404 });
  }

  const recentStreamId = streamIds.at(-1);

  if (!recentStreamId) {
    return new Response('No recent stream found', { status: 404 });
  }

  const emptyDataStream = createDataStream({
    execute: () => {},
  });

  return new Response(
    await streamContext.resumableStream(recentStreamId, () => emptyDataStream),
  );
}
```
New file (`@@ -0,0 +1,14 @@`), the chat page:

```tsx
import { loadChat } from '@/util/chat-store';
import { Chat } from '../chat';

export default async function Page({
  params,
}: {
  params: Promise<{ id: string }>;
}) {
  const { id } = await params;

  const messages = await loadChat(id);

  return <Chat chatId={id} autoResume={true} initialMessages={messages} />;
}
```
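The page imports `loadChat` (and the route handlers use `saveChat`) from the example's `@/util/chat-store`, which is not included in this diff. A hypothetical file-based sketch of those two helpers, persisting to the `.chats` directory that the `.gitignore` change above ignores:

```typescript
import { mkdir, readFile, writeFile } from 'node:fs/promises';
import path from 'node:path';

// Hypothetical message shape; the example app uses the `Message` type from 'ai'.
interface StoredMessage {
  id: string;
  role: 'user' | 'assistant' | 'system';
  content: string;
}

const CHAT_DIR = '.chats'; // matches the directory ignored in .gitignore

function chatFile(id: string): string {
  return path.join(CHAT_DIR, `${id}.json`);
}

// Persist the full message list for a chat as a JSON file.
export async function saveChat({
  id,
  messages,
}: {
  id: string;
  messages: StoredMessage[];
}): Promise<void> {
  await mkdir(CHAT_DIR, { recursive: true });
  await writeFile(chatFile(id), JSON.stringify(messages, null, 2));
}

// Load the persisted messages for a chat, or an empty list if none exist yet.
export async function loadChat(id: string): Promise<StoredMessage[]> {
  try {
    return JSON.parse(await readFile(chatFile(id), 'utf8'));
  } catch {
    return [];
  }
}
```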
cc @nicoalbanese for docs review