diff --git a/Cargo.lock b/Cargo.lock index 3cd970da..a1ebc914 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -446,6 +446,19 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-util" +version = "0.7.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + [[package]] name = "unicode-ident" version = "1.0.18" @@ -564,6 +577,7 @@ dependencies = [ "rstest", "serde", "tokio", + "tokio-util", "wireframe_testing", ] diff --git a/Cargo.toml b/Cargo.toml index fec8fb8d..b714a344 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ edition = "2024" serde = { version = "1", features = ["derive"] } bincode = "2" tokio = { version = "1", default-features = false, features = ["net", "signal", "rt-multi-thread", "macros", "sync", "time", "io-util"] } +tokio-util = "0.7" futures = "0.3" async-trait = "0.1" bytes = "1" diff --git a/docs/asynchronous-outbound-messaging-roadmap.md b/docs/asynchronous-outbound-messaging-roadmap.md index 2842fbc2..6cb4ece0 100644 --- a/docs/asynchronous-outbound-messaging-roadmap.md +++ b/docs/asynchronous-outbound-messaging-roadmap.md @@ -10,7 +10,7 @@ design documents. channels. See [Design §3.1][design-queues]. - [x] **Unified `Response` and `WireframeError` types** to capture protocol errors and transport failures ([Roadmap #1.1][roadmap-1-1]). -- [ ] **Connection actor** with a biased `select!` loop that polls for shutdown, +- [x] **Connection actor** with a biased `select!` loop that polls for shutdown, high/low queues and response streams as described in [Design §3.2][design-write-loop]. - [ ] **Internal protocol hooks** `before_send` and `on_command_end` invoked diff --git a/docs/wireframe-1-0-detailed-development-roadmap.md b/docs/wireframe-1-0-detailed-development-roadmap.md index 711e200d..05e788bd 100644 --- a/docs/wireframe-1-0-detailed-development-roadmap.md +++ b/docs/wireframe-1-0-detailed-development-roadmap.md @@ -9,10 +9,10 @@ breaking the project down into four distinct phases, each with a set of well-defined tasks. The dependencies between tasks are explicitly noted to ensure a logical and stable development progression. -As of this roadmap's publication, only the push queue utilities exist in -`src/push.rs`. No connection actor or write loop has been implemented. Phase 1 -therefore begins by introducing the connection actor with its biased `select!` -loop and integrating the push queues. +At the time of writing, the push queue utilities and the connection actor with +its biased `select!` write loop are implemented. The first phase therefore +focuses on integrating the remaining foundational pieces, preparing the API for +public consumption. ## Phase 1: Foundational Mechanics @@ -26,6 +26,7 @@ all public-facing features will be built.* | 1.4 | Initial FragmentStrategy Trait | Define the initial `FragmentStrategy` trait and the `FragmentMeta` struct. Focus on the core methods: `decode_header` and `encode_header`. | Medium | - | | 1.5 | Basic FragmentAdapter | Implement the `FragmentAdapter` as a `FrameProcessor`. Build the inbound reassembly logic for a single, non-multiplexed stream of fragments and the outbound logic for splitting a single large frame. | Large | #1.4 | | 1.6 | Internal Hook Plumbing | Add the invocation points for the protocol-specific hooks (`before_send`, `on_command_end`, etc.) within the connection actor, even if the public trait is not yet defined. | Small | #1.3 | + ## Phase 2: Public APIs & Developer Ergonomics *Focus: Exposing the new functionality to developers through a clean, ergonomic, @@ -38,6 +39,7 @@ and intuitive.* | 2.4 | async-stream Integration & Docs | Remove the proposed `FrameSink` from the design. Update the `Response::Stream` handling and write documentation recommending `async-stream` as the canonical way to create streams imperatively. | Small | #1.1 | | 2.5 | Initial Test Suite | Write unit and integration tests for the new public APIs. Verify that `Response::Vec` and `Response::Stream` work, and that `PushHandle` can successfully send frames that are received by a client. | Large | #2.1, #2.3, #2.4 | | 2.6 | Basic Fragmentation Example | Implement a simple `FragmentStrategy` (e.g. `LenFlag32K`) and an example showing the `FragmentAdapter` in use. This validates the adapter's basic functionality. | Medium | #1.5, #2.5 | + ## Phase 3: Production Hardening & Resilience *Focus: Adding the critical features required for robust, secure, and reliable diff --git a/src/connection.rs b/src/connection.rs new file mode 100644 index 00000000..ad8deea1 --- /dev/null +++ b/src/connection.rs @@ -0,0 +1,172 @@ +//! Connection actor responsible for outbound frames. +//! +//! The actor polls a shutdown token, high- and low-priority push queues, +//! and an optional response stream using a `tokio::select!` loop. The +//! `biased` keyword ensures high-priority messages are processed before +//! low-priority ones, with streamed responses handled last. + +use futures::StreamExt; +use tokio_util::sync::CancellationToken; + +use crate::{ + push::{FrameLike, PushQueues}, + response::{FrameStream, WireframeError}, +}; + +/// Actor driving outbound frame delivery for a connection. +pub struct ConnectionActor { + queues: PushQueues, + response: Option>, // current streaming response + shutdown: CancellationToken, +} + +impl ConnectionActor +where + F: FrameLike, +{ + /// Create a new `ConnectionActor` from the provided components. + #[must_use] + pub fn new( + queues: PushQueues, + response: Option>, + shutdown: CancellationToken, + ) -> Self { + Self { + queues, + response, + shutdown, + } + } + + /// Access the underlying push queues. + /// + /// This is mainly used in tests to close the queues when no actor is + /// draining them. + #[must_use] + pub fn queues_mut(&mut self) -> &mut PushQueues { &mut self.queues } + + /// Set or replace the current streaming response. + pub fn set_response(&mut self, stream: Option>) { self.response = stream; } + + /// Get a clone of the shutdown token used by the actor. + #[must_use] + pub fn shutdown_token(&self) -> CancellationToken { self.shutdown.clone() } + + /// Drive the actor until all sources are exhausted or shutdown is triggered. + /// + /// Frames are appended to `out` in the order they are processed. + /// + /// # Errors + /// + /// Returns a [`WireframeError`] if the response stream yields an error. + pub async fn run(&mut self, out: &mut Vec) -> Result<(), WireframeError> { + // If cancellation has already been requested, exit immediately. Nothing + // will be drained and any streaming response is abandoned. This mirrors + // a hard shutdown and is required for the tests. + if self.shutdown.is_cancelled() { + return Ok(()); + } + + let mut state = ActorState::new(self.response.is_none()); + + while !state.is_done() { + self.poll_sources(&mut state, out).await?; + } + + Ok(()) + } + + async fn poll_sources( + &mut self, + state: &mut ActorState, + out: &mut Vec, + ) -> Result<(), WireframeError> { + tokio::select! { + biased; + + () = self.shutdown.cancelled(), if !state.shutting_down => { + state.shutting_down = true; + self.start_shutdown(&mut state.resp_closed); + } + + res = self.queues.high_priority_rx.recv(), if !state.push.high => { + Self::handle_push(res, &mut state.push.high, out); + } + + res = self.queues.low_priority_rx.recv(), if !state.push.low => { + Self::handle_push(res, &mut state.push.low, out); + } + + res = async { + if let Some(stream) = &mut self.response { + stream.next().await + } else { + None + } + }, if !state.shutting_down && !state.resp_closed => { + Self::handle_response(res, &mut state.resp_closed, out)?; + } + } + + Ok(()) + } + + fn start_shutdown(&mut self, resp_closed: &mut bool) { + self.queues.high_priority_rx.close(); + self.queues.low_priority_rx.close(); + // Drop any streaming response so shutdown is prompt. Queued frames are + // still drained, but streamed responses may be truncated. + self.response = None; + *resp_closed = true; + } + + fn handle_push(res: Option, closed: &mut bool, out: &mut Vec) { + match res { + Some(frame) => out.push(frame), + None => *closed = true, + } + } + + fn handle_response( + res: Option>>, + closed: &mut bool, + out: &mut Vec, + ) -> Result<(), WireframeError> { + match res { + Some(Ok(frame)) => out.push(frame), + Some(Err(e)) => return Err(e), + None => *closed = true, + } + + Ok(()) + } +} + +struct PushClosed { + high: bool, + low: bool, +} + +struct ActorState { + push: PushClosed, + resp_closed: bool, + shutting_down: bool, +} + +impl ActorState { + fn new(resp_closed: bool) -> Self { + Self { + push: PushClosed { + high: false, + low: false, + }, + resp_closed, + shutting_down: false, + } + } + + fn is_done(&self) -> bool { + let push_drained = self.push.high && self.push.low; + push_drained && (self.resp_closed || self.shutting_down) + } +} diff --git a/src/lib.rs b/src/lib.rs index bf6e0525..19524ed7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,6 +7,7 @@ pub mod app; pub mod serializer; pub use serializer::{BincodeSerializer, Serializer}; +pub mod connection; pub mod extractor; pub mod frame; pub mod message; @@ -17,4 +18,5 @@ pub mod response; pub mod rewind_stream; pub mod server; +pub use connection::ConnectionActor; pub use response::{FrameStream, Response, WireframeError}; diff --git a/src/push.rs b/src/push.rs index e9f33866..70ba9f08 100644 --- a/src/push.rs +++ b/src/push.rs @@ -120,8 +120,8 @@ impl PushHandle { /// Receiver ends of the push queues stored by the connection actor. pub struct PushQueues { - pub high_priority_rx: mpsc::Receiver, - pub low_priority_rx: mpsc::Receiver, + pub(crate) high_priority_rx: mpsc::Receiver, + pub(crate) low_priority_rx: mpsc::Receiver, } impl PushQueues { @@ -154,4 +154,13 @@ impl PushQueues { res = self.low_priority_rx.recv() => res.map(|f| (PushPriority::Low, f)), } } + + /// Close both receivers to prevent further pushes from being accepted. + /// + /// This is primarily used in tests to release resources when no actor is + /// draining the queues. + pub fn close(&mut self) { + self.high_priority_rx.close(); + self.low_priority_rx.close(); + } } diff --git a/tests/connection_actor.rs b/tests/connection_actor.rs new file mode 100644 index 00000000..fbb53607 --- /dev/null +++ b/tests/connection_actor.rs @@ -0,0 +1,146 @@ +//! Tests for the `ConnectionActor` component. +//! +//! These cover priority order, shutdown behaviour, error propagation, +//! interleaved cancellation and back-pressure handling. + +use futures::stream; +use rstest::{fixture, rstest}; +use tokio::time::{Duration, sleep, timeout}; +use tokio_util::sync::CancellationToken; +use wireframe::{ + connection::ConnectionActor, + push::PushQueues, + response::{FrameStream, WireframeError}, +}; + +#[fixture] +#[allow(unused_braces)] +fn queues() -> (PushQueues, wireframe::push::PushHandle) { PushQueues::bounded(8, 8) } + +#[fixture] +#[allow(unused_braces)] +fn shutdown_token() -> CancellationToken { CancellationToken::new() } + +#[fixture] +#[allow(unused_braces)] +fn empty_stream() -> Option> { None } + +#[rstest] +#[tokio::test] +async fn strict_priority_order( + queues: (PushQueues, wireframe::push::PushHandle), + shutdown_token: CancellationToken, +) { + let (queues, handle) = queues; + handle.push_low_priority(2).await.unwrap(); + handle.push_high_priority(1).await.unwrap(); + drop(handle); + + let stream = stream::iter(vec![Ok(3u8)]); + let mut actor: ConnectionActor<_, ()> = + ConnectionActor::new(queues, Some(Box::pin(stream)), shutdown_token); + let mut out = Vec::new(); + actor.run(&mut out).await.unwrap(); + assert_eq!(out, vec![1, 2, 3]); +} + +#[rstest] +#[tokio::test] +async fn shutdown_signal_precedence( + queues: (PushQueues, wireframe::push::PushHandle), + shutdown_token: CancellationToken, +) { + let (queues, _handle) = queues; + shutdown_token.cancel(); + let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, None, shutdown_token); + let mut out = Vec::new(); + actor.run(&mut out).await.unwrap(); + assert!(out.is_empty()); +} + +#[rstest] +#[tokio::test] +async fn complete_draining_of_sources( + queues: (PushQueues, wireframe::push::PushHandle), + shutdown_token: CancellationToken, +) { + let (queues, handle) = queues; + handle.push_high_priority(1).await.unwrap(); + drop(handle); + + let stream = stream::iter(vec![Ok(2u8), Ok(3u8)]); + let mut actor: ConnectionActor<_, ()> = + ConnectionActor::new(queues, Some(Box::pin(stream)), shutdown_token); + let mut out = Vec::new(); + actor.run(&mut out).await.unwrap(); + assert_eq!(out, vec![1, 2, 3]); +} + +#[derive(Debug)] +enum TestError { + Kaboom, +} + +#[rstest] +#[tokio::test] +async fn error_propagation_from_stream( + queues: (PushQueues, wireframe::push::PushHandle), + shutdown_token: CancellationToken, +) { + let (queues, _handle) = queues; + let stream = stream::iter(vec![ + Ok(1u8), + Ok(2u8), + Err(WireframeError::Protocol(TestError::Kaboom)), + ]); + let mut actor: ConnectionActor<_, TestError> = + ConnectionActor::new(queues, Some(Box::pin(stream)), shutdown_token); + let mut out = Vec::new(); + let result = actor.run(&mut out).await; + assert!(matches!( + result, + Err(WireframeError::Protocol(TestError::Kaboom)) + )); + assert_eq!(out, vec![1, 2]); +} + +#[rstest] +#[tokio::test] +async fn interleaved_shutdown_during_stream( + queues: (PushQueues, wireframe::push::PushHandle), + shutdown_token: CancellationToken, +) { + let (queues, _handle) = queues; + let token = shutdown_token.clone(); + tokio::spawn(async move { + sleep(Duration::from_millis(50)).await; + token.cancel(); + }); + + let stream = stream::unfold(1u8, |i| async move { + if i <= 5 { + sleep(Duration::from_millis(20)).await; + Some((Ok(i), i + 1)) + } else { + None + } + }); + let mut actor: ConnectionActor<_, ()> = + ConnectionActor::new(queues, Some(Box::pin(stream)), shutdown_token); + let mut out = Vec::new(); + actor.run(&mut out).await.unwrap(); + assert!(!out.is_empty() && out.len() < 5); +} + +#[rstest] +#[tokio::test] +async fn push_queue_exhaustion_backpressure() { + let (mut queues, handle) = PushQueues::bounded(1, 1); + handle.push_high_priority(1).await.unwrap(); + + let blocked = timeout(Duration::from_millis(50), handle.push_high_priority(2)).await; + assert!(blocked.is_err()); + + // clean up without exposing internal fields + queues.close(); +} diff --git a/tests/push.rs b/tests/push.rs index b1f559b5..a82f44e7 100644 --- a/tests/push.rs +++ b/tests/push.rs @@ -35,11 +35,11 @@ async fn try_push_respects_policy() { async fn push_queues_error_on_closed() { let (queues, handle) = PushQueues::bounded(1, 1); - drop(queues.high_priority_rx); + let mut queues = queues; + queues.close(); let res = handle.push_high_priority(42u8).await; assert!(matches!(res, Err(PushError::Closed))); - drop(queues.low_priority_rx); let res = handle.push_low_priority(24u8).await; assert!(matches!(res, Err(PushError::Closed))); }