Skip to content

Add fairness config docs and tweak helper#147

Closed
leynos wants to merge 1 commit intomainfrom
7dqmqx-codex/implement-fairness-counter-for-queue-management
Closed

Add fairness config docs and tweak helper#147
leynos wants to merge 1 commit intomainfrom
7dqmqx-codex/implement-fairness-counter-for-queue-management

Conversation

@leynos
Copy link
Copy Markdown
Owner

@leynos leynos commented Jun 28, 2025

Summary

  • document how zero disables fairness threshold
  • move ConnectionActor doc comment and clean up helper
  • default to 8 high-priority frames before yielding to low
  • clean up doc contents index links

Testing

  • make lint
  • make test

https://chatgpt.com/codex/tasks/task_e_685f1ebd24108322b4214a341609e5d3

Summary by Sourcery

Introduce a configurable fairness mechanism for ConnectionActor frame dispatch, refactor actor internals around helper methods, update defaults and documentation, and add a corresponding test case

New Features:

  • Introduce FairnessConfig to control high-priority frame bursts and optional time slicing for low-priority yields
  • Add set_fairness method on ConnectionActor to replace default fairness parameters

Enhancements:

  • Refactor ConnectionActor to use process_* helper methods for shutdown, push, and response handling
  • Change default max_high_before_low from 16 to 8 frames before yielding to low priority
  • Clean up documentation index formatting and update roadmap completion status

Documentation:

  • Document that setting max_high_before_low to zero disables the burst counter and relies solely on time_slice
  • Add sequence diagram illustrating the runtime fairness behavior

Tests:

  • Add a test to verify low-priority frames are yielded after a burst of high-priority frames under the new fairness config

@sourcery-ai
Copy link
Copy Markdown
Contributor

sourcery-ai Bot commented Jun 28, 2025

Reviewer's Guide

This PR adds a configurable fairness mechanism to the ConnectionActor, refactors its main loop into discrete processing methods, updates documentation to describe and illustrate the new behavior, and includes a new test verifying fairness yields.

Sequence diagram for fairness yielding in ConnectionActor

sequenceDiagram
    participant Client
    participant ConnectionActor
    participant HighQueue
    participant LowQueue

    loop While processing frames
        ConnectionActor->>HighQueue: Poll high-priority frame
        alt High-priority frame received
            ConnectionActor->>ConnectionActor: after_high()
            alt Max high before low or time slice reached
                ConnectionActor->>LowQueue: Try receive low-priority frame
                alt Low-priority frame available
                    ConnectionActor->>ConnectionActor: after_low()
                end
            end
        else No high-priority frame
            ConnectionActor->>ConnectionActor: reset_high_counter()
        end
    end
Loading

Class diagram for updated ConnectionActor fairness logic

classDiagram
    class FairnessConfig {
        +usize max_high_before_low
        +Option<Duration> time_slice
        +default()
    }
    class ConnectionActor {
        -PushQueues queues
        -Option<FrameStream> response
        -CancellationToken shutdown
        -ProtocolHooks hooks
        -FairnessConfig fairness
        -usize high_counter
        -Option<Instant> high_start
        +set_fairness(fairness: FairnessConfig)
        +set_response(stream: Option<FrameStream>)
        -process_shutdown(state)
        -process_high(res, state, out)
        -process_low(res, state, out)
        -process_response(res, state, out)
        -after_high(out, low_closed)
        -should_yield_to_low_priority()
        -after_low()
        -reset_high_counter()
    }
    ConnectionActor --> FairnessConfig
Loading

File-Level Changes

Change Details Files
Introduce and default FairnessConfig in ConnectionActor
  • Define FairnessConfig struct with counter and time_slice
  • Implement Default with max_high_before_low=8
  • Add fairness, high_counter, high_start fields
  • Add set_fairness() method
src/connection.rs
Refactor ConnectionActor loop into process_* methods
  • Extract shutdown handling into process_shutdown
  • Extract high-priority push into process_high
  • Extract low-priority push into process_low
  • Extract response handling into process_response
src/connection.rs
Implement fairness logic around high/low queue draining
  • Add after_high(), should_yield_to_low_priority(), after_low(), reset_high_counter()
  • Invoke after_high in process_high to yield to low when due
  • Reset high counter when no high-priority frames
src/connection.rs
Clean up and extend documentation
  • Normalize link formatting in docs/contents.md
  • Update fairness counter default and disabling semantics
  • Add mermaid sequence diagram illustrating fairness flow
docs/contents.md
docs/asynchronous-outbound-messaging-design.md
Add a test for fairness behavior
  • Create fairness_yields_low_after_burst test using rstest
  • Verify low-priority frame is interleaved after configured burst
tests/connection_actor.rs
Update roadmap to reflect implemented fairness
  • Mark fairness counter item as completed
docs/asynchronous-outbound-messaging-roadmap.md

Possibly linked issues

  • #0: PR implements fairness mechanism to prevent high-priority frame starvation requested in issue.

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Jun 28, 2025

Summary by CodeRabbit

  • New Features

    • Introduced a configurable fairness mechanism for processing high- and low-priority outbound messages, allowing customisation of how many high-priority messages are handled before yielding to low-priority ones.
    • Added a new method to adjust fairness settings for outbound messaging.
  • Documentation

    • Updated the design documentation to describe the new fairness logic and added a sequence diagram illustrating the behaviour.
    • Marked the fairness counter feature as complete in the roadmap.
    • Improved formatting in the documentation contents for better readability.
  • Tests

    • Added a test to verify that low-priority messages are processed after a configurable burst of high-priority messages.

Walkthrough

This update introduces a configurable fairness mechanism to the connection actor, balancing the processing of high- and low-priority outbound frames. It adds a FairnessConfig struct, refactors the actor's polling loop, updates documentation, and includes a new test to verify the fairness policy. Documentation and formatting changes accompany the code updates.

Changes

Files/Group Change Summary
src/connection.rs Introduced FairnessConfig, refactored polling loop, added fairness counters and timing logic, new setter method.
tests/connection_actor.rs Added a new async test to verify fairness: high-priority bursts yield to low-priority frames as configured.
docs/asynchronous-outbound-messaging-design.md Updated fairness configuration details, changed threshold default, clarified semantics, and added a sequence diagram.
docs/asynchronous-outbound-messaging-roadmap.md Marked the "Fairness counter" feature as complete in the roadmap checklist.
docs/contents.md Reformatted markdown for layout consistency; no content changes.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant ConnectionActor
    participant HighQueue
    participant LowQueue

    loop Main Polling Loop
        ConnectionActor->>HighQueue: Poll for high-priority frame
        alt Frame available
            HighQueue-->>ConnectionActor: High-priority frame
            ConnectionActor->>ConnectionActor: Increment high_counter
            alt Threshold or time slice reached
                ConnectionActor->>LowQueue: Try poll for low-priority frame
                alt Low-priority frame available
                    LowQueue-->>ConnectionActor: Low-priority frame
                    ConnectionActor->>ConnectionActor: Reset high_counter
                else No low-priority frame
                    ConnectionActor->>ConnectionActor: Continue high-priority processing
                end
            else
                ConnectionActor->>ConnectionActor: Continue high-priority processing
            end
        else No high-priority frame
            ConnectionActor->>LowQueue: Poll for low-priority frame
            alt Low-priority frame available
                LowQueue-->>ConnectionActor: Low-priority frame
                ConnectionActor->>ConnectionActor: Reset high_counter
            else No low-priority frame
                ConnectionActor->>ConnectionActor: Wait or process shutdown/response
            end
        end
    end
Loading

Possibly related PRs

Poem

In the warren of code, fairness now reigns,
High and low queues take orderly trains.
No more will the little frames wait at the gate,
For counters and clocks decide their fate.
With tests and diagrams, the story’s complete—
A rabbit’s delight: fairness, robust and neat!
🐇✨

