From fa9ae20e2079b0c7af8164a71310c88f2f933572 Mon Sep 17 00:00:00 2001 From: Kenneth Ruan Date: Mon, 6 Apr 2026 16:46:17 -0500 Subject: [PATCH 1/5] MQ-1273: Add metadata with metrics to queue broker response in miniflare --- .changeset/queue-broker-metrics-response.md | 7 ++ .../src/workers/queues/broker.worker.ts | 91 +++++++++++++++++-- .../test/plugins/queues/index.spec.ts | 90 ++++++++++++++++++ 3 files changed, 180 insertions(+), 8 deletions(-) create mode 100644 .changeset/queue-broker-metrics-response.md diff --git a/.changeset/queue-broker-metrics-response.md b/.changeset/queue-broker-metrics-response.md new file mode 100644 index 0000000000..1aae17b86d --- /dev/null +++ b/.changeset/queue-broker-metrics-response.md @@ -0,0 +1,7 @@ +--- +"miniflare": patch +--- + +# Return metadata in queue broker response + +The queue broker's `/message` and `/batch` endpoints now return a JSON response body containing queue metrics (`backlogCount`, `backlogBytes`, `oldestMessageTimestamp`) instead of an empty response. A new `GET /metrics` endpoint is also added to support the `metrics()` API. diff --git a/packages/miniflare/src/workers/queues/broker.worker.ts b/packages/miniflare/src/workers/queues/broker.worker.ts index 61689d2148..4621ff0dcb 100644 --- a/packages/miniflare/src/workers/queues/broker.worker.ts +++ b/packages/miniflare/src/workers/queues/broker.worker.ts @@ -2,6 +2,7 @@ import assert from "node:assert"; import { Buffer } from "node:buffer"; import { bold, green, grey, red, reset, yellow } from "kleur/colors"; import { + GET, HttpError, LogLevel, MiniflareDurableObject, @@ -149,13 +150,34 @@ function serialise(msg: QueueMessage): QueueOutgoingMessage { } class QueueMessage { + static #encoder = new TextEncoder(); + /** + * Message body size in bytes. This is an approximation of production behaviour and may not be the exact same. + */ + #bytes = 0; #failedAttempts = 0; constructor( readonly id: string, readonly timestamp: Date, readonly body: QueueBody - ) {} + ) { + this.#bytes = QueueMessage.#byteLength(body); + } + + static #byteLength(body: QueueBody): number { + switch (body.contentType) { + case "text": + return this.#encoder.encode(body.body).byteLength; + case "json": + return this.#encoder.encode(JSON.stringify(body.body)).byteLength; + case "bytes": + case "v8": + return body.body.byteLength; + default: + assert(false); + } + } incrementFailedAttempts(): number { return ++this.#failedAttempts; @@ -164,6 +186,10 @@ class QueueMessage { get failedAttempts() { return this.#failedAttempts; } + + get bytes() { + return this.#bytes; + } } function formatQueueResponse( @@ -203,6 +229,7 @@ export class QueueBrokerObject extends MiniflareDurableObject; readonly #messages: QueueMessage[] = []; #pendingFlush?: PendingFlush; + #backlogBytes = 0; constructor(state: DurableObjectState, env: QueueBrokerObjectEnv) { super(state, env); @@ -226,7 +253,11 @@ export class QueueBrokerObject extends MiniflareDurableObject { @@ -255,11 +287,22 @@ export class QueueBrokerObject extends MiniflareDurableObject { this.#messages.push(message); + this.#backlogBytes += message.bytes; this.#ensurePendingFlush(); }; const delay = retryMessages.get(message.id) ?? globalDelay; @@ -309,6 +353,7 @@ export class QueueBrokerObject extends MiniflareDurableObject total + msg.bytes, 0); await this.logWithLevel( LogLevel.INFO, formatQueueResponse(this.name, acked, batch.length, endTime - startTime) @@ -372,6 +417,7 @@ export class QueueBrokerObject extends MiniflareDurableObject { this.#messages.push(msg); + this.#backlogBytes += msg.bytes; this.#ensurePendingFlush(); }; @@ -382,9 +428,19 @@ export class QueueBrokerObject extends MiniflareDurableObject { + const messageResponse = { + metadata: { + metrics: { + backlogCount: this.#messages.length, + backlogBytes: this.#backlogBytes, + oldestMessageTimestamp: this.#messages[0]?.timestamp.getTime() ?? 0, + }, + }, + }; + // If we don't have a consumer, drop the message const consumer = this.#maybeConsumer; - if (consumer === undefined) return new Response(); + if (consumer === undefined) return Response.json(messageResponse); validateMessageSize(req.headers); const contentType = validateContentType(req.headers); @@ -396,14 +452,24 @@ export class QueueBrokerObject extends MiniflareDurableObject { + const batchResponse = { + metadata: { + metrics: { + backlogCount: this.#messages.length, + backlogBytes: this.#backlogBytes, + oldestMessageTimestamp: this.#messages[0]?.timestamp.getTime() ?? 0, + }, + }, + }; + // If we don't have a consumer, drop the message const consumer = this.#maybeConsumer; - if (consumer === undefined) return new Response(); + if (consumer === undefined) return Response.json(batchResponse); // NOTE: this endpoint is also used when moving messages to the dead-letter // queue. In this case, size headers won't be added and this validation is @@ -415,6 +481,15 @@ export class QueueBrokerObject extends MiniflareDurableObject { + return Response.json({ + backlogCount: this.#messages.length, + backlogBytes: this.#backlogBytes, + oldestMessageTimestamp: this.#messages[0]?.timestamp.getTime() ?? 0, + }); }; } diff --git a/packages/miniflare/test/plugins/queues/index.spec.ts b/packages/miniflare/test/plugins/queues/index.spec.ts index a15ffc6cca..ad2532b605 100644 --- a/packages/miniflare/test/plugins/queues/index.spec.ts +++ b/packages/miniflare/test/plugins/queues/index.spec.ts @@ -947,6 +947,96 @@ test("supports message contentTypes", async ({ expect }) => { ]); }); +test("supports metadata in send() response", async ({ expect }) => { + const mf = new Miniflare({ + workers: [ + { + name: "producer", + queueProducers: ["QUEUE"], + compatibilityFlags: ["experimental"], + modules: true, + script: `export default { + async fetch(request, env, ctx) { + const resp = await env.QUEUE.send("msg"); + return Response.json(resp); + } + }`, + }, + { + name: "consumer", + queueConsumers: ["QUEUE"], + modules: true, + script: `export default { + async queue(batch, env, ctx) {} + }`, + }, + ], + }); + useDispose(mf); + const object = await getControlStub(mf, "QUEUE"); + + const first = await mf.dispatchFetch("http://localhost"); + await first.arrayBuffer(); + await object.waitForFakeTasks(); + + const res = await mf.dispatchFetch("http://localhost"); + const body = await res.json(); + expect(body).toEqual({ + metadata: { + metrics: { + backlogCount: 1, + backlogBytes: expect.any(Number), + oldestMessageTimestamp: new Date(1_000_000).toISOString(), + }, + }, + }); +}); + +test("supports metadata in sendBatch() response", async ({ expect }) => { + const mf = new Miniflare({ + workers: [ + { + name: "producer", + queueProducers: ["QUEUE"], + compatibilityFlags: ["experimental"], + modules: true, + script: `export default { + async fetch(request, env, ctx) { + const resp = await env.QUEUE.sendBatch([{ body: "msg1" }, { body: "msg2" }]); + return Response.json(resp); + } + }`, + }, + { + name: "consumer", + queueConsumers: ["QUEUE"], + modules: true, + script: `export default { + async queue(batch, env, ctx) {} + }`, + }, + ], + }); + useDispose(mf); + const object = await getControlStub(mf, "QUEUE"); + + const first = await mf.dispatchFetch("http://localhost"); + await first.arrayBuffer(); + await object.waitForFakeTasks(); + + const res = await mf.dispatchFetch("http://localhost"); + const body = await res.json(); + expect(body).toEqual({ + metadata: { + metrics: { + backlogCount: 2, + backlogBytes: expect.any(Number), + oldestMessageTimestamp: new Date(1_000_000).toISOString(), + }, + }, + }); +}); + test("validates message size", async ({ expect }) => { const mf = new Miniflare({ queueProducers: { QUEUE: "MY_QUEUE" }, From 1c35363a8c40cf60f1a258a8925d2955227e99c2 Mon Sep 17 00:00:00 2001 From: Kenneth Ruan Date: Tue, 7 Apr 2026 20:10:40 -0500 Subject: [PATCH 2/5] Address bonk comments --- .changeset/queue-broker-metrics-response.md | 2 +- packages/miniflare/src/workers/queues/broker.worker.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.changeset/queue-broker-metrics-response.md b/.changeset/queue-broker-metrics-response.md index 1aae17b86d..ae7c5da922 100644 --- a/.changeset/queue-broker-metrics-response.md +++ b/.changeset/queue-broker-metrics-response.md @@ -2,6 +2,6 @@ "miniflare": patch --- -# Return metadata in queue broker response +Return metadata in queue broker response The queue broker's `/message` and `/batch` endpoints now return a JSON response body containing queue metrics (`backlogCount`, `backlogBytes`, `oldestMessageTimestamp`) instead of an empty response. A new `GET /metrics` endpoint is also added to support the `metrics()` API. diff --git a/packages/miniflare/src/workers/queues/broker.worker.ts b/packages/miniflare/src/workers/queues/broker.worker.ts index 4621ff0dcb..6418503837 100644 --- a/packages/miniflare/src/workers/queues/broker.worker.ts +++ b/packages/miniflare/src/workers/queues/broker.worker.ts @@ -287,6 +287,7 @@ export class QueueBrokerObject extends MiniflareDurableObject total + msg.bytes, 0); const metadata: MessageBatchMetadata = { metrics: { backlogCount: this.#messages.length, @@ -353,7 +354,6 @@ export class QueueBrokerObject extends MiniflareDurableObject total + msg.bytes, 0); await this.logWithLevel( LogLevel.INFO, formatQueueResponse(this.name, acked, batch.length, endTime - startTime) From eb880f8cb202a32f7d4f15d2549af2c27c128251 Mon Sep 17 00:00:00 2001 From: Kenneth Ruan Date: Thu, 9 Apr 2026 09:32:23 -0500 Subject: [PATCH 3/5] Add explicit error on message byte length parse --- packages/miniflare/src/workers/queues/broker.worker.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/miniflare/src/workers/queues/broker.worker.ts b/packages/miniflare/src/workers/queues/broker.worker.ts index 6418503837..e136872ff0 100644 --- a/packages/miniflare/src/workers/queues/broker.worker.ts +++ b/packages/miniflare/src/workers/queues/broker.worker.ts @@ -175,7 +175,7 @@ class QueueMessage { case "v8": return body.body.byteLength; default: - assert(false); + throw new Error(`Unexpected queue message contentType received`); } } From b267d306eb41e62e0f80748c174b13218a2115aa Mon Sep 17 00:00:00 2001 From: Kenneth Ruan Date: Thu, 9 Apr 2026 09:59:56 -0500 Subject: [PATCH 4/5] Add queues metrics response helper for repeated code --- .../src/workers/queues/broker.worker.ts | 26 ++++++++----------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/packages/miniflare/src/workers/queues/broker.worker.ts b/packages/miniflare/src/workers/queues/broker.worker.ts index e136872ff0..f14b999ea6 100644 --- a/packages/miniflare/src/workers/queues/broker.worker.ts +++ b/packages/miniflare/src/workers/queues/broker.worker.ts @@ -426,15 +426,19 @@ export class QueueBrokerObject extends MiniflareDurableObject { const messageResponse = { metadata: { - metrics: { - backlogCount: this.#messages.length, - backlogBytes: this.#backlogBytes, - oldestMessageTimestamp: this.#messages[0]?.timestamp.getTime() ?? 0, - }, + metrics: this.#getMetricsResponseObject(), }, }; @@ -459,11 +463,7 @@ export class QueueBrokerObject extends MiniflareDurableObject { const batchResponse = { metadata: { - metrics: { - backlogCount: this.#messages.length, - backlogBytes: this.#backlogBytes, - oldestMessageTimestamp: this.#messages[0]?.timestamp.getTime() ?? 0, - }, + metrics: this.#getMetricsResponseObject(), }, }; @@ -486,10 +486,6 @@ export class QueueBrokerObject extends MiniflareDurableObject { - return Response.json({ - backlogCount: this.#messages.length, - backlogBytes: this.#backlogBytes, - oldestMessageTimestamp: this.#messages[0]?.timestamp.getTime() ?? 0, - }); + return Response.json(this.#getMetricsResponseObject()); }; } From c22ddbffa3510414ee762cc5d1ead4c02fc80f6e Mon Sep 17 00:00:00 2001 From: Kenneth Ruan Date: Thu, 9 Apr 2026 10:08:43 -0500 Subject: [PATCH 5/5] Format code --- packages/miniflare/src/workers/queues/broker.worker.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/miniflare/src/workers/queues/broker.worker.ts b/packages/miniflare/src/workers/queues/broker.worker.ts index f14b999ea6..b279ca14d0 100644 --- a/packages/miniflare/src/workers/queues/broker.worker.ts +++ b/packages/miniflare/src/workers/queues/broker.worker.ts @@ -431,7 +431,7 @@ export class QueueBrokerObject extends MiniflareDurableObject