Skip to content
This repository was archived by the owner on Nov 5, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion deno-runtime/lib/messenger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ export function isErrorResponse(message: jsonrpc.JsonRpc): message is jsonrpc.Er
return message instanceof jsonrpc.ErrorObject;
}

const COMMAND_PONG = '_zPONG';

export const RPCResponseObserver = new EventTarget();

export const Queue = new (class Queue {
Expand All @@ -53,7 +55,7 @@ export const Queue = new (class Queue {
this.isProcessing = false;
}

public enqueue(message: jsonrpc.JsonRpc) {
public enqueue(message: jsonrpc.JsonRpc | typeof COMMAND_PONG) {
this.queue.push(encoder.encode(message));
this.processQueue();
}
Expand Down Expand Up @@ -157,6 +159,10 @@ export async function successResponse({ id, result }: SuccessResponseDescriptor)
await Queue.enqueue(rpc);
}

export function pongResponse(): Promise<void> {
return Promise.resolve(Queue.enqueue(COMMAND_PONG));
}

export async function sendRequest(requestDescriptor: RequestDescriptor): Promise<jsonrpc.SuccessObject> {
const request = jsonrpc.request(Math.random().toString(36).slice(2), requestDescriptor.method, requestDescriptor.params);

Expand Down
10 changes: 9 additions & 1 deletion deno-runtime/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type Handlers = {
ping: (method: string, params: unknown) => 'pong';
};

const COMMAND_PING = '_zPING';

async function requestRouter({ type, payload }: Messenger.JsonRpcRequest): Promise<void> {
const methodHandlers: Handlers = {
app: handleApp,
Expand Down Expand Up @@ -98,6 +100,12 @@ async function main() {

for await (const message of decoder.decodeStream(Deno.stdin.readable)) {
try {
// Process PING command first as it is not JSON RPC
if (message === COMMAND_PING) {
Messenger.pongResponse();
continue;
}

const JSONRPCMessage = Messenger.parseMessage(message as Record<string, unknown>);

if (Messenger.isRequest(JSONRPCMessage)) {
Expand All @@ -110,7 +118,7 @@ async function main() {
}
} catch (error) {
if (Messenger.isErrorResponse(error)) {
await Messenger.Transport.send(error);
await Messenger.errorResponse(error);
} else {
await Messenger.sendParseError();
}
Expand Down
63 changes: 48 additions & 15 deletions src/server/runtime/deno/AppsEngineDenoRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ export const ALLOWED_ACCESSOR_METHODS = [
>
>;

const COMMAND_PING = '_zPING';
const COMMAND_PONG = '_zPONG';

export const JSONRPC_METHOD_NOT_FOUND = -32601;

export function isValidOrigin(accessor: string): accessor is typeof ALLOWED_ACCESSOR_METHODS[number] {
Expand Down Expand Up @@ -153,19 +156,36 @@ export class DenoRuntimeSubprocessController extends EventEmitter {
this.runtimeManager = manager.getRuntime();
}

/**
* Start up the process of ping/pong for liveness check
*
* The message exchange does not use JSON RPC as it adds a lot of overhead
* with the creation and encoding of a full object for transfer. By using a
* string the process is less intensive.
*/
private startPing() {
const ping = () => {
const start = Date.now();
this.sendRequest({ method: 'ping', params: [] })
.then((result) => {
if (result !== 'pong') {
this.debug(`Expected 'pong', got %s (%d ms)`, result, Date.now() - start);
}
})
.catch((reason: unknown) => {
this.debug('Ping failed: %s (%d ms)', reason, Date.now() - start);
})
.finally(() => setTimeout(ping, 5000));

const responsePromise = new Promise<void>((resolve, reject) => {
const onceCallback = () => {
clearTimeout(timeoutId);
this.debug('Ping successful in %d ms', Date.now() - start);
resolve();
};

const timeoutId = setTimeout(() => {
this.debug('Ping failed in %d ms', Date.now() - start);
this.off('pong', onceCallback);
reject();
}, this.options.timeout);

this.once('pong', onceCallback);
});

this.send(COMMAND_PING);

responsePromise.finally(() => setTimeout(ping, 5000));
};

ping();
Expand Down Expand Up @@ -228,7 +248,7 @@ export class DenoRuntimeSubprocessController extends EventEmitter {
return this.appPackage.info.id;
}

private send(message: jsonrpc.JsonRpc) {
private send(message: jsonrpc.JsonRpc | typeof COMMAND_PING) {
this.debug('Sending message to subprocess %o', message);
this.deno.stdin.write(encoder.encode(message));
}
Expand Down Expand Up @@ -268,17 +288,24 @@ export class DenoRuntimeSubprocessController extends EventEmitter {

private waitForResponse(req: jsonrpc.RequestObject): Promise<unknown> {
return new Promise((resolve, reject) => {
const timeoutId = setTimeout(() => reject(new Error(`Request "${req.id}" for method "${req.method}" timed out`)), this.options.timeout);

this.once(`result:${req.id}`, (result: unknown, error: jsonrpc.IParsedObjectError['payload']['error']) => {
const responseCallback = (result: unknown, error: jsonrpc.IParsedObjectError['payload']['error']) => {
clearTimeout(timeoutId);

if (error) {
reject(error);
}

resolve(result);
});
};

const eventName = `result:${req.id}`;

const timeoutId = setTimeout(() => {
this.off(eventName, responseCallback);
reject(new Error(`Request "${req.id}" for method "${req.method}" timed out`));
}, this.options.timeout);

this.once(eventName, responseCallback);
});
}

Expand Down Expand Up @@ -481,6 +508,12 @@ export class DenoRuntimeSubprocessController extends EventEmitter {
for await (const message of decoder.decodeStream(stream)) {
this.debug('Received message from subprocess %o', message);
try {
// Process PONG resonse first as it is not JSON RPC
if (message === COMMAND_PONG) {
this.emit('pong');
continue;
}

const JSONRPCMessage = jsonrpc.parseObject(message);

if (Array.isArray(JSONRPCMessage)) {
Expand Down