diff --git a/Cargo.lock b/Cargo.lock index 91bccf75..9f7d4680 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3009,6 +3009,7 @@ dependencies = [ "tracing", "tracing-subscriber", "tracing-test", + "wireframe", "wireframe_testing", ] diff --git a/Cargo.toml b/Cargo.toml index 130df8b1..0c01ddb7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,6 +49,7 @@ socket2 = "0.6.0" [dev-dependencies] rstest = "0.26.1" +wireframe = { path = ".", features = ["test-helpers"] } wireframe_testing = { path = "./wireframe_testing" } logtest = "2.0.0" proptest = "1.7.0" @@ -85,6 +86,7 @@ advanced-tests = [] examples = [] cucumber-tests = [] test-support = [] +test-helpers = [] [lints.clippy] pedantic = { level = "warn", priority = -1 } diff --git a/docs/adr-002-streaming-requests-and-shared-message-assembly.md b/docs/adr-002-streaming-requests-and-shared-message-assembly.md index 37e8efa9..5d0b38d1 100644 --- a/docs/adr-002-streaming-requests-and-shared-message-assembly.md +++ b/docs/adr-002-streaming-requests-and-shared-message-assembly.md @@ -146,6 +146,53 @@ At a minimum, the hook must allow a protocol to provide: Wireframe will provide, centrally: +- a stable hook trait and header model for protocol crates. The initial public + surface is: + +```rust +use wireframe::message_assembler::{ + ContinuationFrameHeader, + FrameHeader, + FrameSequence, + FirstFrameHeader, + MessageAssembler, + MessageKey, + ParsedFrameHeader, +}; + +pub trait MessageAssembler: Send + Sync + 'static { + fn parse_frame_header( + &self, + payload: &[u8], + ) -> Result; +} + +pub struct ParsedFrameHeader { + header: FrameHeader, + header_len: usize, +} + +pub enum FrameHeader { + First(FirstFrameHeader), + Continuation(ContinuationFrameHeader), +} + +pub struct FirstFrameHeader { + pub message_key: MessageKey, + pub metadata_len: usize, + pub body_len: usize, + pub total_body_len: Option, + pub is_last: bool, +} + +pub struct ContinuationFrameHeader { + pub message_key: MessageKey, + pub sequence: Option, + pub body_len: usize, + pub is_last: bool, +} +``` + - buffering and assembly state management; - enforcement of maximum total message size, maximum fragment size, timeouts, and “max in-flight bytes”; and diff --git a/docs/execplans/8-2-1-message-assembler-hook-trait.md b/docs/execplans/8-2-1-message-assembler-hook-trait.md new file mode 100644 index 00000000..73dd3279 --- /dev/null +++ b/docs/execplans/8-2-1-message-assembler-hook-trait.md @@ -0,0 +1,307 @@ +# MessageAssembler hook trait and per-frame header parsing + +This ExecPlan is a living document. The sections `Progress`, +`Surprises & Discoveries`, `Decision Log`, and `Outcomes & Retrospective` must +be kept up to date as work proceeds. + +No `PLANS.md` exists in this repository as of 2026-01-04. + +## Purpose / Big Picture + +Wireframe needs a protocol-facing hook for multi-frame request assembly and a +shared header model that distinguishes “first frame” from “continuation frame”. +This work delivers the `MessageAssembler` trait and the first version of header +parsing types so protocol crates can implement consistent parsing and Wireframe +can later apply shared buffering and assembly logic. + +Success is observable when: + +- The public `MessageAssembler` trait and frame header types are available in + `wireframe::message_assembler` and re-exported in `wireframe`. +- A protocol can implement `MessageAssembler` to parse a simple header into a + `FrameHeader::First` or `FrameHeader::Continuation` value. +- Unit tests cover first/continuation parsing, short-header errors, and header + length accounting. +- A Cucumber feature validates the same behaviours via the behavioural test + harness. +- `docs/users-guide.md` explains the new public interface and how to configure + it (including any “not yet wired into the connection path” caveats). +- The relevant design docs capture the interface decisions. +- `docs/roadmap.md` marks 8.2.1 and 8.2.2 as done once the change lands. + +## Progress + +- [x] (2026-01-04 00:00Z) Draft ExecPlan for 8.2.1 and 8.2.2. +- [x] (2026-01-04 01:00Z) Define `MessageAssembler` trait and header types under + `src/message_assembler/` with module-level docs and examples. +- [x] (2026-01-04 01:00Z) Add builder support to store an optional assembler in + `src/app/builder.rs`. +- [x] (2026-01-04 01:05Z) Add unit tests for header parsing and error handling. +- [x] (2026-01-04 01:10Z) Add Cucumber feature, world, and steps for message + assembler parsing. +- [x] (2026-01-04 01:15Z) Update design docs, user guide, and roadmap + checkboxes. +- [x] (2026-01-04 02:05Z) Run formatting, linting, and test gates with `make` + targets. + +## Surprises & Discoveries + +- Observation: Streaming request primitives exist (`src/request/mod.rs` and + `src/extractor.rs`), but the inbound pipeline in `src/app/connection.rs` does + not yet invoke a message-assembly hook. This plan keeps integration scoped to + 8.2.1/8.2.2 and documents the limitation in the user guide until 8.2.5. + Evidence: `WireframeApp::handle_frame` decodes to `Envelope` and dispatches + directly to handlers after fragmentation reassembly. + +## Decision Log + +- Decision: Introduce a dedicated public module + `src/message_assembler/` for the hook trait and header types. Rationale: + Keeps assembly concerns distinct from `hooks` and allows clear documentation + of the protocol-facing interface. Date/Author: 2026-01-04 (Codex). +- Decision: Model frame headers with a public `FrameHeader` enum and + `FirstFrameHeader`/`ContinuationFrameHeader` structs plus newtypes for + `MessageKey` and optional `FrameSequence`. Rationale: Avoids integer soup and + aligns with the ADR’s required header fields while leaving room for + 8.2.3/8.2.4. Date/Author: 2026-01-04 (Codex). +- Decision: Use `std::io::Error` for parse failures in the trait method. + Rationale: Matches the existing framing/codec error model and keeps the trait + object-safe without adding another type parameter to `WireframeApp`. + Date/Author: 2026-01-04 (Codex). + +## Outcomes & Retrospective + +Not started yet. + +## Context and Orientation + +Wireframe’s inbound path decodes frames into `Envelope` values inside +`src/app/connection.rs`, then optionally applies transport fragmentation +reassembly via `src/app/fragmentation_state.rs` and +`frame_handling::reassemble_if_needed`. Handlers receive buffered payloads +through `ServiceRequest` and `PacketParts` in `src/middleware.rs`. + +Streaming request types (`RequestParts`, `RequestBodyStream`, and the +`StreamingBody` extractor) live in `src/request/mod.rs` and `src/extractor.rs`, +but they are not yet wired into the connection loop. The `MessageAssembler` +hook introduced here is the protocol-facing interface for that forthcoming +integration (see Architecture Decision Record (ADR) 0002). + +Key references: + +- `docs/adr-002-streaming-requests-and-shared-message-assembly.md` + (authoritative requirements for `MessageAssembler`). +- `docs/generic-message-fragmentation-and-re-assembly-design.md` section 9 + (composition order and memory budgeting). +- `docs/multi-packet-and-streaming-responses-design.md` section 11.4 + (MessageAssembler composition narrative). +- `docs/the-road-to-wireframe-1-0-feature-set-` + `philosophy-and-capability-maturity.md` section “Protocol-level message + assembly”. +- `docs/hardening-wireframe-a-guide-to-production-resilience.md` (resource + limits and failure semantics). +- Testing guidance: `docs/rust-testing-with-rstest-fixtures.md`, + `docs/behavioural-testing-in-rust-with-cucumber.md`, + `docs/reliable-testing-in-rust-via-dependency-injection.md`, and + `docs/rust-doctest-dry-guide.md`. + +## Plan of Work + +Start by adding a new `message_assembler` module that defines the +`MessageAssembler` trait, header types, and newtypes for message keys and +(optional) sequence indices. Document the trait with examples that show how to +parse a header using `bytes::Buf` rather than indexing. Next, wire the builder +so applications can register an assembler via +`WireframeApp::with_message_assembler` (and access it via +`message_assembler()`), even though the runtime integration lands in 8.2.5. +Then add unit tests for parsing behaviours and error handling using a +lightweight test assembler. Add a Cucumber feature that asserts the same +behaviours through the behavioural harness. Finally, update the design and user +documentation, and mark 8.2.1/8.2.2 as done in the roadmap. + +## Concrete Steps + +1. Read the core design documents listed above to confirm required header + fields and the intended composition order. + +2. Create `src/message_assembler/` with: + + - `src/message_assembler/mod.rs` containing the public API and module-level + `//!` documentation. + - `src/message_assembler/header.rs` defining `FrameHeader`, + `FirstFrameHeader`, and `ContinuationFrameHeader` along with newtypes such + as `MessageKey` and `FrameSequence`. + - `src/message_assembler/error.rs` only if a shared error type is needed for + examples or tests; otherwise, rely on `std::io::Error`. + + Keep each file below 400 lines and add doc examples for public items. + +3. Add `pub mod message_assembler;` to `src/lib.rs`, and re-export the new + trait and header types for a stable public surface. + +4. Update `src/app/builder.rs`: + + - Add a `message_assembler: Option>` field. + - Thread the field through `Default`, `rebuild_with_params`, and any + builder transitions. + - Add `with_message_assembler(...)` and `message_assembler()` methods. + +5. Implement unit tests (suggested location: + `src/message_assembler/tests.rs`): + + - Build a small `TestMessageAssembler` that parses a simple header format + (one byte “kind” tag plus fixed-width fields for key and lengths). + - Validate first-frame parsing returns `FrameHeader::First` with correct + metadata length, body length, and header byte count. + - Validate continuation parsing returns `FrameHeader::Continuation`. + - Validate short/invalid headers return `io::ErrorKind::InvalidData`. + - Avoid indexing and use `bytes::Buf` for parsing to satisfy clippy. + +6. Add behavioural tests: + + - `tests/features/message_assembler.feature` with scenarios for first-frame + and continuation parsing. + - `tests/worlds/message_assembler.rs` storing the parsed header and errors. + - `tests/steps/message_assembler_steps.rs` with Given/When/Then steps. + - Register the world in `tests/worlds/mod.rs`, `tests/world.rs`, and run it + from `tests/cucumber.rs`. Add the steps module in `tests/steps/mod.rs`. + +7. Update documentation: + + - `docs/users-guide.md`: describe the new `MessageAssembler` trait, how to + configure it via `WireframeApp`, and call out that runtime integration + into the inbound pipeline arrives in 8.2.5. + - `docs/adr-002-streaming-requests-and-shared-message-assembly.md`: record + the concrete trait signature and header model decisions. + - `docs/generic-message-fragmentation-and-re-assembly-design.md` and + `docs/multi-packet-and-streaming-responses-design.md`: add references to + the new API surface. + - `docs/the-road-to-wireframe-1-0-feature-set-` + `philosophy-and-capability-maturity.md`: note the concrete hook trait + surface. + +8. Update `docs/roadmap.md` to mark 8.2.1 and 8.2.2 as done when all above + changes are complete. + +## Validation and Acceptance + +Acceptance requires all of the following: + +- New public `MessageAssembler` trait and header types compile and are + documented with examples. +- Unit tests cover parsing for first and continuation frames and error cases. +- Cucumber behavioural tests for message assembler parsing pass. +- Design docs, user guide, and roadmap reflect the new interface. + +Run validation from the repository root (use `tee` to capture full output): + + set -o pipefail + timeout 300 make fmt 2>&1 | tee /tmp/wireframe-fmt.log + echo "fmt exit: $?" + + set -o pipefail + timeout 300 make markdownlint 2>&1 | tee /tmp/wireframe-markdownlint.log + echo "markdownlint exit: $?" + + set -o pipefail + timeout 300 make check-fmt 2>&1 | tee /tmp/wireframe-check-fmt.log + echo "check-fmt exit: $?" + + set -o pipefail + timeout 300 make lint 2>&1 | tee /tmp/wireframe-lint.log + echo "lint exit: $?" + + set -o pipefail + timeout 300 make test 2>&1 | tee /tmp/wireframe-test.log + echo "test exit: $?" + +If Mermaid diagrams are edited or added, also run: + + set -o pipefail + timeout 300 make nixie 2>&1 | tee /tmp/wireframe-nixie.log + echo "nixie exit: $?" + +## Idempotence and Recovery + +All steps are additive and can be re-run safely. If a step fails, fix the +underlying issue and re-run only the affected command(s). Use the `tee` outputs +to locate the failure before retrying. Avoid destructive commands; if a local +change needs to be backed out, revert only the specific files edited for this +feature. + +## Artifacts and Notes + +Expected artefacts after completion: + +- `src/message_assembler/` module with trait, header types, and tests. +- New Cucumber feature and world files under `tests/features/` and + `tests/worlds/`. +- Updated design docs and user guide entries. + +## Interfaces and Dependencies + +At the end of this work, the following public interfaces must exist and be +re-exported from `src/lib.rs`: + +- Module `wireframe::message_assembler` with a module-level `//!` comment. +- Newtypes: + + pub struct MessageKey(pub u64); + pub struct FrameSequence(pub u32); + + Include `From`/`From` and `Copy`, `Clone`, `Debug`, `Eq`, and + `Hash`. + +- Header model: + + pub enum FrameHeader { + First(FirstFrameHeader), + Continuation(ContinuationFrameHeader), + } + + pub struct FirstFrameHeader { + pub message_key: MessageKey, + pub metadata_len: usize, + pub body_len: usize, + pub total_body_len: Option, + pub is_last: bool, + } + + pub struct ContinuationFrameHeader { + pub message_key: MessageKey, + pub sequence: Option, + pub body_len: usize, + pub is_last: bool, + } + + If header length accounting is needed for slicing, add a + `ParsedFrameHeader { header_len, header }` wrapper. + +- Hook trait: + + pub trait MessageAssembler: Send + Sync + 'static { + fn parse_frame_header( + &self, + payload: &[u8], + ) -> Result; + } + +- Builder configuration: + + impl WireframeApp { + pub fn with_message_assembler( + self, + assembler: impl MessageAssembler + 'static, + ) -> Self; + + pub fn message_assembler(&self) -> Option>; + } + +If the builder uses `Arc`, add the required `use` and +thread it through `rebuild_with_params` so type transitions preserve the +configured assembler. + +## Revision note + +Add a revision note only when this ExecPlan is updated after implementation +work has started. diff --git a/docs/generic-message-fragmentation-and-re-assembly-design.md b/docs/generic-message-fragmentation-and-re-assembly-design.md index 6996ffff..0126b1c3 100644 --- a/docs/generic-message-fragmentation-and-re-assembly-design.md +++ b/docs/generic-message-fragmentation-and-re-assembly-design.md @@ -460,6 +460,12 @@ This ordering ensures protocol crates migrating to `MessageAssembler` do not need to special-case transport fragments: the assembler only sees complete packet payloads and focuses on protocol continuity rules. +Wireframe exposes the hook surface as +`wireframe::message_assembler::MessageAssembler`, and applications register an +implementation via `WireframeApp::with_message_assembler`. The hook is stored +on the builder today and integrated into the inbound pipeline in roadmap item +8.2.5. + ### 9.3 Memory budget integration Per-connection memory budgets apply across both layers. Budgets cover: diff --git a/docs/multi-packet-and-streaming-responses-design.md b/docs/multi-packet-and-streaming-responses-design.md index 487d1ef2..a88fbdb8 100644 --- a/docs/multi-packet-and-streaming-responses-design.md +++ b/docs/multi-packet-and-streaming-responses-design.md @@ -440,7 +440,9 @@ not hang. protocol crates to supply protocol-specific parsing and continuity rules while Wireframe provides shared buffering machinery and limit enforcement. See the [fragmentation design][frag-design] for how transport-level - fragmentation and protocol-level assembly compose. + fragmentation and protocol-level assembly compose. The hook is surfaced as + `wireframe::message_assembler::MessageAssembler` and registered via + `WireframeApp::with_message_assembler`. [frag-design]: generic-message-fragmentation-and-re-assembly-design.md diff --git a/docs/roadmap.md b/docs/roadmap.md index f65851b2..e5c9bd1d 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -277,9 +277,9 @@ and standardized per-connection memory budgets. ### 8.2. MessageAssembler abstraction -- [ ] 8.2.1. Define a `MessageAssembler` hook trait for protocol-specific +- [x] 8.2.1. Define a `MessageAssembler` hook trait for protocol-specific multi-frame parsing. -- [ ] 8.2.2. Implement per-frame header parsing with "first frame" versus +- [x] 8.2.2. Implement per-frame header parsing with "first frame" versus "continuation frame" handling. - [ ] 8.2.3. Add message key support for multiplexing interleaved assemblies. - [ ] 8.2.4. Implement continuity validation (ordering, missing frames, and diff --git a/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md b/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md index 8bdb5c1d..ee3be6ee 100644 --- a/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md +++ b/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md @@ -217,6 +217,8 @@ The `MessageAssembler` hook allows a protocol to provide: Wireframe provides, centrally: +- a stable hook trait and header model exposed as + `wireframe::message_assembler::MessageAssembler`; - buffering and assembly state management; - enforcement of maximum total message size, maximum fragment size, timeouts, and "max in-flight bytes"; and diff --git a/docs/users-guide.md b/docs/users-guide.md index af9ec6c0..4f19247c 100644 --- a/docs/users-guide.md +++ b/docs/users-guide.md @@ -282,6 +282,59 @@ reconstruction), `RequestParts` carries only protocol-defined metadata required to interpret the streaming body. The body itself is consumed through a separate stream, enabling back-pressure and incremental processing.[^46] +### Message assembler hook + +Wireframe exposes a protocol-facing `MessageAssembler` hook that parses +per-frame headers into `FrameHeader::First` and `FrameHeader::Continuation` +values. It returns a `ParsedFrameHeader` that includes the header length so the +remaining bytes can be treated as the body chunk. + +Register an assembler with `WireframeApp::with_message_assembler`: + +```rust,no_run +use wireframe::{ + app::WireframeApp, + message_assembler::{ + FrameHeader, + FirstFrameHeader, + MessageAssembler, + MessageKey, + ParsedFrameHeader, + }, +}; + +struct DemoAssembler; + +impl MessageAssembler for DemoAssembler { + fn parse_frame_header( + &self, + _payload: &[u8], + ) -> Result { + Ok(ParsedFrameHeader::new( + FrameHeader::First(FirstFrameHeader { + message_key: MessageKey(1), + metadata_len: 0, + body_len: 0, + total_body_len: None, + is_last: true, + }), + 0, + )) + } +} + +let _app = WireframeApp::new() + .expect("builder") + .with_message_assembler(DemoAssembler); +``` + +Note: the hook is stored on the application today but is wired into the inbound +connection path in roadmap item 8.2.5. Until that integration lands, protocol +crates can use the trait for shared parsing logic and tests. + +`WireframeApp::message_assembler` returns the configured hook as an +`Option<&Arc>` if you need to access it directly. + ### Streaming request body consumption Handlers can opt into streaming request bodies using the `StreamingBody` diff --git a/src/app/builder.rs b/src/app/builder.rs index 294fc8fa..0696f6d9 100644 --- a/src/app/builder.rs +++ b/src/app/builder.rs @@ -23,6 +23,7 @@ use crate::{ codec::{FrameCodec, LengthDelimitedFrameCodec, clamp_frame_length}, fragment::FragmentationConfig, hooks::{ProtocolHooks, WireframeProtocol}, + message_assembler::MessageAssembler, middleware::HandlerService, serializer::{BincodeSerializer, Serializer}, }; @@ -50,6 +51,7 @@ pub struct WireframeApp< pub(super) codec: F, pub(super) read_timeout_ms: u64, pub(super) fragmentation: Option, + pub(super) message_assembler: Option>, } impl Default for WireframeApp @@ -77,6 +79,7 @@ where codec, read_timeout_ms: 100, fragmentation: default_fragmentation(max_frame_length), + message_assembler: None, } } } @@ -138,6 +141,7 @@ where codec: F2, protocol: Option>>, fragmentation: Option, + message_assembler: Option>, ) -> WireframeApp where S2: Serializer + Send + Sync, @@ -156,6 +160,7 @@ where codec, read_timeout_ms: self.read_timeout_ms, fragmentation, + message_assembler, } } @@ -171,7 +176,8 @@ where { let fragmentation = default_fragmentation(codec.max_frame_length()); let serializer = std::mem::take(&mut self.serializer); - self.rebuild_with_params(serializer, codec, None, fragmentation) + let message_assembler = self.message_assembler.take(); + self.rebuild_with_params(serializer, codec, None, fragmentation, message_assembler) } /// Register a route that maps `id` to `handler`. @@ -257,6 +263,7 @@ where codec: self.codec, read_timeout_ms: self.read_timeout_ms, fragmentation: self.fragmentation, + message_assembler: self.message_assembler, }) } @@ -294,6 +301,74 @@ where } } + /// Install a [`MessageAssembler`] implementation. + /// + /// The assembler parses protocol-specific frame headers to support + /// multi-frame request assembly once the inbound pipeline integrates it. + /// + /// # Examples + /// + /// ```rust,no_run + /// use wireframe::{ + /// app::WireframeApp, + /// message_assembler::{MessageAssembler, ParsedFrameHeader}, + /// }; + /// + /// struct DemoAssembler; + /// + /// impl MessageAssembler for DemoAssembler { + /// fn parse_frame_header(&self, _payload: &[u8]) -> Result { + /// Err(std::io::Error::new( + /// std::io::ErrorKind::InvalidData, + /// "unimplemented", + /// )) + /// } + /// } + /// + /// let app = WireframeApp::new() + /// .expect("builder") + /// .with_message_assembler(DemoAssembler); + /// let _ = app; + /// ``` + #[must_use] + pub fn with_message_assembler(self, assembler: impl MessageAssembler + 'static) -> Self { + WireframeApp { + message_assembler: Some(Arc::new(assembler)), + ..self + } + } + + /// Get the configured message assembler, if any. + /// + /// # Examples + /// + /// ```rust,no_run + /// use wireframe::{ + /// app::WireframeApp, + /// message_assembler::{MessageAssembler, ParsedFrameHeader}, + /// }; + /// + /// struct DemoAssembler; + /// + /// impl MessageAssembler for DemoAssembler { + /// fn parse_frame_header(&self, _payload: &[u8]) -> Result { + /// Err(std::io::Error::new( + /// std::io::ErrorKind::InvalidData, + /// "unimplemented", + /// )) + /// } + /// } + /// + /// let app = WireframeApp::new() + /// .expect("builder") + /// .with_message_assembler(DemoAssembler); + /// assert!(app.message_assembler().is_some()); + /// ``` + #[must_use] + pub fn message_assembler(&self) -> Option<&Arc> { + self.message_assembler.as_ref() + } + /// Configure a Dead Letter Queue for dropped push frames. /// /// ```rust,no_run @@ -348,7 +423,14 @@ where let codec = std::mem::take(&mut self.codec); let protocol = self.protocol.take(); let fragmentation = self.fragmentation.take(); - self.rebuild_with_params(serializer, codec, protocol, fragmentation) + let message_assembler = self.message_assembler.take(); + self.rebuild_with_params( + serializer, + codec, + protocol, + fragmentation, + message_assembler, + ) } /// Configure the read timeout in milliseconds. diff --git a/src/lib.rs b/src/lib.rs index 6cb4355b..38e03cb8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,8 @@ //! This crate provides building blocks for asynchronous binary protocol //! servers, including routing, middleware, and connection utilities. +extern crate self as wireframe; + pub mod app; pub mod byte_order; pub mod codec; @@ -22,6 +24,7 @@ pub mod fragment; pub mod frame; pub mod hooks; pub mod message; +pub mod message_assembler; pub mod metrics; pub mod middleware; pub mod panic; @@ -33,6 +36,8 @@ pub mod rewind_stream; #[cfg(not(loom))] pub mod server; pub mod session; +#[cfg(any(test, feature = "test-helpers"))] +pub mod test_helpers; pub use client::{ClientCodecConfig, ClientError, SocketOptions, WireframeClient}; pub use connection::ConnectionActor; @@ -58,6 +63,15 @@ pub use fragment::{ fragment_overhead, }; pub use hooks::{ConnectionContext, ProtocolHooks, WireframeProtocol}; +pub use message_assembler::{ + ContinuationFrameHeader, + FirstFrameHeader, + FrameHeader, + FrameSequence, + MessageAssembler, + MessageKey, + ParsedFrameHeader, +}; pub use metrics::{CONNECTIONS_ACTIVE, Direction, ERRORS_TOTAL, FRAMES_PROCESSED}; pub use request::{ DEFAULT_BODY_CHANNEL_CAPACITY, diff --git a/src/message_assembler/header.rs b/src/message_assembler/header.rs new file mode 100644 index 00000000..c10f0740 --- /dev/null +++ b/src/message_assembler/header.rs @@ -0,0 +1,185 @@ +//! Header model for protocol-level message assembly. +//! +//! These types describe per-frame metadata required to reassemble multi-frame +//! requests. Protocol crates populate them via `MessageAssembler`. + +use std::fmt; + +/// Identifies a logical message being assembled from multiple frames. +#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub struct MessageKey(pub u64); + +impl From for MessageKey { + fn from(value: u64) -> Self { Self(value) } +} + +impl From for u64 { + fn from(value: MessageKey) -> Self { value.0 } +} + +impl fmt::Display for MessageKey { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{}", self.0) } +} + +/// Optional sequence number for continuation frames. +#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub struct FrameSequence(pub u32); + +impl From for FrameSequence { + fn from(value: u32) -> Self { Self(value) } +} + +impl From for u32 { + fn from(value: FrameSequence) -> Self { value.0 } +} + +impl fmt::Display for FrameSequence { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{}", self.0) } +} + +/// Parsed per-frame header information. +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum FrameHeader { + /// Header describing the first frame in a multi-frame request. + First(FirstFrameHeader), + /// Header describing a continuation frame. + Continuation(ContinuationFrameHeader), +} + +/// Header metadata for the first frame in a message series. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct FirstFrameHeader { + /// Key used to correlate frames belonging to the same message. + pub message_key: MessageKey, + /// Protocol-specific metadata length in bytes. + pub metadata_len: usize, + /// Length of the body bytes in this frame. + pub body_len: usize, + /// Total expected body length across all frames, if declared by the protocol. + pub total_body_len: Option, + /// Whether this frame completes the message. + pub is_last: bool, +} + +/// Header metadata for continuation frames. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct ContinuationFrameHeader { + /// Key used to correlate frames belonging to the same message. + pub message_key: MessageKey, + /// Optional frame sequence index, when supplied by the protocol. + pub sequence: Option, + /// Length of the body bytes in this frame. + pub body_len: usize, + /// Whether this frame completes the message. + pub is_last: bool, +} + +/// Result of parsing a frame header from payload bytes. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct ParsedFrameHeader { + header: FrameHeader, + header_len: usize, +} + +impl ParsedFrameHeader { + /// Create a parsed header with its byte length. + /// + /// # Examples + /// + /// ``` + /// use wireframe::message_assembler::{ + /// FirstFrameHeader, + /// FrameHeader, + /// MessageKey, + /// ParsedFrameHeader, + /// }; + /// + /// let header = FrameHeader::First(FirstFrameHeader { + /// message_key: MessageKey(1), + /// metadata_len: 0, + /// body_len: 4, + /// total_body_len: None, + /// is_last: true, + /// }); + /// + /// let parsed = ParsedFrameHeader::new(header, 12); + /// assert_eq!(parsed.header_len(), 12); + /// ``` + #[must_use] + pub const fn new(header: FrameHeader, header_len: usize) -> Self { Self { header, header_len } } + + /// Return the parsed header information. + /// + /// # Examples + /// + /// ``` + /// use wireframe::message_assembler::{ + /// FirstFrameHeader, + /// FrameHeader, + /// MessageKey, + /// ParsedFrameHeader, + /// }; + /// + /// let header = FrameHeader::First(FirstFrameHeader { + /// message_key: MessageKey(7), + /// metadata_len: 0, + /// body_len: 1, + /// total_body_len: None, + /// is_last: true, + /// }); + /// let parsed = ParsedFrameHeader::new(header, 4); + /// assert!(matches!(parsed.header(), FrameHeader::First(_))); + /// ``` + #[must_use] + pub const fn header(&self) -> &FrameHeader { &self.header } + + /// Return the number of bytes consumed by the header. + /// + /// # Examples + /// + /// ``` + /// use wireframe::message_assembler::{ + /// FirstFrameHeader, + /// FrameHeader, + /// MessageKey, + /// ParsedFrameHeader, + /// }; + /// + /// let header = FrameHeader::First(FirstFrameHeader { + /// message_key: MessageKey(3), + /// metadata_len: 0, + /// body_len: 0, + /// total_body_len: None, + /// is_last: true, + /// }); + /// let parsed = ParsedFrameHeader::new(header, 12); + /// assert_eq!(parsed.header_len(), 12); + /// ``` + #[must_use] + pub const fn header_len(&self) -> usize { self.header_len } + + /// Consume the parsed header and return the underlying header value. + /// + /// # Examples + /// + /// ``` + /// use wireframe::message_assembler::{ + /// FirstFrameHeader, + /// FrameHeader, + /// MessageKey, + /// ParsedFrameHeader, + /// }; + /// + /// let header = FrameHeader::First(FirstFrameHeader { + /// message_key: MessageKey(1), + /// metadata_len: 0, + /// body_len: 0, + /// total_body_len: None, + /// is_last: true, + /// }); + /// let parsed = ParsedFrameHeader::new(header, 0); + /// assert!(matches!(parsed.into_header(), FrameHeader::First(_))); + /// ``` + #[must_use] + pub fn into_header(self) -> FrameHeader { self.header } +} diff --git a/src/message_assembler/mod.rs b/src/message_assembler/mod.rs new file mode 100644 index 00000000..fc26c014 --- /dev/null +++ b/src/message_assembler/mod.rs @@ -0,0 +1,101 @@ +//! Protocol-level message assembly hooks. +//! +//! A `MessageAssembler` parses protocol-specific frame headers to distinguish +//! "first" frames from "continuation" frames. The hook operates above +//! transport fragmentation and feeds the streaming request pipeline once the +//! connection actor integrates it (see ADR 0002). + +mod header; + +use std::io; + +pub use header::{ + ContinuationFrameHeader, + FirstFrameHeader, + FrameHeader, + FrameSequence, + MessageKey, + ParsedFrameHeader, +}; + +/// Hook trait for protocol-specific multi-frame request parsing. +/// +/// Implementations should parse only the per-frame header and return the +/// parsed header plus the number of bytes consumed. The remaining bytes are +/// treated as the frame's body chunk. +/// +/// # Examples +/// +/// ```rust,no_run +/// use bytes::Buf; +/// use wireframe::message_assembler::{ +/// FirstFrameHeader, +/// FrameHeader, +/// MessageAssembler, +/// MessageKey, +/// ParsedFrameHeader, +/// }; +/// +/// struct DemoAssembler; +/// +/// impl MessageAssembler for DemoAssembler { +/// fn parse_frame_header(&self, payload: &[u8]) -> Result { +/// let mut buf = payload; +/// if buf.remaining() < 9 { +/// return Err(std::io::Error::new( +/// std::io::ErrorKind::InvalidData, +/// "header too short", +/// )); +/// } +/// +/// let tag = buf.get_u8(); +/// let key = MessageKey::from(buf.get_u64()); +/// let header = match tag { +/// 0x01 => FrameHeader::First(FirstFrameHeader { +/// message_key: key, +/// metadata_len: 0, +/// body_len: buf.remaining(), +/// total_body_len: None, +/// is_last: true, +/// }), +/// _ => { +/// return Err(std::io::Error::new( +/// std::io::ErrorKind::InvalidData, +/// "unknown header tag", +/// )); +/// } +/// }; +/// +/// let header_len = payload.len() - buf.remaining(); +/// Ok(ParsedFrameHeader::new(header, header_len)) +/// } +/// } +/// ``` +pub trait MessageAssembler: Send + Sync + 'static { + /// Parse a protocol header from the provided payload bytes. + /// + /// # Errors + /// + /// Returns an `io::Error` when the header is malformed or incomplete. + /// + /// # Examples + /// + /// ```rust,no_run + /// use wireframe::message_assembler::{MessageAssembler, ParsedFrameHeader}; + /// + /// struct Demo; + /// + /// impl MessageAssembler for Demo { + /// fn parse_frame_header(&self, _payload: &[u8]) -> Result { + /// Err(std::io::Error::new( + /// std::io::ErrorKind::InvalidData, + /// "header not implemented", + /// )) + /// } + /// } + /// ``` + fn parse_frame_header(&self, payload: &[u8]) -> Result; +} + +#[cfg(test)] +mod tests; diff --git a/src/message_assembler/tests.rs b/src/message_assembler/tests.rs new file mode 100644 index 00000000..1ea199eb --- /dev/null +++ b/src/message_assembler/tests.rs @@ -0,0 +1,189 @@ +//! Unit tests for message assembler header parsing. + +use std::io; + +use bytes::{BufMut, BytesMut}; +use rstest::rstest; + +use super::{ + ContinuationFrameHeader, + FirstFrameHeader, + FrameHeader, + FrameSequence, + MessageAssembler, + MessageKey, + ParsedFrameHeader, +}; +use crate::test_helpers::TestAssembler; + +#[rstest] +#[case::first_frame_without_total( + "first frame without total", + build_first_header_payload(FirstHeaderSpec { + flags: 0b0, + message_key: 9, + metadata_len: 2, + body_len: 12, + total_body_len: None, + }), + FrameHeader::First(FirstFrameHeader { + message_key: MessageKey(9), + metadata_len: 2, + body_len: 12, + total_body_len: None, + is_last: false, + }), +)] +#[case::first_frame_with_total_and_last( + "first frame with total and last", + build_first_header_payload(FirstHeaderSpec { + flags: 0b11, + message_key: 42, + metadata_len: 0, + body_len: 8, + total_body_len: Some(64), + }), + FrameHeader::First(FirstFrameHeader { + message_key: MessageKey(42), + metadata_len: 0, + body_len: 8, + total_body_len: Some(64), + is_last: true, + }), +)] +#[case::continuation_frame_with_sequence( + "continuation frame with sequence", + build_continuation_header_payload(ContinuationHeaderSpec { + flags: 0b10, + message_key: 7, + body_len: 16, + sequence: Some(3), + }), + FrameHeader::Continuation(ContinuationFrameHeader { + message_key: MessageKey(7), + sequence: Some(FrameSequence(3)), + body_len: 16, + is_last: false, + }), +)] +#[case::continuation_frame_without_sequence( + "continuation frame without sequence", + build_continuation_header_payload(ContinuationHeaderSpec { + flags: 0b1, + message_key: 11, + body_len: 5, + sequence: None, + }), + FrameHeader::Continuation(ContinuationFrameHeader { + message_key: MessageKey(11), + sequence: None, + body_len: 5, + is_last: true, + }), +)] +fn parse_frame_headers( + #[case] case_name: &'static str, + #[case] payload: Vec, + #[case] expected_header: FrameHeader, +) { + let parsed = parse_header(&payload); + assert_eq!(parsed.header(), &expected_header, "case: {case_name}"); + assert_eq!( + parsed.header_len(), + payload.len(), + "case (header-only payload): {case_name}" + ); + + let mut payload_with_body = payload.clone(); + payload_with_body.extend_from_slice(&[0xaa, 0xbb, 0xcc]); + + let parsed_with_body = parse_header(&payload_with_body); + assert_eq!( + parsed_with_body.header(), + &expected_header, + "case (with body bytes): {case_name}" + ); + assert_eq!( + parsed_with_body.header_len(), + payload.len(), + "case (with body bytes, header_len mismatch): {case_name}" + ); + assert!( + parsed_with_body.header_len() < payload_with_body.len(), + "case (with body bytes, header_len not less than payload.len()): {case_name}" + ); +} + +#[test] +fn short_header_errors() { + let payload = vec![0x01]; + let err = TestAssembler + .parse_frame_header(&payload) + .expect_err("expected error"); + + assert_eq!(err.kind(), io::ErrorKind::InvalidData); + assert_eq!(err.to_string(), "header too short"); +} + +#[test] +fn unknown_header_kind_errors() { + let mut bytes = BytesMut::new(); + bytes.put_u8(0xff); + bytes.put_u8(0x00); + bytes.put_u64(0); + let payload = bytes.to_vec(); + let err = TestAssembler + .parse_frame_header(&payload) + .expect_err("expected error"); + + assert_eq!(err.kind(), io::ErrorKind::InvalidData); + assert_eq!(err.to_string(), "unknown header kind"); +} + +#[derive(Clone, Copy)] +struct FirstHeaderSpec { + flags: u8, + message_key: u64, + metadata_len: u16, + body_len: u32, + total_body_len: Option, +} + +#[derive(Clone, Copy)] +struct ContinuationHeaderSpec { + flags: u8, + message_key: u64, + body_len: u32, + sequence: Option, +} + +fn build_first_header_payload(spec: FirstHeaderSpec) -> Vec { + let mut bytes = BytesMut::new(); + bytes.put_u8(0x01); + bytes.put_u8(spec.flags); + bytes.put_u64(spec.message_key); + bytes.put_u16(spec.metadata_len); + bytes.put_u32(spec.body_len); + if let Some(total_body_len) = spec.total_body_len { + bytes.put_u32(total_body_len); + } + bytes.to_vec() +} + +fn build_continuation_header_payload(spec: ContinuationHeaderSpec) -> Vec { + let mut bytes = BytesMut::new(); + bytes.put_u8(0x02); + bytes.put_u8(spec.flags); + bytes.put_u64(spec.message_key); + bytes.put_u32(spec.body_len); + if let Some(sequence) = spec.sequence { + bytes.put_u32(sequence); + } + bytes.to_vec() +} + +fn parse_header(payload: &[u8]) -> ParsedFrameHeader { + TestAssembler + .parse_frame_header(payload) + .expect("header parse") +} diff --git a/src/test_helpers.rs b/src/test_helpers.rs new file mode 100644 index 00000000..8fe5641a --- /dev/null +++ b/src/test_helpers.rs @@ -0,0 +1,115 @@ +#![cfg(any(test, feature = "test-helpers"))] +//! Test-only helpers for shared test utilities. + +use std::io; + +use bytes::Buf; + +use crate::message_assembler::{ + ContinuationFrameHeader, + FirstFrameHeader, + FrameHeader, + FrameSequence, + MessageAssembler, + MessageKey, + ParsedFrameHeader, +}; + +/// Test-friendly message assembler implementation that shares parsing logic. +#[derive(Clone, Copy, Debug, Default)] +pub struct TestAssembler; + +impl MessageAssembler for TestAssembler { + fn parse_frame_header(&self, payload: &[u8]) -> Result { + parse_frame_header(payload) + } +} + +/// Parse a protocol-specific frame header for tests. +/// +/// # Errors +/// +/// Returns an error if the payload is too short or contains an invalid header. +pub fn parse_frame_header(payload: &[u8]) -> Result { + let mut buf = payload; + let initial = buf.remaining(); + + let kind = take_u8(&mut buf)?; + let flags = take_u8(&mut buf)?; + let message_key = MessageKey::from(take_u64(&mut buf)?); + + let header = match kind { + 0x01 => { + let metadata_len = usize::from(take_u16(&mut buf)?); + let body_len = usize::try_from(take_u32(&mut buf)?) + .map_err(|_| invalid_data("body length too large"))?; + let total_body_len = if flags & 0b10 == 0b10 { + Some( + usize::try_from(take_u32(&mut buf)?) + .map_err(|_| invalid_data("total length too large"))?, + ) + } else { + None + }; + + FrameHeader::First(FirstFrameHeader { + message_key, + metadata_len, + body_len, + total_body_len, + is_last: flags & 0b1 == 0b1, + }) + } + 0x02 => { + let body_len = usize::try_from(take_u32(&mut buf)?) + .map_err(|_| invalid_data("body length too large"))?; + let sequence = if flags & 0b10 == 0b10 { + Some(FrameSequence::from(take_u32(&mut buf)?)) + } else { + None + }; + + FrameHeader::Continuation(ContinuationFrameHeader { + message_key, + sequence, + body_len, + is_last: flags & 0b1 == 0b1, + }) + } + _ => return Err(invalid_data("unknown header kind")), + }; + + let header_len = initial - buf.remaining(); + Ok(ParsedFrameHeader::new(header, header_len)) +} + +fn take_u8(buf: &mut &[u8]) -> Result { + ensure_remaining(buf, 1)?; + Ok(buf.get_u8()) +} + +fn take_u16(buf: &mut &[u8]) -> Result { + ensure_remaining(buf, 2)?; + Ok(buf.get_u16()) +} + +fn take_u32(buf: &mut &[u8]) -> Result { + ensure_remaining(buf, 4)?; + Ok(buf.get_u32()) +} + +fn take_u64(buf: &mut &[u8]) -> Result { + ensure_remaining(buf, 8)?; + Ok(buf.get_u64()) +} + +fn ensure_remaining(buf: &mut &[u8], needed: usize) -> Result<(), io::Error> { + if buf.remaining() < needed { + return Err(invalid_data("header too short")); + } + Ok(()) +} + +fn invalid_data(message: &'static str) -> io::Error { + io::Error::new(io::ErrorKind::InvalidData, message) +} diff --git a/tests/cucumber.rs b/tests/cucumber.rs index 74fc7252..7c497e6f 100644 --- a/tests/cucumber.rs +++ b/tests/cucumber.rs @@ -7,6 +7,7 @@ //! - `StreamEndWorld`: Verifies end-of-stream signalling //! - `MultiPacketWorld`: Tests channel-backed multi-packet response delivery //! - `FragmentWorld`: Tests fragment metadata enforcement and reassembly primitives +//! - `MessageAssemblerWorld`: Tests message assembler header parsing //! - `ClientRuntimeWorld`: Tests client runtime configuration and framing behaviour //! - `CodecStatefulWorld`: Tests instance-aware codec sequence counters //! - `RequestPartsWorld`: Tests request parts metadata handling @@ -21,6 +22,7 @@ //! tests/features/stream_end.feature -> StreamEndWorld context //! tests/features/multi_packet.feature -> MultiPacketWorld context //! tests/features/fragment.feature -> FragmentWorld context +//! tests/features/message_assembler.feature -> MessageAssemblerWorld context //! tests/features/client_runtime.feature -> ClientRuntimeWorld context //! tests/features/codec_stateful.feature -> CodecStatefulWorld context //! tests/features/request_parts.feature -> RequestPartsWorld context @@ -40,6 +42,7 @@ use world::{ CodecStatefulWorld, CorrelationWorld, FragmentWorld, + MessageAssemblerWorld, MultiPacketWorld, PanicWorld, RequestPartsWorld, @@ -53,6 +56,7 @@ async fn main() { StreamEndWorld::run("tests/features/stream_end.feature").await; MultiPacketWorld::run("tests/features/multi_packet.feature").await; FragmentWorld::run("tests/features/fragment.feature").await; + MessageAssemblerWorld::run("tests/features/message_assembler.feature").await; ClientRuntimeWorld::run("tests/features/client_runtime.feature").await; CodecStatefulWorld::run("tests/features/codec_stateful.feature").await; RequestPartsWorld::run("tests/features/request_parts.feature").await; diff --git a/tests/features/message_assembler.feature b/tests/features/message_assembler.feature new file mode 100644 index 00000000..98597a37 --- /dev/null +++ b/tests/features/message_assembler.feature @@ -0,0 +1,64 @@ +Feature: Message assembler header parsing + Wireframe exposes a MessageAssembler hook for protocol-specific header + parsing. The hook distinguishes first frames from continuation frames and + exposes key metadata needed for assembly. + + Scenario: Builder exposes a configured message assembler + Given a wireframe app with a message assembler + And a first frame header with key 9 metadata length 2 body length 12 + When the message assembler parses the header + Then the app exposes a message assembler + And the parsed header is first + And the message key is 9 + And the metadata length is 2 + And the body length is 12 + And the header length is 16 + And the total body length is absent + And the frame is marked last false + + Scenario: Parsing a first frame header without total length + Given a first frame header with key 9 metadata length 2 body length 12 + When the message assembler parses the header + Then the parsed header is first + And the message key is 9 + And the metadata length is 2 + And the body length is 12 + And the header length is 16 + And the total body length is absent + And the frame is marked last false + + Scenario: Parsing a first frame header with total length + Given a first frame header with key 42 body length 8 total 64 + When the message assembler parses the header + Then the parsed header is first + And the message key is 42 + And the metadata length is 0 + And the body length is 8 + And the header length is 20 + And the total body length is 64 + And the frame is marked last true + + Scenario: Parsing a continuation header with sequence + Given a continuation header with key 7 body length 16 sequence 3 + When the message assembler parses the header + Then the parsed header is continuation + And the message key is 7 + And the body length is 16 + And the header length is 18 + And the sequence is 3 + And the frame is marked last false + + Scenario: Parsing a continuation header without sequence + Given a continuation header with key 11 body length 5 + When the message assembler parses the header + Then the parsed header is continuation + And the message key is 11 + And the body length is 5 + And the header length is 14 + And the sequence is absent + And the frame is marked last true + + Scenario: Invalid header payload returns error + Given an invalid message header + When the message assembler parses the header + Then the parse fails with invalid data diff --git a/tests/steps/message_assembler_steps.rs b/tests/steps/message_assembler_steps.rs new file mode 100644 index 00000000..c167cb23 --- /dev/null +++ b/tests/steps/message_assembler_steps.rs @@ -0,0 +1,183 @@ +//! Step definitions for message assembler header parsing. + +use cucumber::{given, then, when}; + +use crate::world::{ContinuationHeaderSpec, FirstHeaderSpec, MessageAssemblerWorld}; + +const DEFAULT_METADATA_LEN: usize = 0; +const FLAG_NONE: bool = false; +const FLAG_LAST: bool = true; +const NO_SEQUENCE: Option = None; +const NO_TOTAL_LEN: Option = None; + +// Helper builders to reduce duplication in step definitions +fn first_header_without_total(key: u64, metadata_len: usize, body_len: usize) -> FirstHeaderSpec { + FirstHeaderSpec { + key, + metadata_len, + body_len, + total_len: NO_TOTAL_LEN, + is_last: FLAG_NONE, + } +} + +fn first_header_with_total(key: u64, body_len: usize, total_len: usize) -> FirstHeaderSpec { + FirstHeaderSpec { + key, + metadata_len: DEFAULT_METADATA_LEN, + body_len, + total_len: Some(total_len), + is_last: FLAG_LAST, + } +} + +fn continuation_header_with_sequence( + key: u64, + body_len: usize, + sequence: u32, +) -> ContinuationHeaderSpec { + ContinuationHeaderSpec { + key, + body_len, + sequence: Some(sequence), + is_last: FLAG_NONE, + } +} + +fn continuation_header_without_sequence(key: u64, body_len: usize) -> ContinuationHeaderSpec { + ContinuationHeaderSpec { + key, + body_len, + sequence: NO_SEQUENCE, + is_last: FLAG_LAST, + } +} + +// Cucumber step definitions +#[given(expr = "a first frame header with key {int} metadata length {int} body length {int}")] +fn given_first_header( + world: &mut MessageAssemblerWorld, + key: u64, + metadata_len: usize, + body_len: usize, +) -> crate::world::TestResult { + world.set_first_header(first_header_without_total(key, metadata_len, body_len)) +} + +#[given(expr = "a first frame header with key {int} body length {int} total {int}")] +fn given_first_header_with_total( + world: &mut MessageAssemblerWorld, + key: u64, + body_len: usize, + total_len: usize, +) -> crate::world::TestResult { + world.set_first_header(first_header_with_total(key, body_len, total_len)) +} + +#[given(expr = "a continuation header with key {int} body length {int} sequence {int}")] +fn given_continuation_header_with_sequence( + world: &mut MessageAssemblerWorld, + key: u64, + body_len: usize, + sequence: u32, +) -> crate::world::TestResult { + world.set_continuation_header(continuation_header_with_sequence(key, body_len, sequence)) +} + +#[given(expr = "a continuation header with key {int} body length {int}")] +fn given_continuation_header( + world: &mut MessageAssemblerWorld, + key: u64, + body_len: usize, +) -> crate::world::TestResult { + world.set_continuation_header(continuation_header_without_sequence(key, body_len)) +} + +#[given("a wireframe app with a message assembler")] +fn given_app_with_message_assembler(world: &mut MessageAssemblerWorld) -> crate::world::TestResult { + world.set_app_with_message_assembler() +} + +#[given("an invalid message header")] +fn given_invalid_header(world: &mut MessageAssemblerWorld) { world.set_invalid_payload(); } + +#[when("the message assembler parses the header")] +fn when_parsing(world: &mut MessageAssemblerWorld) -> crate::world::TestResult { + world.parse_header() +} + +#[then(expr = "the parsed header is {word}")] +#[expect( + clippy::needless_pass_by_value, + reason = "cucumber hands {word} captures to step functions as owned strings" +)] +fn then_header_kind(world: &mut MessageAssemblerWorld, kind: String) -> crate::world::TestResult { + world.assert_header_kind(&kind) +} + +#[then(expr = "the message key is {int}")] +fn then_message_key(world: &mut MessageAssemblerWorld, key: u64) -> crate::world::TestResult { + world.assert_message_key(key) +} + +#[then(expr = "the metadata length is {int}")] +fn then_metadata_len( + world: &mut MessageAssemblerWorld, + metadata_len: usize, +) -> crate::world::TestResult { + world.assert_metadata_len(metadata_len) +} + +#[then(expr = "the body length is {int}")] +fn then_body_len(world: &mut MessageAssemblerWorld, body_len: usize) -> crate::world::TestResult { + world.assert_body_len(body_len) +} + +#[then(expr = "the header length is {int}")] +fn then_header_len( + world: &mut MessageAssemblerWorld, + header_len: usize, +) -> crate::world::TestResult { + world.assert_header_len(header_len) +} + +#[then("the total body length is absent")] +fn then_total_absent(world: &mut MessageAssemblerWorld) -> crate::world::TestResult { + world.assert_total_len(None) +} + +#[then(expr = "the total body length is {int}")] +fn then_total_present(world: &mut MessageAssemblerWorld, total: usize) -> crate::world::TestResult { + world.assert_total_len(Some(total)) +} + +#[then(expr = "the sequence is {int}")] +fn then_sequence(world: &mut MessageAssemblerWorld, sequence: u32) -> crate::world::TestResult { + world.assert_sequence(Some(sequence)) +} + +#[then("the sequence is absent")] +fn then_sequence_absent(world: &mut MessageAssemblerWorld) -> crate::world::TestResult { + world.assert_sequence(None) +} + +#[then(expr = "the frame is marked last {word}")] +#[expect( + clippy::needless_pass_by_value, + reason = "cucumber hands {word} captures to step functions as owned strings" +)] +fn then_is_last(world: &mut MessageAssemblerWorld, expected: String) -> crate::world::TestResult { + world.assert_is_last(expected == "true") +} + +#[then("the parse fails with invalid data")] +fn then_invalid_data(world: &mut MessageAssemblerWorld) -> crate::world::TestResult { + world.assert_invalid_data_error() +} + +#[then("the app exposes a message assembler")] +fn then_app_exposes_message_assembler( + world: &mut MessageAssemblerWorld, +) -> crate::world::TestResult { + world.assert_message_assembler_configured() +} diff --git a/tests/steps/mod.rs b/tests/steps/mod.rs index c1c6d2d6..f4ef427d 100644 --- a/tests/steps/mod.rs +++ b/tests/steps/mod.rs @@ -8,6 +8,7 @@ mod client_steps; mod codec_stateful_steps; mod correlation_steps; mod fragment_steps; +mod message_assembler_steps; mod multi_packet_steps; mod panic_steps; mod request_parts_steps; diff --git a/tests/world.rs b/tests/world.rs index a1c2bfae..96f38557 100644 --- a/tests/world.rs +++ b/tests/world.rs @@ -11,6 +11,7 @@ pub use worlds::{ common::TestResult, correlation::CorrelationWorld, fragment::FragmentWorld, + message_assembler::{ContinuationHeaderSpec, FirstHeaderSpec, MessageAssemblerWorld}, multi_packet::MultiPacketWorld, panic::PanicWorld, request_parts::RequestPartsWorld, diff --git a/tests/worlds/message_assembler.rs b/tests/worlds/message_assembler.rs new file mode 100644 index 00000000..2ef637dd --- /dev/null +++ b/tests/worlds/message_assembler.rs @@ -0,0 +1,374 @@ +//! Test world for message assembler header parsing. +#![cfg(not(loom))] + +use std::{fmt, io}; + +use bytes::{BufMut, BytesMut}; +use cucumber::World; +use wireframe::{ + message_assembler::{FrameHeader, FrameSequence, MessageAssembler, ParsedFrameHeader}, + test_helpers::TestAssembler, +}; + +use super::{TestApp, TestResult}; + +/// Specification for first-frame header encoding used in tests. +#[derive(Debug, Clone, Copy)] +pub struct FirstHeaderSpec { + /// Message key to encode into the header. + pub key: u64, + /// Metadata length in bytes. + pub metadata_len: usize, + /// Body length in bytes for this frame. + pub body_len: usize, + /// Optional total body length across all frames. + pub total_len: Option, + /// Whether the frame is the final one in the series. + pub is_last: bool, +} + +/// Specification for continuation-frame header encoding used in tests. +#[derive(Debug, Clone, Copy)] +pub struct ContinuationHeaderSpec { + /// Message key to encode into the header. + pub key: u64, + /// Body length in bytes for this frame. + pub body_len: usize, + /// Optional sequence number. + pub sequence: Option, + /// Whether the frame is the final one in the series. + pub is_last: bool, +} + +#[derive(Debug, Clone, Copy)] +struct HeaderEnvelope { + kind: u8, + flags: u8, + key: u64, +} + +/// World used by Cucumber to test message assembler header parsing. +#[derive(Default, World)] +pub struct MessageAssemblerWorld { + payload: Option>, + parsed: Option, + error: Option, + app: Option, +} + +impl fmt::Debug for MessageAssemblerWorld { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("MessageAssemblerWorld") + .field("payload", &self.payload) + .field("parsed", &self.parsed) + .field("error", &self.error) + .field( + "app", + &self.app.as_ref().map(|_| "wireframe::app::WireframeApp"), + ) + .finish() + } +} + +impl MessageAssemblerWorld { + fn assert_common_field(&self, field: &str, expected: &T, extractor: F) -> TestResult + where + T: PartialEq + fmt::Display + Copy, + F: FnOnce(&FrameHeader) -> T, + { + let parsed = self.parsed.as_ref().ok_or("no parsed header")?; + let actual = extractor(parsed.header()); + if actual != *expected { + return Err(format!("expected {field} {expected}, got {actual}").into()); + } + Ok(()) + } + + /// Generic helper for asserting header-type-specific fields. + /// + /// The extractor performs both type-checking (via pattern matching) and field + /// extraction, returning an error message if the header type is incorrect. + fn assert_header_field(&self, field_name: &str, expected: &T, extractor: F) -> TestResult + where + T: PartialEq + fmt::Display + Copy, + F: FnOnce(&FrameHeader) -> Result, + { + let parsed = self.parsed.as_ref().ok_or("no parsed header")?; + let actual = extractor(parsed.header()).map_err(ToString::to_string)?; + if actual != *expected { + return Err(format!("expected {field_name} {expected}, got {actual}").into()); + } + Ok(()) + } + + /// Store an encoded first-frame header in the world payload. + /// + /// # Errors + /// + /// Returns an error if any length field exceeds the header encoding limits. + pub fn set_first_header(&mut self, spec: FirstHeaderSpec) -> TestResult { + let mut flags = 0u8; + if spec.is_last { + flags |= 0b1; + } + if spec.total_len.is_some() { + flags |= 0b10; + } + self.set_payload_with_header( + HeaderEnvelope { + kind: 0x01, + flags, + key: spec.key, + }, + |bytes| { + let metadata_len = + u16::try_from(spec.metadata_len).map_err(|_| "metadata length too large")?; + bytes.put_u16(metadata_len); + let body_len = u32::try_from(spec.body_len).map_err(|_| "body length too large")?; + bytes.put_u32(body_len); + if let Some(total) = spec.total_len { + let total = u32::try_from(total).map_err(|_| "total length too large")?; + bytes.put_u32(total); + } + Ok(()) + }, + ) + } + + /// Store an encoded continuation-frame header in the world payload. + /// + /// # Errors + /// + /// Returns an error if any length field exceeds the header encoding limits. + pub fn set_continuation_header(&mut self, spec: ContinuationHeaderSpec) -> TestResult { + let mut flags = 0u8; + if spec.is_last { + flags |= 0b1; + } + if spec.sequence.is_some() { + flags |= 0b10; + } + self.set_payload_with_header( + HeaderEnvelope { + kind: 0x02, + flags, + key: spec.key, + }, + |bytes| { + let body_len = u32::try_from(spec.body_len).map_err(|_| "body length too large")?; + bytes.put_u32(body_len); + if let Some(seq) = spec.sequence { + bytes.put_u32(seq); + } + Ok(()) + }, + ) + } + + fn set_payload_with_header(&mut self, envelope: HeaderEnvelope, encode: F) -> TestResult + where + F: FnOnce(&mut BytesMut) -> TestResult, + { + let mut bytes = BytesMut::new(); + bytes.put_u8(envelope.kind); + bytes.put_u8(envelope.flags); + bytes.put_u64(envelope.key); + encode(&mut bytes)?; + self.payload = Some(bytes.to_vec()); + Ok(()) + } + + /// Store a deliberately invalid header payload. + pub fn set_invalid_payload(&mut self) { self.payload = Some(vec![0x01]); } + + /// Parse the stored payload with the test assembler. + /// + /// # Errors + /// + /// Returns an error if no payload has been configured. + pub fn parse_header(&mut self) -> TestResult { + let payload = self.payload.as_deref().ok_or("payload not set")?; + let fallback = TestAssembler; + let assembler: &dyn MessageAssembler = match self.app.as_ref() { + Some(app) => app + .message_assembler() + .ok_or("message assembler not set")? + .as_ref(), + None => &fallback, + }; + match assembler.parse_frame_header(payload) { + Ok(parsed) => { + self.parsed = Some(parsed); + self.error = None; + } + Err(err) => { + self.parsed = None; + self.error = Some(err); + } + } + Ok(()) + } + + /// Assert that the parsed header is of the expected kind. + /// + /// # Errors + /// + /// Returns an error if no header was parsed or the kind does not match. + pub fn assert_header_kind(&self, expected: &str) -> TestResult { + let parsed = self.parsed.as_ref().ok_or("no parsed header")?; + let matches_kind = matches!( + (expected, parsed.header()), + ("first", FrameHeader::First(_)) | ("continuation", FrameHeader::Continuation(_)) + ); + if matches_kind { + Ok(()) + } else { + Err(format!("expected {expected} header").into()) + } + } + + /// Assert that the parsed header contains the expected message key. + /// + /// # Errors + /// + /// Returns an error if no header was parsed or the key does not match. + pub fn assert_message_key(&self, expected: u64) -> TestResult { + self.assert_common_field("key", &expected, |header| match header { + FrameHeader::First(header) => u64::from(header.message_key), + FrameHeader::Continuation(header) => u64::from(header.message_key), + }) + } + + /// Assert that the parsed header contains the expected metadata length. + /// + /// # Errors + /// + /// Returns an error if no header was parsed or the metadata length differs. + pub fn assert_metadata_len(&self, expected: usize) -> TestResult { + self.assert_header_field("metadata length", &expected, |header| { + if let FrameHeader::First(header) = header { + Ok(header.metadata_len) + } else { + Err("expected first header") + } + }) + } + + /// Assert that the parsed header contains the expected body length. + /// + /// # Errors + /// + /// Returns an error if no header was parsed or the body length differs. + pub fn assert_body_len(&self, expected: usize) -> TestResult { + self.assert_common_field("body length", &expected, |header| match header { + FrameHeader::First(header) => header.body_len, + FrameHeader::Continuation(header) => header.body_len, + }) + } + + /// Assert that the parsed header contains the expected total body length. + /// + /// # Errors + /// + /// Returns an error if no header was parsed or the total length differs. + pub fn assert_total_len(&self, expected: Option) -> TestResult { + let expected = DebugDisplay(expected); + self.assert_header_field("total length", &expected, |header| { + if let FrameHeader::First(header) = header { + Ok(DebugDisplay(header.total_body_len)) + } else { + Err("expected first header") + } + }) + } + + /// Assert that the parsed header contains the expected sequence. + /// + /// # Errors + /// + /// Returns an error if no header was parsed or the sequence differs. + pub fn assert_sequence(&self, expected: Option) -> TestResult { + let expected = expected.map(FrameSequence::from); + let expected = DebugDisplay(expected); + self.assert_header_field("sequence", &expected, |header| { + if let FrameHeader::Continuation(header) = header { + Ok(DebugDisplay(header.sequence)) + } else { + Err("expected continuation header") + } + }) + } + + /// Assert that the parsed header matches the expected `is_last` flag. + /// + /// # Errors + /// + /// Returns an error if no header was parsed or the flag differs. + pub fn assert_is_last(&self, expected: bool) -> TestResult { + self.assert_common_field("is_last", &expected, |header| match header { + FrameHeader::First(header) => header.is_last, + FrameHeader::Continuation(header) => header.is_last, + }) + } + + /// Assert that the parsed header length matches the expected value. + /// + /// # Errors + /// + /// Returns an error if no header was parsed or the length differs. + pub fn assert_header_len(&self, expected: usize) -> TestResult { + let parsed = self.parsed.as_ref().ok_or("no parsed header")?; + let actual = parsed.header_len(); + if actual != expected { + return Err(format!("expected header length {expected}, got {actual}").into()); + } + Ok(()) + } + + /// Assert that the parse failed with `InvalidData`. + /// + /// # Errors + /// + /// Returns an error if no parse error was captured or the kind differs. + pub fn assert_invalid_data_error(&self) -> TestResult { + let err = self.error.as_ref().ok_or("expected error")?; + if err.kind() != io::ErrorKind::InvalidData { + return Err(format!("expected InvalidData error, got {:?}", err.kind()).into()); + } + Ok(()) + } + + /// Store a wireframe app configured with a test message assembler. + /// + /// # Errors + /// + /// Returns an error if the app builder fails. + pub fn set_app_with_message_assembler(&mut self) -> TestResult { + let app = TestApp::new() + .map_err(|err| format!("failed to build app: {err}"))? + .with_message_assembler(TestAssembler); + self.app = Some(app); + Ok(()) + } + + /// Assert that the app exposes a message assembler. + /// + /// # Errors + /// + /// Returns an error if the app or assembler is missing. + pub fn assert_message_assembler_configured(&self) -> TestResult { + let app = self.app.as_ref().ok_or("app not set")?; + if app.message_assembler().is_some() { + Ok(()) + } else { + Err("expected message assembler".into()) + } + } +} + +#[derive(Clone, Copy, Debug, PartialEq)] +struct DebugDisplay(T); + +impl fmt::Display for DebugDisplay { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{:?}", self.0) } +} diff --git a/tests/worlds/mod.rs b/tests/worlds/mod.rs index 2be85b58..32534de3 100644 --- a/tests/worlds/mod.rs +++ b/tests/worlds/mod.rs @@ -2,9 +2,9 @@ //! //! Provides world types for behaviour-driven tests covering fragmentation, //! correlation, panic recovery, stream termination, multi-packet channels, -//! stateful codecs, and request parts. Shared utilities like -//! `build_small_queues` keep individual worlds focused on their respective -//! scenarios. +//! stateful codecs, request parts, and message assembler parsing. Shared +//! utilities like `build_small_queues` keep individual worlds focused on their +//! respective scenarios. #![cfg(not(loom))] #[path = "../common/mod.rs"] @@ -32,6 +32,7 @@ pub mod client_runtime; pub mod codec_stateful; pub mod correlation; pub mod fragment; +pub mod message_assembler; pub mod multi_packet; pub mod panic; pub mod request_parts;