Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/async-serde.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@workflow/core": patch
"@workflow/cli": patch
---

Refactor serialization code to be asynchronous
24 changes: 15 additions & 9 deletions packages/cli/src/lib/inspect/output.ts
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,9 @@ export const listRuns = async (world: World, opts: InspectCLIOptions = {}) => {
},
resolveData,
});
const runsWithHydratedIO = runs.data.map(hydrateResourceIO);
const runsWithHydratedIO = await Promise.all(
runs.data.map(hydrateResourceIO)
);
showJson({ ...runs, data: runsWithHydratedIO });
return;
} catch (error) {
Expand Down Expand Up @@ -572,7 +574,7 @@ export const listRuns = async (world: World, opts: InspectCLIOptions = {}) => {
}
},
displayPage: async (runs) => {
const runsWithHydratedIO = runs.map(hydrateResourceIO);
const runsWithHydratedIO = await Promise.all(runs.map(hydrateResourceIO));
logger.log(showTable(runsWithHydratedIO, props, opts));
},
});
Expand All @@ -588,7 +590,7 @@ export const getRecentRun = async (
pagination: { limit: 1, sortOrder: opts.sort || 'desc' },
resolveData: 'none', // Don't need data for just getting the ID
});
runs.data = runs.data.map(hydrateResourceIO);
runs.data = await Promise.all(runs.data.map(hydrateResourceIO));
return runs.data[0];
} catch (error) {
if (handleApiError(error, opts.backend)) {
Expand All @@ -608,7 +610,7 @@ export const showRun = async (
}
try {
const run = await world.runs.get(runId, { resolveData: 'all' });
const runWithHydratedIO = hydrateResourceIO(run);
const runWithHydratedIO = await hydrateResourceIO(run);
if (opts.json) {
showJson(runWithHydratedIO);
return;
Expand Down Expand Up @@ -711,7 +713,9 @@ export const listSteps = async (
}
},
displayPage: async (steps) => {
const stepsWithHydratedIO = steps.map(hydrateResourceIO);
const stepsWithHydratedIO = await Promise.all(
steps.map(hydrateResourceIO)
);
logger.log(showTable(stepsWithHydratedIO, props, opts));
showInspectInfoBox('step');
},
Expand All @@ -735,7 +739,7 @@ export const showStep = async (
const step = await world.steps.get(opts.runId, stepId, {
resolveData: 'all',
});
const stepWithHydratedIO = hydrateResourceIO(step);
const stepWithHydratedIO = await hydrateResourceIO(step);
if (opts.json) {
showJson(stepWithHydratedIO);
return;
Expand Down Expand Up @@ -950,7 +954,9 @@ export const listHooks = async (world: World, opts: InspectCLIOptions = {}) => {
},
resolveData,
});
const hydratedHooks = hooks.data.map(hydrateResourceIO);
const hydratedHooks = await Promise.all(
hooks.data.map(hydrateResourceIO)
);
showJson({ ...hooks, data: hydratedHooks });
return;
} catch (error) {
Expand Down Expand Up @@ -994,7 +1000,7 @@ export const listHooks = async (world: World, opts: InspectCLIOptions = {}) => {
}
},
displayPage: async (hooks) => {
const hydratedHooks = hooks.map(hydrateResourceIO);
const hydratedHooks = await Promise.all(hooks.map(hydrateResourceIO));
logger.log(showTable(hydratedHooks, HOOK_LISTED_PROPS, opts));
showInspectInfoBox('hook');
},
Expand All @@ -1013,7 +1019,7 @@ export const showHook = async (
const hook = await world.hooks.get(hookId, {
resolveData: 'all',
});
const hydratedHook = hydrateResourceIO(hook);
const hydratedHook = await hydrateResourceIO(hook);
if (opts.json) {
showJson(hydratedHook);
return;
Expand Down
8 changes: 6 additions & 2 deletions packages/core/src/runtime/resume-hook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ export async function getHookByToken(token: string): Promise<Hook> {
const world = getWorld();
const hook = await world.hooks.getByToken(token);
if (typeof hook.metadata !== 'undefined') {
hook.metadata = hydrateStepArguments(hook.metadata as any, [], hook.runId);
hook.metadata = await hydrateStepArguments(
hook.metadata as any,
[],
hook.runId
);
}
return hook;
}
Expand Down Expand Up @@ -85,7 +89,7 @@ export async function resumeHook<T = any>(
// Dehydrate the payload for storage
const ops: Promise<any>[] = [];
const v1Compat = isLegacySpecVersion(hook.specVersion);
const dehydratedPayload = dehydrateStepReturnValue(
const dehydratedPayload = await dehydrateStepReturnValue(
payload,
ops,
hook.runId,
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/runtime/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import {
} from '@workflow/errors';
import {
SPEC_VERSION_CURRENT,
type World,
type WorkflowRunStatus,
type World,
} from '@workflow/world';
import {
getExternalRevivers,
Expand Down Expand Up @@ -153,7 +153,7 @@ export class Run<TResult> {
const run = await this.world.runs.get(this.runId);

if (run.status === 'completed') {
return hydrateWorkflowReturnValue(run.output, [], this.runId);
return await hydrateWorkflowReturnValue(run.output, [], this.runId);
}

if (run.status === 'cancelled') {
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/runtime/runs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ export async function recreateRunFromExisting(
try {
const run = await world.runs.get(runId, { resolveData: 'all' });
const workflowArgs = normalizeWorkflowArgs(
hydrateWorkflowArguments(run.input, globalThis)
await hydrateWorkflowArguments(run.input, globalThis)
);
const specVersion =
options.specVersion ?? run.specVersion ?? SPEC_VERSION_LEGACY;
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/runtime/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ export async function start<TArgs extends unknown[], TResult>(

// Create run via run_created event (event-sourced architecture)
// Pass client-generated runId - server will accept and use it
const workflowArguments = dehydrateWorkflowArguments(
const workflowArguments = await dehydrateWorkflowArguments(
args,
ops,
runId,
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/runtime/step-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ const stepHandler = getWorldHandlers().createQueueHandler(
{},
async (hydrateSpan) => {
const startTime = Date.now();
const result = hydrateStepArguments(
const result = await hydrateStepArguments(
step.input,
ops,
workflowRunId
Expand Down Expand Up @@ -354,7 +354,7 @@ const stepHandler = getWorldHandlers().createQueueHandler(
{},
async (dehydrateSpan) => {
const startTime = Date.now();
const dehydrated = dehydrateStepReturnValue(
const dehydrated = await dehydrateStepReturnValue(
result,
ops,
workflowRunId
Expand Down
40 changes: 21 additions & 19 deletions packages/core/src/runtime/suspension-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,24 +78,26 @@ export async function handleSuspension({
);

// Build hook_created events (World will atomically create hook entities)
const hookEvents: CreateEventRequest[] = hookItems.map((queueItem) => {
const hookMetadata: SerializedData | undefined =
typeof queueItem.metadata === 'undefined'
? undefined
: (dehydrateStepArguments(
queueItem.metadata,
suspension.globalThis
) as SerializedData);
return {
eventType: 'hook_created' as const,
specVersion: SPEC_VERSION_CURRENT,
correlationId: queueItem.correlationId,
eventData: {
token: queueItem.token,
metadata: hookMetadata,
},
};
});
const hookEvents: CreateEventRequest[] = await Promise.all(
hookItems.map(async (queueItem) => {
const hookMetadata: SerializedData | undefined =
typeof queueItem.metadata === 'undefined'
? undefined
: ((await dehydrateStepArguments(
queueItem.metadata,
suspension.globalThis
)) as SerializedData);
return {
eventType: 'hook_created' as const,
specVersion: SPEC_VERSION_CURRENT,
correlationId: queueItem.correlationId,
eventData: {
token: queueItem.token,
metadata: hookMetadata,
},
};
})
);

// Process hooks first to prevent race conditions with webhook receivers
// All hook creations run in parallel
Expand Down Expand Up @@ -153,7 +155,7 @@ export async function handleSuspension({
(async () => {
// Create step event if not already created
if (stepsNeedingCreation.has(queueItem.correlationId)) {
const dehydratedInput = dehydrateStepArguments(
const dehydratedInput = await dehydrateStepArguments(
{
args: queueItem.args,
closureVars: queueItem.closureVars,
Expand Down
Loading
Loading