From 8da1dc73a3d36314ed52794949c93869437a5c65 Mon Sep 17 00:00:00 2001 From: Amy Yan Date: Thu, 2 Nov 2023 12:39:47 +1100 Subject: [PATCH 1/2] feat: timeouts on `RPCClient` and `RPCServer` are now cancelled after the first initial server-sent message feat: timeouts on `RPCClient`are now cancelled after the first sent message fix: `RPCServer` and `RPCClient` tests regarding new timeout changes that cancel timer on first server message sent [ci-skip] --- src/RPCClient.ts | 17 ++-- src/RPCServer.ts | 5 +- src/utils.ts | 10 +- tests/RPCClient.test.ts | 202 ++++++++++++++++++++++------------------ tests/RPCServer.test.ts | 124 +++++++++++------------- 5 files changed, 180 insertions(+), 178 deletions(-) diff --git a/src/RPCClient.ts b/src/RPCClient.ts index ac3e08b..83f03d6 100644 --- a/src/RPCClient.ts +++ b/src/RPCClient.ts @@ -307,10 +307,11 @@ class RPCClient { }, () => {}, // Ignore cancellation error ); - // Deciding if we want to allow refreshing - // We want to refresh timer if none was provided - const refreshingTimer: Timer | undefined = - ctx.timer == null ? timer : undefined; + // Deciding if we want to allow cancelling + // We want to cancel timer if none was provided + const cancellingTimer: Timer | undefined = !(ctx.timer instanceof Timer) + ? timer + : undefined; // Composing stream transforms and middleware const metadata = { ...(rpcStream.meta ?? {}), @@ -319,12 +320,10 @@ class RPCClient { const outputMessageTransformStream = utils.clientOutputTransformStream( metadata, this.toError, - refreshingTimer, - ); - const inputMessageTransformStream = utils.clientInputTransformStream( - method, - refreshingTimer, + cancellingTimer, ); + const inputMessageTransformStream = + utils.clientInputTransformStream(method); const middleware = this.middlewareFactory( { signal, timer }, rpcStream.cancel, diff --git a/src/RPCServer.ts b/src/RPCServer.ts index fa4a80d..eb7089b 100644 --- a/src/RPCServer.ts +++ b/src/RPCServer.ts @@ -297,9 +297,6 @@ class RPCServer { // Input generator derived from the forward stream const inputGen = async function* (): AsyncIterable { for await (const data of forwardStream) { - if (ctx.timer.status !== 'settled') { - ctx.timer.refresh(); - } yield data.params as I; } }; @@ -309,7 +306,7 @@ class RPCServer { }); for await (const response of handlerG) { if (ctx.timer.status !== 'settled') { - ctx.timer.refresh(); + ctx.timer.cancel(utils.timeoutCancelledReason); } const responseMessage: JSONRPCResponseResult = { jsonrpc: '2.0', diff --git a/src/utils.ts b/src/utils.ts index 785ca2b..923bfcd 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -20,6 +20,8 @@ import { JSONParser } from '@streamparser/json'; import { AbstractError } from '@matrixai/errors'; import * as errors from './errors'; +const timeoutCancelledReason = Symbol('timeoutCancelledReason'); + // Importing PK funcs and utils which are essential for RPC function isObject(o: unknown): o is object { return o !== null && typeof o === 'object'; @@ -410,15 +412,12 @@ function toError( * one is provided. * @param method - Name of the method that was called, used to select the * server side. - * @param timer - Timer that gets refreshed each time a message is provided. */ function clientInputTransformStream( method: string, - timer?: Timer, ): TransformStream { return new TransformStream({ transform: (chunk, controller) => { - timer?.refresh(); const message: JSONRPCRequest = { method, jsonrpc: '2.0', @@ -446,7 +445,9 @@ function clientOutputTransformStream( ): TransformStream, O> { return new TransformStream | JSONRPCResponseError, O>({ transform: (chunk, controller) => { - timer?.refresh(); + if (timer?.status !== 'settled') { + timer?.cancel(timeoutCancelledReason); + } // `error` indicates it's an error message if ('error' in chunk) { const e = toError(chunk.error.data, clientMetadata); @@ -542,6 +543,7 @@ function never(): never { } export { + timeoutCancelledReason, parseJSONRPCRequest, parseJSONRPCRequestMessage, parseJSONRPCRequestNotification, diff --git a/tests/RPCClient.test.ts b/tests/RPCClient.test.ts index c7e724f..481d501 100644 --- a/tests/RPCClient.test.ts +++ b/tests/RPCClient.test.ts @@ -10,6 +10,7 @@ import type { import type { IdGen } from '@/types'; import { TransformStream, ReadableStream } from 'stream/web'; import Logger, { LogLevel, StreamHandler } from '@matrixai/logger'; +import { Timer } from '@matrixai/timer'; import { testProp, fc } from '@fast-check/jest'; import RawCaller from '@/callers/RawCaller'; import DuplexCaller from '@/callers/DuplexCaller'; @@ -20,7 +21,7 @@ import RPCClient from '@/RPCClient'; import RPCServer from '@/RPCServer'; import * as rpcErrors from '@/errors'; import * as rpcUtilsMiddleware from '@/middleware'; -import { promise, sleep } from '@/utils'; +import { promise, timeoutCancelledReason } from '@/utils'; import * as rpcTestUtils from './utils'; describe(`${RPCClient.name}`, () => { @@ -738,6 +739,28 @@ describe(`${RPCClient.name}`, () => { // @ts-ignore: ignoring type safety here expect(() => rpcClient.withMethods.someMethod()).toThrow(); }); + testProp( + 'constructor should throw when passed a negative timeoutTime', + [fc.integer({ max: -1 })], + async (timeoutTime) => { + const streamPair: RPCStream = { + cancel: () => {}, + meta: undefined, + readable: new ReadableStream(), + writable: new WritableStream(), + }; + const constructorF = () => + new RPCClient({ + timeoutTime, + streamFactory: () => Promise.resolve(streamPair), + manifest: {}, + logger, + idGen, + }); + + expect(constructorF).toThrowError(rpcErrors.ErrorRPCInvalidTimeout); + }, + ); describe('raw caller', () => { test('raw caller uses default timeout when creating stream', async () => { const holdProm = promise(); @@ -1097,17 +1120,15 @@ describe(`${RPCClient.name}`, () => { stream.cancel(Error('asd')); }); testProp( - 'duplex caller timeout is refreshed when sending message', + 'duplex caller timeout is cancelled when receiving message', [specificMessageArb], async (messages) => { const inputStream = rpcTestUtils.messagesToReadableStream(messages); - const [outputResult, outputStream] = - rpcTestUtils.streamToArray(); const streamPair: RPCStream = { cancel: () => {}, meta: undefined, readable: inputStream, - writable: outputStream, + writable: new WritableStream(), }; const ctxProm = promise(); const rpcClient = new RPCClient({ @@ -1125,100 +1146,53 @@ describe(`${RPCClient.name}`, () => { >(methodName, { timer: 200 }); const ctx = await ctxProm.p; - // Reading refreshes timer const reader = callerInterface.readable.getReader(); - await sleep(50); - let timeLeft = ctx.timer.getTimeout(); - const message = await reader.read(); - expect(ctx.timer.getTimeout() + 2).toBeGreaterThanOrEqual(timeLeft); reader.releaseLock(); for await (const _ of callerInterface.readable) { // Do nothing } - - // Writing should refresh timer - const writer = callerInterface.writable.getWriter(); - await sleep(50); - timeLeft = ctx.timer.getTimeout(); - await writer.write(message.value); - expect(ctx.timer.getTimeout() + 1).toBeGreaterThanOrEqual(timeLeft); - await writer.close(); - - await outputResult; + await expect(ctx.timer).rejects.toBe(timeoutCancelledReason); }, { numRuns: 5 }, ); - testProp( - 'RPCClient constructor should throw when passed a negative timeoutTime', - [fc.integer({ max: -1 })], - async (timeoutTime) => { - const streamPair: RPCStream = { - cancel: () => {}, - meta: undefined, - readable: new ReadableStream(), - writable: new WritableStream(), - }; - const constructorF = () => - new RPCClient({ - timeoutTime, - streamFactory: () => Promise.resolve(streamPair), - manifest: {}, - logger, - idGen, - }); - - expect(constructorF).toThrowError(rpcErrors.ErrorRPCInvalidTimeout); - }, - ); - testProp( - 'Check that ctx is provided to the middleware and that the middleware can reset the timer', - [specificMessageArb], - async (messages) => { - const inputStream = rpcTestUtils.messagesToReadableStream(messages); - const [outputResult, outputStream] = - rpcTestUtils.streamToArray(); - const streamPair: RPCStream = { - cancel: () => {}, - meta: undefined, - readable: inputStream, - writable: outputStream, - }; - const ctxProm = promise(); - const rpcClient = new RPCClient({ - manifest: {}, - streamFactory: async (ctx) => { - ctxProm.resolveP(ctx); - return streamPair; - }, - middlewareFactory: rpcUtilsMiddleware.defaultClientMiddlewareWrapper( - (ctx) => { - ctx.timer.reset(123); - return { - forward: new TransformStream(), - reverse: new TransformStream(), - }; - }, - ), - logger, - idGen, - }); - const callerInterface = await rpcClient.duplexStreamCaller< - JSONRPCParams, - JSONRPCResult - >(methodName); - - const ctx = await ctxProm.p; - // Writing should refresh timer engage the middleware - const writer = callerInterface.writable.getWriter(); - await writer.write({}); - expect(ctx.timer.delay).toBe(123); - await writer.close(); - - await outputResult; - }, - { numRuns: 1 }, - ); }); + testProp( + 'duplex caller timeout is not cancelled when receiving message with provided ctx', + [specificMessageArb], + async (messages) => { + const inputStream = rpcTestUtils.messagesToReadableStream(messages); + const streamPair: RPCStream = { + cancel: () => {}, + meta: undefined, + readable: inputStream, + writable: new WritableStream(), + }; + const ctxProm = promise(); + const rpcClient = new RPCClient({ + manifest: {}, + streamFactory: async (ctx) => { + ctxProm.resolveP(ctx); + return streamPair; + }, + logger, + idGen, + }); + const callerInterface = await rpcClient.duplexStreamCaller< + JSONRPCParams, + JSONRPCResult + >(methodName, { timer: new Timer(undefined, 200) }); + + const ctx = await ctxProm.p; + const reader = callerInterface.readable.getReader(); + reader.releaseLock(); + for await (const _ of callerInterface.readable) { + // Do nothing + } + await ctx.timer; + expect(ctx.signal.reason).toBeInstanceOf(rpcErrors.ErrorRPCTimedOut); + }, + { numRuns: 5 }, + ); describe('timeout priority', () => { testProp( 'check that call with ctx can override higher timeout of RPCClient', @@ -1325,5 +1299,53 @@ describe(`${RPCClient.name}`, () => { await ctx.timer.catch(() => {}); }, ); + testProp( + 'Check that ctx is provided to the middleware and that the middleware can reset the timer', + [specificMessageArb], + async (messages) => { + const inputStream = rpcTestUtils.messagesToReadableStream(messages); + const [outputResult, outputStream] = + rpcTestUtils.streamToArray(); + const streamPair: RPCStream = { + cancel: () => {}, + meta: undefined, + readable: inputStream, + writable: outputStream, + }; + const ctxProm = promise(); + const rpcClient = new RPCClient({ + manifest: {}, + streamFactory: async (ctx) => { + ctxProm.resolveP(ctx); + return streamPair; + }, + middlewareFactory: rpcUtilsMiddleware.defaultClientMiddlewareWrapper( + (ctx) => { + ctx.timer.reset(123); + return { + forward: new TransformStream(), + reverse: new TransformStream(), + }; + }, + ), + logger, + idGen, + }); + const callerInterface = await rpcClient.duplexStreamCaller< + JSONRPCParams, + JSONRPCResult + >(methodName); + + const ctx = await ctxProm.p; + // Writing should refresh timer engage the middleware + const writer = callerInterface.writable.getWriter(); + await writer.write({}); + expect(ctx.timer.delay).toBe(123); + await writer.close(); + + await outputResult; + }, + { numRuns: 1 }, + ); }); }); diff --git a/tests/RPCServer.test.ts b/tests/RPCServer.test.ts index 0387071..4262b97 100644 --- a/tests/RPCServer.test.ts +++ b/tests/RPCServer.test.ts @@ -17,7 +17,7 @@ import Logger, { LogLevel, StreamHandler } from '@matrixai/logger'; import RPCServer from '@/RPCServer'; import * as rpcErrors from '@/errors'; import * as rpcUtils from '@/utils'; -import { promise, sleep } from '@/utils'; +import { promise } from '@/utils'; import * as rpcUtilsMiddleware from '@/middleware'; import ServerHandler from '@/handlers/ServerHandler'; import DuplexHandler from '@/handlers/DuplexHandler'; @@ -849,7 +849,54 @@ describe(`${RPCServer.name}`, () => { await rpcServer.stop({ force: true }); }, ); + testProp( + 'constructor should throw when passed a negative timeout', + [fc.integer({ max: -1 })], + async (timeoutTime) => { + const constructorF = () => + new RPCServer({ + timeoutTime, + logger, + idGen, + }); + + expect(constructorF).toThrowError(rpcErrors.ErrorRPCInvalidTimeout); + }, + ); + testProp( + 'start should throw when passed a handler with negative timeout', + [fc.integer({ max: -1 })], + async (timeoutTime) => { + const waitProm = promise(); + const ctxLongProm = promise(); + + class TestMethodArbitraryTimeout extends UnaryHandler { + timeout = timeoutTime; + public handle = async ( + input: JSONRPCParams, + _cancel, + _meta, + ctx_, + ): Promise => { + ctxLongProm.resolveP(ctx_); + await waitProm.p; + return input; + }; + } + const rpcServer = new RPCServer({ + logger, + idGen, + }); + await expect( + rpcServer.start({ + manifest: { + testArbitrary: new TestMethodArbitraryTimeout({}), + }, + }), + ).rejects.toBeInstanceOf(rpcErrors.ErrorRPCInvalidHandlerTimeout); + }, + ); test('timeout with default time after handler selected', async () => { const ctxProm = promise(); @@ -956,10 +1003,8 @@ describe(`${RPCServer.name}`, () => { } await rpcServer.stop({ force: true }); }); - test('duplex handler refreshes timeout when messages are sent', async () => { + test('duplex handler cancels timeout when messages are sent', async () => { const contextProm = promise(); - const stepProm1 = promise(); - const stepProm2 = promise(); const passthroughStream = new TransformStream(); class TestHandler extends DuplexHandler { public handle = async function* ( @@ -970,12 +1015,9 @@ describe(`${RPCServer.name}`, () => { ): AsyncGenerator> { contextProm.resolveP(ctx); for await (const _ of input) { - // Do nothing, just consume + // Do nothing } - await stepProm1.p; yield { value: 1 }; - await stepProm2.p; - yield { value: 2 }; }; } const rpcServer = new RPCServer({ @@ -1003,26 +1045,14 @@ describe(`${RPCServer.name}`, () => { }; rpcServer.handleStream(readWriteStream); const writer = passthroughStream.writable.getWriter(); + // Send request for method await writer.write(requestMessage); const ctx = await contextProm.p; - const scheduled: Date | undefined = ctx.timer.scheduled; - // Checking writing refreshes timer - await sleep(25); + // Send data await writer.write(requestMessage); - expect(ctx.timer.scheduled).toBeAfter(scheduled!); - expect( - ctx.timer.scheduled!.getTime() - scheduled!.getTime(), - ).toBeGreaterThanOrEqual(25); await writer.close(); - // Checking reading refreshes timer - await sleep(25); - stepProm1.resolveP(); - expect(ctx.timer.scheduled).toBeAfter(scheduled!); - expect( - ctx.timer.scheduled!.getTime() - scheduled!.getTime(), - ).toBeGreaterThanOrEqual(25); - stepProm2.resolveP(); await outputResult; + await expect(ctx.timer).rejects.toBe(rpcUtils.timeoutCancelledReason); await rpcServer.stop({ force: true }); }); test('stream ending cleans up timer and abortSignal', async () => { @@ -1082,54 +1112,6 @@ describe(`${RPCServer.name}`, () => { await expect(ctx.timer).toReject(); await rpcServer.stop({ force: true }); }); - testProp( - 'RPCServer constructor should throw when passed a negative timeoutTime', - [fc.integer({ max: -1 })], - async (timeoutTime) => { - const constructorF = () => - new RPCServer({ - timeoutTime, - logger, - idGen, - }); - - expect(constructorF).toThrowError(rpcErrors.ErrorRPCInvalidTimeout); - }, - ); - testProp( - 'RPCServer.start should throw when passed a handler with negative timeout', - [fc.integer({ max: -1 })], - async (timeoutTime) => { - const waitProm = promise(); - const ctxLongProm = promise(); - - class TestMethodArbitraryTimeout extends UnaryHandler { - timeout = timeoutTime; - public handle = async ( - input: JSONRPCParams, - _cancel, - _meta, - ctx_, - ): Promise => { - ctxLongProm.resolveP(ctx_); - await waitProm.p; - return input; - }; - } - const rpcServer = new RPCServer({ - logger, - idGen, - }); - - await expect( - rpcServer.start({ - manifest: { - testArbitrary: new TestMethodArbitraryTimeout({}), - }, - }), - ).rejects.toBeInstanceOf(rpcErrors.ErrorRPCInvalidHandlerTimeout); - }, - ); testProp( 'middleware can update timeout timer', [specificMessageArb], From 356f456e4f371e1b500296cbfa2ca21fea6c0972 Mon Sep 17 00:00:00 2001 From: Amy Yan Date: Thu, 2 Nov 2023 13:59:30 +1100 Subject: [PATCH 2/2] feat: added more context for new timeouts system in `README.md` [ci-skip] --- README.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/README.md b/README.md index 0378099..8f49c0f 100644 --- a/README.md +++ b/README.md @@ -391,6 +391,14 @@ If the client were to time out, the stream is forcibly closed and `ErrorRPCTimed If the server were to time out, is is advisory. Meaning that the server may choose to optionally eagerly throw `ErrorRPCTimedOut`, or continue processing as normal. +After the client receives the subsequent message from the server, the timeout timer is cancelled. + +Likewise on the server, the timeout timer is cancelled after the first message is sent to the client. + +This means that the timeout for Streaming calls acts as a Proof of Life, and after it is established, the timeout no longer applies. This allows for long-running Streaming calls. + +Note that when supplying a `Timer` instance to the call-site in `RPCClient`, the timeout timer will not be cancelled. As it is expected for the library to not mutate the passed-in `Timer`, and for the user to expect that receiving a messsage will have meaned that the timer no longer matters. + #### Throwing Timeouts Server-Side By default, a timeout will not cause an RPC call to automatically throw, this must be manually done by the handler when it receives the abort signal from `ctx.signal`. An example of this is like so: