From b268e3706085f1ccec4120f3fd24a1cd4b1abf47 Mon Sep 17 00:00:00 2001 From: Amy Yan Date: Wed, 15 Nov 2023 16:02:55 +1100 Subject: [PATCH 1/5] feat: `audit` domain --- src/PolykeyAgent.ts | 18 ++ src/audit/Audit.ts | 378 +++++++++++++++++++++++++++++ src/audit/errors.ts | 44 ++++ src/audit/events.ts | 25 ++ src/audit/index.ts | 5 + src/audit/types.ts | 122 ++++++++++ src/audit/utils.ts | 77 ++++++ src/errors.ts | 2 +- src/ids/index.ts | 30 +++ src/ids/types.ts | 7 + src/network/types.ts | 2 +- src/nodes/NodeConnectionManager.ts | 12 +- src/nodes/NodeManager.ts | 12 +- src/nodes/events.ts | 10 +- tests/audit/Audit.test.ts | 257 ++++++++++++++++++++ 15 files changed, 990 insertions(+), 11 deletions(-) create mode 100644 src/audit/Audit.ts create mode 100644 src/audit/errors.ts create mode 100644 src/audit/events.ts create mode 100644 src/audit/index.ts create mode 100644 src/audit/types.ts create mode 100644 src/audit/utils.ts create mode 100644 tests/audit/Audit.test.ts diff --git a/src/PolykeyAgent.ts b/src/PolykeyAgent.ts index 670698dca..f59c5af38 100644 --- a/src/PolykeyAgent.ts +++ b/src/PolykeyAgent.ts @@ -13,6 +13,7 @@ import { ready, } from '@matrixai/async-init/dist/CreateDestroyStartStop'; import { WorkerManager } from './workers'; +import Audit from './audit/Audit'; import KeyRing from './keys/KeyRing'; import CertManager from './keys/CertManager'; import Status from './status/Status'; @@ -220,6 +221,7 @@ class PolykeyAgent { let vaultManager: VaultManager | undefined; let sessionManager: SessionManager | undefined; let clientService: ClientService | undefined; + let audit: Audit | undefined; try { status = new Status({ statusPath, @@ -442,6 +444,12 @@ class PolykeyAgent { rpcParserBufferSize: optionsDefaulted.client.rpcParserBufferSize, logger: logger.getChild(ClientService.name), }); + audit = await Audit.createAudit({ + db, + nodeConnectionManager, + fresh, + logger: logger.getChild(Audit.name), + }); } catch (e) { logger.warn(`Failed Creating ${this.name}`); await sessionManager?.stop(); @@ -455,6 +463,7 @@ class PolykeyAgent { await sigchain?.stop(); await certManager?.stop(); await taskManager?.stop(); + await audit?.stop(); await db?.stop(); await keyRing?.stop(); await schema?.stop(); @@ -463,6 +472,7 @@ class PolykeyAgent { } const pkAgent = new this({ nodePath: optionsDefaulted.nodePath, + audit, status, schema, keyRing, @@ -502,6 +512,7 @@ class PolykeyAgent { } public readonly nodePath: string; + public readonly audit: Audit; public readonly status: Status; public readonly schema: Schema; public readonly keyRing: KeyRing; @@ -548,6 +559,7 @@ class PolykeyAgent { constructor({ nodePath, + audit, status, schema, keyRing, @@ -571,6 +583,7 @@ class PolykeyAgent { logger, }: { nodePath: string; + audit: Audit; status: Status; schema: Schema; keyRing: KeyRing; @@ -595,6 +608,7 @@ class PolykeyAgent { }) { this.logger = logger; this.nodePath = nodePath; + this.audit = audit; this.status = status; this.schema = schema; this.keyRing = keyRing; @@ -804,6 +818,7 @@ class PolykeyAgent { agentHost: this.nodeConnectionManager.host, agentPort: this.nodeConnectionManager.port, }); + await this.audit.start({ fresh }); this._startTime = utils.getUnixtime(); this.logger.info(`Started ${this.constructor.name}`); } catch (e) { @@ -832,6 +847,7 @@ class PolykeyAgent { await this.sigchain?.stop(); await this.certManager?.stop(); await this.taskManager?.stop(); + await this.audit?.stop(); await this.db?.stop(); await this.keyRing?.stop(); await this.schema?.stop(); @@ -870,6 +886,7 @@ class PolykeyAgent { await this.sigchain.stop(); await this.certManager.stop(); await this.taskManager.stop(); + await this.audit.stop(); await this.db.stop(); await this.keyRing.stop(); await this.schema.stop(); @@ -918,6 +935,7 @@ class PolykeyAgent { await this.certManager.destroy(); await this.taskManager.stop(); await this.taskManager.destroy(); + await this.audit.destroy(); // Non-TaskManager dependencies await this.db.stop(); // Non-DB dependencies diff --git a/src/audit/Audit.ts b/src/audit/Audit.ts new file mode 100644 index 000000000..b16facf0c --- /dev/null +++ b/src/audit/Audit.ts @@ -0,0 +1,378 @@ +import type { DB, DBTransaction, LevelPath } from '@matrixai/db'; +import type { + TopicSubPath, + TopicPath, + TopicSubPathToAuditEvent, + AuditEvent, + MetricPath, + MetricPathToAuditMetric, + AuditMetricNodeConnection, +} from './types'; +import type { AuditEventId } from '../ids/types'; +import type NodeConnectionManager from '../nodes/NodeConnectionManager'; +import type { AbstractEvent } from '@matrixai/events'; +import Logger from '@matrixai/logger'; +import { IdInternal } from '@matrixai/id'; +import { + CreateDestroyStartStop, + ready, +} from '@matrixai/async-init/dist/CreateDestroyStartStop'; +import * as sortableIdUtils from '@matrixai/id/dist/IdSortable'; +import * as auditErrors from './errors'; +import * as auditEvents from './events'; +import * as auditUtils from './utils'; +import * as nodesEvents from '../nodes/events'; +import * as ids from '../ids'; +import * as utils from '../utils'; + +interface Audit extends CreateDestroyStartStop {} +@CreateDestroyStartStop( + new auditErrors.ErrorAuditRunning(), + new auditErrors.ErrorAuditDestroyed(), + { + eventStart: auditEvents.EventAuditStart, + eventStarted: auditEvents.EventAuditStarted, + eventStop: auditEvents.EventAuditStop, + eventStopped: auditEvents.EventAuditStopped, + eventDestroy: auditEvents.EventAuditDestroy, + eventDestroyed: auditEvents.EventAuditDestroyed, + }, +) +class Audit { + static async createAudit({ + db, + nodeConnectionManager, + logger = new Logger(this.name), + fresh = false, + }: { + db: DB; + nodeConnectionManager: NodeConnectionManager; + logger?: Logger; + fresh?: boolean; + }): Promise { + logger.info(`Creating ${this.name}`); + const audit = new this({ db, nodeConnectionManager, logger }); + await audit.start({ fresh }); + logger.info(`Created ${this.name}`); + return audit; + } + + protected logger: Logger; + protected db: DB; + protected nodeConnectionManager: NodeConnectionManager; + + protected eventHandlerMap: Map< + typeof AbstractEvent, + { + target: EventTarget; + handler: (evt: AbstractEvent) => Promise; + } + > = new Map(); + protected auditDbPath: LevelPath = [this.constructor.name]; + protected dbLastAuditEventIdPath: LevelPath = [ + this.constructor.name, + 'lastAuditEventId', + ]; + protected auditEventDbPath: LevelPath = [this.constructor.name, 'event']; + protected auditTopicDbPath: LevelPath = [this.constructor.name, 'topic']; + protected generateAuditEventId: () => AuditEventId; + + constructor({ + db, + nodeConnectionManager, + logger, + }: { + db: DB; + nodeConnectionManager: NodeConnectionManager; + logger: Logger; + }) { + this.logger = logger; + this.nodeConnectionManager = nodeConnectionManager; + this.db = db; + } + + public async start({ + fresh = false, + }: { + fresh?: boolean; + } = {}): Promise { + this.logger.info(`Starting ${this.constructor.name}`); + if (fresh) { + await this.db.clear(this.auditDbPath); + } + const lastAuditEventId = await this.getLastAuditEventId(); + this.generateAuditEventId = + auditUtils.createAuditEventIdGenerator(lastAuditEventId); + // Setup event handlers + this.setEventHandler( + this.nodeConnectionManager, + nodesEvents.EventNodeConnectionManagerConnectionForward, + auditUtils.nodeConnectionForwardTopicPath, + auditUtils.fromEventNodeConnectionManagerConnectionForward, + ); + this.setEventHandler( + this.nodeConnectionManager, + nodesEvents.EventNodeConnectionManagerConnectionReverse, + auditUtils.nodeConnectionReverseTopicPath, + auditUtils.fromEventNodeConnectionManagerConnectionReverse, + ); + this.logger.info(`Started ${this.constructor.name}`); + } + + public async stop(): Promise { + this.logger.info(`Stopping ${this.constructor.name}`); + for (const [eventConstructor, { target, handler }] of this + .eventHandlerMap) { + target.removeEventListener(eventConstructor.name, handler); + } + this.logger.info(`Stopped ${this.constructor.name}`); + } + + public async destroy() { + this.logger.info(`Destroying ${this.constructor.name}`); + await this.db.clear(this.auditDbPath); + this.logger.info(`Destroyed ${this.constructor.name}`); + } + + @ready(new auditErrors.ErrorAuditNotRunning(), false, ['starting']) + public async getLastAuditEventId( + tran?: DBTransaction, + ): Promise { + const lastAuditEventIdBuffer = await (tran ?? this.db).get( + this.dbLastAuditEventIdPath, + true, + ); + if (lastAuditEventIdBuffer == null) return; + return IdInternal.fromBuffer(lastAuditEventIdBuffer); + } + + @ready(new auditErrors.ErrorAuditNotRunning(), false, ['starting']) + protected setEventHandler< + T extends typeof AbstractEvent, + P extends TopicPath, + >( + target: EventTarget, + event: T, + topicPath: P, + toAuditEvent: ( + evt: InstanceType, + ) => + | Promise['data']> + | TopicSubPathToAuditEvent

['data'], + ) { + const handler = async (evt: InstanceType) => { + const eventData = await toAuditEvent(evt); + await this.db.withTransactionF(async (tran) => { + const event: AuditEvent = { + data: eventData, + }; + await this.addAuditEvent(topicPath, event as any, tran); + }); + }; + target.addEventListener(event.name, handler); + this.eventHandlerMap.set(event, { target, handler: handler as any }); + } + + @ready(new auditErrors.ErrorAuditNotRunning()) + protected async setAuditEvent( + topicPath: TopicPath, + auditEventId: AuditEventId, + auditEvent: TopicSubPathToAuditEvent, + tran?: DBTransaction, + ) { + if (tran == null) { + return await this.db.withTransactionF((tran) => + this.setAuditEvent(topicPath, auditEventId, auditEvent, tran), + ); + } + const auditEventIdBuffer = auditEventId.toBuffer(); + await tran.put([...this.auditEventDbPath, auditEventIdBuffer], auditEvent); + const subTopicArray: Array = []; + for (const topic of topicPath) { + subTopicArray.push(topic); + await tran.put( + [...this.auditTopicDbPath, subTopicArray.join('.'), auditEventIdBuffer], + true, + ); + } + } + + @ready(new auditErrors.ErrorAuditNotRunning()) + protected async addAuditEvent( + topicPath: TopicPath, + auditEvent: TopicSubPathToAuditEvent, + tran?: DBTransaction, + ) { + if (tran == null) { + return await this.db.withTransactionF((tran) => + this.addAuditEvent(topicPath, auditEvent, tran), + ); + } + const auditEventId = this.generateAuditEventId(); + await this.setAuditEvent(topicPath, auditEventId, auditEvent, tran); + } + + @ready(new auditErrors.ErrorAuditNotRunning()) + public async *getAuditEvents( + topicPath: T, + { + seek, + order, + limit, + }: { + seek?: AuditEventId; + order?: 'asc' | 'desc'; + limit?: number; + } = {}, + tran?: DBTransaction, + ): AsyncGenerator< + TopicSubPathToAuditEvent, + void, + TopicSubPathToAuditEvent + > { + if (tran == null) { + const getEvents = (tran) => + this.getAuditEvents( + topicPath, + { + seek, + order, + limit, + }, + tran, + ); + return yield* this.db.withTransactionG(async function* (tran) { + return yield* getEvents(tran); + }); + } + if (topicPath.length === 0) { + for await (const [, auditEvent] of tran.iterator< + TopicSubPathToAuditEvent + >(this.auditEventDbPath, { + keys: false, + values: true, + valueAsBuffer: false, + reverse: order !== 'asc', + limit, + gte: seek?.toBuffer(), + })) { + yield auditEvent; + } + return; + } + const iterator = tran.iterator( + [...this.auditTopicDbPath, topicPath.join('.')], + { + keys: true, + values: false, + valueAsBuffer: false, + reverse: order !== 'asc', + limit, + }, + ); + if (seek != null) { + iterator.seek(seek.toBuffer()); + } + for await (const [keyPath] of iterator) { + const event = await tran.get>([ + ...this.auditEventDbPath, + keyPath.at(-1)!, + ]); + if (event != null) { + yield event; + } + } + } + + @ready(new auditErrors.ErrorAuditNotRunning()) + public async getAuditMetric( + metricPath: T, + options: { from?: Date | number; to?: Date | number } = {}, + tran?: DBTransaction, + ): Promise> { + if (tran == null) { + return await this.db.withTransactionF((tran) => + this.getAuditMetric(metricPath, options, tran), + ); + } + + let fromEpoch = + options.from instanceof Date ? options.from.getTime() : options.from; + let toEpoch = + options.to instanceof Date ? options.to.getTime() : options.to; + + let fromIdBuffer: Buffer | undefined; + if (fromEpoch != null) { + fromIdBuffer = ids + .generateAuditEventIdFromEpoch(fromEpoch, () => new Uint8Array()) + .toBuffer(); + } + let toIdBuffer: Buffer | undefined; + if (toEpoch != null) { + toIdBuffer = ids + .generateAuditEventIdFromEpoch(toEpoch, (size) => + new Uint8Array(size).fill(0xff), + ) + .toBuffer(); + } + + if (metricPath[0] === 'node') { + if (metricPath[1] === 'connection') { + let path: readonly string[] = auditUtils.nodeConnectionMetricPath; + if (metricPath[2] === 'inbound') { + path = auditUtils.nodeConnectionReverseTopicPath; + } else if (metricPath[2] === 'outbound') { + path = auditUtils.nodeConnectionForwardTopicPath; + } + const metric: AuditMetricNodeConnection = { + data: { + total: 0, + averagePerMinute: 0, + averagePerHour: 0, + averagePerDay: 0, + }, + }; + let lastKey: Buffer | undefined; + for await (const [keyPath] of tran.iterator( + [...this.auditTopicDbPath, path.join('.')], + { + keyAsBuffer: true, + keys: true, + values: false, + gte: fromIdBuffer, + lte: toIdBuffer, + }, + )) { + const key = keyPath.at(-1)! as Buffer; + if (metric.data.total === 0) { + fromEpoch = sortableIdUtils.extractTs(key) * 1000; + } else { + lastKey = key; + } + metric.data.total += 1; + } + if (fromEpoch != null) { + if (lastKey != null) { + toEpoch = sortableIdUtils.extractTs(lastKey) * 1000; + } else { + toEpoch = Date.now(); + } + const timeframeTime = toEpoch - fromEpoch; + const timeframeMinutes = timeframeTime / 60_000; + const timeframeHours = timeframeMinutes / 60; + const timeframeDays = timeframeHours / 24; + metric.data.averagePerMinute = + metric.data.total / Math.ceil(timeframeMinutes); + metric.data.averagePerHour = + metric.data.total / Math.ceil(timeframeHours); + metric.data.averagePerDay = + metric.data.total / Math.ceil(timeframeDays); + } + return metric as any; + } + } + utils.never(); + } +} + +export default Audit; diff --git a/src/audit/errors.ts b/src/audit/errors.ts new file mode 100644 index 000000000..1900ef3ac --- /dev/null +++ b/src/audit/errors.ts @@ -0,0 +1,44 @@ +import ErrorPolykey from '../ErrorPolykey'; +import sysexits from '../utils/sysexits'; + +class ErrorAudit extends ErrorPolykey {} + +class ErrorAuditRunning extends ErrorAudit { + static description = 'Audit is running'; + exitCode = sysexits.USAGE; +} + +class ErrorAuditNotRunning extends ErrorAudit { + static description = 'Audit is not running'; + exitCode = sysexits.USAGE; +} + +class ErrorAuditDestroyed extends ErrorAudit { + static description = 'Audit is destroyed'; + exitCode = sysexits.USAGE; +} + +class ErrorAuditNodeIdMissing extends ErrorAudit { + static description = 'Could not find NodeId'; + exitCode = sysexits.NOUSER; +} + +class ErrorAuditVaultIdMissing extends ErrorAudit { + static description = 'Could not find VaultId'; + exitCode = sysexits.DATAERR; +} + +class ErrorAuditNodeIdExists extends ErrorAudit { + static description = 'NodeId already exists'; + exitCode = sysexits.DATAERR; +} + +export { + ErrorAudit, + ErrorAuditRunning, + ErrorAuditNotRunning, + ErrorAuditDestroyed, + ErrorAuditNodeIdMissing, + ErrorAuditVaultIdMissing, + ErrorAuditNodeIdExists, +}; diff --git a/src/audit/events.ts b/src/audit/events.ts new file mode 100644 index 000000000..227849e3c --- /dev/null +++ b/src/audit/events.ts @@ -0,0 +1,25 @@ +import EventPolykey from '../EventPolykey'; + +abstract class EventAudit extends EventPolykey {} + +class EventAuditStart extends EventAudit {} + +class EventAuditStarted extends EventAudit {} + +class EventAuditStop extends EventAudit {} + +class EventAuditStopped extends EventAudit {} + +class EventAuditDestroy extends EventAudit {} + +class EventAuditDestroyed extends EventAudit {} + +export { + EventAudit, + EventAuditStart, + EventAuditStarted, + EventAuditStop, + EventAuditStopped, + EventAuditDestroy, + EventAuditDestroyed, +}; diff --git a/src/audit/index.ts b/src/audit/index.ts new file mode 100644 index 000000000..5f4fded00 --- /dev/null +++ b/src/audit/index.ts @@ -0,0 +1,5 @@ +export { default as Audit } from './Audit'; +export * as types from './types'; +export * as utils from './utils'; +export * as errors from './errors'; +export * as events from './events'; diff --git a/src/audit/types.ts b/src/audit/types.ts new file mode 100644 index 000000000..026081ec5 --- /dev/null +++ b/src/audit/types.ts @@ -0,0 +1,122 @@ +import type { POJO } from '../types'; +import type { + nodeConnectionInboundMetricPath, + nodeConnectionReverseTopicPath, + nodeConnectionOutboundMetricPath, + nodeConnectionForwardTopicPath, + nodeConnectionMetricPath, +} from './utils'; + +// Events + +type IsSubpath< + T extends readonly any[], + U extends readonly any[], +> = U extends readonly [...T, ...infer _] ? true : false; + +type InferTypeFromSubpath< + T extends readonly any[], + U extends readonly any[], + E, +> = IsSubpath extends true ? E : never; + +/** + * Represents a capture of an event. + */ +type AuditEvent = { + data: T; +}; + +type TopicPath = + // Nodes + typeof nodeConnectionReverseTopicPath | typeof nodeConnectionForwardTopicPath; + +type TopicSubPath = + | (T extends readonly [...infer Head, infer Tail] + ? [...Head, Tail] | TopicSubPath + : []) + | TopicPath; + +type TopicSubPathToAuditEvent = + // Nodes + | InferTypeFromSubpath< + T, + typeof nodeConnectionReverseTopicPath, + AuditEventNodeConnectionReverse + > + | InferTypeFromSubpath< + T, + typeof nodeConnectionForwardTopicPath, + AuditEventNodeConnectionForward + >; + +// Nodes + +type AuditEventNodeConnectionBase = AuditEvent<{ + remoteNodeId: string; + remoteHost: string; + remotePort: number; +}>; + +type AuditEventNodeConnection = + | AuditEventNodeConnectionReverse + | AuditEventNodeConnectionForward; + +type AuditEventNodeConnectionReverse = AuditEventNodeConnectionBase & + AuditEvent<{ + type: 'reverse'; + }>; + +type AuditEventNodeConnectionForward = AuditEventNodeConnectionBase & + AuditEvent<{ + type: 'forward'; + }>; + +// Metrics + +type MetricPath = + | typeof nodeConnectionMetricPath + | typeof nodeConnectionInboundMetricPath + | typeof nodeConnectionOutboundMetricPath; + +type MetricPathToAuditMetric = + // Nodes + T extends + | typeof nodeConnectionMetricPath + | typeof nodeConnectionInboundMetricPath + | typeof nodeConnectionOutboundMetricPath + ? AuditMetricNodeConnection + : never; + +/** + * Represents a capture of an event. + */ +type AuditMetric = { + data: T; +}; + +type AuditMetricNodeConnection = AuditMetric<{ + total: number; + averagePerMinute: number; + averagePerHour: number; + averagePerDay: number; +}>; + +export type { + // Event + IsSubpath, + InferTypeFromSubpath, + AuditEvent, + TopicPath, + TopicSubPathToAuditEvent, + TopicSubPath, + AuditEventNodeConnectionBase, + AuditEventNodeConnection, + AuditEventNodeConnectionReverse, + AuditEventNodeConnectionForward, + // Metric + MetricPath, + MetricPathToAuditMetric, + AuditMetric, + AuditMetricNodeConnection, +}; diff --git a/src/audit/utils.ts b/src/audit/utils.ts new file mode 100644 index 000000000..e6a813f70 --- /dev/null +++ b/src/audit/utils.ts @@ -0,0 +1,77 @@ +import type { + AuditEventNodeConnectionForward, + AuditEventNodeConnectionReverse, +} from './types'; +import type * as nodesEvents from '../nodes/events'; +import * as nodesUtils from '../nodes/utils'; +import { createAuditEventIdGenerator } from '../ids'; + +// Events + +// Nodes + +const nodeConnectionReverseTopicPath = [ + 'node', + 'connection', + 'reverse', +] as const; + +function fromEventNodeConnectionManagerConnectionReverse( + evt: nodesEvents.EventNodeConnectionManagerConnectionReverse, +): AuditEventNodeConnectionReverse['data'] { + return { + remoteNodeId: nodesUtils.encodeNodeId(evt.detail.remoteNodeId), + remoteHost: evt.detail.remoteHost, + remotePort: evt.detail.remotePort, + type: 'reverse', + }; +} + +const nodeConnectionForwardTopicPath = [ + 'node', + 'connection', + 'forward', +] as const; + +function fromEventNodeConnectionManagerConnectionForward( + evt: nodesEvents.EventNodeConnectionManagerConnectionForward, +): AuditEventNodeConnectionForward['data'] { + return { + remoteNodeId: nodesUtils.encodeNodeId(evt.detail.remoteNodeId), + remoteHost: evt.detail.remoteHost, + remotePort: evt.detail.remotePort, + type: 'forward', + }; +} + +const nodeGraphTopicPath = ['node', 'graph'] as const; + +// Metrics + +// Nodes + +const nodeConnectionMetricPath = ['node', 'connection'] as const; + +const nodeConnectionInboundMetricPath = [ + 'node', + 'connection', + 'inbound', +] as const; + +const nodeConnectionOutboundMetricPath = [ + 'node', + 'connection', + 'outbound', +] as const; + +export { + createAuditEventIdGenerator, + nodeConnectionReverseTopicPath, + fromEventNodeConnectionManagerConnectionReverse, + nodeConnectionForwardTopicPath, + fromEventNodeConnectionManagerConnectionForward, + nodeGraphTopicPath, + nodeConnectionMetricPath, + nodeConnectionInboundMetricPath, + nodeConnectionOutboundMetricPath, +}; diff --git a/src/errors.ts b/src/errors.ts index 2501fc5d7..843a93d4a 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -72,7 +72,7 @@ export { * reference all Polykey errors. * This is used by RPC to serialize errors from agent to client. */ -export * from './acl/errors'; +export * from './audit/errors'; export * from './sessions/errors'; export * from './keys/errors'; export * from './vaults/errors'; diff --git a/src/ids/index.ts b/src/ids/index.ts index be3a96743..17bc005c7 100644 --- a/src/ids/index.ts +++ b/src/ids/index.ts @@ -20,6 +20,7 @@ import type { GestaltLinkId, NotificationId, NotificationIdEncoded, + AuditEventId, } from './types'; import { IdInternal, IdSortable, IdRandom } from '@matrixai/id'; import * as keysUtilsRandom from '../keys/utils/random'; @@ -32,6 +33,33 @@ function createPermIdGenerator(): () => PermissionId { return () => generator.get(); } +function createAuditEventIdGenerator( + lastAuditEventId?: AuditEventId, +): () => AuditEventId { + const generator = new IdSortable({ + lastId: lastAuditEventId, + randomSource: keysUtilsRandom.getRandomBytes, + }); + return () => generator.get(); +} + +/** + * Generates an auditId from an epoch timestamp. + * + * @param epoch + * @param randomSource + */ +function generateAuditEventIdFromEpoch( + epoch: number, + randomSource: (size: number) => Uint8Array = keysUtilsRandom.getRandomBytes, +): AuditEventId { + const generator = new IdSortable({ + timeSource: () => () => epoch, + randomSource, + }); + return generator.get(); +} + /** * Creates a NodeId generator. * This does not use `IdRandom` because it is not a UUID4. @@ -463,6 +491,8 @@ function decodeNotificationId( export { createPermIdGenerator, + createAuditEventIdGenerator, + generateAuditEventIdFromEpoch, createNodeIdGenerator, isNodeId, assertNodeId, diff --git a/src/ids/types.ts b/src/ids/types.ts index b110b361f..bc015bac5 100644 --- a/src/ids/types.ts +++ b/src/ids/types.ts @@ -6,6 +6,11 @@ import type { Opaque } from '../types'; type PermissionId = Opaque<'PermissionId', Id>; type PermissionIdString = Opaque<'PermissionIdString', string>; +// Audit + +type AuditEventId = Opaque<'AuditEventId', Id>; +type AuditEventIdString = Opaque<'AuditEventIdString', string>; + // Keys type CertId = Opaque<'CertId', Id>; @@ -100,6 +105,8 @@ type NotificationIdEncoded = Opaque<'NotificationIdEncoded', string>; export type { PermissionId, PermissionIdString, + AuditEventId, + AuditEventIdString, CertId, CertIdString, CertIdEncoded, diff --git a/src/network/types.ts b/src/network/types.ts index e58dd851f..32f2d568a 100644 --- a/src/network/types.ts +++ b/src/network/types.ts @@ -28,7 +28,7 @@ type TLSConfig = { }; /** - * Used for the connection event when receiving a reverse connection. + * Used for the connection event when creating a forward connection or receiving a reverse connection. */ type ConnectionData = { remoteNodeId: NodeId; diff --git a/src/nodes/NodeConnectionManager.ts b/src/nodes/NodeConnectionManager.ts index 16e60a89a..0e85f47b1 100644 --- a/src/nodes/NodeConnectionManager.ts +++ b/src/nodes/NodeConnectionManager.ts @@ -1080,6 +1080,16 @@ class NodeConnectionManager { const newConnAndTimer = this.addConnection(nodeId, connection); // We can assume connection was established and destination was valid, we can add the target to the nodeGraph connectionsResults.set(nodeIdString, newConnAndTimer); + const connectionData: ConnectionData = { + remoteNodeId: connection.nodeId, + remoteHost: connection.host, + remotePort: connection.port, + }; + this.dispatchEvent( + new nodesEvents.EventNodeConnectionManagerConnectionForward({ + detail: connectionData, + }), + ); this.logger.debug( `Created NodeConnection for ${nodesUtils.encodeNodeId( nodeId, @@ -1248,7 +1258,7 @@ class NodeConnectionManager { remotePort: nodeConnection.port, }; this.dispatchEvent( - new nodesEvents.EventNodeConnectionManagerConnection({ + new nodesEvents.EventNodeConnectionManagerConnectionReverse({ detail: connectionData, }), ); diff --git a/src/nodes/NodeManager.ts b/src/nodes/NodeManager.ts index 2ea436005..4d042d33f 100644 --- a/src/nodes/NodeManager.ts +++ b/src/nodes/NodeManager.ts @@ -207,8 +207,8 @@ class NodeManager { public readonly checkSeedConnectionsHandlerId: TaskHandlerId = `${this.basePath}.${this.checkSeedConnectionsHandler.name}.checkSeedConnectionsHandler` as TaskHandlerId; - protected handleNodeConnectionEvent = async ( - e: nodesEvents.EventNodeConnectionManagerConnection, + protected handleEventNodeConnectionManagerConnectionReverse = async ( + e: nodesEvents.EventNodeConnectionManagerConnectionReverse, ) => { await this.setNode( e.detail.remoteNodeId, @@ -296,8 +296,8 @@ class NodeManager { }); // Add handling for connections this.nodeConnectionManager.addEventListener( - nodesEvents.EventNodeConnectionManagerConnection.name, - this.handleNodeConnectionEvent, + nodesEvents.EventNodeConnectionManagerConnectionReverse.name, + this.handleEventNodeConnectionManagerConnectionReverse, ); this.logger.info(`Started ${this.constructor.name}`); } @@ -306,8 +306,8 @@ class NodeManager { this.logger.info(`Stopping ${this.constructor.name}`); // Remove handling for connections this.nodeConnectionManager.removeEventListener( - nodesEvents.EventNodeConnectionManagerConnection.name, - this.handleNodeConnectionEvent, + nodesEvents.EventNodeConnectionManagerConnectionReverse.name, + this.handleEventNodeConnectionManagerConnectionReverse, ); this.logger.info('Cancelling ephemeral tasks'); if (this.taskManager.isProcessing()) { diff --git a/src/nodes/events.ts b/src/nodes/events.ts index 1a412fe04..8269fd8a1 100644 --- a/src/nodes/events.ts +++ b/src/nodes/events.ts @@ -32,7 +32,11 @@ class EventNodeConnectionManagerClose extends EventNodeConnectionManager {} class EventNodeConnectionManagerConnection extends EventNodeConnectionManager {} -class EventNodeConnectionManagerConnectionFailure extends EventNodeConnectionManager< +class EventNodeConnectionManagerConnectionForward extends EventNodeConnectionManagerConnection {} + +class EventNodeConnectionManagerConnectionReverse extends EventNodeConnectionManagerConnection {} + +class EventNodeConnectionManagerConnectionReverseFailure extends EventNodeConnectionManager< Error | EventNodeConnectionError > {} @@ -76,7 +80,9 @@ export { EventNodeConnectionManagerError, EventNodeConnectionManagerClose, EventNodeConnectionManagerConnection, - EventNodeConnectionManagerConnectionFailure, + EventNodeConnectionManagerConnectionForward, + EventNodeConnectionManagerConnectionReverse, + EventNodeConnectionManagerConnectionReverseFailure, EventNodeGraph, EventNodeGraphStart, EventNodeGraphStarted, diff --git a/tests/audit/Audit.test.ts b/tests/audit/Audit.test.ts new file mode 100644 index 000000000..c5ec72ab5 --- /dev/null +++ b/tests/audit/Audit.test.ts @@ -0,0 +1,257 @@ +import type { Key } from '@/keys/types'; +import type { ConnectionData, Host, Port } from '@/network/types'; +import type NodeConnectionManager from '@/nodes/NodeConnectionManager'; +import os from 'os'; +import path from 'path'; +import fs from 'fs'; +import Logger, { LogLevel, StreamHandler } from '@matrixai/logger'; +import { DB } from '@matrixai/db'; +import Audit from '@/audit/Audit'; +import * as utils from '@/utils'; +import * as auditErrors from '@/audit/errors'; +import * as keysUtils from '@/keys/utils'; +import * as nodeEvents from '@/nodes/events'; +import * as nodeUtils from '@/nodes/utils'; +import * as ids from '@/ids'; +import * as testNodesUtils from '../nodes/utils'; + +describe(Audit.name, () => { + const logger = new Logger(`${Audit.name} test`, LogLevel.WARN, [ + new StreamHandler(), + ]); + + let dataDir: string; + let db: DB; + let mockNodeConnectionManager: NodeConnectionManager; + beforeEach(async () => { + dataDir = await fs.promises.mkdtemp( + path.join(os.tmpdir(), 'polykey-test-'), + ); + const dbKey = keysUtils.generateKey(); + const dbPath = `${dataDir}/db`; + db = await DB.createDB({ + dbPath, + logger, + crypto: { + key: dbKey, + ops: { + encrypt: async (key, plainText) => { + return keysUtils.encryptWithKey( + utils.bufferWrap(key) as Key, + utils.bufferWrap(plainText), + ); + }, + decrypt: async (key, cipherText) => { + return keysUtils.decryptWithKey( + utils.bufferWrap(key) as Key, + utils.bufferWrap(cipherText), + ); + }, + }, + }, + }); + mockNodeConnectionManager = new EventTarget() as NodeConnectionManager; + }); + afterEach(async () => { + await db.stop(); + await fs.promises.rm(dataDir, { + force: true, + recursive: true, + }); + }); + test('audit readiness', async () => { + const audit = await Audit.createAudit({ + db, + nodeConnectionManager: new EventTarget() as any, + logger, + }); + await expect(async () => { + await audit.destroy(); + }).rejects.toThrow(auditErrors.ErrorAuditRunning); + // Should be a noop + await audit.start(); + await audit.stop(); + await audit.destroy(); + await expect(async () => { + await audit.start(); + }).rejects.toThrow(auditErrors.ErrorAuditDestroyed); + await expect(audit.getLastAuditEventId()).rejects.toThrow( + auditErrors.ErrorAuditNotRunning, + ); + await expect(audit.getAuditMetric(['node', 'connection'])).rejects.toThrow( + auditErrors.ErrorAuditNotRunning, + ); + }); + describe('AuditEvent', () => { + test('node connection', async () => { + const nodeId = testNodesUtils.generateRandomNodeId(); + const audit = await Audit.createAudit({ + db, + nodeConnectionManager: mockNodeConnectionManager, + logger, + }); + const eventDetail: ConnectionData = { + remoteHost: '::' as Host, + remoteNodeId: nodeId, + remotePort: 0 as Port, + }; + const auditEventData = { + ...eventDetail, + remoteNodeId: nodeUtils.encodeNodeId(eventDetail.remoteNodeId), + }; + // @ts-ignore: kidnap protected + const handlerMap = audit.eventHandlerMap; + await handlerMap + .get(nodeEvents.EventNodeConnectionManagerConnectionReverse) + ?.handler( + new nodeEvents.EventNodeConnectionManagerConnectionReverse({ + detail: eventDetail, + }), + ); + await handlerMap + .get(nodeEvents.EventNodeConnectionManagerConnectionForward) + ?.handler( + new nodeEvents.EventNodeConnectionManagerConnectionForward({ + detail: eventDetail, + }), + ); + let iterator = audit.getAuditEvents(['node', 'connection']); + await expect(iterator.next().then((e) => e.value!.data)).resolves.toEqual( + { + ...auditEventData, + type: 'forward', + }, + ); + await expect(iterator.next().then((e) => e.value!.data)).resolves.toEqual( + { + ...auditEventData, + type: 'reverse', + }, + ); + iterator = audit.getAuditEvents(['node', 'connection', 'forward']); + await expect(iterator.next().then((e) => e.value!.data)).resolves.toEqual( + { + ...auditEventData, + type: 'forward', + }, + ); + iterator = audit.getAuditEvents(['node', 'connection', 'reverse']); + await expect(iterator.next().then((e) => e.value!.data)).resolves.toEqual( + { + ...auditEventData, + type: 'reverse', + }, + ); + await audit.stop(); + }); + }); + describe('AuditMetric', () => { + test('node connection', async () => { + const audit = await Audit.createAudit({ + db, + nodeConnectionManager: mockNodeConnectionManager, + logger, + }); + const nodeId = testNodesUtils.generateRandomNodeId(); + const auditEventData = { + remoteHost: '::', + remoteNodeId: nodeUtils.encodeNodeId(nodeId), + remotePort: 0, + }; + const date = Date.now(); + const date1MinuteAgo = date - 60_000; + const date1HourAgo = date - 60 * 60_000; + const date1DayAgo = date - 24 * 60 * 60_000; + const date1MonthAgo = date - 30 * 24 * 60 * 60_000; + const dates = [ + date, + date1MinuteAgo, + date1HourAgo, + date1DayAgo, + date1MonthAgo, + ]; + for (const iterDate of dates) { + // @ts-ignore: kidnap protected + await audit.setAuditEvent( + ['node', 'connection', 'reverse'], + ids.generateAuditEventIdFromEpoch(iterDate), + { + data: { + ...auditEventData, + type: 'reverse', + }, + }, + ); + } + for (const iterDate of dates) { + // @ts-ignore: kidnap protected + await audit.setAuditEvent( + ['node', 'connection', 'forward'], + ids.generateAuditEventIdFromEpoch(iterDate), + { + data: { + ...auditEventData, + type: 'forward', + }, + }, + ); + } + // Range from first element to now + await expect( + audit + .getAuditMetric(['node', 'connection', 'inbound']) + .then((e) => e.data), + ).resolves.toEqual({ + total: dates.length, + averagePerDay: dates.length / 30, + averagePerHour: dates.length / (30 * 24), + averagePerMinute: dates.length / (30 * 24 * 60), + }); + await expect( + audit + .getAuditMetric(['node', 'connection', 'outbound']) + .then((e) => e.data), + ).resolves.toEqual({ + total: dates.length, + averagePerDay: dates.length / 30, + averagePerHour: dates.length / (30 * 24), + averagePerMinute: dates.length / (30 * 24 * 60), + }); + await expect( + audit.getAuditMetric(['node', 'connection']).then((e) => e.data), + ).resolves.toEqual({ + total: dates.length * 2, + averagePerDay: (dates.length * 2) / 30, + averagePerHour: (dates.length * 2) / (30 * 24), + averagePerMinute: (dates.length * 2) / (30 * 24 * 60), + }); + // Range from day to now + await expect( + audit + .getAuditMetric(['node', 'connection', 'inbound'], { + from: date1DayAgo, + }) + .then((e) => e.data), + ).resolves.toEqual({ + total: dates.length - 1, + averagePerDay: dates.length - 1, + averagePerHour: (dates.length - 1) / 24, + averagePerMinute: (dates.length - 1) / (24 * 60), + }); + // Range from first element to minute + await expect( + audit + .getAuditMetric(['node', 'connection', 'inbound'], { + to: date1MinuteAgo, + }) + .then((e) => e.data), + ).resolves.toEqual({ + total: dates.length - 1, + averagePerDay: (dates.length - 1) / 30, + averagePerHour: (dates.length - 1) / (30 * 24), + averagePerMinute: (dates.length - 1) / (60 * 30 * 24 - 1), + }); + await audit.stop(); + }); + }); +}); From 86b74cf3249a33eaf3704a90a82091a02f61ae5a Mon Sep 17 00:00:00 2001 From: Amy Yan Date: Sat, 18 Nov 2023 08:54:10 +1100 Subject: [PATCH 2/5] feat: made `id` property apart of the event --- src/audit/Audit.ts | 56 ++++++++++++++++++++++------------- src/audit/types.ts | 5 ++++ tests/audit/Audit.test.ts | 61 +++++++++++++++++++++++++++------------ 3 files changed, 84 insertions(+), 38 deletions(-) diff --git a/src/audit/Audit.ts b/src/audit/Audit.ts index b16facf0c..bf80b2451 100644 --- a/src/audit/Audit.ts +++ b/src/audit/Audit.ts @@ -3,10 +3,10 @@ import type { TopicSubPath, TopicPath, TopicSubPathToAuditEvent, - AuditEvent, MetricPath, MetricPathToAuditMetric, AuditMetricNodeConnection, + AuditEventSerialized, } from './types'; import type { AuditEventId } from '../ids/types'; import type NodeConnectionManager from '../nodes/NodeConnectionManager'; @@ -163,10 +163,12 @@ class Audit { const handler = async (evt: InstanceType) => { const eventData = await toAuditEvent(evt); await this.db.withTransactionF(async (tran) => { - const event: AuditEvent = { - data: eventData, - }; - await this.addAuditEvent(topicPath, event as any, tran); + const auditEventId = this.generateAuditEventId(); + await this.setAuditEvent( + topicPath, + { id: auditEventId, data: eventData } as any, + tran, + ); }); }; target.addEventListener(event.name, handler); @@ -176,17 +178,24 @@ class Audit { @ready(new auditErrors.ErrorAuditNotRunning()) protected async setAuditEvent( topicPath: TopicPath, - auditEventId: AuditEventId, auditEvent: TopicSubPathToAuditEvent, tran?: DBTransaction, ) { if (tran == null) { return await this.db.withTransactionF((tran) => - this.setAuditEvent(topicPath, auditEventId, auditEvent, tran), + this.setAuditEvent(topicPath, auditEvent, tran), ); } - const auditEventIdBuffer = auditEventId.toBuffer(); - await tran.put([...this.auditEventDbPath, auditEventIdBuffer], auditEvent); + const clonedAuditEvent: AuditEventSerialized> = + { + ...auditEvent, + }; + delete (clonedAuditEvent as any).id; + const auditEventIdBuffer = auditEvent.id.toBuffer(); + await tran.put( + [...this.auditEventDbPath, auditEventIdBuffer], + clonedAuditEvent, + ); const subTopicArray: Array = []; for (const topic of topicPath) { subTopicArray.push(topic); @@ -198,18 +207,22 @@ class Audit { } @ready(new auditErrors.ErrorAuditNotRunning()) - protected async addAuditEvent( - topicPath: TopicPath, - auditEvent: TopicSubPathToAuditEvent, - tran?: DBTransaction, + public async *getAuditEventsLongRunning( + topicPath: T, + { + seek, + order, + limit, + }: { + seek?: AuditEventId; + order?: 'asc' | 'desc'; + limit?: number; + } = {}, ) { - if (tran == null) { - return await this.db.withTransactionF((tran) => - this.addAuditEvent(topicPath, auditEvent, tran), - ); + const seekCursor = seek; + while (true) { + this.getAuditEvents(topicPath); } - const auditEventId = this.generateAuditEventId(); - await this.setAuditEvent(topicPath, auditEventId, auditEvent, tran); } @ready(new auditErrors.ErrorAuditNotRunning()) @@ -263,6 +276,7 @@ class Audit { const iterator = tran.iterator( [...this.auditTopicDbPath, topicPath.join('.')], { + keyAsBuffer: true, keys: true, values: false, valueAsBuffer: false, @@ -274,11 +288,13 @@ class Audit { iterator.seek(seek.toBuffer()); } for await (const [keyPath] of iterator) { + const key = keyPath.at(-1)! as Buffer; const event = await tran.get>([ ...this.auditEventDbPath, - keyPath.at(-1)!, + key, ]); if (event != null) { + event.id = IdInternal.fromBuffer(key); yield event; } } diff --git a/src/audit/types.ts b/src/audit/types.ts index 026081ec5..34fee1197 100644 --- a/src/audit/types.ts +++ b/src/audit/types.ts @@ -1,3 +1,4 @@ +import type { AuditEventId } from '../ids'; import type { POJO } from '../types'; import type { nodeConnectionInboundMetricPath, @@ -24,9 +25,12 @@ type InferTypeFromSubpath< * Represents a capture of an event. */ type AuditEvent = { + id: AuditEventId; data: T; }; +type AuditEventSerialized = Omit; + type TopicPath = // Nodes typeof nodeConnectionReverseTopicPath | typeof nodeConnectionForwardTopicPath; @@ -107,6 +111,7 @@ export type { IsSubpath, InferTypeFromSubpath, AuditEvent, + AuditEventSerialized, TopicPath, TopicSubPathToAuditEvent, TopicSubPath, diff --git a/tests/audit/Audit.test.ts b/tests/audit/Audit.test.ts index c5ec72ab5..1e6a1d4af 100644 --- a/tests/audit/Audit.test.ts +++ b/tests/audit/Audit.test.ts @@ -82,6 +82,37 @@ describe(Audit.name, () => { auditErrors.ErrorAuditNotRunning, ); }); + // Test('long running', async () => { + // const nodeId = testNodesUtils.generateRandomNodeId(); + // const audit = await Audit.createAudit({ + // db, + // nodeConnectionManager: mockNodeConnectionManager, + // logger, + // }); + // const eventDetail: ConnectionData = { + // remoteHost: '::' as Host, + // remoteNodeId: nodeId, + // remotePort: 0 as Port, + // }; + // const auditEventData = { + // ...eventDetail, + // remoteNodeId: nodeUtils.encodeNodeId(eventDetail.remoteNodeId), + // }; + // // @ts-ignore: kidnap protected + // const handlerMap = audit.eventHandlerMap; + // await handlerMap + // .get(nodeEvents.EventNodeConnectionManagerConnectionReverse) + // ?.handler( + // new nodeEvents.EventNodeConnectionManagerConnectionReverse({ + // detail: eventDetail, + // }), + // ); + + // for await (const s of audit.getAuditEventsLongRunning(['node'])) { + + // } + // await audit.stop(); + // }); describe('AuditEvent', () => { test('node connection', async () => { const nodeId = testNodesUtils.generateRandomNodeId(); @@ -172,29 +203,23 @@ describe(Audit.name, () => { ]; for (const iterDate of dates) { // @ts-ignore: kidnap protected - await audit.setAuditEvent( - ['node', 'connection', 'reverse'], - ids.generateAuditEventIdFromEpoch(iterDate), - { - data: { - ...auditEventData, - type: 'reverse', - }, + await audit.setAuditEvent(['node', 'connection', 'reverse'], { + id: ids.generateAuditEventIdFromEpoch(iterDate), + data: { + ...auditEventData, + type: 'reverse', }, - ); + }); } for (const iterDate of dates) { // @ts-ignore: kidnap protected - await audit.setAuditEvent( - ['node', 'connection', 'forward'], - ids.generateAuditEventIdFromEpoch(iterDate), - { - data: { - ...auditEventData, - type: 'forward', - }, + await audit.setAuditEvent(['node', 'connection', 'forward'], { + id: ids.generateAuditEventIdFromEpoch(iterDate), + data: { + ...auditEventData, + type: 'forward', }, - ); + }); } // Range from first element to now await expect( From 08283e9cc6ca9b6ca19852849bd304d8f1ff74d9 Mon Sep 17 00:00:00 2001 From: Amy Yan Date: Sat, 18 Nov 2023 14:15:12 +1100 Subject: [PATCH 3/5] feat: js-rpc `audit` domain handlers [ci-skip] --- src/PolykeyClient.ts | 7 +- src/audit/Audit.ts | 118 ++++++++--- src/audit/events.ts | 13 ++ src/audit/types.ts | 44 ++++- src/audit/utils.ts | 8 +- src/client/callers/auditEventsGet.ts | 40 ++++ src/client/callers/auditMetricGet.ts | 27 +++ src/client/callers/index.ts | 5 + src/client/handlers/AuditEventsGet.ts | 61 ++++++ src/client/handlers/AuditMetricGet.ts | 40 ++++ src/client/types.ts | 24 ++- src/ids/index.ts | 31 +++ src/ids/types.ts | 2 + tests/audit/Audit.test.ts | 272 +++++++++++++++++++++++--- tests/client/handlers/audit.test.ts | 268 +++++++++++++++++++++++++ 15 files changed, 888 insertions(+), 72 deletions(-) create mode 100644 src/client/callers/auditEventsGet.ts create mode 100644 src/client/callers/auditMetricGet.ts create mode 100644 src/client/handlers/AuditEventsGet.ts create mode 100644 src/client/handlers/AuditMetricGet.ts create mode 100644 tests/client/handlers/audit.test.ts diff --git a/src/PolykeyClient.ts b/src/PolykeyClient.ts index 95c3fe4d9..ac9ce0692 100644 --- a/src/PolykeyClient.ts +++ b/src/PolykeyClient.ts @@ -1,6 +1,7 @@ import type { PromiseCancellable } from '@matrixai/async-cancellable'; import type { ContextTimed, ContextTimedInput } from '@matrixai/contexts'; import type { DeepPartial, FileSystem } from './types'; +import type { OverrideRPClientType } from './client/types'; import type { NodeId } from './ids/types'; import path from 'path'; import Logger from '@matrixai/logger'; @@ -151,7 +152,9 @@ class PolykeyClient { protected logger: Logger; protected _nodeId: NodeId; protected _webSocketClient: WebSocketClient; - protected _rpcClient: RPCClient; + protected _rpcClient: OverrideRPClientType< + RPCClient + >; constructor({ nodePath, @@ -302,7 +305,7 @@ class PolykeyClient { }); this._nodeId = nodeId_; this._webSocketClient = webSocketClient; - this._rpcClient = rpcClient; + this._rpcClient = rpcClient as typeof this._rpcClient; this.logger.info(`Started ${this.constructor.name}`); } diff --git a/src/audit/Audit.ts b/src/audit/Audit.ts index bf80b2451..126580117 100644 --- a/src/audit/Audit.ts +++ b/src/audit/Audit.ts @@ -6,7 +6,7 @@ import type { MetricPath, MetricPathToAuditMetric, AuditMetricNodeConnection, - AuditEventSerialized, + AuditEventToAuditEventDB, } from './types'; import type { AuditEventId } from '../ids/types'; import type NodeConnectionManager from '../nodes/NodeConnectionManager'; @@ -186,10 +186,11 @@ class Audit { this.setAuditEvent(topicPath, auditEvent, tran), ); } - const clonedAuditEvent: AuditEventSerialized> = - { - ...auditEvent, - }; + const clonedAuditEvent: AuditEventToAuditEventDB< + TopicSubPathToAuditEvent + > = { + ...auditEvent, + }; delete (clonedAuditEvent as any).id; const auditEventIdBuffer = auditEvent.id.toBuffer(); await tran.put( @@ -204,6 +205,16 @@ class Audit { true, ); } + tran.queueFinally(() => { + this.dispatchEvent( + new auditEvents.EventAuditAuditEventSet({ + detail: { + topicPath, + auditEvent, + }, + }), + ); + }); } @ready(new auditErrors.ErrorAuditNotRunning()) @@ -211,17 +222,61 @@ class Audit { topicPath: T, { seek, - order, - limit, }: { seek?: AuditEventId; - order?: 'asc' | 'desc'; - limit?: number; } = {}, - ) { - const seekCursor = seek; - while (true) { - this.getAuditEvents(topicPath); + ): AsyncGenerator> { + let blockP: Promise; + let resolveBlockP: () => void; + const handleEventAuditAuditEventSet = ( + evt: auditEvents.EventAuditAuditEventSet, + ) => { + let isSupTopic = true; + for (let i = 0; i < topicPath.length; i++) { + if (evt.detail.topicPath.at(i) !== topicPath[i]) { + isSupTopic = false; + } + } + if (isSupTopic) { + resolveBlockP(); + } + }; + try { + let seekCursor = seek; + let firstRunComplete = false; + this.addEventListener( + auditEvents.EventAuditAuditEventSet.name, + handleEventAuditAuditEventSet, + ); + while (true) { + const blockProm = utils.promise(); + blockP = blockProm.p; + resolveBlockP = blockProm.resolveP; + + const iterator = this.getAuditEvents(topicPath, { + seek: seekCursor, + order: 'asc', + }); + let i = 0; + for await (const auditEvent of iterator) { + seekCursor = auditEvent.id; + // Skip the first element if this is not the first run + if (firstRunComplete && i === 0) { + i++; + continue; + } + yield auditEvent; + i++; + } + firstRunComplete = true; + + await blockP; + } + } finally { + this.removeEventListener( + auditEvents.EventAuditAuditEventSet.name, + handleEventAuditAuditEventSet, + ); } } @@ -230,7 +285,7 @@ class Audit { topicPath: T, { seek, - order, + order = 'asc', limit, }: { seek?: AuditEventId; @@ -238,11 +293,7 @@ class Audit { limit?: number; } = {}, tran?: DBTransaction, - ): AsyncGenerator< - TopicSubPathToAuditEvent, - void, - TopicSubPathToAuditEvent - > { + ): AsyncGenerator> { if (tran == null) { const getEvents = (tran) => this.getAuditEvents( @@ -259,16 +310,20 @@ class Audit { }); } if (topicPath.length === 0) { - for await (const [, auditEvent] of tran.iterator< - TopicSubPathToAuditEvent - >(this.auditEventDbPath, { - keys: false, - values: true, - valueAsBuffer: false, - reverse: order !== 'asc', - limit, - gte: seek?.toBuffer(), - })) { + const iterator = tran.iterator>( + this.auditEventDbPath, + { + keys: false, + values: true, + valueAsBuffer: false, + reverse: order !== 'asc', + limit, + }, + ); + if (seek != null) { + iterator.seek(seek.toBuffer()); + } + for await (const [, auditEvent] of iterator) { yield auditEvent; } return; @@ -320,7 +375,10 @@ class Audit { let fromIdBuffer: Buffer | undefined; if (fromEpoch != null) { fromIdBuffer = ids - .generateAuditEventIdFromEpoch(fromEpoch, () => new Uint8Array()) + .generateAuditEventIdFromEpoch( + fromEpoch, + (size) => new Uint8Array(size), + ) .toBuffer(); } let toIdBuffer: Buffer | undefined; diff --git a/src/audit/events.ts b/src/audit/events.ts index 227849e3c..b10214f68 100644 --- a/src/audit/events.ts +++ b/src/audit/events.ts @@ -1,3 +1,4 @@ +import type { TopicPath, TopicSubPathToAuditEvent } from './types'; import EventPolykey from '../EventPolykey'; abstract class EventAudit extends EventPolykey {} @@ -14,6 +15,16 @@ class EventAuditDestroy extends EventAudit {} class EventAuditDestroyed extends EventAudit {} +abstract class EventAuditAuditEvent extends EventAudit {} + +class EventAuditAuditEventSet< + P extends TopicPath = TopicPath, + T extends TopicSubPathToAuditEvent

= TopicSubPathToAuditEvent

, +> extends EventAuditAuditEvent<{ + topicPath: P; + auditEvent: T; +}> {} + export { EventAudit, EventAuditStart, @@ -22,4 +33,6 @@ export { EventAuditStopped, EventAuditDestroy, EventAuditDestroyed, + EventAuditAuditEvent, + EventAuditAuditEventSet, }; diff --git a/src/audit/types.ts b/src/audit/types.ts index 34fee1197..9570cd6de 100644 --- a/src/audit/types.ts +++ b/src/audit/types.ts @@ -1,4 +1,4 @@ -import type { AuditEventId } from '../ids'; +import type { AuditEventId, AuditEventIdEncoded } from '../ids'; import type { POJO } from '../types'; import type { nodeConnectionInboundMetricPath, @@ -24,12 +24,33 @@ type InferTypeFromSubpath< /** * Represents a capture of an event. */ -type AuditEvent = { +type AuditEventBase = { id: AuditEventId; data: T; }; -type AuditEventSerialized = Omit; +/** + * Represents a capture of an event. + */ +type AuditEvent = + | AuditEventNodeConnectionForward + | AuditEventNodeConnectionReverse; + +/** + * Represents a capture of an event stored in the database. + */ +type AuditEventDB = AuditEventToAuditEventDB; + +/** + * Represents a capture of an event for transmission over network. + */ +type AuditEventSerialized = AuditEventToAuditEventSerialized; + +type AuditEventToAuditEventDB = Omit; + +type AuditEventToAuditEventSerialized = T & { + id: AuditEventIdEncoded; +}; type TopicPath = // Nodes @@ -56,7 +77,7 @@ type TopicSubPathToAuditEvent = // Nodes -type AuditEventNodeConnectionBase = AuditEvent<{ +type AuditEventNodeConnectionBase = AuditEventBase<{ remoteNodeId: string; remoteHost: string; remotePort: number; @@ -67,12 +88,12 @@ type AuditEventNodeConnection = | AuditEventNodeConnectionForward; type AuditEventNodeConnectionReverse = AuditEventNodeConnectionBase & - AuditEvent<{ + AuditEventBase<{ type: 'reverse'; }>; type AuditEventNodeConnectionForward = AuditEventNodeConnectionBase & - AuditEvent<{ + AuditEventBase<{ type: 'forward'; }>; @@ -92,14 +113,16 @@ type MetricPathToAuditMetric = ? AuditMetricNodeConnection : never; +type AuditMetric = AuditMetricNodeConnection; + /** * Represents a capture of an event. */ -type AuditMetric = { +type AuditMetricBase = { data: T; }; -type AuditMetricNodeConnection = AuditMetric<{ +type AuditMetricNodeConnection = AuditMetricBase<{ total: number; averagePerMinute: number; averagePerHour: number; @@ -110,8 +133,12 @@ export type { // Event IsSubpath, InferTypeFromSubpath, + AuditEventBase, AuditEvent, + AuditEventDB, + AuditEventToAuditEventDB, AuditEventSerialized, + AuditEventToAuditEventSerialized, TopicPath, TopicSubPathToAuditEvent, TopicSubPath, @@ -123,5 +150,6 @@ export type { MetricPath, MetricPathToAuditMetric, AuditMetric, + AuditMetricBase, AuditMetricNodeConnection, }; diff --git a/src/audit/utils.ts b/src/audit/utils.ts index e6a813f70..cbd189a0a 100644 --- a/src/audit/utils.ts +++ b/src/audit/utils.ts @@ -4,7 +4,11 @@ import type { } from './types'; import type * as nodesEvents from '../nodes/events'; import * as nodesUtils from '../nodes/utils'; -import { createAuditEventIdGenerator } from '../ids'; +import { + createAuditEventIdGenerator, + encodeAuditEventId, + decodeAuditEventId, +} from '../ids'; // Events @@ -66,6 +70,8 @@ const nodeConnectionOutboundMetricPath = [ export { createAuditEventIdGenerator, + encodeAuditEventId, + decodeAuditEventId, nodeConnectionReverseTopicPath, fromEventNodeConnectionManagerConnectionReverse, nodeConnectionForwardTopicPath, diff --git a/src/client/callers/auditEventsGet.ts b/src/client/callers/auditEventsGet.ts new file mode 100644 index 000000000..574a9f853 --- /dev/null +++ b/src/client/callers/auditEventsGet.ts @@ -0,0 +1,40 @@ +import type { ReadableStream } from 'stream/web'; +import type { HandlerTypes } from '@matrixai/rpc'; +import type { ContextTimedInput } from '@matrixai/contexts'; +import type { + AuditEventToAuditEventSerialized, + TopicSubPath, + TopicSubPathToAuditEvent, +} from '../../audit/types'; +import type { ClientRPCRequestParams, ClientRPCResponseResult } from '../types'; +import type AuditEventsGet from '../handlers/AuditEventsGet'; +import type { AuditEventIdEncoded } from '../../ids/types'; +import { ServerCaller } from '@matrixai/rpc'; + +type CallerTypes = HandlerTypes; + +type AuditEventsGetTypeOverride = ( + input: ClientRPCRequestParams<{ + seek?: AuditEventIdEncoded; + order?: 'asc' | 'desc'; + limit?: number; + }> & { + path: T; + }, + ctx?: ContextTimedInput, +) => Promise< + ReadableStream< + ClientRPCResponseResult< + AuditEventToAuditEventSerialized> + > + > +>; + +const auditEventsGet = new ServerCaller< + CallerTypes['input'], + CallerTypes['output'] +>(); + +export default auditEventsGet; + +export type { AuditEventsGetTypeOverride }; diff --git a/src/client/callers/auditMetricGet.ts b/src/client/callers/auditMetricGet.ts new file mode 100644 index 000000000..957209f13 --- /dev/null +++ b/src/client/callers/auditMetricGet.ts @@ -0,0 +1,27 @@ +import type { HandlerTypes } from '@matrixai/rpc'; +import type { ContextTimedInput } from '@matrixai/contexts'; +import type { MetricPath, MetricPathToAuditMetric } from '../../audit/types'; +import type { ClientRPCRequestParams, ClientRPCResponseResult } from '../types'; +import type AuditMetricGet from '../handlers/AuditMetricGet'; +import { UnaryCaller } from '@matrixai/rpc'; + +type CallerTypes = HandlerTypes; + +type AuditMetricGetTypeOverride = ( + input: ClientRPCRequestParams<{ + from?: number; + to?: number; + }> & { + path: T; + }, + ctx?: ContextTimedInput, +) => Promise>>; + +const auditMetricGet = new UnaryCaller< + CallerTypes['input'], + CallerTypes['output'] +>(); + +export default auditMetricGet; + +export type { AuditMetricGetTypeOverride }; diff --git a/src/client/callers/index.ts b/src/client/callers/index.ts index f3e96a478..c2b6f9d0f 100644 --- a/src/client/callers/index.ts +++ b/src/client/callers/index.ts @@ -2,6 +2,8 @@ import agentLockAll from './agentLockAll'; import agentStatus from './agentStatus'; import agentStop from './agentStop'; import agentUnlock from './agentUnlock'; +import auditEventsGet from './auditEventsGet'; +import auditMetricGet from './auditMetricGet'; import gestaltsActionsGetByIdentity from './gestaltsActionsGetByIdentity'; import gestaltsActionsGetByNode from './gestaltsActionsGetByNode'; import gestaltsActionsSetByIdentity from './gestaltsActionsSetByIdentity'; @@ -75,6 +77,8 @@ const clientManifest = { agentStatus, agentStop, agentUnlock, + auditEventsGet, + auditMetricGet, gestaltsActionsGetByIdentity, gestaltsActionsGetByNode, gestaltsActionsSetByIdentity, @@ -148,6 +152,7 @@ export { agentStatus, agentStop, agentUnlock, + auditEventsGet, gestaltsActionsGetByIdentity, gestaltsActionsGetByNode, gestaltsActionsSetByIdentity, diff --git a/src/client/handlers/AuditEventsGet.ts b/src/client/handlers/AuditEventsGet.ts new file mode 100644 index 000000000..06f0bd6d1 --- /dev/null +++ b/src/client/handlers/AuditEventsGet.ts @@ -0,0 +1,61 @@ +import type { ContextTimed } from '@matrixai/contexts'; +import type { ClientRPCRequestParams, ClientRPCResponseResult } from '../types'; +import type { + AuditEventSerialized, + AuditEventToAuditEventSerialized, + TopicSubPath, + TopicSubPathToAuditEvent, +} from '../../audit/types'; +import type { Audit } from '../../audit'; +import type { AuditEventIdEncoded } from '../../ids'; +import { ServerHandler } from '@matrixai/rpc'; +import * as auditUtils from '../../audit/utils'; + +class AuditEventsGet extends ServerHandler< + { + audit: Audit; + }, + ClientRPCRequestParams<{ + path: TopicSubPath & Array; + seek?: AuditEventIdEncoded; + order?: 'asc' | 'desc'; + limit?: number; + }>, + ClientRPCResponseResult +> { + public async *handle( + input: ClientRPCRequestParams<{ + seek?: AuditEventIdEncoded; + order?: 'asc' | 'desc'; + limit?: number; + }> & { + path: T; + }, + _cancel, + _meta, + ctx: ContextTimed, + ): AsyncGenerator< + ClientRPCResponseResult< + AuditEventToAuditEventSerialized> + > + > { + const { audit } = this.container; + const iterator = audit.getAuditEvents(input.path, { + seek: + input.seek != null + ? auditUtils.decodeAuditEventId(input.seek) + : undefined, + order: input.order, + limit: input.limit, + }); + ctx.signal.addEventListener('abort', async () => { + await iterator.return(ctx.signal.reason); + }); + for await (const auditEvent of iterator) { + (auditEvent.id as any) = auditUtils.encodeAuditEventId(auditEvent.id); + yield auditEvent as any; + } + } +} + +export default AuditEventsGet; diff --git a/src/client/handlers/AuditMetricGet.ts b/src/client/handlers/AuditMetricGet.ts new file mode 100644 index 000000000..3e1595ca8 --- /dev/null +++ b/src/client/handlers/AuditMetricGet.ts @@ -0,0 +1,40 @@ +import type { ClientRPCRequestParams, ClientRPCResponseResult } from '../types'; +import type { + AuditMetric, + MetricPath, + MetricPathToAuditMetric, +} from '../../audit/types'; +import type { Audit } from '../../audit'; +import { UnaryHandler } from '@matrixai/rpc'; + +class AuditMetricGet extends UnaryHandler< + { + audit: Audit; + }, + ClientRPCRequestParams<{ + path: MetricPath & Array; + from?: number; + to?: number; + }>, + ClientRPCResponseResult +> { + public handle = async ( + input: ClientRPCRequestParams<{ + from?: number; + to?: number; + }> & { + path: T; + }, + _cancel, + _meta, + _ctx, + ): Promise>> => { + const { audit } = this.container; + return (await audit.getAuditMetric(input.path, { + from: input.from, + to: input.to, + })) as any; + }; +} + +export default AuditMetricGet; diff --git a/src/client/types.ts b/src/client/types.ts index 0570182e3..a3bc767cd 100644 --- a/src/client/types.ts +++ b/src/client/types.ts @@ -1,4 +1,9 @@ -import type { JSONObject, JSONRPCResponseResult } from '@matrixai/rpc'; +import type { + ClientManifest, + JSONObject, + JSONRPCResponseResult, + RPCClient, +} from '@matrixai/rpc'; import type { GestaltIdEncoded, IdentityId, @@ -11,6 +16,8 @@ import type { CommitId, VaultAction, VaultName } from '../vaults/types'; import type { CertificatePEM, JWKEncrypted, PublicKeyJWK } from '../keys/types'; import type { Notification } from '../notifications/types'; import type { ProviderToken } from '../identities/types'; +import type { AuditEventsGetTypeOverride } from './callers/auditEventsGet'; +import type { AuditMetricGetTypeOverride } from './callers/auditMetricGet'; type ClientRPCRequestParams = JSONRPCResponseResult< @@ -306,6 +313,18 @@ type SecretStatMessage = { }; }; +// Type casting for tricky handlers + +type OverrideRPClientType> = Omit< + T, + 'methods' +> & { + methods: { + auditEventsGet: AuditEventsGetTypeOverride; + auditMetricGet: AuditMetricGetTypeOverride; + } & T['methods']; +}; + export type { ClientRPCRequestParams, ClientRPCResponseResult, @@ -363,4 +382,7 @@ export type { SecretRenameMessage, SecretStatMessage, SignatureMessage, + OverrideRPClientType, + AuditEventsGetTypeOverride, + AuditMetricGetTypeOverride, }; diff --git a/src/ids/index.ts b/src/ids/index.ts index 17bc005c7..b0e592140 100644 --- a/src/ids/index.ts +++ b/src/ids/index.ts @@ -21,6 +21,7 @@ import type { NotificationId, NotificationIdEncoded, AuditEventId, + AuditEventIdEncoded, } from './types'; import { IdInternal, IdSortable, IdRandom } from '@matrixai/id'; import * as keysUtilsRandom from '../keys/utils/random'; @@ -43,6 +44,34 @@ function createAuditEventIdGenerator( return () => generator.get(); } +/** + * Encodes `AuditEventId` to `AuditEventIdEncoded` + */ +function encodeAuditEventId(auditEventId: AuditEventId): AuditEventIdEncoded { + return auditEventId.toBuffer().toString('hex') as AuditEventIdEncoded; +} + +/** + * Decodes `AuditEventIdEncoded` to `AuditEventId` + */ +function decodeAuditEventId( + auditEventIdEncoded: unknown, +): AuditEventId | undefined { + if (typeof auditEventIdEncoded !== 'string') { + return; + } + const auditEventIdBuffer = Buffer.from(auditEventIdEncoded, 'hex'); + const auditEventId = IdInternal.fromBuffer(auditEventIdBuffer); + if (auditEventId == null) { + return; + } + // All `AuditEventId` are 16 bytes long + if (auditEventId.length !== 16) { + return; + } + return auditEventId; +} + /** * Generates an auditId from an epoch timestamp. * @@ -493,6 +522,8 @@ export { createPermIdGenerator, createAuditEventIdGenerator, generateAuditEventIdFromEpoch, + encodeAuditEventId, + decodeAuditEventId, createNodeIdGenerator, isNodeId, assertNodeId, diff --git a/src/ids/types.ts b/src/ids/types.ts index bc015bac5..394e7505c 100644 --- a/src/ids/types.ts +++ b/src/ids/types.ts @@ -10,6 +10,7 @@ type PermissionIdString = Opaque<'PermissionIdString', string>; type AuditEventId = Opaque<'AuditEventId', Id>; type AuditEventIdString = Opaque<'AuditEventIdString', string>; +type AuditEventIdEncoded = Opaque<'AuditEventIdEncoded', string>; // Keys @@ -107,6 +108,7 @@ export type { PermissionIdString, AuditEventId, AuditEventIdString, + AuditEventIdEncoded, CertId, CertIdString, CertIdEncoded, diff --git a/tests/audit/Audit.test.ts b/tests/audit/Audit.test.ts index 1e6a1d4af..5b677b7b4 100644 --- a/tests/audit/Audit.test.ts +++ b/tests/audit/Audit.test.ts @@ -9,6 +9,7 @@ import { DB } from '@matrixai/db'; import Audit from '@/audit/Audit'; import * as utils from '@/utils'; import * as auditErrors from '@/audit/errors'; +import * as auditEvents from '@/audit/events'; import * as keysUtils from '@/keys/utils'; import * as nodeEvents from '@/nodes/events'; import * as nodeUtils from '@/nodes/utils'; @@ -82,39 +83,130 @@ describe(Audit.name, () => { auditErrors.ErrorAuditNotRunning, ); }); - // Test('long running', async () => { - // const nodeId = testNodesUtils.generateRandomNodeId(); - // const audit = await Audit.createAudit({ - // db, - // nodeConnectionManager: mockNodeConnectionManager, - // logger, - // }); - // const eventDetail: ConnectionData = { - // remoteHost: '::' as Host, - // remoteNodeId: nodeId, - // remotePort: 0 as Port, - // }; - // const auditEventData = { - // ...eventDetail, - // remoteNodeId: nodeUtils.encodeNodeId(eventDetail.remoteNodeId), - // }; - // // @ts-ignore: kidnap protected - // const handlerMap = audit.eventHandlerMap; - // await handlerMap - // .get(nodeEvents.EventNodeConnectionManagerConnectionReverse) - // ?.handler( - // new nodeEvents.EventNodeConnectionManagerConnectionReverse({ - // detail: eventDetail, - // }), - // ); + test('event dispatch', async () => { + const nodeId = testNodesUtils.generateRandomNodeId(); + const audit = await Audit.createAudit({ + db, + nodeConnectionManager: mockNodeConnectionManager, + logger, + }); + const eventDetail: ConnectionData = { + remoteHost: '::' as Host, + remoteNodeId: nodeId, + remotePort: 0 as Port, + }; + const auditEventData = { + ...eventDetail, + remoteNodeId: nodeUtils.encodeNodeId(eventDetail.remoteNodeId), + }; - // for await (const s of audit.getAuditEventsLongRunning(['node'])) { + const { p: eventP, resolveP: resolveEventP } = utils.promise(); + audit.addEventListener(auditEvents.EventAuditAuditEventSet.name, () => + resolveEventP(), + ); - // } - // await audit.stop(); - // }); + // @ts-ignore: kidnap protected + const handlerMap = audit.eventHandlerMap; + void handlerMap + .get(nodeEvents.EventNodeConnectionManagerConnectionReverse) + ?.handler( + new nodeEvents.EventNodeConnectionManagerConnectionReverse({ + detail: eventDetail, + }), + ); + await eventP; + const iterator = audit.getAuditEvents(['node']); + await expect(iterator.next().then((e) => e.value!.data)).resolves.toEqual({ + ...auditEventData, + type: 'reverse', + }); + }); describe('AuditEvent', () => { - test('node connection', async () => { + test('order', async () => { + const nodeId = testNodesUtils.generateRandomNodeId(); + const audit = await Audit.createAudit({ + db, + nodeConnectionManager: mockNodeConnectionManager, + logger, + }); + const eventDetail: ConnectionData = { + remoteHost: '::' as Host, + remoteNodeId: nodeId, + remotePort: 0 as Port, + }; + const auditEventData = { + ...eventDetail, + remoteNodeId: nodeUtils.encodeNodeId(eventDetail.remoteNodeId), + }; + // @ts-ignore: kidnap protected + const handlerMap = audit.eventHandlerMap; + await handlerMap + .get(nodeEvents.EventNodeConnectionManagerConnectionReverse) + ?.handler( + new nodeEvents.EventNodeConnectionManagerConnectionReverse({ + detail: eventDetail, + }), + ); + await handlerMap + .get(nodeEvents.EventNodeConnectionManagerConnectionForward) + ?.handler( + new nodeEvents.EventNodeConnectionManagerConnectionForward({ + detail: eventDetail, + }), + ); + let iterator = audit.getAuditEvents(['node', 'connection']); + await expect(iterator.next().then((e) => e.value!.data)).resolves.toEqual( + { + ...auditEventData, + type: 'reverse', + }, + ); + iterator = audit.getAuditEvents(['node', 'connection'], { + order: 'desc', + }); + await expect(iterator.next().then((e) => e.value!.data)).resolves.toEqual( + { + ...auditEventData, + type: 'forward', + }, + ); + await audit.stop(); + }); + test('limit', async () => { + const nodeId = testNodesUtils.generateRandomNodeId(); + const audit = await Audit.createAudit({ + db, + nodeConnectionManager: mockNodeConnectionManager, + logger, + }); + const eventDetail: ConnectionData = { + remoteHost: '::' as Host, + remoteNodeId: nodeId, + remotePort: 0 as Port, + }; + // @ts-ignore: kidnap protected + const handlerMap = audit.eventHandlerMap; + for (let i = 0; i < 10; i++) { + await handlerMap + .get(nodeEvents.EventNodeConnectionManagerConnectionReverse) + ?.handler( + new nodeEvents.EventNodeConnectionManagerConnectionReverse({ + detail: eventDetail, + }), + ); + } + const limit = 5; + let count = 0; + for await (const _ of audit.getAuditEvents( + ['node', 'connection', 'reverse'], + { limit }, + )) { + count++; + } + expect(count).toBe(limit); + await audit.stop(); + }); + test('topic nesting', async () => { const nodeId = testNodesUtils.generateRandomNodeId(); const audit = await Audit.createAudit({ db, @@ -147,18 +239,138 @@ describe(Audit.name, () => { }), ); let iterator = audit.getAuditEvents(['node', 'connection']); + await expect(iterator.next().then((e) => e.value!.data)).resolves.toEqual( + { + ...auditEventData, + type: 'reverse', + }, + ); + await expect(iterator.next().then((e) => e.value!.data)).resolves.toEqual( + { + ...auditEventData, + type: 'forward', + }, + ); + iterator = audit.getAuditEvents(['node']); + await expect(iterator.next().then((e) => e.value!.data)).resolves.toEqual( + { + ...auditEventData, + type: 'reverse', + }, + ); + await expect(iterator.next().then((e) => e.value!.data)).resolves.toEqual( + { + ...auditEventData, + type: 'forward', + }, + ); + iterator = audit.getAuditEvents([]); + await expect(iterator.next().then((e) => e.value!.data)).resolves.toEqual( + { + ...auditEventData, + type: 'reverse', + }, + ); + await expect(iterator.next().then((e) => e.value!.data)).resolves.toEqual( + { + ...auditEventData, + type: 'forward', + }, + ); + await audit.stop(); + }); + test('long running', async () => { + const nodeId = testNodesUtils.generateRandomNodeId(); + const audit = await Audit.createAudit({ + db, + nodeConnectionManager: mockNodeConnectionManager, + logger, + }); + const eventDetail: ConnectionData = { + remoteHost: '::' as Host, + remoteNodeId: nodeId, + remotePort: 0 as Port, + }; + const auditEventData = { + ...eventDetail, + remoteNodeId: nodeUtils.encodeNodeId(eventDetail.remoteNodeId), + }; + // @ts-ignore: kidnap protected + const handlerMap = audit.eventHandlerMap; + const iterator = audit.getAuditEventsLongRunning(['node']); + await handlerMap + .get(nodeEvents.EventNodeConnectionManagerConnectionReverse) + ?.handler( + new nodeEvents.EventNodeConnectionManagerConnectionReverse({ + detail: eventDetail, + }), + ); + await expect(iterator.next().then((e) => e.value!.data)).resolves.toEqual( + { + ...auditEventData, + type: 'reverse', + }, + ); + await handlerMap + .get(nodeEvents.EventNodeConnectionManagerConnectionForward) + ?.handler( + new nodeEvents.EventNodeConnectionManagerConnectionForward({ + detail: eventDetail, + }), + ); await expect(iterator.next().then((e) => e.value!.data)).resolves.toEqual( { ...auditEventData, type: 'forward', }, ); + await audit.stop(); + }); + test('node connection topic', async () => { + const nodeId = testNodesUtils.generateRandomNodeId(); + const audit = await Audit.createAudit({ + db, + nodeConnectionManager: mockNodeConnectionManager, + logger, + }); + const eventDetail: ConnectionData = { + remoteHost: '::' as Host, + remoteNodeId: nodeId, + remotePort: 0 as Port, + }; + const auditEventData = { + ...eventDetail, + remoteNodeId: nodeUtils.encodeNodeId(eventDetail.remoteNodeId), + }; + // @ts-ignore: kidnap protected + const handlerMap = audit.eventHandlerMap; + await handlerMap + .get(nodeEvents.EventNodeConnectionManagerConnectionReverse) + ?.handler( + new nodeEvents.EventNodeConnectionManagerConnectionReverse({ + detail: eventDetail, + }), + ); + await handlerMap + .get(nodeEvents.EventNodeConnectionManagerConnectionForward) + ?.handler( + new nodeEvents.EventNodeConnectionManagerConnectionForward({ + detail: eventDetail, + }), + ); + let iterator = audit.getAuditEvents(['node', 'connection']); await expect(iterator.next().then((e) => e.value!.data)).resolves.toEqual( { ...auditEventData, type: 'reverse', }, ); + await expect(iterator.next().then((e) => e.value!.data)).resolves.toEqual( + { + ...auditEventData, + type: 'forward', + }, + ); iterator = audit.getAuditEvents(['node', 'connection', 'forward']); await expect(iterator.next().then((e) => e.value!.data)).resolves.toEqual( { diff --git a/tests/client/handlers/audit.test.ts b/tests/client/handlers/audit.test.ts new file mode 100644 index 000000000..b7adecdac --- /dev/null +++ b/tests/client/handlers/audit.test.ts @@ -0,0 +1,268 @@ +import type { ConnectionData, Host, Port, TLSConfig } from '@/network/types'; +import type { OverrideRPClientType } from '@/client/types'; +import fs from 'fs'; +import path from 'path'; +import os from 'os'; +import Logger, { formatting, LogLevel, StreamHandler } from '@matrixai/logger'; +import { DB } from '@matrixai/db'; +import { RPCClient } from '@matrixai/rpc'; +import { WebSocketClient } from '@matrixai/ws'; +import KeyRing from '@/keys/KeyRing'; +import ClientService from '@/client/ClientService'; +import { auditEventsGet } from '@/client/callers'; +import * as keysUtils from '@/keys/utils'; +import * as nodesUtils from '@/nodes/utils'; +import * as nodesEvents from '@/nodes/events'; +import * as networkUtils from '@/network/utils'; +import AuditEventsGet from '@/client/handlers/AuditEventsGet'; +import { Audit } from '@/audit'; +import AuditMetricGet from '@/client/handlers/AuditMetricGet'; +import auditMetricGet from '@/client/callers/auditMetricGet'; +import * as testNodesUtils from '../../nodes/utils'; +import * as testsUtils from '../../utils'; + +describe('auditEventGet', () => { + const logger = new Logger('auditEventsGet test', LogLevel.WARN, [ + new StreamHandler( + formatting.format`${formatting.level}:${formatting.keys}:${formatting.msg}`, + ), + ]); + const password = 'password'; + const localhost = '127.0.0.1'; + let audit: Audit; + let dataDir: string; + let db: DB; + let keyRing: KeyRing; + let clientService: ClientService; + let webSocketClient: WebSocketClient; + let rpcClient: OverrideRPClientType< + RPCClient<{ + auditEventsGet: typeof auditEventsGet; + }> + >; + let tlsConfig: TLSConfig; + + beforeEach(async () => { + dataDir = await fs.promises.mkdtemp( + path.join(os.tmpdir(), 'polykey-test-'), + ); + const keysPath = path.join(dataDir, 'keys'); + const dbPath = path.join(dataDir, 'db'); + db = await DB.createDB({ + dbPath, + logger, + }); + keyRing = await KeyRing.createKeyRing({ + password, + keysPath, + passwordOpsLimit: keysUtils.passwordOpsLimits.min, + passwordMemLimit: keysUtils.passwordMemLimits.min, + strictMemoryLock: false, + logger, + }); + tlsConfig = await testsUtils.createTLSConfig(keyRing.keyPair); + clientService = new ClientService({ + tlsConfig, + logger: logger.getChild(ClientService.name), + }); + audit = await Audit.createAudit({ + db, + nodeConnectionManager: new EventTarget() as any, + logger: logger.getChild(Audit.name), + }); + clientService = new ClientService({ + tlsConfig, + logger: logger.getChild(ClientService.name), + }); + await clientService.start({ + manifest: { + auditEventsGet: new AuditEventsGet({ + audit, + }), + }, + host: localhost, + }); + webSocketClient = await WebSocketClient.createWebSocketClient({ + config: { + verifyPeer: false, + }, + host: localhost, + logger: logger.getChild(WebSocketClient.name), + port: clientService.port, + }); + rpcClient = new RPCClient({ + manifest: { + auditEventsGet, + }, + streamFactory: () => webSocketClient.connection.newStream(), + toError: networkUtils.toError, + logger: logger.getChild(RPCClient.name), + }) as any; + }); + afterEach(async () => { + await clientService.stop({ force: true }); + await webSocketClient.destroy({ force: true }); + await keyRing.stop(); + await audit.stop(); + await fs.promises.rm(dataDir, { + force: true, + recursive: true, + }); + }); + test('cancels', async () => { + const callerInterface = await rpcClient.methods.auditEventsGet({ + path: [], + }); + const reader = callerInterface.getReader(); + await reader.cancel(); + await expect(reader.closed).toResolve(); + }); + test('gets connection events', async () => { + const nodeId = testNodesUtils.generateRandomNodeId(); + const eventDetail: ConnectionData = { + remoteHost: '::' as Host, + remoteNodeId: nodeId, + remotePort: 0 as Port, + }; + const auditEventData = { + ...eventDetail, + remoteNodeId: nodesUtils.encodeNodeId(eventDetail.remoteNodeId), + }; + // @ts-ignore: kidnap protected + const handlerMap = audit.eventHandlerMap; + await handlerMap + .get(nodesEvents.EventNodeConnectionManagerConnectionReverse) + ?.handler( + new nodesEvents.EventNodeConnectionManagerConnectionReverse({ + detail: eventDetail, + }), + ); + const callerInterface = await rpcClient.methods.auditEventsGet({ + path: ['node', 'connection', 'reverse'], + }); + const reader = callerInterface.getReader(); + await expect(reader.read().then((e) => e.value!.data)).resolves.toEqual({ + ...auditEventData, + type: 'reverse', + }); + }); +}); + +describe('auditMetricGet', () => { + const logger = new Logger('auditMetricGet test', LogLevel.WARN, [ + new StreamHandler( + formatting.format`${formatting.level}:${formatting.keys}:${formatting.msg}`, + ), + ]); + const password = 'password'; + const localhost = '127.0.0.1'; + let audit: Audit; + let dataDir: string; + let db: DB; + let keyRing: KeyRing; + let clientService: ClientService; + let webSocketClient: WebSocketClient; + let rpcClient: OverrideRPClientType< + RPCClient<{ + auditEventsGet: typeof auditEventsGet; + }> + >; + let tlsConfig: TLSConfig; + + beforeEach(async () => { + dataDir = await fs.promises.mkdtemp( + path.join(os.tmpdir(), 'polykey-test-'), + ); + const keysPath = path.join(dataDir, 'keys'); + const dbPath = path.join(dataDir, 'db'); + db = await DB.createDB({ + dbPath, + logger, + }); + keyRing = await KeyRing.createKeyRing({ + password, + keysPath, + passwordOpsLimit: keysUtils.passwordOpsLimits.min, + passwordMemLimit: keysUtils.passwordMemLimits.min, + strictMemoryLock: false, + logger, + }); + tlsConfig = await testsUtils.createTLSConfig(keyRing.keyPair); + clientService = new ClientService({ + tlsConfig, + logger: logger.getChild(ClientService.name), + }); + audit = await Audit.createAudit({ + db, + nodeConnectionManager: new EventTarget() as any, + logger: logger.getChild(Audit.name), + }); + clientService = new ClientService({ + tlsConfig, + logger: logger.getChild(ClientService.name), + }); + await clientService.start({ + manifest: { + auditMetricGet: new AuditMetricGet({ + audit, + }), + }, + host: localhost, + }); + webSocketClient = await WebSocketClient.createWebSocketClient({ + config: { + verifyPeer: false, + }, + host: localhost, + logger: logger.getChild(WebSocketClient.name), + port: clientService.port, + }); + rpcClient = new RPCClient({ + manifest: { + auditMetricGet, + }, + streamFactory: () => webSocketClient.connection.newStream(), + toError: networkUtils.toError, + logger: logger.getChild(RPCClient.name), + }) as any; + }); + afterEach(async () => { + await clientService.stop({ force: true }); + await webSocketClient.destroy({ force: true }); + await keyRing.stop(); + await audit.stop(); + await fs.promises.rm(dataDir, { + force: true, + recursive: true, + }); + }); + test('gets connection metrics', async () => { + const nodeId = testNodesUtils.generateRandomNodeId(); + const eventDetail: ConnectionData = { + remoteHost: '::' as Host, + remoteNodeId: nodeId, + remotePort: 0 as Port, + }; + // @ts-ignore: kidnap protected + const handlerMap = audit.eventHandlerMap; + await handlerMap + .get(nodesEvents.EventNodeConnectionManagerConnectionReverse) + ?.handler( + new nodesEvents.EventNodeConnectionManagerConnectionReverse({ + detail: eventDetail, + }), + ); + await expect( + rpcClient.methods + .auditMetricGet({ + path: ['node', 'connection', 'inbound'], + }) + .then((e) => e.data), + ).resolves.toEqual({ + total: 1, + averagePerMinute: 1, + averagePerHour: 1, + averagePerDay: 1, + }); + }); +}); From d9789722ca886ad1575e0a56e0facc7cced2534a Mon Sep 17 00:00:00 2001 From: Amy Yan Date: Mon, 20 Nov 2023 17:24:24 +1100 Subject: [PATCH 4/5] feat: added `force` paramater to `Audit.stop` [ci-skip] --- src/audit/Audit.ts | 116 ++++++++++++++++++++++++-------------- tests/audit/Audit.test.ts | 44 +++++++++++++++ 2 files changed, 119 insertions(+), 41 deletions(-) diff --git a/src/audit/Audit.ts b/src/audit/Audit.ts index 126580117..6d1ff0fc6 100644 --- a/src/audit/Audit.ts +++ b/src/audit/Audit.ts @@ -18,6 +18,7 @@ import { ready, } from '@matrixai/async-init/dist/CreateDestroyStartStop'; import * as sortableIdUtils from '@matrixai/id/dist/IdSortable'; +import { PromiseCancellable } from '@matrixai/async-cancellable'; import * as auditErrors from './errors'; import * as auditEvents from './events'; import * as auditUtils from './utils'; @@ -68,6 +69,7 @@ class Audit { handler: (evt: AbstractEvent) => Promise; } > = new Map(); + protected taskPromises: Set> = new Set(); protected auditDbPath: LevelPath = [this.constructor.name]; protected dbLastAuditEventIdPath: LevelPath = [ this.constructor.name, @@ -119,12 +121,18 @@ class Audit { this.logger.info(`Started ${this.constructor.name}`); } - public async stop(): Promise { + public async stop({ force = true }: { force?: boolean } = {}): Promise { this.logger.info(`Stopping ${this.constructor.name}`); for (const [eventConstructor, { target, handler }] of this .eventHandlerMap) { target.removeEventListener(eventConstructor.name, handler); } + if (force) { + for (const promise of this.taskPromises) { + promise.cancel(new auditErrors.ErrorAuditNotRunning()); + } + } + await Promise.all([...this.taskPromises]).catch(() => {}); this.logger.info(`Stopped ${this.constructor.name}`); } @@ -226,8 +234,9 @@ class Audit { seek?: AuditEventId; } = {}, ): AsyncGenerator> { - let blockP: Promise; - let resolveBlockP: () => void; + let blockP: PromiseCancellable | undefined; + let resolveBlockP: (() => void) | undefined; + let blockPSignal: AbortSignal | undefined; const handleEventAuditAuditEventSet = ( evt: auditEvents.EventAuditAuditEventSet, ) => { @@ -238,7 +247,7 @@ class Audit { } } if (isSupTopic) { - resolveBlockP(); + resolveBlockP?.(); } }; try { @@ -249,9 +258,14 @@ class Audit { handleEventAuditAuditEventSet, ); while (true) { - const blockProm = utils.promise(); - blockP = blockProm.p; - resolveBlockP = blockProm.resolveP; + if (blockP != null) { + this.taskPromises.delete(blockP); + } + blockP = new PromiseCancellable((resolveP, _, signal) => { + resolveBlockP = resolveP; + blockPSignal = signal; + }); + this.taskPromises.add(blockP); const iterator = this.getAuditEvents(topicPath, { seek: seekCursor, @@ -263,13 +277,14 @@ class Audit { // Skip the first element if this is not the first run if (firstRunComplete && i === 0) { i++; + blockPSignal?.throwIfAborted(); continue; } yield auditEvent; i++; + blockPSignal?.throwIfAborted(); } firstRunComplete = true; - await blockP; } } finally { @@ -277,6 +292,10 @@ class Audit { auditEvents.EventAuditAuditEventSet.name, handleEventAuditAuditEventSet, ); + resolveBlockP?.(); + if (blockP != null) { + this.taskPromises.delete(blockP); + } } } @@ -309,12 +328,41 @@ class Audit { return yield* getEvents(tran); }); } - if (topicPath.length === 0) { - const iterator = tran.iterator>( - this.auditEventDbPath, + + let resolveFinishedP: (() => void) | undefined; + let finishedPSignal: AbortSignal | undefined; + const finishedP = new PromiseCancellable((resolveP, _, signal) => { + resolveFinishedP = resolveP; + finishedPSignal = signal; + }); + this.taskPromises.add(finishedP); + try { + if (topicPath.length === 0) { + const iterator = tran.iterator>( + this.auditEventDbPath, + { + keys: false, + values: true, + valueAsBuffer: false, + reverse: order !== 'asc', + limit, + }, + ); + if (seek != null) { + iterator.seek(seek.toBuffer()); + } + for await (const [, auditEvent] of iterator) { + yield auditEvent; + finishedPSignal?.throwIfAborted(); + } + return; + } + const iterator = tran.iterator( + [...this.auditTopicDbPath, topicPath.join('.')], { - keys: false, - values: true, + keyAsBuffer: true, + keys: true, + values: false, valueAsBuffer: false, reverse: order !== 'asc', limit, @@ -323,35 +371,21 @@ class Audit { if (seek != null) { iterator.seek(seek.toBuffer()); } - for await (const [, auditEvent] of iterator) { - yield auditEvent; - } - return; - } - const iterator = tran.iterator( - [...this.auditTopicDbPath, topicPath.join('.')], - { - keyAsBuffer: true, - keys: true, - values: false, - valueAsBuffer: false, - reverse: order !== 'asc', - limit, - }, - ); - if (seek != null) { - iterator.seek(seek.toBuffer()); - } - for await (const [keyPath] of iterator) { - const key = keyPath.at(-1)! as Buffer; - const event = await tran.get>([ - ...this.auditEventDbPath, - key, - ]); - if (event != null) { - event.id = IdInternal.fromBuffer(key); - yield event; + for await (const [keyPath] of iterator) { + const key = keyPath.at(-1)! as Buffer; + const event = await tran.get>([ + ...this.auditEventDbPath, + key, + ]); + if (event != null) { + event.id = IdInternal.fromBuffer(key); + yield event; + finishedPSignal?.throwIfAborted(); + } } + } finally { + resolveFinishedP?.(); + this.taskPromises.delete(finishedP); } } diff --git a/tests/audit/Audit.test.ts b/tests/audit/Audit.test.ts index 5b677b7b4..b9030aca9 100644 --- a/tests/audit/Audit.test.ts +++ b/tests/audit/Audit.test.ts @@ -83,6 +83,50 @@ describe(Audit.name, () => { auditErrors.ErrorAuditNotRunning, ); }); + test('audit cleanup', async () => { + const nodeId = testNodesUtils.generateRandomNodeId(); + const audit = await Audit.createAudit({ + db, + nodeConnectionManager: new EventTarget() as any, + logger, + }); + // @ts-ignore: kidnap protected + const handlerMap = audit.eventHandlerMap; + const eventDetail: ConnectionData = { + remoteHost: '::' as Host, + remoteNodeId: nodeId, + remotePort: 0 as Port, + }; + await handlerMap + .get(nodeEvents.EventNodeConnectionManagerConnectionReverse) + ?.handler( + new nodeEvents.EventNodeConnectionManagerConnectionReverse({ + detail: eventDetail, + }), + ); + // Audit is able to stop when iterator is exhausted + let iterator = audit.getAuditEvents(['node', 'connection', 'reverse']); + await iterator.next(); + await iterator.next(); + await audit.stop({ force: false }); + await audit.start(); + // Audit is able to stop when iterator is not exhausted + await audit.start(); + iterator = audit.getAuditEvents(['node', 'connection', 'reverse']); + await iterator.next(); + await audit.stop({ force: true }); + await audit.start(); + iterator = audit.getAuditEventsLongRunning([ + 'node', + 'connection', + 'reverse', + ]); + await iterator.next(); + await audit.stop({ force: true }); + await expect(iterator.next()).rejects.toBeInstanceOf( + auditErrors.ErrorAuditNotRunning, + ); + }); test('event dispatch', async () => { const nodeId = testNodesUtils.generateRandomNodeId(); const audit = await Audit.createAudit({ From 92ce9c568426ab1c282e849fa0775618fe8c5fce Mon Sep 17 00:00:00 2001 From: Amy Yan Date: Mon, 20 Nov 2023 18:34:43 +1100 Subject: [PATCH 5/5] feat: consolidated `seek` and `seekEnd options for `Audit.getAuditEvents` fix: limits and seekEnd now woks correctly on `getAuditEventsLongRunning` --- src/PolykeyAgent.ts | 1 + src/audit/Audit.ts | 121 +++++++++++----- src/audit/utils.ts | 30 ++++ src/client/callers/auditEventsGet.ts | 4 +- src/client/callers/auditMetricGet.ts | 5 +- src/client/handlers/AuditEventsGet.ts | 56 ++++++-- src/client/handlers/AuditMetricGet.ts | 34 +++-- src/client/handlers/index.ts | 6 + src/ids/index.ts | 4 +- tests/audit/Audit.test.ts | 199 +++++++++++++++++++++----- tests/client/handlers/audit.test.ts | 35 ++++- 11 files changed, 388 insertions(+), 107 deletions(-) diff --git a/src/PolykeyAgent.ts b/src/PolykeyAgent.ts index f59c5af38..8b971bbe6 100644 --- a/src/PolykeyAgent.ts +++ b/src/PolykeyAgent.ts @@ -749,6 +749,7 @@ class PolykeyAgent { manifest: clientServerManifest({ polykeyAgent: this, acl: this.acl, + audit: this.audit, certManager: this.certManager, db: this.db, discovery: this.discovery, diff --git a/src/audit/Audit.ts b/src/audit/Audit.ts index 6d1ff0fc6..15ee5ee75 100644 --- a/src/audit/Audit.ts +++ b/src/audit/Audit.ts @@ -23,7 +23,6 @@ import * as auditErrors from './errors'; import * as auditEvents from './events'; import * as auditUtils from './utils'; import * as nodesEvents from '../nodes/events'; -import * as ids from '../ids'; import * as utils from '../utils'; interface Audit extends CreateDestroyStartStop {} @@ -230,8 +229,12 @@ class Audit { topicPath: T, { seek, + seekEnd, + limit, }: { - seek?: AuditEventId; + seek?: AuditEventId | Date | number; + seekEnd?: AuditEventId | Date | number; + limit?: number; } = {}, ): AsyncGenerator> { let blockP: PromiseCancellable | undefined; @@ -251,12 +254,15 @@ class Audit { } }; try { + let remainingLimit = limit; let seekCursor = seek; let firstRunComplete = false; this.addEventListener( auditEvents.EventAuditAuditEventSet.name, handleEventAuditAuditEventSet, ); + // There should be new entries every loop, + // if not that means that the limit has been reached based on seekEnd or limit while (true) { if (blockP != null) { this.taskPromises.delete(blockP); @@ -267,22 +273,45 @@ class Audit { }); this.taskPromises.add(blockP); + let limit: number | undefined; + if (remainingLimit != null) { + limit = !firstRunComplete ? remainingLimit : remainingLimit + 1; + } const iterator = this.getAuditEvents(topicPath, { seek: seekCursor, + seekEnd, order: 'asc', + limit, }); let i = 0; for await (const auditEvent of iterator) { seekCursor = auditEvent.id; - // Skip the first element if this is not the first run + // Skip the first element if this is not the first run as it is a duplicate if (firstRunComplete && i === 0) { i++; blockPSignal?.throwIfAborted(); continue; } yield auditEvent; - i++; blockPSignal?.throwIfAborted(); + i++; + } + // If only the first element was found after the initial loop, then we have reached the end + // if this is the first loop and there are no elements found, we have also reached the end + if ((firstRunComplete && i === 1) || i === 0) { + return; + } + if (remainingLimit != null) { + // The first element is a duplicate after the first run, so we ignore it. + if (!firstRunComplete) { + remainingLimit -= i; + } else { + remainingLimit -= i - 1; + } + // Return if the remaining limit is 0, we no longer need to yield any more events. + if (remainingLimit === 0) { + return; + } } firstRunComplete = true; await blockP; @@ -304,10 +333,12 @@ class Audit { topicPath: T, { seek, + seekEnd, order = 'asc', limit, }: { - seek?: AuditEventId; + seek?: AuditEventId | Date | number; + seekEnd?: AuditEventId | Date | number; order?: 'asc' | 'desc'; limit?: number; } = {}, @@ -319,6 +350,7 @@ class Audit { topicPath, { seek, + seekEnd, order, limit, }, @@ -329,6 +361,18 @@ class Audit { }); } + const seekAuditEventId = + seek != null + ? auditUtils.extractFromSeek(seek, (size) => new Uint8Array(size)) + .auditEventId + : undefined; + const seekEndAuditEventId = + seekEnd != null + ? auditUtils.extractFromSeek(seekEnd, (size) => + new Uint8Array(size).fill(0xff), + ).auditEventId + : undefined; + let resolveFinishedP: (() => void) | undefined; let finishedPSignal: AbortSignal | undefined; const finishedP = new PromiseCancellable((resolveP, _, signal) => { @@ -346,11 +390,10 @@ class Audit { valueAsBuffer: false, reverse: order !== 'asc', limit, + gte: seekAuditEventId?.toBuffer(), + lte: seekEndAuditEventId?.toBuffer(), }, ); - if (seek != null) { - iterator.seek(seek.toBuffer()); - } for await (const [, auditEvent] of iterator) { yield auditEvent; finishedPSignal?.throwIfAborted(); @@ -366,11 +409,10 @@ class Audit { valueAsBuffer: false, reverse: order !== 'asc', limit, + gte: seekAuditEventId?.toBuffer(), + lte: seekEndAuditEventId?.toBuffer(), }, ); - if (seek != null) { - iterator.seek(seek.toBuffer()); - } for await (const [keyPath] of iterator) { const key = keyPath.at(-1)! as Buffer; const event = await tran.get>([ @@ -392,7 +434,10 @@ class Audit { @ready(new auditErrors.ErrorAuditNotRunning()) public async getAuditMetric( metricPath: T, - options: { from?: Date | number; to?: Date | number } = {}, + options: { + seek?: AuditEventId | Date | number; + seekEnd?: AuditEventId | Date | number; + } = {}, tran?: DBTransaction, ): Promise> { if (tran == null) { @@ -401,27 +446,25 @@ class Audit { ); } - let fromEpoch = - options.from instanceof Date ? options.from.getTime() : options.from; - let toEpoch = - options.to instanceof Date ? options.to.getTime() : options.to; - - let fromIdBuffer: Buffer | undefined; - if (fromEpoch != null) { - fromIdBuffer = ids - .generateAuditEventIdFromEpoch( - fromEpoch, - (size) => new Uint8Array(size), - ) - .toBuffer(); + let seekIdBuffer: Buffer | undefined; + let seekTimestamp: number | undefined; + if (options.seek != null) { + const seekData = auditUtils.extractFromSeek( + options.seek, + (size) => new Uint8Array(size), + ); + seekIdBuffer = seekData.auditEventId.toBuffer(); + seekTimestamp = seekData.timestamp; } - let toIdBuffer: Buffer | undefined; - if (toEpoch != null) { - toIdBuffer = ids - .generateAuditEventIdFromEpoch(toEpoch, (size) => - new Uint8Array(size).fill(0xff), - ) - .toBuffer(); + + let seekEndIdBuffer: Buffer | undefined; + let seekEndTimestamp: number | undefined; + if (options.seekEnd != null) { + const seekEndData = auditUtils.extractFromSeek(options.seekEnd, (size) => + new Uint8Array(size).fill(0xff), + ); + seekEndIdBuffer = seekEndData.auditEventId.toBuffer(); + seekEndTimestamp = seekEndData.timestamp; } if (metricPath[0] === 'node') { @@ -447,25 +490,25 @@ class Audit { keyAsBuffer: true, keys: true, values: false, - gte: fromIdBuffer, - lte: toIdBuffer, + gte: seekIdBuffer, + lte: seekEndIdBuffer, }, )) { const key = keyPath.at(-1)! as Buffer; if (metric.data.total === 0) { - fromEpoch = sortableIdUtils.extractTs(key) * 1000; + seekTimestamp = sortableIdUtils.extractTs(key) * 1000; } else { lastKey = key; } metric.data.total += 1; } - if (fromEpoch != null) { + if (seekTimestamp != null) { if (lastKey != null) { - toEpoch = sortableIdUtils.extractTs(lastKey) * 1000; + seekEndTimestamp = sortableIdUtils.extractTs(lastKey) * 1000; } else { - toEpoch = Date.now(); + seekEndTimestamp = Date.now(); } - const timeframeTime = toEpoch - fromEpoch; + const timeframeTime = seekEndTimestamp - seekTimestamp; const timeframeMinutes = timeframeTime / 60_000; const timeframeHours = timeframeMinutes / 60; const timeframeDays = timeframeHours / 24; diff --git a/src/audit/utils.ts b/src/audit/utils.ts index cbd189a0a..2cd198062 100644 --- a/src/audit/utils.ts +++ b/src/audit/utils.ts @@ -3,15 +3,44 @@ import type { AuditEventNodeConnectionReverse, } from './types'; import type * as nodesEvents from '../nodes/events'; +import type { AuditEventId } from '../ids'; +import { IdInternal } from '@matrixai/id'; +import * as sortableIdUtils from '@matrixai/id/dist/IdSortable'; import * as nodesUtils from '../nodes/utils'; import { createAuditEventIdGenerator, encodeAuditEventId, decodeAuditEventId, + generateAuditEventIdFromTimestamp, } from '../ids'; // Events +function extractFromSeek( + seek: AuditEventId | number | Date, + randomSource?: (size: number) => Uint8Array, +): { + auditEventId: AuditEventId; + timestamp: number; +} { + let auditEventId: AuditEventId; + let timestamp: number | undefined; + if (seek instanceof IdInternal) { + auditEventId = seek; + timestamp = sortableIdUtils.extractTs(seek.toBuffer()) * 1000; + } else if (typeof seek === 'number') { + timestamp = seek; + auditEventId = generateAuditEventIdFromTimestamp(seek, randomSource); + } else { + timestamp = seek.getTime(); + auditEventId = generateAuditEventIdFromTimestamp(timestamp, randomSource); + } + return { + auditEventId, + timestamp, + }; +} + // Nodes const nodeConnectionReverseTopicPath = [ @@ -69,6 +98,7 @@ const nodeConnectionOutboundMetricPath = [ ] as const; export { + extractFromSeek, createAuditEventIdGenerator, encodeAuditEventId, decodeAuditEventId, diff --git a/src/client/callers/auditEventsGet.ts b/src/client/callers/auditEventsGet.ts index 574a9f853..cbca03e7a 100644 --- a/src/client/callers/auditEventsGet.ts +++ b/src/client/callers/auditEventsGet.ts @@ -15,9 +15,11 @@ type CallerTypes = HandlerTypes; type AuditEventsGetTypeOverride = ( input: ClientRPCRequestParams<{ - seek?: AuditEventIdEncoded; + seek?: AuditEventIdEncoded | number; + seekEnd?: AuditEventIdEncoded | number; order?: 'asc' | 'desc'; limit?: number; + awaitFutureEvents?: boolean; }> & { path: T; }, diff --git a/src/client/callers/auditMetricGet.ts b/src/client/callers/auditMetricGet.ts index 957209f13..2d041c14e 100644 --- a/src/client/callers/auditMetricGet.ts +++ b/src/client/callers/auditMetricGet.ts @@ -1,5 +1,6 @@ import type { HandlerTypes } from '@matrixai/rpc'; import type { ContextTimedInput } from '@matrixai/contexts'; +import type { AuditEventIdEncoded } from '../../ids'; import type { MetricPath, MetricPathToAuditMetric } from '../../audit/types'; import type { ClientRPCRequestParams, ClientRPCResponseResult } from '../types'; import type AuditMetricGet from '../handlers/AuditMetricGet'; @@ -9,8 +10,8 @@ type CallerTypes = HandlerTypes; type AuditMetricGetTypeOverride = ( input: ClientRPCRequestParams<{ - from?: number; - to?: number; + seek?: AuditEventIdEncoded | number; + seekEnd?: AuditEventIdEncoded | number; }> & { path: T; }, diff --git a/src/client/handlers/AuditEventsGet.ts b/src/client/handlers/AuditEventsGet.ts index 06f0bd6d1..9a561f529 100644 --- a/src/client/handlers/AuditEventsGet.ts +++ b/src/client/handlers/AuditEventsGet.ts @@ -7,7 +7,7 @@ import type { TopicSubPathToAuditEvent, } from '../../audit/types'; import type { Audit } from '../../audit'; -import type { AuditEventIdEncoded } from '../../ids'; +import type { AuditEventId, AuditEventIdEncoded } from '../../ids'; import { ServerHandler } from '@matrixai/rpc'; import * as auditUtils from '../../audit/utils'; @@ -17,17 +17,28 @@ class AuditEventsGet extends ServerHandler< }, ClientRPCRequestParams<{ path: TopicSubPath & Array; - seek?: AuditEventIdEncoded; + seek?: AuditEventIdEncoded | number; + seekEnd?: AuditEventIdEncoded | number; order?: 'asc' | 'desc'; limit?: number; + awaitFutureEvents?: boolean; }>, ClientRPCResponseResult > { public async *handle( - input: ClientRPCRequestParams<{ - seek?: AuditEventIdEncoded; + { + path, + seek, + seekEnd, + order = 'asc', + limit, + awaitFutureEvents = false, + }: ClientRPCRequestParams<{ + seek?: AuditEventIdEncoded | number; + seekEnd?: AuditEventIdEncoded | number; order?: 'asc' | 'desc'; limit?: number; + awaitFutureEvents?: boolean; }> & { path: T; }, @@ -40,14 +51,35 @@ class AuditEventsGet extends ServerHandler< > > { const { audit } = this.container; - const iterator = audit.getAuditEvents(input.path, { - seek: - input.seek != null - ? auditUtils.decodeAuditEventId(input.seek) - : undefined, - order: input.order, - limit: input.limit, - }); + let iterator: AsyncGenerator>; + let seek_: AuditEventId | number | undefined; + if (seek != null) { + seek_ = + typeof seek === 'string' ? auditUtils.decodeAuditEventId(seek) : seek; + } + let seekEnd_: AuditEventId | number | undefined; + if (seekEnd != null) { + seekEnd_ = + typeof seekEnd === 'string' + ? auditUtils.decodeAuditEventId(seekEnd) + : seekEnd; + } + // If the call is descending chronologically, or does not want to await future events, + // it should not await future events. + if (!awaitFutureEvents || order === 'desc') { + iterator = audit.getAuditEvents(path, { + seek: seek_, + seekEnd: seekEnd_, + order: order, + limit: limit, + }); + } else { + iterator = audit.getAuditEventsLongRunning(path, { + seek: seek_, + seekEnd: seekEnd_, + limit: limit, + }); + } ctx.signal.addEventListener('abort', async () => { await iterator.return(ctx.signal.reason); }); diff --git a/src/client/handlers/AuditMetricGet.ts b/src/client/handlers/AuditMetricGet.ts index 3e1595ca8..2160ac035 100644 --- a/src/client/handlers/AuditMetricGet.ts +++ b/src/client/handlers/AuditMetricGet.ts @@ -5,7 +5,9 @@ import type { MetricPathToAuditMetric, } from '../../audit/types'; import type { Audit } from '../../audit'; +import type { AuditEventId, AuditEventIdEncoded } from '../../ids'; import { UnaryHandler } from '@matrixai/rpc'; +import * as auditUtils from '../../audit/utils'; class AuditMetricGet extends UnaryHandler< { @@ -13,15 +15,19 @@ class AuditMetricGet extends UnaryHandler< }, ClientRPCRequestParams<{ path: MetricPath & Array; - from?: number; - to?: number; + seek?: AuditEventIdEncoded | number; + seekEnd?: AuditEventIdEncoded | number; }>, ClientRPCResponseResult > { public handle = async ( - input: ClientRPCRequestParams<{ - from?: number; - to?: number; + { + path, + seek, + seekEnd, + }: ClientRPCRequestParams<{ + seek?: AuditEventIdEncoded | number; + seekEnd?: AuditEventIdEncoded | number; }> & { path: T; }, @@ -30,9 +36,21 @@ class AuditMetricGet extends UnaryHandler< _ctx, ): Promise>> => { const { audit } = this.container; - return (await audit.getAuditMetric(input.path, { - from: input.from, - to: input.to, + let seek_: AuditEventId | number | undefined; + if (seek != null) { + seek_ = + typeof seek === 'string' ? auditUtils.decodeAuditEventId(seek) : seek; + } + let seekEnd_: AuditEventId | number | undefined; + if (seekEnd != null) { + seekEnd_ = + typeof seekEnd === 'string' + ? auditUtils.decodeAuditEventId(seekEnd) + : seekEnd; + } + return (await audit.getAuditMetric(path, { + seek: seek_, + seekEnd: seekEnd_, })) as any; }; } diff --git a/src/client/handlers/index.ts b/src/client/handlers/index.ts index 1de45d01f..4449f294c 100644 --- a/src/client/handlers/index.ts +++ b/src/client/handlers/index.ts @@ -1,6 +1,7 @@ import type { DB } from '@matrixai/db'; import type Logger from '@matrixai/logger'; import type ACL from '../../acl/ACL'; +import type Audit from '../../audit/Audit'; import type KeyRing from '../../keys/KeyRing'; import type CertManager from '../../keys/CertManager'; import type SessionManager from '../../sessions/SessionManager'; @@ -82,6 +83,8 @@ import VaultsSecretsNewDir from './VaultsSecretsNewDir'; import VaultsSecretsRename from './VaultsSecretsRename'; import VaultsSecretsStat from './VaultsSecretsStat'; import VaultsVersion from './VaultsVersion'; +import AuditEventsGet from './AuditEventsGet'; +import AuditMetricGet from './AuditMetricGet'; /** * Server manifest factory. @@ -96,6 +99,7 @@ const serverManifest = (container: { identitiesManager: IdentitiesManager; discovery: Discovery; acl: ACL; + audit: Audit; notificationsManager: NotificationsManager; nodeManager: NodeManager; nodeConnectionManager: NodeConnectionManager; @@ -109,6 +113,8 @@ const serverManifest = (container: { agentStatus: new AgentStatus(container), agentStop: new AgentStop(container), agentUnlock: new AgentUnlock(container), + auditEventsGet: new AuditEventsGet(container), + auditMetricGet: new AuditMetricGet(container), gestaltsActionsGetByIdentity: new GestaltsActionsGetByIdentity(container), gestaltsActionsGetByNode: new GestaltsActionsGetByNode(container), gestaltsActionsSetByIdentity: new GestaltsActionsSetByIdentity(container), diff --git a/src/ids/index.ts b/src/ids/index.ts index b0e592140..1238a6aa9 100644 --- a/src/ids/index.ts +++ b/src/ids/index.ts @@ -78,7 +78,7 @@ function decodeAuditEventId( * @param epoch * @param randomSource */ -function generateAuditEventIdFromEpoch( +function generateAuditEventIdFromTimestamp( epoch: number, randomSource: (size: number) => Uint8Array = keysUtilsRandom.getRandomBytes, ): AuditEventId { @@ -521,7 +521,7 @@ function decodeNotificationId( export { createPermIdGenerator, createAuditEventIdGenerator, - generateAuditEventIdFromEpoch, + generateAuditEventIdFromTimestamp, encodeAuditEventId, decodeAuditEventId, createNodeIdGenerator, diff --git a/tests/audit/Audit.test.ts b/tests/audit/Audit.test.ts index b9030aca9..e999742fc 100644 --- a/tests/audit/Audit.test.ts +++ b/tests/audit/Audit.test.ts @@ -127,45 +127,47 @@ describe(Audit.name, () => { auditErrors.ErrorAuditNotRunning, ); }); - test('event dispatch', async () => { - const nodeId = testNodesUtils.generateRandomNodeId(); - const audit = await Audit.createAudit({ - db, - nodeConnectionManager: mockNodeConnectionManager, - logger, - }); - const eventDetail: ConnectionData = { - remoteHost: '::' as Host, - remoteNodeId: nodeId, - remotePort: 0 as Port, - }; - const auditEventData = { - ...eventDetail, - remoteNodeId: nodeUtils.encodeNodeId(eventDetail.remoteNodeId), - }; + describe('AuditEvent', () => { + test('event dispatch', async () => { + const nodeId = testNodesUtils.generateRandomNodeId(); + const audit = await Audit.createAudit({ + db, + nodeConnectionManager: mockNodeConnectionManager, + logger, + }); + const eventDetail: ConnectionData = { + remoteHost: '::' as Host, + remoteNodeId: nodeId, + remotePort: 0 as Port, + }; + const auditEventData = { + ...eventDetail, + remoteNodeId: nodeUtils.encodeNodeId(eventDetail.remoteNodeId), + }; - const { p: eventP, resolveP: resolveEventP } = utils.promise(); - audit.addEventListener(auditEvents.EventAuditAuditEventSet.name, () => - resolveEventP(), - ); + const { p: eventP, resolveP: resolveEventP } = utils.promise(); + audit.addEventListener(auditEvents.EventAuditAuditEventSet.name, () => + resolveEventP(), + ); - // @ts-ignore: kidnap protected - const handlerMap = audit.eventHandlerMap; - void handlerMap - .get(nodeEvents.EventNodeConnectionManagerConnectionReverse) - ?.handler( - new nodeEvents.EventNodeConnectionManagerConnectionReverse({ - detail: eventDetail, - }), + // @ts-ignore: kidnap protected + const handlerMap = audit.eventHandlerMap; + void handlerMap + .get(nodeEvents.EventNodeConnectionManagerConnectionReverse) + ?.handler( + new nodeEvents.EventNodeConnectionManagerConnectionReverse({ + detail: eventDetail, + }), + ); + await eventP; + const iterator = audit.getAuditEvents(['node']); + await expect(iterator.next().then((e) => e.value!.data)).resolves.toEqual( + { + ...auditEventData, + type: 'reverse', + }, ); - await eventP; - const iterator = audit.getAuditEvents(['node']); - await expect(iterator.next().then((e) => e.value!.data)).resolves.toEqual({ - ...auditEventData, - type: 'reverse', }); - }); - describe('AuditEvent', () => { test('order', async () => { const nodeId = testNodesUtils.generateRandomNodeId(); const audit = await Audit.createAudit({ @@ -370,6 +372,125 @@ describe(Audit.name, () => { ); await audit.stop(); }); + test('long running with limit', async () => { + const nodeId = testNodesUtils.generateRandomNodeId(); + const audit = await Audit.createAudit({ + db, + nodeConnectionManager: mockNodeConnectionManager, + logger, + }); + const eventDetail: ConnectionData = { + remoteHost: '::' as Host, + remoteNodeId: nodeId, + remotePort: 0 as Port, + }; + const auditEventData = { + ...eventDetail, + remoteNodeId: nodeUtils.encodeNodeId(eventDetail.remoteNodeId), + }; + // @ts-ignore: kidnap protected + const handlerMap = audit.eventHandlerMap; + let iterator = audit.getAuditEventsLongRunning(['node'], { limit: 1 }); + await handlerMap + .get(nodeEvents.EventNodeConnectionManagerConnectionReverse) + ?.handler( + new nodeEvents.EventNodeConnectionManagerConnectionReverse({ + detail: eventDetail, + }), + ); + await handlerMap + .get(nodeEvents.EventNodeConnectionManagerConnectionForward) + ?.handler( + new nodeEvents.EventNodeConnectionManagerConnectionForward({ + detail: eventDetail, + }), + ); + await expect(iterator.next().then((e) => e.value!.data)).resolves.toEqual( + { + ...auditEventData, + type: 'reverse', + }, + ); + await expect(iterator.next().then((e) => e.done)).resolves.toBe(true); + iterator = audit.getAuditEventsLongRunning(['node'], { limit: 3 }); + await expect(iterator.next().then((e) => e.value!.data)).resolves.toEqual( + { + ...auditEventData, + type: 'reverse', + }, + ); + await expect(iterator.next().then((e) => e.value!.data)).resolves.toEqual( + { + ...auditEventData, + type: 'forward', + }, + ); + await handlerMap + .get(nodeEvents.EventNodeConnectionManagerConnectionReverse) + ?.handler( + new nodeEvents.EventNodeConnectionManagerConnectionReverse({ + detail: eventDetail, + }), + ); + await expect(iterator.next().then((e) => e.value!.data)).resolves.toEqual( + { + ...auditEventData, + type: 'reverse', + }, + ); + await expect(iterator.next().then((e) => e.done)).resolves.toBe(true); + await audit.stop(); + }); + test('long running with seekEnd', async () => { + const nodeId = testNodesUtils.generateRandomNodeId(); + const audit = await Audit.createAudit({ + db, + nodeConnectionManager: mockNodeConnectionManager, + logger, + }); + const eventDetail: ConnectionData = { + remoteHost: '::' as Host, + remoteNodeId: nodeId, + remotePort: 0 as Port, + }; + const auditEventData = { + ...eventDetail, + remoteNodeId: nodeUtils.encodeNodeId(eventDetail.remoteNodeId), + }; + const date = Date.now(); + // @ts-ignore: kidnap protected + await audit.setAuditEvent(['node', 'connection', 'forward'], { + id: ids.generateAuditEventIdFromTimestamp(date), + data: { + ...auditEventData, + type: 'forward', + }, + }); + let iterator = audit.getAuditEventsLongRunning(['node'], { + seekEnd: date - 10, + }); + await expect(iterator.next().then((e) => e.done)).resolves.toBe(true); + iterator = audit.getAuditEventsLongRunning(['node'], { + seekEnd: date, + }); + await expect(iterator.next().then((e) => e.value!.data)).resolves.toEqual( + { + ...auditEventData, + type: 'forward', + }, + ); + // Need to do this to bump the promise + // @ts-ignore: kidnap protected + await audit.setAuditEvent(['node', 'connection', 'forward'], { + id: ids.generateAuditEventIdFromTimestamp(date + 10), + data: { + ...auditEventData, + type: 'forward', + }, + }); + await expect(iterator.next().then((e) => e.done)).resolves.toBe(true); + await audit.stop(); + }); test('node connection topic', async () => { const nodeId = testNodesUtils.generateRandomNodeId(); const audit = await Audit.createAudit({ @@ -460,7 +581,7 @@ describe(Audit.name, () => { for (const iterDate of dates) { // @ts-ignore: kidnap protected await audit.setAuditEvent(['node', 'connection', 'reverse'], { - id: ids.generateAuditEventIdFromEpoch(iterDate), + id: ids.generateAuditEventIdFromTimestamp(iterDate), data: { ...auditEventData, type: 'reverse', @@ -470,7 +591,7 @@ describe(Audit.name, () => { for (const iterDate of dates) { // @ts-ignore: kidnap protected await audit.setAuditEvent(['node', 'connection', 'forward'], { - id: ids.generateAuditEventIdFromEpoch(iterDate), + id: ids.generateAuditEventIdFromTimestamp(iterDate), data: { ...auditEventData, type: 'forward', @@ -510,7 +631,7 @@ describe(Audit.name, () => { await expect( audit .getAuditMetric(['node', 'connection', 'inbound'], { - from: date1DayAgo, + seek: date1DayAgo, }) .then((e) => e.data), ).resolves.toEqual({ @@ -523,7 +644,7 @@ describe(Audit.name, () => { await expect( audit .getAuditMetric(['node', 'connection', 'inbound'], { - to: date1MinuteAgo, + seekEnd: date1MinuteAgo, }) .then((e) => e.data), ).resolves.toEqual({ diff --git a/tests/client/handlers/audit.test.ts b/tests/client/handlers/audit.test.ts index b7adecdac..ca5d01a64 100644 --- a/tests/client/handlers/audit.test.ts +++ b/tests/client/handlers/audit.test.ts @@ -110,10 +110,17 @@ describe('auditEventGet', () => { }); }); test('cancels', async () => { - const callerInterface = await rpcClient.methods.auditEventsGet({ + let callerInterface = await rpcClient.methods.auditEventsGet({ path: [], }); - const reader = callerInterface.getReader(); + let reader = callerInterface.getReader(); + await reader.cancel(); + await expect(reader.closed).toResolve(); + callerInterface = await rpcClient.methods.auditEventsGet({ + path: [], + awaitFutureEvents: true, + }); + reader = callerInterface.getReader(); await reader.cancel(); await expect(reader.closed).toResolve(); }); @@ -137,14 +144,34 @@ describe('auditEventGet', () => { detail: eventDetail, }), ); - const callerInterface = await rpcClient.methods.auditEventsGet({ + let callerInterface: any = await rpcClient.methods.auditEventsGet({ path: ['node', 'connection', 'reverse'], }); - const reader = callerInterface.getReader(); + let reader = callerInterface.getReader(); + await expect(reader.read().then((e) => e.value!.data)).resolves.toEqual({ + ...auditEventData, + type: 'reverse', + }); + callerInterface = await rpcClient.methods.auditEventsGet({ + path: ['node', 'connection'], + awaitFutureEvents: true, + }); + reader = callerInterface.getReader(); await expect(reader.read().then((e) => e.value!.data)).resolves.toEqual({ ...auditEventData, type: 'reverse', }); + await handlerMap + .get(nodesEvents.EventNodeConnectionManagerConnectionForward) + ?.handler( + new nodesEvents.EventNodeConnectionManagerConnectionForward({ + detail: eventDetail, + }), + ); + await expect(reader.read().then((e) => e.value!.data)).resolves.toEqual({ + ...auditEventData, + type: 'forward', + }); }); });