Skip to content

Implement connection actor with prioritised write loop#129

Merged
leynos merged 8 commits intomainfrom
codex/implement-connection-actor-with-biased-select-loop
Jun 25, 2025
Merged

Implement connection actor with prioritised write loop#129
leynos merged 8 commits intomainfrom
codex/implement-connection-actor-with-biased-select-loop

Conversation

@leynos
Copy link
Copy Markdown
Owner

@leynos leynos commented Jun 25, 2025

Summary

  • add ConnectionActor for managing outbound frames
  • expose actor from library
  • add tokio-util for CancellationToken
  • create tests covering push ordering and shutdown behaviour

Testing

  • make lint
  • make test

https://chatgpt.com/codex/tasks/task_e_685b4579902c8322aee724c49f0ed82a

Summary by Sourcery

Implement a new ConnectionActor for driving outbound frames with prioritized push queues and cancellation support, expose it in the library, add the necessary dependency, update documentation, and enhance tests.

New Features:

  • Introduce ConnectionActor to manage prioritized outbound frame delivery with a biased write loop
  • Expose ConnectionActor in the public API

Enhancements:

  • Add close() method to PushQueues and restrict queue receiver visibility
  • Update development roadmaps to reflect implemented connection actor and next phase focus

Build:

  • Add tokio-util dependency for CancellationToken support

Documentation:

  • Mark the connection actor as implemented in the asynchronous outbound messaging roadmap
  • Revise the detailed development roadmap to note the new actor and write loop

Tests:

  • Add comprehensive tests for ConnectionActor covering push ordering, shutdown, error propagation, interleaved cancellation, and backpressure

Implement ConnectionActor using tokio::select! with biased polling.Actor reads high and low priority queues as well as an optional\nresponse stream, terminating on shutdown or channel closure.\nIncluded tests cover shutdown handling and prioritisation of sources.
@sourcery-ai
Copy link
Copy Markdown
Contributor

sourcery-ai Bot commented Jun 25, 2025

Reviewer's Guide

This PR introduces a new ConnectionActor for prioritized outbound frame delivery using a biased tokio::select loop with high/low priority queues and optional response streams, integrates CancellationToken-based shutdown coordination, refines PushQueues lifecycle, exposes the actor in the public API with updated docs, adds tokio-util dependency, and adds comprehensive unit tests for ordering, shutdown, and error cases.

File-Level Changes

Change Details Files
Implement ConnectionActor with prioritized tokio::select loop and shutdown support
  • Added src/connection.rs defining ConnectionActor, its run loop, poll_sources, and state management
  • Integrated CancellationToken to trigger shutdown and drain or abandon streams
  • Implemented ActorState tracking push queue closures and shutdown progress
  • Defined handle_push and handle_response helpers for uniform frame handling
src/connection.rs
Refine PushQueues visibility and lifecycle
  • Changed high_priority_rx and low_priority_rx fields to pub(crate)
  • Added PushQueues::close() to shut down receivers for tests and cleanup
src/push.rs
tests/push.rs
Expose ConnectionActor in public API and update documentation
  • Added pub mod connection and pub use connection::ConnectionActor in lib.rs
  • Updated detailed-development-roadmap.md and asynchronous-outbound-messaging-roadmap.md to reflect the new actor implementation
src/lib.rs
docs/wireframe-1-0-detailed-development-roadmap.md
docs/asynchronous-outbound-messaging-roadmap.md
Add tokio-util dependency for CancellationToken support
  • Added tokio-util = "0.7" to Cargo.toml
Cargo.toml
Add comprehensive tests for ConnectionActor behavior
  • Created tests/connection_actor.rs covering priority order, shutdown precedence, stream completion, error propagation, interleaved cancellation, and backpressure
  • Extended push queue tests to validate close behavior
tests/connection_actor.rs
tests/push.rs

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Jun 25, 2025

Warning

Rate limit exceeded

@leynos has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 1 minutes and 47 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

📥 Commits

Reviewing files that changed from the base of the PR and between 50b5062 and 8dd96c4.

📒 Files selected for processing (4)
  • src/connection.rs (1 hunks)
  • src/push.rs (2 hunks)
  • tests/connection_actor.rs (1 hunks)
  • tests/push.rs (1 hunks)

