diff --git a/crates/relay/src/lib.rs b/crates/relay/src/lib.rs index 84d19a9e..32bfe153 100644 --- a/crates/relay/src/lib.rs +++ b/crates/relay/src/lib.rs @@ -1,4 +1,223 @@ -//! Willow relay server configuration. +//! # Willow Relay (library) //! -//! The relay binary wraps iroh-relay for NAT traversal and runs a -//! bootstrap gossip node alongside it. +//! Helpers and constants used by the `willow-relay` binary. This is a +//! thin library wrapper so that the bootstrap-handler logic and topic +//! validation can be exercised by integration tests in +//! `crates/relay/tests/`. The actual `main` lives in `src/main.rs`. + +use std::collections::HashSet; +use std::sync::Arc; +use std::time::Duration; + +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use tokio::sync::Semaphore; +use tracing::{info, warn}; +use willow_network::traits::{GossipEvent, TopicEvents}; +use willow_network::Network; + +/// Maximum concurrent client connections accepted by the bootstrap-id +/// HTTP endpoint. Excess accepts are dropped immediately to prevent +/// FD/memory exhaustion under a connection-flood DoS. +pub const MAX_CONCURRENT_BOOTSTRAP_CONNECTIONS: usize = 1024; + +/// I/O deadline for any single read or write on a bootstrap-id +/// connection. Slow clients (Slowloris) are dropped at this deadline. +pub const BOOTSTRAP_IO_TIMEOUT: Duration = Duration::from_secs(5); + +/// Maximum number of distinct channel topics the topic-announce +/// listener will subscribe to. Once this cap is reached the listener +/// silently drops further unique announces (after a one-shot warn). +pub const MAX_TOPICS: usize = 10_000; + +/// Maximum length, in bytes, of a topic string accepted from a +/// `TopicAnnounce` message. Anything longer is rejected outright. +pub const MAX_TOPIC_LEN: usize = 256; + +/// Returns `true` iff `s` is a syntactically valid channel topic +/// string: non-empty, no longer than [`MAX_TOPIC_LEN`] bytes, and +/// composed entirely of ASCII alphanumerics or `_ / : . -`. +pub fn topic_str_is_valid(s: &str) -> bool { + if s.is_empty() || s.len() > MAX_TOPIC_LEN { + return false; + } + s.chars() + .all(|c| c.is_ascii_alphanumeric() || matches!(c, '_' | '/' | ':' | '.' | '-')) +} + +/// Serve the bootstrap-id response on a single connection with read +/// and write timeouts. Returns `Ok(())` on a successful exchange and +/// `Err` if either I/O step times out or fails. The HTTP response +/// always carries `Connection: close` so the client knows not to +/// pipeline another request on this socket. +pub async fn handle_bootstrap_connection(mut stream: S, id: &str) -> std::io::Result<()> +where + S: AsyncRead + AsyncWrite + Unpin, +{ + // Drain (best-effort) the request line; we don't actually parse it. + let mut buf = [0u8; 1024]; + tokio::time::timeout(BOOTSTRAP_IO_TIMEOUT, stream.read(&mut buf)) + .await + .map_err(|_| { + std::io::Error::new(std::io::ErrorKind::TimedOut, "bootstrap read timed out") + })??; + + let response = format!( + "HTTP/1.1 200 OK\r\n\ + Access-Control-Allow-Origin: *\r\n\ + Content-Type: text/plain\r\n\ + Content-Length: {}\r\n\ + Connection: close\r\n\ + \r\n\ + {}", + id.len(), + id + ); + + tokio::time::timeout(BOOTSTRAP_IO_TIMEOUT, stream.write_all(response.as_bytes())) + .await + .map_err(|_| { + std::io::Error::new(std::io::ErrorKind::TimedOut, "bootstrap write timed out") + })??; + + Ok(()) +} + +/// Run the bootstrap-id accept loop on `listener`. Each accepted +/// connection is gated by `semaphore`; if no permit is available the +/// connection is dropped immediately and a warning is logged. +/// +/// This loop runs forever — callers should `tokio::spawn` it. +pub async fn run_bootstrap_listener( + listener: tokio::net::TcpListener, + id: Arc, + semaphore: Arc, +) { + loop { + let (stream, peer) = match listener.accept().await { + Ok(pair) => pair, + Err(e) => { + warn!(%e, "bootstrap accept failed"); + continue; + } + }; + + let permit = match Arc::clone(&semaphore).try_acquire_owned() { + Ok(p) => p, + Err(_) => { + warn!( + %peer, + "bootstrap connection cap reached; dropping connection" + ); + drop(stream); + continue; + } + }; + + let id = Arc::clone(&id); + tokio::spawn(async move { + if let Err(e) = handle_bootstrap_connection(stream, id.as_str()).await { + tracing::debug!(%e, %peer, "bootstrap connection error"); + } + // Hold the permit for the lifetime of the per-connection task. + drop(permit); + }); + } +} + +/// Listen for `TopicAnnounce` messages on the server-ops topic and +/// dynamically subscribe to announced channel topics. Topics are +/// validated against [`topic_str_is_valid`] and the number of distinct +/// subscribed topics is capped at [`MAX_TOPICS`]. +pub async fn topic_announce_listener(mut events: N::Events, network: N) +where + N: Network, +{ + let mut subscribed: HashSet = HashSet::new(); + let mut warned_full = false; + + while let Some(Ok(event)) = events.next().await { + let GossipEvent::Received(msg) = event else { + continue; + }; + let Some((willow_common::WireMessage::TopicAnnounce { topics }, _)) = + willow_common::unpack_wire(&msg.content) + else { + continue; + }; + for topic_str in topics { + if !topic_str_is_valid(&topic_str) { + warn!( + topic = %topic_str, + "rejecting invalid topic string from announce" + ); + continue; + } + if subscribed.contains(&topic_str) { + continue; + } + if subscribed.len() >= MAX_TOPICS { + if !warned_full { + warn!( + cap = MAX_TOPICS, + "topic subscription cap reached; dropping further announces" + ); + warned_full = true; + } + continue; + } + subscribed.insert(topic_str.clone()); + let topic_id = willow_network::topic_id(&topic_str); + match network.subscribe(topic_id, vec![]).await { + Ok(_) => { + info!(topic = %topic_str, "subscribed to announced channel topic"); + } + Err(e) => { + warn!( + topic = %topic_str, %e, + "failed to subscribe to announced topic" + ); + } + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn topic_str_is_valid_accepts_basic_ascii() { + assert!(topic_str_is_valid("general")); + assert!(topic_str_is_valid("server/123/channel/abc")); + assert!(topic_str_is_valid("_willow_server_ops")); + assert!(topic_str_is_valid("a.b-c_d:e/f")); + assert!(topic_str_is_valid("0123456789")); + assert!(topic_str_is_valid("AZaz09_/:.-")); + } + + #[test] + fn topic_str_is_valid_rejects_empty() { + assert!(!topic_str_is_valid("")); + } + + #[test] + fn topic_str_is_valid_rejects_too_long() { + let long = "a".repeat(MAX_TOPIC_LEN + 1); + assert!(!topic_str_is_valid(&long)); + // Boundary: exactly MAX_TOPIC_LEN is fine. + let max = "a".repeat(MAX_TOPIC_LEN); + assert!(topic_str_is_valid(&max)); + } + + #[test] + fn topic_str_is_valid_rejects_disallowed_chars() { + assert!(!topic_str_is_valid("hello world")); // space + assert!(!topic_str_is_valid("hello\nworld")); // control + assert!(!topic_str_is_valid("hello!")); + assert!(!topic_str_is_valid("hello#world")); + assert!(!topic_str_is_valid("hello@world")); + assert!(!topic_str_is_valid("héllo")); // non-ASCII + assert!(!topic_str_is_valid("hello\0")); + } +} diff --git a/crates/relay/src/main.rs b/crates/relay/src/main.rs index c5ca0ea6..32840727 100644 --- a/crates/relay/src/main.rs +++ b/crates/relay/src/main.rs @@ -5,7 +5,6 @@ //! the gossip mesh. Dynamically subscribes to channel topics announced //! by peers via [`TopicAnnounce`](willow_common::WireMessage::TopicAnnounce). -use std::collections::HashSet; use std::net::{Ipv4Addr, SocketAddr}; use std::sync::Arc; @@ -13,10 +12,13 @@ use anyhow::{Context, Result}; use clap::Parser; use iroh_base::RelayUrl; use iroh_relay::server::{AccessConfig, RelayConfig, Server, ServerConfig}; +use tokio::sync::Semaphore; use tracing::info; use willow_identity::Identity; -use willow_network::traits::{GossipEvent, TopicEvents}; use willow_network::Network; +use willow_relay::{ + run_bootstrap_listener, topic_announce_listener, MAX_CONCURRENT_BOOTSTRAP_CONNECTIONS, +}; #[derive(Parser)] #[command(name = "willow-relay", about = "Willow iroh relay + bootstrap node")] @@ -116,35 +118,22 @@ async fn main() -> Result<()> { // ── Bootstrap ID HTTP endpoint ────────────────────────────────────── // Serve the bootstrap node's endpoint ID so web clients can fetch it. + // The accept loop applies per-connection I/O timeouts and is gated + // by a semaphore so a connection-flood DoS cannot exhaust file + // descriptors. See `willow_relay::run_bootstrap_listener`. let bootstrap_id = Arc::new(identity.endpoint_id().to_string()); - let id_for_handler = Arc::clone(&bootstrap_id); let bootstrap_listener = tokio::net::TcpListener::bind((Ipv4Addr::UNSPECIFIED, args.relay_port + 1)) .await .context("failed to bind bootstrap-id HTTP port")?; let bootstrap_port = bootstrap_listener.local_addr()?.port(); info!(port = bootstrap_port, "bootstrap-id endpoint listening"); - tokio::spawn(async move { - loop { - if let Ok((stream, _)) = bootstrap_listener.accept().await { - let id = Arc::clone(&id_for_handler); - tokio::spawn(async move { - let (mut reader, mut writer) = stream.into_split(); - // Read the request (we don't care about its contents). - let mut buf = [0u8; 1024]; - let _ = tokio::io::AsyncReadExt::read(&mut reader, &mut buf).await; - let body = id.as_str(); - let response = format!( - "HTTP/1.1 200 OK\r\nAccess-Control-Allow-Origin: *\r\nContent-Type: text/plain\r\nContent-Length: {}\r\n\r\n{}", - body.len(), - body - ); - let _ = - tokio::io::AsyncWriteExt::write_all(&mut writer, response.as_bytes()).await; - }); - } - } - }); + let bootstrap_semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_BOOTSTRAP_CONNECTIONS)); + tokio::spawn(run_bootstrap_listener( + bootstrap_listener, + Arc::clone(&bootstrap_id), + bootstrap_semaphore, + )); // Spawn a task that listens for TopicAnnounce messages on _willow_server_ops // and dynamically subscribes to announced channel topics. @@ -171,37 +160,3 @@ async fn main() -> Result<()> { info!("shut down complete"); Ok(()) } - -/// Listen for [`TopicAnnounce`] messages on `_willow_server_ops` and -/// dynamically subscribe to announced channel topics. -async fn topic_announce_listener( - mut events: ::Events, - network: willow_network::iroh::IrohNetwork, -) { - let mut subscribed: HashSet = HashSet::new(); - - while let Some(Ok(event)) = events.next().await { - if let GossipEvent::Received(msg) = event { - if let Some((willow_common::WireMessage::TopicAnnounce { topics }, _)) = - willow_common::unpack_wire(&msg.content) - { - for topic_str in topics { - if subscribed.insert(topic_str.clone()) { - let topic_id = willow_network::topic_id(&topic_str); - match network.subscribe(topic_id, vec![]).await { - Ok(_) => { - info!(topic = %topic_str, "subscribed to announced channel topic"); - } - Err(e) => { - tracing::warn!( - topic = %topic_str, %e, - "failed to subscribe to announced topic" - ); - } - } - } - } - } - } - } -} diff --git a/crates/relay/tests/bootstrap_endpoint.rs b/crates/relay/tests/bootstrap_endpoint.rs new file mode 100644 index 00000000..f1f1619b --- /dev/null +++ b/crates/relay/tests/bootstrap_endpoint.rs @@ -0,0 +1,200 @@ +//! Integration tests for the bootstrap-id HTTP endpoint hardening +//! (issue #112) and the topic-announce listener bounds (issue #113). +//! +//! These tests exercise the helpers exposed by `willow-relay` directly, +//! without standing up a full relay/iroh stack. + +use std::sync::Arc; +use std::time::Duration; + +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::sync::Semaphore; + +use willow_relay::{ + handle_bootstrap_connection, run_bootstrap_listener, topic_str_is_valid, BOOTSTRAP_IO_TIMEOUT, + MAX_TOPIC_LEN, +}; + +const TEST_ID: &str = "0123456789abcdef0123456789abcdef"; + +/// Spawn the bootstrap listener bound to an ephemeral loopback port +/// and return its address. +async fn spawn_listener_with_capacity(capacity: usize) -> std::net::SocketAddr { + let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind"); + let addr = listener.local_addr().expect("local_addr"); + let semaphore = Arc::new(Semaphore::new(capacity)); + let id = Arc::new(TEST_ID.to_string()); + tokio::spawn(run_bootstrap_listener(listener, id, semaphore)); + addr +} + +/// Read the full HTTP response from `stream` and return it as a string. +async fn read_full_response(stream: &mut TcpStream) -> String { + let mut buf = Vec::new(); + // Read until EOF or 8 KiB, whichever comes first. + let _ = tokio::time::timeout(Duration::from_secs(2), async { + let mut chunk = [0u8; 1024]; + loop { + match stream.read(&mut chunk).await { + Ok(0) => break, + Ok(n) => buf.extend_from_slice(&chunk[..n]), + Err(_) => break, + } + if buf.len() >= 8192 { + break; + } + } + }) + .await; + String::from_utf8_lossy(&buf).into_owned() +} + +#[tokio::test] +async fn bootstrap_endpoint_serves_normal_request_quickly() { + let addr = spawn_listener_with_capacity(8).await; + + let mut stream = TcpStream::connect(addr).await.expect("connect"); + stream + .write_all(b"GET / HTTP/1.1\r\nHost: localhost\r\n\r\n") + .await + .expect("write request"); + + let response = read_full_response(&mut stream).await; + + assert!( + response.starts_with("HTTP/1.1 200 OK\r\n"), + "response: {response}" + ); + assert!(response.contains(TEST_ID), "body missing id: {response}"); +} + +#[tokio::test] +async fn bootstrap_response_contains_connection_close_header() { + let addr = spawn_listener_with_capacity(8).await; + + let mut stream = TcpStream::connect(addr).await.expect("connect"); + stream + .write_all(b"GET / HTTP/1.1\r\nHost: localhost\r\n\r\n") + .await + .expect("write request"); + + let response = read_full_response(&mut stream).await; + assert!( + response.contains("Connection: close\r\n"), + "response missing Connection: close header: {response}" + ); +} + +#[tokio::test(start_paused = true)] +async fn handle_bootstrap_connection_times_out_slow_reader() { + // Use an in-memory duplex pipe so we can drive the test with paused + // tokio time. The "client" side never writes, so the handler's read + // call should block until BOOTSTRAP_IO_TIMEOUT elapses in virtual + // time, then return TimedOut. + let (server, _client) = tokio::io::duplex(64); + + let handler = tokio::spawn(async move { handle_bootstrap_connection(server, TEST_ID).await }); + + // Advance virtual time past the read deadline. + tokio::time::advance(BOOTSTRAP_IO_TIMEOUT + Duration::from_millis(1)).await; + + let result = handler.await.expect("handler join"); + let err = result.expect_err("expected timeout error"); + assert_eq!(err.kind(), std::io::ErrorKind::TimedOut); +} + +#[tokio::test] +async fn bootstrap_listener_drops_connections_when_capacity_saturated() { + // Capacity of 1 — first connection holds the only permit, second + // connection should be accepted, immediately denied a permit, and + // closed by the server before any response is written. + let addr = spawn_listener_with_capacity(1).await; + + // First client: open a connection but do NOT send a request. The + // handler's read will block waiting for request data, holding the + // sole permit for up to BOOTSTRAP_IO_TIMEOUT (5s). The test does + // not wait that long — we drop `hold` at the end. + let hold = TcpStream::connect(addr).await.expect("first connect"); + + // Give the listener a moment to accept the first connection and + // spawn the handler that grabs the permit. 50ms on local loopback + // is generous; this is the only timing-sensitive step. + tokio::time::sleep(Duration::from_millis(50)).await; + + // Second client: should be accepted, denied a permit, and dropped. + // Do NOT write a request — the server-side stream is dropped + // immediately on permit denial, so reading from the client side + // should yield EOF (clean FIN) rather than RST. + let mut denied = TcpStream::connect(addr).await.expect("second connect"); + + // The server should close the socket without responding. Reading + // should yield EOF (0 bytes), or in some kernels a ConnectionReset + // — either outcome proves the server did not write a response. + let mut buf = [0u8; 1024]; + let read = tokio::time::timeout(Duration::from_secs(1), denied.read(&mut buf)) + .await + .expect("denied read should not block forever"); + match read { + Ok(0) => {} // EOF — expected + Ok(n) => panic!( + "expected EOF on saturated connection, got {n} bytes: {:?}", + &buf[..n] + ), + Err(e) if e.kind() == std::io::ErrorKind::ConnectionReset => {} // also acceptable + Err(e) => panic!("unexpected read error on saturated connection: {e}"), + } + + // Keep `hold` alive until the assertion is done so the permit stays + // taken. Drop it explicitly here for clarity. + drop(hold); +} + +#[tokio::test] +async fn bootstrap_listener_recovers_after_permit_released() { + // After a connection completes and releases its permit, new + // connections should succeed again. + let addr = spawn_listener_with_capacity(1).await; + + // First request: serve and complete. + { + let mut stream = TcpStream::connect(addr).await.expect("connect 1"); + stream + .write_all(b"GET / HTTP/1.1\r\nHost: localhost\r\n\r\n") + .await + .expect("write 1"); + let response = read_full_response(&mut stream).await; + assert!(response.contains(TEST_ID)); + } + + // The server-side task drops its permit when its scope ends. Give + // the runtime a tick to run the drop, then try again. + tokio::task::yield_now().await; + tokio::time::sleep(Duration::from_millis(10)).await; + + let mut stream = TcpStream::connect(addr).await.expect("connect 2"); + stream + .write_all(b"GET / HTTP/1.1\r\nHost: localhost\r\n\r\n") + .await + .expect("write 2"); + let response = read_full_response(&mut stream).await; + assert!( + response.contains(TEST_ID), + "second request failed: {response}" + ); +} + +// ── Topic validation (#113) ───────────────────────────────────────────── +// +// Most of the topic_str_is_valid coverage lives in the `tests` module +// inside `crates/relay/src/lib.rs` (those are unit tests). Re-assert +// the headline cases here so the integration suite documents the API +// surface advertised by the public helper. + +#[test] +fn topic_str_is_valid_public_api() { + assert!(topic_str_is_valid("general")); + assert!(!topic_str_is_valid("")); + assert!(!topic_str_is_valid(&"x".repeat(MAX_TOPIC_LEN + 1))); + assert!(!topic_str_is_valid("bad char!")); +}