diff --git a/.changeset/db-sync-methods.md b/.changeset/db-sync-methods.md new file mode 100644 index 000000000..854888aa5 --- /dev/null +++ b/.changeset/db-sync-methods.md @@ -0,0 +1,5 @@ +--- +"@tanstack/db": patch +--- + +Add a new truncate method to the sync handler to enable a collections state to be reset from a sync transaction. diff --git a/.changeset/electric-must-refetch.md b/.changeset/electric-must-refetch.md new file mode 100644 index 000000000..cd03d880c --- /dev/null +++ b/.changeset/electric-must-refetch.md @@ -0,0 +1,5 @@ +--- +"@tanstack/electric-db-collection": patch +--- + +Add must-refetch message handling to clear synced data and re-sync collection data from server. diff --git a/packages/db/src/collection.ts b/packages/db/src/collection.ts index eea7905cd..f02263bb6 100644 --- a/packages/db/src/collection.ts +++ b/packages/db/src/collection.ts @@ -65,6 +65,7 @@ import type { BaseIndex, IndexResolver } from "./indexes/base-index.js" interface PendingSyncedTransaction> { committed: boolean operations: Array> + truncate?: boolean } /** @@ -559,11 +560,16 @@ export class CollectionImpl< // Check if an item with this key already exists when inserting if (messageWithoutKey.type === `insert`) { + const insertingIntoExistingSynced = this.syncedData.has(key) + const hasPendingDeleteForKey = pendingTransaction.operations.some( + (op) => op.key === key && op.type === `delete` + ) + const isTruncateTransaction = pendingTransaction.truncate === true + // Allow insert after truncate in the same transaction even if it existed in syncedData if ( - this.syncedData.has(key) && - !pendingTransaction.operations.some( - (op) => op.key === key && op.type === `delete` - ) + insertingIntoExistingSynced && + !hasPendingDeleteForKey && + !isTruncateTransaction ) { throw new DuplicateKeySyncError(key, this.id) } @@ -600,6 +606,28 @@ export class CollectionImpl< markReady: () => { this.markReady() }, + truncate: () => { + const pendingTransaction = + this.pendingSyncedTransactions[ + this.pendingSyncedTransactions.length - 1 + ] + if (!pendingTransaction) { + throw new NoPendingSyncTransactionWriteError() + } + if (pendingTransaction.committed) { + throw new SyncTransactionAlreadyCommittedWriteError() + } + + // Clear all operations from the current transaction + pendingTransaction.operations = [] + + // Mark the transaction as a truncate operation. During commit, this triggers: + // - Delete events for all previously synced keys (excluding optimistic-deleted keys) + // - Clearing of syncedData/syncedMetadata + // - Subsequent synced ops applied on the fresh base + // - Finally, optimistic mutations re-applied on top (single batch) + pendingTransaction.truncate = true + }, }) // Store cleanup function if provided @@ -1149,7 +1177,11 @@ export class CollectionImpl< } } - if (!hasPersistingTransaction) { + const hasTruncateSync = this.pendingSyncedTransactions.some( + (t) => t.truncate === true + ) + + if (!hasPersistingTransaction || hasTruncateSync) { // Set flag to prevent redundant optimistic state recalculations this.isCommittingSyncTransactions = true @@ -1179,6 +1211,28 @@ export class CollectionImpl< const rowUpdateMode = this.config.sync.rowUpdateMode || `partial` for (const transaction of this.pendingSyncedTransactions) { + // Handle truncate operations first + if (transaction.truncate) { + // TRUNCATE PHASE + // 1) Emit a delete for every currently-synced key so downstream listeners/indexes + // observe a clear-before-rebuild. We intentionally skip keys already in + // optimisticDeletes because their delete was previously emitted by the user. + for (const key of this.syncedData.keys()) { + if (this.optimisticDeletes.has(key)) continue + const previousValue = + this.optimisticUpserts.get(key) || this.syncedData.get(key) + if (previousValue !== undefined) { + events.push({ type: `delete`, key, value: previousValue }) + } + } + + // 2) Clear the authoritative synced base. Subsequent server ops in this + // same commit will rebuild the base atomically. + this.syncedData.clear() + this.syncedMetadata.clear() + this.syncedKeys.clear() + } + for (const operation of transaction.operations) { const key = operation.key as TKey this.syncedKeys.add(key) @@ -1228,7 +1282,101 @@ export class CollectionImpl< } } - // Clear optimistic state since sync operations will now provide the authoritative data + // After applying synced operations, if this commit included a truncate, + // re-apply optimistic mutations on top of the fresh synced base. This ensures + // the UI preserves local intent while respecting server rebuild semantics. + // Ordering: deletes (above) -> server ops (just applied) -> optimistic upserts. + const hadTruncate = this.pendingSyncedTransactions.some( + (t) => t.truncate === true + ) + if (hadTruncate) { + // Avoid duplicating keys that were inserted/updated by synced operations in this commit + const syncedInsertedOrUpdatedKeys = new Set() + for (const t of this.pendingSyncedTransactions) { + for (const op of t.operations) { + if (op.type === `insert` || op.type === `update`) { + syncedInsertedOrUpdatedKeys.add(op.key as TKey) + } + } + } + + // Build re-apply sets from ACTIVE optimistic transactions against the new synced base + // We do not copy maps; we compute intent directly from transactions to avoid drift. + const reapplyUpserts = new Map() + const reapplyDeletes = new Set() + + for (const tx of this.transactions.values()) { + if ([`completed`, `failed`].includes(tx.state)) continue + for (const mutation of tx.mutations) { + if (mutation.collection !== this || !mutation.optimistic) continue + const key = mutation.key as TKey + switch (mutation.type) { + case `insert`: + reapplyUpserts.set(key, mutation.modified as T) + reapplyDeletes.delete(key) + break + case `update`: { + const base = this.syncedData.get(key) + const next = base + ? (Object.assign({}, base, mutation.changes) as T) + : (mutation.modified as T) + reapplyUpserts.set(key, next) + reapplyDeletes.delete(key) + break + } + case `delete`: + reapplyUpserts.delete(key) + reapplyDeletes.add(key) + break + } + } + } + + // Emit inserts for re-applied upserts, skipping any keys that have an optimistic delete. + // If the server also inserted/updated the same key in this batch, override that value + // with the optimistic value to preserve local intent. + for (const [key, value] of reapplyUpserts) { + if (reapplyDeletes.has(key)) continue + if (syncedInsertedOrUpdatedKeys.has(key)) { + let foundInsert = false + for (let i = events.length - 1; i >= 0; i--) { + const evt = events[i]! + if (evt.key === key && evt.type === `insert`) { + evt.value = value + foundInsert = true + break + } + } + if (!foundInsert) { + events.push({ type: `insert`, key, value }) + } + } else { + events.push({ type: `insert`, key, value }) + } + } + + // Finally, ensure we do NOT insert keys that have an outstanding optimistic delete. + if (events.length > 0 && reapplyDeletes.size > 0) { + const filtered: Array> = [] + for (const evt of events) { + if (evt.type === `insert` && reapplyDeletes.has(evt.key)) { + continue + } + filtered.push(evt) + } + events.length = 0 + events.push(...filtered) + } + + // Ensure listeners are active before emitting this critical batch + if (!this.isReady()) { + this.setStatus(`ready`) + } + } + + // Maintain optimistic state appropriately + // Clear optimistic state since sync operations will now provide the authoritative data. + // Any still-active user transactions will be re-applied below in recompute. this.optimisticUpserts.clear() this.optimisticDeletes.clear() diff --git a/packages/db/src/types.ts b/packages/db/src/types.ts index 28b25e0ef..b32ea93e1 100644 --- a/packages/db/src/types.ts +++ b/packages/db/src/types.ts @@ -207,6 +207,7 @@ export interface SyncConfig< write: (message: Omit, `key`>) => void commit: () => void markReady: () => void + truncate: () => void }) => void /** diff --git a/packages/db/tests/collection-subscribe-changes.test.ts b/packages/db/tests/collection-subscribe-changes.test.ts index 23ae8ecd9..022361837 100644 --- a/packages/db/tests/collection-subscribe-changes.test.ts +++ b/packages/db/tests/collection-subscribe-changes.test.ts @@ -810,4 +810,553 @@ describe(`Collection.subscribeChanges`, () => { // Clean up unsubscribe() }) + + it(`should emit delete events for all items when truncate is called`, async () => { + const changeEvents: Array = [] + let testSyncFunctions: any = null + + const collection = createCollection<{ id: number; value: string }>({ + id: `truncate-changes-test`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, write, commit, truncate, markReady }) => { + // Initialize with some data + begin() + write({ + type: `insert`, + value: { id: 1, value: `initial value 1` }, + }) + write({ + type: `insert`, + value: { id: 2, value: `initial value 2` }, + }) + commit() + markReady() + + // Store the sync functions for testing + testSyncFunctions = { begin, write, commit, truncate } + }, + }, + }) + + // Listen to change events + collection.subscribeChanges((changes) => { + changeEvents.push(...changes) + }) + + await collection.stateWhenReady() + + // Verify initial state + expect(collection.state.size).toBe(2) + expect(collection.state.get(1)).toEqual({ id: 1, value: `initial value 1` }) + expect(collection.state.get(2)).toEqual({ id: 2, value: `initial value 2` }) + + // Clear change events from initial state + changeEvents.length = 0 + + // Test truncate operation + const { begin, truncate, commit } = testSyncFunctions + begin() + truncate() + commit() + + // Verify collection is cleared + expect(collection.state.size).toBe(0) + + // Verify delete events were emitted for all existing items + expect(changeEvents).toHaveLength(2) + expect(changeEvents[0]).toEqual({ + type: `delete`, + key: 1, + value: { id: 1, value: `initial value 1` }, + }) + expect(changeEvents[1]).toEqual({ + type: `delete`, + key: 2, + value: { id: 2, value: `initial value 2` }, + }) + }) + + it(`should emit delete events for optimistic state when truncate is called`, async () => { + const changeEvents: Array = [] + let testSyncFunctions: any = null + + const collection = createCollection<{ id: number; value: string }>({ + id: `truncate-optimistic-changes-test`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, write, commit, truncate, markReady }) => { + // Initialize with some data + begin() + write({ + type: `insert`, + value: { id: 1, value: `initial value 1` }, + }) + write({ + type: `insert`, + value: { id: 2, value: `initial value 2` }, + }) + commit() + markReady() + + // Store the sync functions for testing + testSyncFunctions = { begin, write, commit, truncate } + }, + }, + onInsert: async () => {}, + onUpdate: async () => {}, + onDelete: async () => {}, + }) + + // Listen to change events + collection.subscribeChanges((changes) => { + changeEvents.push(...changes) + }) + + await collection.stateWhenReady() + + // Add some optimistic updates + const tx1 = collection.update(1, (draft) => { + draft.value = `optimistic update 1` + }) + const tx2 = collection.insert({ id: 3, value: `optimistic insert` }) + + // Verify optimistic state exists + expect(collection.optimisticUpserts.has(1)).toBe(true) + expect(collection.optimisticUpserts.has(3)).toBe(true) + expect(collection.state.get(1)?.value).toBe(`optimistic update 1`) + expect(collection.state.get(3)?.value).toBe(`optimistic insert`) + + // Clear previous change events + changeEvents.length = 0 + + // Test truncate operation + const { begin, truncate, commit } = testSyncFunctions + begin() + truncate() + commit() + + // Verify collection is cleared + // After truncate, preserved optimistic inserts should be re-applied + expect(collection.state.size).toBe(2) + expect(collection.state.get(1)).toEqual({ + id: 1, + value: `optimistic update 1`, + }) + expect(collection.state.get(3)).toEqual({ + id: 3, + value: `optimistic insert`, + }) + + // Verify events are a single batch: deletes for synced keys (1,2), then inserts for preserved optimistic (1,3) + expect(changeEvents.length).toBe(4) + const deletes = changeEvents.filter((e) => e.type === `delete`) + const inserts = changeEvents.filter((e) => e.type === `insert`) + expect(deletes.length).toBe(2) + expect(inserts.length).toBe(2) + + const deleteByKey = new Map(deletes.map((e) => [e.key, e])) + const insertByKey = new Map(inserts.map((e) => [e.key, e])) + + expect(deleteByKey.get(1)).toEqual({ + type: `delete`, + key: 1, + value: { id: 1, value: `optimistic update 1` }, + }) + expect(deleteByKey.get(2)).toEqual({ + type: `delete`, + key: 2, + value: { id: 2, value: `initial value 2` }, + }) + + // Insert events for preserved optimistic entries (1 and 3) + expect(insertByKey.get(1)).toEqual({ + type: `insert`, + key: 1, + value: { id: 1, value: `optimistic update 1` }, + }) + expect(insertByKey.get(3)).toEqual({ + type: `insert`, + key: 3, + value: { id: 3, value: `optimistic insert` }, + }) + + // Wait for transactions to complete + await Promise.all([tx1.isPersisted.promise, tx2.isPersisted.promise]) + }) + + it(`should emit insert events for new data after truncate`, async () => { + const changeEvents: Array = [] + let testSyncFunctions: any = null + + const collection = createCollection<{ id: number; value: string }>({ + id: `truncate-new-data-changes-test`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, write, commit, truncate, markReady }) => { + // Initialize with some data + begin() + write({ + type: `insert`, + value: { id: 1, value: `initial value 1` }, + }) + write({ + type: `insert`, + value: { id: 2, value: `initial value 2` }, + }) + commit() + markReady() + + // Store the sync functions for testing + testSyncFunctions = { begin, write, commit, truncate } + }, + }, + }) + + // Listen to change events + collection.subscribeChanges((changes) => { + changeEvents.push(...changes) + }) + + await collection.stateWhenReady() + + // Verify initial state + expect(collection.state.size).toBe(2) + + // Clear change events from initial state + changeEvents.length = 0 + + // Test truncate operation + const { begin, truncate, commit, write } = testSyncFunctions + begin() + truncate() + commit() + + // Verify collection is cleared + expect(collection.state.size).toBe(0) + + // Verify delete events were emitted + expect(changeEvents).toHaveLength(2) + expect(changeEvents.every((event) => event.type === `delete`)).toBe(true) + + // Clear change events again + changeEvents.length = 0 + + // Add new data after truncate + begin() + write({ + type: `insert`, + value: { id: 3, value: `new value after truncate` }, + }) + write({ + type: `insert`, + value: { id: 4, value: `another new value` }, + }) + commit() + + // Verify new data is added correctly + expect(collection.state.size).toBe(2) + expect(collection.state.get(3)).toEqual({ + id: 3, + value: `new value after truncate`, + }) + expect(collection.state.get(4)).toEqual({ + id: 4, + value: `another new value`, + }) + + // Verify insert events were emitted for new data + expect(changeEvents).toHaveLength(2) + expect(changeEvents[0]).toEqual({ + type: `insert`, + key: 3, + value: { id: 3, value: `new value after truncate` }, + }) + expect(changeEvents[1]).toEqual({ + type: `insert`, + key: 4, + value: { id: 4, value: `another new value` }, + }) + }) + + it(`should not emit any events when truncate is called on empty collection`, async () => { + const changeEvents: Array = [] + let testSyncFunctions: any = null + + const collection = createCollection<{ id: number; value: string }>({ + id: `truncate-empty-changes-test`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, commit, truncate, markReady }) => { + // Initialize with empty collection + begin() + commit() + markReady() + + // Store the sync functions for testing + testSyncFunctions = { begin, commit, truncate } + }, + }, + }) + + // Listen to change events + collection.subscribeChanges((changes) => { + changeEvents.push(...changes) + }) + + await collection.stateWhenReady() + + // Verify initial state is empty + expect(collection.state.size).toBe(0) + + // Clear any initial change events + changeEvents.length = 0 + + // Test truncate operation on empty collection + const { begin, truncate, commit } = testSyncFunctions + begin() + truncate() + commit() + + // Verify collection remains empty + expect(collection.state.size).toBe(0) + + // Verify no change events were emitted (since there was nothing to delete) + expect(changeEvents).toHaveLength(0) + }) + + it(`truncate + optimistic update: server reinserted key -> optimistic value wins in single batch`, async () => { + const changeEvents: Array = [] + let f: any = null + + const collection = createCollection<{ id: number; value: string }>({ + id: `truncate-opt-update-exists-after`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, write, commit, truncate, markReady }) => { + // Initial state with key 1 + begin() + write({ type: `insert`, value: { id: 1, value: `server-initial` } }) + commit() + markReady() + f = { begin, write, commit, truncate } + }, + }, + }) + + collection.subscribeChanges((changes) => changeEvents.push(...changes)) + await collection.stateWhenReady() + + // Optimistic update on id 1 + const mutationFn = async () => {} + const tx = createTransaction({ mutationFn }) + tx.mutate(() => + collection.update(1, (draft) => { + draft.value = `client-update` + }) + ) + + changeEvents.length = 0 + + // Truncate, then server reinserts id 1 with different value + f.begin() + f.truncate() + f.write({ type: `insert`, value: { id: 1, value: `server-after` } }) + f.commit() + + // Expect delete then insert with optimistic value + expect(changeEvents.length).toBe(2) + expect(changeEvents[0]).toEqual({ + type: `delete`, + key: 1, + value: { id: 1, value: `client-update` }, + }) + expect(changeEvents[1]).toEqual({ + type: `insert`, + key: 1, + value: { id: 1, value: `client-update` }, + }) + + // Final state reflects optimistic value + expect(collection.state.get(1)).toEqual({ id: 1, value: `client-update` }) + }) + + it(`truncate + optimistic delete: server reinserted key -> remains deleted (no duplicate delete event)`, async () => { + const changeEvents: Array = [] + let f: any = null + + const collection = createCollection<{ id: number; value: string }>({ + id: `truncate-opt-delete-exists-after`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, write, commit, truncate, markReady }) => { + begin() + write({ type: `insert`, value: { id: 1, value: `server-initial` } }) + commit() + markReady() + f = { begin, write, commit, truncate } + }, + }, + }) + collection.subscribeChanges((c) => changeEvents.push(...c)) + await collection.stateWhenReady() + + // Optimistic delete on id 1 + const mutationFn = async () => {} + const tx = createTransaction({ mutationFn }) + tx.mutate(() => collection.delete(1)) + + changeEvents.length = 0 + + // Truncate, then server tries to reinsert id 1 + f.begin() + f.truncate() + f.write({ type: `insert`, value: { id: 1, value: `server-after` } }) + f.commit() + + // We already emitted the optimistic delete earlier; do not emit it again. + // Also, do not emit an insert for the re-introduced key. + expect(changeEvents.length).toBe(0) + expect(collection.state.has(1)).toBe(false) + }) + + it(`truncate + optimistic insert: server did NOT reinsert key -> inserted optimistically`, async () => { + const changeEvents: Array = [] + let f: any = null + + const collection = createCollection<{ id: number; value: string }>({ + id: `truncate-opt-insert-not-after`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, write, commit, truncate, markReady }) => { + begin() + write({ type: `insert`, value: { id: 1, value: `server-initial` } }) + commit() + markReady() + f = { begin, write, commit, truncate } + }, + }, + }) + collection.subscribeChanges((c) => changeEvents.push(...c)) + await collection.stateWhenReady() + + // Optimistic insert for id 2 (did not exist before) + const mutationFn = async () => {} + const tx = createTransaction({ mutationFn }) + tx.mutate(() => collection.insert({ id: 2, value: `client-insert` })) + + changeEvents.length = 0 + + // Truncate without server reinsert for id 2 + f.begin() + f.truncate() + // server does not write id 2 + f.commit() + + // Expect delete for id 1, and insert for id 2 + expect(changeEvents.some((e) => e.type === `delete` && e.key === 1)).toBe( + true + ) + expect( + changeEvents.some( + (e) => + e.type === `insert` && + e.key === 2 && + e.value.value === `client-insert` + ) + ).toBe(true) + expect(collection.state.get(2)).toEqual({ id: 2, value: `client-insert` }) + }) + + it(`truncate + optimistic update: server did NOT reinsert key -> optimistic insert then update minimal`, async () => { + const changeEvents: Array = [] + let f: any = null + + const collection = createCollection<{ id: number; value: string }>({ + id: `truncate-opt-update-not-after`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, write, commit, truncate, markReady }) => { + begin() + write({ type: `insert`, value: { id: 1, value: `server-initial` } }) + commit() + markReady() + f = { begin, write, commit, truncate } + }, + }, + }) + collection.subscribeChanges((c) => changeEvents.push(...c)) + await collection.stateWhenReady() + + // Optimistic update on id 1 + const mutationFn = async () => {} + const tx = createTransaction({ mutationFn }) + tx.mutate(() => + collection.update(1, (draft) => { + draft.value = `client-update` + }) + ) + + changeEvents.length = 0 + + // Truncate; server does not reinsert id 1 + f.begin() + f.truncate() + f.commit() + + // We expect delete for id 1 and then insert (minimal) with optimistic value + const deletes = changeEvents.filter((e) => e.type === `delete`) + const inserts = changeEvents.filter((e) => e.type === `insert`) + expect(deletes.some((e) => e.key === 1)).toBe(true) + expect( + inserts.some((e) => e.key === 1 && e.value.value === `client-update`) + ).toBe(true) + expect(collection.state.get(1)).toEqual({ id: 1, value: `client-update` }) + }) + + it(`truncate + optimistic delete: server did NOT reinsert key -> remains deleted (no extra event)`, async () => { + const changeEvents: Array = [] + let f: any = null + + const collection = createCollection<{ id: number; value: string }>({ + id: `truncate-opt-delete-not-after`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, write, commit, truncate, markReady }) => { + begin() + write({ type: `insert`, value: { id: 1, value: `server-initial` } }) + commit() + markReady() + f = { begin, write, commit, truncate } + }, + }, + }) + collection.subscribeChanges((c) => changeEvents.push(...c)) + await collection.stateWhenReady() + + // Optimistic delete on id 1 + const mutationFn = async () => {} + const tx = createTransaction({ mutationFn }) + tx.mutate(() => collection.delete(1)) + + changeEvents.length = 0 + + // Truncate with no server reinserts + f.begin() + f.truncate() + f.commit() + + // No new events since the optimistic delete event already fired earlier + expect(changeEvents.length).toBe(0) + expect(collection.state.has(1)).toBe(false) + }) }) diff --git a/packages/db/tests/collection.test.ts b/packages/db/tests/collection.test.ts index 2a15688a2..ee4712bb9 100644 --- a/packages/db/tests/collection.test.ts +++ b/packages/db/tests/collection.test.ts @@ -1358,4 +1358,154 @@ describe(`Collection with schema validation`, () => { tx5.isPersisted.promise, ]) }) + + it(`should handle basic truncate operations`, async () => { + let testSyncFunctions: any = null + + const collection = createCollection<{ id: number; value: string }>({ + id: `truncate-basic-test`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, write, commit, truncate, markReady }) => { + // Initialize with some data + begin() + write({ + type: `insert`, + value: { id: 1, value: `initial value 1` }, + }) + write({ + type: `insert`, + value: { id: 2, value: `initial value 2` }, + }) + commit() + markReady() + + // Store the sync functions for testing + testSyncFunctions = { begin, write, commit, truncate } + }, + }, + }) + + await collection.stateWhenReady() + + // Verify initial state + expect(collection.state.size).toBe(2) + expect(collection.state.get(1)).toEqual({ id: 1, value: `initial value 1` }) + expect(collection.state.get(2)).toEqual({ id: 2, value: `initial value 2` }) + + // Test truncate operation + const { begin, truncate, commit } = testSyncFunctions + begin() + truncate() + commit() + + // Verify collection is cleared + expect(collection.state.size).toBe(0) + expect(collection.syncedData.size).toBe(0) + expect(collection.syncedMetadata.size).toBe(0) + }) + + it(`should keep operations written after truncate in the same transaction`, async () => { + let testSyncFunctions: any = null + + const collection = createCollection<{ id: number; value: string }>({ + id: `truncate-operations-test`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, write, commit, truncate, markReady }) => { + // Initialize with some data + begin() + write({ + type: `insert`, + value: { id: 1, value: `initial value` }, + }) + commit() + markReady() + + // Store the sync functions for testing + testSyncFunctions = { begin, write, commit, truncate } + }, + }, + }) + + await collection.stateWhenReady() + + // Verify initial state + expect(collection.state.size).toBe(1) + expect(collection.state.get(1)).toEqual({ id: 1, value: `initial value` }) + + // Test truncate operation with additional operations in the same transaction + const { begin, write, truncate, commit } = testSyncFunctions + + begin() + + // Add some operations to the transaction + write({ + type: `insert`, + value: { id: 2, value: `should be cleared` }, + }) + write({ + type: `update`, + value: { id: 1, value: `should be cleared` }, + }) + + // Call truncate - this should clear the operations and mark as truncate + truncate() + + // Add more operations after truncate (these should not be cleared) + write({ + type: `insert`, + value: { id: 3, value: `should not be cleared` }, + }) + + commit() + + // Verify only post-truncate operations are kept + expect(collection.state.size).toBe(1) + expect(collection.state.get(3)).toEqual({ + id: 3, + value: `should not be cleared`, + }) + expect(collection.syncedData.size).toBe(1) + expect(collection.syncedMetadata.size).toBe(1) + }) + + it(`should handle truncate with empty collection`, async () => { + let testSyncFunctions: any = null + + const collection = createCollection<{ id: number; value: string }>({ + id: `truncate-empty-test`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, commit, truncate, markReady }) => { + // Initialize with empty collection + begin() + commit() + markReady() + + // Store the sync functions for testing + testSyncFunctions = { begin, commit, truncate } + }, + }, + }) + + await collection.stateWhenReady() + + // Verify initial state is empty + expect(collection.state.size).toBe(0) + + // Test truncate operation on empty collection + const { begin, truncate, commit } = testSyncFunctions + begin() + truncate() + commit() + + // Verify collection remains empty + expect(collection.state.size).toBe(0) + expect(collection.syncedData.size).toBe(0) + expect(collection.syncedMetadata.size).toBe(0) + }) }) diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index d80f46bcd..a89331700 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -254,6 +254,12 @@ function isUpToDateMessage>( return isControlMessage(message) && message.headers.control === `up-to-date` } +function isMustRefetchMessage>( + message: Message +): message is ControlMessage & { headers: { control: `must-refetch` } } { + return isControlMessage(message) && message.headers.control === `must-refetch` +} + // Check if a message contains txids in its headers function hasTxids>( message: Message @@ -470,7 +476,7 @@ function createElectricSync>( return { sync: (params: Parameters[`sync`]>[0]) => { - const { begin, write, commit, markReady } = params + const { begin, write, commit, markReady, truncate } = params const stream = new ShapeStream({ ...shapeOptions, signal: abortController.signal, @@ -521,6 +527,22 @@ function createElectricSync>( }) } else if (isUpToDateMessage(message)) { hasUpToDate = true + } else if (isMustRefetchMessage(message)) { + debug( + `Received must-refetch message, starting transaction with truncate` + ) + + // Start a transaction and truncate the collection + if (!transactionStarted) { + begin() + transactionStarted = true + } + + truncate() + + // Commit the truncate transaction immediately + commit() + transactionStarted = false } } diff --git a/packages/electric-db-collection/tests/electric.test.ts b/packages/electric-db-collection/tests/electric.test.ts index 9a4eaa7d7..e8167efbb 100644 --- a/packages/electric-db-collection/tests/electric.test.ts +++ b/packages/electric-db-collection/tests/electric.test.ts @@ -225,6 +225,57 @@ describe(`Electric Integration`, () => { expect(collection.state).toEqual(new Map()) }) + it(`should handle must-refetch by clearing synced data and re-syncing`, () => { + // First, populate the collection with some data + subscriber([ + { + key: `1`, + value: { id: 1, name: `Test User` }, + headers: { operation: `insert` }, + }, + { + key: `2`, + value: { id: 2, name: `Another User` }, + headers: { operation: `insert` }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // Verify the data is in the collection + expect(collection.state.size).toBe(2) + expect(collection.status).toBe(`ready`) + + // Send must-refetch control message + subscriber([ + { + headers: { control: `must-refetch` }, + }, + ]) + + // The collection should be cleared but remain in ready state + expect(collection.state.size).toBe(0) + expect(collection.status).toBe(`ready`) + + // Send new data after must-refetch + subscriber([ + { + key: `3`, + value: { id: 3, name: `New User` }, + headers: { operation: `insert` }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // The collection should now have the new data + expect(collection.state.size).toBe(1) + expect(collection.state.get(3)).toEqual({ id: 3, name: `New User` }) + expect(collection.status).toBe(`ready`) + }) + // Tests for txid tracking functionality describe(`txid tracking`, () => { it(`should track txids from incoming messages`, async () => {