diff --git a/README.md b/README.md index dcb9f778..89e4503e 100644 --- a/README.md +++ b/README.md @@ -67,7 +67,7 @@ async fn handle_echo(req: Message) -> WireframeResult WireframeServer::new(|| { WireframeApp::new() - .serialization_format(SerializationFormat::Bincode) + .serializer(BincodeSerializer) .route(MyMessageType::Echo, handle_echo) }) .bind("127.0.0.1:8000")? @@ -78,6 +78,14 @@ WireframeServer::new(|| { This example showcases how derive macros and the framing abstraction simplify a binary protocol server【F:docs/rust-binary-router-library-design.md†L1120-L1150】. +## Response Serialization and Framing + +Handlers can return types implementing the `Responder` trait. These values are +encoded using the application's configured serializer and written +back through the `FrameProcessor`【F:docs/rust-binary-router-library-design.md†L718-L724】. +The included `LengthPrefixedProcessor` illustrates a simple framing strategy +based on a big‑endian length prefix【F:docs/rust-binary-router-library-design.md†L1076-L1117】. + ## Current Limitations Connection processing is not implemented yet. After the optional diff --git a/docs/roadmap.md b/docs/roadmap.md index 372561db..1caea5cb 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -62,7 +62,7 @@ after formatting. Line numbers below refer to that file. user-configured callbacks on decode success or failure. See [preamble-validator](preamble-validator.md). -- [ ] Add response serialization and transmission. Encode handler responses +- [x] Add response serialization and transmission. Encode handler responses using the selected serialization format and write them back through the framing layer. diff --git a/docs/rust-binary-router-library-design.md b/docs/rust-binary-router-library-design.md index 1944e61b..bfd89f5c 100644 --- a/docs/rust-binary-router-library-design.md +++ b/docs/rust-binary-router-library-design.md @@ -551,6 +551,8 @@ mechanism to dispatch this message to the appropriate user-defined handler. .message_guarded( MessageType::GenericCommand, + + ```` |msg_header: &CommandHeader| msg_header.sub_type == CommandSubType::Special, @@ -718,7 +720,10 @@ messages and optionally producing responses. - A specific message type that implements a `wireframe::Responder` trait (analogous to Actix Web's `Responder` trait 4). This trait defines how the - returned value is serialized and sent back to the client. + returned value is serialized and sent back to the client. When a handler + yields such a value, `wireframe` encodes it using the application’s + configured serializer and passes the resulting bytes to the `FrameProcessor` + for transmission back to the peer. - `Result`: For explicit error handling. If `Ok(response_message)`, the message is sent. If `Err(error_value)`, the error is processed by "wireframe's" error handling mechanism (see Section @@ -1115,55 +1120,65 @@ examples are invaluable. They make the abstract design tangible and showcase how ```` +````` + + + 3. **Server Setup and Handler**: Rust - ````rustrust + ```rustrust // Crate: main.rs - use wireframe::{WireframeApp, WireframeServer, Message, error::Result as WireframeResult, config::SerializationFormat}; - use my_protocol_messages::{EchoRequest, EchoResponse}; - use my_frame_processor::LengthPrefixedCodec; // Or wireframe's abstraction - use std::time::{SystemTime, UNIX_EPOCH}; + ``` - // Define a message ID enum if not using type-based routing directly - # - enum MyMessageType { Echo = 1 } - - // Handler function - async fn handle_echo(req: Message) -> WireframeResult { - println!("Received echo request with payload: {}", req.payload); - Ok(EchoResponse { - original_payload: req.payload.clone(), - echoed_at: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(), - }) - } + use wireframe::{WireframeApp, WireframeServer, Message, error::Result as + WireframeResult, serializer::BincodeSerializer}; use + my_protocol_messages::{EchoRequest, EchoResponse}; use + my_frame_processor::LengthPrefixedCodec; // Or wireframe's abstraction use + std::time::{SystemTime, UNIX_EPOCH}; - #[tokio::main] - async fn main() -> std::io::Result<()> { - println!("Starting echo server on 127.0.0.1:8000"); - - WireframeServer::new(|| { - WireframeApp::new() - //.frame_processor(LengthPrefixedCodec) // Simplified - .serialization_format(SerializationFormat::Bincode) // Specify serializer - .route(MyMessageType::Echo, handle_echo) // Route based on ID - // OR if type-based routing is supported and EchoRequest has an ID: - //.service(handle_echo_typed) where handle_echo_typed takes Message - }) - .bind("127.0.0.1:8000")? - .run() - .await - } + // Define a message ID enum if not using type-based routing directly - ```rust + # - This example, even in outline, demonstrates how derive macros for messages, - a separable framing component, and a clear handler signature with - extractors (`Message`) and a return type - (`WireframeResult`) simplify server implementation. + enum MyMessageType { Echo = 1 } - ```` + // Handler function async fn handle_echo(req: Message) -> + WireframeResult { println!("Received echo request with payload: + {}", req.payload); Ok(EchoResponse { original_payload: req.payload.clone(), + echoed_at: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(), }) + } + + #[tokio::main] async fn main() -> std::io::Result\<()> { println!("Starting + echo server on 127.0.0.1:8000"); + + ``` + WireframeServer::new(|| { + WireframeApp::new() + //.frame_processor(LengthPrefixedCodec) // Simplified + .serializer(BincodeSerializer) // Specify serializer + .route(MyMessageType::Echo, handle_echo) // Route based on ID + // OR if type-based routing is supported and EchoRequest has an ID: + //.service(handle_echo_typed) where handle_echo_typed takes Message + }) + .bind("127.0.0.1:8000")? + .run() + .await + ``` + + + + } + + ```rust + + This example, even in outline, demonstrates how derive macros for messages, + a separable framing component, and a clear handler signature with + extractors (`Message`) and a return type + (`WireframeResult`) simplify server implementation. + + ``` - **Example 2: Basic Chat Message Protocol** @@ -1217,62 +1232,53 @@ examples are invaluable. They make the abstract design tangible and showcase how Rust - ````rustrust + ```rustrust // Crate: main.rs - use wireframe::{WireframeApp, WireframeServer, Message, ConnectionInfo, error::Result as WireframeResult, config::SerializationFormat}; - use my_chat_messages::{ClientMessage, ServerMessage}; - //... use ChatRoomState, SharedChatRoomState... - use std::sync::Arc; - - # - enum ChatMessageType { ClientJoin = 10, ClientPost = 11 } - - - async fn handle_join( - msg: Message, // Assume it's ClientMessage::Join - conn_info: ConnectionInfo, - state: SharedChatRoomState, - ) -> WireframeResult> { // Optional direct response - if let ClientMessage::Join { user_name } = msg.into_inner() { - let mut room = state.lock().await; - //... logic to add user, check for name conflicts... - // room.add_user(conn_info.id(), user_name.clone()); - // Broadcast ServerMessage::UserJoined to other users (not shown) - println!("User '{}' joined from {}", user_name, conn_info.peer_addr()); - return Ok(None); // No direct response, or maybe an Ack - } - Ok(Some(ServerMessage::JoinError { reason: "Invalid Join message".to_string() })) - } - - async fn handle_post( - msg: Message, // Assume it's ClientMessage::Post - conn_info: ConnectionInfo, - state: SharedChatRoomState, - ) { // No direct response needed - if let ClientMessage::Post { content } = msg.into_inner() { - let room = state.lock().await; - // let user_name = room.get_user_name(conn_info.id()).unwrap_or_default(); - // Broadcast ServerMessage::NewMessage to other users (not shown) - // println!("User '{}' posted: {}", user_name, content); - } - } + ``` - #[tokio::main] - async fn main() -> std::io::Result<()> { - let chat_state = Arc::new(Mutex::new(ChatRoomState { users: HashMap::new() })); - WireframeServer::new(move | + use wireframe::{WireframeApp, WireframeServer, Message, ConnectionInfo, + error::Result as WireframeResult, serializer::BincodeSerializer}; use + my_chat_messages::{ClientMessage, ServerMessage}; //... use ChatRoomState, + SharedChatRoomState... use std::sync::Arc; + + # + + enum ChatMessageType { ClientJoin = 10, ClientPost = 11 } + + async fn handle_join( msg: Message, // Assume it's + ClientMessage::Join conn_info: ConnectionInfo, state: SharedChatRoomState, ) + -> WireframeResult\> { // Optional direct response if + let ClientMessage::Join { user_name } = msg.into_inner() { let mut room = + state.lock().await; //... logic to add user, check for name conflicts... // + room.add_user(conn_info.id(), user_name.clone()); // Broadcast + ServerMessage::UserJoined to other users (not shown) println!("User '{}' + joined from {}", user_name, conn_info.peer_addr()); return Ok(None); // No + direct response, or maybe an Ack } Ok(Some(ServerMessage::JoinError { reason: + "Invalid Join message".to_string() })) } + + async fn handle_post( msg: Message, // Assume it's + ClientMessage::Post conn_info: ConnectionInfo, state: SharedChatRoomState, ) { + // No direct response needed if let ClientMessage::Post { content } = + msg.into_inner() { let room = state.lock().await; // let user_name = + room.get_user_name(conn_info.id()).unwrap_or_default(); // Broadcast + ServerMessage::NewMessage to other users (not shown) // println!("User '{}' + posted: {}", user_name, content); } } + + #[tokio::main] async fn main() -> std::io::Result\<()> { let chat_state = + Arc::new(Mutex::new(ChatRoomState { users: HashMap::new() })); + WireframeServer::new(move | - ```rust + ```rust - ```` + ``` -| { +| { | WireframeApp::new() //.frame_processor(...) -.serialization_format(SerializationFormat::Bincode) +.serializer(BincodeSerializer) .app_data(SharedChatRoomState::new(chat_state.clone())) @@ -1290,7 +1296,9 @@ WireframeApp::new() } -\`\`\` +````` + + This chat example hints at how shared state (SharedChatRoomState) and connection information (ConnectionInfo) would be used, and how handlers might not always diff --git a/src/app.rs b/src/app.rs index 90041e77..75ae240f 100644 --- a/src/app.rs +++ b/src/app.rs @@ -1,21 +1,34 @@ //! Application builder configuring routes and middleware. //! //! `WireframeApp` stores registered routes, services, and middleware -//! for a [`WireframeServer`]. Methods return [`Result`] so callers -//! can chain registrations ergonomically. +//! for a [`WireframeServer`]. Most builder methods return [`Result`] +//! so callers can chain registrations ergonomically. use std::{boxed::Box, collections::HashMap, future::Future, pin::Pin}; +use bytes::BytesMut; +use tokio::io::{self, AsyncWrite, AsyncWriteExt}; + +use crate::{ + frame::{FrameProcessor, LengthPrefixedProcessor}, + message::Message, + serializer::{BincodeSerializer, Serializer}, +}; + +type BoxedFrameProcessor = + Box, Error = io::Error> + Send + Sync>; + /// Configures routing and middleware for a `WireframeServer`. /// /// The builder stores registered routes, services, and middleware /// without enforcing an ordering. Methods return [`Result`] so /// registrations can be chained ergonomically. -#[derive(Default)] -pub struct WireframeApp { +pub struct WireframeApp { routes: HashMap, services: Vec, middleware: Vec>, + frame_processor: BoxedFrameProcessor, + serializer: S, } /// Alias for boxed asynchronous handlers. @@ -34,10 +47,56 @@ pub enum WireframeError { DuplicateRoute(u32), } +/// Errors produced when sending a handler response over a stream. +#[derive(Debug)] +pub enum SendError { + /// Serialization failed. + Serialize(Box), + /// Writing to the stream failed. + Io(io::Error), +} + +impl std::fmt::Display for SendError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + SendError::Serialize(e) => write!(f, "serialization error: {e}"), + SendError::Io(e) => write!(f, "I/O error: {e}"), + } + } +} + +impl std::error::Error for SendError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + SendError::Serialize(e) => Some(&**e), + SendError::Io(e) => Some(e), + } + } +} + +impl From for SendError { + fn from(e: io::Error) -> Self { SendError::Io(e) } +} + /// Result type used throughout the builder API. pub type Result = std::result::Result; -impl WireframeApp { +impl Default for WireframeApp +where + S: Serializer + Default, +{ + fn default() -> Self { + Self { + routes: HashMap::new(), + services: Vec::new(), + middleware: Vec::new(), + frame_processor: Box::new(LengthPrefixedProcessor), + serializer: S::default(), + } + } +} + +impl WireframeApp { /// Construct a new empty application builder. /// /// # Errors @@ -45,7 +104,19 @@ impl WireframeApp { /// This function currently never returns an error but uses the /// [`Result`] type for forward compatibility. pub fn new() -> Result { Ok(Self::default()) } +} +impl WireframeApp +where + S: Serializer, +{ + /// Construct a new empty application builder. + /// + /// # Errors + /// + /// This function currently never returns an error but uses the + /// [`Result`] type for forward compatibility. + /// /// Register a route that maps `id` to `handler`. /// /// # Errors @@ -84,14 +155,65 @@ impl WireframeApp { Ok(self) } + /// Set the frame processor used for encoding and decoding frames. + #[must_use] + pub fn frame_processor

(mut self, processor: P) -> Self + where + P: FrameProcessor, Error = io::Error> + Send + Sync + 'static, + { + self.frame_processor = Box::new(processor); + self + } + + /// Replace the serializer used for messages. + #[must_use] + pub fn serializer(self, serializer: Ser) -> WireframeApp + where + Ser: Serializer, + { + WireframeApp { + routes: self.routes, + services: self.services, + middleware: self.middleware, + frame_processor: self.frame_processor, + serializer, + } + } + + /// Serialize `msg` and write it to `stream` using the frame processor. + /// + /// # Errors + /// + /// Returns a [`SendError`] if serialization or writing fails. + pub async fn send_response( + &self, + stream: &mut W, + msg: &M, + ) -> std::result::Result<(), SendError> + where + W: AsyncWrite + Unpin, + M: Message, + { + let bytes = self + .serializer + .serialize(msg) + .map_err(SendError::Serialize)?; + let mut framed = BytesMut::with_capacity(4 + bytes.len()); + self.frame_processor + .encode(&bytes, &mut framed) + .map_err(SendError::Io)?; + stream.write_all(&framed).await.map_err(SendError::Io)?; + stream.flush().await.map_err(SendError::Io) + } + /// Handle an accepted connection. /// /// This placeholder immediately closes the connection after the /// preamble phase. A warning is logged so tests and callers are /// aware of the current limitation. - pub async fn handle_connection(&self, _stream: S) + pub async fn handle_connection(&self, _stream: W) where - S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static, + W: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static, { log::warn!( "`WireframeApp::handle_connection` called, but connection handling is not \ diff --git a/src/frame.rs b/src/frame.rs index c2ca7492..e4add771 100644 --- a/src/frame.rs +++ b/src/frame.rs @@ -4,8 +4,9 @@ //! Implementations may use any framing strategy suitable for the //! underlying transport. -use async_trait::async_trait; -use bytes::BytesMut; +use std::io; + +use bytes::{Buf, BytesMut}; /// Trait defining how raw bytes are decoded into frames and how frames are /// encoded back into bytes for transmission. @@ -13,7 +14,8 @@ use bytes::BytesMut; /// The `Frame` associated type represents a logical unit extracted from or /// written to the wire. Errors are represented by the `Error` associated type, /// which must implement [`std::error::Error`]. -#[async_trait] +/// Frame processors operate synchronously on in-memory buffers and need +/// no mutable state. The trait therefore uses `&self` receivers. pub trait FrameProcessor: Send + Sync { /// Logical frame type extracted from the stream. type Frame; @@ -22,8 +24,49 @@ pub trait FrameProcessor: Send + Sync { type Error: std::error::Error + Send + Sync + 'static; /// Attempt to decode the next frame from `src`. - async fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error>; + /// + /// # Errors + /// + /// Returns an error if the bytes in `src` cannot be parsed into a complete frame. + fn decode(&self, src: &mut BytesMut) -> Result, Self::Error>; /// Encode `frame` and append the bytes to `dst`. - async fn encode(&mut self, frame: &Self::Frame, dst: &mut BytesMut) -> Result<(), Self::Error>; + /// + /// # Errors + /// + /// Returns an error if the frame cannot be written to `dst`. + fn encode(&self, frame: &Self::Frame, dst: &mut BytesMut) -> Result<(), Self::Error>; +} + +/// Simple length-prefixed framing using big-endian u32 lengths. +pub struct LengthPrefixedProcessor; + +impl FrameProcessor for LengthPrefixedProcessor { + type Frame = Vec; + type Error = std::io::Error; + + fn decode(&self, src: &mut BytesMut) -> Result, Self::Error> { + if src.len() < 4 { + return Ok(None); + } + let mut len_bytes = [0u8; 4]; + len_bytes.copy_from_slice(&src[..4]); + let len = u32::from_be_bytes(len_bytes); + let len_usize = usize::try_from(len).map_err(|_| io::Error::other("frame too large"))?; + if src.len() < 4 + len_usize { + return Ok(None); + } + src.advance(4); + Ok(Some(src.split_to(len_usize).to_vec())) + } + + fn encode(&self, frame: &Self::Frame, dst: &mut BytesMut) -> Result<(), Self::Error> { + use bytes::BufMut; + dst.reserve(4 + frame.len()); + let len = u32::try_from(frame.len()) + .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "frame too large"))?; + dst.put_u32(len); + dst.extend_from_slice(frame); + Ok(()) + } } diff --git a/src/lib.rs b/src/lib.rs index e763b16b..a7d5d78d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,6 +5,8 @@ //! servers, including routing, middleware, and connection utilities. pub mod app; +pub mod serializer; +pub use serializer::{BincodeSerializer, Serializer}; pub mod extractor; pub mod frame; pub mod message; diff --git a/src/serializer.rs b/src/serializer.rs new file mode 100644 index 00000000..ac22a1e1 --- /dev/null +++ b/src/serializer.rs @@ -0,0 +1,48 @@ +//! Message serialization traits. +//! +//! This module defines the [`Serializer`] trait enabling applications to plug in +//! custom encoding formats. A basic [`BincodeSerializer`] implementation is +//! provided as the default. + +use std::error::Error; + +use crate::message::Message; + +/// Trait for serializing and deserializing messages. +pub trait Serializer { + /// Serialize `value` into a byte vector. + /// + /// # Errors + /// + /// Returns an error if the value cannot be serialized. + fn serialize(&self, value: &M) -> Result, Box>; + + /// Deserialize a message from `bytes`, returning the message and bytes consumed. + /// + /// # Errors + /// + /// Returns an error if the bytes cannot be parsed into a message. + fn deserialize( + &self, + bytes: &[u8], + ) -> Result<(M, usize), Box>; +} + +/// Serializer using `bincode` with its standard configuration. +#[derive(Clone, Copy, Debug, Default)] +pub struct BincodeSerializer; + +impl Serializer for BincodeSerializer { + fn serialize(&self, value: &M) -> Result, Box> { + value + .to_bytes() + .map_err(|e| Box::new(e) as Box) + } + + fn deserialize( + &self, + bytes: &[u8], + ) -> Result<(M, usize), Box> { + M::from_bytes(bytes).map_err(|e| Box::new(e) as Box) + } +} diff --git a/tests/response.rs b/tests/response.rs new file mode 100644 index 00000000..277820cf --- /dev/null +++ b/tests/response.rs @@ -0,0 +1,119 @@ +//! Tests covering response serialization and framing logic. +//! +//! These verify normal encoding as well as error conditions like +//! write failures and encode errors. + +use bytes::BytesMut; +use wireframe::{ + app::WireframeApp, + frame::{FrameProcessor, LengthPrefixedProcessor}, + message::Message, + serializer::BincodeSerializer, +}; + +#[derive(bincode::Encode, bincode::BorrowDecode, PartialEq, Debug)] +struct TestResp(u32); + +#[derive(Debug)] +struct FailingResp; + +impl bincode::Encode for FailingResp { + fn encode( + &self, + _: &mut E, + ) -> Result<(), bincode::error::EncodeError> { + Err(bincode::error::EncodeError::Other("fail")) + } +} + +impl<'de> bincode::BorrowDecode<'de, ()> for FailingResp { + fn borrow_decode>( + _: &mut D, + ) -> Result { + Ok(FailingResp) + } +} + +#[tokio::test] +async fn send_response_encodes_and_frames() { + let app = WireframeApp::new() + .unwrap() + .frame_processor(LengthPrefixedProcessor) + .serializer(BincodeSerializer); + + let mut out = Vec::new(); + app.send_response(&mut out, &TestResp(7)).await.unwrap(); + + let processor = LengthPrefixedProcessor; + let mut buf = BytesMut::from(&out[..]); + let frame = processor.decode(&mut buf).unwrap().unwrap(); + let (decoded, _) = TestResp::from_bytes(&frame).unwrap(); + assert_eq!(decoded, TestResp(7)); +} + +#[tokio::test] +async fn length_prefixed_decode_requires_complete_header() { + let processor = LengthPrefixedProcessor; + let mut buf = BytesMut::from(&[0x00, 0x00, 0x00][..]); // only 3 bytes + assert!(processor.decode(&mut buf).unwrap().is_none()); + assert_eq!(buf.len(), 3); // nothing consumed +} + +#[tokio::test] +async fn length_prefixed_decode_requires_full_frame() { + let processor = LengthPrefixedProcessor; + let mut buf = BytesMut::from(&[0x00, 0x00, 0x00, 0x05, 0x01, 0x02][..]); + assert!(processor.decode(&mut buf).unwrap().is_none()); + // buffer should retain bytes since frame isn't complete + assert_eq!(buf.len(), 6); +} + +struct FailingWriter; + +impl tokio::io::AsyncWrite for FailingWriter { + fn poll_write( + self: std::pin::Pin<&mut Self>, + _: &mut std::task::Context<'_>, + _: &[u8], + ) -> std::task::Poll> { + std::task::Poll::Ready(Err(std::io::Error::other("fail"))) + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + _: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + _: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } +} + +#[tokio::test] +async fn send_response_propagates_write_error() { + let app = WireframeApp::new() + .unwrap() + .frame_processor(LengthPrefixedProcessor); + + let mut writer = FailingWriter; + let err = app + .send_response(&mut writer, &TestResp(3)) + .await + .expect_err("expected error"); + assert!(matches!(err, wireframe::app::SendError::Io(_))); +} + +#[tokio::test] +async fn send_response_returns_encode_error() { + let app = WireframeApp::new().unwrap(); + let err = app + .send_response(&mut Vec::new(), &FailingResp) + .await + .expect_err("expected error"); + assert!(matches!(err, wireframe::app::SendError::Serialize(_))); +}