Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ bytes = "1"
log = "0.4"
dashmap = "5"
leaky-bucket = "1.1"
tracing = { version = ">=0.1.40, <0.2.0", features = ["log", "log-always"] }

[dev-dependencies]
rstest = "0.18.2"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,28 @@ crate throughout its core.

- `wireframe_reassembly_errors_total` (Counter)

```mermaid
sequenceDiagram
participant Client
participant Connection
participant Tracing

Client->>Connection: Open connection
Connection->>Tracing: info!(wireframe_active_connections, "connection opened")
Note right of Connection: ACTIVE_CONNECTIONS += 1

Client->>Connection: Start run()
Connection->>Tracing: info_span!("connection_actor")

alt Shutdown before start
Connection->>Tracing: info!("connection aborted before start")
Note right of Connection: ACTIVE_CONNECTIONS -= 1
else Normal close
Connection->>Tracing: info!("connection closed")
Note right of Connection: ACTIVE_CONNECTIONS -= 1
end
```

### D. A Comprehensive Quality Assurance Strategy

To guarantee the correctness and stability of these new, complex features,
Expand Down Expand Up @@ -320,11 +342,11 @@ components.

<!-- markdownlint-disable MD013 -->

| Phase | Focus | Key Deliverables |
| 1. Foundational Mechanics | Implement the core, non-public machinery. | Internal actor loop with select!(biased!), dual-channel push plumbing, basic FragmentAdapter logic. |
| 2. Public APIs & Ergonomics | Expose functionality to users in a clean, idiomatic way. | Fluent WireframeApp builder, WireframeProtocol trait, enhanced Response enum, FragmentStrategy trait, SessionRegistry with Weak references. |
| 3. Production Hardening | Add features for resilience and security. | CancellationToken-based graceful shutdown, re-assembly timeouts, per-connection rate limiting, optional Dead Letter Queue. |
| 4. Maturity and Polish | Focus on observability, advanced testing, and documentation. | Full tracing instrumentation, criterion benchmarks, loom and proptest test suites, comprehensive user guides and API documentation. |
| Phase | Focus | Key Deliverables |
| 1. Foundational Mechanics | Implement the core, non-public machinery. | Internal actor loop with select!(biased!), dual-channel push plumbing, basic FragmentAdapter logic. |
| 2. Public APIs & Ergonomics | Expose functionality to users in a clean, idiomatic way. | Fluent WireframeApp builder, WireframeProtocol trait, enhanced Response enum, FragmentStrategy trait, SessionRegistry with Weak references. |
| 3. Production Hardening | Add features for resilience and security. | CancellationToken-based graceful shutdown, re-assembly timeouts, per-connection rate limiting, optional Dead Letter Queue. |
| 4. Maturity and Polish | Focus on observability, advanced testing, and documentation. | Full tracing instrumentation, criterion benchmarks, loom and proptest test suites, comprehensive user guides and API documentation. |

<!-- markdownlint-enable MD013 -->

Expand Down
78 changes: 35 additions & 43 deletions docs/wireframe-1-0-detailed-development-roadmap.md

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ where
let routes = self.build_chains().await;

if let Err(e) = self.process_stream(&mut stream, &routes).await {
log::warn!("connection terminated with error: {e}");
tracing::warn!(error = ?e, "connection terminated with error");
}

if let (Some(teardown), Some(state)) = (&self.on_disconnect, state) {
Expand Down Expand Up @@ -658,7 +658,7 @@ where
}
Err(e) => {
*deser_failures += 1;
log::warn!("failed to deserialize message: {e}");
tracing::warn!(error = ?e, "failed to deserialize message");
if *deser_failures >= MAX_DESER_FAILURES {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
Expand All @@ -678,15 +678,15 @@ where
msg: resp.into_inner(),
};
if let Err(e) = self.send_response(stream, &response).await {
log::warn!("failed to send response: {e}");
tracing::warn!(error = %e, "failed to send response");
}
}
Err(e) => {
log::warn!("handler error for id {}: {e}", env.id);
tracing::warn!(id = env.id, error = ?e, "handler error");
}
}
} else {
log::warn!("no handler for message id {}", env.id);
tracing::warn!("no handler for message id {}", env.id);
}

