Skip to content
6 changes: 6 additions & 0 deletions .changeset/sour-emus-count.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@tanstack/db-ivm": patch
"@tanstack/db": patch
---

Optimize joins to use an index on the join key when one is available.
1 change: 1 addition & 0 deletions packages/db-ivm/src/operators/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export * from "./pipe.js"
export * from "./map.js"
export * from "./tap.js"
export * from "./filter.js"
export * from "./negate.js"
export * from "./concat.js"
Expand Down
53 changes: 53 additions & 0 deletions packages/db-ivm/src/operators/tap.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import { DifferenceStreamWriter, LinearUnaryOperator } from "../graph.js"
import { StreamBuilder } from "../d2.js"
import type { IStreamBuilder, PipedOperator } from "../types.js"
import type { DifferenceStreamReader } from "../graph.js"
import type { MultiSet } from "../multiset.js"

/**
* Operator that invokes a side-effect function on each element of the input stream and passes the elements through unchanged
*/
export class TapOperator<T> extends LinearUnaryOperator<T, T> {
#f: (data: T) => void

constructor(
id: number,
inputA: DifferenceStreamReader<T>,
output: DifferenceStreamWriter<T>,
f: (data: T) => void
) {
super(id, inputA, output)
this.#f = f
}

inner(collection: MultiSet<T>): MultiSet<T> {
return collection.map((data) => {
this.#f(data)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could be useful in future to pass the multiplicity to the callback so that it's aware if it's an insert/delete. Not important for now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can add this later when we need it :-)

return data
})
}
}

/**
 * Builds a piped operator that calls `f` for every element flowing through a stream.
 * The elements themselves are forwarded untouched, so this is meant purely for
 * side effects (logging, bookkeeping, ...).
 * @param f - Callback invoked with each element of the input stream
 * @returns A piped operator producing a stream with the same elements as its input
 */
export function tap<T>(f: (data: T) => void): PipedOperator<T, T> {
  return (stream: IStreamBuilder<T>): IStreamBuilder<T> => {
    const graph = stream.graph

    // Downstream side: a fresh stream living in the same graph as the input.
    const tapped = new StreamBuilder<T>(graph, new DifferenceStreamWriter<T>())

    // Wire the operator between the input reader and the new stream's writer.
    const operatorId = graph.getNextOperatorId()
    const upstreamReader = stream.connectReader()
    graph.addOperator(
      new TapOperator<T>(operatorId, upstreamReader, tapped.writer, f)
    )

    graph.addStream(tapped.connectReader())
    return tapped
  }
}
4 changes: 2 additions & 2 deletions packages/db/src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1397,8 +1397,8 @@ export class CollectionImpl<

/**
* Creates an index on a collection for faster queries.
* Indexes significantly improve query performance by allowing binary search
* and range queries instead of full scans.
* Indexes significantly improve query performance by allowing constant time lookups
* and logarithmic time range queries instead of full scans.
*
* @template TResolver - The type of the index resolver (constructor or async loader)
* @param indexCallback - Function that extracts the indexed value from each item
Expand Down
6 changes: 6 additions & 0 deletions packages/db/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,12 @@ export class UnknownFunctionError extends QueryCompilationError {
}
}

/**
 * Query compilation error reporting that a collection needed while compiling
 * a join could not be found.
 */
export class JoinCollectionNotFoundError extends QueryCompilationError {
  /**
   * @param collectionId - ID of the collection that could not be found
   */
  constructor(collectionId: string) {
    const message = `Collection "${collectionId}" not found during compilation of join`
    super(message)
  }
}

