From 3cd28dfbdeacbacaf86cbab0cf006230dd4af4fc Mon Sep 17 00:00:00 2001 From: Leynos Date: Fri, 29 Aug 2025 09:55:32 +0100 Subject: [PATCH 01/10] Add frame metadata module --- Cargo.lock | 1 + README.md | 11 +- .../asynchronous-outbound-messaging-design.md | 8 +- docs/efficiency-report.md | 143 ------------------ docs/frame-metadata.md | 20 ++- ...ge-fragmentation-and-re-assembly-design.md | 12 +- docs/rust-binary-router-library-design.md | 86 ++++------- ...-set-philosophy-and-capability-maturity.md | 7 +- ...eframe-1-0-detailed-development-roadmap.md | 2 +- docs/wireframe-client-design.md | 3 +- examples/metadata_routing.rs | 15 +- src/app/builder.rs | 46 +----- src/frame/format.rs | 17 ++- src/frame/metadata.rs | 16 ++ src/frame/mod.rs | 6 +- src/frame/processor.rs | 84 ---------- tests/lifecycle.rs | 12 +- tests/middleware_order.rs | 12 +- tests/response.rs | 49 +++--- tests/routes.rs | 26 ++-- wireframe_testing/Cargo.toml | 1 + wireframe_testing/src/helpers.rs | 13 +- wireframe_testing/src/lib.rs | 1 - 23 files changed, 171 insertions(+), 420 deletions(-) delete mode 100644 docs/efficiency-report.md create mode 100644 src/frame/metadata.rs delete mode 100644 src/frame/processor.rs diff --git a/Cargo.lock b/Cargo.lock index 8e1f294a..603d21de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2987,6 +2987,7 @@ dependencies = [ "metrics-util", "rstest 0.18.2", "tokio", + "tokio-util", "wireframe", ] diff --git a/README.md b/README.md index 92dcc5c6..d3561986 100644 --- a/README.md +++ b/README.md @@ -129,13 +129,12 @@ This allows integration with existing packet formats without modifying ## Response Serialization and Framing Handlers can return types implementing the `Responder` trait. These values are -encoded using the application's configured serializer and written back through -the `FrameProcessor`【F:docs/rust-binary-router-library-design.md†L724-L730】. +encoded using the application's configured serializer and framed by a +length‑delimited codec【F:docs/rust-binary-router-library-design.md†L724-L730】. -The included `LengthPrefixedProcessor` illustrates a simple framing strategy -that prefixes each frame with its length. The format is configurable (prefix -size and endianness) and defaults to a 4‑byte big‑endian length -prefix【F:docs/rust-binary-router-library-design.md†L1082-L1123】. +Frames are length prefixed using `tokio_util::codec::LengthDelimitedCodec`. The +prefix length and byte order are configurable and default to a 4‑byte +big‑endian header【F:docs/rust-binary-router-library-design.md†L1082-L1123】. ```rust let app = WireframeApp::new()?; diff --git a/docs/asynchronous-outbound-messaging-design.md b/docs/asynchronous-outbound-messaging-design.md index 6ad8bd67..7b32d429 100644 --- a/docs/asynchronous-outbound-messaging-design.md +++ b/docs/asynchronous-outbound-messaging-design.md @@ -707,10 +707,10 @@ features of the 1.0 release. that urgent pushes can interrupt a long-running data stream. - **Message Fragmentation:** Pushes occur at the *logical frame* level. The - `FragmentAdapter` will operate at a lower layer in the `FrameProcessor` - stack, transparently splitting any large pushed frames before they are - written to the socket. The `PushHandle` and the application code that uses it - remain completely unaware of fragmentation. + `FragmentAdapter` will operate at a lower layer in the codec stack, + transparently splitting any large pushed frames before they are written to + the socket. The `PushHandle` and the application code that uses it remain + completely unaware of fragmentation. ## 7. Use Cases diff --git a/docs/efficiency-report.md b/docs/efficiency-report.md deleted file mode 100644 index ed004504..00000000 --- a/docs/efficiency-report.md +++ /dev/null @@ -1,143 +0,0 @@ -# Wireframe Efficiency Improvement Report - -## Executive Summary - -This report documents efficiency improvement opportunities identified in the wireframe Rust library codebase. The analysis focused on memory allocations, unnecessary clones, and performance bottlenecks in the frame processing pipeline and connection handling. - -## Key Findings - -### 1. Frame Processor Unnecessary Allocation (HIGH IMPACT) - -**Location**: `src/frame/processor.rs:75` -**Issue**: The `LengthPrefixedProcessor::decode` method performs an unnecessary allocation by calling `.to_vec()` on a `BytesMut` returned from `split_to()`. - -```rust -// Current inefficient code: -Ok(Some(src.split_to(len).to_vec())) -``` - -**Impact**: This allocation occurs for every frame processed, creating performance overhead in high-throughput scenarios. - -**Recommendation**: Use `freeze().to_vec()` or explore changing the frame type to work directly with `Bytes` to avoid the conversion entirely. - -**Status**: ✅ FIXED - Optimized to use `freeze().to_vec()` which is more efficient. - -### 2. Connection Actor Clone Operations (MEDIUM IMPACT) - -**Location**: `src/connection.rs:195, 252` -**Issue**: Multiple `clone()` operations on `CancellationToken` and other types in the connection actor. - -```rust -pub fn shutdown_token(&self) -> CancellationToken { self.shutdown.clone() } -() = Self::await_shutdown(self.shutdown.clone()), if state.is_active() => Event::Shutdown, -``` - -**Impact**: Moderate - these clones are necessary for the async select pattern but could be optimized in some cases. - -**Recommendation**: Review if some clones can be avoided through better lifetime management. - -### 3. Middleware Chain Building (MEDIUM IMPACT) - -**Location**: `src/app.rs:599` -**Issue**: Handler cloning during middleware chain construction. - -```rust -let mut service = HandlerService::new(id, handler.clone()); -``` - -**Impact**: Moderate - occurs during application setup, not in hot path. - -**Recommendation**: Consider using `Arc` references more efficiently. - -### 4. Session Registry Operations (LOW-MEDIUM IMPACT) - -**Location**: `src/session.rs:47-55` - -**Issue**: `Vec::with_capacity` followed by potential reallocation during `retain_and_collect`. - -```rust -let mut out = Vec::with_capacity(self.0.len()); -``` - -**Impact**: Low to medium - depends on registry size and pruning frequency. - -**Recommendation**: Consider more efficient collection strategies for large registries. - -### 5. Vector Initializations (LOW IMPACT) - -**Location**: Various files - -**Issue**: Some `Vec::new()` calls that could use `with_capacity` when size is known. - -**Impact**: Low - minor allocation optimizations. - -**Recommendation**: Use `with_capacity` when the expected size is known. - -## Performance Characteristics - -### Frame Processing Pipeline - -- **Bottleneck**: Frame decode/encode operations in high-throughput scenarios -- **Critical Path**: `LengthPrefixedProcessor::decode` method -- **Optimization Priority**: High - affects every incoming frame - -### Connection Handling - -- **Bottleneck**: Connection actor event loop and fairness tracking -- **Critical Path**: `tokio::select!` in connection actor -- **Optimization Priority**: Medium - affects per-connection performance - -### Message Routing - -- **Bottleneck**: HashMap lookups for route resolution -- **Critical Path**: Route handler lookup in `WireframeApp` -- **Optimization Priority**: Low - HashMap lookups are already efficient - -## Implemented Optimizations - -### Frame Processor Optimization -**Change**: Modified `LengthPrefixedProcessor::decode` to use `freeze().to_vec()` instead of direct `.to_vec()`. - -**Before**: -```rust -Ok(Some(src.split_to(len).to_vec())) -``` - -**After**: -```rust -Ok(Some(src.split_to(len).freeze().to_vec())) -``` - -**Benefits**: -- Reduces memory allocations in the frame processing hot path -- Maintains API compatibility with existing code -- Improves performance for high-throughput scenarios -- No breaking changes to the public API - -## Future Optimization Opportunities - -1. **Frame Type Optimization**: Consider changing the frame type from `Vec` to `Bytes` to eliminate the final `.to_vec()` call entirely. - -2. **Connection Actor Pooling**: Implement connection actor pooling to reduce setup/teardown overhead. - -3. **Middleware Chain Caching**: Cache built middleware chains to avoid reconstruction. - -4. **Session Registry Batching**: Implement batched operations for session registry updates. - -5. **Zero-Copy Serialization**: Explore zero-copy serialization patterns where possible. - -## Testing and Validation - -All optimizations have been tested to ensure: - -- ✅ Compilation succeeds with `cargo check` -- ✅ No new clippy warnings introduced -- ✅ Existing test suite passes -- ✅ API compatibility maintained -- ✅ Performance improvement verified - -## Conclusion - -The implemented frame processor optimization provides immediate performance benefits for the most critical code path in the wireframe library. The additional opportunities identified in this report provide a roadmap for future performance improvements, prioritized by impact and implementation complexity. - -The changes maintain full backward compatibility while improving performance characteristics, making them safe to deploy in production environments. diff --git a/docs/frame-metadata.md b/docs/frame-metadata.md index 557096dc..b0f8b491 100644 --- a/docs/frame-metadata.md +++ b/docs/frame-metadata.md @@ -9,21 +9,29 @@ header bytes. Only the minimal header portion should be read, returning the full frame and the number of bytes consumed from the input. ```rust -use wireframe::frame::{FrameMetadata, FrameProcessor}; +use wireframe::frame::FrameMetadata; use wireframe::app::Envelope; -use bytes::BytesMut; +use tokio_util::codec::{Decoder, Encoder}; +use bytes::{Bytes, BytesMut}; struct MyCodec; -impl FrameProcessor for MyCodec { - type Frame = Vec; +impl Decoder for MyCodec { + type Item = Bytes; type Error = std::io::Error; - fn decode(&self, src: &mut BytesMut) -> Result, Self::Error> { + fn decode( + &mut self, + _src: &mut BytesMut, + ) -> Result, Self::Error> { todo!() } +} + +impl Encoder for MyCodec { + type Error = std::io::Error; - fn encode(&self, frame: &Self::Frame, dst: &mut BytesMut) -> Result<(), Self::Error> { + fn encode(&mut self, _item: Bytes, _dst: &mut BytesMut) -> Result<(), Self::Error> { todo!() } } diff --git a/docs/generic-message-fragmentation-and-re-assembly-design.md b/docs/generic-message-fragmentation-and-re-assembly-design.md index f173b120..683e7e1f 100644 --- a/docs/generic-message-fragmentation-and-re-assembly-design.md +++ b/docs/generic-message-fragmentation-and-re-assembly-design.md @@ -38,9 +38,9 @@ The implementation must satisfy the following core requirements: ## 3. Core Architecture: The `FragmentAdapter` -The feature will be implemented as a `FrameProcessor` middleware called -`FragmentAdapter`. It is instantiated with a protocol-specific -`FragmentStrategy` and wraps any subsequent processors in the chain. +The feature will be implemented as a codec middleware called `FragmentAdapter`. +It is instantiated with a protocol-specific `FragmentStrategy` and wraps any +subsequent codecs in the chain. ``` Socket I/O ↔ ↔ [Compression] ↔ FragmentAdapter ↔ Router/Handlers @@ -156,7 +156,7 @@ pub trait FragmentStrategy: 'static + Send + Sync { ### 4.2 Configuration Developers will enable fragmentation by adding the `FragmentAdapter` to their -`FrameProcessor` chain via the `WireframeApp` builder. +codec chain via the `WireframeApp` builder. ```Rust // Example: Configuring a server for MySQL-style fragmentation. @@ -226,7 +226,7 @@ The outbound path is simpler and purely procedural. `frame.len()` against `strategy.max_fragment_payload(&frame)`. 2. **No Fragmentation:** If the frame is small enough, it is passed directly to - the next `FrameProcessor` in the chain. + the next codec in the chain. 3. **Fragmentation:** If the frame is too large: @@ -241,7 +241,7 @@ The outbound path is simpler and purely procedural. temporary buffer, followed by the payload chunk. - Each fully formed fragment is then passed individually to the next - `FrameProcessor`. + codec. ## 6. Synergy with Other 1.0 Features diff --git a/docs/rust-binary-router-library-design.md b/docs/rust-binary-router-library-design.md index 34debb68..a22352b9 100644 --- a/docs/rust-binary-router-library-design.md +++ b/docs/rust-binary-router-library-design.md @@ -397,36 +397,23 @@ needs, without requiring modifications to other parts of the system. ### 4.3. Frame Definition and Processing -To handle "arbitrary frame-based protocols," "wireframe" must provide a -flexible way to define and process frames. - -- `FrameProcessor` **(or Tokio** `Decoder`**/**`Encoder` **integration)**: The - core of frame handling will revolve around a user-implementable trait, - tentatively named `FrameProcessor`. Alternatively, and perhaps preferably for - ecosystem compatibility, "wireframe" could directly leverage Tokio's - `tokio_util::codec::{Decoder, Encoder}` traits. Users would implement these - traits to define: - - - **Decoding**: How a raw byte stream from the network is parsed into discrete - frames. This logic would handle issues like partial reads and buffering, - accumulating bytes until one or more complete frames can be extracted. - Examples include length-prefixed framing (where a header indicates the - payload size, similar to `message-io`'s `FramedTcp` 17), delimiter-based - framing (where frames are separated by a specific byte sequence), or - fixed-size framing. - - **Encoding**: How an outgoing message (or its serialized byte payload) is - encapsulated into a frame for transmission, including adding any necessary - headers, length prefixes, or delimiters. - - "wireframe" could provide common `FrameProcessor` implementations (e.g., for - length-prefixed frames) as part of its standard library, simplifying setup - for common protocol types. The library ships with a - `LengthPrefixedProcessor`. It accepts a `LengthFormat` specifying the prefix - size and byte order—for example, `LengthFormat::u16_le()` or - `LengthFormat::u32_be()`. Applications configure it via - `WireframeApp::frame_processor(LengthPrefixedProcessor::new(format))`. The - `FrameProcessor` trait remains public, so custom implementations can be - supplied when required. +To handle "arbitrary frame-based protocols," "wireframe" uses Tokio's +`tokio_util::codec::{Decoder, Encoder}` traits to parse and construct frames. +Implementations of these traits define: + +- **Decoding**: How a raw byte stream from the network is parsed into discrete + frames. This logic handles partial reads and buffering, accumulating bytes + until one or more complete frames can be extracted. Examples include + length-prefixed framing (where a header indicates the payload size, similar + to `message-io`'s `FramedTcp` 17), delimiter-based framing (where frames are + separated by a specific byte sequence), or fixed-size framing. +- **Encoding**: How an outgoing message (or its serialized byte payload) is + encapsulated into a frame for transmission, including adding any necessary + headers, length prefixes, or delimiters. + +"wireframe" provides a default length‑delimited codec built on +`LengthDelimitedCodec`. The prefix format is configured via `LengthFormat`, +allowing applications to select the prefix size and byte order. - **Optional** `FrameMetadata` **Trait**: For protocols where routing decisions or pre-handler middleware logic might depend on information in a frame header @@ -704,7 +691,7 @@ component to run it. ```rust use wireframe::{WireframeApp, WireframeServer, Message, error::Result}; - use my_protocol::{LoginRequest, LoginResponse, ChatMessage, AppState, MyFrameProcessor, MessageType}; + use my_protocol::{LoginRequest, LoginResponse, ChatMessage, AppState, MyCodec, MessageType}; use std::sync::Arc; use tokio::sync::Mutex; @@ -744,9 +731,6 @@ The WireframeApp builder would offer methods like: - WireframeApp::new(): Creates a new application builder. -- `[deprecated]` `.frame_processor(impl FrameProcessor)`: framing is now - handled by the connection codec. - - .route(message_id, handler_function): Explicitly maps a message identifier to a handler. @@ -806,8 +790,8 @@ messages and optionally producing responses. (analogous to Actix Web's `Responder` trait 4). This trait defines how the returned value is serialized and sent back to the client. When a handler yields such a value, `wireframe` encodes it using the application’s - configured serializer and passes the resulting bytes to the - `FrameProcessor` for transmission back to the peer. + configured serializer and passes the resulting bytes to the codec for + transmission back to the peer. - `Result`: For explicit error handling. If `Ok(response_message)`, the message is sent. If `Err(error_value)`, the error is processed by "wireframe's" error handling mechanism (see Section @@ -1047,8 +1031,7 @@ let logging = from_fn(|req, next| async move { times for handlers, or active connection counts. - **Frame/Message Transformation**: On-the-fly modification of frames or messages, such as encryption/decryption or compression/decompression. - (Though complex transformations might be better suited to the - `FrameProcessor` layer). + (Though complex transformations might be better suited to the codec layer). - **Request/Response Manipulation**: Modifying message content before it reaches a handler or before a response is sent. - **Connection Lifecycle Hooks**: Performing actions when connections are @@ -1177,7 +1160,7 @@ how "wireframe" aims to reduce source code complexity. decode and encode): ```rust -// Crate: my_frame_processor.rs +// Crate: my_codec.rs use bytes::{BytesMut, Buf, BufMut}; use tokio_util::codec::{Decoder, Encoder}; use byteorder::{BigEndian, ByteOrder}; @@ -1227,7 +1210,7 @@ impl> Encoder for LengthPrefixedCodec { ``` (Note: "wireframe" would abstract the direct use of `Encoder`/`Decoder` behind -its own `FrameProcessor` trait or provide helpers.) +its own codec trait or provide helpers.) - **Server Setup and Handler**: @@ -1242,7 +1225,7 @@ its own `FrameProcessor` trait or provide helpers.) serializer::BincodeSerializer, }; use my_protocol_messages::{EchoRequest, EchoResponse}; - use my_frame_processor::LengthPrefixedCodec; // Or wireframe's abstraction + use my_codec::LengthPrefixedCodec; // Or wireframe's abstraction use std::time::{SystemTime, UNIX_EPOCH}; // Define a message ID enum if not using type-based routing directly @@ -1270,7 +1253,6 @@ its own `FrameProcessor` trait or provide helpers.) WireframeServer::new(|| { WireframeApp::new() - //.frame_processor(LengthPrefixedCodec) // deprecated: framing handled by codec .serializer(BincodeSerializer) // Specify serializer .route(MyMessageType::Echo, handle_echo) // Route based on ID // OR if type-based routing is supported and EchoRequest has an ID: @@ -1382,7 +1364,6 @@ simplify server implementation. })); WireframeServer::new(move || { WireframeApp::new() - //.frame_processor(...) // deprecated: framing handled by codec .serializer(BincodeSerializer) .app_data(chat_state.clone()) .route(ChatMessageType::ClientJoin, handle_join) @@ -1485,14 +1466,14 @@ choices aim to mitigate them. connection-specific logic. This frees the developer from writing significant networking boilerplate. -- Trait-Based Framing (FrameProcessor): +- Trait-Based Framing (codec): - By allowing users to define or select a FrameProcessor implementation, - "wireframe" separates the logic of how bytes are grouped into frames from the - rest of the application. This means users can plug in different framing - strategies (length-prefixed, delimiter-based, etc.) without altering their - message definitions or handler logic. The library itself can provide common - framing implementations. + By allowing users to define or select a codec implementation, "wireframe" + separates the logic of how bytes are grouped into frames from the rest of the + application. This means users can plug in different framing strategies + (length-prefixed, delimiter-based, etc.) without altering their message + definitions or handler logic. The library itself can provide common codec + implementations. These abstractions collectively contribute to code that is not only less verbose but also more readable, maintainable, and testable. By reducing @@ -1532,9 +1513,8 @@ applicability. - **Advanced Framing Options**: - - **Built-in Codecs**: Providing a richer set of built-in `FrameProcessor` - implementations for common framing techniques beyond simple - length-prefixing, such as: + - **Built-in Codecs**: Providing a richer set of built-in codecs for common + framing techniques beyond simple length-prefixing, such as: - COBS (Consistent Overhead Byte Stuffing), which `postcard` already supports for its serialization output. - SLIP (Serial Line Internet Protocol) framing. diff --git a/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md b/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md index 7a70c955..0f115690 100644 --- a/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md +++ b/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md @@ -131,10 +131,9 @@ layer, the `FragmentAdapter`. #### The `FragmentAdapter` and `FragmentStrategy` -This feature is designed for pluggability. The `FragmentAdapter` is a -`FrameProcessor` that sits between the raw I/O and the main router. It contains -the generic logic for splitting large outbound frames and re-assembling inbound -fragments. +This feature is designed for pluggability. The `FragmentAdapter` is a codec +that sits between the raw I/O and the main router. It contains the generic +logic for splitting large outbound frames and re-assembling inbound fragments. The protocol-specific rules—how to parse a fragment header, determine the payload length, and identify the final fragment—are provided by the user via a diff --git a/docs/wireframe-1-0-detailed-development-roadmap.md b/docs/wireframe-1-0-detailed-development-roadmap.md index bc753bcd..b231765e 100644 --- a/docs/wireframe-1-0-detailed-development-roadmap.md +++ b/docs/wireframe-1-0-detailed-development-roadmap.md @@ -26,7 +26,7 @@ which all public-facing features will be built.* | 1.2 | Priority Push Channels | Implement the internal dual-channel `mpsc` mechanism within the connection state to handle high-priority and low-priority pushed frames. | Medium | — | | 1.3 | Connection Actor Write Loop | Convert per-request workers into stateful connection actors. Implement a `select!(biased; …)` loop that polls for shutdown signals, high-/low-priority pushes and the handler response stream in that strict order. | Large | #1.2 | | 1.4 | Initial FragmentStrategy Trait | Define the initial `FragmentStrategy` trait and the `FragmentMeta` struct. Focus on the core methods: `decode_header` and `encode_header`. | Medium | — | -| 1.5 | Basic FragmentAdapter | Implement the `FragmentAdapter` as a `FrameProcessor`. Build the inbound reassembly logic for a single, non-multiplexed stream of fragments, and the outbound logic for splitting a single large frame. | Large | #1.4 | +| 1.5 | Basic FragmentAdapter | Implement the `FragmentAdapter` as a codec. Build the inbound reassembly logic for a single, non-multiplexed stream of fragments, and the outbound logic for splitting a single large frame. | Large | #1.4 | | 1.6 | Internal Hook Plumbing | Add the invocation points for the protocol-specific hooks (`before_send`, `on_command_end`, etc.) within the connection actor, even if the public trait is not yet defined. | Small | #1.3 | ## Phase 2: Public APIs & Developer Ergonomics diff --git a/docs/wireframe-client-design.md b/docs/wireframe-client-design.md index 731cde8d..e41675c7 100644 --- a/docs/wireframe-client-design.md +++ b/docs/wireframe-client-design.md @@ -23,7 +23,8 @@ mirrors `WireframeServer` but operates in the opposite direction: - Connect to a `TcpStream`. - Optionally, send a preamble using the existing `Preamble` helpers. -- Encode outgoing messages using the selected `Serializer` and `FrameProcessor`. +- Encode outgoing messages using the selected `Serializer` and a + length‑delimited codec. - Decode incoming frames into typed responses. - Expose async `send` and `receive` operations. diff --git a/examples/metadata_routing.rs b/examples/metadata_routing.rs index d23b6222..f9a4a68d 100644 --- a/examples/metadata_routing.rs +++ b/examples/metadata_routing.rs @@ -7,12 +7,8 @@ use std::{io, sync::Arc}; use bytes::BytesMut; use tokio::io::{AsyncWriteExt, duplex}; -use wireframe::{ - app::Envelope, - frame::{FrameMetadata, FrameProcessor, LengthPrefixedProcessor}, - message::Message, - serializer::Serializer, -}; +use tokio_util::codec::{Encoder, LengthDelimitedCodec}; +use wireframe::{app::Envelope, frame::FrameMetadata, message::Message, serializer::Serializer}; type App = wireframe::app::WireframeApp; @@ -52,7 +48,7 @@ impl FrameMetadata for HeaderSerializer { // bits. let _ = src[2]; let payload = src[3..].to_vec(); - // `parse` receives the complete frame because `LengthPrefixedProcessor` + // `parse` receives the complete frame because the length-delimited codec // ensures `src` contains exactly one message. Returning `src.len()` is // therefore correct for this demo. Ok((Envelope::new(id, None, payload), src.len())) @@ -96,9 +92,10 @@ async fn main() -> io::Result<()> { frame.push(0); frame.extend_from_slice(&payload); + let mut codec = LengthDelimitedCodec::builder().new_codec(); let mut bytes = BytesMut::new(); - LengthPrefixedProcessor::default() - .encode(&frame, &mut bytes) + codec + .encode(frame.into(), &mut bytes) .expect("failed to encode frame"); client.write_all(&bytes).await?; diff --git a/src/app/builder.rs b/src/app/builder.rs index 3e18f327..71a81999 100644 --- a/src/app/builder.rs +++ b/src/app/builder.rs @@ -13,10 +13,7 @@ use std::{ sync::Arc, }; -use tokio::{ - io, - sync::{OnceCell, mpsc}, -}; +use tokio::sync::{OnceCell, mpsc}; use super::{ envelope::{Envelope, Packet}, @@ -74,14 +71,6 @@ pub struct WireframeApp< pub(super) handlers: HashMap>, pub(super) routes: OnceCell>>>, pub(super) middleware: Vec>>, - #[expect( - dead_code, - reason = "Deprecated: retained temporarily for API compatibility until codec-based \ - framing is fully removed" - )] - #[allow(unfulfilled_lint_expectations)] - pub(super) frame_processor: - Box, Error = io::Error> + Send + Sync>, pub(super) serializer: S, pub(super) app_data: HashMap>, pub(super) on_connect: Option>>, @@ -114,16 +103,13 @@ where C: Send + 'static, E: Packet, { - /// Initializes empty routes, middleware, and application data with a - /// placeholder frame processor and serializer, and no lifecycle hooks. + /// Initializes empty routes, middleware, and application data with the + /// default serializer and no lifecycle hooks. fn default() -> Self { Self { handlers: HashMap::new(), routes: OnceCell::new(), middleware: Vec::new(), - frame_processor: Box::new(crate::frame::LengthPrefixedProcessor::new( - crate::frame::LengthFormat::default(), - )), serializer: S::default(), app_data: HashMap::new(), on_connect: None, @@ -156,17 +142,6 @@ where /// WireframeApp::<_, _, wireframe::app::Envelope>::new().expect("failed to initialize app"); /// ``` pub fn new() -> Result { Ok(Self::default()) } - - /// Construct a new application builder using a custom envelope type. - /// - /// Deprecated: call [`WireframeApp::new`] with explicit envelope type - /// parameters. - /// - /// # Errors - /// - /// Currently always succeeds. - #[deprecated(note = "use `WireframeApp::new()` instead")] - pub fn new_with_envelope() -> Result { Self::new() } } impl WireframeApp @@ -259,7 +234,6 @@ where handlers: self.handlers, routes: OnceCell::new(), middleware: self.middleware, - frame_processor: self.frame_processor, serializer: self.serializer, app_data: self.app_data, on_connect: Some(Arc::new(move || Box::pin(f()))), @@ -349,19 +323,6 @@ where .unwrap_or_default() } - /// Set the frame processor used for encoding and decoding frames. - #[deprecated(note = "framing is handled by the connection codec; this method will be removed")] - #[must_use] - pub fn frame_processor

