diff --git a/src/connection/channels.rs b/src/connection/channels.rs new file mode 100644 index 00000000..b18cd14a --- /dev/null +++ b/src/connection/channels.rs @@ -0,0 +1,17 @@ +//! Connection channel bundling for actor construction. + +use crate::push::{PushHandle, PushQueues}; + +/// Bundles push queues with their shared handle for actor construction. +pub struct ConnectionChannels { + /// Receivers for high- and low-priority frames consumed by the actor. + pub queues: PushQueues, + /// Handle cloned by producers to enqueue frames into the shared queues. + pub handle: PushHandle, +} + +impl ConnectionChannels { + /// Create a new bundle of push queues and their associated handle. + #[must_use] + pub fn new(queues: PushQueues, handle: PushHandle) -> Self { Self { queues, handle } } +} diff --git a/src/connection/counter.rs b/src/connection/counter.rs new file mode 100644 index 00000000..c20f33a8 --- /dev/null +++ b/src/connection/counter.rs @@ -0,0 +1,32 @@ +//! Active connection counting and RAII guard. + +use std::sync::atomic::{AtomicU64, Ordering}; + +/// Global gauge tracking active connections. +static ACTIVE_CONNECTIONS: AtomicU64 = AtomicU64::new(0); + +/// RAII guard incrementing [`ACTIVE_CONNECTIONS`] on creation and +/// decrementing it on drop. +pub(super) struct ActiveConnection; + +impl ActiveConnection { + pub(super) fn new() -> Self { + ACTIVE_CONNECTIONS.fetch_add(1, Ordering::Relaxed); + crate::metrics::inc_connections(); + Self + } +} + +impl Drop for ActiveConnection { + fn drop(&mut self) { + ACTIVE_CONNECTIONS.fetch_sub(1, Ordering::Relaxed); + crate::metrics::dec_connections(); + } +} + +/// Return the current number of active connections. +#[must_use] +pub fn active_connection_count() -> u64 { ACTIVE_CONNECTIONS.load(Ordering::Relaxed) } + +/// Load the current count for logging purposes. +pub(super) fn current_count() -> u64 { ACTIVE_CONNECTIONS.load(Ordering::Relaxed) } diff --git a/src/connection/drain.rs b/src/connection/drain.rs index 3f742acb..2f93482c 100644 --- a/src/connection/drain.rs +++ b/src/connection/drain.rs @@ -2,15 +2,23 @@ use tokio::sync::mpsc::error::TryRecvError; -use super::{ - ConnectionActor, - DrainContext, - QueueKind, - multi_packet::MultiPacketTerminationReason, - state::ActorState, -}; +use super::{ConnectionActor, multi_packet::MultiPacketTerminationReason, state::ActorState}; use crate::{app::Packet, correlation::CorrelatableFrame, push::FrameLike}; +/// Context for drain operations containing mutable references to output and actor state. +pub(super) struct DrainContext<'a, F> { + pub(super) out: &'a mut Vec, + pub(super) state: &'a mut ActorState, +} + +/// Queue variants processed by the connection actor. +#[derive(Clone, Copy)] +pub(super) enum QueueKind { + High, + Low, + Multi, +} + impl ConnectionActor where F: FrameLike + CorrelatableFrame + Packet, @@ -36,8 +44,12 @@ where let DrainContext { out, state } = ctx; match res { Some(frame) => { + let is_stamping = self + .active_output + .multi_packet_mut() + .is_some_and(|ctx| ctx.is_stamping_enabled()); match kind { - QueueKind::Multi if self.multi_packet.is_stamping_enabled() => { + QueueKind::Multi if is_stamping => { self.emit_multi_packet_frame(frame, out); } _ => { @@ -144,8 +156,11 @@ where } } QueueKind::Multi => { - let result = match self.multi_packet.channel_mut() { - Some(rx) => rx.try_recv(), + let result = match self.active_output.multi_packet_mut() { + Some(ctx) => match ctx.channel_mut() { + Some(rx) => rx.try_recv(), + None => return false, + }, None => return false, }; diff --git a/src/connection/frame.rs b/src/connection/frame.rs index cbd1774f..6b821684 100644 --- a/src/connection/frame.rs +++ b/src/connection/frame.rs @@ -23,12 +23,16 @@ where /// Apply correlation stamping to a multi-packet frame. pub(super) fn apply_multi_packet_correlation(&mut self, frame: &mut F) { - if !self.multi_packet.is_stamping_enabled() { + let Some(ctx) = self.active_output.multi_packet_mut() else { // No channel is active, so there is nothing to stamp. return; + }; + + if !ctx.is_stamping_enabled() { + return; } - let correlation_id = self.multi_packet.correlation_id(); + let correlation_id = ctx.correlation_id(); frame.set_correlation_id(correlation_id); if let Some(expected) = correlation_id { diff --git a/src/connection/mod.rs b/src/connection/mod.rs index deaf4313..6f07b558 100644 --- a/src/connection/mod.rs +++ b/src/connection/mod.rs @@ -5,57 +5,33 @@ //! `biased` keyword ensures high-priority messages are processed before //! low-priority ones, with streamed responses handled last. +mod channels; +mod counter; mod dispatch; mod drain; mod event; mod frame; mod multi_packet; +mod output; mod polling; mod response; mod shutdown; mod state; -use std::{ - net::SocketAddr, - sync::{ - Arc, - atomic::{AtomicU64, Ordering}, - }, -}; +use std::{net::SocketAddr, sync::Arc}; +pub use channels::ConnectionChannels; +use counter::ActiveConnection; +pub use counter::active_connection_count; use event::Event; use log::info; use multi_packet::MultiPacketContext; +use output::{ActiveOutput, EventAvailability}; use state::ActorState; +use thiserror::Error; use tokio::{sync::mpsc, time::Duration}; use tokio_util::sync::CancellationToken; -/// Global gauge tracking active connections. -static ACTIVE_CONNECTIONS: AtomicU64 = AtomicU64::new(0); - -/// RAII guard incrementing [`ACTIVE_CONNECTIONS`] on creation and -/// decrementing it on drop. -struct ActiveConnection; - -impl ActiveConnection { - fn new() -> Self { - ACTIVE_CONNECTIONS.fetch_add(1, Ordering::Relaxed); - crate::metrics::inc_connections(); - Self - } -} - -impl Drop for ActiveConnection { - fn drop(&mut self) { - ACTIVE_CONNECTIONS.fetch_sub(1, Ordering::Relaxed); - crate::metrics::dec_connections(); - } -} - -/// Return the current number of active connections. -#[must_use] -pub fn active_connection_count() -> u64 { ACTIVE_CONNECTIONS.load(Ordering::Relaxed) } - use crate::{ app::Packet, correlation::CorrelatableFrame, @@ -82,18 +58,18 @@ pub struct FairnessConfig { pub time_slice: Option, } -/// Bundles push queues with their shared handle for actor construction. -pub struct ConnectionChannels { - /// Receivers for high- and low-priority frames consumed by the actor. - pub queues: PushQueues, - /// Handle cloned by producers to enqueue frames into the shared queues. - pub handle: PushHandle, -} - -impl ConnectionChannels { - /// Create a new bundle of push queues and their associated handle. - #[must_use] - pub fn new(queues: PushQueues, handle: PushHandle) -> Self { Self { queues, handle } } +/// Error returned when attempting to set an active output source while +/// another source is already active. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Error)] +pub enum ConnectionStateError { + /// A multi-packet channel is currently active and must be cleared before + /// setting a response stream. + #[error("cannot set response while a multi-packet channel is active")] + MultiPacketActive, + /// A response stream is currently active and must be cleared before + /// setting a multi-packet channel. + #[error("cannot set multi-packet channel while a response stream is active")] + ResponseActive, } impl Default for FairnessConfig { @@ -125,11 +101,12 @@ impl Default for FairnessConfig { pub struct ConnectionActor { high_rx: Option>, low_rx: Option>, - response: Option>, // current streaming response - /// Optional multi-packet channel drained after low-priority frames. - /// This preserves fairness with queued sources. + /// Active output source: either a streaming response or a multi-packet channel. + /// + /// At most one output source can be active at a time. The multi-packet channel + /// is drained after low-priority frames to preserve fairness with queued sources. /// The actor emits the protocol terminator when the sender closes the channel. - multi_packet: MultiPacketContext, + active_output: ActiveOutput, shutdown: CancellationToken, counter: Option, hooks: ProtocolHooks, @@ -140,20 +117,6 @@ pub struct ConnectionActor { peer_addr: Option, } -/// Context for drain operations containing mutable references to output and actor state. -struct DrainContext<'a, F> { - out: &'a mut Vec, - state: &'a mut ActorState, -} - -/// Queue variants processed by the connection actor. -#[derive(Clone, Copy)] -enum QueueKind { - High, - Low, - Multi, -} - impl ConnectionActor where F: FrameLike + CorrelatableFrame + Packet, @@ -202,11 +165,14 @@ where let ConnectionChannels { queues, handle } = channels; let ctx = ConnectionContext; let counter = ActiveConnection::new(); + let active_output = match response { + Some(stream) => ActiveOutput::Response(stream), + None => ActiveOutput::None, + }; let mut actor = Self { high_rx: Some(queues.high_priority_rx), low_rx: Some(queues.low_priority_rx), - response, - multi_packet: MultiPacketContext::new(), + active_output, shutdown, counter: Some(counter), hooks, @@ -216,10 +182,11 @@ where connection_id: None, peer_addr: None, }; - let current = ACTIVE_CONNECTIONS.load(Ordering::Relaxed); info!( "connection opened: wireframe_active_connections={}, id={:?}, peer={:?}", - current, actor.connection_id, actor.peer_addr + counter::current_count(), + actor.connection_id, + actor.peer_addr ); actor.hooks.on_connection_setup(handle, &mut actor.ctx); actor @@ -242,24 +209,45 @@ where } /// Set or replace the current streaming response. - pub fn set_response(&mut self, stream: Option>) { - debug_assert!( - !self.multi_packet.is_active(), - concat!( - "ConnectionActor invariant violated: cannot set response while a ", - "multi_packet channel is active" - ), - ); - self.response = stream; + /// + /// # Errors + /// + /// Returns [`ConnectionStateError::MultiPacketActive`] if a multi-packet + /// channel is currently active. + pub fn set_response( + &mut self, + stream: Option>, + ) -> Result<(), ConnectionStateError> { + if self.active_output.is_multi_packet() { + return Err(ConnectionStateError::MultiPacketActive); + } + self.active_output = match stream { + Some(s) => ActiveOutput::Response(s), + None => ActiveOutput::None, + }; + Ok(()) } /// Set or replace the current multi-packet response channel. - pub fn set_multi_packet(&mut self, channel: Option>) { - self.set_multi_packet_with_correlation(channel, None); + /// + /// # Errors + /// + /// Returns [`ConnectionStateError::ResponseActive`] if a response stream is + /// currently active. + pub fn set_multi_packet( + &mut self, + channel: Option>, + ) -> Result<(), ConnectionStateError> { + self.set_multi_packet_with_correlation(channel, None) } /// Set or replace the current multi-packet response channel and stamp correlation identifiers. /// + /// # Errors + /// + /// Returns [`ConnectionStateError::ResponseActive`] if a response stream is + /// currently active. + /// /// # Examples /// /// ```no_run @@ -274,25 +262,28 @@ where /// # let shutdown = CancellationToken::new(); /// # let mut actor = ConnectionActor::new(queues, handle, None, shutdown); /// # let (_tx, rx) = mpsc::channel(4); - /// actor.set_multi_packet_with_correlation(Some(rx), Some(7)); + /// actor.set_multi_packet_with_correlation(Some(rx), Some(7))?; + /// # Ok::<(), wireframe::connection::ConnectionStateError>(()) /// ``` pub fn set_multi_packet_with_correlation( &mut self, channel: Option>, correlation_id: Option, - ) { - debug_assert!( - self.response.is_none(), - concat!( - "ConnectionActor invariant violated: cannot set multi_packet while a ", - "response stream is active" - ), - ); - self.multi_packet.install(channel, correlation_id); + ) -> Result<(), ConnectionStateError> { + if self.active_output.is_response() { + return Err(ConnectionStateError::ResponseActive); + } + self.active_output = match channel { + Some(rx) => { + let mut ctx = MultiPacketContext::new(); + ctx.install(Some(rx), correlation_id); + ActiveOutput::MultiPacket(ctx) + } + None => ActiveOutput::None, + }; + Ok(()) } - fn clear_multi_packet(&mut self) { self.multi_packet.clear(); } - /// Replace the low-priority queue used for tests. pub fn set_low_queue(&mut self, queue: Option>) { self.low_rx = queue; } @@ -321,12 +312,10 @@ where return Ok(()); } - debug_assert!( - usize::from(self.response.is_some()) + usize::from(self.multi_packet.is_active()) <= 1, - "ConnectionActor invariant violated: at most one of response or multi_packet may be \ - active" + let mut state = ActorState::new( + self.active_output.is_response(), + self.active_output.is_multi_packet(), ); - let mut state = ActorState::new(self.response.is_some(), self.multi_packet.is_active()); while !state.is_done() { self.poll_sources(&mut state, out).await?; @@ -339,6 +328,16 @@ where Ok(()) } + /// Compute which event sources are currently available for polling. + fn compute_availability(&self, state: &ActorState) -> EventAvailability { + EventAvailability { + high: self.high_rx.is_some(), + low: self.low_rx.is_some(), + multi_packet: self.active_output.is_multi_packet() && !state.is_shutting_down(), + response: self.active_output.is_response() && !state.is_shutting_down(), + } + } + /// Await the next ready event using biased priority ordering. /// /// Shutdown is observed first, followed by high-priority pushes, then @@ -355,19 +354,25 @@ where reason = "tokio::select! expands to modulus operations internally" )] async fn next_event(&mut self, state: &ActorState) -> Event { - let high_available = self.high_rx.is_some(); - let low_available = self.low_rx.is_some(); - let multi_available = self.multi_packet.is_active() && !state.is_shutting_down(); - let resp_available = self.response.is_some() && !state.is_shutting_down(); + let avail = self.compute_availability(state); + + // Extract mutable references before the select! to satisfy the borrow + // checker. Only one of these can be Some due to the ActiveOutput enum + // invariant. + let (multi_rx, response_stream) = match &mut self.active_output { + ActiveOutput::MultiPacket(ctx) => (ctx.channel_mut(), None), + ActiveOutput::Response(stream) => (None, Some(stream)), + ActiveOutput::None => (None, None), + }; tokio::select! { biased; () = Self::wait_shutdown(self.shutdown.clone()), if state.is_active() => Event::Shutdown, - res = Self::poll_queue(self.high_rx.as_mut()), if high_available => Event::High(res), - res = Self::poll_queue(self.low_rx.as_mut()), if low_available => Event::Low(res), - res = Self::poll_queue(self.multi_packet.channel_mut()), if multi_available => Event::MultiPacket(res), - res = Self::poll_response(self.response.as_mut()), if resp_available => Event::Response(res), + res = Self::poll_queue(self.high_rx.as_mut()), if avail.high => Event::High(res), + res = Self::poll_queue(self.low_rx.as_mut()), if avail.low => Event::Low(res), + res = Self::poll_queue(multi_rx), if avail.multi_packet => Event::MultiPacket(res), + res = Self::poll_response(response_stream), if avail.response => Event::Response(res), else => Event::Idle, } } diff --git a/src/connection/multi_packet.rs b/src/connection/multi_packet.rs index 05e678b6..c3a5de09 100644 --- a/src/connection/multi_packet.rs +++ b/src/connection/multi_packet.rs @@ -75,15 +75,8 @@ impl MultiPacketContext { self.stamp = stamp; } - pub(super) fn clear(&mut self) { - self.channel = None; - self.stamp = MultiPacketStamp::Disabled; - } - pub(super) fn channel_mut(&mut self) -> Option<&mut mpsc::Receiver> { self.channel.as_mut() } - pub(super) fn take_channel(&mut self) -> Option> { self.channel.take() } - /// Returns `true` if correlation stamping is enabled. pub(super) fn is_stamping_enabled(&self) -> bool { matches!(self.stamp, MultiPacketStamp::Enabled(_)) @@ -95,6 +88,4 @@ impl MultiPacketContext { MultiPacketStamp::Disabled => None, } } - - pub(super) fn is_active(&self) -> bool { self.channel.is_some() } } diff --git a/src/connection/output.rs b/src/connection/output.rs new file mode 100644 index 00000000..1304e09d --- /dev/null +++ b/src/connection/output.rs @@ -0,0 +1,120 @@ +//! Active output source types for the connection actor. + +use super::multi_packet::MultiPacketContext; +use crate::response::FrameStream; + +/// Active output source for the connection actor. +/// +/// At most one output source can be active at a time. This enum makes the +/// mutual exclusion compile-time enforced rather than runtime-asserted. +pub(super) enum ActiveOutput { + /// No output source is active. + None, + /// A streaming response is active. + Response(FrameStream), + /// A multi-packet channel is active. + MultiPacket(MultiPacketContext), +} + +/// Result of shutting down an active output source. +pub(super) struct ShutdownResult { + /// Correlation ID of the multi-packet context, if any. + pub(super) correlation_id: Option, + /// Whether the source should be marked as closed in `ActorState`. + pub(super) source_closed: bool, + /// Whether `on_command_end` hook should be called. + pub(super) call_on_command_end: bool, +} + +/// Result of closing an active multi-packet channel. +pub(super) struct MultiPacketCloseResult { + /// Correlation ID of the multi-packet context, if any. + pub(super) correlation_id: Option, +} + +/// Availability flags for event sources polled by the connection actor. +#[expect( + clippy::struct_excessive_bools, + reason = "Availability flags are a natural fit for booleans; no state machine needed" +)] +#[derive(Clone, Copy)] +pub(super) struct EventAvailability { + pub(super) high: bool, + pub(super) low: bool, + pub(super) multi_packet: bool, + pub(super) response: bool, +} + +impl ActiveOutput { + /// Returns `true` if a streaming response is active. + pub(super) fn is_response(&self) -> bool { matches!(self, Self::Response(_)) } + + /// Returns `true` if a multi-packet channel is active. + pub(super) fn is_multi_packet(&self) -> bool { matches!(self, Self::MultiPacket(_)) } + + /// Returns a mutable reference to the multi-packet context if active. + pub(super) fn multi_packet_mut(&mut self) -> Option<&mut MultiPacketContext> { + match self { + Self::MultiPacket(ctx) => Some(ctx), + _ => Option::None, + } + } + + /// Clears the response stream, leaving `None` in its place. + pub(super) fn clear_response(&mut self) { + if matches!(self, Self::Response(_)) { + *self = Self::None; + } + } + + /// Perform shutdown cleanup and return the result. + /// + /// This takes ownership of the active output, closes any receivers, and + /// returns metadata needed by the caller to complete shutdown handling. + pub(super) fn shutdown(&mut self) -> ShutdownResult { + match std::mem::replace(self, Self::None) { + Self::MultiPacket(mut ctx) => { + let correlation_id = ctx.correlation_id(); + let source_closed = if let Some(rx) = ctx.channel_mut() { + rx.close(); + true + } else { + false + }; + ShutdownResult { + correlation_id, + source_closed, + call_on_command_end: true, + } + } + Self::Response(_) => ShutdownResult { + correlation_id: None, + source_closed: true, + call_on_command_end: false, + }, + Self::None => ShutdownResult { + correlation_id: None, + source_closed: false, + call_on_command_end: false, + }, + } + } + + /// Begin closing a multi-packet channel. + /// + /// This closes the receiver and returns the correlation ID for logging, + /// but does NOT clear the context yet. The caller must call `clear()` after + /// emitting any terminator frames that need correlation IDs applied. + pub(super) fn close_multi_packet(&mut self) -> MultiPacketCloseResult { + let correlation_id = self.multi_packet_mut().and_then(|ctx| ctx.correlation_id()); + if let Self::MultiPacket(ctx) = self + && let Some(rx) = ctx.channel_mut() + { + rx.close(); + } + MultiPacketCloseResult { correlation_id } + } + + /// Clear the active output to `None`. + pub(super) fn clear(&mut self) { *self = Self::None; } +} diff --git a/src/connection/response.rs b/src/connection/response.rs index c9cca750..91a5c9b1 100644 --- a/src/connection/response.rs +++ b/src/connection/response.rs @@ -28,7 +28,7 @@ where self.after_low(); } if is_none { - self.response = None; + self.active_output.clear_response(); } Ok(()) } @@ -57,7 +57,7 @@ where state.mark_closed(); // Stop polling the response after a protocol error to avoid // double-closing and duplicate `on_command_end` signalling. - self.response = None; + self.active_output.clear_response(); self.hooks.on_command_end(&mut self.ctx); crate::metrics::inc_handler_errors(); } diff --git a/src/connection/shutdown.rs b/src/connection/shutdown.rs index 13ad033a..8724d873 100644 --- a/src/connection/shutdown.rs +++ b/src/connection/shutdown.rs @@ -24,20 +24,23 @@ where if let Some(rx) = &mut self.low_rx { rx.close(); } - if self.multi_packet.is_active() { - let correlation = self.multi_packet.correlation_id(); - self.log_multi_packet_closure(MultiPacketTerminationReason::Shutdown, correlation); - if let Some(mut rx) = self.multi_packet.take_channel() { - rx.close(); - state.mark_closed(); - } - self.clear_multi_packet(); - self.hooks.on_command_end(&mut self.ctx); - } - if self.response.take().is_some() { + // Check if multi-packet is active before shutdown clears it + let was_multi_packet = self.active_output.is_multi_packet(); + let result = self.active_output.shutdown(); + + if result.source_closed { state.mark_closed(); } + if was_multi_packet { + self.log_multi_packet_closure( + MultiPacketTerminationReason::Shutdown, + result.correlation_id, + ); + } + if result.call_on_command_end { + self.hooks.on_command_end(&mut self.ctx); + } } /// Handle a closed multi-packet channel by emitting the protocol terminator @@ -48,17 +51,14 @@ where state: &mut ActorState, out: &mut Vec, ) { - let correlation = self.multi_packet.correlation_id(); - self.log_multi_packet_closure(reason, correlation); - if let Some(mut receiver) = self.multi_packet.take_channel() { - receiver.close(); - } + let result = self.active_output.close_multi_packet(); + self.log_multi_packet_closure(reason, result.correlation_id); state.mark_closed(); if let Some(frame) = self.hooks.stream_end_frame(&mut self.ctx) { self.emit_multi_packet_frame(frame, out); self.after_low(); } - self.clear_multi_packet(); + self.active_output.clear(); self.hooks.on_command_end(&mut self.ctx); } diff --git a/src/connection/test_support.rs b/src/connection/test_support.rs index 473ca0a9..6898d271 100644 --- a/src/connection/test_support.rs +++ b/src/connection/test_support.rs @@ -9,8 +9,7 @@ use tokio_util::sync::CancellationToken; use super::{ ConnectionActor, ConnectionChannels, - DrainContext, - QueueKind, + drain::{DrainContext, QueueKind}, multi_packet::MultiPacketTerminationReason, state::ActorState, }; @@ -113,8 +112,16 @@ impl ActorHarness { } /// Replace the multi-packet receiver. - pub fn set_multi_queue(&mut self, queue: Option>) { - self.actor.set_multi_packet(queue); + /// + /// # Errors + /// + /// Returns [`crate::connection::ConnectionStateError`] if a response stream + /// is currently active. + pub fn set_multi_queue( + &mut self, + queue: Option>, + ) -> Result<(), crate::connection::ConnectionStateError> { + self.actor.set_multi_packet(queue) } /// Returns `true` when the low-priority queue is still available. @@ -123,7 +130,7 @@ impl ActorHarness { /// Returns `true` when the multi-packet queue is still available. #[must_use] - pub fn has_multi_queue(&self) -> bool { self.actor.multi_packet.is_active() } + pub fn has_multi_queue(&self) -> bool { self.actor.active_output.is_multi_packet() } /// Process a multi-packet poll result. pub fn process_multi_packet(&mut self, res: Option) { @@ -221,43 +228,42 @@ pub async fn poll_queue_next(rx: Option<&mut mpsc::Receiver>) -> Option #[cfg(test)] mod tests { + //! Unit tests for the `ActorHarness` fixture using parameterised `rstest` cases. + + use rstest::{fixture, rstest}; use tokio::sync::mpsc; use super::*; - #[test] - fn has_multi_queue_false_by_default() { - let harness = ActorHarness::new().expect("build ActorHarness"); - assert!( - !harness.has_multi_queue(), - "multi-packet queue should start inactive" - ); - } + type TestResult = Result>; - #[test] - fn has_multi_queue_true_after_install() { - let mut harness = ActorHarness::new().expect("build ActorHarness"); - let (_tx, rx) = mpsc::channel(1); - harness.set_multi_queue(Some(rx)); - assert!( - harness.has_multi_queue(), - "multi-packet queue should be active after install" - ); + #[fixture] + fn harness() -> TestResult { + // Provides an ActorHarness for parameterised multi-queue state tests. + ActorHarness::new().map_err(Into::into) } - #[test] - fn has_multi_queue_false_after_clear() { - let mut harness = ActorHarness::new().expect("build ActorHarness"); - let (_tx, rx) = mpsc::channel(1); - harness.set_multi_queue(Some(rx)); - assert!( - harness.has_multi_queue(), - "multi-packet queue should be active after install" - ); - harness.set_multi_queue(None); - assert!( - !harness.has_multi_queue(), - "multi-packet queue should be inactive after clear" - ); + #[rstest] + #[case::default(false, false, false)] + #[case::install(true, false, true)] + #[case::clear(true, true, false)] + fn has_multi_queue_states( + #[case] install: bool, + #[case] clear: bool, + #[case] expected: bool, + harness: TestResult, + ) -> TestResult<()> { + let mut harness = harness?; + if install { + let (_tx, rx) = mpsc::channel(1); + harness.set_multi_queue(Some(rx))?; + } + if clear { + harness.set_multi_queue(None)?; + } + if harness.has_multi_queue() != expected { + return Err("multi-packet queue state mismatch".into()); + } + Ok(()) } } diff --git a/tests/connection.rs b/tests/connection.rs index 99598782..1350605f 100644 --- a/tests/connection.rs +++ b/tests/connection.rs @@ -260,7 +260,7 @@ fn process_multi_packet_none_emits_end_frame(harness_factory: HarnessFactory) -> .with_stream_end(|_| Some(9)), )?; let (_tx, rx) = mpsc::channel(1); - harness.set_multi_queue(Some(rx)); + harness.set_multi_queue(Some(rx))?; harness.process_multi_packet(None); @@ -292,7 +292,7 @@ fn handle_multi_packet_closed_behaviour( .with_stream_end(move |_| terminator), )?; let (_tx, rx) = mpsc::channel(1); - harness.set_multi_queue(Some(rx)); + harness.set_multi_queue(Some(rx))?; harness.handle_multi_packet_closed(); @@ -361,7 +361,7 @@ fn handle_multi_packet_closed_logs_reason( let (_tx, rx) = mpsc::channel(1); harness .actor_mut() - .set_multi_packet_with_correlation(Some(rx), Some(11)); + .set_multi_packet_with_correlation(Some(rx), Some(11))?; logger.clear(); harness.handle_multi_packet_closed(); assert_reason_logged(&mut logger, Level::Info, "drained", Some(11)); @@ -383,7 +383,7 @@ fn try_opportunistic_drain_multi_disconnect_logs_reason( let (tx, rx) = mpsc::channel(1); harness .actor_mut() - .set_multi_packet_with_correlation(Some(rx), Some(12)); + .set_multi_packet_with_correlation(Some(rx), Some(12))?; drop(tx); logger.clear(); let drained = harness.try_drain_multi(); @@ -409,7 +409,7 @@ fn start_shutdown_logs_reason( let (_tx, rx) = mpsc::channel(1); harness .actor_mut() - .set_multi_packet_with_correlation(Some(rx), Some(13)); + .set_multi_packet_with_correlation(Some(rx), Some(13))?; logger.clear(); harness.start_shutdown(); assert_reason_logged(&mut logger, Level::Info, "shutdown", Some(13)); @@ -429,7 +429,7 @@ fn try_opportunistic_drain_multi_disconnect_emits_terminator( .with_stream_end(|_| Some(5)), )?; let (tx, rx) = mpsc::channel(1); - harness.set_multi_queue(Some(rx)); + harness.set_multi_queue(Some(rx))?; drop(tx); let drained = harness.try_drain_multi(); diff --git a/tests/correlation_id.rs b/tests/correlation_id.rs index 3337dcee..ddb57694 100644 --- a/tests/correlation_id.rs +++ b/tests/correlation_id.rs @@ -79,7 +79,9 @@ async fn run_multi_packet_channel( shutdown, hooks, ); - actor.set_multi_packet_with_correlation(Some(rx), request_correlation); + actor + .set_multi_packet_with_correlation(Some(rx), request_correlation) + .map_err(|e| io::Error::other(format!("set_multi_packet_with_correlation: {e}")))?; let mut out = Vec::new(); actor diff --git a/tests/fixtures/correlation.rs b/tests/fixtures/correlation.rs index 7298757d..d0c6c814 100644 --- a/tests/fixtures/correlation.rs +++ b/tests/fixtures/correlation.rs @@ -80,7 +80,9 @@ impl CorrelationWorld { let shutdown = CancellationToken::new(); let mut actor: ConnectionActor = ConnectionActor::new(queues, handle, None, shutdown); - actor.set_multi_packet_with_correlation(Some(rx), expected); + actor + .set_multi_packet_with_correlation(Some(rx), expected) + .map_err(|e| format!("set_multi_packet_with_correlation failed: {e}"))?; actor .run(&mut self.frames) .await diff --git a/tests/fixtures/multi_packet.rs b/tests/fixtures/multi_packet.rs index cfc5ee7b..c38ae6f4 100644 --- a/tests/fixtures/multi_packet.rs +++ b/tests/fixtures/multi_packet.rs @@ -43,7 +43,9 @@ impl MultiPacketWorld { let shutdown = CancellationToken::new(); let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, handle, None, shutdown); - actor.set_multi_packet(Some(rx)); + actor + .set_multi_packet(Some(rx)) + .map_err(|e| format!("set_multi_packet failed: {e}"))?; let mut frames = Vec::new(); actor diff --git a/tests/fixtures/stream_end.rs b/tests/fixtures/stream_end.rs index 0b01d274..f127db64 100644 --- a/tests/fixtures/stream_end.rs +++ b/tests/fixtures/stream_end.rs @@ -87,7 +87,9 @@ impl StreamEndWorld { shutdown, hooks, ); - actor.set_multi_packet(Some(rx)); + actor + .set_multi_packet(Some(rx)) + .map_err(|e| format!("set_multi_packet failed: {e}"))?; actor .run(&mut temp.frames) .await @@ -141,7 +143,8 @@ impl StreamEndWorld { let (tx, rx) = mpsc::channel(4); harness .actor_mut() - .set_multi_packet_with_correlation(Some(rx), Some(correlation_id)); + .set_multi_packet_with_correlation(Some(rx), Some(correlation_id)) + .map_err(|e| format!("set_multi_packet_with_correlation failed: {e}"))?; match mode { MultiPacketMode::Disconnect { send_frames } => { if *send_frames { diff --git a/tests/multi_packet.rs b/tests/multi_packet.rs index 383d8fa6..0a72cd26 100644 --- a/tests/multi_packet.rs +++ b/tests/multi_packet.rs @@ -99,7 +99,9 @@ async fn connection_actor_drains_multi_packet_channel( let (queues, handle, shutdown) = actor_components?; let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, handle, None, shutdown); - actor.set_multi_packet(Some(rx)); + actor + .set_multi_packet(Some(rx)) + .map_err(|e| boxed_err("set_multi_packet error", e))?; let mut out = Vec::new(); actor @@ -134,7 +136,9 @@ async fn connection_actor_interleaves_multi_packet_and_priority_frames( max_high_before_low: 1, time_slice: None, }); - actor.set_multi_packet(Some(multi_rx)); + actor + .set_multi_packet(Some(multi_rx)) + .map_err(|e| boxed_err("set_multi_packet error", e))?; let mut out = Vec::new(); actor @@ -154,7 +158,9 @@ async fn shutdown_completes_multi_packet_channel( let (queues, handle, shutdown) = actor_components?; let (tx, rx) = mpsc::channel(1); let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, handle, None, shutdown); - actor.set_multi_packet(Some(rx)); + actor + .set_multi_packet(Some(rx)) + .map_err(|e| boxed_err("set_multi_packet error", e))?; let cancel = actor.shutdown_token(); @@ -188,7 +194,9 @@ async fn shutdown_during_active_multi_packet_send( let (queues, handle, shutdown) = actor_components?; let (tx, rx) = mpsc::channel(4); let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, handle, None, shutdown); - actor.set_multi_packet(Some(rx)); + actor + .set_multi_packet(Some(rx)) + .map_err(|e| boxed_err("set_multi_packet error", e))?; let cancel = actor.shutdown_token(); diff --git a/tests/multi_packet_streaming.rs b/tests/multi_packet_streaming.rs index 07249de5..bf493867 100644 --- a/tests/multi_packet_streaming.rs +++ b/tests/multi_packet_streaming.rs @@ -129,7 +129,8 @@ async fn client_receives_multi_packet_stream_with_terminator() -> TestResult<()> harness .actor - .set_multi_packet_with_correlation(Some(rx), correlation); + .set_multi_packet_with_correlation(Some(rx), correlation) + .map_err(|e| format!("set_multi_packet_with_correlation: {e}"))?; harness.release_handle(); @@ -178,7 +179,8 @@ async fn multi_packet_logs_disconnected_when_sender_dropped( harness .actor - .set_multi_packet_with_correlation(Some(rx), correlation); + .set_multi_packet_with_correlation(Some(rx), correlation) + .map_err(|e| format!("set_multi_packet_with_correlation: {e}"))?; harness.actor.set_fairness(interleaving_fairness()); @@ -311,7 +313,8 @@ async fn interleaved_multi_packet_and_push_frames_preserve_correlations() -> Tes harness .actor - .set_multi_packet_with_correlation(Some(rx), stream_correlation); + .set_multi_packet_with_correlation(Some(rx), stream_correlation) + .map_err(|e| format!("set_multi_packet_with_correlation: {e}"))?; harness.actor.set_fairness(interleaving_fairness()); push_interleaved_frames(harness.handle()?).await?; diff --git a/tests/stream_end.rs b/tests/stream_end.rs index d4f68af4..ad605a4e 100644 --- a/tests/stream_end.rs +++ b/tests/stream_end.rs @@ -80,7 +80,9 @@ async fn multi_packet_emits_end_frame( shutdown, hooks, ); - actor.set_multi_packet(Some(rx)); + actor + .set_multi_packet(Some(rx)) + .map_err(|e| std::io::Error::other(format!("set_multi_packet: {e}")))?; let mut out = Vec::new(); actor @@ -121,7 +123,9 @@ async fn multi_packet_respects_no_terminator( shutdown, hooks, ); - actor.set_multi_packet(Some(rx)); + actor + .set_multi_packet(Some(rx)) + .map_err(|e| std::io::Error::other(format!("set_multi_packet: {e}")))?; let mut out = Vec::new(); actor @@ -150,7 +154,9 @@ async fn multi_packet_empty_channel_emits_end( shutdown, hooks, ); - actor.set_multi_packet(Some(rx)); + actor + .set_multi_packet(Some(rx)) + .map_err(|e| std::io::Error::other(format!("set_multi_packet: {e}")))?; let mut out = Vec::new(); actor @@ -188,7 +194,9 @@ async fn multi_packet_empty_channel_no_terminator_emits_nothing( shutdown, hooks, ); - actor.set_multi_packet(Some(rx)); + actor + .set_multi_packet(Some(rx)) + .map_err(|e| std::io::Error::other(format!("set_multi_packet: {e}")))?; let mut out = Vec::new(); actor