From 1d5a19367e74eb520b21e82ea7b27c80638c709c Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 18 Sep 2025 15:41:43 +0200 Subject: [PATCH 01/10] Modifed maybeRunGraph to run until there's no more pending work --- .../query/live/collection-config-builder.ts | 23 +++++++++++++--- packages/db/tests/query/order-by.test.ts | 26 ++++++++++++++++++- 2 files changed, 45 insertions(+), 4 deletions(-) diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index 2ce55ff7f..b04f1a267 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -42,6 +42,8 @@ export class CollectionConfigBuilder< private readonly compare?: (val1: TResult, val2: TResult) => number + private isGraphRunning = false + private graphCache: D2 | undefined private inputsCache: Record> | undefined private pipelineCache: ResultStream | undefined @@ -107,6 +109,16 @@ export class CollectionConfigBuilder< syncState: FullSyncState, callback?: () => boolean ) { + if (this.isGraphRunning) { + // no nested runs of the graph + // which is possible if the `callback` + // would call `maybeRunGraph` e.g. after it has loaded some more data + console.log(`IGNORING MAYBERUNGRAPH`) + return + } + + this.isGraphRunning = true + const { begin, commit, markReady } = config // We only run the graph if all the collections are ready @@ -114,8 +126,11 @@ export class CollectionConfigBuilder< this.allCollectionsReadyOrInitialCommit() && syncState.subscribedToAllCollections ) { - syncState.graph.run() - const ready = callback?.() ?? true + while (syncState.graph.pendingWork()) { + syncState.graph.run() + callback?.() + } + // On the initial run, we may need to do an empty commit to ensure that // the collection is initialized if (syncState.messagesCount === 0) { @@ -123,10 +138,12 @@ export class CollectionConfigBuilder< commit() } // Mark the collection as ready after the first successful run - if (ready && this.allCollectionsReady()) { + if (this.allCollectionsReady()) { markReady() } } + + this.isGraphRunning = false } private getSyncConfig(): SyncConfig { diff --git a/packages/db/tests/query/order-by.test.ts b/packages/db/tests/query/order-by.test.ts index 4b336dc44..b7596d06d 100644 --- a/packages/db/tests/query/order-by.test.ts +++ b/packages/db/tests/query/order-by.test.ts @@ -257,8 +257,10 @@ function createOrderByTests(autoIndex: `off` | `eager`): void { let departmentsCollection: ReturnType beforeEach(() => { + console.log(`before each`) employeesCollection = createEmployeesCollection(autoIndex) departmentsCollection = createDepartmentsCollection(autoIndex) + console.log(`end of before each`) }) describe(`Basic OrderBy`, () => { @@ -405,6 +407,7 @@ function createOrderByTests(autoIndex: `off` | `eager`): void { }) it(`applies offset correctly with ordering`, async () => { + console.log(`in applies offset correctly with ordering`) const collection = createLiveQueryCollection((q) => q .from({ employees: employeesCollection }) @@ -425,6 +428,7 @@ function createOrderByTests(autoIndex: `off` | `eager`): void { }) it(`applies both limit and offset with ordering`, async () => { + console.log(`in applies both limit and offset with ordering`) const collection = createLiveQueryCollection((q) => q .from({ employees: employeesCollection }) @@ -446,6 +450,7 @@ function createOrderByTests(autoIndex: `off` | `eager`): void { }) it(`throws error when limit/offset used without orderBy`, () => { + console.log(`in throws error when limit/offset used without orderBy`) expect(() => { createLiveQueryCollection((q) => q @@ -462,6 +467,9 @@ function createOrderByTests(autoIndex: `off` | `eager`): void { }) it(`applies incremental insert of a new row before the topK correctly`, async () => { + console.log( + `in applies incremental insert of a new row before the topK correctly` + ) const collection = createLiveQueryCollection((q) => q .from({ employees: employeesCollection }) @@ -509,6 +517,9 @@ function createOrderByTests(autoIndex: `off` | `eager`): void { }) it(`applies incremental insert of a new row inside the topK correctly`, async () => { + console.log( + `in applies incremental insert of a new row inside the topK correctly` + ) const collection = createLiveQueryCollection((q) => q .from({ employees: employeesCollection }) @@ -553,9 +564,13 @@ function createOrderByTests(autoIndex: `off` | `eager`): void { [6, 62_000], [2, 60_000], ]) + console.log(`end`) }) it(`applies incremental insert of a new row inside the topK but after max sent value correctly`, async () => { + console.log( + `in applies incremental insert of a new row inside the topK but after max sent value correctly` + ) const collection = createLiveQueryCollection((q) => q .from({ employees: employeesCollection }) @@ -607,6 +622,9 @@ function createOrderByTests(autoIndex: `off` | `eager`): void { }) it(`applies incremental insert of a new row after the topK correctly`, async () => { + console.log( + `in applies incremental insert of a new row after the topK correctly` + ) const collection = createLiveQueryCollection((q) => q .from({ employees: employeesCollection }) @@ -650,6 +668,9 @@ function createOrderByTests(autoIndex: `off` | `eager`): void { }) it(`applies incremental update of a row inside the topK correctly`, async () => { + console.log( + `in applies incremental update of a row inside the topK correctly` + ) const collection = createLiveQueryCollection((q) => q .from({ employees: employeesCollection }) @@ -693,6 +714,9 @@ function createOrderByTests(autoIndex: `off` | `eager`): void { }) it(`applies incremental delete of a row in the topK correctly`, async () => { + console.log( + `in applies incremental delete of a row in the topK correctly` + ) const collection = createLiveQueryCollection((q) => q .from({ employees: employeesCollection }) @@ -1847,6 +1871,6 @@ function createOrderByTests(autoIndex: `off` | `eager`): void { } describe(`Query2 OrderBy Compiler`, () => { - createOrderByTests(`off`) + // createOrderByTests(`off`) createOrderByTests(`eager`) }) From 2185569e1e1994f3914633dd48577a10649eec9a Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Mon, 22 Sep 2025 10:40:21 +0200 Subject: [PATCH 02/10] Do not send empty changes to inputs otherwise it keeps thinking there's some more work to do --- .../query/live/collection-config-builder.ts | 49 ++++++++++--------- .../src/query/live/collection-subscriber.ts | 6 ++- 2 files changed, 32 insertions(+), 23 deletions(-) diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index b04f1a267..de0fb13b2 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -119,31 +119,36 @@ export class CollectionConfigBuilder< this.isGraphRunning = true - const { begin, commit, markReady } = config - - // We only run the graph if all the collections are ready - if ( - this.allCollectionsReadyOrInitialCommit() && - syncState.subscribedToAllCollections - ) { - while (syncState.graph.pendingWork()) { - syncState.graph.run() - callback?.() - } + try { + const { begin, commit, markReady } = config + + // We only run the graph if all the collections are ready + if ( + this.allCollectionsReadyOrInitialCommit() && + syncState.subscribedToAllCollections + ) { + while (syncState.graph.pendingWork()) { + console.log(`RUNNING GRAPH`) + syncState.graph.run() + console.log(`Calling callback`) + callback?.() + console.log(`returned from callback`) + } - // On the initial run, we may need to do an empty commit to ensure that - // the collection is initialized - if (syncState.messagesCount === 0) { - begin() - commit() - } - // Mark the collection as ready after the first successful run - if (this.allCollectionsReady()) { - markReady() + // On the initial run, we may need to do an empty commit to ensure that + // the collection is initialized + if (syncState.messagesCount === 0) { + begin() + commit() + } + // Mark the collection as ready after the first successful run + if (this.allCollectionsReady()) { + markReady() + } } + } finally { + this.isGraphRunning = false } - - this.isGraphRunning = false } private getSyncConfig(): SyncConfig { diff --git a/packages/db/src/query/live/collection-subscriber.ts b/packages/db/src/query/live/collection-subscriber.ts index b4030558e..319b8e09e 100644 --- a/packages/db/src/query/live/collection-subscriber.ts +++ b/packages/db/src/query/live/collection-subscriber.ts @@ -318,7 +318,11 @@ function sendChangesToInput( multiSetArray.push([[key, change.value], -1]) } } - input.sendData(new MultiSet(multiSetArray)) + + if (multiSetArray.length !== 0) { + input.sendData(new MultiSet(multiSetArray)) + } + return multiSetArray.length } From d59fe19f6566438f47bc0657aa8ee9fde919a4d1 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Mon, 22 Sep 2025 11:14:28 +0200 Subject: [PATCH 03/10] Debugging --- packages/db-ivm/src/operators/distinct.ts | 16 +++++++++++++++- packages/db/tests/query/distinct.test.ts | 10 ++++++++-- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/packages/db-ivm/src/operators/distinct.ts b/packages/db-ivm/src/operators/distinct.ts index dc2f5a177..887695f9d 100644 --- a/packages/db-ivm/src/operators/distinct.ts +++ b/packages/db-ivm/src/operators/distinct.ts @@ -32,14 +32,28 @@ export class DistinctOperator extends UnaryOperator { // Compute the new multiplicity for each value for (const message of this.inputMessages()) { for (const [value, diff] of message.getInner()) { + console.log(`value in distinct:`, JSON.stringify(value, null, 2)) + console.log(`diff in distinct:`, JSON.stringify(diff, null, 2)) const hashedValue = hash(this.#by(value)) + console.log(`by in distinct:`, JSON.stringify(this.#by(value), null, 2)) + console.log( + `hashedValue in distinct:`, + JSON.stringify(hashedValue, null, 2) + ) const oldMultiplicity = updatedValues.get(hashedValue)?.[0] ?? this.#values.get(hashedValue) ?? 0 + console.log( + `oldMultiplicity in distinct:`, + JSON.stringify(oldMultiplicity, null, 2) + ) const newMultiplicity = oldMultiplicity + diff - + console.log( + `newMultiplicity in distinct:`, + JSON.stringify(newMultiplicity, null, 2) + ) updatedValues.set(hashedValue, [newMultiplicity, value]) } } diff --git a/packages/db/tests/query/distinct.test.ts b/packages/db/tests/query/distinct.test.ts index 101444294..e3e57c2f1 100644 --- a/packages/db/tests/query/distinct.test.ts +++ b/packages/db/tests/query/distinct.test.ts @@ -675,6 +675,11 @@ function createDistinctTests(autoIndex: `off` | `eager`): void { }) test(`distinct with join operations`, () => { + console.log(`**************************************************`) + console.log(`**************************************************`) + console.log(`**************************************************`) + console.log(`**************************************************`) + console.log(`**************************************************`) // Create a simple departments collection to join with const departmentsData = [ { id: `Engineering`, budget: 1000000 }, @@ -707,11 +712,12 @@ function createDistinctTests(autoIndex: `off` | `eager`): void { .distinct(), }) + const results = Array.from(distinctJoinedData.values()) + // There are 3 distinct departments that have active users + console.log(`results:`, JSON.stringify(results, null, 2)) expect(distinctJoinedData.size).toBe(3) - const results = Array.from(distinctJoinedData.values()) - // Should have distinct combinations of department const combinations = results.map((r) => `${r.department}`) const uniqueCombinations = [...new Set(combinations)] From a06f7b699bc7cbbdd6b296f66406b235a72ce3d8 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Mon, 22 Sep 2025 13:04:19 +0100 Subject: [PATCH 04/10] fixes --- packages/db-ivm/src/operators/distinct.ts | 35 +++++++++-------------- 1 file changed, 13 insertions(+), 22 deletions(-) diff --git a/packages/db-ivm/src/operators/distinct.ts b/packages/db-ivm/src/operators/distinct.ts index 887695f9d..f1880858d 100644 --- a/packages/db-ivm/src/operators/distinct.ts +++ b/packages/db-ivm/src/operators/distinct.ts @@ -4,21 +4,25 @@ import { hash } from "../hashing/index.js" import { MultiSet } from "../multiset.js" import type { Hash } from "../hashing/index.js" import type { DifferenceStreamReader } from "../graph.js" -import type { IStreamBuilder } from "../types.js" +import type { IStreamBuilder, KeyValue } from "../types.js" type Multiplicity = number +type GetValue = T extends KeyValue ? V : never + /** * Operator that removes duplicates */ -export class DistinctOperator extends UnaryOperator { +export class DistinctOperator< + T extends KeyValue, +> extends UnaryOperator>> { #by: (value: T) => any #values: Map // keeps track of the number of times each value has been seen constructor( id: number, input: DifferenceStreamReader, - output: DifferenceStreamWriter, + output: DifferenceStreamWriter>>, by: (value: T) => any = (value: T) => value ) { super(id, input, output) @@ -32,33 +36,18 @@ export class DistinctOperator extends UnaryOperator { // Compute the new multiplicity for each value for (const message of this.inputMessages()) { for (const [value, diff] of message.getInner()) { - console.log(`value in distinct:`, JSON.stringify(value, null, 2)) - console.log(`diff in distinct:`, JSON.stringify(diff, null, 2)) const hashedValue = hash(this.#by(value)) - console.log(`by in distinct:`, JSON.stringify(this.#by(value), null, 2)) - console.log( - `hashedValue in distinct:`, - JSON.stringify(hashedValue, null, 2) - ) const oldMultiplicity = updatedValues.get(hashedValue)?.[0] ?? this.#values.get(hashedValue) ?? 0 - console.log( - `oldMultiplicity in distinct:`, - JSON.stringify(oldMultiplicity, null, 2) - ) const newMultiplicity = oldMultiplicity + diff - console.log( - `newMultiplicity in distinct:`, - JSON.stringify(newMultiplicity, null, 2) - ) updatedValues.set(hashedValue, [newMultiplicity, value]) } } - const result: Array<[T, number]> = [] + const result: Array<[KeyValue>, number]> = [] // Check which values became visible or disappeared for (const [ @@ -76,11 +65,11 @@ export class DistinctOperator extends UnaryOperator { if (oldMultiplicity <= 0 && newMultiplicity > 0) { // The value wasn't present in the stream // but with this change it is now present in the stream - result.push([value, 1]) + result.push([[hash(this.#by(value)), value[1]], 1]) } else if (oldMultiplicity > 0 && newMultiplicity <= 0) { // The value was present in the stream // but with this change it is no longer present in the stream - result.push([value, -1]) + result.push([[hash(this.#by(value)), value[1]], -1]) } } @@ -93,7 +82,9 @@ export class DistinctOperator extends UnaryOperator { /** * Removes duplicate values */ -export function distinct(by: (value: T) => any = (value: T) => value) { +export function distinct>( + by: (value: T) => any = (value: T) => value +) { return (stream: IStreamBuilder): IStreamBuilder => { const output = new StreamBuilder( stream.graph, From 8efeda424535b0e0ccad904d4928a2cd2f89195e Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Mon, 22 Sep 2025 14:17:29 +0200 Subject: [PATCH 05/10] Remove debug logging --- packages/db/tests/query/distinct.test.ts | 6 ------ 1 file changed, 6 deletions(-) diff --git a/packages/db/tests/query/distinct.test.ts b/packages/db/tests/query/distinct.test.ts index e3e57c2f1..bbe7736e5 100644 --- a/packages/db/tests/query/distinct.test.ts +++ b/packages/db/tests/query/distinct.test.ts @@ -675,11 +675,6 @@ function createDistinctTests(autoIndex: `off` | `eager`): void { }) test(`distinct with join operations`, () => { - console.log(`**************************************************`) - console.log(`**************************************************`) - console.log(`**************************************************`) - console.log(`**************************************************`) - console.log(`**************************************************`) // Create a simple departments collection to join with const departmentsData = [ { id: `Engineering`, budget: 1000000 }, @@ -715,7 +710,6 @@ function createDistinctTests(autoIndex: `off` | `eager`): void { const results = Array.from(distinctJoinedData.values()) // There are 3 distinct departments that have active users - console.log(`results:`, JSON.stringify(results, null, 2)) expect(distinctJoinedData.size).toBe(3) // Should have distinct combinations of department From 2f20064cb5334c25947af3d645784dbdaccb595b Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Mon, 22 Sep 2025 14:21:40 +0200 Subject: [PATCH 06/10] Remove debug logging --- .../query/live/collection-config-builder.ts | 4 --- packages/db/tests/query/distinct.test.ts | 4 +-- packages/db/tests/query/order-by.test.ts | 26 +------------------ 3 files changed, 3 insertions(+), 31 deletions(-) diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index de0fb13b2..8f67cb644 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -113,7 +113,6 @@ export class CollectionConfigBuilder< // no nested runs of the graph // which is possible if the `callback` // would call `maybeRunGraph` e.g. after it has loaded some more data - console.log(`IGNORING MAYBERUNGRAPH`) return } @@ -128,11 +127,8 @@ export class CollectionConfigBuilder< syncState.subscribedToAllCollections ) { while (syncState.graph.pendingWork()) { - console.log(`RUNNING GRAPH`) syncState.graph.run() - console.log(`Calling callback`) callback?.() - console.log(`returned from callback`) } // On the initial run, we may need to do an empty commit to ensure that diff --git a/packages/db/tests/query/distinct.test.ts b/packages/db/tests/query/distinct.test.ts index bbe7736e5..101444294 100644 --- a/packages/db/tests/query/distinct.test.ts +++ b/packages/db/tests/query/distinct.test.ts @@ -707,11 +707,11 @@ function createDistinctTests(autoIndex: `off` | `eager`): void { .distinct(), }) - const results = Array.from(distinctJoinedData.values()) - // There are 3 distinct departments that have active users expect(distinctJoinedData.size).toBe(3) + const results = Array.from(distinctJoinedData.values()) + // Should have distinct combinations of department const combinations = results.map((r) => `${r.department}`) const uniqueCombinations = [...new Set(combinations)] diff --git a/packages/db/tests/query/order-by.test.ts b/packages/db/tests/query/order-by.test.ts index b7596d06d..4b336dc44 100644 --- a/packages/db/tests/query/order-by.test.ts +++ b/packages/db/tests/query/order-by.test.ts @@ -257,10 +257,8 @@ function createOrderByTests(autoIndex: `off` | `eager`): void { let departmentsCollection: ReturnType beforeEach(() => { - console.log(`before each`) employeesCollection = createEmployeesCollection(autoIndex) departmentsCollection = createDepartmentsCollection(autoIndex) - console.log(`end of before each`) }) describe(`Basic OrderBy`, () => { @@ -407,7 +405,6 @@ function createOrderByTests(autoIndex: `off` | `eager`): void { }) it(`applies offset correctly with ordering`, async () => { - console.log(`in applies offset correctly with ordering`) const collection = createLiveQueryCollection((q) => q .from({ employees: employeesCollection }) @@ -428,7 +425,6 @@ function createOrderByTests(autoIndex: `off` | `eager`): void { }) it(`applies both limit and offset with ordering`, async () => { - console.log(`in applies both limit and offset with ordering`) const collection = createLiveQueryCollection((q) => q .from({ employees: employeesCollection }) @@ -450,7 +446,6 @@ function createOrderByTests(autoIndex: `off` | `eager`): void { }) it(`throws error when limit/offset used without orderBy`, () => { - console.log(`in throws error when limit/offset used without orderBy`) expect(() => { createLiveQueryCollection((q) => q @@ -467,9 +462,6 @@ function createOrderByTests(autoIndex: `off` | `eager`): void { }) it(`applies incremental insert of a new row before the topK correctly`, async () => { - console.log( - `in applies incremental insert of a new row before the topK correctly` - ) const collection = createLiveQueryCollection((q) => q .from({ employees: employeesCollection }) @@ -517,9 +509,6 @@ function createOrderByTests(autoIndex: `off` | `eager`): void { }) it(`applies incremental insert of a new row inside the topK correctly`, async () => { - console.log( - `in applies incremental insert of a new row inside the topK correctly` - ) const collection = createLiveQueryCollection((q) => q .from({ employees: employeesCollection }) @@ -564,13 +553,9 @@ function createOrderByTests(autoIndex: `off` | `eager`): void { [6, 62_000], [2, 60_000], ]) - console.log(`end`) }) it(`applies incremental insert of a new row inside the topK but after max sent value correctly`, async () => { - console.log( - `in applies incremental insert of a new row inside the topK but after max sent value correctly` - ) const collection = createLiveQueryCollection((q) => q .from({ employees: employeesCollection }) @@ -622,9 +607,6 @@ function createOrderByTests(autoIndex: `off` | `eager`): void { }) it(`applies incremental insert of a new row after the topK correctly`, async () => { - console.log( - `in applies incremental insert of a new row after the topK correctly` - ) const collection = createLiveQueryCollection((q) => q .from({ employees: employeesCollection }) @@ -668,9 +650,6 @@ function createOrderByTests(autoIndex: `off` | `eager`): void { }) it(`applies incremental update of a row inside the topK correctly`, async () => { - console.log( - `in applies incremental update of a row inside the topK correctly` - ) const collection = createLiveQueryCollection((q) => q .from({ employees: employeesCollection }) @@ -714,9 +693,6 @@ function createOrderByTests(autoIndex: `off` | `eager`): void { }) it(`applies incremental delete of a row in the topK correctly`, async () => { - console.log( - `in applies incremental delete of a row in the topK correctly` - ) const collection = createLiveQueryCollection((q) => q .from({ employees: employeesCollection }) @@ -1871,6 +1847,6 @@ function createOrderByTests(autoIndex: `off` | `eager`): void { } describe(`Query2 OrderBy Compiler`, () => { - // createOrderByTests(`off`) + createOrderByTests(`off`) createOrderByTests(`eager`) }) From 3abed6974bcf015233effd0cc35d55e0ee3be919 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Mon, 22 Sep 2025 15:55:19 +0200 Subject: [PATCH 07/10] Fix distinct tests --- packages/db-ivm/src/operators/distinct.ts | 5 ++++ .../db-ivm/tests/operators/distinct.test.ts | 25 ++++++++++--------- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/packages/db-ivm/src/operators/distinct.ts b/packages/db-ivm/src/operators/distinct.ts index f1880858d..c0bd127cf 100644 --- a/packages/db-ivm/src/operators/distinct.ts +++ b/packages/db-ivm/src/operators/distinct.ts @@ -36,6 +36,11 @@ export class DistinctOperator< // Compute the new multiplicity for each value for (const message of this.inputMessages()) { for (const [value, diff] of message.getInner()) { + console.log(`value: `, JSON.stringify(value, null, 2)) + console.log( + `this.#by(value): `, + JSON.stringify(this.#by(value), null, 2) + ) const hashedValue = hash(this.#by(value)) const oldMultiplicity = diff --git a/packages/db-ivm/tests/operators/distinct.test.ts b/packages/db-ivm/tests/operators/distinct.test.ts index 9a7645ded..aa865e25c 100644 --- a/packages/db-ivm/tests/operators/distinct.test.ts +++ b/packages/db-ivm/tests/operators/distinct.test.ts @@ -4,6 +4,7 @@ import { MultiSet } from "../../src/multiset.js" import { distinct } from "../../src/operators/distinct.js" import { output } from "../../src/operators/output.js" import { MessageTracker, assertResults } from "../test-utils.js" +import { hash } from "../../src/hashing/index.js" describe(`Operators`, () => { describe(`Efficient distinct operation`, () => { @@ -39,9 +40,9 @@ function testDistinct() { expect(data).toEqual([ [ - [[1, `a`], 1], - [[2, `b`], 1], - [[2, `c`], 1], + [[hash([1, `a`]), `a`], 1], + [[hash([2, `b`]), `b`], 1], + [[hash([2, `c`]), `c`], 1], ], ]) }) @@ -74,7 +75,7 @@ function testDistinct() { graph.run() - const data = messages.map((m) => m.getInner())[0] + const data = messages.map((m) => m.getInner())[0]! const countries = data .map(([[_, value], multiplicity]) => [value.country, multiplicity]) .sort() @@ -118,8 +119,8 @@ function testDistinct() { `distinct with updates - initial`, initialResult, [ - [1, `a`], - [1, `b`], + [hash([1, `a`]), `a`], + [hash([1, `b`]), `b`], ], // Should have both distinct values 4 // Max expected messages ) @@ -140,7 +141,7 @@ function testDistinct() { assertResults( `distinct with updates - second batch`, secondResult, - [[1, `c`]], // Should only have 'c' remaining + [[hash([1, `c`]), `c`]], // Should only have 'c' remaining 4 // Max expected messages ) @@ -186,9 +187,9 @@ function testDistinct() { expect(data).toEqual([ [ - [[`key1`, 1], 1], - [[`key1`, 2], 1], - [[`key2`, 1], 1], + [[hash([`key1`, 1]), 1], 1], + [[hash([`key1`, 2]), 2], 1], + [[hash([`key2`, 1]), 1], 1], ], ]) }) @@ -224,8 +225,8 @@ function testDistinct() { `distinct with multiple batches that cancel out`, result, [ - [`key1`, 1], // Should remain (multiplicity 2 -> 1 in distinct) - [`key2`, 1], // Should remain (multiplicity 2 -> 1 in distinct) + [hash([`key1`, 1]), 1], // Should remain (multiplicity 2 -> 1 in distinct) + [hash([`key2`, 1]), 1], // Should remain (multiplicity 2 -> 1 in distinct) ], 6 // Max expected messages (generous upper bound) ) From 9914bf56573c8819ffcb1258ee7d1515d4c63833 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Tue, 23 Sep 2025 08:27:01 +0200 Subject: [PATCH 08/10] Fix distinct test in ts/db --- packages/db/tests/query/distinct.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/db/tests/query/distinct.test.ts b/packages/db/tests/query/distinct.test.ts index 101444294..1f2b0414f 100644 --- a/packages/db/tests/query/distinct.test.ts +++ b/packages/db/tests/query/distinct.test.ts @@ -477,7 +477,7 @@ function createDistinctTests(autoIndex: `off` | `eager`): void { emptyCollection.utils.commit() expect(emptyDistinct.size).toBe(1) - const department = emptyDistinct.get(1) + const department = emptyDistinct.toArray[0] expect(department?.department).toBe(`Test`) }) From 58b8062b7a3e33a46385b0ff1b8289eea121c171 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Tue, 23 Sep 2025 15:27:12 +0200 Subject: [PATCH 09/10] Remove debugging console logs --- packages/db-ivm/src/operators/distinct.ts | 5 ----- 1 file changed, 5 deletions(-) diff --git a/packages/db-ivm/src/operators/distinct.ts b/packages/db-ivm/src/operators/distinct.ts index c0bd127cf..f1880858d 100644 --- a/packages/db-ivm/src/operators/distinct.ts +++ b/packages/db-ivm/src/operators/distinct.ts @@ -36,11 +36,6 @@ export class DistinctOperator< // Compute the new multiplicity for each value for (const message of this.inputMessages()) { for (const [value, diff] of message.getInner()) { - console.log(`value: `, JSON.stringify(value, null, 2)) - console.log( - `this.#by(value): `, - JSON.stringify(this.#by(value), null, 2) - ) const hashedValue = hash(this.#by(value)) const oldMultiplicity = From f04634ff87a928f6a06a90651d9eaa8fbf39e838 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Tue, 23 Sep 2025 15:29:16 +0200 Subject: [PATCH 10/10] Changesets --- .changeset/afraid-camels-tickle.md | 5 +++++ .changeset/fifty-ways-hang.md | 5 +++++ 2 files changed, 10 insertions(+) create mode 100644 .changeset/afraid-camels-tickle.md create mode 100644 .changeset/fifty-ways-hang.md diff --git a/.changeset/afraid-camels-tickle.md b/.changeset/afraid-camels-tickle.md new file mode 100644 index 000000000..9062592e7 --- /dev/null +++ b/.changeset/afraid-camels-tickle.md @@ -0,0 +1,5 @@ +--- +"@tanstack/db": patch +--- + +optimise the live query graph execution by removing recursive calls to graph.run diff --git a/.changeset/fifty-ways-hang.md b/.changeset/fifty-ways-hang.md new file mode 100644 index 000000000..57a99d4e5 --- /dev/null +++ b/.changeset/fifty-ways-hang.md @@ -0,0 +1,5 @@ +--- +"@tanstack/db-ivm": patch +--- + +Fix a bug with distinct operator