From 0083024a356448c98456880bf6d504032a5c6553 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Fri, 15 Aug 2025 16:43:25 +0100 Subject: [PATCH 1/7] add support for electric must-refetch --- .changeset/db-sync-methods.md | 5 ++ .changeset/electric-must-refetch.md | 5 ++ packages/db/src/collection.ts | 32 +++++++++++- packages/db/src/types.ts | 2 + .../electric-db-collection/src/electric.ts | 20 +++++++- .../tests/electric.test.ts | 51 +++++++++++++++++++ 6 files changed, 113 insertions(+), 2 deletions(-) create mode 100644 .changeset/db-sync-methods.md create mode 100644 .changeset/electric-must-refetch.md diff --git a/.changeset/db-sync-methods.md b/.changeset/db-sync-methods.md new file mode 100644 index 000000000..cb678027a --- /dev/null +++ b/.changeset/db-sync-methods.md @@ -0,0 +1,5 @@ +--- +"@tanstack/db": patch +--- + +Add setLoading and clearSyncedState methods to sync handler to enable a collections state to be reset to loading state and cleared. diff --git a/.changeset/electric-must-refetch.md b/.changeset/electric-must-refetch.md new file mode 100644 index 000000000..c8be84a4e --- /dev/null +++ b/.changeset/electric-must-refetch.md @@ -0,0 +1,5 @@ +--- +"@tanstack/electric-db-collection": patch +--- + +Add must-refetch message handling to clear synced data and reset collection to loading state. diff --git a/packages/db/src/collection.ts b/packages/db/src/collection.ts index eea7905cd..2cd46dc45 100644 --- a/packages/db/src/collection.ts +++ b/packages/db/src/collection.ts @@ -401,6 +401,30 @@ export class CollectionImpl< } } + /** + * Set the collection back to loading state + * This is called by sync implementations when they need to reset the collection state + * (e.g., when receiving a must-refetch message from Electric) + * @private - Should only be called by sync implementations + */ + private setLoading(): void { + // Can transition to loading from ready or initialCommit states + if (this._status === `ready` || this._status === `initialCommit`) { + this.setStatus(`loading`) + } + } + + /** + * Clear the synced data and metadata + * This is called by sync implementations when they need to clear the collection state + * (e.g., when receiving a must-refetch message from Electric) + * @private - Should only be called by sync implementations + */ + private clearSyncedState(): void { + this.syncedData.clear() + this.syncedMetadata.clear() + } + public id = `` /** @@ -444,7 +468,7 @@ export class CollectionImpl< idle: [`loading`, `error`, `cleaned-up`], loading: [`initialCommit`, `ready`, `error`, `cleaned-up`], initialCommit: [`ready`, `error`, `cleaned-up`], - ready: [`cleaned-up`, `error`], + ready: [`cleaned-up`, `error`, `loading`], error: [`cleaned-up`, `idle`], "cleaned-up": [`loading`, `error`], } @@ -600,6 +624,12 @@ export class CollectionImpl< markReady: () => { this.markReady() }, + setLoading: () => { + this.setLoading() + }, + clearSyncedState: () => { + this.clearSyncedState() + }, }) // Store cleanup function if provided diff --git a/packages/db/src/types.ts b/packages/db/src/types.ts index 28b25e0ef..01e0388e2 100644 --- a/packages/db/src/types.ts +++ b/packages/db/src/types.ts @@ -207,6 +207,8 @@ export interface SyncConfig< write: (message: Omit, `key`>) => void commit: () => void markReady: () => void + setLoading: () => void + clearSyncedState: () => void }) => void /** diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index d80f46bcd..5c8793ca7 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -254,6 +254,12 @@ function isUpToDateMessage>( return isControlMessage(message) && message.headers.control === `up-to-date` } +function isMustRefetchMessage>( + message: Message +): message is ControlMessage & { headers: { control: `must-refetch` } } { + return isControlMessage(message) && message.headers.control === `must-refetch` +} + // Check if a message contains txids in its headers function hasTxids>( message: Message @@ -470,7 +476,8 @@ function createElectricSync>( return { sync: (params: Parameters[`sync`]>[0]) => { - const { begin, write, commit, markReady } = params + const { begin, write, commit, markReady, clearSyncedState, setLoading } = + params const stream = new ShapeStream({ ...shapeOptions, signal: abortController.signal, @@ -521,6 +528,17 @@ function createElectricSync>( }) } else if (isUpToDateMessage(message)) { hasUpToDate = true + } else if (isMustRefetchMessage(message)) { + debug( + `Received must-refetch message, clearing synced data and restarting sync` + ) + + // Clear synced data and reset to loading state using the new methods + clearSyncedState() + setLoading() + + // Reset transaction state to allow new transactions after must-refetch + transactionStarted = false } } diff --git a/packages/electric-db-collection/tests/electric.test.ts b/packages/electric-db-collection/tests/electric.test.ts index 9a4eaa7d7..2839f80b5 100644 --- a/packages/electric-db-collection/tests/electric.test.ts +++ b/packages/electric-db-collection/tests/electric.test.ts @@ -225,6 +225,57 @@ describe(`Electric Integration`, () => { expect(collection.state).toEqual(new Map()) }) + it(`should handle must-refetch by clearing synced data and resetting to loading state`, () => { + // First, populate the collection with some data + subscriber([ + { + key: `1`, + value: { id: 1, name: `Test User` }, + headers: { operation: `insert` }, + }, + { + key: `2`, + value: { id: 2, name: `Another User` }, + headers: { operation: `insert` }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // Verify the data is in the collection + expect(collection.state.size).toBe(2) + expect(collection.status).toBe(`ready`) + + // Send must-refetch control message + subscriber([ + { + headers: { control: `must-refetch` }, + }, + ]) + + // The collection should be cleared and reset to loading state + expect(collection.state.size).toBe(0) + expect(collection.status).toBe(`loading`) + + // Send new data after must-refetch + subscriber([ + { + key: `3`, + value: { id: 3, name: `New User` }, + headers: { operation: `insert` }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // The collection should now have the new data + expect(collection.state.size).toBe(1) + expect(collection.state.get(3)).toEqual({ id: 3, name: `New User` }) + expect(collection.status).toBe(`ready`) + }) + // Tests for txid tracking functionality describe(`txid tracking`, () => { it(`should track txids from incoming messages`, async () => { From b020323c9cdbe83a1ffd7f7f089224045783a603 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Fri, 15 Aug 2025 16:51:25 +0100 Subject: [PATCH 2/7] fix test --- packages/db/tests/collection-errors.test.ts | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/packages/db/tests/collection-errors.test.ts b/packages/db/tests/collection-errors.test.ts index d8784760f..e9f2e3925 100644 --- a/packages/db/tests/collection-errors.test.ts +++ b/packages/db/tests/collection-errors.test.ts @@ -370,9 +370,9 @@ describe(`Collection Error Handling`, () => { expect(collection.status).toBe(`idle`) - // Test invalid transition + // Test invalid transition (ready to idle is not allowed) expect(() => { - collectionImpl.validateStatusTransition(`ready`, `loading`) + collectionImpl.validateStatusTransition(`ready`, `idle`) }).toThrow(InvalidCollectionStatusTransitionError) // Test valid transition @@ -435,6 +435,9 @@ describe(`Collection Error Handling`, () => { expect(() => collectionImpl.validateStatusTransition(`ready`, `error`) ).not.toThrow() + expect(() => + collectionImpl.validateStatusTransition(`ready`, `loading`) + ).not.toThrow() // Valid transitions from error (allow recovery) expect(() => From c317eef79e40f0633c446d99146ad53f6423c58e Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Fri, 15 Aug 2025 18:58:43 +0100 Subject: [PATCH 3/7] swap to trucating during a sync transaction --- .changeset/db-sync-methods.md | 2 +- packages/db/src/collection.ts | 83 ++++++++++++------- packages/db/src/types.ts | 3 +- packages/db/tests/collection-errors.test.ts | 7 +- .../electric-db-collection/src/electric.ts | 18 ++-- .../tests/electric.test.ts | 4 +- 6 files changed, 70 insertions(+), 47 deletions(-) diff --git a/.changeset/db-sync-methods.md b/.changeset/db-sync-methods.md index cb678027a..854888aa5 100644 --- a/.changeset/db-sync-methods.md +++ b/.changeset/db-sync-methods.md @@ -2,4 +2,4 @@ "@tanstack/db": patch --- -Add setLoading and clearSyncedState methods to sync handler to enable a collections state to be reset to loading state and cleared. +Add a new truncate method to the sync handler to enable a collections state to be reset from a sync transaction. diff --git a/packages/db/src/collection.ts b/packages/db/src/collection.ts index 2cd46dc45..c9e1951b1 100644 --- a/packages/db/src/collection.ts +++ b/packages/db/src/collection.ts @@ -65,6 +65,7 @@ import type { BaseIndex, IndexResolver } from "./indexes/base-index.js" interface PendingSyncedTransaction> { committed: boolean operations: Array> + truncate?: boolean } /** @@ -401,30 +402,6 @@ export class CollectionImpl< } } - /** - * Set the collection back to loading state - * This is called by sync implementations when they need to reset the collection state - * (e.g., when receiving a must-refetch message from Electric) - * @private - Should only be called by sync implementations - */ - private setLoading(): void { - // Can transition to loading from ready or initialCommit states - if (this._status === `ready` || this._status === `initialCommit`) { - this.setStatus(`loading`) - } - } - - /** - * Clear the synced data and metadata - * This is called by sync implementations when they need to clear the collection state - * (e.g., when receiving a must-refetch message from Electric) - * @private - Should only be called by sync implementations - */ - private clearSyncedState(): void { - this.syncedData.clear() - this.syncedMetadata.clear() - } - public id = `` /** @@ -468,7 +445,7 @@ export class CollectionImpl< idle: [`loading`, `error`, `cleaned-up`], loading: [`initialCommit`, `ready`, `error`, `cleaned-up`], initialCommit: [`ready`, `error`, `cleaned-up`], - ready: [`cleaned-up`, `error`, `loading`], + ready: [`cleaned-up`, `error`], error: [`cleaned-up`, `idle`], "cleaned-up": [`loading`, `error`], } @@ -624,11 +601,23 @@ export class CollectionImpl< markReady: () => { this.markReady() }, - setLoading: () => { - this.setLoading() - }, - clearSyncedState: () => { - this.clearSyncedState() + truncate: () => { + const pendingTransaction = + this.pendingSyncedTransactions[ + this.pendingSyncedTransactions.length - 1 + ] + if (!pendingTransaction) { + throw new NoPendingSyncTransactionWriteError() + } + if (pendingTransaction.committed) { + throw new SyncTransactionAlreadyCommittedWriteError() + } + + // Clear all operations from the current transaction + pendingTransaction.operations = [] + + // Mark the transaction as a truncate operation + pendingTransaction.truncate = true }, }) @@ -1209,6 +1198,40 @@ export class CollectionImpl< const rowUpdateMode = this.config.sync.rowUpdateMode || `partial` for (const transaction of this.pendingSyncedTransactions) { + // Handle truncate operations first + if (transaction.truncate) { + // Collect all existing keys (both synced and optimistic) before clearing + const existingKeys = new Set() + + // Add all synced keys + for (const key of this.syncedKeys) { + existingKeys.add(key) + } + + // Add all optimistic upsert keys + for (const key of this.optimisticUpserts.keys()) { + existingKeys.add(key) + } + + // Generate delete events for all existing keys + for (const key of existingKeys) { + const previousValue = + this.optimisticUpserts.get(key) || this.syncedData.get(key) + if (previousValue !== undefined) { + events.push({ + type: `delete`, + key, + value: previousValue, + }) + } + } + + // Clear all synced data and metadata + this.syncedData.clear() + this.syncedMetadata.clear() + this.syncedKeys.clear() + } + for (const operation of transaction.operations) { const key = operation.key as TKey this.syncedKeys.add(key) diff --git a/packages/db/src/types.ts b/packages/db/src/types.ts index 01e0388e2..b32ea93e1 100644 --- a/packages/db/src/types.ts +++ b/packages/db/src/types.ts @@ -207,8 +207,7 @@ export interface SyncConfig< write: (message: Omit, `key`>) => void commit: () => void markReady: () => void - setLoading: () => void - clearSyncedState: () => void + truncate: () => void }) => void /** diff --git a/packages/db/tests/collection-errors.test.ts b/packages/db/tests/collection-errors.test.ts index e9f2e3925..d8784760f 100644 --- a/packages/db/tests/collection-errors.test.ts +++ b/packages/db/tests/collection-errors.test.ts @@ -370,9 +370,9 @@ describe(`Collection Error Handling`, () => { expect(collection.status).toBe(`idle`) - // Test invalid transition (ready to idle is not allowed) + // Test invalid transition expect(() => { - collectionImpl.validateStatusTransition(`ready`, `idle`) + collectionImpl.validateStatusTransition(`ready`, `loading`) }).toThrow(InvalidCollectionStatusTransitionError) // Test valid transition @@ -435,9 +435,6 @@ describe(`Collection Error Handling`, () => { expect(() => collectionImpl.validateStatusTransition(`ready`, `error`) ).not.toThrow() - expect(() => - collectionImpl.validateStatusTransition(`ready`, `loading`) - ).not.toThrow() // Valid transitions from error (allow recovery) expect(() => diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index 5c8793ca7..a89331700 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -476,8 +476,7 @@ function createElectricSync>( return { sync: (params: Parameters[`sync`]>[0]) => { - const { begin, write, commit, markReady, clearSyncedState, setLoading } = - params + const { begin, write, commit, markReady, truncate } = params const stream = new ShapeStream({ ...shapeOptions, signal: abortController.signal, @@ -530,14 +529,19 @@ function createElectricSync>( hasUpToDate = true } else if (isMustRefetchMessage(message)) { debug( - `Received must-refetch message, clearing synced data and restarting sync` + `Received must-refetch message, starting transaction with truncate` ) - // Clear synced data and reset to loading state using the new methods - clearSyncedState() - setLoading() + // Start a transaction and truncate the collection + if (!transactionStarted) { + begin() + transactionStarted = true + } + + truncate() - // Reset transaction state to allow new transactions after must-refetch + // Commit the truncate transaction immediately + commit() transactionStarted = false } } diff --git a/packages/electric-db-collection/tests/electric.test.ts b/packages/electric-db-collection/tests/electric.test.ts index 2839f80b5..304401503 100644 --- a/packages/electric-db-collection/tests/electric.test.ts +++ b/packages/electric-db-collection/tests/electric.test.ts @@ -254,9 +254,9 @@ describe(`Electric Integration`, () => { }, ]) - // The collection should be cleared and reset to loading state + // The collection should be cleared but remain in ready state expect(collection.state.size).toBe(0) - expect(collection.status).toBe(`loading`) + expect(collection.status).toBe(`ready`) // Send new data after must-refetch subscriber([ From 892311fc6af8475a7d624ca2b5b5b86db040eb03 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Fri, 15 Aug 2025 20:30:20 +0100 Subject: [PATCH 4/7] more through tests and fixes --- packages/db/src/collection.ts | 89 ++- .../collection-subscribe-changes.test.ts | 565 ++++++++++++++++++ packages/db/tests/collection.test.ts | 150 +++++ 3 files changed, 793 insertions(+), 11 deletions(-) diff --git a/packages/db/src/collection.ts b/packages/db/src/collection.ts index c9e1951b1..5754eba47 100644 --- a/packages/db/src/collection.ts +++ b/packages/db/src/collection.ts @@ -560,11 +560,16 @@ export class CollectionImpl< // Check if an item with this key already exists when inserting if (messageWithoutKey.type === `insert`) { + const insertingIntoExistingSynced = this.syncedData.has(key) + const hasPendingDeleteForKey = pendingTransaction.operations.some( + (op) => op.key === key && op.type === `delete` + ) + const isTruncateTransaction = pendingTransaction.truncate === true + // Allow insert after truncate in the same transaction even if it existed in syncedData if ( - this.syncedData.has(key) && - !pendingTransaction.operations.some( - (op) => op.key === key && op.type === `delete` - ) + insertingIntoExistingSynced && + !hasPendingDeleteForKey && + !isTruncateTransaction ) { throw new DuplicateKeySyncError(key, this.id) } @@ -1168,7 +1173,11 @@ export class CollectionImpl< } } - if (!hasPersistingTransaction) { + const hasTruncateSync = this.pendingSyncedTransactions.some( + (t) => t.truncate === true + ) + + if (!hasPersistingTransaction || hasTruncateSync) { // Set flag to prevent redundant optimistic state recalculations this.isCommittingSyncTransactions = true @@ -1197,14 +1206,17 @@ export class CollectionImpl< const events: Array> = [] const rowUpdateMode = this.config.sync.rowUpdateMode || `partial` + // Preserve current optimistic inserts so we can re-emit them after truncate + const preservedOptimisticInserts = new Map(this.optimisticUpserts) + for (const transaction of this.pendingSyncedTransactions) { // Handle truncate operations first if (transaction.truncate) { // Collect all existing keys (both synced and optimistic) before clearing const existingKeys = new Set() - // Add all synced keys - for (const key of this.syncedKeys) { + // Add all current synced data keys + for (const key of this.syncedData.keys()) { existingKeys.add(key) } @@ -1281,9 +1293,63 @@ export class CollectionImpl< } } - // Clear optimistic state since sync operations will now provide the authoritative data - this.optimisticUpserts.clear() - this.optimisticDeletes.clear() + // After applying synced operations, if this cycle included a truncate, + // re-emit optimistic inserts as inserts after deletes to restore optimistic view + // and maintain required event ordering (deletes first, then inserts). + const hadTruncate = this.pendingSyncedTransactions.some( + (t) => t.truncate === true + ) + if (hadTruncate) { + // Avoid duplicating keys that were inserted by synced operations in this commit + const syncedInsertedOrUpdatedKeys = new Set() + for (const t of this.pendingSyncedTransactions) { + for (const op of t.operations) { + if (op.type === `insert` || op.type === `update`) { + syncedInsertedOrUpdatedKeys.add(op.key as TKey) + } + } + } + + for (const [key, value] of preservedOptimisticInserts) { + if (this.optimisticDeletes.has(key)) continue + if (syncedInsertedOrUpdatedKeys.has(key)) { + // Override the server insert/update value with optimistic value for minimal changes. + // If no server insert event exists in our batch yet, add an insert with optimistic value. + let foundInsert = false + for (let i = events.length - 1; i >= 0; i--) { + const evt = events[i]! + if (evt.key === key && evt.type === `insert`) { + evt.value = value + foundInsert = true + break + } + } + if (!foundInsert) { + events.push({ type: `insert`, key, value }) + } + } else { + events.push({ type: `insert`, key, value }) + } + } + + // Ensure listeners are active before emitting this critical batch by + // transitioning to ready if not already + if (!this.isReady()) { + this.setStatus(`ready`) + } + } + + // Maintain optimistic state appropriately + if (hadTruncate) { + // Preserve optimistic inserts across truncate + this.optimisticUpserts = new Map(preservedOptimisticInserts) + this.optimisticDeletes.clear() + } else { + // Clear optimistic state since sync operations will now provide the authoritative data. + // Any still-active user transactions will be re-applied below in recompute. + this.optimisticUpserts.clear() + this.optimisticDeletes.clear() + } // Reset flag and recompute optimistic state for any remaining active transactions this.isCommittingSyncTransactions = false @@ -1380,7 +1446,8 @@ export class CollectionImpl< this.updateIndexes(events) } - // End batching and emit all events (combines any batched events with sync events) + // Emit all events in one batch. Any previously batched optimistic events + // will be combined inside emitEvents when forceEmit=true. this.emitEvents(events, true) this.pendingSyncedTransactions = [] diff --git a/packages/db/tests/collection-subscribe-changes.test.ts b/packages/db/tests/collection-subscribe-changes.test.ts index 23ae8ecd9..33383c198 100644 --- a/packages/db/tests/collection-subscribe-changes.test.ts +++ b/packages/db/tests/collection-subscribe-changes.test.ts @@ -810,4 +810,569 @@ describe(`Collection.subscribeChanges`, () => { // Clean up unsubscribe() }) + + it(`should emit delete events for all items when truncate is called`, async () => { + const changeEvents: Array = [] + let testSyncFunctions: any = null + + const collection = createCollection<{ id: number; value: string }>({ + id: `truncate-changes-test`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, write, commit, truncate, markReady }) => { + // Initialize with some data + begin() + write({ + type: `insert`, + value: { id: 1, value: `initial value 1` }, + }) + write({ + type: `insert`, + value: { id: 2, value: `initial value 2` }, + }) + commit() + markReady() + + // Store the sync functions for testing + testSyncFunctions = { begin, write, commit, truncate } + }, + }, + }) + + // Listen to change events + collection.subscribeChanges((changes) => { + changeEvents.push(...changes) + }) + + await collection.stateWhenReady() + + // Verify initial state + expect(collection.state.size).toBe(2) + expect(collection.state.get(1)).toEqual({ id: 1, value: `initial value 1` }) + expect(collection.state.get(2)).toEqual({ id: 2, value: `initial value 2` }) + + // Clear change events from initial state + changeEvents.length = 0 + + // Test truncate operation + const { begin, truncate, commit } = testSyncFunctions + begin() + truncate() + commit() + + // Verify collection is cleared + expect(collection.state.size).toBe(0) + + // Verify delete events were emitted for all existing items + expect(changeEvents).toHaveLength(2) + expect(changeEvents[0]).toEqual({ + type: `delete`, + key: 1, + value: { id: 1, value: `initial value 1` }, + }) + expect(changeEvents[1]).toEqual({ + type: `delete`, + key: 2, + value: { id: 2, value: `initial value 2` }, + }) + }) + + it(`should emit delete events for optimistic state when truncate is called`, async () => { + const changeEvents: Array = [] + let testSyncFunctions: any = null + + const collection = createCollection<{ id: number; value: string }>({ + id: `truncate-optimistic-changes-test`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, write, commit, truncate, markReady }) => { + // Initialize with some data + begin() + write({ + type: `insert`, + value: { id: 1, value: `initial value 1` }, + }) + write({ + type: `insert`, + value: { id: 2, value: `initial value 2` }, + }) + commit() + markReady() + + // Store the sync functions for testing + testSyncFunctions = { begin, write, commit, truncate } + }, + }, + onInsert: async () => {}, + onUpdate: async () => {}, + onDelete: async () => {}, + }) + + // Listen to change events + collection.subscribeChanges((changes) => { + changeEvents.push(...changes) + }) + + await collection.stateWhenReady() + + // Add some optimistic updates + const tx1 = collection.update(1, (draft) => { + draft.value = `optimistic update 1` + }) + const tx2 = collection.insert({ id: 3, value: `optimistic insert` }) + + // Verify optimistic state exists + expect(collection.optimisticUpserts.has(1)).toBe(true) + expect(collection.optimisticUpserts.has(3)).toBe(true) + expect(collection.state.get(1)?.value).toBe(`optimistic update 1`) + expect(collection.state.get(3)?.value).toBe(`optimistic insert`) + + // Clear previous change events + changeEvents.length = 0 + + // Test truncate operation + const { begin, truncate, commit } = testSyncFunctions + begin() + truncate() + commit() + + // Verify collection is cleared + // After truncate, preserved optimistic inserts should be re-applied + expect(collection.state.size).toBe(2) + expect(collection.state.get(1)).toEqual({ + id: 1, + value: `optimistic update 1`, + }) + expect(collection.state.get(3)).toEqual({ + id: 3, + value: `optimistic insert`, + }) + + // Verify events are a single batch: deletes for all existing keys, then inserts for preserved optimistic + expect(changeEvents).toHaveLength(5) + // First 3 should be deletes for keys 1,2,3 (order by key not guaranteed, so sort for assertions) + const deletes = changeEvents.slice(0, 3) + const inserts = changeEvents.slice(3) + expect(deletes.every((e) => e.type === `delete`)).toBe(true) + expect(inserts.every((e) => e.type === `insert`)).toBe(true) + + const deleteByKey = new Map(deletes.map((e) => [e.key, e])) + const insertByKey = new Map(inserts.map((e) => [e.key, e])) + + const deleteEvent1 = deleteByKey.get(1) + const deleteEvent2 = deleteByKey.get(2) + const deleteEvent3 = deleteByKey.get(3) + + expect(deleteEvent1).toEqual({ + type: `delete`, + key: 1, + value: { id: 1, value: `optimistic update 1` }, // Should use optimistic value + }) + expect(deleteEvent2).toEqual({ + type: `delete`, + key: 2, + value: { id: 2, value: `initial value 2` }, + }) + expect(deleteEvent3).toEqual({ + type: `delete`, + key: 3, + value: { id: 3, value: `optimistic insert` }, + }) + + // Insert events for preserved optimistic entries (1 and 3) + expect(insertByKey.get(1)).toEqual({ + type: `insert`, + key: 1, + value: { id: 1, value: `optimistic update 1` }, + }) + expect(insertByKey.get(3)).toEqual({ + type: `insert`, + key: 3, + value: { id: 3, value: `optimistic insert` }, + }) + + // Wait for transactions to complete + await Promise.all([tx1.isPersisted.promise, tx2.isPersisted.promise]) + }) + + it(`should emit insert events for new data after truncate`, async () => { + const changeEvents: Array = [] + let testSyncFunctions: any = null + + const collection = createCollection<{ id: number; value: string }>({ + id: `truncate-new-data-changes-test`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, write, commit, truncate, markReady }) => { + // Initialize with some data + begin() + write({ + type: `insert`, + value: { id: 1, value: `initial value 1` }, + }) + write({ + type: `insert`, + value: { id: 2, value: `initial value 2` }, + }) + commit() + markReady() + + // Store the sync functions for testing + testSyncFunctions = { begin, write, commit, truncate } + }, + }, + }) + + // Listen to change events + collection.subscribeChanges((changes) => { + changeEvents.push(...changes) + }) + + await collection.stateWhenReady() + + // Verify initial state + expect(collection.state.size).toBe(2) + + // Clear change events from initial state + changeEvents.length = 0 + + // Test truncate operation + const { begin, truncate, commit, write } = testSyncFunctions + begin() + truncate() + commit() + + // Verify collection is cleared + expect(collection.state.size).toBe(0) + + // Verify delete events were emitted + expect(changeEvents).toHaveLength(2) + expect(changeEvents.every((event) => event.type === `delete`)).toBe(true) + + // Clear change events again + changeEvents.length = 0 + + // Add new data after truncate + begin() + write({ + type: `insert`, + value: { id: 3, value: `new value after truncate` }, + }) + write({ + type: `insert`, + value: { id: 4, value: `another new value` }, + }) + commit() + + // Verify new data is added correctly + expect(collection.state.size).toBe(2) + expect(collection.state.get(3)).toEqual({ + id: 3, + value: `new value after truncate`, + }) + expect(collection.state.get(4)).toEqual({ + id: 4, + value: `another new value`, + }) + + // Verify insert events were emitted for new data + expect(changeEvents).toHaveLength(2) + expect(changeEvents[0]).toEqual({ + type: `insert`, + key: 3, + value: { id: 3, value: `new value after truncate` }, + }) + expect(changeEvents[1]).toEqual({ + type: `insert`, + key: 4, + value: { id: 4, value: `another new value` }, + }) + }) + + it(`should not emit any events when truncate is called on empty collection`, async () => { + const changeEvents: Array = [] + let testSyncFunctions: any = null + + const collection = createCollection<{ id: number; value: string }>({ + id: `truncate-empty-changes-test`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, commit, truncate, markReady }) => { + // Initialize with empty collection + begin() + commit() + markReady() + + // Store the sync functions for testing + testSyncFunctions = { begin, commit, truncate } + }, + }, + }) + + // Listen to change events + collection.subscribeChanges((changes) => { + changeEvents.push(...changes) + }) + + await collection.stateWhenReady() + + // Verify initial state is empty + expect(collection.state.size).toBe(0) + + // Clear any initial change events + changeEvents.length = 0 + + // Test truncate operation on empty collection + const { begin, truncate, commit } = testSyncFunctions + begin() + truncate() + commit() + + // Verify collection remains empty + expect(collection.state.size).toBe(0) + + // Verify no change events were emitted (since there was nothing to delete) + expect(changeEvents).toHaveLength(0) + }) + + it(`truncate + optimistic update: server reinserted key -> optimistic value wins in single batch`, async () => { + const changeEvents: Array = [] + let f: any = null + + const collection = createCollection<{ id: number; value: string }>({ + id: `truncate-opt-update-exists-after`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, write, commit, truncate, markReady }) => { + // Initial state with key 1 + begin() + write({ type: `insert`, value: { id: 1, value: `server-initial` } }) + commit() + markReady() + f = { begin, write, commit, truncate } + }, + }, + }) + + collection.subscribeChanges((changes) => changeEvents.push(...changes)) + await collection.stateWhenReady() + + // Optimistic update on id 1 + const mutationFn = async () => {} + const tx = createTransaction({ mutationFn }) + tx.mutate(() => + collection.update(1, (draft) => { + draft.value = `client-update` + }) + ) + + changeEvents.length = 0 + + // Truncate, then server reinserts id 1 with different value + f.begin() + f.truncate() + f.write({ type: `insert`, value: { id: 1, value: `server-after` } }) + f.commit() + + // Expect delete then insert with optimistic value + expect(changeEvents.length).toBe(2) + expect(changeEvents[0]).toEqual({ + type: `delete`, + key: 1, + value: { id: 1, value: `client-update` }, + }) + expect(changeEvents[1]).toEqual({ + type: `insert`, + key: 1, + value: { id: 1, value: `client-update` }, + }) + + // Final state reflects optimistic value + expect(collection.state.get(1)).toEqual({ id: 1, value: `client-update` }) + }) + + it(`truncate + optimistic delete: server reinserted key -> remains deleted`, async () => { + const changeEvents: Array = [] + let f: any = null + + const collection = createCollection<{ id: number; value: string }>({ + id: `truncate-opt-delete-exists-after`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, write, commit, truncate, markReady }) => { + begin() + write({ type: `insert`, value: { id: 1, value: `server-initial` } }) + commit() + markReady() + f = { begin, write, commit, truncate } + }, + }, + }) + collection.subscribeChanges((c) => changeEvents.push(...c)) + await collection.stateWhenReady() + + // Optimistic delete on id 1 + const mutationFn = async () => {} + const tx = createTransaction({ mutationFn }) + tx.mutate(() => collection.delete(1)) + + changeEvents.length = 0 + + // Truncate, then server tries to reinsert id 1 + f.begin() + f.truncate() + f.write({ type: `insert`, value: { id: 1, value: `server-after` } }) + f.commit() + + // Expect only delete; no insert due to optimistic delete precedence + expect(changeEvents.length).toBe(1) + expect(changeEvents[0]).toEqual({ + type: `delete`, + key: 1, + value: { id: 1, value: `server-initial` }, + }) + expect(collection.state.has(1)).toBe(false) + }) + + it(`truncate + optimistic insert: server did NOT reinsert key -> inserted optimistically`, async () => { + const changeEvents: Array = [] + let f: any = null + + const collection = createCollection<{ id: number; value: string }>({ + id: `truncate-opt-insert-not-after`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, write, commit, truncate, markReady }) => { + begin() + write({ type: `insert`, value: { id: 1, value: `server-initial` } }) + commit() + markReady() + f = { begin, write, commit, truncate } + }, + }, + }) + collection.subscribeChanges((c) => changeEvents.push(...c)) + await collection.stateWhenReady() + + // Optimistic insert for id 2 (did not exist before) + const mutationFn = async () => {} + const tx = createTransaction({ mutationFn }) + tx.mutate(() => collection.insert({ id: 2, value: `client-insert` })) + + changeEvents.length = 0 + + // Truncate without server reinsert for id 2 + f.begin() + f.truncate() + // server does not write id 2 + f.commit() + + // Expect delete for id 1, and insert for id 2 + expect(changeEvents.some((e) => e.type === `delete` && e.key === 1)).toBe( + true + ) + expect( + changeEvents.some( + (e) => + e.type === `insert` && + e.key === 2 && + e.value.value === `client-insert` + ) + ).toBe(true) + expect(collection.state.get(2)).toEqual({ id: 2, value: `client-insert` }) + }) + + it(`truncate + optimistic update: server did NOT reinsert key -> optimistic insert then update minimal`, async () => { + const changeEvents: Array = [] + let f: any = null + + const collection = createCollection<{ id: number; value: string }>({ + id: `truncate-opt-update-not-after`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, write, commit, truncate, markReady }) => { + begin() + write({ type: `insert`, value: { id: 1, value: `server-initial` } }) + commit() + markReady() + f = { begin, write, commit, truncate } + }, + }, + }) + collection.subscribeChanges((c) => changeEvents.push(...c)) + await collection.stateWhenReady() + + // Optimistic update on id 1 + const mutationFn = async () => {} + const tx = createTransaction({ mutationFn }) + tx.mutate(() => + collection.update(1, (draft) => { + draft.value = `client-update` + }) + ) + + changeEvents.length = 0 + + // Truncate; server does not reinsert id 1 + f.begin() + f.truncate() + f.commit() + + // We expect delete for id 1 and then insert (minimal) with optimistic value + const deletes = changeEvents.filter((e) => e.type === `delete`) + const inserts = changeEvents.filter((e) => e.type === `insert`) + expect(deletes.some((e) => e.key === 1)).toBe(true) + expect( + inserts.some((e) => e.key === 1 && e.value.value === `client-update`) + ).toBe(true) + expect(collection.state.get(1)).toEqual({ id: 1, value: `client-update` }) + }) + + it(`truncate + optimistic delete: server did NOT reinsert key -> remains deleted`, async () => { + const changeEvents: Array = [] + let f: any = null + + const collection = createCollection<{ id: number; value: string }>({ + id: `truncate-opt-delete-not-after`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, write, commit, truncate, markReady }) => { + begin() + write({ type: `insert`, value: { id: 1, value: `server-initial` } }) + commit() + markReady() + f = { begin, write, commit, truncate } + }, + }, + }) + collection.subscribeChanges((c) => changeEvents.push(...c)) + await collection.stateWhenReady() + + // Optimistic delete on id 1 + const mutationFn = async () => {} + const tx = createTransaction({ mutationFn }) + tx.mutate(() => collection.delete(1)) + + changeEvents.length = 0 + + // Truncate with no server reinserts + f.begin() + f.truncate() + f.commit() + + // Expect only delete + expect(changeEvents.length).toBe(1) + expect(changeEvents[0].type).toBe(`delete`) + expect(changeEvents[0].key).toBe(1) + expect(collection.state.has(1)).toBe(false) + }) }) diff --git a/packages/db/tests/collection.test.ts b/packages/db/tests/collection.test.ts index 2a15688a2..ee4712bb9 100644 --- a/packages/db/tests/collection.test.ts +++ b/packages/db/tests/collection.test.ts @@ -1358,4 +1358,154 @@ describe(`Collection with schema validation`, () => { tx5.isPersisted.promise, ]) }) + + it(`should handle basic truncate operations`, async () => { + let testSyncFunctions: any = null + + const collection = createCollection<{ id: number; value: string }>({ + id: `truncate-basic-test`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, write, commit, truncate, markReady }) => { + // Initialize with some data + begin() + write({ + type: `insert`, + value: { id: 1, value: `initial value 1` }, + }) + write({ + type: `insert`, + value: { id: 2, value: `initial value 2` }, + }) + commit() + markReady() + + // Store the sync functions for testing + testSyncFunctions = { begin, write, commit, truncate } + }, + }, + }) + + await collection.stateWhenReady() + + // Verify initial state + expect(collection.state.size).toBe(2) + expect(collection.state.get(1)).toEqual({ id: 1, value: `initial value 1` }) + expect(collection.state.get(2)).toEqual({ id: 2, value: `initial value 2` }) + + // Test truncate operation + const { begin, truncate, commit } = testSyncFunctions + begin() + truncate() + commit() + + // Verify collection is cleared + expect(collection.state.size).toBe(0) + expect(collection.syncedData.size).toBe(0) + expect(collection.syncedMetadata.size).toBe(0) + }) + + it(`should keep operations written after truncate in the same transaction`, async () => { + let testSyncFunctions: any = null + + const collection = createCollection<{ id: number; value: string }>({ + id: `truncate-operations-test`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, write, commit, truncate, markReady }) => { + // Initialize with some data + begin() + write({ + type: `insert`, + value: { id: 1, value: `initial value` }, + }) + commit() + markReady() + + // Store the sync functions for testing + testSyncFunctions = { begin, write, commit, truncate } + }, + }, + }) + + await collection.stateWhenReady() + + // Verify initial state + expect(collection.state.size).toBe(1) + expect(collection.state.get(1)).toEqual({ id: 1, value: `initial value` }) + + // Test truncate operation with additional operations in the same transaction + const { begin, write, truncate, commit } = testSyncFunctions + + begin() + + // Add some operations to the transaction + write({ + type: `insert`, + value: { id: 2, value: `should be cleared` }, + }) + write({ + type: `update`, + value: { id: 1, value: `should be cleared` }, + }) + + // Call truncate - this should clear the operations and mark as truncate + truncate() + + // Add more operations after truncate (these should not be cleared) + write({ + type: `insert`, + value: { id: 3, value: `should not be cleared` }, + }) + + commit() + + // Verify only post-truncate operations are kept + expect(collection.state.size).toBe(1) + expect(collection.state.get(3)).toEqual({ + id: 3, + value: `should not be cleared`, + }) + expect(collection.syncedData.size).toBe(1) + expect(collection.syncedMetadata.size).toBe(1) + }) + + it(`should handle truncate with empty collection`, async () => { + let testSyncFunctions: any = null + + const collection = createCollection<{ id: number; value: string }>({ + id: `truncate-empty-test`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, commit, truncate, markReady }) => { + // Initialize with empty collection + begin() + commit() + markReady() + + // Store the sync functions for testing + testSyncFunctions = { begin, commit, truncate } + }, + }, + }) + + await collection.stateWhenReady() + + // Verify initial state is empty + expect(collection.state.size).toBe(0) + + // Test truncate operation on empty collection + const { begin, truncate, commit } = testSyncFunctions + begin() + truncate() + commit() + + // Verify collection remains empty + expect(collection.state.size).toBe(0) + expect(collection.syncedData.size).toBe(0) + expect(collection.syncedMetadata.size).toBe(0) + }) }) From 5f4a1d4c2e3a74519a65f6e6f0efab2aad9d2100 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Sat, 16 Aug 2025 17:23:17 +0100 Subject: [PATCH 5/7] checkpoint --- packages/db/src/collection.ts | 102 +++++++++++------- .../collection-subscribe-changes.test.ts | 48 +++------ 2 files changed, 81 insertions(+), 69 deletions(-) diff --git a/packages/db/src/collection.ts b/packages/db/src/collection.ts index 5754eba47..d1e8402ba 100644 --- a/packages/db/src/collection.ts +++ b/packages/db/src/collection.ts @@ -1189,6 +1189,14 @@ export class CollectionImpl< } } + // If this commit includes a truncate control, also include optimistic upsert keys + // so the generic diff machinery can emit inserts/overrides for them + if (this.pendingSyncedTransactions.some((t) => t.truncate === true)) { + for (const key of this.optimisticUpserts.keys()) { + changedKeys.add(key) + } + } + // Use pre-captured state if available (from optimistic scenarios), // otherwise capture current state (for pure sync scenarios) let currentVisibleState = this.preSyncVisibleState @@ -1206,35 +1214,19 @@ export class CollectionImpl< const events: Array> = [] const rowUpdateMode = this.config.sync.rowUpdateMode || `partial` - // Preserve current optimistic inserts so we can re-emit them after truncate - const preservedOptimisticInserts = new Map(this.optimisticUpserts) + // Use current optimistic state directly; no copies required for (const transaction of this.pendingSyncedTransactions) { // Handle truncate operations first if (transaction.truncate) { - // Collect all existing keys (both synced and optimistic) before clearing - const existingKeys = new Set() - - // Add all current synced data keys + // Emit deletes only for currently synced keys, skipping keys already + // optimistically deleted (their delete was already emitted) for (const key of this.syncedData.keys()) { - existingKeys.add(key) - } - - // Add all optimistic upsert keys - for (const key of this.optimisticUpserts.keys()) { - existingKeys.add(key) - } - - // Generate delete events for all existing keys - for (const key of existingKeys) { + if (this.optimisticDeletes.has(key)) continue const previousValue = this.optimisticUpserts.get(key) || this.syncedData.get(key) if (previousValue !== undefined) { - events.push({ - type: `delete`, - key, - value: previousValue, - }) + events.push({ type: `delete`, key, value: previousValue }) } } @@ -1310,11 +1302,41 @@ export class CollectionImpl< } } - for (const [key, value] of preservedOptimisticInserts) { - if (this.optimisticDeletes.has(key)) continue + // Build re-apply sets from active optimistic transactions against the new synced base + const reapplyUpserts = new Map() + const reapplyDeletes = new Set() + + for (const tx of this.transactions.values()) { + if ([`completed`, `failed`].includes(tx.state)) continue + for (const mutation of tx.mutations) { + if (mutation.collection !== this || !mutation.optimistic) continue + const key = mutation.key as TKey + switch (mutation.type) { + case `insert`: + reapplyUpserts.set(key, mutation.modified as T) + reapplyDeletes.delete(key) + break + case `update`: { + const base = this.syncedData.get(key) + const next = base + ? (Object.assign({}, base, mutation.changes) as T) + : (mutation.modified as T) + reapplyUpserts.set(key, next) + reapplyDeletes.delete(key) + break + } + case `delete`: + reapplyUpserts.delete(key) + reapplyDeletes.add(key) + break + } + } + } + + // Emit/override inserts for re-applied upserts, skipping any deleted keys + for (const [key, value] of reapplyUpserts) { + if (reapplyDeletes.has(key)) continue if (syncedInsertedOrUpdatedKeys.has(key)) { - // Override the server insert/update value with optimistic value for minimal changes. - // If no server insert event exists in our batch yet, add an insert with optimistic value. let foundInsert = false for (let i = events.length - 1; i >= 0; i--) { const evt = events[i]! @@ -1332,24 +1354,30 @@ export class CollectionImpl< } } - // Ensure listeners are active before emitting this critical batch by - // transitioning to ready if not already + // Filter out any insert events for keys that are optimistically deleted + if (events.length > 0 && reapplyDeletes.size > 0) { + const filtered: Array> = [] + for (const evt of events) { + if (evt.type === `insert` && reapplyDeletes.has(evt.key)) { + continue + } + filtered.push(evt) + } + events.length = 0 + events.push(...filtered) + } + + // Ensure listeners are active before emitting this critical batch if (!this.isReady()) { this.setStatus(`ready`) } } // Maintain optimistic state appropriately - if (hadTruncate) { - // Preserve optimistic inserts across truncate - this.optimisticUpserts = new Map(preservedOptimisticInserts) - this.optimisticDeletes.clear() - } else { - // Clear optimistic state since sync operations will now provide the authoritative data. - // Any still-active user transactions will be re-applied below in recompute. - this.optimisticUpserts.clear() - this.optimisticDeletes.clear() - } + // Clear optimistic state since sync operations will now provide the authoritative data. + // Any still-active user transactions will be re-applied below in recompute. + this.optimisticUpserts.clear() + this.optimisticDeletes.clear() // Reset flag and recompute optimistic state for any remaining active transactions this.isCommittingSyncTransactions = false diff --git a/packages/db/tests/collection-subscribe-changes.test.ts b/packages/db/tests/collection-subscribe-changes.test.ts index 33383c198..022361837 100644 --- a/packages/db/tests/collection-subscribe-changes.test.ts +++ b/packages/db/tests/collection-subscribe-changes.test.ts @@ -950,36 +950,26 @@ describe(`Collection.subscribeChanges`, () => { value: `optimistic insert`, }) - // Verify events are a single batch: deletes for all existing keys, then inserts for preserved optimistic - expect(changeEvents).toHaveLength(5) - // First 3 should be deletes for keys 1,2,3 (order by key not guaranteed, so sort for assertions) - const deletes = changeEvents.slice(0, 3) - const inserts = changeEvents.slice(3) - expect(deletes.every((e) => e.type === `delete`)).toBe(true) - expect(inserts.every((e) => e.type === `insert`)).toBe(true) + // Verify events are a single batch: deletes for synced keys (1,2), then inserts for preserved optimistic (1,3) + expect(changeEvents.length).toBe(4) + const deletes = changeEvents.filter((e) => e.type === `delete`) + const inserts = changeEvents.filter((e) => e.type === `insert`) + expect(deletes.length).toBe(2) + expect(inserts.length).toBe(2) const deleteByKey = new Map(deletes.map((e) => [e.key, e])) const insertByKey = new Map(inserts.map((e) => [e.key, e])) - const deleteEvent1 = deleteByKey.get(1) - const deleteEvent2 = deleteByKey.get(2) - const deleteEvent3 = deleteByKey.get(3) - - expect(deleteEvent1).toEqual({ + expect(deleteByKey.get(1)).toEqual({ type: `delete`, key: 1, - value: { id: 1, value: `optimistic update 1` }, // Should use optimistic value + value: { id: 1, value: `optimistic update 1` }, }) - expect(deleteEvent2).toEqual({ + expect(deleteByKey.get(2)).toEqual({ type: `delete`, key: 2, value: { id: 2, value: `initial value 2` }, }) - expect(deleteEvent3).toEqual({ - type: `delete`, - key: 3, - value: { id: 3, value: `optimistic insert` }, - }) // Insert events for preserved optimistic entries (1 and 3) expect(insertByKey.get(1)).toEqual({ @@ -1196,7 +1186,7 @@ describe(`Collection.subscribeChanges`, () => { expect(collection.state.get(1)).toEqual({ id: 1, value: `client-update` }) }) - it(`truncate + optimistic delete: server reinserted key -> remains deleted`, async () => { + it(`truncate + optimistic delete: server reinserted key -> remains deleted (no duplicate delete event)`, async () => { const changeEvents: Array = [] let f: any = null @@ -1230,13 +1220,9 @@ describe(`Collection.subscribeChanges`, () => { f.write({ type: `insert`, value: { id: 1, value: `server-after` } }) f.commit() - // Expect only delete; no insert due to optimistic delete precedence - expect(changeEvents.length).toBe(1) - expect(changeEvents[0]).toEqual({ - type: `delete`, - key: 1, - value: { id: 1, value: `server-initial` }, - }) + // We already emitted the optimistic delete earlier; do not emit it again. + // Also, do not emit an insert for the re-introduced key. + expect(changeEvents.length).toBe(0) expect(collection.state.has(1)).toBe(false) }) @@ -1336,7 +1322,7 @@ describe(`Collection.subscribeChanges`, () => { expect(collection.state.get(1)).toEqual({ id: 1, value: `client-update` }) }) - it(`truncate + optimistic delete: server did NOT reinsert key -> remains deleted`, async () => { + it(`truncate + optimistic delete: server did NOT reinsert key -> remains deleted (no extra event)`, async () => { const changeEvents: Array = [] let f: any = null @@ -1369,10 +1355,8 @@ describe(`Collection.subscribeChanges`, () => { f.truncate() f.commit() - // Expect only delete - expect(changeEvents.length).toBe(1) - expect(changeEvents[0].type).toBe(`delete`) - expect(changeEvents[0].key).toBe(1) + // No new events since the optimistic delete event already fired earlier + expect(changeEvents.length).toBe(0) expect(collection.state.has(1)).toBe(false) }) }) From 7634818c429810343d5e73b9b930bd8b77463453 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Sat, 16 Aug 2025 17:38:36 +0100 Subject: [PATCH 6/7] tidy --- packages/db/src/collection.ts | 44 ++++++++++++++++++----------------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/packages/db/src/collection.ts b/packages/db/src/collection.ts index d1e8402ba..d704c8eae 100644 --- a/packages/db/src/collection.ts +++ b/packages/db/src/collection.ts @@ -621,7 +621,11 @@ export class CollectionImpl< // Clear all operations from the current transaction pendingTransaction.operations = [] - // Mark the transaction as a truncate operation + // Mark the transaction as a truncate operation. During commit, this triggers: + // - Delete events for all previously synced keys (excluding optimistic-deleted keys) + // - Clearing of syncedData/syncedMetadata + // - Subsequent synced ops applied on the fresh base + // - Finally, optimistic mutations re-applied on top (single batch) pendingTransaction.truncate = true }, }) @@ -1189,14 +1193,6 @@ export class CollectionImpl< } } - // If this commit includes a truncate control, also include optimistic upsert keys - // so the generic diff machinery can emit inserts/overrides for them - if (this.pendingSyncedTransactions.some((t) => t.truncate === true)) { - for (const key of this.optimisticUpserts.keys()) { - changedKeys.add(key) - } - } - // Use pre-captured state if available (from optimistic scenarios), // otherwise capture current state (for pure sync scenarios) let currentVisibleState = this.preSyncVisibleState @@ -1219,8 +1215,10 @@ export class CollectionImpl< for (const transaction of this.pendingSyncedTransactions) { // Handle truncate operations first if (transaction.truncate) { - // Emit deletes only for currently synced keys, skipping keys already - // optimistically deleted (their delete was already emitted) + // TRUNCATE PHASE + // 1) Emit a delete for every currently-synced key so downstream listeners/indexes + // observe a clear-before-rebuild. We intentionally skip keys already in + // optimisticDeletes because their delete was previously emitted by the user. for (const key of this.syncedData.keys()) { if (this.optimisticDeletes.has(key)) continue const previousValue = @@ -1230,7 +1228,8 @@ export class CollectionImpl< } } - // Clear all synced data and metadata + // 2) Clear the authoritative synced base. Subsequent server ops in this + // same commit will rebuild the base atomically. this.syncedData.clear() this.syncedMetadata.clear() this.syncedKeys.clear() @@ -1285,14 +1284,15 @@ export class CollectionImpl< } } - // After applying synced operations, if this cycle included a truncate, - // re-emit optimistic inserts as inserts after deletes to restore optimistic view - // and maintain required event ordering (deletes first, then inserts). + // After applying synced operations, if this commit included a truncate, + // re-apply optimistic mutations on top of the fresh synced base. This ensures + // the UI preserves local intent while respecting server rebuild semantics. + // Ordering: deletes (above) -> server ops (just applied) -> optimistic upserts. const hadTruncate = this.pendingSyncedTransactions.some( (t) => t.truncate === true ) if (hadTruncate) { - // Avoid duplicating keys that were inserted by synced operations in this commit + // Avoid duplicating keys that were inserted/updated by synced operations in this commit const syncedInsertedOrUpdatedKeys = new Set() for (const t of this.pendingSyncedTransactions) { for (const op of t.operations) { @@ -1302,7 +1302,8 @@ export class CollectionImpl< } } - // Build re-apply sets from active optimistic transactions against the new synced base + // Build re-apply sets from ACTIVE optimistic transactions against the new synced base + // We do not copy maps; we compute intent directly from transactions to avoid drift. const reapplyUpserts = new Map() const reapplyDeletes = new Set() @@ -1333,7 +1334,9 @@ export class CollectionImpl< } } - // Emit/override inserts for re-applied upserts, skipping any deleted keys + // Emit inserts for re-applied upserts, skipping any keys that have an optimistic delete. + // If the server also inserted/updated the same key in this batch, override that value + // with the optimistic value to preserve local intent. for (const [key, value] of reapplyUpserts) { if (reapplyDeletes.has(key)) continue if (syncedInsertedOrUpdatedKeys.has(key)) { @@ -1354,7 +1357,7 @@ export class CollectionImpl< } } - // Filter out any insert events for keys that are optimistically deleted + // Finally, ensure we do NOT insert keys that have an outstanding optimistic delete. if (events.length > 0 && reapplyDeletes.size > 0) { const filtered: Array> = [] for (const evt of events) { @@ -1474,8 +1477,7 @@ export class CollectionImpl< this.updateIndexes(events) } - // Emit all events in one batch. Any previously batched optimistic events - // will be combined inside emitEvents when forceEmit=true. + // End batching and emit all events (combines any batched events with sync events) this.emitEvents(events, true) this.pendingSyncedTransactions = [] From 04fb63a8059aa2831a40fe4d156eec544f84168a Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Mon, 18 Aug 2025 09:06:40 +0100 Subject: [PATCH 7/7] tweaks --- .changeset/electric-must-refetch.md | 2 +- packages/db/src/collection.ts | 2 -- packages/electric-db-collection/tests/electric.test.ts | 2 +- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/.changeset/electric-must-refetch.md b/.changeset/electric-must-refetch.md index c8be84a4e..cd03d880c 100644 --- a/.changeset/electric-must-refetch.md +++ b/.changeset/electric-must-refetch.md @@ -2,4 +2,4 @@ "@tanstack/electric-db-collection": patch --- -Add must-refetch message handling to clear synced data and reset collection to loading state. +Add must-refetch message handling to clear synced data and re-sync collection data from server. diff --git a/packages/db/src/collection.ts b/packages/db/src/collection.ts index d704c8eae..f02263bb6 100644 --- a/packages/db/src/collection.ts +++ b/packages/db/src/collection.ts @@ -1210,8 +1210,6 @@ export class CollectionImpl< const events: Array> = [] const rowUpdateMode = this.config.sync.rowUpdateMode || `partial` - // Use current optimistic state directly; no copies required - for (const transaction of this.pendingSyncedTransactions) { // Handle truncate operations first if (transaction.truncate) { diff --git a/packages/electric-db-collection/tests/electric.test.ts b/packages/electric-db-collection/tests/electric.test.ts index 304401503..e8167efbb 100644 --- a/packages/electric-db-collection/tests/electric.test.ts +++ b/packages/electric-db-collection/tests/electric.test.ts @@ -225,7 +225,7 @@ describe(`Electric Integration`, () => { expect(collection.state).toEqual(new Map()) }) - it(`should handle must-refetch by clearing synced data and resetting to loading state`, () => { + it(`should handle must-refetch by clearing synced data and re-syncing`, () => { // First, populate the collection with some data subscriber([ {