diff --git a/runner/src/docker.rs b/runner/src/docker.rs
index 56650ac..53f714b 100644
--- a/runner/src/docker.rs
+++ b/runner/src/docker.rs
@@ -8,13 +8,13 @@
 use std::collections::HashMap;
 
 use bollard::container::{
-    Config as ContainerConfig, CreateContainerOptions, LogOutput, RemoveContainerOptions,
-    StopContainerOptions,
+    Config as ContainerConfig, CreateContainerOptions, ListContainersOptions, LogOutput,
+    RemoveContainerOptions, StopContainerOptions,
 };
 use bollard::exec::{CreateExecOptions, ResizeExecOptions, StartExecResults};
 use bollard::image::CreateImageOptions;
-use bollard::models::HostConfig;
-use bollard::network::CreateNetworkOptions;
+use bollard::models::{ContainerSummary, HostConfig, Network};
+use bollard::network::{CreateNetworkOptions, ListNetworksOptions};
 use bollard::Docker;
 use tokio::io::AsyncWrite;
 use tokio_stream::{Stream, StreamExt as _};
@@ -28,10 +28,8 @@ use crate::error::{RelayError, RelayResult};
 // ---------------------------------------------------------------------------
 
 /// Label marking all Relay-managed Docker resources.
-#[allow(dead_code)] // Used by create_container, consumed by Wave-3 cleanup (T-9)
 const LABEL_MANAGED: &str = "relay.managed";
 /// Label storing the session ID on a container.
-#[allow(dead_code)] // Used by create_container, consumed by Wave-3 cleanup (T-9)
 const LABEL_SESSION_ID: &str = "relay.session_id";
 
 // ---------------------------------------------------------------------------