Summary by CodeRabbit

  • New Features

    • Introduced a new ConnectionActor component for managing prioritised outbound frame delivery.
  • Documentation

    • Updated roadmap documents to reflect the completion of the ConnectionActor and revised implementation status descriptions.
    • Improved visual separation in task lists for better readability.
  • Tests

    • Added comprehensive tests for the ConnectionActor, covering prioritisation, shutdown behaviour, error propagation, and backpressure scenarios.
  • Chores

    • Updated dependencies to include the latest tokio-util crate.

Summary by CodeRabbit

  • New Features

    • Introduced a new ConnectionActor component for managing prioritised outbound frame delivery.
    • Made the ConnectionActor directly accessible from the main library interface.
  • Tests

    • Added comprehensive asynchronous tests for ConnectionActor, covering prioritisation, shutdown behaviour, error propagation, source exhaustion, and backpressure.
  • Documentation

    • Improved roadmap readability with additional spacing between project phases.
    • Updated roadmap status to mark the "Connection actor" task as completed and clarified Phase 1 progress.
  • Chores

    • Updated dependencies to include the latest tokio-util crate.

Walkthrough

This update introduces a new ConnectionActor struct to manage outbound frame delivery with prioritised queues and shutdown handling. The actor is exposed via the crate root, and a dedicated test suite is added to verify its behaviour. Documentation and dependency updates are also included for improved clarity and support.

Changes

File(s) Change Summary
Cargo.toml Added tokio-util dependency (v0.7).
docs/wireframe-1-0-detailed-development-roadmap.md Updated Phase 1 introduction text; inserted blank lines between task phases for readability.
docs/asynchronous-outbound-messaging-roadmap.md Marked "Connection actor" task as completed.
src/connection.rs Added ConnectionActor struct with prioritised queue processing, response stream handling, and shutdown support.
src/lib.rs Added connection module and re-exported ConnectionActor at crate root.
tests/connection_actor.rs Added async tests for ConnectionActor, covering prioritisation, shutdown, error propagation, source exhaustion, and backpressure.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant ConnectionActor
    participant PushQueues
    participant ResponseStream
    participant ShutdownToken

    loop While active
        alt High-priority frame available
            ConnectionActor ->> PushQueues: Poll high-priority queue
            PushQueues -->> ConnectionActor: Frame
            ConnectionActor ->> Client: Output frame
        else Low-priority frame available
            ConnectionActor ->> PushQueues: Poll low-priority queue
            PushQueues -->> ConnectionActor: Frame
            ConnectionActor ->> Client: Output frame
        else Response stream frame available
            ConnectionActor ->> ResponseStream: Poll next frame
            ResponseStream -->> ConnectionActor: Frame or Error
            ConnectionActor ->> Client: Output frame or return error
        else Shutdown signalled
            ShutdownToken -->> ConnectionActor: Cancel
            ConnectionActor ->> Client: Exit
        end
    end
Loading

Possibly related PRs

Poem

A rabbit hops through code anew,
With queues and streams and shutdown too.
High-priority frames take the lead,
While tests ensure the spec’s decreed.
A crate now richer, strong, and neat—
This actor’s run is quite a feat!
🐇✨

✨ Finishing Touches
  • 📝 Generate Docstrings
🧪 Generate Unit Tests
  • Create PR with Unit Tests
  • Post Copyable Unit Tests in Comment
  • Commit Unit Tests in branch codex/implement-connection-actor-with-biased-select-loop

🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai auto-generate unit tests to generate unit tests for this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Copy Markdown
Contributor

@sourcery-ai sourcery-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @leynos - I've reviewed your changes - here's some feedback:

  • Consider whether breaking immediately on shutdown might drop queued frames—if you want a graceful shutdown you may need to drain high- and low-priority queues before exiting.
  • The biased select gives strict priority to high then low over the response stream, which could starve streaming frames under load—verify that this ordering matches your throughput goals.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Consider whether breaking immediately on shutdown might drop queued frames—if you want a graceful shutdown you may need to drain high- and low-priority queues before exiting.
- The `biased` select gives strict priority to high then low over the response stream, which could starve streaming frames under load—verify that this ordering matches your throughput goals.

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Copy link
Copy Markdown

