From a51ad2198c51b9141d36cf532d033f5101e1dc9f Mon Sep 17 00:00:00 2001 From: tgarg Date: Mon, 23 Mar 2026 11:06:14 -0400 Subject: [PATCH 1/4] [wrangler] Validate queue consumer type in wrangler config --- .changeset/validate-queue-consumer-type.md | 10 ++ .../workers-utils/src/config/environment.ts | 4 +- .../workers-utils/src/config/validation.ts | 10 ++ packages/workers-utils/src/types.ts | 2 +- .../normalize-and-validate-config.test.ts | 41 +++++++ .../wrangler/src/__tests__/deploy/helpers.ts | 26 ----- .../src/__tests__/deploy/queues.test.ts | 108 +----------------- .../startDevWorker/LocalRuntimeController.ts | 13 ++- packages/wrangler/src/deploy/deploy.ts | 107 ++++++----------- 9 files changed, 111 insertions(+), 210 deletions(-) create mode 100644 .changeset/validate-queue-consumer-type.md diff --git a/.changeset/validate-queue-consumer-type.md b/.changeset/validate-queue-consumer-type.md new file mode 100644 index 0000000000..95c41d91df --- /dev/null +++ b/.changeset/validate-queue-consumer-type.md @@ -0,0 +1,10 @@ +--- +"wrangler": patch +"@cloudflare/workers-utils": patch +--- + +Validate that queue consumers in wrangler config only use the "worker" type + +Previously, non-worker consumer types (e.g. `http_pull`) could be specified in the `queues.consumers` config. Now, wrangler will error if a consumer `type` other than `"worker"` is specified in the config file. + +To configure non-worker consumer types, use the `wrangler queues consumer` CLI commands instead (e.g. `wrangler queues consumer http-pull add`). diff --git a/packages/workers-utils/src/config/environment.ts b/packages/workers-utils/src/config/environment.ts index 4836d7e9a6..b9a0cf70a0 100644 --- a/packages/workers-utils/src/config/environment.ts +++ b/packages/workers-utils/src/config/environment.ts @@ -863,8 +863,8 @@ export interface EnvironmentNonInheritable { /** The name of the queue from which this consumer should consume. */ queue: string; - /** The consumer type, e.g., worker, http-pull, r2-bucket, etc. Default is worker. */ - type?: string; + /** The consumer type. Only "worker" is supported in wrangler config. Default is "worker". */ + type?: "worker"; /** The maximum number of messages per batch */ max_batch_size?: number; diff --git a/packages/workers-utils/src/config/validation.ts b/packages/workers-utils/src/config/validation.ts index 19997250de..e60ca0de7c 100644 --- a/packages/workers-utils/src/config/validation.ts +++ b/packages/workers-utils/src/config/validation.ts @@ -4425,6 +4425,16 @@ const validateConsumer: ValidatorFn = (diagnostics, field, value, _config) => { ); } + // Validate that consumer type, if specified, is "worker". + // Non-worker consumer types (e.g., "http_pull") cannot be configured via + // wrangler config. Use `wrangler queues consumer http add` instead. + if ("type" in value && value.type !== undefined && value.type !== "worker") { + diagnostics.errors.push( + `"${field}.type" has an invalid value "${value.type}". Only "worker" consumers can be configured in your Wrangler configuration.` + ); + isValid = false; + } + const options: { key: string; type: "number" | "string" | "boolean"; diff --git a/packages/workers-utils/src/types.ts b/packages/workers-utils/src/types.ts index ec96a43bad..c36a57384f 100644 --- a/packages/workers-utils/src/types.ts +++ b/packages/workers-utils/src/types.ts @@ -276,7 +276,7 @@ export type Trigger = | ({ type: "route" } & ZoneNameRoute) | ({ type: "route" } & CustomDomainRoute) | { type: "cron"; cron: string } - | ({ type: "queue-consumer" } & QueueConsumer); + | ({ type: "queue-consumer" } & Omit); type BindingOmit = Omit; type NameOmit = Omit; diff --git a/packages/workers-utils/tests/config/validation/normalize-and-validate-config.test.ts b/packages/workers-utils/tests/config/validation/normalize-and-validate-config.test.ts index d3d6375953..33042d2600 100644 --- a/packages/workers-utils/tests/config/validation/normalize-and-validate-config.test.ts +++ b/packages/workers-utils/tests/config/validation/normalize-and-validate-config.test.ts @@ -3432,6 +3432,47 @@ describe("normalizeAndValidateConfig()", () => { `); }); + it("should error if queues consumer type is not 'worker'", ({ + expect, + }) => { + const { diagnostics } = normalizeAndValidateConfig( + { + queues: { + consumers: [ + { queue: "myQueue", type: "http_pull" }, + { queue: "myQueue2", type: "r2_bucket" }, + ], + }, + } as unknown as RawConfig, + undefined, + undefined, + { env: undefined } + ); + + expect(diagnostics.renderErrors()).toMatchInlineSnapshot(` + "Processing wrangler configuration: + - "queues.consumers[0].type" has an invalid value "http_pull". Only "worker" consumers can be configured in your Wrangler configuration. + - "queues.consumers[1].type" has an invalid value "r2_bucket". Only "worker" consumers can be configured in your Wrangler configuration." + `); + }); + + it("should allow queues consumer type 'worker' explicitly", ({ + expect, + }) => { + const { diagnostics } = normalizeAndValidateConfig( + { + queues: { + consumers: [{ queue: "myQueue", type: "worker" }], + }, + } as unknown as RawConfig, + undefined, + undefined, + { env: undefined } + ); + + expect(diagnostics.hasErrors()).toBe(false); + }); + it("should error if queues consumers are not valid", ({ expect }) => { const { config, diagnostics } = normalizeAndValidateConfig( { diff --git a/packages/wrangler/src/__tests__/deploy/helpers.ts b/packages/wrangler/src/__tests__/deploy/helpers.ts index 391db51c4b..40eaaec572 100644 --- a/packages/wrangler/src/__tests__/deploy/helpers.ts +++ b/packages/wrangler/src/__tests__/deploy/helpers.ts @@ -657,32 +657,6 @@ export function mockPostConsumerById( return requests; } -export function mockPostQueueHTTPConsumer( - expectedQueueId: string, - expectedBody: PostTypedConsumerBody -) { - const requests = { count: 0 }; - msw.use( - http.post( - `*/accounts/:accountId/queues/:queueId/consumers`, - async ({ request, params }) => { - const body = await request.json(); - expect(params.queueId).toEqual(expectedQueueId); - expect(params.accountId).toEqual("some-account-id"); - expect(body).toEqual(expectedBody); - requests.count += 1; - return HttpResponse.json({ - success: true, - errors: [], - messages: [], - result: {}, - }); - } - ) - ); - return requests; -} - export const mockAUSRequest = async ( bodies?: AssetManifest[], buckets: string[][] = [[]], diff --git a/packages/wrangler/src/__tests__/deploy/queues.test.ts b/packages/wrangler/src/__tests__/deploy/queues.test.ts index cb4f030b5d..1f5c7e35a6 100644 --- a/packages/wrangler/src/__tests__/deploy/queues.test.ts +++ b/packages/wrangler/src/__tests__/deploy/queues.test.ts @@ -23,7 +23,6 @@ import { mockLastDeploymentRequest, mockPatchScriptSettings, mockPostConsumerById, - mockPostQueueHTTPConsumer, mockPutQueueConsumerById, } from "./helpers"; import type { QueueResponse } from "../../queues/client"; @@ -433,62 +432,7 @@ describe("deploy", () => { `); }); - it("should post queue http consumers on deploy", async ({ expect }) => { - writeWranglerConfig({ - queues: { - consumers: [ - { - queue: queueName, - type: "http_pull", - dead_letter_queue: "myDLQ", - max_batch_size: 5, - visibility_timeout_ms: 4000, - max_retries: 10, - retry_delay: 1, - }, - ], - }, - }); - await fs.promises.writeFile("index.js", `export default {};`); - mockSubDomainRequest(); - mockUploadWorkerRequest(); - const existingQueue: QueueResponse = { - queue_id: queueId, - queue_name: queueName, - created_on: "", - producers: [], - consumers: [], - producers_total_count: 0, - consumers_total_count: 0, - modified_on: "", - }; - mockGetQueueByName(queueName, existingQueue); - mockPostQueueHTTPConsumer(queueId, { - type: "http_pull", - dead_letter_queue: "myDLQ", - settings: { - batch_size: 5, - max_retries: 10, - visibility_timeout_ms: 4000, - retry_delay: 1, - }, - }); - await runWrangler("deploy index.js"); - expect(std.out).toMatchInlineSnapshot(` - " - ⛅️ wrangler x.x.x - ────────────────── - Total Upload: xx KiB / gzip: xx KiB - Worker Startup Time: 100 ms - Uploaded test-name (TIMINGS) - Deployed test-name triggers (TIMINGS) - https://test-name.test-sub-domain.workers.dev - Consumer for queue1 - Current Version ID: Galaxy-Class" - `); - }); - - it("should update queue http consumers when one already exists for queue", async ({ + it("should reject http_pull consumer type in config", async ({ expect, }) => { writeWranglerConfig({ @@ -502,55 +446,9 @@ describe("deploy", () => { }, }); await fs.promises.writeFile("index.js", `export default {};`); - mockSubDomainRequest(); - mockUploadWorkerRequest(); - const existingQueue: QueueResponse = { - queue_id: queueId, - queue_name: queueName, - created_on: "", - producers: [], - consumers: [ - { - type: "http_pull", - consumer_id: "queue1-consumer-id", - settings: {}, - }, - ], - producers_total_count: 0, - consumers_total_count: 0, - modified_on: "", - }; - mockGetQueueByName(queueName, existingQueue); - - msw.use( - http.put( - `*/accounts/:accountId/queues/:queueId/consumers/:consumerId`, - async ({ params }) => { - expect(params.queueId).toEqual(queueId); - expect(params.consumerId).toEqual("queue1-consumer-id"); - expect(params.accountId).toEqual("some-account-id"); - return HttpResponse.json({ - success: true, - errors: [], - messages: [], - result: null, - }); - } - ) + await expect(runWrangler("deploy index.js")).rejects.toThrowError( + /Only "worker" consumers can be configured in your Wrangler configuration/ ); - await runWrangler("deploy index.js"); - expect(std.out).toMatchInlineSnapshot(` - " - ⛅️ wrangler x.x.x - ────────────────── - Total Upload: xx KiB / gzip: xx KiB - Worker Startup Time: 100 ms - Uploaded test-name (TIMINGS) - Deployed test-name triggers (TIMINGS) - https://test-name.test-sub-domain.workers.dev - Consumer for queue1 - Current Version ID: Galaxy-Class" - `); }); it("should support queue consumer concurrency with a max concurrency specified", async ({ diff --git a/packages/wrangler/src/api/startDevWorker/LocalRuntimeController.ts b/packages/wrangler/src/api/startDevWorker/LocalRuntimeController.ts index 33d60ca8f5..4a0649e3a6 100644 --- a/packages/wrangler/src/api/startDevWorker/LocalRuntimeController.ts +++ b/packages/wrangler/src/api/startDevWorker/LocalRuntimeController.ts @@ -55,7 +55,8 @@ export async function convertToConfigBundle( if (trigger.type === "cron") { crons.push(trigger.cron); } else if (trigger.type === "queue-consumer") { - queueConsumers.push(trigger); + const { type: _, ...consumer } = trigger; + queueConsumers.push(consumer); } } if (event.bundle.entry.format === "service-worker") { @@ -108,7 +109,7 @@ export async function convertToConfigBundle( assetDirectory: "", excludePatterns: event.config.legacy?.site?.exclude ?? [], includePatterns: event.config.legacy?.site?.include ?? [], - } + } : undefined, assets: event.config?.assets, initialPort: undefined, @@ -119,12 +120,12 @@ export async function convertToConfigBundle( inspect: false, inspectorPort: undefined, inspectorHost: undefined, - } + } : { inspect: true, inspectorPort: 0, inspectorHost: event.config.dev.inspector?.hostname, - }), + }), localPersistencePath: event.config.dev.persist, liveReload: event.config.dev?.liveReload ?? false, crons, @@ -210,7 +211,7 @@ export class LocalRuntimeController extends RuntimeController { const auth = Object.keys(remoteBindings).length === 0 ? // If there are no remote bindings (this is a local only session) there's no need to get auth data - undefined + undefined : await unwrapHook(data.config.dev.auth); this.#remoteProxySessionData = @@ -341,7 +342,7 @@ export class LocalRuntimeController extends RuntimeController { port: userWorkerInspectorUrl.port, pathname: `/core:user:${getName(data.config)}`, }, - } + } : {}), userWorkerInnerUrlOverrides: { protocol: data.config?.dev?.origin?.secure ? "https:" : "http:", diff --git a/packages/wrangler/src/deploy/deploy.ts b/packages/wrangler/src/deploy/deploy.ts index a807da43c7..03c7b8fc30 100644 --- a/packages/wrangler/src/deploy/deploy.ts +++ b/packages/wrangler/src/deploy/deploy.ts @@ -56,7 +56,6 @@ import { getQueue, postConsumer, putConsumer, - putConsumerById, } from "../queues/client"; import { parseBulkInputToObject } from "../secret"; import { syncWorkersSite } from "../sites"; @@ -1501,80 +1500,48 @@ export async function updateQueueConsumers( for (const consumer of consumers) { const queue = await getQueue(config, consumer.queue); - if (consumer.type === "http_pull") { - const body: PostTypedConsumerBody = { - type: consumer.type, - dead_letter_queue: consumer.dead_letter_queue, - settings: { - batch_size: consumer.max_batch_size, - max_retries: consumer.max_retries, - visibility_timeout_ms: consumer.visibility_timeout_ms, - retry_delay: consumer.retry_delay, - }, - }; - - const existingConsumer = queue.consumers && queue.consumers[0]; - if (existingConsumer) { - updateConsumers.push( - putConsumerById( - config, - queue.queue_id, - existingConsumer.consumer_id, - body - ).then(() => [`Consumer for ${consumer.queue}`]) - ); - continue; - } - updateConsumers.push( - postConsumer(config, consumer.queue, body).then(() => [ - `Consumer for ${consumer.queue}`, - ]) - ); - } else { - if (scriptName === undefined) { - // TODO: how can we reliably get the current script name? - throw new UserError( - "Script name is required to update queue consumers", - { telemetryMessage: true } - ); - } + if (scriptName === undefined) { + // TODO: how can we reliably get the current script name? + throw new UserError("Script name is required to update queue consumers", { + telemetryMessage: true, + }); + } - const body: PostTypedConsumerBody = { - type: "worker", - dead_letter_queue: consumer.dead_letter_queue, - script_name: scriptName, - settings: { - batch_size: consumer.max_batch_size, - max_retries: consumer.max_retries, - max_wait_time_ms: - consumer.max_batch_timeout !== undefined - ? 1000 * consumer.max_batch_timeout - : undefined, - max_concurrency: consumer.max_concurrency, - retry_delay: consumer.retry_delay, - }, - }; + const body: PostTypedConsumerBody = { + type: "worker", + dead_letter_queue: consumer.dead_letter_queue, + script_name: scriptName, + settings: { + batch_size: consumer.max_batch_size, + max_retries: consumer.max_retries, + max_wait_time_ms: + consumer.max_batch_timeout !== undefined + ? 1000 * consumer.max_batch_timeout + : undefined, + max_concurrency: consumer.max_concurrency, + retry_delay: consumer.retry_delay, + }, + }; - // Current script already assigned to queue? - const existingConsumer = - queue.consumers.filter( - (c) => c.script === scriptName || c.service === scriptName - ).length > 0; - const envName = undefined; // TODO: script environment for wrangler deploy? - if (existingConsumer) { - updateConsumers.push( - putConsumer(config, consumer.queue, scriptName, envName, body).then( - () => [`Consumer for ${consumer.queue}`] - ) - ); - continue; - } + // Current script already assigned to queue? + const existingConsumer = + queue.consumers.filter( + (c) => c.script === scriptName || c.service === scriptName + ).length > 0; + const envName = undefined; // TODO: script environment for wrangler deploy? + if (existingConsumer) { updateConsumers.push( - postConsumer(config, consumer.queue, body).then(() => [ - `Consumer for ${consumer.queue}`, - ]) + putConsumer(config, consumer.queue, scriptName, envName, body).then( + () => [`Consumer for ${consumer.queue}`] + ) ); + continue; } + updateConsumers.push( + postConsumer(config, consumer.queue, body).then(() => [ + `Consumer for ${consumer.queue}`, + ]) + ); } return updateConsumers; From 28e07f7a7e0ca04d64ed68e36c56f677c752113a Mon Sep 17 00:00:00 2001 From: Pete Bacon Darwin Date: Wed, 25 Mar 2026 21:07:04 +0000 Subject: [PATCH 2/4] fix: apply oxfmt formatting to LocalRuntimeController.ts --- .../src/api/startDevWorker/LocalRuntimeController.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/wrangler/src/api/startDevWorker/LocalRuntimeController.ts b/packages/wrangler/src/api/startDevWorker/LocalRuntimeController.ts index 4a0649e3a6..e56d0f44cc 100644 --- a/packages/wrangler/src/api/startDevWorker/LocalRuntimeController.ts +++ b/packages/wrangler/src/api/startDevWorker/LocalRuntimeController.ts @@ -109,7 +109,7 @@ export async function convertToConfigBundle( assetDirectory: "", excludePatterns: event.config.legacy?.site?.exclude ?? [], includePatterns: event.config.legacy?.site?.include ?? [], - } + } : undefined, assets: event.config?.assets, initialPort: undefined, @@ -120,12 +120,12 @@ export async function convertToConfigBundle( inspect: false, inspectorPort: undefined, inspectorHost: undefined, - } + } : { inspect: true, inspectorPort: 0, inspectorHost: event.config.dev.inspector?.hostname, - }), + }), localPersistencePath: event.config.dev.persist, liveReload: event.config.dev?.liveReload ?? false, crons, @@ -211,7 +211,7 @@ export class LocalRuntimeController extends RuntimeController { const auth = Object.keys(remoteBindings).length === 0 ? // If there are no remote bindings (this is a local only session) there's no need to get auth data - undefined + undefined : await unwrapHook(data.config.dev.auth); this.#remoteProxySessionData = @@ -342,7 +342,7 @@ export class LocalRuntimeController extends RuntimeController { port: userWorkerInspectorUrl.port, pathname: `/core:user:${getName(data.config)}`, }, - } + } : {}), userWorkerInnerUrlOverrides: { protocol: data.config?.dev?.origin?.secure ? "https:" : "http:", From 2a6afd1d295c5d4ab28f46fdbe900250b9fc7a20 Mon Sep 17 00:00:00 2001 From: Pete Bacon Darwin Date: Wed, 25 Mar 2026 22:16:02 +0000 Subject: [PATCH 3/4] fix: cast http_pull type in test to satisfy TS while testing runtime validation --- packages/wrangler/src/__tests__/deploy/queues.test.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/wrangler/src/__tests__/deploy/queues.test.ts b/packages/wrangler/src/__tests__/deploy/queues.test.ts index 1f5c7e35a6..71f16f548c 100644 --- a/packages/wrangler/src/__tests__/deploy/queues.test.ts +++ b/packages/wrangler/src/__tests__/deploy/queues.test.ts @@ -440,7 +440,8 @@ describe("deploy", () => { consumers: [ { queue: queueName, - type: "http_pull", + // Cast needed to simulate invalid user input that bypasses static type checking; runtime validation is what this test exercises + type: "http_pull" as "worker", }, ], }, From 6142443fdb5aaf62e15b8b849d9a8f23dd3e7e9b Mon Sep 17 00:00:00 2001 From: Pete Bacon Darwin Date: Thu, 26 Mar 2026 10:01:17 +0000 Subject: [PATCH 4/4] fix formatting --- packages/wrangler/src/__tests__/deploy/queues.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/wrangler/src/__tests__/deploy/queues.test.ts b/packages/wrangler/src/__tests__/deploy/queues.test.ts index 71f16f548c..db4abf76f0 100644 --- a/packages/wrangler/src/__tests__/deploy/queues.test.ts +++ b/packages/wrangler/src/__tests__/deploy/queues.test.ts @@ -441,7 +441,7 @@ describe("deploy", () => { { queue: queueName, // Cast needed to simulate invalid user input that bypasses static type checking; runtime validation is what this test exercises - type: "http_pull" as "worker", + type: "http_pull" as "worker", }, ], },