Skip to content

Implement run state consolidation in connection actor#146

Merged
leynos merged 4 commits intomainfrom
codex/implement-run-state-consolidation-with-tests
Jun 28, 2025
Merged

Implement run state consolidation in connection actor#146
leynos merged 4 commits intomainfrom
codex/implement-run-state-consolidation-with-tests

Conversation

@leynos
Copy link
Copy Markdown
Owner

@leynos leynos commented Jun 28, 2025

Summary

  • consolidate actor run state using Option receivers and a closed source counter
  • update roadmap to mark run state consolidation as complete

Testing

  • make lint
  • make test

https://chatgpt.com/codex/tasks/task_e_6860120c72908322ba068e3d0b8b4f8d

Summary by Sourcery

Implement run state consolidation in ConnectionActor by replacing queue-based state with optional receivers and a unified run-state counter, refactoring shutdown and polling logic accordingly and updating the roadmap documentation.

Bug Fixes:

  • Fix initial run-state calculation by inverting response presence check in ActorState

Enhancements:

  • Consolidate ConnectionActor run state by replacing PushQueues with optional high/low receivers and a closed-source counter
  • Refactor actor shutdown and frame processing logic to use unified RunState enum
  • Simplify selective polling logic to conditionally await on Option receivers

Documentation:

  • Mark run state consolidation as complete in the asynchronous outbound messaging roadmap

@sourcery-ai
Copy link
Copy Markdown
Contributor

sourcery-ai Bot commented Jun 28, 2025

Reviewer's Guide

This PR refactors the ConnectionActor to consolidate its run state by swapping out a PushQueues struct for optional mpsc receivers, replacing multiple boolean flags with a RunState enum and source counters, and updating the polling and shutdown logic to operate directly on these Option receivers; it also marks the consolidation milestone as complete in the roadmap.

Class diagram for consolidated run state in ConnectionActor

classDiagram
    class ConnectionActor {
        - Option<mpsc::Receiver<F>> high_rx
        - Option<mpsc::Receiver<F>> low_rx
        - Option<FrameStream<F, E>> response
        - CancellationToken shutdown
        - ProtocolHooks<F> hooks
        - FairnessConfig fairness
        - usize high_counter
        - Option<Instant> high_start
        + set_fairness(fairness: FairnessConfig)
        + poll_sources(state: &mut ActorState, out: &mut Vec<F>)
        + process_shutdown(state: &mut ActorState)
        + process_high(res: Option<F>, state: &mut ActorState, out: &mut Vec<F>)
        + process_low(res: Option<F>, state: &mut ActorState, out: &mut Vec<F>)
        + process_response(res: Option<Result<F, WireframeError<E>>>, state: &mut ActorState, out: &mut Vec<F>)
        + start_shutdown(state: &mut ActorState)
        + after_high(out: &mut Vec<F>, state: &mut ActorState)
        + after_low()
        + handle_response(res: Option<Result<F, WireframeError<E>>>, state: &mut ActorState, out: &mut Vec<F>)
    }

    class ActorState {
        - RunState run_state
        - usize closed_sources
        - usize total_sources
        + mark_closed()
        + start_shutdown()
        + is_active() bool
        + is_shutting_down() bool
        + is_done() bool
    }

    class RunState {
        <<enum>>
        Active
        ShuttingDown
        Finished
    }

    ConnectionActor --> ActorState
    ActorState --> RunState
Loading

Class diagram for removed PushQueues and PushClosed structures

classDiagram
    class PushQueues {
        - mpsc::Receiver<F> high_priority_rx
        - mpsc::Receiver<F> low_priority_rx
    }
    class PushClosed {
        - bool high
        - bool low
    }
    %% These classes were removed from ConnectionActor and ActorState
Loading

File-Level Changes

