From b8af72c705b8d9b068729daf8280a074ec337d2f Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Fri, 19 Sep 2025 11:51:32 +0100 Subject: [PATCH 1/7] first refactor --- packages/db-ivm/src/operators/join.ts | 304 +++++++++++++----------- packages/db/src/query/compiler/joins.ts | 2 - 2 files changed, 166 insertions(+), 140 deletions(-) diff --git a/packages/db-ivm/src/operators/join.ts b/packages/db-ivm/src/operators/join.ts index 96e9ba962..1750e8d32 100644 --- a/packages/db-ivm/src/operators/join.ts +++ b/packages/db-ivm/src/operators/join.ts @@ -2,9 +2,6 @@ import { BinaryOperator, DifferenceStreamWriter } from "../graph.js" import { StreamBuilder } from "../d2.js" import { MultiSet } from "../multiset.js" import { Index } from "../indexes.js" -import { negate } from "./negate.js" -import { map } from "./map.js" -import { concat } from "./concat.js" import type { DifferenceStreamReader } from "../graph.js" import type { IStreamBuilder, KeyValue, PipedOperator } from "../types.js" @@ -14,66 +11,174 @@ import type { IStreamBuilder, KeyValue, PipedOperator } from "../types.js" export type JoinType = `inner` | `left` | `right` | `full` | `anti` /** - * Operator that joins two input streams + * Helper to build delta index and mass map from messages + */ +function buildDelta(messages: Array): [Index, Map] { + const delta = new Index() + const deltaMass = new Map() + + for (const message of messages) { + const multiSetMessage = message as unknown as MultiSet<[K, V]> + for (const [item, multiplicity] of multiSetMessage.getInner()) { + const [key, value] = item + delta.addValue(key, [value, multiplicity]) + deltaMass.set(key, (deltaMass.get(key) || 0) + multiplicity) + } + } + + return [delta, deltaMass] +} + +/** + * Check if a key has presence (non-zero mass) + */ +function hasPresence(mass: Map, key: K): boolean { + return (mass.get(key) || 0) !== 0 +} + +/** + * Operator that joins two input streams using direct join algorithms */ export class JoinOperator extends 
BinaryOperator< - [K, V1] | [K, V2] | [K, [V1, V2]] + [K, V1] | [K, V2] | [K, [V1, V2]] | [K, [V1 | null, V2 | null]] > { #indexA = new Index() #indexB = new Index() + #massA = new Map() // sum of multiplicities per key on side A + #massB = new Map() // sum of multiplicities per key on side B + #mode: JoinType constructor( id: number, inputA: DifferenceStreamReader<[K, V1]>, inputB: DifferenceStreamReader<[K, V2]>, - output: DifferenceStreamWriter<[K, [V1, V2]]> + output: DifferenceStreamWriter, + mode: JoinType = 'inner' ) { super(id, inputA, inputB, output) + this.#mode = mode } run(): void { - const deltaA = new Index() - const deltaB = new Index() + const start = performance.now() + // 1) Ingest messages and build deltas (no state mutation yet) + const [deltaA, deltaMassA] = buildDelta(this.inputAMessages()) + const [deltaB, deltaMassB] = buildDelta(this.inputBMessages()) - // Process input A - process ALL messages, not just the first one - const messagesA = this.inputAMessages() - for (const message of messagesA) { - const multiSetMessage = message as unknown as MultiSet<[K, V1]> - for (const [item, multiplicity] of multiSetMessage.getInner()) { - const [key, value] = item - deltaA.addValue(key, [value, multiplicity]) - } + const results = new MultiSet() + + // 2) INNER part (used by inner/left/right/full, but NOT anti) + if (this.#mode === 'inner' || this.#mode === 'left' || this.#mode === 'right' || this.#mode === 'full') { + // Emit deltaA ⋈ indexB + results.extend(deltaA.join(this.#indexB)) + + // Create logical indexA ⊎ deltaA and join with deltaB + const tempIndexA = new Index() + tempIndexA.append(this.#indexA) + tempIndexA.append(deltaA) + results.extend(tempIndexA.join(deltaB)) } - // Process input B - process ALL messages, not just the first one - const messagesB = this.inputBMessages() - for (const message of messagesB) { - const multiSetMessage = message as unknown as MultiSet<[K, V2]> - for (const [item, multiplicity] of 
multiSetMessage.getInner()) { - const [key, value] = item - deltaB.addValue(key, [value, multiplicity]) + // 3) OUTER/ANTI specifics + + // LEFT side nulls or anti-left (depend only on B's presence) + if (this.#mode === 'left' || this.#mode === 'full' || this.#mode === 'anti') { + // 3a) New/deleted left rows that are currently unmatched + // For initial state, check final presence after applying deltaB + for (const [key, valueIterator] of deltaA.entriesIterators()) { + const finalMassB = (this.#massB.get(key) || 0) + (deltaMassB.get(key) || 0) + if (finalMassB === 0) { + for (const [value, multiplicity] of valueIterator) { + if (multiplicity !== 0) { + results.extend([[[key, [value, null]], multiplicity]]) + } + } + } + } + + // 3b) Right-side presence transitions flip match status for *current* left rows + for (const key of deltaMassB.keys()) { + const wasEmpty = !hasPresence(this.#massB, key) + const currentMass = this.#massB.get(key) || 0 + const deltaMass = deltaMassB.get(key) || 0 + const willEmpty = (currentMass + deltaMass) === 0 + + if (wasEmpty && !willEmpty) { + // B: 0 -> >0 — retract previously unmatched left-at-k + for (const [value, multiplicity] of this.#indexA.getIterator(key)) { + if (multiplicity !== 0) { + results.extend([[[key, [value, null]], -multiplicity]]) + } + } + } else if (!wasEmpty && willEmpty) { + // B: >0 -> 0 — emit left-at-k as unmatched + for (const [value, multiplicity] of this.#indexA.getIterator(key)) { + if (multiplicity !== 0) { + results.extend([[[key, [value, null]], multiplicity]]) + } + } + } } } - // Process results - const results = new MultiSet<[K, [V1, V2]]>() + // RIGHT side nulls (depend only on A's presence) + if (this.#mode === 'right' || this.#mode === 'full') { + // 3a) New/deleted right rows that are currently unmatched + // For initial state, check final presence after applying deltaA + for (const [key, valueIterator] of deltaB.entriesIterators()) { + const finalMassA = (this.#massA.get(key) || 0) + 
(deltaMassA.get(key) || 0) + if (finalMassA === 0) { + for (const [value, multiplicity] of valueIterator) { + if (multiplicity !== 0) { + results.extend([[[key, [null, value]], multiplicity]]) + } + } + } + } - // Join deltaA with existing indexB - results.extend(deltaA.join(this.#indexB)) + // 3b) Left-side presence transitions flip match status for *current* right rows + for (const key of deltaMassA.keys()) { + const wasEmpty = !hasPresence(this.#massA, key) + const currentMass = this.#massA.get(key) || 0 + const deltaMass = deltaMassA.get(key) || 0 + const willEmpty = (currentMass + deltaMass) === 0 - // Append deltaA to indexA - this.#indexA.append(deltaA) + if (wasEmpty && !willEmpty) { + // A: 0 -> >0 — retract previously unmatched right-at-k + for (const [value, multiplicity] of this.#indexB.getIterator(key)) { + if (multiplicity !== 0) { + results.extend([[[key, [null, value]], -multiplicity]]) + } + } + } else if (!wasEmpty && willEmpty) { + // A: >0 -> 0 — emit right-at-k as unmatched + for (const [value, multiplicity] of this.#indexB.getIterator(key)) { + if (multiplicity !== 0) { + results.extend([[[key, [null, value]], multiplicity]]) + } + } + } + } + } - // Join existing indexA with deltaB - results.extend(this.#indexA.join(deltaB)) + // 4) Commit — update state + this.#indexA.append(deltaA) + this.#indexB.append(deltaB) + + // Update masses + for (const [key, deltaMass] of deltaMassA) { + this.#massA.set(key, (this.#massA.get(key) || 0) + deltaMass) + } + for (const [key, deltaMass] of deltaMassB) { + this.#massB.set(key, (this.#massB.get(key) || 0) + deltaMass) + } // Send results if (results.getInner().length > 0) { this.output.sendData(results) } - - // Append deltaB to indexB - this.#indexB.append(deltaB) + const end = performance.now() + console.log(`join took ${end - start}ms`) } } @@ -91,39 +196,28 @@ export function join< other: IStreamBuilder>, type: JoinType = `inner` ): PipedOperator> { - switch (type) { - case `inner`: - return 
innerJoin(other) as unknown as PipedOperator< - T, - KeyValue - > - case `anti`: - return antiJoin(other) as unknown as PipedOperator< - T, - KeyValue - > - case `left`: - return leftJoin(other) as unknown as PipedOperator< - T, - KeyValue - > - case `right`: - return rightJoin(other) as unknown as PipedOperator< - T, - KeyValue - > - case `full`: - return fullJoin(other) as unknown as PipedOperator< - T, - KeyValue - > - default: - throw new Error(`Join type ${type} is invalid`) + return (stream: IStreamBuilder): IStreamBuilder> => { + if (stream.graph !== other.graph) { + throw new Error(`Cannot join streams from different graphs`) + } + const output = new StreamBuilder>( + stream.graph, + new DifferenceStreamWriter>() + ) + const operator = new JoinOperator( + stream.graph.getNextOperatorId(), + stream.connectReader() as DifferenceStreamReader>, + other.connectReader(), + output.writer, + type + ) + stream.graph.addOperator(operator) + return output } } /** - * Joins two input streams + * Joins two input streams (inner join) * @param other - The other stream to join with */ export function innerJoin< @@ -134,27 +228,11 @@ export function innerJoin< >( other: IStreamBuilder> ): PipedOperator> { - return (stream: IStreamBuilder): IStreamBuilder> => { - if (stream.graph !== other.graph) { - throw new Error(`Cannot join streams from different graphs`) - } - const output = new StreamBuilder>( - stream.graph, - new DifferenceStreamWriter>() - ) - const operator = new JoinOperator( - stream.graph.getNextOperatorId(), - stream.connectReader() as DifferenceStreamReader>, - other.connectReader(), - output.writer - ) - stream.graph.addOperator(operator) - return output - } + return join(other, 'inner') as unknown as PipedOperator> } /** - * Joins two input streams + * Joins two input streams (anti join) * @param other - The other stream to join with */ export function antiJoin< @@ -165,24 +243,11 @@ export function antiJoin< >( other: IStreamBuilder> ): PipedOperator> { - 
return ( - stream: IStreamBuilder - ): IStreamBuilder> => { - const matchedLeft = stream.pipe( - innerJoin(other), - map(([key, [valueLeft, _valueRight]]) => [key, valueLeft]) - ) - const anti = stream.pipe( - concat(matchedLeft.pipe(negate())), - // @ts-ignore TODO: fix this - map(([key, value]) => [key, [value, null]]) - ) - return anti as IStreamBuilder> - } + return join(other, 'anti') as unknown as PipedOperator> } /** - * Joins two input streams + * Joins two input streams (left join) * @param other - The other stream to join with */ export function leftJoin< @@ -193,21 +258,11 @@ export function leftJoin< >( other: IStreamBuilder> ): PipedOperator> { - return ( - stream: IStreamBuilder - ): IStreamBuilder> => { - const left = stream - const right = other - const inner = left.pipe(innerJoin(right)) - const anti = left.pipe(antiJoin(right)) - return inner.pipe(concat(anti)) as IStreamBuilder< - KeyValue - > - } + return join(other, 'left') as unknown as PipedOperator> } /** - * Joins two input streams + * Joins two input streams (right join) * @param other - The other stream to join with */ export function rightJoin< @@ -218,24 +273,11 @@ export function rightJoin< >( other: IStreamBuilder> ): PipedOperator> { - return ( - stream: IStreamBuilder - ): IStreamBuilder> => { - const left = stream as IStreamBuilder> - const right = other - const inner = left.pipe(innerJoin(right)) - const anti = right.pipe( - antiJoin(left), - map(([key, [a, b]]) => [key, [b, a]]) - ) - return inner.pipe(concat(anti)) as IStreamBuilder< - KeyValue - > - } + return join(other, 'right') as unknown as PipedOperator> } /** - * Joins two input streams + * Joins two input streams (full join) * @param other - The other stream to join with */ export function fullJoin< @@ -246,19 +288,5 @@ export function fullJoin< >( other: IStreamBuilder> ): PipedOperator> { - return ( - stream: IStreamBuilder - ): IStreamBuilder> => { - const left = stream as IStreamBuilder> - const right = other - const 
inner = left.pipe(innerJoin(right)) - const antiLeft = left.pipe(antiJoin(right)) - const antiRight = right.pipe( - antiJoin(left), - map(([key, [a, b]]) => [key, [b, a]]) - ) - return inner.pipe(concat(antiLeft), concat(antiRight)) as IStreamBuilder< - KeyValue - > - } + return join(other, 'full') as unknown as PipedOperator> } diff --git a/packages/db/src/query/compiler/joins.ts b/packages/db/src/query/compiler/joins.ts index 1beb93728..b9d6210b2 100644 --- a/packages/db/src/query/compiler/joins.ts +++ b/packages/db/src/query/compiler/joins.ts @@ -1,5 +1,4 @@ import { - consolidate, filter, join as joinOperator, map, @@ -290,7 +289,6 @@ function processJoin( return mainPipeline.pipe( joinOperator(joinedPipeline, joinClause.type as JoinType), - consolidate(), processJoinResults(joinClause.type) ) } From 5d05015d07ee87462aa82d5360caf88170698326 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Fri, 19 Sep 2025 12:00:31 +0100 Subject: [PATCH 2/7] second pass --- packages/db-ivm/src/multiset.ts | 6 ++ packages/db-ivm/src/operators/join.ts | 132 +++++++++++++++----------- 2 files changed, 83 insertions(+), 55 deletions(-) diff --git a/packages/db-ivm/src/multiset.ts b/packages/db-ivm/src/multiset.ts index 1e793345e..44ba297ed 100644 --- a/packages/db-ivm/src/multiset.ts +++ b/packages/db-ivm/src/multiset.ts @@ -209,6 +209,12 @@ export class MultiSet { chunkedArrayPush(this.#inner, otherArray) } + add(item: T, multiplicity: number): void { + if (multiplicity !== 0) { + this.#inner.push([item, multiplicity]) + } + } + getInner(): MultiSetArray { return this.#inner } diff --git a/packages/db-ivm/src/operators/join.ts b/packages/db-ivm/src/operators/join.ts index 1750e8d32..e1ee243ca 100644 --- a/packages/db-ivm/src/operators/join.ts +++ b/packages/db-ivm/src/operators/join.ts @@ -13,27 +13,22 @@ export type JoinType = `inner` | `left` | `right` | `full` | `anti` /** * Helper to build delta index and mass map from messages */ -function buildDelta(messages: Array): [Index, 
Map] { +function buildDelta( + messages: Array +): [Index, Map] { const delta = new Index() const deltaMass = new Map() - + for (const message of messages) { - const multiSetMessage = message as unknown as MultiSet<[K, V]> + const multiSetMessage = message as MultiSet<[K, V]> for (const [item, multiplicity] of multiSetMessage.getInner()) { const [key, value] = item delta.addValue(key, [value, multiplicity]) deltaMass.set(key, (deltaMass.get(key) || 0) + multiplicity) } } - - return [delta, deltaMass] -} -/** - * Check if a key has presence (non-zero mass) - */ -function hasPresence(mass: Map, key: K): boolean { - return (mass.get(key) || 0) !== 0 + return [delta, deltaMass] } /** @@ -53,14 +48,13 @@ export class JoinOperator extends BinaryOperator< inputA: DifferenceStreamReader<[K, V1]>, inputB: DifferenceStreamReader<[K, V2]>, output: DifferenceStreamWriter, - mode: JoinType = 'inner' + mode: JoinType = `inner` ) { super(id, inputA, inputB, output) this.#mode = mode } run(): void { - const start = performance.now() // 1) Ingest messages and build deltas (no state mutation yet) const [deltaA, deltaMassA] = buildDelta(this.inputAMessages()) const [deltaB, deltaMassB] = buildDelta(this.inputBMessages()) @@ -68,53 +62,61 @@ export class JoinOperator extends BinaryOperator< const results = new MultiSet() // 2) INNER part (used by inner/left/right/full, but NOT anti) - if (this.#mode === 'inner' || this.#mode === 'left' || this.#mode === 'right' || this.#mode === 'full') { - // Emit deltaA ⋈ indexB + if ( + this.#mode === `inner` || + this.#mode === `left` || + this.#mode === `right` || + this.#mode === `full` + ) { + // Emit the three standard delta terms: ΔA⋈B_old, A_old⋈ΔB, ΔA⋈ΔB + // This avoids copying the entire left index each tick results.extend(deltaA.join(this.#indexB)) - - // Create logical indexA ⊎ deltaA and join with deltaB - const tempIndexA = new Index() - tempIndexA.append(this.#indexA) - tempIndexA.append(deltaA) - 
results.extend(tempIndexA.join(deltaB)) + results.extend(this.#indexA.join(deltaB)) + results.extend(deltaA.join(deltaB)) } // 3) OUTER/ANTI specifics // LEFT side nulls or anti-left (depend only on B's presence) - if (this.#mode === 'left' || this.#mode === 'full' || this.#mode === 'anti') { - // 3a) New/deleted left rows that are currently unmatched + if ( + this.#mode === `left` || + this.#mode === `full` || + this.#mode === `anti` + ) { + // 3a) New/deleted left rows that are currently unmatched // For initial state, check final presence after applying deltaB for (const [key, valueIterator] of deltaA.entriesIterators()) { - const finalMassB = (this.#massB.get(key) || 0) + (deltaMassB.get(key) || 0) + const finalMassB = + (this.#massB.get(key) || 0) + (deltaMassB.get(key) || 0) if (finalMassB === 0) { for (const [value, multiplicity] of valueIterator) { if (multiplicity !== 0) { - results.extend([[[key, [value, null]], multiplicity]]) + results.add([key, [value, null]], multiplicity) } } } } // 3b) Right-side presence transitions flip match status for *current* left rows - for (const key of deltaMassB.keys()) { - const wasEmpty = !hasPresence(this.#massB, key) - const currentMass = this.#massB.get(key) || 0 - const deltaMass = deltaMassB.get(key) || 0 - const willEmpty = (currentMass + deltaMass) === 0 + for (const [key, deltaMass] of deltaMassB) { + const before = this.#massB.get(key) || 0 + const after = before + deltaMass + + // Skip if presence doesn't flip (0->0, >0->different>0) + if ((before === 0) === (after === 0)) continue - if (wasEmpty && !willEmpty) { + if (before === 0 && after !== 0) { // B: 0 -> >0 — retract previously unmatched left-at-k for (const [value, multiplicity] of this.#indexA.getIterator(key)) { if (multiplicity !== 0) { - results.extend([[[key, [value, null]], -multiplicity]]) + results.add([key, [value, null]], -multiplicity) } } - } else if (!wasEmpty && willEmpty) { + } else if (before !== 0 && after === 0) { // B: >0 -> 0 — emit 
left-at-k as unmatched for (const [value, multiplicity] of this.#indexA.getIterator(key)) { if (multiplicity !== 0) { - results.extend([[[key, [value, null]], multiplicity]]) + results.add([key, [value, null]], multiplicity) } } } @@ -122,39 +124,41 @@ export class JoinOperator extends BinaryOperator< } // RIGHT side nulls (depend only on A's presence) - if (this.#mode === 'right' || this.#mode === 'full') { + if (this.#mode === `right` || this.#mode === `full`) { // 3a) New/deleted right rows that are currently unmatched // For initial state, check final presence after applying deltaA for (const [key, valueIterator] of deltaB.entriesIterators()) { - const finalMassA = (this.#massA.get(key) || 0) + (deltaMassA.get(key) || 0) + const finalMassA = + (this.#massA.get(key) || 0) + (deltaMassA.get(key) || 0) if (finalMassA === 0) { for (const [value, multiplicity] of valueIterator) { if (multiplicity !== 0) { - results.extend([[[key, [null, value]], multiplicity]]) + results.add([key, [null, value]], multiplicity) } } } } // 3b) Left-side presence transitions flip match status for *current* right rows - for (const key of deltaMassA.keys()) { - const wasEmpty = !hasPresence(this.#massA, key) - const currentMass = this.#massA.get(key) || 0 - const deltaMass = deltaMassA.get(key) || 0 - const willEmpty = (currentMass + deltaMass) === 0 + for (const [key, deltaMass] of deltaMassA) { + const before = this.#massA.get(key) || 0 + const after = before + deltaMass - if (wasEmpty && !willEmpty) { + // Skip if presence doesn't flip (0->0, >0->different>0) + if ((before === 0) === (after === 0)) continue + + if (before === 0 && after !== 0) { // A: 0 -> >0 — retract previously unmatched right-at-k for (const [value, multiplicity] of this.#indexB.getIterator(key)) { if (multiplicity !== 0) { - results.extend([[[key, [null, value]], -multiplicity]]) + results.add([key, [null, value]], -multiplicity) } } - } else if (!wasEmpty && willEmpty) { + } else if (before !== 0 && after === 0) 
{ // A: >0 -> 0 — emit right-at-k as unmatched for (const [value, multiplicity] of this.#indexB.getIterator(key)) { if (multiplicity !== 0) { - results.extend([[[key, [null, value]], multiplicity]]) + results.add([key, [null, value]], multiplicity) } } } @@ -162,9 +166,12 @@ export class JoinOperator extends BinaryOperator< } // 4) Commit — update state + // IMPORTANT: All emissions use pre-append snapshots of indexA/indexB. + // For unmatched-on-delta (3a), use final presence (mass + deltaMass) to avoid churn. + // Append deltas and update masses only after all emissions. this.#indexA.append(deltaA) this.#indexB.append(deltaB) - + // Update masses for (const [key, deltaMass] of deltaMassA) { this.#massA.set(key, (this.#massA.get(key) || 0) + deltaMass) @@ -177,8 +184,6 @@ export class JoinOperator extends BinaryOperator< if (results.getInner().length > 0) { this.output.sendData(results) } - const end = performance.now() - console.log(`join took ${end - start}ms`) } } @@ -196,7 +201,9 @@ export function join< other: IStreamBuilder>, type: JoinType = `inner` ): PipedOperator> { - return (stream: IStreamBuilder): IStreamBuilder> => { + return ( + stream: IStreamBuilder + ): IStreamBuilder> => { if (stream.graph !== other.graph) { throw new Error(`Cannot join streams from different graphs`) } @@ -228,7 +235,10 @@ export function innerJoin< >( other: IStreamBuilder> ): PipedOperator> { - return join(other, 'inner') as unknown as PipedOperator> + return join(other, `inner`) as unknown as PipedOperator< + T, + KeyValue + > } /** @@ -243,7 +253,10 @@ export function antiJoin< >( other: IStreamBuilder> ): PipedOperator> { - return join(other, 'anti') as unknown as PipedOperator> + return join(other, `anti`) as unknown as PipedOperator< + T, + KeyValue + > } /** @@ -258,7 +271,10 @@ export function leftJoin< >( other: IStreamBuilder> ): PipedOperator> { - return join(other, 'left') as unknown as PipedOperator> + return join(other, `left`) as unknown as PipedOperator< + T, + 
KeyValue + > } /** @@ -273,7 +289,10 @@ export function rightJoin< >( other: IStreamBuilder> ): PipedOperator> { - return join(other, 'right') as unknown as PipedOperator> + return join(other, `right`) as unknown as PipedOperator< + T, + KeyValue + > } /** @@ -288,5 +307,8 @@ export function fullJoin< >( other: IStreamBuilder> ): PipedOperator> { - return join(other, 'full') as unknown as PipedOperator> + return join(other, `full`) as unknown as PipedOperator< + T, + KeyValue + > } From 6bad4bff25d90559beb543e1438ee8e4bed623d0 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Fri, 19 Sep 2025 12:50:20 +0100 Subject: [PATCH 3/7] futher optimisations --- packages/db-ivm/src/operators/join.ts | 219 +++++++++++++++++--------- 1 file changed, 147 insertions(+), 72 deletions(-) diff --git a/packages/db-ivm/src/operators/join.ts b/packages/db-ivm/src/operators/join.ts index e1ee243ca..73ee08ba5 100644 --- a/packages/db-ivm/src/operators/join.ts +++ b/packages/db-ivm/src/operators/join.ts @@ -1,3 +1,53 @@ +/** + * # Direct Join Algorithms for Incremental View Maintenance + * + * High-performance join operations implementing all join types (inner, left, right, full, anti) + * with minimal state and optimized performance. + * + * ## Algorithm + * + * For each tick, the algorithm processes incoming changes (deltas) and emits join results: + * + * 1. **Build deltas**: Extract new/changed/deleted rows from input messages + * 2. **Inner results**: Emit `ΔA⋈B_old + A_old⋈ΔB + ΔA⋈ΔB` (matched pairs) + * 3. **Outer results**: For unmatched rows, emit null-extended tuples: + * - New unmatched rows from deltas (when opposite side empty) + * - Presence transitions: when key goes `0→>0` (retract nulls) or `>0→0` (emit nulls) + * 4. **Update state**: Append deltas to indexes and update mass counters + * + * **Mass tracking** enables O(1) presence checks instead of scanning index buckets. 
+ * + * ## State + * + * **Indexes** store the actual data: + * - `indexA: Index` - all left-side rows accumulated over time + * - `indexB: Index` - all right-side rows accumulated over time + * + * **Mass maps** track presence efficiently: + * - `massA/massB: Map` - sum of multiplicities per key + * - Used for O(1) presence checks: `mass.get(key) !== 0` means key exists + * - Avoids scanning entire index buckets just to check if key has any rows + * + * ## Join Types + * + * - **Inner**: Standard delta terms only + * - **Outer**: Inner results + null-extended unmatched rows with transition handling + * - **Anti**: Unmatched rows only (no inner results) + * + * ## Key Optimizations + * + * - **No temp copying**: Uses `(A⊎ΔA)⋈ΔB = A⋈ΔB ⊎ ΔA⋈ΔB` distributive property + * - **Early-out checks**: Skip phases when no deltas present + * - **Zero-entry pruning**: Keep maps compact, O(distinct keys) memory + * - **Final presence logic**: Avoid emit→retract churn within same tick + * + * ## Correctness + * + * - **Ordering**: Pre-append snapshots for emissions, post-emit state updates + * - **Presence**: Key matched iff mass ≠ 0, transitions trigger null handling + * - **Bag semantics**: Proper multiplicity handling including negatives + */ + import { BinaryOperator, DifferenceStreamWriter } from "../graph.js" import { StreamBuilder } from "../d2.js" import { MultiSet } from "../multiset.js" @@ -24,7 +74,14 @@ function buildDelta( for (const [item, multiplicity] of multiSetMessage.getInner()) { const [key, value] = item delta.addValue(key, [value, multiplicity]) - deltaMass.set(key, (deltaMass.get(key) || 0) + multiplicity) + + // Keep deltaMass small by deleting zero entries + const next = (deltaMass.get(key) || 0) + multiplicity + if (next === 0) { + deltaMass.delete(key) + } else { + deltaMass.set(key, next) + } } } @@ -59,64 +116,72 @@ export class JoinOperator extends BinaryOperator< const [deltaA, deltaMassA] = buildDelta(this.inputAMessages()) const [deltaB, 
deltaMassB] = buildDelta(this.inputBMessages()) + // Early-out checks + const hasDeltaA = deltaA.size > 0 + const hasDeltaB = deltaB.size > 0 + const hasDeltaMassA = deltaMassA.size > 0 + const hasDeltaMassB = deltaMassB.size > 0 + + // If nothing happened, bail early + if (!(hasDeltaA || hasDeltaB || hasDeltaMassA || hasDeltaMassB)) return + + // Precompute mode flags to avoid repeated string comparisons + const mode = this.#mode + const emitInner = + mode === `inner` || mode === `left` || mode === `right` || mode === `full` + const emitLeftNulls = mode === `left` || mode === `full` + const emitRightNulls = mode === `right` || mode === `full` + const emitAntiLeft = mode === `anti` + const results = new MultiSet() // 2) INNER part (used by inner/left/right/full, but NOT anti) - if ( - this.#mode === `inner` || - this.#mode === `left` || - this.#mode === `right` || - this.#mode === `full` - ) { - // Emit the three standard delta terms: ΔA⋈B_old, A_old⋈ΔB, ΔA⋈ΔB + if (emitInner && (hasDeltaA || hasDeltaB)) { + // Emit the three standard delta terms: DeltaA⋈B_old, A_old⋈DeltaB, DeltaA⋈DeltaB // This avoids copying the entire left index each tick - results.extend(deltaA.join(this.#indexB)) - results.extend(this.#indexA.join(deltaB)) - results.extend(deltaA.join(deltaB)) + if (hasDeltaA) results.extend(deltaA.join(this.#indexB)) + if (hasDeltaB) results.extend(this.#indexA.join(deltaB)) + if (hasDeltaA && hasDeltaB) results.extend(deltaA.join(deltaB)) } // 3) OUTER/ANTI specifics // LEFT side nulls or anti-left (depend only on B's presence) - if ( - this.#mode === `left` || - this.#mode === `full` || - this.#mode === `anti` - ) { - // 3a) New/deleted left rows that are currently unmatched - // For initial state, check final presence after applying deltaB - for (const [key, valueIterator] of deltaA.entriesIterators()) { - const finalMassB = - (this.#massB.get(key) || 0) + (deltaMassB.get(key) || 0) - if (finalMassB === 0) { - for (const [value, multiplicity] of 
valueIterator) { - if (multiplicity !== 0) { - results.add([key, [value, null]], multiplicity) + if ((emitLeftNulls || emitAntiLeft) && (hasDeltaA || hasDeltaMassB)) { + // 3a) New/deleted left rows that are currently unmatched (only if DeltaA changed) + if (hasDeltaA) { + // For initial state, check final presence after applying deltaB + for (const [key, valueIterator] of deltaA.entriesIterators()) { + const finalMassB = + (this.#massB.get(key) || 0) + (deltaMassB.get(key) || 0) + if (finalMassB === 0) { + for (const [value, multiplicity] of valueIterator) { + if (multiplicity !== 0) { + results.add([key, [value, null]], multiplicity) + } } } } } - // 3b) Right-side presence transitions flip match status for *current* left rows - for (const [key, deltaMass] of deltaMassB) { - const before = this.#massB.get(key) || 0 - const after = before + deltaMass + // 3b) Right-side presence transitions (only if some RHS masses changed) + if (hasDeltaMassB) { + for (const [key, deltaMass] of deltaMassB) { + const before = this.#massB.get(key) || 0 + if (deltaMass === 0) continue + const after = before + deltaMass - // Skip if presence doesn't flip (0->0, >0->different>0) - if ((before === 0) === (after === 0)) continue + // Skip if presence doesn't flip (0->0, >0->different>0) + if ((before === 0) === (after === 0)) continue - if (before === 0 && after !== 0) { - // B: 0 -> >0 — retract previously unmatched left-at-k - for (const [value, multiplicity] of this.#indexA.getIterator(key)) { + const it = this.#indexA.getIterator(key) + const retract = before === 0 // 0->!0 => retract, else (>0->0) emit + for (const [value, multiplicity] of it) { if (multiplicity !== 0) { - results.add([key, [value, null]], -multiplicity) - } - } - } else if (before !== 0 && after === 0) { - // B: >0 -> 0 — emit left-at-k as unmatched - for (const [value, multiplicity] of this.#indexA.getIterator(key)) { - if (multiplicity !== 0) { - results.add([key, [value, null]], multiplicity) + results.add( + 
[key, [value, null]], + retract ? -multiplicity : +multiplicity + ) } } } @@ -124,41 +189,41 @@ export class JoinOperator extends BinaryOperator< } // RIGHT side nulls (depend only on A's presence) - if (this.#mode === `right` || this.#mode === `full`) { - // 3a) New/deleted right rows that are currently unmatched - // For initial state, check final presence after applying deltaA - for (const [key, valueIterator] of deltaB.entriesIterators()) { - const finalMassA = - (this.#massA.get(key) || 0) + (deltaMassA.get(key) || 0) - if (finalMassA === 0) { - for (const [value, multiplicity] of valueIterator) { - if (multiplicity !== 0) { - results.add([key, [null, value]], multiplicity) + if (emitRightNulls && (hasDeltaB || hasDeltaMassA)) { + // 3a) New/deleted right rows that are currently unmatched (only if DeltaB changed) + if (hasDeltaB) { + // For initial state, check final presence after applying deltaA + for (const [key, valueIterator] of deltaB.entriesIterators()) { + const finalMassA = + (this.#massA.get(key) || 0) + (deltaMassA.get(key) || 0) + if (finalMassA === 0) { + for (const [value, multiplicity] of valueIterator) { + if (multiplicity !== 0) { + results.add([key, [null, value]], multiplicity) + } } } } } - // 3b) Left-side presence transitions flip match status for *current* right rows - for (const [key, deltaMass] of deltaMassA) { - const before = this.#massA.get(key) || 0 - const after = before + deltaMass + // 3b) Left-side presence transitions (only if some LHS masses changed) + if (hasDeltaMassA) { + for (const [key, deltaMass] of deltaMassA) { + const before = this.#massA.get(key) || 0 + if (deltaMass === 0) continue + const after = before + deltaMass - // Skip if presence doesn't flip (0->0, >0->different>0) - if ((before === 0) === (after === 0)) continue + // Skip if presence doesn't flip (0->0, >0->different>0) + if ((before === 0) === (after === 0)) continue - if (before === 0 && after !== 0) { - // A: 0 -> >0 — retract previously unmatched 
right-at-k - for (const [value, multiplicity] of this.#indexB.getIterator(key)) { + const it = this.#indexB.getIterator(key) + const retract = before === 0 // 0->!0 => retract, else (>0->0) emit + for (const [value, multiplicity] of it) { if (multiplicity !== 0) { - results.add([key, [null, value]], -multiplicity) - } - } - } else if (before !== 0 && after === 0) { - // A: >0 -> 0 — emit right-at-k as unmatched - for (const [value, multiplicity] of this.#indexB.getIterator(key)) { - if (multiplicity !== 0) { - results.add([key, [null, value]], multiplicity) + results.add( + [key, [null, value]], + retract ? -multiplicity : +multiplicity + ) } } } @@ -172,12 +237,22 @@ export class JoinOperator extends BinaryOperator< this.#indexA.append(deltaA) this.#indexB.append(deltaB) - // Update masses + // Update masses and keep maps small by deleting zero entries for (const [key, deltaMass] of deltaMassA) { - this.#massA.set(key, (this.#massA.get(key) || 0) + deltaMass) + const next = (this.#massA.get(key) || 0) + deltaMass + if (next === 0) { + this.#massA.delete(key) + } else { + this.#massA.set(key, next) + } } for (const [key, deltaMass] of deltaMassB) { - this.#massB.set(key, (this.#massB.get(key) || 0) + deltaMass) + const next = (this.#massB.get(key) || 0) + deltaMass + if (next === 0) { + this.#massB.delete(key) + } else { + this.#massB.set(key, next) + } } // Send results From 2baf4e1e239b4ae5790d0f3f342160067a74e809 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Fri, 19 Sep 2025 12:53:06 +0100 Subject: [PATCH 4/7] changeset --- .changeset/fast-joins-redesign.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/fast-joins-redesign.md diff --git a/.changeset/fast-joins-redesign.md b/.changeset/fast-joins-redesign.md new file mode 100644 index 000000000..ef9e20e35 --- /dev/null +++ b/.changeset/fast-joins-redesign.md @@ -0,0 +1,5 @@ +--- +"@tanstack/db-ivm": patch +--- + +Redesign of the join operators with direct algorithms for major 
performance improvements by replacing composition-based joins (inner+anti) with implementation using mass tracking. Delivers significant performance gains while maintaining full correctness for all join types (inner, left, right, full, anti). From b9cf41195c22028540be8cf579b847233b4c3af7 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Fri, 19 Sep 2025 13:31:42 +0100 Subject: [PATCH 5/7] format --- packages/db/src/query/compiler/joins.ts | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/packages/db/src/query/compiler/joins.ts b/packages/db/src/query/compiler/joins.ts index b9d6210b2..418783983 100644 --- a/packages/db/src/query/compiler/joins.ts +++ b/packages/db/src/query/compiler/joins.ts @@ -1,9 +1,4 @@ -import { - filter, - join as joinOperator, - map, - tap, -} from "@tanstack/db-ivm" +import { filter, join as joinOperator, map, tap } from "@tanstack/db-ivm" import { CollectionInputNotFoundError, InvalidJoinCondition, From 99999584b88807ceee5ce3a4797d2f4f0298327d Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Fri, 10 Oct 2025 13:06:41 +0100 Subject: [PATCH 6/7] Address review --- packages/db-ivm/src/indexes.ts | 54 +++++ packages/db-ivm/src/operators/join.ts | 276 ++++++++++++-------------- 2 files changed, 177 insertions(+), 153 deletions(-) diff --git a/packages/db-ivm/src/indexes.ts b/packages/db-ivm/src/indexes.ts index 3c52614eb..7dcb349ba 100644 --- a/packages/db-ivm/src/indexes.ts +++ b/packages/db-ivm/src/indexes.ts @@ -150,11 +150,30 @@ export class Index { * hash to identify identical values, storing them in a third level value map. */ #inner: IndexMap + #consolidatedMultiplicity: Map = new Map() // sum of multiplicities per key constructor() { this.#inner = new Map() } + /** + * Create an Index from multiple MultiSet messages. + * @param messages - Array of MultiSet messages to build the index from. + * @returns A new Index containing all the data from the messages. 
+ */ + static fromMultiSets(messages: Array>): Index { + const index = new Index() + + for (const message of messages) { + for (const [item, multiplicity] of message.getInner()) { + const [key, value] = item + index.addValue(key, [value, multiplicity]) + } + } + + return index + } + /** * This method returns a string representation of the index. * @param indent - Whether to indent the string representation. @@ -184,6 +203,32 @@ export class Index { return this.#inner.has(key) } + /** + * Check if a key has presence (non-zero consolidated multiplicity). + * @param key - The key to check. + * @returns True if the key has non-zero consolidated multiplicity, false otherwise. + */ + hasPresence(key: TKey): boolean { + return (this.#consolidatedMultiplicity.get(key) || 0) !== 0 + } + + /** + * Get the consolidated multiplicity (sum of multiplicities) for a key. + * @param key - The key to get the consolidated multiplicity for. + * @returns The consolidated multiplicity for the key. + */ + getConsolidatedMultiplicity(key: TKey): number { + return this.#consolidatedMultiplicity.get(key) || 0 + } + + /** + * Get all keys that have presence (non-zero consolidated multiplicity). + * @returns An iterator of keys with non-zero consolidated multiplicity. + */ + getPresenceKeys(): Iterable { + return this.#consolidatedMultiplicity.keys() + } + /** * This method returns all values for a given key. * @param key - The key to get the values for. 
@@ -257,6 +302,15 @@ export class Index { // If the multiplicity is 0, do nothing if (multiplicity === 0) return + // Update consolidated multiplicity tracking + const newConsolidatedMultiplicity = + (this.#consolidatedMultiplicity.get(key) || 0) + multiplicity + if (newConsolidatedMultiplicity === 0) { + this.#consolidatedMultiplicity.delete(key) + } else { + this.#consolidatedMultiplicity.set(key, newConsolidatedMultiplicity) + } + const mapOrSingleValue = this.#inner.get(key) if (mapOrSingleValue === undefined) { diff --git a/packages/db-ivm/src/operators/join.ts b/packages/db-ivm/src/operators/join.ts index 73ee08ba5..dfe5422ee 100644 --- a/packages/db-ivm/src/operators/join.ts +++ b/packages/db-ivm/src/operators/join.ts @@ -8,14 +8,14 @@ * * For each tick, the algorithm processes incoming changes (deltas) and emits join results: * - * 1. **Build deltas**: Extract new/changed/deleted rows from input messages + * 1. **Build deltas**: Create delta indexes from input messages using `Index.fromMultiSets()` * 2. **Inner results**: Emit `ΔA⋈B_old + A_old⋈ΔB + ΔA⋈ΔB` (matched pairs) * 3. **Outer results**: For unmatched rows, emit null-extended tuples: * - New unmatched rows from deltas (when opposite side empty) * - Presence transitions: when key goes `0→>0` (retract nulls) or `>0→0` (emit nulls) - * 4. **Update state**: Append deltas to indexes and update mass counters + * 4. **Update state**: Append deltas to indexes (consolidated multiplicity tracking is automatic) * - * **Mass tracking** enables O(1) presence checks instead of scanning index buckets. + * **Consolidated multiplicity tracking** enables O(1) presence checks instead of scanning index buckets.
* * ## State * @@ -23,9 +23,9 @@ * - `indexA: Index` - all left-side rows accumulated over time * - `indexB: Index` - all right-side rows accumulated over time * - * **Mass maps** track presence efficiently: - * - `massA/massB: Map` - sum of multiplicities per key - * - Used for O(1) presence checks: `mass.get(key) !== 0` means key exists + * **Consolidated multiplicity tracking** (built into Index): + * - Each Index maintains sum of multiplicities per key internally + * - Provides O(1) presence checks: `index.hasPresence(key)` and `index.getConsolidatedMultiplicity(key)` * - Avoids scanning entire index buckets just to check if key has any rows * * ## Join Types @@ -60,34 +60,6 @@ import type { IStreamBuilder, KeyValue, PipedOperator } from "../types.js" */ export type JoinType = `inner` | `left` | `right` | `full` | `anti` -/** - * Helper to build delta index and mass map from messages - */ -function buildDelta( - messages: Array -): [Index, Map] { - const delta = new Index() - const deltaMass = new Map() - - for (const message of messages) { - const multiSetMessage = message as MultiSet<[K, V]> - for (const [item, multiplicity] of multiSetMessage.getInner()) { - const [key, value] = item - delta.addValue(key, [value, multiplicity]) - - // Keep deltaMass small by deleting zero entries - const next = (deltaMass.get(key) || 0) + multiplicity - if (next === 0) { - deltaMass.delete(key) - } else { - deltaMass.set(key, next) - } - } - } - - return [delta, deltaMass] -} - /** * Operator that joins two input streams using direct join algorithms */ @@ -96,8 +68,6 @@ export class JoinOperator extends BinaryOperator< > { #indexA = new Index() #indexB = new Index() - #massA = new Map() // sum of multiplicities per key on side A - #massB = new Map() // sum of multiplicities per key on side B #mode: JoinType constructor( @@ -112,153 +82,153 @@ export class JoinOperator extends BinaryOperator< } run(): void { - // 1) Ingest messages and build deltas (no state mutation yet) - 
const [deltaA, deltaMassA] = buildDelta(this.inputAMessages()) - const [deltaB, deltaMassB] = buildDelta(this.inputBMessages()) - - // Early-out checks - const hasDeltaA = deltaA.size > 0 - const hasDeltaB = deltaB.size > 0 - const hasDeltaMassA = deltaMassA.size > 0 - const hasDeltaMassB = deltaMassB.size > 0 - - // If nothing happened, bail early - if (!(hasDeltaA || hasDeltaB || hasDeltaMassA || hasDeltaMassB)) return + // Build deltas from input messages + const deltaA = Index.fromMultiSets( + this.inputAMessages() as Array> + ) + const deltaB = Index.fromMultiSets( + this.inputBMessages() as Array> + ) - // Precompute mode flags to avoid repeated string comparisons - const mode = this.#mode - const emitInner = - mode === `inner` || mode === `left` || mode === `right` || mode === `full` - const emitLeftNulls = mode === `left` || mode === `full` - const emitRightNulls = mode === `right` || mode === `full` - const emitAntiLeft = mode === `anti` + // Early-out if nothing changed + if (deltaA.size === 0 && deltaB.size === 0) return const results = new MultiSet() - // 2) INNER part (used by inner/left/right/full, but NOT anti) - if (emitInner && (hasDeltaA || hasDeltaB)) { - // Emit the three standard delta terms: DeltaA⋈B_old, A_old⋈DeltaB, DeltaA⋈DeltaB - // This avoids copying the entire left index each tick - if (hasDeltaA) results.extend(deltaA.join(this.#indexB)) - if (hasDeltaB) results.extend(this.#indexA.join(deltaB)) - if (hasDeltaA && hasDeltaB) results.extend(deltaA.join(deltaB)) + // Emit inner results (all modes except anti) + if (this.#mode !== `anti`) { + this.emitInnerResults(deltaA, deltaB, results) } - // 3) OUTER/ANTI specifics + // Emit left outer/anti results + if ( + this.#mode === `left` || + this.#mode === `full` || + this.#mode === `anti` + ) { + this.emitLeftOuterResults(deltaA, deltaB, results) + } - // LEFT side nulls or anti-left (depend only on B's presence) - if ((emitLeftNulls || emitAntiLeft) && (hasDeltaA || hasDeltaMassB)) { - // 
3a) New/deleted left rows that are currently unmatched (only if DeltaA changed) - if (hasDeltaA) { - // For initial state, check final presence after applying deltaB - for (const [key, valueIterator] of deltaA.entriesIterators()) { - const finalMassB = - (this.#massB.get(key) || 0) + (deltaMassB.get(key) || 0) - if (finalMassB === 0) { - for (const [value, multiplicity] of valueIterator) { - if (multiplicity !== 0) { - results.add([key, [value, null]], multiplicity) - } - } - } - } - } + // Emit right outer results + if (this.#mode === `right` || this.#mode === `full`) { + this.emitRightOuterResults(deltaA, deltaB, results) + } + + // Update state and send results + // IMPORTANT: All emissions use pre-append snapshots of indexA/indexB. + // Append deltas to indices (consolidated multiplicity tracking is handled automatically) + this.#indexA.append(deltaA) + this.#indexB.append(deltaB) - // 3b) Right-side presence transitions (only if some RHS masses changed) - if (hasDeltaMassB) { - for (const [key, deltaMass] of deltaMassB) { - const before = this.#massB.get(key) || 0 - if (deltaMass === 0) continue - const after = before + deltaMass + // Send results + if (results.getInner().length > 0) { + this.output.sendData(results) + } + } - // Skip if presence doesn't flip (0->0, >0->different>0) - if ((before === 0) === (after === 0)) continue + private emitInnerResults( + deltaA: Index, + deltaB: Index, + results: MultiSet + ): void { + // Emit the three standard delta terms: ΔA⋈B_old, A_old⋈ΔB, ΔA⋈ΔB + if (deltaA.size > 0) results.extend(deltaA.join(this.#indexB)) + if (deltaB.size > 0) results.extend(this.#indexA.join(deltaB)) + if (deltaA.size > 0 && deltaB.size > 0) results.extend(deltaA.join(deltaB)) + } - const it = this.#indexA.getIterator(key) - const retract = before === 0 // 0->!0 => retract, else (>0->0) emit - for (const [value, multiplicity] of it) { + private emitLeftOuterResults( + deltaA: Index, + deltaB: Index, + results: MultiSet + ): void { + // Emit 
unmatched left rows from deltaA + if (deltaA.size > 0) { + for (const [key, valueIterator] of deltaA.entriesIterators()) { + const currentMultiplicityB = + this.#indexB.getConsolidatedMultiplicity(key) + const deltaMultiplicityB = deltaB.getConsolidatedMultiplicity(key) + const finalMultiplicityB = currentMultiplicityB + deltaMultiplicityB + + if (finalMultiplicityB === 0) { + for (const [value, multiplicity] of valueIterator) { if (multiplicity !== 0) { - results.add( - [key, [value, null]], - retract ? -multiplicity : +multiplicity - ) + results.add([key, [value, null]], multiplicity) } } } } } - // RIGHT side nulls (depend only on A's presence) - if (emitRightNulls && (hasDeltaB || hasDeltaMassA)) { - // 3a) New/deleted right rows that are currently unmatched (only if DeltaB changed) - if (hasDeltaB) { - // For initial state, check final presence after applying deltaA - for (const [key, valueIterator] of deltaB.entriesIterators()) { - const finalMassA = - (this.#massA.get(key) || 0) + (deltaMassA.get(key) || 0) - if (finalMassA === 0) { - for (const [value, multiplicity] of valueIterator) { - if (multiplicity !== 0) { - results.add([key, [null, value]], multiplicity) - } - } + // Handle presence transitions from right side changes + if (deltaB.size > 0) { + for (const key of deltaB.getPresenceKeys()) { + const before = this.#indexB.getConsolidatedMultiplicity(key) + const deltaMult = deltaB.getConsolidatedMultiplicity(key) + if (deltaMult === 0) continue + const after = before + deltaMult + + // Skip if presence doesn't flip + if ((before === 0) === (after === 0)) continue + + const retract = before === 0 // 0->!0 => retract, else (>0->0) emit + for (const [value, multiplicity] of this.#indexA.getIterator(key)) { + if (multiplicity !== 0) { + results.add( + [key, [value, null]], + retract ? 
-multiplicity : +multiplicity + ) } } } + } + } - // 3b) Left-side presence transitions (only if some LHS masses changed) - if (hasDeltaMassA) { - for (const [key, deltaMass] of deltaMassA) { - const before = this.#massA.get(key) || 0 - if (deltaMass === 0) continue - const after = before + deltaMass - - // Skip if presence doesn't flip (0->0, >0->different>0) - if ((before === 0) === (after === 0)) continue - - const it = this.#indexB.getIterator(key) - const retract = before === 0 // 0->!0 => retract, else (>0->0) emit - for (const [value, multiplicity] of it) { + private emitRightOuterResults( + deltaA: Index, + deltaB: Index, + results: MultiSet + ): void { + // Emit unmatched right rows from deltaB + if (deltaB.size > 0) { + for (const [key, valueIterator] of deltaB.entriesIterators()) { + const currentMultiplicityA = + this.#indexA.getConsolidatedMultiplicity(key) + const deltaMultiplicityA = deltaA.getConsolidatedMultiplicity(key) + const finalMultiplicityA = currentMultiplicityA + deltaMultiplicityA + + if (finalMultiplicityA === 0) { + for (const [value, multiplicity] of valueIterator) { if (multiplicity !== 0) { - results.add( - [key, [null, value]], - retract ? -multiplicity : +multiplicity - ) + results.add([key, [null, value]], multiplicity) } } } } } - // 4) Commit — update state - // IMPORTANT: All emissions use pre-append snapshots of indexA/indexB. - // For unmatched-on-delta (3a), use final presence (mass + deltaMass) to avoid churn. - // Append deltas and update masses only after all emissions. 
- this.#indexA.append(deltaA) - this.#indexB.append(deltaB) - - // Update masses and keep maps small by deleting zero entries - for (const [key, deltaMass] of deltaMassA) { - const next = (this.#massA.get(key) || 0) + deltaMass - if (next === 0) { - this.#massA.delete(key) - } else { - this.#massA.set(key, next) - } - } - for (const [key, deltaMass] of deltaMassB) { - const next = (this.#massB.get(key) || 0) + deltaMass - if (next === 0) { - this.#massB.delete(key) - } else { - this.#massB.set(key, next) + // Handle presence transitions from left side changes + if (deltaA.size > 0) { + for (const key of deltaA.getPresenceKeys()) { + const before = this.#indexA.getConsolidatedMultiplicity(key) + const deltaMult = deltaA.getConsolidatedMultiplicity(key) + if (deltaMult === 0) continue + const after = before + deltaMult + + // Skip if presence doesn't flip + if ((before === 0) === (after === 0)) continue + + const retract = before === 0 // 0->!0 => retract, else (>0->0) emit + for (const [value, multiplicity] of this.#indexB.getIterator(key)) { + if (multiplicity !== 0) { + results.add( + [key, [null, value]], + retract ? -multiplicity : +multiplicity + ) + } + } } } - - // Send results - if (results.getInner().length > 0) { - this.output.sendData(results) - } } } From 525621b74489b3677ddca6d04e791abe9d9541af Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Fri, 10 Oct 2025 13:43:40 +0100 Subject: [PATCH 7/7] additional comments --- packages/db-ivm/src/operators/join.ts | 29 ++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/packages/db-ivm/src/operators/join.ts b/packages/db-ivm/src/operators/join.ts index dfe5422ee..36f710b2f 100644 --- a/packages/db-ivm/src/operators/join.ts +++ b/packages/db-ivm/src/operators/join.ts @@ -116,7 +116,8 @@ export class JoinOperator extends BinaryOperator< // Update state and send results // IMPORTANT: All emissions use pre-append snapshots of indexA/indexB. 
- // Append deltas to indices (consolidated multiplicity tracking is handled automatically) + // Now append ALL deltas to indices - this happens unconditionally for every key, + // regardless of whether presence flipped. Consolidated multiplicity tracking is automatic. this.#indexA.append(deltaA) this.#indexB.append(deltaB) @@ -168,15 +169,22 @@ export class JoinOperator extends BinaryOperator< if (deltaMult === 0) continue const after = before + deltaMult - // Skip if presence doesn't flip + // Skip transition handling if presence doesn't flip (both zero or both non-zero) + // Note: Index updates happen later regardless - we're only skipping null-extension emissions here if ((before === 0) === (after === 0)) continue - const retract = before === 0 // 0->!0 => retract, else (>0->0) emit + // Determine the type of transition: + // - 0 → non-zero: Right becomes non-empty, left rows transition from unmatched to matched + // → RETRACT previously emitted null-extended rows (emit with negative multiplicity) + // - non-zero → 0: Right becomes empty, left rows transition from matched to unmatched + // → EMIT new null-extended rows (emit with positive multiplicity) + const transitioningToMatched = before === 0 + for (const [value, multiplicity] of this.#indexA.getIterator(key)) { if (multiplicity !== 0) { results.add( [key, [value, null]], - retract ? -multiplicity : +multiplicity + transitioningToMatched ? 
-multiplicity : +multiplicity ) } } @@ -215,15 +223,22 @@ export class JoinOperator extends BinaryOperator< if (deltaMult === 0) continue const after = before + deltaMult - // Skip if presence doesn't flip + // Skip transition handling if presence doesn't flip (both zero or both non-zero) + // Note: Index updates happen later regardless - we're only skipping null-extension emissions here if ((before === 0) === (after === 0)) continue - const retract = before === 0 // 0->!0 => retract, else (>0->0) emit + // Determine the type of transition: + // - 0 → non-zero: Left becomes non-empty, right rows transition from unmatched to matched + // → RETRACT previously emitted null-extended rows (emit with negative multiplicity) + // - non-zero → 0: Left becomes empty, right rows transition from matched to unmatched + // → EMIT new null-extended rows (emit with positive multiplicity) + const transitioningToMatched = before === 0 + for (const [value, multiplicity] of this.#indexB.getIterator(key)) { if (multiplicity !== 0) { results.add( [key, [null, value]], - retract ? -multiplicity : +multiplicity + transitioningToMatched ? -multiplicity : +multiplicity ) } }