diff --git a/.changeset/afraid-camels-tickle.md b/.changeset/afraid-camels-tickle.md new file mode 100644 index 000000000..9062592e7 --- /dev/null +++ b/.changeset/afraid-camels-tickle.md @@ -0,0 +1,5 @@ +--- +"@tanstack/db": patch +--- + +optimise the live query graph execution by removing recursive calls to graph.run diff --git a/.changeset/fifty-ways-hang.md b/.changeset/fifty-ways-hang.md new file mode 100644 index 000000000..57a99d4e5 --- /dev/null +++ b/.changeset/fifty-ways-hang.md @@ -0,0 +1,5 @@ +--- +"@tanstack/db-ivm": patch +--- + +Fix a bug with distinct operator diff --git a/.changeset/plain-lights-end.md b/.changeset/plain-lights-end.md new file mode 100644 index 000000000..63610fc13 --- /dev/null +++ b/.changeset/plain-lights-end.md @@ -0,0 +1,10 @@ +--- +"@tanstack/db": minor +"@tanstack/angular-db": patch +"@tanstack/svelte-db": patch +"@tanstack/react-db": patch +"@tanstack/solid-db": patch +"@tanstack/vue-db": patch +--- + +Let collection.subscribeChanges return a subscription object. Move all data loading code related to optimizations into that subscription object. diff --git a/packages/angular-db/src/index.ts b/packages/angular-db/src/index.ts index 01c010337..2a38ec086 100644 --- a/packages/angular-db/src/index.ts +++ b/packages/angular-db/src/index.ts @@ -162,11 +162,12 @@ export function injectLiveQuery(opts: any) { } // Subscribe to changes - unsub = currentCollection.subscribeChanges( + const subscription = currentCollection.subscribeChanges( (_: Array>) => { syncDataFromCollection(currentCollection) } ) + unsub = subscription.unsubscribe.bind(subscription) // Handle ready state currentCollection.onFirstReady(() => { diff --git a/packages/angular-db/tests/inject-live-query.test.ts b/packages/angular-db/tests/inject-live-query.test.ts index 84ed14e39..0e21cb735 100644 --- a/packages/angular-db/tests/inject-live-query.test.ts +++ b/packages/angular-db/tests/inject-live-query.test.ts @@ -102,7 +102,9 @@ function createMockCollection( size: () => map.size, subscribeChanges: (cb: (changes: Array) => void) => { subs.add(cb) - return () => subs.delete(cb) + return { + unsubscribe: () => subs.delete(cb), + } }, onFirstReady: (cb: () => void) => { if (status === `ready`) { diff --git a/packages/db-ivm/src/operators/distinct.ts b/packages/db-ivm/src/operators/distinct.ts index dc2f5a177..f1880858d 100644 --- a/packages/db-ivm/src/operators/distinct.ts +++ b/packages/db-ivm/src/operators/distinct.ts @@ -4,21 +4,25 @@ import { hash } from "../hashing/index.js" import { MultiSet } from "../multiset.js" import type { Hash } from "../hashing/index.js" import type { DifferenceStreamReader } from "../graph.js" -import type { IStreamBuilder } from "../types.js" +import type { IStreamBuilder, KeyValue } from "../types.js" type Multiplicity = number +type GetValue = T extends KeyValue ? V : never + /** * Operator that removes duplicates */ -export class DistinctOperator extends UnaryOperator { +export class DistinctOperator< + T extends KeyValue, +> extends UnaryOperator>> { #by: (value: T) => any #values: Map // keeps track of the number of times each value has been seen constructor( id: number, input: DifferenceStreamReader, - output: DifferenceStreamWriter, + output: DifferenceStreamWriter>>, by: (value: T) => any = (value: T) => value ) { super(id, input, output) @@ -39,12 +43,11 @@ export class DistinctOperator extends UnaryOperator { this.#values.get(hashedValue) ?? 0 const newMultiplicity = oldMultiplicity + diff - updatedValues.set(hashedValue, [newMultiplicity, value]) } } - const result: Array<[T, number]> = [] + const result: Array<[KeyValue>, number]> = [] // Check which values became visible or disappeared for (const [ @@ -62,11 +65,11 @@ export class DistinctOperator extends UnaryOperator { if (oldMultiplicity <= 0 && newMultiplicity > 0) { // The value wasn't present in the stream // but with this change it is now present in the stream - result.push([value, 1]) + result.push([[hash(this.#by(value)), value[1]], 1]) } else if (oldMultiplicity > 0 && newMultiplicity <= 0) { // The value was present in the stream // but with this change it is no longer present in the stream - result.push([value, -1]) + result.push([[hash(this.#by(value)), value[1]], -1]) } } @@ -79,7 +82,9 @@ export class DistinctOperator extends UnaryOperator { /** * Removes duplicate values */ -export function distinct(by: (value: T) => any = (value: T) => value) { +export function distinct>( + by: (value: T) => any = (value: T) => value +) { return (stream: IStreamBuilder): IStreamBuilder => { const output = new StreamBuilder( stream.graph, diff --git a/packages/db-ivm/tests/operators/distinct.test.ts b/packages/db-ivm/tests/operators/distinct.test.ts index 9a7645ded..aa865e25c 100644 --- a/packages/db-ivm/tests/operators/distinct.test.ts +++ b/packages/db-ivm/tests/operators/distinct.test.ts @@ -4,6 +4,7 @@ import { MultiSet } from "../../src/multiset.js" import { distinct } from "../../src/operators/distinct.js" import { output } from "../../src/operators/output.js" import { MessageTracker, assertResults } from "../test-utils.js" +import { hash } from "../../src/hashing/index.js" describe(`Operators`, () => { describe(`Efficient distinct operation`, () => { @@ -39,9 +40,9 @@ function testDistinct() { expect(data).toEqual([ [ - [[1, `a`], 1], - [[2, `b`], 1], - [[2, `c`], 1], + [[hash([1, `a`]), `a`], 1], + [[hash([2, `b`]), `b`], 1], + [[hash([2, `c`]), `c`], 1], ], ]) }) @@ -74,7 +75,7 @@ function testDistinct() { graph.run() - const data = messages.map((m) => m.getInner())[0] + const data = messages.map((m) => m.getInner())[0]! const countries = data .map(([[_, value], multiplicity]) => [value.country, multiplicity]) .sort() @@ -118,8 +119,8 @@ function testDistinct() { `distinct with updates - initial`, initialResult, [ - [1, `a`], - [1, `b`], + [hash([1, `a`]), `a`], + [hash([1, `b`]), `b`], ], // Should have both distinct values 4 // Max expected messages ) @@ -140,7 +141,7 @@ function testDistinct() { assertResults( `distinct with updates - second batch`, secondResult, - [[1, `c`]], // Should only have 'c' remaining + [[hash([1, `c`]), `c`]], // Should only have 'c' remaining 4 // Max expected messages ) @@ -186,9 +187,9 @@ function testDistinct() { expect(data).toEqual([ [ - [[`key1`, 1], 1], - [[`key1`, 2], 1], - [[`key2`, 1], 1], + [[hash([`key1`, 1]), 1], 1], + [[hash([`key1`, 2]), 2], 1], + [[hash([`key2`, 1]), 1], 1], ], ]) }) @@ -224,8 +225,8 @@ function testDistinct() { `distinct with multiple batches that cancel out`, result, [ - [`key1`, 1], // Should remain (multiplicity 2 -> 1 in distinct) - [`key2`, 1], // Should remain (multiplicity 2 -> 1 in distinct) + [hash([`key1`, 1]), 1], // Should remain (multiplicity 2 -> 1 in distinct) + [hash([`key2`, 1]), 1], // Should remain (multiplicity 2 -> 1 in distinct) ], 6 // Max expected messages (generous upper bound) ) diff --git a/packages/db/src/change-events.ts b/packages/db/src/change-events.ts index d06b8a933..d11fa70ba 100644 --- a/packages/db/src/change-events.ts +++ b/packages/db/src/change-events.ts @@ -46,8 +46,8 @@ export function currentStateAsChanges< TKey extends string | number, >( collection: CollectionLike, - options: CurrentStateAsChangesOptions = {} -): Array> { + options: CurrentStateAsChangesOptions = {} +): Array> | void { // Helper function to collect filtered results const collectFilteredResults = ( filterFn?: (value: T) => boolean @@ -66,31 +66,17 @@ export function currentStateAsChanges< return result } - if (!options.where && !options.whereExpression) { + // TODO: handle orderBy and limit options + // by calling optimizeOrderedLimit + + if (!options.where) { // No filtering, return all items return collectFilteredResults() } // There's a where clause, let's see if we can use an index try { - let expression: BasicExpression - - if (options.whereExpression) { - // Use the pre-compiled expression directly - expression = options.whereExpression - } else if (options.where) { - // Create the single-row refProxy for the callback - const singleRowRefProxy = createSingleRowRefProxy() - - // Execute the callback to get the expression - const whereExpression = options.where(singleRowRefProxy) - - // Convert the result to a BasicExpression - expression = toExpression(whereExpression) - } else { - // This should never happen due to the check above, but TypeScript needs it - return [] - } + const expression: BasicExpression = options.where // Try to optimize the query using indexes const optimizationResult = optimizeExpressionWithIndexes( @@ -113,11 +99,11 @@ export function currentStateAsChanges< } return result } else { - // No index found or complex expression, fall back to full scan with filter - const filterFn = options.where - ? createFilterFunction(options.where) - : createFilterFunctionFromExpression(expression) + if (options.optimizedOnly) { + return + } + const filterFn = createFilterFunctionFromExpression(expression) return collectFilteredResults(filterFn) } } catch (error) { @@ -127,9 +113,11 @@ export function currentStateAsChanges< error ) - const filterFn = options.where - ? createFilterFunction(options.where) - : createFilterFunctionFromExpression(options.whereExpression!) + const filterFn = createFilterFunctionFromExpression(options.where) + + if (options.optimizedOnly) { + return + } return collectFilteredResults(filterFn) } @@ -201,11 +189,9 @@ export function createFilterFunctionFromExpression( */ export function createFilteredCallback( originalCallback: (changes: Array>) => void, - options: SubscribeChangesOptions + options: SubscribeChangesOptions ): (changes: Array>) => void { - const filterFn = options.whereExpression - ? createFilterFunctionFromExpression(options.whereExpression) - : createFilterFunction(options.where!) + const filterFn = createFilterFunctionFromExpression(options.whereExpression!) return (changes: Array>) => { const filteredChanges: Array> = [] diff --git a/packages/db/src/collection-subscription.ts b/packages/db/src/collection-subscription.ts new file mode 100644 index 000000000..dd2e92835 --- /dev/null +++ b/packages/db/src/collection-subscription.ts @@ -0,0 +1,239 @@ +import { + createFilterFunctionFromExpression, + createFilteredCallback, +} from "./change-events.js" +import { ensureIndexForExpression } from "./indexes/auto-index.js" +import { and } from "./query/index.js" +import type { BasicExpression } from "./query/ir.js" +import type { BaseIndex } from "./indexes/base-index.js" +import type { ChangeMessage } from "./types.js" +import type { Collection } from "./collection.js" + +type RequestSnapshotOptions = { + where?: BasicExpression + optimizedOnly?: boolean +} + +type RequestLimitedSnapshotOptions = { + minValue?: any + limit: number +} + +type CollectionSubscriptionOptions = { + /** Pre-compiled expression for filtering changes */ + whereExpression?: BasicExpression + /** Callback to call when the subscription is unsubscribed */ + onUnsubscribe?: () => void +} + +export class CollectionSubscription { + private loadedInitialState = false + + // Flag to indicate that we have sent at least 1 snapshot. + // While `snapshotSent` is false we filter out all changes from subscription to the collection. + private snapshotSent = false + + // Keep track of the keys we've sent (needed for join and orderBy optimizations) + private sentKeys = new Set() + + private filteredCallback: (changes: Array>) => void + + private orderByIndex: BaseIndex | undefined + + constructor( + private collection: Collection, + private callback: (changes: Array>) => void, + private options: CollectionSubscriptionOptions + ) { + // Auto-index for where expressions if enabled + if (options.whereExpression) { + ensureIndexForExpression(options.whereExpression, this.collection) + } + + const callbackWithSentKeysTracking = ( + changes: Array> + ) => { + callback(changes) + this.trackSentKeys(changes) + } + + this.callback = callbackWithSentKeysTracking + + // Create a filtered callback if where clause is provided + this.filteredCallback = options.whereExpression + ? createFilteredCallback(this.callback, options) + : this.callback + } + + setOrderByIndex(index: BaseIndex) { + this.orderByIndex = index + } + + hasLoadedInitialState() { + return this.loadedInitialState + } + + hasSentAtLeastOneSnapshot() { + return this.snapshotSent + } + + emitEvents(changes: Array>) { + const newChanges = this.filterAndFlipChanges(changes) + this.filteredCallback(newChanges) + } + + /** + * Sends the snapshot to the callback. + * Returns a boolean indicating if it succeeded. + * It can only fail if there is no index to fulfill the request + * and the optimizedOnly option is set to true, + * or, the entire state was already loaded. + */ + requestSnapshot(opts?: RequestSnapshotOptions): boolean { + if (this.loadedInitialState) { + // Subscription was deoptimized so we already sent the entire initial state + return false + } + + const stateOpts: RequestSnapshotOptions = { + where: this.options.whereExpression, + optimizedOnly: opts?.optimizedOnly ?? false, + } + + if (opts) { + if (`where` in opts) { + const snapshotWhereExp = opts.where + if (stateOpts.where) { + // Combine the two where expressions + const subWhereExp = stateOpts.where + const combinedWhereExp = and(subWhereExp, snapshotWhereExp) + stateOpts.where = combinedWhereExp + } else { + stateOpts.where = snapshotWhereExp + } + } + } else { + // No options provided so it's loading the entire initial state + this.loadedInitialState = true + } + + const snapshot = this.collection.currentStateAsChanges(stateOpts) + + if (snapshot === undefined) { + // Couldn't load from indexes + return false + } + + // Only send changes that have not been sent yet + const filteredSnapshot = snapshot.filter( + (change) => !this.sentKeys.has(change.key) + ) + + this.snapshotSent = true + this.callback(filteredSnapshot) + return true + } + + /** + * Sends a snapshot that is limited to the first `limit` rows that fulfill the `where` clause and are bigger than `minValue`. + * Requires a range index to be set with `setOrderByIndex` prior to calling this method. + * It uses that range index to load the items in the order of the index. + * Note: it does not send keys that have already been sent before. + */ + requestLimitedSnapshot({ limit, minValue }: RequestLimitedSnapshotOptions) { + if (!limit) throw new Error(`limit is required`) + + if (!this.orderByIndex) { + throw new Error( + `Ordered snapshot was requested but no index was found. You have to call setOrderByIndex before requesting an ordered snapshot.` + ) + } + + const index = this.orderByIndex + const where = this.options.whereExpression + const whereFilterFn = where + ? createFilterFunctionFromExpression(where) + : undefined + + const filterFn = (key: string | number): boolean => { + if (this.sentKeys.has(key)) { + return false + } + + const value = this.collection.get(key) + if (value === undefined) { + return false + } + + return whereFilterFn?.(value) ?? true + } + + let biggestObservedValue = minValue + const changes: Array> = [] + let keys: Array = index.take(limit, minValue, filterFn) + + const valuesNeeded = () => Math.max(limit - changes.length, 0) + const collectionExhausted = () => keys.length === 0 + + while (valuesNeeded() > 0 && !collectionExhausted()) { + for (const key of keys) { + const value = this.collection.get(key)! + changes.push({ + type: `insert`, + key, + value, + }) + biggestObservedValue = value + } + + keys = index.take(valuesNeeded(), biggestObservedValue, filterFn) + } + + this.callback(changes) + } + + /** + * Filters and flips changes for keys that have not been sent yet. + * Deletes are filtered out for keys that have not been sent yet. + * Updates are flipped into inserts for keys that have not been sent yet. + */ + private filterAndFlipChanges(changes: Array>) { + if (this.loadedInitialState) { + // We loaded the entire initial state + // so no need to filter or flip changes + return changes + } + + const newChanges = [] + for (const change of changes) { + let newChange = change + if (!this.sentKeys.has(change.key)) { + if (change.type === `update`) { + newChange = { ...change, type: `insert`, previousValue: undefined } + } else if (change.type === `delete`) { + // filter out deletes for keys that have not been sent + continue + } + this.sentKeys.add(change.key) + } + newChanges.push(newChange) + } + return newChanges + } + + private trackSentKeys(changes: Array>) { + if (this.loadedInitialState) { + // No need to track sent keys if we loaded the entire state. + // Since we sent everything, all keys must have been observed. + return + } + + for (const change of changes) { + this.sentKeys.add(change.key) + } + } + + unsubscribe() { + this.options.onUnsubscribe?.() + } +} diff --git a/packages/db/src/collection.ts b/packages/db/src/collection.ts index 837eea0b7..9996c1710 100644 --- a/packages/db/src/collection.ts +++ b/packages/db/src/collection.ts @@ -7,7 +7,6 @@ import { } from "./query/builder/ref-proxy" import { BTreeIndex } from "./indexes/btree-index.js" import { IndexProxy, LazyIndexWrapper } from "./indexes/lazy-index.js" -import { ensureIndexForExpression } from "./indexes/auto-index.js" import { createTransaction, getActiveTransaction } from "./transactions" import { CollectionInErrorStateError, @@ -37,17 +36,17 @@ import { UndefinedKeyError, UpdateKeyNotFoundError, } from "./errors" -import { createFilteredCallback, currentStateAsChanges } from "./change-events" import { CollectionEvents } from "./collection-events.js" import type { AllCollectionEvents, CollectionEventHandler, } from "./collection-events.js" +import { currentStateAsChanges } from "./change-events" +import { CollectionSubscription } from "./collection-subscription.js" import type { Transaction } from "./transactions" import type { StandardSchemaV1 } from "@standard-schema/spec" import type { SingleRowRefProxy } from "./query/builder/ref-proxy" import type { - ChangeListener, ChangeMessage, CollectionConfig, CollectionStatus, @@ -240,11 +239,7 @@ export class CollectionImpl< private indexCounter = 0 // Event system - private changeListeners = new Set>() - private changeKeyListeners = new Map< - TKey, - Set> - >() + private changeSubscriptions = new Set() // Utilities namespace // This is populated by createCollection @@ -339,7 +334,7 @@ export class CollectionImpl< // Always notify dependents when markReady is called, after status is set // This ensures live queries get notified when their dependencies become ready - if (this.changeListeners.size > 0) { + if (this.changeSubscriptions.size > 0) { this.emitEmptyReadyEvent() } } @@ -964,15 +959,9 @@ export class CollectionImpl< * This bypasses the normal empty array check in emitEvents */ private emitEmptyReadyEvent(): void { - // Emit empty array directly to all listeners - for (const listener of this.changeListeners) { - listener([]) - } - // Emit to key-specific listeners - for (const [_key, keyListeners] of this.changeKeyListeners) { - for (const listener of keyListeners) { - listener([]) - } + // Emit empty array directly to all subscribers + for (const subscription of this.changeSubscriptions) { + subscription.emitEvents([]) } } @@ -1003,30 +992,8 @@ export class CollectionImpl< if (eventsToEmit.length === 0) return // Emit to all listeners - for (const listener of this.changeListeners) { - listener(eventsToEmit) - } - - // Emit to key-specific listeners - if (this.changeKeyListeners.size > 0) { - // Group changes by key, but only for keys that have listeners - const changesByKey = new Map>>() - for (const change of eventsToEmit) { - if (this.changeKeyListeners.has(change.key)) { - if (!changesByKey.has(change.key)) { - changesByKey.set(change.key, []) - } - changesByKey.get(change.key)!.push(change) - } - } - - // Emit batched changes to each key's listeners - for (const [key, keyChanges] of changesByKey) { - const keyListeners = this.changeKeyListeners.get(key)! - for (const listener of keyListeners) { - listener(keyChanges) - } - } + for (const subscription of this.changeSubscriptions) { + subscription.emitEvents(eventsToEmit) } } @@ -2354,8 +2321,8 @@ export class CollectionImpl< * }) */ public currentStateAsChanges( - options: CurrentStateAsChangesOptions = {} - ): Array> { + options: CurrentStateAsChangesOptions = {} + ): Array> | void { return currentStateAsChanges(this, options) } @@ -2366,23 +2333,23 @@ export class CollectionImpl< * @returns Unsubscribe function - Call this to stop listening for changes * @example * // Basic subscription - * const unsubscribe = collection.subscribeChanges((changes) => { + * const subscription = collection.subscribeChanges((changes) => { * changes.forEach(change => { * console.log(`${change.type}: ${change.key}`, change.value) * }) * }) * - * // Later: unsubscribe() + * // Later: subscription.unsubscribe() * * @example * // Include current state immediately - * const unsubscribe = collection.subscribeChanges((changes) => { + * const subscription = collection.subscribeChanges((changes) => { * updateUI(changes) * }, { includeInitialState: true }) * * @example * // Subscribe only to changes matching a condition - * const unsubscribe = collection.subscribeChanges((changes) => { + * const subscription = collection.subscribeChanges((changes) => { * updateUI(changes) * }, { * includeInitialState: true, @@ -2391,7 +2358,7 @@ export class CollectionImpl< * * @example * // Subscribe using a pre-compiled expression - * const unsubscribe = collection.subscribeChanges((changes) => { + * const subscription = collection.subscribeChanges((changes) => { * updateUI(changes) * }, { * includeInitialState: true, @@ -2400,78 +2367,27 @@ export class CollectionImpl< */ public subscribeChanges( callback: (changes: Array>) => void, - options: SubscribeChangesOptions = {} - ): () => void { + options: SubscribeChangesOptions = {} + ): CollectionSubscription { // Start sync and track subscriber this.addSubscriber() - // Auto-index for where expressions if enabled - if (options.whereExpression) { - ensureIndexForExpression(options.whereExpression, this) - } - - // Create a filtered callback if where clause is provided - const filteredCallback = - options.where || options.whereExpression - ? createFilteredCallback(callback, options) - : callback + const subscription = new CollectionSubscription(this, callback, { + ...options, + onUnsubscribe: () => { + this.removeSubscriber() + this.changeSubscriptions.delete(subscription) + }, + }) if (options.includeInitialState) { - // First send the current state as changes (filtered if needed) - const initialChanges = this.currentStateAsChanges({ - where: options.where, - whereExpression: options.whereExpression, - }) - filteredCallback(initialChanges) + subscription.requestSnapshot() } // Add to batched listeners - this.changeListeners.add(filteredCallback) + this.changeSubscriptions.add(subscription) - return () => { - this.changeListeners.delete(filteredCallback) - this.removeSubscriber() - } - } - - /** - * Subscribe to changes for a specific key - */ - public subscribeChangesKey( - key: TKey, - listener: ChangeListener, - { includeInitialState = false }: { includeInitialState?: boolean } = {} - ): () => void { - // Start sync and track subscriber - this.addSubscriber() - - if (!this.changeKeyListeners.has(key)) { - this.changeKeyListeners.set(key, new Set()) - } - - if (includeInitialState) { - // First send the current state as changes - listener([ - { - type: `insert`, - key, - value: this.get(key)!, - }, - ]) - } - - this.changeKeyListeners.get(key)!.add(listener) - - return () => { - const listeners = this.changeKeyListeners.get(key) - if (listeners) { - listeners.delete(listener) - if (listeners.size === 0) { - this.changeKeyListeners.delete(key) - } - } - this.removeSubscriber() - } + return subscription } /** diff --git a/packages/db/src/indexes/base-index.ts b/packages/db/src/indexes/base-index.ts index 1b868427a..80544fc90 100644 --- a/packages/db/src/indexes/base-index.ts +++ b/packages/db/src/indexes/base-index.ts @@ -1,6 +1,6 @@ import { compileSingleRowExpression } from "../query/compiler/evaluators.js" import { comparisonFunctions } from "../query/builder/functions.js" -import type { BasicExpression, OrderByDirection } from "../query/ir.js" +import type { BasicExpression } from "../query/ir.js" /** * Operations that indexes can support, imported from available comparison functions @@ -58,8 +58,8 @@ export abstract class BaseIndex< abstract lookup(operation: IndexOperation, value: any): Set abstract take( n: number, - direction?: OrderByDirection, - from?: TKey + from?: TKey, + filterFn?: (key: TKey) => boolean ): Array abstract get keyCount(): number diff --git a/packages/db/src/indexes/btree-index.ts b/packages/db/src/indexes/btree-index.ts index 07edf700c..15b00e8e4 100644 --- a/packages/db/src/indexes/btree-index.ts +++ b/packages/db/src/indexes/btree-index.ts @@ -236,7 +236,7 @@ export class BTreeIndex< * @param from - The item to start from (exclusive). Starts from the smallest item (inclusive) if not provided. * @returns The next n items after the provided key. Returns the first n items if no from item is provided. */ - take(n: number, from?: any): Array { + take(n: number, from?: any, filterFn?: (key: TKey) => boolean): Array { const keysInResult: Set = new Set() const result: Array = [] const nextKey = (k?: any) => this.orderedEntries.nextHigherKey(k) @@ -248,7 +248,7 @@ export class BTreeIndex< const it = keys.values() let ks: TKey | undefined while (result.length < n && (ks = it.next().value)) { - if (!keysInResult.has(ks)) { + if (!keysInResult.has(ks) && (filterFn?.(ks) ?? true)) { result.push(ks) keysInResult.add(ks) } diff --git a/packages/db/src/query/compiler/index.ts b/packages/db/src/query/compiler/index.ts index 9ea827d61..257408fe8 100644 --- a/packages/db/src/query/compiler/index.ts +++ b/packages/db/src/query/compiler/index.ts @@ -28,6 +28,7 @@ import type { ResultStream, } from "../../types.js" import type { QueryCache, QueryMapping } from "./types.js" +import { CollectionSubscription } from "../../collection-subscription.js" /** * Result of query compilation including both the pipeline and collection-specific WHERE clauses @@ -53,6 +54,7 @@ export function compileQuery( rawQuery: QueryIR, inputs: Record, collections: Record>, + subscriptions: Record, callbacks: Record, lazyCollections: Set, optimizableOrderByCollections: Record, @@ -88,6 +90,7 @@ export function compileQuery( query.from, allInputs, collections, + subscriptions, callbacks, lazyCollections, optimizableOrderByCollections, @@ -120,6 +123,7 @@ export function compileQuery( cache, queryMapping, collections, + subscriptions, callbacks, lazyCollections, optimizableOrderByCollections, @@ -323,6 +327,7 @@ function processFrom( from: CollectionRef | QueryRef, allInputs: Record, collections: Record, + subscriptions: Record, callbacks: Record, lazyCollections: Set, optimizableOrderByCollections: Record, @@ -346,6 +351,7 @@ function processFrom( originalQuery, allInputs, collections, + subscriptions, callbacks, lazyCollections, optimizableOrderByCollections, diff --git a/packages/db/src/query/compiler/joins.ts b/packages/db/src/query/compiler/joins.ts index 961937ecd..7cabe8450 100644 --- a/packages/db/src/query/compiler/joins.ts +++ b/packages/db/src/query/compiler/joins.ts @@ -16,8 +16,9 @@ import { UnsupportedJoinSourceTypeError, UnsupportedJoinTypeError, } from "../../errors.js" -import { findIndexForField } from "../../utils/index-optimization.js" import { ensureIndexForField } from "../../indexes/auto-index.js" +import { PropRef } from "../ir.js" +import { inArray } from "../builder/functions.js" import { compileExpression } from "./evaluators.js" import { compileQuery, followRef } from "./index.js" import type { OrderByOptimizationInfo } from "./order-by.js" @@ -25,7 +26,6 @@ import type { BasicExpression, CollectionRef, JoinClause, - PropRef, QueryIR, QueryRef, } from "../ir.js" @@ -37,7 +37,7 @@ import type { NamespacedRow, } from "../../types.js" import type { QueryCache, QueryMapping } from "./types.js" -import type { BaseIndex } from "../../indexes/base-index.js" +import type { CollectionSubscription } from "../../collection-subscription.js" export type LoadKeysFn = (key: Set) => void export type LazyCollectionCallbacks = { @@ -58,6 +58,7 @@ export function processJoins( cache: QueryCache, queryMapping: QueryMapping, collections: Record, + subscriptions: Record, callbacks: Record, lazyCollections: Set, optimizableOrderByCollections: Record, @@ -76,6 +77,7 @@ export function processJoins( cache, queryMapping, collections, + subscriptions, callbacks, lazyCollections, optimizableOrderByCollections, @@ -99,6 +101,7 @@ function processJoin( cache: QueryCache, queryMapping: QueryMapping, collections: Record, + subscriptions: Record, callbacks: Record, lazyCollections: Set, optimizableOrderByCollections: Record, @@ -113,6 +116,7 @@ function processJoin( joinClause.from, allInputs, collections, + subscriptions, callbacks, lazyCollections, optimizableOrderByCollections, @@ -215,8 +219,6 @@ function processJoin( const activePipeline = activeCollection === `main` ? mainPipeline : joinedPipeline - let index: BaseIndex | undefined - const lazyCollectionJoinExpr = activeCollection === `main` ? (joinedExpr as PropRef) @@ -238,50 +240,33 @@ function processJoin( ) } - let deoptimized = false - const activePipelineWithLoading: IStreamBuilder< [key: unknown, [originalKey: string, namespacedRow: NamespacedRow]] > = activePipeline.pipe( tap((data) => { - if (deoptimized) { - return - } + const lazyCollectionSubscription = subscriptions[lazyCollection.id] - // Find the index for the path we join on - // we need to find the index inside the map operator - // because the indexes are only available after the initial sync - // so we can't fetch it during compilation - index ??= findIndexForField( - followRefCollection.indexes, - followRefResult.path - ) - - // The `callbacks` object is passed by the liveQueryCollection to the compiler. - // It contains a function to lazy load keys for each lazy collection - // as well as a function to switch back to a regular collection - // (useful when there's no index for available for lazily loading the collection) - const collectionCallbacks = callbacks[lazyCollection.id] - if (!collectionCallbacks) { + if (!lazyCollectionSubscription) { throw new Error( - `Internal error: callbacks for collection are missing in join pipeline. Make sure the live query collection sets them before running the pipeline.` + `Internal error: subscription for collection is missing in join pipeline. Make sure the live query collection sets the subscription before running the pipeline.` ) } - const { loadKeys, loadInitialState } = collectionCallbacks - - if (index && index.supports(`in`)) { - // Use the index to fetch the PKs of the rows in the lazy collection - // that match this row from the active collection based on the value of the joinKey - const joinKeys = data.getInner().map(([[joinKey]]) => joinKey) - const matchingKeys = index.lookup(`in`, joinKeys) - // Inform the lazy collection that those keys need to be loaded - loadKeys(matchingKeys) - } else { - // We can't optimize the join because there is no index for the join key - // on the lazy collection, so we load the initial state - deoptimized = true - loadInitialState() + if (lazyCollectionSubscription.hasLoadedInitialState()) { + // Entire state was already loaded because we deoptimized the join + return + } + + const joinKeys = data.getInner().map(([[joinKey]]) => joinKey) + const lazyJoinRef = new PropRef(followRefResult.path) + const loaded = lazyCollectionSubscription.requestSnapshot({ + where: inArray(lazyJoinRef, joinKeys), + optimizedOnly: true, + }) + + if (!loaded) { + // Snapshot wasn't sent because it could not be loaded from the indexes + lazyCollectionSubscription.requestSnapshot() } }) ) @@ -397,6 +382,7 @@ function processJoinSource( from: CollectionRef | QueryRef, allInputs: Record, collections: Record, + subscriptions: Record, callbacks: Record, lazyCollections: Set, optimizableOrderByCollections: Record, @@ -420,6 +406,7 @@ function processJoinSource( originalQuery, allInputs, collections, + subscriptions, callbacks, lazyCollections, optimizableOrderByCollections, diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index 26bfd6c2f..8f67cb644 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -2,6 +2,7 @@ import { D2, output } from "@tanstack/db-ivm" import { compileQuery } from "../compiler/index.js" import { buildQuery, getQueryIR } from "../builder/index.js" import { CollectionSubscriber } from "./collection-subscriber.js" +import type { CollectionSubscription } from "../../collection-subscription.js" import type { RootStreamBuilder } from "@tanstack/db-ivm" import type { OrderByOptimizationInfo } from "../compiler/order-by.js" import type { Collection } from "../../collection.js" @@ -41,6 +42,8 @@ export class CollectionConfigBuilder< private readonly compare?: (val1: TResult, val2: TResult) => number + private isGraphRunning = false + private graphCache: D2 | undefined private inputsCache: Record> | undefined private pipelineCache: ResultStream | undefined @@ -48,6 +51,8 @@ export class CollectionConfigBuilder< | Map> | undefined + // Map of collection ID to subscription + readonly subscriptions: Record = {} // Map of collection IDs to functions that load keys for that lazy collection lazyCollectionsCallbacks: Record = {} // Set of collection IDs that are lazy collections @@ -104,25 +109,41 @@ export class CollectionConfigBuilder< syncState: FullSyncState, callback?: () => boolean ) { - const { begin, commit, markReady } = config + if (this.isGraphRunning) { + // no nested runs of the graph + // which is possible if the `callback` + // would call `maybeRunGraph` e.g. after it has loaded some more data + return + } - // We only run the graph if all the collections are ready - if ( - this.allCollectionsReadyOrInitialCommit() && - syncState.subscribedToAllCollections - ) { - syncState.graph.run() - const ready = callback?.() ?? true - // On the initial run, we may need to do an empty commit to ensure that - // the collection is initialized - if (syncState.messagesCount === 0) { - begin() - commit() - } - // Mark the collection as ready after the first successful run - if (ready && this.allCollectionsReady()) { - markReady() + this.isGraphRunning = true + + try { + const { begin, commit, markReady } = config + + // We only run the graph if all the collections are ready + if ( + this.allCollectionsReadyOrInitialCommit() && + syncState.subscribedToAllCollections + ) { + while (syncState.graph.pendingWork()) { + syncState.graph.run() + callback?.() + } + + // On the initial run, we may need to do an empty commit to ensure that + // the collection is initialized + if (syncState.messagesCount === 0) { + begin() + commit() + } + // Mark the collection as ready after the first successful run + if (this.allCollectionsReady()) { + markReady() + } } + } finally { + this.isGraphRunning = false } } @@ -189,6 +210,7 @@ export class CollectionConfigBuilder< this.query, this.inputsCache as Record, this.collections, + this.subscriptions, this.lazyCollectionsCallbacks, this.lazyCollections, this.optimizableOrderByCollections @@ -320,17 +342,21 @@ export class CollectionConfigBuilder< syncState, this ) - collectionSubscriber.subscribe() - const loadMore = - collectionSubscriber.loadMoreIfNeeded.bind(collectionSubscriber) + const subscription = collectionSubscriber.subscribe() + this.subscriptions[collectionId] = subscription + + const loadMore = collectionSubscriber.loadMoreIfNeeded.bind( + collectionSubscriber, + subscription + ) return loadMore } ) const loadMoreDataCallback = () => { - loaders.map((loader) => loader()) // .every((doneLoading) => doneLoading) + loaders.map((loader) => loader()) return true } diff --git a/packages/db/src/query/live/collection-subscriber.ts b/packages/db/src/query/live/collection-subscriber.ts index bb45a0f35..319b8e09e 100644 --- a/packages/db/src/query/live/collection-subscriber.ts +++ b/packages/db/src/query/live/collection-subscriber.ts @@ -1,5 +1,4 @@ import { MultiSet } from "@tanstack/db-ivm" -import { createFilterFunctionFromExpression } from "../../change-events.js" import { convertToBasicExpression } from "../compiler/expressions.js" import type { FullSyncState } from "./types.js" import type { MultiSetArray, RootStreamBuilder } from "@tanstack/db-ivm" @@ -8,14 +7,12 @@ import type { ChangeMessage, SyncConfig } from "../../types.js" import type { Context, GetResult } from "../builder/types.js" import type { BasicExpression } from "../ir.js" import type { CollectionConfigBuilder } from "./collection-config-builder.js" +import type { CollectionSubscription } from "../../collection-subscription.js" export class CollectionSubscriber< TContext extends Context, TResult extends object = GetResult, > { - // Keep track of the keys we've sent (needed for join and orderBy optimizations) - private sentKeys = new Set() - // Keep track of the biggest value we've sent so far (needed for orderBy optimization) private biggest: any = undefined @@ -27,7 +24,7 @@ export class CollectionSubscriber< private collectionConfigBuilder: CollectionConfigBuilder ) {} - subscribe() { + subscribe(): CollectionSubscription { const collectionAlias = findCollectionAlias( this.collectionId, this.collectionConfigBuilder.query @@ -43,7 +40,7 @@ export class CollectionSubscriber< if (whereExpression) { // Use index optimization for this collection - this.subscribeToChanges(whereExpression) + return this.subscribeToChanges(whereExpression) } else { // This should not happen - if we have a whereClause but can't create whereExpression, // it indicates a bug in our optimization logic @@ -54,25 +51,34 @@ export class CollectionSubscriber< } } else { // No WHERE clause for this collection, use regular subscription - this.subscribeToChanges() + return this.subscribeToChanges() } } private subscribeToChanges(whereExpression?: BasicExpression) { - let unsubscribe: () => void - if (this.collectionConfigBuilder.lazyCollections.has(this.collectionId)) { - unsubscribe = this.subscribeToMatchingChanges(whereExpression) - } else if ( + let subscription: CollectionSubscription + if ( Object.hasOwn( this.collectionConfigBuilder.optimizableOrderByCollections, this.collectionId ) ) { - unsubscribe = this.subscribeToOrderedChanges(whereExpression) + subscription = this.subscribeToOrderedChanges(whereExpression) } else { - unsubscribe = this.subscribeToAllChanges(whereExpression) + // If the collection is lazy then we should not include the initial state + const includeInitialState = + !this.collectionConfigBuilder.lazyCollections.has(this.collectionId) + + subscription = this.subscribeToMatchingChanges( + whereExpression, + includeInitialState + ) + } + const unsubscribe = () => { + subscription.unsubscribe() } this.syncState.unsubscribeCallbacks.add(unsubscribe) + return subscription } private sendChangesToPipeline( @@ -101,153 +107,38 @@ export class CollectionSubscriber< ) } - // Wraps the sendChangesToPipeline function - // in order to turn `update`s into `insert`s - // for keys that have not been sent to the pipeline yet - // and filter out deletes for keys that have not been sent - private sendVisibleChangesToPipeline = ( - changes: Array>, - loadedInitialState: boolean - ) => { - if (loadedInitialState) { - // There was no index for the join key - // so we loaded the initial state - // so we can safely assume that the pipeline has seen all keys - return this.sendChangesToPipeline(changes) - } - - const newChanges = [] - for (const change of changes) { - let newChange = change - if (!this.sentKeys.has(change.key)) { - if (change.type === `update`) { - newChange = { ...change, type: `insert` } - } else if (change.type === `delete`) { - // filter out deletes for keys that have not been sent - continue - } - this.sentKeys.add(change.key) - } - newChanges.push(newChange) - } - - return this.sendChangesToPipeline(newChanges) - } - - private loadKeys( - keys: Iterable, - filterFn: (item: object) => boolean - ) { - const changes: Array> = [] - for (const key of keys) { - // Only load the key once - if (this.sentKeys.has(key)) continue - - const value = this.collection.get(key) - if (value !== undefined && filterFn(value)) { - this.sentKeys.add(key) - changes.push({ type: `insert`, key, value }) - } - } - if (changes.length > 0) { - this.sendChangesToPipeline(changes) - } - } - - private subscribeToAllChanges( - whereExpression: BasicExpression | undefined - ) { - const sendChangesToPipeline = this.sendChangesToPipeline.bind(this) - const unsubscribe = this.collection.subscribeChanges( - sendChangesToPipeline, - { - includeInitialState: true, - ...(whereExpression ? { whereExpression } : undefined), - } - ) - return unsubscribe - } - private subscribeToMatchingChanges( - whereExpression: BasicExpression | undefined + whereExpression: BasicExpression | undefined, + includeInitialState: boolean = false ) { - // Flag to indicate we have send to whole initial state of the collection - // to the pipeline, this is set when there are no indexes that can be used - // to filter the changes and so the whole state was requested from the collection - let loadedInitialState = false - - // Flag to indicate that we have started sending changes to the pipeline. - // This is set to true by either the first call to `loadKeys` or when the - // query requests the whole initial state in `loadInitialState`. - // Until that point we filter out all changes from subscription to the collection. - let sendChanges = false - - const sendVisibleChanges = ( + const sendChanges = ( changes: Array> ) => { - // We filter out changes when sendChanges is false to ensure that we don't send - // any changes from the live subscription until the join operator requests either - // the initial state or its first key. This is needed otherwise it could receive - // changes which are then later subsumed by the initial state (and that would - // lead to weird bugs due to the data being received twice). - this.sendVisibleChangesToPipeline( - sendChanges ? changes : [], - loadedInitialState - ) + this.sendChangesToPipeline(changes) } - const unsubscribe = this.collection.subscribeChanges(sendVisibleChanges, { + const subscription = this.collection.subscribeChanges(sendChanges, { + includeInitialState, whereExpression, }) - // Create a function that loads keys from the collection - // into the query pipeline on demand - const filterFn = whereExpression - ? createFilterFunctionFromExpression(whereExpression) - : () => true - const loadKs = (keys: Set) => { - sendChanges = true - return this.loadKeys(keys, filterFn) - } - - // Store the functions to load keys and load initial state in the `lazyCollectionsCallbacks` map - // This is used by the join operator to dynamically load matching keys from the lazy collection - // or to get the full initial state of the collection if there's no index for the join key - this.collectionConfigBuilder.lazyCollectionsCallbacks[this.collectionId] = { - loadKeys: loadKs, - loadInitialState: () => { - // Make sure we only load the initial state once - if (loadedInitialState) return - loadedInitialState = true - sendChanges = true - - const changes = this.collection.currentStateAsChanges({ - whereExpression, - }) - this.sendChangesToPipeline(changes) - }, - } - return unsubscribe + return subscription } private subscribeToOrderedChanges( whereExpression: BasicExpression | undefined ) { - const { offset, limit, comparator, dataNeeded } = + const { offset, limit, comparator, dataNeeded, index } = this.collectionConfigBuilder.optimizableOrderByCollections[ this.collectionId ]! - // Load the first `offset + limit` values from the index - // i.e. the K items from the collection that fall into the requested range: [offset, offset + limit[ - this.loadNextItems(offset + limit) - const sendChangesInRange = ( changes: Iterable> ) => { // Split live updates into a delete of the old value and an insert of the new value // and filter out changes that are bigger than the biggest value we've sent so far - // because they can't affect the topK + // because they can't affect the topK (and if later we need more data, we will dynamically load more data) const splittedChanges = splitUpdates(changes) let filteredChanges = splittedChanges if (dataNeeded!() === 0) { @@ -261,25 +152,31 @@ export class CollectionSubscriber< this.biggest ) } - this.sendChangesToPipeline( - filteredChanges, - this.loadMoreIfNeeded.bind(this) - ) + + this.sendChangesToPipelineWithTracking(filteredChanges, subscription) } // Subscribe to changes and only send changes that are smaller than the biggest value we've sent so far // values that are bigger don't need to be sent because they can't affect the topK - const unsubscribe = this.collection.subscribeChanges(sendChangesInRange, { + const subscription = this.collection.subscribeChanges(sendChangesInRange, { whereExpression, }) - return unsubscribe + subscription.setOrderByIndex(index) + + // Load the first `offset + limit` values from the index + // i.e. the K items from the collection that fall into the requested range: [offset, offset + limit[ + subscription.requestLimitedSnapshot({ + limit: offset + limit, + }) + + return subscription } // This function is called by maybeRunGraph // after each iteration of the query pipeline // to ensure that the orderBy operator has enough data to work with - loadMoreIfNeeded() { + loadMoreIfNeeded(subscription: CollectionSubscription) { const orderByInfo = this.collectionConfigBuilder.optimizableOrderByCollections[ this.collectionId @@ -287,7 +184,7 @@ export class CollectionSubscriber< if (!orderByInfo) { // This query has no orderBy operator - // so there's no data to load, just return true + // so there's no data to load return true } @@ -304,32 +201,31 @@ export class CollectionSubscriber< // `dataNeeded` probes the orderBy operator to see if it needs more data // if it needs more data, it returns the number of items it needs const n = dataNeeded() - let noMoreNextItems = false if (n > 0) { - const loadedItems = this.loadNextItems(n) - noMoreNextItems = loadedItems === 0 + this.loadNextItems(n, subscription) } - - // Indicate that we're done loading data if we didn't need to load more data - // or there's no more data to load - return n === 0 || noMoreNextItems + return true } private sendChangesToPipelineWithTracking( - changes: Iterable> + changes: Iterable>, + subscription: CollectionSubscription ) { const { comparator } = this.collectionConfigBuilder.optimizableOrderByCollections[ this.collectionId ]! const trackedChanges = this.trackSentValues(changes, comparator) - this.sendChangesToPipeline(trackedChanges, this.loadMoreIfNeeded.bind(this)) + this.sendChangesToPipeline( + trackedChanges, + this.loadMoreIfNeeded.bind(this, subscription) + ) } // Loads the next `n` items from the collection // starting from the biggest item it has sent - private loadNextItems(n: number) { - const { valueExtractorForRawRow, index } = + private loadNextItems(n: number, subscription: CollectionSubscription) { + const { valueExtractorForRawRow } = this.collectionConfigBuilder.optimizableOrderByCollections[ this.collectionId ]! @@ -338,13 +234,10 @@ export class CollectionSubscriber< ? valueExtractorForRawRow(biggestSentRow) : biggestSentRow // Take the `n` items after the biggest sent value - const nextOrderedKeys = index.take(n, biggestSentValue) - const nextInserts: Array> = - nextOrderedKeys.map((key) => { - return { type: `insert`, key, value: this.collection.get(key) } - }) - this.sendChangesToPipelineWithTracking(nextInserts) - return nextInserts.length + subscription.requestLimitedSnapshot({ + limit: n, + minValue: biggestSentValue, + }) } private getWhereClauseFromAlias( @@ -363,8 +256,6 @@ export class CollectionSubscriber< comparator: (a: any, b: any) => number ) { for (const change of changes) { - this.sentKeys.add(change.key) - if (!this.biggest) { this.biggest = change.value } else if (comparator(this.biggest, change.value) < 0) { @@ -427,7 +318,11 @@ function sendChangesToInput( multiSetArray.push([[key, change.value], -1]) } } - input.sendData(new MultiSet(multiSetArray)) + + if (multiSetArray.length !== 0) { + input.sendData(new MultiSet(multiSetArray)) + } + return multiSetArray.length } @@ -463,7 +358,7 @@ function* filterChanges< } /** - * Filters changes to only include those that are smaller than the provided max value + * Filters changes to only include those that are smaller or equal to the provided max value * @param changes - Iterable of changes to filter * @param comparator - Comparator function to use for filtering * @param maxValue - Range to filter changes within (range boundaries are exclusive) diff --git a/packages/db/src/types.ts b/packages/db/src/types.ts index 52e67dc29..6cdb41f88 100644 --- a/packages/db/src/types.ts +++ b/packages/db/src/types.ts @@ -2,9 +2,7 @@ import type { IStreamBuilder } from "@tanstack/db-ivm" import type { Collection } from "./collection" import type { StandardSchemaV1 } from "@standard-schema/spec" import type { Transaction } from "./transactions" - -import type { SingleRowRefProxy } from "./query/builder/ref-proxy" -import type { BasicExpression } from "./query/ir.js" +import type { BasicExpression, OrderBy } from "./query/ir.js" /** * Helper type to extract the output type from a standard schema @@ -541,27 +539,28 @@ export type NamespacedAndKeyedStream = IStreamBuilder /** * Options for subscribing to collection changes */ -export interface SubscribeChangesOptions< - T extends object = Record, -> { +export interface SubscribeChangesOptions { /** Whether to include the current state as initial changes */ includeInitialState?: boolean - /** Filter changes using a where expression */ - where?: (row: SingleRowRefProxy) => any /** Pre-compiled expression for filtering changes */ whereExpression?: BasicExpression } +export interface SubscribeChangesSnapshotOptions + extends Omit { + orderBy?: OrderBy + limit?: number +} + /** * Options for getting current state as changes */ -export interface CurrentStateAsChangesOptions< - T extends object = Record, -> { - /** Filter the current state using a where expression */ - where?: (row: SingleRowRefProxy) => any +export interface CurrentStateAsChangesOptions { /** Pre-compiled expression for filtering the current state */ - whereExpression?: BasicExpression + where?: BasicExpression + orderBy?: OrderBy + limit?: number + optimizedOnly?: boolean } /** diff --git a/packages/db/tests/collection-auto-index.test.ts b/packages/db/tests/collection-auto-index.test.ts index a4d464682..118b4a84d 100644 --- a/packages/db/tests/collection-auto-index.test.ts +++ b/packages/db/tests/collection-auto-index.test.ts @@ -16,6 +16,7 @@ import { expectIndexUsage, withIndexTracking, } from "./utils" +import { PropRef } from "../src/query/ir" // Global row proxy for expressions const row = createSingleRowRefProxy() @@ -104,7 +105,7 @@ describe(`Collection Auto-Indexing`, () => { // Subscribe with a where expression const changes: Array = [] - const unsubscribe = autoIndexCollection.subscribeChanges( + const subscription = autoIndexCollection.subscribeChanges( (items) => { changes.push(...items) }, @@ -117,7 +118,7 @@ describe(`Collection Auto-Indexing`, () => { // Should still have no indexes after subscription expect(autoIndexCollection.indexes.size).toBe(0) - unsubscribe() + subscription.unsubscribe() }) it(`should create auto-indexes by default when autoIndex is not specified`, async () => { @@ -146,7 +147,7 @@ describe(`Collection Auto-Indexing`, () => { // Subscribe with a where expression const changes: Array = [] - const unsubscribe = autoIndexCollection.subscribeChanges( + const subscription = autoIndexCollection.subscribeChanges( (items) => { changes.push(...items) }, @@ -163,7 +164,7 @@ describe(`Collection Auto-Indexing`, () => { expect(autoIndex.expression.type).toBe(`ref`) expect((autoIndex.expression as any).path).toEqual([`status`]) - unsubscribe() + subscription.unsubscribe() }) it(`should create auto-indexes for simple where expressions when autoIndex is "eager"`, async () => { @@ -193,7 +194,7 @@ describe(`Collection Auto-Indexing`, () => { // Subscribe with a where expression const changes: Array = [] - const unsubscribe = autoIndexCollection.subscribeChanges( + const subscription = autoIndexCollection.subscribeChanges( (items) => { changes.push(...items) }, @@ -210,7 +211,7 @@ describe(`Collection Auto-Indexing`, () => { expect(autoIndex.expression.type).toBe(`ref`) expect((autoIndex.expression as any).path).toEqual([`status`]) - unsubscribe() + subscription.unsubscribe() }) it(`should create auto-indexes for transformed fields of subqueries when autoIndex is "eager"`, async () => {}) @@ -238,15 +239,15 @@ describe(`Collection Auto-Indexing`, () => { await autoIndexCollection.stateWhenReady() // Subscribe with the same where expression multiple times - const unsubscribe1 = autoIndexCollection.subscribeChanges(() => {}, { + const subscription1 = autoIndexCollection.subscribeChanges(() => {}, { whereExpression: eq(row.status, `active`), }) - const unsubscribe2 = autoIndexCollection.subscribeChanges(() => {}, { + const subscription2 = autoIndexCollection.subscribeChanges(() => {}, { whereExpression: eq(row.status, `inactive`), }) - const unsubscribe3 = autoIndexCollection.subscribeChanges(() => {}, { + const subscription3 = autoIndexCollection.subscribeChanges(() => {}, { whereExpression: eq(row.status, `pending`), }) @@ -257,9 +258,9 @@ describe(`Collection Auto-Indexing`, () => { expect(autoIndex.expression.type).toBe(`ref`) expect((autoIndex.expression as any).path).toEqual([`status`]) - unsubscribe1() - unsubscribe2() - unsubscribe3() + subscription1.unsubscribe() + subscription2.unsubscribe() + subscription3.unsubscribe() }) it(`should create auto-indexes for different supported operations`, async () => { @@ -285,15 +286,15 @@ describe(`Collection Auto-Indexing`, () => { await autoIndexCollection.stateWhenReady() // Subscribe with different operations on different fields - const unsubscribe1 = autoIndexCollection.subscribeChanges(() => {}, { + const subscription1 = autoIndexCollection.subscribeChanges(() => {}, { whereExpression: eq(row.status, `active`), }) - const unsubscribe2 = autoIndexCollection.subscribeChanges(() => {}, { + const subscription2 = autoIndexCollection.subscribeChanges(() => {}, { whereExpression: gt(row.age, 25), }) - const unsubscribe3 = autoIndexCollection.subscribeChanges(() => {}, { + const subscription3 = autoIndexCollection.subscribeChanges(() => {}, { whereExpression: lte(row.score, 90), }) @@ -308,9 +309,9 @@ describe(`Collection Auto-Indexing`, () => { expect(indexPaths).toContainEqual([`age`]) expect(indexPaths).toContainEqual([`score`]) - unsubscribe1() - unsubscribe2() - unsubscribe3() + subscription1.unsubscribe() + subscription2.unsubscribe() + subscription3.unsubscribe() }) it(`should create auto-indexes for AND expressions`, async () => { @@ -336,7 +337,7 @@ describe(`Collection Auto-Indexing`, () => { await autoIndexCollection.stateWhenReady() // Subscribe with AND expression that should create indexes for both fields - const unsubscribe1 = autoIndexCollection.subscribeChanges(() => {}, { + const subscription = autoIndexCollection.subscribeChanges(() => {}, { whereExpression: and(eq(row.status, `active`), gt(row.age, 25)), }) @@ -350,7 +351,7 @@ describe(`Collection Auto-Indexing`, () => { expect(indexPaths).toContainEqual([`status`]) expect(indexPaths).toContainEqual([`age`]) - unsubscribe1() + subscription.unsubscribe() }) it(`should not create auto-indexes for OR expressions`, async () => { @@ -376,14 +377,14 @@ describe(`Collection Auto-Indexing`, () => { await autoIndexCollection.stateWhenReady() // Subscribe with OR expression that shouldn't create auto-indexes - const unsubscribe1 = autoIndexCollection.subscribeChanges(() => {}, { + const subscription = autoIndexCollection.subscribeChanges(() => {}, { whereExpression: or(eq(row.status, `active`), eq(row.status, `pending`)), }) // Should not have created any auto-indexes for OR expressions expect(autoIndexCollection.indexes.size).toBe(0) - unsubscribe1() + subscription.unsubscribe() }) it(`should create auto-indexes for complex AND expressions with multiple fields`, async () => { @@ -409,7 +410,7 @@ describe(`Collection Auto-Indexing`, () => { await autoIndexCollection.stateWhenReady() // Subscribe with complex AND expression that should create indexes for all fields - const unsubscribe1 = autoIndexCollection.subscribeChanges(() => {}, { + const subscription = autoIndexCollection.subscribeChanges(() => {}, { whereExpression: and( eq(row.status, `active`), gt(row.age, 25), @@ -428,7 +429,7 @@ describe(`Collection Auto-Indexing`, () => { expect(indexPaths).toContainEqual([`age`]) expect(indexPaths).toContainEqual([`score`]) - unsubscribe1() + subscription.unsubscribe() }) it(`should create auto-indexes for join key on lazy collection when joining`, async () => { @@ -685,19 +686,19 @@ describe(`Collection Auto-Indexing`, () => { await autoIndexCollection.stateWhenReady() // Subscribe with unsupported operations - const unsubscribe1 = autoIndexCollection.subscribeChanges(() => {}, { + const subscription1 = autoIndexCollection.subscribeChanges(() => {}, { whereExpression: gt(length(row.name), 3), }) - const unsubscribe2 = autoIndexCollection.subscribeChanges(() => {}, { + const subscription2 = autoIndexCollection.subscribeChanges(() => {}, { whereExpression: not(eq(row.status, `active`)), }) // Should not have created any auto-indexes for unsupported operations expect(autoIndexCollection.indexes.size).toBe(0) - unsubscribe1() - unsubscribe2() + subscription1.unsubscribe() + subscription2.unsubscribe() }) it(`should use auto-created indexes for query optimization`, async () => { @@ -723,7 +724,7 @@ describe(`Collection Auto-Indexing`, () => { await autoIndexCollection.stateWhenReady() // Subscribe to create auto-index - const unsubscribe = autoIndexCollection.subscribeChanges(() => {}, { + const subscription = autoIndexCollection.subscribeChanges(() => {}, { whereExpression: eq(row.status, `active`), }) @@ -733,8 +734,8 @@ describe(`Collection Auto-Indexing`, () => { // Test that the auto-index is used for queries withIndexTracking(autoIndexCollection, (tracker) => { const result = autoIndexCollection.currentStateAsChanges({ - whereExpression: eq(row.status, `active`), - }) + where: eq(new PropRef([`status`]), `active`), + })! expect(result.length).toBeGreaterThan(0) @@ -747,6 +748,6 @@ describe(`Collection Auto-Indexing`, () => { }) }) - unsubscribe() + subscription.unsubscribe() }) }) diff --git a/packages/db/tests/collection-events.test.ts b/packages/db/tests/collection-events.test.ts index d49caef8c..518f3b7f3 100644 --- a/packages/db/tests/collection-events.test.ts +++ b/packages/db/tests/collection-events.test.ts @@ -51,7 +51,7 @@ describe(`Collection Events System`, () => { const subscribersChangeListener = vi.fn() collection.on(`subscribers:change`, subscribersChangeListener) - const unsubscribe = collection.subscribeChanges(() => {}) + const subscription = collection.subscribeChanges(() => {}) expect(subscribersChangeListener).toHaveBeenCalledWith({ type: `subscribers:change`, @@ -60,7 +60,7 @@ describe(`Collection Events System`, () => { subscriberCount: 1, }) - unsubscribe() + subscription.unsubscribe() }) }) @@ -102,7 +102,7 @@ describe(`Collection Events System`, () => { collection.on(`subscribers:change`, subscribersListener) collection.startSyncImmediate() - const unsubscribe = collection.subscribeChanges(() => {}) + const subscription = collection.subscribeChanges(() => {}) expect(statusListener.mock.calls[0]?.[0]).toMatchObject({ type: `status:change`, @@ -118,7 +118,7 @@ describe(`Collection Events System`, () => { subscriberCount: expect.any(Number), }) - unsubscribe() + subscription.unsubscribe() }) }) diff --git a/packages/db/tests/collection-indexes.test.ts b/packages/db/tests/collection-indexes.test.ts index 2c1db86fa..913cfae3f 100644 --- a/packages/db/tests/collection-indexes.test.ts +++ b/packages/db/tests/collection-indexes.test.ts @@ -13,6 +13,7 @@ import { lte, or, } from "../src/query/builder/functions" +import { PropRef } from "../src/query/ir" import { expectIndexUsage, withIndexTracking } from "./utils" import type { Collection } from "../src/collection" import type { MutationFn, PendingMutation } from "../src/types" @@ -192,7 +193,7 @@ describe(`Collection Indexes`, () => { const changes: Array = [] // Subscribe to all changes - const unsubscribe = collection.subscribeChanges((items) => { + const subscription = collection.subscribeChanges((items) => { changes.push(...items) }) @@ -217,15 +218,23 @@ describe(`Collection Indexes`, () => { expect(changes[0]?.type).toBe(`insert`) expect(changes[0]?.value.name).toBe(`Frank`) - unsubscribe() + subscription.unsubscribe() }) it(`should reflect updates in collection state and subscriptions`, async () => { const changes: Array = [] - const unsubscribe = collection.subscribeChanges((items) => { - changes.push(...items) - }) + const subscription = collection.subscribeChanges( + (items) => { + changes.push(...items) + }, + { + includeInitialState: true, + } + ) + + // Clear the changes array + changes.length = 0 const tx = createTransaction({ mutationFn }) tx.mutate(() => @@ -246,16 +255,53 @@ describe(`Collection Indexes`, () => { expect(changes[0]?.type).toBe(`update`) expect(changes[0]?.value.status).toBe(`inactive`) - unsubscribe() + subscription.unsubscribe() }) - it(`should reflect deletions in collection state and subscriptions`, async () => { + it(`should send insert to subscription when updating collection state that has not yet been sent over the subscription`, async () => { const changes: Array = [] - const unsubscribe = collection.subscribeChanges((items) => { + const subscription = collection.subscribeChanges((items) => { changes.push(...items) }) + const tx = createTransaction({ mutationFn }) + tx.mutate(() => + collection.update(`1`, (draft) => { + draft.status = `inactive` + draft.age = 26 + }) + ) + await tx.isPersisted.promise + + // Updated item should be in collection state + const updatedItem = collection.get(`1`) + expect(updatedItem?.status).toBe(`inactive`) + expect(updatedItem?.age).toBe(26) + + // Should trigger subscription + expect(changes).toHaveLength(1) + expect(changes[0]?.type).toBe(`insert`) + expect(changes[0]?.value.status).toBe(`inactive`) + + subscription.unsubscribe() + }) + + it(`should reflect deletions in collection state and subscriptions`, async () => { + const changes: Array = [] + + const subscription = collection.subscribeChanges( + (items) => { + changes.push(...items) + }, + { + includeInitialState: true, + } + ) + + // Clear the changes + changes.length = 0 + const tx = createTransaction({ mutationFn }) tx.mutate(() => collection.delete(`1`)) await tx.isPersisted.promise @@ -275,21 +321,46 @@ describe(`Collection Indexes`, () => { ) expect(deleteEvents.length).toBe(changes.length) // All events should be the same delete - unsubscribe() + subscription.unsubscribe() + }) + + it(`should filter out deletions in collection state if that key was not sent by the subscription`, async () => { + const changes: Array = [] + + const subscription = collection.subscribeChanges((items) => { + changes.push(...items) + }) + + const tx = createTransaction({ mutationFn }) + tx.mutate(() => collection.delete(`1`)) + await tx.isPersisted.promise + + // Item should be removed from collection state + expect(collection.size).toBe(4) + expect(collection.get(`1`)).toBeUndefined() + + // Should trigger subscription (may be called multiple times in test environment) + expect(changes.length).toBeGreaterThanOrEqual(0) + + subscription.unsubscribe() }) it(`should handle filtered subscriptions correctly with mutations`, async () => { const activeChanges: Array = [] - const unsubscribe = collection.subscribeChanges( + const subscription = collection.subscribeChanges( (items) => { activeChanges.push(...items) }, { - where: (row) => eq(row.status, `active`), + whereExpression: eq(new PropRef([`status`]), `active`), + includeInitialState: true, } ) + // Clear the changes + activeChanges.length = 0 + // Change inactive item to active (should trigger) const tx1 = createTransaction({ mutationFn }) tx1.mutate(() => @@ -318,7 +389,49 @@ describe(`Collection Indexes`, () => { expect(activeChanges[0]?.key).toBe(`1`) expect(activeChanges[0]?.value.status).toBe(`active`) // Should be the previous value - unsubscribe() + subscription.unsubscribe() + }) + + it(`should not send delete change on move-out when the key was never sent to the subscribers`, async () => { + const activeChanges: Array = [] + + const subscription = collection.subscribeChanges( + (items) => { + activeChanges.push(...items) + }, + { + whereExpression: eq(new PropRef([`status`]), `active`), + } + ) + + // Change inactive item to active (should trigger) + const tx1 = createTransaction({ mutationFn }) + tx1.mutate(() => + collection.update(`2`, (draft) => { + draft.status = `active` + }) + ) + await tx1.isPersisted.promise + + expect(activeChanges).toHaveLength(1) + expect(activeChanges[0]?.value.name).toBe(`Bob`) + + // Change active item to inactive (should trigger delete event for item leaving filter) + activeChanges.length = 0 + const tx2 = createTransaction({ mutationFn }) + tx2.mutate(() => + collection.update(`1`, (draft) => { + draft.status = `inactive` + }) + ) + await tx2.isPersisted.promise + + // Subscriber shoiuld not receive any changes + // because it is not aware of that key + // so it should also not receive the delete of that key + expect(activeChanges).toHaveLength(0) + + subscription.unsubscribe() }) }) @@ -330,8 +443,8 @@ describe(`Collection Indexes`, () => { it(`should perform equality queries`, () => { withIndexTracking(collection, (tracker) => { const result = collection.currentStateAsChanges({ - where: (row) => eq(row.age, 25), - }) + where: eq(new PropRef([`age`]), 25), + })! expect(result).toHaveLength(1) expect(result[0]?.value.name).toBe(`Alice`) @@ -349,8 +462,8 @@ describe(`Collection Indexes`, () => { it(`should perform greater than queries`, () => { withIndexTracking(collection, (tracker) => { const result = collection.currentStateAsChanges({ - where: (row) => gt(row.age, 28), - }) + where: gt(new PropRef([`age`]), 28), + })! expect(result).toHaveLength(2) const names = result.map((r) => r.value.name).sort() @@ -369,8 +482,8 @@ describe(`Collection Indexes`, () => { it(`should perform greater than or equal queries`, () => { withIndexTracking(collection, (tracker) => { const result = collection.currentStateAsChanges({ - where: (row) => gte(row.age, 28), - }) + where: gte(new PropRef([`age`]), 28), + })! expect(result).toHaveLength(3) const names = result.map((r) => r.value.name).sort() @@ -389,8 +502,8 @@ describe(`Collection Indexes`, () => { it(`should perform less than queries`, () => { withIndexTracking(collection, (tracker) => { const result = collection.currentStateAsChanges({ - where: (row) => lt(row.age, 28), - }) + where: lt(new PropRef([`age`]), 28), + })! expect(result).toHaveLength(2) const names = result.map((r) => r.value.name).sort() @@ -409,8 +522,8 @@ describe(`Collection Indexes`, () => { it(`should perform less than or equal queries`, () => { withIndexTracking(collection, (tracker) => { const result = collection.currentStateAsChanges({ - where: (row) => lte(row.age, 28), - }) + where: lte(new PropRef([`age`]), 28), + })! expect(result).toHaveLength(3) const names = result.map((r) => r.value.name).sort() @@ -431,8 +544,8 @@ describe(`Collection Indexes`, () => { // This should work but use full scan since it's not a simple comparison // Using a complex expression that can't be optimized with indexes const result = collection.currentStateAsChanges({ - where: (row) => gt(length(row.name), 3), - }) + where: gt(length(new PropRef([`name`])), 3), + })! expect(result).toHaveLength(3) // Alice, Charlie, Diana (names longer than 3 chars) const names = result.map((r) => r.value.name).sort() @@ -452,8 +565,8 @@ describe(`Collection Indexes`, () => { withIndexTracking(collection, (tracker) => { // This should use index optimization const result = collection.currentStateAsChanges({ - where: (row) => eq(row.age, 25), - }) + where: eq(new PropRef([`age`]), 25), + })! expect(result).toHaveLength(1) expect(result[0]?.value.name).toBe(`Alice`) @@ -481,13 +594,13 @@ describe(`Collection Indexes`, () => { withIndexTracking(collection, (tracker) => { // Test multiple range operations const eqResult = collection.currentStateAsChanges({ - where: (row) => eq(row.age, 25), + where: eq(new PropRef([`age`]), 25), }) const gtResult = collection.currentStateAsChanges({ - where: (row) => gt(row.age, 30), + where: gt(new PropRef([`age`]), 30), }) const lteResult = collection.currentStateAsChanges({ - where: (row) => lte(row.age, 28), + where: lte(new PropRef([`age`]), 28), }) expect(eqResult).toHaveLength(1) @@ -523,8 +636,8 @@ describe(`Collection Indexes`, () => { withIndexTracking(collection, (tracker) => { // This should fall back to full scan const result = collection.currentStateAsChanges({ - where: (row) => gt(length(row.name), 3), - }) + where: gt(length(new PropRef([`name`])), 3), + })! expect(result).toHaveLength(3) // Alice, Charlie, Diana @@ -546,7 +659,7 @@ describe(`Collection Indexes`, () => { withIndexTracking(collection, (tracker) => { // Query on a field without an index (status) const result = collection.currentStateAsChanges({ - where: (row) => eq(row.status, `active`), + where: eq(new PropRef([`status`]), `active`), }) expect(result).toHaveLength(3) // Alice, Charlie, Eve @@ -572,8 +685,11 @@ describe(`Collection Indexes`, () => { withIndexTracking(collection, (tracker) => { // Test the key case: range query with AND const result = collection.currentStateAsChanges({ - where: (row) => and(gt(row.age, 25), lt(row.age, 35)), - }) + where: and( + gt(new PropRef([`age`]), 25), + lt(new PropRef([`age`]), 35) + ), + })! expect(result).toHaveLength(2) // Bob (30), Diana (28) const names = result.map((r) => r.value.name).sort() @@ -601,8 +717,11 @@ describe(`Collection Indexes`, () => { it(`should optimize AND queries with multiple field conditions`, () => { withIndexTracking(collection, (tracker) => { const result = collection.currentStateAsChanges({ - where: (row) => and(eq(row.status, `active`), gte(row.age, 25)), - }) + where: and( + eq(new PropRef([`status`]), `active`), + gte(new PropRef([`age`]), 25) + ), + })! expect(result).toHaveLength(2) // Alice (25, active), Charlie (35, active) const names = result.map((r) => r.value.name).sort() @@ -636,8 +755,8 @@ describe(`Collection Indexes`, () => { it(`should optimize OR queries using indexes`, () => { withIndexTracking(collection, (tracker) => { const result = collection.currentStateAsChanges({ - where: (row) => or(eq(row.age, 25), eq(row.age, 35)), - }) + where: or(eq(new PropRef([`age`]), 25), eq(new PropRef([`age`]), 35)), + })! expect(result).toHaveLength(2) // Alice (25), Charlie (35) const names = result.map((r) => r.value.name).sort() @@ -671,8 +790,8 @@ describe(`Collection Indexes`, () => { it(`should optimize inArray queries using indexes`, () => { withIndexTracking(collection, (tracker) => { const result = collection.currentStateAsChanges({ - where: (row) => inArray(row.status, [`active`, `pending`]), - }) + where: inArray(new PropRef([`status`]), [`active`, `pending`]), + })! expect(result).toHaveLength(4) // Alice, Charlie, Eve (active), Diana (pending) const names = result.map((r) => r.value.name).sort() @@ -701,12 +820,11 @@ describe(`Collection Indexes`, () => { withIndexTracking(collection, (tracker) => { // (age >= 25 AND age <= 30) OR status = 'pending' const result = collection.currentStateAsChanges({ - where: (row) => - or( - and(gte(row.age, 25), lte(row.age, 30)), - eq(row.status, `pending`) - ), - }) + where: or( + and(gte(new PropRef([`age`]), 25), lte(new PropRef([`age`]), 30)), + eq(new PropRef([`status`]), `pending`) + ), + })! expect(result).toHaveLength(3) // Alice (25), Bob (30), Diana (28, pending) const names = result.map((r) => r.value.name).sort() @@ -741,12 +859,11 @@ describe(`Collection Indexes`, () => { withIndexTracking(collection, (tracker) => { // Mix of optimizable and non-optimizable conditions const result = collection.currentStateAsChanges({ - where: (row) => - and( - eq(row.status, `active`), // Can optimize with index - gt(row.age, 24) // Can also optimize - will be AND combined - ), - }) + where: and( + eq(new PropRef([`status`]), `active`), // Can optimize with index + gt(new PropRef([`age`]), 24) // Can also optimize - will be AND combined + ), + })! expect(result).toHaveLength(2) // Alice (25), Charlie (35) - both active and age > 24 const names = result.map((r) => r.value.name).sort() @@ -766,12 +883,11 @@ describe(`Collection Indexes`, () => { withIndexTracking(collection, (tracker) => { // Query on a field without an index (name) const result = collection.currentStateAsChanges({ - where: (row) => - and( - eq(row.age, 25), // Has index - eq(row.name, `Alice`) // No index on name - ), - }) + where: and( + eq(new PropRef([`age`]), 25), // Has index + eq(new PropRef([`name`]), `Alice`) // No index on name + ), + })! expect(result).toHaveLength(1) // Alice (25, name Alice) expect(result[0]?.value.name).toBe(`Alice`) @@ -790,8 +906,8 @@ describe(`Collection Indexes`, () => { withIndexTracking(collection, (tracker) => { // Only complex expressions that can't be optimized const result = collection.currentStateAsChanges({ - where: (row) => gt(length(row.name), 3), - }) + where: gt(length(new PropRef([`name`])), 3), + })! expect(result).toHaveLength(3) // Alice, Charlie, Diana (names > 3 chars) const names = result.map((r) => r.value.name).sort() @@ -811,12 +927,11 @@ describe(`Collection Indexes`, () => { withIndexTracking(collection, (tracker) => { // Complex expression involving function calls - no simple field comparisons const result = collection.currentStateAsChanges({ - where: (row) => - and( - gt(length(row.name), 4), // Complex - can't optimize (Alice=5, Charlie=7, Diana=5) - gt(length(row.status), 6) // Complex - can't optimize (only "inactive" = 8 > 6) - ), - }) + where: and( + gt(length(new PropRef([`name`])), 4), // Complex - can't optimize (Alice=5, Charlie=7, Diana=5) + gt(length(new PropRef([`status`])), 6) // Complex - can't optimize (only "inactive" = 8 > 6) + ), + })! expect(result).toHaveLength(1) // Only Diana has name>4 AND status>6 (Diana name=5, status="pending"=7) const names = result.map((r) => r.value.name).sort() @@ -836,12 +951,11 @@ describe(`Collection Indexes`, () => { withIndexTracking(collection, (tracker) => { // OR with complex conditions that can't be optimized const result = collection.currentStateAsChanges({ - where: (row) => - or( - gt(length(row.name), 6), // Complex - can't optimize (only Charlie has name length 7 > 6) - gt(length(row.status), 7) // Complex - can't optimize (only Bob has status "inactive" = 8 > 7) - ), - }) + where: or( + gt(length(new PropRef([`name`])), 6), // Complex - can't optimize (only Charlie has name length 7 > 6) + gt(length(new PropRef([`status`])), 7) // Complex - can't optimize (only Bob has status "inactive" = 8 > 7) + ), + })! expect(result).toHaveLength(2) // Charlie (name length 7 > 6), Bob (status length 8 > 7) const names = result.map((r) => r.value.name).sort() @@ -861,8 +975,11 @@ describe(`Collection Indexes`, () => { withIndexTracking(collection, (tracker) => { // Query only on fields without indexes (name and score fields don't have indexes) const result = collection.currentStateAsChanges({ - where: (row) => and(eq(row.name, `Alice`), eq(row.score, 95)), - }) + where: and( + eq(new PropRef([`name`]), `Alice`), + eq(new PropRef([`score`]), 95) + ), + })! expect(result).toHaveLength(1) // Alice expect(result[0]?.value.name).toBe(`Alice`) @@ -883,11 +1000,10 @@ describe(`Collection Indexes`, () => { // First: partial optimization (age index + name filter) withIndexTracking(collection, (tracker1) => { const result1 = collection.currentStateAsChanges({ - where: (row) => - and( - eq(row.age, 25), // Can optimize - has index - eq(row.name, `Alice`) // Can't optimize - no index - ), + where: and( + eq(new PropRef([`age`]), 25), // Can optimize - has index + eq(new PropRef([`name`]), `Alice`) // Can't optimize - no index + ), }) expect(result1).toHaveLength(1) // Alice via partial optimization @@ -902,11 +1018,10 @@ describe(`Collection Indexes`, () => { // Second: full scan (no optimizable conditions) withIndexTracking(collection, (tracker2) => { const result2 = collection.currentStateAsChanges({ - where: (row) => - and( - eq(row.name, `Alice`), // Can't optimize - no index - gt(length(row.name), 3) // Can't optimize - complex expression - ), + where: and( + eq(new PropRef([`name`]), `Alice`), // Can't optimize - no index + gt(length(new PropRef([`name`])), 3) // Can't optimize - complex expression + ), }) expect(result2).toHaveLength(1) // Alice via full scan @@ -936,17 +1051,17 @@ describe(`Collection Indexes`, () => { withIndexTracking(collection, (tracker) => { // Query using age index const ageQuery = collection.currentStateAsChanges({ - where: (row) => gte(row.age, 30), + where: gte(new PropRef([`age`]), 30), }) // Query using status index const statusQuery = collection.currentStateAsChanges({ - where: (row) => eq(row.status, `active`), + where: eq(new PropRef([`status`]), `active`), }) // Query using name index const nameQuery = collection.currentStateAsChanges({ - where: (row) => eq(row.name, `Alice`), + where: eq(new PropRef([`name`]), `Alice`), }) expect(ageQuery).toHaveLength(2) // Bob (30), Charlie (35) @@ -987,11 +1102,11 @@ describe(`Collection Indexes`, () => { const changes: Array = [] // Subscribe with a where clause that should use index - const unsubscribe = collection.subscribeChanges( + const subscription = collection.subscribeChanges( (items) => changes.push(...items), { includeInitialState: true, - where: (row) => eq(row.status, `active`), + whereExpression: eq(new PropRef([`status`]), `active`), } ) @@ -1005,7 +1120,7 @@ describe(`Collection Indexes`, () => { fullScanCallCount: 0, }) - unsubscribe() + subscription.unsubscribe() }) }) }) @@ -1020,13 +1135,13 @@ describe(`Collection Indexes`, () => { await withIndexTracking(collection, async (tracker) => { const changes: Array = [] - const unsubscribe = collection.subscribeChanges( + const subscription = collection.subscribeChanges( (items) => { changes.push(...items) }, { includeInitialState: true, - where: (row) => eq(row.status, `active`), + whereExpression: eq(new PropRef([`status`]), `active`), } ) @@ -1093,7 +1208,7 @@ describe(`Collection Indexes`, () => { expect(changes[0]?.key).toBe(`1`) expect(changes[0]?.value.status).toBe(`active`) // Should be the previous value - unsubscribe() + subscription.unsubscribe() }) }) @@ -1101,13 +1216,13 @@ describe(`Collection Indexes`, () => { await withIndexTracking(collection, async (tracker) => { const changes: Array = [] - const unsubscribe = collection.subscribeChanges( + const subscription = collection.subscribeChanges( (items) => { changes.push(...items) }, { includeInitialState: true, - where: (row) => gte(row.age, 30), + whereExpression: gte(new PropRef([`age`]), 30), } ) @@ -1138,7 +1253,7 @@ describe(`Collection Indexes`, () => { expect(changes).toHaveLength(1) expect(changes[0]?.value.name).toBe(`Diana`) - unsubscribe() + subscription.unsubscribe() }) }) @@ -1148,13 +1263,13 @@ describe(`Collection Indexes`, () => { await withIndexTracking(collection, (tracker) => { const changes: Array = [] - const unsubscribe = collection.subscribeChanges( + const subscription = collection.subscribeChanges( (items) => { changes.push(...items) }, { includeInitialState: true, - where: (row) => eq(row.status, `active`), + whereExpression: eq(new PropRef([`status`]), `active`), } ) @@ -1168,7 +1283,7 @@ describe(`Collection Indexes`, () => { fullScanCallCount: 0, }) - unsubscribe() + subscription.unsubscribe() }) }) }) @@ -1234,22 +1349,22 @@ describe(`Collection Indexes`, () => { withIndexTracking(specialCollection, (tracker) => { // Query for zero age const zeroAgeResult = specialCollection.currentStateAsChanges({ - where: (row) => eq(row.age, 0), - }) + where: eq(new PropRef([`age`]), 0), + })! expect(zeroAgeResult).toHaveLength(1) expect(zeroAgeResult[0]?.value.name).toBe(`Zero Age`) // Query for negative age const negativeAgeResult = specialCollection.currentStateAsChanges({ - where: (row) => eq(row.age, -5), - }) + where: eq(new PropRef([`age`]), -5), + })! expect(negativeAgeResult).toHaveLength(1) expect(negativeAgeResult[0]?.value.name).toBe(`Negative Age`) // Query for ages greater than negative const gtNegativeResult = specialCollection.currentStateAsChanges({ - where: (row) => gt(row.age, -1), - }) + where: gt(new PropRef([`age`]), -1), + })! expect(gtNegativeResult.length).toBeGreaterThan(0) // Should find positive ages // Verify all queries used indexes @@ -1319,8 +1434,8 @@ describe(`Collection Indexes`, () => { // Test that index-optimized queries work with the updated data withIndexTracking(collection, (tracker) => { const result = collection.currentStateAsChanges({ - where: (row) => gte(row.age, 50), - }) + where: gte(new PropRef([`age`]), 50), + })! // Should find items with age >= 50 using index expect(result.length).toBeGreaterThanOrEqual(1) diff --git a/packages/db/tests/collection-lifecycle.test.ts b/packages/db/tests/collection-lifecycle.test.ts index 62c32dfc7..22ad48817 100644 --- a/packages/db/tests/collection-lifecycle.test.ts +++ b/packages/db/tests/collection-lifecycle.test.ts @@ -139,7 +139,7 @@ describe(`Collection Lifecycle Management`, () => { expect(collection.status).toBe(`idle`) - const unsubscribe = collection.subscribeChanges(() => {}) + const subscription = collection.subscribeChanges(() => {}) expect(collection.status).toBe(`loading`) @@ -150,7 +150,7 @@ describe(`Collection Lifecycle Management`, () => { expect(collection.status).toBe(`ready`) - unsubscribe() + subscription.unsubscribe() expect(collection.status).toBe(`ready`) }) @@ -203,40 +203,17 @@ describe(`Collection Lifecycle Management`, () => { expect((collection as any).activeSubscribersCount).toBe(0) // Subscribe to changes - const unsubscribe1 = collection.subscribeChanges(() => {}) + const subscription1 = collection.subscribeChanges(() => {}) expect((collection as any).activeSubscribersCount).toBe(1) - const unsubscribe2 = collection.subscribeChanges(() => {}) + const subscription2 = collection.subscribeChanges(() => {}) expect((collection as any).activeSubscribersCount).toBe(2) // Unsubscribe - unsubscribe1() + subscription1.unsubscribe() expect((collection as any).activeSubscribersCount).toBe(1) - unsubscribe2() - expect((collection as any).activeSubscribersCount).toBe(0) - }) - - it(`should track key-specific subscribers`, () => { - const collection = createCollection<{ id: string; name: string }>({ - id: `key-subscriber-test`, - getKey: (item) => item.id, - sync: { - sync: () => {}, - }, - }) - - const unsubscribe1 = collection.subscribeChangesKey(`key1`, () => {}) - const unsubscribe2 = collection.subscribeChangesKey(`key2`, () => {}) - const unsubscribe3 = collection.subscribeChangesKey(`key1`, () => {}) - - expect((collection as any).activeSubscribersCount).toBe(3) - - unsubscribe1() - expect((collection as any).activeSubscribersCount).toBe(2) - - unsubscribe2() - unsubscribe3() + subscription2.unsubscribe() expect((collection as any).activeSubscribersCount).toBe(0) }) @@ -252,9 +229,9 @@ describe(`Collection Lifecycle Management`, () => { // Subscribe and immediately unsubscribe multiple times for (let i = 0; i < 5; i++) { - const unsubscribe = collection.subscribeChanges(() => {}) + const subscription = collection.subscribeChanges(() => {}) expect((collection as any).activeSubscribersCount).toBe(1) - unsubscribe() + subscription.unsubscribe() expect((collection as any).activeSubscribersCount).toBe(0) // Should start GC timer each time @@ -276,12 +253,12 @@ describe(`Collection Lifecycle Management`, () => { }, }) - const unsubscribe = collection.subscribeChanges(() => {}) + const subscription = collection.subscribeChanges(() => {}) // Should not have GC timer while there are subscribers expect(mockSetTimeout).not.toHaveBeenCalled() - unsubscribe() + subscription.unsubscribe() // Should start GC timer when last subscriber is removed expect(mockSetTimeout).toHaveBeenCalledWith(expect.any(Function), 5000) @@ -297,17 +274,17 @@ describe(`Collection Lifecycle Management`, () => { }, }) - const unsubscribe1 = collection.subscribeChanges(() => {}) - unsubscribe1() + const subscription1 = collection.subscribeChanges(() => {}) + subscription1.unsubscribe() expect(mockSetTimeout).toHaveBeenCalledTimes(1) const timerId = mockSetTimeout.mock.results[0]?.value // Add new subscriber should cancel GC timer - const unsubscribe2 = collection.subscribeChanges(() => {}) + const subscription2 = collection.subscribeChanges(() => {}) expect(mockClearTimeout).toHaveBeenCalledWith(timerId) - unsubscribe2() + subscription2.unsubscribe() }) it(`should cleanup collection when GC timer fires`, () => { @@ -320,8 +297,8 @@ describe(`Collection Lifecycle Management`, () => { }, }) - const unsubscribe = collection.subscribeChanges(() => {}) - unsubscribe() + const subscription = collection.subscribeChanges(() => {}) + subscription.unsubscribe() expect(collection.status).toBe(`loading`) // or "ready" @@ -343,8 +320,8 @@ describe(`Collection Lifecycle Management`, () => { }, }) - const unsubscribe = collection.subscribeChanges(() => {}) - unsubscribe() + const subscription = collection.subscribeChanges(() => {}) + subscription.unsubscribe() // Should use default 5 minutes (300000ms) expect(mockSetTimeout).toHaveBeenCalledWith(expect.any(Function), 300000) @@ -360,8 +337,8 @@ describe(`Collection Lifecycle Management`, () => { }, }) - const unsubscribe = collection.subscribeChanges(() => {}) - unsubscribe() + const subscription = collection.subscribeChanges(() => {}) + subscription.unsubscribe() // Should not start any timer when GC is disabled expect(mockSetTimeout).not.toHaveBeenCalled() @@ -458,7 +435,7 @@ describe(`Collection Lifecycle Management`, () => { }, }) - const unsubscribe = collection.subscribeChanges(() => {}) + const subscription = collection.subscribeChanges(() => {}) // Register callbacks collection.onFirstReady(() => callbacks.push(() => `callback1`)) @@ -479,7 +456,7 @@ describe(`Collection Lifecycle Management`, () => { } expect(callbacks).toHaveLength(2) - unsubscribe() + subscription.unsubscribe() }) }) }) diff --git a/packages/db/tests/collection-subscribe-changes.test.ts b/packages/db/tests/collection-subscribe-changes.test.ts index 6b3ea1d76..4abcc37e7 100644 --- a/packages/db/tests/collection-subscribe-changes.test.ts +++ b/packages/db/tests/collection-subscribe-changes.test.ts @@ -3,6 +3,7 @@ import mitt from "mitt" import { createCollection } from "../src/collection" import { createTransaction } from "../src/transactions" import { eq } from "../src/query/builder/functions" +import { PropRef } from "../src/query/ir" import type { ChangeMessage, ChangesPayload, @@ -42,7 +43,7 @@ describe(`Collection.subscribeChanges`, () => { // await waitForChanges() // Subscribe to changes - const unsubscribe = collection.subscribeChanges(callback, { + const subscription = collection.subscribeChanges(callback, { includeInitialState: true, }) @@ -61,7 +62,7 @@ describe(`Collection.subscribeChanges`, () => { expect(changes.every((change) => change.type === `insert`)).toBe(true) // Clean up - unsubscribe() + subscription.unsubscribe() }) it(`should not emit initial collection state as insert changes by default`, () => { @@ -92,13 +93,13 @@ describe(`Collection.subscribeChanges`, () => { // await waitForChanges() // Subscribe to changes - const unsubscribe = collection.subscribeChanges(callback) + const subscription = collection.subscribeChanges(callback) // Verify that callback was called with initial state expect(callback).toHaveBeenCalledTimes(0) // Clean up - unsubscribe() + subscription.unsubscribe() }) it(`should emit changes from synced operations`, () => { @@ -133,7 +134,7 @@ describe(`Collection.subscribeChanges`, () => { }) // Subscribe to changes - const unsubscribe = collection.subscribeChanges(callback) + const subscription = collection.subscribeChanges(callback) // Reset mock to ignore initial state emission callback.mockReset() @@ -210,7 +211,7 @@ describe(`Collection.subscribeChanges`, () => { expect(deleteChange.type).toBe(`delete`) // Clean up - unsubscribe() + subscription.unsubscribe() }) it(`should emit changes from optimistic operations`, () => { @@ -253,7 +254,7 @@ describe(`Collection.subscribeChanges`, () => { } // Subscribe to changes - const unsubscribe = collection.subscribeChanges(callback) + const subscription = collection.subscribeChanges(callback) // Reset mock to ignore initial state emission callback.mockReset() @@ -339,7 +340,7 @@ describe(`Collection.subscribeChanges`, () => { expect(deleteChange.key).toBe(1) // Clean up - unsubscribe() + subscription.unsubscribe() }) it(`should handle both synced and optimistic changes together`, async () => { @@ -379,7 +380,7 @@ describe(`Collection.subscribeChanges`, () => { } // Subscribe to changes - const unsubscribe = collection.subscribeChanges(callback) + const subscription = collection.subscribeChanges(callback) // Reset mock to ignore initial state emission callback.mockReset() @@ -487,7 +488,7 @@ describe(`Collection.subscribeChanges`, () => { expect(updateChange.value).toEqual({ id: 1, value: `updated synced value` }) // Clean up - unsubscribe() + subscription.unsubscribe() }) it(`should only emit differences between states, not whole state`, async () => { @@ -534,7 +535,7 @@ describe(`Collection.subscribeChanges`, () => { } // Subscribe to changes - const unsubscribe = collection.subscribeChanges(callback, { + const subscription = collection.subscribeChanges(callback, { includeInitialState: true, }) @@ -603,7 +604,7 @@ describe(`Collection.subscribeChanges`, () => { expect(updateChange.key).toBe(1) // Clean up - unsubscribe() + subscription.unsubscribe() }) it(`should correctly unsubscribe when returned function is called`, () => { @@ -623,7 +624,7 @@ describe(`Collection.subscribeChanges`, () => { const mutationFn = async () => {} // Subscribe to changes - const unsubscribe = collection.subscribeChanges(callback, { + const subscription = collection.subscribeChanges(callback, { includeInitialState: true, }) @@ -634,7 +635,7 @@ describe(`Collection.subscribeChanges`, () => { callback.mockReset() // Unsubscribe - unsubscribe() + subscription.unsubscribe() // Insert an item const tx = createTransaction({ mutationFn }) @@ -708,9 +709,9 @@ describe(`Collection.subscribeChanges`, () => { } // Subscribe to changes with a filter for active items only - const unsubscribe = collection.subscribeChanges(callback, { + const subscription = collection.subscribeChanges(callback, { includeInitialState: true, - where: (row) => eq(row.status, `active`), + whereExpression: eq(new PropRef([`status`]), `active`), }) // Should only receive the active item in initial state @@ -808,7 +809,7 @@ describe(`Collection.subscribeChanges`, () => { expect(callback).not.toHaveBeenCalled() // Clean up - unsubscribe() + subscription.unsubscribe() }) it(`should emit delete events for all items when truncate is called`, async () => { @@ -841,9 +842,14 @@ describe(`Collection.subscribeChanges`, () => { }) // Listen to change events - collection.subscribeChanges((changes) => { - changeEvents.push(...changes) - }) + collection.subscribeChanges( + (changes) => { + changeEvents.push(...changes) + }, + { + includeInitialState: true, + } + ) await collection.stateWhenReady() @@ -852,6 +858,8 @@ describe(`Collection.subscribeChanges`, () => { expect(collection.state.get(1)).toEqual({ id: 1, value: `initial value 1` }) expect(collection.state.get(2)).toEqual({ id: 2, value: `initial value 2` }) + expect(changeEvents).toHaveLength(2) + // Clear change events from initial state changeEvents.length = 0 @@ -911,9 +919,14 @@ describe(`Collection.subscribeChanges`, () => { }) // Listen to change events - collection.subscribeChanges((changes) => { - changeEvents.push(...changes) - }) + collection.subscribeChanges( + (changes) => { + changeEvents.push(...changes) + }, + { + includeInitialState: true, + } + ) await collection.stateWhenReady() @@ -1017,9 +1030,14 @@ describe(`Collection.subscribeChanges`, () => { }) // Listen to change events - collection.subscribeChanges((changes) => { - changeEvents.push(...changes) - }) + collection.subscribeChanges( + (changes) => { + changeEvents.push(...changes) + }, + { + includeInitialState: true, + } + ) await collection.stateWhenReady() @@ -1244,7 +1262,9 @@ describe(`Collection.subscribeChanges`, () => { }, }, }) - collection.subscribeChanges((c) => changeEvents.push(...c)) + collection.subscribeChanges((c) => changeEvents.push(...c), { + includeInitialState: true, + }) await collection.stateWhenReady() // Optimistic insert for id 2 (did not exist before) diff --git a/packages/db/tests/local-only.test.ts b/packages/db/tests/local-only.test.ts index 7155b5d17..7be3b59ec 100644 --- a/packages/db/tests/local-only.test.ts +++ b/packages/db/tests/local-only.test.ts @@ -122,7 +122,7 @@ describe(`LocalOnly Collection`, () => { const changeHandler = vi.fn() // Subscribe to changes - const unsubscribe = collection.subscribeChanges(changeHandler) + const subscription = collection.subscribeChanges(changeHandler) // Insert an item collection.insert({ id: 1, name: `Test Item` }) @@ -138,7 +138,7 @@ describe(`LocalOnly Collection`, () => { ]) // Clean up - unsubscribe() + subscription.unsubscribe() }) it(`should support toArray method`, () => { diff --git a/packages/db/tests/local-storage.test.ts b/packages/db/tests/local-storage.test.ts index 859042bdc..51e36f97d 100644 --- a/packages/db/tests/local-storage.test.ts +++ b/packages/db/tests/local-storage.test.ts @@ -201,13 +201,13 @@ describe(`localStorage collection`, () => { ) // Subscribe to trigger sync - const unsubscribe = collection.subscribeChanges(() => {}) + const subscription = collection.subscribeChanges(() => {}) // Should load the existing data expect(collection.size).toBe(1) expect(collection.get(`1`)?.title).toBe(`Existing Todo`) - unsubscribe() + subscription.unsubscribe() }) it(`should handle corrupted storage data gracefully`, () => { @@ -255,7 +255,7 @@ describe(`localStorage collection`, () => { ) // Subscribe to trigger sync - const unsubscribe = collection.subscribeChanges(() => {}) + const subscription = collection.subscribeChanges(() => {}) const todo: Todo = { id: `1`, @@ -296,7 +296,7 @@ describe(`localStorage collection`, () => { parsed = JSON.parse(storedData!) expect(parsed[`1`]).toBeUndefined() - unsubscribe() + subscription.unsubscribe() }) it(`should call mutation handlers when provided and still persist data`, async () => { @@ -317,7 +317,7 @@ describe(`localStorage collection`, () => { ) // Subscribe to trigger sync - const unsubscribe = collection.subscribeChanges(() => {}) + const subscription = collection.subscribeChanges(() => {}) const todo: Todo = { id: `1`, @@ -358,7 +358,7 @@ describe(`localStorage collection`, () => { parsed = JSON.parse(storedData!) expect(parsed[`1`]).toBeUndefined() - unsubscribe() + subscription.unsubscribe() }) it(`should perform insert operations and update storage`, async () => { @@ -423,7 +423,7 @@ describe(`localStorage collection`, () => { ) // Subscribe to trigger sync - const unsubscribe = collection.subscribeChanges(() => {}) + const subscription = collection.subscribeChanges(() => {}) // Update the todo - this automatically creates a transaction and calls onUpdate const tx = collection.update(`1`, (draft) => { @@ -439,7 +439,7 @@ describe(`localStorage collection`, () => { expect(parsed[`1`].versionKey).not.toBe(`initial-version`) // Should have new version key expect(parsed[`1`].data.title).toBe(`Updated Todo`) - unsubscribe() + subscription.unsubscribe() }) it(`should perform delete operations and update storage`, async () => { @@ -468,7 +468,7 @@ describe(`localStorage collection`, () => { ) // Subscribe to trigger sync - const unsubscribe = collection.subscribeChanges(() => {}) + const subscription = collection.subscribeChanges(() => {}) // Delete the todo - this automatically creates a transaction and calls onDelete const tx = collection.delete(`1`) @@ -481,7 +481,7 @@ describe(`localStorage collection`, () => { const parsed = JSON.parse(storedData!) expect(parsed[`1`]).toBeUndefined() - unsubscribe() + subscription.unsubscribe() }) }) @@ -497,7 +497,7 @@ describe(`localStorage collection`, () => { ) // Subscribe to trigger sync - const unsubscribe = collection.subscribeChanges(() => {}) + const subscription = collection.subscribeChanges(() => {}) // Simulate data being added from another tab const newTodoData = { @@ -532,7 +532,7 @@ describe(`localStorage collection`, () => { expect(collection.size).toBe(1) expect(collection.get(`1`)?.title).toBe(`From Another Tab`) - unsubscribe() + subscription.unsubscribe() }) it(`should ignore storage events for different keys`, () => { @@ -673,7 +673,7 @@ describe(`localStorage collection`, () => { ) // Subscribe to trigger sync - const unsubscribe = collection.subscribeChanges(() => {}) + const subscription = collection.subscribeChanges(() => {}) expect(collection.size).toBe(1) expect(collection.get(`1`)?.title).toBe(`Initial`) @@ -709,7 +709,7 @@ describe(`localStorage collection`, () => { expect(collection.size).toBe(1) expect(collection.get(`1`)?.title).toBe(`Updated`) - unsubscribe() + subscription.unsubscribe() }) it(`should not trigger unnecessary updates for same version key`, () => { diff --git a/packages/db/tests/query/compiler/basic.test.ts b/packages/db/tests/query/compiler/basic.test.ts index 655f1601b..74cfef707 100644 --- a/packages/db/tests/query/compiler/basic.test.ts +++ b/packages/db/tests/query/compiler/basic.test.ts @@ -48,6 +48,7 @@ describe(`Query2 Compiler`, () => { { users: input }, { users: usersCollection }, {}, + {}, new Set(), {} ) @@ -102,6 +103,7 @@ describe(`Query2 Compiler`, () => { { users: input }, { users: usersCollection }, {}, + {}, new Set(), {} ) @@ -178,6 +180,7 @@ describe(`Query2 Compiler`, () => { { users: input }, { users: usersCollection }, {}, + {}, new Set(), {} ) @@ -242,6 +245,7 @@ describe(`Query2 Compiler`, () => { { users: input }, { users: usersCollection }, {}, + {}, new Set(), {} ) diff --git a/packages/db/tests/query/compiler/subqueries.test.ts b/packages/db/tests/query/compiler/subqueries.test.ts index e7bab73eb..1cef89cd4 100644 --- a/packages/db/tests/query/compiler/subqueries.test.ts +++ b/packages/db/tests/query/compiler/subqueries.test.ts @@ -174,6 +174,7 @@ describe(`Query2 Subqueries`, () => { { issues: issuesInput }, { issues: issuesCollection }, {}, + {}, new Set(), {} ) @@ -269,6 +270,9 @@ describe(`Query2 Subqueries`, () => { const builtQuery = getQueryIR(query) + const usersSubscription = usersCollection.subscribeChanges(() => {}) + const issuesSubscription = issuesCollection.subscribeChanges(() => {}) + // Compile and execute the query const graph = new D2() const issuesInput = createIssueInput(graph) @@ -281,6 +285,10 @@ describe(`Query2 Subqueries`, () => { users: usersInput, }, { issues: issuesCollection, users: usersCollection }, + { + [usersCollection.id]: usersSubscription, + [issuesCollection.id]: issuesSubscription, + }, { issues: dummyCallbacks, users: dummyCallbacks }, lazyCollections, {} @@ -349,6 +357,7 @@ describe(`Query2 Subqueries`, () => { { issues: issuesInput }, { issues: issuesCollection }, {}, + {}, new Set(), {} ) diff --git a/packages/db/tests/query/compiler/subquery-caching.test.ts b/packages/db/tests/query/compiler/subquery-caching.test.ts index 97bc639c7..ad1108c0a 100644 --- a/packages/db/tests/query/compiler/subquery-caching.test.ts +++ b/packages/db/tests/query/compiler/subquery-caching.test.ts @@ -52,6 +52,7 @@ describe(`Subquery Caching`, () => { inputs, { users: usersCollection }, {}, + {}, new Set(), {}, cache1 @@ -68,6 +69,7 @@ describe(`Subquery Caching`, () => { inputs, { users: usersCollection }, {}, + {}, new Set(), {}, cache2 @@ -86,6 +88,7 @@ describe(`Subquery Caching`, () => { inputs, { users: usersCollection }, {}, + {}, new Set(), {}, cache2 @@ -104,6 +107,7 @@ describe(`Subquery Caching`, () => { inputs, { users: usersCollection }, {}, + {}, new Set(), {}, cache2 @@ -113,6 +117,7 @@ describe(`Subquery Caching`, () => { inputs, { users: usersCollection }, {}, + {}, new Set(), {}, cache2 @@ -148,6 +153,7 @@ describe(`Subquery Caching`, () => { inputs, { users: usersCollection }, {}, + {}, new Set(), {}, sharedCache @@ -160,6 +166,7 @@ describe(`Subquery Caching`, () => { inputs, { users: usersCollection }, {}, + {}, new Set(), {}, sharedCache @@ -204,6 +211,7 @@ describe(`Subquery Caching`, () => { inputs, { users: usersCollection }, {}, + {}, new Set(), {}, sharedCache @@ -213,6 +221,7 @@ describe(`Subquery Caching`, () => { inputs, { users: usersCollection }, {}, + {}, new Set(), {}, sharedCache @@ -275,6 +284,7 @@ describe(`Subquery Caching`, () => { inputs, { users: usersCollection }, {}, + {}, new Set(), {}, sharedCache diff --git a/packages/db/tests/query/distinct.test.ts b/packages/db/tests/query/distinct.test.ts index 101444294..1f2b0414f 100644 --- a/packages/db/tests/query/distinct.test.ts +++ b/packages/db/tests/query/distinct.test.ts @@ -477,7 +477,7 @@ function createDistinctTests(autoIndex: `off` | `eager`): void { emptyCollection.utils.commit() expect(emptyDistinct.size).toBe(1) - const department = emptyDistinct.get(1) + const department = emptyDistinct.toArray[0] expect(department?.department).toBe(`Test`) }) diff --git a/packages/db/tests/query/indexes.test.ts b/packages/db/tests/query/indexes.test.ts index 24798fd77..6750afd90 100644 --- a/packages/db/tests/query/indexes.test.ts +++ b/packages/db/tests/query/indexes.test.ts @@ -775,9 +775,16 @@ describe(`Query Index Optimization`, () => { }, ]) - // We should have done an index lookup on the 1st collection to find matching items - // i.e. items with id "1" + // We should have done 2 index lookups: + // 1. to find active items + // 2. to find items with matching IDs expect(tracker1.stats.queriesExecuted).toEqual([ + { + type: `index`, + operation: `eq`, + field: `status`, + value: `active`, + }, { type: `index`, operation: `in`, @@ -1173,6 +1180,12 @@ describe(`Query Index Optimization`, () => { // We should have done an index lookup on the 1st collection to find active items expect(tracker1.stats.queriesExecuted).toEqual([ + { + field: `status`, + operation: `eq`, + type: `index`, + value: `active`, + }, { type: `index`, operation: `in`, diff --git a/packages/db/tests/query/live-query-collection.test.ts b/packages/db/tests/query/live-query-collection.test.ts index edf7c6127..6b7dbe0b8 100644 --- a/packages/db/tests/query/live-query-collection.test.ts +++ b/packages/db/tests/query/live-query-collection.test.ts @@ -306,12 +306,12 @@ describe(`createLiveQueryCollection`, () => { gcTime: 1, }) - const unsubscribe = liveQuery.subscribeChanges(() => {}) + const subscription = liveQuery.subscribeChanges(() => {}) await liveQuery.preload() expect(liveQuery.status).toBe(`ready`) // Unsubscribe and wait for GC to run and cleanup to complete - unsubscribe() + subscription.unsubscribe() const deadline = Date.now() + 500 while (liveQuery.status !== `cleaned-up` && Date.now() < deadline) { await new Promise((r) => setTimeout(r, 1)) @@ -390,14 +390,14 @@ describe(`createLiveQueryCollection`, () => { expect(nestedLQ.status).toBe(`ready`) // First subscription cycle - const unsubscribe1 = nestedLQ.subscribeChanges(() => {}) + const subscription1 = nestedLQ.subscribeChanges(() => {}) // Verify we still have data after subscribing expect(nestedLQ.size).toBe(2) expect(nestedLQ.status).toBe(`ready`) // Unsubscribe and wait for GC - unsubscribe1() + subscription1.unsubscribe() const deadline1 = Date.now() + 500 while (nestedLQ.status !== `cleaned-up` && Date.now() < deadline1) { await new Promise((r) => setTimeout(r, 1)) @@ -407,7 +407,7 @@ describe(`createLiveQueryCollection`, () => { // Try multiple resubscribe cycles to increase chance of reproduction for (let i = 0; i < 3; i++) { // Resubscribe - const unsubscribe2 = nestedLQ.subscribeChanges(() => {}) + const subscription2 = nestedLQ.subscribeChanges(() => {}) // Wait for the collection to potentially recover await new Promise((r) => setTimeout(r, 50)) @@ -416,7 +416,7 @@ describe(`createLiveQueryCollection`, () => { expect(nestedLQ.size).toBe(2) // Unsubscribe and wait for GC again - unsubscribe2() + subscription2.unsubscribe() const deadline2 = Date.now() + 500 while (nestedLQ.status !== `cleaned-up` && Date.now() < deadline2) { await new Promise((r) => setTimeout(r, 1)) @@ -428,7 +428,7 @@ describe(`createLiveQueryCollection`, () => { } // Final verification - resubscribe one more time and ensure data is available - const finalUnsubscribe = nestedLQ.subscribeChanges(() => {}) + const finalSubscription = nestedLQ.subscribeChanges(() => {}) // Wait for the collection to become ready const finalDeadline = Date.now() + 1000 @@ -439,7 +439,7 @@ describe(`createLiveQueryCollection`, () => { expect(nestedLQ.status).toBe(`ready`) expect(nestedLQ.size).toBe(2) - finalUnsubscribe() + finalSubscription.unsubscribe() }) it(`should handle temporal values correctly in live queries`, async () => { diff --git a/packages/electric-db-collection/tests/electric.test.ts b/packages/electric-db-collection/tests/electric.test.ts index fdaf56f2b..0d6b4796e 100644 --- a/packages/electric-db-collection/tests/electric.test.ts +++ b/packages/electric-db-collection/tests/electric.test.ts @@ -791,13 +791,13 @@ describe(`Electric Integration`, () => { expect(testCollection.status).toBe(`cleaned-up`) // Access collection data to restart sync - const unsubscribe = testCollection.subscribeChanges(() => {}) + const subscription = testCollection.subscribeChanges(() => {}) // Should have started a new stream expect(mockSubscribe).toHaveBeenCalledTimes(2) expect(testCollection.status).toBe(`loading`) - unsubscribe() + subscription.unsubscribe() }) it(`should handle stream errors gracefully`, () => { @@ -1073,8 +1073,8 @@ describe(`Electric Integration`, () => { expect(testCollection.size).toBe(2) // Subscribe and then unsubscribe to trigger GC timer - const unsubscribe = testCollection.subscribeChanges(() => {}) - unsubscribe() + const subscription = testCollection.subscribeChanges(() => {}) + subscription.unsubscribe() // Collection should still be ready before GC timer fires expect(testCollection.status).toBe(`ready`) @@ -1091,7 +1091,7 @@ describe(`Electric Integration`, () => { const initialMockCallCount = mockSubscribe.mock.calls.length // Subscribe again - this should restart the sync - const newUnsubscribe = testCollection.subscribeChanges(() => {}) + const newSubscription = testCollection.subscribeChanges(() => {}) // Should have created a new stream expect(mockSubscribe.mock.calls.length).toBe(initialMockCallCount + 1) @@ -1125,7 +1125,7 @@ describe(`Electric Integration`, () => { // Old data should not be present (collection was cleaned) expect(testCollection.has(2)).toBe(false) - newUnsubscribe() + newSubscription.unsubscribe() // Restore real timers vi.useRealTimers() diff --git a/packages/query-db-collection/tests/query.test.ts b/packages/query-db-collection/tests/query.test.ts index b14fa591a..4bba0e59a 100644 --- a/packages/query-db-collection/tests/query.test.ts +++ b/packages/query-db-collection/tests/query.test.ts @@ -740,12 +740,12 @@ describe(`QueryCollection`, () => { expect(collection.status).toBe(`cleaned-up`) // Access collection data to restart sync - const unsubscribe = collection.subscribeChanges(() => {}) + const subscription = collection.subscribeChanges(() => {}) // Should restart sync (might be ready immediately if query is cached) expect([`loading`, `ready`]).toContain(collection.status) - unsubscribe() + subscription.unsubscribe() }) it(`should handle query lifecycle during restart cycle`, async () => { @@ -790,13 +790,13 @@ describe(`QueryCollection`, () => { removeQueriesSpy.mockClear() // Restart by accessing collection - const unsubscribe = collection.subscribeChanges(() => {}) + const subscription = collection.subscribeChanges(() => {}) // Should restart sync expect([`loading`, `ready`]).toContain(collection.status) // Cleanup again to verify the new sync cleanup works - unsubscribe() + subscription.unsubscribe() await collection.cleanup() await flushPromises() @@ -948,8 +948,8 @@ describe(`QueryCollection`, () => { const changeHandler1 = vi.fn() const changeHandler2 = vi.fn() - const unsubscribe1 = collection.subscribeChanges(changeHandler1) - const unsubscribe2 = collection.subscribeChanges(changeHandler2) + const subscription1 = collection.subscribeChanges(changeHandler1) + const subscription2 = collection.subscribeChanges(changeHandler2) // Change the data and trigger a refetch items = [{ id: `1`, name: `Item 1 Updated` }] @@ -965,7 +965,7 @@ describe(`QueryCollection`, () => { expect(changeHandler2).toHaveBeenCalled() // Unsubscribe one - unsubscribe1() + subscription1.unsubscribe() changeHandler1.mockClear() changeHandler2.mockClear() @@ -983,7 +983,7 @@ describe(`QueryCollection`, () => { expect(changeHandler2).toHaveBeenCalled() // Cleanup - unsubscribe2() + subscription2.unsubscribe() }) it(`should handle query cancellation gracefully`, async () => { @@ -1810,13 +1810,13 @@ describe(`QueryCollection`, () => { // Read-only operations don't affect error state expect(collection.has(`1`)).toBe(true) const changeHandler = vi.fn() - const unsubscribe = collection.subscribeChanges(changeHandler) + const subscription = collection.subscribeChanges(changeHandler) expect(collection.utils.lastError()).toBe(originalError) expect(collection.utils.isError()).toBe(true) expect(collection.utils.errorCount()).toBe(originalErrorCount) - unsubscribe() + subscription.unsubscribe() }) it(`should handle custom error objects correctly`, async () => { diff --git a/packages/react-db/src/useLiveQuery.ts b/packages/react-db/src/useLiveQuery.ts index d4df4904e..c6cf77cae 100644 --- a/packages/react-db/src/useLiveQuery.ts +++ b/packages/react-db/src/useLiveQuery.ts @@ -387,7 +387,7 @@ export function useLiveQuery( return () => {} } - const unsubscribe = collectionRef.current.subscribeChanges(() => { + const subscription = collectionRef.current.subscribeChanges(() => { // Bump version on any change; getSnapshot will rebuild next time versionRef.current += 1 onStoreChange() @@ -398,7 +398,7 @@ export function useLiveQuery( onStoreChange() } return () => { - unsubscribe() + subscription.unsubscribe() } } } diff --git a/packages/rxdb-db-collection/tests/rxdb.test.ts b/packages/rxdb-db-collection/tests/rxdb.test.ts index 02bbc727f..08105066d 100644 --- a/packages/rxdb-db-collection/tests/rxdb.test.ts +++ b/packages/rxdb-db-collection/tests/rxdb.test.ts @@ -225,12 +225,12 @@ describe(`RxDB Integration`, () => { await rxCollection.insert({ id: `3`, name: `Item 3` }) // Access collection data to restart sync - const unsubscribe = collection.subscribeChanges(() => {}) + const subscription = collection.subscribeChanges(() => {}) await collection.toArrayWhenReady() expect(collection.get(`3`).name).toEqual(`Item 3`) - unsubscribe() + subscription.unsubscribe() await db.remove() }) }) diff --git a/packages/solid-db/src/useLiveQuery.ts b/packages/solid-db/src/useLiveQuery.ts index 72bd647d6..a4c4919f5 100644 --- a/packages/solid-db/src/useLiveQuery.ts +++ b/packages/solid-db/src/useLiveQuery.ts @@ -266,11 +266,8 @@ export function useLiveQuery( state.set(key, value) } - // Initialize data array in correct order - syncDataFromCollection(currentCollection) - // Subscribe to collection changes with granular updates - currentUnsubscribe = currentCollection.subscribeChanges( + const subscription = currentCollection.subscribeChanges( (changes: Array>) => { // Apply each change individually to the reactive state batch(() => { @@ -292,9 +289,14 @@ export function useLiveQuery( // Update status ref on every change setStatus(currentCollection.status) + }, + { + includeInitialState: true, } ) + currentUnsubscribe = subscription.unsubscribe.bind(subscription) + // Preload collection data if not already started if (currentCollection.status === `idle`) { createResource(() => currentCollection.preload()) diff --git a/packages/svelte-db/src/useLiveQuery.svelte.ts b/packages/svelte-db/src/useLiveQuery.svelte.ts index 99e819c2e..5a11f2246 100644 --- a/packages/svelte-db/src/useLiveQuery.svelte.ts +++ b/packages/svelte-db/src/useLiveQuery.svelte.ts @@ -325,7 +325,7 @@ export function useLiveQuery( }) // Subscribe to collection changes with granular updates - currentUnsubscribe = currentCollection.subscribeChanges( + const subscription = currentCollection.subscribeChanges( (changes: Array>) => { // Apply each change individually to the reactive state untrack(() => { @@ -346,9 +346,14 @@ export function useLiveQuery( syncDataFromCollection(currentCollection) // Update status state on every change status = currentCollection.status + }, + { + includeInitialState: true, } ) + currentUnsubscribe = subscription.unsubscribe.bind(subscription) + // Preload collection data if not already started if (currentCollection.status === `idle`) { currentCollection.preload().catch(console.error) diff --git a/packages/vue-db/src/useLiveQuery.ts b/packages/vue-db/src/useLiveQuery.ts index e9ad9a31f..4de3881ad 100644 --- a/packages/vue-db/src/useLiveQuery.ts +++ b/packages/vue-db/src/useLiveQuery.ts @@ -309,7 +309,7 @@ export function useLiveQuery( }) // Subscribe to collection changes with granular updates - currentUnsubscribe = currentCollection.subscribeChanges( + const subscription = currentCollection.subscribeChanges( (changes: Array>) => { // Apply each change individually to the reactive state for (const change of changes) { @@ -328,9 +328,14 @@ export function useLiveQuery( syncDataFromCollection(currentCollection) // Update status ref on every change status.value = currentCollection.status + }, + { + includeInitialState: true, } ) + currentUnsubscribe = subscription.unsubscribe.bind(subscription) + // Preload collection data if not already started if (currentCollection.status === `idle`) { currentCollection.preload().catch(console.error)