From 0e7fded5b247c06de02ee2d61a2a019e06073755 Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Wed, 4 Feb 2026 09:19:11 -0800 Subject: [PATCH 01/16] Parallelize async operations in step handler for performance - Parallelize getPort(), getSpanKind(), and world.steps.get() calls - Start step_started event creation while hydrating arguments (CPU work) - Parallelize step_completed event with trace serialization Co-Authored-By: Claude Opus 4.5 --- .changeset/step-handler-parallelization.md | 5 ++ packages/core/src/runtime/step-handler.ts | 74 ++++++++++++++++------ 2 files changed, 60 insertions(+), 19 deletions(-) create mode 100644 .changeset/step-handler-parallelization.md diff --git a/.changeset/step-handler-parallelization.md b/.changeset/step-handler-parallelization.md new file mode 100644 index 0000000000..d6d8a0465b --- /dev/null +++ b/.changeset/step-handler-parallelization.md @@ -0,0 +1,5 @@ +--- +"@workflow/core": patch +--- + +Optimize step handler performance by parallelizing async operations diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts index 3e511e647e..cc7b556119 100644 --- a/packages/core/src/runtime/step-handler.ts +++ b/packages/core/src/runtime/step-handler.ts @@ -63,12 +63,22 @@ const stepHandler = getWorldHandlers().createQueueHandler( const stepName = metadata.queueName.slice('__wkf_step_'.length); const world = getWorld(); - // Get the port early to avoid async operations during step execution - const port = await getPort(); + // OPTIMIZATION 0: Run these three async operations concurrently + // - getPort(): local async operation + // - getSpanKind(): may involve async operations + // - world.steps.get(): HTTP call to fetch step entity + const [port, spanKind, step_] = await Promise.all([ + getPort(), + getSpanKind('CONSUMER'), + world.steps.get(workflowRunId, stepId), + ]); + + // Use mutable step variable for later updates + let step = step_; return trace( `step ${stepName}`, - { kind: 
await getSpanKind('CONSUMER'), links: spanLinks }, + { kind: spanKind, links: spanLinks }, async (span) => { span?.setAttributes({ ...Attribute.StepName(stepName), @@ -101,8 +111,6 @@ const stepHandler = getWorldHandlers().createQueueHandler( ...Attribute.StepTracePropagated(!!traceContext), }); - let step = await world.steps.get(workflowRunId, stepId); - runtimeLogger.debug('Step execution details', { stepName, stepId: step.stepId, @@ -210,14 +218,30 @@ const stepHandler = getWorldHandlers().createQueueHandler( return; } - // Start the step via event (event-sourced architecture) - // step_started increments the attempt counter in the World implementation - const startResult = await world.events.create(workflowRunId, { + // OPTIMIZATION 1: Start step_started event creation in background + // The HTTP call for step_started can happen while we do CPU work (hydration) + const stepStartedPromise = world.events.create(workflowRunId, { eventType: 'step_started', specVersion: SPEC_VERSION_CURRENT, correlationId: stepId, }); + // Hydrate the step input arguments, closure variables, and thisVal + // This uses step.input from the earlier world.steps.get() call + // and runs CPU-bound work while step_started HTTP is in-flight + const ops: Promise[] = []; + const hydratedInput = hydrateStepArguments( + step.input, + ops, + workflowRunId + ); + + const args = hydratedInput.args; + const thisVal = hydratedInput.thisVal ?? 
null; + + // Now await step_started - we need attempt/startedAt for context + const startResult = await stepStartedPromise; + // Use the step entity from the event response (no extra get call needed) if (!startResult.step) { throw new WorkflowRuntimeError( @@ -306,22 +330,34 @@ const stepHandler = getWorldHandlers().createQueueHandler( }) ); - // Complete the step via event (event-sourced architecture) - // The event creation atomically updates the step entity - // result was dehydrated above by dehydrateStepReturnValue, which returns Uint8Array - await world.events.create(workflowRunId, { - eventType: 'step_completed', - specVersion: SPEC_VERSION_CURRENT, - correlationId: stepId, - eventData: { - result: result as Uint8Array, - }, - }); + // OPTIMIZATION 2: Parallelize step_completed with trace serialization + // Run step_completed event creation and serializeTraceCarrier() concurrently + // The trace carrier will be used in the final queueMessage call + const [, traceCarrier] = await Promise.all([ + world.events.create(workflowRunId, { + eventType: 'step_completed', + specVersion: SPEC_VERSION_CURRENT, + correlationId: stepId, + eventData: { + result: result as Uint8Array, + }, + }), + serializeTraceCarrier(), + ]); span?.setAttributes({ ...Attribute.StepStatus('completed'), ...Attribute.StepResultType(typeof result), }); + + // Queue the workflow continuation immediately after step_completed + // Using pre-computed traceCarrier from parallel operation + await queueMessage(world, `__wkf_workflow_${workflowName}`, { + runId: workflowRunId, + traceCarrier, + requestedAt: new Date(), + }); + return; } catch (err: unknown) { span?.setAttributes({ ...Attribute.StepErrorName(getErrorName(err)), From 4817ae81ca87c100e44bfada34cf22975b951bdd Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Wed, 4 Feb 2026 10:04:24 -0800 Subject: [PATCH 02/16] Fix race condition: await step_started before hydration Reverts Optimization 1 to fix a race condition where 
hydrateStepArguments() could throw before stepStartedPromise was awaited, causing stale step.attempt in the catch handler and potentially allowing extra retries. Optimizations 0 and 2 are preserved as they don't have this issue. Co-Authored-By: Claude Opus 4.5 --- packages/core/src/runtime/step-handler.ts | 35 +++++++++++------------ 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts index cc7b556119..41d5468afa 100644 --- a/packages/core/src/runtime/step-handler.ts +++ b/packages/core/src/runtime/step-handler.ts @@ -218,30 +218,16 @@ const stepHandler = getWorldHandlers().createQueueHandler( return; } - // OPTIMIZATION 1: Start step_started event creation in background - // The HTTP call for step_started can happen while we do CPU work (hydration) - const stepStartedPromise = world.events.create(workflowRunId, { + // Start the step via event (event-sourced architecture) + // step_started increments the attempt counter in the World implementation + // NOTE: We must await this BEFORE any code that could throw and use step.attempt + // in the catch handler, to ensure step.attempt is always up to date. + const startResult = await world.events.create(workflowRunId, { eventType: 'step_started', specVersion: SPEC_VERSION_CURRENT, correlationId: stepId, }); - // Hydrate the step input arguments, closure variables, and thisVal - // This uses step.input from the earlier world.steps.get() call - // and runs CPU-bound work while step_started HTTP is in-flight - const ops: Promise[] = []; - const hydratedInput = hydrateStepArguments( - step.input, - ops, - workflowRunId - ); - - const args = hydratedInput.args; - const thisVal = hydratedInput.thisVal ?? 
null; - - // Now await step_started - we need attempt/startedAt for context - const startResult = await stepStartedPromise; - // Use the step entity from the event response (no extra get call needed) if (!startResult.step) { throw new WorkflowRuntimeError( @@ -275,6 +261,17 @@ const stepHandler = getWorldHandlers().createQueueHandler( const args = hydratedInput.args; const thisVal = hydratedInput.thisVal ?? null; + // Hydrate the step input arguments, closure variables, and thisVal + const ops: Promise[] = []; + const hydratedInput = hydrateStepArguments( + step.input, + ops, + workflowRunId + ); + + const args = hydratedInput.args; + const thisVal = hydratedInput.thisVal ?? null; + span?.setAttributes({ ...Attribute.StepArgumentsCount(args.length), ...Attribute.QueueDeserializeTimeMs(deserializeTimeMs), From af44568df32a1cc248555dd23a61151fbf84565d Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Wed, 4 Feb 2026 13:10:14 -0800 Subject: [PATCH 03/16] Skip initial world.steps.get() in step handler for performance Eliminate the world.steps.get() HTTP call by calling step_started first and relying on server-side validation. This saves 50-80ms per step execution by removing one HTTP round-trip. 
The server (workflow-server) now validates: - Step not in terminal state (returns 409) - retryAfter timestamp reached (returns 425 with Retry-After header) - Workflow still active (returns 410 if completed) Changes: - Remove world.steps.get() from initial Promise.all - Call step_started first to get step entity and validate state - Handle 409 (terminal state) by re-queueing workflow - Handle 425 (retryAfter not reached) by returning timeout - Handle 410 (workflow gone) as no-op Co-Authored-By: Claude Opus 4.5 --- .changeset/step-handler-parallelization.md | 2 +- packages/core/src/runtime/step-handler.ts | 179 +++++++++++---------- 2 files changed, 94 insertions(+), 87 deletions(-) diff --git a/.changeset/step-handler-parallelization.md b/.changeset/step-handler-parallelization.md index d6d8a0465b..2bcb614e1e 100644 --- a/.changeset/step-handler-parallelization.md +++ b/.changeset/step-handler-parallelization.md @@ -2,4 +2,4 @@ "@workflow/core": patch --- -Optimize step handler performance by parallelizing async operations +Optimize step handler performance by removing initial world.steps.get() call diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts index 41d5468afa..b9df6d1e07 100644 --- a/packages/core/src/runtime/step-handler.ts +++ b/packages/core/src/runtime/step-handler.ts @@ -63,19 +63,14 @@ const stepHandler = getWorldHandlers().createQueueHandler( const stepName = metadata.queueName.slice('__wkf_step_'.length); const world = getWorld(); - // OPTIMIZATION 0: Run these three async operations concurrently - // - getPort(): local async operation - // - getSpanKind(): may involve async operations - // - world.steps.get(): HTTP call to fetch step entity - const [port, spanKind, step_] = await Promise.all([ + // OPTIMIZATION: Run local async operations concurrently + // Note: We no longer call world.steps.get() here - we rely on step_started + // to return the step entity, saving one HTTP round-trip + const [port, 
spanKind] = await Promise.all([ getPort(), getSpanKind('CONSUMER'), - world.steps.get(workflowRunId, stepId), ]); - // Use mutable step variable for later updates - let step = step_; - return trace( `step ${stepName}`, { kind: spanKind, links: spanLinks }, @@ -111,6 +106,92 @@ const stepHandler = getWorldHandlers().createQueueHandler( ...Attribute.StepTracePropagated(!!traceContext), }); + // OPTIMIZATION: Call step_started first - server validates state and returns step entity + // This eliminates the separate world.steps.get() HTTP call, saving 50-80ms per step. + // The server now validates: + // - Step not in terminal state (returns 409) + // - retryAfter timestamp reached (returns 425 with Retry-After header) + // - Workflow still active (returns 410 if completed) + let step; + try { + const startResult = await world.events.create(workflowRunId, { + eventType: 'step_started', + specVersion: SPEC_VERSION_CURRENT, + correlationId: stepId, + }); + + if (!startResult.step) { + throw new WorkflowRuntimeError( + `step_started event for "${stepId}" did not return step entity` + ); + } + step = startResult.step; + } catch (err) { + if (WorkflowAPIError.is(err)) { + // 410 Gone: Workflow has already completed + if (err.status === 410) { + console.warn( + `Workflow run "${workflowRunId}" has already completed, skipping step "${stepId}": ${err.message}` + ); + return; + } + + // 409 Conflict: Step in terminal state (completed/failed/cancelled) + // Re-enqueue the workflow to continue processing + if (err.status === 409) { + runtimeLogger.debug( + 'Step in terminal state, re-enqueuing workflow', + { + stepName, + stepId, + workflowRunId, + error: err.message, + } + ); + span?.setAttributes({ + ...Attribute.StepSkipped(true), + // Use 'completed' as a representative terminal state for the skip reason + ...Attribute.StepSkipReason('completed'), + }); + await queueMessage(world, `__wkf_workflow_${workflowName}`, { + runId: workflowRunId, + traceCarrier: await 
serializeTraceCarrier(), + requestedAt: new Date(), + }); + return; + } + + // 425 Too Early: retryAfter timestamp not reached yet + // Return timeout to queue so it retries later + if (err.status === 425) { + // Parse retryAfter from error response meta + const retryAfterStr = (err as any).meta?.retryAfter; + const retryAfter = retryAfterStr + ? new Date(retryAfterStr) + : new Date(Date.now() + 1000); + const timeoutSeconds = Math.max( + 1, + Math.ceil((retryAfter.getTime() - Date.now()) / 1000) + ); + span?.setAttributes({ + ...Attribute.StepRetryTimeoutSeconds(timeoutSeconds), + }); + runtimeLogger.debug( + 'Step retryAfter timestamp not yet reached', + { + stepName, + stepId, + retryAfter, + timeoutSeconds, + } + ); + return { timeoutSeconds }; + } + } + // Re-throw other errors + throw err; + } + runtimeLogger.debug('Step execution details', { stepName, stepId: step.stepId, @@ -122,32 +203,10 @@ const stepHandler = getWorldHandlers().createQueueHandler( ...Attribute.StepStatus(step.status), }); - // Check if the step has a `retryAfter` timestamp that hasn't been reached yet - const now = Date.now(); - if (step.retryAfter && step.retryAfter.getTime() > now) { - const timeoutSeconds = Math.ceil( - (step.retryAfter.getTime() - now) / 1000 - ); - span?.setAttributes({ - ...Attribute.StepRetryTimeoutSeconds(timeoutSeconds), - }); - runtimeLogger.debug('Step retryAfter timestamp not yet reached', { - stepName, - stepId: step.stepId, - retryAfter: step.retryAfter, - timeoutSeconds, - }); - return { timeoutSeconds }; - } - let result: unknown; - // Check max retries FIRST before any state changes. + // Check max retries AFTER step_started (attempt was just incremented) // step.attempt tracks how many times step_started has been called. - // If step.attempt >= maxRetries, we've already tried maxRetries times. 
- // This handles edge cases where the step handler is invoked after max retries have been exceeded - // (e.g., when the step repeatedly times out or fails before reaching the catch handler). - // Without this check, the step would retry forever. // Note: maxRetries is the number of RETRIES after the first attempt, so total attempts = maxRetries + 1 // Use > here (not >=) because this guards against re-invocation AFTER all attempts are used. // The post-failure check uses >= to decide whether to retry after a failure. @@ -181,60 +240,8 @@ const stepHandler = getWorldHandlers().createQueueHandler( } try { - if (!['pending', 'running'].includes(step.status)) { - // We should only be running the step if it's either - // a) pending - initial state, or state set on re-try - // b) running - if a step fails mid-execution, like a function timeout - // otherwise, the step has been invoked erroneously - console.error( - `[Workflows] "${workflowRunId}" - Step invoked erroneously, expected status "pending" or "running", got "${step.status}" instead, skipping execution` - ); - span?.setAttributes({ - ...Attribute.StepSkipped(true), - ...Attribute.StepSkipReason(step.status), - }); - // There's a chance that a step terminates correctly, but the underlying process - // fails or gets killed before the stepEntrypoint has a chance to re-enqueue the run. - // The queue lease expires and stepEntrypoint again, which leads us here, so - // we optimistically re-enqueue the workflow if the step is in a terminal state, - // under the assumption that this edge case happened. - // Until we move to atomic entity/event updates (World V2), there _could_ be an edge case - // where the we execute this code based on the `step` entity status, but the runtime - // failed to create the `step_completed` event (due to failing between step and event update), - // in which case, this might lead to an infinite loop. 
- // https://vercel.slack.com/archives/C09125LC4AX/p1765313809066679 - const isTerminalStep = [ - 'completed', - 'failed', - 'cancelled', - ].includes(step.status); - if (isTerminalStep) { - await queueMessage(world, `__wkf_workflow_${workflowName}`, { - runId: workflowRunId, - traceCarrier: await serializeTraceCarrier(), - requestedAt: new Date(), - }); - } - return; - } - - // Start the step via event (event-sourced architecture) - // step_started increments the attempt counter in the World implementation - // NOTE: We must await this BEFORE any code that could throw and use step.attempt - // in the catch handler, to ensure step.attempt is always up to date. - const startResult = await world.events.create(workflowRunId, { - eventType: 'step_started', - specVersion: SPEC_VERSION_CURRENT, - correlationId: stepId, - }); - - // Use the step entity from the event response (no extra get call needed) - if (!startResult.step) { - throw new WorkflowRuntimeError( - `step_started event for "${stepId}" did not return step entity` - ); - } - step = startResult.step; + // step_started already validated the step is in valid state (pending/running) + // and returned the updated step entity with incremented attempt // step.attempt is now the current attempt number (after increment) const attempt = step.attempt; From cc840ff21d7868c76ee5f5d1417cafa8b7e9455b Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Wed, 4 Feb 2026 13:57:15 -0800 Subject: [PATCH 04/16] Add retryAfter validation to local and postgres worlds Add server-side retryAfter validation to match workflow-server behavior: - Check retryAfter timestamp before allowing step_started - Return HTTP 425 with retryAfter timestamp in response meta - Clear retryAfter field when step starts successfully This ensures consistent behavior across all world implementations and allows the step-handler optimization to work correctly. 
Co-Authored-By: Claude Opus 4.5 --- .changeset/step-handler-parallelization.md | 4 +++ packages/core/src/runtime/step-handler.ts | 11 -------- .../world-local/src/storage/events-storage.ts | 19 +++++++++++++ packages/world-postgres/src/storage.ts | 28 +++++++++++++++++-- 4 files changed, 49 insertions(+), 13 deletions(-) diff --git a/.changeset/step-handler-parallelization.md b/.changeset/step-handler-parallelization.md index 2bcb614e1e..1d099a6e25 100644 --- a/.changeset/step-handler-parallelization.md +++ b/.changeset/step-handler-parallelization.md @@ -1,5 +1,9 @@ --- "@workflow/core": patch +"@workflow/world-local": patch +"@workflow/world-postgres": patch --- Optimize step handler performance by removing initial world.steps.get() call + +Add server-side retryAfter validation to local and postgres worlds to match workflow-server behavior. When a step has a retryAfter timestamp that hasn't been reached, step_started will now return HTTP 425 with the retryAfter timestamp in the response. diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts index b9df6d1e07..8a92f0f98d 100644 --- a/packages/core/src/runtime/step-handler.ts +++ b/packages/core/src/runtime/step-handler.ts @@ -268,17 +268,6 @@ const stepHandler = getWorldHandlers().createQueueHandler( const args = hydratedInput.args; const thisVal = hydratedInput.thisVal ?? null; - // Hydrate the step input arguments, closure variables, and thisVal - const ops: Promise[] = []; - const hydratedInput = hydrateStepArguments( - step.input, - ops, - workflowRunId - ); - - const args = hydratedInput.args; - const thisVal = hydratedInput.thisVal ?? 
null; - span?.setAttributes({ ...Attribute.StepArgumentsCount(args.length), ...Attribute.QueueDeserializeTimeMs(deserializeTimeMs), diff --git a/packages/world-local/src/storage/events-storage.ts b/packages/world-local/src/storage/events-storage.ts index 7765708638..e31b382095 100644 --- a/packages/world-local/src/storage/events-storage.ts +++ b/packages/world-local/src/storage/events-storage.ts @@ -409,6 +409,23 @@ export function createEventsStorage(basedir: string): Storage['events'] { // Sets startedAt only on the first start (not updated on retries) // Reuse validatedStep from validation (already read above) if (validatedStep) { + // Check if retryAfter timestamp hasn't been reached yet + if ( + validatedStep.retryAfter && + validatedStep.retryAfter.getTime() > Date.now() + ) { + const err = new WorkflowAPIError( + `Cannot start step "${data.correlationId}": retryAfter timestamp has not been reached yet`, + { status: 425 } + ); + // Add meta for step-handler to extract retryAfter timestamp + (err as any).meta = { + stepId: data.correlationId, + retryAfter: validatedStep.retryAfter.toISOString(), + }; + throw err; + } + const stepCompositeKey = `${effectiveRunId}-${data.correlationId}`; const stepPath = path.join( basedir, @@ -422,6 +439,8 @@ export function createEventsStorage(basedir: string): Storage['events'] { startedAt: validatedStep.startedAt ?? 
now, // Increment attempt counter on every start attempt: validatedStep.attempt + 1, + // Clear retryAfter now that the step has started + retryAfter: undefined, updatedAt: now, }; await writeJSON(stepPath, step, { overwrite: true }); diff --git a/packages/world-postgres/src/storage.ts b/packages/world-postgres/src/storage.ts index a944cb9d62..3183d77179 100644 --- a/packages/world-postgres/src/storage.ts +++ b/packages/world-postgres/src/storage.ts @@ -264,6 +264,7 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] { .select({ status: Schema.steps.status, startedAt: Schema.steps.startedAt, + retryAfter: Schema.steps.retryAfter, }) .from(Schema.steps) .where( @@ -430,8 +431,11 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] { // Step-related event validation (ordering and terminal state) // Fetch status + startedAt so we can reuse for step_started (avoid double read) // Skip validation for step_completed/step_failed - use conditional UPDATE instead - let validatedStep: { status: string; startedAt: Date | null } | null = - null; + let validatedStep: { + status: string; + startedAt: Date | null; + retryAfter: Date | null; + } | null = null; const stepEventsNeedingValidation = ['step_started', 'step_retrying']; if ( stepEventsNeedingValidation.includes(data.eventType) && @@ -643,7 +647,25 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] { // Sets startedAt (maps to startedAt) only on first start // Reuse validatedStep from validation (already read above) if (data.eventType === 'step_started') { + // Check if retryAfter timestamp hasn't been reached yet + if ( + validatedStep?.retryAfter && + validatedStep.retryAfter.getTime() > Date.now() + ) { + const err = new WorkflowAPIError( + `Cannot start step "${data.correlationId}": retryAfter timestamp has not been reached yet`, + { status: 425 } + ); + // Add meta for step-handler to extract retryAfter timestamp + (err as any).meta = { + stepId: 
data.correlationId, + retryAfter: validatedStep.retryAfter.toISOString(), + }; + throw err; + } + const isFirstStart = !validatedStep?.startedAt; + const hadRetryAfter = !!validatedStep?.retryAfter; const [stepValue] = await drizzle .update(Schema.steps) @@ -653,6 +675,8 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] { attempt: sql`${Schema.steps.attempt} + 1`, // Only set startedAt on first start (not updated on retries) ...(isFirstStart ? { startedAt: now } : {}), + // Clear retryAfter now that the step has started + ...(hadRetryAfter ? { retryAfter: null } : {}), }) .where( and( From 60db90f4404216ca407c89b232637589020a0329 Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Wed, 4 Feb 2026 16:51:16 -0800 Subject: [PATCH 05/16] Fix step terminal state HTTP status code to 409 (Conflict) Aligns local and postgres worlds with workflow-server, which returns 409 via InvalidOperationStateError for step in terminal state. Co-Authored-By: Claude Opus 4.5 --- packages/world-local/src/storage/events-storage.ts | 2 +- packages/world-postgres/src/storage.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/world-local/src/storage/events-storage.ts b/packages/world-local/src/storage/events-storage.ts index e31b382095..c8156cf919 100644 --- a/packages/world-local/src/storage/events-storage.ts +++ b/packages/world-local/src/storage/events-storage.ts @@ -194,7 +194,7 @@ export function createEventsStorage(basedir: string): Storage['events'] { if (isStepTerminal(validatedStep.status)) { throw new WorkflowAPIError( `Cannot modify step in terminal state "${validatedStep.status}"`, - { status: 410 } + { status: 409 } ); } diff --git a/packages/world-postgres/src/storage.ts b/packages/world-postgres/src/storage.ts index 3183d77179..f6e086151f 100644 --- a/packages/world-postgres/src/storage.ts +++ b/packages/world-postgres/src/storage.ts @@ -460,7 +460,7 @@ export function createEventsStorage(drizzle: Drizzle): 
Storage['events'] { if (isStepTerminal(validatedStep.status)) { throw new WorkflowAPIError( `Cannot modify step in terminal state "${validatedStep.status}"`, - { status: 410 } + { status: 409 } ); } From 16e8a3eb22954aab021585f0e8003f83497198e0 Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Wed, 4 Feb 2026 21:10:38 -0800 Subject: [PATCH 06/16] Fix world-vercel telemetry to use parent application's tracer The world-vercel package was creating spans under a separate 'workflow-world-vercel' service name, causing HTTP spans for workflow-server API calls (step_started, step_completed) to be filtered out when viewing traces for the main application service. Now uses the same 'workflow' tracer name as @workflow/core to ensure all spans are reported under the parent application's service. --- .changeset/world-vercel-telemetry-tracer.md | 7 +++++++ packages/world-vercel/src/telemetry.ts | 5 ++++- 2 files changed, 11 insertions(+), 1 deletion(-) create mode 100644 .changeset/world-vercel-telemetry-tracer.md diff --git a/.changeset/world-vercel-telemetry-tracer.md b/.changeset/world-vercel-telemetry-tracer.md new file mode 100644 index 0000000000..1162e28a47 --- /dev/null +++ b/.changeset/world-vercel-telemetry-tracer.md @@ -0,0 +1,7 @@ +--- +"@workflow/world-vercel": patch +--- + +Fix world-vercel telemetry to use parent application's tracer + +The world-vercel package was creating spans under a separate 'workflow-world-vercel' service name, causing HTTP spans for workflow-server API calls (step_started, step_completed) to be filtered out when viewing traces for the main application service. Now uses the same 'workflow' tracer name as @workflow/core to ensure all spans are reported under the parent application's service. 
diff --git a/packages/world-vercel/src/telemetry.ts b/packages/world-vercel/src/telemetry.ts index a9c7333722..c0f0164940 100644 --- a/packages/world-vercel/src/telemetry.ts +++ b/packages/world-vercel/src/telemetry.ts @@ -9,6 +9,9 @@ * NOTE: Unlike the trace() function in @workflow/core, this implementation does not * have special handling for WorkflowSuspension errors because world-vercel operates * at the HTTP layer and never encounters workflow suspension effects. + * + * IMPORTANT: This module uses the same tracer name 'workflow' as @workflow/core to ensure + * all spans are reported under the parent application's service, not as a separate service. */ import type * as api from '@opentelemetry/api'; import type { Span, SpanKind, SpanOptions } from '@opentelemetry/api'; @@ -28,7 +31,7 @@ let tracerPromise: Promise | null = null; async function getTracer(): Promise { if (!tracerPromise) { tracerPromise = getOtelApi().then((otel) => - otel ? otel.trace.getTracer('workflow-world-vercel') : null + otel ? 
otel.trace.getTracer('workflow') : null ); } return tracerPromise; From 23821f17ba3d128525a60ca66ab771080edc7a74 Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Wed, 4 Feb 2026 21:28:31 -0800 Subject: [PATCH 07/16] Update /demo command to include OTEL tracing with Jaeger - Start Jaeger container for local trace visualization - Configure OTEL exporter environment variables for dev server - Open Jaeger UI automatically - Add documentation about available trace attributes Co-Authored-By: Claude Opus 4.5 --- .claude/commands/demo.md | 48 +++++++++++++++++++++++++++++++++++++--- 1 file changed, 45 insertions(+), 3 deletions(-) diff --git a/.claude/commands/demo.md b/.claude/commands/demo.md index 487dd22bc1..b6525bf0a8 100644 --- a/.claude/commands/demo.md +++ b/.claude/commands/demo.md @@ -1,9 +1,51 @@ --- description: Run the 7_full demo workflow -allowed-tools: Bash(curl:*), Bash(npx workflow:*), Bash(pnpm dev) +allowed-tools: Bash(curl:*), Bash(npx workflow:*), Bash(pnpm dev), Bash(docker *), Bash(open *) --- +Run the demo workflow with OpenTelemetry tracing enabled. -Start the $ARUGMENTS workbench (default to the nextjs turboback workbench available in the workbenches directory). Run it in dev mode, and also start the workflow web UI (run `npx workflow web` inside the appropriate workbench directory). +## Steps -Then trigger the 7_full.ts workflow example. you can see how to trigger a specific example by looking at the trigger API route for the workbench - it is probably just a POST request using bash (maybe curl) to this endpoint: > +1. **Start Jaeger** for OTEL trace visualization (if not already running): + ```bash + docker run -d --name jaeger-otel \ + -p 16686:16686 \ + -p 4317:4317 \ + -p 4318:4318 \ + jaegertracing/jaeger:2.4.0 2>&1 || docker start jaeger-otel + ``` + +2. **Open the Jaeger UI** in the browser: + ```bash + open http://localhost:16686 + ``` + +3. 
**Start the workbench** (default: nextjs-turbopack) with OTEL tracing enabled: + ```bash + cd workbench/nextjs-turbopack + OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:4318" \ + OTEL_EXPORTER_OTLP_PROTOCOL="http/protobuf" \ + pnpm dev + ``` + + Also start the workflow web UI in a separate terminal: + ```bash + npx workflow web + ``` + +4. **Trigger the 7_full.ts workflow**: + ```bash + curl -X POST "http://localhost:3000/api/trigger?workflowFile=workflows/7_full.ts&workflowFn=handleUserSignup" + ``` + +5. **View traces** in Jaeger UI at http://localhost:16686 - select service `example-nextjs-workflow` + +## Tracing Details + +The traces include: +- Step execution spans with `workflow.queue.overhead_ms`, `step.attempt`, `step.status` +- Workflow orchestration spans with `workflow.run.status`, `workflow.events.count` +- Queue message spans with messaging attributes + +$ARGUMENTS can specify a different workbench (e.g., "example" or "nextjs-webpack"). From f5ee6544a6dc14a3385de74920dc592dd8ef22b6 Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Wed, 4 Feb 2026 21:47:11 -0800 Subject: [PATCH 08/16] Add OTEL instrumentation to world-local and improve trace consistency - Add telemetry.ts and instrumentObject.ts to world-local for tracing parity with world-vercel (world.runs, world.steps, world.events, world.hooks spans) - Change workflow span name from uppercase "WORKFLOW" to lowercase "workflow" for consistency with step spans and OTEL naming conventions - Add step.execute child span to trace actual user step function execution separately from step handler infrastructure These changes enable local development to have the same observability as production deployments, making performance analysis and debugging easier. 
Co-Authored-By: Claude Opus 4.5 --- packages/core/src/runtime.ts | 2 +- packages/core/src/runtime/step-handler.ts | 47 +++++++------ packages/world-local/src/instrumentObject.ts | 26 +++++++ packages/world-local/src/storage/index.ts | 17 ++++- packages/world-local/src/telemetry.ts | 71 ++++++++++++++++++++ 5 files changed, 139 insertions(+), 24 deletions(-) create mode 100644 packages/world-local/src/instrumentObject.ts create mode 100644 packages/world-local/src/telemetry.ts diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 6e229a4971..7a920bbc16 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -88,7 +88,7 @@ export function workflowEntrypoint( return await withTraceContext(traceContext, async () => { const world = getWorld(); return trace( - `WORKFLOW ${workflowName}`, + `workflow ${workflowName}`, { links: spanLinks }, async (span) => { span?.setAttributes({ diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts index 51ca8fc171..a6efbf8472 100644 --- a/packages/core/src/runtime/step-handler.ts +++ b/packages/core/src/runtime/step-handler.ts @@ -262,6 +262,9 @@ const stepHandler = getWorldHandlers().createQueueHandler( `Step "${stepId}" has no "startedAt" timestamp` ); } + // Capture startedAt for use in async callback (TypeScript narrowing doesn't persist) + const stepStartedAt = step.startedAt; + // Hydrate the step input arguments, closure variables, and thisVal // Track deserialization time for observability // NOTE: This captures only the synchronous portion of hydration. 
Any async @@ -284,29 +287,31 @@ const stepHandler = getWorldHandlers().createQueueHandler( ...Attribute.QueueDeserializeTimeMs(deserializeTimeMs), }); - // Track execution time for observability + // Execute the step function with tracing const executionStartTime = Date.now(); - result = await contextStorage.run( - { - stepMetadata: { - stepId, - stepStartedAt: new Date(+step.startedAt), - attempt, - }, - workflowMetadata: { - workflowRunId, - workflowStartedAt: new Date(+workflowStartedAt), - // TODO: there should be a getUrl method on the world interface itself. This - // solution only works for vercel + local worlds. - url: process.env.VERCEL_URL - ? `https://${process.env.VERCEL_URL}` - : `http://localhost:${port ?? 3000}`, + result = await trace('step.execute', {}, async () => { + return await contextStorage.run( + { + stepMetadata: { + stepId, + stepStartedAt: new Date(+stepStartedAt), + attempt, + }, + workflowMetadata: { + workflowRunId, + workflowStartedAt: new Date(+workflowStartedAt), + // TODO: there should be a getUrl method on the world interface itself. This + // solution only works for vercel + local worlds. + url: process.env.VERCEL_URL + ? `https://${process.env.VERCEL_URL}` + : `http://localhost:${port ?? 3000}`, + }, + ops, + closureVars: hydratedInput.closureVars, }, - ops, - closureVars: hydratedInput.closureVars, - }, - () => stepFn.apply(thisVal, args) - ); + () => stepFn.apply(thisVal, args) + ); + }); const executionTimeMs = Date.now() - executionStartTime; span?.setAttributes({ diff --git a/packages/world-local/src/instrumentObject.ts b/packages/world-local/src/instrumentObject.ts new file mode 100644 index 0000000000..8e8623f3af --- /dev/null +++ b/packages/world-local/src/instrumentObject.ts @@ -0,0 +1,26 @@ +/** + * Utility to instrument object methods with tracing. + * This mirrors world-vercel's implementation for consistent observability. 
+ */ +import { trace } from './telemetry.js'; + +/** + * Wraps all methods of an object with tracing spans. + * @param prefix - Prefix for span names (e.g., "world.runs") + * @param o - Object with methods to instrument + * @returns Instrumented object with same interface + */ +export function instrumentObject(prefix: string, o: T): T { + const handlers = {} as T; + for (const key of Object.keys(o) as (keyof T)[]) { + if (typeof o[key] !== 'function') { + handlers[key] = o[key]; + } else { + const f = o[key]; + // @ts-expect-error - dynamic function wrapping + handlers[key] = async (...args: unknown[]) => + trace(`${prefix}.${String(key)}`, {}, () => f(...args)); + } + } + return handlers; +} diff --git a/packages/world-local/src/storage/index.ts b/packages/world-local/src/storage/index.ts index 937bb4356a..0c7e106e6f 100644 --- a/packages/world-local/src/storage/index.ts +++ b/packages/world-local/src/storage/index.ts @@ -1,4 +1,5 @@ import type { Storage } from '@workflow/world'; +import { instrumentObject } from '../instrumentObject.js'; import { createEventsStorage } from './events-storage.js'; import { createHooksStorage } from './hooks-storage.js'; import { createRunsStorage } from './runs-storage.js'; @@ -8,14 +9,26 @@ import { createStepsStorage } from './steps-storage.js'; * Creates a complete storage implementation using the filesystem. * This is the main entry point that composes all storage implementations. * + * All storage methods are instrumented with tracing spans for observability. 
+ * * @param basedir - The base directory for storing workflow data - * @returns A complete Storage implementation + * @returns A complete Storage implementation with tracing */ export function createStorage(basedir: string): Storage { - return { + // Create raw storage implementations + const storage: Storage = { runs: createRunsStorage(basedir), steps: createStepsStorage(basedir), events: createEventsStorage(basedir), hooks: createHooksStorage(basedir), }; + + // Instrument all storage methods with tracing + // NOTE: Span names are lowercase per OTEL semantic conventions + return { + runs: instrumentObject('world.runs', storage.runs), + steps: instrumentObject('world.steps', storage.steps), + events: instrumentObject('world.events', storage.events), + hooks: instrumentObject('world.hooks', storage.hooks), + }; } diff --git a/packages/world-local/src/telemetry.ts b/packages/world-local/src/telemetry.ts new file mode 100644 index 0000000000..2d2615ade8 --- /dev/null +++ b/packages/world-local/src/telemetry.ts @@ -0,0 +1,71 @@ +/** + * Minimal telemetry utilities for world-local package. + * + * NOTE: This module is a simplified version of world-vercel's telemetry. + * It provides tracing capabilities for local development to match + * the observability experience in production. + * + * IMPORTANT: This module uses the same tracer name 'workflow' as @workflow/core to ensure + * all spans are reported under the parent application's service, not as a separate service. 
+ */ +import type * as api from '@opentelemetry/api'; +import type { Span, SpanOptions } from '@opentelemetry/api'; + +// Lazy load OpenTelemetry API to make it optional +let otelApiPromise: Promise | null = null; + +async function getOtelApi(): Promise { + if (!otelApiPromise) { + otelApiPromise = import('@opentelemetry/api').catch(() => null); + } + return otelApiPromise; +} + +let tracerPromise: Promise | null = null; + +async function getTracer(): Promise { + if (!tracerPromise) { + tracerPromise = getOtelApi().then((otel) => + otel ? otel.trace.getTracer('workflow') : null + ); + } + return tracerPromise; +} + +/** + * Wrap an async function with a trace span. + * No-op if OpenTelemetry is not available. + */ +export async function trace( + spanName: string, + ...args: + | [fn: (span?: Span) => Promise] + | [opts: SpanOptions, fn: (span?: Span) => Promise] +): Promise { + const [tracer, otel] = await Promise.all([getTracer(), getOtelApi()]); + const { fn, opts } = + typeof args[0] === 'function' + ? { fn: args[0], opts: {} } + : { fn: args[1], opts: args[0] }; + if (!fn) throw new Error('Function to trace must be provided'); + + if (!tracer || !otel) { + return await fn(); + } + + return tracer.startActiveSpan(spanName, opts, async (span) => { + try { + const result = await fn(span); + span.setStatus({ code: otel.SpanStatusCode.OK }); + return result; + } catch (e) { + span.setStatus({ + code: otel.SpanStatusCode.ERROR, + message: (e as Error).message, + }); + throw e; + } finally { + span.end(); + } + }); +} From 1d063adbe800ccd4f6a6ae9f5cc31a4a12b3ebf0 Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Wed, 4 Feb 2026 21:52:07 -0800 Subject: [PATCH 09/16] Use uppercase WORKFLOW and STEP span names for consistency with HTTP spans HTTP spans use uppercase methods like "GET /path" and "POST /path". 
Following the same convention, workflow and step spans now use: - WORKFLOW <workflowName> - STEP <stepName> Child spans (workflow.run, workflow.loadEvents, step.execute, world.events.create) remain lowercase as they represent internal operations, not top-level entries. Co-Authored-By: Claude Opus 4.5 --- packages/core/src/runtime.ts | 2 +- packages/core/src/runtime/step-handler.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 7a920bbc16..6e229a4971 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -88,7 +88,7 @@ export function workflowEntrypoint( return await withTraceContext(traceContext, async () => { const world = getWorld(); return trace( - `workflow ${workflowName}`, + `WORKFLOW ${workflowName}`, { links: spanLinks }, async (span) => { span?.setAttributes({ diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts index a6efbf8472..8324230755 100644 --- a/packages/core/src/runtime/step-handler.ts +++ b/packages/core/src/runtime/step-handler.ts @@ -72,7 +72,7 @@ const stepHandler = getWorldHandlers().createQueueHandler( ]); return trace( - `step ${stepName}`, + `STEP ${stepName}`, { kind: spanKind, links: spanLinks }, async (span) => { span?.setAttributes({ From 8147d020e498b4436b0a3ac330bf6cc4bd3cbc97 Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Wed, 4 Feb 2026 22:25:17 -0800 Subject: [PATCH 10/16] Add W3C trace context headers to step queue messages Include traceparent and tracestate headers when queueing step execution messages. This enables automatic trace propagation by Vercel's infrastructure, potentially linking step invocation spans to the parent workflow trace.
The trace carrier is now serialized once and included in both: - Payload: for manual context restoration in step handler - Headers: for automatic HTTP-based trace propagation Co-Authored-By: Claude Opus 4.5 --- .../core/src/runtime/suspension-handler.ts | 28 +++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/packages/core/src/runtime/suspension-handler.ts b/packages/core/src/runtime/suspension-handler.ts index da83f02a08..5f9f4b7852 100644 --- a/packages/core/src/runtime/suspension-handler.ts +++ b/packages/core/src/runtime/suspension-handler.ts @@ -19,6 +19,23 @@ import * as Attribute from '../telemetry/semantic-conventions.js'; import { serializeTraceCarrier } from '../telemetry.js'; import { queueMessage } from './helpers.js'; +/** + * Extracts W3C trace context headers from a trace carrier for HTTP propagation. + * Returns an object with `traceparent` and optionally `tracestate` headers. + */ +function extractTraceHeaders( + traceCarrier: Record +): Record { + const headers: Record = {}; + if (traceCarrier.traceparent) { + headers.traceparent = traceCarrier.traceparent; + } + if (traceCarrier.tracestate) { + headers.tracestate = traceCarrier.tracestate; + } + return headers; +} + export interface SuspensionHandlerParams { suspension: WorkflowSuspension; world: World; @@ -169,6 +186,10 @@ export async function handleSuspension({ } // Queue step execution message + // Serialize trace context once and include in both payload and headers + // Payload: for manual context restoration in step handler + // Headers: for automatic trace propagation by Vercel's infrastructure + const traceCarrier = await serializeTraceCarrier(); await queueMessage( world, `__wkf_step_${queueItem.stepName}`, @@ -177,12 +198,15 @@ export async function handleSuspension({ workflowRunId: runId, workflowStartedAt, stepId: queueItem.correlationId, - traceCarrier: await serializeTraceCarrier(), + traceCarrier, requestedAt: new Date(), }, { idempotencyKey: 
queueItem.correlationId, - headers: { 'x-workflow-run-id': runId }, + headers: { + 'x-workflow-run-id': runId, + ...extractTraceHeaders(traceCarrier), + }, } ); })() From 07204fe6f153d1251ade3dccae8b6a5f534dbdd0 Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Wed, 4 Feb 2026 23:25:11 -0800 Subject: [PATCH 11/16] Improve OTEL tracing with service attribution and enhanced instrumentation - Add peer.service attributes for workflow-server and VQS for Datadog service maps - Rename queueMessage span to queue.publish for consistency - Add step.hydrate, step.dehydrate, and workflow.replay spans - Include event type in world.events.create span names (e.g., "world.events.create step_started") - Add span.recordException() for errors with category classification (fatal/retryable/transient) - Add span events for milestones: retry.scheduled, step.skipped, step.delayed - Add HTTP semantic conventions with peer.service for world-vercel HTTP calls - Add baggage propagation for workflow context (run_id, workflow_name) Co-Authored-By: Claude Opus 4.5 --- packages/core/src/runtime.ts | 387 ++++++++++-------- packages/core/src/runtime/helpers.ts | 7 +- packages/core/src/runtime/step-handler.ts | 93 ++++- packages/core/src/telemetry.ts | 64 +++ .../src/telemetry/semantic-conventions.ts | 26 ++ packages/world-local/src/instrumentObject.ts | 58 ++- packages/world-local/src/telemetry.ts | 33 +- packages/world-vercel/src/instrumentObject.ts | 62 ++- packages/world-vercel/src/telemetry.ts | 15 + packages/world-vercel/src/utils.ts | 7 + 10 files changed, 542 insertions(+), 210 deletions(-) diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 6e229a4971..1d28b5d8a9 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -19,7 +19,12 @@ import { handleSuspension } from './runtime/suspension-handler.js'; import { getWorld, getWorldHandlers } from './runtime/world.js'; import { remapErrorStack } from './source-map.js'; import * as Attribute 
from './telemetry/semantic-conventions.js'; -import { linkToCurrentContext, trace, withTraceContext } from './telemetry.js'; +import { + linkToCurrentContext, + trace, + withTraceContext, + withWorkflowBaggage, +} from './telemetry.js'; import { getErrorName, getErrorStack } from './types.js'; import { buildWorkflowSuspensionMessage } from './util.js'; import { runWorkflow } from './workflow.js'; @@ -84,210 +89,234 @@ export function workflowEntrypoint( const workflowName = metadata.queueName.slice('__wkf_workflow_'.length); const spanLinks = await linkToCurrentContext(); - // Invoke user workflow within the propagated trace context + // Invoke user workflow within the propagated trace context and baggage return await withTraceContext(traceContext, async () => { - const world = getWorld(); - return trace( - `WORKFLOW ${workflowName}`, - { links: spanLinks }, - async (span) => { - span?.setAttributes({ - ...Attribute.WorkflowName(workflowName), - ...Attribute.WorkflowOperation('execute'), - // Standard OTEL messaging conventions - ...Attribute.MessagingSystem('vercel-queue'), - ...Attribute.MessagingDestinationName(metadata.queueName), - ...Attribute.MessagingMessageId(metadata.messageId), - ...Attribute.MessagingOperationType('process'), - ...getQueueOverhead({ requestedAt }), - }); - - // TODO: validate `workflowName` exists before consuming message? 
- - span?.setAttributes({ - ...Attribute.WorkflowRunId(runId), - ...Attribute.WorkflowTracePropagated(!!traceContext), - }); + // Set workflow context as baggage for automatic propagation + return await withWorkflowBaggage( + { workflowRunId: runId, workflowName }, + async () => { + const world = getWorld(); + return trace( + `WORKFLOW ${workflowName}`, + { links: spanLinks }, + async (span) => { + span?.setAttributes({ + ...Attribute.WorkflowName(workflowName), + ...Attribute.WorkflowOperation('execute'), + // Standard OTEL messaging conventions + ...Attribute.MessagingSystem('vercel-queue'), + ...Attribute.MessagingDestinationName(metadata.queueName), + ...Attribute.MessagingMessageId(metadata.messageId), + ...Attribute.MessagingOperationType('process'), + ...getQueueOverhead({ requestedAt }), + }); - let workflowStartedAt = -1; - try { - let workflowRun = await world.runs.get(runId); + // TODO: validate `workflowName` exists before consuming message? - if (workflowRun.status === 'pending') { - // Transition run to 'running' via event (event-sourced architecture) - const result = await world.events.create(runId, { - eventType: 'run_started', - specVersion: SPEC_VERSION_CURRENT, + span?.setAttributes({ + ...Attribute.WorkflowRunId(runId), + ...Attribute.WorkflowTracePropagated(!!traceContext), }); - // Use the run entity from the event response (no extra get call needed) - if (!result.run) { - throw new WorkflowRuntimeError( - `Event creation for 'run_started' did not return the run entity for run "${runId}"` - ); - } - workflowRun = result.run; - } - // At this point, the workflow is "running" and `startedAt` should - // definitely be set. 
- if (!workflowRun.startedAt) { - throw new WorkflowRuntimeError( - `Workflow run "${runId}" has no "startedAt" timestamp` - ); - } - workflowStartedAt = +workflowRun.startedAt; + let workflowStartedAt = -1; + try { + let workflowRun = await world.runs.get(runId); - span?.setAttributes({ - ...Attribute.WorkflowRunStatus(workflowRun.status), - ...Attribute.WorkflowStartedAt(workflowStartedAt), - }); + if (workflowRun.status === 'pending') { + // Transition run to 'running' via event (event-sourced architecture) + const result = await world.events.create(runId, { + eventType: 'run_started', + specVersion: SPEC_VERSION_CURRENT, + }); + // Use the run entity from the event response (no extra get call needed) + if (!result.run) { + throw new WorkflowRuntimeError( + `Event creation for 'run_started' did not return the run entity for run "${runId}"` + ); + } + workflowRun = result.run; + } - if (workflowRun.status !== 'running') { - // Workflow has already completed or failed, so we can skip it - runtimeLogger.info( - 'Workflow already completed or failed, skipping', - { - workflowRunId: runId, - status: workflowRun.status, + // At this point, the workflow is "running" and `startedAt` should + // definitely be set. + if (!workflowRun.startedAt) { + throw new WorkflowRuntimeError( + `Workflow run "${runId}" has no "startedAt" timestamp` + ); } - ); + workflowStartedAt = +workflowRun.startedAt; - // TODO: for `cancel`, we actually want to propagate a WorkflowCancelled event - // inside the workflow context so the user can gracefully exit. this is SIGTERM - // TODO: furthermore, there should be a timeout or a way to force cancel SIGKILL - // so that we actually exit here without replaying the workflow at all, in the case - // the replaying the workflow is itself failing. 
+ span?.setAttributes({ + ...Attribute.WorkflowRunStatus(workflowRun.status), + ...Attribute.WorkflowStartedAt(workflowStartedAt), + }); - return; - } + if (workflowRun.status !== 'running') { + // Workflow has already completed or failed, so we can skip it + runtimeLogger.info( + 'Workflow already completed or failed, skipping', + { + workflowRunId: runId, + status: workflowRun.status, + } + ); + + // TODO: for `cancel`, we actually want to propagate a WorkflowCancelled event + // inside the workflow context so the user can gracefully exit. this is SIGTERM + // TODO: furthermore, there should be a timeout or a way to force cancel SIGKILL + // so that we actually exit here without replaying the workflow at all, in the case + // the replaying the workflow is itself failing. - // Load all events into memory before running - const events = await getAllWorkflowRunEvents(workflowRun.runId); + return; + } - // Check for any elapsed waits and create wait_completed events - const now = Date.now(); + // Load all events into memory before running + const events = await getAllWorkflowRunEvents( + workflowRun.runId + ); - // Pre-compute completed correlation IDs for O(n) lookup instead of O(n²) - const completedWaitIds = new Set( - events - .filter((e) => e.eventType === 'wait_completed') - .map((e) => e.correlationId) - ); + // Check for any elapsed waits and create wait_completed events + const now = Date.now(); - // Collect all waits that need completion - const waitsToComplete = events - .filter( - (e): e is typeof e & { correlationId: string } => - e.eventType === 'wait_created' && - e.correlationId !== undefined && - !completedWaitIds.has(e.correlationId) && - now >= (e.eventData.resumeAt as Date).getTime() - ) - .map((e) => ({ - eventType: 'wait_completed' as const, - specVersion: SPEC_VERSION_CURRENT, - correlationId: e.correlationId, - })); + // Pre-compute completed correlation IDs for O(n) lookup instead of O(n²) + const completedWaitIds = new Set( + events + 
.filter((e) => e.eventType === 'wait_completed') + .map((e) => e.correlationId) + ); - // Create all wait_completed events - for (const waitEvent of waitsToComplete) { - const result = await world.events.create(runId, waitEvent); - // Add the event to the events array so the workflow can see it - events.push(result.event!); - } + // Collect all waits that need completion + const waitsToComplete = events + .filter( + (e): e is typeof e & { correlationId: string } => + e.eventType === 'wait_created' && + e.correlationId !== undefined && + !completedWaitIds.has(e.correlationId) && + now >= (e.eventData.resumeAt as Date).getTime() + ) + .map((e) => ({ + eventType: 'wait_completed' as const, + specVersion: SPEC_VERSION_CURRENT, + correlationId: e.correlationId, + })); - const result = await runWorkflow( - workflowCode, - workflowRun, - events - ); + // Create all wait_completed events + for (const waitEvent of waitsToComplete) { + const result = await world.events.create(runId, waitEvent); + // Add the event to the events array so the workflow can see it + events.push(result.event!); + } - // Complete the workflow run via event (event-sourced architecture) - await world.events.create(runId, { - eventType: 'run_completed', - specVersion: SPEC_VERSION_CURRENT, - eventData: { - output: result, - }, - }); + const result = await trace( + 'workflow.replay', + {}, + async (replaySpan) => { + replaySpan?.setAttributes({ + ...Attribute.WorkflowEventsCount(events.length), + }); + return await runWorkflow( + workflowCode, + workflowRun, + events + ); + } + ); - span?.setAttributes({ - ...Attribute.WorkflowRunStatus('completed'), - ...Attribute.WorkflowEventsCount(events.length), - }); - } catch (err) { - if (WorkflowSuspension.is(err)) { - const suspensionMessage = buildWorkflowSuspensionMessage( - runId, - err.stepCount, - err.hookCount, - err.waitCount - ); - if (suspensionMessage) { - runtimeLogger.debug(suspensionMessage); - } + // Complete the workflow run via event 
(event-sourced architecture) + await world.events.create(runId, { + eventType: 'run_completed', + specVersion: SPEC_VERSION_CURRENT, + eventData: { + output: result, + }, + }); - const result = await handleSuspension({ - suspension: err, - world, - runId, - workflowName, - workflowStartedAt, - span, - }); + span?.setAttributes({ + ...Attribute.WorkflowRunStatus('completed'), + ...Attribute.WorkflowEventsCount(events.length), + }); + } catch (err) { + if (WorkflowSuspension.is(err)) { + const suspensionMessage = buildWorkflowSuspensionMessage( + runId, + err.stepCount, + err.hookCount, + err.waitCount + ); + if (suspensionMessage) { + runtimeLogger.debug(suspensionMessage); + } - if (result.timeoutSeconds !== undefined) { - return { timeoutSeconds: result.timeoutSeconds }; - } - } else { - // NOTE: this error could be an error thrown in user code, or could also be a WorkflowRuntimeError - // (for instance when the event log is corrupted, this is thrown by the event consumer). We could - // specially handle these if needed. + const result = await handleSuspension({ + suspension: err, + world, + runId, + workflowName, + workflowStartedAt, + span, + }); - const errorName = getErrorName(err); - const errorMessage = - err instanceof Error ? err.message : String(err); - let errorStack = getErrorStack(err); + if (result.timeoutSeconds !== undefined) { + return { timeoutSeconds: result.timeoutSeconds }; + } + } else { + // NOTE: this error could be an error thrown in user code, or could also be a WorkflowRuntimeError + // (for instance when the event log is corrupted, this is thrown by the event consumer). We could + // specially handle these if needed. 
- // Remap error stack using source maps to show original source locations - if (errorStack) { - const parsedName = parseWorkflowName(workflowName); - const filename = parsedName?.moduleSpecifier || workflowName; - errorStack = remapErrorStack( - errorStack, - filename, - workflowCode - ); - } + // Record exception for OTEL error tracking + if (err instanceof Error) { + span?.recordException?.(err); + } - runtimeLogger.error('Error while running workflow', { - workflowRunId: runId, - errorName, - errorStack, - }); - // Fail the workflow run via event (event-sourced architecture) - await world.events.create(runId, { - eventType: 'run_failed', - specVersion: SPEC_VERSION_CURRENT, - eventData: { - error: { - message: errorMessage, - stack: errorStack, - }, - // TODO: include error codes when we define them - }, - }); + const errorName = getErrorName(err); + const errorMessage = + err instanceof Error ? err.message : String(err); + let errorStack = getErrorStack(err); - span?.setAttributes({ - ...Attribute.WorkflowRunStatus('failed'), - ...Attribute.WorkflowErrorName(errorName), - ...Attribute.WorkflowErrorMessage(String(err)), - }); + // Remap error stack using source maps to show original source locations + if (errorStack) { + const parsedName = parseWorkflowName(workflowName); + const filename = + parsedName?.moduleSpecifier || workflowName; + errorStack = remapErrorStack( + errorStack, + filename, + workflowCode + ); + } + + runtimeLogger.error('Error while running workflow', { + workflowRunId: runId, + errorName, + errorStack, + }); + // Fail the workflow run via event (event-sourced architecture) + await world.events.create(runId, { + eventType: 'run_failed', + specVersion: SPEC_VERSION_CURRENT, + eventData: { + error: { + message: errorMessage, + stack: errorStack, + }, + // TODO: include error codes when we define them + }, + }); + + span?.setAttributes({ + ...Attribute.WorkflowRunStatus('failed'), + ...Attribute.WorkflowErrorName(errorName), + 
...Attribute.WorkflowErrorMessage(String(err)), + ...Attribute.ErrorType(errorName), + }); + } + } } - } + ); // End trace } - ); // End withTraceContext - }); + ); // End withWorkflowBaggage + }); // End withTraceContext } ); diff --git a/packages/core/src/runtime/helpers.ts b/packages/core/src/runtime/helpers.ts index 0c1e361330..2493efe3e6 100644 --- a/packages/core/src/runtime/helpers.ts +++ b/packages/core/src/runtime/helpers.ts @@ -339,13 +339,18 @@ export async function queueMessage( ) { const queueName = args[0]; await trace( - 'queueMessage', + 'queue.publish', { // Standard OTEL messaging conventions attributes: { ...Attribute.MessagingSystem('vercel-queue'), ...Attribute.MessagingDestinationName(queueName), ...Attribute.MessagingOperationType('publish'), + // Peer service for Datadog service maps + ...Attribute.PeerService('vercel-queue'), + ...Attribute.RpcSystem('vercel-queue'), + ...Attribute.RpcService('vqs'), + ...Attribute.RpcMethod('publish'), }, kind: await getSpanKind('PRODUCER'), }, diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts index 8324230755..3164bbc076 100644 --- a/packages/core/src/runtime/step-handler.ts +++ b/packages/core/src/runtime/step-handler.ts @@ -153,6 +153,12 @@ const stepHandler = getWorldHandlers().createQueueHandler( // Use 'completed' as a representative terminal state for the skip reason ...Attribute.StepSkipReason('completed'), }); + // Add span event for step skip + span?.addEvent?.('step.skipped', { + 'skip.reason': 'terminal_state', + 'step.name': stepName, + 'step.id': stepId, + }); await queueMessage(world, `__wkf_workflow_${workflowName}`, { runId: workflowRunId, traceCarrier: await serializeTraceCarrier(), @@ -176,6 +182,12 @@ const stepHandler = getWorldHandlers().createQueueHandler( span?.setAttributes({ ...Attribute.StepRetryTimeoutSeconds(timeoutSeconds), }); + // Add span event for delayed retry + span?.addEvent?.('step.delayed', { + 'delay.reason': 
'retry_after_not_reached', + 'delay.timeout_seconds': timeoutSeconds, + 'delay.retry_after': retryAfter.toISOString(), + }); runtimeLogger.debug( 'Step retryAfter timestamp not yet reached', { @@ -266,27 +278,32 @@ const stepHandler = getWorldHandlers().createQueueHandler( const stepStartedAt = step.startedAt; // Hydrate the step input arguments, closure variables, and thisVal - // Track deserialization time for observability // NOTE: This captures only the synchronous portion of hydration. Any async // operations (e.g., stream loading) are added to `ops` and executed later // via Promise.all(ops) - their timing is not included in this measurement. - const deserializeStartTime = Date.now(); const ops: Promise[] = []; - const hydratedInput = hydrateStepArguments( - step.input, - ops, - workflowRunId + const hydratedInput = await trace( + 'step.hydrate', + {}, + async (hydrateSpan) => { + const startTime = Date.now(); + const result = hydrateStepArguments( + step.input, + ops, + workflowRunId + ); + const durationMs = Date.now() - startTime; + hydrateSpan?.setAttributes({ + ...Attribute.StepArgumentsCount(result.args.length), + ...Attribute.QueueDeserializeTimeMs(durationMs), + }); + return result; + } ); - const deserializeTimeMs = Date.now() - deserializeStartTime; const args = hydratedInput.args; const thisVal = hydratedInput.thisVal ?? 
null; - span?.setAttributes({ - ...Attribute.StepArgumentsCount(args.length), - ...Attribute.QueueDeserializeTimeMs(deserializeTimeMs), - }); - // Execute the step function with tracing const executionStartTime = Date.now(); result = await trace('step.execute', {}, async () => { @@ -321,14 +338,24 @@ const stepHandler = getWorldHandlers().createQueueHandler( // NOTE: None of the code from this point is guaranteed to run // Since the step might fail or cause a function timeout and the process might be SIGKILL'd // The workflow runtime must be resilient to the below code not executing on a failed step - // Track serialization time for observability - const serializeStartTime = Date.now(); - result = dehydrateStepReturnValue(result, ops, workflowRunId); - const serializeTimeMs = Date.now() - serializeStartTime; - - span?.setAttributes({ - ...Attribute.QueueSerializeTimeMs(serializeTimeMs), - }); + result = await trace( + 'step.dehydrate', + {}, + async (dehydrateSpan) => { + const startTime = Date.now(); + const dehydrated = dehydrateStepReturnValue( + result, + ops, + workflowRunId + ); + const durationMs = Date.now() - startTime; + dehydrateSpan?.setAttributes({ + ...Attribute.QueueSerializeTimeMs(durationMs), + ...Attribute.StepResultType(typeof dehydrated), + }); + return dehydrated; + } + ); waitUntil( Promise.all(ops).catch((err) => { @@ -368,9 +395,26 @@ const stepHandler = getWorldHandlers().createQueueHandler( }); return; } catch (err: unknown) { + // Record exception for OTEL error tracking + if (err instanceof Error) { + span?.recordException?.(err); + } + + // Determine error category and retryability + const isFatal = FatalError.is(err); + const isRetryable = RetryableError.is(err); + const errorCategory = isFatal + ? 'fatal' + : isRetryable + ? 
'retryable' + : 'transient'; + span?.setAttributes({ ...Attribute.StepErrorName(getErrorName(err)), ...Attribute.StepErrorMessage(String(err)), + ...Attribute.ErrorType(getErrorName(err)), + ...Attribute.ErrorCategory(errorCategory), + ...Attribute.ErrorRetryable(!isFatal), }); if (WorkflowAPIError.is(err)) { @@ -388,7 +432,7 @@ const stepHandler = getWorldHandlers().createQueueHandler( } } - if (FatalError.is(err)) { + if (isFatal) { const errorStack = getErrorStack(err); stepLogger.error( 'Encountered FatalError while executing step, bubbling up to parent workflow', @@ -503,6 +547,13 @@ const stepHandler = getWorldHandlers().createQueueHandler( ...Attribute.StepRetryWillRetry(true), }); + // Add span event for retry scheduling + span?.addEvent?.('retry.scheduled', { + 'retry.timeout_seconds': timeoutSeconds, + 'retry.attempt': currentAttempt, + 'retry.max_retries': maxRetries, + }); + // It's a retryable error - so have the queue keep the message visible // so that it gets retried. return { timeoutSeconds }; diff --git a/packages/core/src/telemetry.ts b/packages/core/src/telemetry.ts index 12ed60033f..1b26294b5e 100644 --- a/packages/core/src/telemetry.ts +++ b/packages/core/src/telemetry.ts @@ -188,3 +188,67 @@ export function linkToCurrentContext(): Promise<[api.Link] | undefined> { return [{ context }]; }); } + +// ============================================================ +// Baggage Propagation Utilities +// ============================================================ + +/** + * Workflow context to propagate via baggage + */ +export interface WorkflowBaggageContext { + workflowRunId: string; + workflowName: string; +} + +/** + * Sets workflow context as OTEL baggage for automatic propagation. + * Baggage is propagated across service boundaries via HTTP headers. 
+ * @param context - Workflow context to set as baggage + * @returns A function to run within the baggage context + */ +export async function withWorkflowBaggage( + context: WorkflowBaggageContext, + fn: () => Promise +): Promise { + const otel = await OtelApi.value; + if (!otel) return fn(); + + // Create baggage with workflow context + const baggage = otel.propagation.createBaggage({ + 'workflow.run_id': { value: context.workflowRunId }, + 'workflow.name': { value: context.workflowName }, + }); + + // Set baggage in context and run function + const contextWithBaggage = otel.propagation.setBaggage( + otel.context.active(), + baggage + ); + + return otel.context.with(contextWithBaggage, () => fn()); +} + +/** + * Retrieves workflow context from OTEL baggage. + * @returns Workflow context if present in baggage, undefined otherwise + */ +export async function getWorkflowBaggage(): Promise< + WorkflowBaggageContext | undefined +> { + const otel = await OtelApi.value; + if (!otel) return undefined; + + const baggage = otel.propagation.getBaggage(otel.context.active()); + if (!baggage) return undefined; + + const runId = baggage.getEntry('workflow.run_id')?.value; + const name = baggage.getEntry('workflow.name')?.value; + + if (!runId || !name) return undefined; + + return { + workflowRunId: runId, + workflowName: name, + }; +} diff --git a/packages/core/src/telemetry/semantic-conventions.ts b/packages/core/src/telemetry/semantic-conventions.ts index b9ca1017e5..afe86b1ecb 100644 --- a/packages/core/src/telemetry/semantic-conventions.ts +++ b/packages/core/src/telemetry/semantic-conventions.ts @@ -298,3 +298,29 @@ export const QueueExecutionTimeMs = SemanticConvention( export const QueueSerializeTimeMs = SemanticConvention( 'workflow.queue.serialize_time_ms' ); + +// RPC/Peer Service attributes - For service maps and dependency tracking +// See: https://opentelemetry.io/docs/specs/semconv/rpc/rpc-spans/ + +/** The remote service name for Datadog service maps 
(Datadog-specific: peer.service) */ +export const PeerService = SemanticConvention('peer.service'); + +/** RPC system identifier (standard OTEL: rpc.system) */ +export const RpcSystem = SemanticConvention('rpc.system'); + +/** RPC service name (standard OTEL: rpc.service) */ +export const RpcService = SemanticConvention('rpc.service'); + +/** RPC method name (standard OTEL: rpc.method) */ +export const RpcMethod = SemanticConvention('rpc.method'); + +// Error attributes - Standard OTEL error conventions +// See: https://opentelemetry.io/docs/specs/semconv/exceptions/exceptions-spans/ + +/** Whether the error is retryable (workflow-specific) */ +export const ErrorRetryable = SemanticConvention('error.retryable'); + +/** Error category (workflow-specific: fatal, retryable, transient) */ +export const ErrorCategory = SemanticConvention< + 'fatal' | 'retryable' | 'transient' +>('error.category'); diff --git a/packages/world-local/src/instrumentObject.ts b/packages/world-local/src/instrumentObject.ts index 8e8623f3af..ae1dff23af 100644 --- a/packages/world-local/src/instrumentObject.ts +++ b/packages/world-local/src/instrumentObject.ts @@ -2,7 +2,35 @@ * Utility to instrument object methods with tracing. * This mirrors world-vercel's implementation for consistent observability. */ -import { trace } from './telemetry.js'; +import { + trace, + getSpanKind, + PeerService, + RpcSystem, + RpcService, + RpcMethod, +} from './telemetry.js'; + +/** Configuration for peer service attribution */ +const WORLD_LOCAL_SERVICE = { + peerService: 'world-local', + rpcSystem: 'local', + rpcService: 'world-local', +}; + +/** + * Extracts the event type from arguments for events.create calls. + * The event data is the second argument and contains eventType. 
+ */ +function extractEventType(args: unknown[]): string | undefined { + if (args.length >= 2 && typeof args[1] === 'object' && args[1] !== null) { + const data = args[1] as Record; + if (typeof data.eventType === 'string') { + return data.eventType; + } + } + return undefined; +} /** * Wraps all methods of an object with tracing spans. @@ -17,9 +45,33 @@ export function instrumentObject(prefix: string, o: T): T { handlers[key] = o[key]; } else { const f = o[key]; + const methodName = String(key); // @ts-expect-error - dynamic function wrapping - handlers[key] = async (...args: unknown[]) => - trace(`${prefix}.${String(key)}`, {}, () => f(...args)); + handlers[key] = async (...args: unknown[]) => { + // Build span name - for events.create, include the event type + let spanName = `${prefix}.${methodName}`; + if (prefix === 'world.events' && methodName === 'create') { + const eventType = extractEventType(args); + if (eventType) { + spanName = `${prefix}.${methodName} ${eventType}`; + } + } + + return trace( + spanName, + { kind: await getSpanKind('INTERNAL') }, + async (span) => { + // Add peer service attributes for service maps + span?.setAttributes({ + ...PeerService(WORLD_LOCAL_SERVICE.peerService), + ...RpcSystem(WORLD_LOCAL_SERVICE.rpcSystem), + ...RpcService(WORLD_LOCAL_SERVICE.rpcService), + ...RpcMethod(`${prefix}.${methodName}`), + }); + return f(...args); + } + ); + }; } } return handlers; diff --git a/packages/world-local/src/telemetry.ts b/packages/world-local/src/telemetry.ts index 2d2615ade8..8a1fc6310a 100644 --- a/packages/world-local/src/telemetry.ts +++ b/packages/world-local/src/telemetry.ts @@ -9,7 +9,7 @@ * all spans are reported under the parent application's service, not as a separate service. 
*/ import type * as api from '@opentelemetry/api'; -import type { Span, SpanOptions } from '@opentelemetry/api'; +import type { Span, SpanKind, SpanOptions } from '@opentelemetry/api'; // Lazy load OpenTelemetry API to make it optional let otelApiPromise: Promise | null = null; @@ -69,3 +69,34 @@ export async function trace( } }); } + +/** + * Get SpanKind enum value by name. + * Returns undefined if OpenTelemetry is not available. + */ +export async function getSpanKind( + field: keyof typeof SpanKind +): Promise { + const otel = await getOtelApi(); + if (!otel) return undefined; + return otel.SpanKind[field]; +} + +// Semantic conventions for World/Storage tracing +// Standard OTEL conventions for peer service mapping +function SemanticConvention(...names: string[]) { + return (value: T) => + Object.fromEntries(names.map((name) => [name, value] as const)); +} + +/** The remote service name for Datadog service maps (Datadog-specific: peer.service) */ +export const PeerService = SemanticConvention('peer.service'); + +/** RPC system identifier (standard OTEL: rpc.system) */ +export const RpcSystem = SemanticConvention('rpc.system'); + +/** RPC service name (standard OTEL: rpc.service) */ +export const RpcService = SemanticConvention('rpc.service'); + +/** RPC method name (standard OTEL: rpc.method) */ +export const RpcMethod = SemanticConvention('rpc.method'); diff --git a/packages/world-vercel/src/instrumentObject.ts b/packages/world-vercel/src/instrumentObject.ts index a9813a1171..fbd895286f 100644 --- a/packages/world-vercel/src/instrumentObject.ts +++ b/packages/world-vercel/src/instrumentObject.ts @@ -2,11 +2,39 @@ * Utility to instrument object methods with tracing. * This is a minimal version for world-vercel to avoid circular dependencies with @workflow/core. 
*/ -import { trace } from './telemetry.js'; +import { + trace, + getSpanKind, + PeerService, + RpcSystem, + RpcService, + RpcMethod, +} from './telemetry.js'; + +/** Configuration for peer service attribution */ +const WORKFLOW_SERVER_SERVICE = { + peerService: 'workflow-server', + rpcSystem: 'http', + rpcService: 'workflow-server', +}; + +/** + * Extracts the event type from arguments for events.create calls. + * The event data is the second argument and contains eventType. + */ +function extractEventType(args: unknown[]): string | undefined { + if (args.length >= 2 && typeof args[1] === 'object' && args[1] !== null) { + const data = args[1] as Record; + if (typeof data.eventType === 'string') { + return data.eventType; + } + } + return undefined; +} /** * Wraps all methods of an object with tracing spans. - * @param prefix - Prefix for span names (e.g., "WORLD.runs") + * @param prefix - Prefix for span names (e.g., "world.runs") * @param o - Object with methods to instrument * @returns Instrumented object with same interface */ @@ -17,9 +45,33 @@ export function instrumentObject(prefix: string, o: T): T { handlers[key] = o[key]; } else { const f = o[key]; - // @ts-expect-error - handlers[key] = async (...args: any[]) => - trace(`${prefix}.${String(key)}`, {}, () => f(...args)); + const methodName = String(key); + // @ts-expect-error - dynamic function wrapping + handlers[key] = async (...args: unknown[]) => { + // Build span name - for events.create, include the event type + let spanName = `${prefix}.${methodName}`; + if (prefix === 'world.events' && methodName === 'create') { + const eventType = extractEventType(args); + if (eventType) { + spanName = `${prefix}.${methodName} ${eventType}`; + } + } + + return trace( + spanName, + { kind: await getSpanKind('CLIENT') }, + async (span) => { + // Add peer service attributes for service maps + span?.setAttributes({ + ...PeerService(WORKFLOW_SERVER_SERVICE.peerService), + 
...RpcSystem(WORKFLOW_SERVER_SERVICE.rpcSystem), + ...RpcService(WORKFLOW_SERVER_SERVICE.rpcService), + ...RpcMethod(`${prefix}.${methodName}`), + }); + return f(...args); + } + ); + }; } } return handlers; diff --git a/packages/world-vercel/src/telemetry.ts b/packages/world-vercel/src/telemetry.ts index c0f0164940..b80b4b34ea 100644 --- a/packages/world-vercel/src/telemetry.ts +++ b/packages/world-vercel/src/telemetry.ts @@ -120,3 +120,18 @@ export const ErrorType = SemanticConvention('error.type'); export const WorldParseFormat = SemanticConvention<'cbor' | 'json'>( 'workflow.world.parse.format' ); + +// RPC/Peer Service attributes - For service maps and dependency tracking +// See: https://opentelemetry.io/docs/specs/semconv/rpc/rpc-spans/ + +/** The remote service name for Datadog service maps (Datadog-specific: peer.service) */ +export const PeerService = SemanticConvention('peer.service'); + +/** RPC system identifier (standard OTEL: rpc.system) */ +export const RpcSystem = SemanticConvention('rpc.system'); + +/** RPC service name (standard OTEL: rpc.service) */ +export const RpcService = SemanticConvention('rpc.service'); + +/** RPC method name (standard OTEL: rpc.method) */ +export const RpcMethod = SemanticConvention('rpc.method'); diff --git a/packages/world-vercel/src/utils.ts b/packages/world-vercel/src/utils.ts index 61ec5bca16..b8dc85de71 100644 --- a/packages/world-vercel/src/utils.ts +++ b/packages/world-vercel/src/utils.ts @@ -15,6 +15,9 @@ import { ServerPort, ErrorType, WorldParseFormat, + PeerService, + RpcSystem, + RpcService, } from './telemetry.js'; import { version } from './version.js'; @@ -244,6 +247,10 @@ export async function makeRequest({ ...UrlFull(url), ...(serverAddress && ServerAddress(serverAddress)), ...(serverPort && ServerPort(serverPort)), + // Peer service for Datadog service maps + ...PeerService('workflow-server'), + ...RpcSystem('http'), + ...RpcService('workflow-server'), }); headers.set('Accept', 'application/cbor'); From 
f0a8a5d04f20a54b08cb0bff3eb02392ea4f65f0 Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Wed, 4 Feb 2026 23:29:21 -0800 Subject: [PATCH 12/16] Update changesets to reflect all PR changes - step-handler-parallelization.md: Add race condition fix and 409 status code fix - world-vercel-telemetry-tracer.md: Add peer.service and event type in span names - otel-tracing-improvements.md: New changeset for comprehensive OTEL improvements Co-Authored-By: Claude Opus 4.5 --- .changeset/otel-tracing-improvements.md | 17 +++++++++++++++++ .changeset/step-handler-parallelization.md | 7 +++++-- .changeset/world-vercel-telemetry-tracer.md | 6 ++++-- 3 files changed, 26 insertions(+), 4 deletions(-) create mode 100644 .changeset/otel-tracing-improvements.md diff --git a/.changeset/otel-tracing-improvements.md b/.changeset/otel-tracing-improvements.md new file mode 100644 index 0000000000..42c225db9b --- /dev/null +++ b/.changeset/otel-tracing-improvements.md @@ -0,0 +1,17 @@ +--- +"@workflow/core": patch +"@workflow/world-local": patch +--- + +Improve OpenTelemetry tracing instrumentation + +- Add W3C trace context headers to step queue messages for cross-service trace linking +- Add `peer.service` and RPC semantic conventions for external service attribution +- Add `step.hydrate` and `step.dehydrate` spans for argument serialization visibility +- Add `workflow.replay` span for workflow event replay tracking +- Rename `queueMessage` span to `queue.publish` following OTEL messaging conventions +- Add OTEL baggage propagation for workflow context (`workflow.run_id`, `workflow.name`) +- Add span events for milestones: `retry.scheduled`, `step.skipped`, `step.delayed` +- Enhance error telemetry with `recordException()` and error categorization (fatal/retryable/transient) +- Use uppercase span names (WORKFLOW, STEP) for consistency with HTTP spans +- Add world-local OTEL instrumentation matching world-vercel diff --git a/.changeset/step-handler-parallelization.md 
b/.changeset/step-handler-parallelization.md index 1d099a6e25..50266ffbb8 100644 --- a/.changeset/step-handler-parallelization.md +++ b/.changeset/step-handler-parallelization.md @@ -4,6 +4,9 @@ "@workflow/world-postgres": patch --- -Optimize step handler performance by removing initial world.steps.get() call +Optimize step handler performance and improve server-side validation -Add server-side retryAfter validation to local and postgres worlds to match workflow-server behavior. When a step has a retryAfter timestamp that hasn't been reached, step_started will now return HTTP 425 with the retryAfter timestamp in the response. +- Skip initial `world.steps.get()` call in step handler (saves one HTTP round-trip) +- Add server-side `retryAfter` validation to local and postgres worlds (HTTP 425 when not reached) +- Fix HTTP status code for step terminal state: return 409 (Conflict) instead of 410 +- Fix race condition: await `step_started` event before hydration to ensure correct attempt count diff --git a/.changeset/world-vercel-telemetry-tracer.md b/.changeset/world-vercel-telemetry-tracer.md index 1162e28a47..ef83b5df03 100644 --- a/.changeset/world-vercel-telemetry-tracer.md +++ b/.changeset/world-vercel-telemetry-tracer.md @@ -2,6 +2,8 @@ "@workflow/world-vercel": patch --- -Fix world-vercel telemetry to use parent application's tracer +Improve world-vercel OTEL telemetry -The world-vercel package was creating spans under a separate 'workflow-world-vercel' service name, causing HTTP spans for workflow-server API calls (step_started, step_completed) to be filtered out when viewing traces for the main application service. Now uses the same 'workflow' tracer name as @workflow/core to ensure all spans are reported under the parent application's service. 
+- Use parent application's 'workflow' tracer instead of separate service name +- Add `peer.service` and RPC semantic conventions for Datadog service maps +- Include event type in `world.events.create` span names (e.g., `world.events.create step_started`) From 9c9ce88e635d922fbecd765998b90ca85cd3fa36 Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Wed, 4 Feb 2026 23:32:42 -0800 Subject: [PATCH 13/16] Use span name for rpc.method to show event type in Datadog resource Datadog derives the resource name from rpc.method attribute. Updated to use the full span name (which includes event type) instead of just the method name, so Datadog shows "world.events.create step_started" instead of "world.events.create workflow-server". Co-Authored-By: Claude Opus 4.5 --- packages/world-local/src/instrumentObject.ts | 3 ++- packages/world-vercel/src/instrumentObject.ts | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/packages/world-local/src/instrumentObject.ts b/packages/world-local/src/instrumentObject.ts index ae1dff23af..94887b0090 100644 --- a/packages/world-local/src/instrumentObject.ts +++ b/packages/world-local/src/instrumentObject.ts @@ -62,11 +62,12 @@ export function instrumentObject(prefix: string, o: T): T { { kind: await getSpanKind('INTERNAL') }, async (span) => { // Add peer service attributes for service maps + // Use spanName for rpc.method so Datadog shows event type in resource span?.setAttributes({ ...PeerService(WORLD_LOCAL_SERVICE.peerService), ...RpcSystem(WORLD_LOCAL_SERVICE.rpcSystem), ...RpcService(WORLD_LOCAL_SERVICE.rpcService), - ...RpcMethod(`${prefix}.${methodName}`), + ...RpcMethod(spanName), }); return f(...args); } diff --git a/packages/world-vercel/src/instrumentObject.ts b/packages/world-vercel/src/instrumentObject.ts index fbd895286f..9a12a7478d 100644 --- a/packages/world-vercel/src/instrumentObject.ts +++ b/packages/world-vercel/src/instrumentObject.ts @@ -62,11 +62,12 @@ export function instrumentObject(prefix: 
string, o: T): T { { kind: await getSpanKind('CLIENT') }, async (span) => { // Add peer service attributes for service maps + // Use spanName for rpc.method so Datadog shows event type in resource span?.setAttributes({ ...PeerService(WORKFLOW_SERVER_SERVICE.peerService), ...RpcSystem(WORKFLOW_SERVER_SERVICE.rpcSystem), ...RpcService(WORKFLOW_SERVER_SERVICE.rpcService), - ...RpcMethod(`${prefix}.${methodName}`), + ...RpcMethod(spanName), }); return f(...args); } From db5bf68f5fe91adaac1ffb1714a20ed01cfb5bd2 Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Thu, 5 Feb 2026 17:14:13 -0800 Subject: [PATCH 14/16] Use lazy ref resolution for events where client discards response data Skip expensive S3 ref resolution (~200-460ms) for event types where the client doesn't use the response entity data (step_created, step_completed, step_failed, run_completed, etc). Only resolve refs for run_created, run_started, and step_started where the client reads the response. Co-Authored-By: Claude Opus 4.6 --- packages/world-vercel/src/events.ts | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/packages/world-vercel/src/events.ts b/packages/world-vercel/src/events.ts index 0700f50b48..a2439b01be 100644 --- a/packages/world-vercel/src/events.ts +++ b/packages/world-vercel/src/events.ts @@ -132,10 +132,23 @@ export async function createWorkflowRunEvent( // For run_created events, runId is null - use "null" string in the URL path const runIdPath = id === null ? 'null' : id; + // Determine remoteRefBehavior based on event type: + // Events where the client uses the response entity data need 'resolve' (default). + // Events where the client discards the response can use 'lazy' to skip expensive + // S3 ref resolution on the server, saving ~200-460ms per event. 
+ const eventsNeedingResolve = new Set([ + 'run_created', // client reads result.run.runId + 'run_started', // client reads result.run (checks startedAt, status) + 'step_started', // client reads result.step (checks attempt, state) + ]); + const remoteRefBehavior = eventsNeedingResolve.has(data.eventType) + ? 'resolve' + : 'lazy'; + const wireResult = await makeRequest({ endpoint: `/v2/runs/${runIdPath}/events`, options: { method: 'POST' }, - data, + data: { ...data, remoteRefBehavior }, config, schema: EventResultWireSchema, }); From e4cfaf8769f445c1f20653dc210c5aa58cf48c03 Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Thu, 5 Feb 2026 17:18:31 -0800 Subject: [PATCH 15/16] Update changeset to include lazy ref resolution optimization Co-Authored-By: Claude Opus 4.6 --- .changeset/world-vercel-telemetry-tracer.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.changeset/world-vercel-telemetry-tracer.md b/.changeset/world-vercel-telemetry-tracer.md index ef83b5df03..754fff3312 100644 --- a/.changeset/world-vercel-telemetry-tracer.md +++ b/.changeset/world-vercel-telemetry-tracer.md @@ -2,8 +2,9 @@ "@workflow/world-vercel": patch --- -Improve world-vercel OTEL telemetry +Improve world-vercel telemetry and event creation performance - Use parent application's 'workflow' tracer instead of separate service name - Add `peer.service` and RPC semantic conventions for Datadog service maps - Include event type in `world.events.create` span names (e.g., `world.events.create step_started`) +- Use lazy ref resolution for fire-and-forget events to skip S3 ref resolution (~200-460ms savings) From 0058bb4b7cae7f3c7f8fdf22385502f7eea69b0a Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Fri, 6 Feb 2026 11:39:54 -0800 Subject: [PATCH 16/16] Address PR review comments: module scope const, comment cleanup, status code audit MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Move eventsNeedingResolve Set to module 
scope (PR review #2) - Rewrite changelog-style OPTIMIZATION comments as current-state docs (PR review #4) - Fix run terminal state errors: 410 → 409 in world-local and world-postgres to match workflow-server's InvalidOperationStateError (409) (PR review #3) - Fix remaining step terminal state 410 → 409 in world-postgres fallback paths Co-Authored-By: Claude Opus 4.6 --- packages/core/src/runtime/step-handler.ts | 17 ++++++----------- .../world-local/src/storage/events-storage.ts | 4 ++-- packages/world-postgres/src/storage.ts | 8 ++++---- packages/world-vercel/src/events.ts | 18 +++++++++--------- 4 files changed, 21 insertions(+), 26 deletions(-) diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts index 3164bbc076..91c024c572 100644 --- a/packages/core/src/runtime/step-handler.ts +++ b/packages/core/src/runtime/step-handler.ts @@ -63,9 +63,7 @@ const stepHandler = getWorldHandlers().createQueueHandler( const stepName = metadata.queueName.slice('__wkf_step_'.length); const world = getWorld(); - // OPTIMIZATION: Run local async operations concurrently - // Note: We no longer call world.steps.get() here - we rely on step_started - // to return the step entity, saving one HTTP round-trip + // Resolve local async values concurrently before entering the trace span const [port, spanKind] = await Promise.all([ getPort(), getSpanKind('CONSUMER'), @@ -106,9 +104,8 @@ const stepHandler = getWorldHandlers().createQueueHandler( ...Attribute.StepTracePropagated(!!traceContext), }); - // OPTIMIZATION: Call step_started first - server validates state and returns step entity - // This eliminates the separate world.steps.get() HTTP call, saving 50-80ms per step. - // The server now validates: + // step_started validates state and returns the step entity, so no separate + // world.steps.get() call is needed. 
The server checks: // - Step not in terminal state (returns 409) // - retryAfter timestamp reached (returns 425 with Retry-After header) // - Workflow still active (returns 410 if completed) @@ -366,9 +363,8 @@ const stepHandler = getWorldHandlers().createQueueHandler( }) ); - // OPTIMIZATION 2: Parallelize step_completed with trace serialization - // Run step_completed event creation and serializeTraceCarrier() concurrently - // The trace carrier will be used in the final queueMessage call + // Run step_completed and trace serialization concurrently; + // the trace carrier is used in the final queueMessage call below const [, traceCarrier] = await Promise.all([ world.events.create(workflowRunId, { eventType: 'step_completed', @@ -386,8 +382,7 @@ const stepHandler = getWorldHandlers().createQueueHandler( ...Attribute.StepResultType(typeof result), }); - // Queue the workflow continuation immediately after step_completed - // Using pre-computed traceCarrier from parallel operation + // Queue the workflow continuation with the concurrently-resolved trace carrier await queueMessage(world, `__wkf_workflow_${workflowName}`, { runId: workflowRunId, traceCarrier, diff --git a/packages/world-local/src/storage/events-storage.ts b/packages/world-local/src/storage/events-storage.ts index c8156cf919..03f7aca119 100644 --- a/packages/world-local/src/storage/events-storage.ts +++ b/packages/world-local/src/storage/events-storage.ts @@ -149,7 +149,7 @@ export function createEventsStorage(basedir: string): Storage['events'] { ) { throw new WorkflowAPIError( `Cannot transition run from terminal state "${currentRun.status}"`, - { status: 410 } + { status: 409 } ); } @@ -160,7 +160,7 @@ export function createEventsStorage(basedir: string): Storage['events'] { ) { throw new WorkflowAPIError( `Cannot create new entities on run in terminal state "${currentRun.status}"`, - { status: 410 } + { status: 409 } ); } } diff --git a/packages/world-postgres/src/storage.ts 
b/packages/world-postgres/src/storage.ts index f6e086151f..b7b0240217 100644 --- a/packages/world-postgres/src/storage.ts +++ b/packages/world-postgres/src/storage.ts @@ -412,7 +412,7 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] { ) { throw new WorkflowAPIError( `Cannot transition run from terminal state "${currentRun.status}"`, - { status: 410 } + { status: 409 } ); } @@ -423,7 +423,7 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] { ) { throw new WorkflowAPIError( `Cannot create new entities on run in terminal state "${currentRun.status}"`, - { status: 410 } + { status: 409 } ); } } @@ -727,7 +727,7 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] { if (['completed', 'failed'].includes(existing.status)) { throw new WorkflowAPIError( `Cannot modify step in terminal state "${existing.status}"`, - { status: 410 } + { status: 409 } ); } } @@ -781,7 +781,7 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] { if (['completed', 'failed'].includes(existing.status)) { throw new WorkflowAPIError( `Cannot modify step in terminal state "${existing.status}"`, - { status: 410 } + { status: 409 } ); } } diff --git a/packages/world-vercel/src/events.ts b/packages/world-vercel/src/events.ts index a2439b01be..9c4eddce14 100644 --- a/packages/world-vercel/src/events.ts +++ b/packages/world-vercel/src/events.ts @@ -50,6 +50,15 @@ const EventWithRefsSchema = z.object({ specVersion: z.number().default(1), }); +// Events where the client uses the response entity data need 'resolve' (default). +// Events where the client discards the response can use 'lazy' to skip expensive +// S3 ref resolution on the server, saving ~200-460ms per event. 
+const eventsNeedingResolve = new Set([ + 'run_created', // client reads result.run.runId + 'run_started', // client reads result.run (checks startedAt, status) + 'step_started', // client reads result.step (checks attempt, state) +]); + // Functions export async function getWorkflowRunEvents( params: ListEventsParams | ListEventsByCorrelationIdParams, @@ -132,15 +141,6 @@ export async function createWorkflowRunEvent( // For run_created events, runId is null - use "null" string in the URL path const runIdPath = id === null ? 'null' : id; - // Determine remoteRefBehavior based on event type: - // Events where the client uses the response entity data need 'resolve' (default). - // Events where the client discards the response can use 'lazy' to skip expensive - // S3 ref resolution on the server, saving ~200-460ms per event. - const eventsNeedingResolve = new Set([ - 'run_created', // client reads result.run.runId - 'run_started', // client reads result.run (checks startedAt, status) - 'step_started', // client reads result.step (checks attempt, state) - ]); const remoteRefBehavior = eventsNeedingResolve.has(data.eventType) ? 'resolve' : 'lazy';