Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .changeset/validate-queue-consumer-type.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
"wrangler": patch
"@cloudflare/workers-utils": patch
---

Validate that queue consumers in wrangler config only use the "worker" type

Previously, non-worker consumer types (e.g. `http_pull`) could be specified in the `queues.consumers` config. Now, wrangler will error if a consumer `type` other than `"worker"` is specified in the config file.

To configure non-worker consumer types, use the `wrangler queues consumer` CLI commands instead (e.g. `wrangler queues consumer http-pull add`).
4 changes: 2 additions & 2 deletions packages/workers-utils/src/config/environment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -863,8 +863,8 @@ export interface EnvironmentNonInheritable {
/** The name of the queue from which this consumer should consume. */
queue: string;

/** The consumer type, e.g., worker, http-pull, r2-bucket, etc. Default is worker. */
type?: string;
/** The consumer type. Only "worker" is supported in wrangler config. Default is "worker". */
type?: "worker";

/** The maximum number of messages per batch */
max_batch_size?: number;
Expand Down
10 changes: 10 additions & 0 deletions packages/workers-utils/src/config/validation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4425,6 +4425,16 @@ const validateConsumer: ValidatorFn = (diagnostics, field, value, _config) => {
);
}

// Validate that consumer type, if specified, is "worker".
// Non-worker consumer types (e.g., "http_pull") cannot be configured via
// wrangler config. Use `wrangler queues consumer http add` instead.
if ("type" in value && value.type !== undefined && value.type !== "worker") {
diagnostics.errors.push(
`"${field}.type" has an invalid value "${value.type}". Only "worker" consumers can be configured in your Wrangler configuration.`
);
isValid = false;
}

const options: {
key: string;
type: "number" | "string" | "boolean";
Expand Down
2 changes: 1 addition & 1 deletion packages/workers-utils/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ export type Trigger =
| ({ type: "route" } & ZoneNameRoute)
| ({ type: "route" } & CustomDomainRoute)
| { type: "cron"; cron: string }
| ({ type: "queue-consumer" } & QueueConsumer);
| ({ type: "queue-consumer" } & Omit<QueueConsumer, "type">);

type BindingOmit<T> = Omit<T, "binding">;
type NameOmit<T> = Omit<T, "name">;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3432,6 +3432,47 @@ describe("normalizeAndValidateConfig()", () => {
`);
});

it("should error if queues consumer type is not 'worker'", ({
expect,
}) => {
const { diagnostics } = normalizeAndValidateConfig(
{
queues: {
consumers: [
{ queue: "myQueue", type: "http_pull" },
{ queue: "myQueue2", type: "r2_bucket" },
],
},
} as unknown as RawConfig,
undefined,
undefined,
{ env: undefined }
);

expect(diagnostics.renderErrors()).toMatchInlineSnapshot(`
"Processing wrangler configuration:
- "queues.consumers[0].type" has an invalid value "http_pull". Only "worker" consumers can be configured in your Wrangler configuration.
- "queues.consumers[1].type" has an invalid value "r2_bucket". Only "worker" consumers can be configured in your Wrangler configuration."
`);
});

it("should allow queues consumer type 'worker' explicitly", ({
expect,
}) => {
const { diagnostics } = normalizeAndValidateConfig(
{
queues: {
consumers: [{ queue: "myQueue", type: "worker" }],
},
} as unknown as RawConfig,
undefined,
undefined,
{ env: undefined }
);

expect(diagnostics.hasErrors()).toBe(false);
});

it("should error if queues consumers are not valid", ({ expect }) => {
const { config, diagnostics } = normalizeAndValidateConfig(
{
Expand Down
26 changes: 0 additions & 26 deletions packages/wrangler/src/__tests__/deploy/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -657,32 +657,6 @@ export function mockPostConsumerById(
return requests;
}

export function mockPostQueueHTTPConsumer(
expectedQueueId: string,
expectedBody: PostTypedConsumerBody
) {
const requests = { count: 0 };
msw.use(
http.post(
`*/accounts/:accountId/queues/:queueId/consumers`,
async ({ request, params }) => {
const body = await request.json();
expect(params.queueId).toEqual(expectedQueueId);
expect(params.accountId).toEqual("some-account-id");
expect(body).toEqual(expectedBody);
requests.count += 1;
return HttpResponse.json({
success: true,
errors: [],
messages: [],
result: {},
});
}
)
);
return requests;
}

export const mockAUSRequest = async (
bodies?: AssetManifest[],
buckets: string[][] = [[]],
Expand Down
111 changes: 5 additions & 106 deletions packages/wrangler/src/__tests__/deploy/queues.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import {
mockLastDeploymentRequest,
mockPatchScriptSettings,
mockPostConsumerById,
mockPostQueueHTTPConsumer,
mockPutQueueConsumerById,
} from "./helpers";
import type { QueueResponse } from "../../queues/client";
Expand Down Expand Up @@ -433,124 +432,24 @@ describe("deploy", () => {
`);
});

