diff --git a/.changeset/queue-broker-metrics-response.md b/.changeset/queue-broker-metrics-response.md new file mode 100644 index 0000000000..ae7c5da922 --- /dev/null +++ b/.changeset/queue-broker-metrics-response.md @@ -0,0 +1,7 @@ +--- +"miniflare": patch +--- + +Return metadata in queue broker response + +The queue broker's `/message` and `/batch` endpoints now return a JSON response body containing queue metrics (`backlogCount`, `backlogBytes`, `oldestMessageTimestamp`) instead of an empty response. A new `GET /metrics` endpoint is also added to support the `metrics()` API. diff --git a/packages/miniflare/src/workers/queues/broker.worker.ts b/packages/miniflare/src/workers/queues/broker.worker.ts index 61689d2148..b279ca14d0 100644 --- a/packages/miniflare/src/workers/queues/broker.worker.ts +++ b/packages/miniflare/src/workers/queues/broker.worker.ts @@ -2,6 +2,7 @@ import assert from "node:assert"; import { Buffer } from "node:buffer"; import { bold, green, grey, red, reset, yellow } from "kleur/colors"; import { + GET, HttpError, LogLevel, MiniflareDurableObject, @@ -149,13 +150,34 @@ function serialise(msg: QueueMessage): QueueOutgoingMessage { } class QueueMessage { + static #encoder = new TextEncoder(); + /** + * Message body size in bytes. This is an approximation of production behaviour and may not be the exact same. + */ + #bytes = 0; #failedAttempts = 0; constructor( readonly id: string, readonly timestamp: Date, readonly body: QueueBody - ) {} + ) { + this.#bytes = QueueMessage.#byteLength(body); + } + + static #byteLength(body: QueueBody): number { + switch (body.contentType) { + case "text": + return this.#encoder.encode(body.body).byteLength; + case "json": + return this.#encoder.encode(JSON.stringify(body.body)).byteLength; + case "bytes": + case "v8": + return body.body.byteLength; + default: + throw new Error(`Unexpected queue message contentType received`); + } + } incrementFailedAttempts(): number { return ++this.#failedAttempts; @@ -164,6 +186,10 @@ class QueueMessage { get failedAttempts() { return this.#failedAttempts; } + + get bytes() { + return this.#bytes; + } } function formatQueueResponse( @@ -203,6 +229,7 @@ export class QueueBrokerObject extends MiniflareDurableObject; readonly #messages: QueueMessage[] = []; #pendingFlush?: PendingFlush; + #backlogBytes = 0; constructor(state: DurableObjectState, env: QueueBrokerObjectEnv) { super(state, env); @@ -226,7 +253,11 @@ export class QueueBrokerObject extends MiniflareDurableObject { @@ -255,11 +287,23 @@ export class QueueBrokerObject extends MiniflareDurableObject total + msg.bytes, 0); + const metadata: MessageBatchMetadata = { + metrics: { + backlogCount: this.#messages.length, + backlogBytes: this.#backlogBytes, + oldestMessageTimestamp: this.#messages[0]?.timestamp, + }, + }; const startTime = Date.now(); let endTime: number; let response: FetcherQueueResult; try { - response = await this.#dispatchBatch(consumer.workerName, batch); + response = await this.#dispatchBatch( + consumer.workerName, + batch, + metadata + ); endTime = Date.now(); } catch (e: any) { endTime = Date.now(); @@ -290,6 +334,7 @@ export class QueueBrokerObject extends MiniflareDurableObject { this.#messages.push(message); + this.#backlogBytes += message.bytes; this.#ensurePendingFlush(); }; const delay = retryMessages.get(message.id) ?? globalDelay; @@ -372,6 +417,7 @@ export class QueueBrokerObject extends MiniflareDurableObject { this.#messages.push(msg); + this.#backlogBytes += msg.bytes; this.#ensurePendingFlush(); }; @@ -380,11 +426,25 @@ export class QueueBrokerObject extends MiniflareDurableObject { + const messageResponse = { + metadata: { + metrics: this.#getMetricsResponseObject(), + }, + }; + // If we don't have a consumer, drop the message const consumer = this.#maybeConsumer; - if (consumer === undefined) return new Response(); + if (consumer === undefined) return Response.json(messageResponse); validateMessageSize(req.headers); const contentType = validateContentType(req.headers); @@ -396,14 +456,20 @@ export class QueueBrokerObject extends MiniflareDurableObject { + const batchResponse = { + metadata: { + metrics: this.#getMetricsResponseObject(), + }, + }; + // If we don't have a consumer, drop the message const consumer = this.#maybeConsumer; - if (consumer === undefined) return new Response(); + if (consumer === undefined) return Response.json(batchResponse); // NOTE: this endpoint is also used when moving messages to the dead-letter // queue. In this case, size headers won't be added and this validation is @@ -415,6 +481,11 @@ export class QueueBrokerObject extends MiniflareDurableObject { + return Response.json(this.#getMetricsResponseObject()); }; } diff --git a/packages/miniflare/test/plugins/queues/index.spec.ts b/packages/miniflare/test/plugins/queues/index.spec.ts index a15ffc6cca..ad2532b605 100644 --- a/packages/miniflare/test/plugins/queues/index.spec.ts +++ b/packages/miniflare/test/plugins/queues/index.spec.ts @@ -947,6 +947,96 @@ test("supports message contentTypes", async ({ expect }) => { ]); }); +test("supports metadata in send() response", async ({ expect }) => { + const mf = new Miniflare({ + workers: [ + { + name: "producer", + queueProducers: ["QUEUE"], + compatibilityFlags: ["experimental"], + modules: true, + script: `export default { + async fetch(request, env, ctx) { + const resp = await env.QUEUE.send("msg"); + return Response.json(resp); + } + }`, + }, + { + name: "consumer", + queueConsumers: ["QUEUE"], + modules: true, + script: `export default { + async queue(batch, env, ctx) {} + }`, + }, + ], + }); + useDispose(mf); + const object = await getControlStub(mf, "QUEUE"); + + const first = await mf.dispatchFetch("http://localhost"); + await first.arrayBuffer(); + await object.waitForFakeTasks(); + + const res = await mf.dispatchFetch("http://localhost"); + const body = await res.json(); + expect(body).toEqual({ + metadata: { + metrics: { + backlogCount: 1, + backlogBytes: expect.any(Number), + oldestMessageTimestamp: new Date(1_000_000).toISOString(), + }, + }, + }); +}); + +test("supports metadata in sendBatch() response", async ({ expect }) => { + const mf = new Miniflare({ + workers: [ + { + name: "producer", + queueProducers: ["QUEUE"], + compatibilityFlags: ["experimental"], + modules: true, + script: `export default { + async fetch(request, env, ctx) { + const resp = await env.QUEUE.sendBatch([{ body: "msg1" }, { body: "msg2" }]); + return Response.json(resp); + } + }`, + }, + { + name: "consumer", + queueConsumers: ["QUEUE"], + modules: true, + script: `export default { + async queue(batch, env, ctx) {} + }`, + }, + ], + }); + useDispose(mf); + const object = await getControlStub(mf, "QUEUE"); + + const first = await mf.dispatchFetch("http://localhost"); + await first.arrayBuffer(); + await object.waitForFakeTasks(); + + const res = await mf.dispatchFetch("http://localhost"); + const body = await res.json(); + expect(body).toEqual({ + metadata: { + metrics: { + backlogCount: 2, + backlogBytes: expect.any(Number), + oldestMessageTimestamp: new Date(1_000_000).toISOString(), + }, + }, + }); +}); + test("validates message size", async ({ expect }) => { const mf = new Miniflare({ queueProducers: { QUEUE: "MY_QUEUE" },