Ok(())
Expand Down
72 changes: 64 additions & 8 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,47 @@
//! `biased` keyword ensures high-priority messages are processed before
//! low-priority ones, with streamed responses handled last.

use std::future::Future;
use std::{
future::Future,
net::SocketAddr,
sync::atomic::{AtomicU64, Ordering},
};

use futures::StreamExt;
use tokio::{
sync::mpsc,
time::{Duration, Instant},
};
use tokio_util::sync::CancellationToken;
use tracing::{info, info_span, warn};

/// Process-wide gauge recording how many connections are currently open.
static ACTIVE_CONNECTIONS: AtomicU64 = AtomicU64::new(0);

/// RAII guard that bumps [`ACTIVE_CONNECTIONS`] when constructed and
/// restores it when dropped, so the gauge stays balanced even on early
/// returns or panics in the owning scope.
struct ActiveConnection;

impl ActiveConnection {
    /// Record one newly opened connection and return the guard that will
    /// undo the increment on drop.
    fn new() -> Self {
        ACTIVE_CONNECTIONS.fetch_add(1, Ordering::Relaxed);
        ActiveConnection
    }
}

impl Drop for ActiveConnection {
    /// Record that the guarded connection has ended.
    fn drop(&mut self) {
        ACTIVE_CONNECTIONS.fetch_sub(1, Ordering::Relaxed);
    }
}

/// Return the current number of active connections.
#[must_use]
pub fn active_connection_count() -> u64 {
    ACTIVE_CONNECTIONS.load(Ordering::Relaxed)
}

use crate::{
hooks::{ConnectionContext, ProtocolHooks},
push::{FrameLike, PushHandle, PushQueues},
response::{FrameStream, WireframeError},
session::ConnectionId,
};

/// Events returned by [`next_event`].
Expand Down Expand Up @@ -75,11 +103,14 @@ pub struct ConnectionActor<F, E> {
low_rx: Option<mpsc::Receiver<F>>,
response: Option<FrameStream<F, E>>, // current streaming response
shutdown: CancellationToken,
counter: Option<ActiveConnection>,
hooks: ProtocolHooks<F, E>,
ctx: ConnectionContext,
fairness: FairnessConfig,
high_counter: usize,
high_start: Option<Instant>,
connection_id: Option<ConnectionId>,
peer_addr: Option<SocketAddr>,
}

impl<F, E> ConnectionActor<F, E>
Expand Down Expand Up @@ -123,21 +154,33 @@ where
handle: PushHandle<F>,
response: Option<FrameStream<F, E>>,
shutdown: CancellationToken,
mut hooks: ProtocolHooks<F, E>,
hooks: ProtocolHooks<F, E>,
) -> Self {
let mut ctx = ConnectionContext;
hooks.on_connection_setup(handle, &mut ctx);
Self {
let ctx = ConnectionContext;
let counter = ActiveConnection::new();
let mut actor = Self {
high_rx: Some(queues.high_priority_rx),
low_rx: Some(queues.low_priority_rx),
response,
shutdown,
counter: Some(counter),
hooks,
ctx,
fairness: FairnessConfig::default(),
high_counter: 0,
high_start: None,
}
connection_id: None,
peer_addr: None,
};
let current = ACTIVE_CONNECTIONS.load(Ordering::Relaxed);
info!(
wireframe_active_connections = current,
id = ?actor.connection_id,
peer = ?actor.peer_addr,
"connection opened"
);
actor.hooks.on_connection_setup(handle, &mut actor.ctx);
actor
}

