diff --git a/README.md b/README.md index 93188c7c..e277bcfa 100644 --- a/README.md +++ b/README.md @@ -83,8 +83,7 @@ WireframeServer::new(|| { ``` This example showcases how derive macros and the framing abstraction simplify a -binary protocol server. See the - +binary protocol server. See the [full example](docs/rust-binary-router-library-design.md#5-6-illustrative-api-usage-examples) in the design document for further details. @@ -99,12 +98,15 @@ payload bytes. Applications can supply their own envelope type by calling use wireframe::app::{Packet, WireframeApp}; #[derive(bincode::Encode, bincode::BorrowDecode)] -struct MyEnv { id: u32, data: Vec } +struct MyEnv { id: u32, correlation_id: u64, data: Vec } impl Packet for MyEnv { fn id(&self) -> u32 { self.id } - fn into_parts(self) -> (u32, Vec) { (self.id, self.data) } - fn from_parts(id: u32, data: Vec) -> Self { Self { id, data } } + fn correlation_id(&self) -> u64 { self.correlation_id } + fn into_parts(self) -> (u32, u64, Vec) { (self.id, self.correlation_id, self.data) } + fn from_parts(id: u32, correlation_id: u64, data: Vec) -> Self { + Self { id, correlation_id, data } + } } let app = WireframeApp::<_, _, MyEnv>::new() diff --git a/docs/roadmap.md b/docs/roadmap.md index 57b890c0..9168bfe5 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -179,7 +179,7 @@ stream. - [ ] **Protocol Enhancement:** - - [ ] Add a `correlation_id` field to the `Frame` header. For a request, this + - [x] Add a `correlation_id` field to the `Frame` header. For a request, this is the unique request ID. For each message in a multi-packet response, this ID must match the original request's ID. diff --git a/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md b/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md index f9fedb65..e5d44ab2 100644 --- a/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md +++ b/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md @@ -59,6 +59,10 @@ pub enum Response { } ``` +Each frame header now carries a 64-bit `correlation_id`. Requests set this +value, and every packet in a multi-part response repeats it, so clients can +match frames to the originating request. + This design is powered by the `async-stream` crate, which allows developers to write imperative-looking logic that generates a declarative `Stream` object. It provides the best of both worlds: the intuitive feel of a `for` loop for diff --git a/examples/metadata_routing.rs b/examples/metadata_routing.rs index 45459ac5..b2f77eca 100644 --- a/examples/metadata_routing.rs +++ b/examples/metadata_routing.rs @@ -52,7 +52,7 @@ impl FrameMetadata for HeaderSerializer { // `parse` receives the complete frame because `LengthPrefixedProcessor` // ensures `src` contains exactly one message. Returning `src.len()` is // therefore correct for this demo. - Ok((Envelope::new(id, payload), src.len())) + Ok((Envelope::new(id, 0, payload), src.len())) } } diff --git a/src/app.rs b/src/app.rs index a84df128..7d23028c 100644 --- a/src/app.rs +++ b/src/app.rs @@ -163,9 +163,12 @@ impl From for SendError { /// impl Packet for CustomEnvelope { /// fn id(&self) -> u32 { self.id } /// -/// fn into_parts(self) -> (u32, Vec) { (self.id, self.payload) } +/// fn correlation_id(&self) -> u64 { 0 } /// -/// fn from_parts(id: u32, msg: Vec) -> Self { +/// fn into_parts(self) -> (u32, u64, Vec) { (self.id, 0, self.payload) } +/// +/// fn from_parts(id: u32, correlation_id: u64, msg: Vec) -> Self { +/// let _ = correlation_id; /// Self { /// id, /// payload: msg, @@ -178,39 +181,60 @@ pub trait Packet: Message + Send + Sync + 'static { /// Return the message identifier used for routing. fn id(&self) -> u32; - /// Consume the packet and return its identifier and payload bytes. - fn into_parts(self) -> (u32, Vec); + /// Return the correlation identifier tying this frame to a request. + fn correlation_id(&self) -> u64; + + /// Consume the packet and return its identifier, correlation id and payload bytes. + fn into_parts(self) -> (u32, u64, Vec); - /// Construct a new packet from an id and raw payload bytes. - fn from_parts(id: u32, msg: Vec) -> Self; + /// Construct a new packet from id, correlation id and raw payload bytes. + fn from_parts(id: u32, correlation_id: u64, msg: Vec) -> Self; } /// Basic envelope type used by [`handle_connection`]. /// /// Incoming frames are deserialized into an `Envelope` containing the /// message identifier and raw payload bytes. -#[derive(bincode::Decode, bincode::Encode)] -pub struct Envelope { +#[derive(bincode::Decode, bincode::Encode, Copy, Clone, Debug)] +pub struct PacketHeader { pub(crate) id: u32, + pub(crate) correlation_id: u64, +} + +#[derive(bincode::Decode, bincode::Encode, Debug)] +pub struct Envelope { + pub(crate) header: PacketHeader, pub(crate) msg: Vec, } impl Envelope { - /// Create a new [`Envelope`] with the provided id and payload. + /// Create a new [`Envelope`] with the provided identifiers and payload. #[must_use] - pub fn new(id: u32, msg: Vec) -> Self { Self { id, msg } } + pub fn new(id: u32, correlation_id: u64, msg: Vec) -> Self { + Self { + header: PacketHeader { id, correlation_id }, + msg, + } + } - /// Consume the envelope, returning its id and payload bytes. + /// Consume the envelope, returning its header and payload bytes. #[must_use] - pub fn into_parts(self) -> (u32, Vec) { (self.id, self.msg) } + pub fn into_parts(self) -> (PacketHeader, Vec) { (self.header, self.msg) } } impl Packet for Envelope { - fn id(&self) -> u32 { self.id } + fn id(&self) -> u32 { self.header.id } - fn into_parts(self) -> (u32, Vec) { (self.id, self.msg) } + fn correlation_id(&self) -> u64 { self.header.correlation_id } - fn from_parts(id: u32, msg: Vec) -> Self { Self { id, msg } } + fn into_parts(self) -> (u32, u64, Vec) { + let (header, msg) = Envelope::into_parts(self); + (header.id, header.correlation_id, msg) + } + + fn from_parts(id: u32, correlation_id: u64, msg: Vec) -> Self { + Envelope::new(id, correlation_id, msg) + } } /// Number of idle polls before terminating a connection. @@ -266,13 +290,21 @@ where /// #[derive(bincode::Encode, bincode::BorrowDecode)] /// struct MyEnv { /// id: u32, + /// correlation_id: u64, /// data: Vec, /// } /// /// impl Packet for MyEnv { /// fn id(&self) -> u32 { self.id } - /// fn into_parts(self) -> (u32, Vec) { (self.id, self.data) } - /// fn from_parts(id: u32, data: Vec) -> Self { Self { id, data } } + /// fn correlation_id(&self) -> u64 { self.correlation_id } + /// fn into_parts(self) -> (u32, u64, Vec) { (self.id, self.correlation_id, self.data) } + /// fn from_parts(id: u32, correlation_id: u64, data: Vec) -> Self { + /// Self { + /// id, + /// correlation_id, + /// data, + /// } + /// } /// } /// /// let app = WireframeApp::<_, _, MyEnv>::new().expect("failed to create app"); @@ -693,26 +725,24 @@ where } }; - if let Some(service) = routes.get(&env.id) { - let request = ServiceRequest::new(env.msg); + if let Some(service) = routes.get(&env.header.id) { + let request = ServiceRequest::new(env.msg, env.header.correlation_id); match service.call(request).await { Ok(resp) => { - let response = Envelope { - id: env.id, - msg: resp.into_inner(), - }; + let response = + Envelope::new(env.header.id, env.header.correlation_id, resp.into_inner()); if let Err(e) = self.send_response(stream, &response).await { tracing::warn!(error = %e, "failed to send response"); crate::metrics::inc_handler_errors(); } } Err(e) => { - tracing::warn!(id = env.id, error = ?e, "handler error"); + tracing::warn!(id = env.header.id, error = ?e, "handler error"); crate::metrics::inc_handler_errors(); } } } else { - tracing::warn!("no handler for message id {}", env.id); + tracing::warn!("no handler for message id {}", env.header.id); crate::metrics::inc_handler_errors(); } diff --git a/src/middleware.rs b/src/middleware.rs index 04520ab5..de33f7ef 100644 --- a/src/middleware.rs +++ b/src/middleware.rs @@ -36,14 +36,16 @@ impl FrameContainer { #[derive(Debug)] pub struct ServiceRequest { inner: FrameContainer>, + correlation_id: u64, } impl ServiceRequest { /// Create a new [`ServiceRequest`] from raw frame bytes. #[must_use] - pub fn new(frame: Vec) -> Self { + pub fn new(frame: Vec, correlation_id: u64) -> Self { Self { inner: FrameContainer::new(frame), + correlation_id, } } @@ -51,6 +53,10 @@ impl ServiceRequest { #[must_use] pub fn frame(&self) -> &[u8] { self.inner.frame().as_slice() } + /// Return the correlation identifier associated with this request. + #[must_use] + pub fn correlation_id(&self) -> u64 { self.correlation_id } + /// Mutable access to the inner frame bytes. #[must_use] pub fn frame_mut(&mut self) -> &mut Vec { self.inner.frame_mut() } @@ -297,9 +303,9 @@ impl Service for RouteService { async fn call(&self, req: ServiceRequest) -> Result { // The handler only borrows the envelope, allowing us to consume it // afterwards to extract the response payload. - let env = E::from_parts(self.id, req.into_inner()); + let env = E::from_parts(self.id, req.correlation_id(), req.into_inner()); (self.handler.as_ref())(&env).await; - let (_, bytes) = env.into_parts(); + let (_, _, bytes) = env.into_parts(); Ok(ServiceResponse::new(bytes)) } } diff --git a/tests/correlation_id.rs b/tests/correlation_id.rs new file mode 100644 index 00000000..25316648 --- /dev/null +++ b/tests/correlation_id.rs @@ -0,0 +1,24 @@ +//! Tests for `correlation_id` propagation in streaming responses. +use async_stream::try_stream; +use tokio_util::sync::CancellationToken; +use wireframe::{ + app::{Envelope, Packet}, + connection::ConnectionActor, + push::PushQueues, + response::FrameStream, +}; + +#[tokio::test] +async fn stream_frames_carry_request_correlation_id() { + let cid = 42u64; + let stream: FrameStream = Box::pin(try_stream! { + yield Envelope::new(1, cid, vec![1]); + yield Envelope::new(1, cid, vec![2]); + }); + let (queues, handle) = PushQueues::bounded(1, 1); + let shutdown = CancellationToken::new(); + let mut actor = ConnectionActor::new(queues, handle, Some(stream), shutdown); + let mut out = Vec::new(); + actor.run(&mut out).await.expect("actor run failed"); + assert!(out.iter().all(|e| e.correlation_id() == cid)); +} diff --git a/tests/cucumber.rs b/tests/cucumber.rs index 8837c3ae..b922302e 100644 --- a/tests/cucumber.rs +++ b/tests/cucumber.rs @@ -1,13 +1,28 @@ -//! Cucumber test runner for panic resilience integration tests. +//! Cucumber test runner for integration tests. //! -//! Runs behavioural tests defined in `tests/features/` using the -//! `PanicWorld` test context to verify server panic handling. +//! Orchestrates two distinct test suites: +//! - `PanicWorld`: Tests server resilience during connection panics +//! - `CorrelationWorld`: Tests correlation ID propagation in multi-frame responses +//! +//! # Example +//! +//! The runner executes feature files sequentially: +//! ```text +//! tests/features/connection_panic.feature -> PanicWorld context +//! tests/features/correlation_id.feature -> CorrelationWorld context +//! ``` +//! +//! Each context provides specialised step definitions and state management +//! for their respective test scenarios. mod steps; mod world; use cucumber::World; -use world::PanicWorld; +use world::{CorrelationWorld, PanicWorld}; #[tokio::main] -async fn main() { PanicWorld::run("tests/features").await; } +async fn main() { + PanicWorld::run("tests/features/connection_panic.feature").await; + CorrelationWorld::run("tests/features/correlation_id.feature").await; +} diff --git a/tests/features/correlation_id.feature b/tests/features/correlation_id.feature new file mode 100644 index 00000000..bdb56b19 --- /dev/null +++ b/tests/features/correlation_id.feature @@ -0,0 +1,5 @@ +Feature: Multi-packet response correlation + Scenario: Streamed frames reuse the request correlation id + Given a correlation id 7 + When a stream of frames is processed + Then each emitted frame uses correlation id 7 diff --git a/tests/lifecycle.rs b/tests/lifecycle.rs index 49375170..c7dc00ba 100644 --- a/tests/lifecycle.rs +++ b/tests/lifecycle.rs @@ -102,15 +102,24 @@ async fn teardown_without_setup_does_not_run() { #[derive(bincode::Encode, bincode::BorrowDecode, PartialEq, Debug)] struct StateEnvelope { id: u32, + correlation_id: u64, msg: Vec, } impl wireframe::app::Packet for StateEnvelope { fn id(&self) -> u32 { self.id } - fn into_parts(self) -> (u32, Vec) { (self.id, self.msg) } + fn correlation_id(&self) -> u64 { self.correlation_id } - fn from_parts(id: u32, msg: Vec) -> Self { Self { id, msg } } + fn into_parts(self) -> (u32, u64, Vec) { (self.id, self.correlation_id, self.msg) } + + fn from_parts(id: u32, correlation_id: u64, msg: Vec) -> Self { + Self { + id, + correlation_id, + msg, + } + } } #[tokio::test] @@ -125,6 +134,7 @@ async fn helpers_propagate_connection_state() { let env = StateEnvelope { id: 1, + correlation_id: 0, msg: vec![1], }; let bytes = BincodeSerializer diff --git a/tests/metadata.rs b/tests/metadata.rs index 86c9a3bf..eaa8dc8b 100644 --- a/tests/metadata.rs +++ b/tests/metadata.rs @@ -60,7 +60,7 @@ async fn metadata_parser_invoked_before_deserialize() { let serializer = CountingSerializer(counter.clone()); let app = mock_wireframe_app_with_serializer(serializer); - let env = Envelope::new(1, vec![42]); + let env = Envelope::new(1, 0, vec![42]); let out = drive_with_bincode(app, env) .await @@ -105,7 +105,7 @@ async fn falls_back_to_deserialize_after_parse_error() { let serializer = FallbackSerializer(parse_calls.clone(), deser_calls.clone()); let app = mock_wireframe_app_with_serializer(serializer); - let env = Envelope::new(1, vec![7]); + let env = Envelope::new(1, 0, vec![7]); let out = drive_with_bincode(app, env) .await diff --git a/tests/middleware.rs b/tests/middleware.rs index a4f56f54..86e0653f 100644 --- a/tests/middleware.rs +++ b/tests/middleware.rs @@ -54,7 +54,7 @@ async fn middleware_modifies_request_and_response() { let mw = ModifyMiddleware; let wrapped = mw.transform(service).await; - let request = ServiceRequest::new(vec![1, 2, 3]); + let request = ServiceRequest::new(vec![1, 2, 3], 0); let response = wrapped.call(request).await.expect("middleware call failed"); assert_eq!(response.frame(), &[1, 2, 3, b'!', b'?']); } diff --git a/tests/middleware_order.rs b/tests/middleware_order.rs index 6f8bdb6a..df33daec 100644 --- a/tests/middleware_order.rs +++ b/tests/middleware_order.rs @@ -64,7 +64,7 @@ async fn middleware_applied_in_reverse_order() { let (mut client, server) = duplex(256); - let env = Envelope::new(1, vec![b'X']); + let env = Envelope::new(1, 7, vec![b'X']); let serializer = BincodeSerializer; let bytes = serializer.serialize(&env).expect("serialization failed"); // Use the default 4-byte big-endian length prefix for framing diff --git a/tests/routes.rs b/tests/routes.rs index 93888cab..0d4957dd 100644 --- a/tests/routes.rs +++ b/tests/routes.rs @@ -21,15 +21,24 @@ use wireframe_testing::{drive_with_bincode, drive_with_frames}; #[derive(bincode::Encode, bincode::BorrowDecode, PartialEq, Debug)] struct TestEnvelope { id: u32, + correlation_id: u64, msg: Vec, } impl wireframe::app::Packet for TestEnvelope { fn id(&self) -> u32 { self.id } - fn into_parts(self) -> (u32, Vec) { (self.id, self.msg) } + fn correlation_id(&self) -> u64 { self.correlation_id } - fn from_parts(id: u32, msg: Vec) -> Self { Self { id, msg } } + fn into_parts(self) -> (u32, u64, Vec) { (self.id, self.correlation_id, self.msg) } + + fn from_parts(id: u32, correlation_id: u64, msg: Vec) -> Self { + Self { + id, + correlation_id, + msg, + } + } } #[derive(bincode::Encode, bincode::BorrowDecode, PartialEq, Debug)] @@ -57,6 +66,7 @@ async fn handler_receives_message_and_echoes_response() { let msg_bytes = Echo(42).to_bytes().expect("encode failed"); let env = TestEnvelope { id: 1, + correlation_id: 99, msg: msg_bytes, }; @@ -72,6 +82,7 @@ async fn handler_receives_message_and_echoes_response() { let (resp_env, _) = BincodeSerializer .deserialize::(&frame) .expect("deserialize failed"); + assert_eq!(resp_env.correlation_id, 99); let (echo, _) = Echo::from_bytes(&resp_env.msg).expect("decode echo failed"); assert_eq!(echo, Echo(42)); assert_eq!(called.load(Ordering::SeqCst), 1); @@ -93,6 +104,7 @@ async fn multiple_frames_processed_in_sequence() { let msg_bytes = Echo(id).to_bytes().expect("encode failed"); let env = TestEnvelope { id: 1, + correlation_id: u64::from(id), msg: msg_bytes, }; let env_bytes = BincodeSerializer @@ -127,6 +139,8 @@ async fn multiple_frames_processed_in_sequence() { .deserialize::(&second) .expect("deserialize failed"); let (echo2, _) = Echo::from_bytes(&env2.msg).expect("decode echo failed"); + assert_eq!(env1.correlation_id, 1); + assert_eq!(env2.correlation_id, 2); assert_eq!(echo1, Echo(1)); assert_eq!(echo2, Echo(2)); } diff --git a/tests/steps/correlation_steps.rs b/tests/steps/correlation_steps.rs new file mode 100644 index 00000000..95b41c80 --- /dev/null +++ b/tests/steps/correlation_steps.rs @@ -0,0 +1,16 @@ +//! Steps for `correlation_id` behavioural tests. +use cucumber::{given, then, when}; + +use crate::world::CorrelationWorld; + +#[given(expr = "a correlation id {int}")] +fn given_cid(world: &mut CorrelationWorld, id: u64) { world.set_cid(id); } + +#[when("a stream of frames is processed")] +async fn when_process(world: &mut CorrelationWorld) { world.process().await; } + +#[then(expr = "each emitted frame uses correlation id {int}")] +fn then_verify(world: &mut CorrelationWorld, id: u64) { + assert_eq!(world.cid(), id); + world.verify(); +} diff --git a/tests/steps/mod.rs b/tests/steps/mod.rs index d422baa7..cc3a25ca 100644 --- a/tests/steps/mod.rs +++ b/tests/steps/mod.rs @@ -3,4 +3,5 @@ //! This module exposes all Given-When-Then steps used by the //! behaviour-driven tests under `tests/features`. +mod correlation_steps; mod panic_steps; diff --git a/tests/world.rs b/tests/world.rs index e1814277..cb32dd4a 100644 --- a/tests/world.rs +++ b/tests/world.rs @@ -5,9 +5,17 @@ use std::net::SocketAddr; +use async_stream::try_stream; use cucumber::World; use tokio::{net::TcpStream, sync::oneshot}; -use wireframe::{app::WireframeApp, server::WireframeServer}; +use tokio_util::sync::CancellationToken; +use wireframe::{ + app::{Envelope, Packet, WireframeApp}, + connection::ConnectionActor, + push::PushQueues, + response::FrameStream, + server::WireframeServer, +}; #[path = "common/mod.rs"] mod common; @@ -107,3 +115,41 @@ impl PanicWorld { tokio::task::yield_now().await; } } + +#[derive(Debug, Default, World)] +pub struct CorrelationWorld { + cid: u64, + frames: Vec, +} + +impl CorrelationWorld { + pub fn set_cid(&mut self, cid: u64) { self.cid = cid; } + + #[must_use] + pub fn cid(&self) -> u64 { self.cid } + + /// Run the connection actor and collect frames for later verification. + /// + /// # Panics + /// Panics if the actor fails to run successfully. + pub async fn process(&mut self) { + let cid = self.cid; + let stream: FrameStream = Box::pin(try_stream! { + yield Envelope::new(1, cid, vec![1]); + yield Envelope::new(1, cid, vec![2]); + }); + let (queues, handle) = PushQueues::bounded(1, 1); + let shutdown = CancellationToken::new(); + let mut actor = ConnectionActor::new(queues, handle, Some(stream), shutdown); + actor.run(&mut self.frames).await.expect("actor run failed"); + } + + /// Verify that all received frames carry the expected correlation id. + /// + /// # Panics + /// Panics if any frame has a `correlation_id` that does not match the + /// expected value. + pub fn verify(&self) { + assert!(self.frames.iter().all(|f| f.correlation_id() == self.cid)); + } +}