Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 54 additions & 19 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,10 +227,15 @@ where
C: Send + 'static,
E: Packet,
{
/// Creates a new `WireframeApp` with no routes, services, middleware, or application data.
///
/// Initialises empty routes, services, middleware, and application data.
/// Sets the default frame processor and serializer, with no connection
/// lifecycle hooks.
/// The default frame processor and serializer are set, and no connection lifecycle hooks or dead letter queue are configured.
///
/// # Examples
///
/// ```no_run
/// let app = WireframeApp::<_, (), Envelope>::default();
/// ```
fn default() -> Self {
Self {
routes: HashMap::new(),
Expand Down Expand Up @@ -330,22 +335,29 @@ where
Ok(self)
}

/// Register a callback invoked when a new connection is established.
/// Registers an asynchronous callback to be invoked when a new connection is established.
///
/// The callback can perform authentication or other setup tasks and
/// returns connection-specific state stored for the connection's
/// lifetime.
/// The callback can perform authentication or setup tasks and returns connection-specific state,
/// which is stored for the lifetime of the connection. This method changes the connection state
/// type parameter from `C` to `C2`, so subsequent builder methods will operate on the new state type.
///
/// # Type Parameters
///
/// This method changes the connection state type parameter from `C` to `C2`.
/// This means that any subsequent builder methods will operate on the new connection state type
/// `C2`. Be aware of this type transition when chaining builder methods.
/// - `C2`: The new connection state type returned by the setup callback.
///
/// # Errors
/// # Returns
///
/// This function always succeeds currently but uses [`Result`] for
/// consistency with other builder methods.
/// Returns a new `WireframeApp` builder with the updated connection state type.
///
/// # Examples
///
/// ```no_run
/// use wireframe::WireframeApp;
///
/// let app = WireframeApp::default()
/// .on_connection_setup(|| async { /* perform setup */ 42 })
/// .unwrap();
/// ```
pub fn on_connection_setup<F, Fut, C2>(self, f: F) -> Result<WireframeApp<S, C2, E>>
where
F: Fn() -> Fut + Send + Sync + 'static,
Expand Down Expand Up @@ -384,11 +396,21 @@ where
Ok(self)
}

/// Install a [`WireframeProtocol`] implementation.
/// Installs a custom `WireframeProtocol` for connection and frame lifecycle hooks.
///
/// The provided protocol is wrapped in an `Arc` and stored for use by the connection actor,
/// enabling custom behaviour for connection setup, frame modification, and command completion.
///
/// The protocol defines hooks for connection setup, frame modification, and
/// command completion. It is wrapped in an [`Arc`] and stored for later use
/// by the connection actor.
/// # Returns
///
/// The updated builder with the protocol installed.
///
/// # Examples
///
/// ```no_run
/// let app = WireframeApp::default()
/// .with_protocol(MyProtocol::new());
/// ```
#[must_use]
pub fn with_protocol<P>(mut self, protocol: P) -> Self
where
Expand All @@ -398,7 +420,12 @@ where
self
}

/// Configure a Dead Letter Queue for dropped push frames.
/// Sets a Dead Letter Queue (DLQ) channel to receive dropped push frames.
///
/// This allows the application to capture or forward push frames that could not be delivered,
/// by sending their raw bytes to the provided asynchronous channel.
///
/// # Examples
///
/// ```rust,no_run
/// use tokio::sync::mpsc;
Expand Down Expand Up @@ -448,7 +475,15 @@ where
self
}

/// Replace the serializer used for messages.
/// Sets a custom serializer for message encoding and decoding, returning a new builder instance with the specified serializer.
///
/// # Examples
///
/// ```no_run
/// use mycrate::{WireframeApp, BincodeSerializer};
///
/// let app = WireframeApp::default().serializer(BincodeSerializer::default());
/// ```
#[must_use]
pub fn serializer<Ser>(self, serializer: Ser) -> WireframeApp<Ser, C, E>
where
Expand Down
63 changes: 37 additions & 26 deletions src/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ pub enum PushConfigError {
}

