diff --git a/src/connection.rs b/src/connection.rs index e24ab889..b80f6be3 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -43,9 +43,7 @@ impl Drop for ActiveConnection { /// Return the current number of active connections. #[must_use] -pub fn active_connection_count() -> u64 { - ACTIVE_CONNECTIONS.load(Ordering::Relaxed) -} +pub fn active_connection_count() -> u64 { ACTIVE_CONNECTIONS.load(Ordering::Relaxed) } use crate::{ hooks::{ConnectionContext, ProtocolHooks}, @@ -190,20 +188,14 @@ where } /// Replace the fairness configuration. - pub fn set_fairness(&mut self, fairness: FairnessConfig) { - self.fairness = fairness; - } + pub fn set_fairness(&mut self, fairness: FairnessConfig) { self.fairness = fairness; } /// Set or replace the current streaming response. - pub fn set_response(&mut self, stream: Option>) { - self.response = stream; - } + pub fn set_response(&mut self, stream: Option>) { self.response = stream; } /// Get a clone of the shutdown token used by the actor. #[must_use] - pub fn shutdown_token(&self) -> CancellationToken { - self.shutdown.clone() - } + pub fn shutdown_token(&self) -> CancellationToken { self.shutdown.clone() } /// Drive the actor until all sources are exhausted or shutdown is triggered. /// @@ -410,19 +402,19 @@ where self.high_start = Some(Instant::now()); } - if self.should_yield_to_low_priority() { - if let Some(rx) = &mut self.low_rx { - match rx.try_recv() { - Ok(mut frame) => { - self.hooks.before_send(&mut frame, &mut self.ctx); - out.push(frame); - self.after_low(); - } - Err(mpsc::error::TryRecvError::Empty) => {} - Err(mpsc::error::TryRecvError::Disconnected) => { - self.low_rx = None; - state.mark_closed(); - } + if self.should_yield_to_low_priority() + && let Some(rx) = &mut self.low_rx + { + match rx.try_recv() { + Ok(mut frame) => { + self.hooks.before_send(&mut frame, &mut self.ctx); + out.push(frame); + self.after_low(); + } + Err(mpsc::error::TryRecvError::Empty) => {} + Err(mpsc::error::TryRecvError::Disconnected) => { + self.low_rx = None; + state.mark_closed(); } } } @@ -441,9 +433,7 @@ where } /// Reset counters after processing a low-priority frame. - fn after_low(&mut self) { - self.reset_high_counter(); - } + fn after_low(&mut self) { self.reset_high_counter(); } /// Clear the burst counter and associated timestamp. fn reset_high_counter(&mut self) { @@ -486,15 +476,11 @@ where /// Await cancellation on the provided shutdown token. #[inline] - async fn wait_shutdown(token: CancellationToken) { - token.cancelled_owned().await; - } + async fn wait_shutdown(token: CancellationToken) { token.cancelled_owned().await; } /// Receive the next frame from a push queue. #[inline] - async fn recv_push(rx: &mut mpsc::Receiver) -> Option { - rx.recv().await - } + async fn recv_push(rx: &mut mpsc::Receiver) -> Option { rx.recv().await } /// Poll `f` if `opt` is `Some`, returning `None` otherwise. #[expect( @@ -577,17 +563,11 @@ impl ActorState { } /// Returns `true` while the actor is actively processing sources. - fn is_active(&self) -> bool { - matches!(self.run_state, RunState::Active) - } + fn is_active(&self) -> bool { matches!(self.run_state, RunState::Active) } /// Returns `true` once shutdown has begun. - fn is_shutting_down(&self) -> bool { - matches!(self.run_state, RunState::ShuttingDown) - } + fn is_shutting_down(&self) -> bool { matches!(self.run_state, RunState::ShuttingDown) } /// Returns `true` when all sources have finished. - fn is_done(&self) -> bool { - matches!(self.run_state, RunState::Finished) - } + fn is_done(&self) -> bool { matches!(self.run_state, RunState::Finished) } } diff --git a/src/push.rs b/src/push.rs index eefce836..01103745 100644 --- a/src/push.rs +++ b/src/push.rs @@ -97,9 +97,7 @@ pub(crate) struct PushHandleInner { pub struct PushHandle(Arc>); impl PushHandle { - pub(crate) fn from_arc(arc: Arc>) -> Self { - Self(arc) - } + pub(crate) fn from_arc(arc: Arc>) -> Self { Self(arc) } /// Internal helper to push a frame with the requested priority. /// @@ -255,9 +253,7 @@ impl PushHandle { } /// Downgrade to a `Weak` reference for storage in a registry. - pub(crate) fn downgrade(&self) -> Weak> { - Arc::downgrade(&self.0) - } + pub(crate) fn downgrade(&self) -> Weak> { Arc::downgrade(&self.0) } } /// Receiver ends of the push queues stored by the connection actor. @@ -386,10 +382,10 @@ impl PushQueues { rate: Option, dlq: Option>, ) -> Result<(Self, PushHandle), PushConfigError> { - if let Some(r) = rate { - if r == 0 || r > MAX_PUSH_RATE { - return Err(PushConfigError::InvalidRate(r)); - } + if let Some(r) = rate + && (r == 0 || r > MAX_PUSH_RATE) + { + return Err(PushConfigError::InvalidRate(r)); } let (high_tx, high_rx) = mpsc::channel(high_capacity); let (low_tx, low_rx) = mpsc::channel(low_capacity); diff --git a/src/server.rs b/src/server.rs index e5c75680..689fbe22 100644 --- a/src/server.rs +++ b/src/server.rs @@ -229,9 +229,7 @@ where /// ``` #[inline] #[must_use] - pub const fn worker_count(&self) -> usize { - self.workers - } + pub const fn worker_count(&self) -> usize { self.workers } /// Get the socket address the server is bound to, if available. #[must_use] @@ -509,10 +507,10 @@ async fn process_stream( let peer_addr = stream.peer_addr().ok(); match read_preamble::<_, T>(&mut stream).await { Ok((preamble, leftover)) => { - if let Some(handler) = on_success.as_ref() { - if let Err(e) = handler(&preamble, &mut stream).await { - tracing::error!(error = ?e, ?peer_addr, "preamble callback error"); - } + if let Some(handler) = on_success.as_ref() + && let Err(e) = handler(&preamble, &mut stream).await + { + tracing::error!(error = ?e, ?peer_addr, "preamble callback error"); } let stream = RewindStream::new(leftover, stream); // Hand the connection to the application for processing.