diff --git a/README.md b/README.md index 993851e..01bbf5c 100644 --- a/README.md +++ b/README.md @@ -268,7 +268,7 @@ class Sum extends ClientHandler { async function startServer() { const rpcServer = new RPCServer({ logger: new Logger('rpc-server'), - handlerTimeoutTime: 60000, + timeoutTime: 60000, idGen, }); @@ -421,7 +421,7 @@ function factorialOf(n: number): number { async function startServer() { const rpcServer = new RPCServer({ - handlerTimeoutTime: 200, + timeoutTime: 200, logger, idGen, }); @@ -671,7 +671,7 @@ async function startServer() { const wss = new WebSocket.Server({ port: 8080 }); const rpcServer = new RPCServer({ logger: new Logger('rpc-server'), - handlerTimeoutTime: 1000, + timeoutTime: 1000, idGen, }); rpcServer.start({ @@ -835,7 +835,7 @@ function createSyntheticStreams() { async function startServer() { const rpcServer = new RPCServer({ logger: new Logger('rpc-server'), - handlerTimeoutTime: 1000, + timeoutTime: 1000, idGen, }); @@ -924,6 +924,35 @@ class TestMethod extends UnaryHandler { } ``` +### Timeout Priority + +A `timeoutTime` can be passed both to the constructors of `RPCServer` and `RPCClient`. This is the default `timeoutTime` for all callers/handlers. + +In the case of `RPCServer`, a `timeout` can be specified when extending any `Handler` class. This will override the default `timeoutTime` set on `RPCServer` for that handler only. + +```ts +class TestMethodArbitraryTimeout extends UnaryHandler { + public timeout = 100; + public handle = async ( + input: JSONValue, + _cancel, + _meta, + ctx_, + ): Promise => { + return input; + }; +} +``` + +In the case of `RPCClient`, a `ctx` with the property `timer` can be supplied with a `Timer` instance or `number` when making making an RPC call. This will override the default `timeoutTime` set on `RPCClient` for that call only. + +```ts +await rpcClient.methods.testMethod({}, { timer: 100 }); +await rpcClient.methods.testMethod({}, { timer: new Timer(undefined, 100) }); +``` + +It's important to note that any of these timeouts will ultimately be overridden by the shortest timeout of the server and client combined using the timeout middleware below. + ### Timeout Middleware The `timeoutMiddleware` sets an RPCServer's timeout based on the lowest timeout between the Client and the Server. This is so that handlers can eagerly time out and stop processing as soon as it is known that the client has timed out. diff --git a/src/RPCClient.ts b/src/RPCClient.ts index 51aacd3..ac3e08b 100644 --- a/src/RPCClient.ts +++ b/src/RPCClient.ts @@ -41,7 +41,7 @@ class RPCClient { this.onTimeoutCallback = callback; } // Method proxies - public readonly streamKeepAliveTimeoutTime: number; + public readonly timeoutTime: number; public readonly methodsProxy = new Proxy( {}, { @@ -76,7 +76,7 @@ class RPCClient { * The middlewareFactory needs to be a function that creates a pair of * transform streams that convert `JSONRPCRequest` to `Uint8Array` on the forward * path and `Uint8Array` to `JSONRPCResponse` on the reverse path. - * @param obj.streamKeepAliveTimeoutTime - Timeout time used if no timeout timer was provided when making a call. + * @param obj.timeoutTime - Timeout time used if no timeout timer was provided when making a call. * Defaults to 60,000 milliseconds. * for a client call. * @param obj.logger @@ -85,7 +85,7 @@ class RPCClient { manifest, streamFactory, middlewareFactory = middleware.defaultClientMiddlewareWrapper(), - streamKeepAliveTimeoutTime = Infinity, + timeoutTime = Infinity, logger, toError = utils.toError, idGen = () => null, @@ -98,16 +98,19 @@ class RPCClient { JSONRPCResponse, Uint8Array >; - streamKeepAliveTimeoutTime?: number; + timeoutTime?: number; logger?: Logger; idGen?: IdGen; toError?: ToError; }) { + if (timeoutTime < 0) { + throw new errors.ErrorRPCInvalidTimeout(); + } this.idGen = idGen; this.callerTypes = utils.getHandlerTypes(manifest); this.streamFactory = streamFactory; this.middlewareFactory = middlewareFactory; - this.streamKeepAliveTimeoutTime = streamKeepAliveTimeoutTime; + this.timeoutTime = timeoutTime; this.logger = logger ?? new Logger(this.constructor.name); this.toError = toError; } @@ -254,7 +257,7 @@ class RPCClient { let timer: Timer; if (!(ctx.timer instanceof Timer)) { timer = new Timer({ - delay: ctx.timer ?? this.streamKeepAliveTimeoutTime, + delay: ctx.timer ?? this.timeoutTime, }); } else { timer = ctx.timer; @@ -403,7 +406,7 @@ class RPCClient { let timer: Timer; if (!(ctx.timer instanceof Timer)) { timer = new Timer({ - delay: ctx.timer ?? this.streamKeepAliveTimeoutTime, + delay: ctx.timer ?? this.timeoutTime, }); } else { timer = ctx.timer; diff --git a/src/RPCServer.ts b/src/RPCServer.ts index 62dc853..fa4a80d 100644 --- a/src/RPCServer.ts +++ b/src/RPCServer.ts @@ -58,7 +58,7 @@ class RPCServer { protected logger: Logger; protected handlerMap: Map = new Map(); protected defaultTimeoutMap: Map = new Map(); - protected handlerTimeoutTime: number; + protected timeoutTime: number; protected activeStreams: Set> = new Set(); protected fromError: FromError; protected replacer?: (key: string, value: any) => any; @@ -80,18 +80,15 @@ class RPCServer { * The middlewareFactory needs to be a function that creates a pair of * transform streams that convert `Uint8Array` to `JSONRPCRequest` on the forward * path and `JSONRPCResponse` to `Uint8Array` on the reverse path. - * @param obj.streamKeepAliveTimeoutTime - Time before a connection is cleaned up due to no activity. This is the + * @param obj.timeoutTime - Time before a stream is cleaned up due to no activity. This is the * value used if the handler doesn't specify its own timeout time. This timeout is advisory and only results in a * signal sent to the handler. Stream is forced to end after the timeoutForceCloseTime. Defaults to 60,000 * milliseconds. - * @param obj.timeoutForceCloseTime - Time before the stream is forced to end after the initial timeout time. - * The stream will be forced to close after this amount of time after the initial timeout. This is a grace period for - * the handler to handle timeout before it is forced to end. Defaults to 2,000 milliseconds. * @param obj.logger */ public constructor({ middlewareFactory = middleware.defaultServerMiddlewareWrapper(), - handlerTimeoutTime = Infinity, + timeoutTime = Infinity, logger, idGen = () => null, fromError = utils.fromError, @@ -103,15 +100,18 @@ class RPCServer { Uint8Array, JSONRPCResponseResult >; - handlerTimeoutTime?: number; + timeoutTime?: number; logger?: Logger; idGen?: IdGen; fromError?: FromError; replacer?: (key: string, value: any) => any; }) { + if (timeoutTime < 0) { + throw new errors.ErrorRPCInvalidTimeout(); + } this.idGen = idGen; this.middlewareFactory = middlewareFactory; - this.handlerTimeoutTime = handlerTimeoutTime; + this.timeoutTime = timeoutTime; this.fromError = fromError; this.replacer = replacer; this.logger = logger ?? new Logger(this.constructor.name); @@ -129,58 +129,68 @@ class RPCServer { manifest: ServerManifest; }): Promise { this.logger.info(`Start ${this.constructor.name}`); - for (const [key, manifestItem] of Object.entries(manifest)) { - if (manifestItem instanceof RawHandler) { - this.registerRawStreamHandler( - key, - manifestItem.handle, - manifestItem.timeout, - ); - continue; - } - if (manifestItem instanceof DuplexHandler) { - this.registerDuplexStreamHandler( - key, - // Bind the `this` to the generator handler to make the container available - manifestItem.handle.bind(manifestItem), - manifestItem.timeout, - ); - continue; - } - if (manifestItem instanceof ServerHandler) { - this.registerServerStreamHandler( - key, - // Bind the `this` to the generator handler to make the container available - manifestItem.handle.bind(manifestItem), - manifestItem.timeout, - ); - continue; - } - if (manifestItem instanceof ClientHandler) { - this.registerClientStreamHandler( - key, - manifestItem.handle, - manifestItem.timeout, - ); - continue; - } - if (manifestItem instanceof ClientHandler) { - this.registerClientStreamHandler( - key, - manifestItem.handle, - manifestItem.timeout, - ); - continue; - } - if (manifestItem instanceof UnaryHandler) { - this.registerUnaryHandler( - key, - manifestItem.handle, - manifestItem.timeout, - ); - continue; + try { + for (const [key, manifestItem] of Object.entries(manifest)) { + if (manifestItem.timeout != null && manifestItem.timeout < 0) { + throw new errors.ErrorRPCInvalidHandlerTimeout(); + } + if (manifestItem instanceof RawHandler) { + this.registerRawStreamHandler( + key, + manifestItem.handle, + manifestItem.timeout, + ); + continue; + } + if (manifestItem instanceof DuplexHandler) { + this.registerDuplexStreamHandler( + key, + // Bind the `this` to the generator handler to make the container available + manifestItem.handle.bind(manifestItem), + manifestItem.timeout, + ); + continue; + } + if (manifestItem instanceof ServerHandler) { + this.registerServerStreamHandler( + key, + // Bind the `this` to the generator handler to make the container available + manifestItem.handle.bind(manifestItem), + manifestItem.timeout, + ); + continue; + } + if (manifestItem instanceof ClientHandler) { + this.registerClientStreamHandler( + key, + manifestItem.handle, + manifestItem.timeout, + ); + continue; + } + if (manifestItem instanceof ClientHandler) { + this.registerClientStreamHandler( + key, + manifestItem.handle, + manifestItem.timeout, + ); + continue; + } + if (manifestItem instanceof UnaryHandler) { + this.registerUnaryHandler( + key, + manifestItem.handle, + manifestItem.timeout, + ); + continue; + } + utils.never(); } - utils.never(); + } catch (e) { + // No need to clean up streams, as streams can only be handled after RPCServer has been started. + this.handlerMap.clear(); + this.defaultTimeoutMap.clear(); + throw e; } this.logger.info(`Started ${this.constructor.name}`); } @@ -453,7 +463,7 @@ class RPCServer { const abortController = new AbortController(); // Setting up timeout timer logic const timer = new Timer({ - delay: this.handlerTimeoutTime, + delay: this.timeoutTime, handler: () => { abortController.abort(new errors.ErrorRPCTimedOut()); if (this.onTimeoutCallback) { @@ -575,7 +585,7 @@ class RPCServer { // Setting up Timeout logic const timeout = this.defaultTimeoutMap.get(method); if (timer.status !== 'settled') { - if (timeout != null && timeout < this.handlerTimeoutTime) { + if (timeout != null) { // Reset timeout with new delay if it is less than the default timer.reset(timeout); } else { diff --git a/src/errors.ts b/src/errors.ts index 4a1fd4b..69c3ce3 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -27,6 +27,14 @@ class ErrorRPCCallerFailed extends ErrorRPC { static description = 'Failed to call stream'; } +class ErrorRPCInvalidTimeout extends ErrorRPC { + static description = 'Invalid timeout provided'; +} + +class ErrorRPCInvalidHandlerTimeout extends ErrorRPC { + static description = 'Invalid handler timeout provided'; +} + abstract class ErrorRPCProtocol extends ErrorRPC { static error = 'RPC Protocol Error'; code: number; @@ -257,6 +265,8 @@ export { ErrorRPCConnectionLocal, ErrorRPCConnectionPeer, ErrorRPCConnectionKeepAliveTimeOut, + ErrorRPCInvalidTimeout, + ErrorRPCInvalidHandlerTimeout, ErrorRPCConnectionInternal, ErrorMissingHeader, ErrorHandlerAborted, diff --git a/tests/RPC.test.ts b/tests/RPC.test.ts index 52f1dfa..3c17361 100644 --- a/tests/RPC.test.ts +++ b/tests/RPC.test.ts @@ -609,7 +609,7 @@ describe('RPC', () => { const rpcServer = new RPCServer({ logger, idGen, - handlerTimeoutTime: timeout, + timeoutTime: timeout, }); await rpcServer.start({ manifest: { @@ -701,7 +701,7 @@ describe('RPC', () => { const rpcServer = new RPCServer({ logger, idGen, - handlerTimeoutTime: 1, + timeoutTime: 1, }); await rpcServer.start({ manifest: { testMethod: new TestMethod({}) } }); // Register callback @@ -774,8 +774,7 @@ describe('RPC', () => { const rpcServer = new RPCServer({ logger, idGen, - - handlerTimeoutTime: 400, + timeoutTime: 400, }); await rpcServer.start({ manifest: { @@ -839,11 +838,10 @@ describe('RPC', () => { await abortProm.p; }; } - const rpcServer = new RPCServer({ logger, idGen, - handlerTimeoutTime: Infinity, + timeoutTime: Infinity, }); await rpcServer.start({ manifest: { testMethod: new TestMethod({}) } }); rpcServer.handleStream({ ...serverPair, cancel: () => {} }); @@ -924,7 +922,7 @@ describe('RPC', () => { const rpcServer = new RPCServer({ logger, idGen, - handlerTimeoutTime: 150, + timeoutTime: 150, }); await rpcServer.start({ manifest: { @@ -984,7 +982,7 @@ describe('RPC', () => { const rpcServer = new RPCServer({ logger, idGen, - handlerTimeoutTime: 150, + timeoutTime: 150, }); await rpcServer.start({ manifest: { diff --git a/tests/RPCClient.test.ts b/tests/RPCClient.test.ts index 113f953..c7e724f 100644 --- a/tests/RPCClient.test.ts +++ b/tests/RPCClient.test.ts @@ -750,7 +750,7 @@ describe(`${RPCClient.name}`, () => { // Should never reach this when testing return {} as RPCStream; }, - streamKeepAliveTimeoutTime: 100, + timeoutTime: 100, logger, idGen, }); @@ -919,7 +919,7 @@ describe(`${RPCClient.name}`, () => { // Should never reach this when testing return {} as RPCStream; }, - streamKeepAliveTimeoutTime: 100, + timeoutTime: 100, logger, idGen, }); @@ -1006,7 +1006,7 @@ describe(`${RPCClient.name}`, () => { ctx = ctx_; return streamPair; }, - streamKeepAliveTimeoutTime: 100, + timeoutTime: 100, logger, idGen, }); @@ -1149,7 +1149,29 @@ describe(`${RPCClient.name}`, () => { { numRuns: 5 }, ); testProp( - 'Check that ctx is provided to the middleWare and that the middleware can reset the timer', + 'RPCClient constructor should throw when passed a negative timeoutTime', + [fc.integer({ max: -1 })], + async (timeoutTime) => { + const streamPair: RPCStream = { + cancel: () => {}, + meta: undefined, + readable: new ReadableStream(), + writable: new WritableStream(), + }; + const constructorF = () => + new RPCClient({ + timeoutTime, + streamFactory: () => Promise.resolve(streamPair), + manifest: {}, + logger, + idGen, + }); + + expect(constructorF).toThrowError(rpcErrors.ErrorRPCInvalidTimeout); + }, + ); + testProp( + 'Check that ctx is provided to the middleware and that the middleware can reset the timer', [specificMessageArb], async (messages) => { const inputStream = rpcTestUtils.messagesToReadableStream(messages); @@ -1197,4 +1219,111 @@ describe(`${RPCClient.name}`, () => { { numRuns: 1 }, ); }); + describe('timeout priority', () => { + testProp( + 'check that call with ctx can override higher timeout of RPCClient', + [rpcTestUtils.timeoutsArb], + async ([lowerTimeoutTime, higherTimeoutTime]) => { + const streamPair: RPCStream = { + cancel: () => {}, + meta: undefined, + readable: new ReadableStream(), + writable: new WritableStream(), + }; + const { p: ctxP, resolveP: resolveCtxP } = promise(); + const rpcClient = new RPCClient({ + manifest: {}, + streamFactory: async (ctx) => { + resolveCtxP(ctx); + return streamPair; + }, + logger, + idGen, + timeoutTime: higherTimeoutTime, + }); + + await rpcClient.duplexStreamCaller( + methodName, + { + timer: lowerTimeoutTime, + }, + ); + + const ctx = await ctxP; + expect(ctx.timer.delay).toBe(lowerTimeoutTime); + ctx.timer.cancel(); + await ctx.timer.catch(() => {}); + }, + ); + testProp( + 'check that call with ctx can override lower timeout of RPCClient', + [rpcTestUtils.timeoutsArb], + async ([lowerTimeoutTime, higherTimeoutTime]) => { + const streamPair: RPCStream = { + cancel: () => {}, + meta: undefined, + readable: new ReadableStream(), + writable: new WritableStream(), + }; + const { p: ctxP, resolveP: resolveCtxP } = promise(); + const rpcClient = new RPCClient({ + manifest: {}, + streamFactory: async (ctx) => { + resolveCtxP(ctx); + return streamPair; + }, + logger, + idGen, + timeoutTime: lowerTimeoutTime, + }); + + await rpcClient.duplexStreamCaller( + methodName, + { + timer: higherTimeoutTime, + }, + ); + + const ctx = await ctxP; + expect(ctx.timer.delay).toBe(higherTimeoutTime); + ctx.timer.cancel(); + await ctx.timer.catch(() => {}); + }, + ); + testProp( + 'check that call with ctx can override lower timeout of RPCClient with Infinity', + [fc.integer({ min: 0 })], + async (timeoutTime) => { + const streamPair: RPCStream = { + cancel: () => {}, + meta: undefined, + readable: new ReadableStream(), + writable: new WritableStream(), + }; + const { p: ctxP, resolveP: resolveCtxP } = promise(); + const rpcClient = new RPCClient({ + manifest: {}, + streamFactory: async (ctx) => { + resolveCtxP(ctx); + return streamPair; + }, + logger, + idGen, + timeoutTime, + }); + + await rpcClient.duplexStreamCaller( + methodName, + { + timer: Infinity, + }, + ); + + const ctx = await ctxP; + expect(ctx.timer.delay).toBe(Infinity); + ctx.timer.cancel(); + await ctx.timer.catch(() => {}); + }, + ); + }); }); diff --git a/tests/RPCServer.test.ts b/tests/RPCServer.test.ts index 3455b12..0387071 100644 --- a/tests/RPCServer.test.ts +++ b/tests/RPCServer.test.ts @@ -57,7 +57,6 @@ describe(`${RPCServer.name}`, () => { data: rpcTestUtils.safeJsonValueArb, }), ); - testProp( 'can stream data with raw duplex stream handler', [specificMessageArb], @@ -884,7 +883,7 @@ describe(`${RPCServer.name}`, () => { } const rpcServer = new RPCServer({ - handlerTimeoutTime: 100, + timeoutTime: 100, logger, idGen, }); @@ -930,7 +929,7 @@ describe(`${RPCServer.name}`, () => { }); test('timeout with default time before handler selected', async () => { const rpcServer = new RPCServer({ - handlerTimeoutTime: 100, + timeoutTime: 100, logger, idGen, }); @@ -957,85 +956,6 @@ describe(`${RPCServer.name}`, () => { } await rpcServer.stop({ force: true }); }); - test('handler overrides timeout', async () => { - { - const waitProm = promise(); - const ctxShortProm = promise(); - class TestMethodShortTimeout extends UnaryHandler { - timeout = 25; - public handle = async ( - input: JSONRPCParams, - _cancel, - _meta, - ctx_, - ): Promise => { - ctxShortProm.resolveP(ctx_); - await waitProm.p; - return input; - }; - } - const ctxLongProm = promise(); - class TestMethodLongTimeout extends UnaryHandler { - timeout = 100; - public handle = async ( - input: JSONRPCParams, - _cancel, - _meta, - ctx_, - ): Promise => { - ctxLongProm.resolveP(ctx_); - await waitProm.p; - return input; - }; - } - const rpcServer = new RPCServer({ - handlerTimeoutTime: 50, - logger, - idGen, - }); - await rpcServer.start({ - manifest: { - testShort: new TestMethodShortTimeout({}), - testLong: new TestMethodLongTimeout({}), - }, - }); - const streamShort = rpcTestUtils.messagesToReadableStream([ - { - jsonrpc: '2.0', - method: 'testShort', - params: {}, - }, - ]); - const readWriteStreamShort: RPCStream = { - cancel: () => {}, - readable: streamShort, - writable: new WritableStream(), - }; - rpcServer.handleStream(readWriteStreamShort); - // Shorter timeout is updated - const ctxShort = await ctxShortProm.p; - expect(ctxShort.timer.delay).toEqual(25); - const streamLong = rpcTestUtils.messagesToReadableStream([ - { - jsonrpc: '2.0', - method: 'testLong', - params: {}, - }, - ]); - const readWriteStreamLong: RPCStream = { - cancel: () => {}, - readable: streamLong, - writable: new WritableStream(), - }; - rpcServer.handleStream(readWriteStreamLong); - - // Longer timeout is set to server's default - const ctxLong = await ctxLongProm.p; - expect(ctxLong.timer.delay).toEqual(50); - waitProm.resolveP(); - await rpcServer.stop({ force: true }); - } - }); test('duplex handler refreshes timeout when messages are sent', async () => { const contextProm = promise(); const stepProm1 = promise(); @@ -1061,7 +981,7 @@ describe(`${RPCServer.name}`, () => { const rpcServer = new RPCServer({ logger, idGen, - handlerTimeoutTime: 1000, + timeoutTime: 1000, }); await rpcServer.start({ manifest: { @@ -1162,6 +1082,54 @@ describe(`${RPCServer.name}`, () => { await expect(ctx.timer).toReject(); await rpcServer.stop({ force: true }); }); + testProp( + 'RPCServer constructor should throw when passed a negative timeoutTime', + [fc.integer({ max: -1 })], + async (timeoutTime) => { + const constructorF = () => + new RPCServer({ + timeoutTime, + logger, + idGen, + }); + + expect(constructorF).toThrowError(rpcErrors.ErrorRPCInvalidTimeout); + }, + ); + testProp( + 'RPCServer.start should throw when passed a handler with negative timeout', + [fc.integer({ max: -1 })], + async (timeoutTime) => { + const waitProm = promise(); + const ctxLongProm = promise(); + + class TestMethodArbitraryTimeout extends UnaryHandler { + timeout = timeoutTime; + public handle = async ( + input: JSONRPCParams, + _cancel, + _meta, + ctx_, + ): Promise => { + ctxLongProm.resolveP(ctx_); + await waitProm.p; + return input; + }; + } + const rpcServer = new RPCServer({ + logger, + idGen, + }); + + await expect( + rpcServer.start({ + manifest: { + testArbitrary: new TestMethodArbitraryTimeout({}), + }, + }), + ).rejects.toBeInstanceOf(rpcErrors.ErrorRPCInvalidHandlerTimeout); + }, + ); testProp( 'middleware can update timeout timer', [specificMessageArb], @@ -1209,4 +1177,132 @@ describe(`${RPCServer.name}`, () => { expect(ctx.timer.delay).toBe(12345); }, ); + describe('timeout priority', () => { + testProp( + 'check that handler can override higher timeout of RPCServer', + [specificMessageArb, rpcTestUtils.timeoutsArb], + async (messages, [lowerTimeoutTime, higherTimeoutTime]) => { + const stream = rpcTestUtils.messagesToReadableStream(messages); + const { p: ctxP, resolveP: resolveCtxP } = promise(); + class TestMethod extends DuplexHandler { + public timeout = lowerTimeoutTime; + public handle = async function* ( + input: AsyncGenerator, + _cancel: (reason?: any) => void, + _meta: Record | undefined, + ctx: ContextTimed, + ): AsyncGenerator { + resolveCtxP(ctx); + yield* input; + }; + } + const rpcServer = new RPCServer({ + logger, + timeoutTime: higherTimeoutTime, + idGen, + }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const readWriteStream: RPCStream = { + cancel: () => {}, + readable: stream, + writable: outputStream, + }; + rpcServer.handleStream(readWriteStream); + await outputResult; + const ctx = await ctxP; + expect(ctx.timer.delay).toBe(lowerTimeoutTime); + ctx.timer.cancel(); + await ctx.timer.catch(() => {}); + }, + ); + testProp( + 'check that handler can override lower timeout of RPCServer', + [specificMessageArb, rpcTestUtils.timeoutsArb], + async (messages, [lowerTimeoutTime, higherTimeoutTime]) => { + const stream = rpcTestUtils.messagesToReadableStream(messages); + const { p: ctxP, resolveP: resolveCtxP } = promise(); + class TestMethod extends DuplexHandler { + public timeout = higherTimeoutTime; + public handle = async function* ( + input: AsyncGenerator, + _cancel: (reason?: any) => void, + _meta: Record | undefined, + ctx: ContextTimed, + ): AsyncGenerator { + resolveCtxP(ctx); + yield* input; + }; + } + const rpcServer = new RPCServer({ + logger, + timeoutTime: lowerTimeoutTime, + idGen, + }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const readWriteStream: RPCStream = { + cancel: () => {}, + readable: stream, + writable: outputStream, + }; + rpcServer.handleStream(readWriteStream); + await outputResult; + const ctx = await ctxP; + expect(ctx.timer.delay).toBe(higherTimeoutTime); + ctx.timer.cancel(); + await ctx.timer.catch(() => {}); + }, + ); + }); + testProp( + 'check that handler can override lower timeout of RPCServer with Infinity', + [specificMessageArb, fc.integer({ min: 0 })], + async (messages, timeoutTime) => { + const stream = rpcTestUtils.messagesToReadableStream(messages); + const { p: ctxP, resolveP: resolveCtxP } = promise(); + class TestMethod extends DuplexHandler { + public timeout = Infinity; + public handle = async function* ( + input: AsyncGenerator, + _cancel: (reason?: any) => void, + _meta: Record | undefined, + ctx: ContextTimed, + ): AsyncGenerator { + resolveCtxP(ctx); + yield* input; + }; + } + const rpcServer = new RPCServer({ + logger, + timeoutTime, + idGen, + }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const readWriteStream: RPCStream = { + cancel: () => {}, + readable: stream, + writable: outputStream, + }; + rpcServer.handleStream(readWriteStream); + await outputResult; + const ctx = await ctxP; + expect(ctx.timer.delay).toBe(Infinity); + ctx.timer.cancel(); + await ctx.timer.catch(() => {}); + }, + ); }); diff --git a/tests/utils.ts b/tests/utils.ts index cea830e..6c92433 100644 --- a/tests/utils.ts +++ b/tests/utils.ts @@ -274,6 +274,15 @@ const errorArb = ( ), ); +const timeoutsArb = fc + .integer({ min: 0 }) + .chain((lowerTimeoutTime) => + fc.tuple( + fc.constant(lowerTimeoutTime), + fc.integer({ min: lowerTimeoutTime }), + ), + ); + export { binaryStreamToSnippedStream, binaryStreamToNoisyStream, @@ -295,4 +304,5 @@ export { tapTransformStream, createTapPairs, errorArb, + timeoutsArb, };