Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 20 additions & 50 deletions src/workerd/api/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,32 +138,18 @@ class WorkerQueue: public jsg::Object {
jsg::Promise<Metrics> metrics(jsg::Lock& js, const jsg::TypeHandler<Metrics>& metricsHandler);

JSG_RESOURCE_TYPE(WorkerQueue, CompatibilityFlags::Reader flags) {
if (flags.getWorkerdExperimental()) {
JSG_METHOD_NAMED(send, sendWithResponse);
JSG_METHOD_NAMED(sendBatch, sendBatchWithResponse);
JSG_METHOD(metrics);
} else {
JSG_METHOD(send);
JSG_METHOD(sendBatch);
}
JSG_METHOD(metrics);
JSG_METHOD_NAMED(send, sendWithResponse);
JSG_METHOD_NAMED(sendBatch, sendBatchWithResponse);

JSG_TS_ROOT();
if (flags.getWorkerdExperimental()) {
JSG_TS_OVERRIDE(Queue<Body = unknown> {
send(message: Body, options?: QueueSendOptions): Promise<QueueSendResponse>;
sendBatch(messages
: Iterable<MessageSendRequest<Body>>, options ?: QueueSendBatchOptions)
: Promise<QueueSendBatchResponse>;
metrics(): Promise<QueueMetrics>;
});
} else {
JSG_TS_OVERRIDE(Queue<Body = unknown> {
send(message: Body, options?: QueueSendOptions): Promise<void>;
sendBatch(messages
: Iterable<MessageSendRequest<Body>>, options ?: QueueSendBatchOptions)
: Promise<void>;
});
}
JSG_TS_OVERRIDE(Queue<Body = unknown> {
send(message: Body, options?: QueueSendOptions): Promise<QueueSendResponse>;
sendBatch(messages
: Iterable<MessageSendRequest<Body>>, options ?: QueueSendBatchOptions)
: Promise<QueueSendBatchResponse>;
metrics(): Promise<QueueMetrics>;
});
JSG_TS_DEFINE(type QueueContentType = "text" | "bytes" | "json" | "v8");
}

Expand Down Expand Up @@ -339,24 +325,16 @@ class QueueEvent final: public ExtendableEvent {
JSG_LAZY_READONLY_INSTANCE_PROPERTY(messages, getMessages);
JSG_READONLY_INSTANCE_PROPERTY(queue, getQueueName);

if (flags.getWorkerdExperimental()) {
JSG_READONLY_INSTANCE_PROPERTY(metadata, getMetadata);
}
JSG_READONLY_INSTANCE_PROPERTY(metadata, getMetadata);

JSG_METHOD(retryAll);
JSG_METHOD(ackAll);

JSG_TS_ROOT();
if (flags.getWorkerdExperimental()) {
JSG_TS_OVERRIDE(QueueEvent<Body = unknown> {
readonly messages: readonly Message<Body>[];
readonly metadata: MessageBatchMetadata;
});
} else {
JSG_TS_OVERRIDE(QueueEvent<Body = unknown> {
readonly messages: readonly Message<Body>[];
});
}
JSG_TS_OVERRIDE(QueueEvent<Body = unknown> {
readonly messages: readonly Message<Body>[];
readonly metadata: MessageBatchMetadata;
});
}

void visitForMemoryInfo(jsg::MemoryTracker& tracker) const {
Expand Down Expand Up @@ -422,24 +400,16 @@ class QueueController final: public jsg::Object {
JSG_READONLY_INSTANCE_PROPERTY(messages, getMessages);
JSG_READONLY_INSTANCE_PROPERTY(queue, getQueueName);

if (flags.getWorkerdExperimental()) {
JSG_READONLY_INSTANCE_PROPERTY(metadata, getMetadata);
}
JSG_READONLY_INSTANCE_PROPERTY(metadata, getMetadata);

JSG_METHOD(retryAll);
JSG_METHOD(ackAll);

JSG_TS_ROOT();
if (flags.getWorkerdExperimental()) {
JSG_TS_OVERRIDE(MessageBatch<Body = unknown> {
readonly messages: readonly Message<Body>[];
readonly metadata: MessageBatchMetadata;
});
} else {
JSG_TS_OVERRIDE(MessageBatch<Body = unknown> {
readonly messages: readonly Message<Body>[];
});
}
JSG_TS_OVERRIDE(MessageBatch<Body = unknown> {
readonly messages: readonly Message<Body>[];
readonly metadata: MessageBatchMetadata;
});
}

void visitForMemoryInfo(jsg::MemoryTracker& tracker) const {
Expand Down
60 changes: 20 additions & 40 deletions src/workerd/api/tests/queue-metadata-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,6 @@ import assert from 'node:assert';

export default {
async queue(batch, env, ctx) {
const flagEnabled = env.METADATA_FLAG;

if (!flagEnabled) {
// Flag disabled → metadata property should not exist
assert.strictEqual(batch.metadata, undefined);
batch.ackAll();
return;
}

// Flag enabled → metadata should always be present
assert.ok(batch.metadata, 'Expected batch.metadata to be defined');
assert.ok(
batch.metadata.metrics,
Expand Down Expand Up @@ -47,37 +37,27 @@ export default {
},

async test(ctrl, env, ctx) {
const flagEnabled = env.METADATA_FLAG;
const timestamp = new Date();

if (flagEnabled) {
const response1 = await env.SERVICE.queue(
'test-queue',
[{ id: '0', timestamp, body: 'hello', attempts: 1 }],
{
metrics: {
backlogCount: 100,
backlogBytes: 2048,
oldestMessageTimestamp: 1000000,
},
}
);
assert.strictEqual(response1.outcome, 'ok');
assert(response1.ackAll);

// Test with omitted metadata
const response2 = await env.SERVICE.queue('test-queue', [
{ id: '1', timestamp, body: 'world', attempts: 1 },
]);
assert.strictEqual(response2.outcome, 'ok');
assert(response2.ackAll);
} else {
// Flag disabled → handler still works, metadata not visible
const response = await env.SERVICE.queue('test-queue', [
{ id: '0', timestamp, body: 'foobar', attempts: 1 },
]);
assert.strictEqual(response.outcome, 'ok');
assert(response.ackAll);
}
const response1 = await env.SERVICE.queue(
'test-queue',
[{ id: '0', timestamp, body: 'hello', attempts: 1 }],
{
metrics: {
backlogCount: 100,
backlogBytes: 2048,
oldestMessageTimestamp: 1000000,
},
}
);
assert.strictEqual(response1.outcome, 'ok');
assert(response1.ackAll);

// Test with omitted metadata
const response2 = await env.SERVICE.queue('test-queue', [
{ id: '1', timestamp, body: 'world', attempts: 1 },
]);
assert.strictEqual(response2.outcome, 'ok');
assert(response2.ackAll);
},
};
13 changes: 0 additions & 13 deletions src/workerd/api/tests/queue-metadata-test.wd-test
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,6 @@ const unitTests :Workerd.Config = (
],
bindings = [
( name = "SERVICE", service = "queue-metadata-test" ),
( name = "METADATA_FLAG", json = "true" ),
],
compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers", "experimental"],
)
),
( name = "queue-metadata-disabled-test",
worker = (
modules = [
( name = "worker-disabled", esModule = embed "queue-metadata-test.js" )
],
bindings = [
( name = "SERVICE", service = "queue-metadata-disabled-test" ),
( name = "METADATA_FLAG", json = "false" ),
],
compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers"],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const unitTests :Workerd.Config = (
bindings = [
( name = "QUEUE", queue = "queue-metrics-sentinel-test" ),
],
compatibilityFlags = ["nodejs_compat", "queues_json_messages", "experimental", "capture_async_api_throws"],
compatibilityFlags = ["nodejs_compat", "queues_json_messages", "capture_async_api_throws"],
)
),
],
Expand Down
25 changes: 9 additions & 16 deletions src/workerd/api/tests/queue-metrics-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,14 @@ export default {
},

async test(ctrl, env, ctx) {
const metricsEnabled = env.METRICS_FLAG;
if (metricsEnabled) {
// Flag ON → metrics() should exist and return data
assert.strictEqual(typeof env.QUEUE.metrics, 'function');
const metrics = await env.QUEUE.metrics();
assert.strictEqual(metrics.backlogCount, 100);
assert.strictEqual(metrics.backlogBytes, 2048);
assert.ok(
metrics.oldestMessageTimestamp instanceof Date,
'Expected oldestMessageTimestamp to be a Date'
);
assert.strictEqual(metrics.oldestMessageTimestamp.getTime(), 1000000);
} else {
// Flag OFF → metrics() should not be exposed on the binding
assert.strictEqual(typeof env.QUEUE.metrics, 'undefined');
}
assert.strictEqual(typeof env.QUEUE.metrics, 'function');
const metrics = await env.QUEUE.metrics();
assert.strictEqual(metrics.backlogCount, 100);
assert.strictEqual(metrics.backlogBytes, 2048);
assert.ok(
metrics.oldestMessageTimestamp instanceof Date,
'Expected oldestMessageTimestamp to be a Date'
);
assert.strictEqual(metrics.oldestMessageTimestamp.getTime(), 1000000);
},
};
19 changes: 3 additions & 16 deletions src/workerd/api/tests/queue-metrics-test.wd-test
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,13 @@ using Workerd = import "/workerd/workerd.capnp";

const unitTests :Workerd.Config = (
services = [
( name = "queue-metrics-enabled",
( name = "queue-metrics-test",
worker = (
modules = [
( name = "worker-metrics-enabled", esModule = embed "queue-metrics-test.js" )
( name = "worker-metrics-test", esModule = embed "queue-metrics-test.js" )
],
bindings = [
( name = "QUEUE", queue = "queue-metrics-enabled" ),
( name = "METRICS_FLAG", json = "true" ),
],
compatibilityFlags = ["nodejs_compat", "experimental", "capture_async_api_throws"],
)
),
( name = "queue-metrics-disabled",
worker = (
modules = [
( name = "worker-metrics-disabled", esModule = embed "queue-metrics-test.js" )
],
bindings = [
( name = "QUEUE", queue = "queue-metrics-disabled" ),
( name = "METRICS_FLAG", json = "false" ),
( name = "QUEUE", queue = "queue-metrics-test" ),
],
compatibilityFlags = ["nodejs_compat", "capture_async_api_throws"],
)
Expand Down
47 changes: 20 additions & 27 deletions src/workerd/api/tests/queue-producer-metadata-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,38 +46,31 @@ export default {
},

async test(ctrl, env) {
const responseBodyEnabled = env.RESPONSE_BODY_FLAG;

const sendResult = await env.QUEUE.send('abc', { contentType: 'text' });
const sendBatchResult = await env.QUEUE.sendBatch([
{ body: 'def', contentType: 'text' },
]);

if (responseBodyEnabled) {
assert.strictEqual(sendResult.metadata.metrics.backlogCount, 100);
assert.strictEqual(sendResult.metadata.metrics.backlogBytes, 2048);
assert.ok(
sendResult.metadata.metrics.oldestMessageTimestamp instanceof Date,
'Expected oldestMessageTimestamp to be a Date'
);
assert.strictEqual(
sendResult.metadata.metrics.oldestMessageTimestamp.getTime(),
1000000
);
assert.strictEqual(sendResult.metadata.metrics.backlogCount, 100);
assert.strictEqual(sendResult.metadata.metrics.backlogBytes, 2048);
assert.ok(
sendResult.metadata.metrics.oldestMessageTimestamp instanceof Date,
'Expected oldestMessageTimestamp to be a Date'
);
assert.strictEqual(
sendResult.metadata.metrics.oldestMessageTimestamp.getTime(),
1000000
);

assert.strictEqual(sendBatchResult.metadata.metrics.backlogCount, 200);
assert.strictEqual(sendBatchResult.metadata.metrics.backlogBytes, 4096);
assert.ok(
sendBatchResult.metadata.metrics.oldestMessageTimestamp instanceof Date,
'Expected oldestMessageTimestamp to be a Date'
);
assert.strictEqual(
sendBatchResult.metadata.metrics.oldestMessageTimestamp.getTime(),
2000000
);
} else {
assert.strictEqual(sendResult, undefined);
assert.strictEqual(sendBatchResult, undefined);
}
assert.strictEqual(sendBatchResult.metadata.metrics.backlogCount, 200);
assert.strictEqual(sendBatchResult.metadata.metrics.backlogBytes, 4096);
assert.ok(
sendBatchResult.metadata.metrics.oldestMessageTimestamp instanceof Date,
'Expected oldestMessageTimestamp to be a Date'
);
assert.strictEqual(
sendBatchResult.metadata.metrics.oldestMessageTimestamp.getTime(),
2000000
);
},
};
19 changes: 3 additions & 16 deletions src/workerd/api/tests/queue-producer-metadata-test.wd-test
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,13 @@ using Workerd = import "/workerd/workerd.capnp";

const unitTests :Workerd.Config = (
services = [
( name = "queue-producer-metadata-enabled",
( name = "queue-producer-metadata-test",
worker = (
modules = [
( name = "worker-producer-metadata-enabled", esModule = embed "queue-producer-metadata-test.js" )
( name = "worker-producer-metadata-test", esModule = embed "queue-producer-metadata-test.js" )
],
bindings = [
( name = "QUEUE", queue = "queue-producer-metadata-enabled" ),
( name = "RESPONSE_BODY_FLAG", json = "true" ),
],
compatibilityFlags = ["nodejs_compat", "queues_json_messages", "experimental", "capture_async_api_throws"],
)
),
( name = "queue-producer-metadata-disabled",
worker = (
modules = [
( name = "worker-producer-metadata-disabled", esModule = embed "queue-producer-metadata-test.js" )
],
bindings = [
( name = "QUEUE", queue = "queue-producer-metadata-disabled" ),
( name = "RESPONSE_BODY_FLAG", json = "false" ),
( name = "QUEUE", queue = "queue-producer-metadata-test" ),
],
compatibilityFlags = ["nodejs_compat", "queues_json_messages", "capture_async_api_throws"],
)
Expand Down
10 changes: 9 additions & 1 deletion src/workerd/api/tests/queue-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,15 @@ export default {
} else {
assert.fail(`Unexpected pathname: ${JSON.stringify(pathname)}`);
}
return new Response();
return Response.json({
metadata: {
metrics: {
backlogCount: 0,
backlogBytes: 0,
oldestMessageTimestamp: 0,
},
},
});
},

// Consumer receiver (from `env.SERVICE`)
Expand Down
4 changes: 2 additions & 2 deletions src/workerd/api/tests/queue-test.wd-test
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const unitTests :Workerd.Config = (
( name = "QUEUE", queue = "queue-error-codes-enabled" ),
( name = "ERROR_CODES_FLAG", json = "true" ),
],
compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers", "queues_json_messages", "queue_expose_error_codes", "experimental", "rpc", "capture_async_api_throws", "disable_fast_jsg_struct"],
compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers", "queues_json_messages", "queue_expose_error_codes", "rpc", "capture_async_api_throws", "disable_fast_jsg_struct"],
)
),
( name = "queue-error-codes-disabled",
Expand All @@ -35,7 +35,7 @@ const unitTests :Workerd.Config = (
( name = "QUEUE", queue = "queue-error-codes-disabled" ),
( name = "ERROR_CODES_FLAG", json = "false" ),
],
compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers", "queues_json_messages", "no_queue_expose_error_codes", "experimental", "rpc", "capture_async_api_throws", "disable_fast_jsg_struct"],
compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers", "queues_json_messages", "no_queue_expose_error_codes", "rpc", "capture_async_api_throws", "disable_fast_jsg_struct"],
)
),
],
Expand Down
Loading
Loading