From 92e4b4999dd1e632f4271cd3215dd5742edf8eb1 Mon Sep 17 00:00:00 2001 From: Leynos Date: Tue, 17 Jun 2025 03:32:03 +0100 Subject: [PATCH 1/7] Add module docstring for config module --- docs/roadmap.md | 2 +- docs/rust-binary-router-library-design.md | 5 +- src/app.rs | 68 ++++++++++++++++++++++- src/config.rs | 39 +++++++++++++ src/frame.rs | 36 +++++++++++- src/lib.rs | 1 + tests/response.rs | 29 ++++++++++ 7 files changed, 176 insertions(+), 4 deletions(-) create mode 100644 src/config.rs create mode 100644 tests/response.rs diff --git a/docs/roadmap.md b/docs/roadmap.md index 372561db..1caea5cb 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -62,7 +62,7 @@ after formatting. Line numbers below refer to that file. user-configured callbacks on decode success or failure. See [preamble-validator](preamble-validator.md). -- [ ] Add response serialization and transmission. Encode handler responses +- [x] Add response serialization and transmission. Encode handler responses using the selected serialization format and write them back through the framing layer. diff --git a/docs/rust-binary-router-library-design.md b/docs/rust-binary-router-library-design.md index 1944e61b..5d40ba1e 100644 --- a/docs/rust-binary-router-library-design.md +++ b/docs/rust-binary-router-library-design.md @@ -718,7 +718,10 @@ messages and optionally producing responses. - A specific message type that implements a `wireframe::Responder` trait (analogous to Actix Web's `Responder` trait 4). This trait defines how the - returned value is serialized and sent back to the client. + returned value is serialized and sent back to the client. When a handler + yields such a value, `wireframe` encodes it using the application’s + configured `SerializationFormat` and passes the resulting bytes to the + `FrameProcessor` for transmission back to the peer. - `Result`: For explicit error handling. If `Ok(response_message)`, the message is sent. If `Err(error_value)`, the error is processed by "wireframe's" error handling mechanism (see Section diff --git a/src/app.rs b/src/app.rs index 90041e77..52b19d35 100644 --- a/src/app.rs +++ b/src/app.rs @@ -6,16 +6,29 @@ use std::{boxed::Box, collections::HashMap, future::Future, pin::Pin}; +use bytes::BytesMut; +use tokio::io::{self, AsyncWrite, AsyncWriteExt}; + +use crate::{ + config::SerializationFormat, + frame::{FrameProcessor, LengthPrefixedProcessor}, + message::Message, +}; + +type BoxedFrameProcessor = + Box, Error = io::Error> + Send + Sync>; + /// Configures routing and middleware for a `WireframeServer`. /// /// The builder stores registered routes, services, and middleware /// without enforcing an ordering. Methods return [`Result`] so /// registrations can be chained ergonomically. -#[derive(Default)] pub struct WireframeApp { routes: HashMap, services: Vec, middleware: Vec>, + frame_processor: BoxedFrameProcessor, + serializer: SerializationFormat, } /// Alias for boxed asynchronous handlers. @@ -37,6 +50,18 @@ pub enum WireframeError { /// Result type used throughout the builder API. pub type Result = std::result::Result; +impl Default for WireframeApp { + fn default() -> Self { + Self { + routes: HashMap::new(), + services: Vec::new(), + middleware: Vec::new(), + frame_processor: Box::new(LengthPrefixedProcessor), + serializer: SerializationFormat::Bincode, + } + } +} + impl WireframeApp { /// Construct a new empty application builder. /// @@ -84,6 +109,47 @@ impl WireframeApp { Ok(self) } + /// Set the frame processor used for encoding and decoding frames. + /// + /// # Errors + /// + /// Currently never returns an error but retains `Result` for future + /// configurability. + pub fn frame_processor

(mut self, processor: P) -> Result + where + P: FrameProcessor, Error = io::Error> + Send + Sync + 'static, + { + self.frame_processor = Box::new(processor); + Ok(self) + } + + /// Choose the serialization format for messages. + /// + /// # Errors + /// + /// This function currently never fails. + pub fn serialization_format(mut self, format: SerializationFormat) -> Result { + self.serializer = format; + Ok(self) + } + + /// Serialize `msg` and write it to `stream` using the frame processor. + /// + /// # Errors + /// + /// Returns an `io::Error` if serialization or writing fails. + pub async fn send_response(&mut self, stream: &mut S, msg: &M) -> io::Result<()> + where + S: AsyncWrite + Unpin, + M: Message, + { + let bytes = self.serializer.serialize(msg).map_err(io::Error::other)?; + let mut framed = BytesMut::new(); + self.frame_processor.encode(&bytes, &mut framed).await?; + stream.write_all(&framed).await?; + stream.flush().await + } + /// Handle an accepted connection. /// /// This placeholder immediately closes the connection after the diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 00000000..c67705d9 --- /dev/null +++ b/src/config.rs @@ -0,0 +1,39 @@ +//! Application configuration types. +//! +//! This module defines enums and helpers for selecting +//! serialization formats used by `wireframe` when encoding +//! and decoding messages. +use bincode::error::{DecodeError, EncodeError}; + +use crate::message::Message; + +/// Serialization formats supported by `wireframe`. +#[derive(Clone, Copy)] +pub enum SerializationFormat { + /// Use `bincode` with its standard configuration. + Bincode, +} + +impl SerializationFormat { + /// Serialize a message into a byte vector. + /// + /// # Errors + /// + /// Returns an [`EncodeError`] if serialization fails. + pub fn serialize(self, value: &M) -> Result, EncodeError> { + match self { + SerializationFormat::Bincode => value.to_bytes(), + } + } + + /// Deserialize a message from a byte slice. + /// + /// # Errors + /// + /// Returns a [`DecodeError`] if deserialization fails. + pub fn deserialize(self, bytes: &[u8]) -> Result<(M, usize), DecodeError> { + match self { + SerializationFormat::Bincode => M::from_bytes(bytes), + } + } +} diff --git a/src/frame.rs b/src/frame.rs index c2ca7492..dbe9a144 100644 --- a/src/frame.rs +++ b/src/frame.rs @@ -4,8 +4,10 @@ //! Implementations may use any framing strategy suitable for the //! underlying transport. +use std::io; + use async_trait::async_trait; -use bytes::BytesMut; +use bytes::{Buf, BytesMut}; /// Trait defining how raw bytes are decoded into frames and how frames are /// encoded back into bytes for transmission. @@ -27,3 +29,35 @@ pub trait FrameProcessor: Send + Sync { /// Encode `frame` and append the bytes to `dst`. async fn encode(&mut self, frame: &Self::Frame, dst: &mut BytesMut) -> Result<(), Self::Error>; } + +/// Simple length-prefixed framing using big-endian u32 lengths. +pub struct LengthPrefixedProcessor; + +#[async_trait] +impl FrameProcessor for LengthPrefixedProcessor { + type Frame = Vec; + type Error = std::io::Error; + + async fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + if src.len() < 4 { + return Ok(None); + } + let mut len_bytes = [0u8; 4]; + len_bytes.copy_from_slice(&src[..4]); + let len = u32::from_be_bytes(len_bytes); + if src.len() < 4 + len as usize { + return Ok(None); + } + src.advance(4); + Ok(Some(src.split_to(len as usize).to_vec())) + } + + async fn encode(&mut self, frame: &Self::Frame, dst: &mut BytesMut) -> Result<(), Self::Error> { + use bytes::BufMut; + dst.reserve(4 + frame.len()); + let len = u32::try_from(frame.len()).map_err(|_| io::Error::other("frame too large"))?; + dst.put_u32(len); + dst.extend_from_slice(frame); + Ok(()) + } +} diff --git a/src/lib.rs b/src/lib.rs index e763b16b..7aee0c6a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,6 +5,7 @@ //! servers, including routing, middleware, and connection utilities. pub mod app; +pub mod config; pub mod extractor; pub mod frame; pub mod message; diff --git a/tests/response.rs b/tests/response.rs new file mode 100644 index 00000000..e6462857 --- /dev/null +++ b/tests/response.rs @@ -0,0 +1,29 @@ +use bytes::BytesMut; +use wireframe::{ + app::WireframeApp, + config::SerializationFormat, + frame::{FrameProcessor, LengthPrefixedProcessor}, + message::Message, +}; + +#[derive(bincode::Encode, bincode::BorrowDecode, PartialEq, Debug)] +struct TestResp(u32); + +#[tokio::test] +async fn send_response_encodes_and_frames() { + let mut app = WireframeApp::new() + .unwrap() + .frame_processor(LengthPrefixedProcessor) + .unwrap() + .serialization_format(SerializationFormat::Bincode) + .unwrap(); + + let mut out = Vec::new(); + app.send_response(&mut out, &TestResp(7)).await.unwrap(); + + let mut processor = LengthPrefixedProcessor; + let mut buf = BytesMut::from(&out[..]); + let frame = processor.decode(&mut buf).await.unwrap().unwrap(); + let (decoded, _) = TestResp::from_bytes(&frame).unwrap(); + assert_eq!(decoded, TestResp(7)); +} From bc8bec60f158e59409f699937537f56d690223d0 Mon Sep 17 00:00:00 2001 From: Leynos Date: Tue, 17 Jun 2025 03:39:33 +0100 Subject: [PATCH 2/7] Expand response tests and document serialization --- README.md | 8 +++++++ tests/response.rs | 58 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+) diff --git a/README.md b/README.md index dcb9f778..5f4b02ef 100644 --- a/README.md +++ b/README.md @@ -78,6 +78,14 @@ WireframeServer::new(|| { This example showcases how derive macros and the framing abstraction simplify a binary protocol server【F:docs/rust-binary-router-library-design.md†L1120-L1150】. +## Response Serialization and Framing + +Handlers can return types implementing the `Responder` trait. These values are +encoded using the application's configured `SerializationFormat` and written +back through the `FrameProcessor`【F:docs/rust-binary-router-library-design.md†L718-L724】. +The included `LengthPrefixedProcessor` illustrates a simple framing strategy +based on a big‑endian length prefix【F:docs/rust-binary-router-library-design.md†L1076-L1117】. + ## Current Limitations Connection processing is not implemented yet. After the optional diff --git a/tests/response.rs b/tests/response.rs index e6462857..318d2268 100644 --- a/tests/response.rs +++ b/tests/response.rs @@ -27,3 +27,61 @@ async fn send_response_encodes_and_frames() { let (decoded, _) = TestResp::from_bytes(&frame).unwrap(); assert_eq!(decoded, TestResp(7)); } + +#[tokio::test] +async fn length_prefixed_decode_requires_complete_header() { + let mut processor = LengthPrefixedProcessor; + let mut buf = BytesMut::from(&[0x00, 0x00, 0x00][..]); // only 3 bytes + assert!(processor.decode(&mut buf).await.unwrap().is_none()); + assert_eq!(buf.len(), 3); // nothing consumed +} + +#[tokio::test] +async fn length_prefixed_decode_requires_full_frame() { + let mut processor = LengthPrefixedProcessor; + let mut buf = BytesMut::from(&[0x00, 0x00, 0x00, 0x05, 0x01, 0x02][..]); + assert!(processor.decode(&mut buf).await.unwrap().is_none()); + // buffer should retain bytes since frame isn't complete + assert_eq!(buf.len(), 6); +} + +struct FailingWriter; + +impl tokio::io::AsyncWrite for FailingWriter { + fn poll_write( + self: std::pin::Pin<&mut Self>, + _: &mut std::task::Context<'_>, + _: &[u8], + ) -> std::task::Poll> { + std::task::Poll::Ready(Err(std::io::Error::other("fail"))) + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + _: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + _: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } +} + +#[tokio::test] +async fn send_response_propagates_write_error() { + let mut app = WireframeApp::new() + .unwrap() + .frame_processor(LengthPrefixedProcessor) + .unwrap(); + + let mut writer = FailingWriter; + let err = app + .send_response(&mut writer, &TestResp(3)) + .await + .expect_err("expected error"); + assert_eq!(err.kind(), std::io::ErrorKind::Other); +} From cf23fc5ab03784b7ede08059c6ac86e567f683b6 Mon Sep 17 00:00:00 2001 From: Leynos Date: Tue, 17 Jun 2025 19:47:39 +0100 Subject: [PATCH 3/7] Refine builder API and response serialization --- src/app.rs | 34 +++++++++++++++------------------- src/config.rs | 10 +++++++++- src/frame.rs | 8 +++++--- src/lib.rs | 1 + tests/response.rs | 7 ++----- 5 files changed, 32 insertions(+), 28 deletions(-) diff --git a/src/app.rs b/src/app.rs index 52b19d35..99791942 100644 --- a/src/app.rs +++ b/src/app.rs @@ -1,8 +1,8 @@ //! Application builder configuring routes and middleware. //! //! `WireframeApp` stores registered routes, services, and middleware -//! for a [`WireframeServer`]. Methods return [`Result`] so callers -//! can chain registrations ergonomically. +//! for a [`WireframeServer`]. Most builder methods return [`Result`] +//! so callers can chain registrations ergonomically. use std::{boxed::Box, collections::HashMap, future::Future, pin::Pin}; @@ -10,7 +10,7 @@ use bytes::BytesMut; use tokio::io::{self, AsyncWrite, AsyncWriteExt}; use crate::{ - config::SerializationFormat, + SerializationFormat, frame::{FrameProcessor, LengthPrefixedProcessor}, message::Message, }; @@ -57,7 +57,7 @@ impl Default for WireframeApp { services: Vec::new(), middleware: Vec::new(), frame_processor: Box::new(LengthPrefixedProcessor), - serializer: SerializationFormat::Bincode, + serializer: SerializationFormat::DEFAULT, } } } @@ -110,27 +110,20 @@ impl WireframeApp { } /// Set the frame processor used for encoding and decoding frames. - /// - /// # Errors - /// - /// Currently never returns an error but retains `Result` for future - /// configurability. - pub fn frame_processor

(mut self, processor: P) -> Result + #[must_use] + pub fn frame_processor

(mut self, processor: P) -> Self where P: FrameProcessor, Error = io::Error> + Send + Sync + 'static, { self.frame_processor = Box::new(processor); - Ok(self) + self } /// Choose the serialization format for messages. - /// - /// # Errors - /// - /// This function currently never fails. - pub fn serialization_format(mut self, format: SerializationFormat) -> Result { + #[must_use] + pub fn serialization_format(mut self, format: SerializationFormat) -> Self { self.serializer = format; - Ok(self) + self } /// Serialize `msg` and write it to `stream` using the frame processor. @@ -143,8 +136,11 @@ impl WireframeApp { S: AsyncWrite + Unpin, M: Message, { - let bytes = self.serializer.serialize(msg).map_err(io::Error::other)?; - let mut framed = BytesMut::new(); + let bytes = self + .serializer + .serialize(msg) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + let mut framed = BytesMut::with_capacity(4 + bytes.len()); self.frame_processor.encode(&bytes, &mut framed).await?; stream.write_all(&framed).await?; stream.flush().await diff --git a/src/config.rs b/src/config.rs index c67705d9..93e05680 100644 --- a/src/config.rs +++ b/src/config.rs @@ -8,13 +8,17 @@ use bincode::error::{DecodeError, EncodeError}; use crate::message::Message; /// Serialization formats supported by `wireframe`. -#[derive(Clone, Copy)] +#[non_exhaustive] +#[derive(Clone, Copy, Debug)] pub enum SerializationFormat { /// Use `bincode` with its standard configuration. Bincode, } impl SerializationFormat { + /// The library default (currently [`Bincode`]). + pub const DEFAULT: SerializationFormat = SerializationFormat::Bincode; + /// Serialize a message into a byte vector. /// /// # Errors @@ -37,3 +41,7 @@ impl SerializationFormat { } } } + +impl Default for SerializationFormat { + fn default() -> Self { SerializationFormat::DEFAULT } +} diff --git a/src/frame.rs b/src/frame.rs index dbe9a144..afdfb688 100644 --- a/src/frame.rs +++ b/src/frame.rs @@ -45,17 +45,19 @@ impl FrameProcessor for LengthPrefixedProcessor { let mut len_bytes = [0u8; 4]; len_bytes.copy_from_slice(&src[..4]); let len = u32::from_be_bytes(len_bytes); - if src.len() < 4 + len as usize { + let len_usize = usize::try_from(len).map_err(|_| io::Error::other("frame too large"))?; + if src.len() < 4 + len_usize { return Ok(None); } src.advance(4); - Ok(Some(src.split_to(len as usize).to_vec())) + Ok(Some(src.split_to(len_usize).to_vec())) } async fn encode(&mut self, frame: &Self::Frame, dst: &mut BytesMut) -> Result<(), Self::Error> { use bytes::BufMut; dst.reserve(4 + frame.len()); - let len = u32::try_from(frame.len()).map_err(|_| io::Error::other("frame too large"))?; + let len = u32::try_from(frame.len()) + .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "frame too large"))?; dst.put_u32(len); dst.extend_from_slice(frame); Ok(()) diff --git a/src/lib.rs b/src/lib.rs index 7aee0c6a..09aa93a4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,6 +6,7 @@ pub mod app; pub mod config; +pub use config::SerializationFormat; pub mod extractor; pub mod frame; pub mod message; diff --git a/tests/response.rs b/tests/response.rs index 318d2268..369a3a5a 100644 --- a/tests/response.rs +++ b/tests/response.rs @@ -14,9 +14,7 @@ async fn send_response_encodes_and_frames() { let mut app = WireframeApp::new() .unwrap() .frame_processor(LengthPrefixedProcessor) - .unwrap() - .serialization_format(SerializationFormat::Bincode) - .unwrap(); + .serialization_format(SerializationFormat::Bincode); let mut out = Vec::new(); app.send_response(&mut out, &TestResp(7)).await.unwrap(); @@ -75,8 +73,7 @@ impl tokio::io::AsyncWrite for FailingWriter { async fn send_response_propagates_write_error() { let mut app = WireframeApp::new() .unwrap() - .frame_processor(LengthPrefixedProcessor) - .unwrap(); + .frame_processor(LengthPrefixedProcessor); let mut writer = FailingWriter; let err = app From 11027bcb02223398c4b4b6caf2d42ed149e0a6ec Mon Sep 17 00:00:00 2001 From: Leynos Date: Tue, 17 Jun 2025 23:51:39 +0100 Subject: [PATCH 4/7] Make frame processors stateless --- src/app.rs | 53 +++++++++++++++++++++++++++++++++++++++++------ src/frame.rs | 21 ++++++++++++------- tests/response.rs | 48 ++++++++++++++++++++++++++++++++++-------- 3 files changed, 100 insertions(+), 22 deletions(-) diff --git a/src/app.rs b/src/app.rs index 99791942..85a5e38c 100644 --- a/src/app.rs +++ b/src/app.rs @@ -47,6 +47,41 @@ pub enum WireframeError { DuplicateRoute(u32), } +/// Errors produced when sending a handler response over a stream. +#[derive(Debug)] +pub enum SendError { + /// Serialization failed. + Serialize(bincode::error::EncodeError), + /// Writing to the stream failed. + Io(io::Error), +} + +impl std::fmt::Display for SendError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + SendError::Serialize(e) => write!(f, "serialization error: {e}"), + SendError::Io(e) => write!(f, "I/O error: {e}"), + } + } +} + +impl std::error::Error for SendError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + SendError::Serialize(e) => Some(e), + SendError::Io(e) => Some(e), + } + } +} + +impl From for SendError { + fn from(e: io::Error) -> Self { SendError::Io(e) } +} + +impl From for SendError { + fn from(e: bincode::error::EncodeError) -> Self { SendError::Serialize(e) } +} + /// Result type used throughout the builder API. pub type Result = std::result::Result; @@ -130,8 +165,12 @@ impl WireframeApp { /// /// # Errors /// - /// Returns an `io::Error` if serialization or writing fails. - pub async fn send_response(&mut self, stream: &mut S, msg: &M) -> io::Result<()> + /// Returns a [`SendError`] if serialization or writing fails. + pub async fn send_response( + &self, + stream: &mut S, + msg: &M, + ) -> std::result::Result<(), SendError> where S: AsyncWrite + Unpin, M: Message, @@ -139,11 +178,13 @@ impl WireframeApp { let bytes = self .serializer .serialize(msg) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + .map_err(SendError::Serialize)?; let mut framed = BytesMut::with_capacity(4 + bytes.len()); - self.frame_processor.encode(&bytes, &mut framed).await?; - stream.write_all(&framed).await?; - stream.flush().await + self.frame_processor + .encode(&bytes, &mut framed) + .map_err(SendError::Io)?; + stream.write_all(&framed).await.map_err(SendError::Io)?; + stream.flush().await.map_err(SendError::Io) } /// Handle an accepted connection. diff --git a/src/frame.rs b/src/frame.rs index afdfb688..e4add771 100644 --- a/src/frame.rs +++ b/src/frame.rs @@ -6,7 +6,6 @@ use std::io; -use async_trait::async_trait; use bytes::{Buf, BytesMut}; /// Trait defining how raw bytes are decoded into frames and how frames are @@ -15,7 +14,8 @@ use bytes::{Buf, BytesMut}; /// The `Frame` associated type represents a logical unit extracted from or /// written to the wire. Errors are represented by the `Error` associated type, /// which must implement [`std::error::Error`]. -#[async_trait] +/// Frame processors operate synchronously on in-memory buffers and need +/// no mutable state. The trait therefore uses `&self` receivers. pub trait FrameProcessor: Send + Sync { /// Logical frame type extracted from the stream. type Frame; @@ -24,21 +24,28 @@ pub trait FrameProcessor: Send + Sync { type Error: std::error::Error + Send + Sync + 'static; /// Attempt to decode the next frame from `src`. - async fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error>; + /// + /// # Errors + /// + /// Returns an error if the bytes in `src` cannot be parsed into a complete frame. + fn decode(&self, src: &mut BytesMut) -> Result, Self::Error>; /// Encode `frame` and append the bytes to `dst`. - async fn encode(&mut self, frame: &Self::Frame, dst: &mut BytesMut) -> Result<(), Self::Error>; + /// + /// # Errors + /// + /// Returns an error if the frame cannot be written to `dst`. + fn encode(&self, frame: &Self::Frame, dst: &mut BytesMut) -> Result<(), Self::Error>; } /// Simple length-prefixed framing using big-endian u32 lengths. pub struct LengthPrefixedProcessor; -#[async_trait] impl FrameProcessor for LengthPrefixedProcessor { type Frame = Vec; type Error = std::io::Error; - async fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + fn decode(&self, src: &mut BytesMut) -> Result, Self::Error> { if src.len() < 4 { return Ok(None); } @@ -53,7 +60,7 @@ impl FrameProcessor for LengthPrefixedProcessor { Ok(Some(src.split_to(len_usize).to_vec())) } - async fn encode(&mut self, frame: &Self::Frame, dst: &mut BytesMut) -> Result<(), Self::Error> { + fn encode(&self, frame: &Self::Frame, dst: &mut BytesMut) -> Result<(), Self::Error> { use bytes::BufMut; dst.reserve(4 + frame.len()); let len = u32::try_from(frame.len()) diff --git a/tests/response.rs b/tests/response.rs index 369a3a5a..813fba1d 100644 --- a/tests/response.rs +++ b/tests/response.rs @@ -9,9 +9,29 @@ use wireframe::{ #[derive(bincode::Encode, bincode::BorrowDecode, PartialEq, Debug)] struct TestResp(u32); +#[derive(Debug)] +struct FailingResp; + +impl bincode::Encode for FailingResp { + fn encode( + &self, + _: &mut E, + ) -> Result<(), bincode::error::EncodeError> { + Err(bincode::error::EncodeError::Other("fail")) + } +} + +impl<'de> bincode::BorrowDecode<'de, ()> for FailingResp { + fn borrow_decode>( + _: &mut D, + ) -> Result { + Ok(FailingResp) + } +} + #[tokio::test] async fn send_response_encodes_and_frames() { - let mut app = WireframeApp::new() + let app = WireframeApp::new() .unwrap() .frame_processor(LengthPrefixedProcessor) .serialization_format(SerializationFormat::Bincode); @@ -19,26 +39,26 @@ async fn send_response_encodes_and_frames() { let mut out = Vec::new(); app.send_response(&mut out, &TestResp(7)).await.unwrap(); - let mut processor = LengthPrefixedProcessor; + let processor = LengthPrefixedProcessor; let mut buf = BytesMut::from(&out[..]); - let frame = processor.decode(&mut buf).await.unwrap().unwrap(); + let frame = processor.decode(&mut buf).unwrap().unwrap(); let (decoded, _) = TestResp::from_bytes(&frame).unwrap(); assert_eq!(decoded, TestResp(7)); } #[tokio::test] async fn length_prefixed_decode_requires_complete_header() { - let mut processor = LengthPrefixedProcessor; + let processor = LengthPrefixedProcessor; let mut buf = BytesMut::from(&[0x00, 0x00, 0x00][..]); // only 3 bytes - assert!(processor.decode(&mut buf).await.unwrap().is_none()); + assert!(processor.decode(&mut buf).unwrap().is_none()); assert_eq!(buf.len(), 3); // nothing consumed } #[tokio::test] async fn length_prefixed_decode_requires_full_frame() { - let mut processor = LengthPrefixedProcessor; + let processor = LengthPrefixedProcessor; let mut buf = BytesMut::from(&[0x00, 0x00, 0x00, 0x05, 0x01, 0x02][..]); - assert!(processor.decode(&mut buf).await.unwrap().is_none()); + assert!(processor.decode(&mut buf).unwrap().is_none()); // buffer should retain bytes since frame isn't complete assert_eq!(buf.len(), 6); } @@ -71,7 +91,7 @@ impl tokio::io::AsyncWrite for FailingWriter { #[tokio::test] async fn send_response_propagates_write_error() { - let mut app = WireframeApp::new() + let app = WireframeApp::new() .unwrap() .frame_processor(LengthPrefixedProcessor); @@ -80,5 +100,15 @@ async fn send_response_propagates_write_error() { .send_response(&mut writer, &TestResp(3)) .await .expect_err("expected error"); - assert_eq!(err.kind(), std::io::ErrorKind::Other); + assert!(matches!(err, wireframe::app::SendError::Io(_))); +} + +#[tokio::test] +async fn send_response_returns_encode_error() { + let app = WireframeApp::new().unwrap(); + let err = app + .send_response(&mut Vec::new(), &FailingResp) + .await + .expect_err("expected error"); + assert!(matches!(err, wireframe::app::SendError::Serialize(_))); } From ff7ad02568b125b8d733399fb4ddff909cd7dce3 Mon Sep 17 00:00:00 2001 From: Leynos Date: Tue, 17 Jun 2025 23:51:46 +0100 Subject: [PATCH 5/7] Introduce trait-based serializer --- README.md | 4 +- docs/rust-binary-router-library-design.md | 181 +++++++++++----------- src/app.rs | 61 +++++--- src/config.rs | 47 ------ src/lib.rs | 4 +- src/serializer.rs | 48 ++++++ tests/response.rs | 4 +- 7 files changed, 187 insertions(+), 162 deletions(-) delete mode 100644 src/config.rs create mode 100644 src/serializer.rs diff --git a/README.md b/README.md index 5f4b02ef..061691b6 100644 --- a/README.md +++ b/README.md @@ -67,7 +67,7 @@ async fn handle_echo(req: Message) -> WireframeResult WireframeServer::new(|| { WireframeApp::new() - .serialization_format(SerializationFormat::Bincode) + .serializer(BincodeSerializer::default()) .route(MyMessageType::Echo, handle_echo) }) .bind("127.0.0.1:8000")? @@ -81,7 +81,7 @@ binary protocol server【F:docs/rust-binary-router-library-design.md†L1120-L11 ## Response Serialization and Framing Handlers can return types implementing the `Responder` trait. These values are -encoded using the application's configured `SerializationFormat` and written +encoded using the application's configured serializer and written back through the `FrameProcessor`【F:docs/rust-binary-router-library-design.md†L718-L724】. The included `LengthPrefixedProcessor` illustrates a simple framing strategy based on a big‑endian length prefix【F:docs/rust-binary-router-library-design.md†L1076-L1117】. diff --git a/docs/rust-binary-router-library-design.md b/docs/rust-binary-router-library-design.md index 5d40ba1e..2285e800 100644 --- a/docs/rust-binary-router-library-design.md +++ b/docs/rust-binary-router-library-design.md @@ -551,6 +551,8 @@ mechanism to dispatch this message to the appropriate user-defined handler. .message_guarded( MessageType::GenericCommand, + + ```` |msg_header: &CommandHeader| msg_header.sub_type == CommandSubType::Special, @@ -720,8 +722,8 @@ messages and optionally producing responses. (analogous to Actix Web's `Responder` trait 4). This trait defines how the returned value is serialized and sent back to the client. When a handler yields such a value, `wireframe` encodes it using the application’s - configured `SerializationFormat` and passes the resulting bytes to the - `FrameProcessor` for transmission back to the peer. + configured serializer and passes the resulting bytes to the `FrameProcessor` + for transmission back to the peer. - `Result`: For explicit error handling. If `Ok(response_message)`, the message is sent. If `Err(error_value)`, the error is processed by "wireframe's" error handling mechanism (see Section @@ -1118,55 +1120,65 @@ examples are invaluable. They make the abstract design tangible and showcase how ```` +````` + + + 3. **Server Setup and Handler**: Rust - ````rustrust + ```rustrust // Crate: main.rs - use wireframe::{WireframeApp, WireframeServer, Message, error::Result as WireframeResult, config::SerializationFormat}; - use my_protocol_messages::{EchoRequest, EchoResponse}; - use my_frame_processor::LengthPrefixedCodec; // Or wireframe's abstraction - use std::time::{SystemTime, UNIX_EPOCH}; + ``` - // Define a message ID enum if not using type-based routing directly - # - enum MyMessageType { Echo = 1 } - - // Handler function - async fn handle_echo(req: Message) -> WireframeResult { - println!("Received echo request with payload: {}", req.payload); - Ok(EchoResponse { - original_payload: req.payload.clone(), - echoed_at: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(), - }) - } + use wireframe::{WireframeApp, WireframeServer, Message, error::Result as + WireframeResult, serializer::BincodeSerializer}; use + my_protocol_messages::{EchoRequest, EchoResponse}; use + my_frame_processor::LengthPrefixedCodec; // Or wireframe's abstraction use + std::time::{SystemTime, UNIX_EPOCH}; - #[tokio::main] - async fn main() -> std::io::Result<()> { - println!("Starting echo server on 127.0.0.1:8000"); - - WireframeServer::new(|| { - WireframeApp::new() - //.frame_processor(LengthPrefixedCodec) // Simplified - .serialization_format(SerializationFormat::Bincode) // Specify serializer - .route(MyMessageType::Echo, handle_echo) // Route based on ID - // OR if type-based routing is supported and EchoRequest has an ID: - //.service(handle_echo_typed) where handle_echo_typed takes Message - }) - .bind("127.0.0.1:8000")? - .run() - .await - } + // Define a message ID enum if not using type-based routing directly - ```rust + # - This example, even in outline, demonstrates how derive macros for messages, - a separable framing component, and a clear handler signature with - extractors (`Message`) and a return type - (`WireframeResult`) simplify server implementation. + enum MyMessageType { Echo = 1 } - ```` + // Handler function async fn handle_echo(req: Message) -> + WireframeResult { println!("Received echo request with payload: + {}", req.payload); Ok(EchoResponse { original_payload: req.payload.clone(), + echoed_at: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(), }) + } + + #[tokio::main] async fn main() -> std::io::Result\<()> { println!("Starting + echo server on 127.0.0.1:8000"); + + ``` + WireframeServer::new(|| { + WireframeApp::new() + //.frame_processor(LengthPrefixedCodec) // Simplified + .serializer(BincodeSerializer::default()) // Specify serializer + .route(MyMessageType::Echo, handle_echo) // Route based on ID + // OR if type-based routing is supported and EchoRequest has an ID: + //.service(handle_echo_typed) where handle_echo_typed takes Message + }) + .bind("127.0.0.1:8000")? + .run() + .await + ``` + + + + } + + ```rust + + This example, even in outline, demonstrates how derive macros for messages, + a separable framing component, and a clear handler signature with + extractors (`Message`) and a return type + (`WireframeResult`) simplify server implementation. + + ``` - **Example 2: Basic Chat Message Protocol** @@ -1220,62 +1232,53 @@ examples are invaluable. They make the abstract design tangible and showcase how Rust - ````rustrust + ```rustrust // Crate: main.rs - use wireframe::{WireframeApp, WireframeServer, Message, ConnectionInfo, error::Result as WireframeResult, config::SerializationFormat}; - use my_chat_messages::{ClientMessage, ServerMessage}; - //... use ChatRoomState, SharedChatRoomState... - use std::sync::Arc; - - # - enum ChatMessageType { ClientJoin = 10, ClientPost = 11 } - - - async fn handle_join( - msg: Message, // Assume it's ClientMessage::Join - conn_info: ConnectionInfo, - state: SharedChatRoomState, - ) -> WireframeResult> { // Optional direct response - if let ClientMessage::Join { user_name } = msg.into_inner() { - let mut room = state.lock().await; - //... logic to add user, check for name conflicts... - // room.add_user(conn_info.id(), user_name.clone()); - // Broadcast ServerMessage::UserJoined to other users (not shown) - println!("User '{}' joined from {}", user_name, conn_info.peer_addr()); - return Ok(None); // No direct response, or maybe an Ack - } - Ok(Some(ServerMessage::JoinError { reason: "Invalid Join message".to_string() })) - } - - async fn handle_post( - msg: Message, // Assume it's ClientMessage::Post - conn_info: ConnectionInfo, - state: SharedChatRoomState, - ) { // No direct response needed - if let ClientMessage::Post { content } = msg.into_inner() { - let room = state.lock().await; - // let user_name = room.get_user_name(conn_info.id()).unwrap_or_default(); - // Broadcast ServerMessage::NewMessage to other users (not shown) - // println!("User '{}' posted: {}", user_name, content); - } - } + ``` - #[tokio::main] - async fn main() -> std::io::Result<()> { - let chat_state = Arc::new(Mutex::new(ChatRoomState { users: HashMap::new() })); - WireframeServer::new(move | + use wireframe::{WireframeApp, WireframeServer, Message, ConnectionInfo, + error::Result as WireframeResult, serializer::BincodeSerializer}; use + my_chat_messages::{ClientMessage, ServerMessage}; //... use ChatRoomState, + SharedChatRoomState... use std::sync::Arc; + + # + + enum ChatMessageType { ClientJoin = 10, ClientPost = 11 } + + async fn handle_join( msg: Message, // Assume it's + ClientMessage::Join conn_info: ConnectionInfo, state: SharedChatRoomState, ) + -> WireframeResult\> { // Optional direct response if + let ClientMessage::Join { user_name } = msg.into_inner() { let mut room = + state.lock().await; //... logic to add user, check for name conflicts... // + room.add_user(conn_info.id(), user_name.clone()); // Broadcast + ServerMessage::UserJoined to other users (not shown) println!("User '{}' + joined from {}", user_name, conn_info.peer_addr()); return Ok(None); // No + direct response, or maybe an Ack } Ok(Some(ServerMessage::JoinError { reason: + "Invalid Join message".to_string() })) } + + async fn handle_post( msg: Message, // Assume it's + ClientMessage::Post conn_info: ConnectionInfo, state: SharedChatRoomState, ) { + // No direct response needed if let ClientMessage::Post { content } = + msg.into_inner() { let room = state.lock().await; // let user_name = + room.get_user_name(conn_info.id()).unwrap_or_default(); // Broadcast + ServerMessage::NewMessage to other users (not shown) // println!("User '{}' + posted: {}", user_name, content); } } + + #[tokio::main] async fn main() -> std::io::Result\<()> { let chat_state = + Arc::new(Mutex::new(ChatRoomState { users: HashMap::new() })); + WireframeServer::new(move | - ```rust + ```rust - ```` + ``` -| { +| { | WireframeApp::new() //.frame_processor(...) -.serialization_format(SerializationFormat::Bincode) +.serializer(BincodeSerializer::default()) .app_data(SharedChatRoomState::new(chat_state.clone())) @@ -1293,7 +1296,9 @@ WireframeApp::new() } -\`\`\` +````` + + This chat example hints at how shared state (SharedChatRoomState) and connection information (ConnectionInfo) would be used, and how handlers might not always diff --git a/src/app.rs b/src/app.rs index 85a5e38c..75ae240f 100644 --- a/src/app.rs +++ b/src/app.rs @@ -10,9 +10,9 @@ use bytes::BytesMut; use tokio::io::{self, AsyncWrite, AsyncWriteExt}; use crate::{ - SerializationFormat, frame::{FrameProcessor, LengthPrefixedProcessor}, message::Message, + serializer::{BincodeSerializer, Serializer}, }; type BoxedFrameProcessor = @@ -23,12 +23,12 @@ type BoxedFrameProcessor = /// The builder stores registered routes, services, and middleware /// without enforcing an ordering. Methods return [`Result`] so /// registrations can be chained ergonomically. -pub struct WireframeApp { +pub struct WireframeApp { routes: HashMap, services: Vec, middleware: Vec>, frame_processor: BoxedFrameProcessor, - serializer: SerializationFormat, + serializer: S, } /// Alias for boxed asynchronous handlers. @@ -51,7 +51,7 @@ pub enum WireframeError { #[derive(Debug)] pub enum SendError { /// Serialization failed. - Serialize(bincode::error::EncodeError), + Serialize(Box), /// Writing to the stream failed. Io(io::Error), } @@ -68,7 +68,7 @@ impl std::fmt::Display for SendError { impl std::error::Error for SendError { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { match self { - SendError::Serialize(e) => Some(e), + SendError::Serialize(e) => Some(&**e), SendError::Io(e) => Some(e), } } @@ -78,26 +78,25 @@ impl From for SendError { fn from(e: io::Error) -> Self { SendError::Io(e) } } -impl From for SendError { - fn from(e: bincode::error::EncodeError) -> Self { SendError::Serialize(e) } -} - /// Result type used throughout the builder API. pub type Result = std::result::Result; -impl Default for WireframeApp { +impl Default for WireframeApp +where + S: Serializer + Default, +{ fn default() -> Self { Self { routes: HashMap::new(), services: Vec::new(), middleware: Vec::new(), frame_processor: Box::new(LengthPrefixedProcessor), - serializer: SerializationFormat::DEFAULT, + serializer: S::default(), } } } -impl WireframeApp { +impl WireframeApp { /// Construct a new empty application builder. /// /// # Errors @@ -105,7 +104,19 @@ impl WireframeApp { /// This function currently never returns an error but uses the /// [`Result`] type for forward compatibility. pub fn new() -> Result { Ok(Self::default()) } +} +impl WireframeApp +where + S: Serializer, +{ + /// Construct a new empty application builder. + /// + /// # Errors + /// + /// This function currently never returns an error but uses the + /// [`Result`] type for forward compatibility. + /// /// Register a route that maps `id` to `handler`. /// /// # Errors @@ -154,11 +165,19 @@ impl WireframeApp { self } - /// Choose the serialization format for messages. + /// Replace the serializer used for messages. #[must_use] - pub fn serialization_format(mut self, format: SerializationFormat) -> Self { - self.serializer = format; - self + pub fn serializer(self, serializer: Ser) -> WireframeApp + where + Ser: Serializer, + { + WireframeApp { + routes: self.routes, + services: self.services, + middleware: self.middleware, + frame_processor: self.frame_processor, + serializer, + } } /// Serialize `msg` and write it to `stream` using the frame processor. @@ -166,13 +185,13 @@ impl WireframeApp { /// # Errors /// /// Returns a [`SendError`] if serialization or writing fails. - pub async fn send_response( + pub async fn send_response( &self, - stream: &mut S, + stream: &mut W, msg: &M, ) -> std::result::Result<(), SendError> where - S: AsyncWrite + Unpin, + W: AsyncWrite + Unpin, M: Message, { let bytes = self @@ -192,9 +211,9 @@ impl WireframeApp { /// This placeholder immediately closes the connection after the /// preamble phase. A warning is logged so tests and callers are /// aware of the current limitation. - pub async fn handle_connection(&self, _stream: S) + pub async fn handle_connection(&self, _stream: W) where - S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static, + W: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static, { log::warn!( "`WireframeApp::handle_connection` called, but connection handling is not \ diff --git a/src/config.rs b/src/config.rs deleted file mode 100644 index 93e05680..00000000 --- a/src/config.rs +++ /dev/null @@ -1,47 +0,0 @@ -//! Application configuration types. -//! -//! This module defines enums and helpers for selecting -//! serialization formats used by `wireframe` when encoding -//! and decoding messages. -use bincode::error::{DecodeError, EncodeError}; - -use crate::message::Message; - -/// Serialization formats supported by `wireframe`. -#[non_exhaustive] -#[derive(Clone, Copy, Debug)] -pub enum SerializationFormat { - /// Use `bincode` with its standard configuration. - Bincode, -} - -impl SerializationFormat { - /// The library default (currently [`Bincode`]). - pub const DEFAULT: SerializationFormat = SerializationFormat::Bincode; - - /// Serialize a message into a byte vector. - /// - /// # Errors - /// - /// Returns an [`EncodeError`] if serialization fails. - pub fn serialize(self, value: &M) -> Result, EncodeError> { - match self { - SerializationFormat::Bincode => value.to_bytes(), - } - } - - /// Deserialize a message from a byte slice. - /// - /// # Errors - /// - /// Returns a [`DecodeError`] if deserialization fails. - pub fn deserialize(self, bytes: &[u8]) -> Result<(M, usize), DecodeError> { - match self { - SerializationFormat::Bincode => M::from_bytes(bytes), - } - } -} - -impl Default for SerializationFormat { - fn default() -> Self { SerializationFormat::DEFAULT } -} diff --git a/src/lib.rs b/src/lib.rs index 09aa93a4..a7d5d78d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,8 +5,8 @@ //! servers, including routing, middleware, and connection utilities. pub mod app; -pub mod config; -pub use config::SerializationFormat; +pub mod serializer; +pub use serializer::{BincodeSerializer, Serializer}; pub mod extractor; pub mod frame; pub mod message; diff --git a/src/serializer.rs b/src/serializer.rs new file mode 100644 index 00000000..ac22a1e1 --- /dev/null +++ b/src/serializer.rs @@ -0,0 +1,48 @@ +//! Message serialization traits. +//! +//! This module defines the [`Serializer`] trait enabling applications to plug in +//! custom encoding formats. A basic [`BincodeSerializer`] implementation is +//! provided as the default. + +use std::error::Error; + +use crate::message::Message; + +/// Trait for serializing and deserializing messages. +pub trait Serializer { + /// Serialize `value` into a byte vector. + /// + /// # Errors + /// + /// Returns an error if the value cannot be serialized. + fn serialize(&self, value: &M) -> Result, Box>; + + /// Deserialize a message from `bytes`, returning the message and bytes consumed. + /// + /// # Errors + /// + /// Returns an error if the bytes cannot be parsed into a message. + fn deserialize( + &self, + bytes: &[u8], + ) -> Result<(M, usize), Box>; +} + +/// Serializer using `bincode` with its standard configuration. +#[derive(Clone, Copy, Debug, Default)] +pub struct BincodeSerializer; + +impl Serializer for BincodeSerializer { + fn serialize(&self, value: &M) -> Result, Box> { + value + .to_bytes() + .map_err(|e| Box::new(e) as Box) + } + + fn deserialize( + &self, + bytes: &[u8], + ) -> Result<(M, usize), Box> { + M::from_bytes(bytes).map_err(|e| Box::new(e) as Box) + } +} diff --git a/tests/response.rs b/tests/response.rs index 813fba1d..cd404a73 100644 --- a/tests/response.rs +++ b/tests/response.rs @@ -1,9 +1,9 @@ use bytes::BytesMut; use wireframe::{ app::WireframeApp, - config::SerializationFormat, frame::{FrameProcessor, LengthPrefixedProcessor}, message::Message, + serializer::BincodeSerializer, }; #[derive(bincode::Encode, bincode::BorrowDecode, PartialEq, Debug)] @@ -34,7 +34,7 @@ async fn send_response_encodes_and_frames() { let app = WireframeApp::new() .unwrap() .frame_processor(LengthPrefixedProcessor) - .serialization_format(SerializationFormat::Bincode); + .serializer(BincodeSerializer::default()); let mut out = Vec::new(); app.send_response(&mut out, &TestResp(7)).await.unwrap(); From 48807252d24826494a5ba1c772dc9fa8580704f9 Mon Sep 17 00:00:00 2001 From: Leynos Date: Wed, 18 Jun 2025 00:19:37 +0100 Subject: [PATCH 6/7] Remove default() calls --- README.md | 2 +- docs/rust-binary-router-library-design.md | 8 ++++---- tests/response.rs | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 061691b6..89e4503e 100644 --- a/README.md +++ b/README.md @@ -67,7 +67,7 @@ async fn handle_echo(req: Message) -> WireframeResult WireframeServer::new(|| { WireframeApp::new() - .serializer(BincodeSerializer::default()) + .serializer(BincodeSerializer) .route(MyMessageType::Echo, handle_echo) }) .bind("127.0.0.1:8000")? diff --git a/docs/rust-binary-router-library-design.md b/docs/rust-binary-router-library-design.md index 2285e800..bfd89f5c 100644 --- a/docs/rust-binary-router-library-design.md +++ b/docs/rust-binary-router-library-design.md @@ -1155,9 +1155,9 @@ examples are invaluable. They make the abstract design tangible and showcase how ``` WireframeServer::new(|| { - WireframeApp::new() - //.frame_processor(LengthPrefixedCodec) // Simplified - .serializer(BincodeSerializer::default()) // Specify serializer + WireframeApp::new() + //.frame_processor(LengthPrefixedCodec) // Simplified + .serializer(BincodeSerializer) // Specify serializer .route(MyMessageType::Echo, handle_echo) // Route based on ID // OR if type-based routing is supported and EchoRequest has an ID: //.service(handle_echo_typed) where handle_echo_typed takes Message @@ -1278,7 +1278,7 @@ WireframeApp::new() //.frame_processor(...) -.serializer(BincodeSerializer::default()) +.serializer(BincodeSerializer) .app_data(SharedChatRoomState::new(chat_state.clone())) diff --git a/tests/response.rs b/tests/response.rs index cd404a73..f02163fa 100644 --- a/tests/response.rs +++ b/tests/response.rs @@ -34,7 +34,7 @@ async fn send_response_encodes_and_frames() { let app = WireframeApp::new() .unwrap() .frame_processor(LengthPrefixedProcessor) - .serializer(BincodeSerializer::default()); + .serializer(BincodeSerializer); let mut out = Vec::new(); app.send_response(&mut out, &TestResp(7)).await.unwrap(); From 2efe165df1dc6eb562942468ed164ace35c521b4 Mon Sep 17 00:00:00 2001 From: Leynos Date: Wed, 18 Jun 2025 00:21:59 +0100 Subject: [PATCH 7/7] Add doc comment for response tests (#70) --- tests/response.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/response.rs b/tests/response.rs index f02163fa..277820cf 100644 --- a/tests/response.rs +++ b/tests/response.rs @@ -1,3 +1,8 @@ +//! Tests covering response serialization and framing logic. +//! +//! These verify normal encoding as well as error conditions like +//! write failures and encode errors. + use bytes::BytesMut; use wireframe::{ app::WireframeApp,