Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/two-roses-think.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'ai': major
---

chore (ai): remove StreamData and mergeStreams
Original file line number Diff line number Diff line change
Expand Up @@ -4123,25 +4123,6 @@ exports[`streamText > result.pipeDataStreamToResponse > should support custom er
]
`;

exports[`streamText > result.pipeDataStreamToResponse > should support merging with existing stream data 1`] = `
[
"2:["stream-data-value"]
",
"f:{"messageId":"msg-0"}
",
"0:"Hello"
",
"0:", "
",
"0:"world!"
",
"e:{"finishReason":"stop","usage":{"inputTokens":3,"outputTokens":10,"totalTokens":13},"isContinued":false}
",
"d:{"finishReason":"stop","usage":{"inputTokens":3,"outputTokens":10,"totalTokens":13}}
",
]
`;

exports[`streamText > result.pipeDataStreamToResponse > should suppress usage information when sendUsage is false 1`] = `
[
"f:{"messageId":"msg-0"}
Expand Down Expand Up @@ -4793,25 +4774,6 @@ exports[`streamText > result.toDataStream > should support custom error messages
]
`;

exports[`streamText > result.toDataStream > should support merging with existing stream data 1`] = `
[
"2:["stream-data-value"]
",
"f:{"messageId":"msg-0"}
",
"0:"Hello"
",
"0:", "
",
"0:"world!"
",
"e:{"finishReason":"stop","usage":{"inputTokens":3,"outputTokens":10,"totalTokens":13},"isContinued":false}
",
"d:{"finishReason":"stop","usage":{"inputTokens":3,"outputTokens":10,"totalTokens":13}}
",
]
`;

exports[`streamText > result.toDataStream > should suppress usage information when sendUsage is false 1`] = `
[
"f:{"messageId":"msg-0"}
Expand Down Expand Up @@ -4885,25 +4847,6 @@ exports[`streamText > result.toDataStreamResponse > should support custom error
]
`;

exports[`streamText > result.toDataStreamResponse > should support merging with existing stream data 1`] = `
[
"2:["stream-data-value"]
",
"f:{"messageId":"msg-0"}
",
"0:"Hello"
",
"0:", "
",
"0:"world!"
",
"e:{"finishReason":"stop","usage":{"inputTokens":3,"outputTokens":10,"totalTokens":13},"isContinued":false}
",
"d:{"finishReason":"stop","usage":{"inputTokens":3,"outputTokens":10,"totalTokens":13}}
",
]
`;

exports[`streamText > result.toDataStreamResponse > should suppress usage information when sendUsage is false 1`] = `
[
"f:{"messageId":"msg-0"}
Expand Down
9 changes: 3 additions & 6 deletions packages/ai/core/generate-text/stream-text-result.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { ServerResponse } from 'node:http';
import { StreamData } from '../../streams/stream-data';
import { DataStreamWriter } from '../data-stream/data-stream-writer';
import { ReasoningPart } from '../prompt/content-part';
import {
CallWarning,
FinishReason,
Expand All @@ -11,12 +11,12 @@ import { Source } from '../types/language-model';
import { LanguageModelResponseMetadata } from '../types/language-model-response-metadata';
import { LanguageModelUsage } from '../types/usage';
import { AsyncIterableStream } from '../util/async-iterable-stream';
import { DataStreamText } from '../util/data-stream-parts';
import { GeneratedFile } from './generated-file';
import { ResponseMessage, StepResult } from './step-result';
import { ToolCallUnion } from './tool-call';
import { ToolResultUnion } from './tool-result';
import { ToolSet } from './tool-set';
import { ReasoningPart } from '../prompt/content-part';

export type DataStreamOptions = {
/**
Expand Down Expand Up @@ -215,10 +215,9 @@ If an error occurs, it is passed to the optional `onError` callback.
*/
toDataStream(
options?: {
data?: StreamData;
getErrorMessage?: (error: unknown) => string;
} & DataStreamOptions,
): ReadableStream<Uint8Array>;
): ReadableStream<DataStreamText>;

/**
* Merges the result as a data stream into another data stream.
Expand Down Expand Up @@ -247,7 +246,6 @@ If an error occurs, it is passed to the optional `onError` callback.
pipeDataStreamToResponse(
response: ServerResponse,
options?: ResponseInit & {
data?: StreamData;
getErrorMessage?: (error: unknown) => string;
} & DataStreamOptions,
): void;
Expand Down Expand Up @@ -278,7 +276,6 @@ If an error occurs, it is passed to the optional `onError` callback.
*/
toDataStreamResponse(
options?: ResponseInit & {
data?: StreamData;
getErrorMessage?: (error: unknown) => string;
} & DataStreamOptions,
): Response;
Expand Down
132 changes: 12 additions & 120 deletions packages/ai/core/generate-text/stream-text.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import {
} from '@ai-sdk/provider-utils/test';
import assert from 'node:assert';
import { z } from 'zod';
import { StreamData } from '../../streams/stream-data';
import { createDataStream } from '../data-stream/create-data-stream';
import { MockLanguageModelV2 } from '../test/mock-language-model-v2';
import { createMockServerResponse } from '../test/mock-server-response';
Expand Down Expand Up @@ -873,34 +872,6 @@ describe('streamText', () => {
expect(mockResponse.getDecodedChunks()).toMatchSnapshot();
});

it('should support merging with existing stream data', async () => {
const mockResponse = createMockServerResponse();

const result = streamText({
model: createTestModel(),
prompt: 'test-input',
experimental_generateMessageId: mockId({ prefix: 'msg' }),
});

const streamData = new StreamData();
streamData.append('stream-data-value');
streamData.close();

result.pipeDataStreamToResponse(mockResponse, {
data: streamData,
});

await mockResponse.waitForEnd();

expect(mockResponse.statusCode).toBe(200);
expect(mockResponse.headers).toEqual({
'Content-Type': 'text/plain; charset=utf-8',
'X-Vercel-AI-Data-Stream': 'v1',
});

expect(mockResponse.getDecodedChunks()).toMatchSnapshot();
});

it('should mask error messages by default', async () => {
const mockResponse = createMockServerResponse();

Expand Down Expand Up @@ -1099,30 +1070,7 @@ describe('streamText', () => {

const dataStream = result.toDataStream();

expect(
await convertReadableStreamToArray(
dataStream.pipeThrough(new TextDecoderStream()),
),
).toMatchSnapshot();
});

it('should support merging with existing stream data', async () => {
const result = streamText({
model: createTestModel(),
...defaultSettings(),
});

const streamData = new StreamData();
streamData.append('stream-data-value');
streamData.close();

const dataStream = result.toDataStream({ data: streamData });

expect(
await convertReadableStreamToArray(
dataStream.pipeThrough(new TextDecoderStream()),
),
).toMatchSnapshot();
expect(await convertReadableStreamToArray(dataStream)).toMatchSnapshot();
});

it('should send tool call and tool result stream parts', async () => {
Expand Down Expand Up @@ -1167,9 +1115,7 @@ describe('streamText', () => {
});

expect(
await convertReadableStreamToArray(
result.toDataStream().pipeThrough(new TextDecoderStream()),
),
await convertReadableStreamToArray(result.toDataStream()),
).toMatchSnapshot();
});

Expand Down Expand Up @@ -1216,9 +1162,7 @@ describe('streamText', () => {
});

expect(
await convertReadableStreamToArray(
result.toDataStream().pipeThrough(new TextDecoderStream()),
),
await convertReadableStreamToArray(result.toDataStream()),
).toMatchSnapshot();
});

Expand All @@ -1234,11 +1178,7 @@ describe('streamText', () => {

const dataStream = result.toDataStream();

expect(
await convertReadableStreamToArray(
dataStream.pipeThrough(new TextDecoderStream()),
),
).toMatchSnapshot();
expect(await convertReadableStreamToArray(dataStream)).toMatchSnapshot();
});

it('should support custom error messages', async () => {
Expand All @@ -1255,11 +1195,7 @@ describe('streamText', () => {
getErrorMessage: error => `custom error message: ${error}`,
});

expect(
await convertReadableStreamToArray(
dataStream.pipeThrough(new TextDecoderStream()),
),
).toMatchSnapshot();
expect(await convertReadableStreamToArray(dataStream)).toMatchSnapshot();
});

it('should suppress usage information when sendUsage is false', async () => {
Expand All @@ -1279,11 +1215,7 @@ describe('streamText', () => {

const dataStream = result.toDataStream({ sendUsage: false });

expect(
await convertReadableStreamToArray(
dataStream.pipeThrough(new TextDecoderStream()),
),
).toMatchSnapshot();
expect(await convertReadableStreamToArray(dataStream)).toMatchSnapshot();
});

it('should omit message finish event (d:) when sendFinish is false', async () => {
Expand All @@ -1305,11 +1237,7 @@ describe('streamText', () => {
experimental_sendFinish: false,
});

expect(
await convertReadableStreamToArray(
dataStream.pipeThrough(new TextDecoderStream()),
),
).toMatchSnapshot();
expect(await convertReadableStreamToArray(dataStream)).toMatchSnapshot();
});

it('should send reasoning content when sendReasoning is true', async () => {
Expand All @@ -1320,11 +1248,7 @@ describe('streamText', () => {

const dataStream = result.toDataStream({ sendReasoning: true });

expect(
await convertReadableStreamToArray(
dataStream.pipeThrough(new TextDecoderStream()),
),
).toMatchSnapshot();
expect(await convertReadableStreamToArray(dataStream)).toMatchSnapshot();
});

it('should send source content when sendSources is true', async () => {
Expand All @@ -1335,11 +1259,7 @@ describe('streamText', () => {

const dataStream = result.toDataStream({ sendSources: true });

expect(
await convertReadableStreamToArray(
dataStream.pipeThrough(new TextDecoderStream()),
),
).toMatchSnapshot();
expect(await convertReadableStreamToArray(dataStream)).toMatchSnapshot();
});

it('should send file content', async () => {
Expand All @@ -1350,11 +1270,7 @@ describe('streamText', () => {

const dataStream = result.toDataStream();

expect(
await convertReadableStreamToArray(
dataStream.pipeThrough(new TextDecoderStream()),
),
).toMatchSnapshot();
expect(await convertReadableStreamToArray(dataStream)).toMatchSnapshot();
});
});

Expand Down Expand Up @@ -1404,26 +1320,6 @@ describe('streamText', () => {
expect(await convertResponseStreamToArray(response)).toMatchSnapshot();
});

it('should support merging with existing stream data', async () => {
const result = streamText({
model: createTestModel(),
prompt: 'test-input',
experimental_generateMessageId: mockId({ prefix: 'msg' }),
});

const streamData = new StreamData();
streamData.append('stream-data-value');
streamData.close();

const response = result.toDataStreamResponse({ data: streamData });

expect(response.status).toStrictEqual(200);
expect(response.headers.get('Content-Type')).toStrictEqual(
'text/plain; charset=utf-8',
);
expect(await convertResponseStreamToArray(response)).toMatchSnapshot();
});

it('should mask error messages by default', async () => {
const result = streamText({
model: createTestModel({
Expand Down Expand Up @@ -1654,9 +1550,7 @@ describe('streamText', () => {
expect({
textStream: await convertAsyncIterableToArray(result.textStream),
fullStream: await convertAsyncIterableToArray(result.fullStream),
dataStream: await convertReadableStreamToArray(
result.toDataStream().pipeThrough(new TextDecoderStream()),
),
dataStream: await convertReadableStreamToArray(result.toDataStream()),
}).toMatchSnapshot();
});
});
Expand Down Expand Up @@ -2905,9 +2799,7 @@ describe('streamText', () => {
const dataStream = result.toDataStream();

expect(
await convertReadableStreamToArray(
dataStream.pipeThrough(new TextDecoderStream()),
),
await convertReadableStreamToArray(dataStream),
).toMatchSnapshot();
});

Expand Down
Loading
Loading