From f15eaf44ae78d637e4ee6871c67dc1832a35fab1 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 25 Jan 2026 14:21:53 +0000 Subject: [PATCH 1/5] Decompose connection module into domain-focused submodules MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extract ActorState, MultiPacketContext, and Event types into separate submodules to improve code organisation and maintainability. The module structure now follows the established pattern used by app/, fragment/, and push/ modules. Extracted components: - state.rs: RunState enum and ActorState lifecycle management - multi_packet.rs: MultiPacketStamp, MultiPacketTerminationReason, and MultiPacketContext for correlation stamping and channel handling - event.rs: Event enum for the actor select loop The ConnectionActor implementation remains in mod.rs as the core public API. All existing tests pass and the public API is unchanged. closes #404 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- src/connection/event.rs | 22 +++ src/{connection.rs => connection/mod.rs} | 184 +---------------------- src/connection/multi_packet.rs | 92 ++++++++++++ src/connection/state.rs | 75 +++++++++ src/connection/test_support.rs | 6 +- 5 files changed, 199 insertions(+), 180 deletions(-) create mode 100644 src/connection/event.rs rename src/{connection.rs => connection/mod.rs} (83%) create mode 100644 src/connection/multi_packet.rs create mode 100644 src/connection/state.rs diff --git a/src/connection/event.rs b/src/connection/event.rs new file mode 100644 index 00000000..70f537d0 --- /dev/null +++ b/src/connection/event.rs @@ -0,0 +1,22 @@ +//! Internal event types for the connection actor select loop. + +use crate::response::WireframeError; + +/// Events returned by [`ConnectionActor::next_event`][super::ConnectionActor::next_event]. +/// +/// Only `Debug` is derived because `WireframeError` does not implement +/// `Clone` or `PartialEq`. +#[derive(Debug)] +pub(super) enum Event { + Shutdown, + High(Option), + Low(Option), + /// Frames drained from the multi-packet response channel. + /// Frames are forwarded in channel order after low-priority queues to + /// preserve fairness and reuse the existing back-pressure. + /// The actor emits the protocol terminator when the sender closes the + /// channel so downstream observers see end-of-stream signalling. + MultiPacket(Option), + Response(Option>>), + Idle, +} diff --git a/src/connection.rs b/src/connection/mod.rs similarity index 83% rename from src/connection.rs rename to src/connection/mod.rs index 08ab7276..4caa2fd1 100644 --- a/src/connection.rs +++ b/src/connection/mod.rs @@ -5,8 +5,11 @@ //! `biased` keyword ensures high-priority messages are processed before //! low-priority ones, with streamed responses handled last. +mod event; +mod multi_packet; +mod state; + use std::{ - fmt, future::Future, net::SocketAddr, sync::{ @@ -15,8 +18,11 @@ use std::{ }, }; +use event::Event; use futures::StreamExt; use log::{info, warn}; +use multi_packet::{MultiPacketContext, MultiPacketStamp, MultiPacketTerminationReason}; +use state::ActorState; use tokio::{ sync::mpsc::{self, error::TryRecvError}, time::Duration, @@ -60,25 +66,6 @@ use crate::{ session::ConnectionId, }; -/// Events returned by [`next_event`]. -/// -/// Only `Debug` is derived because `WireframeError` does not implement -/// `Clone` or `PartialEq`. -#[derive(Debug)] -enum Event { - Shutdown, - High(Option), - Low(Option), - /// Frames drained from the multi-packet response channel. - /// Frames are forwarded in channel order after low-priority queues to - /// preserve fairness and reuse the existing back-pressure. - /// The actor emits the protocol terminator when the sender closes the - /// channel so downstream observers see end-of-stream signalling. - MultiPacket(Option), - Response(Option>>), - Idle, -} - /// Configuration controlling fairness when draining push queues. #[derive(Clone, Copy, Debug)] pub struct FairnessConfig { @@ -158,91 +145,6 @@ struct DrainContext<'a, F> { state: &'a mut ActorState, } -/// Multi-packet correlation stamping state. -/// -/// Tracks the active receiver and how frames should be stamped before emission. -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -enum MultiPacketStamp { - /// Stamping is disabled because no multi-packet channel is active. - Disabled, - /// Stamping is enabled and frames are stamped with the provided identifier. - Enabled(Option), -} - -/// Reasons why a multi-packet stream closed. -/// -/// The reason informs logging severity so operators can distinguish between -/// natural completion and abrupt termination. -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -enum MultiPacketTerminationReason { - /// The sender dropped the channel after producing all frames. - Drained, - /// The sender was dropped without yielding an explicit end-of-stream. - Disconnected, - /// Shutdown cancelled the stream before it completed. - Shutdown, -} - -impl MultiPacketTerminationReason { - const fn as_str(self) -> &'static str { - match self { - Self::Drained => "drained", - Self::Disconnected => "disconnected", - Self::Shutdown => "shutdown", - } - } -} - -impl fmt::Display for MultiPacketTerminationReason { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str(self.as_str()) } -} - -/// Multi-packet channel state tracking the active receiver and stamping config. -struct MultiPacketContext { - channel: Option>, - stamp: MultiPacketStamp, -} - -impl MultiPacketContext { - const fn new() -> Self { - Self { - channel: None, - stamp: MultiPacketStamp::Disabled, - } - } - - fn install(&mut self, channel: Option>, stamp: MultiPacketStamp) { - debug_assert_eq!( - channel.is_some(), - matches!(stamp, MultiPacketStamp::Enabled(_)), - "channel presence must match stamp: channel is Some iff stamp is \ - MultiPacketStamp::Enabled(...)", - ); - self.channel = channel; - self.stamp = stamp; - } - - fn clear(&mut self) { - self.channel = None; - self.stamp = MultiPacketStamp::Disabled; - } - - fn channel_mut(&mut self) -> Option<&mut mpsc::Receiver> { self.channel.as_mut() } - - fn take_channel(&mut self) -> Option> { self.channel.take() } - - fn stamp(&self) -> MultiPacketStamp { self.stamp } - - fn correlation_id(&self) -> Option { - match self.stamp { - MultiPacketStamp::Enabled(value) => value, - MultiPacketStamp::Disabled => None, - } - } - - fn is_active(&self) -> bool { self.channel.is_some() } -} - /// Queue variants processed by the connection actor. #[derive(Clone, Copy)] enum QueueKind { @@ -916,77 +818,5 @@ where } } -/// Internal run state for the connection actor. -enum RunState { - /// All sources are open and frames are still being processed. - Active, - /// A shutdown request has been observed and queues are being closed. - ShuttingDown, - /// All sources have completed and the actor can exit. - Finished, -} - -/// Tracks progress through the actor lifecycle. -struct ActorState { - run_state: RunState, - closed_sources: usize, - total_sources: usize, -} - -impl ActorState { - /// Create a new `ActorState`. - /// - /// `has_response` indicates whether a streaming response is currently - /// attached. - /// `has_multi_packet` signals that a channel-backed response is active. - /// - /// # Examples - /// - /// ```ignore - /// use wireframe::connection::ActorState; - /// - /// let state = ActorState::new(true, false); - /// assert!(state.is_active()); - /// ``` - fn new(has_response: bool, has_multi_packet: bool) -> Self { - Self { - run_state: RunState::Active, - // The shutdown token is considered closed until cancellation - // occurs, matching previous behaviour where draining sources - // without explicit shutdown terminates the actor. - closed_sources: 1, - // total_sources counts all sources that keep the actor alive: - // - 3 for the baseline sources (main loop, shutdown token, and queue drains) - // - +1 if a streaming response is active (has_response) - // - +1 if multi-packet handling is enabled (has_multi_packet) - total_sources: 3 + usize::from(has_response) + usize::from(has_multi_packet), - } - } - - /// Mark a source as closed and update the run state if all are closed. - fn mark_closed(&mut self) { - self.closed_sources += 1; - if self.closed_sources >= self.total_sources { - self.run_state = RunState::Finished; - } - } - - /// Transition to `ShuttingDown` if currently active. - fn start_shutdown(&mut self) { - if matches!(self.run_state, RunState::Active) { - self.run_state = RunState::ShuttingDown; - } - } - - /// Returns `true` while the actor is actively processing sources. - fn is_active(&self) -> bool { matches!(self.run_state, RunState::Active) } - - /// Returns `true` once shutdown has begun. - fn is_shutting_down(&self) -> bool { matches!(self.run_state, RunState::ShuttingDown) } - - /// Returns `true` when all sources have finished. - fn is_done(&self) -> bool { matches!(self.run_state, RunState::Finished) } -} - #[cfg(not(loom))] pub mod test_support; diff --git a/src/connection/multi_packet.rs b/src/connection/multi_packet.rs new file mode 100644 index 00000000..356b282a --- /dev/null +++ b/src/connection/multi_packet.rs @@ -0,0 +1,92 @@ +//! Multi-packet channel state and correlation stamping. + +use std::fmt; + +use tokio::sync::mpsc; + +/// Multi-packet correlation stamping state. +/// +/// Tracks the active receiver and how frames should be stamped before emission. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub(super) enum MultiPacketStamp { + /// Stamping is disabled because no multi-packet channel is active. + Disabled, + /// Stamping is enabled and frames are stamped with the provided identifier. + Enabled(Option), +} + +/// Reasons why a multi-packet stream closed. +/// +/// The reason informs logging severity so operators can distinguish between +/// natural completion and abrupt termination. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub(super) enum MultiPacketTerminationReason { + /// The sender dropped the channel after producing all frames. + Drained, + /// The sender was dropped without yielding an explicit end-of-stream. + Disconnected, + /// Shutdown cancelled the stream before it completed. + Shutdown, +} + +impl MultiPacketTerminationReason { + pub(super) const fn as_str(self) -> &'static str { + match self { + Self::Drained => "drained", + Self::Disconnected => "disconnected", + Self::Shutdown => "shutdown", + } + } +} + +impl fmt::Display for MultiPacketTerminationReason { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str(self.as_str()) } +} + +/// Multi-packet channel state tracking the active receiver and stamping config. +pub(super) struct MultiPacketContext { + channel: Option>, + stamp: MultiPacketStamp, +} + +impl MultiPacketContext { + pub(super) const fn new() -> Self { + Self { + channel: None, + stamp: MultiPacketStamp::Disabled, + } + } + + pub(super) fn install(&mut self, channel: Option>, stamp: MultiPacketStamp) { + debug_assert_eq!( + channel.is_some(), + matches!(stamp, MultiPacketStamp::Enabled(_)), + concat!( + "channel presence must match stamp: channel is Some iff stamp is ", + "MultiPacketStamp::Enabled(...)" + ), + ); + self.channel = channel; + self.stamp = stamp; + } + + pub(super) fn clear(&mut self) { + self.channel = None; + self.stamp = MultiPacketStamp::Disabled; + } + + pub(super) fn channel_mut(&mut self) -> Option<&mut mpsc::Receiver> { self.channel.as_mut() } + + pub(super) fn take_channel(&mut self) -> Option> { self.channel.take() } + + pub(super) fn stamp(&self) -> MultiPacketStamp { self.stamp } + + pub(super) fn correlation_id(&self) -> Option { + match self.stamp { + MultiPacketStamp::Enabled(value) => value, + MultiPacketStamp::Disabled => None, + } + } + + pub(super) fn is_active(&self) -> bool { self.channel.is_some() } +} diff --git a/src/connection/state.rs b/src/connection/state.rs new file mode 100644 index 00000000..6e6791e6 --- /dev/null +++ b/src/connection/state.rs @@ -0,0 +1,75 @@ +//! Actor lifecycle state management. + +/// Internal run state for the connection actor. +pub(super) enum RunState { + /// All sources are open and frames are still being processed. + Active, + /// A shutdown request has been observed and queues are being closed. + ShuttingDown, + /// All sources have completed and the actor can exit. + Finished, +} + +/// Tracks progress through the actor lifecycle. +pub(super) struct ActorState { + run_state: RunState, + pub(super) closed_sources: usize, + pub(super) total_sources: usize, +} + +impl ActorState { + /// Create a new `ActorState`. + /// + /// `has_response` indicates whether a streaming response is currently + /// attached. + /// `has_multi_packet` signals that a channel-backed response is active. + /// + /// # Examples + /// + /// ```ignore + /// use wireframe::connection::ActorState; + /// + /// let state = ActorState::new(true, false); + /// assert!(state.is_active()); + /// ``` + pub(super) fn new(has_response: bool, has_multi_packet: bool) -> Self { + Self { + run_state: RunState::Active, + // The shutdown token is considered closed until cancellation + // occurs, matching previous behaviour where draining sources + // without explicit shutdown terminates the actor. + closed_sources: 1, + // total_sources counts all sources that keep the actor alive: + // - 3 for the baseline sources (main loop, shutdown token, and queue drains) + // - +1 if a streaming response is active (has_response) + // - +1 if multi-packet handling is enabled (has_multi_packet) + total_sources: 3 + usize::from(has_response) + usize::from(has_multi_packet), + } + } + + /// Mark a source as closed and update the run state if all are closed. + pub(super) fn mark_closed(&mut self) { + self.closed_sources += 1; + if self.closed_sources >= self.total_sources { + self.run_state = RunState::Finished; + } + } + + /// Transition to `ShuttingDown` if currently active. + pub(super) fn start_shutdown(&mut self) { + if matches!(self.run_state, RunState::Active) { + self.run_state = RunState::ShuttingDown; + } + } + + /// Returns `true` while the actor is actively processing sources. + pub(super) fn is_active(&self) -> bool { matches!(self.run_state, RunState::Active) } + + /// Returns `true` once shutdown has begun. + pub(super) fn is_shutting_down(&self) -> bool { + matches!(self.run_state, RunState::ShuttingDown) + } + + /// Returns `true` when all sources have finished. + pub(super) fn is_done(&self) -> bool { matches!(self.run_state, RunState::Finished) } +} diff --git a/src/connection/test_support.rs b/src/connection/test_support.rs index 006c671f..bf2f7165 100644 --- a/src/connection/test_support.rs +++ b/src/connection/test_support.rs @@ -7,16 +7,16 @@ use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; use super::{ - ActorState, ConnectionActor, ConnectionChannels, DrainContext, - MultiPacketTerminationReason, - ProtocolHooks, QueueKind, + multi_packet::MultiPacketTerminationReason, + state::ActorState, }; use crate::{ app::{Packet, PacketParts}, + hooks::ProtocolHooks, push::{PushConfigError, PushQueues}, }; From b220481b6e3cdbf359bd2cc68984bab3f6cb6def Mon Sep 17 00:00:00 2001 From: Leynos Date: Wed, 28 Jan 2026 13:57:56 +0000 Subject: [PATCH 2/5] Tighten encapsulation and reduce module size per code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address code review feedback from PR #403: - Make ActorState fields private, exposing closed_sources() and total_sources() accessor methods to preserve lifecycle invariants - Make RunState private to state.rs module since it's only consumed within ActorState - Make MultiPacketStamp private to multi_packet.rs, adding is_stamping_enabled() method to avoid leaking implementation details - Extract helper functions into domain-focused submodules to bring mod.rs below the 400-line cap: - drain.rs: Queue drain operations and fairness-aware helpers - frame.rs: Frame processing and correlation stamping - response.rs: Streaming response handling - shutdown.rs: Shutdown and multi-packet closure handling - polling.rs: Async polling utilities - dispatch.rs: Event dispatching Module layout after refactoring: - mod.rs: 399 lines (core actor struct and public API) - drain.rs: 189 lines (queue processing) - frame.rs: 88 lines (frame emission) - response.rs: 77 lines (streaming response) - shutdown.rs: 83 lines (shutdown handling) - polling.rs: 70 lines (async helpers) - dispatch.rs: 34 lines (event dispatch) All existing tests pass and the public API is unchanged. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- src/connection/dispatch.rs | 34 +++ src/connection/drain.rs | 189 ++++++++++++++ src/connection/frame.rs | 88 +++++++ src/connection/mod.rs | 447 +-------------------------------- src/connection/multi_packet.rs | 30 ++- src/connection/polling.rs | 70 ++++++ src/connection/response.rs | 77 ++++++ src/connection/shutdown.rs | 83 ++++++ src/connection/state.rs | 12 +- src/connection/test_support.rs | 8 +- 10 files changed, 585 insertions(+), 453 deletions(-) create mode 100644 src/connection/dispatch.rs create mode 100644 src/connection/drain.rs create mode 100644 src/connection/frame.rs create mode 100644 src/connection/polling.rs create mode 100644 src/connection/response.rs create mode 100644 src/connection/shutdown.rs diff --git a/src/connection/dispatch.rs b/src/connection/dispatch.rs new file mode 100644 index 00000000..d63e1bb1 --- /dev/null +++ b/src/connection/dispatch.rs @@ -0,0 +1,34 @@ +//! Event dispatching for the connection actor. + +use super::{ConnectionActor, event::Event, state::ActorState}; +use crate::{ + app::Packet, + correlation::CorrelatableFrame, + push::FrameLike, + response::WireframeError, +}; + +impl ConnectionActor +where + F: FrameLike + CorrelatableFrame + Packet, + E: std::fmt::Debug, +{ + /// Dispatch the given event to the appropriate handler. + pub(super) fn dispatch_event( + &mut self, + event: Event, + state: &mut ActorState, + out: &mut Vec, + ) -> Result<(), WireframeError> { + match event { + Event::Shutdown => self.process_shutdown(state), + Event::High(res) => self.process_high(res, state, out), + Event::Low(res) => self.process_low(res, state, out), + Event::MultiPacket(res) => self.process_multi_packet(res, state, out), + Event::Response(res) => self.process_response(res, state, out)?, + Event::Idle => {} + } + + Ok(()) + } +} diff --git a/src/connection/drain.rs b/src/connection/drain.rs new file mode 100644 index 00000000..1d7b2999 --- /dev/null +++ b/src/connection/drain.rs @@ -0,0 +1,189 @@ +//! Queue drain operations and fairness-aware helpers. + +use tokio::sync::mpsc::error::TryRecvError; + +use super::{ + ConnectionActor, + DrainContext, + QueueKind, + multi_packet::MultiPacketTerminationReason, + state::ActorState, +}; +use crate::{app::Packet, correlation::CorrelatableFrame, push::FrameLike}; + +impl ConnectionActor +where + F: FrameLike + CorrelatableFrame + Packet, + E: std::fmt::Debug, +{ + /// Handle the result of polling the high-priority queue. + pub(super) fn process_high( + &mut self, + res: Option, + state: &mut ActorState, + out: &mut Vec, + ) { + self.process_queue(QueueKind::High, res, DrainContext { out, state }); + } + + /// Process a queue-backed source with shared low-priority semantics. + pub(super) fn process_queue( + &mut self, + kind: QueueKind, + res: Option, + ctx: DrainContext<'_, F>, + ) { + let DrainContext { out, state } = ctx; + match res { + Some(frame) => { + match kind { + QueueKind::Multi if self.multi_packet.is_stamping_enabled() => { + self.emit_multi_packet_frame(frame, out); + } + _ => { + self.process_frame_with_hooks_and_metrics(frame, out); + } + } + match kind { + QueueKind::High => self.after_high(out, state), + QueueKind::Low | QueueKind::Multi => self.after_low(), + } + } + None => match kind { + QueueKind::High => { + Self::handle_closed_receiver(&mut self.high_rx, state); + self.fairness.reset(); + } + QueueKind::Low => { + Self::handle_closed_receiver(&mut self.low_rx, state); + } + QueueKind::Multi => { + self.handle_multi_packet_closed( + MultiPacketTerminationReason::Drained, + state, + out, + ); + } + }, + } + } + + /// Handle the result of polling the low-priority queue. + pub(super) fn process_low(&mut self, res: Option, state: &mut ActorState, out: &mut Vec) { + self.process_queue(QueueKind::Low, res, DrainContext { out, state }); + } + + /// Handle frames drained from the multi-packet channel. + pub(super) fn process_multi_packet( + &mut self, + res: Option, + state: &mut ActorState, + out: &mut Vec, + ) { + self.process_queue(QueueKind::Multi, res, DrainContext { out, state }); + } + + /// Update counters and opportunistically drain the low-priority and multi-packet queues. + pub(super) fn after_high(&mut self, out: &mut Vec, state: &mut ActorState) { + self.fairness.record_high_priority(); + + if !self.fairness.should_yield_to_low_priority() { + return; + } + + if self.try_opportunistic_drain( + QueueKind::Low, + DrainContext { + out: &mut *out, + state: &mut *state, + }, + ) { + return; + } + + let _ = self.try_opportunistic_drain( + QueueKind::Multi, + DrainContext { + out: &mut *out, + state: &mut *state, + }, + ); + } + + /// Try to opportunistically drain a queue-backed source when fairness allows. + /// + /// Returns `true` when a frame is forwarded to `out`. + pub(super) fn try_opportunistic_drain( + &mut self, + kind: QueueKind, + ctx: DrainContext<'_, F>, + ) -> bool { + let DrainContext { out, state } = ctx; + match kind { + QueueKind::High => { + debug_assert!( + false, + concat!( + "try_opportunistic_drain(High) is unsupported; High is handled by biased ", + "polling" + ) + ); + false + } + QueueKind::Low => { + let res = match self.low_rx.as_mut() { + Some(receiver) => receiver.try_recv(), + None => return false, + }; + + match res { + Ok(frame) => { + self.process_frame_with_hooks_and_metrics(frame, out); + self.after_low(); + true + } + Err(TryRecvError::Empty) => false, + Err(TryRecvError::Disconnected) => { + Self::handle_closed_receiver(&mut self.low_rx, state); + false + } + } + } + QueueKind::Multi => { + let result = match self.multi_packet.channel_mut() { + Some(rx) => rx.try_recv(), + None => return false, + }; + + match result { + Ok(frame) => { + self.emit_multi_packet_frame(frame, out); + self.after_low(); + true + } + Err(TryRecvError::Empty) => false, + Err(TryRecvError::Disconnected) => { + self.handle_multi_packet_closed( + MultiPacketTerminationReason::Disconnected, + state, + out, + ); + false + } + } + } + } + } + + /// Reset counters after processing a low-priority frame. + pub(super) fn after_low(&mut self) { self.fairness.reset(); } + + /// Common logic for handling closed receivers. + pub(super) fn handle_closed_receiver( + receiver: &mut Option>, + state: &mut ActorState, + ) { + *receiver = None; + state.mark_closed(); + } +} diff --git a/src/connection/frame.rs b/src/connection/frame.rs new file mode 100644 index 00000000..cbd1774f --- /dev/null +++ b/src/connection/frame.rs @@ -0,0 +1,88 @@ +//! Frame processing and emission helpers. + +use log::warn; + +use super::ConnectionActor; +use crate::{ + app::{Packet, fragment_utils::fragment_packet}, + correlation::CorrelatableFrame, + push::FrameLike, +}; + +impl ConnectionActor +where + F: FrameLike + CorrelatableFrame + Packet, + E: std::fmt::Debug, +{ + /// Emit a multi-packet frame with correlation stamping applied. + pub(super) fn emit_multi_packet_frame(&mut self, frame: F, out: &mut Vec) { + let mut frame = frame; + self.apply_multi_packet_correlation(&mut frame); + self.process_frame_with_hooks_and_metrics(frame, out); + } + + /// Apply correlation stamping to a multi-packet frame. + pub(super) fn apply_multi_packet_correlation(&mut self, frame: &mut F) { + if !self.multi_packet.is_stamping_enabled() { + // No channel is active, so there is nothing to stamp. + return; + } + + let correlation_id = self.multi_packet.correlation_id(); + frame.set_correlation_id(correlation_id); + + if let Some(expected) = correlation_id { + debug_assert!( + CorrelatableFrame::correlation_id(frame) == Some(expected) + || CorrelatableFrame::correlation_id(frame).is_none(), + "multi-packet frame correlation mismatch: expected={:?}, got={:?}", + Some(expected), + CorrelatableFrame::correlation_id(frame), + ); + } else { + debug_assert!( + CorrelatableFrame::correlation_id(frame).is_none(), + "multi-packet frame correlation unexpectedly present: got={:?}", + CorrelatableFrame::correlation_id(frame), + ); + } + } + + /// Apply protocol hooks and increment metrics before emitting a frame. + /// + /// # Examples + /// + /// ```ignore + /// actor.process_frame_with_hooks_and_metrics(frame, &mut out); + /// ``` + pub(super) fn process_frame_with_hooks_and_metrics(&mut self, frame: F, out: &mut Vec) + where + F: Packet, + { + if let Some(fragmenter) = self.fragmenter.as_deref() { + let fragmented = fragment_packet(fragmenter, frame); + match fragmented { + Ok(frames) => frames + .into_iter() + .for_each(|frame| self.push_frame(frame, out)), + Err(err) => { + warn!( + "failed to fragment frame: connection_id={:?}, peer={:?}, error={err:?}", + self.connection_id, self.peer_addr, + ); + crate::metrics::inc_handler_errors(); + } + } + } else { + self.push_frame(frame, out); + } + } + + /// Push a single frame to output after applying hooks and metrics. + pub(super) fn push_frame(&mut self, frame: F, out: &mut Vec) { + let mut frame = frame; + self.hooks.before_send(&mut frame, &mut self.ctx); + out.push(frame); + crate::metrics::inc_frames(crate::metrics::Direction::Outbound); + } +} diff --git a/src/connection/mod.rs b/src/connection/mod.rs index 4caa2fd1..d7977163 100644 --- a/src/connection/mod.rs +++ b/src/connection/mod.rs @@ -5,12 +5,17 @@ //! `biased` keyword ensures high-priority messages are processed before //! low-priority ones, with streamed responses handled last. +mod dispatch; +mod drain; mod event; +mod frame; mod multi_packet; +mod polling; +mod response; +mod shutdown; mod state; use std::{ - future::Future, net::SocketAddr, sync::{ Arc, @@ -19,14 +24,10 @@ use std::{ }; use event::Event; -use futures::StreamExt; -use log::{info, warn}; -use multi_packet::{MultiPacketContext, MultiPacketStamp, MultiPacketTerminationReason}; +use log::info; +use multi_packet::MultiPacketContext; use state::ActorState; -use tokio::{ - sync::mpsc::{self, error::TryRecvError}, - time::Duration, -}; +use tokio::{sync::mpsc, time::Duration}; use tokio_util::sync::CancellationToken; /// Global gauge tracking active connections. @@ -56,7 +57,7 @@ impl Drop for ActiveConnection { pub fn active_connection_count() -> u64 { ACTIVE_CONNECTIONS.load(Ordering::Relaxed) } use crate::{ - app::{Packet, fragment_utils::fragment_packet}, + app::Packet, correlation::CorrelatableFrame, fairness::FairnessTracker, fragment::{FragmentationConfig, Fragmenter}, @@ -261,12 +262,7 @@ where "response stream is active" ), ); - let stamp = if channel.is_some() { - MultiPacketStamp::Enabled(None) - } else { - MultiPacketStamp::Disabled - }; - self.multi_packet.install(channel, stamp); + self.multi_packet.install(channel, None); } /// Set or replace the current multi-packet response channel and stamp correlation identifiers. @@ -299,42 +295,11 @@ where "response stream is active" ), ); - let stamp = if channel.is_some() { - MultiPacketStamp::Enabled(correlation_id) - } else { - MultiPacketStamp::Disabled - }; - self.multi_packet.install(channel, stamp); + self.multi_packet.install(channel, correlation_id); } fn clear_multi_packet(&mut self) { self.multi_packet.clear(); } - fn apply_multi_packet_correlation(&mut self, frame: &mut F) { - match self.multi_packet.stamp() { - MultiPacketStamp::Enabled(Some(expected)) => { - frame.set_correlation_id(Some(expected)); - debug_assert!( - CorrelatableFrame::correlation_id(frame) == Some(expected) - || CorrelatableFrame::correlation_id(frame).is_none(), - "multi-packet frame correlation mismatch: expected={:?}, got={:?}", - Some(expected), - CorrelatableFrame::correlation_id(frame), - ); - } - MultiPacketStamp::Enabled(None) => { - frame.set_correlation_id(None); - debug_assert!( - CorrelatableFrame::correlation_id(frame).is_none(), - "multi-packet frame correlation unexpectedly present: got={:?}", - CorrelatableFrame::correlation_id(frame), - ); - } - MultiPacketStamp::Disabled => { - // No channel is active, so there is nothing to stamp. - } - } - } - /// Replace the low-priority queue used for tests. pub fn set_low_queue(&mut self, queue: Option>) { self.low_rx = queue; } @@ -428,394 +393,6 @@ where let event = self.next_event(state).await; self.dispatch_event(event, state, out) } - - /// Dispatch the given event to the appropriate handler. - fn dispatch_event( - &mut self, - event: Event, - state: &mut ActorState, - out: &mut Vec, - ) -> Result<(), WireframeError> { - match event { - Event::Shutdown => self.process_shutdown(state), - Event::High(res) => self.process_high(res, state, out), - Event::Low(res) => self.process_low(res, state, out), - Event::MultiPacket(res) => self.process_multi_packet(res, state, out), - Event::Response(res) => self.process_response(res, state, out)?, - Event::Idle => {} - } - - Ok(()) - } - - /// Begin shutdown once cancellation has been observed. - fn process_shutdown(&mut self, state: &mut ActorState) { - state.start_shutdown(); - self.start_shutdown(state); - } - - /// Handle the result of polling the high-priority queue. - fn process_high(&mut self, res: Option, state: &mut ActorState, out: &mut Vec) { - self.process_queue(QueueKind::High, res, DrainContext { out, state }); - } - - /// Process a queue-backed source with shared low-priority semantics. - fn process_queue(&mut self, kind: QueueKind, res: Option, ctx: DrainContext<'_, F>) { - let DrainContext { out, state } = ctx; - match res { - Some(frame) => { - match kind { - QueueKind::Multi - if matches!(self.multi_packet.stamp(), MultiPacketStamp::Enabled(_)) => - { - self.emit_multi_packet_frame(frame, out); - } - _ => { - self.process_frame_with_hooks_and_metrics(frame, out); - } - } - match kind { - QueueKind::High => self.after_high(out, state), - QueueKind::Low | QueueKind::Multi => self.after_low(), - } - } - None => match kind { - QueueKind::High => { - Self::handle_closed_receiver(&mut self.high_rx, state); - self.fairness.reset(); - } - QueueKind::Low => { - Self::handle_closed_receiver(&mut self.low_rx, state); - } - QueueKind::Multi => { - self.handle_multi_packet_closed( - MultiPacketTerminationReason::Drained, - state, - out, - ); - } - }, - } - } - - /// Handle the result of polling the low-priority queue. - fn process_low(&mut self, res: Option, state: &mut ActorState, out: &mut Vec) { - self.process_queue(QueueKind::Low, res, DrainContext { out, state }); - } - - /// Handle frames drained from the multi-packet channel. - fn process_multi_packet(&mut self, res: Option, state: &mut ActorState, out: &mut Vec) { - self.process_queue(QueueKind::Multi, res, DrainContext { out, state }); - } - - fn emit_multi_packet_frame(&mut self, frame: F, out: &mut Vec) { - let mut frame = frame; - self.apply_multi_packet_correlation(&mut frame); - self.process_frame_with_hooks_and_metrics(frame, out); - } - - /// Handle a closed multi-packet channel by emitting the protocol terminator - /// and notifying hooks. - fn handle_multi_packet_closed( - &mut self, - reason: MultiPacketTerminationReason, - state: &mut ActorState, - out: &mut Vec, - ) { - let correlation = self.multi_packet.correlation_id(); - self.log_multi_packet_closure(reason, correlation); - if let Some(mut receiver) = self.multi_packet.take_channel() { - receiver.close(); - } - state.mark_closed(); - if let Some(frame) = self.hooks.stream_end_frame(&mut self.ctx) { - self.emit_multi_packet_frame(frame, out); - self.after_low(); - } - self.clear_multi_packet(); - self.hooks.on_command_end(&mut self.ctx); - } - - fn log_multi_packet_closure( - &self, - reason: MultiPacketTerminationReason, - correlation_id: Option, - ) { - let message = "multi-packet stream closed"; - match reason { - MultiPacketTerminationReason::Disconnected => warn!( - "{message}: reason={}, connection_id={:?}, peer={:?}, correlation_id={:?}", - reason, self.connection_id, self.peer_addr, correlation_id, - ), - _ => info!( - "{message}: reason={}, connection_id={:?}, peer={:?}, correlation_id={:?}", - reason, self.connection_id, self.peer_addr, correlation_id, - ), - } - } - - /// Apply protocol hooks and increment metrics before emitting a frame. - /// - /// # Examples - /// - /// ```ignore - /// actor.process_frame_with_hooks_and_metrics(frame, &mut out); - /// ``` - fn process_frame_with_hooks_and_metrics(&mut self, frame: F, out: &mut Vec) - where - F: Packet, - { - if let Some(fragmenter) = self.fragmenter.as_deref() { - let fragmented = fragment_packet(fragmenter, frame); - match fragmented { - Ok(frames) => frames - .into_iter() - .for_each(|frame| self.push_frame(frame, out)), - Err(err) => { - warn!( - "failed to fragment frame: connection_id={:?}, peer={:?}, error={err:?}", - self.connection_id, self.peer_addr, - ); - crate::metrics::inc_handler_errors(); - } - } - } else { - self.push_frame(frame, out); - } - } - - fn push_frame(&mut self, frame: F, out: &mut Vec) { - let mut frame = frame; - self.hooks.before_send(&mut frame, &mut self.ctx); - out.push(frame); - crate::metrics::inc_frames(crate::metrics::Direction::Outbound); - } - - /// Common logic for handling closed receivers. - fn handle_closed_receiver(receiver: &mut Option>, state: &mut ActorState) { - *receiver = None; - state.mark_closed(); - } - - /// Handle the next frame or error from the streaming response. - fn process_response( - &mut self, - res: Option>>, - state: &mut ActorState, - out: &mut Vec, - ) -> Result<(), WireframeError> { - let is_none = res.is_none(); - let produced = self.handle_response(res, state, out)?; - if produced { - self.after_low(); - } - if is_none { - self.response = None; - } - Ok(()) - } - - /// Close all receivers and mark streaming sources as closed if present. - fn start_shutdown(&mut self, state: &mut ActorState) { - if let Some(rx) = &mut self.high_rx { - rx.close(); - } - if let Some(rx) = &mut self.low_rx { - rx.close(); - } - if self.multi_packet.is_active() { - let correlation = self.multi_packet.correlation_id(); - self.log_multi_packet_closure(MultiPacketTerminationReason::Shutdown, correlation); - if let Some(mut rx) = self.multi_packet.take_channel() { - rx.close(); - state.mark_closed(); - } - self.clear_multi_packet(); - self.hooks.on_command_end(&mut self.ctx); - } - - if self.response.take().is_some() { - state.mark_closed(); - } - } - - /// Update counters and opportunistically drain the low-priority and multi-packet queues. - fn after_high(&mut self, out: &mut Vec, state: &mut ActorState) { - self.fairness.record_high_priority(); - - if !self.fairness.should_yield_to_low_priority() { - return; - } - - if self.try_opportunistic_drain( - QueueKind::Low, - DrainContext { - out: &mut *out, - state: &mut *state, - }, - ) { - return; - } - - let _ = self.try_opportunistic_drain( - QueueKind::Multi, - DrainContext { - out: &mut *out, - state: &mut *state, - }, - ); - } - - /// Try to opportunistically drain a queue-backed source when fairness allows. - /// - /// Returns `true` when a frame is forwarded to `out`. - fn try_opportunistic_drain(&mut self, kind: QueueKind, ctx: DrainContext<'_, F>) -> bool { - let DrainContext { out, state } = ctx; - match kind { - QueueKind::High => { - debug_assert!( - false, - "try_opportunistic_drain(High) is unsupported; High is handled by biased \ - polling" - ); - false - } - QueueKind::Low => { - let res = match self.low_rx.as_mut() { - Some(receiver) => receiver.try_recv(), - None => return false, - }; - - match res { - Ok(frame) => { - self.process_frame_with_hooks_and_metrics(frame, out); - self.after_low(); - true - } - Err(TryRecvError::Empty) => false, - Err(TryRecvError::Disconnected) => { - Self::handle_closed_receiver(&mut self.low_rx, state); - false - } - } - } - QueueKind::Multi => { - let result = match self.multi_packet.channel_mut() { - Some(rx) => rx.try_recv(), - None => return false, - }; - - match result { - Ok(frame) => { - self.emit_multi_packet_frame(frame, out); - self.after_low(); - true - } - Err(TryRecvError::Empty) => false, - Err(TryRecvError::Disconnected) => { - self.handle_multi_packet_closed( - MultiPacketTerminationReason::Disconnected, - state, - out, - ); - false - } - } - } - } - } - - /// Reset counters after processing a low-priority frame. - fn after_low(&mut self) { self.fairness.reset(); } - - /// Push a frame from the response stream into `out` or handle completion. - /// - /// Protocol errors are passed to `handle_error` and do not terminate the - /// actor. I/O errors propagate to the caller. - /// - /// Returns `true` if a frame was appended to `out`. - fn handle_response( - &mut self, - res: Option>>, - state: &mut ActorState, - out: &mut Vec, - ) -> Result> { - let mut produced = false; - match res { - Some(Ok(frame)) => { - self.process_frame_with_hooks_and_metrics(frame, out); - produced = true; - } - Some(Err(WireframeError::Protocol(e))) => { - warn!("protocol error: error={e:?}"); - self.hooks.handle_error(e, &mut self.ctx); - state.mark_closed(); - // Stop polling the response after a protocol error to avoid - // double-closing and duplicate `on_command_end` signalling. - self.response = None; - self.hooks.on_command_end(&mut self.ctx); - crate::metrics::inc_handler_errors(); - } - Some(Err(e)) => return Err(e), - None => { - state.mark_closed(); - if let Some(frame) = self.hooks.stream_end_frame(&mut self.ctx) { - self.process_frame_with_hooks_and_metrics(frame, out); - produced = true; - } - self.hooks.on_command_end(&mut self.ctx); - } - } - - Ok(produced) - } - - /// Await cancellation on the provided shutdown token. - #[inline] - async fn wait_shutdown(token: CancellationToken) { token.cancelled_owned().await; } - - /// Receive the next frame from a push queue. - #[inline] - async fn recv_push(rx: &mut mpsc::Receiver) -> Option { rx.recv().await } - - /// Poll `f` if `opt` is `Some`, returning `None` otherwise. - #[expect( - clippy::manual_async_fn, - reason = "Generic lifetime requires explicit async move" - )] - fn poll_optional<'a, T, Fut, R>( - opt: Option<&'a mut T>, - f: impl FnOnce(&'a mut T) -> Fut + Send + 'a, - ) -> impl Future> + Send + 'a - where - T: Send + 'a, - Fut: Future> + Send + 'a, - { - async move { - if let Some(value) = opt { - f(value).await - } else { - None - } - } - } - - /// Await shutdown cancellation on the provided token. - async fn await_shutdown(token: CancellationToken) { Self::wait_shutdown(token).await; } - - /// Poll whichever receiver is provided, returning `None` when absent. - /// - /// Multi-packet channels reuse this helper so they share back-pressure with queued frames. - async fn poll_queue(rx: Option<&mut mpsc::Receiver>) -> Option { - Self::poll_optional(rx, Self::recv_push).await - } - - /// Poll the streaming response. - async fn poll_response( - resp: Option<&mut FrameStream>, - ) -> Option>> { - Self::poll_optional(resp, |s| s.next()).await - } } #[cfg(not(loom))] diff --git a/src/connection/multi_packet.rs b/src/connection/multi_packet.rs index 356b282a..b6b4ab15 100644 --- a/src/connection/multi_packet.rs +++ b/src/connection/multi_packet.rs @@ -8,7 +8,7 @@ use tokio::sync::mpsc; /// /// Tracks the active receiver and how frames should be stamped before emission. #[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub(super) enum MultiPacketStamp { +enum MultiPacketStamp { /// Stamping is disabled because no multi-packet channel is active. Disabled, /// Stamping is enabled and frames are stamped with the provided identifier. @@ -57,15 +57,20 @@ impl MultiPacketContext { } } - pub(super) fn install(&mut self, channel: Option>, stamp: MultiPacketStamp) { - debug_assert_eq!( - channel.is_some(), - matches!(stamp, MultiPacketStamp::Enabled(_)), - concat!( - "channel presence must match stamp: channel is Some iff stamp is ", - "MultiPacketStamp::Enabled(...)" - ), - ); + /// Install a multi-packet channel with an optional correlation identifier. + /// + /// When `channel` is `Some`, stamping is enabled with the provided `correlation_id`. + /// When `channel` is `None`, stamping is disabled. + pub(super) fn install( + &mut self, + channel: Option>, + correlation_id: Option, + ) { + let stamp = if channel.is_some() { + MultiPacketStamp::Enabled(correlation_id) + } else { + MultiPacketStamp::Disabled + }; self.channel = channel; self.stamp = stamp; } @@ -79,7 +84,10 @@ impl MultiPacketContext { pub(super) fn take_channel(&mut self) -> Option> { self.channel.take() } - pub(super) fn stamp(&self) -> MultiPacketStamp { self.stamp } + /// Returns `true` if correlation stamping is enabled. + pub(super) fn is_stamping_enabled(&self) -> bool { + matches!(self.stamp, MultiPacketStamp::Enabled(_)) + } pub(super) fn correlation_id(&self) -> Option { match self.stamp { diff --git a/src/connection/polling.rs b/src/connection/polling.rs new file mode 100644 index 00000000..6587b4e1 --- /dev/null +++ b/src/connection/polling.rs @@ -0,0 +1,70 @@ +//! Async polling utilities for the connection actor select loop. + +use std::future::Future; + +use tokio::sync::mpsc; +use tokio_util::sync::CancellationToken; + +use super::ConnectionActor; +use crate::{push::FrameLike, response::FrameStream}; + +impl ConnectionActor +where + F: FrameLike, +{ + /// Await cancellation on the provided shutdown token. + #[inline] + pub(super) async fn wait_shutdown(token: CancellationToken) { token.cancelled_owned().await; } + + /// Receive the next frame from a push queue. + #[inline] + pub(super) async fn recv_push(rx: &mut mpsc::Receiver) -> Option { rx.recv().await } + + /// Poll `f` if `opt` is `Some`, returning `None` otherwise. + #[expect( + clippy::manual_async_fn, + reason = "Generic lifetime requires explicit async move" + )] + pub(super) fn poll_optional<'a, T, Fut, R>( + opt: Option<&'a mut T>, + f: impl FnOnce(&'a mut T) -> Fut + Send + 'a, + ) -> impl Future> + Send + 'a + where + T: Send + 'a, + Fut: Future> + Send + 'a, + { + async move { + if let Some(value) = opt { + f(value).await + } else { + None + } + } + } + + /// Await shutdown cancellation on the provided token. + pub(super) async fn await_shutdown(token: CancellationToken) { + Self::wait_shutdown(token).await; + } + + /// Poll whichever receiver is provided, returning `None` when absent. + /// + /// Multi-packet channels reuse this helper so they share back-pressure with queued frames. + pub(super) async fn poll_queue(rx: Option<&mut mpsc::Receiver>) -> Option { + Self::poll_optional(rx, Self::recv_push).await + } +} + +impl ConnectionActor +where + F: FrameLike, + E: std::fmt::Debug, +{ + /// Poll the streaming response. + pub(super) async fn poll_response( + resp: Option<&mut FrameStream>, + ) -> Option>> { + use futures::StreamExt; + Self::poll_optional(resp, |s| s.next()).await + } +} diff --git a/src/connection/response.rs b/src/connection/response.rs new file mode 100644 index 00000000..c9cca750 --- /dev/null +++ b/src/connection/response.rs @@ -0,0 +1,77 @@ +//! Streaming response handling for the connection actor. + +use log::warn; + +use super::{ConnectionActor, state::ActorState}; +use crate::{ + app::Packet, + correlation::CorrelatableFrame, + push::FrameLike, + response::WireframeError, +}; + +impl ConnectionActor +where + F: FrameLike + CorrelatableFrame + Packet, + E: std::fmt::Debug, +{ + /// Handle the next frame or error from the streaming response. + pub(super) fn process_response( + &mut self, + res: Option>>, + state: &mut ActorState, + out: &mut Vec, + ) -> Result<(), WireframeError> { + let is_none = res.is_none(); + let produced = self.handle_response(res, state, out)?; + if produced { + self.after_low(); + } + if is_none { + self.response = None; + } + Ok(()) + } + + /// Push a frame from the response stream into `out` or handle completion. + /// + /// Protocol errors are passed to `handle_error` and do not terminate the + /// actor. I/O errors propagate to the caller. + /// + /// Returns `true` if a frame was appended to `out`. + pub(super) fn handle_response( + &mut self, + res: Option>>, + state: &mut ActorState, + out: &mut Vec, + ) -> Result> { + let mut produced = false; + match res { + Some(Ok(frame)) => { + self.process_frame_with_hooks_and_metrics(frame, out); + produced = true; + } + Some(Err(WireframeError::Protocol(e))) => { + warn!("protocol error: error={e:?}"); + self.hooks.handle_error(e, &mut self.ctx); + state.mark_closed(); + // Stop polling the response after a protocol error to avoid + // double-closing and duplicate `on_command_end` signalling. + self.response = None; + self.hooks.on_command_end(&mut self.ctx); + crate::metrics::inc_handler_errors(); + } + Some(Err(e)) => return Err(e), + None => { + state.mark_closed(); + if let Some(frame) = self.hooks.stream_end_frame(&mut self.ctx) { + self.process_frame_with_hooks_and_metrics(frame, out); + produced = true; + } + self.hooks.on_command_end(&mut self.ctx); + } + } + + Ok(produced) + } +} diff --git a/src/connection/shutdown.rs b/src/connection/shutdown.rs new file mode 100644 index 00000000..13ad033a --- /dev/null +++ b/src/connection/shutdown.rs @@ -0,0 +1,83 @@ +//! Shutdown handling for the connection actor. + +use log::info; + +use super::{ConnectionActor, multi_packet::MultiPacketTerminationReason, state::ActorState}; +use crate::{app::Packet, correlation::CorrelatableFrame, push::FrameLike}; + +impl ConnectionActor +where + F: FrameLike + CorrelatableFrame + Packet, + E: std::fmt::Debug, +{ + /// Begin shutdown once cancellation has been observed. + pub(super) fn process_shutdown(&mut self, state: &mut ActorState) { + state.start_shutdown(); + self.start_shutdown(state); + } + + /// Close all receivers and mark streaming sources as closed if present. + pub(super) fn start_shutdown(&mut self, state: &mut ActorState) { + if let Some(rx) = &mut self.high_rx { + rx.close(); + } + if let Some(rx) = &mut self.low_rx { + rx.close(); + } + if self.multi_packet.is_active() { + let correlation = self.multi_packet.correlation_id(); + self.log_multi_packet_closure(MultiPacketTerminationReason::Shutdown, correlation); + if let Some(mut rx) = self.multi_packet.take_channel() { + rx.close(); + state.mark_closed(); + } + self.clear_multi_packet(); + self.hooks.on_command_end(&mut self.ctx); + } + + if self.response.take().is_some() { + state.mark_closed(); + } + } + + /// Handle a closed multi-packet channel by emitting the protocol terminator + /// and notifying hooks. + pub(super) fn handle_multi_packet_closed( + &mut self, + reason: MultiPacketTerminationReason, + state: &mut ActorState, + out: &mut Vec, + ) { + let correlation = self.multi_packet.correlation_id(); + self.log_multi_packet_closure(reason, correlation); + if let Some(mut receiver) = self.multi_packet.take_channel() { + receiver.close(); + } + state.mark_closed(); + if let Some(frame) = self.hooks.stream_end_frame(&mut self.ctx) { + self.emit_multi_packet_frame(frame, out); + self.after_low(); + } + self.clear_multi_packet(); + self.hooks.on_command_end(&mut self.ctx); + } + + pub(super) fn log_multi_packet_closure( + &self, + reason: MultiPacketTerminationReason, + correlation_id: Option, + ) { + use log::warn; + let message = "multi-packet stream closed"; + match reason { + MultiPacketTerminationReason::Disconnected => warn!( + "{message}: reason={}, connection_id={:?}, peer={:?}, correlation_id={:?}", + reason, self.connection_id, self.peer_addr, correlation_id, + ), + _ => info!( + "{message}: reason={}, connection_id={:?}, peer={:?}, correlation_id={:?}", + reason, self.connection_id, self.peer_addr, correlation_id, + ), + } + } +} diff --git a/src/connection/state.rs b/src/connection/state.rs index 6e6791e6..5225de88 100644 --- a/src/connection/state.rs +++ b/src/connection/state.rs @@ -1,7 +1,7 @@ //! Actor lifecycle state management. /// Internal run state for the connection actor. -pub(super) enum RunState { +enum RunState { /// All sources are open and frames are still being processed. Active, /// A shutdown request has been observed and queues are being closed. @@ -13,8 +13,8 @@ pub(super) enum RunState { /// Tracks progress through the actor lifecycle. pub(super) struct ActorState { run_state: RunState, - pub(super) closed_sources: usize, - pub(super) total_sources: usize, + closed_sources: usize, + total_sources: usize, } impl ActorState { @@ -72,4 +72,10 @@ impl ActorState { /// Returns `true` when all sources have finished. pub(super) fn is_done(&self) -> bool { matches!(self.run_state, RunState::Finished) } + + /// Returns the number of sources that have been closed. + pub(super) fn closed_sources(&self) -> usize { self.closed_sources } + + /// Returns the total number of sources being tracked. + pub(super) fn total_sources(&self) -> usize { self.total_sources } } diff --git a/src/connection/test_support.rs b/src/connection/test_support.rs index bf2f7165..473ca0a9 100644 --- a/src/connection/test_support.rs +++ b/src/connection/test_support.rs @@ -103,8 +103,8 @@ impl ActorHarness { is_active: self.state.is_active(), is_shutting_down: self.state.is_shutting_down(), is_done: self.state.is_done(), - total_sources: self.state.total_sources, - closed_sources: self.state.closed_sources, + total_sources: self.state.total_sources(), + closed_sources: self.state.closed_sources(), } } /// Replace the low-priority receiver. @@ -208,8 +208,8 @@ impl ActorStateHarness { is_active: self.state.is_active(), is_shutting_down: self.state.is_shutting_down(), is_done: self.state.is_done(), - total_sources: self.state.total_sources, - closed_sources: self.state.closed_sources, + total_sources: self.state.total_sources(), + closed_sources: self.state.closed_sources(), } } } From 078aa068b66240079a6a346b43cef54fdf35ae34 Mon Sep 17 00:00:00 2001 From: Leynos Date: Wed, 28 Jan 2026 17:24:06 +0000 Subject: [PATCH 3/5] refactor(connection): replace manual multi_packet install with set_multi_packet_with_correlation Replaced the direct call to `self.multi_packet.install(channel, None)` and debug assertion with a call to the helper method `set_multi_packet_with_correlation(channel, None)`. This centralizes correlation handling and reduces code duplication in the connection module. Co-authored-by: terragon-labs[bot] --- src/connection/mod.rs | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/src/connection/mod.rs b/src/connection/mod.rs index d7977163..4ad7b8bb 100644 --- a/src/connection/mod.rs +++ b/src/connection/mod.rs @@ -255,14 +255,7 @@ where /// Set or replace the current multi-packet response channel. pub fn set_multi_packet(&mut self, channel: Option>) { - debug_assert!( - self.response.is_none(), - concat!( - "ConnectionActor invariant violated: cannot set multi_packet while a ", - "response stream is active" - ), - ); - self.multi_packet.install(channel, None); + self.set_multi_packet_with_correlation(channel, None); } /// Set or replace the current multi-packet response channel and stamp correlation identifiers. From 2814200c5dbfa40a21c2b753a339fd97456c6b15 Mon Sep 17 00:00:00 2001 From: Leynos Date: Wed, 28 Jan 2026 23:13:14 +0000 Subject: [PATCH 4/5] refactor(connection): clean up shutdown handling and clarify comments - Replaced 'await_shutdown' with 'wait_shutdown' for shutdown cancellation handling to simplify code. - Added Clippy exemption for unreachable code in 'try_opportunistic_drain' to maintain exhaustive matching. - Improved documentation comments regarding multi-packet channel stamping behavior. - Removed unused 'await_shutdown' function to reduce dead code. Co-authored-by: terragon-labs[bot] --- src/connection/drain.rs | 14 +++++++------- src/connection/mod.rs | 2 +- src/connection/multi_packet.rs | 2 +- src/connection/polling.rs | 5 ----- 4 files changed, 9 insertions(+), 14 deletions(-) diff --git a/src/connection/drain.rs b/src/connection/drain.rs index 1d7b2999..4efa0696 100644 --- a/src/connection/drain.rs +++ b/src/connection/drain.rs @@ -113,6 +113,10 @@ where /// Try to opportunistically drain a queue-backed source when fairness allows. /// /// Returns `true` when a frame is forwarded to `out`. + #[expect( + clippy::unreachable, + reason = "High variant is structurally unreachable but must remain for exhaustive matching" + )] pub(super) fn try_opportunistic_drain( &mut self, kind: QueueKind, @@ -121,14 +125,10 @@ where let DrainContext { out, state } = ctx; match kind { QueueKind::High => { - debug_assert!( - false, - concat!( - "try_opportunistic_drain(High) is unsupported; High is handled by biased ", - "polling" - ) + unreachable!( + "try_opportunistic_drain(High) is unsupported; High is handled by biased \ + polling" ); - false } QueueKind::Low => { let res = match self.low_rx.as_mut() { diff --git a/src/connection/mod.rs b/src/connection/mod.rs index 4ad7b8bb..deaf4313 100644 --- a/src/connection/mod.rs +++ b/src/connection/mod.rs @@ -363,7 +363,7 @@ where tokio::select! { biased; - () = Self::await_shutdown(self.shutdown.clone()), if state.is_active() => Event::Shutdown, + () = Self::wait_shutdown(self.shutdown.clone()), if state.is_active() => Event::Shutdown, res = Self::poll_queue(self.high_rx.as_mut()), if high_available => Event::High(res), res = Self::poll_queue(self.low_rx.as_mut()), if low_available => Event::Low(res), res = Self::poll_queue(self.multi_packet.channel_mut()), if multi_available => Event::MultiPacket(res), diff --git a/src/connection/multi_packet.rs b/src/connection/multi_packet.rs index b6b4ab15..05e678b6 100644 --- a/src/connection/multi_packet.rs +++ b/src/connection/multi_packet.rs @@ -60,7 +60,7 @@ impl MultiPacketContext { /// Install a multi-packet channel with an optional correlation identifier. /// /// When `channel` is `Some`, stamping is enabled with the provided `correlation_id`. - /// When `channel` is `None`, stamping is disabled. + /// When `channel` is `None`, stamping is disabled and `correlation_id` is ignored. pub(super) fn install( &mut self, channel: Option>, diff --git a/src/connection/polling.rs b/src/connection/polling.rs index 6587b4e1..77cf3192 100644 --- a/src/connection/polling.rs +++ b/src/connection/polling.rs @@ -42,11 +42,6 @@ where } } - /// Await shutdown cancellation on the provided token. - pub(super) async fn await_shutdown(token: CancellationToken) { - Self::wait_shutdown(token).await; - } - /// Poll whichever receiver is provided, returning `None` when absent. /// /// Multi-packet channels reuse this helper so they share back-pressure with queued frames. From 3dfb64e3350c081a7635aa576b871f8606d6f770 Mon Sep 17 00:00:00 2001 From: Leynos Date: Thu, 29 Jan 2026 00:07:06 +0000 Subject: [PATCH 5/5] refactor(connection): replace unreachable!() with debug_assert!() in drain.rs Changed try_opportunistic_drain for QueueKind::High from unreachable!() call to a debug_assert!(false) and returning false. Removed the #[expect(clippy::unreachable)] attribute. This improves clarity and debugging while preserving behavior. Co-authored-by: terragon-labs[bot] --- src/connection/drain.rs | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/src/connection/drain.rs b/src/connection/drain.rs index 4efa0696..3f742acb 100644 --- a/src/connection/drain.rs +++ b/src/connection/drain.rs @@ -113,10 +113,6 @@ where /// Try to opportunistically drain a queue-backed source when fairness allows. /// /// Returns `true` when a frame is forwarded to `out`. - #[expect( - clippy::unreachable, - reason = "High variant is structurally unreachable but must remain for exhaustive matching" - )] pub(super) fn try_opportunistic_drain( &mut self, kind: QueueKind, @@ -125,10 +121,8 @@ where let DrainContext { out, state } = ctx; match kind { QueueKind::High => { - unreachable!( - "try_opportunistic_drain(High) is unsupported; High is handled by biased \ - polling" - ); + debug_assert!(false, "try_opportunistic_drain(High) should not be called"); + false } QueueKind::Low => { let res = match self.low_rx.as_mut() {