diff --git a/src/connection.rs b/src/connection.rs index 03f2e410..a5e0d75d 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -6,6 +6,7 @@ //! low-priority ones, with streamed responses handled last. use futures::StreamExt; +use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; use crate::{ @@ -98,27 +99,24 @@ where tokio::select! { biased; - () = self.shutdown.cancelled(), if !state.shutting_down => { + () = Self::wait_shutdown(self.shutdown.clone()), if !state.shutting_down => { state.shutting_down = true; self.start_shutdown(&mut state.resp_closed); } - res = self.queues.high_priority_rx.recv(), if !state.push.high => { + res = Self::recv_push(&mut self.queues.high_priority_rx), if !state.push.high => { self.handle_push(res, &mut state.push.high, out); } - res = self.queues.low_priority_rx.recv(), if !state.push.low => { + res = Self::recv_push(&mut self.queues.low_priority_rx), if !state.push.low => { self.handle_push(res, &mut state.push.low, out); } - res = async { - if let Some(stream) = &mut self.response { - stream.next().await - } else { - None - } - }, if !state.shutting_down && !state.resp_closed => { + res = Self::next_response(&mut self.response), if !state.shutting_down && !state.resp_closed => { self.handle_response(res, &mut state.resp_closed, out)?; + if state.resp_closed { + self.response = None; + } } } @@ -164,6 +162,26 @@ where Ok(()) } + + /// Await cancellation on the provided shutdown token. + #[inline] + async fn wait_shutdown(token: CancellationToken) { token.cancelled_owned().await; } + + /// Receive the next frame from a push queue. + #[inline] + async fn recv_push(rx: &mut mpsc::Receiver) -> Option { rx.recv().await } + + /// Poll the current streaming response for the next frame. + #[inline] + async fn next_response( + stream: &mut Option>, + ) -> Option>> { + if let Some(s) = stream { + s.next().await + } else { + None + } + } } struct PushClosed {