From 710b2d69fd479297d61202aacf33e964b4b98952 Mon Sep 17 00:00:00 2001 From: Leynos Date: Thu, 31 Jul 2025 22:31:59 +0100 Subject: [PATCH 1/3] Refactor queue processing --- src/connection.rs | 47 +++++++++++++++++++++++++++++++++++------------ 1 file changed, 35 insertions(+), 12 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 6e8c737e..cd47be1b 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -65,6 +65,12 @@ enum Event { Idle, } +#[derive(Clone, Copy)] +enum Queue { + High, + Low, +} + /// Configuration controlling fairness when draining push queues. #[derive(Clone, Copy)] pub struct FairnessConfig { @@ -302,22 +308,39 @@ where /// Handle the result of polling the high-priority queue. fn process_high(&mut self, res: Option, state: &mut ActorState, out: &mut Vec) { - if let Some(frame) = res { - self.process_frame_common(frame, out); - self.after_high(out, state); - } else { - Self::handle_closed_receiver(&mut self.high_rx, state); - self.reset_high_counter(); - } + self.process_push(res, Queue::High, state, out); } /// Handle the result of polling the low-priority queue. fn process_low(&mut self, res: Option, state: &mut ActorState, out: &mut Vec) { - if let Some(frame) = res { - self.process_frame_common(frame, out); - self.after_low(); - } else { - Self::handle_closed_receiver(&mut self.low_rx, state); + self.process_push(res, Queue::Low, state, out); + } + + /// Handle the result of polling a push queue. + fn process_push( + &mut self, + res: Option, + queue: Queue, + state: &mut ActorState, + out: &mut Vec, + ) { + match res { + Some(frame) => { + self.process_frame_common(frame, out); + match queue { + Queue::High => self.after_high(out, state), + Queue::Low => self.after_low(), + } + } + None => match queue { + Queue::High => { + Self::handle_closed_receiver(&mut self.high_rx, state); + self.reset_high_counter(); + } + Queue::Low => { + Self::handle_closed_receiver(&mut self.low_rx, state); + } + }, } } From ba5da2e0d8fdd7af72cc4ed4319ed4e35f4d6621 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 3 Aug 2025 02:18:07 +0100 Subject: [PATCH 2/3] Refine queue processing with callbacks --- examples/metadata_routing.rs | 2 +- src/connection.rs | 138 +++++++++++++++++++++-------------- src/push.rs | 16 ++-- src/server.rs | 14 ++-- 4 files changed, 102 insertions(+), 68 deletions(-) diff --git a/examples/metadata_routing.rs b/examples/metadata_routing.rs index bc326c51..197f37c8 100644 --- a/examples/metadata_routing.rs +++ b/examples/metadata_routing.rs @@ -60,7 +60,7 @@ impl FrameMetadata for HeaderSerializer { struct Ping; #[derive(bincode::Decode, bincode::Encode)] -#[expect(dead_code, reason = "used only in documentation example")] +#[allow(dead_code)] // used only in documentation example struct Pong; #[tokio::main] diff --git a/src/connection.rs b/src/connection.rs index cd47be1b..6ed38cdb 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -43,7 +43,9 @@ impl Drop for ActiveConnection { /// Return the current number of active connections. #[must_use] -pub fn active_connection_count() -> u64 { ACTIVE_CONNECTIONS.load(Ordering::Relaxed) } +pub fn active_connection_count() -> u64 { + ACTIVE_CONNECTIONS.load(Ordering::Relaxed) +} use crate::{ hooks::{ConnectionContext, ProtocolHooks}, @@ -65,10 +67,16 @@ enum Event { Idle, } -#[derive(Clone, Copy)] -enum Queue { - High, - Low, +/// Context for processing frames during actor execution. +struct ProcessContext<'a, F> { + state: &'a mut ActorState, + out: &'a mut Vec, +} + +impl<'a, F> ProcessContext<'a, F> { + fn new(state: &'a mut ActorState, out: &'a mut Vec) -> Self { + Self { state, out } + } } /// Configuration controlling fairness when draining push queues. @@ -194,14 +202,20 @@ where } /// Replace the fairness configuration. - pub fn set_fairness(&mut self, fairness: FairnessConfig) { self.fairness = fairness; } + pub fn set_fairness(&mut self, fairness: FairnessConfig) { + self.fairness = fairness; + } /// Set or replace the current streaming response. - pub fn set_response(&mut self, stream: Option>) { self.response = stream; } + pub fn set_response(&mut self, stream: Option>) { + self.response = stream; + } /// Get a clone of the shutdown token used by the actor. #[must_use] - pub fn shutdown_token(&self) -> CancellationToken { self.shutdown.clone() } + pub fn shutdown_token(&self) -> CancellationToken { + self.shutdown.clone() + } /// Drive the actor until all sources are exhausted or shutdown is triggered. /// @@ -289,11 +303,13 @@ where state: &mut ActorState, out: &mut Vec, ) -> Result<(), WireframeError> { - match self.next_event(state).await { - Event::Shutdown => self.process_shutdown(state), - Event::High(res) => self.process_high(res, state, out), - Event::Low(res) => self.process_low(res, state, out), - Event::Response(res) => self.process_response(res, state, out)?, + let mut ctx = ProcessContext::new(state, out); + + match self.next_event(ctx.state).await { + Event::Shutdown => self.process_shutdown(ctx.state), + Event::High(res) => self.process_high(res, &mut ctx), + Event::Low(res) => self.process_low(res, &mut ctx), + Event::Response(res) => self.process_response(res, ctx.state, ctx.out)?, Event::Idle => {} } @@ -307,40 +323,40 @@ where } /// Handle the result of polling the high-priority queue. - fn process_high(&mut self, res: Option, state: &mut ActorState, out: &mut Vec) { - self.process_push(res, Queue::High, state, out); + fn process_high(&mut self, res: Option, ctx: &mut ProcessContext) { + self.process_push(res, ctx, Self::after_high, |this, state| { + Self::handle_closed_receiver(&mut this.high_rx, state); + this.reset_high_counter(); + }); } /// Handle the result of polling the low-priority queue. - fn process_low(&mut self, res: Option, state: &mut ActorState, out: &mut Vec) { - self.process_push(res, Queue::Low, state, out); + fn process_low(&mut self, res: Option, ctx: &mut ProcessContext) { + self.process_push( + res, + ctx, + |this, _, _| this.after_low(), + |this, state| Self::handle_closed_receiver(&mut this.low_rx, state), + ); } /// Handle the result of polling a push queue. - fn process_push( + fn process_push( &mut self, res: Option, - queue: Queue, - state: &mut ActorState, - out: &mut Vec, - ) { + ctx: &mut ProcessContext, + on_some: OnSome, + on_none: OnNone, + ) where + OnSome: FnOnce(&mut Self, &mut Vec, &mut ActorState), + OnNone: FnOnce(&mut Self, &mut ActorState), + { match res { Some(frame) => { - self.process_frame_common(frame, out); - match queue { - Queue::High => self.after_high(out, state), - Queue::Low => self.after_low(), - } + self.process_frame_common(frame, ctx.out); + on_some(self, ctx.out, ctx.state); } - None => match queue { - Queue::High => { - Self::handle_closed_receiver(&mut self.high_rx, state); - self.reset_high_counter(); - } - Queue::Low => { - Self::handle_closed_receiver(&mut self.low_rx, state); - } - }, + None => on_none(self, ctx.state), } } @@ -397,19 +413,19 @@ where self.high_start = Some(Instant::now()); } - if self.should_yield_to_low_priority() - && let Some(rx) = &mut self.low_rx - { - match rx.try_recv() { - Ok(mut frame) => { - self.hooks.before_send(&mut frame, &mut self.ctx); - out.push(frame); - self.after_low(); - } - Err(mpsc::error::TryRecvError::Empty) => {} - Err(mpsc::error::TryRecvError::Disconnected) => { - self.low_rx = None; - state.mark_closed(); + if self.should_yield_to_low_priority() { + if let Some(rx) = &mut self.low_rx { + match rx.try_recv() { + Ok(mut frame) => { + self.hooks.before_send(&mut frame, &mut self.ctx); + out.push(frame); + self.after_low(); + } + Err(mpsc::error::TryRecvError::Empty) => {} + Err(mpsc::error::TryRecvError::Disconnected) => { + self.low_rx = None; + state.mark_closed(); + } } } } @@ -428,7 +444,9 @@ where } /// Reset counters after processing a low-priority frame. - fn after_low(&mut self) { self.reset_high_counter(); } + fn after_low(&mut self) { + self.reset_high_counter(); + } /// Clear the burst counter and associated timestamp. fn reset_high_counter(&mut self) { @@ -471,11 +489,15 @@ where /// Await cancellation on the provided shutdown token. #[inline] - async fn wait_shutdown(token: CancellationToken) { token.cancelled_owned().await; } + async fn wait_shutdown(token: CancellationToken) { + token.cancelled_owned().await; + } /// Receive the next frame from a push queue. #[inline] - async fn recv_push(rx: &mut mpsc::Receiver) -> Option { rx.recv().await } + async fn recv_push(rx: &mut mpsc::Receiver) -> Option { + rx.recv().await + } /// Poll `f` if `opt` is `Some`, returning `None` otherwise. #[expect( @@ -558,11 +580,17 @@ impl ActorState { } /// Returns `true` while the actor is actively processing sources. - fn is_active(&self) -> bool { matches!(self.run_state, RunState::Active) } + fn is_active(&self) -> bool { + matches!(self.run_state, RunState::Active) + } /// Returns `true` once shutdown has begun. - fn is_shutting_down(&self) -> bool { matches!(self.run_state, RunState::ShuttingDown) } + fn is_shutting_down(&self) -> bool { + matches!(self.run_state, RunState::ShuttingDown) + } /// Returns `true` when all sources have finished. - fn is_done(&self) -> bool { matches!(self.run_state, RunState::Finished) } + fn is_done(&self) -> bool { + matches!(self.run_state, RunState::Finished) + } } diff --git a/src/push.rs b/src/push.rs index 01103745..eefce836 100644 --- a/src/push.rs +++ b/src/push.rs @@ -97,7 +97,9 @@ pub(crate) struct PushHandleInner { pub struct PushHandle(Arc>); impl PushHandle { - pub(crate) fn from_arc(arc: Arc>) -> Self { Self(arc) } + pub(crate) fn from_arc(arc: Arc>) -> Self { + Self(arc) + } /// Internal helper to push a frame with the requested priority. /// @@ -253,7 +255,9 @@ impl PushHandle { } /// Downgrade to a `Weak` reference for storage in a registry. - pub(crate) fn downgrade(&self) -> Weak> { Arc::downgrade(&self.0) } + pub(crate) fn downgrade(&self) -> Weak> { + Arc::downgrade(&self.0) + } } /// Receiver ends of the push queues stored by the connection actor. @@ -382,10 +386,10 @@ impl PushQueues { rate: Option, dlq: Option>, ) -> Result<(Self, PushHandle), PushConfigError> { - if let Some(r) = rate - && (r == 0 || r > MAX_PUSH_RATE) - { - return Err(PushConfigError::InvalidRate(r)); + if let Some(r) = rate { + if r == 0 || r > MAX_PUSH_RATE { + return Err(PushConfigError::InvalidRate(r)); + } } let (high_tx, high_rx) = mpsc::channel(high_capacity); let (low_tx, low_rx) = mpsc::channel(low_capacity); diff --git a/src/server.rs b/src/server.rs index f1592a19..43bd94a1 100644 --- a/src/server.rs +++ b/src/server.rs @@ -229,7 +229,9 @@ where /// ``` #[inline] #[must_use] - pub const fn worker_count(&self) -> usize { self.workers } + pub const fn worker_count(&self) -> usize { + self.workers + } /// Get the socket address the server is bound to, if available. #[must_use] @@ -469,10 +471,10 @@ async fn process_stream( { match read_preamble::<_, T>(&mut stream).await { Ok((preamble, leftover)) => { - if let Some(handler) = on_success.as_ref() - && let Err(e) = handler(&preamble, &mut stream).await - { - eprintln!("preamble callback error: {e}"); + if let Some(handler) = on_success.as_ref() { + if let Err(e) = handler(&preamble, &mut stream).await { + eprintln!("preamble callback error: {e}"); + } } let stream = RewindStream::new(leftover, stream); // Hand the connection to the application for processing. @@ -520,7 +522,7 @@ mod tests { /// Test helper preamble carrying no data. #[derive(Debug, Clone, PartialEq, Encode, Decode)] - #[expect(dead_code, reason = "test helper for unused preamble type")] + #[allow(dead_code)] // test helper for unused preamble type struct EmptyPreamble; #[fixture] From 44e5c2cce67caa2b8c768bb298db6c1d60370d02 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 3 Aug 2025 14:19:07 +0100 Subject: [PATCH 3/3] Document allow reasons --- examples/metadata_routing.rs | 2 +- examples/ping_pong.rs | 5 ++- src/middleware.rs | 62 ++++++++++++++++++++++++-------- src/server.rs | 2 +- wireframe_testing/src/helpers.rs | 6 ++-- 5 files changed, 57 insertions(+), 20 deletions(-) diff --git a/examples/metadata_routing.rs b/examples/metadata_routing.rs index 197f37c8..03f2c03a 100644 --- a/examples/metadata_routing.rs +++ b/examples/metadata_routing.rs @@ -60,7 +60,7 @@ impl FrameMetadata for HeaderSerializer { struct Ping; #[derive(bincode::Decode, bincode::Encode)] -#[allow(dead_code)] // used only in documentation example +#[allow(dead_code, reason = "used only in documentation example")] struct Pong; #[tokio::main] diff --git a/examples/ping_pong.rs b/examples/ping_pong.rs index edde533e..777667fe 100644 --- a/examples/ping_pong.rs +++ b/examples/ping_pong.rs @@ -40,7 +40,10 @@ const PING_ID: u32 = 1; /// /// The middleware chain generates the actual response, so this /// handler intentionally performs no work. -#[allow(clippy::unused_async)] +#[allow( + clippy::unused_async, + reason = "example handler intentionally performs no work" +)] async fn ping_handler() {} struct PongMiddleware; diff --git a/src/middleware.rs b/src/middleware.rs index 2b41eb01..6d35a24c 100644 --- a/src/middleware.rs +++ b/src/middleware.rs @@ -17,19 +17,27 @@ pub struct FrameContainer { impl FrameContainer { /// Create a new container holding `frame` bytes. #[must_use] - pub fn new(frame: F) -> Self { Self { frame } } + pub fn new(frame: F) -> Self { + Self { frame } + } /// Borrow the inner frame data. #[must_use] - pub fn frame(&self) -> &F { &self.frame } + pub fn frame(&self) -> &F { + &self.frame + } /// Mutable access to the frame data. #[must_use] - pub fn frame_mut(&mut self) -> &mut F { &mut self.frame } + pub fn frame_mut(&mut self) -> &mut F { + &mut self.frame + } /// Consume the container, returning the frame. #[must_use] - pub fn into_inner(self) -> F { self.frame } + pub fn into_inner(self) -> F { + self.frame + } } /// Incoming request wrapper passed through middleware. @@ -49,15 +57,21 @@ impl ServiceRequest { /// Borrow the underlying frame bytes. #[must_use] - pub fn frame(&self) -> &[u8] { self.inner.frame().as_slice() } + pub fn frame(&self) -> &[u8] { + self.inner.frame().as_slice() + } /// Mutable access to the inner frame bytes. #[must_use] - pub fn frame_mut(&mut self) -> &mut Vec { self.inner.frame_mut() } + pub fn frame_mut(&mut self) -> &mut Vec { + self.inner.frame_mut() + } /// Consume the request, returning the inner frame bytes. #[must_use] - pub fn into_inner(self) -> Vec { self.inner.into_inner() } + pub fn into_inner(self) -> Vec { + self.inner.into_inner() + } } /// Response produced by a handler or middleware. @@ -77,15 +91,21 @@ impl ServiceResponse { /// Borrow the inner frame bytes. #[must_use] - pub fn frame(&self) -> &[u8] { self.inner.frame().as_slice() } + pub fn frame(&self) -> &[u8] { + self.inner.frame().as_slice() + } /// Mutable access to the response frame bytes. #[must_use] - pub fn frame_mut(&mut self) -> &mut Vec { self.inner.frame_mut() } + pub fn frame_mut(&mut self) -> &mut Vec { + self.inner.frame_mut() + } /// Consume the response, yielding the raw frame bytes. #[must_use] - pub fn into_inner(self) -> Vec { self.inner.into_inner() } + pub fn into_inner(self) -> Vec { + self.inner.into_inner() + } } /// Continuation used by middleware to call the next service in the chain. @@ -120,7 +140,9 @@ where /// ``` #[inline] #[must_use] - pub fn new(service: &'a S) -> Self { Self { service } } + pub fn new(service: &'a S) -> Self { + Self { service } + } /// Call the next service with the provided request. /// @@ -157,7 +179,11 @@ where /// Create a new middleware service wrapping `service`. #[inline] - #[allow(clippy::inline_fn_without_body, unused_attributes)] + #[allow( + clippy::inline_fn_without_body, + unused_attributes, + reason = "future-proof attribute and inline hint without body" + )] #[must_use = "use the returned middleware service"] async fn transform(&self, service: S) -> Self::Output; } @@ -173,7 +199,9 @@ pub struct FromFn { impl FromFn { /// Construct middleware from the provided asynchronous function. - pub fn new(f: F) -> Self { Self { f } } + pub fn new(f: F) -> Self { + Self { f } + } } /// Convenience constructor to build middleware from an async function. @@ -202,7 +230,9 @@ impl FromFn { /// # } /// let mw = from_fn(logging); /// ``` -pub fn from_fn(f: F) -> FromFn { FromFn::new(f) } +pub fn from_fn(f: F) -> FromFn { + FromFn::new(f) +} /// Service wrapper that applies a middleware function to requests. /// @@ -283,7 +313,9 @@ impl HandlerService { /// Returns the route identifier associated with this service. #[must_use] - pub const fn id(&self) -> u32 { self.id } + pub const fn id(&self) -> u32 { + self.id + } } struct RouteService { diff --git a/src/server.rs b/src/server.rs index 43bd94a1..7021ef3b 100644 --- a/src/server.rs +++ b/src/server.rs @@ -522,7 +522,7 @@ mod tests { /// Test helper preamble carrying no data. #[derive(Debug, Clone, PartialEq, Encode, Decode)] - #[allow(dead_code)] // test helper for unused preamble type + #[allow(dead_code, reason = "test helper for unused preamble type")] struct EmptyPreamble; #[fixture] diff --git a/wireframe_testing/src/helpers.rs b/wireframe_testing/src/helpers.rs index a42ba0bf..156d2a0e 100644 --- a/wireframe_testing/src/helpers.rs +++ b/wireframe_testing/src/helpers.rs @@ -19,7 +19,9 @@ use wireframe::{ unused_braces, reason = "Clippy is wrong here; this is not a redundant block" )] -pub fn processor() -> LengthPrefixedProcessor { LengthPrefixedProcessor::default() } +pub fn processor() -> LengthPrefixedProcessor { + LengthPrefixedProcessor::default() +} pub trait TestSerializer: Serializer + FrameMetadata + Send + Sync + 'static @@ -234,7 +236,7 @@ where /// /// Returns any I/O errors encountered while interacting with the in-memory /// duplex stream. -#[allow(dead_code)] +#[allow(dead_code, reason = "unused outside integration tests")] pub async fn run_app_with_frames( app: WireframeApp, frames: Vec>,