From 6c745574df97cab1bd3b037db70c90937f9aa074 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Wed, 20 Aug 2025 14:51:56 +0200 Subject: [PATCH 01/11] Introduce CollectionConfigBuilder class --- .../db/src/query/live-query-collection.ts | 688 +++++++++++++++++- 1 file changed, 661 insertions(+), 27 deletions(-) diff --git a/packages/db/src/query/live-query-collection.ts b/packages/db/src/query/live-query-collection.ts index 8d0337ab9..6946161b9 100644 --- a/packages/db/src/query/live-query-collection.ts +++ b/packages/db/src/query/live-query-collection.ts @@ -17,7 +17,7 @@ import type { } from "../types.js" import type { Context, GetResult } from "./builder/types.js" import type { MultiSetArray, RootStreamBuilder } from "@tanstack/db-ivm" -import type { BasicExpression } from "./ir.js" +import type { BasicExpression, QueryIR } from "./ir.js" import type { LazyCollectionCallbacks } from "./compiler/joins.js" // Global counter for auto-generated collection IDs @@ -93,6 +93,592 @@ export interface LiveQueryCollectionConfig< gcTime?: number } +type Changes = { + deletes: number + inserts: number + value: T + orderByIndex: string | undefined +} + +type SyncState = { + messagesCount: number + subscribedToAllCollections: boolean + unsubscribeCallbacks: Set<() => void> + + graph?: D2 + inputs?: Record> + pipeline?: ResultStream +} + +type FullSyncState = Required + +class CollectionConfigBuilder< + TContext extends Context, + TResult extends object = GetResult, +> { + private readonly id: string + private readonly query: QueryIR + private readonly collections: Record> + + // WeakMap to store the keys of the results + // so that we can retrieve them in the getKey function + private readonly resultKeys = new WeakMap() + + // WeakMap to store the orderBy index for each result + private readonly orderByIndices = new WeakMap() + + private readonly compare?: (val1: TResult, val2: TResult) => number + + private graphCache: D2 | undefined + private inputsCache: Record> | 
undefined + private pipelineCache: ResultStream | undefined + private collectionWhereClausesCache: + | Map> + | undefined + + // Map of collection IDs to functions that load keys for that lazy collection + private readonly lazyCollectionsCallbacks: Record< + string, + LazyCollectionCallbacks + > = {} + // Set of collection IDs that are lazy collections + private readonly lazyCollections = new Set() + // Set of collection IDs that include an optimizable ORDER BY clause + private readonly optimizableOrderByCollections: Record< + string, + OrderByOptimizationInfo + > = {} + + constructor( + private readonly config: LiveQueryCollectionConfig + ) { + // Generate a unique ID if not provided + this.id = config.id || `live-query-${++liveQueryCollectionCounter}` + + this.query = buildQueryFromConfig(config) + this.collections = extractCollectionsFromQuery(this.query) + + // Create compare function for ordering if the query has orderBy + if (this.query.orderBy && this.query.orderBy.length > 0) { + this.compare = createOrderByComparator(this.orderByIndices) + } + + // Compile the base pipeline once initially + // This is done to ensure that any errors are thrown immediately and synchronously + this.compileBasePipeline() + } + + getConfig(): CollectionConfig { + return { + id: this.id, + getKey: + this.config.getKey || + ((item) => this.resultKeys.get(item) as string | number), + sync: this.getSyncConfig(), + compare: this.compare, + gcTime: this.config.gcTime || 5000, // 5 seconds by default for live queries + schema: this.config.schema, + onInsert: this.config.onInsert, + onUpdate: this.config.onUpdate, + onDelete: this.config.onDelete, + startSync: this.config.startSync, + } + } + + private getSyncConfig(): SyncConfig { + return { + rowUpdateMode: `full`, + sync: this.syncFn.bind(this), + } + } + + private syncFn(config: Parameters[`sync`]>[0]) { + const syncState: SyncState = { + messagesCount: 0, + subscribedToAllCollections: false, + unsubscribeCallbacks: new Set<() => 
void>(), + } + + // Extend the pipeline such that it applies the incoming changes to the collection + const fullSyncState = this.extendPipelineWithChangeProcessing( + config, + syncState + ) + + this.subscribeToAllCollections(config, fullSyncState) + + // TODO: move this into the subscribeToAllCollections method + syncState.subscribedToAllCollections = true + + // Initial run + this.maybeRunGraph(config, fullSyncState) + + // Return the unsubscribe function + return () => { + syncState.unsubscribeCallbacks.forEach((unsubscribe) => unsubscribe()) + + // Reset caches so a fresh graph/pipeline is compiled on next start + // This avoids reusing a finalized D2 graph across GC restarts + this.graphCache = undefined + this.inputsCache = undefined + this.pipelineCache = undefined + this.collectionWhereClausesCache = undefined + } + } + + private subscribeToAllCollections( + config: Parameters[`sync`]>[0], + syncState: FullSyncState + ) { + Object.entries(this.collections).forEach(([collectionId, collection]) => + this.subscribeToCollection(collectionId, collection, config, syncState) + ) + } + + // TODO: refactor this method into a few smaller methods + private subscribeToCollection( + collectionId: string, + collection: Collection, + config: Parameters[`sync`]>[0], + syncState: FullSyncState + ) { + const input = syncState.inputs[collectionId]! 
+ const collectionAlias = findCollectionAlias(collectionId, this.query) + const whereClause = this.getWhereClauseFromAlias(collectionAlias) + + const sendChangesToPipeline = ( + changes: Iterable>, + callback?: () => boolean + ) => { + sendChangesToInput(input, changes, collection.config.getKey) + this.maybeRunGraph(config, syncState, callback) + } + + // Wraps the sendChangesToPipeline function + // in order to turn `update`s into `insert`s + // for keys that have not been sent to the pipeline yet + // and filter out deletes for keys that have not been sent + const sendVisibleChangesToPipeline = ( + changes: Array>, + loadedInitialState: boolean, + sentKeys: Set + ) => { + if (loadedInitialState) { + // There was no index for the join key + // so we loaded the initial state + // so we can safely assume that the pipeline has seen all keys + return sendChangesToPipeline(changes) + } + + const newChanges = [] + for (const change of changes) { + let newChange = change + if (!sentKeys.has(change.key)) { + if (change.type === `update`) { + newChange = { ...change, type: `insert` } + } else if (change.type === `delete`) { + // filter out deletes for keys that have not been sent + continue + } + } + newChanges.push(newChange) + } + + return sendChangesToPipeline(newChanges) + } + + const loadKeys = ( + keys: Iterable, + sentKeys: Set, + filterFn: (item: object) => boolean + ) => { + for (const key of keys) { + // Only load the key once + if (sentKeys.has(key)) continue + + const value = collection.get(key) + if (value !== undefined && filterFn(value)) { + sentKeys.add(key) + sendChangesToPipeline([{ type: `insert`, key, value }]) + } + } + } + + const subscribeToAllChanges = ( + whereExpression: BasicExpression | undefined + ) => { + const unsubscribe = collection.subscribeChanges(sendChangesToPipeline, { + includeInitialState: true, + ...(whereExpression ? 
{ whereExpression } : undefined), + }) + return unsubscribe + } + + // Subscribes to all changes but without the initial state + // such that we can load keys from the initial state on demand + // based on the matching keys from the main collection in the join + const subscribeToMatchingChanges = ( + whereExpression: BasicExpression | undefined + ) => { + let loadedInitialState = false + const sentKeys = new Set() + + const sendVisibleChanges = ( + changes: Array> + ) => { + sendVisibleChangesToPipeline(changes, loadedInitialState, sentKeys) + } + + const unsubscribe = collection.subscribeChanges(sendVisibleChanges, { + whereExpression, + }) + + // Create a function that loads keys from the collection + // into the query pipeline on demand + const filterFn = whereExpression + ? createFilterFunctionFromExpression(whereExpression) + : () => true + const loadKs = (keys: Set) => { + return loadKeys(keys, sentKeys, filterFn) + } + + // Store the functions to load keys and load initial state in the `lazyCollectionsCallbacks` map + // This is used by the join operator to dynamically load matching keys from the lazy collection + // or to get the full initial state of the collection if there's no index for the join key + this.lazyCollectionsCallbacks[collectionId] = { + loadKeys: loadKs, + loadInitialState: () => { + // Make sure we only load the initial state once + if (loadedInitialState) return + loadedInitialState = true + + const changes = collection.currentStateAsChanges({ + whereExpression, + }) + sendChangesToPipeline(changes) + }, + } + return unsubscribe + } + + const subscribeToOrderedChanges = ( + whereExpression: BasicExpression | undefined + ) => { + const { + offset, + limit, + comparator, + index, + dataNeeded, + valueExtractorForRawRow, + } = this.optimizableOrderByCollections[collectionId]! 
+ + if (!dataNeeded) { + // This should never happen because the topK operator should always set the size callback + // which in turn should lead to the orderBy operator setting the dataNeeded callback + throw new Error( + `Missing dataNeeded callback for collection ${collectionId}` + ) + } + + // This function is called by maybeRunGraph + // after each iteration of the query pipeline + // to ensure that the orderBy operator has enough data to work with + const loadMoreIfNeeded = () => { + // `dataNeeded` probes the orderBy operator to see if it needs more data + // if it needs more data, it returns the number of items it needs + const n = dataNeeded() + if (n > 0) { + loadNextItems(n) + } + + // Indicate that we're done loading data if we didn't need to load more data + return n === 0 + } + + // Keep track of the keys we've sent + // and also the biggest value we've sent so far + const sentValuesInfo: { + sentKeys: Set + biggest: any + } = { + sentKeys: new Set(), + biggest: undefined, + } + + const sendChangesToPipelineWithTracking = ( + changes: Iterable> + ) => { + const trackedChanges = trackSentValues( + changes, + comparator, + sentValuesInfo + ) + sendChangesToPipeline(trackedChanges, loadMoreIfNeeded) + } + + // Loads the next `n` items from the collection + // starting from the biggest item it has sent + const loadNextItems = (n: number) => { + const biggestSentRow = sentValuesInfo.biggest + const biggestSentValue = biggestSentRow + ? valueExtractorForRawRow(biggestSentRow) + : biggestSentRow + // Take the `n` items after the biggest sent value + const nextOrderedKeys = index.take(n, biggestSentValue) + const nextInserts: Array> = + nextOrderedKeys.map((key) => { + return { type: `insert`, key, value: collection.get(key) } + }) + sendChangesToPipelineWithTracking(nextInserts) + } + + // Load the first `offset + limit` values from the index + // i.e. 
the K items from the collection that fall into the requested range: [offset, offset + limit[ + loadNextItems(offset + limit) + + const sendChangesInRange = ( + changes: Iterable> + ) => { + // Split live updates into a delete of the old value and an insert of the new value + // and filter out changes that are bigger than the biggest value we've sent so far + // because they can't affect the topK + const splittedChanges = splitUpdates(changes) + const filteredChanges = filterChangesSmallerOrEqualToMax( + splittedChanges, + comparator, + sentValuesInfo.biggest + ) + sendChangesToPipeline(filteredChanges, loadMoreIfNeeded) + } + + // Subscribe to changes and only send changes that are smaller than the biggest value we've sent so far + // values that are bigger don't need to be sent because they can't affect the topK + const unsubscribe = collection.subscribeChanges(sendChangesInRange, { + whereExpression, + }) + + return unsubscribe + } + + const subscribeToChanges = (whereExpression?: BasicExpression) => { + let unsubscribe: () => void + if (this.lazyCollections.has(collectionId)) { + unsubscribe = subscribeToMatchingChanges(whereExpression) + } else if ( + Object.hasOwn(this.optimizableOrderByCollections, collectionId) + ) { + unsubscribe = subscribeToOrderedChanges(whereExpression) + } else { + unsubscribe = subscribeToAllChanges(whereExpression) + } + syncState.unsubscribeCallbacks.add(unsubscribe) + } + + if (whereClause) { + // Convert WHERE clause to BasicExpression format for collection subscription + const whereExpression = convertToBasicExpression( + whereClause, + collectionAlias! + ) + + if (whereExpression) { + // Use index optimization for this collection + subscribeToChanges(whereExpression) + } else { + // This should not happen - if we have a whereClause but can't create whereExpression, + // it indicates a bug in our optimization logic + throw new Error( + `Failed to convert WHERE clause to collection filter for collection '${collectionId}'. 
` + + `This indicates a bug in the query optimization logic.` + ) + } + } else { + // No WHERE clause for this collection, use regular subscription + subscribeToChanges() + } + } + + private compileBasePipeline() { + this.graphCache = new D2() + this.inputsCache = Object.fromEntries( + Object.entries(this.collections).map(([key]) => [ + key, + this.graphCache!.newInput(), + ]) + ) + + // Compile the query and get both pipeline and collection WHERE clauses + const { + pipeline: pipelineCache, + collectionWhereClauses: collectionWhereClausesCache, + } = compileQuery( + this.query, + this.inputsCache as Record, + this.collections, + this.lazyCollectionsCallbacks, + this.lazyCollections, + this.optimizableOrderByCollections + ) + + this.pipelineCache = pipelineCache + this.collectionWhereClausesCache = collectionWhereClausesCache + } + + private maybeCompileBasePipeline() { + if (!this.graphCache || !this.inputsCache || !this.pipelineCache) { + this.compileBasePipeline() + } + return { + graph: this.graphCache!, + inputs: this.inputsCache!, + pipeline: this.pipelineCache!, + } + } + + private extendPipelineWithChangeProcessing( + config: Parameters[`sync`]>[0], + syncState: SyncState + ): FullSyncState { + const { begin, commit } = config + const { graph, inputs, pipeline } = this.maybeCompileBasePipeline() + + pipeline.pipe( + output((data) => { + const messages = data.getInner() + syncState.messagesCount += messages.length + + begin() + messages + .reduce( + accumulateChanges, + new Map>() + ) + .forEach(this.applyChanges.bind(this, config)) + commit() + }) + ) + + graph.finalize() + + // Extend the sync state with the graph, inputs, and pipeline + syncState.graph = graph + syncState.inputs = inputs + syncState.pipeline = pipeline + + return syncState as FullSyncState + } + + private applyChanges( + config: Parameters[`sync`]>[0], + changes: { + deletes: number + inserts: number + value: TResult + orderByIndex: string | undefined + }, + key: unknown + ) { + const { 
write, collection } = config + const { deletes, inserts, value, orderByIndex } = changes + + // Store the key of the result so that we can retrieve it in the + // getKey function + this.resultKeys.set(value, key) + + // Store the orderBy index if it exists + if (orderByIndex !== undefined) { + this.orderByIndices.set(value, orderByIndex) + } + + // Simple singular insert. + if (inserts && deletes === 0) { + write({ + value, + type: `insert`, + }) + } else if ( + // Insert & update(s) (updates are a delete & insert) + inserts > deletes || + // Just update(s) but the item is already in the collection (so + // was inserted previously). + (inserts === deletes && collection.has(key as string | number)) + ) { + write({ + value, + type: `update`, + }) + // Only delete is left as an option + } else if (deletes > 0) { + write({ + value, + type: `delete`, + }) + } else { + throw new Error( + `Could not apply changes: ${JSON.stringify(changes)}. This should never happen.` + ) + } + } + + // The callback function is called after the graph has run. + // This gives the callback a chance to load more data if needed, + // that's used to optimize orderBy operators that set a limit, + // in order to load some more data if we still don't have enough rows after the pipeline has run. + // That can happend because even though we load N rows, the pipeline might filter some of these rows out + // causing the orderBy operator to receive less than N rows or even no rows at all. + // So this callback would notice that it doesn't have enough rows and load some more. + // The callback returns a boolean, when it's true it's done loading data and we can mark the collection as ready. 
+ private maybeRunGraph( + config: Parameters[`sync`]>[0], + syncState: FullSyncState, + callback?: () => boolean + ) { + const { begin, commit, markReady } = config + + // We only run the graph if all the collections are ready + if ( + this.allCollectionsReadyOrInitialCommit() && + syncState.subscribedToAllCollections + ) { + syncState.graph.run() + const ready = callback?.() ?? true + // On the initial run, we may need to do an empty commit to ensure that + // the collection is initialized + if (syncState.messagesCount === 0) { + begin() + commit() + } + // Mark the collection as ready after the first successful run + if (ready && this.allCollectionsReady()) { + markReady() + } + } + } + + private allCollectionsReady() { + return Object.values(this.collections).every((collection) => + collection.isReady() + ) + } + + private allCollectionsReadyOrInitialCommit() { + return Object.values(this.collections).every( + (collection) => + collection.status === `ready` || collection.status === `initialCommit` + ) + } + + private getWhereClauseFromAlias( + collectionAlias: string | undefined + ): BasicExpression | undefined { + if (collectionAlias && this.collectionWhereClausesCache) { + return this.collectionWhereClausesCache.get(collectionAlias) + } + return undefined + } +} + /** * Creates live query collection options for use with createCollection * @@ -123,17 +709,15 @@ export function liveQueryCollectionOptions< >( config: LiveQueryCollectionConfig ): CollectionConfig { + /* // Generate a unique ID if not provided const id = config.id || `live-query-${++liveQueryCollectionCounter}` - // Build the query using the provided query builder function or instance - const query = - typeof config.query === `function` - ? 
buildQuery(config.query) - : getQueryIR(config.query) + // Build the query + const query = buildQueryFromConfig(config) - // WeakMap to store the keys of the results so that we can retreve them in the - // getKey function + // WeakMap to store the keys of the results + // so that we can retrieve them in the getKey function const resultKeys = new WeakMap() // WeakMap to store the orderBy index for each result @@ -142,25 +726,7 @@ export function liveQueryCollectionOptions< // Create compare function for ordering if the query has orderBy const compare = query.orderBy && query.orderBy.length > 0 - ? (val1: TResult, val2: TResult): number => { - // Use the orderBy index stored in the WeakMap - const index1 = orderByIndices.get(val1) - const index2 = orderByIndices.get(val2) - - // Compare fractional indices lexicographically - if (index1 && index2) { - if (index1 < index2) { - return -1 - } else if (index1 > index2) { - return 1 - } else { - return 0 - } - } - - // Fallback to no ordering if indices are missing - return 0 - } + ? 
createOrderByComparator(orderByIndices) : undefined const collections = extractCollectionsFromQuery(query) @@ -650,6 +1216,12 @@ export function liveQueryCollectionOptions< onDelete: config.onDelete, startSync: config.startSync, } + */ + const collectionConfigBuilder = new CollectionConfigBuilder< + TContext, + TResult + >(config) + return collectionConfigBuilder.getConfig() } /** @@ -779,6 +1351,40 @@ function sendChangesToInput( input.sendData(new MultiSet(multiSetArray)) } +function buildQueryFromConfig( + config: LiveQueryCollectionConfig +) { + // Build the query using the provided query builder function or instance + if (typeof config.query === `function`) { + return buildQuery(config.query) + } + return getQueryIR(config.query) +} + +function createOrderByComparator( + orderByIndices: WeakMap +) { + return (val1: T, val2: T): number => { + // Use the orderBy index stored in the WeakMap + const index1 = orderByIndices.get(val1) + const index2 = orderByIndices.get(val2) + + // Compare fractional indices lexicographically + if (index1 && index2) { + if (index1 < index2) { + return -1 + } else if (index1 > index2) { + return 1 + } else { + return 0 + } + } + + // Fallback to no ordering if indices are missing + return 0 + } +} + /** * Helper function to extract collections from a compiled query * Traverses the query IR to find all collection references @@ -860,6 +1466,34 @@ function findCollectionAlias( return undefined } +function accumulateChanges( + acc: Map>, + [[key, tupleData], multiplicity]: [ + [unknown, [any, string | undefined]], + number, + ] +) { + // All queries now consistently return [value, orderByIndex] format + // where orderByIndex is undefined for queries without ORDER BY + const [value, orderByIndex] = tupleData as [T, string | undefined] + + const changes = acc.get(key) || { + deletes: 0, + inserts: 0, + value, + orderByIndex, + } + if (multiplicity < 0) { + changes.deletes += Math.abs(multiplicity) + } else if (multiplicity > 0) { + 
changes.inserts += multiplicity + changes.value = value + changes.orderByIndex = orderByIndex + } + acc.set(key, changes) + return acc +} + function* trackSentValues( changes: Iterable>, comparator: (a: any, b: any) => number, From f05c2d5a5a1d3e2e7495747ef2c7b54489eb1562 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 21 Aug 2025 10:11:34 +0200 Subject: [PATCH 02/11] Refactor subscribeToCollection method to split into helper methods --- .../db/src/query/live-query-collection.ts | 720 +++++++++++------- 1 file changed, 444 insertions(+), 276 deletions(-) diff --git a/packages/db/src/query/live-query-collection.ts b/packages/db/src/query/live-query-collection.ts index 6946161b9..b708408d6 100644 --- a/packages/db/src/query/live-query-collection.ts +++ b/packages/db/src/query/live-query-collection.ts @@ -207,9 +207,6 @@ class CollectionConfigBuilder< this.subscribeToAllCollections(config, fullSyncState) - // TODO: move this into the subscribeToAllCollections method - syncState.subscribedToAllCollections = true - // Initial run this.maybeRunGraph(config, fullSyncState) @@ -226,279 +223,6 @@ class CollectionConfigBuilder< } } - private subscribeToAllCollections( - config: Parameters[`sync`]>[0], - syncState: FullSyncState - ) { - Object.entries(this.collections).forEach(([collectionId, collection]) => - this.subscribeToCollection(collectionId, collection, config, syncState) - ) - } - - // TODO: refactor this method into a few smaller methods - private subscribeToCollection( - collectionId: string, - collection: Collection, - config: Parameters[`sync`]>[0], - syncState: FullSyncState - ) { - const input = syncState.inputs[collectionId]! 
- const collectionAlias = findCollectionAlias(collectionId, this.query) - const whereClause = this.getWhereClauseFromAlias(collectionAlias) - - const sendChangesToPipeline = ( - changes: Iterable>, - callback?: () => boolean - ) => { - sendChangesToInput(input, changes, collection.config.getKey) - this.maybeRunGraph(config, syncState, callback) - } - - // Wraps the sendChangesToPipeline function - // in order to turn `update`s into `insert`s - // for keys that have not been sent to the pipeline yet - // and filter out deletes for keys that have not been sent - const sendVisibleChangesToPipeline = ( - changes: Array>, - loadedInitialState: boolean, - sentKeys: Set - ) => { - if (loadedInitialState) { - // There was no index for the join key - // so we loaded the initial state - // so we can safely assume that the pipeline has seen all keys - return sendChangesToPipeline(changes) - } - - const newChanges = [] - for (const change of changes) { - let newChange = change - if (!sentKeys.has(change.key)) { - if (change.type === `update`) { - newChange = { ...change, type: `insert` } - } else if (change.type === `delete`) { - // filter out deletes for keys that have not been sent - continue - } - } - newChanges.push(newChange) - } - - return sendChangesToPipeline(newChanges) - } - - const loadKeys = ( - keys: Iterable, - sentKeys: Set, - filterFn: (item: object) => boolean - ) => { - for (const key of keys) { - // Only load the key once - if (sentKeys.has(key)) continue - - const value = collection.get(key) - if (value !== undefined && filterFn(value)) { - sentKeys.add(key) - sendChangesToPipeline([{ type: `insert`, key, value }]) - } - } - } - - const subscribeToAllChanges = ( - whereExpression: BasicExpression | undefined - ) => { - const unsubscribe = collection.subscribeChanges(sendChangesToPipeline, { - includeInitialState: true, - ...(whereExpression ? 
{ whereExpression } : undefined), - }) - return unsubscribe - } - - // Subscribes to all changes but without the initial state - // such that we can load keys from the initial state on demand - // based on the matching keys from the main collection in the join - const subscribeToMatchingChanges = ( - whereExpression: BasicExpression | undefined - ) => { - let loadedInitialState = false - const sentKeys = new Set() - - const sendVisibleChanges = ( - changes: Array> - ) => { - sendVisibleChangesToPipeline(changes, loadedInitialState, sentKeys) - } - - const unsubscribe = collection.subscribeChanges(sendVisibleChanges, { - whereExpression, - }) - - // Create a function that loads keys from the collection - // into the query pipeline on demand - const filterFn = whereExpression - ? createFilterFunctionFromExpression(whereExpression) - : () => true - const loadKs = (keys: Set) => { - return loadKeys(keys, sentKeys, filterFn) - } - - // Store the functions to load keys and load initial state in the `lazyCollectionsCallbacks` map - // This is used by the join operator to dynamically load matching keys from the lazy collection - // or to get the full initial state of the collection if there's no index for the join key - this.lazyCollectionsCallbacks[collectionId] = { - loadKeys: loadKs, - loadInitialState: () => { - // Make sure we only load the initial state once - if (loadedInitialState) return - loadedInitialState = true - - const changes = collection.currentStateAsChanges({ - whereExpression, - }) - sendChangesToPipeline(changes) - }, - } - return unsubscribe - } - - const subscribeToOrderedChanges = ( - whereExpression: BasicExpression | undefined - ) => { - const { - offset, - limit, - comparator, - index, - dataNeeded, - valueExtractorForRawRow, - } = this.optimizableOrderByCollections[collectionId]! 
- - if (!dataNeeded) { - // This should never happen because the topK operator should always set the size callback - // which in turn should lead to the orderBy operator setting the dataNeeded callback - throw new Error( - `Missing dataNeeded callback for collection ${collectionId}` - ) - } - - // This function is called by maybeRunGraph - // after each iteration of the query pipeline - // to ensure that the orderBy operator has enough data to work with - const loadMoreIfNeeded = () => { - // `dataNeeded` probes the orderBy operator to see if it needs more data - // if it needs more data, it returns the number of items it needs - const n = dataNeeded() - if (n > 0) { - loadNextItems(n) - } - - // Indicate that we're done loading data if we didn't need to load more data - return n === 0 - } - - // Keep track of the keys we've sent - // and also the biggest value we've sent so far - const sentValuesInfo: { - sentKeys: Set - biggest: any - } = { - sentKeys: new Set(), - biggest: undefined, - } - - const sendChangesToPipelineWithTracking = ( - changes: Iterable> - ) => { - const trackedChanges = trackSentValues( - changes, - comparator, - sentValuesInfo - ) - sendChangesToPipeline(trackedChanges, loadMoreIfNeeded) - } - - // Loads the next `n` items from the collection - // starting from the biggest item it has sent - const loadNextItems = (n: number) => { - const biggestSentRow = sentValuesInfo.biggest - const biggestSentValue = biggestSentRow - ? valueExtractorForRawRow(biggestSentRow) - : biggestSentRow - // Take the `n` items after the biggest sent value - const nextOrderedKeys = index.take(n, biggestSentValue) - const nextInserts: Array> = - nextOrderedKeys.map((key) => { - return { type: `insert`, key, value: collection.get(key) } - }) - sendChangesToPipelineWithTracking(nextInserts) - } - - // Load the first `offset + limit` values from the index - // i.e. 
the K items from the collection that fall into the requested range: [offset, offset + limit[ - loadNextItems(offset + limit) - - const sendChangesInRange = ( - changes: Iterable> - ) => { - // Split live updates into a delete of the old value and an insert of the new value - // and filter out changes that are bigger than the biggest value we've sent so far - // because they can't affect the topK - const splittedChanges = splitUpdates(changes) - const filteredChanges = filterChangesSmallerOrEqualToMax( - splittedChanges, - comparator, - sentValuesInfo.biggest - ) - sendChangesToPipeline(filteredChanges, loadMoreIfNeeded) - } - - // Subscribe to changes and only send changes that are smaller than the biggest value we've sent so far - // values that are bigger don't need to be sent because they can't affect the topK - const unsubscribe = collection.subscribeChanges(sendChangesInRange, { - whereExpression, - }) - - return unsubscribe - } - - const subscribeToChanges = (whereExpression?: BasicExpression) => { - let unsubscribe: () => void - if (this.lazyCollections.has(collectionId)) { - unsubscribe = subscribeToMatchingChanges(whereExpression) - } else if ( - Object.hasOwn(this.optimizableOrderByCollections, collectionId) - ) { - unsubscribe = subscribeToOrderedChanges(whereExpression) - } else { - unsubscribe = subscribeToAllChanges(whereExpression) - } - syncState.unsubscribeCallbacks.add(unsubscribe) - } - - if (whereClause) { - // Convert WHERE clause to BasicExpression format for collection subscription - const whereExpression = convertToBasicExpression( - whereClause, - collectionAlias! - ) - - if (whereExpression) { - // Use index optimization for this collection - subscribeToChanges(whereExpression) - } else { - // This should not happen - if we have a whereClause but can't create whereExpression, - // it indicates a bug in our optimization logic - throw new Error( - `Failed to convert WHERE clause to collection filter for collection '${collectionId}'. 
` + - `This indicates a bug in the query optimization logic.` - ) - } - } else { - // No WHERE clause for this collection, use regular subscription - subscribeToChanges() - } - } - private compileBasePipeline() { this.graphCache = new D2() this.inputsCache = Object.fromEntries( @@ -677,6 +401,450 @@ class CollectionConfigBuilder< } return undefined } + + private subscribeToAllCollections( + config: Parameters[`sync`]>[0], + syncState: FullSyncState + ) { + Object.entries(this.collections).forEach(([collectionId, collection]) => + this.subscribeToCollection(collectionId, collection, config, syncState) + ) + + // Mark the collections as subscribed in the sync state + syncState.subscribedToAllCollections = true + } + + private subscribeToCollection( + collectionId: string, + collection: Collection, + config: Parameters[`sync`]>[0], + syncState: FullSyncState + ) { + const collectionAlias = findCollectionAlias(collectionId, this.query) + const whereClause = this.getWhereClauseFromAlias(collectionAlias) + + if (whereClause) { + // Convert WHERE clause to BasicExpression format for collection subscription + const whereExpression = convertToBasicExpression( + whereClause, + collectionAlias! + ) + + if (whereExpression) { + // Use index optimization for this collection + this.subscribeToChanges( + collectionId, + collection, + config, + syncState, + whereExpression + ) + } else { + // This should not happen - if we have a whereClause but can't create whereExpression, + // it indicates a bug in our optimization logic + throw new Error( + `Failed to convert WHERE clause to collection filter for collection '${collectionId}'. 
` + + `This indicates a bug in the query optimization logic.` + ) + } + } else { + // No WHERE clause for this collection, use regular subscription + this.subscribeToChanges(collectionId, collection, config, syncState) + } + } + + private subscribeToChanges( + collectionId: string, + collection: Collection, + config: Parameters[`sync`]>[0], + syncState: FullSyncState, + whereExpression?: BasicExpression + ) { + let unsubscribe: () => void + if (this.lazyCollections.has(collectionId)) { + unsubscribe = this.subscribeToMatchingChanges( + collectionId, + collection, + config, + syncState, + whereExpression + ) + } else if ( + Object.hasOwn(this.optimizableOrderByCollections, collectionId) + ) { + unsubscribe = this.subscribeToOrderedChanges( + collectionId, + collection, + config, + syncState, + whereExpression + ) + } else { + unsubscribe = this.subscribeToAllChanges( + collectionId, + collection, + config, + syncState, + whereExpression + ) + } + syncState.unsubscribeCallbacks.add(unsubscribe) + } + + private sendChangesToPipeline( + collectionId: string, + collection: Collection, + config: Parameters[`sync`]>[0], + syncState: FullSyncState, + changes: Iterable>, + callback?: () => boolean + ) { + const input = syncState.inputs[collectionId]! 
+ sendChangesToInput(input, changes, collection.config.getKey) + this.maybeRunGraph(config, syncState, callback) + } + + // Wraps the sendChangesToPipeline function + // in order to turn `update`s into `insert`s + // for keys that have not been sent to the pipeline yet + // and filter out deletes for keys that have not been sent + private sendVisibleChangesToPipeline = ( + collectionId: string, + collection: Collection, + config: Parameters[`sync`]>[0], + syncState: FullSyncState, + changes: Array>, + loadedInitialState: boolean, + sentKeys: Set + ) => { + if (loadedInitialState) { + // There was no index for the join key + // so we loaded the initial state + // so we can safely assume that the pipeline has seen all keys + return this.sendChangesToPipeline( + collectionId, + collection, + config, + syncState, + changes + ) + } + + const newChanges = [] + for (const change of changes) { + let newChange = change + if (!sentKeys.has(change.key)) { + if (change.type === `update`) { + newChange = { ...change, type: `insert` } + } else if (change.type === `delete`) { + // filter out deletes for keys that have not been sent + continue + } + } + newChanges.push(newChange) + } + + return this.sendChangesToPipeline( + collectionId, + collection, + config, + syncState, + newChanges + ) + } + + private loadKeys( + collectionId: string, + collection: Collection, + config: Parameters[`sync`]>[0], + syncState: FullSyncState, + keys: Iterable, + sentKeys: Set, + filterFn: (item: object) => boolean + ) { + for (const key of keys) { + // Only load the key once + if (sentKeys.has(key)) continue + + const value = collection.get(key) + if (value !== undefined && filterFn(value)) { + sentKeys.add(key) + this.sendChangesToPipeline( + collectionId, + collection, + config, + syncState, + [{ type: `insert`, key, value }] + ) + } + } + } + + private subscribeToAllChanges( + collectionId: string, + collection: Collection, + config: Parameters[`sync`]>[0], + syncState: FullSyncState, + 
whereExpression: BasicExpression | undefined + ) { + const sendChangesToPipeline = this.sendChangesToPipeline.bind( + this, + collectionId, + collection, + config, + syncState + ) + const unsubscribe = collection.subscribeChanges(sendChangesToPipeline, { + includeInitialState: true, + ...(whereExpression ? { whereExpression } : undefined), + }) + return unsubscribe + } + + private subscribeToMatchingChanges( + collectionId: string, + collection: Collection, + config: Parameters[`sync`]>[0], + syncState: FullSyncState, + whereExpression: BasicExpression | undefined + ) { + let loadedInitialState = false + const sentKeys = new Set() + + const sendVisibleChanges = ( + changes: Array> + ) => { + this.sendVisibleChangesToPipeline( + collectionId, + collection, + config, + syncState, + changes, + loadedInitialState, + sentKeys + ) + } + + const unsubscribe = collection.subscribeChanges(sendVisibleChanges, { + whereExpression, + }) + + // Create a function that loads keys from the collection + // into the query pipeline on demand + const filterFn = whereExpression + ? 
createFilterFunctionFromExpression(whereExpression) + : () => true + const loadKs = (keys: Set) => { + return this.loadKeys( + collectionId, + collection, + config, + syncState, + keys, + sentKeys, + filterFn + ) + } + + // Store the functions to load keys and load initial state in the `lazyCollectionsCallbacks` map + // This is used by the join operator to dynamically load matching keys from the lazy collection + // or to get the full initial state of the collection if there's no index for the join key + this.lazyCollectionsCallbacks[collectionId] = { + loadKeys: loadKs, + loadInitialState: () => { + // Make sure we only load the initial state once + if (loadedInitialState) return + loadedInitialState = true + + const changes = collection.currentStateAsChanges({ + whereExpression, + }) + this.sendChangesToPipeline( + collectionId, + collection, + config, + syncState, + changes + ) + }, + } + return unsubscribe + } + + private subscribeToOrderedChanges( + collectionId: string, + collection: Collection, + config: Parameters[`sync`]>[0], + syncState: FullSyncState, + whereExpression: BasicExpression | undefined + ) { + const { offset, limit, comparator } = + this.optimizableOrderByCollections[collectionId]! + + // Keep track of the keys we've sent + // and also the biggest value we've sent so far + const sentValuesInfo: { + sentKeys: Set + biggest: any + } = { + sentKeys: new Set(), + biggest: undefined, + } + + // Load the first `offset + limit` values from the index + // i.e. 
the K items from the collection that fall into the requested range: [offset, offset + limit[ + this.loadNextItems( + collectionId, + collection, + config, + syncState, + sentValuesInfo, + offset + limit + ) + + const sendChangesInRange = ( + changes: Iterable> + ) => { + // Split live updates into a delete of the old value and an insert of the new value + // and filter out changes that are bigger than the biggest value we've sent so far + // because they can't affect the topK + const splittedChanges = splitUpdates(changes) + const filteredChanges = filterChangesSmallerOrEqualToMax( + splittedChanges, + comparator, + sentValuesInfo.biggest + ) + this.sendChangesToPipeline( + collectionId, + collection, + config, + syncState, + filteredChanges, + this.loadMoreIfNeeded.bind( + this, + collectionId, + collection, + config, + syncState, + sentValuesInfo + ) + ) + } + + // Subscribe to changes and only send changes that are smaller than the biggest value we've sent so far + // values that are bigger don't need to be sent because they can't affect the topK + const unsubscribe = collection.subscribeChanges(sendChangesInRange, { + whereExpression, + }) + + return unsubscribe + } + + // This function is called by maybeRunGraph + // after each iteration of the query pipeline + // to ensure that the orderBy operator has enough data to work with + private loadMoreIfNeeded( + collectionId: string, + collection: Collection, + config: Parameters[`sync`]>[0], + syncState: FullSyncState, + sentValuesInfo: { + sentKeys: Set + biggest: any + } + ) { + const { dataNeeded } = this.optimizableOrderByCollections[collectionId]! 
+ + if (!dataNeeded) { + // This should never happen because the topK operator should always set the size callback + // which in turn should lead to the orderBy operator setting the dataNeeded callback + throw new Error( + `Missing dataNeeded callback for collection ${collectionId}` + ) + } + + // `dataNeeded` probes the orderBy operator to see if it needs more data + // if it needs more data, it returns the number of items it needs + const n = dataNeeded() + if (n > 0) { + this.loadNextItems( + collectionId, + collection, + config, + syncState, + sentValuesInfo, + n + ) + } + + // Indicate that we're done loading data if we didn't need to load more data + return n === 0 + } + + private sendChangesToPipelineWithTracking( + collectionId: string, + collection: Collection, + config: Parameters[`sync`]>[0], + syncState: FullSyncState, + sentValuesInfo: { + sentKeys: Set + biggest: any + }, + changes: Iterable> + ) { + const { comparator } = this.optimizableOrderByCollections[collectionId]! + const trackedChanges = trackSentValues(changes, comparator, sentValuesInfo) + this.sendChangesToPipeline( + collectionId, + collection, + config, + syncState, + trackedChanges, + this.loadMoreIfNeeded.bind( + this, + collectionId, + collection, + config, + syncState, + sentValuesInfo + ) + ) + } + + // Loads the next `n` items from the collection + // starting from the biggest item it has sent + private loadNextItems( + collectionId: string, + collection: Collection, + config: Parameters[`sync`]>[0], + syncState: FullSyncState, + sentValuesInfo: { + sentKeys: Set + biggest: any + }, + n: number + ) { + const { valueExtractorForRawRow, index } = + this.optimizableOrderByCollections[collectionId]! + const biggestSentRow = sentValuesInfo.biggest + const biggestSentValue = biggestSentRow + ? 
valueExtractorForRawRow(biggestSentRow) + : biggestSentRow + // Take the `n` items after the biggest sent value + const nextOrderedKeys = index.take(n, biggestSentValue) + const nextInserts: Array> = + nextOrderedKeys.map((key) => { + return { type: `insert`, key, value: collection.get(key) } + }) + this.sendChangesToPipelineWithTracking( + collectionId, + collection, + config, + syncState, + sentValuesInfo, + nextInserts + ) + } } /** From f3d35ea32b6a3a78fea5b5dd36850fdddf02b9a8 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 21 Aug 2025 11:46:34 +0200 Subject: [PATCH 03/11] Add CollectionSubscriber class that encapsulates subscription logic for live collections --- .../db/src/query/live-query-collection.ts | 869 ++++++++---------- 1 file changed, 375 insertions(+), 494 deletions(-) diff --git a/packages/db/src/query/live-query-collection.ts b/packages/db/src/query/live-query-collection.ts index b708408d6..2d31c8b19 100644 --- a/packages/db/src/query/live-query-collection.ts +++ b/packages/db/src/query/live-query-collection.ts @@ -112,12 +112,329 @@ type SyncState = { type FullSyncState = Required +class CollectionSubscriber< + TContext extends Context, + TResult extends object = GetResult, +> { + constructor( + private collectionId: string, + private collection: Collection, + private config: Parameters[`sync`]>[0], + private syncState: FullSyncState, + private collectionConfigBuilder: CollectionConfigBuilder + ) {} + + subscribe() { + const collectionAlias = findCollectionAlias( + this.collectionId, + this.collectionConfigBuilder.query + ) + const whereClause = this.getWhereClauseFromAlias(collectionAlias) + + if (whereClause) { + // Convert WHERE clause to BasicExpression format for collection subscription + const whereExpression = convertToBasicExpression( + whereClause, + collectionAlias! 
+ ) + + if (whereExpression) { + // Use index optimization for this collection + this.subscribeToChanges(whereExpression) + } else { + // This should not happen - if we have a whereClause but can't create whereExpression, + // it indicates a bug in our optimization logic + throw new Error( + `Failed to convert WHERE clause to collection filter for collection '${this.collectionId}'. ` + + `This indicates a bug in the query optimization logic.` + ) + } + } else { + // No WHERE clause for this collection, use regular subscription + this.subscribeToChanges() + } + } + + private subscribeToChanges(whereExpression?: BasicExpression) { + let unsubscribe: () => void + if (this.collectionConfigBuilder.lazyCollections.has(this.collectionId)) { + unsubscribe = this.subscribeToMatchingChanges(whereExpression) + } else if ( + Object.hasOwn( + this.collectionConfigBuilder.optimizableOrderByCollections, + this.collectionId + ) + ) { + unsubscribe = this.subscribeToOrderedChanges(whereExpression) + } else { + unsubscribe = this.subscribeToAllChanges(whereExpression) + } + this.syncState.unsubscribeCallbacks.add(unsubscribe) + } + + private sendChangesToPipeline( + changes: Iterable>, + callback?: () => boolean + ) { + const input = this.syncState.inputs[this.collectionId]! 
+ sendChangesToInput(input, changes, this.collection.config.getKey) + this.collectionConfigBuilder.maybeRunGraph( + this.config, + this.syncState, + callback + ) + } + + // Wraps the sendChangesToPipeline function + // in order to turn `update`s into `insert`s + // for keys that have not been sent to the pipeline yet + // and filter out deletes for keys that have not been sent + private sendVisibleChangesToPipeline = ( + changes: Array>, + loadedInitialState: boolean, + sentKeys: Set + ) => { + if (loadedInitialState) { + // There was no index for the join key + // so we loaded the initial state + // so we can safely assume that the pipeline has seen all keys + return this.sendChangesToPipeline(changes) + } + + const newChanges = [] + for (const change of changes) { + let newChange = change + if (!sentKeys.has(change.key)) { + if (change.type === `update`) { + newChange = { ...change, type: `insert` } + } else if (change.type === `delete`) { + // filter out deletes for keys that have not been sent + continue + } + } + newChanges.push(newChange) + } + + return this.sendChangesToPipeline(newChanges) + } + + private loadKeys( + keys: Iterable, + sentKeys: Set, + filterFn: (item: object) => boolean + ) { + for (const key of keys) { + // Only load the key once + if (sentKeys.has(key)) continue + + const value = this.collection.get(key) + if (value !== undefined && filterFn(value)) { + sentKeys.add(key) + this.sendChangesToPipeline([{ type: `insert`, key, value }]) + } + } + } + + private subscribeToAllChanges( + whereExpression: BasicExpression | undefined + ) { + const sendChangesToPipeline = this.sendChangesToPipeline.bind(this) + const unsubscribe = this.collection.subscribeChanges( + sendChangesToPipeline, + { + includeInitialState: true, + ...(whereExpression ? 
{ whereExpression } : undefined), + } + ) + return unsubscribe + } + + private subscribeToMatchingChanges( + whereExpression: BasicExpression | undefined + ) { + let loadedInitialState = false + const sentKeys = new Set() + + const sendVisibleChanges = ( + changes: Array> + ) => { + this.sendVisibleChangesToPipeline(changes, loadedInitialState, sentKeys) + } + + const unsubscribe = this.collection.subscribeChanges(sendVisibleChanges, { + whereExpression, + }) + + // Create a function that loads keys from the collection + // into the query pipeline on demand + const filterFn = whereExpression + ? createFilterFunctionFromExpression(whereExpression) + : () => true + const loadKs = (keys: Set) => { + return this.loadKeys(keys, sentKeys, filterFn) + } + + // Store the functions to load keys and load initial state in the `lazyCollectionsCallbacks` map + // This is used by the join operator to dynamically load matching keys from the lazy collection + // or to get the full initial state of the collection if there's no index for the join key + this.collectionConfigBuilder.lazyCollectionsCallbacks[this.collectionId] = { + loadKeys: loadKs, + loadInitialState: () => { + // Make sure we only load the initial state once + if (loadedInitialState) return + loadedInitialState = true + + const changes = this.collection.currentStateAsChanges({ + whereExpression, + }) + this.sendChangesToPipeline(changes) + }, + } + return unsubscribe + } + + private subscribeToOrderedChanges( + whereExpression: BasicExpression | undefined + ) { + const { offset, limit, comparator } = + this.collectionConfigBuilder.optimizableOrderByCollections[ + this.collectionId + ]! + + // Keep track of the keys we've sent + // and also the biggest value we've sent so far + const sentValuesInfo: { + sentKeys: Set + biggest: any + } = { + sentKeys: new Set(), + biggest: undefined, + } + + // Load the first `offset + limit` values from the index + // i.e. 
the K items from the collection that fall into the requested range: [offset, offset + limit[ + this.loadNextItems(sentValuesInfo, offset + limit) + + const sendChangesInRange = ( + changes: Iterable> + ) => { + // Split live updates into a delete of the old value and an insert of the new value + // and filter out changes that are bigger than the biggest value we've sent so far + // because they can't affect the topK + const splittedChanges = splitUpdates(changes) + const filteredChanges = filterChangesSmallerOrEqualToMax( + splittedChanges, + comparator, + sentValuesInfo.biggest + ) + this.sendChangesToPipeline( + filteredChanges, + this.loadMoreIfNeeded.bind(this, sentValuesInfo) + ) + } + + // Subscribe to changes and only send changes that are smaller than the biggest value we've sent so far + // values that are bigger don't need to be sent because they can't affect the topK + const unsubscribe = this.collection.subscribeChanges(sendChangesInRange, { + whereExpression, + }) + + return unsubscribe + } + + // This function is called by maybeRunGraph + // after each iteration of the query pipeline + // to ensure that the orderBy operator has enough data to work with + private loadMoreIfNeeded(sentValuesInfo: { + sentKeys: Set + biggest: any + }) { + const { dataNeeded } = + this.collectionConfigBuilder.optimizableOrderByCollections[ + this.collectionId + ]! 
+ + if (!dataNeeded) { + // This should never happen because the topK operator should always set the size callback + // which in turn should lead to the orderBy operator setting the dataNeeded callback + throw new Error( + `Missing dataNeeded callback for collection ${this.collectionId}` + ) + } + + // `dataNeeded` probes the orderBy operator to see if it needs more data + // if it needs more data, it returns the number of items it needs + const n = dataNeeded() + if (n > 0) { + this.loadNextItems(sentValuesInfo, n) + } + + // Indicate that we're done loading data if we didn't need to load more data + return n === 0 + } + + private sendChangesToPipelineWithTracking( + sentValuesInfo: { + sentKeys: Set + biggest: any + }, + changes: Iterable> + ) { + const { comparator } = + this.collectionConfigBuilder.optimizableOrderByCollections[ + this.collectionId + ]! + const trackedChanges = trackSentValues(changes, comparator, sentValuesInfo) + this.sendChangesToPipeline( + trackedChanges, + this.loadMoreIfNeeded.bind(this, sentValuesInfo) + ) + } + + // Loads the next `n` items from the collection + // starting from the biggest item it has sent + private loadNextItems( + sentValuesInfo: { + sentKeys: Set + biggest: any + }, + n: number + ) { + const { valueExtractorForRawRow, index } = + this.collectionConfigBuilder.optimizableOrderByCollections[ + this.collectionId + ]! + const biggestSentRow = sentValuesInfo.biggest + const biggestSentValue = biggestSentRow + ? 
valueExtractorForRawRow(biggestSentRow) + : biggestSentRow + // Take the `n` items after the biggest sent value + const nextOrderedKeys = index.take(n, biggestSentValue) + const nextInserts: Array> = + nextOrderedKeys.map((key) => { + return { type: `insert`, key, value: this.collection.get(key) } + }) + this.sendChangesToPipelineWithTracking(sentValuesInfo, nextInserts) + } + + private getWhereClauseFromAlias( + collectionAlias: string | undefined + ): BasicExpression | undefined { + const collectionWhereClausesCache = + this.collectionConfigBuilder.collectionWhereClausesCache + if (collectionAlias && collectionWhereClausesCache) { + return collectionWhereClausesCache.get(collectionAlias) + } + return undefined + } +} + class CollectionConfigBuilder< TContext extends Context, TResult extends object = GetResult, > { private readonly id: string - private readonly query: QueryIR + readonly query: QueryIR private readonly collections: Record> // WeakMap to store the keys of the results @@ -132,19 +449,17 @@ class CollectionConfigBuilder< private graphCache: D2 | undefined private inputsCache: Record> | undefined private pipelineCache: ResultStream | undefined - private collectionWhereClausesCache: + public collectionWhereClausesCache: | Map> | undefined // Map of collection IDs to functions that load keys for that lazy collection - private readonly lazyCollectionsCallbacks: Record< - string, - LazyCollectionCallbacks - > = {} + readonly lazyCollectionsCallbacks: Record = + {} // Set of collection IDs that are lazy collections - private readonly lazyCollections = new Set() + readonly lazyCollections = new Set() // Set of collection IDs that include an optimizable ORDER BY clause - private readonly optimizableOrderByCollections: Record< + readonly optimizableOrderByCollections: Record< string, OrderByOptimizationInfo > = {} @@ -185,17 +500,52 @@ class CollectionConfigBuilder< } } - private getSyncConfig(): SyncConfig { - return { - rowUpdateMode: `full`, - sync: 
this.syncFn.bind(this), - } - } - - private syncFn(config: Parameters[`sync`]>[0]) { - const syncState: SyncState = { - messagesCount: 0, - subscribedToAllCollections: false, + // The callback function is called after the graph has run. + // This gives the callback a chance to load more data if needed, + // that's used to optimize orderBy operators that set a limit, + // in order to load some more data if we still don't have enough rows after the pipeline has run. + // That can happend because even though we load N rows, the pipeline might filter some of these rows out + // causing the orderBy operator to receive less than N rows or even no rows at all. + // So this callback would notice that it doesn't have enough rows and load some more. + // The callback returns a boolean, when it's true it's done loading data and we can mark the collection as ready. + maybeRunGraph( + config: Parameters[`sync`]>[0], + syncState: FullSyncState, + callback?: () => boolean + ) { + const { begin, commit, markReady } = config + + // We only run the graph if all the collections are ready + if ( + this.allCollectionsReadyOrInitialCommit() && + syncState.subscribedToAllCollections + ) { + syncState.graph.run() + const ready = callback?.() ?? true + // On the initial run, we may need to do an empty commit to ensure that + // the collection is initialized + if (syncState.messagesCount === 0) { + begin() + commit() + } + // Mark the collection as ready after the first successful run + if (ready && this.allCollectionsReady()) { + markReady() + } + } + } + + private getSyncConfig(): SyncConfig { + return { + rowUpdateMode: `full`, + sync: this.syncFn.bind(this), + } + } + + private syncFn(config: Parameters[`sync`]>[0]) { + const syncState: SyncState = { + messagesCount: 0, + subscribedToAllCollections: false, unsubscribeCallbacks: new Set<() => void>(), } @@ -345,41 +695,6 @@ class CollectionConfigBuilder< } } - // The callback function is called after the graph has run. 
- // This gives the callback a chance to load more data if needed, - // that's used to optimize orderBy operators that set a limit, - // in order to load some more data if we still don't have enough rows after the pipeline has run. - // That can happend because even though we load N rows, the pipeline might filter some of these rows out - // causing the orderBy operator to receive less than N rows or even no rows at all. - // So this callback would notice that it doesn't have enough rows and load some more. - // The callback returns a boolean, when it's true it's done loading data and we can mark the collection as ready. - private maybeRunGraph( - config: Parameters[`sync`]>[0], - syncState: FullSyncState, - callback?: () => boolean - ) { - const { begin, commit, markReady } = config - - // We only run the graph if all the collections are ready - if ( - this.allCollectionsReadyOrInitialCommit() && - syncState.subscribedToAllCollections - ) { - syncState.graph.run() - const ready = callback?.() ?? 
true - // On the initial run, we may need to do an empty commit to ensure that - // the collection is initialized - if (syncState.messagesCount === 0) { - begin() - commit() - } - // Mark the collection as ready after the first successful run - if (ready && this.allCollectionsReady()) { - markReady() - } - } - } - private allCollectionsReady() { return Object.values(this.collections).every((collection) => collection.isReady() @@ -393,457 +708,23 @@ class CollectionConfigBuilder< ) } - private getWhereClauseFromAlias( - collectionAlias: string | undefined - ): BasicExpression | undefined { - if (collectionAlias && this.collectionWhereClausesCache) { - return this.collectionWhereClausesCache.get(collectionAlias) - } - return undefined - } - private subscribeToAllCollections( config: Parameters[`sync`]>[0], syncState: FullSyncState ) { - Object.entries(this.collections).forEach(([collectionId, collection]) => - this.subscribeToCollection(collectionId, collection, config, syncState) - ) - - // Mark the collections as subscribed in the sync state - syncState.subscribedToAllCollections = true - } - - private subscribeToCollection( - collectionId: string, - collection: Collection, - config: Parameters[`sync`]>[0], - syncState: FullSyncState - ) { - const collectionAlias = findCollectionAlias(collectionId, this.query) - const whereClause = this.getWhereClauseFromAlias(collectionAlias) - - if (whereClause) { - // Convert WHERE clause to BasicExpression format for collection subscription - const whereExpression = convertToBasicExpression( - whereClause, - collectionAlias! 
- ) - - if (whereExpression) { - // Use index optimization for this collection - this.subscribeToChanges( - collectionId, - collection, - config, - syncState, - whereExpression - ) - } else { - // This should not happen - if we have a whereClause but can't create whereExpression, - // it indicates a bug in our optimization logic - throw new Error( - `Failed to convert WHERE clause to collection filter for collection '${collectionId}'. ` + - `This indicates a bug in the query optimization logic.` - ) - } - } else { - // No WHERE clause for this collection, use regular subscription - this.subscribeToChanges(collectionId, collection, config, syncState) - } - } - - private subscribeToChanges( - collectionId: string, - collection: Collection, - config: Parameters[`sync`]>[0], - syncState: FullSyncState, - whereExpression?: BasicExpression - ) { - let unsubscribe: () => void - if (this.lazyCollections.has(collectionId)) { - unsubscribe = this.subscribeToMatchingChanges( - collectionId, - collection, - config, - syncState, - whereExpression - ) - } else if ( - Object.hasOwn(this.optimizableOrderByCollections, collectionId) - ) { - unsubscribe = this.subscribeToOrderedChanges( + Object.entries(this.collections).forEach(([collectionId, collection]) => { + const collectionSubscriber = new CollectionSubscriber( collectionId, collection, config, syncState, - whereExpression + this ) - } else { - unsubscribe = this.subscribeToAllChanges( - collectionId, - collection, - config, - syncState, - whereExpression - ) - } - syncState.unsubscribeCallbacks.add(unsubscribe) - } - - private sendChangesToPipeline( - collectionId: string, - collection: Collection, - config: Parameters[`sync`]>[0], - syncState: FullSyncState, - changes: Iterable>, - callback?: () => boolean - ) { - const input = syncState.inputs[collectionId]! 
- sendChangesToInput(input, changes, collection.config.getKey) - this.maybeRunGraph(config, syncState, callback) - } - - // Wraps the sendChangesToPipeline function - // in order to turn `update`s into `insert`s - // for keys that have not been sent to the pipeline yet - // and filter out deletes for keys that have not been sent - private sendVisibleChangesToPipeline = ( - collectionId: string, - collection: Collection, - config: Parameters[`sync`]>[0], - syncState: FullSyncState, - changes: Array>, - loadedInitialState: boolean, - sentKeys: Set - ) => { - if (loadedInitialState) { - // There was no index for the join key - // so we loaded the initial state - // so we can safely assume that the pipeline has seen all keys - return this.sendChangesToPipeline( - collectionId, - collection, - config, - syncState, - changes - ) - } - - const newChanges = [] - for (const change of changes) { - let newChange = change - if (!sentKeys.has(change.key)) { - if (change.type === `update`) { - newChange = { ...change, type: `insert` } - } else if (change.type === `delete`) { - // filter out deletes for keys that have not been sent - continue - } - } - newChanges.push(newChange) - } - - return this.sendChangesToPipeline( - collectionId, - collection, - config, - syncState, - newChanges - ) - } - - private loadKeys( - collectionId: string, - collection: Collection, - config: Parameters[`sync`]>[0], - syncState: FullSyncState, - keys: Iterable, - sentKeys: Set, - filterFn: (item: object) => boolean - ) { - for (const key of keys) { - // Only load the key once - if (sentKeys.has(key)) continue - - const value = collection.get(key) - if (value !== undefined && filterFn(value)) { - sentKeys.add(key) - this.sendChangesToPipeline( - collectionId, - collection, - config, - syncState, - [{ type: `insert`, key, value }] - ) - } - } - } - - private subscribeToAllChanges( - collectionId: string, - collection: Collection, - config: Parameters[`sync`]>[0], - syncState: FullSyncState, - 
whereExpression: BasicExpression | undefined - ) { - const sendChangesToPipeline = this.sendChangesToPipeline.bind( - this, - collectionId, - collection, - config, - syncState - ) - const unsubscribe = collection.subscribeChanges(sendChangesToPipeline, { - includeInitialState: true, - ...(whereExpression ? { whereExpression } : undefined), - }) - return unsubscribe - } - - private subscribeToMatchingChanges( - collectionId: string, - collection: Collection, - config: Parameters[`sync`]>[0], - syncState: FullSyncState, - whereExpression: BasicExpression | undefined - ) { - let loadedInitialState = false - const sentKeys = new Set() - - const sendVisibleChanges = ( - changes: Array> - ) => { - this.sendVisibleChangesToPipeline( - collectionId, - collection, - config, - syncState, - changes, - loadedInitialState, - sentKeys - ) - } - - const unsubscribe = collection.subscribeChanges(sendVisibleChanges, { - whereExpression, - }) - - // Create a function that loads keys from the collection - // into the query pipeline on demand - const filterFn = whereExpression - ? 
createFilterFunctionFromExpression(whereExpression) - : () => true - const loadKs = (keys: Set) => { - return this.loadKeys( - collectionId, - collection, - config, - syncState, - keys, - sentKeys, - filterFn - ) - } - - // Store the functions to load keys and load initial state in the `lazyCollectionsCallbacks` map - // This is used by the join operator to dynamically load matching keys from the lazy collection - // or to get the full initial state of the collection if there's no index for the join key - this.lazyCollectionsCallbacks[collectionId] = { - loadKeys: loadKs, - loadInitialState: () => { - // Make sure we only load the initial state once - if (loadedInitialState) return - loadedInitialState = true - - const changes = collection.currentStateAsChanges({ - whereExpression, - }) - this.sendChangesToPipeline( - collectionId, - collection, - config, - syncState, - changes - ) - }, - } - return unsubscribe - } - - private subscribeToOrderedChanges( - collectionId: string, - collection: Collection, - config: Parameters[`sync`]>[0], - syncState: FullSyncState, - whereExpression: BasicExpression | undefined - ) { - const { offset, limit, comparator } = - this.optimizableOrderByCollections[collectionId]! - - // Keep track of the keys we've sent - // and also the biggest value we've sent so far - const sentValuesInfo: { - sentKeys: Set - biggest: any - } = { - sentKeys: new Set(), - biggest: undefined, - } - - // Load the first `offset + limit` values from the index - // i.e. 
the K items from the collection that fall into the requested range: [offset, offset + limit[ - this.loadNextItems( - collectionId, - collection, - config, - syncState, - sentValuesInfo, - offset + limit - ) - - const sendChangesInRange = ( - changes: Iterable> - ) => { - // Split live updates into a delete of the old value and an insert of the new value - // and filter out changes that are bigger than the biggest value we've sent so far - // because they can't affect the topK - const splittedChanges = splitUpdates(changes) - const filteredChanges = filterChangesSmallerOrEqualToMax( - splittedChanges, - comparator, - sentValuesInfo.biggest - ) - this.sendChangesToPipeline( - collectionId, - collection, - config, - syncState, - filteredChanges, - this.loadMoreIfNeeded.bind( - this, - collectionId, - collection, - config, - syncState, - sentValuesInfo - ) - ) - } - - // Subscribe to changes and only send changes that are smaller than the biggest value we've sent so far - // values that are bigger don't need to be sent because they can't affect the topK - const unsubscribe = collection.subscribeChanges(sendChangesInRange, { - whereExpression, + collectionSubscriber.subscribe() }) - return unsubscribe - } - - // This function is called by maybeRunGraph - // after each iteration of the query pipeline - // to ensure that the orderBy operator has enough data to work with - private loadMoreIfNeeded( - collectionId: string, - collection: Collection, - config: Parameters[`sync`]>[0], - syncState: FullSyncState, - sentValuesInfo: { - sentKeys: Set - biggest: any - } - ) { - const { dataNeeded } = this.optimizableOrderByCollections[collectionId]! 
- - if (!dataNeeded) { - // This should never happen because the topK operator should always set the size callback - // which in turn should lead to the orderBy operator setting the dataNeeded callback - throw new Error( - `Missing dataNeeded callback for collection ${collectionId}` - ) - } - - // `dataNeeded` probes the orderBy operator to see if it needs more data - // if it needs more data, it returns the number of items it needs - const n = dataNeeded() - if (n > 0) { - this.loadNextItems( - collectionId, - collection, - config, - syncState, - sentValuesInfo, - n - ) - } - - // Indicate that we're done loading data if we didn't need to load more data - return n === 0 - } - - private sendChangesToPipelineWithTracking( - collectionId: string, - collection: Collection, - config: Parameters[`sync`]>[0], - syncState: FullSyncState, - sentValuesInfo: { - sentKeys: Set - biggest: any - }, - changes: Iterable> - ) { - const { comparator } = this.optimizableOrderByCollections[collectionId]! - const trackedChanges = trackSentValues(changes, comparator, sentValuesInfo) - this.sendChangesToPipeline( - collectionId, - collection, - config, - syncState, - trackedChanges, - this.loadMoreIfNeeded.bind( - this, - collectionId, - collection, - config, - syncState, - sentValuesInfo - ) - ) - } - - // Loads the next `n` items from the collection - // starting from the biggest item it has sent - private loadNextItems( - collectionId: string, - collection: Collection, - config: Parameters[`sync`]>[0], - syncState: FullSyncState, - sentValuesInfo: { - sentKeys: Set - biggest: any - }, - n: number - ) { - const { valueExtractorForRawRow, index } = - this.optimizableOrderByCollections[collectionId]! - const biggestSentRow = sentValuesInfo.biggest - const biggestSentValue = biggestSentRow - ? 
valueExtractorForRawRow(biggestSentRow) - : biggestSentRow - // Take the `n` items after the biggest sent value - const nextOrderedKeys = index.take(n, biggestSentValue) - const nextInserts: Array> = - nextOrderedKeys.map((key) => { - return { type: `insert`, key, value: collection.get(key) } - }) - this.sendChangesToPipelineWithTracking( - collectionId, - collection, - config, - syncState, - sentValuesInfo, - nextInserts - ) + // Mark the collections as subscribed in the sync state + syncState.subscribedToAllCollections = true } } From fee27b71acf371bf05b529b24969de66b76e8945 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 21 Aug 2025 11:53:07 +0200 Subject: [PATCH 04/11] Move sentValuesInfo into the class' state --- .../db/src/query/live-query-collection.ts | 58 ++++++++----------- 1 file changed, 23 insertions(+), 35 deletions(-) diff --git a/packages/db/src/query/live-query-collection.ts b/packages/db/src/query/live-query-collection.ts index 2d31c8b19..08f0a871c 100644 --- a/packages/db/src/query/live-query-collection.ts +++ b/packages/db/src/query/live-query-collection.ts @@ -116,6 +116,16 @@ class CollectionSubscriber< TContext extends Context, TResult extends object = GetResult, > { + // Keep track of the keys we've sent + // and also the biggest value we've sent so far + private sentValuesInfo: { + sentKeys: Set + biggest: any + } = { + sentKeys: new Set(), + biggest: undefined, + } + constructor( private collectionId: string, private collection: Collection, @@ -301,19 +311,9 @@ class CollectionSubscriber< this.collectionId ]! - // Keep track of the keys we've sent - // and also the biggest value we've sent so far - const sentValuesInfo: { - sentKeys: Set - biggest: any - } = { - sentKeys: new Set(), - biggest: undefined, - } - // Load the first `offset + limit` values from the index // i.e. 
the K items from the collection that fall into the requested range: [offset, offset + limit[ - this.loadNextItems(sentValuesInfo, offset + limit) + this.loadNextItems(offset + limit) const sendChangesInRange = ( changes: Iterable> @@ -325,11 +325,11 @@ class CollectionSubscriber< const filteredChanges = filterChangesSmallerOrEqualToMax( splittedChanges, comparator, - sentValuesInfo.biggest + this.sentValuesInfo.biggest ) this.sendChangesToPipeline( filteredChanges, - this.loadMoreIfNeeded.bind(this, sentValuesInfo) + this.loadMoreIfNeeded.bind(this) ) } @@ -345,10 +345,7 @@ class CollectionSubscriber< // This function is called by maybeRunGraph // after each iteration of the query pipeline // to ensure that the orderBy operator has enough data to work with - private loadMoreIfNeeded(sentValuesInfo: { - sentKeys: Set - biggest: any - }) { + private loadMoreIfNeeded() { const { dataNeeded } = this.collectionConfigBuilder.optimizableOrderByCollections[ this.collectionId @@ -366,7 +363,7 @@ class CollectionSubscriber< // if it needs more data, it returns the number of items it needs const n = dataNeeded() if (n > 0) { - this.loadNextItems(sentValuesInfo, n) + this.loadNextItems(n) } // Indicate that we're done loading data if we didn't need to load more data @@ -374,37 +371,28 @@ class CollectionSubscriber< } private sendChangesToPipelineWithTracking( - sentValuesInfo: { - sentKeys: Set - biggest: any - }, changes: Iterable> ) { const { comparator } = this.collectionConfigBuilder.optimizableOrderByCollections[ this.collectionId ]! 
- const trackedChanges = trackSentValues(changes, comparator, sentValuesInfo) - this.sendChangesToPipeline( - trackedChanges, - this.loadMoreIfNeeded.bind(this, sentValuesInfo) + const trackedChanges = trackSentValues( + changes, + comparator, + this.sentValuesInfo ) + this.sendChangesToPipeline(trackedChanges, this.loadMoreIfNeeded.bind(this)) } // Loads the next `n` items from the collection // starting from the biggest item it has sent - private loadNextItems( - sentValuesInfo: { - sentKeys: Set - biggest: any - }, - n: number - ) { + private loadNextItems(n: number) { const { valueExtractorForRawRow, index } = this.collectionConfigBuilder.optimizableOrderByCollections[ this.collectionId ]! - const biggestSentRow = sentValuesInfo.biggest + const biggestSentRow = this.sentValuesInfo.biggest const biggestSentValue = biggestSentRow ? valueExtractorForRawRow(biggestSentRow) : biggestSentRow @@ -414,7 +402,7 @@ class CollectionSubscriber< nextOrderedKeys.map((key) => { return { type: `insert`, key, value: this.collection.get(key) } }) - this.sendChangesToPipelineWithTracking(sentValuesInfo, nextInserts) + this.sendChangesToPipelineWithTracking(nextInserts) } private getWhereClauseFromAlias( From c95091a1a1166f1de22f1aac1a85684af33a6744 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 21 Aug 2025 13:54:38 +0200 Subject: [PATCH 05/11] Split sentValuesInfo class field into 2 fields: sentKeys and biggest --- .../db/src/query/live-query-collection.ts | 59 ++++++++----------- 1 file changed, 25 insertions(+), 34 deletions(-) diff --git a/packages/db/src/query/live-query-collection.ts b/packages/db/src/query/live-query-collection.ts index 08f0a871c..7ba450622 100644 --- a/packages/db/src/query/live-query-collection.ts +++ b/packages/db/src/query/live-query-collection.ts @@ -116,15 +116,11 @@ class CollectionSubscriber< TContext extends Context, TResult extends object = GetResult, > { - // Keep track of the keys we've sent - // and also the biggest value we've sent 
so far - private sentValuesInfo: { - sentKeys: Set - biggest: any - } = { - sentKeys: new Set(), - biggest: undefined, - } + // Keep track of the keys we've sent (needed for join and orderBy optimizations) + private sentKeys = new Set() + + // Keep track of the biggest value we've sent so far (needed for orderBy optimization) + private biggest: any = undefined constructor( private collectionId: string, @@ -325,7 +321,7 @@ class CollectionSubscriber< const filteredChanges = filterChangesSmallerOrEqualToMax( splittedChanges, comparator, - this.sentValuesInfo.biggest + this.biggest ) this.sendChangesToPipeline( filteredChanges, @@ -377,11 +373,7 @@ class CollectionSubscriber< this.collectionConfigBuilder.optimizableOrderByCollections[ this.collectionId ]! - const trackedChanges = trackSentValues( - changes, - comparator, - this.sentValuesInfo - ) + const trackedChanges = this.trackSentValues(changes, comparator) this.sendChangesToPipeline(trackedChanges, this.loadMoreIfNeeded.bind(this)) } @@ -392,7 +384,7 @@ class CollectionSubscriber< this.collectionConfigBuilder.optimizableOrderByCollections[ this.collectionId ]! - const biggestSentRow = this.sentValuesInfo.biggest + const biggestSentRow = this.biggest const biggestSentValue = biggestSentRow ? 
valueExtractorForRawRow(biggestSentRow) : biggestSentRow @@ -415,6 +407,23 @@ class CollectionSubscriber< } return undefined } + + private *trackSentValues( + changes: Iterable>, + comparator: (a: any, b: any) => number + ) { + for (const change of changes) { + this.sentKeys.add(change.key) + + if (!this.biggest) { + this.biggest = change.value + } else if (comparator(this.biggest, change.value) < 0) { + this.biggest = change.value + } + + yield change + } + } } class CollectionConfigBuilder< @@ -1531,24 +1540,6 @@ function accumulateChanges( return acc } -function* trackSentValues( - changes: Iterable>, - comparator: (a: any, b: any) => number, - tracker: { sentKeys: Set; biggest: any } -) { - for (const change of changes) { - tracker.sentKeys.add(change.key) - - if (!tracker.biggest) { - tracker.biggest = change.value - } else if (comparator(tracker.biggest, change.value) < 0) { - tracker.biggest = change.value - } - - yield change - } -} - /** Splits updates into a delete of the old value and an insert of the new value */ function* splitUpdates< T extends object = Record, From d7e95f38cf560c77004220b0372b6a152f0cf123 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 21 Aug 2025 14:01:41 +0200 Subject: [PATCH 06/11] Also use sentKeys for join optimization --- packages/db/src/query/live-query-collection.ts | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/packages/db/src/query/live-query-collection.ts b/packages/db/src/query/live-query-collection.ts index 7ba450622..3565d0c92 100644 --- a/packages/db/src/query/live-query-collection.ts +++ b/packages/db/src/query/live-query-collection.ts @@ -197,8 +197,7 @@ class CollectionSubscriber< // and filter out deletes for keys that have not been sent private sendVisibleChangesToPipeline = ( changes: Array>, - loadedInitialState: boolean, - sentKeys: Set + loadedInitialState: boolean ) => { if (loadedInitialState) { // There was no index for the join key @@ -210,7 +209,7 @@ class 
CollectionSubscriber< const newChanges = [] for (const change of changes) { let newChange = change - if (!sentKeys.has(change.key)) { + if (!this.sentKeys.has(change.key)) { if (change.type === `update`) { newChange = { ...change, type: `insert` } } else if (change.type === `delete`) { @@ -226,16 +225,15 @@ class CollectionSubscriber< private loadKeys( keys: Iterable, - sentKeys: Set, filterFn: (item: object) => boolean ) { for (const key of keys) { // Only load the key once - if (sentKeys.has(key)) continue + if (this.sentKeys.has(key)) continue const value = this.collection.get(key) if (value !== undefined && filterFn(value)) { - sentKeys.add(key) + this.sentKeys.add(key) this.sendChangesToPipeline([{ type: `insert`, key, value }]) } } @@ -259,12 +257,11 @@ class CollectionSubscriber< whereExpression: BasicExpression | undefined ) { let loadedInitialState = false - const sentKeys = new Set() const sendVisibleChanges = ( changes: Array> ) => { - this.sendVisibleChangesToPipeline(changes, loadedInitialState, sentKeys) + this.sendVisibleChangesToPipeline(changes, loadedInitialState) } const unsubscribe = this.collection.subscribeChanges(sendVisibleChanges, { @@ -277,7 +274,7 @@ class CollectionSubscriber< ? 
createFilterFunctionFromExpression(whereExpression) : () => true const loadKs = (keys: Set) => { - return this.loadKeys(keys, sentKeys, filterFn) + return this.loadKeys(keys, filterFn) } // Store the functions to load keys and load initial state in the `lazyCollectionsCallbacks` map From cec44c0ff861b2b3f9a26782f71c162f7291abd7 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 21 Aug 2025 14:17:50 +0200 Subject: [PATCH 07/11] Split the different classes into their own files --- packages/db/src/query/index.ts | 3 +- .../db/src/query/live-query-collection.ts | 932 +----------------- .../src/query/live/CollectionConfigBuilder.ts | 425 ++++++++ .../db/src/query/live/CollectionSubscriber.ts | 426 ++++++++ .../query/live/LiveQueryCollectionConfig.ts | 73 ++ packages/db/src/query/live/types.ts | 21 + 6 files changed, 950 insertions(+), 930 deletions(-) create mode 100644 packages/db/src/query/live/CollectionConfigBuilder.ts create mode 100644 packages/db/src/query/live/CollectionSubscriber.ts create mode 100644 packages/db/src/query/live/LiveQueryCollectionConfig.ts create mode 100644 packages/db/src/query/live/types.ts diff --git a/packages/db/src/query/index.ts b/packages/db/src/query/index.ts index 20043a04a..d555df359 100644 --- a/packages/db/src/query/index.ts +++ b/packages/db/src/query/index.ts @@ -50,5 +50,6 @@ export { compileQuery } from "./compiler/index.js" export { createLiveQueryCollection, liveQueryCollectionOptions, - type LiveQueryCollectionConfig, } from "./live-query-collection.js" + +export { type LiveQueryCollectionConfig } from "./live/LiveQueryCollectionConfig.js" diff --git a/packages/db/src/query/live-query-collection.ts b/packages/db/src/query/live-query-collection.ts index 3565d0c92..a085021a4 100644 --- a/packages/db/src/query/live-query-collection.ts +++ b/packages/db/src/query/live-query-collection.ts @@ -1,726 +1,10 @@ -import { D2, MultiSet, output } from "@tanstack/db-ivm" import { createCollection } from "../collection.js" -import 
{ createFilterFunctionFromExpression } from "../change-events.js" -import { compileQuery } from "./compiler/index.js" -import { buildQuery, getQueryIR } from "./builder/index.js" -import { convertToBasicExpression } from "./compiler/expressions.js" -import type { OrderByOptimizationInfo } from "./compiler/order-by.js" +import { CollectionConfigBuilder } from "./live/CollectionConfigBuilder.js" +import type { LiveQueryCollectionConfig } from "./live/LiveQueryCollectionConfig.js" import type { InitialQueryBuilder, QueryBuilder } from "./builder/index.js" import type { Collection } from "../collection.js" -import type { - ChangeMessage, - CollectionConfig, - KeyedStream, - ResultStream, - SyncConfig, - UtilsRecord, -} from "../types.js" +import type { CollectionConfig, UtilsRecord } from "../types.js" import type { Context, GetResult } from "./builder/types.js" -import type { MultiSetArray, RootStreamBuilder } from "@tanstack/db-ivm" -import type { BasicExpression, QueryIR } from "./ir.js" -import type { LazyCollectionCallbacks } from "./compiler/joins.js" - -// Global counter for auto-generated collection IDs -let liveQueryCollectionCounter = 0 - -/** - * Configuration interface for live query collection options - * - * @example - * ```typescript - * const config: LiveQueryCollectionConfig = { - * // id is optional - will auto-generate "live-query-1", "live-query-2", etc. 
- * query: (q) => q - * .from({ comment: commentsCollection }) - * .join( - * { user: usersCollection }, - * ({ comment, user }) => eq(comment.user_id, user.id) - * ) - * .where(({ comment }) => eq(comment.active, true)) - * .select(({ comment, user }) => ({ - * id: comment.id, - * content: comment.content, - * authorName: user.name, - * })), - * // getKey is optional - defaults to using stream key - * getKey: (item) => item.id, - * } - * ``` - */ -export interface LiveQueryCollectionConfig< - TContext extends Context, - TResult extends object = GetResult & object, -> { - /** - * Unique identifier for the collection - * If not provided, defaults to `live-query-${number}` with auto-incrementing number - */ - id?: string - - /** - * Query builder function that defines the live query - */ - query: - | ((q: InitialQueryBuilder) => QueryBuilder) - | QueryBuilder - - /** - * Function to extract the key from result items - * If not provided, defaults to using the key from the D2 stream - */ - getKey?: (item: TResult) => string | number - - /** - * Optional schema for validation - */ - schema?: CollectionConfig[`schema`] - - /** - * Optional mutation handlers - */ - onInsert?: CollectionConfig[`onInsert`] - onUpdate?: CollectionConfig[`onUpdate`] - onDelete?: CollectionConfig[`onDelete`] - - /** - * Start sync / the query immediately - */ - startSync?: boolean - - /** - * GC time for the collection - */ - gcTime?: number -} - -type Changes = { - deletes: number - inserts: number - value: T - orderByIndex: string | undefined -} - -type SyncState = { - messagesCount: number - subscribedToAllCollections: boolean - unsubscribeCallbacks: Set<() => void> - - graph?: D2 - inputs?: Record> - pipeline?: ResultStream -} - -type FullSyncState = Required - -class CollectionSubscriber< - TContext extends Context, - TResult extends object = GetResult, -> { - // Keep track of the keys we've sent (needed for join and orderBy optimizations) - private sentKeys = new Set() - - // Keep track 
of the biggest value we've sent so far (needed for orderBy optimization) - private biggest: any = undefined - - constructor( - private collectionId: string, - private collection: Collection, - private config: Parameters[`sync`]>[0], - private syncState: FullSyncState, - private collectionConfigBuilder: CollectionConfigBuilder - ) {} - - subscribe() { - const collectionAlias = findCollectionAlias( - this.collectionId, - this.collectionConfigBuilder.query - ) - const whereClause = this.getWhereClauseFromAlias(collectionAlias) - - if (whereClause) { - // Convert WHERE clause to BasicExpression format for collection subscription - const whereExpression = convertToBasicExpression( - whereClause, - collectionAlias! - ) - - if (whereExpression) { - // Use index optimization for this collection - this.subscribeToChanges(whereExpression) - } else { - // This should not happen - if we have a whereClause but can't create whereExpression, - // it indicates a bug in our optimization logic - throw new Error( - `Failed to convert WHERE clause to collection filter for collection '${this.collectionId}'. ` + - `This indicates a bug in the query optimization logic.` - ) - } - } else { - // No WHERE clause for this collection, use regular subscription - this.subscribeToChanges() - } - } - - private subscribeToChanges(whereExpression?: BasicExpression) { - let unsubscribe: () => void - if (this.collectionConfigBuilder.lazyCollections.has(this.collectionId)) { - unsubscribe = this.subscribeToMatchingChanges(whereExpression) - } else if ( - Object.hasOwn( - this.collectionConfigBuilder.optimizableOrderByCollections, - this.collectionId - ) - ) { - unsubscribe = this.subscribeToOrderedChanges(whereExpression) - } else { - unsubscribe = this.subscribeToAllChanges(whereExpression) - } - this.syncState.unsubscribeCallbacks.add(unsubscribe) - } - - private sendChangesToPipeline( - changes: Iterable>, - callback?: () => boolean - ) { - const input = this.syncState.inputs[this.collectionId]! 
- sendChangesToInput(input, changes, this.collection.config.getKey) - this.collectionConfigBuilder.maybeRunGraph( - this.config, - this.syncState, - callback - ) - } - - // Wraps the sendChangesToPipeline function - // in order to turn `update`s into `insert`s - // for keys that have not been sent to the pipeline yet - // and filter out deletes for keys that have not been sent - private sendVisibleChangesToPipeline = ( - changes: Array>, - loadedInitialState: boolean - ) => { - if (loadedInitialState) { - // There was no index for the join key - // so we loaded the initial state - // so we can safely assume that the pipeline has seen all keys - return this.sendChangesToPipeline(changes) - } - - const newChanges = [] - for (const change of changes) { - let newChange = change - if (!this.sentKeys.has(change.key)) { - if (change.type === `update`) { - newChange = { ...change, type: `insert` } - } else if (change.type === `delete`) { - // filter out deletes for keys that have not been sent - continue - } - } - newChanges.push(newChange) - } - - return this.sendChangesToPipeline(newChanges) - } - - private loadKeys( - keys: Iterable, - filterFn: (item: object) => boolean - ) { - for (const key of keys) { - // Only load the key once - if (this.sentKeys.has(key)) continue - - const value = this.collection.get(key) - if (value !== undefined && filterFn(value)) { - this.sentKeys.add(key) - this.sendChangesToPipeline([{ type: `insert`, key, value }]) - } - } - } - - private subscribeToAllChanges( - whereExpression: BasicExpression | undefined - ) { - const sendChangesToPipeline = this.sendChangesToPipeline.bind(this) - const unsubscribe = this.collection.subscribeChanges( - sendChangesToPipeline, - { - includeInitialState: true, - ...(whereExpression ? 
{ whereExpression } : undefined), - } - ) - return unsubscribe - } - - private subscribeToMatchingChanges( - whereExpression: BasicExpression | undefined - ) { - let loadedInitialState = false - - const sendVisibleChanges = ( - changes: Array> - ) => { - this.sendVisibleChangesToPipeline(changes, loadedInitialState) - } - - const unsubscribe = this.collection.subscribeChanges(sendVisibleChanges, { - whereExpression, - }) - - // Create a function that loads keys from the collection - // into the query pipeline on demand - const filterFn = whereExpression - ? createFilterFunctionFromExpression(whereExpression) - : () => true - const loadKs = (keys: Set) => { - return this.loadKeys(keys, filterFn) - } - - // Store the functions to load keys and load initial state in the `lazyCollectionsCallbacks` map - // This is used by the join operator to dynamically load matching keys from the lazy collection - // or to get the full initial state of the collection if there's no index for the join key - this.collectionConfigBuilder.lazyCollectionsCallbacks[this.collectionId] = { - loadKeys: loadKs, - loadInitialState: () => { - // Make sure we only load the initial state once - if (loadedInitialState) return - loadedInitialState = true - - const changes = this.collection.currentStateAsChanges({ - whereExpression, - }) - this.sendChangesToPipeline(changes) - }, - } - return unsubscribe - } - - private subscribeToOrderedChanges( - whereExpression: BasicExpression | undefined - ) { - const { offset, limit, comparator } = - this.collectionConfigBuilder.optimizableOrderByCollections[ - this.collectionId - ]! - - // Load the first `offset + limit` values from the index - // i.e. 
the K items from the collection that fall into the requested range: [offset, offset + limit[ - this.loadNextItems(offset + limit) - - const sendChangesInRange = ( - changes: Iterable> - ) => { - // Split live updates into a delete of the old value and an insert of the new value - // and filter out changes that are bigger than the biggest value we've sent so far - // because they can't affect the topK - const splittedChanges = splitUpdates(changes) - const filteredChanges = filterChangesSmallerOrEqualToMax( - splittedChanges, - comparator, - this.biggest - ) - this.sendChangesToPipeline( - filteredChanges, - this.loadMoreIfNeeded.bind(this) - ) - } - - // Subscribe to changes and only send changes that are smaller than the biggest value we've sent so far - // values that are bigger don't need to be sent because they can't affect the topK - const unsubscribe = this.collection.subscribeChanges(sendChangesInRange, { - whereExpression, - }) - - return unsubscribe - } - - // This function is called by maybeRunGraph - // after each iteration of the query pipeline - // to ensure that the orderBy operator has enough data to work with - private loadMoreIfNeeded() { - const { dataNeeded } = - this.collectionConfigBuilder.optimizableOrderByCollections[ - this.collectionId - ]! 
- - if (!dataNeeded) { - // This should never happen because the topK operator should always set the size callback - // which in turn should lead to the orderBy operator setting the dataNeeded callback - throw new Error( - `Missing dataNeeded callback for collection ${this.collectionId}` - ) - } - - // `dataNeeded` probes the orderBy operator to see if it needs more data - // if it needs more data, it returns the number of items it needs - const n = dataNeeded() - if (n > 0) { - this.loadNextItems(n) - } - - // Indicate that we're done loading data if we didn't need to load more data - return n === 0 - } - - private sendChangesToPipelineWithTracking( - changes: Iterable> - ) { - const { comparator } = - this.collectionConfigBuilder.optimizableOrderByCollections[ - this.collectionId - ]! - const trackedChanges = this.trackSentValues(changes, comparator) - this.sendChangesToPipeline(trackedChanges, this.loadMoreIfNeeded.bind(this)) - } - - // Loads the next `n` items from the collection - // starting from the biggest item it has sent - private loadNextItems(n: number) { - const { valueExtractorForRawRow, index } = - this.collectionConfigBuilder.optimizableOrderByCollections[ - this.collectionId - ]! - const biggestSentRow = this.biggest - const biggestSentValue = biggestSentRow - ? 
valueExtractorForRawRow(biggestSentRow) - : biggestSentRow - // Take the `n` items after the biggest sent value - const nextOrderedKeys = index.take(n, biggestSentValue) - const nextInserts: Array> = - nextOrderedKeys.map((key) => { - return { type: `insert`, key, value: this.collection.get(key) } - }) - this.sendChangesToPipelineWithTracking(nextInserts) - } - - private getWhereClauseFromAlias( - collectionAlias: string | undefined - ): BasicExpression | undefined { - const collectionWhereClausesCache = - this.collectionConfigBuilder.collectionWhereClausesCache - if (collectionAlias && collectionWhereClausesCache) { - return collectionWhereClausesCache.get(collectionAlias) - } - return undefined - } - - private *trackSentValues( - changes: Iterable>, - comparator: (a: any, b: any) => number - ) { - for (const change of changes) { - this.sentKeys.add(change.key) - - if (!this.biggest) { - this.biggest = change.value - } else if (comparator(this.biggest, change.value) < 0) { - this.biggest = change.value - } - - yield change - } - } -} - -class CollectionConfigBuilder< - TContext extends Context, - TResult extends object = GetResult, -> { - private readonly id: string - readonly query: QueryIR - private readonly collections: Record> - - // WeakMap to store the keys of the results - // so that we can retrieve them in the getKey function - private readonly resultKeys = new WeakMap() - - // WeakMap to store the orderBy index for each result - private readonly orderByIndices = new WeakMap() - - private readonly compare?: (val1: TResult, val2: TResult) => number - - private graphCache: D2 | undefined - private inputsCache: Record> | undefined - private pipelineCache: ResultStream | undefined - public collectionWhereClausesCache: - | Map> - | undefined - - // Map of collection IDs to functions that load keys for that lazy collection - readonly lazyCollectionsCallbacks: Record = - {} - // Set of collection IDs that are lazy collections - readonly lazyCollections = new 
Set() - // Set of collection IDs that include an optimizable ORDER BY clause - readonly optimizableOrderByCollections: Record< - string, - OrderByOptimizationInfo - > = {} - - constructor( - private readonly config: LiveQueryCollectionConfig - ) { - // Generate a unique ID if not provided - this.id = config.id || `live-query-${++liveQueryCollectionCounter}` - - this.query = buildQueryFromConfig(config) - this.collections = extractCollectionsFromQuery(this.query) - - // Create compare function for ordering if the query has orderBy - if (this.query.orderBy && this.query.orderBy.length > 0) { - this.compare = createOrderByComparator(this.orderByIndices) - } - - // Compile the base pipeline once initially - // This is done to ensure that any errors are thrown immediately and synchronously - this.compileBasePipeline() - } - - getConfig(): CollectionConfig { - return { - id: this.id, - getKey: - this.config.getKey || - ((item) => this.resultKeys.get(item) as string | number), - sync: this.getSyncConfig(), - compare: this.compare, - gcTime: this.config.gcTime || 5000, // 5 seconds by default for live queries - schema: this.config.schema, - onInsert: this.config.onInsert, - onUpdate: this.config.onUpdate, - onDelete: this.config.onDelete, - startSync: this.config.startSync, - } - } - - // The callback function is called after the graph has run. - // This gives the callback a chance to load more data if needed, - // that's used to optimize orderBy operators that set a limit, - // in order to load some more data if we still don't have enough rows after the pipeline has run. - // That can happend because even though we load N rows, the pipeline might filter some of these rows out - // causing the orderBy operator to receive less than N rows or even no rows at all. - // So this callback would notice that it doesn't have enough rows and load some more. - // The callback returns a boolean, when it's true it's done loading data and we can mark the collection as ready. 
- maybeRunGraph( - config: Parameters[`sync`]>[0], - syncState: FullSyncState, - callback?: () => boolean - ) { - const { begin, commit, markReady } = config - - // We only run the graph if all the collections are ready - if ( - this.allCollectionsReadyOrInitialCommit() && - syncState.subscribedToAllCollections - ) { - syncState.graph.run() - const ready = callback?.() ?? true - // On the initial run, we may need to do an empty commit to ensure that - // the collection is initialized - if (syncState.messagesCount === 0) { - begin() - commit() - } - // Mark the collection as ready after the first successful run - if (ready && this.allCollectionsReady()) { - markReady() - } - } - } - - private getSyncConfig(): SyncConfig { - return { - rowUpdateMode: `full`, - sync: this.syncFn.bind(this), - } - } - - private syncFn(config: Parameters[`sync`]>[0]) { - const syncState: SyncState = { - messagesCount: 0, - subscribedToAllCollections: false, - unsubscribeCallbacks: new Set<() => void>(), - } - - // Extend the pipeline such that it applies the incoming changes to the collection - const fullSyncState = this.extendPipelineWithChangeProcessing( - config, - syncState - ) - - this.subscribeToAllCollections(config, fullSyncState) - - // Initial run - this.maybeRunGraph(config, fullSyncState) - - // Return the unsubscribe function - return () => { - syncState.unsubscribeCallbacks.forEach((unsubscribe) => unsubscribe()) - - // Reset caches so a fresh graph/pipeline is compiled on next start - // This avoids reusing a finalized D2 graph across GC restarts - this.graphCache = undefined - this.inputsCache = undefined - this.pipelineCache = undefined - this.collectionWhereClausesCache = undefined - } - } - - private compileBasePipeline() { - this.graphCache = new D2() - this.inputsCache = Object.fromEntries( - Object.entries(this.collections).map(([key]) => [ - key, - this.graphCache!.newInput(), - ]) - ) - - // Compile the query and get both pipeline and collection WHERE clauses - 
const { - pipeline: pipelineCache, - collectionWhereClauses: collectionWhereClausesCache, - } = compileQuery( - this.query, - this.inputsCache as Record, - this.collections, - this.lazyCollectionsCallbacks, - this.lazyCollections, - this.optimizableOrderByCollections - ) - - this.pipelineCache = pipelineCache - this.collectionWhereClausesCache = collectionWhereClausesCache - } - - private maybeCompileBasePipeline() { - if (!this.graphCache || !this.inputsCache || !this.pipelineCache) { - this.compileBasePipeline() - } - return { - graph: this.graphCache!, - inputs: this.inputsCache!, - pipeline: this.pipelineCache!, - } - } - - private extendPipelineWithChangeProcessing( - config: Parameters[`sync`]>[0], - syncState: SyncState - ): FullSyncState { - const { begin, commit } = config - const { graph, inputs, pipeline } = this.maybeCompileBasePipeline() - - pipeline.pipe( - output((data) => { - const messages = data.getInner() - syncState.messagesCount += messages.length - - begin() - messages - .reduce( - accumulateChanges, - new Map>() - ) - .forEach(this.applyChanges.bind(this, config)) - commit() - }) - ) - - graph.finalize() - - // Extend the sync state with the graph, inputs, and pipeline - syncState.graph = graph - syncState.inputs = inputs - syncState.pipeline = pipeline - - return syncState as FullSyncState - } - - private applyChanges( - config: Parameters[`sync`]>[0], - changes: { - deletes: number - inserts: number - value: TResult - orderByIndex: string | undefined - }, - key: unknown - ) { - const { write, collection } = config - const { deletes, inserts, value, orderByIndex } = changes - - // Store the key of the result so that we can retrieve it in the - // getKey function - this.resultKeys.set(value, key) - - // Store the orderBy index if it exists - if (orderByIndex !== undefined) { - this.orderByIndices.set(value, orderByIndex) - } - - // Simple singular insert. 
- if (inserts && deletes === 0) { - write({ - value, - type: `insert`, - }) - } else if ( - // Insert & update(s) (updates are a delete & insert) - inserts > deletes || - // Just update(s) but the item is already in the collection (so - // was inserted previously). - (inserts === deletes && collection.has(key as string | number)) - ) { - write({ - value, - type: `update`, - }) - // Only delete is left as an option - } else if (deletes > 0) { - write({ - value, - type: `delete`, - }) - } else { - throw new Error( - `Could not apply changes: ${JSON.stringify(changes)}. This should never happen.` - ) - } - } - - private allCollectionsReady() { - return Object.values(this.collections).every((collection) => - collection.isReady() - ) - } - - private allCollectionsReadyOrInitialCommit() { - return Object.values(this.collections).every( - (collection) => - collection.status === `ready` || collection.status === `initialCommit` - ) - } - - private subscribeToAllCollections( - config: Parameters[`sync`]>[0], - syncState: FullSyncState - ) { - Object.entries(this.collections).forEach(([collectionId, collection]) => { - const collectionSubscriber = new CollectionSubscriber( - collectionId, - collection, - config, - syncState, - this - ) - collectionSubscriber.subscribe() - }) - - // Mark the collections as subscribed in the sync state - syncState.subscribedToAllCollections = true - } -} /** * Creates live query collection options for use with createCollection @@ -1370,107 +654,6 @@ function bridgeToCreateCollection< > } -/** - * Helper function to send changes to a D2 input stream - */ -function sendChangesToInput( - input: RootStreamBuilder, - changes: Iterable, - getKey: (item: ChangeMessage[`value`]) => any -) { - const multiSetArray: MultiSetArray = [] - for (const change of changes) { - const key = getKey(change.value) - if (change.type === `insert`) { - multiSetArray.push([[key, change.value], 1]) - } else if (change.type === `update`) { - multiSetArray.push([[key, 
change.previousValue], -1]) - multiSetArray.push([[key, change.value], 1]) - } else { - // change.type === `delete` - multiSetArray.push([[key, change.value], -1]) - } - } - input.sendData(new MultiSet(multiSetArray)) -} - -function buildQueryFromConfig( - config: LiveQueryCollectionConfig -) { - // Build the query using the provided query builder function or instance - if (typeof config.query === `function`) { - return buildQuery(config.query) - } - return getQueryIR(config.query) -} - -function createOrderByComparator( - orderByIndices: WeakMap -) { - return (val1: T, val2: T): number => { - // Use the orderBy index stored in the WeakMap - const index1 = orderByIndices.get(val1) - const index2 = orderByIndices.get(val2) - - // Compare fractional indices lexicographically - if (index1 && index2) { - if (index1 < index2) { - return -1 - } else if (index1 > index2) { - return 1 - } else { - return 0 - } - } - - // Fallback to no ordering if indices are missing - return 0 - } -} - -/** - * Helper function to extract collections from a compiled query - * Traverses the query IR to find all collection references - * Maps collections by their ID (not alias) as expected by the compiler - */ -function extractCollectionsFromQuery( - query: any -): Record> { - const collections: Record = {} - - // Helper function to recursively extract collections from a query or source - function extractFromSource(source: any) { - if (source.type === `collectionRef`) { - collections[source.collection.id] = source.collection - } else if (source.type === `queryRef`) { - // Recursively extract from subquery - extractFromQuery(source.query) - } - } - - // Helper function to recursively extract collections from a query - function extractFromQuery(q: any) { - // Extract from FROM clause - if (q.from) { - extractFromSource(q.from) - } - - // Extract from JOIN clauses - if (q.join && Array.isArray(q.join)) { - for (const joinClause of q.join) { - if (joinClause.from) { - 
extractFromSource(joinClause.from) - } - } - } - } - - // Start extraction from the root query - extractFromQuery(query) - - return collections -} - /** * Converts WHERE expressions from the query IR into a BasicExpression for subscribeChanges * @@ -1478,112 +661,3 @@ function extractCollectionsFromQuery( * @param tableAlias The table alias used in the expressions * @returns A BasicExpression that can be used with the collection's index system */ - -/** - * Finds the alias for a collection ID in the query - */ -function findCollectionAlias( - collectionId: string, - query: any -): string | undefined { - // Check FROM clause - if ( - query.from?.type === `collectionRef` && - query.from.collection?.id === collectionId - ) { - return query.from.alias - } - - // Check JOIN clauses - if (query.join) { - for (const joinClause of query.join) { - if ( - joinClause.from?.type === `collectionRef` && - joinClause.from.collection?.id === collectionId - ) { - return joinClause.from.alias - } - } - } - - return undefined -} - -function accumulateChanges( - acc: Map>, - [[key, tupleData], multiplicity]: [ - [unknown, [any, string | undefined]], - number, - ] -) { - // All queries now consistently return [value, orderByIndex] format - // where orderByIndex is undefined for queries without ORDER BY - const [value, orderByIndex] = tupleData as [T, string | undefined] - - const changes = acc.get(key) || { - deletes: 0, - inserts: 0, - value, - orderByIndex, - } - if (multiplicity < 0) { - changes.deletes += Math.abs(multiplicity) - } else if (multiplicity > 0) { - changes.inserts += multiplicity - changes.value = value - changes.orderByIndex = orderByIndex - } - acc.set(key, changes) - return acc -} - -/** Splits updates into a delete of the old value and an insert of the new value */ -function* splitUpdates< - T extends object = Record, - TKey extends string | number = string | number, ->( - changes: Iterable> -): Generator> { - for (const change of changes) { - if (change.type === 
`update`) { - yield { type: `delete`, key: change.key, value: change.previousValue! } - yield { type: `insert`, key: change.key, value: change.value } - } else { - yield change - } - } -} - -function* filterChanges< - T extends object = Record, - TKey extends string | number = string | number, ->( - changes: Iterable>, - f: (change: ChangeMessage) => boolean -): Generator> { - for (const change of changes) { - if (f(change)) { - yield change - } - } -} - -/** - * Filters changes to only include those that are smaller than the provided max value - * @param changes - Iterable of changes to filter - * @param comparator - Comparator function to use for filtering - * @param maxValue - Range to filter changes within (range boundaries are exclusive) - * @returns Iterable of changes that fall within the range - */ -function* filterChangesSmallerOrEqualToMax< - T extends object = Record, - TKey extends string | number = string | number, ->( - changes: Iterable>, - comparator: (a: any, b: any) => number, - maxValue: any -): Generator> { - yield* filterChanges(changes, (change) => { - return !maxValue || comparator(change.value, maxValue) <= 0 - }) -} diff --git a/packages/db/src/query/live/CollectionConfigBuilder.ts b/packages/db/src/query/live/CollectionConfigBuilder.ts new file mode 100644 index 000000000..7725c503e --- /dev/null +++ b/packages/db/src/query/live/CollectionConfigBuilder.ts @@ -0,0 +1,425 @@ +import { D2, output } from "@tanstack/db-ivm" +import { compileQuery } from "../compiler/index.js" +import { buildQuery, getQueryIR } from "../builder/index.js" +import { CollectionSubscriber } from "./CollectionSubscriber.js" +import type { RootStreamBuilder } from "@tanstack/db-ivm" +import type { OrderByOptimizationInfo } from "../compiler/order-by.js" +import type { Collection } from "../../collection.js" +import type { + CollectionConfig, + KeyedStream, + ResultStream, + SyncConfig, +} from "../../types.js" +import type { Context, GetResult } from 
"../builder/types.js" +import type { BasicExpression, QueryIR } from "../ir.js" +import type { LazyCollectionCallbacks } from "../compiler/joins.js" +import type { LiveQueryCollectionConfig } from "./LiveQueryCollectionConfig.js" +import type { Changes, FullSyncState, SyncState } from "./types.js" + +// Global counter for auto-generated collection IDs +let liveQueryCollectionCounter = 0 + +export class CollectionConfigBuilder< + TContext extends Context, + TResult extends object = GetResult, +> { + private readonly id: string + readonly query: QueryIR + private readonly collections: Record> + + // WeakMap to store the keys of the results + // so that we can retrieve them in the getKey function + private readonly resultKeys = new WeakMap() + + // WeakMap to store the orderBy index for each result + private readonly orderByIndices = new WeakMap() + + private readonly compare?: (val1: TResult, val2: TResult) => number + + private graphCache: D2 | undefined + private inputsCache: Record> | undefined + private pipelineCache: ResultStream | undefined + public collectionWhereClausesCache: + | Map> + | undefined + + // Map of collection IDs to functions that load keys for that lazy collection + readonly lazyCollectionsCallbacks: Record = + {} + // Set of collection IDs that are lazy collections + readonly lazyCollections = new Set() + // Set of collection IDs that include an optimizable ORDER BY clause + readonly optimizableOrderByCollections: Record< + string, + OrderByOptimizationInfo + > = {} + + constructor( + private readonly config: LiveQueryCollectionConfig + ) { + // Generate a unique ID if not provided + this.id = config.id || `live-query-${++liveQueryCollectionCounter}` + + this.query = buildQueryFromConfig(config) + this.collections = extractCollectionsFromQuery(this.query) + + // Create compare function for ordering if the query has orderBy + if (this.query.orderBy && this.query.orderBy.length > 0) { + this.compare = createOrderByComparator(this.orderByIndices) 
+ }
+
+ // Compile the base pipeline once initially
+ // This is done to ensure that any errors are thrown immediately and synchronously
+ this.compileBasePipeline()
+ }
+
+ getConfig(): CollectionConfig {
+ return {
+ id: this.id,
+ getKey:
+ this.config.getKey ||
+ ((item) => this.resultKeys.get(item) as string | number),
+ sync: this.getSyncConfig(),
+ compare: this.compare,
+ gcTime: this.config.gcTime || 5000, // 5 seconds by default for live queries
+ schema: this.config.schema,
+ onInsert: this.config.onInsert,
+ onUpdate: this.config.onUpdate,
+ onDelete: this.config.onDelete,
+ startSync: this.config.startSync,
+ }
+ }
+
+ // The callback function is called after the graph has run.
+ // This gives the callback a chance to load more data if needed,
+ // that's used to optimize orderBy operators that set a limit,
+ // in order to load some more data if we still don't have enough rows after the pipeline has run.
+ // That can happen because even though we load N rows, the pipeline might filter some of these rows out
+ // causing the orderBy operator to receive less than N rows or even no rows at all.
+ // So this callback would notice that it doesn't have enough rows and load some more.
+ // The callback returns a boolean, when it's true it's done loading data and we can mark the collection as ready.
+ maybeRunGraph(
+ config: Parameters[`sync`]>[0],
+ syncState: FullSyncState,
+ callback?: () => boolean
+ ) {
+ const { begin, commit, markReady } = config
+
+ // We only run the graph if all the collections are ready
+ if (
+ this.allCollectionsReadyOrInitialCommit() &&
+ syncState.subscribedToAllCollections
+ ) {
+ syncState.graph.run()
+ const ready = callback?.() ?? 
true + // On the initial run, we may need to do an empty commit to ensure that + // the collection is initialized + if (syncState.messagesCount === 0) { + begin() + commit() + } + // Mark the collection as ready after the first successful run + if (ready && this.allCollectionsReady()) { + markReady() + } + } + } + + private getSyncConfig(): SyncConfig { + return { + rowUpdateMode: `full`, + sync: this.syncFn.bind(this), + } + } + + private syncFn(config: Parameters[`sync`]>[0]) { + const syncState: SyncState = { + messagesCount: 0, + subscribedToAllCollections: false, + unsubscribeCallbacks: new Set<() => void>(), + } + + // Extend the pipeline such that it applies the incoming changes to the collection + const fullSyncState = this.extendPipelineWithChangeProcessing( + config, + syncState + ) + + this.subscribeToAllCollections(config, fullSyncState) + + // Initial run + this.maybeRunGraph(config, fullSyncState) + + // Return the unsubscribe function + return () => { + syncState.unsubscribeCallbacks.forEach((unsubscribe) => unsubscribe()) + + // Reset caches so a fresh graph/pipeline is compiled on next start + // This avoids reusing a finalized D2 graph across GC restarts + this.graphCache = undefined + this.inputsCache = undefined + this.pipelineCache = undefined + this.collectionWhereClausesCache = undefined + } + } + + private compileBasePipeline() { + this.graphCache = new D2() + this.inputsCache = Object.fromEntries( + Object.entries(this.collections).map(([key]) => [ + key, + this.graphCache!.newInput(), + ]) + ) + + // Compile the query and get both pipeline and collection WHERE clauses + const { + pipeline: pipelineCache, + collectionWhereClauses: collectionWhereClausesCache, + } = compileQuery( + this.query, + this.inputsCache as Record, + this.collections, + this.lazyCollectionsCallbacks, + this.lazyCollections, + this.optimizableOrderByCollections + ) + + this.pipelineCache = pipelineCache + this.collectionWhereClausesCache = collectionWhereClausesCache 
+ } + + private maybeCompileBasePipeline() { + if (!this.graphCache || !this.inputsCache || !this.pipelineCache) { + this.compileBasePipeline() + } + return { + graph: this.graphCache!, + inputs: this.inputsCache!, + pipeline: this.pipelineCache!, + } + } + + private extendPipelineWithChangeProcessing( + config: Parameters[`sync`]>[0], + syncState: SyncState + ): FullSyncState { + const { begin, commit } = config + const { graph, inputs, pipeline } = this.maybeCompileBasePipeline() + + pipeline.pipe( + output((data) => { + const messages = data.getInner() + syncState.messagesCount += messages.length + + begin() + messages + .reduce( + accumulateChanges, + new Map>() + ) + .forEach(this.applyChanges.bind(this, config)) + commit() + }) + ) + + graph.finalize() + + // Extend the sync state with the graph, inputs, and pipeline + syncState.graph = graph + syncState.inputs = inputs + syncState.pipeline = pipeline + + return syncState as FullSyncState + } + + private applyChanges( + config: Parameters[`sync`]>[0], + changes: { + deletes: number + inserts: number + value: TResult + orderByIndex: string | undefined + }, + key: unknown + ) { + const { write, collection } = config + const { deletes, inserts, value, orderByIndex } = changes + + // Store the key of the result so that we can retrieve it in the + // getKey function + this.resultKeys.set(value, key) + + // Store the orderBy index if it exists + if (orderByIndex !== undefined) { + this.orderByIndices.set(value, orderByIndex) + } + + // Simple singular insert. + if (inserts && deletes === 0) { + write({ + value, + type: `insert`, + }) + } else if ( + // Insert & update(s) (updates are a delete & insert) + inserts > deletes || + // Just update(s) but the item is already in the collection (so + // was inserted previously). 
+ (inserts === deletes && collection.has(key as string | number)) + ) { + write({ + value, + type: `update`, + }) + // Only delete is left as an option + } else if (deletes > 0) { + write({ + value, + type: `delete`, + }) + } else { + throw new Error( + `Could not apply changes: ${JSON.stringify(changes)}. This should never happen.` + ) + } + } + + private allCollectionsReady() { + return Object.values(this.collections).every((collection) => + collection.isReady() + ) + } + + private allCollectionsReadyOrInitialCommit() { + return Object.values(this.collections).every( + (collection) => + collection.status === `ready` || collection.status === `initialCommit` + ) + } + + private subscribeToAllCollections( + config: Parameters[`sync`]>[0], + syncState: FullSyncState + ) { + Object.entries(this.collections).forEach(([collectionId, collection]) => { + const collectionSubscriber = new CollectionSubscriber( + collectionId, + collection, + config, + syncState, + this + ) + collectionSubscriber.subscribe() + }) + + // Mark the collections as subscribed in the sync state + syncState.subscribedToAllCollections = true + } +} + +function buildQueryFromConfig( + config: LiveQueryCollectionConfig +) { + // Build the query using the provided query builder function or instance + if (typeof config.query === `function`) { + return buildQuery(config.query) + } + return getQueryIR(config.query) +} + +function createOrderByComparator( + orderByIndices: WeakMap +) { + return (val1: T, val2: T): number => { + // Use the orderBy index stored in the WeakMap + const index1 = orderByIndices.get(val1) + const index2 = orderByIndices.get(val2) + + // Compare fractional indices lexicographically + if (index1 && index2) { + if (index1 < index2) { + return -1 + } else if (index1 > index2) { + return 1 + } else { + return 0 + } + } + + // Fallback to no ordering if indices are missing + return 0 + } +} + +/** + * Helper function to extract collections from a compiled query + * Traverses the query 
IR to find all collection references + * Maps collections by their ID (not alias) as expected by the compiler + */ +function extractCollectionsFromQuery( + query: any +): Record> { + const collections: Record = {} + + // Helper function to recursively extract collections from a query or source + function extractFromSource(source: any) { + if (source.type === `collectionRef`) { + collections[source.collection.id] = source.collection + } else if (source.type === `queryRef`) { + // Recursively extract from subquery + extractFromQuery(source.query) + } + } + + // Helper function to recursively extract collections from a query + function extractFromQuery(q: any) { + // Extract from FROM clause + if (q.from) { + extractFromSource(q.from) + } + + // Extract from JOIN clauses + if (q.join && Array.isArray(q.join)) { + for (const joinClause of q.join) { + if (joinClause.from) { + extractFromSource(joinClause.from) + } + } + } + } + + // Start extraction from the root query + extractFromQuery(query) + + return collections +} + +function accumulateChanges( + acc: Map>, + [[key, tupleData], multiplicity]: [ + [unknown, [any, string | undefined]], + number, + ] +) { + // All queries now consistently return [value, orderByIndex] format + // where orderByIndex is undefined for queries without ORDER BY + const [value, orderByIndex] = tupleData as [T, string | undefined] + + const changes = acc.get(key) || { + deletes: 0, + inserts: 0, + value, + orderByIndex, + } + if (multiplicity < 0) { + changes.deletes += Math.abs(multiplicity) + } else if (multiplicity > 0) { + changes.inserts += multiplicity + changes.value = value + changes.orderByIndex = orderByIndex + } + acc.set(key, changes) + return acc +} diff --git a/packages/db/src/query/live/CollectionSubscriber.ts b/packages/db/src/query/live/CollectionSubscriber.ts new file mode 100644 index 000000000..2c2c1d15f --- /dev/null +++ b/packages/db/src/query/live/CollectionSubscriber.ts @@ -0,0 +1,426 @@ +import { MultiSet } from 
"@tanstack/db-ivm" +import { createFilterFunctionFromExpression } from "../../change-events.js" +import { convertToBasicExpression } from "../compiler/expressions.js" +import type { FullSyncState } from "./types.js" +import type { MultiSetArray, RootStreamBuilder } from "@tanstack/db-ivm" +import type { Collection } from "../../collection.js" +import type { ChangeMessage, SyncConfig } from "../../types.js" +import type { Context, GetResult } from "../builder/types.js" +import type { BasicExpression } from "../ir.js" +import type { CollectionConfigBuilder } from "./CollectionConfigBuilder.js" + +export class CollectionSubscriber< + TContext extends Context, + TResult extends object = GetResult, +> { + // Keep track of the keys we've sent (needed for join and orderBy optimizations) + private sentKeys = new Set() + + // Keep track of the biggest value we've sent so far (needed for orderBy optimization) + private biggest: any = undefined + + constructor( + private collectionId: string, + private collection: Collection, + private config: Parameters[`sync`]>[0], + private syncState: FullSyncState, + private collectionConfigBuilder: CollectionConfigBuilder + ) {} + + subscribe() { + const collectionAlias = findCollectionAlias( + this.collectionId, + this.collectionConfigBuilder.query + ) + const whereClause = this.getWhereClauseFromAlias(collectionAlias) + + if (whereClause) { + // Convert WHERE clause to BasicExpression format for collection subscription + const whereExpression = convertToBasicExpression( + whereClause, + collectionAlias! + ) + + if (whereExpression) { + // Use index optimization for this collection + this.subscribeToChanges(whereExpression) + } else { + // This should not happen - if we have a whereClause but can't create whereExpression, + // it indicates a bug in our optimization logic + throw new Error( + `Failed to convert WHERE clause to collection filter for collection '${this.collectionId}'. 
` + + `This indicates a bug in the query optimization logic.` + ) + } + } else { + // No WHERE clause for this collection, use regular subscription + this.subscribeToChanges() + } + } + + private subscribeToChanges(whereExpression?: BasicExpression) { + let unsubscribe: () => void + if (this.collectionConfigBuilder.lazyCollections.has(this.collectionId)) { + unsubscribe = this.subscribeToMatchingChanges(whereExpression) + } else if ( + Object.hasOwn( + this.collectionConfigBuilder.optimizableOrderByCollections, + this.collectionId + ) + ) { + unsubscribe = this.subscribeToOrderedChanges(whereExpression) + } else { + unsubscribe = this.subscribeToAllChanges(whereExpression) + } + this.syncState.unsubscribeCallbacks.add(unsubscribe) + } + + private sendChangesToPipeline( + changes: Iterable>, + callback?: () => boolean + ) { + const input = this.syncState.inputs[this.collectionId]! + sendChangesToInput(input, changes, this.collection.config.getKey) + this.collectionConfigBuilder.maybeRunGraph( + this.config, + this.syncState, + callback + ) + } + + // Wraps the sendChangesToPipeline function + // in order to turn `update`s into `insert`s + // for keys that have not been sent to the pipeline yet + // and filter out deletes for keys that have not been sent + private sendVisibleChangesToPipeline = ( + changes: Array>, + loadedInitialState: boolean + ) => { + if (loadedInitialState) { + // There was no index for the join key + // so we loaded the initial state + // so we can safely assume that the pipeline has seen all keys + return this.sendChangesToPipeline(changes) + } + + const newChanges = [] + for (const change of changes) { + let newChange = change + if (!this.sentKeys.has(change.key)) { + if (change.type === `update`) { + newChange = { ...change, type: `insert` } + } else if (change.type === `delete`) { + // filter out deletes for keys that have not been sent + continue + } + } + newChanges.push(newChange) + } + + return this.sendChangesToPipeline(newChanges) + } 
+ + private loadKeys( + keys: Iterable, + filterFn: (item: object) => boolean + ) { + for (const key of keys) { + // Only load the key once + if (this.sentKeys.has(key)) continue + + const value = this.collection.get(key) + if (value !== undefined && filterFn(value)) { + this.sentKeys.add(key) + this.sendChangesToPipeline([{ type: `insert`, key, value }]) + } + } + } + + private subscribeToAllChanges( + whereExpression: BasicExpression | undefined + ) { + const sendChangesToPipeline = this.sendChangesToPipeline.bind(this) + const unsubscribe = this.collection.subscribeChanges( + sendChangesToPipeline, + { + includeInitialState: true, + ...(whereExpression ? { whereExpression } : undefined), + } + ) + return unsubscribe + } + + private subscribeToMatchingChanges( + whereExpression: BasicExpression | undefined + ) { + let loadedInitialState = false + + const sendVisibleChanges = ( + changes: Array> + ) => { + this.sendVisibleChangesToPipeline(changes, loadedInitialState) + } + + const unsubscribe = this.collection.subscribeChanges(sendVisibleChanges, { + whereExpression, + }) + + // Create a function that loads keys from the collection + // into the query pipeline on demand + const filterFn = whereExpression + ? 
createFilterFunctionFromExpression(whereExpression) + : () => true + const loadKs = (keys: Set) => { + return this.loadKeys(keys, filterFn) + } + + // Store the functions to load keys and load initial state in the `lazyCollectionsCallbacks` map + // This is used by the join operator to dynamically load matching keys from the lazy collection + // or to get the full initial state of the collection if there's no index for the join key + this.collectionConfigBuilder.lazyCollectionsCallbacks[this.collectionId] = { + loadKeys: loadKs, + loadInitialState: () => { + // Make sure we only load the initial state once + if (loadedInitialState) return + loadedInitialState = true + + const changes = this.collection.currentStateAsChanges({ + whereExpression, + }) + this.sendChangesToPipeline(changes) + }, + } + return unsubscribe + } + + private subscribeToOrderedChanges( + whereExpression: BasicExpression | undefined + ) { + const { offset, limit, comparator } = + this.collectionConfigBuilder.optimizableOrderByCollections[ + this.collectionId + ]! + + // Load the first `offset + limit` values from the index + // i.e. 
the K items from the collection that fall into the requested range: [offset, offset + limit[ + this.loadNextItems(offset + limit) + + const sendChangesInRange = ( + changes: Iterable> + ) => { + // Split live updates into a delete of the old value and an insert of the new value + // and filter out changes that are bigger than the biggest value we've sent so far + // because they can't affect the topK + const splittedChanges = splitUpdates(changes) + const filteredChanges = filterChangesSmallerOrEqualToMax( + splittedChanges, + comparator, + this.biggest + ) + this.sendChangesToPipeline( + filteredChanges, + this.loadMoreIfNeeded.bind(this) + ) + } + + // Subscribe to changes and only send changes that are smaller than the biggest value we've sent so far + // values that are bigger don't need to be sent because they can't affect the topK + const unsubscribe = this.collection.subscribeChanges(sendChangesInRange, { + whereExpression, + }) + + return unsubscribe + } + + // This function is called by maybeRunGraph + // after each iteration of the query pipeline + // to ensure that the orderBy operator has enough data to work with + private loadMoreIfNeeded() { + const { dataNeeded } = + this.collectionConfigBuilder.optimizableOrderByCollections[ + this.collectionId + ]! 
+ + if (!dataNeeded) { + // This should never happen because the topK operator should always set the size callback + // which in turn should lead to the orderBy operator setting the dataNeeded callback + throw new Error( + `Missing dataNeeded callback for collection ${this.collectionId}` + ) + } + + // `dataNeeded` probes the orderBy operator to see if it needs more data + // if it needs more data, it returns the number of items it needs + const n = dataNeeded() + if (n > 0) { + this.loadNextItems(n) + } + + // Indicate that we're done loading data if we didn't need to load more data + return n === 0 + } + + private sendChangesToPipelineWithTracking( + changes: Iterable> + ) { + const { comparator } = + this.collectionConfigBuilder.optimizableOrderByCollections[ + this.collectionId + ]! + const trackedChanges = this.trackSentValues(changes, comparator) + this.sendChangesToPipeline(trackedChanges, this.loadMoreIfNeeded.bind(this)) + } + + // Loads the next `n` items from the collection + // starting from the biggest item it has sent + private loadNextItems(n: number) { + const { valueExtractorForRawRow, index } = + this.collectionConfigBuilder.optimizableOrderByCollections[ + this.collectionId + ]! + const biggestSentRow = this.biggest + const biggestSentValue = biggestSentRow + ? 
valueExtractorForRawRow(biggestSentRow) + : biggestSentRow + // Take the `n` items after the biggest sent value + const nextOrderedKeys = index.take(n, biggestSentValue) + const nextInserts: Array> = + nextOrderedKeys.map((key) => { + return { type: `insert`, key, value: this.collection.get(key) } + }) + this.sendChangesToPipelineWithTracking(nextInserts) + } + + private getWhereClauseFromAlias( + collectionAlias: string | undefined + ): BasicExpression | undefined { + const collectionWhereClausesCache = + this.collectionConfigBuilder.collectionWhereClausesCache + if (collectionAlias && collectionWhereClausesCache) { + return collectionWhereClausesCache.get(collectionAlias) + } + return undefined + } + + private *trackSentValues( + changes: Iterable>, + comparator: (a: any, b: any) => number + ) { + for (const change of changes) { + this.sentKeys.add(change.key) + + if (!this.biggest) { + this.biggest = change.value + } else if (comparator(this.biggest, change.value) < 0) { + this.biggest = change.value + } + + yield change + } + } +} + +/** + * Finds the alias for a collection ID in the query + */ +function findCollectionAlias( + collectionId: string, + query: any +): string | undefined { + // Check FROM clause + if ( + query.from?.type === `collectionRef` && + query.from.collection?.id === collectionId + ) { + return query.from.alias + } + + // Check JOIN clauses + if (query.join) { + for (const joinClause of query.join) { + if ( + joinClause.from?.type === `collectionRef` && + joinClause.from.collection?.id === collectionId + ) { + return joinClause.from.alias + } + } + } + + return undefined +} + +/** + * Helper function to send changes to a D2 input stream + */ +function sendChangesToInput( + input: RootStreamBuilder, + changes: Iterable, + getKey: (item: ChangeMessage[`value`]) => any +) { + const multiSetArray: MultiSetArray = [] + for (const change of changes) { + const key = getKey(change.value) + if (change.type === `insert`) { + multiSetArray.push([[key, 
change.value], 1]) + } else if (change.type === `update`) { + multiSetArray.push([[key, change.previousValue], -1]) + multiSetArray.push([[key, change.value], 1]) + } else { + // change.type === `delete` + multiSetArray.push([[key, change.value], -1]) + } + } + input.sendData(new MultiSet(multiSetArray)) +} + +/** Splits updates into a delete of the old value and an insert of the new value */ +function* splitUpdates< + T extends object = Record, + TKey extends string | number = string | number, +>( + changes: Iterable> +): Generator> { + for (const change of changes) { + if (change.type === `update`) { + yield { type: `delete`, key: change.key, value: change.previousValue! } + yield { type: `insert`, key: change.key, value: change.value } + } else { + yield change + } + } +} + +function* filterChanges< + T extends object = Record, + TKey extends string | number = string | number, +>( + changes: Iterable>, + f: (change: ChangeMessage) => boolean +): Generator> { + for (const change of changes) { + if (f(change)) { + yield change + } + } +} + +/** + * Filters changes to only include those that are smaller than the provided max value + * @param changes - Iterable of changes to filter + * @param comparator - Comparator function to use for filtering + * @param maxValue - Range to filter changes within (range boundaries are exclusive) + * @returns Iterable of changes that fall within the range + */ +function* filterChangesSmallerOrEqualToMax< + T extends object = Record, + TKey extends string | number = string | number, +>( + changes: Iterable>, + comparator: (a: any, b: any) => number, + maxValue: any +): Generator> { + yield* filterChanges(changes, (change) => { + return !maxValue || comparator(change.value, maxValue) <= 0 + }) +} diff --git a/packages/db/src/query/live/LiveQueryCollectionConfig.ts b/packages/db/src/query/live/LiveQueryCollectionConfig.ts new file mode 100644 index 000000000..02df0d36d --- /dev/null +++ 
b/packages/db/src/query/live/LiveQueryCollectionConfig.ts @@ -0,0 +1,73 @@ +import type { InitialQueryBuilder, QueryBuilder } from "../builder/index.js" +import type { CollectionConfig } from "../../types.js" +import type { Context, GetResult } from "../builder/types.js" + +/** + * Configuration interface for live query collection options + * + * @example + * ```typescript + * const config: LiveQueryCollectionConfig = { + * // id is optional - will auto-generate "live-query-1", "live-query-2", etc. + * query: (q) => q + * .from({ comment: commentsCollection }) + * .join( + * { user: usersCollection }, + * ({ comment, user }) => eq(comment.user_id, user.id) + * ) + * .where(({ comment }) => eq(comment.active, true)) + * .select(({ comment, user }) => ({ + * id: comment.id, + * content: comment.content, + * authorName: user.name, + * })), + * // getKey is optional - defaults to using stream key + * getKey: (item) => item.id, + * } + * ``` + */ +export interface LiveQueryCollectionConfig< + TContext extends Context, + TResult extends object = GetResult & object, +> { + /** + * Unique identifier for the collection + * If not provided, defaults to `live-query-${number}` with auto-incrementing number + */ + id?: string + + /** + * Query builder function that defines the live query + */ + query: + | ((q: InitialQueryBuilder) => QueryBuilder) + | QueryBuilder + + /** + * Function to extract the key from result items + * If not provided, defaults to using the key from the D2 stream + */ + getKey?: (item: TResult) => string | number + + /** + * Optional schema for validation + */ + schema?: CollectionConfig[`schema`] + + /** + * Optional mutation handlers + */ + onInsert?: CollectionConfig[`onInsert`] + onUpdate?: CollectionConfig[`onUpdate`] + onDelete?: CollectionConfig[`onDelete`] + + /** + * Start sync / the query immediately + */ + startSync?: boolean + + /** + * GC time for the collection + */ + gcTime?: number +} diff --git a/packages/db/src/query/live/types.ts 
b/packages/db/src/query/live/types.ts new file mode 100644 index 000000000..6c382ff30 --- /dev/null +++ b/packages/db/src/query/live/types.ts @@ -0,0 +1,21 @@ +import type { D2, RootStreamBuilder } from "@tanstack/db-ivm" +import type { ResultStream } from "../../types.js" + +export type Changes = { + deletes: number + inserts: number + value: T + orderByIndex: string | undefined +} + +export type SyncState = { + messagesCount: number + subscribedToAllCollections: boolean + unsubscribeCallbacks: Set<() => void> + + graph?: D2 + inputs?: Record> + pipeline?: ResultStream +} + +export type FullSyncState = Required From be04c52aa0726f854ddac49959df053bf991e911 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 21 Aug 2025 14:20:15 +0200 Subject: [PATCH 08/11] Remove orphaned docstring --- packages/db/src/query/live-query-collection.ts | 8 -------- 1 file changed, 8 deletions(-) diff --git a/packages/db/src/query/live-query-collection.ts b/packages/db/src/query/live-query-collection.ts index a085021a4..a771c753d 100644 --- a/packages/db/src/query/live-query-collection.ts +++ b/packages/db/src/query/live-query-collection.ts @@ -653,11 +653,3 @@ function bridgeToCreateCollection< TUtils > } - -/** - * Converts WHERE expressions from the query IR into a BasicExpression for subscribeChanges - * - * @param whereExpressions Array of WHERE expressions to convert - * @param tableAlias The table alias used in the expressions - * @returns A BasicExpression that can be used with the collection's index system - */ From 67f83e12ad91db6ce5724d4db112a4026a9b0a98 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 21 Aug 2025 14:22:23 +0200 Subject: [PATCH 09/11] Remove original code that was commented out --- .../db/src/query/live-query-collection.ts | 508 ------------------ 1 file changed, 508 deletions(-) diff --git a/packages/db/src/query/live-query-collection.ts b/packages/db/src/query/live-query-collection.ts index a771c753d..6992f45a3 100644 --- 
a/packages/db/src/query/live-query-collection.ts +++ b/packages/db/src/query/live-query-collection.ts @@ -36,514 +36,6 @@ export function liveQueryCollectionOptions< >( config: LiveQueryCollectionConfig ): CollectionConfig { - /* - // Generate a unique ID if not provided - const id = config.id || `live-query-${++liveQueryCollectionCounter}` - - // Build the query - const query = buildQueryFromConfig(config) - - // WeakMap to store the keys of the results - // so that we can retrieve them in the getKey function - const resultKeys = new WeakMap() - - // WeakMap to store the orderBy index for each result - const orderByIndices = new WeakMap() - - // Create compare function for ordering if the query has orderBy - const compare = - query.orderBy && query.orderBy.length > 0 - ? createOrderByComparator(orderByIndices) - : undefined - - const collections = extractCollectionsFromQuery(query) - - const allCollectionsReady = () => { - return Object.values(collections).every((collection) => - collection.isReady() - ) - } - - const allCollectionsReadyOrInitialCommit = () => { - return Object.values(collections).every( - (collection) => - collection.status === `ready` || collection.status === `initialCommit` - ) - } - - let graphCache: D2 | undefined - let inputsCache: Record> | undefined - let pipelineCache: ResultStream | undefined - let collectionWhereClausesCache: - | Map> - | undefined - - // Map of collection IDs to functions that load keys for that lazy collection - const lazyCollectionsCallbacks: Record = {} - // Set of collection IDs that are lazy collections - const lazyCollections = new Set() - // Set of collection IDs that include an optimizable ORDER BY clause - const optimizableOrderByCollections: Record = - {} - - const compileBasePipeline = () => { - graphCache = new D2() - inputsCache = Object.fromEntries( - Object.entries(collections).map(([key]) => [ - key, - graphCache!.newInput(), - ]) - ) - - // Compile the query and get both pipeline and collection WHERE 
clauses - ;({ - pipeline: pipelineCache, - collectionWhereClauses: collectionWhereClausesCache, - } = compileQuery( - query, - inputsCache as Record, - collections, - lazyCollectionsCallbacks, - lazyCollections, - optimizableOrderByCollections - )) - } - - const maybeCompileBasePipeline = () => { - if (!graphCache || !inputsCache || !pipelineCache) { - compileBasePipeline() - } - return { - graph: graphCache!, - inputs: inputsCache!, - pipeline: pipelineCache!, - } - } - - // Compile the base pipeline once initially - // This is done to ensure that any errors are thrown immediately and synchronously - compileBasePipeline() - - // Create the sync configuration - const sync: SyncConfig = { - rowUpdateMode: `full`, - sync: ({ begin, write, commit, markReady, collection: theCollection }) => { - const { graph, inputs, pipeline } = maybeCompileBasePipeline() - let messagesCount = 0 - pipeline.pipe( - output((data) => { - const messages = data.getInner() - messagesCount += messages.length - - begin() - messages - .reduce((acc, [[key, tupleData], multiplicity]) => { - // All queries now consistently return [value, orderByIndex] format - // where orderByIndex is undefined for queries without ORDER BY - const [value, orderByIndex] = tupleData as [ - TResult, - string | undefined, - ] - - const changes = acc.get(key) || { - deletes: 0, - inserts: 0, - value, - orderByIndex, - } - if (multiplicity < 0) { - changes.deletes += Math.abs(multiplicity) - } else if (multiplicity > 0) { - changes.inserts += multiplicity - changes.value = value - changes.orderByIndex = orderByIndex - } - acc.set(key, changes) - return acc - }, new Map()) - .forEach((changes, rawKey) => { - const { deletes, inserts, value, orderByIndex } = changes - - // Store the key of the result so that we can retrieve it in the - // getKey function - resultKeys.set(value, rawKey) - - // Store the orderBy index if it exists - if (orderByIndex !== undefined) { - orderByIndices.set(value, orderByIndex) - } - - // 
Simple singular insert. - if (inserts && deletes === 0) { - write({ - value, - type: `insert`, - }) - } else if ( - // Insert & update(s) (updates are a delete & insert) - inserts > deletes || - // Just update(s) but the item is already in the collection (so - // was inserted previously). - (inserts === deletes && - theCollection.has(rawKey as string | number)) - ) { - write({ - value, - type: `update`, - }) - // Only delete is left as an option - } else if (deletes > 0) { - write({ - value, - type: `delete`, - }) - } else { - throw new Error( - `This should never happen ${JSON.stringify(changes)}` - ) - } - }) - commit() - }) - ) - - graph.finalize() - - let subscribedToAllCollections = false - - // The callback function is called after the graph has run. - // This gives the callback a chance to load more data if needed, - // that's used to optimize orderBy operators that set a limit, - // in order to load some more data if we still don't have enough rows after the pipeline has run. - // That can happend because even though we load N rows, the pipeline might filter some of these rows out - // causing the orderBy operator to receive less than N rows or even no rows at all. - // So this callback would notice that it doesn't have enough rows and load some more. - // The callback returns a boolean, when it's true it's done loading data and we can mark the collection as ready. - const maybeRunGraph = (callback?: () => boolean) => { - // We only run the graph if all the collections are ready - if ( - allCollectionsReadyOrInitialCommit() && - subscribedToAllCollections - ) { - graph.run() - const ready = callback?.() ?? 
true - // On the initial run, we may need to do an empty commit to ensure that - // the collection is initialized - if (messagesCount === 0) { - begin() - commit() - } - // Mark the collection as ready after the first successful run - if (ready && allCollectionsReady()) { - markReady() - } - } - } - - // Unsubscribe callbacks - const unsubscribeCallbacks = new Set<() => void>() - - // Subscribe to all collections, using WHERE clause optimization when available - Object.entries(collections).forEach(([collectionId, collection]) => { - const input = inputs[collectionId]! - const collectionAlias = findCollectionAlias(collectionId, query) - const whereClause = - collectionAlias && collectionWhereClausesCache - ? collectionWhereClausesCache.get(collectionAlias) - : undefined - - const sendChangesToPipeline = ( - changes: Iterable>, - callback?: () => boolean - ) => { - sendChangesToInput(input, changes, collection.config.getKey) - maybeRunGraph(callback) - } - - // Wraps the sendChangesToPipeline function - // in order to turn `update`s into `insert`s - // for keys that have not been sent to the pipeline yet - // and filter out deletes for keys that have not been sent - const sendVisibleChangesToPipeline = ( - changes: Array>, - loadedInitialState: boolean, - sentKeys: Set - ) => { - if (loadedInitialState) { - // There was no index for the join key - // so we loaded the initial state - // so we can safely assume that the pipeline has seen all keys - return sendChangesToPipeline(changes) - } - - const newChanges = [] - for (const change of changes) { - let newChange = change - if (!sentKeys.has(change.key)) { - if (change.type === `update`) { - newChange = { ...change, type: `insert` } - } else if (change.type === `delete`) { - // filter out deletes for keys that have not been sent - continue - } - } - newChanges.push(newChange) - } - - return sendChangesToPipeline(newChanges) - } - - const loadKeys = ( - keys: Iterable, - sentKeys: Set, - filterFn: (item: object) => 
boolean - ) => { - for (const key of keys) { - // Only load the key once - if (sentKeys.has(key)) continue - - const value = collection.get(key) - if (value !== undefined && filterFn(value)) { - sentKeys.add(key) - sendChangesToPipeline([{ type: `insert`, key, value }]) - } - } - } - - const subscribeToAllChanges = ( - whereExpression: BasicExpression | undefined - ) => { - const unsubscribe = collection.subscribeChanges( - sendChangesToPipeline, - { - includeInitialState: true, - ...(whereExpression ? { whereExpression } : undefined), - } - ) - return unsubscribe - } - - // Subscribes to all changes but without the initial state - // such that we can load keys from the initial state on demand - // based on the matching keys from the main collection in the join - const subscribeToMatchingChanges = ( - whereExpression: BasicExpression | undefined - ) => { - let loadedInitialState = false - const sentKeys = new Set() - - const sendVisibleChanges = ( - changes: Array> - ) => { - sendVisibleChangesToPipeline(changes, loadedInitialState, sentKeys) - } - - const unsubscribe = collection.subscribeChanges(sendVisibleChanges, { - whereExpression, - }) - - // Create a function that loads keys from the collection - // into the query pipeline on demand - const filterFn = whereExpression - ? 
createFilterFunctionFromExpression(whereExpression) - : () => true - const loadKs = (keys: Set) => { - return loadKeys(keys, sentKeys, filterFn) - } - - // Store the functions to load keys and load initial state in the `lazyCollectionsCallbacks` map - // This is used by the join operator to dynamically load matching keys from the lazy collection - // or to get the full initial state of the collection if there's no index for the join key - lazyCollectionsCallbacks[collectionId] = { - loadKeys: loadKs, - loadInitialState: () => { - // Make sure we only load the initial state once - if (loadedInitialState) return - loadedInitialState = true - - const changes = collection.currentStateAsChanges({ - whereExpression, - }) - sendChangesToPipeline(changes) - }, - } - return unsubscribe - } - - const subscribeToOrderedChanges = ( - whereExpression: BasicExpression | undefined - ) => { - const { - offset, - limit, - comparator, - index, - dataNeeded, - valueExtractorForRawRow, - } = optimizableOrderByCollections[collectionId]! 
- - if (!dataNeeded) { - // This should never happen because the topK operator should always set the size callback - // which in turn should lead to the orderBy operator setting the dataNeeded callback - throw new Error( - `Missing dataNeeded callback for collection ${collectionId}` - ) - } - - // This function is called by maybeRunGraph - // after each iteration of the query pipeline - // to ensure that the orderBy operator has enough data to work with - const loadMoreIfNeeded = () => { - // `dataNeeded` probes the orderBy operator to see if it needs more data - // if it needs more data, it returns the number of items it needs - const n = dataNeeded() - if (n > 0) { - loadNextItems(n) - } - - // Indicate that we're done loading data if we didn't need to load more data - return n === 0 - } - - // Keep track of the keys we've sent - // and also the biggest value we've sent so far - const sentValuesInfo: { - sentKeys: Set - biggest: any - } = { - sentKeys: new Set(), - biggest: undefined, - } - - const sendChangesToPipelineWithTracking = ( - changes: Iterable> - ) => { - const trackedChanges = trackSentValues( - changes, - comparator, - sentValuesInfo - ) - sendChangesToPipeline(trackedChanges, loadMoreIfNeeded) - } - - // Loads the next `n` items from the collection - // starting from the biggest item it has sent - const loadNextItems = (n: number) => { - const biggestSentRow = sentValuesInfo.biggest - const biggestSentValue = biggestSentRow - ? valueExtractorForRawRow(biggestSentRow) - : biggestSentRow - // Take the `n` items after the biggest sent value - const nextOrderedKeys = index.take(n, biggestSentValue) - const nextInserts: Array> = - nextOrderedKeys.map((key) => { - return { type: `insert`, key, value: collection.get(key) } - }) - sendChangesToPipelineWithTracking(nextInserts) - } - - // Load the first `offset + limit` values from the index - // i.e. 
the K items from the collection that fall into the requested range: [offset, offset + limit[ - loadNextItems(offset + limit) - - const sendChangesInRange = ( - changes: Iterable> - ) => { - // Split live updates into a delete of the old value and an insert of the new value - // and filter out changes that are bigger than the biggest value we've sent so far - // because they can't affect the topK - const splittedChanges = splitUpdates(changes) - const filteredChanges = filterChangesSmallerOrEqualToMax( - splittedChanges, - comparator, - sentValuesInfo.biggest - ) - sendChangesToPipeline(filteredChanges, loadMoreIfNeeded) - } - - // Subscribe to changes and only send changes that are smaller than the biggest value we've sent so far - // values that are bigger don't need to be sent because they can't affect the topK - const unsubscribe = collection.subscribeChanges(sendChangesInRange, { - whereExpression, - }) - - return unsubscribe - } - - const subscribeToChanges = ( - whereExpression?: BasicExpression - ) => { - let unsubscribe: () => void - if (lazyCollections.has(collectionId)) { - unsubscribe = subscribeToMatchingChanges(whereExpression) - } else if ( - Object.hasOwn(optimizableOrderByCollections, collectionId) - ) { - unsubscribe = subscribeToOrderedChanges(whereExpression) - } else { - unsubscribe = subscribeToAllChanges(whereExpression) - } - unsubscribeCallbacks.add(unsubscribe) - } - - if (whereClause) { - // Convert WHERE clause to BasicExpression format for collection subscription - const whereExpression = convertToBasicExpression( - whereClause, - collectionAlias! - ) - - if (whereExpression) { - // Use index optimization for this collection - subscribeToChanges(whereExpression) - } else { - // This should not happen - if we have a whereClause but can't create whereExpression, - // it indicates a bug in our optimization logic - throw new Error( - `Failed to convert WHERE clause to collection filter for collection '${collectionId}'. 
` + - `This indicates a bug in the query optimization logic.` - ) - } - } else { - // No WHERE clause for this collection, use regular subscription - subscribeToChanges() - } - }) - - subscribedToAllCollections = true - - // Initial run - maybeRunGraph() - - // Return the unsubscribe function - return () => { - unsubscribeCallbacks.forEach((unsubscribe) => unsubscribe()) - - // Reset caches so a fresh graph/pipeline is compiled on next start - // This avoids reusing a finalized D2 graph across GC restarts - graphCache = undefined - inputsCache = undefined - pipelineCache = undefined - collectionWhereClausesCache = undefined - } - }, - } - - // Return collection configuration - return { - id, - getKey: - config.getKey || ((item) => resultKeys.get(item) as string | number), - sync, - compare, - gcTime: config.gcTime || 5000, // 5 seconds by default for live queries - schema: config.schema, - onInsert: config.onInsert, - onUpdate: config.onUpdate, - onDelete: config.onDelete, - startSync: config.startSync, - } - */ const collectionConfigBuilder = new CollectionConfigBuilder< TContext, TResult From 5e1dabeaa9379976f708eec3cf31fc0869519e16 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 21 Aug 2025 14:28:57 +0200 Subject: [PATCH 10/11] Changeset --- .changeset/polite-peaches-relax.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/polite-peaches-relax.md diff --git a/.changeset/polite-peaches-relax.md b/.changeset/polite-peaches-relax.md new file mode 100644 index 000000000..f46fc8d52 --- /dev/null +++ b/.changeset/polite-peaches-relax.md @@ -0,0 +1,5 @@ +--- +"@tanstack/db": patch +--- + +Refactor live query collection From 6964eab081ef85be0101650606b34d0a835c576f Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Tue, 26 Aug 2025 09:51:13 +0200 Subject: [PATCH 11/11] Address Sam's feedback --- packages/db/src/query/index.ts | 2 +- .../db/src/query/live-query-collection.ts | 4 +- .../query/live/LiveQueryCollectionConfig.ts | 73 
------------------ ...uilder.ts => collection-config-builder.ts} | 10 ++- ...Subscriber.ts => collection-subscriber.ts} | 2 +- packages/db/src/query/live/types.ts | 74 ++++++++++++++++++- 6 files changed, 84 insertions(+), 81 deletions(-) delete mode 100644 packages/db/src/query/live/LiveQueryCollectionConfig.ts rename packages/db/src/query/live/{CollectionConfigBuilder.ts => collection-config-builder.ts} (98%) rename packages/db/src/query/live/{CollectionSubscriber.ts => collection-subscriber.ts} (99%) diff --git a/packages/db/src/query/index.ts b/packages/db/src/query/index.ts index d555df359..2fb81e6f4 100644 --- a/packages/db/src/query/index.ts +++ b/packages/db/src/query/index.ts @@ -52,4 +52,4 @@ export { liveQueryCollectionOptions, } from "./live-query-collection.js" -export { type LiveQueryCollectionConfig } from "./live/LiveQueryCollectionConfig.js" +export { type LiveQueryCollectionConfig } from "./live/types.js" diff --git a/packages/db/src/query/live-query-collection.ts b/packages/db/src/query/live-query-collection.ts index 6992f45a3..6ef7e7f65 100644 --- a/packages/db/src/query/live-query-collection.ts +++ b/packages/db/src/query/live-query-collection.ts @@ -1,6 +1,6 @@ import { createCollection } from "../collection.js" -import { CollectionConfigBuilder } from "./live/CollectionConfigBuilder.js" -import type { LiveQueryCollectionConfig } from "./live/LiveQueryCollectionConfig.js" +import { CollectionConfigBuilder } from "./live/collection-config-builder.js" +import type { LiveQueryCollectionConfig } from "./live/types.js" import type { InitialQueryBuilder, QueryBuilder } from "./builder/index.js" import type { Collection } from "../collection.js" import type { CollectionConfig, UtilsRecord } from "../types.js" diff --git a/packages/db/src/query/live/LiveQueryCollectionConfig.ts b/packages/db/src/query/live/LiveQueryCollectionConfig.ts deleted file mode 100644 index 02df0d36d..000000000 --- a/packages/db/src/query/live/LiveQueryCollectionConfig.ts +++ 
/dev/null @@ -1,73 +0,0 @@ -import type { InitialQueryBuilder, QueryBuilder } from "../builder/index.js" -import type { CollectionConfig } from "../../types.js" -import type { Context, GetResult } from "../builder/types.js" - -/** - * Configuration interface for live query collection options - * - * @example - * ```typescript - * const config: LiveQueryCollectionConfig = { - * // id is optional - will auto-generate "live-query-1", "live-query-2", etc. - * query: (q) => q - * .from({ comment: commentsCollection }) - * .join( - * { user: usersCollection }, - * ({ comment, user }) => eq(comment.user_id, user.id) - * ) - * .where(({ comment }) => eq(comment.active, true)) - * .select(({ comment, user }) => ({ - * id: comment.id, - * content: comment.content, - * authorName: user.name, - * })), - * // getKey is optional - defaults to using stream key - * getKey: (item) => item.id, - * } - * ``` - */ -export interface LiveQueryCollectionConfig< - TContext extends Context, - TResult extends object = GetResult & object, -> { - /** - * Unique identifier for the collection - * If not provided, defaults to `live-query-${number}` with auto-incrementing number - */ - id?: string - - /** - * Query builder function that defines the live query - */ - query: - | ((q: InitialQueryBuilder) => QueryBuilder) - | QueryBuilder - - /** - * Function to extract the key from result items - * If not provided, defaults to using the key from the D2 stream - */ - getKey?: (item: TResult) => string | number - - /** - * Optional schema for validation - */ - schema?: CollectionConfig[`schema`] - - /** - * Optional mutation handlers - */ - onInsert?: CollectionConfig[`onInsert`] - onUpdate?: CollectionConfig[`onUpdate`] - onDelete?: CollectionConfig[`onDelete`] - - /** - * Start sync / the query immediately - */ - startSync?: boolean - - /** - * GC time for the collection - */ - gcTime?: number -} diff --git a/packages/db/src/query/live/CollectionConfigBuilder.ts 
b/packages/db/src/query/live/collection-config-builder.ts similarity index 98% rename from packages/db/src/query/live/CollectionConfigBuilder.ts rename to packages/db/src/query/live/collection-config-builder.ts index 7725c503e..c87fc23a4 100644 --- a/packages/db/src/query/live/CollectionConfigBuilder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -1,7 +1,7 @@ import { D2, output } from "@tanstack/db-ivm" import { compileQuery } from "../compiler/index.js" import { buildQuery, getQueryIR } from "../builder/index.js" -import { CollectionSubscriber } from "./CollectionSubscriber.js" +import { CollectionSubscriber } from "./collection-subscriber.js" import type { RootStreamBuilder } from "@tanstack/db-ivm" import type { OrderByOptimizationInfo } from "../compiler/order-by.js" import type { Collection } from "../../collection.js" @@ -14,8 +14,12 @@ import type { import type { Context, GetResult } from "../builder/types.js" import type { BasicExpression, QueryIR } from "../ir.js" import type { LazyCollectionCallbacks } from "../compiler/joins.js" -import type { LiveQueryCollectionConfig } from "./LiveQueryCollectionConfig.js" -import type { Changes, FullSyncState, SyncState } from "./types.js" +import type { + Changes, + FullSyncState, + LiveQueryCollectionConfig, + SyncState, +} from "./types.js" // Global counter for auto-generated collection IDs let liveQueryCollectionCounter = 0 diff --git a/packages/db/src/query/live/CollectionSubscriber.ts b/packages/db/src/query/live/collection-subscriber.ts similarity index 99% rename from packages/db/src/query/live/CollectionSubscriber.ts rename to packages/db/src/query/live/collection-subscriber.ts index 2c2c1d15f..07fe577ff 100644 --- a/packages/db/src/query/live/CollectionSubscriber.ts +++ b/packages/db/src/query/live/collection-subscriber.ts @@ -7,7 +7,7 @@ import type { Collection } from "../../collection.js" import type { ChangeMessage, SyncConfig } from "../../types.js" import type { Context, 
GetResult } from "../builder/types.js" import type { BasicExpression } from "../ir.js" -import type { CollectionConfigBuilder } from "./CollectionConfigBuilder.js" +import type { CollectionConfigBuilder } from "./collection-config-builder.js" export class CollectionSubscriber< TContext extends Context, diff --git a/packages/db/src/query/live/types.ts b/packages/db/src/query/live/types.ts index 6c382ff30..995101aef 100644 --- a/packages/db/src/query/live/types.ts +++ b/packages/db/src/query/live/types.ts @@ -1,5 +1,7 @@ import type { D2, RootStreamBuilder } from "@tanstack/db-ivm" -import type { ResultStream } from "../../types.js" +import type { CollectionConfig, ResultStream } from "../../types.js" +import type { InitialQueryBuilder, QueryBuilder } from "../builder/index.js" +import type { Context, GetResult } from "../builder/types.js" export type Changes = { deletes: number @@ -19,3 +21,73 @@ export type SyncState = { } export type FullSyncState = Required + +/** + * Configuration interface for live query collection options + * + * @example + * ```typescript + * const config: LiveQueryCollectionConfig = { + * // id is optional - will auto-generate "live-query-1", "live-query-2", etc. 
+ * query: (q) => q + * .from({ comment: commentsCollection }) + * .join( + * { user: usersCollection }, + * ({ comment, user }) => eq(comment.user_id, user.id) + * ) + * .where(({ comment }) => eq(comment.active, true)) + * .select(({ comment, user }) => ({ + * id: comment.id, + * content: comment.content, + * authorName: user.name, + * })), + * // getKey is optional - defaults to using stream key + * getKey: (item) => item.id, + * } + * ``` + */ +export interface LiveQueryCollectionConfig< + TContext extends Context, + TResult extends object = GetResult & object, +> { + /** + * Unique identifier for the collection + * If not provided, defaults to `live-query-${number}` with auto-incrementing number + */ + id?: string + + /** + * Query builder function that defines the live query + */ + query: + | ((q: InitialQueryBuilder) => QueryBuilder) + | QueryBuilder + + /** + * Function to extract the key from result items + * If not provided, defaults to using the key from the D2 stream + */ + getKey?: (item: TResult) => string | number + + /** + * Optional schema for validation + */ + schema?: CollectionConfig[`schema`] + + /** + * Optional mutation handlers + */ + onInsert?: CollectionConfig[`onInsert`] + onUpdate?: CollectionConfig[`onUpdate`] + onDelete?: CollectionConfig[`onDelete`] + + /** + * Start sync / the query immediately + */ + startSync?: boolean + + /** + * GC time for the collection + */ + gcTime?: number +}