fix(agent): refresh delegation surface on mid-session Composio connect/revoke#1687
Conversation
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (7)
✅ Files skipped from review due to trivial changes (2)
🚧 Files skipped from review as they are similar to previous changes (5)
📝 WalkthroughWalkthroughThis PR makes agents detect mid-session Composio connected-integration set changes via a cached snapshot, seed and track a connection-set hash per Agent, eagerly warm/invalidate the integrations cache on connection events, and atomically reconcile synthesized delegation tools and visible tool specs when the set changes. ChangesMid-session integration-set refresh for orchestrator delegation tools
Sequence DiagramsequenceDiagram
participant Agent
participant INTEGRATIONS_CACHE
participant ComposioBus
participant RefreshLogic
Agent->>INTEGRATIONS_CACHE: cached_active_integrations(config)
INTEGRATIONS_CACHE-->>Agent: cached integrations (or None)
Agent->>Agent: compute current_hash = connected_set_hash(integrations)
alt Hash differs
Agent->>RefreshLogic: refresh_delegation_tools()
RefreshLogic->>RefreshLogic: compute new synthesized names/specs
RefreshLogic->>Agent: update tools/specs, visible names
Agent->>Agent: update connected_integrations & last_seen_integrations_hash
end
ComposioBus->>INTEGRATIONS_CACHE: invalidate_connected_integrations_cache()
ComposioBus->>INTEGRATIONS_CACHE: fetch_connected_integrations(config) (eager warm)
INTEGRATIONS_CACHE-->>ComposioBus: warmed cache
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related issues
Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Comment |
6ed41e4 to
4052a45
Compare
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/openhuman/agent/harness/session/turn.rs`:
- Around line 171-172: The formatting of the binding for new_hash calling
crate::openhuman::composio::connected_set_hash(&cache_view) is not
rustfmt-compliant; run rustfmt (cargo fmt) on the changed Rust file (or the
whole crate) to fix the binding shape and update the whitespace/line breaks
around the let new_hash =
crate::openhuman::composio::connected_set_hash(&cache_view); expression so it
matches project formatting standards before merging.
- Around line 179-182: The code updates self.last_seen_integrations_hash even
when self.refresh_delegation_tools() can fail, which prevents future
reconciliation; change the flow so you only advance last_seen_integrations_hash
after refresh_delegation_tools() completes successfully. Concretely, call
refresh_delegation_tools() and check its Result/return (or catch the failure
from the shared Arc path) before assigning self.last_seen_integrations_hash =
new_hash — if refresh_delegation_tools() returns Err, leave
last_seen_integrations_hash unchanged (and keep self.connected_integrations set
or roll it back as appropriate) so retries remain possible; reference the
methods/fields refresh_delegation_tools, connected_integrations, and
last_seen_integrations_hash to locate and modify the logic.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 3378ac8f-66f9-436e-a371-979622b4f8e6
📒 Files selected for processing (6)
src/openhuman/agent/harness/session/builder.rssrc/openhuman/agent/harness/session/turn.rssrc/openhuman/agent/harness/session/types.rssrc/openhuman/composio/bus.rssrc/openhuman/composio/mod.rssrc/openhuman/composio/ops.rs
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/openhuman/composio/ops.rs`:
- Around line 739-747: The cached_active_integrations helper currently has
multiple branches (lock contention, cache miss, expired entry, and hit) but no
diagnostics; update the function cached_active_integrations to emit
development-oriented tracing/logging (use the tracing or log crate at
trace/debug level) on each branch with stable, easily searchable prefixes (e.g.
"cached_active_integrations:lock_contended", "cached_active_integrations:miss",
"cached_active_integrations:expired", "cached_active_integrations:hit") and
include key context like the derived key and elapsed duration for the expired
case so the hot-path probe behavior is observable without changing control flow.
- Around line 773-782: The current connected_set_hash implementation treats
duplicates as distinct because it hashes the sorted slugs Vec directly; to
enforce set semantics deduplicate the sorted slugs before hashing (e.g., call
dedup() on the sorted Vec<&str> named slugs) so only unique toolkit slugs are
hashed; keep the existing sort() then call slugs.dedup() and proceed to hash
slugs into hasher as before.
- Around line 177-181: fetch_connected_integrations currently returns Vec and
conflates "authoritative empty" with backend failures, so change its signature
to return Result<Vec<Integration>, E> (e.g., anyhow::Error or a concrete error
type), update its internal error paths to return Err on backend/unavailable
cases, and update callers such as the call in ops.rs where
fetch_connected_integrations(config).await is used: match on the Result and only
log the eager-warm success when Ok(_) (and distinguish Ok(Vec::new()) if you
want a separate debug message), and log an error/warn with the returned error
when Err(e) occurs so backend/auth failures are visible in logs.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 355ead27-3343-440a-9ffd-27f698c6eb02
📒 Files selected for processing (6)
src/openhuman/agent/harness/session/builder.rssrc/openhuman/agent/harness/session/turn.rssrc/openhuman/agent/harness/session/types.rssrc/openhuman/composio/bus.rssrc/openhuman/composio/mod.rssrc/openhuman/composio/ops.rs
🚧 Files skipped from review as they are similar to previous changes (4)
- src/openhuman/agent/harness/session/builder.rs
- src/openhuman/composio/mod.rs
- src/openhuman/composio/bus.rs
- src/openhuman/agent/harness/session/turn.rs
| let mut slugs: Vec<&str> = integrations | ||
| .iter() | ||
| .filter(|i| i.connected) | ||
| .map(|i| i.toolkit.as_str()) | ||
| .collect(); | ||
| slugs.sort(); | ||
|
|
||
| let mut hasher = DefaultHasher::new(); | ||
| slugs.hash(&mut hasher); | ||
| hasher.finish() |
There was a problem hiding this comment.
Enforce true set semantics in connected_set_hash.
The docs state set-equivalence semantics, but duplicates in input would currently change the hash. Dedup the sorted slugs before hashing to match the contract.
Proposed fix
let mut slugs: Vec<&str> = integrations
.iter()
.filter(|i| i.connected)
.map(|i| i.toolkit.as_str())
.collect();
slugs.sort();
+ slugs.dedup();
let mut hasher = DefaultHasher::new();
slugs.hash(&mut hasher);📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| let mut slugs: Vec<&str> = integrations | |
| .iter() | |
| .filter(|i| i.connected) | |
| .map(|i| i.toolkit.as_str()) | |
| .collect(); | |
| slugs.sort(); | |
| let mut hasher = DefaultHasher::new(); | |
| slugs.hash(&mut hasher); | |
| hasher.finish() | |
| let mut slugs: Vec<&str> = integrations | |
| .iter() | |
| .filter(|i| i.connected) | |
| .map(|i| i.toolkit.as_str()) | |
| .collect(); | |
| slugs.sort(); | |
| slugs.dedup(); | |
| let mut hasher = DefaultHasher::new(); | |
| slugs.hash(&mut hasher); | |
| hasher.finish() |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/openhuman/composio/ops.rs` around lines 773 - 782, The current
connected_set_hash implementation treats duplicates as distinct because it
hashes the sorted slugs Vec directly; to enforce set semantics deduplicate the
sorted slugs before hashing (e.g., call dedup() on the sorted Vec<&str> named
slugs) so only unique toolkit slugs are hashed; keep the existing sort() then
call slugs.dedup() and proceed to hash slugs into hasher as before.
PR tinyhumansai#1670 wired refresh_delegation_tools into the turn-1 path so the orchestrator finally sees delegate_<toolkit> tools when the agent is first built. But that gate (`if self.history.is_empty()`) only fires once per Agent instance — every subsequent turn falls into the `else` branch that deliberately leaves the tool surface alone for KV-cache prefix preservation. If a user connects (or revokes) a Composio toolkit mid-conversation, the orchestrator's function-calling schema stays frozen on the turn-1 snapshot until the session is restarted and the user is stuck telling the model "i just connected slack, can you check it" and watching it deflect. This patch adds a schema-only refresh that fires on every subsequent turn, gated on a stable hash of the connected-toolkit set. When the set has actually changed since the last reconcile, self.tools and self.tool_specs are rebuilt to match — but the system prompt is explicitly left untouched, so the provider's KV cache prefix stays valid for the rest of the conversation. The schema is sent fresh on every API call anyway, so the model picks up the new tool surface on the very next dispatch. Three layers of change: 1. composio/ops.rs gains cached_active_integrations() — a read-only probe over the existing INTEGRATIONS_CACHE that returns the current connected set without triggering a backend fetch. try_read so a writer (UI poll / event subscriber) never blocks a turn. Plus connected_set_hash() that returns a stable u64 derived from the sorted connected-toolkit slug set; order-independent and description-insensitive so cosmetic catalogue edits don't trigger spurious refreshes. 2. composio/bus.rs decouples cache invalidate + eager warm from the provider-routing hook. Previously the subscriber returned early when get_provider(toolkit) returned None — that meant toolkits without a registered provider (most of the 119 Composio toolkits, e.g. googlecalendar, github) bypassed the eager warm entirely and had to wait for the desktop UI's 5s composio_list_connections diff-poll to invalidate the stale cache. Now wait_for_connection_active + invalidate + fetch_connected_integrations runs for every Created event, and the provider hook is an optional downstream step gated on its own get_provider lookup. Same pattern mirrored in the revoke path (ops.rs::composio_delete_connection) so disconnects propagate equally fast. 3. agent/harness/session/turn.rs adds the per-turn schema-only refresh, plus rewrites refresh_delegation_tools() from PR tinyhumansai#1670's append-only shape into a full reconcile of the synthesised subset. The Agent struct now tracks synthesized_tool_names: HashSet<String> as an explicit mask of what came out of the last collect_orchestrator_tools call, so the reconciler can drop every previously-synthesised entry wholesale and append the fresh set. Stale delegate_<toolkit> tools after a revoke now drop cleanly instead of leaking as zombies. Direct tools (query_memory, cron_add, …) are guaranteed untouched because their names are never in synthesized_tool_names. On Arc::get_mut failure (sub-agent captured a snapshot before turn boundary — shouldn't happen but we defend) the previous synthesized_tool_names is restored so the next refresh retries the full transition cleanly. Validated end-to-end on a desktop dev build (staging-api): * Turn-1 baseline sync (PR tinyhumansai#1670 path, unchanged) * Mid-session connect with provider-having toolkit (slack) * Mid-session connect with no-provider toolkit (googlecalendar, github) — the case the bus decouple unlocks * Mid-session revoke (notion) — schema correctly reflected on next fresh-Agent observation * Three incremental mid-session connects on the same thread without Agent reconstruction — each turn picks up the new toolkit, KV cache stays warm, system prompt never re-rendered Cost in the steady state (no integration change): one INTEGRATIONS_CACHE try_read + one u64 compare. KV cache fully preserved. Backend `list_connections` round-trip from the eager warm fires once per OAuth completion in a background task — same data the UI's 5s poll was going to fetch anyway, just moved ~5s earlier. Follow-up issue to file separately: Composio's action-execution gateway has a 30-60s lag after `connection.status == ACTIVE` before the new token is usable for action calls. The first delegate_<toolkit> dispatch immediately after OAuth often gets `Connection error, try to authenticate` and succeeds on retry. Not addressable here; needs a retry / readiness probe inside integrations_agent. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
4052a45 to
d7b142e
Compare
…t/revoke (tinyhumansai#1687) Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Summary
googlecalendar,github) get the same fast-path as ones with providers (gmail,notion,slack).refresh_delegation_toolsis now a full reconcile of the synthesised subset, not append-only — staledelegate_<toolkit>tools after a revoke are cleanly removed.Problem
PR #1670 (
bd72d78b) fixed the synchronous-builder ↔ async-Composio-fetch gap on turn 1: it addedAgent::refresh_delegation_toolsand wired it into theif self.history.is_empty()branch ofAgent::turn. From that point onward, fresh threads (and resumed threads on app restart) correctly seedelegate_gmail,delegate_notion, etc. in the function-calling schema.But the
elsebranch — every turn after the first — deliberately skips the rebuild to preserve the KV-cache prefix the provider already tokenised. So once a session is running, the orchestrator's tool surface is frozen on the turn-1 snapshot.User-visible symptom: in an active conversation, the user clicks "Connect Slack" in Settings, completes OAuth, types "check my unread slack threads" — orchestrator's schema still only has
delegate_gmail/delegate_notionfrom turn 1, model deflects with "Slack isn't connected" or routes the call intodelegate_tools_agentwhich correctly refuses Composio work. The fix is to start a new thread.The same problem hits the reverse: revoking a toolkit mid-session leaves a zombie
delegate_<toolkit>in the schema; the model can still emit calls against it and they fail downstream at the Composio dispatch layer.PR #1670's author flagged this as a follow-up TODO. Issue #1679 has the full design discussion.
Solution
Three coordinated changes across
composio/andagent/harness/session/.1. Read-only cache probe + connection-set hash (
composio/ops.rs)cached_active_integrationsis the cheap "what does Layer 1 currently say?" probe over the existingINTEGRATIONS_CACHE.try_readso a writer (the desktop UI's 5 scomposio_list_connectionspoll, or the event-driven invalidator) never blocks a turn. ReturnsNoneon cache miss, expired entry, or writer-held lock — the agent treats that as "no signal, leave current state alone".connected_set_hashreturns a stableu64over the sorted connected toolkit slug set. Order-independent. Deliberately description-insensitive so cosmetic Composio catalogue edits don't invalidate the agent's surface — only membership changes do. Unconnected toolkits don't contribute becausecollect_orchestrator_toolsfilters them anyway, so churn among the unconnected set never alters the agent's schema and shouldn't trigger a refresh.2. Eager cache warm, decoupled from provider hooks (
composio/bus.rs+ revoke path)Pre-existing flow:
composio_authorizepublishesComposioConnectionCreated;ComposioConnectionCreatedSubscriberwaits for the connection to goACTIVEand then callsinvalidate_connected_integrations_cache(). After that, the cache is empty until something else fetches.This PR extends that path so the subscriber also calls
fetch_connected_integrations()immediately after invalidating — repopulating the cache eagerly. Net effect: the cache is hot with the new toolkit ~1-2 s after OAuth completes, instead of waiting up to 5 s for the desktop UI's nextcomposio_list_connectionspoll to repopulate it.The non-obvious part: the previous subscriber shape gated the entire
tokio::spawn(containingwait_for_connection_active+ invalidate) onget_provider(toolkit).is_some()— toolkits without a registered provider returned early and the cache was never invalidated at all. Layer 1 only refreshed via the desktop UI's 5 s diff-poll. This PR moves theget_providerlookup downstream of the invalidate + warm so it only gates the optional provider hook; cache refresh now fires for every connection-active event regardless of provider registration. Same fix applied symmetrically incomposio_delete_connectionso revokes propagate equally fast.3. Per-turn schema-only refresh + full reconcile (
agent/harness/session/turn.rs+types.rs+builder.rs)The
Agentstruct gains two fields:last_seen_integrations_hash: u64— initialised to 0, written at the end of every refresh.synthesized_tool_names: HashSet<String>— explicit mask of whatcollect_orchestrator_toolsproduced on the previous reconcile.At the top of every subsequent turn (the
elsebranch ofif self.history.is_empty()), the agent readscached_active_integrations, hashes the result, and compares to its stored hash. On diff, it callsrefresh_delegation_tools. Nobuild_system_promptrebuild; nohistory[0]mutation. The schema is sent fresh on every API call, so the model picks up the new tools without any KV-cache invalidation. The system prompt's## Connected Integrationsblock stays frozen — slightly stale until next session, but the schema is the source of truth the model routes against.refresh_delegation_toolsitself is rewritten from PR #1670's append-only shape into a full reconcile of the synthesised subset:synthesized_tool_namesfrom the previous synthesis. Direct tools (query_memory,cron_add, …) are untouched because their names are never in that mask.synthesized_tool_nameswith the new set so the next refresh has a clean mask to undo.Three correctness wins over append-only:
synthesized_tool_namesare touched.retain+extendguarantees every final entry is either a non-synthesised direct tool or a member of the freshsynthedset.On
Arc::get_mutfailure (sub-agent captured a snapshot before turn boundary — shouldn't happen but defended against), the previoussynthesized_tool_namesis restored so the next refresh attempts the full transition cleanly rather than resuming from a partial state.Validation
Tested end-to-end on a desktop dev build pointed at
staging-api.tinyhumans.ai. All scenarios validated with logged evidence:connection set changed mid-session (hash X -> Y); refreshing tool schema;added=["delegate_slack"]on next turn[composio:bus] connection observed active; invalidating + eagerly warming integrations cache toolkit=googlecalendar;added=["delegate_googlecalendar"]delegate_notioncorrectly absenttry_read+u64compare; KV cache preservedSubmission Checklist
Arc::get_mutharness that doesn't exist yet in the test suite. Happy to add follow-up tests if reviewers want them gated.diff-cover) meet the gate enforced by.github/workflows/coverage.yml. Runpnpm test:coverageandpnpm test:rustlocally; PRs below 80% on changed lines will not merge.docs/TEST-COVERAGE-MATRIX.mdrows added/removed/renamed.Closes #NNNin the## RelatedsectionImpact
list_connectionsround-trip per OAuth completion / revoke in a background task — same call the UI's 5 s poll was going to make anyway, just moved ~5 s earlier.toolsarray of the next API call.subagentsdeclarations or signed-out users hit no-op branches.Related
bd72d78b) —fix(agent): synthesise delegate_<toolkit> tools after live integrations fetch. Adds the mid-session refresh that PR fix(agent): synthesise delegate_<toolkit> tools after live integrations fetch #1670's author flagged as a follow-up TODO.DomainEvent::ComposioConnectionCreated/…Deleted(src/core/event_bus/events.rs:255,:261),ComposioConnectionCreatedSubscriberpattern (src/openhuman/composio/bus.rs:330), andINTEGRATIONS_CACHE(src/openhuman/composio/ops.rs:675).connection.status == ACTIVEbefore the new OAuth token is usable for action calls. The firstdelegate_<toolkit>dispatch immediately after OAuth often returnsConnection error, try to authenticateand succeeds on retry. Not addressable in this PR — needs a retry / readiness-probe insideintegrations_agent.AI Authored PR Metadata (required for Codex/Linear PRs)
Linear Issue
Commit & Branch
fix/orchestrator-mid-session-tool-refresh6ed41e44Validation Run
cargo check --manifest-path Cargo.toml --lib— clean (18 pre-existing warnings)pnpm typecheck/pnpm format:check/pnpm lintwould all be no-ops for this diffValidation Blocked
command:N/Aerror:N/Aimpact:N/ABehavior Changes
delegate_<toolkit>left in the schema.Parity Contract
subagentsor no connected toolkits hit no-op branches. Turn-1 path unchanged. KV cache prefix preserved (system prompt is never mutated by this PR).Arc::get_mutguard restores the previoussynthesized_tool_nameson shared-Arc failure so the next refresh retries the full transition. Cachetry_read(notread) so a writer in progress never blocks a turn. Both connect-side and revoke-side eager warms are best-effort; failure falls back to the existing UI-poll-based path within ~5 s.Duplicate / Superseded PR Handling
🤖 Generated with Claude Code
Summary by CodeRabbit
New Features
Improvements