fix(memory_tree): gate ingest on source_id so summariser tree never sees a source twice#1353
Conversation
📝 WalkthroughWalkthroughAdds source-level deduplication for document ingests: new ingested-sources table, public pre-check and transactional claim APIs, IngestResult.already_ingested field and constructors, early pre-check short-circuit for documents, transactional gate in persist, tests, and a Slack empty-bucket result update. ChangesSource-level deduplication in memory ingestion
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Comment |
…ees a source twice Memory items (documents, chat batches, email threads) are append-only — once `(source_kind, source_id)` is ingested, re-ingesting must not flow through extract → admit → buffer → seal again, otherwise the same content lands in the summariser tree twice. - New `mem_tree_ingested_sources` table keyed on `(source_kind, source_id)`. - `ingest_chat` / `ingest_email` / `ingest_document` short-circuit on the fast-path lookup before canonicalisation. - `persist` claims the row inside the same transaction as the chunk / score / job writes via `INSERT OR IGNORE`, so two concurrent ingests of the same source can't both pass the gate. - `IngestResult.already_ingested` surfaces the no-op to callers.
Chat (`slack:{conn}`) and email (`gmail:{participants}`) `source_id`s
are stream identifiers — many batches / threads accumulate under one
source over time. The previous source-level gate made every bucket
after the first a no-op, breaking the slack workspace tree fill /
seal cascade and the gmail per-participant append flow, and turning
`read_rpc::tests::list_sources_aggregates` red.
Document `source_id`s on the other hand identify a single immutable
file (one notion page, one drive doc), so the gate stays in place
for `ingest_document`. Chat / email keep their existing chunk-level
idempotency (`chunk_id` includes content) which already swallows true
replays.
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/openhuman/memory/tree/ingest.rs`:
- Around line 120-124: The debug logs emit raw source_id; update the logging in
the ingest flow (e.g., where already_ingested(...) is checked and the other
debug at lines ~223-225) to log an opaque value instead: compute a short
deterministic hash or redacted token from source_id (e.g., SHA256 and take first
N chars) and use that hashed_id in log messages instead of the raw source_id;
apply the same change for both occurrences (refer to already_ingested(...) call
and the other duplicate-path debug log) so logs contain only the hashed/opaque
correlation id.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: ba4e0341-cad1-4c60-9b29-9e6ee95b04a1
📒 Files selected for processing (1)
src/openhuman/memory/tree/ingest.rs
Per CodeRabbit (and existing convention in `chunker.rs` / `composio/providers/gmail/ingest.rs`), raw `source_id` values are recoverable inputs and should not be emitted to logs. Switch the two new duplicate-path debug lines in `ingest.rs` to log `source_id_hash=<redact()>` instead.
There was a problem hiding this comment.
🧹 Nitpick comments (1)
src/openhuman/memory/tree/ingest.rs (1)
300-309: 💤 Low valueLGTM — Race-loss handling is correct.
Not waking workers on the "already ingested" path is appropriate since no jobs were enqueued.
One minor consideration: if the pre-check passes but the transactional claim fails (rare concurrent ingest race), the staged content files from line 172 remain orphaned on disk. This is low severity given the rarity of the race and could be addressed with a periodic GC pass if needed.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/openhuman/memory/tree/ingest.rs` around lines 300 - 309, When the transactional claim loses (the match on written yields None and returns Ok(IngestResult::already_ingested(source_id))), the staged content files created earlier in this function remain orphaned; before returning, delete those staged files. Locate the staging step used earlier in this function (the variable/paths holding the staged content) and add a cleanup call to remove those files or atomically roll back staging, then return Ok(IngestResult::already_ingested(source_id)); keep the jobs::wake_workers() behavior unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@src/openhuman/memory/tree/ingest.rs`:
- Around line 300-309: When the transactional claim loses (the match on written
yields None and returns Ok(IngestResult::already_ingested(source_id))), the
staged content files created earlier in this function remain orphaned; before
returning, delete those staged files. Locate the staging step used earlier in
this function (the variable/paths holding the staged content) and add a cleanup
call to remove those files or atomically roll back staging, then return
Ok(IngestResult::already_ingested(source_id)); keep the jobs::wake_workers()
behavior unchanged.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: ccd3a7c1-222f-42f6-93f6-dc2c99f59ab8
📒 Files selected for processing (1)
src/openhuman/memory/tree/ingest.rs
…ees a source twice (tinyhumansai#1353)
Summary
(source_kind, source_id)twice.mem_tree_ingested_sourcestable claims a source on first ingest; subsequentingest_chat/ingest_email/ingest_documentcalls short-circuit.Problem
Memory items (documents, chat batches, email threads) are append-only — once a source has been ingested, the file is never updated, only added to. But the existing chunk-level idempotency only catches identical
(source_kind, source_id, seq, content)triples. If the same logical source is ingested twice through any path that yields different chunk content (whitespace drift, re-canonicalisation, partial replay) it flows back throughextract → admit → buffer → seal, duplicating the same content into the summariser tree. We were observing duplicates in the graph as a result.Solution
src/openhuman/memory/tree/store.rs— newmem_tree_ingested_sourcestable keyed on(source_kind, source_id), plusis_source_ingested(best-effort lookup) andclaim_source_ingest_tx(transactionalINSERT OR IGNORE).src/openhuman/memory/tree/ingest.rs—ingest_*entry point checksis_source_ingestedbefore canonicalisation and short-circuits on hit.persist's transaction,claim_source_ingest_txis the authoritative gate. If the row already exists the closure returns early and nothing is committed.IngestResultgot a newalready_ingested: boolfield (defaulted via serde for wire compatibility).src/openhuman/memory/slack_ingestion/ops.rs— updated empty-bucket short-circuit for the new field.second_ingest_of_same_source_id_is_short_circuitedproves a secondingest_documentunder the samesource_id(even with different body) writes nothing.Trade-off: documents are append-only by design, so the gate uses
source_idalone — even mutated bodies under the same id are rejected. That matches the data model and is what we want for the summariser tree.Submission Checklist
docs/TESTING-STRATEGY.mdcargo-llvm-covdocs/TESTING-STRATEGY.md)Impact
chunks.db. Schema is created viaCREATE TABLE IF NOT EXISTSso existing workspaces upgrade transparently.(source_kind, source_id)is now a no-op returningIngestResult { already_ingested: true, .. }. Callers that today re-drove ingest as a way to refresh content will need a different mechanism — but per the data model, "memory items are final once ingested", so this is the intended contract.Related
AI Authored PR Metadata (required for Codex/Linear PRs)
Linear Issue
Commit & Branch
fix/memory-permcd7b29a56f2c9ac868c3bece2738e20ee869d7aeValidation Run
pnpm --filter openhuman-app format:checkpnpm typecheckcargo test --lib openhuman::memory::tree::ingest::— 5/5 passingcargo fmt --check+cargo check --manifest-path Cargo.tomlValidation Blocked
command:N/Aerror:N/Aimpact:N/ABehavior Changes
(source_kind, source_id)becomes a no-op.Parity Contract
persistis unchanged; the new gate sits in front of it.IngestResultgot a new field with#[serde(default)]so old callers / persisted JSON deserialise unchanged.Duplicate / Superseded PR Handling
Summary by CodeRabbit
New Features
Bug Fixes