Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
"postversion": "npm install --package-lock-only --ignore-scripts --silent",
"ts-node": "ts-node",
"ts-node-inspect": "node --require ts-node/register --inspect",
"test": "node --expose-gc ./node_modules/.bin/jest",
"test": "jest",
"lint": "eslint '{src,tests,scripts}/**/*.{js,ts,json}' 'benches/**/*.{js,ts}'",
"lintfix": "eslint '{src,tests,scripts}/**/*.{js,ts,json}' 'benches/**/*.{js,ts}' --fix",
"lint-shell": "find ./src ./tests ./scripts -type f -regextype posix-extended -regex '.*\\.(sh)' -exec shellcheck {} +",
Expand Down
16 changes: 8 additions & 8 deletions src/PolykeyAgent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -628,16 +628,16 @@ class PolykeyAgent {
workers?: number;
fresh?: boolean;
}) {
const optionsDefaulted = utils.mergeObjects(options, {
clientServiceHost: config.defaultsUser.clientServiceHost,
clientServicePort: config.defaultsUser.clientServicePort,
agentServiceHost: config.defaultsUser.agentServiceHost,
agentServicePort: config.defaultsUser.agentServicePort,
workers: config.defaultsUser.workers,
ipv6Only: config.defaultsUser.ipv6Only,
});
try {
this.logger.info(`Starting ${this.constructor.name}`);
const optionsDefaulted = utils.mergeObjects(options, {
clientServiceHost: config.defaultsUser.clientServiceHost,
clientServicePort: config.defaultsUser.clientServicePort,
agentServiceHost: config.defaultsUser.agentServiceHost,
agentServicePort: config.defaultsUser.agentServicePort,
workers: config.defaultsUser.workers,
ipv6Only: config.defaultsUser.ipv6Only,
});
// Register event handlers
this.certManager.addEventListener(
keysEvents.EventCertManagerCertChange.name,
Expand Down
210 changes: 147 additions & 63 deletions src/PolykeyClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ class PolykeyClient {
public static createPolykeyClient(
opts: {
nodeId: string | NodeId;
port: number;
host: string;
port: number;
options?: DeepPartial<PolykeyClientOptions>;
fresh?: boolean;
fs?: FileSystem;
Expand Down Expand Up @@ -98,25 +98,6 @@ class PolykeyClient {
@context ctx: ContextTimed,
): Promise<PolykeyClient> {
logger.info(`Creating ${this.name}`);
let nodeId_: NodeId;
if (typeof nodeId === 'string') {
try {
nodeId_ = ids.parseNodeId(nodeId);
} catch (e) {
if (e instanceof validationErrors.ErrorParse) {
throw new errors.ErrorPolykeyClientNodeIdInvalid(
'Encoded node ID must be a multibase base32hex encoded public-key',
{
cause: e,
data: { nodeId },
},
);
}
throw e;
}
} else {
nodeId_ = nodeId;
}
const optionsDefaulted = utils.mergeObjects(options, {
nodePath: config.defaultsUser.nodePath,
connectTimeoutTime: config.defaultsSystem.clientConnectTimeoutTime,
Expand All @@ -138,111 +119,214 @@ class PolykeyClient {
logger: logger.getChild(Session.name),
fresh,
});
const webSocketClient = await WebSocketClient.createWebSocketClient(
const pkClient = new this({
nodePath: optionsDefaulted.nodePath,
session,
fs,
logger,
});
await pkClient.start(
{
nodeId,
host,
port,
config: {
verifyPeer: true,
verifyCallback: async (certs) => {
await clientUtils.verifyServerCertificateChain([nodeId_], certs);
},
options: {
keepAliveTimeoutTime: optionsDefaulted.keepAliveTimeoutTime,
keepAliveIntervalTime: optionsDefaulted.keepAliveIntervalTime,
rpcCallTimeoutTime: optionsDefaulted.rpcCallTimeoutTime,
rpcParserBufferSize: optionsDefaulted.rpcParserBufferSize,
},
logger: logger.getChild(WebSocketClient.name),
fresh,
},
ctx,
);
const rpcClient = new RPCClient({
manifest: clientClientManifest,
streamFactory: () => webSocketClient.connection.newStream(),
middlewareFactory: rpcMiddleware.defaultClientMiddlewareWrapper(
clientMiddleware.middlewareClient(session),
optionsDefaulted.rpcParserBufferSize,
),
toError: networkUtils.toError,
streamKeepAliveTimeoutTime: optionsDefaulted.rpcCallTimeoutTime,
logger: logger.getChild(RPCClient.name),
});
const pkClient = new this({
nodePath: optionsDefaulted.nodePath,
webSocketClient,
rpcClient,
session,
fs,
logger,
});
await pkClient.start();
logger.info(`Created ${this.name}`);
return pkClient;
}

public readonly nodePath: string;
public readonly session: Session;
public readonly webSocketClient: WebSocketClient;
public readonly rpcClient: RPCClient<typeof clientClientManifest>;

protected fs: FileSystem;
protected logger: Logger;
protected _nodeId: NodeId;
protected _webSocketClient: WebSocketClient;
protected _rpcClient: RPCClient<typeof clientClientManifest>;

constructor({
nodePath,
webSocketClient,
rpcClient,
session,
fs,
logger,
}: {
nodePath: string;
webSocketClient: WebSocketClient;
rpcClient: RPCClient<typeof clientClientManifest>;
session: Session;
fs: FileSystem;
logger: Logger;
}) {
this.logger = logger;
this.nodePath = nodePath;
this.webSocketClient = webSocketClient;
this.rpcClient = rpcClient;
this.session = session;
this.fs = fs;
}

@ready(new errors.ErrorPolykeyClientNotRunning())
public get nodeId() {
return this._nodeId;
}

@ready(new errors.ErrorPolykeyClientNotRunning())
public get webSocketClient() {
return this._webSocketClient;
}

@ready(new errors.ErrorPolykeyClientNotRunning())
public get rpcClient() {
return this._rpcClient;
}

@ready(new errors.ErrorPolykeyClientNotRunning())
public get host() {
return this.webSocketClient.connection.remoteHost;
return this._webSocketClient.connection.remoteHost;
}

@ready(new errors.ErrorPolykeyClientNotRunning())
public get port() {
return this.webSocketClient.connection.remotePort;
return this._webSocketClient.connection.remotePort;
}

@ready(new errors.ErrorPolykeyClientNotRunning())
public get localHost() {
return this.webSocketClient.connection.localHost;
return this._webSocketClient.connection.localHost;
}

@ready(new errors.ErrorPolykeyClientNotRunning())
public get localPort() {
return this.webSocketClient.connection.localPort;
return this._webSocketClient.connection.localPort;
}

public async start(): Promise<void> {
public start(
opts: {
nodeId: string | NodeId;
host: string;
port: number;
options?: DeepPartial<{
keepAliveTimeoutTime: number;
keepAliveIntervalTime: number;
rpcCallTimeoutTime: number;
rpcParserBufferSize: number;
}>;
fresh?: boolean;
},
ctx?: Partial<ContextTimedInput>,
): PromiseCancellable<void>;
@timedCancellable(
true,
config.defaultsSystem.clientConnectTimeoutTime,
errors.ErrorPolykeyClientCreateTimeout,
)
public async start(
{
nodeId,
host,
port,
options = {},
fresh = false,
}: {
nodeId: string | NodeId;
host: string;
port: number;
options?: DeepPartial<{
keepAliveTimeoutTime: number;
keepAliveIntervalTime: number;
rpcCallTimeoutTime: number;
rpcParserBufferSize: number;
}>;
fresh?: boolean;
},
@context ctx: ContextTimed,
): Promise<void> {
this.logger.info(`Starting ${this.constructor.name}`);
const optionsDefaulted = utils.mergeObjects(options, {
keepAliveTimeoutTime: config.defaultsSystem.clientKeepAliveTimeoutTime,
keepAliveIntervalTime: config.defaultsSystem.clientKeepAliveIntervalTime,
rpcCallTimeoutTime: config.defaultsSystem.rpcCallTimeoutTime,
rpcParserBufferSize: config.defaultsSystem.rpcParserBufferSize,
});
let nodeId_: NodeId;
if (typeof nodeId === 'string') {
try {
nodeId_ = ids.parseNodeId(nodeId);
} catch (e) {
if (e instanceof validationErrors.ErrorParse) {
throw new errors.ErrorPolykeyClientNodeIdInvalid(
'Encoded node ID must be a multibase base32hex encoded public-key',
{
cause: e,
data: { nodeId },
},
);
}
throw e;
}
} else {
nodeId_ = nodeId;
}
await this.session.start({ fresh });
const webSocketClient = await WebSocketClient.createWebSocketClient(
{
host,
port,
config: {
verifyPeer: true,
verifyCallback: async (certs) => {
await clientUtils.verifyServerCertificateChain([nodeId_], certs);
},
keepAliveTimeoutTime: optionsDefaulted.keepAliveTimeoutTime,
keepAliveIntervalTime: optionsDefaulted.keepAliveIntervalTime,
},
logger: this.logger.getChild(WebSocketClient.name),
},
ctx,
);
const rpcClient = new RPCClient({
manifest: clientClientManifest,
streamFactory: () => webSocketClient.connection.newStream(),
middlewareFactory: rpcMiddleware.defaultClientMiddlewareWrapper(
clientMiddleware.middlewareClient(this.session),
optionsDefaulted.rpcParserBufferSize,
),
toError: networkUtils.toError,
streamKeepAliveTimeoutTime: optionsDefaulted.rpcCallTimeoutTime,
logger: this.logger.getChild(RPCClient.name),
});
this._nodeId = nodeId_;
this._webSocketClient = webSocketClient;
this._rpcClient = rpcClient;
this.logger.info(`Started ${this.constructor.name}`);
}

public async stop() {
/**
* Stops Polykey Client
*
* Stopping can force destruction of the websocket client.
* Prefer not forcing for a graceful stop.
* Stopping the session does not destroy the session state.
*/
public async stop({ force = false }: { force?: boolean } = {}) {
this.logger.info(`Stopping ${this.constructor.name}`);
await this._webSocketClient.destroy({ force });
await this.session.stop();
this.logger.info(`Stopped ${this.constructor.name}`);
}

public async destroy({ force = false }: { force?: boolean }) {
/**
* Destroys Polykey Client
*
* This will destroy the session state.
*/
public async destroy() {
this.logger.info(`Destroying ${this.constructor.name}`);
await this.webSocketClient.destroy({ force });
await this.session.destroy();
this.logger.info(`Destroyed ${this.constructor.name}`);
}
Expand Down
30 changes: 29 additions & 1 deletion tests/PolykeyClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ describe(PolykeyClient.name, () => {
await session.writeToken('abc' as SessionToken);
await pkClient.stop();
expect(await session.readToken()).toBeDefined();
await pkClient.destroy({ force: true });
await pkClient.destroy();
expect(await session.readToken()).toBeUndefined();
});
test('connect to agent client service', async () => {
Expand All @@ -155,6 +155,34 @@ describe(PolykeyClient.name, () => {
expect(remoteCertPem).toEqual(agentCertPem);
await pkClient.stop();
});
test('reconnect to agent client service', async () => {
const pkClient = await PolykeyClient.createPolykeyClient({
nodeId: pkAgent.keyRing.getNodeId(),
port: pkAgent.clientServicePort,
host: pkAgent.clientServiceHost,
options: {
nodePath: nodePath,
},
fs,
logger: logger.getChild(PolykeyClient.name),
fresh: true,
});
await pkClient.stop();
await pkClient.start({
nodeId: pkAgent.keyRing.getNodeId(),
port: pkAgent.clientServicePort,
host: pkAgent.clientServiceHost,
});
expect(pkClient.host).toBe(pkAgent.clientServiceHost);
expect(pkClient.port).toBe(pkAgent.clientServicePort);
const connectionMeta = pkClient.webSocketClient.connection.meta();
expect(connectionMeta.remoteCertsChain).toHaveLength(1);
const remoteCert = connectionMeta.remoteCertsChain[0];
const remoteCertPem = webSocketUtils.derToPEM(remoteCert);
const agentCertPem = await pkAgent.certManager.getCurrentCertPEM();
expect(remoteCertPem).toEqual(agentCertPem);
await pkClient.stop();
});
test('authenticated RPC request to agent client service', async () => {
const pkClient = await PolykeyClient.createPolykeyClient({
nodeId: pkAgent.keyRing.getNodeId(),
Expand Down