Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,17 +95,22 @@ payload bytes. Applications can supply their own envelope type by calling
`Packet` trait:

```rust
use wireframe::app::{Packet, WireframeApp};
use wireframe::app::{Packet, PacketParts, WireframeApp};

#[derive(bincode::Encode, bincode::BorrowDecode)]
struct MyEnv { id: u32, correlation_id: u64, data: Vec<u8> }
struct MyEnv { id: u32, correlation_id: Option<u64>, payload: Vec<u8> }

impl Packet for MyEnv {
fn id(&self) -> u32 { self.id }
fn correlation_id(&self) -> u64 { self.correlation_id }
fn into_parts(self) -> (u32, u64, Vec<u8>) { (self.id, self.correlation_id, self.data) }
fn from_parts(id: u32, correlation_id: u64, data: Vec<u8>) -> Self {
Self { id, correlation_id, data }
fn correlation_id(&self) -> Option<u64> { self.correlation_id }
fn into_parts(self) -> PacketParts {
PacketParts::new(self.id, self.correlation_id, self.payload)
}
fn from_parts(parts: PacketParts) -> Self {
let id = parts.id();
let correlation_id = parts.correlation_id();
let payload = parts.payload();
Self { id, correlation_id, payload }
}
}

Expand All @@ -115,6 +120,10 @@ let app = WireframeApp::<_, _, MyEnv>::new()
.unwrap();
```

A `None` correlation ID denotes an unsolicited event or server-initiated push.
Use `None` rather than `Some(0)` when a frame lacks a correlation ID. See
[PacketParts](docs/api.md#packetparts) for field details.

This allows integration with existing packet formats without modifying
`handle_frame`.

Expand Down Expand Up @@ -281,8 +290,9 @@ Example programs are available in the `examples/` directory:
- `ping_pong.rs` — showcases serialization and middleware in a ping/pong
protocol. See [examples/ping_pong.md](examples/ping_pong.md) for a detailed
overview.
- `packet_enum.rs` – shows packet type discrimination with a bincode enum and a
frame containing container types like `HashMap` and `Vec`.
- [`packet_enum.rs`](examples/packet_enum.rs) — shows packet type discrimination
with a bincode enum and a frame containing container types like `HashMap` and
`Vec`.

Run an example with Cargo:

Expand Down
25 changes: 25 additions & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# API overview

## PacketParts

A `PacketParts` struct decomposes a packet into its components:

```rust
let parts = PacketParts::new(id, correlation_id, payload);
```

- `id: u32` — frame identifier
- `correlation_id: Option<u64>` — `None` marks an unsolicited event or
server‑initiated push
- `payload: Vec<u8>` — raw message bytes

Custom packet types can convert to and from `PacketParts` to avoid manual
mapping:

```rust
let parts = PacketParts::new(id, None, data);
let env = Envelope::from(parts);
```

`None` propagation ensures packets that originate on the server carry no
accidental correlation identifier.
8 changes: 7 additions & 1 deletion docs/roadmap.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,15 @@ production environments.
- [x] Expose key operational metrics (e.g., active connections, messages per
second, error rates).

- [x] Provide an integration guide for popular monitoring systems (e.g.,
- [x] Provide an integration guide for popular monitoring systems (e.g.,
Prometheus).

- [x] **Packet decomposition:**

- [x] Introduce `PacketParts` to replace tuple-based packet handling.
- [x] Treat `correlation_id` as `Option<u64>` so `None` denotes an
unsolicited event or server-initiated push.

- [x] **Advanced Error Handling:**

- [x] Implement panic handlers in connection tasks to prevent a single
Expand Down
47 changes: 47 additions & 0 deletions docs/rust-binary-router-library-design.md
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,53 @@ frame processing, akin to how `tokio-util::codec` operates, endows "wireframe"
with the necessary flexibility to adapt to this diversity without embedding
assumptions about any single framing strategy into its core.

#### 4.3.1 Packet abstraction

The library defines a `Packet` trait to represent transport frames. Frames can
be decomposed into `PacketParts` for efficient handling and reassembly.
`Envelope` is the default implementation used by `wireframe`. The following
diagram depicts the `Packet` trait, `PacketParts`, and `Envelope`.

```mermaid
Comment thread
coderabbitai[bot] marked this conversation as resolved.
classDiagram
class Packet {
<<trait>>
+id() u32
+correlation_id() Option<u64>
+into_parts() PacketParts
+from_parts(parts: PacketParts) Self
}
class PacketParts {
-id: u32
-correlation_id: Option<u64>
-payload: Vec<u8>
+new(id: u32, correlation_id: Option<u64>, payload: Vec<u8>) PacketParts
+id() u32
+correlation_id() Option<u64>
+payload() Vec<u8>
+inherit_correlation(source: Option<u64>) PacketParts
}
class Envelope {
-id: u32
-correlation_id: Option<u64>
-payload: Vec<u8>
+new(id: u32, correlation_id: Option<u64>, payload: Vec<u8>)
+from_parts(parts: PacketParts) Envelope
+into_parts() PacketParts
}
Packet <|.. Envelope
PacketParts <.. Envelope : uses
PacketParts <.. Packet : uses
```

`Envelope` implements `Packet`, carrying payload and metadata through the
system. `PacketParts` avoids repetitive tuple unpacking when frames are split
into constituent pieces. A `None` correlation ID denotes an unsolicited event
or server-initiated push. In multi-packet streaming responses, the optional
`correlation_id` links all packets in the stream to the originating request,
and protocols should define an explicit end-of-stream indicator alongside the
shared correlation identifier.

### 4.4. Message Serialization and Deserialization

The conversion of frame payloads to and from Rust types is a critical source of
Expand Down
14 changes: 7 additions & 7 deletions examples/metadata_routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl FrameMetadata for HeaderSerializer {
// `parse` receives the complete frame because `LengthPrefixedProcessor`
// ensures `src` contains exactly one message. Returning `src.len()` is
// therefore correct for this demo.
Ok((Envelope::new(id, 0, payload), src.len()))
Ok((Envelope::new(id, None, payload), src.len()))
Comment thread
leynos marked this conversation as resolved.
}
}

Expand All @@ -62,7 +62,7 @@ struct Ping;
#[tokio::main]
async fn main() -> io::Result<()> {
let app = WireframeApp::new()
.unwrap()
.expect("failed to create app")
.frame_processor(LengthPrefixedProcessor::default())
.serializer(HeaderSerializer)
.route(
Expand All @@ -73,7 +73,7 @@ async fn main() -> io::Result<()> {
})
}),
)
.unwrap()
.expect("failed to add ping route")
.route(
2,
Arc::new(|_env: &Envelope| {
Expand All @@ -82,14 +82,14 @@ async fn main() -> io::Result<()> {
})
}),
)
.unwrap();
.expect("failed to add pong route");

