From 2839caf13f85c5545da9d5114944b0f8578ebd95 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sat, 28 Jun 2025 01:28:44 +0100 Subject: [PATCH 1/5] Implement fairness counter --- ...asynchronous-outbound-messaging-roadmap.md | 2 +- src/connection.rs | 87 ++++++++++++++++++- tests/connection_actor.rs | 27 +++++- 3 files changed, 113 insertions(+), 3 deletions(-) diff --git a/docs/asynchronous-outbound-messaging-roadmap.md b/docs/asynchronous-outbound-messaging-roadmap.md index 0d93d467..1e8e1644 100644 --- a/docs/asynchronous-outbound-messaging-roadmap.md +++ b/docs/asynchronous-outbound-messaging-roadmap.md @@ -13,7 +13,7 @@ design documents. - [x] **Connection actor** with a biased `select!` loop that polls for shutdown, high/low queues and response streams as described in [Design §3.2][design-write-loop]. -- [ ] **Fairness counter** to yield to the low-priority queue after bursts of +- [x] **Fairness counter** to yield to the low-priority queue after bursts of high-priority frames ([Design §3.2.1][design-fairness]). - [ ] **Run state consolidation** using `Option` receivers and a closed source counter ([Design §3.4][design-actor-state]). diff --git a/src/connection.rs b/src/connection.rs index a5e0d75d..a4b27d40 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -6,7 +6,10 @@ //! low-priority ones, with streamed responses handled last. use futures::StreamExt; -use tokio::sync::mpsc; +use tokio::{ + sync::mpsc, + time::{Duration, Instant}, +}; use tokio_util::sync::CancellationToken; use crate::{ @@ -16,11 +19,34 @@ use crate::{ }; /// Actor driving outbound frame delivery for a connection. +/// Configuration controlling fairness when draining push queues. +#[derive(Clone, Copy)] +pub struct FairnessConfig { + /// Number of consecutive high-priority frames to process before + /// checking the low-priority queue. + pub max_high_before_low: usize, + /// Optional time slice after which the low-priority queue is checked + /// if high-priority traffic has been continuous. + pub time_slice: Option, +} + +impl Default for FairnessConfig { + fn default() -> Self { + Self { + max_high_before_low: 16, + time_slice: None, + } + } +} + pub struct ConnectionActor { queues: PushQueues, response: Option>, // current streaming response shutdown: CancellationToken, hooks: ProtocolHooks, + fairness: FairnessConfig, + high_counter: usize, + high_start: Option, } impl ConnectionActor @@ -50,6 +76,9 @@ where response, shutdown, hooks, + fairness: FairnessConfig::default(), + high_counter: 0, + high_start: None, } } @@ -60,6 +89,9 @@ where #[must_use] pub fn queues_mut(&mut self) -> &mut PushQueues { &mut self.queues } + /// Replace the fairness configuration. + pub fn set_fairness(&mut self, fairness: FairnessConfig) { self.fairness = fairness; } + /// Set or replace the current streaming response. pub fn set_response(&mut self, stream: Option>) { self.response = stream; } @@ -105,15 +137,29 @@ where } res = Self::recv_push(&mut self.queues.high_priority_rx), if !state.push.high => { + let processed = res.is_some(); self.handle_push(res, &mut state.push.high, out); + if processed { + self.after_high(out, &mut state.push.low); + } else { + self.reset_high_counter(); + } } res = Self::recv_push(&mut self.queues.low_priority_rx), if !state.push.low => { + let processed = res.is_some(); self.handle_push(res, &mut state.push.low, out); + if processed { + self.after_low(); + } } res = Self::next_response(&mut self.response), if !state.shutting_down && !state.resp_closed => { + let processed = matches!(res, Some(Ok(_))); self.handle_response(res, &mut state.resp_closed, out)?; + if processed { + self.after_low(); + } if state.resp_closed { self.response = None; } @@ -142,6 +188,45 @@ where } } + fn after_high(&mut self, out: &mut Vec, low_closed: &mut bool) { + self.high_counter += 1; + if self.high_counter == 1 { + self.high_start = Some(Instant::now()); + } + + let threshold_hit = self.fairness.max_high_before_low > 0 + && self.high_counter >= self.fairness.max_high_before_low; + let time_hit = + if let (Some(slice), Some(start)) = (self.fairness.time_slice, self.high_start) { + start.elapsed() >= slice + } else { + false + }; + + if threshold_hit || time_hit { + match self.queues.low_priority_rx.try_recv() { + Ok(mut frame) => { + self.hooks.before_send(&mut frame); + out.push(frame); + self.after_low(); + *low_closed = false; + } + Err(mpsc::error::TryRecvError::Empty) => {} + Err(mpsc::error::TryRecvError::Disconnected) => *low_closed = true, + } + } + } + + fn after_low(&mut self) { + self.high_counter = 0; + self.high_start = None; + } + + fn reset_high_counter(&mut self) { + self.high_counter = 0; + self.high_start = None; + } + fn handle_response( &mut self, res: Option>>, diff --git a/tests/connection_actor.rs b/tests/connection_actor.rs index aacf092b..7cbd0f17 100644 --- a/tests/connection_actor.rs +++ b/tests/connection_actor.rs @@ -8,7 +8,7 @@ use rstest::{fixture, rstest}; use tokio::time::{Duration, sleep, timeout}; use tokio_util::sync::CancellationToken; use wireframe::{ - connection::ConnectionActor, + connection::{ConnectionActor, FairnessConfig}, push::PushQueues, response::{FrameStream, WireframeError}, }; @@ -44,6 +44,31 @@ async fn strict_priority_order( assert_eq!(out, vec![1, 2, 3]); } +#[rstest] +#[tokio::test] +async fn fairness_yields_low_after_burst( + queues: (PushQueues, wireframe::push::PushHandle), + shutdown_token: CancellationToken, +) { + let (queues, handle) = queues; + let fairness = FairnessConfig { + max_high_before_low: 2, + time_slice: None, + }; + + for n in 1..=5 { + handle.push_high_priority(n).await.unwrap(); + } + handle.push_low_priority(99).await.unwrap(); + drop(handle); + + let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, None, shutdown_token); + actor.set_fairness(fairness); + let mut out = Vec::new(); + actor.run(&mut out).await.unwrap(); + assert_eq!(out, vec![1, 2, 99, 3, 4, 5]); +} + #[rstest] #[tokio::test] async fn shutdown_signal_precedence( From 0099d9ef6de354959f846459faa5f8dc12f8958d Mon Sep 17 00:00:00 2001 From: Leynos Date: Sat, 28 Jun 2025 03:50:24 +0100 Subject: [PATCH 2/5] Add fairness sequence diagram --- .../asynchronous-outbound-messaging-design.md | 25 ++++++++++ docs/contents.md | 46 +++++++++---------- 2 files changed, 47 insertions(+), 24 deletions(-) diff --git a/docs/asynchronous-outbound-messaging-design.md b/docs/asynchronous-outbound-messaging-design.md index 86e49e78..d1749132 100644 --- a/docs/asynchronous-outbound-messaging-design.md +++ b/docs/asynchronous-outbound-messaging-design.md @@ -181,6 +181,31 @@ flowchart TD K --> A ``` +The following sequence diagram illustrates the runtime behaviour: + +```mermaid +sequenceDiagram + participant Client + participant ConnectionActor + participant HighQueue + participant LowQueue + + loop While processing frames + ConnectionActor->>HighQueue: Poll high-priority frame + alt High-priority frame received + ConnectionActor->>ConnectionActor: after_high() + alt Max high before low or time slice reached + ConnectionActor->>LowQueue: Try receive low-priority frame + alt Low-priority frame available + ConnectionActor->>ConnectionActor: after_low() + end + end + else No high-priority frame + ConnectionActor->>ConnectionActor: reset_high_counter() + end + end +``` + ### 3.3 Connection Actor Overview ```mermaid diff --git a/docs/contents.md b/docs/contents.md index c131d034..cbedb31f 100644 --- a/docs/contents.md +++ b/docs/contents.md @@ -1,61 +1,59 @@ # Documentation contents -This page summarizes each file in the `docs/` directory. -Use it as a quick reference when navigating the project's design and -architecture material. +This page summarizes each file in the `docs/` directory. Use it as a quick +reference when navigating the project's design and architecture material. ## Design documents -- [Outbound messaging design](asynchronous-outbound-messaging-design.md) +- [Outbound messaging design](asynchronous-outbound-messaging-design.md)\ Comprehensive design for server-initiated pushes. -- [Message fragments](generic-message-fragmentation-and-re-assembly-design.md) +- [Message fragments](generic-message-fragmentation-and-re-assembly-design.md)\ Design for fragmenting and reassembling messages. -- [Streaming response design](multi-packet-and-streaming-responses-design.md) +- [Streaming response design](multi-packet-and-streaming-responses-design.md)\ Design for handling multi-packet and streaming responses. -- [Router library design](rust-binary-router-library-design.md) +- [Router library design](rust-binary-router-library-design.md)\ In-depth design of the binary router library. -- [Client design](wireframe-client-design.md) +- [Client design](wireframe-client-design.md)\ Proposal for adding client-side support. -- [Frame metadata](frame-metadata.md) +- [Frame metadata](frame-metadata.md)\ How frame metadata assists with routing decisions. -- [Message versioning](message-versioning.md) +- [Message versioning](message-versioning.md)\ Approaches to message versioning and compatibility. -- [Preamble validator](preamble-validator.md) +- [Preamble validator](preamble-validator.md)\ Validating client connection preambles. ## Roadmaps -- [Outbound messaging roadmap](asynchronous-outbound-messaging-roadmap.md) +- [Outbound messaging roadmap](asynchronous-outbound-messaging-roadmap.md)\ Task list for implementing asynchronous outbound messaging. -- [Wireframe 1.0 roadmap](wireframe-1-0-detailed-development-roadmap.md) +- [Wireframe 1.0 roadmap](wireframe-1-0-detailed-development-roadmap.md)\ Detailed tasks leading to Wireframe 1.0. -- [Project roadmap](roadmap.md) +- [Project roadmap](roadmap.md)\ High-level development roadmap. -- [1.0 philosophy](the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md) +- [1.0 philosophy](the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md)\ Philosophy and feature set for Wireframe 1.0. ## Testing -- [Multi-layered testing strategy](multi-layered-testing-strategy.md) +- [Multi-layered testing strategy](multi-layered-testing-strategy.md)\ Overview of the library's testing approach. -- [RSTest fixtures](rust-testing-with-rstest-fixtures.md) +- [RSTest fixtures](rust-testing-with-rstest-fixtures.md)\ Using `rstest` fixtures for clean tests. -- [Mocking network outages](mocking-network-outages-in-rust.md) +- [Mocking network outages](mocking-network-outages-in-rust.md)\ Tutorial for simulating network outages in tests. -- [Testing helpers](wireframe-testing-crate.md) +- [Testing helpers](wireframe-testing-crate.md)\ Planned companion crate with testing helpers. ## Operations and resilience -- [Resilience guide](hardening-wireframe-a-guide-to-production-resilience.md) +- [Resilience guide](hardening-wireframe-a-guide-to-production-resilience.md)\ Guidance on achieving production resilience. -- [Observability and operability](observability-operability-and-maturity.md) +- [Observability and operability](observability-operability-and-maturity.md)\ Guide to observability and operational maturity. ## Reference guides -- [Refactoring guide](complexity-antipatterns-and-refactoring-strategies.md) +- [Refactoring guide](complexity-antipatterns-and-refactoring-strategies.md)\ Strategies for taming code complexity and refactoring. -- [Documentation style guide](documentation-style-guide.md) +- [Documentation style guide](documentation-style-guide.md)\ Conventions for writing project documentation. - From 3af401666cd2a9a7f1140f7e2f7960ab70018a7f Mon Sep 17 00:00:00 2001 From: Leynos Date: Sat, 28 Jun 2025 03:50:29 +0100 Subject: [PATCH 3/5] Refactor actor polling logic --- src/connection.rs | 66 +++++++++++++++++++++++++++++++---------------- 1 file changed, 44 insertions(+), 22 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index a4b27d40..f62cece7 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -132,43 +132,65 @@ where biased; () = Self::wait_shutdown(self.shutdown.clone()), if !state.shutting_down => { - state.shutting_down = true; - self.start_shutdown(&mut state.resp_closed); + self.process_shutdown(state); } res = Self::recv_push(&mut self.queues.high_priority_rx), if !state.push.high => { - let processed = res.is_some(); - self.handle_push(res, &mut state.push.high, out); - if processed { - self.after_high(out, &mut state.push.low); - } else { - self.reset_high_counter(); - } + self.process_high(res, state, out); } res = Self::recv_push(&mut self.queues.low_priority_rx), if !state.push.low => { - let processed = res.is_some(); - self.handle_push(res, &mut state.push.low, out); - if processed { - self.after_low(); - } + self.process_low(res, state, out); } res = Self::next_response(&mut self.response), if !state.shutting_down && !state.resp_closed => { - let processed = matches!(res, Some(Ok(_))); - self.handle_response(res, &mut state.resp_closed, out)?; - if processed { - self.after_low(); - } - if state.resp_closed { - self.response = None; - } + self.process_response(res, state, out)?; } } Ok(()) } + fn process_shutdown(&mut self, state: &mut ActorState) { + state.shutting_down = true; + self.start_shutdown(&mut state.resp_closed); + } + + fn process_high(&mut self, res: Option, state: &mut ActorState, out: &mut Vec) { + let processed = res.is_some(); + self.handle_push(res, &mut state.push.high, out); + if processed { + self.after_high(out, &mut state.push.low); + } else { + self.reset_high_counter(); + } + } + + fn process_low(&mut self, res: Option, state: &mut ActorState, out: &mut Vec) { + let processed = res.is_some(); + self.handle_push(res, &mut state.push.low, out); + if processed { + self.after_low(); + } + } + + fn process_response( + &mut self, + res: Option>>, + state: &mut ActorState, + out: &mut Vec, + ) -> Result<(), WireframeError> { + let processed = matches!(res, Some(Ok(_))); + self.handle_response(res, &mut state.resp_closed, out)?; + if processed { + self.after_low(); + } + if state.resp_closed { + self.response = None; + } + Ok(()) + } + fn start_shutdown(&mut self, resp_closed: &mut bool) { self.queues.high_priority_rx.close(); self.queues.low_priority_rx.close(); From 168c30581213baf894d77abf3d6633d3416ad5fc Mon Sep 17 00:00:00 2001 From: Leynos Date: Sat, 28 Jun 2025 04:05:43 +0100 Subject: [PATCH 4/5] Document fairness config and clean helpers --- docs/asynchronous-outbound-messaging-design.md | 5 +++-- src/connection.rs | 9 ++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/docs/asynchronous-outbound-messaging-design.md b/docs/asynchronous-outbound-messaging-design.md index d1749132..73cbbbd5 100644 --- a/docs/asynchronous-outbound-messaging-design.md +++ b/docs/asynchronous-outbound-messaging-design.md @@ -147,8 +147,9 @@ elapsed time spent handling high-priority frames exceeds this slice, and the low queue is not empty, the actor yields to a low-priority frame. Application builders expose `with_fairness(FairnessConfig)` where `FairnessConfig` groups the counter threshold and an optional `time_slice`. The counter defaults to 16 -while `time_slice` is disabled. Setting the counter to zero preserves the -original strict ordering. +while `time_slice` is disabled. Setting the counter to zero disables the +threshold logic and relies solely on `time_slice` for fairness, preserving +strict high-priority ordering otherwise. This fairness mechanism ensures low-priority traffic continues to progress even under sustained high-priority load. diff --git a/src/connection.rs b/src/connection.rs index f62cece7..43dd9f31 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -18,13 +18,14 @@ use crate::{ response::{FrameStream, WireframeError}, }; -/// Actor driving outbound frame delivery for a connection. /// Configuration controlling fairness when draining push queues. #[derive(Clone, Copy)] pub struct FairnessConfig { /// Number of consecutive high-priority frames to process before /// checking the low-priority queue. pub max_high_before_low: usize, + /// A zero value disables the counter and relies solely on `time_slice` for + /// fairness, preserving strict high-priority ordering otherwise. /// Optional time slice after which the low-priority queue is checked /// if high-priority traffic has been continuous. pub time_slice: Option, @@ -39,6 +40,7 @@ impl Default for FairnessConfig { } } +/// Actor driving outbound frame delivery for a connection. pub struct ConnectionActor { queues: PushQueues, response: Option>, // current streaming response @@ -239,10 +241,7 @@ where } } - fn after_low(&mut self) { - self.high_counter = 0; - self.high_start = None; - } + fn after_low(&mut self) { self.reset_high_counter(); } fn reset_high_counter(&mut self) { self.high_counter = 0; From 4000d6108175f5426591559f7c0fd148085f1cd9 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sat, 28 Jun 2025 10:17:41 +0100 Subject: [PATCH 5/5] Tweak fairness helper and defaults --- .../asynchronous-outbound-messaging-design.md | 2 +- docs/contents.md | 63 +++++++++---------- src/connection.rs | 24 +++---- 3 files changed, 45 insertions(+), 44 deletions(-) diff --git a/docs/asynchronous-outbound-messaging-design.md b/docs/asynchronous-outbound-messaging-design.md index 73cbbbd5..dab955b8 100644 --- a/docs/asynchronous-outbound-messaging-design.md +++ b/docs/asynchronous-outbound-messaging-design.md @@ -146,7 +146,7 @@ An optional time slice (for example 100 µs) can also be configured. When the elapsed time spent handling high-priority frames exceeds this slice, and the low queue is not empty, the actor yields to a low-priority frame. Application builders expose `with_fairness(FairnessConfig)` where `FairnessConfig` groups -the counter threshold and an optional `time_slice`. The counter defaults to 16 +the counter threshold and an optional `time_slice`. The counter defaults to 8 while `time_slice` is disabled. Setting the counter to zero disables the threshold logic and relies solely on `time_slice` for fairness, preserving strict high-priority ordering otherwise. diff --git a/docs/contents.md b/docs/contents.md index cbedb31f..b4607b6c 100644 --- a/docs/contents.md +++ b/docs/contents.md @@ -5,55 +5,54 @@ reference when navigating the project's design and architecture material. ## Design documents -- [Outbound messaging design](asynchronous-outbound-messaging-design.md)\ +- [Outbound messaging design](asynchronous-outbound-messaging-design.md) Comprehensive design for server-initiated pushes. -- [Message fragments](generic-message-fragmentation-and-re-assembly-design.md)\ +- [Message fragments](generic-message-fragmentation-and-re-assembly-design.md) Design for fragmenting and reassembling messages. -- [Streaming response design](multi-packet-and-streaming-responses-design.md)\ +- [Streaming response design](multi-packet-and-streaming-responses-design.md) Design for handling multi-packet and streaming responses. -- [Router library design](rust-binary-router-library-design.md)\ - In-depth design of the binary router library. -- [Client design](wireframe-client-design.md)\ - Proposal for adding client-side support. -- [Frame metadata](frame-metadata.md)\ - How frame metadata assists with routing decisions. -- [Message versioning](message-versioning.md)\ - Approaches to message versioning and compatibility. -- [Preamble validator](preamble-validator.md)\ - Validating client connection preambles. +- [Router library design](rust-binary-router-library-design.md) In-depth design + of the binary router library. +- [Client design](wireframe-client-design.md) Proposal for adding client-side + support. +- [Frame metadata](frame-metadata.md) How frame metadata assists with routing + decisions. +- [Message versioning](message-versioning.md) Approaches to message versioning + and compatibility. +- [Preamble validator](preamble-validator.md) Validating client connection + preambles. ## Roadmaps -- [Outbound messaging roadmap](asynchronous-outbound-messaging-roadmap.md)\ - Task list for implementing asynchronous outbound messaging. -- [Wireframe 1.0 roadmap](wireframe-1-0-detailed-development-roadmap.md)\ +- [Outbound messaging roadmap](asynchronous-outbound-messaging-roadmap.md) Task + list for implementing asynchronous outbound messaging. +- [Wireframe 1.0 roadmap](wireframe-1-0-detailed-development-roadmap.md) Detailed tasks leading to Wireframe 1.0. -- [Project roadmap](roadmap.md)\ - High-level development roadmap. -- [1.0 philosophy](the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md)\ +- [Project roadmap](roadmap.md) High-level development roadmap. +- [1.0 philosophy](the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md) Philosophy and feature set for Wireframe 1.0. ## Testing -- [Multi-layered testing strategy](multi-layered-testing-strategy.md)\ - Overview of the library's testing approach. -- [RSTest fixtures](rust-testing-with-rstest-fixtures.md)\ - Using `rstest` fixtures for clean tests. -- [Mocking network outages](mocking-network-outages-in-rust.md)\ - Tutorial for simulating network outages in tests. -- [Testing helpers](wireframe-testing-crate.md)\ - Planned companion crate with testing helpers. +- [Multi-layered testing strategy](multi-layered-testing-strategy.md) Overview + of the library's testing approach. +- [RSTest fixtures](rust-testing-with-rstest-fixtures.md) Using `rstest` + fixtures for clean tests. +- [Mocking network outages](mocking-network-outages-in-rust.md) Tutorial for + simulating network outages in tests. +- [Testing helpers](wireframe-testing-crate.md) Planned companion crate with + testing helpers. ## Operations and resilience -- [Resilience guide](hardening-wireframe-a-guide-to-production-resilience.md)\ +- [Resilience guide](hardening-wireframe-a-guide-to-production-resilience.md) Guidance on achieving production resilience. -- [Observability and operability](observability-operability-and-maturity.md)\ +- [Observability and operability](observability-operability-and-maturity.md) Guide to observability and operational maturity. ## Reference guides -- [Refactoring guide](complexity-antipatterns-and-refactoring-strategies.md)\ +- [Refactoring guide](complexity-antipatterns-and-refactoring-strategies.md) Strategies for taming code complexity and refactoring. -- [Documentation style guide](documentation-style-guide.md)\ - Conventions for writing project documentation. +- [Documentation style guide](documentation-style-guide.md) Conventions for + writing project documentation. diff --git a/src/connection.rs b/src/connection.rs index 43dd9f31..53b145fe 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -34,7 +34,7 @@ pub struct FairnessConfig { impl Default for FairnessConfig { fn default() -> Self { Self { - max_high_before_low: 16, + max_high_before_low: 8, time_slice: None, } } @@ -218,16 +218,7 @@ where self.high_start = Some(Instant::now()); } - let threshold_hit = self.fairness.max_high_before_low > 0 - && self.high_counter >= self.fairness.max_high_before_low; - let time_hit = - if let (Some(slice), Some(start)) = (self.fairness.time_slice, self.high_start) { - start.elapsed() >= slice - } else { - false - }; - - if threshold_hit || time_hit { + if self.should_yield_to_low_priority() { match self.queues.low_priority_rx.try_recv() { Ok(mut frame) => { self.hooks.before_send(&mut frame); @@ -241,6 +232,17 @@ where } } + fn should_yield_to_low_priority(&self) -> bool { + let threshold_hit = self.fairness.max_high_before_low > 0 + && self.high_counter >= self.fairness.max_high_before_low; + let time_hit = self + .fairness + .time_slice + .zip(self.high_start) + .is_some_and(|(slice, start)| start.elapsed() >= slice); + threshold_hit || time_hit + } + fn after_low(&mut self) { self.reset_high_counter(); } fn reset_high_counter(&mut self) {