diff --git a/docs/asynchronous-outbound-messaging-design.md b/docs/asynchronous-outbound-messaging-design.md index 6ad8bd67..87868ceb 100644 --- a/docs/asynchronous-outbound-messaging-design.md +++ b/docs/asynchronous-outbound-messaging-design.md @@ -353,12 +353,20 @@ classDiagram class PushQueues~F~ { +high_priority_rx: mpsc::Receiver +low_priority_rx: mpsc::Receiver - +bounded(high_capacity: usize, low_capacity: usize): (PushQueues~F~, PushHandle~F~) + +builder(): PushQueuesBuilder~F~ +recv(): Option<(PushPriority, F)> } + class PushQueuesBuilder~F~ { + +high_capacity(cap: usize): PushQueuesBuilder~F~ + +low_capacity(cap: usize): PushQueuesBuilder~F~ + +rate(rate: Option): PushQueuesBuilder~F~ + +dlq(sender: Option>): PushQueuesBuilder~F~ + +build(): (PushQueues~F~, PushHandle~F~) + } PushHandleInner <.. PushHandle~F~ : contains - PushQueues~F~ o-- PushHandle~F~ : bounded(high_capacity, low_capacity) + PushQueues~F~ o-- PushQueuesBuilder~F~ : builder() + PushQueuesBuilder~F~ o-- PushHandle~F~ : build() PushHandle --> PushPriority PushHandle --> PushPolicy PushHandle --> PushError diff --git a/docs/efficiency-report.md b/docs/efficiency-report.md index ed004504..abc59548 100644 --- a/docs/efficiency-report.md +++ b/docs/efficiency-report.md @@ -2,44 +2,53 @@ ## Executive Summary -This report documents efficiency improvement opportunities identified in the wireframe Rust library codebase. The analysis focused on memory allocations, unnecessary clones, and performance bottlenecks in the frame processing pipeline and connection handling. +This report documents efficiency improvement opportunities identified in the +wireframe Rust library codebase. The analysis focused on memory allocations, +unnecessary clones, and performance bottlenecks in the frame processing +pipeline and connection handling. ## Key Findings ### 1. Frame Processor Unnecessary Allocation (HIGH IMPACT) -**Location**: `src/frame/processor.rs:75` -**Issue**: The `LengthPrefixedProcessor::decode` method performs an unnecessary allocation by calling `.to_vec()` on a `BytesMut` returned from `split_to()`. +**Location**: `src/frame/processor.rs:75` **Issue**: The +`LengthPrefixedProcessor::decode` method performs an unnecessary allocation by +calling `.to_vec()` on a `BytesMut` returned from `split_to()`. ```rust // Current inefficient code: Ok(Some(src.split_to(len).to_vec())) ``` -**Impact**: This allocation occurs for every frame processed, creating performance overhead in high-throughput scenarios. +**Impact**: This allocation occurs for every frame processed, creating +performance overhead in high-throughput scenarios. -**Recommendation**: Use `freeze().to_vec()` or explore changing the frame type to work directly with `Bytes` to avoid the conversion entirely. +**Recommendation**: Use `freeze().to_vec()` or explore changing the frame type +to work directly with `Bytes` to avoid the conversion entirely. -**Status**: ✅ FIXED - Optimized to use `freeze().to_vec()` which is more efficient. +**Status**: ✅ FIXED - Optimized to use `freeze().to_vec()` which is more +efficient. ### 2. Connection Actor Clone Operations (MEDIUM IMPACT) -**Location**: `src/connection.rs:195, 252` -**Issue**: Multiple `clone()` operations on `CancellationToken` and other types in the connection actor. +**Location**: `src/connection.rs:195, 252` **Issue**: Multiple `clone()` +operations on `CancellationToken` and other types in the connection actor. ```rust pub fn shutdown_token(&self) -> CancellationToken { self.shutdown.clone() } () = Self::await_shutdown(self.shutdown.clone()), if state.is_active() => Event::Shutdown, ``` -**Impact**: Moderate - these clones are necessary for the async select pattern but could be optimized in some cases. +**Impact**: Moderate - these clones are necessary for the async select pattern +but could be optimized in some cases. -**Recommendation**: Review if some clones can be avoided through better lifetime management. +**Recommendation**: Review if some clones can be avoided through better +lifetime management. ### 3. Middleware Chain Building (MEDIUM IMPACT) -**Location**: `src/app.rs:599` -**Issue**: Handler cloning during middleware chain construction. +**Location**: `src/app.rs:599` **Issue**: Handler cloning during middleware +chain construction. ```rust let mut service = HandlerService::new(id, handler.clone()); @@ -53,7 +62,8 @@ let mut service = HandlerService::new(id, handler.clone()); **Location**: `src/session.rs:47-55` -**Issue**: `Vec::with_capacity` followed by potential reallocation during `retain_and_collect`. +**Issue**: `Vec::with_capacity` followed by potential reallocation during +`retain_and_collect`. ```rust let mut out = Vec::with_capacity(self.0.len()); @@ -61,13 +71,15 @@ let mut out = Vec::with_capacity(self.0.len()); **Impact**: Low to medium - depends on registry size and pruning frequency. -**Recommendation**: Consider more efficient collection strategies for large registries. +**Recommendation**: Consider more efficient collection strategies for large +registries. ### 5. Vector Initializations (LOW IMPACT) **Location**: Various files -**Issue**: Some `Vec::new()` calls that could use `with_capacity` when size is known. +**Issue**: Some `Vec::new()` calls that could use `with_capacity` when size is +known. **Impact**: Low - minor allocation optimizations. @@ -96,19 +108,24 @@ let mut out = Vec::with_capacity(self.0.len()); ## Implemented Optimizations ### Frame Processor Optimization -**Change**: Modified `LengthPrefixedProcessor::decode` to use `freeze().to_vec()` instead of direct `.to_vec()`. + +**Change**: Modified `LengthPrefixedProcessor::decode` to use +`freeze().to_vec()` instead of direct `.to_vec()`. **Before**: + ```rust Ok(Some(src.split_to(len).to_vec())) ``` **After**: + ```rust Ok(Some(src.split_to(len).freeze().to_vec())) ``` **Benefits**: + - Reduces memory allocations in the frame processing hot path - Maintains API compatibility with existing code - Improves performance for high-throughput scenarios @@ -116,15 +133,20 @@ Ok(Some(src.split_to(len).freeze().to_vec())) ## Future Optimization Opportunities -1. **Frame Type Optimization**: Consider changing the frame type from `Vec` to `Bytes` to eliminate the final `.to_vec()` call entirely. +1. **Frame Type Optimization**: Consider changing the frame type from `Vec` + to `Bytes` to eliminate the final `.to_vec()` call entirely. -2. **Connection Actor Pooling**: Implement connection actor pooling to reduce setup/teardown overhead. +2. **Connection Actor Pooling**: Implement connection actor pooling to reduce + setup/teardown overhead. -3. **Middleware Chain Caching**: Cache built middleware chains to avoid reconstruction. +3. **Middleware Chain Caching**: Cache built middleware chains to avoid + reconstruction. -4. **Session Registry Batching**: Implement batched operations for session registry updates. +4. **Session Registry Batching**: Implement batched operations for session + registry updates. -5. **Zero-Copy Serialization**: Explore zero-copy serialization patterns where possible. +5. **Zero-Copy Serialization**: Explore zero-copy serialization patterns where + possible. ## Testing and Validation @@ -138,6 +160,10 @@ All optimizations have been tested to ensure: ## Conclusion -The implemented frame processor optimization provides immediate performance benefits for the most critical code path in the wireframe library. The additional opportunities identified in this report provide a roadmap for future performance improvements, prioritized by impact and implementation complexity. +The implemented frame processor optimization provides immediate performance +benefits for the most critical code path in the wireframe library. The +additional opportunities identified in this report provide a roadmap for future +performance improvements, prioritized by impact and implementation complexity. -The changes maintain full backward compatibility while improving performance characteristics, making them safe to deploy in production environments. +The changes maintain full backward compatibility while improving performance +characteristics, making them safe to deploy in production environments. diff --git a/docs/hardening-wireframe-a-guide-to-production-resilience.md b/docs/hardening-wireframe-a-guide-to-production-resilience.md index 1392b53e..7e8bd68c 100644 --- a/docs/hardening-wireframe-a-guide-to-production-resilience.md +++ b/docs/hardening-wireframe-a-guide-to-production-resilience.md @@ -253,11 +253,20 @@ token-bucket algorithm is ideal. use wireframe::push::{PushQueues, MAX_PUSH_RATE}; // Configure a connection to allow at most MAX_PUSH_RATE pushes per second. -let (queues, handle) = PushQueues::::bounded_with_rate(8, 8, Some(MAX_PUSH_RATE)) +let (queues, handle) = PushQueues::::builder() + .high_capacity(8) + .low_capacity(8) + .rate(Some(MAX_PUSH_RATE)) + .build() .expect("rate within supported bounds"); // Passing `None` disables rate limiting entirely: -let (_unlimited, _handle) = PushQueues::::bounded_no_rate_limit(8, 8); +let (_unlimited, _handle) = PushQueues::::builder() + .high_capacity(8) + .low_capacity(8) + .rate(None) + .build() + .expect("failed to build unlimited queues"); // Inside PushHandle::push() async fn push(&self, frame: Frame) -> Result<(), PushError> { diff --git a/src/connection.rs b/src/connection.rs index ca5b50a4..884a0b8d 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -98,7 +98,11 @@ impl Default for FairnessConfig { /// use tokio_util::sync::CancellationToken; /// use wireframe::{connection::ConnectionActor, push::PushQueues}; /// -/// let (queues, handle) = PushQueues::::bounded(8, 8); +/// let (queues, handle) = PushQueues::::builder() +/// .high_capacity(8) +/// .low_capacity(8) +/// .build() +/// .expect("failed to build PushQueues"); /// let shutdown = CancellationToken::new(); /// let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, handle, None, shutdown); /// # drop(actor); @@ -129,7 +133,11 @@ where /// use tokio_util::sync::CancellationToken; /// use wireframe::{connection::ConnectionActor, push::PushQueues}; /// - /// let (queues, handle) = PushQueues::::bounded(4, 4); + /// let (queues, handle) = PushQueues::::builder() + /// .high_capacity(4) + /// .low_capacity(4) + /// .build() + /// .expect("failed to build PushQueues"); /// let token = CancellationToken::new(); /// let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, handle, None, token); /// # drop(actor); diff --git a/src/push.rs b/src/push.rs deleted file mode 100644 index 317f5250..00000000 --- a/src/push.rs +++ /dev/null @@ -1,477 +0,0 @@ -//! Prioritised queues used for asynchronously pushing frames to a connection. -//! -//! `PushQueues` maintain separate high- and low-priority channels so -//! background tasks can send messages without blocking the request/response -//! cycle. Producers interact with these queues through a cloneable -//! [`PushHandle`]. Queued frames are delivered in FIFO order within each -//! priority level. An optional rate limiter caps throughput at -//! [`MAX_PUSH_RATE`] pushes per second. - -use std::{ - sync::{Arc, Weak}, - time::Duration, -}; - -use leaky_bucket::RateLimiter; -use tokio::sync::mpsc; -use tracing::{debug, error, warn}; - -/// Messages can be sent through a [`PushHandle`]. -/// -/// The trait is intentionally empty: any type that is `Send` and `'static` -/// is considered a valid frame for pushing to a connection. -pub trait FrameLike: Send + 'static {} - -impl FrameLike for T where T: Send + 'static {} - -// Default maximum pushes per second when no custom rate is specified. -// This is an internal implementation detail and may change. -const DEFAULT_PUSH_RATE: usize = 100; -/// Highest supported rate for [`PushQueues::bounded_with_rate`]. -pub const MAX_PUSH_RATE: usize = 10_000; - -// Compile-time guard: DEFAULT_PUSH_RATE must not exceed MAX_PUSH_RATE. -const _: usize = MAX_PUSH_RATE - DEFAULT_PUSH_RATE; - -/// Priority level for outbound messages. -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub enum PushPriority { - High, - Low, -} - -/// Behaviour when a push queue is full. -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub enum PushPolicy { - /// Return an error to the caller if the queue is full. - ReturnErrorIfFull, - /// Silently drop the frame. - DropIfFull, - /// Drop the frame but emit a log warning. - WarnAndDropIfFull, -} - -/// Errors that can occur when pushing a frame. -#[derive(Debug)] -pub enum PushError { - /// The queue was at capacity and the policy was `ReturnErrorIfFull`. - QueueFull, - /// The receiving end of the queue has been dropped. - Closed, -} - -impl std::fmt::Display for PushError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::QueueFull => f.write_str("push queue full"), - Self::Closed => f.write_str("push queue closed"), - } - } -} - -impl std::error::Error for PushError {} - -/// Errors returned when creating push queues. -#[derive(Debug)] -pub enum PushConfigError { - /// The provided rate was zero or exceeded [`MAX_PUSH_RATE`]. - InvalidRate(usize), -} - -impl std::fmt::Display for PushConfigError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::InvalidRate(r) => { - write!(f, "invalid rate {r}; must be between 1 and {MAX_PUSH_RATE}") - } - } - } -} - -impl std::error::Error for PushConfigError {} - -/// Shared state for [`PushHandle`]. -/// -/// Holds the high- and low-priority channels alongside an optional rate -/// limiter and dead-letter queue sender used when pushes are discarded. -/// -/// - `high_prio_tx` – channel for frames that must be sent before any low-priority traffic. -/// - `low_prio_tx` – channel for best-effort frames. -/// - `limiter` – optional rate-limiter enforcing global push throughput. -/// - `dlq_tx` – optional dead-letter queue for discarded frames. -pub(crate) struct PushHandleInner { - high_prio_tx: mpsc::Sender, - low_prio_tx: mpsc::Sender, - limiter: Option, - dlq_tx: Option>, -} - -/// Cloneable handle used by producers to push frames to a connection. -#[derive(Clone)] -pub struct PushHandle(Arc>); - -impl PushHandle { - pub(crate) fn from_arc(arc: Arc>) -> Self { Self(arc) } - - /// Internal helper to push a frame with the requested priority. - /// - /// Waits on the rate limiter if configured and sends the frame to the - /// appropriate channel, mapping send errors to [`PushError`]. - async fn push_with_priority(&self, frame: F, priority: PushPriority) -> Result<(), PushError> { - if let Some(ref limiter) = self.0.limiter { - limiter.acquire(1).await; - } - let tx = match priority { - PushPriority::High => &self.0.high_prio_tx, - PushPriority::Low => &self.0.low_prio_tx, - }; - tx.send(frame).await.map_err(|_| PushError::Closed)?; - debug!(?priority, "frame pushed"); - Ok(()) - } - /// Push a high-priority frame subject to rate limiting. - /// - /// The call awaits if the rate limiter has no available tokens or - /// the queue is full. - /// - /// # Errors - /// - /// Returns [`PushError::Closed`] if the receiving end has been dropped. - /// - /// # Examples - /// - /// ```rust,no_run - /// use wireframe::push::{PushPriority, PushQueues}; - /// - /// #[tokio::test] - /// async fn example() { - /// let (mut queues, handle) = PushQueues::bounded_with_rate(1, 1, Some(1)); - /// handle.push_high_priority(42u8).await.unwrap(); - /// let (priority, frame) = queues.recv().await.unwrap(); - /// assert_eq!(priority, PushPriority::High); - /// assert_eq!(frame, 42); - /// } - /// ``` - pub async fn push_high_priority(&self, frame: F) -> Result<(), PushError> { - self.push_with_priority(frame, PushPriority::High).await - } - - /// Push a low-priority frame subject to rate limiting. - /// - /// Awaits if the rate limiter has no available tokens or the queue is full. - /// - /// # Errors - /// - /// Returns [`PushError::Closed`] if the receiving end has been dropped. - /// - /// # Examples - /// - /// ```rust,no_run - /// use wireframe::push::{PushPriority, PushQueues}; - /// - /// #[tokio::test] - /// async fn example() { - /// let (mut queues, handle) = PushQueues::bounded_with_rate(1, 1, Some(1)); - /// handle.push_low_priority(10u8).await.unwrap(); - /// let (priority, frame) = queues.recv().await.unwrap(); - /// assert_eq!(priority, PushPriority::Low); - /// assert_eq!(frame, 10); - /// } - /// ``` - pub async fn push_low_priority(&self, frame: F) -> Result<(), PushError> { - self.push_with_priority(frame, PushPriority::Low).await - } - - /// Send a frame to the configured dead letter queue if available. - fn route_to_dlq(&self, frame: F) - where - F: std::fmt::Debug, - { - if let Some(dlq) = &self.0.dlq_tx { - match dlq.try_send(frame) { - Ok(()) => {} - Err(mpsc::error::TrySendError::Full(f)) => { - error!(?f, "push queue and DLQ full; frame lost"); - } - Err(mpsc::error::TrySendError::Closed(f)) => { - error!(?f, "DLQ closed; frame lost"); - } - } - } - } - - /// Attempt to push a frame with the given priority and policy. - /// - /// # Errors - /// - /// Returns [`PushError::QueueFull`] if the queue is full and the policy is - /// [`PushPolicy::ReturnErrorIfFull`]. Returns [`PushError::Closed`] if the - /// receiving end has been dropped. When [`PushPolicy::DropIfFull`] or - /// [`PushPolicy::WarnAndDropIfFull`] is used, a configured dead letter queue - /// receives the dropped frame. - /// - /// # Examples - /// - /// ```rust,no_run - /// use tokio::sync::mpsc; - /// use wireframe::push::{PushError, PushPolicy, PushPriority, PushQueues}; - /// - /// #[tokio::test] - /// async fn example() { - /// let (dlq_tx, mut dlq_rx) = mpsc::channel(1); - /// let (mut queues, handle) = - /// PushQueues::bounded_with_rate_dlq(1, 1, None, Some(dlq_tx)).unwrap(); - /// handle.push_high_priority(1u8).await.unwrap(); - /// - /// handle - /// .try_push(2u8, PushPriority::High, PushPolicy::DropIfFull) - /// .unwrap(); - /// - /// assert_eq!(dlq_rx.recv().await.unwrap(), 2); - /// let _ = queues.recv().await; - /// } - /// ``` - pub fn try_push( - &self, - frame: F, - priority: PushPriority, - policy: PushPolicy, - ) -> Result<(), PushError> - where - F: std::fmt::Debug, - { - let tx = match priority { - PushPriority::High => &self.0.high_prio_tx, - PushPriority::Low => &self.0.low_prio_tx, - }; - - match tx.try_send(frame) { - Ok(()) => Ok(()), - Err(mpsc::error::TrySendError::Full(f)) => match policy { - PushPolicy::ReturnErrorIfFull => Err(PushError::QueueFull), - PushPolicy::DropIfFull | PushPolicy::WarnAndDropIfFull => { - if matches!(policy, PushPolicy::WarnAndDropIfFull) { - warn!( - ?priority, - ?policy, - dlq = self.0.dlq_tx.is_some(), - "push queue full" - ); - } - self.route_to_dlq(f); - Ok(()) - } - }, - Err(mpsc::error::TrySendError::Closed(_)) => Err(PushError::Closed), - } - } - - /// Downgrade to a `Weak` reference for storage in a registry. - pub(crate) fn downgrade(&self) -> Weak> { Arc::downgrade(&self.0) } -} - -/// Receiver ends of the push queues stored by the connection actor. -pub struct PushQueues { - pub(crate) high_priority_rx: mpsc::Receiver, - pub(crate) low_priority_rx: mpsc::Receiver, -} - -impl PushQueues { - /// Create a new set of queues with the specified bounds for each priority - /// and return them along with a [`PushHandle`] for producers. - /// - /// # Examples - /// - /// ```rust,no_run - /// use wireframe::push::{PushPriority, PushQueues}; - /// - /// #[tokio::test] - /// async fn example() { - /// let (mut queues, handle) = PushQueues::::bounded(1, 1); - /// handle.push_high_priority(7u8).await.unwrap(); - /// let (priority, frame) = queues.recv().await.unwrap(); - /// assert_eq!(priority, PushPriority::High); - /// assert_eq!(frame, 7); - /// } - /// ``` - /// - /// # Panics - /// - /// Panics if an internal invariant is violated. This should never occur. - #[must_use] - pub fn bounded(high_capacity: usize, low_capacity: usize) -> (Self, PushHandle) { - Self::bounded_with_rate_dlq(high_capacity, low_capacity, Some(DEFAULT_PUSH_RATE), None) - .expect("DEFAULT_PUSH_RATE is always valid") - } - - /// Create queues with no rate limiting. - /// - /// # Examples - /// - /// ```rust,no_run - /// use wireframe::push::PushQueues; - /// - /// let (_queues, handle) = PushQueues::::bounded_no_rate_limit(1, 1); - /// let _ = handle; - /// ``` - /// - /// # Panics - /// - /// Panics if an internal invariant is violated. This should never occur. - #[must_use] - pub fn bounded_no_rate_limit( - high_capacity: usize, - low_capacity: usize, - ) -> (Self, PushHandle) { - // `bounded_with_rate_dlq` only fails when given an invalid rate. Passing - // `None` disables rate limiting entirely so the call is infallible. The - // debug assertion guards against future regressions. - let result = Self::bounded_with_rate_dlq(high_capacity, low_capacity, None, None); - debug_assert!(result.is_ok(), "bounded_no_rate_limit should not fail"); - result.expect("bounded_no_rate_limit should not fail") - } - - /// Create queues with a custom rate limit in pushes per second. - /// - /// The limiter enforces fairness by allowing at most `rate` pushes - /// per second across all producers for the returned [`PushHandle`]. - /// Pass `None` to disable rate limiting entirely. - /// - /// # Errors - /// - /// Returns [`PushConfigError::InvalidRate`] if `rate` is zero or greater - /// than [`MAX_PUSH_RATE`]. - /// - /// # Examples - /// - /// ```rust,no_run - /// use wireframe::push::PushQueues; - /// - /// #[tokio::main] - /// async fn main() { - /// let (mut queues, handle) = PushQueues::::bounded_with_rate(1, 1, Some(10)).unwrap(); - /// handle.push_low_priority(1u8).await.unwrap(); - /// let (_prio, frame) = queues.recv().await.unwrap(); - /// assert_eq!(frame, 1); - /// } - /// ``` - pub fn bounded_with_rate( - high_capacity: usize, - low_capacity: usize, - rate: Option, - ) -> Result<(Self, PushHandle), PushConfigError> { - Self::bounded_with_rate_dlq(high_capacity, low_capacity, rate, None) - } - - /// Create queues with a custom rate limit and optional dead letter queue. - /// - /// Frames that would be dropped by [`try_push`](PushHandle::try_push) when - /// using [`PushPolicy::DropIfFull`] or [`PushPolicy::WarnAndDropIfFull`] - /// are routed to `dlq` if provided. - /// - /// # Errors - /// - /// Returns [`PushConfigError::InvalidRate`] if `rate` is zero or greater - /// than [`MAX_PUSH_RATE`]. - /// - /// # Examples - /// - /// ```rust,no_run - /// use tokio::sync::mpsc; - /// use wireframe::push::{PushPolicy, PushPriority, PushQueues}; - /// - /// #[tokio::main] - /// async fn main() { - /// let (dlq_tx, mut dlq_rx) = mpsc::channel(1); - /// let (mut queues, handle) = - /// PushQueues::::bounded_with_rate_dlq(1, 1, None, Some(dlq_tx)).unwrap(); - /// handle.push_high_priority(1u8).await.unwrap(); - /// handle - /// .try_push(2u8, PushPriority::High, PushPolicy::DropIfFull) - /// .unwrap(); - /// - /// let (_, val) = queues.recv().await.unwrap(); - /// assert_eq!(val, 1); - /// assert_eq!(dlq_rx.recv().await.unwrap(), 2); - /// } - /// ``` - pub fn bounded_with_rate_dlq( - high_capacity: usize, - low_capacity: usize, - rate: Option, - dlq: Option>, - ) -> Result<(Self, PushHandle), PushConfigError> { - if let Some(r) = rate.filter(|r| *r == 0 || *r > MAX_PUSH_RATE) { - // Reject unsupported rates early to avoid building queues that cannot - // be used. The bounds prevent runaway resource consumption. - return Err(PushConfigError::InvalidRate(r)); - } - let (high_tx, high_rx) = mpsc::channel(high_capacity); - let (low_tx, low_rx) = mpsc::channel(low_capacity); - let limiter = rate.map(|r| { - RateLimiter::builder() - .initial(r) - .refill(r) - .interval(Duration::from_secs(1)) - .max(r) - .build() - }); - let inner = PushHandleInner { - high_prio_tx: high_tx, - low_prio_tx: low_tx, - limiter, - dlq_tx: dlq, - }; - Ok(( - Self { - high_priority_rx: high_rx, - low_priority_rx: low_rx, - }, - PushHandle(Arc::new(inner)), - )) - } - - /// Receive the next frame, preferring high priority frames when available. - /// - /// Returns `None` when both queues are closed and empty. - /// - /// # Examples - /// - /// ```rust,no_run - /// use wireframe::push::{PushPriority, PushQueues}; - /// - /// #[tokio::test] - /// async fn example() { - /// let (mut queues, handle) = PushQueues::bounded(1, 1); - /// handle.push_high_priority(2u8).await.unwrap(); - /// let (priority, frame) = queues.recv().await.unwrap(); - /// assert_eq!(priority, PushPriority::High); - /// assert_eq!(frame, 2); - /// } - /// ``` - pub async fn recv(&mut self) -> Option<(PushPriority, F)> { - tokio::select! { - biased; - res = self.high_priority_rx.recv() => res.map(|f| (PushPriority::High, f)), - res = self.low_priority_rx.recv() => res.map(|f| (PushPriority::Low, f)), - } - } - - /// Close both receivers to prevent further pushes from being accepted. - /// - /// This is primarily used in tests to release resources when no actor is - /// draining the queues. - /// - /// # Examples - /// - /// ```rust,no_run - /// use wireframe::push::PushQueues; - /// - /// let (mut queues, _handle) = PushQueues::::bounded(1, 1); - /// queues.close(); - /// ``` - pub fn close(&mut self) { - self.high_priority_rx.close(); - self.low_priority_rx.close(); - } -} diff --git a/src/push/mod.rs b/src/push/mod.rs new file mode 100644 index 00000000..9fc11afa --- /dev/null +++ b/src/push/mod.rs @@ -0,0 +1,31 @@ +//! Prioritised queues used for asynchronously pushing frames to a connection. +//! +//! # Overview +//! Expose prioritised push queues and their handle. Construct via a fluent +//! builder. +//! +//! # Example +//! ```no_run +//! use wireframe::push::PushQueues; +//! +//! let (queues, handle) = PushQueues::::builder() +//! .high_capacity(8) +//! .low_capacity(8) +//! .build() +//! .expect("failed to build PushQueues"); +//! # drop((queues, handle)); +//! ``` + +mod queues; + +pub(crate) use self::queues::{FrameLike, PushHandleInner}; +pub use self::queues::{ + MAX_PUSH_RATE, + PushConfigError, + PushError, + PushHandle, + PushPolicy, + PushPriority, + PushQueues, + PushQueuesBuilder, +}; diff --git a/src/push/queues/builder.rs b/src/push/queues/builder.rs new file mode 100644 index 00000000..9a855cf4 --- /dev/null +++ b/src/push/queues/builder.rs @@ -0,0 +1,110 @@ +//! Builder for configuring push queues. + +use tokio::sync::mpsc; + +use super::{DEFAULT_PUSH_RATE, FrameLike, PushConfigError, PushHandle, PushQueues}; + +/// Builder for [`PushQueues`]. +/// +/// Allows configuration of queue capacities, rate limiting and an optional +/// dead-letter queue before constructing [`PushQueues`] and its paired +/// [`PushHandle`]. Defaults mirror the previous constructors: both queues have +/// a capacity of one and pushes are limited to [`DEFAULT_PUSH_RATE`] per +/// second unless overridden. Construct via [`PushQueues::builder`] or +/// [`Default::default`]. +/// +/// # Examples +/// +/// Build queues with custom capacities, a rate limit and a dead-letter queue: +/// +/// ```rust,no_run +/// use tokio::sync::mpsc; +/// use wireframe::push::PushQueues; +/// +/// # async fn demo() { +/// let (dlq_tx, _dlq_rx) = mpsc::channel(8); +/// let (_queues, _handle) = PushQueues::::builder() +/// .high_capacity(8) +/// .low_capacity(8) +/// .rate(Some(100)) // pass None to disable rate limiting +/// .dlq(Some(dlq_tx)) // frames are dropped if no DLQ or DLQ is full +/// .build() +/// .expect("failed to build PushQueues"); +/// # } +/// ``` +/// +/// Builders can also be constructed directly: +/// +/// ``` +/// use wireframe::push::PushQueuesBuilder; +/// +/// let (_queues, _handle) = PushQueuesBuilder::::default() +/// .build() +/// .expect("failed to build PushQueues"); +/// # drop((_queues, _handle)); +/// ``` +#[derive(Debug)] +pub struct PushQueuesBuilder { + high_capacity: usize, + low_capacity: usize, + rate: Option, + dlq: Option>, +} + +impl Default for PushQueuesBuilder { + fn default() -> Self { + Self { + high_capacity: 1, + low_capacity: 1, + rate: Some(DEFAULT_PUSH_RATE), + dlq: None, + } + } +} + +impl PushQueuesBuilder { + /// Set the capacity of the high-priority queue. + #[must_use] + pub fn high_capacity(mut self, capacity: usize) -> Self { + debug_assert!(capacity > 0, "capacity must be greater than zero"); + self.high_capacity = capacity; + self + } + + /// Set the capacity of the low-priority queue. + #[must_use] + pub fn low_capacity(mut self, capacity: usize) -> Self { + debug_assert!(capacity > 0, "capacity must be greater than zero"); + self.low_capacity = capacity; + self + } + + /// Set the global push rate limit in pushes per second. + /// + /// Passing `None` disables rate limiting. + #[must_use] + pub fn rate(mut self, rate: Option) -> Self { + self.rate = rate; + self + } + + /// Provide a dead-letter queue for discarded frames. + /// + /// Frames are dropped when no DLQ is set or the channel is full. + #[must_use] + pub fn dlq(mut self, dlq: Option>) -> Self { + self.dlq = dlq; + self + } + + /// Build the configured [`PushQueues`] and associated [`PushHandle`]. + /// + /// # Errors + /// + /// Returns [`PushConfigError::InvalidRate`] if the rate is zero or strictly + /// greater than [`super::MAX_PUSH_RATE`] and + /// [`PushConfigError::InvalidCapacity`] if either queue capacity is zero. + pub fn build(self) -> Result<(PushQueues, PushHandle), PushConfigError> { + PushQueues::build_with_rate_dlq(self.high_capacity, self.low_capacity, self.rate, self.dlq) + } +} diff --git a/src/push/queues/errors.rs b/src/push/queues/errors.rs new file mode 100644 index 00000000..5d1a4349 --- /dev/null +++ b/src/push/queues/errors.rs @@ -0,0 +1,29 @@ +//! Error types for push queue operations and configuration. + +use thiserror::Error; + +use super::MAX_PUSH_RATE; + +/// Errors that can occur when pushing a frame. +#[non_exhaustive] +#[derive(Debug, Error)] +pub enum PushError { + /// The queue was at capacity and the policy was `ReturnErrorIfFull`. + #[error("push queue full")] + QueueFull, + /// The receiving end of the queue has been dropped. + #[error("push queue closed")] + Closed, +} + +/// Errors returned when creating push queues. +#[non_exhaustive] +#[derive(Debug, Error)] +pub enum PushConfigError { + /// The provided rate was zero or exceeded [`MAX_PUSH_RATE`]. + #[error("invalid rate {0}; must be between 1 and {max}", max = MAX_PUSH_RATE)] + InvalidRate(usize), + /// The provided capacities were zero. + #[error("invalid capacities; high={high}, low={low}; each must be ≥ 1")] + InvalidCapacity { high: usize, low: usize }, +} diff --git a/src/push/queues/handle.rs b/src/push/queues/handle.rs new file mode 100644 index 00000000..8de6a271 --- /dev/null +++ b/src/push/queues/handle.rs @@ -0,0 +1,205 @@ +//! Cloneable handle used by producers to push frames to a connection. + +use std::sync::{Arc, Weak}; + +use leaky_bucket::RateLimiter; +use tokio::sync::mpsc; +use tracing::{debug, error, warn}; + +use super::{FrameLike, PushError, PushPolicy, PushPriority}; + +/// Shared state for [`PushHandle`]. +/// +/// Holds the high- and low-priority channels alongside an optional rate +/// limiter and dead-letter queue sender used when pushes are discarded. +/// +/// - `high_prio_tx` – channel for frames that must be sent before any low-priority traffic. +/// - `low_prio_tx` – channel for best-effort frames. +/// - `limiter` – optional rate-limiter enforcing global push throughput. +/// - `dlq_tx` – optional dead-letter queue for discarded frames. +pub(crate) struct PushHandleInner { + pub(crate) high_prio_tx: mpsc::Sender, + pub(crate) low_prio_tx: mpsc::Sender, + pub(crate) limiter: Option, + pub(crate) dlq_tx: Option>, +} + +/// Cloneable handle used by producers to push frames to a connection. +#[derive(Clone)] +pub struct PushHandle(Arc>); + +impl PushHandle { + pub(crate) fn from_arc(arc: Arc>) -> Self { Self(arc) } + + /// Internal helper to push a frame with the requested priority. + /// + /// Waits on the rate limiter if configured and sends the frame to the + /// appropriate channel, mapping send errors to [`PushError`]. + async fn push_with_priority(&self, frame: F, priority: PushPriority) -> Result<(), PushError> { + if let Some(ref limiter) = self.0.limiter { + limiter.acquire(1).await; + } + let tx = match priority { + PushPriority::High => &self.0.high_prio_tx, + PushPriority::Low => &self.0.low_prio_tx, + }; + tx.send(frame).await.map_err(|_| PushError::Closed)?; + debug!(?priority, "frame pushed"); + Ok(()) + } + + /// Push a high-priority frame subject to rate limiting. + /// + /// The call awaits if the rate limiter has no available tokens or + /// the queue is full. + /// + /// # Errors + /// + /// Returns [`PushError::Closed`] if the receiving end has been dropped. + /// + /// # Examples + /// + /// ```rust,no_run + /// use wireframe::push::{PushPriority, PushQueues}; + /// + /// #[tokio::test] + /// async fn example() { + /// let (mut queues, handle) = PushQueues::::builder() + /// .high_capacity(1) + /// .low_capacity(1) + /// .rate(Some(1)) + /// .build() + /// .expect("failed to build PushQueues"); + /// handle.push_high_priority(42u8).await.expect("push failed"); + /// let (priority, frame) = queues.recv().await.expect("recv failed"); + /// assert_eq!(priority, PushPriority::High); + /// assert_eq!(frame, 42); + /// } + /// ``` + pub async fn push_high_priority(&self, frame: F) -> Result<(), PushError> { + self.push_with_priority(frame, PushPriority::High).await + } + + /// Push a low-priority frame subject to rate limiting. + /// + /// Awaits if the rate limiter has no available tokens or the queue is full. + /// + /// # Errors + /// + /// Returns [`PushError::Closed`] if the receiving end has been dropped. + /// + /// # Examples + /// + /// ```rust,no_run + /// use wireframe::push::{PushPriority, PushQueues}; + /// + /// #[tokio::test] + /// async fn example() { + /// let (mut queues, handle) = PushQueues::::builder() + /// .high_capacity(1) + /// .low_capacity(1) + /// .rate(Some(1)) + /// .build() + /// .expect("failed to build PushQueues"); + /// handle.push_low_priority(10u8).await.expect("push failed"); + /// let (priority, frame) = queues.recv().await.expect("recv failed"); + /// assert_eq!(priority, PushPriority::Low); + /// assert_eq!(frame, 10); + /// } + /// ``` + pub async fn push_low_priority(&self, frame: F) -> Result<(), PushError> { + self.push_with_priority(frame, PushPriority::Low).await + } + + /// Send a frame to the configured dead letter queue if available. + fn route_to_dlq(&self, frame: F) + where + F: std::fmt::Debug, + { + if let Some(dlq) = &self.0.dlq_tx { + match dlq.try_send(frame) { + Ok(()) => {} + Err(mpsc::error::TrySendError::Full(f)) => { + error!(?f, "push queue and DLQ full; frame lost"); + } + Err(mpsc::error::TrySendError::Closed(f)) => { + error!(?f, "DLQ closed; frame lost"); + } + } + } + } + + /// Attempt to push a frame with the given priority and policy. + /// + /// # Errors + /// + /// Returns [`PushError::QueueFull`] if the queue is full and the policy is + /// [`PushPolicy::ReturnErrorIfFull`]. Returns [`PushError::Closed`] if the + /// receiving end has been dropped. When [`PushPolicy::DropIfFull`] or + /// [`PushPolicy::WarnAndDropIfFull`] is used, a configured dead letter queue + /// receives the dropped frame. + /// + /// # Examples + /// + /// ```rust,no_run + /// use tokio::sync::mpsc; + /// use wireframe::push::{PushError, PushPolicy, PushPriority, PushQueues}; + /// + /// #[tokio::test] + /// async fn example() { + /// let (dlq_tx, mut dlq_rx) = mpsc::channel(1); + /// let (mut queues, handle) = PushQueues::::builder() + /// .high_capacity(1) + /// .low_capacity(1) + /// .rate(None) + /// .dlq(Some(dlq_tx)) + /// .build() + /// .expect("failed to build PushQueues"); + /// handle.push_high_priority(1u8).await.expect("push failed"); + /// + /// handle + /// .try_push(2u8, PushPriority::High, PushPolicy::DropIfFull) + /// .expect("try_push failed"); + /// + /// assert_eq!(dlq_rx.recv().await.expect("recv failed"), 2); + /// let _ = queues.recv().await; + /// } + /// ``` + pub fn try_push( + &self, + frame: F, + priority: PushPriority, + policy: PushPolicy, + ) -> Result<(), PushError> + where + F: std::fmt::Debug, + { + let tx = match priority { + PushPriority::High => &self.0.high_prio_tx, + PushPriority::Low => &self.0.low_prio_tx, + }; + + match tx.try_send(frame) { + Ok(()) => Ok(()), + Err(mpsc::error::TrySendError::Full(f)) => match policy { + PushPolicy::ReturnErrorIfFull => Err(PushError::QueueFull), + PushPolicy::DropIfFull | PushPolicy::WarnAndDropIfFull => { + if matches!(policy, PushPolicy::WarnAndDropIfFull) { + warn!( + ?priority, + ?policy, + dlq = self.0.dlq_tx.is_some(), + "push queue full" + ); + } + self.route_to_dlq(f); + Ok(()) + } + }, + Err(mpsc::error::TrySendError::Closed(_)) => Err(PushError::Closed), + } + } + + /// Downgrade to a `Weak` reference for storage in a registry. + pub(crate) fn downgrade(&self) -> Weak> { Arc::downgrade(&self.0) } +} diff --git a/src/push/queues/mod.rs b/src/push/queues/mod.rs new file mode 100644 index 00000000..d60d5376 --- /dev/null +++ b/src/push/queues/mod.rs @@ -0,0 +1,248 @@ +//! Queue management used by [`PushHandle`] and [`PushQueues`]. +//! +//! Provides the core implementation for prioritised queues delivering frames +//! to a connection. Background tasks can send messages without blocking the +//! request/response cycle. Frames maintain FIFO order within each priority +//! level. An optional rate limiter caps throughput at [`MAX_PUSH_RATE`] pushes +//! per second. + +use std::{sync::Arc, time::Duration}; + +use leaky_bucket::RateLimiter; +use tokio::sync::mpsc; + +mod builder; +mod errors; +mod handle; + +pub use builder::PushQueuesBuilder; +pub use errors::{PushConfigError, PushError}; +pub use handle::PushHandle; +pub(crate) use handle::PushHandleInner; + +/// Messages can be sent through a [`PushHandle`]. +/// +/// The trait is intentionally empty: any type that is `Send` and `'static` +/// is considered a valid frame for pushing to a connection. +pub trait FrameLike: Send + 'static {} + +impl FrameLike for T where T: Send + 'static {} + +// Default maximum pushes per second when no custom rate is specified. +// This is an internal implementation detail and may change. +const DEFAULT_PUSH_RATE: usize = 100; +/// Highest supported rate for [`PushQueuesBuilder::rate`]. +pub const MAX_PUSH_RATE: usize = 10_000; + +// Compile-time guard: DEFAULT_PUSH_RATE must not exceed MAX_PUSH_RATE. +const _: () = assert!(DEFAULT_PUSH_RATE <= MAX_PUSH_RATE); + +/// Priority level for outbound messages. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum PushPriority { + High, + Low, +} + +/// Behaviour when a push queue is full. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum PushPolicy { + /// Return an error to the caller if the queue is full. + ReturnErrorIfFull, + /// Silently drop the frame. + DropIfFull, + /// Drop the frame but emit a log warning. + WarnAndDropIfFull, +} + +/// Receiver ends of the push queues stored by the connection actor. +pub struct PushQueues { + pub(crate) high_priority_rx: mpsc::Receiver, + pub(crate) low_priority_rx: mpsc::Receiver, +} + +impl PushQueues { + /// Start building a new set of push queues. + #[must_use] + pub fn builder() -> PushQueuesBuilder { PushQueuesBuilder::default() } + + pub(super) fn build_with_rate_dlq( + high_capacity: usize, + low_capacity: usize, + rate: Option, + dlq: Option>, + ) -> Result<(Self, PushHandle), PushConfigError> { + if let Some(r) = rate + && (r == 0 || r > MAX_PUSH_RATE) + { + // Reject unsupported rates early to avoid building queues that cannot + // be used. The bounds prevent runaway resource consumption. + return Err(PushConfigError::InvalidRate(r)); + } + if high_capacity == 0 || low_capacity == 0 { + return Err(PushConfigError::InvalidCapacity { + high: high_capacity, + low: low_capacity, + }); + } + let (high_tx, high_rx) = mpsc::channel(high_capacity); + let (low_tx, low_rx) = mpsc::channel(low_capacity); + let limiter = rate.map(|r| { + RateLimiter::builder() + .initial(r) + .refill(r) + .interval(Duration::from_secs(1)) + .max(r) + .build() + }); + let inner = PushHandleInner { + high_prio_tx: high_tx, + low_prio_tx: low_tx, + limiter, + dlq_tx: dlq, + }; + Ok(( + Self { + high_priority_rx: high_rx, + low_priority_rx: low_rx, + }, + PushHandle::from_arc(Arc::new(inner)), + )) + } + + /// Create a new set of queues with the specified bounds for each priority + /// and return them along with a [`PushHandle`] for producers. + /// + /// # Panics + /// + /// Panics if an internal invariant is violated. This should never occur. + #[deprecated(since = "0.1.0", note = "Use `PushQueues::builder` instead")] + #[must_use] + pub fn bounded(high_capacity: usize, low_capacity: usize) -> (Self, PushHandle) { + Self::builder() + .high_capacity(high_capacity) + .low_capacity(low_capacity) + .build() + .expect("invalid capacities or rate in deprecated bounded()") + } + + /// Create queues with no rate limiting. + /// + /// # Panics + /// + /// Panics if an internal invariant is violated. This should never occur. + #[deprecated(since = "0.1.0", note = "Use `PushQueues::builder` instead")] + #[must_use] + pub fn bounded_no_rate_limit( + high_capacity: usize, + low_capacity: usize, + ) -> (Self, PushHandle) { + Self::builder() + .high_capacity(high_capacity) + .low_capacity(low_capacity) + .rate(None) + .build() + .expect("invalid capacities in deprecated bounded_no_rate_limit()") + } + + /// Create queues with a custom rate limit in pushes per second. + /// + /// # Errors + /// + /// Returns [`PushConfigError::InvalidRate`] if `rate` is zero or greater + /// than [`MAX_PUSH_RATE`] and [`PushConfigError::InvalidCapacity`] if either + /// queue capacity is zero. + #[deprecated(since = "0.1.0", note = "Use `PushQueues::builder` instead")] + pub fn bounded_with_rate( + high_capacity: usize, + low_capacity: usize, + rate: Option, + ) -> Result<(Self, PushHandle), PushConfigError> { + Self::builder() + .high_capacity(high_capacity) + .low_capacity(low_capacity) + .rate(rate) + .build() + } + + /// Create queues with a custom rate limit and optional dead letter queue. + /// + /// # Errors + /// + /// Returns [`PushConfigError::InvalidRate`] if `rate` is zero or greater + /// than [`MAX_PUSH_RATE`] and [`PushConfigError::InvalidCapacity`] if either + /// queue capacity is zero. + #[deprecated(since = "0.1.0", note = "Use `PushQueues::builder` instead")] + pub fn bounded_with_rate_dlq( + high_capacity: usize, + low_capacity: usize, + rate: Option, + dlq: Option>, + ) -> Result<(Self, PushHandle), PushConfigError> { + Self::build_with_rate_dlq(high_capacity, low_capacity, rate, dlq) + } + + /// Receive the next frame, preferring high priority frames when available. + /// + /// Returns `None` when both queues are closed and empty. + /// + /// # Examples + /// + /// Note: this method is biased towards high priority traffic and may starve + /// low priority frames if producers saturate the high queue. + /// + /// ```rust,no_run + /// use wireframe::push::{PushPriority, PushQueues}; + /// + /// #[tokio::test] + /// async fn example() { + /// let (mut queues, handle) = PushQueues::::builder() + /// .high_capacity(1) + /// .low_capacity(1) + /// .build() + /// .expect("failed to build PushQueues"); + /// handle.push_high_priority(2u8).await.expect("push failed"); + /// let (priority, frame) = queues.recv().await.expect("recv failed"); + /// assert_eq!(priority, PushPriority::High); + /// assert_eq!(frame, 2); + /// } + /// ``` + pub async fn recv(&mut self) -> Option<(PushPriority, F)> { + let mut high_closed = false; + let mut low_closed = false; + loop { + tokio::select! { + biased; + res = self.high_priority_rx.recv(), if !high_closed => match res { + Some(f) => return Some((PushPriority::High, f)), + None => high_closed = true, + }, + res = self.low_priority_rx.recv(), if !low_closed => match res { + Some(f) => return Some((PushPriority::Low, f)), + None => low_closed = true, + }, + else => return None, + } + } + } + + /// Close both receivers to prevent further pushes from being accepted. + /// + /// This is primarily used in tests to release resources when no actor is + /// draining the queues. + /// + /// # Examples + /// + /// ```rust,no_run + /// use wireframe::push::PushQueues; + /// + /// let (mut queues, _handle) = PushQueues::::builder() + /// .build() + /// .expect("failed to build PushQueues"); + /// queues.close(); + /// ``` + pub fn close(&mut self) { + self.high_priority_rx.close(); + self.low_priority_rx.close(); + } +} diff --git a/src/session.rs b/src/session.rs index 914df45f..68442b3f 100644 --- a/src/session.rs +++ b/src/session.rs @@ -83,7 +83,9 @@ impl SessionRegistry { /// session::{ConnectionId, SessionRegistry}, /// }; /// - /// let (_queues, handle) = PushQueues::::bounded(1, 1); + /// let (_queues, handle) = PushQueues::::builder() + /// .build() + /// .expect("failed to build PushQueues"); /// let registry = SessionRegistry::default(); /// let id = ConnectionId::new(1); /// registry.insert(id, &handle); @@ -109,7 +111,9 @@ impl SessionRegistry { /// session::{ConnectionId, SessionRegistry}, /// }; /// - /// let (_queues, handle) = PushQueues::::bounded(1, 1); + /// let (_queues, handle) = PushQueues::::builder() + /// .build() + /// .expect("failed to build PushQueues"); /// let registry = SessionRegistry::default(); /// let id = ConnectionId::new(2); /// assert!(registry.get(&id).is_none()); @@ -130,7 +134,9 @@ impl SessionRegistry { /// session::{ConnectionId, SessionRegistry}, /// }; /// - /// let (_queues, handle) = PushQueues::::bounded(1, 1); + /// let (_queues, handle) = PushQueues::::builder() + /// .build() + /// .expect("failed to build PushQueues"); /// let registry = SessionRegistry::default(); /// let id = ConnectionId::new(3); /// registry.insert(id, &handle); @@ -154,7 +160,9 @@ impl SessionRegistry { /// }; /// /// let registry = SessionRegistry::default(); - /// let (_queues, handle) = PushQueues::::bounded(1, 1); + /// let (_queues, handle) = PushQueues::::builder() + /// .build() + /// .expect("failed to build PushQueues"); /// let id = ConnectionId::new(4); /// registry.insert(id, &handle); /// drop(handle); @@ -178,8 +186,12 @@ impl SessionRegistry { /// }; /// /// let registry = SessionRegistry::default(); - /// let (_q1, h1) = PushQueues::::bounded(1, 1); - /// let (_q2, h2) = PushQueues::::bounded(1, 1); + /// let (_q1, h1) = PushQueues::::builder() + /// .build() + /// .expect("failed to build PushQueues"); + /// let (_q2, h2) = PushQueues::::builder() + /// .build() + /// .expect("failed to build PushQueues"); /// let id1 = ConnectionId::new(5); /// let id2 = ConnectionId::new(6); /// registry.insert(id1, &h1); @@ -209,8 +221,12 @@ impl SessionRegistry { /// }; /// /// let registry = SessionRegistry::default(); - /// let (_q1, h1) = PushQueues::::bounded(1, 1); - /// let (_q2, h2) = PushQueues::::bounded(1, 1); + /// let (_q1, h1) = PushQueues::::builder() + /// .build() + /// .expect("failed to build PushQueues"); + /// let (_q2, h2) = PushQueues::::builder() + /// .build() + /// .expect("failed to build PushQueues"); /// let id1 = ConnectionId::new(7); /// let id2 = ConnectionId::new(8); /// registry.insert(id1, &h1); diff --git a/tests/advanced/concurrency_loom.rs b/tests/advanced/concurrency_loom.rs index ff997bad..6845d283 100644 --- a/tests/advanced/concurrency_loom.rs +++ b/tests/advanced/concurrency_loom.rs @@ -21,7 +21,11 @@ fn concurrent_push_delivery() { .expect("failed to build tokio runtime"); rt.block_on(async { - let (queues, handle) = PushQueues::bounded(1, 1); + let (queues, handle) = PushQueues::::builder() + .high_capacity(1) + .low_capacity(1) + .build() + .expect("failed to build PushQueues"); let token = CancellationToken::new(); let out = loom::sync::Arc::new(loom::sync::Mutex::new(Vec::new())); diff --git a/tests/advanced/interaction_fuzz.rs b/tests/advanced/interaction_fuzz.rs index 99d71632..6aad16b5 100644 --- a/tests/advanced/interaction_fuzz.rs +++ b/tests/advanced/interaction_fuzz.rs @@ -23,7 +23,11 @@ enum Action { } async fn run_actions(actions: &[Action]) -> Vec { - let (queues, handle) = PushQueues::bounded(16, 16); + let (queues, handle) = PushQueues::::builder() + .high_capacity(16) + .low_capacity(16) + .build() + .expect("failed to build PushQueues"); let shutdown = CancellationToken::new(); let mut stream: Option> = None; diff --git a/tests/async_stream.rs b/tests/async_stream.rs index 86f1482f..404ff32b 100644 --- a/tests/async_stream.rs +++ b/tests/async_stream.rs @@ -23,7 +23,11 @@ fn frame_stream() -> impl futures::Stream> { #[rstest] #[tokio::test] async fn async_stream_frames_processed_in_order() { - let (queues, handle) = PushQueues::::bounded(8, 8); + let (queues, handle) = PushQueues::builder() + .high_capacity(8) + .low_capacity(8) + .build() + .expect("failed to build PushQueues"); let shutdown = CancellationToken::new(); let stream: FrameStream = Box::pin(frame_stream()); diff --git a/tests/connection_actor.rs b/tests/connection_actor.rs index ef1dbe79..908ff1ec 100644 --- a/tests/connection_actor.rs +++ b/tests/connection_actor.rs @@ -18,24 +18,39 @@ use wireframe::{ use wireframe_testing::push_expect; #[fixture] -#[allow( +#[expect( unused_braces, reason = "rustc false positive for single line rstest fixtures" )] -fn queues() -> (PushQueues, wireframe::push::PushHandle) { PushQueues::bounded(8, 8) } +// allow(unfulfilled_lint_expectations): rustc occasionally fails to emit the expected +// lint for single-line rstest fixtures on stable. +#[allow(unfulfilled_lint_expectations)] +fn queues() -> (PushQueues, wireframe::push::PushHandle) { + PushQueues::::builder() + .high_capacity(8) + .low_capacity(8) + .build() + .expect("failed to build PushQueues") +} #[fixture] -#[allow( +#[expect( unused_braces, reason = "rustc false positive for single line rstest fixtures" )] +// allow(unfulfilled_lint_expectations): rustc occasionally fails to emit the expected +// lint for single-line rstest fixtures on stable. +#[allow(unfulfilled_lint_expectations)] fn shutdown_token() -> CancellationToken { CancellationToken::new() } #[fixture] -#[allow( +#[expect( unused_braces, reason = "rustc false positive for single line rstest fixtures" )] +// allow(unfulfilled_lint_expectations): rustc occasionally fails to emit the expected +// lint for single-line rstest fixtures on stable. +#[allow(unfulfilled_lint_expectations)] fn empty_stream() -> Option> { None } #[rstest] @@ -380,7 +395,11 @@ async fn interleaved_shutdown_during_stream( #[tokio::test] #[serial] async fn push_queue_exhaustion_backpressure() { - let (mut queues, handle) = PushQueues::bounded(1, 1); + let (mut queues, handle) = PushQueues::::builder() + .high_capacity(1) + .low_capacity(1) + .build() + .expect("failed to build PushQueues"); push_expect!(handle.push_high_priority(1), "push high-priority"); let blocked = timeout(Duration::from_millis(50), handle.push_high_priority(2)).await; @@ -467,7 +486,11 @@ async fn graceful_shutdown_waits_for_tasks() { let mut handles = Vec::new(); for _ in 0..5 { - let (queues, handle) = PushQueues::::bounded(1, 1); + let (queues, handle) = PushQueues::::builder() + .high_capacity(1) + .low_capacity(1) + .build() + .expect("failed to build PushQueues"); let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, handle.clone(), None, token.clone()); handles.push(handle); diff --git a/tests/correlation_id.rs b/tests/correlation_id.rs index e515473b..1d7b58ca 100644 --- a/tests/correlation_id.rs +++ b/tests/correlation_id.rs @@ -15,7 +15,11 @@ async fn stream_frames_carry_request_correlation_id() { yield Envelope::new(1, Some(cid), vec![1]); yield Envelope::new(1, Some(cid), vec![2]); }); - let (queues, handle) = PushQueues::bounded(1, 1); + let (queues, handle) = PushQueues::::builder() + .high_capacity(1) + .low_capacity(1) + .build() + .expect("failed to build PushQueues"); let shutdown = CancellationToken::new(); let mut actor = ConnectionActor::new(queues, handle, Some(stream), shutdown); let mut out = Vec::new(); diff --git a/tests/push.rs b/tests/push.rs index ed3f991b..e26c91fe 100644 --- a/tests/push.rs +++ b/tests/push.rs @@ -2,15 +2,86 @@ //! //! They cover priority ordering, policy behaviour, and closed queue errors. -use rstest::rstest; +use rstest::{fixture, rstest}; use tokio::time::{self, Duration}; -use wireframe::push::{PushError, PushPolicy, PushPriority, PushQueues}; +use wireframe::push::{ + MAX_PUSH_RATE, + PushConfigError, + PushError, + PushHandle, + PushPolicy, + PushPriority, + PushQueues, +}; use wireframe_testing::{push_expect, recv_expect}; +#[fixture] +fn queues() -> (PushQueues, PushHandle) { + PushQueues::::builder() + .high_capacity(2) + .low_capacity(2) + .rate(Some(1)) + .build() + .expect("failed to build PushQueues") +} + +#[fixture] +fn small_queues() -> (PushQueues, PushHandle) { + PushQueues::::builder() + .high_capacity(1) + .low_capacity(1) + .build() + .expect("failed to build PushQueues") +} + +/// Builder rejects rates outside the supported range. +#[rstest] +#[case::zero(0)] +#[case::too_high(MAX_PUSH_RATE + 1)] +fn builder_rejects_invalid_rate(#[case] rate: usize) { + let result = PushQueues::::builder().rate(Some(rate)).build(); + assert!(matches!(result, Err(PushConfigError::InvalidRate(r)) if r == rate)); +} + +#[test] +fn builder_rejects_zero_capacity() { + #[cfg(debug_assertions)] + { + assert!( + std::panic::catch_unwind(|| { + let _ = PushQueues::::builder().high_capacity(0).build(); + }) + .is_err() + ); + + assert!( + std::panic::catch_unwind(|| { + let _ = PushQueues::::builder().low_capacity(0).build(); + }) + .is_err() + ); + } + + #[cfg(not(debug_assertions))] + { + let hi = PushQueues::::builder().high_capacity(0).build(); + assert!(matches!( + hi, + Err(PushConfigError::InvalidCapacity { high: 0, low: 1 }) + )); + + let lo = PushQueues::::builder().low_capacity(0).build(); + assert!(matches!( + lo, + Err(PushConfigError::InvalidCapacity { high: 1, low: 0 }) + )); + } +} + /// Frames are delivered to queues matching their push priority. #[tokio::test] async fn frames_routed_to_correct_priority_queues() { - let (mut queues, handle) = PushQueues::bounded(1, 1); + let (mut queues, handle) = small_queues(); push_expect!(handle.push_low_priority(1u8)); push_expect!(handle.push_high_priority(2u8)); @@ -27,14 +98,14 @@ async fn frames_routed_to_correct_priority_queues() { /// `try_push` honours the selected queue policy when full. /// /// Using [`PushPolicy::ReturnErrorIfFull`] causes `try_push` to -/// return `PushError::Full` once the queue is at capacity. +/// return [`PushError::QueueFull`] once the queue is at capacity. #[tokio::test] async fn try_push_respects_policy() { - let (mut queues, handle) = PushQueues::bounded(1, 1); + let (mut queues, handle) = small_queues(); push_expect!(handle.push_high_priority(1u8)); let result = handle.try_push(2u8, PushPriority::High, PushPolicy::ReturnErrorIfFull); - assert!(result.is_err()); + assert!(matches!(result, Err(PushError::QueueFull))); // drain queue to allow new push let _ = queues.recv().await; @@ -46,9 +117,7 @@ async fn try_push_respects_policy() { /// Push attempts return `Closed` when all queues have been shut down. #[tokio::test] async fn push_queues_error_on_closed() { - let (queues, handle) = PushQueues::bounded(1, 1); - - let mut queues = queues; + let (mut queues, handle) = small_queues(); queues.close(); let res = handle.push_high_priority(42u8).await; assert!(matches!(res, Err(PushError::Closed))); @@ -66,8 +135,7 @@ async fn push_queues_error_on_closed() { #[tokio::test] async fn rate_limiter_blocks_when_exceeded(#[case] priority: PushPriority) { time::pause(); - let (mut queues, handle) = - PushQueues::bounded_with_rate(2, 2, Some(1)).expect("queue creation failed"); + let (mut queues, handle) = queues(); match priority { PushPriority::High => push_expect!(handle.push_high_priority(1u8)), @@ -100,8 +168,7 @@ async fn rate_limiter_blocks_when_exceeded(#[case] priority: PushPriority) { #[tokio::test] async fn rate_limiter_allows_after_wait() { time::pause(); - let (mut queues, handle) = - PushQueues::bounded_with_rate(2, 2, Some(1)).expect("queue creation failed"); + let (mut queues, handle) = queues(); push_expect!(handle.push_high_priority(1u8)); time::advance(Duration::from_secs(1)).await; push_expect!(handle.push_high_priority(2u8)); @@ -117,8 +184,7 @@ async fn rate_limiter_allows_after_wait() { #[tokio::test] async fn rate_limiter_shared_across_priorities() { time::pause(); - let (mut queues, handle) = - PushQueues::bounded_with_rate(2, 2, Some(1)).expect("queue creation failed"); + let (mut queues, handle) = queues(); push_expect!(handle.push_high_priority(1u8)); let attempt = time::timeout(Duration::from_millis(10), handle.push_low_priority(2u8)).await; @@ -139,7 +205,12 @@ async fn rate_limiter_shared_across_priorities() { #[tokio::test] async fn unlimited_queues_do_not_block() { time::pause(); - let (mut queues, handle) = PushQueues::bounded_no_rate_limit(1, 1); + let (mut queues, handle) = PushQueues::::builder() + .high_capacity(1) + .low_capacity(1) + .rate(None) + .build() + .expect("failed to build PushQueues"); push_expect!(handle.push_high_priority(1u8)); let res = time::timeout(Duration::from_millis(10), handle.push_low_priority(2u8)).await; assert!(res.is_ok(), "pushes should not block when unlimited"); @@ -154,8 +225,12 @@ async fn unlimited_queues_do_not_block() { #[tokio::test] async fn rate_limiter_allows_burst_within_capacity_and_blocks_excess() { time::pause(); - let (mut queues, handle) = - PushQueues::bounded_with_rate(4, 4, Some(3)).expect("queue creation failed"); + let (mut queues, handle) = PushQueues::::builder() + .high_capacity(4) + .low_capacity(4) + .rate(Some(3)) + .build() + .expect("failed to build PushQueues"); for i in 0u8..3 { push_expect!(handle.push_high_priority(i)); diff --git a/tests/push_policies.rs b/tests/push_policies.rs index 68bfec99..feafe172 100644 --- a/tests/push_policies.rs +++ b/tests/push_policies.rs @@ -12,10 +12,11 @@ use wireframe::push::{PushPolicy, PushPriority, PushQueues}; use wireframe_testing::{LoggerHandle, logger}; /// Builds a single-thread [`Runtime`] for async tests. -#[allow( +#[expect( unused_braces, reason = "rustc false positive for single line rstest fixtures" )] +#[allow(unfulfilled_lint_expectations)] #[fixture] fn rt() -> Runtime { tokio::runtime::Builder::new_current_thread() @@ -38,7 +39,11 @@ fn push_policy_behaviour( ) { rt.block_on(async { while logger.pop().is_some() {} - let (mut queues, handle) = PushQueues::bounded(1, 1); + let (mut queues, handle) = PushQueues::::builder() + .high_capacity(1) + .low_capacity(1) + .build() + .expect("failed to build PushQueues"); handle .push_high_priority(1u8) @@ -76,8 +81,13 @@ fn push_policy_behaviour( fn dropped_frame_goes_to_dlq(rt: Runtime) { rt.block_on(async { let (dlq_tx, mut dlq_rx) = mpsc::channel(1); - let (mut queues, handle) = PushQueues::bounded_with_rate_dlq(1, 1, None, Some(dlq_tx)) - .expect("queue creation failed"); + let (mut queues, handle) = PushQueues::::builder() + .high_capacity(1) + .low_capacity(1) + .rate(None) + .dlq(Some(dlq_tx)) + .build() + .expect("failed to build PushQueues"); handle .push_high_priority(1u8) @@ -135,8 +145,13 @@ fn dlq_error_scenarios( let (dlq_tx, dlq_rx) = mpsc::channel(1); let mut dlq_rx = Some(dlq_rx); setup(&dlq_tx, &mut dlq_rx); - let (mut queues, handle) = PushQueues::bounded_with_rate_dlq(1, 1, None, Some(dlq_tx)) - .expect("queue creation failed"); + let (mut queues, handle) = PushQueues::::builder() + .high_capacity(1) + .low_capacity(1) + .rate(None) + .dlq(Some(dlq_tx)) + .build() + .expect("failed to build PushQueues"); handle .push_high_priority(1u8) diff --git a/tests/server.rs b/tests/server.rs index f04503ca..353f5283 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -43,7 +43,7 @@ async fn readiness_receiver_dropped() { let server = WireframeServer::new(factory()) .workers(1) .bind_existing_listener(listener) - .unwrap(); + .expect("failed to bind existing listener"); let addr = server.local_addr().expect("local addr missing"); // Create channel and immediately drop receiver to force send failure diff --git a/tests/session_registry.rs b/tests/session_registry.rs index 12cede5e..065c5550 100644 --- a/tests/session_registry.rs +++ b/tests/session_registry.rs @@ -6,18 +6,26 @@ use wireframe::{ }; #[fixture] -#[allow( +#[expect( unused_braces, - reason = "rustc false positive for single line rstest fixtures" + reason = "rustc false positive for single-line rstest fixtures" )] +#[allow(unfulfilled_lint_expectations)] fn registry() -> SessionRegistry { SessionRegistry::default() } #[fixture] -#[allow( +#[expect( unused_braces, - reason = "rustc false positive for single line rstest fixtures" + reason = "rustc false positive for single-line rstest fixtures" )] -fn push_setup() -> (PushQueues, PushHandle) { PushQueues::bounded(1, 1) } +#[allow(unfulfilled_lint_expectations)] +fn push_setup() -> (PushQueues, PushHandle) { + PushQueues::::builder() + .high_capacity(1) + .low_capacity(1) + .build() + .expect("failed to build PushQueues") +} /// Test that handles can be retrieved whilst the connection remains alive. #[rstest] diff --git a/tests/stream_end.rs b/tests/stream_end.rs index 383e2b72..fc3426c8 100644 --- a/tests/stream_end.rs +++ b/tests/stream_end.rs @@ -2,12 +2,12 @@ use std::sync::Arc; use async_stream::try_stream; -use rstest::rstest; +use rstest::{fixture, rstest}; use tokio_util::sync::CancellationToken; use wireframe::{ connection::ConnectionActor, hooks::{ConnectionContext, ProtocolHooks, WireframeProtocol}, - push::PushQueues, + push::{PushHandle, PushQueues}, response::FrameStream, }; @@ -15,6 +15,15 @@ use wireframe::{ mod terminator; use terminator::Terminator; +#[fixture] +fn queues() -> (PushQueues, PushHandle) { + PushQueues::::builder() + .high_capacity(1) + .low_capacity(1) + .build() + .expect("failed to build PushQueues") +} + #[rstest] #[tokio::test] async fn emits_end_frame() { @@ -23,7 +32,7 @@ async fn emits_end_frame() { yield 2; }); - let (queues, handle) = PushQueues::bounded(1, 1); + let (queues, handle) = queues(); let shutdown = CancellationToken::new(); let hooks = ProtocolHooks::from_protocol(&Arc::new(Terminator)); let mut actor = ConnectionActor::with_hooks(queues, handle, Some(stream), shutdown, hooks); @@ -51,7 +60,7 @@ async fn emits_no_end_frame_when_none() { yield 8; }); - let (queues, handle) = PushQueues::bounded(1, 1); + let (queues, handle) = queues(); let shutdown = CancellationToken::new(); let hooks = ProtocolHooks::from_protocol(&Arc::new(NoTerminator)); let mut actor = ConnectionActor::with_hooks(queues, handle, Some(stream), shutdown, hooks); diff --git a/tests/wireframe_protocol.rs b/tests/wireframe_protocol.rs index 774cb79e..ed1d354a 100644 --- a/tests/wireframe_protocol.rs +++ b/tests/wireframe_protocol.rs @@ -56,7 +56,11 @@ async fn builder_produces_protocol_hooks() { .with_protocol(protocol); let mut hooks = app.protocol_hooks(); - let (queues, handle) = PushQueues::bounded(1, 1); + let (queues, handle) = PushQueues::>::builder() + .high_capacity(1) + .low_capacity(1) + .build() + .expect("failed to build PushQueues"); hooks.on_connection_setup(handle, &mut ConnectionContext); drop(queues); // silence unused warnings @@ -80,7 +84,11 @@ async fn connection_actor_uses_protocol_from_builder() { .with_protocol(protocol); let hooks = app.protocol_hooks(); - let (queues, handle) = PushQueues::bounded(8, 8); + let (queues, handle) = PushQueues::>::builder() + .high_capacity(8) + .low_capacity(8) + .build() + .expect("failed to build PushQueues"); handle .push_high_priority(vec![1]) .await diff --git a/tests/world.rs b/tests/world.rs index 9162a876..f94db266 100644 --- a/tests/world.rs +++ b/tests/world.rs @@ -142,7 +142,11 @@ impl CorrelationWorld { yield Envelope::new(1, Some(cid), vec![1]); yield Envelope::new(1, Some(cid), vec![2]); }); - let (queues, handle) = PushQueues::bounded(1, 1); + let (queues, handle) = PushQueues::::builder() + .high_capacity(1) + .low_capacity(1) + .build() + .expect("failed to build PushQueues"); let shutdown = CancellationToken::new(); let mut actor = ConnectionActor::new(queues, handle, Some(stream), shutdown); actor.run(&mut self.frames).await.expect("actor run failed"); @@ -180,7 +184,11 @@ impl StreamEndWorld { yield 2u8; }); - let (queues, handle) = PushQueues::bounded(1, 1); + let (queues, handle) = PushQueues::::builder() + .high_capacity(1) + .low_capacity(1) + .build() + .expect("failed to build PushQueues"); let shutdown = CancellationToken::new(); let hooks = ProtocolHooks::from_protocol(&Arc::new(Terminator)); let mut actor = ConnectionActor::with_hooks(queues, handle, Some(stream), shutdown, hooks);