From df53db6c27e7dbd9e68b2164b355749af18ba4c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B8ren=20Bramer=20Schmidt?= Date: Fri, 12 Sep 2025 13:44:04 +0200 Subject: [PATCH 1/5] add memtest --- packages/db-ivm/src/examples/memtest.ts | 45 +++++++++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 packages/db-ivm/src/examples/memtest.ts diff --git a/packages/db-ivm/src/examples/memtest.ts b/packages/db-ivm/src/examples/memtest.ts new file mode 100644 index 000000000..9266db54f --- /dev/null +++ b/packages/db-ivm/src/examples/memtest.ts @@ -0,0 +1,45 @@ +import { D2 } from "../../src/d2.js" +import { MultiSet } from "../multiset.js" +import { map, reduce } from "../operators/index.js" + +const graph = new D2() + +const reviews = graph.newInput<{ + id: number + listingId: number + score: number + text: string +}>() + +// Group reviews by listingId and sum score +reviews.pipe( + map((x) => [x.listingId, x.score]), + reduce((values) => { + // `values` is an array of [value, multiplicity] pairs for a specific key + let sum = 0 + for (const [value, multiplicity] of values) { + sum += value * multiplicity + } + return [[sum, 1]] + }) +) +graph.finalize() + +// Get iteration count from command line argument, default to 100000 +const ITER_COUNT = process.argv[2] ? 
parseInt(process.argv[2], 10) : 1000 +console.log(`ITER_COUNT: ${ITER_COUNT}`) + +const t0 = Date.now() + +for (let i = 1; i < ITER_COUNT; i++) { + reviews.sendData( + new MultiSet([[{ id: 3 + i, listingId: 1, score: 1, text: `tada` }, 1]]) + ) + // reviews.sendFrontier(i+1) + + graph.run() +} + +const t1 = Date.now() + +console.log(`Time taken: ${t1 - t0} ms`) From 3eb69b5cc7c06b62f566d0d0302e5eff5e4f6aa4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B8ren=20Bramer=20Schmidt?= Date: Fri, 12 Sep 2025 13:46:04 +0200 Subject: [PATCH 2/5] =?UTF-8?q?Minor=20performance=20optimization:=20src/v?= =?UTF-8?q?alueIndex.ts:28=E2=80=9340=20=E2=80=94=20fast=20path=20for=20pr?= =?UTF-8?q?imitive=20equality=20(number/string)=20avoids=20hashing=20when?= =?UTF-8?q?=20possible.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/db-ivm/src/valueIndex.ts | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/packages/db-ivm/src/valueIndex.ts b/packages/db-ivm/src/valueIndex.ts index 2470e7aa8..e8d1ed709 100644 --- a/packages/db-ivm/src/valueIndex.ts +++ b/packages/db-ivm/src/valueIndex.ts @@ -53,6 +53,16 @@ export class ValueIndex { if (this.has(key)) { const [currValue, currMultiplicity] = this.get(key)! 
+ // Fast path for primitive equality to avoid hashing cost + if ( + (typeof value === `number` || typeof value === `string`) && + value === (currValue as any) + ) { + // Update the multiplicity + this.#setMultiplicity(key, value as V, currMultiplicity + multiplicity) + return + } + // Fallback to structural hash-based equality for complex values if (hash(value) === hash(currValue)) { // Update the multiplicity this.#setMultiplicity(key, value, currMultiplicity + multiplicity) From 91983ce195f15d1163c374d80904b7a72db04bd8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B8ren=20Bramer=20Schmidt?= Date: Fri, 12 Sep 2025 13:53:05 +0200 Subject: [PATCH 3/5] Root Cause MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Each operator factory added a fresh “public” reader via graph.addStream(output.connectReader()). These readers are not consumed anywhere in the graph engine (D2.run() only checks operator inputs). As a result, every message at every stage was duplicated into at least one extra queue that was never drained, leading to unbounded growth. In src/d2.ts, newInput() also added such a reader, further duplicating messages at the graph boundary. This is an architectural memory leak: dangling readers retained all messages indefinitely. Changes Implemented Stop creating unconsumed “public” readers: src/d2.ts:28–34 — newInput no longer creates and stores a reader. src/d2.ts:41–45 — addStream kept as a no-op for API compatibility (prevents accidental accumulation). 
Remove graph.addStream(output.connectReader()) across all operators so they don’t create dangling readers: src/operators/map.ts:44 src/operators/reduce.ts:120 src/operators/filter.ts:… (operator creation site) src/operators/concat.ts:… (operator creation site) src/operators/output.ts:… (operator creation site) src/operators/debug.ts:… (operator creation site) src/operators/tap.ts:… (operator creation site) src/operators/join.ts:… (operator creation site) src/operators/count.ts:… (operator creation site) src/operators/distinct.ts:… (operator creation site) src/operators/consolidate.ts:… (operator creation site) src/operators/negate.ts:… (operator creation site) src/operators/topKWithFractionalIndex.ts:… (operator creation site) src/operators/topKWithFractionalIndexBTree.ts:… (operator creation site) Validation Memory after fix (gtime): 1k: Max memory: 54112 KB 100k: Max memory: 108112 KB 1M: Max memory: 176160 KB Compared to the pre-fix 1M at ~1.26 GB, this is a dramatic improvement. The remaining increase is modest and consistent with runtime/GC behavior and transient allocations (e.g., output multisets per step), not retained state. Why This Works Operators still create one reader per actual consumer (the next operator in the pipeline). Those are drained synchronously in each step(). No extra dangling readers are created; hence no permanent queues retain messages. If a stream has no consumer (like the final reduce in the memtest), its writer has zero readers, so sendData does nothing — no queue, no retention. Please explain to me: Why did this library implement these “public” readers? Are they needed for any part of the public API? Can we safely remove them without impacting the public API and functioning of the library? Here’s the straight answer. Why they existed Likely intent: Materialize every operator’s output “somewhere” by default — either as a vestige from the original D2TS design (where graphs tracked streams/frontiers) or as a convenience to make outputs observable later. 
In this simplified, version-free engine, that mechanism was never actually used. The graph scheduler does not read those queues, nor does the public API expose them. Are they needed for the public API? No. Public consumption is already explicit: IStreamBuilder.connectReader() lets you attach readers where you need results. See implementation in src/d2.ts:81. Side-effect sinks (e.g. debug, output) do observation without needing a reader. The graph’s private storage of readers is not exposed and is not used by scheduling: D2.run() drives work by operators only; it doesn’t look at readers. See src/d2.ts:61 and src/d2.ts:65. Readers themselves only buffer for the parties that explicitly connect. See src/graph.ts:46 (writer creates a queue per reader) and src/graph.ts:19 (reader drains and clears). Can we remove them safely? Yes. We removed the automatic “public” readers and everything still works: No change to surface API or types. All tests still pass (247/247). Memory usage improves dramatically because we no longer create undrained queues holding every message. Behavioral semantics after removal Streams without a consumer: If you don’t connect a reader (or end in a sink), outputs are dropped — which matches normal dataflow semantics. Previously, they were buffered in invisible, undrained queues (the source of the memory growth). How to consume results explicitly: Example: const reader = output.connectReader(); graph.run(); const messages = reader.drain(); This is consistent with the library’s public contract — users already needed to attach readers for data they wanted to materialize. Key references src/graph.ts:36 — Writer enqueues to each reader’s queue. src/graph.ts:19 — Reader drain() clears its queue. src/d2.ts:61 and src/d2.ts:65 — Scheduler only asks operators if they have pending input and runs them; it never reads “graph-level” readers. src/d2.ts:81 — StreamBuilder.connectReader() is the supported way to consume a stream. 
Bottom line The “public” readers were not part of the usable public API and weren’t used by the engine. They created undrained queues that retained all messages. Removing them is safe and correct. If you want materialized outputs, attach a reader explicitly or use a sink operator. If you want a convenience “materialize()” operator that returns a handle, we can add one as a follow-up. --- packages/db-ivm/src/d2.ts | 7 +++---- packages/db-ivm/src/operators/concat.ts | 1 - packages/db-ivm/src/operators/consolidate.ts | 1 - packages/db-ivm/src/operators/count.ts | 1 - packages/db-ivm/src/operators/debug.ts | 1 - packages/db-ivm/src/operators/distinct.ts | 1 - packages/db-ivm/src/operators/filter.ts | 1 - packages/db-ivm/src/operators/join.ts | 1 - packages/db-ivm/src/operators/map.ts | 1 - packages/db-ivm/src/operators/negate.ts | 1 - packages/db-ivm/src/operators/output.ts | 1 - packages/db-ivm/src/operators/reduce.ts | 1 - packages/db-ivm/src/operators/tap.ts | 1 - packages/db-ivm/src/operators/topKWithFractionalIndex.ts | 1 - .../db-ivm/src/operators/topKWithFractionalIndexBTree.ts | 1 - 15 files changed, 3 insertions(+), 18 deletions(-) diff --git a/packages/db-ivm/src/d2.ts b/packages/db-ivm/src/d2.ts index fea5f418f..e07d29564 100644 --- a/packages/db-ivm/src/d2.ts +++ b/packages/db-ivm/src/d2.ts @@ -8,7 +8,6 @@ import type { MultiSet, MultiSetArray } from "./multiset.js" import type { ID2, IStreamBuilder, PipedOperator } from "./types.js" export class D2 implements ID2 { - #streams: Array> = [] #operators: Array | BinaryOperator> = [] #nextOperatorId = 0 #finalized = false @@ -31,7 +30,6 @@ export class D2 implements ID2 { const writer = new DifferenceStreamWriter() // Use the root stream builder that exposes the sendData and sendFrontier methods const streamBuilder = new RootStreamBuilder(this, writer) - this.#streams.push(streamBuilder.connectReader()) return streamBuilder } @@ -40,9 +38,10 @@ export class D2 implements ID2 { this.#operators.push(operator) } - 
addStream(stream: DifferenceStreamReader): void { + addStream(_stream: DifferenceStreamReader): void { + // No-op: we no longer track dangling public readers to avoid memory growth + // This method remains for API compatibility this.#checkNotFinalized() - this.#streams.push(stream) } finalize() { diff --git a/packages/db-ivm/src/operators/concat.ts b/packages/db-ivm/src/operators/concat.ts index fb9f9b053..075320842 100644 --- a/packages/db-ivm/src/operators/concat.ts +++ b/packages/db-ivm/src/operators/concat.ts @@ -39,7 +39,6 @@ export function concat( output.writer ) stream.graph.addOperator(operator) - stream.graph.addStream(output.connectReader()) return output } } diff --git a/packages/db-ivm/src/operators/consolidate.ts b/packages/db-ivm/src/operators/consolidate.ts index 2fc3a0973..cb3b3c492 100644 --- a/packages/db-ivm/src/operators/consolidate.ts +++ b/packages/db-ivm/src/operators/consolidate.ts @@ -44,7 +44,6 @@ export function consolidate(): PipedOperator { output.writer ) stream.graph.addOperator(operator) - stream.graph.addStream(output.connectReader()) return output } } diff --git a/packages/db-ivm/src/operators/count.ts b/packages/db-ivm/src/operators/count.ts index 80af798a3..c54ed6234 100644 --- a/packages/db-ivm/src/operators/count.ts +++ b/packages/db-ivm/src/operators/count.ts @@ -46,7 +46,6 @@ export function count< output.writer ) stream.graph.addOperator(operator) - stream.graph.addStream(output.connectReader()) return output } } diff --git a/packages/db-ivm/src/operators/debug.ts b/packages/db-ivm/src/operators/debug.ts index f6646f5ac..0d660c95f 100644 --- a/packages/db-ivm/src/operators/debug.ts +++ b/packages/db-ivm/src/operators/debug.ts @@ -52,7 +52,6 @@ export function debug( indent ) stream.graph.addOperator(operator) - stream.graph.addStream(output.connectReader()) return output } } diff --git a/packages/db-ivm/src/operators/distinct.ts b/packages/db-ivm/src/operators/distinct.ts index 851f3e84a..dc2f5a177 100644 --- 
a/packages/db-ivm/src/operators/distinct.ts +++ b/packages/db-ivm/src/operators/distinct.ts @@ -92,7 +92,6 @@ export function distinct(by: (value: T) => any = (value: T) => value) { by ) stream.graph.addOperator(operator) - stream.graph.addStream(output.connectReader()) return output } } diff --git a/packages/db-ivm/src/operators/filter.ts b/packages/db-ivm/src/operators/filter.ts index b7cb7574c..b34777505 100644 --- a/packages/db-ivm/src/operators/filter.ts +++ b/packages/db-ivm/src/operators/filter.ts @@ -42,7 +42,6 @@ export function filter(f: (data: T) => boolean): PipedOperator { f ) stream.graph.addOperator(operator) - stream.graph.addStream(output.connectReader()) return output } } diff --git a/packages/db-ivm/src/operators/join.ts b/packages/db-ivm/src/operators/join.ts index 259cfbc05..96e9ba962 100644 --- a/packages/db-ivm/src/operators/join.ts +++ b/packages/db-ivm/src/operators/join.ts @@ -149,7 +149,6 @@ export function innerJoin< output.writer ) stream.graph.addOperator(operator) - stream.graph.addStream(output.connectReader()) return output } } diff --git a/packages/db-ivm/src/operators/map.ts b/packages/db-ivm/src/operators/map.ts index 467da9b07..2aed5b1f0 100644 --- a/packages/db-ivm/src/operators/map.ts +++ b/packages/db-ivm/src/operators/map.ts @@ -42,7 +42,6 @@ export function map(f: (data: T) => O): PipedOperator { f ) stream.graph.addOperator(operator) - stream.graph.addStream(output.connectReader()) return output } } diff --git a/packages/db-ivm/src/operators/negate.ts b/packages/db-ivm/src/operators/negate.ts index e9dd5f206..8962c210f 100644 --- a/packages/db-ivm/src/operators/negate.ts +++ b/packages/db-ivm/src/operators/negate.ts @@ -27,7 +27,6 @@ export function negate(): PipedOperator { output.writer ) stream.graph.addOperator(operator) - stream.graph.addStream(output.connectReader()) return output } } diff --git a/packages/db-ivm/src/operators/output.ts b/packages/db-ivm/src/operators/output.ts index c6a47d021..5217b3d66 100644 --- 
a/packages/db-ivm/src/operators/output.ts +++ b/packages/db-ivm/src/operators/output.ts @@ -47,7 +47,6 @@ export function output( fn ) stream.graph.addOperator(operator) - stream.graph.addStream(outputStream.connectReader()) return outputStream } } diff --git a/packages/db-ivm/src/operators/reduce.ts b/packages/db-ivm/src/operators/reduce.ts index 47591feaa..4301dced5 100644 --- a/packages/db-ivm/src/operators/reduce.ts +++ b/packages/db-ivm/src/operators/reduce.ts @@ -118,7 +118,6 @@ export function reduce< f ) stream.graph.addOperator(operator) - stream.graph.addStream(output.connectReader()) return output } } diff --git a/packages/db-ivm/src/operators/tap.ts b/packages/db-ivm/src/operators/tap.ts index fcf08d33c..20c59fd5a 100644 --- a/packages/db-ivm/src/operators/tap.ts +++ b/packages/db-ivm/src/operators/tap.ts @@ -45,7 +45,6 @@ export function tap(f: (data: MultiSet) => void): PipedOperator { f ) stream.graph.addOperator(operator) - stream.graph.addStream(output.connectReader()) return output } } diff --git a/packages/db-ivm/src/operators/topKWithFractionalIndex.ts b/packages/db-ivm/src/operators/topKWithFractionalIndex.ts index bcb680782..81e72b54a 100644 --- a/packages/db-ivm/src/operators/topKWithFractionalIndex.ts +++ b/packages/db-ivm/src/operators/topKWithFractionalIndex.ts @@ -327,7 +327,6 @@ export function topKWithFractionalIndex( opts ) stream.graph.addOperator(operator) - stream.graph.addStream(output.connectReader()) return output } } diff --git a/packages/db-ivm/src/operators/topKWithFractionalIndexBTree.ts b/packages/db-ivm/src/operators/topKWithFractionalIndexBTree.ts index 5f87b6702..7b094d7c4 100644 --- a/packages/db-ivm/src/operators/topKWithFractionalIndexBTree.ts +++ b/packages/db-ivm/src/operators/topKWithFractionalIndexBTree.ts @@ -302,7 +302,6 @@ export function topKWithFractionalIndexBTree( opts ) stream.graph.addOperator(operator) - stream.graph.addStream(output.connectReader()) return output } } From 
854754732f9cfd3d2d6a27f1e3603ed74a1ad362 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Sat, 13 Sep 2025 18:12:06 +0100 Subject: [PATCH 4/5] tidy --- packages/db-ivm/src/d2.ts | 6 ---- packages/db-ivm/src/examples/memtest.ts | 45 ------------------------- packages/db-ivm/src/types.ts | 1 - packages/db-ivm/src/valueIndex.ts | 10 ------ 4 files changed, 62 deletions(-) delete mode 100644 packages/db-ivm/src/examples/memtest.ts diff --git a/packages/db-ivm/src/d2.ts b/packages/db-ivm/src/d2.ts index e07d29564..b3149de73 100644 --- a/packages/db-ivm/src/d2.ts +++ b/packages/db-ivm/src/d2.ts @@ -38,12 +38,6 @@ export class D2 implements ID2 { this.#operators.push(operator) } - addStream(_stream: DifferenceStreamReader): void { - // No-op: we no longer track dangling public readers to avoid memory growth - // This method remains for API compatibility - this.#checkNotFinalized() - } - finalize() { this.#checkNotFinalized() this.#finalized = true diff --git a/packages/db-ivm/src/examples/memtest.ts b/packages/db-ivm/src/examples/memtest.ts deleted file mode 100644 index 9266db54f..000000000 --- a/packages/db-ivm/src/examples/memtest.ts +++ /dev/null @@ -1,45 +0,0 @@ -import { D2 } from "../../src/d2.js" -import { MultiSet } from "../multiset.js" -import { map, reduce } from "../operators/index.js" - -const graph = new D2() - -const reviews = graph.newInput<{ - id: number - listingId: number - score: number - text: string -}>() - -// Group reviews by listingId and sum score -reviews.pipe( - map((x) => [x.listingId, x.score]), - reduce((values) => { - // `values` is an array of [value, multiplicity] pairs for a specific key - let sum = 0 - for (const [value, multiplicity] of values) { - sum += value * multiplicity - } - return [[sum, 1]] - }) -) -graph.finalize() - -// Get iteration count from command line argument, default to 100000 -const ITER_COUNT = process.argv[2] ? 
parseInt(process.argv[2], 10) : 1000 -console.log(`ITER_COUNT: ${ITER_COUNT}`) - -const t0 = Date.now() - -for (let i = 1; i < ITER_COUNT; i++) { - reviews.sendData( - new MultiSet([[{ id: 3 + i, listingId: 1, score: 1, text: `tada` }, 1]]) - ) - // reviews.sendFrontier(i+1) - - graph.run() -} - -const t1 = Date.now() - -console.log(`Time taken: ${t1 - t0} ms`) diff --git a/packages/db-ivm/src/types.ts b/packages/db-ivm/src/types.ts index fea5a7c6a..b1a8e71d3 100644 --- a/packages/db-ivm/src/types.ts +++ b/packages/db-ivm/src/types.ts @@ -27,7 +27,6 @@ export interface ID2 { getNextOperatorId: () => number newInput: () => IStreamBuilder addOperator: (operator: UnaryOperator | BinaryOperator) => void - addStream: (stream: DifferenceStreamReader) => void finalize: () => void step: () => void } diff --git a/packages/db-ivm/src/valueIndex.ts b/packages/db-ivm/src/valueIndex.ts index e8d1ed709..2470e7aa8 100644 --- a/packages/db-ivm/src/valueIndex.ts +++ b/packages/db-ivm/src/valueIndex.ts @@ -53,16 +53,6 @@ export class ValueIndex { if (this.has(key)) { const [currValue, currMultiplicity] = this.get(key)! 
- // Fast path for primitive equality to avoid hashing cost - if ( - (typeof value === `number` || typeof value === `string`) && - value === (currValue as any) - ) { - // Update the multiplicity - this.#setMultiplicity(key, value as V, currMultiplicity + multiplicity) - return - } - // Fallback to structural hash-based equality for complex values if (hash(value) === hash(currValue)) { // Update the multiplicity this.#setMultiplicity(key, value, currMultiplicity + multiplicity) From 0c85527f3bacc83f11848f8d465d4a0aea27c88c Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Sat, 13 Sep 2025 18:15:41 +0100 Subject: [PATCH 5/5] changeset --- .changeset/empty-poets-wait.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/empty-poets-wait.md diff --git a/.changeset/empty-poets-wait.md b/.changeset/empty-poets-wait.md new file mode 100644 index 000000000..5c9004943 --- /dev/null +++ b/.changeset/empty-poets-wait.md @@ -0,0 +1,5 @@ +--- +"@tanstack/db-ivm": patch +--- + +Fix memory leak that results in linear memory growth with incremental changes over time. Thanks to @sorenbs for finding and fixing this.