diff --git a/.changeset/otel-tracing-improvements.md b/.changeset/otel-tracing-improvements.md new file mode 100644 index 0000000000..42c225db9b --- /dev/null +++ b/.changeset/otel-tracing-improvements.md @@ -0,0 +1,17 @@ +--- +"@workflow/core": patch +"@workflow/world-local": patch +--- + +Improve OpenTelemetry tracing instrumentation + +- Add W3C trace context headers to step queue messages for cross-service trace linking +- Add `peer.service` and RPC semantic conventions for external service attribution +- Add `step.hydrate` and `step.dehydrate` spans for argument serialization visibility +- Add `workflow.replay` span for workflow event replay tracking +- Rename `queueMessage` span to `queue.publish` following OTEL messaging conventions +- Add OTEL baggage propagation for workflow context (`workflow.run_id`, `workflow.name`) +- Add span events for milestones: `retry.scheduled`, `step.skipped`, `step.delayed` +- Enhance error telemetry with `recordException()` and error categorization (fatal/retryable/transient) +- Use uppercase span names (WORKFLOW, STEP) for consistency with HTTP spans +- Add world-local OTEL instrumentation matching world-vercel diff --git a/.changeset/step-handler-parallelization.md b/.changeset/step-handler-parallelization.md new file mode 100644 index 0000000000..50266ffbb8 --- /dev/null +++ b/.changeset/step-handler-parallelization.md @@ -0,0 +1,12 @@ +--- +"@workflow/core": patch +"@workflow/world-local": patch +"@workflow/world-postgres": patch +--- + +Optimize step handler performance and improve server-side validation + +- Skip initial `world.steps.get()` call in step handler (saves one HTTP round-trip) +- Add server-side `retryAfter` validation to local and postgres worlds (HTTP 425 when not reached) +- Fix HTTP status code for step terminal state: return 409 (Conflict) instead of 410 +- Fix race condition: await `step_started` event before hydration to ensure correct attempt count diff --git 
a/.changeset/world-vercel-telemetry-tracer.md b/.changeset/world-vercel-telemetry-tracer.md new file mode 100644 index 0000000000..754fff3312 --- /dev/null +++ b/.changeset/world-vercel-telemetry-tracer.md @@ -0,0 +1,10 @@ +--- +"@workflow/world-vercel": patch +--- + +Improve world-vercel telemetry and event creation performance + +- Use parent application's 'workflow' tracer instead of separate service name +- Add `peer.service` and RPC semantic conventions for Datadog service maps +- Include event type in `world.events.create` span names (e.g., `world.events.create step_started`) +- Use lazy ref resolution for fire-and-forget events to skip S3 ref resolution (~200-460ms savings) diff --git a/.claude/commands/demo.md b/.claude/commands/demo.md index 487dd22bc1..b6525bf0a8 100644 --- a/.claude/commands/demo.md +++ b/.claude/commands/demo.md @@ -1,9 +1,51 @@ --- description: Run the 7_full demo workflow -allowed-tools: Bash(curl:*), Bash(npx workflow:*), Bash(pnpm dev) +allowed-tools: Bash(curl:*), Bash(npx workflow:*), Bash(pnpm dev), Bash(docker *), Bash(open *) --- +Run the demo workflow with OpenTelemetry tracing enabled. -Start the $ARUGMENTS workbench (default to the nextjs turboback workbench available in the workbenches directory). Run it in dev mode, and also start the workflow web UI (run `npx workflow web` inside the appropriate workbench directory). +## Steps -Then trigger the 7_full.ts workflow example. you can see how to trigger a specific example by looking at the trigger API route for the workbench - it is probably just a POST request using bash (maybe curl) to this endpoint: > +1. **Start Jaeger** for OTEL trace visualization (if not already running): + ```bash + docker run -d --name jaeger-otel \ + -p 16686:16686 \ + -p 4317:4317 \ + -p 4318:4318 \ + jaegertracing/jaeger:2.4.0 2>&1 || docker start jaeger-otel + ``` + +2. **Open the Jaeger UI** in the browser: + ```bash + open http://localhost:16686 + ``` + +3. 
**Start the workbench** (default: nextjs-turbopack) with OTEL tracing enabled: + ```bash + cd workbench/nextjs-turbopack + OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:4318" \ + OTEL_EXPORTER_OTLP_PROTOCOL="http/protobuf" \ + pnpm dev + ``` + + Also start the workflow web UI in a separate terminal: + ```bash + npx workflow web + ``` + +4. **Trigger the 7_full.ts workflow**: + ```bash + curl -X POST "http://localhost:3000/api/trigger?workflowFile=workflows/7_full.ts&workflowFn=handleUserSignup" + ``` + +5. **View traces** in Jaeger UI at http://localhost:16686 - select service `example-nextjs-workflow` + +## Tracing Details + +The traces include: +- Step execution spans with `workflow.queue.overhead_ms`, `step.attempt`, `step.status` +- Workflow orchestration spans with `workflow.run.status`, `workflow.events.count` +- Queue message spans with messaging attributes + +$ARGUMENTS can specify a different workbench (e.g., "example" or "nextjs-webpack"). diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 6e229a4971..1d28b5d8a9 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -19,7 +19,12 @@ import { handleSuspension } from './runtime/suspension-handler.js'; import { getWorld, getWorldHandlers } from './runtime/world.js'; import { remapErrorStack } from './source-map.js'; import * as Attribute from './telemetry/semantic-conventions.js'; -import { linkToCurrentContext, trace, withTraceContext } from './telemetry.js'; +import { + linkToCurrentContext, + trace, + withTraceContext, + withWorkflowBaggage, +} from './telemetry.js'; import { getErrorName, getErrorStack } from './types.js'; import { buildWorkflowSuspensionMessage } from './util.js'; import { runWorkflow } from './workflow.js'; @@ -84,210 +89,234 @@ export function workflowEntrypoint( const workflowName = metadata.queueName.slice('__wkf_workflow_'.length); const spanLinks = await linkToCurrentContext(); - // Invoke user workflow within the propagated 
trace context + // Invoke user workflow within the propagated trace context and baggage return await withTraceContext(traceContext, async () => { - const world = getWorld(); - return trace( - `WORKFLOW ${workflowName}`, - { links: spanLinks }, - async (span) => { - span?.setAttributes({ - ...Attribute.WorkflowName(workflowName), - ...Attribute.WorkflowOperation('execute'), - // Standard OTEL messaging conventions - ...Attribute.MessagingSystem('vercel-queue'), - ...Attribute.MessagingDestinationName(metadata.queueName), - ...Attribute.MessagingMessageId(metadata.messageId), - ...Attribute.MessagingOperationType('process'), - ...getQueueOverhead({ requestedAt }), - }); - - // TODO: validate `workflowName` exists before consuming message? - - span?.setAttributes({ - ...Attribute.WorkflowRunId(runId), - ...Attribute.WorkflowTracePropagated(!!traceContext), - }); + // Set workflow context as baggage for automatic propagation + return await withWorkflowBaggage( + { workflowRunId: runId, workflowName }, + async () => { + const world = getWorld(); + return trace( + `WORKFLOW ${workflowName}`, + { links: spanLinks }, + async (span) => { + span?.setAttributes({ + ...Attribute.WorkflowName(workflowName), + ...Attribute.WorkflowOperation('execute'), + // Standard OTEL messaging conventions + ...Attribute.MessagingSystem('vercel-queue'), + ...Attribute.MessagingDestinationName(metadata.queueName), + ...Attribute.MessagingMessageId(metadata.messageId), + ...Attribute.MessagingOperationType('process'), + ...getQueueOverhead({ requestedAt }), + }); - let workflowStartedAt = -1; - try { - let workflowRun = await world.runs.get(runId); + // TODO: validate `workflowName` exists before consuming message? 
- if (workflowRun.status === 'pending') { - // Transition run to 'running' via event (event-sourced architecture) - const result = await world.events.create(runId, { - eventType: 'run_started', - specVersion: SPEC_VERSION_CURRENT, + span?.setAttributes({ + ...Attribute.WorkflowRunId(runId), + ...Attribute.WorkflowTracePropagated(!!traceContext), }); - // Use the run entity from the event response (no extra get call needed) - if (!result.run) { - throw new WorkflowRuntimeError( - `Event creation for 'run_started' did not return the run entity for run "${runId}"` - ); - } - workflowRun = result.run; - } - // At this point, the workflow is "running" and `startedAt` should - // definitely be set. - if (!workflowRun.startedAt) { - throw new WorkflowRuntimeError( - `Workflow run "${runId}" has no "startedAt" timestamp` - ); - } - workflowStartedAt = +workflowRun.startedAt; + let workflowStartedAt = -1; + try { + let workflowRun = await world.runs.get(runId); - span?.setAttributes({ - ...Attribute.WorkflowRunStatus(workflowRun.status), - ...Attribute.WorkflowStartedAt(workflowStartedAt), - }); + if (workflowRun.status === 'pending') { + // Transition run to 'running' via event (event-sourced architecture) + const result = await world.events.create(runId, { + eventType: 'run_started', + specVersion: SPEC_VERSION_CURRENT, + }); + // Use the run entity from the event response (no extra get call needed) + if (!result.run) { + throw new WorkflowRuntimeError( + `Event creation for 'run_started' did not return the run entity for run "${runId}"` + ); + } + workflowRun = result.run; + } - if (workflowRun.status !== 'running') { - // Workflow has already completed or failed, so we can skip it - runtimeLogger.info( - 'Workflow already completed or failed, skipping', - { - workflowRunId: runId, - status: workflowRun.status, + // At this point, the workflow is "running" and `startedAt` should + // definitely be set. 
+ if (!workflowRun.startedAt) { + throw new WorkflowRuntimeError( + `Workflow run "${runId}" has no "startedAt" timestamp` + ); } - ); + workflowStartedAt = +workflowRun.startedAt; - // TODO: for `cancel`, we actually want to propagate a WorkflowCancelled event - // inside the workflow context so the user can gracefully exit. this is SIGTERM - // TODO: furthermore, there should be a timeout or a way to force cancel SIGKILL - // so that we actually exit here without replaying the workflow at all, in the case - // the replaying the workflow is itself failing. + span?.setAttributes({ + ...Attribute.WorkflowRunStatus(workflowRun.status), + ...Attribute.WorkflowStartedAt(workflowStartedAt), + }); - return; - } + if (workflowRun.status !== 'running') { + // Workflow has already completed or failed, so we can skip it + runtimeLogger.info( + 'Workflow already completed or failed, skipping', + { + workflowRunId: runId, + status: workflowRun.status, + } + ); + + // TODO: for `cancel`, we actually want to propagate a WorkflowCancelled event + // inside the workflow context so the user can gracefully exit. this is SIGTERM + // TODO: furthermore, there should be a timeout or a way to force cancel SIGKILL + // so that we actually exit here without replaying the workflow at all, in the case + // the replaying the workflow is itself failing. 
- // Load all events into memory before running - const events = await getAllWorkflowRunEvents(workflowRun.runId); + return; + } - // Check for any elapsed waits and create wait_completed events - const now = Date.now(); + // Load all events into memory before running + const events = await getAllWorkflowRunEvents( + workflowRun.runId + ); - // Pre-compute completed correlation IDs for O(n) lookup instead of O(n²) - const completedWaitIds = new Set( - events - .filter((e) => e.eventType === 'wait_completed') - .map((e) => e.correlationId) - ); + // Check for any elapsed waits and create wait_completed events + const now = Date.now(); - // Collect all waits that need completion - const waitsToComplete = events - .filter( - (e): e is typeof e & { correlationId: string } => - e.eventType === 'wait_created' && - e.correlationId !== undefined && - !completedWaitIds.has(e.correlationId) && - now >= (e.eventData.resumeAt as Date).getTime() - ) - .map((e) => ({ - eventType: 'wait_completed' as const, - specVersion: SPEC_VERSION_CURRENT, - correlationId: e.correlationId, - })); + // Pre-compute completed correlation IDs for O(n) lookup instead of O(n²) + const completedWaitIds = new Set( + events + .filter((e) => e.eventType === 'wait_completed') + .map((e) => e.correlationId) + ); - // Create all wait_completed events - for (const waitEvent of waitsToComplete) { - const result = await world.events.create(runId, waitEvent); - // Add the event to the events array so the workflow can see it - events.push(result.event!); - } + // Collect all waits that need completion + const waitsToComplete = events + .filter( + (e): e is typeof e & { correlationId: string } => + e.eventType === 'wait_created' && + e.correlationId !== undefined && + !completedWaitIds.has(e.correlationId) && + now >= (e.eventData.resumeAt as Date).getTime() + ) + .map((e) => ({ + eventType: 'wait_completed' as const, + specVersion: SPEC_VERSION_CURRENT, + correlationId: e.correlationId, + })); - const result = 
await runWorkflow( - workflowCode, - workflowRun, - events - ); + // Create all wait_completed events + for (const waitEvent of waitsToComplete) { + const result = await world.events.create(runId, waitEvent); + // Add the event to the events array so the workflow can see it + events.push(result.event!); + } - // Complete the workflow run via event (event-sourced architecture) - await world.events.create(runId, { - eventType: 'run_completed', - specVersion: SPEC_VERSION_CURRENT, - eventData: { - output: result, - }, - }); + const result = await trace( + 'workflow.replay', + {}, + async (replaySpan) => { + replaySpan?.setAttributes({ + ...Attribute.WorkflowEventsCount(events.length), + }); + return await runWorkflow( + workflowCode, + workflowRun, + events + ); + } + ); - span?.setAttributes({ - ...Attribute.WorkflowRunStatus('completed'), - ...Attribute.WorkflowEventsCount(events.length), - }); - } catch (err) { - if (WorkflowSuspension.is(err)) { - const suspensionMessage = buildWorkflowSuspensionMessage( - runId, - err.stepCount, - err.hookCount, - err.waitCount - ); - if (suspensionMessage) { - runtimeLogger.debug(suspensionMessage); - } + // Complete the workflow run via event (event-sourced architecture) + await world.events.create(runId, { + eventType: 'run_completed', + specVersion: SPEC_VERSION_CURRENT, + eventData: { + output: result, + }, + }); - const result = await handleSuspension({ - suspension: err, - world, - runId, - workflowName, - workflowStartedAt, - span, - }); + span?.setAttributes({ + ...Attribute.WorkflowRunStatus('completed'), + ...Attribute.WorkflowEventsCount(events.length), + }); + } catch (err) { + if (WorkflowSuspension.is(err)) { + const suspensionMessage = buildWorkflowSuspensionMessage( + runId, + err.stepCount, + err.hookCount, + err.waitCount + ); + if (suspensionMessage) { + runtimeLogger.debug(suspensionMessage); + } - if (result.timeoutSeconds !== undefined) { - return { timeoutSeconds: result.timeoutSeconds }; - } - } else { - 
// NOTE: this error could be an error thrown in user code, or could also be a WorkflowRuntimeError - // (for instance when the event log is corrupted, this is thrown by the event consumer). We could - // specially handle these if needed. + const result = await handleSuspension({ + suspension: err, + world, + runId, + workflowName, + workflowStartedAt, + span, + }); - const errorName = getErrorName(err); - const errorMessage = - err instanceof Error ? err.message : String(err); - let errorStack = getErrorStack(err); + if (result.timeoutSeconds !== undefined) { + return { timeoutSeconds: result.timeoutSeconds }; + } + } else { + // NOTE: this error could be an error thrown in user code, or could also be a WorkflowRuntimeError + // (for instance when the event log is corrupted, this is thrown by the event consumer). We could + // specially handle these if needed. - // Remap error stack using source maps to show original source locations - if (errorStack) { - const parsedName = parseWorkflowName(workflowName); - const filename = parsedName?.moduleSpecifier || workflowName; - errorStack = remapErrorStack( - errorStack, - filename, - workflowCode - ); - } + // Record exception for OTEL error tracking + if (err instanceof Error) { + span?.recordException?.(err); + } - runtimeLogger.error('Error while running workflow', { - workflowRunId: runId, - errorName, - errorStack, - }); - // Fail the workflow run via event (event-sourced architecture) - await world.events.create(runId, { - eventType: 'run_failed', - specVersion: SPEC_VERSION_CURRENT, - eventData: { - error: { - message: errorMessage, - stack: errorStack, - }, - // TODO: include error codes when we define them - }, - }); + const errorName = getErrorName(err); + const errorMessage = + err instanceof Error ? 
err.message : String(err); + let errorStack = getErrorStack(err); - span?.setAttributes({ - ...Attribute.WorkflowRunStatus('failed'), - ...Attribute.WorkflowErrorName(errorName), - ...Attribute.WorkflowErrorMessage(String(err)), - }); + // Remap error stack using source maps to show original source locations + if (errorStack) { + const parsedName = parseWorkflowName(workflowName); + const filename = + parsedName?.moduleSpecifier || workflowName; + errorStack = remapErrorStack( + errorStack, + filename, + workflowCode + ); + } + + runtimeLogger.error('Error while running workflow', { + workflowRunId: runId, + errorName, + errorStack, + }); + // Fail the workflow run via event (event-sourced architecture) + await world.events.create(runId, { + eventType: 'run_failed', + specVersion: SPEC_VERSION_CURRENT, + eventData: { + error: { + message: errorMessage, + stack: errorStack, + }, + // TODO: include error codes when we define them + }, + }); + + span?.setAttributes({ + ...Attribute.WorkflowRunStatus('failed'), + ...Attribute.WorkflowErrorName(errorName), + ...Attribute.WorkflowErrorMessage(String(err)), + ...Attribute.ErrorType(errorName), + }); + } + } } - } + ); // End trace } - ); // End withTraceContext - }); + ); // End withWorkflowBaggage + }); // End withTraceContext } ); diff --git a/packages/core/src/runtime/helpers.ts b/packages/core/src/runtime/helpers.ts index 0c1e361330..2493efe3e6 100644 --- a/packages/core/src/runtime/helpers.ts +++ b/packages/core/src/runtime/helpers.ts @@ -339,13 +339,18 @@ export async function queueMessage( ) { const queueName = args[0]; await trace( - 'queueMessage', + 'queue.publish', { // Standard OTEL messaging conventions attributes: { ...Attribute.MessagingSystem('vercel-queue'), ...Attribute.MessagingDestinationName(queueName), ...Attribute.MessagingOperationType('publish'), + // Peer service for Datadog service maps + ...Attribute.PeerService('vercel-queue'), + ...Attribute.RpcSystem('vercel-queue'), + 
...Attribute.RpcService('vqs'), + ...Attribute.RpcMethod('publish'), }, kind: await getSpanKind('PRODUCER'), }, diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts index 38a6652b8a..91c024c572 100644 --- a/packages/core/src/runtime/step-handler.ts +++ b/packages/core/src/runtime/step-handler.ts @@ -63,12 +63,15 @@ const stepHandler = getWorldHandlers().createQueueHandler( const stepName = metadata.queueName.slice('__wkf_step_'.length); const world = getWorld(); - // Get the port early to avoid async operations during step execution - const port = await getPort(); + // Resolve local async values concurrently before entering the trace span + const [port, spanKind] = await Promise.all([ + getPort(), + getSpanKind('CONSUMER'), + ]); return trace( - `step ${stepName}`, - { kind: await getSpanKind('CONSUMER'), links: spanLinks }, + `STEP ${stepName}`, + { kind: spanKind, links: spanLinks }, async (span) => { span?.setAttributes({ ...Attribute.StepName(stepName), @@ -101,7 +104,102 @@ const stepHandler = getWorldHandlers().createQueueHandler( ...Attribute.StepTracePropagated(!!traceContext), }); - let step = await world.steps.get(workflowRunId, stepId); + // step_started validates state and returns the step entity, so no separate + // world.steps.get() call is needed. 
The server checks: + // - Step not in terminal state (returns 409) + // - retryAfter timestamp reached (returns 425 with Retry-After header) + // - Workflow still active (returns 410 if completed) + let step; + try { + const startResult = await world.events.create(workflowRunId, { + eventType: 'step_started', + specVersion: SPEC_VERSION_CURRENT, + correlationId: stepId, + }); + + if (!startResult.step) { + throw new WorkflowRuntimeError( + `step_started event for "${stepId}" did not return step entity` + ); + } + step = startResult.step; + } catch (err) { + if (WorkflowAPIError.is(err)) { + // 410 Gone: Workflow has already completed + if (err.status === 410) { + console.warn( + `Workflow run "${workflowRunId}" has already completed, skipping step "${stepId}": ${err.message}` + ); + return; + } + + // 409 Conflict: Step in terminal state (completed/failed/cancelled) + // Re-enqueue the workflow to continue processing + if (err.status === 409) { + runtimeLogger.debug( + 'Step in terminal state, re-enqueuing workflow', + { + stepName, + stepId, + workflowRunId, + error: err.message, + } + ); + span?.setAttributes({ + ...Attribute.StepSkipped(true), + // Use 'completed' as a representative terminal state for the skip reason + ...Attribute.StepSkipReason('completed'), + }); + // Add span event for step skip + span?.addEvent?.('step.skipped', { + 'skip.reason': 'terminal_state', + 'step.name': stepName, + 'step.id': stepId, + }); + await queueMessage(world, `__wkf_workflow_${workflowName}`, { + runId: workflowRunId, + traceCarrier: await serializeTraceCarrier(), + requestedAt: new Date(), + }); + return; + } + + // 425 Too Early: retryAfter timestamp not reached yet + // Return timeout to queue so it retries later + if (err.status === 425) { + // Parse retryAfter from error response meta + const retryAfterStr = (err as any).meta?.retryAfter; + const retryAfter = retryAfterStr + ? 
new Date(retryAfterStr) + : new Date(Date.now() + 1000); + const timeoutSeconds = Math.max( + 1, + Math.ceil((retryAfter.getTime() - Date.now()) / 1000) + ); + span?.setAttributes({ + ...Attribute.StepRetryTimeoutSeconds(timeoutSeconds), + }); + // Add span event for delayed retry + span?.addEvent?.('step.delayed', { + 'delay.reason': 'retry_after_not_reached', + 'delay.timeout_seconds': timeoutSeconds, + 'delay.retry_after': retryAfter.toISOString(), + }); + runtimeLogger.debug( + 'Step retryAfter timestamp not yet reached', + { + stepName, + stepId, + retryAfter, + timeoutSeconds, + } + ); + return { timeoutSeconds }; + } + } + // Re-throw other errors + throw err; + } runtimeLogger.debug('Step execution details', { stepName, @@ -114,32 +212,10 @@ const stepHandler = getWorldHandlers().createQueueHandler( ...Attribute.StepStatus(step.status), }); - // Check if the step has a `retryAfter` timestamp that hasn't been reached yet - const now = Date.now(); - if (step.retryAfter && step.retryAfter.getTime() > now) { - const timeoutSeconds = Math.ceil( - (step.retryAfter.getTime() - now) / 1000 - ); - span?.setAttributes({ - ...Attribute.StepRetryTimeoutSeconds(timeoutSeconds), - }); - runtimeLogger.debug('Step retryAfter timestamp not yet reached', { - stepName, - stepId: step.stepId, - retryAfter: step.retryAfter, - timeoutSeconds, - }); - return { timeoutSeconds }; - } - let result: unknown; - // Check max retries FIRST before any state changes. + // Check max retries AFTER step_started (attempt was just incremented) // step.attempt tracks how many times step_started has been called. - // If step.attempt >= maxRetries, we've already tried maxRetries times. - // This handles edge cases where the step handler is invoked after max retries have been exceeded - // (e.g., when the step repeatedly times out or fails before reaching the catch handler). - // Without this check, the step would retry forever. 
// Note: maxRetries is the number of RETRIES after the first attempt, so total attempts = maxRetries + 1 // Use > here (not >=) because this guards against re-invocation AFTER all attempts are used. // The post-failure check uses >= to decide whether to retry after a failure. @@ -184,68 +260,8 @@ const stepHandler = getWorldHandlers().createQueueHandler( } try { - if (!['pending', 'running'].includes(step.status)) { - // We should only be running the step if it's either - // a) pending - initial state, or state set on re-try - // b) running - if a step fails mid-execution, like a function timeout - // otherwise, the step has been invoked erroneously - stepLogger.warn('Step invoked erroneously, skipping execution', { - workflowRunId, - stepName, - expectedStatus: ['pending', 'running'], - actualStatus: step.status, - }); - span?.setAttributes({ - ...Attribute.StepSkipped(true), - ...Attribute.StepSkipReason(step.status), - }); - // There's a chance that a step terminates correctly, but the underlying process - // fails or gets killed before the stepEntrypoint has a chance to re-enqueue the run. - // The queue lease expires and stepEntrypoint again, which leads us here, so - // we optimistically re-enqueue the workflow if the step is in a terminal state, - // under the assumption that this edge case happened. - // Until we move to atomic entity/event updates (World V2), there _could_ be an edge case - // where the we execute this code based on the `step` entity status, but the runtime - // failed to create the `step_completed` event (due to failing between step and event update), - // in which case, this might lead to an infinite loop. 
- // https://vercel.slack.com/archives/C09125LC4AX/p1765313809066679 - const isTerminalStep = [ - 'completed', - 'failed', - 'cancelled', - ].includes(step.status); - if (isTerminalStep) { - await queueMessage( - world, - `__wkf_workflow_${workflowName}`, - { - runId: workflowRunId, - traceCarrier: await serializeTraceCarrier(), - requestedAt: new Date(), - }, - { - headers: { 'x-workflow-run-id': workflowRunId }, - } - ); - } - return; - } - - // Start the step via event (event-sourced architecture) - // step_started increments the attempt counter in the World implementation - const startResult = await world.events.create(workflowRunId, { - eventType: 'step_started', - specVersion: SPEC_VERSION_CURRENT, - correlationId: stepId, - }); - - // Use the step entity from the event response (no extra get call needed) - if (!startResult.step) { - throw new WorkflowRuntimeError( - `step_started event for "${stepId}" did not return step entity` - ); - } - step = startResult.step; + // step_started already validated the step is in valid state (pending/running) + // and returned the updated step entity with incremented attempt // step.attempt is now the current attempt number (after increment) const attempt = step.attempt; @@ -255,51 +271,61 @@ const stepHandler = getWorldHandlers().createQueueHandler( `Step "${stepId}" has no "startedAt" timestamp` ); } + // Capture startedAt for use in async callback (TypeScript narrowing doesn't persist) + const stepStartedAt = step.startedAt; + // Hydrate the step input arguments, closure variables, and thisVal - // Track deserialization time for observability // NOTE: This captures only the synchronous portion of hydration. Any async // operations (e.g., stream loading) are added to `ops` and executed later // via Promise.all(ops) - their timing is not included in this measurement. 
- const deserializeStartTime = Date.now(); const ops: Promise[] = []; - const hydratedInput = hydrateStepArguments( - step.input, - ops, - workflowRunId + const hydratedInput = await trace( + 'step.hydrate', + {}, + async (hydrateSpan) => { + const startTime = Date.now(); + const result = hydrateStepArguments( + step.input, + ops, + workflowRunId + ); + const durationMs = Date.now() - startTime; + hydrateSpan?.setAttributes({ + ...Attribute.StepArgumentsCount(result.args.length), + ...Attribute.QueueDeserializeTimeMs(durationMs), + }); + return result; + } ); - const deserializeTimeMs = Date.now() - deserializeStartTime; const args = hydratedInput.args; const thisVal = hydratedInput.thisVal ?? null; - span?.setAttributes({ - ...Attribute.StepArgumentsCount(args.length), - ...Attribute.QueueDeserializeTimeMs(deserializeTimeMs), - }); - - // Track execution time for observability + // Execute the step function with tracing const executionStartTime = Date.now(); - result = await contextStorage.run( - { - stepMetadata: { - stepId, - stepStartedAt: new Date(+step.startedAt), - attempt, - }, - workflowMetadata: { - workflowRunId, - workflowStartedAt: new Date(+workflowStartedAt), - // TODO: there should be a getUrl method on the world interface itself. This - // solution only works for vercel + local worlds. - url: process.env.VERCEL_URL - ? `https://${process.env.VERCEL_URL}` - : `http://localhost:${port ?? 3000}`, + result = await trace('step.execute', {}, async () => { + return await contextStorage.run( + { + stepMetadata: { + stepId, + stepStartedAt: new Date(+stepStartedAt), + attempt, + }, + workflowMetadata: { + workflowRunId, + workflowStartedAt: new Date(+workflowStartedAt), + // TODO: there should be a getUrl method on the world interface itself. This + // solution only works for vercel + local worlds. + url: process.env.VERCEL_URL + ? `https://${process.env.VERCEL_URL}` + : `http://localhost:${port ?? 
3000}`, + }, + ops, + closureVars: hydratedInput.closureVars, }, - ops, - closureVars: hydratedInput.closureVars, - }, - () => stepFn.apply(thisVal, args) - ); + () => stepFn.apply(thisVal, args) + ); + }); const executionTimeMs = Date.now() - executionStartTime; span?.setAttributes({ @@ -309,14 +335,24 @@ const stepHandler = getWorldHandlers().createQueueHandler( // NOTE: None of the code from this point is guaranteed to run // Since the step might fail or cause a function timeout and the process might be SIGKILL'd // The workflow runtime must be resilient to the below code not executing on a failed step - // Track serialization time for observability - const serializeStartTime = Date.now(); - result = dehydrateStepReturnValue(result, ops, workflowRunId); - const serializeTimeMs = Date.now() - serializeStartTime; - - span?.setAttributes({ - ...Attribute.QueueSerializeTimeMs(serializeTimeMs), - }); + result = await trace( + 'step.dehydrate', + {}, + async (dehydrateSpan) => { + const startTime = Date.now(); + const dehydrated = dehydrateStepReturnValue( + result, + ops, + workflowRunId + ); + const durationMs = Date.now() - startTime; + dehydrateSpan?.setAttributes({ + ...Attribute.QueueSerializeTimeMs(durationMs), + ...Attribute.StepResultType(typeof dehydrated), + }); + return dehydrated; + } + ); waitUntil( Promise.all(ops).catch((err) => { @@ -327,26 +363,53 @@ const stepHandler = getWorldHandlers().createQueueHandler( }) ); - // Complete the step via event (event-sourced architecture) - // The event creation atomically updates the step entity - // result was dehydrated above by dehydrateStepReturnValue, which returns Uint8Array - await world.events.create(workflowRunId, { - eventType: 'step_completed', - specVersion: SPEC_VERSION_CURRENT, - correlationId: stepId, - eventData: { - result: result as Uint8Array, - }, - }); + // Run step_completed and trace serialization concurrently; + // the trace carrier is used in the final queueMessage call below + const [, 
traceCarrier] = await Promise.all([ + world.events.create(workflowRunId, { + eventType: 'step_completed', + specVersion: SPEC_VERSION_CURRENT, + correlationId: stepId, + eventData: { + result: result as Uint8Array, + }, + }), + serializeTraceCarrier(), + ]); span?.setAttributes({ ...Attribute.StepStatus('completed'), ...Attribute.StepResultType(typeof result), }); + + // Queue the workflow continuation with the concurrently-resolved trace carrier + await queueMessage(world, `__wkf_workflow_${workflowName}`, { + runId: workflowRunId, + traceCarrier, + requestedAt: new Date(), + }); + return; } catch (err: unknown) { + // Record exception for OTEL error tracking + if (err instanceof Error) { + span?.recordException?.(err); + } + + // Determine error category and retryability + const isFatal = FatalError.is(err); + const isRetryable = RetryableError.is(err); + const errorCategory = isFatal + ? 'fatal' + : isRetryable + ? 'retryable' + : 'transient'; + span?.setAttributes({ ...Attribute.StepErrorName(getErrorName(err)), ...Attribute.StepErrorMessage(String(err)), + ...Attribute.ErrorType(getErrorName(err)), + ...Attribute.ErrorCategory(errorCategory), + ...Attribute.ErrorRetryable(!isFatal), }); if (WorkflowAPIError.is(err)) { @@ -364,7 +427,7 @@ const stepHandler = getWorldHandlers().createQueueHandler( } } - if (FatalError.is(err)) { + if (isFatal) { const errorStack = getErrorStack(err); stepLogger.error( 'Encountered FatalError while executing step, bubbling up to parent workflow', @@ -479,6 +542,13 @@ const stepHandler = getWorldHandlers().createQueueHandler( ...Attribute.StepRetryWillRetry(true), }); + // Add span event for retry scheduling + span?.addEvent?.('retry.scheduled', { + 'retry.timeout_seconds': timeoutSeconds, + 'retry.attempt': currentAttempt, + 'retry.max_retries': maxRetries, + }); + // It's a retryable error - so have the queue keep the message visible // so that it gets retried. 
return { timeoutSeconds }; diff --git a/packages/core/src/runtime/suspension-handler.ts b/packages/core/src/runtime/suspension-handler.ts index da83f02a08..5f9f4b7852 100644 --- a/packages/core/src/runtime/suspension-handler.ts +++ b/packages/core/src/runtime/suspension-handler.ts @@ -19,6 +19,23 @@ import * as Attribute from '../telemetry/semantic-conventions.js'; import { serializeTraceCarrier } from '../telemetry.js'; import { queueMessage } from './helpers.js'; +/** + * Extracts W3C trace context headers from a trace carrier for HTTP propagation. + * Returns an object with `traceparent` and optionally `tracestate` headers. + */ +function extractTraceHeaders( + traceCarrier: Record<string, string> +): Record<string, string> { + const headers: Record<string, string> = {}; + if (traceCarrier.traceparent) { + headers.traceparent = traceCarrier.traceparent; + } + if (traceCarrier.tracestate) { + headers.tracestate = traceCarrier.tracestate; + } + return headers; +} + export interface SuspensionHandlerParams { suspension: WorkflowSuspension; world: World; @@ -169,6 +186,10 @@ export async function handleSuspension({ } // Queue step execution message + // Serialize trace context once and include in both payload and headers + // Payload: for manual context restoration in step handler + // Headers: for automatic trace propagation by Vercel's infrastructure + const traceCarrier = await serializeTraceCarrier(); await queueMessage( world, `__wkf_step_${queueItem.stepName}`, @@ -177,12 +198,15 @@ workflowRunId: runId, workflowStartedAt, stepId: queueItem.correlationId, - traceCarrier: await serializeTraceCarrier(), + traceCarrier, requestedAt: new Date(), }, { idempotencyKey: queueItem.correlationId, - headers: { 'x-workflow-run-id': runId }, + headers: { + 'x-workflow-run-id': runId, + ...extractTraceHeaders(traceCarrier), + }, } ); })() diff --git a/packages/core/src/telemetry.ts index 12ed60033f..1b26294b5e 100644 ---
a/packages/core/src/telemetry.ts +++ b/packages/core/src/telemetry.ts @@ -188,3 +188,67 @@ export function linkToCurrentContext(): Promise<[api.Link] | undefined> { return [{ context }]; }); } + +// ============================================================ +// Baggage Propagation Utilities +// ============================================================ + +/** + * Workflow context to propagate via baggage + */ +export interface WorkflowBaggageContext { + workflowRunId: string; + workflowName: string; +} + +/** + * Sets workflow context as OTEL baggage for automatic propagation. + * Baggage is propagated across service boundaries via HTTP headers. + * @param context - Workflow context to set as baggage + * @param fn - Function to run within the baggage context + * @returns The result of awaiting `fn` + */ +export async function withWorkflowBaggage<T>( + context: WorkflowBaggageContext, + fn: () => Promise<T> +): Promise<T> { + const otel = await OtelApi.value; + if (!otel) return fn(); + + // Create baggage with workflow context + const baggage = otel.propagation.createBaggage({ + 'workflow.run_id': { value: context.workflowRunId }, + 'workflow.name': { value: context.workflowName }, + }); + + // Set baggage in context and run function + const contextWithBaggage = otel.propagation.setBaggage( + otel.context.active(), + baggage + ); + + return otel.context.with(contextWithBaggage, () => fn()); +} + +/** + * Retrieves workflow context from OTEL baggage.
+ * @returns Workflow context if present in baggage, undefined otherwise + */ +export async function getWorkflowBaggage(): Promise< + WorkflowBaggageContext | undefined +> { + const otel = await OtelApi.value; + if (!otel) return undefined; + + const baggage = otel.propagation.getBaggage(otel.context.active()); + if (!baggage) return undefined; + + const runId = baggage.getEntry('workflow.run_id')?.value; + const name = baggage.getEntry('workflow.name')?.value; + + if (!runId || !name) return undefined; + + return { + workflowRunId: runId, + workflowName: name, + }; +} diff --git a/packages/core/src/telemetry/semantic-conventions.ts b/packages/core/src/telemetry/semantic-conventions.ts index b9ca1017e5..afe86b1ecb 100644 --- a/packages/core/src/telemetry/semantic-conventions.ts +++ b/packages/core/src/telemetry/semantic-conventions.ts @@ -298,3 +298,29 @@ export const QueueExecutionTimeMs = SemanticConvention( export const QueueSerializeTimeMs = SemanticConvention( 'workflow.queue.serialize_time_ms' ); + +// RPC/Peer Service attributes - For service maps and dependency tracking +// See: https://opentelemetry.io/docs/specs/semconv/rpc/rpc-spans/ + +/** The remote service name for Datadog service maps (Datadog-specific: peer.service) */ +export const PeerService = SemanticConvention('peer.service'); + +/** RPC system identifier (standard OTEL: rpc.system) */ +export const RpcSystem = SemanticConvention('rpc.system'); + +/** RPC service name (standard OTEL: rpc.service) */ +export const RpcService = SemanticConvention('rpc.service'); + +/** RPC method name (standard OTEL: rpc.method) */ +export const RpcMethod = SemanticConvention('rpc.method'); + +// Error attributes - Standard OTEL error conventions +// See: https://opentelemetry.io/docs/specs/semconv/exceptions/exceptions-spans/ + +/** Whether the error is retryable (workflow-specific) */ +export const ErrorRetryable = SemanticConvention('error.retryable'); + +/** Error category (workflow-specific: fatal, 
retryable, transient) */ +export const ErrorCategory = SemanticConvention< + 'fatal' | 'retryable' | 'transient' +>('error.category'); diff --git a/packages/world-local/src/instrumentObject.ts b/packages/world-local/src/instrumentObject.ts new file mode 100644 index 0000000000..94887b0090 --- /dev/null +++ b/packages/world-local/src/instrumentObject.ts @@ -0,0 +1,79 @@ +/** + * Utility to instrument object methods with tracing. + * This mirrors world-vercel's implementation for consistent observability. + */ +import { + trace, + getSpanKind, + PeerService, + RpcSystem, + RpcService, + RpcMethod, +} from './telemetry.js'; + +/** Configuration for peer service attribution */ +const WORLD_LOCAL_SERVICE = { + peerService: 'world-local', + rpcSystem: 'local', + rpcService: 'world-local', +}; + +/** + * Extracts the event type from arguments for events.create calls. + * The event data is the second argument and contains eventType. + */ +function extractEventType(args: unknown[]): string | undefined { + if (args.length >= 2 && typeof args[1] === 'object' && args[1] !== null) { + const data = args[1] as Record; + if (typeof data.eventType === 'string') { + return data.eventType; + } + } + return undefined; +} + +/** + * Wraps all methods of an object with tracing spans. 
+ * @param prefix - Prefix for span names (e.g., "world.runs") + * @param o - Object with methods to instrument + * @returns Instrumented object with same interface + */ +export function instrumentObject<T extends object>(prefix: string, o: T): T { + const handlers = {} as T; + for (const key of Object.keys(o) as (keyof T)[]) { + if (typeof o[key] !== 'function') { + handlers[key] = o[key]; + } else { + const f = o[key]; + const methodName = String(key); + // @ts-expect-error - dynamic function wrapping + handlers[key] = async (...args: unknown[]) => { + // Build span name - for events.create, include the event type + let spanName = `${prefix}.${methodName}`; + if (prefix === 'world.events' && methodName === 'create') { + const eventType = extractEventType(args); + if (eventType) { + spanName = `${prefix}.${methodName} ${eventType}`; + } + } + + return trace( + spanName, + { kind: await getSpanKind('INTERNAL') }, + async (span) => { + // Add peer service attributes for service maps + // Use spanName for rpc.method so Datadog shows event type in resource + span?.setAttributes({ + ...PeerService(WORLD_LOCAL_SERVICE.peerService), + ...RpcSystem(WORLD_LOCAL_SERVICE.rpcSystem), + ...RpcService(WORLD_LOCAL_SERVICE.rpcService), + ...RpcMethod(spanName), + }); + return f(...args); + } + ); + }; + } + } + return handlers; +} diff --git a/packages/world-local/src/storage/events-storage.ts b/packages/world-local/src/storage/events-storage.ts index 7765708638..03f7aca119 100644 --- a/packages/world-local/src/storage/events-storage.ts +++ b/packages/world-local/src/storage/events-storage.ts @@ -149,7 +149,7 @@ export function createEventsStorage(basedir: string): Storage['events'] { ) { throw new WorkflowAPIError( `Cannot transition run from terminal state "${currentRun.status}"`, - { status: 410 } + { status: 409 } ); } @@ -160,7 +160,7 @@ export function createEventsStorage(basedir: string): Storage['events'] { ) { throw new WorkflowAPIError( `Cannot create new entities on run in terminal
state "${currentRun.status}"`, - { status: 410 } + { status: 409 } ); } } @@ -194,7 +194,7 @@ export function createEventsStorage(basedir: string): Storage['events'] { if (isStepTerminal(validatedStep.status)) { throw new WorkflowAPIError( `Cannot modify step in terminal state "${validatedStep.status}"`, - { status: 410 } + { status: 409 } ); } @@ -409,6 +409,23 @@ export function createEventsStorage(basedir: string): Storage['events'] { // Sets startedAt only on the first start (not updated on retries) // Reuse validatedStep from validation (already read above) if (validatedStep) { + // Check if retryAfter timestamp hasn't been reached yet + if ( + validatedStep.retryAfter && + validatedStep.retryAfter.getTime() > Date.now() + ) { + const err = new WorkflowAPIError( + `Cannot start step "${data.correlationId}": retryAfter timestamp has not been reached yet`, + { status: 425 } + ); + // Add meta for step-handler to extract retryAfter timestamp + (err as any).meta = { + stepId: data.correlationId, + retryAfter: validatedStep.retryAfter.toISOString(), + }; + throw err; + } + const stepCompositeKey = `${effectiveRunId}-${data.correlationId}`; const stepPath = path.join( basedir, @@ -422,6 +439,8 @@ export function createEventsStorage(basedir: string): Storage['events'] { startedAt: validatedStep.startedAt ?? 
now, // Increment attempt counter on every start attempt: validatedStep.attempt + 1, + // Clear retryAfter now that the step has started + retryAfter: undefined, updatedAt: now, }; await writeJSON(stepPath, step, { overwrite: true }); diff --git a/packages/world-local/src/storage/index.ts b/packages/world-local/src/storage/index.ts index 937bb4356a..0c7e106e6f 100644 --- a/packages/world-local/src/storage/index.ts +++ b/packages/world-local/src/storage/index.ts @@ -1,4 +1,5 @@ import type { Storage } from '@workflow/world'; +import { instrumentObject } from '../instrumentObject.js'; import { createEventsStorage } from './events-storage.js'; import { createHooksStorage } from './hooks-storage.js'; import { createRunsStorage } from './runs-storage.js'; @@ -8,14 +9,26 @@ import { createStepsStorage } from './steps-storage.js'; * Creates a complete storage implementation using the filesystem. * This is the main entry point that composes all storage implementations. * + * All storage methods are instrumented with tracing spans for observability. 
+ * * @param basedir - The base directory for storing workflow data - * @returns A complete Storage implementation + * @returns A complete Storage implementation with tracing */ export function createStorage(basedir: string): Storage { - return { + // Create raw storage implementations + const storage: Storage = { runs: createRunsStorage(basedir), steps: createStepsStorage(basedir), events: createEventsStorage(basedir), hooks: createHooksStorage(basedir), }; + + // Instrument all storage methods with tracing + // NOTE: Span names are lowercase per OTEL semantic conventions + return { + runs: instrumentObject('world.runs', storage.runs), + steps: instrumentObject('world.steps', storage.steps), + events: instrumentObject('world.events', storage.events), + hooks: instrumentObject('world.hooks', storage.hooks), + }; } diff --git a/packages/world-local/src/telemetry.ts b/packages/world-local/src/telemetry.ts new file mode 100644 index 0000000000..8a1fc6310a --- /dev/null +++ b/packages/world-local/src/telemetry.ts @@ -0,0 +1,102 @@ +/** + * Minimal telemetry utilities for world-local package. + * + * NOTE: This module is a simplified version of world-vercel's telemetry. + * It provides tracing capabilities for local development to match + * the observability experience in production. + * + * IMPORTANT: This module uses the same tracer name 'workflow' as @workflow/core to ensure + * all spans are reported under the parent application's service, not as a separate service. 
+ */ +import type * as api from '@opentelemetry/api'; +import type { Span, SpanKind, SpanOptions } from '@opentelemetry/api'; + +// Lazy load OpenTelemetry API to make it optional +let otelApiPromise: Promise<typeof api | null> | null = null; + +async function getOtelApi(): Promise<typeof api | null> { + if (!otelApiPromise) { + otelApiPromise = import('@opentelemetry/api').catch(() => null); + } + return otelApiPromise; +} + +let tracerPromise: Promise<api.Tracer | null> | null = null; + +async function getTracer(): Promise<api.Tracer | null> { + if (!tracerPromise) { + tracerPromise = getOtelApi().then((otel) => + otel ? otel.trace.getTracer('workflow') : null + ); + } + return tracerPromise; +} + +/** + * Wrap an async function with a trace span. + * No-op if OpenTelemetry is not available. + */ +export async function trace<T>( + spanName: string, + ...args: + | [fn: (span?: Span) => Promise<T>] + | [opts: SpanOptions, fn: (span?: Span) => Promise<T>] +): Promise<T> { + const [tracer, otel] = await Promise.all([getTracer(), getOtelApi()]); + const { fn, opts } = + typeof args[0] === 'function' + ? { fn: args[0], opts: {} } + : { fn: args[1], opts: args[0] }; + if (!fn) throw new Error('Function to trace must be provided'); + + if (!tracer || !otel) { + return await fn(); + } + + return tracer.startActiveSpan(spanName, opts, async (span) => { + try { + const result = await fn(span); + span.setStatus({ code: otel.SpanStatusCode.OK }); + return result; + } catch (e) { + span.setStatus({ + code: otel.SpanStatusCode.ERROR, + message: (e as Error).message, + }); + throw e; + } finally { + span.end(); + } + }); +} + +/** + * Get SpanKind enum value by name. + * Returns undefined if OpenTelemetry is not available.
+ */ +export async function getSpanKind( + field: keyof typeof SpanKind +): Promise { + const otel = await getOtelApi(); + if (!otel) return undefined; + return otel.SpanKind[field]; +} + +// Semantic conventions for World/Storage tracing +// Standard OTEL conventions for peer service mapping +function SemanticConvention(...names: string[]) { + return (value: T) => + Object.fromEntries(names.map((name) => [name, value] as const)); +} + +/** The remote service name for Datadog service maps (Datadog-specific: peer.service) */ +export const PeerService = SemanticConvention('peer.service'); + +/** RPC system identifier (standard OTEL: rpc.system) */ +export const RpcSystem = SemanticConvention('rpc.system'); + +/** RPC service name (standard OTEL: rpc.service) */ +export const RpcService = SemanticConvention('rpc.service'); + +/** RPC method name (standard OTEL: rpc.method) */ +export const RpcMethod = SemanticConvention('rpc.method'); diff --git a/packages/world-postgres/src/storage.ts b/packages/world-postgres/src/storage.ts index a944cb9d62..b7b0240217 100644 --- a/packages/world-postgres/src/storage.ts +++ b/packages/world-postgres/src/storage.ts @@ -264,6 +264,7 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] { .select({ status: Schema.steps.status, startedAt: Schema.steps.startedAt, + retryAfter: Schema.steps.retryAfter, }) .from(Schema.steps) .where( @@ -411,7 +412,7 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] { ) { throw new WorkflowAPIError( `Cannot transition run from terminal state "${currentRun.status}"`, - { status: 410 } + { status: 409 } ); } @@ -422,7 +423,7 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] { ) { throw new WorkflowAPIError( `Cannot create new entities on run in terminal state "${currentRun.status}"`, - { status: 410 } + { status: 409 } ); } } @@ -430,8 +431,11 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] { // Step-related 
event validation (ordering and terminal state) // Fetch status + startedAt so we can reuse for step_started (avoid double read) // Skip validation for step_completed/step_failed - use conditional UPDATE instead - let validatedStep: { status: string; startedAt: Date | null } | null = - null; + let validatedStep: { + status: string; + startedAt: Date | null; + retryAfter: Date | null; + } | null = null; const stepEventsNeedingValidation = ['step_started', 'step_retrying']; if ( stepEventsNeedingValidation.includes(data.eventType) && @@ -456,7 +460,7 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] { if (isStepTerminal(validatedStep.status)) { throw new WorkflowAPIError( `Cannot modify step in terminal state "${validatedStep.status}"`, - { status: 410 } + { status: 409 } ); } @@ -643,7 +647,25 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] { // Sets startedAt (maps to startedAt) only on first start // Reuse validatedStep from validation (already read above) if (data.eventType === 'step_started') { + // Check if retryAfter timestamp hasn't been reached yet + if ( + validatedStep?.retryAfter && + validatedStep.retryAfter.getTime() > Date.now() + ) { + const err = new WorkflowAPIError( + `Cannot start step "${data.correlationId}": retryAfter timestamp has not been reached yet`, + { status: 425 } + ); + // Add meta for step-handler to extract retryAfter timestamp + (err as any).meta = { + stepId: data.correlationId, + retryAfter: validatedStep.retryAfter.toISOString(), + }; + throw err; + } + const isFirstStart = !validatedStep?.startedAt; + const hadRetryAfter = !!validatedStep?.retryAfter; const [stepValue] = await drizzle .update(Schema.steps) @@ -653,6 +675,8 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] { attempt: sql`${Schema.steps.attempt} + 1`, // Only set startedAt on first start (not updated on retries) ...(isFirstStart ? 
{ startedAt: now } : {}), + // Clear retryAfter now that the step has started + ...(hadRetryAfter ? { retryAfter: null } : {}), }) .where( and( @@ -703,7 +727,7 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] { if (['completed', 'failed'].includes(existing.status)) { throw new WorkflowAPIError( `Cannot modify step in terminal state "${existing.status}"`, - { status: 410 } + { status: 409 } ); } } @@ -757,7 +781,7 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] { if (['completed', 'failed'].includes(existing.status)) { throw new WorkflowAPIError( `Cannot modify step in terminal state "${existing.status}"`, - { status: 410 } + { status: 409 } ); } } diff --git a/packages/world-vercel/src/events.ts b/packages/world-vercel/src/events.ts index 0700f50b48..9c4eddce14 100644 --- a/packages/world-vercel/src/events.ts +++ b/packages/world-vercel/src/events.ts @@ -50,6 +50,15 @@ const EventWithRefsSchema = z.object({ specVersion: z.number().default(1), }); +// Events where the client uses the response entity data need 'resolve' (default). +// Events where the client discards the response can use 'lazy' to skip expensive +// S3 ref resolution on the server, saving ~200-460ms per event. +const eventsNeedingResolve = new Set([ + 'run_created', // client reads result.run.runId + 'run_started', // client reads result.run (checks startedAt, status) + 'step_started', // client reads result.step (checks attempt, state) +]); + // Functions export async function getWorkflowRunEvents( params: ListEventsParams | ListEventsByCorrelationIdParams, @@ -132,10 +141,14 @@ export async function createWorkflowRunEvent( // For run_created events, runId is null - use "null" string in the URL path const runIdPath = id === null ? 'null' : id; + const remoteRefBehavior = eventsNeedingResolve.has(data.eventType) + ? 
'resolve' + : 'lazy'; + const wireResult = await makeRequest({ endpoint: `/v2/runs/${runIdPath}/events`, options: { method: 'POST' }, - data, + data: { ...data, remoteRefBehavior }, config, schema: EventResultWireSchema, }); diff --git a/packages/world-vercel/src/instrumentObject.ts b/packages/world-vercel/src/instrumentObject.ts index a9813a1171..9a12a7478d 100644 --- a/packages/world-vercel/src/instrumentObject.ts +++ b/packages/world-vercel/src/instrumentObject.ts @@ -2,11 +2,39 @@ * Utility to instrument object methods with tracing. * This is a minimal version for world-vercel to avoid circular dependencies with @workflow/core. */ -import { trace } from './telemetry.js'; +import { + trace, + getSpanKind, + PeerService, + RpcSystem, + RpcService, + RpcMethod, +} from './telemetry.js'; + +/** Configuration for peer service attribution */ +const WORKFLOW_SERVER_SERVICE = { + peerService: 'workflow-server', + rpcSystem: 'http', + rpcService: 'workflow-server', +}; + +/** + * Extracts the event type from arguments for events.create calls. + * The event data is the second argument and contains eventType. + */ +function extractEventType(args: unknown[]): string | undefined { + if (args.length >= 2 && typeof args[1] === 'object' && args[1] !== null) { + const data = args[1] as Record; + if (typeof data.eventType === 'string') { + return data.eventType; + } + } + return undefined; +} /** * Wraps all methods of an object with tracing spans. 
- * @param prefix - Prefix for span names (e.g., "WORLD.runs") + * @param prefix - Prefix for span names (e.g., "world.runs") * @param o - Object with methods to instrument * @returns Instrumented object with same interface */ @@ -17,9 +45,34 @@ export function instrumentObject(prefix: string, o: T): T { handlers[key] = o[key]; } else { const f = o[key]; - // @ts-expect-error - handlers[key] = async (...args: any[]) => - trace(`${prefix}.${String(key)}`, {}, () => f(...args)); + const methodName = String(key); + // @ts-expect-error - dynamic function wrapping + handlers[key] = async (...args: unknown[]) => { + // Build span name - for events.create, include the event type + let spanName = `${prefix}.${methodName}`; + if (prefix === 'world.events' && methodName === 'create') { + const eventType = extractEventType(args); + if (eventType) { + spanName = `${prefix}.${methodName} ${eventType}`; + } + } + + return trace( + spanName, + { kind: await getSpanKind('CLIENT') }, + async (span) => { + // Add peer service attributes for service maps + // Use spanName for rpc.method so Datadog shows event type in resource + span?.setAttributes({ + ...PeerService(WORKFLOW_SERVER_SERVICE.peerService), + ...RpcSystem(WORKFLOW_SERVER_SERVICE.rpcSystem), + ...RpcService(WORKFLOW_SERVER_SERVICE.rpcService), + ...RpcMethod(spanName), + }); + return f(...args); + } + ); + }; } } return handlers; diff --git a/packages/world-vercel/src/telemetry.ts b/packages/world-vercel/src/telemetry.ts index a9c7333722..b80b4b34ea 100644 --- a/packages/world-vercel/src/telemetry.ts +++ b/packages/world-vercel/src/telemetry.ts @@ -9,6 +9,9 @@ * NOTE: Unlike the trace() function in @workflow/core, this implementation does not * have special handling for WorkflowSuspension errors because world-vercel operates * at the HTTP layer and never encounters workflow suspension effects. 
+ * + * IMPORTANT: This module uses the same tracer name 'workflow' as @workflow/core to ensure + * all spans are reported under the parent application's service, not as a separate service. */ import type * as api from '@opentelemetry/api'; import type { Span, SpanKind, SpanOptions } from '@opentelemetry/api'; @@ -28,7 +31,7 @@ let tracerPromise: Promise | null = null; async function getTracer(): Promise { if (!tracerPromise) { tracerPromise = getOtelApi().then((otel) => - otel ? otel.trace.getTracer('workflow-world-vercel') : null + otel ? otel.trace.getTracer('workflow') : null ); } return tracerPromise; @@ -117,3 +120,18 @@ export const ErrorType = SemanticConvention('error.type'); export const WorldParseFormat = SemanticConvention<'cbor' | 'json'>( 'workflow.world.parse.format' ); + +// RPC/Peer Service attributes - For service maps and dependency tracking +// See: https://opentelemetry.io/docs/specs/semconv/rpc/rpc-spans/ + +/** The remote service name for Datadog service maps (Datadog-specific: peer.service) */ +export const PeerService = SemanticConvention('peer.service'); + +/** RPC system identifier (standard OTEL: rpc.system) */ +export const RpcSystem = SemanticConvention('rpc.system'); + +/** RPC service name (standard OTEL: rpc.service) */ +export const RpcService = SemanticConvention('rpc.service'); + +/** RPC method name (standard OTEL: rpc.method) */ +export const RpcMethod = SemanticConvention('rpc.method'); diff --git a/packages/world-vercel/src/utils.ts b/packages/world-vercel/src/utils.ts index 61ec5bca16..b8dc85de71 100644 --- a/packages/world-vercel/src/utils.ts +++ b/packages/world-vercel/src/utils.ts @@ -15,6 +15,9 @@ import { ServerPort, ErrorType, WorldParseFormat, + PeerService, + RpcSystem, + RpcService, } from './telemetry.js'; import { version } from './version.js'; @@ -244,6 +247,10 @@ export async function makeRequest({ ...UrlFull(url), ...(serverAddress && ServerAddress(serverAddress)), ...(serverPort && ServerPort(serverPort)), + 
// Peer service for Datadog service maps + ...PeerService('workflow-server'), + ...RpcSystem('http'), + ...RpcService('workflow-server'), }); headers.set('Accept', 'application/cbor');