Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,10 @@ management. Contributors should follow these best practices when working on the
project:

- Run `make fmt`, `make lint`, and `make test` before committing. These targets
wrap `cargo fmt`, `cargo clippy`, and `cargo test` with the appropriate
flags.
wrap `cargo fmt`, `cargo clippy`, and `cargo test` with the appropriate flags.
- Clippy warnings MUST be disallowed.
- Fix any warnings emitted during tests in the code itself rather than
silencing them.
- Fix any warnings emitted during tests in the code itself rather than silencing
them.
- Where a function is too long, extract meaningfully named helper functions
adhering to separation of concerns and CQRS.
- Where a function has too many parameters, group related parameters in
Expand All @@ -121,8 +120,8 @@ project:
- Validate Markdown files using `markdownlint`.
- Run `mdformat-all` after any documentation changes to format all Markdown
files and fix table markup.
- Validate Markdown Mermaid diagrams using the `nixie` CLI. The tool is
already installed; run `nixie` directly instead of using `npx`.
- Validate Markdown Mermaid diagrams using the `nixie` CLI. The tool is already
installed; run `nixie` directly instead of using `npx`.

### Key Takeaway

Expand Down
15 changes: 8 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ encoded using the application's configured serializer and written back through
the `FrameProcessor`【F:docs/rust-binary-router-library-design.md†L718-L724】.

The included `LengthPrefixedProcessor` illustrates a simple framing strategy
based on a big‑endian length
that prefixes each frame with its length. The format is configurable (prefix
size and endianness) and defaults to a 4‑byte big‑endian length
prefix【F:docs/rust-binary-router-library-design.md†L1076-L1117】.

## Connection Lifecycle
Expand All @@ -113,9 +114,9 @@ when the connection ends.

Extractors are types that implement `FromMessageRequest`. When a handler lists
an extractor as a parameter, `wireframe` automatically constructs it using the
incoming \[`MessageRequest`\] and remaining \[`Payload`\]. Built‑in extractors like
`Message<T>`, `SharedState<T>` and `ConnectionInfo` decode the payload, access
app state or expose peer information.
incoming \[`MessageRequest`\] and remaining \[`Payload`\]. Built‑in extractors
like `Message<T>`, `SharedState<T>` and `ConnectionInfo` decode the payload,
access app state or expose peer information.

