From 7b2edf6316c136a7956a90856e58d84ce7f5f227 Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Thu, 5 Feb 2026 18:29:20 -0800 Subject: [PATCH] Make serialization functions async with Encryptor interface --- .changeset/async-serialization.md | 11 + packages/cli/src/lib/inspect/output.ts | 30 +- packages/core/src/observability.test.ts | 14 +- packages/core/src/observability.ts | 73 +- packages/core/src/private.ts | 5 + packages/core/src/runtime.ts | 22 +- packages/core/src/runtime/resume-hook.ts | 74 +- packages/core/src/runtime/run.ts | 8 +- packages/core/src/runtime/runs.ts | 6 +- packages/core/src/runtime/start.ts | 5 +- packages/core/src/runtime/step-handler.ts | 14 +- .../core/src/runtime/suspension-handler.ts | 44 +- packages/core/src/serialization.test.ts | 448 ++++++++---- packages/core/src/serialization.ts | 47 +- packages/core/src/step.test.ts | 8 +- packages/core/src/step.ts | 20 +- packages/core/src/workflow.test.ts | 641 ++++++++++++------ packages/core/src/workflow.ts | 19 +- packages/core/src/workflow/hook.test.ts | 30 +- packages/core/src/workflow/hook.ts | 34 +- .../web/src/server/workflow-server-actions.ts | 28 +- packages/world-testing/src/addition.mts | 2 +- packages/world-testing/src/errors.mts | 2 +- packages/world-testing/src/hooks.mts | 2 +- packages/world-testing/src/idempotency.mts | 2 +- packages/world-testing/src/null-byte.mts | 2 +- packages/world/src/interfaces.ts | 55 +- .../pages/api/trigger-pages.ts | 25 +- .../nextjs-webpack/pages/api/trigger-pages.ts | 25 +- 29 files changed, 1138 insertions(+), 558 deletions(-) create mode 100644 .changeset/async-serialization.md diff --git a/.changeset/async-serialization.md b/.changeset/async-serialization.md new file mode 100644 index 0000000000..13f4f4b0ae --- /dev/null +++ b/.changeset/async-serialization.md @@ -0,0 +1,11 @@ +--- +"@workflow/core": patch +"@workflow/world": patch +"@workflow/cli": patch +"@workflow/web": patch +"@workflow/world-testing": patch +--- + +Make serialization functions async with Encryptor interface + +All 8 dehydrate/hydrate functions in the serialization layer are now async and accept an `Encryptor` parameter for future encryption support. Adds `Encryptor`, `EncryptionContext`, and `KeyMaterial` interfaces to `@workflow/world`. This is a no-op refactor — the encryptor parameter is unused in this change. diff --git a/packages/cli/src/lib/inspect/output.ts b/packages/cli/src/lib/inspect/output.ts index 947cc48704..176a3d1480 100644 --- a/packages/cli/src/lib/inspect/output.ts +++ b/packages/cli/src/lib/inspect/output.ts @@ -534,7 +534,9 @@ export const listRuns = async (world: World, opts: InspectCLIOptions = {}) => { }, resolveData, }); - const runsWithHydratedIO = runs.data.map(hydrateResourceIO); + const runsWithHydratedIO = await Promise.all( + runs.data.map(async (run) => hydrateResourceIO(run, world)) + ); showJson({ ...runs, data: runsWithHydratedIO }); return; } catch (error) { @@ -572,7 +574,9 @@ export const listRuns = async (world: World, opts: InspectCLIOptions = {}) => { } }, displayPage: async (runs) => { - const runsWithHydratedIO = runs.map(hydrateResourceIO); + const runsWithHydratedIO = await Promise.all( + runs.map(async (run) => hydrateResourceIO(run, world)) + ); logger.log(showTable(runsWithHydratedIO, props, opts)); }, }); @@ -588,7 +592,9 @@ export const getRecentRun = async ( pagination: { limit: 1, sortOrder: opts.sort || 'desc' }, resolveData: 'none', // Don't need data for just getting the ID }); - runs.data = runs.data.map(hydrateResourceIO); + runs.data = await Promise.all( + runs.data.map(async (run) => hydrateResourceIO(run, world)) + ); return runs.data[0]; } catch (error) { if (handleApiError(error, opts.backend)) { @@ -608,7 +614,7 @@ export const showRun = async ( } try { const run = await world.runs.get(runId, { resolveData: 'all' }); - const runWithHydratedIO = hydrateResourceIO(run); + const runWithHydratedIO = await hydrateResourceIO(run, world); if (opts.json) { showJson(runWithHydratedIO); return; @@ -711,7 +717,9 @@ export const listSteps = async ( } }, displayPage: async (steps) => { - const stepsWithHydratedIO = steps.map(hydrateResourceIO); + const stepsWithHydratedIO = await Promise.all( + steps.map(async (step) => hydrateResourceIO(step, world)) + ); logger.log(showTable(stepsWithHydratedIO, props, opts)); showInspectInfoBox('step'); }, @@ -735,7 +743,7 @@ export const showStep = async ( const step = await world.steps.get(opts.runId, stepId, { resolveData: 'all', }); - const stepWithHydratedIO = hydrateResourceIO(step); + const stepWithHydratedIO = await hydrateResourceIO(step, world); if (opts.json) { showJson(stepWithHydratedIO); return; @@ -950,7 +958,9 @@ export const listHooks = async (world: World, opts: InspectCLIOptions = {}) => { }, resolveData, }); - const hydratedHooks = hooks.data.map(hydrateResourceIO); + const hydratedHooks = await Promise.all( + hooks.data.map(async (hook) => hydrateResourceIO(hook, world)) + ); showJson({ ...hooks, data: hydratedHooks }); return; } catch (error) { @@ -994,7 +1004,9 @@ export const listHooks = async (world: World, opts: InspectCLIOptions = {}) => { } }, displayPage: async (hooks) => { - const hydratedHooks = hooks.map(hydrateResourceIO); + const hydratedHooks = await Promise.all( + hooks.map(async (hook) => hydrateResourceIO(hook, world)) + ); logger.log(showTable(hydratedHooks, HOOK_LISTED_PROPS, opts)); showInspectInfoBox('hook'); }, @@ -1013,7 +1025,7 @@ export const showHook = async ( const hook = await world.hooks.get(hookId, { resolveData: 'all', }); - const hydratedHook = hydrateResourceIO(hook); + const hydratedHook = await hydrateResourceIO(hook, world); if (opts.json) { showJson(hydratedHook); return; diff --git a/packages/core/src/observability.test.ts b/packages/core/src/observability.test.ts index a2d4cdc513..8890486d28 100644 --- a/packages/core/src/observability.test.ts +++ b/packages/core/src/observability.test.ts @@ -1,5 +1,6 @@ import { inspect } from 'node:util'; import { WORKFLOW_DESERIALIZE, WORKFLOW_SERIALIZE } from '@workflow/serde'; +import type { Encryptor } from '@workflow/world'; import { describe, expect, it } from 'vitest'; import { registerSerializationClass } from './class-serialization.js'; import { @@ -15,6 +16,8 @@ import { } from './observability.js'; import { dehydrateStepReturnValue } from './serialization.js'; +const mockEncryptor: Encryptor = {}; + describe('ClassInstanceRef', () => { describe('constructor and properties', () => { it('should create instance with correct properties', () => { @@ -332,10 +335,15 @@ describe('hydrateResourceIO with custom class instances', () => { (TestPoint as any).classId = 'test//TestPoint'; registerSerializationClass('test//TestPoint', TestPoint); - it('should convert Instance type to ClassInstanceRef in step output', () => { + it('should convert Instance type to ClassInstanceRef in step output', async () => { // Simulate serialized step data with a custom class instance const point = new TestPoint(3, 4); - const serialized = dehydrateStepReturnValue(point, [], 'wrun_test'); + const serialized = await dehydrateStepReturnValue( + point, + 'wrun_test', + {}, + [] + ); // Create a step resource with serialized output const step = { @@ -346,7 +354,7 @@ describe('hydrateResourceIO with custom class instances', () => { // Hydrate the step - this should convert Instance to ClassInstanceRef // because the class is not registered in the o11y context (streamPrintRevivers) - const hydrated = hydrateResourceIO(step); + const hydrated = await hydrateResourceIO(step, mockEncryptor); // The output should be a ClassInstanceRef expect(isClassInstanceRef(hydrated.output)).toBe(true); diff --git a/packages/core/src/observability.ts b/packages/core/src/observability.ts index 2c26247026..ceafbfba0a 100644 --- a/packages/core/src/observability.ts +++ b/packages/core/src/observability.ts @@ -5,6 +5,7 @@ import { inspect } from 'node:util'; import { parseClassName } from '@workflow/utils/parse-name'; +import type { Encryptor } from '@workflow/world'; import { unflatten } from 'devalue'; import { runtimeLogger } from './logger.js'; import { @@ -254,20 +255,22 @@ const hydrateLegacyData = (data: any[]): unknown => { return unflatten(data, getObservabilityRevivers()); }; -const hydrateStepIO = < +const hydrateStepIO = async < T extends { stepId?: string; input?: any; output?: any; runId?: string }, >( - step: T -): T => { + step: T, + encryptor: Encryptor +): Promise => { let hydratedInput = step.input; let hydratedOutput = step.output; // Hydrate input - handle both binary (specVersion 2) and legacy (specVersion 1) formats if (isBinaryFormat(step.input) && step.input.byteLength > 0) { - hydratedInput = hydrateStepArguments( + hydratedInput = await hydrateStepArguments( step.input, - [], step.runId as string, + encryptor, + [], globalThis, streamPrintRevivers ); @@ -277,8 +280,10 @@ const hydrateStepIO = < // Hydrate output - handle both binary (specVersion 2) and legacy (specVersion 1) formats if (isBinaryFormat(step.output)) { - hydratedOutput = hydrateStepReturnValue( + hydratedOutput = await hydrateStepReturnValue( step.output, + step.runId as string, + encryptor, globalThis, streamPrintRevivers ); @@ -293,18 +298,21 @@ const hydrateStepIO = < }; }; -const hydrateWorkflowIO = < +const hydrateWorkflowIO = async < T extends { runId?: string; input?: any; output?: any }, >( - workflow: T -): T => { + workflow: T, + encryptor: Encryptor +): Promise => { let hydratedInput = workflow.input; let hydratedOutput = workflow.output; // Hydrate input - handle both binary (specVersion 2) and legacy (specVersion 1) formats if (isBinaryFormat(workflow.input) && workflow.input.byteLength > 0) { - hydratedInput = hydrateWorkflowArguments( + hydratedInput = await hydrateWorkflowArguments( workflow.input, + workflow.runId as string, + encryptor, globalThis, streamPrintRevivers ); @@ -314,10 +322,11 @@ const hydrateWorkflowIO = < // Hydrate output - handle both binary (specVersion 2) and legacy (specVersion 1) formats if (isBinaryFormat(workflow.output)) { - hydratedOutput = hydrateWorkflowReturnValue( + hydratedOutput = await hydrateWorkflowReturnValue( workflow.output, - [], workflow.runId as string, + encryptor, + [], globalThis, streamPrintRevivers ); @@ -332,11 +341,12 @@ const hydrateWorkflowIO = < }; }; -const hydrateEventData = < +const hydrateEventData = async < T extends { eventId?: string; eventData?: any; runId?: string }, >( - event: T -): T => { + event: T, + encryptor: Encryptor +): Promise => { if (!event.eventData) { return event; } @@ -348,8 +358,10 @@ const hydrateEventData = < if ('result' in eventData && typeof eventData.result === 'object') { // Handle both binary (specVersion 2) and legacy (specVersion 1) formats if (isBinaryFormat(eventData.result)) { - eventData.result = hydrateStepReturnValue( + eventData.result = await hydrateStepReturnValue( eventData.result, + event.runId as string, + encryptor, globalThis, streamPrintRevivers ); @@ -369,18 +381,22 @@ const hydrateEventData = < }; }; -const hydrateHookMetadata = ( - hook: T -): T => { +const hydrateHookMetadata = async < + T extends { hookId?: string; metadata?: any }, +>( + hook: T, + encryptor: Encryptor +): Promise => { let hydratedMetadata = hook.metadata; if (hook.metadata && 'runId' in hook) { // Handle both binary (specVersion 2) and legacy (specVersion 1) formats if (isBinaryFormat(hook.metadata)) { - hydratedMetadata = hydrateStepArguments( + hydratedMetadata = await hydrateStepArguments( hook.metadata, - [], hook.runId as string, + encryptor, + [], globalThis, streamPrintRevivers ); @@ -395,7 +411,7 @@ const hydrateHookMetadata = ( }; }; -export const hydrateResourceIO = < +export const hydrateResourceIO = async < T extends { stepId?: string; hookId?: string; @@ -407,20 +423,21 @@ export const hydrateResourceIO = < executionContext?: any; }, >( - resource: T -): T => { + resource: T, + encryptor: Encryptor +): Promise => { if (!resource) { return resource; } let hydrated: T; if ('stepId' in resource) { - hydrated = hydrateStepIO(resource); + hydrated = await hydrateStepIO(resource, encryptor); } else if ('hookId' in resource) { - hydrated = hydrateHookMetadata(resource); + hydrated = await hydrateHookMetadata(resource, encryptor); } else if ('eventId' in resource) { - hydrated = hydrateEventData(resource); + hydrated = await hydrateEventData(resource, encryptor); } else { - hydrated = hydrateWorkflowIO(resource); + hydrated = await hydrateWorkflowIO(resource, encryptor); } if ('executionContext' in hydrated) { const { executionContext, ...rest } = hydrated; diff --git a/packages/core/src/private.ts b/packages/core/src/private.ts index ee9738aa5b..0f19a6a89a 100644 --- a/packages/core/src/private.ts +++ b/packages/core/src/private.ts @@ -2,6 +2,7 @@ * Utils used by the bundler when transforming code */ +import type { Encryptor } from '@workflow/world'; import type { EventsConsumer } from './events-consumer.js'; import type { QueueItem } from './global.js'; import type { Serializable } from './schemas.js'; @@ -49,4 +50,8 @@ export interface WorkflowOrchestratorContext { onWorkflowError: (error: Error) => void; generateUlid: () => string; generateNanoid: () => string; + /** The workflow run ID */ + runId: string; + /** Encryptor for serialization (optional encryption support) */ + encryptor: Encryptor; } diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 99e63d0951..d979c0f0f7 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -50,11 +50,11 @@ export { export { cancelRun, listStreams, + type ReadStreamOptions, + type RecreateRunOptions, readStream, recreateRunFromExisting, reenqueueRun, - type ReadStreamOptions, - type RecreateRunOptions, type StopSleepOptions, type StopSleepResult, wakeUpRun, @@ -219,19 +219,11 @@ export function workflowEntrypoint( events.push(result.event!); } - const result = await trace( - 'workflow.replay', - {}, - async (replaySpan) => { - replaySpan?.setAttributes({ - ...Attribute.WorkflowEventsCount(events.length), - }); - return await runWorkflow( - workflowCode, - workflowRun, - events - ); - } + const result = await runWorkflow( + workflowCode, + workflowRun, + events, + world ); // Complete the workflow run via event (event-sourced architecture) diff --git a/packages/core/src/runtime/resume-hook.ts b/packages/core/src/runtime/resume-hook.ts index 0a28de38a3..4e23e06c3a 100644 --- a/packages/core/src/runtime/resume-hook.ts +++ b/packages/core/src/runtime/resume-hook.ts @@ -1,6 +1,7 @@ import { waitUntil } from '@vercel/functions'; import { ERROR_SLUGS, WorkflowRuntimeError } from '@workflow/errors'; import { + type Encryptor, type Hook, isLegacySpecVersion, SPEC_VERSION_CURRENT, @@ -17,6 +18,21 @@ import { waitedUntil } from '../util.js'; import { getWorkflowQueueName } from './helpers.js'; import { getWorld } from './world.js'; +/** + * Resolve the appropriate Encryptor for a given workflow run. + * + * Uses `world.getEncryptorForRun()` when available (needed for cross-deployment + * scenarios like `resumeHook()` called from a different deployment). Falls back + * to the world itself (which works for same-deployment operations). + */ +async function resolveEncryptorForRun(runId: string): Promise { + const world = getWorld(); + if (world.getEncryptorForRun) { + return world.getEncryptorForRun(runId); + } + return world; +} + /** * Get the hook by token to find the associated workflow run, * and hydrate the `metadata` property if it was set from within @@ -25,12 +41,33 @@ import { getWorld } from './world.js'; * @param token - The unique token identifying the hook */ export async function getHookByToken(token: string): Promise { + const { hook } = await getHookByTokenWithEncryptor(token); + return hook; +} + +/** + * Internal: Get the hook by token and also return the resolved encryptor, + * so callers like `resumeHook()` can reuse it for payload encryption + * without a redundant key resolution. + */ +async function getHookByTokenWithEncryptor( + token: string +): Promise<{ hook: Hook; encryptor: Encryptor }> { const world = getWorld(); const hook = await world.hooks.getByToken(token); + + // Resolve the encryptor for the target run — metadata was encrypted + // by the workflow run's deployment, which may differ from the current one + const encryptor = await resolveEncryptorForRun(hook.runId); + if (typeof hook.metadata !== 'undefined') { - hook.metadata = hydrateStepArguments(hook.metadata as any, [], hook.runId); + hook.metadata = await hydrateStepArguments( + hook.metadata as any, + hook.runId, + encryptor + ); } - return hook; + return { hook, encryptor }; } /** @@ -64,17 +101,27 @@ export async function getHookByToken(token: string): Promise { */ export async function resumeHook( tokenOrHook: string | Hook, - payload: T + payload: T, + /** @internal Pre-resolved encryptor to avoid redundant key resolution */ + _encryptor?: Encryptor ): Promise { return await waitedUntil(() => { return trace('hook.resume', async (span) => { const world = getWorld(); try { - const hook = - typeof tokenOrHook === 'string' - ? await getHookByToken(tokenOrHook) - : tokenOrHook; + let hook: Hook; + let encryptor: Encryptor; + if (typeof tokenOrHook === 'string') { + // Resolve hook + encryptor together — single key resolution + // covers both metadata decryption and payload encryption + const result = await getHookByTokenWithEncryptor(tokenOrHook); + hook = result.hook; + encryptor = result.encryptor; + } else { + hook = tokenOrHook; + encryptor = _encryptor ?? (await resolveEncryptorForRun(hook.runId)); + } span?.setAttributes({ ...Attribute.HookToken(hook.token), @@ -82,13 +129,14 @@ export async function resumeHook( ...Attribute.WorkflowRunId(hook.runId), }); - // Dehydrate the payload for storage + // Dehydrate the payload for storage (reusing the same encryptor) const ops: Promise[] = []; const v1Compat = isLegacySpecVersion(hook.specVersion); - const dehydratedPayload = dehydrateStepReturnValue( + const dehydratedPayload = await dehydrateStepReturnValue( payload, - ops, hook.runId, + encryptor, + ops, globalThis, v1Compat ); @@ -196,7 +244,9 @@ export async function resumeWebhook( token: string, request: Request ): Promise { - const hook = await getHookByToken(token); + // Resolve hook + encryptor together — single key resolution covers + // metadata decryption and payload encryption in resumeHook below + const { hook, encryptor } = await getHookByTokenWithEncryptor(token); let response: Response | undefined; let responseReadable: ReadableStream | undefined; @@ -225,7 +275,7 @@ export async function resumeWebhook( response = new Response(null, { status: 202 }); } - await resumeHook(hook, request); + await resumeHook(hook, request, encryptor); if (responseReadable) { // Wait for the readable stream to emit one chunk, diff --git a/packages/core/src/runtime/run.ts b/packages/core/src/runtime/run.ts index 298e5850e0..f72d904192 100644 --- a/packages/core/src/runtime/run.ts +++ b/packages/core/src/runtime/run.ts @@ -5,8 +5,8 @@ import { } from '@workflow/errors'; import { SPEC_VERSION_CURRENT, - type World, type WorkflowRunStatus, + type World, } from '@workflow/world'; import { getExternalRevivers, @@ -153,7 +153,11 @@ export class Run { const run = await this.world.runs.get(this.runId); if (run.status === 'completed') { - return hydrateWorkflowReturnValue(run.output, [], this.runId); + return await hydrateWorkflowReturnValue( + run.output, + this.runId, + this.world + ); } if (run.status === 'cancelled') { diff --git a/packages/core/src/runtime/runs.ts b/packages/core/src/runtime/runs.ts index 63b0568c3b..9e528c7e6f 100644 --- a/packages/core/src/runtime/runs.ts +++ b/packages/core/src/runtime/runs.ts @@ -1,10 +1,10 @@ -import { hydrateWorkflowArguments } from '../serialization.js'; import { type Event, isLegacySpecVersion, SPEC_VERSION_LEGACY, type World, } from '@workflow/world'; +import { hydrateWorkflowArguments } from '../serialization.js'; import { getWorkflowQueueName } from './helpers.js'; import { start } from './start.js'; @@ -48,8 +48,10 @@ export async function recreateRunFromExisting( ): Promise { try { const run = await world.runs.get(runId, { resolveData: 'all' }); + // TODO: Use world.getEncryptorForRun(runId) here once encryption is wired in, + // since the run may belong to a different deployment const workflowArgs = normalizeWorkflowArgs( - hydrateWorkflowArguments(run.input, globalThis) + await hydrateWorkflowArguments(run.input, runId, {}, globalThis) ); const specVersion = options.specVersion ?? run.specVersion ?? SPEC_VERSION_LEGACY; diff --git a/packages/core/src/runtime/start.ts b/packages/core/src/runtime/start.ts index b536145fc3..7db947813c 100644 --- a/packages/core/src/runtime/start.ts +++ b/packages/core/src/runtime/start.ts @@ -119,10 +119,11 @@ export async function start( // Create run via run_created event (event-sourced architecture) // Pass client-generated runId - server will accept and use it - const workflowArguments = dehydrateWorkflowArguments( + const workflowArguments = await dehydrateWorkflowArguments( args, - ops, runId, + world, + ops, globalThis, v1Compat ); diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts index 65daaa33c1..1bd42aa8df 100644 --- a/packages/core/src/runtime/step-handler.ts +++ b/packages/core/src/runtime/step-handler.ts @@ -285,10 +285,11 @@ const stepHandler = getWorldHandlers().createQueueHandler( {}, async (hydrateSpan) => { const startTime = Date.now(); - const result = hydrateStepArguments( + const result = await hydrateStepArguments( step.input, - ops, - workflowRunId + workflowRunId, + world, + ops ); const durationMs = Date.now() - startTime; hydrateSpan?.setAttributes({ @@ -341,10 +342,11 @@ const stepHandler = getWorldHandlers().createQueueHandler( {}, async (dehydrateSpan) => { const startTime = Date.now(); - const dehydrated = dehydrateStepReturnValue( + const dehydrated = await dehydrateStepReturnValue( result, - ops, - workflowRunId + workflowRunId, + world, + ops ); const durationMs = Date.now() - startTime; dehydrateSpan?.setAttributes({ diff --git a/packages/core/src/runtime/suspension-handler.ts b/packages/core/src/runtime/suspension-handler.ts index 5f9f4b7852..6f4177f426 100644 --- a/packages/core/src/runtime/suspension-handler.ts +++ b/packages/core/src/runtime/suspension-handler.ts @@ -78,24 +78,28 @@ export async function handleSuspension({ ); // Build hook_created events (World will atomically create hook entities) - const hookEvents: CreateEventRequest[] = hookItems.map((queueItem) => { - const hookMetadata: SerializedData | undefined = - typeof queueItem.metadata === 'undefined' - ? undefined - : (dehydrateStepArguments( - queueItem.metadata, - suspension.globalThis - ) as SerializedData); - return { - eventType: 'hook_created' as const, - specVersion: SPEC_VERSION_CURRENT, - correlationId: queueItem.correlationId, - eventData: { - token: queueItem.token, - metadata: hookMetadata, - }, - }; - }); + const hookEvents: CreateEventRequest[] = await Promise.all( + hookItems.map(async (queueItem) => { + const hookMetadata: SerializedData | undefined = + typeof queueItem.metadata === 'undefined' + ? undefined + : ((await dehydrateStepArguments( + queueItem.metadata, + runId, + world, + suspension.globalThis + )) as SerializedData); + return { + eventType: 'hook_created' as const, + specVersion: SPEC_VERSION_CURRENT, + correlationId: queueItem.correlationId, + eventData: { + token: queueItem.token, + metadata: hookMetadata, + }, + }; + }) + ); // Process hooks first to prevent race conditions with webhook receivers // All hook creations run in parallel @@ -153,12 +157,14 @@ export async function handleSuspension({ (async () => { // Create step event if not already created if (stepsNeedingCreation.has(queueItem.correlationId)) { - const dehydratedInput = dehydrateStepArguments( + const dehydratedInput = await dehydrateStepArguments( { args: queueItem.args, closureVars: queueItem.closureVars, thisVal: queueItem.thisVal, }, + runId, + world, suspension.globalThis ); const stepEvent: CreateEventRequest = { diff --git a/packages/core/src/serialization.test.ts b/packages/core/src/serialization.test.ts index 13db84908f..05d06915db 100644 --- a/packages/core/src/serialization.test.ts +++ b/packages/core/src/serialization.test.ts @@ -5,24 +5,154 @@ import { describe, expect, it } from 'vitest'; import { registerSerializationClass } from './class-serialization.js'; import { getStepFunction, registerStepFunction } from './private.js'; import { + dehydrateStepArguments as _dehydrateStepArguments, + dehydrateStepReturnValue as _dehydrateStepReturnValue, + dehydrateWorkflowArguments as _dehydrateWorkflowArguments, + dehydrateWorkflowReturnValue as _dehydrateWorkflowReturnValue, + hydrateStepArguments as _hydrateStepArguments, + hydrateStepReturnValue as _hydrateStepReturnValue, + hydrateWorkflowArguments as _hydrateWorkflowArguments, + hydrateWorkflowReturnValue as _hydrateWorkflowReturnValue, decodeFormatPrefix, - dehydrateStepArguments, - dehydrateStepReturnValue, - dehydrateWorkflowArguments, - dehydrateWorkflowReturnValue, getCommonRevivers, getStreamType, getWorkflowReducers, - hydrateStepArguments, - hydrateStepReturnValue, - hydrateWorkflowArguments, - hydrateWorkflowReturnValue, SerializationFormat, } from './serialization.js'; import { STABLE_ULID, STREAM_NAME_SYMBOL } from './symbols.js'; import { createContext } from './vm/index.js'; +// ============================================================================= +// Test wrapper functions that preserve backwards-compatible signatures +// These wrap the new async functions for use in existing tests +// ============================================================================= + const mockRunId = 'wrun_mockidnumber0001'; +const mockEncryptor = {}; + +async function dehydrateWorkflowArguments( + value: unknown, + ops: Promise[] = [], + runId: string = mockRunId, + global: Record = globalThis, + v1Compat = false +): Promise { + return _dehydrateWorkflowArguments( + value, + runId, + mockEncryptor, + ops, + global, + v1Compat + ); +} + +async function hydrateWorkflowArguments( + value: Uint8Array | unknown, + global: Record = globalThis, + extraRevivers: Record any> = {} +): Promise { + return _hydrateWorkflowArguments( + value, + mockRunId, + mockEncryptor, + global, + extraRevivers + ); +} + +async function dehydrateWorkflowReturnValue( + value: unknown, + global: Record = globalThis, + v1Compat = false +): Promise { + return _dehydrateWorkflowReturnValue( + value, + mockRunId, + mockEncryptor, + global, + v1Compat + ); +} + +async function hydrateWorkflowReturnValue( + value: Uint8Array | unknown, + ops: Promise[] = [], + runId: string = mockRunId, + global: Record = globalThis, + extraRevivers: Record any> = {} +): Promise { + return _hydrateWorkflowReturnValue( + value, + runId, + mockEncryptor, + ops, + global, + extraRevivers + ); +} + +async function dehydrateStepArguments( + value: unknown, + global: Record = globalThis, + v1Compat = false +): Promise { + return _dehydrateStepArguments( + value, + mockRunId, + mockEncryptor, + global, + v1Compat + ); +} + +async function hydrateStepArguments( + value: Uint8Array | unknown, + ops: Promise[] = [], + runId: string = mockRunId, + global: Record = globalThis, + extraRevivers: Record any> = {} +): Promise { + return _hydrateStepArguments( + value, + runId, + mockEncryptor, + ops, + global, + extraRevivers + ); +} + +async function dehydrateStepReturnValue( + value: unknown, + ops: Promise[] = [], + runId: string = mockRunId, + global: Record = globalThis, + v1Compat = false +): Promise { + return _dehydrateStepReturnValue( + value, + runId, + mockEncryptor, + ops, + global, + v1Compat + ); +} + +async function hydrateStepReturnValue( + value: Uint8Array | unknown, + global: Record = globalThis, + extraRevivers: Record any> = {} +): Promise { + return _hydrateStepReturnValue( + value, + mockRunId, + mockEncryptor, + global, + extraRevivers + ); +} describe('getStreamType', () => { it('should return `undefined` for a regular stream', () => { @@ -48,9 +178,9 @@ describe('workflow arguments', () => { fixedTimestamp: 1714857600000, }); - it('should work with Date', () => { + it('should work with Date', async () => { const date = new Date('2025-07-17T04:30:34.824Z'); - const serialized = dehydrateWorkflowArguments(date, [], mockRunId); + const serialized = await dehydrateWorkflowArguments(date, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` Uint8Array [ 100, @@ -99,16 +229,16 @@ describe('workflow arguments', () => { ] `); - const hydrated = hydrateWorkflowArguments(serialized, vmGlobalThis); + const hydrated = await hydrateWorkflowArguments(serialized, vmGlobalThis); vmGlobalThis.val = hydrated; expect(runInContext('val instanceof Date', context)).toBe(true); expect(hydrated.getTime()).toEqual(date.getTime()); }); - it('should work with invalid Date', () => { + it('should work with invalid Date', async () => { const date = new Date('asdf'); - const serialized = dehydrateWorkflowArguments(date, [], mockRunId); + const serialized = await dehydrateWorkflowArguments(date, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` Uint8Array [ 100, @@ -134,16 +264,16 @@ describe('workflow arguments', () => { ] `); - const hydrated = hydrateWorkflowArguments(serialized, vmGlobalThis); + const hydrated = await hydrateWorkflowArguments(serialized, vmGlobalThis); vmGlobalThis.val = hydrated; expect(runInContext('val instanceof Date', context)).toBe(true); expect(hydrated.getTime()).toEqual(NaN); }); - it('should work with BigInt', () => { + it('should work with BigInt', async () => { const bigInt = BigInt('9007199254740992'); - const serialized = dehydrateWorkflowArguments(bigInt, [], mockRunId); + const serialized = await dehydrateWorkflowArguments(bigInt, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` Uint8Array [ 100, @@ -186,14 +316,14 @@ describe('workflow arguments', () => { ] `); - const hydrated = hydrateWorkflowArguments(serialized, vmGlobalThis); + const hydrated = await hydrateWorkflowArguments(serialized, vmGlobalThis); expect(hydrated).toBe(BigInt(9007199254740992)); expect(typeof hydrated).toBe('bigint'); }); - it('should work with BigInt negative', () => { + it('should work with BigInt negative', async () => { const bigInt = BigInt('-12345678901234567890'); - const serialized = dehydrateWorkflowArguments(bigInt, [], mockRunId); + const serialized = await dehydrateWorkflowArguments(bigInt, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` Uint8Array [ 100, @@ -241,17 +371,17 @@ describe('workflow arguments', () => { ] `); - const hydrated = hydrateWorkflowArguments(serialized, vmGlobalThis); + const hydrated = await hydrateWorkflowArguments(serialized, vmGlobalThis); expect(hydrated).toBe(BigInt('-12345678901234567890')); expect(typeof hydrated).toBe('bigint'); }); - it('should work with Map', () => { + it('should work with Map', async () => { const map = new Map([ [2, 'foo'], [6, 'bar'], ]); - const serialized = dehydrateWorkflowArguments(map, [], mockRunId); + const serialized = await dehydrateWorkflowArguments(map, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` Uint8Array [ 100, @@ -306,15 +436,15 @@ describe('workflow arguments', () => { ] `); - const hydrated = hydrateWorkflowArguments(serialized, vmGlobalThis); + const hydrated = await hydrateWorkflowArguments(serialized, vmGlobalThis); vmGlobalThis.val = hydrated; expect(runInContext('val instanceof Map', context)).toBe(true); }); - it('should work with Set', () => { + it('should work with Set', async () => { const set = new Set([1, '2', true]); - const serialized = dehydrateWorkflowArguments(set, [], mockRunId); + const serialized = await dehydrateWorkflowArguments(set, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` Uint8Array [ 100, @@ -354,22 +484,22 @@ describe('workflow arguments', () => { ] `); - const hydrated = hydrateWorkflowArguments(serialized, vmGlobalThis); + const hydrated = await hydrateWorkflowArguments(serialized, vmGlobalThis); vmGlobalThis.val = hydrated; expect(runInContext('val instanceof Set', context)).toBe(true); }); - it('should work with WritableStream', () => { + it('should work with WritableStream', async () => { const stream = new WritableStream(); - const serialized = dehydrateWorkflowArguments(stream, [], mockRunId); + const serialized = await dehydrateWorkflowArguments(stream, [], mockRunId); expect(serialized instanceof Uint8Array).toBe(true); // Verify the serialized data contains WritableStream reference const serializedStr = new TextDecoder().decode(serialized); expect(serializedStr).toContain('WritableStream'); class OurWritableStream {} - const hydrated = hydrateWorkflowArguments(serialized, { + const hydrated = await hydrateWorkflowArguments(serialized, { WritableStream: OurWritableStream, }); expect(hydrated).toBeInstanceOf(OurWritableStream); @@ -377,16 +507,16 @@ describe('workflow arguments', () => { expect(streamName).toMatch(/^strm_[0-9A-Z]{26}$/); }); - it('should work with ReadableStream', () => { + it('should work with ReadableStream', async () => { const stream = new ReadableStream(); - const serialized = dehydrateWorkflowArguments(stream, [], mockRunId); + const serialized = await dehydrateWorkflowArguments(stream, [], mockRunId); expect(serialized instanceof Uint8Array).toBe(true); // Verify the serialized data contains ReadableStream reference const serializedStr = new TextDecoder().decode(serialized); expect(serializedStr).toContain('ReadableStream'); class OurReadableStream {} - const hydrated = hydrateWorkflowArguments(serialized, { + const hydrated = await hydrateWorkflowArguments(serialized, { ReadableStream: OurReadableStream, }); expect(hydrated).toBeInstanceOf(OurReadableStream); @@ -394,12 +524,12 @@ describe('workflow arguments', () => { expect(streamName).toMatch(/^strm_[0-9A-Z]{26}$/); }); - it('should work with Headers', () => { + it('should work with Headers', async () => { const headers = new Headers(); headers.set('foo', 'bar'); headers.append('set-cookie', 'a'); headers.append('set-cookie', 'b'); - const serialized = dehydrateWorkflowArguments(headers, [], mockRunId); + const serialized = await dehydrateWorkflowArguments(headers, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` Uint8Array [ 100, @@ -483,13 +613,13 @@ describe('workflow arguments', () => { ] `); - const hydrated = hydrateWorkflowArguments(serialized, vmGlobalThis); + const hydrated = await hydrateWorkflowArguments(serialized, vmGlobalThis); expect(hydrated).toBeInstanceOf(Headers); expect(hydrated.get('foo')).toEqual('bar'); expect(hydrated.get('set-cookie')).toEqual('a, b'); }); - it('should work with Response', () => { + it('should work with Response', async () => { const response = new Response('Hello, world!', { status: 202, statusText: 'Custom', @@ -499,7 +629,11 @@ describe('workflow arguments', () => { ['set-cookie', 'b'], ]), }); - const serialized = dehydrateWorkflowArguments(response, [], mockRunId); + const serialized = await dehydrateWorkflowArguments( + response, + [], + mockRunId + ); expect(serialized instanceof Uint8Array).toBe(true); // Verify the serialized data contains Response reference const serializedStr = new TextDecoder().decode(serialized); @@ -516,7 +650,7 @@ describe('workflow arguments', () => { } class OurReadableStream {} class OurHeaders {} - const hydrated = hydrateWorkflowArguments(serialized, { + const hydrated = await hydrateWorkflowArguments(serialized, { Headers: OurHeaders, Response: OurResponse, ReadableStream: OurReadableStream, @@ -529,10 +663,10 @@ describe('workflow arguments', () => { expect(bodyStreamName).toMatch(/^strm_[0-9A-Z]{26}$/); }); - it('should work with URLSearchParams', () => { + it('should work with URLSearchParams', async () => { const params = new URLSearchParams('a=1&b=2&a=3'); - const serialized = dehydrateWorkflowArguments(params, [], mockRunId); + const serialized = await dehydrateWorkflowArguments(params, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` Uint8Array [ 100, @@ -579,7 +713,7 @@ describe('workflow arguments', () => { ] `); - const hydrated = hydrateWorkflowArguments(serialized, vmGlobalThis); + const hydrated = await hydrateWorkflowArguments(serialized, vmGlobalThis); vmGlobalThis.val = hydrated; expect(runInContext('val instanceof URLSearchParams', context)).toBe(true); expect(hydrated.getAll('a')).toEqual(['1', '3']); @@ -592,10 +726,10 @@ describe('workflow arguments', () => { ]); }); - it('should work with empty URLSearchParams', () => { + it('should work with empty URLSearchParams', async () => { const params = new URLSearchParams(); - const serialized = dehydrateWorkflowArguments(params, [], mockRunId); + const serialized = await dehydrateWorkflowArguments(params, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` Uint8Array [ 100, @@ -632,17 +766,17 @@ describe('workflow arguments', () => { ] `); - const hydrated = hydrateWorkflowArguments(serialized, vmGlobalThis); + const hydrated = await hydrateWorkflowArguments(serialized, vmGlobalThis); vmGlobalThis.val = hydrated; expect(runInContext('val instanceof URLSearchParams', context)).toBe(true); expect(hydrated.toString()).toEqual(''); expect(Array.from(hydrated.entries())).toEqual([]); }); - it('should work with empty ArrayBuffer', () => { + it('should work with empty ArrayBuffer', async () => { const buffer = new ArrayBuffer(0); - const serialized = dehydrateWorkflowArguments(buffer, [], mockRunId); + const serialized = await dehydrateWorkflowArguments(buffer, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` Uint8Array [ 100, @@ -675,16 +809,16 @@ describe('workflow arguments', () => { ] `); - const hydrated = hydrateWorkflowArguments(serialized, vmGlobalThis); + const hydrated = await hydrateWorkflowArguments(serialized, vmGlobalThis); vmGlobalThis.val = hydrated; expect(runInContext('val instanceof ArrayBuffer', context)).toBe(true); expect(hydrated.byteLength).toEqual(0); }); - it('should work with empty Uint8Array', () => { + it('should work with empty Uint8Array', async () => { const array = new Uint8Array(0); - const serialized = dehydrateWorkflowArguments(array, [], mockRunId); + const serialized = await dehydrateWorkflowArguments(array, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` Uint8Array [ 100, @@ -716,17 +850,17 @@ describe('workflow arguments', () => { ] `); - const hydrated = hydrateWorkflowArguments(serialized, vmGlobalThis); + const hydrated = await hydrateWorkflowArguments(serialized, vmGlobalThis); vmGlobalThis.val = hydrated; expect(runInContext('val instanceof Uint8Array', context)).toBe(true); expect(hydrated.length).toEqual(0); expect(hydrated.byteLength).toEqual(0); }); - it('should work with empty Int32Array', () => { + it('should work with empty Int32Array', async () => { const array = new Int32Array(0); - const serialized = dehydrateWorkflowArguments(array, [], mockRunId); + const serialized = await dehydrateWorkflowArguments(array, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` Uint8Array [ 100, @@ -758,17 +892,17 @@ describe('workflow arguments', () => { ] `); - const hydrated = hydrateWorkflowArguments(serialized, vmGlobalThis); + const hydrated = await hydrateWorkflowArguments(serialized, vmGlobalThis); vmGlobalThis.val = hydrated; expect(runInContext('val instanceof Int32Array', context)).toBe(true); expect(hydrated.length).toEqual(0); expect(hydrated.byteLength).toEqual(0); }); - it('should work with empty Float64Array', () => { + it('should work with empty Float64Array', async () => { const array = new Float64Array(0); - const serialized = dehydrateWorkflowArguments(array, [], mockRunId); + const serialized = await dehydrateWorkflowArguments(array, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` Uint8Array [ 100, @@ -802,14 +936,14 @@ describe('workflow arguments', () => { ] `); - const hydrated = hydrateWorkflowArguments(serialized, vmGlobalThis); + const hydrated = await hydrateWorkflowArguments(serialized, vmGlobalThis); vmGlobalThis.val = hydrated; expect(runInContext('val instanceof Float64Array', context)).toBe(true); expect(hydrated.length).toEqual(0); expect(hydrated.byteLength).toEqual(0); }); - it('should work with Request (without responseWritable)', () => { + it('should work with Request (without responseWritable)', async () => { // Mock STABLE_ULID to return a deterministic value const originalStableUlid = (globalThis as any)[STABLE_ULID]; (globalThis as any)[STABLE_ULID] = () => '01ARZ3NDEKTSV4RRFFQ69G5FA1'; @@ -825,7 +959,11 @@ describe('workflow arguments', () => { duplex: 'half', } as RequestInit); - const serialized = dehydrateWorkflowArguments(request, [], mockRunId); + const serialized = await dehydrateWorkflowArguments( + request, + [], + mockRunId + ); expect(serialized).toMatchInlineSnapshot(` Uint8Array [ 100, @@ -1134,7 +1272,7 @@ describe('workflow arguments', () => { } class OurReadableStream {} class OurHeaders {} - const hydrated = hydrateWorkflowArguments(serialized, { + const hydrated = await hydrateWorkflowArguments(serialized, { Request: OurRequest, Headers: OurHeaders, ReadableStream: OurReadableStream, @@ -1150,7 +1288,7 @@ describe('workflow arguments', () => { } }); - it('should work with Request (with responseWritable)', () => { + it('should work with Request (with responseWritable)', async () => { // Mock STABLE_ULID to return deterministic values const originalStableUlid = (globalThis as any)[STABLE_ULID]; let ulidCounter = 0; @@ -1174,7 +1312,11 @@ describe('workflow arguments', () => { const responseWritable = new WritableStream(); request[Symbol.for('WEBHOOK_RESPONSE_WRITABLE')] = responseWritable; - const serialized = dehydrateWorkflowArguments(request, [], mockRunId); + const serialized = await dehydrateWorkflowArguments( + request, + [], + mockRunId + ); expect(serialized).toMatchInlineSnapshot(` Uint8Array [ 100, @@ -1550,7 +1692,7 @@ describe('workflow arguments', () => { class OurReadableStream {} class OurWritableStream {} class OurHeaders {} - const hydrated = hydrateWorkflowArguments(serialized, { + const hydrated = await hydrateWorkflowArguments(serialized, { Request: OurRequest, Headers: OurHeaders, ReadableStream: OurReadableStream, @@ -1577,11 +1719,11 @@ describe('workflow arguments', () => { } }); - it('should throw error for an unsupported type', () => { + it('should throw error for an unsupported type', async () => { class Foo {} let err: WorkflowRuntimeError | undefined; try { - dehydrateWorkflowArguments(new Foo(), [], mockRunId); + await dehydrateWorkflowArguments(new Foo(), [], mockRunId); } catch (err_) { err = err_ as WorkflowRuntimeError; } @@ -1593,11 +1735,11 @@ describe('workflow arguments', () => { }); describe('workflow return value', () => { - it('should throw error for an unsupported type', () => { + it('should throw error for an unsupported type', async () => { class Foo {} let err: WorkflowRuntimeError | undefined; try { - dehydrateWorkflowReturnValue(new Foo()); + await dehydrateWorkflowReturnValue(new Foo()); } catch (err_) { err = err_ as WorkflowRuntimeError; } @@ -1609,11 +1751,11 @@ describe('workflow return value', () => { }); describe('step arguments', () => { - it('should throw error for an unsupported type', () => { + it('should throw error for an unsupported type', async () => { class Foo {} let err: WorkflowRuntimeError | undefined; try { - dehydrateStepArguments(new Foo(), globalThis); + await dehydrateStepArguments(new Foo(), globalThis); } catch (err_) { err = err_ as WorkflowRuntimeError; } @@ -1625,11 +1767,11 @@ describe('step arguments', () => { }); describe('step return value', () => { - it('should throw error for an unsupported type', () => { + it('should throw error for an unsupported type', async () => { class Foo {} let err: WorkflowRuntimeError | undefined; try { - dehydrateStepReturnValue(new Foo(), [], mockRunId); + await dehydrateStepReturnValue(new Foo(), [], mockRunId); } catch (err_) { err = err_ as WorkflowRuntimeError; } @@ -1687,7 +1829,7 @@ describe('step function serialization', () => { expect(retrieved).toBeUndefined(); }); - it('should deserialize step function name through reviver', () => { + it('should deserialize step function name through reviver', async () => { const stepName = 'step//test//testStep'; const stepFn = async () => 42; @@ -1704,11 +1846,11 @@ describe('step function serialization', () => { }); // Serialize using workflow reducers (which handle StepFunction) - const dehydrated = dehydrateStepArguments([fnWithStepId], globalThis); + const dehydrated = await dehydrateStepArguments([fnWithStepId], globalThis); // Hydrate it back using step revivers const ops: Promise[] = []; - const hydrated = hydrateStepArguments( + const hydrated = await hydrateStepArguments( dehydrated, ops, mockRunId, @@ -1719,7 +1861,7 @@ describe('step function serialization', () => { expect(hydrated[0]).toBe(stepFn); }); - it('should throw error when reviver cannot find registered step function', () => { + it('should throw error when reviver cannot find registered step function', async () => { // Create a function with a non-existent stepId const fnWithNonExistentStepId = async () => 42; Object.defineProperty(fnWithNonExistentStepId, 'stepId', { @@ -1730,26 +1872,18 @@ describe('step function serialization', () => { }); // Serialize the step function reference - const dehydrated = dehydrateStepArguments( + const dehydrated = await dehydrateStepArguments( [fnWithNonExistentStepId], globalThis ); // Hydrating should throw an error - const ops: Promise[] = []; - let err: Error | undefined; - try { - hydrateStepArguments(dehydrated, ops, mockRunId, globalThis); - } catch (err_) { - err = err_ as Error; - } - - expect(err).toBeDefined(); - expect(err?.message).toContain('Step function "nonExistentStep" not found'); - expect(err?.message).toContain('Make sure the step function is registered'); + await expect( + hydrateStepArguments(dehydrated, [], mockRunId, globalThis) + ).rejects.toThrow('Step function "nonExistentStep" not found'); }); - it('should dehydrate step function passed as argument to a step', () => { + it('should dehydrate step function passed as argument to a step', async () => { const stepName = 'step//workflows/test.ts//myStep'; const stepFn = async (x: number) => x * 2; @@ -1769,7 +1903,7 @@ describe('step function serialization', () => { const args = [stepFn, 42]; // This should serialize the step function by its name using the reducer - const dehydrated = dehydrateStepArguments(args, globalThis); + const dehydrated = await dehydrateStepArguments(args, globalThis); // Verify it dehydrated successfully expect(dehydrated).toBeDefined(); @@ -1815,7 +1949,7 @@ describe('step function serialization', () => { // Serialize the step function with closure variables const args = [stepFn, 7]; - const dehydrated = dehydrateStepArguments(args, globalThis); + const dehydrated = await dehydrateStepArguments(args, globalThis); // Verify it serialized expect(dehydrated).toBeDefined(); @@ -1826,7 +1960,7 @@ describe('step function serialization', () => { expect(serialized).toContain('prefix'); // Now hydrate it back - const hydrated = hydrateStepArguments( + const hydrated = await hydrateStepArguments( dehydrated, [], 'test-run-123', @@ -1883,7 +2017,7 @@ describe('step function serialization', () => { expect(result).toEqual({ stepId: stepName }); }); - it('should hydrate step function from workflow arguments using WORKFLOW_USE_STEP', () => { + it('should hydrate step function from workflow arguments using WORKFLOW_USE_STEP', async () => { // This tests the flow: client mode serializes step function with stepId, // workflow mode deserializes it using WORKFLOW_USE_STEP from vmGlobalThis const stepId = 'step//workflows/test.ts//addNumbers'; @@ -1916,7 +2050,7 @@ describe('step function serialization', () => { // Serialize from client side using external reducers const ops: Promise[] = []; - const dehydrated = dehydrateWorkflowArguments( + const dehydrated = await dehydrateWorkflowArguments( [clientStepFn, 3, 5], ops, mockRunId, @@ -1924,7 +2058,7 @@ describe('step function serialization', () => { ); // Hydrate in workflow context using VM's globalThis - const hydrated = hydrateWorkflowArguments(dehydrated, vmGlobalThis); + const hydrated = await hydrateWorkflowArguments(dehydrated, vmGlobalThis); // Verify the hydrated result expect(Array.isArray(hydrated)).toBe(true); @@ -1941,11 +2075,11 @@ describe('step function serialization', () => { expect(hydratedStepFn.stepId).toBe(stepId); }); - it('should throw error when WORKFLOW_USE_STEP is not set on globalThis', () => { + it('should throw error when WORKFLOW_USE_STEP is not set on globalThis', async () => { const stepId = 'step//workflows/test.ts//missingUseStep'; // Create a VM context WITHOUT setting up WORKFLOW_USE_STEP - const { context, globalThis: vmGlobalThis } = createContext({ + const { globalThis: vmGlobalThis } = createContext({ seed: 'test', fixedTimestamp: 1714857600000, }); @@ -1961,7 +2095,7 @@ describe('step function serialization', () => { // Serialize from client side const ops: Promise[] = []; - const dehydrated = dehydrateWorkflowArguments( + const dehydrated = await dehydrateWorkflowArguments( [clientStepFn], ops, mockRunId, @@ -1969,9 +2103,9 @@ describe('step function serialization', () => { ); // Hydrating should throw because WORKFLOW_USE_STEP is not set - expect(() => hydrateWorkflowArguments(dehydrated, vmGlobalThis)).toThrow( - 'WORKFLOW_USE_STEP not found on global object' - ); + await expect( + hydrateWorkflowArguments(dehydrated, vmGlobalThis) + ).rejects.toThrow('WORKFLOW_USE_STEP not found on global object'); }); }); @@ -2010,7 +2144,7 @@ describe('custom class serialization', () => { context ); - it('should serialize and deserialize a class with WORKFLOW_SERIALIZE/DESERIALIZE', () => { + it('should serialize and deserialize a class with WORKFLOW_SERIALIZE/DESERIALIZE', async () => { // Define the class in the host context (for serialization) class Point { constructor( @@ -2056,7 +2190,7 @@ describe('custom class serialization', () => { ); const point = new Point(10, 20); - const serialized = dehydrateWorkflowArguments(point, [], mockRunId); + const serialized = await dehydrateWorkflowArguments(point, [], mockRunId); // Verify it serialized with the Instance type expect(serialized).toBeDefined(); @@ -2066,7 +2200,7 @@ describe('custom class serialization', () => { expect(serializedStr).toContain('test/Point'); // Hydrate it back (inside the VM context) - const hydrated = hydrateWorkflowArguments(serialized, vmGlobalThis); + const hydrated = await hydrateWorkflowArguments(serialized, vmGlobalThis); // Note: hydrated is an instance of the VM's Point class, not the host's // so we check constructor.name instead of instanceof expect(hydrated.constructor.name).toBe('Point'); @@ -2074,7 +2208,7 @@ describe('custom class serialization', () => { expect(hydrated.y).toBe(20); }); - it('should serialize nested custom serializable objects', () => { + it('should serialize nested custom serializable objects', async () => { // Define the class in the host context (for serialization) class Vector { constructor( @@ -2126,8 +2260,8 @@ describe('custom class serialization', () => { }, }; - const serialized = dehydrateWorkflowArguments(data, [], mockRunId); - const hydrated = hydrateWorkflowArguments(serialized, vmGlobalThis); + const serialized = await dehydrateWorkflowArguments(data, [], mockRunId); + const hydrated = await hydrateWorkflowArguments(serialized, vmGlobalThis); expect(hydrated.name).toBe('test'); expect(hydrated.vector.constructor.name).toBe('Vector'); @@ -2138,7 +2272,7 @@ describe('custom class serialization', () => { expect(hydrated.nested.anotherVector.dy).toBe(2); }); - it('should serialize custom class in an array', () => { + it('should serialize custom class in an array', async () => { // Define the class in the host context (for serialization) class Item { constructor(public id: string) {} @@ -2180,8 +2314,8 @@ describe('custom class serialization', () => { const items = [new Item('a'), new Item('b'), new Item('c')]; - const serialized = dehydrateWorkflowArguments(items, [], mockRunId); - const hydrated = hydrateWorkflowArguments(serialized, vmGlobalThis); + const serialized = await dehydrateWorkflowArguments(items, [], mockRunId); + const hydrated = await hydrateWorkflowArguments(serialized, vmGlobalThis); expect(Array.isArray(hydrated)).toBe(true); expect(hydrated).toHaveLength(3); @@ -2193,7 +2327,7 @@ describe('custom class serialization', () => { expect(hydrated[2].id).toBe('c'); }); - it('should work with step arguments', () => { + it('should work with step arguments', async () => { class Config { constructor( public setting: string, @@ -2216,8 +2350,8 @@ describe('custom class serialization', () => { registerSerializationClass('test/Config', Config); const config = new Config('maxRetries', 3); - const serialized = dehydrateStepArguments([config], globalThis); - const hydrated = hydrateStepArguments( + const serialized = await dehydrateStepArguments([config], globalThis); + const hydrated = await hydrateStepArguments( serialized, [], mockRunId, @@ -2230,7 +2364,7 @@ describe('custom class serialization', () => { expect(hydrated[0].value).toBe(3); }); - it('should work with step return values', () => { + it('should work with step return values', async () => { class Result { constructor( public success: boolean, @@ -2253,16 +2387,16 @@ describe('custom class serialization', () => { registerSerializationClass('test/Result', Result); const result = new Result(true, 'completed'); - const serialized = dehydrateStepReturnValue(result, [], mockRunId); + const serialized = await dehydrateStepReturnValue(result, [], mockRunId); // Step return values are hydrated with workflow revivers - const hydrated = hydrateWorkflowArguments(serialized, globalThis); + const hydrated = await hydrateWorkflowArguments(serialized, globalThis); expect(hydrated).toBeInstanceOf(Result); expect(hydrated.success).toBe(true); expect(hydrated.data).toBe('completed'); }); - it('should not serialize classes without WORKFLOW_SERIALIZE', () => { + it('should not serialize classes without WORKFLOW_SERIALIZE', async () => { class PlainClass { constructor(public value: string) {} } @@ -2270,10 +2404,12 @@ describe('custom class serialization', () => { const instance = new PlainClass('test'); // Should throw because PlainClass is not serializable - expect(() => dehydrateWorkflowArguments(instance, [], mockRunId)).toThrow(); + await expect( + dehydrateWorkflowArguments(instance, [], mockRunId) + ).rejects.toThrow(); }); - it('should throw error when classId is missing', () => { + it('should throw error when classId is missing', async () => { // NOTE: Missing `classId` property so serializatoin will fail. class NoClassId { constructor(public value: string) {} @@ -2292,14 +2428,14 @@ describe('custom class serialization', () => { // Should throw with our specific error message about missing classId let errorMessage = ''; try { - dehydrateWorkflowArguments(instance, [], mockRunId); + await dehydrateWorkflowArguments(instance, [], mockRunId); } catch (e: any) { errorMessage = e.cause?.message || e.message; } expect(errorMessage).toMatch(/must have a static "classId" property/); }); - it('should serialize class with complex data types in payload', () => { + it('should serialize class with complex data types in payload', async () => { class ComplexData { constructor( public items: Map, @@ -2331,8 +2467,8 @@ describe('custom class serialization', () => { const date = new Date('2025-01-01T00:00:00.000Z'); const complex = new ComplexData(map, date); - const serialized = dehydrateWorkflowArguments(complex, [], mockRunId); - const hydrated = hydrateWorkflowArguments(serialized, globalThis); + const serialized = await dehydrateWorkflowArguments(complex, [], mockRunId); + const hydrated = await hydrateWorkflowArguments(serialized, globalThis); expect(hydrated).toBeInstanceOf(ComplexData); expect(hydrated.items).toBeInstanceOf(Map); @@ -2342,7 +2478,7 @@ describe('custom class serialization', () => { expect(hydrated.created.toISOString()).toBe('2025-01-01T00:00:00.000Z'); }); - it('should pass class as this context to WORKFLOW_SERIALIZE and WORKFLOW_DESERIALIZE', () => { + it('should pass class as this context to WORKFLOW_SERIALIZE and WORKFLOW_DESERIALIZE', async () => { // This test verifies that serialize.call(cls, value) and deserialize.call(cls, data) // properly pass the class as `this` context, which is required when the serializer/deserializer // needs to access static properties or methods on the class @@ -2383,13 +2519,13 @@ describe('custom class serialization', () => { // Serialize an instance - this should increment serializedCount via `this` const counter = new Counter(42); - const serialized = dehydrateWorkflowArguments(counter, [], mockRunId); + const serialized = await dehydrateWorkflowArguments(counter, [], mockRunId); // Verify serialization used `this` correctly expect(Counter.serializedCount).toBe(1); // Deserialize - this should increment deserializedCount via `this` - const hydrated = hydrateWorkflowArguments(serialized, globalThis); + const hydrated = await hydrateWorkflowArguments(serialized, globalThis); // Verify deserialization used `this` correctly expect(Counter.deserializedCount).toBe(1); @@ -2398,7 +2534,7 @@ describe('custom class serialization', () => { // Serialize another instance to verify counter increments const counter2 = new Counter(100); - dehydrateWorkflowArguments(counter2, [], mockRunId); + await dehydrateWorkflowArguments(counter2, [], mockRunId); expect(Counter.serializedCount).toBe(2); }); }); @@ -2409,70 +2545,84 @@ describe('format prefix system', () => { fixedTimestamp: 1714857600000, }); - it('should encode data with format prefix', () => { + it('should encode data with format prefix', async () => { const data = { message: 'hello' }; - const serialized = dehydrateWorkflowArguments(data, [], mockRunId); + const serialized = await dehydrateWorkflowArguments(data, [], mockRunId); // Check that the first 4 bytes are the format prefix "devl" const prefix = new TextDecoder().decode(serialized.subarray(0, 4)); expect(prefix).toBe('devl'); }); - it('should decode prefixed data correctly', () => { + it('should decode prefixed data correctly', async () => { const data = { message: 'hello', count: 42 }; - const serialized = dehydrateWorkflowArguments(data, [], mockRunId); - const hydrated = hydrateWorkflowArguments(serialized, vmGlobalThis); + const serialized = await dehydrateWorkflowArguments(data, [], mockRunId); + const hydrated = await hydrateWorkflowArguments(serialized, vmGlobalThis); expect(hydrated).toEqual({ message: 'hello', count: 42 }); }); - it('should handle all dehydrate/hydrate function pairs with format prefix', () => { + it('should handle all dehydrate/hydrate function pairs with format prefix', async () => { const testData = { test: 'data', nested: { value: 123 } }; // Workflow arguments - const workflowArgs = dehydrateWorkflowArguments(testData, [], mockRunId); + const workflowArgs = await dehydrateWorkflowArguments( + testData, + [], + mockRunId + ); expect(new TextDecoder().decode(workflowArgs.subarray(0, 4))).toBe('devl'); - expect(hydrateWorkflowArguments(workflowArgs, vmGlobalThis)).toEqual( + expect(await hydrateWorkflowArguments(workflowArgs, vmGlobalThis)).toEqual( testData ); // Workflow return value - const workflowReturn = dehydrateWorkflowReturnValue(testData, globalThis); + const workflowReturn = await dehydrateWorkflowReturnValue( + testData, + globalThis + ); expect(new TextDecoder().decode(workflowReturn.subarray(0, 4))).toBe( 'devl' ); expect( - hydrateWorkflowReturnValue(workflowReturn, [], mockRunId, vmGlobalThis) + await hydrateWorkflowReturnValue( + workflowReturn, + [], + mockRunId, + vmGlobalThis + ) ).toEqual(testData); // Step arguments - const stepArgs = dehydrateStepArguments(testData, globalThis); + const stepArgs = await dehydrateStepArguments(testData, globalThis); expect(new TextDecoder().decode(stepArgs.subarray(0, 4))).toBe('devl'); - expect(hydrateStepArguments(stepArgs, [], mockRunId, vmGlobalThis)).toEqual( - testData - ); + expect( + await hydrateStepArguments(stepArgs, [], mockRunId, vmGlobalThis) + ).toEqual(testData); // Step return value - const stepReturn = dehydrateStepReturnValue(testData, [], mockRunId); + const stepReturn = await dehydrateStepReturnValue(testData, [], mockRunId); expect(new TextDecoder().decode(stepReturn.subarray(0, 4))).toBe('devl'); - expect(hydrateStepReturnValue(stepReturn, vmGlobalThis)).toEqual(testData); + expect(await hydrateStepReturnValue(stepReturn, vmGlobalThis)).toEqual( + testData + ); }); - it('should throw error for unknown format prefix', () => { + it('should throw error for unknown format prefix', async () => { // Create data with an unknown 4-character format prefix const unknownFormat = new TextEncoder().encode('unkn{"test":true}'); - expect(() => hydrateWorkflowArguments(unknownFormat, vmGlobalThis)).toThrow( - /Unknown serialization format/ - ); + await expect( + hydrateWorkflowArguments(unknownFormat, vmGlobalThis) + ).rejects.toThrow(/Unknown serialization format/); }); - it('should throw error for data too short to contain format prefix', () => { + it('should throw error for data too short to contain format prefix', async () => { const tooShort = new TextEncoder().encode('dev'); - expect(() => hydrateWorkflowArguments(tooShort, vmGlobalThis)).toThrow( - /Data too short to contain format prefix/ - ); + await expect( + hydrateWorkflowArguments(tooShort, vmGlobalThis) + ).rejects.toThrow(/Data too short to contain format prefix/); }); }); diff --git a/packages/core/src/serialization.ts b/packages/core/src/serialization.ts index 5b4b18d70f..d7e3e6af7c 100644 --- a/packages/core/src/serialization.ts +++ b/packages/core/src/serialization.ts @@ -1,5 +1,6 @@ import { WorkflowRuntimeError } from '@workflow/errors'; import { WORKFLOW_DESERIALIZE, WORKFLOW_SERIALIZE } from '@workflow/serde'; +import type { Encryptor } from '@workflow/world'; import { DevalueError, parse, stringify, unflatten } from 'devalue'; import { monotonicFactory } from 'ulid'; import { getSerializationClass } from './class-serialization.js'; @@ -1303,13 +1304,14 @@ function getStepRevivers( * @param runId * @returns The dehydrated value as binary data (Uint8Array) with format prefix */ -export function dehydrateWorkflowArguments( +export async function dehydrateWorkflowArguments( value: unknown, - ops: Promise[], runId: string, + _encryptor: Encryptor, + ops: Promise[] = [], global: Record = globalThis, v1Compat = false -): Uint8Array | unknown { +): Promise { try { const str = stringify(value, getExternalReducers(global, ops, runId)); if (v1Compat) { @@ -1334,8 +1336,10 @@ export function dehydrateWorkflowArguments( * @param extraRevivers * @returns The hydrated value */ -export function hydrateWorkflowArguments( +export async function hydrateWorkflowArguments( value: Uint8Array | unknown, + _runId: string, + _encryptor: Encryptor, global: Record = globalThis, extraRevivers: Record any> = {} ) { @@ -1368,11 +1372,13 @@ export function hydrateWorkflowArguments( * @param global * @returns The dehydrated value as binary data (Uint8Array) with format prefix */ -export function dehydrateWorkflowReturnValue( +export async function dehydrateWorkflowReturnValue( value: unknown, + _runId: string, + _encryptor: Encryptor, global: Record = globalThis, v1Compat = false -): Uint8Array | unknown { +): Promise { try { const str = stringify(value, getWorkflowReducers(global)); if (v1Compat) { @@ -1400,10 +1406,11 @@ export function dehydrateWorkflowReturnValue( * @param runId * @returns The hydrated return value, ready to be consumed by the client */ -export function hydrateWorkflowReturnValue( +export async function hydrateWorkflowReturnValue( value: Uint8Array | unknown, - ops: Promise[], runId: string, + _encryptor: Encryptor, + ops: Promise[] = [], global: Record = globalThis, extraRevivers: Record any> = {} ) { @@ -1437,11 +1444,13 @@ export function hydrateWorkflowReturnValue( * @param global * @returns The dehydrated value as binary data (Uint8Array) with format prefix */ -export function dehydrateStepArguments( +export async function dehydrateStepArguments( value: unknown, - global: Record, + _runId: string, + _encryptor: Encryptor, + global: Record = globalThis, v1Compat = false -): Uint8Array | unknown { +): Promise { try { const str = stringify(value, getWorkflowReducers(global)); if (v1Compat) { @@ -1468,10 +1477,11 @@ export function dehydrateStepArguments( * @param runId * @returns The hydrated value, ready to be consumed by the step user-code function */ -export function hydrateStepArguments( +export async function hydrateStepArguments( value: Uint8Array | unknown, - ops: Promise[], runId: string, + _encryptor: Encryptor, + ops: Promise[] = [], global: Record = globalThis, extraRevivers: Record any> = {} ) { @@ -1507,13 +1517,14 @@ export function hydrateStepArguments( * @param runId * @returns The dehydrated value as binary data (Uint8Array) with format prefix */ -export function dehydrateStepReturnValue( +export async function dehydrateStepReturnValue( value: unknown, - ops: Promise[], runId: string, + _encryptor: Encryptor, + ops: Promise[] = [], global: Record = globalThis, v1Compat = false -): Uint8Array | unknown { +): Promise { try { const str = stringify(value, getStepReducers(global, ops, runId)); if (v1Compat) { @@ -1538,8 +1549,10 @@ export function dehydrateStepReturnValue( * @param extraRevivers * @returns The hydrated return value of a step, ready to be consumed by the workflow handler */ -export function hydrateStepReturnValue( +export async function hydrateStepReturnValue( value: Uint8Array | unknown, + _runId: string, + _encryptor: Encryptor, global: Record = globalThis, extraRevivers: Record any> = {} ) { diff --git a/packages/core/src/step.test.ts b/packages/core/src/step.test.ts index 98c84058a3..03ea451e26 100644 --- a/packages/core/src/step.test.ts +++ b/packages/core/src/step.test.ts @@ -27,6 +27,8 @@ function setupWorkflowContext(events: Event[]): WorkflowOrchestratorContext { new Uint8Array(size).map(() => 256 * context.globalThis.Math.random()) ), onWorkflowError: vi.fn(), + runId: 'wrun_test', + encryptor: {}, }; } @@ -39,7 +41,7 @@ describe('createUseStep', () => { eventType: 'step_completed', correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCV', eventData: { - result: dehydrateStepReturnValue(3), + result: await dehydrateStepReturnValue(3, 'wrun_123', {}), }, createdAt: new Date(), }, @@ -190,7 +192,7 @@ describe('createUseStep', () => { eventType: 'step_completed', correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCV', eventData: { - result: dehydrateStepReturnValue(undefined), + result: await dehydrateStepReturnValue(undefined, 'wrun_123', {}), }, createdAt: new Date(), }, @@ -409,7 +411,7 @@ describe('createUseStep', () => { eventType: 'step_completed', correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCV', eventData: { - result: dehydrateStepReturnValue(42), + result: await dehydrateStepReturnValue(42, 'wrun_123', {}), }, createdAt: new Date(), }, diff --git a/packages/core/src/step.ts b/packages/core/src/step.ts index caec871107..ce7f1566d8 100644 --- a/packages/core/src/step.ts +++ b/packages/core/src/step.ts @@ -142,13 +142,23 @@ export function createUseStep(ctx: WorkflowOrchestratorContext) { ctx.invocationsQueue.delete(event.correlationId); // Step has completed, so resolve the Promise with the cached result - const hydratedResult = hydrateStepReturnValue( + // Use .then() pattern since hydrateStepReturnValue is async + hydrateStepReturnValue( event.eventData.result, + ctx.runId, + ctx.encryptor, ctx.globalThis - ); - setTimeout(() => { - resolve(hydratedResult); - }, 0); + ) + .then((hydratedResult) => { + setTimeout(() => { + resolve(hydratedResult as Result); + }, 0); + }) + .catch((error) => { + setTimeout(() => { + reject(error); + }, 0); + }); return EventConsumerResult.Finished; } diff --git a/packages/core/src/workflow.test.ts b/packages/core/src/workflow.test.ts index 32159ea6e8..2b86e71c60 100644 --- a/packages/core/src/workflow.test.ts +++ b/packages/core/src/workflow.test.ts @@ -1,6 +1,6 @@ import { types } from 'node:util'; import { WorkflowRuntimeError } from '@workflow/errors'; -import type { Event, WorkflowRun } from '@workflow/world'; +import type { Encryptor, Event, WorkflowRun } from '@workflow/world'; import { assert, describe, expect, it } from 'vitest'; import type { WorkflowSuspension } from './global.js'; import { @@ -10,6 +10,8 @@ import { } from './serialization.js'; import { runWorkflow } from './workflow.js'; +const mockEncryptor: Encryptor = {}; + describe('runWorkflow', () => { const getWorkflowTransformCode = (workflowName?: string) => `;globalThis.__private_workflows = new Map(); @@ -31,7 +33,7 @@ describe('runWorkflow', () => { runId: 'wrun_123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -40,8 +42,15 @@ describe('runWorkflow', () => { const events: Event[] = []; - const result = await runWorkflow(workflowCode, workflowRun, events); - expect(hydrateWorkflowReturnValue(result as any, ops)).toEqual('success'); + const result = await runWorkflow( + workflowCode, + workflowRun, + events, + mockEncryptor + ); + expect( + await hydrateWorkflowReturnValue(result as any, 'wrun_test', {}, ops) + ).toEqual('success'); }); it('should execute workflow with arguments', async () => { @@ -52,7 +61,7 @@ describe('runWorkflow', () => { runId: 'wrun_123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([1, 2], ops), + input: await dehydrateWorkflowArguments([1, 2], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -61,8 +70,15 @@ describe('runWorkflow', () => { const events: Event[] = []; - const result = await runWorkflow(workflowCode, workflowRun, events); - expect(hydrateWorkflowReturnValue(result as any, ops)).toEqual(3); + const result = await runWorkflow( + workflowCode, + workflowRun, + events, + mockEncryptor + ); + expect( + await hydrateWorkflowReturnValue(result as any, 'wrun_test', {}, ops) + ).toEqual(3); }); it('allow user code to handle user-defined errors', async () => { @@ -79,7 +95,7 @@ describe('runWorkflow', () => { runId: 'wrun_123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -88,8 +104,15 @@ describe('runWorkflow', () => { const events: Event[] = []; - const result = hydrateWorkflowReturnValue( - (await runWorkflow(workflowCode, workflowRun, events)) as any, + const result = await hydrateWorkflowReturnValue( + (await runWorkflow( + workflowCode, + workflowRun, + events, + mockEncryptor + )) as any, + 'wrun_test', + {}, ops ); assert(types.isNativeError(result)); @@ -105,7 +128,7 @@ describe('runWorkflow', () => { runId: workflowRunId, workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -126,7 +149,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X00Y11PCQTCHQRK34HF', eventData: { - result: dehydrateStepReturnValue(3, ops), + result: await dehydrateStepReturnValue(3, 'wrun_test', {}, ops), }, createdAt: new Date(), }, @@ -140,9 +163,12 @@ describe('runWorkflow', () => { return a; }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor ); - expect(hydrateWorkflowReturnValue(result as any, ops)).toEqual(3); + expect( + await hydrateWorkflowReturnValue(result as any, 'wrun_test', {}, ops) + ).toEqual(3); }); // Test that timestamps update correctly as events are consumed @@ -153,7 +179,7 @@ describe('runWorkflow', () => { runId: workflowRunId, workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -194,7 +220,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X00Y11PCQTCHQRK34HF', eventData: { - result: dehydrateStepReturnValue(3, ops), + result: await dehydrateStepReturnValue(3, 'wrun_test', {}, ops), }, createdAt: new Date('2024-01-01T00:00:02.000Z'), }, @@ -218,7 +244,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X00Y11PCQTCHQRK34HG', eventData: { - result: dehydrateStepReturnValue(3, ops), + result: await dehydrateStepReturnValue(3, 'wrun_test', {}, ops), }, createdAt: new Date('2024-01-01T00:00:04.000Z'), }, @@ -242,7 +268,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X00Y11PCQTCHQRK34HH', eventData: { - result: dehydrateStepReturnValue(3, ops), + result: await dehydrateStepReturnValue(3, 'wrun_test', {}, ops), }, createdAt: new Date('2024-01-01T00:00:06.000Z'), }, @@ -262,14 +288,17 @@ describe('runWorkflow', () => { return timestamps; }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor ); // Timestamps: // - Initial: 0s (from startedAt) // - After step 1 completes (at 2s), timestamp advances to step2_created (2.5s) // - After step 2 completes (at 4s), timestamp advances to step3_created (4.5s) // - After step 3 completes: 6s - expect(hydrateWorkflowReturnValue(result as any, ops)).toEqual([ + expect( + await hydrateWorkflowReturnValue(result as any, 'wrun_test', {}, ops) + ).toEqual([ new Date('2024-01-01T00:00:00.000Z'), 1704067202500, // 2.5s (step2_created timestamp) 1704067204500, // 4.5s (step3_created timestamp) @@ -287,7 +316,7 @@ describe('runWorkflow', () => { runId: workflowRunId, workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -332,23 +361,43 @@ describe('runWorkflow', () => { }${getWorkflowTransformCode('workflow')}`; // Execute the workflow with only sleep(1) resolved - const result1 = await runWorkflow(workflowCode, workflowRun, events); + const result1 = await runWorkflow( + workflowCode, + workflowRun, + events, + mockEncryptor + ); // Execute again with both sleeps resolved this time - const result2 = await runWorkflow(workflowCode, workflowRun, [ - ...events, - { - eventId: 'event-3', - runId: workflowRunId, - eventType: 'wait_completed', - correlationId: 'wait_01HK153X008RT6YEW43G8QX6JY', - createdAt: new Date('2024-01-01T00:00:04.000Z'), - }, - ]); + const result2 = await runWorkflow( + workflowCode, + workflowRun, + [ + ...events, + { + eventId: 'event-3', + runId: workflowRunId, + eventType: 'wait_completed', + correlationId: 'wait_01HK153X008RT6YEW43G8QX6JY', + createdAt: new Date('2024-01-01T00:00:04.000Z'), + }, + ], + mockEncryptor + ); // The date should be the same - const date1 = hydrateWorkflowReturnValue(result1 as any, ops); - const date2 = hydrateWorkflowReturnValue(result2 as any, ops); + const date1 = await hydrateWorkflowReturnValue( + result1 as any, + 'wrun_test', + {}, + ops + ); + const date2 = await hydrateWorkflowReturnValue( + result2 as any, + 'wrun_test', + {}, + ops + ); expect(date1).toEqual(date2); } ); @@ -361,7 +410,7 @@ describe('runWorkflow', () => { runId: workflowRunId, workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -389,7 +438,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X008RT6YEW43G8QX6JX', eventData: { - result: dehydrateStepReturnValue(3, ops), + result: await dehydrateStepReturnValue(3, 'wrun_test', {}, ops), }, createdAt: new Date(), }, @@ -399,7 +448,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X008RT6YEW43G8QX6JY', eventData: { - result: dehydrateStepReturnValue(7, ops), + result: await dehydrateStepReturnValue(7, 'wrun_test', {}, ops), }, createdAt: new Date(), }, @@ -412,9 +461,12 @@ describe('runWorkflow', () => { return a; }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor ); - expect(hydrateWorkflowReturnValue(result as any, ops)).toEqual([3, 7]); + expect( + await hydrateWorkflowReturnValue(result as any, 'wrun_test', {}, ops) + ).toEqual([3, 7]); }); it('should resolve `Promise.race()` steps that have `step_completed` events (first promise resolves first)', async () => { @@ -424,7 +476,7 @@ describe('runWorkflow', () => { runId: workflowRunId, workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -452,7 +504,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X008RT6YEW43G8QX6JX', eventData: { - result: dehydrateStepReturnValue(3, ops), + result: await dehydrateStepReturnValue(3, 'wrun_test', {}, ops), }, createdAt: new Date(), }, @@ -462,7 +514,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X008RT6YEW43G8QX6JY', eventData: { - result: dehydrateStepReturnValue(7, ops), + result: await dehydrateStepReturnValue(7, 'wrun_test', {}, ops), }, createdAt: new Date(), }, @@ -475,9 +527,12 @@ describe('runWorkflow', () => { return a; }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor ); - expect(hydrateWorkflowReturnValue(result as any, ops)).toEqual(3); + expect( + await hydrateWorkflowReturnValue(result as any, 'wrun_test', {}, ops) + ).toEqual(3); }); it('should resolve `Promise.race()` steps that have `step_completed` events (second promise resolves first)', async () => { @@ -487,7 +542,7 @@ describe('runWorkflow', () => { runId: workflowRunId, workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -515,7 +570,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X008RT6YEW43G8QX6JY', eventData: { - result: dehydrateStepReturnValue(7, ops), + result: await dehydrateStepReturnValue(7, 'wrun_test', {}, ops), }, createdAt: new Date(), }, @@ -525,7 +580,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X008RT6YEW43G8QX6JX', eventData: { - result: dehydrateStepReturnValue(3, ops), + result: await dehydrateStepReturnValue(3, 'wrun_test', {}, ops), }, createdAt: new Date(), }, @@ -538,9 +593,12 @@ describe('runWorkflow', () => { return a; }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor ); - expect(hydrateWorkflowReturnValue(result as any, ops)).toEqual(7); + expect( + await hydrateWorkflowReturnValue(result as any, 'wrun_test', {}, ops) + ).toEqual(7); }); it('should handle Promise.race with multiple concurrent steps completing out of order', async () => { @@ -549,7 +607,7 @@ describe('runWorkflow', () => { runId: 'wrun_01K75533W56DAE35VY3082DN3P', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -596,7 +654,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X00DKMJB5AQEJZ3FQGH', eventData: { - result: dehydrateStepReturnValue(4, ops), + result: await dehydrateStepReturnValue(4, 'wrun_test', {}, ops), }, runId: 'wrun_01K75533W56DAE35VY3082DN3P', eventId: 'evnt_01K7553EABWCK00JQ9R8P1FTK7', @@ -606,7 +664,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X00DKMJB5AQEJZ3FQGG', eventData: { - result: dehydrateStepReturnValue(3, ops), + result: await dehydrateStepReturnValue(3, 'wrun_test', {}, ops), }, runId: 'wrun_01K75533W56DAE35VY3082DN3P', eventId: 'evnt_01K7553F31YS6C94NG23WGEEMV', @@ -616,7 +674,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X00DKMJB5AQEJZ3FQGF', eventData: { - result: dehydrateStepReturnValue(2, ops), + result: await dehydrateStepReturnValue(2, 'wrun_test', {}, ops), }, runId: 'wrun_01K75533W56DAE35VY3082DN3P', eventId: 'evnt_01K7553G0XEE4R440QS5SV89YE', @@ -626,7 +684,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X00DKMJB5AQEJZ3FQGE', eventData: { - result: dehydrateStepReturnValue(1, ops), + result: await dehydrateStepReturnValue(1, 'wrun_test', {}, ops), }, runId: 'wrun_01K75533W56DAE35VY3082DN3P', eventId: 'evnt_01K7553HS9R1XJQKVVW0ZRCMNP', @@ -636,7 +694,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X00DKMJB5AQEJZ3FQGD', eventData: { - result: dehydrateStepReturnValue(0, ops), + result: await dehydrateStepReturnValue(0, 'wrun_test', {}, ops), }, runId: 'wrun_01K75533W56DAE35VY3082DN3P', eventId: 'evnt_01K7553K67FQG02YCFE9QDKJ90', @@ -664,11 +722,12 @@ describe('runWorkflow', () => { return done; }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor ); - expect(hydrateWorkflowReturnValue(result as any, ops)).toEqual([ - 4, 3, 2, 1, 0, - ]); + expect( + await hydrateWorkflowReturnValue(result as any, 'wrun_test', {}, ops) + ).toEqual([4, 3, 2, 1, 0]); }); }); @@ -681,7 +740,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'value', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -693,7 +752,8 @@ describe('runWorkflow', () => { await runWorkflow( `const value = "test"${getWorkflowTransformCode()}`, workflowRun, - events + events, + mockEncryptor ); } catch (err) { error = err as Error; @@ -713,7 +773,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -725,7 +785,8 @@ describe('runWorkflow', () => { await runWorkflow( `function workflow() { throw new Error("test"); }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor ); } catch (err) { error = err as Error; @@ -743,7 +804,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'testWorkflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -755,7 +816,8 @@ describe('runWorkflow', () => { await runWorkflow( `function testWorkflow() { throw new Error("test error"); }${getWorkflowTransformCode('testWorkflow')}`, workflowRun, - events + events, + mockEncryptor ); } catch (err) { error = err as Error; @@ -776,7 +838,7 @@ describe('runWorkflow', () => { runId: 'test-run-nested', workflowName: 'nestedWorkflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -798,7 +860,7 @@ describe('runWorkflow', () => { } ${getWorkflowTransformCode('nestedWorkflow')}`; - await runWorkflow(workflowCode, workflowRun, events); + await runWorkflow(workflowCode, workflowRun, events, mockEncryptor); } catch (err) { error = err as Error; } @@ -821,7 +883,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -838,7 +900,8 @@ describe('runWorkflow', () => { return a; }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor ); } catch (err) { error = err as Error; @@ -864,7 +927,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -889,7 +952,8 @@ describe('runWorkflow', () => { return a; }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor ); } catch (err) { error = err as Error; @@ -909,7 +973,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -925,7 +989,8 @@ describe('runWorkflow', () => { return a; }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor ); } catch (err) { error = err as Error; @@ -957,7 +1022,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -978,7 +1043,8 @@ describe('runWorkflow', () => { } }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor ); } catch (err) { error = err as Error; @@ -996,7 +1062,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1012,7 +1078,8 @@ describe('runWorkflow', () => { return 'done'; }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor ) ).rejects.toThrow( 'Timeout functions like "setTimeout" and "setInterval" are not supported in workflow functions' @@ -1025,7 +1092,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1041,7 +1108,8 @@ describe('runWorkflow', () => { return 'done'; }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor ) ).rejects.toThrow( 'Timeout functions like "setTimeout" and "setInterval" are not supported in workflow functions' @@ -1054,7 +1122,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1070,7 +1138,8 @@ describe('runWorkflow', () => { return 'done'; }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor ) ).rejects.toThrow( 'Timeout functions like "setTimeout" and "setInterval" are not supported in workflow functions' @@ -1083,7 +1152,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1099,7 +1168,8 @@ describe('runWorkflow', () => { return 'done'; }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor ) ).rejects.toThrow( 'Timeout functions like "setTimeout" and "setInterval" are not supported in workflow functions' @@ -1112,7 +1182,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1128,7 +1198,8 @@ describe('runWorkflow', () => { return 'done'; }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor ) ).rejects.toThrow( 'Timeout functions like "setTimeout" and "setInterval" are not supported in workflow functions' @@ -1141,7 +1212,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1157,7 +1228,8 @@ describe('runWorkflow', () => { return 'done'; }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor ) ).rejects.toThrow( 'Timeout functions like "setTimeout" and "setInterval" are not supported in workflow functions' @@ -1172,7 +1244,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1187,7 +1259,8 @@ describe('runWorkflow', () => { return 'done'; }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor ); } catch (err) { error = err as Error; @@ -1211,7 +1284,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1228,7 +1301,8 @@ describe('runWorkflow', () => { return payload.message; }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor ); } catch (err) { error = err as Error; @@ -1246,7 +1320,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1260,8 +1334,10 @@ describe('runWorkflow', () => { eventType: 'hook_received', correlationId: 'hook_01HK153X008RT6YEW43G8QX6JX', eventData: { - payload: dehydrateStepReturnValue( + payload: await dehydrateStepReturnValue( { message: 'Hello from hook' }, + 'wrun_test', + {}, ops ), }, @@ -1277,11 +1353,12 @@ describe('runWorkflow', () => { return payload.message; }${getWorkflowTransformCode('workflow')}`, workflowRun, - events - ); - expect(hydrateWorkflowReturnValue(result as any, ops)).toEqual( - 'Hello from hook' + events, + mockEncryptor ); + expect( + await hydrateWorkflowReturnValue(result as any, 'wrun_test', {}, ops) + ).toEqual('Hello from hook'); }); it('should resolve multiple `createHook` awaits upon "hook_received" events', async () => { @@ -1290,7 +1367,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1304,8 +1381,10 @@ describe('runWorkflow', () => { eventType: 'hook_received', correlationId: 'hook_01HK153X008RT6YEW43G8QX6JX', eventData: { - payload: dehydrateStepReturnValue( + payload: await dehydrateStepReturnValue( { message: 'First payload' }, + 'wrun_test', + {}, ops ), }, @@ -1317,8 +1396,10 @@ describe('runWorkflow', () => { eventType: 'hook_received', correlationId: 'hook_01HK153X008RT6YEW43G8QX6JX', eventData: { - payload: dehydrateStepReturnValue( + payload: await dehydrateStepReturnValue( { message: 'Second payload' }, + 'wrun_test', + {}, ops ), }, @@ -1335,12 +1416,12 @@ describe('runWorkflow', () => { return [payload1.message, payload2.message]; }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor ); - expect(hydrateWorkflowReturnValue(result as any, ops)).toEqual([ - 'First payload', - 'Second payload', - ]); + expect( + await hydrateWorkflowReturnValue(result as any, 'wrun_test', {}, ops) + ).toEqual(['First payload', 'Second payload']); }); it('should support `for await` loops with `createHook`', async () => { @@ -1349,7 +1430,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1363,8 +1444,10 @@ describe('runWorkflow', () => { eventType: 'hook_received', correlationId: 'hook_01HK153X008RT6YEW43G8QX6JX', eventData: { - payload: dehydrateStepReturnValue( + payload: await dehydrateStepReturnValue( { count: 1, status: 'active' }, + 'wrun_test', + {}, ops ), }, @@ -1376,8 +1459,10 @@ describe('runWorkflow', () => { eventType: 'hook_received', correlationId: 'hook_01HK153X008RT6YEW43G8QX6JX', eventData: { - payload: dehydrateStepReturnValue( + payload: await dehydrateStepReturnValue( { count: 2, status: 'complete' }, + 'wrun_test', + {}, ops ), }, @@ -1399,9 +1484,12 @@ describe('runWorkflow', () => { return payloads; }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor ); - expect(hydrateWorkflowReturnValue(result as any, ops)).toEqual([ + expect( + await hydrateWorkflowReturnValue(result as any, 'wrun_test', {}, ops) + ).toEqual([ { count: 1, status: 'active' }, { count: 2, status: 'complete' }, ]); @@ -1413,7 +1501,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1427,7 +1515,12 @@ describe('runWorkflow', () => { eventType: 'hook_received', correlationId: 'hook_01HK153X008RT6YEW43G8QX6JX', eventData: { - payload: dehydrateStepReturnValue({ value: 100 }, ops), + payload: await dehydrateStepReturnValue( + { value: 100 }, + 'wrun_test', + {}, + ops + ), }, createdAt: new Date(), }, @@ -1437,7 +1530,12 @@ describe('runWorkflow', () => { eventType: 'hook_received', correlationId: 'hook_01HK153X008RT6YEW43G8QX6JX', eventData: { - payload: dehydrateStepReturnValue({ value: 200 }, ops), + payload: await dehydrateStepReturnValue( + { value: 200 }, + 'wrun_test', + {}, + ops + ), }, createdAt: new Date(), }, @@ -1451,9 +1549,12 @@ describe('runWorkflow', () => { return payload.value; }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor ); - expect(hydrateWorkflowReturnValue(result as any, ops)).toEqual(100); + expect( + await hydrateWorkflowReturnValue(result as any, 'wrun_test', {}, ops) + ).toEqual(100); }); it('should support multiple queued "hook_received" events with step events in between', async () => { @@ -1462,7 +1563,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1476,7 +1577,12 @@ describe('runWorkflow', () => { eventType: 'hook_received', correlationId: 'hook_01HK153X008RT6YEW43G8QX6JX', eventData: { - payload: dehydrateStepReturnValue({ data: 'first' }, ops), + payload: await dehydrateStepReturnValue( + { data: 'first' }, + 'wrun_test', + {}, + ops + ), }, createdAt: new Date('2024-01-01T00:00:01.000Z'), }, @@ -1486,7 +1592,12 @@ describe('runWorkflow', () => { eventType: 'hook_received', correlationId: 'hook_01HK153X008RT6YEW43G8QX6JX', eventData: { - payload: dehydrateStepReturnValue({ data: 'second' }, ops), + payload: await dehydrateStepReturnValue( + { data: 'second' }, + 'wrun_test', + {}, + ops + ), }, createdAt: new Date('2024-01-01T00:00:02.000Z'), }, @@ -1503,7 +1614,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X008RT6YEW43G8QX6JY', eventData: { - result: dehydrateStepReturnValue(42, ops), + result: await dehydrateStepReturnValue(42, 'wrun_test', {}, ops), }, createdAt: new Date('2024-01-01T00:00:04.000Z'), }, @@ -1524,9 +1635,12 @@ describe('runWorkflow', () => { }; }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor ); - expect(hydrateWorkflowReturnValue(result as any, ops)).toEqual({ + expect( + await hydrateWorkflowReturnValue(result as any, 'wrun_test', {}, ops) + ).toEqual({ data1: 'first', stepResult: 42, data2: 'second', @@ -1539,7 +1653,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1553,7 +1667,12 @@ describe('runWorkflow', () => { eventType: 'hook_received', correlationId: 'hook_01HK153X008RT6YEW43G8QX6JX', eventData: { - payload: dehydrateStepReturnValue({ iteration: 1 }, ops), + payload: await dehydrateStepReturnValue( + { iteration: 1 }, + 'wrun_test', + {}, + ops + ), }, createdAt: new Date('2024-01-01T00:00:01.000Z'), }, @@ -1570,7 +1689,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X008RT6YEW43G8QX6JY', eventData: { - result: dehydrateStepReturnValue(10, ops), + result: await dehydrateStepReturnValue(10, 'wrun_test', {}, ops), }, createdAt: new Date('2024-01-01T00:00:03.000Z'), }, @@ -1588,7 +1707,8 @@ describe('runWorkflow', () => { } }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor ); } catch (err) { error = err as Error; @@ -1606,7 +1726,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1620,7 +1740,12 @@ describe('runWorkflow', () => { eventType: 'hook_received', correlationId: 'hook_01HK153X008RT6YEW43G8QX6JX', eventData: { - payload: dehydrateStepReturnValue({ result: 'success' }, ops), + payload: await dehydrateStepReturnValue( + { result: 'success' }, + 'wrun_test', + {}, + ops + ), }, createdAt: new Date(), }, @@ -1634,9 +1759,12 @@ describe('runWorkflow', () => { return { token: hook.token, result: payload.result }; }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor ); - expect(hydrateWorkflowReturnValue(result as any, ops)).toEqual({ + expect( + await hydrateWorkflowReturnValue(result as any, 'wrun_test', {}, ops) + ).toEqual({ token: 'my-custom-token', result: 'success', }); @@ -1648,7 +1776,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1678,7 +1806,8 @@ describe('runWorkflow', () => { return payload; }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor ); } catch (err) { error = err as Error; @@ -1695,7 +1824,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1729,7 +1858,8 @@ describe('runWorkflow', () => { return results; }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor ); } catch (err) { error = err as Error; @@ -1747,7 +1877,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1761,9 +1891,15 @@ describe('runWorkflow', () => { return new Response('Hello, world!', { status: 201 }); }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor + ); + const res = await hydrateWorkflowReturnValue( + result as any, + 'wrun_test', + {}, + ops ); - const res = hydrateWorkflowReturnValue(result as any, ops); expect(res).toBeInstanceOf(Response); expect(res.status).toEqual(201); expect(res.body).toBeInstanceOf(ReadableStream); @@ -1779,7 +1915,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1793,9 +1929,15 @@ describe('runWorkflow', () => { return Response.json({ message: 'success', count: 42 }, { status: 201 }); }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor + ); + const res = await hydrateWorkflowReturnValue( + result as any, + 'wrun_test', + {}, + ops ); - const res = hydrateWorkflowReturnValue(result as any, ops); expect(res).toBeInstanceOf(Response); expect(res.status).toEqual(201); expect(res.headers.get('content-type')).toEqual('application/json'); @@ -1811,7 +1953,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1828,9 +1970,15 @@ describe('runWorkflow', () => { }); }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor + ); + const res = await hydrateWorkflowReturnValue( + result as any, + 'wrun_test', + {}, + ops ); - const res = hydrateWorkflowReturnValue(result as any, ops); expect(res).toBeInstanceOf(Response); expect(res.status).toEqual(202); expect(res.headers.get('X-Custom-Header')).toEqual('custom-value'); @@ -1843,7 +1991,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1857,9 +2005,15 @@ describe('runWorkflow', () => { return new Response(null, { status: 204 }); }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor + ); + const res = await hydrateWorkflowReturnValue( + result as any, + 'wrun_test', + {}, + ops ); - const res = hydrateWorkflowReturnValue(result as any, ops); expect(res).toBeInstanceOf(Response); expect(res.status).toEqual(204); expect(res.body).toBeNull(); @@ -1871,7 +2025,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1886,9 +2040,15 @@ describe('runWorkflow', () => { return new Response(data, { status: 200 }); }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor + ); + const res = await hydrateWorkflowReturnValue( + result as any, + 'wrun_test', + {}, + ops ); - const res = hydrateWorkflowReturnValue(result as any, ops); expect(res).toBeInstanceOf(Response); expect(res.status).toEqual(200); expect(res.body).toBeInstanceOf(ReadableStream); @@ -1904,7 +2064,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1919,7 +2079,8 @@ describe('runWorkflow', () => { return new Response('hello', { status: 204 }); }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor ) ).rejects.toThrow( 'Response constructor: Invalid response status code 204' @@ -1933,7 +2094,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1947,9 +2108,15 @@ describe('runWorkflow', () => { return Response.redirect('https://example.com/redirect'); }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor + ); + const res = await hydrateWorkflowReturnValue( + result as any, + 'wrun_test', + {}, + ops ); - const res = hydrateWorkflowReturnValue(result as any, ops); expect(res).toBeInstanceOf(Response); expect(res.status).toEqual(302); expect(res.headers.get('Location')).toEqual( @@ -1964,7 +2131,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1978,9 +2145,15 @@ describe('runWorkflow', () => { return Response.redirect('https://example.com/moved', 301); }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor + ); + const res = await hydrateWorkflowReturnValue( + result as any, + 'wrun_test', + {}, + ops ); - const res = hydrateWorkflowReturnValue(result as any, ops); expect(res).toBeInstanceOf(Response); expect(res.status).toEqual(301); expect(res.headers.get('Location')).toEqual( @@ -1994,7 +2167,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -2010,9 +2183,15 @@ describe('runWorkflow', () => { ); }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor + ); + const statuses = await hydrateWorkflowReturnValue( + result as any, + 'wrun_test', + {}, + ops ); - const statuses = hydrateWorkflowReturnValue(result as any, ops); expect(statuses).toEqual([301, 302, 303, 307, 308]); }); @@ -2022,7 +2201,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -2037,7 +2216,8 @@ describe('runWorkflow', () => { return Response.redirect('https://example.com', 200); }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor ) ).rejects.toThrow( 'Invalid redirect status code: 200. Must be one of: 301, 302, 303, 307, 308' @@ -2053,7 +2233,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -2067,9 +2247,15 @@ describe('runWorkflow', () => { return new Request('https://example.com/api'); }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor + ); + const req = await hydrateWorkflowReturnValue( + result as any, + 'wrun_test', + {}, + ops ); - const req = hydrateWorkflowReturnValue(result as any, ops); expect(req).toBeInstanceOf(Request); expect(req.method).toEqual('GET'); expect(req.url).toEqual('https://example.com/api'); @@ -2082,7 +2268,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -2099,9 +2285,15 @@ describe('runWorkflow', () => { }); }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor + ); + const req = await hydrateWorkflowReturnValue( + result as any, + 'wrun_test', + {}, + ops ); - const req = hydrateWorkflowReturnValue(result as any, ops); expect(req).toBeInstanceOf(Request); expect(req.method).toEqual('POST'); expect(req.url).toEqual('https://example.com/api'); @@ -2118,7 +2310,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -2139,9 +2331,15 @@ describe('runWorkflow', () => { }); }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor + ); + const req = await hydrateWorkflowReturnValue( + result as any, + 'wrun_test', + {}, + ops ); - const req = hydrateWorkflowReturnValue(result as any, ops); expect(req).toBeInstanceOf(Request); expect(req.headers.get('Content-Type')).toEqual('application/json'); expect(req.headers.get('X-Custom-Header')).toEqual('custom-value'); @@ -2153,7 +2351,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -2171,9 +2369,15 @@ describe('runWorkflow', () => { }); }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor + ); + const req = await hydrateWorkflowReturnValue( + result as any, + 'wrun_test', + {}, + ops ); - const req = hydrateWorkflowReturnValue(result as any, ops); expect(req).toBeInstanceOf(Request); expect(req.method).toEqual('PUT'); expect(req.body).toBeInstanceOf(ReadableStream); @@ -2189,7 +2393,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -2207,7 +2411,8 @@ describe('runWorkflow', () => { }); }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor ) ).rejects.toThrow('Request with GET/HEAD method cannot have body.'); }); @@ -2218,7 +2423,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -2236,7 +2441,8 @@ describe('runWorkflow', () => { }); }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor ) ).rejects.toThrow('Request with GET/HEAD method cannot have body.'); }); @@ -2247,7 +2453,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -2262,7 +2468,8 @@ describe('runWorkflow', () => { return new Request('/'); }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor ) ).rejects.toThrow('Failed to parse URL from /'); }); @@ -2273,7 +2480,7 @@ describe('runWorkflow', () => { runId: 'test-clone-bug-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -2306,10 +2513,16 @@ describe('runWorkflow', () => { }; }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor ); - const result_obj = hydrateWorkflowReturnValue(result as any, ops); + const result_obj = await hydrateWorkflowReturnValue( + result as any, + 'wrun_test', + {}, + ops + ); // According to MDN, the req1 properties should be inherited // and only the method should be overridden by the init options @@ -2330,7 +2543,7 @@ describe('runWorkflow', () => { runId: workflowRunId, workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -2365,11 +2578,12 @@ describe('runWorkflow', () => { return 'sleep completed'; }${getWorkflowTransformCode('workflow')}`, workflowRun, - events - ); - expect(hydrateWorkflowReturnValue(result as any, ops)).toEqual( - 'sleep completed' + events, + mockEncryptor ); + expect( + await hydrateWorkflowReturnValue(result as any, 'wrun_test', {}, ops) + ).toEqual('sleep completed'); }); it('should throw `WorkflowSuspension` when sleep has no wait_completed event', async () => { @@ -2381,7 +2595,7 @@ describe('runWorkflow', () => { runId: workflowRunId, workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -2397,7 +2611,8 @@ describe('runWorkflow', () => { return 'done'; }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor ); } catch (err) { error = err as Error; @@ -2416,7 +2631,7 @@ describe('runWorkflow', () => { runId: workflowRunId, workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -2467,11 +2682,12 @@ describe('runWorkflow', () => { return 'all sleeps completed'; }${getWorkflowTransformCode('workflow')}`, workflowRun, - events - ); - expect(hydrateWorkflowReturnValue(result as any, ops)).toEqual( - 'all sleeps completed' + events, + mockEncryptor ); + expect( + await hydrateWorkflowReturnValue(result as any, 'wrun_test', {}, ops) + ).toEqual('all sleeps completed'); }); it('should suspend with multiple sleeps but only one wait_completed event (partial completion)', async () => { @@ -2483,7 +2699,7 @@ describe('runWorkflow', () => { runId: workflowRunId, workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -2527,7 +2743,8 @@ describe('runWorkflow', () => { return 'all sleeps completed'; }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor ); } catch (err) { error = err as Error; @@ -2545,7 +2762,7 @@ describe('runWorkflow', () => { runId: workflowRunId, workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -2566,7 +2783,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X008RT6YEW43G8QX6JX', eventData: { - result: dehydrateStepReturnValue(42, ops), + result: await dehydrateStepReturnValue(42, 'wrun_test', {}, ops), }, createdAt: new Date('2024-01-01T00:00:01.000Z'), }, @@ -2598,9 +2815,12 @@ describe('runWorkflow', () => { return { step: stepResult, slept: true }; }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor ); - expect(hydrateWorkflowReturnValue(result as any, ops)).toEqual({ + expect( + await hydrateWorkflowReturnValue(result as any, 'wrun_test', {}, ops) + ).toEqual({ step: 42, slept: true, }); @@ -2614,7 +2834,7 @@ describe('runWorkflow', () => { runId: workflowRunId, workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -2649,11 +2869,12 @@ describe('runWorkflow', () => { return 'sleep with date completed'; }${getWorkflowTransformCode('workflow')}`, workflowRun, - events - ); - expect(hydrateWorkflowReturnValue(result as any, ops)).toEqual( - 'sleep with date completed' + events, + mockEncryptor ); + expect( + await hydrateWorkflowReturnValue(result as any, 'wrun_test', {}, ops) + ).toEqual('sleep with date completed'); }); }); @@ -2666,7 +2887,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -2685,7 +2906,8 @@ describe('runWorkflow', () => { return result; }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor ); } catch (err) { error = err as Error; @@ -2713,7 +2935,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], 'wrun_test', {}, ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -2729,7 +2951,8 @@ describe('runWorkflow', () => { return result; }${getWorkflowTransformCode('workflow')}`, workflowRun, - events + events, + mockEncryptor ); } catch (err) { error = err as Error; diff --git a/packages/core/src/workflow.ts b/packages/core/src/workflow.ts index 3328f3e8b4..9301bf1f10 100644 --- a/packages/core/src/workflow.ts +++ b/packages/core/src/workflow.ts @@ -34,7 +34,8 @@ import { createSleep } from './workflow/sleep.js'; export async function runWorkflow( workflowCode: string, workflowRun: WorkflowRun, - events: Event[] + events: Event[], + encryptor: import('@workflow/world').Encryptor ): Promise { return trace(`workflow.run ${workflowRun.workflowName}`, async (span) => { span?.setAttributes({ @@ -78,6 +79,8 @@ export async function runWorkflow( generateUlid: () => ulid(+startedAt), generateNanoid, invocationsQueue: new Map(), + runId: workflowRun.runId, + encryptor, }; // Subscribe to the events log to update the timestamp in the vm context @@ -624,7 +627,12 @@ export async function runWorkflow( ); } - const args = hydrateWorkflowArguments(workflowRun.input, vmGlobalThis); + const args = await hydrateWorkflowArguments( + workflowRun.input, + workflowRun.runId, + encryptor, + vmGlobalThis + ); span?.setAttributes({ ...Attribute.WorkflowArgumentsCount(args.length), @@ -636,7 +644,12 @@ export async function runWorkflow( workflowDiscontinuation.promise, ]); - const dehydrated = dehydrateWorkflowReturnValue(result, vmGlobalThis); + const dehydrated = await dehydrateWorkflowReturnValue( + result, + workflowRun.runId, + encryptor, + vmGlobalThis + ); span?.setAttributes({ ...Attribute.WorkflowResultType(typeof result), diff --git a/packages/core/src/workflow/hook.test.ts b/packages/core/src/workflow/hook.test.ts index a8d2d884ba..78fcb48f9b 100644 --- a/packages/core/src/workflow/hook.test.ts +++ b/packages/core/src/workflow/hook.test.ts @@ -27,6 +27,8 @@ function setupWorkflowContext(events: Event[]): WorkflowOrchestratorContext { new Uint8Array(size).map(() => 256 * context.globalThis.Math.random()) ), onWorkflowError: vi.fn(), + runId: 'wrun_test', + encryptor: {}, }; } @@ -40,7 +42,12 @@ describe('createCreateHook', () => { eventType: 'hook_received', correlationId: 'hook_01K11TFZ62YS0YYFDQ3E8B9YCV', eventData: { - payload: dehydrateStepReturnValue({ message: 'hello' }, ops), + payload: await dehydrateStepReturnValue( + { message: 'hello' }, + 'wrun_123', + {}, + ops + ), }, createdAt: new Date(), }, @@ -125,7 +132,12 @@ describe('createCreateHook', () => { eventType: 'hook_received', correlationId: 'hook_01K11TFZ62YS0YYFDQ3E8B9YCV', eventData: { - payload: dehydrateStepReturnValue({ data: 'test' }, ops), + payload: await dehydrateStepReturnValue( + { data: 'test' }, + 'wrun_123', + {}, + ops + ), }, createdAt: new Date(), }, @@ -189,7 +201,12 @@ describe('createCreateHook', () => { eventType: 'hook_received', correlationId: 'hook_01K11TFZ62YS0YYFDQ3E8B9YCV', eventData: { - payload: dehydrateStepReturnValue({ message: 'first' }, ops), + payload: await dehydrateStepReturnValue( + { message: 'first' }, + 'wrun_123', + {}, + ops + ), }, createdAt: new Date(), }, @@ -199,7 +216,12 @@ describe('createCreateHook', () => { eventType: 'hook_received', correlationId: 'hook_01K11TFZ62YS0YYFDQ3E8B9YCV', eventData: { - payload: dehydrateStepReturnValue({ message: 'second' }, ops), + payload: await dehydrateStepReturnValue( + { message: 'second' }, + 'wrun_123', + {}, + ops + ), }, createdAt: new Date(), }, diff --git a/packages/core/src/workflow/hook.ts b/packages/core/src/workflow/hook.ts index 70763e87d0..37afa16107 100644 --- a/packages/core/src/workflow/hook.ts +++ b/packages/core/src/workflow/hook.ts @@ -1,3 +1,4 @@ +import { ERROR_SLUGS, WorkflowRuntimeError } from '@workflow/errors'; import { type PromiseWithResolvers, withResolvers } from '@workflow/utils'; import type { HookConflictEvent, HookReceivedEvent } from '@workflow/world'; import type { Hook, HookOptions } from '../create-hook.js'; @@ -6,7 +7,6 @@ import { WorkflowSuspension } from '../global.js'; import { webhookLogger } from '../logger.js'; import type { WorkflowOrchestratorContext } from '../private.js'; import { hydrateStepReturnValue } from '../serialization.js'; -import { ERROR_SLUGS, WorkflowRuntimeError } from '@workflow/errors'; export function createCreateHook(ctx: WorkflowOrchestratorContext) { return function createHookImpl(options: HookOptions = {}): Hook { @@ -94,11 +94,23 @@ export function createCreateHook(ctx: WorkflowOrchestratorContext) { const next = promises.shift(); if (next) { // Reconstruct the payload from the event data - const payload = hydrateStepReturnValue( + // Use .then() pattern since hydrateStepReturnValue is async + hydrateStepReturnValue( event.eventData.payload, + ctx.runId, + ctx.encryptor, ctx.globalThis - ); - next.resolve(payload); + ) + .then((payload) => { + next.resolve(payload as T); + }) + .catch((error) => { + ctx.onWorkflowError( + error instanceof Error + ? error + : new WorkflowRuntimeError(String(error)) + ); + }); } } else { payloadsQueue.push(event); @@ -138,11 +150,19 @@ export function createCreateHook(ctx: WorkflowOrchestratorContext) { if (payloadsQueue.length > 0) { const nextPayload = payloadsQueue.shift(); if (nextPayload) { - const payload = hydrateStepReturnValue( + // Use .then() pattern since hydrateStepReturnValue is async + hydrateStepReturnValue( nextPayload.eventData.payload, + ctx.runId, + ctx.encryptor, ctx.globalThis - ); - resolvers.resolve(payload); + ) + .then((payload) => { + resolvers.resolve(payload as T); + }) + .catch((error) => { + resolvers.reject(error); + }); return resolvers.promise; } } diff --git a/packages/web/src/server/workflow-server-actions.ts b/packages/web/src/server/workflow-server-actions.ts index a1021d03b2..0158973271 100644 --- a/packages/web/src/server/workflow-server-actions.ts +++ b/packages/web/src/server/workflow-server-actions.ts @@ -545,9 +545,9 @@ const toJSONCompatible = (data: T): T => { return data; }; -const hydrate = (data: T): T => { +const hydrate = async (data: T, world: World): Promise => { try { - return hydrateResourceIO(data as any) as T; + return (await hydrateResourceIO(data as any, world)) as T; } catch (error) { throw new Error('Failed to hydrate data', { cause: error }); } @@ -595,7 +595,11 @@ export async function fetchRuns( resolveData: 'none', }); return createResponse({ - data: (result.data as unknown as WorkflowRun[]).map(hydrate), + data: await Promise.all( + (result.data as unknown as WorkflowRun[]).map((run) => + hydrate(run, world) + ) + ), cursor: result.cursor ?? undefined, hasMore: result.hasMore, }); @@ -619,7 +623,7 @@ export async function fetchRun( try { const world = await getWorldFromEnv(worldEnv); const run = await world.runs.get(runId, { resolveData }); - const hydratedRun = hydrate(run as WorkflowRun); + const hydratedRun = await hydrate(run as WorkflowRun, world); return createResponse(hydratedRun); } catch (error) { return createServerActionError(error, 'world.runs.get', { @@ -651,7 +655,9 @@ export async function fetchSteps( }); return createResponse({ // StepWithoutData has undefined input/output, but after hydration the structure is compatible - data: (result.data as unknown as Step[]).map(hydrate), + data: await Promise.all( + (result.data as unknown as Step[]).map((step) => hydrate(step, world)) + ), cursor: result.cursor ?? undefined, hasMore: result.hasMore, }); @@ -679,7 +685,7 @@ export async function fetchStep( try { const world = await getWorldFromEnv(worldEnv); const step = await world.steps.get(runId, stepId, { resolveData }); - const hydratedStep = hydrate(step as Step); + const hydratedStep = await hydrate(step as Step, world); return createResponse(hydratedStep); } catch (error) { return createServerActionError(error, 'world.steps.get', { @@ -749,7 +755,9 @@ export async function fetchEventsByCorrelationId( resolveData: withData ? 'all' : 'none', }); return createResponse({ - data: result.data.map(hydrate), + data: await Promise.all( + result.data.map((event) => hydrate(event, world)) + ), cursor: result.cursor ?? undefined, hasMore: result.hasMore, }); @@ -786,7 +794,9 @@ export async function fetchHooks( resolveData: 'none', }); return createResponse({ - data: (result.data as Hook[]).map(hydrate), + data: await Promise.all( + (result.data as Hook[]).map((hook) => hydrate(hook, world)) + ), cursor: result.cursor ?? undefined, hasMore: result.hasMore, }); @@ -810,7 +820,7 @@ export async function fetchHook( try { const world = await getWorldFromEnv(worldEnv); const hook = await world.hooks.get(hookId, { resolveData }); - return createResponse(hydrate(hook as Hook)); + return createResponse(await hydrate(hook as Hook, world)); } catch (error) { return createServerActionError(error, 'world.hooks.get', { hookId, diff --git a/packages/world-testing/src/addition.mts b/packages/world-testing/src/addition.mts index 33bfe6b6b6..136ba69d69 100644 --- a/packages/world-testing/src/addition.mts +++ b/packages/world-testing/src/addition.mts @@ -23,7 +23,7 @@ export function addition(world: string) { timeout: 10_000, } ); - const output = await hydrateWorkflowReturnValue(run.output!, [], run.runId); + const output = await hydrateWorkflowReturnValue(run.output!, run.runId, {}); expect(output).toEqual(3); }); } diff --git a/packages/world-testing/src/errors.mts b/packages/world-testing/src/errors.mts index 0ad77783a5..e6337cf6a1 100644 --- a/packages/world-testing/src/errors.mts +++ b/packages/world-testing/src/errors.mts @@ -24,7 +24,7 @@ export function errors(world: string) { timeout: 50_000, } ); - const output = await hydrateWorkflowReturnValue(run.output!, [], run.runId); + const output = await hydrateWorkflowReturnValue(run.output!, run.runId, {}); expect(output).toEqual({ gotFatalError: true, retryableResult: { diff --git a/packages/world-testing/src/hooks.mts b/packages/world-testing/src/hooks.mts index 280a0c8483..7363ea0f5f 100644 --- a/packages/world-testing/src/hooks.mts +++ b/packages/world-testing/src/hooks.mts @@ -67,7 +67,7 @@ export function hooks(world: string) { } ); - const output = await hydrateWorkflowReturnValue(run.output!, [], run.runId); + const output = await hydrateWorkflowReturnValue(run.output!, run.runId, {}); expect(output).toEqual({ collected: [ { diff --git a/packages/world-testing/src/idempotency.mts b/packages/world-testing/src/idempotency.mts index af05c9c317..6f3ce76eed 100644 --- a/packages/world-testing/src/idempotency.mts +++ b/packages/world-testing/src/idempotency.mts @@ -21,7 +21,7 @@ export function idempotency(world: string) { } ); - const output = await hydrateWorkflowReturnValue(run.output!, [], run.runId); + const output = await hydrateWorkflowReturnValue(run.output!, run.runId, {}); expect(output).toEqual({ numbers: Array.from({ length: 20 }, () => expect.any(Number)), diff --git a/packages/world-testing/src/null-byte.mts b/packages/world-testing/src/null-byte.mts index ce26eb0a31..d60f5702ab 100644 --- a/packages/world-testing/src/null-byte.mts +++ b/packages/world-testing/src/null-byte.mts @@ -23,7 +23,7 @@ export function nullByte(world: string) { timeout: 10_000, } ); - const output = await hydrateWorkflowReturnValue(run.output!, [], run.runId); + const output = await hydrateWorkflowReturnValue(run.output!, run.runId, {}); expect(output).toEqual('null byte \0'); }); } diff --git a/packages/world/src/interfaces.ts b/packages/world/src/interfaces.ts index a68e5437cd..40648d829e 100644 --- a/packages/world/src/interfaces.ts +++ b/packages/world/src/interfaces.ts @@ -168,13 +168,66 @@ export interface Storage { }; } +/** + * Context passed to encryption/decryption operations. + */ +export interface EncryptionContext { + /** The workflow run ID, used for per-run key derivation */ + runId: string; +} + +/** + * Key material for external decryption (e.g., observability tooling). + */ +export interface KeyMaterial { + /** The raw key bytes */ + key: Uint8Array; + /** Context used for key derivation */ + derivationContext: Record; + /** Algorithm identifier */ + algorithm: string; + /** Key derivation function identifier */ + kdf: string; +} + +/** + * Optional encryption support for workflow data. + * All methods are optional - when not provided, data is stored unencrypted. + */ +export interface Encryptor { + /** Encrypt data with the given context */ + encrypt?(data: Uint8Array, context: EncryptionContext): Promise; + /** Decrypt data with the given context */ + decrypt?(data: Uint8Array, context: EncryptionContext): Promise; + /** Retrieve key material for external decryption (e.g., o11y tooling) */ + getKeyMaterial?( + options: Record + ): Promise; +} + /** * The "World" interface represents how Workflows are able to communicate with the outside world. */ -export interface World extends Queue, Storage, Streamer { +export interface World extends Queue, Storage, Streamer, Encryptor { /** * A function that will be called to start any background tasks needed by the World implementation. * For example, in the case of a queue backed World, this would start the queue processing. */ start?(): Promise; + + /** + * Resolve an Encryptor that can encrypt/decrypt data for a specific workflow run. + * + * This is needed when operating on data that belongs to a different deployment + * context (e.g., `resumeHook()` called from a newer deployment targeting a + * workflow run from an older deployment). The returned Encryptor uses the + * correct key for the target run's deployment. + * + * When not implemented, the World itself is used as the Encryptor (which works + * for same-deployment operations). + * + * @param runId - The workflow run ID to resolve an encryptor for + * @returns An Encryptor that can handle data for the given run + */ + getEncryptorForRun?(runId: string): Promise; } diff --git a/workbench/nextjs-turbopack/pages/api/trigger-pages.ts b/workbench/nextjs-turbopack/pages/api/trigger-pages.ts index 860dbdf247..eb76310e00 100644 --- a/workbench/nextjs-turbopack/pages/api/trigger-pages.ts +++ b/workbench/nextjs-turbopack/pages/api/trigger-pages.ts @@ -4,25 +4,8 @@ import { WorkflowRunFailedError, WorkflowRunNotCompletedError, } from 'workflow/internal/errors'; -import { hydrateWorkflowArguments } from 'workflow/internal/serialization'; import { allWorkflows } from '@/_workflows'; -// Disable body parsing to receive raw binary data -export const config = { - api: { - bodyParser: false, - }, -}; - -async function readRawBody(req: NextApiRequest): Promise { - return new Promise((resolve, reject) => { - const chunks: Buffer[] = []; - req.on('data', (chunk: Buffer) => chunks.push(chunk)); - req.on('end', () => resolve(Buffer.concat(chunks))); - req.on('error', reject); - }); -} - export default async function handler( req: NextApiRequest, res: NextApiResponse @@ -78,13 +61,7 @@ async function handlePost(req: NextApiRequest, res: NextApiResponse) { return Number.isNaN(num) ? arg.trim() : num; }); } else { - // Args from body (binary serialized data) - const buffer = await readRawBody(req); - if (buffer.byteLength > 0) { - args = hydrateWorkflowArguments(new Uint8Array(buffer), globalThis); - } else { - args = [42]; - } + args = [42]; } console.log(`Starting "${workflowFn}" workflow with args: ${args}`); diff --git a/workbench/nextjs-webpack/pages/api/trigger-pages.ts b/workbench/nextjs-webpack/pages/api/trigger-pages.ts index 860dbdf247..eb76310e00 100644 --- a/workbench/nextjs-webpack/pages/api/trigger-pages.ts +++ b/workbench/nextjs-webpack/pages/api/trigger-pages.ts @@ -4,25 +4,8 @@ import { WorkflowRunFailedError, WorkflowRunNotCompletedError, } from 'workflow/internal/errors'; -import { hydrateWorkflowArguments } from 'workflow/internal/serialization'; import { allWorkflows } from '@/_workflows'; -// Disable body parsing to receive raw binary data -export const config = { - api: { - bodyParser: false, - }, -}; - -async function readRawBody(req: NextApiRequest): Promise { - return new Promise((resolve, reject) => { - const chunks: Buffer[] = []; - req.on('data', (chunk: Buffer) => chunks.push(chunk)); - req.on('end', () => resolve(Buffer.concat(chunks))); - req.on('error', reject); - }); -} - export default async function handler( req: NextApiRequest, res: NextApiResponse @@ -78,13 +61,7 @@ async function handlePost(req: NextApiRequest, res: NextApiResponse) { return Number.isNaN(num) ? arg.trim() : num; }); } else { - // Args from body (binary serialized data) - const buffer = await readRawBody(req); - if (buffer.byteLength > 0) { - args = hydrateWorkflowArguments(new Uint8Array(buffer), globalThis); - } else { - args = [42]; - } + args = [42]; } console.log(`Starting "${workflowFn}" workflow with args: ${args}`);