✨ Finishing Touches
  • 📝 Generate Docstrings
🧪 Generate Unit Tests
  • Create PR with Unit Tests
  • Post Copyable Unit Tests in Comment
  • Commit Unit Tests in branch 7dqmqx-codex/implement-fairness-counter-for-queue-management

🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai auto-generate unit tests to generate unit tests for this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Copy Markdown
Contributor

@sourcery-ai sourcery-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @leynos - I've reviewed your changes and they look great!

Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments

### Comment 1
<location> `tests/connection_actor.rs:47` </location>
<code_context>
     assert_eq!(out, vec![1, 2, 3]);
 }

+#[rstest]
+#[tokio::test]
+async fn fairness_yields_low_after_burst(
+    queues: (PushQueues<u8>, wireframe::push::PushHandle<u8>),
+    shutdown_token: CancellationToken,
+) {
+    let (queues, handle) = queues;
+    let fairness = FairnessConfig {
+        max_high_before_low: 2,
+        time_slice: None,
+    };
+
+    for n in 1..=5 {
+        handle.push_high_priority(n).await.unwrap();
+    }
+    handle.push_low_priority(99).await.unwrap();
+    drop(handle);
+
+    let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, None, shutdown_token);
+    actor.set_fairness(fairness);
+    let mut out = Vec::new();
+    actor.run(&mut out).await.unwrap();
+    assert_eq!(out, vec![1, 2, 99, 3, 4, 5]);
+}
+
</code_context>

<issue_to_address>
Missing test for time_slice-based fairness yielding.

Please add a test that verifies yielding to the low-priority queue when time_slice is set and max_high_before_low is zero, so both fairness mechanisms are tested.
</issue_to_address>

<suggested_fix>
<<<<<<< SEARCH
#[rstest]
#[tokio::test]
async fn fairness_yields_low_after_burst(
    queues: (PushQueues<u8>, wireframe::push::PushHandle<u8>),
    shutdown_token: CancellationToken,
) {
    let (queues, handle) = queues;
    let fairness = FairnessConfig {
        max_high_before_low: 2,
        time_slice: None,
    };

    for n in 1..=5 {
        handle.push_high_priority(n).await.unwrap();
    }
    handle.push_low_priority(99).await.unwrap();
    drop(handle);

    let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, None, shutdown_token);
    actor.set_fairness(fairness);
    let mut out = Vec::new();
    actor.run(&mut out).await.unwrap();
    assert_eq!(out, vec![1, 2, 99, 3, 4, 5]);
}
=======
#[rstest]
#[tokio::test]
async fn fairness_yields_low_after_burst(
    queues: (PushQueues<u8>, wireframe::push::PushHandle<u8>),
    shutdown_token: CancellationToken,
) {
    let (queues, handle) = queues;
    let fairness = FairnessConfig {
        max_high_before_low: 2,
        time_slice: None,
    };

    for n in 1..=5 {
        handle.push_high_priority(n).await.unwrap();
    }
    handle.push_low_priority(99).await.unwrap();
    drop(handle);

    let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, None, shutdown_token);
    actor.set_fairness(fairness);
    let mut out = Vec::new();
    actor.run(&mut out).await.unwrap();
    assert_eq!(out, vec![1, 2, 99, 3, 4, 5]);
}

#[rstest]
#[tokio::test]
async fn fairness_yields_low_with_time_slice(
    queues: (PushQueues<u8>, wireframe::push::PushHandle<u8>),
    shutdown_token: CancellationToken,
) {
    use std::time::Duration;
    let (queues, handle) = queues;
    let fairness = FairnessConfig {
        max_high_before_low: 0,
        time_slice: Some(Duration::from_millis(10)),
    };

    for n in 1..=3 {
        handle.push_high_priority(n).await.unwrap();
    }
    handle.push_low_priority(42).await.unwrap();
    for n in 4..=5 {
        handle.push_high_priority(n).await.unwrap();
    }
    drop(handle);

    let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, None, shutdown_token);
    actor.set_fairness(fairness);
    let mut out = Vec::new();
    actor.run(&mut out).await.unwrap();

    // The low-priority item should be yielded after the time slice expires, not after a fixed number of high-priority items.
    // The exact position may depend on timing, but it should not be at the end.
    assert!(out.contains(&42), "Low-priority item was not yielded");
    let pos = out.iter().position(|x| *x == 42).unwrap();
    assert!(pos > 0 && pos < out.len() - 1, "Low-priority item should be yielded in the middle");
}
>>>>>>> REPLACE

