Merged
9 changes: 9 additions & 0 deletions .changeset/hip-candles-kick.md
@@ -0,0 +1,9 @@
---
"@workflow/world-postgres": patch
"@workflow/world-vercel": patch
"@workflow/world-local": patch
"@workflow/world": patch
"@workflow/core": patch
---

Add optional `writeToStreamMulti` function to the World interface
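
To make the changeset concrete, here is a minimal sketch of how the optional method could sit on the World interface. Only the calls visible in the diff below (writeToStream, writeToStreamMulti, closeStream, all awaited with an already-resolved runId string) ground this shape; the rest of the interface and the Promise&lt;void&gt; return types are assumptions.

// Sketch only - the surrounding World interface shape is assumed, not taken from this PR.
interface World {
  writeToStream(name: string, runId: string, chunk: Uint8Array): Promise<void>;
  closeStream(name: string, runId: string): Promise<void>;
  // New optional batch variant, so implementations (e.g. the Postgres or
  // Vercel worlds) can persist several buffered chunks in one call.
  writeToStreamMulti?(
    name: string,
    runId: string,
    chunks: Uint8Array[]
  ): Promise<void>;
}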
80 changes: 78 additions & 2 deletions packages/core/src/serialization.ts
@@ -272,6 +272,12 @@ export class WorkflowServerReadableStream extends ReadableStream<Uint8Array> {
  }
}

/**
 * Default flush interval in milliseconds for buffered stream writes.
 * Chunks are accumulated and flushed together to reduce network overhead.
 */
const STREAM_FLUSH_INTERVAL_MS = 10;

export class WorkflowServerWritableStream extends WritableStream<Uint8Array> {
  constructor(name: string, runId: string | Promise<string>) {
    // runId can be a promise, because we need a runID to write to a stream,
@@ -287,15 +293,85 @@ export class WorkflowServerWritableStream extends WritableStream<Uint8Array> {
      throw new Error(`"name" is required, got "${name}"`);
    }
    const world = getWorld();

    // Buffering state for batched writes
    let buffer: Uint8Array[] = [];
Copilot AI · Jan 29, 2026:
Memory usage consideration: The buffer accumulates chunks for up to 10ms before flushing. For high-throughput streams with large chunks, this could lead to significant memory accumulation. While this is acceptable for the intended use case (LLM token streaming with small chunks), consider documenting this behavior or adding a maximum buffer size limit to prevent unbounded memory growth in edge cases.
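
As an illustration of the suggested limit, the following sketch caps buffered bytes and flushes early. It leans on the buffer and flush locals introduced in this diff; MAX_BUFFERED_BYTES, bufferedBytes, and pushChunk are hypothetical names that do not appear in the PR.

    // Hypothetical sketch, not part of this PR: force an immediate flush once
    // the buffered bytes cross a threshold, instead of waiting for the timer.
    const MAX_BUFFERED_BYTES = 1024 * 1024; // threshold value is an assumption
    let bufferedBytes = 0;

    const pushChunk = async (chunk: Uint8Array): Promise<void> => {
      buffer.push(chunk);
      bufferedBytes += chunk.byteLength;
      if (bufferedBytes >= MAX_BUFFERED_BYTES) {
        await flush(); // `flush` is defined a few lines further down in this diff
        bufferedBytes = 0;
      }
    };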
    let flushTimer: ReturnType<typeof setTimeout> | null = null;
    let flushPromise: Promise<void> | null = null;

    const flush = async (): Promise<void> => {
      if (flushTimer) {
        clearTimeout(flushTimer);
        flushTimer = null;
      }

      if (buffer.length === 0) return;

      // Copy chunks to flush, but don't clear buffer until write succeeds
      // This prevents data loss if the write operation fails
      const chunksToFlush = buffer.slice();

      const _runId = await runId;

      // Use writeToStreamMulti if available for batch writes
      if (
        typeof world.writeToStreamMulti === 'function' &&
        chunksToFlush.length > 1
      ) {
        await world.writeToStreamMulti(name, _runId, chunksToFlush);
      } else {
        // Fall back to sequential writes
        for (const chunk of chunksToFlush) {
          await world.writeToStream(name, _runId, chunk);
        }
      }

      // Only clear buffer after successful write to prevent data loss
      buffer = [];
    };

    const scheduleFlush = (): void => {
      if (flushTimer) return; // Already scheduled

      flushTimer = setTimeout(() => {
        flushTimer = null;
        flushPromise = flush();
      }, STREAM_FLUSH_INTERVAL_MS);
Copilot AI · Jan 29, 2026, on lines +336 to +339:
Missing error handling: If flush() throws an error (e.g., from writeToStreamMulti or writeToStream), the error will be thrown from the setTimeout callback in an unhandled way, which could crash the process or cause silent failures. The flushPromise should be awaited somewhere or the error should be caught and handled appropriately. Consider wrapping the flush() call in a try-catch or ensuring errors propagate to the write/close methods that trigger the flush.
    };

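To illustrate the propagation the comment above asks for, here is one possible shape: capture a failed background flush and rethrow it from the next caller-visible operation. It reuses flushTimer, flushPromise, flush, and STREAM_FLUSH_INTERVAL_MS from this diff; pendingFlushError, scheduleFlushSafe, and rethrowPendingError are hypothetical and not part of the PR.

    // Hypothetical sketch, not in this PR: keep flush() failures from escaping
    // the setTimeout callback as unhandled rejections.
    let pendingFlushError: unknown = null;

    const scheduleFlushSafe = (): void => {
      if (flushTimer) return;
      flushTimer = setTimeout(() => {
        flushTimer = null;
        flushPromise = flush().catch((err) => {
          pendingFlushError = err; // remember the failure instead of throwing here
        });
      }, STREAM_FLUSH_INTERVAL_MS);
    };

    const rethrowPendingError = (): void => {
      if (pendingFlushError) {
        const err = pendingFlushError;
        pendingFlushError = null;
        throw err;
      }
    };
    // write() and close() would call rethrowPendingError() before proceeding.
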
    super({
      async write(chunk) {
-        const _runId = await runId;
-        await world.writeToStream(name, _runId, chunk);
        // Wait for any in-progress flush to complete before adding to buffer
        if (flushPromise) {
          await flushPromise;
          flushPromise = null;
        }

        buffer.push(chunk);
        scheduleFlush();
      },
      async close() {
        // Wait for any in-progress flush to complete
        if (flushPromise) {
          await flushPromise;
          flushPromise = null;
        }

        // Flush any remaining buffered chunks
        await flush();

        const _runId = await runId;
        await world.closeStream(name, _runId);
      },
      abort() {
        // Clean up timer to prevent leaks
        if (flushTimer) {
          clearTimeout(flushTimer);
          flushTimer = null;
        }
        // Discard buffered chunks - they won't be written
        buffer = [];
      },
    });
  }
}