Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion DESIGN.md
Original file line number Diff line number Diff line change
Expand Up @@ -381,12 +381,12 @@ Each VM has:
3. Patch identity (MAC address, hostname, VM ID)
4. Setup new networking (TAP device, ports)
5. Resume VM
6. Update MMDS with new config (must happen after resume — Firecracker accepts the MMDS update while the VM is paused, but the guest-visible MMDS data isn't updated until the VM is running)

**Identity Patching**:
- Generate new MAC address
- Update hostname in guest
- Regenerate machine IDs
- Update MMDS with new config

---

Expand Down
44 changes: 22 additions & 22 deletions src/commands/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -816,29 +816,7 @@ pub async fn restore_from_snapshot(
"disk patch completed"
);

// Signal fc-agent to flush ARP cache via MMDS restore-epoch update
let restore_epoch = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.context("system time before Unix epoch")?
.as_secs();

client
.put_mmds(serde_json::json!({
"latest": {
"host-time": chrono::Utc::now().timestamp().to_string(),
"restore-epoch": restore_epoch.to_string()
}
}))
.await
.context("updating MMDS with restore-epoch")?;
info!(
restore_epoch = restore_epoch,
"signaled fc-agent to flush ARP via MMDS"
);

// FCVM_KVM_TRACE: enable KVM ftrace around VM resume for debugging snapshot restore.
// Captures KVM exit reasons (NPF, shutdown, etc.) to /tmp/fcvm-kvm-trace-{vm_id}.log.
// Requires: sudo access (ftrace needs debugfs). Safe to set without sudo — just skips.
let kvm_trace = if std::env::var("FCVM_KVM_TRACE").is_ok() {
match crate::kvm_trace::KvmTrace::start(&vm_state.vm_id) {
Ok(t) => {
Expand Down Expand Up @@ -869,6 +847,28 @@ pub async fn restore_from_snapshot(
"VM resume completed"
);

// Signal fc-agent to flush ARP cache and reconnect output vsock via MMDS.
// MUST be after VM resume — Firecracker accepts PUT /mmds while paused but
// the guest-visible MMDS data isn't updated until after resume.
let restore_epoch = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.context("system time before Unix epoch")?
.as_secs();

client
.put_mmds(serde_json::json!({
"latest": {
"host-time": chrono::Utc::now().timestamp().to_string(),
"restore-epoch": restore_epoch.to_string()
}
}))
.await
.context("updating MMDS with restore-epoch")?;
info!(
restore_epoch = restore_epoch,
"signaled fc-agent to flush ARP via MMDS"
);

// Stop KVM trace and dump results (captures resume + early VM execution)
if let Some(trace) = kvm_trace {
// Brief delay to capture initial KVM exits after resume
Expand Down
165 changes: 102 additions & 63 deletions src/commands/podman/listeners.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,78 +186,117 @@ pub(crate) async fn run_output_listener(
info!(socket = %socket_path, "Output listener started");

let mut output_lines: Vec<(String, String)> = Vec::new();

// Outer loop: accept connections repeatedly.
// Firecracker resets all vsock connections during snapshot creation, so fc-agent
// will reconnect after each snapshot. We must keep accepting new connections.
let mut connection_count: u64 = 0;
let mut lines_read = 0u64;

// Accept the first connection (no timeout — image import can take 10+ min)
let (initial_stream, _) = match listener.accept().await {
Ok(conn) => conn,
Err(e) => {
warn!(vm_id = %vm_id, error = %e, "Error accepting initial output connection");
let _ = std::fs::remove_file(socket_path);
return Ok(output_lines);
}
};
connection_count += 1;
debug!(vm_id = %vm_id, connection_count, "Output connection established");

let mut reader = BufReader::new(initial_stream);
let mut line_buf = String::new();

// Read lines from the current connection.
// fc-agent may reconnect multiple times (snapshot create/restore cycles).
// Always prefer the newest connection — if a new connection arrives while
// reading from the current one, switch to it. The latest connection is
// always the right one because it's from fc-agent's latest vsock connect.
loop {
// Accept connection from fc-agent (no timeout - image import can take 10+ min)
let (stream, _) = match listener.accept().await {
Ok(conn) => conn,
Err(e) => {
warn!(vm_id = %vm_id, error = %e, "Error accepting output connection");
break;
}
};

debug!(vm_id = %vm_id, "Output connection established");

let mut reader = BufReader::new(stream);
let mut line_buf = String::new();

// Read lines until connection closes or snapshot triggers reconnect.
// During snapshot, Firecracker resets vsock but the host-side Unix socket
// stays open (no EOF). The reconnect_notify signals us to drop this
// connection so fc-agent's new vsock connection can be accepted.
loop {
line_buf.clear();
let read_result = tokio::select! {
result = reader.read_line(&mut line_buf) => result,
_ = reconnect_notify.notified() => {
info!(vm_id = %vm_id, "Snapshot reconnect signal, dropping old connection");
break;
}
};
match read_result {
Ok(0) => {
// EOF - connection closed (vsock reset from snapshot, or VM exit)
info!(vm_id = %vm_id, "Output connection closed, waiting for reconnect");
break;
}
Ok(_) => {
// Parse raw line format: stream:content
let line = line_buf.trim_end();
if let Some((stream, content)) = line.split_once(':') {
if stream == "heartbeat" {
// Heartbeat from fc-agent during long operations (image import/pull)
info!(vm_id = %vm_id, phase = %content, "VM heartbeat");
} else {
// Print container output directly (stdout to stdout, stderr to stderr)
// No prefix - clean output for scripting
if stream == "stdout" {
println!("{}", content);
} else {
eprintln!("{}", content);
line_buf.clear();

// Race: read data vs new connection vs reconnect signal
tokio::select! {
result = reader.read_line(&mut line_buf) => {
match result {
Ok(0) => {
// EOF — connection closed. Wait for next.
info!(vm_id = %vm_id, lines_read, "Output connection EOF");
match listener.accept().await {
Ok((s, _)) => {
connection_count += 1;
lines_read = 0;
debug!(vm_id = %vm_id, connection_count, "Output connection established (after EOF)");
reader = BufReader::new(s);
continue;
}
Err(e) => {
warn!(vm_id = %vm_id, error = %e, "Accept failed after EOF");
break;
}
output_lines.push((stream.to_string(), content.to_string()));
}

// Forward to broadcast channel for library consumers
if let Some(ref tx) = log_tx {
let _ = tx.send(LogLine {
stream: stream.to_string(),
content: content.to_string(),
});
}
Ok(n) => {
lines_read += 1;
if lines_read <= 3 || lines_read % 1000 == 0 {
debug!(vm_id = %vm_id, lines_read, bytes = n, "Output line received");
}
let line = line_buf.trim_end();
if let Some((stream, content)) = line.split_once(':') {
if stream == "heartbeat" {
info!(vm_id = %vm_id, phase = %content, "VM heartbeat");
} else {
if stream == "stdout" {
println!("{}", content);
} else {
eprintln!("{}", content);
}
output_lines.push((stream.to_string(), content.to_string()));
}
if let Some(ref tx) = log_tx {
let _ = tx.send(LogLine {
stream: stream.to_string(),
content: content.to_string(),
});
}
}
}
Err(e) => {
warn!(vm_id = %vm_id, error = %e, "Read error on output connection");
break;
}
}
Err(e) => {
warn!(vm_id = %vm_id, error = %e, "Error reading output, waiting for reconnect");
break;
}
accept = listener.accept() => {
// New connection arrived while reading — switch to it.
// The latest connection is always from fc-agent's latest reconnect.
match accept {
Ok((new_stream, _)) => {
connection_count += 1;
info!(vm_id = %vm_id, connection_count, lines_read, "Switching to newer output connection");
reader = BufReader::new(new_stream);
lines_read = 0;
}
Err(e) => {
warn!(vm_id = %vm_id, error = %e, "Accept failed for new connection");
break;
}
}
}
_ = reconnect_notify.notified() => {
info!(vm_id = %vm_id, lines_read, "Reconnect signal, waiting for new connection");
match listener.accept().await {
Ok((s, _)) => {
connection_count += 1;
lines_read = 0;
debug!(vm_id = %vm_id, connection_count, "Output connection established (after signal)");
reader = BufReader::new(s);
}
Err(e) => {
warn!(vm_id = %vm_id, error = %e, "Accept failed after signal");
break;
}
}
}
}
} // outer accept loop
}

// Clean up
let _ = std::fs::remove_file(socket_path);
Expand Down
13 changes: 8 additions & 5 deletions src/commands/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -896,11 +896,14 @@ pub async fn cmd_snapshot_run(args: SnapshotRunArgs) -> Result<()> {

let (mut vm_manager, mut holder_child) = setup_result.unwrap();

// Signal the output listener to drop its old (dead) vsock stream and re-accept.
// restore_from_snapshot() resumes the VM which resets all vsock connections.
// fc-agent will reconnect on the new vsock, but the host-side listener is stuck
// reading from the old dead stream unless we notify it to cycle back to accept().
output_reconnect.notify_one();
// For startup snapshots (container already running), the output listener has an
// active connection from fc-agent that's now dead after VM resume. Signal it to
// drop the dead stream and re-accept. For pre-start snapshots (container not yet
// started), the listener is fresh with no connection — DON'T notify, or the
// stored permit will poison the first real connection by dropping it immediately.
if args.startup_snapshot_base_key.is_some() {
output_reconnect.notify_one();
}

let is_uffd = use_uffd || std::env::var("FCVM_FORCE_UFFD").is_ok() || hugepages;
if is_uffd {
Expand Down
62 changes: 54 additions & 8 deletions tests/test_output_after_restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,22 @@ fn setup_fuse_mounts(n: usize) -> (Vec<String>, Vec<String>, std::path::PathBuf)
}

/// Build the fcvm args (identical for cold and warm — required for snapshot key match)
fn build_fcvm_args<'a>(vm_name: &'a str, map_args: &'a [String], cmd: &'a str) -> Vec<&'a str> {
fn build_fcvm_args<'a>(
vm_name: &'a str,
map_args: &'a [String],
cmd: &'a str,
user_spec: &'a str,
) -> Vec<&'a str> {
let mut args: Vec<&str> = vec![
"podman",
"run",
"--name",
vm_name,
"--network",
"bridged",
"--kernel-profile",
"btrfs",
"--user",
user_spec,
"--privileged",
];
for m in map_args {
args.push("--map");
Expand Down Expand Up @@ -87,17 +93,16 @@ async fn test_heavy_output_after_snapshot_restore() -> Result<()> {
.map(|p| format!("cat {}/*.txt >/dev/null 2>&1; ", p))
.collect();
let pad = "x".repeat(200); // long lines like falcon_proxy
// Phase 1: burst 5000 lines to both stdout and stderr (like service startup)
// Phase 2: continuous loop with FUSE reads + output
// Burst 5000 lines (like falcon_proxy startup dump), then continuous counter.
// The burst must drain — if conmon is broken after snapshot restore, this hangs.
let cmd = format!(
"b=0; while [ $b -lt 5000 ]; do echo \"BURST:$b {pad}\"; echo \"BURST_ERR:$b {pad}\" >&2; b=$((b+1)); done; i=0; while true; do {reads}echo \"COUNT:$i {pad}\"; echo \"ERR:$i {pad}\" >&2; i=$((i+1)); done",
reads = reads,
pad = pad,
);

let fcvm_args = build_fcvm_args(&vm_name, &map_args, &cmd);
let uid = std::env::var("SUDO_UID").unwrap_or_else(|_| nix::unistd::getuid().to_string());
let gid = std::env::var("SUDO_GID").unwrap_or_else(|_| nix::unistd::getgid().to_string());
let user_spec = format!("{}:{}", uid, gid);
let fcvm_args = build_fcvm_args(&vm_name, &map_args, &cmd, &user_spec);

// Phase 1: Cold boot
println!("Phase 1: Cold boot with 13 FUSE mounts...");
Expand Down Expand Up @@ -181,6 +186,47 @@ async fn test_heavy_output_after_snapshot_restore() -> Result<()> {
"warm start did NOT use snapshot — test is invalid"
);

// Verify container output ACTUALLY reaches host on warm start.
// Without the output listener fix, the listener stays stuck on a stale vsock
// connection while fc-agent writes to a newer one.
let warm_log_name = format!("{}-warm", vm_name);
let mut found_container_output = false;
let mut output_line_count = 0;
if let Ok(entries) = std::fs::read_dir(log_dir) {
for entry in entries.flatten() {
let name = entry.file_name().to_string_lossy().to_string();
if name.contains(&warm_log_name) {
let content = std::fs::read_to_string(entry.path()).unwrap_or_default();
for line in content.lines() {
let is_container_output = (line.contains("COUNT:") || line.contains("BURST:"))
&& !line.contains("args=")
&& !line.contains("Spawned");
if is_container_output {
output_line_count += 1;
if !found_container_output {
println!(
" First container output: {}",
line.trim().chars().take(100).collect::<String>()
);
found_container_output = true;
}
}
}
if found_container_output {
println!(
" Container output lines in warm log: {}",
output_line_count
);
}
}
}
}
assert!(
found_container_output,
"No container output (COUNT:/BURST:) in warm start logs — \
output pipeline broken after snapshot restore"
);

// Cleanup
println!(" Stopping VM...");
common::kill_process(fcvm_pid2).await;
Expand Down
Loading