diff --git a/examples/metadata_routing.rs b/examples/metadata_routing.rs index eb53e3d0..9d2e7ead 100644 --- a/examples/metadata_routing.rs +++ b/examples/metadata_routing.rs @@ -60,10 +60,7 @@ impl FrameMetadata for HeaderSerializer { struct Ping; #[derive(bincode::Decode, bincode::Encode)] -#[expect( - dead_code, - reason = "placeholder for demonstration of metadata routing" -)] +// Placeholder for demonstration of metadata routing; not used directly. struct Pong; #[tokio::main] diff --git a/src/connection.rs b/src/connection.rs index 55f7b7eb..ae5c3b8d 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -12,7 +12,10 @@ use std::{ }; use futures::StreamExt; -use tokio::{sync::mpsc, time::Duration}; +use tokio::{ + sync::mpsc::{self, error::TryRecvError}, + time::Duration, +}; use tokio_util::sync::CancellationToken; use tracing::{info, info_span, warn}; @@ -393,14 +396,19 @@ where fn after_high(&mut self, out: &mut Vec, state: &mut ActorState) { self.fairness.after_high(); - if self.fairness.should_yield() - && let Some(rx) = &mut self.low_rx - { - match rx.try_recv() { - Ok(mut frame) => { - self.hooks.before_send(&mut frame, &mut self.ctx); - out.push(frame); - self.after_low(); + if self.fairness.should_yield() { + let res = self.low_rx.as_mut().map(mpsc::Receiver::try_recv); + if let Some(res) = res { + match res { + Ok(mut frame) => { + self.hooks.before_send(&mut frame, &mut self.ctx); + out.push(frame); + self.after_low(); + } + Err(TryRecvError::Empty) => {} + Err(TryRecvError::Disconnected) => { + Self::handle_closed_receiver(&mut self.low_rx, state); + } } } } diff --git a/src/server.rs b/src/server.rs index 572711f1..b34942d0 100644 --- a/src/server.rs +++ b/src/server.rs @@ -507,10 +507,10 @@ async fn process_stream( let peer_addr = stream.peer_addr().ok(); match read_preamble::<_, T>(&mut stream).await { Ok((preamble, leftover)) => { - if let Some(handler) = on_success.as_ref() - && let Err(e) = handler(&preamble, &mut stream).await - { - tracing::error!(error = ?e, ?peer_addr, "preamble callback error"); + if let Some(handler) = on_success.as_ref() { + if let Err(e) = handler(&preamble, &mut stream).await { + tracing::error!(error = ?e, ?peer_addr, "preamble callback error"); + } } let stream = RewindStream::new(leftover, stream); // Hand the connection to the application for processing. @@ -558,10 +558,7 @@ mod tests { /// Test helper preamble carrying no data. #[derive(Debug, Clone, PartialEq, Encode, Decode)] - #[expect( - dead_code, - reason = "used only in doctest to illustrate an empty preamble" - )] + // Used only in doctest to illustrate an empty preamble. struct EmptyPreamble; #[fixture] diff --git a/tests/response.rs b/tests/response.rs index 684ba693..20a5857c 100644 --- a/tests/response.rs +++ b/tests/response.rs @@ -132,7 +132,7 @@ fn custom_length_roundtrip( #[tokio::test] async fn send_response_propagates_write_error() { let app = WireframeApp::new() - .expect("route registration failed") + .expect("app creation failed") .frame_processor(LengthPrefixedProcessor::default()); let mut writer = FailingWriter;