diff --git a/Cargo.toml b/Cargo.toml index 2d547972..1e4e3b76 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,9 +24,8 @@ tokio = { version = "1.46.1", default-features = false, features = [ "sync", "time", "io-util", - "test-util", ] } -tokio-util = { version = "0.7.16", features = ["rt"] } +tokio-util = { version = "0.7.16", features = ["rt", "codec"] } futures = "0.3.31" async-trait = "0.1.88" bytes = "1.10.1" @@ -50,10 +49,19 @@ serial_test = "3.2.0" # Permit compatible bug fixes but block breaking updates cucumber = "0.21.1" metrics-util = "0.20.0" -tracing = { version = "0.1.41", features = ["log", "log-always"] } tracing-test = "0.2.5" mockall = "0.13.1" +tokio = { version = "1.46.1", default-features = false, features = [ + "macros", + "rt-multi-thread", + "sync", + "time", + "io-util", + "net", + "test-util", +] } + [features] default = ["metrics"] metrics = ["dep:metrics", "dep:metrics-exporter-prometheus"] diff --git a/README.md b/README.md index a9820cc1..92dcc5c6 100644 --- a/README.md +++ b/README.md @@ -35,7 +35,6 @@ connections and runs the Tokio event loop: ```rust WireframeServer::new(|| { WireframeApp::new() - .frame_processor(MyFrameProcessor::new()) .app_data(state.clone()) .route(MessageType::Login, handle_login) .wrap(MyLoggingMiddleware::default()) @@ -48,10 +47,10 @@ WireframeServer::new(|| { By default, the number of worker tasks equals the number of CPU cores. If the CPU count cannot be determined, the server falls back to a single worker. -The builder supports methods like `frame_processor`, `route`, `app_data`, and -`wrap` for middleware configuration. `app_data` stores any `Send + Sync` value -keyed by type; registering another value of the same type overwrites the -previous one. Handlers retrieve these values using the `SharedState` +The builder supports methods like `route`, `app_data`, and `wrap` for +middleware configuration. `app_data` stores any `Send + Sync` value keyed by +type; registering another value of the same type overwrites the previous one. +Handlers retrieve these values using the `SharedState` extractor【F:docs/rust-binary-router-library-design.md†L622-L710】. Handlers are asynchronous functions whose parameters implement extractor traits @@ -62,7 +61,7 @@ concise【F:docs/rust-binary-router-library-design.md†L682-L710】. ## Example The design document includes a simple echo server that demonstrates routing -based on a message ID and the use of a length‑prefixed frame processor: +based on a message ID and the use of a length‑delimited codec: ```rust async fn handle_echo(req: Message) -> WireframeResult { @@ -139,10 +138,7 @@ size and endianness) and defaults to a 4‑byte big‑endian length prefix【F:docs/rust-binary-router-library-design.md†L1082-L1123】. ```rust -use wireframe::frame::{LengthFormat, LengthPrefixedProcessor}; - -let app = WireframeApp::new()? - .frame_processor(LengthPrefixedProcessor::new(LengthFormat::u16_le())); +let app = WireframeApp::new()?; ``` ## Connection Lifecycle diff --git a/docs/generic-message-fragmentation-and-re-assembly-design.md b/docs/generic-message-fragmentation-and-re-assembly-design.md index 027bf0e8..f173b120 100644 --- a/docs/generic-message-fragmentation-and-re-assembly-design.md +++ b/docs/generic-message-fragmentation-and-re-assembly-design.md @@ -162,11 +162,6 @@ Developers will enable fragmentation by adding the `FragmentAdapter` to their // Example: Configuring a server for MySQL-style fragmentation. WireframeServer::new(|| { WireframeApp::new() - .frame_processor( - FragmentAdapter::new(MySqlStrategy) - .with_max_message_size(64 * 1024 * 1024) // 64 MiB - .with_reassembly_timeout(Duration::from_secs(30)) - ) .route(...) }) ``` diff --git a/docs/rust-binary-router-library-design.md b/docs/rust-binary-router-library-design.md index 15df8665..34debb68 100644 --- a/docs/rust-binary-router-library-design.md +++ b/docs/rust-binary-router-library-design.md @@ -726,8 +726,7 @@ component to run it. async fn main_server_setup() -> std::io::Result<()> { let app_state = Arc::new(Mutex::new(AppState::new())); WireframeServer::new(move || { // Closure provides App per worker thread - WireframeApp::new() - .frame_processor(MyFrameProcessor::new()) // Configure the framing logic + WireframeApp::new() .app_data(app_state.clone()) // Shared application state //.service(login_handler) // If using attribute macros and auto-discovery //.service(chat_handler) @@ -745,10 +744,8 @@ The WireframeApp builder would offer methods like: - WireframeApp::new(): Creates a new application builder. -- .frame_processor(impl FrameProcessor): Sets the framing logic. - -- .service(handler_function): Registers a handler function, potentially - inferring the message type it handles if attribute macros are used. +- `[deprecated]` `.frame_processor(impl FrameProcessor)`: framing is now + handled by the connection codec. - .route(message_id, handler_function): Explicitly maps a message identifier to a handler. @@ -1273,7 +1270,7 @@ its own `FrameProcessor` trait or provide helpers.) WireframeServer::new(|| { WireframeApp::new() - //.frame_processor(LengthPrefixedCodec) // Simplified + //.frame_processor(LengthPrefixedCodec) // deprecated: framing handled by codec .serializer(BincodeSerializer) // Specify serializer .route(MyMessageType::Echo, handle_echo) // Route based on ID // OR if type-based routing is supported and EchoRequest has an ID: @@ -1383,14 +1380,14 @@ simplify server implementation. let chat_state = Arc::new(Mutex::new(ChatRoomState { users: HashMap::new() })); - WireframeServer::new(move || { - WireframeApp::new() - //.frame_processor(...) - .serializer(BincodeSerializer) - .app_data(chat_state.clone()) - .route(ChatMessageType::ClientJoin, handle_join) - .route(ChatMessageType::ClientPost, handle_post) - }) + WireframeServer::new(move || { + WireframeApp::new() + //.frame_processor(...) // deprecated: framing handled by codec + .serializer(BincodeSerializer) + .app_data(chat_state.clone()) + .route(ChatMessageType::ClientJoin, handle_join) + .route(ChatMessageType::ClientPost, handle_post) + }) .bind("127.0.0.1:8001")? .run() .await diff --git a/docs/wireframe-client-design.md b/docs/wireframe-client-design.md index abca896c..731cde8d 100644 --- a/docs/wireframe-client-design.md +++ b/docs/wireframe-client-design.md @@ -33,14 +33,14 @@ A `WireframeClient::builder()` method configures the client: ```rust let client = WireframeClient::builder() - .frame_processor(LengthPrefixedProcessor::new(LengthFormat::u32_be())) .serializer(BincodeSerializer) .connect("127.0.0.1:7878") .await?; ``` -The same `FrameProcessor` and `Serializer` traits used by the server are reused -here, ensuring messages are framed and encoded consistently. +The same `Serializer` trait used by the server is reused here, ensuring +messages are encoded consistently while framing is handled by the +length‑delimited codec. ### Request/Response Helpers @@ -52,9 +52,9 @@ let request = Login { username: "guest".into() }; let response: LoginAck = client.call(request).await?; ``` -Internally, this uses the `Serializer` to encode the request, writes it through -the `FrameProcessor`, then waits for a frame, decodes it, and deserializes the -response type. +Internally, this uses the `Serializer` to encode the request, sends it through +the length‑delimited codec, then waits for a frame, decodes it, and +deserializes the response type. ### Connection Lifecycle @@ -68,7 +68,6 @@ initialization logic. #[tokio::main] async fn main() -> std::io::Result<()> { let mut client = WireframeClient::builder() - .frame_processor(LengthPrefixedProcessor::new(LengthFormat::u32_be())) .serializer(BincodeSerializer) .connect("127.0.0.1:7878") .await?; diff --git a/docs/wireframe-testing-crate.md b/docs/wireframe-testing-crate.md index 3972a71f..6152384a 100644 --- a/docs/wireframe-testing-crate.md +++ b/docs/wireframe-testing-crate.md @@ -135,14 +135,12 @@ let (_, frame) = recv_expect!(queues.recv()); ```rust use std::sync::Arc; use wireframe_testing::{drive_with_frame, drive_with_frames}; -use wireframe::processor::LengthPrefixedProcessor; use crate::tests::{build_test_frame, expected_bytes}; #[tokio::test] async fn handler_echoes_message() { let app = WireframeApp::new() .unwrap() - .frame_processor(LengthPrefixedProcessor::default()) .route(1, Arc::new(|_| Box::pin(async {}))) .unwrap(); diff --git a/examples/echo.rs b/examples/echo.rs index 89e4f3b9..91ae5f1d 100644 --- a/examples/echo.rs +++ b/examples/echo.rs @@ -4,14 +4,17 @@ //! envelope back to the client. use wireframe::{ - app::{Envelope, WireframeApp}, + app::Envelope, + serializer::BincodeSerializer, server::{ServerError, WireframeServer}, }; +type App = wireframe::app::WireframeApp; + #[tokio::main] async fn main() -> Result<(), ServerError> { let factory = || { - WireframeApp::new() + App::new() .expect("failed to create WireframeApp") .route( 1, diff --git a/examples/metadata_routing.rs b/examples/metadata_routing.rs index aae264aa..d23b6222 100644 --- a/examples/metadata_routing.rs +++ b/examples/metadata_routing.rs @@ -8,13 +8,16 @@ use std::{io, sync::Arc}; use bytes::BytesMut; use tokio::io::{AsyncWriteExt, duplex}; use wireframe::{ - app::{Envelope, WireframeApp}, + app::Envelope, frame::{FrameMetadata, FrameProcessor, LengthPrefixedProcessor}, message::Message, serializer::Serializer, }; +type App = wireframe::app::WireframeApp; + /// Frame format with a two-byte id, one-byte flags, and bincode payload. +#[derive(Default)] struct HeaderSerializer; impl Serializer for HeaderSerializer { @@ -61,15 +64,13 @@ struct Ping; #[tokio::main] async fn main() -> io::Result<()> { - let app = WireframeApp::new() + let app = App::with_serializer(HeaderSerializer) .expect("failed to create app") - .frame_processor(LengthPrefixedProcessor::default()) - .serializer(HeaderSerializer) .route( 1, Arc::new(|_env: &Envelope| { Box::pin(async move { - println!("received ping message"); + tracing::info!("received ping message"); }) }), ) @@ -78,7 +79,7 @@ async fn main() -> io::Result<()> { 2, Arc::new(|_env: &Envelope| { Box::pin(async move { - println!("received pong message"); + tracing::info!("received pong message"); }) }), ) diff --git a/examples/packet_enum.rs b/examples/packet_enum.rs index ff53fd8b..544fe5b4 100644 --- a/examples/packet_enum.rs +++ b/examples/packet_enum.rs @@ -6,16 +6,19 @@ use std::{collections::HashMap, future::Future, pin::Pin}; use async_trait::async_trait; +use tracing::{info, warn}; use wireframe::{ - app::{Envelope, WireframeApp}, - frame::{LengthFormat, LengthPrefixedProcessor}, + app::Envelope, message::Message, middleware::{HandlerService, Service, ServiceRequest, ServiceResponse, Transform}, + serializer::BincodeSerializer, server::{ServerError, WireframeServer}, }; +type App = wireframe::app::WireframeApp; + #[derive(bincode::Encode, bincode::BorrowDecode, Debug)] -enum Packet { +enum ExamplePacket { Ping, Chat { user: String, msg: String }, Stats(Vec), @@ -24,7 +27,7 @@ enum Packet { #[derive(bincode::Encode, bincode::BorrowDecode, Debug)] struct Frame { headers: HashMap, - packet: Packet, + packet: ExamplePacket, } /// Middleware that decodes incoming frames and logs packet details. @@ -45,12 +48,12 @@ where async fn call(&self, req: ServiceRequest) -> Result { match Frame::from_bytes(req.frame()) { Ok((frame, _)) => match frame.packet { - Packet::Ping => println!("ping: {:?}", frame.headers), - Packet::Chat { user, msg } => println!("{user} says: {msg}"), - Packet::Stats(values) => println!("stats: {values:?}"), + ExamplePacket::Ping => info!("ping: {:?}", frame.headers), + ExamplePacket::Chat { user, msg } => info!("{user} says: {msg}"), + ExamplePacket::Stats(values) => info!("stats: {values:?}"), }, Err(e) => { - eprintln!("Failed to decode frame: {e}"); + warn!("Failed to decode frame: {e}"); } } @@ -71,16 +74,15 @@ impl Transform> for DecodeMiddleware { fn handle_packet(_env: &Envelope) -> Pin + Send>> { Box::pin(async { - println!("packet received"); + info!("packet received"); }) } #[tokio::main] async fn main() -> Result<(), ServerError> { let factory = || { - WireframeApp::new() + App::new() .expect("Failed to create WireframeApp") - .frame_processor(LengthPrefixedProcessor::new(LengthFormat::u16_le())) .wrap(DecodeMiddleware) .expect("Failed to wrap middleware") .route(1, std::sync::Arc::new(handle_packet)) diff --git a/examples/ping_pong.rs b/examples/ping_pong.rs index 5a4a5f5a..7410d31b 100644 --- a/examples/ping_pong.rs +++ b/examples/ping_pong.rs @@ -7,13 +7,15 @@ use std::{net::SocketAddr, sync::Arc}; use async_trait::async_trait; use wireframe::{ - app::{Envelope, Packet, Result as AppResult, WireframeApp}, + app::{Envelope, Packet, Result as AppResult}, message::Message, middleware::{HandlerService, Service, ServiceRequest, ServiceResponse, Transform}, serializer::BincodeSerializer, server::{ServerError, WireframeServer}, }; +type App = wireframe::app::WireframeApp; + #[derive(bincode::Encode, bincode::BorrowDecode, Debug)] struct Ping(u32); @@ -40,7 +42,10 @@ const PING_ID: u32 = 1; /// /// The middleware chain generates the actual response, so this /// handler intentionally performs no work. -#[allow(clippy::unused_async)] +#[expect( + clippy::unused_async, + reason = "Keep async signature to match Handler and Transform trait expectations" +)] async fn ping_handler() {} struct PongMiddleware; @@ -130,8 +135,8 @@ impl Transform> for Logging { } } -fn build_app() -> AppResult { - WireframeApp::new()? +fn build_app() -> AppResult { + App::new()? .serializer(BincodeSerializer) .route(PING_ID, Arc::new(|_: &Envelope| Box::pin(ping_handler())))? .wrap(PongMiddleware)? diff --git a/src/app.rs b/src/app.rs deleted file mode 100644 index 28d54773..00000000 --- a/src/app.rs +++ /dev/null @@ -1,826 +0,0 @@ -//! Application builder configuring routes and middleware. -//! -//! This module defines [`WireframeApp`], an Actix-inspired builder for -//! managing connection state, routing, and middleware in a `WireframeServer`. -//! It exposes convenience methods to register handlers and lifecycle hooks. - -use std::{ - any::{Any, TypeId}, - boxed::Box, - collections::HashMap, - future::Future, - pin::Pin, - sync::Arc, -}; - -use bytes::BytesMut; -use tokio::{ - io::{self, AsyncWrite, AsyncWriteExt}, - sync::mpsc, -}; - -use crate::{ - frame::{FrameProcessor, LengthFormat, LengthPrefixedProcessor}, - hooks::{ProtocolHooks, WireframeProtocol}, - message::Message, - middleware::{HandlerService, Service, ServiceRequest, Transform}, - serializer::{BincodeSerializer, Serializer}, -}; - -type BoxedFrameProcessor = - Box, Error = io::Error> + Send + Sync>; - -/// Callback invoked when a connection is established. -/// -/// # Examples -/// -/// ```no_run -/// use std::sync::Arc; -/// -/// use wireframe::app::ConnectionSetup; -/// -/// let setup: Arc> = Arc::new(|| { -/// Box::pin(async { -/// // Perform authentication and return connection state -/// String::from("hello") -/// }) -/// }); -/// ``` -pub type ConnectionSetup = dyn Fn() -> Pin + Send>> + Send + Sync; - -/// Callback invoked when a connection is closed. -/// -/// # Examples -/// -/// ```no_run -/// use std::sync::Arc; -/// -/// use wireframe::app::ConnectionTeardown; -/// -/// let teardown: Arc> = Arc::new(|state| { -/// Box::pin(async move { -/// println!("Dropping {state}"); -/// }) -/// }); -/// ``` -pub type ConnectionTeardown = - dyn Fn(C) -> Pin + Send>> + Send + Sync; - -/// Configures routing and middleware for a `WireframeServer`. -/// -/// The builder stores registered routes, services, and middleware -/// without enforcing an ordering. Methods return [`Result`] so -/// registrations can be chained ergonomically. -pub struct WireframeApp< - S: Serializer + Send + Sync = BincodeSerializer, - C: Send + 'static = (), - E: Packet = Envelope, -> { - routes: HashMap>, - services: Vec>, - middleware: Vec, Output = HandlerService>>>, - frame_processor: BoxedFrameProcessor, - serializer: S, - app_data: HashMap>, - on_connect: Option>>, - on_disconnect: Option>>, - protocol: Option, ProtocolError = ()>>>, - push_dlq: Option>>, -} - -/// Alias for asynchronous route handlers. -/// -/// A `Handler` is an `Arc` to a function returning a [`Future`], enabling -/// asynchronous execution of message handlers. -pub type Handler = Arc Pin + Send>> + Send + Sync>; - -/// Trait representing middleware components. -pub trait Middleware: - Transform, Output = HandlerService> + Send + Sync -{ -} - -impl Middleware for T where - T: Transform, Output = HandlerService> + Send + Sync -{ -} - -/// Top-level error type for application setup. -#[derive(Debug)] -pub enum WireframeError { - /// A route with the provided identifier was already registered. - DuplicateRoute(u32), -} - -/// Errors produced when sending a handler response over a stream. -#[derive(Debug)] -pub enum SendError { - /// Serialization failed. - Serialize(Box), - /// Writing to the stream failed. - Io(io::Error), -} - -impl std::fmt::Display for SendError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - SendError::Serialize(e) => write!(f, "serialization error: {e}"), - SendError::Io(e) => write!(f, "I/O error: {e}"), - } - } -} - -impl std::error::Error for SendError { - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - match self { - SendError::Serialize(e) => Some(&**e), - SendError::Io(e) => Some(e), - } - } -} - -impl From for SendError { - fn from(e: io::Error) -> Self { SendError::Io(e) } -} - -/// Envelope-like type used to wrap incoming and outgoing messages. -/// -/// Custom envelope types must implement this trait so [`WireframeApp`] can -/// route messages and construct responses. -/// -/// # Example -/// -/// ``` -/// use wireframe::{ -/// app::{Packet, PacketParts}, -/// message::Message, -/// }; -/// -/// #[derive(bincode::Decode, bincode::Encode)] -/// struct CustomEnvelope { -/// id: u32, -/// payload: Vec, -/// timestamp: u64, -/// } -/// -/// impl Packet for CustomEnvelope { -/// fn id(&self) -> u32 { self.id } -/// -/// fn correlation_id(&self) -> Option { None } -/// -/// fn into_parts(self) -> PacketParts { PacketParts::new(self.id, None, self.payload) } -/// -/// fn from_parts(parts: PacketParts) -> Self { -/// Self { -/// id: parts.id(), -/// payload: parts.payload(), -/// timestamp: 0, -/// } -/// } -/// } -/// ``` -pub trait Packet: Message + Send + Sync + 'static { - /// Return the message identifier used for routing. - fn id(&self) -> u32; - - /// Return the correlation identifier tying this frame to a request. - fn correlation_id(&self) -> Option; - - /// Consume the packet and return its identifier, correlation id and payload bytes. - fn into_parts(self) -> PacketParts; - - /// Construct a new packet from raw parts. - fn from_parts(parts: PacketParts) -> Self; -} - -/// Component values extracted from or used to build a [`Packet`]. -#[derive(Debug)] -pub struct PacketParts { - id: u32, - correlation_id: Option, - payload: Vec, -} - -/// Basic envelope type used by [`WireframeApp::handle_connection`]. -/// -/// Incoming frames are deserialized into an `Envelope` containing the -/// message identifier and raw payload bytes. -#[derive(bincode::Decode, bincode::Encode, Debug)] -pub struct Envelope { - pub(crate) id: u32, - pub(crate) correlation_id: Option, - pub(crate) payload: Vec, -} - -impl Envelope { - /// Create a new [`Envelope`] with the provided identifiers and payload. - #[must_use] - pub fn new(id: u32, correlation_id: Option, payload: Vec) -> Self { - Self { - id, - correlation_id, - payload, - } - } -} - -impl Packet for Envelope { - #[inline] - fn id(&self) -> u32 { self.id } - - #[inline] - fn correlation_id(&self) -> Option { self.correlation_id } - - fn into_parts(self) -> PacketParts { self.into() } - - fn from_parts(parts: PacketParts) -> Self { parts.into() } -} - -impl PacketParts { - /// Construct a new set of packet parts. - #[must_use] - pub fn new(id: u32, correlation_id: Option, payload: Vec) -> Self { - Self { - id, - correlation_id, - payload, - } - } - - #[must_use] - pub const fn id(&self) -> u32 { self.id } - - #[must_use] - pub const fn correlation_id(&self) -> Option { self.correlation_id } - - #[must_use] - pub fn payload(self) -> Vec { self.payload } - - /// Ensure a correlation identifier is present, inheriting from `source` if missing. - /// - /// # Examples - /// ``` - /// use wireframe::app::PacketParts; - /// // Inherit when missing - /// let parts = PacketParts::new(1, None, vec![]).inherit_correlation(Some(42)); - /// assert_eq!(parts.correlation_id(), Some(42)); - /// - /// // Overwrite mismatched value - /// let parts = PacketParts::new(1, Some(7), vec![]).inherit_correlation(Some(8)); - /// assert_eq!(parts.correlation_id(), Some(8)); - /// ``` - #[must_use] - pub fn inherit_correlation(mut self, source: Option) -> Self { - match (self.correlation_id, source) { - (None, cid) => self.correlation_id = cid, - (Some(cid), Some(src)) if cid != src => { - tracing::warn!( - id = self.id, - expected = src, - found = cid, - "mismatched correlation id in response", - ); - // Overwrite with the source correlation ID to ensure downstream - // consistency. - self.correlation_id = Some(src); - } - _ => {} - } - self - } -} - -impl From for PacketParts { - fn from(e: Envelope) -> Self { PacketParts::new(e.id, e.correlation_id, e.payload) } -} - -impl From for Envelope { - fn from(p: PacketParts) -> Self { - let id = p.id(); - let correlation_id = p.correlation_id(); - let payload = p.payload(); - Envelope::new(id, correlation_id, payload) - } -} - -/// Number of idle polls before terminating a connection. -const MAX_IDLE_POLLS: u32 = 50; // ~5s with 100ms timeout -/// Maximum consecutive deserialization failures before closing a connection. -const MAX_DESER_FAILURES: u32 = 10; - -/// Result type used throughout the builder API. -pub type Result = std::result::Result; - -impl Default for WireframeApp -where - S: Serializer + Default + Send + Sync, - C: Send + 'static, - E: Packet, -{ - /// - /// Initializes empty routes, services, middleware, and application data. - /// Sets the default frame processor and serializer, with no connection - /// lifecycle hooks. - fn default() -> Self { - Self { - routes: HashMap::new(), - services: Vec::new(), - middleware: Vec::new(), - frame_processor: Box::new(LengthPrefixedProcessor::new(LengthFormat::default())), - serializer: S::default(), - app_data: HashMap::new(), - on_connect: None, - on_disconnect: None, - protocol: None, - push_dlq: None, - } - } -} - -impl WireframeApp -where - E: Packet, -{ - /// Construct a new empty application builder. - /// - /// # Errors - /// - /// This function currently never returns an error but uses [`Result`] for - /// forward compatibility. - /// - /// # Examples - /// - /// ``` - /// use wireframe::app::{Packet, WireframeApp}; - /// - /// #[derive(bincode::Encode, bincode::BorrowDecode)] - /// struct MyEnv { - /// id: u32, - /// correlation_id: Option, - /// data: Vec, - /// } - /// - /// impl Packet for MyEnv { - /// fn id(&self) -> u32 { self.id } - /// fn correlation_id(&self) -> Option { self.correlation_id } - /// fn into_parts(self) -> PacketParts { - /// PacketParts::new(self.id, self.correlation_id, self.data) - /// } - /// fn from_parts(parts: PacketParts) -> Self { - /// Self { - /// id: parts.id(), - /// correlation_id: parts.correlation_id(), - /// data: parts.payload(), - /// } - /// } - /// } - /// - /// let app = WireframeApp::<_, _, MyEnv>::new().expect("failed to create app"); - /// ``` - pub fn new() -> Result { Ok(Self::default()) } - - /// Construct a new application builder using a custom envelope type. - /// - /// Deprecated: call [`WireframeApp::new`] with explicit envelope type - /// parameters. - /// - /// # Errors - /// - /// This function currently never returns an error but uses [`Result`] for - /// forward compatibility. - #[deprecated(note = "use `WireframeApp::<_, _, E>::new()` instead")] - pub fn new_with_envelope() -> Result { Self::new() } -} - -impl WireframeApp -where - S: Serializer + Send + Sync, - C: Send + 'static, - E: Packet, -{ - /// Register a route that maps `id` to `handler`. - /// - /// # Errors - /// - /// Returns [`WireframeError::DuplicateRoute`] if a handler for `id` - /// has already been registered. - pub fn route(mut self, id: u32, handler: Handler) -> Result { - if self.routes.contains_key(&id) { - return Err(WireframeError::DuplicateRoute(id)); - } - self.routes.insert(id, handler); - Ok(self) - } - - /// Register a handler discovered by attribute macros or other means. - /// - /// # Errors - /// - /// This function always succeeds currently but uses [`Result`] for - /// consistency with other builder methods. - pub fn service(mut self, handler: Handler) -> Result { - self.services.push(handler); - Ok(self) - } - - /// Store a shared state value accessible to request extractors. - /// - /// The value can later be retrieved using [`crate::extractor::SharedState`]. Registering - /// another value of the same type overwrites the previous one. - #[must_use] - pub fn app_data(mut self, state: T) -> Self - where - T: Send + Sync + 'static, - { - self.app_data.insert( - TypeId::of::(), - Arc::new(state) as Arc, - ); - self - } - - /// Add a middleware component to the processing pipeline. - /// - /// # Errors - /// - /// This function currently always succeeds. - pub fn wrap(mut self, mw: M) -> Result - where - M: Transform, Output = HandlerService> + Send + Sync + 'static, - { - self.middleware.push(Box::new(mw)); - Ok(self) - } - - /// Register a callback invoked when a new connection is established. - /// - /// The callback can perform authentication or other setup tasks and - /// returns connection-specific state stored for the connection's - /// lifetime. - /// - /// # Type Parameters - /// - /// This method changes the connection state type parameter from `C` to `C2`. - /// This means that any subsequent builder methods will operate on the new connection state type - /// `C2`. Be aware of this type transition when chaining builder methods. - /// - /// # Errors - /// - /// This function always succeeds currently but uses [`Result`] for - /// consistency with other builder methods. - pub fn on_connection_setup(self, f: F) -> Result> - where - F: Fn() -> Fut + Send + Sync + 'static, - Fut: Future + Send + 'static, - C2: Send + 'static, - { - Ok(WireframeApp { - routes: self.routes, - services: self.services, - middleware: self.middleware, - frame_processor: self.frame_processor, - serializer: self.serializer, - app_data: self.app_data, - on_connect: Some(Arc::new(move || Box::pin(f()))), - on_disconnect: None, - protocol: self.protocol, - push_dlq: self.push_dlq, - }) - } - - /// Register a callback invoked when a connection is closed. - /// - /// The callback receives the connection state produced by - /// [`on_connection_setup`](Self::on_connection_setup). - /// - /// # Errors - /// - /// This function always succeeds currently but uses [`Result`] for - /// consistency with other builder methods. - pub fn on_connection_teardown(mut self, f: F) -> Result - where - F: Fn(C) -> Fut + Send + Sync + 'static, - Fut: Future + Send + 'static, - { - self.on_disconnect = Some(Arc::new(move |c| Box::pin(f(c)))); - Ok(self) - } - - /// Install a [`WireframeProtocol`] implementation. - /// - /// The protocol defines hooks for connection setup, frame modification, and - /// command completion. It is wrapped in an [`Arc`] and stored for later use - /// by the connection actor. - #[must_use] - pub fn with_protocol

