Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions src/workerd/api/queue.c++
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,38 @@ kj::Promise<void> WorkerQueue::sendBatch(jsg::Lock& js,
.attach(context.registerPendingEvent());
};

jsg::Promise<WorkerQueue::Metrics> WorkerQueue::metrics(
jsg::Lock& js, const jsg::TypeHandler<Metrics>& metricsHandler) {
auto& context = IoContext::current();

auto headers = kj::HttpHeaders(context.getHeaderTable());

auto client = context.getHttpClient(subrequestChannel, true, kj::none, "queue_metrics"_kjc);
auto req = client->request(kj::HttpMethod::GET, "https://fake-host/metrics"_kjc, headers);
const auto& headerIds = context.getHeaderIds();

static constexpr auto handleMetrics = [](auto req, auto client,
auto& headerIds) -> kj::Promise<kj::String> {
Comment thread
jasnell marked this conversation as resolved.
auto response = co_await req.response;

JSG_REQUIRE(response.statusCode == 200, Error, buildQueueErrorMessage(response, headerIds));

co_return co_await response.body->readAllText();
};

auto promise = handleMetrics(kj::mv(req), kj::mv(client), headerIds);

return context.awaitIo(
js, kj::mv(promise), [&metricsHandler](jsg::Lock& js, kj::String text) -> Metrics {
auto parsed = jsg::JsValue::fromJson(js, text);
KJ_IF_SOME(result, metricsHandler.tryUnwrap(js, parsed)) {
return kj::mv(result);
}
_JSG_INTERNAL_FAIL_REQUIRE(
JSG_EXCEPTION(Error), "Failed to parse queue metrics response", text);
});
}

QueueMessage::QueueMessage(
jsg::Lock& js, rpc::QueueMessage::Reader message, IoPtr<QueueEventResult> result)
: id(kj::str(message.getId())),
Expand Down
43 changes: 33 additions & 10 deletions src/workerd/api/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ class WorkerQueue: public jsg::Object {
// representing this queue.
WorkerQueue(uint subrequestChannel): subrequestChannel(subrequestChannel) {}

struct Metrics {
double backlogCount;
double backlogBytes;
double oldestMessageTimestamp;
JSG_STRUCT(backlogCount, backlogBytes, oldestMessageTimestamp);
JSG_STRUCT_TS_OVERRIDE(QueueMetrics);
};

struct SendOptions {
// TODO(soon): Support metadata.

Expand Down Expand Up @@ -73,17 +81,32 @@ class WorkerQueue: public jsg::Object {
jsg::Sequence<MessageSendRequest> batch,
jsg::Optional<SendBatchOptions> options);

JSG_RESOURCE_TYPE(WorkerQueue) {
jsg::Promise<Metrics> metrics(jsg::Lock& js, const jsg::TypeHandler<Metrics>& metricsHandler);

JSG_RESOURCE_TYPE(WorkerQueue, CompatibilityFlags::Reader flags) {
JSG_METHOD(send);
JSG_METHOD(sendBatch);
if (flags.getWorkerdExperimental()) {
JSG_METHOD(metrics);
}

JSG_TS_ROOT();
JSG_TS_OVERRIDE(Queue<Body = unknown> {
send(message: Body, options?: QueueSendOptions): Promise<void>;
sendBatch(messages
: Iterable<MessageSendRequest<Body>>, options ?: QueueSendBatchOptions)
: Promise<void>;
});
if (flags.getWorkerdExperimental()) {
JSG_TS_OVERRIDE(Queue<Body = unknown> {
send(message: Body, options?: QueueSendOptions): Promise<void>;
sendBatch(messages
: Iterable<MessageSendRequest<Body>>, options ?: QueueSendBatchOptions)
: Promise<void>;
metrics(): Promise<QueueMetrics>;
});
} else {
JSG_TS_OVERRIDE(Queue<Body = unknown> {
send(message: Body, options?: QueueSendOptions): Promise<void>;
sendBatch(messages
: Iterable<MessageSendRequest<Body>>, options ?: QueueSendBatchOptions)
: Promise<void>;
});
}
JSG_TS_DEFINE(type QueueContentType = "text" | "bytes" | "json" | "v8");
}

Expand Down Expand Up @@ -377,8 +400,8 @@ class QueueCustomEvent final: public WorkerInterface::CustomEvent, public kj::Re

#define EW_QUEUE_ISOLATE_TYPES \
api::WorkerQueue, api::WorkerQueue::SendOptions, api::WorkerQueue::SendBatchOptions, \
api::WorkerQueue::MessageSendRequest, api::IncomingQueueMessage, api::QueueRetryBatch, \
api::QueueRetryMessage, api::QueueResponse, api::QueueRetryOptions, api::QueueMessage, \
api::QueueEvent, api::QueueController, api::QueueExportedHandler
api::WorkerQueue::MessageSendRequest, api::WorkerQueue::Metrics, api::IncomingQueueMessage, \
api::QueueRetryBatch, api::QueueRetryMessage, api::QueueResponse, api::QueueRetryOptions, \
api::QueueMessage, api::QueueEvent, api::QueueController, api::QueueExportedHandler

} // namespace workerd::api
6 changes: 6 additions & 0 deletions src/workerd/api/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ wd_test(
],
)

wd_test(
src = "queue-metrics-test.wd-test",
args = ["--experimental"],
data = ["queue-metrics-test.js"],
)

wd_test(
src = "r2-test.wd-test",
args = ["--experimental"],
Expand Down
22 changes: 22 additions & 0 deletions src/workerd/api/tests/queue-error-codes-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,17 @@ export default {
return new Response('');
}

if (pathname === '/metrics') {
// Return error with headers defined for metrics
return new Response('', {
status: 503,
headers: {
'CF-Queues-Error-Code': '10503',
'CF-Queues-Error-Cause': 'Service temporarily unavailable',
},
});
}

return new Response('Not Found', { status: 404 });
},

Expand Down Expand Up @@ -143,5 +154,16 @@ export default {
);
}
}

