Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions docs/asynchronous-outbound-messaging-design.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,10 @@ An optional time slice (for example 100 µs) can also be configured. When the
elapsed time spent handling high-priority frames exceeds this slice, and the low
queue is not empty, the actor yields to a low-priority frame. Application
builders expose `with_fairness(FairnessConfig)` where `FairnessConfig` groups
the counter threshold and an optional `time_slice`. The counter defaults to 16
while `time_slice` is disabled. Setting the counter to zero preserves the
original strict ordering.
the counter threshold and an optional `time_slice`. The counter defaults to 8
while `time_slice` is disabled. Setting the counter to zero disables the
threshold logic and relies solely on `time_slice` for fairness, preserving
strict high-priority ordering otherwise.

This fairness mechanism ensures low-priority traffic continues to progress even
under sustained high-priority load.
Expand Down Expand Up @@ -181,6 +182,31 @@ flowchart TD
K --> A
```

The following sequence diagram illustrates the runtime behaviour:

```mermaid
sequenceDiagram
participant Client
participant ConnectionActor
participant HighQueue
participant LowQueue

loop While processing frames
ConnectionActor->>HighQueue: Poll high-priority frame
alt High-priority frame received
ConnectionActor->>ConnectionActor: after_high()
alt Max high before low or time slice reached
ConnectionActor->>LowQueue: Try receive low-priority frame
alt Low-priority frame available
ConnectionActor->>ConnectionActor: after_low()
end
end
else No high-priority frame
ConnectionActor->>ConnectionActor: reset_high_counter()
end
end
```

### 3.3 Connection Actor Overview

```mermaid
Expand Down
2 changes: 1 addition & 1 deletion docs/asynchronous-outbound-messaging-roadmap.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ design documents.
- [x] **Connection actor** with a biased `select!` loop that polls for shutdown,
high/low queues and response streams as described in
[Design §3.2][design-write-loop].
- [ ] **Fairness counter** to yield to the low-priority queue after bursts of
- [x] **Fairness counter** to yield to the low-priority queue after bursts of
high-priority frames ([Design §3.2.1][design-fairness]).
- [ ] **Run state consolidation** using `Option` receivers and a closed source
counter ([Design §3.4][design-actor-state]).
Expand Down
69 changes: 33 additions & 36 deletions docs/contents.md
Original file line number Diff line number Diff line change
@@ -1,61 +1,58 @@
# Documentation contents

This page summarizes each file in the `docs/` directory.
Use it as a quick reference when navigating the project's design and
architecture material.
This page summarizes each file in the `docs/` directory. Use it as a quick
reference when navigating the project's design and architecture material.

## Design documents

- [Outbound messaging design](asynchronous-outbound-messaging-design.md)
- [Outbound messaging design](asynchronous-outbound-messaging-design.md)
Comprehensive design for server-initiated pushes.
- [Message fragments](generic-message-fragmentation-and-re-assembly-design.md)
- [Message fragments](generic-message-fragmentation-and-re-assembly-design.md)
Design for fragmenting and reassembling messages.
- [Streaming response design](multi-packet-and-streaming-responses-design.md)
- [Streaming response design](multi-packet-and-streaming-responses-design.md)
Design for handling multi-packet and streaming responses.
- [Router library design](rust-binary-router-library-design.md)
In-depth design of the binary router library.
- [Client design](wireframe-client-design.md)
Proposal for adding client-side support.
- [Frame metadata](frame-metadata.md)
How frame metadata assists with routing decisions.
- [Message versioning](message-versioning.md)
Approaches to message versioning and compatibility.
- [Preamble validator](preamble-validator.md)
Validating client connection preambles.
- [Router library design](rust-binary-router-library-design.md) In-depth design
of the binary router library.
- [Client design](wireframe-client-design.md) Proposal for adding client-side
support.
- [Frame metadata](frame-metadata.md) How frame metadata assists with routing
decisions.
- [Message versioning](message-versioning.md) Approaches to message versioning
and compatibility.
- [Preamble validator](preamble-validator.md) Validating client connection
preambles.

## Roadmaps

- [Outbound messaging roadmap](asynchronous-outbound-messaging-roadmap.md)
Task list for implementing asynchronous outbound messaging.
- [Wireframe 1.0 roadmap](wireframe-1-0-detailed-development-roadmap.md)
- [Outbound messaging roadmap](asynchronous-outbound-messaging-roadmap.md) Task
list for implementing asynchronous outbound messaging.
- [Wireframe 1.0 roadmap](wireframe-1-0-detailed-development-roadmap.md)
Detailed tasks leading to Wireframe 1.0.
- [Project roadmap](roadmap.md)
High-level development roadmap.
- [1.0 philosophy](the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md)
- [Project roadmap](roadmap.md) High-level development roadmap.
- [1.0 philosophy](the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md)
Philosophy and feature set for Wireframe 1.0.

