Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/state/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,12 @@ impl StateManager {
.open(&lock_file)
.context("opening lock file")?;

// Acquire exclusive lock (blocks if another process has lock)
// Acquire exclusive lock (blocks if another process has lock).
// NOTE: Flock::lock() is technically blocking I/O in an async context, but
// the lock is held for microseconds with near-zero contention (only this
// process writes its own state file). Using spawn_blocking would add more
// overhead than the lock itself. If contention becomes an issue, switch to
// FlockArg::LockExclusiveNonblock with retry + tokio::task::yield_now().
use nix::fcntl::{Flock, FlockArg};
let flock = Flock::lock(lock_fd, FlockArg::LockExclusive)
.map_err(|(_, err)| err)
Expand Down
100 changes: 55 additions & 45 deletions src/uffd/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::fs::File;
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use tokio::io::unix::AsyncFd;
use tokio::net::UnixListener;
use tokio::task::JoinSet;
Expand Down Expand Up @@ -109,47 +110,41 @@ impl UffdServer {
Ok((stream, _)) => {
let vm_id = format!("vm-{}", next_vm_id);
next_vm_id += 1;
let mmap = Arc::clone(&self.mmap);

info!(target: "uffd", vm_id = %vm_id, "new VM connection");

// Convert tokio UnixStream to std UnixStream for SCM_RIGHTS
// IMPORTANT: tokio sockets are non-blocking, but recv_with_fd needs
// blocking mode to wait for Firecracker to send the UFFD fd.
// Without this, recvmsg returns EAGAIN immediately if data isn't ready.
let mut std_stream = stream.into_std()
.context("converting to std stream")?;
std_stream.set_nonblocking(false)
.context("setting socket to blocking mode")?;

// Receive UFFD and mappings for this VM
match receive_uffd_and_mappings(&mut std_stream) {
Ok((uffd, mappings)) => {
info!(
target: "uffd",
vm_id = %vm_id,
regions = mappings.len(),
"received UFFD with {} memory regions",
mappings.len()
);

// Spawn task to handle this VM's page faults
let mmap = Arc::clone(&self.mmap);
let vm_id_clone = vm_id.clone();
vm_tasks.spawn(async move {
match handle_vm_page_faults(vm_id_clone.clone(), uffd, mappings, mmap).await {
Ok(()) => info!(target: "uffd", vm_id = %vm_id_clone, "VM handler exited cleanly"),
Err(e) => error!(target: "uffd", vm_id = %vm_id_clone, error = ?e, "VM handler error"),
// Spawn per-connection task so the accept loop returns
// immediately — no blocking on slow/misbehaving clones.
vm_tasks.spawn(async move {
match tokio::time::timeout(
Duration::from_secs(30),
receive_uffd_async(stream),
).await {
Ok(Ok((uffd, mappings))) => {
info!(
target: "uffd",
vm_id = %vm_id,
regions = mappings.len(),
"received UFFD with {} memory regions",
mappings.len()
);
match handle_vm_page_faults(vm_id.clone(), uffd, mappings, mmap).await {
Ok(()) => info!(target: "uffd", vm_id = %vm_id, "VM handler exited cleanly"),
Err(e) => error!(target: "uffd", vm_id = %vm_id, error = ?e, "VM handler error"),
}
vm_id_clone
});

info!(target: "uffd", active_vms = vm_tasks.len(), "VM connected");
}
Err(e) => {
// Log full error chain for debugging (includes syscall errors)
error!(target: "uffd", vm_id = %vm_id, error = ?e, "failed to receive UFFD");
}
Ok(Err(e)) => {
error!(target: "uffd", vm_id = %vm_id, error = ?e, "handshake failed");
}
Err(_) => {
error!(target: "uffd", vm_id = %vm_id, "handshake timed out after 30s");
}
}
}
vm_id
});

info!(target: "uffd", active_vms = vm_tasks.len(), "VM connection spawned");
}
Err(e) => {
error!(target: "uffd", error = %e, "failed to accept connection");
Expand Down Expand Up @@ -440,18 +435,33 @@ impl GuestRegionUffdMapping {
}
}

/// Receive UFFD descriptor and memory mappings from Firecracker over Unix socket
fn receive_uffd_and_mappings(
stream: &mut std::os::unix::net::UnixStream,
/// Receive UFFD descriptor and memory mappings asynchronously using AsyncFd.
///
/// Uses the same AsyncFd pattern as `handle_vm_page_faults` — waits for readability
/// on the Unix socket, then calls `recv_with_fd` which succeeds immediately since
/// data is ready. This avoids blocking the tokio runtime.
async fn receive_uffd_async(
stream: tokio::net::UnixStream,
) -> Result<(Uffd, Vec<GuestRegionUffdMapping>)> {
// Receive message with UFFD file descriptor from Firecracker
let std_stream = stream.into_std().context("converting to std stream")?;
// Keep non-blocking — AsyncFd handles readiness
let async_stream = AsyncFd::new(std_stream).context("creating AsyncFd for handshake socket")?;
Comment on lines +446 to +448
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Ensure handshake socket stays nonblocking before AsyncFd

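If tokio::net::UnixStream::into_std() were ever to hand back a blocking socket, AsyncFd::new(std_stream) would wrap a blocking FD, and recv_with_fd could block the runtime thread whenever Firecracker connects but delays sending the UFFD message. In that case the 30s timeout would not fire, because the task is stuck inside a blocking syscall rather than yielding at an .await point, and enough stalled connections could exhaust Tokio's worker threads. Note that Tokio documents into_std() as returning a stream that still has nonblocking mode set, so this is a defensive concern rather than a confirmed bug. Even so, consider explicitly calling set_nonblocking(true) on the std stream (or otherwise guaranteeing the fd is nonblocking) before wrapping it in AsyncFd, so the handshake does not depend on that implicit behavior.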

Useful? React with 👍 / 👎.

let mut message_buf = vec![0u8; 4096];
let (bytes_read, uffd_fd_opt) = stream
.recv_with_fd(&mut message_buf)
.context("receiving UFFD from Firecracker")?;

let uffd_file =
uffd_fd_opt.ok_or_else(|| anyhow::anyhow!("no UFFD file descriptor received"))?;
// Wait for data to arrive, then recv with fd passing
let (bytes_read, uffd_fd_opt) = loop {
let mut guard = async_stream.readable().await?;
match guard.get_inner().recv_with_fd(&mut message_buf) {
Ok(result) => break result,
Err(e) if e.errno() == libc::EWOULDBLOCK || e.errno() == libc::EAGAIN => {
guard.clear_ready();
continue;
}
Err(e) => return Err(e).context("receiving UFFD from Firecracker"),
}
};

let uffd_file = uffd_fd_opt.ok_or_else(|| anyhow!("no UFFD file descriptor received"))?;

message_buf.resize(bytes_read, 0);

Expand Down