Refactor connection module into submodules#424
Conversation
Extract ActorState, MultiPacketContext, and Event types into separate submodules to improve code organisation and maintainability. The module structure now follows the established pattern used by app/, fragment/, and push/ modules. Extracted components: - state.rs: RunState enum and ActorState lifecycle management - multi_packet.rs: MultiPacketStamp, MultiPacketTerminationReason, and MultiPacketContext for correlation stamping and channel handling - event.rs: Event enum for the actor select loop The ConnectionActor implementation remains in mod.rs as the core public API. All existing tests pass and the public API is unchanged. closes #404 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Reviewer's GuideRefactors the monolithic connection module into three focused internal submodules (event, state, multi_packet), moving the Event enum, multi-packet channel/correlation logic, and actor lifecycle tracking into their own files while updating module wiring and test imports to preserve existing behaviour. Class diagram for refactored connection state and multi-packet submodulesclassDiagram
class Event~F,E~ {
<<enumeration>>
Shutdown
High
Low
MultiPacket
Response
Idle
}
class MultiPacketStamp {
<<enumeration>>
Disabled
Enabled
}
class MultiPacketTerminationReason {
<<enumeration>>
Drained
Disconnected
Shutdown
+as_str() &'static str
}
class MultiPacketContext~F~ {
-channel Option_mpsc_Receiver_F_
-stamp MultiPacketStamp
+new() MultiPacketContext_F_
+install(channel Option_mpsc_Receiver_F_, stamp MultiPacketStamp) void
+clear() void
+channel_mut() Option_mpsc_Receiver_F__ref
+take_channel() Option_mpsc_Receiver_F_
+stamp() MultiPacketStamp
+correlation_id() Option_u64_
+is_active() bool
}
class RunState {
<<enumeration>>
Active
ShuttingDown
Finished
}
class ActorState {
-run_state RunState
+closed_sources usize
+total_sources usize
+new(has_response bool, has_multi_packet bool) ActorState
+mark_closed() void
+start_shutdown() void
+is_active() bool
+is_shutting_down() bool
+is_done() bool
}
class ConnectionActor {
-state ActorState
-multi_packet MultiPacketContext_Frame_
+next_event() Event_Frame,Error_
}
Event~F,E~ <.. ConnectionActor : uses
MultiPacketStamp <.. MultiPacketContext~F~ : uses
MultiPacketTerminationReason <.. MultiPacketContext~F~ : used_for_logging
RunState <.. ActorState : uses
ActorState <.. ConnectionActor : lifecycle_state
MultiPacketContext~F~ <.. ConnectionActor : multi_packet_handling
File-Level Changes
Possibly linked issues
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
Summary by CodeRabbitRelease Notes
✏️ Tip: You can customize this high-level summary in your review settings. WalkthroughSummarise the Connection actor refactor: extract internal Event, MultiPacket and State modules; add fairness-aware draining, polling, response streaming, shutdown handling, multi-packet stamping and active-connection tracking; introduce ConnectionActor that selects from multiple sources and dispatches events until shutdown or stream exhaustion. Changes
Sequence Diagram(s)sequenceDiagram
participant Selector as Select loop
participant HighQ as High queue
participant LowQ as Low queue
participant Multi as MultiPacket channel
participant Resp as Response stream
participant Shutdown as Cancellation token
participant Dispatcher as dispatch_event
participant Out as Output collector
Selector->>Shutdown: poll cancellation
Selector->>HighQ: poll high queue
Selector->>LowQ: poll low queue
Selector->>Multi: poll multi-packet channel
Selector->>Resp: poll response stream
alt Shutdown signalled
Selector->>Dispatcher: Shutdown
else Frame from High
Selector->>Dispatcher: High(frame)
else Frame from Low
Selector->>Dispatcher: Low(frame)
else Frame from Multi
Selector->>Dispatcher: MultiPacket(frame)
else Response item
Selector->>Dispatcher: Response(result)
else Idle
Selector->>Dispatcher: Idle
end
Dispatcher->>Selector: return outcome (ok / error)
Dispatcher->>Out: emit frames (if any)
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related issues
Poem
🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Hey - I've left some high level feedback:
- In
state.rs, consider keepingActorState’sclosed_sourcesandtotal_sourcesfields private and exposing any needed access via methods to better preserve the internal lifecycle invariants. - If
RunStateis only consumed withinActorState, you may be able to make it private to the module (or even the struct) rather thanpub(super)to tighten encapsulation around the actor’s lifecycle. - For the multi-packet helpers, double-check whether
MultiPacketStampandMultiPacketTerminationReasonreally needpub(super)visibility or whether they can remain private tomulti_packet.rsto avoid leaking internal implementation details across the connection submodules.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `state.rs`, consider keeping `ActorState`’s `closed_sources` and `total_sources` fields private and exposing any needed access via methods to better preserve the internal lifecycle invariants.
- If `RunState` is only consumed within `ActorState`, you may be able to make it private to the module (or even the struct) rather than `pub(super)` to tighten encapsulation around the actor’s lifecycle.
- For the multi-packet helpers, double-check whether `MultiPacketStamp` and `MultiPacketTerminationReason` really need `pub(super)` visibility or whether they can remain private to `multi_packet.rs` to avoid leaking internal implementation details across the connection submodules.Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/connection/mod.rs`:
- Around line 8-10: The file still exceeds the 400-line cap; find the large
helper functions/impl blocks remaining in this module and extract them into new
submodules (e.g., create files and add mod declarations similar to mod event /
mod multi_packet / mod state) preserving existing visibility (pub / pub(crate))
and re-exporting symbols used externally; update all use paths and references to
the moved functions/structs (for example move heavy helpers into new submodules
like helpers.rs or utils.rs and add pub use self::helpers::{...}) and run cargo
build/tests to fix any visibility or import errors.
|
@coderabbitai please create a new GitHub issue for the following:
|
|
@coderabbitai please create a new GitHub issue for the following:
|
|
@coderabbitai please create a new GitHub issue for the following:
|
|
@coderabbitai please create a new GitHub issue for the following:
|
|
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chain
🏁 Script executed: Length of output: 46
The issue includes all seven files you specified and references this PR as a pattern for the modularisation approach. |
|
@coderabbitai please create a new GitHub issue for the following: Duplication / similarity
|
|
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chain
🏁 Script executed: Length of output: 46
The issue documents the layering violation where |
|
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chain🏁 Script executed: Length of output: 6158 🏁 Script executed: Length of output: 870 🏁 Script executed: Length of output: 664 🏁 Script executed: Length of output: 2445 🏁 Script executed: Length of output: 46
Issue: #430 The issue documents:
The issue has been assigned to you. |
|
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chain
🏁 Script executed: Length of output: 46
The issue documents the current situation where |
|
@coderabbitai please create a new GitHub issue for the following:
|
|
@coderabbitai please create a new GitHub issue for the following:
|
|
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chain
🏁 Script executed: Length of output: 46
The issue includes all three areas you specified:
|
|
@coderabbitai please create a new GitHub issue for the following:
|
|
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chain
🏁 Script executed: Length of output: 46
The issue documents:
The issue has been assigned to you. |
|
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chain
🏁 Script executed: Length of output: 46
The issue documents the current brittle string matching in |
|
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chain
🏁 Script executed: Length of output: 46
The issue includes both concerns:
The issue has been assigned to you. |
Address code review feedback from PR #403: - Make ActorState fields private, exposing closed_sources() and total_sources() accessor methods to preserve lifecycle invariants - Make RunState private to state.rs module since it's only consumed within ActorState - Make MultiPacketStamp private to multi_packet.rs, adding is_stamping_enabled() method to avoid leaking implementation details - Extract helper functions into domain-focused submodules to bring mod.rs below the 400-line cap: - drain.rs: Queue drain operations and fairness-aware helpers - frame.rs: Frame processing and correlation stamping - response.rs: Streaming response handling - shutdown.rs: Shutdown and multi-packet closure handling - polling.rs: Async polling utilities - dispatch.rs: Event dispatching Module layout after refactoring: - mod.rs: 399 lines (core actor struct and public API) - drain.rs: 189 lines (queue processing) - frame.rs: 88 lines (frame emission) - response.rs: 77 lines (streaming response) - shutdown.rs: 83 lines (shutdown handling) - polling.rs: 70 lines (async helpers) - dispatch.rs: 34 lines (event dispatch) All existing tests pass and the public API is unchanged. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
|
@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix: Comment on file "response stream is active"
),
);
self.multi_packet.install(channel, None);❌ New issue: Code Duplication |
This comment was marked as resolved.
This comment was marked as resolved.
…lti_packet_with_correlation Replaced the direct call to `self.multi_packet.install(channel, None)` and debug assertion with a call to the helper method `set_multi_packet_with_correlation(channel, None)`. This centralizes correlation handling and reduces code duplication in the connection module. Co-authored-by: terragon-labs[bot] <terragon-labs[bot]@users.noreply.github.com>
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In `@src/connection/drain.rs`:
- Around line 113-131: The match arm for QueueKind::High in
try_opportunistic_drain currently uses debug_assert!(false, ...) which
unconditionally panics in debug builds; replace it with a meaningful assertion
or an explicit unreachable to reflect intent: either add a guard so
QueueKind::High cannot reach this function (e.g., handle/exclude High earlier)
or change the arm to debug_assert!(some_condition_reflecting_invariant) and
return false, or if the branch truly cannot happen, use unreachable!() to signal
a hard invariant; update references to QueueKind::High and
try_opportunistic_drain accordingly so the code documents the intended invariant
instead of asserting false.
In `@src/connection/multi_packet.rs`:
- Around line 60-76: The doc comment for install currently omits that
correlation_id is ignored when channel is None; update the comment for the
install method to explicitly state that correlation_id is only used when channel
is Some and is ignored when channel is None (i.e., stamping is disabled via
MultiPacketStamp::Disabled), referencing the install method, the channel
parameter, correlation_id parameter, and the MultiPacketStamp enum to make the
behavior clear to callers.
In `@src/connection/polling.rs`:
- Around line 45-55: The small wrapper await_shutdown merely forwards to
wait_shutdown with no extra behavior; either remove the wrapper and replace all
call sites with Self::wait_shutdown(token).await to eliminate indirection, or
keep it but add a one-line doc comment to await_shutdown explaining its purpose
(e.g., a future extension or test seam) so callers understand why it exists;
update any references to await_shutdown accordingly and run tests to ensure no
behavioral changes.
- Replaced 'await_shutdown' with 'wait_shutdown' for shutdown cancellation handling to simplify code. - Added Clippy exemption for unreachable code in 'try_opportunistic_drain' to maintain exhaustive matching. - Improved documentation comments regarding multi-packet channel stamping behavior. - Removed unused 'await_shutdown' function to reduce dead code. Co-authored-by: terragon-labs[bot] <terragon-labs[bot]@users.noreply.github.com>
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/connection/drain.rs`:
- Around line 113-175: Remove the #[expect(clippy::unreachable, ...)] annotation
and change the QueueKind::High match arm in try_opportunistic_drain to be a
neutral, non-panicking branch: replace the unreachable! macro with a
debug_assert!(false, "try_opportunistic_drain(High) should not be called") (or
assert! in debug builds) and return false so the function remains exhaustive
without silencing the lint; keep QueueKind::Low and QueueKind::Multi logic
unchanged.
…drain.rs Changed try_opportunistic_drain for QueueKind::High from unreachable!() call to a debug_assert!(false) and returning false. Removed the #[expect(clippy::unreachable)] attribute. This improves clarity and debugging while preserving behavior. Co-authored-by: terragon-labs[bot] <terragon-labs[bot]@users.noreply.github.com>
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
Resolves the layering violation where `fragment_packet` was defined in `app::fragment_utils` but used by the connection actor. The function now resides in its canonical location at `fragment::packet` alongside other fragmentation primitives, while preserving backward compatibility through a re-export from the original `app::fragment_utils` path. This follows the modularisation pattern established in PR #424. Closes #431 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
* Move fragment_packet to fragment module Resolves the layering violation where `fragment_packet` was defined in `app::fragment_utils` but used by the connection actor. The function now resides in its canonical location at `fragment::packet` alongside other fragmentation primitives, while preserving backward compatibility through a re-export from the original `app::fragment_utils` path. This follows the modularisation pattern established in PR #424. Closes #431 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * Introduce Fragmentable trait to remove app layer dependency Addresses code review feedback on the fragment_packet refactoring: 1. Introduces `Fragmentable` trait and `FragmentParts` type in the fragment module, removing the dependency on `crate::app::{Packet, PacketParts}`. The fragmentation layer now operates on its own abstractions. 2. Adds a blanket implementation in the app module so all `Packet` types automatically implement `Fragmentable`, preserving backward compatibility. 3. Marks `app::fragment_utils::fragment_packet` as deprecated with guidance to migrate to `crate::fragment::fragment_packet`. This completes the layering fix: the fragment module no longer depends on app-layer types, and the transition path for callers is explicit. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Summary
Changes
Module structure
Test support changes
Backwards compatibility
Test plan
🌿 Generated by Terry
📎 Task: https://www.terragonlabs.com/task/05cb208e-ef51-4495-b6a6-d1f5d051f8cd