</suggested_fix>

### Comment 2
<location> `tests/connection_actor.rs:54` </location>
<code_context>
+    shutdown_token: CancellationToken,
+) {
+    let (queues, handle) = queues;
+    let fairness = FairnessConfig {
+        max_high_before_low: 2,
+        time_slice: None,
</code_context>

<issue_to_address>
No test for disabling fairness (max_high_before_low = 0, time_slice = None).

Please add a test to verify that when both max_high_before_low = 0 and time_slice = None, all high-priority frames are processed before any low-priority ones, as documented.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment thread tests/connection_actor.rs
Comment on lines +47 to +70
#[rstest]
#[tokio::test]
async fn fairness_yields_low_after_burst(
queues: (PushQueues<u8>, wireframe::push::PushHandle<u8>),
shutdown_token: CancellationToken,
) {
let (queues, handle) = queues;
let fairness = FairnessConfig {
max_high_before_low: 2,
time_slice: None,
};

for n in 1..=5 {
handle.push_high_priority(n).await.unwrap();
}
handle.push_low_priority(99).await.unwrap();
drop(handle);

let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, None, shutdown_token);
actor.set_fairness(fairness);
let mut out = Vec::new();
actor.run(&mut out).await.unwrap();
assert_eq!(out, vec![1, 2, 99, 3, 4, 5]);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Missing test for time_slice-based fairness yielding.

Please add a test that verifies yielding to the low-priority queue when time_slice is set and max_high_before_low is zero, so both fairness mechanisms are tested.

Suggested change
#[rstest]
#[tokio::test]
async fn fairness_yields_low_after_burst(
queues: (PushQueues<u8>, wireframe::push::PushHandle<u8>),
shutdown_token: CancellationToken,
) {
let (queues, handle) = queues;
let fairness = FairnessConfig {
max_high_before_low: 2,
time_slice: None,
};
for n in 1..=5 {
handle.push_high_priority(n).await.unwrap();
}
handle.push_low_priority(99).await.unwrap();
drop(handle);
let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, None, shutdown_token);
actor.set_fairness(fairness);
let mut out = Vec::new();
actor.run(&mut out).await.unwrap();
assert_eq!(out, vec![1, 2, 99, 3, 4, 5]);
}
#[rstest]
#[tokio::test]
async fn fairness_yields_low_after_burst(
queues: (PushQueues<u8>, wireframe::push::PushHandle<u8>),
shutdown_token: CancellationToken,
) {
let (queues, handle) = queues;
let fairness = FairnessConfig {
max_high_before_low: 2,
time_slice: None,
};
for n in 1..=5 {
handle.push_high_priority(n).await.unwrap();
}
handle.push_low_priority(99).await.unwrap();
drop(handle);
let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, None, shutdown_token);
actor.set_fairness(fairness);
let mut out = Vec::new();
actor.run(&mut out).await.unwrap();
assert_eq!(out, vec![1, 2, 99, 3, 4, 5]);
}
#[rstest]
#[tokio::test]
async fn fairness_yields_low_with_time_slice(
queues: (PushQueues<u8>, wireframe::push::PushHandle<u8>),
shutdown_token: CancellationToken,
) {
use std::time::Duration;
let (queues, handle) = queues;
let fairness = FairnessConfig {
max_high_before_low: 0,
time_slice: Some(Duration::from_millis(10)),
};
for n in 1..=3 {
handle.push_high_priority(n).await.unwrap();
}
handle.push_low_priority(42).await.unwrap();
for n in 4..=5 {
handle.push_high_priority(n).await.unwrap();
}
drop(handle);
let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, None, shutdown_token);
actor.set_fairness(fairness);
let mut out = Vec::new();
actor.run(&mut out).await.unwrap();
// The low-priority item should be yielded after the time slice expires, not after a fixed number of high-priority items.
// The exact position may depend on timing, but it should not be at the end.
assert!(out.contains(&42), "Low-priority item was not yielded");
let pos = out.iter().position(|x| *x == 42).unwrap();
assert!(pos > 0 && pos < out.len() - 1, "Low-priority item should be yielded in the middle");
}

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New GitHub issue, please

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have created an issue for your comment: #150