Custom extractors let you centralize parsing and validation logic that would
otherwise be duplicated across handlers. A session token parser, for example,
Expand Down Expand Up @@ -168,9 +169,9 @@ let logging = from_fn(|req, next| async move {

## Current Limitations

Connection handling now processes frames and routes messages, but the
server is still experimental. Release builds fail to compile, so the
library cannot be used accidentally in production.
Connection handling now processes frames and routes messages, but the server is
still experimental. Release builds fail to compile, so the library cannot be
used accidentally in production.

## Roadmap

Expand Down
7 changes: 4 additions & 3 deletions docs/roadmap.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,10 @@ after formatting. Line numbers below refer to that file.
- [ ] Implement middleware using `Transform`/`Service` traits.

- [x] Implement `ServiceRequest` and `ServiceResponse` wrappers (lines
866-899) and introduce a `Next` helper to build the asynchronous call
chain. Trait definitions live in
[`src/middleware.rs`](../src/middleware.rs#L71-L84).
866-899) and introduce a `Next` helper to build the asynchronous call chain.
Comment thread
leynos marked this conversation as resolved.
Trait definitions live in
Comment thread
leynos marked this conversation as resolved.
[`src/middleware.rs`](../src/middleware.rs#L71-L84).

- [ ] Provide a `from_fn` helper for functional middleware.
- [x] Add tests verifying middleware can modify requests and observe
responses.
Expand Down
1 change: 0 additions & 1 deletion docs/rust-binary-router-library-design.md
Original file line number Diff line number Diff line change
Expand Up @@ -784,7 +784,6 @@ instance of each type can exist; later registrations overwrite earlier ones.
a specific field in all messages, validate it, and provide a `UserSession`
object to the handler.


This extractor system, backed by Rust's strong type system, ensures that
handlers receive correctly typed and validated data, significantly reducing the
likelihood of runtime errors and boilerplate parsing code within the handler
Expand Down
8 changes: 6 additions & 2 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use bytes::BytesMut;
use tokio::io::{self, AsyncWrite, AsyncWriteExt};

use crate::{
frame::{FrameProcessor, LengthPrefixedProcessor},
frame::{FrameProcessor, LengthFormat, LengthPrefixedProcessor},
message::Message,
serializer::{BincodeSerializer, Serializer},
};
Expand Down Expand Up @@ -147,12 +147,16 @@ where
S: Serializer + Default,
C: Send + 'static,
{
///
/// Initialises empty routes, services, middleware, and application data.
/// Sets the default frame processor and serializer, with no connection
/// lifecycle hooks.
fn default() -> Self {
Self {
routes: HashMap::new(),
services: Vec::new(),
middleware: Vec::new(),
frame_processor: Box::new(LengthPrefixedProcessor),
frame_processor: Box::new(LengthPrefixedProcessor::new(LengthFormat::default())),
Comment thread
leynos marked this conversation as resolved.
serializer: S::default(),
app_data: HashMap::new(),
on_connect: None,
Expand Down
212 changes: 196 additions & 16 deletions src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,158 @@

Comment thread
leynos marked this conversation as resolved.
use std::io;

use bytes::{Buf, BytesMut};
use bytes::{Buf, BufMut, BytesMut};

/// Byte order used for encoding and decoding length prefixes.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Endianness {
/// Most significant byte first.
Big,
/// Least significant byte first.
Little,
}

/// Format of the length prefix preceding each frame.
#[derive(Clone, Copy, Debug)]
pub struct LengthFormat {
bytes: usize,
endianness: Endianness,
}

impl LengthFormat {
/// Creates a new `LengthFormat` with the specified number of bytes and
/// endianness for the length prefix.
///
/// # Parameters
/// - `bytes`: The number of bytes used for the length prefix.
/// - `endianness`: The byte order for encoding and decoding the length prefix.
///
/// # Returns
/// A `LengthFormat` configured with the given size and endianness.
#[must_use]
pub const fn new(bytes: usize, endianness: Endianness) -> Self { Self { bytes, endianness } }

/// Creates a `LengthFormat` for a 2-byte big-endian length prefix.
#[must_use]
pub const fn u16_be() -> Self { Self::new(2, Endianness::Big) }

/// Creates a `LengthFormat` for a 2-byte little-endian length prefix.
#[must_use]
pub const fn u16_le() -> Self { Self::new(2, Endianness::Little) }

/// Creates a `LengthFormat` for a 4-byte big-endian length prefix.
#[must_use]
pub const fn u32_be() -> Self { Self::new(4, Endianness::Big) }

/// Creates a `LengthFormat` for a 4-byte little-endian length prefix.
#[must_use]
pub const fn u32_le() -> Self { Self::new(4, Endianness::Little) }

/// Reads a length prefix from a byte slice according to the configured prefix size and
/// endianness.
///
/// # Parameters
/// - `bytes`: The byte slice containing the length prefix. Must be at least as long as the
/// configured prefix size.
///
/// # Returns
/// The decoded length as a `usize` if successful.
///
/// # Errors
/// Returns an error if the prefix size is unsupported or if the decoded length does not fit in
/// a `usize`.
fn read_len(&self, bytes: &[u8]) -> io::Result<usize> {
let len = match (self.bytes, self.endianness) {
(1, _) => u64::from(u8::from_ne_bytes([bytes[0]])),
(2, Endianness::Big) => u64::from(u16::from_be_bytes([bytes[0], bytes[1]])),
(2, Endianness::Little) => u64::from(u16::from_le_bytes([bytes[0], bytes[1]])),
(4, Endianness::Big) => {
u64::from(u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
}
(4, Endianness::Little) => {
u64::from(u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
Comment thread
leynos marked this conversation as resolved.
}
(8, Endianness::Big) => u64::from_be_bytes([
bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
]),
(8, Endianness::Little) => u64::from_le_bytes([
bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
]),
_ => {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"unsupported length prefix size",
));
}
};
usize::try_from(len).map_err(|_| io::Error::other("frame too large"))
}

/// Writes a length prefix to the destination buffer using the configured size and endianness.
///
/// Returns an error if the length is too large to fit in the configured prefix size or if the
/// prefix size is unsupported.
///
/// # Parameters
/// - `len`: The length value to encode and write.
/// - `dst`: The buffer to which the encoded length prefix will be appended.
///
/// # Errors
/// Returns an error if `len` exceeds the maximum value for the configured prefix size or if the
/// prefix size is not supported.
fn write_len(&self, len: usize, dst: &mut BytesMut) -> io::Result<()> {
match (self.bytes, self.endianness) {
(1, _) => dst.put_u8(
u8::try_from(len)
.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "frame too large"))?,
),
(2, Endianness::Big) => dst.put_slice(
&u16::try_from(len)
.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "frame too large"))?
.to_be_bytes(),
),
(2, Endianness::Little) => dst.put_slice(
&u16::try_from(len)
.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "frame too large"))?
.to_le_bytes(),
),
(4, Endianness::Big) => dst.put_slice(
&u32::try_from(len)
.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "frame too large"))?
.to_be_bytes(),
),
(4, Endianness::Little) => dst.put_slice(
&u32::try_from(len)
.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "frame too large"))?
.to_le_bytes(),
),
(8, Endianness::Big) => dst.put_slice(
&u64::try_from(len)
.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "frame too large"))?
.to_be_bytes(),
),
(8, Endianness::Little) => dst.put_slice(
&u64::try_from(len)
.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "frame too large"))?
.to_le_bytes(),
),
_ => {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"unsupported length prefix size",
));
}
}
Ok(())
}
}