@@ -66,7 +64,6 @@ impl std::fmt::Debug for ExecHandle {
 ///
 /// Thread-safe (`Clone + Send + Sync`) — bollard `Docker` client is Arc-backed.
 #[derive(Clone)]
-#[allow(dead_code)] // Consumed by Wave-2/3 modules (T-7b, T-8)
 pub struct DockerOrchestrator {
     client: Docker,
     config: DockerConfig,
@@ -81,7 +78,7 @@ impl std::fmt::Debug for DockerOrchestrator {
     }
 }
 
-#[allow(dead_code)] // All methods consumed by Wave-2/3 modules (T-7b, T-8, T-9)
+#[allow(dead_code)] // Most methods consumed by Wave-2/3 modules (T-7b, T-8); reconciliation uses list_* methods.
 impl DockerOrchestrator {
     /// Create a new orchestrator connected to the Docker daemon.
     ///
@@ -408,6 +405,57 @@ impl DockerOrchestrator {
         &self.config.default_image
     }
 
+    // -----------------------------------------------------------------------
+    // Reconciliation helpers
+    // -----------------------------------------------------------------------
+
+    /// List all containers that carry the `relay.managed=true` label.
+    ///
+    /// Includes stopped/exited containers (`all: true`) so that orphans that
+    /// died while the runner was down are also visible.
+    pub async fn list_managed_containers(&self) -> RelayResult<Vec<ContainerSummary>> {
+        let mut filters = HashMap::new();
+        filters.insert("label".to_string(), vec![format!("{LABEL_MANAGED}=true")]);
+
+        let opts = ListContainersOptions {
+            all: true,
+            filters,
+            ..Default::default()
+        };
+
+        self.client
+            .list_containers(Some(opts))
+            .await
+            .map_err(RelayError::Docker)
+    }
+
+    /// List all Docker networks that carry the `relay.managed=true` label.
+    pub async fn list_managed_networks(&self) -> RelayResult<Vec<Network>> {
+        let mut filters = HashMap::new();
+        filters.insert("label".to_string(), vec![format!("{LABEL_MANAGED}=true")]);
+
+        let opts = ListNetworksOptions { filters };
+
+        self.client
+            .list_networks(Some(opts))
+            .await
+            .map_err(RelayError::Docker)
+    }
+
+    /// Remove a Docker network by name, ignoring "not found" errors.
+    pub async fn remove_network_if_exists(&self, name: &str) -> RelayResult<()> {
+        match self.client.remove_network(name).await {
+            Ok(()) => {
+                debug!(network = %name, "orphan network removed");
+                Ok(())
+            }
+            Err(bollard::errors::Error::DockerResponseServerError {
+                status_code: 404, ..
+            }) => Ok(()),
+            Err(e) => Err(RelayError::Docker(e)),
+        }
+    }
+
     // -----------------------------------------------------------------------
     // Internal helpers
     // -----------------------------------------------------------------------
diff --git a/runner/src/main.rs b/runner/src/main.rs
index bdd7e48..88969ce 100644
--- a/runner/src/main.rs
+++ b/runner/src/main.rs
@@ -7,6 +7,7 @@ mod backend;
 mod backends;
 mod client_registry;
 mod config;
+mod docker;
 mod docker_exec_poc;
 mod error;
 mod event_bus;
@@ -14,6 +15,7 @@ mod grpc_service;
 mod isolation;
 mod mdns;
 mod pty;
+mod reconciliation;
 #[allow(clippy::needless_for_each)] // utoipa OpenApi derive macro triggers this
 mod rest_api;
 mod session;
@@ -32,9 +34,11 @@
 use tracing_subscriber::{fmt, EnvFilter};
 
 use crate::auth::{AuthInterceptor, RateLimiter};
 use crate::client_registry::ClientRegistry;
 use crate::config::Config;
+use crate::docker::DockerOrchestrator;
 use crate::grpc_service::TerminalServiceImpl;
 use crate::mdns::MdnsAdvertiser;
 use crate::session_manager::SessionManager;
+use crate::store::Store;
 use crate::tls::CertificateAuthority;
@@ -566,6 +570,36 @@
     let auth = Arc::new(AuthInterceptor::new(token_hash, require_mtls));
     let rate_limiter = Arc::new(RateLimiter::new(10, Duration::from_secs(60)));
 
+    // 4a. Startup reconciliation — sync SQLite state with live Docker state.
+    //
+    // Only runs when a database file is present (cloud mode). Errors are
+    // tolerated: a partial reconciliation is better than refusing to start.
+    {
+        let db_path = config.data_dir.join("relay.db");
+        if db_path.exists() {
+            let db_url = format!("sqlite:{}", db_path.display());
+            match Store::new(&db_url).await {
+                Ok(store) => match DockerOrchestrator::new(config.docker.clone()).await {
+                    Ok(docker) => {
+                        reconciliation::reconcile(&store, &docker).await;
+                    }
+                    Err(e) => {
+                        tracing::warn!(
+                            error = %e,
+                            "reconciliation: Docker unavailable, skipping"
+                        );
+                    }
+                },
+                Err(e) => {
+                    tracing::warn!(
+                        error = %e,
+                        "reconciliation: failed to open DB, skipping"
+                    );
+                }
+            }
+        }
+    }
+
     // 5. Create gRPC service.
     let service = TerminalServiceImpl::new(
         session_manager.clone(),
diff --git a/runner/src/reconciliation.rs b/runner/src/reconciliation.rs
new file mode 100644
index 0000000..62d9d75
--- /dev/null
+++ b/runner/src/reconciliation.rs
@@ -0,0 +1,440 @@
+//! Server-startup reconciliation: sync SQLite state with live Docker state.
+//!
+//! Called once during [`run_server`] startup, before accepting connections.
+//!
+//! ## What it does
+//!
+//! 1. **Dead sessions** — sessions with state `Starting` or `Running` in DB
+//!    whose container no longer exists (or has exited) are marked `Stopped`.
+//!
+//! 2. **Orphan containers** — containers labelled `relay.managed=true` that
+//!    have no corresponding DB session are force-removed.
+//!
+//! 3. **Orphan networks** — Docker networks labelled `relay.managed=true`
+//!    that are not attached to any managed container are removed.
+//!
+//! All errors during reconciliation are logged and tolerated — a partial
+//! reconciliation is better than refusing to start.
+
+use std::collections::HashSet;
+
+use tracing::{error, info, warn};
+
+use crate::docker::DockerOrchestrator;
+use crate::store::Store;
+
+// ---------------------------------------------------------------------------
+// Public entry point
+// ---------------------------------------------------------------------------
+
+/// Run the full reconciliation pass.
+///
+/// Errors from individual reconciliation steps are logged but not propagated;
+/// the server should start even if cleanup is partial.
+pub async fn reconcile(store: &Store, docker: &DockerOrchestrator) {
+    info!("reconciliation: starting");
+
+    let (dead_sessions, orphan_containers, orphan_networks) = tokio::join!(
+        reconcile_dead_sessions(store, docker),
+        reconcile_orphan_containers(store, docker),
+        reconcile_orphan_networks(docker),
+    );
+
+    info!(
+        dead_sessions,
+        orphan_containers, orphan_networks, "reconciliation: complete"
+    );
+}
+
+// ---------------------------------------------------------------------------
+// Step 1 — mark dead sessions as Stopped
+// ---------------------------------------------------------------------------
+
+/// Mark `Starting`/`Running` sessions whose container is absent or not running
+/// as `Stopped`.
+///
+/// Returns the number of sessions updated.
+async fn reconcile_dead_sessions(store: &Store, docker: &DockerOrchestrator) -> u32 {
+    // Collect the set of container IDs that are currently running in Docker.
+    let running_ids: HashSet<String> = match docker.list_managed_containers().await {
+        Ok(containers) => containers
+            .into_iter()
+            .filter_map(|c| {
+                // Only containers in a "running" state count as alive.
+                let is_running = c
+                    .state
+                    .as_deref()
+                    .is_some_and(|s| s.eq_ignore_ascii_case("running"));
+                if is_running {
+                    c.id
+                } else {
+                    None
+                }
+            })
+            .collect(),
+        Err(e) => {
+            error!(error = %e, "reconciliation: failed to list Docker containers");
+            // Without Docker data we cannot safely mark anything stopped.
+            return 0;
+        }
+    };
+
+    let active_sessions = match store.list_active_sessions().await {
+        Ok(s) => s,
+        Err(e) => {
+            error!(error = %e, "reconciliation: failed to list active sessions from DB");
+            return 0;
+        }
+    };
+
+    let mut stopped = 0u32;
+
+    for session in active_sessions {
+        let container_alive = session
+            .container_id
+            .as_deref()
+            .is_some_and(|cid| running_ids.contains(cid));
+
+        if !container_alive {
+            let container_id = session.container_id.as_deref().unwrap_or("");
+            match store
+                .force_stop_session(&session.id, "container not found after runner restart")
+                .await
+            {
+                Ok(true) => {
+                    info!(
+                        session_id = %session.id,
+                        container_id,
+                        "reconciliation: session marked Stopped (container dead)"
+                    );
+                    stopped += 1;
+                }
+                Ok(false) => {
+                    // Already stopped by a concurrent writer — nothing to do.
+                }
+                Err(e) => {
+                    warn!(
+                        session_id = %session.id,
+                        error = %e,
+                        "reconciliation: failed to stop dead session"
+                    );
+                }
+            }
+        }
+    }
+
+    stopped
+}
+
+// ---------------------------------------------------------------------------
+// Step 2 — remove orphan containers
+// ---------------------------------------------------------------------------
+
+/// Remove managed containers that have no corresponding DB session.
+///
+/// Returns the number of containers removed.
+async fn reconcile_orphan_containers(store: &Store, docker: &DockerOrchestrator) -> u32 {
+    let containers = match docker.list_managed_containers().await {
+        Ok(c) => c,
+        Err(e) => {
+            error!(error = %e, "reconciliation: failed to list managed containers");
+            return 0;
+        }
+    };
+
+    let mut removed = 0u32;
+
+    for container in containers {
+        let container_id = match &container.id {
+            Some(id) => id.clone(),
+            None => continue,
+        };
+
+        // Extract the session ID from the container's labels.
+        let session_id = container
+            .labels
+            .as_ref()
+            .and_then(|l| l.get("relay.session_id"))
+            .cloned();
+
+        let is_orphan = match &session_id {
+            None => {
+                // No session label at all — treat as orphan.
+                true
+            }
+            Some(sid) => match store.get_session(sid).await {
+                Ok(Some(_)) => false, // Session exists in DB — keep container.
+                Ok(None) => true,     // No DB record — orphan.
+                Err(e) => {
+                    warn!(
+                        container_id = %container_id,
+                        session_id = %sid,
+                        error = %e,
+                        "reconciliation: DB lookup failed, skipping container"
+                    );
+                    false // Skip on error to avoid false-positive removal.
+                }
+            },
+        };
+
+        if is_orphan {
+            match docker.remove_container(&container_id).await {
+                Ok(()) => {
+                    info!(
+                        container_id = %container_id,
+                        session_id = ?session_id,
+                        "reconciliation: orphan container removed"
+                    );
+                    removed += 1;
+                }
+                Err(e) => {
+                    warn!(
+                        container_id = %container_id,
+                        error = %e,
+                        "reconciliation: failed to remove orphan container"
+                    );
+                }
+            }
+        }
+    }
+
+    removed
+}
+
+// ---------------------------------------------------------------------------
+// Step 3 — remove orphan networks
+// ---------------------------------------------------------------------------
+
+/// Remove managed networks that are not connected to any managed container.
+///
+/// Returns the number of networks removed.
+async fn reconcile_orphan_networks(docker: &DockerOrchestrator) -> u32 {
+    // Re-fetch containers (after orphan container removal) to get the live set.
+    let containers = match docker.list_managed_containers().await {
+        Ok(c) => c,
+        Err(e) => {
+            error!(error = %e, "reconciliation: failed to list containers for network check");
+            return 0;
+        }
+    };
+
+    // Build the set of network names that still have at least one container.
+    let occupied_networks: HashSet<String> = containers
+        .into_iter()
+        .filter_map(|c| c.network_settings)
+        .filter_map(|ns| ns.networks)
+        .flat_map(std::collections::HashMap::into_keys)
+        .collect();
+
+    let networks = match docker.list_managed_networks().await {
+        Ok(n) => n,
+        Err(e) => {
+            error!(error = %e, "reconciliation: failed to list managed networks");
+            return 0;
+        }
+    };
+
+    let mut removed = 0u32;
+
+    for network in networks {
+        let name = match &network.name {
+            Some(n) => n.clone(),
+            None => continue,
+        };
+
+        if !occupied_networks.contains(&name) {
+            match docker.remove_network_if_exists(&name).await {
+                Ok(()) => {
+                    info!(network = %name, "reconciliation: orphan network removed");
+                    removed += 1;
+                }
+                Err(e) => {
+                    warn!(
+                        network = %name,
+                        error = %e,
+                        "reconciliation: failed to remove orphan network"
+                    );
+                }
+            }
+        }
+    }
+
+    removed
+}
+
+// ---------------------------------------------------------------------------
+// Tests
+// ---------------------------------------------------------------------------
+
+#[cfg(test)]
+mod tests {
+    use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
+    use std::str::FromStr;
+
+    use crate::store::{CreateProject, CreateSession, CreateWorkspace, SessionState, Store};
+
+    async fn make_store() -> Store {
+        let opts = SqliteConnectOptions::from_str("sqlite::memory:")
+            .expect("valid sqlite connection string")
+            .foreign_keys(true);
+        let pool = SqlitePoolOptions::new()
+            .max_connections(1)
+            .connect_with(opts)
+            .await
+            .expect("connect to in-memory sqlite");
+        Store::from_pool_with_migrations(pool)
+            .await
+            .expect("run migrations")
+    }
+
+    #[tokio::test]
+    async fn list_active_sessions_returns_starting_and_running() {
+        let store = make_store().await;
+
+        // Create project → workspace → 3 sessions in different states.
+        let proj = store
+            .create_project(CreateProject {
+                name: "p".into(),
+                git_url: "https://example.com/repo.git".into(),
+                local_path: "/tmp/p".into(),
+            })
+            .await
+            .expect("create project");
+
+        let ws = store
+            .create_workspace(CreateWorkspace {
+                project_id: proj.id.clone(),
+                branch: "main".into(),
+            })
+            .await
+            .expect("create workspace");
+
+        // Session 1: Starting (default after create_session)
+        let s1 = store
+            .create_session(CreateSession {
+                workspace_id: ws.id.clone(),
+                profile: "default".into(),
+            })
+            .await
+            .expect("create session s1");
+
+        // Session 2: Running
+        let s2 = store
+            .create_session(CreateSession {
+                workspace_id: ws.id.clone(),
+                profile: "default".into(),
+            })
+            .await
+            .expect("create session s2");
+        store
+            .update_session_state(
+                &s2.id,
+                SessionState::Running,
+                Some("cid-running"),
+                None,
+                &s2.updated_at.to_rfc3339(),
+            )
+            .await
+            .expect("update s2 to Running");
+
+        // Session 3: Stopped
+        let s3 = store
+            .create_session(CreateSession {
+                workspace_id: ws.id.clone(),
+                profile: "default".into(),
+            })
+            .await
+            .expect("create session s3");
+        store
+            .update_session_state(
+                &s3.id,
+                SessionState::Stopped,
+                None,
+                None,
+                &s3.updated_at.to_rfc3339(),
+            )
+            .await
+            .expect("update s3 to Stopped");
+
+        let active = store
+            .list_active_sessions()
+            .await
+            .expect("list active sessions");
+        let ids: Vec<&str> = active.iter().map(|s| s.id.as_str()).collect();
+
+        assert!(
+            ids.contains(&s1.id.as_str()),
+            "Starting session should be active"
+        );
+        assert!(
+            ids.contains(&s2.id.as_str()),
+            "Running session should be active"
+        );
+        assert!(
+            !ids.contains(&s3.id.as_str()),
+            "Stopped session should not be active"
+        );
+    }
+
+    #[tokio::test]
+    async fn force_stop_session_updates_state() {
+        let store = make_store().await;
+
+        let proj = store
+            .create_project(CreateProject {
+                name: "p2".into(),
+                git_url: "https://example.com/repo.git".into(),
+                local_path: "/tmp/p2".into(),
+            })
+            .await
+            .expect("create project");
+
+        let ws = store
+            .create_workspace(CreateWorkspace {
+                project_id: proj.id.clone(),
+                branch: "main".into(),
+            })
+            .await
+            .expect("create workspace");
+
+        let session = store
+            .create_session(CreateSession {
+                workspace_id: ws.id.clone(),
+                profile: "default".into(),
+            })
+            .await
+            .expect("create session");
+
+        // First call should succeed (state is Starting).
+        let updated = store
+            .force_stop_session(&session.id, "container not found after runner restart")
+            .await
+            .expect("force_stop_session first call");
+        assert!(
+            updated,
+            "force_stop_session should return true when updating Starting"
+        );
+
+        // Verify state is now Stopped.
+        let fetched = store
+            .get_session(&session.id)
+            .await
+            .expect("get_session query")
+            .expect("session should exist");
+        assert_eq!(fetched.state, SessionState::Stopped);
+        assert_eq!(
+            fetched.error_reason.as_deref(),
+            Some("container not found after runner restart")
+        );
+
+        // Second call on already-stopped session should return false.
+        let updated_again = store
+            .force_stop_session(&session.id, "should not change")
+            .await
+            .expect("force_stop_session second call");
+        assert!(
+            !updated_again,
+            "force_stop_session should return false when already Stopped"
+        );
+    }
+}
diff --git a/runner/src/store.rs b/runner/src/store.rs
index d022376..8ab0fa8 100644
--- a/runner/src/store.rs
+++ b/runner/src/store.rs
@@ -571,6 +571,48 @@ impl Store {
         Ok(rows_affected > 0)
     }
 
+    /// List all non-deleted sessions that are in `Starting` or `Running` state.
+    ///
+    /// Used by the reconciliation pass at server startup to find sessions whose
+    /// containers may have disappeared while the runner was down.
+    pub async fn list_active_sessions(&self) -> Result<Vec<Session>, sqlx::Error> {
+        let rows = sqlx::query_as::<_, SessionRow>(
+            "SELECT id, workspace_id, container_id, state, profile,
+                    created_at, updated_at, stopped_at, error_reason, deleted_at
+             FROM sessions
+             WHERE deleted_at IS NULL AND state IN ('Starting', 'Running')
+             ORDER BY created_at",
+        )
+        .fetch_all(&self.pool)
+        .await?;
+
+        rows.into_iter().map(Session::try_from).collect()
+    }
+
+    /// Force-set a session's state to `Stopped` without an optimistic-concurrency
+    /// check. Used only by the reconciliation pass where stale-read retry is not
+    /// meaningful (the container is already gone).
+    pub async fn force_stop_session(
+        &self,
+        id: &str,
+        error_reason: &str,
+    ) -> Result<bool, sqlx::Error> {
+        let now = now_str();
+        let rows_affected = sqlx::query(
+            "UPDATE sessions
+                SET state = 'Stopped', error_reason = ?1, stopped_at = ?2, updated_at = ?2
+              WHERE id = ?3 AND deleted_at IS NULL AND state IN ('Starting', 'Running')",
+        )
+        .bind(error_reason)
+        .bind(&now)
+        .bind(id)
+        .execute(&self.pool)
+        .await?
+        .rows_affected();
+
+        Ok(rows_affected > 0)
+    }
+
     /// Soft-delete a session. Returns `true` if the row was marked deleted.
     pub async fn soft_delete_session(&self, id: &str) -> Result<bool, sqlx::Error> {
         let now = now_str();