From 26bb362fb6456815290bde1c9b8b13705f22e1fb Mon Sep 17 00:00:00 2001 From: Leynos Date: Fri, 30 Jan 2026 18:46:09 +0000 Subject: [PATCH 1/8] Refactor ConnectionActor output sources to ActiveOutput enum MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Consolidate the separate response stream and multi_packet channel fields into a single ActiveOutput enum that enforces mutual exclusion at compile time rather than via runtime debug_assert! checks. This change includes: - ActiveOutput enum with None, Response, and MultiPacket variants - ConnectionStateError enum for propagated errors from setter methods - set_response and set_multi_packet now return Result<(), ConnectionStateError> - ShutdownResult and MultiPacketCloseResult for encapsulated shutdown logic - EventAvailability struct to compute source availability for next_event - Removed unused methods from MultiPacketContext (clear, take_channel, is_active) All call sites updated to handle the new Result returns appropriately. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- src/connection/drain.rs | 13 +- src/connection/frame.rs | 8 +- src/connection/mod.rs | 272 +++++++++++++++++++++++++++----- src/connection/multi_packet.rs | 9 -- src/connection/response.rs | 4 +- src/connection/shutdown.rs | 34 ++-- src/connection/test_support.rs | 20 ++- tests/connection.rs | 12 +- tests/correlation_id.rs | 4 +- tests/fixtures/correlation.rs | 4 +- tests/fixtures/multi_packet.rs | 4 +- tests/fixtures/stream_end.rs | 7 +- tests/multi_packet.rs | 16 +- tests/multi_packet_streaming.rs | 9 +- tests/stream_end.rs | 16 +- 15 files changed, 329 insertions(+), 103 deletions(-) diff --git a/src/connection/drain.rs b/src/connection/drain.rs index 3f742acb..b6de2fb6 100644 --- a/src/connection/drain.rs +++ b/src/connection/drain.rs @@ -36,8 +36,12 @@ where let DrainContext { out, state } = ctx; match res { Some(frame) => { + let is_stamping = self + .active_output + .multi_packet_mut() + .is_some_and(|ctx| ctx.is_stamping_enabled()); match kind { - QueueKind::Multi if self.multi_packet.is_stamping_enabled() => { + QueueKind::Multi if is_stamping => { self.emit_multi_packet_frame(frame, out); } _ => { @@ -144,8 +148,11 @@ where } } QueueKind::Multi => { - let result = match self.multi_packet.channel_mut() { - Some(rx) => rx.try_recv(), + let result = match self.active_output.multi_packet_mut() { + Some(ctx) => match ctx.channel_mut() { + Some(rx) => rx.try_recv(), + None => return false, + }, None => return false, }; diff --git a/src/connection/frame.rs b/src/connection/frame.rs index cbd1774f..6b821684 100644 --- a/src/connection/frame.rs +++ b/src/connection/frame.rs @@ -23,12 +23,16 @@ where /// Apply correlation stamping to a multi-packet frame. pub(super) fn apply_multi_packet_correlation(&mut self, frame: &mut F) { - if !self.multi_packet.is_stamping_enabled() { + let Some(ctx) = self.active_output.multi_packet_mut() else { // No channel is active, so there is nothing to stamp. return; + }; + + if !ctx.is_stamping_enabled() { + return; } - let correlation_id = self.multi_packet.correlation_id(); + let correlation_id = ctx.correlation_id(); frame.set_correlation_id(correlation_id); if let Some(expected) = correlation_id { diff --git a/src/connection/mod.rs b/src/connection/mod.rs index deaf4313..26f2153f 100644 --- a/src/connection/mod.rs +++ b/src/connection/mod.rs @@ -16,6 +16,7 @@ mod shutdown; mod state; use std::{ + fmt, net::SocketAddr, sync::{ Arc, @@ -96,6 +97,151 @@ impl ConnectionChannels { pub fn new(queues: PushQueues, handle: PushHandle) -> Self { Self { queues, handle } } } +/// Error returned when attempting to set an active output source while +/// another source is already active. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum ConnectionStateError { + /// A multi-packet channel is currently active and must be cleared before + /// setting a response stream. + MultiPacketActive, + /// A response stream is currently active and must be cleared before + /// setting a multi-packet channel. + ResponseActive, +} + +impl fmt::Display for ConnectionStateError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::MultiPacketActive => write!( + f, + "cannot set response while a multi-packet channel is active" + ), + Self::ResponseActive => write!( + f, + "cannot set multi-packet channel while a response stream is active" + ), + } + } +} + +impl std::error::Error for ConnectionStateError {} + +/// Active output source for the connection actor. +/// +/// At most one output source can be active at a time. This enum makes the +/// mutual exclusion compile-time enforced rather than runtime-asserted. +enum ActiveOutput { + /// No output source is active. + None, + /// A streaming response is active. + Response(FrameStream), + /// A multi-packet channel is active. + MultiPacket(MultiPacketContext), +} + +/// Result of shutting down an active output source. +struct ShutdownResult { + /// Correlation ID of the multi-packet context, if any. + correlation_id: Option, + /// Whether the source should be marked as closed in `ActorState`. + source_closed: bool, + /// Whether `on_command_end` hook should be called. + call_on_command_end: bool, +} + +/// Result of closing an active multi-packet channel. +struct MultiPacketCloseResult { + /// Correlation ID of the multi-packet context, if any. + correlation_id: Option, +} + +impl ActiveOutput { + /// Returns `true` if a streaming response is active. + fn is_response(&self) -> bool { matches!(self, Self::Response(_)) } + + /// Returns `true` if a multi-packet channel is active. + fn is_multi_packet(&self) -> bool { matches!(self, Self::MultiPacket(_)) } + + /// Returns a mutable reference to the multi-packet context if active. + fn multi_packet_mut(&mut self) -> Option<&mut MultiPacketContext> { + match self { + Self::MultiPacket(ctx) => Some(ctx), + _ => Option::None, + } + } + + /// Clears the response stream, leaving `None` in its place. + fn clear_response(&mut self) { + if matches!(self, Self::Response(_)) { + *self = Self::None; + } + } + + /// Perform shutdown cleanup and return the result. + /// + /// This takes ownership of the active output, closes any receivers, and + /// returns metadata needed by the caller to complete shutdown handling. + fn shutdown(&mut self) -> ShutdownResult { + match std::mem::replace(self, Self::None) { + Self::MultiPacket(mut ctx) => { + let correlation_id = ctx.correlation_id(); + let source_closed = if let Some(rx) = ctx.channel_mut() { + rx.close(); + true + } else { + false + }; + ShutdownResult { + correlation_id, + source_closed, + call_on_command_end: true, + } + } + Self::Response(_) => ShutdownResult { + correlation_id: None, + source_closed: true, + call_on_command_end: false, + }, + Self::None => ShutdownResult { + correlation_id: None, + source_closed: false, + call_on_command_end: false, + }, + } + } + + /// Begin closing a multi-packet channel. + /// + /// This closes the receiver and returns the correlation ID for logging, + /// but does NOT clear the context yet. The caller must call `clear()` after + /// emitting any terminator frames that need correlation IDs applied. + fn close_multi_packet(&mut self) -> MultiPacketCloseResult { + let correlation_id = self.multi_packet_mut().and_then(|ctx| ctx.correlation_id()); + if let Self::MultiPacket(ctx) = self + && let Some(rx) = ctx.channel_mut() + { + rx.close(); + } + MultiPacketCloseResult { correlation_id } + } + + /// Clear the active output to `None`. + fn clear(&mut self) { *self = Self::None; } +} + +/// Availability flags for event sources polled by the connection actor. +#[expect( + clippy::struct_excessive_bools, + reason = "Availability flags are a natural fit for booleans; no state machine needed" +)] +#[derive(Clone, Copy)] +struct EventAvailability { + high: bool, + low: bool, + multi_packet: bool, + response: bool, +} + impl Default for FairnessConfig { fn default() -> Self { Self { @@ -125,11 +271,12 @@ impl Default for FairnessConfig { pub struct ConnectionActor { high_rx: Option>, low_rx: Option>, - response: Option>, // current streaming response - /// Optional multi-packet channel drained after low-priority frames. - /// This preserves fairness with queued sources. + /// Active output source: either a streaming response or a multi-packet channel. + /// + /// At most one output source can be active at a time. The multi-packet channel + /// is drained after low-priority frames to preserve fairness with queued sources. /// The actor emits the protocol terminator when the sender closes the channel. - multi_packet: MultiPacketContext, + active_output: ActiveOutput, shutdown: CancellationToken, counter: Option, hooks: ProtocolHooks, @@ -202,11 +349,14 @@ where let ConnectionChannels { queues, handle } = channels; let ctx = ConnectionContext; let counter = ActiveConnection::new(); + let active_output = match response { + Some(stream) => ActiveOutput::Response(stream), + None => ActiveOutput::None, + }; let mut actor = Self { high_rx: Some(queues.high_priority_rx), low_rx: Some(queues.low_priority_rx), - response, - multi_packet: MultiPacketContext::new(), + active_output, shutdown, counter: Some(counter), hooks, @@ -242,24 +392,45 @@ where } /// Set or replace the current streaming response. - pub fn set_response(&mut self, stream: Option>) { - debug_assert!( - !self.multi_packet.is_active(), - concat!( - "ConnectionActor invariant violated: cannot set response while a ", - "multi_packet channel is active" - ), - ); - self.response = stream; + /// + /// # Errors + /// + /// Returns [`ConnectionStateError::MultiPacketActive`] if a multi-packet + /// channel is currently active. + pub fn set_response( + &mut self, + stream: Option>, + ) -> Result<(), ConnectionStateError> { + if self.active_output.is_multi_packet() { + return Err(ConnectionStateError::MultiPacketActive); + } + self.active_output = match stream { + Some(s) => ActiveOutput::Response(s), + None => ActiveOutput::None, + }; + Ok(()) } /// Set or replace the current multi-packet response channel. - pub fn set_multi_packet(&mut self, channel: Option>) { - self.set_multi_packet_with_correlation(channel, None); + /// + /// # Errors + /// + /// Returns [`ConnectionStateError::ResponseActive`] if a response stream is + /// currently active. + pub fn set_multi_packet( + &mut self, + channel: Option>, + ) -> Result<(), ConnectionStateError> { + self.set_multi_packet_with_correlation(channel, None) } /// Set or replace the current multi-packet response channel and stamp correlation identifiers. /// + /// # Errors + /// + /// Returns [`ConnectionStateError::ResponseActive`] if a response stream is + /// currently active. + /// /// # Examples /// /// ```no_run @@ -274,25 +445,28 @@ where /// # let shutdown = CancellationToken::new(); /// # let mut actor = ConnectionActor::new(queues, handle, None, shutdown); /// # let (_tx, rx) = mpsc::channel(4); - /// actor.set_multi_packet_with_correlation(Some(rx), Some(7)); + /// actor.set_multi_packet_with_correlation(Some(rx), Some(7))?; + /// # Ok::<(), wireframe::connection::ConnectionStateError>(()) /// ``` pub fn set_multi_packet_with_correlation( &mut self, channel: Option>, correlation_id: Option, - ) { - debug_assert!( - self.response.is_none(), - concat!( - "ConnectionActor invariant violated: cannot set multi_packet while a ", - "response stream is active" - ), - ); - self.multi_packet.install(channel, correlation_id); + ) -> Result<(), ConnectionStateError> { + if self.active_output.is_response() { + return Err(ConnectionStateError::ResponseActive); + } + self.active_output = match channel { + Some(rx) => { + let mut ctx = MultiPacketContext::new(); + ctx.install(Some(rx), correlation_id); + ActiveOutput::MultiPacket(ctx) + } + None => ActiveOutput::None, + }; + Ok(()) } - fn clear_multi_packet(&mut self) { self.multi_packet.clear(); } - /// Replace the low-priority queue used for tests. pub fn set_low_queue(&mut self, queue: Option>) { self.low_rx = queue; } @@ -321,12 +495,10 @@ where return Ok(()); } - debug_assert!( - usize::from(self.response.is_some()) + usize::from(self.multi_packet.is_active()) <= 1, - "ConnectionActor invariant violated: at most one of response or multi_packet may be \ - active" + let mut state = ActorState::new( + self.active_output.is_response(), + self.active_output.is_multi_packet(), ); - let mut state = ActorState::new(self.response.is_some(), self.multi_packet.is_active()); while !state.is_done() { self.poll_sources(&mut state, out).await?; @@ -339,6 +511,16 @@ where Ok(()) } + /// Compute which event sources are currently available for polling. + fn compute_availability(&self, state: &ActorState) -> EventAvailability { + EventAvailability { + high: self.high_rx.is_some(), + low: self.low_rx.is_some(), + multi_packet: self.active_output.is_multi_packet() && !state.is_shutting_down(), + response: self.active_output.is_response() && !state.is_shutting_down(), + } + } + /// Await the next ready event using biased priority ordering. /// /// Shutdown is observed first, followed by high-priority pushes, then @@ -355,19 +537,25 @@ where reason = "tokio::select! expands to modulus operations internally" )] async fn next_event(&mut self, state: &ActorState) -> Event { - let high_available = self.high_rx.is_some(); - let low_available = self.low_rx.is_some(); - let multi_available = self.multi_packet.is_active() && !state.is_shutting_down(); - let resp_available = self.response.is_some() && !state.is_shutting_down(); + let avail = self.compute_availability(state); + + // Extract mutable references before the select! to satisfy the borrow + // checker. Only one of these can be Some due to the ActiveOutput enum + // invariant. + let (multi_rx, response_stream) = match &mut self.active_output { + ActiveOutput::MultiPacket(ctx) => (ctx.channel_mut(), None), + ActiveOutput::Response(stream) => (None, Some(stream)), + ActiveOutput::None => (None, None), + }; tokio::select! { biased; () = Self::wait_shutdown(self.shutdown.clone()), if state.is_active() => Event::Shutdown, - res = Self::poll_queue(self.high_rx.as_mut()), if high_available => Event::High(res), - res = Self::poll_queue(self.low_rx.as_mut()), if low_available => Event::Low(res), - res = Self::poll_queue(self.multi_packet.channel_mut()), if multi_available => Event::MultiPacket(res), - res = Self::poll_response(self.response.as_mut()), if resp_available => Event::Response(res), + res = Self::poll_queue(self.high_rx.as_mut()), if avail.high => Event::High(res), + res = Self::poll_queue(self.low_rx.as_mut()), if avail.low => Event::Low(res), + res = Self::poll_queue(multi_rx), if avail.multi_packet => Event::MultiPacket(res), + res = Self::poll_response(response_stream), if avail.response => Event::Response(res), else => Event::Idle, } } diff --git a/src/connection/multi_packet.rs b/src/connection/multi_packet.rs index 05e678b6..c3a5de09 100644 --- a/src/connection/multi_packet.rs +++ b/src/connection/multi_packet.rs @@ -75,15 +75,8 @@ impl MultiPacketContext { self.stamp = stamp; } - pub(super) fn clear(&mut self) { - self.channel = None; - self.stamp = MultiPacketStamp::Disabled; - } - pub(super) fn channel_mut(&mut self) -> Option<&mut mpsc::Receiver> { self.channel.as_mut() } - pub(super) fn take_channel(&mut self) -> Option> { self.channel.take() } - /// Returns `true` if correlation stamping is enabled. pub(super) fn is_stamping_enabled(&self) -> bool { matches!(self.stamp, MultiPacketStamp::Enabled(_)) @@ -95,6 +88,4 @@ impl MultiPacketContext { MultiPacketStamp::Disabled => None, } } - - pub(super) fn is_active(&self) -> bool { self.channel.is_some() } } diff --git a/src/connection/response.rs b/src/connection/response.rs index c9cca750..91a5c9b1 100644 --- a/src/connection/response.rs +++ b/src/connection/response.rs @@ -28,7 +28,7 @@ where self.after_low(); } if is_none { - self.response = None; + self.active_output.clear_response(); } Ok(()) } @@ -57,7 +57,7 @@ where state.mark_closed(); // Stop polling the response after a protocol error to avoid // double-closing and duplicate `on_command_end` signalling. - self.response = None; + self.active_output.clear_response(); self.hooks.on_command_end(&mut self.ctx); crate::metrics::inc_handler_errors(); } diff --git a/src/connection/shutdown.rs b/src/connection/shutdown.rs index 13ad033a..8724d873 100644 --- a/src/connection/shutdown.rs +++ b/src/connection/shutdown.rs @@ -24,20 +24,23 @@ where if let Some(rx) = &mut self.low_rx { rx.close(); } - if self.multi_packet.is_active() { - let correlation = self.multi_packet.correlation_id(); - self.log_multi_packet_closure(MultiPacketTerminationReason::Shutdown, correlation); - if let Some(mut rx) = self.multi_packet.take_channel() { - rx.close(); - state.mark_closed(); - } - self.clear_multi_packet(); - self.hooks.on_command_end(&mut self.ctx); - } - if self.response.take().is_some() { + // Check if multi-packet is active before shutdown clears it + let was_multi_packet = self.active_output.is_multi_packet(); + let result = self.active_output.shutdown(); + + if result.source_closed { state.mark_closed(); } + if was_multi_packet { + self.log_multi_packet_closure( + MultiPacketTerminationReason::Shutdown, + result.correlation_id, + ); + } + if result.call_on_command_end { + self.hooks.on_command_end(&mut self.ctx); + } } /// Handle a closed multi-packet channel by emitting the protocol terminator @@ -48,17 +51,14 @@ where state: &mut ActorState, out: &mut Vec, ) { - let correlation = self.multi_packet.correlation_id(); - self.log_multi_packet_closure(reason, correlation); - if let Some(mut receiver) = self.multi_packet.take_channel() { - receiver.close(); - } + let result = self.active_output.close_multi_packet(); + self.log_multi_packet_closure(reason, result.correlation_id); state.mark_closed(); if let Some(frame) = self.hooks.stream_end_frame(&mut self.ctx) { self.emit_multi_packet_frame(frame, out); self.after_low(); } - self.clear_multi_packet(); + self.active_output.clear(); self.hooks.on_command_end(&mut self.ctx); } diff --git a/src/connection/test_support.rs b/src/connection/test_support.rs index 473ca0a9..ef7a75b4 100644 --- a/src/connection/test_support.rs +++ b/src/connection/test_support.rs @@ -113,8 +113,16 @@ impl ActorHarness { } /// Replace the multi-packet receiver. - pub fn set_multi_queue(&mut self, queue: Option>) { - self.actor.set_multi_packet(queue); + /// + /// # Errors + /// + /// Returns [`crate::connection::ConnectionStateError`] if a response stream + /// is currently active. + pub fn set_multi_queue( + &mut self, + queue: Option>, + ) -> Result<(), crate::connection::ConnectionStateError> { + self.actor.set_multi_packet(queue) } /// Returns `true` when the low-priority queue is still available. @@ -123,7 +131,7 @@ impl ActorHarness { /// Returns `true` when the multi-packet queue is still available. #[must_use] - pub fn has_multi_queue(&self) -> bool { self.actor.multi_packet.is_active() } + pub fn has_multi_queue(&self) -> bool { self.actor.active_output.is_multi_packet() } /// Process a multi-packet poll result. pub fn process_multi_packet(&mut self, res: Option) { @@ -238,7 +246,7 @@ mod tests { fn has_multi_queue_true_after_install() { let mut harness = ActorHarness::new().expect("build ActorHarness"); let (_tx, rx) = mpsc::channel(1); - harness.set_multi_queue(Some(rx)); + harness.set_multi_queue(Some(rx)).expect("set_multi_queue"); assert!( harness.has_multi_queue(), "multi-packet queue should be active after install" @@ -249,12 +257,12 @@ mod tests { fn has_multi_queue_false_after_clear() { let mut harness = ActorHarness::new().expect("build ActorHarness"); let (_tx, rx) = mpsc::channel(1); - harness.set_multi_queue(Some(rx)); + harness.set_multi_queue(Some(rx)).expect("set_multi_queue"); assert!( harness.has_multi_queue(), "multi-packet queue should be active after install" ); - harness.set_multi_queue(None); + harness.set_multi_queue(None).expect("clear multi_queue"); assert!( !harness.has_multi_queue(), "multi-packet queue should be inactive after clear" diff --git a/tests/connection.rs b/tests/connection.rs index 99598782..1350605f 100644 --- a/tests/connection.rs +++ b/tests/connection.rs @@ -260,7 +260,7 @@ fn process_multi_packet_none_emits_end_frame(harness_factory: HarnessFactory) -> .with_stream_end(|_| Some(9)), )?; let (_tx, rx) = mpsc::channel(1); - harness.set_multi_queue(Some(rx)); + harness.set_multi_queue(Some(rx))?; harness.process_multi_packet(None); @@ -292,7 +292,7 @@ fn handle_multi_packet_closed_behaviour( .with_stream_end(move |_| terminator), )?; let (_tx, rx) = mpsc::channel(1); - harness.set_multi_queue(Some(rx)); + harness.set_multi_queue(Some(rx))?; harness.handle_multi_packet_closed(); @@ -361,7 +361,7 @@ fn handle_multi_packet_closed_logs_reason( let (_tx, rx) = mpsc::channel(1); harness .actor_mut() - .set_multi_packet_with_correlation(Some(rx), Some(11)); + .set_multi_packet_with_correlation(Some(rx), Some(11))?; logger.clear(); harness.handle_multi_packet_closed(); assert_reason_logged(&mut logger, Level::Info, "drained", Some(11)); @@ -383,7 +383,7 @@ fn try_opportunistic_drain_multi_disconnect_logs_reason( let (tx, rx) = mpsc::channel(1); harness .actor_mut() - .set_multi_packet_with_correlation(Some(rx), Some(12)); + .set_multi_packet_with_correlation(Some(rx), Some(12))?; drop(tx); logger.clear(); let drained = harness.try_drain_multi(); @@ -409,7 +409,7 @@ fn start_shutdown_logs_reason( let (_tx, rx) = mpsc::channel(1); harness .actor_mut() - .set_multi_packet_with_correlation(Some(rx), Some(13)); + .set_multi_packet_with_correlation(Some(rx), Some(13))?; logger.clear(); harness.start_shutdown(); assert_reason_logged(&mut logger, Level::Info, "shutdown", Some(13)); @@ -429,7 +429,7 @@ fn try_opportunistic_drain_multi_disconnect_emits_terminator( .with_stream_end(|_| Some(5)), )?; let (tx, rx) = mpsc::channel(1); - harness.set_multi_queue(Some(rx)); + harness.set_multi_queue(Some(rx))?; drop(tx); let drained = harness.try_drain_multi(); diff --git a/tests/correlation_id.rs b/tests/correlation_id.rs index 3337dcee..ddb57694 100644 --- a/tests/correlation_id.rs +++ b/tests/correlation_id.rs @@ -79,7 +79,9 @@ async fn run_multi_packet_channel( shutdown, hooks, ); - actor.set_multi_packet_with_correlation(Some(rx), request_correlation); + actor + .set_multi_packet_with_correlation(Some(rx), request_correlation) + .map_err(|e| io::Error::other(format!("set_multi_packet_with_correlation: {e}")))?; let mut out = Vec::new(); actor diff --git a/tests/fixtures/correlation.rs b/tests/fixtures/correlation.rs index 7298757d..d0c6c814 100644 --- a/tests/fixtures/correlation.rs +++ b/tests/fixtures/correlation.rs @@ -80,7 +80,9 @@ impl CorrelationWorld { let shutdown = CancellationToken::new(); let mut actor: ConnectionActor = ConnectionActor::new(queues, handle, None, shutdown); - actor.set_multi_packet_with_correlation(Some(rx), expected); + actor + .set_multi_packet_with_correlation(Some(rx), expected) + .map_err(|e| format!("set_multi_packet_with_correlation failed: {e}"))?; actor .run(&mut self.frames) .await diff --git a/tests/fixtures/multi_packet.rs b/tests/fixtures/multi_packet.rs index cfc5ee7b..c38ae6f4 100644 --- a/tests/fixtures/multi_packet.rs +++ b/tests/fixtures/multi_packet.rs @@ -43,7 +43,9 @@ impl MultiPacketWorld { let shutdown = CancellationToken::new(); let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, handle, None, shutdown); - actor.set_multi_packet(Some(rx)); + actor + .set_multi_packet(Some(rx)) + .map_err(|e| format!("set_multi_packet failed: {e}"))?; let mut frames = Vec::new(); actor diff --git a/tests/fixtures/stream_end.rs b/tests/fixtures/stream_end.rs index 0b01d274..f127db64 100644 --- a/tests/fixtures/stream_end.rs +++ b/tests/fixtures/stream_end.rs @@ -87,7 +87,9 @@ impl StreamEndWorld { shutdown, hooks, ); - actor.set_multi_packet(Some(rx)); + actor + .set_multi_packet(Some(rx)) + .map_err(|e| format!("set_multi_packet failed: {e}"))?; actor .run(&mut temp.frames) .await @@ -141,7 +143,8 @@ impl StreamEndWorld { let (tx, rx) = mpsc::channel(4); harness .actor_mut() - .set_multi_packet_with_correlation(Some(rx), Some(correlation_id)); + .set_multi_packet_with_correlation(Some(rx), Some(correlation_id)) + .map_err(|e| format!("set_multi_packet_with_correlation failed: {e}"))?; match mode { MultiPacketMode::Disconnect { send_frames } => { if *send_frames { diff --git a/tests/multi_packet.rs b/tests/multi_packet.rs index 383d8fa6..0a72cd26 100644 --- a/tests/multi_packet.rs +++ b/tests/multi_packet.rs @@ -99,7 +99,9 @@ async fn connection_actor_drains_multi_packet_channel( let (queues, handle, shutdown) = actor_components?; let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, handle, None, shutdown); - actor.set_multi_packet(Some(rx)); + actor + .set_multi_packet(Some(rx)) + .map_err(|e| boxed_err("set_multi_packet error", e))?; let mut out = Vec::new(); actor @@ -134,7 +136,9 @@ async fn connection_actor_interleaves_multi_packet_and_priority_frames( max_high_before_low: 1, time_slice: None, }); - actor.set_multi_packet(Some(multi_rx)); + actor + .set_multi_packet(Some(multi_rx)) + .map_err(|e| boxed_err("set_multi_packet error", e))?; let mut out = Vec::new(); actor @@ -154,7 +158,9 @@ async fn shutdown_completes_multi_packet_channel( let (queues, handle, shutdown) = actor_components?; let (tx, rx) = mpsc::channel(1); let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, handle, None, shutdown); - actor.set_multi_packet(Some(rx)); + actor + .set_multi_packet(Some(rx)) + .map_err(|e| boxed_err("set_multi_packet error", e))?; let cancel = actor.shutdown_token(); @@ -188,7 +194,9 @@ async fn shutdown_during_active_multi_packet_send( let (queues, handle, shutdown) = actor_components?; let (tx, rx) = mpsc::channel(4); let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, handle, None, shutdown); - actor.set_multi_packet(Some(rx)); + actor + .set_multi_packet(Some(rx)) + .map_err(|e| boxed_err("set_multi_packet error", e))?; let cancel = actor.shutdown_token(); diff --git a/tests/multi_packet_streaming.rs b/tests/multi_packet_streaming.rs index 07249de5..bf493867 100644 --- a/tests/multi_packet_streaming.rs +++ b/tests/multi_packet_streaming.rs @@ -129,7 +129,8 @@ async fn client_receives_multi_packet_stream_with_terminator() -> TestResult<()> harness .actor - .set_multi_packet_with_correlation(Some(rx), correlation); + .set_multi_packet_with_correlation(Some(rx), correlation) + .map_err(|e| format!("set_multi_packet_with_correlation: {e}"))?; harness.release_handle(); @@ -178,7 +179,8 @@ async fn multi_packet_logs_disconnected_when_sender_dropped( harness .actor - .set_multi_packet_with_correlation(Some(rx), correlation); + .set_multi_packet_with_correlation(Some(rx), correlation) + .map_err(|e| format!("set_multi_packet_with_correlation: {e}"))?; harness.actor.set_fairness(interleaving_fairness()); @@ -311,7 +313,8 @@ async fn interleaved_multi_packet_and_push_frames_preserve_correlations() -> Tes harness .actor - .set_multi_packet_with_correlation(Some(rx), stream_correlation); + .set_multi_packet_with_correlation(Some(rx), stream_correlation) + .map_err(|e| format!("set_multi_packet_with_correlation: {e}"))?; harness.actor.set_fairness(interleaving_fairness()); push_interleaved_frames(harness.handle()?).await?; diff --git a/tests/stream_end.rs b/tests/stream_end.rs index d4f68af4..ad605a4e 100644 --- a/tests/stream_end.rs +++ b/tests/stream_end.rs @@ -80,7 +80,9 @@ async fn multi_packet_emits_end_frame( shutdown, hooks, ); - actor.set_multi_packet(Some(rx)); + actor + .set_multi_packet(Some(rx)) + .map_err(|e| std::io::Error::other(format!("set_multi_packet: {e}")))?; let mut out = Vec::new(); actor @@ -121,7 +123,9 @@ async fn multi_packet_respects_no_terminator( shutdown, hooks, ); - actor.set_multi_packet(Some(rx)); + actor + .set_multi_packet(Some(rx)) + .map_err(|e| std::io::Error::other(format!("set_multi_packet: {e}")))?; let mut out = Vec::new(); actor @@ -150,7 +154,9 @@ async fn multi_packet_empty_channel_emits_end( shutdown, hooks, ); - actor.set_multi_packet(Some(rx)); + actor + .set_multi_packet(Some(rx)) + .map_err(|e| std::io::Error::other(format!("set_multi_packet: {e}")))?; let mut out = Vec::new(); actor @@ -188,7 +194,9 @@ async fn multi_packet_empty_channel_no_terminator_emits_nothing( shutdown, hooks, ); - actor.set_multi_packet(Some(rx)); + actor + .set_multi_packet(Some(rx)) + .map_err(|e| std::io::Error::other(format!("set_multi_packet: {e}")))?; let mut out = Vec::new(); actor From c7675700b92bf711abbf72d83ae4444fc45a68e0 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sat, 31 Jan 2026 01:58:04 +0000 Subject: [PATCH 2/8] Use thiserror for ConnectionStateError MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace manual Display and Error implementations with thiserror derive macro annotations, co-locating error messages with their variants. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- src/connection/mod.rs | 23 ++++------------------- 1 file changed, 4 insertions(+), 19 deletions(-) diff --git a/src/connection/mod.rs b/src/connection/mod.rs index 26f2153f..f4049e05 100644 --- a/src/connection/mod.rs +++ b/src/connection/mod.rs @@ -16,7 +16,6 @@ mod shutdown; mod state; use std::{ - fmt, net::SocketAddr, sync::{ Arc, @@ -28,6 +27,7 @@ use event::Event; use log::info; use multi_packet::MultiPacketContext; use state::ActorState; +use thiserror::Error; use tokio::{sync::mpsc, time::Duration}; use tokio_util::sync::CancellationToken; @@ -99,33 +99,18 @@ impl ConnectionChannels { /// Error returned when attempting to set an active output source while /// another source is already active. -#[derive(Clone, Copy, Debug, PartialEq, Eq)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Error)] pub enum ConnectionStateError { /// A multi-packet channel is currently active and must be cleared before /// setting a response stream. + #[error("cannot set response while a multi-packet channel is active")] MultiPacketActive, /// A response stream is currently active and must be cleared before /// setting a multi-packet channel. + #[error("cannot set multi-packet channel while a response stream is active")] ResponseActive, } -impl fmt::Display for ConnectionStateError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::MultiPacketActive => write!( - f, - "cannot set response while a multi-packet channel is active" - ), - Self::ResponseActive => write!( - f, - "cannot set multi-packet channel while a response stream is active" - ), - } - } -} - -impl std::error::Error for ConnectionStateError {} - /// Active output source for the connection actor. /// /// At most one output source can be active at a time. This enum makes the From 7adcbe40b25d9f8dd66e787cc0eeac6ff36145ca Mon Sep 17 00:00:00 2001 From: Leynos Date: Sat, 31 Jan 2026 12:45:09 +0000 Subject: [PATCH 3/8] Extract ActiveOutput types into output submodule MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move ActiveOutput, ShutdownResult, MultiPacketCloseResult, and EventAvailability into a dedicated output submodule to reduce the size of the main connection module. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- src/connection/mod.rs | 118 +------------------------------------- src/connection/output.rs | 120 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 122 insertions(+), 116 deletions(-) create mode 100644 src/connection/output.rs diff --git a/src/connection/mod.rs b/src/connection/mod.rs index f4049e05..46c90ed1 100644 --- a/src/connection/mod.rs +++ b/src/connection/mod.rs @@ -10,6 +10,7 @@ mod drain; mod event; mod frame; mod multi_packet; +mod output; mod polling; mod response; mod shutdown; @@ -26,6 +27,7 @@ use std::{ use event::Event; use log::info; use multi_packet::MultiPacketContext; +use output::{ActiveOutput, EventAvailability}; use state::ActorState; use thiserror::Error; use tokio::{sync::mpsc, time::Duration}; @@ -111,122 +113,6 @@ pub enum ConnectionStateError { ResponseActive, } -/// Active output source for the connection actor. -/// -/// At most one output source can be active at a time. This enum makes the -/// mutual exclusion compile-time enforced rather than runtime-asserted. -enum ActiveOutput { - /// No output source is active. - None, - /// A streaming response is active. - Response(FrameStream), - /// A multi-packet channel is active. - MultiPacket(MultiPacketContext), -} - -/// Result of shutting down an active output source. -struct ShutdownResult { - /// Correlation ID of the multi-packet context, if any. - correlation_id: Option, - /// Whether the source should be marked as closed in `ActorState`. - source_closed: bool, - /// Whether `on_command_end` hook should be called. - call_on_command_end: bool, -} - -/// Result of closing an active multi-packet channel. -struct MultiPacketCloseResult { - /// Correlation ID of the multi-packet context, if any. - correlation_id: Option, -} - -impl ActiveOutput { - /// Returns `true` if a streaming response is active. - fn is_response(&self) -> bool { matches!(self, Self::Response(_)) } - - /// Returns `true` if a multi-packet channel is active. - fn is_multi_packet(&self) -> bool { matches!(self, Self::MultiPacket(_)) } - - /// Returns a mutable reference to the multi-packet context if active. - fn multi_packet_mut(&mut self) -> Option<&mut MultiPacketContext> { - match self { - Self::MultiPacket(ctx) => Some(ctx), - _ => Option::None, - } - } - - /// Clears the response stream, leaving `None` in its place. - fn clear_response(&mut self) { - if matches!(self, Self::Response(_)) { - *self = Self::None; - } - } - - /// Perform shutdown cleanup and return the result. - /// - /// This takes ownership of the active output, closes any receivers, and - /// returns metadata needed by the caller to complete shutdown handling. - fn shutdown(&mut self) -> ShutdownResult { - match std::mem::replace(self, Self::None) { - Self::MultiPacket(mut ctx) => { - let correlation_id = ctx.correlation_id(); - let source_closed = if let Some(rx) = ctx.channel_mut() { - rx.close(); - true - } else { - false - }; - ShutdownResult { - correlation_id, - source_closed, - call_on_command_end: true, - } - } - Self::Response(_) => ShutdownResult { - correlation_id: None, - source_closed: true, - call_on_command_end: false, - }, - Self::None => ShutdownResult { - correlation_id: None, - source_closed: false, - call_on_command_end: false, - }, - } - } - - /// Begin closing a multi-packet channel. - /// - /// This closes the receiver and returns the correlation ID for logging, - /// but does NOT clear the context yet. The caller must call `clear()` after - /// emitting any terminator frames that need correlation IDs applied. - fn close_multi_packet(&mut self) -> MultiPacketCloseResult { - let correlation_id = self.multi_packet_mut().and_then(|ctx| ctx.correlation_id()); - if let Self::MultiPacket(ctx) = self - && let Some(rx) = ctx.channel_mut() - { - rx.close(); - } - MultiPacketCloseResult { correlation_id } - } - - /// Clear the active output to `None`. - fn clear(&mut self) { *self = Self::None; } -} - -/// Availability flags for event sources polled by the connection actor. -#[expect( - clippy::struct_excessive_bools, - reason = "Availability flags are a natural fit for booleans; no state machine needed" -)] -#[derive(Clone, Copy)] -struct EventAvailability { - high: bool, - low: bool, - multi_packet: bool, - response: bool, -} - impl Default for FairnessConfig { fn default() -> Self { Self { diff --git a/src/connection/output.rs b/src/connection/output.rs new file mode 100644 index 00000000..1304e09d --- /dev/null +++ b/src/connection/output.rs @@ -0,0 +1,120 @@ +//! Active output source types for the connection actor. + +use super::multi_packet::MultiPacketContext; +use crate::response::FrameStream; + +/// Active output source for the connection actor. +/// +/// At most one output source can be active at a time. This enum makes the +/// mutual exclusion compile-time enforced rather than runtime-asserted. +pub(super) enum ActiveOutput { + /// No output source is active. + None, + /// A streaming response is active. + Response(FrameStream), + /// A multi-packet channel is active. + MultiPacket(MultiPacketContext), +} + +/// Result of shutting down an active output source. +pub(super) struct ShutdownResult { + /// Correlation ID of the multi-packet context, if any. + pub(super) correlation_id: Option, + /// Whether the source should be marked as closed in `ActorState`. + pub(super) source_closed: bool, + /// Whether `on_command_end` hook should be called. + pub(super) call_on_command_end: bool, +} + +/// Result of closing an active multi-packet channel. +pub(super) struct MultiPacketCloseResult { + /// Correlation ID of the multi-packet context, if any. + pub(super) correlation_id: Option, +} + +/// Availability flags for event sources polled by the connection actor. +#[expect( + clippy::struct_excessive_bools, + reason = "Availability flags are a natural fit for booleans; no state machine needed" +)] +#[derive(Clone, Copy)] +pub(super) struct EventAvailability { + pub(super) high: bool, + pub(super) low: bool, + pub(super) multi_packet: bool, + pub(super) response: bool, +} + +impl ActiveOutput { + /// Returns `true` if a streaming response is active. + pub(super) fn is_response(&self) -> bool { matches!(self, Self::Response(_)) } + + /// Returns `true` if a multi-packet channel is active. + pub(super) fn is_multi_packet(&self) -> bool { matches!(self, Self::MultiPacket(_)) } + + /// Returns a mutable reference to the multi-packet context if active. + pub(super) fn multi_packet_mut(&mut self) -> Option<&mut MultiPacketContext> { + match self { + Self::MultiPacket(ctx) => Some(ctx), + _ => Option::None, + } + } + + /// Clears the response stream, leaving `None` in its place. + pub(super) fn clear_response(&mut self) { + if matches!(self, Self::Response(_)) { + *self = Self::None; + } + } + + /// Perform shutdown cleanup and return the result. + /// + /// This takes ownership of the active output, closes any receivers, and + /// returns metadata needed by the caller to complete shutdown handling. + pub(super) fn shutdown(&mut self) -> ShutdownResult { + match std::mem::replace(self, Self::None) { + Self::MultiPacket(mut ctx) => { + let correlation_id = ctx.correlation_id(); + let source_closed = if let Some(rx) = ctx.channel_mut() { + rx.close(); + true + } else { + false + }; + ShutdownResult { + correlation_id, + source_closed, + call_on_command_end: true, + } + } + Self::Response(_) => ShutdownResult { + correlation_id: None, + source_closed: true, + call_on_command_end: false, + }, + Self::None => ShutdownResult { + correlation_id: None, + source_closed: false, + call_on_command_end: false, + }, + } + } + + /// Begin closing a multi-packet channel. + /// + /// This closes the receiver and returns the correlation ID for logging, + /// but does NOT clear the context yet. The caller must call `clear()` after + /// emitting any terminator frames that need correlation IDs applied. + pub(super) fn close_multi_packet(&mut self) -> MultiPacketCloseResult { + let correlation_id = self.multi_packet_mut().and_then(|ctx| ctx.correlation_id()); + if let Self::MultiPacket(ctx) = self + && let Some(rx) = ctx.channel_mut() + { + rx.close(); + } + MultiPacketCloseResult { correlation_id } + } + + /// Clear the active output to `None`. + pub(super) fn clear(&mut self) { *self = Self::None; } +} From 8263d6631a8e7869abbe5315dbccf3dfd65eecc5 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sat, 31 Jan 2026 16:20:27 +0000 Subject: [PATCH 4/8] Reduce connection/mod.rs from 451 to 397 lines MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extract types to dedicated submodules for better organization: - DrainContext and QueueKind moved to drain.rs - ActiveConnection counter types moved to new counter.rs - ConnectionChannels moved to new channels.rs All types are re-exported as needed to maintain the public API. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- src/connection/channels.rs | 17 ++++++++ src/connection/counter.rs | 32 +++++++++++++++ src/connection/drain.rs | 22 +++++++---- src/connection/mod.rs | 72 +++++----------------------------- src/connection/test_support.rs | 3 +- 5 files changed, 74 insertions(+), 72 deletions(-) create mode 100644 src/connection/channels.rs create mode 100644 src/connection/counter.rs diff --git a/src/connection/channels.rs b/src/connection/channels.rs new file mode 100644 index 00000000..b18cd14a --- /dev/null +++ b/src/connection/channels.rs @@ -0,0 +1,17 @@ +//! Connection channel bundling for actor construction. + +use crate::push::{PushHandle, PushQueues}; + +/// Bundles push queues with their shared handle for actor construction. +pub struct ConnectionChannels { + /// Receivers for high- and low-priority frames consumed by the actor. + pub queues: PushQueues, + /// Handle cloned by producers to enqueue frames into the shared queues. + pub handle: PushHandle, +} + +impl ConnectionChannels { + /// Create a new bundle of push queues and their associated handle. + #[must_use] + pub fn new(queues: PushQueues, handle: PushHandle) -> Self { Self { queues, handle } } +} diff --git a/src/connection/counter.rs b/src/connection/counter.rs new file mode 100644 index 00000000..c20f33a8 --- /dev/null +++ b/src/connection/counter.rs @@ -0,0 +1,32 @@ +//! Active connection counting and RAII guard. + +use std::sync::atomic::{AtomicU64, Ordering}; + +/// Global gauge tracking active connections. +static ACTIVE_CONNECTIONS: AtomicU64 = AtomicU64::new(0); + +/// RAII guard incrementing [`ACTIVE_CONNECTIONS`] on creation and +/// decrementing it on drop. +pub(super) struct ActiveConnection; + +impl ActiveConnection { + pub(super) fn new() -> Self { + ACTIVE_CONNECTIONS.fetch_add(1, Ordering::Relaxed); + crate::metrics::inc_connections(); + Self + } +} + +impl Drop for ActiveConnection { + fn drop(&mut self) { + ACTIVE_CONNECTIONS.fetch_sub(1, Ordering::Relaxed); + crate::metrics::dec_connections(); + } +} + +/// Return the current number of active connections. +#[must_use] +pub fn active_connection_count() -> u64 { ACTIVE_CONNECTIONS.load(Ordering::Relaxed) } + +/// Load the current count for logging purposes. +pub(super) fn current_count() -> u64 { ACTIVE_CONNECTIONS.load(Ordering::Relaxed) } diff --git a/src/connection/drain.rs b/src/connection/drain.rs index b6de2fb6..2f93482c 100644 --- a/src/connection/drain.rs +++ b/src/connection/drain.rs @@ -2,15 +2,23 @@ use tokio::sync::mpsc::error::TryRecvError; -use super::{ - ConnectionActor, - DrainContext, - QueueKind, - multi_packet::MultiPacketTerminationReason, - state::ActorState, -}; +use super::{ConnectionActor, multi_packet::MultiPacketTerminationReason, state::ActorState}; use crate::{app::Packet, correlation::CorrelatableFrame, push::FrameLike}; +/// Context for drain operations containing mutable references to output and actor state. +pub(super) struct DrainContext<'a, F> { + pub(super) out: &'a mut Vec, + pub(super) state: &'a mut ActorState, +} + +/// Queue variants processed by the connection actor. +#[derive(Clone, Copy)] +pub(super) enum QueueKind { + High, + Low, + Multi, +} + impl ConnectionActor where F: FrameLike + CorrelatableFrame + Packet, diff --git a/src/connection/mod.rs b/src/connection/mod.rs index 46c90ed1..6f07b558 100644 --- a/src/connection/mod.rs +++ b/src/connection/mod.rs @@ -5,6 +5,8 @@ //! `biased` keyword ensures high-priority messages are processed before //! low-priority ones, with streamed responses handled last. +mod channels; +mod counter; mod dispatch; mod drain; mod event; @@ -16,14 +18,11 @@ mod response; mod shutdown; mod state; -use std::{ - net::SocketAddr, - sync::{ - Arc, - atomic::{AtomicU64, Ordering}, - }, -}; +use std::{net::SocketAddr, sync::Arc}; +pub use channels::ConnectionChannels; +use counter::ActiveConnection; +pub use counter::active_connection_count; use event::Event; use log::info; use multi_packet::MultiPacketContext; @@ -33,32 +32,6 @@ use thiserror::Error; use tokio::{sync::mpsc, time::Duration}; use tokio_util::sync::CancellationToken; -/// Global gauge tracking active connections. -static ACTIVE_CONNECTIONS: AtomicU64 = AtomicU64::new(0); - -/// RAII guard incrementing [`ACTIVE_CONNECTIONS`] on creation and -/// decrementing it on drop. -struct ActiveConnection; - -impl ActiveConnection { - fn new() -> Self { - ACTIVE_CONNECTIONS.fetch_add(1, Ordering::Relaxed); - crate::metrics::inc_connections(); - Self - } -} - -impl Drop for ActiveConnection { - fn drop(&mut self) { - ACTIVE_CONNECTIONS.fetch_sub(1, Ordering::Relaxed); - crate::metrics::dec_connections(); - } -} - -/// Return the current number of active connections. -#[must_use] -pub fn active_connection_count() -> u64 { ACTIVE_CONNECTIONS.load(Ordering::Relaxed) } - use crate::{ app::Packet, correlation::CorrelatableFrame, @@ -85,20 +58,6 @@ pub struct FairnessConfig { pub time_slice: Option, } -/// Bundles push queues with their shared handle for actor construction. -pub struct ConnectionChannels { - /// Receivers for high- and low-priority frames consumed by the actor. - pub queues: PushQueues, - /// Handle cloned by producers to enqueue frames into the shared queues. - pub handle: PushHandle, -} - -impl ConnectionChannels { - /// Create a new bundle of push queues and their associated handle. - #[must_use] - pub fn new(queues: PushQueues, handle: PushHandle) -> Self { Self { queues, handle } } -} - /// Error returned when attempting to set an active output source while /// another source is already active. #[derive(Clone, Copy, Debug, PartialEq, Eq, Error)] @@ -158,20 +117,6 @@ pub struct ConnectionActor { peer_addr: Option, } -/// Context for drain operations containing mutable references to output and actor state. -struct DrainContext<'a, F> { - out: &'a mut Vec, - state: &'a mut ActorState, -} - -/// Queue variants processed by the connection actor. -#[derive(Clone, Copy)] -enum QueueKind { - High, - Low, - Multi, -} - impl ConnectionActor where F: FrameLike + CorrelatableFrame + Packet, @@ -237,10 +182,11 @@ where connection_id: None, peer_addr: None, }; - let current = ACTIVE_CONNECTIONS.load(Ordering::Relaxed); info!( "connection opened: wireframe_active_connections={}, id={:?}, peer={:?}", - current, actor.connection_id, actor.peer_addr + counter::current_count(), + actor.connection_id, + actor.peer_addr ); actor.hooks.on_connection_setup(handle, &mut actor.ctx); actor diff --git a/src/connection/test_support.rs b/src/connection/test_support.rs index ef7a75b4..ced909e2 100644 --- a/src/connection/test_support.rs +++ b/src/connection/test_support.rs @@ -9,8 +9,7 @@ use tokio_util::sync::CancellationToken; use super::{ ConnectionActor, ConnectionChannels, - DrainContext, - QueueKind, + drain::{DrainContext, QueueKind}, multi_packet::MultiPacketTerminationReason, state::ActorState, }; From 86bed8dda972d5db2c0aa73cfa2abc9a1f5ed1ec Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 1 Feb 2026 03:26:13 +0000 Subject: [PATCH 5/8] test(connection): parametrize multi-queue tests using rstest fixtures Refactored tests in src/connection/test_support.rs to use rstest fixtures and parameterized test cases. Consolidated three separate tests of the multi-queue state (default, after install, after clear) into a single parametrized test to improve readability and reduce duplication. Co-authored-by: terragon-labs[bot] --- src/connection/test_support.rs | 54 +++++++++++++++------------------- 1 file changed, 24 insertions(+), 30 deletions(-) diff --git a/src/connection/test_support.rs b/src/connection/test_support.rs index ced909e2..a56607a5 100644 --- a/src/connection/test_support.rs +++ b/src/connection/test_support.rs @@ -228,43 +228,37 @@ pub async fn poll_queue_next(rx: Option<&mut mpsc::Receiver>) -> Option #[cfg(test)] mod tests { + use rstest::{fixture, rstest}; use tokio::sync::mpsc; use super::*; - #[test] - fn has_multi_queue_false_by_default() { - let harness = ActorHarness::new().expect("build ActorHarness"); - assert!( - !harness.has_multi_queue(), - "multi-packet queue should start inactive" - ); - } - - #[test] - fn has_multi_queue_true_after_install() { - let mut harness = ActorHarness::new().expect("build ActorHarness"); - let (_tx, rx) = mpsc::channel(1); - harness.set_multi_queue(Some(rx)).expect("set_multi_queue"); - assert!( - harness.has_multi_queue(), - "multi-packet queue should be active after install" - ); + #[fixture] + fn harness() -> ActorHarness { + ActorHarness::new().expect("build ActorHarness") } - #[test] - fn has_multi_queue_false_after_clear() { - let mut harness = ActorHarness::new().expect("build ActorHarness"); - let (_tx, rx) = mpsc::channel(1); - harness.set_multi_queue(Some(rx)).expect("set_multi_queue"); - assert!( + #[rstest] + #[case::default(false, false, false)] + #[case::install(true, false, true)] + #[case::clear(true, true, false)] + fn has_multi_queue_states( + #[case] install: bool, + #[case] clear: bool, + #[case] expected: bool, + mut harness: ActorHarness, + ) { + if install { + let (_tx, rx) = mpsc::channel(1); + harness.set_multi_queue(Some(rx)).expect("set_multi_queue"); + } + if clear { + harness.set_multi_queue(None).expect("clear multi_queue"); + } + assert_eq!( harness.has_multi_queue(), - "multi-packet queue should be active after install" - ); - harness.set_multi_queue(None).expect("clear multi_queue"); - assert!( - !harness.has_multi_queue(), - "multi-packet queue should be inactive after clear" + expected, + "multi-packet queue state mismatch" ); } } From 851e350ce1466357ff1c6c0c20ecb552abbe55c1 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 1 Feb 2026 03:33:44 +0000 Subject: [PATCH 6/8] docs(connection): add comment explaining ActorHarness fixture Added a comment to the harness() fixture in test_support.rs that explains it provides an ActorHarness for parameterised multi-queue state tests. Co-authored-by: terragon-labs[bot] --- src/connection/test_support.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/connection/test_support.rs b/src/connection/test_support.rs index a56607a5..2ec55045 100644 --- a/src/connection/test_support.rs +++ b/src/connection/test_support.rs @@ -235,6 +235,7 @@ mod tests { #[fixture] fn harness() -> ActorHarness { + // Provides an ActorHarness for parameterised multi-queue state tests. ActorHarness::new().expect("build ActorHarness") } From 33c59c97705858f16aca109bdfeb741f0b4e2983 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 1 Feb 2026 03:53:43 +0000 Subject: [PATCH 7/8] test(connection): add module doc for ActorHarness tests with rstest Added documentation comment for the tests module describing that it contains unit tests for the `ActorHarness` fixture using parameterised `rstest` cases. Co-authored-by: terragon-labs[bot] --- src/connection/test_support.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/connection/test_support.rs b/src/connection/test_support.rs index 2ec55045..8b2f08f3 100644 --- a/src/connection/test_support.rs +++ b/src/connection/test_support.rs @@ -228,6 +228,8 @@ pub async fn poll_queue_next(rx: Option<&mut mpsc::Receiver>) -> Option #[cfg(test)] mod tests { + //! Unit tests for the `ActorHarness` fixture using parameterised `rstest` cases. + use rstest::{fixture, rstest}; use tokio::sync::mpsc; From 18eda3537b764d2195e56c1f9be57992addc144f Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 1 Feb 2026 04:10:49 +0000 Subject: [PATCH 8/8] refactor(connection): refactor tests to use Result for error handling Changed the test support harness function and tests to return Result types with proper error propagation instead of panicking. This improves error handling and test reliability by using standard Result and ? operator instead of expect calls. Co-authored-by: terragon-labs[bot] --- src/connection/test_support.rs | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/src/connection/test_support.rs b/src/connection/test_support.rs index 8b2f08f3..6898d271 100644 --- a/src/connection/test_support.rs +++ b/src/connection/test_support.rs @@ -235,10 +235,12 @@ mod tests { use super::*; + type TestResult = Result>; + #[fixture] - fn harness() -> ActorHarness { + fn harness() -> TestResult { // Provides an ActorHarness for parameterised multi-queue state tests. - ActorHarness::new().expect("build ActorHarness") + ActorHarness::new().map_err(Into::into) } #[rstest] @@ -249,19 +251,19 @@ mod tests { #[case] install: bool, #[case] clear: bool, #[case] expected: bool, - mut harness: ActorHarness, - ) { + harness: TestResult, + ) -> TestResult<()> { + let mut harness = harness?; if install { let (_tx, rx) = mpsc::channel(1); - harness.set_multi_queue(Some(rx)).expect("set_multi_queue"); + harness.set_multi_queue(Some(rx))?; } if clear { - harness.set_multi_queue(None).expect("clear multi_queue"); + harness.set_multi_queue(None)?; } - assert_eq!( - harness.has_multi_queue(), - expected, - "multi-packet queue state mismatch" - ); + if harness.has_multi_queue() != expected { + return Err("multi-packet queue state mismatch".into()); + } + Ok(()) } }