From 9fb7f1752fce3867ca87c0da274a916f9d89be51 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 3 Aug 2025 02:09:07 +0100 Subject: [PATCH 1/2] Refactor fairness with injectable clock --- examples/metadata_routing.rs | 5 +- src/connection.rs | 72 ++++++++++++++++++----------- src/fairness.rs | 90 ++++++++++++++++++++++++++---------- src/push.rs | 16 ++++--- src/server.rs | 17 ++++--- 5 files changed, 130 insertions(+), 70 deletions(-) diff --git a/examples/metadata_routing.rs b/examples/metadata_routing.rs index eb53e3d0..25060cf8 100644 --- a/examples/metadata_routing.rs +++ b/examples/metadata_routing.rs @@ -60,10 +60,7 @@ impl FrameMetadata for HeaderSerializer { struct Ping; #[derive(bincode::Decode, bincode::Encode)] -#[expect( - dead_code, - reason = "placeholder for demonstration of metadata routing" -)] +#[allow(dead_code)] // placeholder for demonstration of metadata routing struct Pong; #[tokio::main] diff --git a/src/connection.rs b/src/connection.rs index 4fdb1a5d..2d3abe84 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -40,10 +40,12 @@ impl Drop for ActiveConnection { /// Return the current number of active connections. #[must_use] -pub fn active_connection_count() -> u64 { ACTIVE_CONNECTIONS.load(Ordering::Relaxed) } +pub fn active_connection_count() -> u64 { + ACTIVE_CONNECTIONS.load(Ordering::Relaxed) +} use crate::{ - fairness::Fairness, + fairness::FairnessTracker, hooks::{ConnectionContext, ProtocolHooks}, push::{FrameLike, PushHandle, PushQueues}, response::{FrameStream, WireframeError}, @@ -108,7 +110,7 @@ pub struct ConnectionActor { counter: Option, hooks: ProtocolHooks, ctx: ConnectionContext, - fairness: Fairness, + fairness: FairnessTracker, connection_id: Option, peer_addr: Option, } @@ -166,7 +168,7 @@ where counter: Some(counter), hooks, ctx, - fairness: Fairness::new(FairnessConfig::default()), + fairness: FairnessTracker::new(FairnessConfig::default()), connection_id: None, peer_addr: None, }; @@ -182,14 +184,20 @@ where } /// Replace the fairness configuration. - pub fn set_fairness(&mut self, fairness: FairnessConfig) { self.fairness.set_config(fairness); } + pub fn set_fairness(&mut self, fairness: FairnessConfig) { + self.fairness.set_config(fairness); + } /// Set or replace the current streaming response. - pub fn set_response(&mut self, stream: Option>) { self.response = stream; } + pub fn set_response(&mut self, stream: Option>) { + self.response = stream; + } /// Get a clone of the shutdown token used by the actor. #[must_use] - pub fn shutdown_token(&self) -> CancellationToken { self.shutdown.clone() } + pub fn shutdown_token(&self) -> CancellationToken { + self.shutdown.clone() + } /// Drive the actor until all sources are exhausted or shutdown is triggered. /// @@ -393,26 +401,28 @@ where fn after_high(&mut self, out: &mut Vec, state: &mut ActorState) { self.fairness.after_high(); - if self.fairness.should_yield() - && let Some(rx) = &mut self.low_rx - { - match rx.try_recv() { - Ok(mut frame) => { - self.hooks.before_send(&mut frame, &mut self.ctx); - out.push(frame); - self.after_low(); - } - Err(mpsc::error::TryRecvError::Empty) => {} - Err(mpsc::error::TryRecvError::Disconnected) => { - self.low_rx = None; - state.mark_closed(); + if self.fairness.should_yield_to_low_priority() { + if let Some(rx) = &mut self.low_rx { + match rx.try_recv() { + Ok(mut frame) => { + self.hooks.before_send(&mut frame, &mut self.ctx); + out.push(frame); + self.after_low(); + } + Err(mpsc::error::TryRecvError::Empty) => {} + Err(mpsc::error::TryRecvError::Disconnected) => { + self.low_rx = None; + state.mark_closed(); + } } } } } /// Reset counters after processing a low-priority frame. - fn after_low(&mut self) { self.fairness.after_low(); } + fn after_low(&mut self) { + self.fairness.after_low(); + } /// Push a frame from the response stream into `out` or handle completion. /// @@ -449,11 +459,15 @@ where /// Await cancellation on the provided shutdown token. #[inline] - async fn wait_shutdown(token: CancellationToken) { token.cancelled_owned().await; } + async fn wait_shutdown(token: CancellationToken) { + token.cancelled_owned().await; + } /// Receive the next frame from a push queue. #[inline] - async fn recv_push(rx: &mut mpsc::Receiver) -> Option { rx.recv().await } + async fn recv_push(rx: &mut mpsc::Receiver) -> Option { + rx.recv().await + } /// Poll `f` if `opt` is `Some`, returning `None` otherwise. #[expect( @@ -536,11 +550,17 @@ impl ActorState { } /// Returns `true` while the actor is actively processing sources. - fn is_active(&self) -> bool { matches!(self.run_state, RunState::Active) } + fn is_active(&self) -> bool { + matches!(self.run_state, RunState::Active) + } /// Returns `true` once shutdown has begun. - fn is_shutting_down(&self) -> bool { matches!(self.run_state, RunState::ShuttingDown) } + fn is_shutting_down(&self) -> bool { + matches!(self.run_state, RunState::ShuttingDown) + } /// Returns `true` when all sources have finished. - fn is_done(&self) -> bool { matches!(self.run_state, RunState::Finished) } + fn is_done(&self) -> bool { + matches!(self.run_state, RunState::Finished) + } } diff --git a/src/fairness.rs b/src/fairness.rs index 8463b70f..fd5af2d7 100644 --- a/src/fairness.rs +++ b/src/fairness.rs @@ -2,54 +2,94 @@ //! //! This module encapsulates the logic for deciding when high-priority //! processing should yield to low-priority traffic based on configured -//! thresholds and optional time slices. +//! thresholds and optional time slices. A pluggable clock decouples the +//! implementation from the runtime and allows deterministic tests. -use tokio::time::Instant; +use std::time::Duration; use crate::connection::FairnessConfig; +/// Abstraction over a time source returning [`Instant`]s. +pub(crate) trait Clock { + type Instant: Copy; + fn now(&self) -> Self::Instant; + fn elapsed(&self, start: Self::Instant) -> Duration; +} + +#[derive(Debug, Default)] +pub(crate) struct TokioClock; + +impl Clock for TokioClock { + type Instant = tokio::time::Instant; + fn now(&self) -> Self::Instant { + tokio::time::Instant::now() + } + fn elapsed(&self, start: Self::Instant) -> Duration { + start.elapsed() + } +} + #[derive(Debug)] -pub(crate) struct Fairness { +pub(crate) struct FairnessTracker { config: FairnessConfig, high_counter: usize, - high_start: Option, + high_start: Option, + clock: C, } -impl Fairness { +impl FairnessTracker { pub(crate) fn new(config: FairnessConfig) -> Self { + Self::with_clock(config, TokioClock) + } +} + +impl FairnessTracker { + pub(crate) fn with_clock(config: FairnessConfig, clock: C) -> Self { Self { config, high_counter: 0, high_start: None, + clock, } } pub(crate) fn set_config(&mut self, config: FairnessConfig) { self.config = config; - self.reset(); + self.clear(); } pub(crate) fn after_high(&mut self) { self.high_counter += 1; if self.high_counter == 1 { - self.high_start = Some(Instant::now()); + self.high_start = Some(self.clock.now()); } } - pub(crate) fn should_yield(&self) -> bool { - let threshold_hit = self.config.max_high_before_low > 0 - && self.high_counter >= self.config.max_high_before_low; - let time_hit = self - .config - .time_slice - .zip(self.high_start) - .is_some_and(|(slice, start)| start.elapsed() >= slice); - threshold_hit || time_hit + pub(crate) fn should_yield_to_low_priority(&self) -> bool { + if self.config.max_high_before_low > 0 + && self.high_counter >= self.config.max_high_before_low + { + return true; + } + + if let (Some(slice), Some(start)) = (self.config.time_slice, self.high_start) { + if self.clock.elapsed(start) >= slice { + return true; + } + } + + false } - pub(crate) fn after_low(&mut self) { self.reset(); } + pub(crate) fn after_low(&mut self) { + self.clear(); + } pub(crate) fn reset(&mut self) { + self.clear(); + } + + fn clear(&mut self) { self.high_counter = 0; self.high_start = None; } @@ -69,11 +109,11 @@ mod tests { max_high_before_low: 2, time_slice: None, }; - let mut fairness = Fairness::new(cfg); + let mut fairness = FairnessTracker::new(cfg); fairness.after_high(); - assert!(!fairness.should_yield()); + assert!(!fairness.should_yield_to_low_priority()); fairness.after_high(); - assert!(fairness.should_yield()); + assert!(fairness.should_yield_to_low_priority()); } #[rstest] @@ -83,11 +123,11 @@ mod tests { max_high_before_low: 1, time_slice: None, }; - let mut fairness = Fairness::new(cfg); + let mut fairness = FairnessTracker::new(cfg); fairness.after_high(); - assert!(fairness.should_yield()); + assert!(fairness.should_yield_to_low_priority()); fairness.after_low(); - assert!(!fairness.should_yield()); + assert!(!fairness.should_yield_to_low_priority()); } #[rstest] @@ -98,9 +138,9 @@ mod tests { max_high_before_low: 0, time_slice: Some(Duration::from_millis(5)), }; - let mut fairness = Fairness::new(cfg); + let mut fairness = FairnessTracker::new(cfg); fairness.after_high(); time::advance(Duration::from_millis(6)).await; - assert!(fairness.should_yield()); + assert!(fairness.should_yield_to_low_priority()); } } diff --git a/src/push.rs b/src/push.rs index 01103745..eefce836 100644 --- a/src/push.rs +++ b/src/push.rs @@ -97,7 +97,9 @@ pub(crate) struct PushHandleInner { pub struct PushHandle(Arc>); impl PushHandle { - pub(crate) fn from_arc(arc: Arc>) -> Self { Self(arc) } + pub(crate) fn from_arc(arc: Arc>) -> Self { + Self(arc) + } /// Internal helper to push a frame with the requested priority. /// @@ -253,7 +255,9 @@ impl PushHandle { } /// Downgrade to a `Weak` reference for storage in a registry. - pub(crate) fn downgrade(&self) -> Weak> { Arc::downgrade(&self.0) } + pub(crate) fn downgrade(&self) -> Weak> { + Arc::downgrade(&self.0) + } } /// Receiver ends of the push queues stored by the connection actor. @@ -382,10 +386,10 @@ impl PushQueues { rate: Option, dlq: Option>, ) -> Result<(Self, PushHandle), PushConfigError> { - if let Some(r) = rate - && (r == 0 || r > MAX_PUSH_RATE) - { - return Err(PushConfigError::InvalidRate(r)); + if let Some(r) = rate { + if r == 0 || r > MAX_PUSH_RATE { + return Err(PushConfigError::InvalidRate(r)); + } } let (high_tx, high_rx) = mpsc::channel(high_capacity); let (low_tx, low_rx) = mpsc::channel(low_capacity); diff --git a/src/server.rs b/src/server.rs index e637106c..a5d9d41a 100644 --- a/src/server.rs +++ b/src/server.rs @@ -229,7 +229,9 @@ where /// ``` #[inline] #[must_use] - pub const fn worker_count(&self) -> usize { self.workers } + pub const fn worker_count(&self) -> usize { + self.workers + } /// Get the socket address the server is bound to, if available. #[must_use] @@ -507,10 +509,10 @@ async fn process_stream( let peer_addr = stream.peer_addr().ok(); match read_preamble::<_, T>(&mut stream).await { Ok((preamble, leftover)) => { - if let Some(handler) = on_success.as_ref() - && let Err(e) = handler(&preamble, &mut stream).await - { - tracing::error!(error = ?e, ?peer_addr, "preamble callback error"); + if let Some(handler) = on_success.as_ref() { + if let Err(e) = handler(&preamble, &mut stream).await { + tracing::error!(error = ?e, ?peer_addr, "preamble callback error"); + } } let stream = RewindStream::new(leftover, stream); // Hand the connection to the application for processing. @@ -558,10 +560,7 @@ mod tests { /// Test helper preamble carrying no data. #[derive(Debug, Clone, PartialEq, Encode, Decode)] - #[expect( - dead_code, - reason = "used only in doctest to illustrate an empty preamble" - )] + #[allow(dead_code)] // used only in doctest to illustrate an empty preamble struct EmptyPreamble; #[fixture] From 5002f77d226266037effdb36b1d2728df4f9ae72 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 3 Aug 2025 02:29:49 +0100 Subject: [PATCH 2/2] Annotate allow attributes with reasons --- examples/metadata_routing.rs | 5 ++++- examples/ping_pong.rs | 5 ++++- src/server.rs | 5 ++++- wireframe_testing/src/helpers.rs | 6 ++++-- 4 files changed, 16 insertions(+), 5 deletions(-) diff --git a/examples/metadata_routing.rs b/examples/metadata_routing.rs index 25060cf8..4ad97414 100644 --- a/examples/metadata_routing.rs +++ b/examples/metadata_routing.rs @@ -60,7 +60,10 @@ impl FrameMetadata for HeaderSerializer { struct Ping; #[derive(bincode::Decode, bincode::Encode)] -#[allow(dead_code)] // placeholder for demonstration of metadata routing +#[allow( + dead_code, + reason = "placeholder for demonstration of metadata routing" +)] struct Pong; #[tokio::main] diff --git a/examples/ping_pong.rs b/examples/ping_pong.rs index edde533e..4ebf4e5d 100644 --- a/examples/ping_pong.rs +++ b/examples/ping_pong.rs @@ -40,7 +40,10 @@ const PING_ID: u32 = 1; /// /// The middleware chain generates the actual response, so this /// handler intentionally performs no work. -#[allow(clippy::unused_async)] +#[allow( + clippy::unused_async, + reason = "handler intentionally performs no work" +)] async fn ping_handler() {} struct PongMiddleware; diff --git a/src/server.rs b/src/server.rs index a5d9d41a..ee3a44b3 100644 --- a/src/server.rs +++ b/src/server.rs @@ -560,7 +560,10 @@ mod tests { /// Test helper preamble carrying no data. #[derive(Debug, Clone, PartialEq, Encode, Decode)] - #[allow(dead_code)] // used only in doctest to illustrate an empty preamble + #[allow( + dead_code, + reason = "used only in doctest to illustrate an empty preamble" + )] struct EmptyPreamble; #[fixture] diff --git a/wireframe_testing/src/helpers.rs b/wireframe_testing/src/helpers.rs index cc3c81da..8556f1a8 100644 --- a/wireframe_testing/src/helpers.rs +++ b/wireframe_testing/src/helpers.rs @@ -19,7 +19,9 @@ use wireframe::{ unused_braces, reason = "Clippy is wrong here; this is not a redundant block" )] -pub fn processor() -> LengthPrefixedProcessor { LengthPrefixedProcessor::default() } +pub fn processor() -> LengthPrefixedProcessor { + LengthPrefixedProcessor::default() +} pub trait TestSerializer: Serializer + FrameMetadata + Send + Sync + 'static @@ -399,7 +401,7 @@ where /// # Ok(()) /// # } /// ``` -#[allow(dead_code)] +#[allow(dead_code, reason = "used in documentation examples")] pub async fn run_app_with_frames( app: WireframeApp, frames: Vec>,