From 8ab0bd65b60a579ae260f743c47322e80b2b01ee Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Wed, 4 Feb 2026 09:41:23 -0800 Subject: [PATCH 1/5] Add improved OpenTelemetry instrumentation for workflow observability - Add tracing to HTTP requests in world-vercel with method, endpoint, and status attributes - Add tracing to storage operations (runs, steps, events, hooks) - Add tracing to event loading with event count and pages loaded attributes - Add queue timing breakdown attributes (deserialize, execution, serialize times) - Add new semantic conventions for world/storage, events, serialization, and queue breakdown Co-Authored-By: Claude Opus 4.5 --- .changeset/improve-otel-tracing.md | 12 ++ packages/core/src/runtime/helpers.ts | 53 +++--- packages/core/src/runtime/step-handler.ts | 18 ++ packages/core/src/telemetry.ts | 6 + .../src/telemetry/semantic-conventions.ts | 60 +++++++ packages/world-vercel/package.json | 9 + packages/world-vercel/src/instrumentObject.ts | 26 +++ packages/world-vercel/src/storage.ts | 11 +- packages/world-vercel/src/telemetry.ts | 102 ++++++++++++ packages/world-vercel/src/utils.ts | 155 +++++++++++------- pnpm-lock.yaml | 3 + 11 files changed, 375 insertions(+), 80 deletions(-) create mode 100644 .changeset/improve-otel-tracing.md create mode 100644 packages/world-vercel/src/instrumentObject.ts create mode 100644 packages/world-vercel/src/telemetry.ts diff --git a/.changeset/improve-otel-tracing.md b/.changeset/improve-otel-tracing.md new file mode 100644 index 0000000000..17f8edf63e --- /dev/null +++ b/.changeset/improve-otel-tracing.md @@ -0,0 +1,12 @@ +--- +"@workflow/core": patch +"@workflow/world-vercel": patch +--- + +Add improved OpenTelemetry instrumentation for better workflow observability: + +- Add tracing to HTTP requests in world-vercel with method, endpoint, and status attributes +- Add tracing to storage operations (runs, steps, events, hooks) +- Add tracing to event loading with event count and pages loaded attributes +- Add queue timing breakdown attributes (deserialize, execution, serialize times) to step handler +- Add new semantic conventions for world/storage, events, serialization, and queue breakdown diff --git a/packages/core/src/runtime/helpers.ts b/packages/core/src/runtime/helpers.ts index ff505a7a06..4b3dc1f89a 100644 --- a/packages/core/src/runtime/helpers.ts +++ b/packages/core/src/runtime/helpers.ts @@ -249,29 +249,42 @@ export async function healthCheck( * Events must be in chronological order (ascending) for proper workflow replay. */ export async function getAllWorkflowRunEvents(runId: string): Promise { - const allEvents: Event[] = []; - let cursor: string | null = null; - let hasMore = true; - - const world = getWorld(); - while (hasMore) { - // TODO: we're currently loading all the data with resolveRef behaviour. We need to update this - // to lazyload the data from the world instead so that we can optimize and make the event log loading - // much faster and memory efficient - const response = await world.events.list({ - runId, - pagination: { - sortOrder: 'asc', // Required: events must be in chronological order for replay - cursor: cursor ?? undefined, - }, + return trace('WORKFLOW.loadEvents', async (span) => { + span?.setAttributes({ + ...Attribute.WorkflowRunId(runId), }); - allEvents.push(...response.data); - hasMore = response.hasMore; - cursor = response.cursor; - } + const allEvents: Event[] = []; + let cursor: string | null = null; + let hasMore = true; + let pagesLoaded = 0; + + const world = getWorld(); + while (hasMore) { + // TODO: we're currently loading all the data with resolveRef behaviour. We need to update this + // to lazyload the data from the world instead so that we can optimize and make the event log loading + // much faster and memory efficient + const response = await world.events.list({ + runId, + pagination: { + sortOrder: 'asc', // Required: events must be in chronological order for replay + cursor: cursor ?? undefined, + }, + }); + + allEvents.push(...response.data); + hasMore = response.hasMore; + cursor = response.cursor; + pagesLoaded++; + } - return allEvents; + span?.setAttributes({ + ...Attribute.WorkflowEventsCount(allEvents.length), + ...Attribute.WorkflowEventsPagesLoaded(pagesLoaded), + }); + + return allEvents; + }); } /** diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts index 1864d0e0b4..d9dd8a72f9 100644 --- a/packages/core/src/runtime/step-handler.ts +++ b/packages/core/src/runtime/step-handler.ts @@ -232,20 +232,26 @@ const stepHandler = getWorldHandlers().createQueueHandler( ); } // Hydrate the step input arguments, closure variables, and thisVal + // Track deserialization time for observability + const deserializeStartTime = Date.now(); const ops: Promise[] = []; const hydratedInput = hydrateStepArguments( step.input, ops, workflowRunId ); + const deserializeTimeMs = Date.now() - deserializeStartTime; const args = hydratedInput.args; const thisVal = hydratedInput.thisVal ?? null; span?.setAttributes({ ...Attribute.StepArgumentsCount(args.length), + ...Attribute.QueueDeserializeTimeMs(deserializeTimeMs), }); + // Track execution time for observability + const executionStartTime = Date.now(); result = await contextStorage.run( { stepMetadata: { @@ -267,11 +273,23 @@ const stepHandler = getWorldHandlers().createQueueHandler( }, () => stepFn.apply(thisVal, args) ); + const executionTimeMs = Date.now() - executionStartTime; + + span?.setAttributes({ + ...Attribute.QueueExecutionTimeMs(executionTimeMs), + }); // NOTE: None of the code from this point is guaranteed to run // Since the step might fail or cause a function timeout and the process might be SIGKILL'd // The workflow runtime must be resilient to the below code not executing on a failed step + // Track serialization time for observability + const serializeStartTime = Date.now(); result = dehydrateStepReturnValue(result, ops, workflowRunId); + const serializeTimeMs = Date.now() - serializeStartTime; + + span?.setAttributes({ + ...Attribute.QueueSerializeTimeMs(serializeTimeMs), + }); waitUntil( Promise.all(ops).catch((err) => { diff --git a/packages/core/src/telemetry.ts b/packages/core/src/telemetry.ts index aef6dfeb60..17b3d6b86e 100644 --- a/packages/core/src/telemetry.ts +++ b/packages/core/src/telemetry.ts @@ -147,6 +147,12 @@ export async function getActiveSpan() { return await withOtel((otel) => otel.trace.getActiveSpan()); } +/** + * Wraps all methods of an object with tracing spans. + * @param prefix - Prefix for span names (e.g., "WORLD.runs") + * @param o - Object with methods to instrument + * @returns Instrumented object with same interface + */ export function instrumentObject(prefix: string, o: T): T { const handlers = {} as T; for (const key of Object.keys(o) as (keyof T)[]) { diff --git a/packages/core/src/telemetry/semantic-conventions.ts b/packages/core/src/telemetry/semantic-conventions.ts index c996ced0ea..77bd26c20f 100644 --- a/packages/core/src/telemetry/semantic-conventions.ts +++ b/packages/core/src/telemetry/semantic-conventions.ts @@ -230,3 +230,63 @@ export const WorkflowSuspensionStepCount = SemanticConvention( export const WorkflowSuspensionWaitCount = SemanticConvention( 'workflow.suspension.wait_count' ); + +// World/Storage attributes + +/** HTTP method used in World storage request */ +export const WorldHttpMethod = SemanticConvention('world.http.method'); + +/** API endpoint path for World storage request */ +export const WorldHttpEndpoint = SemanticConvention( + 'world.http.endpoint' +); + +/** HTTP status code from World storage request */ +export const WorldHttpStatus = SemanticConvention('world.http.status'); + +/** Format used for parsing response body (cbor or json) */ +export const WorldParseFormat = SemanticConvention<'cbor' | 'json'>( + 'world.parse.format' +); + +/** Size in bytes of the parsed response body */ +export const WorldParseBytes = SemanticConvention('world.parse.bytes'); + +// Event loading attributes + +/** Number of pagination pages loaded when fetching workflow events */ +export const WorkflowEventsPagesLoaded = SemanticConvention( + 'workflow.events.pages_loaded' +); + +// Serialization attributes + +/** Format used for serialization (e.g., 'devalue') */ +export const SerializeFormat = SemanticConvention('serialize.format'); + +/** Size in bytes of the serialized data */ +export const SerializeBytes = SemanticConvention('serialize.bytes'); + +/** Format used for deserialization (e.g., 'devalue') */ +export const DeserializeFormat = + SemanticConvention('deserialize.format'); + +/** Size in bytes of the data being deserialized */ +export const DeserializeBytes = SemanticConvention('deserialize.bytes'); + +// Queue timing breakdown attributes + +/** Time spent deserializing the queue message in milliseconds */ +export const QueueDeserializeTimeMs = SemanticConvention( + 'queue.deserialize_time_ms' +); + +/** Time spent executing the handler logic in milliseconds */ +export const QueueExecutionTimeMs = SemanticConvention( + 'queue.execution_time_ms' +); + +/** Time spent serializing the response in milliseconds */ +export const QueueSerializeTimeMs = SemanticConvention( + 'queue.serialize_time_ms' +); diff --git a/packages/world-vercel/package.json b/packages/world-vercel/package.json index 7630be2752..3598bf7b9e 100644 --- a/packages/world-vercel/package.json +++ b/packages/world-vercel/package.json @@ -37,7 +37,16 @@ "cbor-x": "1.6.0", "zod": "catalog:" }, + "peerDependencies": { + "@opentelemetry/api": "1" + }, + "peerDependenciesMeta": { + "@opentelemetry/api": { + "optional": true + } + }, "devDependencies": { + "@opentelemetry/api": "1.9.0", "@types/node": "catalog:", "@workflow/tsconfig": "workspace:*", "genversion": "3.2.0", diff --git a/packages/world-vercel/src/instrumentObject.ts b/packages/world-vercel/src/instrumentObject.ts new file mode 100644 index 0000000000..a9813a1171 --- /dev/null +++ b/packages/world-vercel/src/instrumentObject.ts @@ -0,0 +1,26 @@ +/** + * Utility to instrument object methods with tracing. + * This is a minimal version for world-vercel to avoid circular dependencies with @workflow/core. + */ +import { trace } from './telemetry.js'; + +/** + * Wraps all methods of an object with tracing spans. + * @param prefix - Prefix for span names (e.g., "WORLD.runs") + * @param o - Object with methods to instrument + * @returns Instrumented object with same interface + */ +export function instrumentObject(prefix: string, o: T): T { + const handlers = {} as T; + for (const key of Object.keys(o) as (keyof T)[]) { + if (typeof o[key] !== 'function') { + handlers[key] = o[key]; + } else { + const f = o[key]; + // @ts-expect-error + handlers[key] = async (...args: any[]) => + trace(`${prefix}.${String(key)}`, {}, () => f(...args)); + } + } + return handlers; +} diff --git a/packages/world-vercel/src/storage.ts b/packages/world-vercel/src/storage.ts index d6880a2390..665e495d95 100644 --- a/packages/world-vercel/src/storage.ts +++ b/packages/world-vercel/src/storage.ts @@ -1,12 +1,13 @@ import type { Storage } from '@workflow/world'; import { createWorkflowRunEvent, getWorkflowRunEvents } from './events.js'; import { getHook, getHookByToken, listHooks } from './hooks.js'; +import { instrumentObject } from './instrumentObject.js'; import { getWorkflowRun, listWorkflowRuns } from './runs.js'; import { getStep, listWorkflowRunSteps } from './steps.js'; import type { APIConfig } from './utils.js'; export function createStorage(config?: APIConfig): Storage { - return { + const storage: Storage = { // Storage interface with namespaced methods runs: { get: ((id: string, params?: any) => @@ -32,4 +33,12 @@ export function createStorage(config?: APIConfig): Storage { list: (params) => listHooks(params, config), }, }; + + // Instrument all storage methods with tracing + return { + runs: instrumentObject('WORLD.runs', storage.runs), + steps: instrumentObject('WORLD.steps', storage.steps), + events: instrumentObject('WORLD.events', storage.events), + hooks: instrumentObject('WORLD.hooks', storage.hooks), + }; } diff --git a/packages/world-vercel/src/telemetry.ts b/packages/world-vercel/src/telemetry.ts new file mode 100644 index 0000000000..63424b5232 --- /dev/null +++ b/packages/world-vercel/src/telemetry.ts @@ -0,0 +1,102 @@ +/** + * Minimal telemetry utilities for world-vercel package. + * This is a simplified version that doesn't depend on @workflow/core to avoid circular dependencies. + */ +import type * as api from '@opentelemetry/api'; +import type { Span, SpanKind, SpanOptions } from '@opentelemetry/api'; + +// Lazy load OpenTelemetry API to make it optional +let otelApiPromise: Promise | null = null; + +async function getOtelApi(): Promise { + if (!otelApiPromise) { + otelApiPromise = import('@opentelemetry/api').catch(() => null); + } + return otelApiPromise; +} + +let tracerPromise: Promise | null = null; + +async function getTracer(): Promise { + if (!tracerPromise) { + tracerPromise = getOtelApi().then((otel) => + otel ? otel.trace.getTracer('workflow-world-vercel') : null + ); + } + return tracerPromise; +} + +/** + * Wrap an async function with a trace span. + * No-op if OpenTelemetry is not available. + */ +export async function trace( + spanName: string, + ...args: + | [fn: (span?: Span) => Promise] + | [opts: SpanOptions, fn: (span?: Span) => Promise] +): Promise { + const [tracer, otel] = await Promise.all([getTracer(), getOtelApi()]); + const { fn, opts } = + typeof args[0] === 'function' + ? { fn: args[0], opts: {} } + : { fn: args[1], opts: args[0] }; + if (!fn) throw new Error('Function to trace must be provided'); + + if (!tracer || !otel) { + return await fn(); + } + + return tracer.startActiveSpan(spanName, opts, async (span) => { + try { + const result = await fn(span); + span.setStatus({ code: otel.SpanStatusCode.OK }); + return result; + } catch (e) { + span.setStatus({ + code: otel.SpanStatusCode.ERROR, + message: (e as Error).message, + }); + throw e; + } finally { + span.end(); + } + }); +} + +/** + * Get SpanKind enum value by name. + * Returns undefined if OpenTelemetry is not available. + */ +export async function getSpanKind( + field: keyof typeof SpanKind +): Promise { + const otel = await getOtelApi(); + if (!otel) return undefined; + return otel.SpanKind[field]; +} + +// Semantic conventions for World/Storage tracing +function SemanticConvention(...names: string[]) { + return (value: T) => + Object.fromEntries(names.map((name) => [name, value] as const)); +} + +/** HTTP method used in World storage request */ +export const WorldHttpMethod = SemanticConvention('world.http.method'); + +/** API endpoint path for World storage request */ +export const WorldHttpEndpoint = SemanticConvention( + 'world.http.endpoint' +); + +/** HTTP status code from World storage request */ +export const WorldHttpStatus = SemanticConvention('world.http.status'); + +/** Format used for parsing response body (cbor or json) */ +export const WorldParseFormat = SemanticConvention<'cbor' | 'json'>( + 'world.parse.format' +); + +/** Size in bytes of the parsed response body */ +export const WorldParseBytes = SemanticConvention('world.parse.bytes'); diff --git a/packages/world-vercel/src/utils.ts b/packages/world-vercel/src/utils.ts index c9d09e6842..7c951f1b57 100644 --- a/packages/world-vercel/src/utils.ts +++ b/packages/world-vercel/src/utils.ts @@ -5,6 +5,14 @@ import { WorkflowAPIError } from '@workflow/errors'; import { type StructuredError, StructuredErrorSchema } from '@workflow/world'; import { decode, encode } from 'cbor-x'; import type { z } from 'zod'; +import { + trace, + getSpanKind, + WorldHttpMethod, + WorldHttpEndpoint, + WorldHttpStatus, + WorldParseFormat, +} from './telemetry.js'; import { version } from './version.js'; /** @@ -202,69 +210,98 @@ export async function makeRequest({ /** Request body data - will be CBOR encoded */ data?: unknown; }): Promise { - const { baseUrl, headers } = await getHttpConfig(config); - headers.set('Accept', 'application/cbor'); - // NOTE: Add a unique header to bypass RSC request memoization. - // See: https://github.com/vercel/workflow/issues/618 - headers.set('X-Request-Time', Date.now().toString()); - - // Encode body as CBOR if data is provided - let body: Buffer | undefined; - if (data !== undefined) { - headers.set('Content-Type', 'application/cbor'); - body = encode(data); - } + const method = options.method || 'GET'; - const url = `${baseUrl}${endpoint}`; - const request = new Request(url, { - ...options, - body, - headers, - }); - const response = await fetch(request); - - if (!response.ok) { - const errorData: { message?: string; code?: string } = - await parseResponseBody(response) - .then((r) => r.data as { message?: string; code?: string }) - .catch(() => ({})); - if (process.env.DEBUG === '1') { - const stringifiedHeaders = Array.from(headers.entries()) - .map(([key, value]: [string, string]) => `-H "${key}: ${value}"`) - .join(' '); - console.error( - `Failed to fetch, reproduce with:\ncurl -X ${request.method} ${stringifiedHeaders} "${url}"` - ); - } - throw new WorkflowAPIError( - errorData.message || - `${request.method} ${endpoint} -> HTTP ${response.status}: ${response.statusText}`, - { url, status: response.status, code: errorData.code } - ); - } + return trace( + `WORLD.http ${method} ${endpoint}`, + { kind: await getSpanKind('CLIENT') }, + async (span) => { + span?.setAttributes({ + ...WorldHttpMethod(method), + ...WorldHttpEndpoint(endpoint), + }); - // Parse the response body (CBOR or JSON) - let parseResult: ParseResult; - try { - parseResult = await parseResponseBody(response); - } catch (error) { - const contentType = response.headers.get('Content-Type') || 'unknown'; - throw new WorkflowAPIError( - `Failed to parse response body for ${request.method} ${endpoint} (Content-Type: ${contentType}):\n\n${error}`, - { url, cause: error } - ); - } + const { baseUrl, headers } = await getHttpConfig(config); + headers.set('Accept', 'application/cbor'); + // NOTE: Add a unique header to bypass RSC request memoization. + // See: https://github.com/vercel/workflow/issues/618 + headers.set('X-Request-Time', Date.now().toString()); - // Validate against the schema - const result = schema.safeParse(parseResult.data); - if (!result.success) { - throw new WorkflowAPIError( - `Schema validation failed for ${request.method} ${endpoint}:\n\n${result.error}\n\nResponse context: ${parseResult.getDebugContext()}`, - { url, cause: result.error } - ); - } + // Encode body as CBOR if data is provided + let body: Buffer | undefined; + if (data !== undefined) { + headers.set('Content-Type', 'application/cbor'); + body = encode(data); + } + + const url = `${baseUrl}${endpoint}`; + const request = new Request(url, { + ...options, + body, + headers, + }); + const response = await fetch(request); + + span?.setAttributes({ + ...WorldHttpStatus(response.status), + }); + + if (!response.ok) { + const errorData: { message?: string; code?: string } = + await parseResponseBody(response) + .then((r) => r.data as { message?: string; code?: string }) + .catch(() => ({})); + if (process.env.DEBUG === '1') { + const stringifiedHeaders = Array.from(headers.entries()) + .map(([key, value]: [string, string]) => `-H "${key}: ${value}"`) + .join(' '); + console.error( + `Failed to fetch, reproduce with:\ncurl -X ${request.method} ${stringifiedHeaders} "${url}"` + ); + } + throw new WorkflowAPIError( + errorData.message || + `${request.method} ${endpoint} -> HTTP ${response.status}: ${response.statusText}`, + { url, status: response.status, code: errorData.code } + ); + } - return result.data; + // Parse the response body (CBOR or JSON) with tracing + let parseResult: ParseResult; + try { + parseResult = await trace('WORLD.parse', async (parseSpan) => { + const result = await parseResponseBody(response); + // Extract format and size from debug context for attributes + const contentType = response.headers.get('Content-Type') || ''; + const isCbor = contentType.includes('application/cbor'); + parseSpan?.setAttributes({ + ...WorldParseFormat(isCbor ? 'cbor' : 'json'), + }); + return result; + }); + } catch (error) { + const contentType = response.headers.get('Content-Type') || 'unknown'; + throw new WorkflowAPIError( + `Failed to parse response body for ${request.method} ${endpoint} (Content-Type: ${contentType}):\n\n${error}`, + { url, cause: error } + ); + } + + // Validate against the schema with tracing + const result = await trace('WORLD.validate', async () => { + const validationResult = schema.safeParse(parseResult.data); + if (!validationResult.success) { + throw new WorkflowAPIError( + `Schema validation failed for ${request.method} ${endpoint}:\n\n${validationResult.error}\n\nResponse context: ${parseResult.getDebugContext()}`, + { url, cause: validationResult.error } + ); + } + return validationResult.data; + }); + + return result; + } + ); } interface ParseResult { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 19e030a252..5c08cfee20 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1361,6 +1361,9 @@ importers: specifier: 'catalog:' version: 4.1.11 devDependencies: + '@opentelemetry/api': + specifier: 1.9.0 + version: 1.9.0 '@types/node': specifier: 'catalog:' version: 22.19.0 From 1e67403cf914dac384e07720e29d85211bf46fa3 Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Wed, 4 Feb 2026 09:44:34 -0800 Subject: [PATCH 2/5] Split changeset into separate entries for core and world-vercel Co-Authored-By: Claude Opus 4.5 --- .changeset/core-otel-improvements.md | 5 +++++ .changeset/improve-otel-tracing.md | 12 ------------ .changeset/world-vercel-otel-improvements.md | 5 +++++ 3 files changed, 10 insertions(+), 12 deletions(-) create mode 100644 .changeset/core-otel-improvements.md delete mode 100644 .changeset/improve-otel-tracing.md create mode 100644 .changeset/world-vercel-otel-improvements.md diff --git a/.changeset/core-otel-improvements.md b/.changeset/core-otel-improvements.md new file mode 100644 index 0000000000..5b0d5b2e20 --- /dev/null +++ b/.changeset/core-otel-improvements.md @@ -0,0 +1,5 @@ +--- +"@workflow/core": patch +--- + +Add OTEL tracing for event loading and queue timing breakdown diff --git a/.changeset/improve-otel-tracing.md b/.changeset/improve-otel-tracing.md deleted file mode 100644 index 17f8edf63e..0000000000 --- a/.changeset/improve-otel-tracing.md +++ /dev/null @@ -1,12 +0,0 @@ ---- -"@workflow/core": patch -"@workflow/world-vercel": patch ---- - -Add improved OpenTelemetry instrumentation for better workflow observability: - -- Add tracing to HTTP requests in world-vercel with method, endpoint, and status attributes -- Add tracing to storage operations (runs, steps, events, hooks) -- Add tracing to event loading with event count and pages loaded attributes -- Add queue timing breakdown attributes (deserialize, execution, serialize times) to step handler -- Add new semantic conventions for world/storage, events, serialization, and queue breakdown diff --git a/.changeset/world-vercel-otel-improvements.md b/.changeset/world-vercel-otel-improvements.md new file mode 100644 index 0000000000..8963e268f8 --- /dev/null +++ b/.changeset/world-vercel-otel-improvements.md @@ -0,0 +1,5 @@ +--- +"@workflow/world-vercel": patch +--- + +Add OTEL tracing for HTTP requests and storage operations From 178fb1f1f489b3c9d681964bfd6cf8c05015e1ec Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Wed, 4 Feb 2026 10:06:21 -0800 Subject: [PATCH 3/5] Update OTEL instrumentation to use standard semantic conventions - Replace custom world.http.* attributes with standard OTEL HTTP conventions (http.request.method, http.response.status_code, url.full, server.address, server.port) - Replace custom queue.* attributes with standard OTEL messaging conventions (messaging.system, messaging.destination.name, messaging.message.id, messaging.operation.type) - Add error.type attribute for HTTP error spans - Add span.recordException() for proper error recording - Namespace workflow-specific attributes under workflow.* prefix - Fix span naming: use "HTTP {method}" instead of "WORLD.http {method} {endpoint}" Co-Authored-By: Claude Opus 4.5 --- .changeset/core-otel-improvements.md | 2 +- .changeset/world-vercel-otel-improvements.md | 2 +- packages/core/src/runtime.ts | 7 +- packages/core/src/runtime/helpers.ts | 9 ++- packages/core/src/runtime/step-handler.ts | 7 +- .../src/telemetry/semantic-conventions.ts | 74 +++++++++++++------ packages/world-vercel/src/telemetry.ts | 32 +++++--- packages/world-vercel/src/utils.ts | 49 +++++++++--- 8 files changed, 132 insertions(+), 50 deletions(-) diff --git a/.changeset/core-otel-improvements.md b/.changeset/core-otel-improvements.md index 5b0d5b2e20..fc23eb59a7 100644 --- a/.changeset/core-otel-improvements.md +++ b/.changeset/core-otel-improvements.md @@ -2,4 +2,4 @@ "@workflow/core": patch --- -Add OTEL tracing for event loading and queue timing breakdown +Add OTEL tracing for event loading and queue timing breakdown using standard OTEL semantic conventions diff --git a/.changeset/world-vercel-otel-improvements.md b/.changeset/world-vercel-otel-improvements.md index 8963e268f8..de5890d33c 100644 --- a/.changeset/world-vercel-otel-improvements.md +++ b/.changeset/world-vercel-otel-improvements.md @@ -2,4 +2,4 @@ "@workflow/world-vercel": patch --- -Add OTEL tracing for HTTP requests and storage operations +Add OTEL tracing for HTTP requests and storage operations using standard OTEL semantic conventions diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 8416a346df..d22406dd25 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -94,8 +94,11 @@ export function workflowEntrypoint( span?.setAttributes({ ...Attribute.WorkflowName(workflowName), ...Attribute.WorkflowOperation('execute'), - ...Attribute.QueueName(metadata.queueName), - ...Attribute.QueueMessageId(metadata.messageId), + // Standard OTEL messaging conventions + ...Attribute.MessagingSystem('vercel-queue'), + ...Attribute.MessagingDestinationName(metadata.queueName), + ...Attribute.MessagingMessageId(metadata.messageId), + ...Attribute.MessagingOperationType('process'), ...getQueueOverhead({ requestedAt }), }); diff --git a/packages/core/src/runtime/helpers.ts b/packages/core/src/runtime/helpers.ts index 4b3dc1f89a..f8c79cb58c 100644 --- a/packages/core/src/runtime/helpers.ts +++ b/packages/core/src/runtime/helpers.ts @@ -341,12 +341,17 @@ export async function queueMessage( await trace( 'queueMessage', { - attributes: Attribute.QueueName(queueName), + // Standard OTEL messaging conventions + attributes: { + ...Attribute.MessagingSystem('vercel-queue'), + ...Attribute.MessagingDestinationName(queueName), + ...Attribute.MessagingOperationType('publish'), + }, kind: await getSpanKind('PRODUCER'), }, async (span) => { const { messageId } = await world.queue(...args); - span?.setAttributes(Attribute.QueueMessageId(messageId)); + span?.setAttributes(Attribute.MessagingMessageId(messageId)); } ); } diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts index d9dd8a72f9..d1d03f2e28 100644 --- a/packages/core/src/runtime/step-handler.ts +++ b/packages/core/src/runtime/step-handler.ts @@ -73,8 +73,11 @@ const stepHandler = getWorldHandlers().createQueueHandler( span?.setAttributes({ ...Attribute.StepName(stepName), ...Attribute.StepAttempt(metadata.attempt), - ...Attribute.QueueName(metadata.queueName), - ...Attribute.QueueMessageId(metadata.messageId), + // Standard OTEL messaging conventions + ...Attribute.MessagingSystem('vercel-queue'), + ...Attribute.MessagingDestinationName(metadata.queueName), + ...Attribute.MessagingMessageId(metadata.messageId), + ...Attribute.MessagingOperationType('process'), ...getQueueOverhead({ requestedAt }), }); diff --git a/packages/core/src/telemetry/semantic-conventions.ts b/packages/core/src/telemetry/semantic-conventions.ts index 77bd26c20f..62ecd35830 100644 --- a/packages/core/src/telemetry/semantic-conventions.ts +++ b/packages/core/src/telemetry/semantic-conventions.ts @@ -179,19 +179,31 @@ export const StepRetryWillRetry = SemanticConvention( 'step.retry.will_retry' ); -// Queue attributes +// Queue/Messaging attributes - Standard OTEL messaging conventions +// See: https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/ -/** Name of the queue being used for message processing */ -export const QueueName = SemanticConvention('queue.name'); +/** Messaging system identifier (standard OTEL: messaging.system) */ +export const MessagingSystem = SemanticConvention('messaging.system'); -/** The message id being handled */ -export const QueueMessageId = SemanticConvention( - 'messaging.message.id', - 'queue.message.id' +/** Destination name/queue name (standard OTEL: messaging.destination.name) */ +export const MessagingDestinationName = SemanticConvention( + 'messaging.destination.name' ); -/** Time taken to enqueue the message in milliseconds */ -export const QueueOverheadMs = SemanticConvention('queue.overhead_ms'); +/** The message id being handled (standard OTEL: messaging.message.id) */ +export const MessagingMessageId = SemanticConvention( + 'messaging.message.id' +); + +/** Operation type (standard OTEL: messaging.operation.type) */ +export const MessagingOperationType = SemanticConvention< + 'publish' | 'receive' | 'process' +>('messaging.operation.type'); + +/** Time taken to enqueue the message in milliseconds (workflow-specific) */ +export const QueueOverheadMs = SemanticConvention( + 'workflow.queue.overhead_ms' +); // Deployment attributes @@ -231,26 +243,42 @@ export const WorkflowSuspensionWaitCount = SemanticConvention( 'workflow.suspension.wait_count' ); -// World/Storage attributes +// World/Storage attributes - Standard OTEL HTTP conventions +// See: https://opentelemetry.io/docs/specs/semconv/http/http-spans/ + +/** HTTP request method (standard OTEL: http.request.method) */ +export const HttpRequestMethod = SemanticConvention( + 'http.request.method' +); + +/** Full URL of the request (standard OTEL: url.full) */ +export const UrlFull = SemanticConvention('url.full'); + +/** Server hostname (standard OTEL: server.address) */ +export const ServerAddress = SemanticConvention('server.address'); -/** HTTP method used in World storage request */ -export const WorldHttpMethod = SemanticConvention('world.http.method'); +/** Server port (standard OTEL: server.port) */ +export const ServerPort = SemanticConvention('server.port'); -/** API endpoint path for World storage request */ -export const WorldHttpEndpoint = SemanticConvention( - 'world.http.endpoint' +/** HTTP response status code (standard OTEL: http.response.status_code) */ +export const HttpResponseStatusCode = SemanticConvention( + 'http.response.status_code' ); -/** HTTP status code from World storage request */ -export const WorldHttpStatus = SemanticConvention('world.http.status'); +/** Error type when request fails (standard OTEL: error.type) */ +export const ErrorType = SemanticConvention('error.type'); + +// World-specific custom attributes (for workflow-specific context) /** Format used for parsing response body (cbor or json) */ export const WorldParseFormat = SemanticConvention<'cbor' | 'json'>( - 'world.parse.format' + 'workflow.world.parse.format' ); /** Size in bytes of the parsed response body */ -export const WorldParseBytes = SemanticConvention('world.parse.bytes'); +export const WorldParseBytes = SemanticConvention( + 'workflow.world.parse.bytes' +); // Event loading attributes @@ -274,19 +302,19 @@ export const DeserializeFormat = /** Size in bytes of the data being deserialized */ export const DeserializeBytes = SemanticConvention('deserialize.bytes'); -// Queue timing breakdown attributes +// Queue timing breakdown attributes (workflow-specific) /** Time spent deserializing the queue message in milliseconds */ export const QueueDeserializeTimeMs = SemanticConvention( - 'queue.deserialize_time_ms' + 'workflow.queue.deserialize_time_ms' ); /** Time spent executing the handler logic in milliseconds */ export const QueueExecutionTimeMs = SemanticConvention( - 'queue.execution_time_ms' + 'workflow.queue.execution_time_ms' ); /** Time spent serializing the response in milliseconds */ export const QueueSerializeTimeMs = SemanticConvention( - 'queue.serialize_time_ms' + 'workflow.queue.serialize_time_ms' ); diff --git a/packages/world-vercel/src/telemetry.ts b/packages/world-vercel/src/telemetry.ts index 63424b5232..89a4992771 100644 --- a/packages/world-vercel/src/telemetry.ts +++ b/packages/world-vercel/src/telemetry.ts @@ -77,26 +77,40 @@ export async function getSpanKind( } // Semantic conventions for World/Storage tracing +// Standard OTEL conventions: https://opentelemetry.io/docs/specs/semconv/http/http-spans/ function SemanticConvention(...names: string[]) { return (value: T) => Object.fromEntries(names.map((name) => [name, value] as const)); } -/** HTTP method used in World storage request */ -export const WorldHttpMethod = SemanticConvention('world.http.method'); +/** HTTP request method (standard OTEL: http.request.method) */ +export const HttpRequestMethod = SemanticConvention( + 'http.request.method' +); + +/** Full URL of the request (standard OTEL: url.full) */ +export const UrlFull = SemanticConvention('url.full'); + +/** Server hostname (standard OTEL: server.address) */ +export const ServerAddress = SemanticConvention('server.address'); -/** API endpoint path for World storage request */ -export const WorldHttpEndpoint = SemanticConvention( - 'world.http.endpoint' +/** Server port (standard OTEL: server.port) */ +export const ServerPort = SemanticConvention('server.port'); + +/** HTTP response status code (standard OTEL: http.response.status_code) */ +export const HttpResponseStatusCode = SemanticConvention( + 'http.response.status_code' ); -/** HTTP status code from World storage request */ -export const WorldHttpStatus = SemanticConvention('world.http.status'); +/** Error type when request fails (standard OTEL: error.type) */ +export const ErrorType = SemanticConvention('error.type'); /** Format used for parsing response body (cbor or json) */ export const WorldParseFormat = SemanticConvention<'cbor' | 'json'>( - 'world.parse.format' + 'workflow.world.parse.format' ); /** Size in bytes of the parsed response body */ -export const WorldParseBytes = SemanticConvention('world.parse.bytes'); +export const WorldParseBytes = SemanticConvention( + 'workflow.world.parse.bytes' +); diff --git a/packages/world-vercel/src/utils.ts b/packages/world-vercel/src/utils.ts index 7c951f1b57..e9d0e8814b 100644 --- a/packages/world-vercel/src/utils.ts +++ b/packages/world-vercel/src/utils.ts @@ -8,9 +8,12 @@ import type { z } from 'zod'; import { trace, getSpanKind, - WorldHttpMethod, - WorldHttpEndpoint, - WorldHttpStatus, + HttpRequestMethod, + HttpResponseStatusCode, + UrlFull, + ServerAddress, + ServerPort, + ErrorType, WorldParseFormat, } from './telemetry.js'; import { version } from './version.js'; @@ -211,17 +214,38 @@ export async function makeRequest({ data?: unknown; }): Promise { const method = options.method || 'GET'; + const { baseUrl, headers } = await getHttpConfig(config); + const url = `${baseUrl}${endpoint}`; + + // Parse server address and port from URL for OTEL attributes + let serverAddress: string | undefined; + let serverPort: number | undefined; + try { + const parsedUrl = new URL(url); + serverAddress = parsedUrl.hostname; + serverPort = parsedUrl.port + ? parseInt(parsedUrl.port, 10) + : parsedUrl.protocol === 'https:' + ? 443 + : 80; + } catch { + // URL parsing failed, skip these attributes + } + // Standard OTEL span name for HTTP client: "{method}" + // See: https://opentelemetry.io/docs/specs/semconv/http/http-spans/#name return trace( - `WORLD.http ${method} ${endpoint}`, + `HTTP ${method}`, { kind: await getSpanKind('CLIENT') }, async (span) => { + // Set standard OTEL HTTP client attributes span?.setAttributes({ - ...WorldHttpMethod(method), - ...WorldHttpEndpoint(endpoint), + ...HttpRequestMethod(method), + ...UrlFull(url), + ...(serverAddress && ServerAddress(serverAddress)), + ...(serverPort && ServerPort(serverPort)), }); - const { baseUrl, headers } = await getHttpConfig(config); headers.set('Accept', 'application/cbor'); // NOTE: Add a unique header to bypass RSC request memoization. // See: https://github.com/vercel/workflow/issues/618 @@ -234,7 +258,6 @@ export async function makeRequest({ body = encode(data); } - const url = `${baseUrl}${endpoint}`; const request = new Request(url, { ...options, body, @@ -243,7 +266,7 @@ export async function makeRequest({ const response = await fetch(request); span?.setAttributes({ - ...WorldHttpStatus(response.status), + ...HttpResponseStatusCode(response.status), }); if (!response.ok) { @@ -259,11 +282,17 @@ export async function makeRequest({ `Failed to fetch, reproduce with:\ncurl -X ${request.method} ${stringifiedHeaders} "${url}"` ); } - throw new WorkflowAPIError( + const error = new WorkflowAPIError( errorData.message || `${request.method} ${endpoint} -> HTTP ${response.status}: ${response.statusText}`, { url, status: response.status, code: errorData.code } ); + // Record error attributes per OTEL conventions + span?.setAttributes({ + ...ErrorType(errorData.code || `HTTP ${response.status}`), + }); + span?.recordException?.(error); + throw error; } // Parse the response body (CBOR or JSON) with tracing From 657ac4c4f2adb83f81ee2e55e4bbc8bdf4d77168 Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Wed, 4 Feb 2026 10:08:10 -0800 Subject: [PATCH 4/5] Address PR review comments - Remove unused semantic conventions (SerializeFormat, SerializeBytes, DeserializeFormat, DeserializeBytes, WorldParseBytes) - Add documentation explaining intentional duplication in world-vercel/telemetry.ts - Add documentation explaining why WorkflowSuspension isn't handled in world-vercel - Add comment about deserialize timing measurement limitation Co-Authored-By: Claude Opus 4.5 --- packages/core/src/runtime/step-handler.ts | 3 +++ .../src/telemetry/semantic-conventions.ts | 20 ------------------- packages/world-vercel/src/telemetry.ts | 15 ++++++++------ 3 files changed, 12 insertions(+), 26 deletions(-) diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts index d1d03f2e28..a208ac4497 100644 --- a/packages/core/src/runtime/step-handler.ts +++ b/packages/core/src/runtime/step-handler.ts @@ -236,6 +236,9 @@ const stepHandler = getWorldHandlers().createQueueHandler( } // Hydrate the step input arguments, closure variables, and thisVal // Track deserialization time for observability + // NOTE: This captures only the synchronous portion of hydration. Any async + // operations (e.g., stream loading) are added to `ops` and executed later + // via Promise.all(ops) - their timing is not included in this measurement. const deserializeStartTime = Date.now(); const ops: Promise[] = []; const hydratedInput = hydrateStepArguments( diff --git a/packages/core/src/telemetry/semantic-conventions.ts b/packages/core/src/telemetry/semantic-conventions.ts index 62ecd35830..b9ca1017e5 100644 --- a/packages/core/src/telemetry/semantic-conventions.ts +++ b/packages/core/src/telemetry/semantic-conventions.ts @@ -275,11 +275,6 @@ export const WorldParseFormat = SemanticConvention<'cbor' | 'json'>( 'workflow.world.parse.format' ); -/** Size in bytes of the parsed response body */ -export const WorldParseBytes = SemanticConvention( - 'workflow.world.parse.bytes' -); - // Event loading attributes /** Number of pagination pages loaded when fetching workflow events */ @@ -287,21 +282,6 @@ export const WorkflowEventsPagesLoaded = SemanticConvention( 'workflow.events.pages_loaded' ); -// Serialization attributes - -/** Format used for serialization (e.g., 'devalue') */ -export const SerializeFormat = SemanticConvention('serialize.format'); - -/** Size in bytes of the serialized data */ -export const SerializeBytes = SemanticConvention('serialize.bytes'); - -/** Format used for deserialization (e.g., 'devalue') */ -export const DeserializeFormat = - SemanticConvention('deserialize.format'); - -/** Size in bytes of the data being deserialized */ -export const DeserializeBytes = SemanticConvention('deserialize.bytes'); - // Queue timing breakdown attributes (workflow-specific) /** Time spent deserializing the queue message in milliseconds */ diff --git a/packages/world-vercel/src/telemetry.ts b/packages/world-vercel/src/telemetry.ts index 89a4992771..a9c7333722 100644 --- a/packages/world-vercel/src/telemetry.ts +++ b/packages/world-vercel/src/telemetry.ts @@ -1,6 +1,14 @@ /** * Minimal telemetry utilities for world-vercel package. - * This is a simplified version that doesn't depend on @workflow/core to avoid circular dependencies. + * + * NOTE: This module intentionally duplicates semantic conventions from @workflow/core + * to avoid a circular dependency (world-vercel cannot depend on core). + * If you update conventions here, ensure @workflow/core/telemetry/semantic-conventions.ts + * remains synchronized. + * + * NOTE: Unlike the trace() function in @workflow/core, this implementation does not + * have special handling for WorkflowSuspension errors because world-vercel operates + * at the HTTP layer and never encounters workflow suspension effects. */ import type * as api from '@opentelemetry/api'; import type { Span, SpanKind, SpanOptions } from '@opentelemetry/api'; @@ -109,8 +117,3 @@ export const ErrorType = SemanticConvention('error.type'); export const WorldParseFormat = SemanticConvention<'cbor' | 'json'>( 'workflow.world.parse.format' ); - -/** Size in bytes of the parsed response body */ -export const WorldParseBytes = SemanticConvention( - 'workflow.world.parse.bytes' -); From b142e22a8329ba27fc37a7e429076bd1c897997c Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Wed, 4 Feb 2026 15:42:11 -0800 Subject: [PATCH 5/5] Use lowercase span names per OTEL semantic conventions - world.runs, world.steps, world.events, world.hooks (storage) - world.parse, world.validate (HTTP parsing) - http {method} (HTTP client spans) - workflow.run, workflow.start, workflow.loadEvents - step {name}, hook.resume Co-Authored-By: Claude Opus 4.5 --- packages/core/src/runtime/helpers.ts | 2 +- packages/core/src/runtime/resume-hook.ts | 2 +- packages/core/src/runtime/start.ts | 2 +- packages/core/src/runtime/step-handler.ts | 2 +- packages/core/src/workflow.ts | 2 +- packages/world-vercel/src/storage.ts | 9 +++++---- packages/world-vercel/src/utils.ts | 6 +++--- 7 files changed, 13 insertions(+), 12 deletions(-) diff --git a/packages/core/src/runtime/helpers.ts b/packages/core/src/runtime/helpers.ts index f8c79cb58c..0c1e361330 100644 --- a/packages/core/src/runtime/helpers.ts +++ b/packages/core/src/runtime/helpers.ts @@ -249,7 +249,7 @@ export async function healthCheck( * Events must be in chronological order (ascending) for proper workflow replay. */ export async function getAllWorkflowRunEvents(runId: string): Promise { - return trace('WORKFLOW.loadEvents', async (span) => { + return trace('workflow.loadEvents', async (span) => { span?.setAttributes({ ...Attribute.WorkflowRunId(runId), }); diff --git a/packages/core/src/runtime/resume-hook.ts b/packages/core/src/runtime/resume-hook.ts index 339b1edc85..b94d511038 100644 --- a/packages/core/src/runtime/resume-hook.ts +++ b/packages/core/src/runtime/resume-hook.ts @@ -66,7 +66,7 @@ export async function resumeHook( payload: T ): Promise { return await waitedUntil(() => { - return trace('HOOK.resume', async (span) => { + return trace('hook.resume', async (span) => { const world = getWorld(); try { diff --git a/packages/core/src/runtime/start.ts b/packages/core/src/runtime/start.ts index e3e4ced572..13e001b452 100644 --- a/packages/core/src/runtime/start.ts +++ b/packages/core/src/runtime/start.ts @@ -81,7 +81,7 @@ export async function start( ); } - return trace(`WORKFLOW.start ${workflowName}`, async (span) => { + return trace(`workflow.start ${workflowName}`, async (span) => { span?.setAttributes({ ...Attribute.WorkflowName(workflowName), ...Attribute.WorkflowOperation('start'), diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts index a208ac4497..3e511e647e 100644 --- a/packages/core/src/runtime/step-handler.ts +++ b/packages/core/src/runtime/step-handler.ts @@ -67,7 +67,7 @@ const stepHandler = getWorldHandlers().createQueueHandler( const port = await getPort(); return trace( - `STEP ${stepName}`, + `step ${stepName}`, { kind: await getSpanKind('CONSUMER'), links: spanLinks }, async (span) => { span?.setAttributes({ diff --git a/packages/core/src/workflow.ts b/packages/core/src/workflow.ts index 5bd9fd1126..3328f3e8b4 100644 --- a/packages/core/src/workflow.ts +++ b/packages/core/src/workflow.ts @@ -36,7 +36,7 @@ export async function runWorkflow( workflowRun: WorkflowRun, events: Event[] ): Promise { - return trace(`WORKFLOW.run ${workflowRun.workflowName}`, async (span) => { + return trace(`workflow.run ${workflowRun.workflowName}`, async (span) => { span?.setAttributes({ ...Attribute.WorkflowName(workflowRun.workflowName), ...Attribute.WorkflowRunId(workflowRun.runId), diff --git a/packages/world-vercel/src/storage.ts b/packages/world-vercel/src/storage.ts index 665e495d95..09b27bfe6e 100644 --- a/packages/world-vercel/src/storage.ts +++ b/packages/world-vercel/src/storage.ts @@ -35,10 +35,11 @@ export function createStorage(config?: APIConfig): Storage { }; // Instrument all storage methods with tracing + // NOTE: Span names are lowercase per OTEL semantic conventions return { - runs: instrumentObject('WORLD.runs', storage.runs), - steps: instrumentObject('WORLD.steps', storage.steps), - events: instrumentObject('WORLD.events', storage.events), - hooks: instrumentObject('WORLD.hooks', storage.hooks), + runs: instrumentObject('world.runs', storage.runs), + steps: instrumentObject('world.steps', storage.steps), + events: instrumentObject('world.events', storage.events), + hooks: instrumentObject('world.hooks', storage.hooks), }; } diff --git a/packages/world-vercel/src/utils.ts b/packages/world-vercel/src/utils.ts index e9d0e8814b..61ec5bca16 100644 --- a/packages/world-vercel/src/utils.ts +++ b/packages/world-vercel/src/utils.ts @@ -235,7 +235,7 @@ export async function makeRequest({ // Standard OTEL span name for HTTP client: "{method}" // See: https://opentelemetry.io/docs/specs/semconv/http/http-spans/#name return trace( - `HTTP ${method}`, + `http ${method}`, { kind: await getSpanKind('CLIENT') }, async (span) => { // Set standard OTEL HTTP client attributes @@ -298,7 +298,7 @@ export async function makeRequest({ // Parse the response body (CBOR or JSON) with tracing let parseResult: ParseResult; try { - parseResult = await trace('WORLD.parse', async (parseSpan) => { + parseResult = await trace('world.parse', async (parseSpan) => { const result = await parseResponseBody(response); // Extract format and size from debug context for attributes const contentType = response.headers.get('Content-Type') || ''; @@ -317,7 +317,7 @@ export async function makeRequest({ } // Validate against the schema with tracing - const result = await trace('WORLD.validate', async () => { + const result = await trace('world.validate', async () => { const validationResult = schema.safeParse(parseResult.data); if (!validationResult.success) { throw new WorkflowAPIError(