From 553afc751e129486cc80a0305749c3f1485def35 Mon Sep 17 00:00:00 2001 From: Leynos Date: Thu, 26 Jun 2025 00:14:54 +0100 Subject: [PATCH 1/3] Refactor connection actor polling --- src/connection.rs | 35 ++++++++++++++++++++++------------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 03f2e410..133ad0cf 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -6,6 +6,7 @@ //! low-priority ones, with streamed responses handled last. use futures::StreamExt; +use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; use crate::{ @@ -98,27 +99,21 @@ where tokio::select! { biased; - () = self.shutdown.cancelled(), if !state.shutting_down => { + () = Self::wait_shutdown(&self.shutdown), if !state.shutting_down => { state.shutting_down = true; self.start_shutdown(&mut state.resp_closed); } - res = self.queues.high_priority_rx.recv(), if !state.push.high => { - self.handle_push(res, &mut state.push.high, out); + res = Self::recv_push(&mut self.queues.high_priority_rx), if !state.push.high => { + Self::handle_push(res, &mut state.push.high, out); } - res = self.queues.low_priority_rx.recv(), if !state.push.low => { - self.handle_push(res, &mut state.push.low, out); + res = Self::recv_push(&mut self.queues.low_priority_rx), if !state.push.low => { + Self::handle_push(res, &mut state.push.low, out); } - res = async { - if let Some(stream) = &mut self.response { - stream.next().await - } else { - None - } - }, if !state.shutting_down && !state.resp_closed => { - self.handle_response(res, &mut state.resp_closed, out)?; + res = Self::next_response(&mut self.response), if !state.shutting_down && !state.resp_closed => { + Self::handle_response(res, &mut state.resp_closed, out)?; } } @@ -164,6 +159,20 @@ where Ok(()) } + + async fn wait_shutdown(token: &CancellationToken) { token.cancelled().await; } + + async fn recv_push(rx: &mut mpsc::Receiver) -> Option { rx.recv().await } + + async fn next_response( + stream: &mut Option>, + ) -> Option>> { + if let Some(s) = stream { + s.next().await + } else { + None + } + } } struct PushClosed { From 4225cc09ab21800b0c981a087599c34661be8b82 Mon Sep 17 00:00:00 2001 From: Leynos Date: Thu, 26 Jun 2025 19:21:42 +0100 Subject: [PATCH 2/3] Clone token for shutdown poll --- src/connection.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 133ad0cf..45757586 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -99,7 +99,7 @@ where tokio::select! { biased; - () = Self::wait_shutdown(&self.shutdown), if !state.shutting_down => { + () = Self::wait_shutdown(self.shutdown.clone()), if !state.shutting_down => { state.shutting_down = true; self.start_shutdown(&mut state.resp_closed); } @@ -114,6 +114,9 @@ where res = Self::next_response(&mut self.response), if !state.shutting_down && !state.resp_closed => { Self::handle_response(res, &mut state.resp_closed, out)?; + if state.resp_closed { + self.response = None; + } } } @@ -160,10 +163,13 @@ where Ok(()) } - async fn wait_shutdown(token: &CancellationToken) { token.cancelled().await; } + #[inline] + async fn wait_shutdown(token: CancellationToken) { token.cancelled_owned().await; } + #[inline] async fn recv_push(rx: &mut mpsc::Receiver) -> Option { rx.recv().await } + #[inline] async fn next_response( stream: &mut Option>, ) -> Option>> { From 211faf3b7356971477075e2fa5c1743f5cac959d Mon Sep 17 00:00:00 2001 From: Leynos Date: Thu, 26 Jun 2025 20:07:43 +0100 Subject: [PATCH 3/3] Fix method calls and document helpers (#141) --- src/connection.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 45757586..a5e0d75d 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -105,15 +105,15 @@ where } res = Self::recv_push(&mut self.queues.high_priority_rx), if !state.push.high => { - Self::handle_push(res, &mut state.push.high, out); + self.handle_push(res, &mut state.push.high, out); } res = Self::recv_push(&mut self.queues.low_priority_rx), if !state.push.low => { - Self::handle_push(res, &mut state.push.low, out); + self.handle_push(res, &mut state.push.low, out); } res = Self::next_response(&mut self.response), if !state.shutting_down && !state.resp_closed => { - Self::handle_response(res, &mut state.resp_closed, out)?; + self.handle_response(res, &mut state.resp_closed, out)?; if state.resp_closed { self.response = None; } @@ -163,12 +163,15 @@ where Ok(()) } + /// Await cancellation on the provided shutdown token. #[inline] async fn wait_shutdown(token: CancellationToken) { token.cancelled_owned().await; } + /// Receive the next frame from a push queue. #[inline] async fn recv_push(rx: &mut mpsc::Receiver) -> Option { rx.recv().await } + /// Poll the current streaming response for the next frame. #[inline] async fn next_response( stream: &mut Option>,