From 2b6aa60c9ea3e703657141e91e5bff71faf287a8 Mon Sep 17 00:00:00 2001 From: EJ Campbell Date: Thu, 19 Feb 2026 03:25:50 -0800 Subject: [PATCH 1/2] fix: output pipeline drops data after snapshot restore MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three bugs caused container stdout to not reach the host after snapshot restore: 1. put_mmds before VM resume: Firecracker accepts PUT /mmds while the VM is paused but the guest-visible data isn't updated. fc-agent never sees the new restore-epoch so it never reconnects. Fix: move put_mmds after resume. 2. Unconditional notify_one: cmd_snapshot_run fired output_reconnect.notify_one() for all snapshot types. For pre-start snapshots (container not yet running), the listener has no dead connection to drop. The stored Notify permit poisons the first real connection by immediately triggering the reconnect branch in select!, dropping a valid connection. Fix: only notify for startup snapshots. 3. Listener stuck on stale connection: fc-agent reconnects multiple times during snapshot create/restore cycles. Each reconnect creates a new vsock connection queued in the listener's accept backlog. The listener accepted the FIRST connection and blocked on read_line, while fc-agent wrote to the LATEST connection. Fix: race read_line against listener.accept() in tokio::select!, always switching to the newest connection. Also adds output verification to test_heavy_output_after_snapshot_restore to catch this class of bug — the test previously only checked health and exec, not that container output actually reached the host. --- src/commands/common.rs | 44 ++++---- src/commands/podman/listeners.rs | 165 ++++++++++++++++++----------- src/commands/snapshot.rs | 13 ++- tests/test_output_after_restore.rs | 62 +++++++++-- 4 files changed, 186 insertions(+), 98 deletions(-) diff --git a/src/commands/common.rs b/src/commands/common.rs index 4cc40511..929169d9 100644 --- a/src/commands/common.rs +++ b/src/commands/common.rs @@ -816,29 +816,7 @@ pub async fn restore_from_snapshot( "disk patch completed" ); - // Signal fc-agent to flush ARP cache via MMDS restore-epoch update - let restore_epoch = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .context("system time before Unix epoch")? - .as_secs(); - - client - .put_mmds(serde_json::json!({ - "latest": { - "host-time": chrono::Utc::now().timestamp().to_string(), - "restore-epoch": restore_epoch.to_string() - } - })) - .await - .context("updating MMDS with restore-epoch")?; - info!( - restore_epoch = restore_epoch, - "signaled fc-agent to flush ARP via MMDS" - ); - // FCVM_KVM_TRACE: enable KVM ftrace around VM resume for debugging snapshot restore. - // Captures KVM exit reasons (NPF, shutdown, etc.) to /tmp/fcvm-kvm-trace-{vm_id}.log. - // Requires: sudo access (ftrace needs debugfs). Safe to set without sudo — just skips. let kvm_trace = if std::env::var("FCVM_KVM_TRACE").is_ok() { match crate::kvm_trace::KvmTrace::start(&vm_state.vm_id) { Ok(t) => { @@ -869,6 +847,28 @@ pub async fn restore_from_snapshot( "VM resume completed" ); + // Signal fc-agent to flush ARP cache and reconnect output vsock via MMDS. + // MUST be after VM resume — Firecracker accepts PUT /mmds while paused but + // the guest-visible MMDS data isn't updated until after resume. + let restore_epoch = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .context("system time before Unix epoch")? + .as_secs(); + + client + .put_mmds(serde_json::json!({ + "latest": { + "host-time": chrono::Utc::now().timestamp().to_string(), + "restore-epoch": restore_epoch.to_string() + } + })) + .await + .context("updating MMDS with restore-epoch")?; + info!( + restore_epoch = restore_epoch, + "signaled fc-agent to flush ARP via MMDS" + ); + // Stop KVM trace and dump results (captures resume + early VM execution) if let Some(trace) = kvm_trace { // Brief delay to capture initial KVM exits after resume diff --git a/src/commands/podman/listeners.rs b/src/commands/podman/listeners.rs index 53ba4aac..cee62aa8 100644 --- a/src/commands/podman/listeners.rs +++ b/src/commands/podman/listeners.rs @@ -186,78 +186,117 @@ pub(crate) async fn run_output_listener( info!(socket = %socket_path, "Output listener started"); let mut output_lines: Vec<(String, String)> = Vec::new(); - - // Outer loop: accept connections repeatedly. - // Firecracker resets all vsock connections during snapshot creation, so fc-agent - // will reconnect after each snapshot. We must keep accepting new connections. + let mut connection_count: u64 = 0; + let mut lines_read = 0u64; + + // Accept the first connection (no timeout — image import can take 10+ min) + let (initial_stream, _) = match listener.accept().await { + Ok(conn) => conn, + Err(e) => { + warn!(vm_id = %vm_id, error = %e, "Error accepting initial output connection"); + let _ = std::fs::remove_file(socket_path); + return Ok(output_lines); + } + }; + connection_count += 1; + debug!(vm_id = %vm_id, connection_count, "Output connection established"); + + let mut reader = BufReader::new(initial_stream); + let mut line_buf = String::new(); + + // Read lines from the current connection. + // fc-agent may reconnect multiple times (snapshot create/restore cycles). + // Always prefer the newest connection — if a new connection arrives while + // reading from the current one, switch to it. The latest connection is + // always the right one because it's from fc-agent's latest vsock connect. loop { - // Accept connection from fc-agent (no timeout - image import can take 10+ min) - let (stream, _) = match listener.accept().await { - Ok(conn) => conn, - Err(e) => { - warn!(vm_id = %vm_id, error = %e, "Error accepting output connection"); - break; - } - }; - - debug!(vm_id = %vm_id, "Output connection established"); - - let mut reader = BufReader::new(stream); - let mut line_buf = String::new(); - - // Read lines until connection closes or snapshot triggers reconnect. - // During snapshot, Firecracker resets vsock but the host-side Unix socket - // stays open (no EOF). The reconnect_notify signals us to drop this - // connection so fc-agent's new vsock connection can be accepted. - loop { - line_buf.clear(); - let read_result = tokio::select! { - result = reader.read_line(&mut line_buf) => result, - _ = reconnect_notify.notified() => { - info!(vm_id = %vm_id, "Snapshot reconnect signal, dropping old connection"); - break; - } - }; - match read_result { - Ok(0) => { - // EOF - connection closed (vsock reset from snapshot, or VM exit) - info!(vm_id = %vm_id, "Output connection closed, waiting for reconnect"); - break; - } - Ok(_) => { - // Parse raw line format: stream:content - let line = line_buf.trim_end(); - if let Some((stream, content)) = line.split_once(':') { - if stream == "heartbeat" { - // Heartbeat from fc-agent during long operations (image import/pull) - info!(vm_id = %vm_id, phase = %content, "VM heartbeat"); - } else { - // Print container output directly (stdout to stdout, stderr to stderr) - // No prefix - clean output for scripting - if stream == "stdout" { - println!("{}", content); - } else { - eprintln!("{}", content); + line_buf.clear(); + + // Race: read data vs new connection vs reconnect signal + tokio::select! { + result = reader.read_line(&mut line_buf) => { + match result { + Ok(0) => { + // EOF — connection closed. Wait for next. + info!(vm_id = %vm_id, lines_read, "Output connection EOF"); + match listener.accept().await { + Ok((s, _)) => { + connection_count += 1; + lines_read = 0; + debug!(vm_id = %vm_id, connection_count, "Output connection established (after EOF)"); + reader = BufReader::new(s); + continue; + } + Err(e) => { + warn!(vm_id = %vm_id, error = %e, "Accept failed after EOF"); + break; } - output_lines.push((stream.to_string(), content.to_string())); } - - // Forward to broadcast channel for library consumers - if let Some(ref tx) = log_tx { - let _ = tx.send(LogLine { - stream: stream.to_string(), - content: content.to_string(), - }); + } + Ok(n) => { + lines_read += 1; + if lines_read <= 3 || lines_read % 1000 == 0 { + debug!(vm_id = %vm_id, lines_read, bytes = n, "Output line received"); + } + let line = line_buf.trim_end(); + if let Some((stream, content)) = line.split_once(':') { + if stream == "heartbeat" { + info!(vm_id = %vm_id, phase = %content, "VM heartbeat"); + } else { + if stream == "stdout" { + println!("{}", content); + } else { + eprintln!("{}", content); + } + output_lines.push((stream.to_string(), content.to_string())); + } + if let Some(ref tx) = log_tx { + let _ = tx.send(LogLine { + stream: stream.to_string(), + content: content.to_string(), + }); + } } } + Err(e) => { + warn!(vm_id = %vm_id, error = %e, "Read error on output connection"); + break; + } } - Err(e) => { - warn!(vm_id = %vm_id, error = %e, "Error reading output, waiting for reconnect"); - break; + } + accept = listener.accept() => { + // New connection arrived while reading — switch to it. + // The latest connection is always from fc-agent's latest reconnect. + match accept { + Ok((new_stream, _)) => { + connection_count += 1; + info!(vm_id = %vm_id, connection_count, lines_read, "Switching to newer output connection"); + reader = BufReader::new(new_stream); + lines_read = 0; + } + Err(e) => { + warn!(vm_id = %vm_id, error = %e, "Accept failed for new connection"); + break; + } + } + } + _ = reconnect_notify.notified() => { + info!(vm_id = %vm_id, lines_read, "Reconnect signal, waiting for new connection"); + match listener.accept().await { + Ok((s, _)) => { + connection_count += 1; + lines_read = 0; + debug!(vm_id = %vm_id, connection_count, "Output connection established (after signal)"); + reader = BufReader::new(s); + } + Err(e) => { + warn!(vm_id = %vm_id, error = %e, "Accept failed after signal"); + break; + } } } } - } // outer accept loop + } // Clean up let _ = std::fs::remove_file(socket_path); diff --git a/src/commands/snapshot.rs b/src/commands/snapshot.rs index 1f4cdbc5..a664a135 100644 --- a/src/commands/snapshot.rs +++ b/src/commands/snapshot.rs @@ -896,11 +896,14 @@ pub async fn cmd_snapshot_run(args: SnapshotRunArgs) -> Result<()> { let (mut vm_manager, mut holder_child) = setup_result.unwrap(); - // Signal the output listener to drop its old (dead) vsock stream and re-accept. - // restore_from_snapshot() resumes the VM which resets all vsock connections. - // fc-agent will reconnect on the new vsock, but the host-side listener is stuck - // reading from the old dead stream unless we notify it to cycle back to accept(). - output_reconnect.notify_one(); + // For startup snapshots (container already running), the output listener has an + // active connection from fc-agent that's now dead after VM resume. Signal it to + // drop the dead stream and re-accept. For pre-start snapshots (container not yet + // started), the listener is fresh with no connection — DON'T notify, or the + // stored permit will poison the first real connection by dropping it immediately. + if args.startup_snapshot_base_key.is_some() { + output_reconnect.notify_one(); + } let is_uffd = use_uffd || std::env::var("FCVM_FORCE_UFFD").is_ok() || hugepages; if is_uffd { diff --git a/tests/test_output_after_restore.rs b/tests/test_output_after_restore.rs index 20f19061..8ad9856f 100644 --- a/tests/test_output_after_restore.rs +++ b/tests/test_output_after_restore.rs @@ -43,16 +43,22 @@ fn setup_fuse_mounts(n: usize) -> (Vec, Vec, std::path::PathBuf) } /// Build the fcvm args (identical for cold and warm — required for snapshot key match) -fn build_fcvm_args<'a>(vm_name: &'a str, map_args: &'a [String], cmd: &'a str) -> Vec<&'a str> { +fn build_fcvm_args<'a>( + vm_name: &'a str, + map_args: &'a [String], + cmd: &'a str, + user_spec: &'a str, +) -> Vec<&'a str> { let mut args: Vec<&str> = vec![ "podman", "run", "--name", vm_name, - "--network", - "bridged", "--kernel-profile", "btrfs", + "--user", + user_spec, + "--privileged", ]; for m in map_args { args.push("--map"); @@ -87,17 +93,16 @@ async fn test_heavy_output_after_snapshot_restore() -> Result<()> { .map(|p| format!("cat {}/*.txt >/dev/null 2>&1; ", p)) .collect(); let pad = "x".repeat(200); // long lines like falcon_proxy - // Phase 1: burst 5000 lines to both stdout and stderr (like service startup) - // Phase 2: continuous loop with FUSE reads + output - // Burst 5000 lines (like falcon_proxy startup dump), then continuous counter. - // The burst must drain — if conmon is broken after snapshot restore, this hangs. let cmd = format!( "b=0; while [ $b -lt 5000 ]; do echo \"BURST:$b {pad}\"; echo \"BURST_ERR:$b {pad}\" >&2; b=$((b+1)); done; i=0; while true; do {reads}echo \"COUNT:$i {pad}\"; echo \"ERR:$i {pad}\" >&2; i=$((i+1)); done", reads = reads, pad = pad, ); - let fcvm_args = build_fcvm_args(&vm_name, &map_args, &cmd); + let uid = std::env::var("SUDO_UID").unwrap_or_else(|_| nix::unistd::getuid().to_string()); + let gid = std::env::var("SUDO_GID").unwrap_or_else(|_| nix::unistd::getgid().to_string()); + let user_spec = format!("{}:{}", uid, gid); + let fcvm_args = build_fcvm_args(&vm_name, &map_args, &cmd, &user_spec); // Phase 1: Cold boot println!("Phase 1: Cold boot with 13 FUSE mounts..."); @@ -181,6 +186,47 @@ async fn test_heavy_output_after_snapshot_restore() -> Result<()> { "warm start did NOT use snapshot — test is invalid" ); + // Verify container output ACTUALLY reaches host on warm start. + // Without the output listener fix, the listener stays stuck on a stale vsock + // connection while fc-agent writes to a newer one. + let warm_log_name = format!("{}-warm", vm_name); + let mut found_container_output = false; + let mut output_line_count = 0; + if let Ok(entries) = std::fs::read_dir(log_dir) { + for entry in entries.flatten() { + let name = entry.file_name().to_string_lossy().to_string(); + if name.contains(&warm_log_name) { + let content = std::fs::read_to_string(entry.path()).unwrap_or_default(); + for line in content.lines() { + let is_container_output = (line.contains("COUNT:") || line.contains("BURST:")) + && !line.contains("args=") + && !line.contains("Spawned"); + if is_container_output { + output_line_count += 1; + if !found_container_output { + println!( + " First container output: {}", + line.trim().chars().take(100).collect::() + ); + found_container_output = true; + } + } + } + if found_container_output { + println!( + " Container output lines in warm log: {}", + output_line_count + ); + } + } + } + } + assert!( + found_container_output, + "No container output (COUNT:/BURST:) in warm start logs — \ + output pipeline broken after snapshot restore" + ); + // Cleanup println!(" Stopping VM..."); common::kill_process(fcvm_pid2).await; From 36b7643a538b07944380db782a1e912b1ea114f0 Mon Sep 17 00:00:00 2001 From: "claude[bot]" Date: Thu, 19 Feb 2026 12:41:53 +0000 Subject: [PATCH 2/2] fix: update DESIGN.md snapshot restore flow to reflect post-resume MMDS ordering MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move "Update MMDS with new config" from Identity Patching (step 3) to a new step 6 after Resume VM (step 5), matching the code change in restore_from_snapshot() where put_mmds was moved after patch_vm_state("Resumed"). 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- DESIGN.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/DESIGN.md b/DESIGN.md index 4ae9f0cb..488ab500 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -381,12 +381,12 @@ Each VM has: 3. Patch identity (MAC address, hostname, VM ID) 4. Setup new networking (TAP device, ports) 5. Resume VM +6. Update MMDS with new config (must be after resume — guest-visible MMDS data isn't updated until VM is running) **Identity Patching**: - Generate new MAC address - Update hostname in guest - Regenerate machine IDs -- Update MMDS with new config ---