Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ tokio = { version = "1.46.1", default-features = false, features = [
"sync",
"time",
"io-util",
"test-util",
] }
tokio-util = { version = "0.7.16", features = ["rt"] }
tokio-util = { version = "0.7.16", features = ["rt", "codec"] }
futures = "0.3.31"
async-trait = "0.1.88"
bytes = "1.10.1"
Expand All @@ -50,10 +49,19 @@ serial_test = "3.2.0"
# Permit compatible bug fixes but block breaking updates
cucumber = "0.21.1"
metrics-util = "0.20.0"
tracing = { version = "0.1.41", features = ["log", "log-always"] }
tracing-test = "0.2.5"
mockall = "0.13.1"

tokio = { version = "1.46.1", default-features = false, features = [
"macros",
"rt-multi-thread",
"sync",
"time",
"io-util",
"net",
"test-util",
] }

[features]
default = ["metrics"]
metrics = ["dep:metrics", "dep:metrics-exporter-prometheus"]
Expand Down
16 changes: 6 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ connections and runs the Tokio event loop:
```rust
WireframeServer::new(|| {
WireframeApp::new()
.frame_processor(MyFrameProcessor::new())
.app_data(state.clone())
.route(MessageType::Login, handle_login)
.wrap(MyLoggingMiddleware::default())
Expand All @@ -48,10 +47,10 @@ WireframeServer::new(|| {
By default, the number of worker tasks equals the number of CPU cores. If the
CPU count cannot be determined, the server falls back to a single worker.

The builder supports methods like `frame_processor`, `route`, `app_data`, and
`wrap` for middleware configuration. `app_data` stores any `Send + Sync` value
keyed by type; registering another value of the same type overwrites the
previous one. Handlers retrieve these values using the `SharedState<T>`
The builder supports methods like `route`, `app_data`, and `wrap` for
middleware configuration. `app_data` stores any `Send + Sync` value keyed by
type; registering another value of the same type overwrites the previous one.
Handlers retrieve these values using the `SharedState<T>`
extractor【F:docs/rust-binary-router-library-design.md†L622-L710】.

Handlers are asynchronous functions whose parameters implement extractor traits
Expand All @@ -62,7 +61,7 @@ concise【F:docs/rust-binary-router-library-design.md†L682-L710】.
## Example

The design document includes a simple echo server that demonstrates routing
based on a message ID and the use of a length‑prefixed frame processor:
based on a message ID and the use of a length‑delimited codec:

```rust
async fn handle_echo(req: Message<EchoRequest>) -> WireframeResult<EchoResponse> {
Expand Down Expand Up @@ -139,10 +138,7 @@ size and endianness) and defaults to a 4‑byte big‑endian length
prefix【F:docs/rust-binary-router-library-design.md†L1082-L1123】.

```rust
use wireframe::frame::{LengthFormat, LengthPrefixedProcessor};

let app = WireframeApp::new()?
.frame_processor(LengthPrefixedProcessor::new(LengthFormat::u16_le()));
let app = WireframeApp::new()?;
```