/// Replace the fairness configuration.
Expand All @@ -158,10 +201,18 @@ where
///
/// Returns a [`WireframeError`] if the response stream yields an I/O error.
pub async fn run(&mut self, out: &mut Vec<F>) -> Result<(), WireframeError<E>> {
let span = info_span!("connection_actor");
let _enter = span.enter();
// If cancellation has already been requested, exit immediately. Nothing
// will be drained and any streaming response is abandoned. This mirrors
// a hard shutdown and is required for the tests.
if self.shutdown.is_cancelled() {
info!(
id = ?self.connection_id,
peer = ?self.peer_addr,
"connection aborted before start"
);
let _ = self.counter.take();
return Ok(());
}

Expand All @@ -170,7 +221,12 @@ where
while !state.is_done() {
self.poll_sources(&mut state, out).await?;
}

info!(
id = ?self.connection_id,
peer = ?self.peer_addr,
"connection closed"
);
let _ = self.counter.take();
Ok(())
}

Expand Down Expand Up @@ -368,7 +424,7 @@ where
out.push(frame);
}
Some(Err(WireframeError::Protocol(e))) => {
log::warn!("protocol error: {e:?}");
warn!(error = ?e, "protocol error");
self.hooks.handle_error(e, &mut self.ctx);
state.mark_closed();
self.hooks.on_command_end(&mut self.ctx);
Expand Down
30 changes: 22 additions & 8 deletions src/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::{

use leaky_bucket::RateLimiter;
use tokio::sync::mpsc;
use tracing::{debug, error, warn};

/// Messages can be sent through a [`PushHandle`].
///
Expand Down Expand Up @@ -110,7 +111,9 @@ impl<F: FrameLike> PushHandle<F> {
PushPriority::High => &self.0.high_prio_tx,
PushPriority::Low => &self.0.low_prio_tx,
};
tx.send(frame).await.map_err(|_| PushError::Closed)
tx.send(frame).await.map_err(|_| PushError::Closed)?;
debug!(?priority, "frame pushed");
Ok(())
}
/// Push a high-priority frame subject to rate limiting.
///
Expand Down Expand Up @@ -166,15 +169,18 @@ impl<F: FrameLike> PushHandle<F> {
}

/// Send a frame to the configured dead letter queue if available.
fn route_to_dlq(&self, frame: F) {
fn route_to_dlq(&self, frame: F)
where
F: std::fmt::Debug,
{
if let Some(dlq) = &self.0.dlq_tx {
match dlq.try_send(frame) {
Ok(()) => {}
Err(mpsc::error::TrySendError::Full(_)) => {
log::error!("push queue and DLQ full; frame lost");
Err(mpsc::error::TrySendError::Full(f)) => {
error!(?f, "push queue and DLQ full; frame lost");
}
Err(mpsc::error::TrySendError::Closed(_)) => {
log::error!("DLQ closed; frame lost");
Err(mpsc::error::TrySendError::Closed(f)) => {
error!(?f, "DLQ closed; frame lost");
}
}
}
Expand Down Expand Up @@ -216,7 +222,10 @@ impl<F: FrameLike> PushHandle<F> {
frame: F,
priority: PushPriority,
policy: PushPolicy,
) -> Result<(), PushError> {
) -> Result<(), PushError>
where
F: std::fmt::Debug,
{
let tx = match priority {
PushPriority::High => &self.0.high_prio_tx,
PushPriority::Low => &self.0.low_prio_tx,
Expand All @@ -228,7 +237,12 @@ impl<F: FrameLike> PushHandle<F> {
PushPolicy::ReturnErrorIfFull => Err(PushError::QueueFull),
PushPolicy::DropIfFull | PushPolicy::WarnAndDropIfFull => {
if matches!(policy, PushPolicy::WarnAndDropIfFull) {
log::warn!("push queue full; dropping {priority:?} priority frame");
warn!(
?priority,
?policy,
dlq = self.0.dlq_tx.is_some(),
"push queue full"
);
}
self.route_to_dlq(f);
Ok(())
Expand Down
Loading