diff --git a/Cargo.lock b/Cargo.lock index 8e1f294a..603d21de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2987,6 +2987,7 @@ dependencies = [ "metrics-util", "rstest 0.18.2", "tokio", + "tokio-util", "wireframe", ] diff --git a/README.md b/README.md index 92dcc5c6..0a4e95cb 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,7 @@ reduce this boilerplate through layered abstractions: These layers correspond to the architecture outlined in the design document【F:docs/rust-binary-router-library-design.md†L292-L344】. -## API Overview +## API overview Applications are configured using a builder pattern similar to Actix Web. A `WireframeApp` defines routes and middleware, while `WireframeServer` manages @@ -86,7 +86,7 @@ binary protocol server. See the [full example](docs/rust-binary-router-library-design.md#5-6-illustrative-api-usage-examples) in the design document for further details. -## Custom Envelopes +## Custom envelopes `WireframeApp` defaults to a simple `Envelope` containing a message ID and raw payload bytes. Applications can supply their own envelope type by calling @@ -126,22 +126,21 @@ Use `None` rather than `Some(0)` when a frame lacks a correlation ID. See This allows integration with existing packet formats without modifying `handle_frame`. -## Response Serialization and Framing +## Response serialization and framing Handlers can return types implementing the `Responder` trait. These values are -encoded using the application's configured serializer and written back through -the `FrameProcessor`【F:docs/rust-binary-router-library-design.md†L724-L730】. +encoded using the application's configured serializer and framed by a +length‑delimited codec【F:docs/rust-binary-router-library-design.md†L724-L730】. -The included `LengthPrefixedProcessor` illustrates a simple framing strategy -that prefixes each frame with its length. The format is configurable (prefix -size and endianness) and defaults to a 4‑byte big‑endian length -prefix【F:docs/rust-binary-router-library-design.md†L1082-L1123】. +Frames are length prefixed using `tokio_util::codec::LengthDelimitedCodec`. The +prefix length and byte order are configurable and default to a 4‑byte +big‑endian header【F:docs/rust-binary-router-library-design.md†L1082-L1123】. ```rust let app = WireframeApp::new()?; ``` -## Connection Lifecycle +## Connection lifecycle Protocol callbacks are consolidated under the `WireframeProtocol` trait, replacing the individual `on_connection_setup`/`on_connection_teardown` @@ -195,7 +194,7 @@ impl WireframeProtocol for MySqlProtocolImpl { let app = WireframeApp::new().with_protocol(MySqlProtocolImpl); ``` -## Session Registry +## Session registry The \[`SessionRegistry`\] stores weak references to \[`PushHandle`\]s for active connections. Background tasks can look up a handle by \[`ConnectionId`\] @@ -221,7 +220,7 @@ fn on_connection_setup(&self, handle: PushHandle, _ctx: &mut Connection } ``` -## Custom Extractors +## Custom extractors Extractors are types that implement `FromMessageRequest`. When a handler lists an extractor as a parameter, `wireframe` automatically constructs it using the @@ -312,7 +311,7 @@ $ cargo run --example ping_pong $ printf '\x00\x00\x00\x08\x01\x00\x00\x00\x2a\x00\x00\x00' | nc 127.0.0.1 7878 | xxd ``` -## Current Limitations +## Current limitations Connection handling now processes frames and routes messages. Although the server is still experimental, it now compiles in release mode for evaluation or diff --git a/docs/asynchronous-outbound-messaging-design.md b/docs/asynchronous-outbound-messaging-design.md index 6ad8bd67..fa20051f 100644 --- a/docs/asynchronous-outbound-messaging-design.md +++ b/docs/asynchronous-outbound-messaging-design.md @@ -296,7 +296,7 @@ struct PushHandleInner { // The public, cloneable handle. #[derive(Clone)] -pub struct PushHandle(Arc>); + pub struct PushHandle(Arc>); pub enum PushPolicy { ReturnErrorIfFull, @@ -317,8 +317,39 @@ impl PushHandle { frame: F, priority: PushPriority, policy: PushPolicy, - ) -> Result<(), PushError>; -} + ) -> Result<(), PushError>; + } +``` + +The example below demonstrates pushing frames and returning a streamed +response. The [`async-stream`](https://docs.rs/async-stream) crate is the +canonical way to build dynamic `Response::Stream` values. + +```rust,no_run +use async_stream::try_stream; +use std::sync::Arc; +use wireframe::{app::{Envelope, WireframeApp}, Response}; + +#[tokio::main] +async fn main() -> std::io::Result<()> { + let app = WireframeApp::new()? + .route(1, Arc::new(|_: &Envelope| Box::pin(async { + Response::Stream(Box::pin(try_stream! { + yield b"ack".to_vec(); + })) + })))?; + + let (push, mut conn) = wireframe_testing::connect(app).await?; + tokio::spawn(async move { + push.push_high_priority(b"urgent".to_vec()).await.unwrap(); + push.push_low_priority(b"stats".to_vec()).await.unwrap(); + }); + + while let Some(frame) = conn.next().await { + println!("{:?}", frame); + } + Ok(()) + } ``` ```mermaid @@ -707,10 +738,23 @@ features of the 1.0 release. that urgent pushes can interrupt a long-running data stream. - **Message Fragmentation:** Pushes occur at the *logical frame* level. The - `FragmentAdapter` will operate at a lower layer in the `FrameProcessor` - stack, transparently splitting any large pushed frames before they are - written to the socket. The `PushHandle` and the application code that uses it - remain completely unaware of fragmentation. + `FragmentAdapter` will operate at a lower layer in the codec stack, + transparently splitting any large pushed frames before they are written to + the socket. The `PushHandle` and the application code that uses it remain + completely unaware of fragmentation. + +```rust +// Codec stack with explicit frame-size limits and fragmentation. +use tokio_util::codec::LengthDelimitedCodec; +const MAX_FRAME: usize = 64 * 1024; +let codec = LengthDelimitedCodec::builder() + .max_frame_length(MAX_FRAME) // 64 KiB cap to prevent OOM + .new_codec(); + +// Wrap the length-delimited frames with the fragmentation adapter. +// Pseudocode pending actual adapter API naming: +// let codec = FragmentAdapter::new(FragmentConfig::default()).wrap(codec); +``` ## 7. Use Cases diff --git a/docs/efficiency-report.md b/docs/efficiency-report.md deleted file mode 100644 index ed004504..00000000 --- a/docs/efficiency-report.md +++ /dev/null @@ -1,143 +0,0 @@ -# Wireframe Efficiency Improvement Report - -## Executive Summary - -This report documents efficiency improvement opportunities identified in the wireframe Rust library codebase. The analysis focused on memory allocations, unnecessary clones, and performance bottlenecks in the frame processing pipeline and connection handling. - -## Key Findings - -### 1. Frame Processor Unnecessary Allocation (HIGH IMPACT) - -**Location**: `src/frame/processor.rs:75` -**Issue**: The `LengthPrefixedProcessor::decode` method performs an unnecessary allocation by calling `.to_vec()` on a `BytesMut` returned from `split_to()`. - -```rust -// Current inefficient code: -Ok(Some(src.split_to(len).to_vec())) -``` - -**Impact**: This allocation occurs for every frame processed, creating performance overhead in high-throughput scenarios. - -**Recommendation**: Use `freeze().to_vec()` or explore changing the frame type to work directly with `Bytes` to avoid the conversion entirely. - -**Status**: ✅ FIXED - Optimized to use `freeze().to_vec()` which is more efficient. - -### 2. Connection Actor Clone Operations (MEDIUM IMPACT) - -**Location**: `src/connection.rs:195, 252` -**Issue**: Multiple `clone()` operations on `CancellationToken` and other types in the connection actor. - -```rust -pub fn shutdown_token(&self) -> CancellationToken { self.shutdown.clone() } -() = Self::await_shutdown(self.shutdown.clone()), if state.is_active() => Event::Shutdown, -``` - -**Impact**: Moderate - these clones are necessary for the async select pattern but could be optimized in some cases. - -**Recommendation**: Review if some clones can be avoided through better lifetime management. - -### 3. Middleware Chain Building (MEDIUM IMPACT) - -**Location**: `src/app.rs:599` -**Issue**: Handler cloning during middleware chain construction. - -```rust -let mut service = HandlerService::new(id, handler.clone()); -``` - -**Impact**: Moderate - occurs during application setup, not in hot path. - -**Recommendation**: Consider using `Arc` references more efficiently. - -### 4. Session Registry Operations (LOW-MEDIUM IMPACT) - -**Location**: `src/session.rs:47-55` - -**Issue**: `Vec::with_capacity` followed by potential reallocation during `retain_and_collect`. - -```rust -let mut out = Vec::with_capacity(self.0.len()); -``` - -**Impact**: Low to medium - depends on registry size and pruning frequency. - -**Recommendation**: Consider more efficient collection strategies for large registries. - -### 5. Vector Initializations (LOW IMPACT) - -**Location**: Various files - -**Issue**: Some `Vec::new()` calls that could use `with_capacity` when size is known. - -**Impact**: Low - minor allocation optimizations. - -**Recommendation**: Use `with_capacity` when the expected size is known. - -## Performance Characteristics - -### Frame Processing Pipeline - -- **Bottleneck**: Frame decode/encode operations in high-throughput scenarios -- **Critical Path**: `LengthPrefixedProcessor::decode` method -- **Optimization Priority**: High - affects every incoming frame - -### Connection Handling - -- **Bottleneck**: Connection actor event loop and fairness tracking -- **Critical Path**: `tokio::select!` in connection actor -- **Optimization Priority**: Medium - affects per-connection performance - -### Message Routing - -- **Bottleneck**: HashMap lookups for route resolution -- **Critical Path**: Route handler lookup in `WireframeApp` -- **Optimization Priority**: Low - HashMap lookups are already efficient - -## Implemented Optimizations - -### Frame Processor Optimization -**Change**: Modified `LengthPrefixedProcessor::decode` to use `freeze().to_vec()` instead of direct `.to_vec()`. - -**Before**: -```rust -Ok(Some(src.split_to(len).to_vec())) -``` - -**After**: -```rust -Ok(Some(src.split_to(len).freeze().to_vec())) -``` - -**Benefits**: -- Reduces memory allocations in the frame processing hot path -- Maintains API compatibility with existing code -- Improves performance for high-throughput scenarios -- No breaking changes to the public API - -## Future Optimization Opportunities - -1. **Frame Type Optimization**: Consider changing the frame type from `Vec` to `Bytes` to eliminate the final `.to_vec()` call entirely. - -2. **Connection Actor Pooling**: Implement connection actor pooling to reduce setup/teardown overhead. - -3. **Middleware Chain Caching**: Cache built middleware chains to avoid reconstruction. - -4. **Session Registry Batching**: Implement batched operations for session registry updates. - -5. **Zero-Copy Serialization**: Explore zero-copy serialization patterns where possible. - -## Testing and Validation - -All optimizations have been tested to ensure: - -- ✅ Compilation succeeds with `cargo check` -- ✅ No new clippy warnings introduced -- ✅ Existing test suite passes -- ✅ API compatibility maintained -- ✅ Performance improvement verified - -## Conclusion - -The implemented frame processor optimization provides immediate performance benefits for the most critical code path in the wireframe library. The additional opportunities identified in this report provide a roadmap for future performance improvements, prioritized by impact and implementation complexity. - -The changes maintain full backward compatibility while improving performance characteristics, making them safe to deploy in production environments. diff --git a/docs/frame-metadata.md b/docs/frame-metadata.md index 557096dc..ab028923 100644 --- a/docs/frame-metadata.md +++ b/docs/frame-metadata.md @@ -1,29 +1,38 @@ -# Parsing Frame Metadata +# Parsing frame metadata `FrameMetadata` allows a protocol to inspect frame headers before decoding the entire payload. This can be useful for routing decisions when the message type is encoded in a header field. -Implement the trait for your serializer or decoder that knows how to read the +Implement the trait for your serialiser or decoder that knows how to read the header bytes. Only the minimal header portion should be read, returning the -full frame and the number of bytes consumed from the input. +frame envelope and the number of bytes consumed from the input. ```rust -use wireframe::frame::{FrameMetadata, FrameProcessor}; +use wireframe::frame::FrameMetadata; use wireframe::app::Envelope; -use bytes::BytesMut; +use tokio_util::codec::{Decoder, Encoder}; +use bytes::{Bytes, BytesMut}; struct MyCodec; - -impl FrameProcessor for MyCodec { - type Frame = Vec; +// Example only: implement the minimal Decoder/Encoder surface for docs. +impl Decoder for MyCodec { + // Using BytesMut here mirrors LengthDelimitedCodec’s default Item. + type Item = BytesMut; type Error = std::io::Error; - fn decode(&self, src: &mut BytesMut) -> Result, Self::Error> { + fn decode( + &mut self, + _src: &mut BytesMut, + ) -> Result, Self::Error> { todo!() } +} + +impl Encoder for MyCodec { + type Error = std::io::Error; - fn encode(&self, frame: &Self::Frame, dst: &mut BytesMut) -> Result<(), Self::Error> { + fn encode(&mut self, _item: Bytes, _dst: &mut BytesMut) -> Result<(), Self::Error> { todo!() } } @@ -39,5 +48,5 @@ impl FrameMetadata for MyCodec { } ``` -In `WireframeApp` the metadata parser is used before deserialisation so routes +In `WireframeApp` the metadata parser is used before deserialization so routes can be selected as soon as the header is available. diff --git a/docs/generic-message-fragmentation-and-re-assembly-design.md b/docs/generic-message-fragmentation-and-re-assembly-design.md index f173b120..5804b499 100644 --- a/docs/generic-message-fragmentation-and-re-assembly-design.md +++ b/docs/generic-message-fragmentation-and-re-assembly-design.md @@ -1,12 +1,13 @@ -# Comprehensive Design: Generic Message Fragmentation & Re-assembly +# Comprehensive design: Generic message fragmentation & re‑assembly -## 1. Introduction & Philosophy +## 1. Introduction and philosophy -Many robust network protocols, from modern RPC systems to legacy financial -standards, require the ability to split a single logical message into multiple -physical frames. This can be necessary to respect MTU limits, handle large data -payloads, or align with encryption block sizes. `wireframe`'s current model, -which processes one inbound frame to one logical frame, cannot handle this. +Many robust network protocols, from modern Remote Procedure Call (RPC) systems +to legacy financial standards, require the ability to split a single logical +message into multiple physical frames. This can be necessary to respect Maximum +Transmission Unit (MTU) limits, handle large data payloads, or align with +encryption block sizes. `wireframe`'s current model, which processes one +inbound frame to one logical frame, cannot handle this. This document details the design for a generic, protocol-agnostic fragmentation and re-assembly layer. The core philosophy is to treat this as a **transparent @@ -16,47 +17,47 @@ This new layer will be responsible for automatically splitting oversized outbound frames and meticulously re-assembling inbound fragments into a single, coherent message before they reach the router. +> Status: design proposal. API names, trait bounds, and configuration shapes +> may change before stabilisation. + This feature is a critical component of the "Road to Wireframe 1.0," designed to seamlessly integrate with and underpin the streaming and server-push capabilities. -## 2. Design Goals & Requirements +## 2. Design goals and requirements The implementation must satisfy the following core requirements: - - + | ID | Goal | | --- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | | G1 | Transparent inbound re-assembly → The router and handlers must always receive one complete, logical Frame. | | G2 | Transparent outbound fragmentation when a payload exceeds a configurable, protocol-specific size. | | G3 | Pluggable Strategy: The logic for parsing and building fragment headers, detecting the final fragment, and managing sequence numbers must be supplied by the protocol implementation, not hard-coded in the framework. | -| G4 | DoS Protection: The re-assembly process must be hardened against resource exhaustion attacks via strict memory and time limits. | +| G4 | Denial‑of‑service (DoS) protection: The re-assembly process must be hardened against resource exhaustion attacks via strict memory and time limits. | | G5 | Zero Friction: Protocols that do not use fragmentation must incur no performance or complexity overhead. This feature must be strictly opt-in. | - +## 3. Core architecture: the `FragmentAdapter` -## 3. Core Architecture: The `FragmentAdapter` +The feature will be implemented as a codec middleware called `FragmentAdapter`. +It is instantiated with a protocol-specific `FragmentStrategy` and wraps any +subsequent codecs in the chain. -The feature will be implemented as a `FrameProcessor` middleware called -`FragmentAdapter`. It is instantiated with a protocol-specific -`FragmentStrategy` and wraps any subsequent processors in the chain. - -``` -Socket I/O ↔ ↔ [Compression] ↔ FragmentAdapter ↔ Router/Handlers +```plaintext +Socket I/O ↔ [Compression] ↔ FragmentAdapter ↔ Router/Handlers ``` This layered approach ensures that fragmentation is handled on clear-text, uncompressed data, as required by most protocol specifications. -### 3.1 State Management for Multiplexing +### 3.1 State management for multiplexing A critical requirement for modern protocols is the ability to handle interleaved fragments from different logical messages on the same connection. To support this, the `FragmentAdapter` will not maintain a single re-assembly state, but a map of concurrent re-assembly processes. -```Rust +```rust use dashmap::DashMap; use std::sync::atomic::AtomicU64; use std::time::{Duration, Instant}; @@ -77,6 +78,8 @@ pub struct FragmentAdapter { struct PartialMessage { /// The buffer holding the accumulating payload. buffer: BytesMut, + /// The advertised total payload size, if known. + expected_total: Option, /// The sequence number of the last fragment received. last_sequence: u64, /// The time the first fragment was received. @@ -88,7 +91,7 @@ The use of `dashmap::DashMap` allows for lock-free reads and sharded writes, providing efficient and concurrent access to the re-assembly buffers without blocking the entire connection task. -## 4. Public API: The `FragmentStrategy` Trait +## 4. Public API: the `FragmentStrategy` trait The power and flexibility of this feature come from the `FragmentStrategy` trait. Protocol implementers will provide a type that implements this trait to @@ -99,7 +102,7 @@ inject their specific fragmentation rules into the generic `FragmentAdapter`. The trait is designed to be context-aware and expressive, allowing it to model a wide range of protocols. -```Rust +```rust use bytes::BytesMut; use std::io; @@ -149,26 +152,57 @@ pub trait FragmentStrategy: 'static + Send + Sync { is_final: bool, msg_id: u64, seq: u64, - ); + ) -> io::Result<()>; } ``` ### 4.2 Configuration Developers will enable fragmentation by adding the `FragmentAdapter` to their -`FrameProcessor` chain via the `WireframeApp` builder. +codec chain via the `WireframeApp` builder. -```Rust -// Example: Configuring a server for MySQL-style fragmentation. +```rust +// Pseudo‑API: enable fragmentation with a strategy on the codec stack. WireframeServer::new(|| { - WireframeApp::new() - .route(...) + let mut app = WireframeApp::new(); + app + .codec({ + // Match the application buffer capacity to avoid default 8 MiB limits. + let cap = app.buffer_capacity(); + LengthDelimitedCodec::builder() + .max_frame_length(cap) + .new_codec() + }) + .codec(FragmentAdapter::new(MySqlStrategy::new())) + .route(/* ... */) }) ``` +Configure the same max frame length for all codec instances on inbound and +outbound paths. + +```rust,no_run +use async_stream::try_stream; +use wireframe::{app::WireframeApp, Response}; + +async fn fragmented() -> Response> { + // `FragmentAdapter` splits oversized payloads automatically. + Response::Vec(vec![vec![0_u8; 128 * 1024]]) +} + +async fn streamed() -> Response> { + Response::Stream(Box::pin(try_stream! { + yield vec![1, 2, 3]; + })) +} +``` + +`async-stream` is the canonical crate for constructing dynamic +`Response::Stream` values. + ## 5. Implementation Logic -### 5.1 Inbound Path (Re-assembly) +### 5.1 Inbound path (re‑assembly) The re-assembly logic is the most complex part of the feature and must be robust against errors and attacks. @@ -194,8 +228,9 @@ robust against errors and attacks. passed down the chain immediately without being buffered. - If `meta.is_final` is false, a new `PartialMessage` is created. The - buffer is pre-allocated if `meta.total_message_len` is `Some`. The - `started_at` timestamp is recorded. + buffer is pre-allocated if `meta.total_message_len` is `Some`, and + `expected_total` stores this hint. The `started_at` timestamp is + recorded. - **Continuing Message (**`.get_mut()`**):** @@ -210,15 +245,15 @@ robust against errors and attacks. - **Final Fragment:** If `meta.is_final` is true, the full payload is extracted from the `PartialMessage`, the entry is removed from the map, - and the complete logical frame is passed down the processor chain. + then pass the complete logical frame down the codec chain. -4. **Timeout Handling:** A separate, low-priority background task within the - `FragmentAdapter` will periodically iterate over the `reassembly_buffers`, - checking the `started_at` timestamp of each `PartialMessage`. Any entry that - has exceeded `reassembly_timeout` is removed, and a `WARN`-level `tracing` - event is emitted. +4. **Timeout handling:** Run a background task within the + `FragmentAdapter` that periodically iterates over the re‑assembly buffers, + checks each `PartialMessage`’s `started_at` timestamp, and removes any entry + that has exceeded the re‑assembly timeout, emitting a `WARN`‑level `tracing` + event. -### 5.2 Outbound Path (Fragmentation) +### 5.2 Outbound path (fragmentation) The outbound path is simpler and purely procedural. @@ -226,7 +261,7 @@ The outbound path is simpler and purely procedural. `frame.len()` against `strategy.max_fragment_payload(&frame)`. 2. **No Fragmentation:** If the frame is small enough, it is passed directly to - the next `FrameProcessor` in the chain. + the next codec in the chain. 3. **Fragmentation:** If the frame is too large: @@ -238,12 +273,12 @@ The outbound path is simpler and purely procedural. - For each chunk, it calls `strategy.encode_header()` to write the fragment header (with the correct `msg_id`, `seq`, and `is_final` flag) into a - temporary buffer, followed by the payload chunk. + buffer, propagating any error, followed by the payload chunk. - Each fully formed fragment is then passed individually to the next - `FrameProcessor`. + codec. -## 6. Synergy with Other 1.0 Features +## 6. Synergy with other 1.0 features This feature is designed as a foundational layer that other features build upon. @@ -257,17 +292,14 @@ This feature is designed as a foundational layer that other features build upon. fragmented before transmission. The application code remains blissfully unaware of the underlying network constraints. -## 7. Measurable Objectives & Success Criteria - - +## 7. Measurable objectives and success criteria + | Category | Objective | Success Metric | | --------------- | ------------------------------------------------------------------------------------------------------------------------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | | API Correctness | The FragmentStrategy trait and FragmentAdapter are implemented exactly as specified in this document. | 100% of the public API surface is present and correctly typed. | -| Functionality | A large logical frame is correctly split into N fragments, and a sequence of N fragments is correctly re-assembled into the original frame. | An end-to-end test confirms byte-for-byte identity of a 64 MiB payload after being fragmented and re-assembled. | +| Functionality | A large logical frame is correctly split into N fragments, and a sequence of N fragments is correctly re-assembled into the original frame. | An end-to-end test confirms byte-for-byte identity of a payload at the configured max_message_size after being fragmented and re-assembled. | | Multiplexing | The adapter can correctly re-assemble two messages whose fragments are interleaved. | A test sending fragments A1, B1, A2, B2, A3, B3 must result in two correctly re-assembled messages, A and B. | -| Resilience | The adapter protects against memory exhaustion from oversized messages. | A test sending fragments for a 2 GiB message against a 1 GiB max_message_size limit must terminate the connection and not allocate more than ~1 GiB of buffer memory. | +| Resilience | The adapter protects against memory exhaustion from oversized messages. | A test sending fragments that exceed max_message_size must terminate the connection and not allocate beyond the configured cap (including allocator overhead). | | Resilience | The adapter protects against resource leaks from abandoned partial messages. | A test that sends an initial fragment but never the final one must result in the partial buffer being purged after the reassembly_timeout duration has passed. | | Performance | The overhead for messages that do not require fragmentation is minimal. | A criterion benchmark passing a stream of small, non-fragmented frames through the FragmentAdapter must show < 5% throughput degradation compared to a build without the adapter. | - - diff --git a/docs/multi-packet-and-streaming-responses-design.md b/docs/multi-packet-and-streaming-responses-design.md index a9c55766..88c2d80e 100644 --- a/docs/multi-packet-and-streaming-responses-design.md +++ b/docs/multi-packet-and-streaming-responses-design.md @@ -71,9 +71,9 @@ The public API is designed for clarity, performance, and ergonomic flexibility. ### 4.1 The `Response` Enum The `Response` enum is the primary return type for all handlers. It is enhanced -to provide optimised paths for common response patterns. +to provide optimized paths for common response patterns. -```Rust +```rust use futures_core::stream::Stream; use std::pin::Pin; @@ -82,7 +82,7 @@ pub enum Response { /// A single frame reply. The most common case. Single(F), - /// An optimised response for a small, known list of frames. + /// An optimized response for a small, known list of frames. /// Avoids the overhead of boxing and dynamic dispatch for simple multi-part replies. Vec(Vec), @@ -109,9 +109,8 @@ To enable more robust error handling, a generic error enum will be introduced. This allows the framework and protocol implementations to distinguish between unrecoverable transport failures and logical, protocol-level errors. -```Rust +```rust /// A generic error type for wireframe operations. -# pub enum WireframeError { /// An error occurred in the underlying transport (e.g., socket closed). /// These are typically unrecoverable for the connection. @@ -139,7 +138,7 @@ The following examples illustrate how developers will use the new API. Existing code continues to work without modification, fulfilling goal **G4**. -```Rust +```rust async fn handle_ping(_req: Request) -> Result, MyError> { // `MyFrame` implements `Into>` Ok(build_pong_frame().into()) @@ -151,10 +150,10 @@ async fn handle_ping(_req: Request) -> Result, MyErro For simple, fixed-size multi-part responses, like a MySQL result set header, `Response::Vec` is both ergonomic and performant. -```Rust +```rust async fn handle_select_headers(_req: Request) -> Result, MyError> { // Pre-build frames for: column-count, column-def, EOF - let frames = vec!; + let frames = vec![]; Ok(Response::Vec(frames)) } ``` @@ -164,7 +163,7 @@ async fn handle_select_headers(_req: Request) -> Result Result, PgError> { @@ -182,10 +181,14 @@ async fn handle_copy_out(req: PgCopyRequest) -> Result`: For explicit error handling. If `Ok(response_message)`, the message is sent. If `Err(error_value)`, the error is processed by "wireframe's" error handling mechanism (see Section @@ -842,7 +834,7 @@ extractors (similar to how Actix Web extractors handle HTTP-specific parsing 22), "wireframe" handler functions can remain focused on the core business logic associated with each message type. -### 5.3. Data Extraction and Type Safety +### 5.3. Data extraction and type safety Inspired by Actix Web's extractors 22, "wireframe" will provide a type-safe mechanism for accessing data from incoming messages and connection context @@ -923,6 +915,8 @@ handlers and keeps the handler functions lean and focused on their specific business tasks, mirroring the benefits seen with Actix Web's `FromRequest` trait. +// Class diagram illustrating extractor relationships. + ```mermaid classDiagram class FromMessageRequest { @@ -956,7 +950,7 @@ classDiagram ConnectionInfo o-- SocketAddr ``` -### 5.4. Middleware and Extensibility +### 5.4. Middleware and extensibility "wireframe" will incorporate a middleware system conceptually similar to Actix Web's 5, allowing developers to inject custom logic into the message processing @@ -979,6 +973,8 @@ pipeline. The relationships among these components are illustrated in the following diagram: +// Class diagram of middleware traits and their interaction. + ```mermaid classDiagram class ServiceRequest { @@ -1047,8 +1043,7 @@ let logging = from_fn(|req, next| async move { times for handlers, or active connection counts. - **Frame/Message Transformation**: On-the-fly modification of frames or messages, such as encryption/decryption or compression/decompression. - (Though complex transformations might be better suited to the - `FrameProcessor` layer). + (Though complex transformations might be better suited to the codec layer). - **Request/Response Manipulation**: Modifying message content before it reaches a handler or before a response is sent. - **Connection Lifecycle Hooks**: Performing actions when connections are @@ -1066,7 +1061,7 @@ advantages provided by Actix Web's middleware architecture.26 For instance, a middleware could transparently handle session management for stateful binary protocols, abstracting this complexity away from individual message handlers. -### 5.5. Error Handling Strategy +### 5.5. Error handling strategy Robust and clear error handling is crucial for any network service. "wireframe" will provide a comprehensive error handling strategy. @@ -1141,7 +1136,7 @@ translated into meaningful responses for the client. "wireframe" will adopt similar principles to ensure that errors are handled gracefully and informatively. -### 5.6. Illustrative API Usage Examples +### 5.6. Illustrative API usage examples To demonstrate the intended simplicity and the Actix-Web-inspired API, concrete examples are invaluable. They make the abstract design tangible and showcase @@ -1172,12 +1167,12 @@ how "wireframe" aims to reduce source code complexity. } ``` -- **Frame Processor Implementation** (Simple length-prefixed framing using - `tokio-util`; invalid input or oversized frames return `io::Error` from both - decode and encode): +- **Codec implementation** (Simple length‑prefixed framing using + `tokio-util`; invalid input or oversized frames return `io::Error` from + both decode and encode): -```rust -// Crate: my_frame_processor.rs + ```rust + // Crate: my_codec.rs (codec implementation example) use bytes::{BytesMut, Buf, BufMut}; use tokio_util::codec::{Decoder, Encoder}; use byteorder::{BigEndian, ByteOrder}; @@ -1227,7 +1222,7 @@ impl> Encoder for LengthPrefixedCodec { ``` (Note: "wireframe" would abstract the direct use of `Encoder`/`Decoder` behind -its own `FrameProcessor` trait or provide helpers.) +its own codec trait or provide helpers.) - **Server Setup and Handler**: @@ -1242,7 +1237,7 @@ its own `FrameProcessor` trait or provide helpers.) serializer::BincodeSerializer, }; use my_protocol_messages::{EchoRequest, EchoResponse}; - use my_frame_processor::LengthPrefixedCodec; // Or wireframe's abstraction + use my_codec::LengthPrefixedCodec; // Or wireframe's abstraction use std::time::{SystemTime, UNIX_EPOCH}; // Define a message ID enum if not using type-based routing directly @@ -1270,7 +1265,6 @@ its own `FrameProcessor` trait or provide helpers.) WireframeServer::new(|| { WireframeApp::new() - //.frame_processor(LengthPrefixedCodec) // deprecated: framing handled by codec .serializer(BincodeSerializer) // Specify serializer .route(MyMessageType::Echo, handle_echo) // Route based on ID // OR if type-based routing is supported and EchoRequest has an ID: @@ -1382,7 +1376,6 @@ simplify server implementation. })); WireframeServer::new(move || { WireframeApp::new() - //.frame_processor(...) // deprecated: framing handled by codec .serializer(BincodeSerializer) .app_data(chat_state.clone()) .route(ChatMessageType::ClientJoin, handle_join) @@ -1485,14 +1478,14 @@ choices aim to mitigate them. connection-specific logic. This frees the developer from writing significant networking boilerplate. -- Trait-Based Framing (FrameProcessor): +- Trait-Based Framing (codec): - By allowing users to define or select a FrameProcessor implementation, - "wireframe" separates the logic of how bytes are grouped into frames from the - rest of the application. This means users can plug in different framing - strategies (length-prefixed, delimiter-based, etc.) without altering their - message definitions or handler logic. The library itself can provide common - framing implementations. + By allowing users to define or select a codec implementation, "wireframe" + separates the logic of how bytes are grouped into frames from the rest of the + application. This means users can plug in different framing strategies + (length-prefixed, delimiter-based, etc.) without altering their message + definitions or handler logic. The library itself can provide common codec + implementations. These abstractions collectively contribute to code that is not only less verbose but also more readable, maintainable, and testable. By reducing @@ -1532,9 +1525,8 @@ applicability. - **Advanced Framing Options**: - - **Built-in Codecs**: Providing a richer set of built-in `FrameProcessor` - implementations for common framing techniques beyond simple - length-prefixing, such as: + - **Built-in Codecs**: Providing a richer set of built-in codecs for common + framing techniques beyond simple length-prefixing, such as: - COBS (Consistent Overhead Byte Stuffing), which `postcard` already supports for its serialization output. - SLIP (Serial Line Internet Protocol) framing. diff --git a/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md b/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md index 7a70c955..7e269f9f 100644 --- a/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md +++ b/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md @@ -131,10 +131,10 @@ layer, the `FragmentAdapter`. #### The `FragmentAdapter` and `FragmentStrategy` -This feature is designed for pluggability. The `FragmentAdapter` is a -`FrameProcessor` that sits between the raw I/O and the main router. It contains -the generic logic for splitting large outbound frames and re-assembling inbound -fragments. +This feature is designed for pluggability. The `FragmentAdapter` is a codec +(for example, `tokio_util::codec`) that sits between the raw I/O and the main +router. It contains the generic logic for splitting large outbound frames and +re-assembling inbound fragments. The protocol-specific rules—how to parse a fragment header, determine the payload length, and identify the final fragment—are provided by the user via a diff --git a/docs/wireframe-1-0-detailed-development-roadmap.md b/docs/wireframe-1-0-detailed-development-roadmap.md index bc753bcd..c2b3214f 100644 --- a/docs/wireframe-1-0-detailed-development-roadmap.md +++ b/docs/wireframe-1-0-detailed-development-roadmap.md @@ -20,14 +20,14 @@ public consumption. and message processing. This phase establishes the internal architecture upon which all public-facing features will be built.* -| Item | Name | Details | Size | Depends on | -| ---- | ------------------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ | ---------- | -| 1.1 | Core Response & Error Types | Define the new `Response` enum with `Single`, `Vec`, `Stream` and `Empty` variants. Implement the generic `WireframeError` enum to distinguish between I/O and protocol errors. | Small | — | -| 1.2 | Priority Push Channels | Implement the internal dual-channel `mpsc` mechanism within the connection state to handle high-priority and low-priority pushed frames. | Medium | — | -| 1.3 | Connection Actor Write Loop | Convert per-request workers into stateful connection actors. Implement a `select!(biased; …)` loop that polls for shutdown signals, high-/low-priority pushes and the handler response stream in that strict order. | Large | #1.2 | -| 1.4 | Initial FragmentStrategy Trait | Define the initial `FragmentStrategy` trait and the `FragmentMeta` struct. Focus on the core methods: `decode_header` and `encode_header`. | Medium | — | -| 1.5 | Basic FragmentAdapter | Implement the `FragmentAdapter` as a `FrameProcessor`. Build the inbound reassembly logic for a single, non-multiplexed stream of fragments, and the outbound logic for splitting a single large frame. | Large | #1.4 | -| 1.6 | Internal Hook Plumbing | Add the invocation points for the protocol-specific hooks (`before_send`, `on_command_end`, etc.) within the connection actor, even if the public trait is not yet defined. | Small | #1.3 | +| Item | Name | Details | Size | Depends on | +| ---- | ------------------------------ | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ | ---------- | +| 1.1 | Core Response & Error Types | Define the new `Response` enum with `Single`, `Vec`, `Stream` and `Empty` variants. Implement the generic `WireframeError` enum to distinguish between I/O and protocol errors. | Small | — | +| 1.2 | Priority Push Channels | Implement the internal dual-channel `mpsc` mechanism within the connection state to handle high-priority and low-priority pushed frames. | Medium | — | +| 1.3 | Connection Actor Write Loop | Convert per-request workers into stateful connection actors. Implement a `select!(biased; …)` loop that polls for shutdown signals, high-/low-priority pushes and the handler response stream in that strict order. | Large | #1.2 | +| 1.4 | Initial FragmentStrategy Trait | Define the initial `FragmentStrategy` trait and the `FragmentMeta` struct. Focus on the core methods: `decode_header` and `encode_header`. | Medium | — | +| 1.5 | Basic FragmentAdapter | Implement the `FragmentAdapter` as a codec (for example, via `tokio_util::codec`). Build the inbound reassembly logic for a single, non-multiplexed stream of fragments, and the outbound logic for splitting a single large frame. | Large | #1.4 | +| 1.6 | Internal Hook Plumbing | Add the invocation points for the protocol-specific hooks (`before_send`, `on_command_end`, etc.) within the connection actor, even if the public trait is not yet defined. | Small | #1.3 | ## Phase 2: Public APIs & Developer Ergonomics diff --git a/docs/wireframe-client-design.md b/docs/wireframe-client-design.md index 731cde8d..34f0d981 100644 --- a/docs/wireframe-client-design.md +++ b/docs/wireframe-client-design.md @@ -1,4 +1,4 @@ -# Client Support in Wireframe +# Client support in Wireframe This document proposes an initial design for adding client-side protocol support to `wireframe`. The goal is to reuse the existing framing, @@ -14,7 +14,7 @@ logic. The design document outlines these layers, which process frames from raw bytes to typed messages and back[^1]. Reusing these pieces enables the implementation of a lightweight client without duplicating protocol code. -## Core Components +## Core components ### `WireframeClient` @@ -23,11 +23,15 @@ mirrors `WireframeServer` but operates in the opposite direction: - Connect to a `TcpStream`. - Optionally, send a preamble using the existing `Preamble` helpers. -- Encode outgoing messages using the selected `Serializer` and `FrameProcessor`. +- Encode outgoing messages using the selected `Serializer` and + `tokio_util::codec::LengthDelimitedCodec` (4‑byte big‑endian prefix by + default; configurable). Configure the codec’s `max_frame_length` on both the + inbound (decode) and outbound (encode) paths to match the server’s frame + capacity; otherwise, frames larger than the default 8 MiB will fail. - Decode incoming frames into typed responses. - Expose async `send` and `receive` operations. -### Builder Pattern +### Builder pattern A `WireframeClient::builder()` method configures the client: @@ -42,7 +46,7 @@ The same `Serializer` trait used by the server is reused here, ensuring messages are encoded consistently while framing is handled by the length‑delimited codec. -### Request/Response Helpers +### Request/response helpers To keep the API simple, `WireframeClient` offers a `call` method that sends a message implementing `Message` and waits for the next response frame: @@ -56,13 +60,13 @@ Internally, this uses the `Serializer` to encode the request, sends it through the length‑delimited codec, then waits for a frame, decodes it, and deserializes the response type. -### Connection Lifecycle +### Connection lifecycle Like the server, the client should expose hooks for connection setup and -teardown. These mirror the server’s lifecycle callbacks so both sides can share -initialization logic. +teardown. These mirror the server’s lifecycle callbacks, so both sides can +share initialization logic. -## Example Usage +## Example usage ```rust #[tokio::main] @@ -79,7 +83,7 @@ async fn main() -> std::io::Result<()> { } ``` -## Future Work +## Future work This initial design focuses on a basic request/response workflow. Future extensions might include: @@ -91,5 +95,6 @@ extensions might include: By leveraging the existing abstractions for framing and serialization, client support can share most of the server’s implementation while providing a small ergonomic API. -[^1]: See [wireframe router - design](rust-binary-router-library-design.md#implementation-details). + +[^1]: See + [wireframe router design](rust-binary-router-library-design.md#implementation-details). diff --git a/examples/metadata_routing.rs b/examples/metadata_routing.rs index d23b6222..654f39a9 100644 --- a/examples/metadata_routing.rs +++ b/examples/metadata_routing.rs @@ -7,15 +7,13 @@ use std::{io, sync::Arc}; use bytes::BytesMut; use tokio::io::{AsyncWriteExt, duplex}; -use wireframe::{ - app::Envelope, - frame::{FrameMetadata, FrameProcessor, LengthPrefixedProcessor}, - message::Message, - serializer::Serializer, -}; +use tokio_util::codec::{Encoder, LengthDelimitedCodec}; +use wireframe::{app::Envelope, frame::FrameMetadata, message::Message, serializer::Serializer}; type App = wireframe::app::WireframeApp; +const MAX_FRAME: usize = 64 * 1024; + /// Frame format with a two-byte id, one-byte flags, and bincode payload. #[derive(Default)] struct HeaderSerializer; @@ -51,11 +49,8 @@ impl FrameMetadata for HeaderSerializer { // ignores the flags, but a real protocol might parse and act on these // bits. let _ = src[2]; - let payload = src[3..].to_vec(); - // `parse` receives the complete frame because `LengthPrefixedProcessor` - // ensures `src` contains exactly one message. Returning `src.len()` is - // therefore correct for this demo. - Ok((Envelope::new(id, None, payload), src.len())) + // Only extract metadata here; defer payload handling to the serializer. + Ok((Envelope::new(id, None, Vec::new()), 3)) } } @@ -96,9 +91,12 @@ async fn main() -> io::Result<()> { frame.push(0); frame.extend_from_slice(&payload); - let mut bytes = BytesMut::new(); - LengthPrefixedProcessor::default() - .encode(&frame, &mut bytes) + let mut codec = LengthDelimitedCodec::builder() + .max_frame_length(MAX_FRAME) // 64 KiB cap for the example + .new_codec(); + let mut bytes = BytesMut::with_capacity(frame.len() + 4); // +4 for the length prefix + codec + .encode(frame.into(), &mut bytes) .expect("failed to encode frame"); client.write_all(&bytes).await?; diff --git a/src/app/builder.rs b/src/app/builder.rs index 3e18f327..4c3ab02d 100644 --- a/src/app/builder.rs +++ b/src/app/builder.rs @@ -13,10 +13,7 @@ use std::{ sync::Arc, }; -use tokio::{ - io, - sync::{OnceCell, mpsc}, -}; +use tokio::sync::{OnceCell, mpsc}; use super::{ envelope::{Envelope, Packet}, @@ -36,7 +33,7 @@ const MAX_READ_TIMEOUT_MS: u64 = 86_400_000; /// /// # Examples /// -/// ```no_run +/// ```rust,no_run /// use std::sync::Arc; /// /// use wireframe::app::ConnectionSetup; @@ -49,7 +46,7 @@ pub type ConnectionSetup = dyn Fn() -> Pin + Send> /// /// # Examples /// -/// ```no_run +/// ```rust,no_run /// use std::sync::Arc; /// /// use wireframe::app::ConnectionTeardown; @@ -74,14 +71,6 @@ pub struct WireframeApp< pub(super) handlers: HashMap>, pub(super) routes: OnceCell>>>, pub(super) middleware: Vec>>, - #[expect( - dead_code, - reason = "Deprecated: retained temporarily for API compatibility until codec-based \ - framing is fully removed" - )] - #[allow(unfulfilled_lint_expectations)] - pub(super) frame_processor: - Box, Error = io::Error> + Send + Sync>, pub(super) serializer: S, pub(super) app_data: HashMap>, pub(super) on_connect: Option>>, @@ -114,16 +103,13 @@ where C: Send + 'static, E: Packet, { - /// Initializes empty routes, middleware, and application data with a - /// placeholder frame processor and serializer, and no lifecycle hooks. + /// Initialises empty routes, middleware, and application data with the + /// default serializer and no lifecycle hooks. fn default() -> Self { Self { handlers: HashMap::new(), routes: OnceCell::new(), middleware: Vec::new(), - frame_processor: Box::new(crate::frame::LengthPrefixedProcessor::new( - crate::frame::LengthFormat::default(), - )), serializer: S::default(), app_data: HashMap::new(), on_connect: None, @@ -153,20 +139,9 @@ where /// /// ``` /// use wireframe::app::WireframeApp; - /// WireframeApp::<_, _, wireframe::app::Envelope>::new().expect("failed to initialize app"); + /// WireframeApp::<_, _, wireframe::app::Envelope>::new().expect("failed to initialise app"); /// ``` pub fn new() -> Result { Ok(Self::default()) } - - /// Construct a new application builder using a custom envelope type. - /// - /// Deprecated: call [`WireframeApp::new`] with explicit envelope type - /// parameters. - /// - /// # Errors - /// - /// Currently always succeeds. - #[deprecated(note = "use `WireframeApp::new()` instead")] - pub fn new_with_envelope() -> Result { Self::new() } } impl WireframeApp @@ -259,7 +234,6 @@ where handlers: self.handlers, routes: OnceCell::new(), middleware: self.middleware, - frame_processor: self.frame_processor, serializer: self.serializer, app_data: self.app_data, on_connect: Some(Arc::new(move || Box::pin(f()))), @@ -349,19 +323,6 @@ where .unwrap_or_default() } - /// Set the frame processor used for encoding and decoding frames. - #[deprecated(note = "framing is handled by the connection codec; this method will be removed")] - #[must_use] - pub fn frame_processor

