From 185a9866d5f4ced19442c36242aee767c32d99c9 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Mon, 6 Apr 2026 16:53:19 -0600 Subject: [PATCH 1/5] fix(db): propagate changes through nested toArray includes at depth 3+ MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit flushIncludesState only flushed entries dirty from direct child changes (Phase 2) or single-level buffer drain (Phase 3). When only the deepest level changed (e.g., textDeltas in runs→texts→textDeltas), intermediate levels had no changes, so Phase 4 never triggered recursive flushing and deep buffer changes were permanently stranded. Add a third pass in Phase 4 that scans child registry entries for pending deep nested buffer changes via hasPendingIncludesChanges(). Entries with stranded buffers are recursively flushed and their correlation keys added to the inline re-emit set. Also prevent toArray()/concat(toArray()) from being silently wrapped as Value nodes inside expressions like coalesce() — throw a clear error at query construction time instead. Co-Authored-By: Claude Opus 4.6 --- packages/db/src/query/builder/functions.ts | 2 + packages/db/src/query/builder/ref-proxy.ts | 20 +- .../query/live/collection-config-builder.ts | 30 +- packages/db/tests/query/repro-bug2.test.ts | 892 ++++++++++++++++++ .../tests/query/repro-many-siblings.test.ts | 189 ++++ 5 files changed, 1130 insertions(+), 3 deletions(-) create mode 100644 packages/db/tests/query/repro-bug2.test.ts create mode 100644 packages/db/tests/query/repro-many-siblings.test.ts diff --git a/packages/db/src/query/builder/functions.ts b/packages/db/src/query/builder/functions.ts index 84e203589..9ab4e735d 100644 --- a/packages/db/src/query/builder/functions.ts +++ b/packages/db/src/query/builder/functions.ts @@ -437,12 +437,14 @@ export const operators = [ export type OperatorName = (typeof operators)[number] export class ToArrayWrapper<_T = unknown> { + readonly __brand = `ToArrayWrapper` as const declare readonly _type: `toArray` declare readonly _result: _T constructor(public readonly query: QueryBuilder) {} } export class ConcatToArrayWrapper<_T = unknown> { + readonly __brand = `ConcatToArrayWrapper` as const declare readonly _type: `concatToArray` declare readonly _result: _T constructor(public readonly query: QueryBuilder) {} diff --git a/packages/db/src/query/builder/ref-proxy.ts b/packages/db/src/query/builder/ref-proxy.ts index 3e3bea2a4..d7ea8d786 100644 --- a/packages/db/src/query/builder/ref-proxy.ts +++ b/packages/db/src/query/builder/ref-proxy.ts @@ -284,8 +284,10 @@ export function createRefProxyWithSelected>( } /** - * Converts a value to an Expression - * If it's a RefProxy, creates a Ref, otherwise creates a Value + * Converts a value to an Expression. + * If it's a RefProxy, creates a PropRef. Throws if the value is a + * ToArrayWrapper or ConcatToArrayWrapper (these must be used as direct + * select fields). Otherwise wraps it as a Value. */ export function toExpression(value: T): BasicExpression export function toExpression(value: RefProxy): BasicExpression @@ -293,6 +295,20 @@ export function toExpression(value: any): BasicExpression { if (isRefProxy(value)) { return new PropRef(value.__path) } + // toArray() and concat(toArray()) must be used as direct select fields, not inside expressions + if ( + value && + typeof value === `object` && + (value.__brand === `ToArrayWrapper` || + value.__brand === `ConcatToArrayWrapper`) + ) { + const name = + value.__brand === `ToArrayWrapper` ? `toArray()` : `concat(toArray())` + throw new Error( + `${name} cannot be used inside expressions (e.g., coalesce(), eq(), not()). ` + + `Use ${name} directly as a select field value instead.`, + ) + } // If it's already an Expression (Func, Ref, Value) or Agg, return it directly if ( value && diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index 3a4e948e1..7353b2116 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -1699,6 +1699,30 @@ function flushIncludesState( ) } } + // Finally: entries with deep nested buffer changes (grandchild-or-deeper buffers + // have pending data, but neither this level nor the immediate child level changed). + // Without this pass, changes at depth 3+ are stranded because drainNestedBuffers + // only drains one level and Phase 4 only flushes entries dirty from Phase 2/3. + const deepBufferDirty = new Set() + if (state.nestedSetups) { + for (const [correlationKey, entry] of state.childRegistry) { + if (entriesWithChildChanges.has(correlationKey)) continue + if (dirtyFromBuffers.has(correlationKey)) continue + if ( + entry.includesStates && + hasPendingIncludesChanges(entry.includesStates) + ) { + flushIncludesState( + entry.includesStates, + entry.collection, + entry.collection.id, + null, + entry.syncMethods, + ) + deepBufferDirty.add(correlationKey) + } + } + } // For inline materializations: re-emit affected parents with updated snapshots. // We mutate items in-place (so collection.get() reflects changes immediately) @@ -1707,7 +1731,11 @@ function flushIncludesState( // deepEquals, but in-place mutation means both sides reference the same // object, so the comparison always returns true and suppresses the event. const inlineReEmitKeys = materializesInline(state) - ? new Set([...(affectedCorrelationKeys || []), ...dirtyFromBuffers]) + ? new Set([ + ...(affectedCorrelationKeys || []), + ...dirtyFromBuffers, + ...deepBufferDirty, + ]) : null if (parentSyncMethods && inlineReEmitKeys && inlineReEmitKeys.size > 0) { const events: Array> = [] diff --git a/packages/db/tests/query/repro-bug2.test.ts b/packages/db/tests/query/repro-bug2.test.ts new file mode 100644 index 000000000..53ba96531 --- /dev/null +++ b/packages/db/tests/query/repro-bug2.test.ts @@ -0,0 +1,892 @@ +/** + * Repro for bugs reported against TanStack DB 0.6 + * Bug 1: coalesce(concat(toArray(...))) bypasses includes detection + * Bug 2: sequential inserts into toArray() child don't fully propagate + */ +import { describe, expect, it } from 'vitest' +import { + coalesce, + concat, + createLiveQueryCollection, + eq, + toArray, +} from '../../src/query/index.js' +import { createCollection } from '../../src/collection/index.js' +import { localOnlyCollectionOptions } from '../../src/local-only.js' +import { mockSyncCollectionOptions } from '../utils.js' + +describe(`bug repro`, () => { + describe(`Bug 1: coalesce(concat(toArray(...)))`, () => { + it(`throws a clear error when concat(toArray()) is wrapped in coalesce()`, () => { + type Message = { id: number; role: string } + type Chunk = { + id: number + messageId: number + text: string + timestamp: number + } + + const messages = createCollection( + mockSyncCollectionOptions({ + id: `bug1-messages`, + getKey: (m) => m.id, + initialData: [{ id: 1, role: `assistant` }], + }), + ) + + const chunks = createCollection( + mockSyncCollectionOptions({ + id: `bug1-chunks`, + getKey: (c) => c.id, + initialData: [ + { id: 10, messageId: 1, text: `Hello`, timestamp: 1 }, + ], + }), + ) + + expect( + () => + createLiveQueryCollection((q) => + q.from({ m: messages }).select(({ m }) => ({ + id: m.id, + content: coalesce( + concat( + toArray( + q + .from({ c: chunks }) + .where(({ c }) => eq(c.messageId, m.id)) + .orderBy(({ c }) => c.timestamp) + .select(({ c }) => c.text), + ), + ) as any, + ``, + ), + })), + ), + ).toThrow(`concat(toArray()) cannot be used inside expressions`) + }) + + it(`toArray() wrapped in coalesce() also throws`, () => { + type Parent = { id: number } + type Child = { id: number; parentId: number } + + const parents = createCollection( + mockSyncCollectionOptions({ + id: `bug1b-parents`, + getKey: (p) => p.id, + initialData: [{ id: 1 }], + }), + ) + + const children = createCollection( + mockSyncCollectionOptions({ + id: `bug1b-children`, + getKey: (c) => c.id, + initialData: [], + }), + ) + + expect( + () => + createLiveQueryCollection((q) => + q.from({ p: parents }).select(({ p }) => ({ + id: p.id, + items: coalesce( + toArray( + q + .from({ c: children }) + .where(({ c }) => eq(c.parentId, p.id)) + .select(({ c }) => ({ id: c.id })), + ) as any, + [], + ), + })), + ), + ).toThrow(`toArray() cannot be used inside expressions`) + }) + }) + + describe(`Bug 2: sequential inserts into toArray child`, () => { + it(`second insert propagates (mockSync)`, async () => { + type Parent = { id: number; name: string } + type Child = { id: number; parentId: number; title: string } + + const parents = createCollection( + mockSyncCollectionOptions({ + id: `bug2a-parents`, + getKey: (p) => p.id, + initialData: [{ id: 1, name: `Alpha` }], + }), + ) + + const children = createCollection( + mockSyncCollectionOptions({ + id: `bug2a-children`, + getKey: (c) => c.id, + initialData: [], + }), + ) + + const collection = createLiveQueryCollection((q) => + q.from({ p: parents }).select(({ p }) => ({ + id: p.id, + items: toArray( + q + .from({ c: children }) + .where(({ c }) => eq(c.parentId, p.id)) + .select(({ c }) => ({ + id: c.id, + title: c.title, + })), + ), + })), + ) + + await collection.preload() + expect((collection.get(1) as any).items).toEqual([]) + + // First insert + children.utils.begin() + children.utils.write({ + type: `insert`, + value: { id: 10, parentId: 1, title: `First` }, + }) + children.utils.commit() + expect((collection.get(1) as any).items).toHaveLength(1) + + // Second insert + children.utils.begin() + children.utils.write({ + type: `insert`, + value: { id: 11, parentId: 1, title: `Second` }, + }) + children.utils.commit() + expect((collection.get(1) as any).items).toHaveLength(2) + }) + + it(`second insert propagates (localOnly + collection.insert)`, async () => { + type Parent = { id: number; name: string } + type Child = { id: number; parentId: number; title: string } + + const parents = createCollection( + localOnlyCollectionOptions({ + id: `bug2b-parents`, + getKey: (p) => p.id, + initialData: [{ id: 1, name: `Alpha` }], + }), + ) + + const children = createCollection( + localOnlyCollectionOptions({ + id: `bug2b-children`, + getKey: (c) => c.id, + initialData: [], + }), + ) + + const collection = createLiveQueryCollection((q) => + q.from({ p: parents }).select(({ p }) => ({ + id: p.id, + items: toArray( + q + .from({ c: children }) + .where(({ c }) => eq(c.parentId, p.id)) + .select(({ c }) => ({ + id: c.id, + title: c.title, + })), + ), + })), + ) + + await collection.preload() + expect((collection.get(1) as any).items).toEqual([]) + + // First insert via collection.insert() + children.insert({ id: 10, parentId: 1, title: `First` }) + await new Promise((r) => setTimeout(r, 50)) + expect((collection.get(1) as any).items).toHaveLength(1) + + // Second insert via collection.insert() + children.insert({ id: 11, parentId: 1, title: `Second` }) + await new Promise((r) => setTimeout(r, 50)) + expect((collection.get(1) as any).items).toHaveLength(2) + }) + + it(`second insert propagates via concat(toArray)`, async () => { + type Message = { id: number; role: string } + type Chunk = { + id: number + messageId: number + text: string + timestamp: number + } + + const messages = createCollection( + localOnlyCollectionOptions({ + id: `bug2c-messages`, + getKey: (m) => m.id, + initialData: [{ id: 1, role: `assistant` }], + }), + ) + + const chunks = createCollection( + localOnlyCollectionOptions({ + id: `bug2c-chunks`, + getKey: (c) => c.id, + initialData: [], + }), + ) + + const collection = createLiveQueryCollection((q) => + q.from({ m: messages }).select(({ m }) => ({ + id: m.id, + content: concat( + toArray( + q + .from({ c: chunks }) + .where(({ c }) => eq(c.messageId, m.id)) + .orderBy(({ c }) => c.timestamp) + .select(({ c }) => c.text), + ), + ), + })), + ) + + await collection.preload() + expect((collection.get(1) as any).content).toBe(``) + + // First insert + chunks.insert({ + id: 10, + messageId: 1, + text: `Hello`, + timestamp: 1, + }) + await new Promise((r) => setTimeout(r, 50)) + expect((collection.get(1) as any).content).toBe(`Hello`) + + // Second insert + chunks.insert({ + id: 11, + messageId: 1, + text: ` world`, + timestamp: 2, + }) + await new Promise((r) => setTimeout(r, 50)) + expect((collection.get(1) as any).content).toBe(`Hello world`) + }) + + it(`second insert propagates through chained live query collections (darix pattern)`, async () => { + // This matches the darix entity-timeline pattern: + // Layer 1: raw collection → derived live query collection (adds synthetic key) + // Layer 2: derived collection → main query with toArray() includes + type RawDelta = { + key: string + text_id: string + delta: string + _seq: number + } + type DerivedDelta = { + key: string + text_id: string + timelineKey: string + order: number + delta: string + } + type Seed = { key: string } + + const TIMELINE_KEY = `timeline-1` + + // Raw source collection + const rawDeltas = createCollection( + localOnlyCollectionOptions({ + id: `chained-raw-deltas`, + getKey: (d) => d.key, + initialData: [], + }), + ) + + // Layer 1: derived collection that adds timelineKey and renames _seq → order + const derivedDeltas = createLiveQueryCollection({ + id: `chained-derived-deltas`, + query: (q) => + q.from({ d: rawDeltas }).select(({ d }) => ({ + key: d.key, + text_id: d.text_id, + timelineKey: TIMELINE_KEY, + order: d._seq, + delta: d.delta, + })), + }) + + // Seed collection for singleton parent + const seeds = createCollection( + localOnlyCollectionOptions({ + id: `chained-seeds`, + getKey: (s) => s.key, + initialData: [{ key: TIMELINE_KEY }], + }), + ) + + // Layer 2: main query with toArray() include from derived collection + const collection = createLiveQueryCollection({ + query: (q) => + q.from({ s: seeds }).select(({ s }) => ({ + key: s.key, + deltas: toArray( + q + .from({ d: derivedDeltas }) + .where(({ d }) => eq(d.timelineKey, s.key)) + .orderBy(({ d }) => d.order) + .select(({ d }) => ({ + key: d.key, + delta: d.delta, + })), + ), + })), + }) + + await collection.preload() + const data = () => collection.get(TIMELINE_KEY) as any + + expect(data().deltas).toEqual([]) + + // First insert into raw collection + rawDeltas.insert({ key: `td-1`, text_id: `t-1`, delta: `Hello`, _seq: 1 }) + await new Promise((r) => setTimeout(r, 100)) + expect(data().deltas).toHaveLength(1) + expect(data().deltas[0].delta).toBe(`Hello`) + + // Second insert — this is the critical path through chained collections + rawDeltas.insert({ key: `td-2`, text_id: `t-1`, delta: ` world`, _seq: 2 }) + await new Promise((r) => setTimeout(r, 100)) + expect(data().deltas).toHaveLength(2) + }) + + it(`second insert propagates with multiple sibling toArray includes`, async () => { + type Seed = { key: string } + type Text = { key: string; seedKey: string; status: string } + type TextDelta = { + key: string + textId: string + seedKey: string + delta: string + seq: number + } + + const seeds = createCollection( + localOnlyCollectionOptions({ + id: `bug2d-seeds`, + getKey: (s) => s.key, + initialData: [{ key: `seed-1` }], + }), + ) + + const texts = createCollection( + localOnlyCollectionOptions({ + id: `bug2d-texts`, + getKey: (t) => t.key, + initialData: [], + }), + ) + + const textDeltas = createCollection( + localOnlyCollectionOptions({ + id: `bug2d-textDeltas`, + getKey: (td) => td.key, + initialData: [], + }), + ) + + // Singleton parent with multiple sibling toArray includes + const collection = createLiveQueryCollection((q) => + q.from({ s: seeds }).select(({ s }) => ({ + key: s.key, + texts: toArray( + q + .from({ t: texts }) + .where(({ t }) => eq(t.seedKey, s.key)) + .select(({ t }) => ({ + key: t.key, + status: t.status, + })), + ), + textDeltas: toArray( + q + .from({ td: textDeltas }) + .where(({ td }) => eq(td.seedKey, s.key)) + .orderBy(({ td }) => td.seq) + .select(({ td }) => ({ + key: td.key, + textId: td.textId, + delta: td.delta, + })), + ), + })), + ) + + await collection.preload() + + const data = () => collection.get(`seed-1`) as any + + // Insert text + texts.insert({ key: `text-1`, seedKey: `seed-1`, status: `streaming` }) + await new Promise((r) => setTimeout(r, 50)) + expect(data().texts).toHaveLength(1) + + // Insert first delta + textDeltas.insert({ + key: `td-1`, + textId: `text-1`, + seedKey: `seed-1`, + delta: `Hello`, + seq: 1, + }) + await new Promise((r) => setTimeout(r, 50)) + expect(data().textDeltas).toHaveLength(1) + expect(data().textDeltas[0].delta).toBe(`Hello`) + + // Insert second delta — this is the critical test + textDeltas.insert({ + key: `td-2`, + textId: `text-1`, + seedKey: `seed-1`, + delta: ` world`, + seq: 2, + }) + await new Promise((r) => setTimeout(r, 50)) + expect(data().textDeltas).toHaveLength(2) + }) + }) + + describe(`Bug 3: nested toArray includes (runs -> texts -> concat(toArray(textDeltas)))`, () => { + it(`control: flat concat(toArray) propagates delta inserts`, async () => { + type Text = { key: string; _seq: number; status: string } + type TextDelta = { + key: string + text_id: string + _seq: number + delta: string + } + + const texts = createCollection( + localOnlyCollectionOptions({ + id: `nested-ctrl-texts`, + getKey: (t) => t.key, + initialData: [], + }), + ) + + const textDeltas = createCollection( + localOnlyCollectionOptions({ + id: `nested-ctrl-deltas`, + getKey: (d) => d.key, + initialData: [], + }), + ) + + const collection = createLiveQueryCollection({ + id: `nested-ctrl-live`, + query: (q) => + q.from({ text: texts }).select(({ text }) => ({ + key: text.key, + order: coalesce(text._seq, -1), + status: text.status, + text: concat( + toArray( + q + .from({ delta: textDeltas }) + .where(({ delta }) => eq(delta.text_id, text.key)) + .orderBy(({ delta }) => delta._seq) + .select(({ delta }) => delta.delta), + ), + ), + })), + }) + + await collection.preload() + + texts.insert({ key: `text-1`, _seq: 1, status: `streaming` }) + await new Promise((r) => setTimeout(r, 50)) + expect((collection.get(`text-1`) as any)?.text).toBe(``) + + textDeltas.insert({ + key: `td-1`, + text_id: `text-1`, + _seq: 2, + delta: `Hello`, + }) + await new Promise((r) => setTimeout(r, 50)) + expect((collection.get(`text-1`) as any)?.text).toBe(`Hello`) + + textDeltas.insert({ + key: `td-2`, + text_id: `text-1`, + _seq: 3, + delta: ` world`, + }) + await new Promise((r) => setTimeout(r, 50)) + expect((collection.get(`text-1`) as any)?.text).toBe(`Hello world`) + }) + + it(`nested toArray(runs) -> toArray(texts) -> concat(toArray(textDeltas)) propagates`, async () => { + const TIMELINE_KEY = `tl-nested` + + type Seed = { key: string } + type Run = { key: string; _seq: number; status: string } + type Text = { + key: string + run_id: string + _seq: number + status: string + } + type TextDelta = { + key: string + text_id: string + run_id: string + _seq: number + delta: string + } + + const seed = createCollection( + localOnlyCollectionOptions({ + id: `nested-seed`, + getKey: (s) => s.key, + initialData: [{ key: TIMELINE_KEY }], + }), + ) + + const runs = createCollection( + localOnlyCollectionOptions({ + id: `nested-runs`, + getKey: (r) => r.key, + initialData: [], + }), + ) + + const texts = createCollection( + localOnlyCollectionOptions({ + id: `nested-texts`, + getKey: (t) => t.key, + initialData: [], + }), + ) + + const textDeltas = createCollection( + localOnlyCollectionOptions({ + id: `nested-deltas`, + getKey: (d) => d.key, + initialData: [], + }), + ) + + // Layer 1: derived collections (matching darix pattern) + const runsLive = createLiveQueryCollection({ + id: `nested-runs-live`, + query: (q) => + q.from({ run: runs }).select(({ run }) => ({ + timelineKey: TIMELINE_KEY, + key: run.key, + order: coalesce(run._seq, -1), + status: run.status, + })), + }) + + const textsLive = createLiveQueryCollection({ + id: `nested-texts-live`, + query: (q) => + q.from({ text: texts }).select(({ text }) => ({ + timelineKey: TIMELINE_KEY, + key: text.key, + run_id: text.run_id, + order: coalesce(text._seq, -1), + status: text.status, + })), + }) + + const textDeltasLive = createLiveQueryCollection({ + id: `nested-deltas-live`, + query: (q) => + q.from({ delta: textDeltas }).select(({ delta }) => ({ + timelineKey: TIMELINE_KEY, + key: delta.key, + text_id: delta.text_id, + run_id: delta.run_id, + order: coalesce(delta._seq, -1), + delta: delta.delta, + })), + }) + + // Layer 2: main query with nested includes + const timeline = createLiveQueryCollection({ + id: `nested-timeline`, + query: (q) => + q.from({ s: seed }).select(({ s }) => ({ + key: s.key, + runs: toArray( + q + .from({ run: runsLive }) + .where(({ run }) => eq(run.timelineKey, s.key)) + .orderBy(({ run }) => run.order) + .select(({ run }) => ({ + key: run.key, + order: run.order, + status: run.status, + texts: toArray( + q + .from({ text: textsLive }) + .where(({ text }) => eq(text.run_id, run.key)) + .orderBy(({ text }) => text.order) + .select(({ text }) => ({ + key: text.key, + run_id: text.run_id, + order: text.order, + status: text.status, + text: concat( + toArray( + q + .from({ delta: textDeltasLive }) + .where(({ delta }) => eq(delta.text_id, text.key)) + .orderBy(({ delta }) => delta.order) + .select(({ delta }) => delta.delta), + ), + ), + })), + ), + })), + ), + })), + }) + + await timeline.preload() + + const data = () => timeline.get(TIMELINE_KEY) as any + + // Insert run + text + runs.insert({ key: `run-1`, _seq: 1, status: `started` }) + texts.insert({ + key: `text-1`, + run_id: `run-1`, + _seq: 2, + status: `streaming`, + }) + await new Promise((r) => setTimeout(r, 100)) + + expect(data().runs).toHaveLength(1) + expect(data().runs[0].texts).toHaveLength(1) + expect(data().runs[0].texts[0].text).toBe(``) + + // First textDelta + textDeltas.insert({ + key: `td-1`, + text_id: `text-1`, + run_id: `run-1`, + _seq: 3, + delta: `Hello`, + }) + await new Promise((r) => setTimeout(r, 100)) + expect(data().runs[0].texts[0].text).toBe(`Hello`) + + // Second textDelta + textDeltas.insert({ + key: `td-2`, + text_id: `text-1`, + run_id: `run-1`, + _seq: 4, + delta: ` world`, + }) + await new Promise((r) => setTimeout(r, 100)) + expect(data().runs[0].texts[0].text).toBe(`Hello world`) + }) + + it(`deep buffer change for one parent does not emit spurious update for sibling parent`, async () => { + const TIMELINE_KEY = `tl-spurious` + + type Seed = { key: string } + type Run = { key: string; _seq: number; status: string } + type Text = { + key: string + run_id: string + _seq: number + status: string + } + type TextDelta = { + key: string + text_id: string + run_id: string + _seq: number + delta: string + } + + const seed = createCollection( + localOnlyCollectionOptions({ + id: `spurious-seed`, + getKey: (s) => s.key, + initialData: [{ key: TIMELINE_KEY }], + }), + ) + + const runs = createCollection( + localOnlyCollectionOptions({ + id: `spurious-runs`, + getKey: (r) => r.key, + initialData: [], + }), + ) + + const texts = createCollection( + localOnlyCollectionOptions({ + id: `spurious-texts`, + getKey: (t) => t.key, + initialData: [], + }), + ) + + const textDeltas = createCollection( + localOnlyCollectionOptions({ + id: `spurious-deltas`, + getKey: (d) => d.key, + initialData: [], + }), + ) + + // Layer 1: derived collections + const runsLive = createLiveQueryCollection({ + id: `spurious-runs-live`, + query: (q) => + q.from({ run: runs }).select(({ run }) => ({ + timelineKey: TIMELINE_KEY, + key: run.key, + order: coalesce(run._seq, -1), + status: run.status, + })), + }) + + const textsLive = createLiveQueryCollection({ + id: `spurious-texts-live`, + query: (q) => + q.from({ text: texts }).select(({ text }) => ({ + timelineKey: TIMELINE_KEY, + key: text.key, + run_id: text.run_id, + order: coalesce(text._seq, -1), + status: text.status, + })), + }) + + const textDeltasLive = createLiveQueryCollection({ + id: `spurious-deltas-live`, + query: (q) => + q.from({ delta: textDeltas }).select(({ delta }) => ({ + timelineKey: TIMELINE_KEY, + key: delta.key, + text_id: delta.text_id, + run_id: delta.run_id, + order: coalesce(delta._seq, -1), + delta: delta.delta, + })), + }) + + // Layer 2: main query with nested includes + const timeline = createLiveQueryCollection({ + id: `spurious-timeline`, + query: (q) => + q.from({ s: seed }).select(({ s }) => ({ + key: s.key, + runs: toArray( + q + .from({ run: runsLive }) + .where(({ run }) => eq(run.timelineKey, s.key)) + .orderBy(({ run }) => run.order) + .select(({ run }) => ({ + key: run.key, + order: run.order, + status: run.status, + texts: toArray( + q + .from({ text: textsLive }) + .where(({ text }) => eq(text.run_id, run.key)) + .orderBy(({ text }) => text.order) + .select(({ text }) => ({ + key: text.key, + run_id: text.run_id, + order: text.order, + status: text.status, + text: concat( + toArray( + q + .from({ delta: textDeltasLive }) + .where(({ delta }) => eq(delta.text_id, text.key)) + .orderBy(({ delta }) => delta.order) + .select(({ delta }) => delta.delta), + ), + ), + })), + ), + })), + ), + })), + }) + + await timeline.preload() + + const data = () => timeline.get(TIMELINE_KEY) as any + + // Insert TWO runs, each with their own text + runs.insert({ key: `run-1`, _seq: 1, status: `started` }) + runs.insert({ key: `run-2`, _seq: 2, status: `started` }) + texts.insert({ + key: `text-1`, + run_id: `run-1`, + _seq: 3, + status: `streaming`, + }) + texts.insert({ + key: `text-2`, + run_id: `run-2`, + _seq: 4, + status: `streaming`, + }) + await new Promise((r) => setTimeout(r, 100)) + + expect(data().runs).toHaveLength(2) + expect(data().runs[0].texts[0].text).toBe(``) + expect(data().runs[1].texts[0].text).toBe(``) + + // Capture the timeline row reference BEFORE the delta insert + const timelineRowBefore = data() + const run1TextsBefore = timelineRowBefore.runs[0].texts + // Track update events on the timeline collection + const updateEvents: Array = [] + timeline.subscribeChanges((changes) => { + for (const change of changes) { + if (change.type === `update`) { + updateEvents.push(change) + } + } + }) + + // Insert a textDelta ONLY for run-2's text + textDeltas.insert({ + key: `td-1`, + text_id: `text-2`, + run_id: `run-2`, + _seq: 5, + delta: `Hello`, + }) + await new Promise((r) => setTimeout(r, 100)) + + // Verify run-2's text updated correctly + expect(data().runs[1].texts[0].text).toBe(`Hello`) + // Verify run-1's text is still empty + expect(data().runs[0].texts[0].text).toBe(``) + + // The critical check: only ONE update event should fire (for the timeline row). + // If the deep-buffer pass marks unrelated parents dirty, we'd see multiple + // updates or the runs[0].texts array would be unnecessarily re-materialized. + // Check that run-1's texts array reference is unchanged (not re-materialized) + expect(data().runs[0].texts).toBe(run1TextsBefore) + }) + }) +}) diff --git a/packages/db/tests/query/repro-many-siblings.test.ts b/packages/db/tests/query/repro-many-siblings.test.ts new file mode 100644 index 000000000..c8f95b8c8 --- /dev/null +++ b/packages/db/tests/query/repro-many-siblings.test.ts @@ -0,0 +1,189 @@ +/** + * Repro for Bug 2: Many sibling toArray includes with chained derived collections. + * Matches the exact darix entity-timeline query pattern. + */ +import { describe, expect, it } from 'vitest' +import { + coalesce, + createLiveQueryCollection, + eq, + toArray, +} from '../../src/query/index.js' +import { createCollection } from '../../src/collection/index.js' +import type { SyncConfig } from '../../src/types.js' + +const TIMELINE_KEY = `timeline` + +function createSyncCollection( + id: string, + getKey: (item: T) => string | number, +) { + let syncBegin: () => void + let syncWrite: (msg: { type: string; value: T }) => void + let syncCommit: () => void + + const collection = createCollection({ + id, + getKey, + sync: { + sync: (params: any) => { + syncBegin = params.begin + syncWrite = params.write + syncCommit = params.commit + params.markReady() + return () => {} + }, + } as SyncConfig, + startSync: true, + gcTime: 0, + }) + + return { + collection, + insert(value: T) { + syncBegin!() + syncWrite!({ type: `insert`, value }) + syncCommit!() + }, + } +} + +type RawItem = { key: string; _seq: number; [k: string]: unknown } + +function createDerivedCollection( + id: string, + source: ReturnType>[`collection`], + extraFields?: (d: any) => Record, +) { + return createLiveQueryCollection({ + id: `${id}:derived`, + query: (q: any) => + q.from({ d: source }).select(({ d }: any) => ({ + timelineKey: TIMELINE_KEY, + key: d.key, + order: coalesce(d._seq, -1), + ...(extraFields ? extraFields(d) : {}), + })), + }) +} + +describe(`many sibling toArray includes`, () => { + it(`second insert propagates with 5 sibling chained toArray includes`, async () => { + // Raw source collections + const runs = createSyncCollection(`raw-runs`, (r) => r.key) + const texts = createSyncCollection(`raw-texts`, (r) => r.key) + const textDeltas = createSyncCollection(`raw-textDeltas`, (r) => r.key) + const toolCalls = createSyncCollection(`raw-toolCalls`, (r) => r.key) + const steps = createSyncCollection(`raw-steps`, (r) => r.key) + + // Layer 1: derived collections + const derivedRuns = createDerivedCollection(`runs`, runs.collection, (d) => ({ + status: d.status, + })) + const derivedTexts = createDerivedCollection(`texts`, texts.collection, (d) => ({ + run_id: d.run_id, + status: d.status, + })) + const derivedTextDeltas = createDerivedCollection(`textDeltas`, textDeltas.collection, (d) => ({ + text_id: d.text_id, + run_id: d.run_id, + delta: d.delta, + })) + const derivedToolCalls = createDerivedCollection(`toolCalls`, toolCalls.collection, (d) => ({ + run_id: d.run_id, + tool_name: d.tool_name, + })) + const derivedSteps = createDerivedCollection(`steps`, steps.collection, (d) => ({ + run_id: d.run_id, + step_number: d.step_number, + })) + + // Seed collection + const seeds = createCollection({ + id: `seed`, + getKey: (s: { key: string }) => s.key, + sync: { + sync: (params: any) => { + params.begin() + params.write({ type: `insert`, value: { key: TIMELINE_KEY } }) + params.commit() + params.markReady() + return () => {} + }, + } as SyncConfig<{ key: string }>, + startSync: true, + gcTime: 0, + }) + + // Layer 2: main query with many sibling includes + const collection = createLiveQueryCollection({ + query: (q: any) => + q.from({ s: seeds }).select(({ s }: any) => ({ + key: s.key, + runs: toArray( + q + .from({ r: derivedRuns }) + .where(({ r }: any) => eq(r.timelineKey, s.key)) + .orderBy(({ r }: any) => r.order) + .select(({ r }: any) => ({ key: r.key, status: r.status })), + ), + texts: toArray( + q + .from({ t: derivedTexts }) + .where(({ t }: any) => eq(t.timelineKey, s.key)) + .orderBy(({ t }: any) => t.order) + .select(({ t }: any) => ({ key: t.key, run_id: t.run_id, status: t.status })), + ), + textDeltas: toArray( + q + .from({ td: derivedTextDeltas }) + .where(({ td }: any) => eq(td.timelineKey, s.key)) + .orderBy(({ td }: any) => td.order) + .select(({ td }: any) => ({ + key: td.key, + text_id: td.text_id, + delta: td.delta, + })), + ), + toolCalls: toArray( + q + .from({ tc: derivedToolCalls }) + .where(({ tc }: any) => eq(tc.timelineKey, s.key)) + .orderBy(({ tc }: any) => tc.order) + .select(({ tc }: any) => ({ key: tc.key, tool_name: tc.tool_name })), + ), + steps: toArray( + q + .from({ st: derivedSteps }) + .where(({ st }: any) => eq(st.timelineKey, s.key)) + .orderBy(({ st }: any) => st.order) + .select(({ st }: any) => ({ key: st.key, step_number: st.step_number })), + ), + })), + }) + + await collection.preload() + + const data = () => collection.get(TIMELINE_KEY) + + // Insert run + text + runs.insert({ key: `run-1`, status: `started`, _seq: 1 }) + texts.insert({ key: `text-1`, run_id: `run-1`, status: `streaming`, _seq: 2 }) + await new Promise((r) => setTimeout(r, 100)) + + expect(data().runs).toHaveLength(1) + expect(data().texts).toHaveLength(1) + expect(data().textDeltas).toHaveLength(0) + + // First textDelta + textDeltas.insert({ key: `td-1`, text_id: `text-1`, run_id: `run-1`, delta: `Hello`, _seq: 3 }) + await new Promise((r) => setTimeout(r, 100)) + expect(data().textDeltas).toHaveLength(1) + expect(data().textDeltas[0].delta).toBe(`Hello`) + + // Second textDelta — the critical test + textDeltas.insert({ key: `td-2`, text_id: `text-1`, run_id: `run-1`, delta: ` world`, _seq: 4 }) + await new Promise((r) => setTimeout(r, 100)) + expect(data().textDeltas).toHaveLength(2) + }) +}) From 759c788513c63aa8f92518ebab144f9c41b3c7af Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Mon, 6 Apr 2026 16:53:55 -0600 Subject: [PATCH 2/5] chore: add changeset for nested includes fix Co-Authored-By: Claude Opus 4.6 --- .changeset/fix-nested-includes-propagation.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/fix-nested-includes-propagation.md diff --git a/.changeset/fix-nested-includes-propagation.md b/.changeset/fix-nested-includes-propagation.md new file mode 100644 index 000000000..1fa6e00c1 --- /dev/null +++ b/.changeset/fix-nested-includes-propagation.md @@ -0,0 +1,5 @@ +--- +'@tanstack/db': patch +--- + +Fix nested `toArray()` includes not propagating changes at depth 3+. When a query used nested includes like `toArray(runs) → toArray(texts) → concat(toArray(textDeltas))`, changes to the deepest level (e.g., inserting a textDelta) were silently lost because `flushIncludesState` only drained one level of nested buffers. Also throw a clear error when `toArray()` or `concat(toArray())` is used inside expressions like `coalesce()`, instead of silently producing incorrect results. From 18b1a15022b44833ff88712c70a7a73dc10f185b Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Mon, 6 Apr 2026 22:55:44 +0000 Subject: [PATCH 3/5] ci: apply automated fixes --- packages/db/tests/query/repro-bug2.test.ts | 79 ++++++------- .../tests/query/repro-many-siblings.test.ts | 109 +++++++++++++----- 2 files changed, 121 insertions(+), 67 deletions(-) diff --git a/packages/db/tests/query/repro-bug2.test.ts b/packages/db/tests/query/repro-bug2.test.ts index 53ba96531..83c0edd34 100644 --- a/packages/db/tests/query/repro-bug2.test.ts +++ b/packages/db/tests/query/repro-bug2.test.ts @@ -38,31 +38,28 @@ describe(`bug repro`, () => { mockSyncCollectionOptions({ id: `bug1-chunks`, getKey: (c) => c.id, - initialData: [ - { id: 10, messageId: 1, text: `Hello`, timestamp: 1 }, - ], + initialData: [{ id: 10, messageId: 1, text: `Hello`, timestamp: 1 }], }), ) - expect( - () => - createLiveQueryCollection((q) => - q.from({ m: messages }).select(({ m }) => ({ - id: m.id, - content: coalesce( - concat( - toArray( - q - .from({ c: chunks }) - .where(({ c }) => eq(c.messageId, m.id)) - .orderBy(({ c }) => c.timestamp) - .select(({ c }) => c.text), - ), - ) as any, - ``, - ), - })), - ), + expect(() => + createLiveQueryCollection((q) => + q.from({ m: messages }).select(({ m }) => ({ + id: m.id, + content: coalesce( + concat( + toArray( + q + .from({ c: chunks }) + .where(({ c }) => eq(c.messageId, m.id)) + .orderBy(({ c }) => c.timestamp) + .select(({ c }) => c.text), + ), + ) as any, + ``, + ), + })), + ), ).toThrow(`concat(toArray()) cannot be used inside expressions`) }) @@ -86,22 +83,21 @@ describe(`bug repro`, () => { }), ) - expect( - () => - createLiveQueryCollection((q) => - q.from({ p: parents }).select(({ p }) => ({ - id: p.id, - items: coalesce( - toArray( - q - .from({ c: children }) - .where(({ c }) => eq(c.parentId, p.id)) - .select(({ c }) => ({ id: c.id })), - ) as any, - [], - ), - })), - ), + expect(() => + createLiveQueryCollection((q) => + q.from({ p: parents }).select(({ p }) => ({ + id: p.id, + items: coalesce( + toArray( + q + .from({ c: children }) + .where(({ c }) => eq(c.parentId, p.id)) + .select(({ c }) => ({ id: c.id })), + ) as any, + [], + ), + })), + ), ).toThrow(`toArray() cannot be used inside expressions`) }) }) @@ -359,7 +355,12 @@ describe(`bug repro`, () => { expect(data().deltas[0].delta).toBe(`Hello`) // Second insert — this is the critical path through chained collections - rawDeltas.insert({ key: `td-2`, text_id: `t-1`, delta: ` world`, _seq: 2 }) + rawDeltas.insert({ + key: `td-2`, + text_id: `t-1`, + delta: ` world`, + _seq: 2, + }) await new Promise((r) => setTimeout(r, 100)) expect(data().deltas).toHaveLength(2) }) diff --git a/packages/db/tests/query/repro-many-siblings.test.ts b/packages/db/tests/query/repro-many-siblings.test.ts index c8f95b8c8..bd1190496 100644 --- a/packages/db/tests/query/repro-many-siblings.test.ts +++ b/packages/db/tests/query/repro-many-siblings.test.ts @@ -72,31 +72,57 @@ describe(`many sibling toArray includes`, () => { // Raw source collections const runs = createSyncCollection(`raw-runs`, (r) => r.key) const texts = createSyncCollection(`raw-texts`, (r) => r.key) - const textDeltas = createSyncCollection(`raw-textDeltas`, (r) => r.key) - const toolCalls = createSyncCollection(`raw-toolCalls`, (r) => r.key) + const textDeltas = createSyncCollection( + `raw-textDeltas`, + (r) => r.key, + ) + const toolCalls = createSyncCollection( + `raw-toolCalls`, + (r) => r.key, + ) const steps = createSyncCollection(`raw-steps`, (r) => r.key) // Layer 1: derived collections - const derivedRuns = createDerivedCollection(`runs`, runs.collection, (d) => ({ - status: d.status, - })) - const derivedTexts = createDerivedCollection(`texts`, texts.collection, (d) => ({ - run_id: d.run_id, - status: d.status, - })) - const derivedTextDeltas = createDerivedCollection(`textDeltas`, textDeltas.collection, (d) => ({ - text_id: d.text_id, - run_id: d.run_id, - delta: d.delta, - })) - const derivedToolCalls = createDerivedCollection(`toolCalls`, toolCalls.collection, (d) => ({ - run_id: d.run_id, - tool_name: d.tool_name, - })) - const derivedSteps = createDerivedCollection(`steps`, steps.collection, (d) => ({ - run_id: d.run_id, - step_number: d.step_number, - })) + const derivedRuns = createDerivedCollection( + `runs`, + runs.collection, + (d) => ({ + status: d.status, + }), + ) + const derivedTexts = createDerivedCollection( + `texts`, + texts.collection, + (d) => ({ + run_id: d.run_id, + status: d.status, + }), + ) + const derivedTextDeltas = createDerivedCollection( + `textDeltas`, + textDeltas.collection, + (d) => ({ + text_id: d.text_id, + run_id: d.run_id, + delta: d.delta, + }), + ) + const derivedToolCalls = createDerivedCollection( + `toolCalls`, + toolCalls.collection, + (d) => ({ + run_id: d.run_id, + tool_name: d.tool_name, + }), + ) + const derivedSteps = createDerivedCollection( + `steps`, + steps.collection, + (d) => ({ + run_id: d.run_id, + step_number: d.step_number, + }), + ) // Seed collection const seeds = createCollection({ @@ -132,7 +158,11 @@ describe(`many sibling toArray includes`, () => { .from({ t: derivedTexts }) .where(({ t }: any) => eq(t.timelineKey, s.key)) .orderBy(({ t }: any) => t.order) - .select(({ t }: any) => ({ key: t.key, run_id: t.run_id, status: t.status })), + .select(({ t }: any) => ({ + key: t.key, + run_id: t.run_id, + status: t.status, + })), ), textDeltas: toArray( q @@ -150,14 +180,20 @@ describe(`many sibling toArray includes`, () => { .from({ tc: derivedToolCalls }) .where(({ tc }: any) => eq(tc.timelineKey, s.key)) .orderBy(({ tc }: any) => tc.order) - .select(({ tc }: any) => ({ key: tc.key, tool_name: tc.tool_name })), + .select(({ tc }: any) => ({ + key: tc.key, + tool_name: tc.tool_name, + })), ), steps: toArray( q .from({ st: derivedSteps }) .where(({ st }: any) => eq(st.timelineKey, s.key)) .orderBy(({ st }: any) => st.order) - .select(({ st }: any) => ({ key: st.key, step_number: st.step_number })), + .select(({ st }: any) => ({ + key: st.key, + step_number: st.step_number, + })), ), })), }) @@ -168,7 +204,12 @@ describe(`many sibling toArray includes`, () => { // Insert run + text runs.insert({ key: `run-1`, status: `started`, _seq: 1 }) - texts.insert({ key: `text-1`, run_id: `run-1`, status: `streaming`, _seq: 2 }) + texts.insert({ + key: `text-1`, + run_id: `run-1`, + status: `streaming`, + _seq: 2, + }) await new Promise((r) => setTimeout(r, 100)) expect(data().runs).toHaveLength(1) @@ -176,13 +217,25 @@ describe(`many sibling toArray includes`, () => { expect(data().textDeltas).toHaveLength(0) // First textDelta - textDeltas.insert({ key: `td-1`, text_id: `text-1`, run_id: `run-1`, delta: `Hello`, _seq: 3 }) + textDeltas.insert({ + key: `td-1`, + text_id: `text-1`, + run_id: `run-1`, + delta: `Hello`, + _seq: 3, + }) await new Promise((r) => setTimeout(r, 100)) expect(data().textDeltas).toHaveLength(1) expect(data().textDeltas[0].delta).toBe(`Hello`) // Second textDelta — the critical test - textDeltas.insert({ key: `td-2`, text_id: `text-1`, run_id: `run-1`, delta: ` world`, _seq: 4 }) + textDeltas.insert({ + key: `td-2`, + text_id: `text-1`, + run_id: `run-1`, + delta: ` world`, + _seq: 4, + }) await new Promise((r) => setTimeout(r, 100)) expect(data().textDeltas).toHaveLength(2) }) From 55bacf89a7e4a0ee82f186b28f1234e9fb3ab948 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Mon, 6 Apr 2026 18:42:04 -0600 Subject: [PATCH 4/5] fix: resolve TypeScript errors in chained collection test Remove explicit generic parameter from createLiveQueryCollection and use `any` casts for query/select callbacks in the chained darix pattern test, matching the style used in other test cases. Co-Authored-By: Claude Opus 4.6 --- packages/db/tests/query/repro-bug2.test.ts | 25 ++++++++-------------- 1 file changed, 9 insertions(+), 16 deletions(-) diff --git a/packages/db/tests/query/repro-bug2.test.ts b/packages/db/tests/query/repro-bug2.test.ts index 83c0edd34..f5897038e 100644 --- a/packages/db/tests/query/repro-bug2.test.ts +++ b/packages/db/tests/query/repro-bug2.test.ts @@ -283,13 +283,6 @@ describe(`bug repro`, () => { delta: string _seq: number } - type DerivedDelta = { - key: string - text_id: string - timelineKey: string - order: number - delta: string - } type Seed = { key: string } const TIMELINE_KEY = `timeline-1` @@ -304,10 +297,10 @@ describe(`bug repro`, () => { ) // Layer 1: derived collection that adds timelineKey and renames _seq → order - const derivedDeltas = createLiveQueryCollection({ + const derivedDeltas = createLiveQueryCollection({ id: `chained-derived-deltas`, - query: (q) => - q.from({ d: rawDeltas }).select(({ d }) => ({ + query: (q: any) => + q.from({ d: rawDeltas }).select(({ d }: any) => ({ key: d.key, text_id: d.text_id, timelineKey: TIMELINE_KEY, @@ -327,15 +320,15 @@ describe(`bug repro`, () => { // Layer 2: main query with toArray() include from derived collection const collection = createLiveQueryCollection({ - query: (q) => - q.from({ s: seeds }).select(({ s }) => ({ + query: (q: any) => + q.from({ s: seeds }).select(({ s }: any) => ({ key: s.key, deltas: toArray( q .from({ d: derivedDeltas }) - .where(({ d }) => eq(d.timelineKey, s.key)) - .orderBy(({ d }) => d.order) - .select(({ d }) => ({ + .where(({ d }: any) => eq(d.timelineKey, s.key)) + .orderBy(({ d }: any) => d.order) + .select(({ d }: any) => ({ key: d.key, delta: d.delta, })), @@ -344,7 +337,7 @@ describe(`bug repro`, () => { }) await collection.preload() - const data = () => collection.get(TIMELINE_KEY) as any + const data = () => collection.get(TIMELINE_KEY) expect(data().deltas).toEqual([]) From 96d3b6f44aa97b894c1d947cd3064db3af49abc0 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Mon, 6 Apr 2026 19:08:15 -0600 Subject: [PATCH 5/5] refactor: move repro tests into existing includes.test.ts Co-Authored-By: Claude Opus 4.6 --- packages/db/tests/query/includes.test.ts | 1063 +++++++++++++++++ packages/db/tests/query/repro-bug2.test.ts | 886 -------------- .../tests/query/repro-many-siblings.test.ts | 242 ---- 3 files changed, 1063 insertions(+), 1128 deletions(-) delete mode 100644 packages/db/tests/query/repro-bug2.test.ts delete mode 100644 packages/db/tests/query/repro-many-siblings.test.ts diff --git a/packages/db/tests/query/includes.test.ts b/packages/db/tests/query/includes.test.ts index 9d923bdfa..a0b490bdf 100644 --- a/packages/db/tests/query/includes.test.ts +++ b/packages/db/tests/query/includes.test.ts @@ -1,6 +1,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' import { and, + coalesce, concat, count, createLiveQueryCollection, @@ -9,7 +10,9 @@ import { } from '../../src/query/index.js' import { createCollection } from '../../src/collection/index.js' import { CleanupQueue } from '../../src/collection/cleanup-queue.js' +import { localOnlyCollectionOptions } from '../../src/local-only.js' import { mockSyncCollectionOptions, stripVirtualProps } from '../utils.js' +import type { SyncConfig } from '../../src/types.js' type Project = { id: number @@ -4061,4 +4064,1064 @@ describe(`includes subqueries`, () => { ]) }) }) + + describe(`toArray/concat(toArray) inside expressions throws`, () => { + it(`throws a clear error when concat(toArray()) is wrapped in coalesce()`, () => { + type Message = { id: number; role: string } + type Chunk = { + id: number + messageId: number + text: string + timestamp: number + } + + const messages = createCollection( + mockSyncCollectionOptions({ + id: `bug1-messages`, + getKey: (m) => m.id, + initialData: [{ id: 1, role: `assistant` }], + }), + ) + + const chunks = createCollection( + mockSyncCollectionOptions({ + id: `bug1-chunks`, + getKey: (c) => c.id, + initialData: [{ id: 10, messageId: 1, text: `Hello`, timestamp: 1 }], + }), + ) + + expect(() => + createLiveQueryCollection((q) => + q.from({ m: messages }).select(({ m }) => ({ + id: m.id, + content: coalesce( + concat( + toArray( + q + .from({ c: chunks }) + .where(({ c }) => eq(c.messageId, m.id)) + .orderBy(({ c }) => c.timestamp) + .select(({ c }) => c.text), + ), + ) as any, + ``, + ), + })), + ), + ).toThrow(`concat(toArray()) cannot be used inside expressions`) + }) + + it(`toArray() wrapped in coalesce() also throws`, () => { + type Parent = { id: number } + type Child = { id: number; parentId: number } + + const parents = createCollection( + mockSyncCollectionOptions({ + id: `bug1b-parents`, + getKey: (p) => p.id, + initialData: [{ id: 1 }], + }), + ) + + const children = createCollection( + mockSyncCollectionOptions({ + id: `bug1b-children`, + getKey: (c) => c.id, + initialData: [], + }), + ) + + expect(() => + createLiveQueryCollection((q) => + q.from({ p: parents }).select(({ p }) => ({ + id: p.id, + items: coalesce( + toArray( + q + .from({ c: children }) + .where(({ c }) => eq(c.parentId, p.id)) + .select(({ c }) => ({ id: c.id })), + ) as any, + [], + ), + })), + ), + ).toThrow(`toArray() cannot be used inside expressions`) + }) + }) + + describe(`sequential inserts into toArray child`, () => { + it(`second insert propagates (mockSync)`, async () => { + type Parent = { id: number; name: string } + type Child = { id: number; parentId: number; title: string } + + const parents = createCollection( + mockSyncCollectionOptions({ + id: `bug2a-parents`, + getKey: (p) => p.id, + initialData: [{ id: 1, name: `Alpha` }], + }), + ) + + const children = createCollection( + mockSyncCollectionOptions({ + id: `bug2a-children`, + getKey: (c) => c.id, + initialData: [], + }), + ) + + const collection = createLiveQueryCollection((q) => + q.from({ p: parents }).select(({ p }) => ({ + id: p.id, + items: toArray( + q + .from({ c: children }) + .where(({ c }) => eq(c.parentId, p.id)) + .select(({ c }) => ({ + id: c.id, + title: c.title, + })), + ), + })), + ) + + await collection.preload() + expect((collection.get(1) as any).items).toEqual([]) + + // First insert + children.utils.begin() + children.utils.write({ + type: `insert`, + value: { id: 10, parentId: 1, title: `First` }, + }) + children.utils.commit() + expect((collection.get(1) as any).items).toHaveLength(1) + + // Second insert + children.utils.begin() + children.utils.write({ + type: `insert`, + value: { id: 11, parentId: 1, title: `Second` }, + }) + children.utils.commit() + expect((collection.get(1) as any).items).toHaveLength(2) + }) + + it(`second insert propagates (localOnly + collection.insert)`, async () => { + type Parent = { id: number; name: string } + type Child = { id: number; parentId: number; title: string } + + const parents = createCollection( + localOnlyCollectionOptions({ + id: `bug2b-parents`, + getKey: (p) => p.id, + initialData: [{ id: 1, name: `Alpha` }], + }), + ) + + const children = createCollection( + localOnlyCollectionOptions({ + id: `bug2b-children`, + getKey: (c) => c.id, + initialData: [], + }), + ) + + const collection = createLiveQueryCollection((q) => + q.from({ p: parents }).select(({ p }) => ({ + id: p.id, + items: toArray( + q + .from({ c: children }) + .where(({ c }) => eq(c.parentId, p.id)) + .select(({ c }) => ({ + id: c.id, + title: c.title, + })), + ), + })), + ) + + await collection.preload() + expect((collection.get(1) as any).items).toEqual([]) + + // First insert via collection.insert() + children.insert({ id: 10, parentId: 1, title: `First` }) + await new Promise((r) => setTimeout(r, 50)) + expect((collection.get(1) as any).items).toHaveLength(1) + + // Second insert via collection.insert() + children.insert({ id: 11, parentId: 1, title: `Second` }) + await new Promise((r) => setTimeout(r, 50)) + expect((collection.get(1) as any).items).toHaveLength(2) + }) + + it(`second insert propagates via concat(toArray)`, async () => { + type Message = { id: number; role: string } + type Chunk = { + id: number + messageId: number + text: string + timestamp: number + } + + const messages = createCollection( + localOnlyCollectionOptions({ + id: `bug2c-messages`, + getKey: (m) => m.id, + initialData: [{ id: 1, role: `assistant` }], + }), + ) + + const chunks = createCollection( + localOnlyCollectionOptions({ + id: `bug2c-chunks`, + getKey: (c) => c.id, + initialData: [], + }), + ) + + const collection = createLiveQueryCollection((q) => + q.from({ m: messages }).select(({ m }) => ({ + id: m.id, + content: concat( + toArray( + q + .from({ c: chunks }) + .where(({ c }) => eq(c.messageId, m.id)) + .orderBy(({ c }) => c.timestamp) + .select(({ c }) => c.text), + ), + ), + })), + ) + + await collection.preload() + expect((collection.get(1) as any).content).toBe(``) + + // First insert + chunks.insert({ + id: 10, + messageId: 1, + text: `Hello`, + timestamp: 1, + }) + await new Promise((r) => setTimeout(r, 50)) + expect((collection.get(1) as any).content).toBe(`Hello`) + + // Second insert + chunks.insert({ + id: 11, + messageId: 1, + text: ` world`, + timestamp: 2, + }) + await new Promise((r) => setTimeout(r, 50)) + expect((collection.get(1) as any).content).toBe(`Hello world`) + }) + + it(`second insert propagates through chained live query collections (darix pattern)`, async () => { + type RawDelta = { + key: string + text_id: string + delta: string + _seq: number + } + type Seed = { key: string } + + const TIMELINE_KEY = `timeline-1` + + const rawDeltas = createCollection( + localOnlyCollectionOptions({ + id: `chained-raw-deltas`, + getKey: (d) => d.key, + initialData: [], + }), + ) + + const derivedDeltas = createLiveQueryCollection({ + id: `chained-derived-deltas`, + query: (q: any) => + q.from({ d: rawDeltas }).select(({ d }: any) => ({ + key: d.key, + text_id: d.text_id, + timelineKey: TIMELINE_KEY, + order: d._seq, + delta: d.delta, + })), + }) + + const seeds = createCollection( + localOnlyCollectionOptions({ + id: `chained-seeds`, + getKey: (s) => s.key, + initialData: [{ key: TIMELINE_KEY }], + }), + ) + + const collection = createLiveQueryCollection({ + query: (q: any) => + q.from({ s: seeds }).select(({ s }: any) => ({ + key: s.key, + deltas: toArray( + q + .from({ d: derivedDeltas }) + .where(({ d }: any) => eq(d.timelineKey, s.key)) + .orderBy(({ d }: any) => d.order) + .select(({ d }: any) => ({ + key: d.key, + delta: d.delta, + })), + ), + })), + }) + + await collection.preload() + const data = () => collection.get(TIMELINE_KEY) + + expect(data().deltas).toEqual([]) + + rawDeltas.insert({ key: `td-1`, text_id: `t-1`, delta: `Hello`, _seq: 1 }) + await new Promise((r) => setTimeout(r, 100)) + expect(data().deltas).toHaveLength(1) + expect(data().deltas[0].delta).toBe(`Hello`) + + rawDeltas.insert({ + key: `td-2`, + text_id: `t-1`, + delta: ` world`, + _seq: 2, + }) + await new Promise((r) => setTimeout(r, 100)) + expect(data().deltas).toHaveLength(2) + }) + + it(`second insert propagates with multiple sibling toArray includes`, async () => { + type Seed = { key: string } + type Text = { key: string; seedKey: string; status: string } + type TextDelta = { + key: string + textId: string + seedKey: string + delta: string + seq: number + } + + const seeds = createCollection( + localOnlyCollectionOptions({ + id: `bug2d-seeds`, + getKey: (s) => s.key, + initialData: [{ key: `seed-1` }], + }), + ) + + const texts = createCollection( + localOnlyCollectionOptions({ + id: `bug2d-texts`, + getKey: (t) => t.key, + initialData: [], + }), + ) + + const textDeltas = createCollection( + localOnlyCollectionOptions({ + id: `bug2d-textDeltas`, + getKey: (td) => td.key, + initialData: [], + }), + ) + + const collection = createLiveQueryCollection((q) => + q.from({ s: seeds }).select(({ s }) => ({ + key: s.key, + texts: toArray( + q + .from({ t: texts }) + .where(({ t }) => eq(t.seedKey, s.key)) + .select(({ t }) => ({ + key: t.key, + status: t.status, + })), + ), + textDeltas: toArray( + q + .from({ td: textDeltas }) + .where(({ td }) => eq(td.seedKey, s.key)) + .orderBy(({ td }) => td.seq) + .select(({ td }) => ({ + key: td.key, + textId: td.textId, + delta: td.delta, + })), + ), + })), + ) + + await collection.preload() + + const data = () => collection.get(`seed-1`) as any + + texts.insert({ key: `text-1`, seedKey: `seed-1`, status: `streaming` }) + await new Promise((r) => setTimeout(r, 50)) + expect(data().texts).toHaveLength(1) + + textDeltas.insert({ + key: `td-1`, + textId: `text-1`, + seedKey: `seed-1`, + delta: `Hello`, + seq: 1, + }) + await new Promise((r) => setTimeout(r, 50)) + expect(data().textDeltas).toHaveLength(1) + expect(data().textDeltas[0].delta).toBe(`Hello`) + + textDeltas.insert({ + key: `td-2`, + textId: `text-1`, + seedKey: `seed-1`, + delta: ` world`, + seq: 2, + }) + await new Promise((r) => setTimeout(r, 50)) + expect(data().textDeltas).toHaveLength(2) + }) + }) + + describe(`nested toArray includes (depth 3+)`, () => { + it(`control: flat concat(toArray) propagates delta inserts`, async () => { + type Text = { key: string; _seq: number; status: string } + type TextDelta = { + key: string + text_id: string + _seq: number + delta: string + } + + const texts = createCollection( + localOnlyCollectionOptions({ + id: `nested-ctrl-texts`, + getKey: (t) => t.key, + initialData: [], + }), + ) + + const textDeltas = createCollection( + localOnlyCollectionOptions({ + id: `nested-ctrl-deltas`, + getKey: (d) => d.key, + initialData: [], + }), + ) + + const collection = createLiveQueryCollection({ + id: `nested-ctrl-live`, + query: (q) => + q.from({ text: texts }).select(({ text }) => ({ + key: text.key, + order: coalesce(text._seq, -1), + status: text.status, + text: concat( + toArray( + q + .from({ delta: textDeltas }) + .where(({ delta }) => eq(delta.text_id, text.key)) + .orderBy(({ delta }) => delta._seq) + .select(({ delta }) => delta.delta), + ), + ), + })), + }) + + await collection.preload() + + texts.insert({ key: `text-1`, _seq: 1, status: `streaming` }) + await new Promise((r) => setTimeout(r, 50)) + expect((collection.get(`text-1`) as any)?.text).toBe(``) + + textDeltas.insert({ + key: `td-1`, + text_id: `text-1`, + _seq: 2, + delta: `Hello`, + }) + await new Promise((r) => setTimeout(r, 50)) + expect((collection.get(`text-1`) as any)?.text).toBe(`Hello`) + + textDeltas.insert({ + key: `td-2`, + text_id: `text-1`, + _seq: 3, + delta: ` world`, + }) + await new Promise((r) => setTimeout(r, 50)) + expect((collection.get(`text-1`) as any)?.text).toBe(`Hello world`) + }) + + it(`nested toArray(runs) -> toArray(texts) -> concat(toArray(textDeltas)) propagates`, async () => { + const TIMELINE_KEY = `tl-nested` + + type Seed = { key: string } + type Run = { key: string; _seq: number; status: string } + type Text = { + key: string + run_id: string + _seq: number + status: string + } + type TextDelta = { + key: string + text_id: string + run_id: string + _seq: number + delta: string + } + + const seed = createCollection( + localOnlyCollectionOptions({ + id: `nested-seed`, + getKey: (s) => s.key, + initialData: [{ key: TIMELINE_KEY }], + }), + ) + + const runs = createCollection( + localOnlyCollectionOptions({ + id: `nested-runs`, + getKey: (r) => r.key, + initialData: [], + }), + ) + + const texts = createCollection( + localOnlyCollectionOptions({ + id: `nested-texts`, + getKey: (t) => t.key, + initialData: [], + }), + ) + + const textDeltas = createCollection( + localOnlyCollectionOptions({ + id: `nested-deltas`, + getKey: (d) => d.key, + initialData: [], + }), + ) + + const runsLive = createLiveQueryCollection({ + id: `nested-runs-live`, + query: (q) => + q.from({ run: runs }).select(({ run }) => ({ + timelineKey: TIMELINE_KEY, + key: run.key, + order: coalesce(run._seq, -1), + status: run.status, + })), + }) + + const textsLive = createLiveQueryCollection({ + id: `nested-texts-live`, + query: (q) => + q.from({ text: texts }).select(({ text }) => ({ + timelineKey: TIMELINE_KEY, + key: text.key, + run_id: text.run_id, + order: coalesce(text._seq, -1), + status: text.status, + })), + }) + + const textDeltasLive = createLiveQueryCollection({ + id: `nested-deltas-live`, + query: (q) => + q.from({ delta: textDeltas }).select(({ delta }) => ({ + timelineKey: TIMELINE_KEY, + key: delta.key, + text_id: delta.text_id, + run_id: delta.run_id, + order: coalesce(delta._seq, -1), + delta: delta.delta, + })), + }) + + const timeline = createLiveQueryCollection({ + id: `nested-timeline`, + query: (q) => + q.from({ s: seed }).select(({ s }) => ({ + key: s.key, + runs: toArray( + q + .from({ run: runsLive }) + .where(({ run }) => eq(run.timelineKey, s.key)) + .orderBy(({ run }) => run.order) + .select(({ run }) => ({ + key: run.key, + order: run.order, + status: run.status, + texts: toArray( + q + .from({ text: textsLive }) + .where(({ text }) => eq(text.run_id, run.key)) + .orderBy(({ text }) => text.order) + .select(({ text }) => ({ + key: text.key, + run_id: text.run_id, + order: text.order, + status: text.status, + text: concat( + toArray( + q + .from({ delta: textDeltasLive }) + .where(({ delta }) => eq(delta.text_id, text.key)) + .orderBy(({ delta }) => delta.order) + .select(({ delta }) => delta.delta), + ), + ), + })), + ), + })), + ), + })), + }) + + await timeline.preload() + + const data = () => timeline.get(TIMELINE_KEY) as any + + runs.insert({ key: `run-1`, _seq: 1, status: `started` }) + texts.insert({ + key: `text-1`, + run_id: `run-1`, + _seq: 2, + status: `streaming`, + }) + await new Promise((r) => setTimeout(r, 100)) + + expect(data().runs).toHaveLength(1) + expect(data().runs[0].texts).toHaveLength(1) + expect(data().runs[0].texts[0].text).toBe(``) + + textDeltas.insert({ + key: `td-1`, + text_id: `text-1`, + run_id: `run-1`, + _seq: 3, + delta: `Hello`, + }) + await new Promise((r) => setTimeout(r, 100)) + expect(data().runs[0].texts[0].text).toBe(`Hello`) + + textDeltas.insert({ + key: `td-2`, + text_id: `text-1`, + run_id: `run-1`, + _seq: 4, + delta: ` world`, + }) + await new Promise((r) => setTimeout(r, 100)) + expect(data().runs[0].texts[0].text).toBe(`Hello world`) + }) + + it(`deep buffer change for one parent does not emit spurious update for sibling parent`, async () => { + const TIMELINE_KEY = `tl-spurious` + + type Seed = { key: string } + type Run = { key: string; _seq: number; status: string } + type Text = { + key: string + run_id: string + _seq: number + status: string + } + type TextDelta = { + key: string + text_id: string + run_id: string + _seq: number + delta: string + } + + const seed = createCollection( + localOnlyCollectionOptions({ + id: `spurious-seed`, + getKey: (s) => s.key, + initialData: [{ key: TIMELINE_KEY }], + }), + ) + + const runs = createCollection( + localOnlyCollectionOptions({ + id: `spurious-runs`, + getKey: (r) => r.key, + initialData: [], + }), + ) + + const texts = createCollection( + localOnlyCollectionOptions({ + id: `spurious-texts`, + getKey: (t) => t.key, + initialData: [], + }), + ) + + const textDeltas = createCollection( + localOnlyCollectionOptions({ + id: `spurious-deltas`, + getKey: (d) => d.key, + initialData: [], + }), + ) + + const runsLive = createLiveQueryCollection({ + id: `spurious-runs-live`, + query: (q) => + q.from({ run: runs }).select(({ run }) => ({ + timelineKey: TIMELINE_KEY, + key: run.key, + order: coalesce(run._seq, -1), + status: run.status, + })), + }) + + const textsLive = createLiveQueryCollection({ + id: `spurious-texts-live`, + query: (q) => + q.from({ text: texts }).select(({ text }) => ({ + timelineKey: TIMELINE_KEY, + key: text.key, + run_id: text.run_id, + order: coalesce(text._seq, -1), + status: text.status, + })), + }) + + const textDeltasLive = createLiveQueryCollection({ + id: `spurious-deltas-live`, + query: (q) => + q.from({ delta: textDeltas }).select(({ delta }) => ({ + timelineKey: TIMELINE_KEY, + key: delta.key, + text_id: delta.text_id, + run_id: delta.run_id, + order: coalesce(delta._seq, -1), + delta: delta.delta, + })), + }) + + const timeline = createLiveQueryCollection({ + id: `spurious-timeline`, + query: (q) => + q.from({ s: seed }).select(({ s }) => ({ + key: s.key, + runs: toArray( + q + .from({ run: runsLive }) + .where(({ run }) => eq(run.timelineKey, s.key)) + .orderBy(({ run }) => run.order) + .select(({ run }) => ({ + key: run.key, + order: run.order, + status: run.status, + texts: toArray( + q + .from({ text: textsLive }) + .where(({ text }) => eq(text.run_id, run.key)) + .orderBy(({ text }) => text.order) + .select(({ text }) => ({ + key: text.key, + run_id: text.run_id, + order: text.order, + status: text.status, + text: concat( + toArray( + q + .from({ delta: textDeltasLive }) + .where(({ delta }) => eq(delta.text_id, text.key)) + .orderBy(({ delta }) => delta.order) + .select(({ delta }) => delta.delta), + ), + ), + })), + ), + })), + ), + })), + }) + + await timeline.preload() + + const data = () => timeline.get(TIMELINE_KEY) as any + + runs.insert({ key: `run-1`, _seq: 1, status: `started` }) + runs.insert({ key: `run-2`, _seq: 2, status: `started` }) + texts.insert({ + key: `text-1`, + run_id: `run-1`, + _seq: 3, + status: `streaming`, + }) + texts.insert({ + key: `text-2`, + run_id: `run-2`, + _seq: 4, + status: `streaming`, + }) + await new Promise((r) => setTimeout(r, 100)) + + expect(data().runs).toHaveLength(2) + expect(data().runs[0].texts[0].text).toBe(``) + expect(data().runs[1].texts[0].text).toBe(``) + + const timelineRowBefore = data() + const run1TextsBefore = timelineRowBefore.runs[0].texts + const updateEvents: Array = [] + timeline.subscribeChanges((changes) => { + for (const change of changes) { + if (change.type === `update`) { + updateEvents.push(change) + } + } + }) + + textDeltas.insert({ + key: `td-1`, + text_id: `text-2`, + run_id: `run-2`, + _seq: 5, + delta: `Hello`, + }) + await new Promise((r) => setTimeout(r, 100)) + + expect(data().runs[1].texts[0].text).toBe(`Hello`) + expect(data().runs[0].texts[0].text).toBe(``) + + expect(data().runs[0].texts).toBe(run1TextsBefore) + }) + }) + + describe(`many sibling toArray includes with chained derived collections`, () => { + function createSyncCollection( + id: string, + getKey: (item: T) => string | number, + ) { + let syncBegin: () => void + let syncWrite: (msg: { type: string; value: T }) => void + let syncCommit: () => void + + const collection = createCollection({ + id, + getKey, + sync: { + sync: (params: any) => { + syncBegin = params.begin + syncWrite = params.write + syncCommit = params.commit + params.markReady() + return () => {} + }, + } as SyncConfig, + startSync: true, + gcTime: 0, + }) + + return { + collection, + insert(value: T) { + syncBegin!() + syncWrite!({ type: `insert`, value }) + syncCommit!() + }, + } + } + + const TIMELINE_KEY = `timeline` + + type RawItem = { key: string; _seq: number; [k: string]: unknown } + + function createDerivedCollection( + id: string, + source: ReturnType>[`collection`], + extraFields?: (d: any) => Record, + ) { + return createLiveQueryCollection({ + id: `${id}:derived`, + query: (q: any) => + q.from({ d: source }).select(({ d }: any) => ({ + timelineKey: TIMELINE_KEY, + key: d.key, + order: coalesce(d._seq, -1), + ...(extraFields ? extraFields(d) : {}), + })), + }) + } + + it(`second insert propagates with 5 sibling chained toArray includes`, async () => { + const runs = createSyncCollection(`raw-runs`, (r) => r.key) + const texts = createSyncCollection(`raw-texts`, (r) => r.key) + const textDeltas = createSyncCollection( + `raw-textDeltas`, + (r) => r.key, + ) + const toolCalls = createSyncCollection( + `raw-toolCalls`, + (r) => r.key, + ) + const steps = createSyncCollection(`raw-steps`, (r) => r.key) + + const derivedRuns = createDerivedCollection( + `runs`, + runs.collection, + (d) => ({ + status: d.status, + }), + ) + const derivedTexts = createDerivedCollection( + `texts`, + texts.collection, + (d) => ({ + run_id: d.run_id, + status: d.status, + }), + ) + const derivedTextDeltas = createDerivedCollection( + `textDeltas`, + textDeltas.collection, + (d) => ({ + text_id: d.text_id, + run_id: d.run_id, + delta: d.delta, + }), + ) + const derivedToolCalls = createDerivedCollection( + `toolCalls`, + toolCalls.collection, + (d) => ({ + run_id: d.run_id, + tool_name: d.tool_name, + }), + ) + const derivedSteps = createDerivedCollection( + `steps`, + steps.collection, + (d) => ({ + run_id: d.run_id, + step_number: d.step_number, + }), + ) + + const seeds = createCollection({ + id: `seed`, + getKey: (s: { key: string }) => s.key, + sync: { + sync: (params: any) => { + params.begin() + params.write({ type: `insert`, value: { key: TIMELINE_KEY } }) + params.commit() + params.markReady() + return () => {} + }, + } as SyncConfig<{ key: string }>, + startSync: true, + gcTime: 0, + }) + + const collection = createLiveQueryCollection({ + query: (q: any) => + q.from({ s: seeds }).select(({ s }: any) => ({ + key: s.key, + runs: toArray( + q + .from({ r: derivedRuns }) + .where(({ r }: any) => eq(r.timelineKey, s.key)) + .orderBy(({ r }: any) => r.order) + .select(({ r }: any) => ({ key: r.key, status: r.status })), + ), + texts: toArray( + q + .from({ t: derivedTexts }) + .where(({ t }: any) => eq(t.timelineKey, s.key)) + .orderBy(({ t }: any) => t.order) + .select(({ t }: any) => ({ + key: t.key, + run_id: t.run_id, + status: t.status, + })), + ), + textDeltas: toArray( + q + .from({ td: derivedTextDeltas }) + .where(({ td }: any) => eq(td.timelineKey, s.key)) + .orderBy(({ td }: any) => td.order) + .select(({ td }: any) => ({ + key: td.key, + text_id: td.text_id, + delta: td.delta, + })), + ), + toolCalls: toArray( + q + .from({ tc: derivedToolCalls }) + .where(({ tc }: any) => eq(tc.timelineKey, s.key)) + .orderBy(({ tc }: any) => tc.order) + .select(({ tc }: any) => ({ + key: tc.key, + tool_name: tc.tool_name, + })), + ), + steps: toArray( + q + .from({ st: derivedSteps }) + .where(({ st }: any) => eq(st.timelineKey, s.key)) + .orderBy(({ st }: any) => st.order) + .select(({ st }: any) => ({ + key: st.key, + step_number: st.step_number, + })), + ), + })), + }) + + await collection.preload() + + const data = () => collection.get(TIMELINE_KEY) + + runs.insert({ key: `run-1`, status: `started`, _seq: 1 }) + texts.insert({ + key: `text-1`, + run_id: `run-1`, + status: `streaming`, + _seq: 2, + }) + await new Promise((r) => setTimeout(r, 100)) + + expect(data().runs).toHaveLength(1) + expect(data().texts).toHaveLength(1) + expect(data().textDeltas).toHaveLength(0) + + textDeltas.insert({ + key: `td-1`, + text_id: `text-1`, + run_id: `run-1`, + delta: `Hello`, + _seq: 3, + }) + await new Promise((r) => setTimeout(r, 100)) + expect(data().textDeltas).toHaveLength(1) + expect(data().textDeltas[0].delta).toBe(`Hello`) + + textDeltas.insert({ + key: `td-2`, + text_id: `text-1`, + run_id: `run-1`, + delta: ` world`, + _seq: 4, + }) + await new Promise((r) => setTimeout(r, 100)) + expect(data().textDeltas).toHaveLength(2) + }) + }) }) diff --git a/packages/db/tests/query/repro-bug2.test.ts b/packages/db/tests/query/repro-bug2.test.ts deleted file mode 100644 index f5897038e..000000000 --- a/packages/db/tests/query/repro-bug2.test.ts +++ /dev/null @@ -1,886 +0,0 @@ -/** - * Repro for bugs reported against TanStack DB 0.6 - * Bug 1: coalesce(concat(toArray(...))) bypasses includes detection - * Bug 2: sequential inserts into toArray() child don't fully propagate - */ -import { describe, expect, it } from 'vitest' -import { - coalesce, - concat, - createLiveQueryCollection, - eq, - toArray, -} from '../../src/query/index.js' -import { createCollection } from '../../src/collection/index.js' -import { localOnlyCollectionOptions } from '../../src/local-only.js' -import { mockSyncCollectionOptions } from '../utils.js' - -describe(`bug repro`, () => { - describe(`Bug 1: coalesce(concat(toArray(...)))`, () => { - it(`throws a clear error when concat(toArray()) is wrapped in coalesce()`, () => { - type Message = { id: number; role: string } - type Chunk = { - id: number - messageId: number - text: string - timestamp: number - } - - const messages = createCollection( - mockSyncCollectionOptions({ - id: `bug1-messages`, - getKey: (m) => m.id, - initialData: [{ id: 1, role: `assistant` }], - }), - ) - - const chunks = createCollection( - mockSyncCollectionOptions({ - id: `bug1-chunks`, - getKey: (c) => c.id, - initialData: [{ id: 10, messageId: 1, text: `Hello`, timestamp: 1 }], - }), - ) - - expect(() => - createLiveQueryCollection((q) => - q.from({ m: messages }).select(({ m }) => ({ - id: m.id, - content: coalesce( - concat( - toArray( - q - .from({ c: chunks }) - .where(({ c }) => eq(c.messageId, m.id)) - .orderBy(({ c }) => c.timestamp) - .select(({ c }) => c.text), - ), - ) as any, - ``, - ), - })), - ), - ).toThrow(`concat(toArray()) cannot be used inside expressions`) - }) - - it(`toArray() wrapped in coalesce() also throws`, () => { - type Parent = { id: number } - type Child = { id: number; parentId: number } - - const parents = createCollection( - mockSyncCollectionOptions({ - id: `bug1b-parents`, - getKey: (p) => p.id, - initialData: [{ id: 1 }], - }), - ) - - const children = createCollection( - mockSyncCollectionOptions({ - id: `bug1b-children`, - getKey: (c) => c.id, - initialData: [], - }), - ) - - expect(() => - createLiveQueryCollection((q) => - q.from({ p: parents }).select(({ p }) => ({ - id: p.id, - items: coalesce( - toArray( - q - .from({ c: children }) - .where(({ c }) => eq(c.parentId, p.id)) - .select(({ c }) => ({ id: c.id })), - ) as any, - [], - ), - })), - ), - ).toThrow(`toArray() cannot be used inside expressions`) - }) - }) - - describe(`Bug 2: sequential inserts into toArray child`, () => { - it(`second insert propagates (mockSync)`, async () => { - type Parent = { id: number; name: string } - type Child = { id: number; parentId: number; title: string } - - const parents = createCollection( - mockSyncCollectionOptions({ - id: `bug2a-parents`, - getKey: (p) => p.id, - initialData: [{ id: 1, name: `Alpha` }], - }), - ) - - const children = createCollection( - mockSyncCollectionOptions({ - id: `bug2a-children`, - getKey: (c) => c.id, - initialData: [], - }), - ) - - const collection = createLiveQueryCollection((q) => - q.from({ p: parents }).select(({ p }) => ({ - id: p.id, - items: toArray( - q - .from({ c: children }) - .where(({ c }) => eq(c.parentId, p.id)) - .select(({ c }) => ({ - id: c.id, - title: c.title, - })), - ), - })), - ) - - await collection.preload() - expect((collection.get(1) as any).items).toEqual([]) - - // First insert - children.utils.begin() - children.utils.write({ - type: `insert`, - value: { id: 10, parentId: 1, title: `First` }, - }) - children.utils.commit() - expect((collection.get(1) as any).items).toHaveLength(1) - - // Second insert - children.utils.begin() - children.utils.write({ - type: `insert`, - value: { id: 11, parentId: 1, title: `Second` }, - }) - children.utils.commit() - expect((collection.get(1) as any).items).toHaveLength(2) - }) - - it(`second insert propagates (localOnly + collection.insert)`, async () => { - type Parent = { id: number; name: string } - type Child = { id: number; parentId: number; title: string } - - const parents = createCollection( - localOnlyCollectionOptions({ - id: `bug2b-parents`, - getKey: (p) => p.id, - initialData: [{ id: 1, name: `Alpha` }], - }), - ) - - const children = createCollection( - localOnlyCollectionOptions({ - id: `bug2b-children`, - getKey: (c) => c.id, - initialData: [], - }), - ) - - const collection = createLiveQueryCollection((q) => - q.from({ p: parents }).select(({ p }) => ({ - id: p.id, - items: toArray( - q - .from({ c: children }) - .where(({ c }) => eq(c.parentId, p.id)) - .select(({ c }) => ({ - id: c.id, - title: c.title, - })), - ), - })), - ) - - await collection.preload() - expect((collection.get(1) as any).items).toEqual([]) - - // First insert via collection.insert() - children.insert({ id: 10, parentId: 1, title: `First` }) - await new Promise((r) => setTimeout(r, 50)) - expect((collection.get(1) as any).items).toHaveLength(1) - - // Second insert via collection.insert() - children.insert({ id: 11, parentId: 1, title: `Second` }) - await new Promise((r) => setTimeout(r, 50)) - expect((collection.get(1) as any).items).toHaveLength(2) - }) - - it(`second insert propagates via concat(toArray)`, async () => { - type Message = { id: number; role: string } - type Chunk = { - id: number - messageId: number - text: string - timestamp: number - } - - const messages = createCollection( - localOnlyCollectionOptions({ - id: `bug2c-messages`, - getKey: (m) => m.id, - initialData: [{ id: 1, role: `assistant` }], - }), - ) - - const chunks = createCollection( - localOnlyCollectionOptions({ - id: `bug2c-chunks`, - getKey: (c) => c.id, - initialData: [], - }), - ) - - const collection = createLiveQueryCollection((q) => - q.from({ m: messages }).select(({ m }) => ({ - id: m.id, - content: concat( - toArray( - q - .from({ c: chunks }) - .where(({ c }) => eq(c.messageId, m.id)) - .orderBy(({ c }) => c.timestamp) - .select(({ c }) => c.text), - ), - ), - })), - ) - - await collection.preload() - expect((collection.get(1) as any).content).toBe(``) - - // First insert - chunks.insert({ - id: 10, - messageId: 1, - text: `Hello`, - timestamp: 1, - }) - await new Promise((r) => setTimeout(r, 50)) - expect((collection.get(1) as any).content).toBe(`Hello`) - - // Second insert - chunks.insert({ - id: 11, - messageId: 1, - text: ` world`, - timestamp: 2, - }) - await new Promise((r) => setTimeout(r, 50)) - expect((collection.get(1) as any).content).toBe(`Hello world`) - }) - - it(`second insert propagates through chained live query collections (darix pattern)`, async () => { - // This matches the darix entity-timeline pattern: - // Layer 1: raw collection → derived live query collection (adds synthetic key) - // Layer 2: derived collection → main query with toArray() includes - type RawDelta = { - key: string - text_id: string - delta: string - _seq: number - } - type Seed = { key: string } - - const TIMELINE_KEY = `timeline-1` - - // Raw source collection - const rawDeltas = createCollection( - localOnlyCollectionOptions({ - id: `chained-raw-deltas`, - getKey: (d) => d.key, - initialData: [], - }), - ) - - // Layer 1: derived collection that adds timelineKey and renames _seq → order - const derivedDeltas = createLiveQueryCollection({ - id: `chained-derived-deltas`, - query: (q: any) => - q.from({ d: rawDeltas }).select(({ d }: any) => ({ - key: d.key, - text_id: d.text_id, - timelineKey: TIMELINE_KEY, - order: d._seq, - delta: d.delta, - })), - }) - - // Seed collection for singleton parent - const seeds = createCollection( - localOnlyCollectionOptions({ - id: `chained-seeds`, - getKey: (s) => s.key, - initialData: [{ key: TIMELINE_KEY }], - }), - ) - - // Layer 2: main query with toArray() include from derived collection - const collection = createLiveQueryCollection({ - query: (q: any) => - q.from({ s: seeds }).select(({ s }: any) => ({ - key: s.key, - deltas: toArray( - q - .from({ d: derivedDeltas }) - .where(({ d }: any) => eq(d.timelineKey, s.key)) - .orderBy(({ d }: any) => d.order) - .select(({ d }: any) => ({ - key: d.key, - delta: d.delta, - })), - ), - })), - }) - - await collection.preload() - const data = () => collection.get(TIMELINE_KEY) - - expect(data().deltas).toEqual([]) - - // First insert into raw collection - rawDeltas.insert({ key: `td-1`, text_id: `t-1`, delta: `Hello`, _seq: 1 }) - await new Promise((r) => setTimeout(r, 100)) - expect(data().deltas).toHaveLength(1) - expect(data().deltas[0].delta).toBe(`Hello`) - - // Second insert — this is the critical path through chained collections - rawDeltas.insert({ - key: `td-2`, - text_id: `t-1`, - delta: ` world`, - _seq: 2, - }) - await new Promise((r) => setTimeout(r, 100)) - expect(data().deltas).toHaveLength(2) - }) - - it(`second insert propagates with multiple sibling toArray includes`, async () => { - type Seed = { key: string } - type Text = { key: string; seedKey: string; status: string } - type TextDelta = { - key: string - textId: string - seedKey: string - delta: string - seq: number - } - - const seeds = createCollection( - localOnlyCollectionOptions({ - id: `bug2d-seeds`, - getKey: (s) => s.key, - initialData: [{ key: `seed-1` }], - }), - ) - - const texts = createCollection( - localOnlyCollectionOptions({ - id: `bug2d-texts`, - getKey: (t) => t.key, - initialData: [], - }), - ) - - const textDeltas = createCollection( - localOnlyCollectionOptions({ - id: `bug2d-textDeltas`, - getKey: (td) => td.key, - initialData: [], - }), - ) - - // Singleton parent with multiple sibling toArray includes - const collection = createLiveQueryCollection((q) => - q.from({ s: seeds }).select(({ s }) => ({ - key: s.key, - texts: toArray( - q - .from({ t: texts }) - .where(({ t }) => eq(t.seedKey, s.key)) - .select(({ t }) => ({ - key: t.key, - status: t.status, - })), - ), - textDeltas: toArray( - q - .from({ td: textDeltas }) - .where(({ td }) => eq(td.seedKey, s.key)) - .orderBy(({ td }) => td.seq) - .select(({ td }) => ({ - key: td.key, - textId: td.textId, - delta: td.delta, - })), - ), - })), - ) - - await collection.preload() - - const data = () => collection.get(`seed-1`) as any - - // Insert text - texts.insert({ key: `text-1`, seedKey: `seed-1`, status: `streaming` }) - await new Promise((r) => setTimeout(r, 50)) - expect(data().texts).toHaveLength(1) - - // Insert first delta - textDeltas.insert({ - key: `td-1`, - textId: `text-1`, - seedKey: `seed-1`, - delta: `Hello`, - seq: 1, - }) - await new Promise((r) => setTimeout(r, 50)) - expect(data().textDeltas).toHaveLength(1) - expect(data().textDeltas[0].delta).toBe(`Hello`) - - // Insert second delta — this is the critical test - textDeltas.insert({ - key: `td-2`, - textId: `text-1`, - seedKey: `seed-1`, - delta: ` world`, - seq: 2, - }) - await new Promise((r) => setTimeout(r, 50)) - expect(data().textDeltas).toHaveLength(2) - }) - }) - - describe(`Bug 3: nested toArray includes (runs -> texts -> concat(toArray(textDeltas)))`, () => { - it(`control: flat concat(toArray) propagates delta inserts`, async () => { - type Text = { key: string; _seq: number; status: string } - type TextDelta = { - key: string - text_id: string - _seq: number - delta: string - } - - const texts = createCollection( - localOnlyCollectionOptions({ - id: `nested-ctrl-texts`, - getKey: (t) => t.key, - initialData: [], - }), - ) - - const textDeltas = createCollection( - localOnlyCollectionOptions({ - id: `nested-ctrl-deltas`, - getKey: (d) => d.key, - initialData: [], - }), - ) - - const collection = createLiveQueryCollection({ - id: `nested-ctrl-live`, - query: (q) => - q.from({ text: texts }).select(({ text }) => ({ - key: text.key, - order: coalesce(text._seq, -1), - status: text.status, - text: concat( - toArray( - q - .from({ delta: textDeltas }) - .where(({ delta }) => eq(delta.text_id, text.key)) - .orderBy(({ delta }) => delta._seq) - .select(({ delta }) => delta.delta), - ), - ), - })), - }) - - await collection.preload() - - texts.insert({ key: `text-1`, _seq: 1, status: `streaming` }) - await new Promise((r) => setTimeout(r, 50)) - expect((collection.get(`text-1`) as any)?.text).toBe(``) - - textDeltas.insert({ - key: `td-1`, - text_id: `text-1`, - _seq: 2, - delta: `Hello`, - }) - await new Promise((r) => setTimeout(r, 50)) - expect((collection.get(`text-1`) as any)?.text).toBe(`Hello`) - - textDeltas.insert({ - key: `td-2`, - text_id: `text-1`, - _seq: 3, - delta: ` world`, - }) - await new Promise((r) => setTimeout(r, 50)) - expect((collection.get(`text-1`) as any)?.text).toBe(`Hello world`) - }) - - it(`nested toArray(runs) -> toArray(texts) -> concat(toArray(textDeltas)) propagates`, async () => { - const TIMELINE_KEY = `tl-nested` - - type Seed = { key: string } - type Run = { key: string; _seq: number; status: string } - type Text = { - key: string - run_id: string - _seq: number - status: string - } - type TextDelta = { - key: string - text_id: string - run_id: string - _seq: number - delta: string - } - - const seed = createCollection( - localOnlyCollectionOptions({ - id: `nested-seed`, - getKey: (s) => s.key, - initialData: [{ key: TIMELINE_KEY }], - }), - ) - - const runs = createCollection( - localOnlyCollectionOptions({ - id: `nested-runs`, - getKey: (r) => r.key, - initialData: [], - }), - ) - - const texts = createCollection( - localOnlyCollectionOptions({ - id: `nested-texts`, - getKey: (t) => t.key, - initialData: [], - }), - ) - - const textDeltas = createCollection( - localOnlyCollectionOptions({ - id: `nested-deltas`, - getKey: (d) => d.key, - initialData: [], - }), - ) - - // Layer 1: derived collections (matching darix pattern) - const runsLive = createLiveQueryCollection({ - id: `nested-runs-live`, - query: (q) => - q.from({ run: runs }).select(({ run }) => ({ - timelineKey: TIMELINE_KEY, - key: run.key, - order: coalesce(run._seq, -1), - status: run.status, - })), - }) - - const textsLive = createLiveQueryCollection({ - id: `nested-texts-live`, - query: (q) => - q.from({ text: texts }).select(({ text }) => ({ - timelineKey: TIMELINE_KEY, - key: text.key, - run_id: text.run_id, - order: coalesce(text._seq, -1), - status: text.status, - })), - }) - - const textDeltasLive = createLiveQueryCollection({ - id: `nested-deltas-live`, - query: (q) => - q.from({ delta: textDeltas }).select(({ delta }) => ({ - timelineKey: TIMELINE_KEY, - key: delta.key, - text_id: delta.text_id, - run_id: delta.run_id, - order: coalesce(delta._seq, -1), - delta: delta.delta, - })), - }) - - // Layer 2: main query with nested includes - const timeline = createLiveQueryCollection({ - id: `nested-timeline`, - query: (q) => - q.from({ s: seed }).select(({ s }) => ({ - key: s.key, - runs: toArray( - q - .from({ run: runsLive }) - .where(({ run }) => eq(run.timelineKey, s.key)) - .orderBy(({ run }) => run.order) - .select(({ run }) => ({ - key: run.key, - order: run.order, - status: run.status, - texts: toArray( - q - .from({ text: textsLive }) - .where(({ text }) => eq(text.run_id, run.key)) - .orderBy(({ text }) => text.order) - .select(({ text }) => ({ - key: text.key, - run_id: text.run_id, - order: text.order, - status: text.status, - text: concat( - toArray( - q - .from({ delta: textDeltasLive }) - .where(({ delta }) => eq(delta.text_id, text.key)) - .orderBy(({ delta }) => delta.order) - .select(({ delta }) => delta.delta), - ), - ), - })), - ), - })), - ), - })), - }) - - await timeline.preload() - - const data = () => timeline.get(TIMELINE_KEY) as any - - // Insert run + text - runs.insert({ key: `run-1`, _seq: 1, status: `started` }) - texts.insert({ - key: `text-1`, - run_id: `run-1`, - _seq: 2, - status: `streaming`, - }) - await new Promise((r) => setTimeout(r, 100)) - - expect(data().runs).toHaveLength(1) - expect(data().runs[0].texts).toHaveLength(1) - expect(data().runs[0].texts[0].text).toBe(``) - - // First textDelta - textDeltas.insert({ - key: `td-1`, - text_id: `text-1`, - run_id: `run-1`, - _seq: 3, - delta: `Hello`, - }) - await new Promise((r) => setTimeout(r, 100)) - expect(data().runs[0].texts[0].text).toBe(`Hello`) - - // Second textDelta - textDeltas.insert({ - key: `td-2`, - text_id: `text-1`, - run_id: `run-1`, - _seq: 4, - delta: ` world`, - }) - await new Promise((r) => setTimeout(r, 100)) - expect(data().runs[0].texts[0].text).toBe(`Hello world`) - }) - - it(`deep buffer change for one parent does not emit spurious update for sibling parent`, async () => { - const TIMELINE_KEY = `tl-spurious` - - type Seed = { key: string } - type Run = { key: string; _seq: number; status: string } - type Text = { - key: string - run_id: string - _seq: number - status: string - } - type TextDelta = { - key: string - text_id: string - run_id: string - _seq: number - delta: string - } - - const seed = createCollection( - localOnlyCollectionOptions({ - id: `spurious-seed`, - getKey: (s) => s.key, - initialData: [{ key: TIMELINE_KEY }], - }), - ) - - const runs = createCollection( - localOnlyCollectionOptions({ - id: `spurious-runs`, - getKey: (r) => r.key, - initialData: [], - }), - ) - - const texts = createCollection( - localOnlyCollectionOptions({ - id: `spurious-texts`, - getKey: (t) => t.key, - initialData: [], - }), - ) - - const textDeltas = createCollection( - localOnlyCollectionOptions({ - id: `spurious-deltas`, - getKey: (d) => d.key, - initialData: [], - }), - ) - - // Layer 1: derived collections - const runsLive = createLiveQueryCollection({ - id: `spurious-runs-live`, - query: (q) => - q.from({ run: runs }).select(({ run }) => ({ - timelineKey: TIMELINE_KEY, - key: run.key, - order: coalesce(run._seq, -1), - status: run.status, - })), - }) - - const textsLive = createLiveQueryCollection({ - id: `spurious-texts-live`, - query: (q) => - q.from({ text: texts }).select(({ text }) => ({ - timelineKey: TIMELINE_KEY, - key: text.key, - run_id: text.run_id, - order: coalesce(text._seq, -1), - status: text.status, - })), - }) - - const textDeltasLive = createLiveQueryCollection({ - id: `spurious-deltas-live`, - query: (q) => - q.from({ delta: textDeltas }).select(({ delta }) => ({ - timelineKey: TIMELINE_KEY, - key: delta.key, - text_id: delta.text_id, - run_id: delta.run_id, - order: coalesce(delta._seq, -1), - delta: delta.delta, - })), - }) - - // Layer 2: main query with nested includes - const timeline = createLiveQueryCollection({ - id: `spurious-timeline`, - query: (q) => - q.from({ s: seed }).select(({ s }) => ({ - key: s.key, - runs: toArray( - q - .from({ run: runsLive }) - .where(({ run }) => eq(run.timelineKey, s.key)) - .orderBy(({ run }) => run.order) - .select(({ run }) => ({ - key: run.key, - order: run.order, - status: run.status, - texts: toArray( - q - .from({ text: textsLive }) - .where(({ text }) => eq(text.run_id, run.key)) - .orderBy(({ text }) => text.order) - .select(({ text }) => ({ - key: text.key, - run_id: text.run_id, - order: text.order, - status: text.status, - text: concat( - toArray( - q - .from({ delta: textDeltasLive }) - .where(({ delta }) => eq(delta.text_id, text.key)) - .orderBy(({ delta }) => delta.order) - .select(({ delta }) => delta.delta), - ), - ), - })), - ), - })), - ), - })), - }) - - await timeline.preload() - - const data = () => timeline.get(TIMELINE_KEY) as any - - // Insert TWO runs, each with their own text - runs.insert({ key: `run-1`, _seq: 1, status: `started` }) - runs.insert({ key: `run-2`, _seq: 2, status: `started` }) - texts.insert({ - key: `text-1`, - run_id: `run-1`, - _seq: 3, - status: `streaming`, - }) - texts.insert({ - key: `text-2`, - run_id: `run-2`, - _seq: 4, - status: `streaming`, - }) - await new Promise((r) => setTimeout(r, 100)) - - expect(data().runs).toHaveLength(2) - expect(data().runs[0].texts[0].text).toBe(``) - expect(data().runs[1].texts[0].text).toBe(``) - - // Capture the timeline row reference BEFORE the delta insert - const timelineRowBefore = data() - const run1TextsBefore = timelineRowBefore.runs[0].texts - // Track update events on the timeline collection - const updateEvents: Array = [] - timeline.subscribeChanges((changes) => { - for (const change of changes) { - if (change.type === `update`) { - updateEvents.push(change) - } - } - }) - - // Insert a textDelta ONLY for run-2's text - textDeltas.insert({ - key: `td-1`, - text_id: `text-2`, - run_id: `run-2`, - _seq: 5, - delta: `Hello`, - }) - await new Promise((r) => setTimeout(r, 100)) - - // Verify run-2's text updated correctly - expect(data().runs[1].texts[0].text).toBe(`Hello`) - // Verify run-1's text is still empty - expect(data().runs[0].texts[0].text).toBe(``) - - // The critical check: only ONE update event should fire (for the timeline row). - // If the deep-buffer pass marks unrelated parents dirty, we'd see multiple - // updates or the runs[0].texts array would be unnecessarily re-materialized. - // Check that run-1's texts array reference is unchanged (not re-materialized) - expect(data().runs[0].texts).toBe(run1TextsBefore) - }) - }) -}) diff --git a/packages/db/tests/query/repro-many-siblings.test.ts b/packages/db/tests/query/repro-many-siblings.test.ts deleted file mode 100644 index bd1190496..000000000 --- a/packages/db/tests/query/repro-many-siblings.test.ts +++ /dev/null @@ -1,242 +0,0 @@ -/** - * Repro for Bug 2: Many sibling toArray includes with chained derived collections. - * Matches the exact darix entity-timeline query pattern. - */ -import { describe, expect, it } from 'vitest' -import { - coalesce, - createLiveQueryCollection, - eq, - toArray, -} from '../../src/query/index.js' -import { createCollection } from '../../src/collection/index.js' -import type { SyncConfig } from '../../src/types.js' - -const TIMELINE_KEY = `timeline` - -function createSyncCollection( - id: string, - getKey: (item: T) => string | number, -) { - let syncBegin: () => void - let syncWrite: (msg: { type: string; value: T }) => void - let syncCommit: () => void - - const collection = createCollection({ - id, - getKey, - sync: { - sync: (params: any) => { - syncBegin = params.begin - syncWrite = params.write - syncCommit = params.commit - params.markReady() - return () => {} - }, - } as SyncConfig, - startSync: true, - gcTime: 0, - }) - - return { - collection, - insert(value: T) { - syncBegin!() - syncWrite!({ type: `insert`, value }) - syncCommit!() - }, - } -} - -type RawItem = { key: string; _seq: number; [k: string]: unknown } - -function createDerivedCollection( - id: string, - source: ReturnType>[`collection`], - extraFields?: (d: any) => Record, -) { - return createLiveQueryCollection({ - id: `${id}:derived`, - query: (q: any) => - q.from({ d: source }).select(({ d }: any) => ({ - timelineKey: TIMELINE_KEY, - key: d.key, - order: coalesce(d._seq, -1), - ...(extraFields ? extraFields(d) : {}), - })), - }) -} - -describe(`many sibling toArray includes`, () => { - it(`second insert propagates with 5 sibling chained toArray includes`, async () => { - // Raw source collections - const runs = createSyncCollection(`raw-runs`, (r) => r.key) - const texts = createSyncCollection(`raw-texts`, (r) => r.key) - const textDeltas = createSyncCollection( - `raw-textDeltas`, - (r) => r.key, - ) - const toolCalls = createSyncCollection( - `raw-toolCalls`, - (r) => r.key, - ) - const steps = createSyncCollection(`raw-steps`, (r) => r.key) - - // Layer 1: derived collections - const derivedRuns = createDerivedCollection( - `runs`, - runs.collection, - (d) => ({ - status: d.status, - }), - ) - const derivedTexts = createDerivedCollection( - `texts`, - texts.collection, - (d) => ({ - run_id: d.run_id, - status: d.status, - }), - ) - const derivedTextDeltas = createDerivedCollection( - `textDeltas`, - textDeltas.collection, - (d) => ({ - text_id: d.text_id, - run_id: d.run_id, - delta: d.delta, - }), - ) - const derivedToolCalls = createDerivedCollection( - `toolCalls`, - toolCalls.collection, - (d) => ({ - run_id: d.run_id, - tool_name: d.tool_name, - }), - ) - const derivedSteps = createDerivedCollection( - `steps`, - steps.collection, - (d) => ({ - run_id: d.run_id, - step_number: d.step_number, - }), - ) - - // Seed collection - const seeds = createCollection({ - id: `seed`, - getKey: (s: { key: string }) => s.key, - sync: { - sync: (params: any) => { - params.begin() - params.write({ type: `insert`, value: { key: TIMELINE_KEY } }) - params.commit() - params.markReady() - return () => {} - }, - } as SyncConfig<{ key: string }>, - startSync: true, - gcTime: 0, - }) - - // Layer 2: main query with many sibling includes - const collection = createLiveQueryCollection({ - query: (q: any) => - q.from({ s: seeds }).select(({ s }: any) => ({ - key: s.key, - runs: toArray( - q - .from({ r: derivedRuns }) - .where(({ r }: any) => eq(r.timelineKey, s.key)) - .orderBy(({ r }: any) => r.order) - .select(({ r }: any) => ({ key: r.key, status: r.status })), - ), - texts: toArray( - q - .from({ t: derivedTexts }) - .where(({ t }: any) => eq(t.timelineKey, s.key)) - .orderBy(({ t }: any) => t.order) - .select(({ t }: any) => ({ - key: t.key, - run_id: t.run_id, - status: t.status, - })), - ), - textDeltas: toArray( - q - .from({ td: derivedTextDeltas }) - .where(({ td }: any) => eq(td.timelineKey, s.key)) - .orderBy(({ td }: any) => td.order) - .select(({ td }: any) => ({ - key: td.key, - text_id: td.text_id, - delta: td.delta, - })), - ), - toolCalls: toArray( - q - .from({ tc: derivedToolCalls }) - .where(({ tc }: any) => eq(tc.timelineKey, s.key)) - .orderBy(({ tc }: any) => tc.order) - .select(({ tc }: any) => ({ - key: tc.key, - tool_name: tc.tool_name, - })), - ), - steps: toArray( - q - .from({ st: derivedSteps }) - .where(({ st }: any) => eq(st.timelineKey, s.key)) - .orderBy(({ st }: any) => st.order) - .select(({ st }: any) => ({ - key: st.key, - step_number: st.step_number, - })), - ), - })), - }) - - await collection.preload() - - const data = () => collection.get(TIMELINE_KEY) - - // Insert run + text - runs.insert({ key: `run-1`, status: `started`, _seq: 1 }) - texts.insert({ - key: `text-1`, - run_id: `run-1`, - status: `streaming`, - _seq: 2, - }) - await new Promise((r) => setTimeout(r, 100)) - - expect(data().runs).toHaveLength(1) - expect(data().texts).toHaveLength(1) - expect(data().textDeltas).toHaveLength(0) - - // First textDelta - textDeltas.insert({ - key: `td-1`, - text_id: `text-1`, - run_id: `run-1`, - delta: `Hello`, - _seq: 3, - }) - await new Promise((r) => setTimeout(r, 100)) - expect(data().textDeltas).toHaveLength(1) - expect(data().textDeltas[0].delta).toBe(`Hello`) - - // Second textDelta — the critical test - textDeltas.insert({ - key: `td-2`, - text_id: `text-1`, - run_id: `run-1`, - delta: ` world`, - _seq: 4, - }) - await new Promise((r) => setTimeout(r, 100)) - expect(data().textDeltas).toHaveLength(2) - }) -})