Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/afraid-camels-tickle.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@tanstack/db": patch
---

Optimise live query graph execution by removing recursive calls to `graph.run`.
5 changes: 5 additions & 0 deletions .changeset/fifty-ways-hang.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@tanstack/db-ivm": patch
---

Fix a bug in the `distinct` operator: emitted rows are now keyed by the hash of the `by` value.
21 changes: 13 additions & 8 deletions packages/db-ivm/src/operators/distinct.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,25 @@ import { hash } from "../hashing/index.js"
import { MultiSet } from "../multiset.js"
import type { Hash } from "../hashing/index.js"
import type { DifferenceStreamReader } from "../graph.js"
import type { IStreamBuilder } from "../types.js"
import type { IStreamBuilder, KeyValue } from "../types.js"

type Multiplicity = number

type GetValue<T> = T extends KeyValue<any, infer V> ? V : never

/**
* Operator that removes duplicates
*/
export class DistinctOperator<T> extends UnaryOperator<T> {
export class DistinctOperator<
T extends KeyValue<any, any>,
> extends UnaryOperator<T, KeyValue<number, GetValue<T>>> {
#by: (value: T) => any
#values: Map<Hash, Multiplicity> // keeps track of the number of times each value has been seen

constructor(
id: number,
input: DifferenceStreamReader<T>,
output: DifferenceStreamWriter<T>,
output: DifferenceStreamWriter<KeyValue<number, GetValue<T>>>,
by: (value: T) => any = (value: T) => value
) {
super(id, input, output)
Expand All @@ -39,12 +43,11 @@ export class DistinctOperator<T> extends UnaryOperator<T> {
this.#values.get(hashedValue) ??
0
const newMultiplicity = oldMultiplicity + diff

updatedValues.set(hashedValue, [newMultiplicity, value])
}
}

const result: Array<[T, number]> = []
const result: Array<[KeyValue<number, GetValue<T>>, number]> = []

// Check which values became visible or disappeared
for (const [
Expand All @@ -62,11 +65,11 @@ export class DistinctOperator<T> extends UnaryOperator<T> {
if (oldMultiplicity <= 0 && newMultiplicity > 0) {
// The value wasn't present in the stream
// but with this change it is now present in the stream
result.push([value, 1])
result.push([[hash(this.#by(value)), value[1]], 1])
} else if (oldMultiplicity > 0 && newMultiplicity <= 0) {
// The value was present in the stream
// but with this change it is no longer present in the stream
result.push([value, -1])
result.push([[hash(this.#by(value)), value[1]], -1])
}
}

Expand All @@ -79,7 +82,9 @@ export class DistinctOperator<T> extends UnaryOperator<T> {
/**
* Removes duplicate values
*/
export function distinct<T>(by: (value: T) => any = (value: T) => value) {
export function distinct<T extends KeyValue<any, any>>(
by: (value: T) => any = (value: T) => value
) {
return (stream: IStreamBuilder<T>): IStreamBuilder<T> => {
const output = new StreamBuilder<T>(
stream.graph,
Expand Down
25 changes: 13 additions & 12 deletions packages/db-ivm/tests/operators/distinct.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { MultiSet } from "../../src/multiset.js"
import { distinct } from "../../src/operators/distinct.js"
import { output } from "../../src/operators/output.js"
import { MessageTracker, assertResults } from "../test-utils.js"
import { hash } from "../../src/hashing/index.js"

describe(`Operators`, () => {
describe(`Efficient distinct operation`, () => {
Expand Down Expand Up @@ -39,9 +40,9 @@ function testDistinct() {

expect(data).toEqual([
[
[[1, `a`], 1],
[[2, `b`], 1],
[[2, `c`], 1],
[[hash([1, `a`]), `a`], 1],
[[hash([2, `b`]), `b`], 1],
[[hash([2, `c`]), `c`], 1],
],
])
})
Expand Down Expand Up @@ -74,7 +75,7 @@ function testDistinct() {

graph.run()

const data = messages.map((m) => m.getInner())[0]
const data = messages.map((m) => m.getInner())[0]!
const countries = data
.map(([[_, value], multiplicity]) => [value.country, multiplicity])
.sort()
Expand Down Expand Up @@ -118,8 +119,8 @@ function testDistinct() {
`distinct with updates - initial`,
initialResult,
[
[1, `a`],
[1, `b`],
[hash([1, `a`]), `a`],
[hash([1, `b`]), `b`],
], // Should have both distinct values
4 // Max expected messages
)
Expand All @@ -140,7 +141,7 @@ function testDistinct() {
assertResults(
`distinct with updates - second batch`,
secondResult,
[[1, `c`]], // Should only have 'c' remaining
[[hash([1, `c`]), `c`]], // Should only have 'c' remaining
4 // Max expected messages
)

Expand Down Expand Up @@ -186,9 +187,9 @@ function testDistinct() {

expect(data).toEqual([
[
[[`key1`, 1], 1],
[[`key1`, 2], 1],
[[`key2`, 1], 1],
[[hash([`key1`, 1]), 1], 1],
[[hash([`key1`, 2]), 2], 1],
[[hash([`key2`, 1]), 1], 1],
],
])
})
Expand Down Expand Up @@ -224,8 +225,8 @@ function testDistinct() {
`distinct with multiple batches that cancel out`,
result,
[
[`key1`, 1], // Should remain (multiplicity 2 -> 1 in distinct)
[`key2`, 1], // Should remain (multiplicity 2 -> 1 in distinct)
[hash([`key1`, 1]), 1], // Should remain (multiplicity 2 -> 1 in distinct)
[hash([`key2`, 1]), 1], // Should remain (multiplicity 2 -> 1 in distinct)
],
6 // Max expected messages (generous upper bound)
)
Expand Down
52 changes: 35 additions & 17 deletions packages/db/src/query/live/collection-config-builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ export class CollectionConfigBuilder<

private readonly compare?: (val1: TResult, val2: TResult) => number

private isGraphRunning = false

private graphCache: D2 | undefined
private inputsCache: Record<string, RootStreamBuilder<unknown>> | undefined
private pipelineCache: ResultStream | undefined
Expand Down Expand Up @@ -107,25 +109,41 @@ export class CollectionConfigBuilder<
syncState: FullSyncState,
callback?: () => boolean
) {
const { begin, commit, markReady } = config
if (this.isGraphRunning) {
// Prevent nested runs of the graph.
// A nested run can happen if the `callback` itself
// calls `maybeRunGraph`, e.g. after it has loaded some more data.
return
}

// We only run the graph if all the collections are ready
if (
this.allCollectionsReadyOrInitialCommit() &&
syncState.subscribedToAllCollections
) {
syncState.graph.run()
const ready = callback?.() ?? true
// On the initial run, we may need to do an empty commit to ensure that
// the collection is initialized
if (syncState.messagesCount === 0) {
begin()
commit()
}
// Mark the collection as ready after the first successful run
if (ready && this.allCollectionsReady()) {
markReady()
this.isGraphRunning = true

try {
const { begin, commit, markReady } = config

// We only run the graph if all the collections are ready
if (
this.allCollectionsReadyOrInitialCommit() &&
syncState.subscribedToAllCollections
) {
while (syncState.graph.pendingWork()) {
syncState.graph.run()
callback?.()
}

// On the initial run, we may need to do an empty commit to ensure that
// the collection is initialized
if (syncState.messagesCount === 0) {
begin()
commit()
}
// Mark the collection as ready after the first successful run
if (this.allCollectionsReady()) {
markReady()
}
}
} finally {
this.isGraphRunning = false
}
}

Expand Down
6 changes: 5 additions & 1 deletion packages/db/src/query/live/collection-subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,11 @@ function sendChangesToInput(
multiSetArray.push([[key, change.value], -1])
}
}
input.sendData(new MultiSet(multiSetArray))

if (multiSetArray.length !== 0) {
input.sendData(new MultiSet(multiSetArray))
}

return multiSetArray.length
}

Expand Down
2 changes: 1 addition & 1 deletion packages/db/tests/query/distinct.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ function createDistinctTests(autoIndex: `off` | `eager`): void {
emptyCollection.utils.commit()

expect(emptyDistinct.size).toBe(1)
const department = emptyDistinct.get(1)
const department = emptyDistinct.toArray[0]
expect(department?.department).toBe(`Test`)
})

Expand Down
Loading