Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/queue-broker-metrics-response.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"miniflare": patch
---

Return metadata in queue broker response

The queue broker's `/message` and `/batch` endpoints now return a JSON response body containing queue metrics (`backlogCount`, `backlogBytes`, `oldestMessageTimestamp`) instead of an empty response. A new `GET /metrics` endpoint is also added to support the `metrics()` API.
87 changes: 79 additions & 8 deletions packages/miniflare/src/workers/queues/broker.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import assert from "node:assert";
import { Buffer } from "node:buffer";
import { bold, green, grey, red, reset, yellow } from "kleur/colors";
import {
GET,
HttpError,
LogLevel,
MiniflareDurableObject,
Expand Down Expand Up @@ -149,13 +150,34 @@ function serialise(msg: QueueMessage): QueueOutgoingMessage {
}

class QueueMessage {
static #encoder = new TextEncoder();
/**
* Message body size in bytes. This is an approximation of production behaviour and may not be the exact same.
*/
#bytes = 0;
#failedAttempts = 0;

constructor(
readonly id: string,
readonly timestamp: Date,
readonly body: QueueBody
) {}
) {
this.#bytes = QueueMessage.#byteLength(body);
}

/**
 * Approximate the serialized size of a message body in bytes.
 * This mirrors production behaviour only approximately (see the note on
 * `#bytes`): text/json bodies are measured as UTF-8, binary bodies by their
 * buffer length.
 */
static #byteLength(body: QueueBody): number {
	if (body.contentType === "text") {
		return QueueMessage.#encoder.encode(body.body).byteLength;
	} else if (body.contentType === "json") {
		return QueueMessage.#encoder.encode(JSON.stringify(body.body)).byteLength;
	} else if (body.contentType === "bytes" || body.contentType === "v8") {
		return body.body.byteLength;
	}
	throw new Error(`Unexpected queue message contentType received`);
}

incrementFailedAttempts(): number {
return ++this.#failedAttempts;
Expand All @@ -164,6 +186,10 @@ class QueueMessage {
get failedAttempts() {
return this.#failedAttempts;
}

get bytes() {
return this.#bytes;
}
}

function formatQueueResponse(
Expand Down Expand Up @@ -203,6 +229,7 @@ export class QueueBrokerObject extends MiniflareDurableObject<QueueBrokerObjectE
readonly #consumers: Record<string, QueueConsumer | undefined>;
readonly #messages: QueueMessage[] = [];
#pendingFlush?: PendingFlush;
#backlogBytes = 0;

constructor(state: DurableObjectState, env: QueueBrokerObjectEnv) {
super(state, env);
Expand All @@ -226,7 +253,11 @@ export class QueueBrokerObject extends MiniflareDurableObject<QueueBrokerObjectE
return this.#consumers[this.name];
}

#dispatchBatch(workerName: string, batch: QueueMessage[]) {
#dispatchBatch(
workerName: string,
batch: QueueMessage[],
metadata?: MessageBatchMetadata
) {
const bindingName =
`${QueueBindings.SERVICE_WORKER_PREFIX}${workerName}` as const;
const maybeService = this.env[bindingName];
Expand All @@ -242,7 +273,8 @@ export class QueueBrokerObject extends MiniflareDurableObject<QueueBrokerObjectE
return { id, timestamp, body: body.body, attempts };
}
});
return maybeService.queue(this.name, messages);

return maybeService.queue(this.name, messages, metadata);
}

#flush = async () => {
Expand All @@ -255,11 +287,23 @@ export class QueueBrokerObject extends MiniflareDurableObject<QueueBrokerObjectE

// Extract and dispatch a batch
const batch = this.#messages.splice(0, batchSize);
this.#backlogBytes -= batch.reduce((total, msg) => total + msg.bytes, 0);
const metadata: MessageBatchMetadata = {
metrics: {
backlogCount: this.#messages.length,
backlogBytes: this.#backlogBytes,
oldestMessageTimestamp: this.#messages[0]?.timestamp,
Comment thread
dario-piotrowicz marked this conversation as resolved.
},
};
Comment thread
KennethRuan marked this conversation as resolved.
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
const startTime = Date.now();
let endTime: number;
let response: FetcherQueueResult;
try {
response = await this.#dispatchBatch(consumer.workerName, batch);
response = await this.#dispatchBatch(
consumer.workerName,
batch,
metadata
);
endTime = Date.now();
} catch (e: any) {
endTime = Date.now();
Expand Down Expand Up @@ -290,6 +334,7 @@ export class QueueBrokerObject extends MiniflareDurableObject<QueueBrokerObjectE

const fn = () => {
this.#messages.push(message);
this.#backlogBytes += message.bytes;
this.#ensurePendingFlush();
};
const delay = retryMessages.get(message.id) ?? globalDelay;
Expand Down Expand Up @@ -372,6 +417,7 @@ export class QueueBrokerObject extends MiniflareDurableObject<QueueBrokerObjectE

const fn = () => {
this.#messages.push(msg);
this.#backlogBytes += msg.bytes;
this.#ensurePendingFlush();
};

Expand All @@ -380,11 +426,25 @@ export class QueueBrokerObject extends MiniflareDurableObject<QueueBrokerObjectE
}
}