let (mut client, server) = duplex(1024);
let server_task = tokio::spawn(async move {
app.handle_connection(server).await;
});

let payload = Ping.to_bytes().unwrap();
let payload = Ping.to_bytes().expect("failed to serialize Ping message");
let mut frame = Vec::new();
frame.extend_from_slice(&1u16.to_be_bytes());
frame.push(0);
Expand All @@ -98,11 +98,11 @@ async fn main() -> io::Result<()> {
let mut bytes = BytesMut::new();
LengthPrefixedProcessor::default()
.encode(&frame, &mut bytes)
.unwrap();
.expect("failed to encode frame");

client.write_all(&bytes).await?;
client.shutdown().await?;

server_task.await.unwrap();
server_task.await.expect("server task failed");
Ok(())
}
17 changes: 10 additions & 7 deletions examples/ping_pong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,29 +57,32 @@ where
type Error = std::convert::Infallible;

async fn call(&self, req: ServiceRequest) -> Result<ServiceResponse, Self::Error> {
let cid = req.correlation_id();
let (ping_req, _) = match Ping::from_bytes(req.frame()) {
Ok(val) => val,
Err(e) => {
eprintln!("failed to decode ping: {e:?}");
return Ok(ServiceResponse::new(encode_error(format!(
"decode error: {e:?}"
))));
return Ok(ServiceResponse::new(
encode_error(format!("decode error: {e:?}")),
cid,
));
}
};
let mut response = self.inner.call(req).await?;
let pong_resp = if let Some(v) = ping_req.0.checked_add(1) {
Pong(v)
} else {
eprintln!("ping overflowed at {}", ping_req.0);
return Ok(ServiceResponse::new(encode_error("overflow")));
return Ok(ServiceResponse::new(encode_error("overflow"), cid));
};
match pong_resp.to_bytes() {
Ok(bytes) => *response.frame_mut() = bytes,
Err(e) => {
eprintln!("failed to encode pong: {e:?}");
return Ok(ServiceResponse::new(encode_error(format!(
"encode error: {e:?}"
))));
return Ok(ServiceResponse::new(
encode_error(format!("encode error: {e:?}")),
cid,
));
}
}
Ok(response)
Expand Down
Loading
Loading