(mut self, protocol: P) -> Self - where - P: WireframeProtocol, ProtocolError = ()> + 'static, - { - self.protocol = Some(Arc::new(protocol)); - self - } - - /// Configure a Dead Letter Queue for dropped push frames. - /// - /// ```rust,no_run - /// use tokio::sync::mpsc; - /// use wireframe::app::WireframeApp; - /// - /// # fn build() -> WireframeApp { WireframeApp::new().unwrap() } - /// # fn main() { - /// let (tx, _rx) = mpsc::channel(16); - /// let app = build().with_push_dlq(tx); - /// # let _ = app; - /// # } - /// ``` - #[must_use] - pub fn with_push_dlq(mut self, dlq: mpsc::Sender>) -> Self { - self.push_dlq = Some(dlq); - self - } - - /// Get a clone of the configured protocol, if any. - /// - /// Returns `None` if no protocol was installed via [`with_protocol`](Self::with_protocol). - #[must_use] - pub fn protocol( - &self, - ) -> Option, ProtocolError = ()>>> { - self.protocol.as_ref().map(Arc::clone) - } - - /// Return protocol hooks derived from the installed protocol. - /// - /// If no protocol is installed, returns default (no-op) hooks. - #[must_use] - pub fn protocol_hooks(&self) -> ProtocolHooks, ()> { - self.protocol - .as_ref() - .map(|p| ProtocolHooks::from_protocol(&Arc::clone(p))) - .unwrap_or_default() - } - - /// Set the frame processor used for encoding and decoding frames. - #[must_use] - pub fn frame_processor

