Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fix-409-errors-collection-cleanup.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@tanstack/electric-db-collection': patch
---

Fix unhandled 409 errors during collection cleanup. When a collection is cleaned up while snapshot requests are in-flight, errors are now properly caught and ignored rather than propagating as unhandled promise rejections.
93 changes: 47 additions & 46 deletions packages/electric-db-collection/src/electric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ function createLoadSubsetDedupe<T extends Row<unknown>>({
commit,
collectionId,
encodeColumnName,
signal,
}: {
stream: ShapeStream<T>
syncMode: ElectricSyncMode
Expand All @@ -366,108 +367,106 @@ function createLoadSubsetDedupe<T extends Row<unknown>>({
* This is typically the `encode` function from shapeOptions.columnMapper.
*/
encodeColumnName?: ColumnEncoder
/**
* Abort signal to check if the stream has been aborted during cleanup.
* When aborted, errors from requestSnapshot are silently ignored.
*/
signal: AbortSignal
}): DeduplicatedLoadSubset | null {
// Eager mode doesn't need subset loading
if (syncMode === `eager`) {
return null
}

const compileOptions = encodeColumnName ? { encodeColumnName } : undefined
const logPrefix = collectionId ? `[${collectionId}] ` : ``

/**
* Handles errors from snapshot operations. Returns true if the error was
* handled (signal aborted during cleanup), false if it should be re-thrown.
*/
function handleSnapshotError(error: unknown, operation: string): boolean {
if (signal.aborted) {
debug(`${logPrefix}Ignoring ${operation} error during cleanup: %o`, error)
return true
}
debug(`${logPrefix}Error in ${operation}: %o`, error)
return false
}

const loadSubset = async (opts: LoadSubsetOptions) => {
// In progressive mode, use fetchSnapshot during snapshot phase
if (isBufferingInitialSync()) {
// Progressive mode snapshot phase: fetch and apply immediately
const snapshotParams = compileSQL<T>(opts, compileOptions)
try {
const { data: rows } = await stream.fetchSnapshot(snapshotParams)

// Check again if we're still buffering - we might have received up-to-date
// and completed the atomic swap while waiting for the snapshot
if (!isBufferingInitialSync()) {
debug(
`${collectionId ? `[${collectionId}] ` : ``}Ignoring snapshot - sync completed while fetching`,
)
debug(`${logPrefix}Ignoring snapshot - sync completed while fetching`)
return
}

// Apply snapshot data in a sync transaction (only if we have data)
if (rows.length > 0) {
begin()
for (const row of rows) {
write({
type: `insert`,
value: row.value,
metadata: {
...row.headers,
},
metadata: { ...row.headers },
})
}
commit()

debug(
`${collectionId ? `[${collectionId}] ` : ``}Applied snapshot with ${rows.length} rows`,
)
debug(`${logPrefix}Applied snapshot with ${rows.length} rows`)
}
} catch (error) {
debug(
`${collectionId ? `[${collectionId}] ` : ``}Error fetching snapshot: %o`,
error,
)
if (handleSnapshotError(error, `fetchSnapshot`)) {
return
}
throw error
}
} else if (syncMode === `progressive`) {
// Progressive mode after full sync complete: no need to load more
return
} else {
// On-demand mode: use requestSnapshot
// When cursor is provided, make two calls:
// 1. whereCurrent (all ties, no limit)
// 2. whereFrom (rows > cursor, with limit)
const { cursor, where, orderBy, limit } = opts
}

if (cursor) {
// Make parallel requests for cursor-based pagination
const promises: Array<Promise<unknown>> = []
if (syncMode === `progressive`) {
return
}

// Request 1: All rows matching whereCurrent (ties at boundary, no limit)
// Combine main where with cursor.whereCurrent
const { cursor, where, orderBy, limit } = opts

try {
if (cursor) {
const whereCurrentOpts: LoadSubsetOptions = {
where: where ? and(where, cursor.whereCurrent) : cursor.whereCurrent,
orderBy,
// No limit - get all ties
}
const whereCurrentParams = compileSQL<T>(
whereCurrentOpts,
compileOptions,
)
promises.push(stream.requestSnapshot(whereCurrentParams))

debug(
`${collectionId ? `[${collectionId}] ` : ``}Requesting cursor.whereCurrent snapshot (all ties)`,
)

// Request 2: Rows matching whereFrom (rows > cursor, with limit)
// Combine main where with cursor.whereFrom
const whereFromOpts: LoadSubsetOptions = {
where: where ? and(where, cursor.whereFrom) : cursor.whereFrom,
orderBy,
limit,
}
const whereFromParams = compileSQL<T>(whereFromOpts, compileOptions)
promises.push(stream.requestSnapshot(whereFromParams))

debug(`${logPrefix}Requesting cursor.whereCurrent snapshot (all ties)`)
debug(
`${collectionId ? `[${collectionId}] ` : ``}Requesting cursor.whereFrom snapshot (with limit ${limit})`,
`${logPrefix}Requesting cursor.whereFrom snapshot (with limit ${limit})`,
)

// Wait for both requests to complete
await Promise.all(promises)
await Promise.all([
stream.requestSnapshot(whereCurrentParams),
stream.requestSnapshot(whereFromParams),
])
} else {
// No cursor - standard single request
const snapshotParams = compileSQL<T>(opts, compileOptions)
await stream.requestSnapshot(snapshotParams)
}
} catch (error) {
if (handleSnapshotError(error, `requestSnapshot`)) {
return
}
throw error
}
}

Expand Down Expand Up @@ -1311,6 +1310,8 @@ function createElectricSync<T extends Row<unknown>>(
// Pass the columnMapper's encode function to transform column names
// (e.g., camelCase to snake_case) when compiling SQL for subset queries
encodeColumnName: shapeOptions.columnMapper?.encode,
// Pass abort signal so requestSnapshot errors can be ignored during cleanup
signal: abortController.signal,
})

unsubscribeStream = stream.subscribe((messages: Array<Message<T>>) => {
Expand Down
Loading