Skip to content
27 changes: 20 additions & 7 deletions fc-agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,24 @@ pub async fn run() -> Result<()> {
// Breaks the 30s poll loop when POLLHUP is not delivered after snapshot restore.
let restore_flag = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));

// Exec server rebind signal — shared by restore-epoch watcher and cache-ready handshake.
// After vsock transport reset, the listener's AsyncFd epoll becomes stale.
let exec_rebind = std::sync::Arc::new(tokio::sync::Notify::new());

// Start restore-epoch watcher
let watcher_output = output.clone();
let watcher_restore_flag = restore_flag.clone();
let watcher_exec_rebind = exec_rebind.clone();
tokio::spawn(async move {
eprintln!("[fc-agent] starting restore-epoch watcher");
mmds::watch_restore_epoch(watcher_output, watcher_restore_flag).await;
mmds::watch_restore_epoch(watcher_output, watcher_restore_flag, watcher_exec_rebind).await;
});

// Start exec server
// Start exec server with rebind signal for vsock transport reset recovery
let (exec_ready_tx, exec_ready_rx) = tokio::sync::oneshot::channel();
tokio::spawn(async {
exec::run_server(exec_ready_tx).await;
let exec_rebind_clone = exec_rebind.clone();
tokio::spawn(async move {
exec::run_server(exec_ready_tx, exec_rebind_clone).await;
});

match tokio::time::timeout(Duration::from_secs(5), exec_ready_rx).await {
Expand Down Expand Up @@ -214,6 +220,11 @@ pub async fn run() -> Result<()> {
eprintln!("[fc-agent] image digest: {}", digest);
if container::notify_cache_ready_and_wait(&digest, &restore_flag) {
eprintln!("[fc-agent] cache ready notification acknowledged");
// Pre-start snapshot was taken and we've been restored into a new
// Firecracker instance. The vsock transport was reset, which
// invalidates the exec server's listener socket (stale AsyncFd epoll).
// Signal it to re-bind.
exec_rebind.notify_one();
} else {
eprintln!("[fc-agent] WARNING: cache-ready handshake failed, continuing");
}
Expand All @@ -225,9 +236,11 @@ pub async fn run() -> Result<()> {

// After cache-ready handshake, Firecracker may have created a pre-start snapshot.
// Snapshot creation resets all vsock connections (VIRTIO_VSOCK_EVENT_TRANSPORT_RESET).
// Reconnect the output vsock. FUSE mounts handle reconnection automatically via
// the reconnectable multiplexer — no explicit check needed.
output.reconnect();
// Output vsock reconnection is handled by handle_clone_restore() which is triggered
// by the restore-epoch watcher. Do NOT reconnect here — a second reconnect() call
// races with handle_clone_restore's reconnect and causes the output writer to cycle
// through multiple ghost connections (Notify stored-permit cascade), dropping data.
// FUSE mounts handle reconnection automatically via the reconnectable multiplexer.

// VM-level setup: hostname and sysctl (runs as root before container starts).
// When using --user, the container runs as non-root and can't do these.
Expand Down
21 changes: 20 additions & 1 deletion fc-agent/src/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,26 @@ pub fn notify_cache_ready_and_wait(
return false;
}
Ok(0) => {
// Timeout on this poll interval — loop and check deadline
// Poll timeout — actively probe the vsock connection.
// After snapshot restore, the vsock transport is reset but the kernel
// may not deliver POLLHUP on the restored fd (observed in rootless mode).
// A write to a dead connection fails immediately with EPIPE or ECONNRESET,
// which reliably detects that a snapshot was taken and we've been restored.
match write(&sock, b"\n") {
Err(nix::errno::Errno::EPIPE)
| Err(nix::errno::Errno::ECONNRESET)
| Err(nix::errno::Errno::ENOTCONN)
| Err(nix::errno::Errno::ECONNREFUSED) => {
eprintln!("[fc-agent] cache-ack connection dead (write probe), snapshot was taken");
return true;
}
Err(nix::errno::Errno::EAGAIN) => {
// Connection alive but can't write now — continue polling
}
_ => {
// Write succeeded or other error — connection still alive, continue
}
}
continue;
}
Ok(_) => {}
Expand Down
63 changes: 55 additions & 8 deletions fc-agent/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,27 @@ use std::sync::Arc;

use tokio::io::unix::AsyncFd;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::sync::Mutex;
use tokio::sync::{Mutex, Notify};

use crate::types::{ExecRequest, ExecResponse};
use crate::vsock;

/// Run the exec server. Sends ready signal when listening.
pub async fn run_server(ready_tx: tokio::sync::oneshot::Sender<()>) {
///
/// The `rebind_signal` handles vsock transport reset after snapshot restore.
/// When Firecracker creates a snapshot and restores, the vsock transport is reset
/// (VIRTIO_VSOCK_EVENT_TRANSPORT_RESET). The listener's AsyncFd epoll registration
/// becomes stale — accept() hangs forever because tokio never delivers readability
/// events for incoming connections. On signal, re-registers the epoll via
/// `VsockListener::re_register()` (extracts fd, re-wraps in new AsyncFd) without
/// closing or rebinding the socket. Falls back to full rebind if re-register fails.
pub async fn run_server(ready_tx: tokio::sync::oneshot::Sender<()>, rebind_signal: Arc<Notify>) {
eprintln!(
"[fc-agent] starting exec server on vsock port {}",
vsock::EXEC_PORT
);

let listener = match vsock::VsockListener::bind(vsock::EXEC_PORT) {
let mut listener = match vsock::VsockListener::bind(vsock::EXEC_PORT) {
Ok(l) => l,
Err(e) => {
eprintln!("[fc-agent] ERROR: failed to bind exec server: {}", e);
Expand All @@ -32,12 +40,51 @@ pub async fn run_server(ready_tx: tokio::sync::oneshot::Sender<()>) {
let _ = ready_tx.send(());

loop {
match listener.accept().await {
Ok(client_fd) => {
tokio::spawn(handle_connection(client_fd));
tokio::select! {
result = listener.accept() => {
match result {
Ok(client_fd) => {
tokio::spawn(handle_connection(client_fd));
}
Err(e) => {
eprintln!("[fc-agent] exec server accept error: {}", e);
}
}
}
Err(e) => {
eprintln!("[fc-agent] exec server accept error: {}", e);
_ = rebind_signal.notified() => {
eprintln!("[fc-agent] exec server: vsock transport reset, re-registering listener");
// Re-register the AsyncFd (epoll) without closing the socket.
// Drop+rebind fails when accepted connections keep the port bound.
match listener.re_register() {
Ok(l) => {
listener = l;
eprintln!("[fc-agent] exec server: re-registered on vsock port {}", vsock::EXEC_PORT);
}
Err(e) => {
// re_register consumed the listener; socket is closed.
eprintln!("[fc-agent] exec server: re-register failed: {}, trying full rebind", e);
let mut retries = 0;
loop {
match vsock::VsockListener::bind(vsock::EXEC_PORT) {
Ok(l) => {
listener = l;
eprintln!("[fc-agent] exec server: re-bound to vsock port {}", vsock::EXEC_PORT);
break;
}
Err(e2) => {
retries += 1;
if retries >= 50 {
eprintln!("[fc-agent] exec server: re-bind failed after {} retries: {}", retries, e2);
eprintln!("[fc-agent] exec server: giving up, exec will be unavailable");
return;
}
eprintln!("[fc-agent] exec server: re-bind failed: {}, retrying ({}/50)", e2, retries);
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
}
}
}
}
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions fc-agent/src/mmds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ async fn fetch_latest_metadata(client: &reqwest::Client) -> Result<LatestMetadat
pub async fn watch_restore_epoch(
output: OutputHandle,
restore_flag: std::sync::Arc<std::sync::atomic::AtomicBool>,
exec_rebind: std::sync::Arc<tokio::sync::Notify>,
) {
let mut last_epoch: Option<String> = None;

Expand Down Expand Up @@ -165,13 +166,13 @@ pub async fn watch_restore_epoch(
// Must be set BEFORE handle_clone_restore so the poll loop
// exits before output reconnect changes vsock state.
restore_flag.store(true, std::sync::atomic::Ordering::Release);
crate::restore::handle_clone_restore(&output).await;
crate::restore::handle_clone_restore(&output, &exec_rebind).await;
last_epoch = metadata.restore_epoch;
}
Some(prev) if prev != current => {
eprintln!("[fc-agent] restore-epoch changed: {} -> {}", prev, current,);
restore_flag.store(true, std::sync::atomic::Ordering::Release);
crate::restore::handle_clone_restore(&output).await;
crate::restore::handle_clone_restore(&output, &exec_rebind).await;
last_epoch = metadata.restore_epoch;
}
_ => {}
Expand Down
Loading
Loading