Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,14 @@ If the client were to time out, the stream is forcibly closed and `ErrorRPCTimed

If the server were to time out, is is advisory. Meaning that the server may choose to optionally eagerly throw `ErrorRPCTimedOut`, or continue processing as normal.

After the client receives the subsequent message from the server, the timeout timer is cancelled.

Likewise on the server, the timeout timer is cancelled after the first message is sent to the client.

This means that the timeout for Streaming calls acts as a Proof of Life, and after it is established, the timeout no longer applies. This allows for long-running Streaming calls.

Note that when supplying a `Timer` instance to the call-site in `RPCClient`, the timeout timer will not be cancelled. As it is expected for the library to not mutate the passed-in `Timer`, and for the user to expect that receiving a messsage will have meaned that the timer no longer matters.

#### Throwing Timeouts Server-Side

By default, a timeout will not cause an RPC call to automatically throw, this must be manually done by the handler when it receives the abort signal from `ctx.signal`. An example of this is like so:
Expand Down
17 changes: 8 additions & 9 deletions src/RPCClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -307,10 +307,11 @@ class RPCClient<M extends ClientManifest> {
},
() => {}, // Ignore cancellation error
);
// Deciding if we want to allow refreshing
// We want to refresh timer if none was provided
const refreshingTimer: Timer | undefined =
ctx.timer == null ? timer : undefined;
// Deciding if we want to allow cancelling
// We want to cancel timer if none was provided
const cancellingTimer: Timer | undefined = !(ctx.timer instanceof Timer)
? timer
: undefined;
// Composing stream transforms and middleware
const metadata = {
...(rpcStream.meta ?? {}),
Expand All @@ -319,12 +320,10 @@ class RPCClient<M extends ClientManifest> {
const outputMessageTransformStream = utils.clientOutputTransformStream<O>(
metadata,
this.toError,
refreshingTimer,
);
const inputMessageTransformStream = utils.clientInputTransformStream<I>(
method,
refreshingTimer,
cancellingTimer,
);
const inputMessageTransformStream =
utils.clientInputTransformStream<I>(method);
const middleware = this.middlewareFactory(
{ signal, timer },
rpcStream.cancel,
Expand Down
5 changes: 1 addition & 4 deletions src/RPCServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -297,9 +297,6 @@ class RPCServer {
// Input generator derived from the forward stream
const inputGen = async function* (): AsyncIterable<I> {
for await (const data of forwardStream) {
if (ctx.timer.status !== 'settled') {
ctx.timer.refresh();
}
yield data.params as I;
}
};
Expand All @@ -309,7 +306,7 @@ class RPCServer {
});
for await (const response of handlerG) {
if (ctx.timer.status !== 'settled') {
ctx.timer.refresh();
ctx.timer.cancel(utils.timeoutCancelledReason);
}
const responseMessage: JSONRPCResponseResult = {
jsonrpc: '2.0',
Expand Down
10 changes: 6 additions & 4 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import { JSONParser } from '@streamparser/json';
import { AbstractError } from '@matrixai/errors';
import * as errors from './errors';

const timeoutCancelledReason = Symbol('timeoutCancelledReason');

// Importing PK funcs and utils which are essential for RPC
function isObject(o: unknown): o is object {
return o !== null && typeof o === 'object';
Expand Down Expand Up @@ -410,15 +412,12 @@ function toError(
* one is provided.
* @param method - Name of the method that was called, used to select the
* server side.
* @param timer - Timer that gets refreshed each time a message is provided.
*/
function clientInputTransformStream<I extends JSONObject>(
method: string,
timer?: Timer,
): TransformStream<I, JSONRPCRequest> {
return new TransformStream<I, JSONRPCRequest>({
transform: (chunk, controller) => {
timer?.refresh();
const message: JSONRPCRequest = {
method,
jsonrpc: '2.0',
Expand Down Expand Up @@ -446,7 +445,9 @@ function clientOutputTransformStream<O extends JSONObject>(
): TransformStream<JSONRPCResponse<O>, O> {
return new TransformStream<JSONRPCResponse<O> | JSONRPCResponseError, O>({
transform: (chunk, controller) => {
timer?.refresh();
if (timer?.status !== 'settled') {
timer?.cancel(timeoutCancelledReason);
}
// `error` indicates it's an error message
if ('error' in chunk) {
const e = toError(chunk.error.data, clientMetadata);
Expand Down Expand Up @@ -542,6 +543,7 @@ function never(): never {
}

export {
timeoutCancelledReason,
parseJSONRPCRequest,
parseJSONRPCRequestMessage,
parseJSONRPCRequestNotification,
Expand Down
202 changes: 112 additions & 90 deletions tests/RPCClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import type {
import type { IdGen } from '@/types';
import { TransformStream, ReadableStream } from 'stream/web';
import Logger, { LogLevel, StreamHandler } from '@matrixai/logger';
import { Timer } from '@matrixai/timer';
import { testProp, fc } from '@fast-check/jest';
import RawCaller from '@/callers/RawCaller';
import DuplexCaller from '@/callers/DuplexCaller';
Expand All @@ -20,7 +21,7 @@ import RPCClient from '@/RPCClient';
import RPCServer from '@/RPCServer';
import * as rpcErrors from '@/errors';
import * as rpcUtilsMiddleware from '@/middleware';
import { promise, sleep } from '@/utils';
import { promise, timeoutCancelledReason } from '@/utils';
import * as rpcTestUtils from './utils';

describe(`${RPCClient.name}`, () => {
Expand Down Expand Up @@ -738,6 +739,28 @@ describe(`${RPCClient.name}`, () => {
// @ts-ignore: ignoring type safety here
expect(() => rpcClient.withMethods.someMethod()).toThrow();
});
testProp(
'constructor should throw when passed a negative timeoutTime',
[fc.integer({ max: -1 })],
async (timeoutTime) => {
const streamPair: RPCStream<Uint8Array, Uint8Array> = {
cancel: () => {},
meta: undefined,
readable: new ReadableStream(),
writable: new WritableStream(),
};
const constructorF = () =>
new RPCClient({
timeoutTime,
streamFactory: () => Promise.resolve(streamPair),
manifest: {},
logger,
idGen,
});

expect(constructorF).toThrowError(rpcErrors.ErrorRPCInvalidTimeout);
},
);
describe('raw caller', () => {
test('raw caller uses default timeout when creating stream', async () => {
const holdProm = promise();
Expand Down Expand Up @@ -1097,17 +1120,15 @@ describe(`${RPCClient.name}`, () => {
stream.cancel(Error('asd'));
});
testProp(
'duplex caller timeout is refreshed when sending message',
'duplex caller timeout is cancelled when receiving message',
[specificMessageArb],
async (messages) => {
const inputStream = rpcTestUtils.messagesToReadableStream(messages);
const [outputResult, outputStream] =
rpcTestUtils.streamToArray<Uint8Array>();
const streamPair: RPCStream<Uint8Array, Uint8Array> = {
cancel: () => {},
meta: undefined,
readable: inputStream,
writable: outputStream,
writable: new WritableStream(),
};
const ctxProm = promise<ContextTimed>();
const rpcClient = new RPCClient({
Expand All @@ -1125,100 +1146,53 @@ describe(`${RPCClient.name}`, () => {
>(methodName, { timer: 200 });

const ctx = await ctxProm.p;
// Reading refreshes timer
const reader = callerInterface.readable.getReader();
await sleep(50);
let timeLeft = ctx.timer.getTimeout();
const message = await reader.read();
expect(ctx.timer.getTimeout() + 2).toBeGreaterThanOrEqual(timeLeft);
reader.releaseLock();
for await (const _ of callerInterface.readable) {
// Do nothing
}

// Writing should refresh timer
const writer = callerInterface.writable.getWriter();
await sleep(50);
timeLeft = ctx.timer.getTimeout();
await writer.write(message.value);
expect(ctx.timer.getTimeout() + 1).toBeGreaterThanOrEqual(timeLeft);
await writer.close();

await outputResult;
await expect(ctx.timer).rejects.toBe(timeoutCancelledReason);
},
{ numRuns: 5 },
);
testProp(
'RPCClient constructor should throw when passed a negative timeoutTime',
[fc.integer({ max: -1 })],
async (timeoutTime) => {
const streamPair: RPCStream<Uint8Array, Uint8Array> = {
cancel: () => {},
meta: undefined,
readable: new ReadableStream(),
writable: new WritableStream(),
};
const constructorF = () =>
new RPCClient({
timeoutTime,
streamFactory: () => Promise.resolve(streamPair),
manifest: {},
logger,
idGen,
});

expect(constructorF).toThrowError(rpcErrors.ErrorRPCInvalidTimeout);
},
);
testProp(
'Check that ctx is provided to the middleware and that the middleware can reset the timer',
[specificMessageArb],
async (messages) => {
const inputStream = rpcTestUtils.messagesToReadableStream(messages);
const [outputResult, outputStream] =
rpcTestUtils.streamToArray<Uint8Array>();
const streamPair: RPCStream<Uint8Array, Uint8Array> = {
cancel: () => {},
meta: undefined,
readable: inputStream,
writable: outputStream,
};
const ctxProm = promise<ContextTimed>();
const rpcClient = new RPCClient({
manifest: {},
streamFactory: async (ctx) => {
ctxProm.resolveP(ctx);
return streamPair;
},
middlewareFactory: rpcUtilsMiddleware.defaultClientMiddlewareWrapper(
(ctx) => {
ctx.timer.reset(123);
return {
forward: new TransformStream(),
reverse: new TransformStream(),
};
},
),
logger,
idGen,
});
const callerInterface = await rpcClient.duplexStreamCaller<
JSONRPCParams,
JSONRPCResult
>(methodName);

const ctx = await ctxProm.p;
// Writing should refresh timer engage the middleware
const writer = callerInterface.writable.getWriter();
await writer.write({});
expect(ctx.timer.delay).toBe(123);
await writer.close();

await outputResult;
},
{ numRuns: 1 },
);
});
testProp(
'duplex caller timeout is not cancelled when receiving message with provided ctx',
[specificMessageArb],
async (messages) => {
const inputStream = rpcTestUtils.messagesToReadableStream(messages);
const streamPair: RPCStream<Uint8Array, Uint8Array> = {
cancel: () => {},
meta: undefined,
readable: inputStream,
writable: new WritableStream(),
};
const ctxProm = promise<ContextTimed>();
const rpcClient = new RPCClient({
manifest: {},
streamFactory: async (ctx) => {
ctxProm.resolveP(ctx);
return streamPair;
},
logger,
idGen,
});
const callerInterface = await rpcClient.duplexStreamCaller<
JSONRPCParams,
JSONRPCResult
>(methodName, { timer: new Timer(undefined, 200) });

const ctx = await ctxProm.p;
const reader = callerInterface.readable.getReader();
reader.releaseLock();
for await (const _ of callerInterface.readable) {
// Do nothing
}
await ctx.timer;
expect(ctx.signal.reason).toBeInstanceOf(rpcErrors.ErrorRPCTimedOut);
},
{ numRuns: 5 },
);
describe('timeout priority', () => {
testProp(
'check that call with ctx can override higher timeout of RPCClient',
Expand Down Expand Up @@ -1325,5 +1299,53 @@ describe(`${RPCClient.name}`, () => {
await ctx.timer.catch(() => {});
},
);
testProp(
'Check that ctx is provided to the middleware and that the middleware can reset the timer',
[specificMessageArb],
async (messages) => {
const inputStream = rpcTestUtils.messagesToReadableStream(messages);
const [outputResult, outputStream] =
rpcTestUtils.streamToArray<Uint8Array>();
const streamPair: RPCStream<Uint8Array, Uint8Array> = {
cancel: () => {},
meta: undefined,
readable: inputStream,
writable: outputStream,
};
const ctxProm = promise<ContextTimed>();
const rpcClient = new RPCClient({
manifest: {},
streamFactory: async (ctx) => {
ctxProm.resolveP(ctx);
return streamPair;
},
middlewareFactory: rpcUtilsMiddleware.defaultClientMiddlewareWrapper(
(ctx) => {
ctx.timer.reset(123);
return {
forward: new TransformStream(),
reverse: new TransformStream(),
};
},
),
logger,
idGen,
});
const callerInterface = await rpcClient.duplexStreamCaller<
JSONRPCParams,
JSONRPCResult
>(methodName);

const ctx = await ctxProm.p;
// Writing should refresh timer engage the middleware
const writer = callerInterface.writable.getWriter();
await writer.write({});
expect(ctx.timer.delay).toBe(123);
await writer.close();

await outputResult;
},
{ numRuns: 1 },
);
});
});
Loading