Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/all-fans-march.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@cloudflare/workflows-shared": minor
---

Add support for ReadableStream on workflow steps. This allows users to overcome the 1MB limit per step output.

`ReadableStream<Uint8Array>` is already serializable on the Workers platform. This feature makes it native to Workflows as well by persisting each chunk and replaying it when needed.
259 changes: 242 additions & 17 deletions packages/workflows-shared/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,25 @@ import { INSTANCE_METADATA, InstanceEvent, InstanceStatus } from "./instance";
import { computeHash } from "./lib/cache";
import {
ABORT_REASONS,
InvalidStepReadableStreamError,
OversizedStreamChunkError,
StreamOutputStorageLimitError,
UnsupportedStreamChunkError,
WorkflowFatalError,
WorkflowInternalError,
WorkflowTimeoutError,
} from "./lib/errors";
import { calcRetryDuration } from "./lib/retries";
import {
cleanupPendingStreamOutput,
createReplayReadableStream,
getInvalidStoredStreamOutputError,
getStreamOutputMetaKey,
isReadableStreamLike,
rollbackStreamOutput,
StreamOutputState,
writeStreamOutput,
} from "./lib/streams";
import {
isValidStepConfig,
isValidStepName,
Expand All @@ -17,6 +31,7 @@ import {
import { MODIFIER_KEYS } from "./modifier";
import type { Engine } from "./engine";
import type { InstanceMetadata } from "./instance";
import type { StreamOutputMeta } from "./lib/streams";
import type {
WorkflowSleepDuration,
WorkflowStepConfig,
Expand Down Expand Up @@ -171,6 +186,7 @@ export class Context extends RpcTarget {
const cacheKey = `${hash}-${count}`;

const valueKey = `${cacheKey}-value`;
const streamMetaKey = getStreamOutputMetaKey(cacheKey);
const configKey = `${cacheKey}-config`;
const errorKey = `${cacheKey}-error`;
const stepNameWithCounter = `${name}-${count}`;
Expand All @@ -179,15 +195,43 @@ export class Context extends RpcTarget {

const maybeMap = await this.#state.storage.get([
valueKey,
streamMetaKey,
configKey,
errorKey,
]);

// Check cache
// Check cache -- streams first, then plain values
const maybeStreamMeta = maybeMap.get(streamMetaKey) as
| StreamOutputMeta
| undefined
| null;
if (maybeStreamMeta?.state === StreamOutputState.Complete) {
const maybeOutputError = getInvalidStoredStreamOutputError(
this.#state.storage,
cacheKey,
maybeStreamMeta
);
if (maybeOutputError !== undefined) {
throw new WorkflowInternalError(
`Stored output for ${stepNameWithCounter} is corrupt or incomplete.`
);
}

return createReplayReadableStream({
storage: this.#state.storage,
cacheKey,
meta: maybeStreamMeta,
}) as T;
} else if (maybeStreamMeta !== undefined && maybeStreamMeta !== null) {
// We're not in a complete state - this means we crashed while persisting a stream on a previous invocation, so we need to clean up
await cleanupPendingStreamOutput(this.#state.storage, cacheKey).catch(
() => {}
);
}

const maybeResult = maybeMap.get(valueKey);

if (maybeResult) {
// console.log(`Cache hit for ${cacheKey}`);
return (maybeResult as { value: T }).value;
}

Expand Down Expand Up @@ -264,6 +308,12 @@ export class Context extends RpcTarget {
)) as StepState) ?? {
attemptedCount: 0,
};

// NOTE(caio): this might be a stream-returning step - if so, clean up stale data from previous lifetimes
await cleanupPendingStreamOutput(this.#state.storage, cacheKey).catch(
() => {}
);

await this.#engine.timeoutHandler.acquire(this.#engine);

if (stepState.attemptedCount == 0) {
Expand Down Expand Up @@ -314,8 +364,16 @@ export class Context extends RpcTarget {
}
const { accountId, instance } = instanceMetadata;

let streamResultSeen = false;
let lastStreamMeta: StreamOutputMeta | undefined;
const abortController = new AbortController();
const stepExecutionSignal = AbortSignal.any([
abortController.signal,
this.#engine.engineAbortController.signal,
]);

try {
const timeoutPromise = async () => {
const timeoutPromise = async (): Promise<never> => {
const priorityQueueHash = `${cacheKey}-${stepState.attemptedCount}`;
let timeout = ms(config.timeout);
if (forceStepTimeout) {
Expand All @@ -335,9 +393,11 @@ export class Context extends RpcTarget {
hash: priorityQueueHash,
type: "timeout",
});
throw new WorkflowTimeoutError(
const error = new WorkflowTimeoutError(
`Execution timed out after ${timeout}ms`
);
abortController.abort(error);
throw error;
};

this.#engine.writeLog(
Expand Down Expand Up @@ -382,31 +442,163 @@ export class Context extends RpcTarget {
);
const forceStepTimeout = persistentStepTimeout || transientStepTimeout;

let timeoutTask: Promise<never> | undefined;

const persistStepResult = async (
value: unknown,
activeTimeoutTask?: Promise<never>
): Promise<unknown> => {
if (!isReadableStreamLike(value)) {
await this.#state.storage.put(valueKey, { value });
abortController.abort("step finished");
// @ts-expect-error priorityQueue is initiated in init
this.#engine.priorityQueue.remove({
hash: priorityQueueHash,
type: "timeout",
});
return value;
}

streamResultSeen = true;
const streamMeta = await writeStreamOutput({
storage: this.#state.storage,
cacheKey,
attempt: stepState.attemptedCount,
stream: value as ReadableStream<unknown>,
signal: stepExecutionSignal,
timeoutTask: activeTimeoutTask,
});
lastStreamMeta = streamMeta;

abortController.abort("step finished");
// @ts-expect-error priorityQueue is initiated in init
this.#engine.priorityQueue.remove({
hash: priorityQueueHash,
type: "timeout",
});
return createReplayReadableStream({
storage: this.#state.storage,
cacheKey,
meta: streamMeta,
});
};

if (forceStepTimeout) {
result = await timeoutPromise();
} else if (replaceResult) {
result = replaceResult;
// if there is a timeout to be forced we don't want to race with the closure
// Check if the mocked result is a stream sentinel (from mockStepResult with ReadableStream)
if (
replaceResult &&
typeof replaceResult === "object" &&
(replaceResult as Record<string, unknown>).__mockStreamOutput
) {
const sentinel = replaceResult as {
__mockStreamOutput: true;
cacheKey: string;
meta: StreamOutputMeta;
};
result = createReplayReadableStream({
storage: this.#state.storage,
cacheKey: sentinel.cacheKey,
meta: sentinel.meta,
});
} else {
result = replaceResult;
}
} else {
timeoutTask = timeoutPromise();
result = await Promise.race([
doWrapperClosure({ attempt: stepState.attemptedCount }),
timeoutPromise(),
timeoutTask,
]);
}

// if we reach here, it means that the closure ran successfully and we can remove the timeout from the PQ
// @ts-expect-error priorityQueue is initiated in init
await this.#engine.priorityQueue.remove({
hash: priorityQueueHash,
type: "timeout",
});

// We store the value of `output` in an object with a `value` property. This allows us to store `undefined`,
// in the case that it's returned from the user's code. This is because DO storage will error if you try to
// store undefined directly.
try {
await this.#state.storage.put(valueKey, { value: result });
result = await persistStepResult(result, timeoutTask);
} catch (e) {
abortController.abort("step errored");
// @ts-expect-error priorityQueue is initiated in init
this.#engine.priorityQueue.remove({
hash: priorityQueueHash,
type: "timeout",
});

if (e instanceof WorkflowTimeoutError) {
throw e;
}

// Stream-specific fatal errors
if (
e instanceof InvalidStepReadableStreamError ||
e instanceof OversizedStreamChunkError ||
e instanceof UnsupportedStreamChunkError
) {
this.#engine.writeLog(
InstanceEvent.ATTEMPT_FAILURE,
cacheKey,
stepNameWithCounter,
{
attempt: stepState.attemptedCount,
error: new WorkflowFatalError(e.message),
}
);
this.#engine.writeLog(
InstanceEvent.STEP_FAILURE,
cacheKey,
stepNameWithCounter,
{}
);
this.#engine.writeLog(InstanceEvent.WORKFLOW_FAILURE, null, null, {
error: new WorkflowFatalError(
`The execution of the Workflow instance was terminated, as the step "${name}" returned an invalid ReadableStream output. ${e.message}`
),
});

