Conversation
src/valueIndex.ts:28–40 — fast path for primitive equality (number/string) avoids hashing when possible.
Each operator factory added a fresh “public” reader via `graph.addStream(output.connectReader())`. These readers are not consumed anywhere in the graph engine (`D2.run()` only checks operator inputs). As a result, every message at every stage was duplicated into at least one extra queue that was never drained, leading to unbounded growth. In src/d2.ts, `newInput()` also added such a reader, further duplicating messages at the graph boundary. This is an architectural memory leak: dangling readers retained all messages indefinitely.

**Changes implemented**

Stop creating unconsumed “public” readers:

- src/d2.ts:28–34 — `newInput` no longer creates and stores a reader.
- src/d2.ts:41–45 — `addStream` kept as a no-op for API compatibility (prevents accidental accumulation).

Remove `graph.addStream(output.connectReader())` across all operators so they don’t create dangling readers:

- src/operators/map.ts:44
- src/operators/reduce.ts:120
- src/operators/filter.ts:… (operator creation site)
- src/operators/concat.ts:… (operator creation site)
- src/operators/output.ts:… (operator creation site)
- src/operators/debug.ts:… (operator creation site)
- src/operators/tap.ts:… (operator creation site)
- src/operators/join.ts:… (operator creation site)
- src/operators/count.ts:… (operator creation site)
- src/operators/distinct.ts:… (operator creation site)
- src/operators/consolidate.ts:… (operator creation site)
- src/operators/topKWithFractionalIndex.ts:… (operator creation site)
- src/operators/topKWithFractionalIndexBTree.ts:… (operator creation site)

**Validation**

Memory after fix (gtime):

- 1k: Max memory: 54112 KB
- 100k: Max memory: 108112 KB
- 1M: Max memory: 176160 KB

Compared to the pre-fix 1M at ~1.26 GB, this is a dramatic improvement. The remaining increase is modest and consistent with runtime/GC behavior and transient allocations (e.g., output multisets per step), not retained state.

**Why this works**

Operators still create one reader per actual consumer (the next operator in the pipeline).
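The leak pattern described above can be sketched with a minimal reader/writer model. The `Writer` and `Reader` classes here are illustrative stand-ins, not the library's actual implementation; the point is that a reader which is connected but never drained retains every message its writer ever sends.

```typescript
// Minimal model of the queue-per-reader design (hypothetical classes).
class Reader<T> {
  queue: T[] = []
  // Draining returns all buffered messages and clears the queue.
  drain(): T[] {
    const out = this.queue
    this.queue = []
    return out
  }
}

class Writer<T> {
  private readers: Reader<T>[] = []
  connectReader(): Reader<T> {
    const r = new Reader<T>()
    this.readers.push(r)
    return r
  }
  sendData(msg: T): void {
    // With zero readers this is a no-op: nothing is queued, nothing retained.
    for (const r of this.readers) r.queue.push(msg)
  }
}

// The leak: a "public" reader that nobody ever drains.
const output = new Writer<number>()
const consumer = output.connectReader() // the next operator: drained each step
const dangling = output.connectReader() // the unused "public" reader

for (let i = 0; i < 1000; i++) {
  output.sendData(i)
  consumer.drain() // the consumer's queue is emptied every step...
}
// ...but the dangling reader still holds all 1000 messages.
console.log(dangling.queue.length) // → 1000
```

Multiply that retained queue by every operator in the pipeline and every message in the run, and the unbounded growth reported above follows directly.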
Those are drained synchronously in each `step()`. No extra dangling readers are created; hence no permanent queues retain messages. If a stream has no consumer (like the final reduce in the memtest), its writer has zero readers, so `sendData` does nothing — no queue, no retention.

Please explain to me: Why did this library implement these “public” readers? Are they needed for any part of the public API? Can we safely remove them without impacting the public API and functioning of the library?

Here’s the straight answer.

**Why they existed**

Likely intent: materialize every operator’s output “somewhere” by default — either as a vestige from the original D2TS design (where graphs tracked streams/frontiers) or as a convenience to make outputs observable later. In this simplified, version-free engine, that mechanism was never actually used. The graph scheduler does not read those queues, nor does the public API expose them.

**Are they needed for the public API?**

No. Public consumption is already explicit:

- `IStreamBuilder.connectReader()` lets you attach readers where you need results. See the implementation in src/d2.ts:81.
- Side-effect sinks (e.g. `debug`, `output`) do observation without needing a reader.
- The graph’s private storage of readers is not exposed and is not used by scheduling: `D2.run()` drives work by operators only; it doesn’t look at readers. See src/d2.ts:61 and src/d2.ts:65.
- Readers themselves only buffer for the parties that explicitly connect. See src/graph.ts:46 (writer creates a queue per reader) and src/graph.ts:19 (reader drains and clears).

**Can we remove them safely?**

Yes. We removed the automatic “public” readers and everything still works:

- No change to the surface API or types.
- All tests still pass (247/247).
- Memory usage improves dramatically because we no longer create undrained queues holding every message.
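The scheduling claim above ("`D2.run()` drives work by operators only; it doesn't look at readers") can be sketched as a tiny fixpoint loop. The names (`runToFixpoint`, `hasPendingInput`, `makeForwarder`) are hypothetical, not the library's real API; the shape is what matters: the loop polls operators for pending input and runs them, and graph-level readers play no part in it.

```typescript
// Hypothetical model of an operator-driven scheduler.
interface Operator {
  hasPendingInput(): boolean
  run(): void
}

// Run operators until none of them has pending input.
function runToFixpoint(operators: Operator[]): void {
  let didWork = true
  while (didWork) {
    didWork = false
    for (const op of operators) {
      if (op.hasPendingInput()) {
        op.run() // process one pending message
        didWork = true
      }
    }
  }
}

// A toy operator that forwards numbers from an input queue to an output array.
function makeForwarder(input: number[], output: number[]): Operator {
  return {
    hasPendingInput: () => input.length > 0,
    run: () => {
      const msg = input.shift()
      if (msg !== undefined) output.push(msg)
    },
  }
}

const inbox = [1, 2, 3]
const sink: number[] = []
runToFixpoint([makeForwarder(inbox, sink)])
console.log(sink) // → [1, 2, 3]
```

Because the loop only ever inspects operator inputs, any queue that exists solely for a never-polled reader is invisible to it, which is exactly why the dangling "public" readers went unnoticed while silently retaining messages.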
**Behavioral semantics after removal**

- Streams without a consumer: if you don’t connect a reader (or end in a sink), outputs are dropped — which matches normal dataflow semantics. Previously, they were buffered in invisible, undrained queues (the source of the memory growth).
- How to consume results explicitly:

  ```ts
  const reader = output.connectReader()
  graph.run()
  const messages = reader.drain()
  ```

  This is consistent with the library’s public contract — users already needed to attach readers for data they wanted to materialize.

**Key references**

- src/graph.ts:36 — writer enqueues to each reader’s queue.
- src/graph.ts:19 — reader `drain()` clears its queue.
- src/d2.ts:61 and src/d2.ts:65 — the scheduler only asks operators if they have pending input and runs them; it never reads “graph-level” readers.
- src/d2.ts:81 — `StreamBuilder.connectReader()` is the supported way to consume a stream.

**Bottom line**

The “public” readers were not part of the usable public API and weren’t used by the engine. They created undrained queues that retained all messages. Removing them is safe and correct. If you want materialized outputs, attach a reader explicitly or use a sink operator. If you want a convenience `materialize()` operator that returns a handle, we can add one as a follow-up.
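The `materialize()` convenience floated above could look something like the following. This is purely a hypothetical sketch, not part of the library: `MessageReader` and `Stream` are stand-in interfaces for whatever `connectReader()` actually returns, and the function simply wraps the existing public contract (attach a reader, drain it) behind a handle.

```typescript
// Stand-in interfaces; the real library types would be used instead.
interface MessageReader<T> {
  drain(): T[]
}

interface Stream<T> {
  connectReader(): MessageReader<T>
}

// Hypothetical convenience: attach a reader and expose an accumulating handle.
function materialize<T>(stream: Stream<T>): { read(): T[] } {
  const reader = stream.connectReader()
  const seen: T[] = []
  return {
    // Pulls anything newly queued, then returns everything seen so far.
    read(): T[] {
      seen.push(...reader.drain())
      return seen.slice()
    },
  }
}

// Usage against a fake in-memory stream:
const pending: number[] = []
const fakeStream: Stream<number> = {
  connectReader: () => ({
    drain: () => pending.splice(0, pending.length),
  }),
}

const handle = materialize(fakeStream)
pending.push(1, 2)
console.log(handle.read()) // → [1, 2]
pending.push(3)
console.log(handle.read()) // → [1, 2, 3]
```

Because the handle drains on demand rather than buffering invisibly, it keeps the explicit-consumption semantics while offering a friendlier surface.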
🦋 Changeset detected

Latest commit: 0c85527

The changes in this PR will be included in the next version bump. This PR includes changesets to release 11 packages:
@tanstack/angular-db
@tanstack/db
@tanstack/db-ivm
@tanstack/electric-db-collection
@tanstack/query-db-collection
@tanstack/react-db
@tanstack/rxdb-db-collection
@tanstack/solid-db
@tanstack/svelte-db
@tanstack/trailbase-db-collection
@tanstack/vue-db
Size Change: 0 B · Total Size: 66.6 kB

Size Change: 0 B · Total Size: 1.18 kB
So stoked for this! Reduced memory from ~6GB to ~200MB in one of my tests.
@samwillis could you explain what was causing the memory leak? I see that you removed the …
@kevin-dp we were adding an output from each operator which would accumulate messages into a list in the graph; these outputs were never drained. It was a remnant of how things worked in very early D2TS that was overlooked.
Co-authored-by: Søren Bramer Schmidt <sorenbs@gmail.com>
Thanks to @sorenbs for finding and fixing this.