it("should post queue http consumers on deploy", async ({ expect }) => {
writeWranglerConfig({
queues: {
consumers: [
{
queue: queueName,
type: "http_pull",
dead_letter_queue: "myDLQ",
max_batch_size: 5,
visibility_timeout_ms: 4000,
max_retries: 10,
retry_delay: 1,
},
],
},
});
await fs.promises.writeFile("index.js", `export default {};`);
mockSubDomainRequest();
mockUploadWorkerRequest();
const existingQueue: QueueResponse = {
queue_id: queueId,
queue_name: queueName,
created_on: "",
producers: [],
consumers: [],
producers_total_count: 0,
consumers_total_count: 0,
modified_on: "",
};
mockGetQueueByName(queueName, existingQueue);
mockPostQueueHTTPConsumer(queueId, {
type: "http_pull",
dead_letter_queue: "myDLQ",
settings: {
batch_size: 5,
max_retries: 10,
visibility_timeout_ms: 4000,
retry_delay: 1,
},
});
await runWrangler("deploy index.js");
expect(std.out).toMatchInlineSnapshot(`
"
⛅️ wrangler x.x.x
──────────────────
Total Upload: xx KiB / gzip: xx KiB
Worker Startup Time: 100 ms
Uploaded test-name (TIMINGS)
Deployed test-name triggers (TIMINGS)
https://test-name.test-sub-domain.workers.dev
Consumer for queue1
Current Version ID: Galaxy-Class"
`);
});

it("should update queue http consumers when one already exists for queue", async ({
it("should reject http_pull consumer type in config", async ({
expect,
}) => {
writeWranglerConfig({
queues: {
consumers: [
{
queue: queueName,
type: "http_pull",
// Cast needed to simulate invalid user input that bypasses static type checking; runtime validation is what this test exercises
type: "http_pull" as "worker",
},
],
},
});
await fs.promises.writeFile("index.js", `export default {};`);
mockSubDomainRequest();
mockUploadWorkerRequest();
const existingQueue: QueueResponse = {
queue_id: queueId,
queue_name: queueName,
created_on: "",
producers: [],
consumers: [
{
type: "http_pull",
consumer_id: "queue1-consumer-id",
settings: {},
},
],
producers_total_count: 0,
consumers_total_count: 0,
modified_on: "",
};
mockGetQueueByName(queueName, existingQueue);

msw.use(
http.put(
`*/accounts/:accountId/queues/:queueId/consumers/:consumerId`,
async ({ params }) => {
expect(params.queueId).toEqual(queueId);
expect(params.consumerId).toEqual("queue1-consumer-id");
expect(params.accountId).toEqual("some-account-id");
return HttpResponse.json({
success: true,
errors: [],
messages: [],
result: null,
});
}
)
await expect(runWrangler("deploy index.js")).rejects.toThrowError(
/Only "worker" consumers can be configured in your Wrangler configuration/
);
await runWrangler("deploy index.js");
expect(std.out).toMatchInlineSnapshot(`
"
⛅️ wrangler x.x.x
──────────────────
Total Upload: xx KiB / gzip: xx KiB
Worker Startup Time: 100 ms
Uploaded test-name (TIMINGS)
Deployed test-name triggers (TIMINGS)
https://test-name.test-sub-domain.workers.dev
Consumer for queue1
Current Version ID: Galaxy-Class"
`);
});

it("should support queue consumer concurrency with a max concurrency specified", async ({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ export async function convertToConfigBundle(
if (trigger.type === "cron") {
crons.push(trigger.cron);
} else if (trigger.type === "queue-consumer") {
queueConsumers.push(trigger);
const { type: _, ...consumer } = trigger;
queueConsumers.push(consumer);
}
}
if (event.bundle.entry.format === "service-worker") {
Expand Down
Loading
Loading