Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/db-sync-methods.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@tanstack/db": patch
---

Add a new truncate method to the sync handler to enable a collections state to be reset from a sync transaction.
5 changes: 5 additions & 0 deletions .changeset/electric-must-refetch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@tanstack/electric-db-collection": patch
---

Add must-refetch message handling to clear synced data and re-sync collection data from server.
160 changes: 154 additions & 6 deletions packages/db/src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ import type { BaseIndex, IndexResolver } from "./indexes/base-index.js"
interface PendingSyncedTransaction<T extends object = Record<string, unknown>> {
committed: boolean
operations: Array<OptimisticChangeMessage<T>>
truncate?: boolean
}

/**
Expand Down Expand Up @@ -559,11 +560,16 @@ export class CollectionImpl<

// Check if an item with this key already exists when inserting
if (messageWithoutKey.type === `insert`) {
const insertingIntoExistingSynced = this.syncedData.has(key)
const hasPendingDeleteForKey = pendingTransaction.operations.some(
(op) => op.key === key && op.type === `delete`
)
const isTruncateTransaction = pendingTransaction.truncate === true
// Allow insert after truncate in the same transaction even if it existed in syncedData
if (
this.syncedData.has(key) &&
!pendingTransaction.operations.some(
(op) => op.key === key && op.type === `delete`
)
insertingIntoExistingSynced &&
!hasPendingDeleteForKey &&
!isTruncateTransaction
) {
throw new DuplicateKeySyncError(key, this.id)
}
Expand Down Expand Up @@ -600,6 +606,28 @@ export class CollectionImpl<
markReady: () => {
this.markReady()
},
truncate: () => {
const pendingTransaction =
this.pendingSyncedTransactions[
this.pendingSyncedTransactions.length - 1
]
if (!pendingTransaction) {
throw new NoPendingSyncTransactionWriteError()
}
if (pendingTransaction.committed) {
throw new SyncTransactionAlreadyCommittedWriteError()
}

// Clear all operations from the current transaction
pendingTransaction.operations = []

// Mark the transaction as a truncate operation. During commit, this triggers:
// - Delete events for all previously synced keys (excluding optimistic-deleted keys)
// - Clearing of syncedData/syncedMetadata
// - Subsequent synced ops applied on the fresh base
// - Finally, optimistic mutations re-applied on top (single batch)
pendingTransaction.truncate = true
},
})

// Store cleanup function if provided
Expand Down Expand Up @@ -1149,7 +1177,11 @@ export class CollectionImpl<
}
}

if (!hasPersistingTransaction) {
const hasTruncateSync = this.pendingSyncedTransactions.some(
(t) => t.truncate === true
)

if (!hasPersistingTransaction || hasTruncateSync) {
// Set flag to prevent redundant optimistic state recalculations
this.isCommittingSyncTransactions = true

Expand Down Expand Up @@ -1179,6 +1211,28 @@ export class CollectionImpl<
const rowUpdateMode = this.config.sync.rowUpdateMode || `partial`

for (const transaction of this.pendingSyncedTransactions) {
// Handle truncate operations first
if (transaction.truncate) {
// TRUNCATE PHASE
// 1) Emit a delete for every currently-synced key so downstream listeners/indexes
// observe a clear-before-rebuild. We intentionally skip keys already in
// optimisticDeletes because their delete was previously emitted by the user.
for (const key of this.syncedData.keys()) {
if (this.optimisticDeletes.has(key)) continue
const previousValue =
this.optimisticUpserts.get(key) || this.syncedData.get(key)
if (previousValue !== undefined) {
events.push({ type: `delete`, key, value: previousValue })
}
}

// 2) Clear the authoritative synced base. Subsequent server ops in this
// same commit will rebuild the base atomically.
this.syncedData.clear()
this.syncedMetadata.clear()
this.syncedKeys.clear()
}

for (const operation of transaction.operations) {
const key = operation.key as TKey
this.syncedKeys.add(key)
Expand Down Expand Up @@ -1228,7 +1282,101 @@ export class CollectionImpl<
}
}

// Clear optimistic state since sync operations will now provide the authoritative data
// After applying synced operations, if this commit included a truncate,
// re-apply optimistic mutations on top of the fresh synced base. This ensures
// the UI preserves local intent while respecting server rebuild semantics.
// Ordering: deletes (above) -> server ops (just applied) -> optimistic upserts.
const hadTruncate = this.pendingSyncedTransactions.some(
(t) => t.truncate === true
)
if (hadTruncate) {
// Avoid duplicating keys that were inserted/updated by synced operations in this commit
const syncedInsertedOrUpdatedKeys = new Set<TKey>()
for (const t of this.pendingSyncedTransactions) {
for (const op of t.operations) {
if (op.type === `insert` || op.type === `update`) {
syncedInsertedOrUpdatedKeys.add(op.key as TKey)
}
}
}

// Build re-apply sets from ACTIVE optimistic transactions against the new synced base
// We do not copy maps; we compute intent directly from transactions to avoid drift.
const reapplyUpserts = new Map<TKey, T>()
const reapplyDeletes = new Set<TKey>()

for (const tx of this.transactions.values()) {
if ([`completed`, `failed`].includes(tx.state)) continue
for (const mutation of tx.mutations) {
if (mutation.collection !== this || !mutation.optimistic) continue
const key = mutation.key as TKey
switch (mutation.type) {
case `insert`:
reapplyUpserts.set(key, mutation.modified as T)
reapplyDeletes.delete(key)
break
case `update`: {
const base = this.syncedData.get(key)
const next = base
? (Object.assign({}, base, mutation.changes) as T)
: (mutation.modified as T)
reapplyUpserts.set(key, next)
reapplyDeletes.delete(key)
break
}
case `delete`:
reapplyUpserts.delete(key)
reapplyDeletes.add(key)
break
}
}
}

// Emit inserts for re-applied upserts, skipping any keys that have an optimistic delete.
// If the server also inserted/updated the same key in this batch, override that value
// with the optimistic value to preserve local intent.
for (const [key, value] of reapplyUpserts) {
if (reapplyDeletes.has(key)) continue
if (syncedInsertedOrUpdatedKeys.has(key)) {
let foundInsert = false
for (let i = events.length - 1; i >= 0; i--) {
const evt = events[i]!
if (evt.key === key && evt.type === `insert`) {
evt.value = value
foundInsert = true
break
}
}
if (!foundInsert) {
events.push({ type: `insert`, key, value })
}
} else {
events.push({ type: `insert`, key, value })
}
}

// Finally, ensure we do NOT insert keys that have an outstanding optimistic delete.
if (events.length > 0 && reapplyDeletes.size > 0) {
const filtered: Array<ChangeMessage<T, TKey>> = []
for (const evt of events) {
if (evt.type === `insert` && reapplyDeletes.has(evt.key)) {
continue
}
filtered.push(evt)
}
events.length = 0
events.push(...filtered)
}

// Ensure listeners are active before emitting this critical batch
if (!this.isReady()) {
this.setStatus(`ready`)
}
}

// Maintain optimistic state appropriately
// Clear optimistic state since sync operations will now provide the authoritative data.
// Any still-active user transactions will be re-applied below in recompute.
this.optimisticUpserts.clear()
this.optimisticDeletes.clear()

Expand Down
1 change: 1 addition & 0 deletions packages/db/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ export interface SyncConfig<
write: (message: Omit<ChangeMessage<T>, `key`>) => void
commit: () => void
markReady: () => void
truncate: () => void
}) => void

/**
Expand Down
Loading