diff --git a/DESIGN.md b/DESIGN.md index 4ae9f0cb..488ab500 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -381,12 +381,12 @@ Each VM has: 3. Patch identity (MAC address, hostname, VM ID) 4. Setup new networking (TAP device, ports) 5. Resume VM +6. Update MMDS with new config (must be after resume — guest-visible MMDS data isn't updated until VM is running) **Identity Patching**: - Generate new MAC address - Update hostname in guest - Regenerate machine IDs -- Update MMDS with new config --- diff --git a/src/commands/common.rs b/src/commands/common.rs index 4cc40511..929169d9 100644 --- a/src/commands/common.rs +++ b/src/commands/common.rs @@ -816,29 +816,7 @@ pub async fn restore_from_snapshot( "disk patch completed" ); - // Signal fc-agent to flush ARP cache via MMDS restore-epoch update - let restore_epoch = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .context("system time before Unix epoch")? - .as_secs(); - - client - .put_mmds(serde_json::json!({ - "latest": { - "host-time": chrono::Utc::now().timestamp().to_string(), - "restore-epoch": restore_epoch.to_string() - } - })) - .await - .context("updating MMDS with restore-epoch")?; - info!( - restore_epoch = restore_epoch, - "signaled fc-agent to flush ARP via MMDS" - ); - // FCVM_KVM_TRACE: enable KVM ftrace around VM resume for debugging snapshot restore. - // Captures KVM exit reasons (NPF, shutdown, etc.) to /tmp/fcvm-kvm-trace-{vm_id}.log. - // Requires: sudo access (ftrace needs debugfs). Safe to set without sudo — just skips. let kvm_trace = if std::env::var("FCVM_KVM_TRACE").is_ok() { match crate::kvm_trace::KvmTrace::start(&vm_state.vm_id) { Ok(t) => { @@ -869,6 +847,28 @@ pub async fn restore_from_snapshot( "VM resume completed" ); + // Signal fc-agent to flush ARP cache and reconnect output vsock via MMDS. + // MUST be after VM resume — Firecracker accepts PUT /mmds while paused but + // the guest-visible MMDS data isn't updated until after resume. 
+ let restore_epoch = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .context("system time before Unix epoch")? + .as_secs(); + + client + .put_mmds(serde_json::json!({ + "latest": { + "host-time": chrono::Utc::now().timestamp().to_string(), + "restore-epoch": restore_epoch.to_string() + } + })) + .await + .context("updating MMDS with restore-epoch")?; + info!( + restore_epoch = restore_epoch, + "signaled fc-agent to flush ARP via MMDS" + ); + // Stop KVM trace and dump results (captures resume + early VM execution) if let Some(trace) = kvm_trace { // Brief delay to capture initial KVM exits after resume diff --git a/src/commands/podman/listeners.rs b/src/commands/podman/listeners.rs index 53ba4aac..cee62aa8 100644 --- a/src/commands/podman/listeners.rs +++ b/src/commands/podman/listeners.rs @@ -186,78 +186,117 @@ pub(crate) async fn run_output_listener( info!(socket = %socket_path, "Output listener started"); let mut output_lines: Vec<(String, String)> = Vec::new(); - - // Outer loop: accept connections repeatedly. - // Firecracker resets all vsock connections during snapshot creation, so fc-agent - // will reconnect after each snapshot. We must keep accepting new connections. + let mut connection_count: u64 = 0; + let mut lines_read = 0u64; + + // Accept the first connection (no timeout — image import can take 10+ min) + let (initial_stream, _) = match listener.accept().await { + Ok(conn) => conn, + Err(e) => { + warn!(vm_id = %vm_id, error = %e, "Error accepting initial output connection"); + let _ = std::fs::remove_file(socket_path); + return Ok(output_lines); + } + }; + connection_count += 1; + debug!(vm_id = %vm_id, connection_count, "Output connection established"); + + let mut reader = BufReader::new(initial_stream); + let mut line_buf = String::new(); + + // Read lines from the current connection. + // fc-agent may reconnect multiple times (snapshot create/restore cycles). 
+ // Always prefer the newest connection — if a new connection arrives while + // reading from the current one, switch to it. The latest connection is + // always the right one because it's from fc-agent's latest vsock connect. loop { - // Accept connection from fc-agent (no timeout - image import can take 10+ min) - let (stream, _) = match listener.accept().await { - Ok(conn) => conn, - Err(e) => { - warn!(vm_id = %vm_id, error = %e, "Error accepting output connection"); - break; - } - }; - - debug!(vm_id = %vm_id, "Output connection established"); - - let mut reader = BufReader::new(stream); - let mut line_buf = String::new(); - - // Read lines until connection closes or snapshot triggers reconnect. - // During snapshot, Firecracker resets vsock but the host-side Unix socket - // stays open (no EOF). The reconnect_notify signals us to drop this - // connection so fc-agent's new vsock connection can be accepted. - loop { - line_buf.clear(); - let read_result = tokio::select! { - result = reader.read_line(&mut line_buf) => result, - _ = reconnect_notify.notified() => { - info!(vm_id = %vm_id, "Snapshot reconnect signal, dropping old connection"); - break; - } - }; - match read_result { - Ok(0) => { - // EOF - connection closed (vsock reset from snapshot, or VM exit) - info!(vm_id = %vm_id, "Output connection closed, waiting for reconnect"); - break; - } - Ok(_) => { - // Parse raw line format: stream:content - let line = line_buf.trim_end(); - if let Some((stream, content)) = line.split_once(':') { - if stream == "heartbeat" { - // Heartbeat from fc-agent during long operations (image import/pull) - info!(vm_id = %vm_id, phase = %content, "VM heartbeat"); - } else { - // Print container output directly (stdout to stdout, stderr to stderr) - // No prefix - clean output for scripting - if stream == "stdout" { - println!("{}", content); - } else { - eprintln!("{}", content); + line_buf.clear(); + + // Race: read data vs new connection vs reconnect signal + 
tokio::select! { + result = reader.read_line(&mut line_buf) => { + match result { + Ok(0) => { + // EOF — connection closed. Wait for next. + info!(vm_id = %vm_id, lines_read, "Output connection EOF"); + match listener.accept().await { + Ok((s, _)) => { + connection_count += 1; + lines_read = 0; + debug!(vm_id = %vm_id, connection_count, "Output connection established (after EOF)"); + reader = BufReader::new(s); + continue; + } + Err(e) => { + warn!(vm_id = %vm_id, error = %e, "Accept failed after EOF"); + break; } - output_lines.push((stream.to_string(), content.to_string())); } - - // Forward to broadcast channel for library consumers - if let Some(ref tx) = log_tx { - let _ = tx.send(LogLine { - stream: stream.to_string(), - content: content.to_string(), - }); + } + Ok(n) => { + lines_read += 1; + if lines_read <= 3 || lines_read % 1000 == 0 { + debug!(vm_id = %vm_id, lines_read, bytes = n, "Output line received"); + } + let line = line_buf.trim_end(); + if let Some((stream, content)) = line.split_once(':') { + if stream == "heartbeat" { + info!(vm_id = %vm_id, phase = %content, "VM heartbeat"); + } else { + if stream == "stdout" { + println!("{}", content); + } else { + eprintln!("{}", content); + } + output_lines.push((stream.to_string(), content.to_string())); + } + if let Some(ref tx) = log_tx { + let _ = tx.send(LogLine { + stream: stream.to_string(), + content: content.to_string(), + }); + } } } + Err(e) => { + warn!(vm_id = %vm_id, error = %e, "Read error on output connection"); + break; + } } - Err(e) => { - warn!(vm_id = %vm_id, error = %e, "Error reading output, waiting for reconnect"); - break; + } + accept = listener.accept() => { + // New connection arrived while reading — switch to it. + // The latest connection is always from fc-agent's latest reconnect. 
+ match accept { + Ok((new_stream, _)) => { + connection_count += 1; + info!(vm_id = %vm_id, connection_count, lines_read, "Switching to newer output connection"); + reader = BufReader::new(new_stream); + lines_read = 0; + } + Err(e) => { + warn!(vm_id = %vm_id, error = %e, "Accept failed for new connection"); + break; + } + } + } + _ = reconnect_notify.notified() => { + info!(vm_id = %vm_id, lines_read, "Reconnect signal, waiting for new connection"); + match listener.accept().await { + Ok((s, _)) => { + connection_count += 1; + lines_read = 0; + debug!(vm_id = %vm_id, connection_count, "Output connection established (after signal)"); + reader = BufReader::new(s); + } + Err(e) => { + warn!(vm_id = %vm_id, error = %e, "Accept failed after signal"); + break; + } } } } - } // outer accept loop + } // Clean up let _ = std::fs::remove_file(socket_path); diff --git a/src/commands/snapshot.rs b/src/commands/snapshot.rs index 1f4cdbc5..a664a135 100644 --- a/src/commands/snapshot.rs +++ b/src/commands/snapshot.rs @@ -896,11 +896,14 @@ pub async fn cmd_snapshot_run(args: SnapshotRunArgs) -> Result<()> { let (mut vm_manager, mut holder_child) = setup_result.unwrap(); - // Signal the output listener to drop its old (dead) vsock stream and re-accept. - // restore_from_snapshot() resumes the VM which resets all vsock connections. - // fc-agent will reconnect on the new vsock, but the host-side listener is stuck - // reading from the old dead stream unless we notify it to cycle back to accept(). - output_reconnect.notify_one(); + // For startup snapshots (container already running), the output listener has an + // active connection from fc-agent that's now dead after VM resume. Signal it to + // drop the dead stream and re-accept. For pre-start snapshots (container not yet + // started), the listener is fresh with no connection — DON'T notify, or the + // stored permit will poison the first real connection by dropping it immediately. 
+ if args.startup_snapshot_base_key.is_some() { + output_reconnect.notify_one(); + } let is_uffd = use_uffd || std::env::var("FCVM_FORCE_UFFD").is_ok() || hugepages; if is_uffd { diff --git a/tests/test_output_after_restore.rs b/tests/test_output_after_restore.rs index 20f19061..8ad9856f 100644 --- a/tests/test_output_after_restore.rs +++ b/tests/test_output_after_restore.rs @@ -43,16 +43,22 @@ fn setup_fuse_mounts(n: usize) -> (Vec, Vec, std::path::PathBuf) } /// Build the fcvm args (identical for cold and warm — required for snapshot key match) -fn build_fcvm_args<'a>(vm_name: &'a str, map_args: &'a [String], cmd: &'a str) -> Vec<&'a str> { +fn build_fcvm_args<'a>( + vm_name: &'a str, + map_args: &'a [String], + cmd: &'a str, + user_spec: &'a str, +) -> Vec<&'a str> { let mut args: Vec<&str> = vec![ "podman", "run", "--name", vm_name, - "--network", - "bridged", "--kernel-profile", "btrfs", + "--user", + user_spec, + "--privileged", ]; for m in map_args { args.push("--map"); @@ -87,17 +93,16 @@ async fn test_heavy_output_after_snapshot_restore() -> Result<()> { .map(|p| format!("cat {}/*.txt >/dev/null 2>&1; ", p)) .collect(); let pad = "x".repeat(200); // long lines like falcon_proxy - // Phase 1: burst 5000 lines to both stdout and stderr (like service startup) - // Phase 2: continuous loop with FUSE reads + output - // Burst 5000 lines (like falcon_proxy startup dump), then continuous counter. - // The burst must drain — if conmon is broken after snapshot restore, this hangs. 
let cmd = format!( "b=0; while [ $b -lt 5000 ]; do echo \"BURST:$b {pad}\"; echo \"BURST_ERR:$b {pad}\" >&2; b=$((b+1)); done; i=0; while true; do {reads}echo \"COUNT:$i {pad}\"; echo \"ERR:$i {pad}\" >&2; i=$((i+1)); done", reads = reads, pad = pad, ); - let fcvm_args = build_fcvm_args(&vm_name, &map_args, &cmd); + let uid = std::env::var("SUDO_UID").unwrap_or_else(|_| nix::unistd::getuid().to_string()); + let gid = std::env::var("SUDO_GID").unwrap_or_else(|_| nix::unistd::getgid().to_string()); + let user_spec = format!("{}:{}", uid, gid); + let fcvm_args = build_fcvm_args(&vm_name, &map_args, &cmd, &user_spec); // Phase 1: Cold boot println!("Phase 1: Cold boot with 13 FUSE mounts..."); @@ -181,6 +186,47 @@ async fn test_heavy_output_after_snapshot_restore() -> Result<()> { "warm start did NOT use snapshot — test is invalid" ); + // Verify container output ACTUALLY reaches host on warm start. + // Without the output listener fix, the listener stays stuck on a stale vsock + // connection while fc-agent writes to a newer one. 
+ let warm_log_name = format!("{}-warm", vm_name); + let mut found_container_output = false; + let mut output_line_count = 0; + if let Ok(entries) = std::fs::read_dir(log_dir) { + for entry in entries.flatten() { + let name = entry.file_name().to_string_lossy().to_string(); + if name.contains(&warm_log_name) { + let content = std::fs::read_to_string(entry.path()).unwrap_or_default(); + for line in content.lines() { + let is_container_output = (line.contains("COUNT:") || line.contains("BURST:")) + && !line.contains("args=") + && !line.contains("Spawned"); + if is_container_output { + output_line_count += 1; + if !found_container_output { + println!( + " First container output: {}", + line.trim().chars().take(100).collect::<String>() + ); + found_container_output = true; + } + } + } + if found_container_output { + println!( + " Container output lines in warm log: {}", + output_line_count + ); + } + } + } + } + assert!( + found_container_output, + "No container output (COUNT:/BURST:) in warm start logs — \ + output pipeline broken after snapshot restore" + ); + // Cleanup println!(" Stopping VM..."); common::kill_process(fcvm_pid2).await;