diff --git a/src/discovery/Discovery.ts b/src/discovery/Discovery.ts index d4849ef84..6c05ea386 100644 --- a/src/discovery/Discovery.ts +++ b/src/discovery/Discovery.ts @@ -1,4 +1,4 @@ -import type { DB, DBTransaction, LevelPath } from '@matrixai/db'; +import type { DB, DBTransaction } from '@matrixai/db'; import type { PromiseCancellable } from '@matrixai/async-cancellable'; import type { ContextTimed } from '@matrixai/contexts'; import type { NodeId } from '../nodes/types'; @@ -18,7 +18,7 @@ import type { ProviderIdentityId, } from '../identities/types'; import type KeyRing from '../keys/KeyRing'; -import type { ClaimId, ClaimIdEncoded, SignedClaim } from '../claims/types'; +import type { ClaimIdEncoded, SignedClaim } from '../claims/types'; import type TaskManager from '../tasks/TaskManager'; import type { Task, TaskHandler, TaskHandlerId } from '../tasks/types'; import type { ClaimLinkIdentity, ClaimLinkNode } from '../claims/payloads'; @@ -35,7 +35,6 @@ import * as tasksErrors from '../tasks/errors'; import * as gestaltsUtils from '../gestalts/utils'; import * as nodesUtils from '../nodes/utils'; import * as keysUtils from '../keys/utils'; -import * as utils from '../utils'; import { never } from '../utils'; import Token from '../tokens/Token'; import { decodeClaimId } from '../ids'; @@ -77,7 +76,10 @@ class Discovery { nodeManager, taskManager, discoverVertexTimeoutTime = 2000, + rediscoverCheckIntervalTime = 60 * 60 * 1000, // 1 hour + rediscoverVertexDelayTime = 60 * 60 * 1000, // 1 hour rediscoverSkipTime = 60 * 60 * 1000, // 1 hour + staleVertexThresholdTime = 3 * 24 * 60 * 60 * 1000, // 3 days logger = new Logger(this.name), fresh = false, }: { @@ -88,7 +90,10 @@ class Discovery { nodeManager: NodeManager; taskManager: TaskManager; discoverVertexTimeoutTime?: number; + rediscoverCheckIntervalTime?: number; + rediscoverVertexDelayTime?: number; rediscoverSkipTime?: number; + staleVertexThresholdTime?: number; logger?: Logger; fresh?: boolean; }): Promise { @@ -101,7 +106,10 @@ class Discovery { nodeManager, taskManager, discoverVertexTimeoutTime, + rediscoverCheckIntervalTime, + rediscoverVertexDelayTime, rediscoverSkipTime, + staleVertexThresholdTime, logger, }); await discovery.start({ fresh }); @@ -118,26 +126,21 @@ class Discovery { protected taskManager: TaskManager; protected discoverVertexTimeoutTime: number; /** - * The time since a vertex has been processed where re processing will be skipped + * Interval delay used when checking for nodes to rediscover */ - protected rediscoverSkipTime: number; - protected discoveryDbPath: LevelPath = [this.constructor.name]; + protected rediscoverCheckIntervalTime: number; + /** + * The threshold used when deciding to rediscover a vertex based on how long ago it was processed + */ + protected rediscoverVertexThresholdTime: number; /** - * Last processed collection - * `Discovery/lastProcessed/{GestaltIdEncoded} -> number` + * The time since a vertex has been processed where re processing will be skipped */ - protected lastProcessedPath: LevelPath = [ - ...this.discoveryDbPath, - 'lastProcessed', - ]; + protected rediscoverSkipTime: number; /** - * Last processed collection - * `Discovery/lastProcessed/{GestaltIdEncoded} -> number` + * The time threshold for */ - protected lastProcessedOrderPath: LevelPath = [ - ...this.discoveryDbPath, - 'lastProcessedOrder', - ]; + protected staleVertexThresholdTime: number; protected discoverVertexHandler: TaskHandler = async ( ctx, @@ -172,6 +175,23 @@ class Discovery { public readonly discoverVertexHandlerId = `${this.constructor.name}.discoverVertexHandler` as TaskHandlerId; + /** + * This handler is run periodically to check if nodes are ready to be rediscovered + */ + protected checkRediscoveryHandler: TaskHandler = async () => { + await this.checkRediscovery( + Date.now() - this.rediscoverVertexThresholdTime, + ); + await this.taskManager.scheduleTask({ + handlerId: this.discoverVertexHandlerId, + path: [this.constructor.name, this.checkRediscoveryHandlerId], + lazy: true, + delay: this.rediscoverCheckIntervalTime, + }); + }; + public readonly checkRediscoveryHandlerId = + `${this.constructor.name}.checkForRediscoveryHandler` as TaskHandlerId; + public constructor({ keyRing, db, @@ -180,7 +200,10 @@ class Discovery { nodeManager, taskManager, discoverVertexTimeoutTime, + rediscoverCheckIntervalTime, + rediscoverVertexDelayTime, rediscoverSkipTime, + staleVertexThresholdTime, logger, }: { db: DB; @@ -190,7 +213,10 @@ class Discovery { nodeManager: NodeManager; taskManager: TaskManager; discoverVertexTimeoutTime: number; + rediscoverCheckIntervalTime: number; + rediscoverVertexDelayTime: number; rediscoverSkipTime: number; + staleVertexThresholdTime: number; logger: Logger; }) { this.db = db; @@ -200,7 +226,10 @@ class Discovery { this.nodeManager = nodeManager; this.taskManager = taskManager; this.discoverVertexTimeoutTime = discoverVertexTimeoutTime; + this.rediscoverCheckIntervalTime = rediscoverCheckIntervalTime; + this.rediscoverVertexThresholdTime = rediscoverVertexDelayTime; this.rediscoverSkipTime = rediscoverSkipTime; + this.staleVertexThresholdTime = staleVertexThresholdTime; this.logger = logger; } @@ -222,6 +251,13 @@ class Discovery { this.discoverVertexHandlerId, this.discoverVertexHandler, ); + // Start up rediscovery task + await this.taskManager.scheduleTask({ + handlerId: this.discoverVertexHandlerId, + path: [this.constructor.name, this.checkRediscoveryHandlerId], + lazy: true, + delay: this.rediscoverCheckIntervalTime, + }); this.logger.info(`Started ${this.constructor.name}`); } @@ -288,8 +324,6 @@ class Discovery { ); } - // Fixme, when processing a vertex, we need to check existing links in the - // GestaltGraph and ask for claims newer than that protected processVertex( vertex: GestaltIdEncoded, lastProcessedCutoffTime?: number, @@ -326,25 +360,13 @@ class Discovery { if (nodeId.equals(this.keyRing.getNodeId())) { // Skip our own nodeId, we actively add this information when it changes, // so there is no need to scan it. - await this.setLastProcessed(gestaltNodeId, processedTime); + await this.gestaltGraph.setVertexProcessedTime( + gestaltNodeId, + processedTime, + ); return; } - // Get the oldest known claim for this node - // get the oldest one - let newestClaimId: ClaimId | undefined = undefined; - for await (const [, gestaltLink] of this.gestaltGraph.getLinks([ - 'node', - nodeId, - ])) { - const claimIdEncoded = gestaltLink[1].claim.payload.jti; - const claimId = decodeClaimId(claimIdEncoded); - if (claimId == null) never(); - if (newestClaimId == null) { - newestClaimId = claimId; - } else if (Buffer.compare(newestClaimId, claimId) === -1) { - newestClaimId = claimId; - } - } + const newestClaimId = await this.gestaltGraph.getClaimIdNewest(nodeId); // The sigChain data of the vertex (containing all cryptolinks) let vertexChainData: Record = {}; try { @@ -354,7 +376,10 @@ class Discovery { ctx, ); } catch (e) { - await this.setLastProcessed(gestaltNodeId, processedTime); + await this.gestaltGraph.setVertexProcessedTime( + gestaltNodeId, + processedTime, + ); // Not strictly an error in this case, we can fail to connect this.logger.info( `Failed to discover ${nodesUtils.encodeNodeId( @@ -363,20 +388,12 @@ class Discovery { ); return; } - // TODO: for now, the chain data is treated as a 'disjoint' set of - // cryptolink claims from a node to another node/identity. - // That is, we have no notion of revocations, or multiple claims to - // the same node/identity. Thus, we simply iterate over this chain - // of cryptolinks. // Iterate over each of the claims in the chain (already verified). - // TODO: there is no deterministic iteration order of keys in a record. - // When we change to iterating over ordered sigchain claims, - // this must change into array iteration. for (const signedClaim of Object.values(vertexChainData)) { if (ctx.signal.aborted) throw ctx.signal.reason; switch (signedClaim.payload.typ) { case 'ClaimLinkNode': - await this.procesessClaimLinkNode( + await this.processClaimLinkNode( signedClaim, nodeId, lastProcessedCutoffTime, @@ -394,10 +411,13 @@ class Discovery { never(); } } - await this.setLastProcessed(gestaltNodeId, processedTime); + await this.gestaltGraph.setVertexProcessedTime( + gestaltNodeId, + processedTime, + ); } - protected async procesessClaimLinkNode( + protected async processClaimLinkNode( signedClaim: SignedClaim, nodeId: NodeId, lastProcessedCutoffTime = Date.now() - this.rediscoverSkipTime, @@ -436,6 +456,9 @@ class Discovery { meta: {}, }, ); + const claimId = decodeClaimId(signedClaim.payload.jti); + if (claimId == null) never(); + await this.gestaltGraph.setClaimIdNewest(nodeId, claimId); // Add this vertex to the queue if it hasn't already been visited const linkedGestaltId: GestaltId = ['node', linkedVertexNodeId]; if ( @@ -518,6 +541,9 @@ class Discovery { }, }, ); + const claimId = decodeClaimId(signedClaim.payload.jti); + if (claimId == null) never(); + await this.gestaltGraph.setClaimIdNewest(nodeId, claimId); // Add this identity vertex to the queue if it is not present const providerIdentityId = JSON.parse(signedClaim.payload.sub!); const identityGestaltId: GestaltId = ['identity', providerIdentityId]; @@ -593,7 +619,10 @@ class Discovery { ); } } - await this.setLastProcessed(['identity', providerIdentityId], Date.now()); + await this.gestaltGraph.setVertexProcessedTime( + ['identity', providerIdentityId], + Date.now(), + ); } /** @@ -680,24 +709,22 @@ class Discovery { task.cancel(abortSingletonTaskReason); } // Only create if it doesn't exist - if (!taskExisting) { - // Otherwise create a new task if none exists - await this.taskManager.scheduleTask( - { - handlerId: this.discoverVertexHandlerId, - parameters: [gestaltIdEncoded, lastProcessedCutoffTime], - path: [ - this.constructor.name, - this.discoverVertexHandlerId, - gestaltIdEncoded, - ], - lazy: true, - deadline: this.discoverVertexTimeoutTime, - delay, - }, - tran, - ); - } + if (taskExisting != null) return; + await this.taskManager.scheduleTask( + { + handlerId: this.discoverVertexHandlerId, + parameters: [gestaltIdEncoded, lastProcessedCutoffTime], + path: [ + this.constructor.name, + this.discoverVertexHandlerId, + gestaltIdEncoded, + ], + lazy: true, + deadline: this.discoverVertexTimeoutTime, + delay, + }, + tran, + ); } /** @@ -783,122 +810,82 @@ class Discovery { } /** - * Updates the last processed time in the database for the given vertex - */ - protected async setLastProcessed( - vertex: GestaltId, - processedTime: number, - tran?: DBTransaction, - ): Promise { - if (tran == null) { - return this.db.withTransactionF((tran) => - this.setLastProcessed(vertex, processedTime, tran), - ); - } - - const gestaltIdEncoded = gestaltsUtils.encodeGestaltId(vertex); - await tran.lock( - [ - this.constructor.name, - this.discoverVertexHandlerId, - gestaltIdEncoded, - ].join(''), - ); - - await tran.put( - [...this.lastProcessedPath, gestaltIdEncoded], - processedTime, - ); - await tran.put( - [ - ...this.lastProcessedOrderPath, - utils.lexiPackBuffer(processedTime), - gestaltIdEncoded, - ], - gestaltIdEncoded, - ); - } - - /** - * Removes the last processed time for a vertex + * Checks previously discovered vertices for ones to be re-added back on to the queue */ - protected async unsetLastProcessed( - vertex: GestaltId, + public async checkRediscovery( + lastProcessedCutoffTime: number, tran?: DBTransaction, ): Promise { if (tran == null) { return this.db.withTransactionF((tran) => - this.unsetLastProcessed(vertex, tran), + this.checkRediscovery(lastProcessedCutoffTime, tran), ); } - const gestaltIdEncoded = gestaltsUtils.encodeGestaltId(vertex); - await tran.lock( - [ + const staleVertexCutoff = Date.now() - this.staleVertexThresholdTime; + const gestaltIds: Array<[GestaltIdEncoded, number]> = []; + for await (const [ + gestaltId, + lastProcessedTime, + ] of this.gestaltGraph.getVertexProcessedTimes( + { + order: 'asc', + seek: lastProcessedCutoffTime, + }, + tran, + )) { + gestaltIds.push([ + gestaltsUtils.encodeGestaltId(gestaltId), + lastProcessedTime, + ]); + } + // We want to lock all the ids at once before moving ahead + const locks = gestaltIds.map((gestaltIdEncoded) => { + return [ this.constructor.name, this.discoverVertexHandlerId, gestaltIdEncoded, - ].join(''), - ); - - const processedTime = await tran.get([ - ...this.lastProcessedPath, - gestaltIdEncoded, - ]); - if (processedTime == null) return; - await tran.del([...this.lastProcessedPath, gestaltIdEncoded]); - await tran.del([ - ...this.lastProcessedOrderPath, - utils.lexiPackBuffer(processedTime), - gestaltIdEncoded, - ]); - } - - /** - * Gets the last processed time for a vertex - */ - protected async getLastProcessedTime( - vertex: GestaltId, - tran?: DBTransaction, - ): Promise { - if (tran == null) { - return this.db.withTransactionF((tran) => - this.getLastProcessedTime(vertex, tran), - ); - } - - const gestaltIdEncoded = gestaltsUtils.encodeGestaltId(vertex); - return await tran.get([ - ...this.lastProcessedPath, - gestaltIdEncoded, - ]); - } - - /** - * Gets the last processed time for a vertex - */ - protected async *getLastProcessedTimes( - order: 'asc' | 'desc' = 'asc', - tran?: DBTransaction, - ): AsyncGenerator<[GestaltId, number]> { - if (tran == null) { - return yield* this.db.withTransactionG((tran) => - this.getLastProcessedTimes(order, tran), - ); - } - - const iterator = tran.iterator(this.lastProcessedOrderPath, { - valueAsBuffer: false, - reverse: order !== 'asc', + ].join(''); }); - for await (const [path, gestaltIdEncoded] of iterator) { - const lastProcessedTime = utils.lexiUnpackBuffer(path[0] as Buffer); - if (lastProcessedTime == null) { - never('lastProcessedTime should be valid here'); + await tran.lock(...locks); + // Schedule a new task for each vertex if it doesn't already exist + for (const [gestaltIdEncoded, lastProcessedTime] of gestaltIds) { + // If we exceed an age threshold then we just remove the vertex information + if (lastProcessedTime < staleVertexCutoff) { + await this.gestaltGraph.unsetVertexProcessedTime( + gestaltsUtils.decodeGestaltId(gestaltIdEncoded)!, + ); + } + let taskExisting: Task | null = null; + for await (const task of this.taskManager.getTasks( + 'asc', + true, + [this.constructor.name, this.discoverVertexHandlerId, gestaltIdEncoded], + tran, + )) { + if (taskExisting == null) { + taskExisting = task; + continue; + } + // Any extra tasks should be cancelled, this shouldn't normally happen + task.cancel(abortSingletonTaskReason); } - const gestaltId = gestaltsUtils.decodeGestaltId(gestaltIdEncoded); - if (gestaltId == null) never('GestaltId should be valid here'); - yield [gestaltId, lastProcessedTime]; + if (taskExisting != null) continue; + // Schedule a new task + await this.taskManager.scheduleTask( + { + handlerId: this.discoverVertexHandlerId, + parameters: [gestaltIdEncoded, lastProcessedCutoffTime], + path: [ + this.constructor.name, + this.discoverVertexHandlerId, + gestaltIdEncoded, + ], + lazy: true, + deadline: this.discoverVertexTimeoutTime, + }, + tran, + ); } } @@ -911,7 +898,7 @@ class Discovery { tran?: DBTransaction, ): Promise { const lastProcessedTime = - (await this.getLastProcessedTime(vertex, tran)) ?? 0; + (await this.gestaltGraph.getVertexProcessedTime(vertex, tran)) ?? 0; return lastProcessedTime > time; } } diff --git a/src/gestalts/GestaltGraph.ts b/src/gestalts/GestaltGraph.ts index 0c9620e81..b5838bd44 100644 --- a/src/gestalts/GestaltGraph.ts +++ b/src/gestalts/GestaltGraph.ts @@ -13,8 +13,9 @@ import type { GestaltInfo, GestaltLinkIdentity, GestaltId, + GestaltIdEncoded, } from './types'; -import type { NodeId, ProviderIdentityId } from '../ids/types'; +import type { ClaimId, NodeId, ProviderIdentityId } from '../ids/types'; import type ACL from '../acl/ACL'; import type { GestaltLinkJSON } from './types'; import Logger from '@matrixai/logger'; @@ -28,6 +29,11 @@ import * as gestaltsErrors from './errors'; import * as gestaltsEvents from './events'; import * as aclUtils from '../acl/utils'; import { never } from '../utils'; +import * as utils from '../utils'; + +// TODO: add tracking for the oldest claimID for a node. +// This needs to be removed if the node is removed. +// ALso possible undefined. interface GestaltGraph extends CreateDestroyStartStop {} @CreateDestroyStartStop( @@ -106,6 +112,29 @@ class GestaltGraph { 'identities', ]; + /** + * Last processed collection + * `GestaltGraph/VertexProcessedTime/{GestaltIdEncoded} -> number` + */ + protected vertexProcessedTimePath: LevelPath = [ + this.constructor.name, + 'vertexProcessedTime', + ]; + + /** + * Last processed collection + * `GestaltGraph/vertexProcessedTimeOrder/{number}/{GestaltIdEncoded} -> gestaltIdEncoded` + */ + protected vertexProcessedTimeOrderPath: LevelPath = [ + this.constructor.name, + 'vertexProcessedTimeOrder', + ]; + + protected nodeClaimIdOldestPath: LevelPath = [ + this.constructor.name, + 'nodeClaimIdOldestPath', + ]; + protected generateGestaltLinkId: () => GestaltLinkId; constructor({ db, acl, logger }: { db: DB; acl: ACL; logger: Logger }) { @@ -310,6 +339,194 @@ class GestaltGraph { } } + /** + * Updates the last processed time in the database for the given vertex + */ + @ready(new gestaltsErrors.ErrorGestaltsGraphNotRunning()) + public async setVertexProcessedTime( + vertex: GestaltId, + processedTime: number, + tran?: DBTransaction, + ): Promise { + if (tran == null) { + return this.db.withTransactionF((tran) => + this.setVertexProcessedTime(vertex, processedTime, tran), + ); + } + + const gestaltIdEncoded = gestaltsUtils.encodeGestaltId(vertex); + await tran.lock( + [this.constructor.name, 'vertexProcessedTime', gestaltIdEncoded].join(''), + ); + + await tran.put( + [...this.vertexProcessedTimePath, gestaltIdEncoded], + processedTime, + ); + await tran.put( + [ + ...this.vertexProcessedTimeOrderPath, + utils.lexiPackBuffer(processedTime), + gestaltIdEncoded, + ], + gestaltIdEncoded, + ); + } + + /** + * Removes the last processed time for a vertex + */ + @ready(new gestaltsErrors.ErrorGestaltsGraphNotRunning()) + public async unsetVertexProcessedTime( + vertex: GestaltId, + tran?: DBTransaction, + ): Promise { + if (tran == null) { + return this.db.withTransactionF((tran) => + this.unsetVertexProcessedTime(vertex, tran), + ); + } + + const gestaltIdEncoded = gestaltsUtils.encodeGestaltId(vertex); + await tran.lock( + [this.constructor.name, 'vertexProcessedTime', gestaltIdEncoded].join(''), + ); + + const processedTime = await tran.get([ + ...this.vertexProcessedTimePath, + gestaltIdEncoded, + ]); + if (processedTime == null) return; + await tran.del([...this.vertexProcessedTimePath, gestaltIdEncoded]); + await tran.del([ + ...this.vertexProcessedTimeOrderPath, + utils.lexiPackBuffer(processedTime), + gestaltIdEncoded, + ]); + } + + /** + * Gets the last processed time for a vertex + */ + @ready(new gestaltsErrors.ErrorGestaltsGraphNotRunning()) + public async getVertexProcessedTime( + vertex: GestaltId, + tran?: DBTransaction, + ): Promise { + if (tran == null) { + return this.db.withTransactionF((tran) => + this.getVertexProcessedTime(vertex, tran), + ); + } + + const gestaltIdEncoded = gestaltsUtils.encodeGestaltId(vertex); + return await tran.get([ + ...this.vertexProcessedTimePath, + gestaltIdEncoded, + ]); + } + + /** + * Gets the last processed time for a vertex + */ + @ready(new gestaltsErrors.ErrorGestaltsGraphNotRunning()) + public async *getVertexProcessedTimes( + { + order = 'asc', + seek, + limit, + }: { + order?: 'asc' | 'desc'; + seek?: number; + limit?: number; + } = {}, + tran?: DBTransaction, + ): AsyncGenerator<[GestaltId, number]> { + if (tran == null) { + return yield* this.db.withTransactionG((tran) => + this.getVertexProcessedTimes({ order, seek, limit }, tran), + ); + } + + const iterator = tran.iterator( + this.vertexProcessedTimeOrderPath, + { + keyAsBuffer: true, + keys: true, + values: true, + valueAsBuffer: false, + reverse: order !== 'asc', + lte: + seek != null + ? [utils.lexiPackBuffer(seek), Buffer.alloc(1, 0)] + : undefined, + }, + ); + for await (const [path, gestaltIdEncoded] of iterator) { + const lastProcessedTime = utils.lexiUnpackBuffer(path[0] as Buffer); + if (lastProcessedTime == null) { + never('lastProcessedTime should be valid here'); + } + const gestaltId = gestaltsUtils.decodeGestaltId(gestaltIdEncoded); + if (gestaltId == null) never('GestaltId should be valid here'); + yield [gestaltId, lastProcessedTime]; + } + } + + /** + * Updates the newest `ClaimId` for a node if it's newer than the current id stored + */ + @ready(new gestaltsErrors.ErrorGestaltsGraphNotRunning()) + public async setClaimIdNewest( + nodeId: NodeId, + claimId: ClaimId, + tran?: DBTransaction, + ): Promise { + if (tran == null) { + return this.db.withTransactionF((tran) => + this.setClaimIdNewest(nodeId, claimId, tran), + ); + } + + await tran.lock( + [...this.nodeClaimIdOldestPath, nodeId.toString()].join(''), + ); + // Getting existing ClaimId + const claimIdExisting = await this.getClaimIdNewest(nodeId, tran); + // If an old claimId already exists and is older, then we don't update + if ( + claimIdExisting != null && + Buffer.compare(claimIdExisting, claimId) === 1 + ) { + return; + } + // Setting new one + await tran.put( + [...this.nodeClaimIdOldestPath, nodeId.toBuffer()], + claimId.toBuffer(), + true, + ); + } + + @ready(new gestaltsErrors.ErrorGestaltsGraphNotRunning()) + public async getClaimIdNewest( + nodeId: NodeId, + tran?: DBTransaction, + ): Promise { + if (tran == null) { + return this.db.withTransactionF((tran) => + this.getClaimIdNewest(nodeId, tran), + ); + } + + const claimIdRaw = await tran.get( + [...this.nodeClaimIdOldestPath, nodeId.toBuffer()], + true, + ); + if (claimIdRaw == null) return; + return IdInternal.fromBuffer(claimIdRaw); + } + // LINKING AND UNLINKING VERTICES /** diff --git a/tests/discovery/Discovery.test.ts b/tests/discovery/Discovery.test.ts index bafe4d74f..ab40edb29 100644 --- a/tests/discovery/Discovery.test.ts +++ b/tests/discovery/Discovery.test.ts @@ -473,6 +473,29 @@ describe('Discovery', () => { await discovery.stop(); await discovery.destroy(); }); + test('processed vertices are queued for rediscovery', async () => { + const discovery = await Discovery.createDiscovery({ + db, + keyRing, + gestaltGraph, + identitiesManager, + nodeManager, + taskManager, + logger, + fresh: true, + }); + await taskManager.startProcessing(); + await discovery.queueDiscoveryByNode(nodeA.keyRing.getNodeId()); + let existingTasks: number = 0; + do { + existingTasks = await discovery.waitForDiscoveryTasks(); + } while (existingTasks > 0); + await discovery.waitForDiscoveryTasks(true); + + await taskManager.stopProcessing(); + await discovery.stop(); + await discovery.destroy(); + }); test('should skip recently discovered vertices', async () => { const discovery = await Discovery.createDiscovery({ db, @@ -538,7 +561,7 @@ describe('Discovery', () => { do { existingTasks = await discovery.waitForDiscoveryTasks(); } while (existingTasks > 0); - // Only the queued vertex should be processed + // All vertices should be reprocessed expect(processVertexMock).toHaveBeenCalledTimes(3); await taskManager.stopProcessing(); diff --git a/tests/gestalts/GestaltGraph.test.ts b/tests/gestalts/GestaltGraph.test.ts index 94d21de9e..a60a46977 100644 --- a/tests/gestalts/GestaltGraph.test.ts +++ b/tests/gestalts/GestaltGraph.test.ts @@ -21,7 +21,7 @@ import path from 'path'; import fs from 'fs'; import Logger, { LogLevel, StreamHandler } from '@matrixai/logger'; import { DB } from '@matrixai/db'; -import { fc, testProp } from '@fast-check/jest'; +import { fc, test } from '@fast-check/jest'; import { AsyncIterableX as AsyncIterable } from 'ix/asynciterable'; import GestaltGraph from '@/gestalts/GestaltGraph'; import ACL from '@/acl/ACL'; @@ -32,7 +32,9 @@ import * as keysUtils from '@/keys/utils'; import Token from '@/tokens/Token'; import { encodeGestaltNodeId, encodeGestaltIdentityId } from '@/gestalts/utils'; import * as nodesUtils from '@/nodes/utils'; +import * as claimsUtils from '@/claims/utils'; import * as testsGestaltsUtils from './utils'; +import * as testsUtils from '../utils'; import * as testsIdentitiesUtils from '../identities/utils'; import * as testsKeysUtils from '../keys/utils'; import * as ids from '../../src/ids'; @@ -265,9 +267,8 @@ describe('GestaltGraph', () => { gestaltsErrors.ErrorGestaltsGraphNotRunning, ); }); - testProp( + test.prop([gestaltNodeInfoComposedArb])( 'getNode, setNode and unsetNode', - [gestaltNodeInfoComposedArb], async (gestaltNodeInfo) => { const gestaltGraph = await GestaltGraph.createGestaltGraph({ db, @@ -289,9 +290,8 @@ describe('GestaltGraph', () => { await gestaltGraph.stop(); }, ); - testProp( + test.prop([gestaltNodeInfoComposedArb])( 'setNode updates node information', - [gestaltNodeInfoComposedArb], async (gestaltNodeInfo) => { const gestaltGraph = await GestaltGraph.createGestaltGraph({ db, @@ -317,9 +317,8 @@ describe('GestaltGraph', () => { await gestaltGraph.stop(); }, ); - testProp( + test.prop([linkNodeComposedArb])( 'linkNodeAndNode and unlinkNodeAndNode', - [linkNodeComposedArb], async ({ gestaltNodeInfo1, gestaltNodeInfo2, linkNode }) => { const gestaltGraph = await GestaltGraph.createGestaltGraph({ db, @@ -372,9 +371,8 @@ describe('GestaltGraph', () => { await gestaltGraph.stop(); }, ); - testProp( + test.prop([gestaltIdentityInfoComposedArb])( 'get, set and unset identity', - [gestaltIdentityInfoComposedArb], async (gestaltIdentityInfo) => { const gestaltGraph = await GestaltGraph.createGestaltGraph({ db, @@ -403,9 +401,8 @@ describe('GestaltGraph', () => { } }, ); - testProp( + test.prop([gestaltIdentityInfoComposedArb])( 'setIdentity updates identity info', - [gestaltIdentityInfoComposedArb], async (gestaltIdentityInfo) => { const gestaltGraph = await GestaltGraph.createGestaltGraph({ db, @@ -444,9 +441,8 @@ describe('GestaltGraph', () => { } }, ); - testProp( + test.prop([linkIdentityComposedArb])( 'linkNodeAndIdentity and unlinkNodeAndIdentity', - [linkIdentityComposedArb], async ({ gestaltNodeInfo, gestaltIdentityInfo, linkIdentity }) => { const gestaltGraph = await GestaltGraph.createGestaltGraph({ db, @@ -500,9 +496,8 @@ describe('GestaltGraph', () => { } }, ); - testProp( + test.prop([gestaltInfoComposedArb])( 'getVertex, setVertex and unsetVertex', - [gestaltInfoComposedArb], async (gestaltInfo) => { const gestaltGraph = await GestaltGraph.createGestaltGraph({ db, @@ -523,9 +518,8 @@ describe('GestaltGraph', () => { await gestaltGraph.stop(); }, ); - testProp( + test.prop([gestaltInfoComposedArb])( 'setVertex updates vertex information', - [gestaltInfoComposedArb], async (gestaltInfo) => { const gestaltGraph = await GestaltGraph.createGestaltGraph({ db, @@ -553,9 +547,8 @@ describe('GestaltGraph', () => { await gestaltGraph.stop(); }, ); - testProp( + test.prop([linkVertexComposedArb])( 'linkVertexAndVertex and unlinkVertexAndVertex', - [linkVertexComposedArb], async ({ gestaltVertexInfo1, gestaltVertexInfo2, gestaltLink }) => { const gestaltGraph = await GestaltGraph.createGestaltGraph({ db, @@ -630,9 +623,8 @@ describe('GestaltGraph', () => { await gestaltGraph.stop(); }, ); - testProp( + test.prop([gestaltNodeInfoComposedArb])( 'getGestaltByNode', - [gestaltNodeInfoComposedArb], async (gestaltNodeInfo) => { const gestaltGraph = await GestaltGraph.createGestaltGraph({ db, @@ -662,9 +654,8 @@ describe('GestaltGraph', () => { }); }, ); - testProp( + test.prop([gestaltIdentityInfoComposedArb])( 'getGestaltByIdentity', - [gestaltIdentityInfoComposedArb], async (gestaltIdentityInfo) => { const gestaltGraph = await GestaltGraph.createGestaltGraph({ db, @@ -697,7 +688,7 @@ describe('GestaltGraph', () => { }); }, ); - testProp('getGestalt', [gestaltInfoComposedArb], async (gestaltInfo) => { + test.prop([gestaltInfoComposedArb])('getGestalt', async (gestaltInfo) => { const gestaltGraph = await GestaltGraph.createGestaltGraph({ db, acl, @@ -750,50 +741,48 @@ describe('GestaltGraph', () => { fail('invalid type'); } }); - testProp( - 'getGestalts with nodes', - [fc.array(gestaltNodeInfoComposedArb, { minLength: 2, maxLength: 10 })], - async (gestaltNodeInfos) => { - const ids = new Set(); - for (const gestaltNodeInfo of gestaltNodeInfos) { - ids.add(nodesUtils.encodeNodeId(gestaltNodeInfo.nodeId)); - } - fc.pre(ids.size === gestaltNodeInfos.length); - const gestaltGraph = await GestaltGraph.createGestaltGraph({ - db, - acl, - logger, - fresh: true, + test.prop([ + fc.array(gestaltNodeInfoComposedArb, { minLength: 2, maxLength: 10 }), + ])('getGestalts with nodes', async (gestaltNodeInfos) => { + const ids = new Set(); + for (const gestaltNodeInfo of gestaltNodeInfos) { + ids.add(nodesUtils.encodeNodeId(gestaltNodeInfo.nodeId)); + } + fc.pre(ids.size === gestaltNodeInfos.length); + const gestaltGraph = await GestaltGraph.createGestaltGraph({ + db, + acl, + logger, + fresh: true, + }); + for (const gestaltNodeInfo of gestaltNodeInfos) { + await gestaltGraph.setNode(gestaltNodeInfo); + } + const gestalts = await AsyncIterable.as( + gestaltGraph.getGestalts(), + ).toArray(); + expect(gestalts).toHaveLength(gestaltNodeInfos.length); + for (const gestalt of gestalts) { + const gestaltId = Object.keys(gestalt.nodes)[0]; + const [, nodeId] = gestaltsUtils.decodeGestaltNodeId(gestaltId)!; + expect(gestalt).toMatchObject({ + matrix: { + [gestaltId]: {}, + }, + nodes: { + [gestaltId]: { nodeId }, + }, + identities: {}, }); - for (const gestaltNodeInfo of gestaltNodeInfos) { - await gestaltGraph.setNode(gestaltNodeInfo); - } - const gestalts = await AsyncIterable.as( - gestaltGraph.getGestalts(), - ).toArray(); - expect(gestalts).toHaveLength(gestaltNodeInfos.length); - for (const gestalt of gestalts) { - const gestaltId = Object.keys(gestalt.nodes)[0]; - const [, nodeId] = gestaltsUtils.decodeGestaltNodeId(gestaltId)!; - expect(gestalt).toMatchObject({ - matrix: { - [gestaltId]: {}, - }, - nodes: { - [gestaltId]: { nodeId }, - }, - identities: {}, - }); - } - }, - ); - testProp( + } + }); + test.prop([ + fc + .array(gestaltIdentityInfoComposedArb, { minLength: 2, maxLength: 10 }) + .noShrink(), + ])( 'getGestalts with identities', - [ - fc - .array(gestaltIdentityInfoComposedArb, { minLength: 2, maxLength: 10 }) - .noShrink(), - ], + async (gestaltIdentityInfos) => { const ids = new Set(); for (const gestaltIdentityInfo of gestaltIdentityInfos) { @@ -834,80 +823,77 @@ describe('GestaltGraph', () => { } }, ); - testProp( - 'getGestalts with nodes and identities', - [fc.array(gestaltInfoComposedArb, { minLength: 2, maxLength: 10 })], - async (gestaltInfos) => { - const ids = new Set(); - for (const gestaltInfo of gestaltInfos) { - const [type, data] = gestaltInfo; - switch (type) { - case 'identity': - ids.add(data.providerId + data.identityId); - break; - case 'node': - ids.add(nodesUtils.encodeNodeId(data.nodeId)); - break; - default: - break; - } - } - fc.pre(ids.size === gestaltInfos.length); - const gestaltGraph = await GestaltGraph.createGestaltGraph({ - db, - acl, - logger, - fresh: true, - }); - for (const gestaltinfo of gestaltInfos) { - await gestaltGraph.setVertex(gestaltinfo); + test.prop([ + fc.array(gestaltInfoComposedArb, { minLength: 2, maxLength: 10 }), + ])('getGestalts with nodes and identities', async (gestaltInfos) => { + const ids = new Set(); + for (const gestaltInfo of gestaltInfos) { + const [type, data] = gestaltInfo; + switch (type) { + case 'identity': + ids.add(data.providerId + data.identityId); + break; + case 'node': + ids.add(nodesUtils.encodeNodeId(data.nodeId)); + break; + default: + break; } - const gestalts = await AsyncIterable.as( - gestaltGraph.getGestalts(), - ).toArray(); - expect(gestalts).toHaveLength(gestaltInfos.length); - for (const gestalt of gestalts) { - const gestaltId = Object.keys(gestalt.matrix)[0]; - const [type, id] = gestaltsUtils.decodeGestaltId(gestaltId)!; - switch (type) { - case 'node': - { - expect(gestalt).toMatchObject({ - matrix: { - [gestaltId]: {}, - }, - nodes: { - [gestaltId]: { nodeId: id }, - }, - identities: {}, - }); - } - break; - case 'identity': - { - expect(gestalt).toMatchObject({ - matrix: { - [gestaltId]: {}, - }, - nodes: {}, - identities: { - [gestaltId]: { - providerId: id[0], - identityId: id[1], - }, + } + fc.pre(ids.size === gestaltInfos.length); + const gestaltGraph = await GestaltGraph.createGestaltGraph({ + db, + acl, + logger, + fresh: true, + }); + for (const gestaltinfo of gestaltInfos) { + await gestaltGraph.setVertex(gestaltinfo); + } + const gestalts = await AsyncIterable.as( + gestaltGraph.getGestalts(), + ).toArray(); + expect(gestalts).toHaveLength(gestaltInfos.length); + for (const gestalt of gestalts) { + const gestaltId = Object.keys(gestalt.matrix)[0]; + const [type, id] = gestaltsUtils.decodeGestaltId(gestaltId)!; + switch (type) { + case 'node': + { + expect(gestalt).toMatchObject({ + matrix: { + [gestaltId]: {}, + }, + nodes: { + [gestaltId]: { nodeId: id }, + }, + identities: {}, + }); + } + break; + case 'identity': + { + expect(gestalt).toMatchObject({ + matrix: { + [gestaltId]: {}, + }, + nodes: {}, + identities: { + [gestaltId]: { + providerId: id[0], + identityId: id[1], }, - }); - } - break; - default: - fail('invalid type'); - } + }, + }); + } + break; + default: + fail('invalid type'); } - }, - ); - testProp( + } + }); + test.prop([linkNodeComposedArb])( 'getGestalt with node links', - [linkNodeComposedArb], async ({ gestaltNodeInfo1, gestaltNodeInfo2, linkNode }) => { const gestaltGraph = await GestaltGraph.createGestaltGraph({ db, @@ -967,9 +953,8 @@ describe('GestaltGraph', () => { await gestaltGraph.stop(); }, ); - testProp( + test.prop([linkIdentityComposedArb])( 'getGestalt with identity links', - [linkIdentityComposedArb], async ({ gestaltNodeInfo, gestaltIdentityInfo, linkIdentity }) => { const gestaltGraph = await GestaltGraph.createGestaltGraph({ db, @@ -1030,9 +1015,8 @@ describe('GestaltGraph', () => { await gestaltGraph.stop(); }, ); - testProp( + test.prop([linkVertexComposedArb])( 'getGestalt with node and identity links', - [linkVertexComposedArb], async ({ gestaltVertexInfo1, gestaltVertexInfo2, gestaltLink }) => { const gestaltGraph = await GestaltGraph.createGestaltGraph({ db, @@ -1362,37 +1346,62 @@ describe('GestaltGraph', () => { }) .noShrink(); - testProp( - 'model', - [altCommandsArb], - async (cmds) => { - await acl.start({ fresh: true }); - const gestaltGraph = await GestaltGraph.createGestaltGraph({ - db, - acl, - logger, - fresh: true, - }); - try { - const model: testsGestaltsUtils.GestaltGraphModel = { - matrix: {}, - nodes: {}, - identities: {}, - permissions: {}, - }; - const modelSetup = async () => { - return { - model, - real: gestaltGraph, - }; + test.prop([altCommandsArb], { numRuns: 20 })('model', async (cmds) => { + await acl.start({ fresh: true }); + const gestaltGraph = await GestaltGraph.createGestaltGraph({ + db, + acl, + logger, + fresh: true, + }); + try { + const model: testsGestaltsUtils.GestaltGraphModel = { + matrix: {}, + nodes: {}, + identities: {}, + permissions: {}, + }; + const modelSetup = async () => { + return { + model, + real: gestaltGraph, }; - await fc.asyncModelRun(modelSetup, cmds); - } finally { - await gestaltGraph.stop(); - await acl.stop(); - } - }, - { numRuns: 20 }, - ); + }; + await fc.asyncModelRun(modelSetup, cmds); + } finally { + await gestaltGraph.stop(); + await acl.stop(); + } + }); + }); + test('Should only set the newest ClaimId', async () => { + const gestaltGraph = await GestaltGraph.createGestaltGraph({ + db, + acl, + logger, + fresh: true, + }); + + // Creating 3 claimIDs. + const nodeId = testsUtils.generateRandomNodeId(); + const claimIdGenerator = claimsUtils.createClaimIdGenerator(nodeId); + const claimId1 = claimIdGenerator(); + const claimId2 = claimIdGenerator(); + const claimId3 = claimIdGenerator(); + + await gestaltGraph.setClaimIdNewest(nodeId, claimId1); + expect( + (await gestaltGraph.getClaimIdNewest(nodeId))?.toMultibase('base32hex'), + ).toEqual(claimId1.toMultibase('base32hex')); + await gestaltGraph.setClaimIdNewest(nodeId, claimId3); + expect( + (await gestaltGraph.getClaimIdNewest(nodeId))?.toMultibase('base32hex'), + ).toEqual(claimId3.toMultibase('base32hex')); + await gestaltGraph.setClaimIdNewest(nodeId, claimId2); + expect( + (await gestaltGraph.getClaimIdNewest(nodeId))?.toMultibase('base32hex'), + ).toEqual(claimId3.toMultibase('base32hex')); + + await gestaltGraph.stop(); }); });