From a4b1204ee3f6bc9956e234017e0572a896e23f93 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Mon, 15 Sep 2025 18:34:39 +0100 Subject: [PATCH 1/2] new collection event system --- .changeset/heavy-parts-grow.md | 5 + packages/db/src/collection-events.ts | 127 ++++++++++++++++++++ packages/db/src/collection.ts | 64 ++++++++++ packages/db/tests/collection-events.test.ts | 124 +++++++++++++++++++ 4 files changed, 320 insertions(+) create mode 100644 .changeset/heavy-parts-grow.md create mode 100644 packages/db/src/collection-events.ts create mode 100644 packages/db/tests/collection-events.test.ts diff --git a/.changeset/heavy-parts-grow.md b/.changeset/heavy-parts-grow.md new file mode 100644 index 000000000..f802ec21c --- /dev/null +++ b/.changeset/heavy-parts-grow.md @@ -0,0 +1,5 @@ +--- +"@tanstack/db": patch +--- + +Added a new events system for subscribing to status changes and other internal events. diff --git a/packages/db/src/collection-events.ts b/packages/db/src/collection-events.ts new file mode 100644 index 000000000..fd1e9c221 --- /dev/null +++ b/packages/db/src/collection-events.ts @@ -0,0 +1,127 @@ +import type { Collection } from "./collection" +import type { CollectionStatus } from "./types" + +/** + * Event emitted when the collection status changes + */ +export interface CollectionStatusChangeEvent { + type: `status:change` + collection: Collection + previousStatus: CollectionStatus + status: CollectionStatus +} + +/** + * Event emitted when the collection status changes to a specific status + */ +export interface CollectionStatusEvent { + type: `status:${T}` + collection: Collection + previousStatus: CollectionStatus + status: T +} + +/** + * Event emitted when the number of subscribers to the collection changes + */ +export interface CollectionSubscribersChangeEvent { + type: `subscribers:change` + collection: Collection + previousSubscriberCount: number + subscriberCount: number +} + +export type AllCollectionEvents = { + "status:change": CollectionStatusChangeEvent + "subscribers:change": CollectionSubscribersChangeEvent +} & { + [K in CollectionStatus as `status:${K}`]: CollectionStatusEvent +} + +export type CollectionEventHandler = ( + event: AllCollectionEvents[T] +) => void + +export class CollectionEvents { + private collection: Collection + private listeners = new Map< + keyof AllCollectionEvents, + Set> + >() + + constructor(collection: Collection) { + this.collection = collection + } + + on( + event: T, + callback: CollectionEventHandler + ) { + if (!this.listeners.has(event)) { + this.listeners.set(event, new Set()) + } + this.listeners.get(event)!.add(callback) + + return () => { + this.listeners.get(event)!.delete(callback) + } + } + + once( + event: T, + callback: CollectionEventHandler + ) { + const unsubscribe = this.on(event, (eventPayload) => { + callback(eventPayload) + unsubscribe() + }) + return unsubscribe + } + + off( + event: T, + callback: CollectionEventHandler + ) { + this.listeners.get(event)?.delete(callback) + } + + emit( + event: T, + eventPayload: AllCollectionEvents[T] + ) { + this.listeners.get(event)?.forEach((listener) => listener(eventPayload)) + } + + emitStatusChange( + status: T, + previousStatus: CollectionStatus + ) { + this.emit(`status:change`, { + type: `status:change`, + collection: this.collection, + previousStatus, + status, + }) + + // Emit specific status event using type assertion + const eventKey: `status:${T}` = `status:${status}` + this.emit(eventKey, { + type: eventKey, + collection: this.collection, + previousStatus, + status, + } as AllCollectionEvents[`status:${T}`]) + } + + emitSubscribersChange( + subscriberCount: number, + previousSubscriberCount: number + ) { + this.emit(`subscribers:change`, { + type: `subscribers:change`, + collection: this.collection, + previousSubscriberCount, + subscriberCount, + }) + } +} diff --git a/packages/db/src/collection.ts b/packages/db/src/collection.ts index dc4cc9f2a..0dbf0cbdf 100644 --- a/packages/db/src/collection.ts +++ b/packages/db/src/collection.ts @@ -38,6 +38,11 @@ import { UpdateKeyNotFoundError, } from "./errors" import { createFilteredCallback, currentStateAsChanges } from "./change-events" +import { CollectionEvents } from "./collection-events.js" +import type { + AllCollectionEvents, + CollectionEventHandler, +} from "./collection-events.js" import type { Transaction } from "./transactions" import type { StandardSchemaV1 } from "@standard-schema/spec" import type { SingleRowRefProxy } from "./query/builder/ref-proxy" @@ -267,6 +272,9 @@ export class CollectionImpl< private preloadPromise: Promise | null = null private syncCleanupFn: (() => void) | null = null + // Event system + private events: CollectionEvents + /** * Register a callback to be executed when the collection first becomes ready * Useful for preloading collections @@ -345,6 +353,13 @@ export class CollectionImpl< return this._status } + /** + * Get the number of subscribers to the collection + */ + public get subscriberCount(): number { + return this.activeSubscribersCount + } + /** * Validates that the collection is in a usable state for data operations * @private @@ -395,6 +410,7 @@ export class CollectionImpl< */ private setStatus(newStatus: CollectionStatus): void { this.validateStatusTransition(this._status, newStatus) + const previousStatus = this._status this._status = newStatus // Resolve indexes when collection becomes ready @@ -404,6 +420,9 @@ export class CollectionImpl< console.warn(`Failed to resolve indexes:`, error) }) } + + // Emit event + this.events.emitStatusChange(newStatus, previousStatus) } /** @@ -445,6 +464,9 @@ export class CollectionImpl< this.syncedData = new Map() } + // Set up event system + this.events = new CollectionEvents(this) + // Only start sync immediately if explicitly enabled if (config.startSync === true) { this.startSync() @@ -707,6 +729,7 @@ export class CollectionImpl< * Increment the active subscribers count and start sync if needed */ private addSubscriber(): void { + const previousSubscriberCount = this.activeSubscribersCount this.activeSubscribersCount++ this.cancelGCTimer() @@ -714,12 +737,18 @@ export class CollectionImpl< if (this._status === `cleaned-up` || this._status === `idle`) { this.startSync() } + + this.events.emitSubscribersChange( + this.activeSubscribersCount, + previousSubscriberCount + ) } /** * Decrement the active subscribers count and start GC timer if needed */ private removeSubscriber(): void { + const previousSubscriberCount = this.activeSubscribersCount this.activeSubscribersCount-- if (this.activeSubscribersCount === 0) { @@ -727,6 +756,11 @@ export class CollectionImpl< } else if (this.activeSubscribersCount < 0) { throw new NegativeActiveSubscribersError() } + + this.events.emitSubscribersChange( + this.activeSubscribersCount, + previousSubscriberCount + ) } /** @@ -2493,4 +2527,34 @@ export class CollectionImpl< this.recomputeOptimisticState(false) } + + /** + * Subscribe to a collection event + */ + public on( + event: T, + callback: CollectionEventHandler + ) { + return this.events.on(event, callback) + } + + /** + * Subscribe to a collection event once + */ + public once( + event: T, + callback: CollectionEventHandler + ) { + return this.events.once(event, callback) + } + + /** + * Unsubscribe from a collection event + */ + public off( + event: T, + callback: CollectionEventHandler + ) { + this.events.off(event, callback) + } } diff --git a/packages/db/tests/collection-events.test.ts b/packages/db/tests/collection-events.test.ts new file mode 100644 index 000000000..e3b1bec17 --- /dev/null +++ b/packages/db/tests/collection-events.test.ts @@ -0,0 +1,124 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest" +import { createCollection } from "../src/collection.js" + +describe(`Collection Events System`, () => { + let collection: ReturnType + let mockSync: ReturnType + + beforeEach(() => { + mockSync = vi.fn() + collection = createCollection({ + id: `test-collection`, + getKey: (item: any) => item.id, + sync: { + sync: mockSync, + }, + }) + }) + + afterEach(() => { + vi.clearAllMocks() + }) + + describe(`Status Change Events`, () => { + it(`should emit status:change and specific status events`, () => { + const statusChangeListener = vi.fn() + const statusLoadingListener = vi.fn() + + collection.on(`status:change`, statusChangeListener) + collection.on(`status:loading`, statusLoadingListener) + + collection.startSyncImmediate() + + expect(statusChangeListener).toHaveBeenCalledWith({ + type: `status:change`, + collection, + previousStatus: `idle`, + status: `loading`, + }) + + expect(statusLoadingListener).toHaveBeenCalledWith({ + type: `status:loading`, + collection, + previousStatus: `idle`, + status: `loading`, + }) + }) + }) + + describe(`Subscriber Count Change Events`, () => { + it(`should emit subscribers:change when subscriber count changes`, () => { + const subscribersChangeListener = vi.fn() + collection.on(`subscribers:change`, subscribersChangeListener) + + const unsubscribe = collection.subscribeChanges(() => {}) + + expect(subscribersChangeListener).toHaveBeenCalledWith({ + type: `subscribers:change`, + collection, + previousSubscriberCount: 0, + subscriberCount: 1, + }) + + unsubscribe() + }) + }) + + describe(`Event Subscription Management`, () => { + it(`should support on(), once(), and off() methods`, () => { + const listener = vi.fn() + + // Test on() returns unsubscribe function + const unsubscribe = collection.on(`status:change`, listener) + expect(typeof unsubscribe).toBe(`function`) + + // Test once() auto-unsubscribes after first call + const onceListener = vi.fn() + collection.once(`status:change`, onceListener) + + collection.startSyncImmediate() + expect(listener).toHaveBeenCalledTimes(1) + expect(onceListener).toHaveBeenCalledTimes(1) + + // Second call should not trigger once listener + collection.startSyncImmediate() + expect(onceListener).toHaveBeenCalledTimes(1) + + // Test off() removes listener + collection.off(`status:change`, listener) + collection.startSyncImmediate() + expect(listener).toHaveBeenCalledTimes(1) // Still only called once + + unsubscribe() + }) + }) + + describe(`Event Structure`, () => { + it(`should emit events with correct structure`, () => { + const statusListener = vi.fn() + const subscribersListener = vi.fn() + + collection.on(`status:change`, statusListener) + collection.on(`subscribers:change`, subscribersListener) + + collection.startSyncImmediate() + const unsubscribe = collection.subscribeChanges(() => {}) + + expect(statusListener.mock.calls[0]?.[0]).toMatchObject({ + type: `status:change`, + collection, + previousStatus: expect.any(String), + status: expect.any(String), + }) + + expect(subscribersListener.mock.calls[0]?.[0]).toMatchObject({ + type: `subscribers:change`, + collection, + previousSubscriberCount: expect.any(Number), + subscriberCount: expect.any(Number), + }) + + unsubscribe() + }) + }) +}) From 144165e3f0530ebcf679553db0105e13546c0e78 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Thu, 18 Sep 2025 08:57:20 +0100 Subject: [PATCH 2/2] address review --- packages/db/src/collection-events.ts | 44 ++++++++++++++++++++- packages/db/src/collection.ts | 12 ++++++ packages/db/tests/collection-events.test.ts | 33 ++++++++++++++++ 3 files changed, 88 insertions(+), 1 deletion(-) diff --git a/packages/db/src/collection-events.ts b/packages/db/src/collection-events.ts index fd1e9c221..e0f4ddcec 100644 --- a/packages/db/src/collection-events.ts +++ b/packages/db/src/collection-events.ts @@ -38,6 +38,11 @@ export type AllCollectionEvents = { [K in CollectionStatus as `status:${K}`]: CollectionStatusEvent } +export type CollectionEvent = + | AllCollectionEvents[keyof AllCollectionEvents] + | CollectionStatusChangeEvent + | CollectionSubscribersChangeEvent + export type CollectionEventHandler = ( event: AllCollectionEvents[T] ) => void @@ -85,11 +90,44 @@ export class CollectionEvents { this.listeners.get(event)?.delete(callback) } + waitFor( + event: T, + timeout?: number + ): Promise { + return new Promise((resolve, reject) => { + let timeoutId: NodeJS.Timeout | undefined + const unsubscribe = this.on(event, (eventPayload) => { + if (timeoutId) { + clearTimeout(timeoutId) + timeoutId = undefined + } + resolve(eventPayload) + unsubscribe() + }) + if (timeout) { + timeoutId = setTimeout(() => { + timeoutId = undefined + unsubscribe() + reject(new Error(`Timeout waiting for event ${event}`)) + }, timeout) + } + }) + } + emit( event: T, eventPayload: AllCollectionEvents[T] ) { - this.listeners.get(event)?.forEach((listener) => listener(eventPayload)) + this.listeners.get(event)?.forEach((listener) => { + try { + listener(eventPayload) + } catch (error) { + // Re-throw in a microtask to surface the error + queueMicrotask(() => { + throw error + }) + } + }) } emitStatusChange( @@ -124,4 +162,8 @@ export class CollectionEvents { subscriberCount, }) } + + cleanup() { + this.listeners.clear() + } } diff --git a/packages/db/src/collection.ts b/packages/db/src/collection.ts index a872a3632..837eea0b7 100644 --- a/packages/db/src/collection.ts +++ b/packages/db/src/collection.ts @@ -685,6 +685,8 @@ export class CollectionImpl< this.batchedEvents = [] this.shouldBatchEvents = false + this.events.cleanup() + // Update status this.setStatus(`cleaned-up`) @@ -2549,4 +2551,14 @@ export class CollectionImpl< ) { this.events.off(event, callback) } + + /** + * Wait for a collection event + */ + public waitFor( + event: T, + timeout?: number + ) { + return this.events.waitFor(event, timeout) + } } diff --git a/packages/db/tests/collection-events.test.ts b/packages/db/tests/collection-events.test.ts index e3b1bec17..d49caef8c 100644 --- a/packages/db/tests/collection-events.test.ts +++ b/packages/db/tests/collection-events.test.ts @@ -121,4 +121,37 @@ describe(`Collection Events System`, () => { unsubscribe() }) }) + + describe(`waitFor Method`, () => { + it(`should resolve when event is emitted without timeout`, async () => { + const waitPromise = collection.waitFor(`status:change`) + + // Trigger the event + collection.startSyncImmediate() + + const event = await waitPromise + + expect(event).toMatchObject({ + type: `status:change`, + collection, + previousStatus: `idle`, + status: `loading`, + }) + }) + + it(`should reject when timeout is reached`, async () => { + vi.useFakeTimers() + + const waitPromise = collection.waitFor(`status:change`, 1000) + + // Fast-forward time beyond the timeout + vi.advanceTimersByTime(1001) + + await expect(waitPromise).rejects.toThrow( + `Timeout waiting for event status:change` + ) + + vi.useRealTimers() + }) + }) })