impl Default for LengthFormat {
/// Returns a `LengthFormat` using a 4-byte big-endian length prefix.
///
/// This is the default format for length-prefixed framing.
fn default() -> Self { Self::u32_be() }
}

/// Trait defining how raw bytes are decoded into frames and how frames are
/// encoded back into bytes for transmission.
Expand Down Expand Up @@ -38,34 +189,63 @@ pub trait FrameProcessor: Send + Sync {
fn encode(&self, frame: &Self::Frame, dst: &mut BytesMut) -> Result<(), Self::Error>;
}

/// Simple length-prefixed framing using big-endian u32 lengths.
pub struct LengthPrefixedProcessor;
/// Simple length-prefixed framing using a configurable length prefix.
pub struct LengthPrefixedProcessor {
format: LengthFormat,
}

impl LengthPrefixedProcessor {
/// Creates a new `LengthPrefixedProcessor` with the specified length prefix
/// format.
///
/// # Parameters
/// - `format`: The length prefix format to use for framing.
///
/// # Returns
/// A `LengthPrefixedProcessor` configured with the given length format.
#[must_use]
pub const fn new(format: LengthFormat) -> Self { Self { format } }
}

impl Default for LengthPrefixedProcessor {
/// Creates a `LengthPrefixedProcessor` using the default length format (4-byte big-endian
/// prefix).
///
/// # Returns
/// A processor configured for 4-byte big-endian length-prefixed framing.
fn default() -> Self { Self::new(LengthFormat::default()) }
}

impl FrameProcessor for LengthPrefixedProcessor {
type Frame = Vec<u8>;
type Error = std::io::Error;

/// Attempts to decode a single length-prefixed frame from the source buffer.
///
/// Returns `Ok(Some(frame))` if a complete frame is available, `Ok(None)` if
/// more data is needed, or an error if the length prefix is invalid or cannot
/// be read according to the configured format.
///
/// The source buffer is advanced past the decoded frame and its length prefix.
fn decode(&self, src: &mut BytesMut) -> Result<Option<Self::Frame>, Self::Error> {
if src.len() < 4 {
if src.len() < self.format.bytes {
return Ok(None);
}
let mut len_bytes = [0u8; 4];
len_bytes.copy_from_slice(&src[..4]);
let len = u32::from_be_bytes(len_bytes);
let len_usize = usize::try_from(len).map_err(|_| io::Error::other("frame too large"))?;
if src.len() < 4 + len_usize {
let len = self.format.read_len(&src[..self.format.bytes])?;
if src.len() < self.format.bytes + len {
return Ok(None);
}
src.advance(4);
Ok(Some(src.split_to(len_usize).to_vec()))
src.advance(self.format.bytes);
Ok(Some(src.split_to(len).to_vec()))
}

/// Encodes a frame by prefixing it with its length and appending it to the destination buffer.
///
/// The length prefix format is determined by the processor's configuration. Returns an error
/// if the frame length cannot be represented in the configured format.
fn encode(&self, frame: &Self::Frame, dst: &mut BytesMut) -> Result<(), Self::Error> {
use bytes::BufMut;
dst.reserve(4 + frame.len());
let len = u32::try_from(frame.len())
.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "frame too large"))?;
dst.put_u32(len);
dst.reserve(self.format.bytes + frame.len());
self.format.write_len(frame.len(), dst)?;
dst.extend_from_slice(frame);
Ok(())
}
Expand Down
Loading