diff --git a/docs/asynchronous-outbound-messaging-roadmap.md b/docs/asynchronous-outbound-messaging-roadmap.md index 1e8e1644..8bd95f6c 100644 --- a/docs/asynchronous-outbound-messaging-roadmap.md +++ b/docs/asynchronous-outbound-messaging-roadmap.md @@ -15,7 +15,7 @@ design documents. [Design §3.2][design-write-loop]. - [x] **Fairness counter** to yield to the low-priority queue after bursts of high-priority frames ([Design §3.2.1][design-fairness]). -- [ ] **Run state consolidation** using `Option` receivers and a closed source +- [x] **Run state consolidation** using `Option` receivers and a closed source counter ([Design §3.4][design-actor-state]). - [X] **Internal protocol hooks** `before_send` and `on_command_end` invoked from the actor ([Design §4.3][design-hooks]). diff --git a/src/connection.rs b/src/connection.rs index 53b145fe..20fd423a 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -41,8 +41,21 @@ impl Default for FairnessConfig { } /// Actor driving outbound frame delivery for a connection. +/// +/// # Examples +/// +/// ```no_run +/// use tokio_util::sync::CancellationToken; +/// use wireframe::{connection::ConnectionActor, push::PushQueues}; +/// +/// let (queues, _handle) = PushQueues::::bounded(8, 8); +/// let shutdown = CancellationToken::new(); +/// let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, None, shutdown); +/// # drop(actor); +/// ``` pub struct ConnectionActor { - queues: PushQueues, + high_rx: Option>, + low_rx: Option>, response: Option>, // current streaming response shutdown: CancellationToken, hooks: ProtocolHooks, @@ -56,6 +69,18 @@ where F: FrameLike, { /// Create a new `ConnectionActor` from the provided components. + /// + /// # Examples + /// + /// ``` + /// use tokio_util::sync::CancellationToken; + /// use wireframe::{connection::ConnectionActor, push::PushQueues}; + /// + /// let (queues, _handle) = PushQueues::::bounded(4, 4); + /// let token = CancellationToken::new(); + /// let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, None, token); + /// # drop(actor); + /// ``` #[must_use] pub fn new( queues: PushQueues, @@ -74,7 +99,8 @@ where hooks: ProtocolHooks, ) -> Self { Self { - queues, + high_rx: Some(queues.high_priority_rx), + low_rx: Some(queues.low_priority_rx), response, shutdown, hooks, @@ -84,13 +110,6 @@ where } } - /// Access the underlying push queues. - /// - /// This is mainly used in tests to close the queues when no actor is - /// draining them. - #[must_use] - pub fn queues_mut(&mut self) -> &mut PushQueues { &mut self.queues } - /// Replace the fairness configuration. pub fn set_fairness(&mut self, fairness: FairnessConfig) { self.fairness = fairness; } @@ -116,7 +135,7 @@ where return Ok(()); } - let mut state = ActorState::new(self.response.is_none()); + let mut state = ActorState::new(self.response.is_some()); while !state.is_done() { self.poll_sources(&mut state, out).await?; @@ -125,27 +144,54 @@ where Ok(()) } + /// Poll all sources and push available frames into `out`. + /// + /// This method polls the shutdown token, high- and low-priority queues, + /// and the optional response stream. Frames are appended to `out` in the + /// order they are processed. `ActorState` is updated based on which sources + /// return `None`. async fn poll_sources( &mut self, state: &mut ActorState, out: &mut Vec, ) -> Result<(), WireframeError> { + let high_available = self.high_rx.is_some(); + let low_available = self.low_rx.is_some(); + let resp_available = self.response.is_some(); + tokio::select! { biased; - () = Self::wait_shutdown(self.shutdown.clone()), if !state.shutting_down => { + () = Self::wait_shutdown(self.shutdown.clone()), if state.is_active() => { self.process_shutdown(state); } - res = Self::recv_push(&mut self.queues.high_priority_rx), if !state.push.high => { + res = async { + Self::recv_push( + self.high_rx + .as_mut() + .expect("high_rx should be Some when high_available is true") + ).await + }, if high_available => { self.process_high(res, state, out); } - res = Self::recv_push(&mut self.queues.low_priority_rx), if !state.push.low => { + res = async { + Self::recv_push( + self.low_rx + .as_mut() + .expect("low_rx should be Some when low_available is true") + ).await + }, if low_available => { self.process_low(res, state, out); } - res = Self::next_response(&mut self.response), if !state.shutting_down && !state.resp_closed => { + // `tokio::select!` is biased so the shutdown branch runs before + // this one. `process_shutdown` removes the response stream, making + // `resp_available` false on the next loop iteration. The explicit + // `!state.is_shutting_down()` check avoids polling the stream after + // shutdown has begun. + res = Self::next_response(&mut self.response), if resp_available && !state.is_shutting_down() => { self.process_response(res, state, out)?; } } @@ -153,29 +199,38 @@ where Ok(()) } + /// Begin shutdown once cancellation has been observed. fn process_shutdown(&mut self, state: &mut ActorState) { - state.shutting_down = true; - self.start_shutdown(&mut state.resp_closed); + state.start_shutdown(); + self.start_shutdown(state); } + /// Handle the result of polling the high-priority queue. fn process_high(&mut self, res: Option, state: &mut ActorState, out: &mut Vec) { - let processed = res.is_some(); - self.handle_push(res, &mut state.push.high, out); - if processed { - self.after_high(out, &mut state.push.low); + if let Some(mut frame) = res { + self.hooks.before_send(&mut frame); + out.push(frame); + self.after_high(out, state); } else { + self.high_rx = None; + state.mark_closed(); self.reset_high_counter(); } } + /// Handle the result of polling the low-priority queue. fn process_low(&mut self, res: Option, state: &mut ActorState, out: &mut Vec) { - let processed = res.is_some(); - self.handle_push(res, &mut state.push.low, out); - if processed { + if let Some(mut frame) = res { + self.hooks.before_send(&mut frame); + out.push(frame); self.after_low(); + } else { + self.low_rx = None; + state.mark_closed(); } } + /// Handle the next frame or error from the streaming response. fn process_response( &mut self, res: Option>>, @@ -183,55 +238,56 @@ where out: &mut Vec, ) -> Result<(), WireframeError> { let processed = matches!(res, Some(Ok(_))); - self.handle_response(res, &mut state.resp_closed, out)?; + let is_none = res.is_none(); + self.handle_response(res, state, out)?; if processed { self.after_low(); } - if state.resp_closed { + if is_none { self.response = None; } Ok(()) } - fn start_shutdown(&mut self, resp_closed: &mut bool) { - self.queues.high_priority_rx.close(); - self.queues.low_priority_rx.close(); - // Drop any streaming response so shutdown is prompt. Queued frames are - // still drained, but streamed responses may be truncated. - self.response = None; - *resp_closed = true; - } - - fn handle_push(&mut self, res: Option, closed: &mut bool, out: &mut Vec) { - match res { - Some(mut frame) => { - self.hooks.before_send(&mut frame); - out.push(frame); - } - None => *closed = true, + /// Close all receivers and mark the response stream as closed if present. + fn start_shutdown(&mut self, state: &mut ActorState) { + if let Some(rx) = &mut self.high_rx { + rx.close(); + } + if let Some(rx) = &mut self.low_rx { + rx.close(); + } + if self.response.take().is_some() { + state.mark_closed(); } } - fn after_high(&mut self, out: &mut Vec, low_closed: &mut bool) { + /// Update counters and opportunistically drain the low-priority queue. + fn after_high(&mut self, out: &mut Vec, state: &mut ActorState) { self.high_counter += 1; if self.high_counter == 1 { self.high_start = Some(Instant::now()); } - if self.should_yield_to_low_priority() { - match self.queues.low_priority_rx.try_recv() { + if self.should_yield_to_low_priority() + && let Some(rx) = &mut self.low_rx + { + match rx.try_recv() { Ok(mut frame) => { self.hooks.before_send(&mut frame); out.push(frame); self.after_low(); - *low_closed = false; } Err(mpsc::error::TryRecvError::Empty) => {} - Err(mpsc::error::TryRecvError::Disconnected) => *low_closed = true, + Err(mpsc::error::TryRecvError::Disconnected) => { + self.low_rx = None; + state.mark_closed(); + } } } } + /// Determine if processing should yield to the low-priority queue. fn should_yield_to_low_priority(&self) -> bool { let threshold_hit = self.fairness.max_high_before_low > 0 && self.high_counter >= self.fairness.max_high_before_low; @@ -243,17 +299,20 @@ where threshold_hit || time_hit } + /// Reset counters after processing a low-priority frame. fn after_low(&mut self) { self.reset_high_counter(); } + /// Clear the burst counter and associated timestamp. fn reset_high_counter(&mut self) { self.high_counter = 0; self.high_start = None; } + /// Push a frame from the response stream into `out` or handle completion. fn handle_response( &mut self, res: Option>>, - closed: &mut bool, + state: &mut ActorState, out: &mut Vec, ) -> Result<(), WireframeError> { match res { @@ -263,7 +322,7 @@ where } Some(Err(e)) => return Err(e), None => { - *closed = true; + state.mark_closed(); self.hooks.on_command_end(); } } @@ -292,31 +351,69 @@ where } } -struct PushClosed { - high: bool, - low: bool, +/// Internal run state for the connection actor. +enum RunState { + /// All sources are open and frames are still being processed. + Active, + /// A shutdown request has been observed and queues are being closed. + ShuttingDown, + /// All sources have completed and the actor can exit. + Finished, } +/// Tracks progress through the actor lifecycle. struct ActorState { - push: PushClosed, - resp_closed: bool, - shutting_down: bool, + run_state: RunState, + closed_sources: usize, + total_sources: usize, } impl ActorState { - fn new(resp_closed: bool) -> Self { + /// Create a new `ActorState`. + /// + /// `has_response` indicates whether a streaming response is currently + /// attached. + /// + /// # Examples + /// + /// ```ignore + /// use wireframe::connection::ActorState; + /// + /// let state = ActorState::new(true); + /// assert!(state.is_active()); + /// ``` + fn new(has_response: bool) -> Self { Self { - push: PushClosed { - high: false, - low: false, - }, - resp_closed, - shutting_down: false, + run_state: RunState::Active, + // The shutdown token is considered closed until cancellation + // occurs, matching previous behaviour where draining sources + // without explicit shutdown terminates the actor. + closed_sources: 1, + total_sources: 3 + usize::from(has_response), + } + } + + /// Mark a source as closed and update the run state if all are closed. + fn mark_closed(&mut self) { + self.closed_sources += 1; + if self.closed_sources == self.total_sources { + self.run_state = RunState::Finished; } } - fn is_done(&self) -> bool { - let push_drained = self.push.high && self.push.low; - push_drained && (self.resp_closed || self.shutting_down) + /// Transition to `ShuttingDown` if currently active. + fn start_shutdown(&mut self) { + if matches!(self.run_state, RunState::Active) { + self.run_state = RunState::ShuttingDown; + } } + + /// Returns `true` while the actor is actively processing sources. + fn is_active(&self) -> bool { matches!(self.run_state, RunState::Active) } + + /// Returns `true` once shutdown has begun. + fn is_shutting_down(&self) -> bool { matches!(self.run_state, RunState::ShuttingDown) } + + /// Returns `true` when all sources have finished. + fn is_done(&self) -> bool { matches!(self.run_state, RunState::Finished) } }