From 0d9667635314d9b43e1ac24ebbd3dfcd70a17a7f Mon Sep 17 00:00:00 2001 From: Igor Barakaiev Date: Mon, 15 Dec 2025 23:56:23 -0800 Subject: [PATCH 1/8] fix(db): re-request subsets after truncate for on-demand sync mode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a must-refetch (409) occurs in on-demand sync mode, the collection receives a truncate which clears all data and resets the loadSubset deduplication state. However, subscriptions were not re-requesting their previously loaded subsets, leaving the collection empty. This fix adds a truncate event listener to CollectionSubscription that: 1. Resets pagination/snapshot tracking state (but NOT sentKeys) 2. Re-requests all previously loaded subsets We intentionally keep sentKeys intact because the truncate event is emitted BEFORE delete events are sent to subscribers. If we cleared sentKeys, delete events would be filtered by filterAndFlipChanges. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- packages/db/src/collection/subscription.ts | 48 ++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/packages/db/src/collection/subscription.ts b/packages/db/src/collection/subscription.ts index 44981d460..a7acced3f 100644 --- a/packages/db/src/collection/subscription.ts +++ b/packages/db/src/collection/subscription.ts @@ -79,6 +79,9 @@ export class CollectionSubscription private _status: SubscriptionStatus = `ready` private pendingLoadSubsetPromises: Set> = new Set() + // Cleanup function for truncate event listener + private truncateCleanup: (() => void) | undefined + public get status(): SubscriptionStatus { return this._status } @@ -111,6 +114,47 @@ export class CollectionSubscription this.filteredCallback = options.whereExpression ? createFilteredCallback(this.callback, options) : this.callback + + // Listen for truncate events to re-request data after must-refetch + // When a truncate happens (e.g., from a 409 must-refetch), all collection data is cleared. + // We need to re-request all previously loaded subsets to repopulate the data. + this.truncateCleanup = this.collection.on(`truncate`, () => { + this.handleTruncate() + }) + } + + /** + * Handle collection truncate event by resetting state and re-requesting subsets. + * This is called when the sync layer receives a must-refetch and clears all data. + * + * IMPORTANT: We intentionally do NOT clear sentKeys here. The truncate event is emitted + * BEFORE delete events are sent to subscribers. If we cleared sentKeys, the delete events + * would be filtered out by filterAndFlipChanges (which skips deletes for keys not in sentKeys). + * By keeping sentKeys intact, delete events pass through, and when new data arrives, + * inserts will still be emitted correctly (the type is already 'insert' so no conversion needed). + */ + private handleTruncate() { + // Reset snapshot/pagination tracking state but NOT sentKeys + // sentKeys must remain so delete events can pass through filterAndFlipChanges + this.snapshotSent = false + this.loadedInitialState = false + this.limitedSnapshotRowCount = 0 + this.lastSentKey = undefined + + // Copy the loaded subsets before clearing (we'll re-request them) + const subsetsToReload = [...this.loadedSubsets] + + // Clear the loadedSubsets array since we're re-requesting fresh + this.loadedSubsets = [] + + // Re-request all previously loaded subsets + for (const options of subsetsToReload) { + const syncResult = this.collection._sync.loadSubset(options) + + // Track this loadSubset call so we can unload it later + this.loadedSubsets.push(options) + this.trackLoadSubsetPromise(syncResult) + } } setOrderByIndex(index: IndexInterface) { @@ -479,6 +523,10 @@ export class CollectionSubscription } unsubscribe() { + // Clean up truncate event listener + this.truncateCleanup?.() + this.truncateCleanup = undefined + // Unload all subsets that this subscription loaded // We pass the exact same LoadSubsetOptions we used for loadSubset for (const options of this.loadedSubsets) { From 27c01d9ab012b0a619e34896015faf03d394f6e6 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Thu, 18 Dec 2025 10:02:27 +0000 Subject: [PATCH 2/8] feat: Buffer subscription changes during truncate This change buffers subscription changes during a truncate event until all loadSubset refetches complete. This prevents a flash of missing content between deletes and new inserts. Co-authored-by: sam.willis --- packages/db/src/collection/subscription.ts | 91 ++++++- packages/db/tests/collection-truncate.test.ts | 236 +++++++++++++++++- 2 files changed, 321 insertions(+), 6 deletions(-) diff --git a/packages/db/src/collection/subscription.ts b/packages/db/src/collection/subscription.ts index a7acced3f..eab19114f 100644 --- a/packages/db/src/collection/subscription.ts +++ b/packages/db/src/collection/subscription.ts @@ -82,6 +82,13 @@ export class CollectionSubscription // Cleanup function for truncate event listener private truncateCleanup: (() => void) | undefined + // Truncate buffering state + // When a truncate occurs, we buffer changes until all loadSubset refetches complete + // This prevents a flash of missing content between deletes and new inserts + private isBufferingForTruncate = false + private truncateBuffer: Array>> = [] + private pendingTruncateRefetches: Set> = new Set() + public get status(): SubscriptionStatus { return this._status } @@ -127,6 +134,9 @@ export class CollectionSubscription * Handle collection truncate event by resetting state and re-requesting subsets. * This is called when the sync layer receives a must-refetch and clears all data. * + * To prevent a flash of missing content, we buffer all changes (deletes from truncate + * and inserts from refetch) until all loadSubset promises resolve, then emit them together. + * * IMPORTANT: We intentionally do NOT clear sentKeys here. The truncate event is emitted * BEFORE delete events are sent to subscribers. If we cleared sentKeys, the delete events * would be filtered out by filterAndFlipChanges (which skips deletes for keys not in sentKeys). @@ -134,6 +144,25 @@ export class CollectionSubscription * inserts will still be emitted correctly (the type is already 'insert' so no conversion needed). */ private handleTruncate() { + // Copy the loaded subsets before clearing (we'll re-request them) + const subsetsToReload = [...this.loadedSubsets] + + // If there are no subsets to reload, no need to buffer - just reset state + if (subsetsToReload.length === 0) { + this.snapshotSent = false + this.loadedInitialState = false + this.limitedSnapshotRowCount = 0 + this.lastSentKey = undefined + this.loadedSubsets = [] + return + } + + // Start buffering BEFORE we receive the delete events from the truncate commit + // This ensures we capture both the deletes and subsequent inserts + this.isBufferingForTruncate = true + this.truncateBuffer = [] + this.pendingTruncateRefetches.clear() + // Reset snapshot/pagination tracking state but NOT sentKeys // sentKeys must remain so delete events can pass through filterAndFlipChanges this.snapshotSent = false @@ -141,22 +170,60 @@ export class CollectionSubscription this.limitedSnapshotRowCount = 0 this.lastSentKey = undefined - // Copy the loaded subsets before clearing (we'll re-request them) - const subsetsToReload = [...this.loadedSubsets] - // Clear the loadedSubsets array since we're re-requesting fresh this.loadedSubsets = [] - // Re-request all previously loaded subsets + // Re-request all previously loaded subsets and track their promises for (const options of subsetsToReload) { const syncResult = this.collection._sync.loadSubset(options) // Track this loadSubset call so we can unload it later this.loadedSubsets.push(options) this.trackLoadSubsetPromise(syncResult) + + // Track the promise for buffer flushing + if (syncResult instanceof Promise) { + this.pendingTruncateRefetches.add(syncResult) + syncResult + .catch(() => { + // Ignore errors - we still want to flush the buffer even if some requests fail + }) + .finally(() => { + this.pendingTruncateRefetches.delete(syncResult) + this.checkTruncateRefetchComplete() + }) + } + } + + // If all loadSubset calls were synchronous (returned true), flush immediately + if (this.pendingTruncateRefetches.size === 0) { + this.flushTruncateBuffer() } } + /** + * Check if all truncate refetch promises have completed and flush buffer if so + */ + private checkTruncateRefetchComplete() { + if (this.pendingTruncateRefetches.size === 0 && this.isBufferingForTruncate) { + this.flushTruncateBuffer() + } + } + + /** + * Flush the truncate buffer, emitting all buffered changes to the callback + */ + private flushTruncateBuffer() { + this.isBufferingForTruncate = false + + // Emit all buffered changes in order + for (const changes of this.truncateBuffer) { + this.filteredCallback(changes) + } + + this.truncateBuffer = [] + } + setOrderByIndex(index: IndexInterface) { this.orderByIndex = index } @@ -218,7 +285,16 @@ export class CollectionSubscription emitEvents(changes: Array>) { const newChanges = this.filterAndFlipChanges(changes) - this.filteredCallback(newChanges) + + if (this.isBufferingForTruncate) { + // Buffer the changes instead of emitting immediately + // This prevents a flash of missing content during truncate/refetch + if (newChanges.length > 0) { + this.truncateBuffer.push(newChanges) + } + } else { + this.filteredCallback(newChanges) + } } /** @@ -527,6 +603,11 @@ export class CollectionSubscription this.truncateCleanup?.() this.truncateCleanup = undefined + // Clean up truncate buffer state + this.isBufferingForTruncate = false + this.truncateBuffer = [] + this.pendingTruncateRefetches.clear() + // Unload all subsets that this subscription loaded // We pass the exact same LoadSubsetOptions we used for loadSubset for (const options of this.loadedSubsets) { diff --git a/packages/db/tests/collection-truncate.test.ts b/packages/db/tests/collection-truncate.test.ts index 8e4d2d124..79a42d9a0 100644 --- a/packages/db/tests/collection-truncate.test.ts +++ b/packages/db/tests/collection-truncate.test.ts @@ -1,6 +1,6 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' import { createCollection } from '../src/collection/index.js' -import type { SyncConfig } from '../src/types' +import type { LoadSubsetOptions, SyncConfig } from '../src/types' describe(`Collection truncate operations`, () => { beforeEach(() => { @@ -706,4 +706,238 @@ describe(`Collection truncate operations`, () => { expect(collection.state.has(2)).toBe(true) expect(collection.state.get(2)).toEqual({ id: 2, value: `optimistic` }) }) + + it(`should buffer subscription changes during truncate until loadSubset refetch completes`, async () => { + // This test verifies that when a truncate occurs: + // 1. The subscription buffers delete events from the truncate + // 2. The subscription re-requests its previously loaded subsets + // 3. Insert events from the refetch are also buffered + // 4. All buffered events are emitted together when loadSubset completes + // This prevents a flash of missing content during must-refetch scenarios + + const changeEvents: Array = [] + let syncOps: + | Parameters[`sync`]>[0] + | undefined + let loadSubsetResolver: (() => void) | undefined + let loadSubsetCallCount = 0 + + const collection = createCollection<{ id: number; value: string }, number>({ + id: `truncate-buffering-test`, + getKey: (item) => item.id, + startSync: true, + syncMode: `on-demand`, + sync: { + sync: (cfg) => { + syncOps = cfg + cfg.markReady() + + return { + loadSubset: (_options: LoadSubsetOptions) => { + loadSubsetCallCount++ + + // Return a promise that we control + return new Promise((resolve) => { + loadSubsetResolver = () => { + // When resolved, write some data + cfg.begin() + cfg.write({ + type: `insert`, + value: { id: 1, value: `refetched-1` }, + }) + cfg.write({ + type: `insert`, + value: { id: 2, value: `refetched-2` }, + }) + cfg.commit() + resolve() + } + }) + }, + } + }, + }, + }) + + await collection.stateWhenReady() + + // Subscribe with includeInitialState to trigger loadSubset + const subscription = collection.subscribeChanges( + (changes) => { + changeEvents.push(...changes) + }, + { + includeInitialState: true, + }, + ) + + // Initially empty, waiting for loadSubset + expect(changeEvents.length).toBe(0) + expect(loadSubsetCallCount).toBe(1) + + // Resolve the first loadSubset to get initial data + loadSubsetResolver!() + await vi.waitFor(() => expect(changeEvents.length).toBe(2)) + + // Verify initial data arrived + expect(changeEvents).toEqual([ + { type: `insert`, key: 1, value: { id: 1, value: `refetched-1` } }, + { type: `insert`, key: 2, value: { id: 2, value: `refetched-2` } }, + ]) + + // Clear events for next phase + changeEvents.length = 0 + loadSubsetCallCount = 0 + + // Now trigger a truncate (simulating must-refetch) + syncOps!.begin() + syncOps!.truncate() + syncOps!.commit() + + // The truncate event should trigger the subscription to buffer and re-request + // loadSubset should be called again + await vi.waitFor(() => expect(loadSubsetCallCount).toBe(1)) + + // Changes should be buffered - subscriber should NOT see deletes yet + // (The subscription is buffering until loadSubset completes) + expect(changeEvents.length).toBe(0) + + // Now resolve the loadSubset - this should flush the buffer + loadSubsetResolver!() + + // Wait for buffered events to be flushed + await vi.waitFor(() => expect(changeEvents.length).toBeGreaterThan(0)) + + // Verify we got all events in one batch (deletes + inserts) + // The subscription should have received: + // - Delete events for the old data (from truncate) + // - Insert events for the new data (from refetch) + const deletes = changeEvents.filter((e) => e.type === `delete`) + const inserts = changeEvents.filter((e) => e.type === `insert`) + + expect(deletes.length).toBe(2) // Deleted the old items + expect(inserts.length).toBe(2) // Inserted the refetched items + + // Verify final state is correct + expect(collection.state.size).toBe(2) + expect(collection.state.get(1)).toEqual({ id: 1, value: `refetched-1` }) + expect(collection.state.get(2)).toEqual({ id: 2, value: `refetched-2` }) + + subscription.unsubscribe() + }) + + it(`should not buffer changes when there are no subsets to reload`, async () => { + // When a subscription has no previously loaded subsets, + // it should not enter buffering mode during truncate + + const changeEvents: Array = [] + let syncOps: + | Parameters[`sync`]>[0] + | undefined + + const collection = createCollection<{ id: number; value: string }, number>({ + id: `truncate-no-buffer-test`, + getKey: (item) => item.id, + startSync: true, + sync: { + sync: (cfg) => { + syncOps = cfg + cfg.begin() + cfg.write({ type: `insert`, value: { id: 1, value: `initial` } }) + cfg.commit() + cfg.markReady() + }, + }, + }) + + await collection.stateWhenReady() + + // Subscribe WITHOUT includeInitialState - no loadSubset is called + collection.subscribeChanges((changes) => { + changeEvents.push(...changes) + }) + + // No initial events since includeInitialState is false + expect(changeEvents.length).toBe(0) + + // Trigger truncate + syncOps!.begin() + syncOps!.truncate() + syncOps!.write({ type: `insert`, value: { id: 1, value: `after-truncate` } }) + syncOps!.commit() + + // Since there were no loaded subsets, changes should be emitted immediately + // (no buffering because there's nothing to refetch) + expect(changeEvents.length).toBeGreaterThan(0) + }) + + it(`should handle unsubscribe during truncate buffering`, async () => { + // Verifies that unsubscribing while buffering cleans up properly + + let syncOps: + | Parameters[`sync`]>[0] + | undefined + let loadSubsetResolver: (() => void) | undefined + + const collection = createCollection<{ id: number; value: string }, number>({ + id: `truncate-unsubscribe-during-buffer`, + getKey: (item) => item.id, + startSync: true, + syncMode: `on-demand`, + sync: { + sync: (cfg) => { + syncOps = cfg + cfg.markReady() + + return { + loadSubset: (_options: LoadSubsetOptions) => { + return new Promise((resolve) => { + loadSubsetResolver = () => { + cfg.begin() + cfg.write({ + type: `insert`, + value: { id: 1, value: `data` }, + }) + cfg.commit() + resolve() + } + }) + }, + } + }, + }, + }) + + await collection.stateWhenReady() + + const changeEvents: Array = [] + const subscription = collection.subscribeChanges( + (changes) => { + changeEvents.push(...changes) + }, + { includeInitialState: true }, + ) + + // Resolve initial load + loadSubsetResolver!() + await vi.waitFor(() => expect(changeEvents.length).toBe(1)) + changeEvents.length = 0 + + // Trigger truncate + syncOps!.begin() + syncOps!.truncate() + syncOps!.commit() + + // Unsubscribe before loadSubset completes + subscription.unsubscribe() + + // Resolve the loadSubset - should not cause errors + loadSubsetResolver!() + + // Give it time to process + await vi.advanceTimersByTimeAsync(10) + + // No additional events should have been emitted to the unsubscribed callback + expect(changeEvents.length).toBe(0) + }) }) From 3427ef339007081ccce96dc0e74e0fd5ca082eb9 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Thu, 18 Dec 2025 10:03:37 +0000 Subject: [PATCH 3/8] ci: apply automated fixes --- packages/db/src/collection/subscription.ts | 5 ++++- packages/db/tests/collection-truncate.test.ts | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/packages/db/src/collection/subscription.ts b/packages/db/src/collection/subscription.ts index eab19114f..604fd5587 100644 --- a/packages/db/src/collection/subscription.ts +++ b/packages/db/src/collection/subscription.ts @@ -205,7 +205,10 @@ export class CollectionSubscription * Check if all truncate refetch promises have completed and flush buffer if so */ private checkTruncateRefetchComplete() { - if (this.pendingTruncateRefetches.size === 0 && this.isBufferingForTruncate) { + if ( + this.pendingTruncateRefetches.size === 0 && + this.isBufferingForTruncate + ) { this.flushTruncateBuffer() } } diff --git a/packages/db/tests/collection-truncate.test.ts b/packages/db/tests/collection-truncate.test.ts index 79a42d9a0..a8841bae6 100644 --- a/packages/db/tests/collection-truncate.test.ts +++ b/packages/db/tests/collection-truncate.test.ts @@ -863,7 +863,10 @@ describe(`Collection truncate operations`, () => { // Trigger truncate syncOps!.begin() syncOps!.truncate() - syncOps!.write({ type: `insert`, value: { id: 1, value: `after-truncate` } }) + syncOps!.write({ + type: `insert`, + value: { id: 1, value: `after-truncate` }, + }) syncOps!.commit() // Since there were no loaded subsets, changes should be emitted immediately From c9ceeecd953a344593a34dbbe778c430043da02b Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Thu, 18 Dec 2025 13:19:45 +0000 Subject: [PATCH 4/8] changeset --- .changeset/clever-dodos-rest.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/clever-dodos-rest.md diff --git a/.changeset/clever-dodos-rest.md b/.changeset/clever-dodos-rest.md new file mode 100644 index 000000000..882ced143 --- /dev/null +++ b/.changeset/clever-dodos-rest.md @@ -0,0 +1,5 @@ +--- +'@tanstack/db': patch +--- + +Fix subscriptions not re-requesting data after truncate in on-demand sync mode. When a must-refetch occurs, subscriptions now buffer changes and re-request their previously loaded subsets, preventing a flash of missing content. From a91b8cf6815efc7202c3234d96f616c3d3da0d60 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Thu, 18 Dec 2025 14:17:44 +0000 Subject: [PATCH 5/8] tweaks --- .changeset/clever-dodos-rest.md | 2 +- packages/db/src/collection/subscription.ts | 26 +++-- packages/db/tests/collection-truncate.test.ts | 100 ++++++++++++++++++ 3 files changed, 117 insertions(+), 11 deletions(-) diff --git a/.changeset/clever-dodos-rest.md b/.changeset/clever-dodos-rest.md index 882ced143..23e71cbce 100644 --- a/.changeset/clever-dodos-rest.md +++ b/.changeset/clever-dodos-rest.md @@ -2,4 +2,4 @@ '@tanstack/db': patch --- -Fix subscriptions not re-requesting data after truncate in on-demand sync mode. When a must-refetch occurs, subscriptions now buffer changes and re-request their previously loaded subsets, preventing a flash of missing content. +Fix subscriptions not re-requesting data after truncate in on-demand sync mode. When a must-refetch occurs, subscriptions now buffer changes and re-request their previously loaded subsets, preventing a flash of missing content. Also fixes delete events being incorrectly filtered out when `loadedInitialState` was true (which causes `sentKeys` to be empty) by skipping the delete filter during truncate buffering. diff --git a/packages/db/src/collection/subscription.ts b/packages/db/src/collection/subscription.ts index 604fd5587..02429c3c9 100644 --- a/packages/db/src/collection/subscription.ts +++ b/packages/db/src/collection/subscription.ts @@ -136,12 +136,6 @@ export class CollectionSubscription * * To prevent a flash of missing content, we buffer all changes (deletes from truncate * and inserts from refetch) until all loadSubset promises resolve, then emit them together. - * - * IMPORTANT: We intentionally do NOT clear sentKeys here. The truncate event is emitted - * BEFORE delete events are sent to subscribers. If we cleared sentKeys, the delete events - * would be filtered out by filterAndFlipChanges (which skips deletes for keys not in sentKeys). - * By keeping sentKeys intact, delete events pass through, and when new data arrives, - * inserts will still be emitted correctly (the type is already 'insert' so no conversion needed). */ private handleTruncate() { // Copy the loaded subsets before clearing (we'll re-request them) @@ -163,8 +157,9 @@ export class CollectionSubscription this.truncateBuffer = [] this.pendingTruncateRefetches.clear() - // Reset snapshot/pagination tracking state but NOT sentKeys - // sentKeys must remain so delete events can pass through filterAndFlipChanges + // Reset snapshot/pagination tracking state + // Note: We don't need to populate sentKeys here because filterAndFlipChanges + // will skip the delete filter when isBufferingForTruncate is true this.snapshotSent = false this.loadedInitialState = false this.limitedSnapshotRowCount = 0 @@ -572,6 +567,14 @@ export class CollectionSubscription return changes } + // When buffering for truncate, we need all changes (including deletes) to pass through. + // This is important because: + // 1. If loadedInitialState was previously true, sentKeys will be empty + // (trackSentKeys early-returns when loadedInitialState is true) + // 2. The truncate deletes are for keys that WERE sent to the subscriber + // 3. We're collecting all changes atomically, so filtering doesn't make sense + const skipDeleteFilter = this.isBufferingForTruncate + const newChanges = [] for (const change of changes) { let newChange = change @@ -579,8 +582,11 @@ export class CollectionSubscription if (change.type === `update`) { newChange = { ...change, type: `insert`, previousValue: undefined } } else if (change.type === `delete`) { - // filter out deletes for keys that have not been sent - continue + // Filter out deletes for keys that have not been sent, + // UNLESS we're buffering for truncate (where all deletes should pass through) + if (!skipDeleteFilter) { + continue + } } this.sentKeys.add(change.key) } diff --git a/packages/db/tests/collection-truncate.test.ts b/packages/db/tests/collection-truncate.test.ts index a8841bae6..8aab07ce4 100644 --- a/packages/db/tests/collection-truncate.test.ts +++ b/packages/db/tests/collection-truncate.test.ts @@ -943,4 +943,104 @@ describe(`Collection truncate operations`, () => { // No additional events should have been emitted to the unsubscribed callback expect(changeEvents.length).toBe(0) }) + + it(`should emit delete events after truncate when loadedInitialState was true`, async () => { + // This test specifically validates the edge case where: + // 1. requestSnapshot() is called without options (sets loadedInitialState = true) + // 2. When loadedInitialState is true, trackSentKeys early-returns, leaving sentKeys empty + // 3. On truncate, we must populate sentKeys BEFORE setting loadedInitialState = false + // Otherwise, delete events would be filtered out by filterAndFlipChanges + // + // This is the scenario the reviewer identified where sentKeys could be empty. + + const changeEvents: Array = [] + let syncOps: + | Parameters[`sync`]>[0] + | undefined + let loadSubsetResolver: (() => void) | undefined + let loadSubsetCallCount = 0 + + const collection = createCollection<{ id: number; value: string }, number>({ + id: `truncate-loadedInitialState-test`, + getKey: (item) => item.id, + startSync: true, + syncMode: `on-demand`, + sync: { + sync: (cfg) => { + syncOps = cfg + cfg.markReady() + + return { + loadSubset: (_options: LoadSubsetOptions) => { + loadSubsetCallCount++ + + return new Promise((resolve) => { + loadSubsetResolver = () => { + cfg.begin() + cfg.write({ + type: `insert`, + value: { id: 1, value: `item-1` }, + }) + cfg.write({ + type: `insert`, + value: { id: 2, value: `item-2` }, + }) + cfg.commit() + resolve() + } + }) + }, + } + }, + }, + }) + + await collection.stateWhenReady() + + // Create subscription WITHOUT includeInitialState + // Then manually call requestSnapshot() without options to set loadedInitialState = true + const subscription = collection.subscribeChanges((changes) => { + changeEvents.push(...changes) + }) + + // Manually trigger requestSnapshot() without options + // This sets loadedInitialState = true and sentKeys stays empty + // (This mimics the lazy join fallback path in joins.ts line 310) + subscription.requestSnapshot() + expect(loadSubsetCallCount).toBe(1) + + // Resolve the loadSubset promise + loadSubsetResolver!() + await vi.waitFor(() => expect(changeEvents.length).toBe(2)) + + // Verify initial data arrived + expect(changeEvents[0]).toMatchObject({ type: `insert`, key: 1 }) + expect(changeEvents[1]).toMatchObject({ type: `insert`, key: 2 }) + + changeEvents.length = 0 + const previousLoadSubsetCount = loadSubsetCallCount + + // Now trigger a truncate + syncOps!.begin() + syncOps!.truncate() + syncOps!.commit() + + // Wait for loadSubset to be called again + await vi.waitFor(() => + expect(loadSubsetCallCount).toBeGreaterThan(previousLoadSubsetCount), + ) + + // Resolve the new loadSubset promise + loadSubsetResolver!() + + // Wait for events to be emitted + await vi.waitFor(() => expect(changeEvents.length).toBeGreaterThan(0)) + + // The key assertion: we should have received delete events + // Without the fix, sentKeys would be empty and deletes would be filtered out + const deletes = changeEvents.filter((e) => e.type === `delete`) + expect(deletes.length).toBe(2) // Must have delete events! + + subscription.unsubscribe() + }) }) From d88e3ac277f32d96db4f2a5fb55769d3f2fac995 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Thu, 18 Dec 2025 19:02:07 +0000 Subject: [PATCH 6/8] test --- packages/db/src/collection/subscription.ts | 7 +- packages/db/tests/collection-truncate.test.ts | 81 +++++++++++++++++++ 2 files changed, 87 insertions(+), 1 deletion(-) diff --git a/packages/db/src/collection/subscription.ts b/packages/db/src/collection/subscription.ts index 02429c3c9..3023638c9 100644 --- a/packages/db/src/collection/subscription.ts +++ b/packages/db/src/collection/subscription.ts @@ -190,7 +190,12 @@ export class CollectionSubscription } } - // If all loadSubset calls were synchronous (returned true), flush immediately + // If all loadSubset calls were synchronous (returned true), flush immediately. + // Note: This may result in insert events arriving before delete events if the sync + // loadSubset triggers a nested commit. This is acceptable because: + // 1. The final collection state is always correct + // 2. UI frameworks like React derive state from collection.state, not incremental events + // 3. For incremental event processing, the events are still all present (just out of order) if (this.pendingTruncateRefetches.size === 0) { this.flushTruncateBuffer() } diff --git a/packages/db/tests/collection-truncate.test.ts b/packages/db/tests/collection-truncate.test.ts index 8aab07ce4..6c16cbf65 100644 --- a/packages/db/tests/collection-truncate.test.ts +++ b/packages/db/tests/collection-truncate.test.ts @@ -1043,4 +1043,85 @@ describe(`Collection truncate operations`, () => { subscription.unsubscribe() }) + + it(`should buffer truncate deletes even when loadSubset is synchronous`, async () => { + // This test verifies that even when loadSubset returns synchronously (true), + // we still capture the truncate delete events in the buffer. + // The truncate event is emitted BEFORE delete events are sent, so if we + // flush immediately, we'd miss the deletes. + + const changeEvents: Array = [] + let syncOps: + | Parameters[`sync`]>[0] + | undefined + + const collection = createCollection<{ id: number; value: string }, number>({ + id: `truncate-sync-loadSubset-test`, + getKey: (item) => item.id, + startSync: true, + syncMode: `on-demand`, + sync: { + sync: (cfg) => { + syncOps = cfg + cfg.markReady() + + return { + // loadSubset returns true (synchronous) - data already available + loadSubset: (_options: LoadSubsetOptions) => { + // Synchronously write data + cfg.begin() + cfg.write({ + type: `insert`, + value: { id: 1, value: `sync-item-1` }, + }) + cfg.write({ + type: `insert`, + value: { id: 2, value: `sync-item-2` }, + }) + cfg.commit() + return true // Synchronous return + }, + } + }, + }, + }) + + await collection.stateWhenReady() + + // Subscribe with includeInitialState to trigger loadSubset + const subscription = collection.subscribeChanges( + (changes) => { + changeEvents.push(...changes) + }, + { includeInitialState: true }, + ) + + // Wait for initial data + await vi.waitFor(() => expect(changeEvents.length).toBe(2)) + expect(collection.state.size).toBe(2) + + changeEvents.length = 0 + + // Trigger truncate - loadSubset will return synchronously + syncOps!.begin() + syncOps!.truncate() + syncOps!.commit() + + // Wait for events to settle + await vi.advanceTimersByTimeAsync(10) + + // We should have received delete events even though loadSubset was sync + const deletes = changeEvents.filter((e) => e.type === `delete`) + const inserts = changeEvents.filter((e) => e.type === `insert`) + + expect(deletes.length).toBe(2) // Should have 2 deletes + expect(inserts.length).toBe(2) // Should have 2 inserts + + // Verify collection state is correct + expect(collection.state.size).toBe(2) + expect(collection.state.get(1)).toEqual({ id: 1, value: `sync-item-1` }) + expect(collection.state.get(2)).toEqual({ id: 2, value: `sync-item-2` }) + + subscription.unsubscribe() + }) }) From 78259248b6ab66e4dd2d22721b9cfeac32341661 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Thu, 18 Dec 2025 19:22:45 +0000 Subject: [PATCH 7/8] address review --- .changeset/clever-dodos-rest.md | 8 +- packages/db/src/collection/subscription.ts | 79 +++++++++++-------- packages/db/tests/collection-truncate.test.ts | 6 ++ 3 files changed, 59 insertions(+), 34 deletions(-) diff --git a/.changeset/clever-dodos-rest.md b/.changeset/clever-dodos-rest.md index 23e71cbce..04e88f559 100644 --- a/.changeset/clever-dodos-rest.md +++ b/.changeset/clever-dodos-rest.md @@ -2,4 +2,10 @@ '@tanstack/db': patch --- -Fix subscriptions not re-requesting data after truncate in on-demand sync mode. When a must-refetch occurs, subscriptions now buffer changes and re-request their previously loaded subsets, preventing a flash of missing content. Also fixes delete events being incorrectly filtered out when `loadedInitialState` was true (which causes `sentKeys` to be empty) by skipping the delete filter during truncate buffering. +Fix subscriptions not re-requesting data after truncate in on-demand sync mode. When a must-refetch occurs, subscriptions now buffer changes and re-request their previously loaded subsets, preventing a flash of missing content. + +Key improvements: +- Buffer changes atomically: deletes and inserts are emitted together in a single callback +- Correct event ordering: defers loadSubset calls to a microtask so truncate deletes are buffered before refetch inserts +- Gated on on-demand mode: only buffers when there's an actual loadSubset handler +- Fixes delete filter edge case: skips delete filter during truncate buffering when `sentKeys` is empty diff --git a/packages/db/src/collection/subscription.ts b/packages/db/src/collection/subscription.ts index 3023638c9..70f791cb0 100644 --- a/packages/db/src/collection/subscription.ts +++ b/packages/db/src/collection/subscription.ts @@ -141,8 +141,13 @@ export class CollectionSubscription // Copy the loaded subsets before clearing (we'll re-request them) const subsetsToReload = [...this.loadedSubsets] - // If there are no subsets to reload, no need to buffer - just reset state - if (subsetsToReload.length === 0) { + // Only buffer if there's an actual loadSubset handler that can do async work. + // Without a loadSubset handler, there's nothing to re-request and no reason to buffer. + // This prevents unnecessary buffering in eager sync mode or when loadSubset isn't implemented. + const hasLoadSubsetHandler = this.collection._sync.syncLoadSubsetFn !== null + + // If there are no subsets to reload OR no loadSubset handler, just reset state + if (subsetsToReload.length === 0 || !hasLoadSubsetHandler) { this.snapshotSent = false this.loadedInitialState = false this.limitedSnapshotRowCount = 0 @@ -168,37 +173,43 @@ export class CollectionSubscription // Clear the loadedSubsets array since we're re-requesting fresh this.loadedSubsets = [] - // Re-request all previously loaded subsets and track their promises - for (const options of subsetsToReload) { - const syncResult = this.collection._sync.loadSubset(options) - - // Track this loadSubset call so we can unload it later - this.loadedSubsets.push(options) - this.trackLoadSubsetPromise(syncResult) + // Defer the loadSubset calls to a microtask so the truncate commit's delete events + // are buffered BEFORE the loadSubset calls potentially trigger nested commits. + // This ensures correct event ordering: deletes first, then inserts. + queueMicrotask(() => { + // Check if we were unsubscribed while waiting + if (!this.isBufferingForTruncate) { + return + } - // Track the promise for buffer flushing - if (syncResult instanceof Promise) { - this.pendingTruncateRefetches.add(syncResult) - syncResult - .catch(() => { - // Ignore errors - we still want to flush the buffer even if some requests fail - }) - .finally(() => { - this.pendingTruncateRefetches.delete(syncResult) - this.checkTruncateRefetchComplete() - }) + // Re-request all previously loaded subsets and track their promises + for (const options of subsetsToReload) { + const syncResult = this.collection._sync.loadSubset(options) + + // Track this loadSubset call so we can unload it later + this.loadedSubsets.push(options) + this.trackLoadSubsetPromise(syncResult) + + // Track the promise for buffer flushing + if (syncResult instanceof Promise) { + this.pendingTruncateRefetches.add(syncResult) + syncResult + .catch(() => { + // Ignore errors - we still want to flush the buffer even if some requests fail + }) + .finally(() => { + this.pendingTruncateRefetches.delete(syncResult) + this.checkTruncateRefetchComplete() + }) + } } - } - // If all loadSubset calls were synchronous (returned true), flush immediately. - // Note: This may result in insert events arriving before delete events if the sync - // loadSubset triggers a nested commit. This is acceptable because: - // 1. The final collection state is always correct - // 2. UI frameworks like React derive state from collection.state, not incremental events - // 3. For incremental event processing, the events are still all present (just out of order) - if (this.pendingTruncateRefetches.size === 0) { - this.flushTruncateBuffer() - } + // If all loadSubset calls were synchronous (returned true), flush now + // At this point, delete events have already been buffered from the truncate commit + if (this.pendingTruncateRefetches.size === 0) { + this.flushTruncateBuffer() + } + }) } /** @@ -219,9 +230,11 @@ export class CollectionSubscription private flushTruncateBuffer() { this.isBufferingForTruncate = false - // Emit all buffered changes in order - for (const changes of this.truncateBuffer) { - this.filteredCallback(changes) + // Flatten all buffered changes into a single array for atomic emission + // This ensures consumers see all truncate changes (deletes + inserts) in one callback + const merged = this.truncateBuffer.flat() + if (merged.length > 0) { + this.filteredCallback(merged) } this.truncateBuffer = [] diff --git a/packages/db/tests/collection-truncate.test.ts b/packages/db/tests/collection-truncate.test.ts index 6c16cbf65..1e12b0416 100644 --- a/packages/db/tests/collection-truncate.test.ts +++ b/packages/db/tests/collection-truncate.test.ts @@ -1117,6 +1117,12 @@ describe(`Collection truncate operations`, () => { expect(deletes.length).toBe(2) // Should have 2 deletes expect(inserts.length).toBe(2) // Should have 2 inserts + // Verify correct ordering: deletes should come before inserts + // (truncate clears old data, then refetch adds new data) + const firstDeleteIdx = changeEvents.findIndex((e) => e.type === `delete`) + const firstInsertIdx = changeEvents.findIndex((e) => e.type === `insert`) + expect(firstDeleteIdx).toBeLessThan(firstInsertIdx) + // Verify collection state is correct expect(collection.state.size).toBe(2) expect(collection.state.get(1)).toEqual({ id: 1, value: `sync-item-1` }) From e285c9122a7a11ec528f31586f9cc8502e77e116 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Thu, 18 Dec 2025 19:23:53 +0000 Subject: [PATCH 8/8] ci: apply automated fixes --- .changeset/clever-dodos-rest.md | 1 + 1 file changed, 1 insertion(+) diff --git a/.changeset/clever-dodos-rest.md b/.changeset/clever-dodos-rest.md index 04e88f559..4bfb46cc5 100644 --- a/.changeset/clever-dodos-rest.md +++ b/.changeset/clever-dodos-rest.md @@ -5,6 +5,7 @@ Fix subscriptions not re-requesting data after truncate in on-demand sync mode. When a must-refetch occurs, subscriptions now buffer changes and re-request their previously loaded subsets, preventing a flash of missing content. Key improvements: + - Buffer changes atomically: deletes and inserts are emitted together in a single callback - Correct event ordering: defers loadSubset calls to a microtask so truncate deletes are buffered before refetch inserts - Gated on on-demand mode: only buffers when there's an actual loadSubset handler