Change Details Files
Replace PushQueues with optional receivers in ConnectionActor
  • Replaced queues field with high_rx/low_rx as Options
  • Updated constructor to extract and store queues.high_priority_rx and queues.low_priority_rx
  • Removed queues_mut method used only in tests
src/connection.rs
Consolidate actor run state into RunState enum with counters
  • Introduced RunState enum (Active, ShuttingDown, Finished)
  • Replaced multiple booleans with closed_sources and total_sources counters
  • Added ActorState methods: mark_closed, start_shutdown, is_active, is_shutting_down, is_done
src/connection.rs
Refactor polling, shutdown, and frame processing to use Option receivers
  • Rewrote tokio::select to await on Option receivers with guard conditions
  • Unified start_shutdown to close Option receivers and drop response
  • Updated process_high/low/process_response to unwrap Option, call before_send, mark closed on None
src/connection.rs
Update roadmap to complete run state consolidation
  • Changed unchecked checkbox to checked for run state consolidation
docs/asynchronous-outbound-messaging-roadmap.md

Possibly linked issues

  • #0: The PR consolidates actor run state using Option receivers and a closed source counter, improving state management as suggested.

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Jun 28, 2025

Summary by CodeRabbit

  • Refactor

    • Improved handling and tracking of multiple message sources, resulting in clearer shutdown behaviour and more reliable message processing.
    • Simplified and unified the logic for managing high- and low-priority message queues.
  • Documentation

    • Updated roadmap document to reflect progress on a completed checklist item.

Summary by CodeRabbit

  • Documentation

    • Updated the roadmap to reflect completion of the "Run state consolidation using Option receivers and a closed source counter" task.
  • Refactor

    • Improved internal management of connection state and shutdown handling for more robust and streamlined processing of high- and low-priority message queues. No changes to user-facing functionality.

Walkthrough

The ConnectionActor struct was refactored to manage high- and low-priority queues as separate optional receivers, replacing the previous unified queue structure. State management was consolidated into a new ActorState struct using an enum for run states and a closed-source counter. Shutdown and polling logic were updated for clarity and correctness.

Changes

File(s) Change Summary
docs/asynchronous-outbound-messaging-roadmap.md Checklist item for state consolidation marked as complete.
src/connection.rs Refactored ConnectionActor to use separate optional receivers for queues; introduced ActorState with RunState enum; updated run loop, shutdown, and fairness logic; removed queues_mut method; updated constructor and internal methods.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant ConnectionActor
    participant HighPriorityQueue
    participant LowPriorityQueue
    participant ResponseStream

    loop Actor Poll Loop
        alt High-priority frame available
            HighPriorityQueue->>ConnectionActor: Send frame
            ConnectionActor->>ConnectionActor: Process frame
        end
        alt Low-priority frame available
            LowPriorityQueue->>ConnectionActor: Send frame
            ConnectionActor->>ConnectionActor: Process frame
        end
        alt Response available
            ResponseStream->>ConnectionActor: Send response
            ConnectionActor->>ConnectionActor: Process response
        end
        alt All sources closed
            ConnectionActor->>ConnectionActor: Mark as Finished
        end
    end
Loading

Possibly related issues

Possibly related PRs

Suggested reviewers

  • codescene-delta-analysis

Poem

In the warren of code, where actors hop,
Queues once tangled now cleanly drop.
High and low, each with their door,
Counted closed—no less, no more!
With states unified, the shutdown’s neat,
This rabbit finds the refactor sweet. 🐇


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 280f719 and 4e99a92.

