From 2dc94c283f898fb97e3f49366ec69c625394dab2 Mon Sep 17 00:00:00 2001 From: Steven Enamakel Date: Mon, 13 Apr 2026 20:26:10 -0700 Subject: [PATCH 1/3] feat(config): default screen intelligence, dictation, and vision model off MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Flip defaults so no macOS TCC permission prompt fires on first run: - `dictation.enabled`: `true` → `false` (was auto-starting rdev::listen, which requests Accessibility/Input Monitoring on macOS) - `screen_intelligence.use_vision_model`: `true` → `false` (fewer surprise vision-model calls; Pass 1 Apple Vision OCR still runs) Aligns all permission-gated auto-starts on a consistent opt-in posture: `screen_intelligence.enabled`, `autocomplete.enabled`, and `voice_server.auto_start` already default to `false`. Users must now explicitly flip each toggle (config or JSON-RPC) before the core triggers any OS permission dialog. --- src/openhuman/config/schema/accessibility.rs | 2 +- src/openhuman/config/schema/dictation.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/openhuman/config/schema/accessibility.rs b/src/openhuman/config/schema/accessibility.rs index 7ed04092b4..669e44f91f 100644 --- a/src/openhuman/config/schema/accessibility.rs +++ b/src/openhuman/config/schema/accessibility.rs @@ -68,7 +68,7 @@ fn default_autocomplete_enabled() -> bool { } fn default_use_vision_model() -> bool { - true + false } impl Default for ScreenIntelligenceConfig { diff --git a/src/openhuman/config/schema/dictation.rs b/src/openhuman/config/schema/dictation.rs index fbc3d6c7ea..b5e9769643 100644 --- a/src/openhuman/config/schema/dictation.rs +++ b/src/openhuman/config/schema/dictation.rs @@ -44,7 +44,7 @@ pub struct DictationConfig { } fn default_enabled() -> bool { - true + false } fn default_hotkey() -> String { From 8aad2512b3110c295ef26211de0f330ce8603afa Mon Sep 17 00:00:00 2001 From: Steven Enamakel Date: Mon, 13 Apr 2026 20:26:22 -0700 Subject: [PATCH 2/3] feat(channels): fire typing indicator on webhook-inbound path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two inbound flows exist today and only one fires typing: - Local bot (`channels_config.telegram.bot_token` set) → dispatch.rs already calls `channel.start_typing` + `spawn_scoped_typing_task` - Backend webhook (Telegram → backend → socket.io → core) → `ChannelInboundSubscriber` had **no typing call** — replies route via backend REST, so the local `Channel` trait isn't reachable. Close the gap by going through the backend: - `api/rest.rs`: add `send_channel_typing(channel, jwt, body)` hitting the new `POST /channels/:id/typing` backend route. - `channels/bus.rs`: extract the agent-wait loop into `run_agent_loop` and wrap it with a typing task that fires immediately on `start_chat` success, refreshes every 4s (beats Telegram's ~5s and Discord's ~10s typing TTLs), and cancels on every exit path (done / error / empty / bus-closed / lagged / timeout). Backend failures log at debug — a flaky typing call must never block the reply flow. Generalises to every channel with a backend adapter; adapters without a native typing API no-op gracefully. --- src/api/rest.rs | 21 ++++ src/openhuman/channels/bus.rs | 176 ++++++++++++++++++++++++++-------- 2 files changed, 158 insertions(+), 39 deletions(-) diff --git a/src/api/rest.rs b/src/api/rest.rs index 2f92f4c8c6..65dd336535 100644 --- a/src/api/rest.rs +++ b/src/api/rest.rs @@ -522,6 +522,27 @@ impl BackendOAuthClient { .await } + /// Broadcasts a typing indicator on a communication channel. Short-lived — + /// callers should re-invoke periodically to keep the indicator alive during + /// long-running operations. + pub async fn send_channel_typing( + &self, + channel: &str, + bearer_jwt: &str, + body: Value, + ) -> Result { + let channel = channel.trim().trim_matches('/'); + anyhow::ensure!(!channel.is_empty(), "channel is required"); + let encoded = urlencoding::encode(channel); + self.authed_json( + bearer_jwt, + Method::POST, + &format!("channels/{encoded}/typing"), + Some(body), + ) + .await + } + /// Sends a reaction (e.g. emoji) to a message in a channel. pub async fn send_channel_reaction( &self, diff --git a/src/openhuman/channels/bus.rs b/src/openhuman/channels/bus.rs index 7fbe88d1df..4838194191 100644 --- a/src/openhuman/channels/bus.rs +++ b/src/openhuman/channels/bus.rs @@ -4,9 +4,17 @@ //! by the socket transport layer. It runs the agent inference loop via the web //! channel provider and sends the reply back through the REST API. +use std::time::Duration; + use crate::core::event_bus::{DomainEvent, EventHandler}; use async_trait::async_trait; use serde_json::json; +use tokio_util::sync::CancellationToken; + +/// How often to re-send the typing indicator while waiting for the agent. +/// Telegram's `sendChatAction` typing fades after ~5s and Discord's after ~10s; +/// 4s keeps both alive without excessive traffic. +const TYPING_REFRESH_INTERVAL: Duration = Duration::from_secs(4); /// Subscribes to `ChannelInboundMessage` events and runs the agent loop, /// sending replies back to the originating channel via the backend REST API. @@ -81,56 +89,146 @@ impl EventHandler for ChannelInboundSubscriber { } }; - let timeout = tokio::time::Duration::from_secs(180); - let deadline = tokio::time::Instant::now() + timeout; + // Start a typing indicator that refreshes in the background until the + // agent finishes (or errors/times out). The indicator is fire-and-forget + // — any failure to reach the backend is logged but never blocks the + // reply flow. Cancelling the token on exit stops the refresh loop. + let typing_cancel = CancellationToken::new(); + let typing_handle = spawn_typing_task(channel.to_string(), typing_cancel.clone()); - loop { - tokio::select! { - event = event_rx.recv() => { - match event { - Ok(ev) if ev.request_id == request_id => { - if ev.event == "chat_done" || ev.event == "chat:done" { - let reply = ev.full_response.unwrap_or_default(); - if reply.trim().is_empty() { - tracing::warn!("[channel-inbound] agent returned empty response"); - return; - } - tracing::info!( - "[channel-inbound] agent done, replying to channel='{}' len={}", - channel, - reply.len() - ); - send_channel_reply(channel, &reply).await; - return; - } - if ev.event == "chat_error" || ev.event == "chat:error" { - let err_msg = ev.message.unwrap_or_else(|| "unknown error".to_string()); - tracing::error!("[channel-inbound] agent error: {}", err_msg); - send_channel_reply( - channel, - &format!("Sorry, I encountered an error: {err_msg}"), - ) - .await; + run_agent_loop(channel, request_id, &mut event_rx).await; + + // Stop typing the moment we're done, regardless of outcome. + typing_cancel.cancel(); + if let Err(e) = typing_handle.await { + tracing::debug!("[channel-inbound] typing task join error: {e}"); + } + } +} + +/// Drives the event-bus loop that waits for the agent to finish, sends the +/// reply (or error/timeout text) back to the channel, and returns when the +/// turn is fully resolved. Pulled out of [`ChannelInboundSubscriber::handle`] +/// so every exit path passes through a single caller that can cancel the +/// typing task. +async fn run_agent_loop( + channel: &str, + request_id: String, + event_rx: &mut tokio::sync::broadcast::Receiver, +) { + let timeout = tokio::time::Duration::from_secs(180); + let deadline = tokio::time::Instant::now() + timeout; + + loop { + tokio::select! { + event = event_rx.recv() => { + match event { + Ok(ev) if ev.request_id == request_id => { + if ev.event == "chat_done" || ev.event == "chat:done" { + let reply = ev.full_response.unwrap_or_default(); + if reply.trim().is_empty() { + tracing::warn!("[channel-inbound] agent returned empty response"); return; } + tracing::info!( + "[channel-inbound] agent done, replying to channel='{}' len={}", + channel, + reply.len() + ); + send_channel_reply(channel, &reply).await; + return; } - Ok(_) => {} - Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { - tracing::warn!("[channel-inbound] event bus lagged, skipped {} events", n); - } - Err(tokio::sync::broadcast::error::RecvError::Closed) => { - tracing::error!("[channel-inbound] event bus closed unexpectedly"); + if ev.event == "chat_error" || ev.event == "chat:error" { + let err_msg = ev.message.unwrap_or_else(|| "unknown error".to_string()); + tracing::error!("[channel-inbound] agent error: {}", err_msg); + send_channel_reply( + channel, + &format!("Sorry, I encountered an error: {err_msg}"), + ) + .await; return; } } + Ok(_) => {} + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + tracing::warn!("[channel-inbound] event bus lagged, skipped {} events", n); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => { + tracing::error!("[channel-inbound] event bus closed unexpectedly"); + return; + } } - _ = tokio::time::sleep_until(deadline) => { - tracing::error!("[channel-inbound] agent timed out after {}s", timeout.as_secs()); - send_channel_reply(channel, "Sorry, the request timed out.").await; - return; + } + _ = tokio::time::sleep_until(deadline) => { + tracing::error!("[channel-inbound] agent timed out after {}s", timeout.as_secs()); + send_channel_reply(channel, "Sorry, the request timed out.").await; + return; + } + } + } +} + +/// Spawns a background task that fires a typing indicator immediately, then +/// refreshes it every [`TYPING_REFRESH_INTERVAL`] until the cancellation token +/// is triggered. All HTTP errors are logged at debug level — the reply flow +/// must never be blocked by a flaky typing call. +fn spawn_typing_task(channel: String, cancel: CancellationToken) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + // Fire once up-front so the indicator shows as soon as possible. + send_channel_typing(&channel).await; + loop { + tokio::select! { + () = cancel.cancelled() => break, + _ = tokio::time::sleep(TYPING_REFRESH_INTERVAL) => { + send_channel_typing(&channel).await; } } } + }) +} + +/// Fire-and-forget typing indicator over the backend REST API. Failures are +/// logged at debug level — missing backend support (e.g. older deployments +/// without the `POST /channels/:id/typing` route) must not break replies. +async fn send_channel_typing(channel: &str) { + let config = match crate::openhuman::config::rpc::load_config_with_timeout().await { + Ok(c) => c, + Err(e) => { + tracing::debug!("[channel-inbound] typing: failed to load config: {}", e); + return; + } + }; + + let api_url = crate::api::config::effective_api_url(&config.api_url); + let jwt = match crate::api::jwt::get_session_token(&config) { + Ok(Some(t)) => t, + Ok(None) => { + tracing::debug!("[channel-inbound] typing: no session JWT — skipping"); + return; + } + Err(e) => { + tracing::debug!("[channel-inbound] typing: session token error: {}", e); + return; + } + }; + + let client = match crate::api::rest::BackendOAuthClient::new(&api_url) { + Ok(c) => c, + Err(e) => { + tracing::debug!( + "[channel-inbound] typing: failed to build API client: {}", + e + ); + return; + } + }; + + if let Err(e) = client.send_channel_typing(channel, &jwt, json!({})).await { + tracing::debug!( + "[channel-inbound] typing: backend call failed channel='{}' err={}", + channel, + e + ); } } From d9e9f416c3c254ce740799f38966437810f3b066 Mon Sep 17 00:00:00 2001 From: Steven Enamakel Date: Mon, 13 Apr 2026 20:55:14 -0700 Subject: [PATCH 3/3] Enhance test stability by introducing a Mutex guard for TRIAGE_DISABLED_ENV in tests - Added a static Mutex guard to ensure safe concurrent access to the `TRIAGE_DISABLED_ENV` variable during tests, preventing interleaved set_var/remove_var calls that could lead to spurious failures. - Updated relevant test cases to acquire the Mutex lock when accessing the environment variable, ensuring consistent behavior across concurrent test executions. --- src/openhuman/composio/bus.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/openhuman/composio/bus.rs b/src/openhuman/composio/bus.rs index 193a39c438..7367534787 100644 --- a/src/openhuman/composio/bus.rs +++ b/src/openhuman/composio/bus.rs @@ -489,6 +489,13 @@ async fn wait_for_connection_active( mod tests { use super::*; use serde_json::json; + use std::sync::Mutex; + + /// Cargo runs tests concurrently by default, and `TRIAGE_DISABLED_ENV` + /// is process-global. Every test that reads or writes it must hold this + /// guard for the duration of its env-var usage, otherwise interleaved + /// `set_var` / `remove_var` calls cause spurious failures. + static TRIAGE_ENV_GUARD: Mutex<()> = Mutex::new(()); #[tokio::test] async fn ignores_non_composio_events() { @@ -505,6 +512,7 @@ mod tests { async fn handles_trigger_event_without_panic() { // Disable triage so this test takes the log-only path and // doesn't spawn a real LLM turn. + let _guard = TRIAGE_ENV_GUARD.lock().unwrap_or_else(|e| e.into_inner()); std::env::set_var(TRIAGE_DISABLED_ENV, "1"); let sub = ComposioTriggerSubscriber::new(); sub.handle(&DomainEvent::ComposioTriggerReceived { @@ -520,6 +528,7 @@ mod tests { #[test] fn triage_disabled_flag_parser() { + let _guard = TRIAGE_ENV_GUARD.lock().unwrap_or_else(|e| e.into_inner()); // Truthy values disable triage. for val in ["1", "true", "TRUE", "yes", "YES"] { std::env::set_var(TRIAGE_DISABLED_ENV, val);