Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 82 additions & 14 deletions packages/db/src/query/live/collection-subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,34 @@ export class CollectionSubscriber<
// and filter out changes that are bigger than the biggest value we've sent so far
// because they can't affect the topK
const splittedChanges = splitUpdates(changes)

// Apply WHERE clause filtering to incoming changes if needed
const whereFilteredChanges = whereExpression
? Array.from(splittedChanges).filter((change) => {
try {
// Convert WHERE expression to collection format and apply filter
const collectionAlias = findCollectionAlias(
this.collectionId,
this.collectionConfigBuilder.query
)
const convertedWhereClause = collectionAlias
? convertToBasicExpression(whereExpression, collectionAlias)
: null

if (convertedWhereClause) {
const filterFn =
createFilterFunctionFromExpression(convertedWhereClause)
return filterFn(change.value)
}
return false
} catch {
return false
}
})
: splittedChanges

const filteredChanges = filterChangesSmallerOrEqualToMax(
splittedChanges,
whereFilteredChanges,
comparator,
this.biggest
)
Expand All @@ -257,10 +283,10 @@ export class CollectionSubscriber<
}

// Subscribe to changes and only send changes that are smaller than the biggest value we've sent so far
// values that are bigger don't need to be sent because they can't affect the topK
const unsubscribe = this.collection.subscribeChanges(sendChangesInRange, {
whereExpression,
})
// Values that are bigger don't need to be sent because they can't affect the topK
// Note: Don't pass whereExpression to subscribeChanges since we handle WHERE filtering manually
// in both loadNextItems (for initial load) and sendChangesInRange (for live updates)
const unsubscribe = this.collection.subscribeChanges(sendChangesInRange, {})

return unsubscribe
}
Expand Down Expand Up @@ -304,8 +330,8 @@ export class CollectionSubscriber<
this.sendChangesToPipeline(trackedChanges, this.loadMoreIfNeeded.bind(this))
}

// Loads the next `n` items from the collection
// starting from the biggest item it has sent
// Loads the next `n` items from the collection starting from the biggest item sent
// Applies WHERE clause filtering if present to ensure LIMIT is satisfied with correct data
private loadNextItems(n: number) {
const { valueExtractorForRawRow, index } =
this.collectionConfigBuilder.optimizableOrderByCollections[
Expand All @@ -315,13 +341,55 @@ export class CollectionSubscriber<
const biggestSentValue = biggestSentRow
? valueExtractorForRawRow(biggestSentRow)
: biggestSentRow
// Take the `n` items after the biggest sent value
const nextOrderedKeys = index.take(n, biggestSentValue)
const nextInserts: Array<ChangeMessage<any, string | number>> =
nextOrderedKeys.map((key) => {
return { type: `insert`, key, value: this.collection.get(key) }
})
this.sendChangesToPipelineWithTracking(nextInserts)

// Get WHERE clause for filtering if it exists
const collectionAlias = findCollectionAlias(
this.collectionId,
this.collectionConfigBuilder.query
)
const whereClause = this.getWhereClauseFromAlias(collectionAlias)

// Convert the WHERE clause to a collection-compatible format
const convertedWhereClause =
whereClause && collectionAlias
? convertToBasicExpression(whereClause, collectionAlias)
: null

const filterFn = convertedWhereClause
? createFilterFunctionFromExpression(convertedWhereClause)
: null

const filteredInserts: Array<ChangeMessage<any, string | number>> = []
let currentBiggestValue = biggestSentValue

// Keep loading until we have enough filtered results or no more data available
while (filteredInserts.length < n) {
// Load batch of items from index
const batchSize = Math.max(n - filteredInserts.length, 10)
const nextOrderedKeys = index.take(batchSize, currentBiggestValue)

if (nextOrderedKeys.length === 0) {
// No more items available in the index
break
}

// Process each item
for (const key of nextOrderedKeys) {
const value = this.collection.get(key)
if (value !== undefined) {
// Update tracking for next iteration
const currentItemValue = valueExtractorForRawRow(value)
currentBiggestValue = currentItemValue

// Apply WHERE clause filter if it exists
if (!filterFn || filterFn(value)) {
filteredInserts.push({ type: `insert`, key, value })
}
}
}
}

this.sendChangesToPipelineWithTracking(filteredInserts)
}

private getWhereClauseFromAlias(
Expand Down
49 changes: 49 additions & 0 deletions packages/db/tests/query/order-by.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,55 @@ function createOrderByTests(autoIndex: `off` | `eager`): void {
expect(results.map((r) => r.salary)).toEqual([60000, 55000])
})

it(`applies a where clause with a limit and an orderBy`, async () => {
const collection = createLiveQueryCollection((q) =>
q
.from({ employees: employeesCollection })
.orderBy(({ employees }) => employees.salary, `asc`)
.where(({ employees }) => gt(employees.salary, 50000))
.limit(2)
.select(({ employees }) => ({
id: employees.id,
name: employees.name,
salary: employees.salary,
}))
)
await collection.preload()

const results = Array.from(collection.values())

console.log(results)

expect(results).toHaveLength(2)
expect(results.map((r) => r.salary)).toEqual([52000, 55000])
expect(results.map((r) => r.id)).toEqual([5, 3])
})

it(`fetches single row with an orderBy and limit 1`, async () => {
const collection = createLiveQueryCollection((q) =>
q
.from({ employees: employeesCollection })
.orderBy(({ employees }) => employees.id, `asc`)
.where(({ employees }) => eq(employees.id, 2))
.limit(1)
.select(({ employees }) => ({
id: employees.id,
name: employees.name,
salary: employees.salary,
}))
)
await collection.preload()

const results = Array.from(collection.values())

console.log(results)

expect(results).toHaveLength(1)
expect(results[0]!.id).toEqual(2)
expect(results[0]!.name).toEqual(`Bob`)
expect(results[0]!.salary).toEqual(60000)
})

it(`throws error when limit/offset used without orderBy`, () => {
expect(() => {
createLiveQueryCollection((q) =>
Expand Down
Loading