From b08243377e5876e3062359189c131d9170c405bc Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 3 Aug 2025 15:10:20 +0100 Subject: [PATCH 1/2] Handle missing TryRecvError and fix test message --- examples/metadata_routing.rs | 5 +-- src/connection.rs | 66 +++++++++++++++++++++++++----------- src/server.rs | 17 +++++----- tests/response.rs | 2 +- 4 files changed, 57 insertions(+), 33 deletions(-) diff --git a/examples/metadata_routing.rs b/examples/metadata_routing.rs index eb53e3d0..9d2e7ead 100644 --- a/examples/metadata_routing.rs +++ b/examples/metadata_routing.rs @@ -60,10 +60,7 @@ impl FrameMetadata for HeaderSerializer { struct Ping; #[derive(bincode::Decode, bincode::Encode)] -#[expect( - dead_code, - reason = "placeholder for demonstration of metadata routing" -)] +// Placeholder for demonstration of metadata routing; not used directly. struct Pong; #[tokio::main] diff --git a/src/connection.rs b/src/connection.rs index 55f7b7eb..40f498de 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -12,7 +12,10 @@ use std::{ }; use futures::StreamExt; -use tokio::{sync::mpsc, time::Duration}; +use tokio::{ + sync::mpsc::{self, error::TryRecvError}, + time::Duration, +}; use tokio_util::sync::CancellationToken; use tracing::{info, info_span, warn}; @@ -40,7 +43,9 @@ impl Drop for ActiveConnection { /// Return the current number of active connections. #[must_use] -pub fn active_connection_count() -> u64 { ACTIVE_CONNECTIONS.load(Ordering::Relaxed) } +pub fn active_connection_count() -> u64 { + ACTIVE_CONNECTIONS.load(Ordering::Relaxed) +} use crate::{ fairness::Fairness, @@ -182,14 +187,20 @@ where } /// Replace the fairness configuration. - pub fn set_fairness(&mut self, fairness: FairnessConfig) { self.fairness.set_config(fairness); } + pub fn set_fairness(&mut self, fairness: FairnessConfig) { + self.fairness.set_config(fairness); + } /// Set or replace the current streaming response. - pub fn set_response(&mut self, stream: Option>) { self.response = stream; } + pub fn set_response(&mut self, stream: Option>) { + self.response = stream; + } /// Get a clone of the shutdown token used by the actor. #[must_use] - pub fn shutdown_token(&self) -> CancellationToken { self.shutdown.clone() } + pub fn shutdown_token(&self) -> CancellationToken { + self.shutdown.clone() + } /// Drive the actor until all sources are exhausted or shutdown is triggered. /// @@ -393,21 +404,28 @@ where fn after_high(&mut self, out: &mut Vec, state: &mut ActorState) { self.fairness.after_high(); - if self.fairness.should_yield() - && let Some(rx) = &mut self.low_rx - { - match rx.try_recv() { - Ok(mut frame) => { - self.hooks.before_send(&mut frame, &mut self.ctx); - out.push(frame); - self.after_low(); + if self.fairness.should_yield() { + let res = self.low_rx.as_mut().map(mpsc::Receiver::try_recv); + if let Some(res) = res { + match res { + Ok(mut frame) => { + self.hooks.before_send(&mut frame, &mut self.ctx); + out.push(frame); + self.after_low(); + } + Err(TryRecvError::Empty) => {} + Err(TryRecvError::Disconnected) => { + Self::handle_closed_receiver(&mut self.low_rx, state); + } } } } } /// Reset counters after processing a low-priority frame. - fn after_low(&mut self) { self.fairness.after_low(); } + fn after_low(&mut self) { + self.fairness.after_low(); + } /// Push a frame from the response stream into `out` or handle completion. /// @@ -444,11 +462,15 @@ where /// Await cancellation on the provided shutdown token. #[inline] - async fn wait_shutdown(token: CancellationToken) { token.cancelled_owned().await; } + async fn wait_shutdown(token: CancellationToken) { + token.cancelled_owned().await; + } /// Receive the next frame from a push queue. #[inline] - async fn recv_push(rx: &mut mpsc::Receiver) -> Option { rx.recv().await } + async fn recv_push(rx: &mut mpsc::Receiver) -> Option { + rx.recv().await + } /// Poll `f` if `opt` is `Some`, returning `None` otherwise. #[expect( @@ -531,11 +553,17 @@ impl ActorState { } /// Returns `true` while the actor is actively processing sources. - fn is_active(&self) -> bool { matches!(self.run_state, RunState::Active) } + fn is_active(&self) -> bool { + matches!(self.run_state, RunState::Active) + } /// Returns `true` once shutdown has begun. - fn is_shutting_down(&self) -> bool { matches!(self.run_state, RunState::ShuttingDown) } + fn is_shutting_down(&self) -> bool { + matches!(self.run_state, RunState::ShuttingDown) + } /// Returns `true` when all sources have finished. - fn is_done(&self) -> bool { matches!(self.run_state, RunState::Finished) } + fn is_done(&self) -> bool { + matches!(self.run_state, RunState::Finished) + } } diff --git a/src/server.rs b/src/server.rs index 572711f1..2d04b671 100644 --- a/src/server.rs +++ b/src/server.rs @@ -229,7 +229,9 @@ where /// ``` #[inline] #[must_use] - pub const fn worker_count(&self) -> usize { self.workers } + pub const fn worker_count(&self) -> usize { + self.workers + } /// Get the socket address the server is bound to, if available. #[must_use] @@ -507,10 +509,10 @@ async fn process_stream( let peer_addr = stream.peer_addr().ok(); match read_preamble::<_, T>(&mut stream).await { Ok((preamble, leftover)) => { - if let Some(handler) = on_success.as_ref() - && let Err(e) = handler(&preamble, &mut stream).await - { - tracing::error!(error = ?e, ?peer_addr, "preamble callback error"); + if let Some(handler) = on_success.as_ref() { + if let Err(e) = handler(&preamble, &mut stream).await { + tracing::error!(error = ?e, ?peer_addr, "preamble callback error"); + } } let stream = RewindStream::new(leftover, stream); // Hand the connection to the application for processing. @@ -558,10 +560,7 @@ mod tests { /// Test helper preamble carrying no data. #[derive(Debug, Clone, PartialEq, Encode, Decode)] - #[expect( - dead_code, - reason = "used only in doctest to illustrate an empty preamble" - )] + // Used only in doctest to illustrate an empty preamble. struct EmptyPreamble; #[fixture] diff --git a/tests/response.rs b/tests/response.rs index 684ba693..20a5857c 100644 --- a/tests/response.rs +++ b/tests/response.rs @@ -132,7 +132,7 @@ fn custom_length_roundtrip( #[tokio::test] async fn send_response_propagates_write_error() { let app = WireframeApp::new() - .expect("route registration failed") + .expect("app creation failed") .frame_processor(LengthPrefixedProcessor::default()); let mut writer = FailingWriter; From ce04d1ad49aeac0b6d0d8836719793f99c881258 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 3 Aug 2025 15:11:47 +0100 Subject: [PATCH 2/2] Apply formatting --- src/connection.rs | 40 ++++++++++------------------------------ src/server.rs | 4 +--- 2 files changed, 11 insertions(+), 33 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 40f498de..ae5c3b8d 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -43,9 +43,7 @@ impl Drop for ActiveConnection { /// Return the current number of active connections. #[must_use] -pub fn active_connection_count() -> u64 { - ACTIVE_CONNECTIONS.load(Ordering::Relaxed) -} +pub fn active_connection_count() -> u64 { ACTIVE_CONNECTIONS.load(Ordering::Relaxed) } use crate::{ fairness::Fairness, @@ -187,20 +185,14 @@ where } /// Replace the fairness configuration. - pub fn set_fairness(&mut self, fairness: FairnessConfig) { - self.fairness.set_config(fairness); - } + pub fn set_fairness(&mut self, fairness: FairnessConfig) { self.fairness.set_config(fairness); } /// Set or replace the current streaming response. - pub fn set_response(&mut self, stream: Option>) { - self.response = stream; - } + pub fn set_response(&mut self, stream: Option>) { self.response = stream; } /// Get a clone of the shutdown token used by the actor. #[must_use] - pub fn shutdown_token(&self) -> CancellationToken { - self.shutdown.clone() - } + pub fn shutdown_token(&self) -> CancellationToken { self.shutdown.clone() } /// Drive the actor until all sources are exhausted or shutdown is triggered. /// @@ -423,9 +415,7 @@ where } /// Reset counters after processing a low-priority frame. - fn after_low(&mut self) { - self.fairness.after_low(); - } + fn after_low(&mut self) { self.fairness.after_low(); } /// Push a frame from the response stream into `out` or handle completion. /// @@ -462,15 +452,11 @@ where /// Await cancellation on the provided shutdown token. #[inline] - async fn wait_shutdown(token: CancellationToken) { - token.cancelled_owned().await; - } + async fn wait_shutdown(token: CancellationToken) { token.cancelled_owned().await; } /// Receive the next frame from a push queue. #[inline] - async fn recv_push(rx: &mut mpsc::Receiver) -> Option { - rx.recv().await - } + async fn recv_push(rx: &mut mpsc::Receiver) -> Option { rx.recv().await } /// Poll `f` if `opt` is `Some`, returning `None` otherwise. #[expect( @@ -553,17 +539,11 @@ impl ActorState { } /// Returns `true` while the actor is actively processing sources. - fn is_active(&self) -> bool { - matches!(self.run_state, RunState::Active) - } + fn is_active(&self) -> bool { matches!(self.run_state, RunState::Active) } /// Returns `true` once shutdown has begun. - fn is_shutting_down(&self) -> bool { - matches!(self.run_state, RunState::ShuttingDown) - } + fn is_shutting_down(&self) -> bool { matches!(self.run_state, RunState::ShuttingDown) } /// Returns `true` when all sources have finished. - fn is_done(&self) -> bool { - matches!(self.run_state, RunState::Finished) - } + fn is_done(&self) -> bool { matches!(self.run_state, RunState::Finished) } } diff --git a/src/server.rs b/src/server.rs index 2d04b671..b34942d0 100644 --- a/src/server.rs +++ b/src/server.rs @@ -229,9 +229,7 @@ where /// ``` #[inline] #[must_use] - pub const fn worker_count(&self) -> usize { - self.workers - } + pub const fn worker_count(&self) -> usize { self.workers } /// Get the socket address the server is bound to, if available. #[must_use]