@codescene-delta-analysis codescene-delta-analysis Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gates Failed
Enforce critical code health rules (1 file with Bumpy Road Ahead)
Enforce advisory code health rules (1 file with Complex Method, Complex Conditional)

Gates Passed
4 Quality Gates Passed

See analysis details in CodeScene

Reason for failure
Enforce critical code health rules Violations Code Health Impact
connection.rs 1 critical rule 10.00 → 9.24 Suppress
Enforce advisory code health rules Violations Code Health Impact
connection.rs 2 advisory rules 10.00 → 9.24 Suppress

Quality Gate Profile: Pay Down Tech Debt
Want more control? Customize Code Health rules or catch issues early with our IDE extension and CLI tool.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2c98d64 and 10398f8.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (5)
  • Cargo.toml (1 hunks)
  • docs/wireframe-1-0-detailed-development-roadmap.md (2 hunks)
  • src/connection.rs (1 hunks)
  • src/lib.rs (2 hunks)
  • tests/connection_actor.rs (1 hunks)
🧰 Additional context used
📓 Path-based instructions (5)
`Cargo.toml`: Use explicit version ranges in `Cargo.toml` and keep dependencies up-to-date.

Cargo.toml: Use explicit version ranges in Cargo.toml and keep dependencies up-to-date.

  • Cargo.toml
`**/*.rs`: Every Rust module must begin with a module level (`//!`) comment explaining the module's purpose and utility. Document public Rust APIs using Rustdoc comments (`///`) so...

