Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ tracing = { version = "0.1.41", features = ["log", "log-always"] }
tracing-subscriber = "0.3"
metrics = { version = "0.24.2", optional = true }
metrics-exporter-prometheus = { version = "0.17.2", optional = true, features = ["http-listener"] }
thiserror = "2.0.12"

[dev-dependencies]
rstest = "0.18.2"
Expand Down
34 changes: 17 additions & 17 deletions docs/asynchronous-outbound-messaging-design.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ design and possible refinements. See

The implementation must satisfy the following core requirements:

| ID | Requirement |
| ID | Requirement |
| --- | ------------------------------------------------------------------------------------------------------------------------------------------------------ |
| G1 | Any async task must be able to push frames to a live connection. |
| G2 | Ordering-safety: Pushed frames must interleave correctly with normal request/response traffic and respect any per-message sequencing rules. |
| G3 | Back-pressure: Writers must block (or fail fast) when the peer cannot drain the socket, preventing unbounded memory consumption. |
| G4 | Generic—independent of any particular protocol; usable by both servers and clients built on wireframe. |
| G5 | Preserve the simple “return a reply” path for code that does not need pushes, ensuring backward compatibility and low friction for existing users. |
| G1 | Any async task must be able to push frames to a live connection. |
| G2 | Ordering-safety: Pushed frames must interleave correctly with normal request/response traffic and respect any per-message sequencing rules. |
| G3 | Back-pressure: Writers must block (or fail fast) when the peer cannot drain the socket, preventing unbounded memory consumption. |
| G4 | Generic—independent of any particular protocol; usable by both servers and clients built on wireframe. |
| G5 | Preserve the simple “return a reply” path for code that does not need pushes, ensuring backward compatibility and low friction for existing users. |

## 3. Core Architecture: The Connection Actor

Expand All @@ -69,7 +69,7 @@ manage two distinct, bounded `tokio::mpsc` channels for pushed frames:
messages like heartbeats, session control notifications, or protocol-level
pings.

1. `low_priority_push_rx: mpsc::Receiver<F>`: For standard, non-urgent
2. `low_priority_push_rx: mpsc::Receiver<F>`: For standard, non-urgent
background messages like log forwarding or secondary status updates.

The bounded nature of these channels provides an inherent and robust
Expand All @@ -89,13 +89,13 @@ The polling order will be:
1. **Graceful Shutdown Signal:** The `CancellationToken` will be checked first
to ensure immediate reaction to a server-wide shutdown request.

1. **High-Priority Push Channel:** Messages from `high_priority_push_rx` will be
2. **High-Priority Push Channel:** Messages from `high_priority_push_rx` will be
drained next.

1. **Low-Priority Push Channel:** Messages from `low_priority_push_rx` will be
3. **Low-Priority Push Channel:** Messages from `low_priority_push_rx` will be
processed after all high-priority messages.

1. **Handler Response Stream:** Frames from the active request's
4. **Handler Response Stream:** Frames from the active request's
`Response::Stream` will be processed last.

```rust
Expand Down Expand Up @@ -784,11 +784,11 @@ sequenceDiagram

## 8. Measurable Objectives & Success Criteria

| Category | Objective | Success Metric |
| Category | Objective | Success Metric |
| --------------- | ------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| API Correctness | The PushHandle, SessionRegistry, and WireframeProtocol trait are implemented exactly as specified in this document. | 100% of the public API surface is present and correctly typed. |
| Functionality | Pushed frames are delivered reliably and in the correct order of priority. | A test with concurrent high-priority, low-priority, and streaming producers must show that all frames are delivered and that the final written sequence respects the strict priority order. |
| Back-pressure | A slow consumer must cause producer tasks to suspend without consuming unbounded memory. | A test with a slow consumer and a fast producer must show the producer's push().await call blocks, and the process memory usage remains stable. |
| Resilience | The SessionRegistry must not leak memory when connections are terminated. | A long-running test that creates and destroys thousands of connections must show no corresponding growth in the SessionRegistry's size or the process's overall memory footprint. |
| Performance | The overhead of the push mechanism should be minimal for connections that do not use it. | A benchmark of a simple request-response workload with the push feature enabled (but unused) should show < 2% performance degradation compared to a build without the feature. |
| Performance | The latency for a high-priority push under no contention should be negligible. | The time from push_high_priority().await returning to the frame being written to the socket buffer should be < 10µs. |
| API Correctness | The PushHandle, SessionRegistry, and WireframeProtocol trait are implemented exactly as specified in this document. | 100% of the public API surface is present and correctly typed. |
| Functionality | Pushed frames are delivered reliably and in the correct order of priority. | A test with concurrent high-priority, low-priority, and streaming producers must show that all frames are delivered and that the final written sequence respects the strict priority order. |
| Back-pressure | A slow consumer must cause producer tasks to suspend without consuming unbounded memory. | A test with a slow consumer and a fast producer must show the producer's push().await call blocks, and the process memory usage remains stable. |
| Resilience | The SessionRegistry must not leak memory when connections are terminated. | A long-running test that creates and destroys thousands of connections must show no corresponding growth in the SessionRegistry's size or the process's overall memory footprint. |
| Performance | The overhead of the push mechanism should be minimal for connections that do not use it. | A benchmark of a simple request-response workload with the push feature enabled (but unused) should show < 2% performance degradation compared to a build without the feature. |
| Performance | The latency for a high-priority push under no contention should be negligible. | The time from push_high_priority().await returning to the frame being written to the socket buffer should be < 10µs. |
15 changes: 7 additions & 8 deletions examples/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,16 @@
//! The application listens for incoming frames and simply echoes each
//! envelope back to the client.

use std::io;

use wireframe::{
app::{Envelope, WireframeApp},
server::WireframeServer,
server::{ServerError, WireframeServer},
Comment thread
leynos marked this conversation as resolved.
};

#[tokio::main]
async fn main() -> io::Result<()> {
async fn main() -> Result<(), ServerError> {
let factory = || {
WireframeApp::new()
.unwrap()
.expect("failed to create WireframeApp")
.route(
1,
std::sync::Arc::new(|_: &Envelope| {
Expand All @@ -24,11 +22,12 @@ async fn main() -> io::Result<()> {
})
}),
)
.unwrap()
.expect("failed to register route 1")
};

WireframeServer::new(factory)
.bind("127.0.0.1:7878".parse().unwrap())?
.bind("127.0.0.1:7878".parse().expect("invalid socket address"))?
.run()
.await
.await?;
Ok(())
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
9 changes: 5 additions & 4 deletions examples/packet_enum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
//! The application defines an enum representing different packet variants and
//! shows how to dispatch handlers based on the variant received.

use std::{collections::HashMap, future::Future, io, pin::Pin};
use std::{collections::HashMap, future::Future, pin::Pin};

use async_trait::async_trait;
use wireframe::{
app::{Envelope, WireframeApp},
frame::{LengthFormat, LengthPrefixedProcessor},
message::Message,
middleware::{HandlerService, Service, ServiceRequest, ServiceResponse, Transform},
server::WireframeServer,
server::{ServerError, WireframeServer},
};

#[derive(bincode::Encode, bincode::BorrowDecode, Debug)]
Expand Down Expand Up @@ -76,7 +76,7 @@ fn handle_packet(_env: &Envelope) -> Pin<Box<dyn Future<Output = ()> + Send>> {
}

#[tokio::main]
async fn main() -> io::Result<()> {
async fn main() -> Result<(), ServerError> {
let factory = || {
WireframeApp::new()
.expect("Failed to create WireframeApp")
Expand All @@ -92,5 +92,6 @@ async fn main() -> io::Result<()> {
WireframeServer::new(factory)
.bind(addr.parse().expect("Invalid server address"))?
.run()
.await
.await?;
Ok(())
}
13 changes: 6 additions & 7 deletions examples/ping_pong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
//! Demonstrates custom packet structs and middleware that maps `Ping` to
//! `Pong` responses.

use std::{io, net::SocketAddr, sync::Arc};
use std::{net::SocketAddr, sync::Arc};

use async_trait::async_trait;
use wireframe::{
app::{Envelope, Packet, Result as AppResult, WireframeApp},
message::Message,
middleware::{HandlerService, Service, ServiceRequest, ServiceResponse, Transform},
serializer::BincodeSerializer,
server::WireframeServer,
server::{ServerError, WireframeServer},
};

#[derive(bincode::Encode, bincode::BorrowDecode, Debug)]
Expand Down Expand Up @@ -136,15 +136,14 @@ fn build_app() -> AppResult<WireframeApp> {
}

#[tokio::main]
async fn main() -> io::Result<()> {
async fn main() -> Result<(), ServerError> {
let factory = || build_app().expect("app build failed");

let default_addr = "127.0.0.1:7878";
let addr_str = std::env::args()
.nth(1)
.unwrap_or_else(|| default_addr.into());
let addr: SocketAddr = addr_str
.parse()
.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
WireframeServer::new(factory).bind(addr)?.run().await
let addr: SocketAddr = addr_str.parse().expect("invalid address");
WireframeServer::new(factory).bind(addr)?.run().await?;
Ok(())
}
183 changes: 183 additions & 0 deletions src/server/config/binding.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
//! Binding configuration for [`WireframeServer`].

use core::marker::PhantomData;
use std::{
net::{SocketAddr, TcpListener as StdTcpListener},
sync::Arc,
};

use tokio::net::TcpListener;

use super::{Unbound, WireframeServer};
use crate::{
app::WireframeApp,
preamble::Preamble,
server::{Bound, ServerError},
};

impl<F, T> WireframeServer<F, T, Unbound>
where
F: Fn() -> WireframeApp + Send + Sync + Clone + 'static,
T: Preamble,
{
/// Return `None` as the server is not bound.
///
/// # Examples
///
/// ```
/// use wireframe::{app::WireframeApp, server::WireframeServer};
///
/// assert!(
/// WireframeServer::new(|| WireframeApp::default())
/// .local_addr()
/// .is_none()
/// );
/// ```
#[must_use]
pub const fn local_addr(&self) -> Option<SocketAddr> { None }

/// Bind to a fresh address.
///
/// # Examples
///
/// ```
/// use std::net::{Ipv4Addr, SocketAddr};
///
/// use wireframe::{app::WireframeApp, server::WireframeServer};
///
/// let addr = SocketAddr::from((Ipv4Addr::LOCALHOST, 0));
/// let server = WireframeServer::new(|| WireframeApp::default())
/// .bind(addr)
/// .expect("bind failed");
/// assert!(server.local_addr().is_some());
/// ```
///
/// # Errors
/// Returns a [`ServerError`] if binding or configuring the listener fails.
pub fn bind(self, addr: SocketAddr) -> Result<WireframeServer<F, T, Bound>, ServerError> {
let std = StdTcpListener::bind(addr).map_err(ServerError::Bind)?;
self.bind_listener(std)
}
Comment on lines +57 to +60
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick (assertive)

Factor out duplicated listener-setup code

Both bind/bind_listener paths repeat the set_nonblocking + from_std + struct rebuild sequence. Extract a private helper to trim duplication and keep the invariants in one place.

Also applies to: 143-180

🤖 Prompt for AI Agents
In src/server/config/binding.rs around lines 57 to 60 and also between lines 143
to 180, the code for setting up the listener duplicates the sequence of calling
set_nonblocking, from_std, and rebuilding the struct. To fix this, create a
private helper function that takes the standard listener, performs
set_nonblocking and from_std, and returns the rebuilt struct. Replace the
duplicated code in both bind and bind_listener with calls to this new helper to
centralize the logic and reduce duplication.