Comment thread tests/connection_actor.rs
shutdown_token: CancellationToken,
) {
let (queues, handle) = queues;
let fairness = FairnessConfig {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): No test for disabling fairness (max_high_before_low = 0, time_slice = None).

Please add a test to verify that when both max_high_before_low = 0 and time_slice = None, all high-priority frames are processed before any low-priority ones, as documented.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New GitHub issue, please

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have created an issue for your comment: #148

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5333cfa and f811d67.

📒 Files selected for processing (5)
  • docs/asynchronous-outbound-messaging-design.md (2 hunks)
  • docs/asynchronous-outbound-messaging-roadmap.md (1 hunks)
  • docs/contents.md (1 hunks)
  • src/connection.rs (6 hunks)
  • tests/connection_actor.rs (2 hunks)
🧰 Additional context used
📓 Path-based instructions (8)
`docs/**/*.md`: Use the markdown files within the `docs/` directory as a knowled...

docs/**/*.md: Use the markdown files within the docs/ directory as a knowledge base and source of truth for project requirements, dependency choices, and architectural decisions.
Proactively update the relevant file(s) in the docs/ directory to reflect the latest state when new decisions are made, requirements change, libraries are added/removed, or architectural patterns evolve.
Documentation in the docs/ directory must use en-GB-oxendict spelling and grammar, except for the word 'license'.

📄 Source: CodeRabbit Inference Engine (AGENTS.md)

List of files the instruction was applied to:

  • docs/contents.md
  • docs/asynchronous-outbound-messaging-roadmap.md
  • docs/asynchronous-outbound-messaging-design.md
`**/*.md`: Validate Markdown files using `markdownlint *.md **/*.md`. Run `mdfor...

**/*.md: Validate Markdown files using markdownlint *.md **/*.md.
Run mdformat-all after any documentation changes to format all Markdown files and fix table markup.
Validate Markdown Mermaid diagrams using the nixie CLI by running nixie *.md **/*.md.
Markdown paragraphs and bullet points must be wrapped at 80 columns.
Code blocks in Markdown files must be wrapped at 120 columns.
Tables and headings in Markdown files must not be wrapped.

📄 Source: CodeRabbit Inference Engine (AGENTS.md)

List of files the instruction was applied to:

  • docs/contents.md
  • docs/asynchronous-outbound-messaging-roadmap.md
  • docs/asynchronous-outbound-messaging-design.md
`docs/*`: Each file in the docs/ directory provides design, roadmap, testing, op...

docs/*: Each file in the docs/ directory provides design, roadmap, testing, operations, or reference documentation for the project. Use these files as a reference for understanding the project's architecture, development plans, testing strategies, operational guidelines, and documentation conventions.

📄 Source: CodeRabbit Inference Engine (docs/contents.md)

List of files the instruction was applied to:

  • docs/contents.md
  • docs/asynchronous-outbound-messaging-roadmap.md
  • docs/asynchronous-outbound-messaging-design.md
`docs/**/*.md`: Use British English spelling based on the Oxford English Diction...

docs/**/*.md: Use British English spelling based on the Oxford English Dictionary, except retain US spelling in API names (e.g., 'color').
Use the Oxford comma in lists.
Write headings in sentence case and use Markdown heading levels in order without skipping.
Follow markdownlint recommendations for Markdown formatting.
Always use fenced code blocks with a language identifier; use 'plaintext' for non-code text.
Use '-' as the first level bullet and renumber lists when items change.
Prefer inline links using 'text' or angle brackets around the URL.
Expand any uncommon acronym on first use, e.g., Continuous Integration (CI).
Wrap paragraphs at 80 columns, code at 120 columns, and do not wrap tables.
Use footnotes referenced with '[^label]'.
When embedding figures, use 'alt text' and provide concise alt text describing the content.
Add a short description before each Mermaid diagram for screen readers.

📄 Source: CodeRabbit Inference Engine (docs/documentation-style-guide.md)

List of files the instruction was applied to:

  • docs/contents.md
  • docs/asynchronous-outbound-messaging-roadmap.md
  • docs/asynchronous-outbound-messaging-design.md
`docs/**/*`: Write official documentation for new features, including separate g...

docs/**/*: Write official documentation for new features, including separate guides for 'Duplex Messaging & Pushes', 'Streaming Responses', and 'Message Fragmentation', each with runnable examples and explanations of relevant concepts and APIs.

📄 Source: CodeRabbit Inference Engine (docs/wireframe-1-0-detailed-development-roadmap.md)

List of files the instruction was applied to:

  • docs/contents.md
  • docs/asynchronous-outbound-messaging-roadmap.md
  • docs/asynchronous-outbound-messaging-design.md
`**/*.md`: * Avoid 2nd person or 1st person pronouns ("I", "you", "we") * Use en...

**/*.md: * Avoid 2nd person or 1st person pronouns ("I", "you", "we")

  • Use en-oxendic spelling and grammar.
  • Paragraphs and bullets must be wrapped to 80 columns, except where a long URL would prevent this (in which case, silence MD013 for that line)
  • Code blocks should be wrapped to 120 columns.
  • Headings must not be wrapped.

⚙️ Source: CodeRabbit Configuration File

List of files the instruction was applied to:

  • docs/contents.md
  • docs/asynchronous-outbound-messaging-roadmap.md
  • docs/asynchronous-outbound-messaging-design.md
`**/*.rs`: Every Rust module must begin with a module level (`//!`) comment expl...

**/*.rs: Every Rust module must begin with a module level (//!) comment explaining the module's purpose and utility.
Document public Rust APIs using Rustdoc comments (///) so documentation can be generated with cargo doc.
Place function attributes after doc comments.

📄 Source: CodeRabbit Inference Engine (AGENTS.md)

List of files the instruction was applied to:

  • tests/connection_actor.rs
  • src/connection.rs
`**/*.rs`: * Seek to keep the cyclomatic complexity of functions no more than 12...

**/*.rs: * Seek to keep the cyclomatic complexity of functions no more than 12.

  • Adhere to single responsibility and CQRS

  • Place function attributes after doc comments.

  • Do not use return in single-line functions.

  • Move conditionals with >2 branches into a predicate function.

  • Avoid unsafe unless absolutely necessary.

  • Every module must begin with a //! doc comment that explains the module's purpose and utility.

  • Comments must use en-GB-oxendict spelling and grammar.

  • Lints must not be silenced except as a last resort.

    • #[allow] is forbidden.
    • Only narrowly scoped #[expect(lint, reason = "...")] is allowed.
    • No lint groups, no blanket or file-wide suppression.
    • Include FIXME: with link if a fix is expected.
  • Use rstest fixtures for shared setup and to avoid repetition between tests.

  • Replace duplicated tests with #[rstest(...)] parameterised cases.

  • Prefer mockall for mocks/stubs.

  • Prefer .expect() over .unwrap()

  • Ensure that any API or behavioural changes are reflected in the documentation in docs/

  • Ensure that any completed roadmap steps are recorded in the appropriate roadmap in docs/

⚙️ Source: CodeRabbit Configuration File

List of files the instruction was applied to:

  • tests/connection_actor.rs
  • src/connection.rs
🔇 Additional comments (5)
docs/contents.md (1)

3-32: Documentation formatting improvements look good.

The inline placement of descriptions with their corresponding links improves readability and follows standard Markdown formatting practices.

src/connection.rs (1)

128-251: Excellent refactoring of the connection actor's polling logic.

The extraction of processing logic into dedicated helper methods (process_shutdown, process_high, process_low, process_response) significantly improves code readability and maintainability. The fairness mechanism is well-implemented with proper counter management and threshold checking.

docs/asynchronous-outbound-messaging-roadmap.md (1)

16-17: Roadmap correctly updated to reflect completed feature.

The fairness counter implementation is complete and the roadmap accurately reflects this status.

docs/asynchronous-outbound-messaging-design.md (1)

149-209: Documentation accurately reflects the fairness implementation.

The updates correctly document:

  • The new default threshold of 8 high-priority frames
  • The semantics of setting the counter to zero
  • A clear sequence diagram showing the runtime behaviour

These changes align perfectly with the implementation in src/connection.rs.

tests/connection_actor.rs (1)

47-71: Well-structured test for the fairness mechanism.

The test effectively verifies that the fairness configuration works as intended:

  • Correctly sets up a low threshold (2) to make the behaviour observable
  • Validates that after processing 2 high-priority frames, the actor yields to process the low-priority frame
  • Confirms the expected output sequence: [1, 2, 99, 3, 4, 5]

Comment thread src/connection.rs
Comment on lines +21 to +32
/// Configuration controlling fairness when draining push queues.
#[derive(Clone, Copy)]
pub struct FairnessConfig {
/// Number of consecutive high-priority frames to process before
/// checking the low-priority queue.
pub max_high_before_low: usize,
/// A zero value disables the counter and relies solely on `time_slice` for
/// fairness, preserving strict high-priority ordering otherwise.
/// Optional time slice after which the low-priority queue is checked
/// if high-priority traffic has been continuous.
pub time_slice: Option<Duration>,
}
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot Jun 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix Rustdoc comment placement for max_high_before_low field.

The documentation about zero value disabling the counter should be part of the field's doc comment, not a separate comment.

Apply this diff to fix the documentation:

 pub struct FairnessConfig {
     /// Number of consecutive high-priority frames to process before
-    /// checking the low-priority queue.
+    /// checking the low-priority queue. A zero value disables the counter
+    /// and relies solely on `time_slice` for fairness, preserving strict
+    /// high-priority ordering otherwise.
     pub max_high_before_low: usize,
-    /// A zero value disables the counter and relies solely on `time_slice` for
-    /// fairness, preserving strict high-priority ordering otherwise.
     /// Optional time slice after which the low-priority queue is checked
     /// if high-priority traffic has been continuous.
     pub time_slice: Option<Duration>,
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/// Configuration controlling fairness when draining push queues.
#[derive(Clone, Copy)]
pub struct FairnessConfig {
/// Number of consecutive high-priority frames to process before
/// checking the low-priority queue.
pub max_high_before_low: usize,
/// A zero value disables the counter and relies solely on `time_slice` for
/// fairness, preserving strict high-priority ordering otherwise.
/// Optional time slice after which the low-priority queue is checked
/// if high-priority traffic has been continuous.
pub time_slice: Option<Duration>,
}
/// Configuration controlling fairness when draining push queues.
#[derive(Clone, Copy)]
pub struct FairnessConfig {
/// Number of consecutive high-priority frames to process before
/// checking the low-priority queue. A zero value disables the counter
/// and relies solely on `time_slice` for fairness, preserving strict
/// high-priority ordering otherwise.
pub max_high_before_low: usize,
/// Optional time slice after which the low-priority queue is checked
/// if high-priority traffic has been continuous.
pub time_slice: Option<Duration>,
}
🤖 Prompt for AI Agents
In src/connection.rs between lines 21 and 32, the Rustdoc comment explaining
that a zero value disables the counter and relies solely on time_slice should be
merged into the doc comment for the max_high_before_low field instead of being a
separate comment. Move that explanation directly under the max_high_before_low
field's doc comment to correctly associate it with that field.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New GitHub issue, please

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧩 Analysis chain

@leynos I'll create a GitHub issue for the Rustdoc comment placement fix identified in the review.


🏁 Script executed:

#!/bin/bash
gh issue create \
  --title "Fix Rustdoc comment placement for FairnessConfig.max_high_before_low field" \
  --body "## Description

The documentation comment for the \`max_high_before_low\` field in the \`FairnessConfig\` struct needs to be restructured. Currently, the explanation about zero value behaviour is placed as a separate comment between the field and the next field, which is not the correct Rustdoc format.

## Current Issue

The documentation explaining that \"A zero value disables the counter and relies solely on \`time_slice\` for fairness, preserving strict high-priority ordering otherwise\" should be merged into the doc comment for the \`max_high_before_low\` field.

## Expected Fix

Move the zero value explanation directly under the \`max_high_before_low\` field's doc comment to correctly associate it with that field.

## References

- PR: https://github.com/leynos/wireframe/pull/147
- Review comment: https://github.com/leynos/wireframe/pull/147#discussion_r2173524894
- Requested by: @leynos

## File Location

\`src/connection.rs\` lines 21-32" \
  --assignee leynos

Length of output: 1124


Perfect! I've successfully created GitHub issue #149 for the Rustdoc comment placement fix.

Issue created: #149

The issue documents the need to fix the documentation structure for the max_high_before_low field in the FairnessConfig struct, moving the zero value explanation into the proper field documentation comment as identified in the code review.

🐰✨
(\ /)
( ..)
o
(")(")


🧬 Code Graph Analysis Results

Relevant snippets from src/hooks.rs (lines 22-27)

fn default() -> Self {
    Self {
        before_send: None,
        on_command_end: None,
    }
}

This snippet shows the default initialization of ProtocolHooks, which is used in ConnectionActor to run hooks before sending frames and when a command ends.


Relevant snippets from tests/connection_actor.rs (lines 18 and 22)

fn queues() -> (PushQueues<u8>, wireframe::push::PushHandle<u8>) { PushQueues::bounded(8, 8) }

This test helper function creates bounded push queues, which are used by ConnectionActor for high- and low-priority frame delivery.

fn shutdown_token() -> CancellationToken { CancellationToken::new() }

This helper function creates a new shutdown token, which is used by ConnectionActor to handle shutdown signaling.


Summary of relevant code from src/connection.rs (lines 1-180)

The file defines a ConnectionActor struct that manages outbound frame delivery using asynchronous streams and queues. It uses a tokio::select! loop with the biased keyword to prioritize high-priority push queues over low-priority ones and streaming responses. The actor supports fairness configuration to balance processing between high- and low-priority frames based on count or elapsed time.

Key components and methods include:

  • FairnessConfig: Configures how many high-priority frames are processed before yielding to low-priority frames, and optionally a time slice for fairness.
  • ConnectionActor::new and with_hooks: Constructors for the actor, optionally with custom protocol hooks.
  • run: Main async method driving the actor until shutdown or all sources are exhausted.
  • poll_sources: Uses tokio::select! to poll shutdown token, high- and low-priority queues, and an optional response stream.
  • process_* methods: Handle frames received from different sources, applying hooks and managing internal state.
  • start_shutdown: Closes queues and drops the response stream on shutdown.
  • handle_push and handle_response: Apply hooks and append frames to the output vector.
  • Fairness logic in after_high, should_yield_to_low_priority, and after_low to balance frame processing.
  • ActorState struct tracks whether push queues and response streams are closed and if shutdown is in progress.

This file is central to managing outbound frame flow with prioritization, fairness, and graceful shutdown.


No other snippets from the provided list appear directly relevant to understanding or addressing the src/connection.rs file or the user comment about creating a new GitHub issue.

@leynos
Copy link
Copy Markdown
Owner Author

leynos commented Jun 28, 2025

Stale

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant