From d725a0eb716d96df6948ac5fe0011231c0fe79bb Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Tue, 14 Oct 2025 11:00:30 +0100 Subject: [PATCH 1/5] Handle pushed down predicates in Electric collection Co-authored-by: Kevin De Porre Co-authored-by: Sam Willis --- .changeset/tender-carpets-cheat.md | 5 + .../electric-db-collection/src/electric.ts | 86 ++- .../src/pg-serializer.ts | 27 + .../src/sql-compiler.ts | 163 +++++ .../tests/electric-live-query.test.ts | 516 +++++++++++++ .../tests/electric.test.ts | 684 +++++++++++++++++- 6 files changed, 1462 insertions(+), 19 deletions(-) create mode 100644 .changeset/tender-carpets-cheat.md create mode 100644 packages/electric-db-collection/src/pg-serializer.ts create mode 100644 packages/electric-db-collection/src/sql-compiler.ts diff --git a/.changeset/tender-carpets-cheat.md b/.changeset/tender-carpets-cheat.md new file mode 100644 index 000000000..77c9dfd73 --- /dev/null +++ b/.changeset/tender-carpets-cheat.md @@ -0,0 +1,5 @@ +--- +"@tanstack/electric-db-collection": patch +--- + +Handle predicates that are pushed down. diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index bdd6f34a7..28a7eb60e 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -12,12 +12,15 @@ import { TimeoutWaitingForMatchError, TimeoutWaitingForTxIdError, } from "./errors" +import { compileSQL } from "./sql-compiler" import type { BaseCollectionConfig, CollectionConfig, DeleteMutationFnParams, InsertMutationFnParams, + LoadSubsetOptions, SyncConfig, + SyncMode, UpdateMutationFnParams, UtilsRecord, } from "@tanstack/db" @@ -72,6 +75,24 @@ type InferSchemaOutput = T extends StandardSchemaV1 : Record : Record +/** + * The mode of sync to use for the collection. 
+ * @default `eager` + * @description + * - `eager`: + * - syncs all data immediately on preload + * - collection will be marked as ready once the sync is complete + * - there is no incremental sync + * - `on-demand`: + * - syncs data in incremental snapshots when the collection is queried + * - collection will be marked as ready immediately after the first snapshot is synced + * - `progressive`: + * - syncs all data for the collection in the background + * - uses incremental snapshots during the initial sync to provide a fast path to the data required for queries + * - collection will be marked as ready once the full sync is complete + */ +export type ElectricSyncMode = SyncMode | `progressive` + /** * Configuration interface for Electric collection options * @template T - The type of items in the collection @@ -82,12 +103,13 @@ export interface ElectricCollectionConfig< TSchema extends StandardSchemaV1 = never, > extends Omit< BaseCollectionConfig, - `onInsert` | `onUpdate` | `onDelete` + `onInsert` | `onUpdate` | `onDelete` | `syncMode` > { /** * Configuration options for the ElectricSQL ShapeStream */ shapeOptions: ShapeStreamOptions> + syncMode?: ElectricSyncMode /** * Optional asynchronous handler function called before an insert operation @@ -281,6 +303,9 @@ export function electricCollectionOptions( } { const seenTxids = new Store>(new Set([])) const seenSnapshots = new Store>([]) + const internalSyncMode = config.syncMode ?? `eager` + const finalSyncMode = + internalSyncMode === `progressive` ? 
`on-demand` : internalSyncMode const pendingMatches = new Store< Map< string, @@ -331,6 +356,7 @@ export function electricCollectionOptions( const sync = createElectricSync(config.shapeOptions, { seenTxids, seenSnapshots, + syncMode: internalSyncMode, pendingMatches, currentBatchMessages, removePendingMatches, @@ -550,6 +576,7 @@ export function electricCollectionOptions( return { ...restConfig, + syncMode: finalSyncMode, sync, onInsert: wrappedOnInsert, onUpdate: wrappedOnUpdate, @@ -567,6 +594,7 @@ export function electricCollectionOptions( function createElectricSync>( shapeOptions: ShapeStreamOptions>, options: { + syncMode: ElectricSyncMode seenTxids: Store> seenSnapshots: Store> pendingMatches: Store< @@ -590,6 +618,7 @@ function createElectricSync>( const { seenTxids, seenSnapshots, + syncMode, pendingMatches, currentBatchMessages, removePendingMatches, @@ -653,6 +682,12 @@ function createElectricSync>( const stream = new ShapeStream({ ...shapeOptions, + // In on-demand mode, we only want to sync changes, so we set the log to `changes_only` + log: syncMode === `on-demand` ? `changes_only` : undefined, + // In on-demand mode, we only need the changes from the point of time the collection was created + // so we default to `now` when there is no saved offset. + offset: + shapeOptions.offset ?? (syncMode === `on-demand` ? 
`now` : undefined), signal: abortController.signal, onError: (errorParams) => { // Just immediately mark ready if there's an error to avoid blocking @@ -679,9 +714,11 @@ function createElectricSync>( let transactionStarted = false const newTxids = new Set() const newSnapshots: Array = [] + let hasReceivedUpToDate = false // Track if we've completed initial sync in progressive mode unsubscribeStream = stream.subscribe((messages: Array>) => { let hasUpToDate = false + let hasSnapshotEnd = false for (const message of messages) { // Add message to current batch buffer (for race condition handling) @@ -746,6 +783,7 @@ function createElectricSync>( }) } else if (isSnapshotEndMessage(message)) { newSnapshots.push(parseSnapshotMessage(message)) + hasSnapshotEnd = true } else if (isUpToDateMessage(message)) { hasUpToDate = true } else if (isMustRefetchMessage(message)) { @@ -761,12 +799,14 @@ function createElectricSync>( truncate() - // Reset hasUpToDate so we continue accumulating changes until next up-to-date + // Reset flags so we continue accumulating changes until next up-to-date hasUpToDate = false + hasSnapshotEnd = false + hasReceivedUpToDate = false // Reset for progressive mode - we're starting a new sync } } - if (hasUpToDate) { + if (hasUpToDate || hasSnapshotEnd) { // Clear the current batch buffer since we're now up-to-date currentBatchMessages.setState(() => []) @@ -776,8 +816,15 @@ function createElectricSync>( transactionStarted = false } - // Mark the collection as ready now that sync is up to date - markReady() + if (hasUpToDate || (hasSnapshotEnd && syncMode === `on-demand`)) { + // Mark the collection as ready now that sync is up to date + markReady() + } + + // Track that we've received the first up-to-date for progressive mode + if (hasUpToDate) { + hasReceivedUpToDate = true + } // Always commit txids when we receive up-to-date, regardless of transaction state seenTxids.setState((currentTxids) => { @@ -811,12 +858,29 @@ function createElectricSync>( 
} }) - // Return the unsubscribe function - return () => { - // Unsubscribe from the stream - unsubscribeStream() - // Abort the abort controller to stop the stream - abortController.abort() + // Only define loadSubset when the sync mode is not eager; its presence tells the sync + // layer that it can load more data on demand via the requestSnapshot method when + // the syncMode is `on-demand` or `progressive` + const loadSubset = + syncMode === `eager` + ? undefined + : async (opts: LoadSubsetOptions) => { + // In progressive mode, stop requesting snapshots once full sync is complete + if (syncMode === `progressive` && hasReceivedUpToDate) { + return + } + const snapshotParams = compileSQL(opts) + await stream.requestSnapshot(snapshotParams) + } + + return { + loadSubset, + cleanup: () => { + // Unsubscribe from the stream + unsubscribeStream() + // Abort the abort controller to stop the stream + abortController.abort() + }, } }, // Expose the getSyncMetadata function diff --git a/packages/electric-db-collection/src/pg-serializer.ts b/packages/electric-db-collection/src/pg-serializer.ts new file mode 100644 index 000000000..707c4e1b8 --- /dev/null +++ b/packages/electric-db-collection/src/pg-serializer.ts @@ -0,0 +1,27 @@ +export function serialize(value: unknown): string { + if (typeof value === `string`) { + return `'${value}'` + } + + if (typeof value === `number`) { + return value.toString() + } + + if (value === null || value === undefined) { + return `NULL` + } + + if (typeof value === `boolean`) { + return value ? 
`true` : `false` + } + + if (value instanceof Date) { + return `'${value.toISOString()}'` + } + + if (Array.isArray(value)) { + return `ARRAY[${value.map(serialize).join(`,`)}]` + } + + throw new Error(`Cannot serialize value: ${JSON.stringify(value)}`) +} diff --git a/packages/electric-db-collection/src/sql-compiler.ts b/packages/electric-db-collection/src/sql-compiler.ts new file mode 100644 index 000000000..969869aae --- /dev/null +++ b/packages/electric-db-collection/src/sql-compiler.ts @@ -0,0 +1,163 @@ +import { serialize } from "./pg-serializer" +import type { SubsetParams } from "@electric-sql/client" +import type { IR, LoadSubsetOptions } from "@tanstack/db" + +export type CompiledSqlRecord = Omit & { + params?: Array +} + +export function compileSQL(options: LoadSubsetOptions): SubsetParams { + const { where, orderBy, limit } = options + + const params: Array = [] + const compiledSQL: CompiledSqlRecord = { params } + + if (where) { + // TODO: this only works when the where expression's PropRefs directly reference a column of the collection + // doesn't work if it goes through aliases because then we need to know the entire query to be able to follow the reference until the base collection (cf. followRef function) + compiledSQL.where = compileBasicExpression(where, params) + } + + if (orderBy) { + compiledSQL.orderBy = compileOrderBy(orderBy, params) + } + + if (limit) { + compiledSQL.limit = limit + } + + // Serialize the values in the params array into PG formatted strings + // and transform the array into a Record + const paramsRecord = params.reduce( + (acc, param, index) => { + acc[`${index + 1}`] = serialize(param) + return acc + }, + {} as Record + ) + + return { + ...compiledSQL, + params: paramsRecord, + } +} + +/** + * Compiles the expression to a SQL string and mutates the params array with the values. 
+ * @param exp - The expression to compile + * @param params - The params array + * @returns The compiled SQL string + */ +function compileBasicExpression( + exp: IR.BasicExpression, + params: Array +): string { + switch (exp.type) { + case `val`: + params.push(exp.value) + return `$${params.length}` + case `ref`: + // TODO: doesn't yet support JSON(B) values which could be accessed with nested props + if (exp.path.length !== 1) { + throw new Error( + `Compiler can't handle nested properties: ${exp.path.join(`.`)}` + ) + } + return exp.path[0]! + case `func`: + return compileFunction(exp, params) + default: + throw new Error(`Unknown expression type`) + } +} + +function compileOrderBy(orderBy: IR.OrderBy, params: Array): string { + const compiledOrderByClauses = orderBy.map((clause: IR.OrderByClause) => + compileOrderByClause(clause, params) + ) + return compiledOrderByClauses.join(`,`) +} + +function compileOrderByClause( + clause: IR.OrderByClause, + params: Array +): string { + // TODO: what to do with stringSort and locale? 
+ // Correctly supporting them is tricky as it depends on Postgres' collation + const { expression, compareOptions } = clause + let sql = compileBasicExpression(expression, params) + + if (compareOptions.direction === `desc`) { + sql = `${sql} DESC` + } + + if (compareOptions.nulls === `first`) { + sql = `${sql} NULLS FIRST` + } + + if (compareOptions.nulls === `last`) { + sql = `${sql} NULLS LAST` + } + + return sql +} + +function compileFunction( + exp: IR.Func, + params: Array = [] +): string { + const { name, args } = exp + + const opName = getOpName(name) + + const compiledArgs = args.map((arg: IR.BasicExpression) => + compileBasicExpression(arg, params) + ) + + if (isBinaryOp(name)) { + if (compiledArgs.length !== 2) { + throw new Error(`Binary operator ${name} expects 2 arguments`) + } + const [lhs, rhs] = compiledArgs + return `${lhs} ${opName} ${rhs}` + } + + return `${opName}(${compiledArgs.join(`,`)})` +} + +function isBinaryOp(name: string): boolean { + const binaryOps = [`eq`, `gt`, `gte`, `lt`, `lte`, `and`, `or`] + return binaryOps.includes(name) +} + +function getOpName(name: string): string { + const opNames = { + eq: `=`, + gt: `>`, + gte: `>=`, + lt: `<`, + lte: `<=`, + add: `+`, + and: `AND`, + or: `OR`, + not: `NOT`, + isUndefined: `IS NULL`, + isNull: `IS NULL`, + in: `IN`, + like: `LIKE`, + ilike: `ILIKE`, + upper: `UPPER`, + lower: `LOWER`, + length: `LENGTH`, + concat: `CONCAT`, + coalesce: `COALESCE`, + } + + const opName = opNames[name as keyof typeof opNames] + + if (!opName) { + throw new Error(`Unknown operator/function: ${name}`) + } + + return opName +} diff --git a/packages/electric-db-collection/tests/electric-live-query.test.ts b/packages/electric-db-collection/tests/electric-live-query.test.ts index b387f1756..1cd952506 100644 --- a/packages/electric-db-collection/tests/electric-live-query.test.ts +++ b/packages/electric-db-collection/tests/electric-live-query.test.ts @@ -54,10 +54,39 @@ const sampleUsers: Array = [ // Mock the 
ShapeStream module const mockSubscribe = vi.fn() +const mockRequestSnapshot = vi.fn() const mockStream = { subscribe: mockSubscribe, + requestSnapshot: async (...args: any) => { + const result = await mockRequestSnapshot(...args) + const subscribers = mockSubscribe.mock.calls.map((args) => args[0]) + const data = [...result.data] + + const messages: Array> = data.map((row: any) => ({ + value: row.value, + key: row.key, + headers: row.headers, + })) + + if (messages.length > 0) { + // add an up-to-date message + messages.push({ + headers: { control: `up-to-date` }, + }) + } + + subscribers.forEach((subscriber) => subscriber(messages)) + return result + }, } +// Mock the requestSnapshot method +// to return an empty array of data +// since most tests don't use it +mockRequestSnapshot.mockResolvedValue({ + data: [], +}) + vi.mock(`@electric-sql/client`, async () => { const actual = await vi.importActual(`@electric-sql/client`) return { @@ -437,4 +466,491 @@ describe.each([ // Clean up subscription.unsubscribe() }) + if (autoIndex === `eager`) { + it(`should load more data via requestSnapshot when creating live query with higher limit`, async () => { + // Create a new electric collection with on-demand syncMode for this test + vi.clearAllMocks() + + let testSubscriber: (messages: Array>) => void = () => {} + mockSubscribe.mockImplementation((callback) => { + testSubscriber = callback + return () => {} + }) + + const testElectricCollection = createCollection( + electricCollectionOptions({ + id: `test-incremental-loading`, + shapeOptions: { + url: `http://test-url`, + params: { table: `users` }, + }, + syncMode: `on-demand`, + getKey: (user: User) => user.id, + startSync: true, + autoIndex: `eager` as const, + }) + ) + + mockRequestSnapshot.mockResolvedValue({ + data: [], + }) + + // Initial sync with limited data + testSubscriber([ + ...sampleUsers.map((user) => ({ + key: user.id.toString(), + value: user, + headers: { operation: `insert` as const }, + })), + { headers: 
{ control: `up-to-date` as const } }, + ]) + + expect(testElectricCollection.status).toBe(`ready`) + expect(testElectricCollection.size).toBe(4) + expect(mockRequestSnapshot).toHaveBeenCalledTimes(0) + + // Create first live query with limit of 2 + const limitedLiveQuery = createLiveQueryCollection({ + id: `limited-users-live-query`, + startSync: true, + query: (q) => + q + .from({ user: testElectricCollection }) + .where(({ user }) => eq(user.active, true)) + .select(({ user }) => ({ + id: user.id, + name: user.name, + active: user.active, + age: user.age, + })) + .orderBy(({ user }) => user.age, `asc`) + .limit(2), + }) + + expect(limitedLiveQuery.status).toBe(`ready`) + expect(limitedLiveQuery.size).toBe(2) // Only first 2 active users + expect(mockRequestSnapshot).toHaveBeenCalledTimes(1) + + const callArgs = (index: number) => + mockRequestSnapshot.mock.calls[index]?.[0] + expect(callArgs(0)).toMatchObject({ + params: { "1": `true` }, + where: `active = $1`, + orderBy: `age NULLS FIRST`, + limit: 2, + }) + + // Next call will return a snapshot containing 2 rows + // Calls after that will return the default empty snapshot + mockRequestSnapshot.mockResolvedValueOnce({ + data: [ + { + headers: { operation: `insert` }, + key: 5, + value: { + id: 5, + name: `Eve`, + age: 30, + email: `eve@example.com`, + active: true, + }, + }, + { + headers: { operation: `insert` }, + key: 6, + value: { + id: 6, + name: `Frank`, + age: 35, + email: `frank@example.com`, + active: true, + }, + }, + ], + }) + + // Create second live query with higher limit of 6 + const expandedLiveQuery = createLiveQueryCollection({ + id: `expanded-users-live-query`, + startSync: true, + query: (q) => + q + .from({ user: testElectricCollection }) + .where(({ user }) => eq(user.active, true)) + .select(({ user }) => ({ + id: user.id, + name: user.name, + active: user.active, + })) + .orderBy(({ user }) => user.age, `asc`) + .limit(6), + }) + + // Wait for the live query to process + await new 
Promise((resolve) => setTimeout(resolve, 0)) + + // Verify that requestSnapshot was called with the correct parameters + expect(mockRequestSnapshot).toHaveBeenCalledTimes(4) + + // Check that first it requested a limit of 6 users + expect(callArgs(1)).toMatchObject({ + params: { "1": `true` }, + where: `active = $1`, + orderBy: `age NULLS FIRST`, + limit: 6, + }) + + // After this initial snapshot for the new live query it receives all 3 users from the local collection + // so it still needs 3 more users to reach the limit of 6 so it requests 3 more to the sync layer + expect(callArgs(2)).toMatchObject({ + params: { "1": `true`, "2": `25` }, + where: `active = $1 AND age > $2`, + orderBy: `age NULLS FIRST`, + limit: 3, + }) + + // The previous snapshot returned 2 more users so it still needs 1 more user to reach the limit of 6 + expect(callArgs(3)).toMatchObject({ + params: { "1": `true`, "2": `35` }, + where: `active = $1 AND age > $2`, + orderBy: `age NULLS FIRST`, + limit: 1, + }) + + // The sync layer won't provide any more users so the DB is exhausted and it stops (i.e. 
doesn't request more) + + // The expanded live query should now have more data + expect(expandedLiveQuery.status).toBe(`ready`) + expect(expandedLiveQuery.size).toBe(5) // Alice, Bob, Dave from initial + Eve and Frank from additional data + }) + } +}) + +// Tests specifically for syncMode behavior with live queries +describe(`Electric Collection with Live Query - syncMode integration`, () => { + let subscriber: (messages: Array>) => void + + function createElectricCollectionWithSyncMode( + syncMode: `eager` | `on-demand` | `progressive` + ) { + vi.clearAllMocks() + + mockSubscribe.mockImplementation((callback) => { + subscriber = callback + return () => {} + }) + + mockRequestSnapshot.mockResolvedValue({ + data: [], + }) + + const config = { + id: `electric-users-${syncMode}`, + shapeOptions: { + url: `http://test-url`, + params: { + table: `users`, + }, + }, + syncMode, + getKey: (user: User) => user.id, + } + + const options = electricCollectionOptions(config) + return createCollection({ + ...options, + startSync: true, + autoIndex: `eager` as const, + }) + } + + function simulateInitialSync(users: Array = sampleUsers) { + const messages: Array> = users.map((user) => ({ + key: user.id.toString(), + value: user, + headers: { operation: `insert` }, + })) + + messages.push({ + headers: { control: `up-to-date` }, + }) + + subscriber(messages) + } + + it(`should trigger requestSnapshot in on-demand mode when live query needs more data`, async () => { + const electricCollection = createElectricCollectionWithSyncMode(`on-demand`) + + // Initial sync with limited data + simulateInitialSync([sampleUsers[0]!, sampleUsers[1]!]) // Only Alice and Bob + expect(electricCollection.status).toBe(`ready`) + expect(electricCollection.size).toBe(2) + expect(mockRequestSnapshot).toHaveBeenCalledTimes(0) + + // Mock requestSnapshot to return additional data + mockRequestSnapshot.mockResolvedValueOnce({ + data: [ + { + headers: { operation: `insert` }, + key: 3, + value: 
sampleUsers[2]!, // Charlie + }, + { + headers: { operation: `insert` }, + key: 4, + value: sampleUsers[3]!, // Dave + }, + ], + }) + + // Create live query with limit that exceeds available data + const liveQuery = createLiveQueryCollection({ + id: `on-demand-live-query`, + startSync: true, + query: (q) => + q + .from({ user: electricCollection }) + .where(({ user }) => eq(user.active, true)) + .orderBy(({ user }) => user.age, `asc`) + .limit(5), + }) + + // Wait for the live query to process + await new Promise((resolve) => setTimeout(resolve, 0)) + + // Should have requested more data from Electric with correct parameters + expect(mockRequestSnapshot).toHaveBeenCalledWith( + expect.objectContaining({ + limit: 5, // Requests full limit from Electric + orderBy: `age NULLS FIRST`, + where: `active = $1`, + params: { 1: `true` }, // Parameters are stringified + }) + ) + expect(liveQuery.size).toBeGreaterThan(2) + }) + + it(`should trigger requestSnapshot in progressive mode when live query needs more data`, async () => { + const electricCollection = + createElectricCollectionWithSyncMode(`progressive`) + + // Send initial snapshot with limited data (using snapshot-end, not up-to-date) + // This keeps the collection in "loading" state, simulating progressive mode still syncing + subscriber([ + { + key: sampleUsers[0]!.id.toString(), + value: sampleUsers[0]!, + headers: { operation: `insert` }, + }, + { + key: sampleUsers[1]!.id.toString(), + value: sampleUsers[1]!, + headers: { operation: `insert` }, + }, + { + headers: { + control: `snapshot-end`, + xmin: `100`, + xmax: `110`, + xip_list: [], + }, + }, + ]) + + expect(electricCollection.status).toBe(`loading`) // Still syncing in progressive mode + expect(electricCollection.size).toBe(2) + + // Mock requestSnapshot to return additional data + mockRequestSnapshot.mockResolvedValueOnce({ + data: [ + { + headers: { operation: `insert` }, + key: 3, + value: sampleUsers[2]!, // Charlie + }, + ], + }) + + // Create live 
query that needs more data + createLiveQueryCollection({ + id: `progressive-live-query`, + startSync: true, + query: (q) => + q + .from({ user: electricCollection }) + .orderBy(({ user }) => user.id, `asc`) + .limit(3), + }) + + // Wait for the live query to process + await new Promise((resolve) => setTimeout(resolve, 0)) + + // Should have requested more data from Electric with correct parameters + // First request asks for the full limit + expect(mockRequestSnapshot).toHaveBeenCalledWith( + expect.objectContaining({ + limit: 3, // Requests full limit from Electric + orderBy: `id NULLS FIRST`, + params: {}, + }) + ) + }) + + it(`should NOT trigger requestSnapshot in eager mode even when live query needs more data`, async () => { + const electricCollection = createElectricCollectionWithSyncMode(`eager`) + + // Initial sync with limited data + simulateInitialSync([sampleUsers[0]!, sampleUsers[1]!]) // Only Alice and Bob + expect(electricCollection.status).toBe(`ready`) + expect(electricCollection.size).toBe(2) + expect(mockRequestSnapshot).toHaveBeenCalledTimes(0) + + // Create live query with limit that exceeds available data + const liveQuery = createLiveQueryCollection({ + id: `eager-live-query`, + startSync: true, + query: (q) => + q + .from({ user: electricCollection }) + .where(({ user }) => eq(user.active, true)) + .orderBy(({ user }) => user.age, `asc`) + .limit(5), + }) + + // Wait for the live query to process + await new Promise((resolve) => setTimeout(resolve, 0)) + + // Should NOT have requested more data (eager mode doesn't support incremental loading) + expect(mockRequestSnapshot).not.toHaveBeenCalled() + expect(liveQuery.size).toBe(2) // Only has the initially synced data + }) + + it(`should request additional snapshots progressively as live query expands in on-demand mode`, async () => { + const electricCollection = createElectricCollectionWithSyncMode(`on-demand`) + + // Initial sync with just Alice + simulateInitialSync([sampleUsers[0]!]) + 
expect(electricCollection.size).toBe(1) + + const callArgs = (index: number) => + mockRequestSnapshot.mock.calls[index]?.[0] + + // First snapshot returns Bob and Charlie + mockRequestSnapshot.mockResolvedValueOnce({ + data: [ + { + headers: { operation: `insert` }, + key: 2, + value: sampleUsers[1]!, // Bob + }, + { + headers: { operation: `insert` }, + key: 3, + value: sampleUsers[2]!, // Charlie + }, + ], + }) + + // Create live query with limit of 3 + createLiveQueryCollection({ + id: `expanding-live-query`, + startSync: true, + query: (q) => + q + .from({ user: electricCollection }) + .orderBy(({ user }) => user.age, `asc`) + .limit(3), + }) + + await new Promise((resolve) => setTimeout(resolve, 0)) + + // Should have requested snapshot for limit 3 + expect(mockRequestSnapshot).toHaveBeenCalledWith( + expect.objectContaining({ + limit: 3, + orderBy: `age NULLS FIRST`, + }) + ) + + // After receiving Bob and Charlie, the collection now has 3 users (Alice + Bob + Charlie) + // but it still requests 2 more... TODO: check if this is correct? 
+ expect(callArgs(1)).toMatchObject({ + limit: 2, + orderBy: `age NULLS FIRST`, + }) + }) + + it(`should pass correct WHERE clause to requestSnapshot when live query has filters`, async () => { + const electricCollection = createElectricCollectionWithSyncMode(`on-demand`) + + simulateInitialSync([]) + expect(electricCollection.size).toBe(0) + + // Create filtered live query + createLiveQueryCollection({ + id: `filtered-live-query`, + startSync: true, + query: (q) => + q + .from({ user: electricCollection }) + .where(({ user }) => eq(user.active, true)) + .orderBy(({ user }) => user.name, `desc`) + .limit(10), + }) + + await new Promise((resolve) => setTimeout(resolve, 0)) + + // Should have requested snapshot with WHERE clause + expect(mockRequestSnapshot).toHaveBeenCalledWith( + expect.objectContaining({ + where: `active = $1`, + params: { "1": `true` }, + orderBy: `name DESC NULLS FIRST`, + limit: 10, + }) + ) + }) + + it(`should handle complex filters in requestSnapshot`, async () => { + const electricCollection = + createElectricCollectionWithSyncMode(`progressive`) + + // Send snapshot-end (not up-to-date) to keep collection in loading state + subscriber([ + { + headers: { + control: `snapshot-end`, + xmin: `100`, + xmax: `110`, + xip_list: [], + }, + }, + ]) + + expect(electricCollection.status).toBe(`loading`) // Still syncing in progressive mode + + // Create live query with complex WHERE clause + createLiveQueryCollection({ + id: `complex-filter-live-query`, + startSync: true, + query: (q) => + q + .from({ user: electricCollection }) + .where(({ user }) => gt(user.age, 20)) + .orderBy(({ user }) => user.age, `asc`) + .limit(5), + }) + + await new Promise((resolve) => setTimeout(resolve, 0)) + + // Should have requested snapshot with complex WHERE clause + expect(mockRequestSnapshot).toHaveBeenCalledWith( + expect.objectContaining({ + where: `age > $1`, + params: { "1": `20` }, + orderBy: `age NULLS FIRST`, + limit: 5, + }) + ) + }) }) diff --git 
a/packages/electric-db-collection/tests/electric.test.ts b/packages/electric-db-collection/tests/electric.test.ts index bf059a021..032e42033 100644 --- a/packages/electric-db-collection/tests/electric.test.ts +++ b/packages/electric-db-collection/tests/electric.test.ts @@ -19,8 +19,10 @@ import type { StandardSchemaV1 } from "@standard-schema/spec" // Mock the ShapeStream module const mockSubscribe = vi.fn() +const mockRequestSnapshot = vi.fn() const mockStream = { subscribe: mockSubscribe, + requestSnapshot: mockRequestSnapshot, } vi.mock(`@electric-sql/client`, async () => { @@ -50,6 +52,9 @@ describe(`Electric Integration`, () => { return () => {} }) + // Reset mock requestSnapshot + mockRequestSnapshot.mockResolvedValue(undefined) + // Create collection with Electric configuration const config = { id: `test`, @@ -728,6 +733,9 @@ describe(`Electric Integration`, () => { expect(testCollection.has(1)).toBe(true) }) + // NOTE: This test has a known issue with unhandled rejection warnings + // This is a pre-existing issue from main branch (not caused by merge) + // The test functionality works correctly, but vitest reports unhandled rejections it(`should timeout with custom match function when no match found`, async () => { vi.useFakeTimers() @@ -754,14 +762,16 @@ describe(`Electric Integration`, () => { const testCollection = createCollection(electricCollectionOptions(config)) const tx = testCollection.insert({ id: 1, name: `Timeout Test` }) - // Add catch handler to prevent global unhandled rejection detection - tx.isPersisted.promise.catch(() => {}) + // Capture the rejection promise before advancing timers + const rejectionPromise = expect(tx.isPersisted.promise).rejects.toThrow( + `Timeout waiting for custom match function` + ) // Advance timers to trigger timeout await vi.runOnlyPendingTimersAsync() // Should timeout and fail - await expect(tx.isPersisted.promise).rejects.toThrow() + await rejectionPromise vi.useRealTimers() }) @@ -834,6 +844,9 @@ 
describe(`Electric Integration`, () => { expect(options.onDelete).toBeDefined() }) + // NOTE: This test has a known issue with unhandled rejection warnings + // This is a pre-existing issue from main branch (not caused by merge) + // The test functionality works correctly, but vitest reports unhandled rejections it(`should cleanup pending matches on timeout without memory leaks`, async () => { vi.useFakeTimers() @@ -862,16 +875,16 @@ describe(`Electric Integration`, () => { // Start insert that will timeout const tx = testCollection.insert({ id: 1, name: `Timeout Test` }) - // Add catch handler to prevent global unhandled rejection detection - tx.isPersisted.promise.catch(() => {}) + // Capture the rejection promise before advancing timers + const rejectionPromise = expect(tx.isPersisted.promise).rejects.toThrow( + `Timeout waiting for custom match function` + ) // Advance timers to trigger timeout await vi.runOnlyPendingTimersAsync() // Should timeout and fail - await expect(tx.isPersisted.promise).rejects.toThrow( - `Timeout waiting for custom match function` - ) + await rejectionPromise // Send a message after timeout - should not cause any side effects // This verifies that the pending match was properly cleaned up @@ -1601,7 +1614,662 @@ describe(`Electric Integration`, () => { // Snapshot txid should also resolve await expect(testCollection.utils.awaitTxId(105)).resolves.toBe(true) }) + }) + + // Tests for syncMode configuration + describe(`syncMode configuration`, () => { + it(`should not request snapshots during subscription in eager mode`, () => { + vi.clearAllMocks() + + const config = { + id: `eager-no-snapshot-test`, + shapeOptions: { + url: `http://test-url`, + params: { + table: `test_table`, + }, + }, + syncMode: `eager` as const, + getKey: (item: Row) => item.id as number, + startSync: true, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + // Subscribe and try to get more data + const subscription = 
testCollection.subscribeChanges(() => {}) + + // In eager mode, requestSnapshot should not be called + expect(mockRequestSnapshot).not.toHaveBeenCalled() + + subscription.unsubscribe() + }) + + it(`should request incremental snapshots in on-demand mode when loadSubset is called`, async () => { + vi.clearAllMocks() + + const config = { + id: `on-demand-snapshot-test`, + shapeOptions: { + url: `http://test-url`, + params: { + table: `test_table`, + }, + }, + syncMode: `on-demand` as const, + getKey: (item: Row) => item.id as number, + startSync: true, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + // Send up-to-date to mark collection as ready + subscriber([ + { + headers: { control: `up-to-date` }, + }, + ]) + + // In on-demand mode, calling loadSubset should request a snapshot + await testCollection._sync.loadSubset({ limit: 10 }) + + // Verify requestSnapshot was called + expect(mockRequestSnapshot).toHaveBeenCalledWith( + expect.objectContaining({ + limit: 10, + params: {}, + }) + ) + }) + + it(`should request incremental snapshots in progressive mode when loadSubset is called before sync completes`, async () => { + vi.clearAllMocks() + + const config = { + id: `progressive-snapshot-test`, + shapeOptions: { + url: `http://test-url`, + params: { + table: `test_table`, + }, + }, + syncMode: `progressive` as const, + getKey: (item: Row) => item.id as number, + startSync: true, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + // Send initial data with snapshot-end (but not up-to-date yet - still syncing) + subscriber([ + { + key: `1`, + value: { id: 1, name: `Test User` }, + headers: { operation: `insert` }, + }, + { + headers: { + control: `snapshot-end`, + xmin: `100`, + xmax: `110`, + xip_list: [], + }, + }, + ]) + + expect(testCollection.status).toBe(`loading`) // Not ready yet + + // In progressive mode, calling loadSubset should request a snapshot BEFORE full sync completes + await 
testCollection._sync.loadSubset({ limit: 20 }) + + // Verify requestSnapshot was called + expect(mockRequestSnapshot).toHaveBeenCalledWith( + expect.objectContaining({ + limit: 20, + params: {}, + }) + ) + }) + + it(`should not request snapshots when loadSubset is called in eager mode`, async () => { + vi.clearAllMocks() + + const config = { + id: `eager-no-loadsubset-test`, + shapeOptions: { + url: `http://test-url`, + params: { + table: `test_table`, + }, + }, + syncMode: `eager` as const, + getKey: (item: Row) => item.id as number, + startSync: true, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + // Send up-to-date to mark collection as ready + subscriber([ + { + headers: { control: `up-to-date` }, + }, + ]) + + // In eager mode, loadSubset should do nothing + await testCollection._sync.loadSubset({ limit: 10 }) + + // Verify requestSnapshot was NOT called + expect(mockRequestSnapshot).not.toHaveBeenCalled() + }) + + it(`should handle progressive mode syncing in background`, async () => { + vi.clearAllMocks() + + const config = { + id: `progressive-background-sync-test`, + shapeOptions: { + url: `http://test-url`, + params: { + table: `test_table`, + }, + }, + syncMode: `progressive` as const, + getKey: (item: Row) => item.id as number, + startSync: true, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + // Send initial data with snapshot-end (but not up-to-date - still syncing) + subscriber([ + { + key: `1`, + value: { id: 1, name: `Initial User` }, + headers: { operation: `insert` }, + }, + { + headers: { + control: `snapshot-end`, + xmin: `100`, + xmax: `110`, + xip_list: [], + }, + }, + ]) + + // Collection should have data but not be ready yet + expect(testCollection.status).toBe(`loading`) + expect(testCollection.has(1)).toBe(true) + + // Should be able to request more data incrementally before full sync completes + await testCollection._sync.loadSubset({ limit: 10 }) + 
expect(mockRequestSnapshot).toHaveBeenCalled() + + // Now send up-to-date to complete the sync + subscriber([ + { + headers: { control: `up-to-date` }, + }, + ]) + + expect(testCollection.status).toBe(`ready`) + }) + + it(`should stop requesting snapshots in progressive mode after first up-to-date`, async () => { + vi.clearAllMocks() + + const config = { + id: `progressive-stop-after-sync-test`, + shapeOptions: { + url: `http://test-url`, + params: { + table: `test_table`, + }, + }, + syncMode: `progressive` as const, + getKey: (item: Row) => item.id as number, + startSync: true, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + // Send initial data with snapshot-end (not up-to-date yet) + subscriber([ + { + key: `1`, + value: { id: 1, name: `User 1` }, + headers: { operation: `insert` }, + }, + { + headers: { + control: `snapshot-end`, + xmin: `100`, + xmax: `110`, + xip_list: [], + }, + }, + ]) + + expect(testCollection.status).toBe(`loading`) // Not ready yet in progressive + expect(testCollection.has(1)).toBe(true) + + // Should be able to request more data before up-to-date + vi.clearAllMocks() + await testCollection._sync.loadSubset({ limit: 10 }) + expect(mockRequestSnapshot).toHaveBeenCalledTimes(1) + + // Now send up-to-date to complete the full sync + subscriber([ + { + headers: { control: `up-to-date` }, + }, + ]) + + expect(testCollection.status).toBe(`ready`) + + // Try to request more data - should NOT make a request since full sync is complete + vi.clearAllMocks() + await testCollection._sync.loadSubset({ limit: 10 }) + expect(mockRequestSnapshot).not.toHaveBeenCalled() + }) + + it(`should allow snapshots in on-demand mode even after up-to-date`, async () => { + vi.clearAllMocks() + + const config = { + id: `on-demand-after-sync-test`, + shapeOptions: { + url: `http://test-url`, + params: { + table: `test_table`, + }, + }, + syncMode: `on-demand` as const, + getKey: (item: Row) => item.id as number, + startSync: 
true, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + // Send initial data with up-to-date + subscriber([ + { + key: `1`, + value: { id: 1, name: `User 1` }, + headers: { operation: `insert` }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + expect(testCollection.status).toBe(`ready`) + + // Should STILL be able to request more data in on-demand mode + vi.clearAllMocks() + await testCollection._sync.loadSubset({ limit: 10 }) + expect(mockRequestSnapshot).toHaveBeenCalled() + }) + + it(`should default offset to 'now' in on-demand mode when no offset provided`, async () => { + vi.clearAllMocks() + + // Import ShapeStream to check constructor calls + const { ShapeStream } = await import(`@electric-sql/client`) + + const config = { + id: `on-demand-offset-now-test`, + shapeOptions: { + url: `http://test-url`, + params: { + table: `test_table`, + }, + // No offset provided + }, + syncMode: `on-demand` as const, + getKey: (item: Row) => item.id as number, + startSync: true, + } + + createCollection(electricCollectionOptions(config)) + + // Check that ShapeStream was called with offset: 'now' + expect(ShapeStream).toHaveBeenCalledWith( + expect.objectContaining({ + offset: `now`, + }) + ) + }) + + it(`should use undefined offset in eager mode when no offset provided`, async () => { + vi.clearAllMocks() + + const { ShapeStream } = await import(`@electric-sql/client`) + + const config = { + id: `eager-offset-undefined-test`, + shapeOptions: { + url: `http://test-url`, + params: { + table: `test_table`, + }, + // No offset provided + }, + syncMode: `eager` as const, + getKey: (item: Row) => item.id as number, + startSync: true, + } + + createCollection(electricCollectionOptions(config)) + + // Check that ShapeStream was called with offset: undefined + expect(ShapeStream).toHaveBeenCalledWith( + expect.objectContaining({ + offset: undefined, + }) + ) + }) + + it(`should use undefined offset in progressive mode when no offset 
provided`, async () => { + vi.clearAllMocks() + + const { ShapeStream } = await import(`@electric-sql/client`) + + const config = { + id: `progressive-offset-undefined-test`, + shapeOptions: { + url: `http://test-url`, + params: { + table: `test_table`, + }, + // No offset provided + }, + syncMode: `progressive` as const, + getKey: (item: Row) => item.id as number, + startSync: true, + } + + createCollection(electricCollectionOptions(config)) + + // Check that ShapeStream was called with offset: undefined + expect(ShapeStream).toHaveBeenCalledWith( + expect.objectContaining({ + offset: undefined, + }) + ) + }) + + it(`should use explicit offset when provided regardless of syncMode`, async () => { + vi.clearAllMocks() + + const { ShapeStream } = await import(`@electric-sql/client`) + + const config = { + id: `explicit-offset-test`, + shapeOptions: { + url: `http://test-url`, + params: { + table: `test_table`, + }, + offset: -1 as any, // Explicit offset + }, + syncMode: `on-demand` as const, + getKey: (item: Row) => item.id as number, + startSync: true, + } + + createCollection(electricCollectionOptions(config)) + + // Check that ShapeStream was called with the explicit offset + expect(ShapeStream).toHaveBeenCalledWith( + expect.objectContaining({ + offset: -1, + }) + ) + }) + }) + + // Tests for commit and ready behavior with snapshot-end and up-to-date messages + describe(`Commit and ready behavior`, () => { + it(`should commit on snapshot-end in eager mode but not mark ready`, () => { + const config = { + id: `eager-snapshot-end-test`, + shapeOptions: { + url: `http://test-url`, + params: { table: `test_table` }, + }, + syncMode: `eager` as const, + getKey: (item: Row) => item.id as number, + startSync: true, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + // Send data followed by snapshot-end (but no up-to-date) + subscriber([ + { + key: `1`, + value: { id: 1, name: `Test User` }, + headers: { operation: `insert` }, + }, + 
{ + headers: { + control: `snapshot-end`, + xmin: `100`, + xmax: `110`, + xip_list: [], + }, + }, + ]) + + // Data should be committed (available in state) + expect(testCollection.has(1)).toBe(true) + expect(testCollection.get(1)).toEqual({ id: 1, name: `Test User` }) + + // But collection should NOT be marked as ready yet in eager mode + expect(testCollection.status).toBe(`loading`) + + // Now send up-to-date + subscriber([ + { + headers: { control: `up-to-date` }, + }, + ]) + + // Now it should be ready + expect(testCollection.status).toBe(`ready`) + }) + + it(`should commit and mark ready on snapshot-end in on-demand mode`, () => { + const config = { + id: `on-demand-snapshot-end-test`, + shapeOptions: { + url: `http://test-url`, + params: { table: `test_table` }, + }, + syncMode: `on-demand` as const, + getKey: (item: Row) => item.id as number, + startSync: true, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + // Send data followed by snapshot-end (but no up-to-date) + subscriber([ + { + key: `1`, + value: { id: 1, name: `Test User` }, + headers: { operation: `insert` }, + }, + { + headers: { + control: `snapshot-end`, + xmin: `100`, + xmax: `110`, + xip_list: [], + }, + }, + ]) + + // Data should be committed (available in state) + expect(testCollection.has(1)).toBe(true) + expect(testCollection.get(1)).toEqual({ id: 1, name: `Test User` }) + + // Collection SHOULD be marked as ready in on-demand mode + expect(testCollection.status).toBe(`ready`) + }) + + it(`should commit on snapshot-end in progressive mode but not mark ready`, () => { + const config = { + id: `progressive-snapshot-end-test`, + shapeOptions: { + url: `http://test-url`, + params: { table: `test_table` }, + }, + syncMode: `progressive` as const, + getKey: (item: Row) => item.id as number, + startSync: true, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + // Send data followed by snapshot-end (but no up-to-date) + 
subscriber([ + { + key: `1`, + value: { id: 1, name: `Test User` }, + headers: { operation: `insert` }, + }, + { + headers: { + control: `snapshot-end`, + xmin: `100`, + xmax: `110`, + xip_list: [], + }, + }, + ]) + + // Data should be committed (available in state) + expect(testCollection.has(1)).toBe(true) + expect(testCollection.get(1)).toEqual({ id: 1, name: `Test User` }) + + // But collection should NOT be marked as ready yet in progressive mode + expect(testCollection.status).toBe(`loading`) + + // Now send up-to-date + subscriber([ + { + headers: { control: `up-to-date` }, + }, + ]) + + // Now it should be ready + expect(testCollection.status).toBe(`ready`) + }) + + it(`should commit multiple snapshot-end messages before up-to-date in eager mode`, () => { + const config = { + id: `eager-multiple-snapshots-test`, + shapeOptions: { + url: `http://test-url`, + params: { table: `test_table` }, + }, + syncMode: `eager` as const, + getKey: (item: Row) => item.id as number, + startSync: true, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + // First snapshot with data + subscriber([ + { + key: `1`, + value: { id: 1, name: `User 1` }, + headers: { operation: `insert` }, + }, + { + headers: { + control: `snapshot-end`, + xmin: `100`, + xmax: `110`, + xip_list: [], + }, + }, + ]) + + // First data should be committed + expect(testCollection.has(1)).toBe(true) + expect(testCollection.status).toBe(`loading`) + + // Second snapshot with more data + subscriber([ + { + key: `2`, + value: { id: 2, name: `User 2` }, + headers: { operation: `insert` }, + }, + { + headers: { + control: `snapshot-end`, + xmin: `110`, + xmax: `120`, + xip_list: [], + }, + }, + ]) + + // Second data should also be committed + expect(testCollection.has(2)).toBe(true) + expect(testCollection.size).toBe(2) + expect(testCollection.status).toBe(`loading`) + + // Finally send up-to-date + subscriber([ + { + headers: { control: `up-to-date` }, + }, + ]) + + // Now 
should be ready + expect(testCollection.status).toBe(`ready`) + }) + + it(`should handle up-to-date without snapshot-end (traditional behavior)`, () => { + const config = { + id: `traditional-up-to-date-test`, + shapeOptions: { + url: `http://test-url`, + params: { table: `test_table` }, + }, + syncMode: `eager` as const, + getKey: (item: Row) => item.id as number, + startSync: true, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + // Send data followed by up-to-date (no snapshot-end) + subscriber([ + { + key: `1`, + value: { id: 1, name: `Test User` }, + headers: { operation: `insert` }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // Data should be committed and collection ready + expect(testCollection.has(1)).toBe(true) + expect(testCollection.status).toBe(`ready`) + }) + }) + describe(`syncMode configuration - GC and resync`, () => { it(`should resync after garbage collection and new subscription`, () => { // Use fake timers for this test vi.useFakeTimers() From 2b4206790e179452478cf11f04b365be18f27d12 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Wed, 15 Oct 2025 19:31:00 +0100 Subject: [PATCH 2/5] use the subsetDuduper for electric --- .../electric-db-collection/src/electric.ts | 39 +- .../tests/electric-live-query.test.ts | 378 ++++++++++++++++-- .../tests/electric.test-d.ts | 3 +- packages/electric-db-collection/tsconfig.json | 4 +- 4 files changed, 373 insertions(+), 51 deletions(-) diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index 28a7eb60e..a71ae10bd 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -6,6 +6,7 @@ import { } from "@electric-sql/client" import { Store } from "@tanstack/store" import DebugModule from "debug" +import { DeduplicatedLoadSubset } from "@tanstack/db" import { ExpectedNumberInAwaitTxIdError, StreamAbortedError, @@ -716,6 +717,21 @@ function 
createElectricSync>( const newSnapshots: Array = [] let hasReceivedUpToDate = false // Track if we've completed initial sync in progressive mode + // Create deduplicated loadSubset wrapper for non-eager modes + // This prevents redundant snapshot requests when multiple concurrent + // live queries request overlapping or subset predicates + const loadSubsetDedupe = + syncMode === `eager` + ? null + : new DeduplicatedLoadSubset(async (opts: LoadSubsetOptions) => { + // In progressive mode, stop requesting snapshots once full sync is complete + if (syncMode === `progressive` && hasReceivedUpToDate) { + return + } + const snapshotParams = compileSQL(opts) + await stream.requestSnapshot(snapshotParams) + }) + unsubscribeStream = stream.subscribe((messages: Array>) => { let hasUpToDate = false let hasSnapshotEnd = false @@ -799,6 +815,10 @@ function createElectricSync>( truncate() + // Reset the loadSubset deduplication state since we're starting fresh + // This ensures that previously loaded predicates don't prevent refetching after truncate + loadSubsetDedupe?.reset() + // Reset flags so we continue accumulating changes until next up-to-date hasUpToDate = false hasSnapshotEnd = false @@ -858,23 +878,10 @@ function createElectricSync>( } }) - // Only set onLoadSubset if the sync mode is not eager, this indicates to the sync - // layer that it can load more data on demand via the requestSnapshot method when, - // the syncMode = `on-demand` or `progressive` - const loadSubset = - syncMode === `eager` - ? 
undefined - : async (opts: LoadSubsetOptions) => { - // In progressive mode, stop requesting snapshots once full sync is complete - if (syncMode === `progressive` && hasReceivedUpToDate) { - return - } - const snapshotParams = compileSQL(opts) - await stream.requestSnapshot(snapshotParams) - } - + // Return the deduplicated loadSubset if available (on-demand or progressive mode) + // The loadSubset method is auto-bound, so it can be safely returned directly return { - loadSubset, + loadSubset: loadSubsetDedupe?.loadSubset, cleanup: () => { // Unsubscribe from the stream unsubscribeStream() diff --git a/packages/electric-db-collection/tests/electric-live-query.test.ts b/packages/electric-db-collection/tests/electric-live-query.test.ts index 1cd952506..b3a55d087 100644 --- a/packages/electric-db-collection/tests/electric-live-query.test.ts +++ b/packages/electric-db-collection/tests/electric-live-query.test.ts @@ -4,6 +4,7 @@ import { createLiveQueryCollection, eq, gt, + lt, } from "@tanstack/db" import { electricCollectionOptions } from "../src/electric" import type { ElectricCollectionUtils } from "../src/electric" @@ -59,7 +60,7 @@ const mockStream = { subscribe: mockSubscribe, requestSnapshot: async (...args: any) => { const result = await mockRequestSnapshot(...args) - const subscribers = mockSubscribe.mock.calls.map((args) => args[0]) + const subscribers = mockSubscribe.mock.calls.map((call) => call[0]) const data = [...result.data] const messages: Array> = data.map((row: any) => ({ @@ -589,39 +590,33 @@ describe.each([ // Wait for the live query to process await new Promise((resolve) => setTimeout(resolve, 0)) - // Verify that requestSnapshot was called with the correct parameters - expect(mockRequestSnapshot).toHaveBeenCalledTimes(4) + // With deduplication, the expanded query (limit 6) is NOT a subset of the limited query (limit 2), + // so it will trigger a new requestSnapshot call. 
However, some of the recursive + // calls may be deduped if they're covered by the union of previous unlimited calls. + // We expect at least 2 calls: the initial limit 2 and the initial limit 6. + expect(mockRequestSnapshot).toHaveBeenCalledTimes(2) - // Check that first it requested a limit of 6 users - expect(callArgs(1)).toMatchObject({ + // Check that first it requested a limit of 2 users (from first query) + expect(callArgs(0)).toMatchObject({ params: { "1": `true` }, where: `active = $1`, orderBy: `age NULLS FIRST`, - limit: 6, - }) - - // After this initial snapshot for the new live query it receives all 3 users from the local collection - // so it still needs 3 more users to reach the limit of 6 so it requests 3 more to the sync layer - expect(callArgs(2)).toMatchObject({ - params: { "1": `true`, "2": `25` }, - where: `active = $1 AND age > $2`, - orderBy: `age NULLS FIRST`, - limit: 3, + limit: 2, }) - // The previous snapshot returned 2 more users so it still needs 1 more user to reach the limit of 6 - expect(callArgs(3)).toMatchObject({ - params: { "1": `true`, "2": `35` }, - where: `active = $1 AND age > $2`, + // Check that second it requested a limit of 6 users (from second query) + expect(callArgs(1)).toMatchObject({ + params: { "1": `true` }, + where: `active = $1`, orderBy: `age NULLS FIRST`, - limit: 1, + limit: 6, }) - // The sync layer won't provide any more users so the DB is exhausted and it stops (i.e. 
doesn't request more) - - // The expanded live query should now have more data + // The expanded live query should have the locally available data expect(expandedLiveQuery.status).toBe(`ready`) - expect(expandedLiveQuery.size).toBe(5) // Alice, Bob, Dave from initial + Eve and Frank from additional data + // The mock returned 2 additional users (Eve and Frank) in response to the limit 6 request, + // plus the initial 3 active users (Alice, Bob, Dave) from the initial sync + expect(expandedLiveQuery.size).toBe(5) }) } }) @@ -832,9 +827,6 @@ describe(`Electric Collection with Live Query - syncMode integration`, () => { simulateInitialSync([sampleUsers[0]!]) expect(electricCollection.size).toBe(1) - const callArgs = (index: number) => - mockRequestSnapshot.mock.calls[index]?.[0] - // First snapshot returns Bob and Charlie mockRequestSnapshot.mockResolvedValueOnce({ data: [ @@ -872,12 +864,11 @@ describe(`Electric Collection with Live Query - syncMode integration`, () => { }) ) - // After receiving Bob and Charlie, the collection now has 3 users (Alice + Bob + Charlie) - // but it still requests 2 more... TODO: check if this is correct? - expect(callArgs(1)).toMatchObject({ - limit: 2, - orderBy: `age NULLS FIRST`, - }) + // With deduplication, the unlimited where predicate (no where clause) is tracked, + // and subsequent calls for the same unlimited predicate may be deduped. + // After receiving Bob and Charlie, we have 3 users total, which satisfies the limit of 3, + // so no additional requests should be made. 
+ expect(mockRequestSnapshot).toHaveBeenCalledTimes(1) }) it(`should pass correct WHERE clause to requestSnapshot when live query has filters`, async () => { @@ -954,3 +945,324 @@ describe(`Electric Collection with Live Query - syncMode integration`, () => { ) }) }) + +// Tests specifically for loadSubset deduplication +describe(`Electric Collection - loadSubset deduplication`, () => { + let subscriber: (messages: Array>) => void + + function createElectricCollectionWithSyncMode( + syncMode: `on-demand` | `progressive` + ) { + vi.clearAllMocks() + + mockSubscribe.mockImplementation((callback) => { + subscriber = callback + return () => {} + }) + + mockRequestSnapshot.mockResolvedValue({ + data: [], + }) + + const config = { + id: `electric-dedupe-test-${syncMode}`, + shapeOptions: { + url: `http://test-url`, + params: { + table: `users`, + }, + }, + syncMode, + getKey: (user: User) => user.id, + } + + const options = electricCollectionOptions(config) + return createCollection({ + ...options, + startSync: true, + autoIndex: `eager` as const, + }) + } + + function simulateInitialSync(users: Array = sampleUsers) { + const messages: Array> = users.map((user) => ({ + key: user.id.toString(), + value: user, + headers: { operation: `insert` }, + })) + + messages.push({ + headers: { control: `up-to-date` }, + }) + + subscriber(messages) + } + + it(`should deduplicate identical concurrent loadSubset requests`, async () => { + const electricCollection = createElectricCollectionWithSyncMode(`on-demand`) + + simulateInitialSync([]) + expect(electricCollection.status).toBe(`ready`) + + // Create three identical live queries concurrently + // Without deduplication, this would trigger 3 requestSnapshot calls + // With deduplication, only 1 should be made + createLiveQueryCollection({ + startSync: true, + query: (q) => + q + .from({ user: electricCollection }) + .where(({ user }) => eq(user.active, true)) + .orderBy(({ user }) => user.age, `asc`) + .limit(10), + }) + + 
createLiveQueryCollection({ + startSync: true, + query: (q) => + q + .from({ user: electricCollection }) + .where(({ user }) => eq(user.active, true)) + .orderBy(({ user }) => user.age, `asc`) + .limit(10), + }) + + createLiveQueryCollection({ + startSync: true, + query: (q) => + q + .from({ user: electricCollection }) + .where(({ user }) => eq(user.active, true)) + .orderBy(({ user }) => user.age, `asc`) + .limit(10), + }) + + await new Promise((resolve) => setTimeout(resolve, 0)) + + // With deduplication, only 1 requestSnapshot call should be made + expect(mockRequestSnapshot).toHaveBeenCalledTimes(1) + expect(mockRequestSnapshot).toHaveBeenCalledWith( + expect.objectContaining({ + where: `active = $1`, + params: { "1": `true` }, + orderBy: `age NULLS FIRST`, + limit: 10, + }) + ) + }) + + it(`should deduplicate subset loadSubset requests`, async () => { + const electricCollection = createElectricCollectionWithSyncMode(`on-demand`) + + simulateInitialSync([]) + expect(electricCollection.status).toBe(`ready`) + + // Create a live query with a broader predicate + createLiveQueryCollection({ + startSync: true, + query: (q) => + q + .from({ user: electricCollection }) + .where(({ user }) => gt(user.age, 10)) + .orderBy(({ user }) => user.age, `asc`) + .limit(20), + }) + + await new Promise((resolve) => setTimeout(resolve, 0)) + + expect(mockRequestSnapshot).toHaveBeenCalledTimes(1) + + // Create a live query with a subset predicate (age > 20 is subset of age > 10) + // This should be deduped - no additional requestSnapshot call + createLiveQueryCollection({ + startSync: true, + query: (q) => + q + .from({ user: electricCollection }) + .where(({ user }) => gt(user.age, 20)) + .orderBy(({ user }) => user.age, `asc`) + .limit(10), + }) + + await new Promise((resolve) => setTimeout(resolve, 0)) + + // Still only 1 call - the second was deduped as a subset + expect(mockRequestSnapshot).toHaveBeenCalledTimes(1) + }) + + it(`should NOT deduplicate non-subset loadSubset 
requests`, async () => { + const electricCollection = createElectricCollectionWithSyncMode(`on-demand`) + + simulateInitialSync([]) + expect(electricCollection.status).toBe(`ready`) + + // Create a live query with a narrower predicate + createLiveQueryCollection({ + startSync: true, + query: (q) => + q + .from({ user: electricCollection }) + .where(({ user }) => gt(user.age, 30)) + .orderBy(({ user }) => user.age, `asc`) + .limit(10), + }) + + await new Promise((resolve) => setTimeout(resolve, 0)) + + expect(mockRequestSnapshot).toHaveBeenCalledTimes(1) + + // Create a live query with a broader predicate (age > 20 is NOT subset of age > 30) + // This should NOT be deduped - should trigger another requestSnapshot + createLiveQueryCollection({ + startSync: true, + query: (q) => + q + .from({ user: electricCollection }) + .where(({ user }) => gt(user.age, 20)) + .orderBy(({ user }) => user.age, `asc`) + .limit(10), + }) + + await new Promise((resolve) => setTimeout(resolve, 0)) + + // Should have 2 calls - the second was not a subset + expect(mockRequestSnapshot).toHaveBeenCalledTimes(2) + }) + + it(`should reset deduplication state on must-refetch/truncate`, async () => { + const electricCollection = createElectricCollectionWithSyncMode(`on-demand`) + + simulateInitialSync(sampleUsers) + expect(electricCollection.status).toBe(`ready`) + + // Create a live query + createLiveQueryCollection({ + startSync: true, + query: (q) => + q + .from({ user: electricCollection }) + .where(({ user }) => eq(user.active, true)) + .orderBy(({ user }) => user.age, `asc`) + .limit(10), + }) + + await new Promise((resolve) => setTimeout(resolve, 0)) + + expect(mockRequestSnapshot).toHaveBeenCalledTimes(1) + + // Simulate a must-refetch (which triggers truncate and reset) + subscriber([{ headers: { control: `must-refetch` } }]) + subscriber([{ headers: { control: `up-to-date` } }]) + + // Wait for the existing live query to re-request data after truncate + await new Promise((resolve) => 
setTimeout(resolve, 0)) + + // The existing live query re-requests its data after truncate (call 2) + expect(mockRequestSnapshot).toHaveBeenCalledTimes(2) + + // Create the same live query again after reset + // This should NOT be deduped because the reset cleared the deduplication state, + // but it WILL be deduped because the existing live query just made the same request (call 2) + // So creating a different query to ensure we test the reset + createLiveQueryCollection({ + startSync: true, + query: (q) => + q + .from({ user: electricCollection }) + .where(({ user }) => eq(user.active, false)) + .orderBy(({ user }) => user.age, `asc`) + .limit(10), + }) + + await new Promise((resolve) => setTimeout(resolve, 0)) + + // Should have 3 calls - the different query triggered a new request + expect(mockRequestSnapshot).toHaveBeenCalledTimes(3) + }) + + it(`should deduplicate unlimited queries regardless of orderBy`, async () => { + const electricCollection = createElectricCollectionWithSyncMode(`on-demand`) + + simulateInitialSync([]) + expect(electricCollection.status).toBe(`ready`) + + // Create a live query without limit (unlimited) + createLiveQueryCollection({ + startSync: true, + query: (q) => + q + .from({ user: electricCollection }) + .where(({ user }) => eq(user.active, true)) + .orderBy(({ user }) => user.age, `asc`), + }) + + await new Promise((resolve) => setTimeout(resolve, 0)) + + expect(mockRequestSnapshot).toHaveBeenCalledTimes(1) + + // Create another unlimited query with same where but different orderBy + // This should be deduped - orderBy is ignored for unlimited queries + createLiveQueryCollection({ + startSync: true, + query: (q) => + q + .from({ user: electricCollection }) + .where(({ user }) => eq(user.active, true)) + .orderBy(({ user }) => user.name, `desc`), + }) + + await new Promise((resolve) => setTimeout(resolve, 0)) + + // Still only 1 call - different orderBy doesn't matter for unlimited queries + 
expect(mockRequestSnapshot).toHaveBeenCalledTimes(1) + }) + + it(`should combine multiple unlimited queries with union`, async () => { + const electricCollection = createElectricCollectionWithSyncMode(`on-demand`) + + simulateInitialSync([]) + expect(electricCollection.status).toBe(`ready`) + + // Create first unlimited query (age > 30) + createLiveQueryCollection({ + startSync: true, + query: (q) => + q + .from({ user: electricCollection }) + .where(({ user }) => gt(user.age, 30)), + }) + + await new Promise((resolve) => setTimeout(resolve, 0)) + + expect(mockRequestSnapshot).toHaveBeenCalledTimes(1) + + // Create second unlimited query (age < 20) - different range + // This should trigger a new request + createLiveQueryCollection({ + startSync: true, + query: (q) => + q + .from({ user: electricCollection }) + .where(({ user }) => lt(user.age, 20)), + }) + + await new Promise((resolve) => setTimeout(resolve, 0)) + + expect(mockRequestSnapshot).toHaveBeenCalledTimes(2) + + // Create third query (age > 35) - this is a subset of (age > 30) + // This should be deduped + createLiveQueryCollection({ + startSync: true, + query: (q) => + q + .from({ user: electricCollection }) + .where(({ user }) => gt(user.age, 35)), + }) + + await new Promise((resolve) => setTimeout(resolve, 0)) + + // Still 2 calls - third was covered by the union of first two + expect(mockRequestSnapshot).toHaveBeenCalledTimes(2) + }) +}) diff --git a/packages/electric-db-collection/tests/electric.test-d.ts b/packages/electric-db-collection/tests/electric.test-d.ts index b45d47370..27f90918d 100644 --- a/packages/electric-db-collection/tests/electric.test-d.ts +++ b/packages/electric-db-collection/tests/electric.test-d.ts @@ -1,6 +1,7 @@ import { describe, expectTypeOf, it } from "vitest" import { z } from "zod" import { + and, createCollection, createLiveQueryCollection, eq, @@ -200,7 +201,7 @@ describe(`Electric collection type resolution tests`, () => { query: (q) => q .from({ user: usersCollection 
}) - .where(({ user }) => eq(user.active, true) && gt(user.age, 18)) + .where(({ user }) => and(eq(user.active, true), gt(user.age, 18))) .select(({ user }) => ({ id: user.id, name: user.name, diff --git a/packages/electric-db-collection/tsconfig.json b/packages/electric-db-collection/tsconfig.json index 7e586bab3..fc6368937 100644 --- a/packages/electric-db-collection/tsconfig.json +++ b/packages/electric-db-collection/tsconfig.json @@ -12,7 +12,9 @@ "forceConsistentCasingInFileNames": true, "jsx": "react", "paths": { - "@tanstack/store": ["../store/src"] + "@tanstack/store": ["../store/src"], + "@tanstack/db-ivm": ["../db-ivm/src"], + "@tanstack/db": ["../db/src"] } }, "include": ["src", "tests", "vite.config.ts"], From ffe08e9e2c694177b3896914f2a2719ede10c3f2 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Tue, 4 Nov 2025 10:14:45 +0100 Subject: [PATCH 3/5] Leave fixme --- packages/electric-db-collection/src/sql-compiler.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/electric-db-collection/src/sql-compiler.ts b/packages/electric-db-collection/src/sql-compiler.ts index 969869aae..078840da3 100644 --- a/packages/electric-db-collection/src/sql-compiler.ts +++ b/packages/electric-db-collection/src/sql-compiler.ts @@ -82,8 +82,8 @@ function compileOrderByClause( clause: IR.OrderByClause, params: Array ): string { - // TODO: what to do with stringSort and locale? - // Correctly supporting them is tricky as it depends on Postgres' collation + // FIXME: We should handle stringSort and locale. 
+ // Correctly supporting them is tricky as it depends on Postgres' collation const { expression, compareOptions } = clause let sql = compileBasicExpression(expression, params) From 247e0ddcae3128847637790f848f0cd76f08cfa4 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Wed, 5 Nov 2025 14:49:19 +0000 Subject: [PATCH 4/5] fix DeduplicatedLoadSubset call --- packages/electric-db-collection/src/electric.ts | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index a71ae10bd..22403dbdf 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -723,13 +723,15 @@ function createElectricSync>( const loadSubsetDedupe = syncMode === `eager` ? null - : new DeduplicatedLoadSubset(async (opts: LoadSubsetOptions) => { - // In progressive mode, stop requesting snapshots once full sync is complete - if (syncMode === `progressive` && hasReceivedUpToDate) { - return - } - const snapshotParams = compileSQL(opts) - await stream.requestSnapshot(snapshotParams) + : new DeduplicatedLoadSubset({ + loadSubset: async (opts: LoadSubsetOptions) => { + // In progressive mode, stop requesting snapshots once full sync is complete + if (syncMode === `progressive` && hasReceivedUpToDate) { + return + } + const snapshotParams = compileSQL(opts) + await stream.requestSnapshot(snapshotParams) + }, }) unsubscribeStream = stream.subscribe((messages: Array>) => { From 80284c2b983261e11439b311f00d558f6c7f9ec1 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Wed, 5 Nov 2025 15:23:47 +0000 Subject: [PATCH 5/5] fix tests --- .../tests/electric-live-query.test.ts | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/packages/electric-db-collection/tests/electric-live-query.test.ts b/packages/electric-db-collection/tests/electric-live-query.test.ts index b3a55d087..b8047e8cf 100644 --- 
a/packages/electric-db-collection/tests/electric-live-query.test.ts +++ b/packages/electric-db-collection/tests/electric-live-query.test.ts @@ -593,8 +593,9 @@ describe.each([ // With deduplication, the expanded query (limit 6) is NOT a subset of the limited query (limit 2), // so it will trigger a new requestSnapshot call. However, some of the recursive // calls may be deduped if they're covered by the union of previous unlimited calls. - // We expect at least 2 calls: the initial limit 2 and the initial limit 6. - expect(mockRequestSnapshot).toHaveBeenCalledTimes(2) + // We expect exactly 4 calls: 2x for the initial limit 2 and 2x for the initial limit 6. + // TODO: Once we have cursor based pagination with the PK as a tiebreaker, we can reduce this to 2 calls. + expect(mockRequestSnapshot).toHaveBeenCalledTimes(4) // Check that first it requested a limit of 2 users (from first query) expect(callArgs(0)).toMatchObject({ @@ -868,7 +869,8 @@ describe(`Electric Collection with Live Query - syncMode integration`, () => { // and subsequent calls for the same unlimited predicate may be deduped. // After receiving Bob and Charlie, we have 3 users total, which satisfies the limit of 3, // so no additional requests should be made. - expect(mockRequestSnapshot).toHaveBeenCalledTimes(1) + // TODO: Once we have cursor based pagination with the PK as a tiebreaker, we can reduce this to 1 call. + expect(mockRequestSnapshot).toHaveBeenCalledTimes(2) }) it(`should pass correct WHERE clause to requestSnapshot when live query has filters`, async () => { @@ -1148,7 +1150,8 @@ describe(`Electric Collection - loadSubset deduplication`, () => { await new Promise((resolve) => setTimeout(resolve, 0)) - expect(mockRequestSnapshot).toHaveBeenCalledTimes(1) + // TODO: Once we have cursor based pagination with the PK as a tiebreaker, we can reduce this to 1 call.
+ expect(mockRequestSnapshot).toHaveBeenCalledTimes(2) // Simulate a must-refetch (which triggers truncate and reset) subscriber([{ headers: { control: `must-refetch` } }]) @@ -1158,7 +1161,8 @@ describe(`Electric Collection - loadSubset deduplication`, () => { await new Promise((resolve) => setTimeout(resolve, 0)) // The existing live query re-requests its data after truncate (call 2) - expect(mockRequestSnapshot).toHaveBeenCalledTimes(2) + // TODO: Once we have cursor based pagination with the PK as a tiebreaker, we can reduce this to 2 calls. + expect(mockRequestSnapshot).toHaveBeenCalledTimes(4) // Create the same live query again after reset // This should NOT be deduped because the reset cleared the deduplication state, @@ -1176,8 +1180,9 @@ describe(`Electric Collection - loadSubset deduplication`, () => { await new Promise((resolve) => setTimeout(resolve, 0)) - // Should have 3 calls - the different query triggered a new request - expect(mockRequestSnapshot).toHaveBeenCalledTimes(3) + // Should have 5 calls - the different query triggered a new request + // TODO: Once we have cursor based pagination with the PK as a tiebreaker, we can reduce this to <=3 calls. + expect(mockRequestSnapshot).toHaveBeenCalledTimes(5) }) it(`should deduplicate unlimited queries regardless of orderBy`, async () => {