32 changes: 29 additions & 3 deletions docs/asynchronous-outbound-messaging-design.md
@@ -146,9 +146,10 @@ An optional time slice (for example 100 µs) can also be configured. When the
elapsed time spent handling high-priority frames exceeds this slice, and the low
queue is not empty, the actor yields to a low-priority frame. Application
builders expose `with_fairness(FairnessConfig)` where `FairnessConfig` groups
the counter threshold and an optional `time_slice`. The counter defaults to 16
while `time_slice` is disabled. Setting the counter to zero preserves the
original strict ordering.
the counter threshold and an optional `time_slice`. The counter defaults to 8
while `time_slice` is disabled. Setting the counter to zero disables the
threshold logic and relies solely on `time_slice` for fairness, preserving
strict high-priority ordering otherwise.

This fairness mechanism ensures low-priority traffic continues to progress even
under sustained high-priority load.
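The yield decision described above can be sketched as a small standalone function. This is an illustration of the design, not the crate's actual API; the struct and field names mirror the `FairnessConfig` discussed in this PR, but everything here is self-contained.

```rust
use std::time::{Duration, Instant};

/// Mirror of the design's fairness configuration (illustrative only).
struct FairnessConfig {
    /// Consecutive high-priority frames before checking the low queue.
    /// Zero disables the counter, leaving only `time_slice` fairness.
    max_high_before_low: usize,
    /// Optional elapsed-time budget for a high-priority burst.
    time_slice: Option<Duration>,
}

/// Per-connection burst tracking state.
struct FairnessState {
    high_counter: usize,
    high_start: Option<Instant>,
}

impl FairnessState {
    /// True when either the counter threshold or the time slice is exceeded.
    fn should_yield(&self, cfg: &FairnessConfig) -> bool {
        let threshold_hit =
            cfg.max_high_before_low > 0 && self.high_counter >= cfg.max_high_before_low;
        let time_hit = cfg
            .time_slice
            .zip(self.high_start)
            .is_some_and(|(slice, start)| start.elapsed() >= slice);
        threshold_hit || time_hit
    }
}

fn main() {
    let state = FairnessState { high_counter: 8, high_start: Some(Instant::now()) };
    // Default-like config: counter of 8, no time slice -> yield after 8 frames.
    assert!(state.should_yield(&FairnessConfig { max_high_before_low: 8, time_slice: None }));
    // Counter disabled and no time slice -> strict high-priority ordering.
    assert!(!state.should_yield(&FairnessConfig { max_high_before_low: 0, time_slice: None }));
    println!("ok");
}
```

Note how a zero counter with no time slice yields `false` unconditionally, which is what preserves the strict ordering mentioned in the design text.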
@@ -181,6 +182,31 @@ flowchart TD
K --> A
```

The following sequence diagram illustrates the runtime behaviour:

```mermaid
sequenceDiagram
participant Client
participant ConnectionActor
participant HighQueue
participant LowQueue

loop While processing frames
ConnectionActor->>HighQueue: Poll high-priority frame
alt High-priority frame received
ConnectionActor->>ConnectionActor: after_high()
alt Max high before low or time slice reached
ConnectionActor->>LowQueue: Try receive low-priority frame
alt Low-priority frame available
ConnectionActor->>ConnectionActor: after_low()
end
end
else No high-priority frame
ConnectionActor->>ConnectionActor: reset_high_counter()
end
end
```
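The loop in the diagram can be exercised with a toy synchronous drain over two queues. This is a sketch under the assumption of the counter-only mode (`time_slice` disabled); the real actor is asynchronous and uses a biased `select!`, so this only illustrates the ordering the fairness counter produces.

```rust
use std::collections::VecDeque;

/// Toy drain loop: after `max` consecutive high-priority frames, pop one
/// low-priority frame and reset the burst counter. `max == 0` disables the
/// counter entirely (strict high-priority ordering).
fn drain(mut high: VecDeque<u8>, mut low: VecDeque<u8>, max: usize) -> Vec<u8> {
    let mut out = Vec::new();
    let mut counter = 0;
    while let Some(frame) = high.pop_front() {
        out.push(frame);
        counter += 1;
        if max > 0 && counter >= max {
            if let Some(f) = low.pop_front() {
                out.push(f);
                counter = 0; // corresponds to after_low() resetting the counter
            }
        }
    }
    out.extend(low); // remaining low frames once the high queue is exhausted
    out
}

fn main() {
    // Mirrors the fairness test below: burst of 5 high frames, one low frame,
    // threshold of 2 -> low frame interleaved after the second high frame.
    let out = drain((1..=5).collect(), VecDeque::from([99]), 2);
    assert_eq!(out, vec![1, 2, 99, 3, 4, 5]);
    println!("{out:?}");
}
```

With the threshold set to zero the low frame only appears after all high frames, matching the documented strict ordering.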

### 3.3 Connection Actor Overview

```mermaid
2 changes: 1 addition & 1 deletion docs/asynchronous-outbound-messaging-roadmap.md
@@ -13,7 +13,7 @@ design documents.
- [x] **Connection actor** with a biased `select!` loop that polls for shutdown,
high/low queues and response streams as described in
[Design §3.2][design-write-loop].
- [ ] **Fairness counter** to yield to the low-priority queue after bursts of
- [x] **Fairness counter** to yield to the low-priority queue after bursts of
high-priority frames ([Design §3.2.1][design-fairness]).
- [ ] **Run state consolidation** using `Option` receivers and a closed source
counter ([Design §3.4][design-actor-state]).
69 changes: 33 additions & 36 deletions docs/contents.md
@@ -1,61 +1,58 @@
# Documentation contents

This page summarizes each file in the `docs/` directory.
Use it as a quick reference when navigating the project's design and
architecture material.
This page summarizes each file in the `docs/` directory. Use it as a quick
reference when navigating the project's design and architecture material.

## Design documents

- [Outbound messaging design](asynchronous-outbound-messaging-design.md)
- [Outbound messaging design](asynchronous-outbound-messaging-design.md)
Comprehensive design for server-initiated pushes.
- [Message fragments](generic-message-fragmentation-and-re-assembly-design.md)
- [Message fragments](generic-message-fragmentation-and-re-assembly-design.md)
Design for fragmenting and reassembling messages.
- [Streaming response design](multi-packet-and-streaming-responses-design.md)
- [Streaming response design](multi-packet-and-streaming-responses-design.md)
Design for handling multi-packet and streaming responses.
- [Router library design](rust-binary-router-library-design.md)
In-depth design of the binary router library.
- [Client design](wireframe-client-design.md)
Proposal for adding client-side support.
- [Frame metadata](frame-metadata.md)
How frame metadata assists with routing decisions.
- [Message versioning](message-versioning.md)
Approaches to message versioning and compatibility.
- [Preamble validator](preamble-validator.md)
Validating client connection preambles.
- [Router library design](rust-binary-router-library-design.md) In-depth design
of the binary router library.
- [Client design](wireframe-client-design.md) Proposal for adding client-side
support.
- [Frame metadata](frame-metadata.md) How frame metadata assists with routing
decisions.
- [Message versioning](message-versioning.md) Approaches to message versioning
and compatibility.
- [Preamble validator](preamble-validator.md) Validating client connection
preambles.

## Roadmaps

- [Outbound messaging roadmap](asynchronous-outbound-messaging-roadmap.md)
Task list for implementing asynchronous outbound messaging.
- [Wireframe 1.0 roadmap](wireframe-1-0-detailed-development-roadmap.md)
- [Outbound messaging roadmap](asynchronous-outbound-messaging-roadmap.md) Task
list for implementing asynchronous outbound messaging.
- [Wireframe 1.0 roadmap](wireframe-1-0-detailed-development-roadmap.md)
Detailed tasks leading to Wireframe 1.0.
- [Project roadmap](roadmap.md)
High-level development roadmap.
- [1.0 philosophy](the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md)
- [Project roadmap](roadmap.md) High-level development roadmap.
- [1.0 philosophy](the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md)
Philosophy and feature set for Wireframe 1.0.

## Testing

- [Multi-layered testing strategy](multi-layered-testing-strategy.md)
Overview of the library's testing approach.
- [RSTest fixtures](rust-testing-with-rstest-fixtures.md)
Using `rstest` fixtures for clean tests.
- [Mocking network outages](mocking-network-outages-in-rust.md)
Tutorial for simulating network outages in tests.
- [Testing helpers](wireframe-testing-crate.md)
Planned companion crate with testing helpers.
- [Multi-layered testing strategy](multi-layered-testing-strategy.md) Overview
of the library's testing approach.
- [RSTest fixtures](rust-testing-with-rstest-fixtures.md) Using `rstest`
fixtures for clean tests.
- [Mocking network outages](mocking-network-outages-in-rust.md) Tutorial for
simulating network outages in tests.
- [Testing helpers](wireframe-testing-crate.md) Planned companion crate with
testing helpers.

## Operations and resilience

- [Resilience guide](hardening-wireframe-a-guide-to-production-resilience.md)
- [Resilience guide](hardening-wireframe-a-guide-to-production-resilience.md)
Guidance on achieving production resilience.
- [Observability and operability](observability-operability-and-maturity.md)
- [Observability and operability](observability-operability-and-maturity.md)
Guide to observability and operational maturity.

## Reference guides

- [Refactoring guide](complexity-antipatterns-and-refactoring-strategies.md)
- [Refactoring guide](complexity-antipatterns-and-refactoring-strategies.md)
Strategies for taming code complexity and refactoring.
- [Documentation style guide](documentation-style-guide.md)
Conventions for writing project documentation.

- [Documentation style guide](documentation-style-guide.md) Conventions for
writing project documentation.
126 changes: 117 additions & 9 deletions src/connection.rs
@@ -6,7 +6,10 @@
//! low-priority ones, with streamed responses handled last.

use futures::StreamExt;
use tokio::sync::mpsc;
use tokio::{
sync::mpsc,
time::{Duration, Instant},
};
use tokio_util::sync::CancellationToken;

use crate::{
@@ -15,12 +18,37 @@ use crate::{
response::{FrameStream, WireframeError},
};

/// Configuration controlling fairness when draining push queues.
#[derive(Clone, Copy)]
pub struct FairnessConfig {
/// Number of consecutive high-priority frames to process before
/// checking the low-priority queue.
pub max_high_before_low: usize,
/// A zero value disables the counter and relies solely on `time_slice` for
/// fairness, preserving strict high-priority ordering otherwise.
/// Optional time slice after which the low-priority queue is checked
/// if high-priority traffic has been continuous.
pub time_slice: Option<Duration>,
}
Comment on lines +21 to +32
Contributor

@coderabbitai coderabbitai Bot Jun 28, 2025


⚠️ Potential issue

Fix Rustdoc comment placement for max_high_before_low field.

The documentation about zero value disabling the counter should be part of the field's doc comment, not a separate comment.

Apply this diff to fix the documentation:

 pub struct FairnessConfig {
     /// Number of consecutive high-priority frames to process before
-    /// checking the low-priority queue.
+    /// checking the low-priority queue. A zero value disables the counter
+    /// and relies solely on `time_slice` for fairness, preserving strict
+    /// high-priority ordering otherwise.
     pub max_high_before_low: usize,
-    /// A zero value disables the counter and relies solely on `time_slice` for
-    /// fairness, preserving strict high-priority ordering otherwise.
     /// Optional time slice after which the low-priority queue is checked
     /// if high-priority traffic has been continuous.
     pub time_slice: Option<Duration>,
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/// Configuration controlling fairness when draining push queues.
#[derive(Clone, Copy)]
pub struct FairnessConfig {
/// Number of consecutive high-priority frames to process before
/// checking the low-priority queue.
pub max_high_before_low: usize,
/// A zero value disables the counter and relies solely on `time_slice` for
/// fairness, preserving strict high-priority ordering otherwise.
/// Optional time slice after which the low-priority queue is checked
/// if high-priority traffic has been continuous.
pub time_slice: Option<Duration>,
}
/// Configuration controlling fairness when draining push queues.
#[derive(Clone, Copy)]
pub struct FairnessConfig {
/// Number of consecutive high-priority frames to process before
/// checking the low-priority queue. A zero value disables the counter
/// and relies solely on `time_slice` for fairness, preserving strict
/// high-priority ordering otherwise.
pub max_high_before_low: usize,
/// Optional time slice after which the low-priority queue is checked
/// if high-priority traffic has been continuous.
pub time_slice: Option<Duration>,
}
🤖 Prompt for AI Agents
In src/connection.rs between lines 21 and 32, the Rustdoc comment explaining
that a zero value disables the counter and relies solely on time_slice should be
merged into the doc comment for the max_high_before_low field instead of being a
separate comment. Move that explanation directly under the max_high_before_low
field's doc comment to correctly associate it with that field.

Owner Author

New GitHub issue, please

Contributor

🧩 Analysis chain

@leynos I'll create a GitHub issue for the Rustdoc comment placement fix identified in the review.


🏁 Script executed:

#!/bin/bash
gh issue create \
  --title "Fix Rustdoc comment placement for FairnessConfig.max_high_before_low field" \
  --body "## Description

The documentation comment for the \`max_high_before_low\` field in the \`FairnessConfig\` struct needs to be restructured. Currently, the explanation about zero value behaviour is placed as a separate comment between the field and the next field, which is not the correct Rustdoc format.

## Current Issue

The documentation explaining that \"A zero value disables the counter and relies solely on \`time_slice\` for fairness, preserving strict high-priority ordering otherwise\" should be merged into the doc comment for the \`max_high_before_low\` field.

## Expected Fix

Move the zero value explanation directly under the \`max_high_before_low\` field's doc comment to correctly associate it with that field.

## References

- PR: https://github.com/leynos/wireframe/pull/147
- Review comment: https://github.com/leynos/wireframe/pull/147#discussion_r2173524894
- Requested by: @leynos

## File Location

\`src/connection.rs\` lines 21-32" \
  --assignee leynos

Length of output: 1124


Perfect! I've successfully created GitHub issue #149 for the Rustdoc comment placement fix.

Issue created: #149

The issue documents the need to fix the documentation structure for the max_high_before_low field in the FairnessConfig struct, moving the zero value explanation into the proper field documentation comment as identified in the code review.

🐰✨


🧬 Code Graph Analysis Results

Relevant snippets from src/hooks.rs (lines 22-27)

fn default() -> Self {
    Self {
        before_send: None,
        on_command_end: None,
    }
}

This snippet shows the default initialization of ProtocolHooks, which is used in ConnectionActor to run hooks before sending frames and when a command ends.


Relevant snippets from tests/connection_actor.rs (lines 18 and 22)

fn queues() -> (PushQueues<u8>, wireframe::push::PushHandle<u8>) { PushQueues::bounded(8, 8) }

This test helper function creates bounded push queues, which are used by ConnectionActor for high- and low-priority frame delivery.

fn shutdown_token() -> CancellationToken { CancellationToken::new() }

This helper function creates a new shutdown token, which is used by ConnectionActor to handle shutdown signaling.


Summary of relevant code from src/connection.rs (lines 1-180)

The file defines a ConnectionActor struct that manages outbound frame delivery using asynchronous streams and queues. It uses a tokio::select! loop with the biased keyword to prioritize high-priority push queues over low-priority ones and streaming responses. The actor supports fairness configuration to balance processing between high- and low-priority frames based on count or elapsed time.

Key components and methods include:

  • FairnessConfig: Configures how many high-priority frames are processed before yielding to low-priority frames, and optionally a time slice for fairness.
  • ConnectionActor::new and with_hooks: Constructors for the actor, optionally with custom protocol hooks.
  • run: Main async method driving the actor until shutdown or all sources are exhausted.
  • poll_sources: Uses tokio::select! to poll shutdown token, high- and low-priority queues, and an optional response stream.
  • process_* methods: Handle frames received from different sources, applying hooks and managing internal state.
  • start_shutdown: Closes queues and drops the response stream on shutdown.
  • handle_push and handle_response: Apply hooks and append frames to the output vector.
  • Fairness logic in after_high, should_yield_to_low_priority, and after_low to balance frame processing.
  • ActorState struct tracks whether push queues and response streams are closed and if shutdown is in progress.

This file is central to managing outbound frame flow with prioritization, fairness, and graceful shutdown.


No other snippets from the provided list appear directly relevant to understanding or addressing the src/connection.rs file or the user comment about creating a new GitHub issue.


impl Default for FairnessConfig {
fn default() -> Self {
Self {
max_high_before_low: 8,
time_slice: None,
}
}
}

/// Actor driving outbound frame delivery for a connection.
pub struct ConnectionActor<F, E> {
queues: PushQueues<F>,
response: Option<FrameStream<F, E>>, // current streaming response
shutdown: CancellationToken,
hooks: ProtocolHooks<F>,
fairness: FairnessConfig,
high_counter: usize,
high_start: Option<Instant>,
}

impl<F, E> ConnectionActor<F, E>
@@ -50,6 +78,9 @@ where
response,
shutdown,
hooks,
fairness: FairnessConfig::default(),
high_counter: 0,
high_start: None,
}
}

@@ -60,6 +91,9 @@ where
#[must_use]
pub fn queues_mut(&mut self) -> &mut PushQueues<F> { &mut self.queues }

/// Replace the fairness configuration.
pub fn set_fairness(&mut self, fairness: FairnessConfig) { self.fairness = fairness; }

/// Set or replace the current streaming response.
pub fn set_response(&mut self, stream: Option<FrameStream<F, E>>) { self.response = stream; }

@@ -100,29 +134,65 @@ where
biased;

() = Self::wait_shutdown(self.shutdown.clone()), if !state.shutting_down => {
state.shutting_down = true;
self.start_shutdown(&mut state.resp_closed);
self.process_shutdown(state);
}

res = Self::recv_push(&mut self.queues.high_priority_rx), if !state.push.high => {
self.handle_push(res, &mut state.push.high, out);
self.process_high(res, state, out);
}

res = Self::recv_push(&mut self.queues.low_priority_rx), if !state.push.low => {
self.handle_push(res, &mut state.push.low, out);
self.process_low(res, state, out);
}

res = Self::next_response(&mut self.response), if !state.shutting_down && !state.resp_closed => {
self.handle_response(res, &mut state.resp_closed, out)?;
if state.resp_closed {
self.response = None;
}
self.process_response(res, state, out)?;
}
}

Ok(())
}

fn process_shutdown(&mut self, state: &mut ActorState) {
state.shutting_down = true;
self.start_shutdown(&mut state.resp_closed);
}

fn process_high(&mut self, res: Option<F>, state: &mut ActorState, out: &mut Vec<F>) {
let processed = res.is_some();
self.handle_push(res, &mut state.push.high, out);
if processed {
self.after_high(out, &mut state.push.low);
} else {
self.reset_high_counter();
}
}

fn process_low(&mut self, res: Option<F>, state: &mut ActorState, out: &mut Vec<F>) {
let processed = res.is_some();
self.handle_push(res, &mut state.push.low, out);
if processed {
self.after_low();
}
}

fn process_response(
&mut self,
res: Option<Result<F, WireframeError<E>>>,
state: &mut ActorState,
out: &mut Vec<F>,
) -> Result<(), WireframeError<E>> {
let processed = matches!(res, Some(Ok(_)));
self.handle_response(res, &mut state.resp_closed, out)?;
if processed {
self.after_low();
}
if state.resp_closed {
self.response = None;
}
Ok(())
}

fn start_shutdown(&mut self, resp_closed: &mut bool) {
self.queues.high_priority_rx.close();
self.queues.low_priority_rx.close();
@@ -142,6 +212,44 @@ where
}
}

fn after_high(&mut self, out: &mut Vec<F>, low_closed: &mut bool) {
self.high_counter += 1;
if self.high_counter == 1 {
self.high_start = Some(Instant::now());
}

if self.should_yield_to_low_priority() {
match self.queues.low_priority_rx.try_recv() {
Ok(mut frame) => {
self.hooks.before_send(&mut frame);
out.push(frame);
self.after_low();
*low_closed = false;
}
Err(mpsc::error::TryRecvError::Empty) => {}
Err(mpsc::error::TryRecvError::Disconnected) => *low_closed = true,
}
}
}

fn should_yield_to_low_priority(&self) -> bool {
let threshold_hit = self.fairness.max_high_before_low > 0
&& self.high_counter >= self.fairness.max_high_before_low;
let time_hit = self
.fairness
.time_slice
.zip(self.high_start)
.is_some_and(|(slice, start)| start.elapsed() >= slice);
threshold_hit || time_hit
}

fn after_low(&mut self) { self.reset_high_counter(); }

fn reset_high_counter(&mut self) {
self.high_counter = 0;
self.high_start = None;
}

fn handle_response(
&mut self,
res: Option<Result<F, WireframeError<E>>>,
27 changes: 26 additions & 1 deletion tests/connection_actor.rs
@@ -8,7 +8,7 @@ use rstest::{fixture, rstest};
use tokio::time::{Duration, sleep, timeout};
use tokio_util::sync::CancellationToken;
use wireframe::{
connection::ConnectionActor,
connection::{ConnectionActor, FairnessConfig},
push::PushQueues,
response::{FrameStream, WireframeError},
};
@@ -44,6 +44,31 @@ async fn strict_priority_order(
assert_eq!(out, vec![1, 2, 3]);
}

#[rstest]
#[tokio::test]
async fn fairness_yields_low_after_burst(
queues: (PushQueues<u8>, wireframe::push::PushHandle<u8>),
shutdown_token: CancellationToken,
) {
let (queues, handle) = queues;
let fairness = FairnessConfig {
Contributor

suggestion (testing): No test for disabling fairness (max_high_before_low = 0, time_slice = None).

Please add a test to verify that when both max_high_before_low = 0 and time_slice = None, all high-priority frames are processed before any low-priority ones, as documented.

Owner Author

New GitHub issue, please

Contributor

I have created an issue for your comment: #148

max_high_before_low: 2,
time_slice: None,
};

for n in 1..=5 {
handle.push_high_priority(n).await.unwrap();
}
handle.push_low_priority(99).await.unwrap();
drop(handle);

let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, None, shutdown_token);
actor.set_fairness(fairness);
let mut out = Vec::new();
actor.run(&mut out).await.unwrap();
assert_eq!(out, vec![1, 2, 99, 3, 4, 5]);
}
Comment on lines +47 to +70
Contributor

suggestion (testing): Missing test for time_slice-based fairness yielding.

Please add a test that verifies yielding to the low-priority queue when time_slice is set and max_high_before_low is zero, so both fairness mechanisms are tested.

Suggested change
#[rstest]
#[tokio::test]
async fn fairness_yields_low_after_burst(
queues: (PushQueues<u8>, wireframe::push::PushHandle<u8>),
shutdown_token: CancellationToken,
) {
let (queues, handle) = queues;
let fairness = FairnessConfig {
max_high_before_low: 2,
time_slice: None,
};
for n in 1..=5 {
handle.push_high_priority(n).await.unwrap();
}
handle.push_low_priority(99).await.unwrap();
drop(handle);
let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, None, shutdown_token);
actor.set_fairness(fairness);
let mut out = Vec::new();
actor.run(&mut out).await.unwrap();
assert_eq!(out, vec![1, 2, 99, 3, 4, 5]);
}
#[rstest]
#[tokio::test]
async fn fairness_yields_low_after_burst(
queues: (PushQueues<u8>, wireframe::push::PushHandle<u8>),
shutdown_token: CancellationToken,
) {
let (queues, handle) = queues;
let fairness = FairnessConfig {
max_high_before_low: 2,
time_slice: None,
};
for n in 1..=5 {
handle.push_high_priority(n).await.unwrap();
}
handle.push_low_priority(99).await.unwrap();
drop(handle);
let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, None, shutdown_token);
actor.set_fairness(fairness);
let mut out = Vec::new();
actor.run(&mut out).await.unwrap();
assert_eq!(out, vec![1, 2, 99, 3, 4, 5]);
}
#[rstest]
#[tokio::test]
async fn fairness_yields_low_with_time_slice(
queues: (PushQueues<u8>, wireframe::push::PushHandle<u8>),
shutdown_token: CancellationToken,
) {
use std::time::Duration;
let (queues, handle) = queues;
let fairness = FairnessConfig {
max_high_before_low: 0,
time_slice: Some(Duration::from_millis(10)),
};
for n in 1..=3 {
handle.push_high_priority(n).await.unwrap();
}
handle.push_low_priority(42).await.unwrap();
for n in 4..=5 {
handle.push_high_priority(n).await.unwrap();
}
drop(handle);
let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, None, shutdown_token);
actor.set_fairness(fairness);
let mut out = Vec::new();
actor.run(&mut out).await.unwrap();
// The low-priority item should be yielded after the time slice expires, not after a fixed number of high-priority items.
// The exact position may depend on timing, but it should not be at the end.
assert!(out.contains(&42), "Low-priority item was not yielded");
let pos = out.iter().position(|x| *x == 42).unwrap();
assert!(pos > 0 && pos < out.len() - 1, "Low-priority item should be yielded in the middle");
}

Owner Author

New GitHub issue, please

Contributor

I have created an issue for your comment: #150


#[rstest]
#[tokio::test]
async fn shutdown_signal_precedence(