From d88cbbc529ff2b69d0d74cf83fed79ae4486ad88 Mon Sep 17 00:00:00 2001 From: Amy Yan Date: Mon, 30 Oct 2023 14:23:04 +1100 Subject: [PATCH 1/5] fix: `handlerTimeoutTime` renamed to `timeoutTime` to tests in `RPC.test.ts` --- README.md | 8 ++++---- tests/RPC.test.ts | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 993851e..08b3293 100644 --- a/README.md +++ b/README.md @@ -268,7 +268,7 @@ class Sum extends ClientHandler { async function startServer() { const rpcServer = new RPCServer({ logger: new Logger('rpc-server'), - handlerTimeoutTime: 60000, + timeoutTime: 60000, idGen, }); @@ -421,7 +421,7 @@ function factorialOf(n: number): number { async function startServer() { const rpcServer = new RPCServer({ - handlerTimeoutTime: 200, + timeoutTime: 200, logger, idGen, }); @@ -671,7 +671,7 @@ async function startServer() { const wss = new WebSocket.Server({ port: 8080 }); const rpcServer = new RPCServer({ logger: new Logger('rpc-server'), - handlerTimeoutTime: 1000, + timeoutTime: 1000, idGen, }); rpcServer.start({ @@ -835,7 +835,7 @@ function createSyntheticStreams() { async function startServer() { const rpcServer = new RPCServer({ logger: new Logger('rpc-server'), - handlerTimeoutTime: 1000, + timeoutTime: 1000, idGen, }); diff --git a/tests/RPC.test.ts b/tests/RPC.test.ts index 52f1dfa..6eb85f6 100644 --- a/tests/RPC.test.ts +++ b/tests/RPC.test.ts @@ -924,7 +924,7 @@ describe('RPC', () => { const rpcServer = new RPCServer({ logger, idGen, - handlerTimeoutTime: 150, + timeoutTime: 150, }); await rpcServer.start({ manifest: { @@ -984,7 +984,7 @@ describe('RPC', () => { const rpcServer = new RPCServer({ logger, idGen, - handlerTimeoutTime: 150, + timeoutTime: 150, }); await rpcServer.start({ manifest: { From 48cddb71be883833d3d1335f78eb1c699b3af401 Mon Sep 17 00:00:00 2001 From: Aditya <38064122+bettercallav@users.noreply.github.com> Date: Fri, 20 Oct 2023 14:12:06 +1100 Subject: [PATCH 2/5] feat: Implementing handler and caller timeouts which can override default server and client timeouts regardless of their default valu fix: renamed `RPCClient.timeout` and `RPCServer.timeout` to `timeoutTIme` fix: timeout tests for RPCClient and RPCServer fix: fixed removed redundant tests [ci-skip] --- src/RPCClient.ts | 14 +-- src/RPCServer.ts | 17 ++-- tests/RPC.test.ts | 10 +- tests/RPCClient.test.ts | 115 ++++++++++++++++++++- tests/RPCServer.test.ts | 214 ++++++++++++++++++++++++---------------- tests/utils.ts | 10 ++ 6 files changed, 270 insertions(+), 110 deletions(-) diff --git a/src/RPCClient.ts b/src/RPCClient.ts index 51aacd3..07befc3 100644 --- a/src/RPCClient.ts +++ b/src/RPCClient.ts @@ -41,7 +41,7 @@ class RPCClient { this.onTimeoutCallback = callback; } // Method proxies - public readonly streamKeepAliveTimeoutTime: number; + public readonly timeoutTime: number; public readonly methodsProxy = new Proxy( {}, { @@ -76,7 +76,7 @@ class RPCClient { * The middlewareFactory needs to be a function that creates a pair of * transform streams that convert `JSONRPCRequest` to `Uint8Array` on the forward * path and `Uint8Array` to `JSONRPCResponse` on the reverse path. - * @param obj.streamKeepAliveTimeoutTime - Timeout time used if no timeout timer was provided when making a call. + * @param obj.timeoutTime - Timeout time used if no timeout timer was provided when making a call. * Defaults to 60,000 milliseconds. * for a client call. * @param obj.logger @@ -85,7 +85,7 @@ class RPCClient { manifest, streamFactory, middlewareFactory = middleware.defaultClientMiddlewareWrapper(), - streamKeepAliveTimeoutTime = Infinity, + timeoutTime = Infinity, logger, toError = utils.toError, idGen = () => null, @@ -98,7 +98,7 @@ class RPCClient { JSONRPCResponse, Uint8Array >; - streamKeepAliveTimeoutTime?: number; + timeoutTime?: number; logger?: Logger; idGen?: IdGen; toError?: ToError; @@ -107,7 +107,7 @@ class RPCClient { this.callerTypes = utils.getHandlerTypes(manifest); this.streamFactory = streamFactory; this.middlewareFactory = middlewareFactory; - this.streamKeepAliveTimeoutTime = streamKeepAliveTimeoutTime; + this.timeoutTime = timeoutTime; this.logger = logger ?? new Logger(this.constructor.name); this.toError = toError; } @@ -254,7 +254,7 @@ class RPCClient { let timer: Timer; if (!(ctx.timer instanceof Timer)) { timer = new Timer({ - delay: ctx.timer ?? this.streamKeepAliveTimeoutTime, + delay: ctx.timer ?? this.timeoutTime, }); } else { timer = ctx.timer; @@ -403,7 +403,7 @@ class RPCClient { let timer: Timer; if (!(ctx.timer instanceof Timer)) { timer = new Timer({ - delay: ctx.timer ?? this.streamKeepAliveTimeoutTime, + delay: ctx.timer ?? this.timeoutTime, }); } else { timer = ctx.timer; diff --git a/src/RPCServer.ts b/src/RPCServer.ts index 62dc853..27d35c0 100644 --- a/src/RPCServer.ts +++ b/src/RPCServer.ts @@ -58,7 +58,7 @@ class RPCServer { protected logger: Logger; protected handlerMap: Map = new Map(); protected defaultTimeoutMap: Map = new Map(); - protected handlerTimeoutTime: number; + protected timeoutTime: number; protected activeStreams: Set> = new Set(); protected fromError: FromError; protected replacer?: (key: string, value: any) => any; @@ -80,18 +80,15 @@ class RPCServer { * The middlewareFactory needs to be a function that creates a pair of * transform streams that convert `Uint8Array` to `JSONRPCRequest` on the forward * path and `JSONRPCResponse` to `Uint8Array` on the reverse path. - * @param obj.streamKeepAliveTimeoutTime - Time before a connection is cleaned up due to no activity. This is the + * @param obj.timeoutTime - Time before a stream is cleaned up due to no activity. This is the * value used if the handler doesn't specify its own timeout time. This timeout is advisory and only results in a * signal sent to the handler. Stream is forced to end after the timeoutForceCloseTime. Defaults to 60,000 * milliseconds. - * @param obj.timeoutForceCloseTime - Time before the stream is forced to end after the initial timeout time. - * The stream will be forced to close after this amount of time after the initial timeout. This is a grace period for - * the handler to handle timeout before it is forced to end. Defaults to 2,000 milliseconds. * @param obj.logger */ public constructor({ middlewareFactory = middleware.defaultServerMiddlewareWrapper(), - handlerTimeoutTime = Infinity, + timeoutTime = Infinity, logger, idGen = () => null, fromError = utils.fromError, @@ -103,7 +100,7 @@ class RPCServer { Uint8Array, JSONRPCResponseResult >; - handlerTimeoutTime?: number; + timeoutTime?: number; logger?: Logger; idGen?: IdGen; fromError?: FromError; @@ -111,7 +108,7 @@ class RPCServer { }) { this.idGen = idGen; this.middlewareFactory = middlewareFactory; - this.handlerTimeoutTime = handlerTimeoutTime; + this.timeoutTime = timeoutTime; this.fromError = fromError; this.replacer = replacer; this.logger = logger ?? new Logger(this.constructor.name); @@ -453,7 +450,7 @@ class RPCServer { const abortController = new AbortController(); // Setting up timeout timer logic const timer = new Timer({ - delay: this.handlerTimeoutTime, + delay: this.timeoutTime, handler: () => { abortController.abort(new errors.ErrorRPCTimedOut()); if (this.onTimeoutCallback) { @@ -575,7 +572,7 @@ class RPCServer { // Setting up Timeout logic const timeout = this.defaultTimeoutMap.get(method); if (timer.status !== 'settled') { - if (timeout != null && timeout < this.handlerTimeoutTime) { + if (timeout != null) { // Reset timeout with new delay if it is less than the default timer.reset(timeout); } else { diff --git a/tests/RPC.test.ts b/tests/RPC.test.ts index 6eb85f6..3c17361 100644 --- a/tests/RPC.test.ts +++ b/tests/RPC.test.ts @@ -609,7 +609,7 @@ describe('RPC', () => { const rpcServer = new RPCServer({ logger, idGen, - handlerTimeoutTime: timeout, + timeoutTime: timeout, }); await rpcServer.start({ manifest: { @@ -701,7 +701,7 @@ describe('RPC', () => { const rpcServer = new RPCServer({ logger, idGen, - handlerTimeoutTime: 1, + timeoutTime: 1, }); await rpcServer.start({ manifest: { testMethod: new TestMethod({}) } }); // Register callback @@ -774,8 +774,7 @@ describe('RPC', () => { const rpcServer = new RPCServer({ logger, idGen, - - handlerTimeoutTime: 400, + timeoutTime: 400, }); await rpcServer.start({ manifest: { @@ -839,11 +838,10 @@ describe('RPC', () => { await abortProm.p; }; } - const rpcServer = new RPCServer({ logger, idGen, - handlerTimeoutTime: Infinity, + timeoutTime: Infinity, }); await rpcServer.start({ manifest: { testMethod: new TestMethod({}) } }); rpcServer.handleStream({ ...serverPair, cancel: () => {} }); diff --git a/tests/RPCClient.test.ts b/tests/RPCClient.test.ts index 113f953..8fdf26d 100644 --- a/tests/RPCClient.test.ts +++ b/tests/RPCClient.test.ts @@ -750,7 +750,7 @@ describe(`${RPCClient.name}`, () => { // Should never reach this when testing return {} as RPCStream; }, - streamKeepAliveTimeoutTime: 100, + timeoutTime: 100, logger, idGen, }); @@ -919,7 +919,7 @@ describe(`${RPCClient.name}`, () => { // Should never reach this when testing return {} as RPCStream; }, - streamKeepAliveTimeoutTime: 100, + timeoutTime: 100, logger, idGen, }); @@ -1006,7 +1006,7 @@ describe(`${RPCClient.name}`, () => { ctx = ctx_; return streamPair; }, - streamKeepAliveTimeoutTime: 100, + timeoutTime: 100, logger, idGen, }); @@ -1149,7 +1149,7 @@ describe(`${RPCClient.name}`, () => { { numRuns: 5 }, ); testProp( - 'Check that ctx is provided to the middleWare and that the middleware can reset the timer', + 'Check that ctx is provided to the middleware and that the middleware can reset the timer', [specificMessageArb], async (messages) => { const inputStream = rpcTestUtils.messagesToReadableStream(messages); @@ -1197,4 +1197,111 @@ describe(`${RPCClient.name}`, () => { { numRuns: 1 }, ); }); + describe('timeout priority', () => { + testProp( + 'check that call with ctx can override higher timeout of RPCClient', + [rpcTestUtils.timeoutsArb], + async ([lowerTimeoutTime, higherTimeoutTime]) => { + const streamPair: RPCStream = { + cancel: () => {}, + meta: undefined, + readable: new ReadableStream(), + writable: new WritableStream(), + }; + const { p: ctxP, resolveP: resolveCtxP } = promise(); + const rpcClient = new RPCClient({ + manifest: {}, + streamFactory: async (ctx) => { + resolveCtxP(ctx); + return streamPair; + }, + logger, + idGen, + timeoutTime: higherTimeoutTime, + }); + + await rpcClient.duplexStreamCaller( + methodName, + { + timer: lowerTimeoutTime, + }, + ); + + const ctx = await ctxP; + expect(ctx.timer.delay).toBe(lowerTimeoutTime); + ctx.timer.cancel(); + await ctx.timer.catch(() => {}); + }, + ); + testProp( + 'check that call with ctx can override lower timeout of RPCClient', + [rpcTestUtils.timeoutsArb], + async ([lowerTimeoutTime, higherTimeoutTime]) => { + const streamPair: RPCStream = { + cancel: () => {}, + meta: undefined, + readable: new ReadableStream(), + writable: new WritableStream(), + }; + const { p: ctxP, resolveP: resolveCtxP } = promise(); + const rpcClient = new RPCClient({ + manifest: {}, + streamFactory: async (ctx) => { + resolveCtxP(ctx); + return streamPair; + }, + logger, + idGen, + timeoutTime: lowerTimeoutTime, + }); + + await rpcClient.duplexStreamCaller( + methodName, + { + timer: higherTimeoutTime, + }, + ); + + const ctx = await ctxP; + expect(ctx.timer.delay).toBe(higherTimeoutTime); + ctx.timer.cancel(); + await ctx.timer.catch(() => {}); + }, + ); + testProp( + 'check that call with ctx can override lower timeout of RPCClient with Infinity', + [fc.integer({ min: 0 })], + async (timeoutTime) => { + const streamPair: RPCStream = { + cancel: () => {}, + meta: undefined, + readable: new ReadableStream(), + writable: new WritableStream(), + }; + const { p: ctxP, resolveP: resolveCtxP } = promise(); + const rpcClient = new RPCClient({ + manifest: {}, + streamFactory: async (ctx) => { + resolveCtxP(ctx); + return streamPair; + }, + logger, + idGen, + timeoutTime, + }); + + await rpcClient.duplexStreamCaller( + methodName, + { + timer: Infinity, + }, + ); + + const ctx = await ctxP; + expect(ctx.timer.delay).toBe(Infinity); + ctx.timer.cancel(); + await ctx.timer.catch(() => {}); + }, + ); + }); }); diff --git a/tests/RPCServer.test.ts b/tests/RPCServer.test.ts index 3455b12..f565f1e 100644 --- a/tests/RPCServer.test.ts +++ b/tests/RPCServer.test.ts @@ -57,7 +57,6 @@ describe(`${RPCServer.name}`, () => { data: rpcTestUtils.safeJsonValueArb, }), ); - testProp( 'can stream data with raw duplex stream handler', [specificMessageArb], @@ -884,7 +883,7 @@ describe(`${RPCServer.name}`, () => { } const rpcServer = new RPCServer({ - handlerTimeoutTime: 100, + timeoutTime: 100, logger, idGen, }); @@ -930,7 +929,7 @@ describe(`${RPCServer.name}`, () => { }); test('timeout with default time before handler selected', async () => { const rpcServer = new RPCServer({ - handlerTimeoutTime: 100, + timeoutTime: 100, logger, idGen, }); @@ -957,85 +956,6 @@ describe(`${RPCServer.name}`, () => { } await rpcServer.stop({ force: true }); }); - test('handler overrides timeout', async () => { - { - const waitProm = promise(); - const ctxShortProm = promise(); - class TestMethodShortTimeout extends UnaryHandler { - timeout = 25; - public handle = async ( - input: JSONRPCParams, - _cancel, - _meta, - ctx_, - ): Promise => { - ctxShortProm.resolveP(ctx_); - await waitProm.p; - return input; - }; - } - const ctxLongProm = promise(); - class TestMethodLongTimeout extends UnaryHandler { - timeout = 100; - public handle = async ( - input: JSONRPCParams, - _cancel, - _meta, - ctx_, - ): Promise => { - ctxLongProm.resolveP(ctx_); - await waitProm.p; - return input; - }; - } - const rpcServer = new RPCServer({ - handlerTimeoutTime: 50, - logger, - idGen, - }); - await rpcServer.start({ - manifest: { - testShort: new TestMethodShortTimeout({}), - testLong: new TestMethodLongTimeout({}), - }, - }); - const streamShort = rpcTestUtils.messagesToReadableStream([ - { - jsonrpc: '2.0', - method: 'testShort', - params: {}, - }, - ]); - const readWriteStreamShort: RPCStream = { - cancel: () => {}, - readable: streamShort, - writable: new WritableStream(), - }; - rpcServer.handleStream(readWriteStreamShort); - // Shorter timeout is updated - const ctxShort = await ctxShortProm.p; - expect(ctxShort.timer.delay).toEqual(25); - const streamLong = rpcTestUtils.messagesToReadableStream([ - { - jsonrpc: '2.0', - method: 'testLong', - params: {}, - }, - ]); - const readWriteStreamLong: RPCStream = { - cancel: () => {}, - readable: streamLong, - writable: new WritableStream(), - }; - rpcServer.handleStream(readWriteStreamLong); - - // Longer timeout is set to server's default - const ctxLong = await ctxLongProm.p; - expect(ctxLong.timer.delay).toEqual(50); - waitProm.resolveP(); - await rpcServer.stop({ force: true }); - } - }); test('duplex handler refreshes timeout when messages are sent', async () => { const contextProm = promise(); const stepProm1 = promise(); @@ -1061,7 +981,7 @@ describe(`${RPCServer.name}`, () => { const rpcServer = new RPCServer({ logger, idGen, - handlerTimeoutTime: 1000, + timeoutTime: 1000, }); await rpcServer.start({ manifest: { @@ -1209,4 +1129,132 @@ describe(`${RPCServer.name}`, () => { expect(ctx.timer.delay).toBe(12345); }, ); + describe('timeout priority', () => { + testProp( + 'check that handler can override higher timeout of RPCServer', + [specificMessageArb, rpcTestUtils.timeoutsArb], + async (messages, [lowerTimeoutTime, higherTimeoutTime]) => { + const stream = rpcTestUtils.messagesToReadableStream(messages); + const { p: ctxP, resolveP: resolveCtxP } = promise(); + class TestMethod extends DuplexHandler { + public timeout = lowerTimeoutTime; + public handle = async function* ( + input: AsyncGenerator, + _cancel: (reason?: any) => void, + _meta: Record | undefined, + ctx: ContextTimed, + ): AsyncGenerator { + resolveCtxP(ctx); + yield* input; + }; + } + const rpcServer = new RPCServer({ + logger, + timeoutTime: higherTimeoutTime, + idGen, + }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const readWriteStream: RPCStream = { + cancel: () => {}, + readable: stream, + writable: outputStream, + }; + rpcServer.handleStream(readWriteStream); + await outputResult; + const ctx = await ctxP; + expect(ctx.timer.delay).toBe(lowerTimeoutTime); + ctx.timer.cancel(); + await ctx.timer.catch(() => {}); + }, + ); + testProp( + 'check that handler can override lower timeout of RPCServer', + [specificMessageArb, rpcTestUtils.timeoutsArb], + async (messages, [lowerTimeoutTime, higherTimeoutTime]) => { + const stream = rpcTestUtils.messagesToReadableStream(messages); + const { p: ctxP, resolveP: resolveCtxP } = promise(); + class TestMethod extends DuplexHandler { + public timeout = higherTimeoutTime; + public handle = async function* ( + input: AsyncGenerator, + _cancel: (reason?: any) => void, + _meta: Record | undefined, + ctx: ContextTimed, + ): AsyncGenerator { + resolveCtxP(ctx); + yield* input; + }; + } + const rpcServer = new RPCServer({ + logger, + timeoutTime: lowerTimeoutTime, + idGen, + }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const readWriteStream: RPCStream = { + cancel: () => {}, + readable: stream, + writable: outputStream, + }; + rpcServer.handleStream(readWriteStream); + await outputResult; + const ctx = await ctxP; + expect(ctx.timer.delay).toBe(higherTimeoutTime); + ctx.timer.cancel(); + await ctx.timer.catch(() => {}); + }, + ); + }); + testProp( + 'check that handler can override lower timeout of RPCServer with Infinity', + [specificMessageArb, fc.integer({ min: 0 })], + async (messages, timeoutTime) => { + const stream = rpcTestUtils.messagesToReadableStream(messages); + const { p: ctxP, resolveP: resolveCtxP } = promise(); + class TestMethod extends DuplexHandler { + public timeout = Infinity; + public handle = async function* ( + input: AsyncGenerator, + _cancel: (reason?: any) => void, + _meta: Record | undefined, + ctx: ContextTimed, + ): AsyncGenerator { + resolveCtxP(ctx); + yield* input; + }; + } + const rpcServer = new RPCServer({ + logger, + timeoutTime, + idGen, + }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const readWriteStream: RPCStream = { + cancel: () => {}, + readable: stream, + writable: outputStream, + }; + rpcServer.handleStream(readWriteStream); + await outputResult; + const ctx = await ctxP; + expect(ctx.timer.delay).toBe(Infinity); + ctx.timer.cancel(); + await ctx.timer.catch(() => {}); + }, + ); }); diff --git a/tests/utils.ts b/tests/utils.ts index cea830e..6c92433 100644 --- a/tests/utils.ts +++ b/tests/utils.ts @@ -274,6 +274,15 @@ const errorArb = ( ), ); +const timeoutsArb = fc + .integer({ min: 0 }) + .chain((lowerTimeoutTime) => + fc.tuple( + fc.constant(lowerTimeoutTime), + fc.integer({ min: lowerTimeoutTime }), + ), + ); + export { binaryStreamToSnippedStream, binaryStreamToNoisyStream, @@ -295,4 +304,5 @@ export { tapTransformStream, createTapPairs, errorArb, + timeoutsArb, }; From a049c6c5cc8581e5bb0a0ade56feb61995c3785d Mon Sep 17 00:00:00 2001 From: Aditya <38064122+bettercallav@users.noreply.github.com> Date: Fri, 27 Oct 2023 12:55:41 +1100 Subject: [PATCH 3/5] feat: negative `timeoutTime` paramaters will now throw errors chore: add tests to test negative timeout values. fix: 0 `timeoutTime` value no longer throws in `RPCClient` fix: `RPCServer.start` now throws when passed a handler with a negative `timeout` [ci-skip] --- src/RPCClient.ts | 3 ++ src/RPCServer.ts | 115 ++++++++++++++++++++++------------------ src/errors.ts | 10 ++++ tests/RPCClient.test.ts | 40 +++++++++----- tests/RPCServer.test.ts | 48 +++++++++++++++++ 5 files changed, 153 insertions(+), 63 deletions(-) diff --git a/src/RPCClient.ts b/src/RPCClient.ts index 07befc3..ac3e08b 100644 --- a/src/RPCClient.ts +++ b/src/RPCClient.ts @@ -103,6 +103,9 @@ class RPCClient { idGen?: IdGen; toError?: ToError; }) { + if (timeoutTime < 0) { + throw new errors.ErrorRPCInvalidTimeout(); + } this.idGen = idGen; this.callerTypes = utils.getHandlerTypes(manifest); this.streamFactory = streamFactory; diff --git a/src/RPCServer.ts b/src/RPCServer.ts index 27d35c0..fa4a80d 100644 --- a/src/RPCServer.ts +++ b/src/RPCServer.ts @@ -106,6 +106,9 @@ class RPCServer { fromError?: FromError; replacer?: (key: string, value: any) => any; }) { + if (timeoutTime < 0) { + throw new errors.ErrorRPCInvalidTimeout(); + } this.idGen = idGen; this.middlewareFactory = middlewareFactory; this.timeoutTime = timeoutTime; @@ -126,58 +129,68 @@ class RPCServer { manifest: ServerManifest; }): Promise { this.logger.info(`Start ${this.constructor.name}`); - for (const [key, manifestItem] of Object.entries(manifest)) { - if (manifestItem instanceof RawHandler) { - this.registerRawStreamHandler( - key, - manifestItem.handle, - manifestItem.timeout, - ); - continue; - } - if (manifestItem instanceof DuplexHandler) { - this.registerDuplexStreamHandler( - key, - // Bind the `this` to the generator handler to make the container available - manifestItem.handle.bind(manifestItem), - manifestItem.timeout, - ); - continue; - } - if (manifestItem instanceof ServerHandler) { - this.registerServerStreamHandler( - key, - // Bind the `this` to the generator handler to make the container available - manifestItem.handle.bind(manifestItem), - manifestItem.timeout, - ); - continue; - } - if (manifestItem instanceof ClientHandler) { - this.registerClientStreamHandler( - key, - manifestItem.handle, - manifestItem.timeout, - ); - continue; - } - if (manifestItem instanceof ClientHandler) { - this.registerClientStreamHandler( - key, - manifestItem.handle, - manifestItem.timeout, - ); - continue; - } - if (manifestItem instanceof UnaryHandler) { - this.registerUnaryHandler( - key, - manifestItem.handle, - manifestItem.timeout, - ); - continue; + try { + for (const [key, manifestItem] of Object.entries(manifest)) { + if (manifestItem.timeout != null && manifestItem.timeout < 0) { + throw new errors.ErrorRPCInvalidHandlerTimeout(); + } + if (manifestItem instanceof RawHandler) { + this.registerRawStreamHandler( + key, + manifestItem.handle, + manifestItem.timeout, + ); + continue; + } + if (manifestItem instanceof DuplexHandler) { + this.registerDuplexStreamHandler( + key, + // Bind the `this` to the generator handler to make the container available + manifestItem.handle.bind(manifestItem), + manifestItem.timeout, + ); + continue; + } + if (manifestItem instanceof ServerHandler) { + this.registerServerStreamHandler( + key, + // Bind the `this` to the generator handler to make the container available + manifestItem.handle.bind(manifestItem), + manifestItem.timeout, + ); + continue; + } + if (manifestItem instanceof ClientHandler) { + this.registerClientStreamHandler( + key, + manifestItem.handle, + manifestItem.timeout, + ); + continue; + } + if (manifestItem instanceof ClientHandler) { + this.registerClientStreamHandler( + key, + manifestItem.handle, + manifestItem.timeout, + ); + continue; + } + if (manifestItem instanceof UnaryHandler) { + this.registerUnaryHandler( + key, + manifestItem.handle, + manifestItem.timeout, + ); + continue; + } + utils.never(); } - utils.never(); + } catch (e) { + // No need to clean up streams, as streams can only be handled after RPCServer has been started. + this.handlerMap.clear(); + this.defaultTimeoutMap.clear(); + throw e; } this.logger.info(`Started ${this.constructor.name}`); } diff --git a/src/errors.ts b/src/errors.ts index 4a1fd4b..69c3ce3 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -27,6 +27,14 @@ class ErrorRPCCallerFailed extends ErrorRPC { static description = 'Failed to call stream'; } +class ErrorRPCInvalidTimeout extends ErrorRPC { + static description = 'Invalid timeout provided'; +} + +class ErrorRPCInvalidHandlerTimeout extends ErrorRPC { + static description = 'Invalid handler timeout provided'; +} + abstract class ErrorRPCProtocol extends ErrorRPC { static error = 'RPC Protocol Error'; code: number; @@ -257,6 +265,8 @@ export { ErrorRPCConnectionLocal, ErrorRPCConnectionPeer, ErrorRPCConnectionKeepAliveTimeOut, + ErrorRPCInvalidTimeout, + ErrorRPCInvalidHandlerTimeout, ErrorRPCConnectionInternal, ErrorMissingHeader, ErrorHandlerAborted, diff --git a/tests/RPCClient.test.ts b/tests/RPCClient.test.ts index 8fdf26d..92fb8a1 100644 --- a/tests/RPCClient.test.ts +++ b/tests/RPCClient.test.ts @@ -1148,6 +1148,28 @@ describe(`${RPCClient.name}`, () => { }, { numRuns: 5 }, ); + testProp( + 'RPCClient constructor should throw when passed a negative timeoutTime', + [fc.integer({ max: -1 })], + async (timeoutTime) => { + const streamPair: RPCStream = { + cancel: () => {}, + meta: undefined, + readable: new ReadableStream(), + writable: new WritableStream(), + }; + const constructorF = () => + new RPCClient({ + timeoutTime, + streamFactory: () => Promise.resolve(streamPair), + manifest: {}, + logger, + idGen, + }); + + expect(constructorF).toThrowError(rpcErrors.ErrorRPCInvalidTimeout); + }, + ); testProp( 'Check that ctx is provided to the middleware and that the middleware can reset the timer', [specificMessageArb], @@ -1220,12 +1242,9 @@ describe(`${RPCClient.name}`, () => { timeoutTime: higherTimeoutTime, }); - await rpcClient.duplexStreamCaller( - methodName, - { - timer: lowerTimeoutTime, - }, - ); + await rpcClient.duplexStreamCaller(methodName, { + timer: lowerTimeoutTime, + }); const ctx = await ctxP; expect(ctx.timer.delay).toBe(lowerTimeoutTime); @@ -1255,12 +1274,9 @@ describe(`${RPCClient.name}`, () => { timeoutTime: lowerTimeoutTime, }); - await rpcClient.duplexStreamCaller( - methodName, - { - timer: higherTimeoutTime, - }, - ); + await rpcClient.duplexStreamCaller(methodName, { + timer: higherTimeoutTime, + }); const ctx = await ctxP; expect(ctx.timer.delay).toBe(higherTimeoutTime); diff --git a/tests/RPCServer.test.ts b/tests/RPCServer.test.ts index f565f1e..0387071 100644 --- a/tests/RPCServer.test.ts +++ b/tests/RPCServer.test.ts @@ -1082,6 +1082,54 @@ describe(`${RPCServer.name}`, () => { await expect(ctx.timer).toReject(); await rpcServer.stop({ force: true }); }); + testProp( + 'RPCServer constructor should throw when passed a negative timeoutTime', + [fc.integer({ max: -1 })], + async (timeoutTime) => { + const constructorF = () => + new RPCServer({ + timeoutTime, + logger, + idGen, + }); + + expect(constructorF).toThrowError(rpcErrors.ErrorRPCInvalidTimeout); + }, + ); + testProp( + 'RPCServer.start should throw when passed a handler with negative timeout', + [fc.integer({ max: -1 })], + async (timeoutTime) => { + const waitProm = promise(); + const ctxLongProm = promise(); + + class TestMethodArbitraryTimeout extends UnaryHandler { + timeout = timeoutTime; + public handle = async ( + input: JSONRPCParams, + _cancel, + _meta, + ctx_, + ): Promise => { + ctxLongProm.resolveP(ctx_); + await waitProm.p; + return input; + }; + } + const rpcServer = new RPCServer({ + logger, + idGen, + }); + + await expect( + rpcServer.start({ + manifest: { + testArbitrary: new TestMethodArbitraryTimeout({}), + }, + }), + ).rejects.toBeInstanceOf(rpcErrors.ErrorRPCInvalidHandlerTimeout); + }, + ); testProp( 'middleware can update timeout timer', [specificMessageArb], From b8352267b5a451e94b1760f94ecd556811d2aa11 Mon Sep 17 00:00:00 2001 From: Amy Yan Date: Mon, 30 Oct 2023 14:37:51 +1100 Subject: [PATCH 4/5] feat: Timeout Priority documentation in `README.md` fix: `README.md` example was incorrect [ci-skip] --- README.md | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/README.md b/README.md index 08b3293..01bbf5c 100644 --- a/README.md +++ b/README.md @@ -924,6 +924,35 @@ class TestMethod extends UnaryHandler { } ``` +### Timeout Priority + +A `timeoutTime` can be passed both to the constructors of `RPCServer` and `RPCClient`. This is the default `timeoutTime` for all callers/handlers. + +In the case of `RPCServer`, a `timeout` can be specified when extending any `Handler` class. This will override the default `timeoutTime` set on `RPCServer` for that handler only. + +```ts +class TestMethodArbitraryTimeout extends UnaryHandler { + public timeout = 100; + public handle = async ( + input: JSONValue, + _cancel, + _meta, + ctx_, + ): Promise => { + return input; + }; +} +``` + +In the case of `RPCClient`, a `ctx` with the property `timer` can be supplied with a `Timer` instance or `number` when making making an RPC call. This will override the default `timeoutTime` set on `RPCClient` for that call only. + +```ts +await rpcClient.methods.testMethod({}, { timer: 100 }); +await rpcClient.methods.testMethod({}, { timer: new Timer(undefined, 100) }); +``` + +It's important to note that any of these timeouts will ultimately be overridden by the shortest timeout of the server and client combined using the timeout middleware below. + ### Timeout Middleware The `timeoutMiddleware` sets an RPCServer's timeout based on the lowest timeout between the Client and the Server. This is so that handlers can eagerly time out and stop processing as soon as it is known that the client has timed out. From 9acf805b7cd35b1e1b460094f4d5d9596296d4c4 Mon Sep 17 00:00:00 2001 From: Brian Botha Date: Tue, 31 Oct 2023 11:52:34 +1100 Subject: [PATCH 5/5] lint: lint fix [ci skip] --- tests/RPCClient.test.ts | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/tests/RPCClient.test.ts b/tests/RPCClient.test.ts index 92fb8a1..c7e724f 100644 --- a/tests/RPCClient.test.ts +++ b/tests/RPCClient.test.ts @@ -1242,9 +1242,12 @@ describe(`${RPCClient.name}`, () => { timeoutTime: higherTimeoutTime, }); - await rpcClient.duplexStreamCaller(methodName, { - timer: lowerTimeoutTime, - }); + await rpcClient.duplexStreamCaller( + methodName, + { + timer: lowerTimeoutTime, + }, + ); const ctx = await ctxP; expect(ctx.timer.delay).toBe(lowerTimeoutTime); @@ -1274,9 +1277,12 @@ describe(`${RPCClient.name}`, () => { timeoutTime: lowerTimeoutTime, }); - await rpcClient.duplexStreamCaller(methodName, { - timer: higherTimeoutTime, - }); + await rpcClient.duplexStreamCaller( + methodName, + { + timer: higherTimeoutTime, + }, + ); const ctx = await ctxP; expect(ctx.timer.delay).toBe(higherTimeoutTime);