Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions src/RPCClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import type {
ClientManifest,
RPCStream,
JSONRPCResponseResult,
ToError,
} from './types';
import type { ErrorRPC } from './errors';
import Logger from '@matrixai/logger';
import { Timer } from '@matrixai/timer';
import * as middleware from './middleware';
Expand All @@ -28,7 +28,7 @@ class RPCClient<M extends ClientManifest> {
protected idGen: IdGen;
protected logger: Logger;
protected streamFactory: StreamFactory;
protected toError?: typeof utils.toError;
protected toError: ToError;
protected middlewareFactory: MiddlewareFactory<
Uint8Array,
JSONRPCRequest,
Expand Down Expand Up @@ -86,7 +86,7 @@ class RPCClient<M extends ClientManifest> {
middlewareFactory = middleware.defaultClientMiddlewareWrapper(),
streamKeepAliveTimeoutTime = Infinity,
logger,
toError,
toError = utils.toError,
idGen = () => Promise.resolve(null),
}: {
manifest: M;
Expand All @@ -100,10 +100,7 @@ class RPCClient<M extends ClientManifest> {
streamKeepAliveTimeoutTime?: number;
logger?: Logger;
idGen?: IdGen;
toError?: (
errorData: JSONValue,
metadata: Record<string, JSONValue>,
) => ErrorRPC<any>;
toError?: ToError;
}) {
this.idGen = idGen;
this.callerTypes = utils.getHandlerTypes(manifest);
Expand Down Expand Up @@ -472,7 +469,20 @@ class RPCClient<M extends ClientManifest> {
...(rpcStream.meta ?? {}),
command: method,
};
throw utils.toError(messageValue.error, metadata);
const e: errors.ErrorRPCProtocol<any> =
errors.ErrorRPCProtocol.fromJSON(messageValue.error);
if (
e instanceof errors.ErrorRPCRemote &&
messageValue.error.data != null &&
typeof messageValue.error.data === 'object' &&
'cause' in messageValue.error.data
) {
e.metadata = metadata;
e.cause = this.toError(
JSON.parse(messageValue.error.data.cause as string),
);
}
throw e;
}
leadingMessage = messageValue;
} catch (e) {
Expand Down
73 changes: 54 additions & 19 deletions src/RPCServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import type {
UnaryHandlerImplementation,
RPCStream,
MiddlewareFactory,
FromError,
} from './types';
import type { POJO } from '@matrixai/errors';
import { ReadableStream, TransformStream } from 'stream/web';
import Logger from '@matrixai/logger';
import { PromiseCancellable } from '@matrixai/async-cancellable';
Expand Down Expand Up @@ -59,8 +61,8 @@ class RPCServer {
protected defaultTimeoutMap: Map<string, number | undefined> = new Map();
protected handlerTimeoutTime: number;
protected activeStreams: Set<PromiseCancellable<void>> = new Set();
protected fromError: (error: errors.ErrorRPC<any>) => JSONValue;
protected filterSensitive: (key: string, value: any) => any;
protected fromError: FromError;
protected replacer?: (key: string, value: any) => any;
protected middlewareFactory: MiddlewareFactory<
JSONRPCRequest,
Uint8Array,
Expand Down Expand Up @@ -94,7 +96,7 @@ class RPCServer {
logger,
idGen = () => Promise.resolve(null),
fromError = utils.fromError,
filterSensitive = utils.filterSensitive,
replacer,
}: {
middlewareFactory?: MiddlewareFactory<
JSONRPCRequest,
Expand All @@ -105,14 +107,14 @@ class RPCServer {
handlerTimeoutTime?: number;
logger?: Logger;
idGen?: IdGen;
fromError?: (error: errors.ErrorRPC<any>) => JSONValue;
filterSensitive?: (key: string, value: any) => any;
fromError?: FromError;
replacer?: (key: string, value: any) => any;
}) {
this.idGen = idGen;
this.middlewareFactory = middlewareFactory;
this.handlerTimeoutTime = handlerTimeoutTime;
this.fromError = fromError ?? utils.fromError;
this.filterSensitive = filterSensitive ?? utils.filterSensitive;
this.fromError = fromError;
this.replacer = replacer;
this.logger = logger ?? new Logger(this.constructor.name);
}

Expand Down Expand Up @@ -196,7 +198,7 @@ class RPCServer {
const handlerPs = new Array<PromiseCancellable<void>>();
if (force) {
for await (const [activeStream] of this.activeStreams.entries()) {
if (force) activeStream.cancel(new errors.ErrorRPCStopping());
if (force) activeStream.cancel(reason);
handlerPs.push(activeStream);
}
await Promise.all(handlerPs);
Expand Down Expand Up @@ -319,11 +321,26 @@ class RPCServer {
}
controller.enqueue(value);
} catch (e) {
const rpcError: JSONRPCError = {
code: e.exitCode ?? errors.JSONRPCErrorCode.InternalError,
message: e.description ?? '',
data: JSON.stringify(this.fromError(e), this.filterSensitive),
};
let rpcError: JSONRPCError;
if (e instanceof errors.ErrorRPCProtocol) {
rpcError = e.toJSON();
} else {
rpcError = new errors.ErrorRPCRemote(e?.message).toJSON();
try {
(rpcError.data as POJO).cause = JSON.stringify(
this.fromError(e),
this.replacer,
);
} catch (e) {
(rpcError.data as POJO).cause = e;
// Dispatch error in the case where the thrown value could not be parsed
this.dispatchEvent(
new events.RPCErrorEvent({
detail: e,
}),
);
}
}
const rpcErrorMessage: JSONRPCResponseError = {
jsonrpc: '2.0',
error: rpcError,
Expand Down Expand Up @@ -504,7 +521,10 @@ class RPCServer {
await timer.catch(() => {});
this.dispatchEvent(
new events.RPCErrorEvent({
detail: new errors.ErrorRPCOutputStreamError(),
detail: new errors.ErrorRPCOutputStreamError(
'Stream failed waiting for header',
{ cause: newErr },
),
}),
);
return;
Expand Down Expand Up @@ -576,11 +596,26 @@ class RPCServer {
{ signal: abortController.signal, timer },
);
} catch (e) {
const rpcError: JSONRPCError = {
code: e.exitCode ?? errors.JSONRPCErrorCode.InternalError,
message: e.description ?? '',
data: JSON.stringify(this.fromError(e), this.filterSensitive),
};
let rpcError: JSONRPCError;
if (e instanceof errors.ErrorRPCProtocol) {
rpcError = e.toJSON();
} else {
rpcError = new errors.ErrorRPCRemote(e?.message).toJSON();
try {
(rpcError.data as POJO).cause = JSON.stringify(
this.fromError(e),
this.replacer,
);
} catch (e) {
(rpcError.data as POJO).cause = e;
// Dispatch error in the case where the thrown value could not be parsed
this.dispatchEvent(
new events.RPCErrorEvent({
detail: e,
}),
);
}
}
const rpcErrorMessage: JSONRPCResponseError = {
jsonrpc: '2.0',
error: rpcError,
Expand Down
1 change: 0 additions & 1 deletion src/callers/RawCaller.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import type { JSONValue } from '../types';
import Caller from './Caller';
class RawCaller extends Caller {
public type: 'RAW' = 'RAW' as const;
Expand Down
Loading