Add optional writeToStreamMulti function to the World interface (#867)
Conversation
🦋 Changeset detected — Latest commit: 4d74746. The changes in this PR will be included in the next version bump. This PR includes changesets to release 17 packages.
🧪 E2E Test Results — ❌ Some tests failed

❌ Failed Tests — 🌍 Community Worlds (161 failed):
- mongodb (40 failed)
- redis (40 failed)
- starter (41 failed)
- turso (40 failed)

Details by Category
- ✅ ▲ Vercel Production
- ✅ 💻 Local Development
- ✅ 📦 Local Production
- ✅ 🐘 Local Postgres
- ✅ 🪟 Windows
- ❌ 🌍 Community Worlds
- ✅ 📋 Other
📊 Benchmark Results

Benchmark tables (numbers omitted in this view) cover 💻 Local Development and ▲ Production (Vercel) for: workflow with no steps; workflow with 1 step; workflow with 10 sequential steps; Promise.all with 10 and 25 concurrent steps; Promise.race with 10 and 25 concurrent steps; and stream benchmarks (includes TTFB metrics). Summary tables report the fastest framework by world and the fastest world by framework, with the winner determined by most benchmark wins.
Pull request overview
This pull request adds an optional `writeToStreamMulti` method to the `World` interface to optimize batch writing of multiple chunks to streams. It addresses issue #764, where LLM streaming responses experienced significant delays (3+ minutes for ~2000 tokens) in the Vercel world implementation due to the overhead of an individual HTTP request for each small chunk.
Changes:
- Added optional `writeToStreamMulti` method to the `Streamer` interface for batch chunk operations
- Implemented buffering in `WorkflowServerWritableStream` with a 10ms flush interval to accumulate chunks before writing
- Implemented `writeToStreamMulti` in all world providers (local, postgres, vercel) with provider-specific optimizations
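The interface change and its caller-side fallback can be sketched roughly as follows. This is illustrative only: the real `Streamer` interface lives in `packages/world/src/interfaces.ts`, and the parameter names and the `writeChunks` helper here are assumptions, not the PR's actual code.

```typescript
// Hedged sketch of the Streamer interface change; the actual types in
// packages/world/src/interfaces.ts may differ.
interface Streamer {
  writeToStream(name: string, runId: string, chunk: Uint8Array): Promise<void>;
  // New optional batch write. Implementations that support it can persist
  // all chunks in one round trip; callers must fall back to sequential
  // writes when it is undefined.
  writeToStreamMulti?(
    name: string,
    runId: string,
    chunks: Uint8Array[]
  ): Promise<void>;
}

// Hypothetical caller-side helper: use the batch method when available,
// otherwise write each chunk sequentially to preserve ordering.
async function writeChunks(
  s: Streamer,
  name: string,
  runId: string,
  chunks: Uint8Array[]
): Promise<void> {
  if (s.writeToStreamMulti) {
    await s.writeToStreamMulti(name, runId, chunks);
  } else {
    for (const chunk of chunks) {
      await s.writeToStream(name, runId, chunk);
    }
  }
}
```

The optional-method shape keeps the change backward compatible: existing world implementations need no changes, and callers only take the fast path when it exists.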
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| packages/world/src/interfaces.ts | Adds optional writeToStreamMulti method to Streamer interface with comprehensive documentation |
| packages/core/src/serialization.ts | Implements buffering logic with 10ms flush intervals, uses writeToStreamMulti when available and falls back to sequential writes |
| packages/core/src/writable-stream.test.ts | Comprehensive test coverage for buffering, abort handling, and fallback behavior |
| packages/world-vercel/src/streamer.ts | Implements multi-chunk encoding as length-prefixed binary format with X-Stream-Multi header |
| packages/world-vercel/src/streamer.test.ts | Thorough tests for encoding logic including edge cases and unicode handling |
| packages/world-postgres/src/streamer.ts | Implements batch insert for multiple chunks with sequential notifications |
| packages/world-local/src/streamer.ts | Implements parallel writes with ordered event emission |
| packages/world-local/src/streamer.test.ts | Tests for chunk ordering, empty arrays, and mixed chunk types |
| .changeset/hip-candles-kick.md | Documents changes to all affected packages |
```typescript
// Notify for each chunk (could be batched in future if needed)
for (const chunkId of chunkIds) {
  postgres.notify(
    STREAM_TOPIC,
    JSON.stringify(
      StreamPublishMessage.encode({
        chunkId,
        streamId: name,
      })
    )
  );
}
```
Performance consideration: The batch insert is efficient, but the notifications are still sent sequentially in a loop. Consider whether postgres.notify supports batch operations or if these notifications could be sent in parallel using Promise.all() for better performance when dealing with many chunks.
Suggested change:

```typescript
// Notify for each chunk in parallel
await Promise.all(
  chunkIds.map((chunkId) =>
    postgres.notify(
      STREAM_TOPIC,
      JSON.stringify(
        StreamPublishMessage.encode({
          chunkId,
          streamId: name,
        })
      )
    )
  )
);
```
```typescript
flushTimer = setTimeout(() => {
  flushTimer = null;
  flushPromise = flush();
}, STREAM_FLUSH_INTERVAL_MS);
```
Missing error handling: If flush() throws an error (e.g., from writeToStreamMulti or writeToStream), the error will be thrown from the setTimeout callback in an unhandled way, which could crash the process or cause silent failures. The flushPromise should be awaited somewhere or the error should be caught and handled appropriately. Consider wrapping the flush() call in a try-catch or ensuring errors propagate to the write/close methods that trigger the flush.
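One possible shape for this fix is to capture the flush error instead of letting it escape the `setTimeout` callback, then rethrow it from the next `write()`/`close()`. This is a sketch under that assumption, not the PR's actual code; `scheduleFlush` and `throwIfFlushFailed` are hypothetical names.

```typescript
// Sketch: store flush errors so write()/close() can surface them, instead
// of letting a rejected promise escape the setTimeout callback.
let pendingError: unknown = null;

function scheduleFlush(flush: () => Promise<void>, intervalMs: number) {
  return setTimeout(() => {
    flush().catch((err) => {
      // Remember the error; the next caller-facing method rethrows it.
      pendingError = err;
    });
  }, intervalMs);
}

// Called at the top of write()/close() in this sketch.
function throwIfFlushFailed(): void {
  if (pendingError !== null) {
    const err = pendingError;
    pendingError = null;
    throw err;
  }
}
```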
```typescript
// Signal to server that this is a multi-chunk batch
httpConfig.headers.set('X-Stream-Multi', 'true');

const body = encodeMultiChunks(chunks);
await fetch(getStreamUrl(name, resolvedRunId, httpConfig), {
  method: 'PUT',
  body,
  headers: httpConfig.headers,
  duplex: 'half',
});
```
The writeToStreamMulti implementation sends an X-Stream-Multi header to signal a batch write, but there's no corresponding server-side implementation in this PR to handle this header and decode the length-prefixed format. If the server doesn't recognize this header, it will likely treat the entire encoded payload as a single chunk rather than multiple chunks, breaking the streaming semantics and causing data corruption. This PR should either include the server-side changes or document that they must be deployed first.
Suggested change:

```typescript
// Fallback implementation: send each chunk as an individual write
// using the same semantics as writeToStream. This avoids relying on
// any server-side support for X-Stream-Multi or custom encodings.
for (const chunk of chunks) {
  await fetch(getStreamUrl(name, resolvedRunId, httpConfig), {
    method: 'PUT',
    body: chunk,
    headers: httpConfig.headers,
    duplex: 'half',
  });
}
```
```typescript
await fetch(getStreamUrl(name, resolvedRunId, httpConfig), {
  method: 'PUT',
  body,
  headers: httpConfig.headers,
  duplex: 'half',
});
```
Missing error handling: The fetch request doesn't check the response status. If the server returns an error (e.g., 4xx or 5xx), it will silently fail without throwing an error. This should check res.ok similar to how readFromStream and listStreamsByRunId do, and throw an appropriate error message.
```typescript
const world = getWorld();

// Buffering state for batched writes
let buffer: Uint8Array[] = [];
```
Memory usage consideration: The buffer accumulates chunks for up to 10ms before flushing. For high-throughput streams with large chunks, this could lead to significant memory accumulation. While this is acceptable for the intended use case (LLM token streaming with small chunks), consider documenting this behavior or adding a maximum buffer size limit to prevent unbounded memory growth in edge cases.
VaguelySerious left a comment:
LGTM, can we add one e2e test for this? I didn't validate the postgres solution e2e
- Add `abort()` handler to `WorkflowServerWritableStream` to clean up the timer and discard the buffer on abort (prevents leaks)
- Add comprehensive unit tests for `WorkflowServerWritableStream` buffering logic: flush timing, concurrent writes, `writeToStreamMulti` fallback, promise runId handling, and abort behavior
- Add unit tests for `encodeMultiChunks` in world-vercel
- Export `encodeMultiChunks` for testing purposes
The `vi.mock()` call was being hoisted and affecting other tests in `serialization.test.ts`. Moving it to a separate file isolates the mock.

Added an optional `writeToStreamMulti` function to the `World` interface to optimize batch writing of multiple chunks to a stream. This is an alternative solution to the issue reported in #764.
What changed?
- Added an optional `writeToStreamMulti` method to the `Streamer` interface in the World API
- Updated `WorkflowServerWritableStream` to batch chunks before flushing
- Implemented `writeToStreamMulti` in all world providers:
  - `world-local`: Writes multiple chunks in parallel while preserving order
  - `world-postgres`: Performs a batch insert for all chunks
  - `world-vercel`: Encodes multiple chunks into a length-prefixed binary format
- Added `STREAM_FLUSH_INTERVAL_MS` (10ms) to control buffer flush timing

Why make this change?
This optimization reduces network overhead when writing many small chunks to a stream by batching them together. It is particularly beneficial for LLM token streaming, where thousands of small chunks are written in quick succession, and for world implementations like Vercel where each individual write incurs an HTTP request.
The implementation gracefully falls back to sequential writes for world implementations that don't support the new method.
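The length-prefixed binary format used by world-vercel is not spelled out in this PR description. A plausible encoder/decoder pair, under the assumption of a 4-byte big-endian length prefix per chunk, looks like the following; the real `encodeMultiChunks` may use a different layout, so these `*Sketch` functions are illustrative only.

```typescript
// Hedged sketch: each chunk is preceded by a 4-byte big-endian length.
function encodeMultiChunksSketch(chunks: Uint8Array[]): Uint8Array {
  const total = chunks.reduce((n, c) => n + 4 + c.byteLength, 0);
  const out = new Uint8Array(total);
  const view = new DataView(out.buffer);
  let offset = 0;
  for (const chunk of chunks) {
    view.setUint32(offset, chunk.byteLength); // big-endian length prefix
    out.set(chunk, offset + 4);
    offset += 4 + chunk.byteLength;
  }
  return out;
}

// Matching server-side decode: walk the buffer, reading each prefix.
// This is the piece a server handling X-Stream-Multi would need.
function decodeMultiChunksSketch(data: Uint8Array): Uint8Array[] {
  const view = new DataView(data.buffer, data.byteOffset, data.byteLength);
  const chunks: Uint8Array[] = [];
  let offset = 0;
  while (offset < data.byteLength) {
    const len = view.getUint32(offset);
    chunks.push(data.subarray(offset + 4, offset + 4 + len));
    offset += 4 + len;
  }
  return chunks;
}
```

Whatever the actual layout, the encoder and the server-side decoder must agree, which is why the review above flags the missing server-side handling for the `X-Stream-Multi` header.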