From 622959775e733679c02ca7f3f36da572710ae293 Mon Sep 17 00:00:00 2001 From: Leynos Date: Tue, 8 Jul 2025 05:40:50 +0100 Subject: [PATCH 1/9] Add dead letter queue support for push queues --- ...asynchronous-outbound-messaging-roadmap.md | 2 +- docs/rust-testing-with-rstest-fixtures.md | 42 +++++----- ...eframe-1-0-detailed-development-roadmap.md | 72 ++++++++++-------- src/app.rs | 28 ++++++- src/push.rs | 76 ++++++++++++++++--- tests/push_policies.rs | 54 +++++++++++++ 6 files changed, 209 insertions(+), 65 deletions(-) diff --git a/docs/asynchronous-outbound-messaging-roadmap.md b/docs/asynchronous-outbound-messaging-roadmap.md index c1df5585..52bf71a9 100644 --- a/docs/asynchronous-outbound-messaging-roadmap.md +++ b/docs/asynchronous-outbound-messaging-roadmap.md @@ -44,7 +44,7 @@ design documents. ([Design §5][design-errors]). - [x] **Per-connection rate limiting** on pushes via a token bucket ([Resilience Guide §4.1][resilience-rate]). -- [ ] **Optional Dead Letter Queue** for full queues +- [x] **Optional Dead Letter Queue** for full queues ([Design §5.2][design-dlq]). ## 4. Observability and Quality Assurance diff --git a/docs/rust-testing-with-rstest-fixtures.md b/docs/rust-testing-with-rstest-fixtures.md index 0d2b90c3..4cf47893 100644 --- a/docs/rust-testing-with-rstest-fixtures.md +++ b/docs/rust-testing-with-rstest-fixtures.md @@ -540,10 +540,10 @@ When using `#[once]`, there are critical warnings: the end of the test suite. This makes `#[once]` fixtures best suited for truly passive data or resources whose cleanup is managed by the operating system upon process exit. -1. **Functional Limitations:** `#[once]` fixtures cannot be `async` functions +2. **Functional Limitations:** `#[once]` fixtures cannot be `async` functions and cannot be generic functions (neither with generic type parameters nor using `impl Trait` in arguments or return types). -1. **Attribute Propagation:** `rstest` macros currently drop `#[expect]` +3. **Attribute Propagation:** `rstest` macros currently drop `#[expect]` attributes. If you rely on lint expectations, use `#[allow]` instead to silence false positives. @@ -1168,13 +1168,13 @@ The following table summarizes key differences: **Table 1:** `rstest` **vs. Standard Rust** `#[test]` **for Fixture Management and Parameterization** -| Feature | Standard #[test] Approach | rstest Approach | +| Feature | Standard #[test] Approach | rstest Approach | | ------------------------------------------------------------- | ------------------------------------------------------------- | -------------------------------------------------------------------------------- | -| Fixture Injection | Manual calls to setup functions within each test. | Fixture name as argument in #[rstest] function; fixture defined with #[fixture]. | -| Parameterized Tests (Specific Cases) | Loop inside one test, or multiple distinct #[test] functions. | #[case(...)] attributes on #[rstest] function. | -| Parameterized Tests (Value Combinations) | Nested loops inside one test, or complex manual generation. | #[values(...)] attributes on arguments of #[rstest] function. | -| Async Fixture Setup | Manual async block and .await calls inside test. | async fn fixtures, with #[future] and #[awt] for ergonomic `.await`ing. | -| Reusing Parameter Sets | Manual duplication of cases or custom helper macros. | rstest_reuse crate with #[template] and #[apply] attributes. | +| Fixture Injection | Manual calls to setup functions within each test. | Fixture name as argument in #[rstest] function; fixture defined with #[fixture]. | +| Parameterized Tests (Specific Cases) | Loop inside one test, or multiple distinct #[test] functions. | #[case(...)] attributes on #[rstest] function. | +| Parameterized Tests (Value Combinations) | Nested loops inside one test, or complex manual generation. | #[values(...)] attributes on arguments of #[rstest] function. | +| Async Fixture Setup | Manual async block and .await calls inside test. | async fn fixtures, with #[future] and #[awt] for ergonomic `.await`ing. | +| Reusing Parameter Sets | Manual duplication of cases or custom helper macros. | rstest_reuse crate with #[template] and #[apply] attributes. | This comparison highlights how `rstest`'s attribute-based, declarative approach streamlines common testing patterns, reducing manual effort and improving the @@ -1335,20 +1335,20 @@ provided by `rstest`: **Table 2: Key** `rstest` **Attributes Quick Reference** -| Attribute | Core Purpose | +| Attribute | Core Purpose | | ---------------------------- | -------------------------------------------------------------------------------------------- | -| #[rstest] | Marks a function as an rstest test; enables fixture injection and parameterization. | -| #[fixture] | Defines a function that provides a test fixture (setup data or services). | -| #[case(...)] | Defines a single parameterized test case with specific input values. | -| #[values(...)] | Defines a list of values for an argument, generating tests for each value or combination. | -| #[once] | Marks a fixture to be initialized only once and shared (as a static reference) across tests. | -| #[future] | Simplifies async argument types by removing impl Future boilerplate. | -| #[awt] | (Function or argument level) Automatically .awaits future arguments in async tests. | -| #[from(original_name)] | Allows renaming an injected fixture argument in the test function. | -| #[with(...)] | Overrides default arguments of a fixture for a specific test. | -| #[default(...)] | Provides default values for arguments within a fixture function. | -| #[timeout(...)] | Sets a timeout for an asynchronous test. | -| #[files("glob_pattern",...)] | Injects file paths (or contents, with mode=) matching a glob pattern as test arguments. | +| #[rstest] | Marks a function as an rstest test; enables fixture injection and parameterization. | +| #[fixture] | Defines a function that provides a test fixture (setup data or services). | +| #[case(...)] | Defines a single parameterized test case with specific input values. | +| #[values(...)] | Defines a list of values for an argument, generating tests for each value or combination. | +| #[once] | Marks a fixture to be initialized only once and shared (as a static reference) across tests. | +| #[future] | Simplifies async argument types by removing impl Future boilerplate. | +| #[awt] | (Function or argument level) Automatically .awaits future arguments in async tests. | +| #[from(original_name)] | Allows renaming an injected fixture argument in the test function. | +| #[with(...)] | Overrides default arguments of a fixture for a specific test. | +| #[default(...)] | Provides default values for arguments within a fixture function. | +| #[timeout(...)] | Sets a timeout for an asynchronous test. | +| #[files("glob_pattern",...)] | Injects file paths (or contents, with mode=) matching a glob pattern as test arguments. | By mastering `rstest`, Rust developers can significantly elevate the quality and efficiency of their testing practices, leading to more reliable and maintainable diff --git a/docs/wireframe-1-0-detailed-development-roadmap.md b/docs/wireframe-1-0-detailed-development-roadmap.md index 71a053f5..66c1fce9 100644 --- a/docs/wireframe-1-0-detailed-development-roadmap.md +++ b/docs/wireframe-1-0-detailed-development-roadmap.md @@ -20,14 +20,16 @@ public consumption. message processing. This phase establishes the internal architecture upon which all public-facing features will be built.* -| Item | Name | Details | Size | Depends | -| ---- | ------------------------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ | ------- | -| 1.1 | Core Response & Error Types | Define the new `Response` enum with `Single`, `Vec`, `Stream` and `Empty` variants. Implement the generic `WireframeError` enum to distinguish between I/O and protocol errors. | Small | - | -| 1.2 | Priority Push Channels | Implement the internal dual-channel `mpsc` mechanism within the connection state to handle high-priority and low-priority pushed frames. | Medium | - | -| 1.3 | Connection Actor Write Loop | Convert per-request workers into stateful connection actors. Implement a `select!(biased; ...)` loop that polls for shutdown signals, high/low priority pushes and the handler response stream in that strict order. | Large | #1.2 | -| 1.4 | Initial FragmentStrategy Trait | Define the initial `FragmentStrategy` trait and the `FragmentMeta` struct. Focus on the core methods: `decode_header` and `encode_header`. | Medium | - | -| 1.5 | Basic FragmentAdapter | Implement the `FragmentAdapter` as a `FrameProcessor`. Build the inbound reassembly logic for a single, non-multiplexed stream of fragments and the outbound logic for splitting a single large frame. | Large | #1.4 | -| 1.6 | Internal Hook Plumbing | Add the invocation points for the protocol-specific hooks (`before_send`, `on_command_end`, etc.) within the connection actor, even if the public trait is not yet defined. | Small | #1.3 | +| Item | Name | Details | Size | Depends | +| ------ | ------------------------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ | ------- | +| ------ | ------- | +| ------ | ------- | +| 1.1 | Core Response & Error Types | Define the new `Response` enum with `Single`, `Vec`, `Stream` and `Empty` variants. Implement the generic `WireframeError` enum to distinguish between I/O and protocol errors. | Small | - | +| 1.2 | Priority Push Channels | Implement the internal dual-channel `mpsc` mechanism within the connection state to handle high-priority and low-priority pushed frames. | Medium | - | +| 1.3 | Connection Actor Write Loop | Convert per-request workers into stateful connection actors. Implement a `select!(biased; ...)` loop that polls for shutdown signals, high/low priority pushes and the handler response stream in that strict order. | Large | #1.2 | +| 1.4 | Initial FragmentStrategy Trait | Define the initial `FragmentStrategy` trait and the `FragmentMeta` struct. Focus on the core methods: `decode_header` and `encode_header`. | Medium | - | +| 1.5 | Basic FragmentAdapter | Implement the `FragmentAdapter` as a `FrameProcessor`. Build the inbound reassembly logic for a single, non-multiplexed stream of fragments and the outbound logic for splitting a single large frame. | Large | #1.4 | +| 1.6 | Internal Hook Plumbing | Add the invocation points for the protocol-specific hooks (`before_send`, `on_command_end`, etc.) within the connection actor, even if the public trait is not yet defined. | Small | #1.3 | ## Phase 2: Public APIs & Developer Ergonomics @@ -35,14 +37,16 @@ all public-facing features will be built.* and idiomatic API. This phase is about making the powerful new mechanics usable and intuitive.* -| Item | Name | Details | Size | Depends on | -| ---- | --------------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ | ---------------- | -| 2.1 | WireframeProtocol Trait & Builder | Define the cohesive `WireframeProtocol` trait to encapsulate all protocol-specific logic. Refactor the `WireframeApp` builder to use a fluent `.with_protocol(MyProtocol)` method instead of multiple closures. | Medium | #1.6 | -| 2.2 | Public PushHandle API | Implement the public `PushHandle` struct with its `push`, `try_push` and policy-based `push_with_policy` methods. This handle interacts with the dual-channel system from #1.2. | Medium | #1.2 | -| 2.3 | Leak-Proof SessionRegistry | Implement the `SessionRegistry` for discovering connection handles. This must use `dashmap` with `Weak` pointers to prevent memory leaks from terminated connections. | Medium | #2.2 | -| 2.4 | async-stream Integration & Docs | Remove the proposed `FrameSink` from the design. Update the `Response::Stream` handling and document `async-stream` as the canonical way to create streams imperatively. | Small | #1.1 | -| 2.5 | Initial Test Suite | Write unit and integration tests for the new public APIs. Verify that `Response::Vec` and `Response::Stream` work, and that `PushHandle` can successfully send frames that are received by a client. | Large | #2.1, #2.3, #2.4 | -| 2.6 | Basic Fragmentation Example | Implement a simple `FragmentStrategy` (e.g. `LenFlag32K`) and an example showing the `FragmentAdapter` in use. This validates the adapter's basic functionality. | Medium | #1.5, #2.5 | +| Item | Name | Details | Size | Depends on | +| ------ | --------------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ | ---------------- | +| ------ | ---------------- | +| ------ | ---------------- | +| 2.1 | WireframeProtocol Trait & Builder | Define the cohesive `WireframeProtocol` trait to encapsulate all protocol-specific logic. Refactor the `WireframeApp` builder to use a fluent `.with_protocol(MyProtocol)` method instead of multiple closures. | Medium | #1.6 | +| 2.2 | Public PushHandle API | Implement the public `PushHandle` struct with its `push`, `try_push` and policy-based `push_with_policy` methods. This handle interacts with the dual-channel system from #1.2. | Medium | #1.2 | +| 2.3 | Leak-Proof SessionRegistry | Implement the `SessionRegistry` for discovering connection handles. This must use `dashmap` with `Weak` pointers to prevent memory leaks from terminated connections. | Medium | #2.2 | +| 2.4 | async-stream Integration & Docs | Remove the proposed `FrameSink` from the design. Update the `Response::Stream` handling and document `async-stream` as the canonical way to create streams imperatively. | Small | #1.1 | +| 2.5 | Initial Test Suite | Write unit and integration tests for the new public APIs. Verify that `Response::Vec` and `Response::Stream` work, and that `PushHandle` can successfully send frames that are received by a client. | Large | #2.1, #2.3, #2.4 | +| 2.6 | Basic Fragmentation Example | Implement a simple `FragmentStrategy` (e.g. `LenFlag32K`) and an example showing the `FragmentAdapter` in use. This validates the adapter's basic functionality. | Medium | #1.5, #2.5 | ## Phase 3: Production Hardening & Resilience @@ -50,24 +54,28 @@ and intuitive.* operation in a production environment. This phase moves the library from "functional" to "resilient".* -| Item | Name | Details | Size | Depends on | -| ---- | ------------------------------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ | ------- | -| 3.1 | Graceful Shutdown | Implement the server-wide graceful shutdown pattern. Use `tokio_util::sync::CancellationToken` for signalling and `tokio_util::task::TaskTracker` to ensure all connection actors terminate cleanly. | Large | #1.3 | -| 3.2 | Re-assembly DoS Protection | Harden the `FragmentAdapter` by adding a non-optional, configurable timeout for partial message re-assembly and strictly enforcing the `max_message_size` limit to prevent memory exhaustion. | Medium | #1.5 | -| 3.3 | Multiplexed Re-assembly | Enhance the `FragmentAdapter`'s inbound logic to support concurrent re-assembly of multiple messages. Use the `msg_id` from `FragmentMeta` as a key into a `dashmap::DashMap` of partial messages. | Large | #3.2 | -| 3.4 | Per-Connection Rate Limiting | Integrate an asynchronous, token-bucket rate limiter into the `PushHandle`. The rate limit should be configurable on the `WireframeApp` builder and enforced on every push. | Medium | #2.2 | -| 3.5 | Dead Letter Queue (DLQ) | Implement the optional Dead Letter Queue mechanism. Allow a user to provide a DLQ channel sender during app setup; failed pushes (due to a full queue) can be routed there instead of being dropped. | Medium | #2.2 | -| 3.6 | Context-Aware FragmentStrategy | Enhance the `FragmentStrategy` trait. `max_fragment_payload` and `encode_header` should receive a reference to the logical `Frame` being processed, allowing for more dynamic fragmentation rules. | Small | #1.4 | +| Item | Name | Details | Size | Depends on | +| ------ | ------------------------------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ | ---------- | +| ------ | ---------- | +| ------ | ------- | +| 3.1 | Graceful Shutdown | Implement the server-wide graceful shutdown pattern. Use `tokio_util::sync::CancellationToken` for signalling and `tokio_util::task::TaskTracker` to ensure all connection actors terminate cleanly. | Large | #1.3 | +| 3.2 | Re-assembly DoS Protection | Harden the `FragmentAdapter` by adding a non-optional, configurable timeout for partial message re-assembly and strictly enforcing the `max_message_size` limit to prevent memory exhaustion. | Medium | #1.5 | +| 3.3 | Multiplexed Re-assembly | Enhance the `FragmentAdapter`'s inbound logic to support concurrent re-assembly of multiple messages. Use the `msg_id` from `FragmentMeta` as a key into a `dashmap::DashMap` of partial messages. | Large | #3.2 | +| 3.4 | Per-Connection Rate Limiting | Integrate an asynchronous, token-bucket rate limiter into the `PushHandle`. The rate limit should be configurable on the `WireframeApp` builder and enforced on every push. | Medium | #2.2 | +| 3.5 | Dead Letter Queue (DLQ) | Implement the optional Dead Letter Queue mechanism. Allow a user to provide a DLQ channel sender during app setup; failed pushes (due to a full queue) can be routed there instead of being dropped. | Medium | #2.2 | +| 3.6 | Context-Aware FragmentStrategy | Enhance the `FragmentStrategy` trait. `max_fragment_payload` and `encode_header` should receive a reference to the logical `Frame` being processed, allowing for more dynamic fragmentation rules. | Small | #1.4 | *Focus: Finalizing the library with comprehensive instrumentation, advanced testing, and high-quality documentation to ensure it is stable, debuggable, and ready for a 1.0 release.* -| Item | Name | Details | Size | Depends on | -| ---- | ------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ | ---------- | -| 4.1 | Pervasive tracing instrumentation | Instrument the entire library with `tracing`. Add `span!` calls for connection and request lifecycles and detailed `event!` calls for key state transitions (e.g., back-pressure applied, frame dropped, connection terminated). | Large | All | -| 4.2 | Advanced Testing: Concurrency & Logic | Implement the advanced test suite. Use `loom` to verify the concurrency correctness of the `select!` loop and `PushHandle`. Use `proptest` for stateful property-based testing of complex protocol interactions (e.g., fragmentation and streaming). | Large | #3.3, #3.5 | -| 4.3 | Advanced Testing: Performance | Implement the criterion benchmark suite. Create micro-benchmarks for individual components (e.g., `PushHandle` contention) and macro-benchmarks for end-to-end throughput and latency. | Medium | All | -| 4.4 | Comprehensive User Guides | Write the official documentation for the new features. Create separate guides for "Duplex Messaging & Pushes", "Streaming Responses", and "Message Fragmentation". Each guide must include runnable examples and explain the relevant concepts and APIs. | Large | All | -| 4.5 | High-Quality Examples | Create at least two complete, high-quality examples demonstrating real-world use cases. These should include server-initiated MySQL packets (e.g., LOCAL INFILE and session-state trackers) and a push-driven protocol such as WebSocket heart-beats or MQTT broker fan-out. | Medium | All | -| 4.6 | Changelog & 1.0 Release | Finalise the `CHANGELOG.md` with a comprehensive summary of all new features, enhancements, and breaking changes. Tag and publish the 1.0.0 release. | Small | All | +| Item | Name | Details | Size | Depends on | +| ------ | ------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ | ---------- | +| ------ | ---------- | +| ------ | ---------- | +| 4.1 | Pervasive tracing instrumentation | Instrument the entire library with `tracing`. Add `span!` calls for connection and request lifecycles and detailed `event!` calls for key state transitions (e.g., back-pressure applied, frame dropped, connection terminated). | Large | All | +| 4.2 | Advanced Testing: Concurrency & Logic | Implement the advanced test suite. Use `loom` to verify the concurrency correctness of the `select!` loop and `PushHandle`. Use `proptest` for stateful property-based testing of complex protocol interactions (e.g., fragmentation and streaming). | Large | #3.3, #3.5 | +| 4.3 | Advanced Testing: Performance | Implement the criterion benchmark suite. Create micro-benchmarks for individual components (e.g., `PushHandle` contention) and macro-benchmarks for end-to-end throughput and latency. | Medium | All | +| 4.4 | Comprehensive User Guides | Write the official documentation for the new features. Create separate guides for "Duplex Messaging & Pushes", "Streaming Responses", and "Message Fragmentation". Each guide must include runnable examples and explain the relevant concepts and APIs. | Large | All | +| 4.5 | High-Quality Examples | Create at least two complete, high-quality examples demonstrating real-world use cases. These should include server-initiated MySQL packets (e.g., LOCAL INFILE and session-state trackers) and a push-driven protocol such as WebSocket heart-beats or MQTT broker fan-out. | Medium | All | +| 4.6 | Changelog & 1.0 Release | Finalise the `CHANGELOG.md` with a comprehensive summary of all new features, enhancements, and breaking changes. Tag and publish the 1.0.0 release. | Small | All | diff --git a/src/app.rs b/src/app.rs index 0c31612e..4dfabffe 100644 --- a/src/app.rs +++ b/src/app.rs @@ -14,7 +14,10 @@ use std::{ }; use bytes::BytesMut; -use tokio::io::{self, AsyncWrite, AsyncWriteExt}; +use tokio::{ + io::{self, AsyncWrite, AsyncWriteExt}, + sync::mpsc, +}; use crate::{ frame::{FrameProcessor, LengthFormat, LengthPrefixedProcessor}, @@ -82,6 +85,7 @@ pub struct WireframeApp< on_connect: Option>>, on_disconnect: Option>>, protocol: Option, ProtocolError = ()>>>, + push_dlq: Option>>, } /// Alias for asynchronous route handlers. @@ -238,6 +242,7 @@ where on_connect: None, on_disconnect: None, protocol: None, + push_dlq: None, } } } @@ -357,6 +362,7 @@ where on_connect: Some(Arc::new(move || Box::pin(f()))), on_disconnect: None, protocol: self.protocol, + push_dlq: self.push_dlq, }) } @@ -392,6 +398,25 @@ where self } + /// Configure a Dead Letter Queue for dropped push frames. + /// + /// ```rust,no_run + /// use tokio::sync::mpsc; + /// use wireframe::app::WireframeApp; + /// + /// # fn build() -> WireframeApp { WireframeApp::new().unwrap() } + /// # fn main() { + /// let (tx, _rx) = mpsc::channel(16); + /// let app = build().with_push_dlq(tx); + /// # let _ = app; + /// # } + /// ``` + #[must_use] + pub fn with_push_dlq(mut self, dlq: mpsc::Sender>) -> Self { + self.push_dlq = Some(dlq); + self + } + /// Get a clone of the configured protocol, if any. /// /// Returns `None` if no protocol was installed via [`with_protocol`](Self::with_protocol). @@ -439,6 +464,7 @@ where on_connect: self.on_connect, on_disconnect: self.on_disconnect, protocol: self.protocol, + push_dlq: self.push_dlq, } } diff --git a/src/push.rs b/src/push.rs index b7ce37d1..2f1ce295 100644 --- a/src/push.rs +++ b/src/push.rs @@ -69,6 +69,7 @@ pub(crate) struct PushHandleInner { high_prio_tx: mpsc::Sender, low_prio_tx: mpsc::Sender, limiter: Option, + dlq_tx: Option>, } /// Cloneable handle used by producers to push frames to a connection. @@ -151,20 +152,27 @@ impl PushHandle { /// /// Returns [`PushError::QueueFull`] if the queue is full and the policy is /// [`PushPolicy::ReturnErrorIfFull`]. Returns [`PushError::Closed`] if the - /// receiving end has been dropped. + /// receiving end has been dropped. When [`PushPolicy::DropIfFull`] or + /// [`PushPolicy::WarnAndDropIfFull`] is used, a configured dead letter queue + /// receives the dropped frame. /// /// # Examples /// /// ```rust,no_run + /// use tokio::sync::mpsc; /// use wireframe::push::{PushError, PushPolicy, PushPriority, PushQueues}; /// /// #[tokio::test] /// async fn example() { - /// let (mut queues, handle) = PushQueues::bounded(1, 1); + /// let (dlq_tx, mut dlq_rx) = mpsc::channel(1); + /// let (mut queues, handle) = PushQueues::bounded_with_rate_dlq(1, 1, None, Some(dlq_tx)); /// handle.push_high_priority(1u8).await.unwrap(); /// - /// let result = handle.try_push(2u8, PushPriority::High, PushPolicy::ReturnErrorIfFull); - /// assert!(matches!(result, Err(PushError::QueueFull))); + /// handle + /// .try_push(2u8, PushPriority::High, PushPolicy::DropIfFull) + /// .unwrap(); + /// + /// assert_eq!(dlq_rx.recv().await.unwrap(), 2); /// let _ = queues.recv().await; /// } /// ``` @@ -181,11 +189,17 @@ impl PushHandle { match tx.try_send(frame) { Ok(()) => Ok(()), - Err(mpsc::error::TrySendError::Full(_f)) => match policy { + Err(mpsc::error::TrySendError::Full(f)) => match policy { PushPolicy::ReturnErrorIfFull => Err(PushError::QueueFull), - PushPolicy::DropIfFull => Ok(()), - PushPolicy::WarnAndDropIfFull => { - log::warn!("push queue full; dropping {priority:?} priority frame"); + PushPolicy::DropIfFull | PushPolicy::WarnAndDropIfFull => { + if matches!(policy, PushPolicy::WarnAndDropIfFull) { + log::warn!("push queue full; dropping {priority:?} priority frame"); + } + if let Some(dlq) = &self.0.dlq_tx + && let Err(mpsc::error::TrySendError::Full(_)) = dlq.try_send(f) + { + log::error!("push queue and DLQ full; frame lost"); + } Ok(()) } }, @@ -223,7 +237,7 @@ impl PushQueues { /// ``` #[must_use] pub fn bounded(high_capacity: usize, low_capacity: usize) -> (Self, PushHandle) { - Self::bounded_with_rate(high_capacity, low_capacity, Some(DEFAULT_PUSH_RATE)) + Self::bounded_with_rate_dlq(high_capacity, low_capacity, Some(DEFAULT_PUSH_RATE), None) } /// Create queues with no rate limiting. @@ -241,7 +255,7 @@ impl PushQueues { high_capacity: usize, low_capacity: usize, ) -> (Self, PushHandle) { - Self::bounded_with_rate(high_capacity, low_capacity, None) + Self::bounded_with_rate_dlq(high_capacity, low_capacity, None, None) } /// Create queues with a custom rate limit in pushes per second. @@ -272,6 +286,47 @@ impl PushQueues { high_capacity: usize, low_capacity: usize, rate: Option, + ) -> (Self, PushHandle) { + Self::bounded_with_rate_dlq(high_capacity, low_capacity, rate, None) + } + + /// Create queues with a custom rate limit and optional dead letter queue. + /// + /// Frames that would be dropped by [`try_push`](PushHandle::try_push) when + /// using [`PushPolicy::DropIfFull`] or [`PushPolicy::WarnAndDropIfFull`] + /// are routed to `dlq` if provided. + /// + /// # Panics + /// + /// Panics if `rate` is zero or greater than [`MAX_PUSH_RATE`]. + /// + /// # Examples + /// + /// ```rust,no_run + /// use tokio::sync::mpsc; + /// use wireframe::push::{PushPolicy, PushPriority, PushQueues}; + /// + /// #[tokio::main] + /// async fn main() { + /// let (dlq_tx, mut dlq_rx) = mpsc::channel(1); + /// let (mut queues, handle) = + /// PushQueues::::bounded_with_rate_dlq(1, 1, None, Some(dlq_tx)); + /// handle.push_high_priority(1u8).await.unwrap(); + /// handle + /// .try_push(2u8, PushPriority::High, PushPolicy::DropIfFull) + /// .unwrap(); + /// + /// let (_, val) = queues.recv().await.unwrap(); + /// assert_eq!(val, 1); + /// assert_eq!(dlq_rx.recv().await.unwrap(), 2); + /// } + /// ``` + #[must_use] + pub fn bounded_with_rate_dlq( + high_capacity: usize, + low_capacity: usize, + rate: Option, + dlq: Option>, ) -> (Self, PushHandle) { if let Some(r) = rate { assert!(r > 0, "rate must be greater than zero, got {r}"); @@ -294,6 +349,7 @@ impl PushQueues { high_prio_tx: high_tx, low_prio_tx: low_tx, limiter, + dlq_tx: dlq, }; ( Self { diff --git a/tests/push_policies.rs b/tests/push_policies.rs index 3b9114ad..ce2e1608 100644 --- a/tests/push_policies.rs +++ b/tests/push_policies.rs @@ -4,8 +4,10 @@ use std::sync::{Mutex, OnceLock}; use logtest::Logger; use rstest::{fixture, rstest}; +use serial_test::serial; use tokio::{ runtime::Runtime, + sync::mpsc, time::{Duration, timeout}, }; use wireframe::push::{PushPolicy, PushPriority, PushQueues}; @@ -58,8 +60,10 @@ fn rt() -> Runtime { } #[rstest] +#[serial] fn drop_if_full_discards_frame(rt: Runtime, mut logger: LoggerHandle) { rt.block_on(async { + while logger.pop().is_some() {} let (mut queues, handle) = PushQueues::bounded(1, 1); handle.push_high_priority(1u8).await.unwrap(); handle @@ -78,8 +82,10 @@ fn drop_if_full_discards_frame(rt: Runtime, mut logger: LoggerHandle) { } #[rstest] +#[serial] fn warn_and_drop_if_full_logs_warning(rt: Runtime, mut logger: LoggerHandle) { rt.block_on(async { + while logger.pop().is_some() {} let (mut queues, handle) = PushQueues::bounded(1, 1); handle.push_low_priority(3u8).await.unwrap(); handle @@ -99,3 +105,51 @@ fn warn_and_drop_if_full_logs_warning(rt: Runtime, mut logger: LoggerHandle) { assert!(logger.pop().is_none()); }); } + +#[rstest] +fn dropped_frame_goes_to_dlq(rt: Runtime) { + rt.block_on(async { + let (dlq_tx, mut dlq_rx) = mpsc::channel(1); + let (mut queues, handle) = PushQueues::bounded_with_rate_dlq(1, 1, None, Some(dlq_tx)); + + handle.push_high_priority(1u8).await.unwrap(); + handle + .try_push(2u8, PushPriority::High, PushPolicy::DropIfFull) + .unwrap(); + + let (_, val) = queues.recv().await.unwrap(); + assert_eq!(val, 1); + assert_eq!(dlq_rx.recv().await.unwrap(), 2); + }); +} + +#[rstest] +#[serial] +fn dlq_full_logs_error(rt: Runtime, mut logger: LoggerHandle) { + rt.block_on(async { + while logger.pop().is_some() {} + let (dlq_tx, mut dlq_rx) = mpsc::channel(1); + dlq_tx.try_send(99u8).unwrap(); + let (mut queues, handle) = PushQueues::bounded_with_rate_dlq(1, 1, None, Some(dlq_tx)); + + handle.push_high_priority(1u8).await.unwrap(); + handle + .try_push(2u8, PushPriority::High, PushPolicy::WarnAndDropIfFull) + .unwrap(); + + let (_, val) = queues.recv().await.unwrap(); + assert_eq!(val, 1); + assert_eq!(dlq_rx.recv().await.unwrap(), 99); + assert!(dlq_rx.try_recv().is_err()); + + let mut found_error = false; + while let Some(record) = logger.pop() { + if record.level() == log::Level::Error { + assert!(record.args().contains("DLQ")); + found_error = true; + break; + } + } + assert!(found_error, "error log not found"); + }); +} From 244ae7f576a5f79635b9785c9b62e29bfc7b2031 Mon Sep 17 00:00:00 2001 From: Leynos Date: Tue, 8 Jul 2025 22:10:42 +0100 Subject: [PATCH 2/9] Handle DLQ closed case --- ...eframe-a-guide-to-production-resilience.md | 3 +- src/push.rs | 88 +++++++++++++------ tests/push.rs | 8 +- tests/push_policies.rs | 36 +++++++- 4 files changed, 103 insertions(+), 32 deletions(-) diff --git a/docs/hardening-wireframe-a-guide-to-production-resilience.md b/docs/hardening-wireframe-a-guide-to-production-resilience.md index 23447919..e545e215 100644 --- a/docs/hardening-wireframe-a-guide-to-production-resilience.md +++ b/docs/hardening-wireframe-a-guide-to-production-resilience.md @@ -242,7 +242,8 @@ token-bucket algorithm is ideal. use wireframe::push::PushQueues; // Configure a connection to allow at most 100 pushes per second. -let (queues, handle) = PushQueues::::bounded_with_rate(8, 8, Some(100)); +let (queues, handle) = + PushQueues::::bounded_with_rate(8, 8, Some(100)).unwrap(); // Passing `None` disables rate limiting entirely: let (_unlimited, _handle) = PushQueues::::bounded_no_rate_limit(8, 8); diff --git a/src/push.rs b/src/push.rs index 2f1ce295..4bdbbbc3 100644 --- a/src/push.rs +++ b/src/push.rs @@ -65,6 +65,25 @@ impl std::fmt::Display for PushError { impl std::error::Error for PushError {} +/// Errors returned when creating push queues. +#[derive(Debug)] +pub enum PushConfigError { + /// The provided rate was zero or exceeded [`MAX_PUSH_RATE`]. + InvalidRate(usize), +} + +impl std::fmt::Display for PushConfigError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::InvalidRate(r) => { + write!(f, "invalid rate {r}; must be between 1 and {MAX_PUSH_RATE}") + } + } + } +} + +impl std::error::Error for PushConfigError {} + pub(crate) struct PushHandleInner { high_prio_tx: mpsc::Sender, low_prio_tx: mpsc::Sender, @@ -146,6 +165,21 @@ impl PushHandle { self.push_with_priority(frame, PushPriority::Low).await } + /// Send a frame to the configured dead letter queue if available. + fn route_to_dlq(&self, frame: F) { + if let Some(dlq) = &self.0.dlq_tx { + match dlq.try_send(frame) { + Ok(()) => {} + Err(mpsc::error::TrySendError::Full(_)) => { + log::error!("push queue and DLQ full; frame lost"); + } + Err(mpsc::error::TrySendError::Closed(_)) => { + log::error!("DLQ closed; frame lost"); + } + } + } + } + /// Attempt to push a frame with the given priority and policy. /// /// # Errors @@ -165,7 +199,8 @@ impl PushHandle { /// #[tokio::test] /// async fn example() { /// let (dlq_tx, mut dlq_rx) = mpsc::channel(1); - /// let (mut queues, handle) = PushQueues::bounded_with_rate_dlq(1, 1, None, Some(dlq_tx)); + /// let (mut queues, handle) = + /// PushQueues::bounded_with_rate_dlq(1, 1, None, Some(dlq_tx)).unwrap(); /// handle.push_high_priority(1u8).await.unwrap(); /// /// handle @@ -195,11 +230,7 @@ impl PushHandle { if matches!(policy, PushPolicy::WarnAndDropIfFull) { log::warn!("push queue full; dropping {priority:?} priority frame"); } - if let Some(dlq) = &self.0.dlq_tx - && let Err(mpsc::error::TrySendError::Full(_)) = dlq.try_send(f) - { - log::error!("push queue and DLQ full; frame lost"); - } + self.route_to_dlq(f); Ok(()) } }, @@ -235,9 +266,14 @@ impl PushQueues { /// assert_eq!(frame, 7); /// } /// ``` + /// + /// # Panics + /// + /// Panics if an internal invariant is violated. This should never occur. #[must_use] pub fn bounded(high_capacity: usize, low_capacity: usize) -> (Self, PushHandle) { Self::bounded_with_rate_dlq(high_capacity, low_capacity, Some(DEFAULT_PUSH_RATE), None) + .expect("DEFAULT_PUSH_RATE is always valid") } /// Create queues with no rate limiting. @@ -250,12 +286,16 @@ impl PushQueues { /// let (_queues, handle) = PushQueues::::bounded_no_rate_limit(1, 1); /// let _ = handle; /// ``` + /// + /// # Panics + /// + /// Panics if an internal invariant is violated. This should never occur. #[must_use] pub fn bounded_no_rate_limit( high_capacity: usize, low_capacity: usize, ) -> (Self, PushHandle) { - Self::bounded_with_rate_dlq(high_capacity, low_capacity, None, None) + Self::bounded_with_rate_dlq(high_capacity, low_capacity, None, None).unwrap() } /// Create queues with a custom rate limit in pushes per second. @@ -264,9 +304,10 @@ impl PushQueues { /// per second across all producers for the returned [`PushHandle`]. /// Pass `None` to disable rate limiting entirely. /// - /// # Panics + /// # Errors /// - /// Panics if `rate` is zero or greater than [`MAX_PUSH_RATE`]. + /// Returns [`PushConfigError::InvalidRate`] if `rate` is zero or greater + /// than [`MAX_PUSH_RATE`]. /// /// # Examples /// @@ -275,18 +316,17 @@ impl PushQueues { /// /// #[tokio::main] /// async fn main() { - /// let (mut queues, handle) = PushQueues::::bounded_with_rate(1, 1, Some(10)); + /// let (mut queues, handle) = PushQueues::::bounded_with_rate(1, 1, Some(10)).unwrap(); /// handle.push_low_priority(1u8).await.unwrap(); /// let (_prio, frame) = queues.recv().await.unwrap(); /// assert_eq!(frame, 1); /// } /// ``` - #[must_use] pub fn bounded_with_rate( high_capacity: usize, low_capacity: usize, rate: Option, - ) -> (Self, PushHandle) { + ) -> Result<(Self, PushHandle), PushConfigError> { Self::bounded_with_rate_dlq(high_capacity, low_capacity, rate, None) } @@ -296,9 +336,10 @@ impl PushQueues { /// using [`PushPolicy::DropIfFull`] or [`PushPolicy::WarnAndDropIfFull`] /// are routed to `dlq` if provided. /// - /// # Panics + /// # Errors /// - /// Panics if `rate` is zero or greater than [`MAX_PUSH_RATE`]. + /// Returns [`PushConfigError::InvalidRate`] if `rate` is zero or greater + /// than [`MAX_PUSH_RATE`]. /// /// # Examples /// @@ -310,7 +351,7 @@ impl PushQueues { /// async fn main() { /// let (dlq_tx, mut dlq_rx) = mpsc::channel(1); /// let (mut queues, handle) = - /// PushQueues::::bounded_with_rate_dlq(1, 1, None, Some(dlq_tx)); + /// PushQueues::::bounded_with_rate_dlq(1, 1, None, Some(dlq_tx)).unwrap(); /// handle.push_high_priority(1u8).await.unwrap(); /// handle /// .try_push(2u8, PushPriority::High, PushPolicy::DropIfFull) @@ -321,19 +362,16 @@ impl PushQueues { /// assert_eq!(dlq_rx.recv().await.unwrap(), 2); /// } /// ``` - #[must_use] pub fn bounded_with_rate_dlq( high_capacity: usize, low_capacity: usize, rate: Option, dlq: Option>, - ) -> (Self, PushHandle) { - if let Some(r) = rate { - assert!(r > 0, "rate must be greater than zero, got {r}"); - assert!( - r <= MAX_PUSH_RATE, - "rate must be <= {MAX_PUSH_RATE}, got {r}" - ); + ) -> Result<(Self, PushHandle), PushConfigError> { + if let Some(r) = rate + && (r == 0 || r > MAX_PUSH_RATE) + { + return Err(PushConfigError::InvalidRate(r)); } let (high_tx, high_rx) = mpsc::channel(high_capacity); let (low_tx, low_rx) = mpsc::channel(low_capacity); @@ -351,13 +389,13 @@ impl PushQueues { limiter, dlq_tx: dlq, }; - ( + Ok(( Self { high_priority_rx: high_rx, low_priority_rx: low_rx, }, PushHandle(Arc::new(inner)), - ) + )) } /// Receive the next frame, preferring high priority frames when available. diff --git a/tests/push.rs b/tests/push.rs index 5a6fdb8f..05fe000e 100644 --- a/tests/push.rs +++ b/tests/push.rs @@ -56,7 +56,7 @@ async fn push_queues_error_on_closed() { #[tokio::test] async fn rate_limiter_blocks_when_exceeded(#[case] priority: PushPriority) { time::pause(); - let (mut queues, handle) = PushQueues::bounded_with_rate(2, 2, Some(1)); + let (mut queues, handle) = PushQueues::bounded_with_rate(2, 2, Some(1)).unwrap(); match priority { PushPriority::High => handle.push_high_priority(1u8).await.unwrap(), @@ -88,7 +88,7 @@ async fn rate_limiter_blocks_when_exceeded(#[case] priority: PushPriority) { #[tokio::test] async fn rate_limiter_allows_after_wait() { time::pause(); - let (mut queues, handle) = PushQueues::bounded_with_rate(2, 2, Some(1)); + let (mut queues, handle) = PushQueues::bounded_with_rate(2, 2, Some(1)).unwrap(); handle.push_high_priority(1u8).await.unwrap(); time::advance(Duration::from_secs(1)).await; handle.push_high_priority(2u8).await.unwrap(); @@ -101,7 +101,7 @@ async fn rate_limiter_allows_after_wait() { #[tokio::test] async fn rate_limiter_shared_across_priorities() { time::pause(); - let (mut queues, handle) = PushQueues::bounded_with_rate(2, 2, Some(1)); + let (mut queues, handle) = PushQueues::bounded_with_rate(2, 2, Some(1)).unwrap(); handle.push_high_priority(1u8).await.unwrap(); let attempt = time::timeout(Duration::from_millis(10), handle.push_low_priority(2u8)).await; @@ -134,7 +134,7 @@ async fn unlimited_queues_do_not_block() { #[tokio::test] async fn rate_limiter_allows_burst_within_capacity_and_blocks_excess() { time::pause(); - let (mut queues, handle) = PushQueues::bounded_with_rate(4, 4, Some(3)); + let (mut queues, handle) = PushQueues::bounded_with_rate(4, 4, Some(3)).unwrap(); for i in 0u8..3 { handle.push_high_priority(i).await.unwrap(); diff --git a/tests/push_policies.rs b/tests/push_policies.rs index ce2e1608..04d8a649 100644 --- a/tests/push_policies.rs +++ b/tests/push_policies.rs @@ -110,7 +110,8 @@ fn warn_and_drop_if_full_logs_warning(rt: Runtime, mut logger: LoggerHandle) { fn dropped_frame_goes_to_dlq(rt: Runtime) { rt.block_on(async { let (dlq_tx, mut dlq_rx) = mpsc::channel(1); - let (mut queues, handle) = PushQueues::bounded_with_rate_dlq(1, 1, None, Some(dlq_tx)); + let (mut queues, handle) = + PushQueues::bounded_with_rate_dlq(1, 1, None, Some(dlq_tx)).unwrap(); handle.push_high_priority(1u8).await.unwrap(); handle @@ -130,7 +131,8 @@ fn dlq_full_logs_error(rt: Runtime, mut logger: LoggerHandle) { while logger.pop().is_some() {} let (dlq_tx, mut dlq_rx) = mpsc::channel(1); dlq_tx.try_send(99u8).unwrap(); - let (mut queues, handle) = PushQueues::bounded_with_rate_dlq(1, 1, None, Some(dlq_tx)); + let (mut queues, handle) = + PushQueues::bounded_with_rate_dlq(1, 1, None, Some(dlq_tx)).unwrap(); handle.push_high_priority(1u8).await.unwrap(); handle @@ -153,3 +155,33 @@ fn dlq_full_logs_error(rt: Runtime, mut logger: LoggerHandle) { assert!(found_error, "error log not found"); }); } + +#[rstest] +#[serial] +fn dlq_closed_logs_error(rt: Runtime, mut logger: LoggerHandle) { + rt.block_on(async { + while logger.pop().is_some() {} + let (dlq_tx, dlq_rx) = mpsc::channel::(1); + drop(dlq_rx); + let (mut queues, handle) = + PushQueues::bounded_with_rate_dlq(1, 1, None, Some(dlq_tx)).unwrap(); + + handle.push_high_priority(1u8).await.unwrap(); + handle + .try_push(2u8, PushPriority::High, PushPolicy::DropIfFull) + .unwrap(); + + let (_, val) = queues.recv().await.unwrap(); + assert_eq!(val, 1); + + let mut found_error = false; + while let Some(record) = logger.pop() { + if record.level() == log::Level::Error { + assert!(record.args().contains("closed")); + found_error = true; + break; + } + } + assert!(found_error, "error log not found"); + }); +} From f8aa4dc4d2b36a0cb3dded7515f62a6f0fb6dc2b Mon Sep 17 00:00:00 2001 From: Leynos Date: Wed, 9 Jul 2025 07:55:46 +0100 Subject: [PATCH 3/9] Document DLQ routing --- .../asynchronous-outbound-messaging-design.md | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/docs/asynchronous-outbound-messaging-design.md b/docs/asynchronous-outbound-messaging-design.md index 6090d3fd..0833dd27 100644 --- a/docs/asynchronous-outbound-messaging-design.md +++ b/docs/asynchronous-outbound-messaging-design.md @@ -597,6 +597,30 @@ that would normally be dropped by the `PushPolicy::DropIfFull` or part of the application is then responsible for consuming from the DLQ to inspect, log, and potentially retry these failed messages. +The following sequence diagram illustrates how frames are routed when a DLQ is +configured: + +```mermaid +sequenceDiagram + participant Producer + participant PushHandle + participant HighPrioQueue + participant DLQ as DeadLetterQueue + Producer->>PushHandle: try_push(frame, priority, DropIfFull) + PushHandle->>HighPrioQueue: try_send(frame) + alt Queue full + PushHandle->>DLQ: try_send(frame) + alt DLQ full + PushHandle->>PushHandle: log error (frame lost) + else DLQ has space + DLQ-->>PushHandle: frame accepted + end + else Queue has space + HighPrioQueue-->>PushHandle: frame accepted + end + PushHandle-->>Producer: Ok or logs +``` + ### 5.3 Typed protocol errors `WireframeError` distinguishes transport failures from protocol logic errors. A From e4fc5490550aa73b5dc33851c971d55def79c42f Mon Sep 17 00:00:00 2001 From: Leynos Date: Wed, 9 Jul 2025 21:35:22 +0100 Subject: [PATCH 4/9] Refactor DLQ error tests --- tests/push_policies.rs | 69 +++++++++++++++++++++--------------------- 1 file changed, 34 insertions(+), 35 deletions(-) diff --git a/tests/push_policies.rs b/tests/push_policies.rs index 04d8a649..53d9e060 100644 --- a/tests/push_policies.rs +++ b/tests/push_policies.rs @@ -2,6 +2,7 @@ use std::sync::{Mutex, OnceLock}; +use futures::future::BoxFuture; use logtest::Logger; use rstest::{fixture, rstest}; use serial_test::serial; @@ -124,60 +125,58 @@ fn dropped_frame_goes_to_dlq(rt: Runtime) { }); } -#[rstest] -#[serial] -fn dlq_full_logs_error(rt: Runtime, mut logger: LoggerHandle) { - rt.block_on(async { - while logger.pop().is_some() {} - let (dlq_tx, mut dlq_rx) = mpsc::channel(1); - dlq_tx.try_send(99u8).unwrap(); - let (mut queues, handle) = - PushQueues::bounded_with_rate_dlq(1, 1, None, Some(dlq_tx)).unwrap(); - - handle.push_high_priority(1u8).await.unwrap(); - handle - .try_push(2u8, PushPriority::High, PushPolicy::WarnAndDropIfFull) - .unwrap(); +fn setup_dlq_full(tx: &mpsc::Sender, _rx: &mut Option>) { + tx.try_send(99).unwrap(); +} - let (_, val) = queues.recv().await.unwrap(); - assert_eq!(val, 1); - assert_eq!(dlq_rx.recv().await.unwrap(), 99); - assert!(dlq_rx.try_recv().is_err()); +fn setup_dlq_closed(_: &mpsc::Sender, rx: &mut Option>) { drop(rx.take()); } - let mut found_error = false; - while let Some(record) = logger.pop() { - if record.level() == log::Level::Error { - assert!(record.args().contains("DLQ")); - found_error = true; - break; - } - } - assert!(found_error, "error log not found"); - }); +fn assert_dlq_full(rx: &mut Option>) -> BoxFuture<'_, ()> { + Box::pin(async move { + let receiver = rx.as_mut().expect("receiver missing"); + assert_eq!(receiver.recv().await.unwrap(), 99); + assert!(receiver.try_recv().is_err()); + }) } +fn assert_dlq_closed(_: &mut Option>) -> BoxFuture<'_, ()> { Box::pin(async {}) } + #[rstest] +#[case::dlq_full(setup_dlq_full, PushPolicy::WarnAndDropIfFull, "DLQ", assert_dlq_full)] +#[case::dlq_closed(setup_dlq_closed, PushPolicy::DropIfFull, "closed", assert_dlq_closed)] #[serial] -fn dlq_closed_logs_error(rt: Runtime, mut logger: LoggerHandle) { +fn dlq_error_scenarios( + rt: Runtime, + mut logger: LoggerHandle, + #[case] setup: Setup, + #[case] policy: PushPolicy, + #[case] expected: &str, + #[case] assertion: AssertFn, +) where + Setup: FnOnce(&mpsc::Sender, &mut Option>), + AssertFn: FnOnce(&mut Option>) -> BoxFuture<'_, ()>, +{ rt.block_on(async { while logger.pop().is_some() {} - let (dlq_tx, dlq_rx) = mpsc::channel::(1); - drop(dlq_rx); + + let (dlq_tx, dlq_rx) = mpsc::channel(1); + let mut dlq_rx = Some(dlq_rx); + setup(&dlq_tx, &mut dlq_rx); let (mut queues, handle) = PushQueues::bounded_with_rate_dlq(1, 1, None, Some(dlq_tx)).unwrap(); handle.push_high_priority(1u8).await.unwrap(); - handle - .try_push(2u8, PushPriority::High, PushPolicy::DropIfFull) - .unwrap(); + handle.try_push(2u8, PushPriority::High, policy).unwrap(); let (_, val) = queues.recv().await.unwrap(); assert_eq!(val, 1); + assertion(&mut dlq_rx).await; + let mut found_error = false; while let Some(record) = logger.pop() { if record.level() == log::Level::Error { - assert!(record.args().contains("closed")); + assert!(record.args().contains(expected)); found_error = true; break; } From abe10e12b50f0efe4ce1dfef637b3500a4da7d0f Mon Sep 17 00:00:00 2001 From: Leynos Date: Thu, 10 Jul 2025 22:47:54 +0100 Subject: [PATCH 5/9] Synchronize push policy tests --- tests/push_policies.rs | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/tests/push_policies.rs b/tests/push_policies.rs index 53d9e060..f7fd375a 100644 --- a/tests/push_policies.rs +++ b/tests/push_policies.rs @@ -61,7 +61,7 @@ fn rt() -> Runtime { } #[rstest] -#[serial] +#[serial(push_policies)] fn drop_if_full_discards_frame(rt: Runtime, mut logger: LoggerHandle) { rt.block_on(async { while logger.pop().is_some() {} @@ -78,12 +78,18 @@ fn drop_if_full_discards_frame(rt: Runtime, mut logger: LoggerHandle) { .is_err() ); - assert!(logger.pop().is_none()); + while let Some(record) = logger.pop() { + assert!( + !record.args().contains("push queue full"), + "unexpected log: {}", + record.args() + ); + } }); } #[rstest] -#[serial] +#[serial(push_policies)] fn warn_and_drop_if_full_logs_warning(rt: Runtime, mut logger: LoggerHandle) { rt.block_on(async { while logger.pop().is_some() {} @@ -99,11 +105,14 @@ fn warn_and_drop_if_full_logs_warning(rt: Runtime, mut logger: LoggerHandle) { .await .is_err() ); - - let record = logger.pop().expect("expected warning"); - assert_eq!(record.level(), log::Level::Warn); - assert!(record.args().contains("push queue full")); - assert!(logger.pop().is_none()); + let mut found = false; + while let Some(record) = logger.pop() { + if record.level() == log::Level::Warn && record.args().contains("push queue full") { + found = true; + break; + } + } + assert!(found, "warning log not found"); }); } @@ -144,7 +153,7 @@ fn assert_dlq_closed(_: &mut Option>) -> BoxFuture<'_, ()> { #[rstest] #[case::dlq_full(setup_dlq_full, PushPolicy::WarnAndDropIfFull, "DLQ", assert_dlq_full)] #[case::dlq_closed(setup_dlq_closed, PushPolicy::DropIfFull, "closed", assert_dlq_closed)] -#[serial] +#[serial(push_policies)] fn dlq_error_scenarios( rt: Runtime, mut logger: LoggerHandle, From d8738982e61b56447521d7e68aacf51b6eaff847 Mon Sep 17 00:00:00 2001 From: Leynos Date: Thu, 10 Jul 2025 23:28:36 +0100 Subject: [PATCH 6/9] Add logging fixture to test crate --- Cargo.lock | 1 + tests/connection_actor.rs | 39 +----------------------------- tests/push_policies.rs | 39 +----------------------------- wireframe_testing/Cargo.toml | 1 + wireframe_testing/src/lib.rs | 3 +++ wireframe_testing/src/logging.rs | 41 ++++++++++++++++++++++++++++++++ 6 files changed, 48 insertions(+), 76 deletions(-) create mode 100644 wireframe_testing/src/logging.rs diff --git a/Cargo.lock b/Cargo.lock index 31bdc516..af9d432b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1290,6 +1290,7 @@ version = "0.1.0" dependencies = [ "bincode", "bytes", + "logtest", "rstest", "tokio", "wireframe", diff --git a/tests/connection_actor.rs b/tests/connection_actor.rs index 2ed5ab16..ec8438bb 100644 --- a/tests/connection_actor.rs +++ b/tests/connection_actor.rs @@ -354,49 +354,12 @@ async fn push_queue_exhaustion_backpressure() { use std::sync::{ Arc, - Mutex, - OnceLock, atomic::{AtomicUsize, Ordering}, }; -use logtest::Logger; use serial_test::serial; use wireframe::{ConnectionContext, ProtocolHooks}; - -/// Handle to the global logger with exclusive access. -struct LoggerHandle { - guard: std::sync::MutexGuard<'static, Logger>, -} - -impl LoggerHandle { - fn new() -> Self { - static LOGGER: OnceLock> = OnceLock::new(); - - let logger = LOGGER.get_or_init(|| Mutex::new(Logger::start())); - let guard = logger - .lock() - .expect("failed to acquire global logger lock; a previous test may still hold it"); - - Self { guard } - } -} - -impl std::ops::Deref for LoggerHandle { - type Target = Logger; - - fn deref(&self) -> &Self::Target { &self.guard } -} - -impl std::ops::DerefMut for LoggerHandle { - fn deref_mut(&mut self) -> &mut Self::Target { &mut self.guard } -} - -#[allow( - unused_braces, - reason = "rustc false positive for single line rstest fixtures" -)] -#[fixture] -fn logger() -> LoggerHandle { LoggerHandle::new() } +use wireframe_testing::{LoggerHandle, logger}; #[rstest] #[tokio::test] diff --git a/tests/push_policies.rs b/tests/push_policies.rs index f7fd375a..249c14b6 100644 --- a/tests/push_policies.rs +++ b/tests/push_policies.rs @@ -1,9 +1,6 @@ //! Tests for push queue policy behaviour. -use std::sync::{Mutex, OnceLock}; - use futures::future::BoxFuture; -use logtest::Logger; use rstest::{fixture, rstest}; use serial_test::serial; use tokio::{ @@ -12,41 +9,7 @@ use tokio::{ time::{Duration, timeout}, }; use wireframe::push::{PushPolicy, PushPriority, PushQueues}; - -/// Handle to the global logger with exclusive access. -struct LoggerHandle { - guard: std::sync::MutexGuard<'static, Logger>, -} - -impl LoggerHandle { - fn new() -> Self { - static LOGGER: OnceLock> = OnceLock::new(); - - let logger = LOGGER.get_or_init(|| Mutex::new(Logger::start())); - let guard = logger - .lock() - .expect("failed to acquire global logger lock; a previous test may still hold it"); - - Self { guard } - } -} - -impl std::ops::Deref for LoggerHandle { - type Target = Logger; - - fn deref(&self) -> &Self::Target { &self.guard } -} - -impl std::ops::DerefMut for LoggerHandle { - fn deref_mut(&mut self) -> &mut Self::Target { &mut self.guard } -} - -#[allow( - unused_braces, - reason = "rustc false positive for single line rstest fixtures" -)] -#[fixture] -fn logger() -> LoggerHandle { LoggerHandle::new() } +use wireframe_testing::{LoggerHandle, logger}; #[allow( unused_braces, diff --git a/wireframe_testing/Cargo.toml b/wireframe_testing/Cargo.toml index 062b4a05..5d70db84 100644 --- a/wireframe_testing/Cargo.toml +++ b/wireframe_testing/Cargo.toml @@ -9,3 +9,4 @@ wireframe = { path = ".." } bincode = "^2.0" bytes = "^1.0" rstest = "0.18.2" +logtest = "2" diff --git a/wireframe_testing/src/lib.rs b/wireframe_testing/src/lib.rs index 183bbf12..fee9ce07 100644 --- a/wireframe_testing/src/lib.rs +++ b/wireframe_testing/src/lib.rs @@ -14,6 +14,7 @@ //! ``` pub mod helpers; +pub mod logging; pub use helpers::{ TestSerializer, @@ -31,3 +32,5 @@ pub use helpers::{ run_app_with_frames_with_capacity, run_with_duplex_server, }; + +pub use logging::{logger, LoggerHandle}; diff --git a/wireframe_testing/src/logging.rs b/wireframe_testing/src/logging.rs new file mode 100644 index 00000000..0e42de81 --- /dev/null +++ b/wireframe_testing/src/logging.rs @@ -0,0 +1,41 @@ +use std::sync::{Mutex, MutexGuard, OnceLock}; + +use logtest::Logger; +use rstest::fixture; + +/// Handle to the global logger with exclusive access. +/// +/// This guard ensures tests do not interfere with each other's log capture by +/// serialising access to a [`logtest::Logger`]. +pub struct LoggerHandle { + guard: MutexGuard<'static, Logger>, +} + +impl LoggerHandle { + /// Acquire the global [`Logger`] instance. + pub fn new() -> Self { + static LOGGER: OnceLock> = OnceLock::new(); + + let logger = LOGGER.get_or_init(|| Mutex::new(Logger::start())); + let guard = logger.lock().expect("logger poisoned"); + + Self { guard } + } +} + +impl std::ops::Deref for LoggerHandle { + type Target = Logger; + + fn deref(&self) -> &Self::Target { &self.guard } +} + +impl std::ops::DerefMut for LoggerHandle { + fn deref_mut(&mut self) -> &mut Self::Target { &mut self.guard } +} + +#[allow( + unused_braces, + reason = "rustc false positive for single line rstest fixtures", +)] +#[fixture] +pub fn logger() -> LoggerHandle { LoggerHandle::new() } From cafdf38f8bc3f4e9f14a69f4ddd1d7d6c34cddc2 Mon Sep 17 00:00:00 2001 From: Leynos Date: Fri, 11 Jul 2025 00:02:51 +0100 Subject: [PATCH 7/9] Refactor push policy tests --- tests/push_policies.rs | 55 +++++++++++++++--------------------------- 1 file changed, 20 insertions(+), 35 deletions(-) diff --git a/tests/push_policies.rs b/tests/push_policies.rs index 249c14b6..a5e334ed 100644 --- a/tests/push_policies.rs +++ b/tests/push_policies.rs @@ -24,15 +24,23 @@ fn rt() -> Runtime { } #[rstest] +#[case::drop_if_full(PushPolicy::DropIfFull, false, "push queue full")] +#[case::warn_and_drop(PushPolicy::WarnAndDropIfFull, true, "push queue full")] #[serial(push_policies)] -fn drop_if_full_discards_frame(rt: Runtime, mut logger: LoggerHandle) { +fn push_policy_behaviour( + rt: Runtime, + mut logger: LoggerHandle, + #[case] policy: PushPolicy, + #[case] expect_warning: bool, + #[case] expected_msg: &str, +) { rt.block_on(async { while logger.pop().is_some() {} let (mut queues, handle) = PushQueues::bounded(1, 1); + handle.push_high_priority(1u8).await.unwrap(); - handle - .try_push(2u8, PushPriority::High, PushPolicy::DropIfFull) - .unwrap(); + handle.try_push(2u8, PushPriority::High, policy).unwrap(); + let (_, val) = queues.recv().await.unwrap(); assert_eq!(val, 1); assert!( @@ -41,41 +49,18 @@ fn drop_if_full_discards_frame(rt: Runtime, mut logger: LoggerHandle) { .is_err() ); + let mut found_warning = false; while let Some(record) = logger.pop() { - assert!( - !record.args().contains("push queue full"), - "unexpected log: {}", - record.args() - ); + if record.level() == log::Level::Warn && record.args().contains(expected_msg) { + found_warning = true; + } } - }); -} -#[rstest] -#[serial(push_policies)] -fn warn_and_drop_if_full_logs_warning(rt: Runtime, mut logger: LoggerHandle) { - rt.block_on(async { - while logger.pop().is_some() {} - let (mut queues, handle) = PushQueues::bounded(1, 1); - handle.push_low_priority(3u8).await.unwrap(); - handle - .try_push(4u8, PushPriority::Low, PushPolicy::WarnAndDropIfFull) - .unwrap(); - let (_, val) = queues.recv().await.unwrap(); - assert_eq!(val, 3); - assert!( - timeout(Duration::from_millis(20), queues.recv()) - .await - .is_err() - ); - let mut found = false; - while let Some(record) = logger.pop() { - if record.level() == log::Level::Warn && record.args().contains("push queue full") { - found = true; - break; - } + if expect_warning { + assert!(found_warning, "warning log not found"); + } else { + assert!(!found_warning, "unexpected warning log found"); } - assert!(found, "warning log not found"); }); } From e4900b76bbb0e5e57a12832338f316b528f933fd Mon Sep 17 00:00:00 2001 From: Leynos Date: Fri, 11 Jul 2025 00:29:43 +0100 Subject: [PATCH 8/9] Format test helpers --- wireframe_testing/src/lib.rs | 3 +-- wireframe_testing/src/logging.rs | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/wireframe_testing/src/lib.rs b/wireframe_testing/src/lib.rs index fee9ce07..11bf893d 100644 --- a/wireframe_testing/src/lib.rs +++ b/wireframe_testing/src/lib.rs @@ -32,5 +32,4 @@ pub use helpers::{ run_app_with_frames_with_capacity, run_with_duplex_server, }; - -pub use logging::{logger, LoggerHandle}; +pub use logging::{LoggerHandle, logger}; diff --git a/wireframe_testing/src/logging.rs b/wireframe_testing/src/logging.rs index 0e42de81..32eed354 100644 --- a/wireframe_testing/src/logging.rs +++ b/wireframe_testing/src/logging.rs @@ -35,7 +35,7 @@ impl std::ops::DerefMut for LoggerHandle { #[allow( unused_braces, - reason = "rustc false positive for single line rstest fixtures", + reason = "rustc false positive for single line rstest fixtures" )] #[fixture] pub fn logger() -> LoggerHandle { LoggerHandle::new() } From 62cb8f27265ba6ab4c9c53f63674bff38b13202f Mon Sep 17 00:00:00 2001 From: Leynos Date: Fri, 11 Jul 2025 00:31:17 +0100 Subject: [PATCH 9/9] Add docstring to wireframe_testing/src/logging.rs Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- wireframe_testing/src/logging.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/wireframe_testing/src/logging.rs b/wireframe_testing/src/logging.rs index 32eed354..2e1193d0 100644 --- a/wireframe_testing/src/logging.rs +++ b/wireframe_testing/src/logging.rs @@ -1,8 +1,13 @@ +//! Logging utilities for test infrastructure. +//! +//! This module provides a global, thread-safe logger handle for capturing and +//! inspecting log output during tests. The [`LoggerHandle`] ensures exclusive +//! access to prevent interference between concurrent tests. + use std::sync::{Mutex, MutexGuard, OnceLock}; use logtest::Logger; use rstest::fixture; - /// Handle to the global logger with exclusive access. /// /// This guard ensures tests do not interfere with each other's log capture by