// JOIN Operation Errors
export class JoinError extends TanStackDBError {
constructor(message: string) {
Expand Down
82 changes: 51 additions & 31 deletions packages/db/src/indexes/auto-index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,55 @@ export interface AutoIndexConfig {
autoIndex?: `off` | `eager`
}

/**
 * Tells whether automatic index creation is currently allowed for a collection.
 *
 * Auto-indexing is allowed only when the collection is configured with
 * `autoIndex: "eager"` and its status is neither `loading` nor `initialCommit`
 * (no auto-indexing while a sync is in progress).
 *
 * @param collection - The collection whose config and status are inspected
 * @returns `true` when an auto-index may be created, `false` otherwise
 */
function shouldAutoIndex(collection: CollectionImpl<any, any, any, any, any>) {
  const eagerIndexing = collection.config.autoIndex === `eager`
  if (!eagerIndexing) {
    return false
  }

  // Indexes are not created while the collection is still syncing.
  return !(
    collection.status === `loading` || collection.status === `initialCommit`
  )
}

export function ensureIndexForField<
T extends Record<string, any>,
TKey extends string | number,
>(
fieldName: string,
fieldPath: Array<string>,
collection: CollectionImpl<T, TKey, any, any, any>
) {
if (!shouldAutoIndex(collection)) {
return
}

// Check if we already have an index for this field
const existingIndex = Array.from(collection.indexes.values()).find((index) =>
index.matchesField(fieldPath)
)

if (existingIndex) {
return // Index already exists
}

// Create a new index for this field using the collection's createIndex method
try {
collection.createIndex((row) => (row as any)[fieldName], {
name: `auto_${fieldName}`,
indexType: BTreeIndex,
})
} catch (error) {
console.warn(`Failed to create auto-index for field "${fieldName}":`, error)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we've used the debug package elsewhere for logging. We should maybe use it here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't actually change this. I took this piece of code from ensureIndexForExpression and moved it here. We could use debugLog but that would only log it in debug mode. I think this warning is useful also in non-debug mode to warn you that for some reason the index could not be created and thus the queries might be less efficient.

}
}

/**
* Analyzes a where expression and creates indexes for all simple operations on single fields
*/
Expand All @@ -16,44 +65,15 @@ export function ensureIndexForExpression<
expression: BasicExpression,
collection: CollectionImpl<T, TKey, any, any, any>
): void {
// Only proceed if auto-indexing is enabled
if (collection.config.autoIndex !== `eager`) {
return
}

// Don't auto-index during sync operations
if (
collection.status === `loading` ||
collection.status === `initialCommit`
) {
if (!shouldAutoIndex(collection)) {
return
}

// Extract all indexable expressions and create indexes for them
const indexableExpressions = extractIndexableExpressions(expression)

for (const { fieldName, fieldPath } of indexableExpressions) {
// Check if we already have an index for this field
const existingIndex = Array.from(collection.indexes.values()).find(
(index) => index.matchesField(fieldPath)
)

if (existingIndex) {
continue // Index already exists
}

// Create a new index for this field using the collection's createIndex method
try {
collection.createIndex((row) => (row as any)[fieldName], {
name: `auto_${fieldName}`,
indexType: BTreeIndex,
})
} catch (error) {
console.warn(
`Failed to create auto-index for field "${fieldName}":`,
error
)
}
ensureIndexForField(fieldName, fieldPath, collection)
}
}

Expand Down
108 changes: 103 additions & 5 deletions packages/db/src/query/compiler/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
LimitOffsetRequireOrderByError,
UnsupportedFromTypeError,
} from "../../errors.js"
import { PropRef } from "../ir.js"
import { compileExpression } from "./evaluators.js"
import { processJoins } from "./joins.js"
import { processGroupBy } from "./group-by.js"
Expand All @@ -18,6 +19,8 @@ import type {
QueryIR,
QueryRef,
} from "../ir.js"
import type { LazyCollectionCallbacks } from "./joins.js"
import type { Collection } from "../../collection.js"
import type {
KeyedStream,
NamespacedAndKeyedStream,
Expand All @@ -29,6 +32,8 @@ import type { QueryCache, QueryMapping } from "./types.js"
* Result of query compilation including both the pipeline and collection-specific WHERE clauses
*/
export interface CompilationResult {
/** The ID of the main collection */
collectionId: string
/** The compiled query pipeline */
pipeline: ResultStream
/** Map of collection aliases to their WHERE clauses for index optimization */
Expand All @@ -46,6 +51,9 @@ export interface CompilationResult {
export function compileQuery(
rawQuery: QueryIR,
inputs: Record<string, KeyedStream>,
collections: Record<string, Collection<any, any, any, any, any>>,
callbacks: Record<string, LazyCollectionCallbacks>,
lazyCollections: Set<string>,
cache: QueryCache = new WeakMap(),
queryMapping: QueryMapping = new WeakMap()
): CompilationResult {
Expand All @@ -70,9 +78,16 @@ export function compileQuery(
const tables: Record<string, KeyedStream> = {}

// Process the FROM clause to get the main table
const { alias: mainTableAlias, input: mainInput } = processFrom(
const {
alias: mainTableAlias,
input: mainInput,
collectionId: mainCollectionId,
} = processFrom(
query.from,
allInputs,
collections,
callbacks,
lazyCollections,
cache,
queryMapping
)
Expand All @@ -96,10 +111,15 @@ export function compileQuery(
pipeline,
query.join,
tables,
mainCollectionId,
mainTableAlias,
allInputs,
cache,
queryMapping
queryMapping,
collections,
callbacks,
lazyCollections,
rawQuery
)
}

Expand Down Expand Up @@ -249,6 +269,7 @@ export function compileQuery(
const result = resultPipeline
// Cache the result before returning (use original query as key)
const compilationResult = {
collectionId: mainCollectionId,
pipeline: result,
collectionWhereClauses,
}
Expand All @@ -275,6 +296,7 @@ export function compileQuery(
const result = resultPipeline
// Cache the result before returning (use original query as key)
const compilationResult = {
collectionId: mainCollectionId,
pipeline: result,
collectionWhereClauses,
}
Expand All @@ -289,16 +311,19 @@ export function compileQuery(
function processFrom(
from: CollectionRef | QueryRef,
allInputs: Record<string, KeyedStream>,
collections: Record<string, Collection>,
callbacks: Record<string, LazyCollectionCallbacks>,
lazyCollections: Set<string>,
cache: QueryCache,
queryMapping: QueryMapping
): { alias: string; input: KeyedStream } {
): { alias: string; input: KeyedStream; collectionId: string } {
switch (from.type) {
case `collectionRef`: {
const input = allInputs[from.collection.id]
if (!input) {
throw new CollectionInputNotFoundError(from.collection.id)
}
return { alias: from.alias, input }
return { alias: from.alias, input, collectionId: from.collection.id }
}
case `queryRef`: {
// Find the original query for caching purposes
Expand All @@ -308,6 +333,9 @@ function processFrom(
const subQueryResult = compileQuery(
originalQuery,
allInputs,
collections,
callbacks,
lazyCollections,
cache,
queryMapping
)
Expand All @@ -324,7 +352,11 @@ function processFrom(
})
)

return { alias: from.alias, input: extractedInput }
return {
alias: from.alias,
input: extractedInput,
collectionId: subQueryResult.collectionId,
}
}
default:
throw new UnsupportedFromTypeError((from as any).type)
Expand Down Expand Up @@ -380,3 +412,69 @@ function mapNestedQueries(
}
}
}

/**
 * Looks up the source (collection or sub-query reference) registered under
 * `alias` in a query: the FROM clause is checked first, then each JOIN in order.
 *
 * @param query - The query whose FROM and JOIN sources are searched
 * @param alias - The alias to look for
 * @returns The matching source reference, or nothing when no source uses the alias
 */
function getRefFromAlias(
  query: QueryIR,
  alias: string
): CollectionRef | QueryRef | void {
  const { from } = query
  if (from.alias === alias) {
    return from
  }

  const matchingJoin = (query.join || []).find(
    (joinClause) => joinClause.from.alias === alias
  )
  return matchingJoin ? matchingJoin.from : undefined
}

/**
 * Resolves a property reference to the collection field it ultimately points to,
 * following select-clause references and sub-query aliases along the way.
 *
 * @param query - The query in whose scope `ref` is interpreted
 * @param ref - The property reference to resolve
 * @param collection - The collection returned when a single-segment reference
 *                     is not redirected by the select clause
 * @returns The resolved collection and the field path inside it, or nothing when
 *          the reference path is empty or its leading alias matches no source of `query`
 */
export function followRef(
  query: QueryIR,
  ref: PropRef<any>,
  collection: Collection
): { collection: Collection; path: Array<string> } | void {
  const segments = ref.path

  if (segments.length === 0) {
    return
  }

  if (segments.length === 1) {
    // A single segment names a field directly; it may be a select-clause
    // entry that itself refers to something else.
    const fieldName = segments[0]!
    const selected = query.select ? query.select[fieldName] : undefined
    if (selected && selected.type === `ref`) {
      return followRef(query, selected, collection)
    }

    // Not selected, or selected but not a reference: nothing further to
    // follow, so the field is taken to live on the given collection.
    return { collection, path: [fieldName] }
  }

  // Two or more segments: the first one is an alias of a query source.
  const alias = segments[0]!
  const remainder = segments.slice(1)
  const source = getRefFromAlias(query, alias)
  if (!source) {
    return
  }

  if (source.type === `queryRef`) {
    // Continue resolving the remaining path inside the sub-query.
    return followRef(source.query, new PropRef(remainder), collection)
  }

  // A plain collection reference cannot be followed any further,
  // so the remaining path addresses a field on that collection.
  return { collection: source.collection, path: remainder }
}
Loading