diff --git a/.changeset/afraid-camels-tickle.md b/.changeset/afraid-camels-tickle.md new file mode 100644 index 000000000..9062592e7 --- /dev/null +++ b/.changeset/afraid-camels-tickle.md @@ -0,0 +1,5 @@ +--- +"@tanstack/db": patch +--- + +optimise the live query graph execution by removing recursive calls to graph.run diff --git a/.changeset/fifty-ways-hang.md b/.changeset/fifty-ways-hang.md new file mode 100644 index 000000000..57a99d4e5 --- /dev/null +++ b/.changeset/fifty-ways-hang.md @@ -0,0 +1,5 @@ +--- +"@tanstack/db-ivm": patch +--- + +Fix a bug with distinct operator diff --git a/packages/db-ivm/src/operators/distinct.ts b/packages/db-ivm/src/operators/distinct.ts index dc2f5a177..f1880858d 100644 --- a/packages/db-ivm/src/operators/distinct.ts +++ b/packages/db-ivm/src/operators/distinct.ts @@ -4,21 +4,25 @@ import { hash } from "../hashing/index.js" import { MultiSet } from "../multiset.js" import type { Hash } from "../hashing/index.js" import type { DifferenceStreamReader } from "../graph.js" -import type { IStreamBuilder } from "../types.js" +import type { IStreamBuilder, KeyValue } from "../types.js" type Multiplicity = number +type GetValue = T extends KeyValue ? V : never + /** * Operator that removes duplicates */ -export class DistinctOperator extends UnaryOperator { +export class DistinctOperator< + T extends KeyValue, +> extends UnaryOperator>> { #by: (value: T) => any #values: Map // keeps track of the number of times each value has been seen constructor( id: number, input: DifferenceStreamReader, - output: DifferenceStreamWriter, + output: DifferenceStreamWriter>>, by: (value: T) => any = (value: T) => value ) { super(id, input, output) @@ -39,12 +43,11 @@ export class DistinctOperator extends UnaryOperator { this.#values.get(hashedValue) ?? 0 const newMultiplicity = oldMultiplicity + diff - updatedValues.set(hashedValue, [newMultiplicity, value]) } } - const result: Array<[T, number]> = [] + const result: Array<[KeyValue>, number]> = [] // Check which values became visible or disappeared for (const [ @@ -62,11 +65,11 @@ export class DistinctOperator extends UnaryOperator { if (oldMultiplicity <= 0 && newMultiplicity > 0) { // The value wasn't present in the stream // but with this change it is now present in the stream - result.push([value, 1]) + result.push([[hash(this.#by(value)), value[1]], 1]) } else if (oldMultiplicity > 0 && newMultiplicity <= 0) { // The value was present in the stream // but with this change it is no longer present in the stream - result.push([value, -1]) + result.push([[hash(this.#by(value)), value[1]], -1]) } } @@ -79,7 +82,9 @@ export class DistinctOperator extends UnaryOperator { /** * Removes duplicate values */ -export function distinct(by: (value: T) => any = (value: T) => value) { +export function distinct>( + by: (value: T) => any = (value: T) => value +) { return (stream: IStreamBuilder): IStreamBuilder => { const output = new StreamBuilder( stream.graph, diff --git a/packages/db-ivm/tests/operators/distinct.test.ts b/packages/db-ivm/tests/operators/distinct.test.ts index 9a7645ded..aa865e25c 100644 --- a/packages/db-ivm/tests/operators/distinct.test.ts +++ b/packages/db-ivm/tests/operators/distinct.test.ts @@ -4,6 +4,7 @@ import { MultiSet } from "../../src/multiset.js" import { distinct } from "../../src/operators/distinct.js" import { output } from "../../src/operators/output.js" import { MessageTracker, assertResults } from "../test-utils.js" +import { hash } from "../../src/hashing/index.js" describe(`Operators`, () => { describe(`Efficient distinct operation`, () => { @@ -39,9 +40,9 @@ function testDistinct() { expect(data).toEqual([ [ - [[1, `a`], 1], - [[2, `b`], 1], - [[2, `c`], 1], + [[hash([1, `a`]), `a`], 1], + [[hash([2, `b`]), `b`], 1], + [[hash([2, `c`]), `c`], 1], ], ]) }) @@ -74,7 +75,7 @@ function testDistinct() { graph.run() - const data = messages.map((m) => m.getInner())[0] + const data = messages.map((m) => m.getInner())[0]! const countries = data .map(([[_, value], multiplicity]) => [value.country, multiplicity]) .sort() @@ -118,8 +119,8 @@ function testDistinct() { `distinct with updates - initial`, initialResult, [ - [1, `a`], - [1, `b`], + [hash([1, `a`]), `a`], + [hash([1, `b`]), `b`], ], // Should have both distinct values 4 // Max expected messages ) @@ -140,7 +141,7 @@ function testDistinct() { assertResults( `distinct with updates - second batch`, secondResult, - [[1, `c`]], // Should only have 'c' remaining + [[hash([1, `c`]), `c`]], // Should only have 'c' remaining 4 // Max expected messages ) @@ -186,9 +187,9 @@ function testDistinct() { expect(data).toEqual([ [ - [[`key1`, 1], 1], - [[`key1`, 2], 1], - [[`key2`, 1], 1], + [[hash([`key1`, 1]), 1], 1], + [[hash([`key1`, 2]), 2], 1], + [[hash([`key2`, 1]), 1], 1], ], ]) }) @@ -224,8 +225,8 @@ function testDistinct() { `distinct with multiple batches that cancel out`, result, [ - [`key1`, 1], // Should remain (multiplicity 2 -> 1 in distinct) - [`key2`, 1], // Should remain (multiplicity 2 -> 1 in distinct) + [hash([`key1`, 1]), 1], // Should remain (multiplicity 2 -> 1 in distinct) + [hash([`key2`, 1]), 1], // Should remain (multiplicity 2 -> 1 in distinct) ], 6 // Max expected messages (generous upper bound) ) diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index 2ce55ff7f..8f67cb644 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -42,6 +42,8 @@ export class CollectionConfigBuilder< private readonly compare?: (val1: TResult, val2: TResult) => number + private isGraphRunning = false + private graphCache: D2 | undefined private inputsCache: Record> | undefined private pipelineCache: ResultStream | undefined @@ -107,25 +109,41 @@ export class CollectionConfigBuilder< syncState: FullSyncState, callback?: () => boolean ) { - const { begin, commit, markReady } = config + if (this.isGraphRunning) { + // no nested runs of the graph + // which is possible if the `callback` + // would call `maybeRunGraph` e.g. after it has loaded some more data + return + } - // We only run the graph if all the collections are ready - if ( - this.allCollectionsReadyOrInitialCommit() && - syncState.subscribedToAllCollections - ) { - syncState.graph.run() - const ready = callback?.() ?? true - // On the initial run, we may need to do an empty commit to ensure that - // the collection is initialized - if (syncState.messagesCount === 0) { - begin() - commit() - } - // Mark the collection as ready after the first successful run - if (ready && this.allCollectionsReady()) { - markReady() + this.isGraphRunning = true + + try { + const { begin, commit, markReady } = config + + // We only run the graph if all the collections are ready + if ( + this.allCollectionsReadyOrInitialCommit() && + syncState.subscribedToAllCollections + ) { + while (syncState.graph.pendingWork()) { + syncState.graph.run() + callback?.() + } + + // On the initial run, we may need to do an empty commit to ensure that + // the collection is initialized + if (syncState.messagesCount === 0) { + begin() + commit() + } + // Mark the collection as ready after the first successful run + if (this.allCollectionsReady()) { + markReady() + } } + } finally { + this.isGraphRunning = false } } diff --git a/packages/db/src/query/live/collection-subscriber.ts b/packages/db/src/query/live/collection-subscriber.ts index b4030558e..319b8e09e 100644 --- a/packages/db/src/query/live/collection-subscriber.ts +++ b/packages/db/src/query/live/collection-subscriber.ts @@ -318,7 +318,11 @@ function sendChangesToInput( multiSetArray.push([[key, change.value], -1]) } } - input.sendData(new MultiSet(multiSetArray)) + + if (multiSetArray.length !== 0) { + input.sendData(new MultiSet(multiSetArray)) + } + return multiSetArray.length } diff --git a/packages/db/tests/query/distinct.test.ts b/packages/db/tests/query/distinct.test.ts index 101444294..1f2b0414f 100644 --- a/packages/db/tests/query/distinct.test.ts +++ b/packages/db/tests/query/distinct.test.ts @@ -477,7 +477,7 @@ function createDistinctTests(autoIndex: `off` | `eager`): void { emptyCollection.utils.commit() expect(emptyDistinct.size).toBe(1) - const department = emptyDistinct.get(1) + const department = emptyDistinct.toArray[0] expect(department?.department).toBe(`Test`) })