diff --git a/Cargo.lock b/Cargo.lock
index 31bdc516..af9d432b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1290,6 +1290,7 @@ version = "0.1.0"
dependencies = [
"bincode",
"bytes",
+ "logtest",
"rstest",
"tokio",
"wireframe",
diff --git a/docs/asynchronous-outbound-messaging-design.md b/docs/asynchronous-outbound-messaging-design.md
index 6090d3fd..0833dd27 100644
--- a/docs/asynchronous-outbound-messaging-design.md
+++ b/docs/asynchronous-outbound-messaging-design.md
@@ -597,6 +597,30 @@ that would normally be dropped by the `PushPolicy::DropIfFull` or
part of the application is then responsible for consuming from the DLQ to
inspect, log, and potentially retry these failed messages.
+The following sequence diagram illustrates how frames are routed when a DLQ is
+configured:
+
+```mermaid
+sequenceDiagram
+ participant Producer
+ participant PushHandle
+ participant HighPrioQueue
+ participant DLQ as DeadLetterQueue
+ Producer->>PushHandle: try_push(frame, priority, DropIfFull)
+ PushHandle->>HighPrioQueue: try_send(frame)
+ alt Queue full
+ PushHandle->>DLQ: try_send(frame)
+ alt DLQ full
+ PushHandle->>PushHandle: log error (frame lost)
+ else DLQ has space
+ DLQ-->>PushHandle: frame accepted
+ end
+ else Queue has space
+ HighPrioQueue-->>PushHandle: frame accepted
+ end
+ PushHandle-->>Producer: Ok or logs
+```
+
### 5.3 Typed protocol errors
`WireframeError` distinguishes transport failures from protocol logic errors. A
diff --git a/docs/asynchronous-outbound-messaging-roadmap.md b/docs/asynchronous-outbound-messaging-roadmap.md
index c1df5585..52bf71a9 100644
--- a/docs/asynchronous-outbound-messaging-roadmap.md
+++ b/docs/asynchronous-outbound-messaging-roadmap.md
@@ -44,7 +44,7 @@ design documents.
([Design §5][design-errors]).
- [x] **Per-connection rate limiting** on pushes via a token bucket
([Resilience Guide §4.1][resilience-rate]).
-- [ ] **Optional Dead Letter Queue** for full queues
+- [x] **Optional Dead Letter Queue** for full queues
([Design §5.2][design-dlq]).
## 4. Observability and Quality Assurance
diff --git a/docs/hardening-wireframe-a-guide-to-production-resilience.md b/docs/hardening-wireframe-a-guide-to-production-resilience.md
index 23447919..e545e215 100644
--- a/docs/hardening-wireframe-a-guide-to-production-resilience.md
+++ b/docs/hardening-wireframe-a-guide-to-production-resilience.md
@@ -242,7 +242,8 @@ token-bucket algorithm is ideal.
use wireframe::push::PushQueues;
// Configure a connection to allow at most 100 pushes per second.
-let (queues, handle) = PushQueues::<u8>::bounded_with_rate(8, 8, Some(100));
+let (queues, handle) =
+ PushQueues::<u8>::bounded_with_rate(8, 8, Some(100)).unwrap();
// Passing `None` disables rate limiting entirely:
let (_unlimited, _handle) = PushQueues::<u8>::bounded_no_rate_limit(8, 8);
diff --git a/docs/rust-testing-with-rstest-fixtures.md b/docs/rust-testing-with-rstest-fixtures.md
index 0d2b90c3..4cf47893 100644
--- a/docs/rust-testing-with-rstest-fixtures.md
+++ b/docs/rust-testing-with-rstest-fixtures.md
@@ -540,10 +540,10 @@ When using `#[once]`, there are critical warnings:
the end of the test suite. This makes `#[once]` fixtures best suited for
truly passive data or resources whose cleanup is managed by the operating
system upon process exit.
-1. **Functional Limitations:** `#[once]` fixtures cannot be `async` functions
+2. **Functional Limitations:** `#[once]` fixtures cannot be `async` functions
and cannot be generic functions (neither with generic type parameters nor
using `impl Trait` in arguments or return types).
-1. **Attribute Propagation:** `rstest` macros currently drop `#[expect]`
+3. **Attribute Propagation:** `rstest` macros currently drop `#[expect]`
attributes. If you rely on lint expectations, use `#[allow]` instead to
silence false positives.
@@ -1168,13 +1168,13 @@ The following table summarizes key differences:
**Table 1:** `rstest` **vs. Standard Rust** `#[test]` **for Fixture Management
and Parameterization**
-| Feature | Standard #[test] Approach | rstest Approach |
+| Feature | Standard #[test] Approach | rstest Approach |
| ------------------------------------------------------------- | ------------------------------------------------------------- | -------------------------------------------------------------------------------- |
-| Fixture Injection | Manual calls to setup functions within each test. | Fixture name as argument in #[rstest] function; fixture defined with #[fixture]. |
-| Parameterized Tests (Specific Cases) | Loop inside one test, or multiple distinct #[test] functions. | #[case(...)] attributes on #[rstest] function. |
-| Parameterized Tests (Value Combinations) | Nested loops inside one test, or complex manual generation. | #[values(...)] attributes on arguments of #[rstest] function. |
-| Async Fixture Setup | Manual async block and .await calls inside test. | async fn fixtures, with #[future] and #[awt] for ergonomic `.await`ing. |
-| Reusing Parameter Sets | Manual duplication of cases or custom helper macros. | rstest_reuse crate with #[template] and #[apply] attributes. |
+| Fixture Injection | Manual calls to setup functions within each test. | Fixture name as argument in #[rstest] function; fixture defined with #[fixture]. |
+| Parameterized Tests (Specific Cases) | Loop inside one test, or multiple distinct #[test] functions. | #[case(...)] attributes on #[rstest] function. |
+| Parameterized Tests (Value Combinations) | Nested loops inside one test, or complex manual generation. | #[values(...)] attributes on arguments of #[rstest] function. |
+| Async Fixture Setup | Manual async block and .await calls inside test. | async fn fixtures, with #[future] and #[awt] for ergonomic `.await`ing. |
+| Reusing Parameter Sets | Manual duplication of cases or custom helper macros. | rstest_reuse crate with #[template] and #[apply] attributes. |
This comparison highlights how `rstest`'s attribute-based, declarative approach
streamlines common testing patterns, reducing manual effort and improving the
@@ -1335,20 +1335,20 @@ provided by `rstest`:
**Table 2: Key** `rstest` **Attributes Quick Reference**
-| Attribute | Core Purpose |
+| Attribute | Core Purpose |
| ---------------------------- | -------------------------------------------------------------------------------------------- |
-| #[rstest] | Marks a function as an rstest test; enables fixture injection and parameterization. |
-| #[fixture] | Defines a function that provides a test fixture (setup data or services). |
-| #[case(...)] | Defines a single parameterized test case with specific input values. |
-| #[values(...)] | Defines a list of values for an argument, generating tests for each value or combination. |
-| #[once] | Marks a fixture to be initialized only once and shared (as a static reference) across tests. |
-| #[future] | Simplifies async argument types by removing impl Future boilerplate. |
-| #[awt] | (Function or argument level) Automatically .awaits future arguments in async tests. |
-| #[from(original_name)] | Allows renaming an injected fixture argument in the test function. |
-| #[with(...)] | Overrides default arguments of a fixture for a specific test. |
-| #[default(...)] | Provides default values for arguments within a fixture function. |
-| #[timeout(...)] | Sets a timeout for an asynchronous test. |
-| #[files("glob_pattern",...)] | Injects file paths (or contents, with mode=) matching a glob pattern as test arguments. |
+| #[rstest] | Marks a function as an rstest test; enables fixture injection and parameterization. |
+| #[fixture] | Defines a function that provides a test fixture (setup data or services). |
+| #[case(...)] | Defines a single parameterized test case with specific input values. |
+| #[values(...)] | Defines a list of values for an argument, generating tests for each value or combination. |
+| #[once] | Marks a fixture to be initialized only once and shared (as a static reference) across tests. |
+| #[future] | Simplifies async argument types by removing impl Future boilerplate. |
+| #[awt] | (Function or argument level) Automatically .awaits future arguments in async tests. |
+| #[from(original_name)] | Allows renaming an injected fixture argument in the test function. |
+| #[with(...)] | Overrides default arguments of a fixture for a specific test. |
+| #[default(...)] | Provides default values for arguments within a fixture function. |
+| #[timeout(...)] | Sets a timeout for an asynchronous test. |
+| #[files("glob_pattern",...)] | Injects file paths (or contents, with mode=) matching a glob pattern as test arguments. |
By mastering `rstest`, Rust developers can significantly elevate the quality and
efficiency of their testing practices, leading to more reliable and maintainable
diff --git a/docs/wireframe-1-0-detailed-development-roadmap.md b/docs/wireframe-1-0-detailed-development-roadmap.md
index 71a053f5..66c1fce9 100644
--- a/docs/wireframe-1-0-detailed-development-roadmap.md
+++ b/docs/wireframe-1-0-detailed-development-roadmap.md
@@ -20,14 +20,16 @@ public consumption.
message processing. This phase establishes the internal architecture upon which
all public-facing features will be built.*
-| Item | Name | Details | Size | Depends |
-| ---- | ------------------------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ | ------- |
-| 1.1 | Core Response & Error Types | Define the new `Response` enum with `Single`, `Vec`, `Stream` and `Empty` variants. Implement the generic `WireframeError` enum to distinguish between I/O and protocol errors. | Small | - |
-| 1.2 | Priority Push Channels | Implement the internal dual-channel `mpsc` mechanism within the connection state to handle high-priority and low-priority pushed frames. | Medium | - |
-| 1.3 | Connection Actor Write Loop | Convert per-request workers into stateful connection actors. Implement a `select!(biased; ...)` loop that polls for shutdown signals, high/low priority pushes and the handler response stream in that strict order. | Large | #1.2 |
-| 1.4 | Initial FragmentStrategy Trait | Define the initial `FragmentStrategy` trait and the `FragmentMeta` struct. Focus on the core methods: `decode_header` and `encode_header`. | Medium | - |
-| 1.5 | Basic FragmentAdapter | Implement the `FragmentAdapter` as a `FrameProcessor`. Build the inbound reassembly logic for a single, non-multiplexed stream of fragments and the outbound logic for splitting a single large frame. | Large | #1.4 |
-| 1.6 | Internal Hook Plumbing | Add the invocation points for the protocol-specific hooks (`before_send`, `on_command_end`, etc.) within the connection actor, even if the public trait is not yet defined. | Small | #1.3 |
+| Item | Name | Details | Size | Depends |
+| ------ | ------------------------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ | ------- |
+| 1.1 | Core Response & Error Types | Define the new `Response` enum with `Single`, `Vec`, `Stream` and `Empty` variants. Implement the generic `WireframeError` enum to distinguish between I/O and protocol errors. | Small | - |
+| 1.2 | Priority Push Channels | Implement the internal dual-channel `mpsc` mechanism within the connection state to handle high-priority and low-priority pushed frames. | Medium | - |
+| 1.3 | Connection Actor Write Loop | Convert per-request workers into stateful connection actors. Implement a `select!(biased; ...)` loop that polls for shutdown signals, high/low priority pushes and the handler response stream in that strict order. | Large | #1.2 |
+| 1.4 | Initial FragmentStrategy Trait | Define the initial `FragmentStrategy` trait and the `FragmentMeta` struct. Focus on the core methods: `decode_header` and `encode_header`. | Medium | - |
+| 1.5 | Basic FragmentAdapter | Implement the `FragmentAdapter` as a `FrameProcessor`. Build the inbound reassembly logic for a single, non-multiplexed stream of fragments and the outbound logic for splitting a single large frame. | Large | #1.4 |
+| 1.6 | Internal Hook Plumbing | Add the invocation points for the protocol-specific hooks (`before_send`, `on_command_end`, etc.) within the connection actor, even if the public trait is not yet defined. | Small | #1.3 |
## Phase 2: Public APIs & Developer Ergonomics
@@ -35,14 +37,16 @@ all public-facing features will be built.*
and idiomatic API. This phase is about making the powerful new mechanics usable
and intuitive.*
-| Item | Name | Details | Size | Depends on |
-| ---- | --------------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ | ---------------- |
-| 2.1 | WireframeProtocol Trait & Builder | Define the cohesive `WireframeProtocol` trait to encapsulate all protocol-specific logic. Refactor the `WireframeApp` builder to use a fluent `.with_protocol(MyProtocol)` method instead of multiple closures. | Medium | #1.6 |
-| 2.2 | Public PushHandle API | Implement the public `PushHandle` struct with its `push`, `try_push` and policy-based `push_with_policy` methods. This handle interacts with the dual-channel system from #1.2. | Medium | #1.2 |
-| 2.3 | Leak-Proof SessionRegistry | Implement the `SessionRegistry` for discovering connection handles. This must use `dashmap` with `Weak` pointers to prevent memory leaks from terminated connections. | Medium | #2.2 |
-| 2.4 | async-stream Integration & Docs | Remove the proposed `FrameSink` from the design. Update the `Response::Stream` handling and document `async-stream` as the canonical way to create streams imperatively. | Small | #1.1 |
-| 2.5 | Initial Test Suite | Write unit and integration tests for the new public APIs. Verify that `Response::Vec` and `Response::Stream` work, and that `PushHandle` can successfully send frames that are received by a client. | Large | #2.1, #2.3, #2.4 |
-| 2.6 | Basic Fragmentation Example | Implement a simple `FragmentStrategy` (e.g. `LenFlag32K`) and an example showing the `FragmentAdapter` in use. This validates the adapter's basic functionality. | Medium | #1.5, #2.5 |
+| Item | Name | Details | Size | Depends on |
+| ------ | --------------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ | ---------------- |
+| 2.1 | WireframeProtocol Trait & Builder | Define the cohesive `WireframeProtocol` trait to encapsulate all protocol-specific logic. Refactor the `WireframeApp` builder to use a fluent `.with_protocol(MyProtocol)` method instead of multiple closures. | Medium | #1.6 |
+| 2.2 | Public PushHandle API | Implement the public `PushHandle` struct with its `push`, `try_push` and policy-based `push_with_policy` methods. This handle interacts with the dual-channel system from #1.2. | Medium | #1.2 |
+| 2.3 | Leak-Proof SessionRegistry | Implement the `SessionRegistry` for discovering connection handles. This must use `dashmap` with `Weak` pointers to prevent memory leaks from terminated connections. | Medium | #2.2 |
+| 2.4 | async-stream Integration & Docs | Remove the proposed `FrameSink` from the design. Update the `Response::Stream` handling and document `async-stream` as the canonical way to create streams imperatively. | Small | #1.1 |
+| 2.5 | Initial Test Suite | Write unit and integration tests for the new public APIs. Verify that `Response::Vec` and `Response::Stream` work, and that `PushHandle` can successfully send frames that are received by a client. | Large | #2.1, #2.3, #2.4 |
+| 2.6 | Basic Fragmentation Example | Implement a simple `FragmentStrategy` (e.g. `LenFlag32K`) and an example showing the `FragmentAdapter` in use. This validates the adapter's basic functionality. | Medium | #1.5, #2.5 |
## Phase 3: Production Hardening & Resilience
@@ -50,24 +54,28 @@ and intuitive.*
operation in a production environment. This phase moves the library from
"functional" to "resilient".*
-| Item | Name | Details | Size | Depends on |
-| ---- | ------------------------------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ | ------- |
-| 3.1 | Graceful Shutdown | Implement the server-wide graceful shutdown pattern. Use `tokio_util::sync::CancellationToken` for signalling and `tokio_util::task::TaskTracker` to ensure all connection actors terminate cleanly. | Large | #1.3 |
-| 3.2 | Re-assembly DoS Protection | Harden the `FragmentAdapter` by adding a non-optional, configurable timeout for partial message re-assembly and strictly enforcing the `max_message_size` limit to prevent memory exhaustion. | Medium | #1.5 |
-| 3.3 | Multiplexed Re-assembly | Enhance the `FragmentAdapter`'s inbound logic to support concurrent re-assembly of multiple messages. Use the `msg_id` from `FragmentMeta` as a key into a `dashmap::DashMap` of partial messages. | Large | #3.2 |
-| 3.4 | Per-Connection Rate Limiting | Integrate an asynchronous, token-bucket rate limiter into the `PushHandle`. The rate limit should be configurable on the `WireframeApp` builder and enforced on every push. | Medium | #2.2 |
-| 3.5 | Dead Letter Queue (DLQ) | Implement the optional Dead Letter Queue mechanism. Allow a user to provide a DLQ channel sender during app setup; failed pushes (due to a full queue) can be routed there instead of being dropped. | Medium | #2.2 |
-| 3.6 | Context-Aware FragmentStrategy | Enhance the `FragmentStrategy` trait. `max_fragment_payload` and `encode_header` should receive a reference to the logical `Frame` being processed, allowing for more dynamic fragmentation rules. | Small | #1.4 |
+| Item | Name | Details | Size | Depends on |
+| ------ | ------------------------------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ | ---------- |
+| 3.1 | Graceful Shutdown | Implement the server-wide graceful shutdown pattern. Use `tokio_util::sync::CancellationToken` for signalling and `tokio_util::task::TaskTracker` to ensure all connection actors terminate cleanly. | Large | #1.3 |
+| 3.2 | Re-assembly DoS Protection | Harden the `FragmentAdapter` by adding a non-optional, configurable timeout for partial message re-assembly and strictly enforcing the `max_message_size` limit to prevent memory exhaustion. | Medium | #1.5 |
+| 3.3 | Multiplexed Re-assembly | Enhance the `FragmentAdapter`'s inbound logic to support concurrent re-assembly of multiple messages. Use the `msg_id` from `FragmentMeta` as a key into a `dashmap::DashMap` of partial messages. | Large | #3.2 |
+| 3.4 | Per-Connection Rate Limiting | Integrate an asynchronous, token-bucket rate limiter into the `PushHandle`. The rate limit should be configurable on the `WireframeApp` builder and enforced on every push. | Medium | #2.2 |
+| 3.5 | Dead Letter Queue (DLQ) | Implement the optional Dead Letter Queue mechanism. Allow a user to provide a DLQ channel sender during app setup; failed pushes (due to a full queue) can be routed there instead of being dropped. | Medium | #2.2 |
+| 3.6 | Context-Aware FragmentStrategy | Enhance the `FragmentStrategy` trait. `max_fragment_payload` and `encode_header` should receive a reference to the logical `Frame` being processed, allowing for more dynamic fragmentation rules. | Small | #1.4 |
*Focus: Finalizing the library with comprehensive instrumentation, advanced
testing, and high-quality documentation to ensure it is stable, debuggable, and
ready for a 1.0 release.*
-| Item | Name | Details | Size | Depends on |
-| ---- | ------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ | ---------- |
-| 4.1 | Pervasive tracing instrumentation | Instrument the entire library with `tracing`. Add `span!` calls for connection and request lifecycles and detailed `event!` calls for key state transitions (e.g., back-pressure applied, frame dropped, connection terminated). | Large | All |
-| 4.2 | Advanced Testing: Concurrency & Logic | Implement the advanced test suite. Use `loom` to verify the concurrency correctness of the `select!` loop and `PushHandle`. Use `proptest` for stateful property-based testing of complex protocol interactions (e.g., fragmentation and streaming). | Large | #3.3, #3.5 |
-| 4.3 | Advanced Testing: Performance | Implement the criterion benchmark suite. Create micro-benchmarks for individual components (e.g., `PushHandle` contention) and macro-benchmarks for end-to-end throughput and latency. | Medium | All |
-| 4.4 | Comprehensive User Guides | Write the official documentation for the new features. Create separate guides for "Duplex Messaging & Pushes", "Streaming Responses", and "Message Fragmentation". Each guide must include runnable examples and explain the relevant concepts and APIs. | Large | All |
-| 4.5 | High-Quality Examples | Create at least two complete, high-quality examples demonstrating real-world use cases. These should include server-initiated MySQL packets (e.g., LOCAL INFILE and session-state trackers) and a push-driven protocol such as WebSocket heart-beats or MQTT broker fan-out. | Medium | All |
-| 4.6 | Changelog & 1.0 Release | Finalise the `CHANGELOG.md` with a comprehensive summary of all new features, enhancements, and breaking changes. Tag and publish the 1.0.0 release. | Small | All |
+| Item | Name | Details | Size | Depends on |
+| ------ | ------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ | ---------- |
+| 4.1 | Pervasive tracing instrumentation | Instrument the entire library with `tracing`. Add `span!` calls for connection and request lifecycles and detailed `event!` calls for key state transitions (e.g., back-pressure applied, frame dropped, connection terminated). | Large | All |
+| 4.2 | Advanced Testing: Concurrency & Logic | Implement the advanced test suite. Use `loom` to verify the concurrency correctness of the `select!` loop and `PushHandle`. Use `proptest` for stateful property-based testing of complex protocol interactions (e.g., fragmentation and streaming). | Large | #3.3, #3.5 |
+| 4.3 | Advanced Testing: Performance | Implement the criterion benchmark suite. Create micro-benchmarks for individual components (e.g., `PushHandle` contention) and macro-benchmarks for end-to-end throughput and latency. | Medium | All |
+| 4.4 | Comprehensive User Guides | Write the official documentation for the new features. Create separate guides for "Duplex Messaging & Pushes", "Streaming Responses", and "Message Fragmentation". Each guide must include runnable examples and explain the relevant concepts and APIs. | Large | All |
+| 4.5 | High-Quality Examples | Create at least two complete, high-quality examples demonstrating real-world use cases. These should include server-initiated MySQL packets (e.g., LOCAL INFILE and session-state trackers) and a push-driven protocol such as WebSocket heart-beats or MQTT broker fan-out. | Medium | All |
+| 4.6 | Changelog & 1.0 Release | Finalise the `CHANGELOG.md` with a comprehensive summary of all new features, enhancements, and breaking changes. Tag and publish the 1.0.0 release. | Small | All |
diff --git a/src/app.rs b/src/app.rs
index 0c31612e..4dfabffe 100644
--- a/src/app.rs
+++ b/src/app.rs
@@ -14,7 +14,10 @@ use std::{
};
use bytes::BytesMut;
-use tokio::io::{self, AsyncWrite, AsyncWriteExt};
+use tokio::{
+ io::{self, AsyncWrite, AsyncWriteExt},
+ sync::mpsc,
+};
use crate::{
frame::{FrameProcessor, LengthFormat, LengthPrefixedProcessor},
@@ -82,6 +85,7 @@ pub struct WireframeApp<
on_connect: Option>>,
on_disconnect: Option>>,
protocol: Option, ProtocolError = ()>>>,
+ push_dlq: Option<mpsc::Sender<Vec<u8>>>,
}
/// Alias for asynchronous route handlers.
@@ -238,6 +242,7 @@ where
on_connect: None,
on_disconnect: None,
protocol: None,
+ push_dlq: None,
}
}
}
@@ -357,6 +362,7 @@ where
on_connect: Some(Arc::new(move || Box::pin(f()))),
on_disconnect: None,
protocol: self.protocol,
+ push_dlq: self.push_dlq,
})
}
@@ -392,6 +398,25 @@ where
self
}
+ /// Configure a Dead Letter Queue for dropped push frames.
+ ///
+ /// ```rust,no_run
+ /// use tokio::sync::mpsc;
+ /// use wireframe::app::WireframeApp;
+ ///
+ /// # fn build() -> WireframeApp { WireframeApp::new().unwrap() }
+ /// # fn main() {
+ /// let (tx, _rx) = mpsc::channel(16);
+ /// let app = build().with_push_dlq(tx);
+ /// # let _ = app;
+ /// # }
+ /// ```
+ #[must_use]
+ pub fn with_push_dlq(mut self, dlq: mpsc::Sender<Vec<u8>>) -> Self {
+ self.push_dlq = Some(dlq);
+ self
+ }
+
/// Get a clone of the configured protocol, if any.
///
/// Returns `None` if no protocol was installed via [`with_protocol`](Self::with_protocol).
@@ -439,6 +464,7 @@ where
on_connect: self.on_connect,
on_disconnect: self.on_disconnect,
protocol: self.protocol,
+ push_dlq: self.push_dlq,
}
}
diff --git a/src/push.rs b/src/push.rs
index b7ce37d1..4bdbbbc3 100644
--- a/src/push.rs
+++ b/src/push.rs
@@ -65,10 +65,30 @@ impl std::fmt::Display for PushError {
impl std::error::Error for PushError {}
+/// Errors returned when creating push queues.
+#[derive(Debug)]
+pub enum PushConfigError {
+ /// The provided rate was zero or exceeded [`MAX_PUSH_RATE`].
+ InvalidRate(usize),
+}
+
+impl std::fmt::Display for PushConfigError {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ Self::InvalidRate(r) => {
+ write!(f, "invalid rate {r}; must be between 1 and {MAX_PUSH_RATE}")
+ }
+ }
+ }
+}
+
+impl std::error::Error for PushConfigError {}
+
pub(crate) struct PushHandleInner<F> {
high_prio_tx: mpsc::Sender<F>,
low_prio_tx: mpsc::Sender<F>,
limiter: Option,
+ dlq_tx: Option<mpsc::Sender<F>>,
}
/// Cloneable handle used by producers to push frames to a connection.
@@ -145,26 +165,49 @@ impl PushHandle {
self.push_with_priority(frame, PushPriority::Low).await
}
+ /// Send a frame to the configured dead letter queue if available.
+ fn route_to_dlq(&self, frame: F) {
+ if let Some(dlq) = &self.0.dlq_tx {
+ match dlq.try_send(frame) {
+ Ok(()) => {}
+ Err(mpsc::error::TrySendError::Full(_)) => {
+ log::error!("push queue and DLQ full; frame lost");
+ }
+ Err(mpsc::error::TrySendError::Closed(_)) => {
+ log::error!("DLQ closed; frame lost");
+ }
+ }
+ }
+ }
+
/// Attempt to push a frame with the given priority and policy.
///
/// # Errors
///
/// Returns [`PushError::QueueFull`] if the queue is full and the policy is
/// [`PushPolicy::ReturnErrorIfFull`]. Returns [`PushError::Closed`] if the
- /// receiving end has been dropped.
+ /// receiving end has been dropped. When [`PushPolicy::DropIfFull`] or
+ /// [`PushPolicy::WarnAndDropIfFull`] is used, a configured dead letter queue
+ /// receives the dropped frame.
///
/// # Examples
///
/// ```rust,no_run
+ /// use tokio::sync::mpsc;
/// use wireframe::push::{PushError, PushPolicy, PushPriority, PushQueues};
///
/// #[tokio::test]
/// async fn example() {
- /// let (mut queues, handle) = PushQueues::bounded(1, 1);
+ /// let (dlq_tx, mut dlq_rx) = mpsc::channel(1);
+ /// let (mut queues, handle) =
+ /// PushQueues::bounded_with_rate_dlq(1, 1, None, Some(dlq_tx)).unwrap();
/// handle.push_high_priority(1u8).await.unwrap();
///
- /// let result = handle.try_push(2u8, PushPriority::High, PushPolicy::ReturnErrorIfFull);
- /// assert!(matches!(result, Err(PushError::QueueFull)));
+ /// handle
+ /// .try_push(2u8, PushPriority::High, PushPolicy::DropIfFull)
+ /// .unwrap();
+ ///
+ /// assert_eq!(dlq_rx.recv().await.unwrap(), 2);
/// let _ = queues.recv().await;
/// }
/// ```
@@ -181,11 +224,13 @@ impl PushHandle {
match tx.try_send(frame) {
Ok(()) => Ok(()),
- Err(mpsc::error::TrySendError::Full(_f)) => match policy {
+ Err(mpsc::error::TrySendError::Full(f)) => match policy {
PushPolicy::ReturnErrorIfFull => Err(PushError::QueueFull),
- PushPolicy::DropIfFull => Ok(()),
- PushPolicy::WarnAndDropIfFull => {
- log::warn!("push queue full; dropping {priority:?} priority frame");
+ PushPolicy::DropIfFull | PushPolicy::WarnAndDropIfFull => {
+ if matches!(policy, PushPolicy::WarnAndDropIfFull) {
+ log::warn!("push queue full; dropping {priority:?} priority frame");
+ }
+ self.route_to_dlq(f);
Ok(())
}
},
@@ -221,9 +266,14 @@ impl PushQueues {
/// assert_eq!(frame, 7);
/// }
/// ```
+ ///
+ /// # Panics
+ ///
+ /// Panics if an internal invariant is violated. This should never occur.
#[must_use]
pub fn bounded(high_capacity: usize, low_capacity: usize) -> (Self, PushHandle<F>) {
- Self::bounded_with_rate(high_capacity, low_capacity, Some(DEFAULT_PUSH_RATE))
+ Self::bounded_with_rate_dlq(high_capacity, low_capacity, Some(DEFAULT_PUSH_RATE), None)
+ .expect("DEFAULT_PUSH_RATE is always valid")
}
/// Create queues with no rate limiting.
@@ -236,12 +286,16 @@ impl PushQueues {
/// let (_queues, handle) = PushQueues::<u8>::bounded_no_rate_limit(1, 1);
/// let _ = handle;
/// ```
+ ///
+ /// # Panics
+ ///
+ /// Panics if an internal invariant is violated. This should never occur.
#[must_use]
pub fn bounded_no_rate_limit(
high_capacity: usize,
low_capacity: usize,
) -> (Self, PushHandle<F>) {
- Self::bounded_with_rate(high_capacity, low_capacity, None)
+ Self::bounded_with_rate_dlq(high_capacity, low_capacity, None, None).unwrap()
}
/// Create queues with a custom rate limit in pushes per second.
@@ -250,9 +304,10 @@ impl PushQueues {
/// per second across all producers for the returned [`PushHandle`].
/// Pass `None` to disable rate limiting entirely.
///
- /// # Panics
+ /// # Errors
///
- /// Panics if `rate` is zero or greater than [`MAX_PUSH_RATE`].
+ /// Returns [`PushConfigError::InvalidRate`] if `rate` is zero or greater
+ /// than [`MAX_PUSH_RATE`].
///
/// # Examples
///
@@ -261,24 +316,62 @@ impl PushQueues {
///
/// #[tokio::main]
/// async fn main() {
- /// let (mut queues, handle) = PushQueues::<u8>::bounded_with_rate(1, 1, Some(10));
+ /// let (mut queues, handle) = PushQueues::<u8>::bounded_with_rate(1, 1, Some(10)).unwrap();
/// handle.push_low_priority(1u8).await.unwrap();
/// let (_prio, frame) = queues.recv().await.unwrap();
/// assert_eq!(frame, 1);
/// }
/// ```
- #[must_use]
pub fn bounded_with_rate(
high_capacity: usize,
low_capacity: usize,
rate: Option<u32>,
- ) -> (Self, PushHandle<F>) {
- if let Some(r) = rate {
- assert!(r > 0, "rate must be greater than zero, got {r}");
- assert!(
- r <= MAX_PUSH_RATE,
- "rate must be <= {MAX_PUSH_RATE}, got {r}"
- );
+ ) -> Result<(Self, PushHandle<F>), PushConfigError> {
+ Self::bounded_with_rate_dlq(high_capacity, low_capacity, rate, None)
+ }
+
+ /// Create queues with a custom rate limit and optional dead letter queue.
+ ///
+ /// Frames that would be dropped by [`try_push`](PushHandle::try_push) when
+ /// using [`PushPolicy::DropIfFull`] or [`PushPolicy::WarnAndDropIfFull`]
+ /// are routed to `dlq` if provided.
+ ///
+ /// # Errors
+ ///
+ /// Returns [`PushConfigError::InvalidRate`] if `rate` is zero or greater
+ /// than [`MAX_PUSH_RATE`].
+ ///
+ /// # Examples
+ ///
+ /// ```rust,no_run
+ /// use tokio::sync::mpsc;
+ /// use wireframe::push::{PushPolicy, PushPriority, PushQueues};
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (dlq_tx, mut dlq_rx) = mpsc::channel(1);
+ /// let (mut queues, handle) =
+ /// PushQueues::<u8>::bounded_with_rate_dlq(1, 1, None, Some(dlq_tx)).unwrap();
+ /// handle.push_high_priority(1u8).await.unwrap();
+ /// handle
+ /// .try_push(2u8, PushPriority::High, PushPolicy::DropIfFull)
+ /// .unwrap();
+ ///
+ /// let (_, val) = queues.recv().await.unwrap();
+ /// assert_eq!(val, 1);
+ /// assert_eq!(dlq_rx.recv().await.unwrap(), 2);
+ /// }
+ /// ```
+ pub fn bounded_with_rate_dlq(
+ high_capacity: usize,
+ low_capacity: usize,
+ rate: Option<u32>,
+ dlq: Option<mpsc::Sender<F>>,
+ ) -> Result<(Self, PushHandle<F>), PushConfigError> {
+ if let Some(r) = rate
+ && (r == 0 || r > MAX_PUSH_RATE)
+ {
+ return Err(PushConfigError::InvalidRate(r));
}
let (high_tx, high_rx) = mpsc::channel(high_capacity);
let (low_tx, low_rx) = mpsc::channel(low_capacity);
@@ -294,14 +387,15 @@ impl PushQueues {
high_prio_tx: high_tx,
low_prio_tx: low_tx,
limiter,
+ dlq_tx: dlq,
};
- (
+ Ok((
Self {
high_priority_rx: high_rx,
low_priority_rx: low_rx,
},
PushHandle(Arc::new(inner)),
- )
+ ))
}
/// Receive the next frame, preferring high priority frames when available.
diff --git a/tests/connection_actor.rs b/tests/connection_actor.rs
index 2ed5ab16..ec8438bb 100644
--- a/tests/connection_actor.rs
+++ b/tests/connection_actor.rs
@@ -354,49 +354,12 @@ async fn push_queue_exhaustion_backpressure() {
use std::sync::{
Arc,
- Mutex,
- OnceLock,
atomic::{AtomicUsize, Ordering},
};
-use logtest::Logger;
use serial_test::serial;
use wireframe::{ConnectionContext, ProtocolHooks};
-
-/// Handle to the global logger with exclusive access.
-struct LoggerHandle {
- guard: std::sync::MutexGuard<'static, Logger>,
-}
-
-impl LoggerHandle {
- fn new() -> Self {
- static LOGGER: OnceLock<Mutex<Logger>> = OnceLock::new();
-
- let logger = LOGGER.get_or_init(|| Mutex::new(Logger::start()));
- let guard = logger
- .lock()
- .expect("failed to acquire global logger lock; a previous test may still hold it");
-
- Self { guard }
- }
-}
-
-impl std::ops::Deref for LoggerHandle {
- type Target = Logger;
-
- fn deref(&self) -> &Self::Target { &self.guard }
-}
-
-impl std::ops::DerefMut for LoggerHandle {
- fn deref_mut(&mut self) -> &mut Self::Target { &mut self.guard }
-}
-
-#[allow(
- unused_braces,
- reason = "rustc false positive for single line rstest fixtures"
-)]
-#[fixture]
-fn logger() -> LoggerHandle { LoggerHandle::new() }
+use wireframe_testing::{LoggerHandle, logger};
#[rstest]
#[tokio::test]
diff --git a/tests/push.rs b/tests/push.rs
index 5a6fdb8f..05fe000e 100644
--- a/tests/push.rs
+++ b/tests/push.rs
@@ -56,7 +56,7 @@ async fn push_queues_error_on_closed() {
#[tokio::test]
async fn rate_limiter_blocks_when_exceeded(#[case] priority: PushPriority) {
time::pause();
- let (mut queues, handle) = PushQueues::bounded_with_rate(2, 2, Some(1));
+ let (mut queues, handle) = PushQueues::bounded_with_rate(2, 2, Some(1)).unwrap();
match priority {
PushPriority::High => handle.push_high_priority(1u8).await.unwrap(),
@@ -88,7 +88,7 @@ async fn rate_limiter_blocks_when_exceeded(#[case] priority: PushPriority) {
#[tokio::test]
async fn rate_limiter_allows_after_wait() {
time::pause();
- let (mut queues, handle) = PushQueues::bounded_with_rate(2, 2, Some(1));
+ let (mut queues, handle) = PushQueues::bounded_with_rate(2, 2, Some(1)).unwrap();
handle.push_high_priority(1u8).await.unwrap();
time::advance(Duration::from_secs(1)).await;
handle.push_high_priority(2u8).await.unwrap();
@@ -101,7 +101,7 @@ async fn rate_limiter_allows_after_wait() {
#[tokio::test]
async fn rate_limiter_shared_across_priorities() {
time::pause();
- let (mut queues, handle) = PushQueues::bounded_with_rate(2, 2, Some(1));
+ let (mut queues, handle) = PushQueues::bounded_with_rate(2, 2, Some(1)).unwrap();
handle.push_high_priority(1u8).await.unwrap();
let attempt = time::timeout(Duration::from_millis(10), handle.push_low_priority(2u8)).await;
@@ -134,7 +134,7 @@ async fn unlimited_queues_do_not_block() {
#[tokio::test]
async fn rate_limiter_allows_burst_within_capacity_and_blocks_excess() {
time::pause();
- let (mut queues, handle) = PushQueues::bounded_with_rate(4, 4, Some(3));
+ let (mut queues, handle) = PushQueues::bounded_with_rate(4, 4, Some(3)).unwrap();
for i in 0u8..3 {
handle.push_high_priority(i).await.unwrap();
diff --git a/tests/push_policies.rs b/tests/push_policies.rs
index 3b9114ad..a5e334ed 100644
--- a/tests/push_policies.rs
+++ b/tests/push_policies.rs
@@ -1,49 +1,15 @@
//! Tests for push queue policy behaviour.
-use std::sync::{Mutex, OnceLock};
-
-use logtest::Logger;
+use futures::future::BoxFuture;
use rstest::{fixture, rstest};
+use serial_test::serial;
use tokio::{
runtime::Runtime,
+ sync::mpsc,
time::{Duration, timeout},
};
use wireframe::push::{PushPolicy, PushPriority, PushQueues};
-
-/// Handle to the global logger with exclusive access.
-struct LoggerHandle {
- guard: std::sync::MutexGuard<'static, Logger>,
-}
-
-impl LoggerHandle {
- fn new() -> Self {
- static LOGGER: OnceLock<Mutex<Logger>> = OnceLock::new();
-
- let logger = LOGGER.get_or_init(|| Mutex::new(Logger::start()));
- let guard = logger
- .lock()
- .expect("failed to acquire global logger lock; a previous test may still hold it");
-
- Self { guard }
- }
-}
-
-impl std::ops::Deref for LoggerHandle {
- type Target = Logger;
-
- fn deref(&self) -> &Self::Target { &self.guard }
-}
-
-impl std::ops::DerefMut for LoggerHandle {
- fn deref_mut(&mut self) -> &mut Self::Target { &mut self.guard }
-}
-
-#[allow(
- unused_braces,
- reason = "rustc false positive for single line rstest fixtures"
-)]
-#[fixture]
-fn logger() -> LoggerHandle { LoggerHandle::new() }
+use wireframe_testing::{LoggerHandle, logger};
#[allow(
unused_braces,
@@ -58,13 +24,23 @@ fn rt() -> Runtime {
}
#[rstest]
-fn drop_if_full_discards_frame(rt: Runtime, mut logger: LoggerHandle) {
+#[case::drop_if_full(PushPolicy::DropIfFull, false, "push queue full")]
+#[case::warn_and_drop(PushPolicy::WarnAndDropIfFull, true, "push queue full")]
+#[serial(push_policies)]
+fn push_policy_behaviour(
+ rt: Runtime,
+ mut logger: LoggerHandle,
+ #[case] policy: PushPolicy,
+ #[case] expect_warning: bool,
+ #[case] expected_msg: &str,
+) {
rt.block_on(async {
+ while logger.pop().is_some() {}
let (mut queues, handle) = PushQueues::bounded(1, 1);
+
handle.push_high_priority(1u8).await.unwrap();
- handle
- .try_push(2u8, PushPriority::High, PushPolicy::DropIfFull)
- .unwrap();
+ handle.try_push(2u8, PushPriority::High, policy).unwrap();
+
let (_, val) = queues.recv().await.unwrap();
assert_eq!(val, 1);
assert!(
@@ -73,29 +49,95 @@ fn drop_if_full_discards_frame(rt: Runtime, mut logger: LoggerHandle) {
.is_err()
);
- assert!(logger.pop().is_none());
+ let mut found_warning = false;
+ while let Some(record) = logger.pop() {
+ if record.level() == log::Level::Warn && record.args().contains(expected_msg) {
+ found_warning = true;
+ }
+ }
+
+ if expect_warning {
+ assert!(found_warning, "warning log not found");
+ } else {
+ assert!(!found_warning, "unexpected warning log found");
+ }
});
}
#[rstest]
-fn warn_and_drop_if_full_logs_warning(rt: Runtime, mut logger: LoggerHandle) {
+fn dropped_frame_goes_to_dlq(rt: Runtime) {
rt.block_on(async {
- let (mut queues, handle) = PushQueues::bounded(1, 1);
- handle.push_low_priority(3u8).await.unwrap();
+ let (dlq_tx, mut dlq_rx) = mpsc::channel(1);
+ let (mut queues, handle) =
+ PushQueues::bounded_with_rate_dlq(1, 1, None, Some(dlq_tx)).unwrap();
+
+ handle.push_high_priority(1u8).await.unwrap();
handle
- .try_push(4u8, PushPriority::Low, PushPolicy::WarnAndDropIfFull)
+ .try_push(2u8, PushPriority::High, PushPolicy::DropIfFull)
.unwrap();
+
let (_, val) = queues.recv().await.unwrap();
- assert_eq!(val, 3);
- assert!(
- timeout(Duration::from_millis(20), queues.recv())
- .await
- .is_err()
- );
+ assert_eq!(val, 1);
+ assert_eq!(dlq_rx.recv().await.unwrap(), 2);
+ });
+}
+
+fn setup_dlq_full(tx: &mpsc::Sender<u8>, _rx: &mut Option<mpsc::Receiver<u8>>) {
+ tx.try_send(99).unwrap();
+}
+
+fn setup_dlq_closed(_: &mpsc::Sender<u8>, rx: &mut Option<mpsc::Receiver<u8>>) { drop(rx.take()); }
+
+fn assert_dlq_full(rx: &mut Option<mpsc::Receiver<u8>>) -> BoxFuture<'_, ()> {
+ Box::pin(async move {
+ let receiver = rx.as_mut().expect("receiver missing");
+ assert_eq!(receiver.recv().await.unwrap(), 99);
+ assert!(receiver.try_recv().is_err());
+ })
+}
+
+fn assert_dlq_closed(_: &mut Option<mpsc::Receiver<u8>>) -> BoxFuture<'_, ()> { Box::pin(async {}) }
+
+#[rstest]
+#[case::dlq_full(setup_dlq_full, PushPolicy::WarnAndDropIfFull, "DLQ", assert_dlq_full)]
+#[case::dlq_closed(setup_dlq_closed, PushPolicy::DropIfFull, "closed", assert_dlq_closed)]
+#[serial(push_policies)]
+fn dlq_error_scenarios<Setup, AssertFn>(
+ rt: Runtime,
+ mut logger: LoggerHandle,
+ #[case] setup: Setup,
+ #[case] policy: PushPolicy,
+ #[case] expected: &str,
+ #[case] assertion: AssertFn,
+) where
+ Setup: FnOnce(&mpsc::Sender<u8>, &mut Option<mpsc::Receiver<u8>>),
+ AssertFn: FnOnce(&mut Option<mpsc::Receiver<u8>>) -> BoxFuture<'_, ()>,
+{
+ rt.block_on(async {
+ while logger.pop().is_some() {}
+
+ let (dlq_tx, dlq_rx) = mpsc::channel(1);
+ let mut dlq_rx = Some(dlq_rx);
+ setup(&dlq_tx, &mut dlq_rx);
+ let (mut queues, handle) =
+ PushQueues::bounded_with_rate_dlq(1, 1, None, Some(dlq_tx)).unwrap();
+
+ handle.push_high_priority(1u8).await.unwrap();
+ handle.try_push(2u8, PushPriority::High, policy).unwrap();
+
+ let (_, val) = queues.recv().await.unwrap();
+ assert_eq!(val, 1);
+
+ assertion(&mut dlq_rx).await;
- let record = logger.pop().expect("expected warning");
- assert_eq!(record.level(), log::Level::Warn);
- assert!(record.args().contains("push queue full"));
- assert!(logger.pop().is_none());
+ let mut found_error = false;
+ while let Some(record) = logger.pop() {
+ if record.level() == log::Level::Error {
+ assert!(record.args().contains(expected));
+ found_error = true;
+ break;
+ }
+ }
+ assert!(found_error, "error log not found");
});
}
diff --git a/wireframe_testing/Cargo.toml b/wireframe_testing/Cargo.toml
index 062b4a05..5d70db84 100644
--- a/wireframe_testing/Cargo.toml
+++ b/wireframe_testing/Cargo.toml
@@ -9,3 +9,4 @@ wireframe = { path = ".." }
bincode = "^2.0"
bytes = "^1.0"
rstest = "0.18.2"
+logtest = "2"
diff --git a/wireframe_testing/src/lib.rs b/wireframe_testing/src/lib.rs
index 183bbf12..11bf893d 100644
--- a/wireframe_testing/src/lib.rs
+++ b/wireframe_testing/src/lib.rs
@@ -14,6 +14,7 @@
//! ```
pub mod helpers;
+pub mod logging;
pub use helpers::{
TestSerializer,
@@ -31,3 +32,4 @@ pub use helpers::{
run_app_with_frames_with_capacity,
run_with_duplex_server,
};
+pub use logging::{LoggerHandle, logger};
diff --git a/wireframe_testing/src/logging.rs b/wireframe_testing/src/logging.rs
new file mode 100644
index 00000000..2e1193d0
--- /dev/null
+++ b/wireframe_testing/src/logging.rs
@@ -0,0 +1,46 @@
+//! Logging utilities for test infrastructure.
+//!
+//! This module provides a global, thread-safe logger handle for capturing and
+//! inspecting log output during tests. The [`LoggerHandle`] ensures exclusive
+//! access to prevent interference between concurrent tests.
+
+use std::sync::{Mutex, MutexGuard, OnceLock};
+
+use logtest::Logger;
+use rstest::fixture;
+/// Handle to the global logger with exclusive access.
+///
+/// This guard ensures tests do not interfere with each other's log capture by
+/// serialising access to a [`logtest::Logger`].
+pub struct LoggerHandle {
+ guard: MutexGuard<'static, Logger>,
+}
+
+impl LoggerHandle {
+ /// Acquire the global [`Logger`] instance.
+ pub fn new() -> Self {
+ static LOGGER: OnceLock<Mutex<Logger>> = OnceLock::new();
+
+ let logger = LOGGER.get_or_init(|| Mutex::new(Logger::start()));
+ let guard = logger.lock().expect("logger poisoned");
+
+ Self { guard }
+ }
+}
+
+impl std::ops::Deref for LoggerHandle {
+ type Target = Logger;
+
+ fn deref(&self) -> &Self::Target { &self.guard }
+}
+
+impl std::ops::DerefMut for LoggerHandle {
+ fn deref_mut(&mut self) -> &mut Self::Target { &mut self.guard }
+}
+
+#[allow(
+ unused_braces,
+ reason = "rustc false positive for single line rstest fixtures"
+)]
+#[fixture]
+pub fn logger() -> LoggerHandle { LoggerHandle::new() }