Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions tests/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use rstest::rstest;
use tokio::time::{self, Duration};
use wireframe::push::{PushError, PushPolicy, PushPriority, PushQueues};

/// Frames are delivered to queues matching their push priority.
#[tokio::test]
async fn frames_routed_to_correct_priority_queues() {
let (mut queues, handle) = PushQueues::bounded(1, 1);
Expand All @@ -22,6 +23,10 @@ async fn frames_routed_to_correct_priority_queues() {
assert_eq!(frame2, 1);
}

/// `try_push` honours the selected queue policy when full.
///
/// Using [`PushPolicy::ReturnErrorIfFull`] causes `try_push` to
/// return `PushError::Full` once the queue is at capacity.
#[tokio::test]
async fn try_push_respects_policy() {
let (mut queues, handle) = PushQueues::bounded(1, 1);
Expand All @@ -37,6 +42,7 @@ async fn try_push_respects_policy() {
assert_eq!(last, 3);
}

/// Push attempts return `Closed` when all queues have been shut down.
#[tokio::test]
async fn push_queues_error_on_closed() {
let (queues, handle) = PushQueues::bounded(1, 1);
Expand All @@ -50,6 +56,7 @@ async fn push_queues_error_on_closed() {
assert!(matches!(res, Err(PushError::Closed)));
}

/// A push beyond the configured rate is blocked.
#[rstest]
#[case::high(PushPriority::High)]
#[case::low(PushPriority::Low)]
Expand Down Expand Up @@ -85,6 +92,7 @@ async fn rate_limiter_blocks_when_exceeded(#[case] priority: PushPriority) {
assert_eq!((first, second), (1, 3));
}

/// Exceeding the rate limit succeeds after the window has passed.
#[tokio::test]
async fn rate_limiter_allows_after_wait() {
time::pause();
Expand All @@ -98,6 +106,7 @@ async fn rate_limiter_allows_after_wait() {
assert_eq!((a, b), (1, 2));
}

/// The limiter counts pushes from all priority queues.
#[tokio::test]
async fn rate_limiter_shared_across_priorities() {
time::pause();
Expand All @@ -118,6 +127,7 @@ async fn rate_limiter_shared_across_priorities() {
assert_eq!(frame2, 2);
}

/// Unlimited queues never block pushes.
#[tokio::test]
async fn unlimited_queues_do_not_block() {
time::pause();
Expand All @@ -131,6 +141,7 @@ async fn unlimited_queues_do_not_block() {
assert_eq!((a, b), (1, 2));
}

/// A burst up to capacity succeeds and further pushes are blocked.
#[tokio::test]
async fn rate_limiter_allows_burst_within_capacity_and_blocks_excess() {
time::pause();
Expand Down
16 changes: 12 additions & 4 deletions tests/push_policies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use tokio::{
use wireframe::push::{PushPolicy, PushPriority, PushQueues};
use wireframe_testing::{LoggerHandle, logger};

/// Builds a single-thread [`Runtime`] for async tests.
#[allow(
unused_braces,
reason = "rustc false positive for single line rstest fixtures"
Expand All @@ -23,6 +24,7 @@ fn rt() -> Runtime {
.expect("failed to build test runtime")
}

/// Verifies how queue policies log and drop when the queue is full.
#[rstest]
#[case::drop_if_full(PushPolicy::DropIfFull, false, "push queue full")]
#[case::warn_and_drop(PushPolicy::WarnAndDropIfFull, true, "push queue full")]
Expand Down Expand Up @@ -64,6 +66,7 @@ fn push_policy_behaviour(
});
}

/// Dropped frames are forwarded to the dead letter queue.
#[rstest]
fn dropped_frame_goes_to_dlq(rt: Runtime) {
rt.block_on(async {
Expand All @@ -82,12 +85,15 @@ fn dropped_frame_goes_to_dlq(rt: Runtime) {
});
}

fn setup_dlq_full(tx: &mpsc::Sender<u8>, _rx: &mut Option<mpsc::Receiver<u8>>) {
/// Preloads the DLQ to simulate a full queue.
fn fill_dlq(tx: &mpsc::Sender<u8>, _rx: &mut Option<mpsc::Receiver<u8>>) {
tx.try_send(99).unwrap();
}

fn setup_dlq_closed(_: &mpsc::Sender<u8>, rx: &mut Option<mpsc::Receiver<u8>>) { drop(rx.take()); }
/// Drops the receiver to simulate a closed DLQ channel.
fn close_dlq(_: &mpsc::Sender<u8>, rx: &mut Option<mpsc::Receiver<u8>>) { drop(rx.take()); }

/// Asserts that one message is queued and the DLQ then reports empty.
fn assert_dlq_full(rx: &mut Option<mpsc::Receiver<u8>>) -> BoxFuture<'_, ()> {
Box::pin(async move {
let receiver = rx.as_mut().expect("receiver missing");
Expand All @@ -96,11 +102,13 @@ fn assert_dlq_full(rx: &mut Option<mpsc::Receiver<u8>>) -> BoxFuture<'_, ()> {
})
}

/// Confirms no receiver is present when the DLQ is closed.
fn assert_dlq_closed(_: &mut Option<mpsc::Receiver<u8>>) -> BoxFuture<'_, ()> { Box::pin(async {}) }

/// Parameterised checks for error logs when DLQ interactions fail.
#[rstest]
#[case::dlq_full(setup_dlq_full, PushPolicy::WarnAndDropIfFull, "DLQ", assert_dlq_full)]
#[case::dlq_closed(setup_dlq_closed, PushPolicy::DropIfFull, "closed", assert_dlq_closed)]
#[case::dlq_full(fill_dlq, PushPolicy::WarnAndDropIfFull, "DLQ", assert_dlq_full)]
#[case::dlq_closed(close_dlq, PushPolicy::DropIfFull, "closed", assert_dlq_closed)]
#[serial(push_policies)]
fn dlq_error_scenarios<Setup, AssertFn>(
rt: Runtime,
Expand Down
1 change: 1 addition & 0 deletions wireframe_testing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ bincode = "^2.0"
bytes = "^1.0"
rstest = "0.18.2"
logtest = "2"
log = "0.4"
2 changes: 1 addition & 1 deletion wireframe_testing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
//! use wireframe::app::WireframeApp;
//! use wireframe_testing::drive_with_bincode;
//!
//! # async fn example(app: WireframeApp<_, _, ()>) {
//! # async fn example(app: WireframeApp) {
//! let bytes = drive_with_bincode(app, 42u8).await.unwrap();
//! # }
Comment on lines +11 to 13
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick (assertive)

Update surrounding prose to match simplified generic

The removed generics narrow reader focus, but the preceding paragraph still talks about “generic parameters”. Remove or adjust that wording for consistency.

🤖 Prompt for AI Agents
In wireframe_testing/src/lib.rs around lines 11 to 13, the documentation still
refers to "generic parameters" even though the code example has been simplified
to remove generics. Update the surrounding prose to remove or rephrase mentions
of generic parameters so it accurately reflects the simplified example and
maintains consistency.

//! ```
Expand Down
35 changes: 34 additions & 1 deletion wireframe_testing/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,17 @@
//! This module provides a global, thread-safe logger handle for capturing and
//! inspecting log output during tests. The [`LoggerHandle`] ensures exclusive
//! access to prevent interference between concurrent tests.
//!
//! ```
//! use wireframe_testing::logger;
//!
//! #[tokio::test]
//! async fn logs_are_collected() {
//! let mut log = logger();
//! log::info!("example");
//! assert!(log.pop().is_some());
//! }
//! ```

use std::sync::{Mutex, MutexGuard, OnceLock};

Expand All @@ -11,13 +22,34 @@ use rstest::fixture;
/// Handle to the global logger with exclusive access.
///
/// This guard ensures tests do not interfere with each other's log capture by
/// serialising access to a [`logtest::Logger`].
/// serialising access to a [`logtest::Logger`]. Acquire it using [`logger`] or
/// [`LoggerHandle::new`].
///
/// ```
/// use wireframe_testing::logger;
/// # use log::warn;
///
/// let mut log = logger();
/// warn!("warned");
/// assert!(log.pop().is_some());
/// assert!(log.pop().is_none());
/// ```
pub struct LoggerHandle {
guard: MutexGuard<'static, Logger>,
}

impl LoggerHandle {
/// Acquire the global [`Logger`] instance.
///
/// ```no_run
/// use wireframe_testing::LoggerHandle;
/// # use log::info;
///
/// let mut log = LoggerHandle::new();
/// info!("init");
/// assert!(log.pop().is_some());
/// assert!(log.pop().is_none());
/// ```
pub fn new() -> Self {
static LOGGER: OnceLock<Mutex<Logger>> = OnceLock::new();

Expand All @@ -38,6 +70,7 @@ impl std::ops::DerefMut for LoggerHandle {
fn deref_mut(&mut self) -> &mut Self::Target { &mut self.guard }
}

/// rstest fixture returning a [`LoggerHandle`] for log assertions.
#[allow(
unused_braces,
reason = "rustc false positive for single line rstest fixtures"
Expand Down