From 36b86c1c3e917223d0bdb1caa2741d0ad4c109f1 Mon Sep 17 00:00:00 2001 From: Leynos Date: Fri, 8 Aug 2025 02:32:15 +0100 Subject: [PATCH 01/13] Introduce PacketParts and optional correlation IDs --- README.md | 14 +++-- examples/metadata_routing.rs | 2 +- examples/ping_pong.rs | 17 +++--- src/app.rs | 106 ++++++++++++++++++++--------------- src/middleware.rs | 29 +++++++--- tests/correlation_id.rs | 6 +- tests/lifecycle.rs | 26 +++++---- tests/metadata.rs | 4 +- tests/middleware.rs | 18 +++++- tests/middleware_order.rs | 6 +- tests/routes.rs | 34 ++++++----- tests/world.rs | 10 +++- 12 files changed, 169 insertions(+), 103 deletions(-) diff --git a/README.md b/README.md index e277bcfa..5a6dc63b 100644 --- a/README.md +++ b/README.md @@ -95,17 +95,19 @@ payload bytes. Applications can supply their own envelope type by calling `Packet` trait: ```rust -use wireframe::app::{Packet, WireframeApp}; +use wireframe::app::{Packet, PacketParts, WireframeApp}; #[derive(bincode::Encode, bincode::BorrowDecode)] -struct MyEnv { id: u32, correlation_id: u64, data: Vec } +struct MyEnv { id: u32, correlation_id: Option, data: Vec } impl Packet for MyEnv { fn id(&self) -> u32 { self.id } - fn correlation_id(&self) -> u64 { self.correlation_id } - fn into_parts(self) -> (u32, u64, Vec) { (self.id, self.correlation_id, self.data) } - fn from_parts(id: u32, correlation_id: u64, data: Vec) -> Self { - Self { id, correlation_id, data } + fn correlation_id(&self) -> Option { self.correlation_id } + fn into_parts(self) -> PacketParts { + PacketParts { id: self.id, correlation_id: self.correlation_id, msg: self.data } + } + fn from_parts(parts: PacketParts) -> Self { + Self { id: parts.id, correlation_id: parts.correlation_id, data: parts.msg } } } diff --git a/examples/metadata_routing.rs b/examples/metadata_routing.rs index b2f77eca..0936e803 100644 --- a/examples/metadata_routing.rs +++ b/examples/metadata_routing.rs @@ -52,7 +52,7 @@ impl FrameMetadata for HeaderSerializer { // `parse` receives the complete frame because `LengthPrefixedProcessor` // ensures `src` contains exactly one message. Returning `src.len()` is // therefore correct for this demo. - Ok((Envelope::new(id, 0, payload), src.len())) + Ok((Envelope::new(id, Some(0), payload), src.len())) } } diff --git a/examples/ping_pong.rs b/examples/ping_pong.rs index ed39b217..61763d94 100644 --- a/examples/ping_pong.rs +++ b/examples/ping_pong.rs @@ -57,13 +57,15 @@ where type Error = std::convert::Infallible; async fn call(&self, req: ServiceRequest) -> Result { + let cid = req.correlation_id(); let (ping_req, _) = match Ping::from_bytes(req.frame()) { Ok(val) => val, Err(e) => { eprintln!("failed to decode ping: {e:?}"); - return Ok(ServiceResponse::new(encode_error(format!( - "decode error: {e:?}" - )))); + return Ok(ServiceResponse::new( + encode_error(format!("decode error: {e:?}")), + cid, + )); } }; let mut response = self.inner.call(req).await?; @@ -71,15 +73,16 @@ where Pong(v) } else { eprintln!("ping overflowed at {}", ping_req.0); - return Ok(ServiceResponse::new(encode_error("overflow"))); + return Ok(ServiceResponse::new(encode_error("overflow"), cid)); }; match pong_resp.to_bytes() { Ok(bytes) => *response.frame_mut() = bytes, Err(e) => { eprintln!("failed to encode pong: {e:?}"); - return Ok(ServiceResponse::new(encode_error(format!( - "encode error: {e:?}" - )))); + return Ok(ServiceResponse::new( + encode_error(format!("encode error: {e:?}")), + cid, + )); } } Ok(response) diff --git a/src/app.rs b/src/app.rs index 7d23028c..c2ffb7d2 100644 --- a/src/app.rs +++ b/src/app.rs @@ -163,15 +163,20 @@ impl From for SendError { /// impl Packet for CustomEnvelope { /// fn id(&self) -> u32 { self.id } /// -/// fn correlation_id(&self) -> u64 { 0 } +/// fn correlation_id(&self) -> Option { None } /// -/// fn into_parts(self) -> (u32, u64, Vec) { (self.id, 0, self.payload) } +/// fn into_parts(self) -> PacketParts { +/// PacketParts { +/// id: self.id, +/// correlation_id: None, +/// msg: self.payload, +/// } +/// } /// -/// fn from_parts(id: u32, correlation_id: u64, msg: Vec) -> Self { -/// let _ = correlation_id; +/// fn from_parts(parts: PacketParts) -> Self { /// Self { -/// id, -/// payload: msg, +/// id: parts.id, +/// payload: parts.msg, /// timestamp: 0, /// } /// } @@ -182,58 +187,65 @@ pub trait Packet: Message + Send + Sync + 'static { fn id(&self) -> u32; /// Return the correlation identifier tying this frame to a request. - fn correlation_id(&self) -> u64; + fn correlation_id(&self) -> Option; /// Consume the packet and return its identifier, correlation id and payload bytes. - fn into_parts(self) -> (u32, u64, Vec); + fn into_parts(self) -> PacketParts; - /// Construct a new packet from id, correlation id and raw payload bytes. - fn from_parts(id: u32, correlation_id: u64, msg: Vec) -> Self; + /// Construct a new packet from raw parts. + fn from_parts(parts: PacketParts) -> Self; +} + +/// Component values extracted from or used to build a [`Packet`]. +#[derive(Debug)] +pub struct PacketParts { + pub id: u32, + pub correlation_id: Option, + pub msg: Vec, } /// Basic envelope type used by [`handle_connection`]. /// -/// Incoming frames are deserialized into an `Envelope` containing the +/// Incoming frames are deserialised into an `Envelope` containing the /// message identifier and raw payload bytes. -#[derive(bincode::Decode, bincode::Encode, Copy, Clone, Debug)] -pub struct PacketHeader { - pub(crate) id: u32, - pub(crate) correlation_id: u64, -} - #[derive(bincode::Decode, bincode::Encode, Debug)] pub struct Envelope { - pub(crate) header: PacketHeader, + pub(crate) id: u32, + pub(crate) correlation_id: Option, pub(crate) msg: Vec, } impl Envelope { /// Create a new [`Envelope`] with the provided identifiers and payload. #[must_use] - pub fn new(id: u32, correlation_id: u64, msg: Vec) -> Self { + pub fn new(id: u32, correlation_id: Option, msg: Vec) -> Self { Self { - header: PacketHeader { id, correlation_id }, + id, + correlation_id, msg, } } - /// Consume the envelope, returning its header and payload bytes. + /// Consume the envelope, returning its parts. #[must_use] - pub fn into_parts(self) -> (PacketHeader, Vec) { (self.header, self.msg) } + pub fn into_parts(self) -> PacketParts { + PacketParts { + id: self.id, + correlation_id: self.correlation_id, + msg: self.msg, + } + } } impl Packet for Envelope { - fn id(&self) -> u32 { self.header.id } + fn id(&self) -> u32 { self.id } - fn correlation_id(&self) -> u64 { self.header.correlation_id } + fn correlation_id(&self) -> Option { self.correlation_id } - fn into_parts(self) -> (u32, u64, Vec) { - let (header, msg) = Envelope::into_parts(self); - (header.id, header.correlation_id, msg) - } + fn into_parts(self) -> PacketParts { Envelope::into_parts(self) } - fn from_parts(id: u32, correlation_id: u64, msg: Vec) -> Self { - Envelope::new(id, correlation_id, msg) + fn from_parts(parts: PacketParts) -> Self { + Envelope::new(parts.id, parts.correlation_id, parts.msg) } } @@ -290,19 +302,25 @@ where /// #[derive(bincode::Encode, bincode::BorrowDecode)] /// struct MyEnv { /// id: u32, - /// correlation_id: u64, + /// correlation_id: Option, /// data: Vec, /// } /// /// impl Packet for MyEnv { /// fn id(&self) -> u32 { self.id } - /// fn correlation_id(&self) -> u64 { self.correlation_id } - /// fn into_parts(self) -> (u32, u64, Vec) { (self.id, self.correlation_id, self.data) } - /// fn from_parts(id: u32, correlation_id: u64, data: Vec) -> Self { + /// fn correlation_id(&self) -> Option { self.correlation_id } + /// fn into_parts(self) -> PacketParts { + /// PacketParts { + /// id: self.id, + /// correlation_id: self.correlation_id, + /// msg: self.data, + /// } + /// } + /// fn from_parts(parts: PacketParts) -> Self { /// Self { - /// id, - /// correlation_id, - /// data, + /// id: parts.id, + /// correlation_id: parts.correlation_id, + /// data: parts.msg, /// } /// } /// } @@ -725,24 +743,24 @@ where } }; - if let Some(service) = routes.get(&env.header.id) { - let request = ServiceRequest::new(env.msg, env.header.correlation_id); + if let Some(service) = routes.get(&env.id) { + let request = ServiceRequest::new(env.msg, env.correlation_id); match service.call(request).await { Ok(resp) => { - let response = - Envelope::new(env.header.id, env.header.correlation_id, resp.into_inner()); + let correlation_id = resp.correlation_id().or(env.correlation_id); + let response = Envelope::new(env.id, correlation_id, resp.into_inner()); if let Err(e) = self.send_response(stream, &response).await { - tracing::warn!(error = %e, "failed to send response"); + tracing::warn!(id = env.id, correlation_id = ?correlation_id, error = %e, "failed to send response"); crate::metrics::inc_handler_errors(); } } Err(e) => { - tracing::warn!(id = env.header.id, error = ?e, "handler error"); + tracing::warn!(id = env.id, correlation_id = ?env.correlation_id, error = ?e, "handler error"); crate::metrics::inc_handler_errors(); } } } else { - tracing::warn!("no handler for message id {}", env.header.id); + tracing::warn!(id = env.id, correlation_id = ?env.correlation_id, "no handler for message id"); crate::metrics::inc_handler_errors(); } diff --git a/src/middleware.rs b/src/middleware.rs index de33f7ef..bf86e52a 100644 --- a/src/middleware.rs +++ b/src/middleware.rs @@ -8,6 +8,8 @@ use std::{convert::Infallible, sync::Arc}; use async_trait::async_trait; +use crate::app::PacketParts; + /// Generic container for request and response frame data. #[derive(Debug, Default)] pub struct FrameContainer { @@ -36,13 +38,13 @@ impl FrameContainer { #[derive(Debug)] pub struct ServiceRequest { inner: FrameContainer>, - correlation_id: u64, + correlation_id: Option, } impl ServiceRequest { /// Create a new [`ServiceRequest`] from raw frame bytes. #[must_use] - pub fn new(frame: Vec, correlation_id: u64) -> Self { + pub fn new(frame: Vec, correlation_id: Option) -> Self { Self { inner: FrameContainer::new(frame), correlation_id, @@ -53,9 +55,9 @@ impl ServiceRequest { #[must_use] pub fn frame(&self) -> &[u8] { self.inner.frame().as_slice() } - /// Return the correlation identifier associated with this request. + /// Return the correlation identifier associated with this request, if any. #[must_use] - pub fn correlation_id(&self) -> u64 { self.correlation_id } + pub fn correlation_id(&self) -> Option { self.correlation_id } /// Mutable access to the inner frame bytes. #[must_use] @@ -70,14 +72,16 @@ impl ServiceRequest { #[derive(Debug, Default)] pub struct ServiceResponse { inner: FrameContainer>, + correlation_id: Option, } impl ServiceResponse { /// Create a new [`ServiceResponse`] containing the given frame bytes. #[must_use] - pub fn new(frame: Vec) -> Self { + pub fn new(frame: Vec, correlation_id: Option) -> Self { Self { inner: FrameContainer::new(frame), + correlation_id, } } @@ -89,6 +93,10 @@ impl ServiceResponse { #[must_use] pub fn frame_mut(&mut self) -> &mut Vec { self.inner.frame_mut() } + /// Return the correlation identifier associated with this response, if any. + #[must_use] + pub fn correlation_id(&self) -> Option { self.correlation_id } + /// Consume the response, yielding the raw frame bytes. #[must_use] pub fn into_inner(self) -> Vec { self.inner.into_inner() } @@ -303,10 +311,15 @@ impl Service for RouteService { async fn call(&self, req: ServiceRequest) -> Result { // The handler only borrows the envelope, allowing us to consume it // afterwards to extract the response payload. - let env = E::from_parts(self.id, req.correlation_id(), req.into_inner()); + let parts = PacketParts { + id: self.id, + correlation_id: req.correlation_id(), + msg: req.into_inner(), + }; + let env = E::from_parts(parts); (self.handler.as_ref())(&env).await; - let (_, _, bytes) = env.into_parts(); - Ok(ServiceResponse::new(bytes)) + let parts = env.into_parts(); + Ok(ServiceResponse::new(parts.msg, parts.correlation_id)) } } diff --git a/tests/correlation_id.rs b/tests/correlation_id.rs index 25316648..e515473b 100644 --- a/tests/correlation_id.rs +++ b/tests/correlation_id.rs @@ -12,13 +12,13 @@ use wireframe::{ async fn stream_frames_carry_request_correlation_id() { let cid = 42u64; let stream: FrameStream = Box::pin(try_stream! { - yield Envelope::new(1, cid, vec![1]); - yield Envelope::new(1, cid, vec![2]); + yield Envelope::new(1, Some(cid), vec![1]); + yield Envelope::new(1, Some(cid), vec![2]); }); let (queues, handle) = PushQueues::bounded(1, 1); let shutdown = CancellationToken::new(); let mut actor = ConnectionActor::new(queues, handle, Some(stream), shutdown); let mut out = Vec::new(); actor.run(&mut out).await.expect("actor run failed"); - assert!(out.iter().all(|e| e.correlation_id() == cid)); + assert!(out.iter().all(|e| e.correlation_id() == Some(cid))); } diff --git a/tests/lifecycle.rs b/tests/lifecycle.rs index c7dc00ba..09ab9144 100644 --- a/tests/lifecycle.rs +++ b/tests/lifecycle.rs @@ -13,7 +13,7 @@ use std::{ use bytes::BytesMut; use wireframe::{ - app::{Envelope, Packet, WireframeApp}, + app::{Envelope, Packet, PacketParts, WireframeApp}, frame::{FrameProcessor, LengthPrefixedProcessor}, serializer::{BincodeSerializer, Serializer}, }; @@ -102,22 +102,28 @@ async fn teardown_without_setup_does_not_run() { #[derive(bincode::Encode, bincode::BorrowDecode, PartialEq, Debug)] struct StateEnvelope { id: u32, - correlation_id: u64, + correlation_id: Option, msg: Vec, } -impl wireframe::app::Packet for StateEnvelope { +impl Packet for StateEnvelope { fn id(&self) -> u32 { self.id } - fn correlation_id(&self) -> u64 { self.correlation_id } + fn correlation_id(&self) -> Option { self.correlation_id } - fn into_parts(self) -> (u32, u64, Vec) { (self.id, self.correlation_id, self.msg) } + fn into_parts(self) -> PacketParts { + PacketParts { + id: self.id, + correlation_id: self.correlation_id, + msg: self.msg, + } + } - fn from_parts(id: u32, correlation_id: u64, msg: Vec) -> Self { + fn from_parts(parts: PacketParts) -> Self { Self { - id, - correlation_id, - msg, + id: parts.id, + correlation_id: parts.correlation_id, + msg: parts.msg, } } } @@ -134,7 +140,7 @@ async fn helpers_propagate_connection_state() { let env = StateEnvelope { id: 1, - correlation_id: 0, + correlation_id: Some(0), msg: vec![1], }; let bytes = BincodeSerializer diff --git a/tests/metadata.rs b/tests/metadata.rs index eaa8dc8b..45efcecb 100644 --- a/tests/metadata.rs +++ b/tests/metadata.rs @@ -60,7 +60,7 @@ async fn metadata_parser_invoked_before_deserialize() { let serializer = CountingSerializer(counter.clone()); let app = mock_wireframe_app_with_serializer(serializer); - let env = Envelope::new(1, 0, vec![42]); + let env = Envelope::new(1, Some(0), vec![42]); let out = drive_with_bincode(app, env) .await @@ -105,7 +105,7 @@ async fn falls_back_to_deserialize_after_parse_error() { let serializer = FallbackSerializer(parse_calls.clone(), deser_calls.clone()); let app = mock_wireframe_app_with_serializer(serializer); - let env = Envelope::new(1, 0, vec![7]); + let env = Envelope::new(1, Some(0), vec![7]); let out = drive_with_bincode(app, env) .await diff --git a/tests/middleware.rs b/tests/middleware.rs index 86e0653f..fb1c508b 100644 --- a/tests/middleware.rs +++ b/tests/middleware.rs @@ -12,7 +12,8 @@ impl Service for EchoService { type Error = std::convert::Infallible; async fn call(&self, req: ServiceRequest) -> Result { - Ok(ServiceResponse::new(req.into_inner())) + let cid = req.correlation_id(); + Ok(ServiceResponse::new(req.into_inner(), cid)) } } @@ -54,7 +55,20 @@ async fn middleware_modifies_request_and_response() { let mw = ModifyMiddleware; let wrapped = mw.transform(service).await; - let request = ServiceRequest::new(vec![1, 2, 3], 0); + let request = ServiceRequest::new(vec![1, 2, 3], Some(0)); let response = wrapped.call(request).await.expect("middleware call failed"); assert_eq!(response.frame(), &[1, 2, 3, b'!', b'?']); } + +#[tokio::test] +async fn test_modify_middleware_preserves_nonzero_correlation_id() { + let service = EchoService; + let mw = ModifyMiddleware; + let wrapped = mw.transform(service).await; + + let correlation_id = Some(42); + let request = ServiceRequest::new(vec![4, 5, 6], correlation_id); + let response = wrapped.call(request).await.expect("middleware call failed"); + assert_eq!(response.frame(), &[4, 5, 6, b'!', b'?']); + assert_eq!(response.correlation_id(), correlation_id); +} diff --git a/tests/middleware_order.rs b/tests/middleware_order.rs index df33daec..3fd0fed3 100644 --- a/tests/middleware_order.rs +++ b/tests/middleware_order.rs @@ -64,7 +64,7 @@ async fn middleware_applied_in_reverse_order() { let (mut client, server) = duplex(256); - let env = Envelope::new(1, 7, vec![b'X']); + let env = Envelope::new(1, Some(7), vec![b'X']); let serializer = BincodeSerializer; let bytes = serializer.serialize(&env).expect("serialization failed"); // Use the default 4-byte big-endian length prefix for framing @@ -88,6 +88,6 @@ async fn middleware_applied_in_reverse_order() { let (resp, _) = serializer .deserialize::(&frame) .expect("deserialize failed"); - let (_, bytes) = resp.into_parts(); - assert_eq!(bytes, vec![b'X', b'A', b'B', b'B', b'A']); + let parts = resp.into_parts(); + assert_eq!(parts.msg, vec![b'X', b'A', b'B', b'B', b'A']); } diff --git a/tests/routes.rs b/tests/routes.rs index 0d4957dd..af0f04ac 100644 --- a/tests/routes.rs +++ b/tests/routes.rs @@ -11,7 +11,7 @@ use bytes::BytesMut; use rstest::rstest; use wireframe::{ Serializer, - app::WireframeApp, + app::{Packet, PacketParts, WireframeApp}, frame::{FrameProcessor, LengthPrefixedProcessor}, message::Message, serializer::BincodeSerializer, @@ -21,22 +21,28 @@ use wireframe_testing::{drive_with_bincode, drive_with_frames}; #[derive(bincode::Encode, bincode::BorrowDecode, PartialEq, Debug)] struct TestEnvelope { id: u32, - correlation_id: u64, + correlation_id: Option, msg: Vec, } -impl wireframe::app::Packet for TestEnvelope { +impl Packet for TestEnvelope { fn id(&self) -> u32 { self.id } - fn correlation_id(&self) -> u64 { self.correlation_id } + fn correlation_id(&self) -> Option { self.correlation_id } - fn into_parts(self) -> (u32, u64, Vec) { (self.id, self.correlation_id, self.msg) } + fn into_parts(self) -> PacketParts { + PacketParts { + id: self.id, + correlation_id: self.correlation_id, + msg: self.msg, + } + } - fn from_parts(id: u32, correlation_id: u64, msg: Vec) -> Self { + fn from_parts(parts: PacketParts) -> Self { Self { - id, - correlation_id, - msg, + id: parts.id, + correlation_id: parts.correlation_id, + msg: parts.msg, } } } @@ -66,7 +72,7 @@ async fn handler_receives_message_and_echoes_response() { let msg_bytes = Echo(42).to_bytes().expect("encode failed"); let env = TestEnvelope { id: 1, - correlation_id: 99, + correlation_id: Some(99), msg: msg_bytes, }; @@ -82,7 +88,7 @@ async fn handler_receives_message_and_echoes_response() { let (resp_env, _) = BincodeSerializer .deserialize::(&frame) .expect("deserialize failed"); - assert_eq!(resp_env.correlation_id, 99); + assert_eq!(resp_env.correlation_id, Some(99)); let (echo, _) = Echo::from_bytes(&resp_env.msg).expect("decode echo failed"); assert_eq!(echo, Echo(42)); assert_eq!(called.load(Ordering::SeqCst), 1); @@ -104,7 +110,7 @@ async fn multiple_frames_processed_in_sequence() { let msg_bytes = Echo(id).to_bytes().expect("encode failed"); let env = TestEnvelope { id: 1, - correlation_id: u64::from(id), + correlation_id: Some(u64::from(id)), msg: msg_bytes, }; let env_bytes = BincodeSerializer @@ -139,8 +145,8 @@ async fn multiple_frames_processed_in_sequence() { .deserialize::(&second) .expect("deserialize failed"); let (echo2, _) = Echo::from_bytes(&env2.msg).expect("decode echo failed"); - assert_eq!(env1.correlation_id, 1); - assert_eq!(env2.correlation_id, 2); + assert_eq!(env1.correlation_id, Some(1)); + assert_eq!(env2.correlation_id, Some(2)); assert_eq!(echo1, Echo(1)); assert_eq!(echo2, Echo(2)); } diff --git a/tests/world.rs b/tests/world.rs index cb32dd4a..06e11024 100644 --- a/tests/world.rs +++ b/tests/world.rs @@ -135,8 +135,8 @@ impl CorrelationWorld { pub async fn process(&mut self) { let cid = self.cid; let stream: FrameStream = Box::pin(try_stream! { - yield Envelope::new(1, cid, vec![1]); - yield Envelope::new(1, cid, vec![2]); + yield Envelope::new(1, Some(cid), vec![1]); + yield Envelope::new(1, Some(cid), vec![2]); }); let (queues, handle) = PushQueues::bounded(1, 1); let shutdown = CancellationToken::new(); @@ -150,6 +150,10 @@ impl CorrelationWorld { /// Panics if any frame has a `correlation_id` that does not match the /// expected value. pub fn verify(&self) { - assert!(self.frames.iter().all(|f| f.correlation_id() == self.cid)); + assert!( + self.frames + .iter() + .all(|f| f.correlation_id() == Some(self.cid)) + ); } } From 867338efa72e6bce7ad313d0d53ca0970b474243 Mon Sep 17 00:00:00 2001 From: Leynos Date: Fri, 8 Aug 2025 17:48:54 +0100 Subject: [PATCH 02/13] Document packet abstraction --- docs/rust-binary-router-library-design.md | 35 +++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/docs/rust-binary-router-library-design.md b/docs/rust-binary-router-library-design.md index fa9d4ab6..7e0e127f 100644 --- a/docs/rust-binary-router-library-design.md +++ b/docs/rust-binary-router-library-design.md @@ -498,6 +498,41 @@ frame processing, akin to how `tokio-util::codec` operates, endows "wireframe" with the necessary flexibility to adapt to this diversity without embedding assumptions about any single framing strategy into its core. +#### 4.3.1 Packet Abstraction + +The library defines a `Packet` trait to represent transport frames. Frames can +be decomposed into `PacketParts` for efficient handling and re-assembly. +`Envelope` is the default implementation used by `wireframe`. + +```mermaid +classDiagram + class Packet { + +id() u32 + +correlation_id() Option + +into_parts() PacketParts + +from_parts(parts: PacketParts) Self + } + class PacketParts { + +id: u32 + +correlation_id: Option + +msg: Vec + } + class Envelope { + +id: u32 + +correlation_id: Option + +msg: Vec + +new(id: u32, correlation_id: Option, msg: Vec) + +into_parts() PacketParts + } + Packet <|.. Envelope + PacketParts <.. Envelope : uses + PacketParts <.. Packet : uses +``` + +`Envelope` implements `Packet`, carrying payload and metadata through the +system. `PacketParts` avoids repetitive tuple unpacking when frames are split +into constituent pieces. + ### 4.4. Message Serialization and Deserialization The conversion of frame payloads to and from Rust types is a critical source of From 88d54ebd07eda96448b43bc52d7ec0f9089315c3 Mon Sep 17 00:00:00 2001 From: Leynos Date: Fri, 8 Aug 2025 18:30:25 +0100 Subject: [PATCH 03/13] Rename PacketParts field and expand correlation tests --- README.md | 6 +- docs/rust-binary-router-library-design.md | 6 +- examples/metadata_routing.rs | 2 +- src/app.rs | 73 +++++++++----- src/middleware.rs | 13 ++- tests/lifecycle.rs | 12 +-- tests/middleware.rs | 12 +++ tests/middleware_order.rs | 5 +- tests/routes.rs | 112 +++++++++++++++++++--- 9 files changed, 182 insertions(+), 59 deletions(-) diff --git a/README.md b/README.md index 5a6dc63b..253788f0 100644 --- a/README.md +++ b/README.md @@ -98,16 +98,16 @@ payload bytes. Applications can supply their own envelope type by calling use wireframe::app::{Packet, PacketParts, WireframeApp}; #[derive(bincode::Encode, bincode::BorrowDecode)] -struct MyEnv { id: u32, correlation_id: Option, data: Vec } +struct MyEnv { id: u32, correlation_id: Option, payload: Vec } impl Packet for MyEnv { fn id(&self) -> u32 { self.id } fn correlation_id(&self) -> Option { self.correlation_id } fn into_parts(self) -> PacketParts { - PacketParts { id: self.id, correlation_id: self.correlation_id, msg: self.data } + PacketParts { id: self.id, correlation_id: self.correlation_id, payload: self.payload } } fn from_parts(parts: PacketParts) -> Self { - Self { id: parts.id, correlation_id: parts.correlation_id, data: parts.msg } + Self { id: parts.id, correlation_id: parts.correlation_id, payload: parts.payload } } } diff --git a/docs/rust-binary-router-library-design.md b/docs/rust-binary-router-library-design.md index 7e0e127f..09126e05 100644 --- a/docs/rust-binary-router-library-design.md +++ b/docs/rust-binary-router-library-design.md @@ -515,13 +515,13 @@ classDiagram class PacketParts { +id: u32 +correlation_id: Option - +msg: Vec + +payload: Vec } class Envelope { +id: u32 +correlation_id: Option - +msg: Vec - +new(id: u32, correlation_id: Option, msg: Vec) + +payload: Vec + +new(id: u32, correlation_id: Option, payload: Vec) +into_parts() PacketParts } Packet <|.. Envelope diff --git a/examples/metadata_routing.rs b/examples/metadata_routing.rs index 0936e803..772d4550 100644 --- a/examples/metadata_routing.rs +++ b/examples/metadata_routing.rs @@ -52,7 +52,7 @@ impl FrameMetadata for HeaderSerializer { // `parse` receives the complete frame because `LengthPrefixedProcessor` // ensures `src` contains exactly one message. Returning `src.len()` is // therefore correct for this demo. - Ok((Envelope::new(id, Some(0), payload), src.len())) + Ok((Envelope::new(id, None, payload), src.len())) } } diff --git a/src/app.rs b/src/app.rs index c2ffb7d2..845d48d1 100644 --- a/src/app.rs +++ b/src/app.rs @@ -176,7 +176,7 @@ impl From for SendError { /// fn from_parts(parts: PacketParts) -> Self { /// Self { /// id: parts.id, -/// payload: parts.msg, +/// payload: parts.payload, /// timestamp: 0, /// } /// } @@ -201,7 +201,7 @@ pub trait Packet: Message + Send + Sync + 'static { pub struct PacketParts { pub id: u32, pub correlation_id: Option, - pub msg: Vec, + pub payload: Vec, } /// Basic envelope type used by [`handle_connection`]. @@ -212,41 +212,68 @@ pub struct PacketParts { pub struct Envelope { pub(crate) id: u32, pub(crate) correlation_id: Option, - pub(crate) msg: Vec, + pub(crate) payload: Vec, } impl Envelope { /// Create a new [`Envelope`] with the provided identifiers and payload. #[must_use] - pub fn new(id: u32, correlation_id: Option, msg: Vec) -> Self { + pub fn new(id: u32, correlation_id: Option, payload: Vec) -> Self { Self { id, correlation_id, - msg, - } - } - - /// Consume the envelope, returning its parts. - #[must_use] - pub fn into_parts(self) -> PacketParts { - PacketParts { - id: self.id, - correlation_id: self.correlation_id, - msg: self.msg, + payload, } } } impl Packet for Envelope { + #[inline] fn id(&self) -> u32 { self.id } + #[inline] fn correlation_id(&self) -> Option { self.correlation_id } - fn into_parts(self) -> PacketParts { Envelope::into_parts(self) } + fn into_parts(self) -> PacketParts { self.into() } + + fn from_parts(parts: PacketParts) -> Self { parts.into() } +} - fn from_parts(parts: PacketParts) -> Self { - Envelope::new(parts.id, parts.correlation_id, parts.msg) +impl PacketParts { + /// Construct a new set of packet parts. + #[must_use] + pub fn new(id: u32, correlation_id: Option, payload: Vec) -> Self { + Self { + id, + correlation_id, + payload, + } } + + /// Ensure a correlation identifier is present, inheriting from `source` if missing. + #[must_use] + pub fn inherit_correlation(mut self, source: Option) -> Self { + match (self.correlation_id, source) { + (None, cid) => self.correlation_id = cid, + (Some(cid), Some(src)) if cid != src => { + tracing::warn!( + expected = src, + found = cid, + "mismatched correlation id in response" + ); + } + _ => {} + } + self + } +} + +impl From for PacketParts { + fn from(e: Envelope) -> Self { PacketParts::new(e.id, e.correlation_id, e.payload) } +} + +impl From for Envelope { + fn from(p: PacketParts) -> Self { Envelope::new(p.id, p.correlation_id, p.payload) } } /// Number of idle polls before terminating a connection. @@ -320,7 +347,7 @@ where /// Self { /// id: parts.id, /// correlation_id: parts.correlation_id, - /// data: parts.msg, + /// data: parts.payload, /// } /// } /// } @@ -744,11 +771,13 @@ where }; if let Some(service) = routes.get(&env.id) { - let request = ServiceRequest::new(env.msg, env.correlation_id); + let request = ServiceRequest::new(env.payload, env.correlation_id); match service.call(request).await { Ok(resp) => { - let correlation_id = resp.correlation_id().or(env.correlation_id); - let response = Envelope::new(env.id, correlation_id, resp.into_inner()); + let parts = PacketParts::new(env.id, resp.correlation_id(), resp.into_inner()) + .inherit_correlation(env.correlation_id); + let correlation_id = parts.correlation_id; + let response = Envelope::from_parts(parts); if let Err(e) = self.send_response(stream, &response).await { tracing::warn!(id = env.id, correlation_id = ?correlation_id, error = %e, "failed to send response"); crate::metrics::inc_handler_errors(); diff --git a/src/middleware.rs b/src/middleware.rs index bf86e52a..20f35a7f 100644 --- a/src/middleware.rs +++ b/src/middleware.rs @@ -311,15 +311,14 @@ impl Service for RouteService { async fn call(&self, req: ServiceRequest) -> Result { // The handler only borrows the envelope, allowing us to consume it // afterwards to extract the response payload. - let parts = PacketParts { - id: self.id, - correlation_id: req.correlation_id(), - msg: req.into_inner(), - }; - let env = E::from_parts(parts); + let env = E::from_parts(PacketParts::new( + self.id, + req.correlation_id(), + req.into_inner(), + )); (self.handler.as_ref())(&env).await; let parts = env.into_parts(); - Ok(ServiceResponse::new(parts.msg, parts.correlation_id)) + Ok(ServiceResponse::new(parts.payload, parts.correlation_id)) } } diff --git a/tests/lifecycle.rs b/tests/lifecycle.rs index 09ab9144..9e0bba7f 100644 --- a/tests/lifecycle.rs +++ b/tests/lifecycle.rs @@ -103,7 +103,7 @@ async fn teardown_without_setup_does_not_run() { struct StateEnvelope { id: u32, correlation_id: Option, - msg: Vec, + payload: Vec, } impl Packet for StateEnvelope { @@ -112,18 +112,14 @@ impl Packet for StateEnvelope { fn correlation_id(&self) -> Option { self.correlation_id } fn into_parts(self) -> PacketParts { - PacketParts { - id: self.id, - correlation_id: self.correlation_id, - msg: self.msg, - } + PacketParts::new(self.id, self.correlation_id, self.payload) } fn from_parts(parts: PacketParts) -> Self { Self { id: parts.id, correlation_id: parts.correlation_id, - msg: parts.msg, + payload: parts.payload, } } } @@ -141,7 +137,7 @@ async fn helpers_propagate_connection_state() { let env = StateEnvelope { id: 1, correlation_id: Some(0), - msg: vec![1], + payload: vec![1], }; let bytes = BincodeSerializer .serialize(&env) diff --git a/tests/middleware.rs b/tests/middleware.rs index fb1c508b..baac1cb8 100644 --- a/tests/middleware.rs +++ b/tests/middleware.rs @@ -72,3 +72,15 @@ async fn test_modify_middleware_preserves_nonzero_correlation_id() { assert_eq!(response.frame(), &[4, 5, 6, b'!', b'?']); assert_eq!(response.correlation_id(), correlation_id); } + +#[tokio::test] +async fn middleware_preserves_none_correlation_id() { + let service = EchoService; + let mw = ModifyMiddleware; + let wrapped = mw.transform(service).await; + + let request = ServiceRequest::new(vec![7, 8, 9], None); + let response = wrapped.call(request).await.expect("middleware call failed"); + assert_eq!(response.frame(), &[7, 8, 9, b'!', b'?']); + assert_eq!(response.correlation_id(), None); +} diff --git a/tests/middleware_order.rs b/tests/middleware_order.rs index 3fd0fed3..d6df499e 100644 --- a/tests/middleware_order.rs +++ b/tests/middleware_order.rs @@ -6,7 +6,7 @@ use async_trait::async_trait; use bytes::BytesMut; use tokio::io::{AsyncReadExt, AsyncWriteExt, duplex}; use wireframe::{ - app::{Envelope, Handler, WireframeApp}, + app::{Envelope, Handler, Packet, WireframeApp}, frame::{FrameProcessor, LengthPrefixedProcessor}, middleware::{HandlerService, Service, ServiceRequest, ServiceResponse, Transform}, serializer::{BincodeSerializer, Serializer}, @@ -89,5 +89,6 @@ async fn middleware_applied_in_reverse_order() { .deserialize::(&frame) .expect("deserialize failed"); let parts = resp.into_parts(); - assert_eq!(parts.msg, vec![b'X', b'A', b'B', b'B', b'A']); + assert_eq!(parts.payload, vec![b'X', b'A', b'B', b'B', b'A']); + assert_eq!(parts.correlation_id, Some(7)); } diff --git a/tests/routes.rs b/tests/routes.rs index af0f04ac..42fcb990 100644 --- a/tests/routes.rs +++ b/tests/routes.rs @@ -18,31 +18,29 @@ use wireframe::{ }; use wireframe_testing::{drive_with_bincode, drive_with_frames}; -#[derive(bincode::Encode, bincode::BorrowDecode, PartialEq, Debug)] +#[derive(bincode::Encode, bincode::BorrowDecode, PartialEq, Debug, Clone)] struct TestEnvelope { id: u32, correlation_id: Option, - msg: Vec, + payload: Vec, } impl Packet for TestEnvelope { + #[inline] fn id(&self) -> u32 { self.id } + #[inline] fn correlation_id(&self) -> Option { self.correlation_id } fn into_parts(self) -> PacketParts { - PacketParts { - id: self.id, - correlation_id: self.correlation_id, - msg: self.msg, - } + PacketParts::new(self.id, self.correlation_id, self.payload) } fn from_parts(parts: PacketParts) -> Self { Self { id: parts.id, correlation_id: parts.correlation_id, - msg: parts.msg, + payload: parts.payload, } } } @@ -73,7 +71,7 @@ async fn handler_receives_message_and_echoes_response() { let env = TestEnvelope { id: 1, correlation_id: Some(99), - msg: msg_bytes, + payload: msg_bytes, }; let out = drive_with_bincode(app, env) @@ -89,11 +87,44 @@ async fn handler_receives_message_and_echoes_response() { .deserialize::(&frame) .expect("deserialize failed"); assert_eq!(resp_env.correlation_id, Some(99)); - let (echo, _) = Echo::from_bytes(&resp_env.msg).expect("decode echo failed"); + let (echo, _) = Echo::from_bytes(&resp_env.payload).expect("decode echo failed"); assert_eq!(echo, Echo(42)); assert_eq!(called.load(Ordering::SeqCst), 1); } +#[tokio::test] +async fn handler_echoes_with_none_correlation_id() { + let app = WireframeApp::<_, _, TestEnvelope>::new() + .expect("failed to create app") + .frame_processor(LengthPrefixedProcessor::default()) + .route( + 1, + std::sync::Arc::new(|_: &TestEnvelope| Box::pin(async {})), + ) + .expect("route registration failed"); + + let msg_bytes = Echo(7).to_bytes().expect("encode failed"); + let env = TestEnvelope { + id: 1, + correlation_id: None, + payload: msg_bytes, + }; + + let out = drive_with_bincode(app, env).await.expect("drive failed"); + let mut buf = BytesMut::from(&out[..]); + let frame = LengthPrefixedProcessor::default() + .decode(&mut buf) + .expect("decode failed") + .expect("missing frame"); + let (resp_env, _) = BincodeSerializer + .deserialize::(&frame) + .expect("deserialize failed"); + + assert_eq!(resp_env.correlation_id, None); + let (echo, _) = Echo::from_bytes(&resp_env.payload).expect("decode echo failed"); + assert_eq!(echo, Echo(7)); +} + #[tokio::test] async fn multiple_frames_processed_in_sequence() { let app = WireframeApp::<_, _, TestEnvelope>::new() @@ -111,7 +142,7 @@ async fn multiple_frames_processed_in_sequence() { let env = TestEnvelope { id: 1, correlation_id: Some(u64::from(id)), - msg: msg_bytes, + payload: msg_bytes, }; let env_bytes = BincodeSerializer .serialize(&env) @@ -136,7 +167,7 @@ async fn multiple_frames_processed_in_sequence() { let (env1, _) = BincodeSerializer .deserialize::(&first) .expect("deserialize failed"); - let (echo1, _) = Echo::from_bytes(&env1.msg).expect("decode echo failed"); + let (echo1, _) = Echo::from_bytes(&env1.payload).expect("decode echo failed"); let second = LengthPrefixedProcessor::default() .decode(&mut buf) .expect("decode failed") @@ -144,9 +175,64 @@ async fn multiple_frames_processed_in_sequence() { let (env2, _) = BincodeSerializer .deserialize::(&second) .expect("deserialize failed"); - let (echo2, _) = Echo::from_bytes(&env2.msg).expect("decode echo failed"); + let (echo2, _) = Echo::from_bytes(&env2.payload).expect("decode echo failed"); assert_eq!(env1.correlation_id, Some(1)); assert_eq!(env2.correlation_id, Some(2)); assert_eq!(echo1, Echo(1)); assert_eq!(echo2, Echo(2)); } + +#[rstest] +#[case(None)] +#[case(Some(1))] +#[case(Some(2))] +#[tokio::test] +async fn single_frame_propagates_correlation_id(#[case] cid: Option) { + let app = WireframeApp::<_, _, TestEnvelope>::new() + .expect("failed to create app") + .frame_processor(LengthPrefixedProcessor::default()) + .route( + 1, + std::sync::Arc::new(|_: &TestEnvelope| Box::pin(async {})), + ) + .expect("route registration failed"); + + let msg_bytes = Echo(5).to_bytes().expect("encode failed"); + let env = TestEnvelope { + id: 1, + correlation_id: cid, + payload: msg_bytes, + }; + let env_bytes = BincodeSerializer.serialize(&env).expect("serialize failed"); + + let mut framed = BytesMut::new(); + LengthPrefixedProcessor::default() + .encode(&env_bytes, &mut framed) + .expect("encode failed"); + + let out = drive_with_frames(app, vec![framed.to_vec()]) + .await + .expect("drive failed"); + let mut buf = BytesMut::from(&out[..]); + let frame = LengthPrefixedProcessor::default() + .decode(&mut buf) + .expect("decode failed") + .expect("missing"); + let (resp, _) = BincodeSerializer + .deserialize::(&frame) + .expect("deserialize failed"); + + assert_eq!(resp.correlation_id, cid); +} + +#[test] +fn packet_from_parts_round_trips() { + let env = TestEnvelope { + id: 5, + correlation_id: Some(9), + payload: vec![1, 2, 3], + }; + let parts = env.clone().into_parts(); + let rebuilt = TestEnvelope::from_parts(parts); + assert_eq!(rebuilt, env); +} From 7e87c8119faef1909a7e3a983aa407e9c1bb36a1 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sat, 9 Aug 2025 00:28:42 +0100 Subject: [PATCH 04/13] Document PacketParts and expose response correlation setter --- README.md | 5 ++++- docs/api.md | 25 +++++++++++++++++++++++ docs/roadmap.md | 8 +++++++- docs/rust-binary-router-library-design.md | 2 +- src/app.rs | 20 +++++------------- src/middleware.rs | 5 +++++ tests/lifecycle.rs | 12 +++++++++++ 7 files changed, 59 insertions(+), 18 deletions(-) create mode 100644 docs/api.md diff --git a/README.md b/README.md index 253788f0..28cbd766 100644 --- a/README.md +++ b/README.md @@ -104,7 +104,7 @@ impl Packet for MyEnv { fn id(&self) -> u32 { self.id } fn correlation_id(&self) -> Option { self.correlation_id } fn into_parts(self) -> PacketParts { - PacketParts { id: self.id, correlation_id: self.correlation_id, payload: self.payload } + PacketParts::new(self.id, self.correlation_id, self.payload) } fn from_parts(parts: PacketParts) -> Self { Self { id: parts.id, correlation_id: parts.correlation_id, payload: parts.payload } @@ -117,6 +117,9 @@ let app = WireframeApp::<_, _, MyEnv>::new() .unwrap(); ``` +A `None` correlation identifier denotes an unsolicited event or server push. +See [PacketParts](docs/api.md#packetparts) for field details. + This allows integration with existing packet formats without modifying `handle_frame`. diff --git a/docs/api.md b/docs/api.md new file mode 100644 index 00000000..32960200 --- /dev/null +++ b/docs/api.md @@ -0,0 +1,25 @@ +# API overview + +## PacketParts + +A `PacketParts` struct decomposes a packet into its components: + +```rust +let parts = PacketParts::new(id, correlation_id, payload); +``` + +- `id: u32` – frame identifier +- `correlation_id: Option` – `None` marks an unsolicited event or + server‑initiated push +- `payload: Vec` – raw message bytes + +Custom packet types can convert to and from `PacketParts` to avoid manual +mapping: + +```rust +let parts = PacketParts::new(id, None, data); +let env = Envelope::from(parts); +``` + +`None` propagation ensures packets that originate on the server carry no +accidental correlation identifier. diff --git a/docs/roadmap.md b/docs/roadmap.md index 9168bfe5..729a2c36 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -155,9 +155,15 @@ production environments. - [x] Expose key operational metrics (e.g., active connections, messages per second, error rates). - - [x] Provide an integration guide for popular monitoring systems (e.g., +- [x] Provide an integration guide for popular monitoring systems (e.g., Prometheus). +- [x] **Packet decomposition:** + + - [x] Introduce `PacketParts` to replace tuple-based packet handling. + - [x] Treat `correlation_id` as `Option` so `None` denotes an + unsolicited event or server-initiated push. + - [x] **Advanced Error Handling:** - [x] Implement panic handlers in connection tasks to prevent a single diff --git a/docs/rust-binary-router-library-design.md b/docs/rust-binary-router-library-design.md index 09126e05..10e39404 100644 --- a/docs/rust-binary-router-library-design.md +++ b/docs/rust-binary-router-library-design.md @@ -498,7 +498,7 @@ frame processing, akin to how `tokio-util::codec` operates, endows "wireframe" with the necessary flexibility to adapt to this diversity without embedding assumptions about any single framing strategy into its core. -#### 4.3.1 Packet Abstraction +#### 4.3.1 Packet abstraction The library defines a `Packet` trait to represent transport frames. Frames can be decomposed into `PacketParts` for efficient handling and re-assembly. diff --git a/src/app.rs b/src/app.rs index 845d48d1..66d39bce 100644 --- a/src/app.rs +++ b/src/app.rs @@ -165,13 +165,7 @@ impl From for SendError { /// /// fn correlation_id(&self) -> Option { None } /// -/// fn into_parts(self) -> PacketParts { -/// PacketParts { -/// id: self.id, -/// correlation_id: None, -/// msg: self.payload, -/// } -/// } +/// fn into_parts(self) -> PacketParts { PacketParts::new(self.id, None, self.payload) } /// /// fn from_parts(parts: PacketParts) -> Self { /// Self { @@ -206,7 +200,7 @@ pub struct PacketParts { /// Basic envelope type used by [`handle_connection`]. /// -/// Incoming frames are deserialised into an `Envelope` containing the +/// Incoming frames are deserialized into an `Envelope` containing the /// message identifier and raw payload bytes. #[derive(bincode::Decode, bincode::Encode, Debug)] pub struct Envelope { @@ -337,11 +331,7 @@ where /// fn id(&self) -> u32 { self.id } /// fn correlation_id(&self) -> Option { self.correlation_id } /// fn into_parts(self) -> PacketParts { - /// PacketParts { - /// id: self.id, - /// correlation_id: self.correlation_id, - /// msg: self.data, - /// } + /// PacketParts::new(self.id, self.correlation_id, self.data) /// } /// fn from_parts(parts: PacketParts) -> Self { /// Self { @@ -630,7 +620,7 @@ where let routes = self.build_chains().await; if let Err(e) = self.process_stream(&mut stream, &routes).await { - tracing::warn!(error = ?e, "connection terminated with error"); + tracing::warn!(correlation_id = ?None::, error = ?e, "connection terminated with error"); } if let (Some(teardown), Some(state)) = (&self.on_disconnect, state) { @@ -758,7 +748,7 @@ where } Err(e) => { *deser_failures += 1; - tracing::warn!(error = ?e, "failed to deserialize message"); + tracing::warn!(correlation_id = ?None::, error = ?e, "failed to deserialize message"); crate::metrics::inc_deser_errors(); if *deser_failures >= MAX_DESER_FAILURES { return Err(io::Error::new( diff --git a/src/middleware.rs b/src/middleware.rs index 20f35a7f..1447b6dc 100644 --- a/src/middleware.rs +++ b/src/middleware.rs @@ -97,6 +97,11 @@ impl ServiceResponse { #[must_use] pub fn correlation_id(&self) -> Option { self.correlation_id } + /// Set or clear the correlation identifier. + pub fn set_correlation_id(&mut self, correlation_id: Option) { + self.correlation_id = correlation_id; + } + /// Consume the response, yielding the raw frame bytes. #[must_use] pub fn into_inner(self) -> Vec { self.inner.into_inner() } diff --git a/tests/lifecycle.rs b/tests/lifecycle.rs index 9e0bba7f..7af16e0c 100644 --- a/tests/lifecycle.rs +++ b/tests/lifecycle.rs @@ -151,6 +151,18 @@ async fn helpers_propagate_connection_state() { .await .expect("app run failed"); assert!(!out.is_empty()); + + let mut buf = BytesMut::from(&out[..]); + let processor = LengthPrefixedProcessor::default(); + let frame = processor + .decode(&mut buf) + .expect("decode failed") + .expect("frame missing"); + let (resp, _) = BincodeSerializer + .deserialize::(&frame) + .expect("deserialize failed"); + assert_eq!(resp.correlation_id, Some(0)); + assert_eq!(setup.load(Ordering::SeqCst), 1); assert_eq!(teardown.load(Ordering::SeqCst), 1); } From 47e3dfe74bc91c0df8267c31f0ba3ccadf02015e Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 10 Aug 2025 11:11:30 +0100 Subject: [PATCH 05/13] Test PacketParts conversions and mismatched IDs --- src/app.rs | 3 +++ src/middleware.rs | 4 +++- tests/packet_parts.rs | 21 +++++++++++++++++++++ 3 files changed, 27 insertions(+), 1 deletion(-) create mode 100644 tests/packet_parts.rs diff --git a/src/app.rs b/src/app.rs index 66d39bce..22b79bdc 100644 --- a/src/app.rs +++ b/src/app.rs @@ -255,6 +255,9 @@ impl PacketParts { found = cid, "mismatched correlation id in response" ); + // Preserve the envelope's identifier to avoid leaking an + // unrelated correlation value. + self.correlation_id = Some(src); } _ => {} } diff --git a/src/middleware.rs b/src/middleware.rs index 1447b6dc..ae543658 100644 --- a/src/middleware.rs +++ b/src/middleware.rs @@ -98,8 +98,10 @@ impl ServiceResponse { pub fn correlation_id(&self) -> Option { self.correlation_id } /// Set or clear the correlation identifier. - pub fn set_correlation_id(&mut self, correlation_id: Option) { + #[must_use] + pub fn set_correlation_id(&mut self, correlation_id: Option) -> &mut Self { self.correlation_id = correlation_id; + self } /// Consume the response, yielding the raw frame bytes. diff --git a/tests/packet_parts.rs b/tests/packet_parts.rs new file mode 100644 index 00000000..cd9f3fa7 --- /dev/null +++ b/tests/packet_parts.rs @@ -0,0 +1,21 @@ +//! Tests for `PacketParts` conversions and helpers. + +use wireframe::app::{Envelope, Packet, PacketParts}; + +#[test] +fn envelope_from_parts_round_trip() { + let env = Envelope::new(2, Some(5), vec![1, 2]); + let parts = env.into_parts(); + let rebuilt = Envelope::from(parts); + let parts = rebuilt.into_parts(); + assert_eq!(parts.id, 2); + assert_eq!(parts.correlation_id, Some(5)); + assert_eq!(parts.payload, vec![1, 2]); +} + +#[test] +fn inherit_correlation_overwrites_mismatch() { + let parts = PacketParts::new(1, Some(7), vec![]); + let inherited = parts.inherit_correlation(Some(8)); + assert_eq!(inherited.correlation_id, Some(8)); +} From 020adc95bd80153e36b64f0ca310d98143397cf4 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 10 Aug 2025 12:16:38 +0100 Subject: [PATCH 06/13] Address review comments on correlation ID handling --- examples/metadata_routing.rs | 12 ++++++------ src/app.rs | 7 ++++++- src/middleware.rs | 8 ++++++-- tests/middleware.rs | 31 +++++-------------------------- tests/packet_parts.rs | 15 ++++++++++----- 5 files changed, 33 insertions(+), 40 deletions(-) diff --git a/examples/metadata_routing.rs b/examples/metadata_routing.rs index 772d4550..92748e7c 100644 --- a/examples/metadata_routing.rs +++ b/examples/metadata_routing.rs @@ -62,7 +62,7 @@ struct Ping; #[tokio::main] async fn main() -> io::Result<()> { let app = WireframeApp::new() - .unwrap() + .expect("failed to create app") .frame_processor(LengthPrefixedProcessor::default()) .serializer(HeaderSerializer) .route( @@ -73,7 +73,7 @@ async fn main() -> io::Result<()> { }) }), ) - .unwrap() + .expect("failed to add ping route") .route( 2, Arc::new(|_env: &Envelope| { @@ -82,14 +82,14 @@ async fn main() -> io::Result<()> { }) }), ) - .unwrap(); + .expect("failed to add pong route"); let (mut client, server) = duplex(1024); let server_task = tokio::spawn(async move { app.handle_connection(server).await; }); - let payload = Ping.to_bytes().unwrap(); + let payload = Ping.to_bytes().expect("failed to serialise Ping message"); let mut frame = Vec::new(); frame.extend_from_slice(&1u16.to_be_bytes()); frame.push(0); @@ -98,11 +98,11 @@ async fn main() -> io::Result<()> { let mut bytes = BytesMut::new(); LengthPrefixedProcessor::default() .encode(&frame, &mut bytes) - .unwrap(); + .expect("failed to encode frame"); client.write_all(&bytes).await?; client.shutdown().await?; - server_task.await.unwrap(); + server_task.await.expect("server task failed"); Ok(()) } diff --git a/src/app.rs b/src/app.rs index 22b79bdc..a83d5484 100644 --- a/src/app.rs +++ b/src/app.rs @@ -772,7 +772,12 @@ where let correlation_id = parts.correlation_id; let response = Envelope::from_parts(parts); if let Err(e) = self.send_response(stream, &response).await { - tracing::warn!(id = env.id, correlation_id = ?correlation_id, error = %e, "failed to send response"); + tracing::warn!( + id = env.id, + correlation_id = ?correlation_id, + error = ?e, + "failed to send response", + ); crate::metrics::inc_handler_errors(); } } diff --git a/src/middleware.rs b/src/middleware.rs index ae543658..b2d3e481 100644 --- a/src/middleware.rs +++ b/src/middleware.rs @@ -324,8 +324,12 @@ impl Service for RouteService { req.into_inner(), )); (self.handler.as_ref())(&env).await; - let parts = env.into_parts(); - Ok(ServiceResponse::new(parts.payload, parts.correlation_id)) + let PacketParts { + payload, + correlation_id, + .. + } = env.into_parts(); + Ok(ServiceResponse::new(payload, correlation_id)) } } diff --git a/tests/middleware.rs b/tests/middleware.rs index baac1cb8..24114a75 100644 --- a/tests/middleware.rs +++ b/tests/middleware.rs @@ -3,6 +3,7 @@ //! Confirm that a custom middleware can modify requests and responses. use async_trait::async_trait; +use rstest::rstest; use wireframe::middleware::{Next, Service, ServiceRequest, ServiceResponse, Transform}; struct EchoService; @@ -49,38 +50,16 @@ where } } +#[rstest(correlation_id => [None, Some(0), Some(42)])] #[tokio::test] -async fn middleware_modifies_request_and_response() { +async fn middleware_modifies_request_and_response_preserves_cid(correlation_id: Option) { let service = EchoService; let mw = ModifyMiddleware; let wrapped = mw.transform(service).await; - let request = ServiceRequest::new(vec![1, 2, 3], Some(0)); + let request = ServiceRequest::new(vec![1, 2, 3], correlation_id); let response = wrapped.call(request).await.expect("middleware call failed"); - assert_eq!(response.frame(), &[1, 2, 3, b'!', b'?']); -} -#[tokio::test] -async fn test_modify_middleware_preserves_nonzero_correlation_id() { - let service = EchoService; - let mw = ModifyMiddleware; - let wrapped = mw.transform(service).await; - - let correlation_id = Some(42); - let request = ServiceRequest::new(vec![4, 5, 6], correlation_id); - let response = wrapped.call(request).await.expect("middleware call failed"); - assert_eq!(response.frame(), &[4, 5, 6, b'!', b'?']); + assert_eq!(response.frame(), &[1, 2, 3, b'!', b'?']); assert_eq!(response.correlation_id(), correlation_id); } - -#[tokio::test] -async fn middleware_preserves_none_correlation_id() { - let service = EchoService; - let mw = ModifyMiddleware; - let wrapped = mw.transform(service).await; - - let request = ServiceRequest::new(vec![7, 8, 9], None); - let response = wrapped.call(request).await.expect("middleware call failed"); - assert_eq!(response.frame(), &[7, 8, 9, b'!', b'?']); - assert_eq!(response.correlation_id(), None); -} diff --git a/tests/packet_parts.rs b/tests/packet_parts.rs index cd9f3fa7..4e845bd1 100644 --- a/tests/packet_parts.rs +++ b/tests/packet_parts.rs @@ -13,9 +13,14 @@ fn envelope_from_parts_round_trip() { assert_eq!(parts.payload, vec![1, 2]); } -#[test] -fn inherit_correlation_overwrites_mismatch() { - let parts = PacketParts::new(1, Some(7), vec![]); - let inherited = parts.inherit_correlation(Some(8)); - assert_eq!(inherited.correlation_id, Some(8)); +#[rstest::rstest( + start, source, expected, + case(PacketParts::new(1, None, vec![]), Some(42), Some(42)), + case(PacketParts::new(1, Some(7), vec![]), None, Some(7)), + case(PacketParts::new(1, None, vec![]), None, None), + case(PacketParts::new(1, Some(7), vec![]), Some(8), Some(8)), +)] +fn inherit_variants(start: PacketParts, source: Option, expected: Option) { + let got = start.inherit_correlation(source); + assert_eq!(got.correlation_id, expected); } From 2df17815e7f2c933130a9ebee8dabfcdb61293c0 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 10 Aug 2025 13:04:03 +0100 Subject: [PATCH 07/13] Encapsulate PacketParts and clarify correlation semantics --- README.md | 13 +++++-- docs/api.md | 6 +-- docs/rust-binary-router-library-design.md | 5 ++- src/app.rs | 46 ++++++++++++++++++----- src/middleware.rs | 8 ++-- tests/lifecycle.rs | 9 +++-- tests/middleware_order.rs | 10 +++-- tests/packet_parts.rs | 11 ++++-- tests/routes.rs | 9 +++-- 9 files changed, 79 insertions(+), 38 deletions(-) diff --git a/README.md b/README.md index 28cbd766..3df62974 100644 --- a/README.md +++ b/README.md @@ -107,7 +107,10 @@ impl Packet for MyEnv { PacketParts::new(self.id, self.correlation_id, self.payload) } fn from_parts(parts: PacketParts) -> Self { - Self { id: parts.id, correlation_id: parts.correlation_id, payload: parts.payload } + let id = parts.id(); + let correlation_id = parts.correlation_id(); + let payload = parts.payload(); + Self { id, correlation_id, payload } } } @@ -117,8 +120,10 @@ let app = WireframeApp::<_, _, MyEnv>::new() .unwrap(); ``` -A `None` correlation identifier denotes an unsolicited event or server push. -See [PacketParts](docs/api.md#packetparts) for field details. +A `None` correlation identifier denotes an unsolicited event or +server-initiated push. Use `None` rather than `Some(0)` when a frame lacks a +correlation identifier. See [PacketParts](docs/api.md#packetparts) for field +details. This allows integration with existing packet formats without modifying `handle_frame`. @@ -286,7 +291,7 @@ Example programs are available in the `examples/` directory: - `ping_pong.rs` — showcases serialization and middleware in a ping/pong protocol. See [examples/ping_pong.md](examples/ping_pong.md) for a detailed overview. -- `packet_enum.rs` – shows packet type discrimination with a bincode enum and a +- `packet_enum.rs` — shows packet type discrimination with a bincode enum and a frame containing container types like `HashMap` and `Vec`. Run an example with Cargo: diff --git a/docs/api.md b/docs/api.md index 32960200..120e05d0 100644 --- a/docs/api.md +++ b/docs/api.md @@ -8,10 +8,10 @@ A `PacketParts` struct decomposes a packet into its components: let parts = PacketParts::new(id, correlation_id, payload); ``` -- `id: u32` – frame identifier -- `correlation_id: Option` – `None` marks an unsolicited event or +- `id: u32` — frame identifier +- `correlation_id: Option` — `None` marks an unsolicited event or server‑initiated push -- `payload: Vec` – raw message bytes +- `payload: Vec` — raw message bytes Custom packet types can convert to and from `PacketParts` to avoid manual mapping: diff --git a/docs/rust-binary-router-library-design.md b/docs/rust-binary-router-library-design.md index 10e39404..a33d6973 100644 --- a/docs/rust-binary-router-library-design.md +++ b/docs/rust-binary-router-library-design.md @@ -501,7 +501,7 @@ assumptions about any single framing strategy into its core. #### 4.3.1 Packet abstraction The library defines a `Packet` trait to represent transport frames. Frames can -be decomposed into `PacketParts` for efficient handling and re-assembly. +be decomposed into `PacketParts` for efficient handling and reassembly. `Envelope` is the default implementation used by `wireframe`. ```mermaid @@ -531,7 +531,8 @@ classDiagram `Envelope` implements `Packet`, carrying payload and metadata through the system. `PacketParts` avoids repetitive tuple unpacking when frames are split -into constituent pieces. +into constituent pieces. A `None` correlation identifier denotes an unsolicited +event or server-initiated push. ### 4.4. Message Serialization and Deserialization diff --git a/src/app.rs b/src/app.rs index a83d5484..13a3f611 100644 --- a/src/app.rs +++ b/src/app.rs @@ -169,8 +169,8 @@ impl From for SendError { /// /// fn from_parts(parts: PacketParts) -> Self { /// Self { -/// id: parts.id, -/// payload: parts.payload, +/// id: parts.id(), +/// payload: parts.payload(), /// timestamp: 0, /// } /// } @@ -193,9 +193,9 @@ pub trait Packet: Message + Send + Sync + 'static { /// Component values extracted from or used to build a [`Packet`]. #[derive(Debug)] pub struct PacketParts { - pub id: u32, - pub correlation_id: Option, - pub payload: Vec, + id: u32, + correlation_id: Option, + payload: Vec, } /// Basic envelope type used by [`handle_connection`]. @@ -244,7 +244,28 @@ impl PacketParts { } } + #[must_use] + pub const fn id(&self) -> u32 { self.id } + + #[must_use] + pub const fn correlation_id(&self) -> Option { self.correlation_id } + + #[must_use] + pub fn payload(self) -> Vec { self.payload } + /// Ensure a correlation identifier is present, inheriting from `source` if missing. + /// + /// # Examples + /// ``` + /// use wireframe::app::PacketParts; + /// // Inherit when missing + /// let parts = PacketParts::new(1, None, vec![]).inherit_correlation(Some(42)); + /// assert_eq!(parts.correlation_id(), Some(42)); + /// + /// // Overwrite mismatched value + /// let parts = PacketParts::new(1, Some(7), vec![]).inherit_correlation(Some(8)); + /// assert_eq!(parts.correlation_id(), Some(8)); + /// ``` #[must_use] pub fn inherit_correlation(mut self, source: Option) -> Self { match (self.correlation_id, source) { @@ -270,7 +291,12 @@ impl From for PacketParts { } impl From for Envelope { - fn from(p: PacketParts) -> Self { Envelope::new(p.id, p.correlation_id, p.payload) } + fn from(p: PacketParts) -> Self { + let id = p.id(); + let correlation_id = p.correlation_id(); + let payload = p.payload(); + Envelope::new(id, correlation_id, payload) + } } /// Number of idle polls before terminating a connection. @@ -338,9 +364,9 @@ where /// } /// fn from_parts(parts: PacketParts) -> Self { /// Self { - /// id: parts.id, - /// correlation_id: parts.correlation_id, - /// data: parts.payload, + /// id: parts.id(), + /// correlation_id: parts.correlation_id(), + /// data: parts.payload(), /// } /// } /// } @@ -769,7 +795,7 @@ where Ok(resp) => { let parts = PacketParts::new(env.id, resp.correlation_id(), resp.into_inner()) .inherit_correlation(env.correlation_id); - let correlation_id = parts.correlation_id; + let correlation_id = parts.correlation_id(); let response = Envelope::from_parts(parts); if let Err(e) = self.send_response(stream, &response).await { tracing::warn!( diff --git a/src/middleware.rs b/src/middleware.rs index b2d3e481..254f08f1 100644 --- a/src/middleware.rs +++ b/src/middleware.rs @@ -324,11 +324,9 @@ impl Service for RouteService { req.into_inner(), )); (self.handler.as_ref())(&env).await; - let PacketParts { - payload, - correlation_id, - .. - } = env.into_parts(); + let parts = env.into_parts(); + let correlation_id = parts.correlation_id(); + let payload = parts.payload(); Ok(ServiceResponse::new(payload, correlation_id)) } } diff --git a/tests/lifecycle.rs b/tests/lifecycle.rs index 7af16e0c..ada5559d 100644 --- a/tests/lifecycle.rs +++ b/tests/lifecycle.rs @@ -116,10 +116,13 @@ impl Packet for StateEnvelope { } fn from_parts(parts: PacketParts) -> Self { + let id = parts.id(); + let correlation_id = parts.correlation_id(); + let payload = parts.payload(); Self { - id: parts.id, - correlation_id: parts.correlation_id, - payload: parts.payload, + id, + correlation_id, + payload, } } } diff --git a/tests/middleware_order.rs b/tests/middleware_order.rs index d6df499e..6b66f55b 100644 --- a/tests/middleware_order.rs +++ b/tests/middleware_order.rs @@ -6,7 +6,7 @@ use async_trait::async_trait; use bytes::BytesMut; use tokio::io::{AsyncReadExt, AsyncWriteExt, duplex}; use wireframe::{ - app::{Envelope, Handler, Packet, WireframeApp}, + app::{Envelope, Handler, WireframeApp}, frame::{FrameProcessor, LengthPrefixedProcessor}, middleware::{HandlerService, Service, ServiceRequest, ServiceResponse, Transform}, serializer::{BincodeSerializer, Serializer}, @@ -88,7 +88,9 @@ async fn middleware_applied_in_reverse_order() { let (resp, _) = serializer .deserialize::(&frame) .expect("deserialize failed"); - let parts = resp.into_parts(); - assert_eq!(parts.payload, vec![b'X', b'A', b'B', b'B', b'A']); - assert_eq!(parts.correlation_id, Some(7)); + let parts = wireframe::app::Packet::into_parts(resp); + let correlation_id = parts.correlation_id(); + let payload = parts.payload(); + assert_eq!(payload, vec![b'X', b'A', b'B', b'B', b'A']); + assert_eq!(correlation_id, Some(7)); } diff --git a/tests/packet_parts.rs b/tests/packet_parts.rs index 4e845bd1..727964b9 100644 --- a/tests/packet_parts.rs +++ b/tests/packet_parts.rs @@ -8,9 +8,12 @@ fn envelope_from_parts_round_trip() { let parts = env.into_parts(); let rebuilt = Envelope::from(parts); let parts = rebuilt.into_parts(); - assert_eq!(parts.id, 2); - assert_eq!(parts.correlation_id, Some(5)); - assert_eq!(parts.payload, vec![1, 2]); + let id = parts.id(); + let correlation_id = parts.correlation_id(); + let payload = parts.payload(); + assert_eq!(id, 2); + assert_eq!(correlation_id, Some(5)); + assert_eq!(payload, vec![1, 2]); } #[rstest::rstest( @@ -22,5 +25,5 @@ fn envelope_from_parts_round_trip() { )] fn inherit_variants(start: PacketParts, source: Option, expected: Option) { let got = start.inherit_correlation(source); - assert_eq!(got.correlation_id, expected); + assert_eq!(got.correlation_id(), expected); } diff --git a/tests/routes.rs b/tests/routes.rs index 42fcb990..e57a3b63 100644 --- a/tests/routes.rs +++ b/tests/routes.rs @@ -37,10 +37,13 @@ impl Packet for TestEnvelope { } fn from_parts(parts: PacketParts) -> Self { + let id = parts.id(); + let correlation_id = parts.correlation_id(); + let payload = parts.payload(); Self { - id: parts.id, - correlation_id: parts.correlation_id, - payload: parts.payload, + id, + correlation_id, + payload, } } } From 7325a7422a48a3b3a41d3354b600e44f5a1b6d77 Mon Sep 17 00:00:00 2001 From: Leynos Date: Mon, 11 Aug 2025 10:55:44 +0100 Subject: [PATCH 08/13] Expose request correlation setter Add PacketParts helpers to design diagram and clarify inheritance behaviour. Provide ServiceRequest::set_correlation_id for middleware to adjust inbound correlation IDs, update tests to exercise the new setter, and use the shared processor helper in lifecycle tests. --- docs/rust-binary-router-library-design.md | 2 ++ src/app.rs | 4 ++-- src/middleware.rs | 7 +++++++ tests/lifecycle.rs | 6 +++--- tests/middleware.rs | 10 ++++++++++ 5 files changed, 24 insertions(+), 5 deletions(-) diff --git a/docs/rust-binary-router-library-design.md b/docs/rust-binary-router-library-design.md index a33d6973..171cb7ee 100644 --- a/docs/rust-binary-router-library-design.md +++ b/docs/rust-binary-router-library-design.md @@ -516,6 +516,8 @@ classDiagram +id: u32 +correlation_id: Option +payload: Vec + +new(id: u32, correlation_id: Option, payload: Vec) PacketParts + +inherit_correlation(source: Option) PacketParts } class Envelope { +id: u32 diff --git a/src/app.rs b/src/app.rs index 13a3f611..04f4410b 100644 --- a/src/app.rs +++ b/src/app.rs @@ -276,8 +276,8 @@ impl PacketParts { found = cid, "mismatched correlation id in response" ); - // Preserve the envelope's identifier to avoid leaking an - // unrelated correlation value. + // Overwrite with the source correlation ID to ensure downstream + // consistency. self.correlation_id = Some(src); } _ => {} diff --git a/src/middleware.rs b/src/middleware.rs index 254f08f1..f5eca6f7 100644 --- a/src/middleware.rs +++ b/src/middleware.rs @@ -59,6 +59,13 @@ impl ServiceRequest { #[must_use] pub fn correlation_id(&self) -> Option { self.correlation_id } + /// Set or clear the correlation identifier on the request. + #[must_use] + pub fn set_correlation_id(&mut self, correlation_id: Option) -> &mut Self { + self.correlation_id = correlation_id; + self + } + /// Mutable access to the inner frame bytes. #[must_use] pub fn frame_mut(&mut self) -> &mut Vec { self.inner.frame_mut() } diff --git a/tests/lifecycle.rs b/tests/lifecycle.rs index ada5559d..be8b5660 100644 --- a/tests/lifecycle.rs +++ b/tests/lifecycle.rs @@ -156,13 +156,13 @@ async fn helpers_propagate_connection_state() { assert!(!out.is_empty()); let mut buf = BytesMut::from(&out[..]); - let processor = LengthPrefixedProcessor::default(); - let frame = processor + let processor = processor(); + let decoded = processor .decode(&mut buf) .expect("decode failed") .expect("frame missing"); let (resp, _) = BincodeSerializer - .deserialize::(&frame) + .deserialize::(&decoded) .expect("deserialize failed"); assert_eq!(resp.correlation_id, Some(0)); diff --git a/tests/middleware.rs b/tests/middleware.rs index 24114a75..33ef5924 100644 --- a/tests/middleware.rs +++ b/tests/middleware.rs @@ -63,3 +63,13 @@ async fn middleware_modifies_request_and_response_preserves_cid(correlation_id: assert_eq!(response.frame(), &[1, 2, 3, b'!', b'?']); assert_eq!(response.correlation_id(), correlation_id); } + +#[test] +fn service_request_setter_updates_correlation_id() { + let mut req = ServiceRequest::new(vec![], None); + let _ = req.set_correlation_id(Some(7)); + assert_eq!(req.correlation_id(), Some(7)); + + let _ = req.set_correlation_id(None); + assert_eq!(req.correlation_id(), None); +} From 6bda6e0b81dad6103bcaa79e50db7957be633e8e Mon Sep 17 00:00:00 2001 From: Leynos Date: Mon, 11 Aug 2025 19:19:58 +0100 Subject: [PATCH 09/13] Document packet part privacy --- README.md | 7 +++---- docs/rust-binary-router-library-design.md | 13 ++++++++----- examples/metadata_routing.rs | 2 +- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 3df62974..6ac81ce1 100644 --- a/README.md +++ b/README.md @@ -120,10 +120,9 @@ let app = WireframeApp::<_, _, MyEnv>::new() .unwrap(); ``` -A `None` correlation identifier denotes an unsolicited event or -server-initiated push. Use `None` rather than `Some(0)` when a frame lacks a -correlation identifier. See [PacketParts](docs/api.md#packetparts) for field -details. +A `None` correlation ID denotes an unsolicited event or server-initiated push. +Use `None` rather than `Some(0)` when a frame lacks a correlation ID. See +[PacketParts](docs/api.md#packetparts) for field details. This allows integration with existing packet formats without modifying `handle_frame`. diff --git a/docs/rust-binary-router-library-design.md b/docs/rust-binary-router-library-design.md index 171cb7ee..6f5e3082 100644 --- a/docs/rust-binary-router-library-design.md +++ b/docs/rust-binary-router-library-design.md @@ -513,10 +513,13 @@ classDiagram +from_parts(parts: PacketParts) Self } class PacketParts { - +id: u32 - +correlation_id: Option - +payload: Vec + -id: u32 + -correlation_id: Option + -payload: Vec +new(id: u32, correlation_id: Option, payload: Vec) PacketParts + +id() u32 + +correlation_id() Option + +payload() Vec +inherit_correlation(source: Option) PacketParts } class Envelope { @@ -533,8 +536,8 @@ classDiagram `Envelope` implements `Packet`, carrying payload and metadata through the system. `PacketParts` avoids repetitive tuple unpacking when frames are split -into constituent pieces. A `None` correlation identifier denotes an unsolicited -event or server-initiated push. +into constituent pieces. A `None` correlation ID denotes an unsolicited event +or server-initiated push. ### 4.4. Message Serialization and Deserialization diff --git a/examples/metadata_routing.rs b/examples/metadata_routing.rs index 92748e7c..aae264aa 100644 --- a/examples/metadata_routing.rs +++ b/examples/metadata_routing.rs @@ -89,7 +89,7 @@ async fn main() -> io::Result<()> { app.handle_connection(server).await; }); - let payload = Ping.to_bytes().expect("failed to serialise Ping message"); + let payload = Ping.to_bytes().expect("failed to serialize Ping message"); let mut frame = Vec::new(); frame.extend_from_slice(&1u16.to_be_bytes()); frame.push(0); From 1a94d002432bd3738481449e17a25a1b8d9a5a22 Mon Sep 17 00:00:00 2001 From: Leynos Date: Mon, 11 Aug 2025 22:57:24 +0100 Subject: [PATCH 10/13] Import PacketParts in doctest and use processor helper --- src/app.rs | 7 +++++-- tests/lifecycle.rs | 6 +++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/app.rs b/src/app.rs index 04f4410b..d2b22925 100644 --- a/src/app.rs +++ b/src/app.rs @@ -151,7 +151,10 @@ impl From for SendError { /// # Example /// /// ``` -/// use wireframe::{app::Packet, message::Message}; +/// use wireframe::{ +/// app::{Packet, PacketParts}, +/// message::Message, +/// }; /// /// #[derive(bincode::Decode, bincode::Encode)] /// struct CustomEnvelope { @@ -314,7 +317,7 @@ where E: Packet, { /// - /// Initialises empty routes, services, middleware, and application data. + /// Initializes empty routes, services, middleware, and application data. /// Sets the default frame processor and serializer, with no connection /// lifecycle hooks. fn default() -> Self { diff --git a/tests/lifecycle.rs b/tests/lifecycle.rs index be8b5660..6a00242f 100644 --- a/tests/lifecycle.rs +++ b/tests/lifecycle.rs @@ -14,7 +14,7 @@ use std::{ use bytes::BytesMut; use wireframe::{ app::{Envelope, Packet, PacketParts, WireframeApp}, - frame::{FrameProcessor, LengthPrefixedProcessor}, + frame::FrameProcessor, serializer::{BincodeSerializer, Serializer}, }; use wireframe_testing::{processor, run_app, run_with_duplex_server}; @@ -146,8 +146,8 @@ async fn helpers_propagate_connection_state() { .serialize(&env) .expect("failed to serialise envelope"); let mut frame = BytesMut::new(); - LengthPrefixedProcessor::default() - .encode(&bytes, &mut frame) + let proc = processor(); + proc.encode(&bytes, &mut frame) .expect("encode should succeed"); let out = run_app(app, vec![frame.to_vec()], None) From e7dc1a4bee3a80886f28d3d4375b7e4f48849dad Mon Sep 17 00:00:00 2001 From: Leynos Date: Tue, 12 Aug 2025 00:42:01 +0100 Subject: [PATCH 11/13] Link packet enum example and log mismatched ids --- README.md | 5 +++-- docs/rust-binary-router-library-design.md | 5 ++++- src/app.rs | 3 ++- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 6ac81ce1..a9820cc1 100644 --- a/README.md +++ b/README.md @@ -290,8 +290,9 @@ Example programs are available in the `examples/` directory: - `ping_pong.rs` — showcases serialization and middleware in a ping/pong protocol. See [examples/ping_pong.md](examples/ping_pong.md) for a detailed overview. -- `packet_enum.rs` — shows packet type discrimination with a bincode enum and a - frame containing container types like `HashMap` and `Vec`. +- [`packet_enum.rs`](examples/packet_enum.rs) — shows packet type discrimination + with a bincode enum and a frame containing container types like `HashMap` and + `Vec`. Run an example with Cargo: diff --git a/docs/rust-binary-router-library-design.md b/docs/rust-binary-router-library-design.md index 6f5e3082..773f64a7 100644 --- a/docs/rust-binary-router-library-design.md +++ b/docs/rust-binary-router-library-design.md @@ -537,7 +537,10 @@ classDiagram `Envelope` implements `Packet`, carrying payload and metadata through the system. `PacketParts` avoids repetitive tuple unpacking when frames are split into constituent pieces. A `None` correlation ID denotes an unsolicited event -or server-initiated push. +or server-initiated push. In multi-packet streaming responses, the optional +`correlation_id` links all packets in the stream to the originating request, +and protocols should define an explicit end-of-stream indicator alongside the +shared correlation identifier. ### 4.4. Message Serialization and Deserialization diff --git a/src/app.rs b/src/app.rs index d2b22925..02e0b9b2 100644 --- a/src/app.rs +++ b/src/app.rs @@ -275,9 +275,10 @@ impl PacketParts { (None, cid) => self.correlation_id = cid, (Some(cid), Some(src)) if cid != src => { tracing::warn!( + id = self.id, expected = src, found = cid, - "mismatched correlation id in response" + "mismatched correlation id in response", ); // Overwrite with the source correlation ID to ensure downstream // consistency. From 0cf61886d54819ed6219868d8641234f3ddc4691 Mon Sep 17 00:00:00 2001 From: Leynos Date: Tue, 12 Aug 2025 01:16:47 +0100 Subject: [PATCH 12/13] Mark Envelope fields private in class diagram --- docs/rust-binary-router-library-design.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/rust-binary-router-library-design.md b/docs/rust-binary-router-library-design.md index 773f64a7..3a39e578 100644 --- a/docs/rust-binary-router-library-design.md +++ b/docs/rust-binary-router-library-design.md @@ -523,9 +523,9 @@ classDiagram +inherit_correlation(source: Option) PacketParts } class Envelope { - +id: u32 - +correlation_id: Option - +payload: Vec + -id: u32 + -correlation_id: Option + -payload: Vec +new(id: u32, correlation_id: Option, payload: Vec) +into_parts() PacketParts } From c565a235fb4e03877b8ba3c111ba876adbb89c84 Mon Sep 17 00:00:00 2001 From: Leynos Date: Tue, 12 Aug 2025 01:56:55 +0100 Subject: [PATCH 13/13] Clarify packet abstraction diagram --- docs/rust-binary-router-library-design.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/docs/rust-binary-router-library-design.md b/docs/rust-binary-router-library-design.md index 3a39e578..15df8665 100644 --- a/docs/rust-binary-router-library-design.md +++ b/docs/rust-binary-router-library-design.md @@ -502,11 +502,13 @@ assumptions about any single framing strategy into its core. The library defines a `Packet` trait to represent transport frames. Frames can be decomposed into `PacketParts` for efficient handling and reassembly. -`Envelope` is the default implementation used by `wireframe`. +`Envelope` is the default implementation used by `wireframe`. The following +diagram depicts the `Packet` trait, `PacketParts`, and `Envelope`. ```mermaid classDiagram class Packet { + <> +id() u32 +correlation_id() Option +into_parts() PacketParts @@ -527,6 +529,7 @@ classDiagram -correlation_id: Option -payload: Vec +new(id: u32, correlation_id: Option, payload: Vec) + +from_parts(parts: PacketParts) Envelope +into_parts() PacketParts } Packet <|.. Envelope