// Test metrics with error headers defined
try {
await env.QUEUE.metrics();
assert.fail('Expected metrics() to throw');
} catch (error) {
assert.strictEqual(
error.message,
'Service temporarily unavailable (10503)'
);
}
},
};
35 changes: 35 additions & 0 deletions src/workerd/api/tests/queue-metrics-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (c) 2023 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

import assert from 'node:assert';

export default {
async fetch(request, env, ctx) {
const { pathname } = new URL(request.url);
if (pathname === '/metrics') {
assert.strictEqual(request.method, 'GET');
return Response.json({
backlogCount: 100,
backlogBytes: 2048,
oldestMessageTimestamp: 1000000,
});
}
return new Response();
},

async test(ctrl, env, ctx) {
const metricsEnabled = env.METRICS_FLAG;
if (metricsEnabled) {
// Flag ON → metrics() should exist and return data
assert.strictEqual(typeof env.QUEUE.metrics, 'function');
const metrics = await env.QUEUE.metrics();
assert.strictEqual(metrics.backlogCount, 100);
assert.strictEqual(metrics.backlogBytes, 2048);
assert.strictEqual(metrics.oldestMessageTimestamp, 1000000);
} else {
// Flag OFF → metrics() should not be exposed on the binding
assert.strictEqual(typeof env.QUEUE.metrics, 'undefined');
}
},
};
30 changes: 30 additions & 0 deletions src/workerd/api/tests/queue-metrics-test.wd-test
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using Workerd = import "/workerd/workerd.capnp";

