From 6e3dccf47fd37c4f154d137a1a8fcb541a1221f7 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Wed, 30 Jul 2025 11:35:43 +0200 Subject: [PATCH 1/9] Optimize joins to use index when possible. --- packages/db/src/collection.ts | 64 +++++- packages/db/src/query/compiler/index.ts | 40 +++- packages/db/src/query/compiler/joins.ts | 200 ++++++++++++++++-- .../db/src/query/live-query-collection.ts | 89 ++++++-- .../db/tests/query/compiler/basic.test.ts | 32 ++- .../tests/query/compiler/subqueries.test.ts | 39 +++- .../query/compiler/subquery-caching.test.ts | 90 +++++++- packages/db/tests/query/indexes.test.ts | 124 +++++++++++ 8 files changed, 612 insertions(+), 66 deletions(-) diff --git a/packages/db/src/collection.ts b/packages/db/src/collection.ts index eea7905cd..98af5cdd7 100644 --- a/packages/db/src/collection.ts +++ b/packages/db/src/collection.ts @@ -37,6 +37,7 @@ import { UpdateKeyNotFoundError, } from "./errors" import { createFilteredCallback, currentStateAsChanges } from "./change-events" +import type { BasicExpression } from "./query/ir" import type { Transaction } from "./transactions" import type { StandardSchemaV1 } from "@standard-schema/spec" import type { SingleRowRefProxy } from "./query/builder/ref-proxy" @@ -1397,8 +1398,8 @@ export class CollectionImpl< /** * Creates an index on a collection for faster queries. - * Indexes significantly improve query performance by allowing binary search - * and range queries instead of full scans. + * Indexes significantly improve query performance by allowing constant time lookups + * and logarithmic time range queries instead of full scans. 
* * @template TResolver - The type of the index resolver (constructor or async loader) * @param indexCallback - Function that extracts the indexed value from each item @@ -2283,6 +2284,65 @@ export class CollectionImpl< } } + /** + * Subscribes to changes for a dynamic set of keys + * @param callback - Function that is called when items in the key set change + * @param whereExpression - Optional where expression to filter the changes (needed in case we need to switch back to a regular subscription) + * @returns An object with an add method to add a set of keys to the subscription, an unsubscribe method to stop listening for changes, and a switchToRegularSubscription method to fall back to a regular subscription on the whole collection + * @example + * const { add, unsubscribe } = collection.subscribeChangesDynamic((changes) => { + * changes.forEach(change => { + * console.log(`${change.type}: ${change.key}`, change.value) + * }) + * }) + * add(new Set(["user1"])) + */ + subscribeChangesDynamic( + callback: (changes: Array>) => void, + whereExpression?: BasicExpression + ) { + const keys = new Set() + const subscriptions = new Set<() => void>() + let switchedToRegularSubscription = false + + const add = (keySet: Set) => { + if (switchedToRegularSubscription) return + for (const key of keySet) { + if (keys.has(key)) continue // already subscribed: skip this key but keep processing the rest of the set + keys.add(key) + subscriptions.add( + this.subscribeChangesKey(key, callback, { + includeInitialState: true, + }) + ) + } + } + + const unsubscribe = () => { + subscriptions.forEach((unsub) => unsub()) + subscriptions.clear() + keys.clear() + } + + const switchToRegularSubscription = () => { + if (switchedToRegularSubscription) return + unsubscribe() + switchedToRegularSubscription = true + subscriptions.add(this.subscribeChanges(callback, { // track the fallback subscription so that unsubscribe() also tears it down + includeInitialState: true, + whereExpression, + })) + } + + const res = { + add, + unsubscribe, + switchToRegularSubscription, + } + + return res + } + /** * Subscribe to changes for a specific key */ diff --git a/packages/db/src/query/compiler/index.ts b/packages/db/src/query/compiler/index.ts index 364670b49..2cccb4861 100644 --- 
a/packages/db/src/query/compiler/index.ts +++ b/packages/db/src/query/compiler/index.ts @@ -12,6 +12,8 @@ import { processJoins } from "./joins.js" import { processGroupBy } from "./group-by.js" import { processOrderBy } from "./order-by.js" import { processSelectToResults } from "./select.js" +import type { LazyCollectionCallbacks } from "./joins.js" +import type { Collection } from "../../collection.js" import type { BasicExpression, CollectionRef, @@ -29,6 +31,8 @@ import type { QueryCache, QueryMapping } from "./types.js" * Result of query compilation including both the pipeline and collection-specific WHERE clauses */ export interface CompilationResult { + /** The ID of the main collection */ + collectionId: string /** The compiled query pipeline */ pipeline: ResultStream /** Map of collection aliases to their WHERE clauses for index optimization */ @@ -46,6 +50,9 @@ export interface CompilationResult { export function compileQuery( rawQuery: QueryIR, inputs: Record, + collections: Record>, + callbacks: Record, + lazyCollections: Set, cache: QueryCache = new WeakMap(), queryMapping: QueryMapping = new WeakMap() ): CompilationResult { @@ -70,9 +77,16 @@ export function compileQuery( const tables: Record = {} // Process the FROM clause to get the main table - const { alias: mainTableAlias, input: mainInput } = processFrom( + const { + alias: mainTableAlias, + input: mainInput, + collectionId: mainCollectionId, + } = processFrom( query.from, allInputs, + collections, + callbacks, + lazyCollections, cache, queryMapping ) @@ -96,10 +110,14 @@ export function compileQuery( pipeline, query.join, tables, + mainCollectionId, mainTableAlias, allInputs, cache, - queryMapping + queryMapping, + collections, + callbacks, + lazyCollections ) } @@ -249,6 +267,7 @@ export function compileQuery( const result = resultPipeline // Cache the result before returning (use original query as key) const compilationResult = { + collectionId: mainCollectionId, pipeline: result, 
collectionWhereClauses, } @@ -275,6 +294,7 @@ export function compileQuery( const result = resultPipeline // Cache the result before returning (use original query as key) const compilationResult = { + collectionId: mainCollectionId, pipeline: result, collectionWhereClauses, } @@ -289,16 +309,19 @@ export function compileQuery( function processFrom( from: CollectionRef | QueryRef, allInputs: Record, + collections: Record, + callbacks: Record, + lazyCollections: Set, cache: QueryCache, queryMapping: QueryMapping -): { alias: string; input: KeyedStream } { +): { alias: string; input: KeyedStream; collectionId: string } { switch (from.type) { case `collectionRef`: { const input = allInputs[from.collection.id] if (!input) { throw new CollectionInputNotFoundError(from.collection.id) } - return { alias: from.alias, input } + return { alias: from.alias, input, collectionId: from.collection.id } } case `queryRef`: { // Find the original query for caching purposes @@ -308,6 +331,9 @@ function processFrom( const subQueryResult = compileQuery( originalQuery, allInputs, + collections, + callbacks, + lazyCollections, cache, queryMapping ) @@ -324,7 +350,11 @@ function processFrom( }) ) - return { alias: from.alias, input: extractedInput } + return { + alias: from.alias, + input: extractedInput, + collectionId: subQueryResult.collectionId, + } } default: throw new UnsupportedFromTypeError((from as any).type) diff --git a/packages/db/src/query/compiler/joins.ts b/packages/db/src/query/compiler/joins.ts index 554b13b1b..bcb7b08b8 100644 --- a/packages/db/src/query/compiler/joins.ts +++ b/packages/db/src/query/compiler/joins.ts @@ -12,13 +12,16 @@ import { UnsupportedJoinSourceTypeError, UnsupportedJoinTypeError, } from "../../errors.js" +import { findIndexForField } from "../../utils/index-optimization.js" import { compileExpression } from "./evaluators.js" import { compileQuery } from "./index.js" import type { IStreamBuilder, JoinType } from "@tanstack/db-ivm" +import type { 
Collection } from "../../collection.js" import type { BasicExpression, CollectionRef, JoinClause, + PropRef, QueryRef, } from "../ir.js" import type { @@ -27,6 +30,13 @@ import type { NamespacedRow, } from "../../types.js" import type { QueryCache, QueryMapping } from "./types.js" +import type { BaseIndex } from "../../indexes/base-index.js" + +export type LoadKeysFn = (key: Set) => void +export type LazyCollectionCallbacks = { + loadKeys: LoadKeysFn + switchToRegularSubscription: () => void +} /** * Processes all join clauses in a query @@ -35,10 +45,14 @@ export function processJoins( pipeline: NamespacedAndKeyedStream, joinClauses: Array, tables: Record, + mainTableId: string, mainTableAlias: string, allInputs: Record, cache: QueryCache, - queryMapping: QueryMapping + queryMapping: QueryMapping, + collections: Record, + callbacks: Record, + lazyCollections: Set ): NamespacedAndKeyedStream { let resultPipeline = pipeline @@ -47,10 +61,14 @@ export function processJoins( resultPipeline, joinClause, tables, + mainTableId, mainTableAlias, allInputs, cache, - queryMapping + queryMapping, + collections, + callbacks, + lazyCollections ) } @@ -64,15 +82,26 @@ function processJoin( pipeline: NamespacedAndKeyedStream, joinClause: JoinClause, tables: Record, + mainTableId: string, mainTableAlias: string, allInputs: Record, cache: QueryCache, - queryMapping: QueryMapping + queryMapping: QueryMapping, + collections: Record, + callbacks: Record, + lazyCollections: Set ): NamespacedAndKeyedStream { // Get the joined table alias and input stream - const { alias: joinedTableAlias, input: joinedInput } = processJoinSource( + const { + alias: joinedTableAlias, + input: joinedInput, + collectionId: joinedCollectionId, + } = processJoinSource( joinClause.from, allInputs, + collections, + callbacks, + lazyCollections, cache, queryMapping ) @@ -80,13 +109,26 @@ function processJoin( // Add the joined table to the tables map tables[joinedTableAlias] = joinedInput - // Convert join type 
to D2 join type - const joinType: JoinType = - joinClause.type === `cross` - ? `inner` - : joinClause.type === `outer` - ? `full` - : (joinClause.type as JoinType) + const mainCollection = collections[mainTableId] + const joinedCollection = collections[joinedCollectionId] + + if (!mainCollection) { + throw new Error( + `Internal error in join. Collection not found for ${mainTableId}` + ) + } + + if (!joinedCollection) { + throw new Error( + `Internal error in join. Collection not found for ${joinedCollectionId}` + ) + } + + const { activeCollection, lazyCollection } = getActiveAndLazyCollections( + joinClause.type, + mainCollection, + joinedCollection + ) // Analyze which table each expression refers to and swap if necessary const { mainExpr, joinedExpr } = analyzeJoinExpressions( @@ -101,7 +143,7 @@ function processJoin( const compiledJoinedExpr = compileExpression(joinedExpr) // Prepare the main pipeline for joining - const mainPipeline = pipeline.pipe( + let mainPipeline = pipeline.pipe( map(([currentKey, namespacedRow]) => { // Extract the join key from the main table expression const mainKey = compiledMainExpr(namespacedRow) @@ -115,7 +157,7 @@ function processJoin( ) // Prepare the joined pipeline - const joinedPipeline = joinedInput.pipe( + let joinedPipeline = joinedInput.pipe( map(([currentKey, row]) => { // Wrap the row in a namespaced structure const namespacedRow: NamespacedRow = { [joinedTableAlias]: row } @@ -132,11 +174,81 @@ function processJoin( ) // Apply the join operation - if (![`inner`, `left`, `right`, `full`].includes(joinType)) { + if (![`inner`, `left`, `right`, `full`].includes(joinClause.type)) { throw new UnsupportedJoinTypeError(joinClause.type) } + + if (activeCollection) { + // This join can be optimized by having the active collection + // dynamically load keys into the lazy collection + // based on the value of the joinKey and by looking up + // matching rows in the index of the lazy collection + + // Mark the lazy collection as 
lazy + // this Set is passed by the liveQueryCollection to the compiler + // such that the liveQueryCollection can check it after compilation + // to know which collections are lazy collections + lazyCollections.add(lazyCollection.id) + + const activePipeline = + activeCollection === `main` ? mainPipeline : joinedPipeline + + let index: BaseIndex | undefined + + const activePipelineWithLoading: IStreamBuilder< + [key: unknown, [originalKey: string, namespacedRow: NamespacedRow]] + > = activePipeline.pipe( + map(([joinKey, [originalKey, namespacedRow]]) => { + // Find the index for the path we join on + // we need to find the index inside the map operator + // because the indexes are only available after the initial sync + // so we can't fetch it during compilation + + // The join expression of the lazy collection (joinedExpr when the main collection is active, mainExpr otherwise) is of the form `{ tableAlias }.{ joinKey }` + // so we need to strip the table alias from this path + const [_, ...indexPath] = ((activeCollection === `main` ? joinedExpr : mainExpr) as PropRef).path + index ??= findIndexForField(lazyCollection.indexes, indexPath) + + // The `callbacks` object is passed by the liveQueryCollection to the compiler. + // It contains a function to lazy load keys for each lazy collection + // as well as a function to switch back to a regular collection + // (useful when there's no index available for lazily loading the collection) + const collectionCallbacks = callbacks[lazyCollection.id] + if (!collectionCallbacks) { + throw new Error( + `Internal error: callbacks for collection are missing in join pipeline. 
Make sure the live query collection sets them before running the pipeline.` + ) + } + + const { loadKeys, switchToRegularSubscription } = collectionCallbacks + + if (index && index.supports(`eq`)) { + // Use the index to fetch the PKs of the rows in the lazy collection + // that match this row from the active collection based on the value of the joinKey + const matchingKeys = index.lookup(`eq`, joinKey) + // Inform the lazy collection that those keys need to be loaded + loadKeys(matchingKeys) + } else { + // We can't optimize the join because there is no index for the join key + // on the lazy collection, so we subscribe to all changes + switchToRegularSubscription() + } + + // TODO: introduce an identity operator which is not modifying the stream but only applying side effects and use that one instead of map + // Return the row unchanged + return [joinKey, [originalKey, namespacedRow]] + }) + ) + + if (activeCollection === `main`) { + mainPipeline = activePipelineWithLoading + } else { + joinedPipeline = activePipelineWithLoading + } + } + return mainPipeline.pipe( - joinOperator(joinedPipeline, joinType), + joinOperator(joinedPipeline, joinClause.type as JoinType), consolidate(), processJoinResults(joinClause.type) ) @@ -225,16 +337,19 @@ function getTableAliasFromExpression(expr: BasicExpression): string | null { function processJoinSource( from: CollectionRef | QueryRef, allInputs: Record, + collections: Record, + callbacks: Record, + lazyCollections: Set, cache: QueryCache, queryMapping: QueryMapping -): { alias: string; input: KeyedStream } { +): { alias: string; input: KeyedStream; collectionId: string } { switch (from.type) { case `collectionRef`: { const input = allInputs[from.collection.id] if (!input) { throw new CollectionInputNotFoundError(from.collection.id) } - return { alias: from.alias, input } + return { alias: from.alias, input, collectionId: from.collection.id } } case `queryRef`: { // Find the original query for caching purposes @@ -244,6 
+359,9 @@ function processJoinSource( const subQueryResult = compileQuery( originalQuery, allInputs, + collections, + callbacks, + lazyCollections, cache, queryMapping ) @@ -260,7 +378,11 @@ function processJoinSource( }) ) - return { alias: from.alias, input: extractedInput as KeyedStream } + return { + alias: from.alias, + input: extractedInput as KeyedStream, + collectionId: subQueryResult.collectionId, + } } default: throw new UnsupportedJoinSourceTypeError((from as any).type) @@ -333,3 +455,45 @@ function processJoinResults(joinType: string) { ) } } + +/** + * Returns the active and lazy collections for a join clause. + * The active collection is the one that we need to fully iterate over + * and it can be the main table (i.e. left collection) or the joined table (i.e. right collection). + * The lazy collection is the one that we should join-in lazily based on matches in the active collection. + * @param joinType - The type of the join clause to analyze + * @param leftCollection - The left collection + * @param rightCollection - The right collection + * @returns The active and lazy collections. They are undefined if we need to loop over both collections (i.e. 
both are active) + */ +function getActiveAndLazyCollections( + joinType: JoinClause[`type`], + leftCollection: Collection, + rightCollection: Collection +): + | { activeCollection: `main` | `joined`; lazyCollection: Collection } + | { activeCollection: undefined; lazyCollection: undefined } { + if (leftCollection.id === rightCollection.id) { + // We can't apply this optimization if there's only one collection + // because `liveQueryCollection` will detect that the collection is lazy + // and treat it lazily (because the collection is shared) + // and thus it will not load any keys because both sides of the join + // will be handled lazily + return { activeCollection: undefined, lazyCollection: undefined } + } + + switch (joinType) { + case `left`: + return { activeCollection: `main`, lazyCollection: rightCollection } + case `right`: + return { activeCollection: `joined`, lazyCollection: leftCollection } + case `inner`: + // The smallest collection should be the active collection + // and the biggest collection should be lazy + return leftCollection.size < rightCollection.size + ? 
{ activeCollection: `main`, lazyCollection: rightCollection } + : { activeCollection: `joined`, lazyCollection: leftCollection } + default: + return { activeCollection: undefined, lazyCollection: undefined } + } +} diff --git a/packages/db/src/query/live-query-collection.ts b/packages/db/src/query/live-query-collection.ts index 9d7877f5c..c874c22b8 100644 --- a/packages/db/src/query/live-query-collection.ts +++ b/packages/db/src/query/live-query-collection.ts @@ -16,6 +16,7 @@ import type { import type { Context, GetResult } from "./builder/types.js" import type { MultiSetArray, RootStreamBuilder } from "@tanstack/db-ivm" import type { BasicExpression } from "./ir.js" +import type { LazyCollectionCallbacks } from "./compiler/joins.js" // Global counter for auto-generated collection IDs let liveQueryCollectionCounter = 0 @@ -176,6 +177,11 @@ export function liveQueryCollectionOptions< | Map> | undefined + // Map of collection IDs to functions that load keys for that lazy collection + const lazyCollectionsCallbacks: Record = {} + // Set of collection IDs that are lazy collections + const lazyCollections = new Set() + const compileBasePipeline = () => { graphCache = new D2() inputsCache = Object.fromEntries( @@ -189,7 +195,13 @@ export function liveQueryCollectionOptions< ;({ pipeline: pipelineCache, collectionWhereClauses: collectionWhereClausesCache, - } = compileQuery(query, inputsCache as Record)) + } = compileQuery( + query, + inputsCache as Record, + collections, + lazyCollectionsCallbacks, + lazyCollections + )) } const maybeCompileBasePipeline = () => { @@ -292,9 +304,11 @@ export function liveQueryCollectionOptions< graph.finalize() + let subscribedToAllCollections = false + const maybeRunGraph = () => { // We only run the graph if all the collections are ready - if (allCollectionsReady()) { + if (allCollectionsReady() && subscribedToAllCollections) { graph.run() // On the initial run, we may need to do an empty commit to ensure that // the collection is 
initialized @@ -319,6 +333,54 @@ export function liveQueryCollectionOptions< ? collectionWhereClausesCache.get(collectionAlias) : undefined + const subscribeToAllChanges = ( + whereExpression: BasicExpression | undefined + ) => { + const unsubscribe = collection.subscribeChanges( + (changes) => { + sendChangesToInput(input, changes, collection.config.getKey) + maybeRunGraph() + }, + { + includeInitialState: true, + ...(whereExpression ? { whereExpression } : undefined), + } + ) + return unsubscribe + } + + const subscribeToMatchingChanges = ( + whereExpression: BasicExpression | undefined + ) => { + // This is used by lazy collections + // i.e. collections that are part of a join + // and can be loaded lazily based on the rows + // from the other collection in the join + const { add, switchToRegularSubscription, unsubscribe } = + collection.subscribeChangesDynamic((changes) => { + sendChangesToInput(input, changes, collection.config.getKey) + maybeRunGraph() + }, whereExpression) + // Store the `add` and `switchToRegularSubscription` functions in the `lazyCollectionsCallbacks` map + lazyCollectionsCallbacks[collectionId] = { + loadKeys: add, + switchToRegularSubscription, + } + return unsubscribe + } + + const subscribeToChanges = ( + whereExpression?: BasicExpression + ) => { + let unsubscribe: () => void + if (lazyCollections.has(collectionId)) { + unsubscribe = subscribeToMatchingChanges(whereExpression) + } else { + unsubscribe = subscribeToAllChanges(whereExpression) + } + unsubscribeCallbacks.add(unsubscribe) + } + if (whereClause) { // Convert WHERE clause to BasicExpression format for collection subscription const whereExpression = convertToBasicExpression( @@ -328,17 +390,7 @@ export function liveQueryCollectionOptions< if (whereExpression) { // Use index optimization for this collection - const subscription = collection.subscribeChanges( - (changes) => { - sendChangesToInput(input, changes, collection.config.getKey) - maybeRunGraph() - }, - { - includeInitialState: true, - whereExpression: 
whereExpression, - } - ) - unsubscribeCallbacks.add(subscription) + subscribeToChanges(whereExpression) } else { // This should not happen - if we have a whereClause but can't create whereExpression, // it indicates a bug in our optimization logic @@ -349,17 +401,12 @@ export function liveQueryCollectionOptions< } } else { // No WHERE clause for this collection, use regular subscription - const subscription = collection.subscribeChanges( - (changes) => { - sendChangesToInput(input, changes, collection.config.getKey) - maybeRunGraph() - }, - { includeInitialState: true } - ) - unsubscribeCallbacks.add(subscription) + subscribeToChanges() } }) + subscribedToAllCollections = true + // Initial run maybeRunGraph() diff --git a/packages/db/tests/query/compiler/basic.test.ts b/packages/db/tests/query/compiler/basic.test.ts index d8d2f472e..3c4f4693a 100644 --- a/packages/db/tests/query/compiler/basic.test.ts +++ b/packages/db/tests/query/compiler/basic.test.ts @@ -43,7 +43,13 @@ describe(`Query2 Compiler`, () => { const graph = new D2() const input = graph.newInput<[number, User]>() - const { pipeline } = compileQuery(query, { users: input }) + const { pipeline } = compileQuery( + query, + { users: input }, + { users: usersCollection }, + {}, + new Set() + ) const messages: Array> = [] pipeline.pipe( @@ -90,7 +96,13 @@ describe(`Query2 Compiler`, () => { const graph = new D2() const input = graph.newInput<[number, User]>() - const { pipeline } = compileQuery(query, { users: input }) + const { pipeline } = compileQuery( + query, + { users: input }, + { users: usersCollection }, + {}, + new Set() + ) const messages: Array> = [] pipeline.pipe( @@ -159,7 +171,13 @@ describe(`Query2 Compiler`, () => { const graph = new D2() const input = graph.newInput<[number, User]>() - const { pipeline } = compileQuery(query, { users: input }) + const { pipeline } = compileQuery( + query, + { users: input }, + { users: usersCollection }, + {}, + new Set() + ) const messages: Array> = [] 
pipeline.pipe( @@ -216,7 +234,13 @@ describe(`Query2 Compiler`, () => { const graph = new D2() const input = graph.newInput<[number, User]>() - const { pipeline } = compileQuery(query, { users: input }) + const { pipeline } = compileQuery( + query, + { users: input }, + { users: usersCollection }, + {}, + new Set() + ) const messages: Array> = [] pipeline.pipe( diff --git a/packages/db/tests/query/compiler/subqueries.test.ts b/packages/db/tests/query/compiler/subqueries.test.ts index f69a49846..23d09b963 100644 --- a/packages/db/tests/query/compiler/subqueries.test.ts +++ b/packages/db/tests/query/compiler/subqueries.test.ts @@ -169,7 +169,13 @@ describe(`Query2 Subqueries`, () => { // Compile and execute the query const graph = new D2() const issuesInput = createIssueInput(graph) - const { pipeline } = compileQuery(builtQuery, { issues: issuesInput }) + const { pipeline } = compileQuery( + builtQuery, + { issues: issuesInput }, + { issues: issuesCollection }, + {}, + new Set() + ) const messages: Array> = [] pipeline.pipe( @@ -202,6 +208,11 @@ describe(`Query2 Subqueries`, () => { }) describe(`Subqueries in JOIN clause`, () => { + const dummyCallbacks = { + loadKeys: (_: any) => {}, + switchToRegularSubscription: () => {}, + } + it(`supports subquery in join clause`, () => { // Create a subquery for active users const activeUsersQuery = new Query() @@ -261,10 +272,20 @@ describe(`Query2 Subqueries`, () => { const graph = new D2() const issuesInput = createIssueInput(graph) const usersInput = createUserInput(graph) - const { pipeline } = compileQuery(builtQuery, { - issues: issuesInput, - users: usersInput, - }) + const lazyCollections = new Set() + const { pipeline } = compileQuery( + builtQuery, + { + issues: issuesInput, + users: usersInput, + }, + { issues: issuesCollection, users: usersCollection }, + { issues: dummyCallbacks, users: dummyCallbacks }, + lazyCollections + ) + + // Since we're doing a left join, the collection on the right should be handled 
lazily + expect(lazyCollections).contains(usersCollection.id) const messages: Array> = [] pipeline.pipe( @@ -321,7 +342,13 @@ describe(`Query2 Subqueries`, () => { // Execute the aggregate query const graph = new D2() const issuesInput = createIssueInput(graph) - const { pipeline } = compileQuery(builtQuery, { issues: issuesInput }) + const { pipeline } = compileQuery( + builtQuery, + { issues: issuesInput }, + { issues: issuesCollection }, + {}, + new Set() + ) const messages: Array> = [] pipeline.pipe( diff --git a/packages/db/tests/query/compiler/subquery-caching.test.ts b/packages/db/tests/query/compiler/subquery-caching.test.ts index 99067d653..7bb8bf967 100644 --- a/packages/db/tests/query/compiler/subquery-caching.test.ts +++ b/packages/db/tests/query/compiler/subquery-caching.test.ts @@ -47,7 +47,14 @@ describe(`Subquery Caching`, () => { // First compilation without shared cache const cache1 = new WeakMap() - const result1 = compileQuery(mainQuery, inputs, cache1) + const result1 = compileQuery( + mainQuery, + inputs, + { users: usersCollection }, + {}, + new Set(), + cache1 + ) // Verify subquery is in first cache expect(cache1.has(subquery)).toBe(true) @@ -55,7 +62,14 @@ describe(`Subquery Caching`, () => { // Second compilation with different cache (should recompile everything) const cache2 = new WeakMap() - const result2 = compileQuery(mainQuery, inputs, cache2) + const result2 = compileQuery( + mainQuery, + inputs, + { users: usersCollection }, + {}, + new Set(), + cache2 + ) // Results should be different objects (different compilation) expect(result1).not.toBe(result2) @@ -65,7 +79,14 @@ describe(`Subquery Caching`, () => { expect(cache2.has(mainQuery)).toBe(true) // Third compilation with the same cache as #2 (should reuse cached results) - const result3 = compileQuery(mainQuery, inputs, cache2) + const result3 = compileQuery( + mainQuery, + inputs, + { users: usersCollection }, + {}, + new Set(), + cache2 + ) // Result should be the same object as 
#2 (reused from cache) expect(result3).toBe(result2) @@ -75,8 +96,22 @@ describe(`Subquery Caching`, () => { expect(cache2.has(mainQuery)).toBe(true) // Fourth compilation: compile just the subquery with cache2 (should reuse) - const subqueryResult1 = compileQuery(subquery, inputs, cache2) - const subqueryResult2 = compileQuery(subquery, inputs, cache2) + const subqueryResult1 = compileQuery( + subquery, + inputs, + { users: usersCollection }, + {}, + new Set(), + cache2 + ) + const subqueryResult2 = compileQuery( + subquery, + inputs, + { users: usersCollection }, + {}, + new Set(), + cache2 + ) // Both subquery compilations should return the same cached result expect(subqueryResult1).toBe(subqueryResult2) @@ -103,11 +138,25 @@ describe(`Subquery Caching`, () => { const sharedCache = new WeakMap() // First compilation - should add to cache - const result1 = compileQuery(subquery, inputs, sharedCache) + const result1 = compileQuery( + subquery, + inputs, + { users: usersCollection }, + {}, + new Set(), + sharedCache + ) expect(sharedCache.has(subquery)).toBe(true) // Second compilation with same cache - should return cached result - const result2 = compileQuery(subquery, inputs, sharedCache) + const result2 = compileQuery( + subquery, + inputs, + { users: usersCollection }, + {}, + new Set(), + sharedCache + ) expect(result1).toBe(result2) // Should be the exact same object reference }) @@ -143,8 +192,22 @@ describe(`Subquery Caching`, () => { const sharedCache = new WeakMap() // Compile both queries - const result1 = compileQuery(subquery1, inputs, sharedCache) - const result2 = compileQuery(subquery, inputs, sharedCache) + const result1 = compileQuery( + subquery1, + inputs, + { users: usersCollection }, + {}, + new Set(), + sharedCache + ) + const result2 = compileQuery( + subquery, + inputs, + { users: usersCollection }, + {}, + new Set(), + sharedCache + ) // Should have different results since they are different objects expect(result1).not.toBe(result2) @@ 
-198,7 +261,14 @@ describe(`Subquery Caching`, () => { const sharedCache = new WeakMap() // Compile the outer query - should cache innerSubquery and reuse it - const result = compileQuery(outerQuery, inputs, sharedCache) + const result = compileQuery( + outerQuery, + inputs, + { users: usersCollection }, + {}, + new Set(), + sharedCache + ) expect(result).toBeDefined() // Verify that innerSubquery is cached diff --git a/packages/db/tests/query/indexes.test.ts b/packages/db/tests/query/indexes.test.ts index 5dfad097b..1396fed71 100644 --- a/packages/db/tests/query/indexes.test.ts +++ b/packages/db/tests/query/indexes.test.ts @@ -713,6 +713,130 @@ describe(`Query Index Optimization`, () => { } }) + it(`should use available indexes when joining collections`, async () => { + // Create a second collection for the join with its own index + const secondCollection = createCollection({ + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, write, commit }) => { + begin() + write({ + type: `insert`, + value: { + id: `1`, + name: `Other Active Item`, + age: 40, + status: `active`, + createdAt: new Date(), + }, + }) + write({ + type: `insert`, + value: { + id: `other2`, + name: `Other Inactive Item`, + age: 35, + status: `inactive`, + createdAt: new Date(), + }, + }) + commit() + }, + }, + }) + + // Since we're using an inner join, it will iterate over the smallest collection + // and join in matching keys from the bigger collection + // so it will iterate over the second collection and use the index for the status to find active items + // then for each such item (there is only 1), it will do an index lookup into the first collection to find matching items + // So we need an index on the status for the second collection + // and an index on the id for the first collection + secondCollection.createIndex((row) => row.status) + collection.createIndex((row) => row.id) + + await secondCollection.stateWhenReady() + + // Track both collections + const tracker1 = 
createIndexUsageTracker(collection) + const tracker2 = createIndexUsageTracker(secondCollection) + + try { + const liveQuery = createLiveQueryCollection({ + query: (q: any) => + q + .from({ item: collection }) + .join( + { other: secondCollection }, + ({ item, other }: any) => eq(item.id, other.id), + `inner` + ) + .where(({ item, other }: any) => + and(eq(item.status, `active`), eq(other.status, `active`)) + ) + .select(({ item, other }: any) => ({ + id: item.id, + name: item.name, + otherName: other.name, + })), + startSync: true, + }) + + await liveQuery.stateWhenReady() + + // Should have found results where both items are active + expect(liveQuery.size).toBe(1) + + // Combine stats from both collections + const combinedStats: IndexUsageStats = { + rangeQueryCalls: + tracker1.stats.rangeQueryCalls + tracker2.stats.rangeQueryCalls, + fullScanCalls: + tracker1.stats.fullScanCalls + tracker2.stats.fullScanCalls, + indexesUsed: [ + ...tracker1.stats.indexesUsed, + ...tracker2.stats.indexesUsed, + ], + queriesExecuted: [ + ...tracker1.stats.queriesExecuted, + ...tracker2.stats.queriesExecuted, + ], + } + + // We should have done an index lookup on the 2nd collection to find active items + // There should only be 1 active item in the second collection and it has id "1" + expect(tracker2.stats.queriesExecuted).toEqual([ + { + type: `index`, + operation: `eq`, + field: `status`, + value: `active`, + }, + ]) + + // We should have done an index lookup on the 1st collection to find matching items + // i.e. 
items with id "1" + expect(tracker1.stats.queriesExecuted).toEqual([ + { + type: `index`, + operation: `eq`, + field: `id`, + value: `1`, + }, + ]) + + expectIndexUsage(combinedStats, { + shouldUseIndex: true, + shouldUseFullScan: false, + indexCallCount: 2, + fullScanCallCount: 0, + }) + } finally { + tracker1.restore() + tracker2.restore() + } + }) + it(`should optimize live queries with ORDER BY and LIMIT`, async () => { await withIndexTracking(collection, async (tracker) => { const liveQuery = createLiveQueryCollection({ From 69ff88449f4af75b8d40193312f8e398648c7755 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Wed, 30 Jul 2025 14:40:13 +0200 Subject: [PATCH 2/9] Index tests for left, right, and inner joins. --- packages/db/tests/query/indexes.test.ts | 491 +++++++++++++++++++++++- 1 file changed, 482 insertions(+), 9 deletions(-) diff --git a/packages/db/tests/query/indexes.test.ts b/packages/db/tests/query/indexes.test.ts index 1396fed71..3a4490b67 100644 --- a/packages/db/tests/query/indexes.test.ts +++ b/packages/db/tests/query/indexes.test.ts @@ -713,7 +713,7 @@ describe(`Query Index Optimization`, () => { } }) - it(`should use available indexes when joining collections`, async () => { + it(`should use index of biggest collection when inner-joining collections`, async () => { // Create a second collection for the join with its own index const secondCollection = createCollection({ getKey: (item) => item.id, @@ -752,7 +752,6 @@ describe(`Query Index Optimization`, () => { // then for each such item (there is only 1), it will do an index lookup into the first collection to find matching items // So we need an index on the status for the second collection // and an index on the id for the first collection - secondCollection.createIndex((row) => row.status) collection.createIndex((row) => row.id) await secondCollection.stateWhenReady() @@ -761,6 +760,97 @@ describe(`Query Index Optimization`, () => { const tracker1 = createIndexUsageTracker(collection) 
const tracker2 = createIndexUsageTracker(secondCollection) + + try { + const liveQuery = createLiveQueryCollection({ + query: (q: any) => + q + .from({ item: collection }) + .join( + { other: secondCollection }, + ({ item, other }: any) => eq(item.id, other.id), + `inner` + ) + .where(({ item, other }: any) => + and(eq(item.status, `active`), eq(other.status, `active`)) + ) + .select(({ item, other }: any) => ({ + id: item.id, + name: item.name, + otherName: other.name, + })), + startSync: true, + }) + + await liveQuery.stateWhenReady() + + // Should have found results where both items are active + expect(liveQuery.toArray).toEqual([ + { id: `1`, name: `Alice`, otherName: `Other Active Item` }, + ]) + + // We should have done a full scan of the 2nd collection to find active items (it has no index on status) + // There should only be 1 active item in the second collection and it has id "1" + expect(tracker2.stats.queriesExecuted).toEqual([ + { + type: `fullScan`, + }, + ]) + + // We should have done an index lookup on the 1st collection to find matching items + // i.e. 
items with id "1" + expect(tracker1.stats.queriesExecuted).toEqual([ + { + type: `index`, + operation: `eq`, + field: `id`, + value: `1`, + }, + ]) + } finally { + tracker1.restore() + tracker2.restore() + } + }) + + it(`should not optimize inner join if biggest collection has no index on the join key`, async () => { + // Create a second collection for the join with its own index + const secondCollection = createCollection({ + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, write, commit }) => { + begin() + write({ + type: `insert`, + value: { + id: `1`, + name: `Other Active Item`, + age: 40, + status: `active`, + createdAt: new Date(), + }, + }) + write({ + type: `insert`, + value: { + id: `other2`, + name: `Other Inactive Item`, + age: 35, + status: `inactive`, + createdAt: new Date(), + }, + }) + commit() + }, + }, + }) + + await secondCollection.stateWhenReady() + + // Track both collections + const tracker1 = createIndexUsageTracker(collection) + const tracker2 = createIndexUsageTracker(secondCollection) + try { const liveQuery = createLiveQueryCollection({ query: (q: any) => @@ -787,6 +877,102 @@ describe(`Query Index Optimization`, () => { // Should have found results where both items are active expect(liveQuery.size).toBe(1) + // We should have done an index lookup on the 2nd collection to find active items + // There should only be 1 active item in the second collection and it has id "1" + expect(tracker2.stats.queriesExecuted).toEqual([ + { + type: `fullScan`, + }, + ]) + + // We should have done an index lookup on the 1st collection to find active items + expect(tracker1.stats.queriesExecuted).toEqual([ + { + type: `index`, + operation: `eq`, + field: `status`, + value: `active`, + }, + ]) + } finally { + tracker1.restore() + tracker2.restore() + } + }) + + it(`should use index of right collection when left-joining collections`, async () => { + // Create a second collection for the join with its own index + const 
secondCollection = createCollection({ + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, write, commit }) => { + begin() + write({ + type: `insert`, + value: { + id: `1`, + name: `Other Active Item`, + age: 40, + status: `active`, + createdAt: new Date(), + }, + }) + write({ + type: `insert`, + value: { + id: `other2`, + name: `Other Inactive Item`, + age: 35, + status: `inactive`, + createdAt: new Date(), + }, + }) + commit() + }, + }, + }) + + // Since we're using a left join, it will iterate over the left collection + // and join in matching keys from the right collection + secondCollection.createIndex((row) => row.id) + + await secondCollection.stateWhenReady() + + // Track both collections + const tracker1 = createIndexUsageTracker(collection) + const tracker2 = createIndexUsageTracker(secondCollection) + + try { + const liveQuery = createLiveQueryCollection({ + query: (q: any) => + q + .from({ item: collection }) + .join( + { other: secondCollection }, + ({ item, other }: any) => eq(item.id, other.id), + `left` + ) + .where(({ item, other }: any) => + and(eq(item.status, `active`), eq(other.status, `active`)) + ) + .select(({ item, other }: any) => ({ + id: item.id, + name: item.name, + otherName: other.name, + })), + startSync: true, + }) + + await liveQuery.stateWhenReady() + + // Should include all results from the first collection + expect(liveQuery.toArray).toEqual([ + { id: `1`, name: `Alice`, otherName: `Other Active Item` }, + { id: `3`, name: `Charlie` }, + { id: `5`, name: `Eve` }, + ]) + // Combine stats from both collections const combinedStats: IndexUsageStats = { rangeQueryCalls: @@ -803,9 +989,8 @@ describe(`Query Index Optimization`, () => { ], } - // We should have done an index lookup on the 2nd collection to find active items - // There should only be 1 active item in the second collection and it has id "1" - expect(tracker2.stats.queriesExecuted).toEqual([ + // We should have done an index lookup on the 1st 
collection to find active items + expect(tracker1.stats.queriesExecuted).toEqual([ { type: `index`, operation: `eq`, @@ -814,21 +999,33 @@ describe(`Query Index Optimization`, () => { }, ]) - // We should have done an index lookup on the 1st collection to find matching items - // i.e. items with id "1" - expect(tracker1.stats.queriesExecuted).toEqual([ + // For each active item from the first collection + // we must have done an index lookup on the 2nd collection to find matching items + expect(tracker2.stats.queriesExecuted).toEqual([ { type: `index`, operation: `eq`, field: `id`, value: `1`, }, + { + type: `index`, + operation: `eq`, + field: `id`, + value: `3`, + }, + { + type: `index`, + operation: `eq`, + field: `id`, + value: `5`, + }, ]) expectIndexUsage(combinedStats, { shouldUseIndex: true, shouldUseFullScan: false, - indexCallCount: 2, + indexCallCount: 4, fullScanCallCount: 0, }) } finally { @@ -837,6 +1034,282 @@ describe(`Query Index Optimization`, () => { } }) + it(`should not optimize left join if right collection has no index on the join key`, async () => { + // Create a second collection for the join with its own index + const secondCollection = createCollection({ + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, write, commit }) => { + begin() + write({ + type: `insert`, + value: { + id: `1`, + name: `Other Active Item`, + age: 40, + status: `active`, + createdAt: new Date(), + }, + }) + write({ + type: `insert`, + value: { + id: `other2`, + name: `Other Inactive Item`, + age: 35, + status: `inactive`, + createdAt: new Date(), + }, + }) + commit() + }, + }, + }) + + await secondCollection.stateWhenReady() + + // Track both collections + const tracker1 = createIndexUsageTracker(collection) + const tracker2 = createIndexUsageTracker(secondCollection) + + try { + const liveQuery = createLiveQueryCollection({ + query: (q: any) => + q + .from({ item: collection }) + .join( + { other: secondCollection }, + ({ item, other }: 
any) => eq(item.id, other.id), + `left` + ) + .where(({ item, other }: any) => + and(eq(item.status, `active`), eq(other.status, `active`)) + ) + .select(({ item, other }: any) => ({ + id: item.id, + name: item.name, + otherName: other.name, + })), + startSync: true, + }) + + await liveQuery.stateWhenReady() + + // Should include every active item from the left collection (left join keeps unmatched rows) + expect(liveQuery.toArray).toEqual([ + { id: `1`, name: `Alice`, otherName: `Other Active Item` }, + { id: `3`, name: `Charlie` }, + { id: `5`, name: `Eve` }, + ]) + + // We should have done an index lookup on the left collection to find active items + expect(tracker1.stats.queriesExecuted).toEqual([ + { + type: `index`, + operation: `eq`, + field: `status`, + value: `active`, + }, + ]) + + // We should have done a full scan of the right collection + // because it doesn't have any indexes + expect(tracker2.stats.queriesExecuted).toEqual([ + { + type: `fullScan`, + }, + ]) + } finally { + tracker1.restore() + tracker2.restore() + } + }) + + it(`should use index of left collection when right-joining collections`, async () => { + // Create a second collection for the join with its own index + const secondCollection = createCollection({ + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, write, commit }) => { + begin() + write({ + type: `insert`, + value: { + id: `1`, + name: `Other Active Item`, + age: 40, + status: `active`, + createdAt: new Date(), + }, + }) + write({ + type: `insert`, + value: { + id: `other2`, + name: `Other Inactive Item`, + age: 35, + status: `inactive`, + createdAt: new Date(), + }, + }) + commit() + }, + }, + }) + + // Since we're using a right join, it will iterate over the right collection + // and join in matching keys from the left collection + collection.createIndex((row) => row.id) + + await secondCollection.stateWhenReady() + + // Track both collections + const tracker1 = createIndexUsageTracker(collection) + const tracker2 = 
createIndexUsageTracker(secondCollection) + + try { + const liveQuery = createLiveQueryCollection({ + query: (q: any) => + q + .from({ item: collection }) + .join( + { other: secondCollection }, + ({ item, other }: any) => eq(item.id, other.id), + `right` + ) + .where(({ item, other }: any) => + and(eq(item.status, `active`), eq(other.status, `active`)) + ) + .select(({ item, other }: any) => ({ + id: item.id, + name: item.name, + otherName: other.name, + })), + startSync: true, + }) + + await liveQuery.stateWhenReady() + + // Should include all results from the first collection + expect(liveQuery.toArray).toEqual([ + { id: `1`, name: `Alice`, otherName: `Other Active Item` }, + ]) + + // We should have done a full scan of the right collection + expect(tracker2.stats.queriesExecuted).toEqual([ + { + type: `fullScan`, + }, + ]) + + // We should have done an index lookup on the 1st collection to find active items + expect(tracker1.stats.queriesExecuted).toEqual([ + { + type: `index`, + operation: `eq`, + field: `id`, + value: `1`, + }, + ]) + } finally { + tracker1.restore() + tracker2.restore() + } + }) + + it(`should not optimize right join if left collection has no index on the join key`, async () => { + // Create a second collection for the join with its own index + const secondCollection = createCollection({ + getKey: (item) => item.id, + startSync: true, + sync: { + sync: ({ begin, write, commit }) => { + begin() + write({ + type: `insert`, + value: { + id: `1`, + name: `Other Active Item`, + age: 40, + status: `active`, + createdAt: new Date(), + }, + }) + write({ + type: `insert`, + value: { + id: `other2`, + name: `Other Inactive Item`, + age: 35, + status: `inactive`, + createdAt: new Date(), + }, + }) + commit() + }, + }, + }) + + await secondCollection.stateWhenReady() + + // Track both collections + const tracker1 = createIndexUsageTracker(collection) + const tracker2 = createIndexUsageTracker(secondCollection) + + try { + const liveQuery = 
createLiveQueryCollection({ + query: (q: any) => + q + .from({ item: collection }) + .join( + { other: secondCollection }, + ({ item, other }: any) => eq(item.id, other.id), + `right` + ) + .where(({ item, other }: any) => + and(eq(item.status, `active`), eq(other.status, `active`)) + ) + .select(({ item, other }: any) => ({ + id: item.id, + name: item.name, + otherName: other.name, + })), + startSync: true, + }) + + await liveQuery.stateWhenReady() + + // Should have found results where both items are active + expect(liveQuery.toArray).toEqual([ + { id: `1`, name: `Alice`, otherName: `Other Active Item` }, + ]) + + // We should have done a full scan of the right collection + // because it has no indexes + expect(tracker2.stats.queriesExecuted).toEqual([ + { + type: `fullScan`, + }, + ]) + + // We should have done an index lookup on the left collection to find active items + // because it has an index on the join key + expect(tracker1.stats.queriesExecuted).toEqual([ + { + type: `index`, + operation: `eq`, + field: `status`, + value: `active`, + }, + ]) + } finally { + tracker1.restore() + tracker2.restore() + } + }) + it(`should optimize live queries with ORDER BY and LIMIT`, async () => { await withIndexTracking(collection, async (tracker) => { const liveQuery = createLiveQueryCollection({ From 9b623270651ae4a02852b7b96cd3981b4ddcd6c6 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Wed, 30 Jul 2025 15:29:33 +0200 Subject: [PATCH 3/9] Use the right join expression to determine the join key and the index to use --- packages/db/src/query/compiler/joins.ts | 8 +++++++- packages/db/tests/query/indexes.test.ts | 17 ++++++++++++----- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/packages/db/src/query/compiler/joins.ts b/packages/db/src/query/compiler/joins.ts index bcb7b08b8..1ca6354ec 100644 --- a/packages/db/src/query/compiler/joins.ts +++ b/packages/db/src/query/compiler/joins.ts @@ -206,7 +206,13 @@ function processJoin( // The joinedExpr is of 
the form `{ tableAlias }.{ joinKey }` // so we need to strip the table alias from this path - const [_, ...indexPath] = (joinedExpr as PropRef).path + + const lazyCollectionJoinExpr = + activeCollection === `main` + ? (joinedExpr as PropRef) + : (mainExpr as PropRef) + const [_, ...indexPath] = lazyCollectionJoinExpr.path + index ??= findIndexForField(lazyCollection.indexes, indexPath) // The `callbacks` object is passed by the liveQueryCollection to the compiler. diff --git a/packages/db/tests/query/indexes.test.ts b/packages/db/tests/query/indexes.test.ts index 3a4490b67..b2e24bafc 100644 --- a/packages/db/tests/query/indexes.test.ts +++ b/packages/db/tests/query/indexes.test.ts @@ -25,6 +25,10 @@ interface TestItem { createdAt: Date } +type TestItem2 = Omit & { + id2: string +} + // Index usage tracking utilities (copied from collection-indexes.test.ts) interface IndexUsageStats { rangeQueryCalls: number @@ -1128,8 +1132,8 @@ describe(`Query Index Optimization`, () => { it(`should use index of left collection when right-joining collections`, async () => { // Create a second collection for the join with its own index - const secondCollection = createCollection({ - getKey: (item) => item.id, + const secondCollection = createCollection({ + getKey: (item) => item.id2, startSync: true, sync: { sync: ({ begin, write, commit }) => { @@ -1137,7 +1141,7 @@ describe(`Query Index Optimization`, () => { write({ type: `insert`, value: { - id: `1`, + id2: `1`, name: `Other Active Item`, age: 40, status: `active`, @@ -1147,7 +1151,7 @@ describe(`Query Index Optimization`, () => { write({ type: `insert`, value: { - id: `other2`, + id2: `other2`, name: `Other Inactive Item`, age: 35, status: `inactive`, @@ -1169,6 +1173,9 @@ describe(`Query Index Optimization`, () => { const tracker1 = createIndexUsageTracker(collection) const tracker2 = createIndexUsageTracker(secondCollection) + // TODO: make sure all join tests here use different names for the columns that are joined + // 
such that we detect bugs due to the wrong join expr being used + try { const liveQuery = createLiveQueryCollection({ query: (q: any) => @@ -1176,7 +1183,7 @@ describe(`Query Index Optimization`, () => { .from({ item: collection }) .join( { other: secondCollection }, - ({ item, other }: any) => eq(item.id, other.id), + ({ item, other }: any) => eq(item.id, other.id2), `right` ) .where(({ item, other }: any) => From 98ba8b81df76917e93c6acc7711c6f5e756333df Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Wed, 30 Jul 2025 15:38:33 +0200 Subject: [PATCH 4/9] Modify join tests to use different column names for the columns that are joined on --- packages/db/tests/query/indexes.test.ts | 61 ++++++++++++------------- 1 file changed, 29 insertions(+), 32 deletions(-) diff --git a/packages/db/tests/query/indexes.test.ts b/packages/db/tests/query/indexes.test.ts index b2e24bafc..b306362b2 100644 --- a/packages/db/tests/query/indexes.test.ts +++ b/packages/db/tests/query/indexes.test.ts @@ -719,8 +719,8 @@ describe(`Query Index Optimization`, () => { it(`should use index of biggest collection when inner-joining collections`, async () => { // Create a second collection for the join with its own index - const secondCollection = createCollection({ - getKey: (item) => item.id, + const secondCollection = createCollection({ + getKey: (item) => item.id2, startSync: true, sync: { sync: ({ begin, write, commit }) => { @@ -728,7 +728,7 @@ describe(`Query Index Optimization`, () => { write({ type: `insert`, value: { - id: `1`, + id2: `1`, name: `Other Active Item`, age: 40, status: `active`, @@ -738,7 +738,7 @@ describe(`Query Index Optimization`, () => { write({ type: `insert`, value: { - id: `other2`, + id2: `other2`, name: `Other Inactive Item`, age: 35, status: `inactive`, @@ -771,7 +771,7 @@ describe(`Query Index Optimization`, () => { .from({ item: collection }) .join( { other: secondCollection }, - ({ item, other }: any) => eq(item.id, other.id), + ({ item, other }: any) => 
eq(item.id, other.id2), `inner` ) .where(({ item, other }: any) => @@ -818,8 +818,8 @@ describe(`Query Index Optimization`, () => { it(`should not optimize inner join if biggest collection has no index on the join key`, async () => { // Create a second collection for the join with its own index - const secondCollection = createCollection({ - getKey: (item) => item.id, + const secondCollection = createCollection({ + getKey: (item) => item.id2, startSync: true, sync: { sync: ({ begin, write, commit }) => { @@ -827,7 +827,7 @@ describe(`Query Index Optimization`, () => { write({ type: `insert`, value: { - id: `1`, + id2: `1`, name: `Other Active Item`, age: 40, status: `active`, @@ -837,7 +837,7 @@ describe(`Query Index Optimization`, () => { write({ type: `insert`, value: { - id: `other2`, + id2: `other2`, name: `Other Inactive Item`, age: 35, status: `inactive`, @@ -862,7 +862,7 @@ describe(`Query Index Optimization`, () => { .from({ item: collection }) .join( { other: secondCollection }, - ({ item, other }: any) => eq(item.id, other.id), + ({ item, other }: any) => eq(item.id, other.id2), `inner` ) .where(({ item, other }: any) => @@ -906,8 +906,8 @@ describe(`Query Index Optimization`, () => { it(`should use index of right collection when left-joining collections`, async () => { // Create a second collection for the join with its own index - const secondCollection = createCollection({ - getKey: (item) => item.id, + const secondCollection = createCollection({ + getKey: (item) => item.id2, startSync: true, sync: { sync: ({ begin, write, commit }) => { @@ -915,7 +915,7 @@ describe(`Query Index Optimization`, () => { write({ type: `insert`, value: { - id: `1`, + id2: `1`, name: `Other Active Item`, age: 40, status: `active`, @@ -925,7 +925,7 @@ describe(`Query Index Optimization`, () => { write({ type: `insert`, value: { - id: `other2`, + id2: `other2`, name: `Other Inactive Item`, age: 35, status: `inactive`, @@ -939,7 +939,7 @@ describe(`Query Index Optimization`, 
() => { // Since we're using a left join, it will iterate over the left collection // and join in matching keys from the right collection - secondCollection.createIndex((row) => row.id) + secondCollection.createIndex((row) => row.id2) await secondCollection.stateWhenReady() @@ -954,7 +954,7 @@ describe(`Query Index Optimization`, () => { .from({ item: collection }) .join( { other: secondCollection }, - ({ item, other }: any) => eq(item.id, other.id), + ({ item, other }: any) => eq(item.id, other.id2), `left` ) .where(({ item, other }: any) => @@ -1009,19 +1009,19 @@ describe(`Query Index Optimization`, () => { { type: `index`, operation: `eq`, - field: `id`, + field: `id2`, value: `1`, }, { type: `index`, operation: `eq`, - field: `id`, + field: `id2`, value: `3`, }, { type: `index`, operation: `eq`, - field: `id`, + field: `id2`, value: `5`, }, ]) @@ -1040,8 +1040,8 @@ describe(`Query Index Optimization`, () => { it(`should not optimize left join if right collection has no index on the join key`, async () => { // Create a second collection for the join with its own index - const secondCollection = createCollection({ - getKey: (item) => item.id, + const secondCollection = createCollection({ + getKey: (item) => item.id2, startSync: true, sync: { sync: ({ begin, write, commit }) => { @@ -1049,7 +1049,7 @@ describe(`Query Index Optimization`, () => { write({ type: `insert`, value: { - id: `1`, + id2: `1`, name: `Other Active Item`, age: 40, status: `active`, @@ -1059,7 +1059,7 @@ describe(`Query Index Optimization`, () => { write({ type: `insert`, value: { - id: `other2`, + id2: `other2`, name: `Other Inactive Item`, age: 35, status: `inactive`, @@ -1084,7 +1084,7 @@ describe(`Query Index Optimization`, () => { .from({ item: collection }) .join( { other: secondCollection }, - ({ item, other }: any) => eq(item.id, other.id), + ({ item, other }: any) => eq(item.id, other.id2), `left` ) .where(({ item, other }: any) => @@ -1173,9 +1173,6 @@ describe(`Query Index 
Optimization`, () => { const tracker1 = createIndexUsageTracker(collection) const tracker2 = createIndexUsageTracker(secondCollection) - // TODO: make sure all join tests here use different names for the columns that are joined - // such that we detect bugs due to the wrong join expr being used - try { const liveQuery = createLiveQueryCollection({ query: (q: any) => @@ -1228,8 +1225,8 @@ describe(`Query Index Optimization`, () => { it(`should not optimize right join if left collection has no index on the join key`, async () => { // Create a second collection for the join with its own index - const secondCollection = createCollection({ - getKey: (item) => item.id, + const secondCollection = createCollection({ + getKey: (item) => item.id2, startSync: true, sync: { sync: ({ begin, write, commit }) => { @@ -1237,7 +1234,7 @@ describe(`Query Index Optimization`, () => { write({ type: `insert`, value: { - id: `1`, + id2: `1`, name: `Other Active Item`, age: 40, status: `active`, @@ -1247,7 +1244,7 @@ describe(`Query Index Optimization`, () => { write({ type: `insert`, value: { - id: `other2`, + id2: `other2`, name: `Other Inactive Item`, age: 35, status: `inactive`, @@ -1272,7 +1269,7 @@ describe(`Query Index Optimization`, () => { .from({ item: collection }) .join( { other: secondCollection }, - ({ item, other }: any) => eq(item.id, other.id), + ({ item, other }: any) => eq(item.id, other.id2), `right` ) .where(({ item, other }: any) => From 2a2fa9a28a61b3e4b54a83bf7fa404923c3c35b3 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 31 Jul 2025 11:14:40 +0200 Subject: [PATCH 5/9] Automatically create index for join key in eager mode --- packages/db/src/query/compiler/joins.ts | 38 ++++-- .../db/tests/collection-auto-index.test.ts | 123 +++++++++++++++++- packages/db/tests/query/join.test.ts | 18 ++- 3 files changed, 165 insertions(+), 14 deletions(-) diff --git a/packages/db/src/query/compiler/joins.ts b/packages/db/src/query/compiler/joins.ts index 
1ca6354ec..607861c2e 100644 --- a/packages/db/src/query/compiler/joins.ts +++ b/packages/db/src/query/compiler/joins.ts @@ -13,10 +13,11 @@ import { UnsupportedJoinTypeError, } from "../../errors.js" import { findIndexForField } from "../../utils/index-optimization.js" +import { Func } from "../ir.js" +import { ensureIndexForExpression } from "../../indexes/auto-index.js" import { compileExpression } from "./evaluators.js" +import { convertToBasicExpression } from "./expressions.js" import { compileQuery } from "./index.js" -import type { IStreamBuilder, JoinType } from "@tanstack/db-ivm" -import type { Collection } from "../../collection.js" import type { BasicExpression, CollectionRef, @@ -24,6 +25,8 @@ import type { PropRef, QueryRef, } from "../ir.js" +import type { IStreamBuilder, JoinType } from "@tanstack/db-ivm" +import type { Collection } from "../../collection.js" import type { KeyedStream, NamespacedAndKeyedStream, @@ -195,6 +198,23 @@ function processJoin( let index: BaseIndex | undefined + const [lazyCollectionJoinExpr, activeCollectionJoinExpr] = + activeCollection === `main` + ? [joinedExpr as PropRef, mainExpr as PropRef] + : [mainExpr as PropRef, joinedExpr as PropRef] + + // The lazyCollectionJoinExpr is of the form `{ tableAlias }.{ joinKey }` + // so we need to strip the table alias from this path + const [_, ...indexPath] = lazyCollectionJoinExpr.path + + const lazyCollectionAlias = + activeCollection === `main` ? joinedTableAlias : mainTableAlias + + const exprToIndex = convertToBasicExpression( + new Func(`eq`, [lazyCollectionJoinExpr, activeCollectionJoinExpr]), + lazyCollectionAlias + )! 
+ const activePipelineWithLoading: IStreamBuilder< [key: unknown, [originalKey: string, namespacedRow: NamespacedRow]] > = activePipeline.pipe( @@ -204,15 +224,7 @@ function processJoin( // because the indexes are only available after the initial sync // so we can't fetch it during compilation - // The joinedExpr is of the form `{ tableAlias }.{ joinKey }` - // so we need to strip the table alias from this path - - const lazyCollectionJoinExpr = - activeCollection === `main` - ? (joinedExpr as PropRef) - : (mainExpr as PropRef) - const [_, ...indexPath] = lazyCollectionJoinExpr.path - + ensureIndexForExpression(exprToIndex, lazyCollection) index ??= findIndexForField(lazyCollection.indexes, indexPath) // The `callbacks` object is passed by the liveQueryCollection to the compiler. @@ -233,6 +245,10 @@ function processJoin( // that match this row from the active collection based on the value of the joinKey const matchingKeys = index.lookup(`eq`, joinKey) // Inform the lazy collection that those keys need to be loaded + console.log( + `loading keys in ${lazyCollectionAlias} - ${lazyCollection.id} - ${JSON.stringify(lazyCollectionJoinExpr, null, 2)}: `, + JSON.stringify([...matchingKeys.values()], null, 2) + ) loadKeys(matchingKeys) } else { // We can't optimize the join because there is no index for the join key diff --git a/packages/db/tests/collection-auto-index.test.ts b/packages/db/tests/collection-auto-index.test.ts index d1e1ac0b5..ac91fb129 100644 --- a/packages/db/tests/collection-auto-index.test.ts +++ b/packages/db/tests/collection-auto-index.test.ts @@ -10,7 +10,12 @@ import { or, } from "../src/query/builder/functions" import { createSingleRowRefProxy } from "../src/query/builder/ref-proxy" -import { expectIndexUsage, withIndexTracking } from "./utls" +import { createLiveQueryCollection } from "../src" +import { + createIndexUsageTracker, + expectIndexUsage, + withIndexTracking, +} from "./utls" // Global row proxy for expressions const row = 
createSingleRowRefProxy() @@ -24,6 +29,10 @@ interface TestItem { createdAt: Date } +type TestItem2 = Omit & { + id2: string +} + const testData: Array = [ { id: `1`, @@ -420,6 +429,118 @@ describe(`Collection Auto-Indexing`, () => { unsubscribe1() }) + it(`should create auto-indexes for join key on lazy collection when joining`, async () => { + const leftCollection = createCollection({ + getKey: (item) => item.id, + autoIndex: `eager`, + startSync: true, + sync: { + sync: ({ begin, write, commit, markReady }) => { + begin() + for (const item of testData) { + write({ + type: `insert`, + value: item, + }) + } + commit() + markReady() + }, + }, + onInsert: async (_) => {}, + }) + + const rightCollection = createCollection({ + getKey: (item) => item.id2, + autoIndex: `eager`, + startSync: true, + sync: { + sync: ({ begin, write, commit, markReady }) => { + begin() + write({ + type: `insert`, + value: { + id2: `1`, + name: `Other Active Item`, + age: 40, + status: `active`, + createdAt: new Date(), + }, + }) + write({ + type: `insert`, + value: { + id2: `other2`, + name: `Other Inactive Item`, + age: 35, + status: `inactive`, + createdAt: new Date(), + }, + }) + commit() + markReady() + }, + }, + }) + + await rightCollection.stateWhenReady() + + const liveQuery = createLiveQueryCollection({ + query: (q: any) => + q + .from({ item: leftCollection }) + .join( + { other: rightCollection }, + ({ item, other }: any) => eq(item.id, other.id2), + `left` + ) + .select(({ item, other }: any) => ({ + id: item.id, + name: item.name, + otherName: other.name, + })), + startSync: true, + }) + + await liveQuery.stateWhenReady() + + expect(liveQuery.size).toBe(testData.length) + + expect(rightCollection.indexes.size).toBe(1) + + const index = rightCollection.indexes.values().next().value! 
+ expect(index.expression).toEqual({ + type: `ref`, + path: [`id2`], + }) + + const tracker = createIndexUsageTracker(rightCollection) + + // Now send another item through the left collection + // and check that it used the index to join it to items of the right collection + + leftCollection.insert({ + id: `other2`, + name: `New Item`, + age: 25, + status: `active`, + createdAt: new Date(), + }) + + expect(tracker.stats.queriesExecuted).toEqual([ + { + type: `index`, + operation: `eq`, + field: `id2`, + value: `other2`, + }, + ]) + + expect(liveQuery.size).toBe(testData.length + 1) + + tracker.restore() + }) + it(`should not create auto-indexes for unsupported operations`, async () => { const autoIndexCollection = createCollection({ getKey: (item) => item.id, diff --git a/packages/db/tests/query/join.test.ts b/packages/db/tests/query/join.test.ts index df1b5f016..74d5287be 100644 --- a/packages/db/tests/query/join.test.ts +++ b/packages/db/tests/query/join.test.ts @@ -1,7 +1,7 @@ import { beforeEach, describe, expect, test } from "vitest" import { createLiveQueryCollection, eq } from "../../src/query/index.js" import { createCollection } from "../../src/collection.js" -import { mockSyncCollectionOptions } from "../utls.js" +import { createIndexUsageTracker, mockSyncCollectionOptions } from "../utls.js" // Sample data types for join testing type User = { @@ -277,7 +277,7 @@ function testJoinType(joinType: JoinType, autoIndex: `off` | `eager`) { } }) - test(`should handle live updates for ${joinType} joins - insert matching record`, () => { + test.only(`should handle live updates for ${joinType} joins - insert matching record`, () => { const joinQuery = createLiveQueryCollection({ startSync: true, query: (q) => @@ -294,6 +294,9 @@ function testJoinType(joinType: JoinType, autoIndex: `off` | `eager`) { })), }) + usersCollection.createIndex((row) => row.id) + departmentsCollection.createIndex((row) => row.id) + const initialSize = joinQuery.size // Insert a new user 
with existing department @@ -304,11 +307,22 @@ function testJoinType(joinType: JoinType, autoIndex: `off` | `eager`) { department_id: 1, // Engineering } + console.log(`gonna insert new user`) + + const tracker1 = createIndexUsageTracker(usersCollection) + const tracker2 = createIndexUsageTracker(departmentsCollection) + usersCollection.utils.begin() usersCollection.utils.write({ type: `insert`, value: newUser }) usersCollection.utils.commit() + console.log(`inserted new user`) + + console.log(`tracker1 stats:`, JSON.stringify(tracker1.stats, null, 2)) + console.log(`tracker2 stats:`, JSON.stringify(tracker2.stats, null, 2)) + // For all join types, adding a matching user should increase the count + console.log(`joinQuery res:`, JSON.stringify(joinQuery.toArray, null, 2)) expect(joinQuery.size).toBe(initialSize + 1) const eve = joinQuery.get(5) From 3ac12088a5a1f52cf070284af1ee97d862678e14 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Mon, 4 Aug 2025 13:21:38 +0200 Subject: [PATCH 6/9] Optimize initial load of join queries by using a regular subscription and loading matching keys dynamically. 
--- packages/db-ivm/src/operators/index.ts | 1 + packages/db-ivm/src/operators/tap.ts | 53 ++++++++++ packages/db/src/collection.ts | 60 ----------- packages/db/src/indexes/auto-index.ts | 82 ++++++++------ packages/db/src/query/compiler/index.ts | 74 ++++++++++++- packages/db/src/query/compiler/joins.ts | 69 ++++++------ .../db/src/query/live-query-collection.ts | 100 +++++++++++++++--- .../db/tests/collection-auto-index.test.ts | 2 + .../tests/query/compiler/subqueries.test.ts | 2 +- packages/db/tests/query/join.test.ts | 18 +--- 10 files changed, 299 insertions(+), 162 deletions(-) create mode 100644 packages/db-ivm/src/operators/tap.ts diff --git a/packages/db-ivm/src/operators/index.ts b/packages/db-ivm/src/operators/index.ts index f7dfa0949..f7619dc28 100644 --- a/packages/db-ivm/src/operators/index.ts +++ b/packages/db-ivm/src/operators/index.ts @@ -1,5 +1,6 @@ export * from "./pipe.js" export * from "./map.js" +export * from "./tap.js" export * from "./filter.js" export * from "./negate.js" export * from "./concat.js" diff --git a/packages/db-ivm/src/operators/tap.ts b/packages/db-ivm/src/operators/tap.ts new file mode 100644 index 000000000..caeca3535 --- /dev/null +++ b/packages/db-ivm/src/operators/tap.ts @@ -0,0 +1,53 @@ +import { DifferenceStreamWriter, LinearUnaryOperator } from "../graph.js" +import { StreamBuilder } from "../d2.js" +import type { IStreamBuilder, PipedOperator } from "../types.js" +import type { DifferenceStreamReader } from "../graph.js" +import type { MultiSet } from "../multiset.js" + +/** + * Operator that applies a function to each element in the input stream + */ +export class TapOperator extends LinearUnaryOperator { + #f: (data: T) => void + + constructor( + id: number, + inputA: DifferenceStreamReader, + output: DifferenceStreamWriter, + f: (data: T) => void + ) { + super(id, inputA, output) + this.#f = f + } + + inner(collection: MultiSet): MultiSet { + return collection.map((data) => { + this.#f(data) + return data + }) + 
} +} + +/** + * Invokes a function for each element in the input stream. + * This operator doesn't modify the stream and is used to perform side effects. + * @param f - The function to invoke on each element + * @returns The input stream + */ +export function tap(f: (data: T) => void): PipedOperator { + return (stream: IStreamBuilder): IStreamBuilder => { + const output = new StreamBuilder( + stream.graph, + new DifferenceStreamWriter() + ) + const operator = new TapOperator( + stream.graph.getNextOperatorId(), + stream.connectReader(), + output.writer, + f + ) + stream.graph.addOperator(operator) + stream.graph.addStream(output.connectReader()) + return output + } +} diff --git a/packages/db/src/collection.ts b/packages/db/src/collection.ts index 98af5cdd7..aa549544b 100644 --- a/packages/db/src/collection.ts +++ b/packages/db/src/collection.ts @@ -37,7 +37,6 @@ import { UpdateKeyNotFoundError, } from "./errors" import { createFilteredCallback, currentStateAsChanges } from "./change-events" -import type { BasicExpression } from "./query/ir" import type { Transaction } from "./transactions" import type { StandardSchemaV1 } from "@standard-schema/spec" import type { SingleRowRefProxy } from "./query/builder/ref-proxy" @@ -2284,65 +2283,6 @@ export class CollectionImpl< } } - /** - * Subscribes to changes for a dynamic set of keys - * @param callback - Function that is called when items in the key set change - * @param whereExpression - Optional where expression to filter the changes (needed in case we need to switch back to a regular subscription) - * @returns An object with an add method to add keys to the subscription and an unsubscribe method to stop listening for changes - * @example - * const { add, unsubscribe } = collection.subscribeChangesDynamic((changes) => { - * changes.forEach(change => { - * console.log(`${change.type}: ${change.key}`, change.value) - * }) - * }) - * add("user1") - */ - subscribeChangesDynamic( - callback: (changes: Array>) => void, - 
whereExpression?: BasicExpression - ) { - const keys = new Set() - const subscriptions = new Set<() => void>() - let switchedToRegularSubscription = false - - const add = (keySet: Set) => { - if (switchedToRegularSubscription) return - for (const key of keySet) { - if (keys.has(key)) break - keys.add(key) - subscriptions.add( - this.subscribeChangesKey(key, callback, { - includeInitialState: true, - }) - ) - } - } - - const unsubscribe = () => { - subscriptions.forEach((unsub) => unsub()) - subscriptions.clear() - keys.clear() - } - - const switchToRegularSubscription = () => { - if (switchedToRegularSubscription) return - unsubscribe() - switchedToRegularSubscription = true - this.subscribeChanges(callback, { - includeInitialState: true, - whereExpression, - }) - } - - const res = { - add, - unsubscribe, - switchToRegularSubscription, - } - - return res - } - /** * Subscribe to changes for a specific key */ diff --git a/packages/db/src/indexes/auto-index.ts b/packages/db/src/indexes/auto-index.ts index de6d6238d..8ac95c5dc 100644 --- a/packages/db/src/indexes/auto-index.ts +++ b/packages/db/src/indexes/auto-index.ts @@ -6,6 +6,55 @@ export interface AutoIndexConfig { autoIndex?: `off` | `eager` } +function shouldAutoIndex(collection: CollectionImpl) { + // Only proceed if auto-indexing is enabled + if (collection.config.autoIndex !== `eager`) { + return false + } + + // Don't auto-index during sync operations + if ( + collection.status === `loading` || + collection.status === `initialCommit` + ) { + return false + } + + return true +} + +export function ensureIndexForField< + T extends Record, + TKey extends string | number, +>( + fieldName: string, + fieldPath: Array, + collection: CollectionImpl +) { + if (!shouldAutoIndex(collection)) { + return + } + + // Check if we already have an index for this field + const existingIndex = Array.from(collection.indexes.values()).find((index) => + index.matchesField(fieldPath) + ) + + if (existingIndex) { + return // Index 
already exists + } + + // Create a new index for this field using the collection's createIndex method + try { + collection.createIndex((row) => (row as any)[fieldName], { + name: `auto_${fieldName}`, + indexType: BTreeIndex, + }) + } catch (error) { + console.warn(`Failed to create auto-index for field "${fieldName}":`, error) + } +} + /** * Analyzes a where expression and creates indexes for all simple operations on single fields */ @@ -16,16 +65,7 @@ export function ensureIndexForExpression< expression: BasicExpression, collection: CollectionImpl ): void { - // Only proceed if auto-indexing is enabled - if (collection.config.autoIndex !== `eager`) { - return - } - - // Don't auto-index during sync operations - if ( - collection.status === `loading` || - collection.status === `initialCommit` - ) { + if (!shouldAutoIndex(collection)) { return } @@ -33,27 +73,7 @@ export function ensureIndexForExpression< const indexableExpressions = extractIndexableExpressions(expression) for (const { fieldName, fieldPath } of indexableExpressions) { - // Check if we already have an index for this field - const existingIndex = Array.from(collection.indexes.values()).find( - (index) => index.matchesField(fieldPath) - ) - - if (existingIndex) { - continue // Index already exists - } - - // Create a new index for this field using the collection's createIndex method - try { - collection.createIndex((row) => (row as any)[fieldName], { - name: `auto_${fieldName}`, - indexType: BTreeIndex, - }) - } catch (error) { - console.warn( - `Failed to create auto-index for field "${fieldName}":`, - error - ) - } + ensureIndexForField(fieldName, fieldPath, collection) } } diff --git a/packages/db/src/query/compiler/index.ts b/packages/db/src/query/compiler/index.ts index 2cccb4861..7813cab76 100644 --- a/packages/db/src/query/compiler/index.ts +++ b/packages/db/src/query/compiler/index.ts @@ -7,19 +7,20 @@ import { LimitOffsetRequireOrderByError, UnsupportedFromTypeError, } from "../../errors.js" 
+import { PropRef } from "../ir.js" import { compileExpression } from "./evaluators.js" import { processJoins } from "./joins.js" import { processGroupBy } from "./group-by.js" import { processOrderBy } from "./order-by.js" import { processSelectToResults } from "./select.js" -import type { LazyCollectionCallbacks } from "./joins.js" -import type { Collection } from "../../collection.js" import type { BasicExpression, CollectionRef, QueryIR, QueryRef, } from "../ir.js" +import type { LazyCollectionCallbacks } from "./joins.js" +import type { Collection } from "../../collection.js" import type { KeyedStream, NamespacedAndKeyedStream, @@ -39,6 +40,72 @@ export interface CompilationResult { collectionWhereClauses: Map> } +function getRefFromAlias( + query: QueryIR, + alias: string +): CollectionRef | QueryRef | void { + if (query.from.alias === alias) { + return query.from + } + + for (const join of query.join || []) { + if (join.from.alias === alias) { + return join.from + } + } +} + +/** + * Follows the given reference in a query + * until its finds the root field the reference points to. + * @returns The collection, its alias, and the path to the root field in this collection + */ +export function followRef( + query: QueryIR, + ref: PropRef, + collection: Collection +): { collection: Collection; path: Array } | void { + if (ref.path.length === 0) { + return + } + + if (ref.path.length === 1) { + // This field should be part of this collection + const field = ref.path[0]! + // is it part of the select clause? 
+ if (query.select) { + const selectedField = query.select[field] + if (selectedField && selectedField.type === `ref`) { + return followRef(query, selectedField, collection) + } + } + + // Either this field is not part of the select clause + // and thus it must be part of the collection itself + // or it is part of the select but is not a reference + // so we can stop here and don't have to follow it + return { collection, path: [field] } + } + + if (ref.path.length > 1) { + // This is a nested field + const [alias, ...rest] = ref.path + const aliasRef = getRefFromAlias(query, alias!) + if (!aliasRef) { + return + } + + if (aliasRef.type === `queryRef`) { + return followRef(aliasRef.query, new PropRef(rest), collection) + } else { + // This is a reference to a collection + // we can't follow it further + // so the field must be on the collection itself + return { collection: aliasRef.collection, path: rest } + } + } +} + /** * Compiles a query2 IR into a D2 pipeline * @param rawQuery The query IR to compile @@ -117,7 +184,8 @@ export function compileQuery( queryMapping, collections, callbacks, - lazyCollections + lazyCollections, + rawQuery ) } diff --git a/packages/db/src/query/compiler/joins.ts b/packages/db/src/query/compiler/joins.ts index 607861c2e..08a19f4b5 100644 --- a/packages/db/src/query/compiler/joins.ts +++ b/packages/db/src/query/compiler/joins.ts @@ -3,6 +3,7 @@ import { filter, join as joinOperator, map, + tap, } from "@tanstack/db-ivm" import { CollectionInputNotFoundError, @@ -13,16 +14,15 @@ import { UnsupportedJoinTypeError, } from "../../errors.js" import { findIndexForField } from "../../utils/index-optimization.js" -import { Func } from "../ir.js" -import { ensureIndexForExpression } from "../../indexes/auto-index.js" +import { ensureIndexForField } from "../../indexes/auto-index.js" import { compileExpression } from "./evaluators.js" -import { convertToBasicExpression } from "./expressions.js" -import { compileQuery } from "./index.js" 
+import { compileQuery, followRef } from "./index.js" import type { BasicExpression, CollectionRef, JoinClause, PropRef, + QueryIR, QueryRef, } from "../ir.js" import type { IStreamBuilder, JoinType } from "@tanstack/db-ivm" @@ -38,7 +38,7 @@ import type { BaseIndex } from "../../indexes/base-index.js" export type LoadKeysFn = (key: Set) => void export type LazyCollectionCallbacks = { loadKeys: LoadKeysFn - switchToRegularSubscription: () => void + loadInitialState: () => void } /** @@ -55,7 +55,8 @@ export function processJoins( queryMapping: QueryMapping, collections: Record, callbacks: Record, - lazyCollections: Set + lazyCollections: Set, + rawQuery: QueryIR ): NamespacedAndKeyedStream { let resultPipeline = pipeline @@ -71,7 +72,8 @@ export function processJoins( queryMapping, collections, callbacks, - lazyCollections + lazyCollections, + rawQuery ) } @@ -92,7 +94,8 @@ function processJoin( queryMapping: QueryMapping, collections: Record, callbacks: Record, - lazyCollections: Set + lazyCollections: Set, + rawQuery: QueryIR ): NamespacedAndKeyedStream { // Get the joined table alias and input stream const { @@ -198,34 +201,38 @@ function processJoin( let index: BaseIndex | undefined - const [lazyCollectionJoinExpr, activeCollectionJoinExpr] = + const lazyCollectionJoinExpr = activeCollection === `main` - ? [joinedExpr as PropRef, mainExpr as PropRef] - : [mainExpr as PropRef, joinedExpr as PropRef] + ? (joinedExpr as PropRef) + : (mainExpr as PropRef) - // The lazyCollectionJoinExpr is of the form `{ tableAlias }.{ joinKey }` - // so we need to strip the table alias from this path - const [_, ...indexPath] = lazyCollectionJoinExpr.path + const activeColl = + activeCollection === `main` ? collections[mainTableId]! : lazyCollection - const lazyCollectionAlias = - activeCollection === `main` ? 
joinedTableAlias : mainTableAlias - - const exprToIndex = convertToBasicExpression( - new Func(`eq`, [lazyCollectionJoinExpr, activeCollectionJoinExpr]), - lazyCollectionAlias + const followRefResult = followRef( + rawQuery, + lazyCollectionJoinExpr, + activeColl )! + const followRefCollection = followRefResult.collection + + const fieldName = followRefResult.path[0] + if (fieldName) { + ensureIndexForField(fieldName, followRefResult.path, followRefCollection) + } const activePipelineWithLoading: IStreamBuilder< [key: unknown, [originalKey: string, namespacedRow: NamespacedRow]] > = activePipeline.pipe( - map(([joinKey, [originalKey, namespacedRow]]) => { + tap(([joinKey, _]) => { // Find the index for the path we join on // we need to find the index inside the map operator // because the indexes are only available after the initial sync // so we can't fetch it during compilation - - ensureIndexForExpression(exprToIndex, lazyCollection) - index ??= findIndexForField(lazyCollection.indexes, indexPath) + index ??= findIndexForField( + followRefCollection.indexes, + followRefResult.path + ) // The `callbacks` object is passed by the liveQueryCollection to the compiler. 
// It contains a function to lazy load keys for each lazy collection @@ -238,27 +245,19 @@ function processJoin( ) } - const { loadKeys, switchToRegularSubscription } = collectionCallbacks + const { loadKeys, loadInitialState } = collectionCallbacks if (index && index.supports(`eq`)) { // Use the index to fetch the PKs of the rows in the lazy collection // that match this row from the active collection based on the value of the joinKey const matchingKeys = index.lookup(`eq`, joinKey) // Inform the lazy collection that those keys need to be loaded - console.log( - `loading keys in ${lazyCollectionAlias} - ${lazyCollection.id} - ${JSON.stringify(lazyCollectionJoinExpr, null, 2)}: `, - JSON.stringify([...matchingKeys.values()], null, 2) - ) loadKeys(matchingKeys) } else { // We can't optimize the join because there is no index for the join key - // on the lazy collection, so we subscribe to all changes - switchToRegularSubscription() + // on the lazy collection, so we load the initial state + loadInitialState() } - - // TODO: introduce an identity operator which is not modifying the stream but only applying side effects and use that one instead of map - // Return the row unchanged - return [joinKey, [originalKey, namespacedRow]] }) ) diff --git a/packages/db/src/query/live-query-collection.ts b/packages/db/src/query/live-query-collection.ts index c874c22b8..9a8fd9abe 100644 --- a/packages/db/src/query/live-query-collection.ts +++ b/packages/db/src/query/live-query-collection.ts @@ -1,5 +1,6 @@ import { D2, MultiSet, output } from "@tanstack/db-ivm" import { createCollection } from "../collection.js" +import { createFilterFunctionFromExpression } from "../change-events.js" import { compileQuery } from "./compiler/index.js" import { buildQuery, getQueryIR } from "./builder/index.js" import { convertToBasicExpression } from "./compiler/expressions.js" @@ -333,14 +334,18 @@ export function liveQueryCollectionOptions< ? 
collectionWhereClausesCache.get(collectionAlias) : undefined + const sendChangesToPipeline = ( + changes: Array> + ) => { + sendChangesToInput(input, changes, collection.config.getKey) + maybeRunGraph() + } + const subscribeToAllChanges = ( whereExpression: BasicExpression | undefined ) => { const unsubscribe = collection.subscribeChanges( - (changes) => { - sendChangesToInput(input, changes, collection.config.getKey) - maybeRunGraph() - }, + sendChangesToPipeline, { includeInitialState: true, ...(whereExpression ? { whereExpression } : undefined), @@ -349,22 +354,85 @@ export function liveQueryCollectionOptions< return unsubscribe } + // Subscribes to all changes but without the initial state + // such that we can load keys from the initial state on demand + // based on the matching keys from the main collection in the join const subscribeToMatchingChanges = ( whereExpression: BasicExpression | undefined ) => { - // This is used by lazy collections - // i.e. collections that are part of a join - // and can be loaded lazily based on the rows - // from the other collection in the join - const { add, switchToRegularSubscription, unsubscribe } = - collection.subscribeChangesDynamic((changes) => { - sendChangesToInput(input, changes, collection.config.getKey) - maybeRunGraph() - }, whereExpression) - // Store the `add` function in the `lazyLoaders` map + const sentKeys = new Set() + + // Wraps the sendChangesToPipeline function + // in order to turn `update`s into `insert`s + // for keys that have not been sent to the pipeline yet + // and filter out deletes for keys that have not been sent + const sendVisibleChangesToPipeline = ( + changes: Array> + ) => { + if (loadedInitialState) { + // There was no index for the join key + // so we loaded the initial state + // so we can safely assume that the pipeline has seen all keys + return sendChangesToPipeline(changes) + } + + const newChanges = [] + for (const change of changes) { + let newChange = change + if 
(!sentKeys.has(change.key)) { + if (change.type === `update`) { + newChange = { ...change, type: `insert` } + } else if (change.type === `delete`) { + // filter out deletes for keys that have not been sent + continue + } + } + newChanges.push(newChange) + } + + return sendChangesToPipeline(newChanges) + } + + const unsubscribe = collection.subscribeChanges( + sendVisibleChangesToPipeline, + { whereExpression } + ) + + // Create a function that loads keys from the collection + // into the query pipeline on demand + const filterFn = whereExpression + ? createFilterFunctionFromExpression(whereExpression) + : () => true + const loadKeys = (keys: Set) => { + for (const key of keys) { + // Only load the key once + if (sentKeys.has(key)) continue + + const value = collection.get(key) + if (value !== undefined && filterFn(value)) { + sentKeys.add(key) + sendChangesToPipeline([{ type: `insert`, key, value }]) + } + } + } + + let loadedInitialState = false + + // Store the functions to load keys and load initial state in the `lazyCollectionsCallbacks` map + // This is used by the join operator to dynamically load matching keys from the lazy collection + // or to get the full initial state of the collection if there's no index for the join key lazyCollectionsCallbacks[collectionId] = { - loadKeys: add, - switchToRegularSubscription, + loadKeys, + loadInitialState: () => { + // Make sure we only load the initial state once + if (loadedInitialState) return + loadedInitialState = true + + const changes = collection.currentStateAsChanges({ + whereExpression, + }) + sendChangesToPipeline(changes) + }, } return unsubscribe } diff --git a/packages/db/tests/collection-auto-index.test.ts b/packages/db/tests/collection-auto-index.test.ts index ac91fb129..ce2be3fc2 100644 --- a/packages/db/tests/collection-auto-index.test.ts +++ b/packages/db/tests/collection-auto-index.test.ts @@ -213,6 +213,8 @@ describe(`Collection Auto-Indexing`, () => { unsubscribe() }) + it(`should create 
auto-indexes for transformed fields of subqueries when autoIndex is "eager"`, async () => {}) + it(`should not create duplicate auto-indexes for the same field`, async () => { const autoIndexCollection = createCollection({ getKey: (item) => item.id, diff --git a/packages/db/tests/query/compiler/subqueries.test.ts b/packages/db/tests/query/compiler/subqueries.test.ts index 23d09b963..3a9f8c2d9 100644 --- a/packages/db/tests/query/compiler/subqueries.test.ts +++ b/packages/db/tests/query/compiler/subqueries.test.ts @@ -210,7 +210,7 @@ describe(`Query2 Subqueries`, () => { describe(`Subqueries in JOIN clause`, () => { const dummyCallbacks = { loadKeys: (_: any) => {}, - switchToRegularSubscription: () => {}, + loadInitialState: () => {}, } it(`supports subquery in join clause`, () => { diff --git a/packages/db/tests/query/join.test.ts b/packages/db/tests/query/join.test.ts index 74d5287be..df1b5f016 100644 --- a/packages/db/tests/query/join.test.ts +++ b/packages/db/tests/query/join.test.ts @@ -1,7 +1,7 @@ import { beforeEach, describe, expect, test } from "vitest" import { createLiveQueryCollection, eq } from "../../src/query/index.js" import { createCollection } from "../../src/collection.js" -import { createIndexUsageTracker, mockSyncCollectionOptions } from "../utls.js" +import { mockSyncCollectionOptions } from "../utls.js" // Sample data types for join testing type User = { @@ -277,7 +277,7 @@ function testJoinType(joinType: JoinType, autoIndex: `off` | `eager`) { } }) - test.only(`should handle live updates for ${joinType} joins - insert matching record`, () => { + test(`should handle live updates for ${joinType} joins - insert matching record`, () => { const joinQuery = createLiveQueryCollection({ startSync: true, query: (q) => @@ -294,9 +294,6 @@ function testJoinType(joinType: JoinType, autoIndex: `off` | `eager`) { })), }) - usersCollection.createIndex((row) => row.id) - departmentsCollection.createIndex((row) => row.id) - const initialSize = joinQuery.size 
// Insert a new user with existing department @@ -307,22 +304,11 @@ function testJoinType(joinType: JoinType, autoIndex: `off` | `eager`) { department_id: 1, // Engineering } - console.log(`gonna insert new user`) - - const tracker1 = createIndexUsageTracker(usersCollection) - const tracker2 = createIndexUsageTracker(departmentsCollection) - usersCollection.utils.begin() usersCollection.utils.write({ type: `insert`, value: newUser }) usersCollection.utils.commit() - console.log(`inserted new user`) - - console.log(`tracker1 stats:`, JSON.stringify(tracker1.stats, null, 2)) - console.log(`tracker2 stats:`, JSON.stringify(tracker2.stats, null, 2)) - // For all join types, adding a matching user should increase the count - console.log(`joinQuery res:`, JSON.stringify(joinQuery.toArray, null, 2)) expect(joinQuery.size).toBe(initialSize + 1) const eve = joinQuery.get(5) From 02e04a3ab38cb50bb005ce51bd5ffd029063e156 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Tue, 5 Aug 2025 09:52:05 +0200 Subject: [PATCH 7/9] Move followRef helper function to the bottom of the file --- packages/db/src/query/compiler/index.ts | 132 ++++++++++++------------ 1 file changed, 66 insertions(+), 66 deletions(-) diff --git a/packages/db/src/query/compiler/index.ts b/packages/db/src/query/compiler/index.ts index 7813cab76..acbe3e01f 100644 --- a/packages/db/src/query/compiler/index.ts +++ b/packages/db/src/query/compiler/index.ts @@ -40,72 +40,6 @@ export interface CompilationResult { collectionWhereClauses: Map> } -function getRefFromAlias( - query: QueryIR, - alias: string -): CollectionRef | QueryRef | void { - if (query.from.alias === alias) { - return query.from - } - - for (const join of query.join || []) { - if (join.from.alias === alias) { - return join.from - } - } -} - -/** - * Follows the given reference in a query - * until its finds the root field the reference points to. 
- * @returns The collection, its alias, and the path to the root field in this collection - */ -export function followRef( - query: QueryIR, - ref: PropRef, - collection: Collection -): { collection: Collection; path: Array } | void { - if (ref.path.length === 0) { - return - } - - if (ref.path.length === 1) { - // This field should be part of this collection - const field = ref.path[0]! - // is it part of the select clause? - if (query.select) { - const selectedField = query.select[field] - if (selectedField && selectedField.type === `ref`) { - return followRef(query, selectedField, collection) - } - } - - // Either this field is not part of the select clause - // and thus it must be part of the collection itself - // or it is part of the select but is not a reference - // so we can stop here and don't have to follow it - return { collection, path: [field] } - } - - if (ref.path.length > 1) { - // This is a nested field - const [alias, ...rest] = ref.path - const aliasRef = getRefFromAlias(query, alias!) - if (!aliasRef) { - return - } - - if (aliasRef.type === `queryRef`) { - return followRef(aliasRef.query, new PropRef(rest), collection) - } else { - // This is a reference to a collection - // we can't follow it further - // so the field must be on the collection itself - return { collection: aliasRef.collection, path: rest } - } - } -} - /** * Compiles a query2 IR into a D2 pipeline * @param rawQuery The query IR to compile @@ -478,3 +412,69 @@ function mapNestedQueries( } } } + +function getRefFromAlias( + query: QueryIR, + alias: string +): CollectionRef | QueryRef | void { + if (query.from.alias === alias) { + return query.from + } + + for (const join of query.join || []) { + if (join.from.alias === alias) { + return join.from + } + } +} + +/** + * Follows the given reference in a query + * until it finds the root field the reference points to. 
+ * @returns The collection and the path to the root field in that collection, or nothing if the reference cannot be resolved + */ +export function followRef( + query: QueryIR, + ref: PropRef, + collection: Collection +): { collection: Collection; path: Array } | void { + if (ref.path.length === 0) { + return + } + + if (ref.path.length === 1) { + // This field should be part of this collection + const field = ref.path[0]! + // is it part of the select clause? + if (query.select) { + const selectedField = query.select[field] + if (selectedField && selectedField.type === `ref`) { + return followRef(query, selectedField, collection) + } + } + + // Either this field is not part of the select clause + // and thus it must be part of the collection itself + // or it is part of the select but is not a reference + // so we can stop here and don't have to follow it + return { collection, path: [field] } + } + + if (ref.path.length > 1) { + // This is a nested field + const [alias, ...rest] = ref.path + const aliasRef = getRefFromAlias(query, alias!) + if (!aliasRef) { + return + } + + if (aliasRef.type === `queryRef`) { + return followRef(aliasRef.query, new PropRef(rest), collection) + } else { + // This is a reference to a collection + // we can't follow it further + // so the field must be on the collection itself + return { collection: aliasRef.collection, path: rest } + } + } +} From 69bb8ba88b23bc1770ec940b31e3e8ff08f33d38 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Wed, 13 Aug 2025 10:05:13 +0200 Subject: [PATCH 8/9] Changeset --- .changeset/sour-emus-count.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changeset/sour-emus-count.md diff --git a/.changeset/sour-emus-count.md b/.changeset/sour-emus-count.md new file mode 100644 index 000000000..dcceba53e --- /dev/null +++ b/.changeset/sour-emus-count.md @@ -0,0 +1,6 @@ +--- +"@tanstack/db-ivm": patch +"@tanstack/db": patch +--- + +Optimize joins to use index on the join key when available. 
From 714057380783380af87143377ae01d22c293aaac Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Wed, 13 Aug 2025 10:11:47 +0200 Subject: [PATCH 9/9] Use central error class --- packages/db/src/errors.ts | 6 ++++++ packages/db/src/query/compiler/joins.ts | 9 +++------ 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/packages/db/src/errors.ts b/packages/db/src/errors.ts index 3ae8cc804..43fb097f7 100644 --- a/packages/db/src/errors.ts +++ b/packages/db/src/errors.ts @@ -377,6 +377,12 @@ export class UnknownFunctionError extends QueryCompilationError { } } +export class JoinCollectionNotFoundError extends QueryCompilationError { + constructor(collectionId: string) { + super(`Collection "${collectionId}" not found during compilation of join`) + } +} + // JOIN Operation Errors export class JoinError extends TanStackDBError { constructor(message: string) { diff --git a/packages/db/src/query/compiler/joins.ts b/packages/db/src/query/compiler/joins.ts index 08a19f4b5..3f2cbffb2 100644 --- a/packages/db/src/query/compiler/joins.ts +++ b/packages/db/src/query/compiler/joins.ts @@ -10,6 +10,7 @@ import { InvalidJoinConditionSameTableError, InvalidJoinConditionTableMismatchError, InvalidJoinConditionWrongTablesError, + JoinCollectionNotFoundError, UnsupportedJoinSourceTypeError, UnsupportedJoinTypeError, } from "../../errors.js" @@ -119,15 +120,11 @@ function processJoin( const joinedCollection = collections[joinedCollectionId] if (!mainCollection) { - throw new Error( - `Internal error in join. Collection not found for ${mainTableId}` - ) + throw new JoinCollectionNotFoundError(mainTableId) } if (!joinedCollection) { - throw new Error( - `Internal error in join. Collection not found for ${joinedCollectionId}` - ) + throw new JoinCollectionNotFoundError(joinedCollectionId) } const { activeCollection, lazyCollection } = getActiveAndLazyCollections(