Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/asynchronous-outbound-messaging-roadmap.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ design documents.
[Design §3.2][design-write-loop].
- [x] **Fairness counter** to yield to the low-priority queue after bursts of
high-priority frames ([Design §3.2.1][design-fairness]).
- [ ] **Run state consolidation** using `Option` receivers and a closed source
- [x] **Run state consolidation** using `Option` receivers and a closed source
counter ([Design §3.4][design-actor-state]).
- [x] **Internal protocol hooks** `before_send` and `on_command_end` invoked
from the actor ([Design §4.3][design-hooks]).
Expand Down
225 changes: 161 additions & 64 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,21 @@ impl Default for FairnessConfig {
}

/// Actor driving outbound frame delivery for a connection.
///
/// # Examples
///
/// ```no_run
/// use tokio_util::sync::CancellationToken;
/// use wireframe::{connection::ConnectionActor, push::PushQueues};
///
/// let (queues, _handle) = PushQueues::<u8>::bounded(8, 8);
/// let shutdown = CancellationToken::new();
/// let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, None, shutdown);
/// # drop(actor);
/// ```
pub struct ConnectionActor<F, E> {
queues: PushQueues<F>,
high_rx: Option<mpsc::Receiver<F>>,
low_rx: Option<mpsc::Receiver<F>>,
response: Option<FrameStream<F, E>>, // current streaming response
shutdown: CancellationToken,
hooks: ProtocolHooks<F>,
Expand All @@ -56,6 +69,18 @@ where
F: FrameLike,
{
/// Create a new `ConnectionActor` from the provided components.
///
/// # Examples
///
/// ```
/// use tokio_util::sync::CancellationToken;
/// use wireframe::{connection::ConnectionActor, push::PushQueues};
///
/// let (queues, _handle) = PushQueues::<u8>::bounded(4, 4);
/// let token = CancellationToken::new();
/// let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, None, token);
/// # drop(actor);
/// ```
#[must_use]
pub fn new(
queues: PushQueues<F>,
Expand All @@ -74,7 +99,8 @@ where
hooks: ProtocolHooks<F>,
) -> Self {
Self {
queues,
high_rx: Some(queues.high_priority_rx),
low_rx: Some(queues.low_priority_rx),
response,
shutdown,
hooks,
Expand All @@ -84,13 +110,6 @@ where
}
}

/// Access the underlying push queues.
///
/// This is mainly used in tests to close the queues when no actor is
/// draining them.
#[must_use]
pub fn queues_mut(&mut self) -> &mut PushQueues<F> { &mut self.queues }

/// Replace the fairness configuration.
pub fn set_fairness(&mut self, fairness: FairnessConfig) { self.fairness = fairness; }

Expand All @@ -116,7 +135,7 @@ where
return Ok(());
}

let mut state = ActorState::new(self.response.is_none());
let mut state = ActorState::new(self.response.is_some());

while !state.is_done() {
self.poll_sources(&mut state, out).await?;
Expand All @@ -125,113 +144,150 @@ where
Ok(())
}

/// Poll all sources and push available frames into `out`.
///
/// This method polls the shutdown token, high- and low-priority queues,
/// and the optional response stream. Frames are appended to `out` in the
/// order they are processed. `ActorState` is updated based on which sources
/// return `None`.
async fn poll_sources(
&mut self,
state: &mut ActorState,
out: &mut Vec<F>,
) -> Result<(), WireframeError<E>> {
let high_available = self.high_rx.is_some();
let low_available = self.low_rx.is_some();
let resp_available = self.response.is_some();

tokio::select! {
biased;

() = Self::wait_shutdown(self.shutdown.clone()), if !state.shutting_down => {
() = Self::wait_shutdown(self.shutdown.clone()), if state.is_active() => {
self.process_shutdown(state);
}

res = Self::recv_push(&mut self.queues.high_priority_rx), if !state.push.high => {
res = async {
Self::recv_push(
self.high_rx
.as_mut()
.expect("high_rx should be Some when high_available is true")
).await
}, if high_available => {
self.process_high(res, state, out);
}

res = Self::recv_push(&mut self.queues.low_priority_rx), if !state.push.low => {
res = async {
Self::recv_push(
self.low_rx
.as_mut()
.expect("low_rx should be Some when low_available is true")
).await
}, if low_available => {
self.process_low(res, state, out);
}

res = Self::next_response(&mut self.response), if !state.shutting_down && !state.resp_closed => {
// `tokio::select!` is biased so the shutdown branch runs before
// this one. `process_shutdown` removes the response stream, making
// `resp_available` false on the next loop iteration. The explicit
// `!state.is_shutting_down()` check avoids polling the stream after
// shutdown has begun.
res = Self::next_response(&mut self.response), if resp_available && !state.is_shutting_down() => {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question (review_instructions): Check for shutdown before polling response to avoid unnecessary work.

The select branch for next_response now checks both resp_available and !state.is_shutting_down(). Ensure that this logic matches the intended shutdown semantics and does not introduce a race where a response is polled after shutdown has started.

Review instructions:

Path patterns: **/*

Instructions:
Create code-review comments for ALL issues. Avoid making general observations or non-specific feedback if at all possible.

self.process_response(res, state, out)?;
}
}

Ok(())
}

/// Begin shutdown once cancellation has been observed.
fn process_shutdown(&mut self, state: &mut ActorState) {
state.shutting_down = true;
self.start_shutdown(&mut state.resp_closed);
state.start_shutdown();
self.start_shutdown(state);
}

/// Handle the result of polling the high-priority queue.
fn process_high(&mut self, res: Option<F>, state: &mut ActorState, out: &mut Vec<F>) {
let processed = res.is_some();
self.handle_push(res, &mut state.push.high, out);
if processed {
self.after_high(out, &mut state.push.low);
if let Some(mut frame) = res {
self.hooks.before_send(&mut frame);
out.push(frame);
self.after_high(out, state);
} else {
self.high_rx = None;
state.mark_closed();
self.reset_high_counter();
}
}

/// Handle the result of polling the low-priority queue.
fn process_low(&mut self, res: Option<F>, state: &mut ActorState, out: &mut Vec<F>) {
let processed = res.is_some();
self.handle_push(res, &mut state.push.low, out);
if processed {
if let Some(mut frame) = res {
self.hooks.before_send(&mut frame);
out.push(frame);
self.after_low();
} else {
self.low_rx = None;
state.mark_closed();
}
}

/// Handle the next frame or error from the streaming response.
fn process_response(
&mut self,
res: Option<Result<F, WireframeError<E>>>,
state: &mut ActorState,
out: &mut Vec<F>,
) -> Result<(), WireframeError<E>> {
let processed = matches!(res, Some(Ok(_)));
self.handle_response(res, &mut state.resp_closed, out)?;
let is_none = res.is_none();
self.handle_response(res, state, out)?;
if processed {
self.after_low();
}
if state.resp_closed {
if is_none {
self.response = None;
}
Ok(())
}

fn start_shutdown(&mut self, resp_closed: &mut bool) {
self.queues.high_priority_rx.close();
self.queues.low_priority_rx.close();
// Drop any streaming response so shutdown is prompt. Queued frames are
// still drained, but streamed responses may be truncated.
self.response = None;
*resp_closed = true;
}

fn handle_push(&mut self, res: Option<F>, closed: &mut bool, out: &mut Vec<F>) {
match res {
Some(mut frame) => {
self.hooks.before_send(&mut frame);
out.push(frame);
}
None => *closed = true,
/// Close all receivers and mark the response stream as closed if present.
fn start_shutdown(&mut self, state: &mut ActorState) {
if let Some(rx) = &mut self.high_rx {
rx.close();
}
if let Some(rx) = &mut self.low_rx {
rx.close();
}
if self.response.take().is_some() {
state.mark_closed();
}
}

fn after_high(&mut self, out: &mut Vec<F>, low_closed: &mut bool) {
/// Update counters and opportunistically drain the low-priority queue.
fn after_high(&mut self, out: &mut Vec<F>, state: &mut ActorState) {
self.high_counter += 1;
if self.high_counter == 1 {
self.high_start = Some(Instant::now());
}

if self.should_yield_to_low_priority() {
match self.queues.low_priority_rx.try_recv() {
if self.should_yield_to_low_priority()
&& let Some(rx) = &mut self.low_rx
{
match rx.try_recv() {
Ok(mut frame) => {
self.hooks.before_send(&mut frame);
out.push(frame);
self.after_low();
*low_closed = false;
}
Err(mpsc::error::TryRecvError::Empty) => {}
Err(mpsc::error::TryRecvError::Disconnected) => *low_closed = true,
Err(mpsc::error::TryRecvError::Disconnected) => {
self.low_rx = None;
state.mark_closed();
}
}
}
}

/// Determine if processing should yield to the low-priority queue.
fn should_yield_to_low_priority(&self) -> bool {
let threshold_hit = self.fairness.max_high_before_low > 0
&& self.high_counter >= self.fairness.max_high_before_low;
Expand All @@ -243,17 +299,20 @@ where
threshold_hit || time_hit
}

    /// Reset counters after processing a low-priority frame.
    ///
    /// Delegates to `reset_high_counter` so the high-priority burst
    /// accounting restarts once the low-priority queue has been served.
    fn after_low(&mut self) { self.reset_high_counter(); }

/// Clear the burst counter and associated timestamp.
fn reset_high_counter(&mut self) {
self.high_counter = 0;
self.high_start = None;
}

/// Push a frame from the response stream into `out` or handle completion.
fn handle_response(
&mut self,
res: Option<Result<F, WireframeError<E>>>,
closed: &mut bool,
state: &mut ActorState,
out: &mut Vec<F>,
) -> Result<(), WireframeError<E>> {
match res {
Expand All @@ -263,7 +322,7 @@ where
}
Some(Err(e)) => return Err(e),
None => {
*closed = true;
state.mark_closed();
self.hooks.on_command_end();
}
}
Expand Down Expand Up @@ -292,31 +351,69 @@ where
}
}

/// Internal run state for the connection actor.
enum RunState {
    /// All sources are open and frames are still being processed.
    Active,
    /// A shutdown request has been observed and queues are being closed.
    ShuttingDown,
    /// All sources have completed and the actor can exit.
    Finished,
}

/// Tracks progress through the actor lifecycle.
struct ActorState {
    /// Current phase of the lifecycle.
    run_state: RunState,
    /// Number of sources observed as closed so far.
    closed_sources: usize,
    /// Total number of sources the actor drains before finishing.
    total_sources: usize,
}

impl ActorState {
    /// Create a new `ActorState`.
    ///
    /// `has_response` indicates whether a streaming response is currently
    /// attached.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// use wireframe::connection::ActorState;
    ///
    /// let state = ActorState::new(true);
    /// assert!(state.is_active());
    /// ```
    fn new(has_response: bool) -> Self {
        Self {
            run_state: RunState::Active,
            // The shutdown token is considered closed until cancellation
            // occurs, matching previous behaviour where draining sources
            // without explicit shutdown terminates the actor.
            closed_sources: 1,
            total_sources: 3 + usize::from(has_response),
        }
    }

    /// Mark a source as closed and update the run state if all are closed.
    fn mark_closed(&mut self) {
        self.closed_sources += 1;
        if self.closed_sources == self.total_sources {
            self.run_state = RunState::Finished;
        }
    }

    /// Transition to `ShuttingDown` if currently active.
    fn start_shutdown(&mut self) {
        if matches!(self.run_state, RunState::Active) {
            self.run_state = RunState::ShuttingDown;
        }
    }

    /// Returns `true` while the actor is actively processing sources.
    fn is_active(&self) -> bool { matches!(self.run_state, RunState::Active) }

    /// Returns `true` once shutdown has begun.
    fn is_shutting_down(&self) -> bool { matches!(self.run_state, RunState::ShuttingDown) }

    /// Returns `true` when all sources have finished.
    fn is_done(&self) -> bool { matches!(self.run_state, RunState::Finished) }
}