From 657817064b4edbb1dbe243b91198f90661ae660c Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 1 Dec 2025 14:57:33 +0000 Subject: [PATCH 1/6] feat(db): add `where` callback option to subscribeChanges Add a `where` callback option to `SubscribeChangesOptions` that allows filtering changes using query builder functions (eq, gt, and, etc.) with a row proxy, instead of requiring manual IR construction with PropRef. Before: ```ts import { eq, PropRef } from "@tanstack/db" collection.subscribeChanges(callback, { whereExpression: eq(new PropRef(["status"]), "active") }) ``` After: ```ts import { eq } from "@tanstack/db" collection.subscribeChanges(callback, { where: (row) => eq(row.status, "active") }) ``` Changes: - Add generic `where` callback to `SubscribeChangesOptions` - Compile `where` callback to `whereExpression` in CollectionChangesManager - Update JSDoc examples to show correct usage - Add tests for single and multiple condition filtering --- packages/db/src/collection/changes.ts | 33 ++-- packages/db/src/collection/index.ts | 15 +- packages/db/src/types.ts | 39 +++-- .../collection-subscribe-changes.test.ts | 162 +++++++++++++++++- 4 files changed, 216 insertions(+), 33 deletions(-) diff --git a/packages/db/src/collection/changes.ts b/packages/db/src/collection/changes.ts index a69400375..d0e7a1b26 100644 --- a/packages/db/src/collection/changes.ts +++ b/packages/db/src/collection/changes.ts @@ -1,11 +1,15 @@ -import { NegativeActiveSubscribersError } from '../errors' -import { CollectionSubscription } from './subscription.js' -import type { StandardSchemaV1 } from '@standard-schema/spec' -import type { ChangeMessage, SubscribeChangesOptions } from '../types' -import type { CollectionLifecycleManager } from './lifecycle.js' -import type { CollectionSyncManager } from './sync.js' -import type { CollectionEventsManager } from './events.js' -import type { CollectionImpl } from './index.js' +import { NegativeActiveSubscribersError } from "../errors" +import { + createSingleRowRefProxy, + toExpression, +} from "../query/builder/ref-proxy.js" +import { CollectionSubscription } from "./subscription.js" +import type { StandardSchemaV1 } from "@standard-schema/spec" +import type { ChangeMessage, SubscribeChangesOptions } from "../types" +import type { CollectionLifecycleManager } from "./lifecycle.js" +import type { CollectionSyncManager } from "./sync.js" +import type { CollectionEventsManager } from "./events.js" +import type { CollectionImpl } from "./index.js" export class CollectionChangesManager< TOutput extends object = Record, @@ -94,13 +98,22 @@ export class CollectionChangesManager< */ public subscribeChanges( callback: (changes: Array>) => void, - options: SubscribeChangesOptions = {}, + options: SubscribeChangesOptions = {} ): CollectionSubscription { // Start sync and track subscriber this.addSubscriber() + // Compile where callback to whereExpression if provided + let whereExpression = options.whereExpression + if (options.where && !whereExpression) { + const proxy = createSingleRowRefProxy() + const result = options.where(proxy) + whereExpression = toExpression(result) + } + const subscription = new CollectionSubscription(this.collection, callback, { - ...options, + includeInitialState: options.includeInitialState, + whereExpression, onUnsubscribe: () => { this.removeSubscriber() this.changeSubscriptions.delete(subscription) diff --git a/packages/db/src/collection/index.ts b/packages/db/src/collection/index.ts index ecd8d70bf..e30b4d4a9 100644 --- a/packages/db/src/collection/index.ts +++ b/packages/db/src/collection/index.ts @@ -849,26 +849,29 @@ export class CollectionImpl< * }, { includeInitialState: true }) * * @example - * // Subscribe only to changes matching a condition + * // Subscribe only to changes matching a condition using where callback + * import { eq } from "@tanstack/db" + * * const subscription = collection.subscribeChanges((changes) => { * updateUI(changes) * }, { * includeInitialState: true, - * where: (row) => row.status === 'active' + * where: (row) => eq(row.status, "active") * }) * * @example - * // Subscribe using a pre-compiled expression + * // Using multiple conditions with and() + * import { and, eq, gt } from "@tanstack/db" + * * const subscription = collection.subscribeChanges((changes) => { * updateUI(changes) * }, { - * includeInitialState: true, - * whereExpression: eq(row.status, 'active') + * where: (row) => and(eq(row.status, "active"), gt(row.priority, 5)) * }) */ public subscribeChanges( callback: (changes: Array>) => void, - options: SubscribeChangesOptions = {}, + options: SubscribeChangesOptions = {} ): CollectionSubscription { return this._changes.subscribeChanges(callback, options) } diff --git a/packages/db/src/types.ts b/packages/db/src/types.ts index bb78f35eb..002772352 100644 --- a/packages/db/src/types.ts +++ b/packages/db/src/types.ts @@ -1,9 +1,10 @@ -import type { IStreamBuilder } from '@tanstack/db-ivm' -import type { Collection } from './collection/index.js' -import type { StandardSchemaV1 } from '@standard-schema/spec' -import type { Transaction } from './transactions' -import type { BasicExpression, OrderBy } from './query/ir.js' -import type { EventEmitter } from './event-emitter.js' +import type { IStreamBuilder } from "@tanstack/db-ivm" +import type { Collection } from "./collection/index.js" +import type { StandardSchemaV1 } from "@standard-schema/spec" +import type { Transaction } from "./transactions" +import type { BasicExpression, OrderBy } from "./query/ir.js" +import type { EventEmitter } from "./event-emitter.js" +import type { SingleRowRefProxy } from "./query/builder/ref-proxy.js" /** * Interface for a collection-like object that provides the necessary methods @@ -775,17 +776,33 @@ export type NamespacedAndKeyedStream = IStreamBuilder /** * Options for subscribing to collection changes */ -export interface SubscribeChangesOptions { +export interface SubscribeChangesOptions< + T extends object = Record, +> { /** Whether to include the current state as initial changes */ includeInitialState?: boolean + /** + * Callback function for filtering changes using a row proxy. + * The callback receives a proxy object that records property access, + * allowing you to use query builder functions like `eq`, `gt`, etc. + * + * @example + * ```ts + * import { eq } from "@tanstack/db" + * + * collection.subscribeChanges(callback, { + * where: (row) => eq(row.status, "active") + * }) + * ``` + */ + where?: (row: SingleRowRefProxy) => any /** Pre-compiled expression for filtering changes */ whereExpression?: BasicExpression } -export interface SubscribeChangesSnapshotOptions extends Omit< - SubscribeChangesOptions, - `includeInitialState` -> { +export interface SubscribeChangesSnapshotOptions< + T extends object = Record, +> extends Omit, `includeInitialState`> { orderBy?: OrderBy limit?: number } diff --git a/packages/db/tests/collection-subscribe-changes.test.ts b/packages/db/tests/collection-subscribe-changes.test.ts index f3f4b1cce..41437d208 100644 --- a/packages/db/tests/collection-subscribe-changes.test.ts +++ b/packages/db/tests/collection-subscribe-changes.test.ts @@ -1,9 +1,9 @@ -import { describe, expect, it, vi } from 'vitest' -import mitt from 'mitt' -import { createCollection } from '../src/collection/index.js' -import { createTransaction } from '../src/transactions' -import { eq } from '../src/query/builder/functions' -import { PropRef } from '../src/query/ir' +import { describe, expect, it, vi } from "vitest" +import mitt from "mitt" +import { createCollection } from "../src/collection/index.js" +import { createTransaction } from "../src/transactions" +import { and, eq, gt } from "../src/query/builder/functions" +import { PropRef } from "../src/query/ir" import type { ChangeMessage, ChangesPayload, @@ -1916,4 +1916,154 @@ describe(`Collection.subscribeChanges`, () => { expect(collection.status).toBe(`ready`) expect(collection.size).toBe(2) }) + + it(`should support where callback for filtering changes`, () => { + const callback = vi.fn() + + // Create collection with items that have a status field + const collection = createCollection<{ + id: number + value: string + status: `active` | `inactive` + }>({ + id: `where-callback-test`, + getKey: (item) => item.id, + sync: { + sync: ({ begin, write, commit }) => { + // Start with some initial data + begin() + write({ + type: `insert`, + value: { id: 1, value: `item1`, status: `inactive` }, + }) + write({ + type: `insert`, + value: { id: 2, value: `item2`, status: `active` }, + }) + commit() + }, + }, + }) + + const mutationFn: MutationFn = async () => {} + + // Subscribe to changes with a where callback for active items only + const subscription = collection.subscribeChanges(callback, { + includeInitialState: true, + where: (row) => eq(row.status, `active`), + }) + + // Should only receive the active item in initial state + expect(callback).toHaveBeenCalledTimes(1) + const initialChanges = callback.mock.calls[0]![0] as ChangesPayload<{ + id: number + value: string + status: `active` | `inactive` + }> + expect(initialChanges).toHaveLength(1) + expect(initialChanges[0]!.key).toBe(2) + expect(initialChanges[0]!.type).toBe(`insert`) + + // Reset mock + callback.mockReset() + + // Update an inactive item to active (should emit insert) + const tx1 = createTransaction({ mutationFn }) + tx1.mutate(() => + collection.update(1, (draft) => { + draft.status = `active` + }) + ) + + // Should emit an insert event for the newly active item + expect(callback).toHaveBeenCalledTimes(1) + const insertChanges = callback.mock.calls[0]![0] as ChangesPayload<{ + id: number + value: string + status: `active` | `inactive` + }> + expect(insertChanges).toHaveLength(1) + expect(insertChanges[0]!.type).toBe(`insert`) + expect(insertChanges[0]!.key).toBe(1) + expect(insertChanges[0]!.value.status).toBe(`active`) + + // Reset mock + callback.mockReset() + + // Update an active item to inactive (should emit delete) + const tx2 = createTransaction({ mutationFn }) + tx2.mutate(() => + collection.update(2, (draft) => { + draft.status = `inactive` + }) + ) + + // Should emit a delete event for the newly inactive item + expect(callback).toHaveBeenCalledTimes(1) + const deleteChanges = callback.mock.calls[0]![0] as ChangesPayload<{ + id: number + value: string + status: `active` | `inactive` + }> + expect(deleteChanges).toHaveLength(1) + expect(deleteChanges[0]!.type).toBe(`delete`) + expect(deleteChanges[0]!.key).toBe(2) + + // Clean up + subscription.unsubscribe() + }) + + it(`should support where callback with multiple conditions`, () => { + const callback = vi.fn() + + // Create collection with items + const collection = createCollection<{ + id: number + value: string + status: `active` | `inactive` + priority: number + }>({ + id: `where-callback-and-test`, + getKey: (item) => item.id, + sync: { + sync: ({ begin, write, commit }) => { + begin() + write({ + type: `insert`, + value: { id: 1, value: `item1`, status: `active`, priority: 3 }, + }) + write({ + type: `insert`, + value: { id: 2, value: `item2`, status: `active`, priority: 8 }, + }) + write({ + type: `insert`, + value: { id: 3, value: `item3`, status: `inactive`, priority: 10 }, + }) + commit() + }, + }, + }) + + // Subscribe with where callback using and() for multiple conditions + const subscription = collection.subscribeChanges(callback, { + includeInitialState: true, + where: (row) => and(eq(row.status, `active`), gt(row.priority, 5)), + }) + + // Should only receive item2 (active AND priority > 5) + expect(callback).toHaveBeenCalledTimes(1) + const initialChanges = callback.mock.calls[0]![0] as ChangesPayload + expect(initialChanges).toHaveLength(1) + expect(initialChanges[0]!.key).toBe(2) + expect(initialChanges[0]!.value).toEqual({ + id: 2, + value: `item2`, + status: `active`, + priority: 8, + }) + + // Clean up + subscription.unsubscribe() + }) }) From 3c101ecd946c4617a14808435030f50da1a53631 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 1 Dec 2025 15:26:00 +0000 Subject: [PATCH 2/6] chore: add changeset for where callback feature --- .../where-callback-subscribe-changes.md | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 .changeset/where-callback-subscribe-changes.md diff --git a/.changeset/where-callback-subscribe-changes.md b/.changeset/where-callback-subscribe-changes.md new file mode 100644 index 000000000..d701e292f --- /dev/null +++ b/.changeset/where-callback-subscribe-changes.md @@ -0,0 +1,21 @@ +--- +"@tanstack/db": minor +--- + +Add `where` callback option to `subscribeChanges` for ergonomic filtering + +Instead of manually constructing IR with `PropRef`: +```ts +import { eq, PropRef } from "@tanstack/db" +collection.subscribeChanges(callback, { + whereExpression: eq(new PropRef(["status"]), "active") +}) +``` + +You can now use a callback with query builder functions: +```ts +import { eq } from "@tanstack/db" +collection.subscribeChanges(callback, { + where: (row) => eq(row.status, "active") +}) +``` From 682ffef4e37ca17045c90624b0243b49b9724d46 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 1 Dec 2025 15:28:39 +0000 Subject: [PATCH 3/6] chore: fix changeset to patch and format --- .changeset/where-callback-subscribe-changes.md | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/.changeset/where-callback-subscribe-changes.md b/.changeset/where-callback-subscribe-changes.md index d701e292f..86027ff39 100644 --- a/.changeset/where-callback-subscribe-changes.md +++ b/.changeset/where-callback-subscribe-changes.md @@ -1,21 +1,23 @@ --- -"@tanstack/db": minor +"@tanstack/db": patch --- Add `where` callback option to `subscribeChanges` for ergonomic filtering Instead of manually constructing IR with `PropRef`: + ```ts import { eq, PropRef } from "@tanstack/db" collection.subscribeChanges(callback, { - whereExpression: eq(new PropRef(["status"]), "active") + whereExpression: eq(new PropRef(["status"]), "active"), }) ``` You can now use a callback with query builder functions: + ```ts import { eq } from "@tanstack/db" collection.subscribeChanges(callback, { - where: (row) => eq(row.status, "active") + where: (row) => eq(row.status, "active"), }) ``` From 9225ffd17bd951a022e6f2230b6b0550cef99db7 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 1 Dec 2025 15:37:19 +0000 Subject: [PATCH 4/6] feat(db): throw error if both where and whereExpression provided --- packages/db/src/collection/changes.ts | 8 +++++++- .../db/tests/collection-subscribe-changes.test.ts | 15 +++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/packages/db/src/collection/changes.ts b/packages/db/src/collection/changes.ts index d0e7a1b26..5dfc3e67b 100644 --- a/packages/db/src/collection/changes.ts +++ b/packages/db/src/collection/changes.ts @@ -104,8 +104,14 @@ export class CollectionChangesManager< this.addSubscriber() // Compile where callback to whereExpression if provided + if (options.where && options.whereExpression) { + throw new Error( + `Cannot specify both 'where' and 'whereExpression' options. Use one or the other.` + ) + } + let whereExpression = options.whereExpression - if (options.where && !whereExpression) { + if (options.where) { const proxy = createSingleRowRefProxy() const result = options.where(proxy) whereExpression = toExpression(result) diff --git a/packages/db/tests/collection-subscribe-changes.test.ts b/packages/db/tests/collection-subscribe-changes.test.ts index 41437d208..4dfab648d 100644 --- a/packages/db/tests/collection-subscribe-changes.test.ts +++ b/packages/db/tests/collection-subscribe-changes.test.ts @@ -2066,4 +2066,19 @@ describe(`Collection.subscribeChanges`, () => { // Clean up subscription.unsubscribe() }) + + it(`should throw if both where and whereExpression are provided`, () => { + const collection = createCollection<{ id: number; status: string }>({ + id: `where-both-error-test`, + getKey: (item) => item.id, + sync: { sync: () => {} }, + }) + + expect(() => { + collection.subscribeChanges(() => {}, { + where: (row) => eq(row.status, `active`), + whereExpression: eq(new PropRef([`status`]), `active`), + }) + }).toThrow(`Cannot specify both 'where' and 'whereExpression' options`) + }) }) From 589477faeba8088d01cc17a433c90d32965f84e0 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Wed, 7 Jan 2026 10:19:58 +0000 Subject: [PATCH 5/6] ci: apply automated fixes --- .../where-callback-subscribe-changes.md | 10 ++++----- packages/db/src/collection/changes.ts | 22 +++++++++---------- packages/db/src/collection/index.ts | 2 +- packages/db/src/types.ts | 14 ++++++------ .../collection-subscribe-changes.test.ts | 16 +++++++------- 5 files changed, 32 insertions(+), 32 deletions(-) diff --git a/.changeset/where-callback-subscribe-changes.md b/.changeset/where-callback-subscribe-changes.md index 86027ff39..687d36e33 100644 --- a/.changeset/where-callback-subscribe-changes.md +++ b/.changeset/where-callback-subscribe-changes.md @@ -1,5 +1,5 @@ --- -"@tanstack/db": patch +'@tanstack/db': patch --- Add `where` callback option to `subscribeChanges` for ergonomic filtering @@ -7,17 +7,17 @@ Add `where` callback option to `subscribeChanges` for ergonomic filtering Instead of manually constructing IR with `PropRef`: ```ts -import { eq, PropRef } from "@tanstack/db" +import { eq, PropRef } from '@tanstack/db' collection.subscribeChanges(callback, { - whereExpression: eq(new PropRef(["status"]), "active"), + whereExpression: eq(new PropRef(['status']), 'active'), }) ``` You can now use a callback with query builder functions: ```ts -import { eq } from "@tanstack/db" +import { eq } from '@tanstack/db' collection.subscribeChanges(callback, { - where: (row) => eq(row.status, "active"), + where: (row) => eq(row.status, 'active'), }) ``` diff --git a/packages/db/src/collection/changes.ts b/packages/db/src/collection/changes.ts index 5dfc3e67b..32e417784 100644 --- a/packages/db/src/collection/changes.ts +++ b/packages/db/src/collection/changes.ts @@ -1,15 +1,15 @@ -import { NegativeActiveSubscribersError } from "../errors" +import { NegativeActiveSubscribersError } from '../errors' import { createSingleRowRefProxy, toExpression, -} from "../query/builder/ref-proxy.js" -import { CollectionSubscription } from "./subscription.js" -import type { StandardSchemaV1 } from "@standard-schema/spec" -import type { ChangeMessage, SubscribeChangesOptions } from "../types" -import type { CollectionLifecycleManager } from "./lifecycle.js" -import type { CollectionSyncManager } from "./sync.js" -import type { CollectionEventsManager } from "./events.js" -import type { CollectionImpl } from "./index.js" +} from '../query/builder/ref-proxy.js' +import { CollectionSubscription } from './subscription.js' +import type { StandardSchemaV1 } from '@standard-schema/spec' +import type { ChangeMessage, SubscribeChangesOptions } from '../types' +import type { CollectionLifecycleManager } from './lifecycle.js' +import type { CollectionSyncManager } from './sync.js' +import type { CollectionEventsManager } from './events.js' +import type { CollectionImpl } from './index.js' export class CollectionChangesManager< TOutput extends object = Record, @@ -98,7 +98,7 @@ export class CollectionChangesManager< */ public subscribeChanges( callback: (changes: Array>) => void, - options: SubscribeChangesOptions = {} + options: SubscribeChangesOptions = {}, ): CollectionSubscription { // Start sync and track subscriber this.addSubscriber() @@ -106,7 +106,7 @@ export class CollectionChangesManager< // Compile where callback to whereExpression if provided if (options.where && options.whereExpression) { throw new Error( - `Cannot specify both 'where' and 'whereExpression' options. Use one or the other.` + `Cannot specify both 'where' and 'whereExpression' options. Use one or the other.`, ) } diff --git a/packages/db/src/collection/index.ts b/packages/db/src/collection/index.ts index e30b4d4a9..39f59ed73 100644 --- a/packages/db/src/collection/index.ts +++ b/packages/db/src/collection/index.ts @@ -871,7 +871,7 @@ export class CollectionImpl< */ public subscribeChanges( callback: (changes: Array>) => void, - options: SubscribeChangesOptions = {} + options: SubscribeChangesOptions = {}, ): CollectionSubscription { return this._changes.subscribeChanges(callback, options) } diff --git a/packages/db/src/types.ts b/packages/db/src/types.ts index 002772352..564c891e4 100644 --- a/packages/db/src/types.ts +++ b/packages/db/src/types.ts @@ -1,10 +1,10 @@ -import type { IStreamBuilder } from "@tanstack/db-ivm" -import type { Collection } from "./collection/index.js" -import type { StandardSchemaV1 } from "@standard-schema/spec" -import type { Transaction } from "./transactions" -import type { BasicExpression, OrderBy } from "./query/ir.js" -import type { EventEmitter } from "./event-emitter.js" -import type { SingleRowRefProxy } from "./query/builder/ref-proxy.js" +import type { IStreamBuilder } from '@tanstack/db-ivm' +import type { Collection } from './collection/index.js' +import type { StandardSchemaV1 } from '@standard-schema/spec' +import type { Transaction } from './transactions' +import type { BasicExpression, OrderBy } from './query/ir.js' +import type { EventEmitter } from './event-emitter.js' +import type { SingleRowRefProxy } from './query/builder/ref-proxy.js' /** * Interface for a collection-like object that provides the necessary methods diff --git a/packages/db/tests/collection-subscribe-changes.test.ts b/packages/db/tests/collection-subscribe-changes.test.ts index 4dfab648d..02e987e1e 100644 --- a/packages/db/tests/collection-subscribe-changes.test.ts +++ b/packages/db/tests/collection-subscribe-changes.test.ts @@ -1,9 +1,9 @@ -import { describe, expect, it, vi } from "vitest" -import mitt from "mitt" -import { createCollection } from "../src/collection/index.js" -import { createTransaction } from "../src/transactions" -import { and, eq, gt } from "../src/query/builder/functions" -import { PropRef } from "../src/query/ir" +import { describe, expect, it, vi } from 'vitest' +import mitt from 'mitt' +import { createCollection } from '../src/collection/index.js' +import { createTransaction } from '../src/transactions' +import { and, eq, gt } from '../src/query/builder/functions' +import { PropRef } from '../src/query/ir' import type { ChangeMessage, ChangesPayload, @@ -1972,7 +1972,7 @@ describe(`Collection.subscribeChanges`, () => { tx1.mutate(() => collection.update(1, (draft) => { draft.status = `active` - }) + }), ) // Should emit an insert event for the newly active item @@ -1995,7 +1995,7 @@ describe(`Collection.subscribeChanges`, () => { tx2.mutate(() => collection.update(2, (draft) => { draft.status = `inactive` - }) + }), ) // Should emit a delete event for the newly inactive item From 3805fcd555a9f7f7d18f95934dc9c2ebbef3f093 Mon Sep 17 00:00:00 2001 From: "claude[bot]" <41898282+claude[bot]@users.noreply.github.com> Date: Wed, 7 Jan 2026 10:30:03 +0000 Subject: [PATCH 6/6] refactor: use destructuring for future-proof option spreading Co-authored-by: Kevin --- packages/db/src/collection/changes.ts | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/packages/db/src/collection/changes.ts b/packages/db/src/collection/changes.ts index 32e417784..dc1aa4023 100644 --- a/packages/db/src/collection/changes.ts +++ b/packages/db/src/collection/changes.ts @@ -110,15 +110,16 @@ export class CollectionChangesManager< ) } - let whereExpression = options.whereExpression - if (options.where) { + const { where, ...opts } = options + let whereExpression = opts.whereExpression + if (where) { const proxy = createSingleRowRefProxy() - const result = options.where(proxy) + const result = where(proxy) whereExpression = toExpression(result) } const subscription = new CollectionSubscription(this.collection, callback, { - includeInitialState: options.includeInitialState, + ...opts, whereExpression, onUnsubscribe: () => { this.removeSubscriber()