From 10398f8dfbbff7291c5a53716f053f9b85217e75 Mon Sep 17 00:00:00 2001 From: Leynos Date: Wed, 25 Jun 2025 02:15:34 +0100 Subject: [PATCH 1/8] Add connection actor with biased select loop Implement ConnectionActor using tokio::select! with biased polling.Actor reads high and low priority queues as well as an optional\nresponse stream, terminating on shutdown or channel closure.\nIncluded tests cover shutdown handling and prioritisation of sources. --- Cargo.lock | 14 +++ Cargo.toml | 1 + ...eframe-1-0-detailed-development-roadmap.md | 2 + src/connection.rs | 97 +++++++++++++++++++ src/lib.rs | 2 + tests/connection_actor.rs | 63 ++++++++++++ 6 files changed, 179 insertions(+) create mode 100644 src/connection.rs create mode 100644 tests/connection_actor.rs diff --git a/Cargo.lock b/Cargo.lock index 3cd970da..a1ebc914 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -446,6 +446,19 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-util" +version = "0.7.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + [[package]] name = "unicode-ident" version = "1.0.18" @@ -564,6 +577,7 @@ dependencies = [ "rstest", "serde", "tokio", + "tokio-util", "wireframe_testing", ] diff --git a/Cargo.toml b/Cargo.toml index fec8fb8d..b714a344 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ edition = "2024" serde = { version = "1", features = ["derive"] } bincode = "2" tokio = { version = "1", default-features = false, features = ["net", "signal", "rt-multi-thread", "macros", "sync", "time", "io-util"] } +tokio-util = "0.7" futures = "0.3" async-trait = "0.1" bytes = "1" diff --git a/docs/wireframe-1-0-detailed-development-roadmap.md b/docs/wireframe-1-0-detailed-development-roadmap.md index 711e200d..0044035d 100644 --- a/docs/wireframe-1-0-detailed-development-roadmap.md +++ b/docs/wireframe-1-0-detailed-development-roadmap.md @@ -26,6 +26,7 @@ all public-facing features will be built.* | 1.4 | Initial FragmentStrategy Trait | Define the initial `FragmentStrategy` trait and the `FragmentMeta` struct. Focus on the core methods: `decode_header` and `encode_header`. | Medium | - | | 1.5 | Basic FragmentAdapter | Implement the `FragmentAdapter` as a `FrameProcessor`. Build the inbound reassembly logic for a single, non-multiplexed stream of fragments and the outbound logic for splitting a single large frame. | Large | #1.4 | | 1.6 | Internal Hook Plumbing | Add the invocation points for the protocol-specific hooks (`before_send`, `on_command_end`, etc.) within the connection actor, even if the public trait is not yet defined. | Small | #1.3 | + ## Phase 2: Public APIs & Developer Ergonomics *Focus: Exposing the new functionality to developers through a clean, ergonomic, @@ -38,6 +39,7 @@ and intuitive.* | 2.4 | async-stream Integration & Docs | Remove the proposed `FrameSink` from the design. Update the `Response::Stream` handling and write documentation recommending `async-stream` as the canonical way to create streams imperatively. | Small | #1.1 | | 2.5 | Initial Test Suite | Write unit and integration tests for the new public APIs. Verify that `Response::Vec` and `Response::Stream` work, and that `PushHandle` can successfully send frames that are received by a client. | Large | #2.1, #2.3, #2.4 | | 2.6 | Basic Fragmentation Example | Implement a simple `FragmentStrategy` (e.g. `LenFlag32K`) and an example showing the `FragmentAdapter` in use. This validates the adapter's basic functionality. | Medium | #1.5, #2.5 | + ## Phase 3: Production Hardening & Resilience *Focus: Adding the critical features required for robust, secure, and reliable diff --git a/src/connection.rs b/src/connection.rs new file mode 100644 index 00000000..8f595803 --- /dev/null +++ b/src/connection.rs @@ -0,0 +1,97 @@ +//! Connection actor responsible for outbound frames. +//! +//! The actor polls a shutdown token, high- and low-priority push queues, +//! and an optional response stream using a `tokio::select!` loop. The +//! `biased` keyword ensures high-priority messages are processed before +//! low-priority ones, with streamed responses handled last. + +use futures::StreamExt; +use tokio_util::sync::CancellationToken; + +use crate::{ + push::{FrameLike, PushQueues}, + response::{FrameStream, WireframeError}, +}; + +/// Actor driving outbound frame delivery for a connection. +pub struct ConnectionActor { + pub queues: PushQueues, + pub response: Option>, // current streaming response + pub shutdown: CancellationToken, +} + +impl ConnectionActor +where + F: FrameLike, +{ + /// Create a new `ConnectionActor` from the provided components. + #[must_use] + pub fn new( + queues: PushQueues, + response: Option>, + shutdown: CancellationToken, + ) -> Self { + Self { + queues, + response, + shutdown, + } + } + + /// Drive the actor until all sources are exhausted or shutdown is triggered. + /// + /// Frames are appended to `out` in the order they are processed. + /// + /// # Errors + /// + /// Returns a [`WireframeError`] if the response stream yields an error. + pub async fn run(&mut self, out: &mut Vec) -> Result<(), WireframeError> { + let mut high_closed = false; + let mut low_closed = false; + let mut resp_closed = self.response.is_none(); + + loop { + tokio::select! { + biased; + + () = self.shutdown.cancelled() => break, + + res = self.queues.high_priority_rx.recv(), if !high_closed => { + match res { + Some(frame) => out.push(frame), + None => high_closed = true, + } + } + + res = self.queues.low_priority_rx.recv(), if !low_closed => { + match res { + Some(frame) => out.push(frame), + None => low_closed = true, + } + } + + res = async { + if resp_closed { + None + } else if let Some(stream) = &mut self.response { + stream.next().await + } else { + None + } + } => { + match res { + Some(Ok(frame)) => out.push(frame), + Some(Err(e)) => return Err(e), + None => resp_closed = true, + } + } + } + + if high_closed && low_closed && resp_closed { + break; + } + } + + Ok(()) + } +} diff --git a/src/lib.rs b/src/lib.rs index bf6e0525..19524ed7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,6 +7,7 @@ pub mod app; pub mod serializer; pub use serializer::{BincodeSerializer, Serializer}; +pub mod connection; pub mod extractor; pub mod frame; pub mod message; @@ -17,4 +18,5 @@ pub mod response; pub mod rewind_stream; pub mod server; +pub use connection::ConnectionActor; pub use response::{FrameStream, Response, WireframeError}; diff --git a/tests/connection_actor.rs b/tests/connection_actor.rs new file mode 100644 index 00000000..1a4b047b --- /dev/null +++ b/tests/connection_actor.rs @@ -0,0 +1,63 @@ +use futures::stream; +use rstest::{fixture, rstest}; +use tokio_util::sync::CancellationToken; +use wireframe::{connection::ConnectionActor, push::PushQueues}; + +#[fixture] +#[allow(unused_braces)] +fn queues() -> (PushQueues, wireframe::push::PushHandle) { PushQueues::bounded(8, 8) } + +#[fixture] +#[allow(unused_braces)] +fn shutdown_token() -> CancellationToken { CancellationToken::new() } + +#[rstest] +#[tokio::test] +async fn high_priority_before_low_and_stream( + queues: (PushQueues, wireframe::push::PushHandle), + shutdown_token: CancellationToken, +) { + let (queues, handle) = queues; + handle.push_low_priority(2).await.unwrap(); + handle.push_high_priority(1).await.unwrap(); + drop(handle); + + let stream = stream::iter(vec![Ok(3u8)]); + let mut actor: ConnectionActor<_, ()> = + ConnectionActor::new(queues, Some(Box::pin(stream)), shutdown_token); + let mut out = Vec::new(); + actor.run(&mut out).await.unwrap(); + assert_eq!(out, vec![1, 2, 3]); +} + +#[rstest] +#[tokio::test] +async fn shutdown_terminates_actor( + queues: (PushQueues, wireframe::push::PushHandle), + shutdown_token: CancellationToken, +) { + let (queues, _handle) = queues; + shutdown_token.cancel(); + let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, None, shutdown_token); + let mut out = Vec::new(); + actor.run(&mut out).await.unwrap(); + assert!(out.is_empty()); +} + +#[rstest] +#[tokio::test] +async fn actor_processes_until_sources_close( + queues: (PushQueues, wireframe::push::PushHandle), + shutdown_token: CancellationToken, +) { + let (queues, handle) = queues; + handle.push_high_priority(1).await.unwrap(); + drop(handle); + + let stream = stream::iter(vec![Ok(2u8), Ok(3u8)]); + let mut actor: ConnectionActor<_, ()> = + ConnectionActor::new(queues, Some(Box::pin(stream)), shutdown_token); + let mut out = Vec::new(); + actor.run(&mut out).await.unwrap(); + assert_eq!(out, vec![1, 2, 3]); +} From cd61a430511919fda16a0c6643789ffe9bb9a3bb Mon Sep 17 00:00:00 2001 From: Leynos Date: Wed, 25 Jun 2025 09:38:39 +0100 Subject: [PATCH 2/8] Expand connection actor tests --- tests/connection_actor.rs | 86 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 82 insertions(+), 4 deletions(-) diff --git a/tests/connection_actor.rs b/tests/connection_actor.rs index 1a4b047b..bab82b26 100644 --- a/tests/connection_actor.rs +++ b/tests/connection_actor.rs @@ -1,7 +1,12 @@ use futures::stream; use rstest::{fixture, rstest}; +use tokio::time::{Duration, sleep, timeout}; use tokio_util::sync::CancellationToken; -use wireframe::{connection::ConnectionActor, push::PushQueues}; +use wireframe::{ + connection::ConnectionActor, + push::PushQueues, + response::{FrameStream, WireframeError}, +}; #[fixture] #[allow(unused_braces)] @@ -11,9 +16,13 @@ fn queues() -> (PushQueues, wireframe::push::PushHandle) { PushQueues::b #[allow(unused_braces)] fn shutdown_token() -> CancellationToken { CancellationToken::new() } +#[fixture] +#[allow(unused_braces)] +fn empty_stream() -> Option> { None } + #[rstest] #[tokio::test] -async fn high_priority_before_low_and_stream( +async fn strict_priority_order( queues: (PushQueues, wireframe::push::PushHandle), shutdown_token: CancellationToken, ) { @@ -32,7 +41,7 @@ async fn high_priority_before_low_and_stream( #[rstest] #[tokio::test] -async fn shutdown_terminates_actor( +async fn shutdown_signal_precedence( queues: (PushQueues, wireframe::push::PushHandle), shutdown_token: CancellationToken, ) { @@ -46,7 +55,7 @@ async fn shutdown_terminates_actor( #[rstest] #[tokio::test] -async fn actor_processes_until_sources_close( +async fn complete_draining_of_sources( queues: (PushQueues, wireframe::push::PushHandle), shutdown_token: CancellationToken, ) { @@ -61,3 +70,72 @@ async fn actor_processes_until_sources_close( actor.run(&mut out).await.unwrap(); assert_eq!(out, vec![1, 2, 3]); } + +#[derive(Debug)] +enum TestError { + Kaboom, +} + +#[rstest] +#[tokio::test] +async fn error_propagation_from_stream( + queues: (PushQueues, wireframe::push::PushHandle), + shutdown_token: CancellationToken, +) { + let (queues, _handle) = queues; + let stream = stream::iter(vec![ + Ok(1u8), + Ok(2u8), + Err(WireframeError::Protocol(TestError::Kaboom)), + ]); + let mut actor: ConnectionActor<_, TestError> = + ConnectionActor::new(queues, Some(Box::pin(stream)), shutdown_token); + let mut out = Vec::new(); + let result = actor.run(&mut out).await; + assert!(matches!( + result, + Err(WireframeError::Protocol(TestError::Kaboom)) + )); + assert_eq!(out, vec![1, 2]); +} + +#[rstest] +#[tokio::test] +async fn interleaved_shutdown_during_stream( + queues: (PushQueues, wireframe::push::PushHandle), + shutdown_token: CancellationToken, +) { + let (queues, _handle) = queues; + let token = shutdown_token.clone(); + tokio::spawn(async move { + sleep(Duration::from_millis(50)).await; + token.cancel(); + }); + + let stream = stream::unfold(1u8, |i| async move { + if i <= 5 { + sleep(Duration::from_millis(20)).await; + Some((Ok(i), i + 1)) + } else { + None + } + }); + let mut actor: ConnectionActor<_, ()> = + ConnectionActor::new(queues, Some(Box::pin(stream)), shutdown_token); + let mut out = Vec::new(); + actor.run(&mut out).await.unwrap(); + assert!(!out.is_empty() && out.len() < 5); +} + +#[rstest] +#[tokio::test] +async fn push_queue_exhaustion_backpressure() { + let (queues, handle) = PushQueues::bounded(1, 1); + handle.push_high_priority(1).await.unwrap(); + + let blocked = timeout(Duration::from_millis(50), handle.push_high_priority(2)).await; + assert!(blocked.is_err()); + + // clean up to avoid background tasks holding the queue + drop(queues.high_priority_rx); +} From 8e0dd0a7f5a24a1e6e7a016e42c50ad8c0c224d9 Mon Sep 17 00:00:00 2001 From: Leynos Date: Wed, 25 Jun 2025 10:55:39 +0100 Subject: [PATCH 3/8] Refine graceful shutdown in ConnectionActor --- src/connection.rs | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 8f595803..a7974160 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -46,15 +46,32 @@ where /// /// Returns a [`WireframeError`] if the response stream yields an error. pub async fn run(&mut self, out: &mut Vec) -> Result<(), WireframeError> { + // If cancellation has already been requested, exit immediately without + // draining any sources. This mirrors a hard shutdown and is required by + // tests. + if self.shutdown.is_cancelled() { + return Ok(()); + } + let mut high_closed = false; let mut low_closed = false; let mut resp_closed = self.response.is_none(); + let mut shutting_down = false; loop { tokio::select! { biased; - () = self.shutdown.cancelled() => break, + () = self.shutdown.cancelled(), if !shutting_down => { + shutting_down = true; + // Close the queues so producers receive an error and we can + // drain queued frames without waiting for new ones. + self.queues.high_priority_rx.close(); + self.queues.low_priority_rx.close(); + // Drop any streaming response so shutdown is prompt. + self.response = None; + resp_closed = true; + } res = self.queues.high_priority_rx.recv(), if !high_closed => { match res { @@ -78,7 +95,7 @@ where } else { None } - } => { + }, if !shutting_down => { match res { Some(Ok(frame)) => out.push(frame), Some(Err(e)) => return Err(e), @@ -87,7 +104,7 @@ where } } - if high_closed && low_closed && resp_closed { + if high_closed && low_closed && (resp_closed || shutting_down) { break; } } From 5a96ae463efd0f30e8a69c83b6a9b5b96cf9f085 Mon Sep 17 00:00:00 2001 From: Leynos Date: Wed, 25 Jun 2025 16:17:08 +0100 Subject: [PATCH 4/8] Refactor ConnectionActor and update docs --- ...asynchronous-outbound-messaging-roadmap.md | 2 +- ...eframe-1-0-detailed-development-roadmap.md | 8 +-- src/connection.rs | 62 +++++++++++-------- 3 files changed, 42 insertions(+), 30 deletions(-) diff --git a/docs/asynchronous-outbound-messaging-roadmap.md b/docs/asynchronous-outbound-messaging-roadmap.md index 2842fbc2..6cb4ece0 100644 --- a/docs/asynchronous-outbound-messaging-roadmap.md +++ b/docs/asynchronous-outbound-messaging-roadmap.md @@ -10,7 +10,7 @@ design documents. channels. See [Design §3.1][design-queues]. - [x] **Unified `Response` and `WireframeError` types** to capture protocol errors and transport failures ([Roadmap #1.1][roadmap-1-1]). -- [ ] **Connection actor** with a biased `select!` loop that polls for shutdown, +- [x] **Connection actor** with a biased `select!` loop that polls for shutdown, high/low queues and response streams as described in [Design §3.2][design-write-loop]. - [ ] **Internal protocol hooks** `before_send` and `on_command_end` invoked diff --git a/docs/wireframe-1-0-detailed-development-roadmap.md b/docs/wireframe-1-0-detailed-development-roadmap.md index 0044035d..05e788bd 100644 --- a/docs/wireframe-1-0-detailed-development-roadmap.md +++ b/docs/wireframe-1-0-detailed-development-roadmap.md @@ -9,10 +9,10 @@ breaking the project down into four distinct phases, each with a set of well-defined tasks. The dependencies between tasks are explicitly noted to ensure a logical and stable development progression. -As of this roadmap's publication, only the push queue utilities exist in -`src/push.rs`. No connection actor or write loop has been implemented. Phase 1 -therefore begins by introducing the connection actor with its biased `select!` -loop and integrating the push queues. +At the time of writing, the push queue utilities and the connection actor with +its biased `select!` write loop are implemented. The first phase therefore +focuses on integrating the remaining foundational pieces, preparing the API for +public consumption. ## Phase 1: Foundational Mechanics diff --git a/src/connection.rs b/src/connection.rs index a7974160..7a738a53 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -64,51 +64,63 @@ where () = self.shutdown.cancelled(), if !shutting_down => { shutting_down = true; - // Close the queues so producers receive an error and we can - // drain queued frames without waiting for new ones. - self.queues.high_priority_rx.close(); - self.queues.low_priority_rx.close(); - // Drop any streaming response so shutdown is prompt. - self.response = None; - resp_closed = true; + self.start_shutdown(&mut resp_closed); } res = self.queues.high_priority_rx.recv(), if !high_closed => { - match res { - Some(frame) => out.push(frame), - None => high_closed = true, - } + Self::handle_push(res, &mut high_closed, out); } res = self.queues.low_priority_rx.recv(), if !low_closed => { - match res { - Some(frame) => out.push(frame), - None => low_closed = true, - } + Self::handle_push(res, &mut low_closed, out); } res = async { - if resp_closed { - None - } else if let Some(stream) = &mut self.response { + if let Some(stream) = &mut self.response { stream.next().await } else { None } - }, if !shutting_down => { - match res { - Some(Ok(frame)) => out.push(frame), - Some(Err(e)) => return Err(e), - None => resp_closed = true, - } + }, if !shutting_down && !resp_closed => { + Self::handle_response(res, &mut resp_closed, out)?; } } - if high_closed && low_closed && (resp_closed || shutting_down) { + let push_drained = high_closed && low_closed; + let done = push_drained && (resp_closed || shutting_down); + if done { break; } } Ok(()) } + + fn start_shutdown(&mut self, resp_closed: &mut bool) { + self.queues.high_priority_rx.close(); + self.queues.low_priority_rx.close(); + self.response = None; + *resp_closed = true; + } + + fn handle_push(res: Option, closed: &mut bool, out: &mut Vec) { + match res { + Some(frame) => out.push(frame), + None => *closed = true, + } + } + + fn handle_response( + res: Option>>, + closed: &mut bool, + out: &mut Vec, + ) -> Result<(), WireframeError> { + match res { + Some(Ok(frame)) => out.push(frame), + Some(Err(e)) => return Err(e), + None => *closed = true, + } + + Ok(()) + } } From 30a7b43b465e9fba4d51d75368c60669fac0cd9c Mon Sep 17 00:00:00 2001 From: Leynos Date: Wed, 25 Jun 2025 19:20:15 +0100 Subject: [PATCH 5/8] Clarify shutdown semantics --- src/connection.rs | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 7a738a53..6385c9a5 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -46,9 +46,10 @@ where /// /// Returns a [`WireframeError`] if the response stream yields an error. pub async fn run(&mut self, out: &mut Vec) -> Result<(), WireframeError> { - // If cancellation has already been requested, exit immediately without - // draining any sources. This mirrors a hard shutdown and is required by - // tests. + // If cancellation has already been requested, exit immediately. + // Nothing will be drained, queued frames are lost and any streaming + // response is abandoned. This mirrors a hard shutdown and is required + // for the tests. if self.shutdown.is_cancelled() { return Ok(()); } @@ -86,9 +87,7 @@ where } } - let push_drained = high_closed && low_closed; - let done = push_drained && (resp_closed || shutting_down); - if done { + if Self::is_done(high_closed, low_closed, resp_closed, shutting_down) { break; } } @@ -99,10 +98,23 @@ where fn start_shutdown(&mut self, resp_closed: &mut bool) { self.queues.high_priority_rx.close(); self.queues.low_priority_rx.close(); + // Drop any streaming response so shutdown is prompt. Queued frames are + // still drained, but streamed responses may be truncated. self.response = None; *resp_closed = true; } + #[allow(clippy::fn_params_excessive_bools)] + fn is_done( + high_closed: bool, + low_closed: bool, + resp_closed: bool, + shutting_down: bool, + ) -> bool { + let push_drained = high_closed && low_closed; + push_drained && (resp_closed || shutting_down) + } + fn handle_push(res: Option, closed: &mut bool, out: &mut Vec) { match res { Some(frame) => out.push(frame), From 50b5062d5b9c5e8daacbc54de9325de094cc285d Mon Sep 17 00:00:00 2001 From: Leynos Date: Wed, 25 Jun 2025 19:54:20 +0100 Subject: [PATCH 6/8] Refactor connection actor run loop Extract tokio::select! logic into a helper and consolidate state into a dedicated struct. This lowers cyclomatic complexity and clarifies shutdown behaviour. --- src/connection.rs | 106 +++++++++++++++++++++++++++------------------- 1 file changed, 63 insertions(+), 43 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 6385c9a5..79caaeaf 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -46,49 +46,51 @@ where /// /// Returns a [`WireframeError`] if the response stream yields an error. pub async fn run(&mut self, out: &mut Vec) -> Result<(), WireframeError> { - // If cancellation has already been requested, exit immediately. - // Nothing will be drained, queued frames are lost and any streaming - // response is abandoned. This mirrors a hard shutdown and is required - // for the tests. + // If cancellation has already been requested, exit immediately. Nothing + // will be drained and any streaming response is abandoned. This mirrors + // a hard shutdown and is required for the tests. if self.shutdown.is_cancelled() { return Ok(()); } - let mut high_closed = false; - let mut low_closed = false; - let mut resp_closed = self.response.is_none(); - let mut shutting_down = false; + let mut state = ActorState::new(self.response.is_none()); - loop { - tokio::select! { - biased; + while !state.is_done() { + self.poll_sources(&mut state, out).await?; + } - () = self.shutdown.cancelled(), if !shutting_down => { - shutting_down = true; - self.start_shutdown(&mut resp_closed); - } + Ok(()) + } - res = self.queues.high_priority_rx.recv(), if !high_closed => { - Self::handle_push(res, &mut high_closed, out); - } + async fn poll_sources( + &mut self, + state: &mut ActorState, + out: &mut Vec, + ) -> Result<(), WireframeError> { + tokio::select! { + biased; - res = self.queues.low_priority_rx.recv(), if !low_closed => { - Self::handle_push(res, &mut low_closed, out); - } + () = self.shutdown.cancelled(), if !state.shutting_down => { + state.shutting_down = true; + self.start_shutdown(&mut state.resp_closed); + } - res = async { - if let Some(stream) = &mut self.response { - stream.next().await - } else { - None - } - }, if !shutting_down && !resp_closed => { - Self::handle_response(res, &mut resp_closed, out)?; - } + res = self.queues.high_priority_rx.recv(), if !state.push.high => { + Self::handle_push(res, &mut state.push.high, out); } - if Self::is_done(high_closed, low_closed, resp_closed, shutting_down) { - break; + res = self.queues.low_priority_rx.recv(), if !state.push.low => { + Self::handle_push(res, &mut state.push.low, out); + } + + res = async { + if let Some(stream) = &mut self.response { + stream.next().await + } else { + None + } + }, if !state.shutting_down && !state.resp_closed => { + Self::handle_response(res, &mut state.resp_closed, out)?; } } @@ -104,17 +106,6 @@ where *resp_closed = true; } - #[allow(clippy::fn_params_excessive_bools)] - fn is_done( - high_closed: bool, - low_closed: bool, - resp_closed: bool, - shutting_down: bool, - ) -> bool { - let push_drained = high_closed && low_closed; - push_drained && (resp_closed || shutting_down) - } - fn handle_push(res: Option, closed: &mut bool, out: &mut Vec) { match res { Some(frame) => out.push(frame), @@ -136,3 +127,32 @@ where Ok(()) } } + +struct PushClosed { + high: bool, + low: bool, +} + +struct ActorState { + push: PushClosed, + resp_closed: bool, + shutting_down: bool, +} + +impl ActorState { + fn new(resp_closed: bool) -> Self { + Self { + push: PushClosed { + high: false, + low: false, + }, + resp_closed, + shutting_down: false, + } + } + + fn is_done(&self) -> bool { + let push_drained = self.push.high && self.push.low; + push_drained && (self.resp_closed || self.shutting_down) + } +} From 1aca14d0f1758c785f92161729edd5189ac61097 Mon Sep 17 00:00:00 2001 From: Leynos Date: Wed, 25 Jun 2025 20:33:55 +0100 Subject: [PATCH 7/8] Encapsulate queue cleanup and actor state --- src/connection.rs | 6 +++--- src/push.rs | 13 +++++++++++-- tests/connection_actor.rs | 11 ++++++++--- tests/push.rs | 4 ++-- 4 files changed, 24 insertions(+), 10 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 79caaeaf..ef10e784 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -15,9 +15,9 @@ use crate::{ /// Actor driving outbound frame delivery for a connection. pub struct ConnectionActor { - pub queues: PushQueues, - pub response: Option>, // current streaming response - pub shutdown: CancellationToken, + queues: PushQueues, + response: Option>, // current streaming response + shutdown: CancellationToken, } impl ConnectionActor diff --git a/src/push.rs b/src/push.rs index e9f33866..70ba9f08 100644 --- a/src/push.rs +++ b/src/push.rs @@ -120,8 +120,8 @@ impl PushHandle { /// Receiver ends of the push queues stored by the connection actor. pub struct PushQueues { - pub high_priority_rx: mpsc::Receiver, - pub low_priority_rx: mpsc::Receiver, + pub(crate) high_priority_rx: mpsc::Receiver, + pub(crate) low_priority_rx: mpsc::Receiver, } impl PushQueues { @@ -154,4 +154,13 @@ impl PushQueues { res = self.low_priority_rx.recv() => res.map(|f| (PushPriority::Low, f)), } } + + /// Close both receivers to prevent further pushes from being accepted. + /// + /// This is primarily used in tests to release resources when no actor is + /// draining the queues. + pub fn close(&mut self) { + self.high_priority_rx.close(); + self.low_priority_rx.close(); + } } diff --git a/tests/connection_actor.rs b/tests/connection_actor.rs index bab82b26..fbb53607 100644 --- a/tests/connection_actor.rs +++ b/tests/connection_actor.rs @@ -1,3 +1,8 @@ +//! Tests for the `ConnectionActor` component. +//! +//! These cover priority order, shutdown behaviour, error propagation, +//! interleaved cancellation and back-pressure handling. + use futures::stream; use rstest::{fixture, rstest}; use tokio::time::{Duration, sleep, timeout}; @@ -130,12 +135,12 @@ async fn interleaved_shutdown_during_stream( #[rstest] #[tokio::test] async fn push_queue_exhaustion_backpressure() { - let (queues, handle) = PushQueues::bounded(1, 1); + let (mut queues, handle) = PushQueues::bounded(1, 1); handle.push_high_priority(1).await.unwrap(); let blocked = timeout(Duration::from_millis(50), handle.push_high_priority(2)).await; assert!(blocked.is_err()); - // clean up to avoid background tasks holding the queue - drop(queues.high_priority_rx); + // clean up without exposing internal fields + queues.close(); } diff --git a/tests/push.rs b/tests/push.rs index b1f559b5..a82f44e7 100644 --- a/tests/push.rs +++ b/tests/push.rs @@ -35,11 +35,11 @@ async fn try_push_respects_policy() { async fn push_queues_error_on_closed() { let (queues, handle) = PushQueues::bounded(1, 1); - drop(queues.high_priority_rx); + let mut queues = queues; + queues.close(); let res = handle.push_high_priority(42u8).await; assert!(matches!(res, Err(PushError::Closed))); - drop(queues.low_priority_rx); let res = handle.push_low_priority(24u8).await; assert!(matches!(res, Err(PushError::Closed))); } From 8dd96c430d639b5cadfc8b54cd4001239737cae1 Mon Sep 17 00:00:00 2001 From: Leynos Date: Wed, 25 Jun 2025 22:51:54 +0100 Subject: [PATCH 8/8] Add accessors for ConnectionActor fields --- src/connection.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/connection.rs b/src/connection.rs index ef10e784..ad8deea1 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -38,6 +38,20 @@ where } } + /// Access the underlying push queues. + /// + /// This is mainly used in tests to close the queues when no actor is + /// draining them. + #[must_use] + pub fn queues_mut(&mut self) -> &mut PushQueues { &mut self.queues } + + /// Set or replace the current streaming response. + pub fn set_response(&mut self, stream: Option>) { self.response = stream; } + + /// Get a clone of the shutdown token used by the actor. + #[must_use] + pub fn shutdown_token(&self) -> CancellationToken { self.shutdown.clone() } + /// Drive the actor until all sources are exhausted or shutdown is triggered. /// /// Frames are appended to `out` in the order they are processed.