📒 Files selected for processing (1)
  • src/connection.rs (8 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
`**/*.rs`: Every Rust module must begin with a module level (`//!`) comment expl...

**/*.rs: Every Rust module must begin with a module level (//!) comment explaining the module's purpose and utility.
Document public Rust APIs using Rustdoc comments (///) so documentation can be generated with cargo doc.
Place function attributes after doc comments.

📄 Source: CodeRabbit Inference Engine (AGENTS.md)

List of files the instruction was applied to:

  • src/connection.rs
`**/*.rs`: * Seek to keep the cyclomatic complexity of functions no more than 12...

**/*.rs: * Seek to keep the cyclomatic complexity of functions no more than 12.

  • Adhere to single responsibility and CQRS

  • Place function attributes after doc comments.

  • Do not use return in single-line functions.

  • Move conditionals with >2 branches into a predicate function.

  • Avoid unsafe unless absolutely necessary.

  • Every module must begin with a //! doc comment that explains the module's purpose and utility.

  • Comments must use en-GB-oxendict spelling and grammar.

  • Lints must not be silenced except as a last resort.

    • #[allow] is forbidden.
    • Only narrowly scoped #[expect(lint, reason = "...")] is allowed.
    • No lint groups, no blanket or file-wide suppression.
    • Include FIXME: with link if a fix is expected.
  • Use rstest fixtures for shared setup and to avoid repetition between tests.

  • Replace duplicated tests with #[rstest(...)] parameterised cases.

  • Prefer mockall for mocks/stubs.

  • Prefer .expect() over .unwrap()

  • Ensure that any API or behavioural changes are reflected in the documentation in docs/

  • Ensure that any completed roadmap steps are recorded in the appropriate roadmap in docs/

⚙️ Source: CodeRabbit Configuration File

List of files the instruction was applied to:

  • src/connection.rs
🧬 Code Graph Analysis (1)
src/connection.rs (1)
tests/connection_actor.rs (1)
  • queues (18-18)
⏰ Context from checks skipped due to timeout of 90000ms (2)
  • GitHub Check: build-test
  • GitHub Check: Analyze (actions)
🔇 Additional comments (6)
src/connection.rs (6)

44-55: Good addition of comprehensive rustdoc examples.

The examples clearly demonstrate the basic usage pattern and follow the coding guidelines for documentation.


57-58: Well-designed refactoring to separate queue receivers.

The change from a single PushQueues<F> to separate optional receivers provides better granularity for tracking queue lifecycles and aligns with the run state consolidation objective.


102-103: Correct extraction of receivers from PushQueues.

The implementation properly extracts and wraps the receivers, maintaining backward compatibility with the public API whilst supporting the new internal structure.


147-197: Excellent implementation of the consolidated polling logic.

The method effectively consolidates source polling with proper guards and clear error messages. The additional shutdown check for response polling (line 194) correctly prevents unnecessary work after shutdown has begun, addressing the concern raised in previous reviews.


354-419: Clean and well-documented state machine implementation.

The RunState enum and ActorState struct provide excellent encapsulation of the actor's lifecycle. The rustdoc comments have been added as requested in previous reviews, and the explanation of the shutdown token's initial state (lines 388-391) is particularly helpful for understanding the closed source counting logic.


252-263: Robust shutdown implementation.

The method correctly closes all receivers and tracks the response stream closure in the state. This ensures clean termination of all sources during shutdown.

✨ Finishing Touches
  • 📝 Generate Docstrings
🧪 Generate Unit Tests
  • Create PR with Unit Tests
  • Post Copyable Unit Tests in Comment
  • Commit Unit Tests in branch codex/implement-run-state-consolidation-with-tests

🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai auto-generate unit tests to generate unit tests for this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Copy Markdown
Contributor

@sourcery-ai sourcery-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @leynos - I've reviewed your changes and they look great!

Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments

### Comment 1
<location> `src/connection.rs:159` </location>
<code_context>
             }

-            res = Self::next_response(&mut self.response), if !state.shutting_down && !state.resp_closed => {
+            res = Self::next_response(&mut self.response), if resp_available && !state.is_shutting_down() => {
                 self.process_response(res, state, out)?;
             }
</code_context>

<issue_to_address>
Check for shutdown before polling response to avoid unnecessary work.

The select branch for `next_response` now checks both `resp_available` and `!state.is_shutting_down()`. Ensure that this logic matches the intended shutdown semantics and does not introduce a race where a response is polled after shutdown has started.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment thread src/connection.rs
}

res = Self::next_response(&mut self.response), if !state.shutting_down && !state.resp_closed => {
res = Self::next_response(&mut self.response), if resp_available && !state.is_shutting_down() => {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question (review_instructions): Check for shutdown before polling response to avoid unnecessary work.

The select branch for next_response now checks both resp_available and !state.is_shutting_down(). Ensure that this logic matches the intended shutdown semantics and does not introduce a race where a response is polled after shutdown has started.

Review instructions:

Path patterns: **/*

Instructions:
Create code-review comments for ALL issues. Avoid making general observations or non-specific feedback if at all possible.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

📜 Review details

Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 201fc93 and eb6ca73.

📒 Files selected for processing (2)
  • docs/asynchronous-outbound-messaging-roadmap.md (1 hunks)
  • src/connection.rs (9 hunks)
🧰 Additional context used
📓 Path-based instructions (7)
`docs/**/*.md`: Use the markdown files within the `docs/` directory as a knowled...

docs/**/*.md: Use the markdown files within the docs/ directory as a knowledge base and source of truth for project requirements, dependency choices, and architectural decisions.
Proactively update the relevant file(s) in the docs/ directory to reflect the latest state when new decisions are made, requirements change, libraries are added/removed, or architectural patterns evolve.
Documentation in the docs/ directory must use en-GB-oxendict spelling and grammar, except for the word 'license'.

📄 Source: CodeRabbit Inference Engine (AGENTS.md)

List of files the instruction was applied to:

  • docs/asynchronous-outbound-messaging-roadmap.md
`**/*.md`: Validate Markdown files using `markdownlint *.md **/*.md`. Run `mdfor...

**/*.md: Validate Markdown files using markdownlint *.md **/*.md.
Run mdformat-all after any documentation changes to format all Markdown files and fix table markup.
Validate Markdown Mermaid diagrams using the nixie CLI by running nixie *.md **/*.md.
Markdown paragraphs and bullet points must be wrapped at 80 columns.
Code blocks in Markdown files must be wrapped at 120 columns.
Tables and headings in Markdown files must not be wrapped.

📄 Source: CodeRabbit Inference Engine (AGENTS.md)

List of files the instruction was applied to:

  • docs/asynchronous-outbound-messaging-roadmap.md
`docs/*`: Each file in the docs/ directory provides design, roadmap, testing, op...

docs/*: Each file in the docs/ directory provides design, roadmap, testing, operations, or reference documentation for the project. Use these files as a reference for understanding the project's architecture, development plans, testing strategies, operational guidelines, and documentation conventions.

📄 Source: CodeRabbit Inference Engine (docs/contents.md)

List of files the instruction was applied to:

  • docs/asynchronous-outbound-messaging-roadmap.md
`docs/**/*.md`: Use British English spelling based on the Oxford English Diction...

docs/**/*.md: Use British English spelling based on the Oxford English Dictionary, except retain US spelling in API names (e.g., 'color').
Use the Oxford comma in lists.
Write headings in sentence case and use Markdown heading levels in order without skipping.
Follow markdownlint recommendations for Markdown formatting.
Always use fenced code blocks with a language identifier; use 'plaintext' for non-code text.
Use '-' as the first level bullet and renumber lists when items change.
Prefer inline links using 'text' or angle brackets around the URL.
Expand any uncommon acronym on first use, e.g., Continuous Integration (CI).
Wrap paragraphs at 80 columns, code at 120 columns, and do not wrap tables.
Use footnotes referenced with '[^label]'.
When embedding figures, use 'alt text' and provide concise alt text describing the content.
Add a short description before each Mermaid diagram for screen readers.

📄 Source: CodeRabbit Inference Engine (docs/documentation-style-guide.md)

List of files the instruction was applied to:

  • docs/asynchronous-outbound-messaging-roadmap.md
`**/*.md`: * Avoid 2nd person or 1st person pronouns ("I", "you", "we") * Use en...

**/*.md: * Avoid 2nd person or 1st person pronouns ("I", "you", "we")

  • Use en-oxendic spelling and grammar.
  • Paragraphs and bullets must be wrapped to 80 columns, except where a long URL would prevent this (in which case, silence MD013 for that line)
  • Code blocks should be wrapped to 120 columns.
  • Headings must not be wrapped.

⚙️ Source: CodeRabbit Configuration File

List of files the instruction was applied to:

  • docs/asynchronous-outbound-messaging-roadmap.md
`**/*.rs`: Every Rust module must begin with a module level (`//!`) comment expl...

**/*.rs: Every Rust module must begin with a module level (//!) comment explaining the module's purpose and utility.
Document public Rust APIs using Rustdoc comments (///) so documentation can be generated with cargo doc.
Place function attributes after doc comments.

📄 Source: CodeRabbit Inference Engine (AGENTS.md)

List of files the instruction was applied to:

  • src/connection.rs
`**/*.rs`: * Seek to keep the cyclomatic complexity of functions no more than 12...

**/*.rs: * Seek to keep the cyclomatic complexity of functions no more than 12.

  • Adhere to single responsibility and CQRS

  • Place function attributes after doc comments.

  • Do not use return in single-line functions.

  • Move conditionals with >2 branches into a predicate function.

  • Avoid unsafe unless absolutely necessary.

  • Every module must begin with a //! doc comment that explains the module's purpose and utility.

  • Comments must use en-GB-oxendict spelling and grammar.

  • Lints must not be silenced except as a last resort.

    • #[allow] is forbidden.
    • Only narrowly scoped #[expect(lint, reason = "...")] is allowed.
    • No lint groups, no blanket or file-wide suppression.
    • Include FIXME: with link if a fix is expected.
  • Use rstest fixtures for shared setup and to avoid repetition between tests.

  • Replace duplicated tests with #[rstest(...)] parameterised cases.

  • Prefer mockall for mocks/stubs.

  • Prefer .expect() over .unwrap()

  • Ensure that any API or behavioural changes are reflected in the documentation in docs/

  • Ensure that any completed roadmap steps are recorded in the appropriate roadmap in docs/

⚙️ Source: CodeRabbit Configuration File

List of files the instruction was applied to:

  • src/connection.rs
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: build-test
🔇 Additional comments (6)
src/connection.rs (5)

45-46: LGTM! Clean refactoring to support state consolidation.

The change from a single PushQueues<F> field to separate optional receivers improves state tracking and aligns perfectly with the run state consolidation objective.


78-79: Correct extraction of receivers from PushQueues.

The receivers are properly extracted and wrapped in Some, maintaining the initial state where both queues are available.


114-114: Proper state initialisation with response stream awareness.

The state correctly tracks whether a response stream exists, which is essential for accurate source counting.


213-223: Well-structured shutdown sequence.

The shutdown logic properly closes all receivers and drops the response stream whilst tracking the closed state. The use of take() for the response stream is particularly elegant.


231-244: Robust fairness logic with proper Option handling.

The updated fairness check correctly handles the optional low_rx and properly manages the disconnection case by setting it to None and marking the source as closed.

docs/asynchronous-outbound-messaging-roadmap.md (1)

18-18: Documentation correctly reflects completed implementation.

The roadmap update accurately marks the "Run state consolidation" task as complete, which aligns with the refactoring implemented in src/connection.rs.

Comment thread src/connection.rs
Comment thread src/connection.rs
@leynos leynos merged commit 5333cfa into main Jun 28, 2025
5 checks passed
@leynos leynos deleted the codex/implement-run-state-consolidation-with-tests branch June 28, 2025 20:53
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant