From e18a44a9494ee30fe5f58dfd276d70ac22002f02 Mon Sep 17 00:00:00 2001 From: Leynos Date: Fri, 1 Aug 2025 00:14:39 +0100 Subject: [PATCH 1/3] Refactor connection event loop Extract polling helpers for shutdown and push queues to reduce complexity. --- src/connection.rs | 41 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 36 insertions(+), 5 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index ae5c3b8d..30f22390 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -249,13 +249,13 @@ where tokio::select! { biased; - event = Self::handle_shutdown(self.shutdown.clone()), if state.is_active() => { event } + () = Self::await_shutdown(self.shutdown.clone()), if state.is_active() => Event::Shutdown, - event = Self::handle_high(self.high_rx.as_mut()), if high_available => { event } + res = Self::poll_high_priority(self.high_rx.as_mut()), if high_available => Event::High(res), - event = Self::handle_low(self.low_rx.as_mut()), if low_available => { event } + res = Self::poll_low_priority(self.low_rx.as_mut()), if low_available => Event::Low(res), - event = Self::handle_response_stream(self.response.as_mut()), if resp_available => { event } + res = Self::poll_response(self.response.as_mut()), if resp_available => Event::Response(res), else => Event::Idle, } @@ -308,7 +308,18 @@ where state: &mut ActorState, out: &mut Vec, ) -> Result<(), WireframeError> { - match self.next_event(state).await { + let event = self.next_event(state).await; + self.handle_event(event, state, out) + } + + /// Dispatch the given event to the appropriate handler. + fn handle_event( + &mut self, + event: Event, + state: &mut ActorState, + out: &mut Vec, + ) -> Result<(), WireframeError> { + match event { Event::Shutdown => self.process_shutdown(state), Event::High(res) => self.process_high(res, state, out), Event::Low(res) => self.process_low(res, state, out), @@ -479,6 +490,26 @@ where } } } + + /// Await shutdown cancellation on the provided token. + async fn await_shutdown(token: CancellationToken) { Self::wait_shutdown(token).await; } + + /// Poll the high-priority queue. + async fn poll_high_priority(rx: Option<&mut mpsc::Receiver>) -> Option { + Self::poll_optional(rx, Self::recv_push).await + } + + /// Poll the low-priority queue. + async fn poll_low_priority(rx: Option<&mut mpsc::Receiver>) -> Option { + Self::poll_optional(rx, Self::recv_push).await + } + + /// Poll the streaming response. + async fn poll_response( + resp: Option<&mut FrameStream>, + ) -> Option>> { + Self::poll_optional(resp, |s| s.next()).await + } } /// Internal run state for the connection actor. From 6d15934af0d3ffcc36d34733fafd18d4a5734274 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 3 Aug 2025 22:50:22 +0100 Subject: [PATCH 2/3] Unify priority queue polling --- src/connection.rs | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 30f22390..af549608 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -251,9 +251,9 @@ where () = Self::await_shutdown(self.shutdown.clone()), if state.is_active() => Event::Shutdown, - res = Self::poll_high_priority(self.high_rx.as_mut()), if high_available => Event::High(res), + res = Self::poll_priority(self.high_rx.as_mut()), if high_available => Event::High(res), - res = Self::poll_low_priority(self.low_rx.as_mut()), if low_available => Event::Low(res), + res = Self::poll_priority(self.low_rx.as_mut()), if low_available => Event::Low(res), res = Self::poll_response(self.response.as_mut()), if resp_available => Event::Response(res), @@ -494,13 +494,8 @@ where /// Await shutdown cancellation on the provided token. async fn await_shutdown(token: CancellationToken) { Self::wait_shutdown(token).await; } - /// Poll the high-priority queue. - async fn poll_high_priority(rx: Option<&mut mpsc::Receiver>) -> Option { - Self::poll_optional(rx, Self::recv_push).await - } - - /// Poll the low-priority queue. - async fn poll_low_priority(rx: Option<&mut mpsc::Receiver>) -> Option { + /// Poll whichever priority queue is provided. + async fn poll_priority(rx: Option<&mut mpsc::Receiver>) -> Option { Self::poll_optional(rx, Self::recv_push).await } From 93d1a1f9e1f3fda26580eea1c67ec1c62e484720 Mon Sep 17 00:00:00 2001 From: Payton McIntosh Date: Sun, 3 Aug 2025 23:20:40 +0100 Subject: [PATCH 3/3] Resolve lint errors --- src/connection.rs | 40 ++-------------------------------------- 1 file changed, 2 insertions(+), 38 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index af549608..676807f4 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -261,42 +261,6 @@ where } } - /// Await cancellation and emit a shutdown event. - async fn handle_shutdown(token: CancellationToken) -> Event { - Self::wait_shutdown(token).await; - Event::Shutdown - } - - /// Poll `opt` with `f` and convert the result using `map`. - #[inline] - async fn handle_event<'a, T, Fut, R>( - opt: Option<&'a mut T>, - f: impl FnOnce(&'a mut T) -> Fut + Send + 'a, - map: impl FnOnce(Option) -> Event + Send, - ) -> Event - where - T: Send + 'a, - Fut: Future> + Send + 'a, - { - let res = Self::poll_optional(opt, f).await; - map(res) - } - - /// Poll the high-priority queue. - async fn handle_high(rx: Option<&mut mpsc::Receiver>) -> Event { - Self::handle_event(rx, Self::recv_push, Event::High).await - } - - /// Poll the low-priority queue. - async fn handle_low(rx: Option<&mut mpsc::Receiver>) -> Event { - Self::handle_event(rx, Self::recv_push, Event::Low).await - } - - /// Poll the streaming response if attached. - async fn handle_response_stream(stream: Option<&mut FrameStream>) -> Event { - Self::handle_event(stream, |s| s.next(), Event::Response).await - } - /// Poll all sources and push available frames into `out`. /// /// This method polls the shutdown token, high- and low-priority queues, @@ -309,11 +273,11 @@ where out: &mut Vec, ) -> Result<(), WireframeError> { let event = self.next_event(state).await; - self.handle_event(event, state, out) + self.dispatch_event(event, state, out) } /// Dispatch the given event to the appropriate handler. - fn handle_event( + fn dispatch_event( &mut self, event: Event, state: &mut ActorState,