From ecf62a31932f09127ab2a9aae7844d52ae883acd Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 4 Jan 2026 04:52:35 +0000 Subject: [PATCH 01/14] feat(message_assembler): add MessageAssembler trait and header parsing design Introduce the MessageAssembler trait and associated frame header types to support multi-frame request assembly and consistent shared header parsing across protocol crates. This new public interface includes MessageKey and FrameSequence newtypes, FrameHeader enum with First and Continuation variants, and a parse_frame_header method returning ParsedFrameHeader. Also document the design extensively, propose builder integration for assembler registration, and define comprehensive unit and behavioral tests to validate parsing and error handling. This completes the design and documentation phase to enable subsequent runtime integration. Co-authored-by: terragon-labs[bot] --- .../8-2-1-message-assembler-hook-trait.md | 307 ++++++++++++++++++ 1 file changed, 307 insertions(+) create mode 100644 docs/execplans/8-2-1-message-assembler-hook-trait.md diff --git a/docs/execplans/8-2-1-message-assembler-hook-trait.md b/docs/execplans/8-2-1-message-assembler-hook-trait.md new file mode 100644 index 00000000..b582a172 --- /dev/null +++ b/docs/execplans/8-2-1-message-assembler-hook-trait.md @@ -0,0 +1,307 @@ +# MessageAssembler hook trait and per-frame header parsing + +This ExecPlan is a living document. The sections `Progress`, +`Surprises & Discoveries`, `Decision Log`, and `Outcomes & Retrospective` must +be kept up to date as work proceeds. + +No `PLANS.md` exists in this repository as of 2026-01-04. + +## Purpose / Big Picture + +Wireframe needs a protocol-facing hook for multi-frame request assembly and a +shared header model that distinguishes “first frame” from “continuation +frame”. This work delivers the `MessageAssembler` trait and the first version +of header parsing types so protocol crates can implement consistent parsing and +Wireframe can later apply shared buffering and assembly logic. + +Success is observable when: + +- The public `MessageAssembler` trait and frame header types are available in + `wireframe::message_assembler` and re-exported in `wireframe`. +- A protocol can implement `MessageAssembler` to parse a simple header into a + `FrameHeader::First` or `FrameHeader::Continuation` value. +- Unit tests cover first/continuation parsing, short-header errors, and header + length accounting. +- A Cucumber feature validates the same behaviours via the behavioural test + harness. +- `docs/users-guide.md` explains the new public interface and how to configure + it (including any “not yet wired into the connection path” caveats). +- The relevant design docs capture the interface decisions. +- `docs/roadmap.md` marks 8.2.1 and 8.2.2 as done once the change lands. + +## Progress + +- [x] (2026-01-04 00:00Z) Draft ExecPlan for 8.2.1 and 8.2.2. +- [ ] Define `MessageAssembler` trait and header types under + `src/message_assembler/` with module-level docs and examples. +- [ ] Add builder support to store an optional assembler in + `src/app/builder.rs`. +- [ ] Add unit tests for header parsing and error handling. +- [ ] Add Cucumber feature, world, and steps for message assembler parsing. +- [ ] Update design docs, user guide, and roadmap checkboxes. +- [ ] Run formatting, linting, and test gates with `make` targets. + +## Surprises & Discoveries + +- Observation: Streaming request primitives exist (`src/request/mod.rs` and + `src/extractor.rs`), but the inbound pipeline in + `src/app/connection.rs` does not yet invoke a message-assembly hook. This + plan keeps integration scoped to 8.2.1/8.2.2 and documents the limitation in + the user guide until 8.2.5. + Evidence: `WireframeApp::handle_frame` decodes to `Envelope` and dispatches + directly to handlers after fragmentation reassembly. + +## Decision Log + +- Decision: Introduce a dedicated public module + `src/message_assembler/` for the hook trait and header types. + Rationale: Keeps assembly concerns distinct from `hooks` and allows clear + documentation of the protocol-facing interface. + Date/Author: 2026-01-04 (Codex). +- Decision: Model frame headers with a public `FrameHeader` enum and + `FirstFrameHeader`/`ContinuationFrameHeader` structs plus newtypes for + `MessageKey` and optional `FrameSequence`. + Rationale: Avoids integer soup and aligns with the ADR’s required header + fields while leaving room for 8.2.3/8.2.4. + Date/Author: 2026-01-04 (Codex). +- Decision: Use `std::io::Error` for parse failures in the trait method. + Rationale: Matches the existing framing/codec error model and keeps the + trait object-safe without adding another type parameter to `WireframeApp`. + Date/Author: 2026-01-04 (Codex). + +## Outcomes & Retrospective + +Not started yet. + +## Context and Orientation + +Wireframe’s inbound path decodes frames into `Envelope` values inside +`src/app/connection.rs`, then optionally applies transport fragmentation +reassembly via `src/app/fragmentation_state.rs` and +`frame_handling::reassemble_if_needed`. Handlers receive buffered payloads +through `ServiceRequest` and `PacketParts` in `src/middleware.rs`. + +Streaming request types (`RequestParts`, `RequestBodyStream`, and the +`StreamingBody` extractor) live in `src/request/mod.rs` and +`src/extractor.rs`, but they are not yet wired into the connection loop. +The `MessageAssembler` hook introduced here is the protocol-facing interface +for that forthcoming integration (see ADR 0002). + +Key references: + +- `docs/adr-002-streaming-requests-and-shared-message-assembly.md` + (authoritative requirements for `MessageAssembler`). +- `docs/generic-message-fragmentation-and-re-assembly-design.md` section 9 + (composition order and memory budgeting). +- `docs/multi-packet-and-streaming-responses-design.md` section 11.4 + (MessageAssembler composition narrative). +- `docs/the-road-to-wireframe-1-0-feature-set-` + `philosophy-and-capability-maturity.md` section + “Protocol-level message assembly”. +- `docs/hardening-wireframe-a-guide-to-production-resilience.md` (resource + limits and failure semantics). +- Testing guidance: `docs/rust-testing-with-rstest-fixtures.md`, + `docs/behavioural-testing-in-rust-with-cucumber.md`, + `docs/reliable-testing-in-rust-via-dependency-injection.md`, and + `docs/rust-doctest-dry-guide.md`. + +## Plan of Work + +Start by adding a new `message_assembler` module that defines the +`MessageAssembler` trait, header types, and newtypes for message keys and +(optional) sequence indices. Document the trait with examples that show how to +parse a header using `bytes::Buf` rather than indexing. Next, wire the builder +so applications can register an assembler via +`WireframeApp::with_message_assembler` (and access it via +`message_assembler()`), even though the runtime integration lands in 8.2.5. +Then add unit tests for parsing behaviours and error handling using a +lightweight test assembler. Add a Cucumber feature that asserts the same +behaviours through the behavioural harness. Finally, update the design and +user documentation, and mark 8.2.1/8.2.2 as done in the roadmap. + +## Concrete Steps + +1. Read the core design documents listed above to confirm required header + fields and the intended composition order. + +2. Create `src/message_assembler/` with: + + - `src/message_assembler/mod.rs` containing the public API and module-level + `//!` documentation. + - `src/message_assembler/header.rs` defining `FrameHeader`, + `FirstFrameHeader`, and `ContinuationFrameHeader` along with newtypes such + as `MessageKey` and `FrameSequence`. + - `src/message_assembler/error.rs` only if a shared error type is needed for + examples or tests; otherwise, rely on `std::io::Error`. + + Keep each file below 400 lines and add doc examples for public items. + +3. Add `pub mod message_assembler;` to `src/lib.rs`, and re-export the new + trait and header types for a stable public surface. + +4. Update `src/app/builder.rs`: + + - Add a `message_assembler: Option>` field. + - Thread the field through `Default`, `rebuild_with_params`, and any + builder transitions. + - Add `with_message_assembler(...)` and `message_assembler()` methods. + +5. Implement unit tests (suggested location: + `src/message_assembler/tests.rs`): + + - Build a small `TestMessageAssembler` that parses a simple header format + (one byte “kind” tag plus fixed-width fields for key and lengths). + - Validate first-frame parsing returns `FrameHeader::First` with correct + metadata length, body length, and header byte count. + - Validate continuation parsing returns `FrameHeader::Continuation`. + - Validate short/invalid headers return `io::ErrorKind::InvalidData`. + - Avoid indexing and use `bytes::Buf` for parsing to satisfy clippy. + +6. Add behavioural tests: + + - `tests/features/message_assembler.feature` with scenarios for first-frame + and continuation parsing. + - `tests/worlds/message_assembler.rs` storing the parsed header and errors. + - `tests/steps/message_assembler_steps.rs` with Given/When/Then steps. + - Register the world in `tests/worlds/mod.rs`, `tests/world.rs`, and run it + from `tests/cucumber.rs`. Add the steps module in `tests/steps/mod.rs`. + +7. Update documentation: + + - `docs/users-guide.md`: describe the new `MessageAssembler` trait, how to + configure it via `WireframeApp`, and call out that runtime integration + into the inbound pipeline arrives in 8.2.5. + - `docs/adr-002-streaming-requests-and-shared-message-assembly.md`: record + the concrete trait signature and header model decisions. + - `docs/generic-message-fragmentation-and-re-assembly-design.md` and + `docs/multi-packet-and-streaming-responses-design.md`: add references to + the new API surface. + - `docs/the-road-to-wireframe-1-0-feature-set-` + `philosophy-and-capability-maturity.md`: note the concrete hook trait + surface. + +8. Update `docs/roadmap.md` to mark 8.2.1 and 8.2.2 as done when all above + changes are complete. + +## Validation and Acceptance + +Acceptance requires all of the following: + +- New public `MessageAssembler` trait and header types compile and are + documented with examples. +- Unit tests cover parsing for first and continuation frames and error cases. +- Cucumber behavioural tests for message assembler parsing pass. +- Design docs, user guide, and roadmap reflect the new interface. + +Run validation from the repository root (use `tee` to capture full output): + + set -o pipefail + timeout 300 make fmt 2>&1 | tee /tmp/wireframe-fmt.log + echo "fmt exit: $?" + + set -o pipefail + timeout 300 make markdownlint 2>&1 | tee /tmp/wireframe-markdownlint.log + echo "markdownlint exit: $?" + + set -o pipefail + timeout 300 make check-fmt 2>&1 | tee /tmp/wireframe-check-fmt.log + echo "check-fmt exit: $?" + + set -o pipefail + timeout 300 make lint 2>&1 | tee /tmp/wireframe-lint.log + echo "lint exit: $?" + + set -o pipefail + timeout 300 make test 2>&1 | tee /tmp/wireframe-test.log + echo "test exit: $?" + +If Mermaid diagrams are edited or added, also run: + + set -o pipefail + timeout 300 make nixie 2>&1 | tee /tmp/wireframe-nixie.log + echo "nixie exit: $?" + +## Idempotence and Recovery + +All steps are additive and can be re-run safely. If a step fails, fix the +underlying issue and re-run only the affected command(s). Use the `tee` +outputs to locate the failure before retrying. Avoid destructive commands; if +a local change needs to be backed out, revert only the specific files edited +for this feature. + +## Artifacts and Notes + +Expected artefacts after completion: + +- `src/message_assembler/` module with trait, header types, and tests. +- New Cucumber feature and world files under `tests/features/` and + `tests/worlds/`. +- Updated design docs and user guide entries. + +## Interfaces and Dependencies + +At the end of this work, the following public interfaces must exist and be +re-exported from `src/lib.rs`: + +- Module `wireframe::message_assembler` with a module-level `//!` comment. +- Newtypes: + + pub struct MessageKey(pub u64); + pub struct FrameSequence(pub u32); + + Include `From`/`From` and `Copy`, `Clone`, `Debug`, `Eq`, and + `Hash`. + +- Header model: + + pub enum FrameHeader { + First(FirstFrameHeader), + Continuation(ContinuationFrameHeader), + } + + pub struct FirstFrameHeader { + pub message_key: MessageKey, + pub metadata_len: usize, + pub body_len: usize, + pub total_body_len: Option, + pub is_last: bool, + } + + pub struct ContinuationFrameHeader { + pub message_key: MessageKey, + pub sequence: Option, + pub body_len: usize, + pub is_last: bool, + } + + If header length accounting is needed for slicing, add a + `ParsedFrameHeader { header_len, header }` wrapper. + +- Hook trait: + + pub trait MessageAssembler: Send + Sync + 'static { + fn parse_frame_header( + &self, + payload: &[u8], + ) -> Result; + } + +- Builder configuration: + + impl WireframeApp { + pub fn with_message_assembler( + self, + assembler: impl MessageAssembler + 'static, + ) -> Self; + + pub fn message_assembler(&self) -> Option>; + } + +If the builder uses `Arc`, add the required `use` and +thread it through `rebuild_with_params` so type transitions preserve the +configured assembler. + +## Revision note + +Add a revision note only when this ExecPlan is updated after implementation +work has started. From edb5d3d8e34a64799f3c82031a84c842365445bf Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 4 Jan 2026 05:46:00 +0000 Subject: [PATCH 02/14] feat(message_assembler): add MessageAssembler trait and header parsing for multi-frame requests Introduce a new protocol-facing hook trait `MessageAssembler` that defines parsing of per-frame headers distinguishing first and continuation frames. This addition includes detailed header models (`FirstFrameHeader`, `ContinuationFrameHeader`, `FrameHeader`, and related types) and a parsed header wrapper `ParsedFrameHeader` with length accounting. The hook supports multi-frame request assembly and is designed for protocol crates to implement consistent parsing logic. Unit tests, Cucumber integration, and documentation updates accompany this feature, along with changes to builder APIs to register the assembler. This lays the groundwork for streaming request support integration in later roadmap steps. Co-authored-by: terragon-labs[bot] --- ...ng-requests-and-shared-message-assembly.md | 47 +++ .../8-2-1-message-assembler-hook-trait.md | 122 +++--- ...ge-fragmentation-and-re-assembly-design.md | 6 + ...i-packet-and-streaming-responses-design.md | 4 +- docs/roadmap.md | 4 +- ...-set-philosophy-and-capability-maturity.md | 2 + docs/users-guide.md | 50 +++ src/app/builder.rs | 86 ++++- src/lib.rs | 10 + src/message_assembler/header.rs | 185 +++++++++ src/message_assembler/mod.rs | 101 +++++ src/message_assembler/tests.rs | 223 +++++++++++ tests/cucumber.rs | 4 + tests/features/message_assembler.feature | 47 +++ tests/steps/message_assembler_steps.rs | 132 +++++++ tests/steps/mod.rs | 1 + tests/world.rs | 1 + tests/worlds/message_assembler.rs | 363 ++++++++++++++++++ tests/worlds/mod.rs | 7 +- 19 files changed, 1326 insertions(+), 69 deletions(-) create mode 100644 src/message_assembler/header.rs create mode 100644 src/message_assembler/mod.rs create mode 100644 src/message_assembler/tests.rs create mode 100644 tests/features/message_assembler.feature create mode 100644 tests/steps/message_assembler_steps.rs create mode 100644 tests/worlds/message_assembler.rs diff --git a/docs/adr-002-streaming-requests-and-shared-message-assembly.md b/docs/adr-002-streaming-requests-and-shared-message-assembly.md index 37e8efa9..5d0b38d1 100644 --- a/docs/adr-002-streaming-requests-and-shared-message-assembly.md +++ b/docs/adr-002-streaming-requests-and-shared-message-assembly.md @@ -146,6 +146,53 @@ At a minimum, the hook must allow a protocol to provide: Wireframe will provide, centrally: +- a stable hook trait and header model for protocol crates. The initial public + surface is: + +```rust +use wireframe::message_assembler::{ + ContinuationFrameHeader, + FrameHeader, + FrameSequence, + FirstFrameHeader, + MessageAssembler, + MessageKey, + ParsedFrameHeader, +}; + +pub trait MessageAssembler: Send + Sync + 'static { + fn parse_frame_header( + &self, + payload: &[u8], + ) -> Result; +} + +pub struct ParsedFrameHeader { + header: FrameHeader, + header_len: usize, +} + +pub enum FrameHeader { + First(FirstFrameHeader), + Continuation(ContinuationFrameHeader), +} + +pub struct FirstFrameHeader { + pub message_key: MessageKey, + pub metadata_len: usize, + pub body_len: usize, + pub total_body_len: Option, + pub is_last: bool, +} + +pub struct ContinuationFrameHeader { + pub message_key: MessageKey, + pub sequence: Option, + pub body_len: usize, + pub is_last: bool, +} +``` + - buffering and assembly state management; - enforcement of maximum total message size, maximum fragment size, timeouts, and “max in-flight bytes”; and diff --git a/docs/execplans/8-2-1-message-assembler-hook-trait.md b/docs/execplans/8-2-1-message-assembler-hook-trait.md index b582a172..a17fd0d4 100644 --- a/docs/execplans/8-2-1-message-assembler-hook-trait.md +++ b/docs/execplans/8-2-1-message-assembler-hook-trait.md @@ -9,10 +9,10 @@ No `PLANS.md` exists in this repository as of 2026-01-04. ## Purpose / Big Picture Wireframe needs a protocol-facing hook for multi-frame request assembly and a -shared header model that distinguishes “first frame” from “continuation -frame”. This work delivers the `MessageAssembler` trait and the first version -of header parsing types so protocol crates can implement consistent parsing and -Wireframe can later apply shared buffering and assembly logic. +shared header model that distinguishes “first frame” from “continuation frame”. +This work delivers the `MessageAssembler` trait and the first version of header +parsing types so protocol crates can implement consistent parsing and Wireframe +can later apply shared buffering and assembly logic. Success is observable when: @@ -32,41 +32,41 @@ Success is observable when: ## Progress - [x] (2026-01-04 00:00Z) Draft ExecPlan for 8.2.1 and 8.2.2. -- [ ] Define `MessageAssembler` trait and header types under +- [x] (2026-01-04 01:00Z) Define `MessageAssembler` trait and header types under `src/message_assembler/` with module-level docs and examples. -- [ ] Add builder support to store an optional assembler in +- [x] (2026-01-04 01:00Z) Add builder support to store an optional assembler in `src/app/builder.rs`. -- [ ] Add unit tests for header parsing and error handling. -- [ ] Add Cucumber feature, world, and steps for message assembler parsing. -- [ ] Update design docs, user guide, and roadmap checkboxes. -- [ ] Run formatting, linting, and test gates with `make` targets. +- [x] (2026-01-04 01:05Z) Add unit tests for header parsing and error handling. +- [x] (2026-01-04 01:10Z) Add Cucumber feature, world, and steps for message + assembler parsing. +- [x] (2026-01-04 01:15Z) Update design docs, user guide, and roadmap + checkboxes. +- [x] (2026-01-04 02:05Z) Run formatting, linting, and test gates with `make` + targets. ## Surprises & Discoveries - Observation: Streaming request primitives exist (`src/request/mod.rs` and - `src/extractor.rs`), but the inbound pipeline in - `src/app/connection.rs` does not yet invoke a message-assembly hook. This - plan keeps integration scoped to 8.2.1/8.2.2 and documents the limitation in - the user guide until 8.2.5. + `src/extractor.rs`), but the inbound pipeline in `src/app/connection.rs` does + not yet invoke a message-assembly hook. This plan keeps integration scoped to + 8.2.1/8.2.2 and documents the limitation in the user guide until 8.2.5. Evidence: `WireframeApp::handle_frame` decodes to `Envelope` and dispatches directly to handlers after fragmentation reassembly. ## Decision Log - Decision: Introduce a dedicated public module - `src/message_assembler/` for the hook trait and header types. - Rationale: Keeps assembly concerns distinct from `hooks` and allows clear - documentation of the protocol-facing interface. - Date/Author: 2026-01-04 (Codex). + `src/message_assembler/` for the hook trait and header types. Rationale: + Keeps assembly concerns distinct from `hooks` and allows clear documentation + of the protocol-facing interface. Date/Author: 2026-01-04 (Codex). - Decision: Model frame headers with a public `FrameHeader` enum and `FirstFrameHeader`/`ContinuationFrameHeader` structs plus newtypes for - `MessageKey` and optional `FrameSequence`. - Rationale: Avoids integer soup and aligns with the ADR’s required header - fields while leaving room for 8.2.3/8.2.4. - Date/Author: 2026-01-04 (Codex). + `MessageKey` and optional `FrameSequence`. Rationale: Avoids integer soup and + aligns with the ADR’s required header fields while leaving room for + 8.2.3/8.2.4. Date/Author: 2026-01-04 (Codex). - Decision: Use `std::io::Error` for parse failures in the trait method. - Rationale: Matches the existing framing/codec error model and keeps the - trait object-safe without adding another type parameter to `WireframeApp`. + Rationale: Matches the existing framing/codec error model and keeps the trait + object-safe without adding another type parameter to `WireframeApp`. Date/Author: 2026-01-04 (Codex). ## Outcomes & Retrospective @@ -82,10 +82,10 @@ reassembly via `src/app/fragmentation_state.rs` and through `ServiceRequest` and `PacketParts` in `src/middleware.rs`. Streaming request types (`RequestParts`, `RequestBodyStream`, and the -`StreamingBody` extractor) live in `src/request/mod.rs` and -`src/extractor.rs`, but they are not yet wired into the connection loop. -The `MessageAssembler` hook introduced here is the protocol-facing interface -for that forthcoming integration (see ADR 0002). +`StreamingBody` extractor) live in `src/request/mod.rs` and `src/extractor.rs`, +but they are not yet wired into the connection loop. The `MessageAssembler` +hook introduced here is the protocol-facing interface for that forthcoming +integration (see ADR 0002). Key references: @@ -96,8 +96,8 @@ Key references: - `docs/multi-packet-and-streaming-responses-design.md` section 11.4 (MessageAssembler composition narrative). - `docs/the-road-to-wireframe-1-0-feature-set-` - `philosophy-and-capability-maturity.md` section - “Protocol-level message assembly”. + `philosophy-and-capability-maturity.md` section “Protocol-level message + assembly”. - `docs/hardening-wireframe-a-guide-to-production-resilience.md` (resource limits and failure semantics). - Testing guidance: `docs/rust-testing-with-rstest-fixtures.md`, @@ -116,8 +116,8 @@ so applications can register an assembler via `message_assembler()`), even though the runtime integration lands in 8.2.5. Then add unit tests for parsing behaviours and error handling using a lightweight test assembler. Add a Cucumber feature that asserts the same -behaviours through the behavioural harness. Finally, update the design and -user documentation, and mark 8.2.1/8.2.2 as done in the roadmap. +behaviours through the behavioural harness. Finally, update the design and user +documentation, and mark 8.2.1/8.2.2 as done in the roadmap. ## Concrete Steps @@ -224,10 +224,10 @@ If Mermaid diagrams are edited or added, also run: ## Idempotence and Recovery All steps are additive and can be re-run safely. If a step fails, fix the -underlying issue and re-run only the affected command(s). Use the `tee` -outputs to locate the failure before retrying. Avoid destructive commands; if -a local change needs to be backed out, revert only the specific files edited -for this feature. +underlying issue and re-run only the affected command(s). Use the `tee` outputs +to locate the failure before retrying. Avoid destructive commands; if a local +change needs to be backed out, revert only the specific files edited for this +feature. ## Artifacts and Notes @@ -254,37 +254,37 @@ re-exported from `src/lib.rs`: - Header model: - pub enum FrameHeader { - First(FirstFrameHeader), - Continuation(ContinuationFrameHeader), - } - - pub struct FirstFrameHeader { - pub message_key: MessageKey, - pub metadata_len: usize, - pub body_len: usize, - pub total_body_len: Option, - pub is_last: bool, - } - - pub struct ContinuationFrameHeader { - pub message_key: MessageKey, - pub sequence: Option, - pub body_len: usize, - pub is_last: bool, - } + pub enum FrameHeader { + First(FirstFrameHeader), + Continuation(ContinuationFrameHeader), + } + + pub struct FirstFrameHeader { + pub message_key: MessageKey, + pub metadata_len: usize, + pub body_len: usize, + pub total_body_len: Option, + pub is_last: bool, + } + + pub struct ContinuationFrameHeader { + pub message_key: MessageKey, + pub sequence: Option, + pub body_len: usize, + pub is_last: bool, + } If header length accounting is needed for slicing, add a `ParsedFrameHeader { header_len, header }` wrapper. - Hook trait: - pub trait MessageAssembler: Send + Sync + 'static { - fn parse_frame_header( - &self, - payload: &[u8], - ) -> Result; - } + pub trait MessageAssembler: Send + Sync + 'static { + fn parse_frame_header( + &self, + payload: &[u8], + ) -> Result; + } - Builder configuration: diff --git a/docs/generic-message-fragmentation-and-re-assembly-design.md b/docs/generic-message-fragmentation-and-re-assembly-design.md index 6996ffff..0126b1c3 100644 --- a/docs/generic-message-fragmentation-and-re-assembly-design.md +++ b/docs/generic-message-fragmentation-and-re-assembly-design.md @@ -460,6 +460,12 @@ This ordering ensures protocol crates migrating to `MessageAssembler` do not need to special-case transport fragments: the assembler only sees complete packet payloads and focuses on protocol continuity rules. +Wireframe exposes the hook surface as +`wireframe::message_assembler::MessageAssembler`, and applications register an +implementation via `WireframeApp::with_message_assembler`. The hook is stored +on the builder today and integrated into the inbound pipeline in roadmap item +8.2.5. + ### 9.3 Memory budget integration Per-connection memory budgets apply across both layers. Budgets cover: diff --git a/docs/multi-packet-and-streaming-responses-design.md b/docs/multi-packet-and-streaming-responses-design.md index 487d1ef2..a88fbdb8 100644 --- a/docs/multi-packet-and-streaming-responses-design.md +++ b/docs/multi-packet-and-streaming-responses-design.md @@ -440,7 +440,9 @@ not hang. protocol crates to supply protocol-specific parsing and continuity rules while Wireframe provides shared buffering machinery and limit enforcement. See the [fragmentation design][frag-design] for how transport-level - fragmentation and protocol-level assembly compose. + fragmentation and protocol-level assembly compose. The hook is surfaced as + `wireframe::message_assembler::MessageAssembler` and registered via + `WireframeApp::with_message_assembler`. [frag-design]: generic-message-fragmentation-and-re-assembly-design.md diff --git a/docs/roadmap.md b/docs/roadmap.md index f65851b2..e5c9bd1d 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -277,9 +277,9 @@ and standardized per-connection memory budgets. ### 8.2. MessageAssembler abstraction -- [ ] 8.2.1. Define a `MessageAssembler` hook trait for protocol-specific +- [x] 8.2.1. Define a `MessageAssembler` hook trait for protocol-specific multi-frame parsing. -- [ ] 8.2.2. Implement per-frame header parsing with "first frame" versus +- [x] 8.2.2. Implement per-frame header parsing with "first frame" versus "continuation frame" handling. - [ ] 8.2.3. Add message key support for multiplexing interleaved assemblies. - [ ] 8.2.4. Implement continuity validation (ordering, missing frames, and diff --git a/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md b/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md index 8bdb5c1d..ee3be6ee 100644 --- a/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md +++ b/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md @@ -217,6 +217,8 @@ The `MessageAssembler` hook allows a protocol to provide: Wireframe provides, centrally: +- a stable hook trait and header model exposed as + `wireframe::message_assembler::MessageAssembler`; - buffering and assembly state management; - enforcement of maximum total message size, maximum fragment size, timeouts, and "max in-flight bytes"; and diff --git a/docs/users-guide.md b/docs/users-guide.md index af9ec6c0..9dc73ab9 100644 --- a/docs/users-guide.md +++ b/docs/users-guide.md @@ -282,6 +282,56 @@ reconstruction), `RequestParts` carries only protocol-defined metadata required to interpret the streaming body. The body itself is consumed through a separate stream, enabling back-pressure and incremental processing.[^46] +### Message assembler hook + +Wireframe exposes a protocol-facing `MessageAssembler` hook that parses +per-frame headers into `FrameHeader::First` and `FrameHeader::Continuation` +values. It returns a `ParsedFrameHeader` that includes the header length so the +remaining bytes can be treated as the body chunk. + +Register an assembler with `WireframeApp::with_message_assembler`: + +```rust,no_run +use wireframe::{ + app::WireframeApp, + message_assembler::{ + FrameHeader, + FirstFrameHeader, + MessageAssembler, + MessageKey, + ParsedFrameHeader, + }, +}; + +struct DemoAssembler; + +impl MessageAssembler for DemoAssembler { + fn parse_frame_header( + &self, + _payload: &[u8], + ) -> Result { + Ok(ParsedFrameHeader::new( + FrameHeader::First(FirstFrameHeader { + message_key: MessageKey(1), + metadata_len: 0, + body_len: 0, + total_body_len: None, + is_last: true, + }), + 0, + )) + } +} + +let _app = WireframeApp::new() + .expect("builder") + .with_message_assembler(DemoAssembler); +``` + +Note: the hook is stored on the application today but is wired into the inbound +connection path in roadmap item 8.2.5. Until that integration lands, protocol +crates can use the trait for shared parsing logic and tests. + ### Streaming request body consumption Handlers can opt into streaming request bodies using the `StreamingBody` diff --git a/src/app/builder.rs b/src/app/builder.rs index 294fc8fa..d4cfc7f8 100644 --- a/src/app/builder.rs +++ b/src/app/builder.rs @@ -23,6 +23,7 @@ use crate::{ codec::{FrameCodec, LengthDelimitedFrameCodec, clamp_frame_length}, fragment::FragmentationConfig, hooks::{ProtocolHooks, WireframeProtocol}, + message_assembler::MessageAssembler, middleware::HandlerService, serializer::{BincodeSerializer, Serializer}, }; @@ -50,6 +51,7 @@ pub struct WireframeApp< pub(super) codec: F, pub(super) read_timeout_ms: u64, pub(super) fragmentation: Option, + pub(super) message_assembler: Option>, } impl Default for WireframeApp @@ -77,6 +79,7 @@ where codec, read_timeout_ms: 100, fragmentation: default_fragmentation(max_frame_length), + message_assembler: None, } } } @@ -138,6 +141,7 @@ where codec: F2, protocol: Option>>, fragmentation: Option, + message_assembler: Option>, ) -> WireframeApp where S2: Serializer + Send + Sync, @@ -156,6 +160,7 @@ where codec, read_timeout_ms: self.read_timeout_ms, fragmentation, + message_assembler, } } @@ -171,7 +176,8 @@ where { let fragmentation = default_fragmentation(codec.max_frame_length()); let serializer = std::mem::take(&mut self.serializer); - self.rebuild_with_params(serializer, codec, None, fragmentation) + let message_assembler = self.message_assembler.take(); + self.rebuild_with_params(serializer, codec, None, fragmentation, message_assembler) } /// Register a route that maps `id` to `handler`. @@ -257,6 +263,7 @@ where codec: self.codec, read_timeout_ms: self.read_timeout_ms, fragmentation: self.fragmentation, + message_assembler: self.message_assembler, }) } @@ -294,6 +301,74 @@ where } } + /// Install a [`MessageAssembler`] implementation. + /// + /// The assembler parses protocol-specific frame headers to support + /// multi-frame request assembly once the inbound pipeline integrates it. + /// + /// # Examples + /// + /// ```rust,no_run + /// use wireframe::{ + /// app::WireframeApp, + /// message_assembler::{MessageAssembler, ParsedFrameHeader}, + /// }; + /// + /// struct DemoAssembler; + /// + /// impl MessageAssembler for DemoAssembler { + /// fn parse_frame_header(&self, _payload: &[u8]) -> Result { + /// Err(std::io::Error::new( + /// std::io::ErrorKind::InvalidData, + /// "unimplemented", + /// )) + /// } + /// } + /// + /// let app = WireframeApp::new() + /// .expect("builder") + /// .with_message_assembler(DemoAssembler); + /// let _ = app; + /// ``` + #[must_use] + pub fn with_message_assembler(self, assembler: impl MessageAssembler + 'static) -> Self { + WireframeApp { + message_assembler: Some(Arc::new(assembler)), + ..self + } + } + + /// Get the configured message assembler, if any. + /// + /// # Examples + /// + /// ```rust,no_run + /// use wireframe::{ + /// app::WireframeApp, + /// message_assembler::{MessageAssembler, ParsedFrameHeader}, + /// }; + /// + /// struct DemoAssembler; + /// + /// impl MessageAssembler for DemoAssembler { + /// fn parse_frame_header(&self, _payload: &[u8]) -> Result { + /// Err(std::io::Error::new( + /// std::io::ErrorKind::InvalidData, + /// "unimplemented", + /// )) + /// } + /// } + /// + /// let app = WireframeApp::new() + /// .expect("builder") + /// .with_message_assembler(DemoAssembler); + /// assert!(app.message_assembler().is_some()); + /// ``` + #[must_use] + pub fn message_assembler(&self) -> Option> { + self.message_assembler.clone() + } + /// Configure a Dead Letter Queue for dropped push frames. /// /// ```rust,no_run @@ -348,7 +423,14 @@ where let codec = std::mem::take(&mut self.codec); let protocol = self.protocol.take(); let fragmentation = self.fragmentation.take(); - self.rebuild_with_params(serializer, codec, protocol, fragmentation) + let message_assembler = self.message_assembler.take(); + self.rebuild_with_params( + serializer, + codec, + protocol, + fragmentation, + message_assembler, + ) } /// Configure the read timeout in milliseconds. diff --git a/src/lib.rs b/src/lib.rs index 6cb4355b..cf5ff282 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,6 +22,7 @@ pub mod fragment; pub mod frame; pub mod hooks; pub mod message; +pub mod message_assembler; pub mod metrics; pub mod middleware; pub mod panic; @@ -58,6 +59,15 @@ pub use fragment::{ fragment_overhead, }; pub use hooks::{ConnectionContext, ProtocolHooks, WireframeProtocol}; +pub use message_assembler::{ + ContinuationFrameHeader, + FirstFrameHeader, + FrameHeader, + FrameSequence, + MessageAssembler, + MessageKey, + ParsedFrameHeader, +}; pub use metrics::{CONNECTIONS_ACTIVE, Direction, ERRORS_TOTAL, FRAMES_PROCESSED}; pub use request::{ DEFAULT_BODY_CHANNEL_CAPACITY, diff --git a/src/message_assembler/header.rs b/src/message_assembler/header.rs new file mode 100644 index 00000000..c10f0740 --- /dev/null +++ b/src/message_assembler/header.rs @@ -0,0 +1,185 @@ +//! Header model for protocol-level message assembly. +//! +//! These types describe per-frame metadata required to reassemble multi-frame +//! requests. Protocol crates populate them via `MessageAssembler`. + +use std::fmt; + +/// Identifies a logical message being assembled from multiple frames. +#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub struct MessageKey(pub u64); + +impl From for MessageKey { + fn from(value: u64) -> Self { Self(value) } +} + +impl From for u64 { + fn from(value: MessageKey) -> Self { value.0 } +} + +impl fmt::Display for MessageKey { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{}", self.0) } +} + +/// Optional sequence number for continuation frames. +#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub struct FrameSequence(pub u32); + +impl From for FrameSequence { + fn from(value: u32) -> Self { Self(value) } +} + +impl From for u32 { + fn from(value: FrameSequence) -> Self { value.0 } +} + +impl fmt::Display for FrameSequence { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{}", self.0) } +} + +/// Parsed per-frame header information. +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum FrameHeader { + /// Header describing the first frame in a multi-frame request. + First(FirstFrameHeader), + /// Header describing a continuation frame. + Continuation(ContinuationFrameHeader), +} + +/// Header metadata for the first frame in a message series. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct FirstFrameHeader { + /// Key used to correlate frames belonging to the same message. + pub message_key: MessageKey, + /// Protocol-specific metadata length in bytes. + pub metadata_len: usize, + /// Length of the body bytes in this frame. + pub body_len: usize, + /// Total expected body length across all frames, if declared by the protocol. + pub total_body_len: Option, + /// Whether this frame completes the message. + pub is_last: bool, +} + +/// Header metadata for continuation frames. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct ContinuationFrameHeader { + /// Key used to correlate frames belonging to the same message. + pub message_key: MessageKey, + /// Optional frame sequence index, when supplied by the protocol. + pub sequence: Option, + /// Length of the body bytes in this frame. + pub body_len: usize, + /// Whether this frame completes the message. + pub is_last: bool, +} + +/// Result of parsing a frame header from payload bytes. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct ParsedFrameHeader { + header: FrameHeader, + header_len: usize, +} + +impl ParsedFrameHeader { + /// Create a parsed header with its byte length. + /// + /// # Examples + /// + /// ``` + /// use wireframe::message_assembler::{ + /// FirstFrameHeader, + /// FrameHeader, + /// MessageKey, + /// ParsedFrameHeader, + /// }; + /// + /// let header = FrameHeader::First(FirstFrameHeader { + /// message_key: MessageKey(1), + /// metadata_len: 0, + /// body_len: 4, + /// total_body_len: None, + /// is_last: true, + /// }); + /// + /// let parsed = ParsedFrameHeader::new(header, 12); + /// assert_eq!(parsed.header_len(), 12); + /// ``` + #[must_use] + pub const fn new(header: FrameHeader, header_len: usize) -> Self { Self { header, header_len } } + + /// Return the parsed header information. + /// + /// # Examples + /// + /// ``` + /// use wireframe::message_assembler::{ + /// FirstFrameHeader, + /// FrameHeader, + /// MessageKey, + /// ParsedFrameHeader, + /// }; + /// + /// let header = FrameHeader::First(FirstFrameHeader { + /// message_key: MessageKey(7), + /// metadata_len: 0, + /// body_len: 1, + /// total_body_len: None, + /// is_last: true, + /// }); + /// let parsed = ParsedFrameHeader::new(header, 4); + /// assert!(matches!(parsed.header(), FrameHeader::First(_))); + /// ``` + #[must_use] + pub const fn header(&self) -> &FrameHeader { &self.header } + + /// Return the number of bytes consumed by the header. + /// + /// # Examples + /// + /// ``` + /// use wireframe::message_assembler::{ + /// FirstFrameHeader, + /// FrameHeader, + /// MessageKey, + /// ParsedFrameHeader, + /// }; + /// + /// let header = FrameHeader::First(FirstFrameHeader { + /// message_key: MessageKey(3), + /// metadata_len: 0, + /// body_len: 0, + /// total_body_len: None, + /// is_last: true, + /// }); + /// let parsed = ParsedFrameHeader::new(header, 12); + /// assert_eq!(parsed.header_len(), 12); + /// ``` + #[must_use] + pub const fn header_len(&self) -> usize { self.header_len } + + /// Consume the parsed header and return the underlying header value. + /// + /// # Examples + /// + /// ``` + /// use wireframe::message_assembler::{ + /// FirstFrameHeader, + /// FrameHeader, + /// MessageKey, + /// ParsedFrameHeader, + /// }; + /// + /// let header = FrameHeader::First(FirstFrameHeader { + /// message_key: MessageKey(1), + /// metadata_len: 0, + /// body_len: 0, + /// total_body_len: None, + /// is_last: true, + /// }); + /// let parsed = ParsedFrameHeader::new(header, 0); + /// assert!(matches!(parsed.into_header(), FrameHeader::First(_))); + /// ``` + #[must_use] + pub fn into_header(self) -> FrameHeader { self.header } +} diff --git a/src/message_assembler/mod.rs b/src/message_assembler/mod.rs new file mode 100644 index 00000000..fc26c014 --- /dev/null +++ b/src/message_assembler/mod.rs @@ -0,0 +1,101 @@ +//! Protocol-level message assembly hooks. +//! +//! A `MessageAssembler` parses protocol-specific frame headers to distinguish +//! "first" frames from "continuation" frames. The hook operates above +//! transport fragmentation and feeds the streaming request pipeline once the +//! connection actor integrates it (see ADR 0002). + +mod header; + +use std::io; + +pub use header::{ + ContinuationFrameHeader, + FirstFrameHeader, + FrameHeader, + FrameSequence, + MessageKey, + ParsedFrameHeader, +}; + +/// Hook trait for protocol-specific multi-frame request parsing. +/// +/// Implementations should parse only the per-frame header and return the +/// parsed header plus the number of bytes consumed. The remaining bytes are +/// treated as the frame's body chunk. +/// +/// # Examples +/// +/// ```rust,no_run +/// use bytes::Buf; +/// use wireframe::message_assembler::{ +/// FirstFrameHeader, +/// FrameHeader, +/// MessageAssembler, +/// MessageKey, +/// ParsedFrameHeader, +/// }; +/// +/// struct DemoAssembler; +/// +/// impl MessageAssembler for DemoAssembler { +/// fn parse_frame_header(&self, payload: &[u8]) -> Result { +/// let mut buf = payload; +/// if buf.remaining() < 9 { +/// return Err(std::io::Error::new( +/// std::io::ErrorKind::InvalidData, +/// "header too short", +/// )); +/// } +/// +/// let tag = buf.get_u8(); +/// let key = MessageKey::from(buf.get_u64()); +/// let header = match tag { +/// 0x01 => FrameHeader::First(FirstFrameHeader { +/// message_key: key, +/// metadata_len: 0, +/// body_len: buf.remaining(), +/// total_body_len: None, +/// is_last: true, +/// }), +/// _ => { +/// return Err(std::io::Error::new( +/// std::io::ErrorKind::InvalidData, +/// "unknown header tag", +/// )); +/// } +/// }; +/// +/// let header_len = payload.len() - buf.remaining(); +/// Ok(ParsedFrameHeader::new(header, header_len)) +/// } +/// } +/// ``` +pub trait MessageAssembler: Send + Sync + 'static { + /// Parse a protocol header from the provided payload bytes. + /// + /// # Errors + /// + /// Returns an `io::Error` when the header is malformed or incomplete. + /// + /// # Examples + /// + /// ```rust,no_run + /// use wireframe::message_assembler::{MessageAssembler, ParsedFrameHeader}; + /// + /// struct Demo; + /// + /// impl MessageAssembler for Demo { + /// fn parse_frame_header(&self, _payload: &[u8]) -> Result { + /// Err(std::io::Error::new( + /// std::io::ErrorKind::InvalidData, + /// "header not implemented", + /// )) + /// } + /// } + /// ``` + fn parse_frame_header(&self, payload: &[u8]) -> Result; +} + +#[cfg(test)] +mod tests; diff --git a/src/message_assembler/tests.rs b/src/message_assembler/tests.rs new file mode 100644 index 00000000..30babf1f --- /dev/null +++ b/src/message_assembler/tests.rs @@ -0,0 +1,223 @@ +//! Unit tests for message assembler header parsing. + +use std::io; + +use bytes::{Buf, BufMut, BytesMut}; + +use super::{ + ContinuationFrameHeader, + FirstFrameHeader, + FrameHeader, + FrameSequence, + MessageAssembler, + MessageKey, + ParsedFrameHeader, +}; + +struct TestAssembler; + +impl MessageAssembler for TestAssembler { + fn parse_frame_header(&self, payload: &[u8]) -> Result { + let mut buf = payload; + let initial = buf.remaining(); + + let kind = take_u8(&mut buf)?; + let flags = take_u8(&mut buf)?; + let message_key = MessageKey::from(take_u64(&mut buf)?); + + let header = match kind { + 0x01 => { + let metadata_len = usize::from(take_u16(&mut buf)?); + let body_len = usize::try_from(take_u32(&mut buf)?) + .map_err(|_| invalid_data("body length too large"))?; + let total_body_len = if flags & 0b10 == 0b10 { + Some( + usize::try_from(take_u32(&mut buf)?) + .map_err(|_| invalid_data("total length too large"))?, + ) + } else { + None + }; + + FrameHeader::First(FirstFrameHeader { + message_key, + metadata_len, + body_len, + total_body_len, + is_last: flags & 0b1 == 0b1, + }) + } + 0x02 => { + let body_len = usize::try_from(take_u32(&mut buf)?) + .map_err(|_| invalid_data("body length too large"))?; + let sequence = if flags & 0b10 == 0b10 { + Some(FrameSequence::from(take_u32(&mut buf)?)) + } else { + None + }; + + FrameHeader::Continuation(ContinuationFrameHeader { + message_key, + sequence, + body_len, + is_last: flags & 0b1 == 0b1, + }) + } + _ => return Err(invalid_data("unknown header kind")), + }; + + let header_len = initial - buf.remaining(); + Ok(ParsedFrameHeader::new(header, header_len)) + } +} + +#[test] +fn parse_first_frame_header_without_total() { + let mut bytes = BytesMut::new(); + bytes.put_u8(0x01); // kind + bytes.put_u8(0b0); // flags + bytes.put_u64(9); + bytes.put_u16(2); + bytes.put_u32(12); + let payload = bytes.to_vec(); + + let parsed = TestAssembler + .parse_frame_header(&payload) + .expect("header parse"); + + let FrameHeader::First(header) = parsed.header() else { + panic!("expected first frame header"); + }; + + assert_eq!(header.message_key, MessageKey(9)); + assert_eq!(header.metadata_len, 2); + assert_eq!(header.body_len, 12); + assert_eq!(header.total_body_len, None); + assert!(!header.is_last); + assert_eq!(parsed.header_len(), payload.len()); +} + +#[test] +fn parse_first_frame_header_with_total_and_last() { + let mut bytes = BytesMut::new(); + bytes.put_u8(0x01); + bytes.put_u8(0b11); // last + total + bytes.put_u64(42); + bytes.put_u16(0); + bytes.put_u32(8); + bytes.put_u32(64); + let payload = bytes.to_vec(); + + let parsed = TestAssembler + .parse_frame_header(&payload) + .expect("header parse"); + + let FrameHeader::First(header) = parsed.header() else { + panic!("expected first frame header"); + }; + + assert_eq!(header.message_key, MessageKey(42)); + assert_eq!(header.metadata_len, 0); + assert_eq!(header.body_len, 8); + assert_eq!(header.total_body_len, Some(64)); + assert!(header.is_last); + assert_eq!(parsed.header_len(), payload.len()); +} + +#[test] +fn parse_continuation_header_with_sequence() { + let mut bytes = BytesMut::new(); + bytes.put_u8(0x02); + bytes.put_u8(0b10); // sequence present + bytes.put_u64(7); + bytes.put_u32(16); + bytes.put_u32(3); + let payload = bytes.to_vec(); + + let parsed = TestAssembler + .parse_frame_header(&payload) + .expect("header parse"); + + let FrameHeader::Continuation(header) = parsed.header() else { + panic!("expected continuation header"); + }; + + assert_eq!(header.message_key, MessageKey(7)); + assert_eq!(header.body_len, 16); + assert_eq!(header.sequence, Some(FrameSequence(3))); + assert!(!header.is_last); +} + +#[test] +fn parse_continuation_header_without_sequence() { + let mut bytes = BytesMut::new(); + bytes.put_u8(0x02); + bytes.put_u8(0b1); // last, no sequence + bytes.put_u64(11); + bytes.put_u32(5); + let payload = bytes.to_vec(); + + let parsed = TestAssembler + .parse_frame_header(&payload) + .expect("header parse"); + + let FrameHeader::Continuation(header) = parsed.header() else { + panic!("expected continuation header"); + }; + + assert_eq!(header.message_key, MessageKey(11)); + assert_eq!(header.body_len, 5); + assert_eq!(header.sequence, None); + assert!(header.is_last); +} + +#[test] +fn short_header_errors() { + let payload = vec![0x01]; + let err = TestAssembler + .parse_frame_header(&payload) + .expect_err("expected error"); + + assert_eq!(err.kind(), io::ErrorKind::InvalidData); +} + +#[test] +fn unknown_header_kind_errors() { + let payload = vec![0xff, 0x00, 0x00]; + let err = TestAssembler + .parse_frame_header(&payload) + .expect_err("expected error"); + + assert_eq!(err.kind(), io::ErrorKind::InvalidData); +} + +fn take_u8(buf: &mut &[u8]) -> Result { + ensure_remaining(buf, 1)?; + Ok(buf.get_u8()) +} + +fn take_u16(buf: &mut &[u8]) -> Result { + ensure_remaining(buf, 2)?; + Ok(buf.get_u16()) +} + +fn take_u32(buf: &mut &[u8]) -> Result { + ensure_remaining(buf, 4)?; + Ok(buf.get_u32()) +} + +fn take_u64(buf: &mut &[u8]) -> Result { + ensure_remaining(buf, 8)?; + Ok(buf.get_u64()) +} + +fn ensure_remaining(buf: &mut &[u8], needed: usize) -> Result<(), io::Error> { + if buf.remaining() < needed { + return Err(invalid_data("header too short")); + } + Ok(()) +} + +fn invalid_data(message: &'static str) -> io::Error { + io::Error::new(io::ErrorKind::InvalidData, message) +} diff --git a/tests/cucumber.rs b/tests/cucumber.rs index 74fc7252..7c497e6f 100644 --- a/tests/cucumber.rs +++ b/tests/cucumber.rs @@ -7,6 +7,7 @@ //! - `StreamEndWorld`: Verifies end-of-stream signalling //! - `MultiPacketWorld`: Tests channel-backed multi-packet response delivery //! - `FragmentWorld`: Tests fragment metadata enforcement and reassembly primitives +//! - `MessageAssemblerWorld`: Tests message assembler header parsing //! - `ClientRuntimeWorld`: Tests client runtime configuration and framing behaviour //! - `CodecStatefulWorld`: Tests instance-aware codec sequence counters //! - `RequestPartsWorld`: Tests request parts metadata handling @@ -21,6 +22,7 @@ //! tests/features/stream_end.feature -> StreamEndWorld context //! tests/features/multi_packet.feature -> MultiPacketWorld context //! tests/features/fragment.feature -> FragmentWorld context +//! tests/features/message_assembler.feature -> MessageAssemblerWorld context //! tests/features/client_runtime.feature -> ClientRuntimeWorld context //! tests/features/codec_stateful.feature -> CodecStatefulWorld context //! tests/features/request_parts.feature -> RequestPartsWorld context @@ -40,6 +42,7 @@ use world::{ CodecStatefulWorld, CorrelationWorld, FragmentWorld, + MessageAssemblerWorld, MultiPacketWorld, PanicWorld, RequestPartsWorld, @@ -53,6 +56,7 @@ async fn main() { StreamEndWorld::run("tests/features/stream_end.feature").await; MultiPacketWorld::run("tests/features/multi_packet.feature").await; FragmentWorld::run("tests/features/fragment.feature").await; + MessageAssemblerWorld::run("tests/features/message_assembler.feature").await; ClientRuntimeWorld::run("tests/features/client_runtime.feature").await; CodecStatefulWorld::run("tests/features/codec_stateful.feature").await; RequestPartsWorld::run("tests/features/request_parts.feature").await; diff --git a/tests/features/message_assembler.feature b/tests/features/message_assembler.feature new file mode 100644 index 00000000..9e54043c --- /dev/null +++ b/tests/features/message_assembler.feature @@ -0,0 +1,47 @@ +Feature: Message assembler header parsing + Wireframe exposes a MessageAssembler hook for protocol-specific header + parsing. The hook distinguishes first frames from continuation frames and + exposes key metadata needed for assembly. + + Scenario: Parsing a first frame header without total length + Given a first frame header with key 9 metadata length 2 body length 12 + When the message assembler parses the header + Then the parsed header is first + And the message key is 9 + And the metadata length is 2 + And the body length is 12 + And the total body length is absent + And the frame is marked last false + + Scenario: Parsing a first frame header with total length + Given a first frame header with key 42 body length 8 total 64 + When the message assembler parses the header + Then the parsed header is first + And the message key is 42 + And the metadata length is 0 + And the body length is 8 + And the total body length is 64 + And the frame is marked last true + + Scenario: Parsing a continuation header with sequence + Given a continuation header with key 7 body length 16 sequence 3 + When the message assembler parses the header + Then the parsed header is continuation + And the message key is 7 + And the body length is 16 + And the sequence is 3 + And the frame is marked last false + + Scenario: Parsing a continuation header without sequence + Given a continuation header with key 11 body length 5 + When the message assembler parses the header + Then the parsed header is continuation + And the message key is 11 + And the body length is 5 + And the sequence is absent + And the frame is marked last true + + Scenario: Invalid header payload returns error + Given an invalid message header + When the message assembler parses the header + Then the parse fails with invalid data diff --git a/tests/steps/message_assembler_steps.rs b/tests/steps/message_assembler_steps.rs new file mode 100644 index 00000000..41130a2b --- /dev/null +++ b/tests/steps/message_assembler_steps.rs @@ -0,0 +1,132 @@ +//! Step definitions for message assembler header parsing. + +use cucumber::{given, then, when}; + +use crate::world::{ContinuationHeaderSpec, FirstHeaderSpec, MessageAssemblerWorld}; + +#[given(expr = "a first frame header with key {int} metadata length {int} body length {int}")] +fn given_first_header( + world: &mut MessageAssemblerWorld, + key: u64, + metadata_len: usize, + body_len: usize, +) -> crate::world::TestResult { + world.set_first_header(FirstHeaderSpec { + key, + metadata_len, + body_len, + total_len: None, + is_last: false, + }) +} + +#[given(expr = "a first frame header with key {int} body length {int} total {int}")] +fn given_first_header_with_total( + world: &mut MessageAssemblerWorld, + key: u64, + body_len: usize, + total_len: usize, +) -> crate::world::TestResult { + world.set_first_header(FirstHeaderSpec { + key, + metadata_len: 0, + body_len, + total_len: Some(total_len), + is_last: true, + }) +} + +#[given(expr = "a continuation header with key {int} body length {int} sequence {int}")] +fn given_continuation_header_with_sequence( + world: &mut MessageAssemblerWorld, + key: u64, + body_len: usize, + sequence: u32, +) -> crate::world::TestResult { + world.set_continuation_header(ContinuationHeaderSpec { + key, + body_len, + sequence: Some(sequence), + is_last: false, + }) +} + +#[given(expr = "a continuation header with key {int} body length {int}")] +fn given_continuation_header( + world: &mut MessageAssemblerWorld, + key: u64, + body_len: usize, +) -> crate::world::TestResult { + world.set_continuation_header(ContinuationHeaderSpec { + key, + body_len, + sequence: None, + is_last: true, + }) +} + +#[given("an invalid message header")] +fn given_invalid_header(world: &mut MessageAssemblerWorld) { world.set_invalid_payload(); } + +#[when("the message assembler parses the header")] +fn when_parsing(world: &mut MessageAssemblerWorld) -> crate::world::TestResult { + world.parse_header() +} + +#[then(expr = "the parsed header is {word}")] +fn then_header_kind(world: &mut MessageAssemblerWorld, kind: String) -> crate::world::TestResult { + let result = world.assert_header_kind(&kind); + drop(kind); + result +} + +#[then(expr = "the message key is {int}")] +fn then_message_key(world: &mut MessageAssemblerWorld, key: u64) -> crate::world::TestResult { + world.assert_message_key(key) +} + +#[then(expr = "the metadata length is {int}")] +fn then_metadata_len( + world: &mut MessageAssemblerWorld, + metadata_len: usize, +) -> crate::world::TestResult { + world.assert_metadata_len(metadata_len) +} + +#[then(expr = "the body length is {int}")] +fn then_body_len(world: &mut MessageAssemblerWorld, body_len: usize) -> crate::world::TestResult { + world.assert_body_len(body_len) +} + +#[then("the total body length is absent")] +fn then_total_absent(world: &mut MessageAssemblerWorld) -> crate::world::TestResult { + world.assert_total_len(None) +} + +#[then(expr = "the total body length is {int}")] +fn then_total_present(world: &mut MessageAssemblerWorld, total: usize) -> crate::world::TestResult { + world.assert_total_len(Some(total)) +} + +#[then(expr = "the sequence is {int}")] +fn then_sequence(world: &mut MessageAssemblerWorld, sequence: u32) -> crate::world::TestResult { + world.assert_sequence(Some(sequence)) +} + +#[then("the sequence is absent")] +fn then_sequence_absent(world: &mut MessageAssemblerWorld) -> crate::world::TestResult { + world.assert_sequence(None) +} + +#[then(expr = "the frame is marked last {word}")] +fn then_is_last(world: &mut MessageAssemblerWorld, expected: String) -> crate::world::TestResult { + let expected_str = expected; + let expected = expected_str == "true"; + drop(expected_str); + world.assert_is_last(expected) +} + +#[then("the parse fails with invalid data")] +fn then_invalid_data(world: &mut MessageAssemblerWorld) -> crate::world::TestResult { + world.assert_invalid_data_error() +} diff --git a/tests/steps/mod.rs b/tests/steps/mod.rs index c1c6d2d6..f4ef427d 100644 --- a/tests/steps/mod.rs +++ b/tests/steps/mod.rs @@ -8,6 +8,7 @@ mod client_steps; mod codec_stateful_steps; mod correlation_steps; mod fragment_steps; +mod message_assembler_steps; mod multi_packet_steps; mod panic_steps; mod request_parts_steps; diff --git a/tests/world.rs b/tests/world.rs index a1c2bfae..96f38557 100644 --- a/tests/world.rs +++ b/tests/world.rs @@ -11,6 +11,7 @@ pub use worlds::{ common::TestResult, correlation::CorrelationWorld, fragment::FragmentWorld, + message_assembler::{ContinuationHeaderSpec, FirstHeaderSpec, MessageAssemblerWorld}, multi_packet::MultiPacketWorld, panic::PanicWorld, request_parts::RequestPartsWorld, diff --git a/tests/worlds/message_assembler.rs b/tests/worlds/message_assembler.rs new file mode 100644 index 00000000..0cf72f89 --- /dev/null +++ b/tests/worlds/message_assembler.rs @@ -0,0 +1,363 @@ +//! Test world for message assembler header parsing. +#![cfg(not(loom))] + +use std::io; + +use bytes::{Buf, BufMut, BytesMut}; +use cucumber::World; +use wireframe::message_assembler::{ + ContinuationFrameHeader, + FirstFrameHeader, + FrameHeader, + FrameSequence, + MessageAssembler, + MessageKey, + ParsedFrameHeader, +}; + +use super::TestResult; + +/// Specification for first-frame header encoding used in tests. +#[derive(Debug, Clone, Copy)] +pub struct FirstHeaderSpec { + /// Message key to encode into the header. + pub key: u64, + /// Metadata length in bytes. + pub metadata_len: usize, + /// Body length in bytes for this frame. + pub body_len: usize, + /// Optional total body length across all frames. + pub total_len: Option, + /// Whether the frame is the final one in the series. + pub is_last: bool, +} + +/// Specification for continuation-frame header encoding used in tests. +#[derive(Debug, Clone, Copy)] +pub struct ContinuationHeaderSpec { + /// Message key to encode into the header. + pub key: u64, + /// Body length in bytes for this frame. + pub body_len: usize, + /// Optional sequence number. + pub sequence: Option, + /// Whether the frame is the final one in the series. + pub is_last: bool, +} + +/// World used by Cucumber to test message assembler header parsing. +#[derive(Debug, Default, World)] +pub struct MessageAssemblerWorld { + payload: Option>, + parsed: Option, + error: Option, +} + +impl MessageAssemblerWorld { + /// Store an encoded first-frame header in the world payload. + /// + /// # Errors + /// + /// Returns an error if any length field exceeds the header encoding limits. + pub fn set_first_header(&mut self, spec: FirstHeaderSpec) -> TestResult { + let mut bytes = BytesMut::new(); + bytes.put_u8(0x01); + let mut flags = 0u8; + if spec.is_last { + flags |= 0b1; + } + if spec.total_len.is_some() { + flags |= 0b10; + } + bytes.put_u8(flags); + bytes.put_u64(spec.key); + let metadata_len = + u16::try_from(spec.metadata_len).map_err(|_| "metadata length too large")?; + bytes.put_u16(metadata_len); + let body_len = u32::try_from(spec.body_len).map_err(|_| "body length too large")?; + bytes.put_u32(body_len); + if let Some(total) = spec.total_len { + let total = u32::try_from(total).map_err(|_| "total length too large")?; + bytes.put_u32(total); + } + self.payload = Some(bytes.to_vec()); + Ok(()) + } + + /// Store an encoded continuation-frame header in the world payload. + /// + /// # Errors + /// + /// Returns an error if any length field exceeds the header encoding limits. + pub fn set_continuation_header(&mut self, spec: ContinuationHeaderSpec) -> TestResult { + let mut bytes = BytesMut::new(); + bytes.put_u8(0x02); + let mut flags = 0u8; + if spec.is_last { + flags |= 0b1; + } + if spec.sequence.is_some() { + flags |= 0b10; + } + bytes.put_u8(flags); + bytes.put_u64(spec.key); + let body_len = u32::try_from(spec.body_len).map_err(|_| "body length too large")?; + bytes.put_u32(body_len); + if let Some(seq) = spec.sequence { + bytes.put_u32(seq); + } + self.payload = Some(bytes.to_vec()); + Ok(()) + } + + /// Store a deliberately invalid header payload. + pub fn set_invalid_payload(&mut self) { self.payload = Some(vec![0x01]); } + + /// Parse the stored payload with the test assembler. + /// + /// # Errors + /// + /// Returns an error if no payload has been configured. + pub fn parse_header(&mut self) -> TestResult { + let payload = self.payload.as_deref().ok_or("payload not set")?; + match TestAssembler.parse_frame_header(payload) { + Ok(parsed) => { + self.parsed = Some(parsed); + self.error = None; + } + Err(err) => { + self.parsed = None; + self.error = Some(err); + } + } + Ok(()) + } + + /// Assert that the parsed header is of the expected kind. + /// + /// # Errors + /// + /// Returns an error if no header was parsed or the kind does not match. + pub fn assert_header_kind(&self, expected: &str) -> TestResult { + let parsed = self.parsed.as_ref().ok_or("no parsed header")?; + let matches_kind = matches!( + (expected, parsed.header()), + ("first", FrameHeader::First(_)) | ("continuation", FrameHeader::Continuation(_)) + ); + if matches_kind { + Ok(()) + } else { + Err(format!("expected {expected} header").into()) + } + } + + /// Assert that the parsed header contains the expected message key. + /// + /// # Errors + /// + /// Returns an error if no header was parsed or the key does not match. + pub fn assert_message_key(&self, expected: u64) -> TestResult { + let parsed = self.parsed.as_ref().ok_or("no parsed header")?; + let key = match parsed.header() { + FrameHeader::First(header) => header.message_key, + FrameHeader::Continuation(header) => header.message_key, + }; + if key != MessageKey(expected) { + return Err(format!("expected key {expected}, got {key}").into()); + } + Ok(()) + } + + /// Assert that the parsed header contains the expected metadata length. + /// + /// # Errors + /// + /// Returns an error if no header was parsed or the metadata length differs. + pub fn assert_metadata_len(&self, expected: usize) -> TestResult { + let parsed = self.parsed.as_ref().ok_or("no parsed header")?; + let FrameHeader::First(header) = parsed.header() else { + return Err("expected first header".into()); + }; + if header.metadata_len != expected { + return Err(format!( + "expected metadata length {expected}, got {}", + header.metadata_len + ) + .into()); + } + Ok(()) + } + + /// Assert that the parsed header contains the expected body length. + /// + /// # Errors + /// + /// Returns an error if no header was parsed or the body length differs. + pub fn assert_body_len(&self, expected: usize) -> TestResult { + let parsed = self.parsed.as_ref().ok_or("no parsed header")?; + let body_len = match parsed.header() { + FrameHeader::First(header) => header.body_len, + FrameHeader::Continuation(header) => header.body_len, + }; + if body_len != expected { + return Err(format!("expected body length {expected}, got {body_len}").into()); + } + Ok(()) + } + + /// Assert that the parsed header contains the expected total body length. + /// + /// # Errors + /// + /// Returns an error if no header was parsed or the total length differs. + pub fn assert_total_len(&self, expected: Option) -> TestResult { + let parsed = self.parsed.as_ref().ok_or("no parsed header")?; + let FrameHeader::First(header) = parsed.header() else { + return Err("expected first header".into()); + }; + if header.total_body_len != expected { + return Err(format!( + "expected total length {:?}, got {:?}", + expected, header.total_body_len + ) + .into()); + } + Ok(()) + } + + /// Assert that the parsed header contains the expected sequence. + /// + /// # Errors + /// + /// Returns an error if no header was parsed or the sequence differs. + pub fn assert_sequence(&self, expected: Option) -> TestResult { + let parsed = self.parsed.as_ref().ok_or("no parsed header")?; + let FrameHeader::Continuation(header) = parsed.header() else { + return Err("expected continuation header".into()); + }; + let expected = expected.map(FrameSequence::from); + if header.sequence != expected { + return Err(format!( + "expected sequence {:?}, got {:?}", + expected, header.sequence + ) + .into()); + } + Ok(()) + } + + /// Assert that the parsed header matches the expected `is_last` flag. + /// + /// # Errors + /// + /// Returns an error if no header was parsed or the flag differs. + pub fn assert_is_last(&self, expected: bool) -> TestResult { + let parsed = self.parsed.as_ref().ok_or("no parsed header")?; + let is_last = match parsed.header() { + FrameHeader::First(header) => header.is_last, + FrameHeader::Continuation(header) => header.is_last, + }; + if is_last != expected { + return Err(format!("expected is_last {expected}, got {is_last}").into()); + } + Ok(()) + } + + /// Assert that the parse failed with `InvalidData`. + /// + /// # Errors + /// + /// Returns an error if no parse error was captured or the kind differs. + pub fn assert_invalid_data_error(&self) -> TestResult { + let err = self.error.as_ref().ok_or("expected error")?; + if err.kind() != io::ErrorKind::InvalidData { + return Err(format!("expected InvalidData error, got {:?}", err.kind()).into()); + } + Ok(()) + } +} + +struct TestAssembler; + +impl MessageAssembler for TestAssembler { + fn parse_frame_header(&self, payload: &[u8]) -> Result { + let mut buf = payload; + let initial = buf.remaining(); + let kind = take_u8(&mut buf)?; + let flags = take_u8(&mut buf)?; + let message_key = MessageKey::from(take_u64(&mut buf)?); + + let header = match kind { + 0x01 => { + let metadata_len = usize::from(take_u16(&mut buf)?); + let body_len = usize::try_from(take_u32(&mut buf)?) + .map_err(|_| invalid_data("body length too large"))?; + let total_body_len = if flags & 0b10 == 0b10 { + Some( + usize::try_from(take_u32(&mut buf)?) + .map_err(|_| invalid_data("total length too large"))?, + ) + } else { + None + }; + FrameHeader::First(FirstFrameHeader { + message_key, + metadata_len, + body_len, + total_body_len, + is_last: flags & 0b1 == 0b1, + }) + } + 0x02 => { + let body_len = usize::try_from(take_u32(&mut buf)?) + .map_err(|_| invalid_data("body length too large"))?; + let sequence = if flags & 0b10 == 0b10 { + Some(FrameSequence::from(take_u32(&mut buf)?)) + } else { + None + }; + FrameHeader::Continuation(ContinuationFrameHeader { + message_key, + sequence, + body_len, + is_last: flags & 0b1 == 0b1, + }) + } + _ => return Err(invalid_data("unknown header kind")), + }; + + let header_len = initial - buf.remaining(); + Ok(ParsedFrameHeader::new(header, header_len)) + } +} + +fn take_u8(buf: &mut &[u8]) -> Result { + ensure_remaining(buf, 1)?; + Ok(buf.get_u8()) +} + +fn take_u16(buf: &mut &[u8]) -> Result { + ensure_remaining(buf, 2)?; + Ok(buf.get_u16()) +} + +fn take_u32(buf: &mut &[u8]) -> Result { + ensure_remaining(buf, 4)?; + Ok(buf.get_u32()) +} + +fn take_u64(buf: &mut &[u8]) -> Result { + ensure_remaining(buf, 8)?; + Ok(buf.get_u64()) +} + +fn ensure_remaining(buf: &mut &[u8], needed: usize) -> Result<(), io::Error> { + if buf.remaining() < needed { + return Err(invalid_data("header too short")); + } + Ok(()) +} + +fn invalid_data(message: &'static str) -> io::Error { + io::Error::new(io::ErrorKind::InvalidData, message) +} diff --git a/tests/worlds/mod.rs b/tests/worlds/mod.rs index 2be85b58..32534de3 100644 --- a/tests/worlds/mod.rs +++ b/tests/worlds/mod.rs @@ -2,9 +2,9 @@ //! //! Provides world types for behaviour-driven tests covering fragmentation, //! correlation, panic recovery, stream termination, multi-packet channels, -//! stateful codecs, and request parts. Shared utilities like -//! `build_small_queues` keep individual worlds focused on their respective -//! scenarios. +//! stateful codecs, request parts, and message assembler parsing. Shared +//! utilities like `build_small_queues` keep individual worlds focused on their +//! respective scenarios. #![cfg(not(loom))] #[path = "../common/mod.rs"] @@ -32,6 +32,7 @@ pub mod client_runtime; pub mod codec_stateful; pub mod correlation; pub mod fragment; +pub mod message_assembler; pub mod multi_packet; pub mod panic; pub mod request_parts; From 79b0eb2118f86d0c7cee01685070a0412625807e Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 4 Jan 2026 16:04:40 +0000 Subject: [PATCH 03/14] test(message_assembler): refactor frame header tests for readability and reuse Refactored frame header tests to use helper functions and specs for payload construction and assertions. This removes duplication, improves clarity, and makes tests easier to maintain by encapsulating common setup and checks in reusable functions and structs. Co-authored-by: terragon-labs[bot] --- src/message_assembler/tests.rs | 222 ++++++++++++++++++++------------- 1 file changed, 138 insertions(+), 84 deletions(-) diff --git a/src/message_assembler/tests.rs b/src/message_assembler/tests.rs index 30babf1f..a45b9b6b 100644 --- a/src/message_assembler/tests.rs +++ b/src/message_assembler/tests.rs @@ -73,102 +73,90 @@ impl MessageAssembler for TestAssembler { #[test] fn parse_first_frame_header_without_total() { - let mut bytes = BytesMut::new(); - bytes.put_u8(0x01); // kind - bytes.put_u8(0b0); // flags - bytes.put_u64(9); - bytes.put_u16(2); - bytes.put_u32(12); - let payload = bytes.to_vec(); - - let parsed = TestAssembler - .parse_frame_header(&payload) - .expect("header parse"); - - let FrameHeader::First(header) = parsed.header() else { - panic!("expected first frame header"); - }; - - assert_eq!(header.message_key, MessageKey(9)); - assert_eq!(header.metadata_len, 2); - assert_eq!(header.body_len, 12); - assert_eq!(header.total_body_len, None); - assert!(!header.is_last); - assert_eq!(parsed.header_len(), payload.len()); + let payload = build_first_header_payload(FirstHeaderSpec { + flags: 0b0, + message_key: 9, + metadata_len: 2, + body_len: 12, + total_body_len: None, + }); + let parsed = parse_header(&payload); + assert_first_header( + &parsed, + FirstFrameHeader { + message_key: MessageKey(9), + metadata_len: 2, + body_len: 12, + total_body_len: None, + is_last: false, + }, + payload.len(), + ); } #[test] fn parse_first_frame_header_with_total_and_last() { - let mut bytes = BytesMut::new(); - bytes.put_u8(0x01); - bytes.put_u8(0b11); // last + total - bytes.put_u64(42); - bytes.put_u16(0); - bytes.put_u32(8); - bytes.put_u32(64); - let payload = bytes.to_vec(); - - let parsed = TestAssembler - .parse_frame_header(&payload) - .expect("header parse"); - - let FrameHeader::First(header) = parsed.header() else { - panic!("expected first frame header"); - }; - - assert_eq!(header.message_key, MessageKey(42)); - assert_eq!(header.metadata_len, 0); - assert_eq!(header.body_len, 8); - assert_eq!(header.total_body_len, Some(64)); - assert!(header.is_last); - assert_eq!(parsed.header_len(), payload.len()); + let payload = build_first_header_payload(FirstHeaderSpec { + flags: 0b11, + message_key: 42, + metadata_len: 0, + body_len: 8, + total_body_len: Some(64), + }); + let parsed = parse_header(&payload); + assert_first_header( + &parsed, + FirstFrameHeader { + message_key: MessageKey(42), + metadata_len: 0, + body_len: 8, + total_body_len: Some(64), + is_last: true, + }, + payload.len(), + ); } #[test] fn parse_continuation_header_with_sequence() { - let mut bytes = BytesMut::new(); - bytes.put_u8(0x02); - bytes.put_u8(0b10); // sequence present - bytes.put_u64(7); - bytes.put_u32(16); - bytes.put_u32(3); - let payload = bytes.to_vec(); - - let parsed = TestAssembler - .parse_frame_header(&payload) - .expect("header parse"); - - let FrameHeader::Continuation(header) = parsed.header() else { - panic!("expected continuation header"); - }; - - assert_eq!(header.message_key, MessageKey(7)); - assert_eq!(header.body_len, 16); - assert_eq!(header.sequence, Some(FrameSequence(3))); - assert!(!header.is_last); + let payload = build_continuation_header_payload(ContinuationHeaderSpec { + flags: 0b10, + message_key: 7, + body_len: 16, + sequence: Some(3), + }); + let parsed = parse_header(&payload); + assert_continuation_header( + &parsed, + ContinuationFrameHeader { + message_key: MessageKey(7), + sequence: Some(FrameSequence(3)), + body_len: 16, + is_last: false, + }, + payload.len(), + ); } #[test] fn parse_continuation_header_without_sequence() { - let mut bytes = BytesMut::new(); - bytes.put_u8(0x02); - bytes.put_u8(0b1); // last, no sequence - bytes.put_u64(11); - bytes.put_u32(5); - let payload = bytes.to_vec(); - - let parsed = TestAssembler - .parse_frame_header(&payload) - .expect("header parse"); - - let FrameHeader::Continuation(header) = parsed.header() else { - panic!("expected continuation header"); - }; - - assert_eq!(header.message_key, MessageKey(11)); - assert_eq!(header.body_len, 5); - assert_eq!(header.sequence, None); - assert!(header.is_last); + let payload = build_continuation_header_payload(ContinuationHeaderSpec { + flags: 0b1, + message_key: 11, + body_len: 5, + sequence: None, + }); + let parsed = parse_header(&payload); + assert_continuation_header( + &parsed, + ContinuationFrameHeader { + message_key: MessageKey(11), + sequence: None, + body_len: 5, + is_last: true, + }, + payload.len(), + ); } #[test] @@ -221,3 +209,69 @@ fn ensure_remaining(buf: &mut &[u8], needed: usize) -> Result<(), io::Error> { fn invalid_data(message: &'static str) -> io::Error { io::Error::new(io::ErrorKind::InvalidData, message) } + +#[derive(Clone, Copy)] +struct FirstHeaderSpec { + flags: u8, + message_key: u64, + metadata_len: u16, + body_len: u32, + total_body_len: Option, +} + +#[derive(Clone, Copy)] +struct ContinuationHeaderSpec { + flags: u8, + message_key: u64, + body_len: u32, + sequence: Option, +} + +fn build_first_header_payload(spec: FirstHeaderSpec) -> Vec { + let mut bytes = BytesMut::new(); + bytes.put_u8(0x01); + bytes.put_u8(spec.flags); + bytes.put_u64(spec.message_key); + bytes.put_u16(spec.metadata_len); + bytes.put_u32(spec.body_len); + if let Some(total_body_len) = spec.total_body_len { + bytes.put_u32(total_body_len); + } + bytes.to_vec() +} + +fn build_continuation_header_payload(spec: ContinuationHeaderSpec) -> Vec { + let mut bytes = BytesMut::new(); + bytes.put_u8(0x02); + bytes.put_u8(spec.flags); + bytes.put_u64(spec.message_key); + bytes.put_u32(spec.body_len); + if let Some(sequence) = spec.sequence { + bytes.put_u32(sequence); + } + bytes.to_vec() +} + +fn parse_header(payload: &[u8]) -> ParsedFrameHeader { + TestAssembler + .parse_frame_header(payload) + .expect("header parse") +} + +fn assert_first_header( + parsed: &ParsedFrameHeader, + expected: FirstFrameHeader, + expected_len: usize, +) { + assert_eq!(parsed.header(), &FrameHeader::First(expected)); + assert_eq!(parsed.header_len(), expected_len); +} + +fn assert_continuation_header( + parsed: &ParsedFrameHeader, + expected: ContinuationFrameHeader, + expected_len: usize, +) { + assert_eq!(parsed.header(), &FrameHeader::Continuation(expected)); + assert_eq!(parsed.header_len(), expected_len); +} From 66fa2b2d69e8b45c024069aa81e590ed1710ac09 Mon Sep 17 00:00:00 2001 From: Leynos Date: Mon, 5 Jan 2026 12:32:43 +0000 Subject: [PATCH 04/14] test(message_assembler): refactor header parse tests to table-driven style Refactor header parsing tests by replacing multiple individual tests with a table-driven test that iterates over multiple cases. This improves maintainability and clarity. Also introduce common assertion helpers in the test world for concise test validations. Minor test step refactoring to use constants for flags and options. Co-authored-by: terragon-labs[bot] --- src/message_assembler/tests.rs | 179 +++++++++++-------------- tests/steps/message_assembler_steps.rs | 20 ++- tests/worlds/message_assembler.rs | 46 +++---- 3 files changed, 117 insertions(+), 128 deletions(-) diff --git a/src/message_assembler/tests.rs b/src/message_assembler/tests.rs index a45b9b6b..fbfdb0a4 100644 --- a/src/message_assembler/tests.rs +++ b/src/message_assembler/tests.rs @@ -72,91 +72,88 @@ impl MessageAssembler for TestAssembler { } #[test] -fn parse_first_frame_header_without_total() { - let payload = build_first_header_payload(FirstHeaderSpec { - flags: 0b0, - message_key: 9, - metadata_len: 2, - body_len: 12, - total_body_len: None, - }); - let parsed = parse_header(&payload); - assert_first_header( - &parsed, - FirstFrameHeader { - message_key: MessageKey(9), - metadata_len: 2, - body_len: 12, - total_body_len: None, - is_last: false, +fn parse_frame_headers() { + let cases = vec![ + HeaderCase { + name: "first frame without total", + build_payload: Box::new(|| { + build_first_header_payload(FirstHeaderSpec { + flags: 0b0, + message_key: 9, + metadata_len: 2, + body_len: 12, + total_body_len: None, + }) + }), + expected_header: FrameHeader::First(FirstFrameHeader { + message_key: MessageKey(9), + metadata_len: 2, + body_len: 12, + total_body_len: None, + is_last: false, + }), }, - payload.len(), - ); -} - -#[test] -fn parse_first_frame_header_with_total_and_last() { - let payload = build_first_header_payload(FirstHeaderSpec { - flags: 0b11, - message_key: 42, - metadata_len: 0, - body_len: 8, - total_body_len: Some(64), - }); - let parsed = parse_header(&payload); - assert_first_header( - &parsed, - FirstFrameHeader { - message_key: MessageKey(42), - metadata_len: 0, - body_len: 8, - total_body_len: Some(64), - is_last: true, + HeaderCase { + name: "first frame with total and last", + build_payload: Box::new(|| { + build_first_header_payload(FirstHeaderSpec { + flags: 0b11, + message_key: 42, + metadata_len: 0, + body_len: 8, + total_body_len: Some(64), + }) + }), + expected_header: FrameHeader::First(FirstFrameHeader { + message_key: MessageKey(42), + metadata_len: 0, + body_len: 8, + total_body_len: Some(64), + is_last: true, + }), }, - payload.len(), - ); -} - -#[test] -fn parse_continuation_header_with_sequence() { - let payload = build_continuation_header_payload(ContinuationHeaderSpec { - flags: 0b10, - message_key: 7, - body_len: 16, - sequence: Some(3), - }); - let parsed = parse_header(&payload); - assert_continuation_header( - &parsed, - ContinuationFrameHeader { - message_key: MessageKey(7), - sequence: Some(FrameSequence(3)), - body_len: 16, - is_last: false, + HeaderCase { + name: "continuation frame with sequence", + build_payload: Box::new(|| { + build_continuation_header_payload(ContinuationHeaderSpec { + flags: 0b10, + message_key: 7, + body_len: 16, + sequence: Some(3), + }) + }), + expected_header: FrameHeader::Continuation(ContinuationFrameHeader { + message_key: MessageKey(7), + sequence: Some(FrameSequence(3)), + body_len: 16, + is_last: false, + }), }, - payload.len(), - ); -} - -#[test] -fn parse_continuation_header_without_sequence() { - let payload = build_continuation_header_payload(ContinuationHeaderSpec { - flags: 0b1, - message_key: 11, - body_len: 5, - sequence: None, - }); - let parsed = parse_header(&payload); - assert_continuation_header( - &parsed, - ContinuationFrameHeader { - message_key: MessageKey(11), - sequence: None, - body_len: 5, - is_last: true, + HeaderCase { + name: "continuation frame without sequence", + build_payload: Box::new(|| { + build_continuation_header_payload(ContinuationHeaderSpec { + flags: 0b1, + message_key: 11, + body_len: 5, + sequence: None, + }) + }), + expected_header: FrameHeader::Continuation(ContinuationFrameHeader { + message_key: MessageKey(11), + sequence: None, + body_len: 5, + is_last: true, + }), }, - payload.len(), - ); + ]; + + for case in cases { + let payload = (case.build_payload)(); + let parsed = parse_header(&payload); + assert_eq!(parsed.header(), &case.expected_header, "case: {}", case.name); + assert_eq!(parsed.header_len(), payload.len(), "case: {}", case.name); + } } #[test] @@ -258,20 +255,8 @@ fn parse_header(payload: &[u8]) -> ParsedFrameHeader { .expect("header parse") } -fn assert_first_header( - parsed: &ParsedFrameHeader, - expected: FirstFrameHeader, - expected_len: usize, -) { - assert_eq!(parsed.header(), &FrameHeader::First(expected)); - assert_eq!(parsed.header_len(), expected_len); -} - -fn assert_continuation_header( - parsed: &ParsedFrameHeader, - expected: ContinuationFrameHeader, - expected_len: usize, -) { - assert_eq!(parsed.header(), &FrameHeader::Continuation(expected)); - assert_eq!(parsed.header_len(), expected_len); +struct HeaderCase { + name: &'static str, + build_payload: Box Vec>, + expected_header: FrameHeader, } diff --git a/tests/steps/message_assembler_steps.rs b/tests/steps/message_assembler_steps.rs index 41130a2b..bbfe5a33 100644 --- a/tests/steps/message_assembler_steps.rs +++ b/tests/steps/message_assembler_steps.rs @@ -4,6 +4,12 @@ use cucumber::{given, then, when}; use crate::world::{ContinuationHeaderSpec, FirstHeaderSpec, MessageAssemblerWorld}; +const DEFAULT_METADATA_LEN: usize = 0; +const FLAG_NONE: bool = false; +const FLAG_LAST: bool = true; +const NO_SEQUENCE: Option = None; +const NO_TOTAL_LEN: Option = None; + #[given(expr = "a first frame header with key {int} metadata length {int} body length {int}")] fn given_first_header( world: &mut MessageAssemblerWorld, @@ -15,8 +21,8 @@ fn given_first_header( key, metadata_len, body_len, - total_len: None, - is_last: false, + total_len: NO_TOTAL_LEN, + is_last: FLAG_NONE, }) } @@ -29,10 +35,10 @@ fn given_first_header_with_total( ) -> crate::world::TestResult { world.set_first_header(FirstHeaderSpec { key, - metadata_len: 0, + metadata_len: DEFAULT_METADATA_LEN, body_len, total_len: Some(total_len), - is_last: true, + is_last: FLAG_LAST, }) } @@ -47,7 +53,7 @@ fn given_continuation_header_with_sequence( key, body_len, sequence: Some(sequence), - is_last: false, + is_last: FLAG_NONE, }) } @@ -60,8 +66,8 @@ fn given_continuation_header( world.set_continuation_header(ContinuationHeaderSpec { key, body_len, - sequence: None, - is_last: true, + sequence: NO_SEQUENCE, + is_last: FLAG_LAST, }) } diff --git a/tests/worlds/message_assembler.rs b/tests/worlds/message_assembler.rs index 0cf72f89..87e88646 100644 --- a/tests/worlds/message_assembler.rs +++ b/tests/worlds/message_assembler.rs @@ -1,7 +1,7 @@ //! Test world for message assembler header parsing. #![cfg(not(loom))] -use std::io; +use std::{fmt::Display, io}; use bytes::{Buf, BufMut, BytesMut}; use cucumber::World; @@ -54,6 +54,19 @@ pub struct MessageAssemblerWorld { } impl MessageAssemblerWorld { + fn assert_common_field(&self, field: &str, expected: T, extractor: F) -> TestResult + where + T: PartialEq + Display, + F: FnOnce(&FrameHeader) -> T, + { + let parsed = self.parsed.as_ref().ok_or("no parsed header")?; + let actual = extractor(parsed.header()); + if actual != expected { + return Err(format!("expected {field} {expected}, got {actual}").into()); + } + Ok(()) + } + /// Store an encoded first-frame header in the world payload. /// /// # Errors @@ -157,15 +170,10 @@ impl MessageAssemblerWorld { /// /// Returns an error if no header was parsed or the key does not match. pub fn assert_message_key(&self, expected: u64) -> TestResult { - let parsed = self.parsed.as_ref().ok_or("no parsed header")?; - let key = match parsed.header() { - FrameHeader::First(header) => header.message_key, - FrameHeader::Continuation(header) => header.message_key, - }; - if key != MessageKey(expected) { - return Err(format!("expected key {expected}, got {key}").into()); - } - Ok(()) + self.assert_common_field("key", expected, |header| match header { + FrameHeader::First(header) => u64::from(header.message_key), + FrameHeader::Continuation(header) => u64::from(header.message_key), + }) } /// Assert that the parsed header contains the expected metadata length. @@ -194,15 +202,10 @@ impl MessageAssemblerWorld { /// /// Returns an error if no header was parsed or the body length differs. pub fn assert_body_len(&self, expected: usize) -> TestResult { - let parsed = self.parsed.as_ref().ok_or("no parsed header")?; - let body_len = match parsed.header() { + self.assert_common_field("body length", expected, |header| match header { FrameHeader::First(header) => header.body_len, FrameHeader::Continuation(header) => header.body_len, - }; - if body_len != expected { - return Err(format!("expected body length {expected}, got {body_len}").into()); - } - Ok(()) + }) } /// Assert that the parsed header contains the expected total body length. @@ -252,15 +255,10 @@ impl MessageAssemblerWorld { /// /// Returns an error if no header was parsed or the flag differs. pub fn assert_is_last(&self, expected: bool) -> TestResult { - let parsed = self.parsed.as_ref().ok_or("no parsed header")?; - let is_last = match parsed.header() { + self.assert_common_field("is_last", expected, |header| match header { FrameHeader::First(header) => header.is_last, FrameHeader::Continuation(header) => header.is_last, - }; - if is_last != expected { - return Err(format!("expected is_last {expected}, got {is_last}").into()); - } - Ok(()) + }) } /// Assert that the parse failed with `InvalidData`. From 63c4c023d28583a9e02773e9cf1cab06d92356b9 Mon Sep 17 00:00:00 2001 From: Leynos Date: Mon, 5 Jan 2026 15:13:07 +0000 Subject: [PATCH 05/14] refactor(tests): restructure message assembler header tests for clarity - Extracted test cases to a separate build_test_cases function in src/message_assembler/tests.rs for better organization. - Added assert_first_field and assert_continuation_field helper methods to MessageAssemblerWorld to reduce code duplication when asserting header fields. - Updated existing assertion methods to use the new helpers, improving readability. - Introduced DebugDisplay wrapper for cleaner debug output in test assertions. - Minor style improvements such as using fmt::Display instead of fmt::Display imported as Display. These changes improve test maintainability and clarity without altering test logic. Co-authored-by: terragon-labs[bot] --- src/message_assembler/tests.rs | 152 +++++++++++++++--------------- tests/worlds/message_assembler.rs | 95 +++++++++++-------- 2 files changed, 135 insertions(+), 112 deletions(-) diff --git a/src/message_assembler/tests.rs b/src/message_assembler/tests.rs index fbfdb0a4..8f91a4e6 100644 --- a/src/message_assembler/tests.rs +++ b/src/message_assembler/tests.rs @@ -73,80 +73,7 @@ impl MessageAssembler for TestAssembler { #[test] fn parse_frame_headers() { - let cases = vec![ - HeaderCase { - name: "first frame without total", - build_payload: Box::new(|| { - build_first_header_payload(FirstHeaderSpec { - flags: 0b0, - message_key: 9, - metadata_len: 2, - body_len: 12, - total_body_len: None, - }) - }), - expected_header: FrameHeader::First(FirstFrameHeader { - message_key: MessageKey(9), - metadata_len: 2, - body_len: 12, - total_body_len: None, - is_last: false, - }), - }, - HeaderCase { - name: "first frame with total and last", - build_payload: Box::new(|| { - build_first_header_payload(FirstHeaderSpec { - flags: 0b11, - message_key: 42, - metadata_len: 0, - body_len: 8, - total_body_len: Some(64), - }) - }), - expected_header: FrameHeader::First(FirstFrameHeader { - message_key: MessageKey(42), - metadata_len: 0, - body_len: 8, - total_body_len: Some(64), - is_last: true, - }), - }, - HeaderCase { - name: "continuation frame with sequence", - build_payload: Box::new(|| { - build_continuation_header_payload(ContinuationHeaderSpec { - flags: 0b10, - message_key: 7, - body_len: 16, - sequence: Some(3), - }) - }), - expected_header: FrameHeader::Continuation(ContinuationFrameHeader { - message_key: MessageKey(7), - sequence: Some(FrameSequence(3)), - body_len: 16, - is_last: false, - }), - }, - HeaderCase { - name: "continuation frame without sequence", - build_payload: Box::new(|| { - build_continuation_header_payload(ContinuationHeaderSpec { - flags: 0b1, - message_key: 11, - body_len: 5, - sequence: None, - }) - }), - expected_header: FrameHeader::Continuation(ContinuationFrameHeader { - message_key: MessageKey(11), - sequence: None, - body_len: 5, - is_last: true, - }), - }, - ]; + let cases = build_test_cases(); for case in cases { let payload = (case.build_payload)(); @@ -255,6 +182,83 @@ fn parse_header(payload: &[u8]) -> ParsedFrameHeader { .expect("header parse") } +fn build_test_cases() -> Vec { + vec![ + HeaderCase { + name: "first frame without total", + build_payload: Box::new(|| { + build_first_header_payload(FirstHeaderSpec { + flags: 0b0, + message_key: 9, + metadata_len: 2, + body_len: 12, + total_body_len: None, + }) + }), + expected_header: FrameHeader::First(FirstFrameHeader { + message_key: MessageKey(9), + metadata_len: 2, + body_len: 12, + total_body_len: None, + is_last: false, + }), + }, + HeaderCase { + name: "first frame with total and last", + build_payload: Box::new(|| { + build_first_header_payload(FirstHeaderSpec { + flags: 0b11, + message_key: 42, + metadata_len: 0, + body_len: 8, + total_body_len: Some(64), + }) + }), + expected_header: FrameHeader::First(FirstFrameHeader { + message_key: MessageKey(42), + metadata_len: 0, + body_len: 8, + total_body_len: Some(64), + is_last: true, + }), + }, + HeaderCase { + name: "continuation frame with sequence", + build_payload: Box::new(|| { + build_continuation_header_payload(ContinuationHeaderSpec { + flags: 0b10, + message_key: 7, + body_len: 16, + sequence: Some(3), + }) + }), + expected_header: FrameHeader::Continuation(ContinuationFrameHeader { + message_key: MessageKey(7), + sequence: Some(FrameSequence(3)), + body_len: 16, + is_last: false, + }), + }, + HeaderCase { + name: "continuation frame without sequence", + build_payload: Box::new(|| { + build_continuation_header_payload(ContinuationHeaderSpec { + flags: 0b1, + message_key: 11, + body_len: 5, + sequence: None, + }) + }), + expected_header: FrameHeader::Continuation(ContinuationFrameHeader { + message_key: MessageKey(11), + sequence: None, + body_len: 5, + is_last: true, + }), + }, + ] +} + struct HeaderCase { name: &'static str, build_payload: Box Vec>, diff --git a/tests/worlds/message_assembler.rs b/tests/worlds/message_assembler.rs index 87e88646..336f3eb4 100644 --- a/tests/worlds/message_assembler.rs +++ b/tests/worlds/message_assembler.rs @@ -1,7 +1,7 @@ //! Test world for message assembler header parsing. #![cfg(not(loom))] -use std::{fmt::Display, io}; +use std::{fmt, io}; use bytes::{Buf, BufMut, BytesMut}; use cucumber::World; @@ -56,7 +56,7 @@ pub struct MessageAssemblerWorld { impl MessageAssemblerWorld { fn assert_common_field(&self, field: &str, expected: T, extractor: F) -> TestResult where - T: PartialEq + Display, + T: PartialEq + fmt::Display, F: FnOnce(&FrameHeader) -> T, { let parsed = self.parsed.as_ref().ok_or("no parsed header")?; @@ -67,6 +67,43 @@ impl MessageAssemblerWorld { Ok(()) } + fn assert_first_field(&self, field_name: &str, expected: T, extractor: F) -> TestResult + where + T: PartialEq + fmt::Display, + F: FnOnce(&FirstFrameHeader) -> T, + { + let parsed = self.parsed.as_ref().ok_or("no parsed header")?; + let FrameHeader::First(header) = parsed.header() else { + return Err("expected first header".into()); + }; + let actual = extractor(header); + if actual != expected { + return Err(format!("expected {field_name} {expected}, got {actual}").into()); + } + Ok(()) + } + + fn assert_continuation_field( + &self, + field_name: &str, + expected: T, + extractor: F, + ) -> TestResult + where + T: PartialEq + fmt::Display, + F: FnOnce(&ContinuationFrameHeader) -> T, + { + let parsed = self.parsed.as_ref().ok_or("no parsed header")?; + let FrameHeader::Continuation(header) = parsed.header() else { + return Err("expected continuation header".into()); + }; + let actual = extractor(header); + if actual != expected { + return Err(format!("expected {field_name} {expected}, got {actual}").into()); + } + Ok(()) + } + /// Store an encoded first-frame header in the world payload. /// /// # Errors @@ -182,18 +219,7 @@ impl MessageAssemblerWorld { /// /// Returns an error if no header was parsed or the metadata length differs. pub fn assert_metadata_len(&self, expected: usize) -> TestResult { - let parsed = self.parsed.as_ref().ok_or("no parsed header")?; - let FrameHeader::First(header) = parsed.header() else { - return Err("expected first header".into()); - }; - if header.metadata_len != expected { - return Err(format!( - "expected metadata length {expected}, got {}", - header.metadata_len - ) - .into()); - } - Ok(()) + self.assert_first_field("metadata length", expected, |header| header.metadata_len) } /// Assert that the parsed header contains the expected body length. @@ -214,18 +240,11 @@ impl MessageAssemblerWorld { /// /// Returns an error if no header was parsed or the total length differs. pub fn assert_total_len(&self, expected: Option) -> TestResult { - let parsed = self.parsed.as_ref().ok_or("no parsed header")?; - let FrameHeader::First(header) = parsed.header() else { - return Err("expected first header".into()); - }; - if header.total_body_len != expected { - return Err(format!( - "expected total length {:?}, got {:?}", - expected, header.total_body_len - ) - .into()); - } - Ok(()) + self.assert_first_field( + "total length", + DebugDisplay(expected), + |header| DebugDisplay(header.total_body_len), + ) } /// Assert that the parsed header contains the expected sequence. @@ -234,19 +253,12 @@ impl MessageAssemblerWorld { /// /// Returns an error if no header was parsed or the sequence differs. pub fn assert_sequence(&self, expected: Option) -> TestResult { - let parsed = self.parsed.as_ref().ok_or("no parsed header")?; - let FrameHeader::Continuation(header) = parsed.header() else { - return Err("expected continuation header".into()); - }; let expected = expected.map(FrameSequence::from); - if header.sequence != expected { - return Err(format!( - "expected sequence {:?}, got {:?}", - expected, header.sequence - ) - .into()); - } - Ok(()) + self.assert_continuation_field( + "sequence", + DebugDisplay(expected), + |header| DebugDisplay(header.sequence), + ) } /// Assert that the parsed header matches the expected `is_last` flag. @@ -275,6 +287,13 @@ impl MessageAssemblerWorld { } } +#[derive(Clone, Copy, Debug, PartialEq)] +struct DebugDisplay(T); + +impl fmt::Display for DebugDisplay { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{:?}", self.0) } +} + struct TestAssembler; impl MessageAssembler for TestAssembler { From 565c053727b826f74190eea1829c8397303b09b6 Mon Sep 17 00:00:00 2001 From: Leynos Date: Mon, 5 Jan 2026 15:41:46 +0000 Subject: [PATCH 06/14] refactor(tests): refine message assembler tests and assertion methods - Split header test cases into first frame and continuation frame sets for clarity - Update assertion helper methods to take expected values by reference with Copy trait - Improve formatting and matching of test assertions for parsed headers - Enhance test readability and maintainability by clearer separation of test concerns Co-authored-by: terragon-labs[bot] --- src/message_assembler/tests.rs | 18 ++++++++++--- tests/worlds/message_assembler.rs | 44 +++++++++++++++---------------- 2 files changed, 36 insertions(+), 26 deletions(-) diff --git a/src/message_assembler/tests.rs b/src/message_assembler/tests.rs index 8f91a4e6..4d5120fb 100644 --- a/src/message_assembler/tests.rs +++ b/src/message_assembler/tests.rs @@ -73,12 +73,19 @@ impl MessageAssembler for TestAssembler { #[test] fn parse_frame_headers() { - let cases = build_test_cases(); + let mut cases = Vec::new(); + cases.extend(build_first_frame_test_cases()); + cases.extend(build_continuation_frame_test_cases()); for case in cases { let payload = (case.build_payload)(); let parsed = parse_header(&payload); - assert_eq!(parsed.header(), &case.expected_header, "case: {}", case.name); + assert_eq!( + parsed.header(), + &case.expected_header, + "case: {}", + case.name + ); assert_eq!(parsed.header_len(), payload.len(), "case: {}", case.name); } } @@ -182,7 +189,7 @@ fn parse_header(payload: &[u8]) -> ParsedFrameHeader { .expect("header parse") } -fn build_test_cases() -> Vec { +fn build_first_frame_test_cases() -> Vec { vec![ HeaderCase { name: "first frame without total", @@ -222,6 +229,11 @@ fn build_test_cases() -> Vec { is_last: true, }), }, + ] +} + +fn build_continuation_frame_test_cases() -> Vec { + vec![ HeaderCase { name: "continuation frame with sequence", build_payload: Box::new(|| { diff --git a/tests/worlds/message_assembler.rs b/tests/worlds/message_assembler.rs index 336f3eb4..09ca2039 100644 --- a/tests/worlds/message_assembler.rs +++ b/tests/worlds/message_assembler.rs @@ -54,22 +54,22 @@ pub struct MessageAssemblerWorld { } impl MessageAssemblerWorld { - fn assert_common_field(&self, field: &str, expected: T, extractor: F) -> TestResult + fn assert_common_field(&self, field: &str, expected: &T, extractor: F) -> TestResult where - T: PartialEq + fmt::Display, + T: PartialEq + fmt::Display + Copy, F: FnOnce(&FrameHeader) -> T, { let parsed = self.parsed.as_ref().ok_or("no parsed header")?; let actual = extractor(parsed.header()); - if actual != expected { + if actual != *expected { return Err(format!("expected {field} {expected}, got {actual}").into()); } Ok(()) } - fn assert_first_field(&self, field_name: &str, expected: T, extractor: F) -> TestResult + fn assert_first_field(&self, field_name: &str, expected: &T, extractor: F) -> TestResult where - T: PartialEq + fmt::Display, + T: PartialEq + fmt::Display + Copy, F: FnOnce(&FirstFrameHeader) -> T, { let parsed = self.parsed.as_ref().ok_or("no parsed header")?; @@ -77,7 +77,7 @@ impl MessageAssemblerWorld { return Err("expected first header".into()); }; let actual = extractor(header); - if actual != expected { + if actual != *expected { return Err(format!("expected {field_name} {expected}, got {actual}").into()); } Ok(()) @@ -86,11 +86,11 @@ impl MessageAssemblerWorld { fn assert_continuation_field( &self, field_name: &str, - expected: T, + expected: &T, extractor: F, ) -> TestResult where - T: PartialEq + fmt::Display, + T: PartialEq + fmt::Display + Copy, F: FnOnce(&ContinuationFrameHeader) -> T, { let parsed = self.parsed.as_ref().ok_or("no parsed header")?; @@ -98,7 +98,7 @@ impl MessageAssemblerWorld { return Err("expected continuation header".into()); }; let actual = extractor(header); - if actual != expected { + if actual != *expected { return Err(format!("expected {field_name} {expected}, got {actual}").into()); } Ok(()) @@ -207,7 +207,7 @@ impl MessageAssemblerWorld { /// /// Returns an error if no header was parsed or the key does not match. pub fn assert_message_key(&self, expected: u64) -> TestResult { - self.assert_common_field("key", expected, |header| match header { + self.assert_common_field("key", &expected, |header| match header { FrameHeader::First(header) => u64::from(header.message_key), FrameHeader::Continuation(header) => u64::from(header.message_key), }) @@ -219,7 +219,7 @@ impl MessageAssemblerWorld { /// /// Returns an error if no header was parsed or the metadata length differs. pub fn assert_metadata_len(&self, expected: usize) -> TestResult { - self.assert_first_field("metadata length", expected, |header| header.metadata_len) + self.assert_first_field("metadata length", &expected, |header| header.metadata_len) } /// Assert that the parsed header contains the expected body length. @@ -228,7 +228,7 @@ impl MessageAssemblerWorld { /// /// Returns an error if no header was parsed or the body length differs. pub fn assert_body_len(&self, expected: usize) -> TestResult { - self.assert_common_field("body length", expected, |header| match header { + self.assert_common_field("body length", &expected, |header| match header { FrameHeader::First(header) => header.body_len, FrameHeader::Continuation(header) => header.body_len, }) @@ -240,11 +240,10 @@ impl MessageAssemblerWorld { /// /// Returns an error if no header was parsed or the total length differs. pub fn assert_total_len(&self, expected: Option) -> TestResult { - self.assert_first_field( - "total length", - DebugDisplay(expected), - |header| DebugDisplay(header.total_body_len), - ) + let expected = DebugDisplay(expected); + self.assert_first_field("total length", &expected, |header| { + DebugDisplay(header.total_body_len) + }) } /// Assert that the parsed header contains the expected sequence. @@ -254,11 +253,10 @@ impl MessageAssemblerWorld { /// Returns an error if no header was parsed or the sequence differs. pub fn assert_sequence(&self, expected: Option) -> TestResult { let expected = expected.map(FrameSequence::from); - self.assert_continuation_field( - "sequence", - DebugDisplay(expected), - |header| DebugDisplay(header.sequence), - ) + let expected = DebugDisplay(expected); + self.assert_continuation_field("sequence", &expected, |header| { + DebugDisplay(header.sequence) + }) } /// Assert that the parsed header matches the expected `is_last` flag. @@ -267,7 +265,7 @@ impl MessageAssemblerWorld { /// /// Returns an error if no header was parsed or the flag differs. pub fn assert_is_last(&self, expected: bool) -> TestResult { - self.assert_common_field("is_last", expected, |header| match header { + self.assert_common_field("is_last", &expected, |header| match header { FrameHeader::First(header) => header.is_last, FrameHeader::Continuation(header) => header.is_last, }) From 68221e696eeb24bf0f3baf699c6702ab3c0e6818 Mon Sep 17 00:00:00 2001 From: Leynos Date: Mon, 5 Jan 2026 19:21:21 +0000 Subject: [PATCH 07/14] refactor(tests): introduce helper functions to reduce duplication in step defs Refactored message_assembler_steps.rs test code by adding helper functions to build first and continuation header specs. Updated step definitions to use these helpers, decreasing code duplication and improving readability. Co-authored-by: terragon-labs[bot] --- tests/steps/message_assembler_steps.rs | 76 +++++++++++++++++--------- 1 file changed, 51 insertions(+), 25 deletions(-) diff --git a/tests/steps/message_assembler_steps.rs b/tests/steps/message_assembler_steps.rs index bbfe5a33..d6e3982b 100644 --- a/tests/steps/message_assembler_steps.rs +++ b/tests/steps/message_assembler_steps.rs @@ -10,20 +10,62 @@ const FLAG_LAST: bool = true; const NO_SEQUENCE: Option = None; const NO_TOTAL_LEN: Option = None; -#[given(expr = "a first frame header with key {int} metadata length {int} body length {int}")] -fn given_first_header( - world: &mut MessageAssemblerWorld, +// Helper builders to reduce duplication in step definitions +fn first_header_without_total( key: u64, metadata_len: usize, body_len: usize, -) -> crate::world::TestResult { - world.set_first_header(FirstHeaderSpec { +) -> FirstHeaderSpec { + FirstHeaderSpec { key, metadata_len, body_len, total_len: NO_TOTAL_LEN, is_last: FLAG_NONE, - }) + } +} + +fn first_header_with_total(key: u64, body_len: usize, total_len: usize) -> FirstHeaderSpec { + FirstHeaderSpec { + key, + metadata_len: DEFAULT_METADATA_LEN, + body_len, + total_len: Some(total_len), + is_last: FLAG_LAST, + } +} + +fn continuation_header_with_sequence( + key: u64, + body_len: usize, + sequence: u32, +) -> ContinuationHeaderSpec { + ContinuationHeaderSpec { + key, + body_len, + sequence: Some(sequence), + is_last: FLAG_NONE, + } +} + +fn continuation_header_without_sequence(key: u64, body_len: usize) -> ContinuationHeaderSpec { + ContinuationHeaderSpec { + key, + body_len, + sequence: NO_SEQUENCE, + is_last: FLAG_LAST, + } +} + +// Cucumber step definitions +#[given(expr = "a first frame header with key {int} metadata length {int} body length {int}")] +fn given_first_header( + world: &mut MessageAssemblerWorld, + key: u64, + metadata_len: usize, + body_len: usize, +) -> crate::world::TestResult { + world.set_first_header(first_header_without_total(key, metadata_len, body_len)) } #[given(expr = "a first frame header with key {int} body length {int} total {int}")] @@ -33,13 +75,7 @@ fn given_first_header_with_total( body_len: usize, total_len: usize, ) -> crate::world::TestResult { - world.set_first_header(FirstHeaderSpec { - key, - metadata_len: DEFAULT_METADATA_LEN, - body_len, - total_len: Some(total_len), - is_last: FLAG_LAST, - }) + world.set_first_header(first_header_with_total(key, body_len, total_len)) } #[given(expr = "a continuation header with key {int} body length {int} sequence {int}")] @@ -49,12 +85,7 @@ fn given_continuation_header_with_sequence( body_len: usize, sequence: u32, ) -> crate::world::TestResult { - world.set_continuation_header(ContinuationHeaderSpec { - key, - body_len, - sequence: Some(sequence), - is_last: FLAG_NONE, - }) + world.set_continuation_header(continuation_header_with_sequence(key, body_len, sequence)) } #[given(expr = "a continuation header with key {int} body length {int}")] @@ -63,12 +94,7 @@ fn given_continuation_header( key: u64, body_len: usize, ) -> crate::world::TestResult { - world.set_continuation_header(ContinuationHeaderSpec { - key, - body_len, - sequence: NO_SEQUENCE, - is_last: FLAG_LAST, - }) + world.set_continuation_header(continuation_header_without_sequence(key, body_len)) } #[given("an invalid message header")] From 74c50d738844c6c6920708e677272a49bf8b4167 Mon Sep 17 00:00:00 2001 From: Leynos Date: Mon, 5 Jan 2026 20:31:59 +0000 Subject: [PATCH 08/14] Apply formatting --- tests/steps/message_assembler_steps.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/tests/steps/message_assembler_steps.rs b/tests/steps/message_assembler_steps.rs index d6e3982b..fc64792f 100644 --- a/tests/steps/message_assembler_steps.rs +++ b/tests/steps/message_assembler_steps.rs @@ -11,11 +11,7 @@ const NO_SEQUENCE: Option = None; const NO_TOTAL_LEN: Option = None; // Helper builders to reduce duplication in step definitions -fn first_header_without_total( - key: u64, - metadata_len: usize, - body_len: usize, -) -> FirstHeaderSpec { +fn first_header_without_total(key: u64, metadata_len: usize, body_len: usize) -> FirstHeaderSpec { FirstHeaderSpec { key, metadata_len, From e1d2aa2293c4760ab1b724e0b7c26fa7ebb56e9f Mon Sep 17 00:00:00 2001 From: Leynos Date: Tue, 6 Jan 2026 14:59:46 +0000 Subject: [PATCH 09/14] Refine message assembler tests - Share message assembler parsing helpers between unit and Cucumber tests. - Expand header parsing tests with rstest cases and body payload checks. - Add builder scenario coverage and reduce assertion duplication. - Return a borrowed message assembler arc from the app builder. --- .../8-2-1-message-assembler-hook-trait.md | 2 +- docs/users-guide.md | 3 + src/app/builder.rs | 4 +- src/lib.rs | 2 + src/message_assembler/tests.rs | 303 +++++++----------- tests/common/message_assembler.rs | 109 +++++++ tests/features/message_assembler.feature | 12 + tests/steps/message_assembler_steps.rs | 29 +- tests/worlds/message_assembler.rs | 147 ++++----- 9 files changed, 316 insertions(+), 295 deletions(-) create mode 100644 tests/common/message_assembler.rs diff --git a/docs/execplans/8-2-1-message-assembler-hook-trait.md b/docs/execplans/8-2-1-message-assembler-hook-trait.md index a17fd0d4..73dd3279 100644 --- a/docs/execplans/8-2-1-message-assembler-hook-trait.md +++ b/docs/execplans/8-2-1-message-assembler-hook-trait.md @@ -85,7 +85,7 @@ Streaming request types (`RequestParts`, `RequestBodyStream`, and the `StreamingBody` extractor) live in `src/request/mod.rs` and `src/extractor.rs`, but they are not yet wired into the connection loop. The `MessageAssembler` hook introduced here is the protocol-facing interface for that forthcoming -integration (see ADR 0002). +integration (see Architecture Decision Record (ADR) 0002). Key references: diff --git a/docs/users-guide.md b/docs/users-guide.md index 9dc73ab9..4f19247c 100644 --- a/docs/users-guide.md +++ b/docs/users-guide.md @@ -332,6 +332,9 @@ Note: the hook is stored on the application today but is wired into the inbound connection path in roadmap item 8.2.5. Until that integration lands, protocol crates can use the trait for shared parsing logic and tests. +`WireframeApp::message_assembler` returns the configured hook as an +`Option<&Arc>` if you need to access it directly. + ### Streaming request body consumption Handlers can opt into streaming request bodies using the `StreamingBody` diff --git a/src/app/builder.rs b/src/app/builder.rs index d4cfc7f8..0696f6d9 100644 --- a/src/app/builder.rs +++ b/src/app/builder.rs @@ -365,8 +365,8 @@ where /// assert!(app.message_assembler().is_some()); /// ``` #[must_use] - pub fn message_assembler(&self) -> Option> { - self.message_assembler.clone() + pub fn message_assembler(&self) -> Option<&Arc> { + self.message_assembler.as_ref() } /// Configure a Dead Letter Queue for dropped push frames. diff --git a/src/lib.rs b/src/lib.rs index cf5ff282..7a02a4c8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,8 @@ //! This crate provides building blocks for asynchronous binary protocol //! servers, including routing, middleware, and connection utilities. +extern crate self as wireframe; + pub mod app; pub mod byte_order; pub mod codec; diff --git a/src/message_assembler/tests.rs b/src/message_assembler/tests.rs index 4d5120fb..b56bb652 100644 --- a/src/message_assembler/tests.rs +++ b/src/message_assembler/tests.rs @@ -2,7 +2,12 @@ use std::io; -use bytes::{Buf, BufMut, BytesMut}; +use bytes::{BufMut, BytesMut}; +use rstest::rstest; + +#[path = "../../tests/common/message_assembler.rs"] +mod message_assembler_helpers; +use message_assembler_helpers::TestAssembler; use super::{ ContinuationFrameHeader, @@ -14,80 +19,102 @@ use super::{ ParsedFrameHeader, }; -struct TestAssembler; - -impl MessageAssembler for TestAssembler { - fn parse_frame_header(&self, payload: &[u8]) -> Result { - let mut buf = payload; - let initial = buf.remaining(); - - let kind = take_u8(&mut buf)?; - let flags = take_u8(&mut buf)?; - let message_key = MessageKey::from(take_u64(&mut buf)?); - - let header = match kind { - 0x01 => { - let metadata_len = usize::from(take_u16(&mut buf)?); - let body_len = usize::try_from(take_u32(&mut buf)?) - .map_err(|_| invalid_data("body length too large"))?; - let total_body_len = if flags & 0b10 == 0b10 { - Some( - usize::try_from(take_u32(&mut buf)?) - .map_err(|_| invalid_data("total length too large"))?, - ) - } else { - None - }; - - FrameHeader::First(FirstFrameHeader { - message_key, - metadata_len, - body_len, - total_body_len, - is_last: flags & 0b1 == 0b1, - }) - } - 0x02 => { - let body_len = usize::try_from(take_u32(&mut buf)?) - .map_err(|_| invalid_data("body length too large"))?; - let sequence = if flags & 0b10 == 0b10 { - Some(FrameSequence::from(take_u32(&mut buf)?)) - } else { - None - }; - - FrameHeader::Continuation(ContinuationFrameHeader { - message_key, - sequence, - body_len, - is_last: flags & 0b1 == 0b1, - }) - } - _ => return Err(invalid_data("unknown header kind")), - }; - - let header_len = initial - buf.remaining(); - Ok(ParsedFrameHeader::new(header, header_len)) - } -} - -#[test] -fn parse_frame_headers() { - let mut cases = Vec::new(); - cases.extend(build_first_frame_test_cases()); - cases.extend(build_continuation_frame_test_cases()); - - for case in cases { - let payload = (case.build_payload)(); - let parsed = parse_header(&payload); - assert_eq!( - parsed.header(), - &case.expected_header, - "case: {}", - case.name - ); - assert_eq!(parsed.header_len(), payload.len(), "case: {}", case.name); - } +#[rstest] +#[case::first_frame_without_total( + "first frame without total", + build_first_header_payload(FirstHeaderSpec { + flags: 0b0, + message_key: 9, + metadata_len: 2, + body_len: 12, + total_body_len: None, + }), + FrameHeader::First(FirstFrameHeader { + message_key: MessageKey(9), + metadata_len: 2, + body_len: 12, + total_body_len: None, + is_last: false, + }), +)] +#[case::first_frame_with_total_and_last( + "first frame with total and last", + build_first_header_payload(FirstHeaderSpec { + flags: 0b11, + message_key: 42, + metadata_len: 0, + body_len: 8, + total_body_len: Some(64), + }), + FrameHeader::First(FirstFrameHeader { + message_key: MessageKey(42), + metadata_len: 0, + body_len: 8, + total_body_len: Some(64), + is_last: true, + }), +)] +#[case::continuation_frame_with_sequence( + "continuation frame with sequence", + build_continuation_header_payload(ContinuationHeaderSpec { + flags: 0b10, + message_key: 7, + body_len: 16, + sequence: Some(3), + }), + FrameHeader::Continuation(ContinuationFrameHeader { + message_key: MessageKey(7), + sequence: Some(FrameSequence(3)), + body_len: 16, + is_last: false, + }), +)] +#[case::continuation_frame_without_sequence( + "continuation frame without sequence", + build_continuation_header_payload(ContinuationHeaderSpec { + flags: 0b1, + message_key: 11, + body_len: 5, + sequence: None, + }), + FrameHeader::Continuation(ContinuationFrameHeader { + message_key: MessageKey(11), + sequence: None, + body_len: 5, + is_last: true, + }), +)] +fn parse_frame_headers( + #[case] case_name: &'static str, + #[case] payload: Vec, + #[case] expected_header: FrameHeader, +) { + let parsed = parse_header(&payload); + assert_eq!(parsed.header(), &expected_header, "case: {case_name}"); + assert_eq!( + parsed.header_len(), + payload.len(), + "case (header-only payload): {case_name}" + ); + + let mut payload_with_body = payload.clone(); + payload_with_body.extend_from_slice(&[0xaa, 0xbb, 0xcc]); + + let parsed_with_body = parse_header(&payload_with_body); + assert_eq!( + parsed_with_body.header(), + &expected_header, + "case (with body bytes): {case_name}" + ); + assert_eq!( + parsed_with_body.header_len(), + payload.len(), + "case (with body bytes, header_len mismatch): {case_name}" + ); + assert!( + parsed_with_body.header_len() < payload_with_body.len(), + "case (with body bytes, header_len not less than payload.len()): {case_name}" + ); } #[test] @@ -102,43 +129,17 @@ fn short_header_errors() { #[test] fn unknown_header_kind_errors() { - let payload = vec![0xff, 0x00, 0x00]; + let mut bytes = BytesMut::new(); + bytes.put_u8(0xff); + bytes.put_u8(0x00); + bytes.put_u64(0); + let payload = bytes.to_vec(); let err = TestAssembler .parse_frame_header(&payload) .expect_err("expected error"); assert_eq!(err.kind(), io::ErrorKind::InvalidData); -} - -fn take_u8(buf: &mut &[u8]) -> Result { - ensure_remaining(buf, 1)?; - Ok(buf.get_u8()) -} - -fn take_u16(buf: &mut &[u8]) -> Result { - ensure_remaining(buf, 2)?; - Ok(buf.get_u16()) -} - -fn take_u32(buf: &mut &[u8]) -> Result { - ensure_remaining(buf, 4)?; - Ok(buf.get_u32()) -} - -fn take_u64(buf: &mut &[u8]) -> Result { - ensure_remaining(buf, 8)?; - Ok(buf.get_u64()) -} - -fn ensure_remaining(buf: &mut &[u8], needed: usize) -> Result<(), io::Error> { - if buf.remaining() < needed { - return Err(invalid_data("header too short")); - } - Ok(()) -} - -fn invalid_data(message: &'static str) -> io::Error { - io::Error::new(io::ErrorKind::InvalidData, message) + assert_eq!(err.to_string(), "unknown header kind"); } #[derive(Clone, Copy)] @@ -188,91 +189,3 @@ fn parse_header(payload: &[u8]) -> ParsedFrameHeader { .parse_frame_header(payload) .expect("header parse") } - -fn build_first_frame_test_cases() -> Vec { - vec![ - HeaderCase { - name: "first frame without total", - build_payload: Box::new(|| { - build_first_header_payload(FirstHeaderSpec { - flags: 0b0, - message_key: 9, - metadata_len: 2, - body_len: 12, - total_body_len: None, - }) - }), - expected_header: FrameHeader::First(FirstFrameHeader { - message_key: MessageKey(9), - metadata_len: 2, - body_len: 12, - total_body_len: None, - is_last: false, - }), - }, - HeaderCase { - name: "first frame with total and last", - build_payload: Box::new(|| { - build_first_header_payload(FirstHeaderSpec { - flags: 0b11, - message_key: 42, - metadata_len: 0, - body_len: 8, - total_body_len: Some(64), - }) - }), - expected_header: FrameHeader::First(FirstFrameHeader { - message_key: MessageKey(42), - metadata_len: 0, - body_len: 8, - total_body_len: Some(64), - is_last: true, - }), - }, - ] -} - -fn build_continuation_frame_test_cases() -> Vec { - vec![ - HeaderCase { - name: "continuation frame with sequence", - build_payload: Box::new(|| { - build_continuation_header_payload(ContinuationHeaderSpec { - flags: 0b10, - message_key: 7, - body_len: 16, - sequence: Some(3), - }) - }), - expected_header: FrameHeader::Continuation(ContinuationFrameHeader { - message_key: MessageKey(7), - sequence: Some(FrameSequence(3)), - body_len: 16, - is_last: false, - }), - }, - HeaderCase { - name: "continuation frame without sequence", - build_payload: Box::new(|| { - build_continuation_header_payload(ContinuationHeaderSpec { - flags: 0b1, - message_key: 11, - body_len: 5, - sequence: None, - }) - }), - expected_header: FrameHeader::Continuation(ContinuationFrameHeader { - message_key: MessageKey(11), - sequence: None, - body_len: 5, - is_last: true, - }), - }, - ] -} - -struct HeaderCase { - name: &'static str, - build_payload: Box Vec>, - expected_header: FrameHeader, -} diff --git a/tests/common/message_assembler.rs b/tests/common/message_assembler.rs new file mode 100644 index 00000000..57a39f93 --- /dev/null +++ b/tests/common/message_assembler.rs @@ -0,0 +1,109 @@ +//! Shared helpers for parsing message assembler headers in tests. + +use std::io; + +use bytes::Buf; +use wireframe::message_assembler::{ + ContinuationFrameHeader, + FirstFrameHeader, + FrameHeader, + FrameSequence, + MessageAssembler, + MessageKey, + ParsedFrameHeader, +}; + +/// Test-friendly message assembler implementation that shares parsing logic. +#[derive(Clone, Copy, Debug, Default)] +pub struct TestAssembler; + +impl MessageAssembler for TestAssembler { + fn parse_frame_header(&self, payload: &[u8]) -> Result { + parse_frame_header(payload) + } +} + +/// Parse a protocol-specific frame header for tests. +pub fn parse_frame_header(payload: &[u8]) -> Result { + let mut buf = payload; + let initial = buf.remaining(); + + let kind = take_u8(&mut buf)?; + let flags = take_u8(&mut buf)?; + let message_key = MessageKey::from(take_u64(&mut buf)?); + + let header = match kind { + 0x01 => { + let metadata_len = usize::from(take_u16(&mut buf)?); + let body_len = usize::try_from(take_u32(&mut buf)?) + .map_err(|_| invalid_data("body length too large"))?; + let total_body_len = if flags & 0b10 == 0b10 { + Some( + usize::try_from(take_u32(&mut buf)?) + .map_err(|_| invalid_data("total length too large"))?, + ) + } else { + None + }; + + FrameHeader::First(FirstFrameHeader { + message_key, + metadata_len, + body_len, + total_body_len, + is_last: flags & 0b1 == 0b1, + }) + } + 0x02 => { + let body_len = usize::try_from(take_u32(&mut buf)?) + .map_err(|_| invalid_data("body length too large"))?; + let sequence = if flags & 0b10 == 0b10 { + Some(FrameSequence::from(take_u32(&mut buf)?)) + } else { + None + }; + + FrameHeader::Continuation(ContinuationFrameHeader { + message_key, + sequence, + body_len, + is_last: flags & 0b1 == 0b1, + }) + } + _ => return Err(invalid_data("unknown header kind")), + }; + + let header_len = initial - buf.remaining(); + Ok(ParsedFrameHeader::new(header, header_len)) +} + +fn take_u8(buf: &mut &[u8]) -> Result { + ensure_remaining(buf, 1)?; + Ok(buf.get_u8()) +} + +fn take_u16(buf: &mut &[u8]) -> Result { + ensure_remaining(buf, 2)?; + Ok(buf.get_u16()) +} + +fn take_u32(buf: &mut &[u8]) -> Result { + ensure_remaining(buf, 4)?; + Ok(buf.get_u32()) +} + +fn take_u64(buf: &mut &[u8]) -> Result { + ensure_remaining(buf, 8)?; + Ok(buf.get_u64()) +} + +fn ensure_remaining(buf: &mut &[u8], needed: usize) -> Result<(), io::Error> { + if buf.remaining() < needed { + return Err(invalid_data("header too short")); + } + Ok(()) +} + +fn invalid_data(message: &'static str) -> io::Error { + io::Error::new(io::ErrorKind::InvalidData, message) +} diff --git a/tests/features/message_assembler.feature b/tests/features/message_assembler.feature index 9e54043c..c5ff4c3b 100644 --- a/tests/features/message_assembler.feature +++ b/tests/features/message_assembler.feature @@ -3,6 +3,18 @@ Feature: Message assembler header parsing parsing. The hook distinguishes first frames from continuation frames and exposes key metadata needed for assembly. + Scenario: Builder exposes a configured message assembler + Given a wireframe app with a message assembler + And a first frame header with key 9 metadata length 2 body length 12 + When the message assembler parses the header + Then the app exposes a message assembler + And the parsed header is first + And the message key is 9 + And the metadata length is 2 + And the body length is 12 + And the total body length is absent + And the frame is marked last false + Scenario: Parsing a first frame header without total length Given a first frame header with key 9 metadata length 2 body length 12 When the message assembler parses the header diff --git a/tests/steps/message_assembler_steps.rs b/tests/steps/message_assembler_steps.rs index fc64792f..d69e5e62 100644 --- a/tests/steps/message_assembler_steps.rs +++ b/tests/steps/message_assembler_steps.rs @@ -93,6 +93,11 @@ fn given_continuation_header( world.set_continuation_header(continuation_header_without_sequence(key, body_len)) } +#[given("a wireframe app with a message assembler")] +fn given_app_with_message_assembler(world: &mut MessageAssemblerWorld) -> crate::world::TestResult { + world.set_app_with_message_assembler() +} + #[given("an invalid message header")] fn given_invalid_header(world: &mut MessageAssemblerWorld) { world.set_invalid_payload(); } @@ -102,10 +107,12 @@ fn when_parsing(world: &mut MessageAssemblerWorld) -> crate::world::TestResult { } #[then(expr = "the parsed header is {word}")] +#[expect( + clippy::needless_pass_by_value, + reason = "cucumber hands {word} captures to step functions as owned strings" +)] fn then_header_kind(world: &mut MessageAssemblerWorld, kind: String) -> crate::world::TestResult { - let result = world.assert_header_kind(&kind); - drop(kind); - result + world.assert_header_kind(&kind) } #[then(expr = "the message key is {int}")] @@ -147,14 +154,22 @@ fn then_sequence_absent(world: &mut MessageAssemblerWorld) -> crate::world::Test } #[then(expr = "the frame is marked last {word}")] +#[expect( + clippy::needless_pass_by_value, + reason = "cucumber hands {word} captures to step functions as owned strings" +)] fn then_is_last(world: &mut MessageAssemblerWorld, expected: String) -> crate::world::TestResult { - let expected_str = expected; - let expected = expected_str == "true"; - drop(expected_str); - world.assert_is_last(expected) + world.assert_is_last(expected == "true") } #[then("the parse fails with invalid data")] fn then_invalid_data(world: &mut MessageAssemblerWorld) -> crate::world::TestResult { world.assert_invalid_data_error() } + +#[then("the app exposes a message assembler")] +fn then_app_exposes_message_assembler( + world: &mut MessageAssemblerWorld, +) -> crate::world::TestResult { + world.assert_message_assembler_configured() +} diff --git a/tests/worlds/message_assembler.rs b/tests/worlds/message_assembler.rs index 09ca2039..c1456037 100644 --- a/tests/worlds/message_assembler.rs +++ b/tests/worlds/message_assembler.rs @@ -3,7 +3,7 @@ use std::{fmt, io}; -use bytes::{Buf, BufMut, BytesMut}; +use bytes::{BufMut, BytesMut}; use cucumber::World; use wireframe::message_assembler::{ ContinuationFrameHeader, @@ -11,11 +11,13 @@ use wireframe::message_assembler::{ FrameHeader, FrameSequence, MessageAssembler, - MessageKey, ParsedFrameHeader, }; -use super::TestResult; +use super::{TestApp, TestResult}; +#[path = "../common/message_assembler.rs"] +mod message_assembler_helpers; +use message_assembler_helpers::TestAssembler; /// Specification for first-frame header encoding used in tests. #[derive(Debug, Clone, Copy)] @@ -46,11 +48,26 @@ pub struct ContinuationHeaderSpec { } /// World used by Cucumber to test message assembler header parsing. -#[derive(Debug, Default, World)] +#[derive(Default, World)] pub struct MessageAssemblerWorld { payload: Option>, parsed: Option, error: Option, + app: Option, +} + +impl fmt::Debug for MessageAssemblerWorld { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("MessageAssemblerWorld") + .field("payload", &self.payload) + .field("parsed", &self.parsed) + .field("error", &self.error) + .field( + "app", + &self.app.as_ref().map(|_| "wireframe::app::WireframeApp"), + ) + .finish() + } } impl MessageAssemblerWorld { @@ -170,7 +187,15 @@ impl MessageAssemblerWorld { /// Returns an error if no payload has been configured. pub fn parse_header(&mut self) -> TestResult { let payload = self.payload.as_deref().ok_or("payload not set")?; - match TestAssembler.parse_frame_header(payload) { + let fallback = TestAssembler; + let assembler: &dyn MessageAssembler = match self.app.as_ref() { + Some(app) => app + .message_assembler() + .ok_or("message assembler not set")? + .as_ref(), + None => &fallback, + }; + match assembler.parse_frame_header(payload) { Ok(parsed) => { self.parsed = Some(parsed); self.error = None; @@ -283,6 +308,33 @@ impl MessageAssemblerWorld { } Ok(()) } + + /// Store a wireframe app configured with a test message assembler. + /// + /// # Errors + /// + /// Returns an error if the app builder fails. + pub fn set_app_with_message_assembler(&mut self) -> TestResult { + let app = TestApp::new() + .map_err(|err| format!("failed to build app: {err}"))? + .with_message_assembler(TestAssembler); + self.app = Some(app); + Ok(()) + } + + /// Assert that the app exposes a message assembler. + /// + /// # Errors + /// + /// Returns an error if the app or assembler is missing. + pub fn assert_message_assembler_configured(&self) -> TestResult { + let app = self.app.as_ref().ok_or("app not set")?; + if app.message_assembler().is_some() { + Ok(()) + } else { + Err("expected message assembler".into()) + } + } } #[derive(Clone, Copy, Debug, PartialEq)] @@ -291,88 +343,3 @@ struct DebugDisplay(T); impl fmt::Display for DebugDisplay { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{:?}", self.0) } } - -struct TestAssembler; - -impl MessageAssembler for TestAssembler { - fn parse_frame_header(&self, payload: &[u8]) -> Result { - let mut buf = payload; - let initial = buf.remaining(); - let kind = take_u8(&mut buf)?; - let flags = take_u8(&mut buf)?; - let message_key = MessageKey::from(take_u64(&mut buf)?); - - let header = match kind { - 0x01 => { - let metadata_len = usize::from(take_u16(&mut buf)?); - let body_len = usize::try_from(take_u32(&mut buf)?) - .map_err(|_| invalid_data("body length too large"))?; - let total_body_len = if flags & 0b10 == 0b10 { - Some( - usize::try_from(take_u32(&mut buf)?) - .map_err(|_| invalid_data("total length too large"))?, - ) - } else { - None - }; - FrameHeader::First(FirstFrameHeader { - message_key, - metadata_len, - body_len, - total_body_len, - is_last: flags & 0b1 == 0b1, - }) - } - 0x02 => { - let body_len = usize::try_from(take_u32(&mut buf)?) - .map_err(|_| invalid_data("body length too large"))?; - let sequence = if flags & 0b10 == 0b10 { - Some(FrameSequence::from(take_u32(&mut buf)?)) - } else { - None - }; - FrameHeader::Continuation(ContinuationFrameHeader { - message_key, - sequence, - body_len, - is_last: flags & 0b1 == 0b1, - }) - } - _ => return Err(invalid_data("unknown header kind")), - }; - - let header_len = initial - buf.remaining(); - Ok(ParsedFrameHeader::new(header, header_len)) - } -} - -fn take_u8(buf: &mut &[u8]) -> Result { - ensure_remaining(buf, 1)?; - Ok(buf.get_u8()) -} - -fn take_u16(buf: &mut &[u8]) -> Result { - ensure_remaining(buf, 2)?; - Ok(buf.get_u16()) -} - -fn take_u32(buf: &mut &[u8]) -> Result { - ensure_remaining(buf, 4)?; - Ok(buf.get_u32()) -} - -fn take_u64(buf: &mut &[u8]) -> Result { - ensure_remaining(buf, 8)?; - Ok(buf.get_u64()) -} - -fn ensure_remaining(buf: &mut &[u8], needed: usize) -> Result<(), io::Error> { - if buf.remaining() < needed { - return Err(invalid_data("header too short")); - } - Ok(()) -} - -fn invalid_data(message: &'static str) -> io::Error { - io::Error::new(io::ErrorKind::InvalidData, message) -} From 2e8fc88738c6df166778118328041c500e4eb351 Mon Sep 17 00:00:00 2001 From: Leynos Date: Tue, 6 Jan 2026 15:08:00 +0000 Subject: [PATCH 10/14] test(message_assembler): add assertion on error message for short header errors Add a check that the error string equals "header too short" in the short_header_errors test to improve test completeness. Co-authored-by: terragon-labs[bot] --- src/message_assembler/tests.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/message_assembler/tests.rs b/src/message_assembler/tests.rs index b56bb652..1dcb7b3d 100644 --- a/src/message_assembler/tests.rs +++ b/src/message_assembler/tests.rs @@ -125,6 +125,7 @@ fn short_header_errors() { .expect_err("expected error"); assert_eq!(err.kind(), io::ErrorKind::InvalidData); + assert_eq!(err.to_string(), "header too short"); } #[test] From aa272caa0007190aececc2136f818630700327ae Mon Sep 17 00:00:00 2001 From: Leynos Date: Tue, 6 Jan 2026 16:34:19 +0000 Subject: [PATCH 11/14] Update code Co-authored-by: terragon-labs[bot] --- Cargo.lock | 1 + Cargo.toml | 2 ++ src/lib.rs | 2 ++ src/message_assembler/tests.rs | 4 +--- tests/common/message_assembler.rs => src/test_helpers.rs | 6 ++++-- tests/worlds/message_assembler.rs | 4 +--- 6 files changed, 11 insertions(+), 8 deletions(-) rename tests/common/message_assembler.rs => src/test_helpers.rs (96%) diff --git a/Cargo.lock b/Cargo.lock index 91bccf75..9f7d4680 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3009,6 +3009,7 @@ dependencies = [ "tracing", "tracing-subscriber", "tracing-test", + "wireframe", "wireframe_testing", ] diff --git a/Cargo.toml b/Cargo.toml index 130df8b1..0c01ddb7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,6 +49,7 @@ socket2 = "0.6.0" [dev-dependencies] rstest = "0.26.1" +wireframe = { path = ".", features = ["test-helpers"] } wireframe_testing = { path = "./wireframe_testing" } logtest = "2.0.0" proptest = "1.7.0" @@ -85,6 +86,7 @@ advanced-tests = [] examples = [] cucumber-tests = [] test-support = [] +test-helpers = [] [lints.clippy] pedantic = { level = "warn", priority = -1 } diff --git a/src/lib.rs b/src/lib.rs index 7a02a4c8..38e03cb8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -36,6 +36,8 @@ pub mod rewind_stream; #[cfg(not(loom))] pub mod server; pub mod session; +#[cfg(any(test, feature = "test-helpers"))] +pub mod test_helpers; pub use client::{ClientCodecConfig, ClientError, SocketOptions, WireframeClient}; pub use connection::ConnectionActor; diff --git a/src/message_assembler/tests.rs b/src/message_assembler/tests.rs index 1dcb7b3d..7ef49692 100644 --- a/src/message_assembler/tests.rs +++ b/src/message_assembler/tests.rs @@ -5,9 +5,7 @@ use std::io; use bytes::{BufMut, BytesMut}; use rstest::rstest; -#[path = "../../tests/common/message_assembler.rs"] -mod message_assembler_helpers; -use message_assembler_helpers::TestAssembler; +use crate::test_helpers::TestAssembler; use super::{ ContinuationFrameHeader, diff --git a/tests/common/message_assembler.rs b/src/test_helpers.rs similarity index 96% rename from tests/common/message_assembler.rs rename to src/test_helpers.rs index 57a39f93..c2a85ff3 100644 --- a/tests/common/message_assembler.rs +++ b/src/test_helpers.rs @@ -1,9 +1,11 @@ -//! Shared helpers for parsing message assembler headers in tests. +#![cfg(any(test, feature = "test-helpers"))] +//! Test-only helpers for shared test utilities. use std::io; use bytes::Buf; -use wireframe::message_assembler::{ + +use crate::message_assembler::{ ContinuationFrameHeader, FirstFrameHeader, FrameHeader, diff --git a/tests/worlds/message_assembler.rs b/tests/worlds/message_assembler.rs index c1456037..c73ececa 100644 --- a/tests/worlds/message_assembler.rs +++ b/tests/worlds/message_assembler.rs @@ -15,9 +15,7 @@ use wireframe::message_assembler::{ }; use super::{TestApp, TestResult}; -#[path = "../common/message_assembler.rs"] -mod message_assembler_helpers; -use message_assembler_helpers::TestAssembler; +use wireframe::test_helpers::TestAssembler; /// Specification for first-frame header encoding used in tests. #[derive(Debug, Clone, Copy)] From d155fdc23255290adbe74c9904c83ded65f90129 Mon Sep 17 00:00:00 2001 From: Leynos Date: Tue, 6 Jan 2026 16:40:13 +0000 Subject: [PATCH 12/14] refactor(tests): consolidate header field assertions into a generic helper Replaced separate assert_first_field and assert_continuation_field methods with a single generic assert_header_field method that handles both header types. This simplifies the test code and improves maintainability by centralizing header type validation and field extraction logic. No functional changes introduced. Co-authored-by: terragon-labs[bot] --- tests/worlds/message_assembler.rs | 60 ++++++++++++++----------------- 1 file changed, 26 insertions(+), 34 deletions(-) diff --git a/tests/worlds/message_assembler.rs b/tests/worlds/message_assembler.rs index c73ececa..a6c7260e 100644 --- a/tests/worlds/message_assembler.rs +++ b/tests/worlds/message_assembler.rs @@ -6,8 +6,6 @@ use std::{fmt, io}; use bytes::{BufMut, BytesMut}; use cucumber::World; use wireframe::message_assembler::{ - ContinuationFrameHeader, - FirstFrameHeader, FrameHeader, FrameSequence, MessageAssembler, @@ -82,37 +80,17 @@ impl MessageAssemblerWorld { Ok(()) } - fn assert_first_field(&self, field_name: &str, expected: &T, extractor: F) -> TestResult - where - T: PartialEq + fmt::Display + Copy, - F: FnOnce(&FirstFrameHeader) -> T, - { - let parsed = self.parsed.as_ref().ok_or("no parsed header")?; - let FrameHeader::First(header) = parsed.header() else { - return Err("expected first header".into()); - }; - let actual = extractor(header); - if actual != *expected { - return Err(format!("expected {field_name} {expected}, got {actual}").into()); - } - Ok(()) - } - - fn assert_continuation_field( - &self, - field_name: &str, - expected: &T, - extractor: F, - ) -> TestResult + /// Generic helper for asserting header-type-specific fields. + /// + /// The extractor performs both type-checking (via pattern matching) and field + /// extraction, returning an error message if the header type is incorrect. + fn assert_header_field(&self, field_name: &str, expected: &T, extractor: F) -> TestResult where T: PartialEq + fmt::Display + Copy, - F: FnOnce(&ContinuationFrameHeader) -> T, + F: FnOnce(&FrameHeader) -> Result, { let parsed = self.parsed.as_ref().ok_or("no parsed header")?; - let FrameHeader::Continuation(header) = parsed.header() else { - return Err("expected continuation header".into()); - }; - let actual = extractor(header); + let actual = extractor(parsed.header()).map_err(|err| err.to_string())?; if actual != *expected { return Err(format!("expected {field_name} {expected}, got {actual}").into()); } @@ -242,7 +220,13 @@ impl MessageAssemblerWorld { /// /// Returns an error if no header was parsed or the metadata length differs. pub fn assert_metadata_len(&self, expected: usize) -> TestResult { - self.assert_first_field("metadata length", &expected, |header| header.metadata_len) + self.assert_header_field("metadata length", &expected, |header| { + if let FrameHeader::First(header) = header { + Ok(header.metadata_len) + } else { + Err("expected first header") + } + }) } /// Assert that the parsed header contains the expected body length. @@ -264,8 +248,12 @@ impl MessageAssemblerWorld { /// Returns an error if no header was parsed or the total length differs. pub fn assert_total_len(&self, expected: Option) -> TestResult { let expected = DebugDisplay(expected); - self.assert_first_field("total length", &expected, |header| { - DebugDisplay(header.total_body_len) + self.assert_header_field("total length", &expected, |header| { + if let FrameHeader::First(header) = header { + Ok(DebugDisplay(header.total_body_len)) + } else { + Err("expected first header") + } }) } @@ -277,8 +265,12 @@ impl MessageAssemblerWorld { pub fn assert_sequence(&self, expected: Option) -> TestResult { let expected = expected.map(FrameSequence::from); let expected = DebugDisplay(expected); - self.assert_continuation_field("sequence", &expected, |header| { - DebugDisplay(header.sequence) + self.assert_header_field("sequence", &expected, |header| { + if let FrameHeader::Continuation(header) = header { + Ok(DebugDisplay(header.sequence)) + } else { + Err("expected continuation header") + } }) } From 19798f4a8aec68aa79415c50f76986bd11efb54b Mon Sep 17 00:00:00 2001 From: Leynos Date: Tue, 6 Jan 2026 17:03:48 +0000 Subject: [PATCH 13/14] test(message_assembler): add header length field checks to message assembler tests - Introduce header length assertions in the Cucumber test suite. - Enhance MessageAssemblerWorld with assert_header_len to verify parsed header length. - Update feature files and steps to include header length expectations. - Refactor test payload setup in worlds/message_assembler.rs for cleaner header encoding. - Minor reordering and imports cleanup in test modules. Co-authored-by: terragon-labs[bot] --- src/message_assembler/tests.rs | 3 +- src/test_helpers.rs | 4 + tests/features/message_assembler.feature | 5 + tests/steps/message_assembler_steps.rs | 8 ++ tests/worlds/message_assembler.rs | 138 ++++++++++++++--------- 5 files changed, 103 insertions(+), 55 deletions(-) diff --git a/src/message_assembler/tests.rs b/src/message_assembler/tests.rs index 7ef49692..1ea199eb 100644 --- a/src/message_assembler/tests.rs +++ b/src/message_assembler/tests.rs @@ -5,8 +5,6 @@ use std::io; use bytes::{BufMut, BytesMut}; use rstest::rstest; -use crate::test_helpers::TestAssembler; - use super::{ ContinuationFrameHeader, FirstFrameHeader, @@ -16,6 +14,7 @@ use super::{ MessageKey, ParsedFrameHeader, }; +use crate::test_helpers::TestAssembler; #[rstest] #[case::first_frame_without_total( diff --git a/src/test_helpers.rs b/src/test_helpers.rs index c2a85ff3..8fe5641a 100644 --- a/src/test_helpers.rs +++ b/src/test_helpers.rs @@ -26,6 +26,10 @@ impl MessageAssembler for TestAssembler { } /// Parse a protocol-specific frame header for tests. +/// +/// # Errors +/// +/// Returns an error if the payload is too short or contains an invalid header. pub fn parse_frame_header(payload: &[u8]) -> Result { let mut buf = payload; let initial = buf.remaining(); diff --git a/tests/features/message_assembler.feature b/tests/features/message_assembler.feature index c5ff4c3b..98597a37 100644 --- a/tests/features/message_assembler.feature +++ b/tests/features/message_assembler.feature @@ -12,6 +12,7 @@ Feature: Message assembler header parsing And the message key is 9 And the metadata length is 2 And the body length is 12 + And the header length is 16 And the total body length is absent And the frame is marked last false @@ -22,6 +23,7 @@ Feature: Message assembler header parsing And the message key is 9 And the metadata length is 2 And the body length is 12 + And the header length is 16 And the total body length is absent And the frame is marked last false @@ -32,6 +34,7 @@ Feature: Message assembler header parsing And the message key is 42 And the metadata length is 0 And the body length is 8 + And the header length is 20 And the total body length is 64 And the frame is marked last true @@ -41,6 +44,7 @@ Feature: Message assembler header parsing Then the parsed header is continuation And the message key is 7 And the body length is 16 + And the header length is 18 And the sequence is 3 And the frame is marked last false @@ -50,6 +54,7 @@ Feature: Message assembler header parsing Then the parsed header is continuation And the message key is 11 And the body length is 5 + And the header length is 14 And the sequence is absent And the frame is marked last true diff --git a/tests/steps/message_assembler_steps.rs b/tests/steps/message_assembler_steps.rs index d69e5e62..c167cb23 100644 --- a/tests/steps/message_assembler_steps.rs +++ b/tests/steps/message_assembler_steps.rs @@ -133,6 +133,14 @@ fn then_body_len(world: &mut MessageAssemblerWorld, body_len: usize) -> crate::w world.assert_body_len(body_len) } +#[then(expr = "the header length is {int}")] +fn then_header_len( + world: &mut MessageAssemblerWorld, + header_len: usize, +) -> crate::world::TestResult { + world.assert_header_len(header_len) +} + #[then("the total body length is absent")] fn then_total_absent(world: &mut MessageAssemblerWorld) -> crate::world::TestResult { world.assert_total_len(None) diff --git a/tests/worlds/message_assembler.rs b/tests/worlds/message_assembler.rs index a6c7260e..210c7cf8 100644 --- a/tests/worlds/message_assembler.rs +++ b/tests/worlds/message_assembler.rs @@ -5,15 +5,12 @@ use std::{fmt, io}; use bytes::{BufMut, BytesMut}; use cucumber::World; -use wireframe::message_assembler::{ - FrameHeader, - FrameSequence, - MessageAssembler, - ParsedFrameHeader, +use wireframe::{ + message_assembler::{FrameHeader, FrameSequence, MessageAssembler, ParsedFrameHeader}, + test_helpers::TestAssembler, }; use super::{TestApp, TestResult}; -use wireframe::test_helpers::TestAssembler; /// Specification for first-frame header encoding used in tests. #[derive(Debug, Clone, Copy)] @@ -43,6 +40,13 @@ pub struct ContinuationHeaderSpec { pub is_last: bool, } +#[derive(Debug, Clone, Copy)] +struct HeaderEnvelope { + kind: u8, + flags: u8, + key: u64, +} + /// World used by Cucumber to test message assembler header parsing. #[derive(Default, World)] pub struct MessageAssemblerWorld { @@ -67,19 +71,6 @@ impl fmt::Debug for MessageAssemblerWorld { } impl MessageAssemblerWorld { - fn assert_common_field(&self, field: &str, expected: &T, extractor: F) -> TestResult - where - T: PartialEq + fmt::Display + Copy, - F: FnOnce(&FrameHeader) -> T, - { - let parsed = self.parsed.as_ref().ok_or("no parsed header")?; - let actual = extractor(parsed.header()); - if actual != *expected { - return Err(format!("expected {field} {expected}, got {actual}").into()); - } - Ok(()) - } - /// Generic helper for asserting header-type-specific fields. /// /// The extractor performs both type-checking (via pattern matching) and field @@ -90,7 +81,7 @@ impl MessageAssemblerWorld { F: FnOnce(&FrameHeader) -> Result, { let parsed = self.parsed.as_ref().ok_or("no parsed header")?; - let actual = extractor(parsed.header()).map_err(|err| err.to_string())?; + let actual = extractor(parsed.header()).map_err(ToString::to_string)?; if actual != *expected { return Err(format!("expected {field_name} {expected}, got {actual}").into()); } @@ -103,8 +94,6 @@ impl MessageAssemblerWorld { /// /// Returns an error if any length field exceeds the header encoding limits. pub fn set_first_header(&mut self, spec: FirstHeaderSpec) -> TestResult { - let mut bytes = BytesMut::new(); - bytes.put_u8(0x01); let mut flags = 0u8; if spec.is_last { flags |= 0b1; @@ -112,19 +101,25 @@ impl MessageAssemblerWorld { if spec.total_len.is_some() { flags |= 0b10; } - bytes.put_u8(flags); - bytes.put_u64(spec.key); - let metadata_len = - u16::try_from(spec.metadata_len).map_err(|_| "metadata length too large")?; - bytes.put_u16(metadata_len); - let body_len = u32::try_from(spec.body_len).map_err(|_| "body length too large")?; - bytes.put_u32(body_len); - if let Some(total) = spec.total_len { - let total = u32::try_from(total).map_err(|_| "total length too large")?; - bytes.put_u32(total); - } - self.payload = Some(bytes.to_vec()); - Ok(()) + self.set_payload_with_header( + HeaderEnvelope { + kind: 0x01, + flags, + key: spec.key, + }, + |bytes| { + let metadata_len = + u16::try_from(spec.metadata_len).map_err(|_| "metadata length too large")?; + bytes.put_u16(metadata_len); + let body_len = u32::try_from(spec.body_len).map_err(|_| "body length too large")?; + bytes.put_u32(body_len); + if let Some(total) = spec.total_len { + let total = u32::try_from(total).map_err(|_| "total length too large")?; + bytes.put_u32(total); + } + Ok(()) + }, + ) } /// Store an encoded continuation-frame header in the world payload. @@ -133,8 +128,6 @@ impl MessageAssemblerWorld { /// /// Returns an error if any length field exceeds the header encoding limits. pub fn set_continuation_header(&mut self, spec: ContinuationHeaderSpec) -> TestResult { - let mut bytes = BytesMut::new(); - bytes.put_u8(0x02); let mut flags = 0u8; if spec.is_last { flags |= 0b1; @@ -142,13 +135,32 @@ impl MessageAssemblerWorld { if spec.sequence.is_some() { flags |= 0b10; } - bytes.put_u8(flags); - bytes.put_u64(spec.key); - let body_len = u32::try_from(spec.body_len).map_err(|_| "body length too large")?; - bytes.put_u32(body_len); - if let Some(seq) = spec.sequence { - bytes.put_u32(seq); - } + self.set_payload_with_header( + HeaderEnvelope { + kind: 0x02, + flags, + key: spec.key, + }, + |bytes| { + let body_len = u32::try_from(spec.body_len).map_err(|_| "body length too large")?; + bytes.put_u32(body_len); + if let Some(seq) = spec.sequence { + bytes.put_u32(seq); + } + Ok(()) + }, + ) + } + + fn set_payload_with_header(&mut self, envelope: HeaderEnvelope, encode: F) -> TestResult + where + F: FnOnce(&mut BytesMut) -> TestResult, + { + let mut bytes = BytesMut::new(); + bytes.put_u8(envelope.kind); + bytes.put_u8(envelope.flags); + bytes.put_u64(envelope.key); + encode(&mut bytes)?; self.payload = Some(bytes.to_vec()); Ok(()) } @@ -208,9 +220,11 @@ impl MessageAssemblerWorld { /// /// Returns an error if no header was parsed or the key does not match. pub fn assert_message_key(&self, expected: u64) -> TestResult { - self.assert_common_field("key", &expected, |header| match header { - FrameHeader::First(header) => u64::from(header.message_key), - FrameHeader::Continuation(header) => u64::from(header.message_key), + self.assert_header_field("key", &expected, |header| { + Ok(match header { + FrameHeader::First(header) => u64::from(header.message_key), + FrameHeader::Continuation(header) => u64::from(header.message_key), + }) }) } @@ -235,9 +249,11 @@ impl MessageAssemblerWorld { /// /// Returns an error if no header was parsed or the body length differs. pub fn assert_body_len(&self, expected: usize) -> TestResult { - self.assert_common_field("body length", &expected, |header| match header { - FrameHeader::First(header) => header.body_len, - FrameHeader::Continuation(header) => header.body_len, + self.assert_header_field("body length", &expected, |header| { + Ok(match header { + FrameHeader::First(header) => header.body_len, + FrameHeader::Continuation(header) => header.body_len, + }) }) } @@ -280,12 +296,28 @@ impl MessageAssemblerWorld { /// /// Returns an error if no header was parsed or the flag differs. pub fn assert_is_last(&self, expected: bool) -> TestResult { - self.assert_common_field("is_last", &expected, |header| match header { - FrameHeader::First(header) => header.is_last, - FrameHeader::Continuation(header) => header.is_last, + self.assert_header_field("is_last", &expected, |header| { + Ok(match header { + FrameHeader::First(header) => header.is_last, + FrameHeader::Continuation(header) => header.is_last, + }) }) } + /// Assert that the parsed header length matches the expected value. + /// + /// # Errors + /// + /// Returns an error if no header was parsed or the length differs. + pub fn assert_header_len(&self, expected: usize) -> TestResult { + let parsed = self.parsed.as_ref().ok_or("no parsed header")?; + let actual = parsed.header_len(); + if actual != expected { + return Err(format!("expected header length {expected}, got {actual}").into()); + } + Ok(()) + } + /// Assert that the parse failed with `InvalidData`. /// /// # Errors From 61be42fe3a731a1e7df1c101cdf32a36aa19494b Mon Sep 17 00:00:00 2001 From: Leynos Date: Tue, 6 Jan 2026 18:37:24 +0000 Subject: [PATCH 14/14] Update code Co-authored-by: terragon-labs[bot] --- tests/worlds/message_assembler.rs | 37 ++++++++++++++++++------------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/tests/worlds/message_assembler.rs b/tests/worlds/message_assembler.rs index 210c7cf8..2ef637dd 100644 --- a/tests/worlds/message_assembler.rs +++ b/tests/worlds/message_assembler.rs @@ -71,6 +71,19 @@ impl fmt::Debug for MessageAssemblerWorld { } impl MessageAssemblerWorld { + fn assert_common_field(&self, field: &str, expected: &T, extractor: F) -> TestResult + where + T: PartialEq + fmt::Display + Copy, + F: FnOnce(&FrameHeader) -> T, + { + let parsed = self.parsed.as_ref().ok_or("no parsed header")?; + let actual = extractor(parsed.header()); + if actual != *expected { + return Err(format!("expected {field} {expected}, got {actual}").into()); + } + Ok(()) + } + /// Generic helper for asserting header-type-specific fields. /// /// The extractor performs both type-checking (via pattern matching) and field @@ -220,11 +233,9 @@ impl MessageAssemblerWorld { /// /// Returns an error if no header was parsed or the key does not match. pub fn assert_message_key(&self, expected: u64) -> TestResult { - self.assert_header_field("key", &expected, |header| { - Ok(match header { - FrameHeader::First(header) => u64::from(header.message_key), - FrameHeader::Continuation(header) => u64::from(header.message_key), - }) + self.assert_common_field("key", &expected, |header| match header { + FrameHeader::First(header) => u64::from(header.message_key), + FrameHeader::Continuation(header) => u64::from(header.message_key), }) } @@ -249,11 +260,9 @@ impl MessageAssemblerWorld { /// /// Returns an error if no header was parsed or the body length differs. pub fn assert_body_len(&self, expected: usize) -> TestResult { - self.assert_header_field("body length", &expected, |header| { - Ok(match header { - FrameHeader::First(header) => header.body_len, - FrameHeader::Continuation(header) => header.body_len, - }) + self.assert_common_field("body length", &expected, |header| match header { + FrameHeader::First(header) => header.body_len, + FrameHeader::Continuation(header) => header.body_len, }) } @@ -296,11 +305,9 @@ impl MessageAssemblerWorld { /// /// Returns an error if no header was parsed or the flag differs. pub fn assert_is_last(&self, expected: bool) -> TestResult { - self.assert_header_field("is_last", &expected, |header| { - Ok(match header { - FrameHeader::First(header) => header.is_last, - FrameHeader::Continuation(header) => header.is_last, - }) + self.assert_common_field("is_last", &expected, |header| match header { + FrameHeader::First(header) => header.is_last, + FrameHeader::Continuation(header) => header.is_last, }) }