diff --git a/.changeset/fix-409-errors-collection-cleanup.md b/.changeset/fix-409-errors-collection-cleanup.md new file mode 100644 index 000000000..5117efe5a --- /dev/null +++ b/.changeset/fix-409-errors-collection-cleanup.md @@ -0,0 +1,5 @@ +--- +'@tanstack/electric-db-collection': patch +--- + +Fix unhandled 409 errors during collection cleanup. When a collection is cleaned up while snapshot requests are in-flight, errors are now properly caught and ignored rather than propagating as unhandled promise rejections. diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index 994889ded..25bf29e53 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -349,6 +349,7 @@ function createLoadSubsetDedupe>({ commit, collectionId, encodeColumnName, + signal, }: { stream: ShapeStream syncMode: ElectricSyncMode @@ -366,108 +367,106 @@ function createLoadSubsetDedupe>({ * This is typically the `encode` function from shapeOptions.columnMapper. */ encodeColumnName?: ColumnEncoder + /** + * Abort signal to check if the stream has been aborted during cleanup. + * When aborted, errors from requestSnapshot are silently ignored. + */ + signal: AbortSignal }): DeduplicatedLoadSubset | null { - // Eager mode doesn't need subset loading if (syncMode === `eager`) { return null } const compileOptions = encodeColumnName ? { encodeColumnName } : undefined + const logPrefix = collectionId ? `[${collectionId}] ` : `` + + /** + * Handles errors from snapshot operations. Returns true if the error was + * handled (signal aborted during cleanup), false if it should be re-thrown. + */ + function handleSnapshotError(error: unknown, operation: string): boolean { + if (signal.aborted) { + debug(`${logPrefix}Ignoring ${operation} error during cleanup: %o`, error) + return true + } + debug(`${logPrefix}Error in ${operation}: %o`, error) + return false + } const loadSubset = async (opts: LoadSubsetOptions) => { - // In progressive mode, use fetchSnapshot during snapshot phase if (isBufferingInitialSync()) { - // Progressive mode snapshot phase: fetch and apply immediately const snapshotParams = compileSQL(opts, compileOptions) try { const { data: rows } = await stream.fetchSnapshot(snapshotParams) - // Check again if we're still buffering - we might have received up-to-date - // and completed the atomic swap while waiting for the snapshot if (!isBufferingInitialSync()) { - debug( - `${collectionId ? `[${collectionId}] ` : ``}Ignoring snapshot - sync completed while fetching`, - ) + debug(`${logPrefix}Ignoring snapshot - sync completed while fetching`) return } - // Apply snapshot data in a sync transaction (only if we have data) if (rows.length > 0) { begin() for (const row of rows) { write({ type: `insert`, value: row.value, - metadata: { - ...row.headers, - }, + metadata: { ...row.headers }, }) } commit() - - debug( - `${collectionId ? `[${collectionId}] ` : ``}Applied snapshot with ${rows.length} rows`, - ) + debug(`${logPrefix}Applied snapshot with ${rows.length} rows`) } } catch (error) { - debug( - `${collectionId ? `[${collectionId}] ` : ``}Error fetching snapshot: %o`, - error, - ) + if (handleSnapshotError(error, `fetchSnapshot`)) { + return + } throw error } - } else if (syncMode === `progressive`) { - // Progressive mode after full sync complete: no need to load more return - } else { - // On-demand mode: use requestSnapshot - // When cursor is provided, make two calls: - // 1. whereCurrent (all ties, no limit) - // 2. whereFrom (rows > cursor, with limit) - const { cursor, where, orderBy, limit } = opts + } - if (cursor) { - // Make parallel requests for cursor-based pagination - const promises: Array> = [] + if (syncMode === `progressive`) { + return + } - // Request 1: All rows matching whereCurrent (ties at boundary, no limit) - // Combine main where with cursor.whereCurrent + const { cursor, where, orderBy, limit } = opts + + try { + if (cursor) { const whereCurrentOpts: LoadSubsetOptions = { where: where ? and(where, cursor.whereCurrent) : cursor.whereCurrent, orderBy, - // No limit - get all ties } const whereCurrentParams = compileSQL( whereCurrentOpts, compileOptions, ) - promises.push(stream.requestSnapshot(whereCurrentParams)) - debug( - `${collectionId ? `[${collectionId}] ` : ``}Requesting cursor.whereCurrent snapshot (all ties)`, - ) - - // Request 2: Rows matching whereFrom (rows > cursor, with limit) - // Combine main where with cursor.whereFrom const whereFromOpts: LoadSubsetOptions = { where: where ? and(where, cursor.whereFrom) : cursor.whereFrom, orderBy, limit, } const whereFromParams = compileSQL(whereFromOpts, compileOptions) - promises.push(stream.requestSnapshot(whereFromParams)) + debug(`${logPrefix}Requesting cursor.whereCurrent snapshot (all ties)`) debug( - `${collectionId ? `[${collectionId}] ` : ``}Requesting cursor.whereFrom snapshot (with limit ${limit})`, + `${logPrefix}Requesting cursor.whereFrom snapshot (with limit ${limit})`, ) - // Wait for both requests to complete - await Promise.all(promises) + await Promise.all([ + stream.requestSnapshot(whereCurrentParams), + stream.requestSnapshot(whereFromParams), + ]) } else { - // No cursor - standard single request const snapshotParams = compileSQL(opts, compileOptions) await stream.requestSnapshot(snapshotParams) } + } catch (error) { + if (handleSnapshotError(error, `requestSnapshot`)) { + return + } + throw error } } @@ -1311,6 +1310,8 @@ function createElectricSync>( // Pass the columnMapper's encode function to transform column names // (e.g., camelCase to snake_case) when compiling SQL for subset queries encodeColumnName: shapeOptions.columnMapper?.encode, + // Pass abort signal so requestSnapshot errors can be ignored during cleanup + signal: abortController.signal, }) unsubscribeStream = stream.subscribe((messages: Array>) => {