diff --git a/Cargo.lock b/Cargo.lock index af9d432b..282d6fcc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -943,10 +943,23 @@ version = "0.1.41" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ + "log", "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tracing-core" version = "0.1.34" @@ -1281,6 +1294,7 @@ dependencies = [ "serial_test", "tokio", "tokio-util", + "tracing", "wireframe_testing", ] diff --git a/Cargo.toml b/Cargo.toml index 4d10ebb0..181e5c76 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ bytes = "1" log = "0.4" dashmap = "5" leaky-bucket = "1.1" +tracing = { version = ">=0.1.40, <0.2.0", features = ["log", "log-always"] } [dev-dependencies] rstest = "0.18.2" diff --git a/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md b/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md index 16a730db..e0866dc2 100644 --- a/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md +++ b/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md @@ -289,6 +289,28 @@ crate throughout its core. - `wireframe_reassembly_errors_total` (Counter) +```mermaid +sequenceDiagram + participant Client + participant Connection + participant Tracing + + Client->>Connection: Open connection + Connection->>Tracing: info!(wireframe_active_connections, "connection opened") + Note right of Connection: ACTIVE_CONNECTIONS += 1 + + Client->>Connection: Start run() + Connection->>Tracing: info_span!("connection_actor") + + alt Shutdown before start + Connection->>Tracing: info!("connection aborted before start") + Note right of Connection: ACTIVE_CONNECTIONS -= 1 + else Normal close + Connection->>Tracing: info!("connection closed") + Note right of Connection: ACTIVE_CONNECTIONS -= 1 + end +``` + ### D. A Comprehensive Quality Assurance Strategy To guarantee the correctness and stability of these new, complex features, @@ -320,11 +342,11 @@ components. -| Phase | Focus | Key Deliverables | -| 1. Foundational Mechanics | Implement the core, non-public machinery. | Internal actor loop with select!(biased!), dual-channel push plumbing, basic FragmentAdapter logic. | -| 2. Public APIs & Ergonomics | Expose functionality to users in a clean, idiomatic way. | Fluent WireframeApp builder, WireframeProtocol trait, enhanced Response enum, FragmentStrategy trait, SessionRegistry with Weak references. | -| 3. Production Hardening | Add features for resilience and security. | CancellationToken-based graceful shutdown, re-assembly timeouts, per-connection rate limiting, optional Dead Letter Queue. | -| 4. Maturity and Polish | Focus on observability, advanced testing, and documentation. | Full tracing instrumentation, criterion benchmarks, loom and proptest test suites, comprehensive user guides and API documentation. | +| Phase | Focus | Key Deliverables | +| 1. Foundational Mechanics | Implement the core, non-public machinery. | Internal actor loop with select!(biased!), dual-channel push plumbing, basic FragmentAdapter logic. | +| 2. Public APIs & Ergonomics | Expose functionality to users in a clean, idiomatic way. | Fluent WireframeApp builder, WireframeProtocol trait, enhanced Response enum, FragmentStrategy trait, SessionRegistry with Weak references. | +| 3. Production Hardening | Add features for resilience and security. | CancellationToken-based graceful shutdown, re-assembly timeouts, per-connection rate limiting, optional Dead Letter Queue. | +| 4. Maturity and Polish | Focus on observability, advanced testing, and documentation. | Full tracing instrumentation, criterion benchmarks, loom and proptest test suites, comprehensive user guides and API documentation. | diff --git a/docs/wireframe-1-0-detailed-development-roadmap.md b/docs/wireframe-1-0-detailed-development-roadmap.md index 66c1fce9..d849d480 100644 --- a/docs/wireframe-1-0-detailed-development-roadmap.md +++ b/docs/wireframe-1-0-detailed-development-roadmap.md @@ -5,9 +5,9 @@ This document provides a granular, task-oriented development roadmap for the features and capabilities outlined in "The road to Wireframe 1.0 - Feature-set, Philosophy and Capability Maturity". It is intended to guide implementation by -breaking the project down into four distinct phases, each with a set of -well-defined tasks. The dependencies between tasks are explicitly noted to -ensure a logical and stable development progression. +breaking the project down into four distinct phases, each with a set of well- +defined tasks. The dependencies between tasks are explicitly noted to ensure a +logical and stable development progression. At the time of writing, the push queue utilities and the connection actor with its biased `select!` write loop are implemented. The first phase therefore @@ -20,16 +20,14 @@ public consumption. message processing. This phase establishes the internal architecture upon which all public-facing features will be built.* -| Item | Name | Details | Size | Depends | -| ------ | ------------------------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ | ------- | -| ------ | ------- | -| ------ | ------- | -| 1.1 | Core Response & Error Types | Define the new `Response` enum with `Single`, `Vec`, `Stream` and `Empty` variants. Implement the generic `WireframeError` enum to distinguish between I/O and protocol errors. | Small | - | -| 1.2 | Priority Push Channels | Implement the internal dual-channel `mpsc` mechanism within the connection state to handle high-priority and low-priority pushed frames. | Medium | - | -| 1.3 | Connection Actor Write Loop | Convert per-request workers into stateful connection actors. Implement a `select!(biased; ...)` loop that polls for shutdown signals, high/low priority pushes and the handler response stream in that strict order. | Large | #1.2 | -| 1.4 | Initial FragmentStrategy Trait | Define the initial `FragmentStrategy` trait and the `FragmentMeta` struct. Focus on the core methods: `decode_header` and `encode_header`. | Medium | - | -| 1.5 | Basic FragmentAdapter | Implement the `FragmentAdapter` as a `FrameProcessor`. Build the inbound reassembly logic for a single, non-multiplexed stream of fragments and the outbound logic for splitting a single large frame. | Large | #1.4 | -| 1.6 | Internal Hook Plumbing | Add the invocation points for the protocol-specific hooks (`before_send`, `on_command_end`, etc.) within the connection actor, even if the public trait is not yet defined. | Small | #1.3 | +| Item | Name | Details | Size | Depends | +| ---- | ------------------------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ | ------- | +| 1.1 | Core Response & Error Types | Define the new `Response` enum with `Single`, `Vec`, `Stream` and `Empty` variants. Implement the generic `WireframeError` enum to distinguish between I/O and protocol errors. | Small | - | +| 1.2 | Priority Push Channels | Implement the internal dual-channel `mpsc` mechanism within the connection state to handle high-priority and low-priority pushed frames. | Medium | - | +| 1.3 | Connection Actor Write Loop | Convert per-request workers into stateful connection actors. Implement a `select!(biased; ...)` loop that polls for shutdown signals, high/low priority pushes and the handler response stream in that strict order. | Large | #1.2 | +| 1.4 | Initial FragmentStrategy Trait | Define the initial `FragmentStrategy` trait and the `FragmentMeta` struct. Focus on the core methods: `decode_header` and `encode_header`. | Medium | - | +| 1.5 | Basic FragmentAdapter | Implement the `FragmentAdapter` as a `FrameProcessor`. Build the inbound reassembly logic for a single, non-multiplexed stream of fragments and the outbound logic for splitting a single large frame. | Large | #1.4 | +| 1.6 | Internal Hook Plumbing | Add the invocation points for the protocol-specific hooks (`before_send`, `on_command_end`, etc.) within the connection actor, even if the public trait is not yet defined. | Small | #1.3 | ## Phase 2: Public APIs & Developer Ergonomics @@ -37,16 +35,14 @@ all public-facing features will be built.* and idiomatic API. This phase is about making the powerful new mechanics usable and intuitive.* -| Item | Name | Details | Size | Depends on | -| ------ | --------------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ | ---------------- | -| ------ | ---------------- | -| ------ | ---------------- | -| 2.1 | WireframeProtocol Trait & Builder | Define the cohesive `WireframeProtocol` trait to encapsulate all protocol-specific logic. Refactor the `WireframeApp` builder to use a fluent `.with_protocol(MyProtocol)` method instead of multiple closures. | Medium | #1.6 | -| 2.2 | Public PushHandle API | Implement the public `PushHandle` struct with its `push`, `try_push` and policy-based `push_with_policy` methods. This handle interacts with the dual-channel system from #1.2. | Medium | #1.2 | -| 2.3 | Leak-Proof SessionRegistry | Implement the `SessionRegistry` for discovering connection handles. This must use `dashmap` with `Weak` pointers to prevent memory leaks from terminated connections. | Medium | #2.2 | -| 2.4 | async-stream Integration & Docs | Remove the proposed `FrameSink` from the design. Update the `Response::Stream` handling and document `async-stream` as the canonical way to create streams imperatively. | Small | #1.1 | -| 2.5 | Initial Test Suite | Write unit and integration tests for the new public APIs. Verify that `Response::Vec` and `Response::Stream` work, and that `PushHandle` can successfully send frames that are received by a client. | Large | #2.1, #2.3, #2.4 | -| 2.6 | Basic Fragmentation Example | Implement a simple `FragmentStrategy` (e.g. `LenFlag32K`) and an example showing the `FragmentAdapter` in use. This validates the adapter's basic functionality. | Medium | #1.5, #2.5 | +| Item | Name | Details | Size | Depends on | +| ---- | --------------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ | ---------------- | +| 2.1 | WireframeProtocol Trait & Builder | Define the cohesive `WireframeProtocol` trait to encapsulate all protocol-specific logic. Refactor the `WireframeApp` builder to use a fluent `.with_protocol(MyProtocol)` method instead of multiple closures. | Medium | #1.6 | +| 2.2 | Public PushHandle API | Implement the public `PushHandle` struct with its `push`, `try_push` and policy-based `push_with_policy` methods. This handle interacts with the dual-channel system from #1.2. | Medium | #1.2 | +| 2.3 | Leak-Proof SessionRegistry | Implement the `SessionRegistry` for discovering connection handles. This must use `dashmap` with `Weak` pointers to prevent memory leaks from terminated connections. | Medium | #2.2 | +| 2.4 | async-stream Integration & Docs | Remove the proposed `FrameSink` from the design. Update the `Response::Stream` handling and document `async-stream` as the canonical way to create streams imperatively. | Small | #1.1 | +| 2.5 | Initial Test Suite | Write unit and integration tests for the new public APIs. Verify that `Response::Vec` and `Response::Stream` work, and that `PushHandle` can successfully send frames that are received by a client. | Large | #2.1, #2.3, #2.4 | +| 2.6 | Basic Fragmentation Example | Implement a simple `FragmentStrategy` (e.g. `LenFlag32K`) and an example showing the `FragmentAdapter` in use. This validates the adapter's basic functionality. | Medium | #1.5, #2.5 | ## Phase 3: Production Hardening & Resilience @@ -54,28 +50,24 @@ and intuitive.* operation in a production environment. This phase moves the library from "functional" to "resilient".* -| Item | Name | Details | Size | Depends on | -| ------ | ------------------------------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ | ---------- | -| ------ | ---------- | -| ------ | ------- | -| 3.1 | Graceful Shutdown | Implement the server-wide graceful shutdown pattern. Use `tokio_util::sync::CancellationToken` for signalling and `tokio_util::task::TaskTracker` to ensure all connection actors terminate cleanly. | Large | #1.3 | -| 3.2 | Re-assembly DoS Protection | Harden the `FragmentAdapter` by adding a non-optional, configurable timeout for partial message re-assembly and strictly enforcing the `max_message_size` limit to prevent memory exhaustion. | Medium | #1.5 | -| 3.3 | Multiplexed Re-assembly | Enhance the `FragmentAdapter`'s inbound logic to support concurrent re-assembly of multiple messages. Use the `msg_id` from `FragmentMeta` as a key into a `dashmap::DashMap` of partial messages. | Large | #3.2 | -| 3.4 | Per-Connection Rate Limiting | Integrate an asynchronous, token-bucket rate limiter into the `PushHandle`. The rate limit should be configurable on the `WireframeApp` builder and enforced on every push. | Medium | #2.2 | -| 3.5 | Dead Letter Queue (DLQ) | Implement the optional Dead Letter Queue mechanism. Allow a user to provide a DLQ channel sender during app setup; failed pushes (due to a full queue) can be routed there instead of being dropped. | Medium | #2.2 | -| 3.6 | Context-Aware FragmentStrategy | Enhance the `FragmentStrategy` trait. `max_fragment_payload` and `encode_header` should receive a reference to the logical `Frame` being processed, allowing for more dynamic fragmentation rules. | Small | #1.4 | +| Item | Name | Details | Size | Depends on | +| ---- | ------------------------------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ | ---------- | +| 3.1 | Graceful Shutdown | Implement the server-wide graceful shutdown pattern. Use `tokio_util::sync::CancellationToken` for signalling and `tokio_util::task::TaskTracker` to ensure all connection actors terminate cleanly. | Large | #1.3 | +| 3.2 | Re-assembly DoS Protection | Harden the `FragmentAdapter` by adding a non-optional, configurable timeout for partial message re-assembly and strictly enforcing the `max_message_size` limit to prevent memory exhaustion. | Medium | #1.5 | +| 3.3 | Multiplexed Re-assembly | Enhance the `FragmentAdapter`'s inbound logic to support concurrent re-assembly of multiple messages. Use the `msg_id` from `FragmentMeta` as a key into a `dashmap::DashMap` of partial messages. | Large | #3.2 | +| 3.4 | Per-Connection Rate Limiting | Integrate an asynchronous, token-bucket rate limiter into the `PushHandle`. The rate limit should be configurable on the `WireframeApp` builder and enforced on every push. | Medium | #2.2 | +| 3.5 | Dead Letter Queue (DLQ) | Implement the optional Dead Letter Queue mechanism. Allow a user to provide a DLQ channel sender during app setup; failed pushes (due to a full queue) can be routed there instead of being dropped. | Medium | #2.2 | +| 3.6 | Context-Aware FragmentStrategy | Enhance the `FragmentStrategy` trait. `max_fragment_payload` and `encode_header` should receive a reference to the logical `Frame` being processed, allowing for more dynamic fragmentation rules. | Small | #1.4 | *Focus: Finalizing the library with comprehensive instrumentation, advanced testing, and high-quality documentation to ensure it is stable, debuggable, and ready for a 1.0 release.* -| Item | Name | Details | Size | Depends on | -| ------ | ------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ | ---------- | -| ------ | ---------- | -| ------ | ---------- | -| 4.1 | Pervasive tracing instrumentation | Instrument the entire library with `tracing`. Add `span!` calls for connection and request lifecycles and detailed `event!` calls for key state transitions (e.g., back-pressure applied, frame dropped, connection terminated). | Large | All | -| 4.2 | Advanced Testing: Concurrency & Logic | Implement the advanced test suite. Use `loom` to verify the concurrency correctness of the `select!` loop and `PushHandle`. Use `proptest` for stateful property-based testing of complex protocol interactions (e.g., fragmentation and streaming). | Large | #3.3, #3.5 | -| 4.3 | Advanced Testing: Performance | Implement the criterion benchmark suite. Create micro-benchmarks for individual components (e.g., `PushHandle` contention) and macro-benchmarks for end-to-end throughput and latency. | Medium | All | -| 4.4 | Comprehensive User Guides | Write the official documentation for the new features. Create separate guides for "Duplex Messaging & Pushes", "Streaming Responses", and "Message Fragmentation". Each guide must include runnable examples and explain the relevant concepts and APIs. | Large | All | -| 4.5 | High-Quality Examples | Create at least two complete, high-quality examples demonstrating real-world use cases. These should include server-initiated MySQL packets (e.g., LOCAL INFILE and session-state trackers) and a push-driven protocol such as WebSocket heart-beats or MQTT broker fan-out. | Medium | All | -| 4.6 | Changelog & 1.0 Release | Finalise the `CHANGELOG.md` with a comprehensive summary of all new features, enhancements, and breaking changes. Tag and publish the 1.0.0 release. | Small | All | +| Item | Name | Details | Size | Depends on | +| ---- | ------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ | ---------- | +| 4.1 | Pervasive tracing instrumentation | Instrument the entire library with `tracing`. Add `span!` calls for connection and request lifecycles and detailed `event!` calls for key state transitions (e.g., back-pressure applied, frame dropped, connection terminated). | Large | All | +| 4.2 | Advanced Testing: Concurrency & Logic | Implement the advanced test suite. Use `loom` to verify the concurrency correctness of the `select!` loop and `PushHandle`. Use `proptest` for stateful property-based testing of complex protocol interactions (e.g., fragmentation and streaming). | Large | #3.3, #3.5 | +| 4.3 | Advanced Testing: Performance | Implement the criterion benchmark suite. Create micro-benchmarks for individual components (e.g., `PushHandle` contention) and macro-benchmarks for end-to-end throughput and latency. | Medium | All | +| 4.4 | Comprehensive User Guides | Write the official documentation for the new features. Create separate guides for "Duplex Messaging & Pushes", "Streaming Responses", and "Message Fragmentation". Each guide must include runnable examples and explain the relevant concepts and APIs. | Large | All | +| 4.5 | High-Quality Examples | Create at least two complete, high-quality examples demonstrating real-world use cases. These should include server-initiated MySQL packets (e.g., LOCAL INFILE and session-state trackers) and a push-driven protocol such as WebSocket heart-beats or MQTT broker fan-out. | Medium | All | +| 4.6 | Changelog & 1.0 Release | Finalise the `CHANGELOG.md` with a comprehensive summary of all new features, enhancements, and breaking changes. Tag and publish the 1.0.0 release. | Small | All | diff --git a/src/app.rs b/src/app.rs index 4dfabffe..063d3b39 100644 --- a/src/app.rs +++ b/src/app.rs @@ -531,7 +531,7 @@ where let routes = self.build_chains().await; if let Err(e) = self.process_stream(&mut stream, &routes).await { - log::warn!("connection terminated with error: {e}"); + tracing::warn!(error = ?e, "connection terminated with error"); } if let (Some(teardown), Some(state)) = (&self.on_disconnect, state) { @@ -658,7 +658,7 @@ where } Err(e) => { *deser_failures += 1; - log::warn!("failed to deserialize message: {e}"); + tracing::warn!(error = ?e, "failed to deserialize message"); if *deser_failures >= MAX_DESER_FAILURES { return Err(io::Error::new( io::ErrorKind::InvalidData, @@ -678,15 +678,15 @@ where msg: resp.into_inner(), }; if let Err(e) = self.send_response(stream, &response).await { - log::warn!("failed to send response: {e}"); + tracing::warn!(error = %e, "failed to send response"); } } Err(e) => { - log::warn!("handler error for id {}: {e}", env.id); + tracing::warn!(id = env.id, error = ?e, "handler error"); } } } else { - log::warn!("no handler for message id {}", env.id); + tracing::warn!("no handler for message id {}", env.id); } Ok(()) diff --git a/src/connection.rs b/src/connection.rs index 3289a6f3..a6bdab70 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -5,7 +5,11 @@ //! `biased` keyword ensures high-priority messages are processed before //! low-priority ones, with streamed responses handled last. -use std::future::Future; +use std::{ + future::Future, + net::SocketAddr, + sync::atomic::{AtomicU64, Ordering}, +}; use futures::StreamExt; use tokio::{ @@ -13,11 +17,35 @@ use tokio::{ time::{Duration, Instant}, }; use tokio_util::sync::CancellationToken; +use tracing::{info, info_span, warn}; + +/// Global gauge tracking active connections. +static ACTIVE_CONNECTIONS: AtomicU64 = AtomicU64::new(0); + +/// RAII guard incrementing [`ACTIVE_CONNECTIONS`] on creation and +/// decrementing it on drop. +struct ActiveConnection; + +impl ActiveConnection { + fn new() -> Self { + ACTIVE_CONNECTIONS.fetch_add(1, Ordering::Relaxed); + Self + } +} + +impl Drop for ActiveConnection { + fn drop(&mut self) { ACTIVE_CONNECTIONS.fetch_sub(1, Ordering::Relaxed); } +} + +/// Return the current number of active connections. +#[must_use] +pub fn active_connection_count() -> u64 { ACTIVE_CONNECTIONS.load(Ordering::Relaxed) } use crate::{ hooks::{ConnectionContext, ProtocolHooks}, push::{FrameLike, PushHandle, PushQueues}, response::{FrameStream, WireframeError}, + session::ConnectionId, }; /// Events returned by [`next_event`]. @@ -75,11 +103,14 @@ pub struct ConnectionActor { low_rx: Option>, response: Option>, // current streaming response shutdown: CancellationToken, + counter: Option, hooks: ProtocolHooks, ctx: ConnectionContext, fairness: FairnessConfig, high_counter: usize, high_start: Option, + connection_id: Option, + peer_addr: Option, } impl ConnectionActor @@ -123,21 +154,33 @@ where handle: PushHandle, response: Option>, shutdown: CancellationToken, - mut hooks: ProtocolHooks, + hooks: ProtocolHooks, ) -> Self { - let mut ctx = ConnectionContext; - hooks.on_connection_setup(handle, &mut ctx); - Self { + let ctx = ConnectionContext; + let counter = ActiveConnection::new(); + let mut actor = Self { high_rx: Some(queues.high_priority_rx), low_rx: Some(queues.low_priority_rx), response, shutdown, + counter: Some(counter), hooks, ctx, fairness: FairnessConfig::default(), high_counter: 0, high_start: None, - } + connection_id: None, + peer_addr: None, + }; + let current = ACTIVE_CONNECTIONS.load(Ordering::Relaxed); + info!( + wireframe_active_connections = current, + id = ?actor.connection_id, + peer = ?actor.peer_addr, + "connection opened" + ); + actor.hooks.on_connection_setup(handle, &mut actor.ctx); + actor } /// Replace the fairness configuration. @@ -158,10 +201,18 @@ where /// /// Returns a [`WireframeError`] if the response stream yields an I/O error. pub async fn run(&mut self, out: &mut Vec) -> Result<(), WireframeError> { + let span = info_span!("connection_actor"); + let _enter = span.enter(); // If cancellation has already been requested, exit immediately. Nothing // will be drained and any streaming response is abandoned. This mirrors // a hard shutdown and is required for the tests. if self.shutdown.is_cancelled() { + info!( + id = ?self.connection_id, + peer = ?self.peer_addr, + "connection aborted before start" + ); + let _ = self.counter.take(); return Ok(()); } @@ -170,7 +221,12 @@ where while !state.is_done() { self.poll_sources(&mut state, out).await?; } - + info!( + id = ?self.connection_id, + peer = ?self.peer_addr, + "connection closed" + ); + let _ = self.counter.take(); Ok(()) } @@ -368,7 +424,7 @@ where out.push(frame); } Some(Err(WireframeError::Protocol(e))) => { - log::warn!("protocol error: {e:?}"); + warn!(error = ?e, "protocol error"); self.hooks.handle_error(e, &mut self.ctx); state.mark_closed(); self.hooks.on_command_end(&mut self.ctx); diff --git a/src/push.rs b/src/push.rs index 4bdbbbc3..01103745 100644 --- a/src/push.rs +++ b/src/push.rs @@ -13,6 +13,7 @@ use std::{ use leaky_bucket::RateLimiter; use tokio::sync::mpsc; +use tracing::{debug, error, warn}; /// Messages can be sent through a [`PushHandle`]. /// @@ -110,7 +111,9 @@ impl PushHandle { PushPriority::High => &self.0.high_prio_tx, PushPriority::Low => &self.0.low_prio_tx, }; - tx.send(frame).await.map_err(|_| PushError::Closed) + tx.send(frame).await.map_err(|_| PushError::Closed)?; + debug!(?priority, "frame pushed"); + Ok(()) } /// Push a high-priority frame subject to rate limiting. /// @@ -166,15 +169,18 @@ impl PushHandle { } /// Send a frame to the configured dead letter queue if available. - fn route_to_dlq(&self, frame: F) { + fn route_to_dlq(&self, frame: F) + where + F: std::fmt::Debug, + { if let Some(dlq) = &self.0.dlq_tx { match dlq.try_send(frame) { Ok(()) => {} - Err(mpsc::error::TrySendError::Full(_)) => { - log::error!("push queue and DLQ full; frame lost"); + Err(mpsc::error::TrySendError::Full(f)) => { + error!(?f, "push queue and DLQ full; frame lost"); } - Err(mpsc::error::TrySendError::Closed(_)) => { - log::error!("DLQ closed; frame lost"); + Err(mpsc::error::TrySendError::Closed(f)) => { + error!(?f, "DLQ closed; frame lost"); } } } @@ -216,7 +222,10 @@ impl PushHandle { frame: F, priority: PushPriority, policy: PushPolicy, - ) -> Result<(), PushError> { + ) -> Result<(), PushError> + where + F: std::fmt::Debug, + { let tx = match priority { PushPriority::High => &self.0.high_prio_tx, PushPriority::Low => &self.0.low_prio_tx, @@ -228,7 +237,12 @@ impl PushHandle { PushPolicy::ReturnErrorIfFull => Err(PushError::QueueFull), PushPolicy::DropIfFull | PushPolicy::WarnAndDropIfFull => { if matches!(policy, PushPolicy::WarnAndDropIfFull) { - log::warn!("push queue full; dropping {priority:?} priority frame"); + warn!( + ?priority, + ?policy, + dlq = self.0.dlq_tx.is_some(), + "push queue full" + ); } self.route_to_dlq(f); Ok(()) diff --git a/tests/connection_actor.rs b/tests/connection_actor.rs index 34c795b8..c7a8bd64 100644 --- a/tests/connection_actor.rs +++ b/tests/connection_actor.rs @@ -39,6 +39,7 @@ fn empty_stream() -> Option> { None } #[rstest] #[tokio::test] +#[serial] async fn strict_priority_order( queues: (PushQueues, wireframe::push::PushHandle), shutdown_token: CancellationToken, @@ -57,6 +58,7 @@ async fn strict_priority_order( #[rstest] #[tokio::test] +#[serial] async fn fairness_yields_low_after_burst( queues: (PushQueues, wireframe::push::PushHandle), shutdown_token: CancellationToken, @@ -122,6 +124,7 @@ async fn queue_frames( // Ensure the helper correctly handles edge cases without queued frames. #[rstest] #[tokio::test] +#[serial] async fn queue_frames_empty_input(queues: (PushQueues, wireframe::push::PushHandle)) { let (_, handle) = queues; let priorities: &[Priority] = &[]; @@ -146,6 +149,7 @@ async fn queue_frames_empty_input(queues: (PushQueues, wireframe::push::Push Priority::High, ])] #[tokio::test] +#[serial] async fn processes_all_priorities_in_order( #[case] order: Vec, queues: (PushQueues, wireframe::push::PushHandle), @@ -170,6 +174,7 @@ async fn processes_all_priorities_in_order( #[rstest] #[tokio::test] +#[serial] async fn fairness_yields_low_with_time_slice( queues: (PushQueues, wireframe::push::PushHandle), shutdown_token: CancellationToken, @@ -214,6 +219,7 @@ async fn fairness_yields_low_with_time_slice( #[rstest] #[tokio::test] +#[serial] async fn shutdown_signal_precedence( queues: (PushQueues, wireframe::push::PushHandle), shutdown_token: CancellationToken, @@ -230,6 +236,7 @@ async fn shutdown_signal_precedence( #[rstest] #[tokio::test] +#[serial] async fn complete_draining_of_sources( queues: (PushQueues, wireframe::push::PushHandle), shutdown_token: CancellationToken, @@ -253,6 +260,7 @@ enum TestError { #[rstest] #[tokio::test] +#[serial] async fn error_propagation_from_stream( queues: (PushQueues, wireframe::push::PushHandle), shutdown_token: CancellationToken, @@ -301,13 +309,19 @@ async fn protocol_error_logs_warning( let mut out = Vec::new(); actor.run(&mut out).await.unwrap(); assert!(out.is_empty()); - let record = logger.pop().expect("expected warning"); - assert_eq!(record.level(), log::Level::Warn); - assert!(record.args().contains("protocol error")); + let mut found = false; + while let Some(record) = logger.pop() { + if record.level() == log::Level::Warn && record.args().contains("protocol error") { + found = true; + break; + } + } + assert!(found, "warning log not found"); } #[rstest] #[tokio::test] +#[serial] async fn io_error_terminates_connection( queues: (PushQueues, wireframe::push::PushHandle), shutdown_token: CancellationToken, @@ -327,6 +341,7 @@ async fn io_error_terminates_connection( #[rstest] #[tokio::test] +#[serial] async fn interleaved_shutdown_during_stream( queues: (PushQueues, wireframe::push::PushHandle), shutdown_token: CancellationToken, @@ -355,6 +370,7 @@ async fn interleaved_shutdown_during_stream( #[rstest] #[tokio::test] +#[serial] async fn push_queue_exhaustion_backpressure() { let (mut queues, handle) = PushQueues::bounded(1, 1); handle.push_high_priority(1).await.unwrap(); @@ -377,6 +393,7 @@ use wireframe_testing::{LoggerHandle, logger}; #[rstest] #[tokio::test] +#[serial] async fn before_send_hook_modifies_frames( queues: (PushQueues, wireframe::push::PushHandle), shutdown_token: CancellationToken, @@ -404,6 +421,7 @@ async fn before_send_hook_modifies_frames( #[rstest] #[tokio::test] +#[serial] async fn on_command_end_hook_runs( queues: (PushQueues, wireframe::push::PushHandle), shutdown_token: CancellationToken, @@ -434,6 +452,7 @@ async fn on_command_end_hook_runs( #[rstest] #[tokio::test] +#[serial] async fn graceful_shutdown_waits_for_tasks() { let tracker = TaskTracker::new(); let token = CancellationToken::new(); @@ -459,3 +478,45 @@ async fn graceful_shutdown_waits_for_tasks() { .is_ok() ); } + +#[rstest] +#[tokio::test] +#[serial] +async fn connection_count_decrements_on_abort( + queues: (PushQueues, wireframe::push::PushHandle), +) { + let (queues, handle) = queues; + let token = CancellationToken::new(); + token.cancel(); + + let before = wireframe::connection::active_connection_count(); + let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, handle, None, token); + let during = wireframe::connection::active_connection_count(); + assert_eq!(during, before + 1); + + let mut out = Vec::new(); + actor.run(&mut out).await.unwrap(); + let after = wireframe::connection::active_connection_count(); + assert_eq!(during - after, 1); +} + +#[rstest] +#[tokio::test] +#[serial] +async fn connection_count_decrements_on_close( + queues: (PushQueues, wireframe::push::PushHandle), + shutdown_token: CancellationToken, +) { + let (queues, handle) = queues; + let before = wireframe::connection::active_connection_count(); + let stream = stream::iter(vec![Ok(1u8)]); + let mut actor: ConnectionActor<_, ()> = + ConnectionActor::new(queues, handle, Some(Box::pin(stream)), shutdown_token); + let during = wireframe::connection::active_connection_count(); + assert_eq!(during, before + 1); + + let mut out = Vec::new(); + actor.run(&mut out).await.unwrap(); + let after = wireframe::connection::active_connection_count(); + assert_eq!(during - after, 1); +}