From 7706c5e1253c71f1425f4f04980314ef00f45aff Mon Sep 17 00:00:00 2001 From: Leynos Date: Tue, 5 Aug 2025 22:44:37 +0100 Subject: [PATCH 1/3] Inject clock into fairness tracker --- src/connection.rs | 12 +++---- src/fairness.rs | 86 ++++++++++++++++++++++++++++++----------------- 2 files changed, 62 insertions(+), 36 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 676807f4..0c1a74ae 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -46,7 +46,7 @@ impl Drop for ActiveConnection { pub fn active_connection_count() -> u64 { ACTIVE_CONNECTIONS.load(Ordering::Relaxed) } use crate::{ - fairness::Fairness, + fairness::FairnessTracker, hooks::{ConnectionContext, ProtocolHooks}, push::{FrameLike, PushHandle, PushQueues}, response::{FrameStream, WireframeError}, @@ -111,7 +111,7 @@ pub struct ConnectionActor { counter: Option, hooks: ProtocolHooks, ctx: ConnectionContext, - fairness: Fairness, + fairness: FairnessTracker, connection_id: Option, peer_addr: Option, } @@ -169,7 +169,7 @@ where counter: Some(counter), hooks, ctx, - fairness: Fairness::new(FairnessConfig::default()), + fairness: FairnessTracker::new(FairnessConfig::default()), connection_id: None, peer_addr: None, }; @@ -369,9 +369,9 @@ where /// Update counters and opportunistically drain the low-priority queue. fn after_high(&mut self, out: &mut Vec, state: &mut ActorState) { - self.fairness.after_high(); + self.fairness.record_high_priority(); - if self.fairness.should_yield() { + if self.fairness.should_yield_to_low_priority() { let res = self.low_rx.as_mut().map(mpsc::Receiver::try_recv); if let Some(res) = res { match res { @@ -390,7 +390,7 @@ where } /// Reset counters after processing a low-priority frame. - fn after_low(&mut self) { self.fairness.after_low(); } + fn after_low(&mut self) { self.fairness.record_low_priority(); } /// Push a frame from the response stream into `out` or handle completion. /// diff --git a/src/fairness.rs b/src/fairness.rs index 8463b70f..05d7cfe5 100644 --- a/src/fairness.rs +++ b/src/fairness.rs @@ -2,23 +2,44 @@ //! //! This module encapsulates the logic for deciding when high-priority //! processing should yield to low-priority traffic based on configured -//! thresholds and optional time slices. +//! thresholds and optional time slices. A pluggable [`Clock`] allows the +//! timing logic to be tested without depending on Tokio's global time. use tokio::time::Instant; use crate::connection::FairnessConfig; +/// Time source used by [`FairnessTracker`]. +pub(crate) trait Clock: Copy { + /// Return the current instant. + fn now(&self) -> Instant; +} + +/// Clock implementation backed by [`tokio::time`]. +#[derive(Clone, Copy, Debug, Default)] +pub(crate) struct TokioClock; + +impl Clock for TokioClock { + fn now(&self) -> Instant { Instant::now() } +} + #[derive(Debug)] -pub(crate) struct Fairness { +pub(crate) struct FairnessTracker { config: FairnessConfig, + clock: C, high_counter: usize, high_start: Option, } -impl Fairness { - pub(crate) fn new(config: FairnessConfig) -> Self { +impl FairnessTracker { + pub(crate) fn new(config: FairnessConfig) -> Self { Self::with_clock(config, TokioClock) } +} + +impl FairnessTracker { + pub(crate) fn with_clock(config: FairnessConfig, clock: C) -> Self { Self { config, + clock, high_counter: 0, high_start: None, } @@ -29,27 +50,32 @@ impl Fairness { self.reset(); } - pub(crate) fn after_high(&mut self) { + pub(crate) fn record_high_priority(&mut self) { self.high_counter += 1; if self.high_counter == 1 { - self.high_start = Some(Instant::now()); + self.high_start = Some(self.clock.now()); } } - pub(crate) fn should_yield(&self) -> bool { - let threshold_hit = self.config.max_high_before_low > 0 - && self.high_counter >= self.config.max_high_before_low; - let time_hit = self - .config - .time_slice - .zip(self.high_start) - .is_some_and(|(slice, start)| start.elapsed() >= slice); - threshold_hit || time_hit + pub(crate) fn should_yield_to_low_priority(&self) -> bool { + if self.config.max_high_before_low > 0 + && self.high_counter >= self.config.max_high_before_low + { + return true; + } + + if let (Some(slice), Some(start)) = (self.config.time_slice, self.high_start) { + return self.clock.now().duration_since(start) >= slice; + } + + false } - pub(crate) fn after_low(&mut self) { self.reset(); } + pub(crate) fn record_low_priority(&mut self) { self.clear(); } + + pub(crate) fn reset(&mut self) { self.clear(); } - pub(crate) fn reset(&mut self) { + fn clear(&mut self) { self.high_counter = 0; self.high_start = None; } @@ -69,11 +95,11 @@ mod tests { max_high_before_low: 2, time_slice: None, }; - let mut fairness = Fairness::new(cfg); - fairness.after_high(); - assert!(!fairness.should_yield()); - fairness.after_high(); - assert!(fairness.should_yield()); + let mut fairness = FairnessTracker::new(cfg); + fairness.record_high_priority(); + assert!(!fairness.should_yield_to_low_priority()); + fairness.record_high_priority(); + assert!(fairness.should_yield_to_low_priority()); } #[rstest] @@ -83,11 +109,11 @@ mod tests { max_high_before_low: 1, time_slice: None, }; - let mut fairness = Fairness::new(cfg); - fairness.after_high(); - assert!(fairness.should_yield()); - fairness.after_low(); - assert!(!fairness.should_yield()); + let mut fairness = FairnessTracker::new(cfg); + fairness.record_high_priority(); + assert!(fairness.should_yield_to_low_priority()); + fairness.record_low_priority(); + assert!(!fairness.should_yield_to_low_priority()); } #[rstest] @@ -98,9 +124,9 @@ mod tests { max_high_before_low: 0, time_slice: Some(Duration::from_millis(5)), }; - let mut fairness = Fairness::new(cfg); - fairness.after_high(); + let mut fairness = FairnessTracker::new(cfg); + fairness.record_high_priority(); time::advance(Duration::from_millis(6)).await; - assert!(fairness.should_yield()); + assert!(fairness.should_yield_to_low_priority()); } } From c4a2bc9bb632961cf3f05360387b581cb4c13d04 Mon Sep 17 00:00:00 2001 From: Leynos Date: Wed, 6 Aug 2025 06:41:52 +0100 Subject: [PATCH 2/3] Relax clock bounds and unify fairness reset --- src/connection.rs | 2 +- src/fairness.rs | 46 +++++++++++++++++++++++++++++++++++----------- 2 files changed, 36 insertions(+), 12 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 0c1a74ae..9d7a11d8 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -390,7 +390,7 @@ where } /// Reset counters after processing a low-priority frame. - fn after_low(&mut self) { self.fairness.record_low_priority(); } + fn after_low(&mut self) { self.fairness.reset(); } /// Push a frame from the response stream into `out` or handle completion. /// diff --git a/src/fairness.rs b/src/fairness.rs index 05d7cfe5..ed6dc7c9 100644 --- a/src/fairness.rs +++ b/src/fairness.rs @@ -10,13 +10,13 @@ use tokio::time::Instant; use crate::connection::FairnessConfig; /// Time source used by [`FairnessTracker`]. -pub(crate) trait Clock: Copy { +pub(crate) trait Clock: Clone { /// Return the current instant. fn now(&self) -> Instant; } /// Clock implementation backed by [`tokio::time`]. -#[derive(Clone, Copy, Debug, Default)] +#[derive(Clone, Debug, Default)] pub(crate) struct TokioClock; impl Clock for TokioClock { @@ -71,8 +71,6 @@ impl FairnessTracker { false } - pub(crate) fn record_low_priority(&mut self) { self.clear(); } - pub(crate) fn reset(&mut self) { self.clear(); } fn clear(&mut self) { @@ -83,8 +81,10 @@ impl FairnessTracker { #[cfg(test)] mod tests { + use std::sync::{Arc, Mutex}; + use rstest::rstest; - use tokio::time::{self, Duration}; + use tokio::time::{Duration, Instant}; use super::*; @@ -112,21 +112,45 @@ mod tests { let mut fairness = FairnessTracker::new(cfg); fairness.record_high_priority(); assert!(fairness.should_yield_to_low_priority()); - fairness.record_low_priority(); + fairness.reset(); assert!(!fairness.should_yield_to_low_priority()); } + #[derive(Clone, Debug)] + struct MockClock { + now: Arc>, + } + + impl MockClock { + fn new(start: Instant) -> Self { + Self { + now: Arc::new(Mutex::new(start)), + } + } + + fn advance(&self, dur: Duration) { + let mut now = self.now.lock().expect("lock poisoned"); + *now += dur; + } + } + + impl Clock for MockClock { + fn now(&self) -> Instant { *self.now.lock().expect("lock poisoned") } + } + #[rstest] - #[tokio::test] - async fn time_slice_triggers_yield() { - time::pause(); + #[test] + fn time_slice_triggers_yield() { + let start = Instant::now(); + let clock = MockClock::new(start); let cfg = FairnessConfig { max_high_before_low: 0, time_slice: Some(Duration::from_millis(5)), }; - let mut fairness = FairnessTracker::new(cfg); + let mut fairness = FairnessTracker::with_clock(cfg, clock.clone()); fairness.record_high_priority(); - time::advance(Duration::from_millis(6)).await; + assert!(!fairness.should_yield_to_low_priority()); + clock.advance(Duration::from_millis(5)); assert!(fairness.should_yield_to_low_priority()); } } From 071609c91f297e7ab925ad7f63f543e5e24c5f35 Mon Sep 17 00:00:00 2001 From: Payton McIntosh Date: Wed, 6 Aug 2025 19:00:18 +0100 Subject: [PATCH 3/3] Refactors preamble callback tests and cleanup logic Consolidates success/failure callback tests into a parameterized test to reduce duplicated code Replaces redundant callback reset test while preserving verification that callbacks remain unset Improves test variable naming and ensures listener socket is properly cleaned up --- src/server/config.rs | 77 ++++++++++++++++++-------------------------- 1 file changed, 31 insertions(+), 46 deletions(-) diff --git a/src/server/config.rs b/src/server/config.rs index d7c9e135..e0c08a09 100644 --- a/src/server/config.rs +++ b/src/server/config.rs @@ -315,38 +315,37 @@ mod tests { } #[rstest] + #[case("success")] + #[case("failure")] #[tokio::test] - async fn test_preamble_success_callback( + async fn test_preamble_callback_registration( factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, + #[case] callback_type: &str, ) { let counter = Arc::new(AtomicUsize::new(0)); let c = counter.clone(); - let server = server_with_preamble(factory).on_preamble_decode_success( - move |_p: &TestPreamble, _| { + + let server = server_with_preamble(factory); + let server = match callback_type { + "success" => server.on_preamble_decode_success(move |_p: &TestPreamble, _| { let c = c.clone(); Box::pin(async move { c.fetch_add(1, Ordering::SeqCst); Ok(()) }) - }, - ); - assert_eq!(counter.load(Ordering::SeqCst), 0); - assert!(server.on_preamble_success.is_some()); - } - - #[rstest] - #[tokio::test] - async fn test_preamble_failure_callback( - factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, - ) { - let counter = Arc::new(AtomicUsize::new(0)); - let c = counter.clone(); - let server = - server_with_preamble(factory).on_preamble_decode_failure(move |_err: &DecodeError| { + }), + "failure" => server.on_preamble_decode_failure(move |_err: &DecodeError| { c.fetch_add(1, Ordering::SeqCst); - }); + }), + _ => panic!("Invalid callback type"), + }; + assert_eq!(counter.load(Ordering::SeqCst), 0); - assert!(server.on_preamble_failure.is_some()); + match callback_type { + "success" => assert!(server.on_preamble_success.is_some()), + "failure" => assert!(server.on_preamble_failure.is_some()), + _ => unreachable!(), + } } #[rstest] @@ -355,13 +354,13 @@ mod tests { factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, free_port: SocketAddr, ) { - let counter = Arc::new(AtomicUsize::new(0)); - let c = counter.clone(); + let callback_invoked = Arc::new(AtomicUsize::new(0)); + let counter = callback_invoked.clone(); let server = WireframeServer::new(factory) .workers(2) .with_preamble::() .on_preamble_decode_success(move |_p: &TestPreamble, _| { - let c = c.clone(); + let c = counter.clone(); Box::pin(async move { c.fetch_add(1, Ordering::SeqCst); Ok(()) @@ -372,7 +371,7 @@ mod tests { .expect("Failed to bind"); assert_eq!(server.worker_count(), 2); assert!(server.local_addr().is_some()); - assert!(server.on_preamble_success.is_some() && server.on_preamble_failure.is_some()); + assert_eq!(callback_invoked.load(Ordering::SeqCst), 0); } #[rstest] @@ -389,18 +388,6 @@ mod tests { assert!(server.local_addr().is_some()); } - #[rstest] - fn test_preamble_callbacks_reset_on_type_change( - factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, - ) { - let server = WireframeServer::new(factory) - .on_preamble_decode_success(|&(), _| Box::pin(async { Ok(()) })) - .on_preamble_decode_failure(|_: &DecodeError| {}); - assert!(server.on_preamble_success.is_some() && server.on_preamble_failure.is_some()); - let server = server.with_preamble::(); - assert!(server.on_preamble_success.is_none() && server.on_preamble_failure.is_none()); - } - #[rstest] fn test_extreme_worker_counts( factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, @@ -417,16 +404,14 @@ mod tests { factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static, free_port: SocketAddr, ) { - let addr2 = { - let listener = std::net::TcpListener::bind(SocketAddr::new( - std::net::Ipv4Addr::LOCALHOST.into(), - 0, - )) - .expect("failed to bind second listener"); - listener - .local_addr() - .expect("failed to get second listener address") - }; + let listener2 = + std::net::TcpListener::bind(SocketAddr::new(std::net::Ipv4Addr::LOCALHOST.into(), 0)) + .expect("failed to bind second listener"); + let addr2 = listener2 + .local_addr() + .expect("failed to get second listener address"); + drop(listener2); + let server = WireframeServer::new(factory); let server = server .bind(free_port)