diff --git a/.changeset/new-swans-heal.md b/.changeset/new-swans-heal.md new file mode 100644 index 000000000..1778913a8 --- /dev/null +++ b/.changeset/new-swans-heal.md @@ -0,0 +1,5 @@ +--- +"@tanstack/db": patch +--- + +fix a race condition that could result in the initial state of a joined collection being sent to the live query pipeline twice, this would result in incorrect join results. diff --git a/packages/db/src/query/live/collection-subscriber.ts b/packages/db/src/query/live/collection-subscriber.ts index 07fe577ff..03dfb63f1 100644 --- a/packages/db/src/query/live/collection-subscriber.ts +++ b/packages/db/src/query/live/collection-subscriber.ts @@ -113,6 +113,7 @@ export class CollectionSubscriber< // filter out deletes for keys that have not been sent continue } + this.sentKeys.add(change.key) } newChanges.push(newChange) } @@ -153,12 +154,29 @@ export class CollectionSubscriber< private subscribeToMatchingChanges( whereExpression: BasicExpression | undefined ) { + // Flag to indicate we have send to whole initial state of the collection + // to the pipeline, this is set when there are no indexes that can be used + // to filter the changes and so the whole state was requested from the collection let loadedInitialState = false + // Flag to indicate that we have started sending changes to the pipeline. + // This is set to true by either the first call to `loadKeys` or when the + // query requests the whole initial state in `loadInitialState`. + // Until that point we filter out all changes from subscription to the collection. + let sendChanges = false + const sendVisibleChanges = ( changes: Array> ) => { - this.sendVisibleChangesToPipeline(changes, loadedInitialState) + // We filter out changes when sendChanges is false to ensure that we don't send + // any changes from the live subscription until the join operator requests either + // the initial state or its first key. This is needed otherwise it could receive + // changes which are then later subsumed by the initial state (and that would + // lead to weird bugs due to the data being received twice). + this.sendVisibleChangesToPipeline( + sendChanges ? changes : [], + loadedInitialState + ) } const unsubscribe = this.collection.subscribeChanges(sendVisibleChanges, { @@ -171,6 +189,7 @@ export class CollectionSubscriber< ? createFilterFunctionFromExpression(whereExpression) : () => true const loadKs = (keys: Set) => { + sendChanges = true return this.loadKeys(keys, filterFn) } @@ -183,6 +202,7 @@ export class CollectionSubscriber< // Make sure we only load the initial state once if (loadedInitialState) return loadedInitialState = true + sendChanges = true const changes = this.collection.currentStateAsChanges({ whereExpression, diff --git a/packages/db/tests/query/live-query-collection.test.ts b/packages/db/tests/query/live-query-collection.test.ts index 7fb59dc3a..2570c0812 100644 --- a/packages/db/tests/query/live-query-collection.test.ts +++ b/packages/db/tests/query/live-query-collection.test.ts @@ -2,7 +2,10 @@ import { beforeEach, describe, expect, it } from "vitest" import { createCollection } from "../../src/collection.js" import { createLiveQueryCollection, eq } from "../../src/query/index.js" import { Query } from "../../src/query/builder/index.js" -import { mockSyncCollectionOptions } from "../utls.js" +import { + mockSyncCollectionOptions, + mockSyncCollectionOptionsNoInitialState, +} from "../utls.js" import type { ChangeMessage } from "../../src/types.js" // Sample user type for tests @@ -313,4 +316,92 @@ describe(`createLiveQueryCollection`, () => { // Resubscribe should not throw (would throw "Graph already finalized" without the fix) expect(() => liveQuery.subscribeChanges(() => {})).not.toThrow() }) + + for (const autoIndex of [`eager`, `off`] as const) { + it(`should not send the initial state twice on joins with autoIndex: ${autoIndex}`, async () => { + type Player = { id: number; name: string } + type Challenge = { id: number; value: number } + + const playerCollection = createCollection( + mockSyncCollectionOptionsNoInitialState({ + id: `player`, + getKey: (post) => post.id, + autoIndex, + }) + ) + + const challenge1Collection = createCollection( + mockSyncCollectionOptionsNoInitialState({ + id: `challenge1`, + getKey: (post) => post.id, + autoIndex, + }) + ) + + const challenge2Collection = createCollection( + mockSyncCollectionOptionsNoInitialState({ + id: `challenge2`, + getKey: (post) => post.id, + autoIndex, + }) + ) + + const liveQuery = createLiveQueryCollection((q) => + q + .from({ player: playerCollection }) + .leftJoin( + { challenge1: challenge1Collection }, + ({ player, challenge1 }) => eq(player.id, challenge1.id) + ) + .leftJoin( + { challenge2: challenge2Collection }, + ({ player, challenge2 }) => eq(player.id, challenge2.id) + ) + ) + + // Start the query, but don't wait it, we are doing to write the data to the + // source collections while the query is loading the initial state + const preloadPromise = liveQuery.preload() + + // Write player + playerCollection.utils.begin() + playerCollection.utils.write({ + type: `insert`, + value: { id: 1, name: `Alice` }, + }) + playerCollection.utils.commit() + playerCollection.utils.markReady() + + // Write challenge1 + challenge1Collection.utils.begin() + challenge1Collection.utils.write({ + type: `insert`, + value: { id: 1, value: 100 }, + }) + challenge1Collection.utils.commit() + challenge1Collection.utils.markReady() + + // Write challenge2 + challenge2Collection.utils.begin() + challenge2Collection.utils.write({ + type: `insert`, + value: { id: 1, value: 200 }, + }) + challenge2Collection.utils.commit() + challenge2Collection.utils.markReady() + + await preloadPromise + + // With a failed test the results show more than 1 item + // It returns both an unjoined player with no joined challenges, and a joined + // player with the challenges + const results = liveQuery.toArray + expect(results.length).toBe(1) + + const result = results[0]! + expect(result.player.name).toBe(`Alice`) + expect(result.challenge1?.value).toBe(100) + expect(result.challenge2?.value).toBe(200) + }) + } }) diff --git a/packages/db/tests/utls.ts b/packages/db/tests/utls.ts index 9ab74d3a5..6fb4aae0e 100644 --- a/packages/db/tests/utls.ts +++ b/packages/db/tests/utls.ts @@ -5,13 +5,6 @@ import type { SyncConfig, } from "../src/index.js" -type MockSyncCollectionConfig = { - id: string - initialData: Array - getKey: (item: T) => string | number - autoIndex?: `off` | `eager` -} - // Index usage tracking utilities export interface IndexUsageStats { rangeQueryCalls: number @@ -179,6 +172,13 @@ export function withIndexTracking( } } +type MockSyncCollectionConfig = { + id: string + initialData: Array + getKey: (item: T) => string | number + autoIndex?: `off` | `eager` +} + export function mockSyncCollectionOptions< T extends object = Record, >(config: MockSyncCollectionConfig) { @@ -257,3 +257,80 @@ export function mockSyncCollectionOptions< return options } + +type MockSyncCollectionConfigNoInitialState = { + id: string + getKey: (item: T) => string | number + autoIndex?: `off` | `eager` +} + +export function mockSyncCollectionOptionsNoInitialState< + T extends object = Record, +>(config: MockSyncCollectionConfigNoInitialState) { + let begin: () => void + let write: Parameters[`sync`]>[0][`write`] + let commit: () => void + let markReady: () => void + + let syncPendingPromise: Promise | undefined + let syncPendingResolve: (() => void) | undefined + let syncPendingReject: ((error: Error) => void) | undefined + + const awaitSync = async () => { + if (syncPendingPromise) { + return syncPendingPromise + } + syncPendingPromise = new Promise((resolve, reject) => { + syncPendingResolve = resolve + syncPendingReject = reject + }) + syncPendingPromise.then(() => { + syncPendingPromise = undefined + syncPendingResolve = undefined + syncPendingReject = undefined + }) + return syncPendingPromise + } + + const utils = { + begin: () => begin!(), + write: ((value) => write!(value)) as typeof write, + commit: () => commit!(), + markReady: () => markReady!(), + resolveSync: () => { + syncPendingResolve!() + }, + rejectSync: (error: Error) => { + syncPendingReject!(error) + }, + } + + const options: CollectionConfig & { utils: typeof utils } = { + sync: { + sync: (params: Parameters[`sync`]>[0]) => { + begin = params.begin + write = params.write + commit = params.commit + markReady = params.markReady + }, + }, + startSync: false, + onInsert: async (_params: MutationFnParams) => { + // TODO + await awaitSync() + }, + onUpdate: async (_params: MutationFnParams) => { + // TODO + await awaitSync() + }, + onDelete: async (_params: MutationFnParams) => { + // TODO + await awaitSync() + }, + utils, + ...config, + autoIndex: config.autoIndex, + } + + return options +}