From 7a3fe481c5badfb4b21e4e66515b8ffd12497332 Mon Sep 17 00:00:00 2001 From: Brian Botha Date: Wed, 24 Jan 2024 16:18:09 +1100 Subject: [PATCH 1/4] fix: added grace time to client timeout This allows for a period of time for the RPCServer to respond if it's handler times out at the same time. --- src/RPCClient.ts | 15 +++- tests/RPC.test.ts | 158 ++++++++++++++++++++++++++++++++++++++++ tests/RPCClient.test.ts | 11 ++- 3 files changed, 182 insertions(+), 2 deletions(-) diff --git a/src/RPCClient.ts b/src/RPCClient.ts index bc5b641..9903baf 100644 --- a/src/RPCClient.ts +++ b/src/RPCClient.ts @@ -42,6 +42,7 @@ class RPCClient { } // Method proxies public readonly timeoutTime: number; + public readonly graceTime: number; public readonly methodsProxy = new Proxy( {}, { @@ -86,6 +87,7 @@ class RPCClient { streamFactory, middlewareFactory = middleware.defaultClientMiddlewareWrapper(), timeoutTime = Infinity, + graceTime = 1000, logger, toError = utils.toError, idGen = () => null, @@ -99,6 +101,7 @@ class RPCClient { Uint8Array >; timeoutTime?: number; + graceTime?: number; logger?: Logger; idGen?: IdGen; toError?: ToError; @@ -111,6 +114,7 @@ class RPCClient { this.streamFactory = streamFactory; this.middlewareFactory = middlewareFactory; this.timeoutTime = timeoutTime; + this.graceTime = graceTime; this.logger = logger ?? new Logger(this.constructor.name); this.toError = toError; } @@ -262,7 +266,9 @@ class RPCClient { } else { timer = ctx.timer; } + let timerGrace: Timer | undefined; const cleanUp = () => { + if (timerGrace != null) timerGrace.cancel(timerCleanupReasonSymbol); // Clean up the timer and signal if (ctx.timer == null) timer.cancel(timerCleanupReasonSymbol); if (ctx.signal != null) { @@ -298,7 +304,14 @@ class RPCClient { throw e; } void timer.then( - () => { + async () => { + timerGrace = new Timer({ delay: this.graceTime }); + try { + await timerGrace; + } catch (e) { + if (e === timerCleanupReasonSymbol) return; + throw e; + } rpcStream.cancel( new errors.ErrorRPCTimedOut('RPC has timed out', { cause: ctx.signal?.reason, diff --git a/tests/RPC.test.ts b/tests/RPC.test.ts index fef381f..cd86f5c 100644 --- a/tests/RPC.test.ts +++ b/tests/RPC.test.ts @@ -266,6 +266,91 @@ describe('RPC', () => { await rpcServer.stop({ force: true }); }, ); + testProp( + 'RPC communication with duplex stream responds after timeout', + [fc.array(rpcTestUtils.safeJsonObjectArb, { minLength: 1 }).noShrink()], + async (values) => { + const { clientPair, serverPair } = rpcTestUtils.createTapPairs< + Uint8Array, + Uint8Array + >(); + class TestMethod extends DuplexHandler { + public handle = async function* ( + input: AsyncGenerator, + _cancel: (reason?: any) => void, + _meta: Record | undefined, + ctx: ContextTimed, + ): AsyncGenerator { + const { p, resolveP } = utils.promise(); + if (ctx.signal.aborted) resolveP(); + ctx.signal.addEventListener( + 'abort', + () => { + resolveP(); + }, + { once: true }, + ); + await p; + yield* input; + }; + } + const rpcServer = new RPCServer({ + timeoutTime: 500, + logger, + idGen, + }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + rpcServer.handleStream({ + ...serverPair, + cancel: () => {}, + }); + + let aborted = false; + const rpcClient = new RPCClient({ + manifest: { + testMethod: new DuplexCaller(), + }, + streamFactory: async () => { + return { + ...clientPair, + cancel: () => { + aborted = true; + }, + }; + }, + timeoutTime: 500, + graceTime: 1000, + logger, + idGen, + }); + + const callerInterface = await rpcClient.methods.testMethod(); + const writer = callerInterface.writable.getWriter(); + const reader = callerInterface.readable.getReader(); + for (const value of values) { + await writer.write(value); + const receivedValue = (await reader.read()).value; + if ( + receivedValue?.metadata != null && + receivedValue.metadata.timeout === null + ) { + receivedValue.metadata.timeout = Infinity; + } + expect(receivedValue).toStrictEqual(value); + } + await writer.close(); + const result = await reader.read(); + expect(result.value).toBeUndefined(); + expect(result.done).toBeTrue(); + expect(aborted).toBeFalse(); + await rpcServer.stop({ force: true }); + }, + { numRuns: 1 }, + ); testProp( 'RPC communication with server stream', [fc.integer({ min: 1, max: 100 })], @@ -443,6 +528,79 @@ describe('RPC', () => { await rpcServer.stop({ force: true }); }, ); + testProp( + 'RPC communication with unary call responds after timeout', + [rpcTestUtils.safeJsonObjectArb], + async (value) => { + const { clientPair, serverPair } = rpcTestUtils.createTapPairs< + Uint8Array, + Uint8Array + >(); + + class TestMethod extends UnaryHandler { + public handle = async ( + input: JSONRPCRequestParams, + _cancel, + _meta, + ctx: ContextTimed, + ): Promise => { + const { p, resolveP } = utils.promise(); + if (ctx.signal.aborted) resolveP(); + ctx.signal.addEventListener( + 'abort', + () => { + resolveP(); + }, + { once: true }, + ); + await p; + return input; + }; + } + const rpcServer = new RPCServer({ + timeoutTime: 500, + logger, + idGen, + }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + rpcServer.handleStream({ + ...serverPair, + cancel: () => {}, + }); + + let aborted = false; + const rpcClient = new RPCClient({ + manifest: { + testMethod: new UnaryCaller(), + }, + streamFactory: async () => { + return { + ...clientPair, + cancel: () => { + aborted = true; + }, + }; + }, + timeoutTime: 500, + graceTime: 1000, + logger, + idGen, + }); + + const result = await rpcClient.methods.testMethod(value); + if (result.metadata != null && result.metadata.timeout === null) { + result.metadata.timeout = Infinity; + } + expect(result).toEqual(value); + expect(aborted).toBeFalse(); + await rpcServer.stop({ force: true }); + }, + { numRuns: 1 }, + ); testProp( 'RPC handles and sends errors', [ diff --git a/tests/RPCClient.test.ts b/tests/RPCClient.test.ts index 63526bc..e1e5838 100644 --- a/tests/RPCClient.test.ts +++ b/tests/RPCClient.test.ts @@ -26,6 +26,7 @@ import RPCServer from '@/RPCServer'; import * as rpcErrors from '@/errors'; import * as rpcUtilsMiddleware from '@/middleware'; import { promise, timeoutCancelledReason } from '@/utils'; +import * as utils from '@/utils'; import * as rpcTestUtils from './utils'; describe(`${RPCClient.name}`, () => { @@ -1059,8 +1060,11 @@ describe(`${RPCClient.name}`, () => { Uint8Array, Uint8Array >(); + const { p: reasonP, resolveP: reasonResolveP } = utils.promise(); const streamPair: RPCStream = { - cancel: () => {}, + cancel: (reason) => { + reasonResolveP(reason); + }, meta: undefined, writable: forwardPassThroughStream.writable, readable: reversePassThroughStream.readable, @@ -1072,6 +1076,7 @@ describe(`${RPCClient.name}`, () => { ctx = ctx_; return streamPair; }, + graceTime: 500, logger, idGen, }); @@ -1080,9 +1085,13 @@ describe(`${RPCClient.name}`, () => { await rpcClient.duplexStreamCaller('testMethod', { timer: 100, }); + const start = Date.now(); await ctx?.timer; expect(ctx?.signal.aborted).toBeTrue(); expect(ctx?.signal.reason).toBeInstanceOf(rpcErrors.ErrorRPCTimedOut); + const reason = await reasonP; + expect(Date.now() - start).toBeGreaterThan(500); + expect(reason).toBeInstanceOf(rpcErrors.ErrorRPCTimedOut); }); test('duplex caller handles abort awaiting stream', async () => { const forwardPassThroughStream = new TransformStream< From 3e43ec2f558e8af49d54fbee5c922b341e500037 Mon Sep 17 00:00:00 2001 From: Brian Botha Date: Wed, 24 Jan 2024 18:45:57 +1100 Subject: [PATCH 2/4] tests: replaces `testProp` usage with `test.prop` `testProp` is deprecated and `test.prop` is much cleaner to use anyway. --- tests/RPC.test.ts | 1396 +++++++++++++++++++------------------- tests/RPCClient.test.ts | 1112 +++++++++++++++--------------- tests/RPCServer.test.ts | 1306 ++++++++++++++++++----------------- tests/middleware.test.ts | 159 +++-- tests/utils.test.ts | 30 +- 5 files changed, 1995 insertions(+), 2008 deletions(-) diff --git a/tests/RPC.test.ts b/tests/RPC.test.ts index cd86f5c..a5b20fb 100644 --- a/tests/RPC.test.ts +++ b/tests/RPC.test.ts @@ -9,7 +9,7 @@ import type { ReadableStream } from 'stream/web'; import type { JSONValue, IdGen } from '@/types'; import type { ContextTimed } from '@matrixai/contexts'; import { TransformStream } from 'stream/web'; -import { fc, testProp } from '@fast-check/jest'; +import { fc, test } from '@fast-check/jest'; import Logger, { LogLevel, StreamHandler } from '@matrixai/logger'; import { Timer } from '@matrixai/timer'; import RawCaller from '@/callers/RawCaller'; @@ -34,84 +34,86 @@ import * as rpcTestUtils from './utils'; describe('RPC', () => { const logger = new Logger(`RPC Test`, LogLevel.WARN, [new StreamHandler()]); const idGen: IdGen = () => Promise.resolve(null); - testProp( - 'RPC communication with raw stream', - [rpcTestUtils.rawDataArb], - async (values) => { - const [outputResult, outputWriterStream] = - rpcTestUtils.streamToArray(); - const { clientPair, serverPair } = rpcTestUtils.createTapPairs< - Uint8Array, - Uint8Array - >(); - let header: JSONRPCRequest | undefined; - - class TestMethod extends RawHandler { - public handle = async ( - input: [JSONRPCRequest, ReadableStream], - _cancel: (reason?: any) => void, - _meta: Record | undefined, - ): Promise<[JSONObject, ReadableStream]> => { - return new Promise((resolve) => { - const [header_, stream] = input; - header = header_; - resolve([{ value: 'some leading data' }, stream]); - }); - }; - } - const rpcServer = new RPCServer({ - logger, - idGen, - }); - await rpcServer.start({ - manifest: { - testMethod: new TestMethod({}), - }, - }); - rpcServer.handleStream({ - ...serverPair, - cancel: () => {}, - }); + test.prop( + { + values: rpcTestUtils.rawDataArb, + }, + {}, + )('RPC communication with raw stream', async ({ values }) => { + const [outputResult, outputWriterStream] = + rpcTestUtils.streamToArray(); + const { clientPair, serverPair } = rpcTestUtils.createTapPairs< + Uint8Array, + Uint8Array + >(); - const rpcClient = new RPCClient({ - manifest: { - testMethod: new RawCaller(), - }, - streamFactory: async () => { - return { - ...clientPair, - cancel: () => {}, - }; - }, - logger, - idGen, - }); + let header: JSONRPCRequest | undefined; - const callerInterface = await rpcClient.methods.testMethod({ - hello: 'world', - }); - const writer = callerInterface.writable.getWriter(); - const pipeProm = callerInterface.readable.pipeTo(outputWriterStream); - for (const value of values) { - await writer.write(value); - } - await writer.close(); - const expectedHeader: JSONRPCRequest = { - jsonrpc: '2.0', - method: 'testMethod', - params: { hello: 'world' }, - id: null, + class TestMethod extends RawHandler { + public handle = async ( + input: [JSONRPCRequest, ReadableStream], + _cancel: (reason?: any) => void, + _meta: Record | undefined, + ): Promise<[JSONObject, ReadableStream]> => { + return new Promise((resolve) => { + const [header_, stream] = input; + header = header_; + resolve([{ value: 'some leading data' }, stream]); + }); }; - expect(header).toStrictEqual(expectedHeader); - expect(callerInterface.meta?.result).toStrictEqual({ - value: 'some leading data', - }); - expect(await outputResult).toStrictEqual(values); - await pipeProm; - await rpcServer.stop({ force: true }); - }, - ); + } + const rpcServer = new RPCServer({ + logger, + idGen, + }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + rpcServer.handleStream({ + ...serverPair, + cancel: () => {}, + }); + + const rpcClient = new RPCClient({ + manifest: { + testMethod: new RawCaller(), + }, + streamFactory: async () => { + return { + ...clientPair, + cancel: () => {}, + }; + }, + logger, + idGen, + }); + + const callerInterface = await rpcClient.methods.testMethod({ + hello: 'world', + }); + const writer = callerInterface.writable.getWriter(); + const pipeProm = callerInterface.readable.pipeTo(outputWriterStream); + for (const value of values) { + await writer.write(value); + } + await writer.close(); + const expectedHeader: JSONRPCRequest = { + jsonrpc: '2.0', + method: 'testMethod', + params: { hello: 'world' }, + id: null, + }; + expect(header).toStrictEqual(expectedHeader); + expect(callerInterface.meta?.result).toStrictEqual({ + value: 'some leading data', + }); + expect(await outputResult).toStrictEqual(values); + await pipeProm; + await rpcServer.stop({ force: true }); + }); test('RPC communication with raw stream times out waiting for leading message', async () => { const { clientPair, serverPair } = rpcTestUtils.createTapPairs< Uint8Array, @@ -199,77 +201,84 @@ describe('RPC', () => { await rpcServer.stop({ force: true }); }); - testProp( - 'RPC communication with duplex stream', - [fc.array(rpcTestUtils.safeJsonObjectArb, { minLength: 1 })], - async (values) => { - const { clientPair, serverPair } = rpcTestUtils.createTapPairs< - Uint8Array, - Uint8Array - >(); - class TestMethod extends DuplexHandler { - public handle = async function* ( - input: AsyncGenerator, - _cancel: (reason?: any) => void, - _meta: Record | undefined, - _ctx: ContextTimed, - ): AsyncGenerator { - yield* input; - }; - } - const rpcServer = new RPCServer({ - logger, - idGen, - }); - await rpcServer.start({ - manifest: { - testMethod: new TestMethod({}), - }, - }); - rpcServer.handleStream({ - ...serverPair, - cancel: () => {}, - }); + test.prop( + { + values: fc.array(rpcTestUtils.safeJsonObjectArb, { minLength: 1 }), + }, + {}, + )('RPC communication with duplex stream', async ({ values }) => { + const { clientPair, serverPair } = rpcTestUtils.createTapPairs< + Uint8Array, + Uint8Array + >(); + class TestMethod extends DuplexHandler { + public handle = async function* ( + input: AsyncGenerator, + _cancel: (reason?: any) => void, + _meta: Record | undefined, + _ctx: ContextTimed, + ): AsyncGenerator { + yield* input; + }; + } + const rpcServer = new RPCServer({ + logger, + idGen, + }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + rpcServer.handleStream({ + ...serverPair, + cancel: () => {}, + }); - const rpcClient = new RPCClient({ - manifest: { - testMethod: new DuplexCaller(), - }, - streamFactory: async () => { - return { - ...clientPair, - cancel: () => {}, - }; - }, - logger, - idGen, - }); + const rpcClient = new RPCClient({ + manifest: { + testMethod: new DuplexCaller(), + }, + streamFactory: async () => { + return { + ...clientPair, + cancel: () => {}, + }; + }, + logger, + idGen, + }); - const callerInterface = await rpcClient.methods.testMethod(); - const writer = callerInterface.writable.getWriter(); - const reader = callerInterface.readable.getReader(); - for (const value of values) { - await writer.write(value); - const receivedValue = (await reader.read()).value; - if ( - receivedValue?.metadata != null && - receivedValue.metadata.timeout === null - ) { - receivedValue.metadata.timeout = Infinity; - } - expect(receivedValue).toStrictEqual(value); + const callerInterface = await rpcClient.methods.testMethod(); + const writer = callerInterface.writable.getWriter(); + const reader = callerInterface.readable.getReader(); + for (const value of values) { + await writer.write(value); + const receivedValue = (await reader.read()).value; + if ( + receivedValue?.metadata != null && + receivedValue.metadata.timeout === null + ) { + receivedValue.metadata.timeout = Infinity; } - await writer.close(); - const result = await reader.read(); - expect(result.value).toBeUndefined(); - expect(result.done).toBeTrue(); - await rpcServer.stop({ force: true }); + expect(receivedValue).toStrictEqual(value); + } + await writer.close(); + const result = await reader.read(); + expect(result.value).toBeUndefined(); + expect(result.done).toBeTrue(); + await rpcServer.stop({ force: true }); + }); + test.prop( + { + values: fc + .array(rpcTestUtils.safeJsonObjectArb, { minLength: 1 }) + .noShrink(), }, - ); - testProp( + { numRuns: 1 }, + )( 'RPC communication with duplex stream responds after timeout', - [fc.array(rpcTestUtils.safeJsonObjectArb, { minLength: 1 }).noShrink()], - async (values) => { + async ({ values }) => { const { clientPair, serverPair } = rpcTestUtils.createTapPairs< Uint8Array, Uint8Array @@ -349,189 +358,186 @@ describe('RPC', () => { expect(aborted).toBeFalse(); await rpcServer.stop({ force: true }); }, - { numRuns: 1 }, ); - testProp( - 'RPC communication with server stream', - [fc.integer({ min: 1, max: 100 })], - async (value) => { - const { clientPair, serverPair } = rpcTestUtils.createTapPairs< - Uint8Array, - Uint8Array - >(); + test.prop({ + value: fc.integer({ min: 1, max: 100 }), + })('RPC communication with server stream', async ({ value }) => { + const { clientPair, serverPair } = rpcTestUtils.createTapPairs< + Uint8Array, + Uint8Array + >(); + + class TestMethod extends ServerHandler< + ContainerType, + { value: number }, + { value: number } + > { + public handle = async function* (input: { + value: number; + }): AsyncGenerator<{ value: number }> { + for (let i = 0; i < input.value; i++) { + yield { value: i }; + } + }; + } + + const rpcServer = new RPCServer({ + logger, + idGen, + }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + rpcServer.handleStream({ + ...serverPair, + cancel: () => {}, + }); - class TestMethod extends ServerHandler< - ContainerType, - { value: number }, - { value: number } - > { - public handle = async function* (input: { - value: number; - }): AsyncGenerator<{ value: number }> { - for (let i = 0; i < input.value; i++) { - yield { value: i }; - } + const rpcClient = new RPCClient({ + manifest: { + testMethod: new ServerCaller<{ value: number }, { value: number }>(), + }, + streamFactory: async () => { + return { + ...clientPair, + cancel: () => {}, }; - } + }, + logger, + idGen, + }); - const rpcServer = new RPCServer({ - logger, - idGen, - }); - await rpcServer.start({ - manifest: { - testMethod: new TestMethod({}), - }, - }); - rpcServer.handleStream({ - ...serverPair, - cancel: () => {}, - }); + const callerInterface = await rpcClient.methods.testMethod({ value }); - const rpcClient = new RPCClient({ - manifest: { - testMethod: new ServerCaller<{ value: number }, { value: number }>(), - }, - streamFactory: async () => { - return { - ...clientPair, - cancel: () => {}, - }; - }, - logger, - idGen, - }); + const outputs: Array = []; + for await (const num of callerInterface) { + outputs.push(num.value); + } + expect(outputs.length).toEqual(value); + await rpcServer.stop({ force: true }); + }); + test.prop({ + values: fc.array(fc.integer(), { minLength: 1 }).noShrink(), + })('RPC communication with client stream', async ({ values }) => { + const { clientPair, serverPair } = rpcTestUtils.createTapPairs< + Uint8Array, + Uint8Array + >(); - const callerInterface = await rpcClient.methods.testMethod({ value }); + class TestMethod extends ClientHandler< + ContainerType, + { value: number }, + { value: number } + > { + public handle = async ( + input: AsyncIterable<{ value: number }>, + ): Promise<{ value: number }> => { + let acc = 0; + for await (const number of input) { + acc += number.value; + } + return { value: acc }; + }; + } - const outputs: Array = []; - for await (const num of callerInterface) { - outputs.push(num.value); - } - expect(outputs.length).toEqual(value); - await rpcServer.stop({ force: true }); - }, - ); - testProp( - 'RPC communication with client stream', - [fc.array(fc.integer(), { minLength: 1 }).noShrink()], - async (values) => { - const { clientPair, serverPair } = rpcTestUtils.createTapPairs< - Uint8Array, - Uint8Array - >(); + const rpcServer = new RPCServer({ + logger, + idGen, + }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + rpcServer.handleStream({ + ...serverPair, + cancel: () => {}, + }); - class TestMethod extends ClientHandler< - ContainerType, - { value: number }, - { value: number } - > { - public handle = async ( - input: AsyncIterable<{ value: number }>, - ): Promise<{ value: number }> => { - let acc = 0; - for await (const number of input) { - acc += number.value; - } - return { value: acc }; + const rpcClient = new RPCClient({ + manifest: { + testMethod: new ClientCaller<{ value: number }, { value: number }>(), + }, + streamFactory: async () => { + return { + ...clientPair, + cancel: () => {}, }; - } - - const rpcServer = new RPCServer({ - logger, - idGen, - }); - await rpcServer.start({ - manifest: { - testMethod: new TestMethod({}), - }, - }); - rpcServer.handleStream({ - ...serverPair, - cancel: () => {}, - }); + }, + logger, + idGen, + }); - const rpcClient = new RPCClient({ - manifest: { - testMethod: new ClientCaller<{ value: number }, { value: number }>(), - }, - streamFactory: async () => { - return { - ...clientPair, - cancel: () => {}, - }; - }, - logger, - idGen, - }); + const { output, writable } = await rpcClient.methods.testMethod(); + const writer = writable.getWriter(); + for (const value of values) { + await writer.write({ value }); + } + await writer.close(); + const expectedResult = values.reduce((p, c) => p + c); + await expect(output).resolves.toHaveProperty('value', expectedResult); + await rpcServer.stop({ force: true }); + }); + test.prop({ + value: rpcTestUtils.safeJsonObjectArb, + })('RPC communication with unary call', async ({ value }) => { + const { clientPair, serverPair } = rpcTestUtils.createTapPairs< + Uint8Array, + Uint8Array + >(); - const { output, writable } = await rpcClient.methods.testMethod(); - const writer = writable.getWriter(); - for (const value of values) { - await writer.write({ value }); - } - await writer.close(); - const expectedResult = values.reduce((p, c) => p + c); - await expect(output).resolves.toHaveProperty('value', expectedResult); - await rpcServer.stop({ force: true }); - }, - ); - testProp( - 'RPC communication with unary call', - [rpcTestUtils.safeJsonObjectArb], - async (value) => { - const { clientPair, serverPair } = rpcTestUtils.createTapPairs< - Uint8Array, - Uint8Array - >(); + class TestMethod extends UnaryHandler { + public handle = async ( + input: JSONRPCRequestParams, + ): Promise => { + return input; + }; + } + const rpcServer = new RPCServer({ + logger, + idGen, + }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + rpcServer.handleStream({ + ...serverPair, + cancel: () => {}, + }); - class TestMethod extends UnaryHandler { - public handle = async ( - input: JSONRPCRequestParams, - ): Promise => { - return input; + const rpcClient = new RPCClient({ + manifest: { + testMethod: new UnaryCaller(), + }, + streamFactory: async () => { + return { + ...clientPair, + cancel: () => {}, }; - } - const rpcServer = new RPCServer({ - logger, - idGen, - }); - await rpcServer.start({ - manifest: { - testMethod: new TestMethod({}), - }, - }); - rpcServer.handleStream({ - ...serverPair, - cancel: () => {}, - }); - - const rpcClient = new RPCClient({ - manifest: { - testMethod: new UnaryCaller(), - }, - streamFactory: async () => { - return { - ...clientPair, - cancel: () => {}, - }; - }, - logger, - idGen, - }); + }, + logger, + idGen, + }); - const result = await rpcClient.methods.testMethod(value); - if (result.metadata != null && result.metadata.timeout === null) { - result.metadata.timeout = Infinity; - } - expect(result).toEqual(value); - await rpcServer.stop({ force: true }); + const result = await rpcClient.methods.testMethod(value); + if (result.metadata != null && result.metadata.timeout === null) { + result.metadata.timeout = Infinity; + } + expect(result).toEqual(value); + await rpcServer.stop({ force: true }); + }); + test.prop( + { + value: rpcTestUtils.safeJsonObjectArb, }, - ); - testProp( + { numRuns: 1 }, + )( 'RPC communication with unary call responds after timeout', - [rpcTestUtils.safeJsonObjectArb], - async (value) => { + async ({ value }) => { const { clientPair, serverPair } = rpcTestUtils.createTapPairs< Uint8Array, Uint8Array @@ -599,67 +605,62 @@ describe('RPC', () => { expect(aborted).toBeFalse(); await rpcServer.stop({ force: true }); }, - { numRuns: 1 }, ); - testProp( - 'RPC handles and sends errors', - [ - rpcTestUtils.safeJsonValueArb, - rpcTestUtils.errorArb(rpcTestUtils.errorArb()), - ], - async (value, error) => { - const { clientPair, serverPair } = rpcTestUtils.createTapPairs< - Uint8Array, - Uint8Array - >(); + test.prop({ + value: rpcTestUtils.safeJsonValueArb, + error: rpcTestUtils.errorArb(rpcTestUtils.errorArb()), + })('RPC handles and sends errors', async ({ value, error }) => { + const { clientPair, serverPair } = rpcTestUtils.createTapPairs< + Uint8Array, + Uint8Array + >(); - class TestMethod extends UnaryHandler { - public handle = async ( - _input: JSONObject, - _cancel: (reason?: any) => void, - _meta: Record | undefined, - _ctx: ContextTimed, - ): Promise => { - throw error; - }; - } + class TestMethod extends UnaryHandler { + public handle = async ( + _input: JSONObject, + _cancel: (reason?: any) => void, + _meta: Record | undefined, + _ctx: ContextTimed, + ): Promise => { + throw error; + }; + } - const rpcServer = new RPCServer({ - logger, - idGen, - }); - await rpcServer.start({ - manifest: { - testMethod: new TestMethod({}), - }, - }); - rpcServer.handleStream({ ...serverPair, cancel: () => {} }); + const rpcServer = new RPCServer({ + logger, + idGen, + }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + rpcServer.handleStream({ ...serverPair, cancel: () => {} }); - const rpcClient = new RPCClient({ - manifest: { - testMethod: new UnaryCaller(), - }, - streamFactory: async () => { - return { ...clientPair, cancel: () => {} }; - }, - logger, - idGen, - }); + const rpcClient = new RPCClient({ + manifest: { + testMethod: new UnaryCaller(), + }, + streamFactory: async () => { + return { ...clientPair, cancel: () => {} }; + }, + logger, + idGen, + }); - // Create a new promise so we can await it multiple times for assertions - const callProm = rpcClient.methods.testMethod({ value }); + // Create a new promise so we can await it multiple times for assertions + const callProm = rpcClient.methods.testMethod({ value }); - // The promise should be rejected - const rejection = await callProm.catch((e) => e); + // The promise should be rejected + const rejection = await callProm.catch((e) => e); - // The error should have specific properties - expect(rejection.cause).toBeInstanceOf(error.constructor); - expect(rejection.cause).toEqual(error); + // The error should have specific properties + expect(rejection.cause).toBeInstanceOf(error.constructor); + expect(rejection.cause).toEqual(error); - // Cleanup - await rpcServer.stop({ force: true }); - }, - ); + // Cleanup + await rpcServer.stop({ force: true }); + }); test('middleware can end stream early', async () => { const { clientPair, serverPair } = rpcTestUtils.createTapPairs< Uint8Array, @@ -732,328 +733,321 @@ describe('RPC', () => { await expect(reader.closed).toReject(); await expect(rpcServer.stop({ force: false })).toResolve(); }); - testProp( - 'RPC client and server timeout concurrently', - [rpcTestUtils.safeJsonValueArb], - async (inputData) => { - let serverTimedOut = false; - let clientTimedOut = false; + test.prop({ + inputData: rpcTestUtils.safeJsonValueArb, + })('RPC client and server timeout concurrently', async ({ inputData }) => { + let serverTimedOut = false; + let clientTimedOut = false; - // Setup server and client communication pairs - const { clientPair, serverPair } = rpcTestUtils.createTapPairs< - Uint8Array, - Uint8Array - >(); + // Setup server and client communication pairs + const { clientPair, serverPair } = rpcTestUtils.createTapPairs< + Uint8Array, + Uint8Array + >(); - const timeout = 1; - class TestMethod extends DuplexHandler { - public handle = async function* ( - input: AsyncIterableIterator, - cancel: (reason?: any) => void, - meta: Record | undefined, - ctx: ContextTimed, - ): AsyncIterableIterator { - // Check for abort event - ctx.signal.throwIfAborted(); - const abortProm = utils.promise(); - ctx.signal.addEventListener('abort', () => { - abortProm.rejectP(ctx.signal.reason); - }); - await abortProm.p; - }; - } - const testMethodInstance = new TestMethod({}); - // Set up a client and server with matching timeout settings - const rpcServer = new RPCServer({ - logger, - idGen, - timeoutTime: timeout, - }); - await rpcServer.start({ - manifest: { - testMethod: testMethodInstance, - }, - }); - // Register callback - rpcServer.registerOnTimeoutCallback(() => { - serverTimedOut = true; - }); - rpcServer.handleStream({ - ...serverPair, - cancel: () => {}, - }); + const timeout = 1; + class TestMethod extends DuplexHandler { + public handle = async function* ( + input: AsyncIterableIterator, + cancel: (reason?: any) => void, + meta: Record | undefined, + ctx: ContextTimed, + ): AsyncIterableIterator { + // Check for abort event + ctx.signal.throwIfAborted(); + const abortProm = utils.promise(); + ctx.signal.addEventListener('abort', () => { + abortProm.rejectP(ctx.signal.reason); + }); + await abortProm.p; + }; + } + const testMethodInstance = new TestMethod({}); + // Set up a client and server with matching timeout settings + const rpcServer = new RPCServer({ + logger, + idGen, + timeoutTime: timeout, + }); + await rpcServer.start({ + manifest: { + testMethod: testMethodInstance, + }, + }); + // Register callback + rpcServer.registerOnTimeoutCallback(() => { + serverTimedOut = true; + }); + rpcServer.handleStream({ + ...serverPair, + cancel: () => {}, + }); - const rpcClient = new RPCClient({ - manifest: { - testMethod: new DuplexCaller(), - }, - streamFactory: async () => { - return { - ...clientPair, - cancel: () => {}, - }; - }, - logger, - idGen, - }); - const callerInterface = await rpcClient.methods.testMethod({ - timer: timeout, - }); - // Register callback - rpcClient.registerOnTimeoutCallback(() => { - clientTimedOut = true; - }); - const writer = callerInterface.writable.getWriter(); - const reader = callerInterface.readable.getReader(); - // Wait for server and client to timeout by checking the flag - await new Promise((resolve) => { - const checkFlag = () => { - if (serverTimedOut && clientTimedOut) resolve(); - else setTimeout(() => checkFlag(), 10); + const rpcClient = new RPCClient({ + manifest: { + testMethod: new DuplexCaller(), + }, + streamFactory: async () => { + return { + ...clientPair, + cancel: () => {}, }; - checkFlag(); - }); - // Expect both the client and the server to time out - await expect(writer.write({ value: inputData })).rejects.toThrow( - 'Timed out waiting for header', - ); + }, + logger, + idGen, + }); + const callerInterface = await rpcClient.methods.testMethod({ + timer: timeout, + }); + // Register callback + rpcClient.registerOnTimeoutCallback(() => { + clientTimedOut = true; + }); + const writer = callerInterface.writable.getWriter(); + const reader = callerInterface.readable.getReader(); + // Wait for server and client to timeout by checking the flag + await new Promise((resolve) => { + const checkFlag = () => { + if (serverTimedOut && clientTimedOut) resolve(); + else setTimeout(() => checkFlag(), 10); + }; + checkFlag(); + }); + // Expect both the client and the server to time out + await expect(writer.write({ value: inputData })).rejects.toThrow( + 'Timed out waiting for header', + ); - await expect(reader.read()).rejects.toThrow( - 'Timed out waiting for header', - ); + await expect(reader.read()).rejects.toThrow('Timed out waiting for header'); - await rpcServer.stop({ force: true }); + await rpcServer.stop({ force: true }); + }); + test.prop( + { + inputData: rpcTestUtils.safeJsonValueArb, }, - ); - // Test description - testProp( - 'RPC server times out before client', - [rpcTestUtils.safeJsonValueArb], - async (inputData) => { - let serverTimedOut = false; + { numRuns: 1 }, + )('RPC server times out before client', async ({ inputData }) => { + let serverTimedOut = false; - // Setup server and client communication pairs - const { clientPair, serverPair } = rpcTestUtils.createTapPairs< - Uint8Array, - Uint8Array - >(); + // Setup server and client communication pairs + const { clientPair, serverPair } = rpcTestUtils.createTapPairs< + Uint8Array, + Uint8Array + >(); - // Define the server's method behavior - class TestMethod extends DuplexHandler { - public handle = async function* ( - input: AsyncIterableIterator, - cancel: (reason?: any) => void, - meta: Record | undefined, - ctx: ContextTimed, - ) { - ctx.signal.throwIfAborted(); - const abortProm = utils.promise(); - ctx.signal.addEventListener('abort', () => { - abortProm.rejectP(ctx.signal.reason); - }); - await abortProm.p; - }; - } + // Define the server's method behavior + class TestMethod extends DuplexHandler { + public handle = async function* ( + input: AsyncIterableIterator, + cancel: (reason?: any) => void, + meta: Record | undefined, + ctx: ContextTimed, + ) { + ctx.signal.throwIfAborted(); + const abortProm = utils.promise(); + ctx.signal.addEventListener('abort', () => { + abortProm.rejectP(ctx.signal.reason); + }); + await abortProm.p; + }; + } - // Create an instance of the RPC server with a shorter timeout - const rpcServer = new RPCServer({ - logger, - idGen, - timeoutTime: 1, - }); - await rpcServer.start({ manifest: { testMethod: new TestMethod({}) } }); - // Register callback - rpcServer.registerOnTimeoutCallback(() => { - serverTimedOut = true; - }); - rpcServer.handleStream({ ...serverPair, cancel: () => {} }); + // Create an instance of the RPC server with a shorter timeout + const rpcServer = new RPCServer({ + logger, + idGen, + timeoutTime: 1, + }); + await rpcServer.start({ manifest: { testMethod: new TestMethod({}) } }); + // Register callback + rpcServer.registerOnTimeoutCallback(() => { + serverTimedOut = true; + }); + rpcServer.handleStream({ ...serverPair, cancel: () => {} }); - // Create an instance of the RPC client with a longer timeout - const rpcClient = new RPCClient({ - manifest: { testMethod: new DuplexCaller() }, - streamFactory: async () => ({ ...clientPair, cancel: () => {} }), - logger, - idGen, - }); + // Create an instance of the RPC client with a longer timeout + const rpcClient = new RPCClient({ + manifest: { testMethod: new DuplexCaller() }, + streamFactory: async () => ({ ...clientPair, cancel: () => {} }), + logger, + idGen, + }); - // Get server and client interfaces - const callerInterface = await rpcClient.methods.testMethod({ - timer: 10, - }); - const writer = callerInterface.writable.getWriter(); - const reader = callerInterface.readable.getReader(); - // Wait for server to timeout by checking the flag - await new Promise((resolve) => { - const checkFlag = () => { - if (serverTimedOut) resolve(); - else setTimeout(() => checkFlag(), 10); - }; - checkFlag(); - }); + // Get server and client interfaces + const callerInterface = await rpcClient.methods.testMethod({ + timer: 10, + }); + const writer = callerInterface.writable.getWriter(); + const reader = callerInterface.readable.getReader(); + // Wait for server to timeout by checking the flag + await new Promise((resolve) => { + const checkFlag = () => { + if (serverTimedOut) resolve(); + else setTimeout(() => checkFlag(), 10); + }; + checkFlag(); + }); - // We expect server to timeout before the client - await expect(writer.write({ value: inputData })).rejects.toThrow( - 'Timed out waiting for header', - ); - await expect(reader.read()).rejects.toThrow( - 'Timed out waiting for header', - ); + // We expect server to timeout before the client + await expect(writer.write({ value: inputData })).rejects.toThrow( + 'Timed out waiting for header', + ); + await expect(reader.read()).rejects.toThrow('Timed out waiting for header'); - // Cleanup - await rpcServer.stop({ force: true }); + // Cleanup + await rpcServer.stop({ force: true }); + }); + test.prop( + { + value: rpcTestUtils.safeJsonValueArb, }, { numRuns: 1 }, - ); - testProp( - 'RPC client times out before server', - [rpcTestUtils.safeJsonValueArb], - async (value) => { - // Setup server and client communication pairs - const { clientPair, serverPair } = rpcTestUtils.createTapPairs< - Uint8Array, - Uint8Array - >(); - class TestMethod extends DuplexHandler { - public handle = async function* ( - input: AsyncIterableIterator, - cancel: (reason?: any) => void, - meta: Record | undefined, - ctx: ContextTimed, - ): AsyncIterableIterator { - ctx.signal.throwIfAborted(); - const abortProm = utils.promise(); - ctx.signal.addEventListener('abort', () => { - abortProm.rejectP(ctx.signal.reason); - }); - await abortProm.p; - }; - } - // Set up a client and server with matching timeout settings - const rpcServer = new RPCServer({ - logger, - idGen, - timeoutTime: 400, - }); - await rpcServer.start({ - manifest: { - testMethod: new TestMethod({}), - }, - }); - rpcServer.handleStream({ - ...serverPair, - cancel: () => {}, - }); + )('RPC client times out before server', async ({ value }) => { + // Setup server and client communication pairs + const { clientPair, serverPair } = rpcTestUtils.createTapPairs< + Uint8Array, + Uint8Array + >(); + class TestMethod extends DuplexHandler { + public handle = async function* ( + input: AsyncIterableIterator, + cancel: (reason?: any) => void, + meta: Record | undefined, + ctx: ContextTimed, + ): AsyncIterableIterator { + ctx.signal.throwIfAborted(); + const abortProm = utils.promise(); + ctx.signal.addEventListener('abort', () => { + abortProm.rejectP(ctx.signal.reason); + }); + await abortProm.p; + }; + } + // Set up a client and server with matching timeout settings + const rpcServer = new RPCServer({ + logger, + idGen, + timeoutTime: 400, + }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + rpcServer.handleStream({ + ...serverPair, + cancel: () => {}, + }); - const rpcClient = new RPCClient({ - manifest: { - testMethod: new DuplexCaller(), - }, - streamFactory: async () => { - return { - ...clientPair, - cancel: () => {}, - }; - }, - logger, - idGen, - }); - const callerInterface = await rpcClient.methods.testMethod({ - timer: 300, - }); - const writer = callerInterface.writable.getWriter(); - const reader = callerInterface.readable.getReader(); - // Expect the client to time out first - await expect(writer.write({ value })).toResolve(); - await expect(reader.read()).toReject(); + const rpcClient = new RPCClient({ + manifest: { + testMethod: new DuplexCaller(), + }, + streamFactory: async () => { + return { + ...clientPair, + cancel: () => {}, + }; + }, + logger, + idGen, + }); + const callerInterface = await rpcClient.methods.testMethod({ + timer: 300, + }); + const writer = callerInterface.writable.getWriter(); + const reader = callerInterface.readable.getReader(); + // Expect the client to time out first + await expect(writer.write({ value })).toResolve(); + await expect(reader.read()).toReject(); - await rpcServer.stop({ force: true }); + await rpcServer.stop({ force: true }); + }); + test.prop( + { + inputData: rpcTestUtils.safeJsonValueArb, }, { numRuns: 1 }, - ); - testProp( - 'RPC client and server with infinite timeout', - [rpcTestUtils.safeJsonValueArb], - async (inputData) => { - // Set up a client and server with infinite timeout settings + )('RPC client and server with infinite timeout', async ({ inputData }) => { + // Set up a client and server with infinite timeout settings - const { clientPair, serverPair } = rpcTestUtils.createTapPairs< - Uint8Array, - Uint8Array - >(); + const { clientPair, serverPair } = rpcTestUtils.createTapPairs< + Uint8Array, + Uint8Array + >(); - class TestMethod extends DuplexHandler { - public handle = async function* ( - input: AsyncIterableIterator, - cancel: (reason?: any) => void, - meta: Record | undefined, - ctx: ContextTimed, - ) { - ctx.signal.throwIfAborted(); - const abortProm = utils.promise(); - ctx.signal.addEventListener('abort', () => { - abortProm.rejectP(ctx.signal.reason); - }); - await abortProm.p; - }; - } - const rpcServer = new RPCServer({ - logger, - idGen, - timeoutTime: Infinity, - }); - await rpcServer.start({ manifest: { testMethod: new TestMethod({}) } }); - rpcServer.handleStream({ ...serverPair, cancel: () => {} }); + class TestMethod extends DuplexHandler { + public handle = async function* ( + input: AsyncIterableIterator, + cancel: (reason?: any) => void, + meta: Record | undefined, + ctx: ContextTimed, + ) { + ctx.signal.throwIfAborted(); + const abortProm = utils.promise(); + ctx.signal.addEventListener('abort', () => { + abortProm.rejectP(ctx.signal.reason); + }); + await abortProm.p; + }; + } + const rpcServer = new RPCServer({ + logger, + idGen, + timeoutTime: Infinity, + }); + await rpcServer.start({ manifest: { testMethod: new TestMethod({}) } }); + rpcServer.handleStream({ ...serverPair, cancel: () => {} }); - const rpcClient = new RPCClient({ - manifest: { testMethod: new DuplexCaller() }, - streamFactory: async () => ({ ...clientPair, cancel: () => {} }), - logger, - idGen, - }); + const rpcClient = new RPCClient({ + manifest: { testMethod: new DuplexCaller() }, + streamFactory: async () => ({ ...clientPair, cancel: () => {} }), + logger, + idGen, + }); - const callerTimer = new Timer(() => {}, Infinity); + const callerTimer = new Timer(() => {}, Infinity); - const callerInterface = await rpcClient.methods.testMethod({ - timer: callerTimer, - }); + const callerInterface = await rpcClient.methods.testMethod({ + timer: callerTimer, + }); - const writer = callerInterface.writable.getWriter(); - const reader = callerInterface.readable.getReader(); + const writer = callerInterface.writable.getWriter(); + const reader = callerInterface.readable.getReader(); - // Trigger a call that will hang indefinitely or for a long time #TODO + // Trigger a call that will hang indefinitely or for a long time #TODO - // Write a value to the stream - await writer.write({ value: inputData }); + // Write a value to the stream + await writer.write({ value: inputData }); - // Trigger a read that will hang indefinitely + // Trigger a read that will hang indefinitely - const readPromise = reader.read(); - // Adding a randomized sleep here to check that neither timeout - const randomSleepTime = Math.floor(Math.random() * 1000) + 1; - // Random time between 1 and 1,000 ms - await utils.sleep(randomSleepTime); - // At this point, writePromise and readPromise should neither be resolved nor rejected - // because the server method is hanging. + const readPromise = reader.read(); + // Adding a randomized sleep here to check that neither timeout + const randomSleepTime = Math.floor(Math.random() * 1000) + 1; + // Random time between 1 and 1,000 ms + await utils.sleep(randomSleepTime); + // At this point, writePromise and readPromise should neither be resolved nor rejected + // because the server method is hanging. - // Check if the promises are neither resolved nor rejected - const timeoutPromise = new Promise((_, reject) => - setTimeout(() => reject('timeout'), 1000), - ); + // Check if the promises are neither resolved nor rejected + const timeoutPromise = new Promise((_, reject) => + setTimeout(() => reject('timeout'), 1000), + ); - // Check if read status is still pending; + // Check if read status is still pending; - await expect(Promise.race([readPromise, timeoutPromise])).rejects.toBe( - 'timeout', - ); + await expect(Promise.race([readPromise, timeoutPromise])).rejects.toBe( + 'timeout', + ); - // Cancel caller timer - callerTimer.cancel(); + // Cancel caller timer + callerTimer.cancel(); - // Expect neither to time out and verify that they can still handle other operations #TODO - await rpcServer.stop({ force: true }); - }, - { numRuns: 1 }, - ); + // Expect neither to time out and verify that they can still handle other operations #TODO + await rpcServer.stop({ force: true }); + }); test('RPC server times out using client timeout', async () => { // Setup server and client communication pairs const { clientPair, serverPair } = rpcTestUtils.createTapPairs< @@ -1110,10 +1104,14 @@ describe('RPC', () => { await rpcServer.stop({ force: true }); }); - testProp( + test.prop( + { + message: fc.string(), + }, + { numRuns: 1 }, + )( 'RPC client times out and server is able to ignore exception', - [fc.string()], - async (message) => { + async ({ message }) => { // Setup server and client communication pairs const { clientPair, serverPair } = rpcTestUtils.createTapPairs< Uint8Array, @@ -1172,61 +1170,59 @@ describe('RPC', () => { await rpcServer.stop({ force: true }); }, - { numRuns: 1 }, ); - testProp( - 'RPC Serializes and Deserializes Error', - [rpcTestUtils.errorArb(rpcTestUtils.errorArb())], - async (error) => { - const { clientPair, serverPair } = rpcTestUtils.createTapPairs< - Uint8Array, - Uint8Array - >(); + test.prop({ + error: rpcTestUtils.errorArb(rpcTestUtils.errorArb()), + })('RPC Serializes and Deserializes Error', async ({ error }) => { + const { clientPair, serverPair } = rpcTestUtils.createTapPairs< + Uint8Array, + Uint8Array + >(); - class TestMethod extends UnaryHandler { - public handle = async ( - _input: JSONObject, - _cancel: (reason?: any) => void, - _meta: Record | undefined, - _ctx: ContextTimed, - ): Promise => { - throw error; - }; - } - const rpcServer = new RPCServer({ - logger, - idGen, - }); - await rpcServer.start({ - manifest: { - testMethod: new TestMethod({}), - }, - }); - rpcServer.handleStream({ ...serverPair, cancel: () => {} }); + class TestMethod extends UnaryHandler { + public handle = async ( + _input: JSONObject, + _cancel: (reason?: any) => void, + _meta: Record | undefined, + _ctx: ContextTimed, + ): Promise => { + throw error; + }; + } + const rpcServer = new RPCServer({ + logger, + idGen, + }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + rpcServer.handleStream({ ...serverPair, cancel: () => {} }); - const rpcClient = new RPCClient({ - manifest: { - testMethod: new UnaryCaller(), - }, - streamFactory: async () => { - return { ...clientPair, cancel: () => {} }; - }, - logger, - idGen, - }); + const rpcClient = new RPCClient({ + manifest: { + testMethod: new UnaryCaller(), + }, + streamFactory: async () => { + return { ...clientPair, cancel: () => {} }; + }, + logger, + idGen, + }); - const callProm = rpcClient.methods.testMethod({}); - const callError = await callProm.catch((e) => e); - await expect(callProm).rejects.toThrow(rpcErrors.ErrorRPCRemote); - expect(callError.cause).toEqual(error); + const callProm = rpcClient.methods.testMethod({}); + const callError = await callProm.catch((e) => e); + await expect(callProm).rejects.toThrow(rpcErrors.ErrorRPCRemote); + expect(callError.cause).toEqual(error); - await rpcServer.stop({ force: true }); - }, - ); - testProp( + await rpcServer.stop({ force: true }); + }); + test.prop({ + error: rpcTestUtils.errorArb(rpcTestUtils.errorArb()), + })( 'RPC Serializes and Deserializes Error with Custom Replacer Function', - [rpcTestUtils.errorArb(rpcTestUtils.errorArb())], - async (error) => { + async ({ error }) => { const { clientPair, serverPair } = rpcTestUtils.createTapPairs< Uint8Array, Uint8Array diff --git a/tests/RPCClient.test.ts b/tests/RPCClient.test.ts index e1e5838..aa180c5 100644 --- a/tests/RPCClient.test.ts +++ b/tests/RPCClient.test.ts @@ -15,7 +15,7 @@ import type { IdGen } from '@/types'; import { TransformStream, ReadableStream } from 'stream/web'; import Logger, { LogLevel, StreamHandler } from '@matrixai/logger'; import { Timer } from '@matrixai/timer'; -import { testProp, fc } from '@fast-check/jest'; +import { test, fc } from '@fast-check/jest'; import RawCaller from '@/callers/RawCaller'; import DuplexCaller from '@/callers/DuplexCaller'; import ServerCaller from '@/callers/ServerCaller'; @@ -42,68 +42,66 @@ describe(`${RPCClient.name}`, () => { }) .noShrink(); - testProp( - 'raw caller', - [ - rpcTestUtils.safeJsonObjectArb, - rpcTestUtils.rawDataArb, - rpcTestUtils.rawDataArb, - ], - async (headerParams, inputData, outputData) => { - const [inputResult, inputWritableStream] = - rpcTestUtils.streamToArray(); - const [outputResult, outputWritableStream] = - rpcTestUtils.streamToArray(); - const streamPair: RPCStream = { - cancel: () => {}, - meta: undefined, - readable: new ReadableStream({ - start: (controller) => { - const leadingResponse: JSONRPCResponseSuccess = { - jsonrpc: '2.0', - result: {}, - id: null, - }; - controller.enqueue(Buffer.from(JSON.stringify(leadingResponse))); - for (const datum of outputData) { - controller.enqueue(datum); - } - controller.close(); - }, - }), - writable: inputWritableStream, - }; - const rpcClient = new RPCClient({ - manifest: {}, - streamFactory: async () => streamPair, - logger, - idGen, - }); - const callerInterface = await rpcClient.rawStreamCaller( - 'testMethod', - headerParams, - ); - await callerInterface.readable.pipeTo(outputWritableStream); - const writer = callerInterface.writable.getWriter(); - for (const inputDatum of inputData) { - await writer.write(inputDatum); - } - await writer.close(); + test.prop({ + headerParams: rpcTestUtils.safeJsonObjectArb, + inputData: rpcTestUtils.rawDataArb, + outputData: rpcTestUtils.rawDataArb, + })('raw caller', async ({ headerParams, inputData, outputData }) => { + const [inputResult, inputWritableStream] = + rpcTestUtils.streamToArray(); + const [outputResult, outputWritableStream] = + rpcTestUtils.streamToArray(); + const streamPair: RPCStream = { + cancel: () => {}, + meta: undefined, + readable: new ReadableStream({ + start: (controller) => { + const leadingResponse: JSONRPCResponseSuccess = { + jsonrpc: '2.0', + result: {}, + id: null, + }; + controller.enqueue(Buffer.from(JSON.stringify(leadingResponse))); + for (const datum of outputData) { + controller.enqueue(datum); + } + controller.close(); + }, + }), + writable: inputWritableStream, + }; + const rpcClient = new RPCClient({ + manifest: {}, + streamFactory: async () => streamPair, + logger, + idGen, + }); + const callerInterface = await rpcClient.rawStreamCaller( + 'testMethod', + headerParams, + ); + await callerInterface.readable.pipeTo(outputWritableStream); + const writer = callerInterface.writable.getWriter(); + for (const inputDatum of inputData) { + await writer.write(inputDatum); + } + await writer.close(); - const expectedHeader: JSONRPCRequest = { - jsonrpc: '2.0', - method: methodName, - params: headerParams, - id: null, - }; - expect(await inputResult).toStrictEqual([ - Buffer.from(JSON.stringify(expectedHeader)), - ...inputData, - ]); - expect(await outputResult).toStrictEqual(outputData); - }, - ); - testProp('generic duplex caller', [specificMessageArb], async (messages) => { + const expectedHeader: JSONRPCRequest = { + jsonrpc: '2.0', + method: methodName, + params: headerParams, + id: null, + }; + expect(await inputResult).toStrictEqual([ + Buffer.from(JSON.stringify(expectedHeader)), + ...inputData, + ]); + expect(await outputResult).toStrictEqual(outputData); + }); + test.prop({ + messages: specificMessageArb, + })('generic duplex caller', async ({ messages }) => { const inputStream = rpcTestUtils.messagesToReadableStream(messages); const [outputResult, outputStream] = rpcTestUtils.streamToArray(); @@ -147,135 +145,130 @@ describe(`${RPCClient.name}`, () => { expect(outputMessages).toStrictEqual(expectedMessages); }); - testProp( - 'generic server stream caller', - [specificMessageArb, rpcTestUtils.safeJsonObjectArb], - async (messages, params) => { - const inputStream = rpcTestUtils.messagesToReadableStream(messages); - const [outputResult, outputStream] = rpcTestUtils.streamToArray(); - const streamPair: RPCStream = { - cancel: () => {}, - meta: undefined, - readable: inputStream, - writable: outputStream, - }; - const rpcClient = new RPCClient({ - manifest: {}, - streamFactory: async () => streamPair, - logger, - idGen, - }); - const callerInterface = await rpcClient.serverStreamCaller< - JSONRPCRequestParams, - JSONRPCResponseResult - >(methodName, params); - const values: Array = []; - for await (const value of callerInterface) { - values.push(value); - } - const expectedValues = messages.map((v) => v.result); - expect(values).toStrictEqual(expectedValues); - expect((await outputResult)[0]?.toString()).toStrictEqual( - JSON.stringify({ - method: methodName, - jsonrpc: '2.0', - id: null, - params: { - ...params, - metadata: { - timeout: null, - }, + test.prop({ + messages: specificMessageArb, + params: rpcTestUtils.safeJsonObjectArb, + })('generic server stream caller', async ({ messages, params }) => { + const inputStream = rpcTestUtils.messagesToReadableStream(messages); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const streamPair: RPCStream = { + cancel: () => {}, + meta: undefined, + readable: inputStream, + writable: outputStream, + }; + const rpcClient = new RPCClient({ + manifest: {}, + streamFactory: async () => streamPair, + logger, + idGen, + }); + const callerInterface = await rpcClient.serverStreamCaller< + JSONRPCRequestParams, + JSONRPCResponseResult + >(methodName, params); + const values: Array = []; + for await (const value of callerInterface) { + values.push(value); + } + const expectedValues = messages.map((v) => v.result); + expect(values).toStrictEqual(expectedValues); + expect((await outputResult)[0]?.toString()).toStrictEqual( + JSON.stringify({ + method: methodName, + jsonrpc: '2.0', + id: null, + params: { + ...params, + metadata: { + timeout: null, }, - }), - ); - }, - ); - testProp( - 'generic client stream caller', - [ - rpcTestUtils.JSONRPCResponseSuccessArb(), - fc.array(rpcTestUtils.safeJsonObjectArb), - ], - async (message, params) => { - const inputStream = rpcTestUtils.messagesToReadableStream([message]); - const [outputResult, outputStream] = - rpcTestUtils.streamToArray(); - const streamPair: RPCStream = { - cancel: () => {}, - meta: undefined, - readable: inputStream, - writable: outputStream, - }; - const rpcClient = new RPCClient({ - manifest: {}, - streamFactory: async () => streamPair, - logger, - idGen, - }); - const { output, writable } = await rpcClient.clientStreamCaller< - JSONRPCRequestParams, - JSONRPCResponseResult - >(methodName); - const writer = writable.getWriter(); - for (const param of params) { - await writer.write(param); - } - await writer.close(); - expect(await output).toStrictEqual(message.result); - const expectedOutput = params.map((v, i) => - JSON.stringify({ - method: methodName, - jsonrpc: '2.0', - id: null, - params: { ...v, ...(i === 0 ? { metadata: { timeout: null } } : {}) }, - }), - ); + }, + }), + ); + }); + test.prop({ + message: rpcTestUtils.JSONRPCResponseSuccessArb(), + params: fc.array(rpcTestUtils.safeJsonObjectArb), + })('generic client stream caller', async ({ message, params }) => { + const inputStream = rpcTestUtils.messagesToReadableStream([message]); + const [outputResult, outputStream] = + rpcTestUtils.streamToArray(); + const streamPair: RPCStream = { + cancel: () => {}, + meta: undefined, + readable: inputStream, + writable: outputStream, + }; + const rpcClient = new RPCClient({ + manifest: {}, + streamFactory: async () => streamPair, + logger, + idGen, + }); + const { output, writable } = await rpcClient.clientStreamCaller< + JSONRPCRequestParams, + JSONRPCResponseResult + >(methodName); + const writer = writable.getWriter(); + for (const param of params) { + await writer.write(param); + } + await writer.close(); + expect(await output).toStrictEqual(message.result); + const expectedOutput = params.map((v, i) => + JSON.stringify({ + method: methodName, + jsonrpc: '2.0', + id: null, + params: { ...v, ...(i === 0 ? { metadata: { timeout: null } } : {}) }, + }), + ); - expect((await outputResult).map((v) => v.toString())).toStrictEqual( - expectedOutput, - ); - }, - ); - testProp( - 'generic unary caller', - [rpcTestUtils.JSONRPCResponseSuccessArb(), rpcTestUtils.safeJsonObjectArb], - async (message, params) => { - const inputStream = rpcTestUtils.messagesToReadableStream([message]); - const [outputResult, outputStream] = rpcTestUtils.streamToArray(); - const streamPair: RPCStream = { - cancel: () => {}, - meta: undefined, - readable: inputStream, - writable: outputStream, - }; - const rpcClient = new RPCClient({ - manifest: {}, - streamFactory: async () => streamPair, - logger, - idGen, - }); - const result = await rpcClient.unaryCaller< - JSONRPCRequestParams, - JSONRPCResponseResult - >(methodName, params); - expect(result).toStrictEqual(message.result); - expect((await outputResult)[0]?.toString()).toStrictEqual( - JSON.stringify({ - method: methodName, - jsonrpc: '2.0', - id: null, - params: { ...params, metadata: { timeout: null } }, - }), - ); - }, - ); - testProp( + expect((await outputResult).map((v) => v.toString())).toStrictEqual( + expectedOutput, + ); + }); + test.prop({ + message: rpcTestUtils.JSONRPCResponseSuccessArb(), + params: rpcTestUtils.safeJsonObjectArb, + })('generic unary caller', async ({ message, params }) => { + const inputStream = rpcTestUtils.messagesToReadableStream([message]); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const streamPair: RPCStream = { + cancel: () => {}, + meta: undefined, + readable: inputStream, + writable: outputStream, + }; + const rpcClient = new RPCClient({ + manifest: {}, + streamFactory: async () => streamPair, + logger, + idGen, + }); + const result = await rpcClient.unaryCaller< + JSONRPCRequestParams, + JSONRPCResponseResult + >(methodName, params); + expect(result).toStrictEqual(message.result); + expect((await outputResult)[0]?.toString()).toStrictEqual( + JSON.stringify({ + method: methodName, + jsonrpc: '2.0', + id: null, + params: { ...params, metadata: { timeout: null } }, + }), + ); + }); + test.prop({ + messages: fc.array(rpcTestUtils.JSONRPCResponseSuccessArb()), + errorMessage: rpcTestUtils.JSONRPCResponseFailedArb( + rpcTestUtils.errorArb(), + ), + })( 'generic duplex caller can throw received error message', - [ - fc.array(rpcTestUtils.JSONRPCResponseSuccessArb()), - rpcTestUtils.JSONRPCResponseFailedArb(rpcTestUtils.errorArb()), - ], - async (messages, errorMessage) => { + async ({ messages, errorMessage }) => { const inputStream = rpcTestUtils.messagesToReadableStream([ ...messages, errorMessage, @@ -307,13 +300,14 @@ describe(`${RPCClient.name}`, () => { await outputResult; }, ); - testProp( + test.prop({ + messages: fc.array(rpcTestUtils.JSONRPCResponseSuccessArb()), + errorMessage: rpcTestUtils.JSONRPCResponseFailedArb( + rpcTestUtils.errorArb(), + ), + })( 'generic duplex caller can throw received error message with sensitive', - [ - fc.array(rpcTestUtils.JSONRPCResponseSuccessArb()), - rpcTestUtils.JSONRPCResponseFailedArb(rpcTestUtils.errorArb()), - ], - async (messages, errorMessage) => { + async ({ messages, errorMessage }) => { const inputStream = rpcTestUtils.messagesToReadableStream([ ...messages, errorMessage, @@ -346,15 +340,14 @@ describe(`${RPCClient.name}`, () => { await outputResult; }, ); - testProp( + test.prop({ + messages: fc.array(rpcTestUtils.JSONRPCResponseSuccessArb()), + errorMessage: rpcTestUtils.JSONRPCResponseFailedArb( + rpcTestUtils.errorArb(rpcTestUtils.errorArb()), + ), + })( 'generic duplex caller can throw received error message with causes', - [ - fc.array(rpcTestUtils.JSONRPCResponseSuccessArb()), - rpcTestUtils.JSONRPCResponseFailedArb( - rpcTestUtils.errorArb(rpcTestUtils.errorArb()), - ), - ], - async (messages, errorMessage) => { + async ({ messages, errorMessage }) => { const inputStream = rpcTestUtils.messagesToReadableStream([ ...messages, errorMessage, @@ -387,355 +380,335 @@ describe(`${RPCClient.name}`, () => { await outputResult; }, ); - testProp( - 'generic duplex caller with forward Middleware', - [specificMessageArb], - async (messages) => { - const inputStream = rpcTestUtils.messagesToReadableStream(messages); - const [outputResult, outputStream] = - rpcTestUtils.streamToArray(); - const streamPair: RPCStream = { - cancel: () => {}, - meta: undefined, - readable: inputStream, - writable: outputStream, - }; - const rpcClient = new RPCClient({ - manifest: {}, - streamFactory: async () => streamPair, - middlewareFactory: rpcUtilsMiddleware.defaultClientMiddlewareWrapper( - () => { - return { - forward: new TransformStream({ - transform: (chunk, controller) => { - controller.enqueue({ - ...chunk, - params: { value: 'one', metadata: chunk.params?.metadata }, - }); - }, - }), - reverse: new TransformStream(), - }; - }, - ), - logger, - idGen, - }); + test.prop({ + messages: specificMessageArb, + })('generic duplex caller with forward Middleware', async ({ messages }) => { + const inputStream = rpcTestUtils.messagesToReadableStream(messages); + const [outputResult, outputStream] = + rpcTestUtils.streamToArray(); + const streamPair: RPCStream = { + cancel: () => {}, + meta: undefined, + readable: inputStream, + writable: outputStream, + }; + const rpcClient = new RPCClient({ + manifest: {}, + streamFactory: async () => streamPair, + middlewareFactory: rpcUtilsMiddleware.defaultClientMiddlewareWrapper( + () => { + return { + forward: new TransformStream({ + transform: (chunk, controller) => { + controller.enqueue({ + ...chunk, + params: { value: 'one', metadata: chunk.params?.metadata }, + }); + }, + }), + reverse: new TransformStream(), + }; + }, + ), + logger, + idGen, + }); - const callerInterface = await rpcClient.duplexStreamCaller< - JSONRPCRequestParams, - JSONRPCResponseResult - >(methodName); - const reader = callerInterface.readable.getReader(); - const writer = callerInterface.writable.getWriter(); - while (true) { - const { value, done } = await reader.read(); - if (done) { - // We have to end the writer otherwise the stream never closes - await writer.close(); - break; - } - await writer.write(value); + const callerInterface = await rpcClient.duplexStreamCaller< + JSONRPCRequestParams, + JSONRPCResponseResult + >(methodName); + const reader = callerInterface.readable.getReader(); + const writer = callerInterface.writable.getWriter(); + while (true) { + const { value, done } = await reader.read(); + if (done) { + // We have to end the writer otherwise the stream never closes + await writer.close(); + break; } + await writer.write(value); + } - const expectedMessages: Array = messages.map( - (_, i) => ({ - jsonrpc: '2.0', - method: methodName, - id: null, - params: { - value: 'one', - ...(i === 0 ? { metadata: { timeout: null } } : {}), - }, - }), - ); - - const outputMessages = (await outputResult).map((v) => - JSON.parse(v.toString()), - ); - expect(outputMessages).toStrictEqual(expectedMessages); - }, - ); - testProp( - 'generic duplex caller with reverse Middleware', - [specificMessageArb], - async (messages) => { - const inputStream = rpcTestUtils.messagesToReadableStream(messages); - const [outputResult, outputStream] = - rpcTestUtils.streamToArray(); - const streamPair: RPCStream = { - cancel: () => {}, - meta: undefined, - readable: inputStream, - writable: outputStream, - }; - const rpcClient = new RPCClient({ - manifest: {}, - streamFactory: async () => streamPair, - middlewareFactory: rpcUtilsMiddleware.defaultClientMiddlewareWrapper( - () => { - return { - forward: new TransformStream(), - reverse: new TransformStream({ - transform: (chunk, controller) => { - controller.enqueue({ - ...chunk, - result: { value: 'one' }, - }); - }, - }), - }; - }, - ), - logger, - idGen, - }); - - const callerInterface = await rpcClient.duplexStreamCaller< - JSONRPCRequestParams, - JSONRPCResponseResult - >(methodName); - const reader = callerInterface.readable.getReader(); - const writer = callerInterface.writable.getWriter(); - while (true) { - const { value, done } = await reader.read(); - if (done) { - // We have to end the writer otherwise the stream never closes - await writer.close(); - break; - } - expect(value).toStrictEqual({ value: 'one' }); - await writer.write(value); - } - await outputResult; - }, - ); - testProp( - 'manifest server call', - [specificMessageArb, fc.string()], - async (messages, params) => { - const inputStream = rpcTestUtils.messagesToReadableStream(messages); - const [outputResult, outputStream] = - rpcTestUtils.streamToArray(); - const streamPair: RPCStream = { - cancel: () => {}, - meta: undefined, - readable: inputStream, - writable: outputStream, - }; - const rpcClient = new RPCClient({ - manifest: { - server: new ServerCaller< - JSONRPCRequestParams, - JSONRPCResponseResult - >(), + const expectedMessages: Array = messages.map( + (_, i) => ({ + jsonrpc: '2.0', + method: methodName, + id: null, + params: { + value: 'one', + ...(i === 0 ? { metadata: { timeout: null } } : {}), }, - streamFactory: async () => streamPair, - logger, - idGen, - }); - const callerInterface = await rpcClient.methods.server({ value: params }); - const values: Array = []; - for await (const value of callerInterface) { - values.push(value); - } - const expectedValues = messages.map((v) => v.result); - expect(values).toStrictEqual(expectedValues); - expect((await outputResult)[0]?.toString()).toStrictEqual( - JSON.stringify({ - method: 'server', - jsonrpc: '2.0', - id: null, - params: { value: params, metadata: { timeout: null } }, - }), - ); - }, - ); - testProp( - 'manifest client call', - [ - rpcTestUtils.JSONRPCResponseSuccessArb(rpcTestUtils.safeJsonObjectArb), - fc.array(fc.string(), { minLength: 5 }), - ], - async (message, params) => { - const inputStream = rpcTestUtils.messagesToReadableStream([message]); - const [outputResult, outputStream] = - rpcTestUtils.streamToArray(); - const streamPair: RPCStream = { - cancel: () => {}, - meta: undefined, - readable: inputStream, - writable: outputStream, - }; - const rpcClient = new RPCClient({ - manifest: { - client: new ClientCaller< - JSONRPCRequestParams, - JSONRPCResponseResult - >(), + }), + ); + + const outputMessages = (await outputResult).map((v) => + JSON.parse(v.toString()), + ); + expect(outputMessages).toStrictEqual(expectedMessages); + }); + test.prop({ + messages: specificMessageArb, + })('generic duplex caller with reverse Middleware', async ({ messages }) => { + const inputStream = rpcTestUtils.messagesToReadableStream(messages); + const [outputResult, outputStream] = + rpcTestUtils.streamToArray(); + const streamPair: RPCStream = { + cancel: () => {}, + meta: undefined, + readable: inputStream, + writable: outputStream, + }; + const rpcClient = new RPCClient({ + manifest: {}, + streamFactory: async () => streamPair, + middlewareFactory: rpcUtilsMiddleware.defaultClientMiddlewareWrapper( + () => { + return { + forward: new TransformStream(), + reverse: new TransformStream({ + transform: (chunk, controller) => { + controller.enqueue({ + ...chunk, + result: { value: 'one' }, + }); + }, + }), + }; }, - streamFactory: async () => streamPair, - logger, - idGen, - }); - const { output, writable } = await rpcClient.methods.client(); - const writer = writable.getWriter(); - for (const param of params) { - await writer.write({ value: param }); + ), + logger, + idGen, + }); + + const callerInterface = await rpcClient.duplexStreamCaller< + JSONRPCRequestParams, + JSONRPCResponseResult + >(methodName); + const reader = callerInterface.readable.getReader(); + const writer = callerInterface.writable.getWriter(); + while (true) { + const { value, done } = await reader.read(); + if (done) { + // We have to end the writer otherwise the stream never closes + await writer.close(); + break; } - expect(await output).toStrictEqual(message.result); - await writer.close(); - const expectedOutput = params.map((v, i) => - JSON.stringify({ - method: 'client', - jsonrpc: '2.0', - id: null, - params: { - value: v, - ...(i === 0 ? { metadata: { timeout: null } } : {}), - }, - }), - ); - expect((await outputResult).map((v) => v.toString())).toStrictEqual( - expectedOutput, - ); - }, - ); - testProp( - 'manifest unary call', - [rpcTestUtils.JSONRPCResponseSuccessArb().noShrink(), fc.string()], - async (message, params) => { - const inputStream = rpcTestUtils.messagesToReadableStream([message]); - const [outputResult, outputStream] = rpcTestUtils.streamToArray(); - const streamPair: RPCStream = { - cancel: () => {}, - meta: undefined, - readable: inputStream, - writable: outputStream, - }; - const rpcClient = new RPCClient({ - manifest: { - unary: new UnaryCaller(), - }, - streamFactory: async () => streamPair, - logger, - idGen, - }); - const result = await rpcClient.methods.unary({ value: params }); - expect(result).toStrictEqual(message.result); - expect((await outputResult)[0]?.toString()).toStrictEqual( - JSON.stringify({ - method: 'unary', - jsonrpc: '2.0', - id: null, - params: { value: params, metadata: { timeout: null } }, - }), - ); - }, - ); - testProp( - 'manifest raw caller', - [ + expect(value).toStrictEqual({ value: 'one' }); + await writer.write(value); + } + await outputResult; + }); + test.prop({ + messages: specificMessageArb, + params: fc.string(), + })('manifest server call', async ({ messages, params }) => { + const inputStream = rpcTestUtils.messagesToReadableStream(messages); + const [outputResult, outputStream] = + rpcTestUtils.streamToArray(); + const streamPair: RPCStream = { + cancel: () => {}, + meta: undefined, + readable: inputStream, + writable: outputStream, + }; + const rpcClient = new RPCClient({ + manifest: { + server: new ServerCaller(), + }, + streamFactory: async () => streamPair, + logger, + idGen, + }); + const callerInterface = await rpcClient.methods.server({ value: params }); + const values: Array = []; + for await (const value of callerInterface) { + values.push(value); + } + const expectedValues = messages.map((v) => v.result); + expect(values).toStrictEqual(expectedValues); + expect((await outputResult)[0]?.toString()).toStrictEqual( + JSON.stringify({ + method: 'server', + jsonrpc: '2.0', + id: null, + params: { value: params, metadata: { timeout: null } }, + }), + ); + }); + test.prop({ + message: rpcTestUtils.JSONRPCResponseSuccessArb( rpcTestUtils.safeJsonObjectArb, - rpcTestUtils.rawDataArb, - rpcTestUtils.rawDataArb, - ], - async (headerParams, inputData, outputData) => { - const [inputResult, inputWritableStream] = - rpcTestUtils.streamToArray(); - const [outputResult, outputWritableStream] = - rpcTestUtils.streamToArray(); - const streamPair: RPCStream = { - cancel: () => {}, - meta: undefined, - readable: new ReadableStream({ - start: (controller) => { - const leadingResponse: JSONRPCResponseSuccess = { - jsonrpc: '2.0', - result: { value: null }, - id: null, - }; - controller.enqueue(Buffer.from(JSON.stringify(leadingResponse))); - for (const datum of outputData) { - controller.enqueue(datum); - } - controller.close(); - }, - }), - writable: inputWritableStream, - }; - const rpcClient = new RPCClient({ - manifest: { - raw: new RawCaller(), + ), + params: fc.array(fc.string(), { minLength: 5 }), + })('manifest client call', async ({ message, params }) => { + const inputStream = rpcTestUtils.messagesToReadableStream([message]); + const [outputResult, outputStream] = + rpcTestUtils.streamToArray(); + const streamPair: RPCStream = { + cancel: () => {}, + meta: undefined, + readable: inputStream, + writable: outputStream, + }; + const rpcClient = new RPCClient({ + manifest: { + client: new ClientCaller(), + }, + streamFactory: async () => streamPair, + logger, + idGen, + }); + const { output, writable } = await rpcClient.methods.client(); + const writer = writable.getWriter(); + for (const param of params) { + await writer.write({ value: param }); + } + expect(await output).toStrictEqual(message.result); + await writer.close(); + const expectedOutput = params.map((v, i) => + JSON.stringify({ + method: 'client', + jsonrpc: '2.0', + id: null, + params: { + value: v, + ...(i === 0 ? { metadata: { timeout: null } } : {}), }, - streamFactory: async () => streamPair, - logger, - idGen, - }); - const callerInterface = await rpcClient.methods.raw(headerParams); - await callerInterface.readable.pipeTo(outputWritableStream); - const writer = callerInterface.writable.getWriter(); - for (const inputDatum of inputData) { - await writer.write(inputDatum); - } - await writer.close(); - - const expectedHeader: JSONRPCRequest = { + }), + ); + expect((await outputResult).map((v) => v.toString())).toStrictEqual( + expectedOutput, + ); + }); + test.prop({ + message: rpcTestUtils.JSONRPCResponseSuccessArb(), + params: fc.string(), + })('manifest unary call', async ({ message, params }) => { + const inputStream = rpcTestUtils.messagesToReadableStream([message]); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const streamPair: RPCStream = { + cancel: () => {}, + meta: undefined, + readable: inputStream, + writable: outputStream, + }; + const rpcClient = new RPCClient({ + manifest: { + unary: new UnaryCaller(), + }, + streamFactory: async () => streamPair, + logger, + idGen, + }); + const result = await rpcClient.methods.unary({ value: params }); + expect(result).toStrictEqual(message.result); + expect((await outputResult)[0]?.toString()).toStrictEqual( + JSON.stringify({ + method: 'unary', jsonrpc: '2.0', - method: 'raw', - params: headerParams, id: null, - }; - expect(await inputResult).toStrictEqual([ - Buffer.from(JSON.stringify(expectedHeader)), - ...inputData, - ]); - expect(await outputResult).toStrictEqual(outputData); + params: { value: params, metadata: { timeout: null } }, + }), + ); + }); + test.prop( + { + headerParams: rpcTestUtils.safeJsonObjectArb, + inputData: rpcTestUtils.rawDataArb, + outputData: rpcTestUtils.rawDataArb, }, - { seed: -783452149, path: '0:0:0:0:0:0:0', endOnFailure: true }, - ); - testProp( - 'manifest duplex caller', - [ - fc.array( - rpcTestUtils.JSONRPCResponseSuccessArb(rpcTestUtils.safeJsonObjectArb), - { - minLength: 1, - }, - ), - ], - async (messages) => { - const inputStream = rpcTestUtils.messagesToReadableStream(messages); - const [outputResult, outputStream] = - rpcTestUtils.streamToArray(); - const streamPair: RPCStream = { - cancel: () => {}, - meta: undefined, - readable: inputStream, - writable: outputStream, - }; - const rpcClient = new RPCClient({ - manifest: { - duplex: new DuplexCaller(), + { seed: -783452149, path: '0:0:0:0:0:0:0', endOnFailure: true }, // FIXME: remove + )('manifest raw caller', async ({ headerParams, inputData, outputData }) => { + const [inputResult, inputWritableStream] = + rpcTestUtils.streamToArray(); + const [outputResult, outputWritableStream] = + rpcTestUtils.streamToArray(); + const streamPair: RPCStream = { + cancel: () => {}, + meta: undefined, + readable: new ReadableStream({ + start: (controller) => { + const leadingResponse: JSONRPCResponseSuccess = { + jsonrpc: '2.0', + result: { value: null }, + id: null, + }; + controller.enqueue(Buffer.from(JSON.stringify(leadingResponse))); + for (const datum of outputData) { + controller.enqueue(datum); + } + controller.close(); }, - streamFactory: async () => streamPair, - logger, - idGen, - }); - let count = 0; - const callerInterface = await rpcClient.methods.duplex(); - const writer = callerInterface.writable.getWriter(); - for await (const value of callerInterface.readable) { - count += 1; - await writer.write(value); - } - await writer.close(); - const result = await outputResult; - // We're just checking that it's consuming the messages as expected - expect(result.length).toEqual(messages.length); - expect(count).toEqual(messages.length); - }, - ); + }), + writable: inputWritableStream, + }; + const rpcClient = new RPCClient({ + manifest: { + raw: new RawCaller(), + }, + streamFactory: async () => streamPair, + logger, + idGen, + }); + const callerInterface = await rpcClient.methods.raw(headerParams); + await callerInterface.readable.pipeTo(outputWritableStream); + const writer = callerInterface.writable.getWriter(); + for (const inputDatum of inputData) { + await writer.write(inputDatum); + } + await writer.close(); + + const expectedHeader: JSONRPCRequest = { + jsonrpc: '2.0', + method: 'raw', + params: headerParams, + id: null, + }; + expect(await inputResult).toStrictEqual([ + Buffer.from(JSON.stringify(expectedHeader)), + ...inputData, + ]); + expect(await outputResult).toStrictEqual(outputData); + }); + test.prop({ + messages: fc.array( + rpcTestUtils.JSONRPCResponseSuccessArb(rpcTestUtils.safeJsonObjectArb), + { + minLength: 1, + }, + ), + })('manifest duplex caller', async ({ messages }) => { + const inputStream = rpcTestUtils.messagesToReadableStream(messages); + const [outputResult, outputStream] = + rpcTestUtils.streamToArray(); + const streamPair: RPCStream = { + cancel: () => {}, + meta: undefined, + readable: inputStream, + writable: outputStream, + }; + const rpcClient = new RPCClient({ + manifest: { + duplex: new DuplexCaller(), + }, + streamFactory: async () => streamPair, + logger, + idGen, + }); + let count = 0; + const callerInterface = await rpcClient.methods.duplex(); + const writer = callerInterface.writable.getWriter(); + for await (const value of callerInterface.readable) { + count += 1; + await writer.write(value); + } + await writer.close(); + const result = await outputResult; + // We're just checking that it's consuming the messages as expected + expect(result.length).toEqual(messages.length); + expect(count).toEqual(messages.length); + }); test('manifest without handler errors', async () => { const rpcClient = new RPCClient({ manifest: {}, @@ -750,10 +723,11 @@ describe(`${RPCClient.name}`, () => { // @ts-ignore: ignoring type safety here expect(() => rpcClient.withMethods.someMethod()).toThrow(); }); - testProp( + test.prop({ + timeoutTime: fc.integer({ max: -1 }), + })( 'constructor should throw when passed a negative timeoutTime', - [fc.integer({ max: -1 })], - async (timeoutTime) => { + async ({ timeoutTime }) => { const streamPair: RPCStream = { cancel: () => {}, meta: undefined, @@ -1138,10 +1112,14 @@ describe(`${RPCClient.name}`, () => { expect(ctx?.signal.reason).toBe(rejectReason); stream.cancel(Error('asd')); }); - testProp( + test.prop( + { + messages: specificMessageArb, + }, + { numRuns: 5 }, + )( 'duplex caller timeout is cancelled when receiving message', - [specificMessageArb], - async (messages) => { + async ({ messages }) => { const inputStream = rpcTestUtils.messagesToReadableStream(messages); const streamPair: RPCStream = { cancel: () => {}, @@ -1172,13 +1150,16 @@ describe(`${RPCClient.name}`, () => { } await expect(ctx.timer).rejects.toBe(timeoutCancelledReason); }, - { numRuns: 5 }, ); }); - testProp( + test.prop( + { + messages: specificMessageArb, + }, + { numRuns: 5 }, + )( 'duplex caller timeout is not cancelled when receiving message with provided ctx', - [specificMessageArb], - async (messages) => { + async ({ messages }) => { const inputStream = rpcTestUtils.messagesToReadableStream(messages); const streamPair: RPCStream = { cancel: () => {}, @@ -1210,13 +1191,13 @@ describe(`${RPCClient.name}`, () => { await ctx.timer; expect(ctx.signal.reason).toBeInstanceOf(rpcErrors.ErrorRPCTimedOut); }, - { numRuns: 5 }, ); describe('timeout priority', () => { - testProp( + test.prop({ + timeouts: rpcTestUtils.timeoutsArb, + })( 'check that call with ctx can override higher timeout of RPCClient', - [rpcTestUtils.timeoutsArb], - async ([lowerTimeoutTime, higherTimeoutTime]) => { + async ({ timeouts: [lowerTimeoutTime, higherTimeoutTime] }) => { const streamPair: RPCStream = { cancel: () => {}, meta: undefined, @@ -1248,10 +1229,11 @@ describe(`${RPCClient.name}`, () => { await ctx.timer.catch(() => {}); }, ); - testProp( + test.prop({ + timeouts: rpcTestUtils.timeoutsArb, + })( 'check that call with ctx can override lower timeout of RPCClient', - [rpcTestUtils.timeoutsArb], - async ([lowerTimeoutTime, higherTimeoutTime]) => { + async ({ timeouts: [lowerTimeoutTime, higherTimeoutTime] }) => { const streamPair: RPCStream = { cancel: () => {}, meta: undefined, @@ -1283,10 +1265,11 @@ describe(`${RPCClient.name}`, () => { await ctx.timer.catch(() => {}); }, ); - testProp( + test.prop({ + timeoutTime: fc.integer({ min: 0 }), + })( 'check that call with ctx can override lower timeout of RPCClient with Infinity', - [fc.integer({ min: 0 })], - async (timeoutTime) => { + async ({ timeoutTime }) => { const streamPair: RPCStream = { cancel: () => {}, meta: undefined, @@ -1318,10 +1301,14 @@ describe(`${RPCClient.name}`, () => { await ctx.timer.catch(() => {}); }, ); - testProp( + test.prop( + { + messages: specificMessageArb, + }, + { numRuns: 1 }, + )( 'Check that ctx is provided to the middleware and that the middleware can reset the timer', - [specificMessageArb], - async (messages) => { + async ({ messages }) => { const inputStream = rpcTestUtils.messagesToReadableStream(messages); const [outputResult, outputStream] = rpcTestUtils.streamToArray(); @@ -1364,7 +1351,6 @@ describe(`${RPCClient.name}`, () => { await outputResult; }, - { numRuns: 1 }, ); }); }); diff --git a/tests/RPCServer.test.ts b/tests/RPCServer.test.ts index ce02e53..01e8f9d 100644 --- a/tests/RPCServer.test.ts +++ b/tests/RPCServer.test.ts @@ -12,7 +12,7 @@ import type { import type { RPCErrorEvent } from '@/events'; import type { IdGen } from '@/types'; import { ReadableStream, TransformStream, WritableStream } from 'stream/web'; -import { fc, testProp } from '@fast-check/jest'; +import { fc, test } from '@fast-check/jest'; import Logger, { LogLevel, StreamHandler } from '@matrixai/logger'; import RPCServer from '@/RPCServer'; import * as rpcErrors from '@/errors'; @@ -57,296 +57,285 @@ describe(`${RPCServer.name}`, () => { data: rpcTestUtils.safeJsonValueArb, }), ); - testProp( - 'can stream data with raw duplex stream handler', - [specificMessageArb], - async (messages) => { - const stream = rpcTestUtils - .messagesToReadableStream(messages) - .pipeThrough( - rpcTestUtils.binaryStreamToSnippedStream([4, 7, 13, 2, 6]), - ); - class TestHandler extends RawHandler { - public handle = async ( - input: [JSONRPCRequest, ReadableStream], - _cancel: (reason?: any) => void, - _meta: Record | undefined, - _ctx: ContextTimed, - ): Promise<[JSONRPCResponseResult, ReadableStream]> => { - for await (const _ of input[1]) { - // No touch, only consume - } - const readableStream = new ReadableStream({ - start: (controller) => { - controller.enqueue(Buffer.from('hello world!')); - controller.close(); - }, - }); - return Promise.resolve([{}, readableStream]); - }; - } - - const rpcServer = new RPCServer({ - logger, - idGen, - }); - await rpcServer.start({ - manifest: { - testMethod: new TestHandler({}), - }, - }); - const [outputResult, outputStream] = rpcTestUtils.streamToArray(); - const readWriteStream: RPCStream = { - cancel: () => {}, - readable: stream, - writable: outputStream, - }; - rpcServer.handleStream(readWriteStream); - await outputResult; - await rpcServer.stop({ force: true }); + test.prop( + { + messages: specificMessageArb, }, { numRuns: 1 }, - ); - testProp( - 'can stream data with duplex stream handler', - [specificMessageArb], - async (messages) => { - const stream = rpcTestUtils.messagesToReadableStream(messages); - class TestMethod extends DuplexHandler { - public handle = async function* ( - input: AsyncGenerator, - _cancel: (reason?: any) => void, - _meta: Record | undefined, - _ctx: ContextTimed, - ): AsyncGenerator { - for await (const val of input) { - yield val; - break; - } - }; - } - const rpcServer = new RPCServer({ - logger, - idGen, - }); - await rpcServer.start({ - manifest: { - testMethod: new TestMethod({}), - }, - }); - const [outputResult, outputStream] = rpcTestUtils.streamToArray(); - const readWriteStream: RPCStream = { - cancel: () => {}, - readable: stream, - writable: outputStream, - }; - rpcServer.handleStream(readWriteStream); - await outputResult; - await rpcServer.stop({ force: true }); - }, - ); - testProp( - 'can stream data with client stream handler', - [specificMessageArb], - async (messages) => { - const stream = rpcTestUtils.messagesToReadableStream(messages); - class TestMethod extends ClientHandler { - public handle = async ( - input: AsyncGenerator>, - _cancel: (reason?: any) => void, - _meta: Record | undefined, - _ctx: ContextTimed, - ): Promise> => { - let count = 0; - for await (const _ of input) { - count += 1; - } - return { value: count }; - }; - } - const rpcServer = new RPCServer({ - logger, - idGen, - }); - await rpcServer.start({ - manifest: { - testMethod: new TestMethod({}), - }, - }); - const [outputResult, outputStream] = rpcTestUtils.streamToArray(); - const readWriteStream: RPCStream = { - cancel: () => {}, - readable: stream, - writable: outputStream, + )('can stream data with raw duplex stream handler', async ({ messages }) => { + const stream = rpcTestUtils + .messagesToReadableStream(messages) + .pipeThrough(rpcTestUtils.binaryStreamToSnippedStream([4, 7, 13, 2, 6])); + class TestHandler extends RawHandler { + public handle = async ( + input: [JSONRPCRequest, ReadableStream], + _cancel: (reason?: any) => void, + _meta: Record | undefined, + _ctx: ContextTimed, + ): Promise<[JSONRPCResponseResult, ReadableStream]> => { + for await (const _ of input[1]) { + // No touch, only consume + } + const readableStream = new ReadableStream({ + start: (controller) => { + controller.enqueue(Buffer.from('hello world!')); + controller.close(); + }, + }); + return Promise.resolve([{}, readableStream]); }; - rpcServer.handleStream(readWriteStream); - await outputResult; - await rpcServer.stop({ force: true }); - }, - ); - testProp( - 'can stream data with server stream handler', - [singleNumberMessageArb], - async (messages) => { - const stream = rpcTestUtils.messagesToReadableStream(messages); - class TestMethod extends ServerHandler< - ContainerType, - JSONRPCRequestParams, - JSONRPCResponseResult - > { - public handle = async function* ( - input: JSONRPCRequestParams, - ): AsyncGenerator { - for (let i = 0; i < (input.value ?? 0); i++) { - yield { value: i }; - } - }; - } - const rpcServer = new RPCServer({ - logger, - idGen, - }); - await rpcServer.start({ - manifest: { - testMethod: new TestMethod({}), - }, - }); - const [outputResult, outputStream] = rpcTestUtils.streamToArray(); - const readWriteStream: RPCStream = { - cancel: () => {}, - readable: stream, - writable: outputStream, + } + + const rpcServer = new RPCServer({ + logger, + idGen, + }); + await rpcServer.start({ + manifest: { + testMethod: new TestHandler({}), + }, + }); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const readWriteStream: RPCStream = { + cancel: () => {}, + readable: stream, + writable: outputStream, + }; + rpcServer.handleStream(readWriteStream); + await outputResult; + await rpcServer.stop({ force: true }); + }); + test.prop({ + messages: specificMessageArb, + })('can stream data with duplex stream handler', async ({ messages }) => { + const stream = rpcTestUtils.messagesToReadableStream(messages); + class TestMethod extends DuplexHandler { + public handle = async function* ( + input: AsyncGenerator, + _cancel: (reason?: any) => void, + _meta: Record | undefined, + _ctx: ContextTimed, + ): AsyncGenerator { + for await (const val of input) { + yield val; + break; + } }; - rpcServer.handleStream(readWriteStream); - await outputResult; - await rpcServer.stop({ force: true }); - }, - ); - testProp( - 'can stream data with server stream handler', - [specificMessageArb], - async (messages) => { - const stream = rpcTestUtils.messagesToReadableStream(messages); - class TestMethod extends UnaryHandler { - public handle = async ( - input: JSONRPCRequestParams, - ): Promise => { - return input; - }; - } - const rpcServer = new RPCServer({ - logger, - idGen, - }); - await rpcServer.start({ - manifest: { - testMethod: new TestMethod({}), - }, - }); - const [outputResult, outputStream] = rpcTestUtils.streamToArray(); - const readWriteStream: RPCStream = { - cancel: () => {}, - readable: stream, - writable: outputStream, + } + const rpcServer = new RPCServer({ + logger, + idGen, + }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const readWriteStream: RPCStream = { + cancel: () => {}, + readable: stream, + writable: outputStream, + }; + rpcServer.handleStream(readWriteStream); + await outputResult; + await rpcServer.stop({ force: true }); + }); + test.prop({ + messages: specificMessageArb, + })('can stream data with client stream handler', async ({ messages }) => { + const stream = rpcTestUtils.messagesToReadableStream(messages); + class TestMethod extends ClientHandler { + public handle = async ( + input: AsyncGenerator>, + _cancel: (reason?: any) => void, + _meta: Record | undefined, + _ctx: ContextTimed, + ): Promise> => { + let count = 0; + for await (const _ of input) { + count += 1; + } + return { value: count }; }; - rpcServer.handleStream(readWriteStream); - await outputResult; - await rpcServer.stop({ force: true }); - }, - ); - testProp( - 'handler is provided with container', - [specificMessageArb], - async (messages) => { - const stream = rpcTestUtils.messagesToReadableStream(messages); - const container = { - a: Symbol('a'), - B: Symbol('b'), - C: Symbol('c'), + } + const rpcServer = new RPCServer({ + logger, + idGen, + }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const readWriteStream: RPCStream = { + cancel: () => {}, + readable: stream, + writable: outputStream, + }; + rpcServer.handleStream(readWriteStream); + await outputResult; + await rpcServer.stop({ force: true }); + }); + test.prop({ + messages: singleNumberMessageArb, + })('can stream data with server stream handler', async ({ messages }) => { + const stream = rpcTestUtils.messagesToReadableStream(messages); + class TestMethod extends ServerHandler< + ContainerType, + JSONRPCRequestParams, + JSONRPCResponseResult + > { + public handle = async function* ( + input: JSONRPCRequestParams, + ): AsyncGenerator { + const number = (input.value as number) ?? 0; + for (let i = 0; i < number; i++) { + yield { value: i }; + } }; - class TestMethod extends DuplexHandler { - public handle = async function* ( - input: AsyncGenerator, - _cancel: (reason?: any) => void, - _meta: Record | undefined, - _ctx: ContextTimed, - ): AsyncGenerator { - expect(this.container).toBe(container); - for await (const val of input) { - yield val; - } - }; - } - - const rpcServer = new RPCServer({ - logger, - idGen, - }); - await rpcServer.start({ - manifest: { - testMethod: new TestMethod(container), - }, - }); - const [outputResult, outputStream] = rpcTestUtils.streamToArray(); - const readWriteStream: RPCStream = { - cancel: () => {}, - readable: stream, - writable: outputStream, + } + const rpcServer = new RPCServer({ + logger, + idGen, + }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const readWriteStream: RPCStream = { + cancel: () => {}, + readable: stream, + writable: outputStream, + }; + rpcServer.handleStream(readWriteStream); + await outputResult; + await rpcServer.stop({ force: true }); + }); + test.prop({ + messages: specificMessageArb, + })('can stream data with server stream handler', async ({ messages }) => { + const stream = rpcTestUtils.messagesToReadableStream(messages); + class TestMethod extends UnaryHandler { + public handle = async ( + input: JSONRPCRequestParams, + ): Promise => { + return input; }; - rpcServer.handleStream(readWriteStream); - await outputResult; - await rpcServer.stop({ force: true }); - }, - ); - testProp( - 'handler is provided with connectionInfo', - [specificMessageArb], - async (messages) => { - const stream = rpcTestUtils.messagesToReadableStream(messages); - const meta = { - localHost: 'hostA', - localPort: 12341, - remoteCertificates: [], - remoteHost: 'hostA', - remotePort: 12341, + } + const rpcServer = new RPCServer({ + logger, + idGen, + }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const readWriteStream: RPCStream = { + cancel: () => {}, + readable: stream, + writable: outputStream, + }; + rpcServer.handleStream(readWriteStream); + await outputResult; + await rpcServer.stop({ force: true }); + }); + test.prop({ + messages: specificMessageArb, + })('handler is provided with container', async ({ messages }) => { + const stream = rpcTestUtils.messagesToReadableStream(messages); + const container = { + a: Symbol('a'), + B: Symbol('b'), + C: Symbol('c'), + }; + class TestMethod extends DuplexHandler { + public handle = async function* ( + input: AsyncGenerator, + _cancel: (reason?: any) => void, + _meta: Record | undefined, + _ctx: ContextTimed, + ): AsyncGenerator { + expect(this.container).toBe(container); + for await (const val of input) { + yield val; + } }; - let handledMeta; - class TestMethod extends DuplexHandler { - public handle = async function* ( - input: AsyncGenerator, - cancel: (reason?: any) => void, - meta: Record | undefined, - _ctx: ContextTimed, - ): AsyncGenerator { - handledMeta = meta; - for await (const val of input) { - yield val; - } - }; - } - const rpcServer = new RPCServer({ - logger, - idGen, - }); - await rpcServer.start({ - manifest: { - testMethod: new TestMethod({}), - }, - }); - const [outputResult, outputStream] = rpcTestUtils.streamToArray(); - const readWriteStream: RPCStream = { - cancel: () => {}, - meta, - readable: stream, - writable: outputStream, + } + + const rpcServer = new RPCServer({ + logger, + idGen, + }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod(container), + }, + }); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const readWriteStream: RPCStream = { + cancel: () => {}, + readable: stream, + writable: outputStream, + }; + rpcServer.handleStream(readWriteStream); + await outputResult; + await rpcServer.stop({ force: true }); + }); + test.prop({ + messages: specificMessageArb, + })('handler is provided with connectionInfo', async ({ messages }) => { + const stream = rpcTestUtils.messagesToReadableStream(messages); + const meta = { + localHost: 'hostA', + localPort: 12341, + remoteCertificates: [], + remoteHost: 'hostA', + remotePort: 12341, + }; + let handledMeta; + class TestMethod extends DuplexHandler { + public handle = async function* ( + input: AsyncGenerator, + cancel: (reason?: any) => void, + meta: Record | undefined, + _ctx: ContextTimed, + ): AsyncGenerator { + handledMeta = meta; + for await (const val of input) { + yield val; + } }; - rpcServer.handleStream(readWriteStream); - await outputResult; - await rpcServer.stop({ force: true }); - expect(handledMeta).toBe(meta); - }, - ); - testProp('handler can be aborted', [specificMessageArb], async (messages) => { + } + const rpcServer = new RPCServer({ + logger, + idGen, + }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const readWriteStream: RPCStream = { + cancel: () => {}, + meta, + readable: stream, + writable: outputStream, + }; + rpcServer.handleStream(readWriteStream); + await outputResult; + await rpcServer.stop({ force: true }); + expect(handledMeta).toBe(meta); + }); + test.prop({ + messages: specificMessageArb, + })('handler can be aborted', async ({ messages }) => { const stream = rpcTestUtils.messagesToReadableStream(messages); class TestMethod extends DuplexHandler { public handle = async function* ( @@ -401,7 +390,9 @@ describe(`${RPCServer.name}`, () => { ).not.toThrow(); await rpcServer.stop({ force: true }); }); - testProp('handler yields nothing', [specificMessageArb], async (messages) => { + test.prop({ + messages: specificMessageArb, + })('handler yields nothing', async ({ messages }) => { const stream = rpcTestUtils.messagesToReadableStream(messages); class TestMethod extends DuplexHandler { public handle = async function* ( @@ -435,53 +426,54 @@ describe(`${RPCServer.name}`, () => { // We're just expecting no errors await rpcServer.stop({ force: true }); }); - testProp( - 'should send error message', - [specificMessageArb, rpcTestUtils.errorArb(rpcTestUtils.errorArb())], - async (messages, error) => { - const stream = rpcTestUtils.messagesToReadableStream(messages); - class TestMethod extends DuplexHandler { - public handle = - async function* (): AsyncGenerator { - throw error; - }; - } - const rpcServer = new RPCServer({ - logger, - idGen, - }); - await rpcServer.start({ - manifest: { - testMethod: new TestMethod({}), - }, - }); - let resolve, reject; - const errorProm = new Promise((resolve_, reject_) => { - resolve = resolve_; - reject = reject_; - }); - rpcServer.addEventListener('error', (thing: RPCErrorEvent) => { - resolve(thing); - }); - const [outputResult, outputStream] = rpcTestUtils.streamToArray(); - const readWriteStream: RPCStream = { - cancel: () => {}, - readable: stream, - writable: outputStream, - }; - rpcServer.handleStream(readWriteStream); - const rawErrorMessage = (await outputResult)[0]!.toString(); - const errorMessage = JSON.parse(rawErrorMessage); - expect(errorMessage.error.message).toEqual(error.message); - reject(); - await expect(errorProm).toReject(); - await rpcServer.stop({ force: true }); - }, - ); - testProp( + test.prop({ + messages: specificMessageArb, + error: rpcTestUtils.errorArb(rpcTestUtils.errorArb()), + })('should send error message', async ({ messages, error }) => { + const stream = rpcTestUtils.messagesToReadableStream(messages); + class TestMethod extends DuplexHandler { + public handle = + async function* (): AsyncGenerator { + throw error; + }; + } + const rpcServer = new RPCServer({ + logger, + idGen, + }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + let resolve, reject; + const errorProm = new Promise((resolve_, reject_) => { + resolve = resolve_; + reject = reject_; + }); + rpcServer.addEventListener('error', (thing: RPCErrorEvent) => { + resolve(thing); + }); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const readWriteStream: RPCStream = { + cancel: () => {}, + readable: stream, + writable: outputStream, + }; + rpcServer.handleStream(readWriteStream); + const rawErrorMessage = (await outputResult)[0]!.toString(); + const errorMessage = JSON.parse(rawErrorMessage); + expect(errorMessage.error.message).toEqual(error.message); + reject(); + await expect(errorProm).toReject(); + await rpcServer.stop({ force: true }); + }); + test.prop({ + messages: specificMessageArb, + error: rpcTestUtils.errorArb(rpcTestUtils.errorArb()), + })( 'should send error message with sensitive', - [specificMessageArb, rpcTestUtils.errorArb(rpcTestUtils.errorArb())], - async (messages, error) => { + async ({ messages, error }) => { const stream = rpcTestUtils.messagesToReadableStream(messages); class TestMethod extends DuplexHandler { public handle = @@ -522,141 +514,140 @@ describe(`${RPCServer.name}`, () => { await rpcServer.stop({ force: true }); }, ); - testProp( - 'should emit stream error if input stream fails', - [specificMessageArb], - async (messages) => { - const handlerEndedProm = promise(); - class TestMethod extends DuplexHandler { - public handle = async function* ( - input, - ): AsyncGenerator { - try { - for await (const _ of input) { - // Consume but don't yield anything - } - } finally { - handlerEndedProm.resolveP(); - } - }; - } - const rpcServer = new RPCServer({ - logger, - idGen, - }); - await rpcServer.start({ - manifest: { - testMethod: new TestMethod({}), - }, - }); - let resolve; - rpcServer.addEventListener('error', (thing: RPCErrorEvent) => { - resolve(thing); - }); - const passThroughStreamIn = new TransformStream(); - const [outputResult, outputStream] = rpcTestUtils.streamToArray(); - const readWriteStream: RPCStream = { - cancel: () => {}, - readable: passThroughStreamIn.readable, - writable: outputStream, - }; - rpcServer.handleStream(readWriteStream); - const writer = passThroughStreamIn.writable.getWriter(); - // Write messages - for (const message of messages) { - await writer.write(Buffer.from(JSON.stringify(message))); - } - // Abort stream - const writerReason = new Error('writerAbort'); - await writer.abort(writerReason); - // We should get an error RPC message - await expect(outputResult).toResolve(); - const errorMessage = JSON.parse((await outputResult)[0].toString()); - // Parse without error - rpcUtils.parseJSONRPCResponseFailed(errorMessage); - // Check that the handler was cleaned up. - await expect(handlerEndedProm.p).toResolve(); - await rpcServer.stop({ force: true }); + test.prop( + { + messages: specificMessageArb, }, { numRuns: 1 }, - ); - testProp( - 'should emit stream error if output stream fails', - [specificMessageArb], - async (messages) => { - const handlerEndedProm = promise(); - let ctx: ContextTimed | undefined; - class TestMethod extends DuplexHandler { - public handle = async function* ( - input, - _cancel, - _meta, - ctx_, - ): AsyncGenerator { - ctx = ctx_; - // Echo input - try { - yield* input; - } finally { - handlerEndedProm.resolveP(); + )('should emit stream error if input stream fails', async ({ messages }) => { + const handlerEndedProm = promise(); + class TestMethod extends DuplexHandler { + public handle = async function* ( + input, + ): AsyncGenerator { + try { + for await (const _ of input) { + // Consume but don't yield anything } - }; - } - const rpcServer = new RPCServer({ - logger, - idGen, - }); - await rpcServer.start({ - manifest: { - testMethod: new TestMethod({}), - }, - }); - let resolve; - const errorProm = new Promise((resolve_) => { - resolve = resolve_; - }); - rpcServer.addEventListener('error', (thing: RPCErrorEvent) => { - resolve(thing); - }); - const passThroughStreamIn = new TransformStream(); - const passThroughStreamOut = new TransformStream< - Uint8Array, - Uint8Array - >(); - const readWriteStream: RPCStream = { - cancel: () => {}, - readable: passThroughStreamIn.readable, - writable: passThroughStreamOut.writable, + } finally { + handlerEndedProm.resolveP(); + } }; - rpcServer.handleStream(readWriteStream); - const writer = passThroughStreamIn.writable.getWriter(); - const reader = passThroughStreamOut.readable.getReader(); - // Write messages - for (const message of messages) { - await writer.write(Buffer.from(JSON.stringify(message))); - await reader.read(); - } - // Abort stream - // const writerReason = Symbol('writerAbort'); - const readerReason = Symbol('readerAbort'); - // Await writer.abort(writerReason); - await reader.cancel(readerReason); - // We should get an error event - const event = await errorProm; - await writer.close(); - // Expect(event.detail.cause).toContain(writerReason); - expect(event.detail).toBeInstanceOf(rpcErrors.ErrorRPCStreamEnded); - // Check that the handler was cleaned up. - await expect(handlerEndedProm.p).toResolve(); - // Check that an abort signal happened - expect(ctx).toBeDefined(); - expect(ctx?.signal.aborted).toBeTrue(); - expect(ctx?.signal.reason).toBe(readerReason); - await rpcServer.stop({ force: true }); + } + const rpcServer = new RPCServer({ + logger, + idGen, + }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + let resolve; + rpcServer.addEventListener('error', (thing: RPCErrorEvent) => { + resolve(thing); + }); + const passThroughStreamIn = new TransformStream(); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const readWriteStream: RPCStream = { + cancel: () => {}, + readable: passThroughStreamIn.readable, + writable: outputStream, + }; + rpcServer.handleStream(readWriteStream); + const writer = passThroughStreamIn.writable.getWriter(); + // Write messages + for (const message of messages) { + await writer.write(Buffer.from(JSON.stringify(message))); + } + // Abort stream + const writerReason = new Error('writerAbort'); + await writer.abort(writerReason); + // We should get an error RPC message + await expect(outputResult).toResolve(); + const errorMessage = JSON.parse((await outputResult)[0].toString()); + // Parse without error + rpcUtils.parseJSONRPCResponseFailed(errorMessage); + // Check that the handler was cleaned up. + await expect(handlerEndedProm.p).toResolve(); + await rpcServer.stop({ force: true }); + }); + test.prop( + { + messages: specificMessageArb, }, { numRuns: 1 }, - ); - testProp('forward middlewares', [specificMessageArb], async (messages) => { + )('should emit stream error if output stream fails', async ({ messages }) => { + const handlerEndedProm = promise(); + let ctx: ContextTimed | undefined; + class TestMethod extends DuplexHandler { + public handle = async function* ( + input, + _cancel, + _meta, + ctx_, + ): AsyncGenerator { + ctx = ctx_; + // Echo input + try { + yield* input; + } finally { + handlerEndedProm.resolveP(); + } + }; + } + const rpcServer = new RPCServer({ + logger, + idGen, + }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + let resolve; + const errorProm = new Promise((resolve_) => { + resolve = resolve_; + }); + rpcServer.addEventListener('error', (thing: RPCErrorEvent) => { + resolve(thing); + }); + const passThroughStreamIn = new TransformStream(); + const passThroughStreamOut = new TransformStream(); + const readWriteStream: RPCStream = { + cancel: () => {}, + readable: passThroughStreamIn.readable, + writable: passThroughStreamOut.writable, + }; + rpcServer.handleStream(readWriteStream); + const writer = passThroughStreamIn.writable.getWriter(); + const reader = passThroughStreamOut.readable.getReader(); + // Write messages + for (const message of messages) { + await writer.write(Buffer.from(JSON.stringify(message))); + await reader.read(); + } + // Abort stream + // const writerReason = Symbol('writerAbort'); + const readerReason = Symbol('readerAbort'); + // Await writer.abort(writerReason); + await reader.cancel(readerReason); + // We should get an error event + const event = await errorProm; + await writer.close(); + // Expect(event.detail.cause).toContain(writerReason); + expect(event.detail).toBeInstanceOf(rpcErrors.ErrorRPCStreamEnded); + // Check that the handler was cleaned up. + await expect(handlerEndedProm.p).toResolve(); + // Check that an abort signal happened + expect(ctx).toBeDefined(); + expect(ctx?.signal.aborted).toBeTrue(); + expect(ctx?.signal.reason).toBe(readerReason); + await rpcServer.stop({ force: true }); + }); + test.prop({ + messages: specificMessageArb, + })('forward middlewares', async ({ messages }) => { const stream = rpcTestUtils.messagesToReadableStream(messages); class TestMethod extends DuplexHandler { public handle = async function* ( @@ -710,153 +701,148 @@ describe(`${RPCServer.name}`, () => { ); await rpcServer.stop({ force: true }); }); - testProp( - 'reverse middlewares', - [specificMessageArb], - async (messages) => { - const stream = rpcTestUtils.messagesToReadableStream(messages); - class TestMethod extends DuplexHandler { - public handle = async function* ( - input: AsyncGenerator>, - _cancel: (reason?: any) => void, - _meta: Record | undefined, - _ctx: ContextTimed, - ): AsyncGenerator> { - yield* input; - }; - } - const middleware = rpcUtilsMiddleware.defaultServerMiddlewareWrapper( - () => { - return { - forward: new TransformStream(), - reverse: new TransformStream({ - transform: (chunk, controller) => { - if ('result' in chunk) chunk.result = { value: 1 }; - controller.enqueue(chunk); - }, - }), - }; - }, - ); - const rpcServer = new RPCServer({ - middlewareFactory: middleware, - logger, - idGen, - }); - await rpcServer.start({ - manifest: { - testMethod: new TestMethod({}), - }, - }); - const [outputResult, outputStream] = rpcTestUtils.streamToArray(); - const readWriteStream: RPCStream = { - cancel: () => {}, - readable: stream, - writable: outputStream, - }; - rpcServer.handleStream(readWriteStream); - const out = await outputResult; - expect(out.map((v) => v!.toString())).toStrictEqual( - messages.map(() => - JSON.stringify({ - jsonrpc: '2.0', - result: { value: 1 }, - id: null, - }), - ), - ); - await rpcServer.stop({ force: true }); + test.prop( + { + messages: specificMessageArb, }, { numRuns: 1 }, - ); - testProp( - 'forward middleware authentication', - [invalidTokenMessageArb], - async (message) => { - const stream = rpcTestUtils.messagesToReadableStream([message]); - class TestMethod extends DuplexHandler { - public handle = async function* ( - input: AsyncGenerator, - _cancel: (reason?: any) => void, - _meta: Record | undefined, - _ctx: ContextTimed, - ): AsyncGenerator { - yield* input; - }; - } - const middleware = rpcUtilsMiddleware.defaultServerMiddlewareWrapper( - () => { - let first = true; - let reverseController: TransformStreamDefaultController; - return { - forward: new TransformStream< - JSONRPCRequest, - JSONRPCRequest - >({ - transform: (chunk, controller) => { - if (first && chunk.params?.metadata?.token !== validToken) { - reverseController.enqueue(failureMessage); - // Closing streams early - controller.terminate(); - reverseController.terminate(); - } - first = false; - controller.enqueue(chunk); - }, - }), - reverse: new TransformStream({ - start: (controller) => { - // Kidnapping reverse controller - reverseController = controller; - }, - transform: (chunk, controller) => { - controller.enqueue(chunk); - }, - }), - }; - }, - ); - const rpcServer = new RPCServer({ - middlewareFactory: middleware, - logger, - idGen, - }); - await rpcServer.start({ - manifest: { - testMethod: new TestMethod({}), - }, - }); - const [outputResult, outputStream] = rpcTestUtils.streamToArray(); - const readWriteStream: RPCStream = { - cancel: () => {}, - readable: stream, - writable: outputStream, + )('reverse middlewares', async ({ messages }) => { + const stream = rpcTestUtils.messagesToReadableStream(messages); + class TestMethod extends DuplexHandler { + public handle = async function* ( + input: AsyncGenerator>, + _cancel: (reason?: any) => void, + _meta: Record | undefined, + _ctx: ContextTimed, + ): AsyncGenerator> { + yield* input; }; - type TestType = { - metadata: { - token: string; - }; - data: JSONValue; + } + const middleware = rpcUtilsMiddleware.defaultServerMiddlewareWrapper(() => { + return { + forward: new TransformStream(), + reverse: new TransformStream({ + transform: (chunk, controller) => { + if ('result' in chunk) chunk.result = { value: 1 }; + controller.enqueue(chunk); + }, + }), }; - const failureMessage: JSONRPCResponseFailed = { - jsonrpc: '2.0', - id: null, - error: { - code: 1, - message: 'failure of some kind', - }, + }); + const rpcServer = new RPCServer({ + middlewareFactory: middleware, + logger, + idGen, + }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const readWriteStream: RPCStream = { + cancel: () => {}, + readable: stream, + writable: outputStream, + }; + rpcServer.handleStream(readWriteStream); + const out = await outputResult; + expect(out.map((v) => v!.toString())).toStrictEqual( + messages.map(() => + JSON.stringify({ + jsonrpc: '2.0', + result: { value: 1 }, + id: null, + }), + ), + ); + await rpcServer.stop({ force: true }); + }); + test.prop({ + message: invalidTokenMessageArb, + })('forward middleware authentication', async ({ message }) => { + const stream = rpcTestUtils.messagesToReadableStream([message]); + class TestMethod extends DuplexHandler { + public handle = async function* ( + input: AsyncGenerator, + _cancel: (reason?: any) => void, + _meta: Record | undefined, + _ctx: ContextTimed, + ): AsyncGenerator { + yield* input; }; - rpcServer.handleStream(readWriteStream); - expect((await outputResult).toString()).toEqual( - JSON.stringify(failureMessage), - ); - await rpcServer.stop({ force: true }); - }, - ); - testProp( + } + const middleware = rpcUtilsMiddleware.defaultServerMiddlewareWrapper(() => { + let first = true; + let reverseController: TransformStreamDefaultController; + return { + forward: new TransformStream< + JSONRPCRequest, + JSONRPCRequest + >({ + transform: (chunk, controller) => { + if (first && chunk.params?.metadata?.token !== validToken) { + reverseController.enqueue(failureMessage); + // Closing streams early + controller.terminate(); + reverseController.terminate(); + } + first = false; + controller.enqueue(chunk); + }, + }), + reverse: new TransformStream({ + start: (controller) => { + // Kidnapping reverse controller + reverseController = controller; + }, + transform: (chunk, controller) => { + controller.enqueue(chunk); + }, + }), + }; + }); + const rpcServer = new RPCServer({ + middlewareFactory: middleware, + logger, + idGen, + }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const readWriteStream: RPCStream = { + cancel: () => {}, + readable: stream, + writable: outputStream, + }; + type TestType = { + metadata: { + token: string; + }; + data: JSONValue; + }; + const failureMessage: JSONRPCResponseFailed = { + jsonrpc: '2.0', + id: null, + error: { + code: 1, + message: 'failure of some kind', + }, + }; + rpcServer.handleStream(readWriteStream); + expect((await outputResult).toString()).toEqual( + JSON.stringify(failureMessage), + ); + await rpcServer.stop({ force: true }); + }); + test.prop({ + timeoutTime: fc.integer({ max: -1 }), + })( 'constructor should throw when passed a negative timeout', - [fc.integer({ max: -1 })], - async (timeoutTime) => { + async ({ timeoutTime }) => { const constructorF = () => new RPCServer({ timeoutTime, @@ -867,10 +853,11 @@ describe(`${RPCServer.name}`, () => { expect(constructorF).toThrowError(rpcErrors.ErrorRPCInvalidTimeout); }, ); - testProp( + test.prop({ + timeoutTime: fc.integer({ max: -1 }), + })( 'start should throw when passed a handler with negative timeout', - [fc.integer({ max: -1 })], - async (timeoutTime) => { + async ({ timeoutTime }) => { const waitProm = promise(); const ctxLongProm = promise(); @@ -1122,58 +1109,59 @@ describe(`${RPCServer.name}`, () => { await expect(ctx.timer).toReject(); await rpcServer.stop({ force: true }); }); - testProp( - 'middleware can update timeout timer', - [specificMessageArb], - async (messages) => { - const stream = rpcTestUtils.messagesToReadableStream(messages); - const ctxProm = promise(); - class TestMethod extends DuplexHandler { - public handle = async function* ( - input: AsyncGenerator, - cancel: (reason?: any) => void, - meta: Record | undefined, - ctx: ContextTimed, - ): AsyncGenerator { - ctxProm.resolveP(ctx); - yield* input; - }; - } - const middlewareFactory = - rpcUtilsMiddleware.defaultServerMiddlewareWrapper((ctx) => { - ctx.timer.reset(12345); - return { - forward: new TransformStream(), - reverse: new TransformStream(), - }; - }); - const rpcServer = new RPCServer({ - middlewareFactory: middlewareFactory, - logger, - idGen, - }); - await rpcServer.start({ - manifest: { - testMethod: new TestMethod({}), - }, - }); - const [outputResult, outputStream] = rpcTestUtils.streamToArray(); - const readWriteStream: RPCStream = { - cancel: () => {}, - readable: stream, - writable: outputStream, + test.prop({ + messages: specificMessageArb, + })('middleware can update timeout timer', async ({ messages }) => { + const stream = rpcTestUtils.messagesToReadableStream(messages); + const ctxProm = promise(); + class TestMethod extends DuplexHandler { + public handle = async function* ( + input: AsyncGenerator, + cancel: (reason?: any) => void, + meta: Record | undefined, + ctx: ContextTimed, + ): AsyncGenerator { + ctxProm.resolveP(ctx); + yield* input; }; - rpcServer.handleStream(readWriteStream); - await outputResult; - const ctx = await ctxProm.p; - expect(ctx.timer.delay).toBe(12345); - }, - ); + } + const middlewareFactory = rpcUtilsMiddleware.defaultServerMiddlewareWrapper( + (ctx) => { + ctx.timer.reset(12345); + return { + forward: new TransformStream(), + reverse: new TransformStream(), + }; + }, + ); + const rpcServer = new RPCServer({ + middlewareFactory: middlewareFactory, + logger, + idGen, + }); + await rpcServer.start({ + manifest: { + testMethod: new TestMethod({}), + }, + }); + const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const readWriteStream: RPCStream = { + cancel: () => {}, + readable: stream, + writable: outputStream, + }; + rpcServer.handleStream(readWriteStream); + await outputResult; + const ctx = await ctxProm.p; + expect(ctx.timer.delay).toBe(12345); + }); describe('timeout priority', () => { - testProp( + test.prop({ + messages: specificMessageArb, + timeouts: rpcTestUtils.timeoutsArb, + })( 'check that handler can override higher timeout of RPCServer', - [specificMessageArb, rpcTestUtils.timeoutsArb], - async (messages, [lowerTimeoutTime, higherTimeoutTime]) => { + async ({ messages, timeouts: [lowerTimeoutTime, higherTimeoutTime] }) => { const stream = rpcTestUtils.messagesToReadableStream(messages); const { p: ctxP, resolveP: resolveCtxP } = promise(); class TestMethod extends DuplexHandler { @@ -1212,10 +1200,12 @@ describe(`${RPCServer.name}`, () => { await ctx.timer.catch(() => {}); }, ); - testProp( + test.prop({ + messages: specificMessageArb, + timeouts: rpcTestUtils.timeoutsArb, + })( 'check that handler can override lower timeout of RPCServer', - [specificMessageArb, rpcTestUtils.timeoutsArb], - async (messages, [lowerTimeoutTime, higherTimeoutTime]) => { + async ({ messages, timeouts: [lowerTimeoutTime, higherTimeoutTime] }) => { const stream = rpcTestUtils.messagesToReadableStream(messages); const { p: ctxP, resolveP: resolveCtxP } = promise(); class TestMethod extends DuplexHandler { @@ -1255,10 +1245,12 @@ describe(`${RPCServer.name}`, () => { }, ); }); - testProp( + test.prop({ + messages: specificMessageArb, + timeoutTime: fc.integer({ min: 0 }), + })( 'check that handler can override lower timeout of RPCServer with Infinity', - [specificMessageArb, fc.integer({ min: 0 })], - async (messages, timeoutTime) => { + async ({ messages, timeoutTime }) => { const stream = rpcTestUtils.messagesToReadableStream(messages); const { p: ctxP, resolveP: resolveCtxP } = promise(); class TestMethod extends DuplexHandler { diff --git a/tests/middleware.test.ts b/tests/middleware.test.ts index 8d8d5ff..829ef10 100644 --- a/tests/middleware.test.ts +++ b/tests/middleware.test.ts @@ -1,4 +1,4 @@ -import { fc, testProp } from '@fast-check/jest'; +import { fc, test } from '@fast-check/jest'; import { AsyncIterableX as AsyncIterable } from 'ix/asynciterable'; import { Timer } from '@matrixai/timer'; import * as rpcUtils from '@/utils'; @@ -15,57 +15,62 @@ describe('Middleware tests', () => { ) .noShrink(); - testProp( - 'can parse json stream', - [rpcTestUtils.jsonMessagesArb], - async (messages) => { - const parsedStream = rpcTestUtils - .messagesToReadableStream(messages) - .pipeThrough( - rpcUtilsMiddleware.binaryToJsonMessageStream( - rpcUtils.parseJSONRPCMessage, - ), - ); // Converting back. - - const asd = await AsyncIterable.as(parsedStream).toArray(); - expect(asd).toEqual(messages); + test.prop( + { + messages: rpcTestUtils.jsonMessagesArb, }, - { numRuns: 1000 }, - ); - testProp( - 'Message size limit is enforced when parsing', - [ - fc.array( + { + numRuns: 1000, + }, + )('asd', async ({ messages }) => { + const parsedStream = rpcTestUtils + .messagesToReadableStream(messages) + .pipeThrough( + rpcUtilsMiddleware.binaryToJsonMessageStream( + rpcUtils.parseJSONRPCMessage, + ), + ); // Converting back. + + const asd = await AsyncIterable.as(parsedStream).toArray(); + expect(asd).toEqual(messages); + }); + test.prop( + { + messages: fc.array( rpcTestUtils.jsonRpcRequestMessageArb(fc.string({ minLength: 100 })), { minLength: 1, }, ), - ], - async (messages) => { - const parsedStream = rpcTestUtils - .messagesToReadableStream(messages) - .pipeThrough(rpcTestUtils.binaryStreamToSnippedStream([10])) - .pipeThrough( - rpcUtilsMiddleware.binaryToJsonMessageStream( - rpcUtils.parseJSONRPCMessage, - 50, - ), - ); + }, + { numRuns: 1000 }, + )('Message size limit is enforced when parsing', async ({ messages }) => { + const parsedStream = rpcTestUtils + .messagesToReadableStream(messages) + .pipeThrough(rpcTestUtils.binaryStreamToSnippedStream([10])) + .pipeThrough( + rpcUtilsMiddleware.binaryToJsonMessageStream( + rpcUtils.parseJSONRPCMessage, + 50, + ), + ); - const doThing = async () => { - for await (const _ of parsedStream) { - // No touch, only consume - } - }; - await expect(doThing()).rejects.toThrow(rpcErrors.ErrorRPCMessageLength); + const doThing = async () => { + for await (const _ of parsedStream) { + // No touch, only consume + } + }; + await expect(doThing()).rejects.toThrow(rpcErrors.ErrorRPCMessageLength); + }); + test.prop( + { + messages: rpcTestUtils.jsonMessagesArb, + snippattern: rpcTestUtils.snippingPatternArb, }, { numRuns: 1000 }, - ); - testProp( + )( 'can parse json stream with random chunk sizes', - [rpcTestUtils.jsonMessagesArb, rpcTestUtils.snippingPatternArb], - async (messages, snippattern) => { + async ({ messages, snippattern }) => { const parsedStream = rpcTestUtils .messagesToReadableStream(messages) .pipeThrough(rpcTestUtils.binaryStreamToSnippedStream(snippattern)) // Imaginary internet here @@ -78,32 +83,35 @@ describe('Middleware tests', () => { const asd = await AsyncIterable.as(parsedStream).toArray(); expect(asd).toStrictEqual(messages); }, - { numRuns: 1000 }, ); - testProp( - 'Will error on bad data', - [rpcTestUtils.jsonMessagesArb, rpcTestUtils.snippingPatternArb, noiseArb], - async (messages, snippattern, noise) => { - const parsedStream = rpcTestUtils - .messagesToReadableStream(messages) - .pipeThrough(rpcTestUtils.binaryStreamToSnippedStream(snippattern)) // Imaginary internet here - .pipeThrough(rpcTestUtils.binaryStreamToNoisyStream(noise)) // Adding bad data to the stream - .pipeThrough( - rpcUtilsMiddleware.binaryToJsonMessageStream( - rpcUtils.parseJSONRPCMessage, - ), - ); // Converting back. - - await expect(AsyncIterable.as(parsedStream).toArray()).rejects.toThrow( - rpcErrors.ErrorRPCParse, - ); + test.prop( + { + messages: rpcTestUtils.jsonMessagesArb, + snippattern: rpcTestUtils.snippingPatternArb, + noise: noiseArb, }, { numRuns: 1000 }, - ); - testProp( + )('Will error on bad data', async ({ messages, snippattern, noise }) => { + const parsedStream = rpcTestUtils + .messagesToReadableStream(messages) + .pipeThrough(rpcTestUtils.binaryStreamToSnippedStream(snippattern)) // Imaginary internet here + .pipeThrough(rpcTestUtils.binaryStreamToNoisyStream(noise)) // Adding bad data to the stream + .pipeThrough( + rpcUtilsMiddleware.binaryToJsonMessageStream( + rpcUtils.parseJSONRPCMessage, + ), + ); // Converting back. + + await expect(AsyncIterable.as(parsedStream).toArray()).rejects.toThrow( + rpcErrors.ErrorRPCParse, + ); + }); + test.prop({ + messages: rpcTestUtils.jsonMessagesArb, + timeout: fc.integer({ min: 0 }), + })( 'timeoutMiddlewareServer should set ctx.timeout if timeout is lower', - [rpcTestUtils.jsonMessagesArb, fc.integer({ min: 0 })], - async (messages, timeout) => { + async ({ messages, timeout }) => { if (messages[0].params == null) messages[0].params = {}; messages[0].params.metadata = { ...messages[0].params.metadata, timeout }; const abortController = new AbortController(); @@ -133,10 +141,12 @@ describe('Middleware tests', () => { await timer.catch(() => {}); }, ); - testProp( + test.prop({ + messages: rpcTestUtils.jsonMessagesArb, + timeout: fc.integer({ min: 1 }), + })( 'timeoutMiddlewareServer wont set ctx.timeout if timeout is higher', - [rpcTestUtils.jsonMessagesArb, fc.integer({ min: 1 })], - async (messages, timeout) => { + async ({ messages, timeout }) => { if (messages[0].params == null) messages[0].params = {}; messages[0].params.metadata = { ...messages[0].params.metadata, timeout }; const abortController = new AbortController(); @@ -166,10 +176,11 @@ describe('Middleware tests', () => { await timer.catch(() => {}); }, ); - testProp( + test.prop({ + messages: rpcTestUtils.jsonMessagesArb, + })( 'timeoutMiddlewareServer should set ctx.timeout if timeout is infinity/null', - [rpcTestUtils.jsonMessagesArb], - async (messages) => { + async ({ messages }) => { if (messages[0].params == null) messages[0].params = {}; messages[0].params.metadata = { ...messages[0].params.metadata, @@ -206,10 +217,12 @@ describe('Middleware tests', () => { await timer.catch(() => {}); }, ); - testProp( + test.prop({ + messages: rpcTestUtils.jsonMessagesArb, + timeout: fc.integer({ min: 0 }), + })( 'timeoutMiddlewareClient can encode ctx.timeout', - [rpcTestUtils.jsonMessagesArb, fc.integer({ min: 0 })], - async (messages, timeout) => { + async ({ messages, timeout }) => { const abortController = new AbortController(); const timer = new Timer(undefined, timeout); const ctx = { diff --git a/tests/utils.test.ts b/tests/utils.test.ts index 7b37b6c..0e812dc 100644 --- a/tests/utils.test.ts +++ b/tests/utils.test.ts @@ -1,25 +1,25 @@ -import { testProp, fc } from '@fast-check/jest'; +import { test, fc } from '@fast-check/jest'; import * as rpcUtils from '@/utils'; import 'ix/add/asynciterable-operators/toarray'; import * as rpcTestUtils from './utils'; describe('utils tests', () => { - testProp( - 'can parse messages', - [rpcTestUtils.jsonRpcMessageArb()], - async (message) => { - rpcUtils.parseJSONRPCMessage(message); + test.prop( + { + message: rpcTestUtils.jsonRpcMessageArb(), }, { numRuns: 1000 }, - ); - testProp( - 'malformed data cases parsing errors', - [fc.json()], - async (message) => { - expect(() => - rpcUtils.parseJSONRPCMessage(Buffer.from(JSON.stringify(message))), - ).toThrow(); + )('can parse messages', async ({ message }) => { + rpcUtils.parseJSONRPCMessage(message); + }); + test.prop( + { + message: fc.json(), }, { numRuns: 1000 }, - ); + )('malformed data cases parsing errors', async ({ message }) => { + expect(() => + rpcUtils.parseJSONRPCMessage(Buffer.from(JSON.stringify(message))), + ).toThrow(); + }); }); From 7d499342aeada478dabd62384be65377b0c1f61e Mon Sep 17 00:00:00 2001 From: Brian Botha Date: Mon, 29 Jan 2024 13:58:25 +1100 Subject: [PATCH 3/4] syntax: general code clean up [ci skip] --- src/RPCClient.ts | 15 ++-- src/RPCServer.ts | 1 + src/types.ts | 6 +- src/utils.ts | 9 +-- tests/RPC.test.ts | 46 ++++++------ tests/RPCClient.test.ts | 15 ++-- tests/RPCServer.test.ts | 155 +++++++++++++++++---------------------- tests/middleware.test.ts | 38 +++++----- 8 files changed, 134 insertions(+), 151 deletions(-) diff --git a/src/RPCClient.ts b/src/RPCClient.ts index 9903baf..6b69545 100644 --- a/src/RPCClient.ts +++ b/src/RPCClient.ts @@ -50,16 +50,20 @@ class RPCClient { if (typeof method === 'symbol') return; switch (this.callerTypes[method]) { case 'UNARY': - return (params, ctx) => this.unaryCaller(method, params, ctx); + return (params: JSONObject, ctx: Partial) => + this.unaryCaller(method, params, ctx); case 'SERVER': - return (params, ctx) => + return (params: JSONObject, ctx: Partial) => this.serverStreamCaller(method, params, ctx); case 'CLIENT': - return (ctx) => this.clientStreamCaller(method, ctx); + return (ctx: Partial) => + this.clientStreamCaller(method, ctx); case 'DUPLEX': - return (ctx) => this.duplexStreamCaller(method, ctx); + return (ctx: Partial) => + this.duplexStreamCaller(method, ctx); case 'RAW': - return (header, ctx) => this.rawStreamCaller(method, header, ctx); + return (header: JSONObject, ctx: Partial) => + this.rawStreamCaller(method, header, ctx); default: return; } @@ -381,7 +385,6 @@ class RPCClient { * single RPC message that is sent to specify the method for the RPC call. * Any metadata of extra parameters is provided here. * @param ctx - ContextTimed used for timeouts and cancellation. - * @param id - Id is generated only once, and used throughout the stream for the rest of the communication */ public async rawStreamCaller( method: string, diff --git a/src/RPCServer.ts b/src/RPCServer.ts index ddd4e1a..f4214f5 100644 --- a/src/RPCServer.ts +++ b/src/RPCServer.ts @@ -76,6 +76,7 @@ class RPCServer { /** * RPCServer Constructor * + * @param obj * @param obj.middlewareFactory - Middleware used to process the rpc messages. * The middlewareFactory needs to be a function that creates a pair of * transform streams that convert `Uint8Array` to `JSONRPCRequest` on the forward diff --git a/src/types.ts b/src/types.ts index da1fdaa..c643032 100644 --- a/src/types.ts +++ b/src/types.ts @@ -121,7 +121,7 @@ type JSONRPCRequestMetadata = Partial<{ }>; /** - * `T` is the the params you want to specify. + * `T` is the params you want to specify. * * `M` is the metadata you want to specify. * @@ -141,7 +141,7 @@ type JSONRPCResponseMetadata = Partial<{ }>; /** - * `T` is the the result you want to specify. + * `T` is the result you want to specify. * * `M` is the metadata you want to specify. * @@ -251,7 +251,7 @@ interface RPCStream } /** - * This is a factory for creating a `RPCStream` when making a RPC call. + * This is a factory for creating a `RPCStream` when making an RPC call. * The transport mechanism is a black box to the RPC system. So long as it is * provided as a RPCStream the RPC system should function. It is assumed that * the RPCStream communicates with an `RPCServer`. diff --git a/src/utils.ts b/src/utils.ts index 5a60e38..edd2816 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -3,12 +3,12 @@ import type { ClientManifest, HandlerType, JSONObject, - JSONRPCResponseError, JSONRPCMessage, JSONRPCRequest, JSONRPCRequestMessage, JSONRPCRequestNotification, JSONRPCResponse, + JSONRPCResponseError, JSONRPCResponseFailed, JSONRPCResponseSuccess, JSONValue, @@ -22,7 +22,7 @@ import * as errors from './errors'; const timeoutCancelledReason = Symbol('timeoutCancelledReason'); -// Importing PK funcs and utils which are essential for RPC +// Importing PK functions and utils which are essential for RPC function isObject(o: unknown): o is object { return o !== null && typeof o === 'object'; } @@ -222,7 +222,7 @@ function parseJSONRPCMessage( * @throws {TypeError} If the error is an instance of {@link Symbol}, {@link BigInt} or {@link Function}. */ function fromError(error: any): JSONValue { - // TODO: Linked-List traversal must be done iteractively rather than recusively to prevent stack overflow. + // TODO: Linked-List traversal must be done interactively rather than recursively to prevent stack overflow. switch (typeof error) { case 'symbol': case 'bigint': @@ -385,10 +385,9 @@ function toError( e.cause = toError(errorData.data.cause, clientMetadata, false); } if (top) { - const err = new errors.ErrorRPCRemote(clientMetadata, undefined, { + return new errors.ErrorRPCRemote(clientMetadata, undefined, { cause: e, }); - return err; } else { return e; } diff --git a/tests/RPC.test.ts b/tests/RPC.test.ts index a5b20fb..895b046 100644 --- a/tests/RPC.test.ts +++ b/tests/RPC.test.ts @@ -48,7 +48,7 @@ describe('RPC', () => { Uint8Array >(); - let header: JSONRPCRequest | undefined; + let header: JSONRPCRequest | undefined = undefined; class TestMethod extends RawHandler { public handle = async ( @@ -546,8 +546,8 @@ describe('RPC', () => { class TestMethod extends UnaryHandler { public handle = async ( input: JSONRPCRequestParams, - _cancel, - _meta, + _cancel: (reason?: any) => void, + _meta: Record | undefined, ctx: ContextTimed, ): Promise => { const { p, resolveP } = utils.promise(); @@ -648,7 +648,7 @@ describe('RPC', () => { idGen, }); - // Create a new promise so we can await it multiple times for assertions + // Create a new promise, so we can await it multiple times for assertions const callProm = rpcClient.methods.testMethod({ value }); // The promise should be rejected @@ -748,9 +748,9 @@ describe('RPC', () => { const timeout = 1; class TestMethod extends DuplexHandler { public handle = async function* ( - input: AsyncIterableIterator, - cancel: (reason?: any) => void, - meta: Record | undefined, + _input: AsyncIterableIterator, + _cancel: (reason?: any) => void, + _meta: Record | undefined, ctx: ContextTimed, ): AsyncIterableIterator { // Check for abort event @@ -839,9 +839,9 @@ describe('RPC', () => { // Define the server's method behavior class TestMethod extends DuplexHandler { public handle = async function* ( - input: AsyncIterableIterator, - cancel: (reason?: any) => void, - meta: Record | undefined, + _input: AsyncIterableIterator, + _cancel: (reason?: any) => void, + _meta: Record | undefined, ctx: ContextTimed, ) { ctx.signal.throwIfAborted(); @@ -911,9 +911,9 @@ describe('RPC', () => { >(); class TestMethod extends DuplexHandler { public handle = async function* ( - input: AsyncIterableIterator, - cancel: (reason?: any) => void, - meta: Record | undefined, + _input: AsyncIterableIterator, + _cancel: (reason?: any) => void, + _meta: Record | undefined, ctx: ContextTimed, ): AsyncIterableIterator { ctx.signal.throwIfAborted(); @@ -979,9 +979,9 @@ describe('RPC', () => { class TestMethod extends DuplexHandler { public handle = async function* ( - input: AsyncIterableIterator, - cancel: (reason?: any) => void, - meta: Record | undefined, + _input: AsyncIterableIterator, + _cancel: (reason?: any) => void, + _meta: Record | undefined, ctx: ContextTimed, ) { ctx.signal.throwIfAborted(); @@ -1016,7 +1016,7 @@ describe('RPC', () => { const writer = callerInterface.writable.getWriter(); const reader = callerInterface.readable.getReader(); - // Trigger a call that will hang indefinitely or for a long time #TODO + // Trigger a call that will hang indefinitely or for a long time // Write a value to the stream await writer.write({ value: inputData }); @@ -1045,7 +1045,7 @@ describe('RPC', () => { // Cancel caller timer callerTimer.cancel(); - // Expect neither to time out and verify that they can still handle other operations #TODO + // Expect neither to time out and verify that they can still handle other operations await rpcServer.stop({ force: true }); }); test('RPC server times out using client timeout', async () => { @@ -1057,9 +1057,9 @@ describe('RPC', () => { const { p: ctxP, resolveP: resolveCtxP } = utils.promise(); class TestMethod extends UnaryHandler { public handle = async ( - input: JSONObject, - cancel: (reason?: any) => void, - meta: Record | undefined, + _input: JSONObject, + _cancel: (reason?: any) => void, + _meta: Record | undefined, ctx: ContextTimed, ): Promise => { const abortProm = utils.promise(); @@ -1121,8 +1121,8 @@ describe('RPC', () => { class TestMethod extends UnaryHandler { public handle = async ( input: JSONObject, - cancel: (reason?: any) => void, - meta: Record | undefined, + _cancel: (reason?: any) => void, + _meta: Record | undefined, ctx: ContextTimed, ): Promise => { const abortProm = utils.promise(); diff --git a/tests/RPCClient.test.ts b/tests/RPCClient.test.ts index aa180c5..622ea72 100644 --- a/tests/RPCClient.test.ts +++ b/tests/RPCClient.test.ts @@ -612,14 +612,11 @@ describe(`${RPCClient.name}`, () => { }), ); }); - test.prop( - { - headerParams: rpcTestUtils.safeJsonObjectArb, - inputData: rpcTestUtils.rawDataArb, - outputData: rpcTestUtils.rawDataArb, - }, - { seed: -783452149, path: '0:0:0:0:0:0:0', endOnFailure: true }, // FIXME: remove - )('manifest raw caller', async ({ headerParams, inputData, outputData }) => { + test.prop({ + headerParams: rpcTestUtils.safeJsonObjectArb, + inputData: rpcTestUtils.rawDataArb, + outputData: rpcTestUtils.rawDataArb, + })('manifest raw caller', async ({ headerParams, inputData, outputData }) => { const [inputResult, inputWritableStream] = rpcTestUtils.streamToArray(); const [outputResult, outputWritableStream] = @@ -743,7 +740,7 @@ describe(`${RPCClient.name}`, () => { idGen, }); - expect(constructorF).toThrowError(rpcErrors.ErrorRPCInvalidTimeout); + expect(constructorF).toThrow(rpcErrors.ErrorRPCInvalidTimeout); }, ); describe('raw caller', () => { diff --git a/tests/RPCServer.test.ts b/tests/RPCServer.test.ts index 01e8f9d..2d55b7c 100644 --- a/tests/RPCServer.test.ts +++ b/tests/RPCServer.test.ts @@ -9,8 +9,9 @@ import type { JSONValue, RPCStream, } from '@/types'; -import type { RPCErrorEvent } from '@/events'; import type { IdGen } from '@/types'; +import type { PromiseCancellable } from '@matrixai/async-cancellable'; +import type * as rpcEvents from '@/events'; import { ReadableStream, TransformStream, WritableStream } from 'stream/web'; import { fc, test } from '@fast-check/jest'; import Logger, { LogLevel, StreamHandler } from '@matrixai/logger'; @@ -116,10 +117,10 @@ describe(`${RPCServer.name}`, () => { _meta: Record | undefined, _ctx: ContextTimed, ): AsyncGenerator { - for await (const val of input) { - yield val; - break; - } + // Yield only the first value + const result = await input.next(); + if (!result.done) yield result.value; + await input.return(undefined); }; } const rpcServer = new RPCServer({ @@ -298,11 +299,11 @@ describe(`${RPCServer.name}`, () => { remoteHost: 'hostA', remotePort: 12341, }; - let handledMeta; + let handledMeta: Record | undefined = undefined; class TestMethod extends DuplexHandler { public handle = async function* ( input: AsyncGenerator, - cancel: (reason?: any) => void, + _cancel: (reason?: any) => void, meta: Record | undefined, _ctx: ContextTimed, ): AsyncGenerator { @@ -340,8 +341,8 @@ describe(`${RPCServer.name}`, () => { class TestMethod extends DuplexHandler { public handle = async function* ( input: AsyncGenerator, - cancel: (reason?: any) => void, - meta: Record | undefined, + _cancel: (reason?: any) => void, + _meta: Record | undefined, ctx: ContextTimed, ): AsyncGenerator { for await (const val of input) { @@ -361,15 +362,15 @@ describe(`${RPCServer.name}`, () => { }); const [outputResult, outputStream] = rpcTestUtils.streamToArray(); - let thing; + let activeStream: PromiseCancellable | undefined = undefined; const tapStream = rpcTestUtils.tapTransformStream( async (_, iteration) => { if (iteration === 2) { // @ts-ignore: kidnap private property const activeStreams = rpcServer.activeStreams.values(); - for (const activeStream of activeStreams) { - thing = activeStream; - activeStream.cancel(Error('Some error')); + for (const activeStream_ of activeStreams) { + activeStream = activeStream_; + activeStream_.cancel(Error('Some error')); } } }, @@ -383,7 +384,8 @@ describe(`${RPCServer.name}`, () => { rpcServer.handleStream(readWriteStream); const result = await outputResult; const lastMessage = result[result.length - 1]; - await expect(thing).toResolve(); + expect(activeStream).toBeDefined(); + await expect(activeStream!).toResolve(); expect(lastMessage).toBeDefined(); expect(() => rpcUtils.parseJSONRPCResponseFailed(JSON.parse(lastMessage.toString())), @@ -446,13 +448,13 @@ describe(`${RPCServer.name}`, () => { testMethod: new TestMethod({}), }, }); - let resolve, reject; - const errorProm = new Promise((resolve_, reject_) => { - resolve = resolve_; - reject = reject_; - }); - rpcServer.addEventListener('error', (thing: RPCErrorEvent) => { - resolve(thing); + const { + p: errorEventP, + resolveP: resolveErrorEventP, + rejectP: rejectErrorEventP, + } = rpcUtils.promise(); + rpcServer.addEventListener('error', (event: rpcEvents.RPCErrorEvent) => { + resolveErrorEventP(event); }); const [outputResult, outputStream] = rpcTestUtils.streamToArray(); const readWriteStream: RPCStream = { @@ -464,8 +466,8 @@ describe(`${RPCServer.name}`, () => { const rawErrorMessage = (await outputResult)[0]!.toString(); const errorMessage = JSON.parse(rawErrorMessage); expect(errorMessage.error.message).toEqual(error.message); - reject(); - await expect(errorProm).toReject(); + rejectErrorEventP(Error('Never received error event')); + await expect(errorEventP).toReject(); await rpcServer.stop({ force: true }); }); test.prop({ @@ -491,13 +493,13 @@ describe(`${RPCServer.name}`, () => { testMethod: new TestMethod({}), }, }); - let resolve, reject; - const errorProm = new Promise((resolve_, reject_) => { - resolve = resolve_; - reject = reject_; - }); - rpcServer.addEventListener('error', (thing: RPCErrorEvent) => { - resolve(thing); + const { + p: errorEventP, + resolveP: resolveErrorEventP, + rejectP: rejectErrorEventP, + } = rpcUtils.promise(); + rpcServer.addEventListener('error', (thing: rpcEvents.RPCErrorEvent) => { + resolveErrorEventP(thing); }); const [outputResult, outputStream] = rpcTestUtils.streamToArray(); const readWriteStream: RPCStream = { @@ -509,8 +511,8 @@ describe(`${RPCServer.name}`, () => { const rawErrorMessage = (await outputResult)[0]!.toString(); const errorMessage = JSON.parse(rawErrorMessage); expect(errorMessage.error.message).toEqual(error.message); - reject(); - await expect(errorProm).toReject(); + rejectErrorEventP(Error('Never received error event')); + await expect(errorEventP).toReject(); await rpcServer.stop({ force: true }); }, ); @@ -523,7 +525,7 @@ describe(`${RPCServer.name}`, () => { const handlerEndedProm = promise(); class TestMethod extends DuplexHandler { public handle = async function* ( - input, + input: AsyncGenerator, ): AsyncGenerator { try { for await (const _ of input) { @@ -543,10 +545,6 @@ describe(`${RPCServer.name}`, () => { testMethod: new TestMethod({}), }, }); - let resolve; - rpcServer.addEventListener('error', (thing: RPCErrorEvent) => { - resolve(thing); - }); const passThroughStreamIn = new TransformStream(); const [outputResult, outputStream] = rpcTestUtils.streamToArray(); const readWriteStream: RPCStream = { @@ -579,15 +577,15 @@ describe(`${RPCServer.name}`, () => { { numRuns: 1 }, )('should emit stream error if output stream fails', async ({ messages }) => { const handlerEndedProm = promise(); - let ctx: ContextTimed | undefined; + const ctxProm = promise(); class TestMethod extends DuplexHandler { public handle = async function* ( - input, - _cancel, - _meta, - ctx_, + input: AsyncGenerator, + _cancel: (reason?: any) => void, + _meta: Record | undefined, + ctx: ContextTimed, ): AsyncGenerator { - ctx = ctx_; + ctxProm.resolveP(ctx); // Echo input try { yield* input; @@ -605,13 +603,14 @@ describe(`${RPCServer.name}`, () => { testMethod: new TestMethod({}), }, }); - let resolve; - const errorProm = new Promise((resolve_) => { - resolve = resolve_; - }); - rpcServer.addEventListener('error', (thing: RPCErrorEvent) => { - resolve(thing); - }); + const { p: errorEventP, resolveP: resolveErrorEventP } = + rpcUtils.promise(); + rpcServer.addEventListener( + 'error', + (rpcErrorEvent: rpcEvents.RPCErrorEvent) => { + resolveErrorEventP(rpcErrorEvent); + }, + ); const passThroughStreamIn = new TransformStream(); const passThroughStreamOut = new TransformStream(); const readWriteStream: RPCStream = { @@ -628,21 +627,18 @@ describe(`${RPCServer.name}`, () => { await reader.read(); } // Abort stream - // const writerReason = Symbol('writerAbort'); const readerReason = Symbol('readerAbort'); - // Await writer.abort(writerReason); await reader.cancel(readerReason); // We should get an error event - const event = await errorProm; + const event = await errorEventP; await writer.close(); - // Expect(event.detail.cause).toContain(writerReason); expect(event.detail).toBeInstanceOf(rpcErrors.ErrorRPCStreamEnded); // Check that the handler was cleaned up. await expect(handlerEndedProm.p).toResolve(); // Check that an abort signal happened - expect(ctx).toBeDefined(); - expect(ctx?.signal.aborted).toBeTrue(); - expect(ctx?.signal.reason).toBe(readerReason); + const ctx = await ctxProm.p; + expect(ctx.signal.aborted).toBeTrue(); + expect(ctx.signal.reason).toBe(readerReason); await rpcServer.stop({ force: true }); }); test.prop({ @@ -850,7 +846,7 @@ describe(`${RPCServer.name}`, () => { idGen, }); - expect(constructorF).toThrowError(rpcErrors.ErrorRPCInvalidTimeout); + expect(constructorF).toThrow(rpcErrors.ErrorRPCInvalidTimeout); }, ); test.prop({ @@ -865,11 +861,11 @@ describe(`${RPCServer.name}`, () => { timeout = timeoutTime; public handle = async ( input: JSONRPCRequestParams, - _cancel, - _meta, - ctx_, + _cancel: (reason?: any) => void, + _meta: Record | undefined, + ctx: ContextTimed, ): Promise => { - ctxLongProm.resolveP(ctx_); + ctxLongProm.resolveP(ctx); await waitProm.p; return input; }; @@ -895,16 +891,13 @@ describe(`${RPCServer.name}`, () => { class TestHandler extends RawHandler { public handle = async ( - _input: [ - JSONRPCRequest, - ReadableStream, - ], + _input: [JSONRPCRequest, ReadableStream], _cancel: (reason?: any) => void, _meta: Record | undefined, - ctx_: ContextTimed, + ctx: ContextTimed, ): Promise<[JSONRPCResponseResult, ReadableStream]> => { return new Promise((resolve) => { - ctxProm.resolveP(ctx_); + ctxProm.resolveP(ctx); let controller: ReadableStreamController; const stream = new ReadableStream({ @@ -913,7 +906,7 @@ describe(`${RPCServer.name}`, () => { }, }); - ctx_.signal.addEventListener('abort', () => { + ctx.signal.addEventListener('abort', () => { controller!.error(Error('ending')); }); @@ -922,7 +915,6 @@ describe(`${RPCServer.name}`, () => { }); }; } - const rpcServer = new RPCServer({ timeoutTime: 100, logger, @@ -953,17 +945,11 @@ describe(`${RPCServer.name}`, () => { readable: stream, writable: outputStream, }; - rpcServer.handleStream(readWriteStream); - const ctx = await ctxProm.p; - expect(ctx.timer.delay).toEqual(100); - await ctx.timer; - expect(ctx.signal.reason).toBeInstanceOf(rpcErrors.ErrorRPCTimedOut); - await expect(outputResult).toReject(); await rpcServer.stop({ force: true }); @@ -1003,8 +989,8 @@ describe(`${RPCServer.name}`, () => { class TestHandler extends DuplexHandler { public handle = async function* ( input: AsyncGenerator>, - cancel: (reason?: any) => void, - meta: Record | undefined, + _cancel: (reason?: any) => void, + _meta: Record | undefined, ctx: ContextTimed, ): AsyncGenerator> { contextProm.resolveP(ctx); @@ -1053,16 +1039,13 @@ describe(`${RPCServer.name}`, () => { const ctxProm = promise(); class TestHandler extends RawHandler { public handle = async ( - input: [ - JSONRPCRequest, - ReadableStream, - ], + input: [JSONRPCRequest, ReadableStream], _cancel: (reason?: any) => void, _meta: Record | undefined, - ctx_: ContextTimed, + ctx: ContextTimed, ): Promise<[JSONRPCResponseResult, ReadableStream]> => { return new Promise((resolve) => { - ctxProm.resolveP(ctx_); + ctxProm.resolveP(ctx); void (async () => { for await (const _ of input[1]) { // Do nothing, just consume @@ -1117,8 +1100,8 @@ describe(`${RPCServer.name}`, () => { class TestMethod extends DuplexHandler { public handle = async function* ( input: AsyncGenerator, - cancel: (reason?: any) => void, - meta: Record | undefined, + _cancel: (reason?: any) => void, + _meta: Record | undefined, ctx: ContextTimed, ): AsyncGenerator { ctxProm.resolveP(ctx); diff --git a/tests/middleware.test.ts b/tests/middleware.test.ts index 829ef10..4b738db 100644 --- a/tests/middleware.test.ts +++ b/tests/middleware.test.ts @@ -22,7 +22,7 @@ describe('Middleware tests', () => { { numRuns: 1000, }, - )('asd', async ({ messages }) => { + )('converting to raw and back to JSON', async ({ messages }) => { const parsedStream = rpcTestUtils .messagesToReadableStream(messages) .pipeThrough( @@ -31,8 +31,8 @@ describe('Middleware tests', () => { ), ); // Converting back. - const asd = await AsyncIterable.as(parsedStream).toArray(); - expect(asd).toEqual(messages); + const messagesParsed = await AsyncIterable.as(parsedStream).toArray(); + expect(messagesParsed).toEqual(messages); }); test.prop( { @@ -65,36 +65,36 @@ describe('Middleware tests', () => { test.prop( { messages: rpcTestUtils.jsonMessagesArb, - snippattern: rpcTestUtils.snippingPatternArb, + snipPattern: rpcTestUtils.snippingPatternArb, }, { numRuns: 1000 }, )( 'can parse json stream with random chunk sizes', - async ({ messages, snippattern }) => { + async ({ messages, snipPattern: snipPattern }) => { const parsedStream = rpcTestUtils .messagesToReadableStream(messages) - .pipeThrough(rpcTestUtils.binaryStreamToSnippedStream(snippattern)) // Imaginary internet here + .pipeThrough(rpcTestUtils.binaryStreamToSnippedStream(snipPattern)) // Imaginary internet here .pipeThrough( rpcUtilsMiddleware.binaryToJsonMessageStream( rpcUtils.parseJSONRPCMessage, ), ); // Converting back. - const asd = await AsyncIterable.as(parsedStream).toArray(); - expect(asd).toStrictEqual(messages); + const messagesParsed = await AsyncIterable.as(parsedStream).toArray(); + expect(messagesParsed).toStrictEqual(messages); }, ); test.prop( { messages: rpcTestUtils.jsonMessagesArb, - snippattern: rpcTestUtils.snippingPatternArb, + snipPattern: rpcTestUtils.snippingPatternArb, noise: noiseArb, }, { numRuns: 1000 }, - )('Will error on bad data', async ({ messages, snippattern, noise }) => { + )('Will error on bad data', async ({ messages, snipPattern, noise }) => { const parsedStream = rpcTestUtils .messagesToReadableStream(messages) - .pipeThrough(rpcTestUtils.binaryStreamToSnippedStream(snippattern)) // Imaginary internet here + .pipeThrough(rpcTestUtils.binaryStreamToSnippedStream(snipPattern)) // Imaginary internet here .pipeThrough(rpcTestUtils.binaryStreamToNoisyStream(noise)) // Adding bad data to the stream .pipeThrough( rpcUtilsMiddleware.binaryToJsonMessageStream( @@ -134,8 +134,8 @@ describe('Middleware tests', () => { ) // Converting back. .pipeThrough(timeoutMiddleware.forward); - const asd = await AsyncIterable.as(parsedStream).toArray(); - expect(asd).toEqual(messages); + const messagesParsed = await AsyncIterable.as(parsedStream).toArray(); + expect(messagesParsed).toEqual(messages); expect(timer.delay).toBe(timeout); timer.cancel(); await timer.catch(() => {}); @@ -169,8 +169,8 @@ describe('Middleware tests', () => { ) // Converting back. .pipeThrough(timeoutMiddleware.forward); - const asd = await AsyncIterable.as(parsedStream).toArray(); - expect(asd).toEqual(messages); + const messagesParsed = await AsyncIterable.as(parsedStream).toArray(); + expect(messagesParsed).toEqual(messages); expect(timer.delay).toBe(0); timer.cancel(); await timer.catch(() => {}); @@ -210,8 +210,8 @@ describe('Middleware tests', () => { if (expectedMessages[0].params?.metadata != null) { expectedMessages[0].params.metadata.timeout = null; } - const asd = await AsyncIterable.as(parsedStream).toArray(); - expect(asd).toEqual(expectedMessages); + const messagesParsed = await AsyncIterable.as(parsedStream).toArray(); + expect(messagesParsed).toEqual(expectedMessages); expect(timer.delay).toBe(Infinity); timer.cancel(); await timer.catch(() => {}); @@ -249,8 +249,8 @@ describe('Middleware tests', () => { ...expectedMessages[0].params.metadata, timeout, }; - const asd = await AsyncIterable.as(parsedStream).toArray(); - expect(asd).toEqual(expectedMessages); + const messagesParsed = await AsyncIterable.as(parsedStream).toArray(); + expect(messagesParsed).toEqual(expectedMessages); expect(timer.delay).toBe(timeout); timer.cancel(); await timer.catch(() => {}); From 144d5e30eeec841ab5032d807fdc6a3ea18f344c Mon Sep 17 00:00:00 2001 From: Brian Botha Date: Mon, 29 Jan 2024 15:00:53 +1100 Subject: [PATCH 4/4] fix: removed usage of `registerOnTimeoutCallback` This was a very odd thing to add to the API. So I've removed it. Also fixed up some tests relating to that. [ci skip] --- src/RPCClient.ts | 7 --- src/RPCServer.ts | 8 --- tests/RPC.test.ts | 128 +++++++--------------------------------------- tests/utils.ts | 3 +- 4 files changed, 21 insertions(+), 125 deletions(-) diff --git a/src/RPCClient.ts b/src/RPCClient.ts index 6b69545..bf57357 100644 --- a/src/RPCClient.ts +++ b/src/RPCClient.ts @@ -25,7 +25,6 @@ import * as utils from './utils'; const timerCleanupReasonSymbol = Symbol('timerCleanUpReasonSymbol'); class RPCClient { - protected onTimeoutCallback?: () => void; protected idGen: IdGen; protected logger: Logger; protected streamFactory: StreamFactory; @@ -37,9 +36,6 @@ class RPCClient { Uint8Array >; protected callerTypes: Record; - public registerOnTimeoutCallback(callback: () => void) { - this.onTimeoutCallback = callback; - } // Method proxies public readonly timeoutTime: number; public readonly graceTime: number; @@ -288,9 +284,6 @@ class RPCClient { void timer.then( () => { abortController.abort(timeoutError); - if (this.onTimeoutCallback) { - this.onTimeoutCallback(); - } }, () => {}, // Ignore cancellation error ); diff --git a/src/RPCServer.ts b/src/RPCServer.ts index f4214f5..0d65799 100644 --- a/src/RPCServer.ts +++ b/src/RPCServer.ts @@ -53,7 +53,6 @@ interface RPCServer extends startStop.StartStop {} eventStopped: events.EventRPCServerStopped, }) class RPCServer { - protected onTimeoutCallback?: () => void; protected idGen: IdGen; protected logger: Logger; protected handlerMap: Map = new Map(); @@ -68,10 +67,6 @@ class RPCServer { Uint8Array, JSONRPCResponseSuccess >; - // Function to register a callback for timeout - public registerOnTimeoutCallback(callback: () => void) { - this.onTimeoutCallback = callback; - } /** * RPCServer Constructor @@ -465,9 +460,6 @@ class RPCServer { delay: this.timeoutTime, handler: () => { abortController.abort(new errors.ErrorRPCTimedOut()); - if (this.onTimeoutCallback) { - this.onTimeoutCallback(); - } }, }); diff --git a/tests/RPC.test.ts b/tests/RPC.test.ts index 895b046..478b9bc 100644 --- a/tests/RPC.test.ts +++ b/tests/RPC.test.ts @@ -733,102 +733,13 @@ describe('RPC', () => { await expect(reader.closed).toReject(); await expect(rpcServer.stop({ force: false })).toResolve(); }); - test.prop({ - inputData: rpcTestUtils.safeJsonValueArb, - })('RPC client and server timeout concurrently', async ({ inputData }) => { - let serverTimedOut = false; - let clientTimedOut = false; - - // Setup server and client communication pairs - const { clientPair, serverPair } = rpcTestUtils.createTapPairs< - Uint8Array, - Uint8Array - >(); - - const timeout = 1; - class TestMethod extends DuplexHandler { - public handle = async function* ( - _input: AsyncIterableIterator, - _cancel: (reason?: any) => void, - _meta: Record | undefined, - ctx: ContextTimed, - ): AsyncIterableIterator { - // Check for abort event - ctx.signal.throwIfAborted(); - const abortProm = utils.promise(); - ctx.signal.addEventListener('abort', () => { - abortProm.rejectP(ctx.signal.reason); - }); - await abortProm.p; - }; - } - const testMethodInstance = new TestMethod({}); - // Set up a client and server with matching timeout settings - const rpcServer = new RPCServer({ - logger, - idGen, - timeoutTime: timeout, - }); - await rpcServer.start({ - manifest: { - testMethod: testMethodInstance, - }, - }); - // Register callback - rpcServer.registerOnTimeoutCallback(() => { - serverTimedOut = true; - }); - rpcServer.handleStream({ - ...serverPair, - cancel: () => {}, - }); - - const rpcClient = new RPCClient({ - manifest: { - testMethod: new DuplexCaller(), - }, - streamFactory: async () => { - return { - ...clientPair, - cancel: () => {}, - }; - }, - logger, - idGen, - }); - const callerInterface = await rpcClient.methods.testMethod({ - timer: timeout, - }); - // Register callback - rpcClient.registerOnTimeoutCallback(() => { - clientTimedOut = true; - }); - const writer = callerInterface.writable.getWriter(); - const reader = callerInterface.readable.getReader(); - // Wait for server and client to timeout by checking the flag - await new Promise((resolve) => { - const checkFlag = () => { - if (serverTimedOut && clientTimedOut) resolve(); - else setTimeout(() => checkFlag(), 10); - }; - checkFlag(); - }); - // Expect both the client and the server to time out - await expect(writer.write({ value: inputData })).rejects.toThrow( - 'Timed out waiting for header', - ); - - await expect(reader.read()).rejects.toThrow('Timed out waiting for header'); - - await rpcServer.stop({ force: true }); - }); test.prop( { inputData: rpcTestUtils.safeJsonValueArb, }, { numRuns: 1 }, )('RPC server times out before client', async ({ inputData }) => { - let serverTimedOut = false; + const serverTimedOutProm = utils.promise(); // Setup server and client communication pairs const { clientPair, serverPair } = rpcTestUtils.createTapPairs< @@ -844,12 +755,18 @@ describe('RPC', () => { _meta: Record | undefined, ctx: ContextTimed, ) { - ctx.signal.throwIfAborted(); const abortProm = utils.promise(); - ctx.signal.addEventListener('abort', () => { + if (ctx.signal.aborted) { abortProm.rejectP(ctx.signal.reason); + } else { + ctx.signal.addEventListener('abort', () => { + abortProm.rejectP(ctx.signal.reason); + }); + } + await abortProm.p.catch((e) => { + serverTimedOutProm.resolveP(); + throw e; }); - await abortProm.p; }; } @@ -860,10 +777,6 @@ describe('RPC', () => { timeoutTime: 1, }); await rpcServer.start({ manifest: { testMethod: new TestMethod({}) } }); - // Register callback - rpcServer.registerOnTimeoutCallback(() => { - serverTimedOut = true; - }); rpcServer.handleStream({ ...serverPair, cancel: () => {} }); // Create an instance of the RPC client with a longer timeout @@ -880,20 +793,17 @@ describe('RPC', () => { }); const writer = callerInterface.writable.getWriter(); const reader = callerInterface.readable.getReader(); - // Wait for server to timeout by checking the flag - await new Promise((resolve) => { - const checkFlag = () => { - if (serverTimedOut) resolve(); - else setTimeout(() => checkFlag(), 10); - }; - checkFlag(); - }); // We expect server to timeout before the client - await expect(writer.write({ value: inputData })).rejects.toThrow( - 'Timed out waiting for header', - ); - await expect(reader.read()).rejects.toThrow('Timed out waiting for header'); + await expect(writer.write({ value: inputData })).toResolve(); + await serverTimedOutProm.p; + const readP = reader.read(); + await expect(readP).rejects.toThrow(ErrorRPCRemote); + await expect( + readP.catch((e) => { + throw e.cause; + }), + ).rejects.toThrow(rpcErrors.ErrorRPCTimedOut); // Cleanup await rpcServer.stop({ force: true }); diff --git a/tests/utils.ts b/tests/utils.ts index 69643c6..dc13f31 100644 --- a/tests/utils.ts +++ b/tests/utils.ts @@ -83,7 +83,8 @@ const messagesToReadableStream = (messages: Array) => { */ const safeJsonValueArb = fc .json() - .map((value) => JSON.parse(value.replace('__proto__', 'proto')) as JSONValue); + .map((value) => JSON.parse(value.replace('__proto__', 'proto')) as JSONValue) + .noShrink(); const safeJsonObjectArb = fc.dictionary( fc.string().map((s) => s.replace('__proto__', 'proto')),