/**
 * Builds the metrics snapshot returned by the `/message`, `/batch` and
 * `/metrics` endpoints. Read-only: does not mutate broker state.
 *
 * - `backlogCount`: number of messages currently awaiting delivery.
 * - `backlogBytes`: approximate total byte size of those messages.
 * - `oldestMessageTimestamp`: enqueue time (ms since epoch) of the oldest
 *   pending message, or 0 when the backlog is empty.
 */
#getMetricsResponseObject() {
	return {
		backlogCount: this.#messages.length,
		backlogBytes: this.#backlogBytes,
		oldestMessageTimestamp: this.#messages[0]?.timestamp.getTime() ?? 0,
	};
}

@POST("/message")
message: RouteHandler = async (req) => {
const messageResponse = {
metadata: {
metrics: this.#getMetricsResponseObject(),
},
};

// If we don't have a consumer, drop the message
const consumer = this.#maybeConsumer;
if (consumer === undefined) return new Response();
if (consumer === undefined) return Response.json(messageResponse);

validateMessageSize(req.headers);
const contentType = validateContentType(req.headers);
Expand All @@ -396,14 +456,20 @@ export class QueueBrokerObject extends MiniflareDurableObject<QueueBrokerObjectE
[{ contentType, delaySecs: delay, body }],
this.#maybeProducer?.deliveryDelay
);
return new Response();
return Response.json(messageResponse);
};

@POST("/batch")
batch: RouteHandler = async (req) => {
const batchResponse = {
metadata: {
metrics: this.#getMetricsResponseObject(),
},
};

// If we don't have a consumer, drop the message
const consumer = this.#maybeConsumer;
if (consumer === undefined) return new Response();
if (consumer === undefined) return Response.json(batchResponse);

// NOTE: this endpoint is also used when moving messages to the dead-letter
// queue. In this case, size headers won't be added and this validation is
Expand All @@ -415,6 +481,11 @@ export class QueueBrokerObject extends MiniflareDurableObject<QueueBrokerObjectE
const body = QueuesBatchRequestSchema.parse(await req.json());

this.#enqueue(body.messages, delay);
return new Response();
return Response.json(batchResponse);
};

@GET("/metrics")
// Read-only endpoint exposing a point-in-time snapshot of this queue's
// metrics; added to back the `metrics()` API (see changeset above in the PR).
metrics: RouteHandler = async (_req) => {
	return Response.json(this.#getMetricsResponseObject());
};
}
90 changes: 90 additions & 0 deletions packages/miniflare/test/plugins/queues/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -947,6 +947,96 @@ test("supports message contentTypes", async ({ expect }) => {
]);
});

test("supports metadata in send() response", async ({ expect }) => {
const mf = new Miniflare({
workers: [
{
name: "producer",
queueProducers: ["QUEUE"],
compatibilityFlags: ["experimental"],
modules: true,
script: `export default {
async fetch(request, env, ctx) {
const resp = await env.QUEUE.send("msg");
return Response.json(resp);
}
}`,
},
{
name: "consumer",
queueConsumers: ["QUEUE"],
modules: true,
script: `export default {
async queue(batch, env, ctx) {}
}`,
},
],
});
useDispose(mf);
const object = await getControlStub(mf, "QUEUE");

const first = await mf.dispatchFetch("http://localhost");
await first.arrayBuffer();
await object.waitForFakeTasks();

const res = await mf.dispatchFetch("http://localhost");
const body = await res.json();
expect(body).toEqual({
metadata: {
metrics: {
backlogCount: 1,
backlogBytes: expect.any(Number),
oldestMessageTimestamp: new Date(1_000_000).toISOString(),
},
},
});
});

test("supports metadata in sendBatch() response", async ({ expect }) => {
const mf = new Miniflare({
workers: [
{
name: "producer",
queueProducers: ["QUEUE"],
compatibilityFlags: ["experimental"],
modules: true,
script: `export default {
async fetch(request, env, ctx) {
const resp = await env.QUEUE.sendBatch([{ body: "msg1" }, { body: "msg2" }]);
return Response.json(resp);
}
}`,
},
{
name: "consumer",
queueConsumers: ["QUEUE"],
modules: true,
script: `export default {
async queue(batch, env, ctx) {}
}`,
},
],
});
useDispose(mf);
const object = await getControlStub(mf, "QUEUE");

const first = await mf.dispatchFetch("http://localhost");
await first.arrayBuffer();
await object.waitForFakeTasks();

const res = await mf.dispatchFetch("http://localhost");
const body = await res.json();
expect(body).toEqual({
metadata: {
metrics: {
backlogCount: 2,
backlogBytes: expect.any(Number),
oldestMessageTimestamp: new Date(1_000_000).toISOString(),
},
},
});
});

test("validates message size", async ({ expect }) => {
const mf = new Miniflare({
queueProducers: { QUEUE: "MY_QUEUE" },
Expand Down
Loading