Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 31 additions & 41 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,54 +249,18 @@ where
tokio::select! {
biased;

event = Self::handle_shutdown(self.shutdown.clone()), if state.is_active() => { event }
() = Self::await_shutdown(self.shutdown.clone()), if state.is_active() => Event::Shutdown,

event = Self::handle_high(self.high_rx.as_mut()), if high_available => { event }
res = Self::poll_priority(self.high_rx.as_mut()), if high_available => Event::High(res),

event = Self::handle_low(self.low_rx.as_mut()), if low_available => { event }
res = Self::poll_priority(self.low_rx.as_mut()), if low_available => Event::Low(res),

event = Self::handle_response_stream(self.response.as_mut()), if resp_available => { event }
res = Self::poll_response(self.response.as_mut()), if resp_available => Event::Response(res),

else => Event::Idle,
}
}

/// Await cancellation and emit a shutdown event.
async fn handle_shutdown(token: CancellationToken) -> Event<F, E> {
Self::wait_shutdown(token).await;
Event::Shutdown
}

/// Poll `opt` with `f` and convert the result using `map`.
#[inline]
async fn handle_event<'a, T, Fut, R>(
opt: Option<&'a mut T>,
f: impl FnOnce(&'a mut T) -> Fut + Send + 'a,
map: impl FnOnce(Option<R>) -> Event<F, E> + Send,
) -> Event<F, E>
where
T: Send + 'a,
Fut: Future<Output = Option<R>> + Send + 'a,
{
let res = Self::poll_optional(opt, f).await;
map(res)
}

/// Poll the high-priority queue.
async fn handle_high(rx: Option<&mut mpsc::Receiver<F>>) -> Event<F, E> {
Self::handle_event(rx, Self::recv_push, Event::High).await
}

/// Poll the low-priority queue.
async fn handle_low(rx: Option<&mut mpsc::Receiver<F>>) -> Event<F, E> {
Self::handle_event(rx, Self::recv_push, Event::Low).await
}

/// Poll the streaming response if attached.
async fn handle_response_stream(stream: Option<&mut FrameStream<F, E>>) -> Event<F, E> {
Self::handle_event(stream, |s| s.next(), Event::Response).await
}

/// Poll all sources and push available frames into `out`.
///
/// This method polls the shutdown token, high- and low-priority queues,
Expand All @@ -308,7 +272,18 @@ where
state: &mut ActorState,
out: &mut Vec<F>,
) -> Result<(), WireframeError<E>> {
match self.next_event(state).await {
let event = self.next_event(state).await;
self.dispatch_event(event, state, out)
}

/// Dispatch the given event to the appropriate handler.
fn dispatch_event(
&mut self,
event: Event<F, E>,
state: &mut ActorState,
out: &mut Vec<F>,
) -> Result<(), WireframeError<E>> {
match event {
Event::Shutdown => self.process_shutdown(state),
Event::High(res) => self.process_high(res, state, out),
Event::Low(res) => self.process_low(res, state, out),
Expand Down Expand Up @@ -479,6 +454,21 @@ where
}
}
}

/// Await shutdown cancellation on the provided token.
async fn await_shutdown(token: CancellationToken) { Self::wait_shutdown(token).await; }

/// Poll whichever priority queue is provided.
async fn poll_priority(rx: Option<&mut mpsc::Receiver<F>>) -> Option<F> {
Self::poll_optional(rx, Self::recv_push).await
}

/// Poll the streaming response.
async fn poll_response(
resp: Option<&mut FrameStream<F, E>>,
) -> Option<Result<F, WireframeError<E>>> {
Self::poll_optional(resp, |s| s.next()).await
}
}

/// Internal run state for the connection actor.
Expand Down
Loading