Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 38 additions & 4 deletions crates/relay/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,15 @@ const PROXY_REQUEST_LINE_BUFFER: usize = 8 * 1024;
/// connection. Slow clients (Slowloris) are dropped at this deadline.
pub const BOOTSTRAP_IO_TIMEOUT: Duration = Duration::from_secs(5);

/// Maximum number of bytes buffered while draining the remainder of a
/// bootstrap-id request looking for the end-of-headers marker
/// (`\r\n\r\n`). Real HTTP requests for our single-route endpoint never
/// approach this size; a client that exceeds it is malformed or
/// abusive, and we close the connection rather than keep reading until
/// [`BOOTSTRAP_IO_TIMEOUT`]. Matches [`PROXY_REQUEST_LINE_BUFFER`] so
/// the relay applies a single, symmetric budget for header bytes.
pub const BOOTSTRAP_DRAIN_BUFFER_CAP: usize = 8 * 1024;

/// Maximum number of distinct channel topics the topic-announce
/// listener will subscribe to. Once this cap is reached the listener
/// silently drops further unique announces (after a one-shot warn).
Expand Down Expand Up @@ -272,6 +281,22 @@ async fn handle_bootstrap_request_after_line(
// end-of-headers marker "\r\n\r\n" is already in what we read, we
// don't need to read more.
if !already_read.windows(4).any(|w| w == b"\r\n\r\n") {
// Accumulate every chunk we read into a single buffer and
// search the *whole* buffer for "\r\n\r\n" each iteration.
// Searching only the latest chunk would miss a marker that
// straddles a chunk boundary, leaving the loop spinning until
// BOOTSTRAP_IO_TIMEOUT (issue #238).
//
// Seed the accumulator with the trailing 3 bytes of
// `already_read` so a marker straddling the boundary between
// the request line and the first newly-read chunk is also
// caught. (3 bytes is the most that can contribute to a
// 4-byte marker without itself containing the full marker —
// if `already_read` already held it, we wouldn't be here.)
let mut buffered: Vec<u8> = Vec::with_capacity(1024);
let seed_start = already_read.len().saturating_sub(3);
buffered.extend_from_slice(&already_read[seed_start..]);

let mut chunk = [0u8; 1024];
let deadline = tokio::time::sleep(BOOTSTRAP_IO_TIMEOUT);
tokio::pin!(deadline);
Expand All @@ -280,12 +305,21 @@ async fn handle_bootstrap_request_after_line(
_ = &mut deadline => break,
res = client.read(&mut chunk) => match res {
Ok(0) => break,
Ok(_n) => {
// We don't care about the content; just look
// for the blank line terminator.
if chunk.windows(4).any(|w| w == b"\r\n\r\n") {
Ok(n) => {
buffered.extend_from_slice(&chunk[..n]);
if buffered.windows(4).any(|w| w == b"\r\n\r\n") {
break;
}
if buffered.len() >= BOOTSTRAP_DRAIN_BUFFER_CAP {
warn!(
cap = BOOTSTRAP_DRAIN_BUFFER_CAP,
"bootstrap request headers exceeded drain cap; closing"
);
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"bootstrap request headers exceeded drain cap",
));
}
}
Err(e) => return Err(e),
}
Expand Down
111 changes: 110 additions & 1 deletion crates/relay/tests/bootstrap_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tokio::sync::Semaphore;

use willow_relay::{
handle_bootstrap_connection, run_bootstrap_listener, run_proxy_listener, topic_str_is_valid,
BOOTSTRAP_IO_TIMEOUT, MAX_TOPIC_LEN,
BOOTSTRAP_DRAIN_BUFFER_CAP, BOOTSTRAP_IO_TIMEOUT, MAX_TOPIC_LEN,
};

const TEST_ID: &str = "0123456789abcdef0123456789abcdef";
Expand Down Expand Up @@ -323,3 +323,112 @@ fn topic_str_is_valid_public_api() {
assert!(!topic_str_is_valid(&"x".repeat(MAX_TOPIC_LEN + 1)));
assert!(!topic_str_is_valid("bad char!"));
}

// ── Bootstrap drain: CRLFCRLF + buffer cap (#238) ───────────────────────
//
// The proxy's bootstrap-id handler drains the rest of the HTTP request
// after matching the request line, looking for the end-of-headers
// marker `\r\n\r\n`. Two failure modes need coverage:
//
// 1. The marker straddles a chunk boundary (e.g. `\r\n\r` arrives in
// one read, `\n` in the next). Earlier code only inspected the
// latest chunk and missed it, blocking until BOOTSTRAP_IO_TIMEOUT.
// 2. A peer streams arbitrary bytes without ever sending the marker.
// Without a buffer cap the loop would spin until the deadline; the
// cap (BOOTSTRAP_DRAIN_BUFFER_CAP) lets the handler bail early.

#[tokio::test]
async fn proxy_bootstrap_detects_crlf_crlf_split_across_chunks() {
// Send the request line + first three bytes of the end-of-headers
// marker (`\r\n\r`), pause to force a separate read, then send the
// final `\n`. The handler must notice the marker straddling the two
// reads and respond promptly — well before BOOTSTRAP_IO_TIMEOUT.
let (upstream, _rx) = spawn_dummy_upstream().await;
let addr = spawn_proxy(upstream).await;

let mut stream = TcpStream::connect(addr).await.expect("connect");

// First write: full request line + a header line + first 3/4 of
// the terminator. Note the trailing `\r\n\r` — the missing byte is
// the final `\n` of `\r\n\r\n`.
stream
.write_all(b"GET /bootstrap-id HTTP/1.1\r\nHost: localhost\r\n\r")
.await
.expect("write part 1");
stream.flush().await.expect("flush part 1");

// Force the handler to perform at least one extra read. A real
// socket may coalesce writes; sleeping past the kernel's send
// queue ensures the second `\n` lands in its own read.
tokio::time::sleep(Duration::from_millis(50)).await;

// Second write: the missing `\n` that completes `\r\n\r\n`.
stream.write_all(b"\n").await.expect("write part 2");
stream.flush().await.expect("flush part 2");

// The response must arrive well inside the per-IO deadline. If the
// handler missed the cross-chunk marker it would block until
// BOOTSTRAP_IO_TIMEOUT (5s) elapsed — we use a 2s read timeout to
// bound the test below that.
let response = tokio::time::timeout(Duration::from_secs(2), read_full_response(&mut stream))
.await
.expect("response not received within 2s — drain likely missed cross-chunk CRLFCRLF");

assert!(
response.starts_with("HTTP/1.1 200 OK\r\n"),
"response: {response}"
);
assert!(response.contains(TEST_ID), "body missing id: {response}");
}

#[tokio::test]
async fn proxy_bootstrap_closes_when_drain_buffer_cap_exceeded() {
// Send the bootstrap request line, then stream arbitrary bytes
// (no `\r\n\r\n`) past the drain cap. The handler must give up
// and close the connection without waiting for BOOTSTRAP_IO_TIMEOUT.
let (upstream, _rx) = spawn_dummy_upstream().await;
let addr = spawn_proxy(upstream).await;

let mut stream = TcpStream::connect(addr).await.expect("connect");

stream
.write_all(b"GET /bootstrap-id HTTP/1.1\r\nHost: localhost\r\n")
.await
.expect("write request line");

// Spam a header value with bytes that contain no `\r\n\r\n`. We
// write well past BOOTSTRAP_DRAIN_BUFFER_CAP so the handler must
// bail on the cap rather than the deadline.
let pad_len = BOOTSTRAP_DRAIN_BUFFER_CAP * 2;
let mut padding = Vec::with_capacity(pad_len + 16);
padding.extend_from_slice(b"X-Pad: ");
padding.extend(std::iter::repeat_n(b'A', pad_len));
// Deliberately omit the terminating CRLF + blank line.

// The write may block briefly if the server stops reading, so do
// it concurrently with the response-side read.
let writer = tokio::spawn(async move {
// Best-effort: ignore errors after the server closes the socket.
let _ = stream.write_all(&padding).await;
// Read whatever the server sent (likely nothing) and observe EOF.
let mut buf = Vec::new();
let _ = stream.read_to_end(&mut buf).await;
buf
});

// Bound the entire exchange well below BOOTSTRAP_IO_TIMEOUT (5s).
// If the cap-bail fires the handler closes promptly; if it does
// not, the test would hang here.
let buf = tokio::time::timeout(Duration::from_secs(2), writer)
.await
.expect("connection still open after drain cap should have bailed")
.expect("writer task panicked");

// The handler returns an error before writing a response, so the
// client sees an empty stream (EOF) — never a 200.
let body = String::from_utf8_lossy(&buf);
assert!(
!body.contains("HTTP/1.1 200 OK"),
"expected handler to bail without responding, got: {body}"
);
}