diff --git a/.changeset/sour-emus-count.md b/.changeset/sour-emus-count.md new file mode 100644 index 000000000..dcceba53e --- /dev/null +++ b/.changeset/sour-emus-count.md @@ -0,0 +1,6 @@ +--- +"@tanstack/db-ivm": patch +"@tanstack/db": patch +--- + +Optimize joins to use index on the join key when available. diff --git a/packages/db-ivm/src/operators/index.ts b/packages/db-ivm/src/operators/index.ts index f7dfa0949..f7619dc28 100644 --- a/packages/db-ivm/src/operators/index.ts +++ b/packages/db-ivm/src/operators/index.ts @@ -1,5 +1,6 @@ export * from "./pipe.js" export * from "./map.js" +export * from "./tap.js" export * from "./filter.js" export * from "./negate.js" export * from "./concat.js" diff --git a/packages/db-ivm/src/operators/tap.ts b/packages/db-ivm/src/operators/tap.ts new file mode 100644 index 000000000..caeca3535 --- /dev/null +++ b/packages/db-ivm/src/operators/tap.ts @@ -0,0 +1,53 @@ +import { DifferenceStreamWriter, LinearUnaryOperator } from "../graph.js" +import { StreamBuilder } from "../d2.js" +import type { IStreamBuilder, PipedOperator } from "../types.js" +import type { DifferenceStreamReader } from "../graph.js" +import type { MultiSet } from "../multiset.js" + +/** + * Operator that applies a function to each element in the input stream + */ +export class TapOperator extends LinearUnaryOperator { + #f: (data: T) => void + + constructor( + id: number, + inputA: DifferenceStreamReader, + output: DifferenceStreamWriter, + f: (data: T) => void + ) { + super(id, inputA, output) + this.#f = f + } + + inner(collection: MultiSet): MultiSet { + return collection.map((data) => { + this.#f(data) + return data + }) + } +} + +/** + * Invokes a function for each element in the input stream. + * This operator doesn't modify the stream and is used to perform side effects. 
+ * @param f - The function to invoke on each element + * @returns The input stream + */ +export function tap(f: (data: T) => void): PipedOperator { + return (stream: IStreamBuilder): IStreamBuilder => { + const output = new StreamBuilder( + stream.graph, + new DifferenceStreamWriter() + ) + const operator = new TapOperator( + stream.graph.getNextOperatorId(), + stream.connectReader(), + output.writer, + f + ) + stream.graph.addOperator(operator) + stream.graph.addStream(output.connectReader()) + return output + } +} diff --git a/packages/db/src/collection.ts b/packages/db/src/collection.ts index eea7905cd..aa549544b 100644 --- a/packages/db/src/collection.ts +++ b/packages/db/src/collection.ts @@ -1397,8 +1397,8 @@ export class CollectionImpl< /** * Creates an index on a collection for faster queries. - * Indexes significantly improve query performance by allowing binary search - * and range queries instead of full scans. + * Indexes significantly improve query performance by allowing constant time lookups + * and logarithmic time range queries instead of full scans. 
* * @template TResolver - The type of the index resolver (constructor or async loader) * @param indexCallback - Function that extracts the indexed value from each item diff --git a/packages/db/src/errors.ts b/packages/db/src/errors.ts index 3ae8cc804..43fb097f7 100644 --- a/packages/db/src/errors.ts +++ b/packages/db/src/errors.ts @@ -377,6 +377,12 @@ export class UnknownFunctionError extends QueryCompilationError { } } +export class JoinCollectionNotFoundError extends QueryCompilationError { + constructor(collectionId: string) { + super(`Collection "${collectionId}" not found during compilation of join`) + } +} + // JOIN Operation Errors export class JoinError extends TanStackDBError { constructor(message: string) { diff --git a/packages/db/src/indexes/auto-index.ts b/packages/db/src/indexes/auto-index.ts index de6d6238d..8ac95c5dc 100644 --- a/packages/db/src/indexes/auto-index.ts +++ b/packages/db/src/indexes/auto-index.ts @@ -6,6 +6,55 @@ export interface AutoIndexConfig { autoIndex?: `off` | `eager` } +function shouldAutoIndex(collection: CollectionImpl) { + // Only proceed if auto-indexing is enabled + if (collection.config.autoIndex !== `eager`) { + return false + } + + // Don't auto-index during sync operations + if ( + collection.status === `loading` || + collection.status === `initialCommit` + ) { + return false + } + + return true +} + +export function ensureIndexForField< + T extends Record, + TKey extends string | number, +>( + fieldName: string, + fieldPath: Array, + collection: CollectionImpl +) { + if (!shouldAutoIndex(collection)) { + return + } + + // Check if we already have an index for this field + const existingIndex = Array.from(collection.indexes.values()).find((index) => + index.matchesField(fieldPath) + ) + + if (existingIndex) { + return // Index already exists + } + + // Create a new index for this field using the collection's createIndex method + try { + collection.createIndex((row) => (row as any)[fieldName], { + name: 
`auto_${fieldName}`, + indexType: BTreeIndex, + }) + } catch (error) { + console.warn(`Failed to create auto-index for field "${fieldName}":`, error) + } +} + /** * Analyzes a where expression and creates indexes for all simple operations on single fields */ @@ -16,16 +65,7 @@ export function ensureIndexForExpression< expression: BasicExpression, collection: CollectionImpl ): void { - // Only proceed if auto-indexing is enabled - if (collection.config.autoIndex !== `eager`) { - return - } - - // Don't auto-index during sync operations - if ( - collection.status === `loading` || - collection.status === `initialCommit` - ) { + if (!shouldAutoIndex(collection)) { return } @@ -33,27 +73,7 @@ export function ensureIndexForExpression< const indexableExpressions = extractIndexableExpressions(expression) for (const { fieldName, fieldPath } of indexableExpressions) { - // Check if we already have an index for this field - const existingIndex = Array.from(collection.indexes.values()).find( - (index) => index.matchesField(fieldPath) - ) - - if (existingIndex) { - continue // Index already exists - } - - // Create a new index for this field using the collection's createIndex method - try { - collection.createIndex((row) => (row as any)[fieldName], { - name: `auto_${fieldName}`, - indexType: BTreeIndex, - }) - } catch (error) { - console.warn( - `Failed to create auto-index for field "${fieldName}":`, - error - ) - } + ensureIndexForField(fieldName, fieldPath, collection) } } diff --git a/packages/db/src/query/compiler/index.ts b/packages/db/src/query/compiler/index.ts index 364670b49..acbe3e01f 100644 --- a/packages/db/src/query/compiler/index.ts +++ b/packages/db/src/query/compiler/index.ts @@ -7,6 +7,7 @@ import { LimitOffsetRequireOrderByError, UnsupportedFromTypeError, } from "../../errors.js" +import { PropRef } from "../ir.js" import { compileExpression } from "./evaluators.js" import { processJoins } from "./joins.js" import { processGroupBy } from "./group-by.js" @@ 
-18,6 +19,8 @@ import type { QueryIR, QueryRef, } from "../ir.js" +import type { LazyCollectionCallbacks } from "./joins.js" +import type { Collection } from "../../collection.js" import type { KeyedStream, NamespacedAndKeyedStream, @@ -29,6 +32,8 @@ import type { QueryCache, QueryMapping } from "./types.js" * Result of query compilation including both the pipeline and collection-specific WHERE clauses */ export interface CompilationResult { + /** The ID of the main collection */ + collectionId: string /** The compiled query pipeline */ pipeline: ResultStream /** Map of collection aliases to their WHERE clauses for index optimization */ @@ -46,6 +51,9 @@ export interface CompilationResult { export function compileQuery( rawQuery: QueryIR, inputs: Record, + collections: Record>, + callbacks: Record, + lazyCollections: Set, cache: QueryCache = new WeakMap(), queryMapping: QueryMapping = new WeakMap() ): CompilationResult { @@ -70,9 +78,16 @@ export function compileQuery( const tables: Record = {} // Process the FROM clause to get the main table - const { alias: mainTableAlias, input: mainInput } = processFrom( + const { + alias: mainTableAlias, + input: mainInput, + collectionId: mainCollectionId, + } = processFrom( query.from, allInputs, + collections, + callbacks, + lazyCollections, cache, queryMapping ) @@ -96,10 +111,15 @@ export function compileQuery( pipeline, query.join, tables, + mainCollectionId, mainTableAlias, allInputs, cache, - queryMapping + queryMapping, + collections, + callbacks, + lazyCollections, + rawQuery ) } @@ -249,6 +269,7 @@ export function compileQuery( const result = resultPipeline // Cache the result before returning (use original query as key) const compilationResult = { + collectionId: mainCollectionId, pipeline: result, collectionWhereClauses, } @@ -275,6 +296,7 @@ export function compileQuery( const result = resultPipeline // Cache the result before returning (use original query as key) const compilationResult = { + collectionId: 
mainCollectionId, pipeline: result, collectionWhereClauses, } @@ -289,16 +311,19 @@ export function compileQuery( function processFrom( from: CollectionRef | QueryRef, allInputs: Record, + collections: Record, + callbacks: Record, + lazyCollections: Set, cache: QueryCache, queryMapping: QueryMapping -): { alias: string; input: KeyedStream } { +): { alias: string; input: KeyedStream; collectionId: string } { switch (from.type) { case `collectionRef`: { const input = allInputs[from.collection.id] if (!input) { throw new CollectionInputNotFoundError(from.collection.id) } - return { alias: from.alias, input } + return { alias: from.alias, input, collectionId: from.collection.id } } case `queryRef`: { // Find the original query for caching purposes @@ -308,6 +333,9 @@ function processFrom( const subQueryResult = compileQuery( originalQuery, allInputs, + collections, + callbacks, + lazyCollections, cache, queryMapping ) @@ -324,7 +352,11 @@ function processFrom( }) ) - return { alias: from.alias, input: extractedInput } + return { + alias: from.alias, + input: extractedInput, + collectionId: subQueryResult.collectionId, + } } default: throw new UnsupportedFromTypeError((from as any).type) @@ -380,3 +412,69 @@ function mapNestedQueries( } } } + +function getRefFromAlias( + query: QueryIR, + alias: string +): CollectionRef | QueryRef | void { + if (query.from.alias === alias) { + return query.from + } + + for (const join of query.join || []) { + if (join.from.alias === alias) { + return join.from + } + } +} + +/** + * Follows the given reference in a query + * until it finds the root field the reference points to. 
+ * @returns The collection and the path to the root field in this collection + */ +export function followRef( + query: QueryIR, + ref: PropRef, + collection: Collection +): { collection: Collection; path: Array } | void { + if (ref.path.length === 0) { + return + } + + if (ref.path.length === 1) { + // This field should be part of this collection + const field = ref.path[0]! + // is it part of the select clause? + if (query.select) { + const selectedField = query.select[field] + if (selectedField && selectedField.type === `ref`) { + return followRef(query, selectedField, collection) + } + } + + // Either this field is not part of the select clause + // and thus it must be part of the collection itself + // or it is part of the select but is not a reference + // so we can stop here and don't have to follow it + return { collection, path: [field] } + } + + if (ref.path.length > 1) { + // This is a nested field + const [alias, ...rest] = ref.path + const aliasRef = getRefFromAlias(query, alias!) 
+ if (!aliasRef) { + return + } + + if (aliasRef.type === `queryRef`) { + return followRef(aliasRef.query, new PropRef(rest), collection) + } else { + // This is a reference to a collection + // we can't follow it further + // so the field must be on the collection itself + return { collection: aliasRef.collection, path: rest } + } + } +} diff --git a/packages/db/src/query/compiler/joins.ts b/packages/db/src/query/compiler/joins.ts index 554b13b1b..3f2cbffb2 100644 --- a/packages/db/src/query/compiler/joins.ts +++ b/packages/db/src/query/compiler/joins.ts @@ -3,30 +3,44 @@ import { filter, join as joinOperator, map, + tap, } from "@tanstack/db-ivm" import { CollectionInputNotFoundError, InvalidJoinConditionSameTableError, InvalidJoinConditionTableMismatchError, InvalidJoinConditionWrongTablesError, + JoinCollectionNotFoundError, UnsupportedJoinSourceTypeError, UnsupportedJoinTypeError, } from "../../errors.js" +import { findIndexForField } from "../../utils/index-optimization.js" +import { ensureIndexForField } from "../../indexes/auto-index.js" import { compileExpression } from "./evaluators.js" -import { compileQuery } from "./index.js" -import type { IStreamBuilder, JoinType } from "@tanstack/db-ivm" +import { compileQuery, followRef } from "./index.js" import type { BasicExpression, CollectionRef, JoinClause, + PropRef, + QueryIR, QueryRef, } from "../ir.js" +import type { IStreamBuilder, JoinType } from "@tanstack/db-ivm" +import type { Collection } from "../../collection.js" import type { KeyedStream, NamespacedAndKeyedStream, NamespacedRow, } from "../../types.js" import type { QueryCache, QueryMapping } from "./types.js" +import type { BaseIndex } from "../../indexes/base-index.js" + +export type LoadKeysFn = (key: Set) => void +export type LazyCollectionCallbacks = { + loadKeys: LoadKeysFn + loadInitialState: () => void +} /** * Processes all join clauses in a query @@ -35,10 +49,15 @@ export function processJoins( pipeline: NamespacedAndKeyedStream, 
joinClauses: Array, tables: Record, + mainTableId: string, mainTableAlias: string, allInputs: Record, cache: QueryCache, - queryMapping: QueryMapping + queryMapping: QueryMapping, + collections: Record, + callbacks: Record, + lazyCollections: Set, + rawQuery: QueryIR ): NamespacedAndKeyedStream { let resultPipeline = pipeline @@ -47,10 +66,15 @@ export function processJoins( resultPipeline, joinClause, tables, + mainTableId, mainTableAlias, allInputs, cache, - queryMapping + queryMapping, + collections, + callbacks, + lazyCollections, + rawQuery ) } @@ -64,15 +88,27 @@ function processJoin( pipeline: NamespacedAndKeyedStream, joinClause: JoinClause, tables: Record, + mainTableId: string, mainTableAlias: string, allInputs: Record, cache: QueryCache, - queryMapping: QueryMapping + queryMapping: QueryMapping, + collections: Record, + callbacks: Record, + lazyCollections: Set, + rawQuery: QueryIR ): NamespacedAndKeyedStream { // Get the joined table alias and input stream - const { alias: joinedTableAlias, input: joinedInput } = processJoinSource( + const { + alias: joinedTableAlias, + input: joinedInput, + collectionId: joinedCollectionId, + } = processJoinSource( joinClause.from, allInputs, + collections, + callbacks, + lazyCollections, cache, queryMapping ) @@ -80,13 +116,22 @@ function processJoin( // Add the joined table to the tables map tables[joinedTableAlias] = joinedInput - // Convert join type to D2 join type - const joinType: JoinType = - joinClause.type === `cross` - ? `inner` - : joinClause.type === `outer` - ? 
`full` - : (joinClause.type as JoinType) + const mainCollection = collections[mainTableId] + const joinedCollection = collections[joinedCollectionId] + + if (!mainCollection) { + throw new JoinCollectionNotFoundError(mainTableId) + } + + if (!joinedCollection) { + throw new JoinCollectionNotFoundError(joinedCollectionId) + } + + const { activeCollection, lazyCollection } = getActiveAndLazyCollections( + joinClause.type, + mainCollection, + joinedCollection + ) // Analyze which table each expression refers to and swap if necessary const { mainExpr, joinedExpr } = analyzeJoinExpressions( @@ -101,7 +146,7 @@ function processJoin( const compiledJoinedExpr = compileExpression(joinedExpr) // Prepare the main pipeline for joining - const mainPipeline = pipeline.pipe( + let mainPipeline = pipeline.pipe( map(([currentKey, namespacedRow]) => { // Extract the join key from the main table expression const mainKey = compiledMainExpr(namespacedRow) @@ -115,7 +160,7 @@ function processJoin( ) // Prepare the joined pipeline - const joinedPipeline = joinedInput.pipe( + let joinedPipeline = joinedInput.pipe( map(([currentKey, row]) => { // Wrap the row in a namespaced structure const namespacedRow: NamespacedRow = { [joinedTableAlias]: row } @@ -132,11 +177,96 @@ function processJoin( ) // Apply the join operation - if (![`inner`, `left`, `right`, `full`].includes(joinType)) { + if (![`inner`, `left`, `right`, `full`].includes(joinClause.type)) { throw new UnsupportedJoinTypeError(joinClause.type) } + + if (activeCollection) { + // This join can be optimized by having the active collection + // dynamically load keys into the lazy collection + // based on the value of the joinKey and by looking up + // matching rows in the index of the lazy collection + + // Mark the lazy collection as lazy + // this Set is passed by the liveQueryCollection to the compiler + // such that the liveQueryCollection can check it after compilation + // to know which collections are lazy collections + 
lazyCollections.add(lazyCollection.id) + + const activePipeline = + activeCollection === `main` ? mainPipeline : joinedPipeline + + let index: BaseIndex | undefined + + const lazyCollectionJoinExpr = + activeCollection === `main` + ? (joinedExpr as PropRef) + : (mainExpr as PropRef) + + const activeColl = + activeCollection === `main` ? collections[mainTableId]! : lazyCollection + + const followRefResult = followRef( + rawQuery, + lazyCollectionJoinExpr, + activeColl + )! + const followRefCollection = followRefResult.collection + + const fieldName = followRefResult.path[0] + if (fieldName) { + ensureIndexForField(fieldName, followRefResult.path, followRefCollection) + } + + const activePipelineWithLoading: IStreamBuilder< + [key: unknown, [originalKey: string, namespacedRow: NamespacedRow]] + > = activePipeline.pipe( + tap(([joinKey, _]) => { + // Find the index for the path we join on + // we need to find the index inside the map operator + // because the indexes are only available after the initial sync + // so we can't fetch it during compilation + index ??= findIndexForField( + followRefCollection.indexes, + followRefResult.path + ) + + // The `callbacks` object is passed by the liveQueryCollection to the compiler. + // It contains a function to lazy load keys for each lazy collection + // as well as a function to switch back to a regular collection + // (useful when there's no index for available for lazily loading the collection) + const collectionCallbacks = callbacks[lazyCollection.id] + if (!collectionCallbacks) { + throw new Error( + `Internal error: callbacks for collection are missing in join pipeline. 
Make sure the live query collection sets them before running the pipeline.` + ) + } + + const { loadKeys, loadInitialState } = collectionCallbacks + + if (index && index.supports(`eq`)) { + // Use the index to fetch the PKs of the rows in the lazy collection + // that match this row from the active collection based on the value of the joinKey + const matchingKeys = index.lookup(`eq`, joinKey) + // Inform the lazy collection that those keys need to be loaded + loadKeys(matchingKeys) + } else { + // We can't optimize the join because there is no index for the join key + // on the lazy collection, so we load the initial state + loadInitialState() + } + }) + ) + + if (activeCollection === `main`) { + mainPipeline = activePipelineWithLoading + } else { + joinedPipeline = activePipelineWithLoading + } + } + return mainPipeline.pipe( - joinOperator(joinedPipeline, joinType), + joinOperator(joinedPipeline, joinClause.type as JoinType), consolidate(), processJoinResults(joinClause.type) ) @@ -225,16 +355,19 @@ function getTableAliasFromExpression(expr: BasicExpression): string | null { function processJoinSource( from: CollectionRef | QueryRef, allInputs: Record, + collections: Record, + callbacks: Record, + lazyCollections: Set, cache: QueryCache, queryMapping: QueryMapping -): { alias: string; input: KeyedStream } { +): { alias: string; input: KeyedStream; collectionId: string } { switch (from.type) { case `collectionRef`: { const input = allInputs[from.collection.id] if (!input) { throw new CollectionInputNotFoundError(from.collection.id) } - return { alias: from.alias, input } + return { alias: from.alias, input, collectionId: from.collection.id } } case `queryRef`: { // Find the original query for caching purposes @@ -244,6 +377,9 @@ function processJoinSource( const subQueryResult = compileQuery( originalQuery, allInputs, + collections, + callbacks, + lazyCollections, cache, queryMapping ) @@ -260,7 +396,11 @@ function processJoinSource( }) ) - return { alias: 
from.alias, input: extractedInput as KeyedStream } + return { + alias: from.alias, + input: extractedInput as KeyedStream, + collectionId: subQueryResult.collectionId, + } } default: throw new UnsupportedJoinSourceTypeError((from as any).type) @@ -333,3 +473,45 @@ function processJoinResults(joinType: string) { ) } } + +/** + * Returns the active and lazy collections for a join clause. + * The active collection is the one that we need to fully iterate over + * and it can be the main table (i.e. left collection) or the joined table (i.e. right collection). + * The lazy collection is the one that we should join-in lazily based on matches in the active collection. + * @param joinType - The type of the join clause to analyze + * @param leftCollection - The left collection + * @param rightCollection - The right collection + * @returns The active and lazy collections. They are undefined if we need to loop over both collections (i.e. both are active) + */ +function getActiveAndLazyCollections( + joinType: JoinClause[`type`], + leftCollection: Collection, + rightCollection: Collection +): + | { activeCollection: `main` | `joined`; lazyCollection: Collection } + | { activeCollection: undefined; lazyCollection: undefined } { + if (leftCollection.id === rightCollection.id) { + // We can't apply this optimization if there's only one collection + // because `liveQueryCollection` will detect that the collection is lazy + // and treat it lazily (because the collection is shared) + // and thus it will not load any keys because both sides of the join + // will be handled lazily + return { activeCollection: undefined, lazyCollection: undefined } + } + + switch (joinType) { + case `left`: + return { activeCollection: `main`, lazyCollection: rightCollection } + case `right`: + return { activeCollection: `joined`, lazyCollection: leftCollection } + case `inner`: + // The smallest collection should be the active collection + // and the biggest collection should be lazy + return 
leftCollection.size < rightCollection.size + ? { activeCollection: `main`, lazyCollection: rightCollection } + : { activeCollection: `joined`, lazyCollection: leftCollection } + default: + return { activeCollection: undefined, lazyCollection: undefined } + } +} diff --git a/packages/db/src/query/live-query-collection.ts b/packages/db/src/query/live-query-collection.ts index 9d7877f5c..9a8fd9abe 100644 --- a/packages/db/src/query/live-query-collection.ts +++ b/packages/db/src/query/live-query-collection.ts @@ -1,5 +1,6 @@ import { D2, MultiSet, output } from "@tanstack/db-ivm" import { createCollection } from "../collection.js" +import { createFilterFunctionFromExpression } from "../change-events.js" import { compileQuery } from "./compiler/index.js" import { buildQuery, getQueryIR } from "./builder/index.js" import { convertToBasicExpression } from "./compiler/expressions.js" @@ -16,6 +17,7 @@ import type { import type { Context, GetResult } from "./builder/types.js" import type { MultiSetArray, RootStreamBuilder } from "@tanstack/db-ivm" import type { BasicExpression } from "./ir.js" +import type { LazyCollectionCallbacks } from "./compiler/joins.js" // Global counter for auto-generated collection IDs let liveQueryCollectionCounter = 0 @@ -176,6 +178,11 @@ export function liveQueryCollectionOptions< | Map> | undefined + // Map of collection IDs to functions that load keys for that lazy collection + const lazyCollectionsCallbacks: Record = {} + // Set of collection IDs that are lazy collections + const lazyCollections = new Set() + const compileBasePipeline = () => { graphCache = new D2() inputsCache = Object.fromEntries( @@ -189,7 +196,13 @@ export function liveQueryCollectionOptions< ;({ pipeline: pipelineCache, collectionWhereClauses: collectionWhereClausesCache, - } = compileQuery(query, inputsCache as Record)) + } = compileQuery( + query, + inputsCache as Record, + collections, + lazyCollectionsCallbacks, + lazyCollections + )) } const maybeCompileBasePipeline 
= () => { @@ -292,9 +305,11 @@ export function liveQueryCollectionOptions< graph.finalize() + let subscribedToAllCollections = false + const maybeRunGraph = () => { // We only run the graph if all the collections are ready - if (allCollectionsReady()) { + if (allCollectionsReady() && subscribedToAllCollections) { graph.run() // On the initial run, we may need to do an empty commit to ensure that // the collection is initialized @@ -319,6 +334,121 @@ export function liveQueryCollectionOptions< ? collectionWhereClausesCache.get(collectionAlias) : undefined + const sendChangesToPipeline = ( + changes: Array> + ) => { + sendChangesToInput(input, changes, collection.config.getKey) + maybeRunGraph() + } + + const subscribeToAllChanges = ( + whereExpression: BasicExpression | undefined + ) => { + const unsubscribe = collection.subscribeChanges( + sendChangesToPipeline, + { + includeInitialState: true, + ...(whereExpression ? { whereExpression } : undefined), + } + ) + return unsubscribe + } + + // Subscribes to all changes but without the initial state + // such that we can load keys from the initial state on demand + // based on the matching keys from the main collection in the join + const subscribeToMatchingChanges = ( + whereExpression: BasicExpression | undefined + ) => { + const sentKeys = new Set() + + // Wraps the sendChangesToPipeline function + // in order to turn `update`s into `insert`s + // for keys that have not been sent to the pipeline yet + // and filter out deletes for keys that have not been sent + const sendVisibleChangesToPipeline = ( + changes: Array> + ) => { + if (loadedInitialState) { + // There was no index for the join key + // so we loaded the initial state + // so we can safely assume that the pipeline has seen all keys + return sendChangesToPipeline(changes) + } + + const newChanges = [] + for (const change of changes) { + let newChange = change + if (!sentKeys.has(change.key)) { + if (change.type === `update`) { + newChange = { ...change, 
type: `insert` } + } else if (change.type === `delete`) { + // filter out deletes for keys that have not been sent + continue + } + } + newChanges.push(newChange) + } + + return sendChangesToPipeline(newChanges) + } + + const unsubscribe = collection.subscribeChanges( + sendVisibleChangesToPipeline, + { whereExpression } + ) + + // Create a function that loads keys from the collection + // into the query pipeline on demand + const filterFn = whereExpression + ? createFilterFunctionFromExpression(whereExpression) + : () => true + const loadKeys = (keys: Set) => { + for (const key of keys) { + // Only load the key once + if (sentKeys.has(key)) continue + + const value = collection.get(key) + if (value !== undefined && filterFn(value)) { + sentKeys.add(key) + sendChangesToPipeline([{ type: `insert`, key, value }]) + } + } + } + + let loadedInitialState = false + + // Store the functions to load keys and load initial state in the `lazyCollectionsCallbacks` map + // This is used by the join operator to dynamically load matching keys from the lazy collection + // or to get the full initial state of the collection if there's no index for the join key + lazyCollectionsCallbacks[collectionId] = { + loadKeys, + loadInitialState: () => { + // Make sure we only load the initial state once + if (loadedInitialState) return + loadedInitialState = true + + const changes = collection.currentStateAsChanges({ + whereExpression, + }) + sendChangesToPipeline(changes) + }, + } + return unsubscribe + } + + const subscribeToChanges = ( + whereExpression?: BasicExpression + ) => { + let unsubscribe: () => void + if (lazyCollections.has(collectionId)) { + unsubscribe = subscribeToMatchingChanges(whereExpression) + } else { + unsubscribe = subscribeToAllChanges(whereExpression) + } + unsubscribeCallbacks.add(unsubscribe) + } + if (whereClause) { // Convert WHERE clause to BasicExpression format for collection subscription const whereExpression = convertToBasicExpression( @@ -328,17 +458,7 @@ 
export function liveQueryCollectionOptions< if (whereExpression) { // Use index optimization for this collection - const subscription = collection.subscribeChanges( - (changes) => { - sendChangesToInput(input, changes, collection.config.getKey) - maybeRunGraph() - }, - { - includeInitialState: true, - whereExpression: whereExpression, - } - ) - unsubscribeCallbacks.add(subscription) + subscribeToChanges(whereExpression) } else { // This should not happen - if we have a whereClause but can't create whereExpression, // it indicates a bug in our optimization logic @@ -349,17 +469,12 @@ export function liveQueryCollectionOptions< } } else { // No WHERE clause for this collection, use regular subscription - const subscription = collection.subscribeChanges( - (changes) => { - sendChangesToInput(input, changes, collection.config.getKey) - maybeRunGraph() - }, - { includeInitialState: true } - ) - unsubscribeCallbacks.add(subscription) + subscribeToChanges() } }) + subscribedToAllCollections = true + // Initial run maybeRunGraph() diff --git a/packages/db/tests/collection-auto-index.test.ts b/packages/db/tests/collection-auto-index.test.ts index d1e1ac0b5..ce2be3fc2 100644 --- a/packages/db/tests/collection-auto-index.test.ts +++ b/packages/db/tests/collection-auto-index.test.ts @@ -10,7 +10,12 @@ import { or, } from "../src/query/builder/functions" import { createSingleRowRefProxy } from "../src/query/builder/ref-proxy" -import { expectIndexUsage, withIndexTracking } from "./utls" +import { createLiveQueryCollection } from "../src" +import { + createIndexUsageTracker, + expectIndexUsage, + withIndexTracking, +} from "./utls" // Global row proxy for expressions const row = createSingleRowRefProxy() @@ -24,6 +29,10 @@ interface TestItem { createdAt: Date } +type TestItem2 = Omit & { + id2: string +} + const testData: Array = [ { id: `1`, @@ -204,6 +213,8 @@ describe(`Collection Auto-Indexing`, () => { unsubscribe() }) + it(`should create auto-indexes for transformed fields 
of subqueries when autoIndex is "eager"`, async () => {}) + it(`should not create duplicate auto-indexes for the same field`, async () => { const autoIndexCollection = createCollection({ getKey: (item) => item.id, @@ -420,6 +431,118 @@ describe(`Collection Auto-Indexing`, () => { unsubscribe1() }) + it(`should create auto-indexes for join key on lazy collection when joining`, async () => { + const leftCollection = createCollection({ + getKey: (item) => item.id, + autoIndex: `eager`, + startSync: true, + sync: { + sync: ({ begin, write, commit, markReady }) => { + begin() + for (const item of testData) { + write({ + type: `insert`, + value: item, + }) + } + commit() + markReady() + }, + }, + onInsert: async (_) => {}, + }) + + const rightCollection = createCollection({ + getKey: (item) => item.id2, + autoIndex: `eager`, + startSync: true, + sync: { + sync: ({ begin, write, commit, markReady }) => { + begin() + write({ + type: `insert`, + value: { + id2: `1`, + name: `Other Active Item`, + age: 40, + status: `active`, + createdAt: new Date(), + }, + }) + write({ + type: `insert`, + value: { + id2: `other2`, + name: `Other Inactive Item`, + age: 35, + status: `inactive`, + createdAt: new Date(), + }, + }) + commit() + markReady() + }, + }, + }) + + await rightCollection.stateWhenReady() + + const liveQuery = createLiveQueryCollection({ + query: (q: any) => + q + .from({ item: leftCollection }) + .join( + { other: rightCollection }, + ({ item, other }: any) => eq(item.id, other.id2), + `left` + ) + .select(({ item, other }: any) => ({ + id: item.id, + name: item.name, + otherName: other.name, + })), + startSync: true, + }) + + await liveQuery.stateWhenReady() + + expect(liveQuery.size).toBe(testData.length) + + expect(rightCollection.indexes.size).toBe(1) + + const index = rightCollection.indexes.values().next().value! 
+ expect(index.expression).toEqual({ + type: `ref`, + path: [`id2`], + }) + + const tracker = createIndexUsageTracker(rightCollection) + + // Now send another item through the left collection + // and check that it used the index to join it to items of the right collection + + leftCollection.insert({ + id: `other2`, + name: `New Item`, + age: 25, + status: `active`, + createdAt: new Date(), + }) + + expect(tracker.stats.queriesExecuted).toEqual([ + { + type: `index`, + operation: `eq`, + field: `id2`, + value: `other2`, + }, + ]) + + expect(liveQuery.size).toBe(testData.length + 1) + + tracker.restore() + }) + it(`should not create auto-indexes for unsupported operations`, async () => { const autoIndexCollection = createCollection({ getKey: (item) => item.id, diff --git a/packages/db/tests/query/compiler/basic.test.ts b/packages/db/tests/query/compiler/basic.test.ts index d8d2f472e..3c4f4693a 100644 --- a/packages/db/tests/query/compiler/basic.test.ts +++ b/packages/db/tests/query/compiler/basic.test.ts @@ -43,7 +43,13 @@ describe(`Query2 Compiler`, () => { const graph = new D2() const input = graph.newInput<[number, User]>() - const { pipeline } = compileQuery(query, { users: input }) + const { pipeline } = compileQuery( + query, + { users: input }, + { users: usersCollection }, + {}, + new Set() + ) const messages: Array> = [] pipeline.pipe( @@ -90,7 +96,13 @@ describe(`Query2 Compiler`, () => { const graph = new D2() const input = graph.newInput<[number, User]>() - const { pipeline } = compileQuery(query, { users: input }) + const { pipeline } = compileQuery( + query, + { users: input }, + { users: usersCollection }, + {}, + new Set() + ) const messages: Array> = [] pipeline.pipe( @@ -159,7 +171,13 @@ describe(`Query2 Compiler`, () => { const graph = new D2() const input = graph.newInput<[number, User]>() - const { pipeline } = compileQuery(query, { users: input }) + const { pipeline } = compileQuery( + query, + { users: input }, + { users: usersCollection }, + 
{}, + new Set() + ) const messages: Array> = [] pipeline.pipe( @@ -216,7 +234,13 @@ describe(`Query2 Compiler`, () => { const graph = new D2() const input = graph.newInput<[number, User]>() - const { pipeline } = compileQuery(query, { users: input }) + const { pipeline } = compileQuery( + query, + { users: input }, + { users: usersCollection }, + {}, + new Set() + ) const messages: Array> = [] pipeline.pipe( diff --git a/packages/db/tests/query/compiler/subqueries.test.ts b/packages/db/tests/query/compiler/subqueries.test.ts index f69a49846..3a9f8c2d9 100644 --- a/packages/db/tests/query/compiler/subqueries.test.ts +++ b/packages/db/tests/query/compiler/subqueries.test.ts @@ -169,7 +169,13 @@ describe(`Query2 Subqueries`, () => { // Compile and execute the query const graph = new D2() const issuesInput = createIssueInput(graph) - const { pipeline } = compileQuery(builtQuery, { issues: issuesInput }) + const { pipeline } = compileQuery( + builtQuery, + { issues: issuesInput }, + { issues: issuesCollection }, + {}, + new Set() + ) const messages: Array> = [] pipeline.pipe( @@ -202,6 +208,11 @@ describe(`Query2 Subqueries`, () => { }) describe(`Subqueries in JOIN clause`, () => { + const dummyCallbacks = { + loadKeys: (_: any) => {}, + loadInitialState: () => {}, + } + it(`supports subquery in join clause`, () => { // Create a subquery for active users const activeUsersQuery = new Query() @@ -261,10 +272,20 @@ describe(`Query2 Subqueries`, () => { const graph = new D2() const issuesInput = createIssueInput(graph) const usersInput = createUserInput(graph) - const { pipeline } = compileQuery(builtQuery, { - issues: issuesInput, - users: usersInput, - }) + const lazyCollections = new Set() + const { pipeline } = compileQuery( + builtQuery, + { + issues: issuesInput, + users: usersInput, + }, + { issues: issuesCollection, users: usersCollection }, + { issues: dummyCallbacks, users: dummyCallbacks }, + lazyCollections + ) + + // Since we're doing a left join, the 
collection on the right should be handled lazily + expect(lazyCollections).contains(usersCollection.id) const messages: Array> = [] pipeline.pipe( @@ -321,7 +342,13 @@ describe(`Query2 Subqueries`, () => { // Execute the aggregate query const graph = new D2() const issuesInput = createIssueInput(graph) - const { pipeline } = compileQuery(builtQuery, { issues: issuesInput }) + const { pipeline } = compileQuery( + builtQuery, + { issues: issuesInput }, + { issues: issuesCollection }, + {}, + new Set() + ) const messages: Array> = [] pipeline.pipe( diff --git a/packages/db/tests/query/compiler/subquery-caching.test.ts b/packages/db/tests/query/compiler/subquery-caching.test.ts index 99067d653..7bb8bf967 100644 --- a/packages/db/tests/query/compiler/subquery-caching.test.ts +++ b/packages/db/tests/query/compiler/subquery-caching.test.ts @@ -47,7 +47,14 @@ describe(`Subquery Caching`, () => { // First compilation without shared cache const cache1 = new WeakMap() - const result1 = compileQuery(mainQuery, inputs, cache1) + const result1 = compileQuery( + mainQuery, + inputs, + { users: usersCollection }, + {}, + new Set(), + cache1 + ) // Verify subquery is in first cache expect(cache1.has(subquery)).toBe(true) @@ -55,7 +62,14 @@ describe(`Subquery Caching`, () => { // Second compilation with different cache (should recompile everything) const cache2 = new WeakMap() - const result2 = compileQuery(mainQuery, inputs, cache2) + const result2 = compileQuery( + mainQuery, + inputs, + { users: usersCollection }, + {}, + new Set(), + cache2 + ) // Results should be different objects (different compilation) expect(result1).not.toBe(result2) @@ -65,7 +79,14 @@ describe(`Subquery Caching`, () => { expect(cache2.has(mainQuery)).toBe(true) // Third compilation with the same cache as #2 (should reuse cached results) - const result3 = compileQuery(mainQuery, inputs, cache2) + const result3 = compileQuery( + mainQuery, + inputs, + { users: usersCollection }, + {}, + new Set(), + cache2 
+ ) // Result should be the same object as #2 (reused from cache) expect(result3).toBe(result2) @@ -75,8 +96,22 @@ describe(`Subquery Caching`, () => { expect(cache2.has(mainQuery)).toBe(true) // Fourth compilation: compile just the subquery with cache2 (should reuse) - const subqueryResult1 = compileQuery(subquery, inputs, cache2) - const subqueryResult2 = compileQuery(subquery, inputs, cache2) + const subqueryResult1 = compileQuery( + subquery, + inputs, + { users: usersCollection }, + {}, + new Set(), + cache2 + ) + const subqueryResult2 = compileQuery( + subquery, + inputs, + { users: usersCollection }, + {}, + new Set(), + cache2 + ) // Both subquery compilations should return the same cached result expect(subqueryResult1).toBe(subqueryResult2) @@ -103,11 +138,25 @@ describe(`Subquery Caching`, () => { const sharedCache = new WeakMap() // First compilation - should add to cache - const result1 = compileQuery(subquery, inputs, sharedCache) + const result1 = compileQuery( + subquery, + inputs, + { users: usersCollection }, + {}, + new Set(), + sharedCache + ) expect(sharedCache.has(subquery)).toBe(true) // Second compilation with same cache - should return cached result - const result2 = compileQuery(subquery, inputs, sharedCache) + const result2 = compileQuery( + subquery, + inputs, + { users: usersCollection }, + {}, + new Set(), + sharedCache + ) expect(result1).toBe(result2) // Should be the exact same object reference }) @@ -143,8 +192,22 @@ describe(`Subquery Caching`, () => { const sharedCache = new WeakMap() // Compile both queries - const result1 = compileQuery(subquery1, inputs, sharedCache) - const result2 = compileQuery(subquery, inputs, sharedCache) + const result1 = compileQuery( + subquery1, + inputs, + { users: usersCollection }, + {}, + new Set(), + sharedCache + ) + const result2 = compileQuery( + subquery, + inputs, + { users: usersCollection }, + {}, + new Set(), + sharedCache + ) // Should have different results since they are different 
objects expect(result1).not.toBe(result2) @@ -198,7 +261,14 @@ describe(`Subquery Caching`, () => { const sharedCache = new WeakMap() // Compile the outer query - should cache innerSubquery and reuse it - const result = compileQuery(outerQuery, inputs, sharedCache) + const result = compileQuery( + outerQuery, + inputs, + { users: usersCollection }, + {}, + new Set(), + sharedCache + ) expect(result).toBeDefined() // Verify that innerSubquery is cached diff --git a/packages/db/tests/query/indexes.test.ts b/packages/db/tests/query/indexes.test.ts index 5dfad097b..b306362b2 100644 --- a/packages/db/tests/query/indexes.test.ts +++ b/packages/db/tests/query/indexes.test.ts @@ -25,6 +25,10 @@ interface TestItem { createdAt: Date } +type TestItem2 = Omit & { + id2: string +} + // Index usage tracking utilities (copied from collection-indexes.test.ts) interface IndexUsageStats { rangeQueryCalls: number @@ -713,6 +717,603 @@ describe(`Query Index Optimization`, () => { } }) + it(`should use index of biggest collection when inner-joining collections`, async () => { + // Create a second collection for the join with its own index + const secondCollection = createCollection({ + getKey: (item) => item.id2, + startSync: true, + sync: { + sync: ({ begin, write, commit }) => { + begin() + write({ + type: `insert`, + value: { + id2: `1`, + name: `Other Active Item`, + age: 40, + status: `active`, + createdAt: new Date(), + }, + }) + write({ + type: `insert`, + value: { + id2: `other2`, + name: `Other Inactive Item`, + age: 35, + status: `inactive`, + createdAt: new Date(), + }, + }) + commit() + }, + }, + }) + + // Since we're using an inner join, it will iterate over the smallest collection + // and join in matching keys from the bigger collection + // so it will iterate over the second collection and use the index for the status to find active items + // then for each such item (there is only 1), it will do an index lookup into the first collection to find matching items + // So 
we need an index on the status for the second collection + // and an index on the id for the first collection + collection.createIndex((row) => row.id) + + await secondCollection.stateWhenReady() + + // Track both collections + const tracker1 = createIndexUsageTracker(collection) + const tracker2 = createIndexUsageTracker(secondCollection) + + try { + const liveQuery = createLiveQueryCollection({ + query: (q: any) => + q + .from({ item: collection }) + .join( + { other: secondCollection }, + ({ item, other }: any) => eq(item.id, other.id2), + `inner` + ) + .where(({ item, other }: any) => + and(eq(item.status, `active`), eq(other.status, `active`)) + ) + .select(({ item, other }: any) => ({ + id: item.id, + name: item.name, + otherName: other.name, + })), + startSync: true, + }) + + await liveQuery.stateWhenReady() + + // Should have found results where both items are active + expect(liveQuery.toArray).toEqual([ + { id: `1`, name: `Alice`, otherName: `Other Active Item` }, + ]) + + // We should have done a full scan of the 2nd collection to find active items + // There should only be 1 active item in the second collection and it has id "1" + expect(tracker2.stats.queriesExecuted).toEqual([ + { + type: `fullScan`, + }, + ]) + + // We should have done an index lookup on the 1st collection to find matching items + // i.e.
items with id "1" + expect(tracker1.stats.queriesExecuted).toEqual([ + { + type: `index`, + operation: `eq`, + field: `id`, + value: `1`, + }, + ]) + } finally { + tracker1.restore() + tracker2.restore() + } + }) + + it(`should not optimize inner join if biggest collection has no index on the join key`, async () => { + // Create a second collection for the join with its own index + const secondCollection = createCollection({ + getKey: (item) => item.id2, + startSync: true, + sync: { + sync: ({ begin, write, commit }) => { + begin() + write({ + type: `insert`, + value: { + id2: `1`, + name: `Other Active Item`, + age: 40, + status: `active`, + createdAt: new Date(), + }, + }) + write({ + type: `insert`, + value: { + id2: `other2`, + name: `Other Inactive Item`, + age: 35, + status: `inactive`, + createdAt: new Date(), + }, + }) + commit() + }, + }, + }) + + await secondCollection.stateWhenReady() + + // Track both collections + const tracker1 = createIndexUsageTracker(collection) + const tracker2 = createIndexUsageTracker(secondCollection) + + try { + const liveQuery = createLiveQueryCollection({ + query: (q: any) => + q + .from({ item: collection }) + .join( + { other: secondCollection }, + ({ item, other }: any) => eq(item.id, other.id2), + `inner` + ) + .where(({ item, other }: any) => + and(eq(item.status, `active`), eq(other.status, `active`)) + ) + .select(({ item, other }: any) => ({ + id: item.id, + name: item.name, + otherName: other.name, + })), + startSync: true, + }) + + await liveQuery.stateWhenReady() + + // Should have found results where both items are active + expect(liveQuery.size).toBe(1) + + // We should have done a full scan of the 2nd collection to find active items + // There should only be 1 active item in the second collection and it has id "1" + expect(tracker2.stats.queriesExecuted).toEqual([ + { + type: `fullScan`, + }, + ]) + + // We should have done an index lookup on the 1st collection to find active items +
expect(tracker1.stats.queriesExecuted).toEqual([ + { + type: `index`, + operation: `eq`, + field: `status`, + value: `active`, + }, + ]) + } finally { + tracker1.restore() + tracker2.restore() + } + }) + + it(`should use index of right collection when left-joining collections`, async () => { + // Create a second collection for the join with its own index + const secondCollection = createCollection({ + getKey: (item) => item.id2, + startSync: true, + sync: { + sync: ({ begin, write, commit }) => { + begin() + write({ + type: `insert`, + value: { + id2: `1`, + name: `Other Active Item`, + age: 40, + status: `active`, + createdAt: new Date(), + }, + }) + write({ + type: `insert`, + value: { + id2: `other2`, + name: `Other Inactive Item`, + age: 35, + status: `inactive`, + createdAt: new Date(), + }, + }) + commit() + }, + }, + }) + + // Since we're using a left join, it will iterate over the left collection + // and join in matching keys from the right collection + secondCollection.createIndex((row) => row.id2) + + await secondCollection.stateWhenReady() + + // Track both collections + const tracker1 = createIndexUsageTracker(collection) + const tracker2 = createIndexUsageTracker(secondCollection) + + try { + const liveQuery = createLiveQueryCollection({ + query: (q: any) => + q + .from({ item: collection }) + .join( + { other: secondCollection }, + ({ item, other }: any) => eq(item.id, other.id2), + `left` + ) + .where(({ item, other }: any) => + and(eq(item.status, `active`), eq(other.status, `active`)) + ) + .select(({ item, other }: any) => ({ + id: item.id, + name: item.name, + otherName: other.name, + })), + startSync: true, + }) + + await liveQuery.stateWhenReady() + + // Should include all results from the first collection + expect(liveQuery.toArray).toEqual([ + { id: `1`, name: `Alice`, otherName: `Other Active Item` }, + { id: `3`, name: `Charlie` }, + { id: `5`, name: `Eve` }, + ]) + + // Combine stats from both collections + const combinedStats: 
IndexUsageStats = { + rangeQueryCalls: + tracker1.stats.rangeQueryCalls + tracker2.stats.rangeQueryCalls, + fullScanCalls: + tracker1.stats.fullScanCalls + tracker2.stats.fullScanCalls, + indexesUsed: [ + ...tracker1.stats.indexesUsed, + ...tracker2.stats.indexesUsed, + ], + queriesExecuted: [ + ...tracker1.stats.queriesExecuted, + ...tracker2.stats.queriesExecuted, + ], + } + + // We should have done an index lookup on the 1st collection to find active items + expect(tracker1.stats.queriesExecuted).toEqual([ + { + type: `index`, + operation: `eq`, + field: `status`, + value: `active`, + }, + ]) + + // For each active item from the first collection + // we must have done an index lookup on the 2nd collection to find matching items + expect(tracker2.stats.queriesExecuted).toEqual([ + { + type: `index`, + operation: `eq`, + field: `id2`, + value: `1`, + }, + { + type: `index`, + operation: `eq`, + field: `id2`, + value: `3`, + }, + { + type: `index`, + operation: `eq`, + field: `id2`, + value: `5`, + }, + ]) + + expectIndexUsage(combinedStats, { + shouldUseIndex: true, + shouldUseFullScan: false, + indexCallCount: 4, + fullScanCallCount: 0, + }) + } finally { + tracker1.restore() + tracker2.restore() + } + }) + + it(`should not optimize left join if right collection has no index on the join key`, async () => { + // Create a second collection for the join with its own index + const secondCollection = createCollection({ + getKey: (item) => item.id2, + startSync: true, + sync: { + sync: ({ begin, write, commit }) => { + begin() + write({ + type: `insert`, + value: { + id2: `1`, + name: `Other Active Item`, + age: 40, + status: `active`, + createdAt: new Date(), + }, + }) + write({ + type: `insert`, + value: { + id2: `other2`, + name: `Other Inactive Item`, + age: 35, + status: `inactive`, + createdAt: new Date(), + }, + }) + commit() + }, + }, + }) + + await secondCollection.stateWhenReady() + + // Track both collections + const tracker1 = 
createIndexUsageTracker(collection) + const tracker2 = createIndexUsageTracker(secondCollection) + + try { + const liveQuery = createLiveQueryCollection({ + query: (q: any) => + q + .from({ item: collection }) + .join( + { other: secondCollection }, + ({ item, other }: any) => eq(item.id, other.id2), + `left` + ) + .where(({ item, other }: any) => + and(eq(item.status, `active`), eq(other.status, `active`)) + ) + .select(({ item, other }: any) => ({ + id: item.id, + name: item.name, + otherName: other.name, + })), + startSync: true, + }) + + await liveQuery.stateWhenReady() + + // Should include every active item of the left collection, joined with its matching active right item if any + expect(liveQuery.toArray).toEqual([ + { id: `1`, name: `Alice`, otherName: `Other Active Item` }, + { id: `3`, name: `Charlie` }, + { id: `5`, name: `Eve` }, + ]) + + // We should have done an index lookup on the left collection to find active items + expect(tracker1.stats.queriesExecuted).toEqual([ + { + type: `index`, + operation: `eq`, + field: `status`, + value: `active`, + }, + ]) + + // We should have done a full scan of the right collection + // because it doesn't have any indexes + expect(tracker2.stats.queriesExecuted).toEqual([ + { + type: `fullScan`, + }, + ]) + } finally { + tracker1.restore() + tracker2.restore() + } + }) + + it(`should use index of left collection when right-joining collections`, async () => { + // Create a second collection for the join with its own index + const secondCollection = createCollection({ + getKey: (item) => item.id2, + startSync: true, + sync: { + sync: ({ begin, write, commit }) => { + begin() + write({ + type: `insert`, + value: { + id2: `1`, + name: `Other Active Item`, + age: 40, + status: `active`, + createdAt: new Date(), + }, + }) + write({ + type: `insert`, + value: { + id2: `other2`, + name: `Other Inactive Item`, + age: 35, + status: `inactive`, + createdAt: new Date(), + }, + }) + commit() + }, + }, + }) + + // Since we're using a right join, it will iterate over the right
collection + // and join in matching keys from the left collection + collection.createIndex((row) => row.id) + + await secondCollection.stateWhenReady() + + // Track both collections + const tracker1 = createIndexUsageTracker(collection) + const tracker2 = createIndexUsageTracker(secondCollection) + + try { + const liveQuery = createLiveQueryCollection({ + query: (q: any) => + q + .from({ item: collection }) + .join( + { other: secondCollection }, + ({ item, other }: any) => eq(item.id, other.id2), + `right` + ) + .where(({ item, other }: any) => + and(eq(item.status, `active`), eq(other.status, `active`)) + ) + .select(({ item, other }: any) => ({ + id: item.id, + name: item.name, + otherName: other.name, + })), + startSync: true, + }) + + await liveQuery.stateWhenReady() + + // Should have found the active item of the right collection joined with its matching active left item + expect(liveQuery.toArray).toEqual([ + { id: `1`, name: `Alice`, otherName: `Other Active Item` }, + ]) + + // We should have done a full scan of the right collection + expect(tracker2.stats.queriesExecuted).toEqual([ + { + type: `fullScan`, + }, + ]) + + // We should have done an index lookup on the 1st collection to find the matching item, i.e. the item with id "1" + expect(tracker1.stats.queriesExecuted).toEqual([ + { + type: `index`, + operation: `eq`, + field: `id`, + value: `1`, + }, + ]) + } finally { + tracker1.restore() + tracker2.restore() + } + }) + + it(`should not optimize right join if left collection has no index on the join key`, async () => { + // Create a second collection for the join with its own index + const secondCollection = createCollection({ + getKey: (item) => item.id2, + startSync: true, + sync: { + sync: ({ begin, write, commit }) => { + begin() + write({ + type: `insert`, + value: { + id2: `1`, + name: `Other Active Item`, + age: 40, + status: `active`, + createdAt: new Date(), + }, + }) + write({ + type: `insert`, + value: { + id2: `other2`, + name: `Other Inactive Item`, + age: 35, + status: `inactive`, + createdAt: new Date(), + }, +
}) + commit() + }, + }, + }) + + await secondCollection.stateWhenReady() + + // Track both collections + const tracker1 = createIndexUsageTracker(collection) + const tracker2 = createIndexUsageTracker(secondCollection) + + try { + const liveQuery = createLiveQueryCollection({ + query: (q: any) => + q + .from({ item: collection }) + .join( + { other: secondCollection }, + ({ item, other }: any) => eq(item.id, other.id2), + `right` + ) + .where(({ item, other }: any) => + and(eq(item.status, `active`), eq(other.status, `active`)) + ) + .select(({ item, other }: any) => ({ + id: item.id, + name: item.name, + otherName: other.name, + })), + startSync: true, + }) + + await liveQuery.stateWhenReady() + + // Should have found results where both items are active + expect(liveQuery.toArray).toEqual([ + { id: `1`, name: `Alice`, otherName: `Other Active Item` }, + ]) + + // We should have done a full scan of the right collection + // because it has no indexes + expect(tracker2.stats.queriesExecuted).toEqual([ + { + type: `fullScan`, + }, + ]) + + // We should have done an index lookup on the left collection to find active items + // using its status index, because it has no index on the join key + expect(tracker1.stats.queriesExecuted).toEqual([ + { + type: `index`, + operation: `eq`, + field: `status`, + value: `active`, + }, + ]) + } finally { + tracker1.restore() + tracker2.restore() + } + }) + + it(`should optimize live queries with ORDER BY and LIMIT`, async () => { await withIndexTracking(collection, async (tracker) => { const liveQuery = createLiveQueryCollection({