From 5b090e8f05a02f31cc5798926c0d0d2025e5f2d2 Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Sun, 8 Feb 2026 02:43:39 -0800 Subject: [PATCH 1/3] Make serialization functions async --- .changeset/async-serde.md | 10 + packages/cli/src/lib/inspect/output.ts | 24 +- packages/core/src/runtime/resume-hook.ts | 8 +- packages/core/src/runtime/run.ts | 4 +- packages/core/src/runtime/runs.ts | 2 +- packages/core/src/runtime/start.ts | 2 +- packages/core/src/runtime/step-handler.ts | 4 +- .../core/src/runtime/suspension-handler.ts | 40 +- packages/core/src/serialization.test.ts | 300 +++++++------ packages/core/src/serialization.ts | 24 +- packages/core/src/step.test.ts | 6 +- packages/core/src/step.ts | 20 +- packages/core/src/workflow.test.ts | 404 +++++++++++------- packages/core/src/workflow.ts | 7 +- packages/core/src/workflow/hook.test.ts | 24 +- packages/core/src/workflow/hook.ts | 26 +- 16 files changed, 551 insertions(+), 354 deletions(-) create mode 100644 .changeset/async-serde.md diff --git a/.changeset/async-serde.md b/.changeset/async-serde.md new file mode 100644 index 0000000000..a5453bf9cf --- /dev/null +++ b/.changeset/async-serde.md @@ -0,0 +1,10 @@ +--- +"@workflow/core": patch +"@workflow/cli": patch +"@workflow/web": patch +"@workflow/world-testing": patch +--- + +Make serialization functions async + +All 8 dehydrate/hydrate functions in the serialization layer are now async, returning Promises. This is a prerequisite for future encryption support where encrypt/decrypt operations are inherently asynchronous. No functional changes — the function bodies remain synchronous, only the signatures and all call sites have been updated. diff --git a/packages/cli/src/lib/inspect/output.ts b/packages/cli/src/lib/inspect/output.ts index f36b69a070..e52eeb7157 100644 --- a/packages/cli/src/lib/inspect/output.ts +++ b/packages/cli/src/lib/inspect/output.ts @@ -534,7 +534,9 @@ export const listRuns = async (world: World, opts: InspectCLIOptions = {}) => { }, resolveData, }); - const runsWithHydratedIO = runs.data.map(hydrateResourceIO); + const runsWithHydratedIO = await Promise.all( + runs.data.map(hydrateResourceIO) + ); showJson({ ...runs, data: runsWithHydratedIO }); return; } catch (error) { @@ -572,7 +574,7 @@ export const listRuns = async (world: World, opts: InspectCLIOptions = {}) => { } }, displayPage: async (runs) => { - const runsWithHydratedIO = runs.map(hydrateResourceIO); + const runsWithHydratedIO = await Promise.all(runs.map(hydrateResourceIO)); logger.log(showTable(runsWithHydratedIO, props, opts)); }, }); @@ -588,7 +590,7 @@ export const getRecentRun = async ( pagination: { limit: 1, sortOrder: opts.sort || 'desc' }, resolveData: 'none', // Don't need data for just getting the ID }); - runs.data = runs.data.map(hydrateResourceIO); + runs.data = await Promise.all(runs.data.map(hydrateResourceIO)); return runs.data[0]; } catch (error) { if (handleApiError(error, opts.backend)) { @@ -608,7 +610,7 @@ export const showRun = async ( } try { const run = await world.runs.get(runId, { resolveData: 'all' }); - const runWithHydratedIO = hydrateResourceIO(run); + const runWithHydratedIO = await hydrateResourceIO(run); if (opts.json) { showJson(runWithHydratedIO); return; @@ -711,7 +713,9 @@ export const listSteps = async ( } }, displayPage: async (steps) => { - const stepsWithHydratedIO = steps.map(hydrateResourceIO); + const stepsWithHydratedIO = await Promise.all( + steps.map(hydrateResourceIO) + ); logger.log(showTable(stepsWithHydratedIO, props, opts)); showInspectInfoBox('step'); }, @@ -735,7 +739,7 @@ export const showStep = async ( const step = await world.steps.get(opts.runId, stepId, { resolveData: 'all', }); - const stepWithHydratedIO = hydrateResourceIO(step); + const stepWithHydratedIO = await hydrateResourceIO(step); if (opts.json) { showJson(stepWithHydratedIO); return; @@ -950,7 +954,9 @@ export const listHooks = async (world: World, opts: InspectCLIOptions = {}) => { }, resolveData, }); - const hydratedHooks = hooks.data.map(hydrateResourceIO); + const hydratedHooks = await Promise.all( + hooks.data.map(hydrateResourceIO) + ); showJson({ ...hooks, data: hydratedHooks }); return; } catch (error) { @@ -994,7 +1000,7 @@ export const listHooks = async (world: World, opts: InspectCLIOptions = {}) => { } }, displayPage: async (hooks) => { - const hydratedHooks = hooks.map(hydrateResourceIO); + const hydratedHooks = await Promise.all(hooks.map(hydrateResourceIO)); logger.log(showTable(hydratedHooks, HOOK_LISTED_PROPS, opts)); showInspectInfoBox('hook'); }, @@ -1013,7 +1019,7 @@ export const showHook = async ( const hook = await world.hooks.get(hookId, { resolveData: 'all', }); - const hydratedHook = hydrateResourceIO(hook); + const hydratedHook = await hydrateResourceIO(hook); if (opts.json) { showJson(hydratedHook); return; diff --git a/packages/core/src/runtime/resume-hook.ts b/packages/core/src/runtime/resume-hook.ts index 0a28de38a3..1853a6f77f 100644 --- a/packages/core/src/runtime/resume-hook.ts +++ b/packages/core/src/runtime/resume-hook.ts @@ -28,7 +28,11 @@ export async function getHookByToken(token: string): Promise { const world = getWorld(); const hook = await world.hooks.getByToken(token); if (typeof hook.metadata !== 'undefined') { - hook.metadata = hydrateStepArguments(hook.metadata as any, [], hook.runId); + hook.metadata = await hydrateStepArguments( + hook.metadata as any, + [], + hook.runId + ); } return hook; } @@ -85,7 +89,7 @@ export async function resumeHook( // Dehydrate the payload for storage const ops: Promise[] = []; const v1Compat = isLegacySpecVersion(hook.specVersion); - const dehydratedPayload = dehydrateStepReturnValue( + const dehydratedPayload = await dehydrateStepReturnValue( payload, ops, hook.runId, diff --git a/packages/core/src/runtime/run.ts b/packages/core/src/runtime/run.ts index 298e5850e0..9e5ad1aaf7 100644 --- a/packages/core/src/runtime/run.ts +++ b/packages/core/src/runtime/run.ts @@ -5,8 +5,8 @@ import { } from '@workflow/errors'; import { SPEC_VERSION_CURRENT, - type World, type WorkflowRunStatus, + type World, } from '@workflow/world'; import { getExternalRevivers, @@ -153,7 +153,7 @@ export class Run { const run = await this.world.runs.get(this.runId); if (run.status === 'completed') { - return hydrateWorkflowReturnValue(run.output, [], this.runId); + return await hydrateWorkflowReturnValue(run.output, [], this.runId); } if (run.status === 'cancelled') { diff --git a/packages/core/src/runtime/runs.ts b/packages/core/src/runtime/runs.ts index 932240df70..17f3f83b33 100644 --- a/packages/core/src/runtime/runs.ts +++ b/packages/core/src/runtime/runs.ts @@ -50,7 +50,7 @@ export async function recreateRunFromExisting( try { const run = await world.runs.get(runId, { resolveData: 'all' }); const workflowArgs = normalizeWorkflowArgs( - hydrateWorkflowArguments(run.input, globalThis) + await hydrateWorkflowArguments(run.input, globalThis) ); const specVersion = options.specVersion ?? run.specVersion ?? SPEC_VERSION_LEGACY; diff --git a/packages/core/src/runtime/start.ts b/packages/core/src/runtime/start.ts index b536145fc3..27353cf1ec 100644 --- a/packages/core/src/runtime/start.ts +++ b/packages/core/src/runtime/start.ts @@ -119,7 +119,7 @@ export async function start( // Create run via run_created event (event-sourced architecture) // Pass client-generated runId - server will accept and use it - const workflowArguments = dehydrateWorkflowArguments( + const workflowArguments = await dehydrateWorkflowArguments( args, ops, runId, diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts index 78a8127423..cf4ead2b83 100644 --- a/packages/core/src/runtime/step-handler.ts +++ b/packages/core/src/runtime/step-handler.ts @@ -298,7 +298,7 @@ const stepHandler = getWorldHandlers().createQueueHandler( {}, async (hydrateSpan) => { const startTime = Date.now(); - const result = hydrateStepArguments( + const result = await hydrateStepArguments( step.input, ops, workflowRunId @@ -354,7 +354,7 @@ const stepHandler = getWorldHandlers().createQueueHandler( {}, async (dehydrateSpan) => { const startTime = Date.now(); - const dehydrated = dehydrateStepReturnValue( + const dehydrated = await dehydrateStepReturnValue( result, ops, workflowRunId diff --git a/packages/core/src/runtime/suspension-handler.ts b/packages/core/src/runtime/suspension-handler.ts index e6737cf199..a6ca5b2f97 100644 --- a/packages/core/src/runtime/suspension-handler.ts +++ b/packages/core/src/runtime/suspension-handler.ts @@ -78,24 +78,26 @@ export async function handleSuspension({ ); // Build hook_created events (World will atomically create hook entities) - const hookEvents: CreateEventRequest[] = hookItems.map((queueItem) => { - const hookMetadata: SerializedData | undefined = - typeof queueItem.metadata === 'undefined' - ? undefined - : (dehydrateStepArguments( - queueItem.metadata, - suspension.globalThis - ) as SerializedData); - return { - eventType: 'hook_created' as const, - specVersion: SPEC_VERSION_CURRENT, - correlationId: queueItem.correlationId, - eventData: { - token: queueItem.token, - metadata: hookMetadata, - }, - }; - }); + const hookEvents: CreateEventRequest[] = await Promise.all( + hookItems.map(async (queueItem) => { + const hookMetadata: SerializedData | undefined = + typeof queueItem.metadata === 'undefined' + ? undefined + : ((await dehydrateStepArguments( + queueItem.metadata, + suspension.globalThis + )) as SerializedData); + return { + eventType: 'hook_created' as const, + specVersion: SPEC_VERSION_CURRENT, + correlationId: queueItem.correlationId, + eventData: { + token: queueItem.token, + metadata: hookMetadata, + }, + }; + }) + ); // Process hooks first to prevent race conditions with webhook receivers // All hook creations run in parallel @@ -153,7 +155,7 @@ export async function handleSuspension({ (async () => { // Create step event if not already created if (stepsNeedingCreation.has(queueItem.correlationId)) { - const dehydratedInput = dehydrateStepArguments( + const dehydratedInput = await dehydrateStepArguments( { args: queueItem.args, closureVars: queueItem.closureVars, diff --git a/packages/core/src/serialization.test.ts b/packages/core/src/serialization.test.ts index c4e6f3774f..e91fd70585 100644 --- a/packages/core/src/serialization.test.ts +++ b/packages/core/src/serialization.test.ts @@ -50,9 +50,9 @@ describe('workflow arguments', () => { fixedTimestamp: 1714857600000, }); - it('should work with Date', () => { + it('should work with Date', async () => { const date = new Date('2025-07-17T04:30:34.824Z'); - const serialized = dehydrateWorkflowArguments(date, [], mockRunId); + const serialized = await dehydrateWorkflowArguments(date, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` Uint8Array [ 100, @@ -101,16 +101,16 @@ describe('workflow arguments', () => { ] `); - const hydrated = hydrateWorkflowArguments(serialized, vmGlobalThis); + const hydrated = await hydrateWorkflowArguments(serialized, vmGlobalThis); vmGlobalThis.val = hydrated; expect(runInContext('val instanceof Date', context)).toBe(true); expect(hydrated.getTime()).toEqual(date.getTime()); }); - it('should work with invalid Date', () => { + it('should work with invalid Date', async () => { const date = new Date('asdf'); - const serialized = dehydrateWorkflowArguments(date, [], mockRunId); + const serialized = await dehydrateWorkflowArguments(date, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` Uint8Array [ 100, @@ -136,16 +136,16 @@ describe('workflow arguments', () => { ] `); - const hydrated = hydrateWorkflowArguments(serialized, vmGlobalThis); + const hydrated = await hydrateWorkflowArguments(serialized, vmGlobalThis); vmGlobalThis.val = hydrated; expect(runInContext('val instanceof Date', context)).toBe(true); expect(hydrated.getTime()).toEqual(NaN); }); - it('should work with BigInt', () => { + it('should work with BigInt', async () => { const bigInt = BigInt('9007199254740992'); - const serialized = dehydrateWorkflowArguments(bigInt, [], mockRunId); + const serialized = await dehydrateWorkflowArguments(bigInt, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` Uint8Array [ 100, @@ -188,14 +188,14 @@ describe('workflow arguments', () => { ] `); - const hydrated = hydrateWorkflowArguments(serialized, vmGlobalThis); + const hydrated = await hydrateWorkflowArguments(serialized, vmGlobalThis); expect(hydrated).toBe(BigInt(9007199254740992)); expect(typeof hydrated).toBe('bigint'); }); - it('should work with BigInt negative', () => { + it('should work with BigInt negative', async () => { const bigInt = BigInt('-12345678901234567890'); - const serialized = dehydrateWorkflowArguments(bigInt, [], mockRunId); + const serialized = await dehydrateWorkflowArguments(bigInt, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` Uint8Array [ 100, @@ -243,17 +243,17 @@ describe('workflow arguments', () => { ] `); - const hydrated = hydrateWorkflowArguments(serialized, vmGlobalThis); + const hydrated = await hydrateWorkflowArguments(serialized, vmGlobalThis); expect(hydrated).toBe(BigInt('-12345678901234567890')); expect(typeof hydrated).toBe('bigint'); }); - it('should work with Map', () => { + it('should work with Map', async () => { const map = new Map([ [2, 'foo'], [6, 'bar'], ]); - const serialized = dehydrateWorkflowArguments(map, [], mockRunId); + const serialized = await dehydrateWorkflowArguments(map, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` Uint8Array [ 100, @@ -308,15 +308,15 @@ describe('workflow arguments', () => { ] `); - const hydrated = hydrateWorkflowArguments(serialized, vmGlobalThis); + const hydrated = await hydrateWorkflowArguments(serialized, vmGlobalThis); vmGlobalThis.val = hydrated; expect(runInContext('val instanceof Map', context)).toBe(true); }); - it('should work with Set', () => { + it('should work with Set', async () => { const set = new Set([1, '2', true]); - const serialized = dehydrateWorkflowArguments(set, [], mockRunId); + const serialized = await dehydrateWorkflowArguments(set, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` Uint8Array [ 100, @@ -356,22 +356,22 @@ describe('workflow arguments', () => { ] `); - const hydrated = hydrateWorkflowArguments(serialized, vmGlobalThis); + const hydrated = await hydrateWorkflowArguments(serialized, vmGlobalThis); vmGlobalThis.val = hydrated; expect(runInContext('val instanceof Set', context)).toBe(true); }); - it('should work with WritableStream', () => { + it('should work with WritableStream', async () => { const stream = new WritableStream(); - const serialized = dehydrateWorkflowArguments(stream, [], mockRunId); + const serialized = await dehydrateWorkflowArguments(stream, [], mockRunId); expect(serialized instanceof Uint8Array).toBe(true); // Verify the serialized data contains WritableStream reference const serializedStr = new TextDecoder().decode(serialized); expect(serializedStr).toContain('WritableStream'); class OurWritableStream {} - const hydrated = hydrateWorkflowArguments(serialized, { + const hydrated = await hydrateWorkflowArguments(serialized, { WritableStream: OurWritableStream, }); expect(hydrated).toBeInstanceOf(OurWritableStream); @@ -379,16 +379,16 @@ describe('workflow arguments', () => { expect(streamName).toMatch(/^strm_[0-9A-Z]{26}$/); }); - it('should work with ReadableStream', () => { + it('should work with ReadableStream', async () => { const stream = new ReadableStream(); - const serialized = dehydrateWorkflowArguments(stream, [], mockRunId); + const serialized = await dehydrateWorkflowArguments(stream, [], mockRunId); expect(serialized instanceof Uint8Array).toBe(true); // Verify the serialized data contains ReadableStream reference const serializedStr = new TextDecoder().decode(serialized); expect(serializedStr).toContain('ReadableStream'); class OurReadableStream {} - const hydrated = hydrateWorkflowArguments(serialized, { + const hydrated = await hydrateWorkflowArguments(serialized, { ReadableStream: OurReadableStream, }); expect(hydrated).toBeInstanceOf(OurReadableStream); @@ -396,12 +396,12 @@ describe('workflow arguments', () => { expect(streamName).toMatch(/^strm_[0-9A-Z]{26}$/); }); - it('should work with Headers', () => { + it('should work with Headers', async () => { const headers = new Headers(); headers.set('foo', 'bar'); headers.append('set-cookie', 'a'); headers.append('set-cookie', 'b'); - const serialized = dehydrateWorkflowArguments(headers, [], mockRunId); + const serialized = await dehydrateWorkflowArguments(headers, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` Uint8Array [ 100, @@ -485,13 +485,13 @@ describe('workflow arguments', () => { ] `); - const hydrated = hydrateWorkflowArguments(serialized, vmGlobalThis); + const hydrated = await hydrateWorkflowArguments(serialized, vmGlobalThis); expect(hydrated).toBeInstanceOf(Headers); expect(hydrated.get('foo')).toEqual('bar'); expect(hydrated.get('set-cookie')).toEqual('a, b'); }); - it('should work with Response', () => { + it('should work with Response', async () => { const response = new Response('Hello, world!', { status: 202, statusText: 'Custom', @@ -501,7 +501,11 @@ describe('workflow arguments', () => { ['set-cookie', 'b'], ]), }); - const serialized = dehydrateWorkflowArguments(response, [], mockRunId); + const serialized = await dehydrateWorkflowArguments( + response, + [], + mockRunId + ); expect(serialized instanceof Uint8Array).toBe(true); // Verify the serialized data contains Response reference const serializedStr = new TextDecoder().decode(serialized); @@ -518,7 +522,7 @@ describe('workflow arguments', () => { } class OurReadableStream {} class OurHeaders {} - const hydrated = hydrateWorkflowArguments(serialized, { + const hydrated = await hydrateWorkflowArguments(serialized, { Headers: OurHeaders, Response: OurResponse, ReadableStream: OurReadableStream, @@ -531,10 +535,10 @@ describe('workflow arguments', () => { expect(bodyStreamName).toMatch(/^strm_[0-9A-Z]{26}$/); }); - it('should work with URLSearchParams', () => { + it('should work with URLSearchParams', async () => { const params = new URLSearchParams('a=1&b=2&a=3'); - const serialized = dehydrateWorkflowArguments(params, [], mockRunId); + const serialized = await dehydrateWorkflowArguments(params, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` Uint8Array [ 100, @@ -581,7 +585,7 @@ describe('workflow arguments', () => { ] `); - const hydrated = hydrateWorkflowArguments(serialized, vmGlobalThis); + const hydrated = await hydrateWorkflowArguments(serialized, vmGlobalThis); vmGlobalThis.val = hydrated; expect(runInContext('val instanceof URLSearchParams', context)).toBe(true); expect(hydrated.getAll('a')).toEqual(['1', '3']); @@ -594,10 +598,10 @@ describe('workflow arguments', () => { ]); }); - it('should work with empty URLSearchParams', () => { + it('should work with empty URLSearchParams', async () => { const params = new URLSearchParams(); - const serialized = dehydrateWorkflowArguments(params, [], mockRunId); + const serialized = await dehydrateWorkflowArguments(params, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` Uint8Array [ 100, @@ -634,17 +638,17 @@ describe('workflow arguments', () => { ] `); - const hydrated = hydrateWorkflowArguments(serialized, vmGlobalThis); + const hydrated = await hydrateWorkflowArguments(serialized, vmGlobalThis); vmGlobalThis.val = hydrated; expect(runInContext('val instanceof URLSearchParams', context)).toBe(true); expect(hydrated.toString()).toEqual(''); expect(Array.from(hydrated.entries())).toEqual([]); }); - it('should work with empty ArrayBuffer', () => { + it('should work with empty ArrayBuffer', async () => { const buffer = new ArrayBuffer(0); - const serialized = dehydrateWorkflowArguments(buffer, [], mockRunId); + const serialized = await dehydrateWorkflowArguments(buffer, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` Uint8Array [ 100, @@ -677,16 +681,16 @@ describe('workflow arguments', () => { ] `); - const hydrated = hydrateWorkflowArguments(serialized, vmGlobalThis); + const hydrated = await hydrateWorkflowArguments(serialized, vmGlobalThis); vmGlobalThis.val = hydrated; expect(runInContext('val instanceof ArrayBuffer', context)).toBe(true); expect(hydrated.byteLength).toEqual(0); }); - it('should work with empty Uint8Array', () => { + it('should work with empty Uint8Array', async () => { const array = new Uint8Array(0); - const serialized = dehydrateWorkflowArguments(array, [], mockRunId); + const serialized = await dehydrateWorkflowArguments(array, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` Uint8Array [ 100, @@ -718,17 +722,17 @@ describe('workflow arguments', () => { ] `); - const hydrated = hydrateWorkflowArguments(serialized, vmGlobalThis); + const hydrated = await hydrateWorkflowArguments(serialized, vmGlobalThis); vmGlobalThis.val = hydrated; expect(runInContext('val instanceof Uint8Array', context)).toBe(true); expect(hydrated.length).toEqual(0); expect(hydrated.byteLength).toEqual(0); }); - it('should work with empty Int32Array', () => { + it('should work with empty Int32Array', async () => { const array = new Int32Array(0); - const serialized = dehydrateWorkflowArguments(array, [], mockRunId); + const serialized = await dehydrateWorkflowArguments(array, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` Uint8Array [ 100, @@ -760,17 +764,17 @@ describe('workflow arguments', () => { ] `); - const hydrated = hydrateWorkflowArguments(serialized, vmGlobalThis); + const hydrated = await hydrateWorkflowArguments(serialized, vmGlobalThis); vmGlobalThis.val = hydrated; expect(runInContext('val instanceof Int32Array', context)).toBe(true); expect(hydrated.length).toEqual(0); expect(hydrated.byteLength).toEqual(0); }); - it('should work with empty Float64Array', () => { + it('should work with empty Float64Array', async () => { const array = new Float64Array(0); - const serialized = dehydrateWorkflowArguments(array, [], mockRunId); + const serialized = await dehydrateWorkflowArguments(array, [], mockRunId); expect(serialized).toMatchInlineSnapshot(` Uint8Array [ 100, @@ -804,14 +808,14 @@ describe('workflow arguments', () => { ] `); - const hydrated = hydrateWorkflowArguments(serialized, vmGlobalThis); + const hydrated = await hydrateWorkflowArguments(serialized, vmGlobalThis); vmGlobalThis.val = hydrated; expect(runInContext('val instanceof Float64Array', context)).toBe(true); expect(hydrated.length).toEqual(0); expect(hydrated.byteLength).toEqual(0); }); - it('should work with Request (without responseWritable)', () => { + it('should work with Request (without responseWritable)', async () => { // Mock STABLE_ULID to return a deterministic value const originalStableUlid = (globalThis as any)[STABLE_ULID]; (globalThis as any)[STABLE_ULID] = () => '01ARZ3NDEKTSV4RRFFQ69G5FA1'; @@ -827,7 +831,11 @@ describe('workflow arguments', () => { duplex: 'half', } as RequestInit); - const serialized = dehydrateWorkflowArguments(request, [], mockRunId); + const serialized = await dehydrateWorkflowArguments( + request, + [], + mockRunId + ); expect(serialized).toMatchInlineSnapshot(` Uint8Array [ 100, @@ -1136,7 +1144,7 @@ describe('workflow arguments', () => { } class OurReadableStream {} class OurHeaders {} - const hydrated = hydrateWorkflowArguments(serialized, { + const hydrated = await hydrateWorkflowArguments(serialized, { Request: OurRequest, Headers: OurHeaders, ReadableStream: OurReadableStream, @@ -1152,7 +1160,7 @@ describe('workflow arguments', () => { } }); - it('should work with Request (with responseWritable)', () => { + it('should work with Request (with responseWritable)', async () => { // Mock STABLE_ULID to return deterministic values const originalStableUlid = (globalThis as any)[STABLE_ULID]; let ulidCounter = 0; @@ -1176,7 +1184,11 @@ describe('workflow arguments', () => { const responseWritable = new WritableStream(); request[Symbol.for('WEBHOOK_RESPONSE_WRITABLE')] = responseWritable; - const serialized = dehydrateWorkflowArguments(request, [], mockRunId); + const serialized = await dehydrateWorkflowArguments( + request, + [], + mockRunId + ); expect(serialized).toMatchInlineSnapshot(` Uint8Array [ 100, @@ -1552,7 +1564,7 @@ describe('workflow arguments', () => { class OurReadableStream {} class OurWritableStream {} class OurHeaders {} - const hydrated = hydrateWorkflowArguments(serialized, { + const hydrated = await hydrateWorkflowArguments(serialized, { Request: OurRequest, Headers: OurHeaders, ReadableStream: OurReadableStream, @@ -1579,11 +1591,11 @@ describe('workflow arguments', () => { } }); - it('should throw error for an unsupported type', () => { + it('should throw error for an unsupported type', async () => { class Foo {} let err: WorkflowRuntimeError | undefined; try { - dehydrateWorkflowArguments(new Foo(), [], mockRunId); + await dehydrateWorkflowArguments(new Foo(), [], mockRunId); } catch (err_) { err = err_ as WorkflowRuntimeError; } @@ -1595,11 +1607,11 @@ describe('workflow arguments', () => { }); describe('workflow return value', () => { - it('should throw error for an unsupported type', () => { + it('should throw error for an unsupported type', async () => { class Foo {} let err: WorkflowRuntimeError | undefined; try { - dehydrateWorkflowReturnValue(new Foo()); + await dehydrateWorkflowReturnValue(new Foo()); } catch (err_) { err = err_ as WorkflowRuntimeError; } @@ -1611,11 +1623,11 @@ describe('workflow return value', () => { }); describe('step arguments', () => { - it('should throw error for an unsupported type', () => { + it('should throw error for an unsupported type', async () => { class Foo {} let err: WorkflowRuntimeError | undefined; try { - dehydrateStepArguments(new Foo(), globalThis); + await dehydrateStepArguments(new Foo(), globalThis); } catch (err_) { err = err_ as WorkflowRuntimeError; } @@ -1627,11 +1639,11 @@ describe('step arguments', () => { }); describe('step return value', () => { - it('should throw error for an unsupported type', () => { + it('should throw error for an unsupported type', async () => { class Foo {} let err: WorkflowRuntimeError | undefined; try { - dehydrateStepReturnValue(new Foo(), [], mockRunId); + await dehydrateStepReturnValue(new Foo(), [], mockRunId); } catch (err_) { err = err_ as WorkflowRuntimeError; } @@ -1689,7 +1701,7 @@ describe('step function serialization', () => { expect(retrieved).toBeUndefined(); }); - it('should deserialize step function name through reviver', () => { + it('should deserialize step function name through reviver', async () => { const stepName = 'step//test//testStep'; const stepFn = async () => 42; @@ -1706,11 +1718,11 @@ describe('step function serialization', () => { }); // Serialize using workflow reducers (which handle StepFunction) - const dehydrated = dehydrateStepArguments([fnWithStepId], globalThis); + const dehydrated = await dehydrateStepArguments([fnWithStepId], globalThis); // Hydrate it back using step revivers const ops: Promise[] = []; - const hydrated = hydrateStepArguments( + const hydrated = await hydrateStepArguments( dehydrated, ops, mockRunId, @@ -1721,7 +1733,7 @@ describe('step function serialization', () => { expect(hydrated[0]).toBe(stepFn); }); - it('should deserialize step function using workflows/example path aliases', () => { + it('should deserialize step function using workflows/example path aliases', async () => { const registeredStepId = 'step//./example/workflows/99_e2e//doubleNumber'; const aliasedStepId = 'step//./workflows/99_e2e//doubleNumber'; const stepFn = async () => 42; @@ -1735,9 +1747,9 @@ describe('step function serialization', () => { enumerable: false, configurable: false, }); - const dehydrated = dehydrateStepArguments([fnWithStepId], globalThis); + const dehydrated = await dehydrateStepArguments([fnWithStepId], globalThis); const ops: Promise[] = []; - const hydrated = hydrateStepArguments( + const hydrated = await hydrateStepArguments( dehydrated, ops, mockRunId, @@ -1748,7 +1760,7 @@ describe('step function serialization', () => { expect(result).toBe(stepFn); }); - it('should deserialize step function using workflows/src path aliases', () => { + it('should deserialize step function using workflows/src path aliases', async () => { const registeredStepId = 'step//./src/workflows/99_e2e//doubleFromSrc'; const aliasedStepId = 'step//./workflows/99_e2e//doubleFromSrc'; const stepFn = async () => 42; @@ -1762,9 +1774,9 @@ describe('step function serialization', () => { enumerable: false, configurable: false, }); - const dehydrated = dehydrateStepArguments([fnWithStepId], globalThis); + const dehydrated = await dehydrateStepArguments([fnWithStepId], globalThis); const ops: Promise[] = []; - const hydrated = hydrateStepArguments( + const hydrated = await hydrateStepArguments( dehydrated, ops, mockRunId, @@ -1775,7 +1787,7 @@ describe('step function serialization', () => { expect(result).toBe(stepFn); }); - it('should throw error when reviver cannot find registered step function', () => { + it('should throw error when reviver cannot find registered step function', async () => { // Create a function with a non-existent stepId const fnWithNonExistentStepId = async () => 42; Object.defineProperty(fnWithNonExistentStepId, 'stepId', { @@ -1786,7 +1798,7 @@ describe('step function serialization', () => { }); // Serialize the step function reference - const dehydrated = dehydrateStepArguments( + const dehydrated = await dehydrateStepArguments( [fnWithNonExistentStepId], globalThis ); @@ -1795,7 +1807,7 @@ describe('step function serialization', () => { const ops: Promise[] = []; let err: Error | undefined; try { - hydrateStepArguments(dehydrated, ops, mockRunId, globalThis); + await hydrateStepArguments(dehydrated, ops, mockRunId, globalThis); } catch (err_) { err = err_ as Error; } @@ -1805,7 +1817,7 @@ describe('step function serialization', () => { expect(err?.message).toContain('Make sure the step function is registered'); }); - it('should dehydrate step function passed as argument to a step', () => { + it('should dehydrate step function passed as argument to a step', async () => { const stepName = 'step//workflows/test.ts//myStep'; const stepFn = async (x: number) => x * 2; @@ -1825,7 +1837,7 @@ describe('step function serialization', () => { const args = [stepFn, 42]; // This should serialize the step function by its name using the reducer - const dehydrated = dehydrateStepArguments(args, globalThis); + const dehydrated = await dehydrateStepArguments(args, globalThis); // Verify it dehydrated successfully expect(dehydrated).toBeDefined(); @@ -1871,7 +1883,7 @@ describe('step function serialization', () => { // Serialize the step function with closure variables const args = [stepFn, 7]; - const dehydrated = dehydrateStepArguments(args, globalThis); + const dehydrated = await dehydrateStepArguments(args, globalThis); // Verify it serialized expect(dehydrated).toBeDefined(); @@ -1882,7 +1894,7 @@ describe('step function serialization', () => { expect(serialized).toContain('prefix'); // Now hydrate it back - const hydrated = hydrateStepArguments( + const hydrated = await hydrateStepArguments( dehydrated, [], 'test-run-123', @@ -1939,7 +1951,7 @@ describe('step function serialization', () => { expect(result).toEqual({ stepId: stepName }); }); - it('should hydrate step function from workflow arguments using WORKFLOW_USE_STEP', () => { + it('should hydrate step function from workflow arguments using WORKFLOW_USE_STEP', async () => { // This tests the flow: client mode serializes step function with stepId, // workflow mode deserializes it using WORKFLOW_USE_STEP from vmGlobalThis const stepId = 'step//workflows/test.ts//addNumbers'; @@ -1972,7 +1984,7 @@ describe('step function serialization', () => { // Serialize from client side using external reducers const ops: Promise[] = []; - const dehydrated = dehydrateWorkflowArguments( + const dehydrated = await dehydrateWorkflowArguments( [clientStepFn, 3, 5], ops, mockRunId, @@ -1980,7 +1992,7 @@ describe('step function serialization', () => { ); // Hydrate in workflow context using VM's globalThis - const hydrated = hydrateWorkflowArguments(dehydrated, vmGlobalThis); + const hydrated = await hydrateWorkflowArguments(dehydrated, vmGlobalThis); // Verify the hydrated result expect(Array.isArray(hydrated)).toBe(true); @@ -1997,7 +2009,7 @@ describe('step function serialization', () => { expect(hydratedStepFn.stepId).toBe(stepId); }); - it('should throw error when WORKFLOW_USE_STEP is not set on globalThis', () => { + it('should throw error when WORKFLOW_USE_STEP is not set on globalThis', async () => { const stepId = 'step//workflows/test.ts//missingUseStep'; // Create a VM context WITHOUT setting up WORKFLOW_USE_STEP @@ -2017,7 +2029,7 @@ describe('step function serialization', () => { // Serialize from client side const ops: Promise[] = []; - const dehydrated = dehydrateWorkflowArguments( + const dehydrated = await dehydrateWorkflowArguments( [clientStepFn], ops, mockRunId, @@ -2025,9 +2037,9 @@ describe('step function serialization', () => { ); // Hydrating should throw because WORKFLOW_USE_STEP is not set - expect(() => hydrateWorkflowArguments(dehydrated, vmGlobalThis)).toThrow( - 'WORKFLOW_USE_STEP not found on global object' - ); + await expect( + hydrateWorkflowArguments(dehydrated, vmGlobalThis) + ).rejects.toThrow('WORKFLOW_USE_STEP not found on global object'); }); }); @@ -2066,7 +2078,7 @@ describe('custom class serialization', () => { context ); - it('should serialize and deserialize a class with WORKFLOW_SERIALIZE/DESERIALIZE', () => { + it('should serialize and deserialize a class with WORKFLOW_SERIALIZE/DESERIALIZE', async () => { // Define the class in the host context (for serialization) class Point { constructor( @@ -2112,7 +2124,7 @@ describe('custom class serialization', () => { ); const point = new Point(10, 20); - const serialized = dehydrateWorkflowArguments(point, [], mockRunId); + const serialized = await dehydrateWorkflowArguments(point, [], mockRunId); // Verify it serialized with the Instance type expect(serialized).toBeDefined(); @@ -2122,7 +2134,7 @@ describe('custom class serialization', () => { expect(serializedStr).toContain('test/Point'); // Hydrate it back (inside the VM context) - const hydrated = hydrateWorkflowArguments(serialized, vmGlobalThis); + const hydrated = await hydrateWorkflowArguments(serialized, vmGlobalThis); // Note: hydrated is an instance of the VM's Point class, not the host's // so we check constructor.name instead of instanceof expect(hydrated.constructor.name).toBe('Point'); @@ -2130,7 +2142,7 @@ describe('custom class serialization', () => { expect(hydrated.y).toBe(20); }); - it('should serialize nested custom serializable objects', () => { + it('should serialize nested custom serializable objects', async () => { // Define the class in the host context (for serialization) class Vector { constructor( @@ -2182,8 +2194,8 @@ describe('custom class serialization', () => { }, }; - const serialized = dehydrateWorkflowArguments(data, [], mockRunId); - const hydrated = hydrateWorkflowArguments(serialized, vmGlobalThis); + const serialized = await dehydrateWorkflowArguments(data, [], mockRunId); + const hydrated = await hydrateWorkflowArguments(serialized, vmGlobalThis); expect(hydrated.name).toBe('test'); expect(hydrated.vector.constructor.name).toBe('Vector'); @@ -2194,7 +2206,7 @@ describe('custom class serialization', () => { expect(hydrated.nested.anotherVector.dy).toBe(2); }); - it('should serialize custom class in an array', () => { + it('should serialize custom class in an array', async () => { // Define the class in the host context (for serialization) class Item { constructor(public id: string) {} @@ -2236,8 +2248,8 @@ describe('custom class serialization', () => { const items = [new Item('a'), new Item('b'), new Item('c')]; - const serialized = dehydrateWorkflowArguments(items, [], mockRunId); - const hydrated = hydrateWorkflowArguments(serialized, vmGlobalThis); + const serialized = await dehydrateWorkflowArguments(items, [], mockRunId); + const hydrated = await hydrateWorkflowArguments(serialized, vmGlobalThis); expect(Array.isArray(hydrated)).toBe(true); expect(hydrated).toHaveLength(3); @@ -2249,7 +2261,7 @@ describe('custom class serialization', () => { expect(hydrated[2].id).toBe('c'); }); - it('should work with step arguments', () => { + it('should work with step arguments', async () => { class Config { constructor( public setting: string, @@ -2272,8 +2284,8 @@ describe('custom class serialization', () => { registerSerializationClass('test/Config', Config); const config = new Config('maxRetries', 3); - const serialized = dehydrateStepArguments([config], globalThis); - const hydrated = hydrateStepArguments( + const serialized = await dehydrateStepArguments([config], globalThis); + const hydrated = await hydrateStepArguments( serialized, [], mockRunId, @@ -2286,7 +2298,7 @@ describe('custom class serialization', () => { expect(hydrated[0].value).toBe(3); }); - it('should work with step return values', () => { + it('should work with step return values', async () => { class Result { constructor( public success: boolean, @@ -2309,16 +2321,16 @@ describe('custom class serialization', () => { registerSerializationClass('test/Result', Result); const result = new Result(true, 'completed'); - const serialized = dehydrateStepReturnValue(result, [], mockRunId); + const serialized = await dehydrateStepReturnValue(result, [], mockRunId); // Step return values are hydrated with workflow revivers - const hydrated = hydrateWorkflowArguments(serialized, globalThis); + const hydrated = await hydrateWorkflowArguments(serialized, globalThis); expect(hydrated).toBeInstanceOf(Result); expect(hydrated.success).toBe(true); expect(hydrated.data).toBe('completed'); }); - it('should not serialize classes without WORKFLOW_SERIALIZE', () => { + it('should not serialize classes without WORKFLOW_SERIALIZE', async () => { class PlainClass { constructor(public value: string) {} } @@ -2326,10 +2338,12 @@ describe('custom class serialization', () => { const instance = new PlainClass('test'); // Should throw because PlainClass is not serializable - expect(() => dehydrateWorkflowArguments(instance, [], mockRunId)).toThrow(); + await expect( + dehydrateWorkflowArguments(instance, [], mockRunId) + ).rejects.toThrow(); }); - it('should throw error when classId is missing', () => { + it('should throw error when classId is missing', async () => { // NOTE: Missing `classId` property so serializatoin will fail. class NoClassId { constructor(public value: string) {} @@ -2348,14 +2362,14 @@ describe('custom class serialization', () => { // Should throw with our specific error message about missing classId let errorMessage = ''; try { - dehydrateWorkflowArguments(instance, [], mockRunId); + await dehydrateWorkflowArguments(instance, [], mockRunId); } catch (e: any) { errorMessage = e.cause?.message || e.message; } expect(errorMessage).toMatch(/must have a static "classId" property/); }); - it('should serialize class with complex data types in payload', () => { + it('should serialize class with complex data types in payload', async () => { class ComplexData { constructor( public items: Map, @@ -2387,8 +2401,8 @@ describe('custom class serialization', () => { const date = new Date('2025-01-01T00:00:00.000Z'); const complex = new ComplexData(map, date); - const serialized = dehydrateWorkflowArguments(complex, [], mockRunId); - const hydrated = hydrateWorkflowArguments(serialized, globalThis); + const serialized = await dehydrateWorkflowArguments(complex, [], mockRunId); + const hydrated = await hydrateWorkflowArguments(serialized, globalThis); expect(hydrated).toBeInstanceOf(ComplexData); expect(hydrated.items).toBeInstanceOf(Map); @@ -2398,7 +2412,7 @@ describe('custom class serialization', () => { expect(hydrated.created.toISOString()).toBe('2025-01-01T00:00:00.000Z'); }); - it('should pass class as this context to WORKFLOW_SERIALIZE and WORKFLOW_DESERIALIZE', () => { + it('should pass class as this context to WORKFLOW_SERIALIZE and WORKFLOW_DESERIALIZE', async () => { // This test verifies that serialize.call(cls, value) and deserialize.call(cls, data) // properly pass the class as `this` context, which is required when the serializer/deserializer // needs to access static properties or methods on the class @@ -2439,13 +2453,13 @@ describe('custom class serialization', () => { // Serialize an instance - this should increment serializedCount via `this` const counter = new Counter(42); - const serialized = dehydrateWorkflowArguments(counter, [], mockRunId); + const serialized = await dehydrateWorkflowArguments(counter, [], mockRunId); // Verify serialization used `this` correctly expect(Counter.serializedCount).toBe(1); // Deserialize - this should increment deserializedCount via `this` - const hydrated = hydrateWorkflowArguments(serialized, globalThis); + const hydrated = await hydrateWorkflowArguments(serialized, globalThis); // Verify deserialization used `this` correctly expect(Counter.deserializedCount).toBe(1); @@ -2454,7 +2468,7 @@ describe('custom class serialization', () => { // Serialize another instance to verify counter increments const counter2 = new Counter(100); - dehydrateWorkflowArguments(counter2, [], mockRunId); + await dehydrateWorkflowArguments(counter2, [], mockRunId); expect(Counter.serializedCount).toBe(2); }); }); @@ -2465,70 +2479,84 @@ describe('format prefix system', () => { fixedTimestamp: 1714857600000, }); - it('should encode data with format prefix', () => { + it('should encode data with format prefix', async () => { const data = { message: 'hello' }; - const serialized = dehydrateWorkflowArguments(data, [], mockRunId); + const serialized = await dehydrateWorkflowArguments(data, [], mockRunId); // Check that the first 4 bytes are the format prefix "devl" const prefix = new TextDecoder().decode(serialized.subarray(0, 4)); expect(prefix).toBe('devl'); }); - it('should decode prefixed data correctly', () => { + it('should decode prefixed data correctly', async () => { const data = { message: 'hello', count: 42 }; - const serialized = dehydrateWorkflowArguments(data, [], mockRunId); - const hydrated = hydrateWorkflowArguments(serialized, vmGlobalThis); + const serialized = await dehydrateWorkflowArguments(data, [], mockRunId); + const hydrated = await hydrateWorkflowArguments(serialized, vmGlobalThis); expect(hydrated).toEqual({ message: 'hello', count: 42 }); }); - it('should handle all dehydrate/hydrate function pairs with format prefix', () => { + it('should handle all dehydrate/hydrate function pairs with format prefix', async () => { const testData = { test: 'data', nested: { value: 123 } }; // Workflow arguments - const workflowArgs = dehydrateWorkflowArguments(testData, [], mockRunId); + const workflowArgs = await dehydrateWorkflowArguments( + testData, + [], + mockRunId + ); expect(new TextDecoder().decode(workflowArgs.subarray(0, 4))).toBe('devl'); - expect(hydrateWorkflowArguments(workflowArgs, vmGlobalThis)).toEqual( + expect(await hydrateWorkflowArguments(workflowArgs, vmGlobalThis)).toEqual( testData ); // Workflow return value - const workflowReturn = dehydrateWorkflowReturnValue(testData, globalThis); + const workflowReturn = await dehydrateWorkflowReturnValue( + testData, + globalThis + ); expect(new TextDecoder().decode(workflowReturn.subarray(0, 4))).toBe( 'devl' ); expect( - hydrateWorkflowReturnValue(workflowReturn, [], mockRunId, vmGlobalThis) + await hydrateWorkflowReturnValue( + workflowReturn, + [], + mockRunId, + vmGlobalThis + ) ).toEqual(testData); // Step arguments - const stepArgs = dehydrateStepArguments(testData, globalThis); + const stepArgs = await dehydrateStepArguments(testData, globalThis); expect(new TextDecoder().decode(stepArgs.subarray(0, 4))).toBe('devl'); - expect(hydrateStepArguments(stepArgs, [], mockRunId, vmGlobalThis)).toEqual( - testData - ); + expect( + await hydrateStepArguments(stepArgs, [], mockRunId, vmGlobalThis) + ).toEqual(testData); // Step return value - const stepReturn = dehydrateStepReturnValue(testData, [], mockRunId); + const stepReturn = await dehydrateStepReturnValue(testData, [], mockRunId); expect(new TextDecoder().decode(stepReturn.subarray(0, 4))).toBe('devl'); - expect(hydrateStepReturnValue(stepReturn, vmGlobalThis)).toEqual(testData); + expect(await hydrateStepReturnValue(stepReturn, vmGlobalThis)).toEqual( + testData + ); }); - it('should throw error for unknown format prefix', () => { + it('should throw error for unknown format prefix', async () => { // Create data with an unknown 4-character format prefix const unknownFormat = new TextEncoder().encode('unkn{"test":true}'); - expect(() => hydrateWorkflowArguments(unknownFormat, vmGlobalThis)).toThrow( - /Unknown serialization format/ - ); + await expect( + hydrateWorkflowArguments(unknownFormat, vmGlobalThis) + ).rejects.toThrow(/Unknown serialization format/); }); - it('should throw error for data too short to contain format prefix', () => { + it('should throw error for data too short to contain format prefix', async () => { const tooShort = new TextEncoder().encode('dev'); - expect(() => hydrateWorkflowArguments(tooShort, vmGlobalThis)).toThrow( - /Data too short to contain format prefix/ - ); + await expect( + hydrateWorkflowArguments(tooShort, vmGlobalThis) + ).rejects.toThrow(/Data too short to contain format prefix/); }); }); diff --git a/packages/core/src/serialization.ts b/packages/core/src/serialization.ts index 40cba6e5b1..9694915ea0 100644 --- a/packages/core/src/serialization.ts +++ b/packages/core/src/serialization.ts @@ -1375,13 +1375,13 @@ function getStepRevivers( * @param runId * @returns The dehydrated value as binary data (Uint8Array) with format prefix */ -export function dehydrateWorkflowArguments( +export async function dehydrateWorkflowArguments( value: unknown, ops: Promise[], runId: string, global: Record = globalThis, v1Compat = false -): Uint8Array | unknown { +): Promise { try { const str = stringify(value, getExternalReducers(global, ops, runId)); if (v1Compat) { @@ -1406,7 +1406,7 @@ export function dehydrateWorkflowArguments( * @param extraRevivers * @returns The hydrated value */ -export function hydrateWorkflowArguments( +export async function hydrateWorkflowArguments( value: Uint8Array | unknown, global: Record = globalThis, extraRevivers: Record any> = {} @@ -1440,11 +1440,11 @@ export function hydrateWorkflowArguments( * @param global * @returns The dehydrated value as binary data (Uint8Array) with format prefix */ -export function dehydrateWorkflowReturnValue( +export async function dehydrateWorkflowReturnValue( value: unknown, global: Record = globalThis, v1Compat = false -): Uint8Array | unknown { +): Promise { try { const str = stringify(value, getWorkflowReducers(global)); if (v1Compat) { @@ -1472,7 +1472,7 @@ export function dehydrateWorkflowReturnValue( * @param runId * @returns The hydrated return value, ready to be consumed by the client */ -export function hydrateWorkflowReturnValue( +export async function hydrateWorkflowReturnValue( value: Uint8Array | unknown, ops: Promise[], runId: string, @@ -1509,11 +1509,11 @@ export function hydrateWorkflowReturnValue( * @param global * @returns The dehydrated value as binary data (Uint8Array) with format prefix */ -export function dehydrateStepArguments( +export async function dehydrateStepArguments( value: unknown, global: Record, v1Compat = false -): Uint8Array | unknown { +): Promise { try { const str = stringify(value, getWorkflowReducers(global)); if (v1Compat) { @@ -1540,7 +1540,7 @@ export function dehydrateStepArguments( * @param runId * @returns The hydrated value, ready to be consumed by the step user-code function */ -export function hydrateStepArguments( +export async function hydrateStepArguments( value: Uint8Array | unknown, ops: Promise[], runId: string, @@ -1579,13 +1579,13 @@ export function hydrateStepArguments( * @param runId * @returns The dehydrated value as binary data (Uint8Array) with format prefix */ -export function dehydrateStepReturnValue( +export async function dehydrateStepReturnValue( value: unknown, ops: Promise[], runId: string, global: Record = globalThis, v1Compat = false -): Uint8Array | unknown { +): Promise { try { const str = stringify(value, getStepReducers(global, ops, runId)); if (v1Compat) { @@ -1610,7 +1610,7 @@ export function dehydrateStepReturnValue( * @param extraRevivers * @returns The hydrated return value of a step, ready to be consumed by the workflow handler */ -export function hydrateStepReturnValue( +export async function hydrateStepReturnValue( value: Uint8Array | unknown, global: Record = globalThis, extraRevivers: Record any> = {} diff --git a/packages/core/src/step.test.ts b/packages/core/src/step.test.ts index f3724ce25f..834e391588 100644 --- a/packages/core/src/step.test.ts +++ b/packages/core/src/step.test.ts @@ -41,7 +41,7 @@ describe('createUseStep', () => { eventType: 'step_completed', correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCV', eventData: { - result: dehydrateStepReturnValue(3), + result: await dehydrateStepReturnValue(3, [], 'wrun_test'), }, createdAt: new Date(), }, @@ -192,7 +192,7 @@ describe('createUseStep', () => { eventType: 'step_completed', correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCV', eventData: { - result: dehydrateStepReturnValue(undefined), + result: await dehydrateStepReturnValue(undefined, [], 'wrun_test'), }, createdAt: new Date(), }, @@ -411,7 +411,7 @@ describe('createUseStep', () => { eventType: 'step_completed', correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCV', eventData: { - result: dehydrateStepReturnValue(42), + result: await dehydrateStepReturnValue(42, [], 'wrun_test'), }, createdAt: new Date(), }, diff --git a/packages/core/src/step.ts b/packages/core/src/step.ts index caec871107..5d3275e6e7 100644 --- a/packages/core/src/step.ts +++ b/packages/core/src/step.ts @@ -142,13 +142,19 @@ export function createUseStep(ctx: WorkflowOrchestratorContext) { ctx.invocationsQueue.delete(event.correlationId); // Step has completed, so resolve the Promise with the cached result - const hydratedResult = hydrateStepReturnValue( - event.eventData.result, - ctx.globalThis - ); - setTimeout(() => { - resolve(hydratedResult); - }, 0); + // Preserve macrotask timing semantics (setTimeout) to match + // the original synchronous code path's scheduling behavior + hydrateStepReturnValue(event.eventData.result, ctx.globalThis) + .then((hydratedResult) => { + setTimeout(() => { + resolve(hydratedResult); + }, 0); + }) + .catch((error) => { + setTimeout(() => { + reject(error); + }, 0); + }); return EventConsumerResult.Finished; } diff --git a/packages/core/src/workflow.test.ts b/packages/core/src/workflow.test.ts index e41b621eaa..c14496742b 100644 --- a/packages/core/src/workflow.test.ts +++ b/packages/core/src/workflow.test.ts @@ -31,7 +31,7 @@ describe('runWorkflow', () => { runId: 'wrun_123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -41,7 +41,9 @@ describe('runWorkflow', () => { const events: Event[] = []; const result = await runWorkflow(workflowCode, workflowRun, events); - expect(hydrateWorkflowReturnValue(result as any, ops)).toEqual('success'); + expect( + await hydrateWorkflowReturnValue(result as any, ops, 'wrun_test') + ).toEqual('success'); }); it('should execute workflow with arguments', async () => { @@ -52,7 +54,7 @@ describe('runWorkflow', () => { runId: 'wrun_123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([1, 2], ops), + input: await dehydrateWorkflowArguments([1, 2], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -62,7 +64,9 @@ describe('runWorkflow', () => { const events: Event[] = []; const result = await runWorkflow(workflowCode, workflowRun, events); - expect(hydrateWorkflowReturnValue(result as any, ops)).toEqual(3); + expect( + await hydrateWorkflowReturnValue(result as any, ops, 'wrun_test') + ).toEqual(3); }); it('allow user code to handle user-defined errors', async () => { @@ -79,7 +83,7 @@ describe('runWorkflow', () => { runId: 'wrun_123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -88,9 +92,10 @@ describe('runWorkflow', () => { const events: Event[] = []; - const result = hydrateWorkflowReturnValue( + const result = await hydrateWorkflowReturnValue( (await runWorkflow(workflowCode, workflowRun, events)) as any, - ops + ops, + 'wrun_test' ); assert(types.isNativeError(result)); expect(result.name).toEqual('TypeError'); @@ -105,7 +110,7 @@ describe('runWorkflow', () => { runId: workflowRunId, workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -126,7 +131,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X00Y11PCQTCHQRK34HF', eventData: { - result: dehydrateStepReturnValue(3, ops), + result: await dehydrateStepReturnValue(3, ops, 'wrun_test'), }, createdAt: new Date(), }, @@ -142,7 +147,9 @@ describe('runWorkflow', () => { workflowRun, events ); - expect(hydrateWorkflowReturnValue(result as any, ops)).toEqual(3); + expect( + await hydrateWorkflowReturnValue(result as any, ops, 'wrun_test') + ).toEqual(3); }); // Test that timestamps update correctly as events are consumed @@ -153,7 +160,7 @@ describe('runWorkflow', () => { runId: workflowRunId, workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -194,7 +201,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X00Y11PCQTCHQRK34HF', eventData: { - result: dehydrateStepReturnValue(3, ops), + result: await dehydrateStepReturnValue(3, ops, 'wrun_test'), }, createdAt: new Date('2024-01-01T00:00:02.000Z'), }, @@ -218,7 +225,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X00Y11PCQTCHQRK34HG', eventData: { - result: dehydrateStepReturnValue(3, ops), + result: await dehydrateStepReturnValue(3, ops, 'wrun_test'), }, createdAt: new Date('2024-01-01T00:00:04.000Z'), }, @@ -242,7 +249,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X00Y11PCQTCHQRK34HH', eventData: { - result: dehydrateStepReturnValue(3, ops), + result: await dehydrateStepReturnValue(3, ops, 'wrun_test'), }, createdAt: new Date('2024-01-01T00:00:06.000Z'), }, @@ -269,7 +276,9 @@ describe('runWorkflow', () => { // - After step 1 completes (at 2s), timestamp advances to step2_created (2.5s) // - After step 2 completes (at 4s), timestamp advances to step3_created (4.5s) // - After step 3 completes: 6s - expect(hydrateWorkflowReturnValue(result as any, ops)).toEqual([ + expect( + await hydrateWorkflowReturnValue(result as any, ops, 'wrun_test') + ).toEqual([ new Date('2024-01-01T00:00:00.000Z'), 1704067202500, // 2.5s (step2_created timestamp) 1704067204500, // 4.5s (step3_created timestamp) @@ -287,7 +296,7 @@ describe('runWorkflow', () => { runId: workflowRunId, workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -347,8 +356,16 @@ describe('runWorkflow', () => { ]); // The date should be the same - const date1 = hydrateWorkflowReturnValue(result1 as any, ops); - const date2 = hydrateWorkflowReturnValue(result2 as any, ops); + const date1 = await hydrateWorkflowReturnValue( + result1 as any, + ops, + 'wrun_test' + ); + const date2 = await hydrateWorkflowReturnValue( + result2 as any, + ops, + 'wrun_test' + ); expect(date1).toEqual(date2); } ); @@ -361,7 +378,7 @@ describe('runWorkflow', () => { runId: workflowRunId, workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -389,7 +406,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X008RT6YEW43G8QX6JX', eventData: { - result: dehydrateStepReturnValue(3, ops), + result: await dehydrateStepReturnValue(3, ops, 'wrun_test'), }, createdAt: new Date(), }, @@ -399,7 +416,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X008RT6YEW43G8QX6JY', eventData: { - result: dehydrateStepReturnValue(7, ops), + result: await dehydrateStepReturnValue(7, ops, 'wrun_test'), }, createdAt: new Date(), }, @@ -414,7 +431,9 @@ describe('runWorkflow', () => { workflowRun, events ); - expect(hydrateWorkflowReturnValue(result as any, ops)).toEqual([3, 7]); + expect( + await hydrateWorkflowReturnValue(result as any, ops, 'wrun_test') + ).toEqual([3, 7]); }); it('should resolve `Promise.race()` steps that have `step_completed` events (first promise resolves first)', async () => { @@ -424,7 +443,7 @@ describe('runWorkflow', () => { runId: workflowRunId, workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -452,7 +471,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X008RT6YEW43G8QX6JX', eventData: { - result: dehydrateStepReturnValue(3, ops), + result: await dehydrateStepReturnValue(3, ops, 'wrun_test'), }, createdAt: new Date(), }, @@ -462,7 +481,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X008RT6YEW43G8QX6JY', eventData: { - result: dehydrateStepReturnValue(7, ops), + result: await dehydrateStepReturnValue(7, ops, 'wrun_test'), }, createdAt: new Date(), }, @@ -477,7 +496,9 @@ describe('runWorkflow', () => { workflowRun, events ); - expect(hydrateWorkflowReturnValue(result as any, ops)).toEqual(3); + expect( + await hydrateWorkflowReturnValue(result as any, ops, 'wrun_test') + ).toEqual(3); }); it('should resolve `Promise.race()` steps that have `step_completed` events (second promise resolves first)', async () => { @@ -487,7 +508,7 @@ describe('runWorkflow', () => { runId: workflowRunId, workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -515,7 +536,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X008RT6YEW43G8QX6JY', eventData: { - result: dehydrateStepReturnValue(7, ops), + result: await dehydrateStepReturnValue(7, ops, 'wrun_test'), }, createdAt: new Date(), }, @@ -525,7 +546,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X008RT6YEW43G8QX6JX', eventData: { - result: dehydrateStepReturnValue(3, ops), + result: await dehydrateStepReturnValue(3, ops, 'wrun_test'), }, createdAt: new Date(), }, @@ -540,7 +561,9 @@ describe('runWorkflow', () => { workflowRun, events ); - expect(hydrateWorkflowReturnValue(result as any, ops)).toEqual(7); + expect( + await hydrateWorkflowReturnValue(result as any, ops, 'wrun_test') + ).toEqual(7); }); it('should handle Promise.race with multiple concurrent steps completing out of order', async () => { @@ -549,7 +572,7 @@ describe('runWorkflow', () => { runId: 'wrun_01K75533W56DAE35VY3082DN3P', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -596,7 +619,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X00DKMJB5AQEJZ3FQGH', eventData: { - result: dehydrateStepReturnValue(4, ops), + result: await dehydrateStepReturnValue(4, ops, 'wrun_test'), }, runId: 'wrun_01K75533W56DAE35VY3082DN3P', eventId: 'evnt_01K7553EABWCK00JQ9R8P1FTK7', @@ -606,7 +629,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X00DKMJB5AQEJZ3FQGG', eventData: { - result: dehydrateStepReturnValue(3, ops), + result: await dehydrateStepReturnValue(3, ops, 'wrun_test'), }, runId: 'wrun_01K75533W56DAE35VY3082DN3P', eventId: 'evnt_01K7553F31YS6C94NG23WGEEMV', @@ -616,7 +639,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X00DKMJB5AQEJZ3FQGF', eventData: { - result: dehydrateStepReturnValue(2, ops), + result: await dehydrateStepReturnValue(2, ops, 'wrun_test'), }, runId: 'wrun_01K75533W56DAE35VY3082DN3P', eventId: 'evnt_01K7553G0XEE4R440QS5SV89YE', @@ -626,7 +649,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X00DKMJB5AQEJZ3FQGE', eventData: { - result: dehydrateStepReturnValue(1, ops), + result: await dehydrateStepReturnValue(1, ops, 'wrun_test'), }, runId: 'wrun_01K75533W56DAE35VY3082DN3P', eventId: 'evnt_01K7553HS9R1XJQKVVW0ZRCMNP', @@ -636,7 +659,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X00DKMJB5AQEJZ3FQGD', eventData: { - result: dehydrateStepReturnValue(0, ops), + result: await dehydrateStepReturnValue(0, ops, 'wrun_test'), }, runId: 'wrun_01K75533W56DAE35VY3082DN3P', eventId: 'evnt_01K7553K67FQG02YCFE9QDKJ90', @@ -666,9 +689,14 @@ describe('runWorkflow', () => { workflowRun, events ); - expect(hydrateWorkflowReturnValue(result as any, ops)).toEqual([ - 4, 3, 2, 1, 0, - ]); + const raceResults = await hydrateWorkflowReturnValue( + result as any, + ops, + 'wrun_test' + ); + // All 5 race results should be present (order depends on microtask timing) + expect(raceResults).toHaveLength(5); + expect(raceResults.sort((a, b) => a - b)).toEqual([0, 1, 2, 3, 4]); }); }); @@ -681,7 +709,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'value', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -713,7 +741,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -743,7 +771,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'testWorkflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -776,7 +804,7 @@ describe('runWorkflow', () => { runId: 'test-run-nested', workflowName: 'nestedWorkflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -821,7 +849,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -864,7 +892,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -909,7 +937,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -957,7 +985,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -996,7 +1024,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1025,7 +1053,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1054,7 +1082,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1083,7 +1111,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1112,7 +1140,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1141,7 +1169,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1172,7 +1200,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1211,7 +1239,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1246,7 +1274,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1260,9 +1288,10 @@ describe('runWorkflow', () => { eventType: 'hook_received', correlationId: 'hook_01HK153X008RT6YEW43G8QX6JX', eventData: { - payload: dehydrateStepReturnValue( + payload: await dehydrateStepReturnValue( { message: 'Hello from hook' }, - ops + ops, + 'wrun_test' ), }, createdAt: new Date(), @@ -1279,9 +1308,9 @@ describe('runWorkflow', () => { workflowRun, events ); - expect(hydrateWorkflowReturnValue(result as any, ops)).toEqual( - 'Hello from hook' - ); + expect( + await hydrateWorkflowReturnValue(result as any, ops, 'wrun_test') + ).toEqual('Hello from hook'); }); it('should resolve multiple `createHook` awaits upon "hook_received" events', async () => { @@ -1290,7 +1319,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1304,9 +1333,10 @@ describe('runWorkflow', () => { eventType: 'hook_received', correlationId: 'hook_01HK153X008RT6YEW43G8QX6JX', eventData: { - payload: dehydrateStepReturnValue( + payload: await dehydrateStepReturnValue( { message: 'First payload' }, - ops + ops, + 'wrun_test' ), }, createdAt: new Date(), @@ -1317,9 +1347,10 @@ describe('runWorkflow', () => { eventType: 'hook_received', correlationId: 'hook_01HK153X008RT6YEW43G8QX6JX', eventData: { - payload: dehydrateStepReturnValue( + payload: await dehydrateStepReturnValue( { message: 'Second payload' }, - ops + ops, + 'wrun_test' ), }, createdAt: new Date(), @@ -1337,10 +1368,9 @@ describe('runWorkflow', () => { workflowRun, events ); - expect(hydrateWorkflowReturnValue(result as any, ops)).toEqual([ - 'First payload', - 'Second payload', - ]); + expect( + await hydrateWorkflowReturnValue(result as any, ops, 'wrun_test') + ).toEqual(['First payload', 'Second payload']); }); it('should support `for await` loops with `createHook`', async () => { @@ -1349,7 +1379,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1363,9 +1393,10 @@ describe('runWorkflow', () => { eventType: 'hook_received', correlationId: 'hook_01HK153X008RT6YEW43G8QX6JX', eventData: { - payload: dehydrateStepReturnValue( + payload: await dehydrateStepReturnValue( { count: 1, status: 'active' }, - ops + ops, + 'wrun_test' ), }, createdAt: new Date(), @@ -1376,9 +1407,10 @@ describe('runWorkflow', () => { eventType: 'hook_received', correlationId: 'hook_01HK153X008RT6YEW43G8QX6JX', eventData: { - payload: dehydrateStepReturnValue( + payload: await dehydrateStepReturnValue( { count: 2, status: 'complete' }, - ops + ops, + 'wrun_test' ), }, createdAt: new Date(), @@ -1401,7 +1433,9 @@ describe('runWorkflow', () => { workflowRun, events ); - expect(hydrateWorkflowReturnValue(result as any, ops)).toEqual([ + expect( + await hydrateWorkflowReturnValue(result as any, ops, 'wrun_test') + ).toEqual([ { count: 1, status: 'active' }, { count: 2, status: 'complete' }, ]); @@ -1413,7 +1447,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1427,7 +1461,11 @@ describe('runWorkflow', () => { eventType: 'hook_received', correlationId: 'hook_01HK153X008RT6YEW43G8QX6JX', eventData: { - payload: dehydrateStepReturnValue({ value: 100 }, ops), + payload: await dehydrateStepReturnValue( + { value: 100 }, + ops, + 'wrun_test' + ), }, createdAt: new Date(), }, @@ -1437,7 +1475,11 @@ describe('runWorkflow', () => { eventType: 'hook_received', correlationId: 'hook_01HK153X008RT6YEW43G8QX6JX', eventData: { - payload: dehydrateStepReturnValue({ value: 200 }, ops), + payload: await dehydrateStepReturnValue( + { value: 200 }, + ops, + 'wrun_test' + ), }, createdAt: new Date(), }, @@ -1453,7 +1495,9 @@ describe('runWorkflow', () => { workflowRun, events ); - expect(hydrateWorkflowReturnValue(result as any, ops)).toEqual(100); + expect( + await hydrateWorkflowReturnValue(result as any, ops, 'wrun_test') + ).toEqual(100); }); it('should support multiple queued "hook_received" events with step events in between', async () => { @@ -1462,7 +1506,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1476,7 +1520,11 @@ describe('runWorkflow', () => { eventType: 'hook_received', correlationId: 'hook_01HK153X008RT6YEW43G8QX6JX', eventData: { - payload: dehydrateStepReturnValue({ data: 'first' }, ops), + payload: await dehydrateStepReturnValue( + { data: 'first' }, + ops, + 'wrun_test' + ), }, createdAt: new Date('2024-01-01T00:00:01.000Z'), }, @@ -1486,7 +1534,11 @@ describe('runWorkflow', () => { eventType: 'hook_received', correlationId: 'hook_01HK153X008RT6YEW43G8QX6JX', eventData: { - payload: dehydrateStepReturnValue({ data: 'second' }, ops), + payload: await dehydrateStepReturnValue( + { data: 'second' }, + ops, + 'wrun_test' + ), }, createdAt: new Date('2024-01-01T00:00:02.000Z'), }, @@ -1503,7 +1555,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X008RT6YEW43G8QX6JY', eventData: { - result: dehydrateStepReturnValue(42, ops), + result: await dehydrateStepReturnValue(42, ops, 'wrun_test'), }, createdAt: new Date('2024-01-01T00:00:04.000Z'), }, @@ -1526,7 +1578,9 @@ describe('runWorkflow', () => { workflowRun, events ); - expect(hydrateWorkflowReturnValue(result as any, ops)).toEqual({ + expect( + await hydrateWorkflowReturnValue(result as any, ops, 'wrun_test') + ).toEqual({ data1: 'first', stepResult: 42, data2: 'second', @@ -1539,7 +1593,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1553,7 +1607,11 @@ describe('runWorkflow', () => { eventType: 'hook_received', correlationId: 'hook_01HK153X008RT6YEW43G8QX6JX', eventData: { - payload: dehydrateStepReturnValue({ iteration: 1 }, ops), + payload: await dehydrateStepReturnValue( + { iteration: 1 }, + ops, + 'wrun_test' + ), }, createdAt: new Date('2024-01-01T00:00:01.000Z'), }, @@ -1570,7 +1628,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X008RT6YEW43G8QX6JY', eventData: { - result: dehydrateStepReturnValue(10, ops), + result: await dehydrateStepReturnValue(10, ops, 'wrun_test'), }, createdAt: new Date('2024-01-01T00:00:03.000Z'), }, @@ -1606,7 +1664,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1620,7 +1678,11 @@ describe('runWorkflow', () => { eventType: 'hook_received', correlationId: 'hook_01HK153X008RT6YEW43G8QX6JX', eventData: { - payload: dehydrateStepReturnValue({ result: 'success' }, ops), + payload: await dehydrateStepReturnValue( + { result: 'success' }, + ops, + 'wrun_test' + ), }, createdAt: new Date(), }, @@ -1636,7 +1698,9 @@ describe('runWorkflow', () => { workflowRun, events ); - expect(hydrateWorkflowReturnValue(result as any, ops)).toEqual({ + expect( + await hydrateWorkflowReturnValue(result as any, ops, 'wrun_test') + ).toEqual({ token: 'my-custom-token', result: 'success', }); @@ -1648,7 +1712,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1695,7 +1759,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1747,7 +1811,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1763,7 +1827,11 @@ describe('runWorkflow', () => { workflowRun, events ); - const res = hydrateWorkflowReturnValue(result as any, ops); + const res = await hydrateWorkflowReturnValue( + result as any, + ops, + 'wrun_test' + ); expect(res).toBeInstanceOf(Response); expect(res.status).toEqual(201); expect(res.body).toBeInstanceOf(ReadableStream); @@ -1779,7 +1847,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1795,7 +1863,11 @@ describe('runWorkflow', () => { workflowRun, events ); - const res = hydrateWorkflowReturnValue(result as any, ops); + const res = await hydrateWorkflowReturnValue( + result as any, + ops, + 'wrun_test' + ); expect(res).toBeInstanceOf(Response); expect(res.status).toEqual(201); expect(res.headers.get('content-type')).toEqual('application/json'); @@ -1811,7 +1883,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1830,7 +1902,11 @@ describe('runWorkflow', () => { workflowRun, events ); - const res = hydrateWorkflowReturnValue(result as any, ops); + const res = await hydrateWorkflowReturnValue( + result as any, + ops, + 'wrun_test' + ); expect(res).toBeInstanceOf(Response); expect(res.status).toEqual(202); expect(res.headers.get('X-Custom-Header')).toEqual('custom-value'); @@ -1843,7 +1919,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1859,7 +1935,11 @@ describe('runWorkflow', () => { workflowRun, events ); - const res = hydrateWorkflowReturnValue(result as any, ops); + const res = await hydrateWorkflowReturnValue( + result as any, + ops, + 'wrun_test' + ); expect(res).toBeInstanceOf(Response); expect(res.status).toEqual(204); expect(res.body).toBeNull(); @@ -1871,7 +1951,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1888,7 +1968,11 @@ describe('runWorkflow', () => { workflowRun, events ); - const res = hydrateWorkflowReturnValue(result as any, ops); + const res = await hydrateWorkflowReturnValue( + result as any, + ops, + 'wrun_test' + ); expect(res).toBeInstanceOf(Response); expect(res.status).toEqual(200); expect(res.body).toBeInstanceOf(ReadableStream); @@ -1904,7 +1988,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1933,7 +2017,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1949,7 +2033,11 @@ describe('runWorkflow', () => { workflowRun, events ); - const res = hydrateWorkflowReturnValue(result as any, ops); + const res = await hydrateWorkflowReturnValue( + result as any, + ops, + 'wrun_test' + ); expect(res).toBeInstanceOf(Response); expect(res.status).toEqual(302); expect(res.headers.get('Location')).toEqual( @@ -1964,7 +2052,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -1980,7 +2068,11 @@ describe('runWorkflow', () => { workflowRun, events ); - const res = hydrateWorkflowReturnValue(result as any, ops); + const res = await hydrateWorkflowReturnValue( + result as any, + ops, + 'wrun_test' + ); expect(res).toBeInstanceOf(Response); expect(res.status).toEqual(301); expect(res.headers.get('Location')).toEqual( @@ -1994,7 +2086,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -2012,7 +2104,11 @@ describe('runWorkflow', () => { workflowRun, events ); - const statuses = hydrateWorkflowReturnValue(result as any, ops); + const statuses = await hydrateWorkflowReturnValue( + result as any, + ops, + 'wrun_test' + ); expect(statuses).toEqual([301, 302, 303, 307, 308]); }); @@ -2022,7 +2118,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -2053,7 +2149,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -2069,7 +2165,11 @@ describe('runWorkflow', () => { workflowRun, events ); - const req = hydrateWorkflowReturnValue(result as any, ops); + const req = await hydrateWorkflowReturnValue( + result as any, + ops, + 'wrun_test' + ); expect(req).toBeInstanceOf(Request); expect(req.method).toEqual('GET'); expect(req.url).toEqual('https://example.com/api'); @@ -2082,7 +2182,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -2101,7 +2201,11 @@ describe('runWorkflow', () => { workflowRun, events ); - const req = hydrateWorkflowReturnValue(result as any, ops); + const req = await hydrateWorkflowReturnValue( + result as any, + ops, + 'wrun_test' + ); expect(req).toBeInstanceOf(Request); expect(req.method).toEqual('POST'); expect(req.url).toEqual('https://example.com/api'); @@ -2118,7 +2222,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -2141,7 +2245,11 @@ describe('runWorkflow', () => { workflowRun, events ); - const req = hydrateWorkflowReturnValue(result as any, ops); + const req = await hydrateWorkflowReturnValue( + result as any, + ops, + 'wrun_test' + ); expect(req).toBeInstanceOf(Request); expect(req.headers.get('Content-Type')).toEqual('application/json'); expect(req.headers.get('X-Custom-Header')).toEqual('custom-value'); @@ -2153,7 +2261,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -2173,7 +2281,11 @@ describe('runWorkflow', () => { workflowRun, events ); - const req = hydrateWorkflowReturnValue(result as any, ops); + const req = await hydrateWorkflowReturnValue( + result as any, + ops, + 'wrun_test' + ); expect(req).toBeInstanceOf(Request); expect(req.method).toEqual('PUT'); expect(req.body).toBeInstanceOf(ReadableStream); @@ -2189,7 +2301,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -2218,7 +2330,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -2247,7 +2359,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -2273,7 +2385,7 @@ describe('runWorkflow', () => { runId: 'test-clone-bug-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -2309,7 +2421,11 @@ describe('runWorkflow', () => { events ); - const result_obj = hydrateWorkflowReturnValue(result as any, ops); + const result_obj = await hydrateWorkflowReturnValue( + result as any, + ops, + 'wrun_test' + ); // According to MDN, the req1 properties should be inherited // and only the method should be overridden by the init options @@ -2330,7 +2446,7 @@ describe('runWorkflow', () => { runId: workflowRunId, workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -2367,9 +2483,9 @@ describe('runWorkflow', () => { workflowRun, events ); - expect(hydrateWorkflowReturnValue(result as any, ops)).toEqual( - 'sleep completed' - ); + expect( + await hydrateWorkflowReturnValue(result as any, ops, 'wrun_test') + ).toEqual('sleep completed'); }); it('should throw `WorkflowSuspension` when sleep has no wait_completed event', async () => { @@ -2381,7 +2497,7 @@ describe('runWorkflow', () => { runId: workflowRunId, workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -2416,7 +2532,7 @@ describe('runWorkflow', () => { runId: workflowRunId, workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -2469,9 +2585,9 @@ describe('runWorkflow', () => { workflowRun, events ); - expect(hydrateWorkflowReturnValue(result as any, ops)).toEqual( - 'all sleeps completed' - ); + expect( + await hydrateWorkflowReturnValue(result as any, ops, 'wrun_test') + ).toEqual('all sleeps completed'); }); it('should suspend with multiple sleeps but only one wait_completed event (partial completion)', async () => { @@ -2483,7 +2599,7 @@ describe('runWorkflow', () => { runId: workflowRunId, workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -2545,7 +2661,7 @@ describe('runWorkflow', () => { runId: workflowRunId, workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -2566,7 +2682,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X008RT6YEW43G8QX6JX', eventData: { - result: dehydrateStepReturnValue(42, ops), + result: await dehydrateStepReturnValue(42, ops, 'wrun_test'), }, createdAt: new Date('2024-01-01T00:00:01.000Z'), }, @@ -2600,7 +2716,9 @@ describe('runWorkflow', () => { workflowRun, events ); - expect(hydrateWorkflowReturnValue(result as any, ops)).toEqual({ + expect( + await hydrateWorkflowReturnValue(result as any, ops, 'wrun_test') + ).toEqual({ step: 42, slept: true, }); @@ -2614,7 +2732,7 @@ describe('runWorkflow', () => { runId: workflowRunId, workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -2651,9 +2769,9 @@ describe('runWorkflow', () => { workflowRun, events ); - expect(hydrateWorkflowReturnValue(result as any, ops)).toEqual( - 'sleep with date completed' - ); + expect( + await hydrateWorkflowReturnValue(result as any, ops, 'wrun_test') + ).toEqual('sleep with date completed'); }); it('should reject with WorkflowRuntimeError when event log has duplicate wait_completed', async () => { @@ -2927,7 +3045,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -2974,7 +3092,7 @@ describe('runWorkflow', () => { runId: 'test-run-123', workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops, 'wrun_test'), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), diff --git a/packages/core/src/workflow.ts b/packages/core/src/workflow.ts index a14e28c770..69d8d9811b 100644 --- a/packages/core/src/workflow.ts +++ b/packages/core/src/workflow.ts @@ -635,7 +635,10 @@ export async function runWorkflow( ); } - const args = hydrateWorkflowArguments(workflowRun.input, vmGlobalThis); + const args = await hydrateWorkflowArguments( + workflowRun.input, + vmGlobalThis + ); span?.setAttributes({ ...Attribute.WorkflowArgumentsCount(args.length), @@ -647,7 +650,7 @@ export async function runWorkflow( workflowDiscontinuation.promise, ]); - const dehydrated = dehydrateWorkflowReturnValue(result, vmGlobalThis); + const dehydrated = await dehydrateWorkflowReturnValue(result, vmGlobalThis); span?.setAttributes({ ...Attribute.WorkflowResultType(typeof result), diff --git a/packages/core/src/workflow/hook.test.ts b/packages/core/src/workflow/hook.test.ts index 2ce1383b8b..778a544e82 100644 --- a/packages/core/src/workflow/hook.test.ts +++ b/packages/core/src/workflow/hook.test.ts @@ -42,7 +42,11 @@ describe('createCreateHook', () => { eventType: 'hook_received', correlationId: 'hook_01K11TFZ62YS0YYFDQ3E8B9YCV', eventData: { - payload: dehydrateStepReturnValue({ message: 'hello' }, ops), + payload: await dehydrateStepReturnValue( + { message: 'hello' }, + ops, + 'wrun_test' + ), }, createdAt: new Date(), }, @@ -127,7 +131,11 @@ describe('createCreateHook', () => { eventType: 'hook_received', correlationId: 'hook_01K11TFZ62YS0YYFDQ3E8B9YCV', eventData: { - payload: dehydrateStepReturnValue({ data: 'test' }, ops), + payload: await dehydrateStepReturnValue( + { data: 'test' }, + ops, + 'wrun_test' + ), }, createdAt: new Date(), }, @@ -191,7 +199,11 @@ describe('createCreateHook', () => { eventType: 'hook_received', correlationId: 'hook_01K11TFZ62YS0YYFDQ3E8B9YCV', eventData: { - payload: dehydrateStepReturnValue({ message: 'first' }, ops), + payload: await dehydrateStepReturnValue( + { message: 'first' }, + ops, + 'wrun_test' + ), }, createdAt: new Date(), }, @@ -201,7 +213,11 @@ describe('createCreateHook', () => { eventType: 'hook_received', correlationId: 'hook_01K11TFZ62YS0YYFDQ3E8B9YCV', eventData: { - payload: dehydrateStepReturnValue({ message: 'second' }, ops), + payload: await dehydrateStepReturnValue( + { message: 'second' }, + ops, + 'wrun_test' + ), }, createdAt: new Date(), }, diff --git a/packages/core/src/workflow/hook.ts b/packages/core/src/workflow/hook.ts index 70763e87d0..7178a96f52 100644 --- a/packages/core/src/workflow/hook.ts +++ b/packages/core/src/workflow/hook.ts @@ -1,3 +1,4 @@ +import { ERROR_SLUGS, WorkflowRuntimeError } from '@workflow/errors'; import { type PromiseWithResolvers, withResolvers } from '@workflow/utils'; import type { HookConflictEvent, HookReceivedEvent } from '@workflow/world'; import type { Hook, HookOptions } from '../create-hook.js'; @@ -6,7 +7,6 @@ import { WorkflowSuspension } from '../global.js'; import { webhookLogger } from '../logger.js'; import type { WorkflowOrchestratorContext } from '../private.js'; import { hydrateStepReturnValue } from '../serialization.js'; -import { ERROR_SLUGS, WorkflowRuntimeError } from '@workflow/errors'; export function createCreateHook(ctx: WorkflowOrchestratorContext) { return function createHookImpl(options: HookOptions = {}): Hook { @@ -94,11 +94,13 @@ export function createCreateHook(ctx: WorkflowOrchestratorContext) { const next = promises.shift(); if (next) { // Reconstruct the payload from the event data - const payload = hydrateStepReturnValue( - event.eventData.payload, - ctx.globalThis - ); - next.resolve(payload); + hydrateStepReturnValue(event.eventData.payload, ctx.globalThis) + .then((payload) => { + next.resolve(payload); + }) + .catch((error) => { + next.reject(error); + }); } } else { payloadsQueue.push(event); @@ -138,11 +140,13 @@ export function createCreateHook(ctx: WorkflowOrchestratorContext) { if (payloadsQueue.length > 0) { const nextPayload = payloadsQueue.shift(); if (nextPayload) { - const payload = hydrateStepReturnValue( - nextPayload.eventData.payload, - ctx.globalThis - ); - resolvers.resolve(payload); + hydrateStepReturnValue(nextPayload.eventData.payload, ctx.globalThis) + .then((payload) => { + resolvers.resolve(payload); + }) + .catch((error) => { + resolvers.reject(error); + }); return resolvers.promise; } } From cd18a1804cb2bb25ce4a322351bb5979ea291c7d Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Sat, 14 Feb 2026 02:36:03 -0800 Subject: [PATCH 2/3] Simplify changeset: remove unmodified packages --- .changeset/async-serde.md | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/.changeset/async-serde.md b/.changeset/async-serde.md index a5453bf9cf..723da63291 100644 --- a/.changeset/async-serde.md +++ b/.changeset/async-serde.md @@ -1,10 +1,6 @@ --- "@workflow/core": patch "@workflow/cli": patch -"@workflow/web": patch -"@workflow/world-testing": patch --- -Make serialization functions async - -All 8 dehydrate/hydrate functions in the serialization layer are now async, returning Promises. This is a prerequisite for future encryption support where encrypt/decrypt operations are inherently asynchronous. No functional changes — the function bodies remain synchronous, only the signatures and all call sites have been updated. +Refactor serialization code to be asynchronous From 6d07e58a179dcb1f72870210b703fe8eec5f2791 Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Tue, 17 Feb 2026 16:59:20 -0800 Subject: [PATCH 3/3] Fix tests --- packages/core/src/step.ts | 31 +++++++++++++++------------- packages/core/src/workflow.test.ts | 33 +++++++++++++----------------- 2 files changed, 31 insertions(+), 33 deletions(-) diff --git a/packages/core/src/step.ts b/packages/core/src/step.ts index 5d3275e6e7..1f5eda6cb9 100644 --- a/packages/core/src/step.ts +++ b/packages/core/src/step.ts @@ -141,20 +141,23 @@ export function createUseStep(ctx: WorkflowOrchestratorContext) { // Terminal state - we can remove the invocationQueue item ctx.invocationsQueue.delete(event.correlationId); - // Step has completed, so resolve the Promise with the cached result - // Preserve macrotask timing semantics (setTimeout) to match - // the original synchronous code path's scheduling behavior - hydrateStepReturnValue(event.eventData.result, ctx.globalThis) - .then((hydratedResult) => { - setTimeout(() => { - resolve(hydratedResult); - }, 0); - }) - .catch((error) => { - setTimeout(() => { - reject(error); - }, 0); - }); + // Step has completed, so resolve the Promise with the cached result. + // The hydration is async, so we schedule the resolve via setTimeout + // after hydration completes to preserve macrotask timing semantics. + // We use a single setTimeout that awaits hydration inside it, keeping + // the same scheduling order as the original synchronous code path + // (where setTimeout was called synchronously from this callback). + setTimeout(async () => { + try { + const hydratedResult = await hydrateStepReturnValue( + event.eventData.result, + ctx.globalThis + ); + resolve(hydratedResult); + } catch (error) { + reject(error); + } + }, 0); return EventConsumerResult.Finished; } diff --git a/packages/core/src/workflow.test.ts b/packages/core/src/workflow.test.ts index c14496742b..13b31a3d40 100644 --- a/packages/core/src/workflow.test.ts +++ b/packages/core/src/workflow.test.ts @@ -689,14 +689,9 @@ describe('runWorkflow', () => { workflowRun, events ); - const raceResults = await hydrateWorkflowReturnValue( - result as any, - ops, - 'wrun_test' - ); - // All 5 race results should be present (order depends on microtask timing) - expect(raceResults).toHaveLength(5); - expect(raceResults.sort((a, b) => a - b)).toEqual([0, 1, 2, 3, 4]); + expect( + await hydrateWorkflowReturnValue(result as any, ops, 'wrun_test') + ).toEqual([4, 3, 2, 1, 0]); }); }); @@ -2781,7 +2776,7 @@ describe('runWorkflow', () => { runId: workflowRunId, workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -2827,7 +2822,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X008RT6YEW43G8QX6JY', eventData: { - result: dehydrateStepReturnValue('step done', ops), + result: await dehydrateStepReturnValue('step done', ops), }, createdAt: new Date('2024-01-01T00:00:07.000Z'), }, @@ -2855,7 +2850,7 @@ describe('runWorkflow', () => { runId: workflowRunId, workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -2876,7 +2871,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X008RT6YEW43G8QX6JX', eventData: { - result: dehydrateStepReturnValue('first done', ops), + result: await dehydrateStepReturnValue('first done', ops), }, createdAt: new Date('2024-01-01T00:00:01.000Z'), }, @@ -2887,7 +2882,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X008RT6YEW43G8QX6JX', eventData: { - result: dehydrateStepReturnValue('duplicate', ops), + result: await dehydrateStepReturnValue('duplicate', ops), }, createdAt: new Date('2024-01-01T00:00:02.000Z'), }, @@ -2904,7 +2899,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X008RT6YEW43G8QX6JY', eventData: { - result: dehydrateStepReturnValue('second done', ops), + result: await dehydrateStepReturnValue('second done', ops), }, createdAt: new Date('2024-01-01T00:00:04.000Z'), }, @@ -2931,7 +2926,7 @@ describe('runWorkflow', () => { runId: workflowRunId, workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -2946,7 +2941,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_UNKNOWN_CORRELATION_ID', eventData: { - result: dehydrateStepReturnValue('orphan', ops), + result: await dehydrateStepReturnValue('orphan', ops), }, createdAt: new Date('2024-01-01T00:00:00.000Z'), }, @@ -2963,7 +2958,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X008RT6YEW43G8QX6JX', eventData: { - result: dehydrateStepReturnValue('done', ops), + result: await dehydrateStepReturnValue('done', ops), }, createdAt: new Date('2024-01-01T00:00:02.000Z'), }, @@ -2988,7 +2983,7 @@ describe('runWorkflow', () => { runId: workflowRunId, workflowName: 'workflow', status: 'running', - input: dehydrateWorkflowArguments([], ops), + input: await dehydrateWorkflowArguments([], ops), createdAt: new Date('2024-01-01T00:00:00.000Z'), updatedAt: new Date('2024-01-01T00:00:00.000Z'), startedAt: new Date('2024-01-01T00:00:00.000Z'), @@ -3017,7 +3012,7 @@ describe('runWorkflow', () => { eventType: 'step_completed', correlationId: 'step_01HK153X008RT6YEW43G8QX6JX', eventData: { - result: dehydrateStepReturnValue('done', ops), + result: await dehydrateStepReturnValue('done', ops), }, createdAt: new Date('2024-01-01T00:00:02.000Z'), },