Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
983 changes: 943 additions & 40 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ log = "0.4"
dashmap = "5"
leaky-bucket = "1.1"
tracing = { version = ">=0.1.40, <0.2.0", features = ["log", "log-always"] }
metrics = { version = "0.24", optional = true }
metrics-exporter-prometheus = { version = "0.17", optional = true, features = ["http-listener"] }

[dev-dependencies]
rstest = "0.18.2"
Expand All @@ -25,9 +27,14 @@ loom = "^0.7"
async-stream = "0.3"
tokio = { version = "1", default-features = false, features = ["test-util"] }
serial_test = "3.1"
metrics-util = "0.20"
metrics-exporter-prometheus = "0.17"

[features]
default = ["metrics"]
metrics = ["dep:metrics", "dep:metrics-exporter-prometheus"]
advanced-tests = []

[lints.clippy]
pedantic = "warn"

7 changes: 7 additions & 0 deletions docs/hardening-wireframe-a-guide-to-production-resilience.md
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,13 @@ A separate part of the application is then responsible for consuming from the
DLQ's receiver to inspect, log, and re-process these failed messages, ensuring
zero message loss even under transient high load.

## 5. Metrics and Observability

Operational visibility is critical in production. `wireframe` updates counters
and gauges through the optional `metrics` feature. See the documentation for
`wireframe::metrics` for a Prometheus recorder example. The exposed metrics
include processed frame counts, error totals and the active connection gauge.

By systematically implementing these hardening strategies, `wireframe` will
provide the guarantees of stability, security, and reliability expected of a
foundational piece of network infrastructure.
6 changes: 3 additions & 3 deletions docs/roadmap.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,12 @@ production environments.
- [x] Create a helper crate for test logging setup
(`wireframe_testing/src/logging.rs`).

- [ ] **Metrics & Observability:**
- [x] **Metrics & Observability:**

