-
Notifications
You must be signed in to change notification settings - Fork 0
Refactor fairness logic with injectable clock #249
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -2,54 +2,94 @@ | |||||||||||||||||
| //! | ||||||||||||||||||
| //! This module encapsulates the logic for deciding when high-priority | ||||||||||||||||||
| //! processing should yield to low-priority traffic based on configured | ||||||||||||||||||
| //! thresholds and optional time slices. | ||||||||||||||||||
| //! thresholds and optional time slices. A pluggable clock decouples the | ||||||||||||||||||
| //! implementation from the runtime and allows deterministic tests. | ||||||||||||||||||
|
|
||||||||||||||||||
| use tokio::time::Instant; | ||||||||||||||||||
| use std::time::Duration; | ||||||||||||||||||
|
|
||||||||||||||||||
| use crate::connection::FairnessConfig; | ||||||||||||||||||
|
|
||||||||||||||||||
| /// Abstraction over a time source returning [`Instant`]s. | ||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. issue (complexity): Consider removing the Clock trait and related generics to simplify the code while retaining deterministic tests. Here’s a way to get rid of almost all of the extra indirection and still keep determinism in your tests (they already use use tokio::time::Instant;
use crate::connection::FairnessConfig;
#[derive(Debug)]
pub(crate) struct FairnessTracker {
config: FairnessConfig,
high_counter: usize,
high_start: Option<Instant>,
}
impl FairnessTracker {
pub(crate) fn new(config: FairnessConfig) -> Self {
Self {
config,
high_counter: 0,
high_start: None,
}
}
pub(crate) fn set_config(&mut self, config: FairnessConfig) {
self.config = config;
self.reset();
}
pub(crate) fn after_high(&mut self) {
self.high_counter += 1;
if self.high_counter == 1 {
self.high_start = Some(Instant::now());
}
}
pub(crate) fn should_yield_to_low_priority(&self) -> bool {
let threshold_hit = self.config.max_high_before_low > 0
&& self.high_counter >= self.config.max_high_before_low;
let time_hit = self
.config
.time_slice
.zip(self.high_start)
.map_or(false, |(slice, start)| start.elapsed() >= slice);
threshold_hit || time_hit
}
pub(crate) fn after_low(&mut self) {
self.reset();
}
fn reset(&mut self) {
self.high_counter = 0;
self.high_start = None;
}
}What this buys you:
|
||||||||||||||||||
| pub(crate) trait Clock { | ||||||||||||||||||
| type Instant: Copy; | ||||||||||||||||||
| fn now(&self) -> Self::Instant; | ||||||||||||||||||
| fn elapsed(&self, start: Self::Instant) -> Duration; | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| #[derive(Debug, Default)] | ||||||||||||||||||
| pub(crate) struct TokioClock; | ||||||||||||||||||
|
|
||||||||||||||||||
| impl Clock for TokioClock { | ||||||||||||||||||
| type Instant = tokio::time::Instant; | ||||||||||||||||||
| fn now(&self) -> Self::Instant { | ||||||||||||||||||
| tokio::time::Instant::now() | ||||||||||||||||||
| } | ||||||||||||||||||
| fn elapsed(&self, start: Self::Instant) -> Duration { | ||||||||||||||||||
| start.elapsed() | ||||||||||||||||||
| } | ||||||||||||||||||
|
Comment on lines
+24
to
+29
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Format The pipeline indicates these methods should be formatted on single lines. Apply this diff to fix the formatting: - fn now(&self) -> Self::Instant {
- tokio::time::Instant::now()
- }
- fn elapsed(&self, start: Self::Instant) -> Duration {
- start.elapsed()
- }
+ fn now(&self) -> Self::Instant { tokio::time::Instant::now() }
+ fn elapsed(&self, start: Self::Instant) -> Duration { start.elapsed() }📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| #[derive(Debug)] | ||||||||||||||||||
| pub(crate) struct Fairness { | ||||||||||||||||||
| pub(crate) struct FairnessTracker<C: Clock = TokioClock> { | ||||||||||||||||||
| config: FairnessConfig, | ||||||||||||||||||
| high_counter: usize, | ||||||||||||||||||
| high_start: Option<Instant>, | ||||||||||||||||||
| high_start: Option<C::Instant>, | ||||||||||||||||||
| clock: C, | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| impl Fairness { | ||||||||||||||||||
| impl FairnessTracker<TokioClock> { | ||||||||||||||||||
| pub(crate) fn new(config: FairnessConfig) -> Self { | ||||||||||||||||||
| Self::with_clock(config, TokioClock) | ||||||||||||||||||
| } | ||||||||||||||||||
|
Comment on lines
41
to
+43
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Format The pipeline indicates this method should be formatted on a single line. Apply this diff to fix the formatting: - pub(crate) fn new(config: FairnessConfig) -> Self {
- Self::with_clock(config, TokioClock)
- }
+ pub(crate) fn new(config: FairnessConfig) -> Self { Self::with_clock(config, TokioClock) }🤖 Prompt for AI Agents |
||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| impl<C: Clock> FairnessTracker<C> { | ||||||||||||||||||
| pub(crate) fn with_clock(config: FairnessConfig, clock: C) -> Self { | ||||||||||||||||||
| Self { | ||||||||||||||||||
| config, | ||||||||||||||||||
| high_counter: 0, | ||||||||||||||||||
| high_start: None, | ||||||||||||||||||
| clock, | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| pub(crate) fn set_config(&mut self, config: FairnessConfig) { | ||||||||||||||||||
| self.config = config; | ||||||||||||||||||
| self.reset(); | ||||||||||||||||||
| self.clear(); | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| pub(crate) fn after_high(&mut self) { | ||||||||||||||||||
| self.high_counter += 1; | ||||||||||||||||||
| if self.high_counter == 1 { | ||||||||||||||||||
| self.high_start = Some(Instant::now()); | ||||||||||||||||||
| self.high_start = Some(self.clock.now()); | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| pub(crate) fn should_yield(&self) -> bool { | ||||||||||||||||||
| let threshold_hit = self.config.max_high_before_low > 0 | ||||||||||||||||||
| && self.high_counter >= self.config.max_high_before_low; | ||||||||||||||||||
| let time_hit = self | ||||||||||||||||||
| .config | ||||||||||||||||||
| .time_slice | ||||||||||||||||||
| .zip(self.high_start) | ||||||||||||||||||
| .is_some_and(|(slice, start)| start.elapsed() >= slice); | ||||||||||||||||||
| threshold_hit || time_hit | ||||||||||||||||||
| pub(crate) fn should_yield_to_low_priority(&self) -> bool { | ||||||||||||||||||
| if self.config.max_high_before_low > 0 | ||||||||||||||||||
| && self.high_counter >= self.config.max_high_before_low | ||||||||||||||||||
| { | ||||||||||||||||||
| return true; | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| if let (Some(slice), Some(start)) = (self.config.time_slice, self.high_start) { | ||||||||||||||||||
| if self.clock.elapsed(start) >= slice { | ||||||||||||||||||
| return true; | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| false | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| pub(crate) fn after_low(&mut self) { self.reset(); } | ||||||||||||||||||
| pub(crate) fn after_low(&mut self) { | ||||||||||||||||||
| self.clear(); | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| pub(crate) fn reset(&mut self) { | ||||||||||||||||||
| self.clear(); | ||||||||||||||||||
| } | ||||||||||||||||||
|
Comment on lines
+84
to
+90
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Format The pipeline indicates these methods should be formatted on single lines. Apply this diff to fix the formatting: - pub(crate) fn after_low(&mut self) {
- self.clear();
- }
+ pub(crate) fn after_low(&mut self) { self.clear(); }
- pub(crate) fn reset(&mut self) {
- self.clear();
- }
+ pub(crate) fn reset(&mut self) { self.clear(); }🧰 Tools🪛 GitHub Actions: CI[warning] 81-90: Prettier-like formatting warning: functions after_low and reset should be formatted in a single line. 🤖 Prompt for AI Agents |
||||||||||||||||||
|
|
||||||||||||||||||
| fn clear(&mut self) { | ||||||||||||||||||
| self.high_counter = 0; | ||||||||||||||||||
| self.high_start = None; | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
@@ -69,11 +109,11 @@ | |||||||||||||||||
| max_high_before_low: 2, | ||||||||||||||||||
| time_slice: None, | ||||||||||||||||||
| }; | ||||||||||||||||||
| let mut fairness = Fairness::new(cfg); | ||||||||||||||||||
| let mut fairness = FairnessTracker::new(cfg); | ||||||||||||||||||
| fairness.after_high(); | ||||||||||||||||||
| assert!(!fairness.should_yield()); | ||||||||||||||||||
| assert!(!fairness.should_yield_to_low_priority()); | ||||||||||||||||||
| fairness.after_high(); | ||||||||||||||||||
| assert!(fairness.should_yield()); | ||||||||||||||||||
| assert!(fairness.should_yield_to_low_priority()); | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| #[rstest] | ||||||||||||||||||
|
|
@@ -83,11 +123,11 @@ | |||||||||||||||||
| max_high_before_low: 1, | ||||||||||||||||||
| time_slice: None, | ||||||||||||||||||
| }; | ||||||||||||||||||
| let mut fairness = Fairness::new(cfg); | ||||||||||||||||||
| let mut fairness = FairnessTracker::new(cfg); | ||||||||||||||||||
| fairness.after_high(); | ||||||||||||||||||
| assert!(fairness.should_yield()); | ||||||||||||||||||
| assert!(fairness.should_yield_to_low_priority()); | ||||||||||||||||||
| fairness.after_low(); | ||||||||||||||||||
| assert!(!fairness.should_yield()); | ||||||||||||||||||
| assert!(!fairness.should_yield_to_low_priority()); | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| #[rstest] | ||||||||||||||||||
|
|
@@ -98,9 +138,9 @@ | |||||||||||||||||
| max_high_before_low: 0, | ||||||||||||||||||
| time_slice: Some(Duration::from_millis(5)), | ||||||||||||||||||
| }; | ||||||||||||||||||
| let mut fairness = Fairness::new(cfg); | ||||||||||||||||||
| let mut fairness = FairnessTracker::new(cfg); | ||||||||||||||||||
| fairness.after_high(); | ||||||||||||||||||
| time::advance(Duration::from_millis(6)).await; | ||||||||||||||||||
| assert!(fairness.should_yield()); | ||||||||||||||||||
| assert!(fairness.should_yield_to_low_priority()); | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -97,7 +97,9 @@ pub(crate) struct PushHandleInner<F> { | |||||||||
| pub struct PushHandle<F>(Arc<PushHandleInner<F>>); | ||||||||||
|
|
||||||||||
| impl<F: FrameLike> PushHandle<F> { | ||||||||||
| pub(crate) fn from_arc(arc: Arc<PushHandleInner<F>>) -> Self { Self(arc) } | ||||||||||
| pub(crate) fn from_arc(arc: Arc<PushHandleInner<F>>) -> Self { | ||||||||||
| Self(arc) | ||||||||||
| } | ||||||||||
|
Comment on lines
+100
to
+102
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Format The pipeline indicates this function should be formatted on a single line. Apply this diff to fix the formatting: - pub(crate) fn from_arc(arc: Arc<PushHandleInner<F>>) -> Self {
- Self(arc)
- }
+ pub(crate) fn from_arc(arc: Arc<PushHandleInner<F>>) -> Self { Self(arc) }📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||
|
|
||||||||||
| /// Internal helper to push a frame with the requested priority. | ||||||||||
| /// | ||||||||||
|
|
@@ -253,7 +255,9 @@ impl<F: FrameLike> PushHandle<F> { | |||||||||
| } | ||||||||||
|
|
||||||||||
| /// Downgrade to a `Weak` reference for storage in a registry. | ||||||||||
| pub(crate) fn downgrade(&self) -> Weak<PushHandleInner<F>> { Arc::downgrade(&self.0) } | ||||||||||
| pub(crate) fn downgrade(&self) -> Weak<PushHandleInner<F>> { | ||||||||||
| Arc::downgrade(&self.0) | ||||||||||
| } | ||||||||||
|
Comment on lines
+258
to
+260
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Format The pipeline indicates this function should be formatted on a single line. Apply this diff to fix the formatting: - pub(crate) fn downgrade(&self) -> Weak<PushHandleInner<F>> {
- Arc::downgrade(&self.0)
- }
+ pub(crate) fn downgrade(&self) -> Weak<PushHandleInner<F>> { Arc::downgrade(&self.0) }📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||
| } | ||||||||||
|
|
||||||||||
| /// Receiver ends of the push queues stored by the connection actor. | ||||||||||
|
|
@@ -382,10 +386,10 @@ impl<F: FrameLike> PushQueues<F> { | |||||||||
| rate: Option<usize>, | ||||||||||
| dlq: Option<mpsc::Sender<F>>, | ||||||||||
| ) -> Result<(Self, PushHandle<F>), PushConfigError> { | ||||||||||
| if let Some(r) = rate | ||||||||||
| && (r == 0 || r > MAX_PUSH_RATE) | ||||||||||
| { | ||||||||||
| return Err(PushConfigError::InvalidRate(r)); | ||||||||||
| if let Some(r) = rate { | ||||||||||
| if r == 0 || r > MAX_PUSH_RATE { | ||||||||||
| return Err(PushConfigError::InvalidRate(r)); | ||||||||||
| } | ||||||||||
| } | ||||||||||
| let (high_tx, high_rx) = mpsc::channel(high_capacity); | ||||||||||
| let (low_tx, low_rx) = mpsc::channel(low_capacity); | ||||||||||
|
|
||||||||||
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -229,7 +229,9 @@ where | |||||||||
| /// ``` | ||||||||||
| #[inline] | ||||||||||
| #[must_use] | ||||||||||
| pub const fn worker_count(&self) -> usize { self.workers } | ||||||||||
| pub const fn worker_count(&self) -> usize { | ||||||||||
| self.workers | ||||||||||
| } | ||||||||||
|
Comment on lines
+232
to
+234
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Format The pipeline indicates this method should be formatted on a single line. Apply this diff to fix the formatting: - pub const fn worker_count(&self) -> usize {
- self.workers
- }
+ pub const fn worker_count(&self) -> usize { self.workers }📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||
|
|
||||||||||
| /// Get the socket address the server is bound to, if available. | ||||||||||
| #[must_use] | ||||||||||
|
|
@@ -507,10 +509,10 @@ async fn process_stream<F, T>( | |||||||||
| let peer_addr = stream.peer_addr().ok(); | ||||||||||
| match read_preamble::<_, T>(&mut stream).await { | ||||||||||
| Ok((preamble, leftover)) => { | ||||||||||
| if let Some(handler) = on_success.as_ref() | ||||||||||
| && let Err(e) = handler(&preamble, &mut stream).await | ||||||||||
| { | ||||||||||
| tracing::error!(error = ?e, ?peer_addr, "preamble callback error"); | ||||||||||
| if let Some(handler) = on_success.as_ref() { | ||||||||||
| if let Err(e) = handler(&preamble, &mut stream).await { | ||||||||||
| tracing::error!(error = ?e, ?peer_addr, "preamble callback error"); | ||||||||||
| } | ||||||||||
| } | ||||||||||
| let stream = RewindStream::new(leftover, stream); | ||||||||||
| // Hand the connection to the application for processing. | ||||||||||
|
|
@@ -558,7 +560,7 @@ mod tests { | |||||||||
|
|
||||||||||
| /// Test helper preamble carrying no data. | ||||||||||
| #[derive(Debug, Clone, PartialEq, Encode, Decode)] | ||||||||||
| #[expect( | ||||||||||
| #[allow( | ||||||||||
| dead_code, | ||||||||||
| reason = "used only in doctest to illustrate an empty preamble" | ||||||||||
| )] | ||||||||||
|
|
||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix multiple single-line function formatting issues.
The pipeline indicates these functions should all be formatted on a single line.
Apply these diffs to fix the formatting:
Also applies to: 192-194, 198-200, 423-425, 462-464, 468-470, 553-555, 557-559, 562-564
🧰 Tools
🪛 GitHub Actions: CI
[warning] 188-188: Prettier-like formatting warning: function set_response should be formatted in a single line.
🤖 Prompt for AI Agents