From 1c84049c69abe6b0de7988e1441ccff8a794d93b Mon Sep 17 00:00:00 2001 From: Kenneth Ruan Date: Mon, 2 Mar 2026 11:30:20 -0600 Subject: [PATCH] Add metrics() method to Queue binding --- src/workerd/api/queue.c++ | 32 ++++++++++++++ src/workerd/api/queue.h | 43 ++++++++++++++----- src/workerd/api/tests/BUILD.bazel | 6 +++ .../api/tests/queue-error-codes-test.js | 22 ++++++++++ src/workerd/api/tests/queue-metrics-test.js | 35 +++++++++++++++ .../api/tests/queue-metrics-test.wd-test | 30 +++++++++++++ src/workerd/api/tests/queue-test.js | 3 +- src/workerd/api/tests/queue-test.wd-test | 4 +- .../experimental/index.d.ts | 6 +++ .../generated-snapshot/experimental/index.ts | 6 +++ 10 files changed, 174 insertions(+), 13 deletions(-) create mode 100644 src/workerd/api/tests/queue-metrics-test.js create mode 100644 src/workerd/api/tests/queue-metrics-test.wd-test diff --git a/src/workerd/api/queue.c++ b/src/workerd/api/queue.c++ index e329f6a27c4..9cc76c906f0 100644 --- a/src/workerd/api/queue.c++ +++ b/src/workerd/api/queue.c++ @@ -362,6 +362,38 @@ kj::Promise WorkerQueue::sendBatch(jsg::Lock& js, .attach(context.registerPendingEvent()); }; +jsg::Promise WorkerQueue::metrics( + jsg::Lock& js, const jsg::TypeHandler& metricsHandler) { + auto& context = IoContext::current(); + + auto headers = kj::HttpHeaders(context.getHeaderTable()); + + auto client = context.getHttpClient(subrequestChannel, true, kj::none, "queue_metrics"_kjc); + auto req = client->request(kj::HttpMethod::GET, "https://fake-host/metrics"_kjc, headers); + const auto& headerIds = context.getHeaderIds(); + + static constexpr auto handleMetrics = [](auto req, auto client, + auto& headerIds) -> kj::Promise { + auto response = co_await req.response; + + JSG_REQUIRE(response.statusCode == 200, Error, buildQueueErrorMessage(response, headerIds)); + + co_return co_await response.body->readAllText(); + }; + + auto promise = handleMetrics(kj::mv(req), kj::mv(client), headerIds); + + return context.awaitIo( + js, kj::mv(promise), [&metricsHandler](jsg::Lock& js, kj::String text) -> Metrics { + auto parsed = jsg::JsValue::fromJson(js, text); + KJ_IF_SOME(result, metricsHandler.tryUnwrap(js, parsed)) { + return kj::mv(result); + } + _JSG_INTERNAL_FAIL_REQUIRE( + JSG_EXCEPTION(Error), "Failed to parse queue metrics response", text); + }); +} + QueueMessage::QueueMessage( jsg::Lock& js, rpc::QueueMessage::Reader message, IoPtr result) : id(kj::str(message.getId())), diff --git a/src/workerd/api/queue.h b/src/workerd/api/queue.h index cd424871686..3b785b1b511 100644 --- a/src/workerd/api/queue.h +++ b/src/workerd/api/queue.h @@ -27,6 +27,14 @@ class WorkerQueue: public jsg::Object { // representing this queue. WorkerQueue(uint subrequestChannel): subrequestChannel(subrequestChannel) {} + struct Metrics { + double backlogCount; + double backlogBytes; + double oldestMessageTimestamp; + JSG_STRUCT(backlogCount, backlogBytes, oldestMessageTimestamp); + JSG_STRUCT_TS_OVERRIDE(QueueMetrics); + }; + struct SendOptions { // TODO(soon): Support metadata. @@ -73,17 +81,32 @@ class WorkerQueue: public jsg::Object { jsg::Sequence batch, jsg::Optional options); - JSG_RESOURCE_TYPE(WorkerQueue) { + jsg::Promise metrics(jsg::Lock& js, const jsg::TypeHandler& metricsHandler); + + JSG_RESOURCE_TYPE(WorkerQueue, CompatibilityFlags::Reader flags) { JSG_METHOD(send); JSG_METHOD(sendBatch); + if (flags.getWorkerdExperimental()) { + JSG_METHOD(metrics); + } JSG_TS_ROOT(); - JSG_TS_OVERRIDE(Queue { - send(message: Body, options?: QueueSendOptions): Promise; - sendBatch(messages - : Iterable>, options ?: QueueSendBatchOptions) - : Promise; - }); + if (flags.getWorkerdExperimental()) { + JSG_TS_OVERRIDE(Queue { + send(message: Body, options?: QueueSendOptions): Promise; + sendBatch(messages + : Iterable>, options ?: QueueSendBatchOptions) + : Promise; + metrics(): Promise; + }); + } else { + JSG_TS_OVERRIDE(Queue { + send(message: Body, options?: QueueSendOptions): Promise; + sendBatch(messages + : Iterable>, options ?: QueueSendBatchOptions) + : Promise; + }); + } JSG_TS_DEFINE(type QueueContentType = "text" | "bytes" | "json" | "v8"); } @@ -377,8 +400,8 @@ class QueueCustomEvent final: public WorkerInterface::CustomEvent, public kj::Re #define EW_QUEUE_ISOLATE_TYPES \ api::WorkerQueue, api::WorkerQueue::SendOptions, api::WorkerQueue::SendBatchOptions, \ - api::WorkerQueue::MessageSendRequest, api::IncomingQueueMessage, api::QueueRetryBatch, \ - api::QueueRetryMessage, api::QueueResponse, api::QueueRetryOptions, api::QueueMessage, \ - api::QueueEvent, api::QueueController, api::QueueExportedHandler + api::WorkerQueue::MessageSendRequest, api::WorkerQueue::Metrics, api::IncomingQueueMessage, \ + api::QueueRetryBatch, api::QueueRetryMessage, api::QueueResponse, api::QueueRetryOptions, \ + api::QueueMessage, api::QueueEvent, api::QueueController, api::QueueExportedHandler } // namespace workerd::api diff --git a/src/workerd/api/tests/BUILD.bazel b/src/workerd/api/tests/BUILD.bazel index ffcc9892fc5..af86bd03acf 100644 --- a/src/workerd/api/tests/BUILD.bazel +++ b/src/workerd/api/tests/BUILD.bazel @@ -119,6 +119,12 @@ wd_test( ], ) +wd_test( + src = "queue-metrics-test.wd-test", + args = ["--experimental"], + data = ["queue-metrics-test.js"], +) + wd_test( src = "r2-test.wd-test", args = ["--experimental"], diff --git a/src/workerd/api/tests/queue-error-codes-test.js b/src/workerd/api/tests/queue-error-codes-test.js index 19acd9ceea5..ecce189b653 100644 --- a/src/workerd/api/tests/queue-error-codes-test.js +++ b/src/workerd/api/tests/queue-error-codes-test.js @@ -62,6 +62,17 @@ export default { return new Response(''); } + if (pathname === '/metrics') { + // Return error with headers defined for metrics + return new Response('', { + status: 503, + headers: { + 'CF-Queues-Error-Code': '10503', + 'CF-Queues-Error-Cause': 'Service temporarily unavailable', + }, + }); + } + return new Response('Not Found', { status: 404 }); }, @@ -143,5 +154,16 @@ export default { ); } } + + // Test metrics with error headers defined + try { + await env.QUEUE.metrics(); + assert.fail('Expected metrics() to throw'); + } catch (error) { + assert.strictEqual( + error.message, + 'Service temporarily unavailable (10503)' + ); + } }, }; diff --git a/src/workerd/api/tests/queue-metrics-test.js b/src/workerd/api/tests/queue-metrics-test.js new file mode 100644 index 00000000000..fdf1ef89b8b --- /dev/null +++ b/src/workerd/api/tests/queue-metrics-test.js @@ -0,0 +1,35 @@ +// Copyright (c) 2023 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +import assert from 'node:assert'; + +export default { + async fetch(request, env, ctx) { + const { pathname } = new URL(request.url); + if (pathname === '/metrics') { + assert.strictEqual(request.method, 'GET'); + return Response.json({ + backlogCount: 100, + backlogBytes: 2048, + oldestMessageTimestamp: 1000000, + }); + } + return new Response(); + }, + + async test(ctrl, env, ctx) { + const metricsEnabled = env.METRICS_FLAG; + if (metricsEnabled) { + // Flag ON → metrics() should exist and return data + assert.strictEqual(typeof env.QUEUE.metrics, 'function'); + const metrics = await env.QUEUE.metrics(); + assert.strictEqual(metrics.backlogCount, 100); + assert.strictEqual(metrics.backlogBytes, 2048); + assert.strictEqual(metrics.oldestMessageTimestamp, 1000000); + } else { + // Flag OFF → metrics() should not be exposed on the binding + assert.strictEqual(typeof env.QUEUE.metrics, 'undefined'); + } + }, +}; diff --git a/src/workerd/api/tests/queue-metrics-test.wd-test b/src/workerd/api/tests/queue-metrics-test.wd-test new file mode 100644 index 00000000000..f7e80bff949 --- /dev/null +++ b/src/workerd/api/tests/queue-metrics-test.wd-test @@ -0,0 +1,30 @@ +using Workerd = import "/workerd/workerd.capnp"; + +const unitTests :Workerd.Config = ( + services = [ + ( name = "queue-metrics-enabled", + worker = ( + modules = [ + ( name = "worker-metrics-enabled", esModule = embed "queue-metrics-test.js" ) + ], + bindings = [ + ( name = "QUEUE", queue = "queue-metrics-enabled" ), + ( name = "METRICS_FLAG", json = "true" ), + ], + compatibilityFlags = ["nodejs_compat", "experimental", "capture_async_api_throws"], + ) + ), + ( name = "queue-metrics-disabled", + worker = ( + modules = [ + ( name = "worker-metrics-disabled", esModule = embed "queue-metrics-test.js" ) + ], + bindings = [ + ( name = "QUEUE", queue = "queue-metrics-disabled" ), + ( name = "METRICS_FLAG", json = "false" ), + ], + compatibilityFlags = ["nodejs_compat", "capture_async_api_throws"], + ) + ), + ], +); diff --git a/src/workerd/api/tests/queue-test.js b/src/workerd/api/tests/queue-test.js index 96b4e1cb179..d0771beba5c 100644 --- a/src/workerd/api/tests/queue-test.js +++ b/src/workerd/api/tests/queue-test.js @@ -10,9 +10,9 @@ let serializedBody; export default { // Producer receiver (from `env.QUEUE`) async fetch(request, env, ctx) { - assert.strictEqual(request.method, 'POST'); const { pathname } = new URL(request.url); if (pathname === '/message') { + assert.strictEqual(request.method, 'POST'); const format = request.headers.get('X-Msg-Fmt') ?? 'v8'; if (format === 'text') { assert.strictEqual(request.headers.get('X-Msg-Delay-Secs'), '2'); @@ -32,6 +32,7 @@ export default { assert.fail(`Unexpected format: ${JSON.stringify(format)}`); } } else if (pathname === '/batch') { + assert.strictEqual(request.method, 'POST'); assert.strictEqual(request.headers.get('X-Msg-Delay-Secs'), '2'); const body = await request.json(); diff --git a/src/workerd/api/tests/queue-test.wd-test b/src/workerd/api/tests/queue-test.wd-test index c4bf66cbc70..29ccf01029d 100644 --- a/src/workerd/api/tests/queue-test.wd-test +++ b/src/workerd/api/tests/queue-test.wd-test @@ -23,7 +23,7 @@ const unitTests :Workerd.Config = ( ( name = "QUEUE", queue = "queue-error-codes-enabled" ), ( name = "ERROR_CODES_FLAG", json = "true" ), ], - compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers", "queues_json_messages", "queue_expose_error_codes", "rpc", "capture_async_api_throws", "disable_fast_jsg_struct"], + compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers", "queues_json_messages", "queue_expose_error_codes", "experimental", "rpc", "capture_async_api_throws", "disable_fast_jsg_struct"], ) ), ( name = "queue-error-codes-disabled", @@ -35,7 +35,7 @@ const unitTests :Workerd.Config = ( ( name = "QUEUE", queue = "queue-error-codes-disabled" ), ( name = "ERROR_CODES_FLAG", json = "false" ), ], - compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers", "queues_json_messages", "no_queue_expose_error_codes", "rpc", "capture_async_api_throws", "disable_fast_jsg_struct"], + compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers", "queues_json_messages", "no_queue_expose_error_codes", "experimental", "rpc", "capture_async_api_throws", "disable_fast_jsg_struct"], ) ), ], diff --git a/types/generated-snapshot/experimental/index.d.ts b/types/generated-snapshot/experimental/index.d.ts index 9b26c76d576..c1cc3ae2b57 100755 --- a/types/generated-snapshot/experimental/index.d.ts +++ b/types/generated-snapshot/experimental/index.d.ts @@ -2395,6 +2395,7 @@ interface Queue { messages: Iterable>, options?: QueueSendBatchOptions, ): Promise; + metrics(): Promise; } interface QueueSendOptions { contentType?: QueueContentType; @@ -2408,6 +2409,11 @@ interface MessageSendRequest { contentType?: QueueContentType; delaySeconds?: number; } +interface QueueMetrics { + backlogCount: number; + backlogBytes: number; + oldestMessageTimestamp: number; +} interface QueueRetryBatch { retry: boolean; delaySeconds?: number; diff --git a/types/generated-snapshot/experimental/index.ts b/types/generated-snapshot/experimental/index.ts index a647ae55861..c4ca0487d02 100755 --- a/types/generated-snapshot/experimental/index.ts +++ b/types/generated-snapshot/experimental/index.ts @@ -2398,6 +2398,7 @@ export interface Queue { messages: Iterable>, options?: QueueSendBatchOptions, ): Promise; + metrics(): Promise; } export interface QueueSendOptions { contentType?: QueueContentType; @@ -2411,6 +2412,11 @@ export interface MessageSendRequest { contentType?: QueueContentType; delaySeconds?: number; } +export interface QueueMetrics { + backlogCount: number; + backlogBytes: number; + oldestMessageTimestamp: number; +} export interface QueueRetryBatch { retry: boolean; delaySeconds?: number;