diff --git a/.changeset/add-queue-headers-support.md b/.changeset/add-queue-headers-support.md new file mode 100644 index 0000000000..ac6aaffe0d --- /dev/null +++ b/.changeset/add-queue-headers-support.md @@ -0,0 +1,8 @@ +--- +"@workflow/world": patch +"@workflow/world-vercel": patch +"@workflow/world-local": patch +"@workflow/core": patch +--- + +Add support for custom headers in queue messages diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts index f170c1bc57..38a6652b8a 100644 --- a/packages/core/src/runtime/step-handler.ts +++ b/packages/core/src/runtime/step-handler.ts @@ -168,11 +168,18 @@ const stepHandler = getWorldHandlers().createQueueHandler( }); // Re-invoke the workflow to handle the failed step - await queueMessage(world, `__wkf_workflow_${workflowName}`, { - runId: workflowRunId, - traceCarrier: await serializeTraceCarrier(), - requestedAt: new Date(), - }); + await queueMessage( + world, + `__wkf_workflow_${workflowName}`, + { + runId: workflowRunId, + traceCarrier: await serializeTraceCarrier(), + requestedAt: new Date(), + }, + { + headers: { 'x-workflow-run-id': workflowRunId }, + } + ); return; } @@ -208,11 +215,18 @@ const stepHandler = getWorldHandlers().createQueueHandler( 'cancelled', ].includes(step.status); if (isTerminalStep) { - await queueMessage(world, `__wkf_workflow_${workflowName}`, { - runId: workflowRunId, - traceCarrier: await serializeTraceCarrier(), - requestedAt: new Date(), - }); + await queueMessage( + world, + `__wkf_workflow_${workflowName}`, + { + runId: workflowRunId, + traceCarrier: await serializeTraceCarrier(), + requestedAt: new Date(), + }, + { + headers: { 'x-workflow-run-id': workflowRunId }, + } + ); } return; } @@ -472,11 +486,18 @@ const stepHandler = getWorldHandlers().createQueueHandler( } } - await queueMessage(world, `__wkf_workflow_${workflowName}`, { - runId: workflowRunId, - traceCarrier: await serializeTraceCarrier(), - requestedAt: new Date(), - }); + await queueMessage( + world, + `__wkf_workflow_${workflowName}`, + { + runId: workflowRunId, + traceCarrier: await serializeTraceCarrier(), + requestedAt: new Date(), + }, + { + headers: { 'x-workflow-run-id': workflowRunId }, + } + ); } ); }); diff --git a/packages/core/src/runtime/suspension-handler.ts b/packages/core/src/runtime/suspension-handler.ts index c1d11827a5..da83f02a08 100644 --- a/packages/core/src/runtime/suspension-handler.ts +++ b/packages/core/src/runtime/suspension-handler.ts @@ -182,6 +182,7 @@ export async function handleSuspension({ }, { idempotencyKey: queueItem.correlationId, + headers: { 'x-workflow-run-id': runId }, } ); })() diff --git a/packages/world-local/src/queue.ts b/packages/world-local/src/queue.ts index 75d2886b96..03d70c6c14 100644 --- a/packages/world-local/src/queue.ts +++ b/packages/world-local/src/queue.ts @@ -100,6 +100,7 @@ export function createQueue(config: Partial): Queue { duplex: 'half', dispatcher: httpAgent, headers: { + ...opts?.headers, 'content-type': 'application/json', 'x-vqs-queue-name': queueName, 'x-vqs-message-id': messageId, diff --git a/packages/world-postgres/src/queue.ts b/packages/world-postgres/src/queue.ts index eba3bc978c..63c2f357a2 100644 --- a/packages/world-postgres/src/queue.ts +++ b/packages/world-postgres/src/queue.ts @@ -106,6 +106,9 @@ export function createQueue( ); const message = QueuePayloadSchema.parse(body); const queueName = `${queue}${messageData.id}` as const; + // TODO: Custom headers from opts.headers are not propagated into MessageData. + // To support this, MessageData schema would need to include a headers field + // and the headers would need to be stored/retrieved from pg-boss job data. await localWorld.queue(queueName, message, { idempotencyKey: messageData.idempotencyKey, }); diff --git a/packages/world-vercel/src/queue.ts b/packages/world-vercel/src/queue.ts index 164ebb70e6..4d7b26efe7 100644 --- a/packages/world-vercel/src/queue.ts +++ b/packages/world-vercel/src/queue.ts @@ -114,6 +114,7 @@ export function createQueue(config?: APIConfig): Queue { { idempotencyKey: opts?.idempotencyKey, delaySeconds: opts?.delaySeconds, + headers: opts?.headers, } ); return { messageId: MessageId.parse(messageId) }; diff --git a/packages/world/src/queue.ts b/packages/world/src/queue.ts index 16eea4b79a..743684f121 100644 --- a/packages/world/src/queue.ts +++ b/packages/world/src/queue.ts @@ -59,6 +59,7 @@ export type QueuePayload = z.infer; export interface QueueOptions { deploymentId?: string; idempotencyKey?: string; + headers?: Record; } export interface Queue { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 5c08cfee20..65ca441c04 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -22,8 +22,8 @@ catalogs: specifier: 3.0.5 version: 3.0.5 '@vercel/queue': - specifier: 0.0.0-alpha.34 - version: 0.0.0-alpha.34 + specifier: 0.0.0-alpha.36 + version: 0.0.0-alpha.36 '@vitest/coverage-v8': specifier: ^3.2.4 version: 3.2.4 @@ -1203,7 +1203,7 @@ importers: dependencies: '@vercel/queue': specifier: 'catalog:' - version: 0.0.0-alpha.34 + version: 0.0.0-alpha.36 '@workflow/errors': specifier: workspace:* version: link:../errors @@ -1249,7 +1249,7 @@ importers: dependencies: '@vercel/queue': specifier: 'catalog:' - version: 0.0.0-alpha.34 + version: 0.0.0-alpha.36 '@workflow/errors': specifier: workspace:* version: link:../errors @@ -1347,7 +1347,7 @@ importers: version: 3.0.5 '@vercel/queue': specifier: 'catalog:' - version: 0.0.0-alpha.34 + version: 0.0.0-alpha.36 '@workflow/errors': specifier: workspace:* version: link:../errors @@ -7971,8 +7971,8 @@ packages: '@opentelemetry/sdk-metrics': '>=1.19.0 <2.0.0' '@opentelemetry/sdk-trace-base': '>=1.19.0 <2.0.0' - '@vercel/queue@0.0.0-alpha.34': - resolution: {integrity: sha512-xy5MNbsAoN9W1gtjNkKEg8SHEsnoEj3KbQQH7EaAtqJ0ZfdPo13XLOdqvAR5IO+4X5F0nyPENMVFilzzaSAYiA==} + '@vercel/queue@0.0.0-alpha.36': + resolution: {integrity: sha512-+0RWV/ljyK0lXH7LYUbTJ02UJLhPfZIvzMOjhMdD6tEm8o+VzJGJY9KwIljohtdfeep78cFUGuWvNmT+bi29Wg==} engines: {node: '>=20.0.0'} '@vercel/routing-utils@5.3.0': @@ -22528,7 +22528,7 @@ snapshots: '@opentelemetry/sdk-metrics': 1.30.1(@opentelemetry/api@1.9.0) '@opentelemetry/sdk-trace-base': 1.30.1(@opentelemetry/api@1.9.0) - '@vercel/queue@0.0.0-alpha.34': + '@vercel/queue@0.0.0-alpha.36': dependencies: '@vercel/oidc': 3.0.5 mixpart: 0.0.5-alpha.1 diff --git a/pnpm-workspace.yaml b/pnpm-workspace.yaml index a74a99f483..489dd53dc1 100644 --- a/pnpm-workspace.yaml +++ b/pnpm-workspace.yaml @@ -10,7 +10,7 @@ catalog: "@types/node": 22.19.0 "@vercel/functions": ^3.1.4 "@vercel/oidc": 3.0.5 - "@vercel/queue": 0.0.0-alpha.34 + "@vercel/queue": 0.0.0-alpha.36 "@vitest/coverage-v8": ^3.2.4 ai: 5.0.104 esbuild: ^0.25.11