- [ ] Expose key operational metrics (e.g., active connections, messages per
- [x] Expose key operational metrics (e.g., active connections, messages per
second, error rates).

- [ ] Provide an integration guide for popular monitoring systems (e.g.,
- [x] Provide an integration guide for popular monitoring systems (e.g.,
Prometheus).

- [ ] **Advanced Error Handling:**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,32 @@ A production system is a black box without good instrumentation. `wireframe`

- `wireframe_reassembly_errors_total` (Counter)

The following entity–relationship diagram summarises how the core metrics
relate.

```mermaid
Comment thread
coderabbitai[bot] marked this conversation as resolved.
erDiagram
CONNECTIONS_ACTIVE ||--o{ FRAMES_PROCESSED : tracks
CONNECTIONS_ACTIVE {
string name
float value
}
FRAMES_PROCESSED {
string name
int value
string direction
}
ERRORS_TOTAL {
string name
int value
}
CONNECTIONS_ACTIVE ||--o{ ERRORS_TOTAL : tracks
```

Metrics are emitted using the optional `metrics` feature. See
[`wireframe::metrics`] for a Prometheus recorder example. All instrumentation
is gated behind this feature, so users can opt out if metrics are unnecessary.

```mermaid
sequenceDiagram
participant Client
Expand Down
1 change: 1 addition & 0 deletions examples/metadata_routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ impl FrameMetadata for HeaderSerializer {
struct Ping;

#[derive(bincode::Decode, bincode::Encode)]
#[expect(dead_code, reason = "used only in documentation example")]
struct Pong;

#[tokio::main]
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[toolchain]
channel = "nightly-2025-06-10"
channel = "nightly-2025-07-22"
components = ["rustfmt", "clippy"]
5 changes: 5 additions & 0 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,7 @@ where
{
// Parse the frame first; routing is handled below to avoid duplicating
// logic on the success path.
crate::metrics::inc_frames(crate::metrics::Direction::Inbound);
let (env, _) = match self.parse_envelope(frame) {
Ok(result) => {
*deser_failures = 0;
Expand All @@ -659,6 +660,7 @@ where
Err(e) => {
*deser_failures += 1;
tracing::warn!(error = ?e, "failed to deserialize message");
crate::metrics::inc_deser_errors();
if *deser_failures >= MAX_DESER_FAILURES {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
Expand All @@ -679,14 +681,17 @@ where
};
if let Err(e) = self.send_response(stream, &response).await {
tracing::warn!(error = %e, "failed to send response");
crate::metrics::inc_handler_errors();
}
}
Err(e) => {
tracing::warn!(id = env.id, error = ?e, "handler error");
crate::metrics::inc_handler_errors();
}
}
} else {
tracing::warn!("no handler for message id {}", env.id);
crate::metrics::inc_handler_errors();
}

Ok(())
Expand Down
9 changes: 8 additions & 1 deletion src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,16 @@ struct ActiveConnection;
impl ActiveConnection {
fn new() -> Self {
ACTIVE_CONNECTIONS.fetch_add(1, Ordering::Relaxed);
crate::metrics::inc_connections();
Self
}
}

impl Drop for ActiveConnection {
fn drop(&mut self) { ACTIVE_CONNECTIONS.fetch_sub(1, Ordering::Relaxed); }
fn drop(&mut self) {
ACTIVE_CONNECTIONS.fetch_sub(1, Ordering::Relaxed);
crate::metrics::dec_connections();
}
}

/// Return the current number of active connections.
Expand Down Expand Up @@ -322,6 +326,7 @@ where
let mut frame = frame;
self.hooks.before_send(&mut frame, &mut self.ctx);
out.push(frame);
crate::metrics::inc_frames(crate::metrics::Direction::Outbound);
}

/// Common logic for handling closed receivers.
Expand Down Expand Up @@ -422,12 +427,14 @@ where
Some(Ok(mut frame)) => {
self.hooks.before_send(&mut frame, &mut self.ctx);
out.push(frame);
crate::metrics::inc_frames(crate::metrics::Direction::Outbound);
}
Some(Err(WireframeError::Protocol(e))) => {
warn!(error = ?e, "protocol error");
self.hooks.handle_error(e, &mut self.ctx);
state.mark_closed();
self.hooks.on_command_end(&mut self.ctx);
crate::metrics::inc_handler_errors();
}
Some(Err(e)) => return Err(e),
None => {
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub mod extractor;
pub mod frame;
pub mod hooks;
pub mod message;
pub mod metrics;
pub mod middleware;
pub mod preamble;
pub mod push;
Expand All @@ -22,5 +23,6 @@ pub mod session;

pub use connection::ConnectionActor;
pub use hooks::{ConnectionContext, ProtocolHooks, WireframeProtocol};
pub use metrics::{CONNECTIONS_ACTIVE, Direction, ERRORS_TOTAL, FRAMES_PROCESSED};
pub use response::{FrameStream, Response, WireframeError};
pub use session::{ConnectionId, SessionRegistry};
81 changes: 81 additions & 0 deletions src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
//! Metric helpers for `wireframe`.
//!
//! This module defines metric names and helper functions wrapping the
//! [`metrics`](https://docs.rs/metrics) crate. All functions become no-ops
//! if the optional `metrics` Cargo feature is disabled.
//!
//! # Prometheus Integration
//!
//! ```
//! use metrics_exporter_prometheus::PrometheusBuilder;
//!
//! let handle = PrometheusBuilder::new()
//! .install_recorder()
//! .expect("recorder install");
//! println!("{}", handle.render());
//! ```

#[cfg(feature = "metrics")]
use metrics::{counter, gauge};

/// Name of the gauge tracking active connections.
pub const CONNECTIONS_ACTIVE: &str = "wireframe_connections_active";
/// Name of the counter tracking processed frames.
pub const FRAMES_PROCESSED: &str = "wireframe_frames_processed_total";
/// Name of the counter tracking error occurrences.
pub const ERRORS_TOTAL: &str = "wireframe_errors_total";

/// Direction of frame processing.
#[derive(Clone, Copy)]
pub enum Direction {
/// Inbound frames received from a client.
Inbound,
/// Outbound frames sent to a client.
Outbound,
}

impl Direction {
fn as_str(self) -> &'static str {
match self {
Direction::Inbound => "inbound",
Direction::Outbound => "outbound",
}
}
}

/// Increment the active connections gauge.
#[cfg(feature = "metrics")]
pub fn inc_connections() { gauge!(CONNECTIONS_ACTIVE).increment(1.0); }

#[cfg(not(feature = "metrics"))]
pub fn inc_connections() {}

/// Decrement the active connections gauge.
#[cfg(feature = "metrics")]
pub fn dec_connections() { gauge!(CONNECTIONS_ACTIVE).decrement(1.0); }

#[cfg(not(feature = "metrics"))]
pub fn dec_connections() {}

/// Record a processed frame for the given direction.
#[cfg(feature = "metrics")]
pub fn inc_frames(direction: Direction) {
counter!(FRAMES_PROCESSED, "direction" => direction.as_str()).increment(1);
}

#[cfg(not(feature = "metrics"))]
pub fn inc_frames(_direction: Direction) {}

/// Record a deserialization error.
#[cfg(feature = "metrics")]
pub fn inc_deser_errors() { counter!(ERRORS_TOTAL, "kind" => "deserialization").increment(1); }

#[cfg(not(feature = "metrics"))]
pub fn inc_deser_errors() {}

/// Record a handler error.
#[cfg(feature = "metrics")]
pub fn inc_handler_errors() { counter!(ERRORS_TOTAL, "kind" => "handler").increment(1); }

#[cfg(not(feature = "metrics"))]
pub fn inc_handler_errors() {}
2 changes: 2 additions & 0 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,9 @@ mod tests {
message: String,
}

/// Test helper preamble carrying no data.
#[derive(Debug, Clone, PartialEq, Encode, Decode)]
#[expect(dead_code, reason = "test helper for unused preamble type")]
struct EmptyPreamble;
Comment thread
coderabbitai[bot] marked this conversation as resolved.

#[fixture]
Expand Down
63 changes: 63 additions & 0 deletions tests/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
//! Tests for `wireframe` metrics helpers.
//!
//! These tests verify that counters and gauges update as expected using
//! `metrics_util::debugging::DebuggingRecorder`.
use metrics_util::debugging::{DebugValue, DebuggingRecorder, Snapshotter};

/// Creates a debugging recorder and snapshotter for metrics testing.
fn debugging_recorder_setup() -> (Snapshotter, DebuggingRecorder) {
let recorder = DebuggingRecorder::new();
let snapshotter = recorder.snapshotter();
(snapshotter, recorder)
}

#[test]
fn outbound_frame_metric_increments() {
let (snapshotter, recorder) = debugging_recorder_setup();
metrics::with_local_recorder(&recorder, || {
wireframe::metrics::inc_frames(wireframe::metrics::Direction::Outbound);
});

let metrics = snapshotter.snapshot().into_vec();
let found = metrics.iter().any(|(k, _, _, v)| {
k.key().name() == wireframe::metrics::FRAMES_PROCESSED
&& k.key()
.labels()
.any(|l| l.key() == "direction" && l.value() == "outbound")
&& matches!(v, DebugValue::Counter(c) if *c > 0)
});
assert!(found, "outbound frames metric not recorded");
}

#[test]
fn inbound_frame_metric_increments() {
let (snapshotter, recorder) = debugging_recorder_setup();
metrics::with_local_recorder(&recorder, || {
wireframe::metrics::inc_frames(wireframe::metrics::Direction::Inbound);
});
let metrics = snapshotter.snapshot().into_vec();
let found = metrics.iter().any(|(k, _, _, v)| {
k.key().name() == wireframe::metrics::FRAMES_PROCESSED
&& k.key()
.labels()
.any(|l| l.key() == "direction" && l.value() == "inbound")
&& matches!(v, DebugValue::Counter(c) if *c > 0)
});

assert!(found, "inbound frames metric not recorded");
}

#[test]
fn error_metric_increments() {
let (snapshotter, recorder) = debugging_recorder_setup();
metrics::with_local_recorder(&recorder, || {
wireframe::metrics::inc_deser_errors();
});

let metrics = snapshotter.snapshot().into_vec();
let found = metrics.iter().any(|(k, _, _, v)| {
k.key().name() == wireframe::metrics::ERRORS_TOTAL
&& matches!(v, DebugValue::Counter(c) if *c > 0)
});
assert!(found, "error metric not recorded");
}
1 change: 1 addition & 0 deletions wireframe_testing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ bytes = "^1.0"
rstest = "0.18.2"
logtest = "2"
log = "0.4"
metrics-util = "0.20"
Loading