From f5d54ae2306d71e6bb9a6e3940a49b77b3b8a66b Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Tue, 27 Jan 2026 10:18:24 -0800 Subject: [PATCH 1/8] Add optional `writeToStreamMulti` function to the World interface --- .changeset/hip-candles-kick.md | 9 +++ packages/core/src/serialization.ts | 65 +++++++++++++++- packages/world-local/src/streamer.test.ts | 93 +++++++++++++++++++++++ packages/world-local/src/streamer.ts | 69 +++++++++++++++-- packages/world-postgres/src/streamer.ts | 44 ++++++++++- packages/world-vercel/src/streamer.ts | 59 ++++++++++++++ packages/world-vercel/src/utils.ts | 3 +- packages/world/src/interfaces.ts | 18 +++++ 8 files changed, 348 insertions(+), 12 deletions(-) create mode 100644 .changeset/hip-candles-kick.md diff --git a/.changeset/hip-candles-kick.md b/.changeset/hip-candles-kick.md new file mode 100644 index 0000000000..65c049f2ae --- /dev/null +++ b/.changeset/hip-candles-kick.md @@ -0,0 +1,9 @@ +--- +"@workflow/world-postgres": patch +"@workflow/world-vercel": patch +"@workflow/world-local": patch +"@workflow/world": patch +"@workflow/core": patch +--- + +Add optional `writeToStreamMulti` function to the World interface diff --git a/packages/core/src/serialization.ts b/packages/core/src/serialization.ts index 98ae2abb20..f71c4f4ba0 100644 --- a/packages/core/src/serialization.ts +++ b/packages/core/src/serialization.ts @@ -272,6 +272,12 @@ export class WorkflowServerReadableStream extends ReadableStream { } } +/** + * Default flush interval in milliseconds for buffered stream writes. + * Chunks are accumulated and flushed together to reduce network overhead. + */ +const STREAM_FLUSH_INTERVAL_MS = 10; + export class WorkflowServerWritableStream extends WritableStream { constructor(name: string, runId: string | Promise) { // runId can be a promise, because we need a runID to write to a stream, @@ -287,12 +293,67 @@ export class WorkflowServerWritableStream extends WritableStream { throw new Error(`"name" is required, got "${name}"`); } const world = getWorld(); + + // Buffering state for batched writes + let buffer: Uint8Array[] = []; + let flushTimer: ReturnType | null = null; + let flushPromise: Promise | null = null; + + const flush = async (): Promise => { + if (flushTimer) { + clearTimeout(flushTimer); + flushTimer = null; + } + + if (buffer.length === 0) return; + + const chunksToFlush = buffer; + buffer = []; + + const _runId = await runId; + + // Use writeToStreamMulti if available for batch writes + const writeMulti = (world as any).writeToStreamMulti; + if (typeof writeMulti === 'function' && chunksToFlush.length > 1) { + await writeMulti.call(world, name, _runId, chunksToFlush); + } else { + // Fall back to sequential writes + for (const chunk of chunksToFlush) { + await world.writeToStream(name, _runId, chunk); + } + } + }; + + const scheduleFlush = (): void => { + if (flushTimer) return; // Already scheduled + + flushTimer = setTimeout(() => { + flushTimer = null; + flushPromise = flush(); + }, STREAM_FLUSH_INTERVAL_MS); + }; + super({ async write(chunk) { - const _runId = await runId; - await world.writeToStream(name, _runId, chunk); + // Wait for any in-progress flush to complete before adding to buffer + if (flushPromise) { + await flushPromise; + flushPromise = null; + } + + buffer.push(chunk); + scheduleFlush(); }, async close() { + // Wait for any in-progress flush to complete + if (flushPromise) { + await flushPromise; + flushPromise = null; + } + + // Flush any remaining buffered chunks + await flush(); + const _runId = await runId; await world.closeStream(name, _runId); }, diff --git a/packages/world-local/src/streamer.test.ts b/packages/world-local/src/streamer.test.ts index 6fb9fb1915..50162aaaf7 100644 --- a/packages/world-local/src/streamer.test.ts +++ b/packages/world-local/src/streamer.test.ts @@ -194,6 +194,99 @@ describe('streamer', () => { }); }); + describe('writeToStreamMulti', () => { + it('should write multiple chunks in a single call', async () => { + const { testDir, streamer } = await setupStreamer(); + const streamName = 'multi-stream'; + + await streamer.writeToStreamMulti!(streamName, TEST_RUN_ID, [ + 'chunk1', + 'chunk2', + 'chunk3', + ]); + + const chunksDir = path.join(testDir, 'streams', 'chunks'); + const files = await fs.readdir(chunksDir); + + expect(files).toHaveLength(3); + expect(files.every((f) => f.startsWith(`${streamName}-`))).toBe(true); + }); + + it('should preserve chunk ordering', async () => { + const { streamer } = await setupStreamer(); + const streamName = 'ordered-multi-stream'; + + await streamer.writeToStreamMulti!(streamName, TEST_RUN_ID, [ + 'first', + 'second', + 'third', + ]); + await streamer.closeStream(streamName, TEST_RUN_ID); + + const readable = await streamer.readFromStream(streamName); + const reader = readable.getReader(); + const decoder = new TextDecoder(); + const chunks: string[] = []; + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + chunks.push(decoder.decode(value)); + } + + expect(chunks).toEqual(['first', 'second', 'third']); + }); + + it('should handle empty chunks array', async () => { + const { testDir, streamer } = await setupStreamer(); + const streamName = 'empty-multi-stream'; + + await streamer.writeToStreamMulti!(streamName, TEST_RUN_ID, []); + + const chunksDir = path.join(testDir, 'streams', 'chunks'); + const dirExists = await fs + .access(chunksDir) + .then(() => true) + .catch(() => false); + + // Directory might not exist if no chunks were written + if (dirExists) { + const files = await fs.readdir(chunksDir); + const streamFiles = files.filter((f) => + f.startsWith(`${streamName}-`) + ); + expect(streamFiles).toHaveLength(0); + } + }); + + it('should handle mixed string and Uint8Array chunks', async () => { + const { streamer } = await setupStreamer(); + const streamName = 'mixed-multi-stream'; + + await streamer.writeToStreamMulti!(streamName, TEST_RUN_ID, [ + 'string-chunk', + new Uint8Array([1, 2, 3, 4]), + Buffer.from('buffer-chunk'), + ]); + await streamer.closeStream(streamName, TEST_RUN_ID); + + const readable = await streamer.readFromStream(streamName); + const reader = readable.getReader(); + const chunks: Uint8Array[] = []; + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + chunks.push(value); + } + + expect(chunks).toHaveLength(3); + expect(new TextDecoder().decode(chunks[0])).toBe('string-chunk'); + expect(chunks[1]).toEqual(new Uint8Array([1, 2, 3, 4])); + expect(new TextDecoder().decode(chunks[2])).toBe('buffer-chunk'); + }); + }); + describe('closeStream', () => { it('should close an empty stream', async () => { const { testDir, streamer } = await setupStreamer(); diff --git a/packages/world-local/src/streamer.ts b/packages/world-local/src/streamer.ts index bf22112d68..61b8cc359d 100644 --- a/packages/world-local/src/streamer.ts +++ b/packages/world-local/src/streamer.ts @@ -87,6 +87,17 @@ export function createStreamer(basedir: string): Streamer { registeredStreams.add(cacheKey); } + // Helper to convert a chunk to a Buffer + function toBuffer(chunk: string | Uint8Array): Buffer { + if (typeof chunk === 'string') { + return Buffer.from(new TextEncoder().encode(chunk)); + } else if (chunk instanceof Buffer) { + return chunk; + } else { + return Buffer.from(chunk); + } + } + return { async writeToStream( name: string, @@ -105,14 +116,7 @@ export function createStreamer(basedir: string): Streamer { await registerStreamForRun(runId, name); // Convert chunk to buffer for serialization - let chunkBuffer: Buffer; - if (typeof chunk === 'string') { - chunkBuffer = Buffer.from(new TextEncoder().encode(chunk)); - } else if (chunk instanceof Buffer) { - chunkBuffer = chunk; - } else { - chunkBuffer = Buffer.from(chunk); - } + const chunkBuffer = toBuffer(chunk); const serialized = serializeChunk({ chunk: chunkBuffer, @@ -138,6 +142,55 @@ export function createStreamer(basedir: string): Streamer { }); }, + async writeToStreamMulti( + name: string, + _runId: string | Promise, + chunks: (string | Uint8Array)[] + ) { + if (chunks.length === 0) return; + + // Generate all ULIDs synchronously BEFORE any await to preserve call order. + // This ensures that chunks maintain their order even when runId is a promise. + const chunkIds = chunks.map(() => `chnk_${monotonicUlid()}`); + + // Await runId if it's a promise + const runId = await _runId; + + // Register this stream for the run + await registerStreamForRun(runId, name); + + // Write all chunks in parallel for efficiency + await Promise.all( + chunks.map(async (chunk, i) => { + const chunkId = chunkIds[i]; + const chunkBuffer = toBuffer(chunk); + + const serialized = serializeChunk({ + chunk: chunkBuffer, + eof: false, + }); + + const chunkPath = path.join( + basedir, + 'streams', + 'chunks', + `${name}-${chunkId}.json` + ); + + await write(chunkPath, serialized); + + // Emit real-time event with Uint8Array (create copy to prevent ArrayBuffer detachment) + const chunkData = Uint8Array.from(chunkBuffer); + + streamEmitter.emit(`chunk:${name}` as const, { + streamName: name, + chunkData, + chunkId, + }); + }) + ); + }, + async closeStream(name: string, _runId: string | Promise) { // Generate ULID synchronously BEFORE any await to preserve call order. const chunkId = `chnk_${monotonicUlid()}`; diff --git a/packages/world-postgres/src/streamer.ts b/packages/world-postgres/src/streamer.ts index 750a6af434..bef4e2a37c 100644 --- a/packages/world-postgres/src/streamer.ts +++ b/packages/world-postgres/src/streamer.ts @@ -87,6 +87,10 @@ export function createStreamer(postgres: Sql, drizzle: Drizzle): Streamer { }); }); + // Helper to convert chunk to Buffer + const toBuffer = (chunk: string | Uint8Array): Buffer => + !Buffer.isBuffer(chunk) ? Buffer.from(chunk) : chunk; + return { async writeToStream( name: string, @@ -101,7 +105,7 @@ export function createStreamer(postgres: Sql, drizzle: Drizzle): Streamer { chunkId, streamId: name, runId, - chunkData: !Buffer.isBuffer(chunk) ? Buffer.from(chunk) : chunk, + chunkData: toBuffer(chunk), eof: false, }); postgres.notify( @@ -114,6 +118,44 @@ export function createStreamer(postgres: Sql, drizzle: Drizzle): Streamer { ) ); }, + + async writeToStreamMulti( + name: string, + _runId: string | Promise, + chunks: (string | Uint8Array)[] + ) { + if (chunks.length === 0) return; + + // Generate all chunk IDs up front to preserve ordering + const chunkIds = chunks.map(() => genChunkId()); + + // Await runId if it's a promise to ensure proper flushing + const runId = await _runId; + + // Batch insert all chunks in a single query + await drizzle.insert(streams).values( + chunks.map((chunk, i) => ({ + chunkId: chunkIds[i], + streamId: name, + runId, + chunkData: toBuffer(chunk), + eof: false, + })) + ); + + // Notify for each chunk (could be batched in future if needed) + for (const chunkId of chunkIds) { + postgres.notify( + STREAM_TOPIC, + JSON.stringify( + StreamPublishMessage.encode({ + chunkId, + streamId: name, + }) + ) + ); + } + }, async closeStream( name: string, _runId: string | Promise diff --git a/packages/world-vercel/src/streamer.ts b/packages/world-vercel/src/streamer.ts index 017b3eaed1..01c9f05709 100644 --- a/packages/world-vercel/src/streamer.ts +++ b/packages/world-vercel/src/streamer.ts @@ -14,6 +14,41 @@ function getStreamUrl( return new URL(`${httpConfig.baseUrl}/v2/stream/${encodeURIComponent(name)}`); } +/** + * Encode multiple chunks into a length-prefixed binary format. + * Format: [4 bytes big-endian length][chunk bytes][4 bytes length][chunk bytes]... + * + * This preserves chunk boundaries so the server can store them as separate + * chunks, maintaining correct startIndex semantics for readers. + */ +function encodeMultiChunks(chunks: (string | Uint8Array)[]): Uint8Array { + const encoder = new TextEncoder(); + + // Convert all chunks to Uint8Array and calculate total size + const binaryChunks: Uint8Array[] = []; + let totalSize = 0; + + for (const chunk of chunks) { + const binary = typeof chunk === 'string' ? encoder.encode(chunk) : chunk; + binaryChunks.push(binary); + totalSize += 4 + binary.length; // 4 bytes for length prefix + } + + // Allocate buffer and write length-prefixed chunks + const result = new Uint8Array(totalSize); + const view = new DataView(result.buffer); + let offset = 0; + + for (const binary of binaryChunks) { + view.setUint32(offset, binary.length, false); // big-endian + offset += 4; + result.set(binary, offset); + offset += binary.length; + } + + return result; +} + export function createStreamer(config?: APIConfig): Streamer { return { async writeToStream( @@ -33,6 +68,30 @@ export function createStreamer(config?: APIConfig): Streamer { }); }, + async writeToStreamMulti( + name: string, + runId: string | Promise, + chunks: (string | Uint8Array)[] + ) { + if (chunks.length === 0) return; + + // Await runId if it's a promise to ensure proper flushing + const resolvedRunId = await runId; + + const httpConfig = await getHttpConfig(config); + + // Signal to server that this is a multi-chunk batch + httpConfig.headers.set('X-Stream-Multi', 'true'); + + const body = encodeMultiChunks(chunks); + await fetch(getStreamUrl(name, resolvedRunId, httpConfig), { + method: 'PUT', + body, + headers: httpConfig.headers, + duplex: 'half', + }); + }, + async closeStream(name: string, runId: string | Promise) { // Await runId if it's a promise to ensure proper flushing const resolvedRunId = await runId; diff --git a/packages/world-vercel/src/utils.ts b/packages/world-vercel/src/utils.ts index c9d09e6842..5f30c8e29e 100644 --- a/packages/world-vercel/src/utils.ts +++ b/packages/world-vercel/src/utils.ts @@ -14,7 +14,8 @@ import { version } from './version.js'; * * Example: 'https://workflow-server-git-branch-name.vercel.sh' */ -const WORKFLOW_SERVER_URL_OVERRIDE = ''; +const WORKFLOW_SERVER_URL_OVERRIDE = + 'https://workflow-server-git-01-27-implementmultiwritemodeforstre-1e1eee.vercel.sh'; export interface APIConfig { token?: string; diff --git a/packages/world/src/interfaces.ts b/packages/world/src/interfaces.ts index f2a5802809..c737a619e5 100644 --- a/packages/world/src/interfaces.ts +++ b/packages/world/src/interfaces.ts @@ -29,6 +29,24 @@ export interface Streamer { runId: string | Promise, chunk: string | Uint8Array ): Promise; + + /** + * Write multiple chunks to a stream in a single operation. + * This is an optional optimization for world implementations that can + * batch multiple writes efficiently (e.g., single HTTP request for world-vercel). + * + * If not implemented, the caller should fall back to sequential writeToStream() calls. + * + * @param name - The stream name + * @param runId - The run ID (can be a promise) + * @param chunks - Array of chunks to write, in order + */ + writeToStreamMulti?( + name: string, + runId: string | Promise, + chunks: (string | Uint8Array)[] + ): Promise; + closeStream(name: string, runId: string | Promise): Promise; readFromStream( name: string, From 777285e72697449275e82637ade6ca2b22cec0c0 Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Tue, 27 Jan 2026 10:31:30 -0800 Subject: [PATCH 2/8] Remove 'as any' cast --- packages/core/src/serialization.ts | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/packages/core/src/serialization.ts b/packages/core/src/serialization.ts index f71c4f4ba0..5d2221eff7 100644 --- a/packages/core/src/serialization.ts +++ b/packages/core/src/serialization.ts @@ -313,9 +313,11 @@ export class WorkflowServerWritableStream extends WritableStream { const _runId = await runId; // Use writeToStreamMulti if available for batch writes - const writeMulti = (world as any).writeToStreamMulti; - if (typeof writeMulti === 'function' && chunksToFlush.length > 1) { - await writeMulti.call(world, name, _runId, chunksToFlush); + if ( + typeof world.writeToStreamMulti === 'function' && + chunksToFlush.length > 1 + ) { + await world.writeToStreamMulti(name, _runId, chunksToFlush); } else { // Fall back to sequential writes for (const chunk of chunksToFlush) { From 70630564d79b1bce69c7ef92c01081cc41dd44ac Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Tue, 27 Jan 2026 11:25:35 -0800 Subject: [PATCH 3/8] Fix world-local and add debug logging --- packages/core/src/serialization.ts | 4 ++ packages/world-local/src/streamer.ts | 67 ++++++++++++++++------------ 2 files changed, 42 insertions(+), 29 deletions(-) diff --git a/packages/core/src/serialization.ts b/packages/core/src/serialization.ts index 5d2221eff7..bbe7bff047 100644 --- a/packages/core/src/serialization.ts +++ b/packages/core/src/serialization.ts @@ -317,6 +317,10 @@ export class WorkflowServerWritableStream extends WritableStream { typeof world.writeToStreamMulti === 'function' && chunksToFlush.length > 1 ) { + const totalBytes = chunksToFlush.reduce((sum, c) => sum + c.length, 0); + console.log( + `[streams] Flushing ${chunksToFlush.length} chunks (${totalBytes} bytes) to stream "${name}"` + ); await world.writeToStreamMulti(name, _runId, chunksToFlush); } else { // Fall back to sequential writes diff --git a/packages/world-local/src/streamer.ts b/packages/world-local/src/streamer.ts index 61b8cc359d..0be6abe708 100644 --- a/packages/world-local/src/streamer.ts +++ b/packages/world-local/src/streamer.ts @@ -159,36 +159,45 @@ export function createStreamer(basedir: string): Streamer { // Register this stream for the run await registerStreamForRun(runId, name); - // Write all chunks in parallel for efficiency - await Promise.all( - chunks.map(async (chunk, i) => { - const chunkId = chunkIds[i]; - const chunkBuffer = toBuffer(chunk); - - const serialized = serializeChunk({ - chunk: chunkBuffer, - eof: false, - }); - - const chunkPath = path.join( - basedir, - 'streams', - 'chunks', - `${name}-${chunkId}.json` - ); - - await write(chunkPath, serialized); - - // Emit real-time event with Uint8Array (create copy to prevent ArrayBuffer detachment) - const chunkData = Uint8Array.from(chunkBuffer); + // Prepare chunk data for parallel writes + const chunkBuffers = chunks.map((chunk) => toBuffer(chunk)); + + // Write all chunks in parallel for efficiency, but track individual completion + const writePromises = chunkBuffers.map(async (chunkBuffer, i) => { + const chunkId = chunkIds[i]; + + const serialized = serializeChunk({ + chunk: chunkBuffer, + eof: false, + }); + + const chunkPath = path.join( + basedir, + 'streams', + 'chunks', + `${name}-${chunkId}.json` + ); + + await write(chunkPath, serialized); + + // Return data needed for event emission + return { + chunkId, + chunkData: Uint8Array.from(chunkBuffer), + }; + }); - streamEmitter.emit(`chunk:${name}` as const, { - streamName: name, - chunkData, - chunkId, - }); - }) - ); + // Emit events in order, waiting for each chunk's write to complete + // This ensures events are emitted in order while writes happen in parallel + for (const writePromise of writePromises) { + const { chunkId, chunkData } = await writePromise; + + streamEmitter.emit(`chunk:${name}` as const, { + streamName: name, + chunkData, + chunkId, + }); + } }, async closeStream(name: string, _runId: string | Promise) { From b4a35933e35e73479ec79e7c5ec7ea945e354ec2 Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Tue, 27 Jan 2026 11:52:59 -0800 Subject: [PATCH 4/8] Remove debug logging --- packages/core/src/serialization.ts | 4 ---- 1 file changed, 4 deletions(-) diff --git a/packages/core/src/serialization.ts b/packages/core/src/serialization.ts index bbe7bff047..5d2221eff7 100644 --- a/packages/core/src/serialization.ts +++ b/packages/core/src/serialization.ts @@ -317,10 +317,6 @@ export class WorkflowServerWritableStream extends WritableStream { typeof world.writeToStreamMulti === 'function' && chunksToFlush.length > 1 ) { - const totalBytes = chunksToFlush.reduce((sum, c) => sum + c.length, 0); - console.log( - `[streams] Flushing ${chunksToFlush.length} chunks (${totalBytes} bytes) to stream "${name}"` - ); await world.writeToStreamMulti(name, _runId, chunksToFlush); } else { // Fall back to sequential writes From 164afaf6eb92d13272a8a36ec45ee1bb302417cb Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Tue, 27 Jan 2026 13:03:40 -0800 Subject: [PATCH 5/8] Use production workflow-server --- packages/world-vercel/src/utils.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/world-vercel/src/utils.ts b/packages/world-vercel/src/utils.ts index 5f30c8e29e..c9d09e6842 100644 --- a/packages/world-vercel/src/utils.ts +++ b/packages/world-vercel/src/utils.ts @@ -14,8 +14,7 @@ import { version } from './version.js'; * * Example: 'https://workflow-server-git-branch-name.vercel.sh' */ -const WORKFLOW_SERVER_URL_OVERRIDE = - 'https://workflow-server-git-01-27-implementmultiwritemodeforstre-1e1eee.vercel.sh'; +const WORKFLOW_SERVER_URL_OVERRIDE = ''; export interface APIConfig { token?: string; From b352bde1cf252109ff1130f1e3f6161a214ca076 Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Tue, 27 Jan 2026 13:12:13 -0800 Subject: [PATCH 6/8] Add tests and abort handler for stream buffering - Add abort() handler to WorkflowServerWritableStream to clean up timer and discard buffer on abort (prevents leaks) - Add comprehensive unit tests for WorkflowServerWritableStream buffering logic: flush timing, concurrent writes, writeToStreamMulti fallback, promise runId handling, and abort behavior - Add unit tests for encodeMultiChunks in world-vercel - Export encodeMultiChunks for testing purposes --- packages/core/src/serialization.test.ts | 331 ++++++++++++++++++++- packages/core/src/serialization.ts | 9 + packages/world-vercel/src/streamer.test.ts | 170 +++++++++++ packages/world-vercel/src/streamer.ts | 4 +- 4 files changed, 512 insertions(+), 2 deletions(-) create mode 100644 packages/world-vercel/src/streamer.test.ts diff --git a/packages/core/src/serialization.test.ts b/packages/core/src/serialization.test.ts index 4274e32e8d..8ad3bfed73 100644 --- a/packages/core/src/serialization.test.ts +++ b/packages/core/src/serialization.test.ts @@ -1,7 +1,7 @@ import { runInContext } from 'node:vm'; import type { WorkflowRuntimeError } from '@workflow/errors'; import { WORKFLOW_DESERIALIZE, WORKFLOW_SERIALIZE } from '@workflow/serde'; -import { describe, expect, it } from 'vitest'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { registerSerializationClass } from './class-serialization.js'; import { getStepFunction, registerStepFunction } from './private.js'; import { @@ -18,6 +18,7 @@ import { hydrateWorkflowArguments, hydrateWorkflowReturnValue, SerializationFormat, + WorkflowServerWritableStream, } from './serialization.js'; import { STABLE_ULID, STREAM_NAME_SYMBOL } from './symbols.js'; import { createContext } from './vm/index.js'; @@ -2247,3 +2248,331 @@ describe('decodeFormatPrefix legacy compatibility', () => { expect(decoded).toBe('["test"]'); }); }); + +// Mock the world module for WorkflowServerWritableStream tests +vi.mock('./runtime/world.js', () => ({ + getWorld: vi.fn(), +})); + +describe('WorkflowServerWritableStream', () => { + let mockWorld: { + writeToStream: ReturnType; + writeToStreamMulti: ReturnType; + closeStream: ReturnType; + }; + + beforeEach(async () => { + vi.useFakeTimers(); + + mockWorld = { + writeToStream: vi.fn().mockResolvedValue(undefined), + writeToStreamMulti: vi.fn().mockResolvedValue(undefined), + closeStream: vi.fn().mockResolvedValue(undefined), + }; + + const { getWorld } = await import('./runtime/world.js'); + vi.mocked(getWorld).mockReturnValue(mockWorld as any); + }); + + afterEach(() => { + vi.useRealTimers(); + vi.clearAllMocks(); + }); + + describe('constructor validation', () => { + it('should throw error when runId is not a string or promise', () => { + expect(() => { + new WorkflowServerWritableStream('test-stream', 123 as any); + }).toThrow( + '"runId" must be a string or a promise that resolves to a string' + ); + }); + + it('should throw error when name is empty', () => { + expect(() => { + new WorkflowServerWritableStream('', 'run-123'); + }).toThrow('"name" is required'); + }); + + it('should accept a string runId', () => { + expect(() => { + new WorkflowServerWritableStream('test-stream', 'run-123'); + }).not.toThrow(); + }); + + it('should accept a promise runId', () => { + expect(() => { + new WorkflowServerWritableStream( + 'test-stream', + Promise.resolve('run-123') + ); + }).not.toThrow(); + }); + }); + + describe('buffering behavior', () => { + it('should buffer chunks and flush after 10ms', async () => { + const stream = new WorkflowServerWritableStream('test-stream', 'run-123'); + const writer = stream.getWriter(); + + // Write first chunk + await writer.write(new Uint8Array([1, 2, 3])); + expect(mockWorld.writeToStream).not.toHaveBeenCalled(); + expect(mockWorld.writeToStreamMulti).not.toHaveBeenCalled(); + + // Write second chunk + await writer.write(new Uint8Array([4, 5, 6])); + expect(mockWorld.writeToStream).not.toHaveBeenCalled(); + expect(mockWorld.writeToStreamMulti).not.toHaveBeenCalled(); + + // Advance timer to trigger flush + await vi.advanceTimersByTimeAsync(10); + + // Should use writeToStreamMulti for multiple chunks + expect(mockWorld.writeToStreamMulti).toHaveBeenCalledTimes(1); + expect(mockWorld.writeToStreamMulti).toHaveBeenCalledWith( + 'test-stream', + 'run-123', + [new Uint8Array([1, 2, 3]), new Uint8Array([4, 5, 6])] + ); + expect(mockWorld.writeToStream).not.toHaveBeenCalled(); + + await writer.close(); + }); + + it('should use writeToStream for single chunk', async () => { + const stream = new WorkflowServerWritableStream('test-stream', 'run-123'); + const writer = stream.getWriter(); + + // Write single chunk + await writer.write(new Uint8Array([1, 2, 3])); + + // Advance timer to trigger flush + await vi.advanceTimersByTimeAsync(10); + + // Should use writeToStream for single chunk (not writeToStreamMulti) + expect(mockWorld.writeToStream).toHaveBeenCalledTimes(1); + expect(mockWorld.writeToStream).toHaveBeenCalledWith( + 'test-stream', + 'run-123', + new Uint8Array([1, 2, 3]) + ); + expect(mockWorld.writeToStreamMulti).not.toHaveBeenCalled(); + + await writer.close(); + }); + + it('should fall back to sequential writes when writeToStreamMulti is unavailable', async () => { + // Remove writeToStreamMulti from mock world + delete (mockWorld as any).writeToStreamMulti; + + const stream = new WorkflowServerWritableStream('test-stream', 'run-123'); + const writer = stream.getWriter(); + + // Write multiple chunks + await writer.write(new Uint8Array([1, 2, 3])); + await writer.write(new Uint8Array([4, 5, 6])); + + // Advance timer to trigger flush + await vi.advanceTimersByTimeAsync(10); + + // Should fall back to sequential writeToStream calls + expect(mockWorld.writeToStream).toHaveBeenCalledTimes(2); + expect(mockWorld.writeToStream).toHaveBeenNthCalledWith( + 1, + 'test-stream', + 'run-123', + new Uint8Array([1, 2, 3]) + ); + expect(mockWorld.writeToStream).toHaveBeenNthCalledWith( + 2, + 'test-stream', + 'run-123', + new Uint8Array([4, 5, 6]) + ); + + await writer.close(); + }); + + it('should flush remaining buffer on close', async () => { + const stream = new WorkflowServerWritableStream('test-stream', 'run-123'); + const writer = stream.getWriter(); + + // Write chunks but don't wait for timer + await writer.write(new Uint8Array([1, 2, 3])); + await writer.write(new Uint8Array([4, 5, 6])); + + expect(mockWorld.writeToStreamMulti).not.toHaveBeenCalled(); + + // Close should flush immediately without waiting for timer + await writer.close(); + + expect(mockWorld.writeToStreamMulti).toHaveBeenCalledTimes(1); + expect(mockWorld.closeStream).toHaveBeenCalledTimes(1); + expect(mockWorld.closeStream).toHaveBeenCalledWith( + 'test-stream', + 'run-123' + ); + }); + + it('should not schedule multiple flush timers', async () => { + const stream = new WorkflowServerWritableStream('test-stream', 'run-123'); + const writer = stream.getWriter(); + + // Write multiple chunks rapidly + await writer.write(new Uint8Array([1])); + await writer.write(new Uint8Array([2])); + await writer.write(new Uint8Array([3])); + + // Advance timer once + await vi.advanceTimersByTimeAsync(10); + + // Should only call writeToStreamMulti once with all chunks + expect(mockWorld.writeToStreamMulti).toHaveBeenCalledTimes(1); + expect(mockWorld.writeToStreamMulti).toHaveBeenCalledWith( + 'test-stream', + 'run-123', + [new Uint8Array([1]), new Uint8Array([2]), new Uint8Array([3])] + ); + + await writer.close(); + }); + + it('should handle multiple flush cycles', async () => { + const stream = new WorkflowServerWritableStream('test-stream', 'run-123'); + const writer = stream.getWriter(); + + // First batch + await writer.write(new Uint8Array([1, 2])); + await vi.advanceTimersByTimeAsync(10); + + expect(mockWorld.writeToStream).toHaveBeenCalledTimes(1); + + // Second batch + await writer.write(new Uint8Array([3, 4])); + await writer.write(new Uint8Array([5, 6])); + await vi.advanceTimersByTimeAsync(10); + + expect(mockWorld.writeToStreamMulti).toHaveBeenCalledTimes(1); + expect(mockWorld.writeToStreamMulti).toHaveBeenCalledWith( + 'test-stream', + 'run-123', + [new Uint8Array([3, 4]), new Uint8Array([5, 6])] + ); + + await writer.close(); + }); + + it('should wait for in-progress flush before adding to buffer', async () => { + // Create a slow writeToStreamMulti that we can control + let resolveWrite: () => void; + mockWorld.writeToStreamMulti.mockImplementation( + () => + new Promise((resolve) => { + resolveWrite = resolve; + }) + ); + + const stream = new WorkflowServerWritableStream('test-stream', 'run-123'); + const writer = stream.getWriter(); + + // Write and trigger flush + await writer.write(new Uint8Array([1, 2])); + await writer.write(new Uint8Array([3, 4])); + await vi.advanceTimersByTimeAsync(10); + + // Flush started but not completed + expect(mockWorld.writeToStreamMulti).toHaveBeenCalledTimes(1); + + // Write more while flush is in progress + const writePromise = writer.write(new Uint8Array([5, 6])); + + // Resolve the first flush + resolveWrite!(); + await writePromise; + + // Now advance timer to flush the new chunk + await vi.advanceTimersByTimeAsync(10); + + // Second flush should have happened + expect(mockWorld.writeToStream).toHaveBeenCalledTimes(1); + expect(mockWorld.writeToStream).toHaveBeenCalledWith( + 'test-stream', + 'run-123', + new Uint8Array([5, 6]) + ); + + await writer.close(); + }); + }); + + describe('abort behavior', () => { + it('should clean up timer and discard buffer on abort', async () => { + const stream = new WorkflowServerWritableStream('test-stream', 'run-123'); + const writer = stream.getWriter(); + + // Write chunks + await writer.write(new Uint8Array([1, 2, 3])); + await writer.write(new Uint8Array([4, 5, 6])); + + // Abort the stream + await writer.abort(new Error('Test abort')); + + // Advance timer - should NOT trigger flush since stream was aborted + await vi.advanceTimersByTimeAsync(10); + + expect(mockWorld.writeToStream).not.toHaveBeenCalled(); + expect(mockWorld.writeToStreamMulti).not.toHaveBeenCalled(); + expect(mockWorld.closeStream).not.toHaveBeenCalled(); + }); + }); + + describe('promise runId handling', () => { + it('should wait for runId promise before writing', async () => { + let resolveRunId: (value: string) => void; + const runIdPromise = new Promise((resolve) => { + resolveRunId = resolve; + }); + + const stream = new WorkflowServerWritableStream( + 'test-stream', + runIdPromise + ); + const writer = stream.getWriter(); + + // Write and trigger flush + await writer.write(new Uint8Array([1, 2, 3])); + await vi.advanceTimersByTimeAsync(10); + + // Write should not have happened yet because runId is not resolved + expect(mockWorld.writeToStream).not.toHaveBeenCalled(); + + // Resolve runId + resolveRunId!('resolved-run-123'); + await vi.advanceTimersByTimeAsync(0); // Let promises settle + + // Now the write should have happened + expect(mockWorld.writeToStream).toHaveBeenCalledWith( + 'test-stream', + 'resolved-run-123', + new Uint8Array([1, 2, 3]) + ); + + await writer.close(); + }); + }); + + describe('empty buffer handling', () => { + it('should not call write methods when buffer is empty on close', async () => { + const stream = new WorkflowServerWritableStream('test-stream', 'run-123'); + const writer = stream.getWriter(); + + // Close without writing anything + await writer.close(); + + expect(mockWorld.writeToStream).not.toHaveBeenCalled(); + expect(mockWorld.writeToStreamMulti).not.toHaveBeenCalled(); + expect(mockWorld.closeStream).toHaveBeenCalledTimes(1); + }); + }); +}); diff --git a/packages/core/src/serialization.ts b/packages/core/src/serialization.ts index 5d2221eff7..4a1248ad0c 100644 --- a/packages/core/src/serialization.ts +++ b/packages/core/src/serialization.ts @@ -359,6 +359,15 @@ export class WorkflowServerWritableStream extends WritableStream { const _runId = await runId; await world.closeStream(name, _runId); }, + abort() { + // Clean up timer to prevent leaks + if (flushTimer) { + clearTimeout(flushTimer); + flushTimer = null; + } + // Discard buffered chunks - they won't be written + buffer = []; + }, }); } } diff --git a/packages/world-vercel/src/streamer.test.ts b/packages/world-vercel/src/streamer.test.ts new file mode 100644 index 0000000000..45193ce31c --- /dev/null +++ b/packages/world-vercel/src/streamer.test.ts @@ -0,0 +1,170 @@ +import { describe, expect, it } from 'vitest'; +import { encodeMultiChunks } from './streamer.js'; + +describe('encodeMultiChunks', () => { + /** + * Helper to decode length-prefixed chunks back to verify encoding + */ + function decodeMultiChunks(encoded: Uint8Array): Uint8Array[] { + const chunks: Uint8Array[] = []; + const view = new DataView( + encoded.buffer, + encoded.byteOffset, + encoded.byteLength + ); + let offset = 0; + + while (offset < encoded.length) { + const length = view.getUint32(offset, false); // big-endian + offset += 4; + chunks.push(encoded.slice(offset, offset + length)); + offset += length; + } + + return chunks; + } + + it('should encode an empty array', () => { + const result = encodeMultiChunks([]); + expect(result.length).toBe(0); + }); + + it('should encode a single string chunk', () => { + const result = encodeMultiChunks(['hello']); + const decoded = decodeMultiChunks(result); + + expect(decoded).toHaveLength(1); + expect(new TextDecoder().decode(decoded[0])).toBe('hello'); + }); + + it('should encode a single Uint8Array chunk', () => { + const chunk = new Uint8Array([1, 2, 3, 4, 5]); + const result = encodeMultiChunks([chunk]); + const decoded = decodeMultiChunks(result); + + expect(decoded).toHaveLength(1); + expect(decoded[0]).toEqual(chunk); + }); + + it('should encode multiple string chunks', () => { + const result = encodeMultiChunks(['hello', 'world', 'test']); + const decoded = decodeMultiChunks(result); + + expect(decoded).toHaveLength(3); + expect(new TextDecoder().decode(decoded[0])).toBe('hello'); + expect(new TextDecoder().decode(decoded[1])).toBe('world'); + expect(new TextDecoder().decode(decoded[2])).toBe('test'); + }); + + it('should encode multiple Uint8Array chunks', () => { + const chunks = [ + new Uint8Array([1, 2, 3]), + new Uint8Array([4, 5]), + new Uint8Array([6, 7, 8, 9]), + ]; + const result = encodeMultiChunks(chunks); + const decoded = decodeMultiChunks(result); + + expect(decoded).toHaveLength(3); + expect(decoded[0]).toEqual(chunks[0]); + expect(decoded[1]).toEqual(chunks[1]); + expect(decoded[2]).toEqual(chunks[2]); + }); + + it('should encode mixed string and Uint8Array chunks', () => { + const result = encodeMultiChunks([ + 'hello', + new Uint8Array([1, 2, 3]), + 'world', + ]); + const decoded = decodeMultiChunks(result); + + expect(decoded).toHaveLength(3); + expect(new TextDecoder().decode(decoded[0])).toBe('hello'); + expect(decoded[1]).toEqual(new Uint8Array([1, 2, 3])); + expect(new TextDecoder().decode(decoded[2])).toBe('world'); + }); + + it('should handle empty string chunks', () => { + const result = encodeMultiChunks(['', 'hello', '']); + const decoded = decodeMultiChunks(result); + + expect(decoded).toHaveLength(3); + expect(decoded[0].length).toBe(0); + expect(new TextDecoder().decode(decoded[1])).toBe('hello'); + expect(decoded[2].length).toBe(0); + }); + + it('should handle empty Uint8Array chunks', () => { + const result = encodeMultiChunks([ + new Uint8Array([]), + new Uint8Array([1, 2]), + new Uint8Array([]), + ]); + const decoded = decodeMultiChunks(result); + + expect(decoded).toHaveLength(3); + expect(decoded[0].length).toBe(0); + expect(decoded[1]).toEqual(new Uint8Array([1, 2])); + expect(decoded[2].length).toBe(0); + }); + + it('should correctly calculate total size with length prefixes', () => { + // Each chunk has a 4-byte length prefix + // 'hello' = 5 bytes, 'world' = 5 bytes + // Total = 4 + 5 + 4 + 5 = 18 bytes + const result = encodeMultiChunks(['hello', 'world']); + expect(result.length).toBe(18); + }); + + it('should use big-endian encoding for length prefix', () => { + const result = encodeMultiChunks(['hello']); + const view = new DataView( + result.buffer, + result.byteOffset, + result.byteLength + ); + + // 'hello' is 5 bytes, big-endian encoding of 5 is [0, 0, 0, 5] + expect(view.getUint32(0, false)).toBe(5); + expect(result[0]).toBe(0); + expect(result[1]).toBe(0); + expect(result[2]).toBe(0); + expect(result[3]).toBe(5); + }); + + it('should handle large chunks', () => { + // Create a 10KB chunk + const largeChunk = new Uint8Array(10 * 1024); + for (let i = 0; i < largeChunk.length; i++) { + largeChunk[i] = i % 256; + } + + const result = encodeMultiChunks([largeChunk]); + const decoded = decodeMultiChunks(result); + + expect(decoded).toHaveLength(1); + expect(decoded[0]).toEqual(largeChunk); + }); + + it('should handle many small chunks', () => { + const chunks = Array.from({ length: 100 }, (_, i) => `chunk${i}`); + const result = encodeMultiChunks(chunks); + const decoded = decodeMultiChunks(result); + + expect(decoded).toHaveLength(100); + decoded.forEach((chunk, i) => { + expect(new TextDecoder().decode(chunk)).toBe(`chunk${i}`); + }); + }); + + it('should handle unicode strings correctly', () => { + const result = encodeMultiChunks(['hello', 'δΈ–η•Œ', 'πŸš€']); + const decoded = decodeMultiChunks(result); + + expect(decoded).toHaveLength(3); + expect(new TextDecoder().decode(decoded[0])).toBe('hello'); + expect(new TextDecoder().decode(decoded[1])).toBe('δΈ–η•Œ'); + expect(new TextDecoder().decode(decoded[2])).toBe('πŸš€'); + }); +}); diff --git a/packages/world-vercel/src/streamer.ts b/packages/world-vercel/src/streamer.ts index 01c9f05709..a6127ff99b 100644 --- a/packages/world-vercel/src/streamer.ts +++ b/packages/world-vercel/src/streamer.ts @@ -20,8 +20,10 @@ function getStreamUrl( * * This preserves chunk boundaries so the server can store them as separate * chunks, maintaining correct startIndex semantics for readers. + * + * @internal Exported for testing purposes */ -function encodeMultiChunks(chunks: (string | Uint8Array)[]): Uint8Array { +export function encodeMultiChunks(chunks: (string | Uint8Array)[]): Uint8Array { const encoder = new TextEncoder(); // Convert all chunks to Uint8Array and calculate total size From 8dbc060c121a5970fd4ad8a0351181ff4abac604 Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Tue, 27 Jan 2026 13:18:59 -0800 Subject: [PATCH 7/8] Move WorkflowServerWritableStream tests to separate file The vi.mock() call was being hoisted and affecting other tests in serialization.test.ts. Moving to a separate file isolates the mock. --- packages/core/src/serialization.test.ts | 331 +--------------------- packages/core/src/writable-stream.test.ts | 330 +++++++++++++++++++++ 2 files changed, 331 insertions(+), 330 deletions(-) create mode 100644 packages/core/src/writable-stream.test.ts diff --git a/packages/core/src/serialization.test.ts b/packages/core/src/serialization.test.ts index 8ad3bfed73..4274e32e8d 100644 --- a/packages/core/src/serialization.test.ts +++ b/packages/core/src/serialization.test.ts @@ -1,7 +1,7 @@ import { runInContext } from 'node:vm'; import type { WorkflowRuntimeError } from '@workflow/errors'; import { WORKFLOW_DESERIALIZE, WORKFLOW_SERIALIZE } from '@workflow/serde'; -import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { describe, expect, it } from 'vitest'; import { registerSerializationClass } from './class-serialization.js'; import { getStepFunction, registerStepFunction } from './private.js'; import { @@ -18,7 +18,6 @@ import { hydrateWorkflowArguments, hydrateWorkflowReturnValue, SerializationFormat, - WorkflowServerWritableStream, } from './serialization.js'; import { STABLE_ULID, STREAM_NAME_SYMBOL } from './symbols.js'; import { createContext } from './vm/index.js'; @@ -2248,331 +2247,3 @@ describe('decodeFormatPrefix legacy compatibility', () => { expect(decoded).toBe('["test"]'); }); }); - -// Mock the world module for WorkflowServerWritableStream tests -vi.mock('./runtime/world.js', () => ({ - getWorld: vi.fn(), -})); - -describe('WorkflowServerWritableStream', () => { - let mockWorld: { - writeToStream: ReturnType; - writeToStreamMulti: ReturnType; - closeStream: ReturnType; - }; - - beforeEach(async () => { - vi.useFakeTimers(); - - mockWorld = { - writeToStream: vi.fn().mockResolvedValue(undefined), - writeToStreamMulti: vi.fn().mockResolvedValue(undefined), - closeStream: vi.fn().mockResolvedValue(undefined), - }; - - const { getWorld } = await import('./runtime/world.js'); - vi.mocked(getWorld).mockReturnValue(mockWorld as any); - }); - - afterEach(() => { - vi.useRealTimers(); - vi.clearAllMocks(); - }); - - describe('constructor validation', () => { - it('should throw error when runId is not a string or promise', () => { - expect(() => { - new WorkflowServerWritableStream('test-stream', 123 as any); - }).toThrow( - '"runId" must be a string or a promise that resolves to a string' - ); - }); - - it('should throw error when name is empty', () => { - expect(() => { - new WorkflowServerWritableStream('', 'run-123'); - }).toThrow('"name" is required'); - }); - - it('should accept a string runId', () => { - expect(() => { - new WorkflowServerWritableStream('test-stream', 'run-123'); - }).not.toThrow(); - }); - - it('should accept a promise runId', () => { - expect(() => { - new WorkflowServerWritableStream( - 'test-stream', - Promise.resolve('run-123') - ); - }).not.toThrow(); - }); - }); - - describe('buffering behavior', () => { - it('should buffer chunks and flush after 10ms', async () => { - const stream = new WorkflowServerWritableStream('test-stream', 'run-123'); - const writer = stream.getWriter(); - - // Write first chunk - await writer.write(new Uint8Array([1, 2, 3])); - expect(mockWorld.writeToStream).not.toHaveBeenCalled(); - expect(mockWorld.writeToStreamMulti).not.toHaveBeenCalled(); - - // Write second chunk - await writer.write(new Uint8Array([4, 5, 6])); - expect(mockWorld.writeToStream).not.toHaveBeenCalled(); - expect(mockWorld.writeToStreamMulti).not.toHaveBeenCalled(); - - // Advance timer to trigger flush - await vi.advanceTimersByTimeAsync(10); - - // Should use writeToStreamMulti for multiple chunks - expect(mockWorld.writeToStreamMulti).toHaveBeenCalledTimes(1); - expect(mockWorld.writeToStreamMulti).toHaveBeenCalledWith( - 'test-stream', - 'run-123', - [new Uint8Array([1, 2, 3]), new Uint8Array([4, 5, 6])] - ); - expect(mockWorld.writeToStream).not.toHaveBeenCalled(); - - await writer.close(); - }); - - it('should use writeToStream for single chunk', async () => { - const stream = new WorkflowServerWritableStream('test-stream', 'run-123'); - const writer = stream.getWriter(); - - // Write single chunk - await writer.write(new Uint8Array([1, 2, 3])); - - // Advance timer to trigger flush - await vi.advanceTimersByTimeAsync(10); - - // Should use writeToStream for single chunk (not writeToStreamMulti) - expect(mockWorld.writeToStream).toHaveBeenCalledTimes(1); - expect(mockWorld.writeToStream).toHaveBeenCalledWith( - 'test-stream', - 'run-123', - new Uint8Array([1, 2, 3]) - ); - expect(mockWorld.writeToStreamMulti).not.toHaveBeenCalled(); - - await writer.close(); - }); - - it('should fall back to sequential writes when writeToStreamMulti is unavailable', async () => { - // Remove writeToStreamMulti from mock world - delete (mockWorld as any).writeToStreamMulti; - - const stream = new WorkflowServerWritableStream('test-stream', 'run-123'); - const writer = stream.getWriter(); - - // Write multiple chunks - await writer.write(new Uint8Array([1, 2, 3])); - await writer.write(new Uint8Array([4, 5, 6])); - - // Advance timer to trigger flush - await vi.advanceTimersByTimeAsync(10); - - // Should fall back to sequential writeToStream calls - expect(mockWorld.writeToStream).toHaveBeenCalledTimes(2); - expect(mockWorld.writeToStream).toHaveBeenNthCalledWith( - 1, - 'test-stream', - 'run-123', - new Uint8Array([1, 2, 3]) - ); - expect(mockWorld.writeToStream).toHaveBeenNthCalledWith( - 2, - 'test-stream', - 'run-123', - new Uint8Array([4, 5, 6]) - ); - - await writer.close(); - }); - - it('should flush remaining buffer on close', async () => { - const stream = new WorkflowServerWritableStream('test-stream', 'run-123'); - const writer = stream.getWriter(); - - // Write chunks but don't wait for timer - await writer.write(new Uint8Array([1, 2, 3])); - await writer.write(new Uint8Array([4, 5, 6])); - - expect(mockWorld.writeToStreamMulti).not.toHaveBeenCalled(); - - // Close should flush immediately without waiting for timer - await writer.close(); - - expect(mockWorld.writeToStreamMulti).toHaveBeenCalledTimes(1); - expect(mockWorld.closeStream).toHaveBeenCalledTimes(1); - expect(mockWorld.closeStream).toHaveBeenCalledWith( - 'test-stream', - 'run-123' - ); - }); - - it('should not schedule multiple flush timers', async () => { - const stream = new WorkflowServerWritableStream('test-stream', 'run-123'); - const writer = stream.getWriter(); - - // Write multiple chunks rapidly - await writer.write(new Uint8Array([1])); - await writer.write(new Uint8Array([2])); - await writer.write(new Uint8Array([3])); - - // Advance timer once - await vi.advanceTimersByTimeAsync(10); - - // Should only call writeToStreamMulti once with all chunks - expect(mockWorld.writeToStreamMulti).toHaveBeenCalledTimes(1); - expect(mockWorld.writeToStreamMulti).toHaveBeenCalledWith( - 'test-stream', - 'run-123', - [new Uint8Array([1]), new Uint8Array([2]), new Uint8Array([3])] - ); - - await writer.close(); - }); - - it('should handle multiple flush cycles', async () => { - const stream = new WorkflowServerWritableStream('test-stream', 'run-123'); - const writer = stream.getWriter(); - - // First batch - await writer.write(new Uint8Array([1, 2])); - await vi.advanceTimersByTimeAsync(10); - - expect(mockWorld.writeToStream).toHaveBeenCalledTimes(1); - - // Second batch - await writer.write(new Uint8Array([3, 4])); - await writer.write(new Uint8Array([5, 6])); - await vi.advanceTimersByTimeAsync(10); - - expect(mockWorld.writeToStreamMulti).toHaveBeenCalledTimes(1); - expect(mockWorld.writeToStreamMulti).toHaveBeenCalledWith( - 'test-stream', - 'run-123', - [new Uint8Array([3, 4]), new Uint8Array([5, 6])] - ); - - await writer.close(); - }); - - it('should wait for in-progress flush before adding to buffer', async () => { - // Create a slow writeToStreamMulti that we can control - let resolveWrite: () => void; - mockWorld.writeToStreamMulti.mockImplementation( - () => - new Promise((resolve) => { - resolveWrite = resolve; - }) - ); - - const stream = new WorkflowServerWritableStream('test-stream', 'run-123'); - const writer = stream.getWriter(); - - // Write and trigger flush - await writer.write(new Uint8Array([1, 2])); - await writer.write(new Uint8Array([3, 4])); - await vi.advanceTimersByTimeAsync(10); - - // Flush started but not completed - expect(mockWorld.writeToStreamMulti).toHaveBeenCalledTimes(1); - - // Write more while flush is in progress - const writePromise = writer.write(new Uint8Array([5, 6])); - - // Resolve the first flush - resolveWrite!(); - await writePromise; - - // Now advance timer to flush the new chunk - await vi.advanceTimersByTimeAsync(10); - - // Second flush should have happened - expect(mockWorld.writeToStream).toHaveBeenCalledTimes(1); - expect(mockWorld.writeToStream).toHaveBeenCalledWith( - 'test-stream', - 'run-123', - new Uint8Array([5, 6]) - ); - - await writer.close(); - }); - }); - - describe('abort behavior', () => { - it('should clean up timer and discard buffer on abort', async () => { - const stream = new WorkflowServerWritableStream('test-stream', 'run-123'); - const writer = stream.getWriter(); - - // Write chunks - await writer.write(new Uint8Array([1, 2, 3])); - await writer.write(new Uint8Array([4, 5, 6])); - - // Abort the stream - await writer.abort(new Error('Test abort')); - - // Advance timer - should NOT trigger flush since stream was aborted - await vi.advanceTimersByTimeAsync(10); - - expect(mockWorld.writeToStream).not.toHaveBeenCalled(); - expect(mockWorld.writeToStreamMulti).not.toHaveBeenCalled(); - expect(mockWorld.closeStream).not.toHaveBeenCalled(); - }); - }); - - describe('promise runId handling', () => { - it('should wait for runId promise before writing', async () => { - let resolveRunId: (value: string) => void; - const runIdPromise = new Promise((resolve) => { - resolveRunId = resolve; - }); - - const stream = new WorkflowServerWritableStream( - 'test-stream', - runIdPromise - ); - const writer = stream.getWriter(); - - // Write and trigger flush - await writer.write(new Uint8Array([1, 2, 3])); - await vi.advanceTimersByTimeAsync(10); - - // Write should not have happened yet because runId is not resolved - expect(mockWorld.writeToStream).not.toHaveBeenCalled(); - - // Resolve runId - resolveRunId!('resolved-run-123'); - await vi.advanceTimersByTimeAsync(0); // Let promises settle - - // Now the write should have happened - expect(mockWorld.writeToStream).toHaveBeenCalledWith( - 'test-stream', - 'resolved-run-123', - new Uint8Array([1, 2, 3]) - ); - - await writer.close(); - }); - }); - - describe('empty buffer handling', () => { - it('should not call write methods when buffer is empty on close', async () => { - const stream = new WorkflowServerWritableStream('test-stream', 'run-123'); - const writer = stream.getWriter(); - - // Close without writing anything - await writer.close(); - - expect(mockWorld.writeToStream).not.toHaveBeenCalled(); - expect(mockWorld.writeToStreamMulti).not.toHaveBeenCalled(); - expect(mockWorld.closeStream).toHaveBeenCalledTimes(1); - }); - }); -}); diff --git a/packages/core/src/writable-stream.test.ts b/packages/core/src/writable-stream.test.ts new file mode 100644 index 0000000000..714809d999 --- /dev/null +++ b/packages/core/src/writable-stream.test.ts @@ -0,0 +1,330 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { WorkflowServerWritableStream } from './serialization.js'; + +// Mock the world module for WorkflowServerWritableStream tests +vi.mock('./runtime/world.js', () => ({ + getWorld: vi.fn(), +})); + +describe('WorkflowServerWritableStream', () => { + let mockWorld: { + writeToStream: ReturnType; + writeToStreamMulti: ReturnType; + closeStream: ReturnType; + }; + + beforeEach(async () => { + vi.useFakeTimers(); + + mockWorld = { + writeToStream: vi.fn().mockResolvedValue(undefined), + writeToStreamMulti: vi.fn().mockResolvedValue(undefined), + closeStream: vi.fn().mockResolvedValue(undefined), + }; + + const { getWorld } = await import('./runtime/world.js'); + vi.mocked(getWorld).mockReturnValue(mockWorld as any); + }); + + afterEach(() => { + vi.useRealTimers(); + vi.clearAllMocks(); + }); + + describe('constructor validation', () => { + it('should throw error when runId is not a string or promise', () => { + expect(() => { + new WorkflowServerWritableStream('test-stream', 123 as any); + }).toThrow( + '"runId" must be a string or a promise that resolves to a string' + ); + }); + + it('should throw error when name is empty', () => { + expect(() => { + new WorkflowServerWritableStream('', 'run-123'); + }).toThrow('"name" is required'); + }); + + it('should accept a string runId', () => { + expect(() => { + new WorkflowServerWritableStream('test-stream', 'run-123'); + }).not.toThrow(); + }); + + it('should accept a promise runId', () => { + expect(() => { + new WorkflowServerWritableStream( + 'test-stream', + Promise.resolve('run-123') + ); + }).not.toThrow(); + }); + }); + + describe('buffering behavior', () => { + it('should buffer chunks and flush after 10ms', async () => { + const stream = new WorkflowServerWritableStream('test-stream', 'run-123'); + const writer = stream.getWriter(); + + // Write first chunk + await writer.write(new Uint8Array([1, 2, 3])); + expect(mockWorld.writeToStream).not.toHaveBeenCalled(); + expect(mockWorld.writeToStreamMulti).not.toHaveBeenCalled(); + + // Write second chunk + await writer.write(new Uint8Array([4, 5, 6])); + expect(mockWorld.writeToStream).not.toHaveBeenCalled(); + expect(mockWorld.writeToStreamMulti).not.toHaveBeenCalled(); + + // Advance timer to trigger flush + await vi.advanceTimersByTimeAsync(10); + + // Should use writeToStreamMulti for multiple chunks + expect(mockWorld.writeToStreamMulti).toHaveBeenCalledTimes(1); + expect(mockWorld.writeToStreamMulti).toHaveBeenCalledWith( + 'test-stream', + 'run-123', + [new Uint8Array([1, 2, 3]), new Uint8Array([4, 5, 6])] + ); + expect(mockWorld.writeToStream).not.toHaveBeenCalled(); + + await writer.close(); + }); + + it('should use writeToStream for single chunk', async () => { + const stream = new WorkflowServerWritableStream('test-stream', 'run-123'); + const writer = stream.getWriter(); + + // Write single chunk + await writer.write(new Uint8Array([1, 2, 3])); + + // Advance timer to trigger flush + await vi.advanceTimersByTimeAsync(10); + + // Should use writeToStream for single chunk (not writeToStreamMulti) + expect(mockWorld.writeToStream).toHaveBeenCalledTimes(1); + expect(mockWorld.writeToStream).toHaveBeenCalledWith( + 'test-stream', + 'run-123', + new Uint8Array([1, 2, 3]) + ); + expect(mockWorld.writeToStreamMulti).not.toHaveBeenCalled(); + + await writer.close(); + }); + + it('should fall back to sequential writes when writeToStreamMulti is unavailable', async () => { + // Remove writeToStreamMulti from mock world + delete (mockWorld as any).writeToStreamMulti; + + const stream = new WorkflowServerWritableStream('test-stream', 'run-123'); + const writer = stream.getWriter(); + + // Write multiple chunks + await writer.write(new Uint8Array([1, 2, 3])); + await writer.write(new Uint8Array([4, 5, 6])); + + // Advance timer to trigger flush + await vi.advanceTimersByTimeAsync(10); + + // Should fall back to sequential writeToStream calls + expect(mockWorld.writeToStream).toHaveBeenCalledTimes(2); + expect(mockWorld.writeToStream).toHaveBeenNthCalledWith( + 1, + 'test-stream', + 'run-123', + new Uint8Array([1, 2, 3]) + ); + expect(mockWorld.writeToStream).toHaveBeenNthCalledWith( + 2, + 'test-stream', + 'run-123', + new Uint8Array([4, 5, 6]) + ); + + await writer.close(); + }); + + it('should flush remaining buffer on close', async () => { + const stream = new WorkflowServerWritableStream('test-stream', 'run-123'); + const writer = stream.getWriter(); + + // Write chunks but don't wait for timer + await writer.write(new Uint8Array([1, 2, 3])); + await writer.write(new Uint8Array([4, 5, 6])); + + expect(mockWorld.writeToStreamMulti).not.toHaveBeenCalled(); + + // Close should flush immediately without waiting for timer + await writer.close(); + + expect(mockWorld.writeToStreamMulti).toHaveBeenCalledTimes(1); + expect(mockWorld.closeStream).toHaveBeenCalledTimes(1); + expect(mockWorld.closeStream).toHaveBeenCalledWith( + 'test-stream', + 'run-123' + ); + }); + + it('should not schedule multiple flush timers', async () => { + const stream = new WorkflowServerWritableStream('test-stream', 'run-123'); + const writer = stream.getWriter(); + + // Write multiple chunks rapidly + await writer.write(new Uint8Array([1])); + await writer.write(new Uint8Array([2])); + await writer.write(new Uint8Array([3])); + + // Advance timer once + await vi.advanceTimersByTimeAsync(10); + + // Should only call writeToStreamMulti once with all chunks + expect(mockWorld.writeToStreamMulti).toHaveBeenCalledTimes(1); + expect(mockWorld.writeToStreamMulti).toHaveBeenCalledWith( + 'test-stream', + 'run-123', + [new Uint8Array([1]), new Uint8Array([2]), new Uint8Array([3])] + ); + + await writer.close(); + }); + + it('should handle multiple flush cycles', async () => { + const stream = new WorkflowServerWritableStream('test-stream', 'run-123'); + const writer = stream.getWriter(); + + // First batch + await writer.write(new Uint8Array([1, 2])); + await vi.advanceTimersByTimeAsync(10); + + expect(mockWorld.writeToStream).toHaveBeenCalledTimes(1); + + // Second batch + await writer.write(new Uint8Array([3, 4])); + await writer.write(new Uint8Array([5, 6])); + await vi.advanceTimersByTimeAsync(10); + + expect(mockWorld.writeToStreamMulti).toHaveBeenCalledTimes(1); + expect(mockWorld.writeToStreamMulti).toHaveBeenCalledWith( + 'test-stream', + 'run-123', + [new Uint8Array([3, 4]), new Uint8Array([5, 6])] + ); + + await writer.close(); + }); + + it('should wait for in-progress flush before adding to buffer', async () => { + // Create a slow writeToStreamMulti that we can control + let resolveWrite: () => void; + mockWorld.writeToStreamMulti.mockImplementation( + () => + new Promise((resolve) => { + resolveWrite = resolve; + }) + ); + + const stream = new WorkflowServerWritableStream('test-stream', 'run-123'); + const writer = stream.getWriter(); + + // Write and trigger flush + await writer.write(new Uint8Array([1, 2])); + await writer.write(new Uint8Array([3, 4])); + await vi.advanceTimersByTimeAsync(10); + + // Flush started but not completed + expect(mockWorld.writeToStreamMulti).toHaveBeenCalledTimes(1); + + // Write more while flush is in progress + const writePromise = writer.write(new Uint8Array([5, 6])); + + // Resolve the first flush + resolveWrite!(); + await writePromise; + + // Now advance timer to flush the new chunk + await vi.advanceTimersByTimeAsync(10); + + // Second flush should have happened + expect(mockWorld.writeToStream).toHaveBeenCalledTimes(1); + expect(mockWorld.writeToStream).toHaveBeenCalledWith( + 'test-stream', + 'run-123', + new Uint8Array([5, 6]) + ); + + await writer.close(); + }); + }); + + describe('abort behavior', () => { + it('should clean up timer and discard buffer on abort', async () => { + const stream = new WorkflowServerWritableStream('test-stream', 'run-123'); + const writer = stream.getWriter(); + + // Write chunks + await writer.write(new Uint8Array([1, 2, 3])); + await writer.write(new Uint8Array([4, 5, 6])); + + // Abort the stream + await writer.abort(new Error('Test abort')); + + // Advance timer - should NOT trigger flush since stream was aborted + await vi.advanceTimersByTimeAsync(10); + + expect(mockWorld.writeToStream).not.toHaveBeenCalled(); + expect(mockWorld.writeToStreamMulti).not.toHaveBeenCalled(); + expect(mockWorld.closeStream).not.toHaveBeenCalled(); + }); + }); + + describe('promise runId handling', () => { + it('should wait for runId promise before writing', async () => { + let resolveRunId: (value: string) => void; + const runIdPromise = new Promise((resolve) => { + resolveRunId = resolve; + }); + + const stream = new WorkflowServerWritableStream( + 'test-stream', + runIdPromise + ); + const writer = stream.getWriter(); + + // Write and trigger flush + await writer.write(new Uint8Array([1, 2, 3])); + await vi.advanceTimersByTimeAsync(10); + + // Write should not have happened yet because runId is not resolved + expect(mockWorld.writeToStream).not.toHaveBeenCalled(); + + // Resolve runId + resolveRunId!('resolved-run-123'); + await vi.advanceTimersByTimeAsync(0); // Let promises settle + + // Now the write should have happened + expect(mockWorld.writeToStream).toHaveBeenCalledWith( + 'test-stream', + 'resolved-run-123', + new Uint8Array([1, 2, 3]) + ); + + await writer.close(); + }); + }); + + describe('empty buffer handling', () => { + it('should not call write methods when buffer is empty on close', async () => { + const stream = new WorkflowServerWritableStream('test-stream', 'run-123'); + const writer = stream.getWriter(); + + // Close without writing anything + await writer.close(); + + expect(mockWorld.writeToStream).not.toHaveBeenCalled(); + expect(mockWorld.writeToStreamMulti).not.toHaveBeenCalled(); + expect(mockWorld.closeStream).toHaveBeenCalledTimes(1); + }); + }); +}); From 4d7474669d791ba04069b35cfffe99e498b27a12 Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Mon, 2 Feb 2026 14:03:24 -0800 Subject: [PATCH 8/8] Address review comments --- packages/core/src/serialization.ts | 8 ++++++-- packages/core/src/writable-stream.test.ts | 17 +++++++++++++++++ 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/packages/core/src/serialization.ts b/packages/core/src/serialization.ts index 4a1248ad0c..b8fa4a3f51 100644 --- a/packages/core/src/serialization.ts +++ b/packages/core/src/serialization.ts @@ -307,8 +307,9 @@ export class WorkflowServerWritableStream extends WritableStream { if (buffer.length === 0) return; - const chunksToFlush = buffer; - buffer = []; + // Copy chunks to flush, but don't clear buffer until write succeeds + // This prevents data loss if the write operation fails + const chunksToFlush = buffer.slice(); const _runId = await runId; @@ -324,6 +325,9 @@ export class WorkflowServerWritableStream extends WritableStream { await world.writeToStream(name, _runId, chunk); } } + + // Only clear buffer after successful write to prevent data loss + buffer = []; }; const scheduleFlush = (): void => { diff --git a/packages/core/src/writable-stream.test.ts b/packages/core/src/writable-stream.test.ts index 714809d999..6941b65342 100644 --- a/packages/core/src/writable-stream.test.ts +++ b/packages/core/src/writable-stream.test.ts @@ -327,4 +327,21 @@ describe('WorkflowServerWritableStream', () => { expect(mockWorld.closeStream).toHaveBeenCalledTimes(1); }); }); + + describe('error handling', () => { + it('should propagate write errors from close', async () => { + // Make writeToStreamMulti fail + mockWorld.writeToStreamMulti.mockRejectedValue(new Error('Write failed')); + + const stream = new WorkflowServerWritableStream('test-stream', 'run-123'); + const writer = stream.getWriter(); + + // Write chunks (buffered, no error yet) + await writer.write(new Uint8Array([1, 2, 3])); + await writer.write(new Uint8Array([4, 5, 6])); + + // Close should propagate the error from flush + await expect(writer.close()).rejects.toThrow('Write failed'); + }); + }); });