diff --git a/.changeset/core-otel-improvements.md b/.changeset/core-otel-improvements.md new file mode 100644 index 0000000000..fc23eb59a7 --- /dev/null +++ b/.changeset/core-otel-improvements.md @@ -0,0 +1,5 @@ +--- +"@workflow/core": patch +--- + +Add OTEL tracing for event loading and queue timing breakdown using standard OTEL semantic conventions diff --git a/.changeset/world-vercel-otel-improvements.md b/.changeset/world-vercel-otel-improvements.md new file mode 100644 index 0000000000..de5890d33c --- /dev/null +++ b/.changeset/world-vercel-otel-improvements.md @@ -0,0 +1,5 @@ +--- +"@workflow/world-vercel": patch +--- + +Add OTEL tracing for HTTP requests and storage operations using standard OTEL semantic conventions diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 8416a346df..d22406dd25 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -94,8 +94,11 @@ export function workflowEntrypoint( span?.setAttributes({ ...Attribute.WorkflowName(workflowName), ...Attribute.WorkflowOperation('execute'), - ...Attribute.QueueName(metadata.queueName), - ...Attribute.QueueMessageId(metadata.messageId), + // Standard OTEL messaging conventions + ...Attribute.MessagingSystem('vercel-queue'), + ...Attribute.MessagingDestinationName(metadata.queueName), + ...Attribute.MessagingMessageId(metadata.messageId), + ...Attribute.MessagingOperationType('process'), ...getQueueOverhead({ requestedAt }), }); diff --git a/packages/core/src/runtime/helpers.ts b/packages/core/src/runtime/helpers.ts index ff505a7a06..0c1e361330 100644 --- a/packages/core/src/runtime/helpers.ts +++ b/packages/core/src/runtime/helpers.ts @@ -249,29 +249,42 @@ export async function healthCheck( * Events must be in chronological order (ascending) for proper workflow replay. */ export async function getAllWorkflowRunEvents(runId: string): Promise { - const allEvents: Event[] = []; - let cursor: string | null = null; - let hasMore = true; - - const world = getWorld(); - while (hasMore) { - // TODO: we're currently loading all the data with resolveRef behaviour. We need to update this - // to lazyload the data from the world instead so that we can optimize and make the event log loading - // much faster and memory efficient - const response = await world.events.list({ - runId, - pagination: { - sortOrder: 'asc', // Required: events must be in chronological order for replay - cursor: cursor ?? undefined, - }, + return trace('workflow.loadEvents', async (span) => { + span?.setAttributes({ + ...Attribute.WorkflowRunId(runId), }); - allEvents.push(...response.data); - hasMore = response.hasMore; - cursor = response.cursor; - } + const allEvents: Event[] = []; + let cursor: string | null = null; + let hasMore = true; + let pagesLoaded = 0; + + const world = getWorld(); + while (hasMore) { + // TODO: we're currently loading all the data with resolveRef behaviour. We need to update this + // to lazyload the data from the world instead so that we can optimize and make the event log loading + // much faster and memory efficient + const response = await world.events.list({ + runId, + pagination: { + sortOrder: 'asc', // Required: events must be in chronological order for replay + cursor: cursor ?? undefined, + }, + }); + + allEvents.push(...response.data); + hasMore = response.hasMore; + cursor = response.cursor; + pagesLoaded++; + } - return allEvents; + span?.setAttributes({ + ...Attribute.WorkflowEventsCount(allEvents.length), + ...Attribute.WorkflowEventsPagesLoaded(pagesLoaded), + }); + + return allEvents; + }); } /** @@ -328,12 +341,17 @@ export async function queueMessage( await trace( 'queueMessage', { - attributes: Attribute.QueueName(queueName), + // Standard OTEL messaging conventions + attributes: { + ...Attribute.MessagingSystem('vercel-queue'), + ...Attribute.MessagingDestinationName(queueName), + ...Attribute.MessagingOperationType('publish'), + }, kind: await getSpanKind('PRODUCER'), }, async (span) => { const { messageId } = await world.queue(...args); - span?.setAttributes(Attribute.QueueMessageId(messageId)); + span?.setAttributes(Attribute.MessagingMessageId(messageId)); } ); } diff --git a/packages/core/src/runtime/resume-hook.ts b/packages/core/src/runtime/resume-hook.ts index 339b1edc85..b94d511038 100644 --- a/packages/core/src/runtime/resume-hook.ts +++ b/packages/core/src/runtime/resume-hook.ts @@ -66,7 +66,7 @@ export async function resumeHook( payload: T ): Promise { return await waitedUntil(() => { - return trace('HOOK.resume', async (span) => { + return trace('hook.resume', async (span) => { const world = getWorld(); try { diff --git a/packages/core/src/runtime/start.ts b/packages/core/src/runtime/start.ts index e3e4ced572..13e001b452 100644 --- a/packages/core/src/runtime/start.ts +++ b/packages/core/src/runtime/start.ts @@ -81,7 +81,7 @@ export async function start( ); } - return trace(`WORKFLOW.start ${workflowName}`, async (span) => { + return trace(`workflow.start ${workflowName}`, async (span) => { span?.setAttributes({ ...Attribute.WorkflowName(workflowName), ...Attribute.WorkflowOperation('start'), diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts index 1864d0e0b4..3e511e647e 100644 --- a/packages/core/src/runtime/step-handler.ts +++ b/packages/core/src/runtime/step-handler.ts @@ -67,14 +67,17 @@ const stepHandler = getWorldHandlers().createQueueHandler( const port = await getPort(); return trace( - `STEP ${stepName}`, + `step ${stepName}`, { kind: await getSpanKind('CONSUMER'), links: spanLinks }, async (span) => { span?.setAttributes({ ...Attribute.StepName(stepName), ...Attribute.StepAttempt(metadata.attempt), - ...Attribute.QueueName(metadata.queueName), - ...Attribute.QueueMessageId(metadata.messageId), + // Standard OTEL messaging conventions + ...Attribute.MessagingSystem('vercel-queue'), + ...Attribute.MessagingDestinationName(metadata.queueName), + ...Attribute.MessagingMessageId(metadata.messageId), + ...Attribute.MessagingOperationType('process'), ...getQueueOverhead({ requestedAt }), }); @@ -232,20 +235,29 @@ const stepHandler = getWorldHandlers().createQueueHandler( ); } // Hydrate the step input arguments, closure variables, and thisVal + // Track deserialization time for observability + // NOTE: This captures only the synchronous portion of hydration. Any async + // operations (e.g., stream loading) are added to `ops` and executed later + // via Promise.all(ops) - their timing is not included in this measurement. + const deserializeStartTime = Date.now(); const ops: Promise[] = []; const hydratedInput = hydrateStepArguments( step.input, ops, workflowRunId ); + const deserializeTimeMs = Date.now() - deserializeStartTime; const args = hydratedInput.args; const thisVal = hydratedInput.thisVal ?? null; span?.setAttributes({ ...Attribute.StepArgumentsCount(args.length), + ...Attribute.QueueDeserializeTimeMs(deserializeTimeMs), }); + // Track execution time for observability + const executionStartTime = Date.now(); result = await contextStorage.run( { stepMetadata: { @@ -267,11 +279,23 @@ const stepHandler = getWorldHandlers().createQueueHandler( }, () => stepFn.apply(thisVal, args) ); + const executionTimeMs = Date.now() - executionStartTime; + + span?.setAttributes({ + ...Attribute.QueueExecutionTimeMs(executionTimeMs), + }); // NOTE: None of the code from this point is guaranteed to run // Since the step might fail or cause a function timeout and the process might be SIGKILL'd // The workflow runtime must be resilient to the below code not executing on a failed step + // Track serialization time for observability + const serializeStartTime = Date.now(); result = dehydrateStepReturnValue(result, ops, workflowRunId); + const serializeTimeMs = Date.now() - serializeStartTime; + + span?.setAttributes({ + ...Attribute.QueueSerializeTimeMs(serializeTimeMs), + }); waitUntil( Promise.all(ops).catch((err) => { diff --git a/packages/core/src/telemetry.ts b/packages/core/src/telemetry.ts index aef6dfeb60..17b3d6b86e 100644 --- a/packages/core/src/telemetry.ts +++ b/packages/core/src/telemetry.ts @@ -147,6 +147,12 @@ export async function getActiveSpan() { return await withOtel((otel) => otel.trace.getActiveSpan()); } +/** + * Wraps all methods of an object with tracing spans. + * @param prefix - Prefix for span names (e.g., "WORLD.runs") + * @param o - Object with methods to instrument + * @returns Instrumented object with same interface + */ export function instrumentObject(prefix: string, o: T): T { const handlers = {} as T; for (const key of Object.keys(o) as (keyof T)[]) { diff --git a/packages/core/src/telemetry/semantic-conventions.ts b/packages/core/src/telemetry/semantic-conventions.ts index c996ced0ea..b9ca1017e5 100644 --- a/packages/core/src/telemetry/semantic-conventions.ts +++ b/packages/core/src/telemetry/semantic-conventions.ts @@ -179,19 +179,31 @@ export const StepRetryWillRetry = SemanticConvention( 'step.retry.will_retry' ); -// Queue attributes +// Queue/Messaging attributes - Standard OTEL messaging conventions +// See: https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/ -/** Name of the queue being used for message processing */ -export const QueueName = SemanticConvention('queue.name'); +/** Messaging system identifier (standard OTEL: messaging.system) */ +export const MessagingSystem = SemanticConvention('messaging.system'); -/** The message id being handled */ -export const QueueMessageId = SemanticConvention( - 'messaging.message.id', - 'queue.message.id' +/** Destination name/queue name (standard OTEL: messaging.destination.name) */ +export const MessagingDestinationName = SemanticConvention( + 'messaging.destination.name' ); -/** Time taken to enqueue the message in milliseconds */ -export const QueueOverheadMs = SemanticConvention('queue.overhead_ms'); +/** The message id being handled (standard OTEL: messaging.message.id) */ +export const MessagingMessageId = SemanticConvention( + 'messaging.message.id' +); + +/** Operation type (standard OTEL: messaging.operation.type) */ +export const MessagingOperationType = SemanticConvention< + 'publish' | 'receive' | 'process' +>('messaging.operation.type'); + +/** Time taken to enqueue the message in milliseconds (workflow-specific) */ +export const QueueOverheadMs = SemanticConvention( + 'workflow.queue.overhead_ms' +); // Deployment attributes @@ -230,3 +242,59 @@ export const WorkflowSuspensionStepCount = SemanticConvention( export const WorkflowSuspensionWaitCount = SemanticConvention( 'workflow.suspension.wait_count' ); + +// World/Storage attributes - Standard OTEL HTTP conventions +// See: https://opentelemetry.io/docs/specs/semconv/http/http-spans/ + +/** HTTP request method (standard OTEL: http.request.method) */ +export const HttpRequestMethod = SemanticConvention( + 'http.request.method' +); + +/** Full URL of the request (standard OTEL: url.full) */ +export const UrlFull = SemanticConvention('url.full'); + +/** Server hostname (standard OTEL: server.address) */ +export const ServerAddress = SemanticConvention('server.address'); + +/** Server port (standard OTEL: server.port) */ +export const ServerPort = SemanticConvention('server.port'); + +/** HTTP response status code (standard OTEL: http.response.status_code) */ +export const HttpResponseStatusCode = SemanticConvention( + 'http.response.status_code' +); + +/** Error type when request fails (standard OTEL: error.type) */ +export const ErrorType = SemanticConvention('error.type'); + +// World-specific custom attributes (for workflow-specific context) + +/** Format used for parsing response body (cbor or json) */ +export const WorldParseFormat = SemanticConvention<'cbor' | 'json'>( + 'workflow.world.parse.format' +); + +// Event loading attributes + +/** Number of pagination pages loaded when fetching workflow events */ +export const WorkflowEventsPagesLoaded = SemanticConvention( + 'workflow.events.pages_loaded' +); + +// Queue timing breakdown attributes (workflow-specific) + +/** Time spent deserializing the queue message in milliseconds */ +export const QueueDeserializeTimeMs = SemanticConvention( + 'workflow.queue.deserialize_time_ms' +); + +/** Time spent executing the handler logic in milliseconds */ +export const QueueExecutionTimeMs = SemanticConvention( + 'workflow.queue.execution_time_ms' +); + +/** Time spent serializing the response in milliseconds */ +export const QueueSerializeTimeMs = SemanticConvention( + 'workflow.queue.serialize_time_ms' +); diff --git a/packages/core/src/workflow.ts b/packages/core/src/workflow.ts index 5bd9fd1126..3328f3e8b4 100644 --- a/packages/core/src/workflow.ts +++ b/packages/core/src/workflow.ts @@ -36,7 +36,7 @@ export async function runWorkflow( workflowRun: WorkflowRun, events: Event[] ): Promise { - return trace(`WORKFLOW.run ${workflowRun.workflowName}`, async (span) => { + return trace(`workflow.run ${workflowRun.workflowName}`, async (span) => { span?.setAttributes({ ...Attribute.WorkflowName(workflowRun.workflowName), ...Attribute.WorkflowRunId(workflowRun.runId), diff --git a/packages/world-vercel/package.json b/packages/world-vercel/package.json index 7630be2752..3598bf7b9e 100644 --- a/packages/world-vercel/package.json +++ b/packages/world-vercel/package.json @@ -37,7 +37,16 @@ "cbor-x": "1.6.0", "zod": "catalog:" }, + "peerDependencies": { + "@opentelemetry/api": "1" + }, + "peerDependenciesMeta": { + "@opentelemetry/api": { + "optional": true + } + }, "devDependencies": { + "@opentelemetry/api": "1.9.0", "@types/node": "catalog:", "@workflow/tsconfig": "workspace:*", "genversion": "3.2.0", diff --git a/packages/world-vercel/src/instrumentObject.ts b/packages/world-vercel/src/instrumentObject.ts new file mode 100644 index 0000000000..a9813a1171 --- /dev/null +++ b/packages/world-vercel/src/instrumentObject.ts @@ -0,0 +1,26 @@ +/** + * Utility to instrument object methods with tracing. + * This is a minimal version for world-vercel to avoid circular dependencies with @workflow/core. + */ +import { trace } from './telemetry.js'; + +/** + * Wraps all methods of an object with tracing spans. + * @param prefix - Prefix for span names (e.g., "WORLD.runs") + * @param o - Object with methods to instrument + * @returns Instrumented object with same interface + */ +export function instrumentObject(prefix: string, o: T): T { + const handlers = {} as T; + for (const key of Object.keys(o) as (keyof T)[]) { + if (typeof o[key] !== 'function') { + handlers[key] = o[key]; + } else { + const f = o[key]; + // @ts-expect-error + handlers[key] = async (...args: any[]) => + trace(`${prefix}.${String(key)}`, {}, () => f(...args)); + } + } + return handlers; +} diff --git a/packages/world-vercel/src/storage.ts b/packages/world-vercel/src/storage.ts index d6880a2390..09b27bfe6e 100644 --- a/packages/world-vercel/src/storage.ts +++ b/packages/world-vercel/src/storage.ts @@ -1,12 +1,13 @@ import type { Storage } from '@workflow/world'; import { createWorkflowRunEvent, getWorkflowRunEvents } from './events.js'; import { getHook, getHookByToken, listHooks } from './hooks.js'; +import { instrumentObject } from './instrumentObject.js'; import { getWorkflowRun, listWorkflowRuns } from './runs.js'; import { getStep, listWorkflowRunSteps } from './steps.js'; import type { APIConfig } from './utils.js'; export function createStorage(config?: APIConfig): Storage { - return { + const storage: Storage = { // Storage interface with namespaced methods runs: { get: ((id: string, params?: any) => @@ -32,4 +33,13 @@ export function createStorage(config?: APIConfig): Storage { list: (params) => listHooks(params, config), }, }; + + // Instrument all storage methods with tracing + // NOTE: Span names are lowercase per OTEL semantic conventions + return { + runs: instrumentObject('world.runs', storage.runs), + steps: instrumentObject('world.steps', storage.steps), + events: instrumentObject('world.events', storage.events), + hooks: instrumentObject('world.hooks', storage.hooks), + }; } diff --git a/packages/world-vercel/src/telemetry.ts b/packages/world-vercel/src/telemetry.ts new file mode 100644 index 0000000000..a9c7333722 --- /dev/null +++ b/packages/world-vercel/src/telemetry.ts @@ -0,0 +1,119 @@ +/** + * Minimal telemetry utilities for world-vercel package. + * + * NOTE: This module intentionally duplicates semantic conventions from @workflow/core + * to avoid a circular dependency (world-vercel cannot depend on core). + * If you update conventions here, ensure @workflow/core/telemetry/semantic-conventions.ts + * remains synchronized. + * + * NOTE: Unlike the trace() function in @workflow/core, this implementation does not + * have special handling for WorkflowSuspension errors because world-vercel operates + * at the HTTP layer and never encounters workflow suspension effects. + */ +import type * as api from '@opentelemetry/api'; +import type { Span, SpanKind, SpanOptions } from '@opentelemetry/api'; + +// Lazy load OpenTelemetry API to make it optional +let otelApiPromise: Promise | null = null; + +async function getOtelApi(): Promise { + if (!otelApiPromise) { + otelApiPromise = import('@opentelemetry/api').catch(() => null); + } + return otelApiPromise; +} + +let tracerPromise: Promise | null = null; + +async function getTracer(): Promise { + if (!tracerPromise) { + tracerPromise = getOtelApi().then((otel) => + otel ? otel.trace.getTracer('workflow-world-vercel') : null + ); + } + return tracerPromise; +} + +/** + * Wrap an async function with a trace span. + * No-op if OpenTelemetry is not available. + */ +export async function trace( + spanName: string, + ...args: + | [fn: (span?: Span) => Promise] + | [opts: SpanOptions, fn: (span?: Span) => Promise] +): Promise { + const [tracer, otel] = await Promise.all([getTracer(), getOtelApi()]); + const { fn, opts } = + typeof args[0] === 'function' + ? { fn: args[0], opts: {} } + : { fn: args[1], opts: args[0] }; + if (!fn) throw new Error('Function to trace must be provided'); + + if (!tracer || !otel) { + return await fn(); + } + + return tracer.startActiveSpan(spanName, opts, async (span) => { + try { + const result = await fn(span); + span.setStatus({ code: otel.SpanStatusCode.OK }); + return result; + } catch (e) { + span.setStatus({ + code: otel.SpanStatusCode.ERROR, + message: (e as Error).message, + }); + throw e; + } finally { + span.end(); + } + }); +} + +/** + * Get SpanKind enum value by name. + * Returns undefined if OpenTelemetry is not available. + */ +export async function getSpanKind( + field: keyof typeof SpanKind +): Promise { + const otel = await getOtelApi(); + if (!otel) return undefined; + return otel.SpanKind[field]; +} + +// Semantic conventions for World/Storage tracing +// Standard OTEL conventions: https://opentelemetry.io/docs/specs/semconv/http/http-spans/ +function SemanticConvention(...names: string[]) { + return (value: T) => + Object.fromEntries(names.map((name) => [name, value] as const)); +} + +/** HTTP request method (standard OTEL: http.request.method) */ +export const HttpRequestMethod = SemanticConvention( + 'http.request.method' +); + +/** Full URL of the request (standard OTEL: url.full) */ +export const UrlFull = SemanticConvention('url.full'); + +/** Server hostname (standard OTEL: server.address) */ +export const ServerAddress = SemanticConvention('server.address'); + +/** Server port (standard OTEL: server.port) */ +export const ServerPort = SemanticConvention('server.port'); + +/** HTTP response status code (standard OTEL: http.response.status_code) */ +export const HttpResponseStatusCode = SemanticConvention( + 'http.response.status_code' +); + +/** Error type when request fails (standard OTEL: error.type) */ +export const ErrorType = SemanticConvention('error.type'); + +/** Format used for parsing response body (cbor or json) */ +export const WorldParseFormat = SemanticConvention<'cbor' | 'json'>( + 'workflow.world.parse.format' +); diff --git a/packages/world-vercel/src/utils.ts b/packages/world-vercel/src/utils.ts index c9d09e6842..61ec5bca16 100644 --- a/packages/world-vercel/src/utils.ts +++ b/packages/world-vercel/src/utils.ts @@ -5,6 +5,17 @@ import { WorkflowAPIError } from '@workflow/errors'; import { type StructuredError, StructuredErrorSchema } from '@workflow/world'; import { decode, encode } from 'cbor-x'; import type { z } from 'zod'; +import { + trace, + getSpanKind, + HttpRequestMethod, + HttpResponseStatusCode, + UrlFull, + ServerAddress, + ServerPort, + ErrorType, + WorldParseFormat, +} from './telemetry.js'; import { version } from './version.js'; /** @@ -202,69 +213,124 @@ export async function makeRequest({ /** Request body data - will be CBOR encoded */ data?: unknown; }): Promise { + const method = options.method || 'GET'; const { baseUrl, headers } = await getHttpConfig(config); - headers.set('Accept', 'application/cbor'); - // NOTE: Add a unique header to bypass RSC request memoization. - // See: https://github.com/vercel/workflow/issues/618 - headers.set('X-Request-Time', Date.now().toString()); - - // Encode body as CBOR if data is provided - let body: Buffer | undefined; - if (data !== undefined) { - headers.set('Content-Type', 'application/cbor'); - body = encode(data); - } - const url = `${baseUrl}${endpoint}`; - const request = new Request(url, { - ...options, - body, - headers, - }); - const response = await fetch(request); - - if (!response.ok) { - const errorData: { message?: string; code?: string } = - await parseResponseBody(response) - .then((r) => r.data as { message?: string; code?: string }) - .catch(() => ({})); - if (process.env.DEBUG === '1') { - const stringifiedHeaders = Array.from(headers.entries()) - .map(([key, value]: [string, string]) => `-H "${key}: ${value}"`) - .join(' '); - console.error( - `Failed to fetch, reproduce with:\ncurl -X ${request.method} ${stringifiedHeaders} "${url}"` - ); - } - throw new WorkflowAPIError( - errorData.message || - `${request.method} ${endpoint} -> HTTP ${response.status}: ${response.statusText}`, - { url, status: response.status, code: errorData.code } - ); - } - // Parse the response body (CBOR or JSON) - let parseResult: ParseResult; + // Parse server address and port from URL for OTEL attributes + let serverAddress: string | undefined; + let serverPort: number | undefined; try { - parseResult = await parseResponseBody(response); - } catch (error) { - const contentType = response.headers.get('Content-Type') || 'unknown'; - throw new WorkflowAPIError( - `Failed to parse response body for ${request.method} ${endpoint} (Content-Type: ${contentType}):\n\n${error}`, - { url, cause: error } - ); + const parsedUrl = new URL(url); + serverAddress = parsedUrl.hostname; + serverPort = parsedUrl.port + ? parseInt(parsedUrl.port, 10) + : parsedUrl.protocol === 'https:' + ? 443 + : 80; + } catch { + // URL parsing failed, skip these attributes } - // Validate against the schema - const result = schema.safeParse(parseResult.data); - if (!result.success) { - throw new WorkflowAPIError( - `Schema validation failed for ${request.method} ${endpoint}:\n\n${result.error}\n\nResponse context: ${parseResult.getDebugContext()}`, - { url, cause: result.error } - ); - } + // Standard OTEL span name for HTTP client: "{method}" + // See: https://opentelemetry.io/docs/specs/semconv/http/http-spans/#name + return trace( + `http ${method}`, + { kind: await getSpanKind('CLIENT') }, + async (span) => { + // Set standard OTEL HTTP client attributes + span?.setAttributes({ + ...HttpRequestMethod(method), + ...UrlFull(url), + ...(serverAddress && ServerAddress(serverAddress)), + ...(serverPort && ServerPort(serverPort)), + }); - return result.data; + headers.set('Accept', 'application/cbor'); + // NOTE: Add a unique header to bypass RSC request memoization. + // See: https://github.com/vercel/workflow/issues/618 + headers.set('X-Request-Time', Date.now().toString()); + + // Encode body as CBOR if data is provided + let body: Buffer | undefined; + if (data !== undefined) { + headers.set('Content-Type', 'application/cbor'); + body = encode(data); + } + + const request = new Request(url, { + ...options, + body, + headers, + }); + const response = await fetch(request); + + span?.setAttributes({ + ...HttpResponseStatusCode(response.status), + }); + + if (!response.ok) { + const errorData: { message?: string; code?: string } = + await parseResponseBody(response) + .then((r) => r.data as { message?: string; code?: string }) + .catch(() => ({})); + if (process.env.DEBUG === '1') { + const stringifiedHeaders = Array.from(headers.entries()) + .map(([key, value]: [string, string]) => `-H "${key}: ${value}"`) + .join(' '); + console.error( + `Failed to fetch, reproduce with:\ncurl -X ${request.method} ${stringifiedHeaders} "${url}"` + ); + } + const error = new WorkflowAPIError( + errorData.message || + `${request.method} ${endpoint} -> HTTP ${response.status}: ${response.statusText}`, + { url, status: response.status, code: errorData.code } + ); + // Record error attributes per OTEL conventions + span?.setAttributes({ + ...ErrorType(errorData.code || `HTTP ${response.status}`), + }); + span?.recordException?.(error); + throw error; + } + + // Parse the response body (CBOR or JSON) with tracing + let parseResult: ParseResult; + try { + parseResult = await trace('world.parse', async (parseSpan) => { + const result = await parseResponseBody(response); + // Extract format and size from debug context for attributes + const contentType = response.headers.get('Content-Type') || ''; + const isCbor = contentType.includes('application/cbor'); + parseSpan?.setAttributes({ + ...WorldParseFormat(isCbor ? 'cbor' : 'json'), + }); + return result; + }); + } catch (error) { + const contentType = response.headers.get('Content-Type') || 'unknown'; + throw new WorkflowAPIError( + `Failed to parse response body for ${request.method} ${endpoint} (Content-Type: ${contentType}):\n\n${error}`, + { url, cause: error } + ); + } + + // Validate against the schema with tracing + const result = await trace('world.validate', async () => { + const validationResult = schema.safeParse(parseResult.data); + if (!validationResult.success) { + throw new WorkflowAPIError( + `Schema validation failed for ${request.method} ${endpoint}:\n\n${validationResult.error}\n\nResponse context: ${parseResult.getDebugContext()}`, + { url, cause: validationResult.error } + ); + } + return validationResult.data; + }); + + return result; + } + ); } interface ParseResult { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 19e030a252..5c08cfee20 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1361,6 +1361,9 @@ importers: specifier: 'catalog:' version: 4.1.11 devDependencies: + '@opentelemetry/api': + specifier: 1.9.0 + version: 1.9.0 '@types/node': specifier: 'catalog:' version: 22.19.0