diff --git a/e2e/react-start/basic/src/raw-stream-fns.ts b/e2e/react-start/basic/src/raw-stream-fns.ts new file mode 100644 index 00000000000..b0a3b51ef81 --- /dev/null +++ b/e2e/react-start/basic/src/raw-stream-fns.ts @@ -0,0 +1,480 @@ +import { createServerFn, RawStream } from '@tanstack/react-start' + +// Helper to create a delayed Uint8Array stream +function createDelayedStream( + chunks: Array, + delayMs: number, +): ReadableStream { + return new ReadableStream({ + async start(controller) { + for (const chunk of chunks) { + await new Promise((resolve) => setTimeout(resolve, delayMs)) + controller.enqueue(chunk) + } + controller.close() + }, + }) +} + +// Helper to create a stream with variable delays per chunk +// Each entry is [chunk, delayBeforeMs] - delay happens BEFORE enqueueing the chunk +function createVariableDelayStream( + chunksWithDelays: Array<[Uint8Array, number]>, +): ReadableStream { + return new ReadableStream({ + async start(controller) { + for (const [chunk, delayMs] of chunksWithDelays) { + await new Promise((resolve) => setTimeout(resolve, delayMs)) + controller.enqueue(chunk) + } + controller.close() + }, + }) +} + +// Helper to encode text to Uint8Array +function encode(text: string): Uint8Array { + return new TextEncoder().encode(text) +} + +// Export helpers for use in components and SSR routes +export { encode, createDelayedStream, concatBytes } + +// Expected data for hint tests - defined here for both server and client verification +// Test 7: Text hint with pure text +export const TEST7_CHUNKS = [ + encode('Hello, '), + encode('World! 
'), + encode('This is text.'), +] +export const TEST7_EXPECTED = concatBytes(TEST7_CHUNKS) + +// Test 8: Text hint with pure binary (invalid UTF-8) +export const TEST8_CHUNKS = [ + new Uint8Array([0xff, 0xfe, 0x00, 0x01, 0x80, 0x90]), + new Uint8Array([0xa0, 0xb0, 0xc0, 0xd0, 0xe0, 0xf0]), +] +export const TEST8_EXPECTED = concatBytes(TEST8_CHUNKS) + +// Test 9: Text hint with mixed content +export const TEST9_CHUNKS = [ + encode('Valid UTF-8 text'), + new Uint8Array([0xff, 0xfe, 0x80, 0x90]), // Invalid UTF-8 + encode(' More text'), +] +export const TEST9_EXPECTED = concatBytes(TEST9_CHUNKS) + +// Test 10: Binary hint with text data +export const TEST10_CHUNKS = [encode('This is text but using binary hint')] +export const TEST10_EXPECTED = concatBytes(TEST10_CHUNKS) + +// Test 11: Binary hint with pure binary +export const TEST11_CHUNKS = [ + new Uint8Array([0x00, 0x01, 0x02, 0xff, 0xfe, 0xfd]), +] +export const TEST11_EXPECTED = concatBytes(TEST11_CHUNKS) + +// Helper to concatenate byte arrays +function concatBytes(chunks: Array): Uint8Array { + const totalLength = chunks.reduce((acc, c) => acc + c.length, 0) + const result = new Uint8Array(totalLength) + let offset = 0 + for (const chunk of chunks) { + result.set(chunk, offset) + offset += chunk.length + } + return result +} + +// Test 1: Simple single raw stream +export const singleRawStreamFn = createServerFn().handler(async () => { + const stream = createDelayedStream( + [encode('chunk1'), encode('chunk2'), encode('chunk3')], + 50, + ) + return { + message: 'Single stream test', + data: new RawStream(stream), + } +}) + +// Test 2: Multiple raw streams +export const multipleRawStreamsFn = createServerFn().handler(async () => { + const stream1 = createDelayedStream( + [encode('stream1-a'), encode('stream1-b')], + 30, + ) + const stream2 = createDelayedStream( + [encode('stream2-a'), encode('stream2-b')], + 50, + ) + return { + message: 'Multiple streams test', + first: new RawStream(stream1), + second: new 
RawStream(stream2), + } +}) + +// Test 3: JSON streaming ends before raw stream +export const jsonEndsFirstFn = createServerFn().handler(async () => { + // Slow raw stream (takes 500ms total) + const slowStream = createDelayedStream( + [encode('slow-1'), encode('slow-2'), encode('slow-3'), encode('slow-4')], + 125, + ) + return { + message: 'JSON ends first test', + timestamp: Date.now(), + slowData: new RawStream(slowStream), + } +}) + +// Test 4: Raw stream ends before JSON streaming (fast stream, deferred JSON) +export const rawEndsFirstFn = createServerFn().handler(async () => { + // Fast raw stream (completes quickly) + const fastStream = createDelayedStream([encode('fast-done')], 10) + + // Deferred promise - NOT awaited, so it streams as deferred JSON + const deferredData = new Promise((resolve) => + setTimeout(() => resolve('deferred-json-data'), 200), + ) + + return { + message: 'Raw ends first test', + deferredData, + fastData: new RawStream(fastStream), + } +}) + +// Test 5: Large binary data +export const largeBinaryFn = createServerFn().handler(async () => { + // Create 1KB chunks + const chunk = new Uint8Array(1024) + for (let i = 0; i < chunk.length; i++) { + chunk[i] = i % 256 + } + + const stream = createDelayedStream([chunk, chunk, chunk], 20) + + return { + message: 'Large binary test', + size: 3072, + binary: new RawStream(stream), + } +}) + +// Test 6: Mixed streaming (promise + raw stream) +export const mixedStreamingFn = createServerFn().handler(async () => { + const rawStream = createDelayedStream( + [encode('mixed-raw-1'), encode('mixed-raw-2')], + 50, + ) + + return { + immediate: 'immediate-value', + deferred: new Promise((resolve) => + setTimeout(() => resolve('deferred-value'), 100), + ), + raw: new RawStream(rawStream), + } +}) + +// Test 7: Text hint with pure text data (should use UTF-8 encoding) +export const textHintPureTextFn = createServerFn().handler(async () => { + const stream = createDelayedStream(TEST7_CHUNKS, 30) + return { 
+ message: 'Text hint with pure text', + data: new RawStream(stream, { hint: 'text' }), + } +}) + +// Test 8: Text hint with pure binary data (should fallback to base64) +export const textHintPureBinaryFn = createServerFn().handler(async () => { + const stream = createDelayedStream(TEST8_CHUNKS, 30) + return { + message: 'Text hint with pure binary', + data: new RawStream(stream, { hint: 'text' }), + } +}) + +// Test 9: Text hint with mixed content (some UTF-8, some binary) +export const textHintMixedFn = createServerFn().handler(async () => { + const stream = createDelayedStream(TEST9_CHUNKS, 30) + return { + message: 'Text hint with mixed content', + data: new RawStream(stream, { hint: 'text' }), + } +}) + +// Test 10: Binary hint with text data (should still use base64) +export const binaryHintTextFn = createServerFn().handler(async () => { + const stream = createDelayedStream(TEST10_CHUNKS, 30) + return { + message: 'Binary hint with text data', + data: new RawStream(stream, { hint: 'binary' }), + } +}) + +// Test 11: Binary hint with pure binary data +export const binaryHintBinaryFn = createServerFn().handler(async () => { + const stream = createDelayedStream(TEST11_CHUNKS, 30) + return { + message: 'Binary hint with binary data', + data: new RawStream(stream, { hint: 'binary' }), + } +}) + +// ============================================================================ +// MULTIPLEXING TESTS - Verify correct interleaving of multiple streams +// ============================================================================ + +// Expected data for multiplexing tests +// Test 12: Two streams with interleaved timing +// Stream A: sends at 0ms, 150ms, 200ms (3 chunks with pauses) +// Stream B: sends at 50ms, 100ms, 250ms (3 chunks, different rhythm) +export const TEST12_STREAM_A_CHUNKS: Array<[Uint8Array, number]> = [ + [encode('A1-first'), 0], // immediate + [encode('A2-after-pause'), 150], // 150ms pause + [encode('A3-quick'), 50], // 50ms after A2 +] +export 
const TEST12_STREAM_B_CHUNKS: Array<[Uint8Array, number]> = [ + [encode('B1-start'), 50], // 50ms after start + [encode('B2-continue'), 50], // 50ms after B1 + [encode('B3-final'), 150], // 150ms pause then final +] +export const TEST12_STREAM_A_EXPECTED = concatBytes( + TEST12_STREAM_A_CHUNKS.map(([chunk]) => chunk), +) +export const TEST12_STREAM_B_EXPECTED = concatBytes( + TEST12_STREAM_B_CHUNKS.map(([chunk]) => chunk), +) + +// Test 13: Burst-pause-burst pattern (single stream) +// 3 chunks quickly, long pause, 3 more chunks quickly +export const TEST13_CHUNKS: Array<[Uint8Array, number]> = [ + [encode('burst1-a'), 10], + [encode('burst1-b'), 10], + [encode('burst1-c'), 10], + [encode('pause-then-burst2-a'), 200], // long pause + [encode('burst2-b'), 10], + [encode('burst2-c'), 10], +] +export const TEST13_EXPECTED = concatBytes( + TEST13_CHUNKS.map(([chunk]) => chunk), +) + +// Test 14: Three concurrent streams with different patterns +// Stream A: fast steady (every 30ms) +// Stream B: slow steady (every 100ms) +// Stream C: burst pattern (quick-pause-quick) +export const TEST14_STREAM_A_CHUNKS: Array<[Uint8Array, number]> = [ + [encode('A1'), 30], + [encode('A2'), 30], + [encode('A3'), 30], + [encode('A4'), 30], +] +export const TEST14_STREAM_B_CHUNKS: Array<[Uint8Array, number]> = [ + [encode('B1-slow'), 100], + [encode('B2-slow'), 100], +] +export const TEST14_STREAM_C_CHUNKS: Array<[Uint8Array, number]> = [ + [encode('C1-burst'), 20], + [encode('C2-burst'), 20], + [encode('C3-after-pause'), 150], +] +export const TEST14_STREAM_A_EXPECTED = concatBytes( + TEST14_STREAM_A_CHUNKS.map(([chunk]) => chunk), +) +export const TEST14_STREAM_B_EXPECTED = concatBytes( + TEST14_STREAM_B_CHUNKS.map(([chunk]) => chunk), +) +export const TEST14_STREAM_C_EXPECTED = concatBytes( + TEST14_STREAM_C_CHUNKS.map(([chunk]) => chunk), +) + +// Test 12: Interleaved multiplexing - two streams with variable delays +export const interleavedStreamsFn = createServerFn().handler(async 
() => { + const streamA = createVariableDelayStream(TEST12_STREAM_A_CHUNKS) + const streamB = createVariableDelayStream(TEST12_STREAM_B_CHUNKS) + + return { + message: 'Interleaved streams test', + streamA: new RawStream(streamA), + streamB: new RawStream(streamB), + } +}) + +// Test 13: Burst-pause-burst pattern +export const burstPauseBurstFn = createServerFn().handler(async () => { + const stream = createVariableDelayStream(TEST13_CHUNKS) + + return { + message: 'Burst-pause-burst test', + data: new RawStream(stream), + } +}) + +// Test 14: Three concurrent streams +export const threeStreamsFn = createServerFn().handler(async () => { + const streamA = createVariableDelayStream(TEST14_STREAM_A_CHUNKS) + const streamB = createVariableDelayStream(TEST14_STREAM_B_CHUNKS) + const streamC = createVariableDelayStream(TEST14_STREAM_C_CHUNKS) + + return { + message: 'Three concurrent streams test', + fast: new RawStream(streamA), + slow: new RawStream(streamB), + burst: new RawStream(streamC), + } +}) + +// ============================================================================ +// EDGE CASE TESTS +// ============================================================================ + +// Test 15: Empty stream (zero bytes) +export const emptyStreamFn = createServerFn().handler(async () => { + // Stream that immediately closes with no data + const stream = new ReadableStream({ + start(controller) { + controller.close() + }, + }) + + return { + message: 'Empty stream test', + data: new RawStream(stream), + } +}) + +// Test 16: Stream that errors mid-flight +export const errorStreamFn = createServerFn().handler(async () => { + // Stream that sends some data then errors + const stream = new ReadableStream({ + async start(controller) { + controller.enqueue(encode('chunk-before-error')) + await new Promise((resolve) => setTimeout(resolve, 50)) + controller.error(new Error('Intentional stream error')) + }, + }) + + return { + message: 'Error stream test', + data: new 
RawStream(stream), + } +}) + +// Helpers for consuming streams (exported for use in components) +// Note: RawStream is the marker class used in loaders/server functions, +// but after SSR deserialization it becomes ReadableStream. +// We accept both types to handle the TypeScript mismatch. +export function createStreamConsumer() { + const decoder = new TextDecoder() + + return async function consumeStream( + stream: ReadableStream | RawStream, + ): Promise { + // Handle both RawStream (from type system) and ReadableStream (runtime) + const actualStream = + stream instanceof RawStream + ? stream.stream + : (stream as ReadableStream) + const reader = actualStream.getReader() + const chunks: Array = [] + + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + while (true) { + const { done, value } = await reader.read() + if (done) break + chunks.push(decoder.decode(value, { stream: true })) + } + + return chunks.join('') + } +} + +export async function consumeBinaryStream( + stream: ReadableStream | RawStream, +): Promise { + // Handle both RawStream (from type system) and ReadableStream (runtime) + const actualStream = + stream instanceof RawStream + ? stream.stream + : (stream as ReadableStream) + const reader = actualStream.getReader() + let totalBytes = 0 + + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + while (true) { + const { done, value } = await reader.read() + if (done) break + totalBytes += value.length + } + + return totalBytes +} + +// Helper to collect all bytes from a stream +export async function collectBytes( + stream: ReadableStream | RawStream, +): Promise { + const actualStream = + stream instanceof RawStream + ? 
stream.stream + : (stream as ReadableStream) + const reader = actualStream.getReader() + const chunks: Array = [] + let totalLength = 0 + + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + while (true) { + const { done, value } = await reader.read() + if (done) break + chunks.push(value) + totalLength += value.length + } + + const result = new Uint8Array(totalLength) + let pos = 0 + for (const chunk of chunks) { + result.set(chunk, pos) + pos += chunk.length + } + return result +} + +// Compare two Uint8Arrays byte-by-byte +export function compareBytes( + actual: Uint8Array, + expected: Uint8Array, +): { + match: boolean + mismatchIndex: number | null + actualLength: number + expectedLength: number +} { + if (actual.length !== expected.length) { + return { + match: false, + mismatchIndex: -1, // -1 indicates length mismatch + actualLength: actual.length, + expectedLength: expected.length, + } + } + for (let i = 0; i < actual.length; i++) { + if (actual[i] !== expected[i]) { + return { + match: false, + mismatchIndex: i, + actualLength: actual.length, + expectedLength: expected.length, + } + } + } + return { + match: true, + mismatchIndex: null, + actualLength: actual.length, + expectedLength: expected.length, + } +} diff --git a/e2e/react-start/basic/src/routeTree.gen.ts b/e2e/react-start/basic/src/routeTree.gen.ts index 56615ed3846..27d5e911452 100644 --- a/e2e/react-start/basic/src/routeTree.gen.ts +++ b/e2e/react-start/basic/src/routeTree.gen.ts @@ -14,6 +14,7 @@ import { Route as UsersRouteImport } from './routes/users' import { Route as TypeOnlyReexportRouteImport } from './routes/type-only-reexport' import { Route as StreamRouteImport } from './routes/stream' import { Route as ScriptsRouteImport } from './routes/scripts' +import { Route as RawStreamRouteImport } from './routes/raw-stream' import { Route as PostsRouteImport } from './routes/posts' import { Route as LinksRouteImport } from './routes/links' import { Route as 
InlineScriptsRouteImport } from './routes/inline-scripts' @@ -26,6 +27,7 @@ import { Route as IndexRouteImport } from './routes/index' import { Route as UsersIndexRouteImport } from './routes/users.index' import { Route as SearchParamsIndexRouteImport } from './routes/search-params/index' import { Route as RedirectIndexRouteImport } from './routes/redirect/index' +import { Route as RawStreamIndexRouteImport } from './routes/raw-stream/index' import { Route as PostsIndexRouteImport } from './routes/posts.index' import { Route as NotFoundIndexRouteImport } from './routes/not-found/index' import { Route as MultiCookieRedirectIndexRouteImport } from './routes/multi-cookie-redirect/index' @@ -33,6 +35,12 @@ import { Route as UsersUserIdRouteImport } from './routes/users.$userId' import { Route as SearchParamsLoaderThrowsRedirectRouteImport } from './routes/search-params/loader-throws-redirect' import { Route as SearchParamsDefaultRouteImport } from './routes/search-params/default' import { Route as RedirectTargetRouteImport } from './routes/redirect/$target' +import { Route as RawStreamSsrTextHintRouteImport } from './routes/raw-stream/ssr-text-hint' +import { Route as RawStreamSsrSingleRouteImport } from './routes/raw-stream/ssr-single' +import { Route as RawStreamSsrMultipleRouteImport } from './routes/raw-stream/ssr-multiple' +import { Route as RawStreamSsrMixedRouteImport } from './routes/raw-stream/ssr-mixed' +import { Route as RawStreamSsrBinaryHintRouteImport } from './routes/raw-stream/ssr-binary-hint' +import { Route as RawStreamClientCallRouteImport } from './routes/raw-stream/client-call' import { Route as PostsPostIdRouteImport } from './routes/posts.$postId' import { Route as NotFoundViaLoaderRouteImport } from './routes/not-found/via-loader' import { Route as NotFoundViaBeforeLoadRouteImport } from './routes/not-found/via-beforeLoad' @@ -79,6 +87,11 @@ const ScriptsRoute = ScriptsRouteImport.update({ path: '/scripts', getParentRoute: () => rootRouteImport, 
} as any) +const RawStreamRoute = RawStreamRouteImport.update({ + id: '/raw-stream', + path: '/raw-stream', + getParentRoute: () => rootRouteImport, +} as any) const PostsRoute = PostsRouteImport.update({ id: '/posts', path: '/posts', @@ -138,6 +151,11 @@ const RedirectIndexRoute = RedirectIndexRouteImport.update({ path: '/redirect/', getParentRoute: () => rootRouteImport, } as any) +const RawStreamIndexRoute = RawStreamIndexRouteImport.update({ + id: '/', + path: '/', + getParentRoute: () => RawStreamRoute, +} as any) const PostsIndexRoute = PostsIndexRouteImport.update({ id: '/', path: '/', @@ -175,6 +193,36 @@ const RedirectTargetRoute = RedirectTargetRouteImport.update({ path: '/redirect/$target', getParentRoute: () => rootRouteImport, } as any) +const RawStreamSsrTextHintRoute = RawStreamSsrTextHintRouteImport.update({ + id: '/ssr-text-hint', + path: '/ssr-text-hint', + getParentRoute: () => RawStreamRoute, +} as any) +const RawStreamSsrSingleRoute = RawStreamSsrSingleRouteImport.update({ + id: '/ssr-single', + path: '/ssr-single', + getParentRoute: () => RawStreamRoute, +} as any) +const RawStreamSsrMultipleRoute = RawStreamSsrMultipleRouteImport.update({ + id: '/ssr-multiple', + path: '/ssr-multiple', + getParentRoute: () => RawStreamRoute, +} as any) +const RawStreamSsrMixedRoute = RawStreamSsrMixedRouteImport.update({ + id: '/ssr-mixed', + path: '/ssr-mixed', + getParentRoute: () => RawStreamRoute, +} as any) +const RawStreamSsrBinaryHintRoute = RawStreamSsrBinaryHintRouteImport.update({ + id: '/ssr-binary-hint', + path: '/ssr-binary-hint', + getParentRoute: () => RawStreamRoute, +} as any) +const RawStreamClientCallRoute = RawStreamClientCallRouteImport.update({ + id: '/client-call', + path: '/client-call', + getParentRoute: () => RawStreamRoute, +} as any) const PostsPostIdRoute = PostsPostIdRouteImport.update({ id: '/$postId', path: '/$postId', @@ -285,6 +333,7 @@ export interface FileRoutesByFullPath { '/inline-scripts': typeof InlineScriptsRoute 
'/links': typeof LinksRoute '/posts': typeof PostsRouteWithChildren + '/raw-stream': typeof RawStreamRouteWithChildren '/scripts': typeof ScriptsRoute '/stream': typeof StreamRoute '/type-only-reexport': typeof TypeOnlyReexportRoute @@ -295,6 +344,12 @@ export interface FileRoutesByFullPath { '/not-found/via-beforeLoad': typeof NotFoundViaBeforeLoadRoute '/not-found/via-loader': typeof NotFoundViaLoaderRoute '/posts/$postId': typeof PostsPostIdRoute + '/raw-stream/client-call': typeof RawStreamClientCallRoute + '/raw-stream/ssr-binary-hint': typeof RawStreamSsrBinaryHintRoute + '/raw-stream/ssr-mixed': typeof RawStreamSsrMixedRoute + '/raw-stream/ssr-multiple': typeof RawStreamSsrMultipleRoute + '/raw-stream/ssr-single': typeof RawStreamSsrSingleRoute + '/raw-stream/ssr-text-hint': typeof RawStreamSsrTextHintRoute '/redirect/$target': typeof RedirectTargetRouteWithChildren '/search-params/default': typeof SearchParamsDefaultRoute '/search-params/loader-throws-redirect': typeof SearchParamsLoaderThrowsRedirectRoute @@ -302,6 +357,7 @@ export interface FileRoutesByFullPath { '/multi-cookie-redirect': typeof MultiCookieRedirectIndexRoute '/not-found/': typeof NotFoundIndexRoute '/posts/': typeof PostsIndexRoute + '/raw-stream/': typeof RawStreamIndexRoute '/redirect': typeof RedirectIndexRoute '/search-params/': typeof SearchParamsIndexRoute '/users/': typeof UsersIndexRoute @@ -334,12 +390,19 @@ export interface FileRoutesByTo { '/not-found/via-beforeLoad': typeof NotFoundViaBeforeLoadRoute '/not-found/via-loader': typeof NotFoundViaLoaderRoute '/posts/$postId': typeof PostsPostIdRoute + '/raw-stream/client-call': typeof RawStreamClientCallRoute + '/raw-stream/ssr-binary-hint': typeof RawStreamSsrBinaryHintRoute + '/raw-stream/ssr-mixed': typeof RawStreamSsrMixedRoute + '/raw-stream/ssr-multiple': typeof RawStreamSsrMultipleRoute + '/raw-stream/ssr-single': typeof RawStreamSsrSingleRoute + '/raw-stream/ssr-text-hint': typeof RawStreamSsrTextHintRoute 
'/search-params/default': typeof SearchParamsDefaultRoute '/search-params/loader-throws-redirect': typeof SearchParamsLoaderThrowsRedirectRoute '/users/$userId': typeof UsersUserIdRoute '/multi-cookie-redirect': typeof MultiCookieRedirectIndexRoute '/not-found': typeof NotFoundIndexRoute '/posts': typeof PostsIndexRoute + '/raw-stream': typeof RawStreamIndexRoute '/redirect': typeof RedirectIndexRoute '/search-params': typeof SearchParamsIndexRoute '/users': typeof UsersIndexRoute @@ -367,6 +430,7 @@ export interface FileRoutesById { '/inline-scripts': typeof InlineScriptsRoute '/links': typeof LinksRoute '/posts': typeof PostsRouteWithChildren + '/raw-stream': typeof RawStreamRouteWithChildren '/scripts': typeof ScriptsRoute '/stream': typeof StreamRoute '/type-only-reexport': typeof TypeOnlyReexportRoute @@ -378,6 +442,12 @@ export interface FileRoutesById { '/not-found/via-beforeLoad': typeof NotFoundViaBeforeLoadRoute '/not-found/via-loader': typeof NotFoundViaLoaderRoute '/posts/$postId': typeof PostsPostIdRoute + '/raw-stream/client-call': typeof RawStreamClientCallRoute + '/raw-stream/ssr-binary-hint': typeof RawStreamSsrBinaryHintRoute + '/raw-stream/ssr-mixed': typeof RawStreamSsrMixedRoute + '/raw-stream/ssr-multiple': typeof RawStreamSsrMultipleRoute + '/raw-stream/ssr-single': typeof RawStreamSsrSingleRoute + '/raw-stream/ssr-text-hint': typeof RawStreamSsrTextHintRoute '/redirect/$target': typeof RedirectTargetRouteWithChildren '/search-params/default': typeof SearchParamsDefaultRoute '/search-params/loader-throws-redirect': typeof SearchParamsLoaderThrowsRedirectRoute @@ -385,6 +455,7 @@ export interface FileRoutesById { '/multi-cookie-redirect/': typeof MultiCookieRedirectIndexRoute '/not-found/': typeof NotFoundIndexRoute '/posts/': typeof PostsIndexRoute + '/raw-stream/': typeof RawStreamIndexRoute '/redirect/': typeof RedirectIndexRoute '/search-params/': typeof SearchParamsIndexRoute '/users/': typeof UsersIndexRoute @@ -413,6 +484,7 @@ export 
interface FileRouteTypes { | '/inline-scripts' | '/links' | '/posts' + | '/raw-stream' | '/scripts' | '/stream' | '/type-only-reexport' @@ -423,6 +495,12 @@ export interface FileRouteTypes { | '/not-found/via-beforeLoad' | '/not-found/via-loader' | '/posts/$postId' + | '/raw-stream/client-call' + | '/raw-stream/ssr-binary-hint' + | '/raw-stream/ssr-mixed' + | '/raw-stream/ssr-multiple' + | '/raw-stream/ssr-single' + | '/raw-stream/ssr-text-hint' | '/redirect/$target' | '/search-params/default' | '/search-params/loader-throws-redirect' @@ -430,6 +508,7 @@ export interface FileRouteTypes { | '/multi-cookie-redirect' | '/not-found/' | '/posts/' + | '/raw-stream/' | '/redirect' | '/search-params/' | '/users/' @@ -462,12 +541,19 @@ export interface FileRouteTypes { | '/not-found/via-beforeLoad' | '/not-found/via-loader' | '/posts/$postId' + | '/raw-stream/client-call' + | '/raw-stream/ssr-binary-hint' + | '/raw-stream/ssr-mixed' + | '/raw-stream/ssr-multiple' + | '/raw-stream/ssr-single' + | '/raw-stream/ssr-text-hint' | '/search-params/default' | '/search-params/loader-throws-redirect' | '/users/$userId' | '/multi-cookie-redirect' | '/not-found' | '/posts' + | '/raw-stream' | '/redirect' | '/search-params' | '/users' @@ -494,6 +580,7 @@ export interface FileRouteTypes { | '/inline-scripts' | '/links' | '/posts' + | '/raw-stream' | '/scripts' | '/stream' | '/type-only-reexport' @@ -505,6 +592,12 @@ export interface FileRouteTypes { | '/not-found/via-beforeLoad' | '/not-found/via-loader' | '/posts/$postId' + | '/raw-stream/client-call' + | '/raw-stream/ssr-binary-hint' + | '/raw-stream/ssr-mixed' + | '/raw-stream/ssr-multiple' + | '/raw-stream/ssr-single' + | '/raw-stream/ssr-text-hint' | '/redirect/$target' | '/search-params/default' | '/search-params/loader-throws-redirect' @@ -512,6 +605,7 @@ export interface FileRouteTypes { | '/multi-cookie-redirect/' | '/not-found/' | '/posts/' + | '/raw-stream/' | '/redirect/' | '/search-params/' | '/users/' @@ -540,6 +634,7 @@ 
export interface RootRouteChildren { InlineScriptsRoute: typeof InlineScriptsRoute LinksRoute: typeof LinksRoute PostsRoute: typeof PostsRouteWithChildren + RawStreamRoute: typeof RawStreamRouteWithChildren ScriptsRoute: typeof ScriptsRoute StreamRoute: typeof StreamRoute TypeOnlyReexportRoute: typeof TypeOnlyReexportRoute @@ -591,6 +686,13 @@ declare module '@tanstack/react-router' { preLoaderRoute: typeof ScriptsRouteImport parentRoute: typeof rootRouteImport } + '/raw-stream': { + id: '/raw-stream' + path: '/raw-stream' + fullPath: '/raw-stream' + preLoaderRoute: typeof RawStreamRouteImport + parentRoute: typeof rootRouteImport + } '/posts': { id: '/posts' path: '/posts' @@ -675,6 +777,13 @@ declare module '@tanstack/react-router' { preLoaderRoute: typeof RedirectIndexRouteImport parentRoute: typeof rootRouteImport } + '/raw-stream/': { + id: '/raw-stream/' + path: '/' + fullPath: '/raw-stream/' + preLoaderRoute: typeof RawStreamIndexRouteImport + parentRoute: typeof RawStreamRoute + } '/posts/': { id: '/posts/' path: '/' @@ -724,6 +833,48 @@ declare module '@tanstack/react-router' { preLoaderRoute: typeof RedirectTargetRouteImport parentRoute: typeof rootRouteImport } + '/raw-stream/ssr-text-hint': { + id: '/raw-stream/ssr-text-hint' + path: '/ssr-text-hint' + fullPath: '/raw-stream/ssr-text-hint' + preLoaderRoute: typeof RawStreamSsrTextHintRouteImport + parentRoute: typeof RawStreamRoute + } + '/raw-stream/ssr-single': { + id: '/raw-stream/ssr-single' + path: '/ssr-single' + fullPath: '/raw-stream/ssr-single' + preLoaderRoute: typeof RawStreamSsrSingleRouteImport + parentRoute: typeof RawStreamRoute + } + '/raw-stream/ssr-multiple': { + id: '/raw-stream/ssr-multiple' + path: '/ssr-multiple' + fullPath: '/raw-stream/ssr-multiple' + preLoaderRoute: typeof RawStreamSsrMultipleRouteImport + parentRoute: typeof RawStreamRoute + } + '/raw-stream/ssr-mixed': { + id: '/raw-stream/ssr-mixed' + path: '/ssr-mixed' + fullPath: '/raw-stream/ssr-mixed' + preLoaderRoute: 
typeof RawStreamSsrMixedRouteImport + parentRoute: typeof RawStreamRoute + } + '/raw-stream/ssr-binary-hint': { + id: '/raw-stream/ssr-binary-hint' + path: '/ssr-binary-hint' + fullPath: '/raw-stream/ssr-binary-hint' + preLoaderRoute: typeof RawStreamSsrBinaryHintRouteImport + parentRoute: typeof RawStreamRoute + } + '/raw-stream/client-call': { + id: '/raw-stream/client-call' + path: '/client-call' + fullPath: '/raw-stream/client-call' + preLoaderRoute: typeof RawStreamClientCallRouteImport + parentRoute: typeof RawStreamRoute + } '/posts/$postId': { id: '/posts/$postId' path: '/$postId' @@ -928,6 +1079,30 @@ const PostsRouteChildren: PostsRouteChildren = { const PostsRouteWithChildren = PostsRoute._addFileChildren(PostsRouteChildren) +interface RawStreamRouteChildren { + RawStreamClientCallRoute: typeof RawStreamClientCallRoute + RawStreamSsrBinaryHintRoute: typeof RawStreamSsrBinaryHintRoute + RawStreamSsrMixedRoute: typeof RawStreamSsrMixedRoute + RawStreamSsrMultipleRoute: typeof RawStreamSsrMultipleRoute + RawStreamSsrSingleRoute: typeof RawStreamSsrSingleRoute + RawStreamSsrTextHintRoute: typeof RawStreamSsrTextHintRoute + RawStreamIndexRoute: typeof RawStreamIndexRoute +} + +const RawStreamRouteChildren: RawStreamRouteChildren = { + RawStreamClientCallRoute: RawStreamClientCallRoute, + RawStreamSsrBinaryHintRoute: RawStreamSsrBinaryHintRoute, + RawStreamSsrMixedRoute: RawStreamSsrMixedRoute, + RawStreamSsrMultipleRoute: RawStreamSsrMultipleRoute, + RawStreamSsrSingleRoute: RawStreamSsrSingleRoute, + RawStreamSsrTextHintRoute: RawStreamSsrTextHintRoute, + RawStreamIndexRoute: RawStreamIndexRoute, +} + +const RawStreamRouteWithChildren = RawStreamRoute._addFileChildren( + RawStreamRouteChildren, +) + interface UsersRouteChildren { UsersUserIdRoute: typeof UsersUserIdRoute UsersIndexRoute: typeof UsersIndexRoute @@ -1000,6 +1175,7 @@ const rootRouteChildren: RootRouteChildren = { InlineScriptsRoute: InlineScriptsRoute, LinksRoute: LinksRoute, PostsRoute: 
PostsRouteWithChildren, + RawStreamRoute: RawStreamRouteWithChildren, ScriptsRoute: ScriptsRoute, StreamRoute: StreamRoute, TypeOnlyReexportRoute: TypeOnlyReexportRoute, diff --git a/e2e/react-start/basic/src/routes/__root.tsx b/e2e/react-start/basic/src/routes/__root.tsx index 1e77eabfc01..e1862b499c6 100644 --- a/e2e/react-start/basic/src/routes/__root.tsx +++ b/e2e/react-start/basic/src/routes/__root.tsx @@ -173,6 +173,14 @@ function RootDocument({ children }: { children: React.ReactNode }) { > Client Only {' '} + + Raw Stream + {' '} +

Raw Stream Tests

+ + + +
+ + + + ) +} diff --git a/e2e/react-start/basic/src/routes/raw-stream/client-call.tsx b/e2e/react-start/basic/src/routes/raw-stream/client-call.tsx new file mode 100644 index 00000000000..1fc7d049900 --- /dev/null +++ b/e2e/react-start/basic/src/routes/raw-stream/client-call.tsx @@ -0,0 +1,474 @@ +import { createFileRoute } from '@tanstack/react-router' +import * as React from 'react' +import { + singleRawStreamFn, + multipleRawStreamsFn, + jsonEndsFirstFn, + rawEndsFirstFn, + largeBinaryFn, + mixedStreamingFn, + textHintPureTextFn, + textHintPureBinaryFn, + textHintMixedFn, + binaryHintTextFn, + binaryHintBinaryFn, + interleavedStreamsFn, + burstPauseBurstFn, + threeStreamsFn, + emptyStreamFn, + errorStreamFn, + createStreamConsumer, + consumeBinaryStream, + collectBytes, + compareBytes, + TEST7_EXPECTED, + TEST8_EXPECTED, + TEST9_EXPECTED, + TEST10_EXPECTED, + TEST11_EXPECTED, + TEST12_STREAM_A_EXPECTED, + TEST12_STREAM_B_EXPECTED, + TEST13_EXPECTED, + TEST14_STREAM_A_EXPECTED, + TEST14_STREAM_B_EXPECTED, + TEST14_STREAM_C_EXPECTED, +} from '../../raw-stream-fns' + +export const Route = createFileRoute('/raw-stream/client-call')({ + component: ClientCallTests, +}) + +function ClientCallTests() { + const [results, setResults] = React.useState>({}) + const [loading, setLoading] = React.useState>({}) + + const consumeStream = createStreamConsumer() + + async function runTest( + testName: string, + fn: () => Promise, + processor: (result: any) => Promise, + ) { + setLoading((prev) => ({ ...prev, [testName]: true })) + try { + const result = await fn() + const processed = await processor(result) + setResults((prev) => ({ ...prev, [testName]: processed })) + } catch (error) { + setResults((prev) => ({ + ...prev, + [testName]: { error: String(error) }, + })) + } finally { + setLoading((prev) => ({ ...prev, [testName]: false })) + } + } + + return ( +
+

Client-Side Server Function Calls (RPC)

+

+ These tests invoke server functions directly from the client, using the + binary framing protocol for RawStream data. +

+ + {/* Test 1: Single Raw Stream */} +
+

Test 1: Single Raw Stream

+ +
{JSON.stringify(results.test1)}
+
+ + {/* Test 2: Multiple Raw Streams */} +
+

Test 2: Multiple Raw Streams

+ +
{JSON.stringify(results.test2)}
+
+ + {/* Test 3: JSON Ends First */} +
+

Test 3: JSON Ends Before Raw Stream

+ +
{JSON.stringify(results.test3)}
+
+ + {/* Test 4: Raw Ends First */} +
+

Test 4: Raw Stream Ends Before JSON

+ +
{JSON.stringify(results.test4)}
+
+ + {/* Test 5: Large Binary */} +
+

Test 5: Large Binary Data

+ +
{JSON.stringify(results.test5)}
+
+ + {/* Test 6: Mixed Streaming */} +
+

Test 6: Mixed Streaming

+ +
{JSON.stringify(results.test6)}
+
+ + {/* Hint Tests Section */} +

Hint Parameter Tests (RPC)

+

+ These tests verify that hint parameter works correctly for RPC calls. + Note: RPC always uses binary framing regardless of hint. +

+ + {/* Test 7: Text Hint with Pure Text */} +
+

Test 7: Text Hint - Pure Text

+ +
{JSON.stringify(results.test7)}
+
+ + {/* Test 8: Text Hint with Pure Binary */} +
+

Test 8: Text Hint - Pure Binary

+ +
{JSON.stringify(results.test8)}
+
+ + {/* Test 9: Text Hint with Mixed Content */} +
+

Test 9: Text Hint - Mixed Content

+ +
{JSON.stringify(results.test9)}
+
+ + {/* Test 10: Binary Hint with Text Data */} +
+

Test 10: Binary Hint - Text Data

+ +
{JSON.stringify(results.test10)}
+
+ + {/* Test 11: Binary Hint with Binary Data */} +
+

Test 11: Binary Hint - Binary Data

+ +
{JSON.stringify(results.test11)}
+
+ + {/* Multiplexing Tests Section */} +

Multiplexing Tests (RPC)

+

+ These tests verify correct interleaving of multiple concurrent streams. +

+ + {/* Test 12: Interleaved Streams */} +
+

Test 12: Interleaved Streams

+ +
{JSON.stringify(results.test12)}
+
+ + {/* Test 13: Burst-Pause-Burst */} +
+

Test 13: Burst-Pause-Burst

+ +
{JSON.stringify(results.test13)}
+
+ + {/* Test 14: Three Concurrent Streams */} +
+

Test 14: Three Concurrent Streams

+ +
{JSON.stringify(results.test14)}
+
+ + {/* Edge Case Tests Section */} +

Edge Case Tests (RPC)

+

+ These tests verify edge cases like empty streams and error handling. +

+ + {/* Test 15: Empty Stream */} +
+

Test 15: Empty Stream

+ +
{JSON.stringify(results.test15)}
+
+ + {/* Test 16: Stream Error */} +
+

Test 16: Stream Error

+ +
{JSON.stringify(results.test16)}
+
+
+ ) +} diff --git a/e2e/react-start/basic/src/routes/raw-stream/index.tsx b/e2e/react-start/basic/src/routes/raw-stream/index.tsx new file mode 100644 index 00000000000..0804efc0404 --- /dev/null +++ b/e2e/react-start/basic/src/routes/raw-stream/index.tsx @@ -0,0 +1,74 @@ +import { Link, createFileRoute } from '@tanstack/react-router' + +export const Route = createFileRoute('/raw-stream/')({ + component: RawStreamIndex, +}) + +function RawStreamIndex() { + return ( +
+

Select a test category above to begin testing.

+
    +
  • + + Client Calls + {' '} + - Test RawStream via direct server function calls (RPC) +
  • +
  • + + SSR Single + {' '} + - Test single RawStream from route loader (SSR) +
  • +
  • + + SSR Multiple + {' '} + - Test multiple RawStreams from route loader (SSR) +
  • +
  • + + SSR Mixed + {' '} + - Test RawStream mixed with deferred data from loader (SSR) +
  • +
  • + + SSR Text Hint + {' '} + - Test RawStream with hint: 'text' from loader (SSR) +
  • +
  • + + SSR Binary Hint + {' '} + - Test RawStream with hint: 'binary' from loader (SSR) +
  • +
+
+ ) +} diff --git a/e2e/react-start/basic/src/routes/raw-stream/ssr-binary-hint.tsx b/e2e/react-start/basic/src/routes/raw-stream/ssr-binary-hint.tsx new file mode 100644 index 00000000000..bd4b8b1910b --- /dev/null +++ b/e2e/react-start/basic/src/routes/raw-stream/ssr-binary-hint.tsx @@ -0,0 +1,129 @@ +import { createFileRoute } from '@tanstack/react-router' +import * as React from 'react' +import { RawStream } from '@tanstack/react-start' +import { + encode, + createDelayedStream, + concatBytes, + collectBytes, + compareBytes, +} from '../../raw-stream-fns' + +// Expected data - defined at module level for client-side verification +const TEXT_CHUNKS = [encode('Binary '), encode('hint '), encode('with text')] +const TEXT_EXPECTED = concatBytes(TEXT_CHUNKS) + +const BINARY_CHUNKS = [ + new Uint8Array([0x00, 0x01, 0x02, 0x03]), + new Uint8Array([0xff, 0xfe, 0xfd, 0xfc]), +] +const BINARY_EXPECTED = concatBytes(BINARY_CHUNKS) + +export const Route = createFileRoute('/raw-stream/ssr-binary-hint')({ + loader: async () => { + // Text data with binary hint - should still use base64 (default behavior) + const textStream = createDelayedStream( + [encode('Binary '), encode('hint '), encode('with text')], + 30, + ) + + // Pure binary stream with binary hint + const binaryStream = createDelayedStream( + [ + new Uint8Array([0x00, 0x01, 0x02, 0x03]), + new Uint8Array([0xff, 0xfe, 0xfd, 0xfc]), + ], + 30, + ) + + return { + message: 'SSR Binary Hint Test', + textData: new RawStream(textStream, { hint: 'binary' }), + binaryData: new RawStream(binaryStream, { hint: 'binary' }), + } + }, + component: SSRBinaryHintTest, +}) + +function SSRBinaryHintTest() { + const { message, textData, binaryData } = Route.useLoaderData() + const [textMatch, setTextMatch] = React.useState<{ + match: boolean + mismatchIndex: number | null + actualLength: number + expectedLength: number + asText: string + } | null>(null) + const [binaryMatch, setBinaryMatch] = React.useState<{ + match: boolean + 
mismatchIndex: number | null + actualLength: number + expectedLength: number + } | null>(null) + const [isLoading, setIsLoading] = React.useState(true) + const [error, setError] = React.useState(null) + + React.useEffect(() => { + Promise.all([collectBytes(textData), collectBytes(binaryData)]) + .then(([textBytes, binaryBytes]) => { + const textComp = compareBytes(textBytes, TEXT_EXPECTED) + const decoder = new TextDecoder() + setTextMatch({ + ...textComp, + actualLength: textBytes.length, + expectedLength: TEXT_EXPECTED.length, + asText: decoder.decode(textBytes), + }) + const binaryComp = compareBytes(binaryBytes, BINARY_EXPECTED) + setBinaryMatch({ + ...binaryComp, + actualLength: binaryBytes.length, + expectedLength: BINARY_EXPECTED.length, + }) + setIsLoading(false) + }) + .catch((err) => { + setError(String(err)) + setIsLoading(false) + }) + }, [textData, binaryData]) + + return ( +
+

SSR Binary Hint Test

+

+ This route tests RawStream with hint: 'binary' from loader. Binary hint + always uses base64 encoding (default behavior). +

+ +
+
Message: {message}
+
+ Text Data:{' '} + {error + ? `Error: ${error}` + : isLoading + ? 'Loading...' + : textMatch?.asText} +
+
+ Text Bytes Match:{' '} + {isLoading ? 'Loading...' : textMatch?.match ? 'true' : 'false'} +
+
+ Binary Bytes Match:{' '} + {isLoading ? 'Loading...' : binaryMatch?.match ? 'true' : 'false'} +
+
+          {JSON.stringify({
+            message,
+            textMatch,
+            binaryMatch,
+            isLoading,
+            error,
+          })}
+        
+
+
+ ) +} diff --git a/e2e/react-start/basic/src/routes/raw-stream/ssr-mixed.tsx b/e2e/react-start/basic/src/routes/raw-stream/ssr-mixed.tsx new file mode 100644 index 00000000000..0054c127e0a --- /dev/null +++ b/e2e/react-start/basic/src/routes/raw-stream/ssr-mixed.tsx @@ -0,0 +1,80 @@ +import { Await, createFileRoute } from '@tanstack/react-router' +import * as React from 'react' +import { RawStream } from '@tanstack/react-start' +import { + createDelayedStream, + createStreamConsumer, + encode, +} from '../../raw-stream-fns' + +export const Route = createFileRoute('/raw-stream/ssr-mixed')({ + loader: () => { + const rawStream = createDelayedStream( + [encode('mixed-ssr-1'), encode('mixed-ssr-2')], + 50, + ) + + // Deferred promise that resolves after a delay + const deferredData = new Promise((resolve) => + setTimeout(() => resolve('deferred-ssr-value'), 100), + ) + + return { + immediate: 'immediate-ssr-value', + deferred: deferredData, + rawData: new RawStream(rawStream), + } + }, + component: SSRMixedTest, +}) + +function SSRMixedTest() { + const { immediate, deferred, rawData } = Route.useLoaderData() + const [streamContent, setStreamContent] = React.useState('') + const [isConsuming, setIsConsuming] = React.useState(true) + const [error, setError] = React.useState(null) + + React.useEffect(() => { + const consumeStream = createStreamConsumer() + consumeStream(rawData) + .then((content) => { + setStreamContent(content) + setIsConsuming(false) + }) + .catch((err) => { + setError(String(err)) + setIsConsuming(false) + }) + }, [rawData]) + + return ( +
+

SSR Mixed Streaming Test

+

+ This route returns a mix of immediate data, deferred promises, and + RawStream from its loader. +

+ +
+
Immediate: {immediate}
+
+ Deferred:{' '} + + {(value) => {value}} + +
+
+ Stream Content:{' '} + {error + ? `Error: ${error}` + : isConsuming + ? 'Loading...' + : streamContent} +
+
+          {JSON.stringify({ immediate, streamContent, isConsuming, error })}
+        
+
+
+ ) +} diff --git a/e2e/react-start/basic/src/routes/raw-stream/ssr-multiple.tsx b/e2e/react-start/basic/src/routes/raw-stream/ssr-multiple.tsx new file mode 100644 index 00000000000..9f25b6c515a --- /dev/null +++ b/e2e/react-start/basic/src/routes/raw-stream/ssr-multiple.tsx @@ -0,0 +1,88 @@ +import { createFileRoute } from '@tanstack/react-router' +import * as React from 'react' +import { RawStream } from '@tanstack/react-start' +import { + encode, + createDelayedStream, + createStreamConsumer, +} from '../../raw-stream-fns' + +export const Route = createFileRoute('/raw-stream/ssr-multiple')({ + loader: async () => { + const stream1 = createDelayedStream( + [encode('multi-1a'), encode('multi-1b')], + 30, + ) + const stream2 = createDelayedStream( + [encode('multi-2a'), encode('multi-2b')], + 50, + ) + return { + message: 'SSR Multiple Streams Test', + first: new RawStream(stream1), + second: new RawStream(stream2), + } + }, + component: SSRMultipleTest, +}) + +function SSRMultipleTest() { + const { message, first, second } = Route.useLoaderData() + const [firstContent, setFirstContent] = React.useState('') + const [secondContent, setSecondContent] = React.useState('') + const [isConsuming, setIsConsuming] = React.useState(true) + const [error, setError] = React.useState(null) + + React.useEffect(() => { + const consumeStream = createStreamConsumer() + Promise.all([consumeStream(first), consumeStream(second)]) + .then(([content1, content2]) => { + setFirstContent(content1) + setSecondContent(content2) + setIsConsuming(false) + }) + .catch((err) => { + setError(String(err)) + setIsConsuming(false) + }) + }, [first, second]) + + return ( +
+

SSR Multiple RawStreams Test

+

+ This route returns multiple RawStreams from its loader. Each stream is + independently serialized during SSR. +

+ +
+
Message: {message}
+
+ First Stream:{' '} + {error + ? `Error: ${error}` + : isConsuming + ? 'Loading...' + : firstContent} +
+
+ Second Stream:{' '} + {error + ? `Error: ${error}` + : isConsuming + ? 'Loading...' + : secondContent} +
+
+          {JSON.stringify({
+            message,
+            firstContent,
+            secondContent,
+            isConsuming,
+            error,
+          })}
+        
+
+
+ ) +} diff --git a/e2e/react-start/basic/src/routes/raw-stream/ssr-single.tsx b/e2e/react-start/basic/src/routes/raw-stream/ssr-single.tsx new file mode 100644 index 00000000000..28fb4ea9fdd --- /dev/null +++ b/e2e/react-start/basic/src/routes/raw-stream/ssr-single.tsx @@ -0,0 +1,75 @@ +import { createFileRoute } from '@tanstack/react-router' +import * as React from 'react' +import { RawStream } from '@tanstack/react-start' +import { + encode, + createDelayedStream, + createStreamConsumer, +} from '../../raw-stream-fns' + +export const Route = createFileRoute('/raw-stream/ssr-single')({ + loader: async () => { + const stream = createDelayedStream( + [encode('ssr-chunk1'), encode('ssr-chunk2'), encode('ssr-chunk3')], + 50, + ) + return { + message: 'SSR Single Stream Test', + timestamp: Date.now(), + rawData: new RawStream(stream), + } + }, + component: SSRSingleTest, +}) + +function SSRSingleTest() { + const { message, timestamp, rawData } = Route.useLoaderData() + const [streamContent, setStreamContent] = React.useState('') + const [isConsuming, setIsConsuming] = React.useState(true) + const [error, setError] = React.useState(null) + + React.useEffect(() => { + const consumeStream = createStreamConsumer() + consumeStream(rawData) + .then((content) => { + setStreamContent(content) + setIsConsuming(false) + }) + .catch((err) => { + setError(String(err)) + setIsConsuming(false) + }) + }, [rawData]) + + return ( +
+

SSR Single RawStream Test

+

+ This route returns a single RawStream from its loader. The stream is + serialized during SSR using base64 encoding. +

+ +
+
Message: {message}
+
+ Has Timestamp: {typeof timestamp === 'number' ? 'true' : 'false'} +
+
+ Stream Content:{' '} + {error + ? `Error: ${error}` + : isConsuming + ? 'Loading...' + : streamContent} +
+
+ RawData Type: {typeof rawData} | hasStream:{' '} + {rawData && 'getReader' in rawData ? 'true' : 'false'} +
+
+          {JSON.stringify({ message, streamContent, isConsuming, error })}
+        
+
+
+ ) +} diff --git a/e2e/react-start/basic/src/routes/raw-stream/ssr-text-hint.tsx b/e2e/react-start/basic/src/routes/raw-stream/ssr-text-hint.tsx new file mode 100644 index 00000000000..1f52c8c0176 --- /dev/null +++ b/e2e/react-start/basic/src/routes/raw-stream/ssr-text-hint.tsx @@ -0,0 +1,176 @@ +import { createFileRoute } from '@tanstack/react-router' +import * as React from 'react' +import { RawStream } from '@tanstack/react-start' +import { + encode, + createDelayedStream, + concatBytes, + collectBytes, + compareBytes, +} from '../../raw-stream-fns' + +// Expected data - defined at module level for client-side verification +const PURE_TEXT_CHUNKS = [ + encode('Hello '), + encode('World '), + encode('from SSR!'), +] +const PURE_TEXT_EXPECTED = concatBytes(PURE_TEXT_CHUNKS) + +const MIXED_CHUNKS = [ + encode('Valid text'), + new Uint8Array([0xff, 0xfe, 0x80, 0x90]), // Invalid UTF-8 + encode(' more text'), +] +const MIXED_EXPECTED = concatBytes(MIXED_CHUNKS) + +// Pure binary data (invalid UTF-8) - must use base64 fallback +const PURE_BINARY_CHUNKS = [ + new Uint8Array([0xff, 0xfe, 0x00, 0x01, 0x80, 0x90]), + new Uint8Array([0xa0, 0xb0, 0xc0, 0xd0, 0xe0, 0xf0]), +] +const PURE_BINARY_EXPECTED = concatBytes(PURE_BINARY_CHUNKS) + +export const Route = createFileRoute('/raw-stream/ssr-text-hint')({ + loader: async () => { + // Pure text stream - should use UTF-8 encoding with text hint + const textStream = createDelayedStream( + [encode('Hello '), encode('World '), encode('from SSR!')], + 30, + ) + + // Mixed content stream - text hint should use UTF-8 for valid text, base64 for binary + const mixedStream = createDelayedStream( + [ + encode('Valid text'), + new Uint8Array([0xff, 0xfe, 0x80, 0x90]), // Invalid UTF-8 + encode(' more text'), + ], + 30, + ) + + // Pure binary stream - text hint must fallback to base64 for all chunks + const pureBinaryStream = createDelayedStream( + [ + new Uint8Array([0xff, 0xfe, 0x00, 0x01, 0x80, 0x90]), + new Uint8Array([0xa0, 0xb0, 
0xc0, 0xd0, 0xe0, 0xf0]), + ], + 30, + ) + + return { + message: 'SSR Text Hint Test', + pureText: new RawStream(textStream, { hint: 'text' }), + mixedContent: new RawStream(mixedStream, { hint: 'text' }), + pureBinary: new RawStream(pureBinaryStream, { hint: 'text' }), + } + }, + component: SSRTextHintTest, +}) + +function SSRTextHintTest() { + const { message, pureText, mixedContent, pureBinary } = Route.useLoaderData() + const [pureTextMatch, setPureTextMatch] = React.useState<{ + match: boolean + mismatchIndex: number | null + actualLength: number + expectedLength: number + asText: string + } | null>(null) + const [mixedMatch, setMixedMatch] = React.useState<{ + match: boolean + mismatchIndex: number | null + actualLength: number + expectedLength: number + } | null>(null) + const [pureBinaryMatch, setPureBinaryMatch] = React.useState<{ + match: boolean + mismatchIndex: number | null + actualLength: number + expectedLength: number + } | null>(null) + const [isLoading, setIsLoading] = React.useState(true) + const [error, setError] = React.useState(null) + + React.useEffect(() => { + Promise.all([ + collectBytes(pureText), + collectBytes(mixedContent), + collectBytes(pureBinary), + ]) + .then(([pureBytes, mixedBytes, pureBinaryBytes]) => { + const pureComp = compareBytes(pureBytes, PURE_TEXT_EXPECTED) + const decoder = new TextDecoder() + setPureTextMatch({ + ...pureComp, + actualLength: pureBytes.length, + expectedLength: PURE_TEXT_EXPECTED.length, + asText: decoder.decode(pureBytes), + }) + const mixedComp = compareBytes(mixedBytes, MIXED_EXPECTED) + setMixedMatch({ + ...mixedComp, + actualLength: mixedBytes.length, + expectedLength: MIXED_EXPECTED.length, + }) + const pureBinaryComp = compareBytes( + pureBinaryBytes, + PURE_BINARY_EXPECTED, + ) + setPureBinaryMatch({ + ...pureBinaryComp, + actualLength: pureBinaryBytes.length, + expectedLength: PURE_BINARY_EXPECTED.length, + }) + setIsLoading(false) + }) + .catch((err) => { + setError(String(err)) + 
setIsLoading(false) + }) + }, [pureText, mixedContent, pureBinary]) + + return ( +
+

SSR Text Hint Test

+

+ This route tests RawStream with hint: 'text' from loader. Text hint + optimizes for UTF-8 content but falls back to base64 for invalid UTF-8. +

+ +
+
Message: {message}
+
+ Pure Text:{' '} + {error + ? `Error: ${error}` + : isLoading + ? 'Loading...' + : pureTextMatch?.asText} +
+
+ Pure Text Bytes Match:{' '} + {isLoading ? 'Loading...' : pureTextMatch?.match ? 'true' : 'false'} +
+
+ Mixed Content Bytes Match:{' '} + {isLoading ? 'Loading...' : mixedMatch?.match ? 'true' : 'false'} +
+
+ Pure Binary Bytes Match:{' '} + {isLoading ? 'Loading...' : pureBinaryMatch?.match ? 'true' : 'false'} +
+
+          {JSON.stringify({
+            message,
+            pureTextMatch,
+            mixedMatch,
+            pureBinaryMatch,
+            isLoading,
+            error,
+          })}
+        
+
+
+ ) +} diff --git a/e2e/react-start/basic/tests/raw-stream.spec.ts b/e2e/react-start/basic/tests/raw-stream.spec.ts new file mode 100644 index 00000000000..b47b6c0c22b --- /dev/null +++ b/e2e/react-start/basic/tests/raw-stream.spec.ts @@ -0,0 +1,666 @@ +import { expect } from '@playwright/test' +import { test } from '@tanstack/router-e2e-utils' +import { isPrerender } from './utils/isPrerender' + +/** + * These tests verify the RawStream binary streaming functionality. + * + * RawStream allows returning ReadableStream from server functions + * with efficient binary encoding: + * - Server functions (RPC): Binary framing protocol + * - SSR loaders: Base64 encoding via seroval's stream mechanism + */ + +// Wait time for hydration to complete after page load +// This needs to be long enough for React hydration to attach event handlers +const HYDRATION_WAIT = 1000 + +test.describe('RawStream - Client RPC Tests', () => { + test('Single raw stream - returns stream with binary data', async ({ + page, + }) => { + await page.goto('/raw-stream/client-call') + await page.waitForURL('/raw-stream/client-call') + + // Wait for hydration + await page.getByTestId('test1-btn').waitFor({ state: 'visible' }) + await page.waitForTimeout(HYDRATION_WAIT) + + await page.getByTestId('test1-btn').click() + + await expect(page.getByTestId('test1-result')).toContainText( + 'chunk1chunk2chunk3', + { timeout: 10000 }, + ) + await expect(page.getByTestId('test1-result')).toContainText( + 'Single stream test', + ) + }) + + test('Multiple raw streams - returns multiple independent streams', async ({ + page, + }) => { + await page.goto('/raw-stream/client-call') + await page.waitForURL('/raw-stream/client-call') + + await page.getByTestId('test2-btn').waitFor({ state: 'visible' }) + await page.waitForTimeout(HYDRATION_WAIT) + + await page.getByTestId('test2-btn').click() + + await expect(page.getByTestId('test2-result')).toContainText( + 'stream1-astream1-b', + { timeout: 10000 }, + ) + await 
expect(page.getByTestId('test2-result')).toContainText( + 'stream2-astream2-b', + ) + await expect(page.getByTestId('test2-result')).toContainText( + 'Multiple streams test', + ) + }) + + test('JSON ends before raw stream - handles timing correctly', async ({ + page, + }) => { + await page.goto('/raw-stream/client-call') + await page.waitForURL('/raw-stream/client-call') + + await page.getByTestId('test3-btn').waitFor({ state: 'visible' }) + await page.waitForTimeout(HYDRATION_WAIT) + + await page.getByTestId('test3-btn').click() + + await expect(page.getByTestId('test3-result')).toContainText( + 'slow-1slow-2slow-3slow-4', + { timeout: 10000 }, + ) + await expect(page.getByTestId('test3-result')).toContainText( + 'JSON ends first test', + ) + await expect(page.getByTestId('test3-result')).toContainText('hasTimestamp') + }) + + test('Raw stream ends before JSON - handles timing correctly', async ({ + page, + }) => { + await page.goto('/raw-stream/client-call') + await page.waitForURL('/raw-stream/client-call') + + await page.getByTestId('test4-btn').waitFor({ state: 'visible' }) + await page.waitForTimeout(HYDRATION_WAIT) + + await page.getByTestId('test4-btn').click() + + await expect(page.getByTestId('test4-result')).toContainText('fast-done', { + timeout: 10000, + }) + await expect(page.getByTestId('test4-result')).toContainText( + 'deferred-json-data', + ) + await expect(page.getByTestId('test4-result')).toContainText( + 'Raw ends first test', + ) + }) + + test('Large binary data - handles 3KB of binary correctly', async ({ + page, + }) => { + await page.goto('/raw-stream/client-call') + await page.waitForURL('/raw-stream/client-call') + + await page.getByTestId('test5-btn').waitFor({ state: 'visible' }) + await page.waitForTimeout(HYDRATION_WAIT) + + await page.getByTestId('test5-btn').click() + + await expect(page.getByTestId('test5-result')).toContainText( + '"sizeMatch":true', + { timeout: 10000 }, + ) + await 
expect(page.getByTestId('test5-result')).toContainText( + '"actualSize":3072', + ) + await expect(page.getByTestId('test5-result')).toContainText( + 'Large binary test', + ) + }) + + test('Mixed streaming - Promise and RawStream together', async ({ page }) => { + await page.goto('/raw-stream/client-call') + await page.waitForURL('/raw-stream/client-call') + + await page.getByTestId('test6-btn').waitFor({ state: 'visible' }) + await page.waitForTimeout(HYDRATION_WAIT) + + await page.getByTestId('test6-btn').click() + + await expect(page.getByTestId('test6-result')).toContainText( + 'immediate-value', + { timeout: 10000 }, + ) + await expect(page.getByTestId('test6-result')).toContainText( + 'deferred-value', + ) + await expect(page.getByTestId('test6-result')).toContainText( + 'mixed-raw-1mixed-raw-2', + ) + }) +}) + +test.describe('RawStream - SSR Loader Tests', () => { + test('SSR single stream - direct navigation', async ({ page }) => { + // Direct navigation = full SSR with base64 encoding + await page.goto('/raw-stream/ssr-single') + await page.waitForURL('/raw-stream/ssr-single') + + // Wait for stream to be consumed (SSR tests need hydration + stream consumption) + await expect(page.getByTestId('ssr-single-stream')).toContainText( + 'ssr-chunk1ssr-chunk2ssr-chunk3', + { timeout: 10000 }, + ) + await expect(page.getByTestId('ssr-single-message')).toContainText( + 'SSR Single Stream Test', + ) + await expect(page.getByTestId('ssr-single-timestamp')).toContainText( + 'Has Timestamp: true', + ) + }) + + test('SSR multiple streams - direct navigation', async ({ page }) => { + await page.goto('/raw-stream/ssr-multiple') + await page.waitForURL('/raw-stream/ssr-multiple') + + await expect(page.getByTestId('ssr-multiple-first')).toContainText( + 'multi-1amulti-1b', + { timeout: 10000 }, + ) + await expect(page.getByTestId('ssr-multiple-second')).toContainText( + 'multi-2amulti-2b', + ) + await expect(page.getByTestId('ssr-multiple-message')).toContainText( + 'SSR 
Multiple Streams Test', + ) + }) + + // Skip in prerender mode: RawStream + deferred data causes stream chunks to be + // missing from prerendered HTML. This is a known limitation where the prerender + // process doesn't properly capture streaming data when deferred promises are present. + ;(isPrerender ? test.skip : test)( + 'SSR mixed streaming - RawStream with deferred data', + async ({ page }) => { + await page.goto('/raw-stream/ssr-mixed') + await page.waitForURL('/raw-stream/ssr-mixed') + + await expect(page.getByTestId('ssr-mixed-immediate')).toContainText( + 'immediate-ssr-value', + ) + await expect(page.getByTestId('ssr-mixed-stream')).toContainText( + 'mixed-ssr-1mixed-ssr-2', + { timeout: 10000 }, + ) + // Deferred promise should also resolve + await expect(page.getByTestId('ssr-mixed-deferred')).toContainText( + 'deferred-ssr-value', + { timeout: 10000 }, + ) + }, + ) + + test('SSR single stream - client-side navigation', async ({ page }) => { + // Start from index, then navigate client-side to SSR route + await page.goto('/raw-stream') + await page.waitForURL('/raw-stream') + + // Wait for hydration (use navigation to be specific) + await page + .getByRole('navigation') + .getByRole('link', { name: 'SSR Single' }) + .waitFor({ state: 'visible' }) + await page.waitForTimeout(HYDRATION_WAIT) + + // Client-side navigation + await page + .getByRole('navigation') + .getByRole('link', { name: 'SSR Single' }) + .click() + await page.waitForURL('/raw-stream/ssr-single') + + // Stream should still work after client navigation + await expect(page.getByTestId('ssr-single-stream')).toContainText( + 'ssr-chunk1ssr-chunk2ssr-chunk3', + { timeout: 10000 }, + ) + }) + + test('SSR multiple streams - client-side navigation', async ({ page }) => { + await page.goto('/raw-stream') + await page.waitForURL('/raw-stream') + + await page + .getByRole('navigation') + .getByRole('link', { name: 'SSR Multiple' }) + .waitFor({ state: 'visible' }) + await 
page.waitForTimeout(HYDRATION_WAIT) + + await page + .getByRole('navigation') + .getByRole('link', { name: 'SSR Multiple' }) + .click() + await page.waitForURL('/raw-stream/ssr-multiple') + + await expect(page.getByTestId('ssr-multiple-first')).toContainText( + 'multi-1amulti-1b', + { timeout: 10000 }, + ) + await expect(page.getByTestId('ssr-multiple-second')).toContainText( + 'multi-2amulti-2b', + ) + }) +}) + +test.describe('RawStream - Hint Parameter (RPC)', () => { + test('Text hint with pure text - uses UTF-8 encoding', async ({ page }) => { + await page.goto('/raw-stream/client-call') + await page.waitForURL('/raw-stream/client-call') + + await page.getByTestId('test7-btn').waitFor({ state: 'visible' }) + await page.waitForTimeout(HYDRATION_WAIT) + + await page.getByTestId('test7-btn').click() + + await expect(page.getByTestId('test7-result')).toContainText( + '"match":true', + { timeout: 10000 }, + ) + await expect(page.getByTestId('test7-result')).toContainText( + 'Hello, World! 
This is text.', + ) + await expect(page.getByTestId('test7-result')).toContainText( + 'Text hint with pure text', + ) + }) + + test('Text hint with pure binary - fallback to base64', async ({ page }) => { + await page.goto('/raw-stream/client-call') + await page.waitForURL('/raw-stream/client-call') + + await page.getByTestId('test8-btn').waitFor({ state: 'visible' }) + await page.waitForTimeout(HYDRATION_WAIT) + + await page.getByTestId('test8-btn').click() + + await expect(page.getByTestId('test8-result')).toContainText( + '"match":true', + { timeout: 10000 }, + ) + await expect(page.getByTestId('test8-result')).toContainText( + '"expectedLength":12', + ) + }) + + test('Text hint with mixed content - handles both', async ({ page }) => { + await page.goto('/raw-stream/client-call') + await page.waitForURL('/raw-stream/client-call') + + await page.getByTestId('test9-btn').waitFor({ state: 'visible' }) + await page.waitForTimeout(HYDRATION_WAIT) + + await page.getByTestId('test9-btn').click() + + await expect(page.getByTestId('test9-result')).toContainText( + '"match":true', + { timeout: 10000 }, + ) + await expect(page.getByTestId('test9-result')).toContainText( + '"expectedLength":30', + ) + }) + + test('Binary hint with text data - uses base64', async ({ page }) => { + await page.goto('/raw-stream/client-call') + await page.waitForURL('/raw-stream/client-call') + + await page.getByTestId('test10-btn').waitFor({ state: 'visible' }) + await page.waitForTimeout(HYDRATION_WAIT) + + await page.getByTestId('test10-btn').click() + + await expect(page.getByTestId('test10-result')).toContainText( + '"match":true', + { timeout: 10000 }, + ) + await expect(page.getByTestId('test10-result')).toContainText( + 'This is text but using binary hint', + ) + }) + + test('Binary hint with binary data - uses base64', async ({ page }) => { + await page.goto('/raw-stream/client-call') + await page.waitForURL('/raw-stream/client-call') + + await page.getByTestId('test11-btn').waitFor({ 
state: 'visible' }) + await page.waitForTimeout(HYDRATION_WAIT) + + await page.getByTestId('test11-btn').click() + + await expect(page.getByTestId('test11-result')).toContainText( + '"match":true', + { timeout: 10000 }, + ) + await expect(page.getByTestId('test11-result')).toContainText( + '"expectedLength":6', + ) + }) +}) + +test.describe('RawStream - SSR Hint Parameter Tests', () => { + test('SSR text hint with pure text - direct navigation', async ({ page }) => { + await page.goto('/raw-stream/ssr-text-hint') + await page.waitForURL('/raw-stream/ssr-text-hint') + + await expect(page.getByTestId('ssr-text-hint-pure-text')).toContainText( + 'Hello World from SSR!', + { timeout: 10000 }, + ) + await expect(page.getByTestId('ssr-text-hint-pure-match')).toContainText( + 'true', + ) + await expect(page.getByTestId('ssr-text-hint-mixed-match')).toContainText( + 'true', + ) + await expect( + page.getByTestId('ssr-text-hint-pure-binary-match'), + ).toContainText('true') + }) + + test('SSR text hint - byte-by-byte verification', async ({ page }) => { + await page.goto('/raw-stream/ssr-text-hint') + await page.waitForURL('/raw-stream/ssr-text-hint') + + // Wait for streams to be fully consumed + await expect(page.getByTestId('ssr-text-hint-result')).toContainText( + '"match":true', + { timeout: 10000 }, + ) + // Check pure text, mixed content, and pure binary all match + const result = await page.getByTestId('ssr-text-hint-result').textContent() + const parsed = JSON.parse(result || '{}') + expect(parsed.pureTextMatch?.match).toBe(true) + expect(parsed.mixedMatch?.match).toBe(true) + expect(parsed.pureBinaryMatch?.match).toBe(true) + }) + + test('SSR binary hint with text - direct navigation', async ({ page }) => { + await page.goto('/raw-stream/ssr-binary-hint') + await page.waitForURL('/raw-stream/ssr-binary-hint') + + await expect(page.getByTestId('ssr-binary-hint-text')).toContainText( + 'Binary hint with text', + { timeout: 10000 }, + ) + await 
expect(page.getByTestId('ssr-binary-hint-text-match')).toContainText( + 'true', + ) + await expect( + page.getByTestId('ssr-binary-hint-binary-match'), + ).toContainText('true') + }) + + test('SSR binary hint - byte-by-byte verification', async ({ page }) => { + await page.goto('/raw-stream/ssr-binary-hint') + await page.waitForURL('/raw-stream/ssr-binary-hint') + + // Wait for streams to be fully consumed + await expect(page.getByTestId('ssr-binary-hint-result')).toContainText( + '"match":true', + { timeout: 10000 }, + ) + // Check both text and binary data match + const result = await page + .getByTestId('ssr-binary-hint-result') + .textContent() + const parsed = JSON.parse(result || '{}') + expect(parsed.textMatch?.match).toBe(true) + expect(parsed.binaryMatch?.match).toBe(true) + }) + + test('SSR text hint - client-side navigation', async ({ page }) => { + await page.goto('/raw-stream') + await page.waitForURL('/raw-stream') + + await page + .getByRole('navigation') + .getByRole('link', { name: 'SSR Text Hint' }) + .waitFor({ state: 'visible' }) + await page.waitForTimeout(HYDRATION_WAIT) + + await page + .getByRole('navigation') + .getByRole('link', { name: 'SSR Text Hint' }) + .click() + await page.waitForURL('/raw-stream/ssr-text-hint') + + await expect(page.getByTestId('ssr-text-hint-pure-match')).toContainText( + 'true', + { timeout: 10000 }, + ) + await expect(page.getByTestId('ssr-text-hint-mixed-match')).toContainText( + 'true', + ) + await expect( + page.getByTestId('ssr-text-hint-pure-binary-match'), + ).toContainText('true') + }) + + test('SSR binary hint - client-side navigation', async ({ page }) => { + await page.goto('/raw-stream') + await page.waitForURL('/raw-stream') + + await page + .getByRole('navigation') + .getByRole('link', { name: 'SSR Binary Hint' }) + .waitFor({ state: 'visible' }) + await page.waitForTimeout(HYDRATION_WAIT) + + await page + .getByRole('navigation') + .getByRole('link', { name: 'SSR Binary Hint' }) + .click() + await 
page.waitForURL('/raw-stream/ssr-binary-hint') + + await expect(page.getByTestId('ssr-binary-hint-text-match')).toContainText( + 'true', + { timeout: 10000 }, + ) + await expect( + page.getByTestId('ssr-binary-hint-binary-match'), + ).toContainText('true') + }) +}) + +test.describe('RawStream - Multiplexing Tests (RPC)', () => { + test('Interleaved streams - two concurrent streams with variable delays', async ({ + page, + }) => { + await page.goto('/raw-stream/client-call') + await page.waitForURL('/raw-stream/client-call') + + await page.getByTestId('test12-btn').waitFor({ state: 'visible' }) + await page.waitForTimeout(HYDRATION_WAIT) + + await page.getByTestId('test12-btn').click() + + // Both streams should have matching bytes + await expect(page.getByTestId('test12-result')).toContainText( + '"match":true', + { timeout: 15000 }, + ) + // Verify both streams match + const result = await page.getByTestId('test12-result').textContent() + const parsed = JSON.parse(result || '{}') + expect(parsed.streamA?.match).toBe(true) + expect(parsed.streamB?.match).toBe(true) + }) + + test('Burst-pause-burst - single stream with variable timing', async ({ + page, + }) => { + await page.goto('/raw-stream/client-call') + await page.waitForURL('/raw-stream/client-call') + + await page.getByTestId('test13-btn').waitFor({ state: 'visible' }) + await page.waitForTimeout(HYDRATION_WAIT) + + await page.getByTestId('test13-btn').click() + + await expect(page.getByTestId('test13-result')).toContainText( + '"match":true', + { timeout: 15000 }, + ) + await expect(page.getByTestId('test13-result')).toContainText( + 'Burst-pause-burst test', + ) + }) + + test('Three concurrent streams - different timing patterns', async ({ + page, + }) => { + await page.goto('/raw-stream/client-call') + await page.waitForURL('/raw-stream/client-call') + + await page.getByTestId('test14-btn').waitFor({ state: 'visible' }) + await page.waitForTimeout(HYDRATION_WAIT) + + await 
page.getByTestId('test14-btn').click() + + // All three streams should match + await expect(page.getByTestId('test14-result')).toContainText( + '"match":true', + { timeout: 15000 }, + ) + // Verify all three streams match + const result = await page.getByTestId('test14-result').textContent() + const parsed = JSON.parse(result || '{}') + expect(parsed.fast?.match).toBe(true) + expect(parsed.slow?.match).toBe(true) + expect(parsed.burst?.match).toBe(true) + }) +}) + +test.describe('RawStream - Cross Navigation', () => { + test('Client RPC works after navigating from SSR route', async ({ page }) => { + // Start with SSR route + await page.goto('/raw-stream/ssr-single') + await page.waitForURL('/raw-stream/ssr-single') + + // Wait for SSR stream to complete (ensures hydration is done) + await expect(page.getByTestId('ssr-single-stream')).toContainText( + 'ssr-chunk1ssr-chunk2ssr-chunk3', + { timeout: 10000 }, + ) + + // Navigate to client-call route (use first() to avoid strict mode on multiple matches) + await page + .getByRole('navigation') + .getByRole('link', { name: 'Client Calls' }) + .click() + await page.waitForURL('/raw-stream/client-call') + + // Wait for hydration + await page.getByTestId('test1-btn').waitFor({ state: 'visible' }) + await page.waitForTimeout(HYDRATION_WAIT) + + // Run RPC test + await page.getByTestId('test1-btn').click() + + await expect(page.getByTestId('test1-result')).toContainText( + 'chunk1chunk2chunk3', + { timeout: 10000 }, + ) + }) + + test('Navigation from home to raw-stream routes', async ({ page }) => { + // Start from home + await page.goto('/') + await page.waitForURL('/') + + // Wait for hydration + await page + .getByRole('link', { name: 'Raw Stream' }) + .waitFor({ state: 'visible' }) + await page.waitForTimeout(HYDRATION_WAIT) + + // Navigate via client-side to raw-stream + await page.getByRole('link', { name: 'Raw Stream' }).click() + await page.waitForURL('/raw-stream') + + // Wait for hydration on the new page + await 
page.waitForTimeout(HYDRATION_WAIT) + + // Then to client-call (use navigation area to avoid duplicates) + await page + .getByRole('navigation') + .getByRole('link', { name: 'Client Calls' }) + .click() + await page.waitForURL('/raw-stream/client-call') + + // Wait for button + await page.getByTestId('test1-btn').waitFor({ state: 'visible' }) + await page.waitForTimeout(HYDRATION_WAIT) + + // Run a test + await page.getByTestId('test1-btn').click() + + await expect(page.getByTestId('test1-result')).toContainText( + 'chunk1chunk2chunk3', + { timeout: 10000 }, + ) + }) +}) + +test.describe('RawStream - Edge Cases (RPC)', () => { + test('Empty stream - handles zero-byte stream correctly', async ({ + page, + }) => { + await page.goto('/raw-stream/client-call') + await page.waitForURL('/raw-stream/client-call') + + await page.getByTestId('test15-btn').waitFor({ state: 'visible' }) + await page.waitForTimeout(HYDRATION_WAIT) + + await page.getByTestId('test15-btn').click() + + await expect(page.getByTestId('test15-result')).toContainText( + '"isEmpty":true', + { timeout: 10000 }, + ) + await expect(page.getByTestId('test15-result')).toContainText( + '"byteCount":0', + ) + await expect(page.getByTestId('test15-result')).toContainText( + 'Empty stream test', + ) + }) + + test('Stream error - propagates error to client', async ({ page }) => { + await page.goto('/raw-stream/client-call') + await page.waitForURL('/raw-stream/client-call') + + await page.getByTestId('test16-btn').waitFor({ state: 'visible' }) + await page.waitForTimeout(HYDRATION_WAIT) + + await page.getByTestId('test16-btn').click() + + await expect(page.getByTestId('test16-result')).toContainText( + '"errorCaught":true', + { timeout: 10000 }, + ) + await expect(page.getByTestId('test16-result')).toContainText( + 'Intentional stream error', + ) + await expect(page.getByTestId('test16-result')).toContainText( + 'Error stream test', + ) + }) +}) diff --git a/packages/router-core/src/index.ts 
b/packages/router-core/src/index.ts index 73019ccbb65..8df74f9d3f9 100644 --- a/packages/router-core/src/index.ts +++ b/packages/router-core/src/index.ts @@ -432,6 +432,18 @@ export { export { defaultSerovalPlugins } from './ssr/serializer/seroval-plugins' +export { + RawStream, + RawStreamSSRPlugin, + createRawStreamRPCPlugin, + createRawStreamDeserializePlugin, +} from './ssr/serializer/RawStream' +export type { + OnRawStreamCallback, + RawStreamHint, + RawStreamOptions, +} from './ssr/serializer/RawStream' + export { composeRewrites, executeRewriteInput, diff --git a/packages/router-core/src/ssr/serializer/RawStream.ts b/packages/router-core/src/ssr/serializer/RawStream.ts new file mode 100644 index 00000000000..b5cba6d1a57 --- /dev/null +++ b/packages/router-core/src/ssr/serializer/RawStream.ts @@ -0,0 +1,464 @@ +import { createPlugin, createStream } from 'seroval' +import type { Plugin } from 'seroval' + +/** + * Hint for RawStream encoding strategy during SSR serialization. + * - 'binary': Always use base64 encoding (best for binary data like files, images) + * - 'text': Try UTF-8 first, fallback to base64 (best for text-heavy data like RSC payloads) + */ +export type RawStreamHint = 'binary' | 'text' + +/** + * Options for RawStream configuration. + */ +export interface RawStreamOptions { + /** + * Encoding hint for SSR serialization. + * - 'binary' (default): Always use base64 encoding + * - 'text': Try UTF-8 first, fallback to base64 for invalid UTF-8 chunks + */ + hint?: RawStreamHint +} + +/** + * Marker class for ReadableStream that should be serialized + * with base64 encoding (SSR) or binary framing (server functions). + * + * Wrap your binary streams with this to get efficient serialization: + * ```ts + * // For binary data (files, images, etc.) + * return { data: new RawStream(file.stream()) } + * + * // For text-heavy data (RSC payloads, etc.) 
+ * return { data: new RawStream(rscStream, { hint: 'text' }) } + * ``` + */ +export class RawStream { + public readonly hint: RawStreamHint + + constructor( + public readonly stream: ReadableStream, + options?: RawStreamOptions, + ) { + this.hint = options?.hint ?? 'binary' + } +} + +/** + * Callback type for RPC plugin to register raw streams with multiplexer + */ +export type OnRawStreamCallback = ( + streamId: number, + stream: ReadableStream, +) => void + +// Base64 helpers used in both Node and browser. +// In Node-like runtimes, prefer Buffer for speed and compatibility. +const BufferCtor: any = (globalThis as any).Buffer +const hasNodeBuffer = !!BufferCtor && typeof BufferCtor.from === 'function' + +function uint8ArrayToBase64(bytes: Uint8Array): string { + if (bytes.length === 0) return '' + + if (hasNodeBuffer) { + return BufferCtor.from(bytes).toString('base64') + } + + // Browser fallback: chunked String.fromCharCode + btoa + const CHUNK_SIZE = 0x8000 // 32KB chunks to avoid stack overflow + const chunks: Array = [] + for (let i = 0; i < bytes.length; i += CHUNK_SIZE) { + const chunk = bytes.subarray(i, i + CHUNK_SIZE) + chunks.push(String.fromCharCode.apply(null, chunk as any)) + } + return btoa(chunks.join('')) +} + +function base64ToUint8Array(base64: string): Uint8Array { + if (base64.length === 0) return new Uint8Array(0) + + if (hasNodeBuffer) { + const buf = BufferCtor.from(base64, 'base64') + return new Uint8Array(buf.buffer, buf.byteOffset, buf.byteLength) + } + + const binary = atob(base64) + const bytes = new Uint8Array(binary.length) + for (let i = 0; i < binary.length; i++) { + bytes[i] = binary.charCodeAt(i) + } + return bytes +} + +// Factory sentinels - use null-proto objects to avoid prototype surprises +const RAW_STREAM_FACTORY_BINARY: Record = Object.create(null) +const RAW_STREAM_FACTORY_TEXT: Record = Object.create(null) + +// Factory constructor for binary mode - converts seroval stream to ReadableStream +// All chunks are base64 
encoded strings +const RAW_STREAM_FACTORY_CONSTRUCTOR_BINARY = ( + stream: ReturnType, +) => + new ReadableStream({ + start(controller) { + stream.on({ + next(base64: string) { + try { + controller.enqueue(base64ToUint8Array(base64)) + } catch { + // Stream may be closed + } + }, + throw(error: unknown) { + controller.error(error) + }, + return() { + try { + controller.close() + } catch { + // Stream may already be closed + } + }, + }) + }, + }) + +// Factory constructor for text mode - converts seroval stream to ReadableStream +// Chunks are either strings (UTF-8) or { $b64: string } (base64 fallback) +// Use module-level TextEncoder to avoid per-factory allocation +const textEncoderForFactory = new TextEncoder() +const RAW_STREAM_FACTORY_CONSTRUCTOR_TEXT = ( + stream: ReturnType, +) => { + return new ReadableStream({ + start(controller) { + stream.on({ + next(value: string | { $b64: string }) { + try { + if (typeof value === 'string') { + controller.enqueue(textEncoderForFactory.encode(value)) + } else { + controller.enqueue(base64ToUint8Array(value.$b64)) + } + } catch { + // Stream may be closed + } + }, + throw(error: unknown) { + controller.error(error) + }, + return() { + try { + controller.close() + } catch { + // Stream may already be closed + } + }, + }) + }, + }) +} + +// Minified factory function for binary mode - all chunks are base64 strings +// This must be self-contained since it's injected into the HTML +const FACTORY_BINARY = `(s=>new ReadableStream({start(c){s.on({next(b){try{const d=atob(b),a=new Uint8Array(d.length);for(let i=0;i{const e=new TextEncoder();return new ReadableStream({start(c){s.on({next(v){try{if(typeof v==='string'){c.enqueue(e.encode(v))}else{const d=atob(v.$b64),a=new Uint8Array(d.length);for(let i=0;i to seroval stream with base64-encoded chunks (binary mode) +function toBinaryStream(readable: ReadableStream) { + const stream = createStream() + const reader = readable.getReader() + + // Use iterative loop instead of recursive 
async to avoid stack accumulation + ;(async () => { + try { + while (true) { + const { done, value } = await reader.read() + if (done) { + stream.return(undefined) + break + } + stream.next(uint8ArrayToBase64(value)) + } + } catch (error) { + stream.throw(error) + } finally { + reader.releaseLock() + } + })() + + return stream +} + +// Convert ReadableStream to seroval stream with UTF-8 first, base64 fallback (text mode) +function toTextStream(readable: ReadableStream) { + const stream = createStream() + const reader = readable.getReader() + const decoder = new TextDecoder('utf-8', { fatal: true }) + + // Use iterative loop instead of recursive async to avoid stack accumulation + ;(async () => { + try { + while (true) { + const { done, value } = await reader.read() + if (done) { + // Flush any remaining bytes in the decoder + try { + const remaining = decoder.decode() + if (remaining.length > 0) { + stream.next(remaining) + } + } catch { + // Ignore decode errors on flush + } + stream.return(undefined) + break + } + + try { + // Try UTF-8 decode first + const text = decoder.decode(value, { stream: true }) + if (text.length > 0) { + stream.next(text) + } + } catch { + // UTF-8 decode failed, fallback to base64 + stream.next({ $b64: uint8ArrayToBase64(value) }) + } + } + } catch (error) { + stream.throw(error) + } finally { + reader.releaseLock() + } + })() + + return stream +} + +// Factory plugin for binary mode +const RawStreamFactoryBinaryPlugin = createPlugin< + Record, + undefined +>({ + tag: 'tss/RawStreamFactory', + test(value) { + return value === RAW_STREAM_FACTORY_BINARY + }, + parse: { + sync() { + return undefined + }, + async() { + return Promise.resolve(undefined) + }, + stream() { + return undefined + }, + }, + serialize() { + return FACTORY_BINARY + }, + deserialize() { + return RAW_STREAM_FACTORY_BINARY + }, +}) + +// Factory plugin for text mode +const RawStreamFactoryTextPlugin = createPlugin< + Record, + undefined +>({ + tag: 
'tss/RawStreamFactoryText', + test(value) { + return value === RAW_STREAM_FACTORY_TEXT + }, + parse: { + sync() { + return undefined + }, + async() { + return Promise.resolve(undefined) + }, + stream() { + return undefined + }, + }, + serialize() { + return FACTORY_TEXT + }, + deserialize() { + return RAW_STREAM_FACTORY_TEXT + }, +}) + +/** + * SSR Plugin - uses base64 or UTF-8+base64 encoding for chunks, delegates to seroval's stream mechanism. + * Used during SSR when serializing to JavaScript code for HTML injection. + * + * Supports two modes based on RawStream hint: + * - 'binary': Always base64 encode (default) + * - 'text': Try UTF-8 first, fallback to base64 for invalid UTF-8 + */ +export const RawStreamSSRPlugin: Plugin = createPlugin({ + tag: 'tss/RawStream', + extends: [RawStreamFactoryBinaryPlugin, RawStreamFactoryTextPlugin], + + test(value: unknown) { + return value instanceof RawStream + }, + + parse: { + sync(value: RawStream, ctx) { + // Sync parse not really supported for streams, return empty stream + const factory = + value.hint === 'text' + ? RAW_STREAM_FACTORY_TEXT + : RAW_STREAM_FACTORY_BINARY + return { + hint: value.hint, + factory: ctx.parse(factory), + stream: ctx.parse(createStream()), + } + }, + async async(value: RawStream, ctx) { + const factory = + value.hint === 'text' + ? RAW_STREAM_FACTORY_TEXT + : RAW_STREAM_FACTORY_BINARY + const encodedStream = + value.hint === 'text' + ? toTextStream(value.stream) + : toBinaryStream(value.stream) + return { + hint: value.hint, + factory: await ctx.parse(factory), + stream: await ctx.parse(encodedStream), + } + }, + stream(value: RawStream, ctx) { + const factory = + value.hint === 'text' + ? RAW_STREAM_FACTORY_TEXT + : RAW_STREAM_FACTORY_BINARY + const encodedStream = + value.hint === 'text' + ? 
toTextStream(value.stream) + : toBinaryStream(value.stream) + return { + hint: value.hint, + factory: ctx.parse(factory), + stream: ctx.parse(encodedStream), + } + }, + }, + + serialize(node: { hint: RawStreamHint; factory: any; stream: any }, ctx) { + return ( + '(' + + ctx.serialize(node.factory) + + ')(' + + ctx.serialize(node.stream) + + ')' + ) + }, + + deserialize( + node: { hint: RawStreamHint; factory: any; stream: any }, + ctx, + ): any { + const stream: ReturnType = ctx.deserialize(node.stream) + return node.hint === 'text' + ? RAW_STREAM_FACTORY_CONSTRUCTOR_TEXT(stream) + : RAW_STREAM_FACTORY_CONSTRUCTOR_BINARY(stream) + }, +}) as Plugin + +/** + * Node type for RPC plugin serialization + */ +interface RawStreamRPCNode { + streamId: number +} + +/** + * Creates an RPC plugin instance that registers raw streams with a multiplexer. + * Used for server function responses where we want binary framing. + * Note: RPC always uses binary framing regardless of hint. + * + * @param onRawStream Callback invoked when a RawStream is encountered during serialization + */ +export function createRawStreamRPCPlugin( + onRawStream: OnRawStreamCallback, +): Plugin { + // Own stream counter - sequential IDs starting at 1, independent of seroval internals + let nextStreamId = 1 + + return createPlugin({ + tag: 'tss/RawStream', + + test(value: unknown) { + return value instanceof RawStream + }, + + parse: { + async(value: RawStream) { + const streamId = nextStreamId++ + onRawStream(streamId, value.stream) + return Promise.resolve({ streamId }) + }, + stream(value: RawStream) { + const streamId = nextStreamId++ + onRawStream(streamId, value.stream) + return { streamId } + }, + }, + + serialize(): never { + // RPC uses toCrossJSONStream which produces JSON nodes, not JS code. + // This method is only called by crossSerialize* which we don't use. + throw new Error( + 'RawStreamRPCPlugin.serialize should not be called. 
RPC uses JSON serialization, not JS code generation.', + ) + }, + + deserialize(): never { + // Client uses createRawStreamDeserializePlugin instead + throw new Error( + 'RawStreamRPCPlugin.deserialize should not be called. Use createRawStreamDeserializePlugin on client.', + ) + }, + }) as Plugin +} + +/** + * Creates a deserialize-only plugin for client-side stream reconstruction. + * Used in serverFnFetcher to wire up streams from frame decoder. + * + * @param getOrCreateStream Function to get/create a stream by ID from frame decoder + */ +export function createRawStreamDeserializePlugin( + getOrCreateStream: (id: number) => ReadableStream, +): Plugin { + return createPlugin({ + tag: 'tss/RawStream', + + test: () => false, // Client never serializes RawStream + + parse: {}, // Client only deserializes, never parses + + serialize(): never { + // Client never serializes RawStream back to server + throw new Error( + 'RawStreamDeserializePlugin.serialize should not be called. Client only deserializes.', + ) + }, + + deserialize(node: RawStreamRPCNode) { + return getOrCreateStream(node.streamId) + }, + }) as Plugin +} diff --git a/packages/router-core/src/ssr/serializer/seroval-plugins.ts b/packages/router-core/src/ssr/serializer/seroval-plugins.ts index 669df5eb23e..de10d3bdc04 100644 --- a/packages/router-core/src/ssr/serializer/seroval-plugins.ts +++ b/packages/router-core/src/ssr/serializer/seroval-plugins.ts @@ -1,9 +1,12 @@ import { ReadableStreamPlugin } from 'seroval-plugins/web' import { ShallowErrorPlugin } from './ShallowErrorPlugin' +import { RawStreamSSRPlugin } from './RawStream' import type { Plugin } from 'seroval' export const defaultSerovalPlugins = [ ShallowErrorPlugin as Plugin, + // RawStreamSSRPlugin must come before ReadableStreamPlugin to match first + RawStreamSSRPlugin, // ReadableStreamNode is not exported by seroval ReadableStreamPlugin as Plugin, ] diff --git a/packages/router-core/src/ssr/serializer/transformer.ts 
b/packages/router-core/src/ssr/serializer/transformer.ts index df86f965edd..85f55dd4340 100644 --- a/packages/router-core/src/ssr/serializer/transformer.ts +++ b/packages/router-core/src/ssr/serializer/transformer.ts @@ -8,6 +8,7 @@ import type { } from '../../router' import type { LooseReturnType } from '../../utils' import type { AnyRoute, ResolveAllSSR } from '../../route' +import type { RawStream } from './RawStream' declare const TSR_SERIALIZABLE: unique symbol export type TSR_SERIALIZABLE = typeof TSR_SERIALIZABLE @@ -21,6 +22,8 @@ export interface DefaultSerializable { undefined: undefined bigint: bigint Date: Date + Uint8Array: Uint8Array + RawStream: RawStream TsrSerializable: TsrSerializable } diff --git a/packages/router-core/tests/RawStream.test.ts b/packages/router-core/tests/RawStream.test.ts new file mode 100644 index 00000000000..106f0a7c218 --- /dev/null +++ b/packages/router-core/tests/RawStream.test.ts @@ -0,0 +1,228 @@ +import { describe, expect, it } from 'vitest' +import { toCrossJSONAsync, fromCrossJSON } from 'seroval' +import { + RawStream, + createRawStreamRPCPlugin, + createRawStreamDeserializePlugin, +} from '../src/ssr/serializer/RawStream' + +describe('RawStream', () => { + describe('RawStream class', () => { + it('should wrap a ReadableStream', () => { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(new Uint8Array([1, 2, 3])) + controller.close() + }, + }) + const rawStream = new RawStream(stream) + expect(rawStream.stream).toBe(stream) + }) + + it('should be an instance of RawStream', () => { + const stream = new ReadableStream() + const rawStream = new RawStream(stream) + expect(rawStream instanceof RawStream).toBe(true) + }) + + it('should default to binary hint', () => { + const stream = new ReadableStream() + const rawStream = new RawStream(stream) + expect(rawStream.hint).toBe('binary') + }) + + it('should accept binary hint option', () => { + const stream = new ReadableStream() + const rawStream 
= new RawStream(stream, { hint: 'binary' }) + expect(rawStream.hint).toBe('binary') + }) + + it('should accept text hint option', () => { + const stream = new ReadableStream() + const rawStream = new RawStream(stream, { hint: 'text' }) + expect(rawStream.hint).toBe('text') + }) + }) + + describe('createRawStreamRPCPlugin', () => { + it('should call onRawStream callback with stream id and stream', async () => { + const collectedStreams = new Map>() + + const plugin = createRawStreamRPCPlugin((id, stream) => { + collectedStreams.set(id, stream) + }) + + const testStream = new ReadableStream({ + start(controller) { + controller.enqueue(new Uint8Array([1, 2, 3])) + controller.close() + }, + }) + + const rawStream = new RawStream(testStream) + + await toCrossJSONAsync(rawStream, { + refs: new Map(), + plugins: [plugin], + }) + + expect(collectedStreams.size).toBe(1) + // Stream ID is assigned by our internal counter (sequential starting at 1) + const streamEntry = Array.from(collectedStreams.entries())[0] + expect(streamEntry).toBeDefined() + expect(streamEntry![1]).toBe(testStream) + }) + + it('should serialize with tss/RawStream tag', async () => { + const plugin = createRawStreamRPCPlugin(() => {}) + + const testStream = new ReadableStream() + const rawStream = new RawStream(testStream) + + const serialized = await toCrossJSONAsync(rawStream, { + refs: new Map(), + plugins: [plugin], + }) + + // The serialized output should have the plugin tag and contain streamId + const jsonStr = JSON.stringify(serialized) + expect(jsonStr).toContain('tss/RawStream') + expect(jsonStr).toContain('streamId') + }) + + it('should collect multiple streams with unique ids', async () => { + const collectedStreams = new Map>() + + const plugin = createRawStreamRPCPlugin((id, stream) => { + collectedStreams.set(id, stream) + }) + + const stream1 = new ReadableStream() + const stream2 = new ReadableStream() + + const data = { + first: new RawStream(stream1), + second: new RawStream(stream2), 
+ } + + await toCrossJSONAsync(data, { + refs: new Map(), + plugins: [plugin], + }) + + expect(collectedStreams.size).toBe(2) + const ids = Array.from(collectedStreams.keys()) + expect(ids[0]).not.toBe(ids[1]) + }) + }) + + describe('createRawStreamDeserializePlugin', () => { + it('should reconstruct stream from getOrCreateStream function', () => { + const mockStream = new ReadableStream({ + start(controller) { + controller.enqueue(new Uint8Array([42])) + controller.close() + }, + }) + + const streams = new Map>() + streams.set(5, mockStream) + + // getOrCreateStream function that returns from map + const getOrCreateStream = (id: number) => { + let stream = streams.get(id) + if (!stream) { + stream = new ReadableStream() + streams.set(id, stream) + } + return stream + } + + const plugin = createRawStreamDeserializePlugin(getOrCreateStream) + + // Simulate seroval calling deserialize with a node + const node = { streamId: 5 } + + // Access the deserialize function directly + const deserializedStream = (plugin as any).deserialize(node, {}) + + expect(deserializedStream).toBe(mockStream) + }) + + it('should create stream if not found', () => { + const streams = new Map>() + + const getOrCreateStream = (id: number) => { + let stream = streams.get(id) + if (!stream) { + stream = new ReadableStream() + streams.set(id, stream) + } + return stream + } + + const plugin = createRawStreamDeserializePlugin(getOrCreateStream) + + const node = { streamId: 999 } + + const result = (plugin as any).deserialize(node, {}) + expect(result).toBeInstanceOf(ReadableStream) + expect(streams.get(999)).toBe(result) + }) + }) + + describe('round-trip serialization', () => { + it('should serialize and deserialize RawStream correctly', async () => { + // Collect streams during serialization + const collectedStreams = new Map>() + const rpcPlugin = createRawStreamRPCPlugin((id, stream) => { + collectedStreams.set(id, stream) + }) + + const testStream = new ReadableStream({ + start(controller) { 
+ controller.enqueue(new Uint8Array([1, 2, 3])) + controller.close() + }, + }) + + const data = { + message: 'test', + rawData: new RawStream(testStream), + } + + // Serialize using RPC plugin + const refs = new Map() + const serialized = await toCrossJSONAsync(data, { + refs, + plugins: [rpcPlugin], + }) + + // Verify we collected the stream + expect(collectedStreams.size).toBe(1) + const streamId = Array.from(collectedStreams.keys())[0]! + + // Create getOrCreateStream function + const getOrCreateStream = (id: number) => { + const stream = collectedStreams.get(id) + if (!stream) { + throw new Error(`Stream ${id} not found in collected streams`) + } + return stream + } + + // Create deserialize plugin with function + const deserializePlugin = + createRawStreamDeserializePlugin(getOrCreateStream) + + // Deserialize + const deserialized = fromCrossJSON(serialized, { + refs: new Map(), + plugins: [deserializePlugin], + }) as any + + expect(deserialized.message).toBe('test') + expect(deserialized.rawData).toBe(testStream) + }) + }) +}) diff --git a/packages/start-client-core/src/client-rpc/frame-decoder.ts b/packages/start-client-core/src/client-rpc/frame-decoder.ts new file mode 100644 index 00000000000..dbdd605bf2f --- /dev/null +++ b/packages/start-client-core/src/client-rpc/frame-decoder.ts @@ -0,0 +1,389 @@ +/** + * Client-side frame decoder for multiplexed responses. 
+ * + * Decodes binary frame protocol and reconstructs: + * - JSON stream (NDJSON lines for seroval) + * - Raw streams (binary data as ReadableStream) + */ + +import { FRAME_HEADER_SIZE, FrameType } from '../constants' + +/** Cached TextDecoder for frame decoding */ +const textDecoder = new TextDecoder() + +/** Shared empty buffer for empty buffer case - avoids allocation */ +const EMPTY_BUFFER = new Uint8Array(0) + +/** Hardening limits to prevent memory/CPU DoS */ +const MAX_FRAME_PAYLOAD_SIZE = 16 * 1024 * 1024 // 16MiB +const MAX_BUFFERED_BYTES = 32 * 1024 * 1024 // 32MiB +const MAX_STREAMS = 1024 +const MAX_FRAMES = 100_000 // Limit total frames to prevent CPU DoS + +/** + * Result of frame decoding. + */ +export interface FrameDecoderResult { + /** Gets or creates a raw stream by ID (for use by deserialize plugin) */ + getOrCreateStream: (id: number) => ReadableStream + /** Stream of JSON strings (NDJSON lines) */ + jsonChunks: ReadableStream +} + +/** + * Creates a frame decoder that processes a multiplexed response stream. 
+ * + * @param input The raw response body stream + * @returns Decoded JSON stream and stream getter function + */ +export function createFrameDecoder( + input: ReadableStream, +): FrameDecoderResult { + const streamControllers = new Map< + number, + ReadableStreamDefaultController + >() + const streams = new Map>() + const cancelledStreamIds = new Set() + + let cancelled = false as boolean + let inputReader: ReadableStreamReader | null = null + let frameCount = 0 + + let jsonController!: ReadableStreamDefaultController + const jsonChunks = new ReadableStream({ + start(controller) { + jsonController = controller + }, + cancel() { + cancelled = true + try { + inputReader?.cancel() + } catch { + // Ignore + } + + streamControllers.forEach((ctrl) => { + try { + ctrl.error(new Error('Framed response cancelled')) + } catch { + // Ignore + } + }) + streamControllers.clear() + streams.clear() + cancelledStreamIds.clear() + }, + }) + + /** + * Gets or creates a stream for a given stream ID. + * Called by deserialize plugin when it encounters a RawStream reference. + */ + function getOrCreateStream(id: number): ReadableStream { + const existing = streams.get(id) + if (existing) { + return existing + } + + // If we already received an END/ERROR for this streamId, returning a fresh stream + // would hang consumers. Return an already-closed stream instead. + if (cancelledStreamIds.has(id)) { + return new ReadableStream({ + start(controller) { + controller.close() + }, + }) + } + + if (streams.size >= MAX_STREAMS) { + throw new Error( + `Too many raw streams in framed response (max ${MAX_STREAMS})`, + ) + } + + const stream = new ReadableStream({ + start(ctrl) { + streamControllers.set(id, ctrl) + }, + cancel() { + cancelledStreamIds.add(id) + streamControllers.delete(id) + streams.delete(id) + }, + }) + streams.set(id, stream) + return stream + } + + /** + * Ensures stream exists and returns its controller for enqueuing data. 
+ * Used for CHUNK frames where we need to ensure stream is created. + */ + function ensureController( + id: number, + ): ReadableStreamDefaultController | undefined { + getOrCreateStream(id) + return streamControllers.get(id) + } + + // Process frames asynchronously + ;(async () => { + const reader = input.getReader() + inputReader = reader + + const bufferList: Array = [] + let totalLength = 0 + + /** + * Reads header bytes from buffer chunks without flattening. + * Returns header data or null if not enough bytes available. + */ + function readHeader(): { + type: number + streamId: number + length: number + } | null { + if (totalLength < FRAME_HEADER_SIZE) return null + + const first = bufferList[0]! + + // Fast path: header fits entirely in first chunk (common case) + if (first.length >= FRAME_HEADER_SIZE) { + const type = first[0]! + const streamId = + ((first[1]! << 24) | + (first[2]! << 16) | + (first[3]! << 8) | + first[4]!) >>> + 0 + const length = + ((first[5]! << 24) | + (first[6]! << 16) | + (first[7]! << 8) | + first[8]!) >>> + 0 + return { type, streamId, length } + } + + // Slow path: header spans multiple chunks - flatten header bytes only + const headerBytes = new Uint8Array(FRAME_HEADER_SIZE) + let offset = 0 + let remaining = FRAME_HEADER_SIZE + for (let i = 0; i < bufferList.length && remaining > 0; i++) { + const chunk = bufferList[i]! + const toCopy = Math.min(chunk.length, remaining) + headerBytes.set(chunk.subarray(0, toCopy), offset) + offset += toCopy + remaining -= toCopy + } + + const type = headerBytes[0]! + const streamId = + ((headerBytes[1]! << 24) | + (headerBytes[2]! << 16) | + (headerBytes[3]! << 8) | + headerBytes[4]!) >>> + 0 + const length = + ((headerBytes[5]! << 24) | + (headerBytes[6]! << 16) | + (headerBytes[7]! << 8) | + headerBytes[8]!) >>> + 0 + + return { type, streamId, length } + } + + /** + * Flattens buffer list into single Uint8Array and removes from list. 
+ */ + function extractFlattened(count: number): Uint8Array { + if (count === 0) return EMPTY_BUFFER + + const result = new Uint8Array(count) + let offset = 0 + let remaining = count + + while (remaining > 0 && bufferList.length > 0) { + const chunk = bufferList[0] + if (!chunk) break + const toCopy = Math.min(chunk.length, remaining) + result.set(chunk.subarray(0, toCopy), offset) + + offset += toCopy + remaining -= toCopy + + if (toCopy === chunk.length) { + bufferList.shift() + } else { + bufferList[0] = chunk.subarray(toCopy) + } + } + + totalLength -= count + return result + } + + try { + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + while (true) { + const { done, value } = await reader.read() + if (cancelled) break + if (done) break + + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + if (!value) continue + + // Append incoming chunk to buffer list + if (totalLength + value.length > MAX_BUFFERED_BYTES) { + throw new Error( + `Framed response buffer exceeded ${MAX_BUFFERED_BYTES} bytes`, + ) + } + bufferList.push(value) + totalLength += value.length + + // Parse complete frames from buffer + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + while (true) { + const header = readHeader() + if (!header) break // Not enough bytes for header + + const { type, streamId, length } = header + + if ( + type !== FrameType.JSON && + type !== FrameType.CHUNK && + type !== FrameType.END && + type !== FrameType.ERROR + ) { + throw new Error(`Unknown frame type: ${type}`) + } + + // Enforce stream id conventions: JSON uses streamId 0, raw streams use non-zero ids + if (type === FrameType.JSON) { + if (streamId !== 0) { + throw new Error('Invalid JSON frame streamId (expected 0)') + } + } else { + if (streamId === 0) { + throw new Error('Invalid raw frame streamId (expected non-zero)') + } + } + + if (length > MAX_FRAME_PAYLOAD_SIZE) { + throw new Error( + `Frame payload too large: ${length} bytes (max 
${MAX_FRAME_PAYLOAD_SIZE})`, + ) + } + + const frameSize = FRAME_HEADER_SIZE + length + if (totalLength < frameSize) break // Wait for more data + + if (++frameCount > MAX_FRAMES) { + throw new Error( + `Too many frames in framed response (max ${MAX_FRAMES})`, + ) + } + + // Extract and consume header bytes + extractFlattened(FRAME_HEADER_SIZE) + + // Extract payload + const payload = extractFlattened(length) + + // Process frame by type + switch (type) { + case FrameType.JSON: { + try { + jsonController.enqueue(textDecoder.decode(payload)) + } catch { + // JSON stream may be cancelled/closed + } + break + } + + case FrameType.CHUNK: { + const ctrl = ensureController(streamId) + if (ctrl) { + ctrl.enqueue(payload) + } + break + } + + case FrameType.END: { + const ctrl = ensureController(streamId) + cancelledStreamIds.add(streamId) + if (ctrl) { + try { + ctrl.close() + } catch { + // Already closed + } + streamControllers.delete(streamId) + } + break + } + + case FrameType.ERROR: { + const ctrl = ensureController(streamId) + cancelledStreamIds.add(streamId) + if (ctrl) { + const message = textDecoder.decode(payload) + ctrl.error(new Error(message)) + streamControllers.delete(streamId) + } + break + } + } + } + } + + if (totalLength !== 0) { + throw new Error('Incomplete frame at end of framed response') + } + + // Close JSON stream when done + try { + jsonController.close() + } catch { + // JSON stream may be cancelled/closed + } + + // Close any remaining streams (shouldn't happen in normal operation) + streamControllers.forEach((ctrl) => { + try { + ctrl.close() + } catch { + // Already closed + } + }) + streamControllers.clear() + } catch (error) { + // Error reading - propagate to all streams + try { + jsonController.error(error) + } catch { + // Already errored/closed + } + streamControllers.forEach((ctrl) => { + try { + ctrl.error(error) + } catch { + // Already errored/closed + } + }) + streamControllers.clear() + } finally { + try { + reader.releaseLock() + 
} catch { + // Ignore + } + inputReader = null + } + })() + + return { getOrCreateStream, jsonChunks } +} diff --git a/packages/start-client-core/src/client-rpc/serverFnFetcher.ts b/packages/start-client-core/src/client-rpc/serverFnFetcher.ts index 6e54793d863..11b18fe81b8 100644 --- a/packages/start-client-core/src/client-rpc/serverFnFetcher.ts +++ b/packages/start-client-core/src/client-rpc/serverFnFetcher.ts @@ -1,12 +1,20 @@ -import { encode, isNotFound, parseRedirect } from '@tanstack/router-core' +import { + createRawStreamDeserializePlugin, + encode, + isNotFound, + parseRedirect, +} from '@tanstack/router-core' import { fromCrossJSON, toJSONAsync } from 'seroval' import invariant from 'tiny-invariant' import { getDefaultSerovalPlugins } from '../getDefaultSerovalPlugins' import { + TSS_CONTENT_TYPE_FRAMED, TSS_FORMDATA_CONTEXT, X_TSS_RAW_RESPONSE, X_TSS_SERIALIZED, + validateFramedProtocolVersion, } from '../constants' +import { createFrameDecoder } from './frame-decoder' import type { FunctionMiddlewareClientFnOptions } from '../createMiddleware' import type { Plugin as SerovalPlugin } from 'seroval' @@ -55,7 +63,10 @@ export async function serverFnFetcher( headers.set('x-tsr-serverFn', 'true') if (type === 'payload') { - headers.set('accept', 'application/x-ndjson, application/json') + headers.set( + 'accept', + `${TSS_CONTENT_TYPE_FRAMED}, application/x-ndjson, application/json`, + ) } // If the method is GET, we need to move the payload to the query string @@ -177,8 +188,36 @@ async function getResponse(fn: () => Promise) { // differently than a normal response. 
if (serializedByStart) { let result + + // If it's a framed response (contains RawStream), use frame decoder + if (contentType.includes(TSS_CONTENT_TYPE_FRAMED)) { + // Validate protocol version compatibility + validateFramedProtocolVersion(contentType) + + if (!response.body) { + throw new Error('No response body for framed response') + } + + const { getOrCreateStream, jsonChunks } = createFrameDecoder( + response.body, + ) + + // Create deserialize plugin that wires up the raw streams + const rawStreamPlugin = + createRawStreamDeserializePlugin(getOrCreateStream) + const plugins = [rawStreamPlugin, ...(serovalPlugins || [])] + + const refs = new Map() + result = await processFramedResponse({ + jsonStream: jsonChunks, + onMessage: (msg: any) => fromCrossJSON(msg, { refs, plugins }), + onError(msg, error) { + console.error(msg, error) + }, + }) + } // If it's a stream from the start serializer, process it as such - if (contentType.includes('application/x-ndjson')) { + else if (contentType.includes('application/x-ndjson')) { const refs = new Map() result = await processServerFnResponse({ response, @@ -191,7 +230,7 @@ async function getResponse(fn: () => Promise) { }) } // If it's a JSON response, it can be simpler - if (contentType.includes('application/json')) { + else if (contentType.includes('application/json')) { const jsonPayload = await response.json() result = fromCrossJSON(jsonPayload, { plugins: serovalPlugins! }) } @@ -310,3 +349,50 @@ async function processServerFnResponse({ return onMessage(firstObject) } + +/** + * Processes a framed response where each JSON chunk is a complete JSON string + * (already decoded by frame decoder). 
+ */ +async function processFramedResponse({ + jsonStream, + onMessage, + onError, +}: { + jsonStream: ReadableStream + onMessage: (msg: any) => any + onError?: (msg: string, error?: any) => void +}) { + const reader = jsonStream.getReader() + + // Read first JSON frame - this is the main result + const { value: firstValue, done: firstDone } = await reader.read() + if (firstDone || !firstValue) { + throw new Error('Stream ended before first object') + } + + // Each frame is a complete JSON string + const firstObject = JSON.parse(firstValue) + + // Process remaining frames asynchronously (for streaming refs like RawStream) + ;(async () => { + try { + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + while (true) { + const { value, done } = await reader.read() + if (done) break + if (value) { + try { + onMessage(JSON.parse(value)) + } catch (e) { + onError?.(`Invalid JSON: ${value}`, e) + } + } + } + } catch (err) { + onError?.('Stream processing error:', err) + } + })() + + return onMessage(firstObject) +} diff --git a/packages/start-client-core/src/constants.ts b/packages/start-client-core/src/constants.ts index 4e0777068d2..1cf6b66fc47 100644 --- a/packages/start-client-core/src/constants.ts +++ b/packages/start-client-core/src/constants.ts @@ -7,4 +7,63 @@ export const TSS_SERVER_FUNCTION_FACTORY = Symbol.for( export const X_TSS_SERIALIZED = 'x-tss-serialized' export const X_TSS_RAW_RESPONSE = 'x-tss-raw' export const X_TSS_CONTEXT = 'x-tss-context' + +/** Content-Type for multiplexed framed responses (RawStream support) */ +export const TSS_CONTENT_TYPE_FRAMED = 'application/x-tss-framed' + +/** + * Frame types for binary multiplexing protocol. 
+ */ +export const FrameType = { + /** Seroval JSON chunk (NDJSON line) */ + JSON: 0, + /** Raw stream data chunk */ + CHUNK: 1, + /** Raw stream end (EOF) */ + END: 2, + /** Raw stream error */ + ERROR: 3, +} as const + +export type FrameType = (typeof FrameType)[keyof typeof FrameType] + +/** Header size in bytes: type(1) + streamId(4) + length(4) */ +export const FRAME_HEADER_SIZE = 9 + +/** Current protocol version for framed responses */ +export const TSS_FRAMED_PROTOCOL_VERSION = 1 + +/** Full Content-Type header value with version parameter */ +export const TSS_CONTENT_TYPE_FRAMED_VERSIONED = `${TSS_CONTENT_TYPE_FRAMED}; v=${TSS_FRAMED_PROTOCOL_VERSION}` + +/** + * Parses the version parameter from a framed Content-Type header. + * Returns undefined if no version parameter is present. + */ +const FRAMED_VERSION_REGEX = /;\s*v=(\d+)/ +export function parseFramedProtocolVersion( + contentType: string, +): number | undefined { + // Match "v=" in the content-type parameters + const match = contentType.match(FRAMED_VERSION_REGEX) + return match ? parseInt(match[1]!, 10) : undefined +} + +/** + * Validates that the server's protocol version is compatible with this client. + * Throws an error if versions are incompatible. + */ +export function validateFramedProtocolVersion(contentType: string): void { + const serverVersion = parseFramedProtocolVersion(contentType) + if (serverVersion === undefined) { + // No version specified - assume compatible (backwards compat) + return + } + if (serverVersion !== TSS_FRAMED_PROTOCOL_VERSION) { + throw new Error( + `Incompatible framed protocol version: server=${serverVersion}, client=${TSS_FRAMED_PROTOCOL_VERSION}. 
` + + `Please ensure client and server are using compatible versions.`, + ) + } +} export {} diff --git a/packages/start-client-core/src/index.tsx b/packages/start-client-core/src/index.tsx index 1dbdff8f19c..0996e684c9c 100644 --- a/packages/start-client-core/src/index.tsx +++ b/packages/start-client-core/src/index.tsx @@ -2,6 +2,9 @@ export type { JsonResponse } from '@tanstack/router-core/ssr/client' export { hydrate, json, mergeHeaders } from '@tanstack/router-core/ssr/client' +export { RawStream } from '@tanstack/router-core' +export type { OnRawStreamCallback } from '@tanstack/router-core' + export { createIsomorphicFn, createServerOnlyFn, @@ -80,10 +83,17 @@ export { export { TSS_FORMDATA_CONTEXT, TSS_SERVER_FUNCTION, + TSS_CONTENT_TYPE_FRAMED, + TSS_CONTENT_TYPE_FRAMED_VERSIONED, + TSS_FRAMED_PROTOCOL_VERSION, + FrameType, + FRAME_HEADER_SIZE, X_TSS_SERIALIZED, X_TSS_RAW_RESPONSE, X_TSS_CONTEXT, + validateFramedProtocolVersion, } from './constants' +export type { FrameType as FrameTypeValue } from './constants' export type * from './serverRoute' diff --git a/packages/start-client-core/tests/frame-decoder.test.ts b/packages/start-client-core/tests/frame-decoder.test.ts new file mode 100644 index 00000000000..29b8974dea8 --- /dev/null +++ b/packages/start-client-core/tests/frame-decoder.test.ts @@ -0,0 +1,562 @@ +import { describe, expect, it } from 'vitest' +import { createFrameDecoder } from '../src/client-rpc/frame-decoder' +import { FRAME_HEADER_SIZE, FrameType } from '../src/constants' + +/** + * Helper to encode a frame for testing + */ +function encodeFrame( + type: number, + streamId: number, + payload: Uint8Array, +): Uint8Array { + const frame = new Uint8Array(FRAME_HEADER_SIZE + payload.length) + const view = new DataView(frame.buffer) + view.setUint8(0, type) + view.setUint32(1, streamId, false) + view.setUint32(5, payload.length, false) + frame.set(payload, FRAME_HEADER_SIZE) + return frame +} + +function encodeJSONFrame(json: string): Uint8Array 
{ + return encodeFrame(FrameType.JSON, 0, new TextEncoder().encode(json)) +} + +function encodeChunkFrame(streamId: number, data: Uint8Array): Uint8Array { + return encodeFrame(FrameType.CHUNK, streamId, data) +} + +function encodeEndFrame(streamId: number): Uint8Array { + return encodeFrame(FrameType.END, streamId, new Uint8Array(0)) +} + +function encodeErrorFrame(streamId: number, message: string): Uint8Array { + return encodeFrame( + FrameType.ERROR, + streamId, + new TextEncoder().encode(message), + ) +} + +describe('frame-decoder', () => { + describe('createFrameDecoder', () => { + it('should reject unknown frame type', async () => { + const badFrame = encodeFrame(99, 0, new Uint8Array(0)) + const input = new ReadableStream({ + start(controller) { + controller.enqueue(badFrame) + controller.close() + }, + }) + + const { jsonChunks } = createFrameDecoder(input) + const reader = jsonChunks.getReader() + + await expect(reader.read()).rejects.toThrow('Unknown frame type') + }) + + it('should reject raw frames with streamId 0', async () => { + const badChunk = encodeFrame(FrameType.CHUNK, 0, new Uint8Array([1])) + const input = new ReadableStream({ + start(controller) { + controller.enqueue(badChunk) + controller.close() + }, + }) + + const { jsonChunks } = createFrameDecoder(input) + const reader = jsonChunks.getReader() + + await expect(reader.read()).rejects.toThrow('Invalid raw frame streamId') + }) + + it('should reject JSON frames with non-zero streamId', async () => { + const badJson = encodeFrame( + FrameType.JSON, + 1, + new TextEncoder().encode('{}\n'), + ) + const input = new ReadableStream({ + start(controller) { + controller.enqueue(badJson) + controller.close() + }, + }) + + const { jsonChunks } = createFrameDecoder(input) + const reader = jsonChunks.getReader() + + await expect(reader.read()).rejects.toThrow('Invalid JSON frame streamId') + }) + + it('should reject oversized frame payloads', async () => { + // Declare a payload length > 
MAX_FRAME_PAYLOAD_SIZE with no payload. + const headerOnly = new Uint8Array(FRAME_HEADER_SIZE) + const view = new DataView(headerOnly.buffer) + view.setUint8(0, FrameType.JSON) + view.setUint32(1, 0, false) + view.setUint32(5, 16 * 1024 * 1024 + 1, false) + + const input = new ReadableStream({ + start(controller) { + controller.enqueue(headerOnly) + controller.close() + }, + }) + + const { jsonChunks } = createFrameDecoder(input) + const reader = jsonChunks.getReader() + + await expect(reader.read()).rejects.toThrow('Frame payload too large') + }) + + it('should reject incomplete frames at end-of-stream', async () => { + const headerOnly = new Uint8Array(FRAME_HEADER_SIZE) + const view = new DataView(headerOnly.buffer) + view.setUint8(0, FrameType.JSON) + view.setUint32(1, 0, false) + view.setUint32(5, 3, false) + + const input = new ReadableStream({ + start(controller) { + controller.enqueue(headerOnly) + controller.close() + }, + }) + + const { jsonChunks } = createFrameDecoder(input) + const reader = jsonChunks.getReader() + + await expect(reader.read()).rejects.toThrow('Incomplete frame') + }) + + it('should cancel input when jsonChunks cancelled', async () => { + let cancelled = false + const input = new ReadableStream({ + pull() {}, + cancel() { + cancelled = true + }, + }) + + const { jsonChunks } = createFrameDecoder(input) + const reader = jsonChunks.getReader() + + await reader.cancel() + expect(cancelled).toBe(true) + }) + + it('should reject too many raw streams', async () => { + // END frames create streams via ensureController, even with no CHUNKs. 
+ const frames: Array = [] + for (let i = 1; i <= 1025; i++) { + frames.push(encodeEndFrame(i)) + } + + const totalLen = frames.reduce((acc, f) => acc + f.length, 0) + const combined = new Uint8Array(totalLen) + let offset = 0 + for (const frame of frames) { + combined.set(frame, offset) + offset += frame.length + } + + const input = new ReadableStream({ + start(controller) { + controller.enqueue(combined) + controller.close() + }, + }) + + const { jsonChunks } = createFrameDecoder(input) + const reader = jsonChunks.getReader() + + await expect(reader.read()).rejects.toThrow('Too many raw streams') + }) + + it('should reject when buffered bytes exceed limit', async () => { + // No valid frame can be parsed from this; we just want to exceed MAX_BUFFERED_BYTES. + const tooLarge = new Uint8Array(32 * 1024 * 1024 + 1) + + const input = new ReadableStream({ + start(controller) { + controller.enqueue(tooLarge) + controller.close() + }, + }) + + const { jsonChunks } = createFrameDecoder(input) + const reader = jsonChunks.getReader() + + await expect(reader.read()).rejects.toThrow('buffer exceeded') + }) + + it('should decode JSON frames', async () => { + const frame1 = encodeJSONFrame('{"line":1}') + const frame2 = encodeJSONFrame('{"line":2}') + + const combinedFrames = new Uint8Array(frame1.length + frame2.length) + combinedFrames.set(frame1, 0) + combinedFrames.set(frame2, frame1.length) + + const input = new ReadableStream({ + start(controller) { + controller.enqueue(combinedFrames) + controller.close() + }, + }) + + const { jsonChunks } = createFrameDecoder(input) + + const reader = jsonChunks.getReader() + const chunks: Array = [] + + while (true) { + const { done, value } = await reader.read() + if (done) break + chunks.push(value) + } + + expect(chunks).toEqual(['{"line":1}', '{"line":2}']) + }) + + it('should decode raw stream chunks', async () => { + const jsonFrame = encodeJSONFrame('{}') + const chunkFrame = encodeChunkFrame(5, new Uint8Array([1, 2, 3])) + 
const endFrame = encodeEndFrame(5) + + const combined = new Uint8Array( + jsonFrame.length + chunkFrame.length + endFrame.length, + ) + combined.set(jsonFrame, 0) + combined.set(chunkFrame, jsonFrame.length) + combined.set(endFrame, jsonFrame.length + chunkFrame.length) + + const input = new ReadableStream({ + start(controller) { + controller.enqueue(combined) + controller.close() + }, + }) + + const { jsonChunks, getOrCreateStream } = createFrameDecoder(input) + + // Pre-create the stream before consuming + const stream5 = getOrCreateStream(5) + + // Consume JSON first + const jsonReader = jsonChunks.getReader() + const { value: jsonValue } = await jsonReader.read() + expect(jsonValue).toBe('{}') + + // Read the raw stream + const rawReader = stream5.getReader() + const { value: rawValue, done: rawDone } = await rawReader.read() + + expect(rawDone).toBe(false) + expect(rawValue).toEqual(new Uint8Array([1, 2, 3])) + + const { done: finalDone } = await rawReader.read() + expect(finalDone).toBe(true) + }) + + it('should handle partial frames across chunks', async () => { + const frame = encodeJSONFrame('{"test":"data"}') + + // Split frame in the middle + const part1 = frame.slice(0, 5) + const part2 = frame.slice(5) + + const input = new ReadableStream({ + start(controller) { + controller.enqueue(part1) + controller.enqueue(part2) + controller.close() + }, + }) + + const { jsonChunks } = createFrameDecoder(input) + + const reader = jsonChunks.getReader() + const chunks: Array = [] + + while (true) { + const { done, value } = await reader.read() + if (done) break + chunks.push(value) + } + + expect(chunks).toEqual(['{"test":"data"}']) + }) + + it('should use fast path when header fits in first chunk', async () => { + // Single chunk contains entire frame - exercises fast path + const frame = encodeJSONFrame('{"fast":"path"}') + + const input = new ReadableStream({ + start(controller) { + controller.enqueue(frame) + controller.close() + }, + }) + + const { jsonChunks 
} = createFrameDecoder(input) + const reader = jsonChunks.getReader() + const { value } = await reader.read() + + expect(value).toBe('{"fast":"path"}') + }) + + it('should use slow path when header spans multiple chunks', async () => { + // Split header itself across multiple chunks - exercises slow path + const frame = encodeJSONFrame('{"slow":"path"}') + + // Split at byte 3, then byte 6, then rest - header is 9 bytes + const part1 = frame.slice(0, 3) // first 3 bytes of header + const part2 = frame.slice(3, 6) // next 3 bytes of header + const part3 = frame.slice(6) // last 3 bytes of header + payload + + const input = new ReadableStream({ + start(controller) { + controller.enqueue(part1) + controller.enqueue(part2) + controller.enqueue(part3) + controller.close() + }, + }) + + const { jsonChunks } = createFrameDecoder(input) + const reader = jsonChunks.getReader() + const { value } = await reader.read() + + expect(value).toBe('{"slow":"path"}') + }) + + it('should handle header split at every byte boundary', async () => { + // Extreme case: each header byte in separate chunk + const frame = encodeJSONFrame('{"byte":"split"}') + + // Split into 9 single-byte chunks for header, then payload + const chunks: Array = [] + for (let i = 0; i < FRAME_HEADER_SIZE; i++) { + chunks.push(frame.slice(i, i + 1)) + } + chunks.push(frame.slice(FRAME_HEADER_SIZE)) // payload + + const input = new ReadableStream({ + start(controller) { + for (const chunk of chunks) { + controller.enqueue(chunk) + } + controller.close() + }, + }) + + const { jsonChunks } = createFrameDecoder(input) + const reader = jsonChunks.getReader() + const { value } = await reader.read() + + expect(value).toBe('{"byte":"split"}') + }) + + it('should handle multiple raw streams', async () => { + const jsonFrame = encodeJSONFrame('{}') + const chunk1 = encodeChunkFrame(1, new Uint8Array([10])) + const chunk2 = encodeChunkFrame(2, new Uint8Array([20])) + const end1 = encodeEndFrame(1) + const end2 = 
encodeEndFrame(2)

    // Concatenate all frames into one input chunk so the decoder must
    // demultiplex them from a single read.
    const totalLen =
      jsonFrame.length +
      chunk1.length +
      chunk2.length +
      end1.length +
      end2.length
    const combined = new Uint8Array(totalLen)
    let offset = 0
    for (const frame of [jsonFrame, chunk1, chunk2, end1, end2]) {
      combined.set(frame, offset)
      offset += frame.length
    }

    const input = new ReadableStream({
      start(controller) {
        controller.enqueue(combined)
        controller.close()
      },
    })

    const { getOrCreateStream, jsonChunks } = createFrameDecoder(input)

    // Pre-create streams before consuming
    const stream1 = getOrCreateStream(1)
    const stream2 = getOrCreateStream(2)

    // Drain JSON
    const jsonReader = jsonChunks.getReader()
    await jsonReader.read()

    // Read stream 1
    const reader1 = stream1.getReader()
    const { value: val1 } = await reader1.read()
    expect(val1).toEqual(new Uint8Array([10]))

    // Read stream 2
    const reader2 = stream2.getReader()
    const { value: val2 } = await reader2.read()
    expect(val2).toEqual(new Uint8Array([20]))
  })

  it('should handle error frames for existing streams', async () => {
    const jsonFrame = encodeJSONFrame('{}')
    // Create a stream, then error it immediately
    const chunkFrame = encodeChunkFrame(3, new Uint8Array([1]))
    const errorFrame = encodeErrorFrame(3, 'Stream failed')

    const combined = new Uint8Array(
      jsonFrame.length + chunkFrame.length + errorFrame.length,
    )
    combined.set(jsonFrame, 0)
    combined.set(chunkFrame, jsonFrame.length)
    combined.set(errorFrame, jsonFrame.length + chunkFrame.length)

    const input = new ReadableStream({
      start(controller) {
        controller.enqueue(combined)
        controller.close()
      },
    })

    const { getOrCreateStream, jsonChunks } = createFrameDecoder(input)

    // Pre-create stream 3
    const stream3 = getOrCreateStream(3)

    // Drain JSON
    const jsonReader = jsonChunks.getReader()
    await jsonReader.read()

    const reader = stream3.getReader()

    // Stream was created but then errored
    // Reading should
throw the error + let errorCaught = false + let chunkReceived = false + try { + while (true) { + const { value, done } = await reader.read() + if (done) break + if (value) chunkReceived = true + } + } catch (error) { + errorCaught = true + expect((error as Error).message).toBe('Stream failed') + } + + // Either we got the chunk before error, or we got error immediately + // The important thing is that the error was thrown + expect(errorCaught).toBe(true) + }) + + it('should preserve stream after END frame for late consumers', async () => { + // This tests a race condition fix: stream should still be available + // even if END frame is processed before getOrCreateStream is called + const jsonFrame = encodeJSONFrame('{"streamRef":7}') + const chunkFrame = encodeChunkFrame(7, new Uint8Array([42, 43, 44])) + const endFrame = encodeEndFrame(7) + + const combined = new Uint8Array( + jsonFrame.length + chunkFrame.length + endFrame.length, + ) + combined.set(jsonFrame, 0) + combined.set(chunkFrame, jsonFrame.length) + combined.set(endFrame, jsonFrame.length + chunkFrame.length) + + const input = new ReadableStream({ + start(controller) { + controller.enqueue(combined) + controller.close() + }, + }) + + const { getOrCreateStream, jsonChunks } = createFrameDecoder(input) + + // First, fully consume JSON stream (this processes all frames) + const jsonReader = jsonChunks.getReader() + const chunks: Array = [] + while (true) { + const { done, value } = await jsonReader.read() + if (done) break + chunks.push(value) + } + expect(chunks).toEqual(['{"streamRef":7}']) + + // Now call getOrCreateStream AFTER all frames processed (including END) + // This simulates deserializer calling getOrCreateStream late + const stream7 = getOrCreateStream(7) + + // The stream should still have the data that was enqueued + const rawReader = stream7.getReader() + const { value, done } = await rawReader.read() + + expect(done).toBe(false) + expect(value).toEqual(new Uint8Array([42, 43, 44])) + + // 
Next read should be done (stream was closed by END frame) + const { done: finalDone } = await rawReader.read() + expect(finalDone).toBe(true) + }) + + it('should handle CHUNK creating stream before getOrCreateStream called', async () => { + // CHUNK frame arrives first and creates stream internally, + // then getOrCreateStream returns the same stream with data + const chunkFrame1 = encodeChunkFrame(9, new Uint8Array([1, 2])) + const chunkFrame2 = encodeChunkFrame(9, new Uint8Array([3, 4])) + const endFrame = encodeEndFrame(9) + const jsonFrame = encodeJSONFrame('{"ref":9}') + + // Order: CHUNK, CHUNK, END, then JSON (unusual but valid) + const combined = new Uint8Array( + chunkFrame1.length + + chunkFrame2.length + + endFrame.length + + jsonFrame.length, + ) + let offset = 0 + for (const frame of [chunkFrame1, chunkFrame2, endFrame, jsonFrame]) { + combined.set(frame, offset) + offset += frame.length + } + + const input = new ReadableStream({ + start(controller) { + controller.enqueue(combined) + controller.close() + }, + }) + + const { getOrCreateStream, jsonChunks } = createFrameDecoder(input) + + // Drain JSON (processes all frames) + const jsonReader = jsonChunks.getReader() + while (true) { + const { done } = await jsonReader.read() + if (done) break + } + + // Now get the stream - should have all the data + const stream9 = getOrCreateStream(9) + const reader = stream9.getReader() + + const { value: v1 } = await reader.read() + expect(v1).toEqual(new Uint8Array([1, 2])) + + const { value: v2 } = await reader.read() + expect(v2).toEqual(new Uint8Array([3, 4])) + + const { done: finalDone } = await reader.read() + expect(finalDone).toBe(true) + }) + }) +}) diff --git a/packages/start-server-core/src/frame-protocol.ts b/packages/start-server-core/src/frame-protocol.ts new file mode 100644 index 00000000000..50929ce65aa --- /dev/null +++ b/packages/start-server-core/src/frame-protocol.ts @@ -0,0 +1,216 @@ +/** + * Binary frame protocol for multiplexing JSON and 
raw streams over HTTP. + * + * Frame format: [type:1][streamId:4][length:4][payload:length] + * - type: 1 byte - frame type (JSON, CHUNK, END, ERROR) + * - streamId: 4 bytes big-endian uint32 - stream identifier + * - length: 4 bytes big-endian uint32 - payload length + * - payload: variable length bytes + */ + +// Re-export constants from shared location +import { FRAME_HEADER_SIZE, FrameType } from '@tanstack/start-client-core' + +export { + FRAME_HEADER_SIZE, + FrameType, + TSS_CONTENT_TYPE_FRAMED, + TSS_CONTENT_TYPE_FRAMED_VERSIONED, + TSS_FRAMED_PROTOCOL_VERSION, +} from '@tanstack/start-client-core' + +/** Cached TextEncoder for frame encoding */ +const textEncoder = new TextEncoder() + +/** Shared empty payload for END frames - avoids allocation per call */ +const EMPTY_PAYLOAD = new Uint8Array(0) + +/** + * Encodes a single frame with header and payload. + */ +export function encodeFrame( + type: FrameType, + streamId: number, + payload: Uint8Array, +): Uint8Array { + const frame = new Uint8Array(FRAME_HEADER_SIZE + payload.length) + // Write header bytes directly to avoid DataView allocation per frame + // Frame format: [type:1][streamId:4 BE][length:4 BE] + frame[0] = type + frame[1] = (streamId >>> 24) & 0xff + frame[2] = (streamId >>> 16) & 0xff + frame[3] = (streamId >>> 8) & 0xff + frame[4] = streamId & 0xff + frame[5] = (payload.length >>> 24) & 0xff + frame[6] = (payload.length >>> 16) & 0xff + frame[7] = (payload.length >>> 8) & 0xff + frame[8] = payload.length & 0xff + frame.set(payload, FRAME_HEADER_SIZE) + return frame +} + +/** + * Encodes a JSON frame (type 0, streamId 0). + */ +export function encodeJSONFrame(json: string): Uint8Array { + return encodeFrame(FrameType.JSON, 0, textEncoder.encode(json)) +} + +/** + * Encodes a raw stream chunk frame. 
+ */ +export function encodeChunkFrame( + streamId: number, + chunk: Uint8Array, +): Uint8Array { + return encodeFrame(FrameType.CHUNK, streamId, chunk) +} + +/** + * Encodes a raw stream end frame. + */ +export function encodeEndFrame(streamId: number): Uint8Array { + return encodeFrame(FrameType.END, streamId, EMPTY_PAYLOAD) +} + +/** + * Encodes a raw stream error frame. + */ +export function encodeErrorFrame(streamId: number, error: unknown): Uint8Array { + const message = + error instanceof Error ? error.message : String(error ?? 'Unknown error') + return encodeFrame(FrameType.ERROR, streamId, textEncoder.encode(message)) +} + +/** + * Creates a multiplexed ReadableStream from JSON stream and raw streams. + * + * The JSON stream emits NDJSON lines (from seroval's toCrossJSONStream). + * Raw streams are pumped concurrently, interleaved with JSON frames. + * + * @param jsonStream Stream of JSON strings (each string is one NDJSON line) + * @param rawStreams Map of stream IDs to raw binary streams + */ +export function createMultiplexedStream( + jsonStream: ReadableStream, + rawStreams: Map>, +): ReadableStream { + // Track active pumps for completion + let activePumps = 1 + rawStreams.size // 1 for JSON + raw streams + let controllerRef: ReadableStreamDefaultController | null = null + let cancelled = false as boolean + const cancelReaders: Array<() => void> = [] + + const safeEnqueue = (chunk: Uint8Array) => { + if (cancelled || !controllerRef) return + try { + controllerRef.enqueue(chunk) + } catch { + // Ignore enqueue after close/cancel + } + } + + const safeError = (err: unknown) => { + if (cancelled || !controllerRef) return + try { + controllerRef.error(err) + } catch { + // Ignore + } + } + + const safeClose = () => { + if (cancelled || !controllerRef) return + try { + controllerRef.close() + } catch { + // Ignore + } + } + + const checkComplete = () => { + activePumps-- + if (activePumps === 0) { + safeClose() + } + } + + return new ReadableStream({ + 
start(controller) { + controllerRef = controller + cancelReaders.length = 0 + + // Pump JSON stream (streamId 0) + const pumpJSON = async () => { + const reader = jsonStream.getReader() + cancelReaders.push(() => { + // Catch async rejection - reader may already be released + reader.cancel().catch(() => {}) + }) + try { + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + while (true) { + const { done, value } = await reader.read() + // Check cancelled after await - flag may have changed while waiting + if (cancelled) break + if (done) break + safeEnqueue(encodeJSONFrame(value)) + } + } catch (error) { + // JSON stream error - fatal, error the whole response + safeError(error) + } finally { + reader.releaseLock() + checkComplete() + } + } + + // Pump a single raw stream with its streamId + const pumpRawStream = async ( + streamId: number, + stream: ReadableStream, + ) => { + const reader = stream.getReader() + cancelReaders.push(() => { + // Catch async rejection - reader may already be released + reader.cancel().catch(() => {}) + }) + try { + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + while (true) { + const { done, value } = await reader.read() + // Check cancelled after await - flag may have changed while waiting + if (cancelled) break + if (done) { + safeEnqueue(encodeEndFrame(streamId)) + break + } + safeEnqueue(encodeChunkFrame(streamId, value)) + } + } catch (error) { + // Stream error - send ERROR frame (non-fatal, other streams continue) + safeEnqueue(encodeErrorFrame(streamId, error)) + } finally { + reader.releaseLock() + checkComplete() + } + } + + // Start all pumps concurrently + pumpJSON() + for (const [streamId, stream] of rawStreams) { + pumpRawStream(streamId, stream) + } + }, + + cancel() { + cancelled = true + controllerRef = null + // Proactively cancel all underlying readers to stop work quickly. 
+ for (const cancelReader of cancelReaders) { + cancelReader() + } + cancelReaders.length = 0 + }, + }) +} diff --git a/packages/start-server-core/src/server-functions-handler.ts b/packages/start-server-core/src/server-functions-handler.ts index 0c3b5cb99a9..6dac0592c5b 100644 --- a/packages/start-server-core/src/server-functions-handler.ts +++ b/packages/start-server-core/src/server-functions-handler.ts @@ -1,4 +1,8 @@ -import { isNotFound, isRedirect } from '@tanstack/router-core' +import { + createRawStreamRPCPlugin, + isNotFound, + isRedirect, +} from '@tanstack/router-core' import invariant from 'tiny-invariant' import { TSS_FORMDATA_CONTEXT, @@ -10,11 +14,18 @@ import { import { fromJSON, toCrossJSONAsync, toCrossJSONStream } from 'seroval' import { getResponse } from './request-response' import { getServerFnById } from './getServerFnById' +import { + TSS_CONTENT_TYPE_FRAMED_VERSIONED, + createMultiplexedStream, +} from './frame-protocol' import type { Plugin as SerovalPlugin } from 'seroval' // Cache serovalPlugins at module level to avoid repeated calls let serovalPlugins: Array> | undefined = undefined +// Cache TextEncoder for NDJSON serialization +const textEncoder = new TextEncoder() + // Known FormData 'Content-Type' header values - module-level constant const FORM_DATA_CONTENT_TYPES = [ 'multipart/form-data', @@ -162,6 +173,17 @@ export const handleServerAction = async ({ const alsResponse = getResponse() if (res !== undefined) { + // Collect raw streams encountered during serialization + const rawStreams = new Map>() + const rawStreamPlugin = createRawStreamRPCPlugin( + (id: number, stream: ReadableStream) => { + rawStreams.set(id, stream) + }, + ) + + // Build plugins with RawStreamRPCPlugin first (before default SSR plugin) + const plugins = [rawStreamPlugin, ...(serovalPlugins || [])] + // first run without the stream in case `result` does not need streaming let done = false as boolean const callbacks: { @@ -181,7 +203,7 @@ export const 
handleServerAction = async ({ } toCrossJSONStream(res, { refs: new Map(), - plugins: serovalPlugins, + plugins, onParse(value) { callbacks.onParse(value) }, @@ -192,7 +214,9 @@ export const handleServerAction = async ({ callbacks.onError(error) }, }) - if (done) { + + // If no raw streams and done synchronously, return simple JSON + if (done && rawStreams.size === 0) { return new Response( nonStreamingBody ? JSON.stringify(nonStreamingBody) : undefined, { @@ -206,12 +230,52 @@ export const handleServerAction = async ({ ) } - // not done yet, we need to stream - const encoder = new TextEncoder() + // If we have raw streams, use framed protocol + if (rawStreams.size > 0) { + // Create a stream of JSON chunks (NDJSON style) + const jsonStream = new ReadableStream({ + start(controller) { + callbacks.onParse = (value) => { + controller.enqueue(JSON.stringify(value) + '\n') + } + callbacks.onDone = () => { + try { + controller.close() + } catch { + // Already closed + } + } + callbacks.onError = (error) => controller.error(error) + // Emit initial body if we have one + if (nonStreamingBody !== undefined) { + callbacks.onParse(nonStreamingBody) + } + }, + }) + + // Create multiplexed stream with JSON and raw streams + const multiplexedStream = createMultiplexedStream( + jsonStream, + rawStreams, + ) + + return new Response(multiplexedStream, { + status: alsResponse.status, + statusText: alsResponse.statusText, + headers: { + 'Content-Type': TSS_CONTENT_TYPE_FRAMED_VERSIONED, + [X_TSS_SERIALIZED]: 'true', + }, + }) + } + + // No raw streams but not done yet - use standard NDJSON streaming const stream = new ReadableStream({ start(controller) { callbacks.onParse = (value) => - controller.enqueue(encoder.encode(JSON.stringify(value) + '\n')) + controller.enqueue( + textEncoder.encode(JSON.stringify(value) + '\n'), + ) callbacks.onDone = () => { try { controller.close() @@ -220,7 +284,7 @@ export const handleServerAction = async ({ } } callbacks.onError = (error) => 
controller.error(error) - // stream the initial body + // stream initial body if (nonStreamingBody !== undefined) { callbacks.onParse(nonStreamingBody) } diff --git a/packages/start-server-core/tests/frame-protocol.test.ts b/packages/start-server-core/tests/frame-protocol.test.ts new file mode 100644 index 00000000000..7c4200e3507 --- /dev/null +++ b/packages/start-server-core/tests/frame-protocol.test.ts @@ -0,0 +1,351 @@ +import { describe, expect, it } from 'vitest' +import { + FRAME_HEADER_SIZE, + FrameType, + createMultiplexedStream, + encodeChunkFrame, + encodeEndFrame, + encodeErrorFrame, + encodeFrame, + encodeJSONFrame, +} from '../src/frame-protocol' + +describe('frame-protocol', () => { + describe('encodeFrame', () => { + it('should encode frame with header and payload', () => { + const payload = new Uint8Array([1, 2, 3, 4]) + const frame = encodeFrame(FrameType.CHUNK, 42, payload) + + expect(frame.length).toBe(FRAME_HEADER_SIZE + payload.length) + + // Check header + const view = new DataView(frame.buffer) + expect(view.getUint8(0)).toBe(FrameType.CHUNK) + expect(view.getUint32(1, false)).toBe(42) // streamId big-endian + expect(view.getUint32(5, false)).toBe(4) // length big-endian + + // Check payload + expect(frame.slice(FRAME_HEADER_SIZE)).toEqual(payload) + }) + + it('should handle empty payload', () => { + const frame = encodeFrame(FrameType.END, 1, new Uint8Array(0)) + + expect(frame.length).toBe(FRAME_HEADER_SIZE) + + const view = new DataView(frame.buffer) + expect(view.getUint8(0)).toBe(FrameType.END) + expect(view.getUint32(5, false)).toBe(0) // length is 0 + }) + }) + + describe('encodeJSONFrame', () => { + it('should encode JSON string as frame type 0 with streamId 0', () => { + const json = '{"hello":"world"}' + const frame = encodeJSONFrame(json) + + const view = new DataView(frame.buffer) + expect(view.getUint8(0)).toBe(FrameType.JSON) + expect(view.getUint32(1, false)).toBe(0) // streamId always 0 for JSON + + const encoder = new 
TextEncoder() + const expectedPayload = encoder.encode(json) + expect(view.getUint32(5, false)).toBe(expectedPayload.length) + + const payload = frame.slice(FRAME_HEADER_SIZE) + expect(new TextDecoder().decode(payload)).toBe(json) + }) + }) + + describe('encodeChunkFrame', () => { + it('should encode binary chunk with frame type CHUNK', () => { + const chunk = new Uint8Array([0xff, 0xfe, 0xfd]) + const frame = encodeChunkFrame(123, chunk) + + const view = new DataView(frame.buffer) + expect(view.getUint8(0)).toBe(FrameType.CHUNK) + expect(view.getUint32(1, false)).toBe(123) + expect(view.getUint32(5, false)).toBe(3) + + expect(frame.slice(FRAME_HEADER_SIZE)).toEqual(chunk) + }) + }) + + describe('encodeEndFrame', () => { + it('should encode end frame with empty payload', () => { + const frame = encodeEndFrame(456) + + expect(frame.length).toBe(FRAME_HEADER_SIZE) + + const view = new DataView(frame.buffer) + expect(view.getUint8(0)).toBe(FrameType.END) + expect(view.getUint32(1, false)).toBe(456) + expect(view.getUint32(5, false)).toBe(0) + }) + }) + + describe('encodeErrorFrame', () => { + it('should encode Error message', () => { + const frame = encodeErrorFrame(789, new Error('Something went wrong')) + + const view = new DataView(frame.buffer) + expect(view.getUint8(0)).toBe(FrameType.ERROR) + expect(view.getUint32(1, false)).toBe(789) + + const payload = frame.slice(FRAME_HEADER_SIZE) + expect(new TextDecoder().decode(payload)).toBe('Something went wrong') + }) + + it('should handle non-Error values', () => { + const frame = encodeErrorFrame(1, 'string error') + + const payload = frame.slice(FRAME_HEADER_SIZE) + expect(new TextDecoder().decode(payload)).toBe('string error') + }) + + it('should handle undefined error', () => { + const frame = encodeErrorFrame(1, undefined) + + const payload = frame.slice(FRAME_HEADER_SIZE) + expect(new TextDecoder().decode(payload)).toBe('Unknown error') + }) + }) + + describe('createMultiplexedStream', () => { + it('should 
multiplex JSON stream only', async () => { + const jsonStream = new ReadableStream({ + start(controller) { + controller.enqueue('{"data":1}') + controller.enqueue('{"data":2}') + controller.close() + }, + }) + + const multiplexed = createMultiplexedStream( + jsonStream, + new Map(), // no raw streams + ) + + const reader = multiplexed.getReader() + const chunks: Array = [] + + while (true) { + const { done, value } = await reader.read() + if (done) break + chunks.push(value) + } + + expect(chunks.length).toBe(2) + + // Both should be JSON frames + for (const chunk of chunks) { + const view = new DataView(chunk.buffer, chunk.byteOffset) + expect(view.getUint8(0)).toBe(FrameType.JSON) + } + }) + + it('should multiplex JSON and raw streams', async () => { + const jsonStream = new ReadableStream({ + start(controller) { + controller.enqueue('{"result":"ok"}') + controller.close() + }, + }) + + const rawStream = new ReadableStream({ + start(controller) { + controller.enqueue(new Uint8Array([1, 2, 3])) + controller.close() + }, + }) + + const rawStreams = new Map>() + rawStreams.set(5, rawStream) + + const multiplexed = createMultiplexedStream(jsonStream, rawStreams) + + const reader = multiplexed.getReader() + const chunks: Array = [] + + while (true) { + const { done, value } = await reader.read() + if (done) break + chunks.push(value) + } + + // Should have: JSON frame, CHUNK frame, END frame + expect(chunks.length).toBe(3) + + const types = chunks.map((chunk) => { + const view = new DataView(chunk.buffer, chunk.byteOffset) + return view.getUint8(0) + }) + + expect(types).toContain(FrameType.JSON) + expect(types).toContain(FrameType.CHUNK) + expect(types).toContain(FrameType.END) + }) + + it('should handle cancel without errors', async () => { + // Create slow streams that won't complete before cancel + let jsonCancelled = false + let rawCancelled = false + + const jsonStream = new ReadableStream({ + async start(controller) { + await new Promise((r) => setTimeout(r, 
100)) + controller.enqueue('{}\n') + controller.close() + }, + cancel() { + jsonCancelled = true + }, + }) + + const rawStream = new ReadableStream({ + async start(controller) { + await new Promise((r) => setTimeout(r, 100)) + controller.enqueue(new Uint8Array([1, 2, 3])) + controller.close() + }, + cancel() { + rawCancelled = true + }, + }) + + const rawStreams = new Map>() + rawStreams.set(1, rawStream) + + const multiplexed = createMultiplexedStream(jsonStream, rawStreams) + const reader = multiplexed.getReader() + + // Cancel immediately before streams complete + // Should not throw ERR_INVALID_STATE + await reader.cancel() + + // Underlying reader.cancel should propagate to sources + expect(jsonCancelled).toBe(true) + expect(rawCancelled).toBe(true) + }) + + it('should interleave multiple raw streams correctly', async () => { + // Two streams that emit chunks with different timing + let resolve1: () => void + let resolve2: () => void + const gate1 = new Promise((r) => (resolve1 = r)) + const gate2 = new Promise((r) => (resolve2 = r)) + + const jsonStream = new ReadableStream({ + start(controller) { + controller.enqueue('{"streams":[1,2]}') + controller.close() + }, + }) + + const rawStream1 = new ReadableStream({ + async start(controller) { + controller.enqueue(new Uint8Array([0x11])) + await gate1 + controller.enqueue(new Uint8Array([0x12])) + controller.close() + }, + }) + + const rawStream2 = new ReadableStream({ + async start(controller) { + controller.enqueue(new Uint8Array([0x21])) + await gate2 + controller.enqueue(new Uint8Array([0x22])) + controller.close() + }, + }) + + const rawStreams = new Map>() + rawStreams.set(1, rawStream1) + rawStreams.set(2, rawStream2) + + const multiplexed = createMultiplexedStream(jsonStream, rawStreams) + const reader = multiplexed.getReader() + + const chunks: Array = [] + + // Read first batch (JSON + first chunks from both streams) + for (let i = 0; i < 3; i++) { + const { value } = await reader.read() + if (value) 
chunks.push(value) + } + + // Release gates to let streams continue + resolve1!() + resolve2!() + + // Read remaining chunks + while (true) { + const { done, value } = await reader.read() + if (done) break + chunks.push(value) + } + + // Should have: 1 JSON + 2 CHUNKs + 2 CHUNKs + 2 ENDs = 7 frames + expect(chunks.length).toBe(7) + + // Verify all frame types present + const types = chunks.map((chunk) => { + const view = new DataView(chunk.buffer, chunk.byteOffset) + return view.getUint8(0) + }) + + expect(types.filter((t) => t === FrameType.JSON).length).toBe(1) + expect(types.filter((t) => t === FrameType.CHUNK).length).toBe(4) + expect(types.filter((t) => t === FrameType.END).length).toBe(2) + }) + + it('should handle raw stream error', async () => { + const jsonStream = new ReadableStream({ + start(controller) { + controller.enqueue('{}') + controller.close() + }, + }) + + const errorStream = new ReadableStream({ + start(controller) { + controller.error(new Error('Stream failed')) + }, + }) + + const rawStreams = new Map>() + rawStreams.set(10, errorStream) + + const multiplexed = createMultiplexedStream(jsonStream, rawStreams) + + const reader = multiplexed.getReader() + const chunks: Array = [] + + while (true) { + const { done, value } = await reader.read() + if (done) break + chunks.push(value) + } + + // Should have JSON frame and ERROR frame + const types = chunks.map((chunk) => { + const view = new DataView(chunk.buffer, chunk.byteOffset) + return view.getUint8(0) + }) + + expect(types).toContain(FrameType.JSON) + expect(types).toContain(FrameType.ERROR) + + // Find ERROR frame and check content + const errorFrame = chunks.find((chunk) => { + const view = new DataView(chunk.buffer, chunk.byteOffset) + return view.getUint8(0) === FrameType.ERROR + }) + + expect(errorFrame).toBeDefined() + const payload = errorFrame!.slice(FRAME_HEADER_SIZE) + expect(new TextDecoder().decode(payload)).toBe('Stream failed') + }) + }) +})