## Connection Lifecycle
Expand Down
5 changes: 0 additions & 5 deletions docs/generic-message-fragmentation-and-re-assembly-design.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,6 @@ Developers will enable fragmentation by adding the `FragmentAdapter` to their
// Example: Configuring a server for MySQL-style fragmentation.
WireframeServer::new(|| {
WireframeApp::new()
.frame_processor(
FragmentAdapter::new(MySqlStrategy)
.with_max_message_size(64 * 1024 * 1024) // 64 MiB
.with_reassembly_timeout(Duration::from_secs(30))
)
.route(...)
})
```
Expand Down
27 changes: 12 additions & 15 deletions docs/rust-binary-router-library-design.md
Original file line number Diff line number Diff line change
Expand Up @@ -726,8 +726,7 @@ component to run it.
async fn main_server_setup() -> std::io::Result<()> {
let app_state = Arc::new(Mutex::new(AppState::new()));
WireframeServer::new(move || { // Closure provides App per worker thread
WireframeApp::new()
.frame_processor(MyFrameProcessor::new()) // Configure the framing logic
WireframeApp::new()
.app_data(app_state.clone()) // Shared application state
//.service(login_handler) // If using attribute macros and auto-discovery
//.service(chat_handler)
Expand All @@ -745,10 +744,8 @@ The WireframeApp builder would offer methods like:

- WireframeApp::new(): Creates a new application builder.

- .frame_processor(impl FrameProcessor): Sets the framing logic.

- .service(handler_function): Registers a handler function, potentially
inferring the message type it handles if attribute macros are used.
- `[deprecated]` `.frame_processor(impl FrameProcessor)`: framing is now
handled by the connection codec.

- .route(message_id, handler_function): Explicitly maps a message identifier to
a handler.
Expand Down Expand Up @@ -1273,7 +1270,7 @@ its own `FrameProcessor` trait or provide helpers.) <!-- list break -->

WireframeServer::new(|| {
WireframeApp::new()
//.frame_processor(LengthPrefixedCodec) // Simplified
//.frame_processor(LengthPrefixedCodec) // deprecated: framing handled by codec
.serializer(BincodeSerializer) // Specify serializer
.route(MyMessageType::Echo, handle_echo) // Route based on ID
// OR if type-based routing is supported and EchoRequest has an ID:
Expand Down Expand Up @@ -1383,14 +1380,14 @@ simplify server implementation.
let chat_state = Arc::new(Mutex::new(ChatRoomState {
users: HashMap::new()
}));
WireframeServer::new(move || {
WireframeApp::new()
//.frame_processor(...)
.serializer(BincodeSerializer)
.app_data(chat_state.clone())
.route(ChatMessageType::ClientJoin, handle_join)
.route(ChatMessageType::ClientPost, handle_post)
})
WireframeServer::new(move || {
WireframeApp::new()
//.frame_processor(...) // deprecated: framing handled by codec
.serializer(BincodeSerializer)
.app_data(chat_state.clone())
.route(ChatMessageType::ClientJoin, handle_join)
.route(ChatMessageType::ClientPost, handle_post)
})
.bind("127.0.0.1:8001")?
.run()
.await
Expand Down
13 changes: 6 additions & 7 deletions docs/wireframe-client-design.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ A `WireframeClient::builder()` method configures the client:

```rust
let client = WireframeClient::builder()
.frame_processor(LengthPrefixedProcessor::new(LengthFormat::u32_be()))
.serializer(BincodeSerializer)
.connect("127.0.0.1:7878")
.await?;
```

The same `FrameProcessor` and `Serializer` traits used by the server are reused
here, ensuring messages are framed and encoded consistently.
The same `Serializer` trait used by the server is reused here, ensuring
messages are encoded consistently while framing is handled by the
length‑delimited codec.

### Request/Response Helpers

Expand All @@ -52,9 +52,9 @@ let request = Login { username: "guest".into() };
let response: LoginAck = client.call(request).await?;
```

Internally, this uses the `Serializer` to encode the request, writes it through
the `FrameProcessor`, then waits for a frame, decodes it, and deserializes the
response type.
Internally, this uses the `Serializer` to encode the request, sends it through
the length‑delimited codec, then waits for a frame, decodes it, and
deserializes the response type.

### Connection Lifecycle