const unitTests :Workerd.Config = (
services = [
( name = "queue-metrics-enabled",
worker = (
modules = [
( name = "worker-metrics-enabled", esModule = embed "queue-metrics-test.js" )
],
bindings = [
( name = "QUEUE", queue = "queue-metrics-enabled" ),
( name = "METRICS_FLAG", json = "true" ),
],
compatibilityFlags = ["nodejs_compat", "experimental", "capture_async_api_throws"],
)
),
( name = "queue-metrics-disabled",
worker = (
modules = [
( name = "worker-metrics-disabled", esModule = embed "queue-metrics-test.js" )
],
bindings = [
( name = "QUEUE", queue = "queue-metrics-disabled" ),
( name = "METRICS_FLAG", json = "false" ),
],
compatibilityFlags = ["nodejs_compat", "capture_async_api_throws"],
)
),
],
);
3 changes: 2 additions & 1 deletion src/workerd/api/tests/queue-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ let serializedBody;
export default {
// Producer receiver (from `env.QUEUE`)
async fetch(request, env, ctx) {
assert.strictEqual(request.method, 'POST');
const { pathname } = new URL(request.url);
if (pathname === '/message') {
assert.strictEqual(request.method, 'POST');
const format = request.headers.get('X-Msg-Fmt') ?? 'v8';
if (format === 'text') {
assert.strictEqual(request.headers.get('X-Msg-Delay-Secs'), '2');
Expand All @@ -32,6 +32,7 @@ export default {
assert.fail(`Unexpected format: ${JSON.stringify(format)}`);
}
} else if (pathname === '/batch') {
assert.strictEqual(request.method, 'POST');
assert.strictEqual(request.headers.get('X-Msg-Delay-Secs'), '2');

const body = await request.json();
Expand Down
4 changes: 2 additions & 2 deletions src/workerd/api/tests/queue-test.wd-test
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const unitTests :Workerd.Config = (
( name = "QUEUE", queue = "queue-error-codes-enabled" ),
( name = "ERROR_CODES_FLAG", json = "true" ),
],
compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers", "queues_json_messages", "queue_expose_error_codes", "rpc", "capture_async_api_throws", "disable_fast_jsg_struct"],
compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers", "queues_json_messages", "queue_expose_error_codes", "experimental", "rpc", "capture_async_api_throws", "disable_fast_jsg_struct"],
)
),
( name = "queue-error-codes-disabled",
Expand All @@ -35,7 +35,7 @@ const unitTests :Workerd.Config = (
( name = "QUEUE", queue = "queue-error-codes-disabled" ),
( name = "ERROR_CODES_FLAG", json = "false" ),
],
compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers", "queues_json_messages", "no_queue_expose_error_codes", "rpc", "capture_async_api_throws", "disable_fast_jsg_struct"],
compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers", "queues_json_messages", "no_queue_expose_error_codes", "experimental", "rpc", "capture_async_api_throws", "disable_fast_jsg_struct"],
)
),
],
Expand Down
6 changes: 6 additions & 0 deletions types/generated-snapshot/experimental/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2395,6 +2395,7 @@ interface Queue<Body = unknown> {
messages: Iterable<MessageSendRequest<Body>>,
options?: QueueSendBatchOptions,
): Promise<void>;
metrics(): Promise<QueueMetrics>;
}
interface QueueSendOptions {
contentType?: QueueContentType;
Expand All @@ -2408,6 +2409,11 @@ interface MessageSendRequest<Body = unknown> {
contentType?: QueueContentType;
delaySeconds?: number;
}
interface QueueMetrics {
backlogCount: number;
backlogBytes: number;
oldestMessageTimestamp: number;
}
interface QueueRetryBatch {
retry: boolean;
delaySeconds?: number;
Expand Down
6 changes: 6 additions & 0 deletions types/generated-snapshot/experimental/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2398,6 +2398,7 @@ export interface Queue<Body = unknown> {
messages: Iterable<MessageSendRequest<Body>>,
options?: QueueSendBatchOptions,
): Promise<void>;
metrics(): Promise<QueueMetrics>;
}
export interface QueueSendOptions {
contentType?: QueueContentType;
Expand All @@ -2411,6 +2412,11 @@ export interface MessageSendRequest<Body = unknown> {
contentType?: QueueContentType;
delaySeconds?: number;
}
export interface QueueMetrics {
backlogCount: number;
backlogBytes: number;
oldestMessageTimestamp: number;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for coming here late, but is the units of this timestamp documented anywhere? Times should generally either have an obvious type (like Date) or a clear unit (is this in seconds since the epoch? milliseconds? something else?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For example the field 5 lines up is called delaySeconds rather than just delay. The unit doesn't have to be in the field name itself -- although that is often a good place for it -- but it does at least need to be clearly documented.

Copy link
Copy Markdown
Member Author

@KennethRuan KennethRuan Mar 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, it may not be very clear. On the HTTP side, it will be documented in the OpenAPI schema, what would be the best way to document it in workerd?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at it more closely, the best answer here would be to actually return this as a Date type (which IIRC can be easily done by making the C++ datatype a kj::Date).

This would parallel the timestamp field in IncomingQueueMessage or a bunch of the other runtime APIs that return a Date if you look at the rest of this file.

Copy link
Copy Markdown
Member Author

@KennethRuan KennethRuan Mar 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense, thank you for pointing it out! I will get that change made.

}
export interface QueueRetryBatch {
retry: boolean;
delaySeconds?: number;
Expand Down
Loading