From da75986f0d4b378edef14d88ac4f2113de6169db Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 3 Aug 2025 15:57:37 +0100 Subject: [PATCH] Collapse nested conditionals --- src/push.rs | 16 ++++++++++------ src/server.rs | 13 ++++++++++--- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/src/push.rs b/src/push.rs index 491a3dfc..6712da33 100644 --- a/src/push.rs +++ b/src/push.rs @@ -97,7 +97,9 @@ pub(crate) struct PushHandleInner { pub struct PushHandle(Arc>); impl PushHandle { - pub(crate) fn from_arc(arc: Arc>) -> Self { Self(arc) } + pub(crate) fn from_arc(arc: Arc>) -> Self { + Self(arc) + } /// Internal helper to push a frame with the requested priority. /// @@ -253,7 +255,9 @@ impl PushHandle { } /// Downgrade to a `Weak` reference for storage in a registry. - pub(crate) fn downgrade(&self) -> Weak> { Arc::downgrade(&self.0) } + pub(crate) fn downgrade(&self) -> Weak> { + Arc::downgrade(&self.0) + } } /// Receiver ends of the push queues stored by the connection actor. @@ -387,10 +391,10 @@ impl PushQueues { rate: Option, dlq: Option>, ) -> Result<(Self, PushHandle), PushConfigError> { - if let Some(r) = rate { - if r == 0 || r > MAX_PUSH_RATE { - return Err(PushConfigError::InvalidRate(r)); - } + if let Some(r) = rate.filter(|r| *r == 0 || *r > MAX_PUSH_RATE) { + // Reject unsupported rates early to avoid building queues that cannot + // be used. The bounds prevent runaway resource consumption. + return Err(PushConfigError::InvalidRate(r)); } let (high_tx, high_rx) = mpsc::channel(high_capacity); let (low_tx, low_rx) = mpsc::channel(low_capacity); diff --git a/src/server.rs b/src/server.rs index b34942d0..bb983b1f 100644 --- a/src/server.rs +++ b/src/server.rs @@ -229,7 +229,9 @@ where /// ``` #[inline] #[must_use] - pub const fn worker_count(&self) -> usize { self.workers } + pub const fn worker_count(&self) -> usize { + self.workers + } /// Get the socket address the server is bound to, if available. #[must_use] @@ -508,8 +510,13 @@ async fn process_stream( match read_preamble::<_, T>(&mut stream).await { Ok((preamble, leftover)) => { if let Some(handler) = on_success.as_ref() { - if let Err(e) = handler(&preamble, &mut stream).await { - tracing::error!(error = ?e, ?peer_addr, "preamble callback error"); + match handler(&preamble, &mut stream).await { + Ok(()) => {} + Err(e) => { + // Log and continue processing if the callback fails; connection + // handling should not halt due to diagnostic hooks. + tracing::error!(error = ?e, ?peer_addr, "preamble callback error"); + } } } let stream = RewindStream::new(leftover, stream);