Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ WireframeServer::new(|| {
```

This example showcases how derive macros and the framing abstraction simplify a
binary protocol server. See the
<!-- markdownlint-disable-next-line MD013 -->
binary protocol server. See the <!-- markdownlint-disable-next-line MD013 -->
[full example](docs/rust-binary-router-library-design.md#5-6-illustrative-api-usage-examples)
in the design document for further details.

Expand All @@ -99,12 +98,15 @@ payload bytes. Applications can supply their own envelope type by calling
use wireframe::app::{Packet, WireframeApp};

#[derive(bincode::Encode, bincode::BorrowDecode)]
struct MyEnv { id: u32, data: Vec<u8> }
struct MyEnv { id: u32, correlation_id: u64, data: Vec<u8> }

impl Packet for MyEnv {
fn id(&self) -> u32 { self.id }
fn into_parts(self) -> (u32, Vec<u8>) { (self.id, self.data) }
fn from_parts(id: u32, data: Vec<u8>) -> Self { Self { id, data } }
fn correlation_id(&self) -> u64 { self.correlation_id }
fn into_parts(self) -> (u32, u64, Vec<u8>) { (self.id, self.correlation_id, self.data) }
fn from_parts(id: u32, correlation_id: u64, data: Vec<u8>) -> Self {
Self { id, correlation_id, data }
}
}

let app = WireframeApp::<_, _, MyEnv>::new()
Expand Down
2 changes: 1 addition & 1 deletion docs/roadmap.md
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ stream.

- [ ] **Protocol Enhancement:**

- [ ] Add a `correlation_id` field to the `Frame` header. For a request, this
- [x] Add a `correlation_id` field to the `Frame` header. For a request, this
is the unique request ID. For each message in a multi-packet response, this
ID must match the original request's ID.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ pub enum Response<F = Frame, E = MyProtocolError> {
}
```

Each frame header now carries a 64-bit `correlation_id`. Requests set this
value, and every packet in a multi-part response repeats it, so clients can
match frames to the originating request.

This design is powered by the `async-stream` crate, which allows developers to
write imperative-looking logic that generates a declarative `Stream` object. It
provides the best of both worlds: the intuitive feel of a `for` loop for
Expand Down
2 changes: 1 addition & 1 deletion examples/metadata_routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl FrameMetadata for HeaderSerializer {
// `parse` receives the complete frame because `LengthPrefixedProcessor`
// ensures `src` contains exactly one message. Returning `src.len()` is
// therefore correct for this demo.
Ok((Envelope::new(id, payload), src.len()))
Ok((Envelope::new(id, 0, payload), src.len()))
}
}

Expand Down
80 changes: 55 additions & 25 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,12 @@ impl From<io::Error> for SendError {
/// impl Packet for CustomEnvelope {
/// fn id(&self) -> u32 { self.id }
///
/// fn into_parts(self) -> (u32, Vec<u8>) { (self.id, self.payload) }
/// fn correlation_id(&self) -> u64 { 0 }
///
/// fn from_parts(id: u32, msg: Vec<u8>) -> Self {
/// fn into_parts(self) -> (u32, u64, Vec<u8>) { (self.id, 0, self.payload) }
///
/// fn from_parts(id: u32, correlation_id: u64, msg: Vec<u8>) -> Self {
/// let _ = correlation_id;
/// Self {
/// id,
/// payload: msg,
Expand All @@ -178,39 +181,60 @@ pub trait Packet: Message + Send + Sync + 'static {
/// Return the message identifier used for routing.
fn id(&self) -> u32;

/// Consume the packet and return its identifier and payload bytes.
fn into_parts(self) -> (u32, Vec<u8>);
/// Return the correlation identifier tying this frame to a request.
fn correlation_id(&self) -> u64;

/// Consume the packet and return its identifier, correlation id and payload bytes.
fn into_parts(self) -> (u32, u64, Vec<u8>);

/// Construct a new packet from an id and raw payload bytes.
fn from_parts(id: u32, msg: Vec<u8>) -> Self;
/// Construct a new packet from id, correlation id and raw payload bytes.
fn from_parts(id: u32, correlation_id: u64, msg: Vec<u8>) -> Self;
}

/// Basic envelope type used by [`handle_connection`].
///
/// Incoming frames are deserialized into an `Envelope` containing the
/// message identifier and raw payload bytes.
#[derive(bincode::Decode, bincode::Encode)]
pub struct Envelope {
#[derive(bincode::Decode, bincode::Encode, Copy, Clone, Debug)]
pub struct PacketHeader {
Comment thread
leynos marked this conversation as resolved.
pub(crate) id: u32,
pub(crate) correlation_id: u64,
}

#[derive(bincode::Decode, bincode::Encode, Debug)]
pub struct Envelope {
pub(crate) header: PacketHeader,
pub(crate) msg: Vec<u8>,
}

impl Envelope {
/// Create a new [`Envelope`] with the provided id and payload.
/// Create a new [`Envelope`] with the provided identifiers and payload.
#[must_use]
pub fn new(id: u32, msg: Vec<u8>) -> Self { Self { id, msg } }
pub fn new(id: u32, correlation_id: u64, msg: Vec<u8>) -> Self {
Self {
header: PacketHeader { id, correlation_id },
msg,
}
}

/// Consume the envelope, returning its id and payload bytes.
/// Consume the envelope, returning its header and payload bytes.
#[must_use]
pub fn into_parts(self) -> (u32, Vec<u8>) { (self.id, self.msg) }
pub fn into_parts(self) -> (PacketHeader, Vec<u8>) { (self.header, self.msg) }
}

impl Packet for Envelope {
fn id(&self) -> u32 { self.id }
fn id(&self) -> u32 { self.header.id }

fn into_parts(self) -> (u32, Vec<u8>) { (self.id, self.msg) }
fn correlation_id(&self) -> u64 { self.header.correlation_id }

fn from_parts(id: u32, msg: Vec<u8>) -> Self { Self { id, msg } }
fn into_parts(self) -> (u32, u64, Vec<u8>) {
let (header, msg) = Envelope::into_parts(self);
(header.id, header.correlation_id, msg)
}

fn from_parts(id: u32, correlation_id: u64, msg: Vec<u8>) -> Self {
Envelope::new(id, correlation_id, msg)
}
}

/// Number of idle polls before terminating a connection.
Expand Down Expand Up @@ -266,13 +290,21 @@ where
/// #[derive(bincode::Encode, bincode::BorrowDecode)]
/// struct MyEnv {
/// id: u32,
/// correlation_id: u64,
/// data: Vec<u8>,
/// }
///
/// impl Packet for MyEnv {
/// fn id(&self) -> u32 { self.id }
/// fn into_parts(self) -> (u32, Vec<u8>) { (self.id, self.data) }
/// fn from_parts(id: u32, data: Vec<u8>) -> Self { Self { id, data } }
/// fn correlation_id(&self) -> u64 { self.correlation_id }
/// fn into_parts(self) -> (u32, u64, Vec<u8>) { (self.id, self.correlation_id, self.data) }
/// fn from_parts(id: u32, correlation_id: u64, data: Vec<u8>) -> Self {
/// Self {
/// id,
/// correlation_id,
/// data,
/// }
/// }
/// }
///
/// let app = WireframeApp::<_, _, MyEnv>::new().expect("failed to create app");
Expand Down Expand Up @@ -693,26 +725,24 @@ where
}
};

