Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/heavy-parts-grow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@tanstack/db": patch
---

Added a new events system for subscribing to status changes and other internal events.
169 changes: 169 additions & 0 deletions packages/db/src/collection-events.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
import type { Collection } from "./collection"
import type { CollectionStatus } from "./types"

/**
* Event emitted when the collection status changes
*/
export interface CollectionStatusChangeEvent {
type: `status:change`
collection: Collection
previousStatus: CollectionStatus
status: CollectionStatus
}

/**
* Event emitted when the collection status changes to a specific status
*/
export interface CollectionStatusEvent<T extends CollectionStatus> {
type: `status:${T}`
collection: Collection
previousStatus: CollectionStatus
status: T
}

/**
* Event emitted when the number of subscribers to the collection changes
*/
export interface CollectionSubscribersChangeEvent {
type: `subscribers:change`
collection: Collection
previousSubscriberCount: number
subscriberCount: number
}

export type AllCollectionEvents = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Export a union: export type CollectionEvent = AllCollectionEvents[keyof AllCollectionEvents] for switch(event.type) ergonomics.

"status:change": CollectionStatusChangeEvent
"subscribers:change": CollectionSubscribersChangeEvent
} & {
[K in CollectionStatus as `status:${K}`]: CollectionStatusEvent<K>
}

export type CollectionEvent =
| AllCollectionEvents[keyof AllCollectionEvents]
| CollectionStatusChangeEvent
| CollectionSubscribersChangeEvent

export type CollectionEventHandler<T extends keyof AllCollectionEvents> = (
event: AllCollectionEvents[T]
) => void

export class CollectionEvents {
private collection: Collection<any, any, any, any, any>
private listeners = new Map<
keyof AllCollectionEvents,
Set<CollectionEventHandler<any>>
>()

constructor(collection: Collection<any, any, any, any, any>) {
this.collection = collection
}

on<T extends keyof AllCollectionEvents>(
event: T,
callback: CollectionEventHandler<T>
) {
if (!this.listeners.has(event)) {
this.listeners.set(event, new Set())
}
this.listeners.get(event)!.add(callback)

return () => {
this.listeners.get(event)!.delete(callback)
}
}

once<T extends keyof AllCollectionEvents>(
event: T,
callback: CollectionEventHandler<T>
) {
const unsubscribe = this.on(event, (eventPayload) => {
callback(eventPayload)
unsubscribe()
})
return unsubscribe
}

off<T extends keyof AllCollectionEvents>(
event: T,
callback: CollectionEventHandler<T>
) {
this.listeners.get(event)?.delete(callback)
}

waitFor<T extends keyof AllCollectionEvents>(
event: T,
timeout?: number
): Promise<AllCollectionEvents[T]> {
return new Promise((resolve, reject) => {
let timeoutId: NodeJS.Timeout | undefined
const unsubscribe = this.on(event, (eventPayload) => {
if (timeoutId) {
clearTimeout(timeoutId)
timeoutId = undefined
}
resolve(eventPayload)
unsubscribe()
})
if (timeout) {
timeoutId = setTimeout(() => {
timeoutId = undefined
unsubscribe()
reject(new Error(`Timeout waiting for event ${event}`))
}, timeout)
}
})
}

emit<T extends keyof AllCollectionEvents>(
event: T,
eventPayload: AllCollectionEvents[T]
) {
this.listeners.get(event)?.forEach((listener) => {
try {
listener(eventPayload)
} catch (error) {
// Re-throw in a microtask to surface the error
queueMicrotask(() => {
throw error
})
}
})
}

emitStatusChange<T extends CollectionStatus>(
status: T,
previousStatus: CollectionStatus
) {
this.emit(`status:change`, {
type: `status:change`,
collection: this.collection,
previousStatus,
status,
})

// Emit specific status event using type assertion
const eventKey: `status:${T}` = `status:${status}`
this.emit(eventKey, {
type: eventKey,
collection: this.collection,
previousStatus,
status,
} as AllCollectionEvents[`status:${T}`])
}

emitSubscribersChange(
subscriberCount: number,
previousSubscriberCount: number
) {
this.emit(`subscribers:change`, {
type: `subscribers:change`,
collection: this.collection,
previousSubscriberCount,
subscriberCount,
})
}

cleanup() {
this.listeners.clear()
}
}
76 changes: 76 additions & 0 deletions packages/db/src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ import {
UpdateKeyNotFoundError,
} from "./errors"
import { createFilteredCallback, currentStateAsChanges } from "./change-events"
import { CollectionEvents } from "./collection-events.js"
import type {
AllCollectionEvents,
CollectionEventHandler,
} from "./collection-events.js"
import type { Transaction } from "./transactions"
import type { StandardSchemaV1 } from "@standard-schema/spec"
import type { SingleRowRefProxy } from "./query/builder/ref-proxy"
Expand Down Expand Up @@ -267,6 +272,9 @@ export class CollectionImpl<
private preloadPromise: Promise<void> | null = null
private syncCleanupFn: (() => void) | null = null