/// Bind to an existing `StdTcpListener`.
///
/// # Examples
///
/// ```
/// use std::net::{Ipv4Addr, SocketAddr, TcpListener as StdTcpListener};
///
/// use wireframe::{app::WireframeApp, server::WireframeServer};
///
/// let std = StdTcpListener::bind(SocketAddr::from((Ipv4Addr::LOCALHOST, 0))).unwrap();
/// let server = WireframeServer::new(|| WireframeApp::default())
/// .bind_listener(std)
/// .expect("bind failed");
/// assert!(server.local_addr().is_some());
/// ```
///
/// # Errors
/// Returns a [`ServerError`] if configuring the listener fails.
pub fn bind_listener(
self,
std: StdTcpListener,
) -> Result<WireframeServer<F, T, Bound>, ServerError> {
std.set_nonblocking(true).map_err(ServerError::Bind)?;
let tokio = TcpListener::from_std(std).map_err(ServerError::Bind)?;
Ok(WireframeServer {
factory: self.factory,
workers: self.workers,
on_preamble_success: self.on_preamble_success,
on_preamble_failure: self.on_preamble_failure,
ready_tx: self.ready_tx,
state: Bound {
listener: Arc::new(tokio),
},
_preamble: PhantomData,
})
}
Comment thread
leynos marked this conversation as resolved.
}

impl<F, T> WireframeServer<F, T, Bound>
where
F: Fn() -> WireframeApp + Send + Sync + Clone + 'static,
T: Preamble,
{
/// Returns the bound address, or `None` if retrieving it fails.
///
/// # Examples
///
/// ```
/// use std::net::SocketAddr;
///
/// use wireframe::{app::WireframeApp, server::WireframeServer};
///
/// let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
/// let server = WireframeServer::new(|| WireframeApp::default())
/// .bind(addr)
/// .expect("bind failed");
/// assert!(server.local_addr().is_some());
/// ```
#[must_use]
pub fn local_addr(&self) -> Option<SocketAddr> { self.state.listener.local_addr().ok() }

/// Rebind to a fresh address.
///
/// # Examples
///
/// ```
/// use std::net::{Ipv4Addr, SocketAddr};
///
/// use wireframe::{app::WireframeApp, server::WireframeServer};
///
/// let addr = SocketAddr::from((Ipv4Addr::LOCALHOST, 0));
/// let server = WireframeServer::new(|| WireframeApp::default())
/// .bind(addr)
/// .expect("bind failed");
/// let addr2 = SocketAddr::from((Ipv4Addr::LOCALHOST, 0));
/// let server = server.bind(addr2).expect("rebind failed");
/// assert!(server.local_addr().is_some());
/// ```
///
/// # Errors
/// Returns a [`ServerError`] if binding or configuring the listener fails.
pub fn bind(self, addr: SocketAddr) -> Result<Self, ServerError> {
let std = StdTcpListener::bind(addr).map_err(ServerError::Bind)?;
self.bind_listener(std)
}

/// Rebind using an existing `StdTcpListener`.
///
/// # Examples
///
/// ```
/// use std::net::{Ipv4Addr, SocketAddr, TcpListener as StdTcpListener};
///
/// use wireframe::{app::WireframeApp, server::WireframeServer};
///
/// let addr = SocketAddr::from((Ipv4Addr::LOCALHOST, 0));
/// let server = WireframeServer::new(|| WireframeApp::default())
/// .bind(addr)
/// .expect("bind failed");
/// let std = StdTcpListener::bind(SocketAddr::from((Ipv4Addr::LOCALHOST, 0))).unwrap();
/// let server = server.bind_listener(std).expect("rebind failed");
/// assert!(server.local_addr().is_some());
/// ```
///
/// # Errors
/// Returns a [`ServerError`] if configuring the listener fails.
pub fn bind_listener(self, std: StdTcpListener) -> Result<Self, ServerError> {
std.set_nonblocking(true).map_err(ServerError::Bind)?;
let tokio = TcpListener::from_std(std).map_err(ServerError::Bind)?;
Ok(WireframeServer {
factory: self.factory,
workers: self.workers,
on_preamble_success: self.on_preamble_success,
on_preamble_failure: self.on_preamble_failure,
ready_tx: self.ready_tx,
state: Bound {
listener: Arc::new(tokio),
},
_preamble: PhantomData,
})
}
}
Loading
Loading