diff --git a/.changeset/brave-pens-sleep.md b/.changeset/brave-pens-sleep.md new file mode 100644 index 000000000..2c54e114a --- /dev/null +++ b/.changeset/brave-pens-sleep.md @@ -0,0 +1,13 @@ +--- +'@tanstack/db': patch +--- + +fix: prevent duplicate inserts from reaching D2 pipeline in live queries + +Added defensive measures to prevent duplicate INSERT events from reaching the D2 (differential dataflow) pipeline, which could cause items to not disappear when deleted (due to multiplicity going from 2 to 1 instead of 1 to 0). + +Changes: + +- Added `sentToD2Keys` tracking in `CollectionSubscriber` to filter duplicate inserts at the D2 pipeline entry point +- Fixed `includeInitialState` handling to only pass when `true`, preventing internal lazy-loading subscriptions from incorrectly disabling filtering +- Clear `sentToD2Keys` on truncate to allow re-inserts after collection reset diff --git a/examples/react/paced-mutations-demo/package.json b/examples/react/paced-mutations-demo/package.json index 1bace882e..49f22ec47 100644 --- a/examples/react/paced-mutations-demo/package.json +++ b/examples/react/paced-mutations-demo/package.json @@ -10,7 +10,7 @@ }, "dependencies": { "@tanstack/db": "^0.5.11", - "@tanstack/react-db": "^0.1.56", + "@tanstack/react-db": "^0.1.58", "mitt": "^3.0.1", "react": "^19.2.1", "react-dom": "^19.2.1" diff --git a/examples/react/todo/package.json b/examples/react/todo/package.json index 4f738205f..17a84a8bf 100644 --- a/examples/react/todo/package.json +++ b/examples/react/todo/package.json @@ -5,8 +5,8 @@ "dependencies": { "@tanstack/electric-db-collection": "^0.2.12", "@tanstack/query-core": "^5.90.12", - "@tanstack/query-db-collection": "^1.0.8", - "@tanstack/react-db": "^0.1.56", + "@tanstack/query-db-collection": "^1.0.10", + "@tanstack/react-db": "^0.1.58", "@tanstack/react-router": "^1.140.0", "@tanstack/react-start": "^1.140.0", "@tanstack/trailbase-db-collection": "^0.1.55", diff --git a/examples/solid/todo/package.json b/examples/solid/todo/package.json index 7301e813a..91c3bce55 100644 --- a/examples/solid/todo/package.json +++ b/examples/solid/todo/package.json @@ -5,7 +5,7 @@ "dependencies": { "@tanstack/electric-db-collection": "^0.2.12", "@tanstack/query-core": "^5.90.12", - "@tanstack/query-db-collection": "^1.0.8", + "@tanstack/query-db-collection": "^1.0.10", "@tanstack/solid-db": "^0.1.54", "@tanstack/solid-router": "^1.140.0", "@tanstack/solid-start": "^1.140.0", diff --git a/packages/db/src/query/live/collection-subscriber.ts b/packages/db/src/query/live/collection-subscriber.ts index f368562cf..303c833fc 100644 --- a/packages/db/src/query/live/collection-subscriber.ts +++ b/packages/db/src/query/live/collection-subscriber.ts @@ -29,6 +29,11 @@ export class CollectionSubscriber< { resolve: () => void } >() + // Track keys that have been sent to the D2 pipeline to prevent duplicate inserts + // This is necessary because different code paths (initial load, change events) + // can potentially send the same item to D2 multiple times. + private sentToD2Keys = new Set() + constructor( private alias: string, private collectionId: string, @@ -129,13 +134,33 @@ export class CollectionSubscriber< changes: Iterable>, callback?: () => boolean, ) { + // Filter changes to prevent duplicate inserts to D2 pipeline. + // This ensures D2 multiplicity stays at 1 for visible items, so deletes + // properly reduce multiplicity to 0 (triggering DELETE output). + const changesArray = Array.isArray(changes) ? changes : [...changes] + const filteredChanges: Array> = [] + for (const change of changesArray) { + if (change.type === `insert`) { + if (this.sentToD2Keys.has(change.key)) { + // Skip duplicate insert - already sent to D2 + continue + } + this.sentToD2Keys.add(change.key) + } else if (change.type === `delete`) { + // Remove from tracking so future re-inserts are allowed + this.sentToD2Keys.delete(change.key) + } + // Updates are handled as delete+insert by splitUpdates, so no special handling needed + filteredChanges.push(change) + } + // currentSyncState and input are always defined when this method is called // (only called from active subscriptions during a sync session) const input = this.collectionConfigBuilder.currentSyncState!.inputs[this.alias]! const sentChanges = sendChangesToInput( input, - changes, + filteredChanges, this.collection.config.getKey, ) @@ -162,8 +187,13 @@ export class CollectionSubscriber< this.sendChangesToPipeline(changes) } + // Only pass includeInitialState when true. When it's false, we leave it + // undefined so that user subscriptions with explicit `includeInitialState: false` + // can be distinguished from internal lazy-loading subscriptions. + // If we pass `false`, changes.ts would call markAllStateAsSeen() which + // disables filtering - but internal subscriptions still need filtering. const subscription = this.collection.subscribeChanges(sendChanges, { - includeInitialState, + ...(includeInitialState && { includeInitialState }), whereExpression, }) @@ -190,10 +220,12 @@ export class CollectionSubscriber< whereExpression, }) - // Listen for truncate events to reset cursor tracking state + // Listen for truncate events to reset cursor tracking state and sentToD2Keys // This ensures that after a must-refetch/truncate, we don't use stale cursor data + // and allow re-inserts of previously sent keys const truncateUnsubscribe = this.collection.on(`truncate`, () => { this.biggest = undefined + this.sentToD2Keys.clear() }) // Clean up truncate listener when subscription is unsubscribed diff --git a/packages/db/tests/collection-subscriber-duplicate-inserts.test.ts b/packages/db/tests/collection-subscriber-duplicate-inserts.test.ts new file mode 100644 index 000000000..5657eb206 --- /dev/null +++ b/packages/db/tests/collection-subscriber-duplicate-inserts.test.ts @@ -0,0 +1,452 @@ +import { describe, expect, it } from 'vitest' +import { createCollection } from '../src/collection/index.js' +import { createLiveQueryCollection, eq } from '../src/query/index.js' +import { mockSyncCollectionOptions } from './utils.js' +import type { ChangeMessage } from '../src/types.js' + +/** + * Tests for duplicate insert prevention in the D2 pipeline. + * + * The issue: When using live queries with orderBy + limit, the D2 pipeline + * uses multiplicity tracking. Each insert adds 1 to multiplicity, each delete + * subtracts 1. For an item to disappear, multiplicity must go from 1 to 0. + * + * If duplicate inserts reach D2, multiplicity becomes > 1, and deletes won't + * properly remove items (multiplicity goes from 2 to 1, not triggering removal). + * + * The fix: CollectionSubscriber tracks keys sent to D2 (sentToD2Keys) and + * filters out duplicate inserts before they reach the pipeline. + * + * Additionally, for JOIN queries with lazy sources: + * - The includeInitialState fix ensures internal lazy-loading subscriptions + * don't trigger markAllStateAsSeen() which would disable filtering. + */ + +type TestItem = { + id: string + value: number +} + +type User = { + id: string + name: string +} + +type Order = { + id: string + userId: string + amount: number +} + +describe(`CollectionSubscriber duplicate insert prevention`, () => { + it(`should properly delete items from live query with orderBy + limit`, async () => { + // This test verifies that items can be properly deleted from a live query + // with orderBy + limit. If duplicate inserts reach D2, the delete won't work. + + const initialData: Array = [ + { id: `1`, value: 100 }, + { id: `2`, value: 90 }, + { id: `3`, value: 80 }, + ] + + const sourceCollection = createCollection( + mockSyncCollectionOptions({ + id: `duplicate-d2-source`, + getKey: (item: TestItem) => item.id, + initialData, + }), + ) + + await sourceCollection.preload() + + // Create live query with orderBy + limit (uses TopKWithFractionalIndexOperator) + const liveQueryCollection = createLiveQueryCollection((q) => + q + .from({ items: sourceCollection }) + .orderBy(({ items }) => items.value, `desc`) + .limit(2) + .select(({ items }) => ({ + id: items.id, + value: items.value, + })), + ) + + await liveQueryCollection.preload() + + // Verify initial results + const initialResults = Array.from(liveQueryCollection.values()) + expect(initialResults).toHaveLength(2) + expect(initialResults.map((r) => r.id)).toEqual([`1`, `2`]) + + // Subscribe to changes to verify events + const allChanges: Array> = [] + const subscription = liveQueryCollection.subscribeChanges( + (changes) => { + allChanges.push(...changes) + }, + { includeInitialState: true }, + ) + + // Wait for initial state + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Clear changes + allChanges.length = 0 + + // Delete item 2 (which is in the visible set) + sourceCollection.delete(`2`) + + // Wait for delete to propagate + await new Promise((resolve) => setTimeout(resolve, 50)) + + // Verify delete event was emitted + const deleteEvents = allChanges.filter((c) => c.type === `delete`) + expect( + deleteEvents.some((e) => e.key === `2`), + `Expected delete event for key 2, but got: ${JSON.stringify(allChanges.map((c) => ({ type: c.type, key: c.key })))}`, + ).toBe(true) + + // Verify item 3 moved into the visible set + const insertEvents = allChanges.filter((c) => c.type === `insert`) + expect( + insertEvents.some((e) => e.key === `3`), + `Expected insert event for key 3, but got: ${JSON.stringify(allChanges.map((c) => ({ type: c.type, key: c.key })))}`, + ).toBe(true) + + // Verify final state + const finalResults = Array.from(liveQueryCollection.values()) + expect(finalResults).toHaveLength(2) + expect(finalResults.map((r) => r.id)).toEqual([`1`, `3`]) + + subscription.unsubscribe() + }) + + it(`should not emit duplicate inserts to live query subscribers`, async () => { + // This test checks that live query subscribers don't receive duplicate inserts + // which would indicate D2 multiplicity issues + + const initialData: Array = [ + { id: `1`, value: 100 }, + { id: `2`, value: 90 }, + ] + + const sourceCollection = createCollection( + mockSyncCollectionOptions({ + id: `duplicate-d2-count`, + getKey: (item: TestItem) => item.id, + initialData, + }), + ) + + await sourceCollection.preload() + + const liveQueryCollection = createLiveQueryCollection((q) => + q + .from({ items: sourceCollection }) + .orderBy(({ items }) => items.value, `desc`) + .limit(2) + .select(({ items }) => ({ + id: items.id, + value: items.value, + })), + ) + + await liveQueryCollection.preload() + + const allChanges: Array> = [] + const subscription = liveQueryCollection.subscribeChanges( + (changes) => { + allChanges.push(...changes) + }, + { includeInitialState: true }, + ) + + // Wait for initial state + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Count inserts per key + const insertCounts = new Map() + for (const change of allChanges) { + if (change.type === `insert`) { + insertCounts.set( + change.key as string, + (insertCounts.get(change.key as string) || 0) + 1, + ) + } + } + + // Each key should only have ONE insert + for (const [key, count] of insertCounts) { + expect( + count, + `Key ${key} should only have 1 insert after initial state, got ${count}. ` + + `This indicates duplicate inserts are reaching D2.`, + ).toBe(1) + } + + subscription.unsubscribe() + }) + + it(`should handle rapid updates without duplicate inserts`, async () => { + // This test simulates rapid updates that could cause race conditions + // leading to duplicate inserts + + const initialData: Array = [ + { id: `1`, value: 100 }, + { id: `2`, value: 90 }, + ] + + const sourceCollection = createCollection( + mockSyncCollectionOptions({ + id: `duplicate-d2-rapid`, + getKey: (item: TestItem) => item.id, + initialData, + }), + ) + + await sourceCollection.preload() + + const liveQueryCollection = createLiveQueryCollection((q) => + q + .from({ items: sourceCollection }) + .orderBy(({ items }) => items.value, `desc`) + .limit(2) + .select(({ items }) => ({ + id: items.id, + value: items.value, + })), + ) + + await liveQueryCollection.preload() + + const allChanges: Array> = [] + const subscription = liveQueryCollection.subscribeChanges( + (changes) => { + allChanges.push(...changes) + }, + { includeInitialState: true }, + ) + + // Wait for initial state + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Clear changes + allChanges.length = 0 + + // Rapid updates to simulate potential race conditions + for (let i = 0; i < 5; i++) { + sourceCollection.update(`1`, (draft) => { + draft.value = 100 + i + }) + } + + // Wait for updates to propagate + await new Promise((resolve) => setTimeout(resolve, 50)) + + // Should have update events, not duplicate inserts + const insertCounts = new Map() + for (const change of allChanges) { + if (change.type === `insert`) { + insertCounts.set( + change.key as string, + (insertCounts.get(change.key as string) || 0) + 1, + ) + } + } + + // No duplicate inserts should occur during updates + for (const [key, count] of insertCounts) { + expect( + count, + `Key ${key} should not have duplicate inserts during updates, got ${count}.`, + ).toBeLessThanOrEqual(1) + } + + subscription.unsubscribe() + }) + + it(`should handle mutation during live query subscription setup`, async () => { + // This test checks for race conditions where a mutation occurs + // during live query subscription initialization + + const initialData: Array = [ + { id: `1`, value: 100 }, + { id: `2`, value: 90 }, + ] + + const sourceCollection = createCollection( + mockSyncCollectionOptions({ + id: `duplicate-d2-mutation-during-setup`, + getKey: (item: TestItem) => item.id, + initialData, + }), + ) + + await sourceCollection.preload() + + const liveQueryCollection = createLiveQueryCollection((q) => + q + .from({ items: sourceCollection }) + .orderBy(({ items }) => items.value, `desc`) + .limit(2) + .select(({ items }) => ({ + id: items.id, + value: items.value, + })), + ) + + await liveQueryCollection.preload() + + const allChanges: Array> = [] + + // Subscribe and immediately mutate the source + const subscription = liveQueryCollection.subscribeChanges( + (changes) => { + allChanges.push(...changes) + + // Trigger mutation during callback (similar to race condition test) + const firstInsert = changes.find( + (c) => c.type === `insert` && c.key === `1`, + ) + if (firstInsert) { + sourceCollection.update(`1`, (draft) => { + draft.value = 101 + }) + } + }, + { includeInitialState: true }, + ) + + // Wait for everything to settle + await new Promise((resolve) => setTimeout(resolve, 50)) + + // Count inserts per key + const insertCounts = new Map() + for (const change of allChanges) { + if (change.type === `insert`) { + insertCounts.set( + change.key as string, + (insertCounts.get(change.key as string) || 0) + 1, + ) + } + } + + console.log( + `Insert counts:`, + Object.fromEntries(insertCounts), + `All changes:`, + allChanges.map((c) => ({ type: c.type, key: c.key })), + ) + + // Each key should only have ONE insert initially + // (updates after initial insert are ok, but not duplicate inserts) + for (const [key, count] of insertCounts) { + expect( + count, + `Key ${key} should only have 1 insert, got ${count}. ` + + `Duplicate inserts indicate D2 multiplicity issues.`, + ).toBe(1) + } + + // Verify we can still delete items (multiplicity is correct) + allChanges.length = 0 + sourceCollection.delete(`2`) + + await new Promise((resolve) => setTimeout(resolve, 50)) + + const deleteEvents = allChanges.filter((c) => c.type === `delete`) + expect(deleteEvents.some((e) => e.key === `2`)).toBe(true) + + subscription.unsubscribe() + }) + + it(`should properly handle deletes in join queries with lazy sources`, async () => { + // This test verifies that items can be properly deleted from a live query + // with a join where one collection is loaded lazily. + // The bug: includeInitialState: false was being passed to lazy sources, + // which triggered markAllStateAsSeen(), disabling filtering. + + const users: Array = [ + { id: `u1`, name: `Alice` }, + { id: `u2`, name: `Bob` }, + ] + + const orders: Array = [ + { id: `o1`, userId: `u1`, amount: 100 }, + { id: `o2`, userId: `u1`, amount: 200 }, + { id: `o3`, userId: `u2`, amount: 150 }, + ] + + const usersCollection = createCollection( + mockSyncCollectionOptions({ + id: `join-lazy-users`, + getKey: (item: User) => item.id, + initialData: users, + }), + ) + + const ordersCollection = createCollection( + mockSyncCollectionOptions({ + id: `join-lazy-orders`, + getKey: (item: Order) => item.id, + initialData: orders, + }), + ) + + await Promise.all([usersCollection.preload(), ordersCollection.preload()]) + + // Create a join query - the orders collection should be lazy loaded + const liveQueryCollection = createLiveQueryCollection((q) => + q + .from({ users: usersCollection }) + .join({ orders: ordersCollection }, ({ users, orders }) => + eq(users.id, orders.userId), + ) + .select(({ users, orders }) => ({ + orderId: orders!.id, + userName: users.name, + amount: orders!.amount, + })), + ) + + await liveQueryCollection.preload() + + // Verify initial results + const initialResults = Array.from(liveQueryCollection.values()) + expect(initialResults).toHaveLength(3) + + // Subscribe to changes + const allChanges: Array> = [] + const subscription = liveQueryCollection.subscribeChanges( + (changes) => { + allChanges.push(...changes) + }, + { includeInitialState: true }, + ) + + // Wait for initial state + await new Promise((resolve) => setTimeout(resolve, 50)) + + // Clear changes + allChanges.length = 0 + + // Delete an order + ordersCollection.delete(`o1`) + + // Wait for delete to propagate + await new Promise((resolve) => setTimeout(resolve, 50)) + + // Verify delete event was emitted + const deleteEvents = allChanges.filter((c) => c.type === `delete`) + expect( + deleteEvents.length, + `Expected at least 1 delete event, but got: ${JSON.stringify(allChanges.map((c) => ({ type: c.type, key: c.key })))}`, + ).toBeGreaterThan(0) + + // Verify final state + const finalResults = Array.from(liveQueryCollection.values()) + expect(finalResults).toHaveLength(2) + + subscription.unsubscribe() + }) +}) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index e905bf92f..2e2148566 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -184,6 +184,8 @@ importers: specifier: ^5.9.2 version: 5.9.3 + examples/react/linearlarge: {} + examples/react/offline-transactions: dependencies: '@tanstack/offline-transactions': @@ -257,7 +259,7 @@ importers: specifier: ^0.5.11 version: link:../../../packages/db '@tanstack/react-db': - specifier: ^0.1.56 + specifier: ^0.1.58 version: link:../../../packages/react-db mitt: specifier: ^3.0.1 @@ -433,10 +435,10 @@ importers: specifier: ^5.90.12 version: 5.90.12 '@tanstack/query-db-collection': - specifier: ^1.0.8 + specifier: ^1.0.10 version: link:../../../packages/query-db-collection '@tanstack/react-db': - specifier: ^0.1.56 + specifier: ^0.1.58 version: link:../../../packages/react-db '@tanstack/react-router': specifier: ^1.140.0 @@ -554,7 +556,7 @@ importers: specifier: ^5.90.12 version: 5.90.12 '@tanstack/query-db-collection': - specifier: ^1.0.8 + specifier: ^1.0.10 version: link:../../../packages/query-db-collection '@tanstack/solid-db': specifier: ^0.1.54 @@ -682,6 +684,8 @@ importers: specifier: ^0.16.0 version: 0.16.0 + packages/benchmarks: {} + packages/db: dependencies: '@standard-schema/spec': @@ -747,6 +751,8 @@ importers: specifier: ^3.2.4 version: 3.2.4(@types/debug@4.1.12)(@types/node@24.7.0)(@vitest/ui@3.2.4)(jiti@2.6.1)(jsdom@27.2.0(postcss@8.5.6))(lightningcss@1.30.2)(sass@1.90.0)(terser@5.44.0)(tsx@4.21.0)(yaml@2.8.1) + packages/db-devtools: {} + packages/db-ivm: dependencies: fractional-indexing: @@ -766,6 +772,8 @@ importers: specifier: ^3.2.4 version: 3.2.4(vitest@3.2.4) + packages/db-tracing: {} + packages/electric-db-collection: dependencies: '@electric-sql/client': @@ -900,6 +908,8 @@ importers: specifier: ^19.2.1 version: 19.2.1(react@19.2.1) + packages/rss-db-collection: {} + packages/rxdb-db-collection: dependencies: '@standard-schema/spec': @@ -931,6 +941,8 @@ importers: specifier: ^3.2.4 version: 3.2.4(vitest@3.2.4) + packages/shared-types: {} + packages/solid-db: dependencies: '@solid-primitives/map':