diff --git a/.changeset/validate-queue-consumer-type.md b/.changeset/validate-queue-consumer-type.md new file mode 100644 index 0000000000..95c41d91df --- /dev/null +++ b/.changeset/validate-queue-consumer-type.md @@ -0,0 +1,10 @@ +--- +"wrangler": patch +"@cloudflare/workers-utils": patch +--- + +Validate that queue consumers in wrangler config only use the "worker" type + +Previously, non-worker consumer types (e.g. `http_pull`) could be specified in the `queues.consumers` config. Now, wrangler will error if a consumer `type` other than `"worker"` is specified in the config file. + +To configure non-worker consumer types, use the `wrangler queues consumer` CLI commands instead (e.g. `wrangler queues consumer http-pull add`). diff --git a/packages/workers-utils/src/config/environment.ts b/packages/workers-utils/src/config/environment.ts index 4836d7e9a6..b9a0cf70a0 100644 --- a/packages/workers-utils/src/config/environment.ts +++ b/packages/workers-utils/src/config/environment.ts @@ -863,8 +863,8 @@ export interface EnvironmentNonInheritable { /** The name of the queue from which this consumer should consume. */ queue: string; - /** The consumer type, e.g., worker, http-pull, r2-bucket, etc. Default is worker. */ - type?: string; + /** The consumer type. Only "worker" is supported in wrangler config. Default is "worker". */ + type?: "worker"; /** The maximum number of messages per batch */ max_batch_size?: number; diff --git a/packages/workers-utils/src/config/validation.ts b/packages/workers-utils/src/config/validation.ts index 19997250de..e60ca0de7c 100644 --- a/packages/workers-utils/src/config/validation.ts +++ b/packages/workers-utils/src/config/validation.ts @@ -4425,6 +4425,16 @@ const validateConsumer: ValidatorFn = (diagnostics, field, value, _config) => { ); } + // Validate that consumer type, if specified, is "worker". + // Non-worker consumer types (e.g., "http_pull") cannot be configured via + // wrangler config. Use `wrangler queues consumer http add` instead. + if ("type" in value && value.type !== undefined && value.type !== "worker") { + diagnostics.errors.push( + `"${field}.type" has an invalid value "${value.type}". Only "worker" consumers can be configured in your Wrangler configuration.` + ); + isValid = false; + } + const options: { key: string; type: "number" | "string" | "boolean"; diff --git a/packages/workers-utils/src/types.ts b/packages/workers-utils/src/types.ts index ec96a43bad..c36a57384f 100644 --- a/packages/workers-utils/src/types.ts +++ b/packages/workers-utils/src/types.ts @@ -276,7 +276,7 @@ export type Trigger = | ({ type: "route" } & ZoneNameRoute) | ({ type: "route" } & CustomDomainRoute) | { type: "cron"; cron: string } - | ({ type: "queue-consumer" } & QueueConsumer); + | ({ type: "queue-consumer" } & Omit); type BindingOmit = Omit; type NameOmit = Omit; diff --git a/packages/workers-utils/tests/config/validation/normalize-and-validate-config.test.ts b/packages/workers-utils/tests/config/validation/normalize-and-validate-config.test.ts index d3d6375953..33042d2600 100644 --- a/packages/workers-utils/tests/config/validation/normalize-and-validate-config.test.ts +++ b/packages/workers-utils/tests/config/validation/normalize-and-validate-config.test.ts @@ -3432,6 +3432,47 @@ describe("normalizeAndValidateConfig()", () => { `); }); + it("should error if queues consumer type is not 'worker'", ({ + expect, + }) => { + const { diagnostics } = normalizeAndValidateConfig( + { + queues: { + consumers: [ + { queue: "myQueue", type: "http_pull" }, + { queue: "myQueue2", type: "r2_bucket" }, + ], + }, + } as unknown as RawConfig, + undefined, + undefined, + { env: undefined } + ); + + expect(diagnostics.renderErrors()).toMatchInlineSnapshot(` + "Processing wrangler configuration: + - "queues.consumers[0].type" has an invalid value "http_pull". Only "worker" consumers can be configured in your Wrangler configuration. + - "queues.consumers[1].type" has an invalid value "r2_bucket". Only "worker" consumers can be configured in your Wrangler configuration." + `); + }); + + it("should allow queues consumer type 'worker' explicitly", ({ + expect, + }) => { + const { diagnostics } = normalizeAndValidateConfig( + { + queues: { + consumers: [{ queue: "myQueue", type: "worker" }], + }, + } as unknown as RawConfig, + undefined, + undefined, + { env: undefined } + ); + + expect(diagnostics.hasErrors()).toBe(false); + }); + it("should error if queues consumers are not valid", ({ expect }) => { const { config, diagnostics } = normalizeAndValidateConfig( { diff --git a/packages/wrangler/src/__tests__/deploy/helpers.ts b/packages/wrangler/src/__tests__/deploy/helpers.ts index 391db51c4b..40eaaec572 100644 --- a/packages/wrangler/src/__tests__/deploy/helpers.ts +++ b/packages/wrangler/src/__tests__/deploy/helpers.ts @@ -657,32 +657,6 @@ export function mockPostConsumerById( return requests; } -export function mockPostQueueHTTPConsumer( - expectedQueueId: string, - expectedBody: PostTypedConsumerBody -) { - const requests = { count: 0 }; - msw.use( - http.post( - `*/accounts/:accountId/queues/:queueId/consumers`, - async ({ request, params }) => { - const body = await request.json(); - expect(params.queueId).toEqual(expectedQueueId); - expect(params.accountId).toEqual("some-account-id"); - expect(body).toEqual(expectedBody); - requests.count += 1; - return HttpResponse.json({ - success: true, - errors: [], - messages: [], - result: {}, - }); - } - ) - ); - return requests; -} - export const mockAUSRequest = async ( bodies?: AssetManifest[], buckets: string[][] = [[]], diff --git a/packages/wrangler/src/__tests__/deploy/queues.test.ts b/packages/wrangler/src/__tests__/deploy/queues.test.ts index cb4f030b5d..db4abf76f0 100644 --- a/packages/wrangler/src/__tests__/deploy/queues.test.ts +++ b/packages/wrangler/src/__tests__/deploy/queues.test.ts @@ -23,7 +23,6 @@ import { mockLastDeploymentRequest, mockPatchScriptSettings, mockPostConsumerById, - mockPostQueueHTTPConsumer, mockPutQueueConsumerById, } from "./helpers"; import type { QueueResponse } from "../../queues/client"; @@ -433,62 +432,7 @@ describe("deploy", () => { `); }); - it("should post queue http consumers on deploy", async ({ expect }) => { - writeWranglerConfig({ - queues: { - consumers: [ - { - queue: queueName, - type: "http_pull", - dead_letter_queue: "myDLQ", - max_batch_size: 5, - visibility_timeout_ms: 4000, - max_retries: 10, - retry_delay: 1, - }, - ], - }, - }); - await fs.promises.writeFile("index.js", `export default {};`); - mockSubDomainRequest(); - mockUploadWorkerRequest(); - const existingQueue: QueueResponse = { - queue_id: queueId, - queue_name: queueName, - created_on: "", - producers: [], - consumers: [], - producers_total_count: 0, - consumers_total_count: 0, - modified_on: "", - }; - mockGetQueueByName(queueName, existingQueue); - mockPostQueueHTTPConsumer(queueId, { - type: "http_pull", - dead_letter_queue: "myDLQ", - settings: { - batch_size: 5, - max_retries: 10, - visibility_timeout_ms: 4000, - retry_delay: 1, - }, - }); - await runWrangler("deploy index.js"); - expect(std.out).toMatchInlineSnapshot(` - " - ⛅️ wrangler x.x.x - ────────────────── - Total Upload: xx KiB / gzip: xx KiB - Worker Startup Time: 100 ms - Uploaded test-name (TIMINGS) - Deployed test-name triggers (TIMINGS) - https://test-name.test-sub-domain.workers.dev - Consumer for queue1 - Current Version ID: Galaxy-Class" - `); - }); - - it("should update queue http consumers when one already exists for queue", async ({ + it("should reject http_pull consumer type in config", async ({ expect, }) => { writeWranglerConfig({ @@ -496,61 +440,16 @@ describe("deploy", () => { consumers: [ { queue: queueName, - type: "http_pull", + // Cast needed to simulate invalid user input that bypasses static type checking; runtime validation is what this test exercises + type: "http_pull" as "worker", }, ], }, }); await fs.promises.writeFile("index.js", `export default {};`); - mockSubDomainRequest(); - mockUploadWorkerRequest(); - const existingQueue: QueueResponse = { - queue_id: queueId, - queue_name: queueName, - created_on: "", - producers: [], - consumers: [ - { - type: "http_pull", - consumer_id: "queue1-consumer-id", - settings: {}, - }, - ], - producers_total_count: 0, - consumers_total_count: 0, - modified_on: "", - }; - mockGetQueueByName(queueName, existingQueue); - - msw.use( - http.put( - `*/accounts/:accountId/queues/:queueId/consumers/:consumerId`, - async ({ params }) => { - expect(params.queueId).toEqual(queueId); - expect(params.consumerId).toEqual("queue1-consumer-id"); - expect(params.accountId).toEqual("some-account-id"); - return HttpResponse.json({ - success: true, - errors: [], - messages: [], - result: null, - }); - } - ) + await expect(runWrangler("deploy index.js")).rejects.toThrowError( + /Only "worker" consumers can be configured in your Wrangler configuration/ ); - await runWrangler("deploy index.js"); - expect(std.out).toMatchInlineSnapshot(` - " - ⛅️ wrangler x.x.x - ────────────────── - Total Upload: xx KiB / gzip: xx KiB - Worker Startup Time: 100 ms - Uploaded test-name (TIMINGS) - Deployed test-name triggers (TIMINGS) - https://test-name.test-sub-domain.workers.dev - Consumer for queue1 - Current Version ID: Galaxy-Class" - `); }); it("should support queue consumer concurrency with a max concurrency specified", async ({ diff --git a/packages/wrangler/src/api/startDevWorker/LocalRuntimeController.ts b/packages/wrangler/src/api/startDevWorker/LocalRuntimeController.ts index 33d60ca8f5..e56d0f44cc 100644 --- a/packages/wrangler/src/api/startDevWorker/LocalRuntimeController.ts +++ b/packages/wrangler/src/api/startDevWorker/LocalRuntimeController.ts @@ -55,7 +55,8 @@ export async function convertToConfigBundle( if (trigger.type === "cron") { crons.push(trigger.cron); } else if (trigger.type === "queue-consumer") { - queueConsumers.push(trigger); + const { type: _, ...consumer } = trigger; + queueConsumers.push(consumer); } } if (event.bundle.entry.format === "service-worker") { diff --git a/packages/wrangler/src/deploy/deploy.ts b/packages/wrangler/src/deploy/deploy.ts index a807da43c7..03c7b8fc30 100644 --- a/packages/wrangler/src/deploy/deploy.ts +++ b/packages/wrangler/src/deploy/deploy.ts @@ -56,7 +56,6 @@ import { getQueue, postConsumer, putConsumer, - putConsumerById, } from "../queues/client"; import { parseBulkInputToObject } from "../secret"; import { syncWorkersSite } from "../sites"; @@ -1501,80 +1500,48 @@ export async function updateQueueConsumers( for (const consumer of consumers) { const queue = await getQueue(config, consumer.queue); - if (consumer.type === "http_pull") { - const body: PostTypedConsumerBody = { - type: consumer.type, - dead_letter_queue: consumer.dead_letter_queue, - settings: { - batch_size: consumer.max_batch_size, - max_retries: consumer.max_retries, - visibility_timeout_ms: consumer.visibility_timeout_ms, - retry_delay: consumer.retry_delay, - }, - }; - - const existingConsumer = queue.consumers && queue.consumers[0]; - if (existingConsumer) { - updateConsumers.push( - putConsumerById( - config, - queue.queue_id, - existingConsumer.consumer_id, - body - ).then(() => [`Consumer for ${consumer.queue}`]) - ); - continue; - } - updateConsumers.push( - postConsumer(config, consumer.queue, body).then(() => [ - `Consumer for ${consumer.queue}`, - ]) - ); - } else { - if (scriptName === undefined) { - // TODO: how can we reliably get the current script name? - throw new UserError( - "Script name is required to update queue consumers", - { telemetryMessage: true } - ); - } + if (scriptName === undefined) { + // TODO: how can we reliably get the current script name? + throw new UserError("Script name is required to update queue consumers", { + telemetryMessage: true, + }); + } - const body: PostTypedConsumerBody = { - type: "worker", - dead_letter_queue: consumer.dead_letter_queue, - script_name: scriptName, - settings: { - batch_size: consumer.max_batch_size, - max_retries: consumer.max_retries, - max_wait_time_ms: - consumer.max_batch_timeout !== undefined - ? 1000 * consumer.max_batch_timeout - : undefined, - max_concurrency: consumer.max_concurrency, - retry_delay: consumer.retry_delay, - }, - }; + const body: PostTypedConsumerBody = { + type: "worker", + dead_letter_queue: consumer.dead_letter_queue, + script_name: scriptName, + settings: { + batch_size: consumer.max_batch_size, + max_retries: consumer.max_retries, + max_wait_time_ms: + consumer.max_batch_timeout !== undefined + ? 1000 * consumer.max_batch_timeout + : undefined, + max_concurrency: consumer.max_concurrency, + retry_delay: consumer.retry_delay, + }, + }; - // Current script already assigned to queue? - const existingConsumer = - queue.consumers.filter( - (c) => c.script === scriptName || c.service === scriptName - ).length > 0; - const envName = undefined; // TODO: script environment for wrangler deploy? - if (existingConsumer) { - updateConsumers.push( - putConsumer(config, consumer.queue, scriptName, envName, body).then( - () => [`Consumer for ${consumer.queue}`] - ) - ); - continue; - } + // Current script already assigned to queue? + const existingConsumer = + queue.consumers.filter( + (c) => c.script === scriptName || c.service === scriptName + ).length > 0; + const envName = undefined; // TODO: script environment for wrangler deploy? + if (existingConsumer) { updateConsumers.push( - postConsumer(config, consumer.queue, body).then(() => [ - `Consumer for ${consumer.queue}`, - ]) + putConsumer(config, consumer.queue, scriptName, envName, body).then( + () => [`Consumer for ${consumer.queue}`] + ) ); + continue; } + updateConsumers.push( + postConsumer(config, consumer.queue, body).then(() => [ + `Consumer for ${consumer.queue}`, + ]) + ); } return updateConsumers;