diff --git a/Cargo.lock b/Cargo.lock index af9d432b..505831ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1290,6 +1290,7 @@ version = "0.1.0" dependencies = [ "bincode", "bytes", + "log", "logtest", "rstest", "tokio", diff --git a/tests/push.rs b/tests/push.rs index 05fe000e..b1ad0d08 100644 --- a/tests/push.rs +++ b/tests/push.rs @@ -6,6 +6,7 @@ use rstest::rstest; use tokio::time::{self, Duration}; use wireframe::push::{PushError, PushPolicy, PushPriority, PushQueues}; +/// Frames are delivered to queues matching their push priority. #[tokio::test] async fn frames_routed_to_correct_priority_queues() { let (mut queues, handle) = PushQueues::bounded(1, 1); @@ -22,6 +23,10 @@ async fn frames_routed_to_correct_priority_queues() { assert_eq!(frame2, 1); } +/// `try_push` honours the selected queue policy when full. +/// +/// Using [`PushPolicy::ReturnErrorIfFull`] causes `try_push` to +/// return `PushError::Full` once the queue is at capacity. #[tokio::test] async fn try_push_respects_policy() { let (mut queues, handle) = PushQueues::bounded(1, 1); @@ -37,6 +42,7 @@ async fn try_push_respects_policy() { assert_eq!(last, 3); } +/// Push attempts return `Closed` when all queues have been shut down. #[tokio::test] async fn push_queues_error_on_closed() { let (queues, handle) = PushQueues::bounded(1, 1); @@ -50,6 +56,7 @@ async fn push_queues_error_on_closed() { assert!(matches!(res, Err(PushError::Closed))); } +/// A push beyond the configured rate is blocked. #[rstest] #[case::high(PushPriority::High)] #[case::low(PushPriority::Low)] @@ -85,6 +92,7 @@ async fn rate_limiter_blocks_when_exceeded(#[case] priority: PushPriority) { assert_eq!((first, second), (1, 3)); } +/// Exceeding the rate limit succeeds after the window has passed. #[tokio::test] async fn rate_limiter_allows_after_wait() { time::pause(); @@ -98,6 +106,7 @@ async fn rate_limiter_allows_after_wait() { assert_eq!((a, b), (1, 2)); } +/// The limiter counts pushes from all priority queues. #[tokio::test] async fn rate_limiter_shared_across_priorities() { time::pause(); @@ -118,6 +127,7 @@ async fn rate_limiter_shared_across_priorities() { assert_eq!(frame2, 2); } +/// Unlimited queues never block pushes. #[tokio::test] async fn unlimited_queues_do_not_block() { time::pause(); @@ -131,6 +141,7 @@ async fn unlimited_queues_do_not_block() { assert_eq!((a, b), (1, 2)); } +/// A burst up to capacity succeeds and further pushes are blocked. #[tokio::test] async fn rate_limiter_allows_burst_within_capacity_and_blocks_excess() { time::pause(); diff --git a/tests/push_policies.rs b/tests/push_policies.rs index a5e334ed..e0173bc6 100644 --- a/tests/push_policies.rs +++ b/tests/push_policies.rs @@ -11,6 +11,7 @@ use tokio::{ use wireframe::push::{PushPolicy, PushPriority, PushQueues}; use wireframe_testing::{LoggerHandle, logger}; +/// Builds a single-thread [`Runtime`] for async tests. #[allow( unused_braces, reason = "rustc false positive for single line rstest fixtures" @@ -23,6 +24,7 @@ fn rt() -> Runtime { .expect("failed to build test runtime") } +/// Verifies how queue policies log and drop when the queue is full. #[rstest] #[case::drop_if_full(PushPolicy::DropIfFull, false, "push queue full")] #[case::warn_and_drop(PushPolicy::WarnAndDropIfFull, true, "push queue full")] @@ -64,6 +66,7 @@ fn push_policy_behaviour( }); } +/// Dropped frames are forwarded to the dead letter queue. #[rstest] fn dropped_frame_goes_to_dlq(rt: Runtime) { rt.block_on(async { @@ -82,12 +85,15 @@ fn dropped_frame_goes_to_dlq(rt: Runtime) { }); } -fn setup_dlq_full(tx: &mpsc::Sender, _rx: &mut Option>) { +/// Preloads the DLQ to simulate a full queue. +fn fill_dlq(tx: &mpsc::Sender, _rx: &mut Option>) { tx.try_send(99).unwrap(); } -fn setup_dlq_closed(_: &mpsc::Sender, rx: &mut Option>) { drop(rx.take()); } +/// Drops the receiver to simulate a closed DLQ channel. +fn close_dlq(_: &mpsc::Sender, rx: &mut Option>) { drop(rx.take()); } +/// Asserts that one message is queued and the DLQ then reports empty. fn assert_dlq_full(rx: &mut Option>) -> BoxFuture<'_, ()> { Box::pin(async move { let receiver = rx.as_mut().expect("receiver missing"); @@ -96,11 +102,13 @@ fn assert_dlq_full(rx: &mut Option>) -> BoxFuture<'_, ()> { }) } +/// Confirms no receiver is present when the DLQ is closed. fn assert_dlq_closed(_: &mut Option>) -> BoxFuture<'_, ()> { Box::pin(async {}) } +/// Parameterised checks for error logs when DLQ interactions fail. #[rstest] -#[case::dlq_full(setup_dlq_full, PushPolicy::WarnAndDropIfFull, "DLQ", assert_dlq_full)] -#[case::dlq_closed(setup_dlq_closed, PushPolicy::DropIfFull, "closed", assert_dlq_closed)] +#[case::dlq_full(fill_dlq, PushPolicy::WarnAndDropIfFull, "DLQ", assert_dlq_full)] +#[case::dlq_closed(close_dlq, PushPolicy::DropIfFull, "closed", assert_dlq_closed)] #[serial(push_policies)] fn dlq_error_scenarios( rt: Runtime, diff --git a/wireframe_testing/Cargo.toml b/wireframe_testing/Cargo.toml index 5d70db84..832e269c 100644 --- a/wireframe_testing/Cargo.toml +++ b/wireframe_testing/Cargo.toml @@ -10,3 +10,4 @@ bincode = "^2.0" bytes = "^1.0" rstest = "0.18.2" logtest = "2" +log = "0.4" diff --git a/wireframe_testing/src/lib.rs b/wireframe_testing/src/lib.rs index 11bf893d..8657b8e2 100644 --- a/wireframe_testing/src/lib.rs +++ b/wireframe_testing/src/lib.rs @@ -8,7 +8,7 @@ //! use wireframe::app::WireframeApp; //! use wireframe_testing::drive_with_bincode; //! -//! # async fn example(app: WireframeApp<_, _, ()>) { +//! # async fn example(app: WireframeApp) { //! let bytes = drive_with_bincode(app, 42u8).await.unwrap(); //! # } //! ``` diff --git a/wireframe_testing/src/logging.rs b/wireframe_testing/src/logging.rs index 2e1193d0..0f312e13 100644 --- a/wireframe_testing/src/logging.rs +++ b/wireframe_testing/src/logging.rs @@ -3,6 +3,17 @@ //! This module provides a global, thread-safe logger handle for capturing and //! inspecting log output during tests. The [`LoggerHandle`] ensures exclusive //! access to prevent interference between concurrent tests. +//! +//! ``` +//! use wireframe_testing::logger; +//! +//! #[tokio::test] +//! async fn logs_are_collected() { +//! let mut log = logger(); +//! log::info!("example"); +//! assert!(log.pop().is_some()); +//! } +//! ``` use std::sync::{Mutex, MutexGuard, OnceLock}; @@ -11,13 +22,34 @@ use rstest::fixture; /// Handle to the global logger with exclusive access. /// /// This guard ensures tests do not interfere with each other's log capture by -/// serialising access to a [`logtest::Logger`]. +/// serialising access to a [`logtest::Logger`]. Acquire it using [`logger`] or +/// [`LoggerHandle::new`]. +/// +/// ``` +/// use wireframe_testing::logger; +/// # use log::warn; +/// +/// let mut log = logger(); +/// warn!("warned"); +/// assert!(log.pop().is_some()); +/// assert!(log.pop().is_none()); +/// ``` pub struct LoggerHandle { guard: MutexGuard<'static, Logger>, } impl LoggerHandle { /// Acquire the global [`Logger`] instance. + /// + /// ```no_run + /// use wireframe_testing::LoggerHandle; + /// # use log::info; + /// + /// let mut log = LoggerHandle::new(); + /// info!("init"); + /// assert!(log.pop().is_some()); + /// assert!(log.pop().is_none()); + /// ``` pub fn new() -> Self { static LOGGER: OnceLock> = OnceLock::new(); @@ -38,6 +70,7 @@ impl std::ops::DerefMut for LoggerHandle { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.guard } } +/// rstest fixture returning a [`LoggerHandle`] for log assertions. #[allow( unused_braces, reason = "rustc false positive for single line rstest fixtures"