diff --git a/.changeset/all-fans-march.md b/.changeset/all-fans-march.md new file mode 100644 index 0000000000..c845896d79 --- /dev/null +++ b/.changeset/all-fans-march.md @@ -0,0 +1,7 @@ +--- +"@cloudflare/workflows-shared": minor +--- + +Add support for ReadableStream on workflow steps. This allows users to overcome the 1MB limit per step output. + +`ReadableStream` is already serializable on the workers platform. This feature makes it native to workflows as well by persisting each chunk and replaying it if needed diff --git a/packages/workflows-shared/src/context.ts b/packages/workflows-shared/src/context.ts index f263b08be3..b3e35c0fc0 100644 --- a/packages/workflows-shared/src/context.ts +++ b/packages/workflows-shared/src/context.ts @@ -4,11 +4,25 @@ import { INSTANCE_METADATA, InstanceEvent, InstanceStatus } from "./instance"; import { computeHash } from "./lib/cache"; import { ABORT_REASONS, + InvalidStepReadableStreamError, + OversizedStreamChunkError, + StreamOutputStorageLimitError, + UnsupportedStreamChunkError, WorkflowFatalError, WorkflowInternalError, WorkflowTimeoutError, } from "./lib/errors"; import { calcRetryDuration } from "./lib/retries"; +import { + cleanupPendingStreamOutput, + createReplayReadableStream, + getInvalidStoredStreamOutputError, + getStreamOutputMetaKey, + isReadableStreamLike, + rollbackStreamOutput, + StreamOutputState, + writeStreamOutput, +} from "./lib/streams"; import { isValidStepConfig, isValidStepName, @@ -17,6 +31,7 @@ import { import { MODIFIER_KEYS } from "./modifier"; import type { Engine } from "./engine"; import type { InstanceMetadata } from "./instance"; +import type { StreamOutputMeta } from "./lib/streams"; import type { WorkflowSleepDuration, WorkflowStepConfig, @@ -171,6 +186,7 @@ export class Context extends RpcTarget { const cacheKey = `${hash}-${count}`; const valueKey = `${cacheKey}-value`; + const streamMetaKey = getStreamOutputMetaKey(cacheKey); const configKey = `${cacheKey}-config`; const errorKey = `${cacheKey}-error`; const stepNameWithCounter = `${name}-${count}`; @@ -179,15 +195,43 @@ export class Context extends RpcTarget { const maybeMap = await this.#state.storage.get([ valueKey, + streamMetaKey, configKey, errorKey, ]); - // Check cache + // Check cache -- streams first, then plain values + const maybeStreamMeta = maybeMap.get(streamMetaKey) as + | StreamOutputMeta + | undefined + | null; + if (maybeStreamMeta?.state === StreamOutputState.Complete) { + const maybeOutputError = getInvalidStoredStreamOutputError( + this.#state.storage, + cacheKey, + maybeStreamMeta + ); + if (maybeOutputError !== undefined) { + throw new WorkflowInternalError( + `Stored output for ${stepNameWithCounter} is corrupt or incomplete.` + ); + } + + return createReplayReadableStream({ + storage: this.#state.storage, + cacheKey, + meta: maybeStreamMeta, + }) as T; + } else if (maybeStreamMeta !== undefined && maybeStreamMeta !== null) { + // We're not in a complete state - means we crashed while persisting a stream on a previous invocation - need to cleanup + await cleanupPendingStreamOutput(this.#state.storage, cacheKey).catch( + () => {} + ); + } + const maybeResult = maybeMap.get(valueKey); if (maybeResult) { - // console.log(`Cache hit for ${cacheKey}`); return (maybeResult as { value: T }).value; } @@ -264,6 +308,12 @@ export class Context extends RpcTarget { )) as StepState) ?? { attemptedCount: 0, }; + + // NOTE(caio): this might be a stream returning step - if so cleanup stale data from previous lifetimes + await cleanupPendingStreamOutput(this.#state.storage, cacheKey).catch( + () => {} + ); + await this.#engine.timeoutHandler.acquire(this.#engine); if (stepState.attemptedCount == 0) { @@ -314,8 +364,16 @@ export class Context extends RpcTarget { } const { accountId, instance } = instanceMetadata; + let streamResultSeen = false; + let lastStreamMeta: StreamOutputMeta | undefined; + const abortController = new AbortController(); + const stepExecutionSignal = AbortSignal.any([ + abortController.signal, + this.#engine.engineAbortController.signal, + ]); + try { - const timeoutPromise = async () => { + const timeoutPromise = async (): Promise => { const priorityQueueHash = `${cacheKey}-${stepState.attemptedCount}`; let timeout = ms(config.timeout); if (forceStepTimeout) { @@ -335,9 +393,11 @@ export class Context extends RpcTarget { hash: priorityQueueHash, type: "timeout", }); - throw new WorkflowTimeoutError( + const error = new WorkflowTimeoutError( `Execution timed out after ${timeout}ms` ); + abortController.abort(error); + throw error; }; this.#engine.writeLog( @@ -382,31 +442,163 @@ export class Context extends RpcTarget { ); const forceStepTimeout = persistentStepTimeout || transientStepTimeout; + let timeoutTask: Promise | undefined; + + const persistStepResult = async ( + value: unknown, + activeTimeoutTask?: Promise + ): Promise => { + if (!isReadableStreamLike(value)) { + await this.#state.storage.put(valueKey, { value }); + abortController.abort("step finished"); + // @ts-expect-error priorityQueue is initiated in init + this.#engine.priorityQueue.remove({ + hash: priorityQueueHash, + type: "timeout", + }); + return value; + } + + streamResultSeen = true; + const streamMeta = await writeStreamOutput({ + storage: this.#state.storage, + cacheKey, + attempt: stepState.attemptedCount, + stream: value as ReadableStream, + signal: stepExecutionSignal, + timeoutTask: activeTimeoutTask, + }); + lastStreamMeta = streamMeta; + + abortController.abort("step finished"); + // @ts-expect-error priorityQueue is initiated in init + this.#engine.priorityQueue.remove({ + hash: priorityQueueHash, + type: "timeout", + }); + return createReplayReadableStream({ + storage: this.#state.storage, + cacheKey, + meta: streamMeta, + }); + }; + if (forceStepTimeout) { result = await timeoutPromise(); } else if (replaceResult) { - result = replaceResult; - // if there is a timeout to be forced we dont want to race with closure + // Check if the mocked result is a stream sentinel (from mockStepResult with ReadableStream) + if ( + replaceResult && + typeof replaceResult === "object" && + (replaceResult as Record).__mockStreamOutput + ) { + const sentinel = replaceResult as { + __mockStreamOutput: true; + cacheKey: string; + meta: StreamOutputMeta; + }; + result = createReplayReadableStream({ + storage: this.#state.storage, + cacheKey: sentinel.cacheKey, + meta: sentinel.meta, + }); + } else { + result = replaceResult; + } } else { + timeoutTask = timeoutPromise(); result = await Promise.race([ doWrapperClosure({ attempt: stepState.attemptedCount }), - timeoutPromise(), + timeoutTask, ]); } - // if we reach here, means that the clouse ran successfully and we can remove the timeout from the PQ - // @ts-expect-error priorityQueue is initiated in init - await this.#engine.priorityQueue.remove({ - hash: priorityQueueHash, - type: "timeout", - }); - // We store the value of `output` in an object with a `value` property. This allows us to store `undefined`, // in the case that it's returned from the user's code. This is because DO storage will error if you try to // store undefined directly. try { - await this.#state.storage.put(valueKey, { value: result }); + result = await persistStepResult(result, timeoutTask); } catch (e) { + abortController.abort("step errored"); + // @ts-expect-error priorityQueue is initiated in init + this.#engine.priorityQueue.remove({ + hash: priorityQueueHash, + type: "timeout", + }); + + if (e instanceof WorkflowTimeoutError) { + throw e; + } + + // Stream-specific fatal errors + if ( + e instanceof InvalidStepReadableStreamError || + e instanceof OversizedStreamChunkError || + e instanceof UnsupportedStreamChunkError + ) { + this.#engine.writeLog( + InstanceEvent.ATTEMPT_FAILURE, + cacheKey, + stepNameWithCounter, + { + attempt: stepState.attemptedCount, + error: new WorkflowFatalError(e.message), + } + ); + this.#engine.writeLog( + InstanceEvent.STEP_FAILURE, + cacheKey, + stepNameWithCounter, + {} + ); + this.#engine.writeLog(InstanceEvent.WORKFLOW_FAILURE, null, null, { + error: new WorkflowFatalError( + `The execution of the Workflow instance was terminated, as the step "${name}" returned an invalid ReadableStream output. ${e.message}` + ), + }); + + await this.#engine.setStatus( + accountId, + instance.id, + InstanceStatus.Errored + ); + await this.#engine.timeoutHandler.release(this.#engine); + await this.#engine.abort(ABORT_REASONS.NOT_SERIALISABLE); + return; + } + + if (e instanceof StreamOutputStorageLimitError) { + this.#engine.writeLog( + InstanceEvent.ATTEMPT_FAILURE, + cacheKey, + stepNameWithCounter, + { + attempt: stepState.attemptedCount, + error: new WorkflowFatalError(e.message), + } + ); + this.#engine.writeLog( + InstanceEvent.STEP_FAILURE, + cacheKey, + stepNameWithCounter, + {} + ); + this.#engine.writeLog(InstanceEvent.WORKFLOW_FAILURE, null, null, { + error: new WorkflowFatalError( + "The instance has exceeded the 1GiB storage limit" + ), + }); + + await this.#engine.setStatus( + accountId, + instance.id, + InstanceStatus.Errored + ); + await this.#engine.timeoutHandler.release(this.#engine); + await this.#engine.abort(ABORT_REASONS.STORAGE_LIMIT_EXCEEDED); + return; + } + // something that cannot be written to storage if (e instanceof Error && e.name === "DataCloneError") { this.#engine.writeLog( @@ -455,6 +647,13 @@ export class Context extends RpcTarget { return; } + // if we reach here, means that the closure ran successfully and we can remove the timeout from the PQ + // @ts-expect-error priorityQueue is initiated in init + this.#engine.priorityQueue.remove({ + hash: priorityQueueHash, + type: "timeout", + }); + this.#engine.writeLog( InstanceEvent.ATTEMPT_SUCCESS, cacheKey, @@ -465,13 +664,26 @@ export class Context extends RpcTarget { ); } catch (e) { const error = e as Error; - // if we reach here, means that the clouse ran but errored out and we can remove the timeout from the PQ + // if we reach here, means that the closure ran but errored out and we can remove the timeout from the PQ // @ts-expect-error priorityQueue is initiated in init this.#engine.priorityQueue.remove({ hash: `${cacheKey}-${stepState.attemptedCount}`, type: "timeout", }); + // Clean up any partial stream output from this failed attempt + if (streamResultSeen) { + try { + await rollbackStreamOutput( + this.#state.storage, + cacheKey, + stepState.attemptedCount + ); + } catch { + // Best-effort cleanup + } + } + if ( e instanceof Error && (error.name === "NonRetryableError" || @@ -574,6 +786,16 @@ export class Context extends RpcTarget { return doWrapper(doWrapperClosure); } else { await this.#engine.timeoutHandler.release(this.#engine); + // Clean up any leftover stream chunks on retry exhaustion + try { + await rollbackStreamOutput( + this.#state.storage, + cacheKey, + stepState.attemptedCount + ); + } catch { + // Best-effort cleanup + } this.#engine.writeLog( InstanceEvent.STEP_FAILURE, cacheKey, @@ -592,7 +814,10 @@ export class Context extends RpcTarget { stepNameWithCounter, { // TODO (WOR-86): Add limits, figure out serialization - result, + result: lastStreamMeta ? undefined : result, + ...(lastStreamMeta && { + streamOutput: { cacheKey, meta: lastStreamMeta }, + }), } ); await this.#engine.timeoutHandler.release(this.#engine); diff --git a/packages/workflows-shared/src/engine.ts b/packages/workflows-shared/src/engine.ts index 06c3f0359b..168dc2a532 100644 --- a/packages/workflows-shared/src/engine.ts +++ b/packages/workflows-shared/src/engine.ts @@ -20,6 +20,12 @@ import { GracePeriodSemaphore, startGracePeriod, } from "./lib/gracePeriodSemaphore"; +import { + createReplayReadableStream, + getInvalidStoredStreamOutputError, + getStoredStreamOutputPreview, + StreamOutputState, +} from "./lib/streams"; import { TimePriorityQueue } from "./lib/timePriorityQueue"; import { isModifierKey, @@ -28,9 +34,11 @@ import { } from "./modifier"; import type { Event } from "./context"; import type { InstanceMetadata, RawInstanceLog } from "./instance"; +import type { StreamOutputMeta } from "./lib/streams"; import type { WorkflowEntrypoint, WorkflowEvent } from "cloudflare:workers"; interface Env { + ENGINE: DurableObjectNamespace; USER_WORKFLOW: WorkflowEntrypoint; STEP_LIMIT?: string; // JSON-encoded number from miniflare binding } @@ -130,15 +138,22 @@ export class Engine extends DurableObject { CHECK (action IN (0, 1)), -- guararentee that action can only be 0 or 1 UNIQUE (action, entryType, hash) ); - CREATE TABLE IF NOT EXISTS states ( - id INTEGER PRIMARY KEY NOT NULL, - timestamp TIMESTAMP DEFAULT (DATETIME('now','subsec')), - groupKey TEXT, - target TEXT, - metadata TEXT, - event INTEGER NOT NULL - ) - `); + CREATE TABLE IF NOT EXISTS states ( + id INTEGER PRIMARY KEY NOT NULL, + timestamp TIMESTAMP DEFAULT (DATETIME('now','subsec')), + groupKey TEXT, + target TEXT, + metadata TEXT, + event INTEGER NOT NULL + ); + CREATE TABLE IF NOT EXISTS streaming_step_chunks ( + cache_key TEXT NOT NULL, + attempt INTEGER NOT NULL, + chunk_index INTEGER NOT NULL, + chunk BLOB NOT NULL, + PRIMARY KEY (cache_key, attempt, chunk_index) + ) WITHOUT ROWID + `); } catch (e) { console.error(e); throw e; @@ -187,11 +202,38 @@ export class Engine extends DurableObject { ]; return { - logs: logs.map((log) => ({ - ...log, - metadata: JSON.parse(log.metadata), - group: log.groupKey, - })), + logs: logs.map((log) => { + const metadata = JSON.parse(log.metadata); + + if ( + log.event !== InstanceEvent.STEP_SUCCESS || + !metadata.streamOutput + ) { + return { ...log, metadata, group: log.groupKey }; + } + + const { cacheKey, meta } = metadata.streamOutput as { + cacheKey: string; + meta: StreamOutputMeta; + }; + try { + const preview = getStoredStreamOutputPreview({ + storage: this.ctx.storage, + cacheKey, + meta, + maxChars: 1024, + }); + metadata.result = + preview.type === "text" + ? preview.output + : `[ReadableStream (binary): ${meta.totalBytes} bytes]`; + } catch { + metadata.result = `[ReadableStream: ${meta.totalBytes} bytes]`; + } + delete metadata.streamOutput; + + return { ...log, metadata, group: log.groupKey }; + }), }; } @@ -220,14 +262,49 @@ export class Engine extends DurableObject { ), ]; - return rows.map((row) => ({ - id: row.id, - timestamp: String(row.timestamp).replace(" ", "T") + "Z", - event: row.event, - group: row.groupKey, - target: row.target, - metadata: JSON.parse(row.metadata) as Record, - })); + return rows.map((row) => { + const metadata = JSON.parse(row.metadata) as Record; + + if (row.event !== InstanceEvent.STEP_SUCCESS || !metadata.streamOutput) { + return { + id: row.id, + timestamp: String(row.timestamp).replace(" ", "T") + "Z", + event: row.event, + group: row.groupKey, + target: row.target, + metadata, + }; + } + + const { cacheKey, meta } = metadata.streamOutput as { + cacheKey: string; + meta: StreamOutputMeta; + }; + try { + const preview = getStoredStreamOutputPreview({ + storage: this.ctx.storage, + cacheKey, + meta, + maxChars: 1024, + }); + metadata.result = + preview.type === "text" + ? preview.output + : `[ReadableStream (binary): ${meta.totalBytes} bytes]`; + } catch { + metadata.result = `[ReadableStream: ${meta.totalBytes} bytes]`; + } + delete metadata.streamOutput; + + return { + id: row.id, + timestamp: String(row.timestamp).replace(" ", "T") + "Z", + event: row.event, + group: row.groupKey, + target: row.target, + metadata, + }; + }); } readLogsFromEvent(eventType: InstanceEvent): EngineLogs { @@ -426,6 +503,32 @@ export class Engine extends DurableObject { } } + /** + * Create a replay ReadableStream from stored stream output metadata. + * Returns undefined if the stream data is not in a valid/complete state. + */ + private replayStreamFromMeta(streamOutput: { + cacheKey: string; + meta: StreamOutputMeta; + }): ReadableStream | undefined { + if (streamOutput.meta.state !== StreamOutputState.Complete) { + return undefined; + } + const integrityError = getInvalidStoredStreamOutputError( + this.ctx.storage, + streamOutput.cacheKey, + streamOutput.meta + ); + if (integrityError !== undefined) { + return undefined; + } + return createReplayReadableStream({ + storage: this.ctx.storage, + cacheKey: streamOutput.cacheKey, + meta: streamOutput.meta, + }); + } + private stepResultWaiters: Map< string, { resolve: (v: unknown) => void; reject: (e: unknown) => void } @@ -453,6 +556,9 @@ export class Engine extends DurableObject { const { event, metadata } = rows[0]; const parsed = JSON.parse(metadata); if (event === InstanceEvent.STEP_SUCCESS) { + if (parsed?.streamOutput) { + return this.replayStreamFromMeta(parsed.streamOutput); + } return parsed?.result; } if (event === InstanceEvent.STEP_FAILURE) { @@ -476,8 +582,18 @@ export class Engine extends DurableObject { return; } if (event === InstanceEvent.STEP_SUCCESS) { - const result = metadata?.result; - waiter.resolve(result); + if (metadata?.streamOutput) { + waiter.resolve( + this.replayStreamFromMeta( + metadata.streamOutput as { + cacheKey: string; + meta: StreamOutputMeta; + } + ) + ); + } else { + waiter.resolve(metadata?.result); + } this.stepResultWaiters.delete(group); } else if (event === InstanceEvent.STEP_FAILURE) { const error = metadata?.error ?? new Error("Step failed"); @@ -800,6 +916,13 @@ export class Engine extends DurableObject { async attemptRestart() { this.ctx.storage.sql.exec("DELETE FROM states"); this.ctx.storage.sql.exec("DELETE FROM priority_queue"); + // Only delete non-mock streaming chunks. Mock stream outputs are stored + // at attempt=0 (see modifier.ts mockStepResult) and their sentinels + // survive restart via isModifierKey(), so the underlying SQL rows must + // be preserved too. + this.ctx.storage.sql.exec( + "DELETE FROM streaming_step_chunks WHERE attempt != 0" + ); const allKeys = await this.ctx.storage.list(); const preservedEventMapKeys = this.getMockedEventMapKeys(allKeys); diff --git a/packages/workflows-shared/src/lib/errors.ts b/packages/workflows-shared/src/lib/errors.ts index 1f529692c8..9f0c95ca29 100644 --- a/packages/workflows-shared/src/lib/errors.ts +++ b/packages/workflows-shared/src/lib/errors.ts @@ -21,6 +21,22 @@ export class WorkflowError extends Error { name = "WorkflowError"; } +export class InvalidStepReadableStreamError extends Error { + name = "InvalidStepReadableStreamError"; +} + +export class OversizedStreamChunkError extends Error { + name = "OversizedStreamChunkError"; +} + +export class UnsupportedStreamChunkError extends Error { + name = "UnsupportedStreamChunkError"; +} + +export class StreamOutputStorageLimitError extends Error { + name = "StreamOutputStorageLimitError"; +} + export function createWorkflowError( message: string, errorCode: string @@ -36,6 +52,7 @@ export const ABORT_REASONS = { USER_TERMINATE: `${ABORT_PREFIX} User called terminate`, NON_RETRYABLE_ERROR: `${ABORT_PREFIX} A step threw a NonRetryableError`, NOT_SERIALISABLE: `${ABORT_PREFIX} Value is not serialisable`, + STORAGE_LIMIT_EXCEEDED: `${ABORT_PREFIX} Storage limit exceeded`, GRACE_PERIOD_COMPLETE: `${ABORT_PREFIX} Grace period complete`, } as const; diff --git a/packages/workflows-shared/src/lib/streams.ts b/packages/workflows-shared/src/lib/streams.ts new file mode 100644 index 0000000000..563ab62639 --- /dev/null +++ b/packages/workflows-shared/src/lib/streams.ts @@ -0,0 +1,698 @@ +import { + InvalidStepReadableStreamError, + OversizedStreamChunkError, + StreamOutputStorageLimitError, + UnsupportedStreamChunkError, + WorkflowTimeoutError, +} from "./errors"; + +// ── Constants ─────────────────────────────────────────────────────────────── + +export const DEFAULT_STREAM_OUTPUT_CHUNK_SIZE = 256 * 1024; +export const STREAM_OUTPUT_META_SUFFIX = "-value-stream-meta"; +export const MAX_STREAM_OUTPUT_INPUT_CHUNK_BYTES = 16 * 1024 * 1024; +export const STREAMING_STEP_CHUNKS_TABLE = "streaming_step_chunks"; +export const MAX_OUTPUT_SHOWN_IN_LOGS = 1024; + +const DO_STORAGE_LIMIT = 1024 * 1024 * 1024 + 100 * 1024 * 1024; +const STREAM_OUTPUT_STORAGE_WRITE_HEADROOM_BYTES = 16 * 1024; + +// ── Types ─────────────────────────────────────────────────────────────────── + +export enum StreamOutputState { + Pending = "pending", + Committing = "committing", + Complete = "complete", +} + +export type StreamOutputMeta = { + version: 1; + state: StreamOutputState; + attempt: number; + startedAt: number; + chunkCount: number; + totalBytes: number; + committedAt: number | null; +}; + +export type StoredStreamOutputPreview = + | { type: "text"; output: string } + | { type: "binary" }; + +export class InvalidStoredStreamOutputError extends Error { + name = "InvalidStoredStreamOutputError"; +} + +// ── Helpers ───────────────────────────────────────────────────────────────── + +export function getStreamOutputMetaKey(cacheKey: string): string { + return `${cacheKey}${STREAM_OUTPUT_META_SUFFIX}`; +} + +export function isReadableStreamLike( + value: unknown +): value is ReadableStream { + return value instanceof ReadableStream; +} + +function createInvalidStepReadableStreamError(): InvalidStepReadableStreamError { + return new InvalidStepReadableStreamError( + "Step returned a ReadableStream that is already locked or otherwise unreadable. Return a fresh, unlocked ReadableStream from step.do()." + ); +} + +function createOversizedStreamChunkError(): OversizedStreamChunkError { + return new OversizedStreamChunkError( + `Step returned a ReadableStream chunk larger than the maximum allowed size of ${MAX_STREAM_OUTPUT_INPUT_CHUNK_BYTES} bytes. ` + + "Return smaller chunks from step.do()." + ); +} + +/** + * Normalize any incoming chunk to Uint8Array. + * Accepts ArrayBuffer, TypedArrays (except DataView), and Uint8Array directly. + * Rejects strings, objects, and other non-binary types. + */ +function normalizeChunkToUint8Array(value: unknown): Uint8Array { + if (value instanceof Uint8Array) { + return value; + } + + if (value instanceof ArrayBuffer) { + return new Uint8Array(value); + } + + if (ArrayBuffer.isView(value) && !(value instanceof DataView)) { + return new Uint8Array(value.buffer, value.byteOffset, value.byteLength); + } + + throw new UnsupportedStreamChunkError( + "Step returned a ReadableStream with unsupported chunk type. " + + "Only ArrayBuffer and TypedArray chunks are supported." + ); +} + +// ── Buffer helpers ────────────────────────────────────────────────────────── + +function takeBufferedBytes( + bufferedChunks: Uint8Array[], + byteLength: number +): Uint8Array { + const output = new Uint8Array(byteLength); + let offset = 0; + + while (offset < byteLength) { + const chunk = bufferedChunks[0]; + const remaining = byteLength - offset; + + if (chunk.byteLength <= remaining) { + output.set(chunk, offset); + offset += chunk.byteLength; + bufferedChunks.shift(); + continue; + } + + output.set(chunk.subarray(0, remaining), offset); + bufferedChunks[0] = chunk.subarray(remaining); + offset += remaining; + } + + return output; +} + +// ── Stream iteration ──────────────────────────────────────────────────────── + +async function* iterateStreamChunks( + stream: ReadableStream, + signal?: AbortSignal +): AsyncGenerator { + if (stream.locked) { + throw createInvalidStepReadableStreamError(); + } + + if (signal?.aborted) { + throw ( + signal.reason ?? + new DOMException("The operation was aborted.", "AbortError") + ); + } + + let reader: ReadableStreamDefaultReader; + try { + reader = stream.getReader(); + } catch (error) { + if (error instanceof TypeError) { + throw createInvalidStepReadableStreamError(); + } + throw error; + } + + const onAbort = () => { + void reader + .cancel( + signal?.reason ?? + new DOMException("The operation was aborted.", "AbortError") + ) + .catch(() => {}); + }; + + signal?.addEventListener("abort", onAbort, { once: true }); + let fullyRead = false; + + try { + while (true) { + let readResult: ReadableStreamReadResult; + try { + readResult = await reader.read(); + } catch (readError) { + // When the abort signal has already fired, the read error is an + // expected cancellation (e.g. step timeout or engine shutdown). + // Re-throw the original abort reason so callers can distinguish + // timeouts from genuine stream failures. + if (signal?.aborted) { + throw ( + signal.reason ?? + new DOMException("The operation was aborted.", "AbortError") + ); + } + + // Any other read failure (broken pipe, upstream connection + // drop, encoding mismatch, etc.) means the ReadableStream the + // step returned is unusable. + throw new InvalidStepReadableStreamError( + "Failed to read from step ReadableStream output. " + + (readError instanceof Error ? readError.message : String(readError)) + ); + } + + if (signal?.aborted) { + throw ( + signal.reason ?? + new DOMException("The operation was aborted.", "AbortError") + ); + } + if (readResult.done) { + fullyRead = true; + return; + } + + yield normalizeChunkToUint8Array(readResult.value); + } + } finally { + signal?.removeEventListener("abort", onAbort); + if (!fullyRead) { + await reader + .cancel( + new Error("stream output consumption stopped before completion") + ) + .catch(() => {}); + } + try { + reader.releaseLock(); + } catch { + // Reader may still be processing cancel() + } + } +} + +// ── SQL helpers ───────────────────────────────────────────────────────────── + +function deleteAttemptChunks( + storage: DurableObjectStorage, + cacheKey: string, + attempt: number +): void { + // eslint-disable-next-line workers-sdk/no-unsafe-command-execution -- DO SQL exec, not child_process + storage.sql.exec( + `DELETE FROM ${STREAMING_STEP_CHUNKS_TABLE} WHERE cache_key = ? AND attempt = ?`, + cacheKey, + attempt + ); +} + +async function deleteMetaForAttempt( + storage: DurableObjectStorage, + cacheKey: string, + attempt: number +): Promise { + const metaKey = getStreamOutputMetaKey(cacheKey); + const maybeMeta = await storage.get(metaKey); + if (maybeMeta === undefined) { + return; + } + if (maybeMeta.attempt !== attempt) { + return; + } + await storage.delete(metaKey); +} + +// ── Integrity validation ──────────────────────────────────────────────────── + +type StreamOutputChunkSummary = { + chunkCount: number; + minChunkIndex: number | null; + maxChunkIndex: number | null; + totalBytes: number; +}; + +function getStreamOutputChunkSummary( + storage: DurableObjectStorage, + cacheKey: string, + attempt: number +): StreamOutputChunkSummary { + const row = storage.sql + .exec( + [ + `SELECT`, + ` COUNT(*) AS chunkCount,`, + ` MIN(chunk_index) AS minChunkIndex,`, + ` MAX(chunk_index) AS maxChunkIndex,`, + ` CAST(COALESCE(SUM(LENGTH(chunk)), 0) AS INTEGER) AS totalBytes`, + `FROM ${STREAMING_STEP_CHUNKS_TABLE}`, + `WHERE cache_key = ? AND attempt = ?`, + ].join("\n"), + cacheKey, + attempt + ) + .one(); + + if (row === null) { + throw new Error("Expected stream chunk summary query to return a row"); + } + return row; +} + +export function getInvalidStoredStreamOutputError( + storage: DurableObjectStorage, + cacheKey: string, + meta: StreamOutputMeta +): InvalidStoredStreamOutputError | undefined { + const summary = getStreamOutputChunkSummary(storage, cacheKey, meta.attempt); + + if (meta.chunkCount === 0) { + if ( + summary.chunkCount === 0 && + summary.totalBytes === 0 && + summary.minChunkIndex === null && + summary.maxChunkIndex === null + ) { + return undefined; + } + } else if ( + summary.chunkCount === meta.chunkCount && + summary.minChunkIndex === 0 && + summary.maxChunkIndex === meta.chunkCount - 1 && + summary.totalBytes === meta.totalBytes + ) { + return undefined; + } + + return new InvalidStoredStreamOutputError( + `Stored streamed step output is corrupt or incomplete for cache key ${cacheKey}. ` + + `Expected ${meta.chunkCount} chunks / ${meta.totalBytes} bytes, found ` + + `${summary.chunkCount} chunks / ${summary.totalBytes} bytes with chunk index range ` + + `${summary.minChunkIndex ?? "null"}..${summary.maxChunkIndex ?? "null"}.` + ); +} + +// ── Preview ───────────────────────────────────────────────────────────────── + +function readStreamOutputPreviewBytes(options: { + storage: DurableObjectStorage; + cacheKey: string; + attempt: number; + maxBytes: number; +}): Uint8Array { + const { storage, cacheKey, attempt, maxBytes } = options; + // eslint-disable-next-line workers-sdk/no-unsafe-command-execution -- DO SQL exec, not child_process + const cursor = storage.sql.exec<{ + chunk_index: number; + chunk: ArrayBuffer; + }>( + `SELECT chunk_index, chunk FROM ${STREAMING_STEP_CHUNKS_TABLE} WHERE cache_key = ? AND attempt = ? ORDER BY chunk_index`, + cacheKey, + attempt + ); + const previewChunks: Uint8Array[] = []; + let expectedChunkIndex = 0; + let totalBytes = 0; + + while (totalBytes < maxBytes) { + const row = cursor.next(); + if (row.done) { + break; + } + + if (row.value.chunk_index !== expectedChunkIndex) { + throw new InvalidStoredStreamOutputError( + `Missing chunk ${expectedChunkIndex} for streamed step output` + ); + } + + if (!(row.value.chunk instanceof ArrayBuffer)) { + throw new InvalidStoredStreamOutputError( + "Invalid chunk type returned from streaming_step_chunks table" + ); + } + + const chunkBytes = new Uint8Array(row.value.chunk); + const remainingBytes = maxBytes - totalBytes; + const previewChunk = + chunkBytes.byteLength > remainingBytes + ? chunkBytes.subarray(0, remainingBytes) + : chunkBytes; + + previewChunks.push(previewChunk); + totalBytes += previewChunk.byteLength; + expectedChunkIndex++; + } + + return takeBufferedBytes(previewChunks, totalBytes); +} + +export function getStoredStreamOutputPreview(options: { + storage: DurableObjectStorage; + cacheKey: string; + meta: StreamOutputMeta; + maxChars: number; +}): StoredStreamOutputPreview { + const { storage, cacheKey, meta, maxChars } = options; + if (meta.state !== StreamOutputState.Complete) { + throw new Error( + "Cannot preview streamed step output before it is complete" + ); + } + + // UTF-8 uses at most 4 bytes per code point + const maxPreviewBytes = maxChars * 4; + const previewBytes = readStreamOutputPreviewBytes({ + storage, + cacheKey, + attempt: meta.attempt, + maxBytes: maxPreviewBytes, + }); + const previewTruncatedByBytes = meta.totalBytes > previewBytes.byteLength; + + try { + const decoded = new TextDecoder("utf-8", { + fatal: true, + ignoreBOM: false, + }).decode(previewBytes, { stream: previewTruncatedByBytes }); + + const previewOutput = decoded.substring(0, maxChars); + if (decoded.length > maxChars || previewTruncatedByBytes) { + return { type: "text", output: previewOutput + "[truncated output]" }; + } + return { type: "text", output: previewOutput }; + } catch { + return { type: "binary" }; + } +} + +// ── Cleanup ───────────────────────────────────────────────────────────────── + +export async function cleanupPendingStreamOutput( + storage: DurableObjectStorage, + cacheKey: string +): Promise { + const metaKey = getStreamOutputMetaKey(cacheKey); + const maybeMeta = await storage.get(metaKey); + + if (maybeMeta === undefined) { + return; + } + + if (maybeMeta.state === StreamOutputState.Complete) { + return; + } + + await rollbackStreamOutput(storage, cacheKey, maybeMeta.attempt); +} + +export async function rollbackStreamOutput( + storage: DurableObjectStorage, + cacheKey: string, + attempt: number +): Promise { + deleteAttemptChunks(storage, cacheKey, attempt); + await deleteMetaForAttempt(storage, cacheKey, attempt); +} + +// ── Write ─────────────────────────────────────────────────────────────────── + +async function doWriteStreamOutput(options: { + storage: DurableObjectStorage; + cacheKey: string; + attempt: number; + stream: ReadableStream; + chunkSizeBytes?: number; + signal?: AbortSignal; + skipMetaWrite?: boolean; +}): Promise { + const { storage, cacheKey, attempt, stream, signal, skipMetaWrite } = options; + const chunkSizeBytes = + options.chunkSizeBytes ?? DEFAULT_STREAM_OUTPUT_CHUNK_SIZE; + const metaKey = getStreamOutputMetaKey(cacheKey); + + const maybeInvalidState = (additionalBytes = 0): unknown => { + if (signal?.aborted) { + return ( + signal.reason ?? + new DOMException("The operation was aborted.", "AbortError") + ); + } + + const currentStorageBytes = storage.sql.databaseSize; + if ( + currentStorageBytes + + additionalBytes + + STREAM_OUTPUT_STORAGE_WRITE_HEADROOM_BYTES > + DO_STORAGE_LIMIT + ) { + return new StreamOutputStorageLimitError( + "The instance has exceeded the 1GiB storage limit" + ); + } + }; + + const initialInvalidState = maybeInvalidState(); + if (initialInvalidState !== undefined) { + throw initialInvalidState; + } + + const startedAt = Date.now(); + if (!skipMetaWrite) { + await storage.put(metaKey, { + version: 1, + state: StreamOutputState.Pending, + attempt, + startedAt, + chunkCount: 0, + totalBytes: 0, + committedAt: null, + } satisfies StreamOutputMeta); + } + + let chunkCount = 0; + let totalBytes = 0; + const bufferedChunks: Uint8Array[] = []; + let bufferedBytes = 0; + let outputCommitted = false; + + const flushChunk = async (bytes: Uint8Array) => { + const invalidState = maybeInvalidState(bytes.byteLength); + if (invalidState !== undefined) { + throw invalidState; + } + // eslint-disable-next-line workers-sdk/no-unsafe-command-execution -- DO SQL exec, not child_process + storage.sql.exec( + `INSERT INTO ${STREAMING_STEP_CHUNKS_TABLE} (cache_key, attempt, chunk_index, chunk) VALUES (?, ?, ?, ?)`, + cacheKey, + attempt, + chunkCount, + bytes + ); + totalBytes += bytes.byteLength; + chunkCount++; + }; + + try { + for await (const bytes of iterateStreamChunks(stream, signal)) { + if (bytes.byteLength === 0) { + continue; + } + + if (bytes.byteLength > MAX_STREAM_OUTPUT_INPUT_CHUNK_BYTES) { + throw createOversizedStreamChunkError(); + } + + bufferedChunks.push(bytes); + bufferedBytes += bytes.byteLength; + + // NOTE: we want chunks with fixed length, + // that's why we buffer them in-memory here + while (bufferedBytes >= chunkSizeBytes) { + const chunk = takeBufferedBytes(bufferedChunks, chunkSizeBytes); + bufferedBytes -= chunk.byteLength; + await flushChunk(chunk); + } + } + + // Last chunk (remainder) + if (bufferedBytes > 0) { + await flushChunk(takeBufferedBytes(bufferedChunks, bufferedBytes)); + bufferedBytes = 0; + } + + const meta = { + version: 1, + state: StreamOutputState.Complete, + attempt, + startedAt, + chunkCount, + totalBytes, + committedAt: Date.now(), + } satisfies StreamOutputMeta; + + const invalidState = maybeInvalidState(); + if (invalidState !== undefined) { + throw invalidState; + } + + if (!skipMetaWrite) { + // Transition to Committing to signal it's safe to await. + // Mock stream setup still needs the SQL chunks and returned meta, but + // must not publish the normal stream cache key because Context.do() + // treats it as a completed prior run and skips step execution logging. + await storage.put(metaKey, { + version: 1, + state: StreamOutputState.Committing, + attempt, + startedAt, + chunkCount, + totalBytes, + committedAt: null, + } satisfies StreamOutputMeta); + + await storage.put(metaKey, meta); + } + outputCommitted = true; + return meta; + } catch (error) { + if (!outputCommitted) { + await rollbackStreamOutput(storage, cacheKey, attempt); + } + throw error; + } +} + +export async function writeStreamOutput(options: { + storage: DurableObjectStorage; + cacheKey: string; + attempt: number; + stream: ReadableStream; + chunkSizeBytes?: number; + signal?: AbortSignal; + timeoutTask?: Promise; + skipMetaWrite?: boolean; +}): Promise { + const { storage, cacheKey, attempt, timeoutTask, ...writeOptions } = options; + const writeTask = doWriteStreamOutput({ + storage, + cacheKey, + attempt, + ...writeOptions, + }); + + if (timeoutTask === undefined) { + return writeTask; + } + + try { + return await Promise.race([writeTask, timeoutTask]); + } catch (error) { + if (error instanceof WorkflowTimeoutError) { + if (options.skipMetaWrite) { + void writeTask.catch(() => {}); + throw error; + } + + const maybeMeta = await storage.get( + getStreamOutputMetaKey(cacheKey) + ); + if ( + maybeMeta?.attempt === attempt && + (maybeMeta.state === StreamOutputState.Committing || + maybeMeta.state === StreamOutputState.Complete) + ) { + // Safe to await -- not in the middle of writing chunks + return await writeTask; + } + + void writeTask.catch(() => {}); + throw error; + } + + throw error; + } +} + +// ── Replay ────────────────────────────────────────────────────────────────── + +export function createReplayReadableStream(options: { + storage: DurableObjectStorage; + cacheKey: string; + meta: StreamOutputMeta; +}): ReadableStream { + const { storage, cacheKey, meta } = options; + if (meta.state !== StreamOutputState.Complete) { + throw new Error("Cannot replay streamed step output before it is complete"); + } + + // eslint-disable-next-line workers-sdk/no-unsafe-command-execution -- DO SQL exec, not child_process + const chunkCursor = storage.sql.exec<{ + chunk_index: number; + chunk: ArrayBuffer; + }>( + `SELECT chunk_index, chunk FROM ${STREAMING_STEP_CHUNKS_TABLE} WHERE cache_key = ? AND attempt = ? ORDER BY chunk_index`, + cacheKey, + meta.attempt + ); + let index = 0; + + return new ReadableStream({ + pull(controller) { + if (index >= meta.chunkCount) { + controller.close(); + return; + } + + const row = chunkCursor.next(); + if (row.done) { + controller.error( + new Error(`Missing chunk ${index} for streamed step output`) + ); + return; + } + + if (row.value.chunk_index !== index) { + controller.error( + new Error(`Missing chunk ${index} for streamed step output`) + ); + return; + } + + if (!(row.value.chunk instanceof ArrayBuffer)) { + controller.error( + new Error( + "Invalid chunk type returned from streaming_step_chunks table" + ) + ); + return; + } + + controller.enqueue(new Uint8Array(row.value.chunk)); + index++; + }, + }); +} diff --git a/packages/workflows-shared/src/modifier.ts b/packages/workflows-shared/src/modifier.ts index 24c48b0850..f4a6fd132e 100644 --- a/packages/workflows-shared/src/modifier.ts +++ b/packages/workflows-shared/src/modifier.ts @@ -1,5 +1,6 @@ import { RpcTarget } from "cloudflare:workers"; import { computeHash } from "./lib/cache"; +import { isReadableStreamLike, writeStreamOutput } from "./lib/streams"; import type { Event } from "./context"; import type { Engine } from "./engine"; @@ -54,16 +55,18 @@ export class WorkflowInstanceModifier extends RpcTarget { return waitForEventKey; } - async #getStepCacheKey(step: StepSelector): Promise { + async #getBaseCacheKey(step: StepSelector): Promise { const hash = await computeHash(step.name); let count = 1; if (step.index) { count = step.index; } - const cacheKey = `${hash}-${count}`; - const valueKey = `${cacheKey}-value`; + return `${hash}-${count}`; + } - return valueKey; + async #getStepCacheKey(step: StepSelector): Promise { + const baseCacheKey = await this.#getBaseCacheKey(step); + return `${baseCacheKey}-value`; } #getAndIncrementCounter = async (valueKey: string, by: number) => { @@ -130,10 +133,33 @@ export class WorkflowInstanceModifier extends RpcTarget { ); } - await this.#state.storage.put( - `${MODIFIER_KEYS.REPLACE_RESULT}${valueKey}`, - stepResult - ); + if (isReadableStreamLike(stepResult)) { + // ReadableStream is not structured-cloneable, so we consume it eagerly + // and store the chunks via the streaming infrastructure. We use attempt 0 + // to distinguish mock chunks from real execution attempts (which start at 1). + const baseCacheKey = await this.#getBaseCacheKey(step); + const streamMeta = await writeStreamOutput({ + storage: this.#state.storage, + cacheKey: baseCacheKey, + attempt: 0, + stream: stepResult, + skipMetaWrite: true, + }); + + await this.#state.storage.put( + `${MODIFIER_KEYS.REPLACE_RESULT}${valueKey}`, + { + __mockStreamOutput: true, + cacheKey: baseCacheKey, + meta: streamMeta, + } + ); + } else { + await this.#state.storage.put( + `${MODIFIER_KEYS.REPLACE_RESULT}${valueKey}`, + stepResult + ); + } } // Same logic of `mockStepResult` but stores an error instead of a value. diff --git a/packages/workflows-shared/tests/context.test.ts b/packages/workflows-shared/tests/context.test.ts index 03a5aa569f..22709adda6 100644 --- a/packages/workflows-shared/tests/context.test.ts +++ b/packages/workflows-shared/tests/context.test.ts @@ -2,9 +2,25 @@ import { env, runInDurableObject } from "cloudflare:test"; import { afterEach, describe, it, vi } from "vitest"; import workerdUnsafe from "workerd:unsafe"; import { InstanceEvent } from "../src"; +import { computeHash } from "../src/lib/cache"; +import { + InvalidStepReadableStreamError, + OversizedStreamChunkError, + UnsupportedStreamChunkError, + WorkflowTimeoutError, + isAbortError, +} from "../src/lib/errors"; +import { + STREAMING_STEP_CHUNKS_TABLE, + getStreamOutputMetaKey, + StreamOutputState, + rollbackStreamOutput, + writeStreamOutput, +} from "../src/lib/streams"; import { MODIFIER_KEYS } from "../src/modifier"; import { runWorkflow, runWorkflowAndAwait } from "./utils"; -import type { EngineLogs } from "../src/engine"; +import type { Engine, EngineLogs } from "../src/engine"; +import type { StreamOutputMeta } from "../src/lib/streams"; afterEach(async () => { await workerdUnsafe.abortAllDurableObjects(); @@ -149,3 +165,859 @@ describe("Context", () => { expect(elapsed).toBeLessThan(5000); }); }); + +// ── Helpers for stream tests ──────────────────────────────────────────────── + +function encodeUtf8(str: string): Uint8Array { + return new TextEncoder().encode(str); +} + +function decodeUtf8(bytes: Uint8Array): string { + return new TextDecoder().decode(bytes); +} + +async function readStreamBytes( + stream: ReadableStream +): Promise { + const chunks: Uint8Array[] = []; + const reader = stream.getReader(); + while (true) { + const { done, value } = await reader.read(); + if (done) { + break; + } + chunks.push(value); + } + let totalLength = 0; + for (const chunk of chunks) { + totalLength += chunk.byteLength; + } + const result = new Uint8Array(totalLength); + let offset = 0; + for (const chunk of chunks) { + result.set(chunk, offset); + offset += chunk.byteLength; + } + return result; +} + +function countStreamOutputChunks( + state: DurableObjectState, + cacheKey: string +): number { + const row = state.storage.sql + .exec<{ cnt: number }>( + `SELECT COUNT(*) AS cnt FROM streaming_step_chunks WHERE cache_key = ?`, + cacheKey + ) + .one(); + return row?.cnt ?? 0; +} + +describe("Context - ReadableStream step outputs", () => { + it("should persist a readable stream output and replay from cache", async ({ + expect, + }) => { + const payload = "hello from a readable stream ".repeat(500); // ~14KB + const payloadBytes = encodeUtf8(payload); + let callCount = 0; + + const engineStub = await runWorkflow( + "STREAM-BASIC", + async (_event, step) => { + const stream = await step.do("stream step", async () => { + callCount++; + return new ReadableStream({ + start(controller) { + // Enqueue in two chunks + const mid = Math.floor(payloadBytes.length / 2); + controller.enqueue(payloadBytes.slice(0, mid)); + controller.enqueue(payloadBytes.slice(mid)); + controller.close(); + }, + }); + }); + + // The result should be a ReadableStream we can read + const bytes = await readStreamBytes( + stream as ReadableStream + ); + return decodeUtf8(bytes); + } + ); + + await vi.waitUntil( + async () => { + const logs = (await engineStub.readLogs()) as EngineLogs; + return logs.logs.some( + (val) => val.event === InstanceEvent.WORKFLOW_SUCCESS + ); + }, + { timeout: 5000 } + ); + + const logs = (await engineStub.readLogs()) as EngineLogs; + const successLog = logs.logs.find( + (val) => val.event === InstanceEvent.WORKFLOW_SUCCESS + ); + expect(successLog?.metadata.result).toBe(payload); + + // The closure should have been called exactly once (cache hit on replay) + expect(callCount).toBe(1); + + // Verify the stream metadata is stored and marked complete + const hash = await computeHash("stream step"); + const cacheKey = `${hash}-1`; + const metaKey = getStreamOutputMetaKey(cacheKey); + + await runInDurableObject(engineStub, async (_engine, state) => { + const meta = (await state.storage.get(metaKey)) as StreamOutputMeta; + expect(meta).toBeDefined(); + expect(meta.state).toBe(StreamOutputState.Complete); + expect(meta.totalBytes).toBe(payloadBytes.byteLength); + expect(meta.chunkCount).toBeGreaterThanOrEqual(1); + }); + }); + + it("should persist an empty readable stream", async ({ expect }) => { + const engineStub = await runWorkflow( + "STREAM-EMPTY", + async (_event, step) => { + const stream = await step.do("empty stream step", async () => { + return new ReadableStream({ + start(controller) { + controller.close(); + }, + }); + }); + + // Should be a readable stream that yields no data + const bytes = await readStreamBytes( + stream as ReadableStream + ); + return bytes.byteLength; + } + ); + + await vi.waitUntil( + async () => { + const logs = (await engineStub.readLogs()) as EngineLogs; + return logs.logs.some( + (val) => val.event === InstanceEvent.WORKFLOW_SUCCESS + ); + }, + { timeout: 5000 } + ); + + const logs = (await engineStub.readLogs()) as EngineLogs; + const successLog = logs.logs.find( + (val) => val.event === InstanceEvent.WORKFLOW_SUCCESS + ); + expect(successLog?.metadata.result).toBe(0); + + // Verify stream metadata + const hash = await computeHash("empty stream step"); + const cacheKey = `${hash}-1`; + const metaKey = getStreamOutputMetaKey(cacheKey); + + await runInDurableObject(engineStub, async (_engine, state) => { + const meta = (await state.storage.get(metaKey)) as StreamOutputMeta; + expect(meta).toBeDefined(); + expect(meta.state).toBe(StreamOutputState.Complete); + expect(meta.totalBytes).toBe(0); + expect(meta.chunkCount).toBe(0); + }); + }); + + // ── Tests that exercise stream error paths directly inside the DO ──── + // + // The tests below call writeStreamOutput() inside runInDurableObject() + // rather than running a full workflow through step.do(). + // + // Why: In the test harness the user callback runs in a separate + // WorkerEntrypoint (TestWorkflow) and communicates with the Engine + // Durable Object over RPC. Returning a problematic ReadableStream from + // that callback fails at the *RPC transfer* layer with native workerd + // errors that are different from the custom errors the engine raises: + // + // • Locked stream → TypeError: "The ReadableStream has been locked + // to a reader." + // • Non-byte chunks → TypeError: "This ReadableStream did not return + // bytes." + // • Very large chunks → Error: "Network connection lost." (the RPC + // pipe disconnects under load) + // + // In production the callback executes in the *same* isolate as the + // engine, so the stream reaches writeStreamOutput() directly and the + // engine's own validation (iterateStreamChunks, normalizeChunkToUint8Array, + // chunk-size check) surfaces the correct custom error types. + // + // By calling writeStreamOutput() inside the DO we replicate the + // production code-path and avoid the RPC transfer artefacts. + + it("should surface a locked readable stream as a fatal error", async ({ + expect, + }) => { + const engineId = env.ENGINE.idFromName("STREAM-LOCKED"); + const engineStub = env.ENGINE.get(engineId); + + await runInDurableObject(engineStub, async (_engine, state) => { + // Create the streaming table (normally done during engine init) + // eslint-disable-next-line workers-sdk/no-unsafe-command-execution -- DO SQL exec, not child_process + state.storage.sql.exec(` + CREATE TABLE IF NOT EXISTS ${STREAMING_STEP_CHUNKS_TABLE} ( + cache_key TEXT NOT NULL, + attempt INTEGER NOT NULL, + chunk_index INTEGER NOT NULL, + chunk BLOB NOT NULL, + PRIMARY KEY (cache_key, attempt, chunk_index) + ) WITHOUT ROWID + `); + + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(encodeUtf8("data")); + controller.close(); + }, + }); + // Lock the stream by acquiring a reader + stream.getReader(); + + try { + await writeStreamOutput({ + storage: state.storage, + cacheKey: "locked-stream-test", + attempt: 1, + stream, + }); + expect.unreachable("writeStreamOutput should have thrown"); + } catch (e) { + expect(e).toBeInstanceOf(InvalidStepReadableStreamError); + } + }); + }); + + it("should surface an unsupported chunk type as a fatal error", async ({ + expect, + }) => { + const engineId = env.ENGINE.idFromName("STREAM-UNSUPPORTED-CHUNK"); + const engineStub = env.ENGINE.get(engineId); + + await runInDurableObject(engineStub, async (_engine, state) => { + // eslint-disable-next-line workers-sdk/no-unsafe-command-execution -- DO SQL exec, not child_process + state.storage.sql.exec(` + CREATE TABLE IF NOT EXISTS ${STREAMING_STEP_CHUNKS_TABLE} ( + cache_key TEXT NOT NULL, + attempt INTEGER NOT NULL, + chunk_index INTEGER NOT NULL, + chunk BLOB NOT NULL, + PRIMARY KEY (cache_key, attempt, chunk_index) + ) WITHOUT ROWID + `); + + const stream = new ReadableStream({ + start(controller) { + // Enqueue a string -- not supported (only ArrayBuffer / TypedArray) + controller.enqueue("this is a string, not bytes"); + controller.close(); + }, + }); + + try { + await writeStreamOutput({ + storage: state.storage, + cacheKey: "unsupported-chunk-test", + attempt: 1, + stream, + }); + expect.unreachable("writeStreamOutput should have thrown"); + } catch (e) { + expect(e).toBeInstanceOf(UnsupportedStreamChunkError); + } + }); + }); + + it("should surface an oversized stream chunk as a fatal error", async ({ + expect, + }) => { + const engineId = env.ENGINE.idFromName("STREAM-OVERSIZED-CHUNK"); + const engineStub = env.ENGINE.get(engineId); + + await runInDurableObject(engineStub, async (_engine, state) => { + // eslint-disable-next-line workers-sdk/no-unsafe-command-execution -- DO SQL exec, not child_process + state.storage.sql.exec(` + CREATE TABLE IF NOT EXISTS ${STREAMING_STEP_CHUNKS_TABLE} ( + cache_key TEXT NOT NULL, + attempt INTEGER NOT NULL, + chunk_index INTEGER NOT NULL, + chunk BLOB NOT NULL, + PRIMARY KEY (cache_key, attempt, chunk_index) + ) WITHOUT ROWID + `); + + const stream = new ReadableStream({ + start(controller) { + // 17 MiB chunk -- exceeds the 16 MiB per-chunk limit + controller.enqueue(new Uint8Array(17 * 1024 * 1024)); + controller.close(); + }, + }); + + try { + await writeStreamOutput({ + storage: state.storage, + cacheKey: "oversized-chunk-test", + attempt: 1, + stream, + }); + expect.unreachable("writeStreamOutput should have thrown"); + } catch (e) { + expect(e).toBeInstanceOf(OversizedStreamChunkError); + } + }); + }); + + it("should clean up streaming_step_chunks on restart", async ({ expect }) => { + const instanceId = "STREAM-RESTART-CLEANUP"; + const engineId = env.ENGINE.idFromName(instanceId); + + const engineStub = await runWorkflowAndAwait( + instanceId, + async (_event, step) => { + const stream = await step.do("stream before restart", async () => { + return new ReadableStream({ + start(controller) { + controller.enqueue(encodeUtf8("data for restart test")); + controller.close(); + }, + }); + }); + const bytes = await readStreamBytes( + stream as ReadableStream + ); + return decodeUtf8(bytes); + } + ); + + // Verify chunks exist before restart + const hash = await computeHash("stream before restart"); + const cacheKey = `${hash}-1`; + + await runInDurableObject(engineStub, async (_engine, state) => { + const chunkCount = countStreamOutputChunks(state, cacheKey); + expect(chunkCount).toBeGreaterThanOrEqual(1); + }); + + // Trigger restart + try { + await runInDurableObject(engineStub, async (engine) => { + await engine.changeInstanceStatus("restart"); + }); + } catch (e) { + if (!isAbortError(e)) { + throw e; + } + } + + const restartedStub: DurableObjectStub = env.ENGINE.get(engineId); + await runInDurableObject(restartedStub, async (engine, state) => { + await engine.attemptRestart(); + + // Check immediately after attemptRestart returns, before the + // fire-and-forget init() call (engine.ts:892) has a chance to + // re-execute the workflow and recreate the chunks. + const chunkCount = countStreamOutputChunks(state, cacheKey); + expect(chunkCount).toBe(0); + }); + }); + + it("should preserve mock stream chunks across restart", async ({ + expect, + }) => { + const mockPayload = "mock stream survives restart"; + const mockPayloadBytes = encodeUtf8(mockPayload); + + const instanceId = "STREAM-MOCK-SURVIVES-RESTART"; + const engineId = env.ENGINE.idFromName(instanceId); + const engineStub = env.ENGINE.get(engineId); + + // Set up a mocked stream step via the modifier + await runInDurableObject(engineStub, async (engine) => { + const modifier = engine.getInstanceModifier(); + await modifier.mockStepResult( + { name: "mocked restart stream" }, + new ReadableStream({ + start(controller) { + controller.enqueue(mockPayloadBytes); + controller.close(); + }, + }) + ); + }); + + const hash = await computeHash("mocked restart stream"); + const baseCacheKey = `${hash}-1`; + + // Run the workflow once so the engine is initialised and can be restarted + const stub = await runWorkflow(instanceId, async (_event, step) => { + const stream = await step.do("mocked restart stream", async () => { + return new ReadableStream({ + start(controller) { + controller.enqueue(encodeUtf8("WRONG - real step ran")); + controller.close(); + }, + }); + }); + const bytes = await readStreamBytes(stream as ReadableStream); + return decodeUtf8(bytes); + }); + + await vi.waitUntil( + async () => { + const logs = (await stub.readLogs()) as EngineLogs; + return logs.logs.some( + (val) => val.event === InstanceEvent.WORKFLOW_SUCCESS + ); + }, + { timeout: 5000 } + ); + + // Trigger restart + try { + await runInDurableObject(stub, async (engine) => { + await engine.changeInstanceStatus("restart"); + }); + } catch (e) { + if (!isAbortError(e)) { + throw e; + } + } + + // After restart, mock stream chunks (attempt=0) must still exist + const restartedStub: DurableObjectStub = env.ENGINE.get(engineId); + await runInDurableObject(restartedStub, async (engine, state) => { + await engine.attemptRestart(); + + const chunkCount = countStreamOutputChunks(state, baseCacheKey); + expect(chunkCount).toBeGreaterThanOrEqual(1); + }); + + // Run the workflow again — the mock sentinel (KV) and its SQL chunks + // should both be present, allowing the mocked stream to replay correctly. + const stub2 = await runWorkflow(instanceId, async (_event, step) => { + const stream = await step.do("mocked restart stream", async () => { + return new ReadableStream({ + start(controller) { + controller.enqueue( + encodeUtf8("WRONG - real step ran after restart") + ); + controller.close(); + }, + }); + }); + const bytes = await readStreamBytes(stream as ReadableStream); + return decodeUtf8(bytes); + }); + + await vi.waitUntil( + async () => { + const logs = (await stub2.readLogs()) as EngineLogs; + return logs.logs.some( + (val) => val.event === InstanceEvent.WORKFLOW_SUCCESS + ); + }, + { timeout: 5000 } + ); + + const logs = (await stub2.readLogs()) as EngineLogs; + const successLog = logs.logs.find( + (val) => val.event === InstanceEvent.WORKFLOW_SUCCESS + ); + expect(successLog?.metadata.result).toBe(mockPayload); + }); + + it("should normalize TypedArray chunks to Uint8Array on replay", async ({ + expect, + }) => { + // Return Int16Array chunks -- they should be stored as raw bytes + // and replayed as Uint8Array + const int16Data = new Int16Array([1, 2, 3, 256, -1]); + const expectedBytes = new Uint8Array( + int16Data.buffer, + int16Data.byteOffset, + int16Data.byteLength + ); + + const engineStub = await runWorkflow( + "STREAM-TYPED-ARRAY", + async (_event, step) => { + const stream = await step.do("typed array step", async () => { + return new ReadableStream({ + start(controller) { + controller.enqueue(int16Data); + controller.close(); + }, + }); + }); + + const bytes = await readStreamBytes( + stream as ReadableStream + ); + return Array.from(bytes); + } + ); + + await vi.waitUntil( + async () => { + const logs = (await engineStub.readLogs()) as EngineLogs; + return logs.logs.some( + (val) => val.event === InstanceEvent.WORKFLOW_SUCCESS + ); + }, + { timeout: 5000 } + ); + + const logs = (await engineStub.readLogs()) as EngineLogs; + const successLog = logs.logs.find( + (val) => val.event === InstanceEvent.WORKFLOW_SUCCESS + ); + expect(successLog?.metadata.result).toEqual(Array.from(expectedBytes)); + }); + + it("should return a replay ReadableStream from waitForStepResult", async ({ + expect, + }) => { + const payload = "stream content for waitForStepResult"; + const payloadBytes = encodeUtf8(payload); + + const engineStub = await runWorkflow( + "STREAM-WAIT-FOR-STEP", + async (_event, step) => { + const stream = await step.do("stream step", async () => { + return new ReadableStream({ + start(controller) { + controller.enqueue(payloadBytes); + controller.close(); + }, + }); + }); + const bytes = await readStreamBytes( + stream as ReadableStream + ); + return decodeUtf8(bytes); + } + ); + + // Use engine's waitForStepResult to get the stream + const stepResult = await engineStub.waitForStepResult("stream step"); + + // Should be a ReadableStream + expect(stepResult).toBeInstanceOf(ReadableStream); + + const replayBytes = await readStreamBytes( + stepResult as ReadableStream + ); + expect(decodeUtf8(replayBytes)).toBe(payload); + + // Wait for workflow to finish + await vi.waitUntil( + async () => { + const logs = (await engineStub.readLogs()) as EngineLogs; + return logs.logs.some( + (val) => val.event === InstanceEvent.WORKFLOW_SUCCESS + ); + }, + { timeout: 5000 } + ); + }); + + it("should mock a step result with a ReadableStream via modifier", async ({ + expect, + }) => { + const mockPayload = "mocked stream content from modifier"; + const mockPayloadBytes = encodeUtf8(mockPayload); + + const instanceId = "STREAM-MOCK-STEP-RESULT"; + const engineId = env.ENGINE.idFromName(instanceId); + const engineStub = env.ENGINE.get(engineId); + + // Set up the mock stream result via the modifier before running the workflow + await runInDurableObject(engineStub, async (engine) => { + const modifier = engine.getInstanceModifier(); + await modifier.mockStepResult( + { name: "mocked stream step" }, + new ReadableStream({ + start(controller) { + controller.enqueue(mockPayloadBytes); + controller.close(); + }, + }) + ); + }); + + const hash = await computeHash("mocked stream step"); + const baseCacheKey = `${hash}-1`; + const valueKey = `${baseCacheKey}-value`; + + await runInDurableObject(engineStub, async (_engine, state) => { + // normal non-mocked key should not exist + expect( + await state.storage.get(getStreamOutputMetaKey(baseCacheKey)) + ).toBeUndefined(); + + const replaceResult = await state.storage.get<{ + __mockStreamOutput: true; + cacheKey: string; + meta: StreamOutputMeta; + }>(`${MODIFIER_KEYS.REPLACE_RESULT}${valueKey}`); + + expect(replaceResult?.__mockStreamOutput).toBe(true); + expect(replaceResult?.cacheKey).toBe(baseCacheKey); + expect(replaceResult?.meta.state).toBe(StreamOutputState.Complete); + expect(replaceResult?.meta.attempt).toBe(0); + expect( + countStreamOutputChunks(state, baseCacheKey) + ).toBeGreaterThanOrEqual(1); + }); + + // Run a workflow that uses the mocked step + const stub = await runWorkflow(instanceId, async (_event, step) => { + const stream = await step.do("mocked stream step", async () => { + // This should NOT be called -- the mock replaces it + return new ReadableStream({ + start(controller) { + controller.enqueue(encodeUtf8("WRONG - real step ran")); + controller.close(); + }, + }); + }); + const bytes = await readStreamBytes(stream as ReadableStream); + return decodeUtf8(bytes); + }); + + await vi.waitUntil( + async () => { + const logs = (await stub.readLogs()) as EngineLogs; + return logs.logs.some( + (val) => val.event === InstanceEvent.WORKFLOW_SUCCESS + ); + }, + { timeout: 5000 } + ); + + const logs = (await stub.readLogs()) as EngineLogs; + const successLog = logs.logs.find( + (val) => val.event === InstanceEvent.WORKFLOW_SUCCESS + ); + // The workflow should have received the mocked stream content + expect(successLog?.metadata.result).toBe(mockPayload); + }); + + it("should resolve stream output to a text preview in readLogs", async ({ + expect, + }) => { + const payload = "hello from stream preview test"; + const payloadBytes = encodeUtf8(payload); + + const engineStub = await runWorkflow( + "STREAM-PREVIEW-TEXT", + async (_event, step) => { + const stream = await step.do("preview step", async () => { + return new ReadableStream({ + start(controller) { + controller.enqueue(payloadBytes); + controller.close(); + }, + }); + }); + const bytes = await readStreamBytes( + stream as ReadableStream + ); + return decodeUtf8(bytes); + } + ); + + await vi.waitUntil( + async () => { + const logs = (await engineStub.readLogs()) as EngineLogs; + return logs.logs.some( + (val) => val.event === InstanceEvent.WORKFLOW_SUCCESS + ); + }, + { timeout: 5000 } + ); + + const logs = (await engineStub.readLogs()) as EngineLogs; + const stepLog = logs.logs.find( + (val) => val.event === InstanceEvent.STEP_SUCCESS + ); + // readLogs() should resolve the stream output metadata to the preview text + expect(stepLog?.metadata.result).toBe(payload); + }); + + it("should truncate a long stream preview in readLogs", async ({ + expect, + }) => { + // Generate a payload longer than the 1024-char preview limit + const payload = "A".repeat(2048); + const payloadBytes = encodeUtf8(payload); + + const engineStub = await runWorkflow( + "STREAM-PREVIEW-TRUNCATED", + async (_event, step) => { + const stream = await step.do("long preview step", async () => { + return new ReadableStream({ + start(controller) { + controller.enqueue(payloadBytes); + controller.close(); + }, + }); + }); + const bytes = await readStreamBytes( + stream as ReadableStream + ); + return decodeUtf8(bytes); + } + ); + + await vi.waitUntil( + async () => { + const logs = (await engineStub.readLogs()) as EngineLogs; + return logs.logs.some( + (val) => val.event === InstanceEvent.WORKFLOW_SUCCESS + ); + }, + { timeout: 5000 } + ); + + const logs = (await engineStub.readLogs()) as EngineLogs; + const stepLog = logs.logs.find( + (val) => val.event === InstanceEvent.STEP_SUCCESS + ); + // readLogs() should truncate the preview to 1024 chars + expect(stepLog?.metadata.result).toBe( + "A".repeat(1024) + "[truncated output]" + ); + }); + + it("should resolve non-UTF-8 stream output to a binary summary in readLogs", async ({ + expect, + }) => { + // Write raw bytes that are not valid UTF-8 + const invalidUtf8 = new Uint8Array([0xff, 0xfe, 0x80, 0x81]); + + const engineStub = await runWorkflow( + "STREAM-PREVIEW-BINARY", + async (_event, step) => { + const stream = await step.do("binary step", async () => { + return new ReadableStream({ + start(controller) { + controller.enqueue(invalidUtf8); + controller.close(); + }, + }); + }); + const bytes = await readStreamBytes( + stream as ReadableStream + ); + return bytes.byteLength; + } + ); + + await vi.waitUntil( + async () => { + const logs = (await engineStub.readLogs()) as EngineLogs; + return logs.logs.some( + (val) => val.event === InstanceEvent.WORKFLOW_SUCCESS + ); + }, + { timeout: 5000 } + ); + + const logs = (await engineStub.readLogs()) as EngineLogs; + const stepLog = logs.logs.find( + (val) => val.event === InstanceEvent.STEP_SUCCESS + ); + // readLogs() should fall back to a binary size summary + expect(stepLog?.metadata.result).toBe( + `[ReadableStream (binary): ${invalidUtf8.byteLength} bytes]` + ); + }); + + it("should time out during stream write and roll back", async ({ + expect, + }) => { + // In the full workflow path a stream-write timeout triggers retries + // (default limit: 5, exponential backoff from 1 s), so the test would + // exceed its timeout before WORKFLOW_FAILURE is ever logged. Testing + // writeStreamOutput() directly lets us verify timeout + rollback + // behaviour without retry overhead. + + const engineId = env.ENGINE.idFromName("STREAM-TIMEOUT"); + const engineStub = env.ENGINE.get(engineId); + + const cacheKey = "timeout-stream-test"; + + await runInDurableObject(engineStub, async (_engine, state) => { + // eslint-disable-next-line workers-sdk/no-unsafe-command-execution -- DO SQL exec, not child_process + state.storage.sql.exec(` + CREATE TABLE IF NOT EXISTS ${STREAMING_STEP_CHUNKS_TABLE} ( + cache_key TEXT NOT NULL, + attempt INTEGER NOT NULL, + chunk_index INTEGER NOT NULL, + chunk BLOB NOT NULL, + PRIMARY KEY (cache_key, attempt, chunk_index) + ) WITHOUT ROWID + `); + + const abortController = new AbortController(); + + // Create a timeout promise that rejects after 1 second, mirroring + // the timeoutPromise() used by Context.do() in production. + const timeoutTask = new Promise((_, reject) => { + setTimeout(() => { + const error = new WorkflowTimeoutError( + "Execution timed out after 1000ms" + ); + abortController.abort(error); + reject(error); + }, 1000); + }); + + // Stream that never closes — each pull emits a small chunk then + // waits longer than the timeout. + const stream = new ReadableStream({ + async pull(controller) { + controller.enqueue(encodeUtf8("chunk ")); + await new Promise((resolve) => setTimeout(resolve, 2000)); + }, + }); + + try { + await writeStreamOutput({ + storage: state.storage, + cacheKey, + attempt: 1, + stream, + signal: abortController.signal, + timeoutTask, + }); + expect.unreachable("writeStreamOutput should have thrown"); + } catch (e) { + expect(e).toBeInstanceOf(WorkflowTimeoutError); + } + + // Mimic the rollback that context.ts performs in the outer catch + // (line ~704) when a stream write times out. + await rollbackStreamOutput(state.storage, cacheKey, 1); + + // Verify that all chunks and metadata were cleaned up + const chunkCount = countStreamOutputChunks(state, cacheKey); + expect(chunkCount).toBe(0); + + const metaKey = getStreamOutputMetaKey(cacheKey); + const meta = await state.storage.get(metaKey); + expect(meta).toBeUndefined(); + }); + }); +});