Original issue tinyhumansai#2442 by @obchain on 2026-05-21T11:51:15Z
Summary
memory::ingestion::queue::start_worker_with_state builds the ingestion job channel with mpsc::unbounded_channel, so producers (put_doc, store_skill_sync) can enqueue indefinitely while the worker drains one job at a time under a singleton run-lock. A misbehaving or buggy agent calling put_doc in a tight loop can grow the queue without bound and exhaust process memory.
Problem
src/openhuman/memory/ingestion/queue.rs:106 (current main, 6137b67):
let (tx, rx) = mpsc::unbounded_channel::<IngestionJob>();
The worker (ingestion_worker, same file ~L123) serialises every job behind IngestionState::acquire(), so it processes only one extraction at a time; concurrent extractions would otherwise contend for the local extraction LLM. Processing takes seconds to minutes per document, depending on document size and model.
Two producer sites push into the channel directly without backpressure:
- put_doc at src/openhuman/memory/store/client.rs:152
- store_skill_sync at src/openhuman/memory/store/client.rs:266
Both increment IngestionState::enqueue() and then call IngestionQueue::submit(job). submit already handles the "worker gone" path (SendError) but the channel itself has no capacity bound.
Concrete repro: a skill that calls put_doc in a loop (or any code path that reaches one of the two producers above) ~100k times, faster than the worker can drain, results in:
- queue_depth atomic climbing to 100k.
- ~100k IngestionJob values resident in the channel buffer; each holds an owned NamespaceDocumentInput (full document content + metadata), so peak memory scales linearly with doc size × queue depth.
- No backpressure signal to the producer: submit returns true regardless of pressure.
- OOM kill or paging stall before the worker catches up.
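The growth pattern in the repro can be illustrated with a minimal sketch. It uses the standard library's unbounded std::sync::mpsc::channel as a stand-in for the async mpsc::unbounded_channel in queue.rs, so it runs without external crates. Job and flood_unbounded are hypothetical names, not taken from the codebase.

```rust
use std::sync::mpsc;

/// Hypothetical stand-in for IngestionJob: owns the full document content.
struct Job {
    content: Vec<u8>,
}

/// Enqueue `jobs` documents of `doc_bytes` each into an unbounded channel
/// while nothing drains it, then report
/// (number of accepted sends, bytes that were resident in the buffer).
fn flood_unbounded(jobs: usize, doc_bytes: usize) -> (usize, usize) {
    // Unbounded: send succeeds as long as the receiver exists,
    // so the producer never sees a pressure signal.
    let (tx, rx) = mpsc::channel::<Job>();
    let mut accepted = 0;
    for _ in 0..jobs {
        if tx.send(Job { content: vec![0u8; doc_bytes] }).is_ok() {
            accepted += 1;
        }
    }
    // Close the sending side so the drain below terminates.
    drop(tx);
    let resident: usize = rx.iter().map(|job| job.content.len()).sum();
    (accepted, resident)
}
```

Every send is accepted, and the bytes held in the buffer equal job count × document size, which is the linear scaling described above.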
Impact tier: not exploitable across a trust boundary (producers are inside the user's own core), but it is a robustness bug: a buggy skill, a misconfigured Composio sync, or an agent re-ingesting the same source on every tick can DoS the local core without any user-visible warning.
Solution (optional)
Three-step fix in src/openhuman/memory/ingestion/queue.rs:
- Replace mpsc::unbounded_channel with mpsc::channel(DEFAULT_QUEUE_CAPACITY). Suggest DEFAULT_QUEUE_CAPACITY = 512: at typical doc sizes (1KB–100KB) the buffer caps memory pressure at well under 100MB (512 × 100KB ≈ 51MB at the upper end) while still absorbing reasonable user-driven bursts (bulk import of a Notion workspace, large Slack backfill).
- Change submit from tx.send(job) to tx.try_send(job) (non-blocking) and distinguish TrySendError::Full from TrySendError::Closed:
  - Full → log at warn with namespace + title, decrement enqueue counter, return false so producers can surface "queue full, retry later" to the caller.
  - Closed → keep existing "worker gone" behaviour.
- Expose a start_worker_with_capacity variant for tests so the capacity-bound path can be exercised deterministically without faking a slow worker.
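A rough sketch of the bounded submit path, under stated assumptions. To stay runnable without external crates it swaps in std::sync::mpsc::sync_channel for the async bounded mpsc::channel; the standard library names the closed case TrySendError::Disconnected, where the issue refers to TrySendError::Closed. Job, Queue, with_capacity and depth are hypothetical names, eprintln! stands in for the warn-level log, and the handling of the "worker gone" branch is a guess because the existing behaviour is not shown here.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};
use std::sync::Arc;

/// Hypothetical stand-in for IngestionJob.
pub struct Job {
    pub namespace: String,
    pub title: String,
}

/// Hypothetical queue handle; `depth` plays the role of the queue_depth atomic.
pub struct Queue {
    tx: SyncSender<Job>,
    depth: Arc<AtomicUsize>,
}

impl Queue {
    /// Build a queue with an explicit capacity (the start_worker_with_capacity idea).
    /// The receiver is handed back so a test can leave it undrained.
    pub fn with_capacity(capacity: usize) -> (Queue, Receiver<Job>) {
        let (tx, rx) = sync_channel(capacity);
        (Queue { tx, depth: Arc::new(AtomicUsize::new(0)) }, rx)
    }

    pub fn depth(&self) -> usize {
        self.depth.load(Ordering::SeqCst)
    }

    /// Non-blocking submit: returns false when the job was not accepted.
    pub fn submit(&self, job: Job) -> bool {
        // Mirrors the producers calling IngestionState::enqueue() before submit.
        self.depth.fetch_add(1, Ordering::SeqCst);
        match self.tx.try_send(job) {
            Ok(()) => true,
            Err(TrySendError::Full(job)) => {
                // Queue full: undo the increment so depth stays at the cap,
                // log, and let the producer surface "queue full, retry later".
                self.depth.fetch_sub(1, Ordering::SeqCst);
                eprintln!(
                    "warn: ingestion queue full, dropping {}/{}",
                    job.namespace, job.title
                );
                false
            }
            Err(TrySendError::Disconnected(_)) => {
                // Worker gone. Assumption: roll back the counter and report failure.
                self.depth.fetch_sub(1, Ordering::SeqCst);
                false
            }
        }
    }
}
```

Because with_capacity returns the receiver, a test can fill the queue to its cap and assert on the next submit without simulating a slow worker: with capacity 2 and no drain, the third submit returns false and depth() stays at 2.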
Optional: emit a DomainEvent::MemoryIngestionEnqueueDropped { … } so the existing event-bus subscribers (status RPC, observability) can surface drops to the operator.
Acceptance criteria
- … false from submit and a logged warning, while queue_depth stays at the cap (not the runaway value).
- put_doc and store_skill_sync continue to enqueue successfully under normal load.
Related
- … feat: background ingestion queue for memory graph extraction).