// Event system
private events: CollectionEvents

/**
* Register a callback to be executed when the collection first becomes ready
* Useful for preloading collections
Expand Down Expand Up @@ -345,6 +353,13 @@ export class CollectionImpl<
return this._status
}

/**
* Get the number of subscribers to the collection
*/
public get subscriberCount(): number {
return this.activeSubscribersCount
}

/**
* Validates that the collection is in a usable state for data operations
* @private
Expand Down Expand Up @@ -395,6 +410,7 @@ export class CollectionImpl<
*/
private setStatus(newStatus: CollectionStatus): void {
this.validateStatusTransition(this._status, newStatus)
const previousStatus = this._status
this._status = newStatus

// Resolve indexes when collection becomes ready
Expand All @@ -404,6 +420,9 @@ export class CollectionImpl<
console.warn(`Failed to resolve indexes:`, error)
})
}

// Emit event
this.events.emitStatusChange(newStatus, previousStatus)
}

/**
Expand Down Expand Up @@ -445,6 +464,9 @@ export class CollectionImpl<
this.syncedData = new Map<TKey, TOutput>()
}

// Set up event system
this.events = new CollectionEvents(this)

// Only start sync immediately if explicitly enabled
if (config.startSync === true) {
this.startSync()
Expand Down Expand Up @@ -663,6 +685,8 @@ export class CollectionImpl<
this.batchedEvents = []
this.shouldBatchEvents = false

this.events.cleanup()

// Update status
this.setStatus(`cleaned-up`)

Expand Down Expand Up @@ -707,26 +731,38 @@ export class CollectionImpl<
* Increment the active subscribers count and start sync if needed
*/
private addSubscriber(): void {
const previousSubscriberCount = this.activeSubscribersCount
this.activeSubscribersCount++
this.cancelGCTimer()

// Start sync if collection was cleaned up
if (this._status === `cleaned-up` || this._status === `idle`) {
this.startSync()
}

this.events.emitSubscribersChange(
this.activeSubscribersCount,
previousSubscriberCount
)
}

/**
* Decrement the active subscribers count and start GC timer if needed
*/
private removeSubscriber(): void {
const previousSubscriberCount = this.activeSubscribersCount
this.activeSubscribersCount--

if (this.activeSubscribersCount === 0) {
this.startGCTimer()
} else if (this.activeSubscribersCount < 0) {
throw new NegativeActiveSubscribersError()
}

this.events.emitSubscribersChange(
this.activeSubscribersCount,
previousSubscriberCount
)
}

/**
Expand Down Expand Up @@ -2485,4 +2521,44 @@ export class CollectionImpl<

this.recomputeOptimisticState(false)
}

/**
* Subscribe to a collection event
*/
public on<T extends keyof AllCollectionEvents>(
event: T,
callback: CollectionEventHandler<T>
) {
return this.events.on(event, callback)
}

/**
* Subscribe to a collection event once
*/
public once<T extends keyof AllCollectionEvents>(
event: T,
callback: CollectionEventHandler<T>
) {
return this.events.once(event, callback)
}

/**
* Unsubscribe from a collection event
*/
public off<T extends keyof AllCollectionEvents>(
event: T,
callback: CollectionEventHandler<T>
) {
this.events.off(event, callback)
}

/**
* Wait for a collection event
*/
public waitFor<T extends keyof AllCollectionEvents>(
event: T,
timeout?: number
) {
return this.events.waitFor(event, timeout)
}
}
Loading
Loading