From 0b1425945979e9261cddca14c744e8a3303fccf7 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Wed, 27 Aug 2025 08:20:42 +0100 Subject: [PATCH 1/2] add tests for combining limit and where --- packages/db/tests/query/order-by.test.ts | 49 ++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/packages/db/tests/query/order-by.test.ts b/packages/db/tests/query/order-by.test.ts index d48efd3a5..2b82ed7c3 100644 --- a/packages/db/tests/query/order-by.test.ts +++ b/packages/db/tests/query/order-by.test.ts @@ -383,6 +383,55 @@ function createOrderByTests(autoIndex: `off` | `eager`): void { expect(results.map((r) => r.salary)).toEqual([60000, 55000]) }) + it(`applies a where clause with a limit and an orderBy`, async () => { + const collection = createLiveQueryCollection((q) => + q + .from({ employees: employeesCollection }) + .orderBy(({ employees }) => employees.salary, `asc`) + .where(({ employees }) => gt(employees.salary, 50000)) + .limit(2) + .select(({ employees }) => ({ + id: employees.id, + name: employees.name, + salary: employees.salary, + })) + ) + await collection.preload() + + const results = Array.from(collection.values()) + + console.log(results) + + expect(results).toHaveLength(2) + expect(results.map((r) => r.salary)).toEqual([52000, 55000]) + expect(results.map((r) => r.id)).toEqual([5, 3]) + }) + + it(`fetches single row with an orderBy and limit 1`, async () => { + const collection = createLiveQueryCollection((q) => + q + .from({ employees: employeesCollection }) + .orderBy(({ employees }) => employees.id, `asc`) + .where(({ employees }) => eq(employees.id, 2)) + .limit(1) + .select(({ employees }) => ({ + id: employees.id, + name: employees.name, + salary: employees.salary, + })) + ) + await collection.preload() + + const results = Array.from(collection.values()) + + console.log(results) + + expect(results).toHaveLength(1) + expect(results[0]!.id).toEqual(2) + expect(results[0]!.name).toEqual(`Bob`) + expect(results[0]!.salary).toEqual(60000) + }) + it(`throws error when limit/offset used without orderBy`, () => { expect(() => { createLiveQueryCollection((q) => From e4a016670caac99946075b65753363da88d5c9c4 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Wed, 27 Aug 2025 08:44:17 +0100 Subject: [PATCH 2/2] draft fix --- .../src/query/live/collection-subscriber.ts | 96 ++++++++++++++++--- 1 file changed, 82 insertions(+), 14 deletions(-) diff --git a/packages/db/src/query/live/collection-subscriber.ts b/packages/db/src/query/live/collection-subscriber.ts index 50e454ce5..929364e45 100644 --- a/packages/db/src/query/live/collection-subscriber.ts +++ b/packages/db/src/query/live/collection-subscriber.ts @@ -245,8 +245,34 @@ export class CollectionSubscriber< // and filter out changes that are bigger than the biggest value we've sent so far // because they can't affect the topK const splittedChanges = splitUpdates(changes) + + // Apply WHERE clause filtering to incoming changes if needed + const whereFilteredChanges = whereExpression + ? Array.from(splittedChanges).filter((change) => { + try { + // Convert WHERE expression to collection format and apply filter + const collectionAlias = findCollectionAlias( + this.collectionId, + this.collectionConfigBuilder.query + ) + const convertedWhereClause = collectionAlias + ? convertToBasicExpression(whereExpression, collectionAlias) + : null + + if (convertedWhereClause) { + const filterFn = + createFilterFunctionFromExpression(convertedWhereClause) + return filterFn(change.value) + } + return false + } catch { + return false + } + }) + : splittedChanges + const filteredChanges = filterChangesSmallerOrEqualToMax( - splittedChanges, + whereFilteredChanges, comparator, this.biggest ) @@ -257,10 +283,10 @@ export class CollectionSubscriber< } // Subscribe to changes and only send changes that are smaller than the biggest value we've sent so far - // values that are bigger don't need to be sent because they can't affect the topK - const unsubscribe = this.collection.subscribeChanges(sendChangesInRange, { - whereExpression, - }) + // Values that are bigger don't need to be sent because they can't affect the topK + // Note: Don't pass whereExpression to subscribeChanges since we handle WHERE filtering manually + // in both loadNextItems (for initial load) and sendChangesInRange (for live updates) + const unsubscribe = this.collection.subscribeChanges(sendChangesInRange, {}) return unsubscribe } @@ -304,8 +330,8 @@ export class CollectionSubscriber< this.sendChangesToPipeline(trackedChanges, this.loadMoreIfNeeded.bind(this)) } - // Loads the next `n` items from the collection - // starting from the biggest item it has sent + // Loads the next `n` items from the collection starting from the biggest item sent + // Applies WHERE clause filtering if present to ensure LIMIT is satisfied with correct data private loadNextItems(n: number) { const { valueExtractorForRawRow, index } = this.collectionConfigBuilder.optimizableOrderByCollections[ @@ -315,13 +341,55 @@ export class CollectionSubscriber< const biggestSentValue = biggestSentRow ? valueExtractorForRawRow(biggestSentRow) : biggestSentRow - // Take the `n` items after the biggest sent value - const nextOrderedKeys = index.take(n, biggestSentValue) - const nextInserts: Array> = - nextOrderedKeys.map((key) => { - return { type: `insert`, key, value: this.collection.get(key) } - }) - this.sendChangesToPipelineWithTracking(nextInserts) + + // Get WHERE clause for filtering if it exists + const collectionAlias = findCollectionAlias( + this.collectionId, + this.collectionConfigBuilder.query + ) + const whereClause = this.getWhereClauseFromAlias(collectionAlias) + + // Convert the WHERE clause to a collection-compatible format + const convertedWhereClause = + whereClause && collectionAlias + ? convertToBasicExpression(whereClause, collectionAlias) + : null + + const filterFn = convertedWhereClause + ? createFilterFunctionFromExpression(convertedWhereClause) + : null + + const filteredInserts: Array> = [] + let currentBiggestValue = biggestSentValue + + // Keep loading until we have enough filtered results or no more data available + while (filteredInserts.length < n) { + // Load batch of items from index + const batchSize = Math.max(n - filteredInserts.length, 10) + const nextOrderedKeys = index.take(batchSize, currentBiggestValue) + + if (nextOrderedKeys.length === 0) { + // No more items available in the index + break + } + + // Process each item + for (const key of nextOrderedKeys) { + const value = this.collection.get(key) + if (value !== undefined) { + // Update tracking for next iteration + const currentItemValue = valueExtractorForRawRow(value) + currentBiggestValue = currentItemValue + + // Apply WHERE clause filter if it exists + if (!filterFn || filterFn(value)) { + filteredInserts.push({ type: `insert`, key, value }) + } + } + } + } + + this.sendChangesToPipelineWithTracking(filteredInserts) } private getWhereClauseFromAlias(