Replace response and multi_packet with ActiveOutput enum#427
Conversation
Reviewer's GuideRefactors ConnectionActor to manage its output source via a single ActiveOutput enum, enforcing mutual exclusivity between streaming responses and multi-packet channels, and updates event polling, multi-packet lifecycle handling, shutdown, and tests to use this new abstraction. Sequence diagram for next_event polling using ActiveOutputsequenceDiagram
participant ConnActor
participant Shutdown
participant HighQueue
participant LowQueue
participant MultiPacketChannel
participant ResponseStream
ConnActor->>ConnActor: compute_availability(state)
ConnActor->>ConnActor: extract multi_channel,response_stream from active_output
alt shutdown_active
ConnActor->>Shutdown: await_shutdown()
Shutdown-->>ConnActor: triggered
ConnActor-->>ConnActor: Event_Shutdown
else response_active
ConnActor->>ResponseStream: poll_response()
ResponseStream-->>ConnActor: PollResult
ConnActor-->>ConnActor: Event_Response
else multi_packet_active
ConnActor->>MultiPacketChannel: poll_queue()
MultiPacketChannel-->>ConnActor: PollResult
ConnActor-->>ConnActor: Event_MultiPacket
else queues_only
ConnActor->>HighQueue: poll_queue()
HighQueue-->>ConnActor: PollResult
ConnActor->>LowQueue: poll_queue()
LowQueue-->>ConnActor: PollResult
ConnActor-->>ConnActor: Event_High_Low_Idle
end
Class diagram for ConnectionActor output refactor using ActiveOutput enumclassDiagram
class ConnectionActor~F,E~ {
- Option<mpsc::Receiver<F>> high_rx
- Option<mpsc::Receiver<F>> low_rx
- ActiveOutput<F,E> active_output
- CancellationToken shutdown
+ set_response(stream: Option<FrameStream<F,E>>)
+ set_multi_packet(channel: Option<mpsc::Receiver<F>>)
+ set_multi_packet_with_correlation(channel: Option<mpsc::Receiver<F>>, correlation_id: Option<u64>)
- apply_multi_packet_correlation(frame: &mut F)
+ run(out: &mut Vec<F>) Result<(),E>
- compute_availability(state: &ActorState) EventAvailability
- next_event(state: &ActorState) Event<F,E>
- end_multi_packet(reason: MultiPacketTerminationReason, state: &mut ActorState, out: &mut Vec<F>)
- shutdown_queues(state: &mut ActorState)
- handle_queue_frame(kind: QueueKind, frame: F, ctx: DrainContext<F>, state: &mut ActorState)
}
class ActiveOutput~F,E~ {
+ None
+ Response(FrameStream<F,E>)
+ MultiPacket(MultiPacketContext<F>)
+ is_response() bool
+ is_multi_packet() bool
+ multi_packet_mut() Option<&mut MultiPacketContext<F>>
+ clear_response()
}
class MultiPacketContext~F~ {
- Option<mpsc::Receiver<F>> channel
- MultiPacketStamp stamp
+ new() MultiPacketContext<F>
+ install(channel: Option<mpsc::Receiver<F>>, stamp: MultiPacketStamp)
+ channel_mut() Option<&mut mpsc::Receiver<F>>
+ stamp() MultiPacketStamp
+ correlation_id() Option<u64>
}
class EventAvailability {
+ bool high
+ bool low
+ bool multi_packet
+ bool response
}
ConnectionActor "1" o-- "1" ActiveOutput
ConnectionActor "1" o-- "1" EventAvailability
ActiveOutput "1" o-- "1" MultiPacketContext
MultiPacketContext "1" o-- "1" MultiPacketStamp
Flow diagram for ActiveOutput state transitions in ConnectionActorflowchart LR
none[None]
resp[Response]
multi[MultiPacket]
none -- set_response_Some --> resp
none -- set_multi_packet_Some --> multi
resp -- set_response_None --> none
resp -- shutdown_queues --> none
resp -- protocol_error --> none
multi -- end_multi_packet --> none
multi -- shutdown_queues --> none
multi -- set_multi_packet_None --> none
none -. initialize_with_response_Some .-> resp
none -. initialize_with_response_None .-> none
File-Level Changes
Assessment against linked issues
Possibly linked issues
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
WalkthroughIntroduce an ActiveOutput abstraction to make response streams and multi-packet channels mutually exclusive; make set_response/set_multi_packet/set_multi_packet_with_correlation fallible (return Result<..., ConnectionStateError>); update draining, framing, shutdown, multi-packet internals, tests and harness to use and propagate the new API and state model. Changes
Sequence Diagram(s)mermaid Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related issues
Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Comment |
|
@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix: Comment on lines +548 to +557 let multi_available = self.active_output.is_multi_packet() && !state.is_shutting_down();
let resp_available = self.active_output.is_response() && !state.is_shutting_down();
// Extract either the multi-packet channel or response stream since they're
// mutually exclusive - only one can be active at a time.
let (multi_channel, response_stream) = match &mut self.active_output {
ActiveOutput::MultiPacket(ctx) => (ctx.channel_mut(), None),
ActiveOutput::Response(stream) => (None, Some(stream)),
ActiveOutput::None => (None, None),
};❌ New issue: Complex Method |
This comment was marked as resolved.
This comment was marked as resolved.
There was a problem hiding this comment.
Hey - I've found 1 issue
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `src/connection.rs:276` </location>
<code_context>
+ clippy::struct_excessive_bools,
+ reason = "Availability flags are a natural fit for booleans; no state machine needed"
+)]
+#[derive(Clone, Copy)]
+struct EventAvailability {
+ high: bool,
</code_context>
<issue_to_address>
**issue (complexity):** Consider pushing shutdown/closure logic into `ActiveOutput` and inlining the availability booleans in `next_event` to keep invariants while simplifying control flow and types.
You can keep the type-level invariant while reducing the new surface area and branching. Two focused suggestions:
---
### 1. Move shutdown / closure behavior into `ActiveOutput`
Right now `start_shutdown` needs to know internal details of `ActiveOutput`/`MultiPacketContext` and manually do:
- correlation extraction
- `mem::replace`
- channel closure
- `on_command_end`
You can encapsulate this in `ActiveOutput` with a higher-level API and keep `start_shutdown` linear:
```rust
impl<F, E> ActiveOutput<F, E> {
fn shutdown(
&mut self,
reason: MultiPacketTerminationReason,
hooks: &mut ProtocolHooks<F, E>,
ctx: &mut ConnectionContext,
state: &mut ActorState,
log: impl Fn(MultiPacketTerminationReason, Option<u64>),
) {
match self {
ActiveOutput::MultiPacket(ctx_mp) => {
let correlation = ctx_mp.correlation_id();
log(reason, correlation);
if let Some(rx) = ctx_mp.channel_mut() {
rx.close();
state.mark_closed();
}
if let Some(frame) = hooks.stream_end_frame(ctx) {
// let ConnectionActor handle emitting; or call a callback
// depending on where you want this to live
// emit_multi_packet_frame(frame, out);
}
hooks.on_command_end(ctx);
*self = ActiveOutput::None;
}
ActiveOutput::Response(_) => {
state.mark_closed();
*self = ActiveOutput::None;
}
ActiveOutput::None => {}
}
}
}
```
Then `ConnectionActor::start_shutdown` becomes simpler and doesn’t need `mem::replace` or to know about correlation IDs:
```rust
fn start_shutdown(&mut self, state: &mut ActorState) {
if let Some(rx) = &mut self.high_rx {
rx.close();
}
if let Some(rx) = &mut self.low_rx {
rx.close();
}
self.active_output.shutdown(
MultiPacketTerminationReason::Shutdown,
&mut self.hooks,
&mut self.ctx,
state,
|reason, correlation| self.log_multi_packet_closure(reason, correlation),
);
}
```
You can apply the same pattern to `handle_multi_packet_closed`, e.g. `ActiveOutput::handle_multi_packet_closed(...)`, so the actor body stops pattern-matching on internal enum variants in multiple places.
---
### 2. Inline `EventAvailability` into `next_event`
`EventAvailability` and `compute_availability` only wrap four simple booleans and force a jump to understand what’s polled. You can keep the new `ActiveOutput` but remove this indirection:
```rust
async fn next_event(&mut self, state: &ActorState) -> Event<F, E> {
let high_available = self.high_rx.is_some();
let low_available = self.low_rx.is_some();
let multi_available = self.active_output.is_multi_packet() && !state.is_shutting_down();
let resp_available = self.active_output.is_response() && !state.is_shutting_down();
// Extract either the multi-packet channel or response stream since they're
// mutually exclusive.
let (multi_channel, response_stream) = match &mut self.active_output {
ActiveOutput::MultiPacket(ctx) => (ctx.channel_mut(), None),
ActiveOutput::Response(stream) => (None, Some(stream)),
ActiveOutput::None => (None, None),
};
tokio::select! {
biased;
() = Self::await_shutdown(self.shutdown.clone()), if state.is_active() => Event::Shutdown,
res = Self::poll_queue(self.high_rx.as_mut()), if high_available => Event::High(res),
res = Self::poll_queue(self.low_rx.as_mut()), if low_available => Event::Low(res),
res = Self::poll_queue(multi_channel), if multi_available => Event::MultiPacket(res),
res = Self::poll_response(response_stream), if resp_available => Event::Response(res),
else => Event::Idle,
}
}
```
This preserves the `ActiveOutput` invariant and the borrow-checker workaround but removes the extra type and method, making availability logic local and easier to follow.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/connection.rs`:
- Around line 387-398: Replace the debug-only checks that allow conflicting
active_output in release builds: in set_response and the two other setter
methods that assign self.active_output (the other setters in this file that
mirror set_response's active_output assignment), change the debug_assert! that
checks !self.active_output.is_multi_packet() to assert! with the same error
message so the code panics in all builds when attempting to set a
response/error/stream while a multi-packet channel is active; keep the existing
concatenated message text and only change debug_assert! to assert!.
|
@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix: Comment on lines +615 to +625 let multi_available = self.active_output.is_multi_packet() && !state.is_shutting_down();
let response_available = self.active_output.is_response() && !state.is_shutting_down();
// Extract either the multi-packet channel or response stream since they're
// mutually exclusive - only one can be active at a time. This must be
// inlined to satisfy the borrow checker's disjoint field analysis.
let (multi_channel, response_stream) = match &mut self.active_output {
ActiveOutput::MultiPacket(ctx) => (ctx.channel_mut(), None),
ActiveOutput::Response(stream) => (None, Some(stream)),
ActiveOutput::None => (None, None),
};❌ New issue: Complex Method |
This comment was marked as resolved.
This comment was marked as resolved.
0dc5e17 to
1f4af30
Compare
Consolidate the separate response stream and multi_packet channel fields into a single ActiveOutput enum that enforces mutual exclusion at compile time rather than via runtime debug_assert! checks. This change includes: - ActiveOutput enum with None, Response, and MultiPacket variants - ConnectionStateError enum for propagated errors from setter methods - set_response and set_multi_packet now return Result<(), ConnectionStateError> - ShutdownResult and MultiPacketCloseResult for encapsulated shutdown logic - EventAvailability struct to compute source availability for next_event - Removed unused methods from MultiPacketContext (clear, take_channel, is_active) All call sites updated to handle the new Result returns appropriately. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1f4af30 to
26bb362
Compare
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/connection/mod.rs`:
- Line 19: Replace the manual Display/Error implementation for the
ConnectionStateError enum by deriving thiserror::Error and annotating each
variant with #[error("...")] messages; specifically remove the fmt import and
the impl blocks for fmt::Display and std::error::Error, add use thiserror::Error
to imports, change #[derive(Clone, Copy, Debug, PartialEq, Eq)] to
#[derive(Clone, Copy, Debug, PartialEq, Eq, Error)] on ConnectionStateError, and
add #[error("cannot set response while a multi-packet channel is active")] to
MultiPacketActive and #[error("cannot set multi-packet channel while a response
stream is active")] to ResponseActive so the variant messages are co-located
with the enum.
Replace manual Display and Error implementations with thiserror derive macro annotations, co-locating error messages with their variants. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/connection/mod.rs`:
- Around line 114-228: Extract the ActiveOutput, ShutdownResult,
MultiPacketCloseResult, and EventAvailability types into a new submodule (e.g.,
an output submodule) and re-export them from the parent connection module; move
the enum/struct/impl blocks as-is into the new file, adjust their visibility
(pub or pub(crate)) to match how they're used elsewhere, add mod output; pub use
output::{ActiveOutput, ShutdownResult, MultiPacketCloseResult,
EventAvailability}; in the original file, and update any local references to
rely on the re-exports; ensure the impl methods (is_response, is_multi_packet,
multi_packet_mut, clear_response, shutdown, close_multi_packet, clear) remain
intact and compile.
Move ActiveOutput, ShutdownResult, MultiPacketCloseResult, and EventAvailability into a dedicated output submodule to reduce the size of the main connection module. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Extract types to dedicated submodules for better organization: - DrainContext and QueueKind moved to drain.rs - ActiveConnection counter types moved to new counter.rs - ConnectionChannels moved to new channels.rs All types are re-exported as needed to maintain the public API. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/connection/test_support.rs (1)
235-265: 🧹 Nitpick | 🔵 TrivialConsolidate the multi-queue tests using
rstestfixtures and parameterised cases.Collapse the three tests into a single parameterised case and share harness setup via a fixture to eliminate duplication.
♻️ Proposed refactor
#[cfg(test)] mod tests { + use rstest::{fixture, rstest}; use tokio::sync::mpsc; use super::*; - #[test] - fn has_multi_queue_false_by_default() { - let harness = ActorHarness::new().expect("build ActorHarness"); - assert!( - !harness.has_multi_queue(), - "multi-packet queue should start inactive" - ); - } - - #[test] - fn has_multi_queue_true_after_install() { - let mut harness = ActorHarness::new().expect("build ActorHarness"); - let (_tx, rx) = mpsc::channel(1); - harness.set_multi_queue(Some(rx)).expect("set_multi_queue"); - assert!( - harness.has_multi_queue(), - "multi-packet queue should be active after install" - ); - } - - #[test] - fn has_multi_queue_false_after_clear() { - let mut harness = ActorHarness::new().expect("build ActorHarness"); - let (_tx, rx) = mpsc::channel(1); - harness.set_multi_queue(Some(rx)).expect("set_multi_queue"); - assert!( - harness.has_multi_queue(), - "multi-packet queue should be active after install" - ); - harness.set_multi_queue(None).expect("clear multi_queue"); - assert!( - !harness.has_multi_queue(), - "multi-packet queue should be inactive after clear" - ); - } + #[fixture] + fn harness() -> ActorHarness { + ActorHarness::new().expect("build ActorHarness") + } + + #[rstest] + #[case::default(false, false, false)] + #[case::install(true, false, true)] + #[case::clear(true, true, false)] + fn has_multi_queue_states( + #[case] install: bool, + #[case] clear: bool, + #[case] expected: bool, + mut harness: ActorHarness, + ) { + if install { + let (_tx, rx) = mpsc::channel(1); + harness.set_multi_queue(Some(rx)).expect("set_multi_queue"); + } + if clear { + harness.set_multi_queue(None).expect("clear multi_queue"); + } + assert_eq!( + harness.has_multi_queue(), + expected, + "multi-packet queue state mismatch" + ); + } }Per coding guidelines, utilise
rstestfixtures for shared setup in Rust tests and replace duplicated tests with#[rstest(...)]parameterised cases.
There was a problem hiding this comment.
@leynos, yes! The file has been successfully decomposed and now meets the guideline. ✓
Verification results:
- Current line count: 397 lines (3 lines under the 400-line limit)
- Target: ≤400 lines
- Status: ✓ Resolved
The additional extraction work since the last check has brought the file into compliance:
- ActiveOutput, ShutdownResult, MultiPacketCloseResult, and EventAvailability →
output.rs(120 lines) - ConnectionChannels →
channels.rs - Additional refinements reduced the file from 451 → 397 lines
The decomposition maintains logical organisation while satisfying the file size constraint.
Refactored tests in src/connection/test_support.rs to use rstest fixtures and parameterized test cases. Consolidated three separate tests of the multi-queue state (default, after install, after clear) into a single parametrized test to improve readability and reduce duplication. Co-authored-by: terragon-labs[bot] <terragon-labs[bot]@users.noreply.github.com>
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/connection/test_support.rs`:
- Around line 234-239: The file's formatting around the test fixture (the
harness() function that returns ActorHarness and calls ActorHarness::new()) does
not match rustfmt; run the workspace formatter (e.g., make fmt or cargo fmt
--workspace) or run cargo fmt on src/connection/test_support.rs to apply the
expected formatting changes, then re-commit the updated file so cargo fmt --all
-- --check passes in CI.
Added a comment to the harness() fixture in test_support.rs that explains it provides an ActorHarness for parameterised multi-queue state tests. Co-authored-by: terragon-labs[bot] <terragon-labs[bot]@users.noreply.github.com>
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/connection/test_support.rs`:
- Around line 229-233: Add a module-level doc comment above the `mod tests`
block using a `//!` comment that briefly explains the purpose of the test module
(e.g., that it contains unit/integration helpers and test cases for connection
test support, fixtures like `fixture`/`rstest`, and use of `tokio::sync::mpsc`).
Place the `//!` comment immediately before `mod tests` so the module-level
documentation requirement is satisfied and clearly describes what the test
utilities and test cases inside `mod tests` are for.
Added documentation comment for the tests module describing that it contains unit tests for the `ActorHarness` fixture using parameterised `rstest` cases. Co-authored-by: terragon-labs[bot] <terragon-labs[bot]@users.noreply.github.com>
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/connection/test_support.rs`:
- Around line 238-266: Change the harness fixture to be fallible (return
Result<ActorHarness, Box<dyn std::error::Error>> or
anyhow::Result<ActorHarness>) and construct it with ActorHarness::new()? (i.e.
return ActorHarness::new().map_err(Into::into) so the error is propagated
instead of .expect()); change the test function has_multi_queue_states to return
Result<(), Box<dyn std::error::Error>> (or anyhow::Result) and replace the
.expect() calls on harness.set_multi_queue(...) with ? to propagate errors, and
end the test with Ok(()) after the assert_eq!; keep references to harness,
ActorHarness::new, has_multi_queue_states, and set_multi_queue to locate the
changes.
Changed the test support harness function and tests to return Result types with proper error propagation instead of panicking. This improves error handling and test reliability by using standard Result and ? operator instead of expect calls. Co-authored-by: terragon-labs[bot] <terragon-labs[bot]@users.noreply.github.com>
Summary
Changes
Core Functionality
API Support / Test Surface
Misc
Why
Migration notes
Risk & Mitigations
How to test
Notes for reviewers
📎 Task: https://www.terragonlabs.com/task/14d3e409-e284-48da-9a13-ff56268fd638