Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/evil-bats-remain.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@tanstack/db": patch
---

Fix bug that caused initial query results to have too few rows when query has orderBy, limit, and where clauses.
51 changes: 30 additions & 21 deletions packages/db/src/query/live/collection-config-builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ export class CollectionConfigBuilder<
OrderByOptimizationInfo
> = {}

private collectionReady = false

constructor(
private readonly config: LiveQueryCollectionConfig<TContext, TResult>
) {
Expand All @@ -80,10 +78,6 @@ export class CollectionConfigBuilder<
this.compileBasePipeline()
}

isCollectionReady() {
return this.collectionReady
}

getConfig(): CollectionConfig<TResult> {
return {
id: this.id,
Expand Down Expand Up @@ -132,8 +126,6 @@ export class CollectionConfigBuilder<
// Mark the collection as ready after the first successful run
if (ready && this.allCollectionsReady()) {
markReady()
// Remember that we marked the collection as ready
this.collectionReady = true
}
}
}
Expand All @@ -158,10 +150,13 @@ export class CollectionConfigBuilder<
syncState
)

this.subscribeToAllCollections(config, fullSyncState)
const loadMoreDataCallbacks = this.subscribeToAllCollections(
config,
fullSyncState
)

// Initial run
this.maybeRunGraph(config, fullSyncState)
// Initial run with callback to load more data if needed
this.maybeRunGraph(config, fullSyncState, loadMoreDataCallbacks)

// Return the unsubscribe function
return () => {
Expand Down Expand Up @@ -315,19 +310,33 @@ export class CollectionConfigBuilder<
config: Parameters<SyncConfig<TResult>[`sync`]>[0],
syncState: FullSyncState
) {
Object.entries(this.collections).forEach(([collectionId, collection]) => {
const collectionSubscriber = new CollectionSubscriber(
collectionId,
collection,
config,
syncState,
this
)
collectionSubscriber.subscribe()
})
const loaders = Object.entries(this.collections).map(
([collectionId, collection]) => {
const collectionSubscriber = new CollectionSubscriber(
collectionId,
collection,
config,
syncState,
this
)
collectionSubscriber.subscribe()

const loadMore =
collectionSubscriber.loadMoreIfNeeded.bind(collectionSubscriber)

return loadMore
}
)

const loadMoreDataCallback = () => {
loaders.map((loader) => loader()) // .every((doneLoading) => doneLoading)
return true
}

// Mark the collections as subscribed in the sync state
syncState.subscribedToAllCollections = true

return loadMoreDataCallback
}
}

Expand Down
69 changes: 44 additions & 25 deletions packages/db/src/query/live/collection-subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,20 +85,20 @@ export class CollectionSubscriber<
changes,
this.collection.config.getKey
)
if (sentChanges > 0 || !this.collectionConfigBuilder.isCollectionReady()) {
// Only run the graph if we sent any changes
// otherwise we may get into an infinite loop
// trying to load more data for the orderBy query
// when there's no more data in the collection
// EXCEPTION: if the collection is not yet ready
// we need to run it even if there are no changes
// in order for the collection to be marked as ready
this.collectionConfigBuilder.maybeRunGraph(
this.config,
this.syncState,
callback
)
}

// Do not provide the callback that loads more data
// if there's no more data to load
// otherwise we end up in an infinite loop trying to load more data
const dataLoader = sentChanges > 0 ? callback : undefined

// We need to call `maybeRunGraph` even if there's no data to load
// because we need to mark the collection as ready if it's not already
// and that's only done in `maybeRunGraph`
this.collectionConfigBuilder.maybeRunGraph(
this.config,
this.syncState,
dataLoader
)
}

