Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/add-queue-headers-support.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@workflow/world": patch
"@workflow/world-vercel": patch
"@workflow/world-local": patch
"@workflow/core": patch
---

Add support for custom headers in queue messages
51 changes: 36 additions & 15 deletions packages/core/src/runtime/step-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,18 @@ const stepHandler = getWorldHandlers().createQueueHandler(
});

// Re-invoke the workflow to handle the failed step
await queueMessage(world, `__wkf_workflow_${workflowName}`, {
runId: workflowRunId,
traceCarrier: await serializeTraceCarrier(),
requestedAt: new Date(),
});
await queueMessage(
world,
`__wkf_workflow_${workflowName}`,
{
runId: workflowRunId,
traceCarrier: await serializeTraceCarrier(),
requestedAt: new Date(),
},
{
headers: { 'x-workflow-run-id': workflowRunId },
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

including this so we can log it on the queue server and eventually in sfr/proxy for correlation back to logs

}
);
return;
}

Expand Down Expand Up @@ -208,11 +215,18 @@ const stepHandler = getWorldHandlers().createQueueHandler(
'cancelled',
].includes(step.status);
if (isTerminalStep) {
await queueMessage(world, `__wkf_workflow_${workflowName}`, {
runId: workflowRunId,
traceCarrier: await serializeTraceCarrier(),
requestedAt: new Date(),
});
await queueMessage(
world,
`__wkf_workflow_${workflowName}`,
{
runId: workflowRunId,
traceCarrier: await serializeTraceCarrier(),
requestedAt: new Date(),
},
{
headers: { 'x-workflow-run-id': workflowRunId },
}
);
}
return;
}
Expand Down Expand Up @@ -472,11 +486,18 @@ const stepHandler = getWorldHandlers().createQueueHandler(
}
}

await queueMessage(world, `__wkf_workflow_${workflowName}`, {
runId: workflowRunId,
traceCarrier: await serializeTraceCarrier(),
requestedAt: new Date(),
});
await queueMessage(
world,
`__wkf_workflow_${workflowName}`,
{
runId: workflowRunId,
traceCarrier: await serializeTraceCarrier(),
requestedAt: new Date(),
},
{
headers: { 'x-workflow-run-id': workflowRunId },
}
);
}
);
});
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/runtime/suspension-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ export async function handleSuspension({
},
{
idempotencyKey: queueItem.correlationId,
headers: { 'x-workflow-run-id': runId },
}
);
})()
Expand Down
1 change: 1 addition & 0 deletions packages/world-local/src/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ export function createQueue(config: Partial<Config>): Queue {
duplex: 'half',
dispatcher: httpAgent,
headers: {
...opts?.headers,
'content-type': 'application/json',
'x-vqs-queue-name': queueName,
'x-vqs-message-id': messageId,
Expand Down
3 changes: 3 additions & 0 deletions packages/world-postgres/src/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ export function createQueue(
);
const message = QueuePayloadSchema.parse(body);
const queueName = `${queue}${messageData.id}` as const;
// TODO: Custom headers from opts.headers are not propagated into MessageData.
// To support this, MessageData schema would need to include a headers field
// and the headers would need to be stored/retrieved from pg-boss job data.
await localWorld.queue(queueName, message, {
idempotencyKey: messageData.idempotencyKey,
});
Expand Down
1 change: 1 addition & 0 deletions packages/world-vercel/src/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ export function createQueue(config?: APIConfig): Queue {
{
idempotencyKey: opts?.idempotencyKey,
delaySeconds: opts?.delaySeconds,
headers: opts?.headers,
}
);
return { messageId: MessageId.parse(messageId) };
Expand Down
1 change: 1 addition & 0 deletions packages/world/src/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ export type QueuePayload = z.infer<typeof QueuePayloadSchema>;
export interface QueueOptions {
deploymentId?: string;
idempotencyKey?: string;
headers?: Record<string, string>;
}

export interface Queue {
Expand Down
16 changes: 8 additions & 8 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pnpm-workspace.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ catalog:
"@types/node": 22.19.0
"@vercel/functions": ^3.1.4
"@vercel/oidc": 3.0.5
"@vercel/queue": 0.0.0-alpha.34
"@vercel/queue": 0.0.0-alpha.36
"@vitest/coverage-v8": ^3.2.4
ai: 5.0.104
esbuild: ^0.25.11
Expand Down
Loading