From ba076b77b4602559f33b1d7fb5f00f84959c970f Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 22 Jan 2026 22:56:53 +0000 Subject: [PATCH 1/4] Fix unhandled 409 errors during collection cleanup When a collection is cleaned up while requestSnapshot or fetchSnapshot calls are still in-flight, the abort signal causes these calls to fail with HTTP 409 "must-refetch" errors. These errors were propagating as unhandled promise rejections because they bypassed the onError handler (which only catches errors from the ShapeStream subscription). This fix: - Passes the abort signal to createLoadSubsetDedupe - Wraps requestSnapshot and fetchSnapshot calls in try-catch blocks - Silently ignores errors when the signal is aborted (during cleanup) - Re-throws errors if the signal is not aborted (real errors) --- .../electric-db-collection/src/electric.ts | 109 ++++++++++++------ .../tests/electric.test.ts | 2 + 2 files changed, 74 insertions(+), 37 deletions(-) diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index 994889ded..110a71929 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -349,6 +349,7 @@ function createLoadSubsetDedupe>({ commit, collectionId, encodeColumnName, + signal, }: { stream: ShapeStream syncMode: ElectricSyncMode @@ -366,6 +367,11 @@ function createLoadSubsetDedupe>({ * This is typically the `encode` function from shapeOptions.columnMapper. */ encodeColumnName?: ColumnEncoder + /** + * Abort signal to check if the stream has been aborted during cleanup. + * When aborted, errors from requestSnapshot are silently ignored. + */ + signal: AbortSignal }): DeduplicatedLoadSubset | null { // Eager mode doesn't need subset loading if (syncMode === `eager`) { @@ -410,6 +416,16 @@ function createLoadSubsetDedupe>({ ) } } catch (error) { + // If the stream has been aborted (during cleanup), ignore the error. + // This prevents unhandled promise rejections when the collection is + // cleaned up while fetchSnapshot calls are still in-flight. + if (signal.aborted) { + debug( + `${collectionId ? `[${collectionId}] ` : ``}Ignoring fetchSnapshot error during cleanup: %o`, + error, + ) + return + } debug( `${collectionId ? `[${collectionId}] ` : ``}Error fetching snapshot: %o`, error, @@ -426,47 +442,64 @@ function createLoadSubsetDedupe>({ // 2. whereFrom (rows > cursor, with limit) const { cursor, where, orderBy, limit } = opts - if (cursor) { - // Make parallel requests for cursor-based pagination - const promises: Array> = [] - - // Request 1: All rows matching whereCurrent (ties at boundary, no limit) - // Combine main where with cursor.whereCurrent - const whereCurrentOpts: LoadSubsetOptions = { - where: where ? and(where, cursor.whereCurrent) : cursor.whereCurrent, - orderBy, - // No limit - get all ties - } - const whereCurrentParams = compileSQL( - whereCurrentOpts, - compileOptions, - ) - promises.push(stream.requestSnapshot(whereCurrentParams)) + try { + if (cursor) { + // Make parallel requests for cursor-based pagination + const promises: Array> = [] + + // Request 1: All rows matching whereCurrent (ties at boundary, no limit) + // Combine main where with cursor.whereCurrent + const whereCurrentOpts: LoadSubsetOptions = { + where: where ? and(where, cursor.whereCurrent) : cursor.whereCurrent, + orderBy, + // No limit - get all ties + } + const whereCurrentParams = compileSQL( + whereCurrentOpts, + compileOptions, + ) + promises.push(stream.requestSnapshot(whereCurrentParams)) - debug( - `${collectionId ? `[${collectionId}] ` : ``}Requesting cursor.whereCurrent snapshot (all ties)`, - ) + debug( + `${collectionId ? `[${collectionId}] ` : ``}Requesting cursor.whereCurrent snapshot (all ties)`, + ) - // Request 2: Rows matching whereFrom (rows > cursor, with limit) - // Combine main where with cursor.whereFrom - const whereFromOpts: LoadSubsetOptions = { - where: where ? and(where, cursor.whereFrom) : cursor.whereFrom, - orderBy, - limit, - } - const whereFromParams = compileSQL(whereFromOpts, compileOptions) - promises.push(stream.requestSnapshot(whereFromParams)) + // Request 2: Rows matching whereFrom (rows > cursor, with limit) + // Combine main where with cursor.whereFrom + const whereFromOpts: LoadSubsetOptions = { + where: where ? and(where, cursor.whereFrom) : cursor.whereFrom, + orderBy, + limit, + } + const whereFromParams = compileSQL(whereFromOpts, compileOptions) + promises.push(stream.requestSnapshot(whereFromParams)) - debug( - `${collectionId ? `[${collectionId}] ` : ``}Requesting cursor.whereFrom snapshot (with limit ${limit})`, - ) + debug( + `${collectionId ? `[${collectionId}] ` : ``}Requesting cursor.whereFrom snapshot (with limit ${limit})`, + ) - // Wait for both requests to complete - await Promise.all(promises) - } else { - // No cursor - standard single request - const snapshotParams = compileSQL(opts, compileOptions) - await stream.requestSnapshot(snapshotParams) + // Wait for both requests to complete + await Promise.all(promises) + } else { + // No cursor - standard single request + const snapshotParams = compileSQL(opts, compileOptions) + await stream.requestSnapshot(snapshotParams) + } + } catch (error) { + // If the stream has been aborted (during cleanup), ignore the error. + // This prevents unhandled promise rejections when the collection is + // cleaned up while requestSnapshot calls are still in-flight. + // The 409 "must-refetch" errors are expected during cleanup and + // don't indicate a real problem. + if (signal.aborted) { + debug( + `${collectionId ? `[${collectionId}] ` : ``}Ignoring requestSnapshot error during cleanup: %o`, + error, + ) + return + } + // Re-throw non-abort errors + throw error } } } @@ -1311,6 +1344,8 @@ function createElectricSync>( // Pass the columnMapper's encode function to transform column names // (e.g., camelCase to snake_case) when compiling SQL for subset queries encodeColumnName: shapeOptions.columnMapper?.encode, + // Pass abort signal so requestSnapshot errors can be ignored during cleanup + signal: abortController.signal, }) unsubscribeStream = stream.subscribe((messages: Array>) => { diff --git a/packages/electric-db-collection/tests/electric.test.ts b/packages/electric-db-collection/tests/electric.test.ts index 7a8bc7384..8f363db6e 100644 --- a/packages/electric-db-collection/tests/electric.test.ts +++ b/packages/electric-db-collection/tests/electric.test.ts @@ -2842,6 +2842,7 @@ describe(`Electric Integration`, () => { }), ) }) + }) // Tests for overlapping subset queries with duplicate keys @@ -3961,4 +3962,5 @@ describe(`Electric Integration`, () => { vi.useRealTimers() }) }) + }) From 8e2a8772112c7167e5e4384d8c0495a7ae341540 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Fri, 23 Jan 2026 21:05:29 +0000 Subject: [PATCH 2/4] ci: apply automated fixes --- packages/electric-db-collection/src/electric.ts | 4 +++- packages/electric-db-collection/tests/electric.test.ts | 2 -- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index 110a71929..8a1fbd4fc 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -450,7 +450,9 @@ function createLoadSubsetDedupe>({ // Request 1: All rows matching whereCurrent (ties at boundary, no limit) // Combine main where with cursor.whereCurrent const whereCurrentOpts: LoadSubsetOptions = { - where: where ? and(where, cursor.whereCurrent) : cursor.whereCurrent, + where: where + ? and(where, cursor.whereCurrent) + : cursor.whereCurrent, orderBy, // No limit - get all ties } diff --git a/packages/electric-db-collection/tests/electric.test.ts b/packages/electric-db-collection/tests/electric.test.ts index 8f363db6e..7a8bc7384 100644 --- a/packages/electric-db-collection/tests/electric.test.ts +++ b/packages/electric-db-collection/tests/electric.test.ts @@ -2842,7 +2842,6 @@ describe(`Electric Integration`, () => { }), ) }) - }) // Tests for overlapping subset queries with duplicate keys @@ -3962,5 +3961,4 @@ describe(`Electric Integration`, () => { vi.useRealTimers() }) }) - }) From 846782cbced0a8595f64ff73b3ad36e66431b940 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Mon, 26 Jan 2026 08:54:03 -0700 Subject: [PATCH 3/4] Add changeset for 409 error fix Co-Authored-By: Claude Opus 4.5 --- .changeset/fix-409-errors-collection-cleanup.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/fix-409-errors-collection-cleanup.md diff --git a/.changeset/fix-409-errors-collection-cleanup.md b/.changeset/fix-409-errors-collection-cleanup.md new file mode 100644 index 000000000..5117efe5a --- /dev/null +++ b/.changeset/fix-409-errors-collection-cleanup.md @@ -0,0 +1,5 @@ +--- +'@tanstack/electric-db-collection': patch +--- + +Fix unhandled 409 errors during collection cleanup. When a collection is cleaned up while snapshot requests are in-flight, errors are now properly caught and ignored rather than propagating as unhandled promise rejections. From d5a0765da1869b4869f96ede45274c9e60d1bb38 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Mon, 26 Jan 2026 08:57:08 -0700 Subject: [PATCH 4/4] Simplify 409 error handling code - Extract handleSnapshotError helper to deduplicate error handling logic - Consolidate repeated logPrefix pattern into single variable - Flatten nested else-if chains using early returns - Simplify Promise.all with inline array literal - Remove redundant comments Co-Authored-By: Claude Opus 4.5 --- .../electric-db-collection/src/electric.ts | 148 +++++++----------- 1 file changed, 56 insertions(+), 92 deletions(-) diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index 8a1fbd4fc..25bf29e53 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -373,136 +373,100 @@ function createLoadSubsetDedupe>({ */ signal: AbortSignal }): DeduplicatedLoadSubset | null { - // Eager mode doesn't need subset loading if (syncMode === `eager`) { return null } const compileOptions = encodeColumnName ? { encodeColumnName } : undefined + const logPrefix = collectionId ? `[${collectionId}] ` : `` + + /** + * Handles errors from snapshot operations. Returns true if the error was + * handled (signal aborted during cleanup), false if it should be re-thrown. + */ + function handleSnapshotError(error: unknown, operation: string): boolean { + if (signal.aborted) { + debug(`${logPrefix}Ignoring ${operation} error during cleanup: %o`, error) + return true + } + debug(`${logPrefix}Error in ${operation}: %o`, error) + return false + } const loadSubset = async (opts: LoadSubsetOptions) => { - // In progressive mode, use fetchSnapshot during snapshot phase if (isBufferingInitialSync()) { - // Progressive mode snapshot phase: fetch and apply immediately const snapshotParams = compileSQL(opts, compileOptions) try { const { data: rows } = await stream.fetchSnapshot(snapshotParams) - // Check again if we're still buffering - we might have received up-to-date - // and completed the atomic swap while waiting for the snapshot if (!isBufferingInitialSync()) { - debug( - `${collectionId ? `[${collectionId}] ` : ``}Ignoring snapshot - sync completed while fetching`, - ) + debug(`${logPrefix}Ignoring snapshot - sync completed while fetching`) return } - // Apply snapshot data in a sync transaction (only if we have data) if (rows.length > 0) { begin() for (const row of rows) { write({ type: `insert`, value: row.value, - metadata: { - ...row.headers, - }, + metadata: { ...row.headers }, }) } commit() - - debug( - `${collectionId ? `[${collectionId}] ` : ``}Applied snapshot with ${rows.length} rows`, - ) + debug(`${logPrefix}Applied snapshot with ${rows.length} rows`) } } catch (error) { - // If the stream has been aborted (during cleanup), ignore the error. - // This prevents unhandled promise rejections when the collection is - // cleaned up while fetchSnapshot calls are still in-flight. - if (signal.aborted) { - debug( - `${collectionId ? `[${collectionId}] ` : ``}Ignoring fetchSnapshot error during cleanup: %o`, - error, - ) + if (handleSnapshotError(error, `fetchSnapshot`)) { return } - debug( - `${collectionId ? `[${collectionId}] ` : ``}Error fetching snapshot: %o`, - error, - ) throw error } - } else if (syncMode === `progressive`) { - // Progressive mode after full sync complete: no need to load more return - } else { - // On-demand mode: use requestSnapshot - // When cursor is provided, make two calls: - // 1. whereCurrent (all ties, no limit) - // 2. whereFrom (rows > cursor, with limit) - const { cursor, where, orderBy, limit } = opts - - try { - if (cursor) { - // Make parallel requests for cursor-based pagination - const promises: Array> = [] - - // Request 1: All rows matching whereCurrent (ties at boundary, no limit) - // Combine main where with cursor.whereCurrent - const whereCurrentOpts: LoadSubsetOptions = { - where: where - ? and(where, cursor.whereCurrent) - : cursor.whereCurrent, - orderBy, - // No limit - get all ties - } - const whereCurrentParams = compileSQL( - whereCurrentOpts, - compileOptions, - ) - promises.push(stream.requestSnapshot(whereCurrentParams)) - - debug( - `${collectionId ? `[${collectionId}] ` : ``}Requesting cursor.whereCurrent snapshot (all ties)`, - ) + } - // Request 2: Rows matching whereFrom (rows > cursor, with limit) - // Combine main where with cursor.whereFrom - const whereFromOpts: LoadSubsetOptions = { - where: where ? and(where, cursor.whereFrom) : cursor.whereFrom, - orderBy, - limit, - } - const whereFromParams = compileSQL(whereFromOpts, compileOptions) - promises.push(stream.requestSnapshot(whereFromParams)) + if (syncMode === `progressive`) { + return + } - debug( - `${collectionId ? `[${collectionId}] ` : ``}Requesting cursor.whereFrom snapshot (with limit ${limit})`, - ) + const { cursor, where, orderBy, limit } = opts - // Wait for both requests to complete - await Promise.all(promises) - } else { - // No cursor - standard single request - const snapshotParams = compileSQL(opts, compileOptions) - await stream.requestSnapshot(snapshotParams) + try { + if (cursor) { + const whereCurrentOpts: LoadSubsetOptions = { + where: where ? and(where, cursor.whereCurrent) : cursor.whereCurrent, + orderBy, } - } catch (error) { - // If the stream has been aborted (during cleanup), ignore the error. - // This prevents unhandled promise rejections when the collection is - // cleaned up while requestSnapshot calls are still in-flight. - // The 409 "must-refetch" errors are expected during cleanup and - // don't indicate a real problem. - if (signal.aborted) { - debug( - `${collectionId ? `[${collectionId}] ` : ``}Ignoring requestSnapshot error during cleanup: %o`, - error, - ) - return + const whereCurrentParams = compileSQL( + whereCurrentOpts, + compileOptions, + ) + + const whereFromOpts: LoadSubsetOptions = { + where: where ? and(where, cursor.whereFrom) : cursor.whereFrom, + orderBy, + limit, } - // Re-throw non-abort errors - throw error + const whereFromParams = compileSQL(whereFromOpts, compileOptions) + + debug(`${logPrefix}Requesting cursor.whereCurrent snapshot (all ties)`) + debug( + `${logPrefix}Requesting cursor.whereFrom snapshot (with limit ${limit})`, + ) + + await Promise.all([ + stream.requestSnapshot(whereCurrentParams), + stream.requestSnapshot(whereFromParams), + ]) + } else { + const snapshotParams = compileSQL(opts, compileOptions) + await stream.requestSnapshot(snapshotParams) + } + } catch (error) { + if (handleSnapshotError(error, `requestSnapshot`)) { + return } + throw error } }