From a39e0b53ac00ec39a4da7111cee08baa3e0258f0 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sat, 27 Sep 2025 11:39:31 +0100 Subject: [PATCH 01/11] Stamp multi-packet frames with correlation ids --- ...i-packet-and-streaming-responses-design.md | 4 + docs/roadmap.md | 8 +- src/app/envelope.rs | 10 ++- src/connection.rs | 78 +++++++++++++++++-- src/correlation.rs | 15 ++++ src/lib.rs | 2 + tests/correlation_id.rs | 30 +++++++ tests/features/correlation_id.feature | 5 ++ tests/steps/correlation_steps.rs | 3 + tests/world.rs | 28 ++++++- 10 files changed, 172 insertions(+), 11 deletions(-) create mode 100644 src/correlation.rs diff --git a/docs/multi-packet-and-streaming-responses-design.md b/docs/multi-packet-and-streaming-responses-design.md index 9cbf79f7..00914891 100644 --- a/docs/multi-packet-and-streaming-responses-design.md +++ b/docs/multi-packet-and-streaming-responses-design.md @@ -275,6 +275,10 @@ not hang. channel and stamps it onto every serialised frame. This preserves protocol invariants without requiring handlers to mutate frames post-creation and mirrors the message attribution strategy outlined in the capability roadmap. + - Implementation stores the expected identifier alongside a closure built + from the new `CorrelatableFrame` trait, ensuring frames can be stamped in a + generic actor without constraining other protocols. Debug builds assert the + stamped frame exposes the expected identifier so regressions fail fast. Debug-mode assertions must guard this stamping by checking `frame.correlation_id == request.correlation_id` before a frame is dispatched. diff --git a/docs/roadmap.md b/docs/roadmap.md index 15ca7d84..7550d8bb 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -207,13 +207,13 @@ stream. - [x] Emit tracing and metrics for each forwarded frame so streaming traffic remains visible to observability pipelines. - - [ ] Each sent frame must carry the correct `correlation_id` from the + - [x] Each sent frame must carry the correct `correlation_id` from the initial request. - - [ ] Capture the originating request's `correlation_id` before handing + - [x] Capture the originating request's `correlation_id` before handing control to the multi-packet dispatcher. - - [ ] Stamp the stored `correlation_id` onto every frame emitted from the + - [x] Stamp the stored `correlation_id` onto every frame emitted from the channel before it is queued for transmission. - - [ ] Guard against accidental omission by asserting in debug builds and + - [x] Guard against accidental omission by asserting in debug builds and covering the behaviour with targeted tests. - [ ] When the channel closes, send the end-of-stream marker frame. diff --git a/src/app/envelope.rs b/src/app/envelope.rs index 61931dda..d229fd58 100644 --- a/src/app/envelope.rs +++ b/src/app/envelope.rs @@ -6,7 +6,7 @@ //! deserialisation. See [`crate::app::builder::WireframeApp`] for how envelopes //! are used when registering routes. -use crate::message::Message; +use crate::{correlation::CorrelatableFrame, message::Message}; /// Envelope-like type used to wrap incoming and outgoing messages. /// @@ -101,6 +101,14 @@ impl Packet for Envelope { fn from_parts(parts: PacketParts) -> Self { parts.into() } } +impl CorrelatableFrame for Envelope { + fn correlation_id(&self) -> Option { self.correlation_id } + + fn set_correlation_id(&mut self, correlation_id: Option) { + self.correlation_id = correlation_id; + } +} + impl PacketParts { /// Construct a new set of packet parts. #[must_use] diff --git a/src/connection.rs b/src/connection.rs index 3026d3e9..3417db79 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -46,6 +46,7 @@ impl Drop for ActiveConnection { pub fn active_connection_count() -> u64 { ACTIVE_CONNECTIONS.load(Ordering::Relaxed) } use crate::{ + correlation::CorrelatableFrame, fairness::FairnessTracker, hooks::{ConnectionContext, ProtocolHooks}, push::{FrameLike, PushHandle, PushQueues}, @@ -121,6 +122,8 @@ pub struct ConnectionActor { /// This preserves fairness with queued sources. /// The actor emits the protocol terminator when the sender closes the channel. multi_packet: Option>, + multi_packet_stamper: Option>, + multi_packet_correlation: Option, shutdown: CancellationToken, counter: Option, hooks: ProtocolHooks, @@ -144,6 +147,9 @@ enum QueueKind { Multi, } +/// Closure type used to stamp correlation identifiers onto frames. +type MultiPacketStamper = Box; + impl ConnectionActor where F: FrameLike, @@ -198,6 +204,8 @@ where low_rx: Some(queues.low_priority_rx), response, multi_packet: None, + multi_packet_stamper: None, + multi_packet_correlation: None, shutdown, counter: Some(counter), hooks, @@ -239,7 +247,62 @@ where "response stream is active" ), ); + self.install_multi_packet(channel, None, None); + } + + /// Set or replace the current multi-packet response channel and stamp correlation identifiers. + pub fn set_multi_packet_with_correlation( + &mut self, + channel: Option>, + correlation_id: Option, + ) where + F: CorrelatableFrame, + { + debug_assert!( + self.response.is_none(), + concat!( + "ConnectionActor invariant violated: cannot set multi_packet while a ", + "response stream is active" + ), + ); + let expected = correlation_id; + let stamper: Box = Box::new(move |frame: &mut F| { + frame.set_correlation_id(expected); + debug_assert_eq!( + frame.correlation_id(), + expected, + "multi-packet frame correlation mismatch: expected={:?}, got={:?}", + expected, + frame.correlation_id() + ); + }); + self.install_multi_packet(channel, Some(stamper), correlation_id); + } + + fn install_multi_packet( + &mut self, + channel: Option>, + stamper: Option>, + correlation_id: Option, + ) { self.multi_packet = channel; + self.multi_packet_stamper = stamper; + self.multi_packet_correlation = correlation_id; + } + + fn apply_multi_packet_correlation(&mut self, frame: &mut F) { + if let Some(stamper) = &mut self.multi_packet_stamper { + debug_assert!( + self.multi_packet_correlation.is_some(), + "multi_packet correlation state missing despite stamper" + ); + stamper(frame); + } else { + debug_assert!( + self.multi_packet_correlation.is_none(), + "multi_packet correlation id retained without stamper" + ); + } } /// Replace the low-priority queue used for tests. @@ -309,15 +372,10 @@ where biased; () = Self::await_shutdown(self.shutdown.clone()), if state.is_active() => Event::Shutdown, - res = Self::poll_queue(self.high_rx.as_mut()), if high_available => Event::High(res), - res = Self::poll_queue(self.low_rx.as_mut()), if low_available => Event::Low(res), - res = Self::poll_queue(self.multi_packet.as_mut()), if multi_available => Event::MultiPacket(res), - res = Self::poll_response(self.response.as_mut()), if resp_available => Event::Response(res), - else => Event::Idle, } } @@ -372,6 +430,10 @@ where let DrainContext { out, state } = ctx; match res { Some(frame) => { + let mut frame = frame; + if matches!(kind, QueueKind::Multi) { + self.apply_multi_packet_correlation(&mut frame); + } self.process_frame_with_hooks_and_metrics(frame, out); match kind { QueueKind::High => self.after_high(out, state), @@ -431,6 +493,8 @@ where self.process_frame_with_hooks_and_metrics(frame, out); self.after_low(); } + self.multi_packet_stamper = None; + self.multi_packet_correlation = None; self.hooks.on_command_end(&mut self.ctx); } @@ -483,6 +547,8 @@ where if let Some(mut rx) = self.multi_packet.take() { rx.close(); state.mark_closed(); + self.multi_packet_stamper = None; + self.multi_packet_correlation = None; } if self.response.take().is_some() { state.mark_closed(); @@ -558,6 +624,8 @@ where match rx.try_recv() { Ok(frame) => { + let mut frame = frame; + self.apply_multi_packet_correlation(&mut frame); self.process_frame_with_hooks_and_metrics(frame, out); self.after_low(); self.multi_packet = Some(rx); diff --git a/src/correlation.rs b/src/correlation.rs new file mode 100644 index 00000000..84831378 --- /dev/null +++ b/src/correlation.rs @@ -0,0 +1,15 @@ +//! Traits for working with correlation identifiers on frames. +//! +//! `CorrelatableFrame` abstracts over frame types that carry an optional +//! correlation identifier, allowing generic components such as the connection +//! actor to stamp or inspect identifiers without knowing the concrete frame +//! representation. + +/// Access and mutate correlation identifiers on frames. +pub trait CorrelatableFrame { + /// Return the correlation identifier associated with this frame, if any. + fn correlation_id(&self) -> Option; + + /// Set or clear the correlation identifier. + fn set_correlation_id(&mut self, correlation_id: Option); +} diff --git a/src/lib.rs b/src/lib.rs index 3845c5e0..44cca9ea 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,6 +11,7 @@ pub use app::error::Result; pub mod serializer; pub use serializer::{BincodeSerializer, Serializer}; pub mod connection; +pub mod correlation; pub mod extractor; mod fairness; pub mod frame; @@ -28,6 +29,7 @@ pub mod server; pub mod session; pub use connection::ConnectionActor; +pub use correlation::CorrelatableFrame; pub use hooks::{ConnectionContext, ProtocolHooks, WireframeProtocol}; pub use metrics::{CONNECTIONS_ACTIVE, Direction, ERRORS_TOTAL, FRAMES_PROCESSED}; pub use response::{FrameStream, Response, WireframeError}; diff --git a/tests/correlation_id.rs b/tests/correlation_id.rs index 586d8434..2ef49177 100644 --- a/tests/correlation_id.rs +++ b/tests/correlation_id.rs @@ -1,6 +1,7 @@ #![cfg(not(loom))] //! Tests for `correlation_id` propagation in streaming responses. use async_stream::try_stream; +use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; use wireframe::{ app::{Envelope, Packet}, @@ -28,3 +29,32 @@ async fn stream_frames_carry_request_correlation_id() { actor.run(&mut out).await.expect("actor run failed"); assert!(out.iter().all(|e| e.correlation_id() == Some(cid))); } + +#[tokio::test] +async fn multi_packet_frames_carry_request_correlation_id() { + let cid = 7u64; + let (tx, rx) = mpsc::channel(4); + tx.send(Envelope::new(1, None, vec![1])) + .await + .expect("send frame"); + tx.send(Envelope::new(1, Some(99), vec![2])) + .await + .expect("send frame"); + drop(tx); + + let (queues, handle) = PushQueues::::builder() + .high_capacity(2) + .low_capacity(2) + .unlimited() + .build() + .expect("failed to build PushQueues"); + let shutdown = CancellationToken::new(); + let mut actor: ConnectionActor = + ConnectionActor::new(queues, handle, None, shutdown); + actor.set_multi_packet_with_correlation(Some(rx), Some(cid)); + + let mut out = Vec::new(); + actor.run(&mut out).await.expect("actor run failed"); + + assert!(out.iter().all(|frame| frame.correlation_id() == Some(cid))); +} diff --git a/tests/features/correlation_id.feature b/tests/features/correlation_id.feature index bdb56b19..4dc6b60e 100644 --- a/tests/features/correlation_id.feature +++ b/tests/features/correlation_id.feature @@ -3,3 +3,8 @@ Feature: Multi-packet response correlation Given a correlation id 7 When a stream of frames is processed Then each emitted frame uses correlation id 7 + + Scenario: Multi-packet responses reuse the request correlation id + Given a correlation id 11 + When a multi-packet channel emits frames + Then each emitted frame uses correlation id 11 diff --git a/tests/steps/correlation_steps.rs b/tests/steps/correlation_steps.rs index 95b41c80..6a30d09f 100644 --- a/tests/steps/correlation_steps.rs +++ b/tests/steps/correlation_steps.rs @@ -9,6 +9,9 @@ fn given_cid(world: &mut CorrelationWorld, id: u64) { world.set_cid(id); } #[when("a stream of frames is processed")] async fn when_process(world: &mut CorrelationWorld) { world.process().await; } +#[when("a multi-packet channel emits frames")] +async fn when_process_multi(world: &mut CorrelationWorld) { world.process_multi().await; } + #[then(expr = "each emitted frame uses correlation id {int}")] fn then_verify(world: &mut CorrelationWorld, id: u64) { assert_eq!(world.cid(), id); diff --git a/tests/world.rs b/tests/world.rs index 8520dd84..01774c6a 100644 --- a/tests/world.rs +++ b/tests/world.rs @@ -9,7 +9,10 @@ use std::{net::SocketAddr, sync::Arc}; use async_stream::try_stream; use cucumber::World; -use tokio::{net::TcpStream, sync::oneshot}; +use tokio::{ + net::TcpStream, + sync::{mpsc, oneshot}, +}; use tokio_util::sync::CancellationToken; use wireframe::{ app::{Envelope, Packet}, @@ -163,6 +166,29 @@ impl CorrelationWorld { actor.run(&mut self.frames).await.expect("actor run failed"); } + /// Run the connection actor for a multi-packet channel and collect frames. + /// + /// # Panics + /// Panics if sending to the channel or running the actor fails. + pub async fn process_multi(&mut self) { + let cid = self.cid; + let (tx, rx) = mpsc::channel(4); + tx.send(Envelope::new(1, None, vec![1])) + .await + .expect("send frame"); + tx.send(Envelope::new(1, Some(99), vec![2])) + .await + .expect("send frame"); + drop(tx); + + let (queues, handle) = build_small_queues::(); + let shutdown = CancellationToken::new(); + let mut actor: ConnectionActor = + ConnectionActor::new(queues, handle, None, shutdown); + actor.set_multi_packet_with_correlation(Some(rx), Some(cid)); + actor.run(&mut self.frames).await.expect("actor run failed"); + } + /// Verify that all received frames carry the expected correlation ID. /// /// # Panics From 372af8db7b8681f1240ce73e234bc92abfb2aefe Mon Sep 17 00:00:00 2001 From: Leynos Date: Sat, 27 Sep 2025 12:06:50 +0100 Subject: [PATCH 02/11] Fix correlation design bullet --- docs/multi-packet-and-streaming-responses-design.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/multi-packet-and-streaming-responses-design.md b/docs/multi-packet-and-streaming-responses-design.md index 00914891..5dbc8b5c 100644 --- a/docs/multi-packet-and-streaming-responses-design.md +++ b/docs/multi-packet-and-streaming-responses-design.md @@ -275,10 +275,10 @@ not hang. channel and stamps it onto every serialised frame. This preserves protocol invariants without requiring handlers to mutate frames post-creation and mirrors the message attribution strategy outlined in the capability roadmap. - - Implementation stores the expected identifier alongside a closure built - from the new `CorrelatableFrame` trait, ensuring frames can be stamped in a - generic actor without constraining other protocols. Debug builds assert the - stamped frame exposes the expected identifier so regressions fail fast. +- Implementation stores the expected identifier alongside a closure built from + the new `CorrelatableFrame` trait, ensuring frames can be stamped in a + generic actor without constraining other protocols. Debug builds assert the + stamped frame exposes the expected identifier so regressions fail fast. Debug-mode assertions must guard this stamping by checking `frame.correlation_id == request.correlation_id` before a frame is dispatched. From 6f34527679e2d11eb288dbefa3202d21ae487127 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sat, 27 Sep 2025 13:58:49 +0100 Subject: [PATCH 03/11] Simplify multi-packet correlation stamping * drop the boxed stamper in favour of storing the correlation id and stamping frames directly\n* centralise the multi-packet teardown logic via clear_multi_packet and ensure shutdown clears correlation state\n* implement CorrelatableFrame for u8 and Vec so ConnectionActor can require the trait --- src/connection.rs | 63 +++++++++++++++++++++------------------------- src/correlation.rs | 12 +++++++++ 2 files changed, 40 insertions(+), 35 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 3417db79..8dc704c1 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -122,7 +122,6 @@ pub struct ConnectionActor { /// This preserves fairness with queued sources. /// The actor emits the protocol terminator when the sender closes the channel. multi_packet: Option>, - multi_packet_stamper: Option>, multi_packet_correlation: Option, shutdown: CancellationToken, counter: Option, @@ -147,12 +146,10 @@ enum QueueKind { Multi, } -/// Closure type used to stamp correlation identifiers onto frames. -type MultiPacketStamper = Box; impl ConnectionActor where - F: FrameLike, + F: FrameLike + CorrelatableFrame, E: std::fmt::Debug, { /// Create a new `ConnectionActor` from the provided components. @@ -204,7 +201,6 @@ where low_rx: Some(queues.low_priority_rx), response, multi_packet: None, - multi_packet_stamper: None, multi_packet_correlation: None, shutdown, counter: Some(counter), @@ -247,7 +243,7 @@ where "response stream is active" ), ); - self.install_multi_packet(channel, None, None); + self.install_multi_packet(channel, None); } /// Set or replace the current multi-packet response channel and stamp correlation identifiers. @@ -255,9 +251,7 @@ where &mut self, channel: Option>, correlation_id: Option, - ) where - F: CorrelatableFrame, - { + ) { debug_assert!( self.response.is_none(), concat!( @@ -265,42 +259,43 @@ where "response stream is active" ), ); - let expected = correlation_id; - let stamper: Box = Box::new(move |frame: &mut F| { - frame.set_correlation_id(expected); - debug_assert_eq!( - frame.correlation_id(), - expected, - "multi-packet frame correlation mismatch: expected={:?}, got={:?}", - expected, - frame.correlation_id() - ); - }); - self.install_multi_packet(channel, Some(stamper), correlation_id); + self.install_multi_packet(channel, correlation_id); } fn install_multi_packet( &mut self, channel: Option>, - stamper: Option>, correlation_id: Option, ) { self.multi_packet = channel; - self.multi_packet_stamper = stamper; - self.multi_packet_correlation = correlation_id; + self.multi_packet_correlation = if self.multi_packet.is_some() { + correlation_id + } else { + None + }; + } + + fn clear_multi_packet(&mut self) { + self.multi_packet = None; + self.multi_packet_correlation = None; } fn apply_multi_packet_correlation(&mut self, frame: &mut F) { - if let Some(stamper) = &mut self.multi_packet_stamper { - debug_assert!( - self.multi_packet_correlation.is_some(), - "multi_packet correlation state missing despite stamper" + if let Some(expected) = self.multi_packet_correlation { + frame.set_correlation_id(Some(expected)); + debug_assert_eq!( + frame.correlation_id(), + Some(expected), + "multi-packet frame correlation mismatch: expected={:?}, got={:?}", + Some(expected), + frame.correlation_id(), ); - stamper(frame); } else { + frame.set_correlation_id(None); debug_assert!( - self.multi_packet_correlation.is_none(), - "multi_packet correlation id retained without stamper" + frame.correlation_id().is_none(), + "multi-packet frame correlation unexpectedly present: got={:?}", + frame.correlation_id(), ); } } @@ -493,8 +488,7 @@ where self.process_frame_with_hooks_and_metrics(frame, out); self.after_low(); } - self.multi_packet_stamper = None; - self.multi_packet_correlation = None; + self.clear_multi_packet(); self.hooks.on_command_end(&mut self.ctx); } @@ -547,8 +541,7 @@ where if let Some(mut rx) = self.multi_packet.take() { rx.close(); state.mark_closed(); - self.multi_packet_stamper = None; - self.multi_packet_correlation = None; + self.clear_multi_packet(); } if self.response.take().is_some() { state.mark_closed(); diff --git a/src/correlation.rs b/src/correlation.rs index 84831378..b38e0f3b 100644 --- a/src/correlation.rs +++ b/src/correlation.rs @@ -13,3 +13,15 @@ pub trait CorrelatableFrame { /// Set or clear the correlation identifier. fn set_correlation_id(&mut self, correlation_id: Option); } + +impl CorrelatableFrame for u8 { + fn correlation_id(&self) -> Option { None } + + fn set_correlation_id(&mut self, _correlation_id: Option) {} +} + +impl CorrelatableFrame for Vec { + fn correlation_id(&self) -> Option { None } + + fn set_correlation_id(&mut self, _correlation_id: Option) {} +} From 85a80d14aaeed9d7850797b19e36362c36c403d5 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sat, 27 Sep 2025 13:58:54 +0100 Subject: [PATCH 04/11] Add correlation stamping tests - cover multi-packet stamping with rstest cases including absent correlations - extend cucumber scenarios and world state to support missing correlation ids - add CorrelatableFrame unit tests for Envelope and no-op types --- src/correlation.rs | 36 ++++++++++++++++++++ tests/correlation_id.rs | 48 +++++++++++++++++++-------- tests/features/correlation_id.feature | 5 +++ tests/steps/correlation_steps.rs | 13 ++++++-- tests/world.rs | 31 +++++++++-------- 5 files changed, 105 insertions(+), 28 deletions(-) diff --git a/src/correlation.rs b/src/correlation.rs index b38e0f3b..479adb97 100644 --- a/src/correlation.rs +++ b/src/correlation.rs @@ -25,3 +25,39 @@ impl CorrelatableFrame for Vec { fn set_correlation_id(&mut self, _correlation_id: Option) {} } + +#[cfg(test)] +mod tests { + use rstest::rstest; + + use super::*; + use crate::app::Envelope; + + #[rstest] + #[case(None)] + #[case(Some(27))] + fn envelope_correlation_round_trip(#[case] initial: Option) { + let mut frame = Envelope::new(7, initial, vec![1, 2, 3]); + assert_eq!(frame.correlation_id(), initial); + + frame.set_correlation_id(Some(99)); + assert_eq!(frame.correlation_id(), Some(99)); + + frame.set_correlation_id(None); + assert_eq!(frame.correlation_id(), None); + } + + #[rstest] + #[case::byte(0u8)] + #[case::buffer(Vec::::new())] + fn noop_implementations_ignore_correlation(#[case] mut frame: T) + where + T: CorrelatableFrame, + { + assert_eq!(frame.correlation_id(), None); + frame.set_correlation_id(Some(42)); + assert_eq!(frame.correlation_id(), None); + frame.set_correlation_id(None); + assert_eq!(frame.correlation_id(), None); + } +} diff --git a/tests/correlation_id.rs b/tests/correlation_id.rs index 2ef49177..420fbc05 100644 --- a/tests/correlation_id.rs +++ b/tests/correlation_id.rs @@ -1,10 +1,12 @@ #![cfg(not(loom))] //! Tests for `correlation_id` propagation in streaming responses. use async_stream::try_stream; +use rstest::rstest; use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; use wireframe::{ - app::{Envelope, Packet}, + CorrelatableFrame, + app::Envelope, connection::ConnectionActor, push::PushQueues, response::FrameStream, @@ -30,16 +32,19 @@ async fn stream_frames_carry_request_correlation_id() { assert!(out.iter().all(|e| e.correlation_id() == Some(cid))); } -#[tokio::test] -async fn multi_packet_frames_carry_request_correlation_id() { - let cid = 7u64; - let (tx, rx) = mpsc::channel(4); - tx.send(Envelope::new(1, None, vec![1])) - .await - .expect("send frame"); - tx.send(Envelope::new(1, Some(99), vec![2])) - .await - .expect("send frame"); +async fn run_multi_packet_channel( + request_correlation: Option, + frame_correlations: &[Option], +) -> Vec { + let capacity = frame_correlations.len().max(1); + let (tx, rx) = mpsc::channel(capacity); + for (idx, correlation) in frame_correlations.iter().enumerate() { + let marker = u8::try_from(idx).expect("frame index fits in u8") + 1; + let payload = vec![marker]; + tx.send(Envelope::new(1, *correlation, payload)) + .await + .expect("send frame"); + } drop(tx); let (queues, handle) = PushQueues::::builder() @@ -51,10 +56,27 @@ async fn multi_packet_frames_carry_request_correlation_id() { let shutdown = CancellationToken::new(); let mut actor: ConnectionActor = ConnectionActor::new(queues, handle, None, shutdown); - actor.set_multi_packet_with_correlation(Some(rx), Some(cid)); + actor.set_multi_packet_with_correlation(Some(rx), request_correlation); let mut out = Vec::new(); actor.run(&mut out).await.expect("actor run failed"); + out +} - assert!(out.iter().all(|frame| frame.correlation_id() == Some(cid))); +#[rstest] +#[case::stamps_request(Some(7), vec![None, Some(99)], vec![Some(7), Some(7)])] +#[case::clears_when_absent(None, vec![None, Some(13)], vec![None, None])] +#[case::preserves_matching(Some(17), vec![Some(17), Some(17)], vec![Some(17), Some(17)])] +#[tokio::test] +async fn multi_packet_frames_apply_expected_correlation( + #[case] request: Option, + #[case] initial: Vec>, + #[case] expected: Vec>, +) { + let frames = run_multi_packet_channel(request, &initial).await; + let correlations: Vec> = frames + .iter() + .map(CorrelatableFrame::correlation_id) + .collect(); + assert_eq!(correlations, expected); } diff --git a/tests/features/correlation_id.feature b/tests/features/correlation_id.feature index 4dc6b60e..d14a2938 100644 --- a/tests/features/correlation_id.feature +++ b/tests/features/correlation_id.feature @@ -8,3 +8,8 @@ Feature: Multi-packet response correlation Given a correlation id 11 When a multi-packet channel emits frames Then each emitted frame uses correlation id 11 + + Scenario: Multi-packet responses clear correlation ids without a request id + Given no correlation id + When a multi-packet channel emits frames + Then each emitted frame has no correlation id diff --git a/tests/steps/correlation_steps.rs b/tests/steps/correlation_steps.rs index 6a30d09f..bd58da0a 100644 --- a/tests/steps/correlation_steps.rs +++ b/tests/steps/correlation_steps.rs @@ -4,7 +4,10 @@ use cucumber::{given, then, when}; use crate::world::CorrelationWorld; #[given(expr = "a correlation id {int}")] -fn given_cid(world: &mut CorrelationWorld, id: u64) { world.set_cid(id); } +fn given_cid(world: &mut CorrelationWorld, id: u64) { world.set_expected(Some(id)); } + +#[given("no correlation id")] +fn given_no_correlation(world: &mut CorrelationWorld) { world.set_expected(None); } #[when("a stream of frames is processed")] async fn when_process(world: &mut CorrelationWorld) { world.process().await; } @@ -14,6 +17,12 @@ async fn when_process_multi(world: &mut CorrelationWorld) { world.process_multi( #[then(expr = "each emitted frame uses correlation id {int}")] fn then_verify(world: &mut CorrelationWorld, id: u64) { - assert_eq!(world.cid(), id); + assert_eq!(world.expected(), Some(id)); + world.verify(); +} + +#[then("each emitted frame has no correlation id")] +fn then_verify_absent(world: &mut CorrelationWorld) { + assert_eq!(world.expected(), None); world.verify(); } diff --git a/tests/world.rs b/tests/world.rs index 01774c6a..dd5c2c5b 100644 --- a/tests/world.rs +++ b/tests/world.rs @@ -140,22 +140,24 @@ impl PanicWorld { #[derive(Debug, Default, World)] pub struct CorrelationWorld { - cid: u64, + expected: Option, frames: Vec, } impl CorrelationWorld { - pub fn set_cid(&mut self, cid: u64) { self.cid = cid; } + pub fn set_expected(&mut self, expected: Option) { self.expected = expected; } #[must_use] - pub fn cid(&self) -> u64 { self.cid } + pub fn expected(&self) -> Option { self.expected } /// Run the connection actor and collect frames for later verification. /// /// # Panics /// Panics if the actor fails to run successfully. pub async fn process(&mut self) { - let cid = self.cid; + let cid = self + .expected + .expect("streaming scenario requires a correlation id"); let stream: FrameStream = Box::pin(try_stream! { yield Envelope::new(1, Some(cid), vec![1]); yield Envelope::new(1, Some(cid), vec![2]); @@ -171,7 +173,7 @@ impl CorrelationWorld { /// # Panics /// Panics if sending to the channel or running the actor fails. pub async fn process_multi(&mut self) { - let cid = self.cid; + let expected = self.expected; let (tx, rx) = mpsc::channel(4); tx.send(Envelope::new(1, None, vec![1])) .await @@ -185,20 +187,23 @@ impl CorrelationWorld { let shutdown = CancellationToken::new(); let mut actor: ConnectionActor = ConnectionActor::new(queues, handle, None, shutdown); - actor.set_multi_packet_with_correlation(Some(rx), Some(cid)); + actor.set_multi_packet_with_correlation(Some(rx), expected); actor.run(&mut self.frames).await.expect("actor run failed"); } - /// Verify that all received frames carry the expected correlation ID. + /// Verify that all received frames respect the configured correlation expectation. /// /// # Panics - /// Panics if any frame has a `correlation_id` that does not match `self.cid`. + /// Panics if any frame violates the stored correlation expectation. pub fn verify(&self) { - assert!( - self.frames - .iter() - .all(|f| f.correlation_id() == Some(self.cid)) - ); + match self.expected { + Some(cid) => { + assert!(self.frames.iter().all(|f| f.correlation_id() == Some(cid))); + } + None => { + assert!(self.frames.iter().all(|f| f.correlation_id().is_none())); + } + } } } From 21b98831ee2745d103f186fee57f54f2d835d6ac Mon Sep 17 00:00:00 2001 From: Leynos Date: Sat, 27 Sep 2025 13:58:58 +0100 Subject: [PATCH 05/11] Guard multi-packet stamping state Track whether multi-packet correlation stamping is active by storing an outer option, only applying stamps when configured, and documenting the Clippy allowances required by the review guidance. --- src/connection.rs | 69 ++++++++++++++++++++++++++++------------------- 1 file changed, 42 insertions(+), 27 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 8dc704c1..8499d8e5 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -122,7 +122,9 @@ pub struct ConnectionActor { /// This preserves fairness with queued sources. /// The actor emits the protocol terminator when the sender closes the channel. multi_packet: Option>, - multi_packet_correlation: Option, + #[allow(clippy::option_option)] + // Required to distinguish stamping activation from the stored identifier. + multi_packet_correlation: Option>, shutdown: CancellationToken, counter: Option, hooks: ProtocolHooks, @@ -146,7 +148,6 @@ enum QueueKind { Multi, } - impl ConnectionActor where F: FrameLike + CorrelatableFrame, @@ -243,7 +244,8 @@ where "response stream is active" ), ); - self.install_multi_packet(channel, None); + let correlation = channel.as_ref().map(|_| None); + self.install_multi_packet(channel, correlation); } /// Set or replace the current multi-packet response channel and stamp correlation identifiers. @@ -259,20 +261,23 @@ where "response stream is active" ), ); - self.install_multi_packet(channel, correlation_id); + let correlation = channel.as_ref().map(|_| correlation_id); + self.install_multi_packet(channel, correlation); } + #[allow(clippy::option_option)] // Parameter retains explicit stamping activation state. fn install_multi_packet( &mut self, channel: Option>, - correlation_id: Option, + correlation_id: Option>, ) { + debug_assert_eq!( + channel.is_some(), + correlation_id.is_some(), + "multi-packet correlation must be provided when the channel is active", + ); self.multi_packet = channel; - self.multi_packet_correlation = if self.multi_packet.is_some() { - correlation_id - } else { - None - }; + self.multi_packet_correlation = correlation_id; } fn clear_multi_packet(&mut self) { @@ -281,22 +286,32 @@ where } fn apply_multi_packet_correlation(&mut self, frame: &mut F) { - if let Some(expected) = self.multi_packet_correlation { - frame.set_correlation_id(Some(expected)); - debug_assert_eq!( - frame.correlation_id(), - Some(expected), - "multi-packet frame correlation mismatch: expected={:?}, got={:?}", - Some(expected), - frame.correlation_id(), - ); - } else { - frame.set_correlation_id(None); - debug_assert!( - frame.correlation_id().is_none(), - "multi-packet frame correlation unexpectedly present: got={:?}", - frame.correlation_id(), - ); + match self.multi_packet_correlation { + Some(Some(expected)) => { + frame.set_correlation_id(Some(expected)); + debug_assert_eq!( + frame.correlation_id(), + Some(expected), + "multi-packet frame correlation mismatch: expected={:?}, got={:?}", + Some(expected), + frame.correlation_id(), + ); + } + Some(None) => { + frame.set_correlation_id(None); + debug_assert!( + frame.correlation_id().is_none(), + "multi-packet frame correlation unexpectedly present: got={:?}", + frame.correlation_id(), + ); + } + None => { + debug_assert!( + false, + "multi-packet correlation invoked without configuration", + ); + frame.set_correlation_id(None); + } } } @@ -426,7 +441,7 @@ where match res { Some(frame) => { let mut frame = frame; - if matches!(kind, QueueKind::Multi) { + if matches!(kind, QueueKind::Multi) && self.multi_packet_correlation.is_some() { self.apply_multi_packet_correlation(&mut frame); } self.process_frame_with_hooks_and_metrics(frame, out); From 4171794a596ee5fdf0081bec3ebbeac7b0dc2409 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sat, 27 Sep 2025 14:02:58 +0100 Subject: [PATCH 06/11] Stamp multi-packet terminators with correlation Apply the configured correlation id to hook-provided stream end frames before they are emitted so multi-packet pipelines keep the request context on terminators. Extend the async tests to cover stamped and cleared terminators by wiring a custom hook through the helper. --- src/connection.rs | 5 ++++- tests/correlation_id.rs | 27 +++++++++++++++++++++++++-- 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 8499d8e5..2a298d8a 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -499,7 +499,10 @@ where receiver.close(); } state.mark_closed(); - if let Some(frame) = self.hooks.stream_end_frame(&mut self.ctx) { + if let Some(mut frame) = self.hooks.stream_end_frame(&mut self.ctx) { + if self.multi_packet_correlation.is_some() { + self.apply_multi_packet_correlation(&mut frame); + } self.process_frame_with_hooks_and_metrics(frame, out); self.after_low(); } diff --git a/tests/correlation_id.rs b/tests/correlation_id.rs index 420fbc05..ecfa8976 100644 --- a/tests/correlation_id.rs +++ b/tests/correlation_id.rs @@ -8,6 +8,7 @@ use wireframe::{ CorrelatableFrame, app::Envelope, connection::ConnectionActor, + hooks::{ConnectionContext, ProtocolHooks}, push::PushQueues, response::FrameStream, }; @@ -35,6 +36,7 @@ async fn stream_frames_carry_request_correlation_id() { async fn run_multi_packet_channel( request_correlation: Option, frame_correlations: &[Option], + hooks: ProtocolHooks, ) -> Vec { let capacity = frame_correlations.len().max(1); let (tx, rx) = mpsc::channel(capacity); @@ -55,7 +57,7 @@ async fn run_multi_packet_channel( .expect("failed to build PushQueues"); let shutdown = CancellationToken::new(); let mut actor: ConnectionActor = - ConnectionActor::new(queues, handle, None, shutdown); + ConnectionActor::with_hooks(queues, handle, None, shutdown, hooks); actor.set_multi_packet_with_correlation(Some(rx), request_correlation); let mut out = Vec::new(); @@ -73,10 +75,31 @@ async fn multi_packet_frames_apply_expected_correlation( #[case] initial: Vec>, #[case] expected: Vec>, ) { - let frames = run_multi_packet_channel(request, &initial).await; + let frames = run_multi_packet_channel(request, &initial, ProtocolHooks::default()).await; let correlations: Vec> = frames .iter() .map(CorrelatableFrame::correlation_id) .collect(); assert_eq!(correlations, expected); } + +#[rstest] +#[case::terminator_stamped(Some(11), Some(11))] +#[case::terminator_cleared(None, None)] +#[tokio::test] +async fn multi_packet_terminator_applies_correlation( + #[case] request: Option, + #[case] expected: Option, +) { + let hooks = ProtocolHooks { + stream_end: Some(Box::new(|_ctx: &mut ConnectionContext| { + Some(Envelope::new(255, None, vec![])) + })), + ..ProtocolHooks::default() + }; + + let frames = run_multi_packet_channel(request, &[], hooks).await; + assert_eq!(frames.len(), 1, "terminator frame missing"); + let terminator = frames.last().expect("terminator frame missing"); + assert_eq!(terminator.correlation_id(), expected); +} From 909089baad543d84fe66723cd64a01fa54e03142 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sat, 27 Sep 2025 21:33:06 +0100 Subject: [PATCH 07/11] Refactor multi-packet context handling --- src/connection.rs | 110 +++++++++++++++++++-------------- src/connection/test_support.rs | 2 +- 2 files changed, 65 insertions(+), 47 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 2a298d8a..a94f24e5 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -121,10 +121,7 @@ pub struct ConnectionActor { /// Optional multi-packet channel drained after low-priority frames. /// This preserves fairness with queued sources. /// The actor emits the protocol terminator when the sender closes the channel. - multi_packet: Option>, - #[allow(clippy::option_option)] - // Required to distinguish stamping activation from the stored identifier. - multi_packet_correlation: Option>, + multi_packet: MultiPacketContext, shutdown: CancellationToken, counter: Option, hooks: ProtocolHooks, @@ -140,6 +137,48 @@ struct DrainContext<'a, F> { state: &'a mut ActorState, } +/// Multi-packet channel state tracking the active receiver and stamping config. +struct MultiPacketContext { + channel: Option>, + #[allow(clippy::option_option)] + /// Active stamping configuration. `None` disables stamping entirely. + correlation: Option>, +} + +impl MultiPacketContext { + const fn new() -> Self { + Self { + channel: None, + correlation: None, + } + } + + #[allow(clippy::option_option)] + fn install(&mut self, channel: Option>, correlation: Option>) { + debug_assert_eq!( + channel.is_some(), + correlation.is_some(), + "multi-packet correlation must be provided when the channel is active", + ); + self.channel = channel; + self.correlation = correlation; + } + + fn clear(&mut self) { + self.channel = None; + self.correlation = None; + } + + fn channel_mut(&mut self) -> Option<&mut mpsc::Receiver> { self.channel.as_mut() } + + fn take_channel(&mut self) -> Option> { self.channel.take() } + + #[allow(clippy::option_option)] + fn correlation(&self) -> Option> { self.correlation } + + fn is_active(&self) -> bool { self.channel.is_some() } +} + /// Queue variants processed by the connection actor. #[derive(Clone, Copy)] enum QueueKind { @@ -201,8 +240,7 @@ where high_rx: Some(queues.high_priority_rx), low_rx: Some(queues.low_priority_rx), response, - multi_packet: None, - multi_packet_correlation: None, + multi_packet: MultiPacketContext::new(), shutdown, counter: Some(counter), hooks, @@ -226,7 +264,7 @@ where /// Set or replace the current streaming response. pub fn set_response(&mut self, stream: Option>) { debug_assert!( - self.multi_packet.is_none(), + !self.multi_packet.is_active(), concat!( "ConnectionActor invariant violated: cannot set response while a ", "multi_packet channel is active" @@ -245,7 +283,7 @@ where ), ); let correlation = channel.as_ref().map(|_| None); - self.install_multi_packet(channel, correlation); + self.multi_packet.install(channel, correlation); } /// Set or replace the current multi-packet response channel and stamp correlation identifiers. @@ -262,31 +300,13 @@ where ), ); let correlation = channel.as_ref().map(|_| correlation_id); - self.install_multi_packet(channel, correlation); - } - - #[allow(clippy::option_option)] // Parameter retains explicit stamping activation state. - fn install_multi_packet( - &mut self, - channel: Option>, - correlation_id: Option>, - ) { - debug_assert_eq!( - channel.is_some(), - correlation_id.is_some(), - "multi-packet correlation must be provided when the channel is active", - ); - self.multi_packet = channel; - self.multi_packet_correlation = correlation_id; + self.multi_packet.install(channel, correlation); } - fn clear_multi_packet(&mut self) { - self.multi_packet = None; - self.multi_packet_correlation = None; - } + fn clear_multi_packet(&mut self) { self.multi_packet.clear(); } fn apply_multi_packet_correlation(&mut self, frame: &mut F) { - match self.multi_packet_correlation { + match self.multi_packet.correlation() { Some(Some(expected)) => { frame.set_correlation_id(Some(expected)); debug_assert_eq!( @@ -344,11 +364,11 @@ where } debug_assert!( - usize::from(self.response.is_some()) + usize::from(self.multi_packet.is_some()) <= 1, + usize::from(self.response.is_some()) + usize::from(self.multi_packet.is_active()) <= 1, "ConnectionActor invariant violated: at most one of response or multi_packet may be \ active" ); - let mut state = ActorState::new(self.response.is_some(), self.multi_packet.is_some()); + let mut state = ActorState::new(self.response.is_some(), self.multi_packet.is_active()); while !state.is_done() { self.poll_sources(&mut state, out).await?; @@ -375,7 +395,7 @@ where async fn next_event(&mut self, state: &ActorState) -> Event { let high_available = self.high_rx.is_some(); let low_available = self.low_rx.is_some(); - let multi_available = self.multi_packet.is_some() && !state.is_shutting_down(); + let multi_available = self.multi_packet.is_active() && !state.is_shutting_down(); let resp_available = self.response.is_some() && !state.is_shutting_down(); tokio::select! { @@ -384,7 +404,7 @@ where () = Self::await_shutdown(self.shutdown.clone()), if state.is_active() => Event::Shutdown, res = Self::poll_queue(self.high_rx.as_mut()), if high_available => Event::High(res), res = Self::poll_queue(self.low_rx.as_mut()), if low_available => Event::Low(res), - res = Self::poll_queue(self.multi_packet.as_mut()), if multi_available => Event::MultiPacket(res), + res = Self::poll_queue(self.multi_packet.channel_mut()), if multi_available => Event::MultiPacket(res), res = Self::poll_response(self.response.as_mut()), if resp_available => Event::Response(res), else => Event::Idle, } @@ -441,7 +461,7 @@ where match res { Some(frame) => { let mut frame = frame; - if matches!(kind, QueueKind::Multi) && self.multi_packet_correlation.is_some() { + if matches!(kind, QueueKind::Multi) && self.multi_packet.correlation().is_some() { self.apply_multi_packet_correlation(&mut frame); } self.process_frame_with_hooks_and_metrics(frame, out); @@ -478,7 +498,7 @@ where /// Handle a closed multi-packet channel by emitting the protocol terminator and notifying /// hooks. fn handle_multi_packet_closed(&mut self, state: &mut ActorState, out: &mut Vec) { - let rx = self.multi_packet.take(); + let rx = self.multi_packet.take_channel(); self.handle_multi_packet_closed_with(rx, state, out); } @@ -500,7 +520,7 @@ where } state.mark_closed(); if let Some(mut frame) = self.hooks.stream_end_frame(&mut self.ctx) { - if self.multi_packet_correlation.is_some() { + if self.multi_packet.correlation().is_some() { self.apply_multi_packet_correlation(&mut frame); } self.process_frame_with_hooks_and_metrics(frame, out); @@ -556,7 +576,7 @@ where if let Some(rx) = &mut self.low_rx { rx.close(); } - if let Some(mut rx) = self.multi_packet.take() { + if let Some(mut rx) = self.multi_packet.take_channel() { rx.close(); state.mark_closed(); self.clear_multi_packet(); @@ -629,25 +649,23 @@ where } } QueueKind::Multi => { - let Some(mut rx) = self.multi_packet.take() else { - return false; + let result = match self.multi_packet.channel_mut() { + Some(rx) => rx.try_recv(), + None => return false, }; - match rx.try_recv() { + match result { Ok(frame) => { let mut frame = frame; self.apply_multi_packet_correlation(&mut frame); self.process_frame_with_hooks_and_metrics(frame, out); self.after_low(); - self.multi_packet = Some(rx); true } - Err(TryRecvError::Empty) => { - self.multi_packet = Some(rx); - false - } + Err(TryRecvError::Empty) => false, Err(TryRecvError::Disconnected) => { - self.handle_multi_packet_closed_with(Some(rx), state, out); + let rx = self.multi_packet.take_channel(); + self.handle_multi_packet_closed_with(rx, state, out); false } } diff --git a/src/connection/test_support.rs b/src/connection/test_support.rs index 8ac198a7..d0485314 100644 --- a/src/connection/test_support.rs +++ b/src/connection/test_support.rs @@ -96,7 +96,7 @@ impl ActorHarness { /// Returns `true` when the multi-packet queue is still available. #[must_use] - pub fn has_multi_queue(&self) -> bool { self.actor.multi_packet.is_some() } + pub fn has_multi_queue(&self) -> bool { self.actor.multi_packet.is_active() } /// Process a multi-packet poll result. pub fn process_multi_packet(&mut self, res: Option) { From 5dfa3730fe7ec8874dd9429c4a2969615fc81799 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 28 Sep 2025 13:54:49 +0100 Subject: [PATCH 08/11] Document multi-packet correlation expectations --- src/connection.rs | 15 ++++++++++++--- tests/correlation_id.rs | 4 ++-- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index a94f24e5..2d10081c 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -140,7 +140,10 @@ struct DrainContext<'a, F> { /// Multi-packet channel state tracking the active receiver and stamping config. struct MultiPacketContext { channel: Option>, - #[allow(clippy::option_option)] + #[expect( + clippy::option_option, + reason = "Nested Option used to distinguish stamping activation from stored identifier" + )] /// Active stamping configuration. `None` disables stamping entirely. correlation: Option>, } @@ -153,7 +156,10 @@ impl MultiPacketContext { } } - #[allow(clippy::option_option)] + #[expect( + clippy::option_option, + reason = "Nested Option used to distinguish stamping activation from stored identifier" + )] fn install(&mut self, channel: Option>, correlation: Option>) { debug_assert_eq!( channel.is_some(), @@ -173,7 +179,10 @@ impl MultiPacketContext { fn take_channel(&mut self) -> Option> { self.channel.take() } - #[allow(clippy::option_option)] + #[expect( + clippy::option_option, + reason = "Nested Option used to distinguish stamping activation from stored identifier" + )] fn correlation(&self) -> Option> { self.correlation } fn is_active(&self) -> bool { self.channel.is_some() } diff --git a/tests/correlation_id.rs b/tests/correlation_id.rs index ecfa8976..b79c86b6 100644 --- a/tests/correlation_id.rs +++ b/tests/correlation_id.rs @@ -41,8 +41,8 @@ async fn run_multi_packet_channel( let capacity = frame_correlations.len().max(1); let (tx, rx) = mpsc::channel(capacity); for (idx, correlation) in frame_correlations.iter().enumerate() { - let marker = u8::try_from(idx).expect("frame index fits in u8") + 1; - let payload = vec![marker]; + let marker = u64::try_from(idx + 1).expect("frame index fits in u64"); + let payload = marker.to_le_bytes().to_vec(); tx.send(Envelope::new(1, *correlation, payload)) .await .expect("send frame"); From 2b83537fa93ffb3053a2ea6115099a66f60b6841 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 28 Sep 2025 14:52:40 +0100 Subject: [PATCH 09/11] Compute correlation test marker safely Use a widening cast before serialising the marker bytes so the multi-packet correlation tests cannot panic on large indices. --- tests/correlation_id.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/correlation_id.rs b/tests/correlation_id.rs index b79c86b6..63b39313 100644 --- a/tests/correlation_id.rs +++ b/tests/correlation_id.rs @@ -41,7 +41,7 @@ async fn run_multi_packet_channel( let capacity = frame_correlations.len().max(1); let (tx, rx) = mpsc::channel(capacity); for (idx, correlation) in frame_correlations.iter().enumerate() { - let marker = u64::try_from(idx + 1).expect("frame index fits in u64"); + let marker = (idx + 1) as u64; let payload = marker.to_le_bytes().to_vec(); tx.send(Envelope::new(1, *correlation, payload)) .await From f2f115c363093890b2c0e319270e894fedad961b Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 28 Sep 2025 15:42:03 +0100 Subject: [PATCH 10/11] Simplify multi-packet stamping --- src/connection.rs | 112 +++++++++++++++++---------------- src/connection/test_support.rs | 43 +++++++++++++ 2 files changed, 100 insertions(+), 55 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 2d10081c..5808f326 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -137,53 +137,51 @@ struct DrainContext<'a, F> { state: &'a mut ActorState, } +/// Multi-packet correlation stamping state. +/// +/// Tracks the active receiver and how frames should be stamped before emission. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +enum MultiPacketStamp { + /// Stamping is disabled because no multi-packet channel is active. + Disabled, + /// Stamping is enabled and frames are stamped with the provided identifier. + Enabled(Option), +} + /// Multi-packet channel state tracking the active receiver and stamping config. struct MultiPacketContext { channel: Option>, - #[expect( - clippy::option_option, - reason = "Nested Option used to distinguish stamping activation from stored identifier" - )] - /// Active stamping configuration. `None` disables stamping entirely. - correlation: Option>, + stamp: MultiPacketStamp, } impl MultiPacketContext { const fn new() -> Self { Self { channel: None, - correlation: None, + stamp: MultiPacketStamp::Disabled, } } - #[expect( - clippy::option_option, - reason = "Nested Option used to distinguish stamping activation from stored identifier" - )] - fn install(&mut self, channel: Option>, correlation: Option>) { + fn install(&mut self, channel: Option>, stamp: MultiPacketStamp) { debug_assert_eq!( channel.is_some(), - correlation.is_some(), + matches!(stamp, MultiPacketStamp::Enabled(_)), "multi-packet correlation must be provided when the channel is active", ); self.channel = channel; - self.correlation = correlation; + self.stamp = stamp; } fn clear(&mut self) { self.channel = None; - self.correlation = None; + self.stamp = MultiPacketStamp::Disabled; } fn channel_mut(&mut self) -> Option<&mut mpsc::Receiver> { self.channel.as_mut() } fn take_channel(&mut self) -> Option> { self.channel.take() } - #[expect( - clippy::option_option, - reason = "Nested Option used to distinguish stamping activation from stored identifier" - )] - fn correlation(&self) -> Option> { self.correlation } + fn stamp(&self) -> MultiPacketStamp { self.stamp } fn is_active(&self) -> bool { self.channel.is_some() } } @@ -291,8 +289,12 @@ where "response stream is active" ), ); - let correlation = channel.as_ref().map(|_| None); - self.multi_packet.install(channel, correlation); + let stamp = if channel.is_some() { + MultiPacketStamp::Enabled(None) + } else { + MultiPacketStamp::Disabled + }; + self.multi_packet.install(channel, stamp); } /// Set or replace the current multi-packet response channel and stamp correlation identifiers. @@ -308,15 +310,19 @@ where "response stream is active" ), ); - let correlation = channel.as_ref().map(|_| correlation_id); - self.multi_packet.install(channel, correlation); + let stamp = if channel.is_some() { + MultiPacketStamp::Enabled(correlation_id) + } else { + MultiPacketStamp::Disabled + }; + self.multi_packet.install(channel, stamp); } fn clear_multi_packet(&mut self) { self.multi_packet.clear(); } fn apply_multi_packet_correlation(&mut self, frame: &mut F) { - match self.multi_packet.correlation() { - Some(Some(expected)) => { + match self.multi_packet.stamp() { + MultiPacketStamp::Enabled(Some(expected)) => { frame.set_correlation_id(Some(expected)); debug_assert_eq!( frame.correlation_id(), @@ -326,7 +332,7 @@ where frame.correlation_id(), ); } - Some(None) => { + MultiPacketStamp::Enabled(None) => { frame.set_correlation_id(None); debug_assert!( frame.correlation_id().is_none(), @@ -334,12 +340,8 @@ where frame.correlation_id(), ); } - None => { - debug_assert!( - false, - "multi-packet correlation invoked without configuration", - ); - frame.set_correlation_id(None); + MultiPacketStamp::Disabled => { + unreachable!("multi-packet correlation invoked without configuration"); } } } @@ -469,11 +471,16 @@ where let DrainContext { out, state } = ctx; match res { Some(frame) => { - let mut frame = frame; - if matches!(kind, QueueKind::Multi) && self.multi_packet.correlation().is_some() { - self.apply_multi_packet_correlation(&mut frame); + match kind { + QueueKind::Multi + if matches!(self.multi_packet.stamp(), MultiPacketStamp::Enabled(_)) => + { + self.emit_multi_packet_frame(frame, out); + } + _ => { + self.process_frame_with_hooks_and_metrics(frame, out); + } } - self.process_frame_with_hooks_and_metrics(frame, out); match kind { QueueKind::High => self.after_high(out, state), QueueKind::Low | QueueKind::Multi => self.after_low(), @@ -504,6 +511,12 @@ where self.process_queue(QueueKind::Multi, res, DrainContext { out, state }); } + fn emit_multi_packet_frame(&mut self, frame: F, out: &mut Vec) { + let mut frame = frame; + self.apply_multi_packet_correlation(&mut frame); + self.process_frame_with_hooks_and_metrics(frame, out); + } + /// Handle a closed multi-packet channel by emitting the protocol terminator and notifying /// hooks. fn handle_multi_packet_closed(&mut self, state: &mut ActorState, out: &mut Vec) { @@ -528,11 +541,8 @@ where receiver.close(); } state.mark_closed(); - if let Some(mut frame) = self.hooks.stream_end_frame(&mut self.ctx) { - if self.multi_packet.correlation().is_some() { - self.apply_multi_packet_correlation(&mut frame); - } - self.process_frame_with_hooks_and_metrics(frame, out); + if let Some(frame) = self.hooks.stream_end_frame(&mut self.ctx) { + self.emit_multi_packet_frame(frame, out); self.after_low(); } self.clear_multi_packet(); @@ -628,16 +638,10 @@ where fn try_opportunistic_drain(&mut self, kind: QueueKind, ctx: DrainContext<'_, F>) -> bool { let DrainContext { out, state } = ctx; match kind { - QueueKind::High => { - debug_assert!( - false, - concat!( - "try_opportunistic_drain(High) is unsupported; ", - "High is handled by biased polling", - ), - ); - false - } + QueueKind::High => unreachable!(concat!( + "try_opportunistic_drain(High) is unsupported; ", + "High is handled by biased polling", + )), QueueKind::Low => { let res = match self.low_rx.as_mut() { Some(receiver) => receiver.try_recv(), @@ -665,9 +669,7 @@ where match result { Ok(frame) => { - let mut frame = frame; - self.apply_multi_packet_correlation(&mut frame); - self.process_frame_with_hooks_and_metrics(frame, out); + self.emit_multi_packet_frame(frame, out); self.after_low(); true } diff --git a/src/connection/test_support.rs b/src/connection/test_support.rs index d0485314..26f1f754 100644 --- a/src/connection/test_support.rs +++ b/src/connection/test_support.rs @@ -180,3 +180,46 @@ impl ActorStateHarness { pub async fn poll_queue_next(rx: Option<&mut mpsc::Receiver>) -> Option { ConnectionActor::::poll_queue(rx).await } + +#[cfg(test)] +mod tests { + use tokio::sync::mpsc; + + use super::*; + + #[test] + fn has_multi_queue_false_by_default() { + let harness = ActorHarness::new().expect("build ActorHarness"); + assert!( + !harness.has_multi_queue(), + "multi-packet queue should start inactive" + ); + } + + #[test] + fn has_multi_queue_true_after_install() { + let mut harness = ActorHarness::new().expect("build ActorHarness"); + let (_tx, rx) = mpsc::channel(1); + harness.set_multi_queue(Some(rx)); + assert!( + harness.has_multi_queue(), + "multi-packet queue should be active after install" + ); + } + + #[test] + fn has_multi_queue_false_after_clear() { + let mut harness = ActorHarness::new().expect("build ActorHarness"); + let (_tx, rx) = mpsc::channel(1); + harness.set_multi_queue(Some(rx)); + assert!( + harness.has_multi_queue(), + "multi-packet queue should be active after install" + ); + harness.set_multi_queue(None); + assert!( + !harness.has_multi_queue(), + "multi-packet queue should be inactive after clear" + ); + } +} From c41c4035c3aac7df128f568d8d37c3e6d9a944d5 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 28 Sep 2025 16:44:52 +0100 Subject: [PATCH 11/11] Clarify multi-packet context invariants --- src/connection.rs | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/src/connection.rs b/src/connection.rs index 5808f326..438843f4 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -166,7 +166,8 @@ impl MultiPacketContext { debug_assert_eq!( channel.is_some(), matches!(stamp, MultiPacketStamp::Enabled(_)), - "multi-packet correlation must be provided when the channel is active", + "channel presence must match stamp: channel is Some iff stamp is \ + MultiPacketStamp::Enabled(...)", ); self.channel = channel; self.stamp = stamp; @@ -298,6 +299,23 @@ where } /// Set or replace the current multi-packet response channel and stamp correlation identifiers. + /// + /// # Examples + /// + /// ```no_run + /// # use tokio::sync::mpsc; + /// # use tokio_util::sync::CancellationToken; + /// # use wireframe::{ConnectionActor, push::PushQueues}; + /// # let (queues, handle) = PushQueues::::builder() + /// # .high_capacity(1) + /// # .low_capacity(1) + /// # .build() + /// # .expect("failed to build PushQueues"); + /// # let shutdown = CancellationToken::new(); + /// # let mut actor = ConnectionActor::new(queues, handle, None, shutdown); + /// # let (_tx, rx) = mpsc::channel(4); + /// actor.set_multi_packet_with_correlation(Some(rx), Some(7)); + /// ``` pub fn set_multi_packet_with_correlation( &mut self, channel: Option>,