## Testing

- [Multi-layered testing strategy](multi-layered-testing-strategy.md)
Overview of the library's testing approach.
- [RSTest fixtures](rust-testing-with-rstest-fixtures.md)
Using `rstest` fixtures for clean tests.
- [Mocking network outages](mocking-network-outages-in-rust.md)
Tutorial for simulating network outages in tests.
- [Testing helpers](wireframe-testing-crate.md)
Planned companion crate with testing helpers.
- [Multi-layered testing strategy](multi-layered-testing-strategy.md) Overview
of the library's testing approach.
- [RSTest fixtures](rust-testing-with-rstest-fixtures.md) Using `rstest`
fixtures for clean tests.
- [Mocking network outages](mocking-network-outages-in-rust.md) Tutorial for
simulating network outages in tests.
- [Testing helpers](wireframe-testing-crate.md) Planned companion crate with
testing helpers.

## Operations and resilience

- [Resilience guide](hardening-wireframe-a-guide-to-production-resilience.md)
- [Resilience guide](hardening-wireframe-a-guide-to-production-resilience.md)
Guidance on achieving production resilience.
- [Observability and operability](observability-operability-and-maturity.md)
- [Observability and operability](observability-operability-and-maturity.md)
Guide to observability and operational maturity.

## Reference guides

- [Refactoring guide](complexity-antipatterns-and-refactoring-strategies.md)
- [Refactoring guide](complexity-antipatterns-and-refactoring-strategies.md)
Strategies for taming code complexity and refactoring.
- [Documentation style guide](documentation-style-guide.md)
Conventions for writing project documentation.

- [Documentation style guide](documentation-style-guide.md) Conventions for
writing project documentation.
126 changes: 117 additions & 9 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
//! low-priority ones, with streamed responses handled last.

use futures::StreamExt;
use tokio::sync::mpsc;
use tokio::{
sync::mpsc,
time::{Duration, Instant},
};
use tokio_util::sync::CancellationToken;

use crate::{
Expand All @@ -15,12 +18,37 @@ use crate::{
response::{FrameStream, WireframeError},
};

/// Configuration controlling fairness when draining push queues.
Comment thread
coderabbitai[bot] marked this conversation as resolved.
#[derive(Clone, Copy)]
pub struct FairnessConfig {
/// Number of consecutive high-priority frames to process before
/// checking the low-priority queue.
pub max_high_before_low: usize,
/// A zero value disables the counter and relies solely on `time_slice` for
/// fairness, preserving strict high-priority ordering otherwise.
/// Optional time slice after which the low-priority queue is checked
/// if high-priority traffic has been continuous.
pub time_slice: Option<Duration>,
}

impl Default for FairnessConfig {
fn default() -> Self {
Self {
max_high_before_low: 8,
time_slice: None,
}
}
}

/// Actor driving outbound frame delivery for a connection.
pub struct ConnectionActor<F, E> {
queues: PushQueues<F>,
response: Option<FrameStream<F, E>>, // current streaming response
shutdown: CancellationToken,
hooks: ProtocolHooks<F>,
fairness: FairnessConfig,
high_counter: usize,
high_start: Option<Instant>,
}

impl<F, E> ConnectionActor<F, E>
Expand Down Expand Up @@ -50,6 +78,9 @@ where
response,
shutdown,
hooks,
fairness: FairnessConfig::default(),
high_counter: 0,
high_start: None,
}
}

Expand All @@ -60,6 +91,9 @@ where
#[must_use]
pub fn queues_mut(&mut self) -> &mut PushQueues<F> { &mut self.queues }

/// Replace the fairness configuration.
pub fn set_fairness(&mut self, fairness: FairnessConfig) { self.fairness = fairness; }

/// Set or replace the current streaming response.
pub fn set_response(&mut self, stream: Option<FrameStream<F, E>>) { self.response = stream; }

Expand Down Expand Up @@ -100,29 +134,65 @@ where
biased;

() = Self::wait_shutdown(self.shutdown.clone()), if !state.shutting_down => {
state.shutting_down = true;
self.start_shutdown(&mut state.resp_closed);
self.process_shutdown(state);
}