(self, processor: P) -> Self - where - P: crate::frame::FrameProcessor, Error = io::Error> + Send + Sync + 'static, - { - WireframeApp { - frame_processor: Box::new(processor), - ..self - } - } - /// Replace the serializer used for messages. #[must_use] pub fn serializer(self, serializer: Ser) -> WireframeApp @@ -372,7 +333,6 @@ where handlers: self.handlers, routes: OnceCell::new(), middleware: self.middleware, - frame_processor: self.frame_processor, serializer, app_data: self.app_data, on_connect: self.on_connect, diff --git a/src/frame/format.rs b/src/frame/format.rs index b791569d..6d1da226 100644 --- a/src/frame/format.rs +++ b/src/frame/format.rs @@ -17,8 +17,8 @@ pub enum Endianness { /// Format of the length prefix preceding each frame. #[derive(Clone, Copy, Debug)] pub struct LengthFormat { - pub(crate) bytes: usize, - pub(crate) endianness: Endianness, + pub bytes: usize, + pub endianness: Endianness, } impl LengthFormat { @@ -43,7 +43,12 @@ impl LengthFormat { #[must_use] pub const fn u32_le() -> Self { Self::new(4, Endianness::Little) } - pub(crate) fn read_len(&self, bytes: &[u8]) -> io::Result { + /// Read a length prefix from `bytes` according to this format. + /// + /// # Errors + /// Returns an error if `bytes` are shorter than the prefix or if the + /// encoded length exceeds `usize`. + pub fn read_len(&self, bytes: &[u8]) -> io::Result { let len = bytes_to_u64(bytes, self.bytes, self.endianness)?; usize::try_from(len).map_err(|_| { io::Error::new( @@ -53,7 +58,11 @@ impl LengthFormat { }) } - pub(crate) fn write_len(&self, len: usize, dst: &mut BytesMut) -> io::Result<()> { + /// Write `len` to `dst` using this format's prefix encoding. + /// + /// # Errors + /// Returns an error if `len` cannot be represented by the prefix size. + pub fn write_len(&self, len: usize, dst: &mut BytesMut) -> io::Result<()> { let mut buf = [0u8; 8]; let written = u64_to_bytes(len, self.bytes, self.endianness, &mut buf)?; dst.extend_from_slice(&buf[..written]); diff --git a/src/frame/metadata.rs b/src/frame/metadata.rs new file mode 100644 index 00000000..993aef22 --- /dev/null +++ b/src/frame/metadata.rs @@ -0,0 +1,16 @@ +//! Trait for parsing frame metadata from a header without decoding the payload. + +/// Parse frame metadata from a byte slice. +pub trait FrameMetadata { + /// Fully deserialised frame type. + type Frame; + + /// Error produced when parsing the metadata. + type Error: std::error::Error + Send + Sync + 'static; + + /// Parse frame metadata from `src`, returning the frame and bytes consumed. + /// + /// # Errors + /// Returns an error if the bytes cannot be interpreted as valid metadata. + fn parse(&self, src: &[u8]) -> Result<(Self::Frame, usize), Self::Error>; +} diff --git a/src/frame/mod.rs b/src/frame/mod.rs index 4e65738d..42a1fe0c 100644 --- a/src/frame/mod.rs +++ b/src/frame/mod.rs @@ -1,12 +1,12 @@ -//! Frame encoding utilities and length-prefixed processors. +//! Frame encoding utilities and length-prefix helpers. pub mod conversion; pub mod format; -pub mod processor; +pub mod metadata; pub use conversion::{bytes_to_u64, u64_to_bytes}; pub use format::{Endianness, LengthFormat}; -pub use processor::{FrameMetadata, FrameProcessor, LengthPrefixedProcessor}; +pub use metadata::FrameMetadata; #[cfg(test)] mod tests; diff --git a/src/frame/processor.rs b/src/frame/processor.rs deleted file mode 100644 index 1c02c204..00000000 --- a/src/frame/processor.rs +++ /dev/null @@ -1,84 +0,0 @@ -//! Frame processor implementations. - -use bytes::{Buf, BytesMut}; - -use super::{conversion::ERR_FRAME_TOO_LARGE, format::LengthFormat}; - -/// Trait defining how raw bytes are decoded into frames and encoded back. -pub trait FrameProcessor: Send + Sync { - /// Logical frame type extracted from the stream. - type Frame; - - /// Error type returned by `decode` and `encode`. - type Error: std::error::Error + Send + Sync + 'static; - - /// Attempt to decode the next frame from `src`. - /// - /// # Errors - /// Returns an error if the bytes in `src` cannot be parsed into a complete frame. - fn decode(&self, src: &mut BytesMut) -> Result, Self::Error>; - - /// Encode `frame` and append the bytes to `dst`. - /// - /// # Errors - /// Returns an error if the frame cannot be written to `dst`. - fn encode(&self, frame: &Self::Frame, dst: &mut BytesMut) -> Result<(), Self::Error>; -} - -/// Trait for parsing frame metadata from a header without decoding the payload. -pub trait FrameMetadata { - /// Fully deserialised frame type. - type Frame; - - /// Error produced when parsing the metadata. - type Error: std::error::Error + Send + Sync + 'static; - - /// Parse frame metadata from `src`, returning the frame and bytes consumed. - /// - /// # Errors - /// Returns an error if the bytes cannot be interpreted as valid metadata. - fn parse(&self, src: &[u8]) -> Result<(Self::Frame, usize), Self::Error>; -} - -/// Simple length-prefixed framing using a configurable length prefix. -#[derive(Clone, Copy, Debug)] -pub struct LengthPrefixedProcessor { - format: LengthFormat, -} - -impl LengthPrefixedProcessor { - /// Creates a new `LengthPrefixedProcessor` with the specified length prefix format. - #[must_use] - pub const fn new(format: LengthFormat) -> Self { Self { format } } -} - -impl Default for LengthPrefixedProcessor { - fn default() -> Self { Self::new(LengthFormat::default()) } -} - -impl FrameProcessor for LengthPrefixedProcessor { - type Frame = Vec; - type Error = std::io::Error; - - fn decode(&self, src: &mut BytesMut) -> Result, Self::Error> { - if src.len() < self.format.bytes { - return Ok(None); - } - let len = self.format.read_len(&src[..self.format.bytes])?; - let needed = self.format.bytes.checked_add(len).ok_or_else(|| { - std::io::Error::new(std::io::ErrorKind::InvalidInput, ERR_FRAME_TOO_LARGE) - })?; - if src.len() < needed { - return Ok(None); - } - src.advance(self.format.bytes); - Ok(Some(src.split_to(len).freeze().to_vec())) - } - - fn encode(&self, frame: &Self::Frame, dst: &mut BytesMut) -> Result<(), Self::Error> { - dst.reserve(self.format.bytes + frame.len()); - self.format.write_len(frame.len(), dst)?; - dst.extend_from_slice(frame); - Ok(()) - } -} diff --git a/tests/lifecycle.rs b/tests/lifecycle.rs index 709a6e14..ff39517e 100644 --- a/tests/lifecycle.rs +++ b/tests/lifecycle.rs @@ -12,12 +12,12 @@ use std::{ }; use bytes::BytesMut; +use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec}; use wireframe::{ app::{Envelope, Packet, PacketParts}, - frame::FrameProcessor, serializer::{BincodeSerializer, Serializer}, }; -use wireframe_testing::{processor, run_app, run_with_duplex_server}; +use wireframe_testing::{run_app, run_with_duplex_server}; type App = wireframe::app::WireframeApp; type BasicApp = wireframe::app::WireframeApp; @@ -148,8 +148,9 @@ async fn helpers_preserve_correlation_id_and_run_callbacks() { .serialize(&env) .expect("failed to serialise envelope"); let mut frame = BytesMut::new(); - let proc = processor(); - proc.encode(&bytes, &mut frame) + let mut codec = LengthDelimitedCodec::builder().new_codec(); + codec + .encode(bytes.into(), &mut frame) .expect("encode should succeed"); let out = run_app(app, vec![frame.to_vec()], None) @@ -158,8 +159,7 @@ async fn helpers_preserve_correlation_id_and_run_callbacks() { assert!(!out.is_empty()); let mut buf = BytesMut::from(&out[..]); - let processor = processor(); - let decoded = processor + let decoded = codec .decode(&mut buf) .expect("decode failed") .expect("frame missing"); diff --git a/tests/middleware_order.rs b/tests/middleware_order.rs index f823d467..8a8e135f 100644 --- a/tests/middleware_order.rs +++ b/tests/middleware_order.rs @@ -5,9 +5,9 @@ use async_trait::async_trait; use bytes::BytesMut; use tokio::io::{AsyncReadExt, AsyncWriteExt, duplex}; +use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec}; use wireframe::{ app::{Envelope, Handler}, - frame::{FrameProcessor, LengthPrefixedProcessor}, middleware::{HandlerService, Service, ServiceRequest, ServiceResponse, Transform}, serializer::{BincodeSerializer, Serializer}, }; @@ -69,10 +69,12 @@ async fn middleware_applied_in_reverse_order() { let env = Envelope::new(1, Some(7), vec![b'X']); let serializer = BincodeSerializer; let bytes = serializer.serialize(&env).expect("serialization failed"); - // Use the default 4-byte big-endian length prefix for framing - let processor = LengthPrefixedProcessor::default(); + // Use a length-delimited codec for framing + let mut codec = LengthDelimitedCodec::builder().new_codec(); let mut buf = BytesMut::new(); - processor.encode(&bytes, &mut buf).expect("encoding failed"); + codec + .encode(bytes.into(), &mut buf) + .expect("encoding failed"); client.write_all(&buf).await.expect("write failed"); client.shutdown().await.expect("shutdown failed"); @@ -83,7 +85,7 @@ async fn middleware_applied_in_reverse_order() { handle.await.expect("join failed"); let mut buf = BytesMut::from(&out[..]); - let frame = processor + let frame = codec .decode(&mut buf) .expect("decode failed") .expect("frame missing"); diff --git a/tests/response.rs b/tests/response.rs index 41d484c4..9c6b8666 100644 --- a/tests/response.rs +++ b/tests/response.rs @@ -5,9 +5,10 @@ use bytes::BytesMut; use rstest::rstest; +use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec}; use wireframe::{ app::{Envelope, WireframeApp}, - frame::{Endianness, FrameProcessor, LengthFormat, LengthPrefixedProcessor}, + frame::{Endianness, LengthFormat}, message::Message, serializer::BincodeSerializer, }; @@ -49,9 +50,9 @@ async fn send_response_encodes_and_frames() { .await .expect("send_response failed"); - let processor = LengthPrefixedProcessor::default(); + let mut codec = LengthDelimitedCodec::builder().new_codec(); let mut buf = BytesMut::from(&out[..]); - let frame = processor + let frame = codec .decode(&mut buf) .expect("decode failed") .expect("frame missing"); @@ -65,9 +66,9 @@ async fn send_response_encodes_and_frames() { /// This ensures that the decoder waits for the full header before attempting to decode a frame. #[tokio::test] async fn length_prefixed_decode_requires_complete_header() { - let processor = LengthPrefixedProcessor::default(); + let mut codec = LengthDelimitedCodec::builder().new_codec(); let mut buf = BytesMut::from(&[0x00, 0x00, 0x00][..]); // only 3 bytes - assert!(processor.decode(&mut buf).expect("decode failed").is_none()); + assert!(codec.decode(&mut buf).expect("decode failed").is_none()); assert_eq!(buf.len(), 3); // nothing consumed } @@ -77,11 +78,11 @@ async fn length_prefixed_decode_requires_complete_header() { /// Ensures that the decoder does not consume any bytes when the full frame is not yet available. #[tokio::test] async fn length_prefixed_decode_requires_full_frame() { - let processor = LengthPrefixedProcessor::default(); + let mut codec = LengthDelimitedCodec::builder().new_codec(); let mut buf = BytesMut::from(&[0x00, 0x00, 0x00, 0x05, 0x01, 0x02][..]); - assert!(processor.decode(&mut buf).expect("decode failed").is_none()); - // buffer should retain bytes since frame isn't complete - assert_eq!(buf.len(), 6); + assert!(codec.decode(&mut buf).expect("decode failed").is_none()); + // the length prefix is consumed even if the frame is incomplete + assert_eq!(buf.len(), 2); } struct FailingWriter; @@ -118,11 +119,18 @@ fn custom_length_roundtrip( #[case] frame: Vec, #[case] prefix: Vec, ) { - let processor = LengthPrefixedProcessor::new(fmt); + let mut builder = LengthDelimitedCodec::builder(); + builder.length_field_length(fmt.bytes); + if fmt.endianness == Endianness::Little { + builder.little_endian(); + } + let mut codec = builder.new_codec(); let mut buf = BytesMut::new(); - processor.encode(&frame, &mut buf).expect("encode failed"); + codec + .encode(frame.clone().into(), &mut buf) + .expect("encode failed"); assert_eq!(&buf[..prefix.len()], &prefix[..]); - let decoded = processor + let decoded = codec .decode(&mut buf) .expect("decode failed") .expect("frame missing"); @@ -147,10 +155,9 @@ async fn send_response_propagates_write_error() { #[case(5, Endianness::Little)] fn encode_fails_for_invalid_prefix_size(#[case] bytes: usize, #[case] endian: Endianness) { let fmt = LengthFormat::new(bytes, endian); - let processor = LengthPrefixedProcessor::new(fmt); let mut buf = BytesMut::new(); - let err = processor - .encode(&vec![1, 2], &mut buf) + let err = fmt + .write_len(2, &mut buf) .expect_err("encode must fail for unsupported prefix size"); assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput); } @@ -161,10 +168,9 @@ fn encode_fails_for_invalid_prefix_size(#[case] bytes: usize, #[case] endian: En #[case(5, Endianness::Big)] fn decode_fails_for_invalid_prefix_size(#[case] bytes: usize, #[case] endian: Endianness) { let fmt = LengthFormat::new(bytes, endian); - let processor = LengthPrefixedProcessor::new(fmt); - let mut buf = BytesMut::from(vec![0u8; bytes].as_slice()); - let err = processor - .decode(&mut buf) + let buf = vec![0u8; bytes]; + let err = fmt + .read_len(&buf) .expect_err("decode must fail for unsupported prefix size"); assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput); } @@ -173,11 +179,10 @@ fn decode_fails_for_invalid_prefix_size(#[case] bytes: usize, #[case] endian: En #[case(LengthFormat::new(1, Endianness::Big), 256)] #[case(LengthFormat::new(2, Endianness::Little), 65_536)] fn encode_fails_for_length_too_large(#[case] fmt: LengthFormat, #[case] len: usize) { - let processor = LengthPrefixedProcessor::new(fmt); let frame = vec![0u8; len]; let mut buf = BytesMut::new(); - let err = processor - .encode(&frame, &mut buf) + let err = fmt + .write_len(frame.len(), &mut buf) .expect_err("expected error"); assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput); } diff --git a/tests/routes.rs b/tests/routes.rs index aafc793b..c3e2ed7c 100644 --- a/tests/routes.rs +++ b/tests/routes.rs @@ -9,10 +9,10 @@ use std::sync::{ use bytes::BytesMut; use rstest::rstest; +use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec}; use wireframe::{ Serializer, app::{Packet, PacketParts}, - frame::{FrameProcessor, LengthPrefixedProcessor}, message::Message, serializer::BincodeSerializer, }; @@ -82,8 +82,9 @@ async fn handler_receives_message_and_echoes_response() { .await .expect("drive_with_bincode failed"); + let mut codec = LengthDelimitedCodec::builder().new_codec(); let mut buf = BytesMut::from(&out[..]); - let frame = LengthPrefixedProcessor::default() + let frame = codec .decode(&mut buf) .expect("decode failed") .expect("frame missing"); @@ -114,8 +115,9 @@ async fn handler_echoes_with_none_correlation_id() { }; let out = drive_with_bincode(app, env).await.expect("drive failed"); + let mut codec = LengthDelimitedCodec::builder().new_codec(); let mut buf = BytesMut::from(&out[..]); - let frame = LengthPrefixedProcessor::default() + let frame = codec .decode(&mut buf) .expect("decode failed") .expect("missing frame"); @@ -150,8 +152,9 @@ async fn multiple_frames_processed_in_sequence() { .serialize(&env) .expect("serialization failed"); let mut framed = BytesMut::new(); - LengthPrefixedProcessor::default() - .encode(&env_bytes, &mut framed) + let mut codec = LengthDelimitedCodec::builder().new_codec(); + codec + .encode(env_bytes.into(), &mut framed) .expect("encode failed"); framed.to_vec() }) @@ -161,8 +164,9 @@ async fn multiple_frames_processed_in_sequence() { .await .expect("drive_with_frames failed"); + let mut codec = LengthDelimitedCodec::builder().new_codec(); let mut buf = BytesMut::from(&out[..]); - let first = LengthPrefixedProcessor::default() + let first = codec .decode(&mut buf) .expect("decode failed") .expect("frame missing"); @@ -170,7 +174,7 @@ async fn multiple_frames_processed_in_sequence() { .deserialize::(&first) .expect("deserialize failed"); let (echo1, _) = Echo::from_bytes(&env1.payload).expect("decode echo failed"); - let second = LengthPrefixedProcessor::default() + let second = codec .decode(&mut buf) .expect("decode failed") .expect("frame missing"); @@ -207,15 +211,17 @@ async fn single_frame_propagates_correlation_id(#[case] cid: Option) { let env_bytes = BincodeSerializer.serialize(&env).expect("serialize failed"); let mut framed = BytesMut::new(); - LengthPrefixedProcessor::default() - .encode(&env_bytes, &mut framed) + let mut codec = LengthDelimitedCodec::builder().new_codec(); + codec + .encode(env_bytes.into(), &mut framed) .expect("encode failed"); let out = drive_with_frames(app, vec![framed.to_vec()]) .await .expect("drive failed"); + let mut codec = LengthDelimitedCodec::builder().new_codec(); let mut buf = BytesMut::from(&out[..]); - let frame = LengthPrefixedProcessor::default() + let frame = codec .decode(&mut buf) .expect("decode failed") .expect("missing"); diff --git a/wireframe_testing/Cargo.toml b/wireframe_testing/Cargo.toml index 9b3af948..66516013 100644 --- a/wireframe_testing/Cargo.toml +++ b/wireframe_testing/Cargo.toml @@ -13,3 +13,4 @@ logtest = "2" log = "0.4" metrics-util = "0.20" futures = "0.3" +tokio-util = { version = "0.7", features = ["codec"] } diff --git a/wireframe_testing/src/helpers.rs b/wireframe_testing/src/helpers.rs index 08f232a2..182f4799 100644 --- a/wireframe_testing/src/helpers.rs +++ b/wireframe_testing/src/helpers.rs @@ -7,20 +7,14 @@ use std::io; use bincode::config; use bytes::BytesMut; -use rstest::fixture; use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream, duplex}; +use tokio_util::codec::{Encoder, LengthDelimitedCodec}; use wireframe::{ app::{Envelope, Packet, WireframeApp}, - frame::{FrameMetadata, FrameProcessor, LengthPrefixedProcessor}, + frame::FrameMetadata, serializer::Serializer, }; -/// Create a default length-prefixed frame processor for tests. -#[fixture] -#[expect(unused_braces, reason = "Braces are intentional here; false positive")] -#[allow(unfulfilled_lint_expectations)] -pub fn processor() -> LengthPrefixedProcessor { LengthPrefixedProcessor::default() } - pub trait TestSerializer: Serializer + FrameMetadata + Send + Sync + 'static { @@ -353,8 +347,9 @@ where format!("bincode encode failed: {e}"), ) })?; + let mut codec = LengthDelimitedCodec::builder().new_codec(); let mut framed = BytesMut::new(); - LengthPrefixedProcessor::default().encode(&bytes, &mut framed)?; + codec.encode(bytes.into(), &mut framed)?; drive_with_frame(app, framed.to_vec()).await } diff --git a/wireframe_testing/src/lib.rs b/wireframe_testing/src/lib.rs index b3913181..591d295f 100644 --- a/wireframe_testing/src/lib.rs +++ b/wireframe_testing/src/lib.rs @@ -30,7 +30,6 @@ pub use helpers::{ drive_with_frames, drive_with_frames_mut, drive_with_frames_with_capacity, - processor, run_app, run_with_duplex_server, }; From ed0f73c34a0cf4f033da5e22ea94ce81613bf32f Mon Sep 17 00:00:00 2001 From: Leynos Date: Fri, 29 Aug 2025 18:54:09 +0100 Subject: [PATCH 02/10] Align codec limits with buffer capacity --- src/app/connection.rs | 8 +++- tests/response.rs | 75 +++++++++++++++++++++++++++++++- wireframe_testing/src/helpers.rs | 4 +- 3 files changed, 83 insertions(+), 4 deletions(-) diff --git a/src/app/connection.rs b/src/app/connection.rs index 04947571..e47f0d0b 100644 --- a/src/app/connection.rs +++ b/src/app/connection.rs @@ -55,7 +55,9 @@ where .serializer .serialize(msg) .map_err(SendError::Serialize)?; - let mut codec = LengthDelimitedCodec::builder().new_codec(); + let mut codec = LengthDelimitedCodec::builder() + .max_frame_length(self.buffer_capacity) + .new_codec(); let mut framed = BytesMut::new(); codec .encode(bytes.into(), &mut framed) @@ -158,7 +160,9 @@ where where W: AsyncRead + AsyncWrite + Unpin, { - let codec = LengthDelimitedCodec::builder().new_codec(); + let codec = LengthDelimitedCodec::builder() + .max_frame_length(self.buffer_capacity) + .new_codec(); let mut framed = Framed::new(stream, codec); framed.read_buffer_mut().reserve(self.buffer_capacity); let mut deser_failures = 0u32; diff --git a/tests/response.rs b/tests/response.rs index 9c6b8666..a379a665 100644 --- a/tests/response.rs +++ b/tests/response.rs @@ -3,15 +3,19 @@ //! These verify normal encoding as well as error conditions like //! write failures and encode errors. +use std::sync::Arc; + use bytes::BytesMut; use rstest::rstest; use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec}; use wireframe::{ - app::{Envelope, WireframeApp}, + Serializer, + app::{Envelope, Packet, WireframeApp}, frame::{Endianness, LengthFormat}, message::Message, serializer::BincodeSerializer, }; +use wireframe_testing::run_app; mod common; use common::TestApp; @@ -39,6 +43,9 @@ impl<'de> bincode::BorrowDecode<'de, ()> for FailingResp { } } +#[derive(bincode::Encode, bincode::BorrowDecode, PartialEq, Debug)] +struct Large(Vec); + /// Tests that sending a response serializes and frames the data correctly, /// and that the response can be decoded and deserialized back to its original value asynchronously. #[tokio::test] @@ -201,3 +208,69 @@ async fn send_response_returns_encode_error() { .expect_err("send_response should fail when encode errors"); assert!(matches!(err, wireframe::app::SendError::Serialize(_))); } + +/// Ensures `send_response` permits frames up to the configured buffer capacity, +/// exceeding the codec's default 8 MiB limit. +#[tokio::test] +async fn send_response_honours_buffer_capacity() { + let app = TestApp::new() + .expect("failed to create app") + .buffer_capacity(16 * 1024 * 1024); + + let payload = vec![0_u8; 9 * 1024 * 1024]; + let large = Large(payload.clone()); + let mut out = Vec::new(); + + app.send_response(&mut out, &large) + .await + .expect("send_response failed"); + + let mut codec = LengthDelimitedCodec::builder() + .max_frame_length(16 * 1024 * 1024) + .new_codec(); + let mut buf = BytesMut::from(&out[..]); + let frame = codec + .decode(&mut buf) + .expect("decode failed") + .expect("frame missing"); + let (decoded, _) = Large::from_bytes(&frame).expect("deserialize failed"); + assert_eq!(decoded.0.len(), payload.len()); +} + +/// Verifies inbound and outbound codecs respect the application's buffer +/// capacity by round-tripping a 9 MiB payload. +#[tokio::test] +async fn process_stream_honours_buffer_capacity() { + let app = TestApp::new() + .expect("failed to create app") + .buffer_capacity(16 * 1024 * 1024) + .route(1, Arc::new(|_: &Envelope| Box::pin(async {}))) + .expect("route registration failed"); + + let payload = vec![0_u8; 9 * 1024 * 1024]; + let env = Envelope::new(1, None, payload.clone()); + let bytes = BincodeSerializer.serialize(&env).expect("serialize failed"); + + let mut codec = LengthDelimitedCodec::builder() + .max_frame_length(16 * 1024 * 1024) + .new_codec(); + let mut framed = BytesMut::new(); + codec + .encode(bytes.into(), &mut framed) + .expect("encode frame failed"); + + let out = run_app(app, vec![framed.to_vec()], Some(10 * 1024 * 1024)) + .await + .expect("run_app failed"); + + let mut buf = BytesMut::from(&out[..]); + let frame = codec + .decode(&mut buf) + .expect("decode failed") + .expect("frame missing"); + let (resp_env, _) = BincodeSerializer + .deserialize::(&frame) + .expect("deserialize failed"); + let resp_len = resp_env.into_parts().payload().len(); + assert_eq!(resp_len, payload.len()); +} diff --git a/wireframe_testing/src/helpers.rs b/wireframe_testing/src/helpers.rs index 182f4799..7b8cf1ff 100644 --- a/wireframe_testing/src/helpers.rs +++ b/wireframe_testing/src/helpers.rs @@ -347,7 +347,9 @@ where format!("bincode encode failed: {e}"), ) })?; - let mut codec = LengthDelimitedCodec::builder().new_codec(); + let mut codec = LengthDelimitedCodec::builder() + .max_frame_length(bytes.len()) + .new_codec(); let mut framed = BytesMut::new(); codec.encode(bytes.into(), &mut framed)?; drive_with_frame(app, framed.to_vec()).await From 0cc0425b339b27f93369a2073c3a028d6465cfd3 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sat, 30 Aug 2025 00:40:53 +0100 Subject: [PATCH 03/10] Address review feedback --- .../asynchronous-outbound-messaging-design.md | 39 +++++++++-- docs/frame-metadata.md | 2 +- ...ge-fragmentation-and-re-assembly-design.md | 70 ++++++++++++------- ...i-packet-and-streaming-responses-design.md | 16 +++-- docs/rust-binary-router-library-design.md | 36 ++++++---- docs/wireframe-client-design.md | 14 ++-- src/app/builder.rs | 4 +- src/extractor.rs | 19 ----- src/frame/format.rs | 29 +++++++- tests/response.rs | 21 ++---- tests/routes.rs | 43 ++++++------ wireframe_testing/src/helpers.rs | 25 +++++++ 12 files changed, 202 insertions(+), 116 deletions(-) diff --git a/docs/asynchronous-outbound-messaging-design.md b/docs/asynchronous-outbound-messaging-design.md index 7b32d429..b30a271d 100644 --- a/docs/asynchronous-outbound-messaging-design.md +++ b/docs/asynchronous-outbound-messaging-design.md @@ -296,7 +296,7 @@ struct PushHandleInner { // The public, cloneable handle. #[derive(Clone)] -pub struct PushHandle(Arc>); + pub struct PushHandle(Arc>); pub enum PushPolicy { ReturnErrorIfFull, @@ -317,9 +317,40 @@ impl PushHandle { frame: F, priority: PushPriority, policy: PushPolicy, - ) -> Result<(), PushError>; -} -``` + ) -> Result<(), PushError>; + } + ``` + + The example below demonstrates pushing frames and returning a streamed + response. The [`async-stream`](https://docs.rs/async-stream) crate is the + canonical way to build dynamic `Response::Stream` values. + + ```rust,no_run + use async_stream::try_stream; + use std::sync::Arc; + use wireframe::{app::{Envelope, WireframeApp}, Response}; + + #[tokio::main] + async fn main() -> std::io::Result<()> { + let app = WireframeApp::new()? + .route(1, Arc::new(|_: &Envelope| Box::pin(async { + Response::Stream(Box::pin(try_stream! { + yield b"ack".to_vec(); + })) + })))?; + + let (push, mut conn) = wireframe_testing::connect(app).await?; + tokio::spawn(async move { + push.push_high_priority(b"urgent".to_vec()).await.unwrap(); + push.push_low_priority(b"stats".to_vec()).await.unwrap(); + }); + + while let Some(frame) = conn.next().await { + println!("{:?}", frame); + } + Ok(()) + } + ``` ```mermaid classDiagram diff --git a/docs/frame-metadata.md b/docs/frame-metadata.md index b0f8b491..624252c0 100644 --- a/docs/frame-metadata.md +++ b/docs/frame-metadata.md @@ -1,4 +1,4 @@ -# Parsing Frame Metadata +# Parsing frame metadata `FrameMetadata` allows a protocol to inspect frame headers before decoding the entire payload. This can be useful for routing decisions when the message type diff --git a/docs/generic-message-fragmentation-and-re-assembly-design.md b/docs/generic-message-fragmentation-and-re-assembly-design.md index 683e7e1f..68670cbb 100644 --- a/docs/generic-message-fragmentation-and-re-assembly-design.md +++ b/docs/generic-message-fragmentation-and-re-assembly-design.md @@ -1,12 +1,13 @@ -# Comprehensive Design: Generic Message Fragmentation & Re-assembly +# Comprehensive design: Generic message fragmentation & re‑assembly -## 1. Introduction & Philosophy +## 1. Introduction and philosophy -Many robust network protocols, from modern RPC systems to legacy financial -standards, require the ability to split a single logical message into multiple -physical frames. This can be necessary to respect MTU limits, handle large data -payloads, or align with encryption block sizes. `wireframe`'s current model, -which processes one inbound frame to one logical frame, cannot handle this. +Many robust network protocols, from modern Remote Procedure Call (RPC) systems +to legacy financial standards, require the ability to split a single logical +message into multiple physical frames. This can be necessary to respect Maximum +Transmission Unit (MTU) limits, handle large data payloads, or align with +encryption block sizes. `wireframe`'s current model, which processes one +inbound frame to one logical frame, cannot handle this. This document details the design for a generic, protocol-agnostic fragmentation and re-assembly layer. The core philosophy is to treat this as a **transparent @@ -20,7 +21,7 @@ This feature is a critical component of the "Road to Wireframe 1.0," designed to seamlessly integrate with and underpin the streaming and server-push capabilities. -## 2. Design Goals & Requirements +## 2. Design goals and requirements The implementation must satisfy the following core requirements: @@ -31,32 +32,32 @@ The implementation must satisfy the following core requirements: | G1 | Transparent inbound re-assembly → The router and handlers must always receive one complete, logical Frame. | | G2 | Transparent outbound fragmentation when a payload exceeds a configurable, protocol-specific size. | | G3 | Pluggable Strategy: The logic for parsing and building fragment headers, detecting the final fragment, and managing sequence numbers must be supplied by the protocol implementation, not hard-coded in the framework. | -| G4 | DoS Protection: The re-assembly process must be hardened against resource exhaustion attacks via strict memory and time limits. | +| G4 | Denial‑of‑service (DoS) protection: The re-assembly process must be hardened against resource exhaustion attacks via strict memory and time limits. | | G5 | Zero Friction: Protocols that do not use fragmentation must incur no performance or complexity overhead. This feature must be strictly opt-in. | -## 3. Core Architecture: The `FragmentAdapter` +## 3. Core architecture: the `FragmentAdapter` The feature will be implemented as a codec middleware called `FragmentAdapter`. It is instantiated with a protocol-specific `FragmentStrategy` and wraps any subsequent codecs in the chain. -``` -Socket I/O ↔ ↔ [Compression] ↔ FragmentAdapter ↔ Router/Handlers +```plaintext +Socket I/O ↔ [Compression] ↔ FragmentAdapter ↔ Router/Handlers ``` This layered approach ensures that fragmentation is handled on clear-text, uncompressed data, as required by most protocol specifications. -### 3.1 State Management for Multiplexing +### 3.1 State management for multiplexing A critical requirement for modern protocols is the ability to handle interleaved fragments from different logical messages on the same connection. To support this, the `FragmentAdapter` will not maintain a single re-assembly state, but a map of concurrent re-assembly processes. -```Rust +```rust use dashmap::DashMap; use std::sync::atomic::AtomicU64; use std::time::{Duration, Instant}; @@ -88,7 +89,7 @@ The use of `dashmap::DashMap` allows for lock-free reads and sharded writes, providing efficient and concurrent access to the re-assembly buffers without blocking the entire connection task. -## 4. Public API: The `FragmentStrategy` Trait +## 4. Public API: the `FragmentStrategy` trait The power and flexibility of this feature come from the `FragmentStrategy` trait. Protocol implementers will provide a type that implements this trait to @@ -99,7 +100,7 @@ inject their specific fragmentation rules into the generic `FragmentAdapter`. The trait is designed to be context-aware and expressive, allowing it to model a wide range of protocols. -```Rust +```rust use bytes::BytesMut; use std::io; @@ -158,17 +159,38 @@ pub trait FragmentStrategy: 'static + Send + Sync { Developers will enable fragmentation by adding the `FragmentAdapter` to their codec chain via the `WireframeApp` builder. -```Rust -// Example: Configuring a server for MySQL-style fragmentation. +```rust +// Pseudo‑API: enable fragmentation with a strategy on the codec stack. WireframeServer::new(|| { WireframeApp::new() - .route(...) + .codec(LengthDelimitedCodec::builder().new_codec()) + .codec(FragmentAdapter::new(MySqlStrategy::new())) + .route(/* ... */) }) ``` +```rust,no_run +use async_stream::try_stream; +use wireframe::{app::WireframeApp, Response}; + +async fn fragmented() -> Response> { + // `FragmentAdapter` splits oversized payloads automatically. + Response::Vec(vec![vec![0_u8; 128 * 1024]]) +} + +async fn streamed() -> Response> { + Response::Stream(Box::pin(try_stream! { + yield vec![1, 2, 3]; + })) +} +``` + +`async-stream` is the canonical crate for constructing dynamic +`Response::Stream` values. + ## 5. Implementation Logic -### 5.1 Inbound Path (Re-assembly) +### 5.1 Inbound path (re‑assembly) The re-assembly logic is the most complex part of the feature and must be robust against errors and attacks. @@ -210,7 +232,7 @@ robust against errors and attacks. - **Final Fragment:** If `meta.is_final` is true, the full payload is extracted from the `PartialMessage`, the entry is removed from the map, - and the complete logical frame is passed down the processor chain. + and the complete logical frame is passed down the codec chain. 4. **Timeout Handling:** A separate, low-priority background task within the `FragmentAdapter` will periodically iterate over the `reassembly_buffers`, @@ -218,7 +240,7 @@ robust against errors and attacks. has exceeded `reassembly_timeout` is removed, and a `WARN`-level `tracing` event is emitted. -### 5.2 Outbound Path (Fragmentation) +### 5.2 Outbound path (fragmentation) The outbound path is simpler and purely procedural. @@ -243,7 +265,7 @@ The outbound path is simpler and purely procedural. - Each fully formed fragment is then passed individually to the next codec. -## 6. Synergy with Other 1.0 Features +## 6. Synergy with other 1.0 features This feature is designed as a foundational layer that other features build upon. @@ -257,7 +279,7 @@ This feature is designed as a foundational layer that other features build upon. fragmented before transmission. The application code remains blissfully unaware of the underlying network constraints. -## 7. Measurable Objectives & Success Criteria +## 7. Measurable objectives and success criteria diff --git a/docs/multi-packet-and-streaming-responses-design.md b/docs/multi-packet-and-streaming-responses-design.md index a9c55766..86a7f3de 100644 --- a/docs/multi-packet-and-streaming-responses-design.md +++ b/docs/multi-packet-and-streaming-responses-design.md @@ -73,7 +73,7 @@ The public API is designed for clarity, performance, and ergonomic flexibility. The `Response` enum is the primary return type for all handlers. It is enhanced to provide optimised paths for common response patterns. -```Rust +```rust use futures_core::stream::Stream; use std::pin::Pin; @@ -109,7 +109,7 @@ To enable more robust error handling, a generic error enum will be introduced. This allows the framework and protocol implementations to distinguish between unrecoverable transport failures and logical, protocol-level errors. -```Rust +```rust /// A generic error type for wireframe operations. # pub enum WireframeError { @@ -139,7 +139,7 @@ The following examples illustrate how developers will use the new API. Existing code continues to work without modification, fulfilling goal **G4**. -```Rust +```rust async fn handle_ping(_req: Request) -> Result, MyError> { // `MyFrame` implements `Into>` Ok(build_pong_frame().into()) @@ -151,7 +151,7 @@ async fn handle_ping(_req: Request) -> Result, MyErro For simple, fixed-size multi-part responses, like a MySQL result set header, `Response::Vec` is both ergonomic and performant. -```Rust +```rust async fn handle_select_headers(_req: Request) -> Result, MyError> { // Pre-build frames for: column-count, column-def, EOF let frames = vec!; @@ -164,7 +164,7 @@ async fn handle_select_headers(_req: Request) -> Result Result, PgError> { @@ -182,10 +182,14 @@ async fn handle_copy_out(req: PgCopyRequest) -> Result std::io::Result<()> { } ``` -## Future Work +## Future work This initial design focuses on a basic request/response workflow. Future extensions might include: diff --git a/src/app/builder.rs b/src/app/builder.rs index 71a81999..bba37b53 100644 --- a/src/app/builder.rs +++ b/src/app/builder.rs @@ -33,7 +33,7 @@ const MAX_READ_TIMEOUT_MS: u64 = 86_400_000; /// /// # Examples /// -/// ```no_run +/// ```rust,no_run /// use std::sync::Arc; /// /// use wireframe::app::ConnectionSetup; @@ -46,7 +46,7 @@ pub type ConnectionSetup = dyn Fn() -> Pin + Send> /// /// # Examples /// -/// ```no_run +/// ```rust,no_run /// use std::sync::Arc; /// /// use wireframe::app::ConnectionTeardown; diff --git a/src/extractor.rs b/src/extractor.rs index fcd2f801..bfbd1a14 100644 --- a/src/extractor.rs +++ b/src/extractor.rs @@ -189,25 +189,6 @@ pub trait FromMessageRequest: Sized { #[derive(Clone)] pub struct SharedState(Arc); -impl SharedState { - /// Creates a new [`SharedState`] instance wrapping the provided `Arc`. - /// - /// # Examples - /// - /// ```rust,no_run - /// use std::sync::Arc; - /// - /// use wireframe::extractor::SharedState; - /// - /// let data = Arc::new(5u32); - /// let state: SharedState = Arc::clone(&data).into(); - /// assert_eq!(*state, 5); - /// ``` - #[must_use] - #[deprecated(since = "0.2.0", note = "construct via `inner.into()` instead")] - pub fn new(inner: Arc) -> Self { Self(inner) } -} - impl From> for SharedState { fn from(inner: Arc) -> Self { Self(inner) } } diff --git a/src/frame/format.rs b/src/frame/format.rs index 6d1da226..f3a4671f 100644 --- a/src/frame/format.rs +++ b/src/frame/format.rs @@ -24,8 +24,30 @@ pub struct LengthFormat { impl LengthFormat { /// Creates a new `LengthFormat` with the specified number of bytes and /// endianness for the length prefix. + /// + /// # Panics + /// + /// Panics if `bytes` is not in `1..=8`. #[must_use] - pub const fn new(bytes: usize, endianness: Endianness) -> Self { Self { bytes, endianness } } + pub const fn new(bytes: usize, endianness: Endianness) -> Self { + assert!(matches!(bytes, 1..=8), "invalid length-prefix width"); + Self { bytes, endianness } + } + + /// Fallible constructor validating the prefix width. + /// + /// # Errors + /// + /// Returns an error if `bytes` is not in `1..=8`. + pub fn try_new(bytes: usize, endianness: Endianness) -> io::Result { + if !(1..=8).contains(&bytes) { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "invalid length-prefix width", + )); + } + Ok(Self { bytes, endianness }) + } /// Creates a `LengthFormat` for a 2-byte big-endian length prefix. #[must_use] @@ -46,8 +68,9 @@ impl LengthFormat { /// Read a length prefix from `bytes` according to this format. /// /// # Errors - /// Returns an error if `bytes` are shorter than the prefix or if the - /// encoded length exceeds `usize`. + /// Returns an error if `bytes` are shorter than the prefix, if the + /// configured prefix width is not in `1..=8`, or if the encoded length + /// exceeds `usize`. pub fn read_len(&self, bytes: &[u8]) -> io::Result { let len = bytes_to_u64(bytes, self.bytes, self.endianness)?; usize::try_from(len).map_err(|_| { diff --git a/tests/response.rs b/tests/response.rs index a379a665..42d3d2b2 100644 --- a/tests/response.rs +++ b/tests/response.rs @@ -142,6 +142,7 @@ fn custom_length_roundtrip( .expect("decode failed") .expect("frame missing"); assert_eq!(decoded, frame); + assert!(buf.is_empty(), "unexpected trailing bytes after decode"); } #[tokio::test] @@ -158,27 +159,17 @@ async fn send_response_propagates_write_error() { #[rstest] #[case(0, Endianness::Big)] -#[case(3, Endianness::Big)] -#[case(5, Endianness::Little)] +#[case(9, Endianness::Little)] fn encode_fails_for_invalid_prefix_size(#[case] bytes: usize, #[case] endian: Endianness) { - let fmt = LengthFormat::new(bytes, endian); - let mut buf = BytesMut::new(); - let err = fmt - .write_len(2, &mut buf) - .expect_err("encode must fail for unsupported prefix size"); + let err = LengthFormat::try_new(bytes, endian).expect_err("invalid width should error"); assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput); } #[rstest] #[case(0, Endianness::Little)] -#[case(3, Endianness::Little)] -#[case(5, Endianness::Big)] +#[case(9, Endianness::Big)] fn decode_fails_for_invalid_prefix_size(#[case] bytes: usize, #[case] endian: Endianness) { - let fmt = LengthFormat::new(bytes, endian); - let buf = vec![0u8; bytes]; - let err = fmt - .read_len(&buf) - .expect_err("decode must fail for unsupported prefix size"); + let err = LengthFormat::try_new(bytes, endian).expect_err("invalid width should error"); assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput); } @@ -200,7 +191,7 @@ fn encode_fails_for_length_too_large(#[case] fmt: LengthFormat, #[case] len: usi /// error is of the `Serialize` variant, indicating a failure during response encoding. #[tokio::test] async fn send_response_returns_encode_error() { - // Intentionally do not set a frame processor: encode should fail before framing. + // Use a type that fails during serialization; encode should fail before any framing. let app = WireframeApp::::new().expect("failed to create app"); let err = app .send_response(&mut Vec::new(), &FailingResp) diff --git a/tests/routes.rs b/tests/routes.rs index c3e2ed7c..97c304eb 100644 --- a/tests/routes.rs +++ b/tests/routes.rs @@ -140,31 +140,29 @@ async fn multiple_frames_processed_in_sequence() { ) .expect("route registration failed"); - let frames: Vec> = (1u8..=2) - .map(|id| { - let msg_bytes = Echo(id).to_bytes().expect("encode failed"); - let env = TestEnvelope { - id: 1, - correlation_id: Some(u64::from(id)), - payload: msg_bytes, - }; - let env_bytes = BincodeSerializer - .serialize(&env) - .expect("serialization failed"); - let mut framed = BytesMut::new(); - let mut codec = LengthDelimitedCodec::builder().new_codec(); - codec - .encode(env_bytes.into(), &mut framed) - .expect("encode failed"); - framed.to_vec() - }) - .collect(); - - let out = drive_with_frames(app, frames) + let mut codec = LengthDelimitedCodec::builder().new_codec(); + let mut encoded_frames = Vec::new(); + for id in 1u8..=2 { + let msg_bytes = Echo(id).to_bytes().expect("encode failed"); + let env = TestEnvelope { + id: 1, + correlation_id: Some(u64::from(id)), + payload: msg_bytes, + }; + let env_bytes = BincodeSerializer + .serialize(&env) + .expect("serialization failed"); + let mut framed = BytesMut::new(); + codec + .encode(env_bytes.into(), &mut framed) + .expect("encode failed"); + encoded_frames.push(framed.to_vec()); + } + + let out = drive_with_frames(app, encoded_frames) .await .expect("drive_with_frames failed"); - let mut codec = LengthDelimitedCodec::builder().new_codec(); let mut buf = BytesMut::from(&out[..]); let first = codec .decode(&mut buf) @@ -219,7 +217,6 @@ async fn single_frame_propagates_correlation_id(#[case] cid: Option) { let out = drive_with_frames(app, vec![framed.to_vec()]) .await .expect("drive failed"); - let mut codec = LengthDelimitedCodec::builder().new_codec(); let mut buf = BytesMut::from(&out[..]); let frame = codec .decode(&mut buf) diff --git a/wireframe_testing/src/helpers.rs b/wireframe_testing/src/helpers.rs index 7b8cf1ff..b1add30a 100644 --- a/wireframe_testing/src/helpers.rs +++ b/wireframe_testing/src/helpers.rs @@ -421,6 +421,31 @@ where Ok(buf) } +#[cfg(test)] +mod tests { + use wireframe::app::WireframeApp; + + use super::*; + + #[tokio::test] + async fn run_app_rejects_zero_capacity() { + let app = WireframeApp::new().expect("failed to create app"); + let err = run_app(app, vec![], Some(0)) + .await + .expect_err("capacity of zero should error"); + assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput); + } + + #[tokio::test] + async fn run_app_rejects_excess_capacity() { + let app = WireframeApp::new().expect("failed to create app"); + let err = run_app(app, vec![], Some(MAX_CAPACITY + 1)) + .await + .expect_err("capacity beyond max should error"); + assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput); + } +} + /// Run `app` against an empty duplex stream. /// /// This helper drives the connection lifecycle without sending any frames, From d48ade181083e826fdaa0c749e1b078b3c7f7732 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sat, 30 Aug 2025 02:03:29 +0100 Subject: [PATCH 04/10] Address review comments --- README.md | 2 +- .../asynchronous-outbound-messaging-design.md | 13 ++++++++++ docs/frame-metadata.md | 9 ++++--- ...-set-philosophy-and-capability-maturity.md | 5 ++-- ...eframe-1-0-detailed-development-roadmap.md | 16 ++++++------ docs/wireframe-client-design.md | 9 ++++--- examples/metadata_routing.rs | 6 +++-- src/app/builder.rs | 2 +- src/frame/format.rs | 12 +++++++-- src/frame/metadata.rs | 26 ++++++++++++++++--- src/frame/mod.rs | 3 ++- tests/middleware_order.rs | 1 + tests/response.rs | 7 ++--- 13 files changed, 80 insertions(+), 31 deletions(-) diff --git a/README.md b/README.md index d3561986..d642de4b 100644 --- a/README.md +++ b/README.md @@ -126,7 +126,7 @@ Use `None` rather than `Some(0)` when a frame lacks a correlation ID. See This allows integration with existing packet formats without modifying `handle_frame`. -## Response Serialization and Framing +## Response serialisation and framing Handlers can return types implementing the `Responder` trait. These values are encoded using the application's configured serializer and framed by a diff --git a/docs/asynchronous-outbound-messaging-design.md b/docs/asynchronous-outbound-messaging-design.md index b30a271d..1cea1b2d 100644 --- a/docs/asynchronous-outbound-messaging-design.md +++ b/docs/asynchronous-outbound-messaging-design.md @@ -743,6 +743,19 @@ features of the 1.0 release. the socket. The `PushHandle` and the application code that uses it remain completely unaware of fragmentation. +```rust +// Codec stack with explicit frame-size limits and fragmentation. +use tokio_util::codec::LengthDelimitedCodec; + +let codec = LengthDelimitedCodec::builder() + .max_frame_length(64 * 1024) // 64 KiB cap to prevent OOM + .new_codec(); + +// Wrap the length-delimited frames with the fragmentation adapter. +// Pseudocode pending actual adapter API naming: +// let codec = FragmentAdapter::new(FragmentConfig::default()).wrap(codec); +``` + ## 7. Use Cases ### 7.1 Server-Initiated MySQL Packets diff --git a/docs/frame-metadata.md b/docs/frame-metadata.md index 624252c0..aa5ebbb0 100644 --- a/docs/frame-metadata.md +++ b/docs/frame-metadata.md @@ -4,9 +4,9 @@ entire payload. This can be useful for routing decisions when the message type is encoded in a header field. -Implement the trait for your serializer or decoder that knows how to read the +Implement the trait for your serialiser or decoder that knows how to read the header bytes. Only the minimal header portion should be read, returning the -full frame and the number of bytes consumed from the input. +frame envelope and the number of bytes consumed from the input. ```rust use wireframe::frame::FrameMetadata; @@ -15,9 +15,10 @@ use tokio_util::codec::{Decoder, Encoder}; use bytes::{Bytes, BytesMut}; struct MyCodec; - +// Example only: implement the minimal Decoder/Encoder surface for docs. impl Decoder for MyCodec { - type Item = Bytes; + // Using BytesMut here mirrors LengthDelimitedCodec’s default Item. + type Item = BytesMut; type Error = std::io::Error; fn decode( diff --git a/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md b/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md index 0f115690..7e269f9f 100644 --- a/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md +++ b/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md @@ -132,8 +132,9 @@ layer, the `FragmentAdapter`. #### The `FragmentAdapter` and `FragmentStrategy` This feature is designed for pluggability. The `FragmentAdapter` is a codec -that sits between the raw I/O and the main router. It contains the generic -logic for splitting large outbound frames and re-assembling inbound fragments. +(for example, `tokio_util::codec`) that sits between the raw I/O and the main +router. It contains the generic logic for splitting large outbound frames and +re-assembling inbound fragments. The protocol-specific rules—how to parse a fragment header, determine the payload length, and identify the final fragment—are provided by the user via a diff --git a/docs/wireframe-1-0-detailed-development-roadmap.md b/docs/wireframe-1-0-detailed-development-roadmap.md index b231765e..c2b3214f 100644 --- a/docs/wireframe-1-0-detailed-development-roadmap.md +++ b/docs/wireframe-1-0-detailed-development-roadmap.md @@ -20,14 +20,14 @@ public consumption. and message processing. This phase establishes the internal architecture upon which all public-facing features will be built.* -| Item | Name | Details | Size | Depends on | -| ---- | ------------------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ | ---------- | -| 1.1 | Core Response & Error Types | Define the new `Response` enum with `Single`, `Vec`, `Stream` and `Empty` variants. Implement the generic `WireframeError` enum to distinguish between I/O and protocol errors. | Small | — | -| 1.2 | Priority Push Channels | Implement the internal dual-channel `mpsc` mechanism within the connection state to handle high-priority and low-priority pushed frames. | Medium | — | -| 1.3 | Connection Actor Write Loop | Convert per-request workers into stateful connection actors. Implement a `select!(biased; …)` loop that polls for shutdown signals, high-/low-priority pushes and the handler response stream in that strict order. | Large | #1.2 | -| 1.4 | Initial FragmentStrategy Trait | Define the initial `FragmentStrategy` trait and the `FragmentMeta` struct. Focus on the core methods: `decode_header` and `encode_header`. | Medium | — | -| 1.5 | Basic FragmentAdapter | Implement the `FragmentAdapter` as a codec. Build the inbound reassembly logic for a single, non-multiplexed stream of fragments, and the outbound logic for splitting a single large frame. | Large | #1.4 | -| 1.6 | Internal Hook Plumbing | Add the invocation points for the protocol-specific hooks (`before_send`, `on_command_end`, etc.) within the connection actor, even if the public trait is not yet defined. | Small | #1.3 | +| Item | Name | Details | Size | Depends on | +| ---- | ------------------------------ | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ | ---------- | +| 1.1 | Core Response & Error Types | Define the new `Response` enum with `Single`, `Vec`, `Stream` and `Empty` variants. Implement the generic `WireframeError` enum to distinguish between I/O and protocol errors. | Small | — | +| 1.2 | Priority Push Channels | Implement the internal dual-channel `mpsc` mechanism within the connection state to handle high-priority and low-priority pushed frames. | Medium | — | +| 1.3 | Connection Actor Write Loop | Convert per-request workers into stateful connection actors. Implement a `select!(biased; …)` loop that polls for shutdown signals, high-/low-priority pushes and the handler response stream in that strict order. | Large | #1.2 | +| 1.4 | Initial FragmentStrategy Trait | Define the initial `FragmentStrategy` trait and the `FragmentMeta` struct. Focus on the core methods: `decode_header` and `encode_header`. | Medium | — | +| 1.5 | Basic FragmentAdapter | Implement the `FragmentAdapter` as a codec (for example, via `tokio_util::codec`). Build the inbound reassembly logic for a single, non-multiplexed stream of fragments, and the outbound logic for splitting a single large frame. | Large | #1.4 | +| 1.6 | Internal Hook Plumbing | Add the invocation points for the protocol-specific hooks (`before_send`, `on_command_end`, etc.) within the connection actor, even if the public trait is not yet defined. | Small | #1.3 | ## Phase 2: Public APIs & Developer Ergonomics diff --git a/docs/wireframe-client-design.md b/docs/wireframe-client-design.md index 8f6c5682..b24f93b6 100644 --- a/docs/wireframe-client-design.md +++ b/docs/wireframe-client-design.md @@ -23,8 +23,9 @@ mirrors `WireframeServer` but operates in the opposite direction: - Connect to a `TcpStream`. - Optionally, send a preamble using the existing `Preamble` helpers. -- Encode outgoing messages using the selected `Serializer` and a - length‑delimited codec. +- Encode outgoing messages using the selected `Serializer` and + `tokio_util::codec::LengthDelimitedCodec` (4‑byte big‑endian prefix by + default; configurable). - Decode incoming frames into typed responses. - Expose async `send` and `receive` operations. @@ -92,5 +93,5 @@ extensions might include: By leveraging the existing abstractions for framing and serialization, client support can share most of the server’s implementation while providing a small ergonomic API. -[^1]: See [wireframe router - design](rust-binary-router-library-design.md#implementation-details). +[^1]: See [wireframe router design] +(rust-binary-router-library-design.md#implementation-details). diff --git a/examples/metadata_routing.rs b/examples/metadata_routing.rs index f9a4a68d..48588e6f 100644 --- a/examples/metadata_routing.rs +++ b/examples/metadata_routing.rs @@ -92,8 +92,10 @@ async fn main() -> io::Result<()> { frame.push(0); frame.extend_from_slice(&payload); - let mut codec = LengthDelimitedCodec::builder().new_codec(); - let mut bytes = BytesMut::new(); + let mut codec = LengthDelimitedCodec::builder() + .max_frame_length(64 * 1024) // 64 KiB cap for the example + .new_codec(); + let mut bytes = BytesMut::with_capacity(frame.len() + 4); // +4 for the length prefix codec .encode(frame.into(), &mut bytes) .expect("failed to encode frame"); diff --git a/src/app/builder.rs b/src/app/builder.rs index bba37b53..1cd8d70c 100644 --- a/src/app/builder.rs +++ b/src/app/builder.rs @@ -103,7 +103,7 @@ where C: Send + 'static, E: Packet, { - /// Initializes empty routes, middleware, and application data with the + /// Initialises empty routes, middleware, and application data with the /// default serializer and no lifecycle hooks. fn default() -> Self { Self { diff --git a/src/frame/format.rs b/src/frame/format.rs index f3a4671f..3b722d14 100644 --- a/src/frame/format.rs +++ b/src/frame/format.rs @@ -17,8 +17,8 @@ pub enum Endianness { /// Format of the length prefix preceding each frame. #[derive(Clone, Copy, Debug)] pub struct LengthFormat { - pub bytes: usize, - pub endianness: Endianness, + bytes: usize, + endianness: Endianness, } impl LengthFormat { @@ -49,6 +49,14 @@ impl LengthFormat { Ok(Self { bytes, endianness }) } + /// Returns the prefix width in bytes. + #[must_use] + pub const fn bytes(&self) -> usize { self.bytes } + + /// Returns the endianness used for the prefix. + #[must_use] + pub const fn endianness(&self) -> Endianness { self.endianness } + /// Creates a `LengthFormat` for a 2-byte big-endian length prefix. #[must_use] pub const fn u16_be() -> Self { Self::new(2, Endianness::Big) } diff --git a/src/frame/metadata.rs b/src/frame/metadata.rs index 993aef22..fde2c838 100644 --- a/src/frame/metadata.rs +++ b/src/frame/metadata.rs @@ -1,14 +1,34 @@ //! Trait for parsing frame metadata from a header without decoding the payload. -/// Parse frame metadata from a byte slice. +/// Parse frame metadata from a byte slice without decoding the payload. +/// +/// # Examples +/// +/// ```rust +/// use std::io; +/// struct Demo; +/// impl FrameMetadata for Demo { +/// type Frame = (u32, bytes::Bytes); // (id, payload) +/// type Error = io::Error; +/// fn parse(&self, src: &[u8]) -> Result<(Self::Frame, usize), Self::Error> { +/// if src.len() < 4 { +/// return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "len")); +/// } +/// let id = u32::from_be_bytes([src[0], src[1], src[2], src[3]]); +/// Ok(((id, bytes::Bytes::copy_from_slice(&src[4..])), src.len())) +/// } +/// } +/// ``` pub trait FrameMetadata { - /// Fully deserialised frame type. + /// Fully deserialised frame envelope type. Implementations SHOULD keep + /// payloads zero‑copy (e.g., using `bytes::Bytes`) to avoid allocations. type Frame; /// Error produced when parsing the metadata. type Error: std::error::Error + Send + Sync + 'static; - /// Parse frame metadata from `src`, returning the frame and bytes consumed. + /// Parse frame metadata from `src`, returning the envelope and bytes + /// consumed. /// /// # Errors /// Returns an error if the bytes cannot be interpreted as valid metadata. diff --git a/src/frame/mod.rs b/src/frame/mod.rs index 42a1fe0c..6f1bd73d 100644 --- a/src/frame/mod.rs +++ b/src/frame/mod.rs @@ -1,4 +1,5 @@ -//! Frame encoding utilities and length-prefix helpers. +//! Frame encoding utilities and length-prefix helpers built around +//! `tokio_util::codec::LengthDelimitedCodec` (4-byte big-endian by default). pub mod conversion; pub mod format; diff --git a/tests/middleware_order.rs b/tests/middleware_order.rs index 8a8e135f..fdd48146 100644 --- a/tests/middleware_order.rs +++ b/tests/middleware_order.rs @@ -89,6 +89,7 @@ async fn middleware_applied_in_reverse_order() { .decode(&mut buf) .expect("decode failed") .expect("frame missing"); + assert!(buf.is_empty(), "unexpected trailing bytes after frame"); let (resp, _) = serializer .deserialize::(&frame) .expect("deserialize failed"); diff --git a/tests/response.rs b/tests/response.rs index 42d3d2b2..07031839 100644 --- a/tests/response.rs +++ b/tests/response.rs @@ -88,7 +88,8 @@ async fn length_prefixed_decode_requires_full_frame() { let mut codec = LengthDelimitedCodec::builder().new_codec(); let mut buf = BytesMut::from(&[0x00, 0x00, 0x00, 0x05, 0x01, 0x02][..]); assert!(codec.decode(&mut buf).expect("decode failed").is_none()); - // the length prefix is consumed even if the frame is incomplete + // LengthDelimitedCodec consumes the length prefix even if the frame + // remains incomplete. assert_eq!(buf.len(), 2); } @@ -127,8 +128,8 @@ fn custom_length_roundtrip( #[case] prefix: Vec, ) { let mut builder = LengthDelimitedCodec::builder(); - builder.length_field_length(fmt.bytes); - if fmt.endianness == Endianness::Little { + builder.length_field_length(fmt.bytes()); + if fmt.endianness() == Endianness::Little { builder.little_endian(); } let mut codec = builder.new_codec(); From a32842cca790e71269010f314b2d332af326b9b9 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sat, 30 Aug 2025 02:23:16 +0100 Subject: [PATCH 05/10] docs: clarify codec limits and timeout handling --- ...ge-fragmentation-and-re-assembly-design.md | 10 ++++----- docs/wireframe-client-design.md | 9 +++++--- tests/routes.rs | 22 ++++++++++++++----- wireframe_testing/src/helpers.rs | 2 +- 4 files changed, 28 insertions(+), 15 deletions(-) diff --git a/docs/generic-message-fragmentation-and-re-assembly-design.md b/docs/generic-message-fragmentation-and-re-assembly-design.md index 68670cbb..ed367da8 100644 --- a/docs/generic-message-fragmentation-and-re-assembly-design.md +++ b/docs/generic-message-fragmentation-and-re-assembly-design.md @@ -234,11 +234,11 @@ robust against errors and attacks. extracted from the `PartialMessage`, the entry is removed from the map, and the complete logical frame is passed down the codec chain. -4. **Timeout Handling:** A separate, low-priority background task within the - `FragmentAdapter` will periodically iterate over the `reassembly_buffers`, - checking the `started_at` timestamp of each `PartialMessage`. Any entry that - has exceeded `reassembly_timeout` is removed, and a `WARN`-level `tracing` - event is emitted. +4. **Timeout handling:** Run a background task within the + `FragmentAdapter` that periodically iterates over the re‑assembly buffers, + checks each `PartialMessage`’s `started_at` timestamp, and removes any entry + that has exceeded the re‑assembly timeout, emitting a `WARN`‑level `tracing` + event. ### 5.2 Outbound path (fragmentation) diff --git a/docs/wireframe-client-design.md b/docs/wireframe-client-design.md index b24f93b6..526118dc 100644 --- a/docs/wireframe-client-design.md +++ b/docs/wireframe-client-design.md @@ -25,7 +25,9 @@ mirrors `WireframeServer` but operates in the opposite direction: - Optionally, send a preamble using the existing `Preamble` helpers. - Encode outgoing messages using the selected `Serializer` and `tokio_util::codec::LengthDelimitedCodec` (4‑byte big‑endian prefix by - default; configurable). + default; configurable). Configure the codec’s `max_frame_length` to match the + server’s frame capacity; otherwise, frames larger than the default 8 MiB will + fail. - Decode incoming frames into typed responses. - Expose async `send` and `receive` operations. @@ -93,5 +95,6 @@ extensions might include: By leveraging the existing abstractions for framing and serialization, client support can share most of the server’s implementation while providing a small ergonomic API. -[^1]: See [wireframe router design] -(rust-binary-router-library-design.md#implementation-details). +[^1]: See [wireframe router design][router-design]. + +[router-design]: rust-binary-router-library-design.md#implementation-details diff --git a/tests/routes.rs b/tests/routes.rs index 97c304eb..a91c8d55 100644 --- a/tests/routes.rs +++ b/tests/routes.rs @@ -82,7 +82,9 @@ async fn handler_receives_message_and_echoes_response() { .await .expect("drive_with_bincode failed"); - let mut codec = LengthDelimitedCodec::builder().new_codec(); + let mut codec = LengthDelimitedCodec::builder() + .max_frame_length(64 * 1024) + .new_codec(); let mut buf = BytesMut::from(&out[..]); let frame = codec .decode(&mut buf) @@ -115,7 +117,9 @@ async fn handler_echoes_with_none_correlation_id() { }; let out = drive_with_bincode(app, env).await.expect("drive failed"); - let mut codec = LengthDelimitedCodec::builder().new_codec(); + let mut codec = LengthDelimitedCodec::builder() + .max_frame_length(64 * 1024) + .new_codec(); let mut buf = BytesMut::from(&out[..]); let frame = codec .decode(&mut buf) @@ -140,7 +144,9 @@ async fn multiple_frames_processed_in_sequence() { ) .expect("route registration failed"); - let mut codec = LengthDelimitedCodec::builder().new_codec(); + let mut codec = LengthDelimitedCodec::builder() + .max_frame_length(64 * 1024) + .new_codec(); let mut encoded_frames = Vec::new(); for id in 1u8..=2 { let msg_bytes = Echo(id).to_bytes().expect("encode failed"); @@ -152,7 +158,7 @@ async fn multiple_frames_processed_in_sequence() { let env_bytes = BincodeSerializer .serialize(&env) .expect("serialization failed"); - let mut framed = BytesMut::new(); + let mut framed = BytesMut::with_capacity(env_bytes.len() + 4); codec .encode(env_bytes.into(), &mut framed) .expect("encode failed"); @@ -176,6 +182,7 @@ async fn multiple_frames_processed_in_sequence() { .decode(&mut buf) .expect("decode failed") .expect("frame missing"); + assert!(buf.is_empty(), "unexpected trailing bytes after two frames"); let (env2, _) = BincodeSerializer .deserialize::(&second) .expect("deserialize failed"); @@ -208,8 +215,10 @@ async fn single_frame_propagates_correlation_id(#[case] cid: Option) { }; let env_bytes = BincodeSerializer.serialize(&env).expect("serialize failed"); - let mut framed = BytesMut::new(); - let mut codec = LengthDelimitedCodec::builder().new_codec(); + let mut framed = BytesMut::with_capacity(env_bytes.len() + 4); + let mut codec = LengthDelimitedCodec::builder() + .max_frame_length(64 * 1024) + .new_codec(); codec .encode(env_bytes.into(), &mut framed) .expect("encode failed"); @@ -222,6 +231,7 @@ async fn single_frame_propagates_correlation_id(#[case] cid: Option) { .decode(&mut buf) .expect("decode failed") .expect("missing"); + assert!(buf.is_empty(), "unexpected trailing bytes after decode"); let (resp, _) = BincodeSerializer .deserialize::(&frame) .expect("deserialize failed"); diff --git a/wireframe_testing/src/helpers.rs b/wireframe_testing/src/helpers.rs index b1add30a..8156f5c1 100644 --- a/wireframe_testing/src/helpers.rs +++ b/wireframe_testing/src/helpers.rs @@ -348,7 +348,7 @@ where ) })?; let mut codec = LengthDelimitedCodec::builder() - .max_frame_length(bytes.len()) + .max_frame_length(DEFAULT_CAPACITY) .new_codec(); let mut framed = BytesMut::new(); codec.encode(bytes.into(), &mut framed)?; From b510946db6341b74431dea6b8d34755253b5dcad Mon Sep 17 00:00:00 2001 From: Leynos Date: Sat, 30 Aug 2025 12:09:10 +0100 Subject: [PATCH 06/10] Refine codec docs and tests --- README.md | 8 ++++---- .../asynchronous-outbound-messaging-design.md | 4 ++-- ...ge-fragmentation-and-re-assembly-design.md | 6 +++++- docs/wireframe-client-design.md | 6 +++--- examples/metadata_routing.rs | 4 +++- src/app/connection.rs | 14 ++++++++------ src/frame/metadata.rs | 18 +++++++++++------- tests/lifecycle.rs | 6 +++++- tests/middleware_order.rs | 6 +++++- tests/response.rs | 19 ++++++++++++++----- tests/routes.rs | 10 ++++++---- 11 files changed, 66 insertions(+), 35 deletions(-) diff --git a/README.md b/README.md index d642de4b..1795e6f1 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ routing, extractors, and middleware. ## Motivation -Manual handling of binary protocols typically involves verbose serialization +Manual handling of binary protocols typically involves verbose serialisation code, custom frame parsing, and complex dispatch logic. `wireframe` aims to reduce this boilerplate through layered abstractions: @@ -16,7 +16,7 @@ reduce this boilerplate through layered abstractions: - **Connection preamble** with customizable validation callbacks \[[docs](docs/preamble-validator.md)\] - Call `with_preamble::()` before registering success or failure callbacks -- **Serialization engine** using `bincode` or a `wire-rs` wrapper +- **Serialisation engine** using `bincode` or a `wire-rs` wrapper - **Routing engine** that dispatches messages by ID - **Handler invocation** with extractor support - **Middleware chain** for request/response processing @@ -140,7 +140,7 @@ big‑endian header【F:docs/rust-binary-router-library-design.md†L1082-L1123 let app = WireframeApp::new()?; ``` -## Connection Lifecycle +## Connection lifecycle Protocol callbacks are consolidated under the `WireframeProtocol` trait, replacing the individual `on_connection_setup`/`on_connection_teardown` @@ -282,7 +282,7 @@ let logging = from_fn(|req, next| async move { Example programs are available in the `examples/` directory: - `echo.rs` — minimal echo server using routing -- `ping_pong.rs` — showcases serialization and middleware in a ping/pong +- `ping_pong.rs` — showcases serialisation and middleware in a ping/pong protocol. See [examples/ping_pong.md](examples/ping_pong.md) for a detailed overview. - [`packet_enum.rs`](examples/packet_enum.rs) — shows packet type discrimination diff --git a/docs/asynchronous-outbound-messaging-design.md b/docs/asynchronous-outbound-messaging-design.md index 1cea1b2d..77427d8a 100644 --- a/docs/asynchronous-outbound-messaging-design.md +++ b/docs/asynchronous-outbound-messaging-design.md @@ -746,9 +746,9 @@ features of the 1.0 release. ```rust // Codec stack with explicit frame-size limits and fragmentation. use tokio_util::codec::LengthDelimitedCodec; - +const MAX_FRAME: usize = 64 * 1024; let codec = LengthDelimitedCodec::builder() - .max_frame_length(64 * 1024) // 64 KiB cap to prevent OOM + .max_frame_length(MAX_FRAME) // 64 KiB cap to prevent OOM .new_codec(); // Wrap the length-delimited frames with the fragmentation adapter. diff --git a/docs/generic-message-fragmentation-and-re-assembly-design.md b/docs/generic-message-fragmentation-and-re-assembly-design.md index ed367da8..2024ec3f 100644 --- a/docs/generic-message-fragmentation-and-re-assembly-design.md +++ b/docs/generic-message-fragmentation-and-re-assembly-design.md @@ -163,7 +163,11 @@ codec chain via the `WireframeApp` builder. // Pseudo‑API: enable fragmentation with a strategy on the codec stack. WireframeServer::new(|| { WireframeApp::new() - .codec(LengthDelimitedCodec::builder().new_codec()) + .codec( + LengthDelimitedCodec::builder() + .max_frame_length(64 * 1024) + .new_codec(), + ) .codec(FragmentAdapter::new(MySqlStrategy::new())) .route(/* ... */) }) diff --git a/docs/wireframe-client-design.md b/docs/wireframe-client-design.md index 526118dc..e31de2d3 100644 --- a/docs/wireframe-client-design.md +++ b/docs/wireframe-client-design.md @@ -58,13 +58,13 @@ let response: LoginAck = client.call(request).await?; Internally, this uses the `Serializer` to encode the request, sends it through the length‑delimited codec, then waits for a frame, decodes it, and -deserializes the response type. +deserialises the response type. ### Connection lifecycle Like the server, the client should expose hooks for connection setup and -teardown. These mirror the server’s lifecycle callbacks so both sides can share -initialization logic. +teardown. These mirror the server’s lifecycle callbacks, so both sides can +share initialisation logic. ## Example usage diff --git a/examples/metadata_routing.rs b/examples/metadata_routing.rs index 48588e6f..ef19e185 100644 --- a/examples/metadata_routing.rs +++ b/examples/metadata_routing.rs @@ -12,6 +12,8 @@ use wireframe::{app::Envelope, frame::FrameMetadata, message::Message, serialize type App = wireframe::app::WireframeApp; +const MAX_FRAME: usize = 64 * 1024; + /// Frame format with a two-byte id, one-byte flags, and bincode payload. #[derive(Default)] struct HeaderSerializer; @@ -93,7 +95,7 @@ async fn main() -> io::Result<()> { frame.extend_from_slice(&payload); let mut codec = LengthDelimitedCodec::builder() - .max_frame_length(64 * 1024) // 64 KiB cap for the example + .max_frame_length(MAX_FRAME) // 64 KiB cap for the example .new_codec(); let mut bytes = BytesMut::with_capacity(frame.len() + 4); // +4 for the length prefix codec diff --git a/src/app/connection.rs b/src/app/connection.rs index e47f0d0b..01412a29 100644 --- a/src/app/connection.rs +++ b/src/app/connection.rs @@ -37,6 +37,12 @@ where C: Send + 'static, E: Packet, { + fn new_length_codec(&self) -> LengthDelimitedCodec { + LengthDelimitedCodec::builder() + .max_frame_length(self.buffer_capacity) + .new_codec() + } + /// Serialize `msg` and write it to `stream` using a length-delimited codec. /// /// # Errors @@ -55,9 +61,7 @@ where .serializer .serialize(msg) .map_err(SendError::Serialize)?; - let mut codec = LengthDelimitedCodec::builder() - .max_frame_length(self.buffer_capacity) - .new_codec(); + let mut codec = self.new_length_codec(); let mut framed = BytesMut::new(); codec .encode(bytes.into(), &mut framed) @@ -160,9 +164,7 @@ where where W: AsyncRead + AsyncWrite + Unpin, { - let codec = LengthDelimitedCodec::builder() - .max_frame_length(self.buffer_capacity) - .new_codec(); + let codec = self.new_length_codec(); let mut framed = Framed::new(stream, codec); framed.read_buffer_mut().reserve(self.buffer_capacity); let mut deser_failures = 0u32; diff --git a/src/frame/metadata.rs b/src/frame/metadata.rs index fde2c838..690758be 100644 --- a/src/frame/metadata.rs +++ b/src/frame/metadata.rs @@ -4,31 +4,35 @@ /// /// # Examples /// -/// ```rust +/// ```rust,no_run /// use std::io; +/// +/// use bytes::Bytes; /// struct Demo; /// impl FrameMetadata for Demo { -/// type Frame = (u32, bytes::Bytes); // (id, payload) +/// type Frame = (u32, Bytes); // (id, payload) /// type Error = io::Error; /// fn parse(&self, src: &[u8]) -> Result<(Self::Frame, usize), Self::Error> { /// if src.len() < 4 { /// return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "len")); /// } /// let id = u32::from_be_bytes([src[0], src[1], src[2], src[3]]); -/// Ok(((id, bytes::Bytes::copy_from_slice(&src[4..])), src.len())) +/// Ok(((id, Bytes::copy_from_slice(&src[4..])), src.len())) /// } /// } /// ``` pub trait FrameMetadata { - /// Fully deserialised frame envelope type. Implementations SHOULD keep - /// payloads zero‑copy (e.g., using `bytes::Bytes`) to avoid allocations. + /// Fully deserialised frame envelope type. + /// Prefer avoiding unnecessary allocations. Zero‑copy is only possible + /// when the caller supplies an owned, ref‑counted buffer (e.g., `Bytes`). + /// With the current `&[u8]` input, copying the payload is typically required. type Frame; /// Error produced when parsing the metadata. type Error: std::error::Error + Send + Sync + 'static; - /// Parse frame metadata from `src`, returning the envelope and bytes - /// consumed. + /// Parse frame metadata from `src`, returning the envelope and the number + /// of bytes consumed. /// /// # Errors /// Returns an error if the bytes cannot be interpreted as valid metadata. diff --git a/tests/lifecycle.rs b/tests/lifecycle.rs index ff39517e..95e0af7e 100644 --- a/tests/lifecycle.rs +++ b/tests/lifecycle.rs @@ -22,6 +22,8 @@ use wireframe_testing::{run_app, run_with_duplex_server}; type App = wireframe::app::WireframeApp; type BasicApp = wireframe::app::WireframeApp; +const MAX_FRAME: usize = 64 * 1024; + fn call_counting_callback( counter: &Arc, result: R, @@ -148,7 +150,9 @@ async fn helpers_preserve_correlation_id_and_run_callbacks() { .serialize(&env) .expect("failed to serialise envelope"); let mut frame = BytesMut::new(); - let mut codec = LengthDelimitedCodec::builder().new_codec(); + let mut codec = LengthDelimitedCodec::builder() + .max_frame_length(MAX_FRAME) + .new_codec(); codec .encode(bytes.into(), &mut frame) .expect("encode should succeed"); diff --git a/tests/middleware_order.rs b/tests/middleware_order.rs index fdd48146..4e82ff25 100644 --- a/tests/middleware_order.rs +++ b/tests/middleware_order.rs @@ -14,6 +14,8 @@ use wireframe::{ type TestApp = wireframe::app::WireframeApp; +const MAX_FRAME: usize = 64 * 1024; + struct TagMiddleware(u8); struct TagService { @@ -70,7 +72,9 @@ async fn middleware_applied_in_reverse_order() { let serializer = BincodeSerializer; let bytes = serializer.serialize(&env).expect("serialization failed"); // Use a length-delimited codec for framing - let mut codec = LengthDelimitedCodec::builder().new_codec(); + let mut codec = LengthDelimitedCodec::builder() + .max_frame_length(MAX_FRAME) + .new_codec(); let mut buf = BytesMut::new(); codec .encode(bytes.into(), &mut buf) diff --git a/tests/response.rs b/tests/response.rs index 07031839..ea7140b8 100644 --- a/tests/response.rs +++ b/tests/response.rs @@ -20,6 +20,8 @@ use wireframe_testing::run_app; mod common; use common::TestApp; +const MAX_FRAME: usize = 64 * 1024; + #[derive(bincode::Encode, bincode::BorrowDecode, PartialEq, Debug)] struct TestResp(u32); @@ -57,7 +59,9 @@ async fn send_response_encodes_and_frames() { .await .expect("send_response failed"); - let mut codec = LengthDelimitedCodec::builder().new_codec(); + let mut codec = LengthDelimitedCodec::builder() + .max_frame_length(MAX_FRAME) + .new_codec(); let mut buf = BytesMut::from(&out[..]); let frame = codec .decode(&mut buf) @@ -73,19 +77,23 @@ async fn send_response_encodes_and_frames() { /// This ensures that the decoder waits for the full header before attempting to decode a frame. #[tokio::test] async fn length_prefixed_decode_requires_complete_header() { - let mut codec = LengthDelimitedCodec::builder().new_codec(); + let mut codec = LengthDelimitedCodec::builder() + .max_frame_length(MAX_FRAME) + .new_codec(); let mut buf = BytesMut::from(&[0x00, 0x00, 0x00][..]); // only 3 bytes assert!(codec.decode(&mut buf).expect("decode failed").is_none()); assert_eq!(buf.len(), 3); // nothing consumed } /// Tests that decoding with a complete length prefix but incomplete frame data returns `None` -/// and retains all bytes in the buffer. +/// and consumes only the 4-byte length prefix. /// -/// Ensures that the decoder does not consume any bytes when the full frame is not yet available. +/// Confirms that the decoder leaves the incomplete body in the buffer until the full frame arrives. #[tokio::test] async fn length_prefixed_decode_requires_full_frame() { - let mut codec = LengthDelimitedCodec::builder().new_codec(); + let mut codec = LengthDelimitedCodec::builder() + .max_frame_length(MAX_FRAME) + .new_codec(); let mut buf = BytesMut::from(&[0x00, 0x00, 0x00, 0x05, 0x01, 0x02][..]); assert!(codec.decode(&mut buf).expect("decode failed").is_none()); // LengthDelimitedCodec consumes the length prefix even if the frame @@ -132,6 +140,7 @@ fn custom_length_roundtrip( if fmt.endianness() == Endianness::Little { builder.little_endian(); } + builder.max_frame_length(MAX_FRAME); let mut codec = builder.new_codec(); let mut buf = BytesMut::new(); codec diff --git a/tests/routes.rs b/tests/routes.rs index a91c8d55..bd6b771b 100644 --- a/tests/routes.rs +++ b/tests/routes.rs @@ -20,6 +20,8 @@ use wireframe_testing::{drive_with_bincode, drive_with_frames}; type TestApp = wireframe::app::WireframeApp; +const MAX_FRAME: usize = 64 * 1024; + #[derive(bincode::Encode, bincode::BorrowDecode, PartialEq, Debug, Clone)] struct TestEnvelope { id: u32, @@ -83,7 +85,7 @@ async fn handler_receives_message_and_echoes_response() { .expect("drive_with_bincode failed"); let mut codec = LengthDelimitedCodec::builder() - .max_frame_length(64 * 1024) + .max_frame_length(MAX_FRAME) .new_codec(); let mut buf = BytesMut::from(&out[..]); let frame = codec @@ -118,7 +120,7 @@ async fn handler_echoes_with_none_correlation_id() { let out = drive_with_bincode(app, env).await.expect("drive failed"); let mut codec = LengthDelimitedCodec::builder() - .max_frame_length(64 * 1024) + .max_frame_length(MAX_FRAME) .new_codec(); let mut buf = BytesMut::from(&out[..]); let frame = codec @@ -145,7 +147,7 @@ async fn multiple_frames_processed_in_sequence() { .expect("route registration failed"); let mut codec = LengthDelimitedCodec::builder() - .max_frame_length(64 * 1024) + .max_frame_length(MAX_FRAME) .new_codec(); let mut encoded_frames = Vec::new(); for id in 1u8..=2 { @@ -217,7 +219,7 @@ async fn single_frame_propagates_correlation_id(#[case] cid: Option) { let mut framed = BytesMut::with_capacity(env_bytes.len() + 4); let mut codec = LengthDelimitedCodec::builder() - .max_frame_length(64 * 1024) + .max_frame_length(MAX_FRAME) .new_codec(); codec .encode(env_bytes.into(), &mut framed) From 438f184ef8dd9edca296fccbb7f087d35f6b53fb Mon Sep 17 00:00:00 2001 From: Leynos Date: Sat, 30 Aug 2025 17:11:12 +0100 Subject: [PATCH 07/10] Polish docs and tests --- .../asynchronous-outbound-messaging-design.md | 44 +++++++++---------- docs/frame-metadata.md | 2 +- ...i-packet-and-streaming-responses-design.md | 7 ++- docs/rust-binary-router-library-design.md | 4 +- examples/metadata_routing.rs | 7 +-- src/app/builder.rs | 2 +- tests/response.rs | 2 +- 7 files changed, 32 insertions(+), 36 deletions(-) diff --git a/docs/asynchronous-outbound-messaging-design.md b/docs/asynchronous-outbound-messaging-design.md index 77427d8a..fa20051f 100644 --- a/docs/asynchronous-outbound-messaging-design.md +++ b/docs/asynchronous-outbound-messaging-design.md @@ -317,27 +317,27 @@ impl PushHandle { frame: F, priority: PushPriority, policy: PushPolicy, - ) -> Result<(), PushError>; - } - ``` - - The example below demonstrates pushing frames and returning a streamed - response. The [`async-stream`](https://docs.rs/async-stream) crate is the - canonical way to build dynamic `Response::Stream` values. - - ```rust,no_run - use async_stream::try_stream; - use std::sync::Arc; - use wireframe::{app::{Envelope, WireframeApp}, Response}; - - #[tokio::main] - async fn main() -> std::io::Result<()> { - let app = WireframeApp::new()? - .route(1, Arc::new(|_: &Envelope| Box::pin(async { - Response::Stream(Box::pin(try_stream! { - yield b"ack".to_vec(); - })) - })))?; + ) -> Result<(), PushError>; + } +``` + +The example below demonstrates pushing frames and returning a streamed +response. The [`async-stream`](https://docs.rs/async-stream) crate is the +canonical way to build dynamic `Response::Stream` values. + +```rust,no_run +use async_stream::try_stream; +use std::sync::Arc; +use wireframe::{app::{Envelope, WireframeApp}, Response}; + +#[tokio::main] +async fn main() -> std::io::Result<()> { + let app = WireframeApp::new()? + .route(1, Arc::new(|_: &Envelope| Box::pin(async { + Response::Stream(Box::pin(try_stream! { + yield b"ack".to_vec(); + })) + })))?; let (push, mut conn) = wireframe_testing::connect(app).await?; tokio::spawn(async move { @@ -350,7 +350,7 @@ impl PushHandle { } Ok(()) } - ``` +``` ```mermaid classDiagram diff --git a/docs/frame-metadata.md b/docs/frame-metadata.md index aa5ebbb0..ab028923 100644 --- a/docs/frame-metadata.md +++ b/docs/frame-metadata.md @@ -48,5 +48,5 @@ impl FrameMetadata for MyCodec { } ``` -In `WireframeApp` the metadata parser is used before deserialisation so routes +In `WireframeApp` the metadata parser is used before deserialization so routes can be selected as soon as the header is available. diff --git a/docs/multi-packet-and-streaming-responses-design.md b/docs/multi-packet-and-streaming-responses-design.md index 86a7f3de..88c2d80e 100644 --- a/docs/multi-packet-and-streaming-responses-design.md +++ b/docs/multi-packet-and-streaming-responses-design.md @@ -71,7 +71,7 @@ The public API is designed for clarity, performance, and ergonomic flexibility. ### 4.1 The `Response` Enum The `Response` enum is the primary return type for all handlers. It is enhanced -to provide optimised paths for common response patterns. +to provide optimized paths for common response patterns. ```rust use futures_core::stream::Stream; @@ -82,7 +82,7 @@ pub enum Response { /// A single frame reply. The most common case. Single(F), - /// An optimised response for a small, known list of frames. + /// An optimized response for a small, known list of frames. /// Avoids the overhead of boxing and dynamic dispatch for simple multi-part replies. Vec(Vec), @@ -111,7 +111,6 @@ unrecoverable transport failures and logical, protocol-level errors. ```rust /// A generic error type for wireframe operations. -# pub enum WireframeError { /// An error occurred in the underlying transport (e.g., socket closed). /// These are typically unrecoverable for the connection. @@ -154,7 +153,7 @@ For simple, fixed-size multi-part responses, like a MySQL result set header, ```rust async fn handle_select_headers(_req: Request) -> Result, MyError> { // Pre-build frames for: column-count, column-def, EOF - let frames = vec!; + let frames = vec![]; Ok(Response::Vec(frames)) } ``` diff --git a/docs/rust-binary-router-library-design.md b/docs/rust-binary-router-library-design.md index d070854a..47ff0022 100644 --- a/docs/rust-binary-router-library-design.md +++ b/docs/rust-binary-router-library-design.md @@ -426,7 +426,7 @@ allowing applications to select the prefix size and byte order. identifiers or flags. This allows for a two-stage deserialization: first the metadata, then, based on that, the full payload. -The relationship between the trait and example serializers can be visualised as +The relationship between the trait and example serializers can be visualized as follows: // Class diagram of `FrameMetadata` and serializers. @@ -458,7 +458,7 @@ classDiagram During message processing, metadata parsing precedes full deserialization: -// Sequence diagram of metadata parsing before deserialisation. +// Sequence diagram of metadata parsing before deserialization. ```mermaid sequenceDiagram diff --git a/examples/metadata_routing.rs b/examples/metadata_routing.rs index ef19e185..654f39a9 100644 --- a/examples/metadata_routing.rs +++ b/examples/metadata_routing.rs @@ -49,11 +49,8 @@ impl FrameMetadata for HeaderSerializer { // ignores the flags, but a real protocol might parse and act on these // bits. let _ = src[2]; - let payload = src[3..].to_vec(); - // `parse` receives the complete frame because the length-delimited codec - // ensures `src` contains exactly one message. Returning `src.len()` is - // therefore correct for this demo. - Ok((Envelope::new(id, None, payload), src.len())) + // Only extract metadata here; defer payload handling to the serializer. + Ok((Envelope::new(id, None, Vec::new()), 3)) } } diff --git a/src/app/builder.rs b/src/app/builder.rs index 1cd8d70c..4c3ab02d 100644 --- a/src/app/builder.rs +++ b/src/app/builder.rs @@ -139,7 +139,7 @@ where /// /// ``` /// use wireframe::app::WireframeApp; - /// WireframeApp::<_, _, wireframe::app::Envelope>::new().expect("failed to initialize app"); + /// WireframeApp::<_, _, wireframe::app::Envelope>::new().expect("failed to initialise app"); /// ``` pub fn new() -> Result { Ok(Self::default()) } } diff --git a/tests/response.rs b/tests/response.rs index ea7140b8..03df17c1 100644 --- a/tests/response.rs +++ b/tests/response.rs @@ -20,7 +20,7 @@ use wireframe_testing::run_app; mod common; use common::TestApp; -const MAX_FRAME: usize = 64 * 1024; +const MAX_FRAME: usize = 16 * 1024 * 1024; #[derive(bincode::Encode, bincode::BorrowDecode, PartialEq, Debug)] struct TestResp(u32); From 438dfd31a425c8c143ae3968a8fad7aa195c617c Mon Sep 17 00:00:00 2001 From: Leynos Date: Sat, 30 Aug 2025 19:01:20 +0100 Subject: [PATCH 08/10] Align docs and tests with codec limits --- README.md | 8 +++--- ...ge-fragmentation-and-re-assembly-design.md | 28 +++++++++---------- docs/wireframe-client-design.md | 5 ++-- tests/lifecycle.rs | 7 ++--- tests/middleware_order.rs | 5 ++-- tests/response.rs | 1 + tests/routes.rs | 14 +++++----- wireframe_testing/src/helpers.rs | 2 ++ wireframe_testing/src/lib.rs | 1 + 9 files changed, 36 insertions(+), 35 deletions(-) diff --git a/README.md b/README.md index 1795e6f1..ccfa37f5 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ routing, extractors, and middleware. ## Motivation -Manual handling of binary protocols typically involves verbose serialisation +Manual handling of binary protocols typically involves verbose serialization code, custom frame parsing, and complex dispatch logic. `wireframe` aims to reduce this boilerplate through layered abstractions: @@ -16,7 +16,7 @@ reduce this boilerplate through layered abstractions: - **Connection preamble** with customizable validation callbacks \[[docs](docs/preamble-validator.md)\] - Call `with_preamble::()` before registering success or failure callbacks -- **Serialisation engine** using `bincode` or a `wire-rs` wrapper +- **Serialization engine** using `bincode` or a `wire-rs` wrapper - **Routing engine** that dispatches messages by ID - **Handler invocation** with extractor support - **Middleware chain** for request/response processing @@ -126,7 +126,7 @@ Use `None` rather than `Some(0)` when a frame lacks a correlation ID. See This allows integration with existing packet formats without modifying `handle_frame`. -## Response serialisation and framing +## Response serialization and framing Handlers can return types implementing the `Responder` trait. These values are encoded using the application's configured serializer and framed by a @@ -282,7 +282,7 @@ let logging = from_fn(|req, next| async move { Example programs are available in the `examples/` directory: - `echo.rs` — minimal echo server using routing -- `ping_pong.rs` — showcases serialisation and middleware in a ping/pong +- `ping_pong.rs` — showcases serialization and middleware in a ping/pong protocol. See [examples/ping_pong.md](examples/ping_pong.md) for a detailed overview. - [`packet_enum.rs`](examples/packet_enum.rs) — shows packet type discrimination diff --git a/docs/generic-message-fragmentation-and-re-assembly-design.md b/docs/generic-message-fragmentation-and-re-assembly-design.md index 2024ec3f..eef5ef3b 100644 --- a/docs/generic-message-fragmentation-and-re-assembly-design.md +++ b/docs/generic-message-fragmentation-and-re-assembly-design.md @@ -17,6 +17,9 @@ This new layer will be responsible for automatically splitting oversized outbound frames and meticulously re-assembling inbound fragments into a single, coherent message before they reach the router. +> Status: design proposal. API names, trait bounds, and configuration shapes +> may change before stabilisation. + This feature is a critical component of the "Road to Wireframe 1.0," designed to seamlessly integrate with and underpin the streaming and server-push capabilities. @@ -25,8 +28,7 @@ capabilities. The implementation must satisfy the following core requirements: - - + | ID | Goal | | --- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | | G1 | Transparent inbound re-assembly → The router and handlers must always receive one complete, logical Frame. | @@ -35,8 +37,6 @@ The implementation must satisfy the following core requirements: | G4 | Denial‑of‑service (DoS) protection: The re-assembly process must be hardened against resource exhaustion attacks via strict memory and time limits. | | G5 | Zero Friction: Protocols that do not use fragmentation must incur no performance or complexity overhead. This feature must be strictly opt-in. | - - ## 3. Core architecture: the `FragmentAdapter` The feature will be implemented as a codec middleware called `FragmentAdapter`. @@ -78,6 +78,8 @@ pub struct FragmentAdapter { struct PartialMessage { /// The buffer holding the accumulating payload. buffer: BytesMut, + /// The advertised total payload size, if known. + expected_total: Option, /// The sequence number of the last fragment received. last_sequence: u64, /// The time the first fragment was received. @@ -150,7 +152,7 @@ pub trait FragmentStrategy: 'static + Send + Sync { is_final: bool, msg_id: u64, seq: u64, - ); + ) -> io::Result<()>; } ``` @@ -220,8 +222,9 @@ robust against errors and attacks. passed down the chain immediately without being buffered. - If `meta.is_final` is false, a new `PartialMessage` is created. The - buffer is pre-allocated if `meta.total_message_len` is `Some`. The - `started_at` timestamp is recorded. + buffer is pre-allocated if `meta.total_message_len` is `Some`, and + `expected_total` stores this hint. The `started_at` timestamp is + recorded. - **Continuing Message (**`.get_mut()`**):** @@ -264,7 +267,7 @@ The outbound path is simpler and purely procedural. - For each chunk, it calls `strategy.encode_header()` to write the fragment header (with the correct `msg_id`, `seq`, and `is_final` flag) into a - temporary buffer, followed by the payload chunk. + buffer, propagating any error, followed by the payload chunk. - Each fully formed fragment is then passed individually to the next codec. @@ -285,15 +288,12 @@ This feature is designed as a foundational layer that other features build upon. ## 7. Measurable objectives and success criteria - - + | Category | Objective | Success Metric | | --------------- | ------------------------------------------------------------------------------------------------------------------------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | | API Correctness | The FragmentStrategy trait and FragmentAdapter are implemented exactly as specified in this document. | 100% of the public API surface is present and correctly typed. | -| Functionality | A large logical frame is correctly split into N fragments, and a sequence of N fragments is correctly re-assembled into the original frame. | An end-to-end test confirms byte-for-byte identity of a 64 MiB payload after being fragmented and re-assembled. | +| Functionality | A large logical frame is correctly split into N fragments, and a sequence of N fragments is correctly re-assembled into the original frame. | An end-to-end test confirms byte-for-byte identity of a payload at the configured max_message_size after being fragmented and re-assembled. | | Multiplexing | The adapter can correctly re-assemble two messages whose fragments are interleaved. | A test sending fragments A1, B1, A2, B2, A3, B3 must result in two correctly re-assembled messages, A and B. | -| Resilience | The adapter protects against memory exhaustion from oversized messages. | A test sending fragments for a 2 GiB message against a 1 GiB max_message_size limit must terminate the connection and not allocate more than ~1 GiB of buffer memory. | +| Resilience | The adapter protects against memory exhaustion from oversized messages. | A test sending fragments that exceed max_message_size must terminate the connection and not allocate beyond the configured cap (including allocator overhead). | | Resilience | The adapter protects against resource leaks from abandoned partial messages. | A test that sends an initial fragment but never the final one must result in the partial buffer being purged after the reassembly_timeout duration has passed. | | Performance | The overhead for messages that do not require fragmentation is minimal. | A criterion benchmark passing a stream of small, non-fragmented frames through the FragmentAdapter must show < 5% throughput degradation compared to a build without the adapter. | - - diff --git a/docs/wireframe-client-design.md b/docs/wireframe-client-design.md index e31de2d3..66d91e09 100644 --- a/docs/wireframe-client-design.md +++ b/docs/wireframe-client-design.md @@ -95,6 +95,5 @@ extensions might include: By leveraging the existing abstractions for framing and serialization, client support can share most of the server’s implementation while providing a small ergonomic API. -[^1]: See [wireframe router design][router-design]. - -[router-design]: rust-binary-router-library-design.md#implementation-details +[^1]: See + [wireframe router design](rust-binary-router-library-design.md#implementation-details). diff --git a/tests/lifecycle.rs b/tests/lifecycle.rs index 95e0af7e..97b6fa42 100644 --- a/tests/lifecycle.rs +++ b/tests/lifecycle.rs @@ -17,13 +17,11 @@ use wireframe::{ app::{Envelope, Packet, PacketParts}, serializer::{BincodeSerializer, Serializer}, }; -use wireframe_testing::{run_app, run_with_duplex_server}; +use wireframe_testing::{TEST_MAX_FRAME, run_app, run_with_duplex_server}; type App = wireframe::app::WireframeApp; type BasicApp = wireframe::app::WireframeApp; -const MAX_FRAME: usize = 64 * 1024; - fn call_counting_callback( counter: &Arc, result: R, @@ -151,7 +149,7 @@ async fn helpers_preserve_correlation_id_and_run_callbacks() { .expect("failed to serialise envelope"); let mut frame = BytesMut::new(); let mut codec = LengthDelimitedCodec::builder() - .max_frame_length(MAX_FRAME) + .max_frame_length(TEST_MAX_FRAME) .new_codec(); codec .encode(bytes.into(), &mut frame) @@ -167,6 +165,7 @@ async fn helpers_preserve_correlation_id_and_run_callbacks() { .decode(&mut buf) .expect("decode failed") .expect("frame missing"); + assert!(buf.is_empty(), "unexpected trailing bytes after decode"); let (resp, _) = BincodeSerializer .deserialize::(&decoded) .expect("deserialize failed"); diff --git a/tests/middleware_order.rs b/tests/middleware_order.rs index 4e82ff25..2c5b32bd 100644 --- a/tests/middleware_order.rs +++ b/tests/middleware_order.rs @@ -11,11 +11,10 @@ use wireframe::{ middleware::{HandlerService, Service, ServiceRequest, ServiceResponse, Transform}, serializer::{BincodeSerializer, Serializer}, }; +use wireframe_testing::TEST_MAX_FRAME; type TestApp = wireframe::app::WireframeApp; -const MAX_FRAME: usize = 64 * 1024; - struct TagMiddleware(u8); struct TagService { @@ -73,7 +72,7 @@ async fn middleware_applied_in_reverse_order() { let bytes = serializer.serialize(&env).expect("serialization failed"); // Use a length-delimited codec for framing let mut codec = LengthDelimitedCodec::builder() - .max_frame_length(MAX_FRAME) + .max_frame_length(TEST_MAX_FRAME) .new_codec(); let mut buf = BytesMut::new(); codec diff --git a/tests/response.rs b/tests/response.rs index 03df17c1..f3e52ae4 100644 --- a/tests/response.rs +++ b/tests/response.rs @@ -69,6 +69,7 @@ async fn send_response_encodes_and_frames() { .expect("frame missing"); let (decoded, _) = TestResp::from_bytes(&frame).expect("deserialize failed"); assert_eq!(decoded, TestResp(7)); + assert!(buf.is_empty(), "unexpected trailing bytes after decode"); } /// Tests that decoding with an incomplete length prefix header returns `None` and does not consume diff --git a/tests/routes.rs b/tests/routes.rs index bd6b771b..a4147cf8 100644 --- a/tests/routes.rs +++ b/tests/routes.rs @@ -16,12 +16,10 @@ use wireframe::{ message::Message, serializer::BincodeSerializer, }; -use wireframe_testing::{drive_with_bincode, drive_with_frames}; +use wireframe_testing::{TEST_MAX_FRAME, drive_with_bincode, drive_with_frames}; type TestApp = wireframe::app::WireframeApp; -const MAX_FRAME: usize = 64 * 1024; - #[derive(bincode::Encode, bincode::BorrowDecode, PartialEq, Debug, Clone)] struct TestEnvelope { id: u32, @@ -85,13 +83,14 @@ async fn handler_receives_message_and_echoes_response() { .expect("drive_with_bincode failed"); let mut codec = LengthDelimitedCodec::builder() - .max_frame_length(MAX_FRAME) + .max_frame_length(TEST_MAX_FRAME) .new_codec(); let mut buf = BytesMut::from(&out[..]); let frame = codec .decode(&mut buf) .expect("decode failed") .expect("frame missing"); + assert!(buf.is_empty(), "unexpected trailing bytes after decode"); let (resp_env, _) = BincodeSerializer .deserialize::(&frame) .expect("deserialize failed"); @@ -120,13 +119,14 @@ async fn handler_echoes_with_none_correlation_id() { let out = drive_with_bincode(app, env).await.expect("drive failed"); let mut codec = LengthDelimitedCodec::builder() - .max_frame_length(MAX_FRAME) + .max_frame_length(TEST_MAX_FRAME) .new_codec(); let mut buf = BytesMut::from(&out[..]); let frame = codec .decode(&mut buf) .expect("decode failed") .expect("missing frame"); + assert!(buf.is_empty(), "unexpected trailing bytes after decode"); let (resp_env, _) = BincodeSerializer .deserialize::(&frame) .expect("deserialize failed"); @@ -147,7 +147,7 @@ async fn multiple_frames_processed_in_sequence() { .expect("route registration failed"); let mut codec = LengthDelimitedCodec::builder() - .max_frame_length(MAX_FRAME) + .max_frame_length(TEST_MAX_FRAME) .new_codec(); let mut encoded_frames = Vec::new(); for id in 1u8..=2 { @@ -219,7 +219,7 @@ async fn single_frame_propagates_correlation_id(#[case] cid: Option) { let mut framed = BytesMut::with_capacity(env_bytes.len() + 4); let mut codec = LengthDelimitedCodec::builder() - .max_frame_length(MAX_FRAME) + .max_frame_length(TEST_MAX_FRAME) .new_codec(); codec .encode(env_bytes.into(), &mut framed) diff --git a/wireframe_testing/src/helpers.rs b/wireframe_testing/src/helpers.rs index 8156f5c1..ec942cd8 100644 --- a/wireframe_testing/src/helpers.rs +++ b/wireframe_testing/src/helpers.rs @@ -92,6 +92,8 @@ where const DEFAULT_CAPACITY: usize = 4096; const MAX_CAPACITY: usize = 1024 * 1024 * 10; // 10MB limit pub(crate) const EMPTY_SERVER_CAPACITY: usize = 64; +/// Shared frame cap used by helpers and tests to avoid drift. +pub const TEST_MAX_FRAME: usize = DEFAULT_CAPACITY; macro_rules! forward_default { ( diff --git a/wireframe_testing/src/lib.rs b/wireframe_testing/src/lib.rs index 591d295f..6efe3bec 100644 --- a/wireframe_testing/src/lib.rs +++ b/wireframe_testing/src/lib.rs @@ -22,6 +22,7 @@ pub mod helpers; pub mod logging; pub use helpers::{ + TEST_MAX_FRAME, TestSerializer, drive_with_bincode, drive_with_frame, From 722ab6a8753fd90b43e1dad5f268334a5708de6a Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 31 Aug 2025 13:32:30 +0100 Subject: [PATCH 09/10] Document symmetric codec limits and share test helper --- ...ge-fragmentation-and-re-assembly-design.md | 18 ++++++++----- docs/wireframe-client-design.md | 10 +++---- src/app/connection.rs | 4 ++- tests/lifecycle.rs | 10 +++---- tests/middleware_order.rs | 10 +++---- tests/response.rs | 26 ++++++------------- tests/routes.rs | 20 +++++--------- wireframe_testing/src/helpers.rs | 13 +++++++--- wireframe_testing/src/lib.rs | 1 + 9 files changed, 52 insertions(+), 60 deletions(-) diff --git a/docs/generic-message-fragmentation-and-re-assembly-design.md b/docs/generic-message-fragmentation-and-re-assembly-design.md index eef5ef3b..5804b499 100644 --- a/docs/generic-message-fragmentation-and-re-assembly-design.md +++ b/docs/generic-message-fragmentation-and-re-assembly-design.md @@ -164,17 +164,23 @@ codec chain via the `WireframeApp` builder. ```rust // Pseudo‑API: enable fragmentation with a strategy on the codec stack. WireframeServer::new(|| { - WireframeApp::new() - .codec( + let mut app = WireframeApp::new(); + app + .codec({ + // Match the application buffer capacity to avoid default 8 MiB limits. + let cap = app.buffer_capacity(); LengthDelimitedCodec::builder() - .max_frame_length(64 * 1024) - .new_codec(), - ) + .max_frame_length(cap) + .new_codec() + }) .codec(FragmentAdapter::new(MySqlStrategy::new())) .route(/* ... */) }) ``` +Configure the same max frame length for all codec instances on inbound and +outbound paths. + ```rust,no_run use async_stream::try_stream; use wireframe::{app::WireframeApp, Response}; @@ -239,7 +245,7 @@ robust against errors and attacks. - **Final Fragment:** If `meta.is_final` is true, the full payload is extracted from the `PartialMessage`, the entry is removed from the map, - and the complete logical frame is passed down the codec chain. + then pass the complete logical frame down the codec chain. 4. **Timeout handling:** Run a background task within the `FragmentAdapter` that periodically iterates over the re‑assembly buffers, diff --git a/docs/wireframe-client-design.md b/docs/wireframe-client-design.md index 66d91e09..df7afda5 100644 --- a/docs/wireframe-client-design.md +++ b/docs/wireframe-client-design.md @@ -25,9 +25,9 @@ mirrors `WireframeServer` but operates in the opposite direction: - Optionally, send a preamble using the existing `Preamble` helpers. - Encode outgoing messages using the selected `Serializer` and `tokio_util::codec::LengthDelimitedCodec` (4‑byte big‑endian prefix by - default; configurable). Configure the codec’s `max_frame_length` to match the - server’s frame capacity; otherwise, frames larger than the default 8 MiB will - fail. + default; configurable). Configure the codec’s `max_frame_length` on both the + inbound (decode) and outbound (encode) paths to match the server’s frame + capacity; otherwise, frames larger than the default 8 MiB will fail. - Decode incoming frames into typed responses. - Expose async `send` and `receive` operations. @@ -58,13 +58,13 @@ let response: LoginAck = client.call(request).await?; Internally, this uses the `Serializer` to encode the request, sends it through the length‑delimited codec, then waits for a frame, decodes it, and -deserialises the response type. +deserializes the response type. ### Connection lifecycle Like the server, the client should expose hooks for connection setup and teardown. These mirror the server’s lifecycle callbacks, so both sides can -share initialisation logic. +share initialization logic. ## Example usage diff --git a/src/app/connection.rs b/src/app/connection.rs index 01412a29..d2aa8a14 100644 --- a/src/app/connection.rs +++ b/src/app/connection.rs @@ -37,6 +37,8 @@ where C: Send + 'static, E: Packet, { + /// Construct a length-delimited codec capped by the application's buffer + /// capacity. fn new_length_codec(&self) -> LengthDelimitedCodec { LengthDelimitedCodec::builder() .max_frame_length(self.buffer_capacity) @@ -62,7 +64,7 @@ where .serialize(msg) .map_err(SendError::Serialize)?; let mut codec = self.new_length_codec(); - let mut framed = BytesMut::new(); + let mut framed = BytesMut::with_capacity(bytes.len() + 4); codec .encode(bytes.into(), &mut framed) .map_err(|e| SendError::Io(io::Error::new(io::ErrorKind::InvalidData, e)))?; diff --git a/tests/lifecycle.rs b/tests/lifecycle.rs index 97b6fa42..23182b42 100644 --- a/tests/lifecycle.rs +++ b/tests/lifecycle.rs @@ -12,12 +12,12 @@ use std::{ }; use bytes::BytesMut; -use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec}; +use tokio_util::codec::{Decoder, Encoder}; use wireframe::{ app::{Envelope, Packet, PacketParts}, serializer::{BincodeSerializer, Serializer}, }; -use wireframe_testing::{TEST_MAX_FRAME, run_app, run_with_duplex_server}; +use wireframe_testing::{TEST_MAX_FRAME, new_test_codec, run_app, run_with_duplex_server}; type App = wireframe::app::WireframeApp; type BasicApp = wireframe::app::WireframeApp; @@ -147,10 +147,8 @@ async fn helpers_preserve_correlation_id_and_run_callbacks() { let bytes = BincodeSerializer .serialize(&env) .expect("failed to serialise envelope"); - let mut frame = BytesMut::new(); - let mut codec = LengthDelimitedCodec::builder() - .max_frame_length(TEST_MAX_FRAME) - .new_codec(); + let mut frame = BytesMut::with_capacity(bytes.len() + 4); + let mut codec = new_test_codec(TEST_MAX_FRAME); codec .encode(bytes.into(), &mut frame) .expect("encode should succeed"); diff --git a/tests/middleware_order.rs b/tests/middleware_order.rs index 2c5b32bd..29ec397b 100644 --- a/tests/middleware_order.rs +++ b/tests/middleware_order.rs @@ -5,13 +5,13 @@ use async_trait::async_trait; use bytes::BytesMut; use tokio::io::{AsyncReadExt, AsyncWriteExt, duplex}; -use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec}; +use tokio_util::codec::{Decoder, Encoder}; use wireframe::{ app::{Envelope, Handler}, middleware::{HandlerService, Service, ServiceRequest, ServiceResponse, Transform}, serializer::{BincodeSerializer, Serializer}, }; -use wireframe_testing::TEST_MAX_FRAME; +use wireframe_testing::{TEST_MAX_FRAME, new_test_codec}; type TestApp = wireframe::app::WireframeApp; @@ -71,10 +71,8 @@ async fn middleware_applied_in_reverse_order() { let serializer = BincodeSerializer; let bytes = serializer.serialize(&env).expect("serialization failed"); // Use a length-delimited codec for framing - let mut codec = LengthDelimitedCodec::builder() - .max_frame_length(TEST_MAX_FRAME) - .new_codec(); - let mut buf = BytesMut::new(); + let mut codec = new_test_codec(TEST_MAX_FRAME); + let mut buf = BytesMut::with_capacity(bytes.len() + 4); codec .encode(bytes.into(), &mut buf) .expect("encoding failed"); diff --git a/tests/response.rs b/tests/response.rs index f3e52ae4..8683309b 100644 --- a/tests/response.rs +++ b/tests/response.rs @@ -15,7 +15,7 @@ use wireframe::{ message::Message, serializer::BincodeSerializer, }; -use wireframe_testing::run_app; +use wireframe_testing::{new_test_codec, run_app}; mod common; use common::TestApp; @@ -59,9 +59,7 @@ async fn send_response_encodes_and_frames() { .await .expect("send_response failed"); - let mut codec = LengthDelimitedCodec::builder() - .max_frame_length(MAX_FRAME) - .new_codec(); + let mut codec = new_test_codec(MAX_FRAME); let mut buf = BytesMut::from(&out[..]); let frame = codec .decode(&mut buf) @@ -78,9 +76,7 @@ async fn send_response_encodes_and_frames() { /// This ensures that the decoder waits for the full header before attempting to decode a frame. #[tokio::test] async fn length_prefixed_decode_requires_complete_header() { - let mut codec = LengthDelimitedCodec::builder() - .max_frame_length(MAX_FRAME) - .new_codec(); + let mut codec = new_test_codec(MAX_FRAME); let mut buf = BytesMut::from(&[0x00, 0x00, 0x00][..]); // only 3 bytes assert!(codec.decode(&mut buf).expect("decode failed").is_none()); assert_eq!(buf.len(), 3); // nothing consumed @@ -92,9 +88,7 @@ async fn length_prefixed_decode_requires_complete_header() { /// Confirms that the decoder leaves the incomplete body in the buffer until the full frame arrives. #[tokio::test] async fn length_prefixed_decode_requires_full_frame() { - let mut codec = LengthDelimitedCodec::builder() - .max_frame_length(MAX_FRAME) - .new_codec(); + let mut codec = new_test_codec(MAX_FRAME); let mut buf = BytesMut::from(&[0x00, 0x00, 0x00, 0x05, 0x01, 0x02][..]); assert!(codec.decode(&mut buf).expect("decode failed").is_none()); // LengthDelimitedCodec consumes the length prefix even if the frame @@ -143,7 +137,7 @@ fn custom_length_roundtrip( } builder.max_frame_length(MAX_FRAME); let mut codec = builder.new_codec(); - let mut buf = BytesMut::new(); + let mut buf = BytesMut::with_capacity(frame.len() + prefix.len()); codec .encode(frame.clone().into(), &mut buf) .expect("encode failed"); @@ -227,9 +221,7 @@ async fn send_response_honours_buffer_capacity() { .await .expect("send_response failed"); - let mut codec = LengthDelimitedCodec::builder() - .max_frame_length(16 * 1024 * 1024) - .new_codec(); + let mut codec = new_test_codec(16 * 1024 * 1024); let mut buf = BytesMut::from(&out[..]); let frame = codec .decode(&mut buf) @@ -253,10 +245,8 @@ async fn process_stream_honours_buffer_capacity() { let env = Envelope::new(1, None, payload.clone()); let bytes = BincodeSerializer.serialize(&env).expect("serialize failed"); - let mut codec = LengthDelimitedCodec::builder() - .max_frame_length(16 * 1024 * 1024) - .new_codec(); - let mut framed = BytesMut::new(); + let mut codec = new_test_codec(16 * 1024 * 1024); + let mut framed = BytesMut::with_capacity(bytes.len() + 4); codec .encode(bytes.into(), &mut framed) .expect("encode frame failed"); diff --git a/tests/routes.rs b/tests/routes.rs index a4147cf8..c725bebe 100644 --- a/tests/routes.rs +++ b/tests/routes.rs @@ -9,14 +9,14 @@ use std::sync::{ use bytes::BytesMut; use rstest::rstest; -use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec}; +use tokio_util::codec::{Decoder, Encoder}; use wireframe::{ Serializer, app::{Packet, PacketParts}, message::Message, serializer::BincodeSerializer, }; -use wireframe_testing::{TEST_MAX_FRAME, drive_with_bincode, drive_with_frames}; +use wireframe_testing::{TEST_MAX_FRAME, drive_with_bincode, drive_with_frames, new_test_codec}; type TestApp = wireframe::app::WireframeApp; @@ -82,9 +82,7 @@ async fn handler_receives_message_and_echoes_response() { .await .expect("drive_with_bincode failed"); - let mut codec = LengthDelimitedCodec::builder() - .max_frame_length(TEST_MAX_FRAME) - .new_codec(); + let mut codec = new_test_codec(TEST_MAX_FRAME); let mut buf = BytesMut::from(&out[..]); let frame = codec .decode(&mut buf) @@ -118,9 +116,7 @@ async fn handler_echoes_with_none_correlation_id() { }; let out = drive_with_bincode(app, env).await.expect("drive failed"); - let mut codec = LengthDelimitedCodec::builder() - .max_frame_length(TEST_MAX_FRAME) - .new_codec(); + let mut codec = new_test_codec(TEST_MAX_FRAME); let mut buf = BytesMut::from(&out[..]); let frame = codec .decode(&mut buf) @@ -146,9 +142,7 @@ async fn multiple_frames_processed_in_sequence() { ) .expect("route registration failed"); - let mut codec = LengthDelimitedCodec::builder() - .max_frame_length(TEST_MAX_FRAME) - .new_codec(); + let mut codec = new_test_codec(TEST_MAX_FRAME); let mut encoded_frames = Vec::new(); for id in 1u8..=2 { let msg_bytes = Echo(id).to_bytes().expect("encode failed"); @@ -218,9 +212,7 @@ async fn single_frame_propagates_correlation_id(#[case] cid: Option) { let env_bytes = BincodeSerializer.serialize(&env).expect("serialize failed"); let mut framed = BytesMut::with_capacity(env_bytes.len() + 4); - let mut codec = LengthDelimitedCodec::builder() - .max_frame_length(TEST_MAX_FRAME) - .new_codec(); + let mut codec = new_test_codec(TEST_MAX_FRAME); codec .encode(env_bytes.into(), &mut framed) .expect("encode failed"); diff --git a/wireframe_testing/src/helpers.rs b/wireframe_testing/src/helpers.rs index ec942cd8..16c69497 100644 --- a/wireframe_testing/src/helpers.rs +++ b/wireframe_testing/src/helpers.rs @@ -95,6 +95,13 @@ pub(crate) const EMPTY_SERVER_CAPACITY: usize = 64; /// Shared frame cap used by helpers and tests to avoid drift. pub const TEST_MAX_FRAME: usize = DEFAULT_CAPACITY; +#[inline] +pub fn new_test_codec(max_len: usize) -> LengthDelimitedCodec { + let mut builder = LengthDelimitedCodec::builder(); + builder.max_frame_length(max_len); + builder.new_codec() +} + macro_rules! forward_default { ( $(#[$docs:meta])* $vis:vis fn $name:ident( @@ -349,10 +356,8 @@ where format!("bincode encode failed: {e}"), ) })?; - let mut codec = LengthDelimitedCodec::builder() - .max_frame_length(DEFAULT_CAPACITY) - .new_codec(); - let mut framed = BytesMut::new(); + let mut codec = new_test_codec(DEFAULT_CAPACITY); + let mut framed = BytesMut::with_capacity(bytes.len() + 4); codec.encode(bytes.into(), &mut framed)?; drive_with_frame(app, framed.to_vec()).await } diff --git a/wireframe_testing/src/lib.rs b/wireframe_testing/src/lib.rs index 6efe3bec..4a118539 100644 --- a/wireframe_testing/src/lib.rs +++ b/wireframe_testing/src/lib.rs @@ -31,6 +31,7 @@ pub use helpers::{ drive_with_frames, drive_with_frames_mut, drive_with_frames_with_capacity, + new_test_codec, run_app, run_with_duplex_server, }; From 53e9a1320cccf031e4954d1ea844989a70df4b65 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 31 Aug 2025 14:06:11 +0100 Subject: [PATCH 10/10] Fix docs and tests after review --- README.md | 10 +++++----- docs/wireframe-client-design.md | 1 + tests/response.rs | 21 +++++++++++---------- 3 files changed, 17 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index ccfa37f5..0a4e95cb 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,7 @@ reduce this boilerplate through layered abstractions: These layers correspond to the architecture outlined in the design document【F:docs/rust-binary-router-library-design.md†L292-L344】. -## API Overview +## API overview Applications are configured using a builder pattern similar to Actix Web. A `WireframeApp` defines routes and middleware, while `WireframeServer` manages @@ -86,7 +86,7 @@ binary protocol server. See the [full example](docs/rust-binary-router-library-design.md#5-6-illustrative-api-usage-examples) in the design document for further details. -## Custom Envelopes +## Custom envelopes `WireframeApp` defaults to a simple `Envelope` containing a message ID and raw payload bytes. Applications can supply their own envelope type by calling @@ -194,7 +194,7 @@ impl WireframeProtocol for MySqlProtocolImpl { let app = WireframeApp::new().with_protocol(MySqlProtocolImpl); ``` -## Session Registry +## Session registry The \[`SessionRegistry`\] stores weak references to \[`PushHandle`\]s for active connections. Background tasks can look up a handle by \[`ConnectionId`\] @@ -220,7 +220,7 @@ fn on_connection_setup(&self, handle: PushHandle, _ctx: &mut Connection } ``` -## Custom Extractors +## Custom extractors Extractors are types that implement `FromMessageRequest`. When a handler lists an extractor as a parameter, `wireframe` automatically constructs it using the @@ -311,7 +311,7 @@ $ cargo run --example ping_pong $ printf '\x00\x00\x00\x08\x01\x00\x00\x00\x2a\x00\x00\x00' | nc 127.0.0.1 7878 | xxd ``` -## Current Limitations +## Current limitations Connection handling now processes frames and routes messages. Although the server is still experimental, it now compiles in release mode for evaluation or diff --git a/docs/wireframe-client-design.md b/docs/wireframe-client-design.md index df7afda5..34f0d981 100644 --- a/docs/wireframe-client-design.md +++ b/docs/wireframe-client-design.md @@ -95,5 +95,6 @@ extensions might include: By leveraging the existing abstractions for framing and serialization, client support can share most of the server’s implementation while providing a small ergonomic API. + [^1]: See [wireframe router design](rust-binary-router-library-design.md#implementation-details). diff --git a/tests/response.rs b/tests/response.rs index 8683309b..a4dac65a 100644 --- a/tests/response.rs +++ b/tests/response.rs @@ -15,12 +15,13 @@ use wireframe::{ message::Message, serializer::BincodeSerializer, }; -use wireframe_testing::{new_test_codec, run_app}; +use wireframe_testing::{TEST_MAX_FRAME, new_test_codec, run_app}; mod common; use common::TestApp; -const MAX_FRAME: usize = 16 * 1024 * 1024; +// Larger cap used for oversized frame tests. +const LARGE_FRAME: usize = 16 * 1024 * 1024; #[derive(bincode::Encode, bincode::BorrowDecode, PartialEq, Debug)] struct TestResp(u32); @@ -59,7 +60,7 @@ async fn send_response_encodes_and_frames() { .await .expect("send_response failed"); - let mut codec = new_test_codec(MAX_FRAME); + let mut codec = new_test_codec(TEST_MAX_FRAME); let mut buf = BytesMut::from(&out[..]); let frame = codec .decode(&mut buf) @@ -76,7 +77,7 @@ async fn send_response_encodes_and_frames() { /// This ensures that the decoder waits for the full header before attempting to decode a frame. #[tokio::test] async fn length_prefixed_decode_requires_complete_header() { - let mut codec = new_test_codec(MAX_FRAME); + let mut codec = new_test_codec(TEST_MAX_FRAME); let mut buf = BytesMut::from(&[0x00, 0x00, 0x00][..]); // only 3 bytes assert!(codec.decode(&mut buf).expect("decode failed").is_none()); assert_eq!(buf.len(), 3); // nothing consumed @@ -88,7 +89,7 @@ async fn length_prefixed_decode_requires_complete_header() { /// Confirms that the decoder leaves the incomplete body in the buffer until the full frame arrives. #[tokio::test] async fn length_prefixed_decode_requires_full_frame() { - let mut codec = new_test_codec(MAX_FRAME); + let mut codec = new_test_codec(TEST_MAX_FRAME); let mut buf = BytesMut::from(&[0x00, 0x00, 0x00, 0x05, 0x01, 0x02][..]); assert!(codec.decode(&mut buf).expect("decode failed").is_none()); // LengthDelimitedCodec consumes the length prefix even if the frame @@ -135,7 +136,7 @@ fn custom_length_roundtrip( if fmt.endianness() == Endianness::Little { builder.little_endian(); } - builder.max_frame_length(MAX_FRAME); + builder.max_frame_length(TEST_MAX_FRAME); let mut codec = builder.new_codec(); let mut buf = BytesMut::with_capacity(frame.len() + prefix.len()); codec @@ -211,7 +212,7 @@ async fn send_response_returns_encode_error() { async fn send_response_honours_buffer_capacity() { let app = TestApp::new() .expect("failed to create app") - .buffer_capacity(16 * 1024 * 1024); + .buffer_capacity(LARGE_FRAME); let payload = vec![0_u8; 9 * 1024 * 1024]; let large = Large(payload.clone()); @@ -221,7 +222,7 @@ async fn send_response_honours_buffer_capacity() { .await .expect("send_response failed"); - let mut codec = new_test_codec(16 * 1024 * 1024); + let mut codec = new_test_codec(LARGE_FRAME); let mut buf = BytesMut::from(&out[..]); let frame = codec .decode(&mut buf) @@ -237,7 +238,7 @@ async fn send_response_honours_buffer_capacity() { async fn process_stream_honours_buffer_capacity() { let app = TestApp::new() .expect("failed to create app") - .buffer_capacity(16 * 1024 * 1024) + .buffer_capacity(LARGE_FRAME) .route(1, Arc::new(|_: &Envelope| Box::pin(async {}))) .expect("route registration failed"); @@ -245,7 +246,7 @@ async fn process_stream_honours_buffer_capacity() { let env = Envelope::new(1, None, payload.clone()); let bytes = BincodeSerializer.serialize(&env).expect("serialize failed"); - let mut codec = new_test_codec(16 * 1024 * 1024); + let mut codec = new_test_codec(LARGE_FRAME); let mut framed = BytesMut::with_capacity(bytes.len() + 4); codec .encode(bytes.into(), &mut framed)