**/*.rs: Every Rust module must begin with a module level (//!) comment explaining the module's purpose and utility.
Document public Rust APIs using Rustdoc comments (///) so documentation can be generated with cargo doc.
Place function attributes after doc comments.

  • src/lib.rs
  • src/connection.rs
  • tests/connection_actor.rs
`docs/**/*.md`: Use the markdown files within the `docs/` directory as a knowledge base and source of truth for project requirements, dependency choices, and architectural decision...

docs/**/*.md: Use the markdown files within the docs/ directory as a knowledge base and source of truth for project requirements, dependency choices, and architectural decisions.
Proactively update the relevant file(s) in the docs/ directory to reflect the latest state when new decisions are made, requirements change, libraries are added/removed, or architectural patterns evolve.
Documentation in the docs/ directory must use en-GB-oxendict spelling and grammar, except for the word 'license'.

  • docs/wireframe-1-0-detailed-development-roadmap.md
`**/*.md`: Validate Markdown files using `markdownlint *.md **/*.md`. Run `mdformat-all` after any documentation changes to format all Markdown files and fix table markup. Validate...

**/*.md: Validate Markdown files using markdownlint *.md **/*.md.
Run mdformat-all after any documentation changes to format all Markdown files and fix table markup.
Validate Markdown Mermaid diagrams using the nixie CLI by running nixie *.md **/*.md.
Markdown paragraphs and bullet points must be wrapped at 80 columns.
Code blocks in Markdown files must be wrapped at 120 columns.
Tables and headings in Markdown files must not be wrapped.

  • docs/wireframe-1-0-detailed-development-roadmap.md
`docs/**/wireframe-1-0-detailed-development-roadmap.md`: Track detailed development roadmap items, including unified response types, trait and builder integration, and benchmarks.

docs/**/wireframe-1-0-detailed-development-roadmap.md: Track detailed development roadmap items, including unified response types, trait and builder integration, and benchmarks.

  • docs/wireframe-1-0-detailed-development-roadmap.md
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: build-test
🔇 Additional comments (12)
docs/wireframe-1-0-detailed-development-roadmap.md (2)

29-29: Good improvement to document structure.

The blank line addition improves visual separation between phases, enhancing readability of the roadmap document.


42-42: Consistent formatting improvement.

The additional blank line maintains consistent visual separation between all phases in the document.

src/lib.rs (2)

10-10: Correct module declaration.

The connection module is properly declared as public, making it accessible to users of the crate.


21-21: Appropriate re-export for public API.

Re-exporting ConnectionActor at the crate root provides convenient access to this key component without requiring users to navigate the module hierarchy.

src/connection.rs (4)

1-7: Excellent module documentation.

The module-level documentation clearly explains the purpose and implementation details, including the biased select! behaviour. This follows the coding guidelines for Rust modules.


16-21: Well-structured actor definition.

The ConnectionActor struct appropriately encapsulates the necessary components for managing outbound frames with clear ownership semantics.


28-39: Proper constructor implementation.

The constructor correctly applies the #[must_use] attribute and follows standard Rust patterns for struct initialisation.


48-96: Robust implementation of the actor's main loop.

The run method implementation demonstrates several excellent practices:

  • Uses biased select! to ensure proper prioritisation of high-priority frames
  • Properly tracks closure state of all sources
  • Handles errors from the response stream appropriately
  • Terminates correctly when shutdown is signalled or all sources are exhausted

The async block for the response stream handling (lines 73-81) is a clever way to conditionally participate in the select! macro.

tests/connection_actor.rs (4)

6-12: Good use of fixtures for test setup.

The fixtures provide clean, reusable setup for the tests. The #[allow(unused_braces)] attribute addresses clippy warnings appropriately.


14-31: Comprehensive priority verification test.

This test effectively verifies the biased select! behaviour by ensuring high-priority frames (1) are processed before low-priority frames (2), and both are processed before streamed frames (3). The test setup and assertions are clear and correct.


33-45: Proper shutdown behaviour verification.

The test correctly verifies that the actor terminates immediately when the shutdown token is cancelled, without processing any frames. This ensures the shutdown signal takes precedence over all other operations.


47-63: Thorough source exhaustion test.

This test verifies that the actor continues processing until all sources are closed, which is critical for ensuring no frames are lost during normal operation. The test correctly drops the handle to close the queues and uses a finite stream.

Comment thread Cargo.toml
Copy link
Copy Markdown

@codescene-delta-analysis codescene-delta-analysis Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gates Failed
Enforce critical code health rules (1 file with Bumpy Road Ahead)
Enforce advisory code health rules (1 file with Complex Method, Complex Conditional)

Gates Passed
4 Quality Gates Passed

See analysis details in CodeScene

Reason for failure
Enforce critical code health rules Violations Code Health Impact
connection.rs 1 critical rule 10.00 → 9.24 Suppress
Enforce advisory code health rules Violations Code Health Impact
connection.rs 2 advisory rules 10.00 → 9.24 Suppress

Quality Gate Profile: Pay Down Tech Debt
Want more control? Customize Code Health rules or catch issues early with our IDE extension and CLI tool.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

📜 Review details

Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 10398f8 and cd61a43.

📒 Files selected for processing (1)
  • tests/connection_actor.rs (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
`**/*.rs`: Every Rust module must begin with a module level (`//!`) comment expl...

**/*.rs: Every Rust module must begin with a module level (//!) comment explaining the module's purpose and utility.
Document public Rust APIs using Rustdoc comments (///) so documentation can be generated with cargo doc.
Place function attributes after doc comments.

📄 Source: CodeRabbit Inference Engine (AGENTS.md)

List of files the instruction was applied to:

  • tests/connection_actor.rs
⏰ Context from checks skipped due to timeout of 90000ms (2)
  • GitHub Check: build-test
  • GitHub Check: Analyze (actions)
🔇 Additional comments (5)
tests/connection_actor.rs (5)

23-40: Excellent test coverage for priority ordering.

The test correctly verifies that high-priority frames are processed before low-priority frames, followed by response stream frames. The test setup and assertions are clear and comprehensive.


42-54: Good test for shutdown signal precedence.

The test properly verifies that when a shutdown token is cancelled before the actor runs, no frames are processed. This ensures the shutdown signal takes precedence over all other operations.


56-72: Comprehensive test for complete source draining.

The test effectively verifies that all sources (push queues and response streams) are completely drained before the actor terminates. This ensures no frames are lost during normal operation.


74-100: Robust error propagation testing.

The test properly verifies that errors from the response stream are correctly propagated whilst ensuring that frames processed before the error are still delivered. The custom TestError type and error matching are well implemented.


102-128: Effective test for interleaved shutdown behaviour.

The test cleverly uses a spawned task to trigger shutdown during stream processing, verifying that the actor respects shutdown signals during operation. The timing-based approach is appropriate for this scenario.

Comment thread tests/connection_actor.rs
Comment thread tests/connection_actor.rs
Comment thread tests/connection_actor.rs
Comment thread tests/connection_actor.rs
Comment thread tests/connection_actor.rs
Copy link
Copy Markdown

@codescene-delta-analysis codescene-delta-analysis Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gates Failed
Enforce critical code health rules (1 file with Bumpy Road Ahead)
Enforce advisory code health rules (1 file with Complex Method, Complex Conditional)

Gates Passed
4 Quality Gates Passed

See analysis details in CodeScene

Reason for failure
Enforce critical code health rules Violations Code Health Impact
connection.rs 1 critical rule 10.00 → 9.10 Suppress
Enforce advisory code health rules Violations Code Health Impact
connection.rs 2 advisory rules 10.00 → 9.10 Suppress

Quality Gate Profile: Pay Down Tech Debt
Want more control? Customize Code Health rules or catch issues early with our IDE extension and CLI tool.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
src/connection.rs (1)

90-104: Simplify the response stream handling logic.

The async block with nested conditions is unnecessarily complex. Consider refactoring for better readability.

-                res = async {
-                    if resp_closed {
-                        None
-                    } else if let Some(stream) = &mut self.response {
-                        stream.next().await
-                    } else {
-                        None
-                    }
-                }, if !shutting_down => {
+                res = async {
+                    match &mut self.response {
+                        Some(stream) if !resp_closed => stream.next().await,
+                        _ => None,
+                    }
+                }, if !shutting_down && !resp_closed => {
📜 Review details

Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between cd61a43 and 8e0dd0a.

📒 Files selected for processing (1)
  • src/connection.rs (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
`**/*.rs`: Every Rust module must begin with a module level (`//!`) comment expl...

**/*.rs: Every Rust module must begin with a module level (//!) comment explaining the module's purpose and utility.
Document public Rust APIs using Rustdoc comments (///) so documentation can be generated with cargo doc.
Place function attributes after doc comments.

📄 Source: CodeRabbit Inference Engine (AGENTS.md)

List of files the instruction was applied to:

  • src/connection.rs
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: build-test
🔇 Additional comments (3)
src/connection.rs (3)

1-7: Module documentation looks good!

The module-level documentation clearly explains the actor's purpose and prioritisation strategy.


27-39: Constructor implementation follows best practices.

The #[must_use] attribute is correctly placed after the doc comment, and the method provides a clean interface for creating the actor.


100-102: Consider documenting error handling behaviour.

When the response stream yields an error, the method returns immediately without processing any remaining queued frames. This might be intentional, but it's worth documenting this behaviour explicitly.

Is this the intended behaviour? If so, consider adding a note to the method documentation clarifying that response stream errors cause immediate termination without draining queues.

Comment thread src/connection.rs Outdated
Copy link
Copy Markdown

@codescene-delta-analysis codescene-delta-analysis Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gates Failed
Enforce advisory code health rules (1 file with Complex Method)

Gates Passed
5 Quality Gates Passed

See analysis details in CodeScene

Reason for failure
Enforce advisory code health rules Violations Code Health Impact
connection.rs 1 advisory rule 10.00 → 9.64 Suppress

Quality Gate Profile: Pay Down Tech Debt
Want more control? Customize Code Health rules or catch issues early with our IDE extension and CLI tool.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
src/connection.rs (1)

48-97: Address previous review feedback on shutdown documentation.

The complex run method (cyclomatic complexity 13) manages multiple concerns effectively, but there's an outstanding request to improve documentation about shutdown behavior for response streams. The current comment at line 102 could be expanded as previously suggested.

Consider the past review suggestion:

// Drop any streaming response so shutdown is prompt.
+ // Note: This means streamed responses are not guaranteed to complete during shutdown,
+ // while queued frames will still be drained.

Additionally, consider extracting some of the state management logic into helper methods to reduce complexity:

+    fn is_done(&self, high_closed: bool, low_closed: bool, resp_closed: bool, shutting_down: bool) -> bool {
+        let push_drained = high_closed && low_closed;
+        push_drained && (resp_closed || shutting_down)
+    }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8e0dd0a and 5a96ae4.

📒 Files selected for processing (3)
  • docs/asynchronous-outbound-messaging-roadmap.md (1 hunks)
  • docs/wireframe-1-0-detailed-development-roadmap.md (3 hunks)
  • src/connection.rs (1 hunks)
🧰 Additional context used
📓 Path-based instructions (4)
`docs/**/*.md`: Use the markdown files within the `docs/` directory as a knowled...

docs/**/*.md: Use the markdown files within the docs/ directory as a knowledge base and source of truth for project requirements, dependency choices, and architectural decisions.
Proactively update the relevant file(s) in the docs/ directory to reflect the latest state when new decisions are made, requirements change, libraries are added/removed, or architectural patterns evolve.
Documentation in the docs/ directory must use en-GB-oxendict spelling and grammar, except for the word 'license'.

📄 Source: CodeRabbit Inference Engine (AGENTS.md)

List of files the instruction was applied to:

  • docs/asynchronous-outbound-messaging-roadmap.md
  • docs/wireframe-1-0-detailed-development-roadmap.md
`**/*.md`: Validate Markdown files using `markdownlint *.md **/*.md`. Run `mdfor...

**/*.md: Validate Markdown files using markdownlint *.md **/*.md.
Run mdformat-all after any documentation changes to format all Markdown files and fix table markup.
Validate Markdown Mermaid diagrams using the nixie CLI by running nixie *.md **/*.md.
Markdown paragraphs and bullet points must be wrapped at 80 columns.
Code blocks in Markdown files must be wrapped at 120 columns.
Tables and headings in Markdown files must not be wrapped.

📄 Source: CodeRabbit Inference Engine (AGENTS.md)

List of files the instruction was applied to:

  • docs/asynchronous-outbound-messaging-roadmap.md
  • docs/wireframe-1-0-detailed-development-roadmap.md
`docs/**/wireframe-1-0-detailed-development-roadmap.md`: Track detailed development roadmap items, including unified response types, trait and builder integration, and benchmarks.

docs/**/wireframe-1-0-detailed-development-roadmap.md: Track detailed development roadmap items, including unified response types, trait and builder integration, and benchmarks.

📄 Source: CodeRabbit Inference Engine (docs/asynchronous-outbound-messaging-roadmap.md)

List of files the instruction was applied to:

  • docs/wireframe-1-0-detailed-development-roadmap.md
`**/*.rs`: Every Rust module must begin with a module level (`//!`) comment expl...

**/*.rs: Every Rust module must begin with a module level (//!) comment explaining the module's purpose and utility.
Document public Rust APIs using Rustdoc comments (///) so documentation can be generated with cargo doc.
Place function attributes after doc comments.

📄 Source: CodeRabbit Inference Engine (AGENTS.md)

List of files the instruction was applied to:

  • src/connection.rs
🔇 Additional comments (8)
src/connection.rs (4)

1-7: Excellent module documentation.

The module-level documentation clearly explains the purpose and key implementation details, including the biased tokio::select! loop and priority handling. This meets the coding guidelines requirement.


16-39: Well-designed struct with proper constructor.

The ConnectionActor struct design is sound with appropriate generics, public field access, and a properly documented constructor with the #[must_use] attribute.


99-126: Well-structured helper methods.

The helper methods provide good separation of concerns with clear, focused responsibilities. The start_shutdown method clearly documents the shutdown sequence.


8-14: Clean imports and module structure.

The imports are appropriate, including the new tokio_util::sync::CancellationToken dependency mentioned in the PR objectives. The module structure is well-organized.

docs/asynchronous-outbound-messaging-roadmap.md (1)

13-15: Accurate roadmap update.

The task completion marking correctly reflects the implemented ConnectionActor with its biased select! loop functionality. The description aligns well with the actual implementation.

docs/wireframe-1-0-detailed-development-roadmap.md (3)

12-15: Accurate status update reflecting implementation progress.

The updated introduction correctly reflects that the foundational components (push queue utilities and connection actor) are now implemented, appropriately shifting Phase 1's focus to integration and API readiness.


29-29: Good visual separation between phases.

The added blank line improves document readability by clearly separating the task phases.


42-42: Consistent formatting improvement.

The blank line maintains consistent visual separation between phases throughout the document.

Comment thread src/connection.rs Outdated
Copy link
Copy Markdown

@codescene-delta-analysis codescene-delta-analysis Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gates Failed
Enforce advisory code health rules (1 file with Complex Method)

Gates Passed
5 Quality Gates Passed

See analysis details in CodeScene

Reason for failure
Enforce advisory code health rules Violations Code Health Impact
connection.rs 1 advisory rule 10.00 → 9.69 Suppress

Quality Gate Profile: Pay Down Tech Debt
Want more control? Customize Code Health rules or catch issues early with our IDE extension and CLI tool.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

♻️ Duplicate comments (2)
src/connection.rs (2)

49-55: Early exit behaviour is well-documented.

The detailed comments explain the implications of returning immediately when shutdown is already cancelled, addressing the design choice clearly for future maintainers.


101-104: Shutdown behaviour documentation has been improved.

The comment clearly explains that streaming responses may be truncated while queued frames are still drained, which addresses the previous concern about documenting this design choice.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5a96ae4 and 30a7b43.

📒 Files selected for processing (1)
  • src/connection.rs (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
`**/*.rs`: Every Rust module must begin with a module level (`//!`) comment expl...

**/*.rs: Every Rust module must begin with a module level (//!) comment explaining the module's purpose and utility.
Document public Rust APIs using Rustdoc comments (///) so documentation can be generated with cargo doc.
Place function attributes after doc comments.

📄 Source: CodeRabbit Inference Engine (AGENTS.md)

List of files the instruction was applied to:

  • src/connection.rs
🧬 Code Graph Analysis (1)
src/connection.rs (2)
tests/connection_actor.rs (1)
  • queues (13-13)
src/push.rs (1)
  • recv (150-156)
⏰ Context from checks skipped due to timeout of 90000ms (2)
  • GitHub Check: build-test
  • GitHub Check: Analyze (actions)
🔇 Additional comments (4)
src/connection.rs (4)

1-7: Module documentation follows guidelines well.

The module-level documentation clearly explains the purpose and utility of the connection actor, including implementation details about the biased select loop and prioritisation behaviour.


16-21: Well-documented public struct with appropriate field visibility.

The struct is properly documented and exposes its fields as public, which aligns with the actor pattern where external code needs access to configure the queues and response stream.


27-39: Constructor follows Rust conventions properly.

The #[must_use] attribute is appropriate for a constructor, and the documentation clearly describes the method's purpose.


118-137: Helper methods are well-implemented.

The handle_push and handle_response methods properly separate concerns and handle the different result types appropriately. The error propagation in handle_response is correct.

Comment thread src/connection.rs
Comment thread src/connection.rs Outdated
Extract tokio::select! logic into a helper and consolidate
state into a dedicated struct. This lowers cyclomatic
complexity and clarifies shutdown behaviour.
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 30a7b43 and 50b5062.

📒 Files selected for processing (1)
  • src/connection.rs (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
`**/*.rs`: Every Rust module must begin with a module level (`//!`) comment expl...

**/*.rs: Every Rust module must begin with a module level (//!) comment explaining the module's purpose and utility.
Document public Rust APIs using Rustdoc comments (///) so documentation can be generated with cargo doc.
Place function attributes after doc comments.

📄 Source: CodeRabbit Inference Engine (AGENTS.md)

List of files the instruction was applied to:

  • src/connection.rs
⏰ Context from checks skipped due to timeout of 90000ms (2)
  • GitHub Check: build-test
  • GitHub Check: Analyze (actions)
🔇 Additional comments (1)
src/connection.rs (1)

41-158: Well-structured actor implementation with proper state management.

The refactored implementation successfully addresses the complexity concerns raised in previous reviews by:

  • Extracting poll_sources as a helper method
  • Using ActorState struct instead of multiple boolean parameters
  • Clearly documenting the shutdown behaviour for response streams
  • Properly handling prioritization with the biased select!

The code is clean, maintainable, and follows Rust best practices.

Comment thread src/connection.rs Outdated
@leynos
Copy link
Copy Markdown
Owner Author

leynos commented Jun 25, 2025

@coderabbitai review

@leynos
Copy link
Copy Markdown
Owner Author

leynos commented Jun 25, 2025

@sourcery-ai review

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Jun 25, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Copy link
Copy Markdown
Contributor

@sourcery-ai sourcery-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @leynos - I've reviewed your changes - here's some feedback:

  • The manual tracking of push and response closure flags in ActorState is verbose; consider consolidating state management (e.g., counting closed channels) for clearer state transitions.
  • The tokio::select! loop in poll_sources is fairly large—extracting individual branches into helper async functions could improve readability and maintainability.
  • Right now high‐priority frames can starve low‐priority ones under constant load—consider a configurable fairness mechanism or occasional yield to the low‐priority branch.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The manual tracking of push and response closure flags in `ActorState` is verbose; consider consolidating state management (e.g., counting closed channels) for clearer state transitions.
- The `tokio::select!` loop in `poll_sources` is fairly large—extracting individual branches into helper async functions could improve readability and maintainability.
- Right now high‐priority frames can starve low‐priority ones under constant load—consider a configurable fairness mechanism or occasional yield to the low‐priority branch.

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

@leynos
Copy link
Copy Markdown
Owner Author

leynos commented Jun 25, 2025

@coderabbitai new GitHub issues, please:

  • The manual tracking of push and response closure flags in ActorState is verbose; consider consolidating state management (e.g., counting closed channels) for clearer state transitions.

  • The tokio::select! loop in poll_sources is fairly large—extracting individual branches into helper async functions could improve readability and maintainability.

  • Right now high‐priority frames can starve low‐priority ones under constant load—consider a configurable fairness mechanism or occasional yield to the low‐priority branch.

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Jun 25, 2025

Tip

For best results, initiate chat on the files or code changes.

🧩 Analysis chain

@leynos I'll create three GitHub issues for the improvements you've identified. Let me generate them now:


🏁 Script executed:

#!/bin/bash

# Create first issue about state management consolidation
gh issue create \
  --title "Consolidate ActorState management in ConnectionActor" \
  --body "The manual tracking of push and response closure flags in ActorState is verbose and could be improved for clearer state transitions. Consider consolidating state management, such as counting closed channels, rather than tracking individual boolean flags.

**Context:**
This improvement was identified during review of the ConnectionActor implementation.

**Reference:**
- PR: https://github.com/leynos/wireframe/pull/129
- Comment: https://github.com/leynos/wireframe/pull/129#issuecomment-$(date +%s)

**Requested by:** @leynos" \
  --assignee leynos

# Create second issue about code structure
gh issue create \
  --title "Refactor large tokio::select! loop in ConnectionActor::poll_sources" \
  --body "The tokio::select! loop in the poll_sources method is fairly large and could benefit from improved readability and maintainability. Consider extracting individual branches into helper async functions to reduce complexity and improve code organization.

**Context:**
This improvement was identified during review of the ConnectionActor implementation.

**Reference:**
- PR: https://github.com/leynos/wireframe/pull/129
- Comment: https://github.com/leynos/wireframe/pull/129#issuecomment-$(date +%s)

**Requested by:** @leynos" \
  --assignee leynos

# Create third issue about fairness mechanism
gh issue create \
  --title "Implement fairness mechanism to prevent high-priority frame starvation in ConnectionActor" \
  --body "Currently, high-priority frames can starve low-priority ones under constant load due to the biased select! loop. Consider implementing a configurable fairness mechanism or occasional yield to the low-priority branch to ensure balanced processing.

**Context:**
This improvement was identified during review of the ConnectionActor implementation.

**Reference:**
- PR: https://github.com/leynos/wireframe/pull/129
- Comment: https://github.com/leynos/wireframe/pull/129#issuecomment-$(date +%s)

**Requested by:** @leynos" \
  --assignee leynos

echo "Created three GitHub issues for ConnectionActor improvements"

Length of output: 2256


Perfect! I've successfully created three GitHub issues to track the ConnectionActor improvements you identified:

Created Issues:

  1. Issue Consolidate ActorState management in ConnectionActor #130: Consolidate ActorState management in ConnectionActor

    • Addresses the verbose manual tracking of closure flags
  2. Issue Refactor large tokio::select! loop in ConnectionActor::poll_sources #131: Refactor large tokio::select! loop in ConnectionActor::poll_sources

    • Focuses on improving code structure and readability
  3. Issue Implement fairness mechanism to prevent high-priority frame starvation in ConnectionActor #132: Implement fairness mechanism to prevent high-priority frame starvation in ConnectionActor

    • Tackles the potential starvation problem under high load

All issues have been assigned to you and include proper context and references back to this PR and discussion. These can now be tracked and addressed in future development iterations.

🐰✨
Thoughtfully documenting improvements for future enhancement

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant