feat(server,preamble): PreambleHooks, clamp timeout, async failure callback#398
feat(server,preamble): PreambleHooks, clamp timeout, async failure callback#398
Conversation
…ilure callback This update introduces a new `preamble_timeout` configuration for `WireframeServer` to bound the maximum duration allowed for reading a connection preamble. If the preamble read exceeds this timeout, a failure handler (if registered) asynchronously executes, allowing custom protocol errors to be sent before the connection is closed. This prevents resources from being tied up indefinitely by clients that fail to send a valid preamble. Key changes include: - Added `preamble_timeout` builder method clamping minimum timeout to 1 ms. - Modified `spawn_connection_task` to enforce timeout around `read_preamble` with tokio's `timeout`. - Enhanced failure handler signature to accept mutable `TcpStream` for replying before disconnection. - Extended documentation with usage examples and detailed guides. - Added tests verifying timeout enforcement, failure handler invocation, and response writing. This enhances server resilience and DoS protection by bounding handshake durations. Co-authored-by: terragon-labs[bot] <terragon-labs[bot]@users.noreply.github.com>
Reviewer's GuideAdds a configurable preamble read timeout to WireframeServer, refactors preamble hook wiring to carry this timeout, and upgrades the preamble decode failure callback to an async handler with access to the TcpStream so it can respond before the connection is closed. Tests and documentation are updated to cover the new behavior. Sequence diagram for preamble read with timeout and async failure handlersequenceDiagram
actor Client
participant WireframeServer
participant AcceptLoop
participant ConnectionTask
participant ProcessStream
participant ReadPreamble
participant PreambleFailureHandler
participant WireframeApp
Client->>WireframeServer: connect
WireframeServer->>AcceptLoop: start accept_loop
AcceptLoop->>AcceptLoop: listener.accept()
AcceptLoop->>ConnectionTask: spawn_connection_task(stream, factory, hooks.on_success, hooks.on_failure, hooks.timeout)
ConnectionTask->>ProcessStream: process_stream(stream, peer_addr, factory, on_success, on_failure, preamble_timeout)
alt preamble_timeout is Some
ProcessStream->>ProcessStream: timeout(preamble_timeout, read_preamble)
ProcessStream->>ReadPreamble: read_preamble(stream)
alt preamble read completes in time
ReadPreamble-->>ProcessStream: Ok(preamble, leftover)
else preamble read times out
ProcessStream-->>ProcessStream: Err(timeout_error)
end
else preamble_timeout is None
ProcessStream->>ReadPreamble: read_preamble(stream)
ReadPreamble-->>ProcessStream: Result(preamble or error)
end
alt preamble_result is Ok
ProcessStream->>ProcessStream: invoke on_success if Some
opt on_success is Some
ProcessStream->>WireframeApp: on_preamble_success(preamble, stream)
WireframeApp-->>ProcessStream: Result
end
ProcessStream->>WireframeApp: hand off stream and preamble
else preamble_result is Err (decode failure or timeout)
alt on_failure is Some
ProcessStream->>PreambleFailureHandler: on_preamble_failure(error, stream)
PreambleFailureHandler-->>ProcessStream: io::Result
ProcessStream->>Client: optional protocol error reply via stream
ProcessStream->>Client: close connection
else on_failure is None
ProcessStream->>ProcessStream: log error with peer_addr
ProcessStream->>Client: close connection
end
end
Updated class diagram for preamble hooks, handlers, and WireframeServerclassDiagram
class WireframeServer~F,T,S~ {
+F factory
+usize workers
+Option~PreambleHandler~T~~ on_preamble_success
+Option~PreambleFailure~T~~ on_preamble_failure
+Option~oneshot_Sender~unit~~ ready_tx
+BackoffConfig backoff_config
+Option~Duration~ preamble_timeout
+S state
+preamble_timeout(timeout Duration) WireframeServer~F,T,S~
+on_preamble_decode_success(handler PreambleHandler~T~) WireframeServer~F,T,S~
+on_preamble_decode_failure(handler PreambleFailure~T~) WireframeServer~F,T,S~
}
class PreambleHooks~T~ {
+Option~PreambleHandler~T~~ on_success
+Option~PreambleFailure~T~~ on_failure
+Option~Duration~ timeout
+new() PreambleHooks~T~
+clone() PreambleHooks~T~
+default() PreambleHooks~T~
}
class PreambleSuccessHandler~T~ {
<<interface>>
+call(preamble T, stream TcpStream) Future
}
class PreambleFailureHandler~T~ {
<<interface>>
+call(error DecodeError, stream TcpStream) BoxFuture~io_Result_unit~~
}
class PreambleHandler~T~ {
<<typealias>>
+Arc~PreambleSuccessHandler~T~~
}
class PreambleFailure~T~ {
<<typealias>>
+Arc~PreambleFailureHandler~T~~
}
class BackoffConfig {
+Duration initial_delay
+Duration max_delay
+f64 multiplier
}
class AcceptLoop {
+accept_loop(listener AcceptListener, factory F, preamble PreambleHooks~T~, shutdown CancellationToken, tracker TaskTracker, backoff_config BackoffConfig) Future
}
class ConnectionTask {
+spawn_connection_task(stream TcpStream, factory F, on_success Option~PreambleHandler~T~~, on_failure Option~PreambleFailure~T~~, preamble_timeout Option~Duration~, tracker TaskTracker)
+process_stream(stream TcpStream, peer_addr Option~SocketAddr~, factory F, on_success Option~PreambleHandler~T~~, on_failure Option~PreambleFailure~T~~, preamble_timeout Option~Duration~) Future
+timeout_error() DecodeError
}
WireframeServer~F,T,S~ --> PreambleHandler~T~ : uses success
WireframeServer~F,T,S~ --> PreambleFailure~T~ : uses failure
WireframeServer~F,T,S~ --> BackoffConfig : has
WireframeServer~F,T,S~ --> PreambleHooks~T~ : configures
PreambleHandler~T~ ..> PreambleSuccessHandler~T~ : wraps
PreambleFailure~T~ ..> PreambleFailureHandler~T~ : wraps
AcceptLoop ..> PreambleHooks~T~ : takes
AcceptLoop ..> ConnectionTask : spawns
ConnectionTask ..> PreambleHandler~T~ : optional
ConnectionTask ..> PreambleFailure~T~ : optional
ConnectionTask ..> PreambleFailureHandler~T~ : invokes
ConnectionTask ..> PreambleSuccessHandler~T~ : invokes
ConnectionTask ..> BackoffConfig : uses
PreambleHooks~T~ --> PreambleHandler~T~ : on_success
PreambleHooks~T~ --> PreambleFailure~T~ : on_failure
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
|
Note Other AI code review bot(s) detectedCodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review. Summary by CodeRabbitRelease Notes
✏️ Tip: You can customize this high-level summary in your review settings. WalkthroughIntroduce an optional Changes
Sequence Diagram(s)sequenceDiagram
participant Accept as Acceptor
participant Conn as spawn_connection_task
participant Proc as process_stream
participant Handler as on_preamble_decode_failure
participant Stream as TcpStream
Note over Accept,Conn: hand off TcpStream + PreambleHooks{timeout,...}
Accept->>Conn: hand off TcpStream + PreambleHooks
Conn->>Proc: start processing (passes timeout)
Proc->>Proc: attempt read_preamble (with optional timeout)
alt read_preamble succeeds
Proc->>Proc: invoke on_success handler (async)
Proc->>Stream: proceed to handshake
else read_preamble times out or decodes to error
Proc->>Handler: call Handler(error, &mut Stream) [async]
Handler->>Stream: optionally write error response
Handler-->>Proc: future resolves (ok / err)
Proc->>Stream: close connection
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~75 minutes
Possibly related PRs
Suggested reviewers
Poem
Pre-merge checks and finishing touches✅ Passed checks (3 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Hey there - I've reviewed your changes and found some issues that need to be addressed.
- The
PreambleHooks<T>struct only contains clonable/optional fields and could deriveCloneandDefaultinstead of hand-written impls, which would simplify the code and keep it in sync if fields change. - The generic type parameter
TonPreambleFailureHandler<T>is never used in the trait bounds or methods, so consider removing it (and using a non-generic trait/alias) to reduce API surface and avoid confusion.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The `PreambleHooks<T>` struct only contains clonable/optional fields and could derive `Clone` and `Default` instead of hand-written impls, which would simplify the code and keep it in sync if fields change.
- The generic type parameter `T` on `PreambleFailureHandler<T>` is never used in the trait bounds or methods, so consider removing it (and using a non-generic trait/alias) to reduce API surface and avoid confusion.
## Individual Comments
### Comment 1
<location> `tests/preamble.rs:270` </location>
<code_context>
+
+#[rstest]
+#[tokio::test]
+async fn preamble_timeout_invokes_failure_handler_and_closes_connection(
+ factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static,
+) {
</code_context>
<issue_to_address>
**suggestion (testing):** Add a test to cover the case where a preamble timeout is configured but a valid preamble arrives in time
Currently we only cover the timeout/failure path. Please add a test where `preamble_timeout` is set (e.g., 50–100 ms) and a valid preamble arrives well within that window, asserting that the success handler runs and the connection remains open. This will help catch regressions where the timeout might incorrectly fire on timely preambles.
Suggested implementation:
```rust
timeout(Duration::from_millis(200), failure_rx)
.await
.expect("timeout waiting for failure callback")
.expect("failure callback send");
})
.await;
}
#[rstest]
#[tokio::test]
async fn preamble_timeout_allows_timely_preamble_and_keeps_connection_open(
factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static,
) {
// Arrange
let (success_holder, success_rx) = channel_holder();
let (failure_holder, failure_rx) = channel_holder();
let server = WireframeServer::new(factory)
.with_preamble::<HotlinePreamble>()
// Configure a relatively short timeout; the client will send a valid preamble well
// within this window.
.with_preamble_timeout(Duration::from_millis(100))
.on_preamble(move |preamble: HotlinePreamble, mut stream| {
let success_holder = success_holder.clone();
Box::pin(async move {
// In a real test you might assert on fields from `preamble` here.
// For now we just signal that the success handler ran and keep the
// connection open by writing back a simple response.
if let Some(tx) = success_holder.lock().expect("lock").take() {
let _ = tx.send(());
}
stream
.write_all(b"OK")
.await
.expect("write in success handler failed");
stream.flush().await.expect("flush in success handler failed");
})
})
.on_preamble_decode_failure(move |_, _stream| {
let failure_holder = failure_holder.clone();
Box::pin(async move {
// We should NOT hit this in this test – if we do, we'll observe it below
// via `failure_rx`.
if let Some(tx) = failure_holder.lock().expect("lock").take() {
let _ = tx.send(());
}
})
});
// Start the server with a connected client using whatever helper is already
// used in the other preamble tests.
let (client, _guard) = start_server_and_connect_client(server).await;
// Act: send a valid preamble immediately, well before the timeout expires.
send_valid_hotline_preamble(&client).await;
// Assert: the preamble success handler ran before the timeout elapsed.
timeout(Duration::from_millis(200), success_rx)
.await
.expect("timeout waiting for preamble success callback")
.expect("preamble success callback send");
// Assert: the failure handler did NOT run (we expect this to time out).
assert!(
timeout(Duration::from_millis(150), failure_rx).await.is_err(),
"preamble failure callback unexpectedly fired for timely, valid preamble"
);
// Assert: the connection remains open and usable after the preamble.
// We expect to be able to read the "OK" response sent in the success handler.
let mut buf = [0u8; 2];
client
.read_exact(&mut buf)
.await
.expect("connection should remain open and readable after preamble");
assert_eq!(&buf, b"OK", "expected OK response from preamble success handler");
}
```
The snippet above assumes the existence of several helpers and APIs that should be aligned with your existing tests:
1) Server/client bootstrap:
- Replace `start_server_and_connect_client(server).await` with whatever helper you are already using in this file to:
- Spawn the `WireframeServer`
- Return a connected client `TcpStream` (or equivalent) and any guard/handle used to keep the server alive for the test.
- Use the same pattern as in `preamble_timeout_invokes_failure_handler_and_closes_connection` for consistency.
2) Sending a valid preamble:
- Implement `send_valid_hotline_preamble(&client).await` or replace it with the actual helper you already use to send a valid `HotlinePreamble` from the client side.
- Make sure it sends exactly what `with_preamble::<HotlinePreamble>()` expects so that the success handler runs.
3) Success handler wiring:
- If your codebase uses a different method name than `.on_preamble(...)` for the success path (e.g., `.on_preamble_success(...)` or a generic `.on_preamble_decoded(...)`), adjust the call accordingly.
- Mirror the pattern used elsewhere in `tests/preamble.rs` for the "happy path" preamble test (e.g., the test that asserts successful preamble handling without a timeout).
4) Channel helpers:
- This test assumes `channel_holder()` returns something like `(Arc<Mutex<Option<oneshot::Sender<()>>>>, oneshot::Receiver<()>)` and that cloning the holder is the intended pattern (as in your failure test).
- If your actual type or pattern is slightly different, adjust the `success_holder`/`failure_holder` usage to match the existing tests.
5) Connection-open assertion:
- If your existing tests use a different way to assert that the connection remains open (for example, attempting a second application-level message or using a helper like `assert_connection_open(&client).await`), replace the manual `read_exact`/`assert_eq!` portion with that existing pattern.
Once you align these placeholders with your current helpers and APIs, this test will exercise the scenario where:
- A preamble timeout is configured,
- A valid preamble arrives well within that timeout,
- The success handler runs,
- The failure handler does not run,
- And the connection remains open and usable after preamble processing.
</issue_to_address>
### Comment 2
<location> `tests/preamble.rs:233` </location>
<code_context>
+#[rstest]
+#[tokio::test]
+async fn failure_callback_can_write_response(
+ factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static,
+) {
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding a timeout-based failure test that also writes a response before closing
The new test covers the decode-error path well. Since the timeout path now uses the same failure handler with a `TimedOut` error and a live `TcpStream`, please add a similar test for the preamble-timeout case that writes a protocol error, verifies the client receives the response bytes, and then confirms the connection closes. This will validate the same "respond then close" behavior for timeouts as for decode failures.
Suggested implementation:
```rust
#[rstest]
#[tokio::test]
async fn failure_callback_can_write_response(
factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static,
) {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
let (failure_holder, failure_rx) = channel_holder();
let server = WireframeServer::new(factory)
.with_preamble::<HotlinePreamble>()
.on_preamble_decode_failure(move |_, mut stream| {
let failure_holder = failure_holder.clone();
Box::pin(async move {
stream.write_all(b"ERR").await.expect("write failed");
stream.flush().await.expect("flush failed");
if let Some(tx) = failure_holder.lock().expect("lock").take() {
let _ = tx.send(());
}
// Ensure we always close the connection after responding
stream.shutdown().await.expect("shutdown failed");
Ok::<(), io::Error>(())
})
});
with_running_server(server, |addr| async move {
let mut stream = TcpStream::connect(addr).await.expect("connect failed");
// Send invalid data to trigger a decode failure
stream
.write_all(b"not-a-valid-preamble")
.await
.expect("write failed");
// Read the protocol error response
let mut buf = [0u8; 3];
stream.read_exact(&mut buf).await.expect("read_exact failed");
assert_eq!(&buf, b"ERR");
// After the response is written, the server should close the connection
let mut eof_buf = [0u8; 1];
let read_res = stream.read(&mut eof_buf).await.expect("read failed");
assert_eq!(read_res, 0);
// Also ensure the failure callback actually ran
failure_rx.await.expect("failure callback not called");
})
.await;
}
#[rstest]
#[tokio::test]
async fn preamble_timeout_failure_writes_response_and_closes(
factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static,
) {
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::time::timeout;
let (failure_holder, failure_rx) = channel_holder();
let server = WireframeServer::new(factory)
.with_preamble::<HotlinePreamble>()
// Configure a short preamble timeout so the test completes quickly
.with_preamble_timeout(Duration::from_millis(50))
// The timeout path should use the same failure handler and still
// allow us to write a response before closing the connection.
.on_preamble_decode_failure(move |_, mut stream| {
let failure_holder = failure_holder.clone();
Box::pin(async move {
stream.write_all(b"ERR").await.expect("write failed");
stream.flush().await.expect("flush failed");
if let Some(tx) = failure_holder.lock().expect("lock").take() {
let _ = tx.send(());
}
// After responding, close the connection from the server side
stream.shutdown().await.expect("shutdown failed");
Ok::<(), io::Error>(())
})
});
with_running_server(server, |addr| async move {
let mut stream = TcpStream::connect(addr).await.expect("connect failed");
// Do NOT send a preamble; just wait for the server-side timeout to fire.
// We expect the failure handler to write "ERR" and then close.
let mut buf = [0u8; 3];
// Use a client-side timeout so the test doesn't hang if something breaks.
let () = timeout(Duration::from_secs(2), async {
stream.read_exact(&mut buf).await.expect("read_exact failed");
})
.await
.expect("did not receive timeout error response in time");
assert_eq!(&buf, b"ERR");
// After the error response, the connection should be closed.
let mut eof_buf = [0u8; 1];
let read_len = stream.read(&mut eof_buf).await.expect("read failed");
assert_eq!(read_len, 0, "connection should be closed after timeout error response");
// Ensure the timeout failure callback actually executed.
failure_rx.await.expect("timeout failure callback not called");
})
.await;
}
```
The above changes assume the following, which you may need to adjust to your actual API and imports:
1) There is a `with_preamble_timeout(Duration)` builder on `WireframeServer` that configures the preamble timeout. If the real method has a different name or signature (for example, `with_timeout`, `with_preamble_read_timeout`, or configuration via `WireframeConfig`), update the call accordingly.
2) The timeout path reuses the same `.on_preamble_decode_failure` callback with a `TimedOut` error, as referenced in your comment. If the timeout uses a different hook (e.g., `.on_preamble_timeout_failure` or a generic `.on_preamble_failure` that includes the error), wire the closure up to that method instead, keeping the “write ERR, flush, then close” behavior intact.
3) If `AsyncWriteExt::shutdown` is not in scope through your existing imports, ensure `tokio::io::AsyncWriteExt` is imported once at the top of the file instead of inside each test (or remove the explicit `shutdown` call if your server already closes the connection after the handler returns).
4) If you already have shared helpers for:
- connecting a client,
- asserting EOF after a response,
- or waiting for the failure callback,
you can replace the inline logic in the tests with those helpers to align with the rest of the test suite style.
</issue_to_address>
### Comment 3
<location> `src/server/config/tests.rs:142` </location>
<code_context>
- PreambleHandlerKind::Failure => server.on_preamble_decode_failure(move |_err: &DecodeError| {
- c.fetch_add(1, Ordering::SeqCst);
- }),
+ PreambleHandlerKind::Failure => server.on_preamble_decode_failure(
+ move |_err: &DecodeError, _stream| {
+ let c = c.clone();
</code_context>
<issue_to_address>
**suggestion (testing):** Add a test case for a failure handler that returns an error to exercise the error-logging/cleanup path
The current tests only cover the case where the async failure handler returns Ok(()). Since process_stream now logs and closes on handler errors, please add a variant where the handler immediately returns Err(io::Error::new(io::ErrorKind::Other, "boom")), trigger a preamble failure, and assert that the connection is closed and the test completes without panic. This will exercise the new error path and validate the error-handling behavior.
Suggested implementation:
```rust
PreambleHandlerKind::Failure => server.on_preamble_decode_failure(
move |_err: &DecodeError, _stream| {
let c = c.clone();
Box::pin(async move {
c.fetch_add(1, Ordering::SeqCst);
Ok::<(), io::Error>(())
})
},
),
};
assert_eq!(counter.load(Ordering::SeqCst), 0);
.on_preamble_failure
}
#[rstest]
#[tokio::test]
async fn test_preamble_failure_handler_error() {
use std::io;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
// Arrange
let counter = Arc::new(AtomicUsize::new(0));
let c = counter.clone();
// Build a server with a failure handler that returns an error.
//
// This should mirror the setup used in `test_bind_success`, except that:
// - we always install the `PreambleHandlerKind::Failure` variant
// - the failure handler returns `Err(io::Error)` instead of `Ok(())`
let server = {
let counter = c.clone();
// NOTE: This assumes the same server/builder initialization pattern as in `test_bind_success`.
// The outer setup (e.g., creating the server, binding sockets, etc.) should be identical,
// differing only in this handler configuration.
let server = make_test_server(); // see <additional_changes> for details
let _server = match PreambleHandlerKind::Failure {
PreambleHandlerKind::Failure => server.on_preamble_decode_failure(
move |_err: &DecodeError, _stream| {
let c = counter.clone();
Box::pin(async move {
c.fetch_add(1, Ordering::SeqCst);
Err::<(), io::Error>(io::Error::new(io::ErrorKind::Other, "boom"))
})
},
),
// If other variants are required by the builder in this scope, mirror them from
// the existing `test_bind_success` implementation.
};
server
};
// Act: trigger a preamble failure on this server/connection.
//
// This should follow the same pattern used by the existing tests to induce a preamble
// decode failure (e.g., connecting a client and sending an invalid/partial preamble).
let result = trigger_preamble_failure(server.clone()).await;
// Assert:
//
// 1. The failure handler was invoked exactly once.
assert_eq!(counter.load(Ordering::SeqCst), 1);
// 2. The processing completed without panicking. Depending on the helpers, this may
// surface as either Ok(()) or an io::Error; the important thing is that the future
// completes and the error path is exercised.
assert!(result.is_ok() || result.is_err());
// 3. The connection associated with this server is closed after the failure.
// Reuse whatever helper the test module already uses for this check.
assert!(connection_is_closed(&server));
```
for details
let _server = match PreambleHandlerKind::Failure {
PreambleHandlerKind::Failure => server.on_preamble_decode_failure(
move |_err: &DecodeError, _stream| {
let c = counter.clone();
Box::pin(async move {
c.fetch_add(1, Ordering::SeqCst);
Err::<(), io::Error>(io::Error::new(io::ErrorKind::Other, "boom"))
})
},
),
// If other variants are required by the builder in this scope, mirror them from
// the existing `test_bind_success` implementation.
};
server
};
// Act: trigger a preamble failure on this server/connection.
//
// This should follow the same pattern used by the existing tests to induce a preamble
// decode failure (e.g., connecting a client and sending an invalid/partial preamble).
let result = trigger_preamble_failure(server.clone()).await;
// Assert:
//
// 1. The failure handler was invoked exactly once.
assert_eq!(counter.load(Ordering::SeqCst), 1);
// 2. The processing completed without panicking. Depending on the helpers, this may
// surface as either Ok(()) or an io::Error; the important thing is that the future
// completes and the error path is exercised.
assert!(result.is_ok() || result.is_err());
// 3. The connection associated with this server is closed after the failure.
// Reuse whatever helper the test module already uses for this check.
assert!(connection_is_closed(&server));
>>>>>>> REPLACE
</file_operation>
</file_operations>
<additional_changes>
To fully implement and compile this new test, you will need to:
1. Replace `make_test_server()` with the actual server/builder setup used in `test_bind_success`:
- Copy whatever code constructs and configures the `server` in `test_bind_success` into this test.
- Keep it identical except for the `on_preamble_decode_failure` configuration, which should use the error-returning handler shown in this patch.
2. Ensure the `match PreambleHandlerKind::Failure { ... }` block matches your existing pattern:
- In `test_bind_success`, if you currently match on multiple `PreambleHandlerKind` variants (e.g., `Success`, `Failure`, etc.), mirror all of those arms here so the builder is configured consistently.
- The only behavioral difference in this test should be that the `Failure` arm returns `Err(io::Error::new(io::ErrorKind::Other, "boom"))` after incrementing the counter.
3. Implement or reuse the helper to trigger a preamble failure:
- Replace `trigger_preamble_failure(server.clone()).await` with the same sequence your existing tests use to cause a preamble decode failure (e.g., create a client connection, send an invalid preamble, and wait for the server to process it).
- If you already have a helper function for this in the test module, call that instead of `trigger_preamble_failure`.
4. Implement or reuse the connection-closed assertion:
- Replace `connection_is_closed(&server)` with your real assertion for checking that the connection was closed after the failure.
- For example, this might be:
- Checking that `process_stream` returned an error and the underlying transport reports `is_closed()`, or
- Inspecting a captured log / metric / internal state that indicates the connection was closed, depending on how the rest of the tests are structured.
5. Verify imports:
- If `io`, `AtomicUsize`, `Ordering`, `Arc`, `DecodeError`, or `PreambleHandlerKind` are already imported at the module level in this file, you can remove the `use` statements inside the test to avoid duplication.
- Otherwise, move these `use` statements to the top of the file with the other imports to keep the style consistent with the existing tests.
With these adjustments, `test_preamble_failure_handler_error` will:
- Use a failure handler that immediately returns `Err(io::Error::new(io::ErrorKind::Other, "boom"))`,
- Trigger a preamble decode failure to invoke that handler,
- Assert that the handler was called,
- Assert that the connection is closed afterward,
- And confirm that the test completes without panic, exercising the error-logging/cleanup path in `process_stream`.
</issue_to_address>
### Comment 4
<location> `src/server/mod.rs:65` </location>
<code_context>
+///
+/// Implementors may perform asynchronous I/O on the provided stream to emit a
+/// response before the connection is closed.
+pub trait PreambleFailureHandler<T>:
+ for<'a> Fn(&'a DecodeError, &'a mut tokio::net::TcpStream) -> BoxFuture<'a, io::Result<()>>
+ + Send
</code_context>
<issue_to_address>
**issue (complexity):** Consider replacing the new PreambleFailureHandler trait and its blanket impl with a single type alias for the async callback to make the handler’s role clearer and the code simpler.
You can simplify the new preamble failure handler without losing any functionality by dropping the trait and blanket impl, and keeping just a single type alias for the callback.
Right now, `PreambleFailureHandler<T>` is just a renamed function signature with no additional behavior, and the extra indirection makes it harder to see what the handler actually is.
You can replace the trait + impl + alias with a single alias:
```rust
/// Handler invoked when decoding a connection preamble fails.
///
/// Implementors may perform asynchronous I/O on the provided stream to emit a
/// response before the connection is closed.
pub type PreambleFailure<T> = Arc<
dyn for<'a> Fn(
&'a DecodeError,
&'a mut tokio::net::TcpStream,
) -> BoxFuture<'a, io::Result<()>>
+ Send
+ Sync
+ 'static
>;
```
And then keep the usage unchanged in the server struct:
```rust
pub struct WireframeServer<T, S, F>
where
T: Preamble,
S: ServerState,
{
// ...
pub(crate) on_preamble_success: Option<PreambleHandler<T>>,
pub(crate) on_preamble_failure: Option<PreambleFailure<T>>,
// ...
}
```
This:
- Removes an extra public trait and impl that don’t add capabilities (no methods/assoc types).
- Makes it immediately obvious that `PreambleFailure` is an async callback type.
- Keeps the existing async behavior and mut access to the `TcpStream` intact.
</issue_to_address>
### Comment 5
<location> `src/server/config/preamble.rs:73` </location>
<code_context>
+ /// WireframeServer::new(|| WireframeApp::default()).preamble_timeout(Duration::from_secs(1));
+ /// ```
+ #[must_use]
+ pub fn preamble_timeout(mut self, timeout: Duration) -> Self {
+ let normalised = timeout.max(Duration::from_millis(1));
+ self.preamble_timeout = Some(normalised);
</code_context>
<issue_to_address>
**issue (review_instructions):** Add behavioural and unit tests covering preamble_timeout, including clamping and end-to-end timeout behaviour.
Add tests that exercise this new preamble_timeout feature. At minimum:
- Add unit tests that verify the clamping behaviour (e.g. values < 1ms are normalised to 1ms, values >= 1ms are preserved).
- Add behavioural/integration tests that configure a server with a small preamble_timeout and a client that delays or omits the preamble, and assert that the connection times out as expected and routes through the preamble failure path.
These tests are required by the review rule that all new feature changes must be covered by both behavioural and unit tests.
<details>
<summary>Review instructions:</summary>
**Path patterns:** `**/*`
**Instructions:**
For any new feature or change to an existing feature, both behavioural and unit tests are required.
</details>
</issue_to_address>
### Comment 6
<location> `src/server/connection.rs:68` </location>
<code_context>
T: Preamble,
{
- match read_preamble::<_, T>(&mut stream).await {
+ let preamble_result = match preamble_timeout {
+ Some(limit) => match timeout(limit, read_preamble::<_, T>(&mut stream)).await {
+ Ok(result) => result,
</code_context>
<issue_to_address>
**issue (review_instructions):** Add tests validating that preamble timeouts and asynchronous failure handlers behave correctly and are invoked as documented.
Add tests that directly exercise the new timeout path and asynchronous failure handler:
- Add unit tests around process_stream (or a suitably factored helper) that simulate a timeout (e.g. using a stream that never produces a full preamble) and assert that timeout_error is produced and that the failure handler is called.
- Add behavioural/integration tests using on_preamble_decode_failure that perform async I/O on the TcpStream (e.g. writing a marker response) and assert that the client observes this response before the connection closes.
This ensures the new timeout and async failure handling behaviour is verified in accordance with the requirement for both behavioural and unit tests for new features.
<details>
<summary>Review instructions:</summary>
**Path patterns:** `**/*`
**Instructions:**
For any new feature or change to an existing feature, both behavioural and unit tests are required.
</details>
</issue_to_address>
### Comment 7
<location> `docs/users-guide.md:395` </location>
<code_context>
-success handlers and synchronous failure callbacks, letting you reject
-connections or log decode errors before the application runs.[^20]
+success handlers and asynchronous failure callbacks that receive the stream,
+letting you reply or log decode errors before the application runs. An optional
+`preamble_timeout` caps how long `read_preamble` waits; timeouts use the
+failure callback path.[^20]
</code_context>
<issue_to_address>
**suggestion (review_instructions):** This sentence uses the second-person pronoun "you", which the style instructions forbid.
The documentation guidelines ask to avoid first- and second-person pronouns. Consider rephrasing this sentence to avoid "you", for example:
- "letting the server reply or log decode errors before the application runs", or
- "enabling replies or logging of decode errors before the application runs".
<details>
<summary>Review instructions:</summary>
**Path patterns:** `**/*.md`
**Instructions:**
Avoid 2nd person or 1st person pronouns ("I", "you", "we").
</details>
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
There was a problem hiding this comment.
Actionable comments posted: 4
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (12)
docs/hardening-wireframe-a-guide-to-production-resilience.md(1 hunks)docs/preamble-validator.md(1 hunks)docs/server/configuration.md(1 hunks)docs/users-guide.md(2 hunks)src/server/config/binding.rs(2 hunks)src/server/config/mod.rs(1 hunks)src/server/config/preamble.rs(3 hunks)src/server/config/tests.rs(6 hunks)src/server/connection.rs(8 hunks)src/server/mod.rs(4 hunks)src/server/runtime.rs(9 hunks)tests/preamble.rs(5 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.md
⚙️ CodeRabbit configuration file
**/*.md: * Avoid 2nd person or 1st person pronouns ("I", "you", "we")
- Use en-GB-oxendict (-ize / -yse / -our) spelling and grammar
- Headings must not be wrapped.
- Documents must start with a level 1 heading
- Headings must correctly increase or decrease by no more than one level at a time
- Use GitHub-flavoured Markdown style for footnotes and endnotes.
- Numbered footnotes must be numbered by order of appearance in the document.
Files:
docs/server/configuration.mddocs/preamble-validator.mddocs/users-guide.mddocs/hardening-wireframe-a-guide-to-production-resilience.md
**/*.rs
⚙️ CodeRabbit configuration file
**/*.rs: * Seek to keep the cognitive complexity of functions no more than 9.
- Adhere to single responsibility and CQRS
- Place function attributes after doc comments.
- Do not use
returnin single-line functions.- Move conditionals with >2 branches into a predicate function.
- Avoid
unsafeunless absolutely necessary.- Every module must begin with a
//!doc comment that explains the module's purpose and utility.- Comments and docs must follow en-GB-oxendict (-ize / -yse / -our) spelling and grammar
- Lints must not be silenced except as a last resort.
#[allow]is forbidden.- Only narrowly scoped
#[expect(lint, reason = "...")]is allowed.- No lint groups, no blanket or file-wide suppression.
- Include
FIXME:with link if a fix is expected.- Where code is only used by specific features, it must be conditionally compiled or a conditional expectation for unused_code applied.
- Use
rstestfixtures for shared setup and to avoid repetition between tests.- Replace duplicated tests with
#[rstest(...)]parameterised cases.- Prefer
mockallfor mocks/stubs.- Prefer
.expect()over.unwrap()in tests..expect()and.unwrap()are forbidden outside of tests. Errors must be propagated.- Ensure that any API or behavioural changes are reflected in the documentation in
docs/- Ensure that any completed roadmap steps are recorded in the appropriate roadmap in
docs/- Files must not exceed 400 lines in length
- Large modules must be decomposed
- Long match statements or dispatch tables should be decomposed by domain and collocated with targets
- Large blocks of inline data (e.g., test fixtures, constants or templates) must be moved to external files and inlined at compile-time or loaded at run-time.
- Environment access (env::set_var and env::remove_var) are always unsafe in Rust 2024 and MUST be marked as such
- For testing of functionality depending upon environment variables, dependency injection and...
Files:
src/server/config/preamble.rssrc/server/config/mod.rssrc/server/config/binding.rssrc/server/connection.rssrc/server/mod.rssrc/server/runtime.rstests/preamble.rssrc/server/config/tests.rs
🧬 Code graph analysis (5)
src/server/config/preamble.rs (1)
src/server/runtime.rs (1)
normalised(75-82)
src/server/config/binding.rs (1)
src/server/config/preamble.rs (1)
preamble_timeout(73-77)
src/server/connection.rs (2)
src/server/config/preamble.rs (1)
preamble_timeout(73-77)src/server/config/mod.rs (1)
new(68-81)
src/server/runtime.rs (2)
src/server/config/mod.rs (1)
new(68-81)src/server/connection.rs (3)
spawn_connection_task(18-55)spawn_connection_task(155-155)spawn_connection_task(205-205)
src/server/config/tests.rs (3)
src/server/test_util.rs (1)
factory(23-25)src/server/config/mod.rs (1)
new(68-81)src/server/config/preamble.rs (1)
preamble_timeout(73-77)
🔍 Remote MCP Deepwiki
Perfect! Now let me compile the findings into a concise summary.
Summary of Additional Context for PR #398 Review
Preamble Architecture & Related Work
The preamble support mechanism allows for optional connection preamble validation and negotiation as soon as a client connects using bincode to decode any type implementing bincode::BorrowDecode. The read_preamble function is an asynchronous function that reads bytes from the TcpStream and attempts to decode them using bincode::borrow_decode_from_slice, with DecodeError::UnexpectedEnd triggering additional reads until decoding succeeds.
This PR builds upon PR #28's preamble handler introduction. PR #28 introduced the PreambleSuccessHandler trait and PreambleHandler type alias, and updated spawn_connection_task to accept on_success and on_failure handlers as arguments. The current PR extends this by converting the synchronous failure handler to an async variant that receives the stream.
Connection Lifecycle & Error Handling
The connection lifecycle in spawn_connection_task wraps process_stream in std::panic::AssertUnwindSafe and catch_unwind() to isolate panics within individual connection tasks, preventing a single misbehaving connection from crashing the entire server. Following successful preamble processing, the TcpStream is wrapped in a RewindStream to handle any leftover bytes from the preamble reading.
DoS Protection & Resource Management Context
The server employs several DoS protection strategies including per-connection rate limiting using an asynchronous token-bucket algorithm, and memory caps and re-assembly timeouts for message re-assembly. A non-optional, configurable reassembly_timeout is applied to partial messages, preventing buffer filling by abandoned or slow-sent fragments. This PR adds a complementary preamble_timeout mechanism to bound the handshake phase specifically.
Timeout Implementation Patterns
Timeouts in the server are enforced using tokio::time::timeout around critical operations, with I/O timeouts applied during handshake phases to prevent resource exhaustion. The PR's use of tokio::time::timeout around read_preamble aligns with established timeout patterns in the codebase.
Async Callback Patterns
BoxFuture is a type alias for Pin<Box<dyn Future<Output = T> + Send>>, which allows for trait objects of futures enabling dynamic dispatch of asynchronous operations. The PreambleSuccessHandler<T> trait defines a handler that takes the decoded preamble T and a mutable TcpStream, returning a BoxFuture<'a, io::Result<()>> for asynchronous I/O operations. The PR's conversion of the failure handler to match this pattern maintains consistency with existing architectural patterns for async stream operations.
DecodeError Handling
During preamble reading, read_preamble handles DecodeError::UnexpectedEnd by attempting to read more bytes from the stream, while other DecodeError variants are returned directly. A custom DecodeError::Other("preamble too long") is returned if the preamble exceeds MAX_PREAMBLE_LEN (1024 bytes). The PR's timeout mechanism creates an additional error path via a synthetic timeout error, requiring proper integration with this error handling flow.
⏰ Context from checks skipped due to timeout of 120000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Sourcery review
- GitHub Check: build-test
🔇 Additional comments (27)
src/server/config/mod.rs (1)
77-77: LGTM!The new
preamble_timeoutfield is correctly initialized toNoneby default, ensuring backward compatibility. Clients must explicitly opt into timeout enforcement via the builder method.src/server/config/binding.rs (1)
44-44: LGTM!The
preamble_timeoutfield is correctly destructured and propagated through the binding transition fromUnboundtoBoundstate.Also applies to: 61-61
docs/hardening-wireframe-a-guide-to-production-resilience.md (1)
238-245: LGTM!Clear, concise documentation of the preamble timeout mechanism. The explanation correctly describes how timeout expiry routes through the asynchronous failure callback path, enabling protocol-specific error responses before socket closure.
docs/server/configuration.md (1)
85-112: LGTM!Well-documented section covering the preamble timeout and async failure handler. The code example clearly demonstrates the pattern, including the use of
.boxed()to return the requiredBoxFuturefrom the closure.docs/users-guide.md (1)
626-627: LGTM!Footnote correctly updated to reference both implementation files for the preamble handling feature.
src/server/config/tests.rs (5)
9-9: LGTM!Import additions support the new async failure handler tests requiring
io::Error,DecodeError, and Tokio networking types.Also applies to: 17-17, 19-19, 32-32
77-87: LGTM!Good coverage of the preamble timeout configuration, including verification of the 1ms minimum clamp for zero duration inputs.
142-150: LGTM!Failure handler registration correctly updated to the new two-argument async signature
(&DecodeError, &mut TcpStream) -> BoxFuture<Result<(), io::Error>>.
180-190: LGTM!Properly sets up a real TCP connection to test the async failure handler with stream access. The pattern correctly maintains the client connection lifetime via
_client.
214-214: LGTM!Minimal update to align with the new failure handler signature whilst maintaining focus on builder method chaining verification.
src/server/config/preamble.rs (3)
55-77: LGTM! Clean builder method for preamble timeout.The normalisation using
.max(Duration::from_millis(1))aligns with theBackoffConfig::normalised()pattern seen insrc/server/runtime.rs. Documentation is clear and the example demonstrates usage well.
105-132: Well-documented API change.The documentation clearly explains the new async signature and provides a practical example showing how to write a response before connection closure.
38-53: Timeout propagation is correct.The
preamble_timeoutis appropriately preserved when switching preamble types, as it represents server-wide configuration rather than preamble-specific behaviour.src/server/mod.rs (1)
142-146: Clear field documentation.The doc comment accurately describes the timeout semantics and normalisation behaviour.
src/server/connection.rs (4)
68-74: Timeout implementation is sound.The timeout wrapping around
read_preamblecorrectly handles both the timeout case (converting elapsed to a syntheticDecodeError) and the no-timeout case. The pattern aligns with established timeout usage in the codebase.
107-112: Appropriate error representation for timeout.Using
DecodeError::IowithErrorKind::TimedOutis the correct choice - it integrates cleanly with existing decode error handling paths and clearly communicates the timeout nature to failure handlers.
89-103: Correct async failure handler invocation.The handler is awaited and its errors are logged without propagation, which is appropriate for connection cleanup paths. The logging pattern mirrors the success handler path.
155-155: Test call site updated correctly.The additional
Noneparameter forpreamble_timeoutis correctly propagated to the test invocations.src/server/runtime.rs (3)
85-109: Good abstraction for bundling preamble configuration.
PreambleHooks<T>cleanly groups the related preamble callbacks and timeout. The manualCloneandDefaultimplementations are necessary since theArc<dyn ...>fields prevent deriving these traits automatically.
229-248: Clean refactor to usePreambleHooks.The bundling of preamble configuration into a single struct simplifies the API and makes the code more maintainable. The per-worker cloning is appropriate given the
Arc-wrapped handlers.
455-462: Tests correctly updated to usePreambleHooks::default().The test invocations properly use the new struct-based API.
tests/preamble.rs (6)
44-65: Helper function signature correctly updated.The
server_with_handlersfunction now accepts the new async failure handler signature, maintaining consistency with the API changes.
231-266: Good coverage for failure handler stream writing.This test validates the core new capability: writing a response before connection closure on preamble failure.
268-311: Comprehensive timeout behaviour test.The test validates:
- Timeout triggers after configured duration
- Failure handler receives
TimedOuterror kind- Connection closes after timeout
The handling of both graceful close (
Ok(0)) andConnectionResetis appropriate for cross-platform compatibility.
313-348: Good edge case: success handler without failure handler.This validates that the server functions correctly when only a success handler is configured, which is a valid use case.
375-390: Helper function correctly updated to async signature.The
failure_cbhelper now returns the expected boxed future, consistent with thesuccess_cbpattern.
155-166: Test callback correctly adapted to new signature.The failure callback closure is properly updated to the two-argument async form.
…ow with timeout Added a detailed sequence diagram to the preamble-validator documentation. The diagram illustrates the interaction between the accept loop, connection task, preamble decoding, timeout handling, success and failure callbacks, and handoff to the application. This enhances the understanding of the preamble processing flow and timeout management. Co-authored-by: terragon-labs[bot] <terragon-labs[bot]@users.noreply.github.com>
…ameter Changed spawn_connection_task and related calls to accept a single PreambleHooks struct instead of separate on_success, on_failure, and preamble_timeout parameters. This reduces parameter bloat and simplifies passing preamble-related handlers and timeout settings together. Updated relevant tests and documentation to reflect this change. Co-authored-by: terragon-labs[bot] <terragon-labs[bot]@users.noreply.github.com>
Summary
Changes
Core Functionality
API/Runtime Changes
Documentation
Tests
📎 Task: https://www.terragonlabs.com/task/dc3fcaf4-3dd0-4e73-8631-40364d875efd