Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/core-otel-improvements.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@workflow/core": patch
---

Add OTEL tracing for event loading and queue timing breakdown using standard OTEL semantic conventions
5 changes: 5 additions & 0 deletions .changeset/world-vercel-otel-improvements.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@workflow/world-vercel": patch
---

Add OTEL tracing for HTTP requests and storage operations using standard OTEL semantic conventions
7 changes: 5 additions & 2 deletions packages/core/src/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,11 @@ export function workflowEntrypoint(
span?.setAttributes({
...Attribute.WorkflowName(workflowName),
...Attribute.WorkflowOperation('execute'),
...Attribute.QueueName(metadata.queueName),
...Attribute.QueueMessageId(metadata.messageId),
// Standard OTEL messaging conventions
...Attribute.MessagingSystem('vercel-queue'),
...Attribute.MessagingDestinationName(metadata.queueName),
...Attribute.MessagingMessageId(metadata.messageId),
...Attribute.MessagingOperationType('process'),
...getQueueOverhead({ requestedAt }),
});

Expand Down
62 changes: 40 additions & 22 deletions packages/core/src/runtime/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -249,29 +249,42 @@ export async function healthCheck(
* Events must be in chronological order (ascending) for proper workflow replay.
*/
export async function getAllWorkflowRunEvents(runId: string): Promise<Event[]> {
const allEvents: Event[] = [];
let cursor: string | null = null;
let hasMore = true;

const world = getWorld();
while (hasMore) {
// TODO: we're currently loading all the data with resolveRef behaviour. We need to update this
// to lazyload the data from the world instead so that we can optimize and make the event log loading
// much faster and memory efficient
const response = await world.events.list({
runId,
pagination: {
sortOrder: 'asc', // Required: events must be in chronological order for replay
cursor: cursor ?? undefined,
},
return trace('workflow.loadEvents', async (span) => {
span?.setAttributes({
...Attribute.WorkflowRunId(runId),
});

allEvents.push(...response.data);
hasMore = response.hasMore;
cursor = response.cursor;
}
const allEvents: Event[] = [];
let cursor: string | null = null;
let hasMore = true;
let pagesLoaded = 0;

const world = getWorld();
while (hasMore) {
// TODO: we're currently loading all the data with resolveRef behaviour. We need to update this
// to lazyload the data from the world instead so that we can optimize and make the event log loading
// much faster and memory efficient
const response = await world.events.list({
runId,
pagination: {
sortOrder: 'asc', // Required: events must be in chronological order for replay
cursor: cursor ?? undefined,
},
});

allEvents.push(...response.data);
hasMore = response.hasMore;
cursor = response.cursor;
pagesLoaded++;
}

return allEvents;
span?.setAttributes({
...Attribute.WorkflowEventsCount(allEvents.length),
...Attribute.WorkflowEventsPagesLoaded(pagesLoaded),
});

return allEvents;
});
}

/**
Expand Down Expand Up @@ -328,12 +341,17 @@ export async function queueMessage(
await trace(
'queueMessage',
{
attributes: Attribute.QueueName(queueName),
// Standard OTEL messaging conventions
attributes: {
...Attribute.MessagingSystem('vercel-queue'),
...Attribute.MessagingDestinationName(queueName),
...Attribute.MessagingOperationType('publish'),
},
kind: await getSpanKind('PRODUCER'),
},
async (span) => {
const { messageId } = await world.queue(...args);
span?.setAttributes(Attribute.QueueMessageId(messageId));
span?.setAttributes(Attribute.MessagingMessageId(messageId));
}
);
}
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/runtime/resume-hook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ export async function resumeHook<T = any>(
payload: T
): Promise<Hook> {
return await waitedUntil(() => {
return trace('HOOK.resume', async (span) => {
return trace('hook.resume', async (span) => {
const world = getWorld();

try {
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/runtime/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ export async function start<TArgs extends unknown[], TResult>(
);
}

return trace(`WORKFLOW.start ${workflowName}`, async (span) => {
return trace(`workflow.start ${workflowName}`, async (span) => {
span?.setAttributes({
...Attribute.WorkflowName(workflowName),
...Attribute.WorkflowOperation('start'),
Expand Down
30 changes: 27 additions & 3 deletions packages/core/src/runtime/step-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,17 @@ const stepHandler = getWorldHandlers().createQueueHandler(
const port = await getPort();

return trace(
`STEP ${stepName}`,
`step ${stepName}`,
{ kind: await getSpanKind('CONSUMER'), links: spanLinks },
async (span) => {
span?.setAttributes({
...Attribute.StepName(stepName),
...Attribute.StepAttempt(metadata.attempt),
...Attribute.QueueName(metadata.queueName),
...Attribute.QueueMessageId(metadata.messageId),
// Standard OTEL messaging conventions
...Attribute.MessagingSystem('vercel-queue'),
...Attribute.MessagingDestinationName(metadata.queueName),
...Attribute.MessagingMessageId(metadata.messageId),
...Attribute.MessagingOperationType('process'),
...getQueueOverhead({ requestedAt }),
});

Expand Down Expand Up @@ -232,20 +235,29 @@ const stepHandler = getWorldHandlers().createQueueHandler(
);
}
// Hydrate the step input arguments, closure variables, and thisVal
// Track deserialization time for observability
// NOTE: This captures only the synchronous portion of hydration. Any async
// operations (e.g., stream loading) are added to `ops` and executed later
// via Promise.all(ops) - their timing is not included in this measurement.
const deserializeStartTime = Date.now();
const ops: Promise<void>[] = [];
const hydratedInput = hydrateStepArguments(
step.input,
ops,
workflowRunId
);
const deserializeTimeMs = Date.now() - deserializeStartTime;
Comment on lines +242 to +249
Copy link

Copilot AI Feb 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The deserializeTimeMs measurement (line 243) captures only the synchronous portion of hydrateStepArguments. However, hydrateStepArguments may initiate async operations that are added to the 'ops' array (line 237), and these operations are executed later via Promise.all(ops) at line 294. This means the deserialize timing doesn't fully capture all deserialization work. Consider documenting this limitation or restructuring to measure the complete deserialization time including async operations.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment documenting this limitation. The timing intentionally captures only the synchronous hydration; async operations such as stream loading run in the background, and their time is included in the overall step execution time, which is tracked separately.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we also need explicit, separate spans for serialization, deserialization, etc., rather than just recording these timings as attributes.


const args = hydratedInput.args;
const thisVal = hydratedInput.thisVal ?? null;

span?.setAttributes({
...Attribute.StepArgumentsCount(args.length),
...Attribute.QueueDeserializeTimeMs(deserializeTimeMs),
});

// Track execution time for observability
const executionStartTime = Date.now();
result = await contextStorage.run(
{
stepMetadata: {
Expand All @@ -267,11 +279,23 @@ const stepHandler = getWorldHandlers().createQueueHandler(
},
() => stepFn.apply(thisVal, args)
);
const executionTimeMs = Date.now() - executionStartTime;

span?.setAttributes({
...Attribute.QueueExecutionTimeMs(executionTimeMs),
});

// NOTE: None of the code from this point is guaranteed to run
// Since the step might fail or cause a function timeout and the process might be SIGKILL'd
// The workflow runtime must be resilient to the below code not executing on a failed step
// Track serialization time for observability
const serializeStartTime = Date.now();
result = dehydrateStepReturnValue(result, ops, workflowRunId);
const serializeTimeMs = Date.now() - serializeStartTime;

span?.setAttributes({
...Attribute.QueueSerializeTimeMs(serializeTimeMs),
});

waitUntil(
Promise.all(ops).catch((err) => {
Expand Down
6 changes: 6 additions & 0 deletions packages/core/src/telemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ export async function getActiveSpan() {
return await withOtel((otel) => otel.trace.getActiveSpan());
}

/**
* Wraps all methods of an object with tracing spans.
* @param prefix - Prefix for span names (e.g., "WORLD.runs")
* @param o - Object with methods to instrument
* @returns Instrumented object with same interface
*/
export function instrumentObject<T extends object>(prefix: string, o: T): T {
const handlers = {} as T;
for (const key of Object.keys(o) as (keyof T)[]) {
Expand Down
86 changes: 77 additions & 9 deletions packages/core/src/telemetry/semantic-conventions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,19 +179,31 @@ export const StepRetryWillRetry = SemanticConvention<boolean>(
'step.retry.will_retry'
);

// Queue attributes
// Queue/Messaging attributes - Standard OTEL messaging conventions
// See: https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/

/** Name of the queue being used for message processing */
export const QueueName = SemanticConvention<string>('queue.name');
/** Messaging system identifier (standard OTEL: messaging.system) */
export const MessagingSystem = SemanticConvention<string>('messaging.system');

/** The message id being handled */
export const QueueMessageId = SemanticConvention<MessageId>(
'messaging.message.id',
'queue.message.id'
/** Destination name/queue name (standard OTEL: messaging.destination.name) */
export const MessagingDestinationName = SemanticConvention<string>(
'messaging.destination.name'
);

/** Time taken to enqueue the message in milliseconds */
export const QueueOverheadMs = SemanticConvention<number>('queue.overhead_ms');
/** The message id being handled (standard OTEL: messaging.message.id) */
export const MessagingMessageId = SemanticConvention<MessageId>(
'messaging.message.id'
);

/** Operation type (standard OTEL: messaging.operation.type) */
export const MessagingOperationType = SemanticConvention<
'publish' | 'receive' | 'process'
>('messaging.operation.type');

/** Time taken to enqueue the message in milliseconds (workflow-specific) */
export const QueueOverheadMs = SemanticConvention<number>(
'workflow.queue.overhead_ms'
);

// Deployment attributes

Expand Down Expand Up @@ -230,3 +242,59 @@ export const WorkflowSuspensionStepCount = SemanticConvention<number>(
export const WorkflowSuspensionWaitCount = SemanticConvention<number>(
'workflow.suspension.wait_count'
);

// World/Storage attributes - Standard OTEL HTTP conventions
// See: https://opentelemetry.io/docs/specs/semconv/http/http-spans/
//
// Each constant below only binds a typed value to an attribute key; the code
// that actually records these attributes on spans is not part of this file.

/** HTTP request method (standard OTEL: http.request.method) */
export const HttpRequestMethod = SemanticConvention<string>(
'http.request.method'
);

/**
 * Full URL of the request (standard OTEL: url.full)
 *
 * NOTE(review): a full URL can include a query string - confirm that callers
 * never record URLs carrying tokens or other secrets.
 */
export const UrlFull = SemanticConvention<string>('url.full');

/** Server hostname (standard OTEL: server.address) */
export const ServerAddress = SemanticConvention<string>('server.address');

/** Server port as a number (standard OTEL: server.port) */
export const ServerPort = SemanticConvention<number>('server.port');

/** Numeric HTTP response status code (standard OTEL: http.response.status_code) */
export const HttpResponseStatusCode = SemanticConvention<number>(
'http.response.status_code'
);

/** Error type when request fails (standard OTEL: error.type) */
export const ErrorType = SemanticConvention<string>('error.type');

// World-specific custom attributes (for workflow-specific context)
// These keys live under the `workflow.` namespace rather than a standard OTEL one.

/** Format used for parsing response body; the value is restricted to 'cbor' or 'json' */
export const WorldParseFormat = SemanticConvention<'cbor' | 'json'>(
'workflow.world.parse.format'
);

// Event loading attributes

/**
 * Number of pagination pages loaded when fetching workflow events.
 * `getAllWorkflowRunEvents` increments its counter once per `world.events.list`
 * call, so a run whose events fit in a single response reports 1.
 */
export const WorkflowEventsPagesLoaded = SemanticConvention<number>(
'workflow.events.pages_loaded'
);

// Queue timing breakdown attributes (workflow-specific)
// All three are wall-clock durations computed from `Date.now()` differences
// in the step handler and recorded on the step span.

/**
 * Time spent hydrating the step input (arguments, closure variables, thisVal)
 * in milliseconds. Covers only the synchronous part of hydration: async work
 * such as stream loading is pushed onto the handler's `ops` list and awaited
 * later, so it is not included in this value.
 */
export const QueueDeserializeTimeMs = SemanticConvention<number>(
'workflow.queue.deserialize_time_ms'
);

/** Time spent awaiting the step function itself (the user handler logic) in milliseconds */
export const QueueExecutionTimeMs = SemanticConvention<number>(
'workflow.queue.execution_time_ms'
);

/**
 * Time spent dehydrating (serializing) the step return value in milliseconds.
 * This is recorded after the step function returns, in a region the step
 * handler marks as not guaranteed to run, so the attribute may be absent.
 */
export const QueueSerializeTimeMs = SemanticConvention<number>(
'workflow.queue.serialize_time_ms'
);
2 changes: 1 addition & 1 deletion packages/core/src/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export async function runWorkflow(
workflowRun: WorkflowRun,
events: Event[]
): Promise<Uint8Array | unknown> {
return trace(`WORKFLOW.run ${workflowRun.workflowName}`, async (span) => {
return trace(`workflow.run ${workflowRun.workflowName}`, async (span) => {
span?.setAttributes({
...Attribute.WorkflowName(workflowRun.workflowName),
...Attribute.WorkflowRunId(workflowRun.runId),
Expand Down
9 changes: 9 additions & 0 deletions packages/world-vercel/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,16 @@
"cbor-x": "1.6.0",
"zod": "catalog:"
},
"peerDependencies": {
"@opentelemetry/api": "1"
},
"peerDependenciesMeta": {
"@opentelemetry/api": {
"optional": true
}
},
"devDependencies": {
"@opentelemetry/api": "1.9.0",
"@types/node": "catalog:",
"@workflow/tsconfig": "workspace:*",
"genversion": "3.2.0",
Expand Down
26 changes: 26 additions & 0 deletions packages/world-vercel/src/instrumentObject.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/**
* Utility to instrument object methods with tracing.
* This is a minimal version for world-vercel to avoid circular dependencies with @workflow/core.
*/
import { trace } from './telemetry.js';

/**
 * Wraps every function-valued own enumerable property of an object in a
 * tracing span.
 *
 * Non-function properties are copied through unchanged. Each function
 * property is replaced by an async wrapper that runs the original inside a
 * span named `${prefix}.${key}`, so instrumented methods always return a
 * promise.
 *
 * @param prefix - Prefix for span names (e.g., "world.runs")
 * @param o - Object with methods to instrument
 * @returns A new object exposing the same keys as `o`
 */
export function instrumentObject<T extends object>(prefix: string, o: T): T {
  const handlers = {} as T;
  for (const key of Object.keys(o) as (keyof T)[]) {
    if (typeof o[key] !== 'function') {
      handlers[key] = o[key];
    } else {
      // Call the original with `o` as its receiver: invoking the extracted
      // function bare would leave `this` undefined and break any method that
      // reads its own object's state.
      const f = o[key];
      // @ts-expect-error
      handlers[key] = async (...args: any[]) =>
        trace(`${prefix}.${String(key)}`, {}, () => f.apply(o, args));
    }
  }
  return handlers;
}
12 changes: 11 additions & 1 deletion packages/world-vercel/src/storage.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import type { Storage } from '@workflow/world';
import { createWorkflowRunEvent, getWorkflowRunEvents } from './events.js';
import { getHook, getHookByToken, listHooks } from './hooks.js';
import { instrumentObject } from './instrumentObject.js';
import { getWorkflowRun, listWorkflowRuns } from './runs.js';
import { getStep, listWorkflowRunSteps } from './steps.js';
import type { APIConfig } from './utils.js';

export function createStorage(config?: APIConfig): Storage {
return {
const storage: Storage = {
// Storage interface with namespaced methods
runs: {
get: ((id: string, params?: any) =>
Expand All @@ -32,4 +33,13 @@ export function createStorage(config?: APIConfig): Storage {
list: (params) => listHooks(params, config),
},
};

// Instrument all storage methods with tracing
// NOTE: Span names are lowercase per OTEL semantic conventions
return {
runs: instrumentObject('world.runs', storage.runs),
steps: instrumentObject('world.steps', storage.steps),
events: instrumentObject('world.events', storage.events),
hooks: instrumentObject('world.hooks', storage.hooks),
};
}
Loading
Loading