diff --git a/.changeset/fix-optimistic-insert-cleanup.md b/.changeset/fix-optimistic-insert-cleanup.md new file mode 100644 index 000000000..d84e44e0c --- /dev/null +++ b/.changeset/fix-optimistic-insert-cleanup.md @@ -0,0 +1,7 @@ +--- +'@tanstack/db': patch +--- + +fix: clean up optimistic state when server returns a different key than the optimistic insert + +When an `onInsert`/`onUpdate`/`onDelete` handler syncs server data back to the collection (via `writeInsert`, `writeUpdate`, `writeUpsert`, `writeDelete`, or `refetch`), the optimistic state under the original client key is now correctly removed if the server returns a different key. Previously, the client-key item persisted forever alongside the server-key item, causing duplication and stale `$synced: false` state. diff --git a/packages/db/src/collection/state.ts b/packages/db/src/collection/state.ts index af65cb801..f27eabecc 100644 --- a/packages/db/src/collection/state.ts +++ b/packages/db/src/collection/state.ts @@ -83,6 +83,8 @@ export class CollectionStateManager< public pendingOptimisticDeletes = new Set() public pendingOptimisticDirectUpserts = new Set() public pendingOptimisticDirectDeletes = new Set() + public directTransactionsWithSyncWrites = new Set() + public processedCompletedTransactions = new Set() /** * Tracks the origin of confirmed changes for each row. @@ -483,6 +485,20 @@ export class CollectionStateManager< const isDirectTransaction = transaction.metadata[DIRECT_TRANSACTION_METADATA_KEY] === true if (transaction.state === `completed`) { + // Add-once guard: only process direct transactions for pendingOptimisticDirect* + // on their first completion. This prevents re-adding keys that were already + // cleaned up by commitPendingTransactions (e.g., after writeInsert with same key). + const isFirstProcessing = + isDirectTransaction && + !this.processedCompletedTransactions.has(transaction.id) && + !this.directTransactionsWithSyncWrites.has(transaction.id) + if ( + isDirectTransaction && + !this.processedCompletedTransactions.has(transaction.id) + ) { + this.processedCompletedTransactions.add(transaction.id) + } + for (const mutation of transaction.mutations) { if (!this.isThisCollection(mutation.collection)) { continue @@ -499,21 +515,26 @@ export class CollectionStateManager< mutation.modified as TOutput, ) this.pendingOptimisticDeletes.delete(mutation.key) - if (isDirectTransaction) { + if (isFirstProcessing) { + // First time seeing this direct transaction — seed the pending direct set this.pendingOptimisticDirectUpserts.add(mutation.key) this.pendingOptimisticDirectDeletes.delete(mutation.key) - } else { + } else if (!isDirectTransaction) { + // Non-direct completed transaction — clear pending direct state for this key this.pendingOptimisticDirectUpserts.delete(mutation.key) this.pendingOptimisticDirectDeletes.delete(mutation.key) } + // else: direct but already processed or had sync writes — leave + // pendingOptimisticDirect* unchanged to avoid clobbering entries + // from other concurrent direct transactions break case `delete`: this.pendingOptimisticUpserts.delete(mutation.key) this.pendingOptimisticDeletes.add(mutation.key) - if (isDirectTransaction) { + if (isFirstProcessing) { this.pendingOptimisticDirectUpserts.delete(mutation.key) this.pendingOptimisticDirectDeletes.add(mutation.key) - } else { + } else if (!isDirectTransaction) { this.pendingOptimisticDirectUpserts.delete(mutation.key) this.pendingOptimisticDirectDeletes.delete(mutation.key) } @@ -854,6 +875,22 @@ export class CollectionStateManager< // non-immediate transactions would be applied later and could overwrite newer state. // Processing all committed transactions together preserves causal ordering. if (!hasPersistingTransaction || hasTruncateSync || hasImmediateSync) { + // Track which direct transactions had sync writes committed during their handler. + // When an immediate sync (from writeInsert/writeUpdate/writeDelete) is processed, + // mark all persisting direct transactions. This prevents recomputeOptimisticState + // from adding their mutation keys to pendingOptimisticDirectUpserts (via the + // isFirstProcessing guard), since the sync already confirmed the data. + if (hasImmediateSync) { + for (const tx of this.transactions.values()) { + if ( + tx.state === `persisting` && + tx.metadata[DIRECT_TRANSACTION_METADATA_KEY] === true + ) { + this.directTransactionsWithSyncWrites.add(tx.id) + } + } + } + // Set flag to prevent redundant optimistic state recalculations this.isCommittingSyncTransactions = true @@ -1288,6 +1325,58 @@ export class CollectionStateManager< this.recentlySyncedKeys.clear() }) + // Clean up orphaned pendingOptimisticDirect entries after sync processing. + // A key is orphaned when it belongs to a completed direct transaction + // (handler has run) but the key is not in syncedData (sync confirmed it under + // a different key). This handles the refetch-with-different-key case where the + // handler called refetch() and the server returned the item under a new key. + if (committedSyncedTransactions.length > 0) { + for (const key of [...this.pendingOptimisticDirectUpserts]) { + if (!this.syncedData.has(key)) { + // Check if this key belongs to a completed direct transaction + let belongsToCompletedDirect = false + for (const tx of this.transactions.values()) { + if ( + tx.state === `completed` && + tx.metadata[DIRECT_TRANSACTION_METADATA_KEY] === true + ) { + for (const m of tx.mutations) { + if (this.isThisCollection(m.collection) && m.key === key) { + belongsToCompletedDirect = true + break + } + } + } + if (belongsToCompletedDirect) break + } + if (belongsToCompletedDirect) { + this.pendingOptimisticDirectUpserts.delete(key) + } + } + } + for (const key of [...this.pendingOptimisticDirectDeletes]) { + if (this.syncedData.has(key)) continue + let belongsToCompletedDirect = false + for (const tx of this.transactions.values()) { + if ( + tx.state === `completed` && + tx.metadata[DIRECT_TRANSACTION_METADATA_KEY] === true + ) { + for (const m of tx.mutations) { + if (this.isThisCollection(m.collection) && m.key === key) { + belongsToCompletedDirect = true + break + } + } + } + if (belongsToCompletedDirect) break + } + if (belongsToCompletedDirect) { + this.pendingOptimisticDirectDeletes.delete(key) + } + } + } + // Mark that we've received the first commit (for tracking purposes) if (!this.hasReceivedFirstCommit) { this.hasReceivedFirstCommit = true @@ -1308,13 +1397,28 @@ export class CollectionStateManager< // Schedule cleanup when the transaction completes transaction.isPersisted.promise .then(() => { - // Transaction completed successfully, remove it immediately + // Process any queued sync transactions BEFORE deleting the transaction. + // This ordering is critical: the orphan cleanup inside commitPendingTransactions + // needs to find this transaction (state=completed) in this.transactions to + // identify orphaned keys from the refetch-with-different-key case. + if (this.pendingSyncedTransactions.length > 0) { + this.commitPendingTransactions() + } + + // Now remove the transaction and its tracking entries this.transactions.delete(transaction.id) + this.directTransactionsWithSyncWrites.delete(transaction.id) + this.processedCompletedTransactions.delete(transaction.id) + + // Recompute to pick up any orphan cleanup done by commitPendingTransactions + // during touchCollection (before this cleanup ran) or above. + this.recomputeOptimisticState(false) }) .catch(() => { - // Transaction failed, but we want to keep failed transactions for reference - // so don't remove it. + // Transaction failed — clean up tracking state. // Rollback already triggers state recomputation via touchCollection(). + this.directTransactionsWithSyncWrites.delete(transaction.id) + this.processedCompletedTransactions.delete(transaction.id) }) } @@ -1380,6 +1484,8 @@ export class CollectionStateManager< this.pendingOptimisticDeletes.clear() this.pendingOptimisticDirectUpserts.clear() this.pendingOptimisticDirectDeletes.clear() + this.directTransactionsWithSyncWrites.clear() + this.processedCompletedTransactions.clear() this.clearOriginTrackingState() this.isLocalOnly = false this.size = 0 diff --git a/packages/query-db-collection/tests/query.test.ts b/packages/query-db-collection/tests/query.test.ts index 113fc0e1e..d4fad6ba6 100644 --- a/packages/query-db-collection/tests/query.test.ts +++ b/packages/query-db-collection/tests/query.test.ts @@ -6342,4 +6342,340 @@ describe(`QueryCollection`, () => { customQueryClient.clear() }) }) + + it(`should not duplicate items when writeInsert uses a different key than the optimistic insert`, async () => { + // Client inserts with a temp ID, server returns a different ID, + // writeInsert writes under the server ID. The optimistic insert + // under the client ID must be removed when the handler completes. + type Todo = { id: number; text: string; completed: boolean } + + let nextServerId = 100 + const serverTodos: Array = [] + const queryFn = vi.fn().mockImplementation(() => [...serverTodos]) + + const testQueryClient = new QueryClient({ + defaultOptions: { + queries: { gcTime: 5 * 60 * 1000, staleTime: 0, retry: false }, + }, + }) + + const collection = createCollection( + queryCollectionOptions({ + id: `writeInsert-different-key`, + queryKey: [`writeInsert-different-key`], + queryFn, + queryClient: testQueryClient, + getKey: (todo) => todo.id, + startSync: true, + onInsert: async ({ transaction }) => { + const items = transaction.mutations.map((m) => m.modified) + await new Promise((r) => setTimeout(r, 10)) + const saved = items.map((t) => ({ ...t, id: nextServerId++ })) + serverTodos.push(...saved) + collection.utils.writeInsert(saved) + return { refetch: false } + }, + }), + ) + + const liveQuery = createLiveQueryCollection({ + query: (q) => q.from({ todos: collection }), + }) + await liveQuery.preload() + await vi.waitFor(() => expect(collection.status).toBe(`ready`)) + + const clientId = -999 + const tx = collection.insert({ + id: clientId, + text: `Buy milk`, + completed: false, + }) + expect(collection.has(clientId)).toBe(true) + + await tx.isPersisted.promise + await flushPromises() + await new Promise((r) => setTimeout(r, 200)) + await flushPromises() + + expect(collection._state.syncedData.has(100)).toBe(true) + expect(collection.has(clientId)).toBe(false) + expect(collection._state.pendingOptimisticDirectUpserts.has(clientId)).toBe( + false, + ) + expect(collection.size).toBe(1) + + await liveQuery.cleanup() + testQueryClient.clear() + }) + + it(`should mark item as synced when writeInsert uses the same key as the optimistic insert`, async () => { + // Client and server use the same ID, but writeInsert adds + // server-computed fields. After the handler completes the item + // should be $synced: true and the server data should be visible. + type Todo = { + id: number + text: string + completed: boolean + createdAt?: string + } + + const serverTodos: Array = [] + const queryFn = vi.fn().mockImplementation(() => [...serverTodos]) + + const testQueryClient = new QueryClient({ + defaultOptions: { + queries: { gcTime: 5 * 60 * 1000, staleTime: 0, retry: false }, + }, + }) + + const collection = createCollection( + queryCollectionOptions({ + id: `writeInsert-same-key`, + queryKey: [`writeInsert-same-key`], + queryFn, + queryClient: testQueryClient, + getKey: (todo) => todo.id, + startSync: true, + onInsert: async ({ transaction }) => { + const items = transaction.mutations.map((m) => m.modified) + await new Promise((r) => setTimeout(r, 10)) + const saved = items.map((t) => ({ + ...t, + createdAt: `2024-01-01T00:00:00Z`, + })) + serverTodos.push(...saved) + collection.utils.writeInsert(saved) + return { refetch: false } + }, + }), + ) + + const liveQuery = createLiveQueryCollection({ + query: (q) => q.from({ todos: collection }), + }) + await liveQuery.preload() + await vi.waitFor(() => expect(collection.status).toBe(`ready`)) + + const tx = collection.insert({ id: 1, text: `Buy milk`, completed: false }) + expect(collection.has(1)).toBe(true) + + await tx.isPersisted.promise + await flushPromises() + await new Promise((r) => setTimeout(r, 200)) + await flushPromises() + + expect(collection._state.syncedData.has(1)).toBe(true) + expect(collection._state.syncedData.get(1)?.createdAt).toBe( + `2024-01-01T00:00:00Z`, + ) + expect(collection._state.optimisticUpserts.has(1)).toBe(false) + expect(collection._state.pendingOptimisticDirectUpserts.has(1)).toBe(false) + expect(collection.size).toBe(1) + + await liveQuery.cleanup() + testQueryClient.clear() + }) + + it(`should not duplicate items when refetch returns a different key than the optimistic insert`, async () => { + // Client inserts with a temp ID, handler calls refetch(), + // server returns the item under a different ID. + // The optimistic insert under the client key must be removed. + type Todo = { id: number; text: string; completed: boolean } + + let nextServerId = 500 + const serverTodos: Array = [] + const queryFn = vi.fn().mockImplementation(() => [...serverTodos]) + + const testQueryClient = new QueryClient({ + defaultOptions: { + queries: { gcTime: 5 * 60 * 1000, staleTime: Infinity, retry: false }, + }, + }) + + const collection = createCollection( + queryCollectionOptions({ + id: `refetch-different-key`, + queryKey: [`refetch-different-key`], + queryFn, + queryClient: testQueryClient, + getKey: (todo) => todo.id, + startSync: true, + onInsert: async ({ transaction, collection: col }) => { + const items = transaction.mutations.map((m) => m.modified) + await new Promise((r) => setTimeout(r, 10)) + const saved = items.map((t) => ({ ...t, id: nextServerId++ })) + serverTodos.push(...saved) + await col.utils.refetch() + }, + }), + ) + + const liveQuery = createLiveQueryCollection({ + query: (q) => q.from({ todos: collection }), + }) + await liveQuery.preload() + await vi.waitFor(() => expect(collection.status).toBe(`ready`)) + + const clientId = -777 + const tx = collection.insert({ + id: clientId, + text: `Walk dog`, + completed: false, + }) + expect(collection.has(clientId)).toBe(true) + + await tx.isPersisted.promise + await flushPromises() + await new Promise((r) => setTimeout(r, 100)) + await flushPromises() + + expect(collection._state.syncedData.has(500)).toBe(true) + expect(collection.has(clientId)).toBe(false) + expect(collection._state.pendingOptimisticDirectUpserts.has(clientId)).toBe( + false, + ) + expect(collection.size).toBe(1) + + await liveQuery.cleanup() + testQueryClient.clear() + }) + + it(`should clean up optimistic state when writeUpdate is called in onUpdate handler`, async () => { + // When an onUpdate handler calls writeUpdate to sync the server response, + // the optimistic update should be removed and $synced should become true. + type Todo = { + id: number + text: string + completed: boolean + updatedAt?: string + } + + const serverTodos: Array = [ + { id: 1, text: `Buy milk`, completed: false }, + ] + const queryFn = vi.fn().mockImplementation(() => [...serverTodos]) + + const testQueryClient = new QueryClient({ + defaultOptions: { + queries: { gcTime: 5 * 60 * 1000, staleTime: 0, retry: false }, + }, + }) + + const collection = createCollection( + queryCollectionOptions({ + id: `writeUpdate-same-key`, + queryKey: [`writeUpdate-same-key`], + queryFn, + queryClient: testQueryClient, + getKey: (todo) => todo.id, + startSync: true, + onUpdate: async ({ transaction }) => { + const items = transaction.mutations.map((m) => m.modified) + await new Promise((r) => setTimeout(r, 10)) + const saved = items.map((t) => ({ + ...t, + updatedAt: `2024-06-01T00:00:00Z`, + })) + for (const s of saved) { + const idx = serverTodos.findIndex((t) => t.id === s.id) + if (idx >= 0) serverTodos[idx] = s as Todo + } + collection.utils.writeUpdate(saved) + return { refetch: false } + }, + }), + ) + + const liveQuery = createLiveQueryCollection({ + query: (q) => q.from({ todos: collection }), + }) + await liveQuery.preload() + await vi.waitFor(() => expect(collection.status).toBe(`ready`)) + expect(collection._state.syncedData.has(1)).toBe(true) + + const tx = collection.update(1, (draft) => { + draft.completed = true + }) + + await tx.isPersisted.promise + await flushPromises() + await new Promise((r) => setTimeout(r, 200)) + await flushPromises() + + expect(collection._state.syncedData.get(1)?.updatedAt).toBe( + `2024-06-01T00:00:00Z`, + ) + expect(collection._state.optimisticUpserts.has(1)).toBe(false) + expect(collection._state.pendingOptimisticDirectUpserts.has(1)).toBe(false) + expect(collection.size).toBe(1) + + await liveQuery.cleanup() + testQueryClient.clear() + }) + + it(`should not duplicate items when writeBatch uses different keys than the optimistic inserts`, async () => { + // When an onInsert handler uses writeBatch to insert multiple items under + // server-assigned IDs, all optimistic items under client IDs must be removed. + type Todo = { id: number; text: string; completed: boolean } + + let nextServerId = 200 + const serverTodos: Array = [] + const queryFn = vi.fn().mockImplementation(() => [...serverTodos]) + + const testQueryClient = new QueryClient({ + defaultOptions: { + queries: { gcTime: 5 * 60 * 1000, staleTime: 0, retry: false }, + }, + }) + + const collection = createCollection( + queryCollectionOptions({ + id: `writeBatch-different-keys`, + queryKey: [`writeBatch-different-keys`], + queryFn, + queryClient: testQueryClient, + getKey: (todo) => todo.id, + startSync: true, + onInsert: async ({ transaction }) => { + const items = transaction.mutations.map((m) => m.modified) + await new Promise((r) => setTimeout(r, 10)) + const saved = items.map((t) => ({ ...t, id: nextServerId++ })) + serverTodos.push(...saved) + collection.utils.writeBatch(() => { + collection.utils.writeInsert(saved) + }) + return { refetch: false } + }, + }), + ) + + const liveQuery = createLiveQueryCollection({ + query: (q) => q.from({ todos: collection }), + }) + await liveQuery.preload() + await vi.waitFor(() => expect(collection.status).toBe(`ready`)) + + const clientId = -888 + const tx = collection.insert({ + id: clientId, + text: `Test batch`, + completed: false, + }) + expect(collection.has(clientId)).toBe(true) + + await tx.isPersisted.promise + await flushPromises() + await new Promise((r) => setTimeout(r, 200)) + await flushPromises() + + expect(collection._state.syncedData.has(200)).toBe(true) + expect(collection.has(clientId)).toBe(false) + expect(collection._state.pendingOptimisticDirectUpserts.has(clientId)).toBe( + false, + ) + expect(collection.size).toBe(1) + + await liveQuery.cleanup() + testQueryClient.clear() + }) })