diff --git a/Cargo.lock b/Cargo.lock index daf3fbc4..9d1f1dfc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2829,6 +2829,7 @@ dependencies = [ "rstest", "serde", "serial_test", + "thiserror 2.0.12", "tokio", "tokio-util", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 0fe37e31..7c34913e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ tracing = { version = "0.1.41", features = ["log", "log-always"] } tracing-subscriber = "0.3" metrics = { version = "0.24.2", optional = true } metrics-exporter-prometheus = { version = "0.17.2", optional = true, features = ["http-listener"] } +thiserror = "2.0.12" [dev-dependencies] rstest = "0.18.2" diff --git a/docs/asynchronous-outbound-messaging-design.md b/docs/asynchronous-outbound-messaging-design.md index 3b9d837e..6ad8bd67 100644 --- a/docs/asynchronous-outbound-messaging-design.md +++ b/docs/asynchronous-outbound-messaging-design.md @@ -38,13 +38,13 @@ design and possible refinements. See The implementation must satisfy the following core requirements: -| ID | Requirement | +| ID | Requirement | | --- | ------------------------------------------------------------------------------------------------------------------------------------------------------ | -| G1 | Any async task must be able to push frames to a live connection. | -| G2 | Ordering-safety: Pushed frames must interleave correctly with normal request/response traffic and respect any per-message sequencing rules. | -| G3 | Back-pressure: Writers must block (or fail fast) when the peer cannot drain the socket, preventing unbounded memory consumption. | -| G4 | Generic—independent of any particular protocol; usable by both servers and clients built on wireframe. | -| G5 | Preserve the simple “return a reply” path for code that does not need pushes, ensuring backward compatibility and low friction for existing users. | +| G1 | Any async task must be able to push frames to a live connection. | +| G2 | Ordering-safety: Pushed frames must interleave correctly with normal request/response traffic and respect any per-message sequencing rules. | +| G3 | Back-pressure: Writers must block (or fail fast) when the peer cannot drain the socket, preventing unbounded memory consumption. | +| G4 | Generic—independent of any particular protocol; usable by both servers and clients built on wireframe. | +| G5 | Preserve the simple “return a reply” path for code that does not need pushes, ensuring backward compatibility and low friction for existing users. | ## 3. Core Architecture: The Connection Actor @@ -69,7 +69,7 @@ manage two distinct, bounded `tokio::mpsc` channels for pushed frames: messages like heartbeats, session control notifications, or protocol-level pings. -1. `low_priority_push_rx: mpsc::Receiver`: For standard, non-urgent +2. `low_priority_push_rx: mpsc::Receiver`: For standard, non-urgent background messages like log forwarding or secondary status updates. The bounded nature of these channels provides an inherent and robust @@ -89,13 +89,13 @@ The polling order will be: 1. **Graceful Shutdown Signal:** The `CancellationToken` will be checked first to ensure immediate reaction to a server-wide shutdown request. -1. **High-Priority Push Channel:** Messages from `high_priority_push_rx` will be +2. **High-Priority Push Channel:** Messages from `high_priority_push_rx` will be drained next. -1. **Low-Priority Push Channel:** Messages from `low_priority_push_rx` will be +3. **Low-Priority Push Channel:** Messages from `low_priority_push_rx` will be processed after all high-priority messages. -1. **Handler Response Stream:** Frames from the active request's +4. **Handler Response Stream:** Frames from the active request's `Response::Stream` will be processed last. ```rust @@ -784,11 +784,11 @@ sequenceDiagram ## 8. Measurable Objectives & Success Criteria -| Category | Objective | Success Metric | +| Category | Objective | Success Metric | | --------------- | ------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| API Correctness | The PushHandle, SessionRegistry, and WireframeProtocol trait are implemented exactly as specified in this document. | 100% of the public API surface is present and correctly typed. | -| Functionality | Pushed frames are delivered reliably and in the correct order of priority. | A test with concurrent high-priority, low-priority, and streaming producers must show that all frames are delivered and that the final written sequence respects the strict priority order. | -| Back-pressure | A slow consumer must cause producer tasks to suspend without consuming unbounded memory. | A test with a slow consumer and a fast producer must show the producer's push().await call blocks, and the process memory usage remains stable. | -| Resilience | The SessionRegistry must not leak memory when connections are terminated. | A long-running test that creates and destroys thousands of connections must show no corresponding growth in the SessionRegistry's size or the process's overall memory footprint. | -| Performance | The overhead of the push mechanism should be minimal for connections that do not use it. | A benchmark of a simple request-response workload with the push feature enabled (but unused) should show < 2% performance degradation compared to a build without the feature. | -| Performance | The latency for a high-priority push under no contention should be negligible. | The time from push_high_priority().await returning to the frame being written to the socket buffer should be < 10µs. | +| API Correctness | The PushHandle, SessionRegistry, and WireframeProtocol trait are implemented exactly as specified in this document. | 100% of the public API surface is present and correctly typed. | +| Functionality | Pushed frames are delivered reliably and in the correct order of priority. | A test with concurrent high-priority, low-priority, and streaming producers must show that all frames are delivered and that the final written sequence respects the strict priority order. | +| Back-pressure | A slow consumer must cause producer tasks to suspend without consuming unbounded memory. | A test with a slow consumer and a fast producer must show the producer's push().await call blocks, and the process memory usage remains stable. | +| Resilience | The SessionRegistry must not leak memory when connections are terminated. | A long-running test that creates and destroys thousands of connections must show no corresponding growth in the SessionRegistry's size or the process's overall memory footprint. | +| Performance | The overhead of the push mechanism should be minimal for connections that do not use it. | A benchmark of a simple request-response workload with the push feature enabled (but unused) should show < 2% performance degradation compared to a build without the feature. | +| Performance | The latency for a high-priority push under no contention should be negligible. | The time from push_high_priority().await returning to the frame being written to the socket buffer should be < 10µs. | diff --git a/examples/echo.rs b/examples/echo.rs index 1791492c..89e4f3b9 100644 --- a/examples/echo.rs +++ b/examples/echo.rs @@ -3,18 +3,16 @@ //! The application listens for incoming frames and simply echoes each //! envelope back to the client. -use std::io; - use wireframe::{ app::{Envelope, WireframeApp}, - server::WireframeServer, + server::{ServerError, WireframeServer}, }; #[tokio::main] -async fn main() -> io::Result<()> { +async fn main() -> Result<(), ServerError> { let factory = || { WireframeApp::new() - .unwrap() + .expect("failed to create WireframeApp") .route( 1, std::sync::Arc::new(|_: &Envelope| { @@ -24,11 +22,12 @@ async fn main() -> io::Result<()> { }) }), ) - .unwrap() + .expect("failed to register route 1") }; WireframeServer::new(factory) - .bind("127.0.0.1:7878".parse().unwrap())? + .bind("127.0.0.1:7878".parse().expect("invalid socket address"))? .run() - .await + .await?; + Ok(()) } diff --git a/examples/packet_enum.rs b/examples/packet_enum.rs index ba4ecda1..ff53fd8b 100644 --- a/examples/packet_enum.rs +++ b/examples/packet_enum.rs @@ -3,7 +3,7 @@ //! The application defines an enum representing different packet variants and //! shows how to dispatch handlers based on the variant received. -use std::{collections::HashMap, future::Future, io, pin::Pin}; +use std::{collections::HashMap, future::Future, pin::Pin}; use async_trait::async_trait; use wireframe::{ @@ -11,7 +11,7 @@ use wireframe::{ frame::{LengthFormat, LengthPrefixedProcessor}, message::Message, middleware::{HandlerService, Service, ServiceRequest, ServiceResponse, Transform}, - server::WireframeServer, + server::{ServerError, WireframeServer}, }; #[derive(bincode::Encode, bincode::BorrowDecode, Debug)] @@ -76,7 +76,7 @@ fn handle_packet(_env: &Envelope) -> Pin + Send>> { } #[tokio::main] -async fn main() -> io::Result<()> { +async fn main() -> Result<(), ServerError> { let factory = || { WireframeApp::new() .expect("Failed to create WireframeApp") @@ -92,5 +92,6 @@ async fn main() -> io::Result<()> { WireframeServer::new(factory) .bind(addr.parse().expect("Invalid server address"))? .run() - .await + .await?; + Ok(()) } diff --git a/examples/ping_pong.rs b/examples/ping_pong.rs index ed39b217..14a401a3 100644 --- a/examples/ping_pong.rs +++ b/examples/ping_pong.rs @@ -3,7 +3,7 @@ //! Demonstrates custom packet structs and middleware that maps `Ping` to //! `Pong` responses. -use std::{io, net::SocketAddr, sync::Arc}; +use std::{net::SocketAddr, sync::Arc}; use async_trait::async_trait; use wireframe::{ @@ -11,7 +11,7 @@ use wireframe::{ message::Message, middleware::{HandlerService, Service, ServiceRequest, ServiceResponse, Transform}, serializer::BincodeSerializer, - server::WireframeServer, + server::{ServerError, WireframeServer}, }; #[derive(bincode::Encode, bincode::BorrowDecode, Debug)] @@ -136,15 +136,14 @@ fn build_app() -> AppResult { } #[tokio::main] -async fn main() -> io::Result<()> { +async fn main() -> Result<(), ServerError> { let factory = || build_app().expect("app build failed"); let default_addr = "127.0.0.1:7878"; let addr_str = std::env::args() .nth(1) .unwrap_or_else(|| default_addr.into()); - let addr: SocketAddr = addr_str - .parse() - .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?; - WireframeServer::new(factory).bind(addr)?.run().await + let addr: SocketAddr = addr_str.parse().expect("invalid address"); + WireframeServer::new(factory).bind(addr)?.run().await?; + Ok(()) } diff --git a/src/server/config/binding.rs b/src/server/config/binding.rs new file mode 100644 index 00000000..fdadc041 --- /dev/null +++ b/src/server/config/binding.rs @@ -0,0 +1,183 @@ +//! Binding configuration for [`WireframeServer`]. + +use core::marker::PhantomData; +use std::{ + net::{SocketAddr, TcpListener as StdTcpListener}, + sync::Arc, +}; + +use tokio::net::TcpListener; + +use super::{Unbound, WireframeServer}; +use crate::{ + app::WireframeApp, + preamble::Preamble, + server::{Bound, ServerError}, +}; + +impl WireframeServer +where + F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, + T: Preamble, +{ + /// Return `None` as the server is not bound. + /// + /// # Examples + /// + /// ``` + /// use wireframe::{app::WireframeApp, server::WireframeServer}; + /// + /// assert!( + /// WireframeServer::new(|| WireframeApp::default()) + /// .local_addr() + /// .is_none() + /// ); + /// ``` + #[must_use] + pub const fn local_addr(&self) -> Option { None } + + /// Bind to a fresh address. + /// + /// # Examples + /// + /// ``` + /// use std::net::{Ipv4Addr, SocketAddr}; + /// + /// use wireframe::{app::WireframeApp, server::WireframeServer}; + /// + /// let addr = SocketAddr::from((Ipv4Addr::LOCALHOST, 0)); + /// let server = WireframeServer::new(|| WireframeApp::default()) + /// .bind(addr) + /// .expect("bind failed"); + /// assert!(server.local_addr().is_some()); + /// ``` + /// + /// # Errors + /// Returns a [`ServerError`] if binding or configuring the listener fails. + pub fn bind(self, addr: SocketAddr) -> Result, ServerError> { + let std = StdTcpListener::bind(addr).map_err(ServerError::Bind)?; + self.bind_listener(std) + } + + /// Bind to an existing `StdTcpListener`. + /// + /// # Examples + /// + /// ``` + /// use std::net::{Ipv4Addr, SocketAddr, TcpListener as StdTcpListener}; + /// + /// use wireframe::{app::WireframeApp, server::WireframeServer}; + /// + /// let std = StdTcpListener::bind(SocketAddr::from((Ipv4Addr::LOCALHOST, 0))).unwrap(); + /// let server = WireframeServer::new(|| WireframeApp::default()) + /// .bind_listener(std) + /// .expect("bind failed"); + /// assert!(server.local_addr().is_some()); + /// ``` + /// + /// # Errors + /// Returns a [`ServerError`] if configuring the listener fails. + pub fn bind_listener( + self, + std: StdTcpListener, + ) -> Result, ServerError> { + std.set_nonblocking(true).map_err(ServerError::Bind)?; + let tokio = TcpListener::from_std(std).map_err(ServerError::Bind)?; + Ok(WireframeServer { + factory: self.factory, + workers: self.workers, + on_preamble_success: self.on_preamble_success, + on_preamble_failure: self.on_preamble_failure, + ready_tx: self.ready_tx, + state: Bound { + listener: Arc::new(tokio), + }, + _preamble: PhantomData, + }) + } +} + +impl WireframeServer +where + F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, + T: Preamble, +{ + /// Returns the bound address, or `None` if retrieving it fails. + /// + /// # Examples + /// + /// ``` + /// use std::net::SocketAddr; + /// + /// use wireframe::{app::WireframeApp, server::WireframeServer}; + /// + /// let addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); + /// let server = WireframeServer::new(|| WireframeApp::default()) + /// .bind(addr) + /// .expect("bind failed"); + /// assert!(server.local_addr().is_some()); + /// ``` + #[must_use] + pub fn local_addr(&self) -> Option { self.state.listener.local_addr().ok() } + + /// Rebind to a fresh address. + /// + /// # Examples + /// + /// ``` + /// use std::net::{Ipv4Addr, SocketAddr}; + /// + /// use wireframe::{app::WireframeApp, server::WireframeServer}; + /// + /// let addr = SocketAddr::from((Ipv4Addr::LOCALHOST, 0)); + /// let server = WireframeServer::new(|| WireframeApp::default()) + /// .bind(addr) + /// .expect("bind failed"); + /// let addr2 = SocketAddr::from((Ipv4Addr::LOCALHOST, 0)); + /// let server = server.bind(addr2).expect("rebind failed"); + /// assert!(server.local_addr().is_some()); + /// ``` + /// + /// # Errors + /// Returns a [`ServerError`] if binding or configuring the listener fails. + pub fn bind(self, addr: SocketAddr) -> Result { + let std = StdTcpListener::bind(addr).map_err(ServerError::Bind)?; + self.bind_listener(std) + } + + /// Rebind using an existing `StdTcpListener`. + /// + /// # Examples + /// + /// ``` + /// use std::net::{Ipv4Addr, SocketAddr, TcpListener as StdTcpListener}; + /// + /// use wireframe::{app::WireframeApp, server::WireframeServer}; + /// + /// let addr = SocketAddr::from((Ipv4Addr::LOCALHOST, 0)); + /// let server = WireframeServer::new(|| WireframeApp::default()) + /// .bind(addr) + /// .expect("bind failed"); + /// let std = StdTcpListener::bind(SocketAddr::from((Ipv4Addr::LOCALHOST, 0))).unwrap(); + /// let server = server.bind_listener(std).expect("rebind failed"); + /// assert!(server.local_addr().is_some()); + /// ``` + /// + /// # Errors + /// Returns a [`ServerError`] if configuring the listener fails. + pub fn bind_listener(self, std: StdTcpListener) -> Result { + std.set_nonblocking(true).map_err(ServerError::Bind)?; + let tokio = TcpListener::from_std(std).map_err(ServerError::Bind)?; + Ok(WireframeServer { + factory: self.factory, + workers: self.workers, + on_preamble_success: self.on_preamble_success, + on_preamble_failure: self.on_preamble_failure, + ready_tx: self.ready_tx, + state: Bound { + listener: Arc::new(tokio), + }, + _preamble: PhantomData, + }) + } +} diff --git a/src/server/config/mod.rs b/src/server/config/mod.rs index 39674a0f..fa5ce2da 100644 --- a/src/server/config/mod.rs +++ b/src/server/config/mod.rs @@ -1,37 +1,59 @@ //! Configuration utilities for [`WireframeServer`]. //! //! Provides a fluent builder for configuring `WireframeServer` instances. -//! The builder exposes worker count tuning, preamble callbacks, ready-signal -//! configuration, and TCP binding. The server holds an optional listener so it -//! may be constructed unbound and later bound via [`bind`](WireframeServer::bind). +//! The builder exposes worker count tuning and ready-signal configuration here. +//! TCP binding is provided via the [`binding`](self::binding) module; preamble +//! behaviour is customized via the [`preamble`](self::preamble) module. The +//! server may be constructed unbound and later bound using +//! [`bind`](WireframeServer::bind) or [`bind_listener`](WireframeServer::bind_listener) +//! on [`Unbound`] servers. use core::marker::PhantomData; -use std::{ - io, - net::{SocketAddr, TcpListener as StdTcpListener}, - sync::Arc, - time::Duration, -}; -use bincode::error::DecodeError; -use futures::future::BoxFuture; -use tokio::{net::TcpListener, sync::oneshot}; +use tokio::sync::oneshot; -use super::WireframeServer; +use super::{ServerState, Unbound, WireframeServer}; use crate::{app::WireframeApp, preamble::Preamble}; -#[cfg(test)] -mod tests; +macro_rules! builder_setter { + ($(#[$meta:meta])* $fn:ident, $field:ident, $arg:ident: $ty:ty => $assign:expr) => { + $(#[$meta])* + #[must_use] + pub fn $fn(mut self, $arg: $ty) -> Self { + self.$field = $assign; + self + } + }; +} + +macro_rules! builder_callback { + ($(#[$meta:meta])* $fn:ident, $field:ident, $($bound:tt)*) => { + $(#[$meta])* + #[must_use] + pub fn $fn(mut self, handler: H) -> Self + where + H: $($bound)*, + { + self.$field = Some(std::sync::Arc::new(handler)); + self + } + }; +} -impl WireframeServer +pub mod binding; +pub mod preamble; + +impl WireframeServer where F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, { /// Create a new `WireframeServer` from the given application factory. /// /// The worker count defaults to the number of available CPU cores (or 1 if - /// this cannot be determined). The TCP listener is unset; call - /// [`bind`](Self::bind) before running the server. + /// this cannot be determined). The server is initially [`Unbound`]; call + /// [`bind`](WireframeServer::bind) or + /// [`bind_listener`](WireframeServer::bind_listener) + /// (methods provided by the [`binding`](self::binding) module) before running the server. /// /// # Examples /// @@ -50,139 +72,51 @@ where on_preamble_success: None, on_preamble_failure: None, ready_tx: None, - listener: None, - backoff_config: super::runtime::BackoffConfig::default(), + state: Unbound, _preamble: PhantomData, } } } -impl WireframeServer +impl WireframeServer where F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, T: Preamble, + S: ServerState, { - /// Converts the server to use a custom preamble type for incoming - /// connections. - /// - /// Calling this method drops any previously configured preamble decode - /// callbacks. - /// - /// # Examples - /// - /// ``` - /// use bincode::{Decode, Encode}; - /// use wireframe::{app::WireframeApp, preamble::Preamble, server::WireframeServer}; - /// - /// #[derive(Encode, Decode)] - /// struct MyPreamble; - /// impl Preamble for MyPreamble {} - /// - /// let server = WireframeServer::new(|| WireframeApp::default()).with_preamble::(); - /// ``` - #[must_use] - pub fn with_preamble

(self) -> WireframeServer - where - P: Preamble, - { - WireframeServer { - factory: self.factory, - workers: self.workers, - on_preamble_success: None, - on_preamble_failure: None, - ready_tx: None, - listener: self.listener, - backoff_config: self.backoff_config, - _preamble: PhantomData, - } - } - - /// Set the number of worker tasks to spawn for the server. - /// - /// # Examples - /// - /// ``` - /// use wireframe::{app::WireframeApp, server::WireframeServer}; - /// - /// let server = WireframeServer::new(|| WireframeApp::default()).workers(4); - /// assert_eq!(server.worker_count(), 4); - /// ``` - #[must_use] - pub fn workers(mut self, count: usize) -> Self { - self.workers = count.max(1); - self - } - - /// Register a callback invoked when the connection preamble decodes successfully. - /// - /// # Examples - /// - /// ``` - /// use std::sync::Arc; - /// - /// use bincode::{Decode, Encode}; - /// use futures::FutureExt; - /// use wireframe::{app::WireframeApp, preamble::Preamble, server::WireframeServer}; - /// - /// #[derive(Encode, Decode)] - /// struct MyPreamble; - /// impl Preamble for MyPreamble {} - /// - /// let server = WireframeServer::new(|| WireframeApp::default()) - /// .with_preamble::() - /// .on_preamble_decode_success(|_preamble: &MyPreamble, _stream| async { Ok(()) }.boxed()); - /// ``` - #[must_use] - pub fn on_preamble_decode_success(mut self, handler: H) -> Self - where - H: for<'a> Fn(&'a T, &'a mut tokio::net::TcpStream) -> BoxFuture<'a, io::Result<()>> - + Send - + Sync - + 'static, - { - self.on_preamble_success = Some(Arc::new(handler)); - self - } - - /// Register a callback invoked when the connection preamble fails to decode. - /// - /// # Examples - /// - /// ``` - /// use bincode::error::DecodeError; - /// use wireframe::{app::WireframeApp, server::WireframeServer}; - /// - /// let server = WireframeServer::new(|| WireframeApp::default()).on_preamble_decode_failure( - /// |_error: &DecodeError| { - /// eprintln!("Failed to decode preamble"); - /// }, - /// ); - /// ``` - #[must_use] - pub fn on_preamble_decode_failure(mut self, handler: H) -> Self - where - H: Fn(&DecodeError) + Send + Sync + 'static, - { - self.on_preamble_failure = Some(Arc::new(handler)); - self - } - - /// Configure a channel used to signal when the server is ready to accept connections. - /// - /// # Examples - /// - /// ``` - /// use tokio::sync::oneshot; - /// use wireframe::{app::WireframeApp, server::WireframeServer}; - /// - /// let (tx, _rx) = oneshot::channel(); - /// let server = WireframeServer::new(|| WireframeApp::default()).ready_signal(tx); - /// ``` - #[must_use] - pub fn ready_signal(mut self, tx: oneshot::Sender<()>) -> Self { - self.ready_tx = Some(tx); - self - } + builder_setter!( + /// Set the number of worker tasks to spawn for the server. + /// + /// A minimum of one worker is enforced. + /// + /// # Examples + /// + /// ``` + /// use wireframe::{app::WireframeApp, server::WireframeServer}; + /// + /// let server = WireframeServer::new(|| WireframeApp::default()).workers(4); + /// assert_eq!(server.worker_count(), 4); + /// + /// let server = WireframeServer::new(|| WireframeApp::default()).workers(0); + /// assert_eq!(server.worker_count(), 1); + /// ``` + workers, workers, count: usize => count.max(1) + ); + + builder_setter!( + /// Configure a channel used to signal when the server is ready to accept connections. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::oneshot; + /// use wireframe::{app::WireframeApp, server::WireframeServer}; + /// + /// let (tx, _rx) = oneshot::channel(); + /// let server = WireframeServer::new(|| WireframeApp::default()).ready_signal(tx); + /// ``` + ready_signal, ready_tx, tx: oneshot::Sender<()> => Some(tx) + ); /// Returns the configured number of worker tasks for the server. /// @@ -197,142 +131,4 @@ where #[inline] #[must_use] pub const fn worker_count(&self) -> usize { self.workers } - - /// Returns the bound address, or `None` if not yet bound. - /// - /// # Examples - /// - /// ``` - /// use std::net::{Ipv4Addr, SocketAddr}; - /// - /// use wireframe::{app::WireframeApp, server::WireframeServer}; - /// - /// let server = WireframeServer::new(|| WireframeApp::default()) - /// .bind(SocketAddr::from((Ipv4Addr::LOCALHOST, 0))) - /// .expect("Failed to bind"); - /// assert!(server.local_addr().is_some()); - /// ``` - #[must_use] - pub fn local_addr(&self) -> Option { - self.listener.as_ref().and_then(|l| l.local_addr().ok()) - } - - /// Bind to a fresh address. - /// - /// # Examples - /// - /// ``` - /// use std::net::{Ipv4Addr, SocketAddr}; - /// - /// use wireframe::{app::WireframeApp, server::WireframeServer}; - /// - /// let server = WireframeServer::new(|| WireframeApp::default()) - /// .bind(SocketAddr::from((Ipv4Addr::LOCALHOST, 0))); - /// assert!(server.is_ok()); - /// ``` - /// - /// # Errors - /// Returns an `io::Error` if binding or configuring the listener fails. - pub fn bind(self, addr: SocketAddr) -> io::Result { - let std = StdTcpListener::bind(addr)?; - self.bind_listener(std) - } - - /// Bind to an existing `StdTcpListener`. - /// - /// # Examples - /// - /// ``` - /// use std::net::{Ipv4Addr, SocketAddr, TcpListener as StdTcpListener}; - /// - /// use wireframe::{app::WireframeApp, server::WireframeServer}; - /// - /// let std_listener = StdTcpListener::bind(SocketAddr::from((Ipv4Addr::LOCALHOST, 0))) - /// .expect("Failed to bind std listener"); - /// let server = WireframeServer::new(|| WireframeApp::default()).bind_listener(std_listener); - /// assert!(server.is_ok()); - /// ``` - /// - /// # Errors - /// Returns an `io::Error` if configuring the listener fails. - pub fn bind_listener(mut self, std: StdTcpListener) -> io::Result { - std.set_nonblocking(true)?; - let tokio = TcpListener::from_std(std)?; - self.listener = Some(Arc::new(tokio)); - Ok(self) - } - - /// Configure the exponential backoff parameters for accept loop retries. - /// - /// # Behaviour - /// - If `initial_delay > max_delay`, the values are swapped. - /// - `initial_delay` is clamped to at least 1 millisecond. - /// - `max_delay` is raised to be at least `initial_delay` to preserve invariants. - /// - /// # Examples - /// - /// ``` - /// use std::time::Duration; - /// - /// use wireframe::{app::WireframeApp, server::WireframeServer}; - /// - /// let server = WireframeServer::new(|| WireframeApp::default()) - /// .accept_backoff(Duration::from_millis(5), Duration::from_millis(500)); - /// ``` - #[must_use] - pub fn accept_backoff(mut self, initial_delay: Duration, max_delay: Duration) -> Self { - let (mut a, mut b) = (initial_delay, max_delay); - if a > b { - core::mem::swap(&mut a, &mut b); - } - let init = a.max(Duration::from_millis(1)); - let maxd = b.max(init); - - self.backoff_config.initial_delay = init; - self.backoff_config.max_delay = maxd; - self - } - - /// Configure the initial delay for accept loop retries. - /// - /// # Examples - /// - /// ``` - /// use std::time::Duration; - /// - /// use wireframe::{app::WireframeApp, server::WireframeServer}; - /// - /// let server = WireframeServer::new(|| WireframeApp::default()) - /// .accept_initial_delay(Duration::from_millis(5)); - /// ``` - #[must_use] - pub fn accept_initial_delay(mut self, delay: Duration) -> Self { - self.backoff_config.initial_delay = delay.max(Duration::from_millis(1)); - if self.backoff_config.initial_delay > self.backoff_config.max_delay { - self.backoff_config.max_delay = self.backoff_config.initial_delay; - } - self - } - - /// Configure the maximum delay cap for accept loop retries. - /// - /// # Examples - /// - /// ``` - /// use std::time::Duration; - /// - /// use wireframe::{app::WireframeApp, server::WireframeServer}; - /// - /// let server = WireframeServer::new(|| WireframeApp::default()) - /// .accept_max_delay(Duration::from_millis(500)); - /// ``` - #[must_use] - pub fn accept_max_delay(mut self, delay: Duration) -> Self { - if delay < self.backoff_config.initial_delay { - self.backoff_config.max_delay = self.backoff_config.initial_delay; - } else { - self.backoff_config.max_delay = delay; - } - self - } } diff --git a/src/server/config/preamble.rs b/src/server/config/preamble.rs new file mode 100644 index 00000000..308b8f1c --- /dev/null +++ b/src/server/config/preamble.rs @@ -0,0 +1,99 @@ +//! Preamble configuration for [`WireframeServer`]. + +use core::marker::PhantomData; + +use bincode::error::DecodeError; + +use super::WireframeServer; +use crate::{ + app::WireframeApp, + preamble::Preamble, + server::{PreambleSuccessHandler, ServerState}, +}; + +impl WireframeServer +where + F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, + T: Preamble, + S: ServerState, +{ + /// Converts the server to use a custom preamble type implementing + /// [`crate::preamble::Preamble`] for incoming connections. + /// + /// Calling this method drops any previously configured preamble handlers + /// (both success and failure). + /// + /// # Examples + /// + /// ``` + /// use bincode::{Decode, Encode}; + /// use wireframe::{app::WireframeApp, preamble::Preamble, server::WireframeServer}; + /// + /// #[derive(Encode, Decode)] + /// struct MyPreamble; + /// impl Preamble for MyPreamble {} + /// + /// let server = WireframeServer::new(|| WireframeApp::default()).with_preamble::(); + /// ``` + #[must_use] + pub fn with_preamble

(self) -> WireframeServer + where + P: Preamble, + { + WireframeServer { + factory: self.factory, + workers: self.workers, + on_preamble_success: None, + on_preamble_failure: None, + ready_tx: self.ready_tx, + state: self.state, + _preamble: PhantomData, + } + } + + builder_callback!( + /// Register a handler invoked when the connection preamble decodes successfully. + /// + /// The handler must implement [`crate::server::PreambleSuccessHandler`]. + /// + /// # Examples + /// + /// ``` + /// use bincode::{Decode, Encode}; + /// use futures::FutureExt; + /// use wireframe::{app::WireframeApp, preamble::Preamble, server::WireframeServer}; + /// + /// #[derive(Encode, Decode)] + /// struct MyPreamble; + /// impl Preamble for MyPreamble {} + /// + /// let server = WireframeServer::new(|| WireframeApp::default()) + /// .with_preamble::() + /// .on_preamble_decode_success(|_p: &MyPreamble, _s| async { Ok(()) }.boxed()); + /// ``` + on_preamble_decode_success, + on_preamble_success, + PreambleSuccessHandler + ); + + builder_callback!( + /// Register a handler invoked when the connection preamble fails to decode. + /// + /// The handler receives a [`bincode::error::DecodeError`]. + /// + /// # Examples + /// + /// ``` + /// use wireframe::{app::WireframeApp, server::WireframeServer}; + /// + /// let server = WireframeServer::new(|| WireframeApp::default()).on_preamble_decode_failure( + /// |_err: &bincode::error::DecodeError| { + /// eprintln!("Failed to decode preamble"); + /// }, + /// ); + /// ``` + on_preamble_decode_failure, + on_preamble_failure, + Fn(&DecodeError) + Send + Sync + 'static + ); +} diff --git a/src/server/config/tests.rs b/src/server/config/tests.rs index c1aa7eb4..177930a1 100644 --- a/src/server/config/tests.rs +++ b/src/server/config/tests.rs @@ -1,7 +1,7 @@ //! Tests for server configuration utilities. //! //! This module exercises the `WireframeServer` builder, covering worker counts, -//! binding behaviour, preamble handling, callback registration, and method +//! binding behaviour, preamble handling, handler registration, and method //! chaining. Fixtures from `test_util` provide shared setup and parameterised //! cases via `rstest`. @@ -95,15 +95,15 @@ async fn test_local_addr_after_bind( #[case("success")] #[case("failure")] #[tokio::test] -async fn test_preamble_callback_registration( +async fn test_preamble_handler_registration( factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, - #[case] callback_type: &str, + #[case] handler_type: &str, ) { let counter = Arc::new(AtomicUsize::new(0)); let c = counter.clone(); let server = server_with_preamble(factory); - let server = match callback_type { + let server = match handler_type { "success" => server.on_preamble_decode_success(move |_p: &TestPreamble, _| { let c = c.clone(); Box::pin(async move { @@ -114,11 +114,11 @@ async fn test_preamble_callback_registration( "failure" => server.on_preamble_decode_failure(move |_err: &DecodeError| { c.fetch_add(1, Ordering::SeqCst); }), - _ => panic!("Invalid callback type"), + _ => panic!("Invalid handler type"), }; assert_eq!(counter.load(Ordering::SeqCst), 0); - match callback_type { + match handler_type { "success" => assert!(server.on_preamble_success.is_some()), "failure" => assert!(server.on_preamble_failure.is_some()), _ => unreachable!(), @@ -131,8 +131,8 @@ async fn test_method_chaining( factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, free_port: SocketAddr, ) { - let callback_invoked = Arc::new(AtomicUsize::new(0)); - let counter = callback_invoked.clone(); + let handler_invoked = Arc::new(AtomicUsize::new(0)); + let counter = handler_invoked.clone(); let server = WireframeServer::new(factory) .workers(2) .with_preamble::() @@ -148,7 +148,7 @@ async fn test_method_chaining( .expect("Failed to bind"); assert_eq!(server.worker_count(), 2); assert!(server.local_addr().is_some()); - assert_eq!(callback_invoked.load(Ordering::SeqCst), 0); + assert_eq!(handler_invoked.load(Ordering::SeqCst), 0); } #[rstest] diff --git a/src/server/connection.rs b/src/server/connection.rs index 503640d8..dae0c8d0 100644 --- a/src/server/connection.rs +++ b/src/server/connection.rs @@ -6,7 +6,7 @@ use futures::FutureExt; use tokio::net::TcpStream; use tokio_util::task::TaskTracker; -use super::{PreambleCallback, PreambleErrorCallback}; +use super::{PreambleErrorHandler, PreambleHandler}; use crate::{ app::WireframeApp, preamble::{Preamble, read_preamble}, @@ -17,8 +17,8 @@ use crate::{ pub(super) fn spawn_connection_task( stream: TcpStream, factory: F, - on_success: Option>, - on_failure: Option, + on_success: Option>, + on_failure: Option, tracker: &TaskTracker, ) where F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, @@ -53,8 +53,8 @@ async fn process_stream( mut stream: TcpStream, peer_addr: Option, factory: F, - on_success: Option>, - on_failure: Option, + on_success: Option>, + on_failure: Option, ) where F: Fn() -> WireframeApp + Send + Sync + 'static, T: Preamble, @@ -64,7 +64,7 @@ async fn process_stream( if let Some(handler) = on_success.as_ref() && let Err(e) = handler(&preamble, &mut stream).await { - tracing::error!(error = ?e, ?peer_addr, "preamble callback error"); + tracing::error!(error = ?e, ?peer_addr, "preamble handler error"); } let stream = RewindStream::new(leftover, stream); let app = (factory)(); diff --git a/src/server/error.rs b/src/server/error.rs new file mode 100644 index 00000000..1da12fae --- /dev/null +++ b/src/server/error.rs @@ -0,0 +1,17 @@ +//! Errors raised by [`WireframeServer`] operations. + +use std::io; + +use thiserror::Error; + +/// Errors that may occur while configuring or running the server. +#[derive(Debug, Error)] +pub enum ServerError { + /// Binding or configuring the listener failed. + #[error("bind error: {0}")] + Bind(#[source] io::Error), + + /// Accepting a connection failed. + #[error("accept error: {0}")] + Accept(#[source] io::Error), +} diff --git a/src/server/mod.rs b/src/server/mod.rs index ee4f0678..1177e77d 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -13,37 +13,99 @@ use tokio::{net::TcpListener, sync::oneshot}; use crate::{app::WireframeApp, preamble::Preamble}; -/// Callback invoked when a connection preamble decodes successfully. +/// Handler invoked when a connection preamble decodes successfully. /// -/// The callback may perform asynchronous I/O on the provided stream before the +/// Implementors may perform asynchronous I/O on the provided stream before the /// connection is handed off to [`WireframeApp`]. -pub type PreambleCallback = Arc< - dyn for<'a> Fn(&'a T, &'a mut tokio::net::TcpStream) -> BoxFuture<'a, io::Result<()>> +/// +/// # Examples +/// ```no_run +/// use std::io; +/// +/// use futures::future::BoxFuture; +/// use tokio::net::TcpStream; +/// use wireframe::{app::WireframeApp, server::WireframeServer}; +/// +/// #[derive(bincode::Decode, bincode::BorrowDecode)] +/// struct MyPreamble; +/// +/// let _server = WireframeServer::new(|| WireframeApp::default()) +/// .with_preamble::() +/// .on_preamble_decode_success( +/// |_preamble: &MyPreamble, stream: &mut TcpStream| -> BoxFuture<'_, io::Result<()>> { +/// Box::pin(async move { +/// // Perform any initial handshake here. +/// Ok(()) +/// }) +/// }, +/// ); +/// ``` +pub trait PreambleSuccessHandler: + for<'a> Fn(&'a T, &'a mut tokio::net::TcpStream) -> BoxFuture<'a, io::Result<()>> + + Send + + Sync + + 'static +{ +} + +impl PreambleSuccessHandler for F where + F: for<'a> Fn(&'a T, &'a mut tokio::net::TcpStream) -> BoxFuture<'a, io::Result<()>> + Send - + Sync, ->; + + Sync + + 'static +{ +} -/// Callback invoked when decoding a connection preamble fails. -pub type PreambleErrorCallback = Arc; +/// Handler invoked when a connection preamble decodes successfully. +pub type PreambleHandler = Arc>; -/// Tokio-based server for `WireframeApp` instances. +/// Handler invoked when decoding a connection preamble fails. +pub type PreambleErrorHandler = Arc; + +/// Tokio-based server for [`WireframeApp`] instances. /// -/// `WireframeServer` spawns a worker task per thread. Each worker -/// receives its own `WireframeApp` from the provided factory +/// The server carries a typestate `S` indicating whether it is +/// [`Unbound`] (not yet bound to a TCP listener) or [`Bound`]. New +/// servers start `Unbound` and must call [`binding::WireframeServer::bind`] or +/// [`binding::WireframeServer::bind_listener`] before running. A worker task is spawned per +/// thread; each receives its own `WireframeApp` from the provided factory /// closure. The server listens for a shutdown signal using -/// `tokio::signal::ctrl_c` and notifies all workers to stop -/// accepting new connections. -pub struct WireframeServer +/// `tokio::signal::ctrl_c` and notifies all workers to stop accepting new +/// connections. +/// +/// # Examples +/// ```no_run +/// use wireframe::{ +/// app::WireframeApp, +/// server::{ServerError, WireframeServer}, +/// }; +/// +/// # #[tokio::main] +/// # async fn main() -> Result<(), ServerError> { +/// // Start unbound (S = Unbound) +/// let srv = WireframeServer::new(|| WireframeApp::default()); +/// +/// // Transition to bound (S = Bound) +/// let srv = srv.bind("127.0.0.1:0")?; +/// +/// // Run the server +/// srv.run().await +/// # } +/// ``` +pub struct WireframeServer where F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, // `Preamble` covers types implementing `BorrowDecode` for any lifetime, // enabling decoding of borrowed data without external context. + // `()` satisfies this bound via bincode's `BorrowDecode` support for unit, + // so servers default to having no preamble. T: Preamble, + S: ServerState, { pub(crate) factory: F, pub(crate) workers: usize, - pub(crate) on_preamble_success: Option>, - pub(crate) on_preamble_failure: Option, + pub(crate) on_preamble_success: Option>, + pub(crate) on_preamble_failure: Option, /// Channel used to notify when the server is ready. /// /// # Thread Safety @@ -57,15 +119,41 @@ where /// Because only one notification may be sent, a new `ready_tx` must be /// provided each time the server is started. pub(crate) ready_tx: Option>, - pub(crate) listener: Option>, - /// Configuration for exponential backoff when `accept()` fails. - /// Defaults to 10ms initial delay with 1s maximum. - pub(crate) backoff_config: runtime::BackoffConfig, + /// Typestate tracking whether the server has been bound to a listener. + /// [`Unbound`] servers require binding before they can run. + pub(crate) state: S, pub(crate) _preamble: PhantomData, } +/// Marker indicating the server has not yet bound a listener. +#[derive(Debug, Clone, Copy, Default)] +pub struct Unbound; + +/// Marker indicating the server is bound to a TCP listener. +#[derive(Debug, Clone)] +pub struct Bound { + pub(crate) listener: Arc, +} + +/// Trait implemented by [`Unbound`] and [`Bound`] to model binding typestate. +pub trait ServerState: sealed::Sealed {} + +mod sealed { + //! Prevent external implementations of [`ServerState`]. + + pub trait Sealed {} + impl Sealed for super::Unbound {} + impl Sealed for super::Bound {} +} + +impl ServerState for Unbound {} +impl ServerState for Bound {} + mod config; +pub use config::{binding, preamble}; mod connection; +pub mod error; +pub use error::ServerError; mod runtime; /// Re-exported configuration types for server backoff behavior. diff --git a/src/server/runtime.rs b/src/server/runtime.rs index f5345174..d8b12230 100644 --- a/src/server/runtime.rs +++ b/src/server/runtime.rs @@ -1,6 +1,6 @@ //! Runtime control for [`WireframeServer`]. -use std::{io, sync::Arc}; +use std::sync::Arc; use futures::Future; use tokio::{ @@ -12,8 +12,10 @@ use tokio::{ use tokio_util::{sync::CancellationToken, task::TaskTracker}; use super::{ - PreambleCallback, - PreambleErrorCallback, + Bound, + PreambleErrorHandler, + PreambleHandler, + ServerError, WireframeServer, connection::spawn_connection_task, }; @@ -49,7 +51,7 @@ impl Default for BackoffConfig { } } -impl WireframeServer +impl WireframeServer where F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, T: Preamble, @@ -64,7 +66,7 @@ where /// use wireframe::{app::WireframeApp, server::WireframeServer}; /// /// # #[tokio::main] - /// # async fn main() -> Result<(), Box> { + /// # async fn main() -> Result<(), wireframe::server::ServerError> { /// let server = /// WireframeServer::new(|| WireframeApp::default()).bind(([127, 0, 0, 1], 8080).into())?; /// server.run().await?; @@ -74,8 +76,8 @@ where /// /// # Errors /// - /// Returns an [`io::Error`] if accepting a connection fails. - pub async fn run(self) -> io::Result<()> { + /// Returns a [`ServerError`] if runtime initialisation fails. + pub async fn run(self) -> Result<(), ServerError> { self.run_with_shutdown(async { let _ = signal::ctrl_c().await; }) @@ -91,7 +93,7 @@ where /// use wireframe::{app::WireframeApp, server::WireframeServer}; /// /// # #[tokio::main] - /// # async fn main() -> Result<(), Box> { + /// # async fn main() -> Result<(), wireframe::server::ServerError> { /// let server = /// WireframeServer::new(|| WireframeApp::default()).bind(([127, 0, 0, 1], 0).into())?; /// @@ -113,8 +115,8 @@ where /// /// # Errors /// - /// Returns an [`io::Error`] if accepting a connection fails during runtime. - pub async fn run_with_shutdown(self, shutdown: S) -> io::Result<()> + /// Returns a [`ServerError`] if runtime initialisation fails. + pub async fn run_with_shutdown(self, shutdown: S) -> Result<(), ServerError> where S: Future + Send, { @@ -124,13 +126,10 @@ where on_preamble_success, on_preamble_failure, ready_tx, - listener, - backoff_config, + state: Bound { listener }, .. } = self; - let listener = listener.ok_or_else(|| io::Error::other("listener not bound"))?; - if let Some(tx) = ready_tx && tx.send(()).is_err() { @@ -154,7 +153,7 @@ where on_failure, token, t, - backoff_config, + BackoffConfig::default(), )); } @@ -172,8 +171,8 @@ where pub(super) async fn accept_loop( listener: Arc, factory: F, - on_success: Option>, - on_failure: Option, + on_success: Option>, + on_failure: Option, shutdown: CancellationToken, tracker: TaskTracker, backoff_config: BackoffConfig, diff --git a/src/server/test_util.rs b/src/server/test_util.rs index e93bc38e..5c12657e 100644 --- a/src/server/test_util.rs +++ b/src/server/test_util.rs @@ -5,9 +5,14 @@ use std::net::{Ipv4Addr, SocketAddr}; use bincode::{Decode, Encode}; use rstest::fixture; -use super::WireframeServer; +use super::{Bound, WireframeServer}; use crate::app::WireframeApp; +#[cfg_attr( + not(test), + expect(dead_code, reason = "Used in builder tests via fixtures") +)] +#[cfg_attr(test, allow(dead_code, reason = "Used in builder tests via fixtures"))] #[derive(Debug, Clone, PartialEq, Encode, Decode)] pub struct TestPreamble { pub id: u32, @@ -28,7 +33,7 @@ pub fn free_port() -> SocketAddr { .expect("failed to read free port listener address") } -pub fn bind_server(factory: F, addr: SocketAddr) -> WireframeServer +pub fn bind_server(factory: F, addr: SocketAddr) -> WireframeServer where F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, { @@ -37,6 +42,11 @@ where .expect("Failed to bind") } +#[cfg_attr( + not(test), + expect(dead_code, reason = "Only used in configuration tests") +)] +#[cfg_attr(test, allow(dead_code, reason = "Only used in configuration tests"))] pub fn server_with_preamble(factory: F) -> WireframeServer where F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, diff --git a/tests/preamble.rs b/tests/preamble.rs index d2703b00..1671da06 100644 --- a/tests/preamble.rs +++ b/tests/preamble.rs @@ -1,6 +1,9 @@ //! Tests for connection preamble reading. -use std::io; +use std::{ + io, + sync::{Arc, Mutex}, +}; use bincode::error::DecodeError; use futures::future::BoxFuture; @@ -170,7 +173,7 @@ async fn server_triggers_expected_callback( .expect("success send"); assert_eq!(preamble.magic, HotlinePreamble::MAGIC); assert!( - timeout(Duration::from_millis(100), failure_rx) + timeout(Duration::from_millis(500), failure_rx) .await .is_err() ); @@ -181,7 +184,7 @@ async fn server_triggers_expected_callback( .expect("timeout waiting for failure") .expect("failure send"); assert!( - timeout(Duration::from_millis(100), success_rx) + timeout(Duration::from_millis(500), success_rx) .await .is_err() ); @@ -216,3 +219,92 @@ async fn success_callback_can_write_response( }) .await; } + +#[derive(Debug, Clone, Copy, PartialEq, Eq, bincode::Encode, bincode::Decode)] +struct OtherPreamble(u8); + +type Holder = Arc>>>; + +fn channel_holder() -> (Holder, oneshot::Receiver<()>) { + let (tx, rx) = oneshot::channel(); + (Arc::new(Mutex::new(Some(tx))), rx) +} + +fn success_cb

( + holder: Arc>>>, +) -> impl for<'a> Fn(&'a P, &'a mut TcpStream) -> BoxFuture<'a, io::Result<()>> + Send + Sync + 'static +{ + move |_, _| { + let holder = holder.clone(); + Box::pin(async move { + if let Some(tx) = holder.lock().expect("lock").take() { + let _ = tx.send(()); + } + Ok(()) + }) + } +} + +fn failure_cb( + holder: Arc>>>, +) -> impl Fn(&DecodeError) + Send + Sync + 'static { + move |_| { + if let Some(tx) = holder.lock().expect("lock").take() { + let _ = tx.send(()); + } + } +} + +#[rstest] +#[tokio::test] +async fn callbacks_dropped_when_overriding_preamble( + factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, +) { + let (hotline_success, hotline_success_rx) = channel_holder(); + let (hotline_failure, hotline_failure_rx) = channel_holder(); + let (other_success, other_success_rx) = channel_holder(); + let (other_failure, other_failure_rx) = channel_holder(); + + let server = WireframeServer::new(factory.clone()) + .with_preamble::() + .on_preamble_decode_success(success_cb::(hotline_success.clone())) + .on_preamble_decode_failure(failure_cb(hotline_failure.clone())) + .with_preamble::() + .on_preamble_decode_success(success_cb::(other_success.clone())) + .on_preamble_decode_failure(failure_cb(other_failure.clone())); + + with_running_server(server, |addr| async move { + let mut stream = TcpStream::connect(addr).await.expect("connect failed"); + let config = bincode::config::standard() + .with_big_endian() + .with_fixed_int_encoding(); + let mut bytes = bincode::encode_to_vec(OtherPreamble(1), config).expect("encode preamble"); + bytes.resize(8, 0); + stream.write_all(&bytes).await.expect("write failed"); + stream.shutdown().await.expect("shutdown failed"); + // Wait for the success callback before shutting down the server. + timeout(Duration::from_secs(1), other_success_rx) + .await + .expect("timeout waiting for other success") + .expect("other success send"); + }) + .await; + assert!( + timeout(Duration::from_millis(500), other_failure_rx) + .await + .is_err(), + "other failure callback invoked", + ); + assert!( + timeout(Duration::from_millis(500), hotline_success_rx) + .await + .is_err(), + "hotline success callback invoked", + ); + assert!( + timeout(Duration::from_millis(500), hotline_failure_rx) + .await + .is_err(), + "hotline failure callback invoked", + ); +}