diff --git a/.changeset/where-callback-subscribe-changes.md b/.changeset/where-callback-subscribe-changes.md new file mode 100644 index 000000000..687d36e33 --- /dev/null +++ b/.changeset/where-callback-subscribe-changes.md @@ -0,0 +1,23 @@ +--- +'@tanstack/db': patch +--- + +Add `where` callback option to `subscribeChanges` for ergonomic filtering + +Instead of manually constructing IR with `PropRef`: + +```ts +import { eq, PropRef } from '@tanstack/db' +collection.subscribeChanges(callback, { + whereExpression: eq(new PropRef(['status']), 'active'), +}) +``` + +You can now use a callback with query builder functions: + +```ts +import { eq } from '@tanstack/db' +collection.subscribeChanges(callback, { + where: (row) => eq(row.status, 'active'), +}) +``` diff --git a/packages/db/src/collection/changes.ts b/packages/db/src/collection/changes.ts index a69400375..dc1aa4023 100644 --- a/packages/db/src/collection/changes.ts +++ b/packages/db/src/collection/changes.ts @@ -1,4 +1,8 @@ import { NegativeActiveSubscribersError } from '../errors' +import { + createSingleRowRefProxy, + toExpression, +} from '../query/builder/ref-proxy.js' import { CollectionSubscription } from './subscription.js' import type { StandardSchemaV1 } from '@standard-schema/spec' import type { ChangeMessage, SubscribeChangesOptions } from '../types' @@ -94,13 +98,29 @@ export class CollectionChangesManager< */ public subscribeChanges( callback: (changes: Array>) => void, - options: SubscribeChangesOptions = {}, + options: SubscribeChangesOptions = {}, ): CollectionSubscription { // Start sync and track subscriber this.addSubscriber() + // Compile where callback to whereExpression if provided + if (options.where && options.whereExpression) { + throw new Error( + `Cannot specify both 'where' and 'whereExpression' options. Use one or the other.`, + ) + } + + const { where, ...opts } = options + let whereExpression = opts.whereExpression + if (where) { + const proxy = createSingleRowRefProxy() + const result = where(proxy) + whereExpression = toExpression(result) + } + const subscription = new CollectionSubscription(this.collection, callback, { - ...options, + ...opts, + whereExpression, onUnsubscribe: () => { this.removeSubscriber() this.changeSubscriptions.delete(subscription) diff --git a/packages/db/src/collection/index.ts b/packages/db/src/collection/index.ts index ecd8d70bf..39f59ed73 100644 --- a/packages/db/src/collection/index.ts +++ b/packages/db/src/collection/index.ts @@ -849,26 +849,29 @@ export class CollectionImpl< * }, { includeInitialState: true }) * * @example - * // Subscribe only to changes matching a condition + * // Subscribe only to changes matching a condition using where callback + * import { eq } from "@tanstack/db" + * * const subscription = collection.subscribeChanges((changes) => { * updateUI(changes) * }, { * includeInitialState: true, - * where: (row) => row.status === 'active' + * where: (row) => eq(row.status, "active") * }) * * @example - * // Subscribe using a pre-compiled expression + * // Using multiple conditions with and() + * import { and, eq, gt } from "@tanstack/db" + * * const subscription = collection.subscribeChanges((changes) => { * updateUI(changes) * }, { - * includeInitialState: true, - * whereExpression: eq(row.status, 'active') + * where: (row) => and(eq(row.status, "active"), gt(row.priority, 5)) * }) */ public subscribeChanges( callback: (changes: Array>) => void, - options: SubscribeChangesOptions = {}, + options: SubscribeChangesOptions = {}, ): CollectionSubscription { return this._changes.subscribeChanges(callback, options) } diff --git a/packages/db/src/types.ts b/packages/db/src/types.ts index bb78f35eb..564c891e4 100644 --- a/packages/db/src/types.ts +++ b/packages/db/src/types.ts @@ -4,6 +4,7 @@ import type { StandardSchemaV1 } from '@standard-schema/spec' import type { Transaction } from './transactions' import type { BasicExpression, OrderBy } from './query/ir.js' import type { EventEmitter } from './event-emitter.js' +import type { SingleRowRefProxy } from './query/builder/ref-proxy.js' /** * Interface for a collection-like object that provides the necessary methods @@ -775,17 +776,33 @@ export type NamespacedAndKeyedStream = IStreamBuilder /** * Options for subscribing to collection changes */ -export interface SubscribeChangesOptions { +export interface SubscribeChangesOptions< + T extends object = Record, +> { /** Whether to include the current state as initial changes */ includeInitialState?: boolean + /** + * Callback function for filtering changes using a row proxy. + * The callback receives a proxy object that records property access, + * allowing you to use query builder functions like `eq`, `gt`, etc. + * + * @example + * ```ts + * import { eq } from "@tanstack/db" + * + * collection.subscribeChanges(callback, { + * where: (row) => eq(row.status, "active") + * }) + * ``` + */ + where?: (row: SingleRowRefProxy) => any /** Pre-compiled expression for filtering changes */ whereExpression?: BasicExpression } -export interface SubscribeChangesSnapshotOptions extends Omit< - SubscribeChangesOptions, - `includeInitialState` -> { +export interface SubscribeChangesSnapshotOptions< + T extends object = Record, +> extends Omit, `includeInitialState`> { orderBy?: OrderBy limit?: number } diff --git a/packages/db/tests/collection-subscribe-changes.test.ts b/packages/db/tests/collection-subscribe-changes.test.ts index f3f4b1cce..02e987e1e 100644 --- a/packages/db/tests/collection-subscribe-changes.test.ts +++ b/packages/db/tests/collection-subscribe-changes.test.ts @@ -2,7 +2,7 @@ import { describe, expect, it, vi } from 'vitest' import mitt from 'mitt' import { createCollection } from '../src/collection/index.js' import { createTransaction } from '../src/transactions' -import { eq } from '../src/query/builder/functions' +import { and, eq, gt } from '../src/query/builder/functions' import { PropRef } from '../src/query/ir' import type { ChangeMessage, @@ -1916,4 +1916,169 @@ describe(`Collection.subscribeChanges`, () => { expect(collection.status).toBe(`ready`) expect(collection.size).toBe(2) }) + + it(`should support where callback for filtering changes`, () => { + const callback = vi.fn() + + // Create collection with items that have a status field + const collection = createCollection<{ + id: number + value: string + status: `active` | `inactive` + }>({ + id: `where-callback-test`, + getKey: (item) => item.id, + sync: { + sync: ({ begin, write, commit }) => { + // Start with some initial data + begin() + write({ + type: `insert`, + value: { id: 1, value: `item1`, status: `inactive` }, + }) + write({ + type: `insert`, + value: { id: 2, value: `item2`, status: `active` }, + }) + commit() + }, + }, + }) + + const mutationFn: MutationFn = async () => {} + + // Subscribe to changes with a where callback for active items only + const subscription = collection.subscribeChanges(callback, { + includeInitialState: true, + where: (row) => eq(row.status, `active`), + }) + + // Should only receive the active item in initial state + expect(callback).toHaveBeenCalledTimes(1) + const initialChanges = callback.mock.calls[0]![0] as ChangesPayload<{ + id: number + value: string + status: `active` | `inactive` + }> + expect(initialChanges).toHaveLength(1) + expect(initialChanges[0]!.key).toBe(2) + expect(initialChanges[0]!.type).toBe(`insert`) + + // Reset mock + callback.mockReset() + + // Update an inactive item to active (should emit insert) + const tx1 = createTransaction({ mutationFn }) + tx1.mutate(() => + collection.update(1, (draft) => { + draft.status = `active` + }), + ) + + // Should emit an insert event for the newly active item + expect(callback).toHaveBeenCalledTimes(1) + const insertChanges = callback.mock.calls[0]![0] as ChangesPayload<{ + id: number + value: string + status: `active` | `inactive` + }> + expect(insertChanges).toHaveLength(1) + expect(insertChanges[0]!.type).toBe(`insert`) + expect(insertChanges[0]!.key).toBe(1) + expect(insertChanges[0]!.value.status).toBe(`active`) + + // Reset mock + callback.mockReset() + + // Update an active item to inactive (should emit delete) + const tx2 = createTransaction({ mutationFn }) + tx2.mutate(() => + collection.update(2, (draft) => { + draft.status = `inactive` + }), + ) + + // Should emit a delete event for the newly inactive item + expect(callback).toHaveBeenCalledTimes(1) + const deleteChanges = callback.mock.calls[0]![0] as ChangesPayload<{ + id: number + value: string + status: `active` | `inactive` + }> + expect(deleteChanges).toHaveLength(1) + expect(deleteChanges[0]!.type).toBe(`delete`) + expect(deleteChanges[0]!.key).toBe(2) + + // Clean up + subscription.unsubscribe() + }) + + it(`should support where callback with multiple conditions`, () => { + const callback = vi.fn() + + // Create collection with items + const collection = createCollection<{ + id: number + value: string + status: `active` | `inactive` + priority: number + }>({ + id: `where-callback-and-test`, + getKey: (item) => item.id, + sync: { + sync: ({ begin, write, commit }) => { + begin() + write({ + type: `insert`, + value: { id: 1, value: `item1`, status: `active`, priority: 3 }, + }) + write({ + type: `insert`, + value: { id: 2, value: `item2`, status: `active`, priority: 8 }, + }) + write({ + type: `insert`, + value: { id: 3, value: `item3`, status: `inactive`, priority: 10 }, + }) + commit() + }, + }, + }) + + // Subscribe with where callback using and() for multiple conditions + const subscription = collection.subscribeChanges(callback, { + includeInitialState: true, + where: (row) => and(eq(row.status, `active`), gt(row.priority, 5)), + }) + + // Should only receive item2 (active AND priority > 5) + expect(callback).toHaveBeenCalledTimes(1) + const initialChanges = callback.mock.calls[0]![0] as ChangesPayload + expect(initialChanges).toHaveLength(1) + expect(initialChanges[0]!.key).toBe(2) + expect(initialChanges[0]!.value).toEqual({ + id: 2, + value: `item2`, + status: `active`, + priority: 8, + }) + + // Clean up + subscription.unsubscribe() + }) + + it(`should throw if both where and whereExpression are provided`, () => { + const collection = createCollection<{ id: number; status: string }>({ + id: `where-both-error-test`, + getKey: (item) => item.id, + sync: { sync: () => {} }, + }) + + expect(() => { + collection.subscribeChanges(() => {}, { + where: (row) => eq(row.status, `active`), + whereExpression: eq(new PropRef([`status`]), `active`), + }) + }).toThrow(`Cannot specify both 'where' and 'whereExpression' options`) + }) })