From 3daf18a3df628cde9416229c5b58c7365153ad9d Mon Sep 17 00:00:00 2001 From: Leynos Date: Wed, 6 Aug 2025 23:26:41 +0100 Subject: [PATCH 01/20] Split config into submodules --- Cargo.lock | 1 + Cargo.toml | 1 + .../asynchronous-outbound-messaging-design.md | 34 +-- examples/echo.rs | 7 +- examples/metadata_routing.rs | 2 +- examples/packet_enum.rs | 7 +- examples/ping_pong.rs | 11 +- src/server/config/binding.rs | 177 ++++++++++++++ src/server/config/mod.rs | 218 +----------------- src/server/config/preamble.rs | 101 ++++++++ src/server/error.rs | 13 ++ src/server/mod.rs | 23 +- src/server/runtime.rs | 19 +- src/server/test_util.rs | 6 +- 14 files changed, 366 insertions(+), 254 deletions(-) create mode 100644 src/server/config/binding.rs create mode 100644 src/server/config/preamble.rs create mode 100644 src/server/error.rs diff --git a/Cargo.lock b/Cargo.lock index daf3fbc4..81d2f0c7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2829,6 +2829,7 @@ dependencies = [ "rstest", "serde", "serial_test", + "thiserror 1.0.69", "tokio", "tokio-util", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 0fe37e31..cd50facd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ tracing = { version = "0.1.41", features = ["log", "log-always"] } tracing-subscriber = "0.3" metrics = { version = "0.24.2", optional = true } metrics-exporter-prometheus = { version = "0.17.2", optional = true, features = ["http-listener"] } +thiserror = "1.0.69" [dev-dependencies] rstest = "0.18.2" diff --git a/docs/asynchronous-outbound-messaging-design.md b/docs/asynchronous-outbound-messaging-design.md index 3b9d837e..6ad8bd67 100644 --- a/docs/asynchronous-outbound-messaging-design.md +++ b/docs/asynchronous-outbound-messaging-design.md @@ -38,13 +38,13 @@ design and possible refinements. See The implementation must satisfy the following core requirements: -| ID | Requirement | +| ID | Requirement | | --- | ------------------------------------------------------------------------------------------------------------------------------------------------------ | -| G1 | Any async task must be able to push frames to a live connection. | -| G2 | Ordering-safety: Pushed frames must interleave correctly with normal request/response traffic and respect any per-message sequencing rules. | -| G3 | Back-pressure: Writers must block (or fail fast) when the peer cannot drain the socket, preventing unbounded memory consumption. | -| G4 | Generic—independent of any particular protocol; usable by both servers and clients built on wireframe. | -| G5 | Preserve the simple “return a reply” path for code that does not need pushes, ensuring backward compatibility and low friction for existing users. | +| G1 | Any async task must be able to push frames to a live connection. | +| G2 | Ordering-safety: Pushed frames must interleave correctly with normal request/response traffic and respect any per-message sequencing rules. | +| G3 | Back-pressure: Writers must block (or fail fast) when the peer cannot drain the socket, preventing unbounded memory consumption. | +| G4 | Generic—independent of any particular protocol; usable by both servers and clients built on wireframe. | +| G5 | Preserve the simple “return a reply” path for code that does not need pushes, ensuring backward compatibility and low friction for existing users. | ## 3. Core Architecture: The Connection Actor @@ -69,7 +69,7 @@ manage two distinct, bounded `tokio::mpsc` channels for pushed frames: messages like heartbeats, session control notifications, or protocol-level pings. -1. `low_priority_push_rx: mpsc::Receiver`: For standard, non-urgent +2. `low_priority_push_rx: mpsc::Receiver`: For standard, non-urgent background messages like log forwarding or secondary status updates. The bounded nature of these channels provides an inherent and robust @@ -89,13 +89,13 @@ The polling order will be: 1. **Graceful Shutdown Signal:** The `CancellationToken` will be checked first to ensure immediate reaction to a server-wide shutdown request. -1. **High-Priority Push Channel:** Messages from `high_priority_push_rx` will be +2. **High-Priority Push Channel:** Messages from `high_priority_push_rx` will be drained next. -1. **Low-Priority Push Channel:** Messages from `low_priority_push_rx` will be +3. **Low-Priority Push Channel:** Messages from `low_priority_push_rx` will be processed after all high-priority messages. -1. **Handler Response Stream:** Frames from the active request's +4. **Handler Response Stream:** Frames from the active request's `Response::Stream` will be processed last. ```rust @@ -784,11 +784,11 @@ sequenceDiagram ## 8. Measurable Objectives & Success Criteria -| Category | Objective | Success Metric | +| Category | Objective | Success Metric | | --------------- | ------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| API Correctness | The PushHandle, SessionRegistry, and WireframeProtocol trait are implemented exactly as specified in this document. | 100% of the public API surface is present and correctly typed. | -| Functionality | Pushed frames are delivered reliably and in the correct order of priority. | A test with concurrent high-priority, low-priority, and streaming producers must show that all frames are delivered and that the final written sequence respects the strict priority order. | -| Back-pressure | A slow consumer must cause producer tasks to suspend without consuming unbounded memory. | A test with a slow consumer and a fast producer must show the producer's push().await call blocks, and the process memory usage remains stable. | -| Resilience | The SessionRegistry must not leak memory when connections are terminated. | A long-running test that creates and destroys thousands of connections must show no corresponding growth in the SessionRegistry's size or the process's overall memory footprint. | -| Performance | The overhead of the push mechanism should be minimal for connections that do not use it. | A benchmark of a simple request-response workload with the push feature enabled (but unused) should show < 2% performance degradation compared to a build without the feature. | -| Performance | The latency for a high-priority push under no contention should be negligible. | The time from push_high_priority().await returning to the frame being written to the socket buffer should be < 10µs. | +| API Correctness | The PushHandle, SessionRegistry, and WireframeProtocol trait are implemented exactly as specified in this document. | 100% of the public API surface is present and correctly typed. | +| Functionality | Pushed frames are delivered reliably and in the correct order of priority. | A test with concurrent high-priority, low-priority, and streaming producers must show that all frames are delivered and that the final written sequence respects the strict priority order. | +| Back-pressure | A slow consumer must cause producer tasks to suspend without consuming unbounded memory. | A test with a slow consumer and a fast producer must show the producer's push().await call blocks, and the process memory usage remains stable. | +| Resilience | The SessionRegistry must not leak memory when connections are terminated. | A long-running test that creates and destroys thousands of connections must show no corresponding growth in the SessionRegistry's size or the process's overall memory footprint. | +| Performance | The overhead of the push mechanism should be minimal for connections that do not use it. | A benchmark of a simple request-response workload with the push feature enabled (but unused) should show < 2% performance degradation compared to a build without the feature. | +| Performance | The latency for a high-priority push under no contention should be negligible. | The time from push_high_priority().await returning to the frame being written to the socket buffer should be < 10µs. | diff --git a/examples/echo.rs b/examples/echo.rs index 1791492c..63afb400 100644 --- a/examples/echo.rs +++ b/examples/echo.rs @@ -3,15 +3,13 @@ //! The application listens for incoming frames and simply echoes each //! envelope back to the client. -use std::io; - use wireframe::{ app::{Envelope, WireframeApp}, server::WireframeServer, }; #[tokio::main] -async fn main() -> io::Result<()> { +async fn main() -> Result<(), Box> { let factory = || { WireframeApp::new() .unwrap() @@ -30,5 +28,6 @@ async fn main() -> io::Result<()> { WireframeServer::new(factory) .bind("127.0.0.1:7878".parse().unwrap())? .run() - .await + .await?; + Ok(()) } diff --git a/examples/metadata_routing.rs b/examples/metadata_routing.rs index b2f77eca..db80325a 100644 --- a/examples/metadata_routing.rs +++ b/examples/metadata_routing.rs @@ -60,7 +60,7 @@ impl FrameMetadata for HeaderSerializer { struct Ping; #[tokio::main] -async fn main() -> io::Result<()> { +async fn main() -> Result<(), Box> { let app = WireframeApp::new() .unwrap() .frame_processor(LengthPrefixedProcessor::default()) diff --git a/examples/packet_enum.rs b/examples/packet_enum.rs index ba4ecda1..934321ec 100644 --- a/examples/packet_enum.rs +++ b/examples/packet_enum.rs @@ -3,7 +3,7 @@ //! The application defines an enum representing different packet variants and //! shows how to dispatch handlers based on the variant received. -use std::{collections::HashMap, future::Future, io, pin::Pin}; +use std::{collections::HashMap, future::Future, pin::Pin}; use async_trait::async_trait; use wireframe::{ @@ -76,7 +76,7 @@ fn handle_packet(_env: &Envelope) -> Pin + Send>> { } #[tokio::main] -async fn main() -> io::Result<()> { +async fn main() -> Result<(), Box> { let factory = || { WireframeApp::new() .expect("Failed to create WireframeApp") @@ -92,5 +92,6 @@ async fn main() -> io::Result<()> { WireframeServer::new(factory) .bind(addr.parse().expect("Invalid server address"))? .run() - .await + .await?; + Ok(()) } diff --git a/examples/ping_pong.rs b/examples/ping_pong.rs index ed39b217..00992a11 100644 --- a/examples/ping_pong.rs +++ b/examples/ping_pong.rs @@ -3,7 +3,7 @@ //! Demonstrates custom packet structs and middleware that maps `Ping` to //! `Pong` responses. -use std::{io, net::SocketAddr, sync::Arc}; +use std::{net::SocketAddr, sync::Arc}; use async_trait::async_trait; use wireframe::{ @@ -136,15 +136,14 @@ fn build_app() -> AppResult { } #[tokio::main] -async fn main() -> io::Result<()> { +async fn main() -> Result<(), Box> { let factory = || build_app().expect("app build failed"); let default_addr = "127.0.0.1:7878"; let addr_str = std::env::args() .nth(1) .unwrap_or_else(|| default_addr.into()); - let addr: SocketAddr = addr_str - .parse() - .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?; - WireframeServer::new(factory).bind(addr)?.run().await + let addr: SocketAddr = addr_str.parse()?; + WireframeServer::new(factory).bind(addr)?.run().await?; + Ok(()) } diff --git a/src/server/config/binding.rs b/src/server/config/binding.rs new file mode 100644 index 00000000..6a77346b --- /dev/null +++ b/src/server/config/binding.rs @@ -0,0 +1,177 @@ +//! Binding configuration for [`WireframeServer`]. + +use core::marker::PhantomData; +use std::{ + io, + net::{SocketAddr, TcpListener as StdTcpListener}, + sync::Arc, +}; + +use tokio::net::TcpListener; + +use super::{Bound, Unbound, WireframeServer}; +use crate::{app::WireframeApp, preamble::Preamble}; + +impl WireframeServer +where + F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, + T: Preamble, +{ + /// Return `None` as the server is not bound. + /// + /// # Examples + /// + /// ``` + /// use wireframe::{app::WireframeApp, server::WireframeServer}; + /// + /// assert!( + /// WireframeServer::new(|| WireframeApp::default()) + /// .local_addr() + /// .is_none() + /// ); + /// ``` + #[must_use] + pub const fn local_addr(&self) -> Option { None } + + /// Bind to a fresh address. + /// + /// # Examples + /// + /// ``` + /// use std::net::{Ipv4Addr, SocketAddr}; + /// + /// use wireframe::{app::WireframeApp, server::WireframeServer}; + /// + /// let addr = SocketAddr::from((Ipv4Addr::LOCALHOST, 0)); + /// let server = WireframeServer::new(|| WireframeApp::default()) + /// .bind(addr) + /// .expect("bind failed"); + /// assert!(server.local_addr().is_some()); + /// ``` + /// + /// # Errors + /// Returns an `io::Error` if binding or configuring the listener fails. + pub fn bind(self, addr: SocketAddr) -> io::Result> { + let std = StdTcpListener::bind(addr)?; + self.bind_listener(std) + } + + /// Bind to an existing `StdTcpListener`. + /// + /// # Examples + /// + /// ``` + /// use std::net::{Ipv4Addr, SocketAddr, TcpListener as StdTcpListener}; + /// + /// use wireframe::{app::WireframeApp, server::WireframeServer}; + /// + /// let std = StdTcpListener::bind(SocketAddr::from((Ipv4Addr::LOCALHOST, 0))).unwrap(); + /// let server = WireframeServer::new(|| WireframeApp::default()) + /// .bind_listener(std) + /// .expect("bind failed"); + /// assert!(server.local_addr().is_some()); + /// ``` + /// + /// # Errors + /// Returns an `io::Error` if configuring the listener fails. + pub fn bind_listener(self, std: StdTcpListener) -> io::Result> { + std.set_nonblocking(true)?; + let tokio = TcpListener::from_std(std)?; + Ok(WireframeServer { + factory: self.factory, + workers: self.workers, + on_preamble_success: self.on_preamble_success, + on_preamble_failure: self.on_preamble_failure, + ready_tx: self.ready_tx, + state: Bound { + listener: Arc::new(tokio), + }, + _preamble: PhantomData, + }) + } +} + +impl WireframeServer +where + F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, + T: Preamble, +{ + /// Returns the bound address, or `None` if retrieving it fails. + /// + /// # Examples + /// + /// ``` + /// use std::net::SocketAddr; + /// + /// use wireframe::{app::WireframeApp, server::WireframeServer}; + /// + /// let addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); + /// let server = WireframeServer::new(|| WireframeApp::default()) + /// .bind(addr) + /// .expect("bind failed"); + /// assert!(server.local_addr().is_some()); + /// ``` + #[must_use] + pub fn local_addr(&self) -> Option { self.state.listener.local_addr().ok() } + + /// Rebind to a fresh address. + /// + /// # Examples + /// + /// ``` + /// use std::net::{Ipv4Addr, SocketAddr}; + /// + /// use wireframe::{app::WireframeApp, server::WireframeServer}; + /// + /// let addr = SocketAddr::from((Ipv4Addr::LOCALHOST, 0)); + /// let server = WireframeServer::new(|| WireframeApp::default()) + /// .bind(addr) + /// .expect("bind failed"); + /// let addr2 = SocketAddr::from((Ipv4Addr::LOCALHOST, 0)); + /// let server = server.bind(addr2).expect("rebind failed"); + /// assert!(server.local_addr().is_some()); + /// ``` + /// + /// # Errors + /// Returns an `io::Error` if binding or configuring the listener fails. + pub fn bind(self, addr: SocketAddr) -> io::Result { + let std = StdTcpListener::bind(addr)?; + self.bind_listener(std) + } + + /// Rebind using an existing `StdTcpListener`. + /// + /// # Examples + /// + /// ``` + /// use std::net::{Ipv4Addr, SocketAddr, TcpListener as StdTcpListener}; + /// + /// use wireframe::{app::WireframeApp, server::WireframeServer}; + /// + /// let addr = SocketAddr::from((Ipv4Addr::LOCALHOST, 0)); + /// let server = WireframeServer::new(|| WireframeApp::default()) + /// .bind(addr) + /// .expect("bind failed"); + /// let std = StdTcpListener::bind(SocketAddr::from((Ipv4Addr::LOCALHOST, 0))).unwrap(); + /// let server = server.bind_listener(std).expect("rebind failed"); + /// assert!(server.local_addr().is_some()); + /// ``` + /// + /// # Errors + /// Returns an `io::Error` if configuring the listener fails. + pub fn bind_listener(self, std: StdTcpListener) -> io::Result { + std.set_nonblocking(true)?; + let tokio = TcpListener::from_std(std)?; + Ok(WireframeServer { + factory: self.factory, + workers: self.workers, + on_preamble_success: self.on_preamble_success, + on_preamble_failure: self.on_preamble_failure, + ready_tx: self.ready_tx, + state: Bound { + listener: Arc::new(tokio), + }, + _preamble: PhantomData, + }) + } +} diff --git a/src/server/config/mod.rs b/src/server/config/mod.rs index 39674a0f..83b92b4a 100644 --- a/src/server/config/mod.rs +++ b/src/server/config/mod.rs @@ -1,102 +1,36 @@ //! Configuration utilities for [`WireframeServer`]. //! //! Provides a fluent builder for configuring `WireframeServer` instances. -//! The builder exposes worker count tuning, preamble callbacks, ready-signal -//! configuration, and TCP binding. The server holds an optional listener so it -//! may be constructed unbound and later bound via [`bind`](WireframeServer::bind). +//! The builder exposes worker count tuning, preamble callbacks, +//! ready-signal configuration, and TCP binding. The server may be constructed +//! unbound and later bound via [`bind`](WireframeServer::bind). use core::marker::PhantomData; -use std::{ - io, - net::{SocketAddr, TcpListener as StdTcpListener}, - sync::Arc, - time::Duration, -}; +use tokio::sync::oneshot; -use bincode::error::DecodeError; -use futures::future::BoxFuture; -use tokio::{net::TcpListener, sync::oneshot}; +use super::{ServerState, Unbound, WireframeServer}; +use crate::{app::WireframeApp, preamble::Preamble, server::Bound}; -use super::WireframeServer; -use crate::{app::WireframeApp, preamble::Preamble}; +mod binding; +mod preamble; -#[cfg(test)] -mod tests; - -impl WireframeServer +impl WireframeServer where F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, { - /// Create a new `WireframeServer` from the given application factory. - /// - /// The worker count defaults to the number of available CPU cores (or 1 if - /// this cannot be determined). The TCP listener is unset; call - /// [`bind`](Self::bind) before running the server. - /// - /// # Examples - /// - /// ``` - /// use wireframe::{app::WireframeApp, server::WireframeServer}; - /// - /// let server = WireframeServer::new(|| WireframeApp::default()); - /// assert!(server.worker_count() >= 1); - /// ``` #[must_use] pub fn new(factory: F) -> Self { let workers = std::thread::available_parallelism().map_or(1, std::num::NonZeroUsize::get); - Self { - factory, - workers, - on_preamble_success: None, - on_preamble_failure: None, - ready_tx: None, - listener: None, - backoff_config: super::runtime::BackoffConfig::default(), - _preamble: PhantomData, - } + Self { factory, workers, on_preamble_success: None, on_preamble_failure: None, ready_tx: None, state: Unbound, _preamble: PhantomData } } } -impl WireframeServer +impl WireframeServer where F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, T: Preamble, + S: ServerState, { - /// Converts the server to use a custom preamble type for incoming - /// connections. - /// - /// Calling this method drops any previously configured preamble decode - /// callbacks. - /// - /// # Examples - /// - /// ``` - /// use bincode::{Decode, Encode}; - /// use wireframe::{app::WireframeApp, preamble::Preamble, server::WireframeServer}; - /// - /// #[derive(Encode, Decode)] - /// struct MyPreamble; - /// impl Preamble for MyPreamble {} - /// - /// let server = WireframeServer::new(|| WireframeApp::default()).with_preamble::(); - /// ``` - #[must_use] - pub fn with_preamble

(self) -> WireframeServer - where - P: Preamble, - { - WireframeServer { - factory: self.factory, - workers: self.workers, - on_preamble_success: None, - on_preamble_failure: None, - ready_tx: None, - listener: self.listener, - backoff_config: self.backoff_config, - _preamble: PhantomData, - } - } - /// Set the number of worker tasks to spawn for the server. /// /// # Examples @@ -113,60 +47,6 @@ where self } - /// Register a callback invoked when the connection preamble decodes successfully. - /// - /// # Examples - /// - /// ``` - /// use std::sync::Arc; - /// - /// use bincode::{Decode, Encode}; - /// use futures::FutureExt; - /// use wireframe::{app::WireframeApp, preamble::Preamble, server::WireframeServer}; - /// - /// #[derive(Encode, Decode)] - /// struct MyPreamble; - /// impl Preamble for MyPreamble {} - /// - /// let server = WireframeServer::new(|| WireframeApp::default()) - /// .with_preamble::() - /// .on_preamble_decode_success(|_preamble: &MyPreamble, _stream| async { Ok(()) }.boxed()); - /// ``` - #[must_use] - pub fn on_preamble_decode_success(mut self, handler: H) -> Self - where - H: for<'a> Fn(&'a T, &'a mut tokio::net::TcpStream) -> BoxFuture<'a, io::Result<()>> - + Send - + Sync - + 'static, - { - self.on_preamble_success = Some(Arc::new(handler)); - self - } - - /// Register a callback invoked when the connection preamble fails to decode. - /// - /// # Examples - /// - /// ``` - /// use bincode::error::DecodeError; - /// use wireframe::{app::WireframeApp, server::WireframeServer}; - /// - /// let server = WireframeServer::new(|| WireframeApp::default()).on_preamble_decode_failure( - /// |_error: &DecodeError| { - /// eprintln!("Failed to decode preamble"); - /// }, - /// ); - /// ``` - #[must_use] - pub fn on_preamble_decode_failure(mut self, handler: H) -> Self - where - H: Fn(&DecodeError) + Send + Sync + 'static, - { - self.on_preamble_failure = Some(Arc::new(handler)); - self - } - /// Configure a channel used to signal when the server is ready to accept connections. /// /// # Examples @@ -261,78 +141,4 @@ where self.listener = Some(Arc::new(tokio)); Ok(self) } - - /// Configure the exponential backoff parameters for accept loop retries. - /// - /// # Behaviour - /// - If `initial_delay > max_delay`, the values are swapped. - /// - `initial_delay` is clamped to at least 1 millisecond. - /// - `max_delay` is raised to be at least `initial_delay` to preserve invariants. - /// - /// # Examples - /// - /// ``` - /// use std::time::Duration; - /// - /// use wireframe::{app::WireframeApp, server::WireframeServer}; - /// - /// let server = WireframeServer::new(|| WireframeApp::default()) - /// .accept_backoff(Duration::from_millis(5), Duration::from_millis(500)); - /// ``` - #[must_use] - pub fn accept_backoff(mut self, initial_delay: Duration, max_delay: Duration) -> Self { - let (mut a, mut b) = (initial_delay, max_delay); - if a > b { - core::mem::swap(&mut a, &mut b); - } - let init = a.max(Duration::from_millis(1)); - let maxd = b.max(init); - - self.backoff_config.initial_delay = init; - self.backoff_config.max_delay = maxd; - self - } - - /// Configure the initial delay for accept loop retries. - /// - /// # Examples - /// - /// ``` - /// use std::time::Duration; - /// - /// use wireframe::{app::WireframeApp, server::WireframeServer}; - /// - /// let server = WireframeServer::new(|| WireframeApp::default()) - /// .accept_initial_delay(Duration::from_millis(5)); - /// ``` - #[must_use] - pub fn accept_initial_delay(mut self, delay: Duration) -> Self { - self.backoff_config.initial_delay = delay.max(Duration::from_millis(1)); - if self.backoff_config.initial_delay > self.backoff_config.max_delay { - self.backoff_config.max_delay = self.backoff_config.initial_delay; - } - self - } - - /// Configure the maximum delay cap for accept loop retries. - /// - /// # Examples - /// - /// ``` - /// use std::time::Duration; - /// - /// use wireframe::{app::WireframeApp, server::WireframeServer}; - /// - /// let server = WireframeServer::new(|| WireframeApp::default()) - /// .accept_max_delay(Duration::from_millis(500)); - /// ``` - #[must_use] - pub fn accept_max_delay(mut self, delay: Duration) -> Self { - if delay < self.backoff_config.initial_delay { - self.backoff_config.max_delay = self.backoff_config.initial_delay; - } else { - self.backoff_config.max_delay = delay; - } - self - } } diff --git a/src/server/config/preamble.rs b/src/server/config/preamble.rs new file mode 100644 index 00000000..286a364d --- /dev/null +++ b/src/server/config/preamble.rs @@ -0,0 +1,101 @@ +//! Preamble configuration for [`WireframeServer`]. + +use core::marker::PhantomData; +use std::{io, sync::Arc}; + +use bincode::error::DecodeError; +use futures::future::BoxFuture; + +use super::WireframeServer; +use crate::{app::WireframeApp, preamble::Preamble, server::ServerState}; + +impl WireframeServer +where + F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, + T: Preamble, + S: ServerState, +{ + /// Converts the server to use a custom preamble type for incoming connections. + /// + /// Calling this method drops any previously configured preamble decode callbacks. + /// + /// # Examples + /// + /// ``` + /// use bincode::{Decode, Encode}; + /// use wireframe::{app::WireframeApp, preamble::Preamble, server::WireframeServer}; + /// + /// #[derive(Encode, Decode)] + /// struct MyPreamble; + /// impl Preamble for MyPreamble {} + /// + /// let server = WireframeServer::new(|| WireframeApp::default()).with_preamble::(); + /// ``` + #[must_use] + pub fn with_preamble

(self) -> WireframeServer + where + P: Preamble, + { + WireframeServer { + factory: self.factory, + workers: self.workers, + on_preamble_success: None, + on_preamble_failure: None, + ready_tx: self.ready_tx, + state: self.state, + _preamble: PhantomData, + } + } + + /// Register a callback invoked when the connection preamble decodes successfully. + /// + /// # Examples + /// + /// ``` + /// use bincode::{Decode, Encode}; + /// use futures::FutureExt; + /// use wireframe::{app::WireframeApp, preamble::Preamble, server::WireframeServer}; + /// + /// #[derive(Encode, Decode)] + /// struct MyPreamble; + /// impl Preamble for MyPreamble {} + /// + /// let server = WireframeServer::new(|| WireframeApp::default()) + /// .with_preamble::() + /// .on_preamble_decode_success(|_p: &MyPreamble, _s| async { Ok(()) }.boxed()); + /// ``` + #[must_use] + pub fn on_preamble_decode_success(mut self, handler: H) -> Self + where + H: for<'a> Fn(&'a T, &'a mut tokio::net::TcpStream) -> BoxFuture<'a, io::Result<()>> + + Send + + Sync + + 'static, + { + self.on_preamble_success = Some(Arc::new(handler)); + self + } + + /// Register a callback invoked when the connection preamble fails to decode. + /// + /// # Examples + /// + /// ``` + /// use bincode::error::DecodeError; + /// use wireframe::{app::WireframeApp, server::WireframeServer}; + /// + /// let server = WireframeServer::new(|| WireframeApp::default()).on_preamble_decode_failure( + /// |_err: &DecodeError| { + /// eprintln!("Failed to decode preamble"); + /// }, + /// ); + /// ``` + #[must_use] + pub fn on_preamble_decode_failure(mut self, handler: H) -> Self + where + H: Fn(&DecodeError) + Send + Sync + 'static, + { + self.on_preamble_failure = Some(Arc::new(handler)); + self + } +} diff --git a/src/server/error.rs b/src/server/error.rs new file mode 100644 index 00000000..dd68a6d2 --- /dev/null +++ b/src/server/error.rs @@ -0,0 +1,13 @@ +//! Errors raised by [`WireframeServer`] operations. + +use std::io; + +use thiserror::Error; + +/// Errors that may occur while running the server. +#[derive(Debug, Error)] +pub enum ServerError { + /// Accepting a connection failed. + #[error("accept error: {0}")] + Accept(#[from] io::Error), +} diff --git a/src/server/mod.rs b/src/server/mod.rs index ee4f0678..88233aa4 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -33,12 +33,13 @@ pub type PreambleErrorCallback = Arc; /// closure. The server listens for a shutdown signal using /// `tokio::signal::ctrl_c` and notifies all workers to stop /// accepting new connections. -pub struct WireframeServer +pub struct WireframeServer where F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, // `Preamble` covers types implementing `BorrowDecode` for any lifetime, // enabling decoding of borrowed data without external context. T: Preamble, + S: ServerState, { pub(crate) factory: F, pub(crate) workers: usize, @@ -57,15 +58,27 @@ where /// Because only one notification may be sent, a new `ready_tx` must be /// provided each time the server is started. pub(crate) ready_tx: Option>, - pub(crate) listener: Option>, - /// Configuration for exponential backoff when `accept()` fails. - /// Defaults to 10ms initial delay with 1s maximum. - pub(crate) backoff_config: runtime::BackoffConfig, + pub(crate) state: S, pub(crate) _preamble: PhantomData, } +/// Marker type for an unbound server. +pub struct Unbound; + +/// Marker type for a bound server holding a listener. +pub struct Bound { + pub(crate) listener: Arc, +} + +/// Trait implemented by binding state markers. +pub trait ServerState {} + +impl ServerState for Unbound {} +impl ServerState for Bound {} + mod config; mod connection; +pub mod error; mod runtime; /// Re-exported configuration types for server backoff behavior. diff --git a/src/server/runtime.rs b/src/server/runtime.rs index f5345174..6ba498e8 100644 --- a/src/server/runtime.rs +++ b/src/server/runtime.rs @@ -1,6 +1,6 @@ //! Runtime control for [`WireframeServer`]. -use std::{io, sync::Arc}; +use std::sync::Arc; use futures::Future; use tokio::{ @@ -12,10 +12,12 @@ use tokio::{ use tokio_util::{sync::CancellationToken, task::TaskTracker}; use super::{ + Bound, PreambleCallback, PreambleErrorCallback, WireframeServer, connection::spawn_connection_task, + error::ServerError, }; use crate::{app::WireframeApp, preamble::Preamble}; @@ -49,7 +51,7 @@ impl Default for BackoffConfig { } } -impl WireframeServer +impl WireframeServer where F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, T: Preamble, @@ -74,8 +76,8 @@ where /// /// # Errors /// - /// Returns an [`io::Error`] if accepting a connection fails. - pub async fn run(self) -> io::Result<()> { + /// Returns a [`ServerError`] if runtime initialisation fails. + pub async fn run(self) -> Result<(), ServerError> { self.run_with_shutdown(async { let _ = signal::ctrl_c().await; }) @@ -113,8 +115,8 @@ where /// /// # Errors /// - /// Returns an [`io::Error`] if accepting a connection fails during runtime. - pub async fn run_with_shutdown(self, shutdown: S) -> io::Result<()> + /// Returns a [`ServerError`] if runtime initialisation fails. + pub async fn run_with_shutdown(self, shutdown: S) -> Result<(), ServerError> where S: Future + Send, { @@ -124,13 +126,10 @@ where on_preamble_success, on_preamble_failure, ready_tx, - listener, - backoff_config, + state: Bound { listener }, .. } = self; - let listener = listener.ok_or_else(|| io::Error::other("listener not bound"))?; - if let Some(tx) = ready_tx && tx.send(()).is_err() { diff --git a/src/server/test_util.rs b/src/server/test_util.rs index e93bc38e..f8f5fea4 100644 --- a/src/server/test_util.rs +++ b/src/server/test_util.rs @@ -5,9 +5,10 @@ use std::net::{Ipv4Addr, SocketAddr}; use bincode::{Decode, Encode}; use rstest::fixture; -use super::WireframeServer; +use super::{Bound, WireframeServer}; use crate::app::WireframeApp; +#[allow(dead_code, reason = "Used in builder tests via fixtures")] #[derive(Debug, Clone, PartialEq, Encode, Decode)] pub struct TestPreamble { pub id: u32, @@ -28,7 +29,7 @@ pub fn free_port() -> SocketAddr { .expect("failed to read free port listener address") } -pub fn bind_server(factory: F, addr: SocketAddr) -> WireframeServer +pub fn bind_server(factory: F, addr: SocketAddr) -> WireframeServer where F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, { @@ -37,6 +38,7 @@ where .expect("Failed to bind") } +#[allow(dead_code, reason = "Only used in configuration tests")] pub fn server_with_preamble(factory: F) -> WireframeServer where F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, From d9cba4e70bda656573398dc1e331f7311c904f08 Mon Sep 17 00:00:00 2001 From: Leynos Date: Thu, 7 Aug 2025 16:54:01 +0100 Subject: [PATCH 02/20] Add conditional dead code expectations --- src/server/test_util.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/server/test_util.rs b/src/server/test_util.rs index f8f5fea4..5c12657e 100644 --- a/src/server/test_util.rs +++ b/src/server/test_util.rs @@ -8,7 +8,11 @@ use rstest::fixture; use super::{Bound, WireframeServer}; use crate::app::WireframeApp; -#[allow(dead_code, reason = "Used in builder tests via fixtures")] +#[cfg_attr( + not(test), + expect(dead_code, reason = "Used in builder tests via fixtures") +)] +#[cfg_attr(test, allow(dead_code, reason = "Used in builder tests via fixtures"))] #[derive(Debug, Clone, PartialEq, Encode, Decode)] pub struct TestPreamble { pub id: u32, @@ -38,7 +42,11 @@ where .expect("Failed to bind") } -#[allow(dead_code, reason = "Only used in configuration tests")] +#[cfg_attr( + not(test), + expect(dead_code, reason = "Only used in configuration tests") +)] +#[cfg_attr(test, allow(dead_code, reason = "Only used in configuration tests"))] pub fn server_with_preamble(factory: F) -> WireframeServer where F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, From ebefe6bf72a16ac6791f598d044fedd374dbaf47 Mon Sep 17 00:00:00 2001 From: Leynos Date: Thu, 7 Aug 2025 20:50:53 +0100 Subject: [PATCH 03/20] Unify server errors and clarify docs --- Cargo.lock | 2 +- Cargo.toml | 2 +- examples/echo.rs | 4 +- examples/metadata_routing.rs | 2 +- examples/packet_enum.rs | 4 +- examples/ping_pong.rs | 6 +-- src/server/config/binding.rs | 34 ++++++++-------- src/server/config/mod.rs | 78 +++++++----------------------------- src/server/error.rs | 8 +++- src/server/runtime.rs | 4 +- 10 files changed, 51 insertions(+), 93 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 81d2f0c7..9d1f1dfc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2829,7 +2829,7 @@ dependencies = [ "rstest", "serde", "serial_test", - "thiserror 1.0.69", + "thiserror 2.0.12", "tokio", "tokio-util", "tracing", diff --git a/Cargo.toml b/Cargo.toml index cd50facd..7c34913e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,7 @@ tracing = { version = "0.1.41", features = ["log", "log-always"] } tracing-subscriber = "0.3" metrics = { version = "0.24.2", optional = true } metrics-exporter-prometheus = { version = "0.17.2", optional = true, features = ["http-listener"] } -thiserror = "1.0.69" +thiserror = "2.0.12" [dev-dependencies] rstest = "0.18.2" diff --git a/examples/echo.rs b/examples/echo.rs index 63afb400..3629f877 100644 --- a/examples/echo.rs +++ b/examples/echo.rs @@ -5,11 +5,11 @@ use wireframe::{ app::{Envelope, WireframeApp}, - server::WireframeServer, + server::{WireframeServer, error::ServerError}, }; #[tokio::main] -async fn main() -> Result<(), Box> { +async fn main() -> Result<(), ServerError> { let factory = || { WireframeApp::new() .unwrap() diff --git a/examples/metadata_routing.rs b/examples/metadata_routing.rs index db80325a..b2f77eca 100644 --- a/examples/metadata_routing.rs +++ b/examples/metadata_routing.rs @@ -60,7 +60,7 @@ impl FrameMetadata for HeaderSerializer { struct Ping; #[tokio::main] -async fn main() -> Result<(), Box> { +async fn main() -> io::Result<()> { let app = WireframeApp::new() .unwrap() .frame_processor(LengthPrefixedProcessor::default()) diff --git a/examples/packet_enum.rs b/examples/packet_enum.rs index 934321ec..8a05d1c9 100644 --- a/examples/packet_enum.rs +++ b/examples/packet_enum.rs @@ -11,7 +11,7 @@ use wireframe::{ frame::{LengthFormat, LengthPrefixedProcessor}, message::Message, middleware::{HandlerService, Service, ServiceRequest, ServiceResponse, Transform}, - server::WireframeServer, + server::{WireframeServer, error::ServerError}, }; #[derive(bincode::Encode, bincode::BorrowDecode, Debug)] @@ -76,7 +76,7 @@ fn handle_packet(_env: &Envelope) -> Pin + Send>> { } #[tokio::main] -async fn main() -> Result<(), Box> { +async fn main() -> Result<(), ServerError> { let factory = || { WireframeApp::new() .expect("Failed to create WireframeApp") diff --git a/examples/ping_pong.rs b/examples/ping_pong.rs index 00992a11..d5e272f1 100644 --- a/examples/ping_pong.rs +++ b/examples/ping_pong.rs @@ -11,7 +11,7 @@ use wireframe::{ message::Message, middleware::{HandlerService, Service, ServiceRequest, ServiceResponse, Transform}, serializer::BincodeSerializer, - server::WireframeServer, + server::{WireframeServer, error::ServerError}, }; #[derive(bincode::Encode, bincode::BorrowDecode, Debug)] @@ -136,14 +136,14 @@ fn build_app() -> AppResult { } #[tokio::main] -async fn main() -> Result<(), Box> { +async fn main() -> Result<(), ServerError> { let factory = || build_app().expect("app build failed"); let default_addr = "127.0.0.1:7878"; let addr_str = std::env::args() .nth(1) .unwrap_or_else(|| default_addr.into()); - let addr: SocketAddr = addr_str.parse()?; + let addr: SocketAddr = addr_str.parse().expect("invalid address"); WireframeServer::new(factory).bind(addr)?.run().await?; Ok(()) } diff --git a/src/server/config/binding.rs b/src/server/config/binding.rs index 6a77346b..65dba059 100644 --- a/src/server/config/binding.rs +++ b/src/server/config/binding.rs @@ -2,7 +2,6 @@ use core::marker::PhantomData; use std::{ - io, net::{SocketAddr, TcpListener as StdTcpListener}, sync::Arc, }; @@ -10,7 +9,7 @@ use std::{ use tokio::net::TcpListener; use super::{Bound, Unbound, WireframeServer}; -use crate::{app::WireframeApp, preamble::Preamble}; +use crate::{app::WireframeApp, preamble::Preamble, server::error::ServerError}; impl WireframeServer where @@ -50,9 +49,9 @@ where /// ``` /// /// # Errors - /// Returns an `io::Error` if binding or configuring the listener fails. - pub fn bind(self, addr: SocketAddr) -> io::Result> { - let std = StdTcpListener::bind(addr)?; + /// Returns a [`ServerError`] if binding or configuring the listener fails. + pub fn bind(self, addr: SocketAddr) -> Result, ServerError> { + let std = StdTcpListener::bind(addr).map_err(ServerError::Bind)?; self.bind_listener(std) } @@ -73,10 +72,13 @@ where /// ``` /// /// # Errors - /// Returns an `io::Error` if configuring the listener fails. - pub fn bind_listener(self, std: StdTcpListener) -> io::Result> { - std.set_nonblocking(true)?; - let tokio = TcpListener::from_std(std)?; + /// Returns a [`ServerError`] if configuring the listener fails. + pub fn bind_listener( + self, + std: StdTcpListener, + ) -> Result, ServerError> { + std.set_nonblocking(true).map_err(ServerError::Bind)?; + let tokio = TcpListener::from_std(std).map_err(ServerError::Bind)?; Ok(WireframeServer { factory: self.factory, workers: self.workers, @@ -133,9 +135,9 @@ where /// ``` /// /// # Errors - /// Returns an `io::Error` if binding or configuring the listener fails. - pub fn bind(self, addr: SocketAddr) -> io::Result { - let std = StdTcpListener::bind(addr)?; + /// Returns a [`ServerError`] if binding or configuring the listener fails. + pub fn bind(self, addr: SocketAddr) -> Result { + let std = StdTcpListener::bind(addr).map_err(ServerError::Bind)?; self.bind_listener(std) } @@ -158,10 +160,10 @@ where /// ``` /// /// # Errors - /// Returns an `io::Error` if configuring the listener fails. - pub fn bind_listener(self, std: StdTcpListener) -> io::Result { - std.set_nonblocking(true)?; - let tokio = TcpListener::from_std(std)?; + /// Returns a [`ServerError`] if configuring the listener fails. + pub fn bind_listener(self, std: StdTcpListener) -> Result { + std.set_nonblocking(true).map_err(ServerError::Bind)?; + let tokio = TcpListener::from_std(std).map_err(ServerError::Bind)?; Ok(WireframeServer { factory: self.factory, workers: self.workers, diff --git a/src/server/config/mod.rs b/src/server/config/mod.rs index 83b92b4a..016e0861 100644 --- a/src/server/config/mod.rs +++ b/src/server/config/mod.rs @@ -9,7 +9,7 @@ use core::marker::PhantomData; use tokio::sync::oneshot; use super::{ServerState, Unbound, WireframeServer}; -use crate::{app::WireframeApp, preamble::Preamble, server::Bound}; +use crate::{app::WireframeApp, preamble::Preamble}; mod binding; mod preamble; @@ -18,6 +18,20 @@ impl WireframeServer where F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, { + /// Create a new `WireframeServer` from the given application factory. + /// + /// The worker count defaults to the number of available CPU cores (or 1 if + /// this cannot be determined). The server is initially unbound; call + /// `bind` (available on unbound servers) before running the server. + /// + /// # Examples + /// + /// ``` + /// use wireframe::{app::WireframeApp, server::WireframeServer}; + /// + /// let server = WireframeServer::new(|| WireframeApp::default()); + /// assert!(server.worker_count() >= 1); + /// ``` #[must_use] pub fn new(factory: F) -> Self { let workers = std::thread::available_parallelism().map_or(1, std::num::NonZeroUsize::get); @@ -78,67 +92,5 @@ where #[must_use] pub const fn worker_count(&self) -> usize { self.workers } - /// Returns the bound address, or `None` if not yet bound. - /// - /// # Examples - /// - /// ``` - /// use std::net::{Ipv4Addr, SocketAddr}; - /// - /// use wireframe::{app::WireframeApp, server::WireframeServer}; - /// - /// let server = WireframeServer::new(|| WireframeApp::default()) - /// .bind(SocketAddr::from((Ipv4Addr::LOCALHOST, 0))) - /// .expect("Failed to bind"); - /// assert!(server.local_addr().is_some()); - /// ``` - #[must_use] - pub fn local_addr(&self) -> Option { - self.listener.as_ref().and_then(|l| l.local_addr().ok()) - } - /// Bind to a fresh address. - /// - /// # Examples - /// - /// ``` - /// use std::net::{Ipv4Addr, SocketAddr}; - /// - /// use wireframe::{app::WireframeApp, server::WireframeServer}; - /// - /// let server = WireframeServer::new(|| WireframeApp::default()) - /// .bind(SocketAddr::from((Ipv4Addr::LOCALHOST, 0))); - /// assert!(server.is_ok()); - /// ``` - /// - /// # Errors - /// Returns an `io::Error` if binding or configuring the listener fails. - pub fn bind(self, addr: SocketAddr) -> io::Result { - let std = StdTcpListener::bind(addr)?; - self.bind_listener(std) - } - - /// Bind to an existing `StdTcpListener`. - /// - /// # Examples - /// - /// ``` - /// use std::net::{Ipv4Addr, SocketAddr, TcpListener as StdTcpListener}; - /// - /// use wireframe::{app::WireframeApp, server::WireframeServer}; - /// - /// let std_listener = StdTcpListener::bind(SocketAddr::from((Ipv4Addr::LOCALHOST, 0))) - /// .expect("Failed to bind std listener"); - /// let server = WireframeServer::new(|| WireframeApp::default()).bind_listener(std_listener); - /// assert!(server.is_ok()); - /// ``` - /// - /// # Errors - /// Returns an `io::Error` if configuring the listener fails. - pub fn bind_listener(mut self, std: StdTcpListener) -> io::Result { - std.set_nonblocking(true)?; - let tokio = TcpListener::from_std(std)?; - self.listener = Some(Arc::new(tokio)); - Ok(self) - } } diff --git a/src/server/error.rs b/src/server/error.rs index dd68a6d2..1da12fae 100644 --- a/src/server/error.rs +++ b/src/server/error.rs @@ -4,10 +4,14 @@ use std::io; use thiserror::Error; -/// Errors that may occur while running the server. +/// Errors that may occur while configuring or running the server. #[derive(Debug, Error)] pub enum ServerError { + /// Binding or configuring the listener failed. + #[error("bind error: {0}")] + Bind(#[source] io::Error), + /// Accepting a connection failed. #[error("accept error: {0}")] - Accept(#[from] io::Error), + Accept(#[source] io::Error), } diff --git a/src/server/runtime.rs b/src/server/runtime.rs index 6ba498e8..6f15eae2 100644 --- a/src/server/runtime.rs +++ b/src/server/runtime.rs @@ -66,7 +66,7 @@ where /// use wireframe::{app::WireframeApp, server::WireframeServer}; /// /// # #[tokio::main] - /// # async fn main() -> Result<(), Box> { + /// # async fn main() -> Result<(), wireframe::server::error::ServerError> { /// let server = /// WireframeServer::new(|| WireframeApp::default()).bind(([127, 0, 0, 1], 8080).into())?; /// server.run().await?; @@ -93,7 +93,7 @@ where /// use wireframe::{app::WireframeApp, server::WireframeServer}; /// /// # #[tokio::main] - /// # async fn main() -> Result<(), Box> { + /// # async fn main() -> Result<(), wireframe::server::error::ServerError> { /// let server = /// WireframeServer::new(|| WireframeApp::default()).bind(([127, 0, 0, 1], 0).into())?; /// From 65618b648bfccfd45f81bb26d273e0514dff72ac Mon Sep 17 00:00:00 2001 From: Leynos Date: Thu, 7 Aug 2025 23:42:52 +0100 Subject: [PATCH 04/20] Clarify binding module in server config docs --- src/server/config/mod.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/server/config/mod.rs b/src/server/config/mod.rs index 016e0861..2e27f54f 100644 --- a/src/server/config/mod.rs +++ b/src/server/config/mod.rs @@ -2,8 +2,10 @@ //! //! Provides a fluent builder for configuring `WireframeServer` instances. //! The builder exposes worker count tuning, preamble callbacks, -//! ready-signal configuration, and TCP binding. The server may be constructed -//! unbound and later bound via [`bind`](WireframeServer::bind). +//! ready-signal configuration, and TCP binding via the `binding` +//! module. Preamble behaviour is customised through the `preamble` +//! module. The server may be constructed unbound and later bound using +//! the `binding` module's `bind` functions. use core::marker::PhantomData; use tokio::sync::oneshot; @@ -11,8 +13,8 @@ use tokio::sync::oneshot; use super::{ServerState, Unbound, WireframeServer}; use crate::{app::WireframeApp, preamble::Preamble}; -mod binding; -mod preamble; +pub mod binding; +pub mod preamble; impl WireframeServer where @@ -22,7 +24,8 @@ where /// /// The worker count defaults to the number of available CPU cores (or 1 if /// this cannot be determined). The server is initially unbound; call - /// `bind` (available on unbound servers) before running the server. + /// `bind` (provided by the `binding` module for unbound servers) before + /// running the server. /// /// # Examples /// From 164e7424438fdb5d95d9752a3bee45028ac16ec5 Mon Sep 17 00:00:00 2001 From: Leynos Date: Fri, 8 Aug 2025 00:44:35 +0100 Subject: [PATCH 05/20] Link config docs to binding and preamble --- src/server/config/mod.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/server/config/mod.rs b/src/server/config/mod.rs index 2e27f54f..dde9f4bd 100644 --- a/src/server/config/mod.rs +++ b/src/server/config/mod.rs @@ -2,10 +2,10 @@ //! //! Provides a fluent builder for configuring `WireframeServer` instances. //! The builder exposes worker count tuning, preamble callbacks, -//! ready-signal configuration, and TCP binding via the `binding` -//! module. Preamble behaviour is customised through the `preamble` +//! ready-signal configuration, and TCP binding via the [`binding`](self::binding) +//! module. Preamble behaviour is customised through the [`preamble`](self::preamble) //! module. The server may be constructed unbound and later bound using -//! the `binding` module's `bind` functions. +//! the [`bind`](WireframeServer::bind) functions on unbound servers. use core::marker::PhantomData; use tokio::sync::oneshot; @@ -24,8 +24,8 @@ where /// /// The worker count defaults to the number of available CPU cores (or 1 if /// this cannot be determined). The server is initially unbound; call - /// `bind` (provided by the `binding` module for unbound servers) before - /// running the server. + /// [`bind`](WireframeServer::bind) on the unbound server (method provided by the + /// [`binding`](self::binding) module) before running the server. /// /// # Examples /// From 4be1d5b24d742c5acb080be41ad565d988ac467e Mon Sep 17 00:00:00 2001 From: Leynos Date: Fri, 8 Aug 2025 11:51:33 +0100 Subject: [PATCH 06/20] Refactor config builder with macros --- src/server/config/mod.rs | 101 +++++++++++++++++++++------------- src/server/config/preamble.rs | 91 ++++++++++++++---------------- 2 files changed, 103 insertions(+), 89 deletions(-) diff --git a/src/server/config/mod.rs b/src/server/config/mod.rs index dde9f4bd..1818fdbb 100644 --- a/src/server/config/mod.rs +++ b/src/server/config/mod.rs @@ -1,11 +1,12 @@ //! Configuration utilities for [`WireframeServer`]. //! //! Provides a fluent builder for configuring `WireframeServer` instances. -//! The builder exposes worker count tuning, preamble callbacks, -//! ready-signal configuration, and TCP binding via the [`binding`](self::binding) -//! module. Preamble behaviour is customised through the [`preamble`](self::preamble) -//! module. The server may be constructed unbound and later bound using -//! the [`bind`](WireframeServer::bind) functions on unbound servers. +//! The builder exposes worker count tuning and ready-signal configuration here. +//! TCP binding is provided via the [`binding`](self::binding) module; preamble +//! behaviour is customized via the [`preamble`](self::preamble) module. The +//! server may be constructed unbound and later bound using +//! [`bind`](WireframeServer::bind) or [`bind_listener`](WireframeServer::bind_listener) +//! on [`Unbound`] servers. use core::marker::PhantomData; use tokio::sync::oneshot; @@ -13,6 +14,31 @@ use tokio::sync::oneshot; use super::{ServerState, Unbound, WireframeServer}; use crate::{app::WireframeApp, preamble::Preamble}; +macro_rules! builder_setter { + ($(#[$meta:meta])* $fn:ident, $field:ident, $arg:ident: $ty:ty => $assign:expr) => { + $(#[$meta])* + #[must_use] + pub fn $fn(mut self, $arg: $ty) -> Self { + self.$field = $assign; + self + } + }; +} + +macro_rules! builder_callback { + ($(#[$meta:meta])* $fn:ident, $field:ident, $($bound:tt)*) => { + $(#[$meta])* + #[must_use] + pub fn $fn(mut self, handler: H) -> Self + where + H: $($bound)*, + { + self.$field = Some(Arc::new(handler)); + self + } + }; +} + pub mod binding; pub mod preamble; @@ -23,9 +49,10 @@ where /// Create a new `WireframeServer` from the given application factory. /// /// The worker count defaults to the number of available CPU cores (or 1 if - /// this cannot be determined). The server is initially unbound; call - /// [`bind`](WireframeServer::bind) on the unbound server (method provided by the - /// [`binding`](self::binding) module) before running the server. + /// this cannot be determined). The server is initially [`Unbound`]; call + /// [`bind`](WireframeServer::bind) or + /// [`bind_listener`](WireframeServer::bind_listener) + /// (methods provided by the [`binding`](self::binding) module) before running the server. /// /// # Examples /// @@ -48,38 +75,34 @@ where T: Preamble, S: ServerState, { - /// Set the number of worker tasks to spawn for the server. - /// - /// # Examples - /// - /// ``` - /// use wireframe::{app::WireframeApp, server::WireframeServer}; - /// - /// let server = WireframeServer::new(|| WireframeApp::default()).workers(4); - /// assert_eq!(server.worker_count(), 4); - /// ``` - #[must_use] - pub fn workers(mut self, count: usize) -> Self { - self.workers = count.max(1); - self - } + builder_setter!( + /// Set the number of worker tasks to spawn for the server. + /// + /// # Examples + /// + /// ``` + /// use wireframe::{app::WireframeApp, server::WireframeServer}; + /// + /// let server = WireframeServer::new(|| WireframeApp::default()).workers(4); + /// assert_eq!(server.worker_count(), 4); + /// ``` + workers, workers, count: usize => count.max(1) + ); - /// Configure a channel used to signal when the server is ready to accept connections. - /// - /// # Examples - /// - /// ``` - /// use tokio::sync::oneshot; - /// use wireframe::{app::WireframeApp, server::WireframeServer}; - /// - /// let (tx, _rx) = oneshot::channel(); - /// let server = WireframeServer::new(|| WireframeApp::default()).ready_signal(tx); - /// ``` - #[must_use] - pub fn ready_signal(mut self, tx: oneshot::Sender<()>) -> Self { - self.ready_tx = Some(tx); - self - } + builder_setter!( + /// Configure a channel used to signal when the server is ready to accept connections. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::oneshot; + /// use wireframe::{app::WireframeApp, server::WireframeServer}; + /// + /// let (tx, _rx) = oneshot::channel(); + /// let server = WireframeServer::new(|| WireframeApp::default()).ready_signal(tx); + /// ``` + ready_signal, ready_tx, tx: oneshot::Sender<()> => Some(tx) + ); /// Returns the configured number of worker tasks for the server. /// diff --git a/src/server/config/preamble.rs b/src/server/config/preamble.rs index 286a364d..bb5c5892 100644 --- a/src/server/config/preamble.rs +++ b/src/server/config/preamble.rs @@ -47,55 +47,46 @@ where } } - /// Register a callback invoked when the connection preamble decodes successfully. - /// - /// # Examples - /// - /// ``` - /// use bincode::{Decode, Encode}; - /// use futures::FutureExt; - /// use wireframe::{app::WireframeApp, preamble::Preamble, server::WireframeServer}; - /// - /// #[derive(Encode, Decode)] - /// struct MyPreamble; - /// impl Preamble for MyPreamble {} - /// - /// let server = WireframeServer::new(|| WireframeApp::default()) - /// .with_preamble::() - /// .on_preamble_decode_success(|_p: &MyPreamble, _s| async { Ok(()) }.boxed()); - /// ``` - #[must_use] - pub fn on_preamble_decode_success(mut self, handler: H) -> Self - where - H: for<'a> Fn(&'a T, &'a mut tokio::net::TcpStream) -> BoxFuture<'a, io::Result<()>> - + Send - + Sync - + 'static, - { - self.on_preamble_success = Some(Arc::new(handler)); - self - } + builder_callback!( + /// Register a callback invoked when the connection preamble decodes successfully. + /// + /// # Examples + /// + /// ``` + /// use bincode::{Decode, Encode}; + /// use futures::FutureExt; + /// use wireframe::{app::WireframeApp, preamble::Preamble, server::WireframeServer}; + /// + /// #[derive(Encode, Decode)] + /// struct MyPreamble; + /// impl Preamble for MyPreamble {} + /// + /// let server = WireframeServer::new(|| WireframeApp::default()) + /// .with_preamble::() + /// .on_preamble_decode_success(|_p: &MyPreamble, _s| async { Ok(()) }.boxed()); + /// ``` + on_preamble_decode_success, + on_preamble_success, + for<'a> Fn(&'a T, &'a mut tokio::net::TcpStream) -> BoxFuture<'a, io::Result<()>> + Send + Sync + 'static + ); - /// Register a callback invoked when the connection preamble fails to decode. - /// - /// # Examples - /// - /// ``` - /// use bincode::error::DecodeError; - /// use wireframe::{app::WireframeApp, server::WireframeServer}; - /// - /// let server = WireframeServer::new(|| WireframeApp::default()).on_preamble_decode_failure( - /// |_err: &DecodeError| { - /// eprintln!("Failed to decode preamble"); - /// }, - /// ); - /// ``` - #[must_use] - pub fn on_preamble_decode_failure(mut self, handler: H) -> Self - where - H: Fn(&DecodeError) + Send + Sync + 'static, - { - self.on_preamble_failure = Some(Arc::new(handler)); - self - } + builder_callback!( + /// Register a callback invoked when the connection preamble fails to decode. + /// + /// # Examples + /// + /// ``` + /// use bincode::error::DecodeError; + /// use wireframe::{app::WireframeApp, server::WireframeServer}; + /// + /// let server = WireframeServer::new(|| WireframeApp::default()).on_preamble_decode_failure( + /// |_err: &DecodeError| { + /// eprintln!("Failed to decode preamble"); + /// }, + /// ); + /// ``` + on_preamble_decode_failure, + on_preamble_failure, + Fn(&DecodeError) + Send + Sync + 'static + ); } From 9ae09980256ca917a238c5f5307f2c78c40584e2 Mon Sep 17 00:00:00 2001 From: Leynos Date: Fri, 8 Aug 2025 12:51:38 +0100 Subject: [PATCH 07/20] Refine preamble builder and tests --- src/server/config/mod.rs | 7 +++- src/server/config/preamble.rs | 15 +++++--- src/server/mod.rs | 16 +++++---- tests/preamble.rs | 64 +++++++++++++++++++++++++++++++++++ 4 files changed, 89 insertions(+), 13 deletions(-) diff --git a/src/server/config/mod.rs b/src/server/config/mod.rs index 1818fdbb..97e4b988 100644 --- a/src/server/config/mod.rs +++ b/src/server/config/mod.rs @@ -33,7 +33,7 @@ macro_rules! builder_callback { where H: $($bound)*, { - self.$field = Some(Arc::new(handler)); + self.$field = Some(std::sync::Arc::new(handler)); self } }; @@ -78,6 +78,8 @@ where builder_setter!( /// Set the number of worker tasks to spawn for the server. /// + /// A minimum of one worker is enforced. + /// /// # Examples /// /// ``` @@ -85,6 +87,9 @@ where /// /// let server = WireframeServer::new(|| WireframeApp::default()).workers(4); /// assert_eq!(server.worker_count(), 4); + /// + /// let server = WireframeServer::new(|| WireframeApp::default()).workers(0); + /// assert_eq!(server.worker_count(), 1); /// ``` workers, workers, count: usize => count.max(1) ); diff --git a/src/server/config/preamble.rs b/src/server/config/preamble.rs index bb5c5892..c4c32e1f 100644 --- a/src/server/config/preamble.rs +++ b/src/server/config/preamble.rs @@ -1,7 +1,7 @@ //! Preamble configuration for [`WireframeServer`]. use core::marker::PhantomData; -use std::{io, sync::Arc}; +use std::io; use bincode::error::DecodeError; use futures::future::BoxFuture; @@ -15,9 +15,11 @@ where T: Preamble, S: ServerState, { - /// Converts the server to use a custom preamble type for incoming connections. + /// Converts the server to use a custom preamble type implementing + /// [`crate::preamble::Preamble`] for incoming connections. /// - /// Calling this method drops any previously configured preamble decode callbacks. + /// Calling this method drops any previously configured preamble decode callbacks + /// (both success and failure). /// /// # Examples /// @@ -50,6 +52,8 @@ where builder_callback!( /// Register a callback invoked when the connection preamble decodes successfully. /// + /// The handler must implement [`crate::server::PreambleSuccessHandler`]. + /// /// # Examples /// /// ``` @@ -73,14 +77,15 @@ where builder_callback!( /// Register a callback invoked when the connection preamble fails to decode. /// + /// The handler receives a [`bincode::error::DecodeError`]. + /// /// # Examples /// /// ``` - /// use bincode::error::DecodeError; /// use wireframe::{app::WireframeApp, server::WireframeServer}; /// /// let server = WireframeServer::new(|| WireframeApp::default()).on_preamble_decode_failure( - /// |_err: &DecodeError| { + /// |_err: &bincode::error::DecodeError| { /// eprintln!("Failed to decode preamble"); /// }, /// ); diff --git a/src/server/mod.rs b/src/server/mod.rs index 88233aa4..a26e197e 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -13,15 +13,17 @@ use tokio::{net::TcpListener, sync::oneshot}; use crate::{app::WireframeApp, preamble::Preamble}; -/// Callback invoked when a connection preamble decodes successfully. +/// Handler invoked when a connection preamble decodes successfully. /// -/// The callback may perform asynchronous I/O on the provided stream before the +/// The handler may perform asynchronous I/O on the provided stream before the /// connection is handed off to [`WireframeApp`]. -pub type PreambleCallback = Arc< - dyn for<'a> Fn(&'a T, &'a mut tokio::net::TcpStream) -> BoxFuture<'a, io::Result<()>> - + Send - + Sync, ->; +pub type PreambleSuccessHandler = dyn for<'a> Fn(&'a T, &'a mut tokio::net::TcpStream) -> BoxFuture<'a, io::Result<()>> + + Send + + Sync + + 'static; + +/// Callback invoked when a connection preamble decodes successfully. +pub type PreambleCallback = Arc>; /// Callback invoked when decoding a connection preamble fails. pub type PreambleErrorCallback = Arc; diff --git a/tests/preamble.rs b/tests/preamble.rs index d2703b00..f1dc752e 100644 --- a/tests/preamble.rs +++ b/tests/preamble.rs @@ -216,3 +216,67 @@ async fn success_callback_can_write_response( }) .await; } + +#[derive(Debug, Clone, Copy, PartialEq, Eq, bincode::Encode, bincode::Decode)] +struct OtherPreamble(u8); + +#[rstest] +#[tokio::test] +async fn callbacks_dropped_when_overriding_preamble( + factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, +) { + let (hotline_success_tx, hotline_success_rx) = tokio::sync::oneshot::channel::<()>(); + let (hotline_failure_tx, hotline_failure_rx) = tokio::sync::oneshot::channel::<()>(); + + let hotline_success_tx = std::sync::Arc::new(std::sync::Mutex::new(Some(hotline_success_tx))); + let hotline_failure_tx = std::sync::Arc::new(std::sync::Mutex::new(Some(hotline_failure_tx))); + + let server = WireframeServer::new(factory.clone()) + .with_preamble::() + .on_preamble_decode_success({ + let hotline_success_tx = hotline_success_tx.clone(); + move |_, _| { + let hotline_success_tx = hotline_success_tx.clone(); + Box::pin(async move { + if let Some(tx) = hotline_success_tx.lock().expect("lock").take() { + let _ = tx.send(()); + } + Ok(()) + }) + } + }) + .on_preamble_decode_failure({ + let hotline_failure_tx = hotline_failure_tx.clone(); + move |_| { + if let Some(tx) = hotline_failure_tx.lock().expect("lock").take() { + let _ = tx.send(()); + } + } + }) + .with_preamble::(); + + with_running_server(server, |addr| async move { + let mut stream = TcpStream::connect(addr).await.expect("connect failed"); + let config = bincode::config::standard() + .with_big_endian() + .with_fixed_int_encoding(); + let bytes = bincode::encode_to_vec(OtherPreamble(1), config).unwrap(); + stream.write_all(&bytes).await.expect("write failed"); + stream.shutdown().await.expect("shutdown failed"); + tokio::time::sleep(Duration::from_millis(50)).await; + }) + .await; + + assert!( + timeout(Duration::from_millis(100), hotline_success_rx) + .await + .is_err(), + "hotline success callback invoked", + ); + assert!( + timeout(Duration::from_millis(100), hotline_failure_rx) + .await + .is_err(), + "hotline failure callback invoked", + ); +} From c15dc0b3918440eb70a42836c6e54f4ee5eb1bdc Mon Sep 17 00:00:00 2001 From: Leynos Date: Fri, 8 Aug 2025 14:34:51 +0100 Subject: [PATCH 08/20] Expose preamble handler trait and re-export config --- src/server/config/preamble.rs | 10 ++++--- src/server/mod.rs | 26 ++++++++++++++----- tests/preamble.rs | 49 ++++++++++++++++++++++++++++++----- 3 files changed, 67 insertions(+), 18 deletions(-) diff --git a/src/server/config/preamble.rs b/src/server/config/preamble.rs index c4c32e1f..309789c7 100644 --- a/src/server/config/preamble.rs +++ b/src/server/config/preamble.rs @@ -1,13 +1,15 @@ //! Preamble configuration for [`WireframeServer`]. use core::marker::PhantomData; -use std::io; use bincode::error::DecodeError; -use futures::future::BoxFuture; use super::WireframeServer; -use crate::{app::WireframeApp, preamble::Preamble, server::ServerState}; +use crate::{ + app::WireframeApp, + preamble::Preamble, + server::{PreambleSuccessHandler, ServerState}, +}; impl WireframeServer where @@ -71,7 +73,7 @@ where /// ``` on_preamble_decode_success, on_preamble_success, - for<'a> Fn(&'a T, &'a mut tokio::net::TcpStream) -> BoxFuture<'a, io::Result<()>> + Send + Sync + 'static + PreambleSuccessHandler ); builder_callback!( diff --git a/src/server/mod.rs b/src/server/mod.rs index a26e197e..a508879a 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -15,15 +15,26 @@ use crate::{app::WireframeApp, preamble::Preamble}; /// Handler invoked when a connection preamble decodes successfully. /// -/// The handler may perform asynchronous I/O on the provided stream before the +/// Implementors may perform asynchronous I/O on the provided stream before the /// connection is handed off to [`WireframeApp`]. -pub type PreambleSuccessHandler = dyn for<'a> Fn(&'a T, &'a mut tokio::net::TcpStream) -> BoxFuture<'a, io::Result<()>> +pub trait PreambleSuccessHandler: + for<'a> Fn(&'a T, &'a mut tokio::net::TcpStream) -> BoxFuture<'a, io::Result<()>> + Send + Sync - + 'static; + + 'static +{ +} + +impl PreambleSuccessHandler for F where + F: for<'a> Fn(&'a T, &'a mut tokio::net::TcpStream) -> BoxFuture<'a, io::Result<()>> + + Send + + Sync + + 'static +{ +} /// Callback invoked when a connection preamble decodes successfully. -pub type PreambleCallback = Arc>; +pub type PreambleCallback = Arc>; /// Callback invoked when decoding a connection preamble fails. pub type PreambleErrorCallback = Arc; @@ -64,21 +75,22 @@ where pub(crate) _preamble: PhantomData, } -/// Marker type for an unbound server. +/// Marker indicating the server has not yet bound a listener. pub struct Unbound; -/// Marker type for a bound server holding a listener. +/// Marker indicating the server is bound to a TCP listener. pub struct Bound { pub(crate) listener: Arc, } -/// Trait implemented by binding state markers. +/// Trait implemented by [`Unbound`] and [`Bound`] to model binding typestate. pub trait ServerState {} impl ServerState for Unbound {} impl ServerState for Bound {} mod config; +pub use config::{binding, preamble}; mod connection; pub mod error; mod runtime; diff --git a/tests/preamble.rs b/tests/preamble.rs index f1dc752e..930cddd7 100644 --- a/tests/preamble.rs +++ b/tests/preamble.rs @@ -170,7 +170,7 @@ async fn server_triggers_expected_callback( .expect("success send"); assert_eq!(preamble.magic, HotlinePreamble::MAGIC); assert!( - timeout(Duration::from_millis(100), failure_rx) + timeout(Duration::from_millis(500), failure_rx) .await .is_err() ); @@ -181,7 +181,7 @@ async fn server_triggers_expected_callback( .expect("timeout waiting for failure") .expect("failure send"); assert!( - timeout(Duration::from_millis(100), success_rx) + timeout(Duration::from_millis(500), success_rx) .await .is_err() ); @@ -227,9 +227,13 @@ async fn callbacks_dropped_when_overriding_preamble( ) { let (hotline_success_tx, hotline_success_rx) = tokio::sync::oneshot::channel::<()>(); let (hotline_failure_tx, hotline_failure_rx) = tokio::sync::oneshot::channel::<()>(); + let (other_success_tx, other_success_rx) = tokio::sync::oneshot::channel::<()>(); + let (other_failure_tx, other_failure_rx) = tokio::sync::oneshot::channel::<()>(); let hotline_success_tx = std::sync::Arc::new(std::sync::Mutex::new(Some(hotline_success_tx))); let hotline_failure_tx = std::sync::Arc::new(std::sync::Mutex::new(Some(hotline_failure_tx))); + let other_success_tx = std::sync::Arc::new(std::sync::Mutex::new(Some(other_success_tx))); + let other_failure_tx = std::sync::Arc::new(std::sync::Mutex::new(Some(other_failure_tx))); let server = WireframeServer::new(factory.clone()) .with_preamble::() @@ -253,28 +257,59 @@ async fn callbacks_dropped_when_overriding_preamble( } } }) - .with_preamble::(); + .with_preamble::() + .on_preamble_decode_success({ + let other_success_tx = other_success_tx.clone(); + move |_: &OtherPreamble, _| { + let other_success_tx = other_success_tx.clone(); + Box::pin(async move { + if let Some(tx) = other_success_tx.lock().expect("lock").take() { + let _ = tx.send(()); + } + Ok(()) + }) + } + }) + .on_preamble_decode_failure({ + let other_failure_tx = other_failure_tx.clone(); + move |_: &DecodeError| { + if let Some(tx) = other_failure_tx.lock().expect("lock").take() { + let _ = tx.send(()); + } + } + }); with_running_server(server, |addr| async move { let mut stream = TcpStream::connect(addr).await.expect("connect failed"); let config = bincode::config::standard() .with_big_endian() .with_fixed_int_encoding(); - let bytes = bincode::encode_to_vec(OtherPreamble(1), config).unwrap(); + let mut bytes = bincode::encode_to_vec(OtherPreamble(1), config).unwrap(); + bytes.resize(8, 0); stream.write_all(&bytes).await.expect("write failed"); stream.shutdown().await.expect("shutdown failed"); - tokio::time::sleep(Duration::from_millis(50)).await; + tokio::time::sleep(Duration::from_secs(1)).await; }) .await; + timeout(Duration::from_secs(1), other_success_rx) + .await + .expect("timeout waiting for other success") + .expect("other success send"); + assert!( + timeout(Duration::from_millis(500), other_failure_rx) + .await + .is_err(), + "other failure callback invoked", + ); assert!( - timeout(Duration::from_millis(100), hotline_success_rx) + timeout(Duration::from_millis(500), hotline_success_rx) .await .is_err(), "hotline success callback invoked", ); assert!( - timeout(Duration::from_millis(100), hotline_failure_rx) + timeout(Duration::from_millis(500), hotline_failure_rx) .await .is_err(), "hotline failure callback invoked", From b8828a86a8007bb8725a9610e750be04ccbaaa10 Mon Sep 17 00:00:00 2001 From: Leynos Date: Fri, 8 Aug 2025 15:34:45 +0100 Subject: [PATCH 09/20] Deflake preamble override test --- src/server/mod.rs | 2 + tests/preamble.rs | 107 ++++++++++++++++++++++------------------------ 2 files changed, 52 insertions(+), 57 deletions(-) diff --git a/src/server/mod.rs b/src/server/mod.rs index a508879a..242a1ae1 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -51,6 +51,8 @@ where F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, // `Preamble` covers types implementing `BorrowDecode` for any lifetime, // enabling decoding of borrowed data without external context. + // `()` already satisfies this bound via `bincode`, so servers default to + // having no preamble. T: Preamble, S: ServerState, { diff --git a/tests/preamble.rs b/tests/preamble.rs index 930cddd7..1671da06 100644 --- a/tests/preamble.rs +++ b/tests/preamble.rs @@ -1,6 +1,9 @@ //! Tests for connection preamble reading. -use std::io; +use std::{ + io, + sync::{Arc, Mutex}, +}; use bincode::error::DecodeError; use futures::future::BoxFuture; @@ -220,82 +223,72 @@ async fn success_callback_can_write_response( #[derive(Debug, Clone, Copy, PartialEq, Eq, bincode::Encode, bincode::Decode)] struct OtherPreamble(u8); +type Holder = Arc>>>; + +fn channel_holder() -> (Holder, oneshot::Receiver<()>) { + let (tx, rx) = oneshot::channel(); + (Arc::new(Mutex::new(Some(tx))), rx) +} + +fn success_cb

( + holder: Arc>>>, +) -> impl for<'a> Fn(&'a P, &'a mut TcpStream) -> BoxFuture<'a, io::Result<()>> + Send + Sync + 'static +{ + move |_, _| { + let holder = holder.clone(); + Box::pin(async move { + if let Some(tx) = holder.lock().expect("lock").take() { + let _ = tx.send(()); + } + Ok(()) + }) + } +} + +fn failure_cb( + holder: Arc>>>, +) -> impl Fn(&DecodeError) + Send + Sync + 'static { + move |_| { + if let Some(tx) = holder.lock().expect("lock").take() { + let _ = tx.send(()); + } + } +} + #[rstest] #[tokio::test] async fn callbacks_dropped_when_overriding_preamble( factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, ) { - let (hotline_success_tx, hotline_success_rx) = tokio::sync::oneshot::channel::<()>(); - let (hotline_failure_tx, hotline_failure_rx) = tokio::sync::oneshot::channel::<()>(); - let (other_success_tx, other_success_rx) = tokio::sync::oneshot::channel::<()>(); - let (other_failure_tx, other_failure_rx) = tokio::sync::oneshot::channel::<()>(); - - let hotline_success_tx = std::sync::Arc::new(std::sync::Mutex::new(Some(hotline_success_tx))); - let hotline_failure_tx = std::sync::Arc::new(std::sync::Mutex::new(Some(hotline_failure_tx))); - let other_success_tx = std::sync::Arc::new(std::sync::Mutex::new(Some(other_success_tx))); - let other_failure_tx = std::sync::Arc::new(std::sync::Mutex::new(Some(other_failure_tx))); + let (hotline_success, hotline_success_rx) = channel_holder(); + let (hotline_failure, hotline_failure_rx) = channel_holder(); + let (other_success, other_success_rx) = channel_holder(); + let (other_failure, other_failure_rx) = channel_holder(); let server = WireframeServer::new(factory.clone()) .with_preamble::() - .on_preamble_decode_success({ - let hotline_success_tx = hotline_success_tx.clone(); - move |_, _| { - let hotline_success_tx = hotline_success_tx.clone(); - Box::pin(async move { - if let Some(tx) = hotline_success_tx.lock().expect("lock").take() { - let _ = tx.send(()); - } - Ok(()) - }) - } - }) - .on_preamble_decode_failure({ - let hotline_failure_tx = hotline_failure_tx.clone(); - move |_| { - if let Some(tx) = hotline_failure_tx.lock().expect("lock").take() { - let _ = tx.send(()); - } - } - }) + .on_preamble_decode_success(success_cb::(hotline_success.clone())) + .on_preamble_decode_failure(failure_cb(hotline_failure.clone())) .with_preamble::() - .on_preamble_decode_success({ - let other_success_tx = other_success_tx.clone(); - move |_: &OtherPreamble, _| { - let other_success_tx = other_success_tx.clone(); - Box::pin(async move { - if let Some(tx) = other_success_tx.lock().expect("lock").take() { - let _ = tx.send(()); - } - Ok(()) - }) - } - }) - .on_preamble_decode_failure({ - let other_failure_tx = other_failure_tx.clone(); - move |_: &DecodeError| { - if let Some(tx) = other_failure_tx.lock().expect("lock").take() { - let _ = tx.send(()); - } - } - }); + .on_preamble_decode_success(success_cb::(other_success.clone())) + .on_preamble_decode_failure(failure_cb(other_failure.clone())); with_running_server(server, |addr| async move { let mut stream = TcpStream::connect(addr).await.expect("connect failed"); let config = bincode::config::standard() .with_big_endian() .with_fixed_int_encoding(); - let mut bytes = bincode::encode_to_vec(OtherPreamble(1), config).unwrap(); + let mut bytes = bincode::encode_to_vec(OtherPreamble(1), config).expect("encode preamble"); bytes.resize(8, 0); stream.write_all(&bytes).await.expect("write failed"); stream.shutdown().await.expect("shutdown failed"); - tokio::time::sleep(Duration::from_secs(1)).await; + // Wait for the success callback before shutting down the server. + timeout(Duration::from_secs(1), other_success_rx) + .await + .expect("timeout waiting for other success") + .expect("other success send"); }) .await; - - timeout(Duration::from_secs(1), other_success_rx) - .await - .expect("timeout waiting for other success") - .expect("other success send"); assert!( timeout(Duration::from_millis(500), other_failure_rx) .await From 026f722aa707b9c49fa8c968e22c070404d7bea5 Mon Sep 17 00:00:00 2001 From: Leynos Date: Fri, 8 Aug 2025 16:28:41 +0100 Subject: [PATCH 10/20] Document preamble success handler usage --- src/server/mod.rs | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/src/server/mod.rs b/src/server/mod.rs index 242a1ae1..e6ada313 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -17,6 +17,29 @@ use crate::{app::WireframeApp, preamble::Preamble}; /// /// Implementors may perform asynchronous I/O on the provided stream before the /// connection is handed off to [`WireframeApp`]. +/// +/// # Examples +/// ``` +/// use std::io; +/// +/// use futures::future::BoxFuture; +/// use tokio::net::TcpStream; +/// use wireframe::{app::WireframeApp, server::WireframeServer}; +/// +/// #[derive(bincode::Decode, bincode::BorrowDecode)] +/// struct MyPreamble; +/// +/// let _server = WireframeServer::new(|| WireframeApp::default()) +/// .with_preamble::() +/// .on_preamble_decode_success( +/// |_preamble: &MyPreamble, stream: &mut TcpStream| -> BoxFuture<'_, io::Result<()>> { +/// Box::pin(async move { +/// // Perform any initial handshake here. +/// Ok(()) +/// }) +/// }, +/// ); +/// ``` pub trait PreambleSuccessHandler: for<'a> Fn(&'a T, &'a mut tokio::net::TcpStream) -> BoxFuture<'a, io::Result<()>> + Send From 1071e96ace0c5ea2a5e077167ad5359423036972 Mon Sep 17 00:00:00 2001 From: Leynos Date: Fri, 8 Aug 2025 19:13:16 +0100 Subject: [PATCH 11/20] Document typestate field and derive markers --- src/server/mod.rs | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/src/server/mod.rs b/src/server/mod.rs index e6ada313..be08b2a9 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -62,13 +62,16 @@ pub type PreambleCallback = Arc>; /// Callback invoked when decoding a connection preamble fails. pub type PreambleErrorCallback = Arc; -/// Tokio-based server for `WireframeApp` instances. +/// Tokio-based server for [`WireframeApp`] instances. /// -/// `WireframeServer` spawns a worker task per thread. Each worker -/// receives its own `WireframeApp` from the provided factory +/// The server carries a typestate `S` indicating whether it is +/// [`Unbound`] (not yet bound to a TCP listener) or [`Bound`]. New +/// servers start `Unbound` and must call [`binding::bind`] or +/// [`binding::bind_listener`] before running. A worker task is spawned per +/// thread; each receives its own `WireframeApp` from the provided factory /// closure. The server listens for a shutdown signal using -/// `tokio::signal::ctrl_c` and notifies all workers to stop -/// accepting new connections. +/// `tokio::signal::ctrl_c` and notifies all workers to stop accepting new +/// connections. pub struct WireframeServer where F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, @@ -96,14 +99,18 @@ where /// Because only one notification may be sent, a new `ready_tx` must be /// provided each time the server is started. pub(crate) ready_tx: Option>, + /// Typestate tracking whether the server has been bound to a listener. + /// [`Unbound`] servers require binding before they can run. pub(crate) state: S, pub(crate) _preamble: PhantomData, } /// Marker indicating the server has not yet bound a listener. +#[derive(Debug, Clone, Copy)] pub struct Unbound; /// Marker indicating the server is bound to a TCP listener. +#[derive(Debug, Clone)] pub struct Bound { pub(crate) listener: Arc, } From ff20ace07007dc1a2e62e3d99f487ced401256bc Mon Sep 17 00:00:00 2001 From: Leynos Date: Fri, 8 Aug 2025 20:07:22 +0100 Subject: [PATCH 12/20] Clarify preamble docs and binding links --- src/server/mod.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/server/mod.rs b/src/server/mod.rs index be08b2a9..6ec95c54 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -19,7 +19,7 @@ use crate::{app::WireframeApp, preamble::Preamble}; /// connection is handed off to [`WireframeApp`]. /// /// # Examples -/// ``` +/// ```no_run /// use std::io; /// /// use futures::future::BoxFuture; @@ -66,8 +66,8 @@ pub type PreambleErrorCallback = Arc; /// /// The server carries a typestate `S` indicating whether it is /// [`Unbound`] (not yet bound to a TCP listener) or [`Bound`]. New -/// servers start `Unbound` and must call [`binding::bind`] or -/// [`binding::bind_listener`] before running. A worker task is spawned per +/// servers start `Unbound` and must call [`binding::WireframeServer::bind`] or +/// [`binding::WireframeServer::bind_listener`] before running. A worker task is spawned per /// thread; each receives its own `WireframeApp` from the provided factory /// closure. The server listens for a shutdown signal using /// `tokio::signal::ctrl_c` and notifies all workers to stop accepting new @@ -77,8 +77,8 @@ where F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, // `Preamble` covers types implementing `BorrowDecode` for any lifetime, // enabling decoding of borrowed data without external context. - // `()` already satisfies this bound via `bincode`, so servers default to - // having no preamble. + // `()` satisfies this bound via bincode's `BorrowDecode` support for unit, + // so servers default to having no preamble. T: Preamble, S: ServerState, { From 6f5b12aa90fe51b4c98cdb873bad90db69ab30bd Mon Sep 17 00:00:00 2001 From: Leynos Date: Fri, 8 Aug 2025 22:00:56 +0100 Subject: [PATCH 13/20] Seal ServerState trait --- src/server/mod.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/server/mod.rs b/src/server/mod.rs index 6ec95c54..a48b9331 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -116,7 +116,15 @@ pub struct Bound { } /// Trait implemented by [`Unbound`] and [`Bound`] to model binding typestate. -pub trait ServerState {} +pub trait ServerState: sealed::Sealed {} + +mod sealed { + //! Prevent external implementations of [`ServerState`]. + + pub trait Sealed {} + impl Sealed for super::Unbound {} + impl Sealed for super::Bound {} +} impl ServerState for Unbound {} impl ServerState for Bound {} From 7d6288f262ee32a611309be397ae97df0d1f3574 Mon Sep 17 00:00:00 2001 From: Leynos Date: Fri, 8 Aug 2025 23:30:03 +0100 Subject: [PATCH 14/20] Make error callbacks 'static, default Unbound --- src/server/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/server/mod.rs b/src/server/mod.rs index a48b9331..1a0fffca 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -60,7 +60,7 @@ impl PreambleSuccessHandler for F where pub type PreambleCallback = Arc>; /// Callback invoked when decoding a connection preamble fails. -pub type PreambleErrorCallback = Arc; +pub type PreambleErrorCallback = Arc; /// Tokio-based server for [`WireframeApp`] instances. /// @@ -106,7 +106,7 @@ where } /// Marker indicating the server has not yet bound a listener. -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, Default)] pub struct Unbound; /// Marker indicating the server is bound to a TCP listener. From 3a9388eab7b453e7909e1ba74f056579cd16e7ec Mon Sep 17 00:00:00 2001 From: Leynos Date: Sat, 9 Aug 2025 08:58:59 +0100 Subject: [PATCH 15/20] Use handler terminology and re-export ServerError --- examples/echo.rs | 2 +- examples/packet_enum.rs | 2 +- examples/ping_pong.rs | 2 +- src/server/config/binding.rs | 2 +- src/server/config/preamble.rs | 6 +++--- src/server/config/tests.rs | 18 +++++++++--------- src/server/connection.rs | 12 ++++++------ src/server/mod.rs | 33 +++++++++++++++++++++++++++------ src/server/runtime.rs | 14 +++++++------- 9 files changed, 56 insertions(+), 35 deletions(-) diff --git a/examples/echo.rs b/examples/echo.rs index 3629f877..caeb2e7c 100644 --- a/examples/echo.rs +++ b/examples/echo.rs @@ -5,7 +5,7 @@ use wireframe::{ app::{Envelope, WireframeApp}, - server::{WireframeServer, error::ServerError}, + server::{ServerError, WireframeServer}, }; #[tokio::main] diff --git a/examples/packet_enum.rs b/examples/packet_enum.rs index 8a05d1c9..ff53fd8b 100644 --- a/examples/packet_enum.rs +++ b/examples/packet_enum.rs @@ -11,7 +11,7 @@ use wireframe::{ frame::{LengthFormat, LengthPrefixedProcessor}, message::Message, middleware::{HandlerService, Service, ServiceRequest, ServiceResponse, Transform}, - server::{WireframeServer, error::ServerError}, + server::{ServerError, WireframeServer}, }; #[derive(bincode::Encode, bincode::BorrowDecode, Debug)] diff --git a/examples/ping_pong.rs b/examples/ping_pong.rs index d5e272f1..14a401a3 100644 --- a/examples/ping_pong.rs +++ b/examples/ping_pong.rs @@ -11,7 +11,7 @@ use wireframe::{ message::Message, middleware::{HandlerService, Service, ServiceRequest, ServiceResponse, Transform}, serializer::BincodeSerializer, - server::{WireframeServer, error::ServerError}, + server::{ServerError, WireframeServer}, }; #[derive(bincode::Encode, bincode::BorrowDecode, Debug)] diff --git a/src/server/config/binding.rs b/src/server/config/binding.rs index 65dba059..5664380f 100644 --- a/src/server/config/binding.rs +++ b/src/server/config/binding.rs @@ -9,7 +9,7 @@ use std::{ use tokio::net::TcpListener; use super::{Bound, Unbound, WireframeServer}; -use crate::{app::WireframeApp, preamble::Preamble, server::error::ServerError}; +use crate::{app::WireframeApp, preamble::Preamble, server::ServerError}; impl WireframeServer where diff --git a/src/server/config/preamble.rs b/src/server/config/preamble.rs index 309789c7..308b8f1c 100644 --- a/src/server/config/preamble.rs +++ b/src/server/config/preamble.rs @@ -20,7 +20,7 @@ where /// Converts the server to use a custom preamble type implementing /// [`crate::preamble::Preamble`] for incoming connections. /// - /// Calling this method drops any previously configured preamble decode callbacks + /// Calling this method drops any previously configured preamble handlers /// (both success and failure). /// /// # Examples @@ -52,7 +52,7 @@ where } builder_callback!( - /// Register a callback invoked when the connection preamble decodes successfully. + /// Register a handler invoked when the connection preamble decodes successfully. /// /// The handler must implement [`crate::server::PreambleSuccessHandler`]. /// @@ -77,7 +77,7 @@ where ); builder_callback!( - /// Register a callback invoked when the connection preamble fails to decode. + /// Register a handler invoked when the connection preamble fails to decode. /// /// The handler receives a [`bincode::error::DecodeError`]. /// diff --git a/src/server/config/tests.rs b/src/server/config/tests.rs index c1aa7eb4..177930a1 100644 --- a/src/server/config/tests.rs +++ b/src/server/config/tests.rs @@ -1,7 +1,7 @@ //! Tests for server configuration utilities. //! //! This module exercises the `WireframeServer` builder, covering worker counts, -//! binding behaviour, preamble handling, callback registration, and method +//! binding behaviour, preamble handling, handler registration, and method //! chaining. Fixtures from `test_util` provide shared setup and parameterised //! cases via `rstest`. @@ -95,15 +95,15 @@ async fn test_local_addr_after_bind( #[case("success")] #[case("failure")] #[tokio::test] -async fn test_preamble_callback_registration( +async fn test_preamble_handler_registration( factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, - #[case] callback_type: &str, + #[case] handler_type: &str, ) { let counter = Arc::new(AtomicUsize::new(0)); let c = counter.clone(); let server = server_with_preamble(factory); - let server = match callback_type { + let server = match handler_type { "success" => server.on_preamble_decode_success(move |_p: &TestPreamble, _| { let c = c.clone(); Box::pin(async move { @@ -114,11 +114,11 @@ async fn test_preamble_callback_registration( "failure" => server.on_preamble_decode_failure(move |_err: &DecodeError| { c.fetch_add(1, Ordering::SeqCst); }), - _ => panic!("Invalid callback type"), + _ => panic!("Invalid handler type"), }; assert_eq!(counter.load(Ordering::SeqCst), 0); - match callback_type { + match handler_type { "success" => assert!(server.on_preamble_success.is_some()), "failure" => assert!(server.on_preamble_failure.is_some()), _ => unreachable!(), @@ -131,8 +131,8 @@ async fn test_method_chaining( factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, free_port: SocketAddr, ) { - let callback_invoked = Arc::new(AtomicUsize::new(0)); - let counter = callback_invoked.clone(); + let handler_invoked = Arc::new(AtomicUsize::new(0)); + let counter = handler_invoked.clone(); let server = WireframeServer::new(factory) .workers(2) .with_preamble::() @@ -148,7 +148,7 @@ async fn test_method_chaining( .expect("Failed to bind"); assert_eq!(server.worker_count(), 2); assert!(server.local_addr().is_some()); - assert_eq!(callback_invoked.load(Ordering::SeqCst), 0); + assert_eq!(handler_invoked.load(Ordering::SeqCst), 0); } #[rstest] diff --git a/src/server/connection.rs b/src/server/connection.rs index 503640d8..dae0c8d0 100644 --- a/src/server/connection.rs +++ b/src/server/connection.rs @@ -6,7 +6,7 @@ use futures::FutureExt; use tokio::net::TcpStream; use tokio_util::task::TaskTracker; -use super::{PreambleCallback, PreambleErrorCallback}; +use super::{PreambleErrorHandler, PreambleHandler}; use crate::{ app::WireframeApp, preamble::{Preamble, read_preamble}, @@ -17,8 +17,8 @@ use crate::{ pub(super) fn spawn_connection_task( stream: TcpStream, factory: F, - on_success: Option>, - on_failure: Option, + on_success: Option>, + on_failure: Option, tracker: &TaskTracker, ) where F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, @@ -53,8 +53,8 @@ async fn process_stream( mut stream: TcpStream, peer_addr: Option, factory: F, - on_success: Option>, - on_failure: Option, + on_success: Option>, + on_failure: Option, ) where F: Fn() -> WireframeApp + Send + Sync + 'static, T: Preamble, @@ -64,7 +64,7 @@ async fn process_stream( if let Some(handler) = on_success.as_ref() && let Err(e) = handler(&preamble, &mut stream).await { - tracing::error!(error = ?e, ?peer_addr, "preamble callback error"); + tracing::error!(error = ?e, ?peer_addr, "preamble handler error"); } let stream = RewindStream::new(leftover, stream); let app = (factory)(); diff --git a/src/server/mod.rs b/src/server/mod.rs index 1a0fffca..1177e77d 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -56,11 +56,11 @@ impl PreambleSuccessHandler for F where { } -/// Callback invoked when a connection preamble decodes successfully. -pub type PreambleCallback = Arc>; +/// Handler invoked when a connection preamble decodes successfully. +pub type PreambleHandler = Arc>; -/// Callback invoked when decoding a connection preamble fails. -pub type PreambleErrorCallback = Arc; +/// Handler invoked when decoding a connection preamble fails. +pub type PreambleErrorHandler = Arc; /// Tokio-based server for [`WireframeApp`] instances. /// @@ -72,6 +72,26 @@ pub type PreambleErrorCallback = Arc Result<(), ServerError> { +/// // Start unbound (S = Unbound) +/// let srv = WireframeServer::new(|| WireframeApp::default()); +/// +/// // Transition to bound (S = Bound) +/// let srv = srv.bind("127.0.0.1:0")?; +/// +/// // Run the server +/// srv.run().await +/// # } +/// ``` pub struct WireframeServer where F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, @@ -84,8 +104,8 @@ where { pub(crate) factory: F, pub(crate) workers: usize, - pub(crate) on_preamble_success: Option>, - pub(crate) on_preamble_failure: Option, + pub(crate) on_preamble_success: Option>, + pub(crate) on_preamble_failure: Option, /// Channel used to notify when the server is ready. /// /// # Thread Safety @@ -133,6 +153,7 @@ mod config; pub use config::{binding, preamble}; mod connection; pub mod error; +pub use error::ServerError; mod runtime; /// Re-exported configuration types for server backoff behavior. diff --git a/src/server/runtime.rs b/src/server/runtime.rs index 6f15eae2..58105f1f 100644 --- a/src/server/runtime.rs +++ b/src/server/runtime.rs @@ -13,11 +13,11 @@ use tokio_util::{sync::CancellationToken, task::TaskTracker}; use super::{ Bound, - PreambleCallback, - PreambleErrorCallback, + PreambleErrorHandler, + PreambleHandler, + ServerError, WireframeServer, connection::spawn_connection_task, - error::ServerError, }; use crate::{app::WireframeApp, preamble::Preamble}; @@ -66,7 +66,7 @@ where /// use wireframe::{app::WireframeApp, server::WireframeServer}; /// /// # #[tokio::main] - /// # async fn main() -> Result<(), wireframe::server::error::ServerError> { + /// # async fn main() -> Result<(), wireframe::server::ServerError> { /// let server = /// WireframeServer::new(|| WireframeApp::default()).bind(([127, 0, 0, 1], 8080).into())?; /// server.run().await?; @@ -93,7 +93,7 @@ where /// use wireframe::{app::WireframeApp, server::WireframeServer}; /// /// # #[tokio::main] - /// # async fn main() -> Result<(), wireframe::server::error::ServerError> { + /// # async fn main() -> Result<(), wireframe::server::ServerError> { /// let server = /// WireframeServer::new(|| WireframeApp::default()).bind(([127, 0, 0, 1], 0).into())?; /// @@ -171,8 +171,8 @@ where pub(super) async fn accept_loop( listener: Arc, factory: F, - on_success: Option>, - on_failure: Option, + on_success: Option>, + on_failure: Option, shutdown: CancellationToken, tracker: TaskTracker, backoff_config: BackoffConfig, From 34ad46854678d1b3656d61e6e19e8e4a6150c63b Mon Sep 17 00:00:00 2001 From: Leynos Date: Sat, 9 Aug 2025 13:56:59 +0100 Subject: [PATCH 16/20] Use expect in echo example --- examples/echo.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/echo.rs b/examples/echo.rs index caeb2e7c..9228b999 100644 --- a/examples/echo.rs +++ b/examples/echo.rs @@ -12,7 +12,7 @@ use wireframe::{ async fn main() -> Result<(), ServerError> { let factory = || { WireframeApp::new() - .unwrap() + .expect("failed to create app") .route( 1, std::sync::Arc::new(|_: &Envelope| { @@ -22,11 +22,11 @@ async fn main() -> Result<(), ServerError> { }) }), ) - .unwrap() + .expect("failed to register route") }; WireframeServer::new(factory) - .bind("127.0.0.1:7878".parse().unwrap())? + .bind("127.0.0.1:7878".parse().expect("invalid bind address"))? .run() .await?; Ok(()) From 51bd37d1858ee9845230614434052586e26231f2 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sat, 9 Aug 2025 18:26:27 +0100 Subject: [PATCH 17/20] Tighten the expect message to name the type Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- examples/echo.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/echo.rs b/examples/echo.rs index 9228b999..efb1e674 100644 --- a/examples/echo.rs +++ b/examples/echo.rs @@ -12,7 +12,7 @@ use wireframe::{ async fn main() -> Result<(), ServerError> { let factory = || { WireframeApp::new() - .expect("failed to create app") + .expect("failed to create WireframeApp") .route( 1, std::sync::Arc::new(|_: &Envelope| { From 8035e58f12f36eacd91f87c8d738d79e4e5c835a Mon Sep 17 00:00:00 2001 From: Leynos Date: Sat, 9 Aug 2025 18:26:48 +0100 Subject: [PATCH 18/20] Clarify which route failed to register Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- examples/echo.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/echo.rs b/examples/echo.rs index efb1e674..3a620b69 100644 --- a/examples/echo.rs +++ b/examples/echo.rs @@ -22,7 +22,7 @@ async fn main() -> Result<(), ServerError> { }) }), ) - .expect("failed to register route") + .expect("failed to register route 1") }; WireframeServer::new(factory) From da6dd679bf653bb1fc94fd1820f47fc10a1e6d79 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sat, 9 Aug 2025 18:27:36 +0100 Subject: [PATCH 19/20] Standardise the parse error message Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> rebase-temp-echo --- examples/echo.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/echo.rs b/examples/echo.rs index 3a620b69..89e4f3b9 100644 --- a/examples/echo.rs +++ b/examples/echo.rs @@ -26,7 +26,7 @@ async fn main() -> Result<(), ServerError> { }; WireframeServer::new(factory) - .bind("127.0.0.1:7878".parse().expect("invalid bind address"))? + .bind("127.0.0.1:7878".parse().expect("invalid socket address"))? .run() .await?; Ok(()) From bb5cec059ed26fa6596775beafb4723cd2a08a2a Mon Sep 17 00:00:00 2001 From: Payton McIntosh Date: Sat, 9 Aug 2025 19:33:39 +0100 Subject: [PATCH 20/20] Apply formatting --- src/server/config/binding.rs | 8 ++++++-- src/server/config/mod.rs | 13 ++++++++++--- src/server/runtime.rs | 2 +- 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/src/server/config/binding.rs b/src/server/config/binding.rs index 5664380f..fdadc041 100644 --- a/src/server/config/binding.rs +++ b/src/server/config/binding.rs @@ -8,8 +8,12 @@ use std::{ use tokio::net::TcpListener; -use super::{Bound, Unbound, WireframeServer}; -use crate::{app::WireframeApp, preamble::Preamble, server::ServerError}; +use super::{Unbound, WireframeServer}; +use crate::{ + app::WireframeApp, + preamble::Preamble, + server::{Bound, ServerError}, +}; impl WireframeServer where diff --git a/src/server/config/mod.rs b/src/server/config/mod.rs index 97e4b988..fa5ce2da 100644 --- a/src/server/config/mod.rs +++ b/src/server/config/mod.rs @@ -9,6 +9,7 @@ //! on [`Unbound`] servers. use core::marker::PhantomData; + use tokio::sync::oneshot; use super::{ServerState, Unbound, WireframeServer}; @@ -65,7 +66,15 @@ where #[must_use] pub fn new(factory: F) -> Self { let workers = std::thread::available_parallelism().map_or(1, std::num::NonZeroUsize::get); - Self { factory, workers, on_preamble_success: None, on_preamble_failure: None, ready_tx: None, state: Unbound, _preamble: PhantomData } + Self { + factory, + workers, + on_preamble_success: None, + on_preamble_failure: None, + ready_tx: None, + state: Unbound, + _preamble: PhantomData, + } } } @@ -122,6 +131,4 @@ where #[inline] #[must_use] pub const fn worker_count(&self) -> usize { self.workers } - - } diff --git a/src/server/runtime.rs b/src/server/runtime.rs index 58105f1f..d8b12230 100644 --- a/src/server/runtime.rs +++ b/src/server/runtime.rs @@ -153,7 +153,7 @@ where on_failure, token, t, - backoff_config, + BackoffConfig::default(), )); }