(self, processor: P) -> Self - where - P: crate::frame::FrameProcessor, Error = io::Error> + Send + Sync + 'static, - { - WireframeApp { - frame_processor: Box::new(processor), - ..self - } - } - /// Replace the serializer used for messages. #[must_use] pub fn serializer(self, serializer: Ser) -> WireframeApp @@ -372,7 +333,6 @@ where handlers: self.handlers, routes: OnceCell::new(), middleware: self.middleware, - frame_processor: self.frame_processor, serializer, app_data: self.app_data, on_connect: self.on_connect, diff --git a/src/app/connection.rs b/src/app/connection.rs index 04947571..d2aa8a14 100644 --- a/src/app/connection.rs +++ b/src/app/connection.rs @@ -37,6 +37,14 @@ where C: Send + 'static, E: Packet, { + /// Construct a length-delimited codec capped by the application's buffer + /// capacity. + fn new_length_codec(&self) -> LengthDelimitedCodec { + LengthDelimitedCodec::builder() + .max_frame_length(self.buffer_capacity) + .new_codec() + } + /// Serialize `msg` and write it to `stream` using a length-delimited codec. /// /// # Errors @@ -55,8 +63,8 @@ where .serializer .serialize(msg) .map_err(SendError::Serialize)?; - let mut codec = LengthDelimitedCodec::builder().new_codec(); - let mut framed = BytesMut::new(); + let mut codec = self.new_length_codec(); + let mut framed = BytesMut::with_capacity(bytes.len() + 4); codec .encode(bytes.into(), &mut framed) .map_err(|e| SendError::Io(io::Error::new(io::ErrorKind::InvalidData, e)))?; @@ -158,7 +166,7 @@ where where W: AsyncRead + AsyncWrite + Unpin, { - let codec = LengthDelimitedCodec::builder().new_codec(); + let codec = self.new_length_codec(); let mut framed = Framed::new(stream, codec); framed.read_buffer_mut().reserve(self.buffer_capacity); let mut deser_failures = 0u32; diff --git a/src/extractor.rs b/src/extractor.rs index fcd2f801..bfbd1a14 100644 --- a/src/extractor.rs +++ b/src/extractor.rs @@ -189,25 +189,6 @@ pub trait FromMessageRequest: Sized { #[derive(Clone)] pub struct SharedState(Arc); -impl SharedState { - /// Creates a new [`SharedState`] instance wrapping the provided `Arc`. - /// - /// # Examples - /// - /// ```rust,no_run - /// use std::sync::Arc; - /// - /// use wireframe::extractor::SharedState; - /// - /// let data = Arc::new(5u32); - /// let state: SharedState = Arc::clone(&data).into(); - /// assert_eq!(*state, 5); - /// ``` - #[must_use] - #[deprecated(since = "0.2.0", note = "construct via `inner.into()` instead")] - pub fn new(inner: Arc) -> Self { Self(inner) } -} - impl From> for SharedState { fn from(inner: Arc) -> Self { Self(inner) } } diff --git a/src/frame/format.rs b/src/frame/format.rs index b791569d..3b722d14 100644 --- a/src/frame/format.rs +++ b/src/frame/format.rs @@ -17,15 +17,45 @@ pub enum Endianness { /// Format of the length prefix preceding each frame. #[derive(Clone, Copy, Debug)] pub struct LengthFormat { - pub(crate) bytes: usize, - pub(crate) endianness: Endianness, + bytes: usize, + endianness: Endianness, } impl LengthFormat { /// Creates a new `LengthFormat` with the specified number of bytes and /// endianness for the length prefix. + /// + /// # Panics + /// + /// Panics if `bytes` is not in `1..=8`. #[must_use] - pub const fn new(bytes: usize, endianness: Endianness) -> Self { Self { bytes, endianness } } + pub const fn new(bytes: usize, endianness: Endianness) -> Self { + assert!(matches!(bytes, 1..=8), "invalid length-prefix width"); + Self { bytes, endianness } + } + + /// Fallible constructor validating the prefix width. + /// + /// # Errors + /// + /// Returns an error if `bytes` is not in `1..=8`. + pub fn try_new(bytes: usize, endianness: Endianness) -> io::Result { + if !(1..=8).contains(&bytes) { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "invalid length-prefix width", + )); + } + Ok(Self { bytes, endianness }) + } + + /// Returns the prefix width in bytes. + #[must_use] + pub const fn bytes(&self) -> usize { self.bytes } + + /// Returns the endianness used for the prefix. + #[must_use] + pub const fn endianness(&self) -> Endianness { self.endianness } /// Creates a `LengthFormat` for a 2-byte big-endian length prefix. #[must_use] @@ -43,7 +73,13 @@ impl LengthFormat { #[must_use] pub const fn u32_le() -> Self { Self::new(4, Endianness::Little) } - pub(crate) fn read_len(&self, bytes: &[u8]) -> io::Result { + /// Read a length prefix from `bytes` according to this format. + /// + /// # Errors + /// Returns an error if `bytes` are shorter than the prefix, if the + /// configured prefix width is not in `1..=8`, or if the encoded length + /// exceeds `usize`. + pub fn read_len(&self, bytes: &[u8]) -> io::Result { let len = bytes_to_u64(bytes, self.bytes, self.endianness)?; usize::try_from(len).map_err(|_| { io::Error::new( @@ -53,7 +89,11 @@ impl LengthFormat { }) } - pub(crate) fn write_len(&self, len: usize, dst: &mut BytesMut) -> io::Result<()> { + /// Write `len` to `dst` using this format's prefix encoding. + /// + /// # Errors + /// Returns an error if `len` cannot be represented by the prefix size. + pub fn write_len(&self, len: usize, dst: &mut BytesMut) -> io::Result<()> { let mut buf = [0u8; 8]; let written = u64_to_bytes(len, self.bytes, self.endianness, &mut buf)?; dst.extend_from_slice(&buf[..written]); diff --git a/src/frame/metadata.rs b/src/frame/metadata.rs new file mode 100644 index 00000000..690758be --- /dev/null +++ b/src/frame/metadata.rs @@ -0,0 +1,40 @@ +//! Trait for parsing frame metadata from a header without decoding the payload. + +/// Parse frame metadata from a byte slice without decoding the payload. +/// +/// # Examples +/// +/// ```rust,no_run +/// use std::io; +/// +/// use bytes::Bytes; +/// struct Demo; +/// impl FrameMetadata for Demo { +/// type Frame = (u32, Bytes); // (id, payload) +/// type Error = io::Error; +/// fn parse(&self, src: &[u8]) -> Result<(Self::Frame, usize), Self::Error> { +/// if src.len() < 4 { +/// return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "len")); +/// } +/// let id = u32::from_be_bytes([src[0], src[1], src[2], src[3]]); +/// Ok(((id, Bytes::copy_from_slice(&src[4..])), src.len())) +/// } +/// } +/// ``` +pub trait FrameMetadata { + /// Fully deserialised frame envelope type. + /// Prefer avoiding unnecessary allocations. Zero‑copy is only possible + /// when the caller supplies an owned, ref‑counted buffer (e.g., `Bytes`). + /// With the current `&[u8]` input, copying the payload is typically required. + type Frame; + + /// Error produced when parsing the metadata. + type Error: std::error::Error + Send + Sync + 'static; + + /// Parse frame metadata from `src`, returning the envelope and the number + /// of bytes consumed. + /// + /// # Errors + /// Returns an error if the bytes cannot be interpreted as valid metadata. + fn parse(&self, src: &[u8]) -> Result<(Self::Frame, usize), Self::Error>; +} diff --git a/src/frame/mod.rs b/src/frame/mod.rs index 4e65738d..6f1bd73d 100644 --- a/src/frame/mod.rs +++ b/src/frame/mod.rs @@ -1,12 +1,13 @@ -//! Frame encoding utilities and length-prefixed processors. +//! Frame encoding utilities and length-prefix helpers built around +//! `tokio_util::codec::LengthDelimitedCodec` (4-byte big-endian by default). pub mod conversion; pub mod format; -pub mod processor; +pub mod metadata; pub use conversion::{bytes_to_u64, u64_to_bytes}; pub use format::{Endianness, LengthFormat}; -pub use processor::{FrameMetadata, FrameProcessor, LengthPrefixedProcessor}; +pub use metadata::FrameMetadata; #[cfg(test)] mod tests; diff --git a/src/frame/processor.rs b/src/frame/processor.rs deleted file mode 100644 index 1c02c204..00000000 --- a/src/frame/processor.rs +++ /dev/null @@ -1,84 +0,0 @@ -//! Frame processor implementations. - -use bytes::{Buf, BytesMut}; - -use super::{conversion::ERR_FRAME_TOO_LARGE, format::LengthFormat}; - -/// Trait defining how raw bytes are decoded into frames and encoded back. -pub trait FrameProcessor: Send + Sync { - /// Logical frame type extracted from the stream. - type Frame; - - /// Error type returned by `decode` and `encode`. - type Error: std::error::Error + Send + Sync + 'static; - - /// Attempt to decode the next frame from `src`. - /// - /// # Errors - /// Returns an error if the bytes in `src` cannot be parsed into a complete frame. - fn decode(&self, src: &mut BytesMut) -> Result, Self::Error>; - - /// Encode `frame` and append the bytes to `dst`. - /// - /// # Errors - /// Returns an error if the frame cannot be written to `dst`. - fn encode(&self, frame: &Self::Frame, dst: &mut BytesMut) -> Result<(), Self::Error>; -} - -/// Trait for parsing frame metadata from a header without decoding the payload. -pub trait FrameMetadata { - /// Fully deserialised frame type. - type Frame; - - /// Error produced when parsing the metadata. - type Error: std::error::Error + Send + Sync + 'static; - - /// Parse frame metadata from `src`, returning the frame and bytes consumed. - /// - /// # Errors - /// Returns an error if the bytes cannot be interpreted as valid metadata. - fn parse(&self, src: &[u8]) -> Result<(Self::Frame, usize), Self::Error>; -} - -/// Simple length-prefixed framing using a configurable length prefix. -#[derive(Clone, Copy, Debug)] -pub struct LengthPrefixedProcessor { - format: LengthFormat, -} - -impl LengthPrefixedProcessor { - /// Creates a new `LengthPrefixedProcessor` with the specified length prefix format. - #[must_use] - pub const fn new(format: LengthFormat) -> Self { Self { format } } -} - -impl Default for LengthPrefixedProcessor { - fn default() -> Self { Self::new(LengthFormat::default()) } -} - -impl FrameProcessor for LengthPrefixedProcessor { - type Frame = Vec; - type Error = std::io::Error; - - fn decode(&self, src: &mut BytesMut) -> Result, Self::Error> { - if src.len() < self.format.bytes { - return Ok(None); - } - let len = self.format.read_len(&src[..self.format.bytes])?; - let needed = self.format.bytes.checked_add(len).ok_or_else(|| { - std::io::Error::new(std::io::ErrorKind::InvalidInput, ERR_FRAME_TOO_LARGE) - })?; - if src.len() < needed { - return Ok(None); - } - src.advance(self.format.bytes); - Ok(Some(src.split_to(len).freeze().to_vec())) - } - - fn encode(&self, frame: &Self::Frame, dst: &mut BytesMut) -> Result<(), Self::Error> { - dst.reserve(self.format.bytes + frame.len()); - self.format.write_len(frame.len(), dst)?; - dst.extend_from_slice(frame); - Ok(()) - } -} diff --git a/tests/lifecycle.rs b/tests/lifecycle.rs index 709a6e14..23182b42 100644 --- a/tests/lifecycle.rs +++ b/tests/lifecycle.rs @@ -12,12 +12,12 @@ use std::{ }; use bytes::BytesMut; +use tokio_util::codec::{Decoder, Encoder}; use wireframe::{ app::{Envelope, Packet, PacketParts}, - frame::FrameProcessor, serializer::{BincodeSerializer, Serializer}, }; -use wireframe_testing::{processor, run_app, run_with_duplex_server}; +use wireframe_testing::{TEST_MAX_FRAME, new_test_codec, run_app, run_with_duplex_server}; type App = wireframe::app::WireframeApp; type BasicApp = wireframe::app::WireframeApp; @@ -147,9 +147,10 @@ async fn helpers_preserve_correlation_id_and_run_callbacks() { let bytes = BincodeSerializer .serialize(&env) .expect("failed to serialise envelope"); - let mut frame = BytesMut::new(); - let proc = processor(); - proc.encode(&bytes, &mut frame) + let mut frame = BytesMut::with_capacity(bytes.len() + 4); + let mut codec = new_test_codec(TEST_MAX_FRAME); + codec + .encode(bytes.into(), &mut frame) .expect("encode should succeed"); let out = run_app(app, vec![frame.to_vec()], None) @@ -158,11 +159,11 @@ async fn helpers_preserve_correlation_id_and_run_callbacks() { assert!(!out.is_empty()); let mut buf = BytesMut::from(&out[..]); - let processor = processor(); - let decoded = processor + let decoded = codec .decode(&mut buf) .expect("decode failed") .expect("frame missing"); + assert!(buf.is_empty(), "unexpected trailing bytes after decode"); let (resp, _) = BincodeSerializer .deserialize::(&decoded) .expect("deserialize failed"); diff --git a/tests/middleware_order.rs b/tests/middleware_order.rs index f823d467..29ec397b 100644 --- a/tests/middleware_order.rs +++ b/tests/middleware_order.rs @@ -5,12 +5,13 @@ use async_trait::async_trait; use bytes::BytesMut; use tokio::io::{AsyncReadExt, AsyncWriteExt, duplex}; +use tokio_util::codec::{Decoder, Encoder}; use wireframe::{ app::{Envelope, Handler}, - frame::{FrameProcessor, LengthPrefixedProcessor}, middleware::{HandlerService, Service, ServiceRequest, ServiceResponse, Transform}, serializer::{BincodeSerializer, Serializer}, }; +use wireframe_testing::{TEST_MAX_FRAME, new_test_codec}; type TestApp = wireframe::app::WireframeApp; @@ -69,10 +70,12 @@ async fn middleware_applied_in_reverse_order() { let env = Envelope::new(1, Some(7), vec![b'X']); let serializer = BincodeSerializer; let bytes = serializer.serialize(&env).expect("serialization failed"); - // Use the default 4-byte big-endian length prefix for framing - let processor = LengthPrefixedProcessor::default(); - let mut buf = BytesMut::new(); - processor.encode(&bytes, &mut buf).expect("encoding failed"); + // Use a length-delimited codec for framing + let mut codec = new_test_codec(TEST_MAX_FRAME); + let mut buf = BytesMut::with_capacity(bytes.len() + 4); + codec + .encode(bytes.into(), &mut buf) + .expect("encoding failed"); client.write_all(&buf).await.expect("write failed"); client.shutdown().await.expect("shutdown failed"); @@ -83,10 +86,11 @@ async fn middleware_applied_in_reverse_order() { handle.await.expect("join failed"); let mut buf = BytesMut::from(&out[..]); - let frame = processor + let frame = codec .decode(&mut buf) .expect("decode failed") .expect("frame missing"); + assert!(buf.is_empty(), "unexpected trailing bytes after frame"); let (resp, _) = serializer .deserialize::(&frame) .expect("deserialize failed"); diff --git a/tests/response.rs b/tests/response.rs index 41d484c4..a4dac65a 100644 --- a/tests/response.rs +++ b/tests/response.rs @@ -3,18 +3,26 @@ //! These verify normal encoding as well as error conditions like //! write failures and encode errors. +use std::sync::Arc; + use bytes::BytesMut; use rstest::rstest; +use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec}; use wireframe::{ - app::{Envelope, WireframeApp}, - frame::{Endianness, FrameProcessor, LengthFormat, LengthPrefixedProcessor}, + Serializer, + app::{Envelope, Packet, WireframeApp}, + frame::{Endianness, LengthFormat}, message::Message, serializer::BincodeSerializer, }; +use wireframe_testing::{TEST_MAX_FRAME, new_test_codec, run_app}; mod common; use common::TestApp; +// Larger cap used for oversized frame tests. +const LARGE_FRAME: usize = 16 * 1024 * 1024; + #[derive(bincode::Encode, bincode::BorrowDecode, PartialEq, Debug)] struct TestResp(u32); @@ -38,6 +46,9 @@ impl<'de> bincode::BorrowDecode<'de, ()> for FailingResp { } } +#[derive(bincode::Encode, bincode::BorrowDecode, PartialEq, Debug)] +struct Large(Vec); + /// Tests that sending a response serializes and frames the data correctly, /// and that the response can be decoded and deserialized back to its original value asynchronously. #[tokio::test] @@ -49,14 +60,15 @@ async fn send_response_encodes_and_frames() { .await .expect("send_response failed"); - let processor = LengthPrefixedProcessor::default(); + let mut codec = new_test_codec(TEST_MAX_FRAME); let mut buf = BytesMut::from(&out[..]); - let frame = processor + let frame = codec .decode(&mut buf) .expect("decode failed") .expect("frame missing"); let (decoded, _) = TestResp::from_bytes(&frame).expect("deserialize failed"); assert_eq!(decoded, TestResp(7)); + assert!(buf.is_empty(), "unexpected trailing bytes after decode"); } /// Tests that decoding with an incomplete length prefix header returns `None` and does not consume @@ -65,23 +77,24 @@ async fn send_response_encodes_and_frames() { /// This ensures that the decoder waits for the full header before attempting to decode a frame. #[tokio::test] async fn length_prefixed_decode_requires_complete_header() { - let processor = LengthPrefixedProcessor::default(); + let mut codec = new_test_codec(TEST_MAX_FRAME); let mut buf = BytesMut::from(&[0x00, 0x00, 0x00][..]); // only 3 bytes - assert!(processor.decode(&mut buf).expect("decode failed").is_none()); + assert!(codec.decode(&mut buf).expect("decode failed").is_none()); assert_eq!(buf.len(), 3); // nothing consumed } /// Tests that decoding with a complete length prefix but incomplete frame data returns `None` -/// and retains all bytes in the buffer. +/// and consumes only the 4-byte length prefix. /// -/// Ensures that the decoder does not consume any bytes when the full frame is not yet available. +/// Confirms that the decoder leaves the incomplete body in the buffer until the full frame arrives. #[tokio::test] async fn length_prefixed_decode_requires_full_frame() { - let processor = LengthPrefixedProcessor::default(); + let mut codec = new_test_codec(TEST_MAX_FRAME); let mut buf = BytesMut::from(&[0x00, 0x00, 0x00, 0x05, 0x01, 0x02][..]); - assert!(processor.decode(&mut buf).expect("decode failed").is_none()); - // buffer should retain bytes since frame isn't complete - assert_eq!(buf.len(), 6); + assert!(codec.decode(&mut buf).expect("decode failed").is_none()); + // LengthDelimitedCodec consumes the length prefix even if the frame + // remains incomplete. + assert_eq!(buf.len(), 2); } struct FailingWriter; @@ -118,15 +131,24 @@ fn custom_length_roundtrip( #[case] frame: Vec, #[case] prefix: Vec, ) { - let processor = LengthPrefixedProcessor::new(fmt); - let mut buf = BytesMut::new(); - processor.encode(&frame, &mut buf).expect("encode failed"); + let mut builder = LengthDelimitedCodec::builder(); + builder.length_field_length(fmt.bytes()); + if fmt.endianness() == Endianness::Little { + builder.little_endian(); + } + builder.max_frame_length(TEST_MAX_FRAME); + let mut codec = builder.new_codec(); + let mut buf = BytesMut::with_capacity(frame.len() + prefix.len()); + codec + .encode(frame.clone().into(), &mut buf) + .expect("encode failed"); assert_eq!(&buf[..prefix.len()], &prefix[..]); - let decoded = processor + let decoded = codec .decode(&mut buf) .expect("decode failed") .expect("frame missing"); assert_eq!(decoded, frame); + assert!(buf.is_empty(), "unexpected trailing bytes after decode"); } #[tokio::test] @@ -143,29 +165,17 @@ async fn send_response_propagates_write_error() { #[rstest] #[case(0, Endianness::Big)] -#[case(3, Endianness::Big)] -#[case(5, Endianness::Little)] +#[case(9, Endianness::Little)] fn encode_fails_for_invalid_prefix_size(#[case] bytes: usize, #[case] endian: Endianness) { - let fmt = LengthFormat::new(bytes, endian); - let processor = LengthPrefixedProcessor::new(fmt); - let mut buf = BytesMut::new(); - let err = processor - .encode(&vec![1, 2], &mut buf) - .expect_err("encode must fail for unsupported prefix size"); + let err = LengthFormat::try_new(bytes, endian).expect_err("invalid width should error"); assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput); } #[rstest] #[case(0, Endianness::Little)] -#[case(3, Endianness::Little)] -#[case(5, Endianness::Big)] +#[case(9, Endianness::Big)] fn decode_fails_for_invalid_prefix_size(#[case] bytes: usize, #[case] endian: Endianness) { - let fmt = LengthFormat::new(bytes, endian); - let processor = LengthPrefixedProcessor::new(fmt); - let mut buf = BytesMut::from(vec![0u8; bytes].as_slice()); - let err = processor - .decode(&mut buf) - .expect_err("decode must fail for unsupported prefix size"); + let err = LengthFormat::try_new(bytes, endian).expect_err("invalid width should error"); assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput); } @@ -173,11 +183,10 @@ fn decode_fails_for_invalid_prefix_size(#[case] bytes: usize, #[case] endian: En #[case(LengthFormat::new(1, Endianness::Big), 256)] #[case(LengthFormat::new(2, Endianness::Little), 65_536)] fn encode_fails_for_length_too_large(#[case] fmt: LengthFormat, #[case] len: usize) { - let processor = LengthPrefixedProcessor::new(fmt); let frame = vec![0u8; len]; let mut buf = BytesMut::new(); - let err = processor - .encode(&frame, &mut buf) + let err = fmt + .write_len(frame.len(), &mut buf) .expect_err("expected error"); assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput); } @@ -188,7 +197,7 @@ fn encode_fails_for_length_too_large(#[case] fmt: LengthFormat, #[case] len: usi /// error is of the `Serialize` variant, indicating a failure during response encoding. #[tokio::test] async fn send_response_returns_encode_error() { - // Intentionally do not set a frame processor: encode should fail before framing. + // Use a type that fails during serialization; encode should fail before any framing. let app = WireframeApp::::new().expect("failed to create app"); let err = app .send_response(&mut Vec::new(), &FailingResp) @@ -196,3 +205,65 @@ async fn send_response_returns_encode_error() { .expect_err("send_response should fail when encode errors"); assert!(matches!(err, wireframe::app::SendError::Serialize(_))); } + +/// Ensures `send_response` permits frames up to the configured buffer capacity, +/// exceeding the codec's default 8 MiB limit. +#[tokio::test] +async fn send_response_honours_buffer_capacity() { + let app = TestApp::new() + .expect("failed to create app") + .buffer_capacity(LARGE_FRAME); + + let payload = vec![0_u8; 9 * 1024 * 1024]; + let large = Large(payload.clone()); + let mut out = Vec::new(); + + app.send_response(&mut out, &large) + .await + .expect("send_response failed"); + + let mut codec = new_test_codec(LARGE_FRAME); + let mut buf = BytesMut::from(&out[..]); + let frame = codec + .decode(&mut buf) + .expect("decode failed") + .expect("frame missing"); + let (decoded, _) = Large::from_bytes(&frame).expect("deserialize failed"); + assert_eq!(decoded.0.len(), payload.len()); +} + +/// Verifies inbound and outbound codecs respect the application's buffer +/// capacity by round-tripping a 9 MiB payload. +#[tokio::test] +async fn process_stream_honours_buffer_capacity() { + let app = TestApp::new() + .expect("failed to create app") + .buffer_capacity(LARGE_FRAME) + .route(1, Arc::new(|_: &Envelope| Box::pin(async {}))) + .expect("route registration failed"); + + let payload = vec![0_u8; 9 * 1024 * 1024]; + let env = Envelope::new(1, None, payload.clone()); + let bytes = BincodeSerializer.serialize(&env).expect("serialize failed"); + + let mut codec = new_test_codec(LARGE_FRAME); + let mut framed = BytesMut::with_capacity(bytes.len() + 4); + codec + .encode(bytes.into(), &mut framed) + .expect("encode frame failed"); + + let out = run_app(app, vec![framed.to_vec()], Some(10 * 1024 * 1024)) + .await + .expect("run_app failed"); + + let mut buf = BytesMut::from(&out[..]); + let frame = codec + .decode(&mut buf) + .expect("decode failed") + .expect("frame missing"); + let (resp_env, _) = BincodeSerializer + .deserialize::(&frame) + .expect("deserialize failed"); + let resp_len = resp_env.into_parts().payload().len(); + assert_eq!(resp_len, payload.len()); +} diff --git a/tests/routes.rs b/tests/routes.rs index aafc793b..c725bebe 100644 --- a/tests/routes.rs +++ b/tests/routes.rs @@ -9,14 +9,14 @@ use std::sync::{ use bytes::BytesMut; use rstest::rstest; +use tokio_util::codec::{Decoder, Encoder}; use wireframe::{ Serializer, app::{Packet, PacketParts}, - frame::{FrameProcessor, LengthPrefixedProcessor}, message::Message, serializer::BincodeSerializer, }; -use wireframe_testing::{drive_with_bincode, drive_with_frames}; +use wireframe_testing::{TEST_MAX_FRAME, drive_with_bincode, drive_with_frames, new_test_codec}; type TestApp = wireframe::app::WireframeApp; @@ -82,11 +82,13 @@ async fn handler_receives_message_and_echoes_response() { .await .expect("drive_with_bincode failed"); + let mut codec = new_test_codec(TEST_MAX_FRAME); let mut buf = BytesMut::from(&out[..]); - let frame = LengthPrefixedProcessor::default() + let frame = codec .decode(&mut buf) .expect("decode failed") .expect("frame missing"); + assert!(buf.is_empty(), "unexpected trailing bytes after decode"); let (resp_env, _) = BincodeSerializer .deserialize::(&frame) .expect("deserialize failed"); @@ -114,11 +116,13 @@ async fn handler_echoes_with_none_correlation_id() { }; let out = drive_with_bincode(app, env).await.expect("drive failed"); + let mut codec = new_test_codec(TEST_MAX_FRAME); let mut buf = BytesMut::from(&out[..]); - let frame = LengthPrefixedProcessor::default() + let frame = codec .decode(&mut buf) .expect("decode failed") .expect("missing frame"); + assert!(buf.is_empty(), "unexpected trailing bytes after decode"); let (resp_env, _) = BincodeSerializer .deserialize::(&frame) .expect("deserialize failed"); @@ -138,31 +142,31 @@ async fn multiple_frames_processed_in_sequence() { ) .expect("route registration failed"); - let frames: Vec> = (1u8..=2) - .map(|id| { - let msg_bytes = Echo(id).to_bytes().expect("encode failed"); - let env = TestEnvelope { - id: 1, - correlation_id: Some(u64::from(id)), - payload: msg_bytes, - }; - let env_bytes = BincodeSerializer - .serialize(&env) - .expect("serialization failed"); - let mut framed = BytesMut::new(); - LengthPrefixedProcessor::default() - .encode(&env_bytes, &mut framed) - .expect("encode failed"); - framed.to_vec() - }) - .collect(); - - let out = drive_with_frames(app, frames) + let mut codec = new_test_codec(TEST_MAX_FRAME); + let mut encoded_frames = Vec::new(); + for id in 1u8..=2 { + let msg_bytes = Echo(id).to_bytes().expect("encode failed"); + let env = TestEnvelope { + id: 1, + correlation_id: Some(u64::from(id)), + payload: msg_bytes, + }; + let env_bytes = BincodeSerializer + .serialize(&env) + .expect("serialization failed"); + let mut framed = BytesMut::with_capacity(env_bytes.len() + 4); + codec + .encode(env_bytes.into(), &mut framed) + .expect("encode failed"); + encoded_frames.push(framed.to_vec()); + } + + let out = drive_with_frames(app, encoded_frames) .await .expect("drive_with_frames failed"); let mut buf = BytesMut::from(&out[..]); - let first = LengthPrefixedProcessor::default() + let first = codec .decode(&mut buf) .expect("decode failed") .expect("frame missing"); @@ -170,10 +174,11 @@ async fn multiple_frames_processed_in_sequence() { .deserialize::(&first) .expect("deserialize failed"); let (echo1, _) = Echo::from_bytes(&env1.payload).expect("decode echo failed"); - let second = LengthPrefixedProcessor::default() + let second = codec .decode(&mut buf) .expect("decode failed") .expect("frame missing"); + assert!(buf.is_empty(), "unexpected trailing bytes after two frames"); let (env2, _) = BincodeSerializer .deserialize::(&second) .expect("deserialize failed"); @@ -206,19 +211,21 @@ async fn single_frame_propagates_correlation_id(#[case] cid: Option) { }; let env_bytes = BincodeSerializer.serialize(&env).expect("serialize failed"); - let mut framed = BytesMut::new(); - LengthPrefixedProcessor::default() - .encode(&env_bytes, &mut framed) + let mut framed = BytesMut::with_capacity(env_bytes.len() + 4); + let mut codec = new_test_codec(TEST_MAX_FRAME); + codec + .encode(env_bytes.into(), &mut framed) .expect("encode failed"); let out = drive_with_frames(app, vec![framed.to_vec()]) .await .expect("drive failed"); let mut buf = BytesMut::from(&out[..]); - let frame = LengthPrefixedProcessor::default() + let frame = codec .decode(&mut buf) .expect("decode failed") .expect("missing"); + assert!(buf.is_empty(), "unexpected trailing bytes after decode"); let (resp, _) = BincodeSerializer .deserialize::(&frame) .expect("deserialize failed"); diff --git a/wireframe_testing/Cargo.toml b/wireframe_testing/Cargo.toml index 9b3af948..66516013 100644 --- a/wireframe_testing/Cargo.toml +++ b/wireframe_testing/Cargo.toml @@ -13,3 +13,4 @@ logtest = "2" log = "0.4" metrics-util = "0.20" futures = "0.3" +tokio-util = { version = "0.7", features = ["codec"] } diff --git a/wireframe_testing/src/helpers.rs b/wireframe_testing/src/helpers.rs index 08f232a2..16c69497 100644 --- a/wireframe_testing/src/helpers.rs +++ b/wireframe_testing/src/helpers.rs @@ -7,20 +7,14 @@ use std::io; use bincode::config; use bytes::BytesMut; -use rstest::fixture; use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream, duplex}; +use tokio_util::codec::{Encoder, LengthDelimitedCodec}; use wireframe::{ app::{Envelope, Packet, WireframeApp}, - frame::{FrameMetadata, FrameProcessor, LengthPrefixedProcessor}, + frame::FrameMetadata, serializer::Serializer, }; -/// Create a default length-prefixed frame processor for tests. -#[fixture] -#[expect(unused_braces, reason = "Braces are intentional here; false positive")] -#[allow(unfulfilled_lint_expectations)] -pub fn processor() -> LengthPrefixedProcessor { LengthPrefixedProcessor::default() } - pub trait TestSerializer: Serializer + FrameMetadata + Send + Sync + 'static { @@ -98,6 +92,15 @@ where const DEFAULT_CAPACITY: usize = 4096; const MAX_CAPACITY: usize = 1024 * 1024 * 10; // 10MB limit pub(crate) const EMPTY_SERVER_CAPACITY: usize = 64; +/// Shared frame cap used by helpers and tests to avoid drift. +pub const TEST_MAX_FRAME: usize = DEFAULT_CAPACITY; + +#[inline] +pub fn new_test_codec(max_len: usize) -> LengthDelimitedCodec { + let mut builder = LengthDelimitedCodec::builder(); + builder.max_frame_length(max_len); + builder.new_codec() +} macro_rules! forward_default { ( @@ -353,8 +356,9 @@ where format!("bincode encode failed: {e}"), ) })?; - let mut framed = BytesMut::new(); - LengthPrefixedProcessor::default().encode(&bytes, &mut framed)?; + let mut codec = new_test_codec(DEFAULT_CAPACITY); + let mut framed = BytesMut::with_capacity(bytes.len() + 4); + codec.encode(bytes.into(), &mut framed)?; drive_with_frame(app, framed.to_vec()).await } @@ -424,6 +428,31 @@ where Ok(buf) } +#[cfg(test)] +mod tests { + use wireframe::app::WireframeApp; + + use super::*; + + #[tokio::test] + async fn run_app_rejects_zero_capacity() { + let app = WireframeApp::new().expect("failed to create app"); + let err = run_app(app, vec![], Some(0)) + .await + .expect_err("capacity of zero should error"); + assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput); + } + + #[tokio::test] + async fn run_app_rejects_excess_capacity() { + let app = WireframeApp::new().expect("failed to create app"); + let err = run_app(app, vec![], Some(MAX_CAPACITY + 1)) + .await + .expect_err("capacity beyond max should error"); + assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput); + } +} + /// Run `app` against an empty duplex stream. /// /// This helper drives the connection lifecycle without sending any frames, diff --git a/wireframe_testing/src/lib.rs b/wireframe_testing/src/lib.rs index b3913181..4a118539 100644 --- a/wireframe_testing/src/lib.rs +++ b/wireframe_testing/src/lib.rs @@ -22,6 +22,7 @@ pub mod helpers; pub mod logging; pub use helpers::{ + TEST_MAX_FRAME, TestSerializer, drive_with_bincode, drive_with_frame, @@ -30,7 +31,7 @@ pub use helpers::{ drive_with_frames, drive_with_frames_mut, drive_with_frames_with_capacity, - processor, + new_test_codec, run_app, run_with_duplex_server, };