// Wraps the sendChangesToPipeline function
Expand Down Expand Up @@ -229,7 +229,7 @@ export class CollectionSubscriber<
private subscribeToOrderedChanges(
whereExpression: BasicExpression<boolean> | undefined
) {
const { offset, limit, comparator } =
const { offset, limit, comparator, dataNeeded } =
this.collectionConfigBuilder.optimizableOrderByCollections[
this.collectionId
]!
Expand All @@ -245,11 +245,18 @@ export class CollectionSubscriber<
// and filter out changes that are bigger than the biggest value we've sent so far
// because they can't affect the topK
const splittedChanges = splitUpdates(changes)
const filteredChanges = filterChangesSmallerOrEqualToMax(
splittedChanges,
comparator,
this.biggest
)
let filteredChanges = splittedChanges
if (dataNeeded!() === 0) {
// If the topK is full [..., maxSentValue] then we do not need to send changes > maxSentValue
// because they can never make it into the topK.
// However, if the topK isn't full yet, we need to also send changes > maxSentValue
// because they will make it into the topK
filteredChanges = filterChangesSmallerOrEqualToMax(
splittedChanges,
comparator,
this.biggest
)
}
this.sendChangesToPipeline(
filteredChanges,
this.loadMoreIfNeeded.bind(this)
Expand All @@ -268,11 +275,19 @@ export class CollectionSubscriber<
// This function is called by maybeRunGraph
// after each iteration of the query pipeline
// to ensure that the orderBy operator has enough data to work with
private loadMoreIfNeeded() {
const { dataNeeded } =
loadMoreIfNeeded() {
const orderByInfo =
this.collectionConfigBuilder.optimizableOrderByCollections[
this.collectionId
]!
]

if (!orderByInfo) {
// This query has no orderBy operator
// so there's no data to load, just return true
return true
}

const { dataNeeded } = orderByInfo

if (!dataNeeded) {
// This should never happen because the topK operator should always set the size callback
Expand All @@ -285,12 +300,15 @@ export class CollectionSubscriber<
// `dataNeeded` probes the orderBy operator to see if it needs more data
// if it needs more data, it returns the number of items it needs
const n = dataNeeded()
let noMoreNextItems = false
if (n > 0) {
this.loadNextItems(n)
const loadedItems = this.loadNextItems(n)
noMoreNextItems = loadedItems === 0
}

// Indicate that we're done loading data if we didn't need to load more data
return n === 0
// or there's no more data to load
return n === 0 || noMoreNextItems
}

private sendChangesToPipelineWithTracking(
Expand Down Expand Up @@ -322,6 +340,7 @@ export class CollectionSubscriber<
return { type: `insert`, key, value: this.collection.get(key) }
})
this.sendChangesToPipelineWithTracking(nextInserts)
return nextInserts.length
}

private getWhereClauseFromAlias(
Expand Down
95 changes: 95 additions & 0 deletions packages/db/tests/query/order-by.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,57 @@ function createOrderByTests(autoIndex: `off` | `eager`): void {
])
})

it(`applies incremental insert of a new row inside the topK but after max sent value correctly`, async () => {
const collection = createLiveQueryCollection((q) =>
q
.from({ employees: employeesCollection })
.orderBy(({ employees }) => employees.salary, `asc`)
.offset(1)
.limit(10)
.select(({ employees }) => ({
id: employees.id,
name: employees.name,
salary: employees.salary,
}))
)
await collection.preload()

const results = Array.from(collection.values())

expect(results.map((r) => r.salary)).toEqual([
52_000, 55_000, 60_000, 65_000,
])

// Now insert a new employee with highest salary
// this should now become part of the topK because
// the topK isn't full yet, so even though it's after the max sent value
// it should still be part of the topK
const newEmployee = {
id: 6,
name: `George`,
department_id: 1,
salary: 72_000,
hire_date: `2023-01-01`,
}

employeesCollection.utils.begin()
employeesCollection.utils.write({
type: `insert`,
value: newEmployee,
})
employeesCollection.utils.commit()

const newResults = Array.from(collection.values())

expect(newResults.map((r) => [r.id, r.salary])).toEqual([
[5, 52_000],
[3, 55_000],
[2, 60_000],
[4, 65_000],
[6, 72_000],
])
})

it(`applies incremental insert of a new row after the topK correctly`, async () => {
const collection = createLiveQueryCollection((q) =>
q
Expand Down Expand Up @@ -680,6 +731,50 @@ function createOrderByTests(autoIndex: `off` | `eager`): void {
expect(results).toHaveLength(3) // Alice (50000) and Eve (52000) filtered out
expect(results.map((r) => r.salary)).toEqual([55000, 60000, 65000])
})

it(`orders correctly with a limit`, async () => {
const collection = createLiveQueryCollection((q) =>
q
.from({ employees: employeesCollection })
.where(({ employees }) => gt(employees.salary, 50000))
.orderBy(({ employees }) => employees.salary, `asc`)
.limit(2)
.select(({ employees }) => ({
id: employees.id,
name: employees.name,
salary: employees.salary,
}))
)
await collection.preload()

const results = Array.from(collection.values())

expect(results).toHaveLength(2)
expect(results.map((r) => r.salary)).toEqual([52000, 55000])
expect(results.map((r) => r.id)).toEqual([5, 3])
})

it(`returns a single row with limit 1`, async () => {
const collection = createLiveQueryCollection((q) =>
q
.from({ employees: employeesCollection })
.where(({ employees }) => gt(employees.salary, 50000))
.orderBy(({ employees }) => employees.salary, `asc`)
.limit(1)
.select(({ employees }) => ({
id: employees.id,
name: employees.name,
salary: employees.salary,
}))
)
await collection.preload()

const results = Array.from(collection.values())

expect(results).toHaveLength(1)
expect(results[0]!.id).toEqual(5)
expect(results[0]!.salary).toEqual(52000)
})
})

describe(`Fractional Index Behavior`, () => {
Expand Down
Loading