diff --git a/.changeset/two-roses-think.md b/.changeset/two-roses-think.md new file mode 100644 index 000000000000..550b2b282e8b --- /dev/null +++ b/.changeset/two-roses-think.md @@ -0,0 +1,5 @@ +--- +'ai': major +--- + +chore (ai): remove StreamData and mergeStreams diff --git a/packages/ai/core/generate-text/__snapshots__/stream-text.test.ts.snap b/packages/ai/core/generate-text/__snapshots__/stream-text.test.ts.snap index f72754ace263..d8f79a8622bb 100644 --- a/packages/ai/core/generate-text/__snapshots__/stream-text.test.ts.snap +++ b/packages/ai/core/generate-text/__snapshots__/stream-text.test.ts.snap @@ -4123,25 +4123,6 @@ exports[`streamText > result.pipeDataStreamToResponse > should support custom er ] `; -exports[`streamText > result.pipeDataStreamToResponse > should support merging with existing stream data 1`] = ` -[ - "2:["stream-data-value"] -", - "f:{"messageId":"msg-0"} -", - "0:"Hello" -", - "0:", " -", - "0:"world!" -", - "e:{"finishReason":"stop","usage":{"inputTokens":3,"outputTokens":10,"totalTokens":13},"isContinued":false} -", - "d:{"finishReason":"stop","usage":{"inputTokens":3,"outputTokens":10,"totalTokens":13}} -", -] -`; - exports[`streamText > result.pipeDataStreamToResponse > should suppress usage information when sendUsage is false 1`] = ` [ "f:{"messageId":"msg-0"} @@ -4793,25 +4774,6 @@ exports[`streamText > result.toDataStream > should support custom error messages ] `; -exports[`streamText > result.toDataStream > should support merging with existing stream data 1`] = ` -[ - "2:["stream-data-value"] -", - "f:{"messageId":"msg-0"} -", - "0:"Hello" -", - "0:", " -", - "0:"world!" -", - "e:{"finishReason":"stop","usage":{"inputTokens":3,"outputTokens":10,"totalTokens":13},"isContinued":false} -", - "d:{"finishReason":"stop","usage":{"inputTokens":3,"outputTokens":10,"totalTokens":13}} -", -] -`; - exports[`streamText > result.toDataStream > should suppress usage information when sendUsage is false 1`] = ` [ "f:{"messageId":"msg-0"} @@ -4885,25 +4847,6 @@ exports[`streamText > result.toDataStreamResponse > should support custom error ] `; -exports[`streamText > result.toDataStreamResponse > should support merging with existing stream data 1`] = ` -[ - "2:["stream-data-value"] -", - "f:{"messageId":"msg-0"} -", - "0:"Hello" -", - "0:", " -", - "0:"world!" -", - "e:{"finishReason":"stop","usage":{"inputTokens":3,"outputTokens":10,"totalTokens":13},"isContinued":false} -", - "d:{"finishReason":"stop","usage":{"inputTokens":3,"outputTokens":10,"totalTokens":13}} -", -] -`; - exports[`streamText > result.toDataStreamResponse > should suppress usage information when sendUsage is false 1`] = ` [ "f:{"messageId":"msg-0"} diff --git a/packages/ai/core/generate-text/stream-text-result.ts b/packages/ai/core/generate-text/stream-text-result.ts index 78f48096efa9..a41fa42d2f7e 100644 --- a/packages/ai/core/generate-text/stream-text-result.ts +++ b/packages/ai/core/generate-text/stream-text-result.ts @@ -1,6 +1,6 @@ import { ServerResponse } from 'node:http'; -import { StreamData } from '../../streams/stream-data'; import { DataStreamWriter } from '../data-stream/data-stream-writer'; +import { ReasoningPart } from '../prompt/content-part'; import { CallWarning, FinishReason, @@ -11,12 +11,12 @@ import { Source } from '../types/language-model'; import { LanguageModelResponseMetadata } from '../types/language-model-response-metadata'; import { LanguageModelUsage } from '../types/usage'; import { AsyncIterableStream } from '../util/async-iterable-stream'; +import { DataStreamText } from '../util/data-stream-parts'; import { GeneratedFile } from './generated-file'; import { ResponseMessage, StepResult } from './step-result'; import { ToolCallUnion } from './tool-call'; import { ToolResultUnion } from './tool-result'; import { ToolSet } from './tool-set'; -import { ReasoningPart } from '../prompt/content-part'; export type DataStreamOptions = { /** @@ -215,10 +215,9 @@ If an error occurs, it is passed to the optional `onError` callback. */ toDataStream( options?: { - data?: StreamData; getErrorMessage?: (error: unknown) => string; } & DataStreamOptions, - ): ReadableStream; + ): ReadableStream; /** * Merges the result as a data stream into another data stream. @@ -247,7 +246,6 @@ If an error occurs, it is passed to the optional `onError` callback. pipeDataStreamToResponse( response: ServerResponse, options?: ResponseInit & { - data?: StreamData; getErrorMessage?: (error: unknown) => string; } & DataStreamOptions, ): void; @@ -278,7 +276,6 @@ If an error occurs, it is passed to the optional `onError` callback. */ toDataStreamResponse( options?: ResponseInit & { - data?: StreamData; getErrorMessage?: (error: unknown) => string; } & DataStreamOptions, ): Response; diff --git a/packages/ai/core/generate-text/stream-text.test.ts b/packages/ai/core/generate-text/stream-text.test.ts index c0dfeb459bd9..12e6507db16a 100644 --- a/packages/ai/core/generate-text/stream-text.test.ts +++ b/packages/ai/core/generate-text/stream-text.test.ts @@ -15,7 +15,6 @@ import { } from '@ai-sdk/provider-utils/test'; import assert from 'node:assert'; import { z } from 'zod'; -import { StreamData } from '../../streams/stream-data'; import { createDataStream } from '../data-stream/create-data-stream'; import { MockLanguageModelV2 } from '../test/mock-language-model-v2'; import { createMockServerResponse } from '../test/mock-server-response'; @@ -873,34 +872,6 @@ describe('streamText', () => { expect(mockResponse.getDecodedChunks()).toMatchSnapshot(); }); - it('should support merging with existing stream data', async () => { - const mockResponse = createMockServerResponse(); - - const result = streamText({ - model: createTestModel(), - prompt: 'test-input', - experimental_generateMessageId: mockId({ prefix: 'msg' }), - }); - - const streamData = new StreamData(); - streamData.append('stream-data-value'); - streamData.close(); - - result.pipeDataStreamToResponse(mockResponse, { - data: streamData, - }); - - await mockResponse.waitForEnd(); - - expect(mockResponse.statusCode).toBe(200); - expect(mockResponse.headers).toEqual({ - 'Content-Type': 'text/plain; charset=utf-8', - 'X-Vercel-AI-Data-Stream': 'v1', - }); - - expect(mockResponse.getDecodedChunks()).toMatchSnapshot(); - }); - it('should mask error messages by default', async () => { const mockResponse = createMockServerResponse(); @@ -1099,30 +1070,7 @@ describe('streamText', () => { const dataStream = result.toDataStream(); - expect( - await convertReadableStreamToArray( - dataStream.pipeThrough(new TextDecoderStream()), - ), - ).toMatchSnapshot(); - }); - - it('should support merging with existing stream data', async () => { - const result = streamText({ - model: createTestModel(), - ...defaultSettings(), - }); - - const streamData = new StreamData(); - streamData.append('stream-data-value'); - streamData.close(); - - const dataStream = result.toDataStream({ data: streamData }); - - expect( - await convertReadableStreamToArray( - dataStream.pipeThrough(new TextDecoderStream()), - ), - ).toMatchSnapshot(); + expect(await convertReadableStreamToArray(dataStream)).toMatchSnapshot(); }); it('should send tool call and tool result stream parts', async () => { @@ -1167,9 +1115,7 @@ describe('streamText', () => { }); expect( - await convertReadableStreamToArray( - result.toDataStream().pipeThrough(new TextDecoderStream()), - ), + await convertReadableStreamToArray(result.toDataStream()), ).toMatchSnapshot(); }); @@ -1216,9 +1162,7 @@ describe('streamText', () => { }); expect( - await convertReadableStreamToArray( - result.toDataStream().pipeThrough(new TextDecoderStream()), - ), + await convertReadableStreamToArray(result.toDataStream()), ).toMatchSnapshot(); }); @@ -1234,11 +1178,7 @@ describe('streamText', () => { const dataStream = result.toDataStream(); - expect( - await convertReadableStreamToArray( - dataStream.pipeThrough(new TextDecoderStream()), - ), - ).toMatchSnapshot(); + expect(await convertReadableStreamToArray(dataStream)).toMatchSnapshot(); }); it('should support custom error messages', async () => { @@ -1255,11 +1195,7 @@ describe('streamText', () => { getErrorMessage: error => `custom error message: ${error}`, }); - expect( - await convertReadableStreamToArray( - dataStream.pipeThrough(new TextDecoderStream()), - ), - ).toMatchSnapshot(); + expect(await convertReadableStreamToArray(dataStream)).toMatchSnapshot(); }); it('should suppress usage information when sendUsage is false', async () => { @@ -1279,11 +1215,7 @@ describe('streamText', () => { const dataStream = result.toDataStream({ sendUsage: false }); - expect( - await convertReadableStreamToArray( - dataStream.pipeThrough(new TextDecoderStream()), - ), - ).toMatchSnapshot(); + expect(await convertReadableStreamToArray(dataStream)).toMatchSnapshot(); }); it('should omit message finish event (d:) when sendFinish is false', async () => { @@ -1305,11 +1237,7 @@ describe('streamText', () => { experimental_sendFinish: false, }); - expect( - await convertReadableStreamToArray( - dataStream.pipeThrough(new TextDecoderStream()), - ), - ).toMatchSnapshot(); + expect(await convertReadableStreamToArray(dataStream)).toMatchSnapshot(); }); it('should send reasoning content when sendReasoning is true', async () => { @@ -1320,11 +1248,7 @@ describe('streamText', () => { const dataStream = result.toDataStream({ sendReasoning: true }); - expect( - await convertReadableStreamToArray( - dataStream.pipeThrough(new TextDecoderStream()), - ), - ).toMatchSnapshot(); + expect(await convertReadableStreamToArray(dataStream)).toMatchSnapshot(); }); it('should send source content when sendSources is true', async () => { @@ -1335,11 +1259,7 @@ describe('streamText', () => { const dataStream = result.toDataStream({ sendSources: true }); - expect( - await convertReadableStreamToArray( - dataStream.pipeThrough(new TextDecoderStream()), - ), - ).toMatchSnapshot(); + expect(await convertReadableStreamToArray(dataStream)).toMatchSnapshot(); }); it('should send file content', async () => { @@ -1350,11 +1270,7 @@ describe('streamText', () => { const dataStream = result.toDataStream(); - expect( - await convertReadableStreamToArray( - dataStream.pipeThrough(new TextDecoderStream()), - ), - ).toMatchSnapshot(); + expect(await convertReadableStreamToArray(dataStream)).toMatchSnapshot(); }); }); @@ -1404,26 +1320,6 @@ describe('streamText', () => { expect(await convertResponseStreamToArray(response)).toMatchSnapshot(); }); - it('should support merging with existing stream data', async () => { - const result = streamText({ - model: createTestModel(), - prompt: 'test-input', - experimental_generateMessageId: mockId({ prefix: 'msg' }), - }); - - const streamData = new StreamData(); - streamData.append('stream-data-value'); - streamData.close(); - - const response = result.toDataStreamResponse({ data: streamData }); - - expect(response.status).toStrictEqual(200); - expect(response.headers.get('Content-Type')).toStrictEqual( - 'text/plain; charset=utf-8', - ); - expect(await convertResponseStreamToArray(response)).toMatchSnapshot(); - }); - it('should mask error messages by default', async () => { const result = streamText({ model: createTestModel({ @@ -1654,9 +1550,7 @@ describe('streamText', () => { expect({ textStream: await convertAsyncIterableToArray(result.textStream), fullStream: await convertAsyncIterableToArray(result.fullStream), - dataStream: await convertReadableStreamToArray( - result.toDataStream().pipeThrough(new TextDecoderStream()), - ), + dataStream: await convertReadableStreamToArray(result.toDataStream()), }).toMatchSnapshot(); }); }); @@ -2905,9 +2799,7 @@ describe('streamText', () => { const dataStream = result.toDataStream(); expect( - await convertReadableStreamToArray( - dataStream.pipeThrough(new TextDecoderStream()), - ), + await convertReadableStreamToArray(dataStream), ).toMatchSnapshot(); }); diff --git a/packages/ai/core/generate-text/stream-text.ts b/packages/ai/core/generate-text/stream-text.ts index 7be33137e342..9ffc6440b054 100644 --- a/packages/ai/core/generate-text/stream-text.ts +++ b/packages/ai/core/generate-text/stream-text.ts @@ -7,7 +7,6 @@ import { Span } from '@opentelemetry/api'; import { ServerResponse } from 'node:http'; import { InvalidArgumentError } from '../../errors/invalid-argument-error'; import { NoOutputSpecifiedError } from '../../errors/no-output-specified-error'; -import { StreamData } from '../../streams/stream-data'; import { asArray } from '../../util/as-array'; import { consumeStream } from '../../util/consume-stream'; import { DelayedPromise } from '../../util/delayed-promise'; @@ -15,7 +14,7 @@ import { DataStreamWriter } from '../data-stream/data-stream-writer'; import { CallSettings } from '../prompt/call-settings'; import { ReasoningPart } from '../prompt/content-part'; import { convertToLanguageModelPrompt } from '../prompt/convert-to-language-model-prompt'; -import { CoreAssistantMessage } from '../prompt/message'; +import { AssistantModelMessage } from '../prompt/message'; import { prepareCallSettings } from '../prompt/prepare-call-settings'; import { prepareRetries } from '../prompt/prepare-retries'; import { prepareToolsAndToolChoice } from '../prompt/prepare-tools-and-tool-choice'; @@ -41,7 +40,6 @@ import { createAsyncIterableStream, } from '../util/async-iterable-stream'; import { createStitchableStream } from '../util/create-stitchable-stream'; -import { mergeStreams } from '../util/merge-streams'; import { now as originalNow } from '../util/now'; import { prepareOutgoingHttpHeaders } from '../util/prepare-outgoing-http-headers'; import { prepareResponseHeaders } from '../util/prepare-response-headers'; @@ -1369,7 +1367,7 @@ class DefaultStreamTextResult // so we can assume that there is a single last assistant message: const lastMessage = responseMessages[ responseMessages.length - 1 - ] as CoreAssistantMessage; + ] as AssistantModelMessage; if (typeof lastMessage.content === 'string') { lastMessage.content += stepText; @@ -1743,7 +1741,6 @@ However, the LLM results are expected to be small enough to not cause issues. status, statusText, headers, - data, getErrorMessage, sendUsage, sendReasoning, @@ -1751,7 +1748,6 @@ However, the LLM results are expected to be small enough to not cause issues. experimental_sendFinish, }: ResponseInit & DataStreamOptions & { - data?: StreamData; getErrorMessage?: (error: unknown) => string; } = {}, ) { @@ -1764,13 +1760,12 @@ However, the LLM results are expected to be small enough to not cause issues. dataStreamVersion: 'v1', }), stream: this.toDataStream({ - data, getErrorMessage, sendUsage, sendReasoning, sendSources, experimental_sendFinish, - }), + }).pipeThrough(new TextEncoderStream()), }); } @@ -1786,22 +1781,18 @@ However, the LLM results are expected to be small enough to not cause issues. }); } - // TODO breaking change 5.0: remove pipeThrough(new TextEncoderStream()) toDataStream( options?: DataStreamOptions & { - data?: StreamData; getErrorMessage?: (error: unknown) => string; }, ) { - const stream = this.toDataStreamInternal({ + return this.toDataStreamInternal({ getErrorMessage: options?.getErrorMessage, sendUsage: options?.sendUsage, sendReasoning: options?.sendReasoning, sendSources: options?.sendSources, experimental_sendFinish: options?.experimental_sendFinish, - }).pipeThrough(new TextEncoderStream()); - - return options?.data ? mergeStreams(options?.data.stream, stream) : stream; + }); } mergeIntoDataStream(writer: DataStreamWriter, options?: DataStreamOptions) { @@ -1820,7 +1811,6 @@ However, the LLM results are expected to be small enough to not cause issues. headers, status, statusText, - data, getErrorMessage, sendUsage, sendReasoning, @@ -1828,18 +1818,16 @@ However, the LLM results are expected to be small enough to not cause issues. experimental_sendFinish, }: ResponseInit & DataStreamOptions & { - data?: StreamData; getErrorMessage?: (error: unknown) => string; } = {}): Response { return new Response( this.toDataStream({ - data, getErrorMessage, sendUsage, sendReasoning, sendSources, experimental_sendFinish, - }), + }).pipeThrough(new TextEncoderStream()), { status, statusText, diff --git a/packages/ai/core/util/data-stream-parts.ts b/packages/ai/core/util/data-stream-parts.ts index 7f7e69b128b5..afa01e12efb4 100644 --- a/packages/ai/core/util/data-stream-parts.ts +++ b/packages/ai/core/util/data-stream-parts.ts @@ -6,7 +6,7 @@ import { import { ToolCall, ToolResult } from '@ai-sdk/provider-utils'; import { JSONValue } from '../types'; -export type DataStreamString = +export type DataStreamText = `${(typeof DataStreamStringPrefixes)[keyof typeof DataStreamStringPrefixes]}:${string}\n`; export interface DataStreamPart< @@ -559,7 +559,7 @@ It ensures type-safety for the part type and value. export function formatDataStreamPart( type: T, value: DataStreamPartValueType[T], -): DataStreamString { +): DataStreamText { const streamPart = dataStreamParts.find(part => part.name === type); if (!streamPart) { diff --git a/packages/ai/core/util/index.ts b/packages/ai/core/util/index.ts index a2b298e3af57..cdf06bf6a7b8 100644 --- a/packages/ai/core/util/index.ts +++ b/packages/ai/core/util/index.ts @@ -12,7 +12,10 @@ export { callChatApi } from './call-chat-api'; export { callCompletionApi } from './call-completion-api'; export { convertFileListToFileUIParts } from './convert-file-list-to-file-ui-parts'; export { formatDataStreamPart, parseDataStreamPart } from './data-stream-parts'; -export type { DataStreamPart, DataStreamString } from './data-stream-parts'; +export type { + DataStreamPart, + DataStreamText as DataStreamString, +} from './data-stream-parts'; export { getTextFromDataUrl } from './data-url'; export type { DeepPartial } from './deep-partial'; export { extractMaxToolInvocationStep } from './extract-max-tool-invocation-step'; diff --git a/packages/ai/core/util/merge-streams.test.ts b/packages/ai/core/util/merge-streams.test.ts deleted file mode 100644 index 67f595b4a211..000000000000 --- a/packages/ai/core/util/merge-streams.test.ts +++ /dev/null @@ -1,132 +0,0 @@ -import { - convertArrayToReadableStream, - convertReadableStreamToArray, - isNodeVersion, -} from '@ai-sdk/provider-utils/test'; -import { expect, it } from 'vitest'; -import { mergeStreams } from './merge-streams'; - -it('should prioritize the first stream over the second stream', async () => { - const stream1 = convertArrayToReadableStream(['1a', '1b', '1c']); - const stream2 = convertArrayToReadableStream(['2a', '2b', '2c']); - - const mergedStream = mergeStreams(stream1, stream2); - - expect(await convertReadableStreamToArray(mergedStream)).toEqual([ - '1a', - '1b', - '1c', - '2a', - '2b', - '2c', - ]); -}); - -it.skipIf(isNodeVersion(20))( - 'should return values from the 2nd stream until the 1st stream has values', - async () => { - let stream1Controller: ReadableStreamDefaultController | undefined; - const stream1 = new ReadableStream({ - start(controller) { - stream1Controller = controller; - }, - }); - - let stream2Controller: ReadableStreamDefaultController | undefined; - const stream2 = new ReadableStream({ - start(controller) { - stream2Controller = controller; - }, - }); - - const mergedStream = mergeStreams(stream1, stream2); - - const result: string[] = []; - const reader = mergedStream.getReader(); - - async function pull() { - const { value, done } = await reader.read(); - result.push(value!); - } - - stream2Controller!.enqueue('2a'); - stream2Controller!.enqueue('2b'); - - await pull(); - await pull(); - - stream2Controller!.enqueue('2c'); // comes later - stream2Controller!.enqueue('2d'); // comes later - stream1Controller!.enqueue('1a'); - stream2Controller!.enqueue('2e'); // comes later - stream1Controller!.enqueue('1b'); - stream1Controller!.enqueue('1c'); - stream2Controller!.enqueue('2f'); - - await pull(); - await pull(); - await pull(); - await pull(); - await pull(); - - stream1Controller!.close(); - stream2Controller!.close(); - - await pull(); - await pull(); - - expect(result).toMatchInlineSnapshot(` - [ - "2a", - "2b", - "1a", - "1b", - "1c", - "2c", - "2d", - "2e", - "2f", - ] - `); - }, -); - -it('should not duplicate last value when parallel calls happen', async () => { - let stream1Controller: ReadableStreamDefaultController | undefined; - const stream1 = new ReadableStream({ - start(controller) { - stream1Controller = controller; - }, - }); - - stream1Controller!.enqueue('1a'); - stream1Controller!.close(); - - let stream2Controller: ReadableStreamDefaultController | undefined; - const stream2 = new ReadableStream({ - start(controller) { - stream2Controller = controller; - }, - }); - - const mergedStream = mergeStreams(stream1, stream2); - - const reader = mergedStream.getReader(); - - const resultsPromise = Promise.all([ - reader.read(), - reader.read(), - reader.read(), - reader.read(), - reader.read(), - ]); - - stream2Controller!.enqueue('2a'); - stream2Controller!.enqueue('2b'); - stream2Controller!.enqueue('2c'); - stream2Controller!.close(); - - const values = (await resultsPromise).map(result => result.value); - - expect(values).toEqual(['1a', '2a', '2b', '2c', undefined]); -}); diff --git a/packages/ai/core/util/merge-streams.ts b/packages/ai/core/util/merge-streams.ts deleted file mode 100644 index 9e1dd7d31e86..000000000000 --- a/packages/ai/core/util/merge-streams.ts +++ /dev/null @@ -1,132 +0,0 @@ -/** - * Merges two readable streams into a single readable stream, emitting values - * from each stream as they become available. - * - * The first stream is prioritized over the second stream. If both streams have - * values available, the first stream's value is emitted first. - * - * @template VALUE1 - The type of values emitted by the first stream. - * @template VALUE2 - The type of values emitted by the second stream. - * @param {ReadableStream} stream1 - The first readable stream. - * @param {ReadableStream} stream2 - The second readable stream. - * @returns {ReadableStream} A new readable stream that emits values from both input streams. - */ -export function mergeStreams( - stream1: ReadableStream, - stream2: ReadableStream, -): ReadableStream { - const reader1 = stream1.getReader(); - const reader2 = stream2.getReader(); - - let lastRead1: Promise> | undefined = - undefined; - let lastRead2: Promise> | undefined = - undefined; - - let stream1Done = false; - let stream2Done = false; - - // only use when stream 2 is done: - async function readStream1( - controller: ReadableStreamDefaultController, - ) { - try { - if (lastRead1 == null) { - lastRead1 = reader1.read(); - } - - const result = await lastRead1; - lastRead1 = undefined; - - if (!result.done) { - controller.enqueue(result.value); - } else { - controller.close(); - } - } catch (error) { - controller.error(error); - } - } - - // only use when stream 1 is done: - async function readStream2( - controller: ReadableStreamDefaultController, - ) { - try { - if (lastRead2 == null) { - lastRead2 = reader2.read(); - } - - const result = await lastRead2; - lastRead2 = undefined; - - if (!result.done) { - controller.enqueue(result.value); - } else { - controller.close(); - } - } catch (error) { - controller.error(error); - } - } - - return new ReadableStream({ - async pull(controller) { - try { - // stream 1 is done, we can only read from stream 2: - if (stream1Done) { - await readStream2(controller); - return; - } - - // stream 2 is done, we can only read from stream 1: - if (stream2Done) { - await readStream1(controller); - return; - } - - // pull the next value from the stream that was read last: - if (lastRead1 == null) { - lastRead1 = reader1.read(); - } - if (lastRead2 == null) { - lastRead2 = reader2.read(); - } - - // Note on Promise.race (prioritizing stream 1 over stream 2): - // If the iterable contains one or more non-promise values and/or an already settled promise, - // then Promise.race() will settle to the first of these values found in the iterable. - const { result, reader } = await Promise.race([ - lastRead1.then(result => ({ result, reader: reader1 })), - lastRead2.then(result => ({ result, reader: reader2 })), - ]); - - if (!result.done) { - controller.enqueue(result.value); - } - - if (reader === reader1) { - lastRead1 = undefined; - if (result.done) { - // stream 1 is done, we can only read from stream 2: - await readStream2(controller); - stream1Done = true; - } - } else { - lastRead2 = undefined; - // stream 2 is done, we can only read from stream 1: - if (result.done) { - stream2Done = true; - await readStream1(controller); - } - } - } catch (error) { - controller.error(error); - } - }, - cancel() { - reader1.cancel(); - reader2.cancel(); - }, - }); -} diff --git a/packages/ai/core/util/process-chat-response.test.ts b/packages/ai/core/util/process-chat-response.test.ts index 5dd27f59a5c5..798ec0b70fec 100644 --- a/packages/ai/core/util/process-chat-response.test.ts +++ b/packages/ai/core/util/process-chat-response.test.ts @@ -1,12 +1,12 @@ import { LanguageModelV2FinishReason } from '@ai-sdk/provider'; import { convertArrayToReadableStream } from '@ai-sdk/provider-utils/test'; import { describe, expect, it, vi } from 'vitest'; -import { DataStreamString, formatDataStreamPart } from './data-stream-parts'; +import { DataStreamText, formatDataStreamPart } from './data-stream-parts'; import { processChatResponse } from './process-chat-response'; import { JSONValue, LanguageModelUsage, UIMessage } from '../types'; function createDataProtocolStream( - dataPartTexts: DataStreamString[], + dataPartTexts: DataStreamText[], ): ReadableStream { return convertArrayToReadableStream(dataPartTexts).pipeThrough( new TextEncoderStream(), diff --git a/packages/ai/internal/index.ts b/packages/ai/internal/index.ts index 0de69e318975..1c2461b08088 100644 --- a/packages/ai/internal/index.ts +++ b/packages/ai/internal/index.ts @@ -5,12 +5,10 @@ export { prepareCallSettings } from '../core/prompt/prepare-call-settings'; export { convertToLanguageModelPrompt } from '../core/prompt/convert-to-language-model-prompt'; export { formatDataStreamPart } from '../core'; export { type DataStreamWriter } from '../core/data-stream/data-stream-writer'; -export { mergeStreams } from '../core/util/merge-streams'; export { prepareResponseHeaders } from '../core/util/prepare-response-headers'; export { createCallbacksTransformer, type StreamCallbacks, } from '../streams/stream-callbacks'; -export { StreamData } from '../streams/stream-data'; export * from '../util/constants'; diff --git a/packages/ai/streams/index.ts b/packages/ai/streams/index.ts index 678883e3a971..a4a976957e0d 100644 --- a/packages/ai/streams/index.ts +++ b/packages/ai/streams/index.ts @@ -1,3 +1,2 @@ export * from '../core/index'; export * from '../errors/index'; -export * from './stream-data'; diff --git a/packages/ai/streams/stream-data.ts b/packages/ai/streams/stream-data.ts deleted file mode 100644 index 4320c59f4205..000000000000 --- a/packages/ai/streams/stream-data.ts +++ /dev/null @@ -1,88 +0,0 @@ -import { JSONValue, formatDataStreamPart } from '../core'; -import { HANGING_STREAM_WARNING_TIME_MS } from '../util/constants'; - -/** - * A stream wrapper to send custom JSON-encoded data back to the client. - * - * @deprecated Please use `createDataStream`, `createDataStreamResponse`, and `pipeDataStreamToResponse` instead. - */ -export class StreamData { - private encoder = new TextEncoder(); - - private controller: ReadableStreamController | null = null; - public stream: ReadableStream; - - private isClosed: boolean = false; - private warningTimeout: NodeJS.Timeout | null = null; - - constructor() { - const self = this; - - this.stream = new ReadableStream({ - start: async controller => { - self.controller = controller; - - // Set a timeout to show a warning if the stream is not closed within 3 seconds - if (process.env.NODE_ENV === 'development') { - self.warningTimeout = setTimeout(() => { - console.warn( - 'The data stream is hanging. Did you forget to close it with `data.close()`?', - ); - }, HANGING_STREAM_WARNING_TIME_MS); - } - }, - pull: controller => { - // No-op: we don't need to do anything special on pull - }, - cancel: reason => { - this.isClosed = true; - }, - }); - } - - async close(): Promise { - if (this.isClosed) { - throw new Error('Data Stream has already been closed.'); - } - - if (!this.controller) { - throw new Error('Stream controller is not initialized.'); - } - - this.controller.close(); - this.isClosed = true; - - // Clear the warning timeout if the stream is closed - if (this.warningTimeout) { - clearTimeout(this.warningTimeout); - } - } - - append(value: JSONValue): void { - if (this.isClosed) { - throw new Error('Data Stream has already been closed.'); - } - - if (!this.controller) { - throw new Error('Stream controller is not initialized.'); - } - - this.controller.enqueue( - this.encoder.encode(formatDataStreamPart('data', [value])), - ); - } - - appendMessageAnnotation(value: JSONValue): void { - if (this.isClosed) { - throw new Error('Data Stream has already been closed.'); - } - - if (!this.controller) { - throw new Error('Stream controller is not initialized.'); - } - - this.controller.enqueue( - this.encoder.encode(formatDataStreamPart('message_annotations', [value])), - ); - } -} diff --git a/packages/langchain/src/langchain-adapter.ts b/packages/langchain/src/langchain-adapter.ts index 48c9b2538ca1..468ff3febc51 100644 --- a/packages/langchain/src/langchain-adapter.ts +++ b/packages/langchain/src/langchain-adapter.ts @@ -1,8 +1,7 @@ -import { formatDataStreamPart, DataStreamWriter, StreamData } from 'ai'; +import { DataStreamWriter, formatDataStreamPart } from 'ai'; import { - mergeStreams, - prepareResponseHeaders, createCallbacksTransformer, + prepareResponseHeaders, StreamCallbacks, } from 'ai/internal'; @@ -118,7 +117,6 @@ export function toDataStreamResponse( | ReadableStream, options?: { init?: ResponseInit; - data?: StreamData; callbacks?: StreamCallbacks; }, ) { @@ -126,14 +124,9 @@ export function toDataStreamResponse( stream, options?.callbacks, ).pipeThrough(new TextEncoderStream()); - const data = options?.data; const init = options?.init; - const responseStream = data - ? mergeStreams(data.stream, dataStream) - : dataStream; - - return new Response(responseStream, { + return new Response(dataStream, { status: init?.status ?? 200, statusText: init?.statusText, headers: prepareResponseHeaders(init?.headers, { diff --git a/packages/llamaindex/src/llamaindex-adapter.ts b/packages/llamaindex/src/llamaindex-adapter.ts index c687e2d723e3..fb03a0632593 100644 --- a/packages/llamaindex/src/llamaindex-adapter.ts +++ b/packages/llamaindex/src/llamaindex-adapter.ts @@ -1,7 +1,6 @@ import { convertAsyncIteratorToReadableStream } from '@ai-sdk/provider-utils'; -import { formatDataStreamPart, DataStreamWriter, StreamData } from 'ai'; +import { formatDataStreamPart, DataStreamWriter } from 'ai'; import { - mergeStreams, prepareResponseHeaders, createCallbacksTransformer, StreamCallbacks, @@ -49,26 +48,24 @@ export function toDataStreamResponse( stream: AsyncIterable, options: { init?: ResponseInit; - data?: StreamData; callbacks?: StreamCallbacks; } = {}, ) { - const { init, data, callbacks } = options; - const dataStream = toDataStreamInternal(stream, callbacks).pipeThrough( - new TextEncoderStream(), - ); - const responseStream = data - ? mergeStreams(data.stream, dataStream) - : dataStream; + const { init, callbacks } = options; - return new Response(responseStream, { - status: init?.status ?? 200, - statusText: init?.statusText, - headers: prepareResponseHeaders(init?.headers, { - contentType: 'text/plain; charset=utf-8', - dataStreamVersion: 'v1', - }), - }); + return new Response( + toDataStreamInternal(stream, callbacks).pipeThrough( + new TextEncoderStream(), + ), + { + status: init?.status ?? 200, + statusText: init?.statusText, + headers: prepareResponseHeaders(init?.headers, { + contentType: 'text/plain; charset=utf-8', + dataStreamVersion: 'v1', + }), + }, + ); } export function mergeIntoDataStream(