Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 56 additions & 8 deletions runner/src/docker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@
use std::collections::HashMap;

use bollard::container::{
Config as ContainerConfig, CreateContainerOptions, LogOutput, RemoveContainerOptions,
StopContainerOptions,
Config as ContainerConfig, CreateContainerOptions, ListContainersOptions, LogOutput,
RemoveContainerOptions, StopContainerOptions,
};
use bollard::exec::{CreateExecOptions, ResizeExecOptions, StartExecResults};
use bollard::image::CreateImageOptions;
use bollard::models::HostConfig;
use bollard::network::CreateNetworkOptions;
use bollard::models::{ContainerSummary, HostConfig, Network};
use bollard::network::{CreateNetworkOptions, ListNetworksOptions};
use bollard::Docker;
use tokio::io::AsyncWrite;
use tokio_stream::{Stream, StreamExt as _};
Expand All @@ -28,10 +28,8 @@ use crate::error::{RelayError, RelayResult};
// ---------------------------------------------------------------------------

/// Label marking all Relay-managed Docker resources.
#[allow(dead_code)] // Used by create_container, consumed by Wave-3 cleanup (T-9)
const LABEL_MANAGED: &str = "relay.managed";
/// Label storing the session ID on a container.
#[allow(dead_code)] // Used by create_container, consumed by Wave-3 cleanup (T-9)
const LABEL_SESSION_ID: &str = "relay.session_id";

// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -66,7 +64,6 @@ impl std::fmt::Debug for ExecHandle {
///
/// Thread-safe (`Clone + Send + Sync`) — bollard `Docker` client is Arc-backed.
#[derive(Clone)]
#[allow(dead_code)] // Consumed by Wave-2/3 modules (T-7b, T-8)
pub struct DockerOrchestrator {
client: Docker,
config: DockerConfig,
Expand All @@ -81,7 +78,7 @@ impl std::fmt::Debug for DockerOrchestrator {
}
}

#[allow(dead_code)] // All methods consumed by Wave-2/3 modules (T-7b, T-8, T-9)
#[allow(dead_code)] // Most methods consumed by Wave-2/3 modules (T-7b, T-8); reconciliation uses list_* methods.
impl DockerOrchestrator {
/// Create a new orchestrator connected to the Docker daemon.
///
Expand Down Expand Up @@ -408,6 +405,57 @@ impl DockerOrchestrator {
&self.config.default_image
}

// -----------------------------------------------------------------------
// Reconciliation helpers
// -----------------------------------------------------------------------

/// List all containers that carry the `relay.managed=true` label.
///
/// Includes stopped/exited containers (`all: true`) so that orphans that
/// died while the runner was down are also visible.
/// List all containers that carry the `relay.managed=true` label.
///
/// Includes stopped/exited containers (`all: true`) so that orphans that
/// died while the runner was down are also visible.
pub async fn list_managed_containers(&self) -> RelayResult<Vec<ContainerSummary>> {
    // Restrict the listing to Relay-managed containers via a label filter.
    let managed_filter = HashMap::from([(
        "label".to_string(),
        vec![format!("{LABEL_MANAGED}=true")],
    )]);

    let options = ListContainersOptions {
        // `all: true` also returns exited/stopped containers, not just running ones.
        all: true,
        filters: managed_filter,
        ..Default::default()
    };

    self.client
        .list_containers(Some(options))
        .await
        .map_err(RelayError::Docker)
}

/// List all Docker networks that carry the `relay.managed=true` label.
pub async fn list_managed_networks(&self) -> RelayResult<Vec<Network>> {
    // Same label filter as for containers: only Relay-managed resources.
    let filters = HashMap::from([(
        "label".to_string(),
        vec![format!("{LABEL_MANAGED}=true")],
    )]);

    self.client
        .list_networks(Some(ListNetworksOptions { filters }))
        .await
        .map_err(RelayError::Docker)
}

/// Remove a Docker network by name, ignoring "not found" errors.
pub async fn remove_network_if_exists(&self, name: &str) -> RelayResult<()> {
match self.client.remove_network(name).await {
Ok(()) => {
debug!(network = %name, "orphan network removed");
Ok(())
}
Err(bollard::errors::Error::DockerResponseServerError {
status_code: 404, ..
}) => Ok(()),
Err(e) => Err(RelayError::Docker(e)),
}
}

// -----------------------------------------------------------------------
// Internal helpers
// -----------------------------------------------------------------------
Expand Down
34 changes: 34 additions & 0 deletions runner/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ mod backend;
mod backends;
mod client_registry;
mod config;
mod docker;
mod docker_exec_poc;
mod error;
mod event_bus;
mod grpc_service;
mod isolation;
mod mdns;
mod pty;
mod reconciliation;
#[allow(clippy::needless_for_each)] // utoipa OpenApi derive macro triggers this
mod rest_api;
mod session;
Expand All @@ -32,9 +34,11 @@ use tracing_subscriber::{fmt, EnvFilter};
use crate::auth::{AuthInterceptor, RateLimiter};
use crate::client_registry::ClientRegistry;
use crate::config::Config;
use crate::docker::DockerOrchestrator;
use crate::grpc_service::TerminalServiceImpl;
use crate::mdns::MdnsAdvertiser;
use crate::session_manager::SessionManager;
use crate::store::Store;
use crate::tls::CertificateAuthority;

// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -566,6 +570,36 @@ async fn run_server() -> ! {
let auth = Arc::new(AuthInterceptor::new(token_hash, require_mtls));
let rate_limiter = Arc::new(RateLimiter::new(10, Duration::from_secs(60)));

// 4a. Startup reconciliation — sync SQLite state with live Docker state.
//
// Only runs when a database file is present (cloud mode). Errors are
// tolerated: a partial reconciliation is better than refusing to start.
{
// DB location is fixed relative to the configured data directory.
let db_path = config.data_dir.join("relay.db");
// NOTE(review): the mere presence of the DB file is used as the
// "cloud mode" signal here — presumably local/ephemeral deployments
// never create it; confirm against the config/store documentation.
if db_path.exists() {
let db_url = format!("sqlite:{}", db_path.display());
match Store::new(&db_url).await {
// Both the store and the Docker client must open successfully before
// reconcile() runs; either failure downgrades to a warning below.
Ok(store) => match DockerOrchestrator::new(config.docker.clone()).await {
Ok(docker) => {
reconciliation::reconcile(&store, &docker).await;
}
Err(e) => {
// Docker daemon unreachable — skip reconciliation, keep starting.
tracing::warn!(
error = %e,
"reconciliation: Docker unavailable, skipping"
);
}
},
Err(e) => {
// DB file exists but could not be opened — skip, keep starting.
tracing::warn!(
error = %e,
"reconciliation: failed to open DB, skipping"
);
}
}
}
}

// 5. Create gRPC service.
let service = TerminalServiceImpl::new(
session_manager.clone(),
Expand Down
Loading
Loading