From bffe537a4a0007e67ae068c7a10d9865578b404f Mon Sep 17 00:00:00 2001 From: Douglas Gubert Date: Wed, 24 Apr 2024 16:59:13 -0300 Subject: [PATCH 1/3] Improve ping execution time --- deno-runtime/lib/messenger.ts | 8 +++- deno-runtime/main.ts | 8 ++++ .../runtime/deno/AppsEngineDenoRuntime.ts | 48 ++++++++++++++----- 3 files changed, 52 insertions(+), 12 deletions(-) diff --git a/deno-runtime/lib/messenger.ts b/deno-runtime/lib/messenger.ts index e01eb8205..1e9ffe05c 100644 --- a/deno-runtime/lib/messenger.ts +++ b/deno-runtime/lib/messenger.ts @@ -29,6 +29,8 @@ export function isErrorResponse(message: jsonrpc.JsonRpc): message is jsonrpc.Er return message instanceof jsonrpc.ErrorObject; } +const COMMAND_PONG = '_zPONG'; + export const RPCResponseObserver = new EventTarget(); export const Queue = new (class Queue { @@ -53,7 +55,7 @@ export const Queue = new (class Queue { this.isProcessing = false; } - public enqueue(message: jsonrpc.JsonRpc) { + public enqueue(message: jsonrpc.JsonRpc | typeof COMMAND_PONG) { this.queue.push(encoder.encode(message)); this.processQueue(); } @@ -157,6 +159,10 @@ export async function successResponse({ id, result }: SuccessResponseDescriptor) await Queue.enqueue(rpc); } +export function pongResponse(): Promise { + return Promise.resolve(Queue.enqueue(COMMAND_PONG)); +} + export async function sendRequest(requestDescriptor: RequestDescriptor): Promise { const request = jsonrpc.request(Math.random().toString(36).slice(2), requestDescriptor.method, requestDescriptor.params); diff --git a/deno-runtime/main.ts b/deno-runtime/main.ts index f65ecca62..d1ae89bac 100644 --- a/deno-runtime/main.ts +++ b/deno-runtime/main.ts @@ -31,6 +31,8 @@ type Handlers = { ping: (method: string, params: unknown) => 'pong'; }; +const COMMAND_PING = '_zPING'; + async function requestRouter({ type, payload }: Messenger.JsonRpcRequest): Promise { const methodHandlers: Handlers = { app: handleApp, @@ -98,6 +100,12 @@ async function main() { for await (const message of decoder.decodeStream(Deno.stdin.readable)) { try { + // Process PING command first as it is not JSON RPC + if (message === COMMAND_PING) { + Messenger.pongResponse(); + continue; + } + const JSONRPCMessage = Messenger.parseMessage(message as Record); if (Messenger.isRequest(JSONRPCMessage)) { diff --git a/src/server/runtime/deno/AppsEngineDenoRuntime.ts b/src/server/runtime/deno/AppsEngineDenoRuntime.ts index 2a4f3859b..90f4609c2 100644 --- a/src/server/runtime/deno/AppsEngineDenoRuntime.ts +++ b/src/server/runtime/deno/AppsEngineDenoRuntime.ts @@ -41,6 +41,9 @@ export const ALLOWED_ACCESSOR_METHODS = [ > >; +const COMMAND_PING = '_zPING'; +const COMMAND_PONG = '_zPONG'; + export const JSONRPC_METHOD_NOT_FOUND = -32601; export function isValidOrigin(accessor: string): accessor is typeof ALLOWED_ACCESSOR_METHODS[number] { @@ -153,19 +156,36 @@ export class DenoRuntimeSubprocessController extends EventEmitter { this.runtimeManager = manager.getRuntime(); } + /** + * Start up the process of ping/pong for liveness check + * + * The message exchange does not use JSON RPC as it adds a lot of overhead + * with the creation and encoding of a full object for transfer. By using a + * string the process is less intensive. + */ private startPing() { const ping = () => { const start = Date.now(); - this.sendRequest({ method: 'ping', params: [] }) - .then((result) => { - if (result !== 'pong') { - this.debug(`Expected 'pong', got %s (%d ms)`, result, Date.now() - start); - } - }) - .catch((reason: unknown) => { - this.debug('Ping failed: %s (%d ms)', reason, Date.now() - start); - }) - .finally(() => setTimeout(ping, 5000)); + + const responsePromise = new Promise((resolve, reject) => { + const onceCallback = () => { + clearTimeout(timeoutId); + this.debug('Ping successful in %d ms', Date.now() - start); + resolve(); + }; + + const timeoutId = setTimeout(() => { + this.debug('Ping failed in %d ms', Date.now() - start); + this.off('pong', onceCallback); + reject(); + }, this.options.timeout); + + this.once('pong', onceCallback); + }); + + this.send(COMMAND_PING); + + responsePromise.finally(() => setTimeout(ping, 5000)); }; ping(); @@ -228,7 +248,7 @@ export class DenoRuntimeSubprocessController extends EventEmitter { return this.appPackage.info.id; } - private send(message: jsonrpc.JsonRpc) { + private send(message: jsonrpc.JsonRpc | typeof COMMAND_PING) { this.debug('Sending message to subprocess %o', message); this.deno.stdin.write(encoder.encode(message)); } @@ -481,6 +501,12 @@ export class DenoRuntimeSubprocessController extends EventEmitter { for await (const message of decoder.decodeStream(stream)) { this.debug('Received message from subprocess %o', message); try { + // Process PONG resonse first as it is not JSON RPC + if (message === COMMAND_PONG) { + this.emit('pong'); + continue; + } + const JSONRPCMessage = jsonrpc.parseObject(message); if (Array.isArray(JSONRPCMessage)) { From 0ae5b65d8b8cec4d5dff447ac6a24f3eeeafb0a1 Mon Sep 17 00:00:00 2001 From: Douglas Gubert Date: Wed, 24 Apr 2024 17:08:35 -0300 Subject: [PATCH 2/3] Fix error message in main loop --- deno-runtime/main.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deno-runtime/main.ts b/deno-runtime/main.ts index d1ae89bac..09be5258e 100644 --- a/deno-runtime/main.ts +++ b/deno-runtime/main.ts @@ -118,7 +118,7 @@ async function main() { } } catch (error) { if (Messenger.isErrorResponse(error)) { - await Messenger.Transport.send(error); + await Messenger.errorResponse(error); } else { await Messenger.sendParseError(); } From 0aeead5f3fea33011f837257e1a102be0405c509 Mon Sep 17 00:00:00 2001 From: Douglas Gubert Date: Wed, 24 Apr 2024 17:26:36 -0300 Subject: [PATCH 3/3] Refactor sendRequest method to prevent memory leak --- src/server/runtime/deno/AppsEngineDenoRuntime.ts | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/server/runtime/deno/AppsEngineDenoRuntime.ts b/src/server/runtime/deno/AppsEngineDenoRuntime.ts index 90f4609c2..c522d667a 100644 --- a/src/server/runtime/deno/AppsEngineDenoRuntime.ts +++ b/src/server/runtime/deno/AppsEngineDenoRuntime.ts @@ -288,9 +288,7 @@ export class DenoRuntimeSubprocessController extends EventEmitter { private waitForResponse(req: jsonrpc.RequestObject): Promise { return new Promise((resolve, reject) => { - const timeoutId = setTimeout(() => reject(new Error(`Request "${req.id}" for method "${req.method}" timed out`)), this.options.timeout); - - this.once(`result:${req.id}`, (result: unknown, error: jsonrpc.IParsedObjectError['payload']['error']) => { + const responseCallback = (result: unknown, error: jsonrpc.IParsedObjectError['payload']['error']) => { clearTimeout(timeoutId); if (error) { @@ -298,7 +296,16 @@ export class DenoRuntimeSubprocessController extends EventEmitter { } resolve(result); - }); + }; + + const eventName = `result:${req.id}`; + + const timeoutId = setTimeout(() => { + this.off(eventName, responseCallback); + reject(new Error(`Request "${req.id}" for method "${req.method}" timed out`)); + }, this.options.timeout); + + this.once(eventName, responseCallback); }); }