diff --git a/src/connection.rs b/src/connection.rs index ae5c3b8d..676807f4 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -249,54 +249,18 @@ where tokio::select! { biased; - event = Self::handle_shutdown(self.shutdown.clone()), if state.is_active() => { event } + () = Self::await_shutdown(self.shutdown.clone()), if state.is_active() => Event::Shutdown, - event = Self::handle_high(self.high_rx.as_mut()), if high_available => { event } + res = Self::poll_priority(self.high_rx.as_mut()), if high_available => Event::High(res), - event = Self::handle_low(self.low_rx.as_mut()), if low_available => { event } + res = Self::poll_priority(self.low_rx.as_mut()), if low_available => Event::Low(res), - event = Self::handle_response_stream(self.response.as_mut()), if resp_available => { event } + res = Self::poll_response(self.response.as_mut()), if resp_available => Event::Response(res), else => Event::Idle, } } - /// Await cancellation and emit a shutdown event. - async fn handle_shutdown(token: CancellationToken) -> Event { - Self::wait_shutdown(token).await; - Event::Shutdown - } - - /// Poll `opt` with `f` and convert the result using `map`. - #[inline] - async fn handle_event<'a, T, Fut, R>( - opt: Option<&'a mut T>, - f: impl FnOnce(&'a mut T) -> Fut + Send + 'a, - map: impl FnOnce(Option) -> Event + Send, - ) -> Event - where - T: Send + 'a, - Fut: Future> + Send + 'a, - { - let res = Self::poll_optional(opt, f).await; - map(res) - } - - /// Poll the high-priority queue. - async fn handle_high(rx: Option<&mut mpsc::Receiver>) -> Event { - Self::handle_event(rx, Self::recv_push, Event::High).await - } - - /// Poll the low-priority queue. - async fn handle_low(rx: Option<&mut mpsc::Receiver>) -> Event { - Self::handle_event(rx, Self::recv_push, Event::Low).await - } - - /// Poll the streaming response if attached. - async fn handle_response_stream(stream: Option<&mut FrameStream>) -> Event { - Self::handle_event(stream, |s| s.next(), Event::Response).await - } - /// Poll all sources and push available frames into `out`. /// /// This method polls the shutdown token, high- and low-priority queues, @@ -308,7 +272,18 @@ where state: &mut ActorState, out: &mut Vec, ) -> Result<(), WireframeError> { - match self.next_event(state).await { + let event = self.next_event(state).await; + self.dispatch_event(event, state, out) + } + + /// Dispatch the given event to the appropriate handler. + fn dispatch_event( + &mut self, + event: Event, + state: &mut ActorState, + out: &mut Vec, + ) -> Result<(), WireframeError> { + match event { Event::Shutdown => self.process_shutdown(state), Event::High(res) => self.process_high(res, state, out), Event::Low(res) => self.process_low(res, state, out), @@ -479,6 +454,21 @@ where } } } + + /// Await shutdown cancellation on the provided token. + async fn await_shutdown(token: CancellationToken) { Self::wait_shutdown(token).await; } + + /// Poll whichever priority queue is provided. + async fn poll_priority(rx: Option<&mut mpsc::Receiver>) -> Option { + Self::poll_optional(rx, Self::recv_push).await + } + + /// Poll the streaming response. + async fn poll_response( + resp: Option<&mut FrameStream>, + ) -> Option>> { + Self::poll_optional(resp, |s| s.next()).await + } } /// Internal run state for the connection actor.