Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/new-swans-heal.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@tanstack/db": patch
---

fix a race condition that could result in the initial state of a joined collection being sent to the live query pipeline twice, this would result in incorrect join results.
22 changes: 21 additions & 1 deletion packages/db/src/query/live/collection-subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ export class CollectionSubscriber<
// filter out deletes for keys that have not been sent
continue
}
this.sentKeys.add(change.key)
Copy link
Collaborator Author

@samwillis samwillis Aug 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We were only adding keys to the sentKeys when they were requested, not when they were send due to just watching the live changes.

}
newChanges.push(newChange)
}
Expand Down Expand Up @@ -153,12 +154,29 @@ export class CollectionSubscriber<
private subscribeToMatchingChanges(
whereExpression: BasicExpression<boolean> | undefined
) {
// Flag to indicate we have send to whole initial state of the collection
// to the pipeline, this is set when there are no indexes that can be used
// to filter the changes and so the whole state was requested from the collection
let loadedInitialState = false

// Flag to indicate that we have started sending changes to the pipeline.
// This is set to true by either the first call to `loadKeys` or when the
// query requests the whole initial state in `loadInitialState`.
// Until that point we filter out all changes from subscription to the collection.
let sendChanges = false

const sendVisibleChanges = (
changes: Array<ChangeMessage<any, string | number>>
) => {
this.sendVisibleChangesToPipeline(changes, loadedInitialState)
// We filter out changes when sendChanges is false to ensure that we don't send
// any changes from the live subscription until the join operator requests either
// the initial state or its first key. This is needed otherwise it could receive
// changes which are then later subsumed by the initial state (and that would
// lead to weird bugs due to the data being received twice).
this.sendVisibleChangesToPipeline(
sendChanges ? changes : [],
loadedInitialState
)
}

const unsubscribe = this.collection.subscribeChanges(sendVisibleChanges, {
Expand All @@ -171,6 +189,7 @@ export class CollectionSubscriber<
? createFilterFunctionFromExpression(whereExpression)
: () => true
const loadKs = (keys: Set<string | number>) => {
sendChanges = true
return this.loadKeys(keys, filterFn)
}

Expand All @@ -183,6 +202,7 @@ export class CollectionSubscriber<
// Make sure we only load the initial state once
if (loadedInitialState) return
loadedInitialState = true
sendChanges = true

const changes = this.collection.currentStateAsChanges({
whereExpression,
Expand Down
93 changes: 92 additions & 1 deletion packages/db/tests/query/live-query-collection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ import { beforeEach, describe, expect, it } from "vitest"
import { createCollection } from "../../src/collection.js"
import { createLiveQueryCollection, eq } from "../../src/query/index.js"
import { Query } from "../../src/query/builder/index.js"
import { mockSyncCollectionOptions } from "../utls.js"
import {
mockSyncCollectionOptions,
mockSyncCollectionOptionsNoInitialState,
} from "../utls.js"
import type { ChangeMessage } from "../../src/types.js"

// Sample user type for tests
Expand Down Expand Up @@ -313,4 +316,92 @@ describe(`createLiveQueryCollection`, () => {
// Resubscribe should not throw (would throw "Graph already finalized" without the fix)
expect(() => liveQuery.subscribeChanges(() => {})).not.toThrow()
})

for (const autoIndex of [`eager`, `off`] as const) {
it(`should not send the initial state twice on joins with autoIndex: ${autoIndex}`, async () => {
type Player = { id: number; name: string }
type Challenge = { id: number; value: number }

const playerCollection = createCollection(
mockSyncCollectionOptionsNoInitialState<Player>({
id: `player`,
getKey: (post) => post.id,
autoIndex,
})
)

const challenge1Collection = createCollection(
mockSyncCollectionOptionsNoInitialState<Challenge>({
id: `challenge1`,
getKey: (post) => post.id,
autoIndex,
})
)

const challenge2Collection = createCollection(
mockSyncCollectionOptionsNoInitialState<Challenge>({
id: `challenge2`,
getKey: (post) => post.id,
autoIndex,
})
)

const liveQuery = createLiveQueryCollection((q) =>
q
.from({ player: playerCollection })
.leftJoin(
{ challenge1: challenge1Collection },
({ player, challenge1 }) => eq(player.id, challenge1.id)
)
.leftJoin(
{ challenge2: challenge2Collection },
({ player, challenge2 }) => eq(player.id, challenge2.id)
)
)

// Start the query, but don't wait it, we are doing to write the data to the
// source collections while the query is loading the initial state
const preloadPromise = liveQuery.preload()

// Write player
playerCollection.utils.begin()
playerCollection.utils.write({
type: `insert`,
value: { id: 1, name: `Alice` },
})
playerCollection.utils.commit()
playerCollection.utils.markReady()

// Write challenge1
challenge1Collection.utils.begin()
challenge1Collection.utils.write({
type: `insert`,
value: { id: 1, value: 100 },
})
challenge1Collection.utils.commit()
challenge1Collection.utils.markReady()

// Write challenge2
challenge2Collection.utils.begin()
challenge2Collection.utils.write({
type: `insert`,
value: { id: 1, value: 200 },
})
challenge2Collection.utils.commit()
challenge2Collection.utils.markReady()

await preloadPromise

// With a failed test the results show more than 1 item
// It returns both an unjoined player with no joined challenges, and a joined
// player with the challenges
const results = liveQuery.toArray
expect(results.length).toBe(1)

const result = results[0]!
expect(result.player.name).toBe(`Alice`)
expect(result.challenge1?.value).toBe(100)
expect(result.challenge2?.value).toBe(200)
})
}
})
91 changes: 84 additions & 7 deletions packages/db/tests/utls.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,6 @@ import type {
SyncConfig,
} from "../src/index.js"

type MockSyncCollectionConfig<T> = {
id: string
initialData: Array<T>
getKey: (item: T) => string | number
autoIndex?: `off` | `eager`
}

// Index usage tracking utilities
export interface IndexUsageStats {
rangeQueryCalls: number
Expand Down Expand Up @@ -179,6 +172,13 @@ export function withIndexTracking(
}
}

type MockSyncCollectionConfig<T> = {
id: string
initialData: Array<T>
getKey: (item: T) => string | number
autoIndex?: `off` | `eager`
}

export function mockSyncCollectionOptions<
T extends object = Record<string, unknown>,
>(config: MockSyncCollectionConfig<T>) {
Expand Down Expand Up @@ -257,3 +257,80 @@ export function mockSyncCollectionOptions<

return options
}

type MockSyncCollectionConfigNoInitialState<T> = {
id: string
getKey: (item: T) => string | number
autoIndex?: `off` | `eager`
}

export function mockSyncCollectionOptionsNoInitialState<
T extends object = Record<string, unknown>,
>(config: MockSyncCollectionConfigNoInitialState<T>) {
let begin: () => void
let write: Parameters<SyncConfig<T>[`sync`]>[0][`write`]
let commit: () => void
let markReady: () => void

let syncPendingPromise: Promise<void> | undefined
let syncPendingResolve: (() => void) | undefined
let syncPendingReject: ((error: Error) => void) | undefined

const awaitSync = async () => {
if (syncPendingPromise) {
return syncPendingPromise
}
syncPendingPromise = new Promise((resolve, reject) => {
syncPendingResolve = resolve
syncPendingReject = reject
})
syncPendingPromise.then(() => {
syncPendingPromise = undefined
syncPendingResolve = undefined
syncPendingReject = undefined
})
return syncPendingPromise
}

const utils = {
begin: () => begin!(),
write: ((value) => write!(value)) as typeof write,
commit: () => commit!(),
markReady: () => markReady!(),
resolveSync: () => {
syncPendingResolve!()
},
rejectSync: (error: Error) => {
syncPendingReject!(error)
},
}

const options: CollectionConfig<T> & { utils: typeof utils } = {
sync: {
sync: (params: Parameters<SyncConfig<T>[`sync`]>[0]) => {
begin = params.begin
write = params.write
commit = params.commit
markReady = params.markReady
},
},
startSync: false,
onInsert: async (_params: MutationFnParams<T>) => {
// TODO
await awaitSync()
},
onUpdate: async (_params: MutationFnParams<T>) => {
// TODO
await awaitSync()
},
onDelete: async (_params: MutationFnParams<T>) => {
// TODO
await awaitSync()
},
utils,
...config,
autoIndex: config.autoIndex,
}

return options
}