diff --git a/.changeset/hip-candles-kick.md b/.changeset/hip-candles-kick.md new file mode 100644 index 0000000000..65c049f2ae --- /dev/null +++ b/.changeset/hip-candles-kick.md @@ -0,0 +1,9 @@ +--- +"@workflow/world-postgres": patch +"@workflow/world-vercel": patch +"@workflow/world-local": patch +"@workflow/world": patch +"@workflow/core": patch +--- + +Add optional `writeToStreamMulti` function to the World interface diff --git a/packages/core/src/serialization.ts b/packages/core/src/serialization.ts index 98ae2abb20..b8fa4a3f51 100644 --- a/packages/core/src/serialization.ts +++ b/packages/core/src/serialization.ts @@ -272,6 +272,12 @@ export class WorkflowServerReadableStream extends ReadableStream { } } +/** + * Default flush interval in milliseconds for buffered stream writes. + * Chunks are accumulated and flushed together to reduce network overhead. + */ +const STREAM_FLUSH_INTERVAL_MS = 10; + export class WorkflowServerWritableStream extends WritableStream { constructor(name: string, runId: string | Promise) { // runId can be a promise, because we need a runID to write to a stream, @@ -287,15 +293,85 @@ export class WorkflowServerWritableStream extends WritableStream { throw new Error(`"name" is required, got "${name}"`); } const world = getWorld(); + + // Buffering state for batched writes + let buffer: Uint8Array[] = []; + let flushTimer: ReturnType | null = null; + let flushPromise: Promise | null = null; + + const flush = async (): Promise => { + if (flushTimer) { + clearTimeout(flushTimer); + flushTimer = null; + } + + if (buffer.length === 0) return; + + // Copy chunks to flush, but don't clear buffer until write succeeds + // This prevents data loss if the write operation fails + const chunksToFlush = buffer.slice(); + + const _runId = await runId; + + // Use writeToStreamMulti if available for batch writes + if ( + typeof world.writeToStreamMulti === 'function' && + chunksToFlush.length > 1 + ) { + await world.writeToStreamMulti(name, _runId, chunksToFlush); + } else { + // Fall back to sequential writes + for (const chunk of chunksToFlush) { + await world.writeToStream(name, _runId, chunk); + } + } + + // Only clear buffer after successful write to prevent data loss + buffer = []; + }; + + const scheduleFlush = (): void => { + if (flushTimer) return; // Already scheduled + + flushTimer = setTimeout(() => { + flushTimer = null; + flushPromise = flush(); + }, STREAM_FLUSH_INTERVAL_MS); + }; + super({ async write(chunk) { - const _runId = await runId; - await world.writeToStream(name, _runId, chunk); + // Wait for any in-progress flush to complete before adding to buffer + if (flushPromise) { + await flushPromise; + flushPromise = null; + } + + buffer.push(chunk); + scheduleFlush(); }, async close() { + // Wait for any in-progress flush to complete + if (flushPromise) { + await flushPromise; + flushPromise = null; + } + + // Flush any remaining buffered chunks + await flush(); + const _runId = await runId; await world.closeStream(name, _runId); }, + abort() { + // Clean up timer to prevent leaks + if (flushTimer) { + clearTimeout(flushTimer); + flushTimer = null; + } + // Discard buffered chunks - they won't be written + buffer = []; + }, }); } } diff --git a/packages/core/src/writable-stream.test.ts b/packages/core/src/writable-stream.test.ts new file mode 100644 index 0000000000..6941b65342 --- /dev/null +++ b/packages/core/src/writable-stream.test.ts @@ -0,0 +1,347 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { WorkflowServerWritableStream } from './serialization.js'; + +// Mock the world module for WorkflowServerWritableStream tests +vi.mock('./runtime/world.js', () => ({ + getWorld: vi.fn(), +})); + +describe('WorkflowServerWritableStream', () => { + let mockWorld: { + writeToStream: ReturnType; + writeToStreamMulti: ReturnType; + closeStream: ReturnType; + }; + + beforeEach(async () => { + vi.useFakeTimers(); + + mockWorld = { + writeToStream: vi.fn().mockResolvedValue(undefined), + writeToStreamMulti: vi.fn().mockResolvedValue(undefined), + closeStream: vi.fn().mockResolvedValue(undefined), + }; + + const { getWorld } = await import('./runtime/world.js'); + vi.mocked(getWorld).mockReturnValue(mockWorld as any); + }); + + afterEach(() => { + vi.useRealTimers(); + vi.clearAllMocks(); + }); + + describe('constructor validation', () => { + it('should throw error when runId is not a string or promise', () => { + expect(() => { + new WorkflowServerWritableStream('test-stream', 123 as any); + }).toThrow( + '"runId" must be a string or a promise that resolves to a string' + ); + }); + + it('should throw error when name is empty', () => { + expect(() => { + new WorkflowServerWritableStream('', 'run-123'); + }).toThrow('"name" is required'); + }); + + it('should accept a string runId', () => { + expect(() => { + new WorkflowServerWritableStream('test-stream', 'run-123'); + }).not.toThrow(); + }); + + it('should accept a promise runId', () => { + expect(() => { + new WorkflowServerWritableStream( + 'test-stream', + Promise.resolve('run-123') + ); + }).not.toThrow(); + }); + }); + + describe('buffering behavior', () => { + it('should buffer chunks and flush after 10ms', async () => { + const stream = new WorkflowServerWritableStream('test-stream', 'run-123'); + const writer = stream.getWriter(); + + // Write first chunk + await writer.write(new Uint8Array([1, 2, 3])); + expect(mockWorld.writeToStream).not.toHaveBeenCalled(); + expect(mockWorld.writeToStreamMulti).not.toHaveBeenCalled(); + + // Write second chunk + await writer.write(new Uint8Array([4, 5, 6])); + expect(mockWorld.writeToStream).not.toHaveBeenCalled(); + expect(mockWorld.writeToStreamMulti).not.toHaveBeenCalled(); + + // Advance timer to trigger flush + await vi.advanceTimersByTimeAsync(10); + + // Should use writeToStreamMulti for multiple chunks + expect(mockWorld.writeToStreamMulti).toHaveBeenCalledTimes(1); + expect(mockWorld.writeToStreamMulti).toHaveBeenCalledWith( + 'test-stream', + 'run-123', + [new Uint8Array([1, 2, 3]), new Uint8Array([4, 5, 6])] + ); + expect(mockWorld.writeToStream).not.toHaveBeenCalled(); + + await writer.close(); + }); + + it('should use writeToStream for single chunk', async () => { + const stream = new WorkflowServerWritableStream('test-stream', 'run-123'); + const writer = stream.getWriter(); + + // Write single chunk + await writer.write(new Uint8Array([1, 2, 3])); + + // Advance timer to trigger flush + await vi.advanceTimersByTimeAsync(10); + + // Should use writeToStream for single chunk (not writeToStreamMulti) + expect(mockWorld.writeToStream).toHaveBeenCalledTimes(1); + expect(mockWorld.writeToStream).toHaveBeenCalledWith( + 'test-stream', + 'run-123', + new Uint8Array([1, 2, 3]) + ); + expect(mockWorld.writeToStreamMulti).not.toHaveBeenCalled(); + + await writer.close(); + }); + + it('should fall back to sequential writes when writeToStreamMulti is unavailable', async () => { + // Remove writeToStreamMulti from mock world + delete (mockWorld as any).writeToStreamMulti; + + const stream = new WorkflowServerWritableStream('test-stream', 'run-123'); + const writer = stream.getWriter(); + + // Write multiple chunks + await writer.write(new Uint8Array([1, 2, 3])); + await writer.write(new Uint8Array([4, 5, 6])); + + // Advance timer to trigger flush + await vi.advanceTimersByTimeAsync(10); + + // Should fall back to sequential writeToStream calls + expect(mockWorld.writeToStream).toHaveBeenCalledTimes(2); + expect(mockWorld.writeToStream).toHaveBeenNthCalledWith( + 1, + 'test-stream', + 'run-123', + new Uint8Array([1, 2, 3]) + ); + expect(mockWorld.writeToStream).toHaveBeenNthCalledWith( + 2, + 'test-stream', + 'run-123', + new Uint8Array([4, 5, 6]) + ); + + await writer.close(); + }); + + it('should flush remaining buffer on close', async () => { + const stream = new WorkflowServerWritableStream('test-stream', 'run-123'); + const writer = stream.getWriter(); + + // Write chunks but don't wait for timer + await writer.write(new Uint8Array([1, 2, 3])); + await writer.write(new Uint8Array([4, 5, 6])); + + expect(mockWorld.writeToStreamMulti).not.toHaveBeenCalled(); + + // Close should flush immediately without waiting for timer + await writer.close(); + + expect(mockWorld.writeToStreamMulti).toHaveBeenCalledTimes(1); + expect(mockWorld.closeStream).toHaveBeenCalledTimes(1); + expect(mockWorld.closeStream).toHaveBeenCalledWith( + 'test-stream', + 'run-123' + ); + }); + + it('should not schedule multiple flush timers', async () => { + const stream = new WorkflowServerWritableStream('test-stream', 'run-123'); + const writer = stream.getWriter(); + + // Write multiple chunks rapidly + await writer.write(new Uint8Array([1])); + await writer.write(new Uint8Array([2])); + await writer.write(new Uint8Array([3])); + + // Advance timer once + await vi.advanceTimersByTimeAsync(10); + + // Should only call writeToStreamMulti once with all chunks + expect(mockWorld.writeToStreamMulti).toHaveBeenCalledTimes(1); + expect(mockWorld.writeToStreamMulti).toHaveBeenCalledWith( + 'test-stream', + 'run-123', + [new Uint8Array([1]), new Uint8Array([2]), new Uint8Array([3])] + ); + + await writer.close(); + }); + + it('should handle multiple flush cycles', async () => { + const stream = new WorkflowServerWritableStream('test-stream', 'run-123'); + const writer = stream.getWriter(); + + // First batch + await writer.write(new Uint8Array([1, 2])); + await vi.advanceTimersByTimeAsync(10); + + expect(mockWorld.writeToStream).toHaveBeenCalledTimes(1); + + // Second batch + await writer.write(new Uint8Array([3, 4])); + await writer.write(new Uint8Array([5, 6])); + await vi.advanceTimersByTimeAsync(10); + + expect(mockWorld.writeToStreamMulti).toHaveBeenCalledTimes(1); + expect(mockWorld.writeToStreamMulti).toHaveBeenCalledWith( + 'test-stream', + 'run-123', + [new Uint8Array([3, 4]), new Uint8Array([5, 6])] + ); + + await writer.close(); + }); + + it('should wait for in-progress flush before adding to buffer', async () => { + // Create a slow writeToStreamMulti that we can control + let resolveWrite: () => void; + mockWorld.writeToStreamMulti.mockImplementation( + () => + new Promise((resolve) => { + resolveWrite = resolve; + }) + ); + + const stream = new WorkflowServerWritableStream('test-stream', 'run-123'); + const writer = stream.getWriter(); + + // Write and trigger flush + await writer.write(new Uint8Array([1, 2])); + await writer.write(new Uint8Array([3, 4])); + await vi.advanceTimersByTimeAsync(10); + + // Flush started but not completed + expect(mockWorld.writeToStreamMulti).toHaveBeenCalledTimes(1); + + // Write more while flush is in progress + const writePromise = writer.write(new Uint8Array([5, 6])); + + // Resolve the first flush + resolveWrite!(); + await writePromise; + + // Now advance timer to flush the new chunk + await vi.advanceTimersByTimeAsync(10); + + // Second flush should have happened + expect(mockWorld.writeToStream).toHaveBeenCalledTimes(1); + expect(mockWorld.writeToStream).toHaveBeenCalledWith( + 'test-stream', + 'run-123', + new Uint8Array([5, 6]) + ); + + await writer.close(); + }); + }); + + describe('abort behavior', () => { + it('should clean up timer and discard buffer on abort', async () => { + const stream = new WorkflowServerWritableStream('test-stream', 'run-123'); + const writer = stream.getWriter(); + + // Write chunks + await writer.write(new Uint8Array([1, 2, 3])); + await writer.write(new Uint8Array([4, 5, 6])); + + // Abort the stream + await writer.abort(new Error('Test abort')); + + // Advance timer - should NOT trigger flush since stream was aborted + await vi.advanceTimersByTimeAsync(10); + + expect(mockWorld.writeToStream).not.toHaveBeenCalled(); + expect(mockWorld.writeToStreamMulti).not.toHaveBeenCalled(); + expect(mockWorld.closeStream).not.toHaveBeenCalled(); + }); + }); + + describe('promise runId handling', () => { + it('should wait for runId promise before writing', async () => { + let resolveRunId: (value: string) => void; + const runIdPromise = new Promise((resolve) => { + resolveRunId = resolve; + }); + + const stream = new WorkflowServerWritableStream( + 'test-stream', + runIdPromise + ); + const writer = stream.getWriter(); + + // Write and trigger flush + await writer.write(new Uint8Array([1, 2, 3])); + await vi.advanceTimersByTimeAsync(10); + + // Write should not have happened yet because runId is not resolved + expect(mockWorld.writeToStream).not.toHaveBeenCalled(); + + // Resolve runId + resolveRunId!('resolved-run-123'); + await vi.advanceTimersByTimeAsync(0); // Let promises settle + + // Now the write should have happened + expect(mockWorld.writeToStream).toHaveBeenCalledWith( + 'test-stream', + 'resolved-run-123', + new Uint8Array([1, 2, 3]) + ); + + await writer.close(); + }); + }); + + describe('empty buffer handling', () => { + it('should not call write methods when buffer is empty on close', async () => { + const stream = new WorkflowServerWritableStream('test-stream', 'run-123'); + const writer = stream.getWriter(); + + // Close without writing anything + await writer.close(); + + expect(mockWorld.writeToStream).not.toHaveBeenCalled(); + expect(mockWorld.writeToStreamMulti).not.toHaveBeenCalled(); + expect(mockWorld.closeStream).toHaveBeenCalledTimes(1); + }); + }); + + describe('error handling', () => { + it('should propagate write errors from close', async () => { + // Make writeToStreamMulti fail + mockWorld.writeToStreamMulti.mockRejectedValue(new Error('Write failed')); + + const stream = new WorkflowServerWritableStream('test-stream', 'run-123'); + const writer = stream.getWriter(); + + // Write chunks (buffered, no error yet) + await writer.write(new Uint8Array([1, 2, 3])); + await writer.write(new Uint8Array([4, 5, 6])); + + // Close should propagate the error from flush + await expect(writer.close()).rejects.toThrow('Write failed'); + }); + }); +}); diff --git a/packages/world-local/src/streamer.test.ts b/packages/world-local/src/streamer.test.ts index 6fb9fb1915..50162aaaf7 100644 --- a/packages/world-local/src/streamer.test.ts +++ b/packages/world-local/src/streamer.test.ts @@ -194,6 +194,99 @@ describe('streamer', () => { }); }); + describe('writeToStreamMulti', () => { + it('should write multiple chunks in a single call', async () => { + const { testDir, streamer } = await setupStreamer(); + const streamName = 'multi-stream'; + + await streamer.writeToStreamMulti!(streamName, TEST_RUN_ID, [ + 'chunk1', + 'chunk2', + 'chunk3', + ]); + + const chunksDir = path.join(testDir, 'streams', 'chunks'); + const files = await fs.readdir(chunksDir); + + expect(files).toHaveLength(3); + expect(files.every((f) => f.startsWith(`${streamName}-`))).toBe(true); + }); + + it('should preserve chunk ordering', async () => { + const { streamer } = await setupStreamer(); + const streamName = 'ordered-multi-stream'; + + await streamer.writeToStreamMulti!(streamName, TEST_RUN_ID, [ + 'first', + 'second', + 'third', + ]); + await streamer.closeStream(streamName, TEST_RUN_ID); + + const readable = await streamer.readFromStream(streamName); + const reader = readable.getReader(); + const decoder = new TextDecoder(); + const chunks: string[] = []; + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + chunks.push(decoder.decode(value)); + } + + expect(chunks).toEqual(['first', 'second', 'third']); + }); + + it('should handle empty chunks array', async () => { + const { testDir, streamer } = await setupStreamer(); + const streamName = 'empty-multi-stream'; + + await streamer.writeToStreamMulti!(streamName, TEST_RUN_ID, []); + + const chunksDir = path.join(testDir, 'streams', 'chunks'); + const dirExists = await fs + .access(chunksDir) + .then(() => true) + .catch(() => false); + + // Directory might not exist if no chunks were written + if (dirExists) { + const files = await fs.readdir(chunksDir); + const streamFiles = files.filter((f) => + f.startsWith(`${streamName}-`) + ); + expect(streamFiles).toHaveLength(0); + } + }); + + it('should handle mixed string and Uint8Array chunks', async () => { + const { streamer } = await setupStreamer(); + const streamName = 'mixed-multi-stream'; + + await streamer.writeToStreamMulti!(streamName, TEST_RUN_ID, [ + 'string-chunk', + new Uint8Array([1, 2, 3, 4]), + Buffer.from('buffer-chunk'), + ]); + await streamer.closeStream(streamName, TEST_RUN_ID); + + const readable = await streamer.readFromStream(streamName); + const reader = readable.getReader(); + const chunks: Uint8Array[] = []; + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + chunks.push(value); + } + + expect(chunks).toHaveLength(3); + expect(new TextDecoder().decode(chunks[0])).toBe('string-chunk'); + expect(chunks[1]).toEqual(new Uint8Array([1, 2, 3, 4])); + expect(new TextDecoder().decode(chunks[2])).toBe('buffer-chunk'); + }); + }); + describe('closeStream', () => { it('should close an empty stream', async () => { const { testDir, streamer } = await setupStreamer(); diff --git a/packages/world-local/src/streamer.ts b/packages/world-local/src/streamer.ts index bf22112d68..0be6abe708 100644 --- a/packages/world-local/src/streamer.ts +++ b/packages/world-local/src/streamer.ts @@ -87,6 +87,17 @@ export function createStreamer(basedir: string): Streamer { registeredStreams.add(cacheKey); } + // Helper to convert a chunk to a Buffer + function toBuffer(chunk: string | Uint8Array): Buffer { + if (typeof chunk === 'string') { + return Buffer.from(new TextEncoder().encode(chunk)); + } else if (chunk instanceof Buffer) { + return chunk; + } else { + return Buffer.from(chunk); + } + } + return { async writeToStream( name: string, @@ -105,14 +116,7 @@ export function createStreamer(basedir: string): Streamer { await registerStreamForRun(runId, name); // Convert chunk to buffer for serialization - let chunkBuffer: Buffer; - if (typeof chunk === 'string') { - chunkBuffer = Buffer.from(new TextEncoder().encode(chunk)); - } else if (chunk instanceof Buffer) { - chunkBuffer = chunk; - } else { - chunkBuffer = Buffer.from(chunk); - } + const chunkBuffer = toBuffer(chunk); const serialized = serializeChunk({ chunk: chunkBuffer, @@ -138,6 +142,64 @@ export function createStreamer(basedir: string): Streamer { }); }, + async writeToStreamMulti( + name: string, + _runId: string | Promise, + chunks: (string | Uint8Array)[] + ) { + if (chunks.length === 0) return; + + // Generate all ULIDs synchronously BEFORE any await to preserve call order. + // This ensures that chunks maintain their order even when runId is a promise. + const chunkIds = chunks.map(() => `chnk_${monotonicUlid()}`); + + // Await runId if it's a promise + const runId = await _runId; + + // Register this stream for the run + await registerStreamForRun(runId, name); + + // Prepare chunk data for parallel writes + const chunkBuffers = chunks.map((chunk) => toBuffer(chunk)); + + // Write all chunks in parallel for efficiency, but track individual completion + const writePromises = chunkBuffers.map(async (chunkBuffer, i) => { + const chunkId = chunkIds[i]; + + const serialized = serializeChunk({ + chunk: chunkBuffer, + eof: false, + }); + + const chunkPath = path.join( + basedir, + 'streams', + 'chunks', + `${name}-${chunkId}.json` + ); + + await write(chunkPath, serialized); + + // Return data needed for event emission + return { + chunkId, + chunkData: Uint8Array.from(chunkBuffer), + }; + }); + + // Emit events in order, waiting for each chunk's write to complete + // This ensures events are emitted in order while writes happen in parallel + for (const writePromise of writePromises) { + const { chunkId, chunkData } = await writePromise; + + streamEmitter.emit(`chunk:${name}` as const, { + streamName: name, + chunkData, + chunkId, + }); + } + }, + async closeStream(name: string, _runId: string | Promise) { // Generate ULID synchronously BEFORE any await to preserve call order. const chunkId = `chnk_${monotonicUlid()}`; diff --git a/packages/world-postgres/src/streamer.ts b/packages/world-postgres/src/streamer.ts index 750a6af434..bef4e2a37c 100644 --- a/packages/world-postgres/src/streamer.ts +++ b/packages/world-postgres/src/streamer.ts @@ -87,6 +87,10 @@ export function createStreamer(postgres: Sql, drizzle: Drizzle): Streamer { }); }); + // Helper to convert chunk to Buffer + const toBuffer = (chunk: string | Uint8Array): Buffer => + !Buffer.isBuffer(chunk) ? Buffer.from(chunk) : chunk; + return { async writeToStream( name: string, @@ -101,7 +105,7 @@ export function createStreamer(postgres: Sql, drizzle: Drizzle): Streamer { chunkId, streamId: name, runId, - chunkData: !Buffer.isBuffer(chunk) ? Buffer.from(chunk) : chunk, + chunkData: toBuffer(chunk), eof: false, }); postgres.notify( @@ -114,6 +118,44 @@ export function createStreamer(postgres: Sql, drizzle: Drizzle): Streamer { ) ); }, + + async writeToStreamMulti( + name: string, + _runId: string | Promise, + chunks: (string | Uint8Array)[] + ) { + if (chunks.length === 0) return; + + // Generate all chunk IDs up front to preserve ordering + const chunkIds = chunks.map(() => genChunkId()); + + // Await runId if it's a promise to ensure proper flushing + const runId = await _runId; + + // Batch insert all chunks in a single query + await drizzle.insert(streams).values( + chunks.map((chunk, i) => ({ + chunkId: chunkIds[i], + streamId: name, + runId, + chunkData: toBuffer(chunk), + eof: false, + })) + ); + + // Notify for each chunk (could be batched in future if needed) + for (const chunkId of chunkIds) { + postgres.notify( + STREAM_TOPIC, + JSON.stringify( + StreamPublishMessage.encode({ + chunkId, + streamId: name, + }) + ) + ); + } + }, async closeStream( name: string, _runId: string | Promise diff --git a/packages/world-vercel/src/streamer.test.ts b/packages/world-vercel/src/streamer.test.ts new file mode 100644 index 0000000000..45193ce31c --- /dev/null +++ b/packages/world-vercel/src/streamer.test.ts @@ -0,0 +1,170 @@ +import { describe, expect, it } from 'vitest'; +import { encodeMultiChunks } from './streamer.js'; + +describe('encodeMultiChunks', () => { + /** + * Helper to decode length-prefixed chunks back to verify encoding + */ + function decodeMultiChunks(encoded: Uint8Array): Uint8Array[] { + const chunks: Uint8Array[] = []; + const view = new DataView( + encoded.buffer, + encoded.byteOffset, + encoded.byteLength + ); + let offset = 0; + + while (offset < encoded.length) { + const length = view.getUint32(offset, false); // big-endian + offset += 4; + chunks.push(encoded.slice(offset, offset + length)); + offset += length; + } + + return chunks; + } + + it('should encode an empty array', () => { + const result = encodeMultiChunks([]); + expect(result.length).toBe(0); + }); + + it('should encode a single string chunk', () => { + const result = encodeMultiChunks(['hello']); + const decoded = decodeMultiChunks(result); + + expect(decoded).toHaveLength(1); + expect(new TextDecoder().decode(decoded[0])).toBe('hello'); + }); + + it('should encode a single Uint8Array chunk', () => { + const chunk = new Uint8Array([1, 2, 3, 4, 5]); + const result = encodeMultiChunks([chunk]); + const decoded = decodeMultiChunks(result); + + expect(decoded).toHaveLength(1); + expect(decoded[0]).toEqual(chunk); + }); + + it('should encode multiple string chunks', () => { + const result = encodeMultiChunks(['hello', 'world', 'test']); + const decoded = decodeMultiChunks(result); + + expect(decoded).toHaveLength(3); + expect(new TextDecoder().decode(decoded[0])).toBe('hello'); + expect(new TextDecoder().decode(decoded[1])).toBe('world'); + expect(new TextDecoder().decode(decoded[2])).toBe('test'); + }); + + it('should encode multiple Uint8Array chunks', () => { + const chunks = [ + new Uint8Array([1, 2, 3]), + new Uint8Array([4, 5]), + new Uint8Array([6, 7, 8, 9]), + ]; + const result = encodeMultiChunks(chunks); + const decoded = decodeMultiChunks(result); + + expect(decoded).toHaveLength(3); + expect(decoded[0]).toEqual(chunks[0]); + expect(decoded[1]).toEqual(chunks[1]); + expect(decoded[2]).toEqual(chunks[2]); + }); + + it('should encode mixed string and Uint8Array chunks', () => { + const result = encodeMultiChunks([ + 'hello', + new Uint8Array([1, 2, 3]), + 'world', + ]); + const decoded = decodeMultiChunks(result); + + expect(decoded).toHaveLength(3); + expect(new TextDecoder().decode(decoded[0])).toBe('hello'); + expect(decoded[1]).toEqual(new Uint8Array([1, 2, 3])); + expect(new TextDecoder().decode(decoded[2])).toBe('world'); + }); + + it('should handle empty string chunks', () => { + const result = encodeMultiChunks(['', 'hello', '']); + const decoded = decodeMultiChunks(result); + + expect(decoded).toHaveLength(3); + expect(decoded[0].length).toBe(0); + expect(new TextDecoder().decode(decoded[1])).toBe('hello'); + expect(decoded[2].length).toBe(0); + }); + + it('should handle empty Uint8Array chunks', () => { + const result = encodeMultiChunks([ + new Uint8Array([]), + new Uint8Array([1, 2]), + new Uint8Array([]), + ]); + const decoded = decodeMultiChunks(result); + + expect(decoded).toHaveLength(3); + expect(decoded[0].length).toBe(0); + expect(decoded[1]).toEqual(new Uint8Array([1, 2])); + expect(decoded[2].length).toBe(0); + }); + + it('should correctly calculate total size with length prefixes', () => { + // Each chunk has a 4-byte length prefix + // 'hello' = 5 bytes, 'world' = 5 bytes + // Total = 4 + 5 + 4 + 5 = 18 bytes + const result = encodeMultiChunks(['hello', 'world']); + expect(result.length).toBe(18); + }); + + it('should use big-endian encoding for length prefix', () => { + const result = encodeMultiChunks(['hello']); + const view = new DataView( + result.buffer, + result.byteOffset, + result.byteLength + ); + + // 'hello' is 5 bytes, big-endian encoding of 5 is [0, 0, 0, 5] + expect(view.getUint32(0, false)).toBe(5); + expect(result[0]).toBe(0); + expect(result[1]).toBe(0); + expect(result[2]).toBe(0); + expect(result[3]).toBe(5); + }); + + it('should handle large chunks', () => { + // Create a 10KB chunk + const largeChunk = new Uint8Array(10 * 1024); + for (let i = 0; i < largeChunk.length; i++) { + largeChunk[i] = i % 256; + } + + const result = encodeMultiChunks([largeChunk]); + const decoded = decodeMultiChunks(result); + + expect(decoded).toHaveLength(1); + expect(decoded[0]).toEqual(largeChunk); + }); + + it('should handle many small chunks', () => { + const chunks = Array.from({ length: 100 }, (_, i) => `chunk${i}`); + const result = encodeMultiChunks(chunks); + const decoded = decodeMultiChunks(result); + + expect(decoded).toHaveLength(100); + decoded.forEach((chunk, i) => { + expect(new TextDecoder().decode(chunk)).toBe(`chunk${i}`); + }); + }); + + it('should handle unicode strings correctly', () => { + const result = encodeMultiChunks(['hello', 'δΈ–η•Œ', 'πŸš€']); + const decoded = decodeMultiChunks(result); + + expect(decoded).toHaveLength(3); + expect(new TextDecoder().decode(decoded[0])).toBe('hello'); + expect(new TextDecoder().decode(decoded[1])).toBe('δΈ–η•Œ'); + expect(new TextDecoder().decode(decoded[2])).toBe('πŸš€'); + }); +}); diff --git a/packages/world-vercel/src/streamer.ts b/packages/world-vercel/src/streamer.ts index 017b3eaed1..a6127ff99b 100644 --- a/packages/world-vercel/src/streamer.ts +++ b/packages/world-vercel/src/streamer.ts @@ -14,6 +14,43 @@ function getStreamUrl( return new URL(`${httpConfig.baseUrl}/v2/stream/${encodeURIComponent(name)}`); } +/** + * Encode multiple chunks into a length-prefixed binary format. + * Format: [4 bytes big-endian length][chunk bytes][4 bytes length][chunk bytes]... + * + * This preserves chunk boundaries so the server can store them as separate + * chunks, maintaining correct startIndex semantics for readers. + * + * @internal Exported for testing purposes + */ +export function encodeMultiChunks(chunks: (string | Uint8Array)[]): Uint8Array { + const encoder = new TextEncoder(); + + // Convert all chunks to Uint8Array and calculate total size + const binaryChunks: Uint8Array[] = []; + let totalSize = 0; + + for (const chunk of chunks) { + const binary = typeof chunk === 'string' ? encoder.encode(chunk) : chunk; + binaryChunks.push(binary); + totalSize += 4 + binary.length; // 4 bytes for length prefix + } + + // Allocate buffer and write length-prefixed chunks + const result = new Uint8Array(totalSize); + const view = new DataView(result.buffer); + let offset = 0; + + for (const binary of binaryChunks) { + view.setUint32(offset, binary.length, false); // big-endian + offset += 4; + result.set(binary, offset); + offset += binary.length; + } + + return result; +} + export function createStreamer(config?: APIConfig): Streamer { return { async writeToStream( @@ -33,6 +70,30 @@ export function createStreamer(config?: APIConfig): Streamer { }); }, + async writeToStreamMulti( + name: string, + runId: string | Promise, + chunks: (string | Uint8Array)[] + ) { + if (chunks.length === 0) return; + + // Await runId if it's a promise to ensure proper flushing + const resolvedRunId = await runId; + + const httpConfig = await getHttpConfig(config); + + // Signal to server that this is a multi-chunk batch + httpConfig.headers.set('X-Stream-Multi', 'true'); + + const body = encodeMultiChunks(chunks); + await fetch(getStreamUrl(name, resolvedRunId, httpConfig), { + method: 'PUT', + body, + headers: httpConfig.headers, + duplex: 'half', + }); + }, + async closeStream(name: string, runId: string | Promise) { // Await runId if it's a promise to ensure proper flushing const resolvedRunId = await runId; diff --git a/packages/world/src/interfaces.ts b/packages/world/src/interfaces.ts index f2a5802809..c737a619e5 100644 --- a/packages/world/src/interfaces.ts +++ b/packages/world/src/interfaces.ts @@ -29,6 +29,24 @@ export interface Streamer { runId: string | Promise, chunk: string | Uint8Array ): Promise; + + /** + * Write multiple chunks to a stream in a single operation. + * This is an optional optimization for world implementations that can + * batch multiple writes efficiently (e.g., single HTTP request for world-vercel). + * + * If not implemented, the caller should fall back to sequential writeToStream() calls. + * + * @param name - The stream name + * @param runId - The run ID (can be a promise) + * @param chunks - Array of chunks to write, in order + */ + writeToStreamMulti?( + name: string, + runId: string | Promise, + chunks: (string | Uint8Array)[] + ): Promise; + closeStream(name: string, runId: string | Promise): Promise; readFromStream( name: string,