impl std::fmt::Display for PushConfigError {
/// Formats a `PushConfigError` for display, providing details about the invalid rate value.
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::InvalidRate(r) => {
Expand Down Expand Up @@ -139,9 +140,9 @@ impl<F: FrameLike> PushHandle<F> {
self.push_with_priority(frame, PushPriority::High).await
}

/// Push a low-priority frame subject to rate limiting.
/// Pushes a low-priority frame to the queue, applying rate limiting if configured.
///
/// Awaits if the rate limiter has no available tokens or the queue is full.
/// This method waits if the rate limiter has no available tokens or if the low-priority queue is full. The frame is sent with low priority and will be delivered after any high-priority frames.
///
/// # Errors
///
Expand All @@ -154,7 +155,7 @@ impl<F: FrameLike> PushHandle<F> {
///
/// #[tokio::test]
/// async fn example() {
/// let (mut queues, handle) = PushQueues::bounded_with_rate(1, 1, Some(1));
/// let (mut queues, handle) = PushQueues::bounded_with_rate(1, 1, Some(1)).unwrap();
/// handle.push_low_priority(10u8).await.unwrap();
/// let (priority, frame) = queues.recv().await.unwrap();
/// assert_eq!(priority, PushPriority::Low);
Expand All @@ -165,7 +166,17 @@ impl<F: FrameLike> PushHandle<F> {
self.push_with_priority(frame, PushPriority::Low).await
}

/// Send a frame to the configured dead letter queue if available.
/// Attempts to send a frame to the configured dead letter queue (DLQ), if present.
///
/// If the DLQ is full or closed, logs an error indicating the frame was lost.
///
/// # Examples
///
/// ```no_run
/// // Assume `handle` is a PushHandle with a configured DLQ.
/// handle.route_to_dlq(frame);
/// // If the DLQ is full or closed, an error is logged and the frame is dropped.
/// ```
fn route_to_dlq(&self, frame: F) {
if let Some(dlq) = &self.0.dlq_tx {
match dlq.try_send(frame) {
Expand All @@ -180,15 +191,13 @@ impl<F: FrameLike> PushHandle<F> {
}
}

/// Attempt to push a frame with the given priority and policy.
/// Attempts to push a frame to the queue with the specified priority and policy.
///
/// If the queue is full, the behaviour depends on the provided policy: an error is returned, the frame is dropped, or a warning is logged and the frame is dropped. Dropped frames are routed to the dead letter queue if one is configured.
///
/// # Errors
///
/// Returns [`PushError::QueueFull`] if the queue is full and the policy is
/// [`PushPolicy::ReturnErrorIfFull`]. Returns [`PushError::Closed`] if the
/// receiving end has been dropped. When [`PushPolicy::DropIfFull`] or
/// [`PushPolicy::WarnAndDropIfFull`] is used, a configured dead letter queue
/// receives the dropped frame.
/// Returns [`PushError::QueueFull`] if the queue is full and the policy is [`PushPolicy::ReturnErrorIfFull`]. Returns [`PushError::Closed`] if the receiving end has been dropped. When [`PushPolicy::DropIfFull`] or [`PushPolicy::WarnAndDropIfFull`] is used, a configured dead letter queue receives the dropped frame.
///
/// # Examples
///
Expand Down Expand Up @@ -249,8 +258,9 @@ pub struct PushQueues<F> {
}

impl<F: FrameLike> PushQueues<F> {
/// Create a new set of queues with the specified bounds for each priority
/// and return them along with a [`PushHandle`] for producers.
/// Creates a new set of bounded push queues for high and low priority frames, returning the queues and a [`PushHandle`] for producers.
///
/// The queues are rate-limited to the default push rate. Use this when you want simple bounded queues with prioritisation and default rate limiting.
///
/// # Examples
///
Expand All @@ -270,13 +280,14 @@ impl<F: FrameLike> PushQueues<F> {
/// # Panics
///
/// Panics if an internal invariant is violated. This should never occur.
#[must_use]
pub fn bounded(high_capacity: usize, low_capacity: usize) -> (Self, PushHandle<F>) {
Self::bounded_with_rate_dlq(high_capacity, low_capacity, Some(DEFAULT_PUSH_RATE), None)
.expect("DEFAULT_PUSH_RATE is always valid")
}

/// Create queues with no rate limiting.
/// Creates high- and low-priority push queues with bounded capacity and no rate limiting.
///
/// Returns a tuple containing the push queues and a handle for pushing frames. Both queues are bounded by the specified capacities, and no rate limiting is applied to frame pushes.
///
/// # Examples
///
Expand All @@ -298,16 +309,17 @@ impl<F: FrameLike> PushQueues<F> {
Self::bounded_with_rate_dlq(high_capacity, low_capacity, None, None).unwrap()
}

/// Create queues with a custom rate limit in pushes per second.
/// Creates prioritised push queues with an optional global rate limit.
///
/// The limiter enforces fairness by allowing at most `rate` pushes
/// per second across all producers for the returned [`PushHandle`].
/// Pass `None` to disable rate limiting entirely.
/// The returned queues support high and low priority channels. If `rate` is
/// specified, it limits the total number of pushes per second across all
/// producers using the associated [`PushHandle`]. Passing `None` disables
/// rate limiting entirely.
///
/// # Errors
///
/// Returns [`PushConfigError::InvalidRate`] if `rate` is zero or greater
/// than [`MAX_PUSH_RATE`].
/// Returns [`PushConfigError::InvalidRate`] if `rate` is zero or exceeds
/// [`MAX_PUSH_RATE`].
///
/// # Examples
///
Expand All @@ -330,16 +342,15 @@ impl<F: FrameLike> PushQueues<F> {
Self::bounded_with_rate_dlq(high_capacity, low_capacity, rate, None)
}

/// Create queues with a custom rate limit and optional dead letter queue.
/// Creates prioritised push queues with optional rate limiting and dead letter queue support.
///
/// Frames that would be dropped by [`try_push`](PushHandle::try_push) when
/// using [`PushPolicy::DropIfFull`] or [`PushPolicy::WarnAndDropIfFull`]
/// are routed to `dlq` if provided.
/// Frames that would be dropped by [`try_push`](PushHandle::try_push) under
/// [`PushPolicy::DropIfFull`] or [`PushPolicy::WarnAndDropIfFull`] are routed to the provided
/// dead letter queue (`dlq`) if supplied.
///
/// # Errors
///
/// Returns [`PushConfigError::InvalidRate`] if `rate` is zero or greater
/// than [`MAX_PUSH_RATE`].
/// Returns [`PushConfigError::InvalidRate`] if `rate` is zero or exceeds [`MAX_PUSH_RATE`].
///
/// # Examples
///
Expand Down
99 changes: 98 additions & 1 deletion tests/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,19 @@ async fn push_queues_error_on_closed() {
assert!(matches!(res, Err(PushError::Closed)));
}

/// Tests that the rate limiter blocks pushes exceeding the configured rate limit for both
/// high and low priorities.
///
/// After pushing one frame, a second push attempt is made and expected to time out,
/// demonstrating that the rate limiter enforces blocking. Advancing time allows a
/// subsequent push, and the test verifies that only the allowed frames are received.
///
/// # Examples
///
/// ```no_run
/// // This test is parameterised and runs for both high and low priorities.
/// // It verifies that the rate limiter blocks excess pushes and allows them after waiting.
/// ```
#[rstest]
#[case::high(PushPriority::High)]
#[case::low(PushPriority::Low)]
Expand Down Expand Up @@ -85,6 +98,29 @@ async fn rate_limiter_blocks_when_exceeded(#[case] priority: PushPriority) {
assert_eq!((first, second), (1, 3));
}

/// Tests that the rate limiter allows pushes after the wait interval has elapsed.
///
/// This test verifies that after pushing a frame and waiting for the rate limiter's interval,
/// a subsequent push is permitted. It ensures that both frames are received in the correct order.
///
/// # Examples
///
/// ```no_run
/// # use wireframe::push::PushQueues;
/// # use tokio::time::{self, Duration};
/// # #[tokio::main]
/// # async fn main() {
/// time::pause();
/// let (mut queues, handle) = PushQueues::bounded_with_rate(2, 2, Some(1)).unwrap();
/// handle.push_high_priority(1u8).await.unwrap();
/// time::advance(Duration::from_secs(1)).await;
/// handle.push_high_priority(2u8).await.unwrap();
///
/// let (_, a) = queues.recv().await.unwrap();
/// let (_, b) = queues.recv().await.unwrap();
/// assert_eq!((a, b), (1, 2));
/// # }
/// ```
#[tokio::test]
async fn rate_limiter_allows_after_wait() {
time::pause();
Expand All @@ -98,6 +134,38 @@ async fn rate_limiter_allows_after_wait() {
assert_eq!((a, b), (1, 2));
}

/// Tests that the rate limiter is enforced across both high and low priority queues.
///
/// Verifies that after pushing a high priority frame, a low priority push attempt blocks
/// until the rate limiter interval elapses, after which the push succeeds. Ensures that
/// the rate limiter is shared between priorities and that frames are received in the
/// correct order.
///
/// # Examples
///
/// ```no_run
/// # use wireframe::push::{PushQueues, PushPriority};
/// # use tokio::time::{self, Duration};
/// # #[tokio::main]
/// # async fn main() {
/// time::pause();
/// let (mut queues, handle) = PushQueues::bounded_with_rate(2, 2, Some(1)).unwrap();
/// handle.push_high_priority(1u8).await.unwrap();
///
/// let attempt = time::timeout(Duration::from_millis(10), handle.push_low_priority(2u8)).await;
/// assert!(attempt.is_err());
///
/// time::advance(Duration::from_secs(1)).await;
/// handle.push_low_priority(2u8).await.unwrap();
///
/// let (prio1, frame1) = queues.recv().await.unwrap();
/// let (prio2, frame2) = queues.recv().await.unwrap();
/// assert_eq!(prio1, PushPriority::High);
/// assert_eq!(frame1, 1);
/// assert_eq!(prio2, PushPriority::Low);
/// assert_eq!(frame2, 2);
/// # }
/// ```
#[tokio::test]
async fn rate_limiter_shared_across_priorities() {
time::pause();
Expand Down Expand Up @@ -131,7 +199,36 @@ async fn unlimited_queues_do_not_block() {
assert_eq!((a, b), (1, 2));
}

#[tokio::test]
/// Tests that the rate limiter allows a burst of pushes within its configured capacity and blocks any excess until the interval elapses.
///
/// This test pushes frames up to the burst capacity, verifies that an additional push is rate limited (blocked), then advances time to allow further pushes. It confirms that all frames are received in the expected order.
///
/// # Examples
///
/// ```no_run
/// # use wireframe::push::{PushQueues, PushPriority};
/// # use tokio::time::{self, Duration};
/// # #[tokio::main]
/// # async fn main() {
/// time::pause();
/// let (mut queues, handle) = PushQueues::bounded_with_rate(4, 4, Some(3)).unwrap();
///
/// for i in 0u8..3 {
/// handle.push_high_priority(i).await.unwrap();
/// }
///
/// let res = time::timeout(Duration::from_millis(10), handle.push_high_priority(99)).await;
/// assert!(res.is_err());
///
/// time::advance(Duration::from_secs(1)).await;
/// handle.push_high_priority(100).await.unwrap();
///
/// for expected in [0u8, 1u8, 2u8, 100u8] {
/// let (_, frame) = queues.recv().await.unwrap();
/// assert_eq!(frame, expected);
/// }
/// # }
/// ```
async fn rate_limiter_allows_burst_within_capacity_and_blocks_excess() {
time::pause();
let (mut queues, handle) = PushQueues::bounded_with_rate(4, 4, Some(3)).unwrap();
Expand Down
Loading