await this.#engine.setStatus(
accountId,
instance.id,
InstanceStatus.Errored
);
await this.#engine.timeoutHandler.release(this.#engine);
await this.#engine.abort(ABORT_REASONS.NOT_SERIALISABLE);
return;
}

if (e instanceof StreamOutputStorageLimitError) {
this.#engine.writeLog(
InstanceEvent.ATTEMPT_FAILURE,
cacheKey,
stepNameWithCounter,
{
attempt: stepState.attemptedCount,
error: new WorkflowFatalError(e.message),
}
);
this.#engine.writeLog(
InstanceEvent.STEP_FAILURE,
cacheKey,
stepNameWithCounter,
{}
);
this.#engine.writeLog(InstanceEvent.WORKFLOW_FAILURE, null, null, {
error: new WorkflowFatalError(
"The instance has exceeded the 1GiB storage limit"
),
});

await this.#engine.setStatus(
accountId,
instance.id,
InstanceStatus.Errored
);
await this.#engine.timeoutHandler.release(this.#engine);
await this.#engine.abort(ABORT_REASONS.STORAGE_LIMIT_EXCEEDED);
return;
}

// something that cannot be written to storage
if (e instanceof Error && e.name === "DataCloneError") {
this.#engine.writeLog(
Expand Down Expand Up @@ -455,6 +647,13 @@ export class Context extends RpcTarget {
return;
}

// if we reach here, means that the closure ran successfully and we can remove the timeout from the PQ
// @ts-expect-error priorityQueue is initiated in init
this.#engine.priorityQueue.remove({
hash: priorityQueueHash,
type: "timeout",
});

this.#engine.writeLog(
InstanceEvent.ATTEMPT_SUCCESS,
cacheKey,
Expand All @@ -465,13 +664,26 @@ export class Context extends RpcTarget {
);
} catch (e) {
const error = e as Error;
// if we reach here, means that the clouse ran but errored out and we can remove the timeout from the PQ
// if we reach here, means that the closure ran but errored out and we can remove the timeout from the PQ
// @ts-expect-error priorityQueue is initiated in init
this.#engine.priorityQueue.remove({
hash: `${cacheKey}-${stepState.attemptedCount}`,
type: "timeout",
});

// Clean up any partial stream output from this failed attempt
if (streamResultSeen) {
try {
await rollbackStreamOutput(
this.#state.storage,
cacheKey,
stepState.attemptedCount
);
} catch {
// Best-effort cleanup
}
}

if (
e instanceof Error &&
(error.name === "NonRetryableError" ||
Expand Down Expand Up @@ -574,6 +786,16 @@ export class Context extends RpcTarget {
return doWrapper(doWrapperClosure);
} else {
await this.#engine.timeoutHandler.release(this.#engine);
// Clean up any leftover stream chunks on retry exhaustion
try {
await rollbackStreamOutput(
this.#state.storage,
cacheKey,
stepState.attemptedCount
);
} catch {
// Best-effort cleanup
}
this.#engine.writeLog(
InstanceEvent.STEP_FAILURE,
cacheKey,
Expand All @@ -592,7 +814,10 @@ export class Context extends RpcTarget {
stepNameWithCounter,
{
// TODO (WOR-86): Add limits, figure out serialization
result,
result: lastStreamMeta ? undefined : result,
...(lastStreamMeta && {
streamOutput: { cacheKey, meta: lastStreamMeta },
}),
}
);
await this.#engine.timeoutHandler.release(this.#engine);
Expand Down
Loading
Loading