From 4e61d64c8bad4f3627d4ec5464d37da18ba03e0a Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Tue, 26 Aug 2025 10:23:07 +0100 Subject: [PATCH 1/4] add failing test --- .../tests/query/live-query-collection.test.ts | 139 ++++++++++++++++++ 1 file changed, 139 insertions(+) diff --git a/packages/db/tests/query/live-query-collection.test.ts b/packages/db/tests/query/live-query-collection.test.ts index 7fb59dc3a..e59854329 100644 --- a/packages/db/tests/query/live-query-collection.test.ts +++ b/packages/db/tests/query/live-query-collection.test.ts @@ -313,4 +313,143 @@ describe(`createLiveQueryCollection`, () => { // Resubscribe should not throw (would throw "Graph already finalized" without the fix) expect(() => liveQuery.subscribeChanges(() => {})).not.toThrow() }) + + for (const autoIndex of [`eager`, `off`] as const) { + it(`should not send the initial state twice on joins with autoIndex: ${autoIndex}`, async () => { + type Player = { id: number; name: string } + type Challenge = { id: number; value: number } + + let playerBeginCallback: (() => void) | undefined + let playerWriteCallback: + | (( + message: Omit, `key`> + ) => void) + | undefined + let playerCommitCallback: (() => void) | undefined + let playerMarkReadyCallback: (() => void) | undefined + const playerCollection = createCollection({ + id: `player`, + getKey: (post) => post.id, + startSync: false, + autoIndex, + sync: { + sync: ({ begin, commit, write, markReady }) => { + playerBeginCallback = begin + playerCommitCallback = commit + playerMarkReadyCallback = markReady + playerWriteCallback = write + return () => {} + }, + }, + onInsert: async () => {}, // Add empty handler to allow direct inserts + }) + + let challenge1BeginCallback: (() => void) | undefined + let challenge1WriteCallback: + | (( + message: Omit, `key`> + ) => void) + | undefined + let challenge1CommitCallback: (() => void) | undefined + let challenge1MarkReadyCallback: (() => void) | undefined + const challenge1Collection = createCollection({ + id: `challenge1`, + getKey: (post) => post.id, + startSync: false, + autoIndex, + sync: { + sync: ({ begin, commit, write, markReady }) => { + challenge1BeginCallback = begin + challenge1CommitCallback = commit + challenge1MarkReadyCallback = markReady + challenge1WriteCallback = write + return () => {} + }, + }, + onInsert: async () => {}, // Add empty handler to allow direct inserts + }) + + let challenge2BeginCallback: (() => void) | undefined + let challenge2WriteCallback: + | (( + message: Omit, `key`> + ) => void) + | undefined + let challenge2CommitCallback: (() => void) | undefined + let challenge2MarkReadyCallback: (() => void) | undefined + const challenge2Collection = createCollection({ + id: `challenge2`, + getKey: (post) => post.id, + startSync: false, + autoIndex, + sync: { + sync: ({ begin, commit, write, markReady }) => { + challenge2BeginCallback = begin + challenge2CommitCallback = commit + challenge2MarkReadyCallback = markReady + challenge2WriteCallback = write + return () => {} + }, + }, + onInsert: async () => {}, // Add empty handler to allow direct inserts + }) + + const liveQuery = createLiveQueryCollection((q) => + q + .from({ player: playerCollection }) + .leftJoin( + { challenge1: challenge1Collection }, + ({ player, challenge1 }) => eq(player.id, challenge1.id) + ) + .leftJoin( + { challenge2: challenge2Collection }, + ({ player, challenge2 }) => eq(player.id, challenge2.id) + ) + ) + + // Start the query, but don't wait it, we are doing to write the data to the + // source collections while the query is loading the initial state + const preloadPromise = liveQuery.preload() + + // Write player + playerBeginCallback!() + playerWriteCallback!({ + type: `insert`, + value: { id: 1, name: `Alice` }, + }) + playerCommitCallback!() + playerMarkReadyCallback!() + + // Write challenge1 + challenge1BeginCallback!() + challenge1WriteCallback!({ + type: `insert`, + value: { id: 1, value: 100 }, + }) + challenge1CommitCallback!() + challenge1MarkReadyCallback!() + + // Write challenge2 + challenge2BeginCallback!() + challenge2WriteCallback!({ + type: `insert`, + value: { id: 1, value: 200 }, + }) + challenge2CommitCallback!() + challenge2MarkReadyCallback!() + + await preloadPromise + + // With a failed test the results show more than 1 item + // It returns both an unjoined player with no joined challenges, and a joined + // player with the challenges + const results = liveQuery.toArray + expect(results.length).toBe(1) + + const result = results[0]! + expect(result.player.name).toBe(`Alice`) + expect(result.challenge1?.value).toBe(100) + expect(result.challenge2?.value).toBe(200) + }) + } }) From 14b39191c9439ad3d4f14ffb359621bd60f2fc84 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Tue, 26 Aug 2025 10:30:13 +0100 Subject: [PATCH 2/4] fix the bug --- .../src/query/live/collection-subscriber.ts | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/packages/db/src/query/live/collection-subscriber.ts b/packages/db/src/query/live/collection-subscriber.ts index 07fe577ff..a71988a1a 100644 --- a/packages/db/src/query/live/collection-subscriber.ts +++ b/packages/db/src/query/live/collection-subscriber.ts @@ -113,6 +113,7 @@ export class CollectionSubscriber< // filter out deletes for keys that have not been sent continue } + this.sentKeys.add(change.key) } newChanges.push(newChange) } @@ -153,12 +154,27 @@ export class CollectionSubscriber< private subscribeToMatchingChanges( whereExpression: BasicExpression | undefined ) { + // Flag to indicate we have send to whole initial state of the collection + // to the pipeline, this is set when there are no indexes that can be used + // to filter the changes and so the whole state was requested from the collection let loadedInitialState = false + // Flag to indicate that we have started sending changes to the pipeline. + // This is set to true by either the first call to `loadKeys` or when the + // query requests the whole initial state in `loadInitialState`. + // Until that point we filter out all changes from subscription to the collection. + let sendChanges = false + const sendVisibleChanges = ( changes: Array> ) => { - this.sendVisibleChangesToPipeline(changes, loadedInitialState) + // We are filtering the changes out when `sendChanges` is false, but still sending + // an empty array to the pipeline. This is needed to ensure that the pipeline + // receives the status update that the collection is now ready. + this.sendVisibleChangesToPipeline( + sendChanges ? changes : [], + loadedInitialState + ) } const unsubscribe = this.collection.subscribeChanges(sendVisibleChanges, { @@ -171,6 +187,7 @@ export class CollectionSubscriber< ? createFilterFunctionFromExpression(whereExpression) : () => true const loadKs = (keys: Set) => { + sendChanges = true return this.loadKeys(keys, filterFn) } @@ -183,6 +200,7 @@ export class CollectionSubscriber< // Make sure we only load the initial state once if (loadedInitialState) return loadedInitialState = true + sendChanges = true const changes = this.collection.currentStateAsChanges({ whereExpression, From 4e45b8359a9e4150e97be68be1fd6950204086ae Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Tue, 26 Aug 2025 10:31:42 +0100 Subject: [PATCH 3/4] changeset --- .changeset/new-swans-heal.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/new-swans-heal.md diff --git a/.changeset/new-swans-heal.md b/.changeset/new-swans-heal.md new file mode 100644 index 000000000..1778913a8 --- /dev/null +++ b/.changeset/new-swans-heal.md @@ -0,0 +1,5 @@ +--- +"@tanstack/db": patch +--- + +fix a race condition that could result in the initial state of a joined collection being sent to the live query pipeline twice, this would result in incorrect join results. From 30f29e4bae777e4cd34bf066bb009eda7973a878 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Tue, 26 Aug 2025 11:49:51 +0100 Subject: [PATCH 4/4] address review --- .../src/query/live/collection-subscriber.ts | 8 +- .../tests/query/live-query-collection.test.ts | 122 ++++++------------ packages/db/tests/utls.ts | 91 ++++++++++++- 3 files changed, 126 insertions(+), 95 deletions(-) diff --git a/packages/db/src/query/live/collection-subscriber.ts b/packages/db/src/query/live/collection-subscriber.ts index a71988a1a..03dfb63f1 100644 --- a/packages/db/src/query/live/collection-subscriber.ts +++ b/packages/db/src/query/live/collection-subscriber.ts @@ -168,9 +168,11 @@ export class CollectionSubscriber< const sendVisibleChanges = ( changes: Array> ) => { - // We are filtering the changes out when `sendChanges` is false, but still sending - // an empty array to the pipeline. This is needed to ensure that the pipeline - // receives the status update that the collection is now ready. + // We filter out changes when sendChanges is false to ensure that we don't send + // any changes from the live subscription until the join operator requests either + // the initial state or its first key. This is needed otherwise it could receive + // changes which are then later subsumed by the initial state (and that would + // lead to weird bugs due to the data being received twice). this.sendVisibleChangesToPipeline( sendChanges ? changes : [], loadedInitialState diff --git a/packages/db/tests/query/live-query-collection.test.ts b/packages/db/tests/query/live-query-collection.test.ts index e59854329..2570c0812 100644 --- a/packages/db/tests/query/live-query-collection.test.ts +++ b/packages/db/tests/query/live-query-collection.test.ts @@ -2,7 +2,10 @@ import { beforeEach, describe, expect, it } from "vitest" import { createCollection } from "../../src/collection.js" import { createLiveQueryCollection, eq } from "../../src/query/index.js" import { Query } from "../../src/query/builder/index.js" -import { mockSyncCollectionOptions } from "../utls.js" +import { + mockSyncCollectionOptions, + mockSyncCollectionOptionsNoInitialState, +} from "../utls.js" import type { ChangeMessage } from "../../src/types.js" // Sample user type for tests @@ -319,80 +322,29 @@ describe(`createLiveQueryCollection`, () => { type Player = { id: number; name: string } type Challenge = { id: number; value: number } - let playerBeginCallback: (() => void) | undefined - let playerWriteCallback: - | (( - message: Omit, `key`> - ) => void) - | undefined - let playerCommitCallback: (() => void) | undefined - let playerMarkReadyCallback: (() => void) | undefined - const playerCollection = createCollection({ - id: `player`, - getKey: (post) => post.id, - startSync: false, - autoIndex, - sync: { - sync: ({ begin, commit, write, markReady }) => { - playerBeginCallback = begin - playerCommitCallback = commit - playerMarkReadyCallback = markReady - playerWriteCallback = write - return () => {} - }, - }, - onInsert: async () => {}, // Add empty handler to allow direct inserts - }) + const playerCollection = createCollection( + mockSyncCollectionOptionsNoInitialState({ + id: `player`, + getKey: (post) => post.id, + autoIndex, + }) + ) - let challenge1BeginCallback: (() => void) | undefined - let challenge1WriteCallback: - | (( - message: Omit, `key`> - ) => void) - | undefined - let challenge1CommitCallback: (() => void) | undefined - let challenge1MarkReadyCallback: (() => void) | undefined - const challenge1Collection = createCollection({ - id: `challenge1`, - getKey: (post) => post.id, - startSync: false, - autoIndex, - sync: { - sync: ({ begin, commit, write, markReady }) => { - challenge1BeginCallback = begin - challenge1CommitCallback = commit - challenge1MarkReadyCallback = markReady - challenge1WriteCallback = write - return () => {} - }, - }, - onInsert: async () => {}, // Add empty handler to allow direct inserts - }) + const challenge1Collection = createCollection( + mockSyncCollectionOptionsNoInitialState({ + id: `challenge1`, + getKey: (post) => post.id, + autoIndex, + }) + ) - let challenge2BeginCallback: (() => void) | undefined - let challenge2WriteCallback: - | (( - message: Omit, `key`> - ) => void) - | undefined - let challenge2CommitCallback: (() => void) | undefined - let challenge2MarkReadyCallback: (() => void) | undefined - const challenge2Collection = createCollection({ - id: `challenge2`, - getKey: (post) => post.id, - startSync: false, - autoIndex, - sync: { - sync: ({ begin, commit, write, markReady }) => { - challenge2BeginCallback = begin - challenge2CommitCallback = commit - challenge2MarkReadyCallback = markReady - challenge2WriteCallback = write - return () => {} - }, - }, - onInsert: async () => {}, // Add empty handler to allow direct inserts - }) + const challenge2Collection = createCollection( + mockSyncCollectionOptionsNoInitialState({ + id: `challenge2`, + getKey: (post) => post.id, + autoIndex, + }) + ) const liveQuery = createLiveQueryCollection((q) => q @@ -412,31 +364,31 @@ describe(`createLiveQueryCollection`, () => { const preloadPromise = liveQuery.preload() // Write player - playerBeginCallback!() - playerWriteCallback!({ + playerCollection.utils.begin() + playerCollection.utils.write({ type: `insert`, value: { id: 1, name: `Alice` }, }) - playerCommitCallback!() - playerMarkReadyCallback!() + playerCollection.utils.commit() + playerCollection.utils.markReady() // Write challenge1 - challenge1BeginCallback!() - challenge1WriteCallback!({ + challenge1Collection.utils.begin() + challenge1Collection.utils.write({ type: `insert`, value: { id: 1, value: 100 }, }) - challenge1CommitCallback!() - challenge1MarkReadyCallback!() + challenge1Collection.utils.commit() + challenge1Collection.utils.markReady() // Write challenge2 - challenge2BeginCallback!() - challenge2WriteCallback!({ + challenge2Collection.utils.begin() + challenge2Collection.utils.write({ type: `insert`, value: { id: 1, value: 200 }, }) - challenge2CommitCallback!() - challenge2MarkReadyCallback!() + challenge2Collection.utils.commit() + challenge2Collection.utils.markReady() await preloadPromise diff --git a/packages/db/tests/utls.ts b/packages/db/tests/utls.ts index 9ab74d3a5..6fb4aae0e 100644 --- a/packages/db/tests/utls.ts +++ b/packages/db/tests/utls.ts @@ -5,13 +5,6 @@ import type { SyncConfig, } from "../src/index.js" -type MockSyncCollectionConfig = { - id: string - initialData: Array - getKey: (item: T) => string | number - autoIndex?: `off` | `eager` -} - // Index usage tracking utilities export interface IndexUsageStats { rangeQueryCalls: number @@ -179,6 +172,13 @@ export function withIndexTracking( } } +type MockSyncCollectionConfig = { + id: string + initialData: Array + getKey: (item: T) => string | number + autoIndex?: `off` | `eager` +} + export function mockSyncCollectionOptions< T extends object = Record, >(config: MockSyncCollectionConfig) { @@ -257,3 +257,80 @@ export function mockSyncCollectionOptions< return options } + +type MockSyncCollectionConfigNoInitialState = { + id: string + getKey: (item: T) => string | number + autoIndex?: `off` | `eager` +} + +export function mockSyncCollectionOptionsNoInitialState< + T extends object = Record, +>(config: MockSyncCollectionConfigNoInitialState) { + let begin: () => void + let write: Parameters[`sync`]>[0][`write`] + let commit: () => void + let markReady: () => void + + let syncPendingPromise: Promise | undefined + let syncPendingResolve: (() => void) | undefined + let syncPendingReject: ((error: Error) => void) | undefined + + const awaitSync = async () => { + if (syncPendingPromise) { + return syncPendingPromise + } + syncPendingPromise = new Promise((resolve, reject) => { + syncPendingResolve = resolve + syncPendingReject = reject + }) + syncPendingPromise.then(() => { + syncPendingPromise = undefined + syncPendingResolve = undefined + syncPendingReject = undefined + }) + return syncPendingPromise + } + + const utils = { + begin: () => begin!(), + write: ((value) => write!(value)) as typeof write, + commit: () => commit!(), + markReady: () => markReady!(), + resolveSync: () => { + syncPendingResolve!() + }, + rejectSync: (error: Error) => { + syncPendingReject!(error) + }, + } + + const options: CollectionConfig & { utils: typeof utils } = { + sync: { + sync: (params: Parameters[`sync`]>[0]) => { + begin = params.begin + write = params.write + commit = params.commit + markReady = params.markReady + }, + }, + startSync: false, + onInsert: async (_params: MutationFnParams) => { + // TODO + await awaitSync() + }, + onUpdate: async (_params: MutationFnParams) => { + // TODO + await awaitSync() + }, + onDelete: async (_params: MutationFnParams) => { + // TODO + await awaitSync() + }, + utils, + ...config, + autoIndex: config.autoIndex, + } + + return options +}