diff --git a/.changeset/empty-poets-wait.md b/.changeset/empty-poets-wait.md new file mode 100644 index 000000000..5c9004943 --- /dev/null +++ b/.changeset/empty-poets-wait.md @@ -0,0 +1,5 @@ +--- +"@tanstack/db-ivm": patch +--- + +Fix memory leak that results in linear memory growth with incremental changes over time. Thanks to @sorenbs for finding and fixing this. diff --git a/packages/db-ivm/src/d2.ts b/packages/db-ivm/src/d2.ts index fea5f418f..b3149de73 100644 --- a/packages/db-ivm/src/d2.ts +++ b/packages/db-ivm/src/d2.ts @@ -8,7 +8,6 @@ import type { MultiSet, MultiSetArray } from "./multiset.js" import type { ID2, IStreamBuilder, PipedOperator } from "./types.js" export class D2 implements ID2 { - #streams: Array> = [] #operators: Array | BinaryOperator> = [] #nextOperatorId = 0 #finalized = false @@ -31,7 +30,6 @@ export class D2 implements ID2 { const writer = new DifferenceStreamWriter() // Use the root stream builder that exposes the sendData and sendFrontier methods const streamBuilder = new RootStreamBuilder(this, writer) - this.#streams.push(streamBuilder.connectReader()) return streamBuilder } @@ -40,11 +38,6 @@ export class D2 implements ID2 { this.#operators.push(operator) } - addStream(stream: DifferenceStreamReader): void { - this.#checkNotFinalized() - this.#streams.push(stream) - } - finalize() { this.#checkNotFinalized() this.#finalized = true diff --git a/packages/db-ivm/src/operators/concat.ts b/packages/db-ivm/src/operators/concat.ts index fb9f9b053..075320842 100644 --- a/packages/db-ivm/src/operators/concat.ts +++ b/packages/db-ivm/src/operators/concat.ts @@ -39,7 +39,6 @@ export function concat( output.writer ) stream.graph.addOperator(operator) - stream.graph.addStream(output.connectReader()) return output } } diff --git a/packages/db-ivm/src/operators/consolidate.ts b/packages/db-ivm/src/operators/consolidate.ts index 2fc3a0973..cb3b3c492 100644 --- a/packages/db-ivm/src/operators/consolidate.ts +++ b/packages/db-ivm/src/operators/consolidate.ts @@ -44,7 +44,6 @@ export function consolidate(): PipedOperator { output.writer ) stream.graph.addOperator(operator) - stream.graph.addStream(output.connectReader()) return output } } diff --git a/packages/db-ivm/src/operators/count.ts b/packages/db-ivm/src/operators/count.ts index 80af798a3..c54ed6234 100644 --- a/packages/db-ivm/src/operators/count.ts +++ b/packages/db-ivm/src/operators/count.ts @@ -46,7 +46,6 @@ export function count< output.writer ) stream.graph.addOperator(operator) - stream.graph.addStream(output.connectReader()) return output } } diff --git a/packages/db-ivm/src/operators/debug.ts b/packages/db-ivm/src/operators/debug.ts index f6646f5ac..0d660c95f 100644 --- a/packages/db-ivm/src/operators/debug.ts +++ b/packages/db-ivm/src/operators/debug.ts @@ -52,7 +52,6 @@ export function debug( indent ) stream.graph.addOperator(operator) - stream.graph.addStream(output.connectReader()) return output } } diff --git a/packages/db-ivm/src/operators/distinct.ts b/packages/db-ivm/src/operators/distinct.ts index 851f3e84a..dc2f5a177 100644 --- a/packages/db-ivm/src/operators/distinct.ts +++ b/packages/db-ivm/src/operators/distinct.ts @@ -92,7 +92,6 @@ export function distinct(by: (value: T) => any = (value: T) => value) { by ) stream.graph.addOperator(operator) - stream.graph.addStream(output.connectReader()) return output } } diff --git a/packages/db-ivm/src/operators/filter.ts b/packages/db-ivm/src/operators/filter.ts index b7cb7574c..b34777505 100644 --- a/packages/db-ivm/src/operators/filter.ts +++ b/packages/db-ivm/src/operators/filter.ts @@ -42,7 +42,6 @@ export function filter(f: (data: T) => boolean): PipedOperator { f ) stream.graph.addOperator(operator) - stream.graph.addStream(output.connectReader()) return output } } diff --git a/packages/db-ivm/src/operators/join.ts b/packages/db-ivm/src/operators/join.ts index 259cfbc05..96e9ba962 100644 --- a/packages/db-ivm/src/operators/join.ts +++ b/packages/db-ivm/src/operators/join.ts @@ -149,7 +149,6 @@ export function innerJoin< output.writer ) stream.graph.addOperator(operator) - stream.graph.addStream(output.connectReader()) return output } } diff --git a/packages/db-ivm/src/operators/map.ts b/packages/db-ivm/src/operators/map.ts index 467da9b07..2aed5b1f0 100644 --- a/packages/db-ivm/src/operators/map.ts +++ b/packages/db-ivm/src/operators/map.ts @@ -42,7 +42,6 @@ export function map(f: (data: T) => O): PipedOperator { f ) stream.graph.addOperator(operator) - stream.graph.addStream(output.connectReader()) return output } } diff --git a/packages/db-ivm/src/operators/negate.ts b/packages/db-ivm/src/operators/negate.ts index e9dd5f206..8962c210f 100644 --- a/packages/db-ivm/src/operators/negate.ts +++ b/packages/db-ivm/src/operators/negate.ts @@ -27,7 +27,6 @@ export function negate(): PipedOperator { output.writer ) stream.graph.addOperator(operator) - stream.graph.addStream(output.connectReader()) return output } } diff --git a/packages/db-ivm/src/operators/output.ts b/packages/db-ivm/src/operators/output.ts index c6a47d021..5217b3d66 100644 --- a/packages/db-ivm/src/operators/output.ts +++ b/packages/db-ivm/src/operators/output.ts @@ -47,7 +47,6 @@ export function output( fn ) stream.graph.addOperator(operator) - stream.graph.addStream(outputStream.connectReader()) return outputStream } } diff --git a/packages/db-ivm/src/operators/reduce.ts b/packages/db-ivm/src/operators/reduce.ts index 47591feaa..4301dced5 100644 --- a/packages/db-ivm/src/operators/reduce.ts +++ b/packages/db-ivm/src/operators/reduce.ts @@ -118,7 +118,6 @@ export function reduce< f ) stream.graph.addOperator(operator) - stream.graph.addStream(output.connectReader()) return output } } diff --git a/packages/db-ivm/src/operators/tap.ts b/packages/db-ivm/src/operators/tap.ts index fcf08d33c..20c59fd5a 100644 --- a/packages/db-ivm/src/operators/tap.ts +++ b/packages/db-ivm/src/operators/tap.ts @@ -45,7 +45,6 @@ export function tap(f: (data: MultiSet) => void): PipedOperator { f ) stream.graph.addOperator(operator) - stream.graph.addStream(output.connectReader()) return output } } diff --git a/packages/db-ivm/src/operators/topKWithFractionalIndex.ts b/packages/db-ivm/src/operators/topKWithFractionalIndex.ts index bcb680782..81e72b54a 100644 --- a/packages/db-ivm/src/operators/topKWithFractionalIndex.ts +++ b/packages/db-ivm/src/operators/topKWithFractionalIndex.ts @@ -327,7 +327,6 @@ export function topKWithFractionalIndex( opts ) stream.graph.addOperator(operator) - stream.graph.addStream(output.connectReader()) return output } } diff --git a/packages/db-ivm/src/operators/topKWithFractionalIndexBTree.ts b/packages/db-ivm/src/operators/topKWithFractionalIndexBTree.ts index 5f87b6702..7b094d7c4 100644 --- a/packages/db-ivm/src/operators/topKWithFractionalIndexBTree.ts +++ b/packages/db-ivm/src/operators/topKWithFractionalIndexBTree.ts @@ -302,7 +302,6 @@ export function topKWithFractionalIndexBTree( opts ) stream.graph.addOperator(operator) - stream.graph.addStream(output.connectReader()) return output } } diff --git a/packages/db-ivm/src/types.ts b/packages/db-ivm/src/types.ts index fea5a7c6a..b1a8e71d3 100644 --- a/packages/db-ivm/src/types.ts +++ b/packages/db-ivm/src/types.ts @@ -27,7 +27,6 @@ export interface ID2 { getNextOperatorId: () => number newInput: () => IStreamBuilder addOperator: (operator: UnaryOperator | BinaryOperator) => void - addStream: (stream: DifferenceStreamReader) => void finalize: () => void step: () => void }