From 197c534a1176d076beb70d34c9cf416ae9f7c935 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 27 Apr 2026 09:40:58 +0000 Subject: [PATCH] fix(relay): detect CRLFCRLF across chunks + cap drain buffer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit bootstrap drain loop's `chunk.windows(4).any(...)` only inspected latest read. CRLFCRLF straddling two reads = miss + spin to 5s deadline. no chunk-count cap either. now: accumulate into bounded buffer (8 KiB, matches PROXY_REQUEST_LINE_BUFFER for symmetric header budget), search whole buffer each iteration. seed w/ trailing 3 bytes of already_read so request-line/chunk boundary also covered. bail w/ tracing::warn + InvalidData err on cap overflow → connection closes promptly. tests: cross-chunk detection + cap-overflow bail. rejected: rolling 4-byte tail. simpler in bytes but harder to reason about correctness around edge cases (short reads, seeded state). 8 KiB buffer cost trivial vs clarity gain. Closes #238 --- crates/relay/src/lib.rs | 42 ++++++++- crates/relay/tests/bootstrap_endpoint.rs | 111 ++++++++++++++++++++++- 2 files changed, 148 insertions(+), 5 deletions(-) diff --git a/crates/relay/src/lib.rs b/crates/relay/src/lib.rs index a2293dfc..c187257d 100644 --- a/crates/relay/src/lib.rs +++ b/crates/relay/src/lib.rs @@ -74,6 +74,15 @@ const PROXY_REQUEST_LINE_BUFFER: usize = 8 * 1024; /// connection. Slow clients (Slowloris) are dropped at this deadline. pub const BOOTSTRAP_IO_TIMEOUT: Duration = Duration::from_secs(5); +/// Maximum number of bytes buffered while draining the remainder of a +/// bootstrap-id request looking for the end-of-headers marker +/// (`\r\n\r\n`). Real HTTP requests for our single-route endpoint never +/// approach this size; a client that exceeds it is malformed or +/// abusive, and we close the connection rather than keep reading until +/// [`BOOTSTRAP_IO_TIMEOUT`]. Matches [`PROXY_REQUEST_LINE_BUFFER`] so +/// the relay applies a single, symmetric budget for header bytes. +pub const BOOTSTRAP_DRAIN_BUFFER_CAP: usize = 8 * 1024; + /// Maximum number of distinct channel topics the topic-announce /// listener will subscribe to. Once this cap is reached the listener /// silently drops further unique announces (after a one-shot warn). @@ -272,6 +281,22 @@ async fn handle_bootstrap_request_after_line( // end-of-headers marker "\r\n\r\n" is already in what we read, we // don't need to read more. if !already_read.windows(4).any(|w| w == b"\r\n\r\n") { + // Accumulate every chunk we read into a single buffer and + // search the *whole* buffer for "\r\n\r\n" each iteration. + // Searching only the latest chunk would miss a marker that + // straddles a chunk boundary, leaving the loop spinning until + // BOOTSTRAP_IO_TIMEOUT (issue #238). + // + // Seed the accumulator with the trailing 3 bytes of + // `already_read` so a marker straddling the boundary between + // the request line and the first newly-read chunk is also + // caught. (3 bytes is the most that can contribute to a + // 4-byte marker without itself containing the full marker — + // if `already_read` already held it, we wouldn't be here.) + let mut buffered: Vec = Vec::with_capacity(1024); + let seed_start = already_read.len().saturating_sub(3); + buffered.extend_from_slice(&already_read[seed_start..]); + let mut chunk = [0u8; 1024]; let deadline = tokio::time::sleep(BOOTSTRAP_IO_TIMEOUT); tokio::pin!(deadline); @@ -280,12 +305,21 @@ async fn handle_bootstrap_request_after_line( _ = &mut deadline => break, res = client.read(&mut chunk) => match res { Ok(0) => break, - Ok(_n) => { - // We don't care about the content; just look - // for the blank line terminator. - if chunk.windows(4).any(|w| w == b"\r\n\r\n") { + Ok(n) => { + buffered.extend_from_slice(&chunk[..n]); + if buffered.windows(4).any(|w| w == b"\r\n\r\n") { break; } + if buffered.len() >= BOOTSTRAP_DRAIN_BUFFER_CAP { + warn!( + cap = BOOTSTRAP_DRAIN_BUFFER_CAP, + "bootstrap request headers exceeded drain cap; closing" + ); + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "bootstrap request headers exceeded drain cap", + )); + } } Err(e) => return Err(e), } diff --git a/crates/relay/tests/bootstrap_endpoint.rs b/crates/relay/tests/bootstrap_endpoint.rs index 6dbb9578..81f4e16d 100644 --- a/crates/relay/tests/bootstrap_endpoint.rs +++ b/crates/relay/tests/bootstrap_endpoint.rs @@ -13,7 +13,7 @@ use tokio::sync::Semaphore; use willow_relay::{ handle_bootstrap_connection, run_bootstrap_listener, run_proxy_listener, topic_str_is_valid, - BOOTSTRAP_IO_TIMEOUT, MAX_TOPIC_LEN, + BOOTSTRAP_DRAIN_BUFFER_CAP, BOOTSTRAP_IO_TIMEOUT, MAX_TOPIC_LEN, }; const TEST_ID: &str = "0123456789abcdef0123456789abcdef"; @@ -323,3 +323,112 @@ fn topic_str_is_valid_public_api() { assert!(!topic_str_is_valid(&"x".repeat(MAX_TOPIC_LEN + 1))); assert!(!topic_str_is_valid("bad char!")); } + +// ── Bootstrap drain: CRLFCRLF + buffer cap (#238) ─────────────────────── +// +// The proxy's bootstrap-id handler drains the rest of the HTTP request +// after matching the request line, looking for the end-of-headers +// marker `\r\n\r\n`. Two failure modes need coverage: +// +// 1. The marker straddles a chunk boundary (e.g. `\r\n\r` arrives in +// one read, `\n` in the next). Earlier code only inspected the +// latest chunk and missed it, blocking until BOOTSTRAP_IO_TIMEOUT. +// 2. A peer streams arbitrary bytes without ever sending the marker. +// Without a buffer cap the loop would spin until the deadline; the +// cap (BOOTSTRAP_DRAIN_BUFFER_CAP) lets the handler bail early. + +#[tokio::test] +async fn proxy_bootstrap_detects_crlf_crlf_split_across_chunks() { + // Send the request line + first three bytes of the end-of-headers + // marker (`\r\n\r`), pause to force a separate read, then send the + // final `\n`. The handler must notice the marker straddling the two + // reads and respond promptly — well before BOOTSTRAP_IO_TIMEOUT. + let (upstream, _rx) = spawn_dummy_upstream().await; + let addr = spawn_proxy(upstream).await; + + let mut stream = TcpStream::connect(addr).await.expect("connect"); + + // First write: full request line + a header line + first 3/4 of + // the terminator. Note the trailing `\r\n\r` — the missing byte is + // the final `\n` of `\r\n\r\n`. + stream + .write_all(b"GET /bootstrap-id HTTP/1.1\r\nHost: localhost\r\n\r") + .await + .expect("write part 1"); + stream.flush().await.expect("flush part 1"); + + // Force the handler to perform at least one extra read. A real + // socket may coalesce writes; sleeping past the kernel's send + // queue ensures the second `\n` lands in its own read. + tokio::time::sleep(Duration::from_millis(50)).await; + + // Second write: the missing `\n` that completes `\r\n\r\n`. + stream.write_all(b"\n").await.expect("write part 2"); + stream.flush().await.expect("flush part 2"); + + // The response must arrive well inside the per-IO deadline. If the + // handler missed the cross-chunk marker it would block until + // BOOTSTRAP_IO_TIMEOUT (5s) elapsed — we use a 2s read timeout to + // bound the test below that. + let response = tokio::time::timeout(Duration::from_secs(2), read_full_response(&mut stream)) + .await + .expect("response not received within 2s — drain likely missed cross-chunk CRLFCRLF"); + + assert!( + response.starts_with("HTTP/1.1 200 OK\r\n"), + "response: {response}" + ); + assert!(response.contains(TEST_ID), "body missing id: {response}"); +} + +#[tokio::test] +async fn proxy_bootstrap_closes_when_drain_buffer_cap_exceeded() { + // Send the bootstrap request line, then stream arbitrary bytes + // (no `\r\n\r\n`) past the drain cap. The handler must give up + // and close the connection without waiting for BOOTSTRAP_IO_TIMEOUT. + let (upstream, _rx) = spawn_dummy_upstream().await; + let addr = spawn_proxy(upstream).await; + + let mut stream = TcpStream::connect(addr).await.expect("connect"); + + stream + .write_all(b"GET /bootstrap-id HTTP/1.1\r\nHost: localhost\r\n") + .await + .expect("write request line"); + + // Spam a header value with bytes that contain no `\r\n\r\n`. We + // write well past BOOTSTRAP_DRAIN_BUFFER_CAP so the handler must + // bail on the cap rather than the deadline. + let pad_len = BOOTSTRAP_DRAIN_BUFFER_CAP * 2; + let mut padding = Vec::with_capacity(pad_len + 16); + padding.extend_from_slice(b"X-Pad: "); + padding.extend(std::iter::repeat_n(b'A', pad_len)); + // Deliberately omit the terminating CRLF + blank line. + + // The write may block briefly if the server stops reading, so do + // it concurrently with the response-side read. + let writer = tokio::spawn(async move { + // Best-effort: ignore errors after the server closes the socket. + let _ = stream.write_all(&padding).await; + // Read whatever the server sent (likely nothing) and observe EOF. + let mut buf = Vec::new(); + let _ = stream.read_to_end(&mut buf).await; + buf + }); + + // Bound the entire exchange well below BOOTSTRAP_IO_TIMEOUT (5s). + // If the cap-bail fires the handler closes promptly; if it does + // not, the test would hang here. + let buf = tokio::time::timeout(Duration::from_secs(2), writer) + .await + .expect("connection still open after drain cap should have bailed") + .expect("writer task panicked"); + + // The handler returns an error before writing a response, so the + // client sees an empty stream (EOF) — never a 200. + let body = String::from_utf8_lossy(&buf); + assert!( + !body.contains("HTTP/1.1 200 OK"), + "expected handler to bail without responding, got: {body}" + ); +}