res = Self::recv_push(&mut self.queues.high_priority_rx), if !state.push.high => {
self.handle_push(res, &mut state.push.high, out);
self.process_high(res, state, out);
}

res = Self::recv_push(&mut self.queues.low_priority_rx), if !state.push.low => {
self.handle_push(res, &mut state.push.low, out);
self.process_low(res, state, out);
}

res = Self::next_response(&mut self.response), if !state.shutting_down && !state.resp_closed => {
self.handle_response(res, &mut state.resp_closed, out)?;
if state.resp_closed {
self.response = None;
}
self.process_response(res, state, out)?;
}
}

Ok(())
}

fn process_shutdown(&mut self, state: &mut ActorState) {
state.shutting_down = true;
self.start_shutdown(&mut state.resp_closed);
}

fn process_high(&mut self, res: Option<F>, state: &mut ActorState, out: &mut Vec<F>) {
let processed = res.is_some();
self.handle_push(res, &mut state.push.high, out);
if processed {
self.after_high(out, &mut state.push.low);
} else {
self.reset_high_counter();
}
}

fn process_low(&mut self, res: Option<F>, state: &mut ActorState, out: &mut Vec<F>) {
let processed = res.is_some();
self.handle_push(res, &mut state.push.low, out);
if processed {
self.after_low();
}
}

fn process_response(
&mut self,
res: Option<Result<F, WireframeError<E>>>,
state: &mut ActorState,
out: &mut Vec<F>,
) -> Result<(), WireframeError<E>> {
let processed = matches!(res, Some(Ok(_)));
self.handle_response(res, &mut state.resp_closed, out)?;
if processed {
self.after_low();
}
if state.resp_closed {
self.response = None;
}
Ok(())
}

fn start_shutdown(&mut self, resp_closed: &mut bool) {
self.queues.high_priority_rx.close();
self.queues.low_priority_rx.close();
Expand All @@ -142,6 +212,44 @@ where
}
}

fn after_high(&mut self, out: &mut Vec<F>, low_closed: &mut bool) {
self.high_counter += 1;
if self.high_counter == 1 {
self.high_start = Some(Instant::now());
}

if self.should_yield_to_low_priority() {
match self.queues.low_priority_rx.try_recv() {
Ok(mut frame) => {
self.hooks.before_send(&mut frame);
out.push(frame);
self.after_low();
*low_closed = false;
}
Err(mpsc::error::TryRecvError::Empty) => {}
Err(mpsc::error::TryRecvError::Disconnected) => *low_closed = true,
}
}
}

fn should_yield_to_low_priority(&self) -> bool {
let threshold_hit = self.fairness.max_high_before_low > 0
&& self.high_counter >= self.fairness.max_high_before_low;
let time_hit = self
.fairness
.time_slice
.zip(self.high_start)
.is_some_and(|(slice, start)| start.elapsed() >= slice);
threshold_hit || time_hit
}

fn after_low(&mut self) { self.reset_high_counter(); }

fn reset_high_counter(&mut self) {
self.high_counter = 0;
self.high_start = None;
}

fn handle_response(
&mut self,
res: Option<Result<F, WireframeError<E>>>,
Expand Down
27 changes: 26 additions & 1 deletion tests/connection_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use rstest::{fixture, rstest};
use tokio::time::{Duration, sleep, timeout};
use tokio_util::sync::CancellationToken;
use wireframe::{
connection::ConnectionActor,
connection::{ConnectionActor, FairnessConfig},
push::PushQueues,
response::{FrameStream, WireframeError},
};
Expand Down Expand Up @@ -44,6 +44,31 @@ async fn strict_priority_order(
assert_eq!(out, vec![1, 2, 3]);
}

#[rstest]
#[tokio::test]
async fn fairness_yields_low_after_burst(
queues: (PushQueues<u8>, wireframe::push::PushHandle<u8>),
shutdown_token: CancellationToken,
) {
let (queues, handle) = queues;
let fairness = FairnessConfig {
max_high_before_low: 2,
time_slice: None,
};

for n in 1..=5 {
handle.push_high_priority(n).await.unwrap();
}
handle.push_low_priority(99).await.unwrap();
drop(handle);

let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, None, shutdown_token);
actor.set_fairness(fairness);
let mut out = Vec::new();
actor.run(&mut out).await.unwrap();
assert_eq!(out, vec![1, 2, 99, 3, 4, 5]);
}

#[rstest]
#[tokio::test]
async fn shutdown_signal_precedence(
Expand Down