(mut self, processor: P) -> Self - where - P: FrameProcessor, Error = io::Error> + Send + Sync + 'static, - { - self.frame_processor = Box::new(processor); - self - } - - /// Replace the serializer used for messages. - #[must_use] - pub fn serializer(self, serializer: Ser) -> WireframeApp - where - Ser: Serializer + Send + Sync, - { - WireframeApp { - routes: self.routes, - services: self.services, - middleware: self.middleware, - frame_processor: self.frame_processor, - serializer, - app_data: self.app_data, - on_connect: self.on_connect, - on_disconnect: self.on_disconnect, - protocol: self.protocol, - push_dlq: self.push_dlq, - } - } - - /// Serialize `msg` and write it to `stream` using the frame processor. - /// - /// # Errors - /// - /// Returns a [`SendError`] if serialization or writing fails. - pub async fn send_response( - &self, - stream: &mut W, - msg: &M, - ) -> std::result::Result<(), SendError> - where - W: AsyncWrite + Unpin, - M: Message, - { - let bytes = self - .serializer - .serialize(msg) - .map_err(SendError::Serialize)?; - let mut framed = BytesMut::with_capacity(4 + bytes.len()); - self.frame_processor - .encode(&bytes, &mut framed) - .map_err(SendError::Io)?; - stream.write_all(&framed).await.map_err(SendError::Io)?; - stream.flush().await.map_err(SendError::Io) - } -} - -impl WireframeApp -where - S: Serializer + crate::frame::FrameMetadata + Send + Sync, - C: Send + 'static, - E: Packet, -{ - /// Try parsing the frame using [`FrameMetadata::parse`], falling back to - /// full deserialization on failure. - fn parse_envelope( - &self, - frame: &[u8], - ) -> std::result::Result<(Envelope, usize), Box> { - self.serializer - .parse(frame) - .map_err(|e| Box::new(e) as Box) - .or_else(|_| self.serializer.deserialize::(frame)) - } - - /// Handle an accepted connection. - /// - /// This placeholder immediately closes the connection after the - /// preamble phase. A warning is logged so tests and callers are - /// aware of the current limitation. - pub async fn handle_connection(&self, mut stream: W) - where - W: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static, - { - let state = if let Some(setup) = &self.on_connect { - Some((setup)().await) - } else { - None - }; - - let routes = self.build_chains().await; - - if let Err(e) = self.process_stream(&mut stream, &routes).await { - tracing::warn!(correlation_id = ?None::, error = ?e, "connection terminated with error"); - } - - if let (Some(teardown), Some(state)) = (&self.on_disconnect, state) { - teardown(state).await; - } - } - - async fn build_chains(&self) -> HashMap> { - let mut routes = HashMap::new(); - for (&id, handler) in &self.routes { - let mut service = HandlerService::new(id, handler.clone()); - for mw in self.middleware.iter().rev() { - service = mw.transform(service).await; - } - routes.insert(id, service); - } - routes - } - - async fn process_stream( - &self, - stream: &mut W, - routes: &HashMap>, - ) -> io::Result<()> - where - W: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin, - { - let mut buf = BytesMut::with_capacity(1024); - let mut idle = 0u32; - let mut deser_failures = 0u32; - - loop { - if let Some(frame) = self.frame_processor.decode(&mut buf)? { - self.handle_frame(stream, &frame, &mut deser_failures, routes) - .await?; - idle = 0; - continue; - } - - if self.read_and_update(stream, &mut buf, &mut idle).await? { - break; - } - } - - Ok(()) - } - - async fn read_and_update( - &self, - stream: &mut W, - buf: &mut BytesMut, - idle: &mut u32, - ) -> io::Result - where - W: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin, - { - match self.read_into(stream, buf).await { - Ok(Some(0)) => Ok(true), - Ok(Some(_)) => { - *idle = 0; - Ok(false) - } - Ok(None) => { - *idle += 1; - Ok(*idle >= MAX_IDLE_POLLS) - } - Err(e) if Self::is_transient_error(&e) => Ok(false), - Err(e) if Self::is_fatal_error(&e) => Ok(true), - Err(e) => Err(e), - } - } - - fn is_transient_error(e: &io::Error) -> bool { - matches!( - e.kind(), - io::ErrorKind::WouldBlock | io::ErrorKind::Interrupted - ) - } - - fn is_fatal_error(e: &io::Error) -> bool { - matches!( - e.kind(), - io::ErrorKind::UnexpectedEof - | io::ErrorKind::ConnectionReset - | io::ErrorKind::ConnectionAborted - | io::ErrorKind::BrokenPipe - ) - } - - async fn read_into(&self, stream: &mut W, buf: &mut BytesMut) -> io::Result> - where - W: tokio::io::AsyncRead + Unpin, - { - use tokio::{ - io::AsyncReadExt, - time::{Duration, timeout}, - }; - - const READ_TIMEOUT: Duration = Duration::from_millis(100); - - match timeout(READ_TIMEOUT, stream.read_buf(buf)).await { - Ok(Ok(n)) => Ok(Some(n)), - Ok(Err(e)) => Err(e), - Err(_) => Ok(None), - } - } - - async fn handle_frame( - &self, - stream: &mut W, - frame: &[u8], - deser_failures: &mut u32, - routes: &HashMap>, - ) -> io::Result<()> - where - W: tokio::io::AsyncWrite + Unpin, - { - // Parse the frame first; routing is handled below to avoid duplicating - // logic on the success path. - crate::metrics::inc_frames(crate::metrics::Direction::Inbound); - let (env, _) = match self.parse_envelope(frame) { - Ok(result) => { - *deser_failures = 0; - result - } - Err(e) => { - *deser_failures += 1; - tracing::warn!(correlation_id = ?None::, error = ?e, "failed to deserialize message"); - crate::metrics::inc_deser_errors(); - if *deser_failures >= MAX_DESER_FAILURES { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - "too many deserialization failures", - )); - } - return Ok(()); - } - }; - - if let Some(service) = routes.get(&env.id) { - let request = ServiceRequest::new(env.payload, env.correlation_id); - match service.call(request).await { - Ok(resp) => { - let parts = PacketParts::new(env.id, resp.correlation_id(), resp.into_inner()) - .inherit_correlation(env.correlation_id); - let correlation_id = parts.correlation_id(); - let response = Envelope::from_parts(parts); - if let Err(e) = self.send_response(stream, &response).await { - tracing::warn!( - id = env.id, - correlation_id = ?correlation_id, - error = ?e, - "failed to send response", - ); - crate::metrics::inc_handler_errors(); - } - } - Err(e) => { - tracing::warn!(id = env.id, correlation_id = ?env.correlation_id, error = ?e, "handler error"); - crate::metrics::inc_handler_errors(); - } - } - } else { - tracing::warn!(id = env.id, correlation_id = ?env.correlation_id, "no handler for message id"); - crate::metrics::inc_handler_errors(); - } - - Ok(()) - } -} diff --git a/src/app/builder.rs b/src/app/builder.rs new file mode 100644 index 00000000..3e18f327 --- /dev/null +++ b/src/app/builder.rs @@ -0,0 +1,402 @@ +//! Application builder configuring routes and middleware. +//! [`WireframeApp`] is an Actix-inspired builder for managing connection +//! state, routing, and middleware in a `WireframeServer`. It exposes +//! convenience methods to register handlers and lifecycle hooks, and +//! serializes messages using a configurable serializer. + +use std::{ + any::{Any, TypeId}, + boxed::Box, + collections::HashMap, + future::Future, + pin::Pin, + sync::Arc, +}; + +use tokio::{ + io, + sync::{OnceCell, mpsc}, +}; + +use super::{ + envelope::{Envelope, Packet}, + error::{Result, WireframeError}, +}; +use crate::{ + hooks::{ProtocolHooks, WireframeProtocol}, + middleware::{HandlerService, Transform}, + serializer::{BincodeSerializer, Serializer}, +}; + +const MIN_BUFFER_CAP: usize = 64; +const MAX_BUFFER_CAP: usize = 16 * 1024 * 1024; +const MIN_READ_TIMEOUT_MS: u64 = 1; +const MAX_READ_TIMEOUT_MS: u64 = 86_400_000; +/// Callback invoked when a connection is established. +/// +/// # Examples +/// +/// ```no_run +/// use std::sync::Arc; +/// +/// use wireframe::app::ConnectionSetup; +/// let setup: Arc> = +/// Arc::new(|| Box::pin(async { String::from("hello") })); +/// ``` +pub type ConnectionSetup = dyn Fn() -> Pin + Send>> + Send + Sync; + +/// Callback invoked when a connection is closed. +/// +/// # Examples +/// +/// ```no_run +/// use std::sync::Arc; +/// +/// use wireframe::app::ConnectionTeardown; +/// let teardown: Arc> = Arc::new(|state| { +/// Box::pin(async move { +/// println!("Dropping {state}"); +/// }) +/// }); +/// ``` +pub type ConnectionTeardown = + dyn Fn(C) -> Pin + Send>> + Send + Sync; +/// Configures routing and middleware for a `WireframeServer`. +/// +/// The builder stores registered routes and middleware without enforcing an +/// ordering. Methods return [`Result`] so registrations can be chained +/// ergonomically. +pub struct WireframeApp< + S: Serializer + Send + Sync = BincodeSerializer, + C: Send + 'static = (), + E: Packet = Envelope, +> { + pub(super) handlers: HashMap>, + pub(super) routes: OnceCell>>>, + pub(super) middleware: Vec>>, + #[expect( + dead_code, + reason = "Deprecated: retained temporarily for API compatibility until codec-based \ + framing is fully removed" + )] + #[allow(unfulfilled_lint_expectations)] + pub(super) frame_processor: + Box, Error = io::Error> + Send + Sync>, + pub(super) serializer: S, + pub(super) app_data: HashMap>, + pub(super) on_connect: Option>>, + pub(super) on_disconnect: Option>>, + pub(super) protocol: Option, ProtocolError = ()>>>, + pub(super) push_dlq: Option>>, + pub(super) buffer_capacity: usize, + pub(super) read_timeout_ms: u64, +} + +/// Alias for asynchronous route handlers. +/// +/// A `Handler` wraps an `Arc` to a function returning a [`Future`]. +pub type Handler = Arc Pin + Send>> + Send + Sync>; + +/// Trait representing middleware components. +pub trait Middleware: + Transform, Output = HandlerService> + Send + Sync +{ +} + +impl Middleware for T where + T: Transform, Output = HandlerService> + Send + Sync +{ +} + +impl Default for WireframeApp +where + S: Serializer + Default + Send + Sync, + C: Send + 'static, + E: Packet, +{ + /// Initializes empty routes, middleware, and application data with a + /// placeholder frame processor and serializer, and no lifecycle hooks. + fn default() -> Self { + Self { + handlers: HashMap::new(), + routes: OnceCell::new(), + middleware: Vec::new(), + frame_processor: Box::new(crate::frame::LengthPrefixedProcessor::new( + crate::frame::LengthFormat::default(), + )), + serializer: S::default(), + app_data: HashMap::new(), + on_connect: None, + on_disconnect: None, + protocol: None, + push_dlq: None, + buffer_capacity: 1024, + read_timeout_ms: 100, + } + } +} + +impl WireframeApp +where + S: Serializer + Default + Send + Sync, + C: Send + 'static, + E: Packet, +{ + /// Construct a new empty application builder. + /// + /// # Errors + /// + /// This function currently never returns an error but uses [`Result`] for + /// forward compatibility. + /// + /// # Examples + /// + /// ``` + /// use wireframe::app::WireframeApp; + /// WireframeApp::<_, _, wireframe::app::Envelope>::new().expect("failed to initialize app"); + /// ``` + pub fn new() -> Result { Ok(Self::default()) } + + /// Construct a new application builder using a custom envelope type. + /// + /// Deprecated: call [`WireframeApp::new`] with explicit envelope type + /// parameters. + /// + /// # Errors + /// + /// Currently always succeeds. + #[deprecated(note = "use `WireframeApp::new()` instead")] + pub fn new_with_envelope() -> Result { Self::new() } +} + +impl WireframeApp +where + S: Serializer + Default + Send + Sync, + C: Send + 'static, + E: Packet, +{ + /// Construct a new application builder using the provided serializer. + /// + /// # Errors + /// + /// This function currently never returns an error but uses [`Result`] for + /// forward compatibility. + pub fn with_serializer(serializer: S) -> Result { + Ok(Self { + serializer, + ..Self::default() + }) + } + + /// Register a route that maps `id` to `handler`. + /// + /// # Errors + /// + /// Returns [`WireframeError::DuplicateRoute`] if a handler for `id` + /// has already been registered. + pub fn route(mut self, id: u32, handler: Handler) -> Result { + if self.handlers.contains_key(&id) { + return Err(WireframeError::DuplicateRoute(id)); + } + self.handlers.insert(id, handler); + self.routes = OnceCell::new(); + Ok(self) + } + + /// Store a shared state value accessible to request extractors. + /// + /// The value can later be retrieved using [`crate::extractor::SharedState`]. Registering + /// another value of the same type overwrites the previous one. + #[must_use] + pub fn app_data(mut self, state: T) -> Self + where + T: Send + Sync + 'static, + { + self.app_data.insert( + TypeId::of::(), + Arc::new(state) as Arc, + ); + self + } + + /// Add a middleware component to the processing pipeline. + /// + /// # Errors + /// + /// This function currently always succeeds. + pub fn wrap(mut self, mw: M) -> Result + where + M: Middleware + 'static, + { + self.middleware.push(Box::new(mw)); + self.routes = OnceCell::new(); + Ok(self) + } + + /// Register a callback invoked when a new connection is established. + /// + /// The callback can perform authentication or other setup tasks and + /// returns connection-specific state stored for the connection's + /// lifetime. + /// + /// # Type Parameters + /// + /// This method changes the connection state type parameter from `C` to `C2`. + /// This means that any subsequent builder methods will operate on the new connection state type + /// `C2`. Be aware of this type transition when chaining builder methods. + /// + /// # Errors + /// + /// This function always succeeds currently but uses [`Result`] for + /// consistency with other builder methods. + pub fn on_connection_setup(self, f: F) -> Result> + where + F: Fn() -> Fut + Send + Sync + 'static, + Fut: Future + Send + 'static, + C2: Send + 'static, + { + Ok(WireframeApp { + handlers: self.handlers, + routes: OnceCell::new(), + middleware: self.middleware, + frame_processor: self.frame_processor, + serializer: self.serializer, + app_data: self.app_data, + on_connect: Some(Arc::new(move || Box::pin(f()))), + on_disconnect: None, + protocol: self.protocol, + push_dlq: self.push_dlq, + buffer_capacity: self.buffer_capacity, + read_timeout_ms: self.read_timeout_ms, + }) + } + + /// Register a callback invoked when a connection is closed. + /// + /// The callback receives the connection state produced by + /// [`on_connection_setup`](Self::on_connection_setup). + /// + /// # Errors + /// + /// This function always succeeds currently but uses [`Result`] for + /// consistency with other builder methods. + pub fn on_connection_teardown(mut self, f: F) -> Result + where + F: Fn(C) -> Fut + Send + Sync + 'static, + Fut: Future + Send + 'static, + { + self.on_disconnect = Some(Arc::new(move |c| Box::pin(f(c)))); + Ok(self) + } + + /// Install a [`WireframeProtocol`] implementation. + /// + /// The protocol defines hooks for connection setup, frame modification, and + /// command completion. It is wrapped in an [`Arc`] and stored for later use + /// by the connection actor. + #[must_use] + pub fn with_protocol

(self, protocol: P) -> Self + where + P: WireframeProtocol, ProtocolError = ()> + 'static, + { + WireframeApp { + protocol: Some(Arc::new(protocol)), + ..self + } + } + + /// Configure a Dead Letter Queue for dropped push frames. + /// + /// ```rust,no_run + /// use tokio::sync::mpsc; + /// use wireframe::app::WireframeApp; + /// + /// # fn build() -> WireframeApp { + /// # WireframeApp::new().expect("builder creation should not fail") + /// # } + /// # fn main() { + /// let (tx, _rx) = mpsc::channel(16); + /// let app = build().with_push_dlq(tx); + /// # let _ = app; + /// # } + /// ``` + #[must_use] + pub fn with_push_dlq(self, dlq: mpsc::Sender>) -> Self { + WireframeApp { + push_dlq: Some(dlq), + ..self + } + } + + /// Get a clone of the configured protocol, if any. + /// + /// Returns `None` if no protocol was installed via [`with_protocol`](Self::with_protocol). + #[must_use] + pub fn protocol( + &self, + ) -> Option, ProtocolError = ()>>> { + self.protocol.clone() + } + + /// Return protocol hooks derived from the installed protocol. + /// + /// If no protocol is installed, returns default (no-op) hooks. + #[must_use] + pub fn protocol_hooks(&self) -> ProtocolHooks, ()> { + self.protocol + .as_ref() + .map(ProtocolHooks::from_protocol) + .unwrap_or_default() + } + + /// Set the frame processor used for encoding and decoding frames. + #[deprecated(note = "framing is handled by the connection codec; this method will be removed")] + #[must_use] + pub fn frame_processor

(self, processor: P) -> Self + where + P: crate::frame::FrameProcessor, Error = io::Error> + Send + Sync + 'static, + { + WireframeApp { + frame_processor: Box::new(processor), + ..self + } + } + + /// Replace the serializer used for messages. + #[must_use] + pub fn serializer(self, serializer: Ser) -> WireframeApp + where + Ser: Serializer + Send + Sync, + { + WireframeApp { + handlers: self.handlers, + routes: OnceCell::new(), + middleware: self.middleware, + frame_processor: self.frame_processor, + serializer, + app_data: self.app_data, + on_connect: self.on_connect, + on_disconnect: self.on_disconnect, + protocol: self.protocol, + push_dlq: self.push_dlq, + buffer_capacity: self.buffer_capacity, + read_timeout_ms: self.read_timeout_ms, + } + } + + /// Set the initial buffer capacity for framed reads. + /// Clamped between 64 bytes and 16 MiB. + #[must_use] + pub fn buffer_capacity(mut self, capacity: usize) -> Self { + self.buffer_capacity = capacity.clamp(MIN_BUFFER_CAP, MAX_BUFFER_CAP); + self + } + + /// Configure the read timeout in milliseconds. + /// Clamped between 1 and 86 400 000 milliseconds (24 h). + #[must_use] + pub fn read_timeout_ms(mut self, timeout_ms: u64) -> Self { + self.read_timeout_ms = timeout_ms.clamp(MIN_READ_TIMEOUT_MS, MAX_READ_TIMEOUT_MS); + self + } +} diff --git a/src/app/connection.rs b/src/app/connection.rs new file mode 100644 index 00000000..04947571 --- /dev/null +++ b/src/app/connection.rs @@ -0,0 +1,272 @@ +//! Connection handling and response utilities for `WireframeApp`. + +use std::{collections::HashMap, sync::Arc}; + +use bytes::BytesMut; +use futures::{SinkExt, StreamExt}; +use tokio::{ + io::{self, AsyncRead, AsyncWrite, AsyncWriteExt}, + time::{Duration, timeout}, +}; +use tokio_util::codec::{Encoder, Framed, LengthDelimitedCodec}; + +use super::{ + builder::WireframeApp, + envelope::{Envelope, Packet, PacketParts}, + error::SendError, +}; +use crate::{ + frame::FrameMetadata, + message::Message, + middleware::{HandlerService, Service, ServiceRequest}, + serializer::Serializer, +}; + +/// Maximum consecutive deserialization failures before closing a connection. +const MAX_DESER_FAILURES: u32 = 10; + +#[derive(Debug)] +enum EnvelopeDecodeError { + Parse(E), + Deserialize(Box), +} + +impl WireframeApp +where + S: Serializer + Send + Sync, + C: Send + 'static, + E: Packet, +{ + /// Serialize `msg` and write it to `stream` using a length-delimited codec. + /// + /// # Errors + /// + /// Returns a [`SendError`] if serialization or writing fails. + pub async fn send_response( + &self, + stream: &mut W, + msg: &M, + ) -> std::result::Result<(), SendError> + where + W: AsyncWrite + Unpin, + M: Message, + { + let bytes = self + .serializer + .serialize(msg) + .map_err(SendError::Serialize)?; + let mut codec = LengthDelimitedCodec::builder().new_codec(); + let mut framed = BytesMut::new(); + codec + .encode(bytes.into(), &mut framed) + .map_err(|e| SendError::Io(io::Error::new(io::ErrorKind::InvalidData, e)))?; + stream.write_all(&framed).await.map_err(SendError::Io)?; + stream.flush().await.map_err(SendError::Io) + } + + /// Serialize `msg` and send it through an existing framed stream. + /// + /// # Errors + /// + /// Returns a [`SendError`] if serialization or sending fails. + pub async fn send_response_framed( + &self, + framed: &mut Framed, + msg: &M, + ) -> std::result::Result<(), SendError> + where + W: AsyncRead + AsyncWrite + Unpin, + M: Message, + { + let bytes = self + .serializer + .serialize(msg) + .map_err(SendError::Serialize)?; + framed.send(bytes.into()).await.map_err(SendError::Io) + } +} + +impl WireframeApp +where + S: Serializer + FrameMetadata + Send + Sync, + C: Send + 'static, + E: Packet, +{ + /// Try parsing the frame using [`FrameMetadata::parse`], falling back to + /// full deserialization on failure. + fn parse_envelope( + &self, + frame: &[u8], + ) -> std::result::Result<(Envelope, usize), EnvelopeDecodeError> { + self.serializer + .parse(frame) + .map_err(EnvelopeDecodeError::Parse) + .or_else(|_| { + self.serializer + .deserialize::(frame) + .map_err(EnvelopeDecodeError::Deserialize) + }) + } + + /// Handle an accepted connection end-to-end. + /// + /// Runs optional connection setup to produce per-connection state, + /// initializes (and caches) route chains, processes the framed stream + /// with per-frame timeouts, and finally runs optional teardown. + pub async fn handle_connection(&self, stream: W) + where + W: AsyncRead + AsyncWrite + Send + Unpin + 'static, + { + let state = if let Some(setup) = &self.on_connect { + Some((setup)().await) + } else { + None + }; + + let routes = self + .routes + .get_or_init(|| async { Arc::new(self.build_chains().await) }) + .await + .clone(); + + if let Err(e) = self.process_stream(stream, &routes).await { + tracing::warn!(correlation_id = ?None::, error = ?e, "connection terminated with error"); + } + + if let (Some(teardown), Some(state)) = (&self.on_disconnect, state) { + teardown(state).await; + } + } + + async fn build_chains(&self) -> HashMap> { + let mut routes = HashMap::new(); + for (&id, handler) in &self.handlers { + let mut service = HandlerService::new(id, handler.clone()); + for mw in self.middleware.iter().rev() { + service = mw.transform(service).await; + } + routes.insert(id, service); + } + routes + } + + async fn process_stream( + &self, + stream: W, + routes: &Arc>>, + ) -> io::Result<()> + where + W: AsyncRead + AsyncWrite + Unpin, + { + let codec = LengthDelimitedCodec::builder().new_codec(); + let mut framed = Framed::new(stream, codec); + framed.read_buffer_mut().reserve(self.buffer_capacity); + let mut deser_failures = 0u32; + let timeout_dur = Duration::from_millis(self.read_timeout_ms); + + loop { + match timeout(timeout_dur, framed.next()).await { + Ok(Some(Ok(buf))) => { + self.handle_frame(&mut framed, buf.as_ref(), &mut deser_failures, routes) + .await?; + } + Ok(Some(Err(e))) => return Err(e), + Ok(None) => break, + Err(_) => { + tracing::debug!("read timeout elapsed; continuing to wait for next frame"); + } + } + } + + Ok(()) + } + + async fn handle_frame( + &self, + framed: &mut Framed, + frame: &[u8], + deser_failures: &mut u32, + routes: &HashMap>, + ) -> io::Result<()> + where + W: AsyncRead + AsyncWrite + Unpin, + { + crate::metrics::inc_frames(crate::metrics::Direction::Inbound); + let (env, _) = match self.parse_envelope(frame) { + Ok(result) => { + *deser_failures = 0; + result + } + Err(EnvelopeDecodeError::Parse(e)) => { + *deser_failures += 1; + tracing::warn!(correlation_id = ?None::, error = ?e, "failed to parse message"); + crate::metrics::inc_deser_errors(); + if *deser_failures >= MAX_DESER_FAILURES { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "too many deserialization failures", + )); + } + return Ok(()); + } + Err(EnvelopeDecodeError::Deserialize(e)) => { + *deser_failures += 1; + tracing::warn!(correlation_id = ?None::, error = ?e, "failed to deserialize message"); + crate::metrics::inc_deser_errors(); + if *deser_failures >= MAX_DESER_FAILURES { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "too many deserialization failures", + )); + } + return Ok(()); + } + }; + + if let Some(service) = routes.get(&env.id) { + let request = ServiceRequest::new(env.payload, env.correlation_id); + match service.call(request).await { + Ok(resp) => { + let parts = PacketParts::new(env.id, resp.correlation_id(), resp.into_inner()) + .inherit_correlation(env.correlation_id); + let correlation_id = parts.correlation_id(); + let response = Envelope::from_parts(parts); + match self.serializer.serialize(&response) { + Ok(bytes) => { + if let Err(e) = framed.send(bytes.into()).await { + tracing::warn!( + id = env.id, + correlation_id = ?correlation_id, + error = ?e, + "failed to send response", + ); + crate::metrics::inc_handler_errors(); + } + } + Err(e) => { + tracing::warn!( + id = env.id, + correlation_id = ?correlation_id, + error = ?e, + "failed to serialize response", + ); + crate::metrics::inc_handler_errors(); + } + } + } + Err(e) => { + tracing::warn!(id = env.id, correlation_id = ?env.correlation_id, error = ?e, "handler error"); + crate::metrics::inc_handler_errors(); + } + } + } else { + tracing::warn!( + id = env.id, + correlation_id = ?env.correlation_id, + "no handler for message id" + ); + } + + Ok(()) + } +} diff --git a/src/app/envelope.rs b/src/app/envelope.rs new file mode 100644 index 00000000..f138769e --- /dev/null +++ b/src/app/envelope.rs @@ -0,0 +1,173 @@ +//! Packet abstraction and envelope types. +//! +//! These types decouple serialisation from routing by wrapping raw payloads in +//! identifiers understood by [`crate::app::WireframeApp`]. This allows the +//! builder (`crate::app::WireframeApp`) to route frames before full +//! deserialisation. See [`crate::app::builder::WireframeApp`] for how envelopes +//! are used when registering routes. + +use crate::message::Message; + +/// Envelope-like type used to wrap incoming and outgoing messages. +/// +/// Custom envelope types must implement this trait so [`WireframeApp`] can +/// route messages and construct responses. +/// +/// # Example +/// +/// ``` +/// use wireframe::{ +/// app::{Packet, PacketParts}, +/// message::Message, +/// }; +/// +/// #[derive(bincode::Decode, bincode::Encode)] +/// struct CustomEnvelope { +/// id: u32, +/// payload: Vec, +/// timestamp: u64, +/// } +/// +/// impl Packet for CustomEnvelope { +/// fn id(&self) -> u32 { self.id } +/// +/// fn correlation_id(&self) -> Option { None } +/// +/// fn into_parts(self) -> PacketParts { PacketParts::new(self.id, None, self.payload) } +/// +/// fn from_parts(parts: PacketParts) -> Self { +/// Self { +/// id: parts.id(), +/// payload: parts.payload(), +/// timestamp: 0, +/// } +/// } +/// } +/// ``` +pub trait Packet: Message + Send + Sync + 'static { + /// Return the message identifier used for routing. + fn id(&self) -> u32; + + /// Return the correlation identifier tying this frame to a request. + fn correlation_id(&self) -> Option; + + /// Consume the packet and return its identifier, correlation id and payload bytes. + fn into_parts(self) -> PacketParts; + + /// Construct a new packet from raw parts. + fn from_parts(parts: PacketParts) -> Self; +} + +/// Component values extracted from or used to build a [`Packet`]. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PacketParts { + id: u32, + correlation_id: Option, + payload: Vec, +} + +/// Basic envelope type used by [`WireframeApp::handle_connection`]. +/// +/// Incoming frames are deserialised into an `Envelope` containing the +/// message identifier and raw payload bytes. +#[derive(bincode::Decode, bincode::Encode, Debug, Clone, PartialEq, Eq)] +pub struct Envelope { + pub(crate) id: u32, + pub(crate) correlation_id: Option, + pub(crate) payload: Vec, +} + +impl Envelope { + /// Create a new [`Envelope`] with the provided identifiers and payload. + #[must_use] + pub fn new(id: u32, correlation_id: Option, payload: Vec) -> Self { + Self { + id, + correlation_id, + payload, + } + } +} + +impl Packet for Envelope { + #[inline] + fn id(&self) -> u32 { self.id } + + #[inline] + fn correlation_id(&self) -> Option { self.correlation_id } + + fn into_parts(self) -> PacketParts { self.into() } + + fn from_parts(parts: PacketParts) -> Self { parts.into() } +} + +impl PacketParts { + /// Construct a new set of packet parts. + #[must_use] + pub fn new(id: u32, correlation_id: Option, payload: Vec) -> Self { + Self { + id, + correlation_id, + payload, + } + } + + #[must_use] + pub const fn id(&self) -> u32 { self.id } + + #[must_use] + pub const fn correlation_id(&self) -> Option { self.correlation_id } + + #[must_use] + pub fn payload(self) -> Vec { self.payload } + + /// Ensure a correlation identifier is present, inheriting from `source` if missing. + /// + /// # Examples + /// ``` + /// use wireframe::app::PacketParts; + /// // Inherit when missing + /// let parts = PacketParts::new(1, None, vec![]).inherit_correlation(Some(42)); + /// assert_eq!(parts.correlation_id(), Some(42)); + /// + /// // Overwrite mismatched value + /// let parts = PacketParts::new(1, Some(7), vec![]).inherit_correlation(Some(8)); + /// assert_eq!(parts.correlation_id(), Some(8)); + /// ``` + #[must_use] + pub fn inherit_correlation(mut self, source: Option) -> Self { + let (next, mismatched) = Self::select_correlation(self.correlation_id, source); + if mismatched && let (Some(found), Some(expected)) = (self.correlation_id, next) { + tracing::warn!( + id = self.id, + expected, + found, + "mismatched correlation id in response", + ); + } + self.correlation_id = next; + self + } + + #[inline] + fn select_correlation(current: Option, source: Option) -> (Option, bool) { + match (current, source) { + (None, cid) => (cid, false), + (Some(cid), Some(src)) if cid != src => (Some(src), true), + (curr, _) => (curr, false), + } + } +} + +impl From for PacketParts { + fn from(e: Envelope) -> Self { PacketParts::new(e.id, e.correlation_id, e.payload) } +} + +impl From for Envelope { + fn from(p: PacketParts) -> Self { + let id = p.id(); + let correlation_id = p.correlation_id(); + let payload = p.payload(); + Envelope::new(id, correlation_id, payload) + } +} diff --git a/src/app/error.rs b/src/app/error.rs new file mode 100644 index 00000000..10dc337c --- /dev/null +++ b/src/app/error.rs @@ -0,0 +1,29 @@ +//! Error types for application setup and messaging. + +use std::io; + +use thiserror::Error; + +/// Top-level error type for application setup. +#[derive(Debug, Error, PartialEq, Eq)] +#[non_exhaustive] +pub enum WireframeError { + /// A route with the provided identifier was already registered. + #[error("route id {0} was already registered")] + DuplicateRoute(u32), +} + +/// Errors produced when sending a handler response over a stream. +#[derive(Debug, Error)] +#[non_exhaustive] +pub enum SendError { + /// Serialisation failed. + #[error("serialisation error: {0}")] + Serialize(#[source] Box), + /// Writing to the stream failed. + #[error("I/O error: {0}")] + Io(#[from] io::Error), +} + +/// Result type used throughout the builder API. +pub type Result = std::result::Result; diff --git a/src/app/mod.rs b/src/app/mod.rs new file mode 100644 index 00000000..02268ceb --- /dev/null +++ b/src/app/mod.rs @@ -0,0 +1,18 @@ +//! Application builder and supporting types. +//! +//! Re-exports: +//! - [`WireframeApp`] and builder traits ([`Handler`], [`Middleware`], [`ConnectionSetup`], +//! [`ConnectionTeardown`]) +//! - Envelope primitives ([`Envelope`], [`Packet`], [`PacketParts`]) +//! - Error handling types ([`WireframeError`], [`SendError`], [`Result`]) +//! +//! See the `examples/` directory for end-to-end usage. + +mod builder; +mod connection; +mod envelope; +pub mod error; + +pub use builder::{ConnectionSetup, ConnectionTeardown, Handler, Middleware, WireframeApp}; +pub use envelope::{Envelope, Packet, PacketParts}; +pub use error::{Result, SendError, WireframeError}; diff --git a/src/lib.rs b/src/lib.rs index 2ae0d074..368302d1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,6 +5,9 @@ //! servers, including routing, middleware, and connection utilities. pub mod app; +/// Result type alias re-exported for convenience when working with the +/// application builder. +pub use app::error::Result; pub mod serializer; pub use serializer::{BincodeSerializer, Serializer}; pub mod connection; diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 4405b3dc..16c83285 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -7,19 +7,24 @@ use std::net::{Ipv4Addr, SocketAddr, TcpListener as StdTcpListener}; /// Create a TCP listener bound to a free local port. +#[expect(dead_code, reason = "Used by tests that bind to random ports")] +#[allow(unfulfilled_lint_expectations)] pub fn unused_listener() -> StdTcpListener { let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0); StdTcpListener::bind(addr).expect("failed to bind port") } use rstest::fixture; -use wireframe::app::WireframeApp; +use wireframe::{app::Envelope, serializer::BincodeSerializer}; + +pub type TestApp = wireframe::app::WireframeApp; #[fixture] -#[allow( +#[expect( unused_braces, reason = "rustc false positive for single line rstest fixtures" )] -pub fn factory() -> impl Fn() -> WireframeApp + Send + Sync + Clone + 'static { - || WireframeApp::new().expect("WireframeApp::new failed") +#[allow(unfulfilled_lint_expectations)] +pub fn factory() -> impl Fn() -> TestApp + Send + Sync + Clone + 'static { + || TestApp::new().expect("TestApp::new failed") } diff --git a/tests/lifecycle.rs b/tests/lifecycle.rs index 6a00242f..709a6e14 100644 --- a/tests/lifecycle.rs +++ b/tests/lifecycle.rs @@ -13,12 +13,15 @@ use std::{ use bytes::BytesMut; use wireframe::{ - app::{Envelope, Packet, PacketParts, WireframeApp}, + app::{Envelope, Packet, PacketParts}, frame::FrameProcessor, serializer::{BincodeSerializer, Serializer}, }; use wireframe_testing::{processor, run_app, run_with_duplex_server}; +type App = wireframe::app::WireframeApp; +type BasicApp = wireframe::app::WireframeApp; + fn call_counting_callback( counter: &Arc, result: R, @@ -42,14 +45,14 @@ fn wireframe_app_with_lifecycle_callbacks( setup: &Arc, teardown: &Arc, state: u32, -) -> WireframeApp +) -> App where E: Packet, { let setup_cb = call_counting_callback(setup, state); let teardown_cb = call_counting_callback(teardown, ()); - WireframeApp::<_, _, E>::new() + App::::new() .expect("failed to create app") .on_connection_setup(move || setup_cb(())) .expect("setup callback") @@ -74,7 +77,7 @@ async fn setup_without_teardown_runs() { let setup_count = Arc::new(AtomicUsize::new(0)); let cb = call_counting_callback(&setup_count, ()); - let app = WireframeApp::<_, _, Envelope>::new() + let app = BasicApp::new() .expect("failed to create app") .on_connection_setup(move || cb(())) .expect("setup callback"); @@ -89,7 +92,7 @@ async fn teardown_without_setup_does_not_run() { let teardown_count = Arc::new(AtomicUsize::new(0)); let cb = call_counting_callback(&teardown_count, ()); - let app = WireframeApp::<_, _, Envelope>::new() + let app = BasicApp::new() .expect("failed to create app") .on_connection_teardown(cb) .expect("teardown callback"); @@ -128,12 +131,11 @@ impl Packet for StateEnvelope { } #[tokio::test] -async fn helpers_propagate_connection_state() { +async fn helpers_preserve_correlation_id_and_run_callbacks() { let setup = Arc::new(AtomicUsize::new(0)); let teardown = Arc::new(AtomicUsize::new(0)); let app = wireframe_app_with_lifecycle_callbacks::(&setup, &teardown, 7) - .frame_processor(processor()) .route(1, Arc::new(|_: &StateEnvelope| Box::pin(async {}))) .expect("route registration failed"); diff --git a/tests/metadata.rs b/tests/metadata.rs index 45efcecb..c30b4a47 100644 --- a/tests/metadata.rs +++ b/tests/metadata.rs @@ -8,24 +8,25 @@ use std::sync::{ }; use wireframe::{ - app::{Envelope, WireframeApp}, - frame::{FrameMetadata, LengthPrefixedProcessor}, + app::Envelope, + frame::FrameMetadata, serializer::{BincodeSerializer, Serializer}, }; use wireframe_testing::{TestSerializer, drive_with_bincode}; -fn mock_wireframe_app_with_serializer(serializer: S) -> WireframeApp +type TestApp = wireframe::app::WireframeApp; + +fn mock_wireframe_app_with_serializer(serializer: S) -> TestApp where - S: TestSerializer, + S: TestSerializer + Default, { - WireframeApp::new() + wireframe::app::WireframeApp::::with_serializer(serializer) .expect("failed to create app") - .frame_processor(LengthPrefixedProcessor::default()) - .serializer(serializer) .route(1, Arc::new(|_| Box::pin(async {}))) .expect("route registration failed") } +#[derive(Default)] struct CountingSerializer(Arc); impl Serializer for CountingSerializer { @@ -49,7 +50,7 @@ impl FrameMetadata for CountingSerializer { type Error = bincode::error::DecodeError; fn parse(&self, src: &[u8]) -> Result<(Self::Frame, usize), Self::Error> { - self.0.fetch_add(1, Ordering::SeqCst); + self.0.fetch_add(1, Ordering::Relaxed); BincodeSerializer.parse(src) } } @@ -66,9 +67,10 @@ async fn metadata_parser_invoked_before_deserialize() { .await .expect("drive_with_bincode failed"); assert!(!out.is_empty()); - assert_eq!(counter.load(Ordering::SeqCst), 1); + assert_eq!(counter.load(Ordering::Relaxed), 1); } +#[derive(Default)] struct FallbackSerializer(Arc, Arc); impl Serializer for FallbackSerializer { @@ -83,7 +85,7 @@ impl Serializer for FallbackSerializer { &self, bytes: &[u8], ) -> Result<(M, usize), Box> { - self.1.fetch_add(1, Ordering::SeqCst); + self.1.fetch_add(1, Ordering::Relaxed); BincodeSerializer.deserialize(bytes) } } @@ -93,7 +95,7 @@ impl FrameMetadata for FallbackSerializer { type Error = bincode::error::DecodeError; fn parse(&self, _src: &[u8]) -> Result<(Self::Frame, usize), Self::Error> { - self.0.fetch_add(1, Ordering::SeqCst); + self.0.fetch_add(1, Ordering::Relaxed); Err(bincode::error::DecodeError::OtherString("fail".into())) } } @@ -111,6 +113,6 @@ async fn falls_back_to_deserialize_after_parse_error() { .await .expect("drive_with_bincode failed"); assert!(!out.is_empty()); - assert_eq!(parse_calls.load(Ordering::SeqCst), 1); - assert_eq!(deser_calls.load(Ordering::SeqCst), 1); + assert_eq!(parse_calls.load(Ordering::Relaxed), 1); + assert_eq!(deser_calls.load(Ordering::Relaxed), 1); } diff --git a/tests/middleware_order.rs b/tests/middleware_order.rs index 6b66f55b..f823d467 100644 --- a/tests/middleware_order.rs +++ b/tests/middleware_order.rs @@ -6,12 +6,14 @@ use async_trait::async_trait; use bytes::BytesMut; use tokio::io::{AsyncReadExt, AsyncWriteExt, duplex}; use wireframe::{ - app::{Envelope, Handler, WireframeApp}, + app::{Envelope, Handler}, frame::{FrameProcessor, LengthPrefixedProcessor}, middleware::{HandlerService, Service, ServiceRequest, ServiceResponse, Transform}, serializer::{BincodeSerializer, Serializer}, }; +type TestApp = wireframe::app::WireframeApp; + struct TagMiddleware(u8); struct TagService { @@ -53,7 +55,7 @@ impl Transform> for TagMiddleware { #[tokio::test] async fn middleware_applied_in_reverse_order() { let handler: Handler = std::sync::Arc::new(|_env: &Envelope| Box::pin(async {})); - let app = WireframeApp::new() + let app = TestApp::new() .expect("failed to create app") .route(1, handler) .expect("route registration failed") diff --git a/tests/response.rs b/tests/response.rs index 60b9de04..41d484c4 100644 --- a/tests/response.rs +++ b/tests/response.rs @@ -12,6 +12,9 @@ use wireframe::{ serializer::BincodeSerializer, }; +mod common; +use common::TestApp; + #[derive(bincode::Encode, bincode::BorrowDecode, PartialEq, Debug)] struct TestResp(u32); @@ -35,14 +38,11 @@ impl<'de> bincode::BorrowDecode<'de, ()> for FailingResp { } } +/// Tests that sending a response serializes and frames the data correctly, +/// and that the response can be decoded and deserialized back to its original value asynchronously. #[tokio::test] -/// Tests that sending a response serialises and frames the data correctly, -/// and that the response can be decoded and deserialised back to its original value asynchronously. async fn send_response_encodes_and_frames() { - let app = WireframeApp::<_, _, Envelope>::new() - .expect("failed to create app") - .frame_processor(LengthPrefixedProcessor::default()) - .serializer(BincodeSerializer); + let app = TestApp::new().expect("failed to create app"); let mut out = Vec::new(); app.send_response(&mut out, &TestResp(7)) @@ -59,11 +59,11 @@ async fn send_response_encodes_and_frames() { assert_eq!(decoded, TestResp(7)); } -#[tokio::test] /// Tests that decoding with an incomplete length prefix header returns `None` and does not consume /// any bytes from the buffer. /// /// This ensures that the decoder waits for the full header before attempting to decode a frame. +#[tokio::test] async fn length_prefixed_decode_requires_complete_header() { let processor = LengthPrefixedProcessor::default(); let mut buf = BytesMut::from(&[0x00, 0x00, 0x00][..]); // only 3 bytes @@ -71,11 +71,11 @@ async fn length_prefixed_decode_requires_complete_header() { assert_eq!(buf.len(), 3); // nothing consumed } -#[tokio::test] /// Tests that decoding with a complete length prefix but incomplete frame data returns `None` /// and retains all bytes in the buffer. /// /// Ensures that the decoder does not consume any bytes when the full frame is not yet available. +#[tokio::test] async fn length_prefixed_decode_requires_full_frame() { let processor = LengthPrefixedProcessor::default(); let mut buf = BytesMut::from(&[0x00, 0x00, 0x00, 0x05, 0x01, 0x02][..]); @@ -131,9 +131,7 @@ fn custom_length_roundtrip( #[tokio::test] async fn send_response_propagates_write_error() { - let app = WireframeApp::<_, _, Envelope>::new() - .expect("app creation failed") - .frame_processor(LengthPrefixedProcessor::default()); + let app = TestApp::new().expect("app creation failed"); let mut writer = FailingWriter; let err = app @@ -184,13 +182,14 @@ fn encode_fails_for_length_too_large(#[case] fmt: LengthFormat, #[case] len: usi assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput); } -#[tokio::test] /// Tests that `send_response` returns a serialization error when encoding fails. /// /// This test sends a `FailingResp` using `send_response` and asserts that the resulting /// error is of the `Serialize` variant, indicating a failure during response encoding. +#[tokio::test] async fn send_response_returns_encode_error() { - let app = WireframeApp::<_, _, Envelope>::new().expect("failed to create app"); + // Intentionally do not set a frame processor: encode should fail before framing. + let app = WireframeApp::::new().expect("failed to create app"); let err = app .send_response(&mut Vec::new(), &FailingResp) .await diff --git a/tests/routes.rs b/tests/routes.rs index e57a3b63..aafc793b 100644 --- a/tests/routes.rs +++ b/tests/routes.rs @@ -11,13 +11,15 @@ use bytes::BytesMut; use rstest::rstest; use wireframe::{ Serializer, - app::{Packet, PacketParts, WireframeApp}, + app::{Packet, PacketParts}, frame::{FrameProcessor, LengthPrefixedProcessor}, message::Message, serializer::BincodeSerializer, }; use wireframe_testing::{drive_with_bincode, drive_with_frames}; +type TestApp = wireframe::app::WireframeApp; + #[derive(bincode::Encode, bincode::BorrowDecode, PartialEq, Debug, Clone)] struct TestEnvelope { id: u32, @@ -56,9 +58,8 @@ struct Echo(u8); async fn handler_receives_message_and_echoes_response() { let called = Arc::new(AtomicUsize::new(0)); let called_clone = called.clone(); - let app = WireframeApp::<_, _, TestEnvelope>::new() + let app = TestApp::new() .expect("failed to create app") - .frame_processor(LengthPrefixedProcessor::default()) .route( 1, std::sync::Arc::new(move |_: &TestEnvelope| { @@ -97,9 +98,8 @@ async fn handler_receives_message_and_echoes_response() { #[tokio::test] async fn handler_echoes_with_none_correlation_id() { - let app = WireframeApp::<_, _, TestEnvelope>::new() + let app = TestApp::new() .expect("failed to create app") - .frame_processor(LengthPrefixedProcessor::default()) .route( 1, std::sync::Arc::new(|_: &TestEnvelope| Box::pin(async {})), @@ -130,9 +130,8 @@ async fn handler_echoes_with_none_correlation_id() { #[tokio::test] async fn multiple_frames_processed_in_sequence() { - let app = WireframeApp::<_, _, TestEnvelope>::new() + let app = TestApp::new() .expect("failed to create app") - .frame_processor(LengthPrefixedProcessor::default()) .route( 1, std::sync::Arc::new(|_: &TestEnvelope| Box::pin(async {})), @@ -191,9 +190,8 @@ async fn multiple_frames_processed_in_sequence() { #[case(Some(2))] #[tokio::test] async fn single_frame_propagates_correlation_id(#[case] cid: Option) { - let app = WireframeApp::<_, _, TestEnvelope>::new() + let app = TestApp::new() .expect("failed to create app") - .frame_processor(LengthPrefixedProcessor::default()) .route( 1, std::sync::Arc::new(|_: &TestEnvelope| Box::pin(async {})), diff --git a/tests/wireframe_protocol.rs b/tests/wireframe_protocol.rs index 774cb79e..72fbaa85 100644 --- a/tests/wireframe_protocol.rs +++ b/tests/wireframe_protocol.rs @@ -16,11 +16,14 @@ use tokio_util::sync::CancellationToken; use wireframe::{ ConnectionContext, WireframeProtocol, - app::{Envelope, WireframeApp}, + app::Envelope, connection::ConnectionActor, push::PushQueues, + serializer::BincodeSerializer, }; +type TestApp = wireframe::app::WireframeApp; + struct TestProtocol { counter: Arc, } @@ -51,7 +54,7 @@ async fn builder_produces_protocol_hooks() { let protocol = TestProtocol { counter: counter.clone(), }; - let app = WireframeApp::<_, _, Envelope>::new() + let app = TestApp::new() .expect("failed to create app") .with_protocol(protocol); let mut hooks = app.protocol_hooks(); @@ -75,7 +78,7 @@ async fn connection_actor_uses_protocol_from_builder() { let protocol = TestProtocol { counter: counter.clone(), }; - let app = WireframeApp::<_, _, Envelope>::new() + let app = TestApp::new() .expect("failed to create app") .with_protocol(protocol); diff --git a/tests/world.rs b/tests/world.rs index 9162a876..c36ab604 100644 --- a/tests/world.rs +++ b/tests/world.rs @@ -10,14 +10,17 @@ use cucumber::World; use tokio::{net::TcpStream, sync::oneshot}; use tokio_util::sync::CancellationToken; use wireframe::{ - app::{Envelope, Packet, WireframeApp}, + app::{Envelope, Packet}, connection::ConnectionActor, hooks::ProtocolHooks, push::PushQueues, response::FrameStream, + serializer::BincodeSerializer, server::WireframeServer, }; +type TestApp = wireframe::app::WireframeApp; + #[path = "common/mod.rs"] mod common; use common::unused_listener; @@ -35,7 +38,7 @@ struct PanicServer { impl PanicServer { async fn spawn() -> Self { let factory = || { - WireframeApp::new() + TestApp::new() .expect("Failed to create WireframeApp") .on_connection_setup(|| async { panic!("boom") }) .expect("Failed to set connection setup callback") @@ -70,18 +73,17 @@ impl PanicServer { impl Drop for PanicServer { fn drop(&mut self) { - use std::time::Duration; + use std::{thread, time::Duration}; if let Some(tx) = self.shutdown.take() { let _ = tx.send(()); } let timeout = Duration::from_secs(5); - let joined = futures::executor::block_on(tokio::time::timeout(timeout, &mut self.handle)); - match joined { - Ok(Ok(())) => {} - Ok(Err(e)) => eprintln!("PanicServer task panicked: {e:?}"), - Err(_) => eprintln!("PanicServer task did not shut down within timeout"), - } + let handle = self.handle.abort_handle(); + thread::spawn(move || { + thread::sleep(timeout); + handle.abort(); + }); } } diff --git a/wireframe_testing/src/helpers.rs b/wireframe_testing/src/helpers.rs index 27f66fdc..08f232a2 100644 --- a/wireframe_testing/src/helpers.rs +++ b/wireframe_testing/src/helpers.rs @@ -3,10 +3,12 @@ //! These functions spin up an application on an in-memory duplex stream and //! collect the bytes written back by the app for assertions. +use std::io; + use bincode::config; use bytes::BytesMut; use rstest::fixture; -use tokio::io::{self, AsyncReadExt, AsyncWriteExt, DuplexStream, duplex}; +use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream, duplex}; use wireframe::{ app::{Envelope, Packet, WireframeApp}, frame::{FrameMetadata, FrameProcessor, LengthPrefixedProcessor}, @@ -15,10 +17,8 @@ use wireframe::{ /// Create a default length-prefixed frame processor for tests. #[fixture] -#[allow( - unused_braces, - reason = "Clippy is wrong here; this is not a redundant block" -)] +#[expect(unused_braces, reason = "Braces are intentional here; false positive")] +#[allow(unfulfilled_lint_expectations)] pub fn processor() -> LengthPrefixedProcessor { LengthPrefixedProcessor::default() } pub trait TestSerializer: @@ -41,12 +41,12 @@ impl TestSerializer for T where /// with `"server task failed"`. /// /// ```rust -/// use tokio::io::{self, AsyncWriteExt, DuplexStream}; +/// use tokio::io::{AsyncWriteExt, DuplexStream}; /// use wireframe_testing::helpers::drive_internal; /// /// async fn echo(mut server: DuplexStream) { let _ = server.write_all(&[1, 2]).await; } /// -/// # async fn demo() -> io::Result<()> { +/// # async fn demo() -> std::io::Result<()> { /// let bytes = drive_internal(echo, vec![vec![0]], 64).await?; /// assert_eq!(bytes, [1, 2]); /// # Ok(()) @@ -97,6 +97,7 @@ where const DEFAULT_CAPACITY: usize = 4096; const MAX_CAPACITY: usize = 1024 * 1024 * 10; // 10MB limit +pub(crate) const EMPTY_SERVER_CAPACITY: usize = 64; macro_rules! forward_default { ( @@ -158,10 +159,10 @@ macro_rules! forward_with_capacity { /// duplex stream. /// /// ```rust -/// # use wireframe_testing::{drive_with_frame, processor}; +/// # use wireframe_testing::drive_with_frame; /// # use wireframe::app::WireframeApp; -/// # async fn demo() -> tokio::io::Result<()> { -/// let app = WireframeApp::new().frame_processor(processor()).unwrap(); +/// # async fn demo() -> std::io::Result<()> { +/// let app = WireframeApp::new().expect("failed to initialize app"); /// let bytes = drive_with_frame(app, vec![1, 2, 3]).await?; /// # Ok(()) /// # } @@ -184,10 +185,10 @@ forward_with_capacity! { /// Adjusting the buffer size helps exercise edge cases such as small channels. /// /// ```rust - /// # use wireframe_testing::{drive_with_frame_with_capacity, processor}; + /// # use wireframe_testing::drive_with_frame_with_capacity; /// # use wireframe::app::WireframeApp; - /// # async fn demo() -> tokio::io::Result<()> { - /// let app = WireframeApp::new().frame_processor(processor()).unwrap(); + /// # async fn demo() -> std::io::Result<()> { + /// let app = WireframeApp::new().expect("failed to initialize app"); /// let bytes = drive_with_frame_with_capacity(app, vec![0], 512).await?; /// # Ok(()) /// # } @@ -202,10 +203,10 @@ forward_default! { /// Each frame is written to the duplex stream in order. /// /// ```rust - /// # use wireframe_testing::{drive_with_frames, processor}; + /// # use wireframe_testing::drive_with_frames; /// # use wireframe::app::WireframeApp; - /// # async fn demo() -> tokio::io::Result<()> { - /// let app = WireframeApp::new().frame_processor(processor()).unwrap(); + /// # async fn demo() -> std::io::Result<()> { + /// let app = WireframeApp::new().expect("failed to initialize app"); /// let out = drive_with_frames(app, vec![vec![1], vec![2]]).await?; /// # Ok(()) /// # } @@ -219,10 +220,10 @@ forward_default! { /// This variant exposes the buffer size for fine-grained control in tests. /// /// ```rust -/// # use wireframe_testing::{drive_with_frames_with_capacity, processor}; +/// # use wireframe_testing::drive_with_frames_with_capacity; /// # use wireframe::app::WireframeApp; -/// # async fn demo() -> tokio::io::Result<()> { -/// let app = WireframeApp::new().frame_processor(processor()).unwrap(); +/// # async fn demo() -> std::io::Result<()> { +/// let app = WireframeApp::new().expect("failed to initialize app"); /// let out = drive_with_frames_with_capacity(app, vec![vec![1], vec![2]], 1024).await?; /// # Ok(()) /// # } @@ -250,10 +251,10 @@ forward_default! { /// across calls. /// /// ```rust - /// # use wireframe_testing::{drive_with_frame_mut, processor}; + /// # use wireframe_testing::drive_with_frame_mut; /// # use wireframe::app::WireframeApp; - /// # async fn demo() -> tokio::io::Result<()> { - /// let mut app = WireframeApp::new().frame_processor(processor()).unwrap(); + /// # async fn demo() -> std::io::Result<()> { + /// let mut app = WireframeApp::new().expect("failed to initialize app"); /// let bytes = drive_with_frame_mut(&mut app, vec![1]).await?; /// # Ok(()) /// # } @@ -266,10 +267,10 @@ forward_with_capacity! { /// Feed a single frame into `app` using a duplex buffer of `capacity` bytes. /// /// ```rust - /// # use wireframe_testing::{drive_with_frame_with_capacity_mut, processor}; + /// # use wireframe_testing::drive_with_frame_with_capacity_mut; /// # use wireframe::app::WireframeApp; - /// # async fn demo() -> tokio::io::Result<()> { - /// let mut app = WireframeApp::new().frame_processor(processor()).unwrap(); + /// # async fn demo() -> std::io::Result<()> { + /// let mut app = WireframeApp::new().expect("failed to initialize app"); /// let bytes = drive_with_frame_with_capacity_mut(&mut app, vec![1], 256).await?; /// # Ok(()) /// # } @@ -282,10 +283,10 @@ forward_default! { /// Feed multiple frames into a mutable `app`. /// /// ```rust - /// # use wireframe_testing::{drive_with_frames_mut, processor}; + /// # use wireframe_testing::drive_with_frames_mut; /// # use wireframe::app::WireframeApp; - /// # async fn demo() -> tokio::io::Result<()> { - /// let mut app = WireframeApp::new().frame_processor(processor()).unwrap(); + /// # async fn demo() -> std::io::Result<()> { + /// let mut app = WireframeApp::new().expect("failed to initialize app"); /// let out = drive_with_frames_mut(&mut app, vec![vec![1], vec![2]]).await?; /// # Ok(()) /// # } @@ -297,10 +298,10 @@ forward_default! { /// Feed multiple frames into `app` with a duplex buffer of `capacity` bytes. /// /// ```rust -/// # use wireframe_testing::{drive_with_frames_with_capacity_mut, processor}; +/// # use wireframe_testing::drive_with_frames_with_capacity_mut; /// # use wireframe::app::WireframeApp; -/// # async fn demo() -> tokio::io::Result<()> { -/// let mut app = WireframeApp::new().frame_processor(processor()).unwrap(); +/// # async fn demo() -> std::io::Result<()> { +/// let mut app = WireframeApp::new().expect("failed to initialize app"); /// let out = drive_with_frames_with_capacity_mut(&mut app, vec![vec![1], vec![2]], 64).await?; /// # Ok(()) /// # } @@ -326,12 +327,12 @@ where /// Encode `msg` using bincode, frame it and drive `app`. /// /// ```rust -/// # use wireframe_testing::{drive_with_bincode, processor}; +/// # use wireframe_testing::drive_with_bincode; /// # use wireframe::app::WireframeApp; /// #[derive(bincode::Encode)] /// struct Ping(u8); -/// # async fn demo() -> tokio::io::Result<()> { -/// let app = WireframeApp::new().frame_processor(processor()).unwrap(); +/// # async fn demo() -> std::io::Result<()> { +/// let app = WireframeApp::new().expect("failed to initialize app"); /// let bytes = drive_with_bincode(app, Ping(1)).await?; /// # Ok(()) /// # } @@ -370,10 +371,10 @@ where /// surfaced as an error. /// /// ```rust -/// # use wireframe_testing::{processor, run_app}; +/// # use wireframe_testing::run_app; /// # use wireframe::app::WireframeApp; -/// # async fn demo() -> tokio::io::Result<()> { -/// let app = WireframeApp::new().frame_processor(processor()).unwrap(); +/// # async fn demo() -> std::io::Result<()> { +/// let app = WireframeApp::new().expect("failed to initialize app"); /// let out = run_app(app, vec![vec![1]], None).await?; /// # Ok(()) /// # } @@ -433,12 +434,11 @@ where /// Panics if `handle_connection` fails. /// /// ```rust -/// # use wireframe_testing::{run_with_duplex_server, processor}; +/// # use wireframe_testing::run_with_duplex_server; /// # use wireframe::app::WireframeApp; /// # async fn demo() { /// let app = WireframeApp::new() -/// .frame_processor(processor()) -/// .unwrap(); +/// .expect("failed to initialize app"); /// run_with_duplex_server(app).await; /// } /// ``` @@ -448,7 +448,7 @@ where C: Send + 'static, E: Packet, { - let (_client, server) = duplex(64); + let (_, server) = duplex(EMPTY_SERVER_CAPACITY); // discard client half app.handle_connection(server).await; }