diff --git a/README.md b/README.md
index 5706852..4032c58 100644
--- a/README.md
+++ b/README.md
@@ -66,6 +66,3 @@ npm publish --access public
git push
git push --tags
```
-
-Domains Diagram:
-
diff --git a/images/diagram_encapuslated.svg b/images/diagram_encapuslated.svg
deleted file mode 100644
index bad2f68..0000000
--- a/images/diagram_encapuslated.svg
+++ /dev/null
@@ -1,17 +0,0 @@
-
\ No newline at end of file
diff --git a/package.json b/package.json
index f9303d9..66c6d91 100644
--- a/package.json
+++ b/package.json
@@ -53,16 +53,6 @@
"ts-node": "^10.9.1",
"tsconfig-paths": "^3.9.0",
"typedoc": "^0.23.21",
- "typescript": "^4.9.3",
- "@fast-check/jest": "^1.1.0"
- },
- "dependencies": {
- "@matrixai/async-init": "^1.9.4",
- "@matrixai/contexts": "^1.2.0",
- "@matrixai/logger": "^3.1.0",
- "@matrixai/errors": "^1.2.0",
- "@matrixai/events": "^3.2.0",
- "@streamparser/json": "^0.0.17",
- "ix": "^5.0.0"
+ "typescript": "^4.9.3"
}
}
diff --git a/src/RPCClient.ts b/src/RPCClient.ts
deleted file mode 100644
index 6d19363..0000000
--- a/src/RPCClient.ts
+++ /dev/null
@@ -1,592 +0,0 @@
-import type { WritableStream, ReadableStream } from 'stream/web';
-import type { ContextTimedInput } from '@matrixai/contexts';
-import type {
- HandlerType,
- JSONRPCRequestMessage,
- StreamFactory,
- ClientManifest,
- RPCStream,
- JSONRPCResponseResult,
-} from './types';
-import type { JSONValue, IdGen } from './types';
-import type {
- JSONRPCRequest,
- JSONRPCResponse,
- MiddlewareFactory,
- MapCallers,
-} from './types';
-import type { ErrorRPCRemote } from './errors';
-import { CreateDestroy, ready } from '@matrixai/async-init/dist/CreateDestroy';
-import Logger from '@matrixai/logger';
-import { Timer } from '@matrixai/timer';
-import { createDestroy } from '@matrixai/async-init';
-import * as rpcUtilsMiddleware from './utils/middleware';
-import * as rpcErrors from './errors';
-import * as rpcUtils from './utils/utils';
-import { promise } from './utils';
-import { ErrorRPCStreamEnded, never } from './errors';
-import * as events from './events';
-
-const timerCleanupReasonSymbol = Symbol('timerCleanUpReasonSymbol');
-
-/**
- * Events:
- * - {@link events.Event}
- */
-interface RPCClient
- extends createDestroy.CreateDestroy {}
-/**
- * You must provide an error handler `addEventListener('error')`.
- * Otherwise, errors will just be ignored.
- *
- * Events:
- * - {@link events.EventRPCClientDestroy}
- * - {@link events.EventRPCClientDestroyed}
- */
-@createDestroy.CreateDestroy({
- eventDestroy: events.EventRPCClientDestroy,
- eventDestroyed: events.EventRPCClientDestroyed,
-})
-class RPCClient {
- /**
- * @param obj
- * @param obj.manifest - Client manifest that defines the types for the rpc
- * methods.
- * @param obj.streamFactory - An arrow function that when called, creates a
- * new stream for each rpc method call.
- * @param obj.middlewareFactory - Middleware used to process the rpc messages.
- * The middlewareFactory needs to be a function that creates a pair of
- * transform streams that convert `JSONRPCRequest` to `Uint8Array` on the forward
- * path and `Uint8Array` to `JSONRPCResponse` on the reverse path.
- * @param obj.streamKeepAliveTimeoutTime - Timeout time used if no timeout timer was provided when making a call.
- * Defaults to 60,000 milliseconds.
- * for a client call.
- * @param obj.logger
- */
- static async createRPCClient({
- manifest,
- streamFactory,
- middlewareFactory = rpcUtilsMiddleware.defaultClientMiddlewareWrapper(),
- streamKeepAliveTimeoutTime = Infinity, // 1 minute
- logger = new Logger(this.name),
- idGen = () => Promise.resolve(null),
- }: {
- manifest: M;
- streamFactory: StreamFactory;
- middlewareFactory?: MiddlewareFactory<
- Uint8Array,
- JSONRPCRequest,
- JSONRPCResponse,
- Uint8Array
- >;
- streamKeepAliveTimeoutTime?: number;
- logger?: Logger;
- idGen: IdGen;
- toError?: (errorData, metadata?: JSONValue) => ErrorRPCRemote;
- }) {
- logger.info(`Creating ${this.name}`);
- const rpcClient = new this({
- manifest,
- streamFactory,
- middlewareFactory,
- streamKeepAliveTimeoutTime: streamKeepAliveTimeoutTime,
- logger,
- idGen,
- });
- logger.info(`Created ${this.name}`);
- return rpcClient;
- }
- protected onTimeoutCallback?: () => void;
- protected idGen: IdGen;
- protected logger: Logger;
- protected streamFactory: StreamFactory;
- protected middlewareFactory: MiddlewareFactory<
- Uint8Array,
- JSONRPCRequest,
- JSONRPCResponse,
- Uint8Array
- >;
- protected callerTypes: Record;
- toError: (errorData: any, metadata?: JSONValue) => Error;
- public registerOnTimeoutCallback(callback: () => void) {
- this.onTimeoutCallback = callback;
- }
- // Method proxies
- public readonly streamKeepAliveTimeoutTime: number;
- public readonly methodsProxy = new Proxy(
- {},
- {
- get: (_, method) => {
- if (typeof method === 'symbol') return;
- switch (this.callerTypes[method]) {
- case 'UNARY':
- return (params, ctx) => this.unaryCaller(method, params, ctx);
- case 'SERVER':
- return (params, ctx) =>
- this.serverStreamCaller(method, params, ctx);
- case 'CLIENT':
- return (ctx) => this.clientStreamCaller(method, ctx);
- case 'DUPLEX':
- return (ctx) => this.duplexStreamCaller(method, ctx);
- case 'RAW':
- return (header, ctx) => this.rawStreamCaller(method, header, ctx);
- default:
- return;
- }
- },
- },
- );
-
- public constructor({
- manifest,
- streamFactory,
- middlewareFactory,
- streamKeepAliveTimeoutTime,
- logger,
- idGen = () => Promise.resolve(null),
- toError,
- }: {
- manifest: M;
- streamFactory: StreamFactory;
- middlewareFactory: MiddlewareFactory<
- Uint8Array,
- JSONRPCRequest,
- JSONRPCResponse,
- Uint8Array
- >;
- streamKeepAliveTimeoutTime: number;
- logger: Logger;
- idGen: IdGen;
- toError?: (errorData, metadata?: JSONValue) => ErrorRPCRemote;
- }) {
- this.idGen = idGen;
- this.callerTypes = rpcUtils.getHandlerTypes(manifest);
- this.streamFactory = streamFactory;
- this.middlewareFactory = middlewareFactory;
- this.streamKeepAliveTimeoutTime = streamKeepAliveTimeoutTime;
- this.logger = logger;
- this.toError = toError || rpcUtils.toError;
- }
-
- public async destroy({
- errorCode = rpcErrors.JSONRPCErrorCode.RPCStopping,
- errorMessage = '',
- force = true,
- }: {
- errorCode?: number;
- errorMessage?: string;
- force?: boolean;
- } = {}): Promise {
- this.logger.info(`Destroying ${this.constructor.name}`);
-
- // You can dispatch an event before the actual destruction starts
- this.dispatchEvent(new events.EventRPCClientDestroy());
-
- // Dispatch an event after the client has been destroyed
- this.dispatchEvent(new events.EventRPCClientDestroyed());
-
- this.logger.info(`Destroyed ${this.constructor.name}`);
- }
-
- @ready(new rpcErrors.ErrorRPCCallerFailed())
- public get methods(): MapCallers {
- return this.methodsProxy as MapCallers;
- }
-
- /**
- * Generic caller for unary RPC calls.
- * This returns the response in the provided type. No validation is done so
- * make sure the types match the handler types.
- * @param method - Method name of the RPC call
- * @param parameters - Parameters to be provided with the RPC message. Matches
- * the provided I type.
- * @param ctx - ContextTimed used for timeouts and cancellation.
- */
- @ready(new rpcErrors.ErrorMissingCaller())
- public async unaryCaller(
- method: string,
- parameters: I,
- ctx: Partial = {},
- ): Promise {
- const callerInterface = await this.duplexStreamCaller(method, ctx);
- const reader = callerInterface.readable.getReader();
- const writer = callerInterface.writable.getWriter();
- try {
- await writer.write(parameters);
- const output = await reader.read();
- if (output.done) {
- throw new rpcErrors.ErrorMissingCaller('Missing response', {
- cause: ctx.signal?.reason,
- });
- }
- await reader.cancel();
- await writer.close();
- return output.value;
- } finally {
- // Attempt clean up, ignore errors if already cleaned up
- await reader.cancel().catch(() => {});
- await writer.close().catch(() => {});
- }
- }
-
- /**
- * Generic caller for server streaming RPC calls.
- * This returns a ReadableStream of the provided type. When finished, the
- * readable needs to be cleaned up, otherwise cleanup happens mostly
- * automatically.
- * @param method - Method name of the RPC call
- * @param parameters - Parameters to be provided with the RPC message. Matches
- * the provided I type.
- * @param ctx - ContextTimed used for timeouts and cancellation.
- */
- @ready(new rpcErrors.ErrorRPCCallerFailed())
- public async serverStreamCaller(
- method: string,
- parameters: I,
- ctx: Partial = {},
- ): Promise> {
- const callerInterface = await this.duplexStreamCaller(method, ctx);
- const writer = callerInterface.writable.getWriter();
- try {
- await writer.write(parameters);
- await writer.close();
- } catch (e) {
- // Clean up if any problems, ignore errors if already closed
- await callerInterface.readable.cancel(e);
- throw e;
- }
- return callerInterface.readable;
- }
-
- /**
- * Generic caller for Client streaming RPC calls.
- * This returns a WritableStream for writing the input to and a Promise that
- * resolves when the output is received.
- * When finished the writable stream must be ended. Failing to do so will
- * hold the connection open and result in a resource leak until the
- * call times out.
- * @param method - Method name of the RPC call
- * @param ctx - ContextTimed used for timeouts and cancellation.
- */
- @ready(new rpcErrors.ErrorRPCCallerFailed())
- public async clientStreamCaller(
- method: string,
- ctx: Partial = {},
- ): Promise<{
- output: Promise;
- writable: WritableStream;
- }> {
- const callerInterface = await this.duplexStreamCaller(method, ctx);
- const reader = callerInterface.readable.getReader();
- const output = reader.read().then(({ value, done }) => {
- if (done) {
- throw new rpcErrors.ErrorMissingCaller('Missing response', {
- cause: ctx.signal?.reason,
- });
- }
- return value;
- });
- return {
- output,
- writable: callerInterface.writable,
- };
- }
-
- /**
- * Generic caller for duplex RPC calls.
- * This returns a `ReadableWritablePair` of the types specified. No validation
- * is applied to these types so make sure they match the types of the handler
- * you are calling.
- * When finished the streams must be ended manually. Failing to do so will
- * hold the connection open and result in a resource leak until the
- * call times out.
- * @param method - Method name of the RPC call
- * @param ctx - ContextTimed used for timeouts and cancellation.
- */
- @ready(new rpcErrors.ErrorRPCCallerFailed())
- public async duplexStreamCaller(
- method: string,
- ctx: Partial = {},
- ): Promise> {
- // Setting up abort signal and timer
- const abortController = new AbortController();
- const signal = abortController.signal;
- // A promise that will reject if there is an abort signal or timeout
- const abortRaceProm = promise();
- // Prevent unhandled rejection when we're done with the promise
- abortRaceProm.p.catch(() => {});
- const abortRacePromHandler = () => {
- abortRaceProm.rejectP(signal.reason);
- };
- signal.addEventListener('abort', abortRacePromHandler);
-
- let abortHandler: () => void;
- if (ctx.signal != null) {
- // Propagate signal events
- abortHandler = () => {
- abortController.abort(ctx.signal?.reason);
- };
- if (ctx.signal.aborted) abortHandler();
- ctx.signal.addEventListener('abort', abortHandler);
- }
- let timer: Timer;
- if (!(ctx.timer instanceof Timer)) {
- timer = new Timer({
- delay: ctx.timer ?? this.streamKeepAliveTimeoutTime,
- });
- } else {
- timer = ctx.timer;
- }
- const cleanUp = () => {
- // Clean up the timer and signal
- if (ctx.timer == null) timer.cancel(timerCleanupReasonSymbol);
- if (ctx.signal != null) {
- ctx.signal.removeEventListener('abort', abortHandler);
- }
- signal.addEventListener('abort', abortRacePromHandler);
- };
- // Setting up abort events for timeout
- const timeoutError = new rpcErrors.ErrorRPCTimedOut(
- 'Error RPC has timed out',
- { cause: ctx.signal?.reason },
- );
- void timer.then(
- () => {
- abortController.abort(timeoutError);
- if (this.onTimeoutCallback) {
- this.onTimeoutCallback();
- }
- },
- () => {}, // Ignore cancellation error
- );
-
- // Hooking up agnostic stream side
- let rpcStream: RPCStream;
- const streamFactoryProm = this.streamFactory({ signal, timer });
- try {
- rpcStream = await Promise.race([streamFactoryProm, abortRaceProm.p]);
- } catch (e) {
- cleanUp();
- void streamFactoryProm.then((stream) =>
- stream.cancel(ErrorRPCStreamEnded),
- );
- throw e;
- }
- void timer.then(
- () => {
- rpcStream.cancel(
- new rpcErrors.ErrorRPCTimedOut('RPC has timed out', {
- cause: ctx.signal?.reason,
- }),
- );
- },
- () => {}, // Ignore cancellation error
- );
- // Deciding if we want to allow refreshing
- // We want to refresh timer if none was provided
- const refreshingTimer: Timer | undefined =
- ctx.timer == null ? timer : undefined;
- // Composing stream transforms and middleware
- const metadata = {
- ...(rpcStream.meta ?? {}),
- command: method,
- };
- const outputMessageTransformStream =
- rpcUtils.clientOutputTransformStream(metadata, refreshingTimer);
- const inputMessageTransformStream = rpcUtils.clientInputTransformStream(
- method,
- refreshingTimer,
- );
- const middleware = this.middlewareFactory(
- { signal, timer },
- rpcStream.cancel,
- metadata,
- );
- // This `Promise.allSettled` is used to asynchronously track the state
- // of the streams. When both have finished we can clean up resources.
- void Promise.allSettled([
- rpcStream.readable
- .pipeThrough(middleware.reverse)
- .pipeTo(outputMessageTransformStream.writable)
- // Ignore any errors, we only care about stream ending
- .catch(() => {}),
- inputMessageTransformStream.readable
- .pipeThrough(middleware.forward)
- .pipeTo(rpcStream.writable)
- // Ignore any errors, we only care about stream ending
- .catch(() => {}),
- ]).finally(() => {
- cleanUp();
- });
-
- // Returning interface
- return {
- readable: outputMessageTransformStream.readable,
- writable: inputMessageTransformStream.writable,
- cancel: rpcStream.cancel,
- meta: metadata,
- };
- }
-
- /**
- * Generic caller for raw RPC calls.
- * This returns a `ReadableWritablePair` of the raw RPC stream.
- * When finished the streams must be ended manually. Failing to do so will
- * hold the connection open and result in a resource leak until the
- * call times out.
- * Raw streams don't support the keep alive timeout. Timeout will only apply\
- * to the creation of the stream.
- * @param method - Method name of the RPC call
- * @param headerParams - Parameters for the header message. The header is a
- * single RPC message that is sent to specify the method for the RPC call.
- * Any metadata of extra parameters is provided here.
- * @param ctx - ContextTimed used for timeouts and cancellation.
- * @param id - Id is generated only once, and used throughout the stream for the rest of the communication
- */
- @ready(new rpcErrors.ErrorRPCCallerFailed())
- public async rawStreamCaller(
- method: string,
- headerParams: JSONValue,
- ctx: Partial = {},
- ): Promise<
- RPCStream<
- Uint8Array,
- Uint8Array,
- Record & { result: JSONValue; command: string }
- >
- > {
- // Setting up abort signal and timer
- const abortController = new AbortController();
- const signal = abortController.signal;
- // A promise that will reject if there is an abort signal or timeout
- const abortRaceProm = promise();
- // Prevent unhandled rejection when we're done with the promise
- abortRaceProm.p.catch(() => {});
- const abortRacePromHandler = () => {
- abortRaceProm.rejectP(signal.reason);
- };
- signal.addEventListener('abort', abortRacePromHandler);
-
- let abortHandler: () => void;
- if (ctx.signal != null) {
- // Propagate signal events
- abortHandler = () => {
- abortController.abort(ctx.signal?.reason);
- };
- if (ctx.signal.aborted) abortHandler();
- ctx.signal.addEventListener('abort', abortHandler);
- }
- let timer: Timer;
- if (!(ctx.timer instanceof Timer)) {
- timer = new Timer({
- delay: ctx.timer ?? this.streamKeepAliveTimeoutTime,
- });
- } else {
- timer = ctx.timer;
- }
- const cleanUp = () => {
- // Clean up the timer and signal
- if (ctx.timer == null) timer.cancel(timerCleanupReasonSymbol);
- if (ctx.signal != null) {
- ctx.signal.removeEventListener('abort', abortHandler);
- }
- signal.addEventListener('abort', abortRacePromHandler);
- };
- // Setting up abort events for timeout
- const timeoutError = new rpcErrors.ErrorRPCTimedOut('RPC has timed out', {
- cause: ctx.signal?.reason,
- });
- void timer.then(
- () => {
- abortController.abort(timeoutError);
- },
- () => {}, // Ignore cancellation error
- );
-
- const setupStream = async (): Promise<
- [JSONValue, RPCStream]
- > => {
- if (signal.aborted) throw signal.reason;
- const abortProm = promise();
- // Ignore error if orphaned
- void abortProm.p.catch(() => {});
- signal.addEventListener(
- 'abort',
- () => {
- abortProm.rejectP(signal.reason);
- },
- { once: true },
- );
- const rpcStream = await Promise.race([
- this.streamFactory({ signal, timer }),
- abortProm.p,
- ]);
- const tempWriter = rpcStream.writable.getWriter();
- const id = await this.idGen();
- const header: JSONRPCRequestMessage = {
- jsonrpc: '2.0',
- method,
- params: headerParams,
- id,
- };
- await tempWriter.write(Buffer.from(JSON.stringify(header)));
- tempWriter.releaseLock();
- const headTransformStream = rpcUtils.parseHeadStream(
- rpcUtils.parseJSONRPCResponse,
- );
- void rpcStream.readable
- // Allow us to re-use the readable after reading the first message
- .pipeTo(headTransformStream.writable)
- // Ignore any errors here, we only care that it ended
- .catch(() => {});
- const tempReader = headTransformStream.readable.getReader();
- let leadingMessage: JSONRPCResponseResult;
- try {
- const message = await Promise.race([tempReader.read(), abortProm.p]);
- const messageValue = message.value as JSONRPCResponse;
- if (message.done) never();
- if ('error' in messageValue) {
- const metadata = {
- ...(rpcStream.meta ?? {}),
- command: method,
- };
- throw this.toError(messageValue.error.data, metadata);
- }
- leadingMessage = messageValue;
- } catch (e) {
- rpcStream.cancel(
- new ErrorRPCStreamEnded('RPC Stream Ended', { cause: e }),
- );
- throw e;
- }
- tempReader.releaseLock();
- const newRpcStream: RPCStream = {
- writable: rpcStream.writable,
- readable: headTransformStream.readable as ReadableStream,
- cancel: rpcStream.cancel,
- meta: rpcStream.meta,
- };
- return [leadingMessage.result, newRpcStream];
- };
- let streamCreation: [JSONValue, RPCStream];
- try {
- streamCreation = await setupStream();
- } finally {
- cleanUp();
- }
- const [result, rpcStream] = streamCreation;
- const metadata = {
- ...(rpcStream.meta ?? {}),
- result,
- command: method,
- };
- return {
- writable: rpcStream.writable,
- readable: rpcStream.readable,
- cancel: rpcStream.cancel,
- meta: metadata,
- };
- }
-}
-
-export default RPCClient;
diff --git a/src/RPCServer.ts b/src/RPCServer.ts
deleted file mode 100644
index 71d8966..0000000
--- a/src/RPCServer.ts
+++ /dev/null
@@ -1,662 +0,0 @@
-import type { ReadableStreamDefaultReadResult } from 'stream/web';
-import type {
- ClientHandlerImplementation,
- DuplexHandlerImplementation,
- JSONRPCError,
- JSONRPCRequest,
- JSONRPCResponse,
- JSONRPCResponseError,
- JSONRPCResponseResult,
- ServerManifest,
- RawHandlerImplementation,
- ServerHandlerImplementation,
- UnaryHandlerImplementation,
- RPCStream,
- MiddlewareFactory,
-} from './types';
-import type { JSONValue } from './types';
-import type { IdGen } from './types';
-import { ReadableStream, TransformStream } from 'stream/web';
-import { CreateDestroy, ready } from '@matrixai/async-init/dist/CreateDestroy';
-import Logger from '@matrixai/logger';
-import { PromiseCancellable } from '@matrixai/async-cancellable';
-import { Timer } from '@matrixai/timer';
-import { createDestroy } from '@matrixai/async-init';
-import { RawHandler } from './handlers';
-import { DuplexHandler } from './handlers';
-import { ServerHandler } from './handlers';
-import { UnaryHandler } from './handlers';
-import { ClientHandler } from './handlers';
-import * as rpcEvents from './events';
-import * as rpcUtils from './utils';
-import * as rpcErrors from './errors';
-import * as rpcUtilsMiddleware from './utils';
-import { ErrorHandlerAborted, JSONRPCErrorCode, never } from './errors';
-import * as events from './events';
-
-const cleanupReason = Symbol('CleanupReason');
-
-/**
- * You must provide a error handler `addEventListener('error')`.
- * Otherwise errors will just be ignored.
- *
- * Events:
- * - error
- */
-interface RPCServer extends createDestroy.CreateDestroy {}
-/**
- * You must provide an error handler `addEventListener('error')`.
- * Otherwise, errors will just be ignored.
- *
- * Events:
- * - {@link events.EventRPCServerDestroy}
- * - {@link events.EventRPCServerDestroyed}
- */
-@createDestroy.CreateDestroy({
- eventDestroy: events.EventRPCServerDestroy,
- eventDestroyed: events.EventRPCServerDestroyed,
-})
-class RPCServer extends EventTarget {
- /**
- * Creates RPC server.
-
- * @param obj
- * @param obj.manifest - Server manifest used to define the rpc method
- * handlers.
- * @param obj.middlewareFactory - Middleware used to process the rpc messages.
- * The middlewareFactory needs to be a function that creates a pair of
- * transform streams that convert `Uint8Array` to `JSONRPCRequest` on the forward
- * path and `JSONRPCResponse` to `Uint8Array` on the reverse path.
- * @param obj.sensitive - If true, sanitises any rpc error messages of any
- * sensitive information.
- * @param obj.streamKeepAliveTimeoutTime - Time before a connection is cleaned up due to no activity. This is the
- * value used if the handler doesn't specify its own timeout time. This timeout is advisory and only results in a
- * signal sent to the handler. Stream is forced to end after the timeoutForceCloseTime. Defaults to 60,000
- * milliseconds.
- * @param obj.timeoutForceCloseTime - Time before the stream is forced to end after the initial timeout time.
- * The stream will be forced to close after this amount of time after the initial timeout. This is a grace period for
- * the handler to handle timeout before it is forced to end. Defaults to 2,000 milliseconds.
- * @param obj.logger
- */
- public static async createRPCServer({
- manifest,
- middlewareFactory = rpcUtilsMiddleware.defaultServerMiddlewareWrapper(),
- sensitive = false,
- handlerTimeoutTime = Infinity, // 1 minute
- logger = new Logger(this.name),
- idGen = () => Promise.resolve(null),
- fromError = rpcUtils.fromError,
- replacer = rpcUtils.replacer,
- }: {
- manifest: ServerManifest;
- middlewareFactory?: MiddlewareFactory<
- JSONRPCRequest,
- Uint8Array,
- Uint8Array,
- JSONRPCResponse
- >;
- sensitive?: boolean;
- handlerTimeoutTime?: number;
- logger?: Logger;
- idGen: IdGen;
- fromError?: (error: Error) => JSONValue;
- replacer?: (key: string, value: any) => any;
- }): Promise {
- logger.info(`Creating ${this.name}`);
- const rpcServer = new this({
- manifest,
- middlewareFactory,
- sensitive,
- handlerTimeoutTime,
- logger,
- idGen,
- fromError,
- replacer,
- });
- logger.info(`Created ${this.name}`);
- return rpcServer;
- }
- protected onTimeoutCallback?: () => void;
- protected idGen: IdGen;
- protected logger: Logger;
- protected handlerMap: Map = new Map();
- protected defaultTimeoutMap: Map = new Map();
- protected handlerTimeoutTime: number;
- protected activeStreams: Set> = new Set();
- protected sensitive: boolean;
- protected fromError: (error: Error, sensitive?: boolean) => JSONValue;
- protected replacer: (key: string, value: any) => any;
- protected middlewareFactory: MiddlewareFactory<
- JSONRPCRequest,
- Uint8Array,
- Uint8Array,
- JSONRPCResponseResult
- >;
- // Function to register a callback for timeout
- public registerOnTimeoutCallback(callback: () => void) {
- this.onTimeoutCallback = callback;
- }
- public constructor({
- manifest,
- middlewareFactory,
- sensitive,
- handlerTimeoutTime = Infinity, // 1 minuet
- logger,
- idGen = () => Promise.resolve(null),
- fromError = rpcUtils.fromError,
- replacer = rpcUtils.replacer,
- }: {
- manifest: ServerManifest;
-
- middlewareFactory: MiddlewareFactory<
- JSONRPCRequest,
- Uint8Array,
- Uint8Array,
- JSONRPCResponseResult
- >;
- handlerTimeoutTime?: number;
- sensitive: boolean;
- logger: Logger;
- idGen: IdGen;
- fromError?: (error: Error) => JSONValue;
- replacer?: (key: string, value: any) => any;
- }) {
- super();
- for (const [key, manifestItem] of Object.entries(manifest)) {
- if (manifestItem instanceof RawHandler) {
- this.registerRawStreamHandler(
- key,
- manifestItem.handle,
- manifestItem.timeout,
- );
- continue;
- }
- if (manifestItem instanceof DuplexHandler) {
- this.registerDuplexStreamHandler(
- key,
- manifestItem.handle,
- manifestItem.timeout,
- );
- continue;
- }
- if (manifestItem instanceof ServerHandler) {
- this.registerServerStreamHandler(
- key,
- manifestItem.handle,
- manifestItem.timeout,
- );
- continue;
- }
- if (manifestItem instanceof ClientHandler) {
- this.registerClientStreamHandler(
- key,
- manifestItem.handle,
- manifestItem.timeout,
- );
- continue;
- }
- if (manifestItem instanceof ClientHandler) {
- this.registerClientStreamHandler(
- key,
- manifestItem.handle,
- manifestItem.timeout,
- );
- continue;
- }
- if (manifestItem instanceof UnaryHandler) {
- this.registerUnaryHandler(
- key,
- manifestItem.handle,
- manifestItem.timeout,
- );
- continue;
- }
- never();
- }
- this.idGen = idGen;
- this.middlewareFactory = middlewareFactory;
- this.sensitive = sensitive;
- this.handlerTimeoutTime = handlerTimeoutTime;
- this.logger = logger;
- this.fromError = fromError || rpcUtils.fromError;
- this.replacer = replacer || rpcUtils.replacer;
- }
-
- public async destroy(force: boolean = true): Promise {
- // Log and dispatch an event before starting the destruction
- this.logger.info(`Destroying ${this.constructor.name}`);
- this.dispatchEvent(new events.EventRPCServerDestroy());
-
- // Your existing logic for stopping active streams and other cleanup
- if (force) {
- for await (const [activeStream] of this.activeStreams.entries()) {
- activeStream.cancel(new rpcErrors.ErrorRPCStopping());
- }
- }
-
- for await (const [activeStream] of this.activeStreams.entries()) {
- await activeStream;
- }
-
- // Log and dispatch an event after the destruction has been completed
- this.dispatchEvent(new events.EventRPCServerDestroyed());
- this.logger.info(`Destroyed ${this.constructor.name}`);
- }
-
- /**
- * Registers a raw stream handler. This is the basis for all handlers as
- * handling the streams is done with raw streams only.
- * The raw streams do not automatically refresh the timeout timer when
- * messages are sent or received.
- */
- protected registerRawStreamHandler(
- method: string,
- handler: RawHandlerImplementation,
- timeout: number | undefined,
- ) {
- this.handlerMap.set(method, handler);
- this.defaultTimeoutMap.set(method, timeout);
- }
-
- /**
- * Registers a duplex stream handler.
- * This handles all message parsing and conversion from generators
- * to raw streams.
- *
- * @param method - The rpc method name.
- * @param handler - The handler takes an input async iterable and returns an output async iterable.
- * @param timeout
- */
- /**
- * The ID is generated only once when the function is called and stored in the id variable.
- * the ID is associated with the entire stream
- * Every response (whether successful or an error) produced within this stream will have the
- * same ID, which is consistent with the originating request.
- */
- protected registerDuplexStreamHandler<
- I extends JSONValue,
- O extends JSONValue,
- >(
- method: string,
- handler: DuplexHandlerImplementation,
- timeout: number | undefined,
- ): void {
- const rawSteamHandler: RawHandlerImplementation = async (
- [header, input],
- cancel,
- meta,
- ctx,
- ) => {
- // Setting up abort controller
- const abortController = new AbortController();
- if (ctx.signal.aborted) abortController.abort(ctx.signal.reason);
- ctx.signal.addEventListener('abort', () => {
- abortController.abort(ctx.signal.reason);
- });
- const signal = abortController.signal;
- // Setting up middleware
- const middleware = this.middlewareFactory(ctx, cancel, meta);
- // Forward from the client to the server
- // Transparent TransformStream that re-inserts the header message into the
- // stream.
- const headerStream = new TransformStream({
- start(controller) {
- controller.enqueue(Buffer.from(JSON.stringify(header)));
- },
- transform(chunk, controller) {
- controller.enqueue(chunk);
- },
- });
- const forwardStream = input
- .pipeThrough(headerStream)
- .pipeThrough(middleware.forward);
- // Reverse from the server to the client
- const reverseStream = middleware.reverse.writable;
- // Generator derived from handler
- const id = await this.idGen();
- const outputGen = async function* (): AsyncGenerator {
- if (signal.aborted) throw signal.reason;
- // Input generator derived from the forward stream
- const inputGen = async function* (): AsyncIterable {
- for await (const data of forwardStream) {
- ctx.timer.refresh();
- yield data.params as I;
- }
- };
- const handlerG = handler(inputGen(), cancel, meta, {
- signal,
- timer: ctx.timer,
- });
- for await (const response of handlerG) {
- ctx.timer.refresh();
- const responseMessage: JSONRPCResponseResult = {
- jsonrpc: '2.0',
- result: response,
- id,
- };
- yield responseMessage;
- }
- };
- const outputGenerator = outputGen();
- const reverseMiddlewareStream = new ReadableStream({
- pull: async (controller) => {
- try {
- const { value, done } = await outputGenerator.next();
- if (done) {
- controller.close();
- return;
- }
- controller.enqueue(value);
- } catch (e) {
- const rpcError: JSONRPCError = {
- code: e.exitCode ?? JSONRPCErrorCode.InternalError,
- message: e.description ?? '',
- data: JSON.stringify(this.fromError(e), this.replacer),
- };
- const rpcErrorMessage: JSONRPCResponseError = {
- jsonrpc: '2.0',
- error: rpcError,
- id,
- };
- controller.enqueue(rpcErrorMessage);
- // Clean up the input stream here, ignore error if already ended
- await forwardStream
- .cancel(
- new rpcErrors.ErrorRPCHandlerFailed('Error clean up', {
- cause: e,
- }),
- )
- .catch(() => {});
- controller.close();
- }
- },
- cancel: async (reason) => {
- this.dispatchEvent(
- new rpcEvents.RPCErrorEvent({
- detail: new rpcErrors.ErrorRPCStreamEnded(
- 'Stream has been cancelled',
- {
- cause: reason,
- },
- ),
- }),
- );
- // Abort with the reason
- abortController.abort(reason);
- // If the output stream path fails then we need to end the generator
- // early.
- await outputGenerator.return(undefined);
- },
- });
- // Ignore any errors here, it should propagate to the ends of the stream
- void reverseMiddlewareStream.pipeTo(reverseStream).catch(() => {});
- return [undefined, middleware.reverse.readable];
- };
- this.registerRawStreamHandler(method, rawSteamHandler, timeout);
- }
-
- protected registerUnaryHandler(
- method: string,
- handler: UnaryHandlerImplementation,
- timeout: number | undefined,
- ) {
- const wrapperDuplex: DuplexHandlerImplementation = async function* (
- input,
- cancel,
- meta,
- ctx,
- ) {
- // The `input` is expected to be an async iterable with only 1 value.
- // Unlike generators, there is no `next()` method.
- // So we use `break` after the first iteration.
- for await (const inputVal of input) {
- yield await handler(inputVal, cancel, meta, ctx);
- break;
- }
- };
- this.registerDuplexStreamHandler(method, wrapperDuplex, timeout);
- }
-
- protected registerServerStreamHandler<
- I extends JSONValue,
- O extends JSONValue,
- >(
- method: string,
- handler: ServerHandlerImplementation,
- timeout: number | undefined,
- ) {
- const wrapperDuplex: DuplexHandlerImplementation = async function* (
- input,
- cancel,
- meta,
- ctx,
- ) {
- for await (const inputVal of input) {
- yield* handler(inputVal, cancel, meta, ctx);
- break;
- }
- };
- this.registerDuplexStreamHandler(method, wrapperDuplex, timeout);
- }
-
- protected registerClientStreamHandler<
- I extends JSONValue,
- O extends JSONValue,
- >(
- method: string,
- handler: ClientHandlerImplementation,
- timeout: number | undefined,
- ) {
- const wrapperDuplex: DuplexHandlerImplementation = async function* (
- input,
- cancel,
- meta,
- ctx,
- ) {
- yield await handler(input, cancel, meta, ctx);
- };
- this.registerDuplexStreamHandler(method, wrapperDuplex, timeout);
- }
-
- /**
- * ID is associated with the stream, not individual messages.
- */
- @ready(new rpcErrors.ErrorRPCHandlerFailed())
- public handleStream(rpcStream: RPCStream) {
- // This will take a buffer stream of json messages and set up service
- // handling for it.
- // Constructing the PromiseCancellable for tracking the active stream
- const abortController = new AbortController();
- // Setting up timeout timer logic
- const timer = new Timer({
- delay: this.handlerTimeoutTime,
- handler: () => {
- abortController.abort(new rpcErrors.ErrorRPCTimedOut());
- if (this.onTimeoutCallback) {
- this.onTimeoutCallback();
- }
- },
- });
-
- const prom = (async () => {
- const id = await this.idGen();
- const headTransformStream = rpcUtilsMiddleware.binaryToJsonMessageStream(
- rpcUtils.parseJSONRPCRequest,
- );
- // Transparent transform used as a point to cancel the input stream from
- const passthroughTransform = new TransformStream<
- Uint8Array,
- Uint8Array
- >();
- const inputStream = passthroughTransform.readable;
- const inputStreamEndProm = rpcStream.readable
- .pipeTo(passthroughTransform.writable)
- // Ignore any errors here, we only care that it ended
- .catch(() => {});
- void inputStream
- // Allow us to re-use the readable after reading the first message
- .pipeTo(headTransformStream.writable, {
- preventClose: true,
- preventCancel: true,
- })
- // Ignore any errors here, we only care that it ended
- .catch(() => {});
- const cleanUp = async (reason: any) => {
- await inputStream.cancel(reason);
- await rpcStream.writable.abort(reason);
- await inputStreamEndProm;
- timer.cancel(cleanupReason);
- await timer.catch(() => {});
- };
- // Read a single empty value to consume the first message
- const reader = headTransformStream.readable.getReader();
- // Allows timing out when waiting for the first message
- let headerMessage:
- | ReadableStreamDefaultReadResult
- | undefined
- | void;
- try {
- headerMessage = await Promise.race([
- reader.read(),
- timer.then(
- () => undefined,
- () => {},
- ),
- ]);
- } catch (e) {
- const newErr = new rpcErrors.ErrorRPCHandlerFailed(
- 'Stream failed waiting for header',
- { cause: e },
- );
- await inputStreamEndProm;
- timer.cancel(cleanupReason);
- await timer.catch(() => {});
- this.dispatchEvent(
- new rpcEvents.RPCErrorEvent({
- detail: new rpcErrors.ErrorRPCOutputStreamError(
- 'Stream failed waiting for header',
- {
- cause: newErr,
- },
- ),
- }),
- );
- return;
- }
- // Downgrade back to the raw stream
- await reader.cancel();
- // There are 2 conditions where we just end here
- // 1. The timeout timer resolves before the first message
- // 2. the stream ends before the first message
- if (headerMessage == null) {
- const newErr = new rpcErrors.ErrorRPCTimedOut(
- 'Timed out waiting for header',
- { cause: new rpcErrors.ErrorRPCStreamEnded() },
- );
- await cleanUp(newErr);
- this.dispatchEvent(
- new rpcEvents.RPCErrorEvent({
- detail: new rpcErrors.ErrorRPCTimedOut(
- 'Timed out waiting for header',
- {
- cause: newErr,
- },
- ),
- }),
- );
- return;
- }
- if (headerMessage.done) {
- const newErr = new rpcErrors.ErrorMissingHeader('Missing header');
- await cleanUp(newErr);
- this.dispatchEvent(
- new rpcEvents.RPCErrorEvent({
- detail: new rpcErrors.ErrorRPCOutputStreamError('Missing header', {
- cause: newErr,
- }),
- }),
- );
- return;
- }
- const method = headerMessage.value.method;
- const handler = this.handlerMap.get(method);
- if (handler == null) {
- await cleanUp(new rpcErrors.ErrorRPCHandlerFailed('Missing handler'));
- return;
- }
- if (abortController.signal.aborted) {
- await cleanUp(
- new rpcErrors.ErrorHandlerAborted('Aborted', {
- cause: new ErrorHandlerAborted(),
- }),
- );
- return;
- }
- // Setting up Timeout logic
- const timeout = this.defaultTimeoutMap.get(method);
- if (timeout != null && timeout < this.handlerTimeoutTime) {
- // Reset timeout with new delay if it is less than the default
- timer.reset(timeout);
- } else {
- // Otherwise refresh
- timer.refresh();
- }
- this.logger.info(`Handling stream with method (${method})`);
- let handlerResult: [JSONValue | undefined, ReadableStream];
- const headerWriter = rpcStream.writable.getWriter();
- try {
- handlerResult = await handler(
- [headerMessage.value, inputStream],
- rpcStream.cancel,
- rpcStream.meta,
- { signal: abortController.signal, timer },
- );
- } catch (e) {
- const rpcError: JSONRPCError = {
- code: e.exitCode ?? JSONRPCErrorCode.InternalError,
- message: e.description ?? '',
- data: JSON.stringify(this.fromError(e), this.replacer),
- };
- const rpcErrorMessage: JSONRPCResponseError = {
- jsonrpc: '2.0',
- error: rpcError,
- id,
- };
- await headerWriter.write(Buffer.from(JSON.stringify(rpcErrorMessage)));
- await headerWriter.close();
- // Clean up and return
- timer.cancel(cleanupReason);
- rpcStream.cancel(Error('TMP header message was an error'));
- return;
- }
- const [leadingResult, outputStream] = handlerResult;
-
- if (leadingResult !== undefined) {
- // Writing leading metadata
- const leadingMessage: JSONRPCResponseResult = {
- jsonrpc: '2.0',
- result: leadingResult,
- id,
- };
- await headerWriter.write(Buffer.from(JSON.stringify(leadingMessage)));
- }
- headerWriter.releaseLock();
- const outputStreamEndProm = outputStream
- .pipeTo(rpcStream.writable)
- .catch(() => {}); // Ignore any errors, we only care that it finished
- await Promise.allSettled([inputStreamEndProm, outputStreamEndProm]);
- this.logger.info(`Handled stream with method (${method})`);
- // Cleaning up abort and timer
- timer.cancel(cleanupReason);
- abortController.abort(new rpcErrors.ErrorRPCStreamEnded());
- })();
- const handlerProm = PromiseCancellable.from(prom, abortController).finally(
- () => this.activeStreams.delete(handlerProm),
- abortController,
- );
- // Putting the PromiseCancellable into the active streams map
- this.activeStreams.add(handlerProm);
- }
-}
-
-export default RPCServer;
diff --git a/src/callers/Caller.ts b/src/callers/Caller.ts
deleted file mode 100644
index ddc54a8..0000000
--- a/src/callers/Caller.ts
+++ /dev/null
@@ -1,13 +0,0 @@
-import type { HandlerType, JSONValue } from '../types';
-
-abstract class Caller<
- Input extends JSONValue = JSONValue,
- Output extends JSONValue = JSONValue,
-> {
- protected _inputType: Input;
- protected _outputType: Output;
- // Need this to distinguish the classes when inferring types
- abstract type: HandlerType;
-}
-
-export default Caller;
diff --git a/src/callers/ClientCaller.ts b/src/callers/ClientCaller.ts
deleted file mode 100644
index 7fb44da..0000000
--- a/src/callers/ClientCaller.ts
+++ /dev/null
@@ -1,11 +0,0 @@
-import type { JSONValue } from '../types';
-import Caller from './Caller';
-
-class ClientCaller<
- Input extends JSONValue = JSONValue,
- Output extends JSONValue = JSONValue,
-> extends Caller {
- public type: 'CLIENT' = 'CLIENT' as const;
-}
-
-export default ClientCaller;
diff --git a/src/callers/DuplexCaller.ts b/src/callers/DuplexCaller.ts
deleted file mode 100644
index 4c079b3..0000000
--- a/src/callers/DuplexCaller.ts
+++ /dev/null
@@ -1,11 +0,0 @@
-import type { JSONValue } from '../types';
-import Caller from './Caller';
-
-class DuplexCaller<
- Input extends JSONValue = JSONValue,
- Output extends JSONValue = JSONValue,
-> extends Caller {
- public type: 'DUPLEX' = 'DUPLEX' as const;
-}
-
-export default DuplexCaller;
diff --git a/src/callers/RawCaller.ts b/src/callers/RawCaller.ts
deleted file mode 100644
index a4721cf..0000000
--- a/src/callers/RawCaller.ts
+++ /dev/null
@@ -1,7 +0,0 @@
-import type { JSONValue } from '../types';
-import Caller from './Caller';
-class RawCaller extends Caller {
- public type: 'RAW' = 'RAW' as const;
-}
-
-export default RawCaller;
diff --git a/src/callers/ServerCaller.ts b/src/callers/ServerCaller.ts
deleted file mode 100644
index 11a9fe9..0000000
--- a/src/callers/ServerCaller.ts
+++ /dev/null
@@ -1,11 +0,0 @@
-import type { JSONValue } from '../types';
-import Caller from './Caller';
-
-class ServerCaller<
- Input extends JSONValue = JSONValue,
- Output extends JSONValue = JSONValue,
-> extends Caller {
- public type: 'SERVER' = 'SERVER' as const;
-}
-
-export default ServerCaller;
diff --git a/src/callers/UnaryCaller.ts b/src/callers/UnaryCaller.ts
deleted file mode 100644
index c446073..0000000
--- a/src/callers/UnaryCaller.ts
+++ /dev/null
@@ -1,11 +0,0 @@
-import type { JSONValue } from '../types';
-import Caller from './Caller';
-
-class UnaryCaller<
- Input extends JSONValue = JSONValue,
- Output extends JSONValue = JSONValue,
-> extends Caller {
- public type: 'UNARY' = 'UNARY' as const;
-}
-
-export default UnaryCaller;
diff --git a/src/callers/index.ts b/src/callers/index.ts
deleted file mode 100644
index 17e8c87..0000000
--- a/src/callers/index.ts
+++ /dev/null
@@ -1,6 +0,0 @@
-export { default as Caller } from './Caller';
-export { default as ClientCaller } from './ClientCaller';
-export { default as DuplexCaller } from './DuplexCaller';
-export { default as RawCaller } from './RawCaller';
-export { default as ServerCaller } from './ServerCaller';
-export { default as UnaryCaller } from './UnaryCaller';
diff --git a/src/errors/errors.ts b/src/errors/errors.ts
deleted file mode 100644
index 2acc942..0000000
--- a/src/errors/errors.ts
+++ /dev/null
@@ -1,266 +0,0 @@
-import type { Class } from '@matrixai/errors';
-import type { JSONValue } from '@/types';
-import { AbstractError } from '@matrixai/errors';
-
-const enum JSONRPCErrorCode {
- ParseError = -32700,
- InvalidRequest = -32600,
- MethodNotFound = -32601,
- InvalidParams = -32602,
- InternalError = -32603,
- HandlerNotFound = -32000,
- RPCStopping = -32001,
- RPCDestroyed = -32002,
- RPCMessageLength = -32003,
- RPCMissingResponse = -32004,
- RPCOutputStreamError = -32005,
- RPCRemote = -32006,
- RPCStreamEnded = -32007,
- RPCTimedOut = -32008,
- RPCConnectionLocal = -32010,
- RPCConnectionPeer = -32011,
- RPCConnectionKeepAliveTimeOut = -32012,
- RPCConnectionInternal = -32013,
- MissingHeader = -32014,
- HandlerAborted = -32015,
- MissingCaller = -32016,
-}
-interface RPCError extends Error {
- code?: number;
-}
-class ErrorRPC extends AbstractError implements RPCError {
- private _description: string = 'Generic Error';
- constructor(message?: string) {
- super(message);
- }
- code?: number;
-
- get description(): string {
- return this._description;
- }
-
- set description(value: string) {
- this._description = value;
- }
-}
-
-class ErrorRPCDestroyed extends ErrorRPC {
- constructor(message?: string) {
- super(message); // Call the parent constructor
- this.description = 'Rpc is destroyed'; // Set the specific description
- this.code = JSONRPCErrorCode.MethodNotFound;
- }
-}
-
-class ErrorRPCParse extends ErrorRPC {
- static description = 'Failed to parse Buffer stream';
-
- constructor(message?: string, options?: { cause: Error }) {
- super(message); // Call the parent constructor
- this.description = 'Failed to parse Buffer stream'; // Set the specific description
- this.code = JSONRPCErrorCode.ParseError;
- }
-}
-
-class ErrorRPCStopping extends ErrorRPC {
- constructor(message?: string) {
- super(message); // Call the parent constructor
- this.description = 'Rpc is stopping'; // Set the specific description
- this.code = JSONRPCErrorCode.RPCStopping;
- }
-}
-
-/**
- * This is an internal error, it should not reach the top level.
- */
-class ErrorRPCHandlerFailed extends ErrorRPC {
- constructor(message?: string, options?: { cause: Error }) {
- super(message); // Call the parent constructor
- this.description = 'Failed to handle stream'; // Set the specific description
- this.code = JSONRPCErrorCode.HandlerNotFound;
- }
-}
-class ErrorRPCCallerFailed extends ErrorRPC {
- constructor(message?: string, options?: { cause: Error }) {
- super(message); // Call the parent constructor
- this.description = 'Failed to call stream'; // Set the specific description
- this.code = JSONRPCErrorCode.MissingCaller;
- }
-}
-class ErrorMissingCaller extends ErrorRPC {
- constructor(message?: string, options?: { cause: Error }) {
- super(message); // Call the parent constructor
- this.description = 'Header information is missing'; // Set the specific description
- this.code = JSONRPCErrorCode.MissingCaller;
- }
-}
-class ErrorMissingHeader extends ErrorRPC {
- constructor(message?: string, options?: { cause: Error }) {
- super(message); // Call the parent constructor
- this.description = 'Header information is missing'; // Set the specific description
- this.code = JSONRPCErrorCode.MissingHeader;
- }
-}
-
-class ErrorHandlerAborted extends ErrorRPC {
- constructor(message?: string, options?: { cause: Error }) {
- super(message); // Call the parent constructor
- this.description = 'Handler Aborted Stream.'; // Set the specific description
- this.code = JSONRPCErrorCode.HandlerAborted;
- }
-}
-class ErrorRPCMessageLength extends ErrorRPC {
- static description = 'RPC Message exceeds maximum size';
- code? = JSONRPCErrorCode.RPCMessageLength;
-}
-
-class ErrorRPCMissingResponse extends ErrorRPC {
- constructor(message?: string) {
- super(message);
- this.description = 'Stream ended before response';
- this.code = JSONRPCErrorCode.RPCMissingResponse;
- }
-}
-
-interface ErrorRPCOutputStreamErrorOptions {
- cause?: Error;
-}
-class ErrorRPCOutputStreamError extends ErrorRPC {
- constructor(message: string, options: ErrorRPCOutputStreamErrorOptions) {
- super(message);
- this.description = 'Output stream failed, unable to send data';
- this.code = JSONRPCErrorCode.RPCOutputStreamError;
- }
-}
-
-class ErrorRPCRemote extends ErrorRPC {
- static description = 'Remote error from RPC call';
- static message: string = 'The server responded with an error';
- metadata: JSONValue | undefined;
-
- constructor(metadata?: JSONValue, message?: string, options?) {
- super(message);
- this.metadata = metadata;
- this.code = JSONRPCErrorCode.RPCRemote;
- this.data = options?.data;
- }
-
- public static fromJSON>(
- this: T,
- json: any,
- ): InstanceType {
- if (
- typeof json !== 'object' ||
- json.type !== this.name ||
- typeof json.data !== 'object' ||
- typeof json.data.message !== 'string' ||
- isNaN(Date.parse(json.data.timestamp)) ||
- typeof json.data.metadata !== 'object' ||
- typeof json.data.data !== 'object' ||
- ('stack' in json.data && typeof json.data.stack !== 'string')
- ) {
- throw new TypeError(`Cannot decode JSON to ${this.name}`);
- }
-
- // Here, you can define your own metadata object, or just use the one from JSON directly.
- const parsedMetadata = json.data.metadata;
-
- const e = new this(parsedMetadata, json.data.message, {
- timestamp: new Date(json.data.timestamp),
- data: json.data.data,
- cause: json.data.cause,
- });
- e.stack = json.data.stack;
- return e;
- }
- public toJSON(): any {
- return {
- type: this.name,
- data: {
- description: this.description,
- },
- };
- }
-}
-
-class ErrorRPCStreamEnded extends ErrorRPC {
- constructor(message?: string, options?: { cause: Error }) {
- super(message);
- this.description = 'Handled stream has ended';
- this.code = JSONRPCErrorCode.RPCStreamEnded;
- }
-}
-
-class ErrorRPCTimedOut extends ErrorRPC {
- constructor(message?: string, options?: { cause: Error }) {
- super(message);
- this.description = 'RPC handler has timed out';
- this.code = JSONRPCErrorCode.RPCTimedOut;
- }
-}
-
-class ErrorUtilsUndefinedBehaviour extends ErrorRPC {
- constructor(message?: string) {
- super(message);
- this.description = 'You should never see this error';
- this.code = JSONRPCErrorCode.MethodNotFound;
- }
-}
-export function never(): never {
- throw new ErrorRPC('This function should never be called');
-}
-
-class ErrorRPCMethodNotImplemented extends ErrorRPC {
- constructor(message?: string) {
- super(message || 'This method must be overridden'); // Default message if none provided
- this.name = 'ErrorRPCMethodNotImplemented';
- this.description =
- 'This abstract method must be implemented in a derived class';
- this.code = JSONRPCErrorCode.MethodNotFound;
- }
-}
-
-class ErrorRPCConnectionLocal extends ErrorRPC {
- static description = 'RPC Connection local error';
- code? = JSONRPCErrorCode.RPCConnectionLocal;
-}
-
-class ErrorRPCConnectionPeer extends ErrorRPC {
- static description = 'RPC Connection peer error';
- code? = JSONRPCErrorCode.RPCConnectionPeer;
-}
-
-class ErrorRPCConnectionKeepAliveTimeOut extends ErrorRPC {
- static description = 'RPC Connection keep alive timeout';
- code? = JSONRPCErrorCode.RPCConnectionKeepAliveTimeOut;
-}
-
-class ErrorRPCConnectionInternal extends ErrorRPC {
- static description = 'RPC Connection internal error';
- code? = JSONRPCErrorCode.RPCConnectionInternal;
-}
-
-export {
- ErrorRPC,
- ErrorRPCDestroyed,
- ErrorRPCStopping,
- ErrorRPCParse,
- ErrorRPCHandlerFailed,
- ErrorRPCMessageLength,
- ErrorRPCMissingResponse,
- ErrorRPCOutputStreamError,
- ErrorRPCRemote,
- ErrorRPCStreamEnded,
- ErrorRPCTimedOut,
- ErrorUtilsUndefinedBehaviour,
- ErrorRPCMethodNotImplemented,
- ErrorRPCConnectionLocal,
- ErrorRPCConnectionPeer,
- ErrorRPCConnectionKeepAliveTimeOut,
- ErrorRPCConnectionInternal,
- ErrorMissingHeader,
- ErrorHandlerAborted,
- ErrorRPCCallerFailed,
- ErrorMissingCaller,
- JSONRPCErrorCode,
-};
diff --git a/src/errors/index.ts b/src/errors/index.ts
deleted file mode 100644
index f72bc43..0000000
--- a/src/errors/index.ts
+++ /dev/null
@@ -1 +0,0 @@
-export * from './errors';
diff --git a/src/events.ts b/src/events.ts
deleted file mode 100644
index 828cca4..0000000
--- a/src/events.ts
+++ /dev/null
@@ -1,85 +0,0 @@
-import type RPCServer from './RPCServer';
-import type RPCClient from './RPCClient';
-import type {
- ErrorRPCConnectionLocal,
- ErrorRPCConnectionPeer,
- ErrorRPCConnectionKeepAliveTimeOut,
- ErrorRPCConnectionInternal,
-} from './errors';
-import { AbstractEvent } from '@matrixai/events';
-import * as rpcErrors from './errors';
-
-abstract class EventRPC extends AbstractEvent {}
-
-abstract class EventRPCClient extends AbstractEvent {}
-
-abstract class EventRPCServer extends AbstractEvent {}
-
-abstract class EventRPCConnection extends AbstractEvent {}
-
-// Client events
-class EventRPCClientDestroy extends EventRPCClient {}
-
-class EventRPCClientDestroyed extends EventRPCClient {}
-
-class EventRPCClientCreate extends EventRPCClient {}
-
-class EventRPCClientCreated extends EventRPCClient {}
-
-class EventRPCClientError extends EventRPCClient {}
-
-class EventRPCClientConnect extends EventRPCClient {}
-
-// Server events
-
-class EventRPCServerConnection extends EventRPCServer {}
-
-class EventRPCServerCreate extends EventRPCServer {}
-
-class EventRPCServerCreated extends EventRPCServer {}
-
-class EventRPCServerDestroy extends EventRPCServer {}
-
-class EventRPCServerDestroyed extends EventRPCServer {}
-
-class EventRPCServerError extends EventRPCServer {}
-
-class EventRPCConnectionError extends EventRPCConnection<
- | ErrorRPCConnectionLocal
- | ErrorRPCConnectionPeer
- | ErrorRPCConnectionKeepAliveTimeOut
- | ErrorRPCConnectionInternal
-> {}
-
-class RPCErrorEvent extends Event {
- public detail: Error;
- constructor(
- options: EventInit & {
- detail: Error;
- },
- ) {
- super('error', options);
- this.detail = options.detail;
- }
-}
-
-export {
- RPCErrorEvent,
- EventRPC,
- EventRPCClient,
- EventRPCServer,
- EventRPCConnection,
- EventRPCClientDestroy,
- EventRPCClientDestroyed,
- EventRPCClientCreate,
- EventRPCClientCreated,
- EventRPCClientError,
- EventRPCClientConnect,
- EventRPCServerConnection,
- EventRPCServerCreate,
- EventRPCServerCreated,
- EventRPCServerDestroy,
- EventRPCServerDestroyed,
- EventRPCServerError,
- EventRPCConnectionError,
-};
diff --git a/src/handlers/ClientHandler.ts b/src/handlers/ClientHandler.ts
deleted file mode 100644
index 0aea354..0000000
--- a/src/handlers/ClientHandler.ts
+++ /dev/null
@@ -1,21 +0,0 @@
-import type { ContainerType, JSONValue } from '../types';
-import type { ContextTimed } from '@matrixai/contexts';
-import Handler from './Handler';
-import { ErrorRPCMethodNotImplemented } from '../errors';
-
-abstract class ClientHandler<
- Container extends ContainerType = ContainerType,
- Input extends JSONValue = JSONValue,
- Output extends JSONValue = JSONValue,
-> extends Handler {
- public handle = async (
- input: AsyncIterableIterator,
- cancel: (reason?: any) => void,
- meta: Record | undefined,
- ctx: ContextTimed,
- ): Promise