if let Some(service) = routes.get(&env.id) {
let request = ServiceRequest::new(env.msg);
if let Some(service) = routes.get(&env.header.id) {
let request = ServiceRequest::new(env.msg, env.header.correlation_id);
match service.call(request).await {
Ok(resp) => {
let response = Envelope {
id: env.id,
msg: resp.into_inner(),
};
let response =
Envelope::new(env.header.id, env.header.correlation_id, resp.into_inner());
if let Err(e) = self.send_response(stream, &response).await {
tracing::warn!(error = %e, "failed to send response");
crate::metrics::inc_handler_errors();
}
}
Err(e) => {
tracing::warn!(id = env.id, error = ?e, "handler error");
tracing::warn!(id = env.header.id, error = ?e, "handler error");
crate::metrics::inc_handler_errors();
}
}
} else {
tracing::warn!("no handler for message id {}", env.id);
tracing::warn!("no handler for message id {}", env.header.id);
crate::metrics::inc_handler_errors();
}

Expand Down
12 changes: 9 additions & 3 deletions src/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,27 @@ impl<F> FrameContainer<F> {
#[derive(Debug)]
pub struct ServiceRequest {
inner: FrameContainer<Vec<u8>>,
correlation_id: u64,
}

impl ServiceRequest {
/// Create a new [`ServiceRequest`] from raw frame bytes.
#[must_use]
pub fn new(frame: Vec<u8>) -> Self {
pub fn new(frame: Vec<u8>, correlation_id: u64) -> Self {
Self {
inner: FrameContainer::new(frame),
correlation_id,
}
}

/// Borrow the underlying frame bytes.
#[must_use]
pub fn frame(&self) -> &[u8] { self.inner.frame().as_slice() }

/// Return the correlation identifier associated with this request.
#[must_use]
pub fn correlation_id(&self) -> u64 { self.correlation_id }

/// Mutable access to the inner frame bytes.
#[must_use]
pub fn frame_mut(&mut self) -> &mut Vec<u8> { self.inner.frame_mut() }
Expand Down Expand Up @@ -297,9 +303,9 @@ impl<E: Packet> Service for RouteService<E> {
async fn call(&self, req: ServiceRequest) -> Result<ServiceResponse, Self::Error> {
// The handler only borrows the envelope, allowing us to consume it
// afterwards to extract the response payload.
let env = E::from_parts(self.id, req.into_inner());
let env = E::from_parts(self.id, req.correlation_id(), req.into_inner());
(self.handler.as_ref())(&env).await;
let (_, bytes) = env.into_parts();
let (_, _, bytes) = env.into_parts();
Ok(ServiceResponse::new(bytes))
}
}
Expand Down
24 changes: 24 additions & 0 deletions tests/correlation_id.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
//! Tests for `correlation_id` propagation in streaming responses.
use async_stream::try_stream;
use tokio_util::sync::CancellationToken;
use wireframe::{
app::{Envelope, Packet},
connection::ConnectionActor,
push::PushQueues,
response::FrameStream,
};

#[tokio::test]
async fn stream_frames_carry_request_correlation_id() {
let cid = 42u64;
let stream: FrameStream<Envelope> = Box::pin(try_stream! {
yield Envelope::new(1, cid, vec![1]);
yield Envelope::new(1, cid, vec![2]);
});
let (queues, handle) = PushQueues::bounded(1, 1);
let shutdown = CancellationToken::new();
let mut actor = ConnectionActor::new(queues, handle, Some(stream), shutdown);
let mut out = Vec::new();
actor.run(&mut out).await.expect("actor run failed");
assert!(out.iter().all(|e| e.correlation_id() == cid));
}
25 changes: 20 additions & 5 deletions tests/cucumber.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,28 @@
//! Cucumber test runner for panic resilience integration tests.
//! Cucumber test runner for integration tests.
//!
//! Runs behavioural tests defined in `tests/features/` using the
//! `PanicWorld` test context to verify server panic handling.
//! Orchestrates two distinct test suites:
//! - `PanicWorld`: Tests server resilience during connection panics
//! - `CorrelationWorld`: Tests correlation ID propagation in multi-frame responses
//!
//! # Example
//!
//! The runner executes feature files sequentially:
//! ```text
//! tests/features/connection_panic.feature -> PanicWorld context
//! tests/features/correlation_id.feature -> CorrelationWorld context
//! ```
//!
//! Each context provides specialised step definitions and state management
//! for their respective test scenarios.

mod steps;
mod world;

use cucumber::World;
use world::PanicWorld;
use world::{CorrelationWorld, PanicWorld};

#[tokio::main]
async fn main() { PanicWorld::run("tests/features").await; }
async fn main() {
PanicWorld::run("tests/features/connection_panic.feature").await;
CorrelationWorld::run("tests/features/correlation_id.feature").await;
}
5 changes: 5 additions & 0 deletions tests/features/correlation_id.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Feature: Multi-packet response correlation
Scenario: Streamed frames reuse the request correlation id
Given a correlation id 7
When a stream of frames is processed
Then each emitted frame uses correlation id 7
14 changes: 12 additions & 2 deletions tests/lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,24 @@ async fn teardown_without_setup_does_not_run() {
#[derive(bincode::Encode, bincode::BorrowDecode, PartialEq, Debug)]
struct StateEnvelope {
id: u32,
correlation_id: u64,
msg: Vec<u8>,
}

impl wireframe::app::Packet for StateEnvelope {
fn id(&self) -> u32 { self.id }

fn into_parts(self) -> (u32, Vec<u8>) { (self.id, self.msg) }
fn correlation_id(&self) -> u64 { self.correlation_id }

fn from_parts(id: u32, msg: Vec<u8>) -> Self { Self { id, msg } }
fn into_parts(self) -> (u32, u64, Vec<u8>) { (self.id, self.correlation_id, self.msg) }

fn from_parts(id: u32, correlation_id: u64, msg: Vec<u8>) -> Self {
Self {
id,
correlation_id,
msg,
}
}
}

#[tokio::test]
Expand All @@ -125,6 +134,7 @@ async fn helpers_propagate_connection_state() {

let env = StateEnvelope {
id: 1,
correlation_id: 0,
msg: vec![1],
};
let bytes = BincodeSerializer
Expand Down
4 changes: 2 additions & 2 deletions tests/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ async fn metadata_parser_invoked_before_deserialize() {
let serializer = CountingSerializer(counter.clone());
let app = mock_wireframe_app_with_serializer(serializer);

let env = Envelope::new(1, vec![42]);
let env = Envelope::new(1, 0, vec![42]);

let out = drive_with_bincode(app, env)
.await
Expand Down Expand Up @@ -105,7 +105,7 @@ async fn falls_back_to_deserialize_after_parse_error() {
let serializer = FallbackSerializer(parse_calls.clone(), deser_calls.clone());
let app = mock_wireframe_app_with_serializer(serializer);

let env = Envelope::new(1, vec![7]);
let env = Envelope::new(1, 0, vec![7]);

let out = drive_with_bincode(app, env)
.await
Expand Down
2 changes: 1 addition & 1 deletion tests/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async fn middleware_modifies_request_and_response() {
let mw = ModifyMiddleware;
let wrapped = mw.transform(service).await;

let request = ServiceRequest::new(vec![1, 2, 3]);
let request = ServiceRequest::new(vec![1, 2, 3], 0);
let response = wrapped.call(request).await.expect("middleware call failed");
assert_eq!(response.frame(), &[1, 2, 3, b'!', b'?']);
Comment thread
leynos marked this conversation as resolved.
}
2 changes: 1 addition & 1 deletion tests/middleware_order.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ async fn middleware_applied_in_reverse_order() {

let (mut client, server) = duplex(256);

let env = Envelope::new(1, vec![b'X']);
let env = Envelope::new(1, 7, vec![b'X']);
let serializer = BincodeSerializer;
let bytes = serializer.serialize(&env).expect("serialization failed");
// Use the default 4-byte big-endian length prefix for framing
Expand Down
Loading
Loading