From 459d9fd795243431a6d567f4e0e38e0f7828fa38 Mon Sep 17 00:00:00 2001 From: Brian Botha Date: Thu, 4 Apr 2024 15:37:07 +1100 Subject: [PATCH 1/4] feat: added vertex rediscovery process - Will rediscover each vertex every hour - If a vertex hasn't successfully processed within 3 days it is removed from consideration [ci skip] --- src/discovery/Discovery.ts | 211 ++++++++++++++++++++++++++---- tests/discovery/Discovery.test.ts | 79 +++++++++++ 2 files changed, 263 insertions(+), 27 deletions(-) diff --git a/src/discovery/Discovery.ts b/src/discovery/Discovery.ts index d4849ef84..299af4dd6 100644 --- a/src/discovery/Discovery.ts +++ b/src/discovery/Discovery.ts @@ -77,7 +77,10 @@ class Discovery { nodeManager, taskManager, discoverVertexTimeoutTime = 2000, + rediscoverCheckIntervalTime = 60 * 60 * 1000, // 1 hour + rediscoverVertexDelayTime = 60 * 60 * 1000, // 1 hour rediscoverSkipTime = 60 * 60 * 1000, // 1 hour + staleVertexThresholdTime = 3 * 24 * 60 * 60 * 1000, // 3 days logger = new Logger(this.name), fresh = false, }: { @@ -88,7 +91,10 @@ class Discovery { nodeManager: NodeManager; taskManager: TaskManager; discoverVertexTimeoutTime?: number; + rediscoverCheckIntervalTime?: number; + rediscoverVertexDelayTime?: number; rediscoverSkipTime?: number; + staleVertexThresholdTime?: number; logger?: Logger; fresh?: boolean; }): Promise { @@ -101,7 +107,10 @@ class Discovery { nodeManager, taskManager, discoverVertexTimeoutTime, + rediscoverCheckIntervalTime, + rediscoverVertexDelayTime, rediscoverSkipTime, + staleVertexThresholdTime, logger, }); await discovery.start({ fresh }); @@ -117,10 +126,23 @@ class Discovery { protected nodeManager: NodeManager; protected taskManager: TaskManager; protected discoverVertexTimeoutTime: number; + /** + * Interval delay used when checking for nodes to rediscover + */ + protected rediscoverCheckIntervalTime: number; + /** + * The threshold used when deciding to rediscover a vertex based on how long ago it was processed + */ + protected rediscoverVertexThresholdTime: number; /** * The time since a vertex has been processed where re processing will be skipped */ protected rediscoverSkipTime: number; + /** + * The time threshold for + * @protected + */ + protected staleVertexThresholdTime: number; protected discoveryDbPath: LevelPath = [this.constructor.name]; /** * Last processed collection @@ -132,7 +154,7 @@ class Discovery { ]; /** * Last processed collection - * `Discovery/lastProcessed/{GestaltIdEncoded} -> number` + * `Discovery/lastProcessed/{number}/{GestaltIdEncoded} -> gestaltIdEncoded` */ protected lastProcessedOrderPath: LevelPath = [ ...this.discoveryDbPath, @@ -151,6 +173,12 @@ class Discovery { lastProcessedCutoffTime ?? undefined, ctx, ); + const gestaltId = gestaltsUtils.decodeGestaltId(vertex); + if (gestaltId == null) never('the GestaltId should always be valid here'); + await this.scheduleDiscoveryForVertex( + gestaltId, + this.rediscoverVertexThresholdTime, + ); } catch (e) { if ( e instanceof tasksErrors.ErrorTaskStop || @@ -172,6 +200,23 @@ class Discovery { public readonly discoverVertexHandlerId = `${this.constructor.name}.discoverVertexHandler` as TaskHandlerId; + /** + * This handler is run periodically to check if nodes are ready to be rediscovered + */ + protected checkRediscoveryHandler: TaskHandler = async () => { + await this.checkRediscovery( + Date.now() - this.rediscoverVertexThresholdTime, + ); + await this.taskManager.scheduleTask({ + handlerId: this.discoverVertexHandlerId, + path: [this.constructor.name, this.checkRediscoveryHandlerId], + lazy: true, + delay: this.rediscoverCheckIntervalTime, + }); + }; + public readonly checkRediscoveryHandlerId = + `${this.constructor.name}.checkForRediscoveryHandler` as TaskHandlerId; + public constructor({ keyRing, db, @@ -180,7 +225,10 @@ class Discovery { nodeManager, taskManager, discoverVertexTimeoutTime, + rediscoverCheckIntervalTime, + rediscoverVertexDelayTime, rediscoverSkipTime, + staleVertexThresholdTime, logger, }: { db: DB; @@ -190,7 +238,10 @@ class Discovery { nodeManager: NodeManager; taskManager: TaskManager; discoverVertexTimeoutTime: number; + rediscoverCheckIntervalTime: number; + rediscoverVertexDelayTime: number; rediscoverSkipTime: number; + staleVertexThresholdTime: number; logger: Logger; }) { this.db = db; @@ -200,7 +251,10 @@ class Discovery { this.nodeManager = nodeManager; this.taskManager = taskManager; this.discoverVertexTimeoutTime = discoverVertexTimeoutTime; + this.rediscoverCheckIntervalTime = rediscoverCheckIntervalTime; + this.rediscoverVertexThresholdTime = rediscoverVertexDelayTime; this.rediscoverSkipTime = rediscoverSkipTime; + this.staleVertexThresholdTime = staleVertexThresholdTime; this.logger = logger; } @@ -222,6 +276,13 @@ class Discovery { this.discoverVertexHandlerId, this.discoverVertexHandler, ); + // Start up rediscovery task + await this.taskManager.scheduleTask({ + handlerId: this.discoverVertexHandlerId, + path: [this.constructor.name, this.checkRediscoveryHandlerId], + lazy: true, + delay: this.rediscoverCheckIntervalTime, + }); this.logger.info(`Started ${this.constructor.name}`); } @@ -376,7 +437,7 @@ class Discovery { if (ctx.signal.aborted) throw ctx.signal.reason; switch (signedClaim.payload.typ) { case 'ClaimLinkNode': - await this.procesessClaimLinkNode( + await this.processClaimLinkNode( signedClaim, nodeId, lastProcessedCutoffTime, @@ -397,7 +458,7 @@ class Discovery { await this.setLastProcessed(gestaltNodeId, processedTime); } - protected async procesessClaimLinkNode( + protected async processClaimLinkNode( signedClaim: SignedClaim, nodeId: NodeId, lastProcessedCutoffTime = Date.now() - this.rediscoverSkipTime, @@ -680,24 +741,22 @@ class Discovery { task.cancel(abortSingletonTaskReason); } // Only create if it doesn't exist - if (!taskExisting) { - // Otherwise create a new task if none exists - await this.taskManager.scheduleTask( - { - handlerId: this.discoverVertexHandlerId, - parameters: [gestaltIdEncoded, lastProcessedCutoffTime], - path: [ - this.constructor.name, - this.discoverVertexHandlerId, - gestaltIdEncoded, - ], - lazy: true, - deadline: this.discoverVertexTimeoutTime, - delay, - }, - tran, - ); - } + if (taskExisting != null) return; + await this.taskManager.scheduleTask( + { + handlerId: this.discoverVertexHandlerId, + parameters: [gestaltIdEncoded, lastProcessedCutoffTime], + path: [ + this.constructor.name, + this.discoverVertexHandlerId, + gestaltIdEncoded, + ], + lazy: true, + deadline: this.discoverVertexTimeoutTime, + delay, + }, + tran, + ); } /** @@ -782,6 +841,86 @@ class Discovery { return identityClaims; } + /** + * Checks previously discovered vertices for ones to be re-added back on to the queue + */ + public async checkRediscovery( + lastProcessedCutoffTime: number, + tran?: DBTransaction, + ): Promise { + if (tran == null) { + return this.db.withTransactionF((tran) => + this.checkRediscovery(lastProcessedCutoffTime, tran), + ); + } + + const staleVertexCutoff = Date.now() - this.staleVertexThresholdTime; + const gestaltIds: Array<[GestaltIdEncoded, number]> = []; + for await (const [ + gestaltId, + lastProcessedTime, + ] of this.getLastProcessedTimes( + { + order: 'asc', + seek: lastProcessedCutoffTime, + }, + tran, + )) { + gestaltIds.push([ + gestaltsUtils.encodeGestaltId(gestaltId), + lastProcessedTime, + ]); + } + // We want to lock all the ids at once before moving ahead + const locks = gestaltIds.map((gestaltIdEncoded) => { + return [ + this.constructor.name, + this.discoverVertexHandlerId, + gestaltIdEncoded, + ].join(''); + }); + await tran.lock(...locks); + // Schedule a new task for each vertex if it doesn't already exist + for (const [gestaltIdEncoded, lastProcessedTime] of gestaltIds) { + // If we exceed an age threshold then we just remove the vertex information + if (lastProcessedTime < staleVertexCutoff) { + await this.unsetLastProcessed( + gestaltsUtils.decodeGestaltId(gestaltIdEncoded)!, + ); + } + let taskExisting: Task | null = null; + for await (const task of this.taskManager.getTasks( + 'asc', + true, + [this.constructor.name, this.discoverVertexHandlerId, gestaltIdEncoded], + tran, + )) { + if (taskExisting == null) { + taskExisting = task; + continue; + } + // Any extra tasks should be cancelled, this shouldn't normally happen + task.cancel(abortSingletonTaskReason); + } + if (taskExisting != null) continue; + // Schedule a new task + await this.taskManager.scheduleTask( + { + handlerId: this.discoverVertexHandlerId, + parameters: [gestaltIdEncoded, lastProcessedCutoffTime], + path: [ + this.constructor.name, + this.discoverVertexHandlerId, + gestaltIdEncoded, + ], + lazy: true, + deadline: this.discoverVertexTimeoutTime, + }, + tran, + ); + } + } + /** * Updates the last processed time in the database for the given vertex */ @@ -878,19 +1017,37 @@ class Discovery { * Gets the last processed time for a vertex */ protected async *getLastProcessedTimes( - order: 'asc' | 'desc' = 'asc', + { + order = 'asc', + seek, + limit, + }: { + order?: 'asc' | 'desc'; + seek?: number; + limit?: number; + } = {}, tran?: DBTransaction, ): AsyncGenerator<[GestaltId, number]> { if (tran == null) { return yield* this.db.withTransactionG((tran) => - this.getLastProcessedTimes(order, tran), + this.getLastProcessedTimes({ order, seek, limit }, tran), ); } - const iterator = tran.iterator(this.lastProcessedOrderPath, { - valueAsBuffer: false, - reverse: order !== 'asc', - }); + const iterator = tran.iterator( + this.lastProcessedOrderPath, + { + keyAsBuffer: true, + keys: true, + values: true, + valueAsBuffer: false, + reverse: order !== 'asc', + lte: + seek != null + ? [utils.lexiPackBuffer(seek), Buffer.alloc(1, 0)] + : undefined, + }, + ); for await (const [path, gestaltIdEncoded] of iterator) { const lastProcessedTime = utils.lexiUnpackBuffer(path[0] as Buffer); if (lastProcessedTime == null) { diff --git a/tests/discovery/Discovery.test.ts b/tests/discovery/Discovery.test.ts index bafe4d74f..b4c423630 100644 --- a/tests/discovery/Discovery.test.ts +++ b/tests/discovery/Discovery.test.ts @@ -473,6 +473,37 @@ describe('Discovery', () => { await discovery.stop(); await discovery.destroy(); }); + test('processed vertices are queued for rediscovery', async () => { + const discovery = await Discovery.createDiscovery({ + db, + keyRing, + gestaltGraph, + identitiesManager, + nodeManager, + taskManager, + logger, + fresh: true, + }); + await taskManager.startProcessing(); + await discovery.queueDiscoveryByNode(nodeA.keyRing.getNodeId()); + let existingTasks: number = 0; + do { + existingTasks = await discovery.waitForDiscoveryTasks(); + } while (existingTasks > 0); + + // For await (const task of taskManager.getTasks('asc', false, [ + // Discovery.name, + // discovery.discoverVertexHandlerId, + // ])) { + // console.log(task); + // } + + await discovery.waitForDiscoveryTasks(true); + + await taskManager.stopProcessing(); + await discovery.stop(); + await discovery.destroy(); + }); test('should skip recently discovered vertices', async () => { const discovery = await Discovery.createDiscovery({ db, @@ -545,4 +576,52 @@ describe('Discovery', () => { await discovery.stop(); await discovery.destroy(); }); + + test('asd', async () => { + const discovery = await Discovery.createDiscovery({ + db, + keyRing, + gestaltGraph, + identitiesManager, + nodeManager, + taskManager, + logger, + }); + await taskManager.startProcessing(); + + // Attempt initial discovery + await discovery.queueDiscoveryByNode(nodeA.keyRing.getNodeId()); + let existingTasks: number = 0; + do { + existingTasks = await discovery.waitForDiscoveryTasks(); + } while (existingTasks > 0); + + // None should be re-discovered since they were processed after the threshold + await discovery.checkRediscovery(0); + // There should be 0 discovery tasks + + let tasks: number = 0; + for await (const _ of taskManager.getTasks('asc', false, [ + Discovery.name, + discovery.discoverVertexHandlerId, + ])) { + tasks++; + } + expect(tasks).toBe(0); + + // All should be re-discovered + await discovery.checkRediscovery(Date.now()); + tasks = 0; + for await (const _ of taskManager.getTasks('asc', false, [ + Discovery.name, + discovery.discoverVertexHandlerId, + ])) { + tasks++; + } + expect(tasks).toBeGreaterThan(0); + + await taskManager.stopProcessing(); + await discovery.stop(); + await discovery.destroy(); + }); }); From c634f80d809643fbee93b81ee00264f6d6a4cb43 Mon Sep 17 00:00:00 2001 From: Brian Botha Date: Mon, 15 Apr 2024 15:25:24 +1000 Subject: [PATCH 2/4] feat: moved `vertexProcessedTime` and `claimIdNewest` to `GestaltGraph` for persistence Makes more sense to keep it there. [ci skip] --- src/discovery/Discovery.ts | 226 ++++------------------------ src/gestalts/GestaltGraph.ts | 219 ++++++++++++++++++++++++++- tests/discovery/Discovery.test.ts | 50 +----- tests/gestalts/GestaltGraph.test.ts | 32 ++++ 4 files changed, 279 insertions(+), 248 deletions(-) diff --git a/src/discovery/Discovery.ts b/src/discovery/Discovery.ts index 299af4dd6..6c05ea386 100644 --- a/src/discovery/Discovery.ts +++ b/src/discovery/Discovery.ts @@ -1,4 +1,4 @@ -import type { DB, DBTransaction, LevelPath } from '@matrixai/db'; +import type { DB, DBTransaction } from '@matrixai/db'; import type { PromiseCancellable } from '@matrixai/async-cancellable'; import type { ContextTimed } from '@matrixai/contexts'; import type { NodeId } from '../nodes/types'; @@ -18,7 +18,7 @@ import type { ProviderIdentityId, } from '../identities/types'; import type KeyRing from '../keys/KeyRing'; -import type { ClaimId, ClaimIdEncoded, SignedClaim } from '../claims/types'; +import type { ClaimIdEncoded, SignedClaim } from '../claims/types'; import type TaskManager from '../tasks/TaskManager'; import type { Task, TaskHandler, TaskHandlerId } from '../tasks/types'; import type { ClaimLinkIdentity, ClaimLinkNode } from '../claims/payloads'; @@ -35,7 +35,6 @@ import * as tasksErrors from '../tasks/errors'; import * as gestaltsUtils from '../gestalts/utils'; import * as nodesUtils from '../nodes/utils'; import * as keysUtils from '../keys/utils'; -import * as utils from '../utils'; import { never } from '../utils'; import Token from '../tokens/Token'; import { decodeClaimId } from '../ids'; @@ -140,26 +139,8 @@ class Discovery { protected rediscoverSkipTime: number; /** * The time threshold for - * @protected */ protected staleVertexThresholdTime: number; - protected discoveryDbPath: LevelPath = [this.constructor.name]; - /** - * Last processed collection - * `Discovery/lastProcessed/{GestaltIdEncoded} -> number` - */ - protected lastProcessedPath: LevelPath = [ - ...this.discoveryDbPath, - 'lastProcessed', - ]; - /** - * Last processed collection - * `Discovery/lastProcessed/{number}/{GestaltIdEncoded} -> gestaltIdEncoded` - */ - protected lastProcessedOrderPath: LevelPath = [ - ...this.discoveryDbPath, - 'lastProcessedOrder', - ]; protected discoverVertexHandler: TaskHandler = async ( ctx, @@ -173,12 +154,6 @@ class Discovery { lastProcessedCutoffTime ?? undefined, ctx, ); - const gestaltId = gestaltsUtils.decodeGestaltId(vertex); - if (gestaltId == null) never('the GestaltId should always be valid here'); - await this.scheduleDiscoveryForVertex( - gestaltId, - this.rediscoverVertexThresholdTime, - ); } catch (e) { if ( e instanceof tasksErrors.ErrorTaskStop || @@ -349,8 +324,6 @@ class Discovery { ); } - // Fixme, when processing a vertex, we need to check existing links in the - // GestaltGraph and ask for claims newer than that protected processVertex( vertex: GestaltIdEncoded, lastProcessedCutoffTime?: number, @@ -387,25 +360,13 @@ class Discovery { if (nodeId.equals(this.keyRing.getNodeId())) { // Skip our own nodeId, we actively add this information when it changes, // so there is no need to scan it. - await this.setLastProcessed(gestaltNodeId, processedTime); + await this.gestaltGraph.setVertexProcessedTime( + gestaltNodeId, + processedTime, + ); return; } - // Get the oldest known claim for this node - // get the oldest one - let newestClaimId: ClaimId | undefined = undefined; - for await (const [, gestaltLink] of this.gestaltGraph.getLinks([ - 'node', - nodeId, - ])) { - const claimIdEncoded = gestaltLink[1].claim.payload.jti; - const claimId = decodeClaimId(claimIdEncoded); - if (claimId == null) never(); - if (newestClaimId == null) { - newestClaimId = claimId; - } else if (Buffer.compare(newestClaimId, claimId) === -1) { - newestClaimId = claimId; - } - } + const newestClaimId = await this.gestaltGraph.getClaimIdNewest(nodeId); // The sigChain data of the vertex (containing all cryptolinks) let vertexChainData: Record = {}; try { @@ -415,7 +376,10 @@ class Discovery { ctx, ); } catch (e) { - await this.setLastProcessed(gestaltNodeId, processedTime); + await this.gestaltGraph.setVertexProcessedTime( + gestaltNodeId, + processedTime, + ); // Not strictly an error in this case, we can fail to connect this.logger.info( `Failed to discover ${nodesUtils.encodeNodeId( @@ -424,15 +388,7 @@ class Discovery { ); return; } - // TODO: for now, the chain data is treated as a 'disjoint' set of - // cryptolink claims from a node to another node/identity. - // That is, we have no notion of revocations, or multiple claims to - // the same node/identity. Thus, we simply iterate over this chain - // of cryptolinks. // Iterate over each of the claims in the chain (already verified). - // TODO: there is no deterministic iteration order of keys in a record. - // When we change to iterating over ordered sigchain claims, - // this must change into array iteration. for (const signedClaim of Object.values(vertexChainData)) { if (ctx.signal.aborted) throw ctx.signal.reason; switch (signedClaim.payload.typ) { @@ -455,7 +411,10 @@ class Discovery { never(); } } - await this.setLastProcessed(gestaltNodeId, processedTime); + await this.gestaltGraph.setVertexProcessedTime( + gestaltNodeId, + processedTime, + ); } protected async processClaimLinkNode( @@ -497,6 +456,9 @@ class Discovery { meta: {}, }, ); + const claimId = decodeClaimId(signedClaim.payload.jti); + if (claimId == null) never(); + await this.gestaltGraph.setClaimIdNewest(nodeId, claimId); // Add this vertex to the queue if it hasn't already been visited const linkedGestaltId: GestaltId = ['node', linkedVertexNodeId]; if ( @@ -579,6 +541,9 @@ class Discovery { }, }, ); + const claimId = decodeClaimId(signedClaim.payload.jti); + if (claimId == null) never(); + await this.gestaltGraph.setClaimIdNewest(nodeId, claimId); // Add this identity vertex to the queue if it is not present const providerIdentityId = JSON.parse(signedClaim.payload.sub!); const identityGestaltId: GestaltId = ['identity', providerIdentityId]; @@ -654,7 +619,10 @@ class Discovery { ); } } - await this.setLastProcessed(['identity', providerIdentityId], Date.now()); + await this.gestaltGraph.setVertexProcessedTime( + ['identity', providerIdentityId], + Date.now(), + ); } /** @@ -859,7 +827,7 @@ class Discovery { for await (const [ gestaltId, lastProcessedTime, - ] of this.getLastProcessedTimes( + ] of this.gestaltGraph.getVertexProcessedTimes( { order: 'asc', seek: lastProcessedCutoffTime, @@ -884,7 +852,7 @@ class Discovery { for (const [gestaltIdEncoded, lastProcessedTime] of gestaltIds) { // If we exceed an age threshold then we just remove the vertex information if (lastProcessedTime < staleVertexCutoff) { - await this.unsetLastProcessed( + await this.gestaltGraph.unsetVertexProcessedTime( gestaltsUtils.decodeGestaltId(gestaltIdEncoded)!, ); } @@ -921,144 +889,6 @@ class Discovery { } } - /** - * Updates the last processed time in the database for the given vertex - */ - protected async setLastProcessed( - vertex: GestaltId, - processedTime: number, - tran?: DBTransaction, - ): Promise { - if (tran == null) { - return this.db.withTransactionF((tran) => - this.setLastProcessed(vertex, processedTime, tran), - ); - } - - const gestaltIdEncoded = gestaltsUtils.encodeGestaltId(vertex); - await tran.lock( - [ - this.constructor.name, - this.discoverVertexHandlerId, - gestaltIdEncoded, - ].join(''), - ); - - await tran.put( - [...this.lastProcessedPath, gestaltIdEncoded], - processedTime, - ); - await tran.put( - [ - ...this.lastProcessedOrderPath, - utils.lexiPackBuffer(processedTime), - gestaltIdEncoded, - ], - gestaltIdEncoded, - ); - } - - /** - * Removes the last processed time for a vertex - */ - protected async unsetLastProcessed( - vertex: GestaltId, - tran?: DBTransaction, - ): Promise { - if (tran == null) { - return this.db.withTransactionF((tran) => - this.unsetLastProcessed(vertex, tran), - ); - } - - const gestaltIdEncoded = gestaltsUtils.encodeGestaltId(vertex); - await tran.lock( - [ - this.constructor.name, - this.discoverVertexHandlerId, - gestaltIdEncoded, - ].join(''), - ); - - const processedTime = await tran.get([ - ...this.lastProcessedPath, - gestaltIdEncoded, - ]); - if (processedTime == null) return; - await tran.del([...this.lastProcessedPath, gestaltIdEncoded]); - await tran.del([ - ...this.lastProcessedOrderPath, - utils.lexiPackBuffer(processedTime), - gestaltIdEncoded, - ]); - } - - /** - * Gets the last processed time for a vertex - */ - protected async getLastProcessedTime( - vertex: GestaltId, - tran?: DBTransaction, - ): Promise { - if (tran == null) { - return this.db.withTransactionF((tran) => - this.getLastProcessedTime(vertex, tran), - ); - } - - const gestaltIdEncoded = gestaltsUtils.encodeGestaltId(vertex); - return await tran.get([ - ...this.lastProcessedPath, - gestaltIdEncoded, - ]); - } - - /** - * Gets the last processed time for a vertex - */ - protected async *getLastProcessedTimes( - { - order = 'asc', - seek, - limit, - }: { - order?: 'asc' | 'desc'; - seek?: number; - limit?: number; - } = {}, - tran?: DBTransaction, - ): AsyncGenerator<[GestaltId, number]> { - if (tran == null) { - return yield* this.db.withTransactionG((tran) => - this.getLastProcessedTimes({ order, seek, limit }, tran), - ); - } - - const iterator = tran.iterator( - this.lastProcessedOrderPath, - { - keyAsBuffer: true, - keys: true, - values: true, - valueAsBuffer: false, - reverse: order !== 'asc', - lte: - seek != null - ? [utils.lexiPackBuffer(seek), Buffer.alloc(1, 0)] - : undefined, - }, - ); - for await (const [path, gestaltIdEncoded] of iterator) { - const lastProcessedTime = utils.lexiUnpackBuffer(path[0] as Buffer); - if (lastProcessedTime == null) { - never('lastProcessedTime should be valid here'); - } - const gestaltId = gestaltsUtils.decodeGestaltId(gestaltIdEncoded); - if (gestaltId == null) never('GestaltId should be valid here'); - yield [gestaltId, lastProcessedTime]; - } - } - /** * Returns true if the vertex was processed after the given time */ @@ -1068,7 +898,7 @@ class Discovery { tran?: DBTransaction, ): Promise { const lastProcessedTime = - (await this.getLastProcessedTime(vertex, tran)) ?? 0; + (await this.gestaltGraph.getVertexProcessedTime(vertex, tran)) ?? 0; return lastProcessedTime > time; } } diff --git a/src/gestalts/GestaltGraph.ts b/src/gestalts/GestaltGraph.ts index 0c9620e81..b5838bd44 100644 --- a/src/gestalts/GestaltGraph.ts +++ b/src/gestalts/GestaltGraph.ts @@ -13,8 +13,9 @@ import type { GestaltInfo, GestaltLinkIdentity, GestaltId, + GestaltIdEncoded, } from './types'; -import type { NodeId, ProviderIdentityId } from '../ids/types'; +import type { ClaimId, NodeId, ProviderIdentityId } from '../ids/types'; import type ACL from '../acl/ACL'; import type { GestaltLinkJSON } from './types'; import Logger from '@matrixai/logger'; @@ -28,6 +29,11 @@ import * as gestaltsErrors from './errors'; import * as gestaltsEvents from './events'; import * as aclUtils from '../acl/utils'; import { never } from '../utils'; +import * as utils from '../utils'; + +// TODO: add tracking for the oldest claimID for a node. +// This needs to be removed if the node is removed. +// ALso possible undefined. interface GestaltGraph extends CreateDestroyStartStop {} @CreateDestroyStartStop( @@ -106,6 +112,29 @@ class GestaltGraph { 'identities', ]; + /** + * Last processed collection + * `GestaltGraph/VertexProcessedTime/{GestaltIdEncoded} -> number` + */ + protected vertexProcessedTimePath: LevelPath = [ + this.constructor.name, + 'vertexProcessedTime', + ]; + + /** + * Last processed collection + * `GestaltGraph/vertexProcessedTimeOrder/{number}/{GestaltIdEncoded} -> gestaltIdEncoded` + */ + protected vertexProcessedTimeOrderPath: LevelPath = [ + this.constructor.name, + 'vertexProcessedTimeOrder', + ]; + + protected nodeClaimIdOldestPath: LevelPath = [ + this.constructor.name, + 'nodeClaimIdOldestPath', + ]; + protected generateGestaltLinkId: () => GestaltLinkId; constructor({ db, acl, logger }: { db: DB; acl: ACL; logger: Logger }) { @@ -310,6 +339,194 @@ class GestaltGraph { } } + /** + * Updates the last processed time in the database for the given vertex + */ + @ready(new gestaltsErrors.ErrorGestaltsGraphNotRunning()) + public async setVertexProcessedTime( + vertex: GestaltId, + processedTime: number, + tran?: DBTransaction, + ): Promise { + if (tran == null) { + return this.db.withTransactionF((tran) => + this.setVertexProcessedTime(vertex, processedTime, tran), + ); + } + + const gestaltIdEncoded = gestaltsUtils.encodeGestaltId(vertex); + await tran.lock( + [this.constructor.name, 'vertexProcessedTime', gestaltIdEncoded].join(''), + ); + + await tran.put( + [...this.vertexProcessedTimePath, gestaltIdEncoded], + processedTime, + ); + await tran.put( + [ + ...this.vertexProcessedTimeOrderPath, + utils.lexiPackBuffer(processedTime), + gestaltIdEncoded, + ], + gestaltIdEncoded, + ); + } + + /** + * Removes the last processed time for a vertex + */ + @ready(new gestaltsErrors.ErrorGestaltsGraphNotRunning()) + public async unsetVertexProcessedTime( + vertex: GestaltId, + tran?: DBTransaction, + ): Promise { + if (tran == null) { + return this.db.withTransactionF((tran) => + this.unsetVertexProcessedTime(vertex, tran), + ); + } + + const gestaltIdEncoded = gestaltsUtils.encodeGestaltId(vertex); + await tran.lock( + [this.constructor.name, 'vertexProcessedTime', gestaltIdEncoded].join(''), + ); + + const processedTime = await tran.get([ + ...this.vertexProcessedTimePath, + gestaltIdEncoded, + ]); + if (processedTime == null) return; + await tran.del([...this.vertexProcessedTimePath, gestaltIdEncoded]); + await tran.del([ + ...this.vertexProcessedTimeOrderPath, + utils.lexiPackBuffer(processedTime), + gestaltIdEncoded, + ]); + } + + /** + * Gets the last processed time for a vertex + */ + @ready(new gestaltsErrors.ErrorGestaltsGraphNotRunning()) + public async getVertexProcessedTime( + vertex: GestaltId, + tran?: DBTransaction, + ): Promise { + if (tran == null) { + return this.db.withTransactionF((tran) => + this.getVertexProcessedTime(vertex, tran), + ); + } + + const gestaltIdEncoded = gestaltsUtils.encodeGestaltId(vertex); + return await tran.get([ + ...this.vertexProcessedTimePath, + gestaltIdEncoded, + ]); + } + + /** + * Gets the last processed time for a vertex + */ + @ready(new gestaltsErrors.ErrorGestaltsGraphNotRunning()) + public async *getVertexProcessedTimes( + { + order = 'asc', + seek, + limit, + }: { + order?: 'asc' | 'desc'; + seek?: number; + limit?: number; + } = {}, + tran?: DBTransaction, + ): AsyncGenerator<[GestaltId, number]> { + if (tran == null) { + return yield* this.db.withTransactionG((tran) => + this.getVertexProcessedTimes({ order, seek, limit }, tran), + ); + } + + const iterator = tran.iterator( + this.vertexProcessedTimeOrderPath, + { + keyAsBuffer: true, + keys: true, + values: true, + valueAsBuffer: false, + reverse: order !== 'asc', + lte: + seek != null + ? [utils.lexiPackBuffer(seek), Buffer.alloc(1, 0)] + : undefined, + }, + ); + for await (const [path, gestaltIdEncoded] of iterator) { + const lastProcessedTime = utils.lexiUnpackBuffer(path[0] as Buffer); + if (lastProcessedTime == null) { + never('lastProcessedTime should be valid here'); + } + const gestaltId = gestaltsUtils.decodeGestaltId(gestaltIdEncoded); + if (gestaltId == null) never('GestaltId should be valid here'); + yield [gestaltId, lastProcessedTime]; + } + } + + /** + * Updates the newest `ClaimId` for a node if it's newer than the current id stored + */ + @ready(new gestaltsErrors.ErrorGestaltsGraphNotRunning()) + public async setClaimIdNewest( + nodeId: NodeId, + claimId: ClaimId, + tran?: DBTransaction, + ): Promise { + if (tran == null) { + return this.db.withTransactionF((tran) => + this.setClaimIdNewest(nodeId, claimId, tran), + ); + } + + await tran.lock( + [...this.nodeClaimIdOldestPath, nodeId.toString()].join(''), + ); + // Getting existing ClaimId + const claimIdExisting = await this.getClaimIdNewest(nodeId, tran); + // If an old claimId already exists and is older, then we don't update + if ( + claimIdExisting != null && + Buffer.compare(claimIdExisting, claimId) === 1 + ) { + return; + } + // Setting new one + await tran.put( + [...this.nodeClaimIdOldestPath, nodeId.toBuffer()], + claimId.toBuffer(), + true, + ); + } + + @ready(new gestaltsErrors.ErrorGestaltsGraphNotRunning()) + public async getClaimIdNewest( + nodeId: NodeId, + tran?: DBTransaction, + ): Promise { + if (tran == null) { + return this.db.withTransactionF((tran) => + this.getClaimIdNewest(nodeId, tran), + ); + } + + const claimIdRaw = await tran.get( + [...this.nodeClaimIdOldestPath, nodeId.toBuffer()], + true, + ); + if (claimIdRaw == null) return; + return IdInternal.fromBuffer(claimIdRaw); + } + // LINKING AND UNLINKING VERTICES /** diff --git a/tests/discovery/Discovery.test.ts b/tests/discovery/Discovery.test.ts index b4c423630..3789fc0b6 100644 --- a/tests/discovery/Discovery.test.ts +++ b/tests/discovery/Discovery.test.ts @@ -569,59 +569,11 @@ describe('Discovery', () => { do { existingTasks = await discovery.waitForDiscoveryTasks(); } while (existingTasks > 0); - // Only the queued vertex should be processed + // All vertices should be reprocessed expect(processVertexMock).toHaveBeenCalledTimes(3); await taskManager.stopProcessing(); await discovery.stop(); await discovery.destroy(); }); - - test('asd', async () => { - const discovery = await Discovery.createDiscovery({ - db, - keyRing, - gestaltGraph, - identitiesManager, - nodeManager, - taskManager, - logger, - }); - await taskManager.startProcessing(); - - // Attempt initial discovery - await discovery.queueDiscoveryByNode(nodeA.keyRing.getNodeId()); - let existingTasks: number = 0; - do { - existingTasks = await discovery.waitForDiscoveryTasks(); - } while (existingTasks > 0); - - // None should be re-discovered since they were processed after the threshold - await discovery.checkRediscovery(0); - // There should be 0 discovery tasks - - let tasks: number = 0; - for await (const _ of taskManager.getTasks('asc', false, [ - Discovery.name, - discovery.discoverVertexHandlerId, - ])) { - tasks++; - } - expect(tasks).toBe(0); - - // All should be re-discovered - await discovery.checkRediscovery(Date.now()); - tasks = 0; - for await (const _ of taskManager.getTasks('asc', false, [ - Discovery.name, - discovery.discoverVertexHandlerId, - ])) { - tasks++; - } - expect(tasks).toBeGreaterThan(0); - - await taskManager.stopProcessing(); - await discovery.stop(); - await discovery.destroy(); - }); }); diff --git a/tests/gestalts/GestaltGraph.test.ts b/tests/gestalts/GestaltGraph.test.ts index 94d21de9e..c2a24e22a 100644 --- a/tests/gestalts/GestaltGraph.test.ts +++ b/tests/gestalts/GestaltGraph.test.ts @@ -32,7 +32,9 @@ import * as keysUtils from '@/keys/utils'; import Token from '@/tokens/Token'; import { encodeGestaltNodeId, encodeGestaltIdentityId } from '@/gestalts/utils'; import * as nodesUtils from '@/nodes/utils'; +import * as claimsUtils from '@/claims/utils'; import * as testsGestaltsUtils from './utils'; +import * as testsUtils from '../utils'; import * as testsIdentitiesUtils from '../identities/utils'; import * as testsKeysUtils from '../keys/utils'; import * as ids from '../../src/ids'; @@ -1395,4 +1397,34 @@ describe('GestaltGraph', () => { { numRuns: 20 }, ); }); + test('Should only set the newest ClaimId', async () => { + const gestaltGraph = await GestaltGraph.createGestaltGraph({ + db, + acl, + logger, + fresh: true, + }); + + // Creating 3 claimIDs. + const nodeId = testsUtils.generateRandomNodeId(); + const claimIdGenerator = claimsUtils.createClaimIdGenerator(nodeId); + const claimId1 = claimIdGenerator(); + const claimId2 = claimIdGenerator(); + const claimId3 = claimIdGenerator(); + + await gestaltGraph.setClaimIdNewest(nodeId, claimId1); + expect( + (await gestaltGraph.getClaimIdNewest(nodeId))?.toMultibase('base32hex'), + ).toEqual(claimId1.toMultibase('base32hex')); + await gestaltGraph.setClaimIdNewest(nodeId, claimId3); + expect( + (await gestaltGraph.getClaimIdNewest(nodeId))?.toMultibase('base32hex'), + ).toEqual(claimId3.toMultibase('base32hex')); + await gestaltGraph.setClaimIdNewest(nodeId, claimId2); + expect( + (await gestaltGraph.getClaimIdNewest(nodeId))?.toMultibase('base32hex'), + ).toEqual(claimId3.toMultibase('base32hex')); + + await gestaltGraph.stop(); + }); }); From eb1a9aa1d13d16ebccaddff403ea9566ad7058cf Mon Sep 17 00:00:00 2001 From: Brian Botha Date: Mon, 15 Apr 2024 15:37:12 +1000 Subject: [PATCH 3/4] tests: converted `GestaltGraph.test.ts` tests to use `test.prop` [ci skip] --- tests/gestalts/GestaltGraph.test.ts | 319 +++++++++++++--------------- 1 file changed, 148 insertions(+), 171 deletions(-) diff --git a/tests/gestalts/GestaltGraph.test.ts b/tests/gestalts/GestaltGraph.test.ts index c2a24e22a..a60a46977 100644 --- a/tests/gestalts/GestaltGraph.test.ts +++ b/tests/gestalts/GestaltGraph.test.ts @@ -21,7 +21,7 @@ import path from 'path'; import fs from 'fs'; import Logger, { LogLevel, StreamHandler } from '@matrixai/logger'; import { DB } from '@matrixai/db'; -import { fc, testProp } from '@fast-check/jest'; +import { fc, test } from '@fast-check/jest'; import { AsyncIterableX as AsyncIterable } from 'ix/asynciterable'; import GestaltGraph from '@/gestalts/GestaltGraph'; import ACL from '@/acl/ACL'; @@ -267,9 +267,8 @@ describe('GestaltGraph', () => { gestaltsErrors.ErrorGestaltsGraphNotRunning, ); }); - testProp( + test.prop([gestaltNodeInfoComposedArb])( 'getNode, setNode and unsetNode', - [gestaltNodeInfoComposedArb], async (gestaltNodeInfo) => { const gestaltGraph = await GestaltGraph.createGestaltGraph({ db, @@ -291,9 +290,8 @@ describe('GestaltGraph', () => { await gestaltGraph.stop(); }, ); - testProp( + test.prop([gestaltNodeInfoComposedArb])( 'setNode updates node information', - [gestaltNodeInfoComposedArb], async (gestaltNodeInfo) => { const gestaltGraph = await GestaltGraph.createGestaltGraph({ db, @@ -319,9 +317,8 @@ describe('GestaltGraph', () => { await gestaltGraph.stop(); }, ); - testProp( + test.prop([linkNodeComposedArb])( 'linkNodeAndNode and unlinkNodeAndNode', - [linkNodeComposedArb], async ({ gestaltNodeInfo1, gestaltNodeInfo2, linkNode }) => { const gestaltGraph = await GestaltGraph.createGestaltGraph({ db, @@ -374,9 +371,8 @@ describe('GestaltGraph', () => { await gestaltGraph.stop(); }, ); - testProp( + test.prop([gestaltIdentityInfoComposedArb])( 'get, set and unset identity', - [gestaltIdentityInfoComposedArb], async (gestaltIdentityInfo) => { const gestaltGraph = await GestaltGraph.createGestaltGraph({ db, @@ -405,9 +401,8 @@ describe('GestaltGraph', () => { } }, ); - testProp( + test.prop([gestaltIdentityInfoComposedArb])( 'setIdentity updates identity info', - [gestaltIdentityInfoComposedArb], async (gestaltIdentityInfo) => { const gestaltGraph = await GestaltGraph.createGestaltGraph({ db, @@ -446,9 +441,8 @@ describe('GestaltGraph', () => { } }, ); - testProp( + test.prop([linkIdentityComposedArb])( 'linkNodeAndIdentity and unlinkNodeAndIdentity', - [linkIdentityComposedArb], async ({ gestaltNodeInfo, gestaltIdentityInfo, linkIdentity }) => { const gestaltGraph = await GestaltGraph.createGestaltGraph({ db, @@ -502,9 +496,8 @@ describe('GestaltGraph', () => { } }, ); - testProp( + test.prop([gestaltInfoComposedArb])( 'getVertex, setVertex and unsetVertex', - [gestaltInfoComposedArb], async (gestaltInfo) => { const gestaltGraph = await GestaltGraph.createGestaltGraph({ db, @@ -525,9 +518,8 @@ describe('GestaltGraph', () => { await gestaltGraph.stop(); }, ); - testProp( + test.prop([gestaltInfoComposedArb])( 'setVertex updates vertex information', - [gestaltInfoComposedArb], async (gestaltInfo) => { const gestaltGraph = await GestaltGraph.createGestaltGraph({ db, @@ -555,9 +547,8 @@ describe('GestaltGraph', () => { await gestaltGraph.stop(); }, ); - testProp( + test.prop([linkVertexComposedArb])( 'linkVertexAndVertex and unlinkVertexAndVertex', - [linkVertexComposedArb], async ({ gestaltVertexInfo1, gestaltVertexInfo2, gestaltLink }) => { const gestaltGraph = await GestaltGraph.createGestaltGraph({ db, @@ -632,9 +623,8 @@ describe('GestaltGraph', () => { await gestaltGraph.stop(); }, ); - testProp( + test.prop([gestaltNodeInfoComposedArb])( 'getGestaltByNode', - [gestaltNodeInfoComposedArb], async (gestaltNodeInfo) => { const gestaltGraph = await GestaltGraph.createGestaltGraph({ db, @@ -664,9 +654,8 @@ describe('GestaltGraph', () => { }); }, ); - testProp( + test.prop([gestaltIdentityInfoComposedArb])( 'getGestaltByIdentity', - [gestaltIdentityInfoComposedArb], async (gestaltIdentityInfo) => { const gestaltGraph = await GestaltGraph.createGestaltGraph({ db, @@ -699,7 +688,7 @@ describe('GestaltGraph', () => { }); }, ); - testProp('getGestalt', [gestaltInfoComposedArb], async (gestaltInfo) => { + test.prop([gestaltInfoComposedArb])('getGestalt', async (gestaltInfo) => { const gestaltGraph = await GestaltGraph.createGestaltGraph({ db, acl, @@ -752,50 +741,48 @@ describe('GestaltGraph', () => { fail('invalid type'); } }); - testProp( - 'getGestalts with nodes', - [fc.array(gestaltNodeInfoComposedArb, { minLength: 2, maxLength: 10 })], - async (gestaltNodeInfos) => { - const ids = new Set(); - for (const gestaltNodeInfo of gestaltNodeInfos) { - ids.add(nodesUtils.encodeNodeId(gestaltNodeInfo.nodeId)); - } - fc.pre(ids.size === gestaltNodeInfos.length); - const gestaltGraph = await GestaltGraph.createGestaltGraph({ - db, - acl, - logger, - fresh: true, + test.prop([ + fc.array(gestaltNodeInfoComposedArb, { minLength: 2, maxLength: 10 }), + ])('getGestalts with nodes', async (gestaltNodeInfos) => { + const ids = new Set(); + for (const gestaltNodeInfo of gestaltNodeInfos) { + ids.add(nodesUtils.encodeNodeId(gestaltNodeInfo.nodeId)); + } + fc.pre(ids.size === gestaltNodeInfos.length); + const gestaltGraph = await GestaltGraph.createGestaltGraph({ + db, + acl, + logger, + fresh: true, + }); + for (const gestaltNodeInfo of gestaltNodeInfos) { + await gestaltGraph.setNode(gestaltNodeInfo); + } + const gestalts = await AsyncIterable.as( + gestaltGraph.getGestalts(), + ).toArray(); + expect(gestalts).toHaveLength(gestaltNodeInfos.length); + for (const gestalt of gestalts) { + const gestaltId = Object.keys(gestalt.nodes)[0]; + const [, nodeId] = gestaltsUtils.decodeGestaltNodeId(gestaltId)!; + expect(gestalt).toMatchObject({ + matrix: { + [gestaltId]: {}, + }, + nodes: { + [gestaltId]: { nodeId }, + }, + identities: {}, }); - for (const gestaltNodeInfo of gestaltNodeInfos) { - await gestaltGraph.setNode(gestaltNodeInfo); - } - const gestalts = await AsyncIterable.as( - gestaltGraph.getGestalts(), - ).toArray(); - expect(gestalts).toHaveLength(gestaltNodeInfos.length); - for (const gestalt of gestalts) { - const gestaltId = Object.keys(gestalt.nodes)[0]; - const [, nodeId] = gestaltsUtils.decodeGestaltNodeId(gestaltId)!; - expect(gestalt).toMatchObject({ - matrix: { - [gestaltId]: {}, - }, - nodes: { - [gestaltId]: { nodeId }, - }, - identities: {}, - }); - } - }, - ); - testProp( + } + }); + test.prop([ + fc + .array(gestaltIdentityInfoComposedArb, { minLength: 2, maxLength: 10 }) + .noShrink(), + ])( 'getGestalts with identities', - [ - fc - .array(gestaltIdentityInfoComposedArb, { minLength: 2, maxLength: 10 }) - .noShrink(), - ], + async (gestaltIdentityInfos) => { const ids = new Set(); for (const gestaltIdentityInfo of gestaltIdentityInfos) { @@ -836,80 +823,77 @@ describe('GestaltGraph', () => { } }, ); - testProp( - 'getGestalts with nodes and identities', - [fc.array(gestaltInfoComposedArb, { minLength: 2, maxLength: 10 })], - async (gestaltInfos) => { - const ids = new Set(); - for (const gestaltInfo of gestaltInfos) { - const [type, data] = gestaltInfo; - switch (type) { - case 'identity': - ids.add(data.providerId + data.identityId); - break; - case 'node': - ids.add(nodesUtils.encodeNodeId(data.nodeId)); - break; - default: - break; - } - } - fc.pre(ids.size === gestaltInfos.length); - const gestaltGraph = await GestaltGraph.createGestaltGraph({ - db, - acl, - logger, - fresh: true, - }); - for (const gestaltinfo of gestaltInfos) { - await gestaltGraph.setVertex(gestaltinfo); + test.prop([ + fc.array(gestaltInfoComposedArb, { minLength: 2, maxLength: 10 }), + ])('getGestalts with nodes and identities', async (gestaltInfos) => { + const ids = new Set(); + for (const gestaltInfo of gestaltInfos) { + const [type, data] = gestaltInfo; + switch (type) { + case 'identity': + ids.add(data.providerId + data.identityId); + break; + case 'node': + ids.add(nodesUtils.encodeNodeId(data.nodeId)); + break; + default: + break; } - const gestalts = await AsyncIterable.as( - gestaltGraph.getGestalts(), - ).toArray(); - expect(gestalts).toHaveLength(gestaltInfos.length); - for (const gestalt of gestalts) { - const gestaltId = Object.keys(gestalt.matrix)[0]; - const [type, id] = gestaltsUtils.decodeGestaltId(gestaltId)!; - switch (type) { - case 'node': - { - expect(gestalt).toMatchObject({ - matrix: { - [gestaltId]: {}, - }, - nodes: { - [gestaltId]: { nodeId: id }, - }, - identities: {}, - }); - } - break; - case 'identity': - { - expect(gestalt).toMatchObject({ - matrix: { - [gestaltId]: {}, - }, - nodes: {}, - identities: { - [gestaltId]: { - providerId: id[0], - identityId: id[1], - }, + } + fc.pre(ids.size === gestaltInfos.length); + const gestaltGraph = await GestaltGraph.createGestaltGraph({ + db, + acl, + logger, + fresh: true, + }); + for (const gestaltinfo of gestaltInfos) { + await gestaltGraph.setVertex(gestaltinfo); + } + const gestalts = await AsyncIterable.as( + gestaltGraph.getGestalts(), + ).toArray(); + expect(gestalts).toHaveLength(gestaltInfos.length); + for (const gestalt of gestalts) { + const gestaltId = Object.keys(gestalt.matrix)[0]; + const [type, id] = gestaltsUtils.decodeGestaltId(gestaltId)!; + switch (type) { + case 'node': + { + expect(gestalt).toMatchObject({ + matrix: { + [gestaltId]: {}, + }, + nodes: { + [gestaltId]: { nodeId: id }, + }, + identities: {}, + }); + } + break; + case 'identity': + { + expect(gestalt).toMatchObject({ + matrix: { + [gestaltId]: {}, + }, + nodes: {}, + identities: { + [gestaltId]: { + providerId: id[0], + identityId: id[1], }, - }); - } - break; - default: - fail('invalid type'); - } + }, + }); + } + break; + default: + fail('invalid type'); } - }, - ); - testProp( + } + }); + test.prop([linkNodeComposedArb])( 'getGestalt with node links', - [linkNodeComposedArb], async ({ gestaltNodeInfo1, gestaltNodeInfo2, linkNode }) => { const gestaltGraph = await GestaltGraph.createGestaltGraph({ db, @@ -969,9 +953,8 @@ describe('GestaltGraph', () => { await gestaltGraph.stop(); }, ); - testProp( + test.prop([linkIdentityComposedArb])( 'getGestalt with identity links', - [linkIdentityComposedArb], async ({ gestaltNodeInfo, gestaltIdentityInfo, linkIdentity }) => { const gestaltGraph = await GestaltGraph.createGestaltGraph({ db, @@ -1032,9 +1015,8 @@ describe('GestaltGraph', () => { await gestaltGraph.stop(); }, ); - testProp( + test.prop([linkVertexComposedArb])( 'getGestalt with node and identity links', - [linkVertexComposedArb], async ({ gestaltVertexInfo1, gestaltVertexInfo2, gestaltLink }) => { const gestaltGraph = await GestaltGraph.createGestaltGraph({ db, @@ -1364,38 +1346,33 @@ describe('GestaltGraph', () => { }) .noShrink(); - testProp( - 'model', - [altCommandsArb], - async (cmds) => { - await acl.start({ fresh: true }); - const gestaltGraph = await GestaltGraph.createGestaltGraph({ - db, - acl, - logger, - fresh: true, - }); - try { - const model: testsGestaltsUtils.GestaltGraphModel = { - matrix: {}, - nodes: {}, - identities: {}, - permissions: {}, - }; - const modelSetup = async () => { - return { - model, - real: gestaltGraph, - }; + test.prop([altCommandsArb], { numRuns: 20 })('model', async (cmds) => { + await acl.start({ fresh: true }); + const gestaltGraph = await GestaltGraph.createGestaltGraph({ + db, + acl, + logger, + fresh: true, + }); + try { + const model: testsGestaltsUtils.GestaltGraphModel = { + matrix: {}, + nodes: {}, + identities: {}, + permissions: {}, + }; + const modelSetup = async () => { + return { + model, + real: gestaltGraph, }; - await fc.asyncModelRun(modelSetup, cmds); - } finally { - await gestaltGraph.stop(); - await acl.stop(); - } - }, - { numRuns: 20 }, - ); + }; + await fc.asyncModelRun(modelSetup, cmds); + } finally { + await gestaltGraph.stop(); + await acl.stop(); + } + }); }); test('Should only set the newest ClaimId', async () => { const gestaltGraph = await GestaltGraph.createGestaltGraph({ From 57963a0c5a8f329eed7ba597ed01a59fdd1d6557 Mon Sep 17 00:00:00 2001 From: Brian Botha Date: Mon, 15 Apr 2024 15:55:17 +1000 Subject: [PATCH 4/4] fix: small review fix [ci skip] --- tests/discovery/Discovery.test.ts | 8 -------- 1 file changed, 8 deletions(-) diff --git a/tests/discovery/Discovery.test.ts b/tests/discovery/Discovery.test.ts index 3789fc0b6..ab40edb29 100644 --- a/tests/discovery/Discovery.test.ts +++ b/tests/discovery/Discovery.test.ts @@ -490,14 +490,6 @@ describe('Discovery', () => { do { existingTasks = await discovery.waitForDiscoveryTasks(); } while (existingTasks > 0); - - // For await (const task of taskManager.getTasks('asc', false, [ - // Discovery.name, - // discovery.discoverVertexHandlerId, - // ])) { - // console.log(task); - // } - await discovery.waitForDiscoveryTasks(true); await taskManager.stopProcessing();