From 54a74b779f0673fff8d7c66646952c9eab21b1ee Mon Sep 17 00:00:00 2001 From: Aryan Jassal Date: Mon, 10 Feb 2025 14:32:26 +1100 Subject: [PATCH 1/2] feat: added proper cancellation to pending authentication --- src/discovery/Discovery.ts | 127 +++++++++++++++++++++---- src/nodes/NodeConnectionManager.ts | 75 +++++++++++++-- src/nodes/NodeGraph.ts | 59 +++++++++--- src/nodes/NodeManager.ts | 148 ++++++++++++++++++----------- src/tasks/TaskManager.ts | 26 ++++- src/vaults/VaultManager.ts | 2 +- 6 files changed, 339 insertions(+), 98 deletions(-) diff --git a/src/discovery/Discovery.ts b/src/discovery/Discovery.ts index 7d32add22..a9aa2c59c 100644 --- a/src/discovery/Discovery.ts +++ b/src/discovery/Discovery.ts @@ -153,15 +153,17 @@ class Discovery { parent: GestaltIdEncoded | null, ) => { try { + this.logger.error('out here 1') await this.processVertex( vertex, lastProcessedCutoffTime ?? undefined, ctx, ); + this.logger.error('out here 2') this.dispatchEvent( new discoveryEvents.EventDiscoveryVertexProcessed({ detail: { - vertex, + vertex: vertex, parent: parent ?? undefined, }, }), @@ -169,18 +171,23 @@ class Discovery { } catch (e) { // We need to reschedule if the task was cancelled due to discovery domain stopping if (e === discoveryStoppingTaskReason) { + this.logger.error('out here 3') // We need to recreate the task for the vertex const vertexId = gestaltsUtils.decodeGestaltId(vertex); if (vertexId == null) { never(`failed to decode vertex GestaltId "${vertex}"`); } + this.logger.error('out here 4') await this.scheduleDiscoveryForVertex( vertexId, undefined, undefined, gestaltsUtils.decodeGestaltId(parent ?? undefined), true, + undefined, + ctx, ); + this.logger.error('out here 5') return; } // Aborting a duplicate task is not an error @@ -190,7 +197,7 @@ class Discovery { this.dispatchEvent( new discoveryEvents.EventDiscoveryVertexFailed({ detail: { - vertex, + vertex: vertex, parent: parent ?? undefined, message: e.message, code: e.code, @@ -206,9 +213,11 @@ class Discovery { /** * This handler is run periodically to check if nodes are ready to be rediscovered */ - protected checkRediscoveryHandler: TaskHandler = async () => { + protected checkRediscoveryHandler: TaskHandler = async (ctx: ContextTimed) => { await this.checkRediscovery( Date.now() - this.rediscoverVertexThresholdTime, + undefined, + ctx, ); await this.taskManager.scheduleTask({ handlerId: this.checkRediscoveryHandlerId, @@ -328,15 +337,26 @@ class Discovery { /** * Queues a node for discovery. Internally calls `pushKeyToDiscoveryQueue`. */ - @ready(new discoveryErrors.ErrorDiscoveryNotRunning()) public async queueDiscoveryByNode( nodeId: NodeId, lastProcessedCutoffTime?: number, + ctx?: Partial, + ): Promise; + @ready(new discoveryErrors.ErrorDiscoveryNotRunning()) + @timedCancellable(true) + public async queueDiscoveryByNode( + nodeId: NodeId, + lastProcessedCutoffTime: number | undefined, + @context ctx: ContextTimed, ): Promise { await this.scheduleDiscoveryForVertex( ['node', nodeId], undefined, lastProcessedCutoffTime, + undefined, + undefined, + undefined, + ctx, ); } @@ -344,16 +364,28 @@ class Discovery { * Queues an identity for discovery. Internally calls * `pushKeyToDiscoveryQueue`. */ - @ready(new discoveryErrors.ErrorDiscoveryNotRunning()) public async queueDiscoveryByIdentity( providerId: ProviderId, identityId: IdentityId, lastProcessedCutoffTime?: number, + ctx?: Partial + ): Promise; + @ready(new discoveryErrors.ErrorDiscoveryNotRunning()) + @timedCancellable(true) + public async queueDiscoveryByIdentity( + providerId: ProviderId, + identityId: IdentityId, + lastProcessedCutoffTime: number | undefined, + @context ctx: ContextTimed, ): Promise { await this.scheduleDiscoveryForVertex( ['identity', [providerId, identityId]], undefined, lastProcessedCutoffTime, + undefined, + undefined, + undefined, + ctx, ); } @@ -407,9 +439,17 @@ class Discovery { const [type, id] = vertexId; switch (type) { case 'node': - return await this.processNode(id, ctx, lastProcessedCutoffTime); + this.logger.error('processnode before') + // return await this.processNode(id, lastProcessedCutoffTime, ctx); + const val1 = await this.processNode(id, lastProcessedCutoffTime, ctx); + this.logger.error('processnode after') + return val1 case 'identity': - return await this.processIdentity(id, ctx, lastProcessedCutoffTime); + this.logger.error('processidentity before') + // return await this.processIdentity(id, lastProcessedCutoffTime, ctx); + const val2 = await this.processIdentity(id, lastProcessedCutoffTime, ctx); + this.logger.error('processidentity after') + return val2 default: never(`type must be either "node" or "identity" got "${type}"`); } @@ -417,8 +457,8 @@ class Discovery { protected async processNode( nodeId: NodeId, + lastProcessedCutoffTime: number | undefined, ctx: ContextTimed, - lastProcessedCutoffTime?: number, ) { // If the vertex we've found is our own node, we simply get our own chain const processedTime = Date.now(); @@ -426,26 +466,34 @@ class Discovery { if (nodeId.equals(this.keyRing.getNodeId())) { // Skip our own nodeId, we actively add this information when it changes, // so there is no need to scan it. + this.logger.error('before processed tiem') await this.gestaltGraph.setVertexProcessedTime( gestaltNodeId, processedTime, ); + this.logger.error('after processed tiem') return; } + this.logger.error('before claim') const newestClaimId = await this.gestaltGraph.getClaimIdNewest(nodeId); + this.logger.error('after claim') // The sigChain data of the vertex (containing all cryptolinks) let vertexChainData: Record = {}; try { + this.logger.error('before chain') vertexChainData = await this.nodeManager.requestChainData( nodeId, newestClaimId, ctx, ); + this.logger.error('after chain') } catch (e) { + this.logger.error('before chain time error') await this.gestaltGraph.setVertexProcessedTime( gestaltNodeId, processedTime, ); + this.logger.error('after chain time error') // Not strictly an error in this case, we can fail to connect this.logger.info( `Failed to discover ${nodesUtils.encodeNodeId( @@ -456,22 +504,27 @@ class Discovery { } // Iterate over each of the claims in the chain (already verified). for (const signedClaim of Object.values(vertexChainData)) { - if (ctx.signal.aborted) throw ctx.signal.reason; + ctx.signal.throwIfAborted(); switch (signedClaim.payload.typ) { case 'ClaimLinkNode': + this.logger.error('claimlinknode before') await this.processClaimLinkNode( signedClaim as SignedClaim, nodeId, lastProcessedCutoffTime, + ctx, ); + this.logger.error('claimlinknode after') break; case 'ClaimLinkIdentity': + this.logger.error('claimlinkidentity before') await this.processClaimLinkIdentity( signedClaim as SignedClaim, nodeId, - ctx, lastProcessedCutoffTime, + ctx, ); + this.logger.error('claimlinkidentity after') break; default: never( @@ -479,20 +532,24 @@ class Discovery { ); } } + this.logger.error('setvertex time before') await this.gestaltGraph.setVertexProcessedTime( gestaltNodeId, processedTime, ); + this.logger.error('setvertex time after') } protected async processClaimLinkNode( signedClaim: SignedClaim, nodeId: NodeId, lastProcessedCutoffTime = Date.now() - this.rediscoverSkipTime, + ctx: ContextTimed, ): Promise { // Get the chain data of the linked node // Could be node1 or node2 in the claim so get the one that's // not equal to nodeId from above + this.logger.error('processClaimLinkNode 1') const node1Id = nodesUtils.decodeNodeId(signedClaim.payload.iss); if (node1Id == null) { never(`failed to decode issuer NodeId "${signedClaim.payload.iss}"`); @@ -501,6 +558,7 @@ class Discovery { if (node2Id == null) { never(`failed to decode subject NodeId "${signedClaim.payload.sub}"`); } + this.logger.error('processClaimLinkNode 2') // Verify the claim const node1PublicKey = keysUtils.publicKeyFromNodeId(node1Id); const node2PublicKey = keysUtils.publicKeyFromNodeId(node2Id); @@ -514,10 +572,12 @@ class Discovery { ); return; } + this.logger.error('processClaimLinkNode 3') const linkedNodeId = node1Id.equals(nodeId) ? node2Id : node1Id; const linkedVertexNodeInfo: GestaltNodeInfo = { nodeId: linkedNodeId, }; + this.logger.error('processClaimLinkNode 4') await this.gestaltGraph.linkNodeAndNode( { nodeId, @@ -528,6 +588,7 @@ class Discovery { meta: {}, }, ); + this.logger.error('processClaimLinkNode 5') const claimId = decodeClaimId(signedClaim.payload.jti); if (claimId == null) { never(`failed to decode claimId "${signedClaim.payload.jti}"`); @@ -541,11 +602,15 @@ class Discovery { lastProcessedCutoffTime, )) ) { + this.logger.error('processClaimLinkNode 7') await this.scheduleDiscoveryForVertex( linkedGestaltId, undefined, lastProcessedCutoffTime, ['node', nodeId], + undefined, + undefined, + ctx, ); } } @@ -553,8 +618,8 @@ class Discovery { protected async processClaimLinkIdentity( signedClaim: SignedClaim, nodeId: NodeId, - ctx: ContextTimed, lastProcessedCutoffTime = Date.now() - this.rediscoverSkipTime, + ctx: ContextTimed, ): Promise { // Checking the claim is valid const publicKey = keysUtils.publicKeyFromNodeId(nodeId); @@ -649,14 +714,17 @@ class Discovery { undefined, lastProcessedCutoffTime, ['node', nodeId], + undefined, + undefined, + ctx, ); } } protected async processIdentity( id: ProviderIdentityId, - ctx: ContextTimed, lastProcessedCutoffTime = Date.now() - this.rediscoverSkipTime, + ctx: ContextTimed, ) { // If the next vertex is an identity, perform a social discovery // Firstly get the identity info of this identity @@ -725,6 +793,9 @@ class Discovery { undefined, lastProcessedCutoffTime, ['identity', providerIdentityId], + undefined, + undefined, + ctx, ); } } @@ -784,11 +855,12 @@ class Discovery { */ protected async scheduleDiscoveryForVertex( vertex: GestaltId, - delay?: number, - lastProcessedCutoffTime?: number, - parent?: GestaltId, + delay: number | undefined, + lastProcessedCutoffTime: number | undefined, + parent: GestaltId | undefined, ignoreActive: boolean = false, - tran?: DBTransaction, + tran: DBTransaction | undefined, + ctx: ContextTimed, ) { if (tran == null) { return this.db.withTransactionF((tran) => @@ -799,6 +871,7 @@ class Discovery { parent, ignoreActive, tran, + ctx, ), ); } @@ -811,6 +884,7 @@ class Discovery { gestaltIdEncoded, ].join(''), ); + this.logger.error('here1') // Check if task exists let taskExisting: Task | null = null; for await (const task of this.taskManager.getTasks( @@ -818,13 +892,17 @@ class Discovery { true, [this.constructor.name, this.discoverVertexHandlerId, gestaltIdEncoded], tran, + ctx, )) { + ctx.signal.throwIfAborted(); + this.logger.error('here2') // Ignore active tasks if (ignoreActive && task.status === 'active') continue; if (taskExisting == null) { taskExisting = task; continue; } + this.logger.error('here3') // Any extra tasks should be cancelled, this shouldn't normally happen task.cancel(abortSingletonTaskReason); this.dispatchEvent( @@ -836,11 +914,13 @@ class Discovery { }), ); } + this.logger.error('here4') // Only create if it doesn't exist if (taskExisting != null) return; this.logger.info( `Scheduling new discovery for vertex with gestaltId ${gestaltIdEncoded}`, ); + this.logger.error('here5') await this.taskManager.scheduleTask( { handlerId: this.discoverVertexHandlerId, @@ -852,10 +932,11 @@ class Discovery { ], lazy: true, deadline: this.discoverVertexTimeoutTime, - delay, + delay: delay, }, tran, ); + this.logger.error('here6') this.dispatchEvent( new discoveryEvents.EventDiscoveryVertexQueued({ detail: { @@ -1006,10 +1087,12 @@ class Discovery { } // Refresh timer in preparation for request ctx.timer.refresh(); + this.logger.error('verifyIdentityClaim before getClaim') const identitySignedClaim = await provider.getClaim( authIdentityId, claimId, ); + this.logger.error('verifyIdentityClaim after getClaim') if (identitySignedClaim == null) { continue; } @@ -1034,10 +1117,16 @@ class Discovery { public async checkRediscovery( lastProcessedCutoffTime: number, tran?: DBTransaction, + ctx?: Partial, + ): Promise; + public async checkRediscovery( + lastProcessedCutoffTime: number, + tran: DBTransaction | undefined, + ctx: ContextTimed, ): Promise { if (tran == null) { return this.db.withTransactionF((tran) => - this.checkRediscovery(lastProcessedCutoffTime, tran), + this.checkRediscovery(lastProcessedCutoffTime, tran, ctx), ); } @@ -1055,6 +1144,7 @@ class Discovery { }, tran, )) { + ctx.signal.throwIfAborted(); gestaltIds.push([ gestaltsUtils.encodeGestaltId(gestaltId), lastProcessedTime, @@ -1091,6 +1181,7 @@ class Discovery { [this.constructor.name, this.discoverVertexHandlerId, gestaltIdEncoded], tran, )) { + ctx.signal.throwIfAborted(); if (taskExisting == null) { taskExisting = task; continue; diff --git a/src/nodes/NodeConnectionManager.ts b/src/nodes/NodeConnectionManager.ts index 27bd2e6f6..cb9c043bc 100644 --- a/src/nodes/NodeConnectionManager.ts +++ b/src/nodes/NodeConnectionManager.ts @@ -43,7 +43,11 @@ import { status, } from '@matrixai/async-init/dist/StartStop'; import { AbstractEvent, EventAll } from '@matrixai/events'; -import { context, timedCancellable } from '@matrixai/contexts/dist/decorators'; +import { + context, + timed, + timedCancellable +} from "@matrixai/contexts/dist/decorators"; import { Semaphore } from '@matrixai/async-locks'; import { PromiseCancellable } from '@matrixai/async-cancellable'; import NodeConnection from './NodeConnection'; @@ -768,13 +772,15 @@ class NodeConnectionManager { * itself is such that we can pass targetNodeId as a parameter (as opposed to * an acquire function with no parameters). * @param targetNodeId Id of target node to communicate with + * @param ctx * @returns ResourceAcquire Resource API for use in with contexts */ public acquireConnection( targetNodeId: NodeId, + ctx: ContextTimed, ): ResourceAcquire { return async () => { - await this.isAuthenticatedP(targetNodeId); + await this.isAuthenticatedP(targetNodeId, ctx); return await this.acquireConnectionInternal(targetNodeId)(); }; } @@ -785,14 +791,22 @@ class NodeConnectionManager { * doesn't exist. * for use with normal arrow function * @param targetNodeId Id of target node to communicate with + * @param ctx * @param f Function to handle communication */ public async withConnF( targetNodeId: NodeId, + ctx: Partial | undefined, + f: (conn: NodeConnection) => Promise, + ): Promise; + @timedCancellable(true) + public async withConnF( + targetNodeId: NodeId, + @context ctx: ContextTimed, f: (conn: NodeConnection) => Promise, ): Promise { return await withF( - [this.acquireConnection(targetNodeId)], + [this.acquireConnection(targetNodeId, ctx)], async ([conn]) => { return await f(conn); }, @@ -805,14 +819,22 @@ class NodeConnectionManager { * doesn't exist. * for use with a generator function * @param targetNodeId Id of target node to communicate with + * @param ctx * @param g Generator function to handle communication */ + public withConnG( + targetNodeId: NodeId, + ctx: Partial | undefined, + g: (conn: NodeConnection) => AsyncGenerator, + ): AsyncGenerator; @ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning()) + @timed() public async *withConnG( targetNodeId: NodeId, + @context ctx: ContextTimed, g: (conn: NodeConnection) => AsyncGenerator, ): AsyncGenerator { - const acquire = this.acquireConnection(targetNodeId); + const acquire = this.acquireConnection(targetNodeId, ctx); const [release, conn] = await acquire(); let caughtError: Error | undefined; try { @@ -975,6 +997,7 @@ class NodeConnectionManager { } const { host, port } = await this.withConnF( nodeIdSignaller, + ctx, async (conn) => { const client = conn.getClient(); const nodeIdSource = this.keyRing.getNodeId(); @@ -1440,8 +1463,23 @@ class NodeConnectionManager { * @param targetNodeId - NodeId of the node that needs to initiate hole punching. * @param address - Address the target needs to punch to. * @param requestSignature - `base64url` encoded signature + * @param ctx */ + public async handleNodesConnectionSignalInitial( + sourceNodeId: NodeId, + targetNodeId: NodeId, + address: { + host: Host; + port: Port; + }, + requestSignature: string, + ctx?: Partial, + ): Promise<{ + host: Host; + port: Port; + }>; @ready(new nodesErrors.ErrorNodeManagerNotRunning()) + @timedCancellable(true) public async handleNodesConnectionSignalInitial( sourceNodeId: NodeId, targetNodeId: NodeId, @@ -1450,6 +1488,7 @@ class NodeConnectionManager { port: Port; }, requestSignature: string, + @context ctx: ContextTimed, ): Promise<{ host: Host; port: Port; @@ -1479,12 +1518,12 @@ class NodeConnectionManager { this.keyRing.keyPair, data, ); - const connectionSignalP = this.withConnF(targetNodeId, async (conn) => { + const connectionSignalP = this.withConnF(targetNodeId, ctx, async (conn) => { const client = conn.getClient(); await client.methods.nodesConnectionSignalFinal({ sourceNodeIdEncoded: nodesUtils.encodeNodeId(sourceNodeId), targetNodeIdEncoded: nodesUtils.encodeNodeId(targetNodeId), - address, + address: address, requestSignature: requestSignature, relaySignature: relaySignature.toString('base64url'), }); @@ -1745,19 +1784,39 @@ class NodeConnectionManager { * Returns a promise that resolves once the connection has authenticated, * otherwise it rejects with the authentication failure * @param nodeId + * @param ctx */ - public async isAuthenticatedP(nodeId: NodeId): Promise { + public async isAuthenticatedP( + nodeId: NodeId, + ctx?: Partial, + ): Promise; + @timedCancellable(true) + public async isAuthenticatedP( + nodeId: NodeId, + @context ctx: ContextTimed, + ): Promise { const targetNodeIdString = nodeId.toString() as NodeIdString; const connectionsEntry = this.connections.get(targetNodeIdString); if (connectionsEntry == null) { throw new nodesErrors.ErrorNodeConnectionManagerConnectionNotFound(); } + const { p: abortP, rejectP: triggerAbort } = utils.promise(); + const abortHandler = () => { + triggerAbort(ctx.signal.reason); + }; + if (ctx.signal.aborted) { + triggerAbort(ctx.signal.reason); + } else { + ctx.signal.addEventListener('abort', abortHandler, { once: true }); + } try { - return await connectionsEntry.authenticatedP; + return await Promise.race([connectionsEntry.authenticatedP, abortP]); } catch (e) { // Capture the stacktrace here since knowing where we're waiting for authentication is more useful Error.captureStackTrace(e); throw e; + } finally { + ctx.signal.removeEventListener('abort', abortHandler); } } diff --git a/src/nodes/NodeGraph.ts b/src/nodes/NodeGraph.ts index 61a9b4f86..588b29d97 100644 --- a/src/nodes/NodeGraph.ts +++ b/src/nodes/NodeGraph.ts @@ -1,5 +1,5 @@ import type { DB, DBTransaction, LevelPath } from '@matrixai/db'; -import type { ContextTimed } from '@matrixai/contexts'; +import type { ContextTimed, ContextTimedInput } from '@matrixai/contexts'; import type { NodeId, NodeAddress, @@ -23,10 +23,12 @@ import * as nodesErrors from './errors'; import * as nodesEvents from './events'; import * as utils from '../utils'; import config from '../config'; +import { timedCancellable } from '@matrixai/contexts/dist/decorators'; +import { context } from "@matrixai/contexts/dist/decorators"; /** * NodeGraph is an implementation of Kademlia for maintaining peer to peer - * information about Polkey nodes. + * information about Polykey nodes. * * It is a database of fixed-size buckets, where each bucket * contains NodeId -> NodeData. The bucket index is a prefix key. @@ -241,14 +243,21 @@ class NodeGraph { /** * Get a single `NodeContact` */ - @ready(new nodesErrors.ErrorNodeGraphNotRunning()) public async getNodeContact( nodeId: NodeId, tran?: DBTransaction, + ctx?: Partial, + ): Promise; + @ready(new nodesErrors.ErrorNodeGraphNotRunning()) + @timedCancellable(true) + public async getNodeContact( + nodeId: NodeId, + tran: DBTransaction | undefined, + @context ctx: ContextTimed ): Promise { if (tran == null) { - return this.db.withTransactionF((tran) => - this.getNodeContact(nodeId, tran), + return await this.db.withTransactionF(async (tran) => + await this.getNodeContact(nodeId, tran, ctx), ); } const [bucketIndex] = this.bucketIndex(nodeId); @@ -266,6 +275,7 @@ class NodeGraph { valueAsBuffer: false, }, )) { + ctx.signal.throwIfAborted(); const nodeContactAddress = keyPath[0].toString(); contact[nodeContactAddress] = nodeContactAddressData; } @@ -615,18 +625,29 @@ class NodeGraph { * @param limit Limit the number of nodes returned, note that `-1` means * no limit, but `Infinity` means `0`. * @param tran + * @param ctx */ + public async getBucket( + bucketIndex: NodeBucketIndex, + sort?: 'nodeId' | 'distance' | 'connected', + order?: 'asc' | 'desc', + limit?: number, + tran?: DBTransaction, + ctx?: Partial, + ): Promise; + @timedCancellable(true) @ready(new nodesErrors.ErrorNodeGraphNotRunning()) public async getBucket( bucketIndex: NodeBucketIndex, sort: 'nodeId' | 'distance' | 'connected' = 'nodeId', order: 'asc' | 'desc' = 'asc', - limit?: number, - tran?: DBTransaction, + limit: number | undefined, + tran: DBTransaction | undefined, + @context ctx: ContextTimed ): Promise { if (tran == null) { - return this.db.withTransactionF((tran) => - this.getBucket(bucketIndex, sort, order, limit, tran), + return await this.db.withTransactionF(async (tran) => + await this.getBucket(bucketIndex, sort, order, limit, tran, ctx), ); } if (bucketIndex < 0 || bucketIndex >= this.nodeIdBits) { @@ -647,6 +668,7 @@ class NodeGraph { pathAdjust: [''], }, )) { + ctx.signal.throwIfAborted(); bucket.push(result); } if (sort === 'distance') { @@ -660,6 +682,7 @@ class NodeGraph { limit, }, )) { + ctx.signal.throwIfAborted(); const nodeId = IdInternal.fromBuffer(nodeIdBuffer); const nodeContact = await this.getNodeContact( IdInternal.fromBuffer(nodeIdBuffer), @@ -883,15 +906,23 @@ class NodeGraph { * @returns The `NodeBucket` which could have less than `limit` nodes if the * node graph has less than the requested limit. */ + public async getClosestNodes( + nodeId: NodeId, + limit?: number, + tran?: DBTransaction, + ctx?: Partial, + ): Promise; + @timedCancellable(true) @ready(new nodesErrors.ErrorNodeGraphNotRunning()) public async getClosestNodes( nodeId: NodeId, limit: number = this.nodeBucketLimit, - tran?: DBTransaction, + tran: DBTransaction | undefined, + @context ctx: ContextTimed, ): Promise { if (tran == null) { - return this.db.withTransactionF((tran) => - this.getClosestNodes(nodeId, limit, tran), + return await this.db.withTransactionF(async (tran) => + await this.getClosestNodes(nodeId, limit, tran), ); } // Buckets map to the target node in the following way; @@ -915,6 +946,7 @@ class NodeGraph { undefined, undefined, tran, + ctx, ); // We need to iterate over the key stream // When streaming we want all nodes in the starting bucket @@ -937,6 +969,7 @@ class NodeGraph { limit: remainingLimit, }, )) { + ctx.signal.throwIfAborted(); nodes.push(nodeEntry); } } @@ -953,6 +986,7 @@ class NodeGraph { limit: remainingLimit, }, )) { + ctx.signal.throwIfAborted(); nodes.push(nodeEntry); } } @@ -969,6 +1003,7 @@ class NodeGraph { undefined, undefined, tran, + ctx, ); // Pop off elements of the same bucket to avoid duplicates let element = nodes.pop(); diff --git a/src/nodes/NodeManager.ts b/src/nodes/NodeManager.ts index 363ad2597..628dbcb07 100644 --- a/src/nodes/NodeManager.ts +++ b/src/nodes/NodeManager.ts @@ -44,7 +44,11 @@ import Logger from '@matrixai/logger'; import { ready, StartStop } from '@matrixai/async-init/dist/StartStop'; import { Lock, LockBox, Semaphore } from '@matrixai/async-locks'; import { IdInternal } from '@matrixai/id'; -import { context, timedCancellable } from '@matrixai/contexts/dist/decorators'; +import { + context, + timed, + timedCancellable, +} from '@matrixai/contexts/dist/decorators'; import * as nodesUtils from './utils'; import * as nodesEvents from './events'; import * as nodesErrors from './errors'; @@ -195,7 +199,7 @@ class NodeManager { } if (connectionCount > 0) { this.logger.debug('triggering bucket refresh for bucket 255'); - await this.updateRefreshBucketDelay(255, 0); + await this.updateRefreshBucketDelay(255, 0, undefined, undefined, ctx); } try { this.logger.debug( @@ -285,6 +289,7 @@ class NodeManager { // Getting the closest node from the `NodeGraph` let bucketIndex: number | undefined; for await (const bucket of this.nodeGraph.getBuckets('distance', 'asc')) { + if (ctx.signal.aborted) return; bucketIndex = bucket[0]; } // If no buckets then end here @@ -471,7 +476,7 @@ class NodeManager { */ public acquireConnection( nodeId: NodeId, - ctx?: Partial, + ctx: ContextTimed, ): ResourceAcquire { if (this.keyRing.getNodeId().equals(nodeId)) { throw new nodesErrors.ErrorNodeManagerNodeIdOwn(); @@ -483,18 +488,16 @@ class NodeManager { // Checking if connection already exists if (!this.nodeConnectionManager.hasConnection(nodeId)) { // Establish the connection - const result = await this.findNode( - { - nodeId: nodeId, - }, - ctx, - ); + const result = await this.findNode({ nodeId: nodeId }, ctx); if (result == null) { throw new nodesErrors.ErrorNodeManagerConnectionFailed(); } } // Initiate authentication and await - return await this.nodeConnectionManager.acquireConnection(nodeId)(); + return await this.nodeConnectionManager.acquireConnection( + nodeId, + ctx, + )(); }, ); }; @@ -537,11 +540,17 @@ class NodeManager { * @param g Generator function to handle communication * @param ctx */ + public withConnG( + nodeId: NodeId, + ctx: Partial | undefined, + g: (conn: NodeConnection) => AsyncGenerator, + ): AsyncGenerator; @ready(new nodesErrors.ErrorNodeManagerNotRunning()) + @timed() public async *withConnG( nodeId: NodeId, + @context ctx: ContextTimed, g: (conn: NodeConnection) => AsyncGenerator, - ctx?: Partial, ): AsyncGenerator { const acquire = this.acquireConnection(nodeId, ctx); const [release, conn] = await acquire(); @@ -847,6 +856,8 @@ class NodeManager { ] of await this.nodeGraph.getClosestNodes( nodeId, this.nodeGraph.nodeBucketLimit, + undefined, + ctx, )) { nodeConnectionsQueue.queueNodeDirect(nodeIdTarget, nodeContact); } @@ -975,13 +986,17 @@ class NodeManager { if (service == null) { // Setup promises const { p: endedP, resolveP: resolveEndedP } = utils.promise(); + const { + p: serviceP, + resolveP: resolveServiceP, + rejectP: rejectServiceP, + } = utils.promise(); const abortHandler = () => { resolveEndedP(); + rejectServiceP(); }; ctx.signal.addEventListener('abort', abortHandler, { once: true }); ctx.timer.catch(() => {}).finally(() => abortHandler()); - const { p: serviceP, resolveP: resolveServiceP } = - utils.promise(); const handleEventMDNSService = (evt: mdnsEvents.EventMDNSService) => { if (evt.detail.name === encodedNodeId) { resolveServiceP(evt.detail); @@ -1108,7 +1123,7 @@ class NodeManager { nodeConnectionsQueue: NodeConnectionQueue, ctx: ContextTimed, ) { - await this.nodeConnectionManager.withConnF(nodeId, async (conn) => { + await this.nodeConnectionManager.withConnF(nodeId, ctx, async (conn) => { const nodeIdEncoded = nodesUtils.encodeNodeId(nodeIdTarget); const closestConnectionsRequestP = (async () => { const resultStream = @@ -1239,45 +1254,60 @@ class NodeManager { @context ctx: ContextTimed, ): Promise> { // Verify the node's chain with its own public key - return await this.withConnF(targetNodeId, ctx, async (connection) => { - const claims: Record = {}; - const client = connection.getClient(); - for await (const agentClaim of await client.methods.nodesClaimsGet({ - claimIdEncoded: - claimId != null - ? claimsUtils.encodeClaimId(claimId) - : ('' as ClaimIdEncoded), - })) { - if (ctx.signal.aborted) throw ctx.signal.reason; - // Need to re-construct each claim - const claimId: ClaimId = claimsUtils.decodeClaimId( - agentClaim.claimIdEncoded, - )!; - const signedClaimEncoded = agentClaim.signedTokenEncoded; - const signedClaim = claimsUtils.parseSignedClaim(signedClaimEncoded); - // Verifying the claim - const issPublicKey = keysUtils.publicKeyFromNodeId( - nodesUtils.decodeNodeId(signedClaim.payload.iss)!, - ); - const subPublicKey = - signedClaim.payload.typ === 'node' - ? keysUtils.publicKeyFromNodeId( - nodesUtils.decodeNodeId(signedClaim.payload.iss)!, - ) - : null; - const token = Token.fromSigned(signedClaim); - if (!token.verifyWithPublicKey(issPublicKey)) { - this.logger.warn('Failed to verify issuing node'); - continue; - } - if (subPublicKey != null && !token.verifyWithPublicKey(subPublicKey)) { - this.logger.warn('Failed to verify subject node'); - continue; + this.logger.error('DATA about to get connection'); + try { + return await this.withConnF(targetNodeId, ctx, async (connection) => { + const claims: Record = {}; + const client = connection.getClient(); + this.logger.error('DATA before rpc'); + for await (const agentClaim of await client.methods.nodesClaimsGet( + { + claimIdEncoded: + claimId != null + ? claimsUtils.encodeClaimId(claimId) + : ('' as ClaimIdEncoded), + }, + ctx, + )) { + this.logger.error('DATA in rpc'); + ctx.signal.throwIfAborted(); + // Need to re-construct each claim + const claimId: ClaimId = claimsUtils.decodeClaimId( + agentClaim.claimIdEncoded, + )!; + const signedClaimEncoded = agentClaim.signedTokenEncoded; + const signedClaim = claimsUtils.parseSignedClaim(signedClaimEncoded); + // Verifying the claim + const issPublicKey = keysUtils.publicKeyFromNodeId( + nodesUtils.decodeNodeId(signedClaim.payload.iss)!, + ); + const subPublicKey = + signedClaim.payload.typ === 'node' + ? keysUtils.publicKeyFromNodeId( + nodesUtils.decodeNodeId(signedClaim.payload.iss)!, + ) + : null; + const token = Token.fromSigned(signedClaim); + if (!token.verifyWithPublicKey(issPublicKey)) { + this.logger.warn('Failed to verify issuing node'); + continue; + } + if ( + subPublicKey != null && + !token.verifyWithPublicKey(subPublicKey) + ) { + this.logger.warn('Failed to verify subject node'); + continue; + } + claims[claimId] = signedClaim; } - claims[claimId] = signedClaim; - } - return claims; - }); + this.logger.error('DATA after logger'); + return claims; + }); + } catch (e) { + this.logger.error('DATA FAIL:', e); + throw e; + } } /** @@ -2112,16 +2142,25 @@ class NodeManager { await Promise.allSettled(taskPs); } + public async updateRefreshBucketDelay( + bucketIndex: number, + delay?: number, + lazy?: boolean, + tran?: DBTransaction, + ctx?: Partial, + ): Promise; @ready(new nodesErrors.ErrorNodeManagerNotRunning(), true, ['stopping']) + @timedCancellable(true) public async updateRefreshBucketDelay( bucketIndex: number, delay: number = this.refreshBucketDelayTime, lazy: boolean = true, - tran?: DBTransaction, + tran: DBTransaction | undefined, + @context ctx: ContextTimed, ): Promise { if (tran == null) { return this.db.withTransactionF((tran) => - this.updateRefreshBucketDelay(bucketIndex, delay, lazy, tran), + this.updateRefreshBucketDelay(bucketIndex, delay, lazy, tran, ctx), ); } @@ -2137,6 +2176,7 @@ class NodeManager { [this.tasksPath, this.refreshBucketHandlerId, `${bucketIndex}`], tran, )) { + ctx.signal.throwIfAborted(); if (!existingTask) { foundTask = task; // Update the first one diff --git a/src/tasks/TaskManager.ts b/src/tasks/TaskManager.ts index 208a08e31..c5d1ffcfc 100644 --- a/src/tasks/TaskManager.ts +++ b/src/tasks/TaskManager.ts @@ -1,3 +1,4 @@ +import type { ContextTimed, ContextTimedInput } from '@matrixai/contexts'; import type { DB, DBTransaction, LevelPath, KeyPath } from '@matrixai/db'; import type { ResourceRelease } from '@matrixai/resources'; import type { @@ -22,6 +23,11 @@ import { import { Lock } from '@matrixai/async-locks'; import { PromiseCancellable } from '@matrixai/async-cancellable'; import { extractTs } from '@matrixai/id/dist/IdSortable'; +import { + context, + timed, + timedCancellable +} from "@matrixai/contexts/dist/decorators"; import { Timer } from '@matrixai/timer'; import TaskEvent from './TaskEvent'; import * as tasksUtils from './utils'; @@ -382,8 +388,8 @@ class TaskManager { return { id: taskId, status: taskStatus!, - promise, - cancel, + promise: promise, + cancel: cancel, handlerId: taskData.handlerId, parameters: taskData.parameters, delay: tasksUtils.fromDelay(taskData.delay), @@ -395,16 +401,25 @@ class TaskManager { }; } + public getTasks( + order?: 'asc' | 'desc', + lazy?: boolean, + path?: TaskPath, + tran?: DBTransaction, + ctx?: Partial, + ): AsyncGenerator; @ready(new tasksErrors.ErrorTaskManagerNotRunning()) + @timed() public async *getTasks( order: 'asc' | 'desc' = 'asc', lazy: boolean = false, - path?: TaskPath, - tran?: DBTransaction, + path: TaskPath | undefined, + tran: DBTransaction | undefined, + @context ctx: ContextTimed, ): AsyncGenerator { if (tran == null) { return yield* this.db.withTransactionG((tran) => - this.getTasks(order, lazy, path, tran), + this.getTasks(order, lazy, path, tran, ctx), ); } if (path == null) { @@ -412,6 +427,7 @@ class TaskManager { [...this.tasksTaskDbPath], { values: false, reverse: order !== 'asc' }, )) { + ctx.signal.throwIfAborted(); const taskId = IdInternal.fromBuffer(taskIdBuffer as Buffer); const task = (await this.getTask(taskId, lazy, tran))!; yield task; diff --git a/src/vaults/VaultManager.ts b/src/vaults/VaultManager.ts index 62c6b2c00..6b760d3c2 100644 --- a/src/vaults/VaultManager.ts +++ b/src/vaults/VaultManager.ts @@ -978,6 +978,7 @@ class VaultManager { // Create a connection to another node return yield* this.nodeManager.withConnG( targetNodeId, + ctx, async function* (connection): AsyncGenerator<{ vaultName: VaultName; vaultIdEncoded: VaultIdEncoded; @@ -994,7 +995,6 @@ class VaultManager { }; } }, - ctx, ); } From 584af7fbe360793ce2b9a10c65746b0ee2b4de42 Mon Sep 17 00:00:00 2001 From: Aryan Jassal Date: Tue, 11 Feb 2025 13:15:43 +1100 Subject: [PATCH 2/2] chore: resolved failing NodeManager tests chore: resolved failing Discovery tests chore: resolved failing Nodes tests --- src/discovery/Discovery.ts | 107 +++--------------- src/nodes/NodeConnectionManager.ts | 51 +++++---- src/nodes/NodeGraph.ts | 21 ++-- src/nodes/NodeManager.ts | 94 +++++++-------- src/tasks/TaskManager.ts | 22 +--- src/vaults/VaultManager.ts | 2 +- tests/discovery/Discovery.test.ts | 11 +- tests/nodes/NodeConnectionManager.test.ts | 18 +++ tests/nodes/NodeManager.test.ts | 13 ++- .../nodesClosestActiveConnectionsGet.test.ts | 1 + tests/vaults/VaultInternal.test.ts | 12 +- 11 files changed, 143 insertions(+), 209 deletions(-) diff --git a/src/discovery/Discovery.ts b/src/discovery/Discovery.ts index a9aa2c59c..d5f3865bd 100644 --- a/src/discovery/Discovery.ts +++ b/src/discovery/Discovery.ts @@ -153,13 +153,11 @@ class Discovery { parent: GestaltIdEncoded | null, ) => { try { - this.logger.error('out here 1') await this.processVertex( vertex, lastProcessedCutoffTime ?? undefined, ctx, ); - this.logger.error('out here 2') this.dispatchEvent( new discoveryEvents.EventDiscoveryVertexProcessed({ detail: { @@ -171,23 +169,18 @@ class Discovery { } catch (e) { // We need to reschedule if the task was cancelled due to discovery domain stopping if (e === discoveryStoppingTaskReason) { - this.logger.error('out here 3') // We need to recreate the task for the vertex const vertexId = gestaltsUtils.decodeGestaltId(vertex); if (vertexId == null) { never(`failed to decode vertex GestaltId "${vertex}"`); } - this.logger.error('out here 4') await this.scheduleDiscoveryForVertex( vertexId, undefined, undefined, gestaltsUtils.decodeGestaltId(parent ?? undefined), true, - undefined, - ctx, ); - this.logger.error('out here 5') return; } // Aborting a duplicate task is not an error @@ -213,7 +206,9 @@ class Discovery { /** * This handler is run periodically to check if nodes are ready to be rediscovered */ - protected checkRediscoveryHandler: TaskHandler = async (ctx: ContextTimed) => { + protected checkRediscoveryHandler: TaskHandler = async ( + ctx: ContextTimed, + ) => { await this.checkRediscovery( Date.now() - this.rediscoverVertexThresholdTime, undefined, @@ -337,26 +332,15 @@ class Discovery { /** * Queues a node for discovery. Internally calls `pushKeyToDiscoveryQueue`. */ - public async queueDiscoveryByNode( - nodeId: NodeId, - lastProcessedCutoffTime?: number, - ctx?: Partial, - ): Promise; @ready(new discoveryErrors.ErrorDiscoveryNotRunning()) - @timedCancellable(true) public async queueDiscoveryByNode( nodeId: NodeId, - lastProcessedCutoffTime: number | undefined, - @context ctx: ContextTimed, + lastProcessedCutoffTime?: number, ): Promise { await this.scheduleDiscoveryForVertex( ['node', nodeId], undefined, lastProcessedCutoffTime, - undefined, - undefined, - undefined, - ctx, ); } @@ -364,28 +348,16 @@ class Discovery { * Queues an identity for discovery. Internally calls * `pushKeyToDiscoveryQueue`. */ - public async queueDiscoveryByIdentity( - providerId: ProviderId, - identityId: IdentityId, - lastProcessedCutoffTime?: number, - ctx?: Partial - ): Promise; @ready(new discoveryErrors.ErrorDiscoveryNotRunning()) - @timedCancellable(true) public async queueDiscoveryByIdentity( providerId: ProviderId, identityId: IdentityId, - lastProcessedCutoffTime: number | undefined, - @context ctx: ContextTimed, + lastProcessedCutoffTime?: number, ): Promise { await this.scheduleDiscoveryForVertex( ['identity', [providerId, identityId]], undefined, lastProcessedCutoffTime, - undefined, - undefined, - undefined, - ctx, ); } @@ -439,17 +411,9 @@ class Discovery { const [type, id] = vertexId; switch (type) { case 'node': - this.logger.error('processnode before') - // return await this.processNode(id, lastProcessedCutoffTime, ctx); - const val1 = await this.processNode(id, lastProcessedCutoffTime, ctx); - this.logger.error('processnode after') - return val1 + return await this.processNode(id, lastProcessedCutoffTime, ctx); case 'identity': - this.logger.error('processidentity before') - // return await this.processIdentity(id, lastProcessedCutoffTime, ctx); - const val2 = await this.processIdentity(id, lastProcessedCutoffTime, ctx); - this.logger.error('processidentity after') - return val2 + return await this.processIdentity(id, lastProcessedCutoffTime, ctx); default: never(`type must be either "node" or "identity" got "${type}"`); } @@ -466,34 +430,26 @@ class Discovery { if (nodeId.equals(this.keyRing.getNodeId())) { // Skip our own nodeId, we actively add this information when it changes, // so there is no need to scan it. - this.logger.error('before processed tiem') await this.gestaltGraph.setVertexProcessedTime( gestaltNodeId, processedTime, ); - this.logger.error('after processed tiem') return; } - this.logger.error('before claim') const newestClaimId = await this.gestaltGraph.getClaimIdNewest(nodeId); - this.logger.error('after claim') // The sigChain data of the vertex (containing all cryptolinks) let vertexChainData: Record = {}; try { - this.logger.error('before chain') vertexChainData = await this.nodeManager.requestChainData( nodeId, newestClaimId, ctx, ); - this.logger.error('after chain') } catch (e) { - this.logger.error('before chain time error') await this.gestaltGraph.setVertexProcessedTime( gestaltNodeId, processedTime, ); - this.logger.error('after chain time error') // Not strictly an error in this case, we can fail to connect this.logger.info( `Failed to discover ${nodesUtils.encodeNodeId( @@ -504,27 +460,21 @@ class Discovery { } // Iterate over each of the claims in the chain (already verified). for (const signedClaim of Object.values(vertexChainData)) { - ctx.signal.throwIfAborted(); switch (signedClaim.payload.typ) { case 'ClaimLinkNode': - this.logger.error('claimlinknode before') await this.processClaimLinkNode( signedClaim as SignedClaim, nodeId, lastProcessedCutoffTime, - ctx, ); - this.logger.error('claimlinknode after') break; case 'ClaimLinkIdentity': - this.logger.error('claimlinkidentity before') await this.processClaimLinkIdentity( signedClaim as SignedClaim, nodeId, lastProcessedCutoffTime, ctx, ); - this.logger.error('claimlinkidentity after') break; default: never( @@ -532,24 +482,20 @@ class Discovery { ); } } - this.logger.error('setvertex time before') await this.gestaltGraph.setVertexProcessedTime( gestaltNodeId, processedTime, ); - this.logger.error('setvertex time after') } protected async processClaimLinkNode( signedClaim: SignedClaim, nodeId: NodeId, lastProcessedCutoffTime = Date.now() - this.rediscoverSkipTime, - ctx: ContextTimed, ): Promise { // Get the chain data of the linked node // Could be node1 or node2 in the claim so get the one that's // not equal to nodeId from above - this.logger.error('processClaimLinkNode 1') const node1Id = nodesUtils.decodeNodeId(signedClaim.payload.iss); if (node1Id == null) { never(`failed to decode issuer NodeId "${signedClaim.payload.iss}"`); @@ -558,7 +504,6 @@ class Discovery { if (node2Id == null) { never(`failed to decode subject NodeId "${signedClaim.payload.sub}"`); } - this.logger.error('processClaimLinkNode 2') // Verify the claim const node1PublicKey = keysUtils.publicKeyFromNodeId(node1Id); const node2PublicKey = keysUtils.publicKeyFromNodeId(node2Id); @@ -572,12 +517,10 @@ class Discovery { ); return; } - this.logger.error('processClaimLinkNode 3') const linkedNodeId = node1Id.equals(nodeId) ? node2Id : node1Id; const linkedVertexNodeInfo: GestaltNodeInfo = { nodeId: linkedNodeId, }; - this.logger.error('processClaimLinkNode 4') await this.gestaltGraph.linkNodeAndNode( { nodeId, @@ -588,7 +531,6 @@ class Discovery { meta: {}, }, ); - this.logger.error('processClaimLinkNode 5') const claimId = decodeClaimId(signedClaim.payload.jti); if (claimId == null) { never(`failed to decode claimId "${signedClaim.payload.jti}"`); @@ -602,15 +544,11 @@ class Discovery { lastProcessedCutoffTime, )) ) { - this.logger.error('processClaimLinkNode 7') await this.scheduleDiscoveryForVertex( linkedGestaltId, undefined, lastProcessedCutoffTime, ['node', nodeId], - undefined, - undefined, - ctx, ); } } @@ -714,9 +652,6 @@ class Discovery { undefined, lastProcessedCutoffTime, ['node', nodeId], - undefined, - undefined, - ctx, ); } } @@ -793,9 +728,6 @@ class Discovery { undefined, lastProcessedCutoffTime, ['identity', providerIdentityId], - undefined, - undefined, - ctx, ); } } @@ -855,13 +787,12 @@ class Discovery { */ protected async scheduleDiscoveryForVertex( vertex: GestaltId, - delay: number | undefined, - lastProcessedCutoffTime: number | undefined, - parent: GestaltId | undefined, + delay?: number, + lastProcessedCutoffTime?: number, + parent?: GestaltId, ignoreActive: boolean = false, - tran: DBTransaction | undefined, - ctx: ContextTimed, - ) { + tran?: DBTransaction, + ): Promise { if (tran == null) { return this.db.withTransactionF((tran) => this.scheduleDiscoveryForVertex( @@ -871,7 +802,6 @@ class Discovery { parent, ignoreActive, tran, - ctx, ), ); } @@ -884,7 +814,6 @@ class Discovery { gestaltIdEncoded, ].join(''), ); - this.logger.error('here1') // Check if task exists let taskExisting: Task | null = null; for await (const task of this.taskManager.getTasks( @@ -892,17 +821,13 @@ class Discovery { true, [this.constructor.name, this.discoverVertexHandlerId, gestaltIdEncoded], tran, - ctx, )) { - ctx.signal.throwIfAborted(); - this.logger.error('here2') // Ignore active tasks if (ignoreActive && task.status === 'active') continue; if (taskExisting == null) { taskExisting = task; continue; } - this.logger.error('here3') // Any extra tasks should be cancelled, this shouldn't normally happen task.cancel(abortSingletonTaskReason); this.dispatchEvent( @@ -914,13 +839,11 @@ class Discovery { }), ); } - this.logger.error('here4') // Only create if it doesn't exist if (taskExisting != null) return; this.logger.info( `Scheduling new discovery for vertex with gestaltId ${gestaltIdEncoded}`, ); - this.logger.error('here5') await this.taskManager.scheduleTask( { handlerId: this.discoverVertexHandlerId, @@ -936,7 +859,6 @@ class Discovery { }, tran, ); - this.logger.error('here6') this.dispatchEvent( new discoveryEvents.EventDiscoveryVertexQueued({ detail: { @@ -1087,12 +1009,10 @@ class Discovery { } // Refresh timer in preparation for request ctx.timer.refresh(); - this.logger.error('verifyIdentityClaim before getClaim') const identitySignedClaim = await provider.getClaim( authIdentityId, claimId, ); - this.logger.error('verifyIdentityClaim after getClaim') if (identitySignedClaim == null) { continue; } @@ -1119,10 +1039,11 @@ class Discovery { tran?: DBTransaction, ctx?: Partial, ): Promise; + @timedCancellable(true) public async checkRediscovery( lastProcessedCutoffTime: number, tran: DBTransaction | undefined, - ctx: ContextTimed, + @context ctx: ContextTimed, ): Promise { if (tran == null) { return this.db.withTransactionF((tran) => diff --git a/src/nodes/NodeConnectionManager.ts b/src/nodes/NodeConnectionManager.ts index cb9c043bc..bcafb1f6e 100644 --- a/src/nodes/NodeConnectionManager.ts +++ b/src/nodes/NodeConnectionManager.ts @@ -46,8 +46,8 @@ import { AbstractEvent, EventAll } from '@matrixai/events'; import { context, timed, - timedCancellable -} from "@matrixai/contexts/dist/decorators"; + timedCancellable, +} from '@matrixai/contexts/dist/decorators'; import { Semaphore } from '@matrixai/async-locks'; import { PromiseCancellable } from '@matrixai/async-cancellable'; import NodeConnection from './NodeConnection'; @@ -1479,7 +1479,11 @@ class NodeConnectionManager { port: Port; }>; @ready(new nodesErrors.ErrorNodeManagerNotRunning()) - @timedCancellable(true) + @timedCancellable( + true, + (nodeConnectionManager: NodeConnectionManager) => + nodeConnectionManager.connectionConnectTimeoutTime, + ) public async handleNodesConnectionSignalInitial( sourceNodeId: NodeId, targetNodeId: NodeId, @@ -1518,16 +1522,20 @@ class NodeConnectionManager { this.keyRing.keyPair, data, ); - const connectionSignalP = this.withConnF(targetNodeId, ctx, async (conn) => { - const client = conn.getClient(); - await client.methods.nodesConnectionSignalFinal({ - sourceNodeIdEncoded: nodesUtils.encodeNodeId(sourceNodeId), - targetNodeIdEncoded: nodesUtils.encodeNodeId(targetNodeId), - address: address, - requestSignature: requestSignature, - relaySignature: relaySignature.toString('base64url'), - }); - }) + const connectionSignalP = this.withConnF( + targetNodeId, + ctx, + async (conn) => { + const client = conn.getClient(); + await client.methods.nodesConnectionSignalFinal({ + sourceNodeIdEncoded: nodesUtils.encodeNodeId(sourceNodeId), + targetNodeIdEncoded: nodesUtils.encodeNodeId(targetNodeId), + address: address, + requestSignature: requestSignature, + relaySignature: relaySignature.toString('base64url'), + }); + }, + ) // Ignore results and failures, then are expected to happen and are allowed .then( () => {}, @@ -1790,25 +1798,26 @@ class NodeConnectionManager { nodeId: NodeId, ctx?: Partial, ): Promise; - @timedCancellable(true) + @timedCancellable( + true, + (nodeConnectionManager: NodeConnectionManager) => + nodeConnectionManager.connectionConnectTimeoutTime, + ) public async isAuthenticatedP( nodeId: NodeId, @context ctx: ContextTimed, ): Promise { + ctx.signal.throwIfAborted(); const targetNodeIdString = nodeId.toString() as NodeIdString; const connectionsEntry = this.connections.get(targetNodeIdString); if (connectionsEntry == null) { throw new nodesErrors.ErrorNodeConnectionManagerConnectionNotFound(); } - const { p: abortP, rejectP: triggerAbort } = utils.promise(); + const { p: abortP, rejectP: rejectAbortP } = utils.promise(); const abortHandler = () => { - triggerAbort(ctx.signal.reason); + rejectAbortP(ctx.signal.reason); }; - if (ctx.signal.aborted) { - triggerAbort(ctx.signal.reason); - } else { - ctx.signal.addEventListener('abort', abortHandler, { once: true }); - } + ctx.signal.addEventListener('abort', abortHandler, { once: true }); try { return await Promise.race([connectionsEntry.authenticatedP, abortP]); } catch (e) { diff --git a/src/nodes/NodeGraph.ts b/src/nodes/NodeGraph.ts index 588b29d97..4af01f14f 100644 --- a/src/nodes/NodeGraph.ts +++ b/src/nodes/NodeGraph.ts @@ -18,13 +18,13 @@ import { ready, } from '@matrixai/async-init/dist/CreateDestroyStartStop'; import { IdInternal } from '@matrixai/id'; +import { timedCancellable } from '@matrixai/contexts/dist/decorators'; +import { context } from '@matrixai/contexts/dist/decorators'; import * as nodesUtils from './utils'; import * as nodesErrors from './errors'; import * as nodesEvents from './events'; import * as utils from '../utils'; import config from '../config'; -import { timedCancellable } from '@matrixai/contexts/dist/decorators'; -import { context } from "@matrixai/contexts/dist/decorators"; /** * NodeGraph is an implementation of Kademlia for maintaining peer to peer @@ -253,11 +253,11 @@ class NodeGraph { public async getNodeContact( nodeId: NodeId, tran: DBTransaction | undefined, - @context ctx: ContextTimed + @context ctx: ContextTimed, ): Promise { if (tran == null) { - return await this.db.withTransactionF(async (tran) => - await this.getNodeContact(nodeId, tran, ctx), + return await this.db.withTransactionF( + async (tran) => await this.getNodeContact(nodeId, tran, ctx), ); } const [bucketIndex] = this.bucketIndex(nodeId); @@ -643,11 +643,12 @@ class NodeGraph { order: 'asc' | 'desc' = 'asc', limit: number | undefined, tran: DBTransaction | undefined, - @context ctx: ContextTimed + @context ctx: ContextTimed, ): Promise { if (tran == null) { - return await this.db.withTransactionF(async (tran) => - await this.getBucket(bucketIndex, sort, order, limit, tran, ctx), + return await this.db.withTransactionF( + async (tran) => + await this.getBucket(bucketIndex, sort, order, limit, tran, ctx), ); } if (bucketIndex < 0 || bucketIndex >= this.nodeIdBits) { @@ -921,8 +922,8 @@ class NodeGraph { @context ctx: ContextTimed, ): Promise { if (tran == null) { - return await this.db.withTransactionF(async (tran) => - await this.getClosestNodes(nodeId, limit, tran), + return await this.db.withTransactionF( + async (tran) => await this.getClosestNodes(nodeId, limit, tran), ); } // Buckets map to the target node in the following way; diff --git a/src/nodes/NodeManager.ts b/src/nodes/NodeManager.ts index 628dbcb07..784832bd7 100644 --- a/src/nodes/NodeManager.ts +++ b/src/nodes/NodeManager.ts @@ -1254,60 +1254,48 @@ class NodeManager { @context ctx: ContextTimed, ): Promise> { // Verify the node's chain with its own public key - this.logger.error('DATA about to get connection'); - try { - return await this.withConnF(targetNodeId, ctx, async (connection) => { - const claims: Record = {}; - const client = connection.getClient(); - this.logger.error('DATA before rpc'); - for await (const agentClaim of await client.methods.nodesClaimsGet( - { - claimIdEncoded: - claimId != null - ? claimsUtils.encodeClaimId(claimId) - : ('' as ClaimIdEncoded), - }, - ctx, - )) { - this.logger.error('DATA in rpc'); - ctx.signal.throwIfAborted(); - // Need to re-construct each claim - const claimId: ClaimId = claimsUtils.decodeClaimId( - agentClaim.claimIdEncoded, - )!; - const signedClaimEncoded = agentClaim.signedTokenEncoded; - const signedClaim = claimsUtils.parseSignedClaim(signedClaimEncoded); - // Verifying the claim - const issPublicKey = keysUtils.publicKeyFromNodeId( - nodesUtils.decodeNodeId(signedClaim.payload.iss)!, - ); - const subPublicKey = - signedClaim.payload.typ === 'node' - ? keysUtils.publicKeyFromNodeId( - nodesUtils.decodeNodeId(signedClaim.payload.iss)!, - ) - : null; - const token = Token.fromSigned(signedClaim); - if (!token.verifyWithPublicKey(issPublicKey)) { - this.logger.warn('Failed to verify issuing node'); - continue; - } - if ( - subPublicKey != null && - !token.verifyWithPublicKey(subPublicKey) - ) { - this.logger.warn('Failed to verify subject node'); - continue; - } - claims[claimId] = signedClaim; + return await this.withConnF(targetNodeId, ctx, async (connection) => { + const claims: Record = {}; + const client = connection.getClient(); + for await (const agentClaim of await client.methods.nodesClaimsGet( + { + claimIdEncoded: + claimId != null + ? claimsUtils.encodeClaimId(claimId) + : ('' as ClaimIdEncoded), + }, + ctx, + )) { + ctx.signal.throwIfAborted(); + // Need to re-construct each claim + const claimId: ClaimId = claimsUtils.decodeClaimId( + agentClaim.claimIdEncoded, + )!; + const signedClaimEncoded = agentClaim.signedTokenEncoded; + const signedClaim = claimsUtils.parseSignedClaim(signedClaimEncoded); + // Verifying the claim + const issPublicKey = keysUtils.publicKeyFromNodeId( + nodesUtils.decodeNodeId(signedClaim.payload.iss)!, + ); + const subPublicKey = + signedClaim.payload.typ === 'node' + ? keysUtils.publicKeyFromNodeId( + nodesUtils.decodeNodeId(signedClaim.payload.iss)!, + ) + : null; + const token = Token.fromSigned(signedClaim); + if (!token.verifyWithPublicKey(issPublicKey)) { + this.logger.warn('Failed to verify issuing node'); + continue; } - this.logger.error('DATA after logger'); - return claims; - }); - } catch (e) { - this.logger.error('DATA FAIL:', e); - throw e; - } + if (subPublicKey != null && !token.verifyWithPublicKey(subPublicKey)) { + this.logger.warn('Failed to verify subject node'); + continue; + } + claims[claimId] = signedClaim; + } + return claims; + }); } /** diff --git a/src/tasks/TaskManager.ts b/src/tasks/TaskManager.ts index c5d1ffcfc..435f04db2 100644 --- a/src/tasks/TaskManager.ts +++ b/src/tasks/TaskManager.ts @@ -1,4 +1,3 @@ -import type { ContextTimed, ContextTimedInput } from '@matrixai/contexts'; import type { DB, DBTransaction, LevelPath, KeyPath } from '@matrixai/db'; import type { ResourceRelease } from '@matrixai/resources'; import type { @@ -23,11 +22,6 @@ import { import { Lock } from '@matrixai/async-locks'; import { PromiseCancellable } from '@matrixai/async-cancellable'; import { extractTs } from '@matrixai/id/dist/IdSortable'; -import { - context, - timed, - timedCancellable -} from "@matrixai/contexts/dist/decorators"; import { Timer } from '@matrixai/timer'; import TaskEvent from './TaskEvent'; import * as tasksUtils from './utils'; @@ -401,25 +395,16 @@ class TaskManager { }; } - public getTasks( - order?: 'asc' | 'desc', - lazy?: boolean, - path?: TaskPath, - tran?: DBTransaction, - ctx?: Partial, - ): AsyncGenerator; @ready(new tasksErrors.ErrorTaskManagerNotRunning()) - @timed() public async *getTasks( order: 'asc' | 'desc' = 'asc', lazy: boolean = false, - path: TaskPath | undefined, - tran: DBTransaction | undefined, - @context ctx: ContextTimed, + path?: TaskPath, + tran?: DBTransaction, ): AsyncGenerator { if (tran == null) { return yield* this.db.withTransactionG((tran) => - this.getTasks(order, lazy, path, tran, ctx), + this.getTasks(order, lazy, path, tran), ); } if (path == null) { @@ -427,7 +412,6 @@ class TaskManager { [...this.tasksTaskDbPath], { values: false, reverse: order !== 'asc' }, )) { - ctx.signal.throwIfAborted(); const taskId = IdInternal.fromBuffer(taskIdBuffer as Buffer); const task = (await this.getTask(taskId, lazy, tran))!; yield task; diff --git a/src/vaults/VaultManager.ts b/src/vaults/VaultManager.ts index 6b760d3c2..b1c7faf8c 100644 --- a/src/vaults/VaultManager.ts +++ b/src/vaults/VaultManager.ts @@ -540,7 +540,7 @@ class VaultManager { public async renameVault( vaultId: VaultId, newVaultName: VaultName, - tran: DBTransaction, + tran?: DBTransaction, ): Promise { if (tran == null) { return this.db.withTransactionF((tran) => diff --git a/tests/discovery/Discovery.test.ts b/tests/discovery/Discovery.test.ts index 25258ad1e..857f9e406 100644 --- a/tests/discovery/Discovery.test.ts +++ b/tests/discovery/Discovery.test.ts @@ -293,10 +293,17 @@ describe('Discovery', () => { await discovery.stop(); await discovery.destroy(); await expect( - discovery.queueDiscoveryByIdentity('' as ProviderId, '' as IdentityId), + async () => + await discovery.queueDiscoveryByIdentity( + '' as ProviderId, + '' as IdentityId, + ), ).rejects.toThrow(discoveryErrors.ErrorDiscoveryNotRunning); await expect( - discovery.queueDiscoveryByNode(testNodesUtils.generateRandomNodeId()), + async () => + await discovery.queueDiscoveryByNode( + testNodesUtils.generateRandomNodeId(), + ), ).rejects.toThrow(discoveryErrors.ErrorDiscoveryNotRunning); }); test('discovery by node', async () => { diff --git a/tests/nodes/NodeConnectionManager.test.ts b/tests/nodes/NodeConnectionManager.test.ts index ef7c2ee22..483d62cb0 100644 --- a/tests/nodes/NodeConnectionManager.test.ts +++ b/tests/nodes/NodeConnectionManager.test.ts @@ -363,6 +363,7 @@ describe(`${NodeConnectionManager.name}`, () => { ); await ncmLocal.nodeConnectionManager.withConnF( ncmPeer1.nodeId, + undefined, async () => { expect(connectionAndTimer?.usageCount).toBe(1); expect(connectionAndTimer?.timer).toBeNull(); @@ -391,6 +392,7 @@ describe(`${NodeConnectionManager.name}`, () => { await ncmLocal.nodeConnectionManager.withConnF( ncmPeer1.nodeId, + undefined, async (connection) => { expect(connection.connectionId).toBe(connectionIds[0]); }, @@ -401,6 +403,7 @@ describe(`${NodeConnectionManager.name}`, () => { // Lowest connection is deterministically the same for the peer too await ncmPeer1.nodeConnectionManager.withConnF( ncmLocal.nodeId, + undefined, async (connection) => { expect(connection.connectionId).toBe(connectionIds[0]); }, @@ -421,6 +424,7 @@ describe(`${NodeConnectionManager.name}`, () => { for (const connectionId of connectionIds) { await ncmLocal.nodeConnectionManager.withConnF( ncmPeer1.nodeId, + undefined, async (connection) => { // Should always be the lowest alive connectionId expect(connection.connectionId).toBe(connectionId); @@ -437,6 +441,7 @@ describe(`${NodeConnectionManager.name}`, () => { await expect( ncmLocal.nodeConnectionManager.withConnF( ncmPeer1.nodeId, + undefined, async () => {}, ), ).rejects.toThrow( @@ -569,6 +574,7 @@ describe(`${NodeConnectionManager.name}`, () => { // Wait for timeout. await ncmLocal.nodeConnectionManager.withConnF( ncmPeer1.nodeId, + undefined, async () => { expect(ncmLocal.nodeConnectionManager.connectionsActive()).toBe(3); await connectionDestroyProm1; @@ -717,12 +723,14 @@ describe(`${NodeConnectionManager.name}`, () => { // Checking authentication result await ncmLocal.nodeConnectionManager.withConnF( ncmPeer1.nodeId, + undefined, async () => { // Do nothing }, ); await ncmPeer1.nodeConnectionManager.withConnF( ncmLocal.nodeId, + undefined, async () => { // Do nothing }, @@ -759,6 +767,7 @@ describe(`${NodeConnectionManager.name}`, () => { // Checking authentication result const authenticationAttemptP = ncmLocal.nodeConnectionManager.withConnF( ncmPeer1.nodeId, + undefined, async () => { // Do nothing }, @@ -769,6 +778,7 @@ describe(`${NodeConnectionManager.name}`, () => { const authenticationAttemptP2 = ncmPeer1.nodeConnectionManager.withConnF( ncmLocal.nodeId, + undefined, async () => { // Do nothing }, @@ -806,6 +816,7 @@ describe(`${NodeConnectionManager.name}`, () => { const authenticationAttemptP = ncmLocal.nodeConnectionManager.withConnF( ncmPeer1.nodeId, + undefined, async () => { // Do nothing }, @@ -815,6 +826,7 @@ describe(`${NodeConnectionManager.name}`, () => { ); const forwardAuthenticateP = ncmLocal.nodeConnectionManager.withConnF( ncmPeer1.nodeId, + undefined, async () => { // Do nothing }, @@ -824,6 +836,7 @@ describe(`${NodeConnectionManager.name}`, () => { ); const reverseAuthenticateP = ncmPeer1.nodeConnectionManager.withConnF( ncmLocal.nodeId, + undefined, async () => { // Do nothing }, @@ -861,6 +874,7 @@ describe(`${NodeConnectionManager.name}`, () => { const authenticationAttemptP = ncmLocal.nodeConnectionManager.withConnF( ncmPeer1.nodeId, + undefined, async () => { // Do nothing }, @@ -898,6 +912,7 @@ describe(`${NodeConnectionManager.name}`, () => { const authenticationAttemptP = ncmLocal.nodeConnectionManager.withConnF( ncmPeer1.nodeId, + undefined, async () => { // Do nothing }, @@ -946,6 +961,7 @@ describe(`${NodeConnectionManager.name}`, () => { const forwardAuthenticateP = ncmLocal.nodeConnectionManager.withConnF( ncmPeer1.nodeId, + undefined, async () => { // Do nothing }, @@ -953,6 +969,7 @@ describe(`${NodeConnectionManager.name}`, () => { await expect(forwardAuthenticateP).toResolve(); const reverseAuthenticateP = ncmPeer1.nodeConnectionManager.withConnF( ncmLocal.nodeId, + undefined, async () => { // Do nothing }, @@ -962,6 +979,7 @@ describe(`${NodeConnectionManager.name}`, () => { // Checking RPC again await ncmLocal.nodeConnectionManager.withConnF( ncmPeer1.nodeId, + undefined, async (conn) => { await expect( conn.rpcClient.unaryCaller('dummyMethod', {}), diff --git a/tests/nodes/NodeManager.test.ts b/tests/nodes/NodeManager.test.ts index aba4942ed..8d6aa1e92 100644 --- a/tests/nodes/NodeManager.test.ts +++ b/tests/nodes/NodeManager.test.ts @@ -647,17 +647,21 @@ describe(`${NodeManager.name}`, () => { scopes: ['global'], }, ); + const abortController = new AbortController(); + const ctx = { signal: abortController.signal } as ContextTimed; const [resourceReleaser, nodeConnection] = - await nodeManager.acquireConnection(nodeId)(); + await nodeManager.acquireConnection(nodeId, ctx)(); expect(nodeConnection).toBeInstanceOf(NodeConnection); expect(nodeConnectionManager.hasConnection(nodeId)).toBeTrue(); await resourceReleaser(); }); test('acquire Connection fails', async () => { + const abortController = new AbortController(); + const ctx = { signal: abortController.signal } as ContextTimed; const nodeId = keyRingPeer.getNodeId(); - await expect(nodeManager.acquireConnection(nodeId)()).rejects.toThrow( - nodesErrors.ErrorNodeManagerConnectionFailed, - ); + await expect( + nodeManager.acquireConnection(nodeId, ctx)(), + ).rejects.toThrow(nodesErrors.ErrorNodeManagerConnectionFailed); }); test('withConnF', async () => { const nodeId = keyRingPeer.getNodeId(); @@ -695,6 +699,7 @@ describe(`${NodeManager.name}`, () => { const gen = nodeManager.withConnG( nodeId, + undefined, async function* ( conn, ): AsyncGenerator { diff --git a/tests/nodes/agent/handlers/nodesClosestActiveConnectionsGet.test.ts b/tests/nodes/agent/handlers/nodesClosestActiveConnectionsGet.test.ts index 1622f48f9..76f85f23b 100644 --- a/tests/nodes/agent/handlers/nodesClosestActiveConnectionsGet.test.ts +++ b/tests/nodes/agent/handlers/nodesClosestActiveConnectionsGet.test.ts @@ -161,6 +161,7 @@ describe('nodesClosestLocalNode', () => { const results = await nodeConnectionManagerLocal.withConnF( nodeIdPeer1, + undefined, async () => { const resultStream = await connection.rpcClient.methods.nodesClosestActiveConnectionsGet({ diff --git a/tests/vaults/VaultInternal.test.ts b/tests/vaults/VaultInternal.test.ts index 4a1ad59e5..aacc9c7c9 100644 --- a/tests/vaults/VaultInternal.test.ts +++ b/tests/vaults/VaultInternal.test.ts @@ -117,13 +117,13 @@ describe('VaultInternal', () => { test('VaultInternal readiness', async () => { await vault.stop(); - await expect(async () => { - await vault.log(); - }).rejects.toThrow(vaultsErrors.ErrorVaultNotRunning); + await expect(async () => await vault.log()).rejects.toThrow( + vaultsErrors.ErrorVaultNotRunning, + ); await vault.destroy(); - await expect(async () => { - await vault.start(); - }).rejects.toThrow(vaultsErrors.ErrorVaultDestroyed); + await expect(async () => await vault.start()).rejects.toThrow( + vaultsErrors.ErrorVaultDestroyed, + ); }); test('is type correct', async () => { expect(vault).toBeInstanceOf(VaultInternal);