Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion clients/agent-runtime/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,18 @@ corvus integrations info Telegram

# Manage background service
corvus service install
corvus service install --linger on # Linux: keep running after logout/reboot (user service + linger)
corvus service restart # useful after binary updates
corvus service status

# Migrate memory from OpenClaw (safe preview first)
corvus migrate openclaw --dry-run
corvus migrate openclaw
```

Corvus also checks for newer releases in `agent`, `daemon`, and `status` commands.
Set `CORVUS_DISABLE_UPDATE_CHECK=1` to disable update notifications.

> **Dev fallback (no global install):** prefix commands with `cargo run --release --` (example:
`cargo run --release -- status`).

Expand Down Expand Up @@ -447,7 +452,7 @@ See [aieos.org](https://aieos.org) for the full schema and live examples.
| `gateway` | Start webhook server (default: `127.0.0.1:8080`) |
| `gateway --port 0` | Random port mode |
| `daemon` | Start long-running autonomous runtime |
| `service install/start/stop/status/uninstall` | Manage user-level background service |
| `service install/start/restart/stop/status/uninstall` | Manage background service lifecycle |
| `doctor` | Diagnose daemon/scheduler/channel freshness |
| `status` | Show full system status |
| `channel doctor` | Run health checks for configured channels |
Expand Down
62 changes: 60 additions & 2 deletions clients/agent-runtime/src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ where
tokio::spawn(async move {
let mut backoff = initial_backoff_secs.max(1);
let max_backoff = max_backoff_secs.max(backoff);
let mut gateway_port_conflict_logged = false;

loop {
crate::health::mark_component_ok(name);
Expand All @@ -160,8 +161,17 @@ where
backoff = initial_backoff_secs.max(1);
}
Err(e) => {
crate::health::mark_component_error(name, e.to_string());
tracing::error!("Daemon component '{name}' failed: {e}");
let is_gateway_addr_in_use =
name == "gateway" && is_addr_in_use_error(&e);
let err_str = e.to_string();
crate::health::mark_component_error(name, &err_str);
tracing::error!("Daemon component '{name}' failed: {err_str}");
if is_gateway_addr_in_use && !gateway_port_conflict_logged {
gateway_port_conflict_logged = true;
tracing::warn!(
"Gateway port is already in use. This usually means another daemon/gateway instance is already running. If this happened after an upgrade, run `corvus service restart` instead of starting a second daemon process."
);
}
}
}

Expand All @@ -173,6 +183,13 @@ where
})
}

fn is_addr_in_use_error(error: &anyhow::Error) -> bool {
error
.chain()
.filter_map(|cause| cause.downcast_ref::<std::io::Error>())
.any(|io_error| io_error.kind() == std::io::ErrorKind::AddrInUse)
}

async fn run_heartbeat_worker(config: Config) -> Result<()> {
let observer: std::sync::Arc<dyn crate::observability::Observer> =
std::sync::Arc::from(crate::observability::create_observer(&config.observability));
Expand Down Expand Up @@ -227,6 +244,23 @@ mod tests {
use super::*;
use tempfile::TempDir;

struct HealthComponentGuard {
name: &'static str,
}

impl HealthComponentGuard {
fn new(name: &'static str) -> Self {
crate::health::clear_component(name);
Self { name }
}
}

impl Drop for HealthComponentGuard {
fn drop(&mut self) {
crate::health::clear_component(self.name);
}
}

fn test_config(tmp: &TempDir) -> Config {
let config = Config {
workspace_dir: tmp.path().join("workspace"),
Expand Down Expand Up @@ -284,6 +318,30 @@ mod tests {
.contains("component exited unexpectedly"));
}

#[tokio::test]
async fn supervisor_logs_hint_on_gateway_addr_in_use() {
let _health_guard = HealthComponentGuard::new("gateway");
let handle = spawn_component_supervisor("gateway", 1, 1, || async {
Err(anyhow::Error::new(std::io::Error::new(
std::io::ErrorKind::AddrInUse,
"Address already in use",
)))
});

tokio::time::sleep(Duration::from_millis(50)).await;
handle.abort();
let _ = handle.await;

let snapshot = crate::health::snapshot_json();
let component = &snapshot["components"]["gateway"];
assert_eq!(component["status"], "error");
assert!(component["restart_count"].as_u64().unwrap_or(0) >= 1);
assert!(component["last_error"]
.as_str()
.unwrap_or("")
.contains("Address already in use"));
}

#[test]
fn detects_no_supervised_channels() {
let config = Config::default();
Expand Down
6 changes: 1 addition & 5 deletions clients/agent-runtime/src/gateway/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1100,7 +1100,6 @@ mod tests {
assert!(text.contains("corvus_heartbeat_ticks_total 1"));
}


#[test]
fn extract_bearer_token_accepts_case_insensitive_scheme_and_trims() {
let mut headers = HeaderMap::new();
Expand All @@ -1118,10 +1117,7 @@ mod tests {
let mut headers = HeaderMap::new();
let oversized = "x".repeat(TOKEN_MAX_LEN + 1);
let auth = format!("Bearer {oversized}");
headers.insert(
header::AUTHORIZATION,
HeaderValue::from_str(&auth).unwrap(),
);
headers.insert(header::AUTHORIZATION, HeaderValue::from_str(&auth).unwrap());

assert!(extract_bearer_token(&headers).is_none());
}
Expand Down
5 changes: 5 additions & 0 deletions clients/agent-runtime/src/health/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ pub fn bump_component_restart(component: &str) {
});
}

pub fn clear_component(component: &str) {
let mut map = registry().components.lock();
map.remove(component);
}

pub fn snapshot() -> HealthSnapshot {
let components = registry().components.lock().clone();

Expand Down
17 changes: 15 additions & 2 deletions clients/agent-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
dead_code
)]

use clap::Subcommand;
use clap::{Subcommand, ValueEnum};
use serde::{Deserialize, Serialize};

pub mod agent;
Expand Down Expand Up @@ -71,13 +71,26 @@ pub mod util;

pub use config::Config;

#[derive(Debug, Clone, Copy, ValueEnum, Serialize, Deserialize, PartialEq, Eq)]
pub enum ServiceLingerMode {
Keep,
On,
Off,
}

/// Service management subcommands
#[derive(Subcommand, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ServiceCommands {
/// Install daemon service unit for auto-start and restart
Install,
Install {
/// Linux only: keep user service active without an interactive session
#[arg(long, value_enum, default_value_t = ServiceLingerMode::Keep)]
linger: ServiceLingerMode,
},
/// Start daemon service
Start,
/// Restart daemon service
Restart,
/// Stop daemon service
Stop,
/// Check daemon service status
Expand Down
48 changes: 29 additions & 19 deletions clients/agent-runtime/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use anyhow::{bail, Result};
use clap::{Parser, Subcommand};
use dialoguer::{Input, Password};
use serde::{Deserialize, Serialize};
use std::time::Duration;
use tracing::{info, warn};
use tracing_subscriber::{fmt, EnvFilter};

Expand Down Expand Up @@ -70,12 +71,15 @@ mod skillforge;
mod skills;
mod tools;
mod tunnel;
mod update;
mod util;

use config::Config;

// Re-export so binary's hardware/peripherals modules can use crate::HardwareCommands etc.
pub use corvus::{HardwareCommands, PeripheralCommands};
// Re-export so binary modules can use crate::...Commands from the library crate.
pub use corvus::{
HardwareCommands, PeripheralCommands, ServiceCommands, ServiceLingerMode,
};

/// `Corvus` - Zero overhead. Zero compromise. 100% Rust.
#[derive(Parser, Debug)]
Expand All @@ -88,20 +92,6 @@ struct Cli {
command: Commands,
}

#[derive(Subcommand, Debug)]
enum ServiceCommands {
/// Install daemon service unit for auto-start and restart
Install,
/// Start daemon service
Start,
/// Stop daemon service
Stop,
/// Check daemon service status
Status,
/// Uninstall daemon service unit
Uninstall,
}

#[derive(Subcommand, Debug)]
enum Commands {
/// Initialize your workspace and configuration
Expand Down Expand Up @@ -591,9 +581,12 @@ async fn main() -> Result<()> {
model,
temperature,
peripheral,
} => agent::run(config, message, provider, model, temperature, peripheral)
.await
.map(|_| ()),
} => {
maybe_print_update_notice_bounded(&config).await;
agent::run(config, message, provider, model, temperature, peripheral)
.await
.map(|_| ())
}
Comment thread
yacosta738 marked this conversation as resolved.

Commands::Gateway { port, host } => {
let port = port.unwrap_or(config.gateway.port);
Expand All @@ -607,6 +600,10 @@ async fn main() -> Result<()> {
}

Commands::Daemon { port, host } => {
let update_config = config.clone();
tokio::spawn(async move {
update::maybe_print_update_notice(&update_config).await;
});
let port = port.unwrap_or(config.gateway.port);
let host = host.unwrap_or_else(|| config.gateway.host.clone());
if port == 0 {
Expand All @@ -618,6 +615,7 @@ async fn main() -> Result<()> {
}

Commands::Status => {
maybe_print_update_notice_bounded(&config).await;
println!("🦀 Corvus Status");
println!();
println!("Version: {}", env!("CARGO_PKG_VERSION"));
Expand Down Expand Up @@ -881,6 +879,18 @@ async fn main() -> Result<()> {
}
}

async fn maybe_print_update_notice_bounded(config: &Config) {
if tokio::time::timeout(
Duration::from_millis(500),
update::maybe_print_update_notice(config),
)
.await
.is_err()
{
tracing::debug!("Update notice check timed out after 500ms");
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
struct PendingOpenAiLogin {
profile: String,
Expand Down
Loading
Loading