Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/afraid-camels-tickle.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@tanstack/db": patch
---

optimise the live query graph execution by removing recursive calls to graph.run
5 changes: 5 additions & 0 deletions .changeset/fifty-ways-hang.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@tanstack/db-ivm": patch
---

Fix a bug with distinct operator
10 changes: 10 additions & 0 deletions .changeset/plain-lights-end.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
"@tanstack/db": minor
"@tanstack/angular-db": patch
"@tanstack/svelte-db": patch
"@tanstack/react-db": patch
"@tanstack/solid-db": patch
"@tanstack/vue-db": patch
---

Let collection.subscribeChanges return a subscription object. Move all data loading code related to optimizations into that subscription object.
3 changes: 2 additions & 1 deletion packages/angular-db/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,12 @@ export function injectLiveQuery(opts: any) {
}

// Subscribe to changes
unsub = currentCollection.subscribeChanges(
const subscription = currentCollection.subscribeChanges(
(_: Array<ChangeMessage<any>>) => {
syncDataFromCollection(currentCollection)
}
)
unsub = subscription.unsubscribe.bind(subscription)

// Handle ready state
currentCollection.onFirstReady(() => {
Expand Down
4 changes: 3 additions & 1 deletion packages/angular-db/tests/inject-live-query.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,9 @@ function createMockCollection<T extends object, K extends string | number>(
size: () => map.size,
subscribeChanges: (cb: (changes: Array<any>) => void) => {
subs.add(cb)
return () => subs.delete(cb)
return {
unsubscribe: () => subs.delete(cb),
}
},
onFirstReady: (cb: () => void) => {
if (status === `ready`) {
Expand Down
21 changes: 13 additions & 8 deletions packages/db-ivm/src/operators/distinct.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,25 @@ import { hash } from "../hashing/index.js"
import { MultiSet } from "../multiset.js"
import type { Hash } from "../hashing/index.js"
import type { DifferenceStreamReader } from "../graph.js"
import type { IStreamBuilder } from "../types.js"
import type { IStreamBuilder, KeyValue } from "../types.js"

type Multiplicity = number

type GetValue<T> = T extends KeyValue<any, infer V> ? V : never

/**
* Operator that removes duplicates
*/
export class DistinctOperator<T> extends UnaryOperator<T> {
export class DistinctOperator<
T extends KeyValue<any, any>,
> extends UnaryOperator<T, KeyValue<number, GetValue<T>>> {
#by: (value: T) => any
#values: Map<Hash, Multiplicity> // keeps track of the number of times each value has been seen

constructor(
id: number,
input: DifferenceStreamReader<T>,
output: DifferenceStreamWriter<T>,
output: DifferenceStreamWriter<KeyValue<number, GetValue<T>>>,
by: (value: T) => any = (value: T) => value
) {
super(id, input, output)
Expand All @@ -39,12 +43,11 @@ export class DistinctOperator<T> extends UnaryOperator<T> {
this.#values.get(hashedValue) ??
0
const newMultiplicity = oldMultiplicity + diff

updatedValues.set(hashedValue, [newMultiplicity, value])
}
}

const result: Array<[T, number]> = []
const result: Array<[KeyValue<number, GetValue<T>>, number]> = []

// Check which values became visible or disappeared
for (const [
Expand All @@ -62,11 +65,11 @@ export class DistinctOperator<T> extends UnaryOperator<T> {
if (oldMultiplicity <= 0 && newMultiplicity > 0) {
// The value wasn't present in the stream
// but with this change it is now present in the stream
result.push([value, 1])
result.push([[hash(this.#by(value)), value[1]], 1])
} else if (oldMultiplicity > 0 && newMultiplicity <= 0) {
// The value was present in the stream
// but with this change it is no longer present in the stream
result.push([value, -1])
result.push([[hash(this.#by(value)), value[1]], -1])
}
}

Expand All @@ -79,7 +82,9 @@ export class DistinctOperator<T> extends UnaryOperator<T> {
/**
* Removes duplicate values
*/
export function distinct<T>(by: (value: T) => any = (value: T) => value) {
export function distinct<T extends KeyValue<any, any>>(
by: (value: T) => any = (value: T) => value
) {
return (stream: IStreamBuilder<T>): IStreamBuilder<T> => {
const output = new StreamBuilder<T>(
stream.graph,
Expand Down
25 changes: 13 additions & 12 deletions packages/db-ivm/tests/operators/distinct.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { MultiSet } from "../../src/multiset.js"
import { distinct } from "../../src/operators/distinct.js"
import { output } from "../../src/operators/output.js"
import { MessageTracker, assertResults } from "../test-utils.js"
import { hash } from "../../src/hashing/index.js"

describe(`Operators`, () => {
describe(`Efficient distinct operation`, () => {
Expand Down Expand Up @@ -39,9 +40,9 @@ function testDistinct() {

expect(data).toEqual([
[
[[1, `a`], 1],
[[2, `b`], 1],
[[2, `c`], 1],
[[hash([1, `a`]), `a`], 1],
[[hash([2, `b`]), `b`], 1],
[[hash([2, `c`]), `c`], 1],
],
])
})
Expand Down Expand Up @@ -74,7 +75,7 @@ function testDistinct() {

graph.run()

const data = messages.map((m) => m.getInner())[0]
const data = messages.map((m) => m.getInner())[0]!
const countries = data
.map(([[_, value], multiplicity]) => [value.country, multiplicity])
.sort()
Expand Down Expand Up @@ -118,8 +119,8 @@ function testDistinct() {
`distinct with updates - initial`,
initialResult,
[
[1, `a`],
[1, `b`],
[hash([1, `a`]), `a`],
[hash([1, `b`]), `b`],
], // Should have both distinct values
4 // Max expected messages
)
Expand All @@ -140,7 +141,7 @@ function testDistinct() {
assertResults(
`distinct with updates - second batch`,
secondResult,
[[1, `c`]], // Should only have 'c' remaining
[[hash([1, `c`]), `c`]], // Should only have 'c' remaining
4 // Max expected messages
)

Expand Down Expand Up @@ -186,9 +187,9 @@ function testDistinct() {

expect(data).toEqual([
[
[[`key1`, 1], 1],
[[`key1`, 2], 1],
[[`key2`, 1], 1],
[[hash([`key1`, 1]), 1], 1],
[[hash([`key1`, 2]), 2], 1],
[[hash([`key2`, 1]), 1], 1],
],
])
})
Expand Down Expand Up @@ -224,8 +225,8 @@ function testDistinct() {
`distinct with multiple batches that cancel out`,
result,
[
[`key1`, 1], // Should remain (multiplicity 2 -> 1 in distinct)
[`key2`, 1], // Should remain (multiplicity 2 -> 1 in distinct)
[hash([`key1`, 1]), 1], // Should remain (multiplicity 2 -> 1 in distinct)
[hash([`key2`, 1]), 1], // Should remain (multiplicity 2 -> 1 in distinct)
],
6 // Max expected messages (generous upper bound)
)
Expand Down
50 changes: 18 additions & 32 deletions packages/db/src/change-events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ export function currentStateAsChanges<
TKey extends string | number,
>(
collection: CollectionLike<T, TKey>,
options: CurrentStateAsChangesOptions<T> = {}
): Array<ChangeMessage<T>> {
options: CurrentStateAsChangesOptions = {}
): Array<ChangeMessage<T>> | void {
// Helper function to collect filtered results
const collectFilteredResults = (
filterFn?: (value: T) => boolean
Expand All @@ -66,31 +66,17 @@ export function currentStateAsChanges<
return result
}

if (!options.where && !options.whereExpression) {
// TODO: handle orderBy and limit options
// by calling optimizeOrderedLimit

if (!options.where) {
// No filtering, return all items
return collectFilteredResults()
}

// There's a where clause, let's see if we can use an index
try {
let expression: BasicExpression<boolean>

if (options.whereExpression) {
// Use the pre-compiled expression directly
expression = options.whereExpression
} else if (options.where) {
// Create the single-row refProxy for the callback
const singleRowRefProxy = createSingleRowRefProxy<T>()

// Execute the callback to get the expression
const whereExpression = options.where(singleRowRefProxy)

// Convert the result to a BasicExpression
expression = toExpression(whereExpression)
} else {
// This should never happen due to the check above, but TypeScript needs it
return []
}
const expression: BasicExpression<boolean> = options.where

// Try to optimize the query using indexes
const optimizationResult = optimizeExpressionWithIndexes(
Expand All @@ -113,11 +99,11 @@ export function currentStateAsChanges<
}
return result
} else {
// No index found or complex expression, fall back to full scan with filter
const filterFn = options.where
? createFilterFunction(options.where)
: createFilterFunctionFromExpression(expression)
if (options.optimizedOnly) {
return
}

const filterFn = createFilterFunctionFromExpression(expression)
return collectFilteredResults(filterFn)
}
} catch (error) {
Expand All @@ -127,9 +113,11 @@ export function currentStateAsChanges<
error
)

const filterFn = options.where
? createFilterFunction(options.where)
: createFilterFunctionFromExpression(options.whereExpression!)
const filterFn = createFilterFunctionFromExpression(options.where)

if (options.optimizedOnly) {
return
}

return collectFilteredResults(filterFn)
}
Expand Down Expand Up @@ -201,11 +189,9 @@ export function createFilterFunctionFromExpression<T extends object>(
*/
export function createFilteredCallback<T extends object>(
originalCallback: (changes: Array<ChangeMessage<T>>) => void,
options: SubscribeChangesOptions<T>
options: SubscribeChangesOptions
): (changes: Array<ChangeMessage<T>>) => void {
const filterFn = options.whereExpression
? createFilterFunctionFromExpression(options.whereExpression)
: createFilterFunction(options.where!)
const filterFn = createFilterFunctionFromExpression(options.whereExpression!)

return (changes: Array<ChangeMessage<T>>) => {
const filteredChanges: Array<ChangeMessage<T>> = []
Expand Down
Loading
Loading