Expand All @@ -68,7 +68,6 @@ initialization logic.
#[tokio::main]
async fn main() -> std::io::Result<()> {
let mut client = WireframeClient::builder()
.frame_processor(LengthPrefixedProcessor::new(LengthFormat::u32_be()))
.serializer(BincodeSerializer)
.connect("127.0.0.1:7878")
.await?;
Expand Down
2 changes: 0 additions & 2 deletions docs/wireframe-testing-crate.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,12 @@ let (_, frame) = recv_expect!(queues.recv());
```rust
use std::sync::Arc;
use wireframe_testing::{drive_with_frame, drive_with_frames};
use wireframe::processor::LengthPrefixedProcessor;
use crate::tests::{build_test_frame, expected_bytes};

#[tokio::test]
async fn handler_echoes_message() {
let app = WireframeApp::new()
.unwrap()
.frame_processor(LengthPrefixedProcessor::default())
.route(1, Arc::new(|_| Box::pin(async {})))
.unwrap();

Expand Down
7 changes: 5 additions & 2 deletions examples/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@
//! envelope back to the client.

use wireframe::{
app::{Envelope, WireframeApp},
app::Envelope,
serializer::BincodeSerializer,
server::{ServerError, WireframeServer},
};

type App = wireframe::app::WireframeApp<BincodeSerializer, (), Envelope>;

#[tokio::main]
async fn main() -> Result<(), ServerError> {
let factory = || {
WireframeApp::new()
App::new()
.expect("failed to create WireframeApp")
.route(
1,
Expand Down
13 changes: 7 additions & 6 deletions examples/metadata_routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ use std::{io, sync::Arc};
use bytes::BytesMut;
use tokio::io::{AsyncWriteExt, duplex};
use wireframe::{
app::{Envelope, WireframeApp},
app::Envelope,
frame::{FrameMetadata, FrameProcessor, LengthPrefixedProcessor},
message::Message,
serializer::Serializer,
};

type App = wireframe::app::WireframeApp<HeaderSerializer, (), Envelope>;

/// Frame format with a two-byte id, one-byte flags, and bincode payload.
#[derive(Default)]
struct HeaderSerializer;

impl Serializer for HeaderSerializer {
Expand Down Expand Up @@ -61,15 +64,13 @@ struct Ping;

#[tokio::main]
async fn main() -> io::Result<()> {
let app = WireframeApp::new()
let app = App::with_serializer(HeaderSerializer)
.expect("failed to create app")
.frame_processor(LengthPrefixedProcessor::default())
.serializer(HeaderSerializer)
.route(
1,
Arc::new(|_env: &Envelope| {
Box::pin(async move {
println!("received ping message");
tracing::info!("received ping message");
})
}),
)
Expand All @@ -78,7 +79,7 @@ async fn main() -> io::Result<()> {
2,
Arc::new(|_env: &Envelope| {
Box::pin(async move {
println!("received pong message");
tracing::info!("received pong message");
})
}),
)
Expand Down
24 changes: 13 additions & 11 deletions examples/packet_enum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,19 @@
use std::{collections::HashMap, future::Future, pin::Pin};

use async_trait::async_trait;
use tracing::{info, warn};
use wireframe::{
app::{Envelope, WireframeApp},
frame::{LengthFormat, LengthPrefixedProcessor},
app::Envelope,
message::Message,
middleware::{HandlerService, Service, ServiceRequest, ServiceResponse, Transform},
serializer::BincodeSerializer,
server::{ServerError, WireframeServer},
};

type App = wireframe::app::WireframeApp<BincodeSerializer, (), Envelope>;

#[derive(bincode::Encode, bincode::BorrowDecode, Debug)]
enum Packet {
enum ExamplePacket {
Ping,
Chat { user: String, msg: String },
Stats(Vec<u32>),
Expand All @@ -24,7 +27,7 @@ enum Packet {
#[derive(bincode::Encode, bincode::BorrowDecode, Debug)]
struct Frame {
headers: HashMap<String, String>,
packet: Packet,
packet: ExamplePacket,
}

/// Middleware that decodes incoming frames and logs packet details.
Expand All @@ -45,12 +48,12 @@ where
async fn call(&self, req: ServiceRequest) -> Result<ServiceResponse, Self::Error> {
match Frame::from_bytes(req.frame()) {
Ok((frame, _)) => match frame.packet {
Packet::Ping => println!("ping: {:?}", frame.headers),
Packet::Chat { user, msg } => println!("{user} says: {msg}"),
Packet::Stats(values) => println!("stats: {values:?}"),
ExamplePacket::Ping => info!("ping: {:?}", frame.headers),
ExamplePacket::Chat { user, msg } => info!("{user} says: {msg}"),
ExamplePacket::Stats(values) => info!("stats: {values:?}"),
},
Err(e) => {
eprintln!("Failed to decode frame: {e}");
warn!("Failed to decode frame: {e}");
}
}

Expand All @@ -71,16 +74,15 @@ impl Transform<HandlerService<Envelope>> for DecodeMiddleware {

fn handle_packet(_env: &Envelope) -> Pin<Box<dyn Future<Output = ()> + Send>> {
Box::pin(async {
println!("packet received");
info!("packet received");
})
}

#[tokio::main]
async fn main() -> Result<(), ServerError> {
let factory = || {
WireframeApp::new()
App::new()
.expect("Failed to create WireframeApp")
.frame_processor(LengthPrefixedProcessor::new(LengthFormat::u16_le()))
.wrap(DecodeMiddleware)
.expect("Failed to wrap middleware")
.route(1, std::sync::Arc::new(handle_packet))
Expand Down
13 changes: 9 additions & 4 deletions examples/ping_pong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ use std::{net::SocketAddr, sync::Arc};

use async_trait::async_trait;
use wireframe::{
app::{Envelope, Packet, Result as AppResult, WireframeApp},
app::{Envelope, Packet, Result as AppResult},
message::Message,
middleware::{HandlerService, Service, ServiceRequest, ServiceResponse, Transform},
serializer::BincodeSerializer,
server::{ServerError, WireframeServer},
};

type App = wireframe::app::WireframeApp<BincodeSerializer, (), Envelope>;

#[derive(bincode::Encode, bincode::BorrowDecode, Debug)]
struct Ping(u32);

Expand All @@ -40,7 +42,10 @@ const PING_ID: u32 = 1;
///
/// The middleware chain generates the actual response, so this
/// handler intentionally performs no work.
#[allow(clippy::unused_async)]
#[expect(
clippy::unused_async,
reason = "Keep async signature to match Handler and Transform trait expectations"
)]
async fn ping_handler() {}

struct PongMiddleware;
Expand Down Expand Up @@ -130,8 +135,8 @@ impl<E: Packet> Transform<HandlerService<E>> for Logging {
}
}

fn build_app() -> AppResult<WireframeApp> {
WireframeApp::new()?
fn build_app() -> AppResult<App> {
App::new()?
.serializer(BincodeSerializer)
.route(PING_ID, Arc::new(|_: &Envelope| Box::pin(ping_handler())))?
.wrap(PongMiddleware)?
Expand Down
Loading
Loading