diff --git a/src/state/manager.rs b/src/state/manager.rs index 80efe296..eecefbf5 100644 --- a/src/state/manager.rs +++ b/src/state/manager.rs @@ -72,7 +72,12 @@ impl StateManager { .open(&lock_file) .context("opening lock file")?; - // Acquire exclusive lock (blocks if another process has lock) + // Acquire exclusive lock (blocks if another process has lock). + // NOTE: Flock::lock() is technically blocking I/O in an async context, but + // the lock is held for microseconds with near-zero contention (only this + // process writes its own state file). Using spawn_blocking would add more + // overhead than the lock itself. If contention becomes an issue, switch to + // FlockArg::LockExclusiveNonblock with retry + tokio::task::yield_now(). use nix::fcntl::{Flock, FlockArg}; let flock = Flock::lock(lock_fd, FlockArg::LockExclusive) .map_err(|(_, err)| err) diff --git a/src/uffd/server.rs b/src/uffd/server.rs index f497a445..157bb621 100644 --- a/src/uffd/server.rs +++ b/src/uffd/server.rs @@ -3,6 +3,7 @@ use std::fs::File; use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd}; use std::path::{Path, PathBuf}; use std::sync::Arc; +use std::time::Duration; use tokio::io::unix::AsyncFd; use tokio::net::UnixListener; use tokio::task::JoinSet; @@ -109,47 +110,41 @@ impl UffdServer { Ok((stream, _)) => { let vm_id = format!("vm-{}", next_vm_id); next_vm_id += 1; + let mmap = Arc::clone(&self.mmap); info!(target: "uffd", vm_id = %vm_id, "new VM connection"); - // Convert tokio UnixStream to std UnixStream for SCM_RIGHTS - // IMPORTANT: tokio sockets are non-blocking, but recv_with_fd needs - // blocking mode to wait for Firecracker to send the UFFD fd. - // Without this, recvmsg returns EAGAIN immediately if data isn't ready. - let mut std_stream = stream.into_std() - .context("converting to std stream")?; - std_stream.set_nonblocking(false) - .context("setting socket to blocking mode")?; - - // Receive UFFD and mappings for this VM - match receive_uffd_and_mappings(&mut std_stream) { - Ok((uffd, mappings)) => { - info!( - target: "uffd", - vm_id = %vm_id, - regions = mappings.len(), - "received UFFD with {} memory regions", - mappings.len() - ); - - // Spawn task to handle this VM's page faults - let mmap = Arc::clone(&self.mmap); - let vm_id_clone = vm_id.clone(); - vm_tasks.spawn(async move { - match handle_vm_page_faults(vm_id_clone.clone(), uffd, mappings, mmap).await { - Ok(()) => info!(target: "uffd", vm_id = %vm_id_clone, "VM handler exited cleanly"), - Err(e) => error!(target: "uffd", vm_id = %vm_id_clone, error = ?e, "VM handler error"), + // Spawn per-connection task so the accept loop returns + // immediately — no blocking on slow/misbehaving clones. + vm_tasks.spawn(async move { + match tokio::time::timeout( + Duration::from_secs(30), + receive_uffd_async(stream), + ).await { + Ok(Ok((uffd, mappings))) => { + info!( + target: "uffd", + vm_id = %vm_id, + regions = mappings.len(), + "received UFFD with {} memory regions", + mappings.len() + ); + match handle_vm_page_faults(vm_id.clone(), uffd, mappings, mmap).await { + Ok(()) => info!(target: "uffd", vm_id = %vm_id, "VM handler exited cleanly"), + Err(e) => error!(target: "uffd", vm_id = %vm_id, error = ?e, "VM handler error"), } - vm_id_clone - }); - - info!(target: "uffd", active_vms = vm_tasks.len(), "VM connected"); - } - Err(e) => { - // Log full error chain for debugging (includes syscall errors) - error!(target: "uffd", vm_id = %vm_id, error = ?e, "failed to receive UFFD"); + } + Ok(Err(e)) => { + error!(target: "uffd", vm_id = %vm_id, error = ?e, "handshake failed"); + } + Err(_) => { + error!(target: "uffd", vm_id = %vm_id, "handshake timed out after 30s"); + } } - } + vm_id + }); + + info!(target: "uffd", active_vms = vm_tasks.len(), "VM connection spawned"); } Err(e) => { error!(target: "uffd", error = %e, "failed to accept connection"); @@ -440,18 +435,33 @@ impl GuestRegionUffdMapping { } } -/// Receive UFFD descriptor and memory mappings from Firecracker over Unix socket -fn receive_uffd_and_mappings( - stream: &mut std::os::unix::net::UnixStream, +/// Receive UFFD descriptor and memory mappings asynchronously using AsyncFd. +/// +/// Uses the same AsyncFd pattern as `handle_vm_page_faults` — waits for readability +/// on the Unix socket, then calls `recv_with_fd` which succeeds immediately since +/// data is ready. This avoids blocking the tokio runtime. +async fn receive_uffd_async( + stream: tokio::net::UnixStream, ) -> Result<(Uffd, Vec)> { - // Receive message with UFFD file descriptor from Firecracker + let std_stream = stream.into_std().context("converting to std stream")?; + // Keep non-blocking — AsyncFd handles readiness + let async_stream = AsyncFd::new(std_stream).context("creating AsyncFd for handshake socket")?; let mut message_buf = vec![0u8; 4096]; - let (bytes_read, uffd_fd_opt) = stream - .recv_with_fd(&mut message_buf) - .context("receiving UFFD from Firecracker")?; - let uffd_file = - uffd_fd_opt.ok_or_else(|| anyhow::anyhow!("no UFFD file descriptor received"))?; + // Wait for data to arrive, then recv with fd passing + let (bytes_read, uffd_fd_opt) = loop { + let mut guard = async_stream.readable().await?; + match guard.get_inner().recv_with_fd(&mut message_buf) { + Ok(result) => break result, + Err(e) if e.errno() == libc::EWOULDBLOCK || e.errno() == libc::EAGAIN => { + guard.clear_ready(); + continue; + } + Err(e) => return Err(e).context("receiving UFFD from Firecracker"), + } + }; + + let uffd_file = uffd_fd_opt.ok_or_else(|| anyhow!("no UFFD file descriptor received"))?; message_buf.resize(bytes_read, 0);