diff --git a/Cargo.lock b/Cargo.lock index 11a67702..a238a0bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1761,9 +1761,9 @@ dependencies = [ [[package]] name = "rstest-bdd" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e741d97bce6ea0a7d0f074716041e0d0eebe6ab9edcd243e7d12f9fd2b5e8b6" +checksum = "138d8e4f97e16906ebeb0bfb12d2c94f0b05913a96fc522111bec62ca8328522" dependencies = [ "ctor", "derive_more 0.99.20", @@ -1785,9 +1785,9 @@ dependencies = [ [[package]] name = "rstest-bdd-macros" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7e3b51c032a6174f82d87843a5e62cfd08ce4606005eafde07ed12561d73196" +checksum = "fe104196f61dc8911a8da1b10005e9401e7c2e14ad9302e2de6688311c0beec7" dependencies = [ "camino", "cap-std", @@ -1809,9 +1809,9 @@ dependencies = [ [[package]] name = "rstest-bdd-patterns" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75d730afd8727e5b18bd276ebf5ea4c881760138762d0cd7d415dc6b6662c6c6" +checksum = "39abaa69316cdbbad0512dc0f37b704b09e4cdb5018d80f197cbdb77a2269d06" dependencies = [ "gherkin", "regex", @@ -1820,9 +1820,9 @@ dependencies = [ [[package]] name = "rstest-bdd-policy" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d256efeb01f08e281cef9cd1e54dab33d8778e871090f5fa49b7a9a70f0caf81" +checksum = "88f2ca584f7d9d359a09f616900be015302c959d58c2c2da6c075573352dfa0d" [[package]] name = "rstest_macros" diff --git a/Cargo.toml b/Cargo.toml index 6b778522..9044cca4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,8 +49,8 @@ socket2 = "0.6.0" [dev-dependencies] rstest = "0.26.1" -rstest-bdd = "0.4.0" -rstest-bdd-macros = { version = "0.4.0", features = ["strict-compile-time-validation"] } +rstest-bdd = "0.5.0" +rstest-bdd-macros = { version = "0.5.0", features = 
["strict-compile-time-validation"] } wireframe = { path = ".", features = ["test-helpers"] } wireframe_testing = { path = "./wireframe_testing" } logtest = "2.0.0" diff --git a/docs/adr-004-pluggable-protocol-codecs.md b/docs/adr-004-pluggable-protocol-codecs.md index 9a7d8abe..59b06b6a 100644 --- a/docs/adr-004-pluggable-protocol-codecs.md +++ b/docs/adr-004-pluggable-protocol-codecs.md @@ -170,7 +170,8 @@ pub trait FrameCodec: Send + Sync + Clone + 'static { - Parameterize `FrameHandlingContext` and `ResponseContext` over the codec. - Replace `LengthDelimitedCodec` usage with `FrameCodec` decoder/encoder calls. -- Use `max_frame_length()` for buffer sizing and fragmentation defaults. +- Use `max_frame_length()` for buffer sizing and explicit fragmentation + configuration helpers (`enable_fragmentation`). ### Phase 4: Update `WireframeServer` diff --git a/docs/execplans/9-1-3-fragment-adapter-trait.md b/docs/execplans/9-1-3-fragment-adapter-trait.md new file mode 100644 index 00000000..011d96b3 --- /dev/null +++ b/docs/execplans/9-1-3-fragment-adapter-trait.md @@ -0,0 +1,413 @@ +# 9.2.1 FragmentAdapter trait and fragmentation opt-in hardening + +This ExecPlan is a living document. The sections `Constraints`, `Tolerances`, +`Risks`, `Progress`, `Surprises & Discoveries`, `Decision Log`, and +`Outcomes & Retrospective` must be kept up to date as work proceeds. + +Status: COMPLETE + +No `PLANS.md` exists in this repository as of 2026-02-12. + +## Purpose / big picture + +Wireframe already ships transport fragmentation primitives, but current app +integration still has open hardening gaps called out by roadmap item 9.2.1: +fragmentation is enabled by default, duplicate versus out-of-order handling is +not explicit, purge scheduling ownership is implicit, and there is no public +fragment-adapter abstraction tying these rules together. 
This plan introduces a +public `FragmentAdapter` trait and aligns runtime behaviour, so fragmentation +is explicitly opt-in, purge control is public, duplicate and out-of-order +policies are deterministic, and edge cases (zero-length fragments and index +overflow) are defined and tested. + +Success is observable when: + +- `WireframeApp::new()` no longer fragments unless the caller explicitly opts + in. +- a public fragment adapter API exists and exposes purge control. +- interleaved fragment streams reassemble correctly, while duplicate and + out-of-order series follow documented policies. +- unit tests (`rstest`), integration tests, and behavioural tests + (`rstest-bdd` v0.5.0) cover the new rules. +- design docs, user-facing docs, and roadmap status are updated consistently. + +## Constraints + +- Keep existing routing, middleware, serializer, and codec public APIs stable + except for the intentional fragmentation opt-in behaviour change. +- Do not introduce `unsafe`. +- Preserve current default codec behaviour and frame-length guardrails. +- Keep module-level `//!` comments and rustdoc examples for new public items. +- Use `rstest` fixtures/parameterization for new unit and integration tests. +- Use `rstest-bdd` v0.5.0 for behavioural test coverage required by this + feature. +- Update `docs/users-guide.md` for any public API change made by this work. +- Record decisions in the fragmentation design document + `docs/generic-message-fragmentation-and-re-assembly-design.md` (and companion + docs when composition guidance changes). + +## Tolerances (exception triggers) + +- Scope: if implementation exceeds 20 files or 1,200 net changed lines, pause + and confirm before continuing. +- Interface: if this work requires changing unrelated public APIs (client + runtime, push queue APIs, or serializer trait signatures), pause and confirm. 
+- Dependencies: only `rstest-bdd` and `rstest-bdd-macros` may be upgraded (to + 0.5.0) in this milestone; any other new dependency requires escalation. +- Iterations: if `make lint` or `make test` fails three consecutive times for + the same root cause, stop and reassess approach. +- Time: if any single stage exceeds one focused day of work without reaching + stage acceptance criteria, log the blocker and re-scope before proceeding. +- Ambiguity: if `FragmentAdapter` naming conflicts with existing design text + cannot be resolved by documentation updates alone, stop and request naming + direction. + +## Risks + +- Risk: behavioural change from default-enabled to opt-in fragmentation may + break existing tests/examples that relied on implicit defaults. Severity: + high. Likelihood: medium. Mitigation: migrate call sites to explicit builder + configuration and add dedicated regression tests for disabled/default and + enabled modes. + +- Risk: `rstest-bdd` 0.5.0 may introduce macro/runtime changes that affect the + current scenario harness. Severity: medium. Likelihood: medium. Mitigation: + upgrade dependency early in the branch, run `make test-bdd`, adapt + step/scenario signatures before feature-specific additions. + +- Risk: duplicate suppression policy can mask protocol bugs if duplicates are + not observably tracked. Severity: medium. Likelihood: medium. Mitigation: + define explicit duplicate semantics in types/errors/docs and add tests that + assert suppression versus rejection outcomes. + +- Risk: purge scheduling ownership can remain ambiguous between adapter and + connection loop. Severity: medium. Likelihood: high. Mitigation: codify + ownership in the `FragmentAdapter` contract and document exactly when + Wireframe calls purge versus when callers may drive it. + +## Progress + +- [x] (2026-02-12 00:00Z) Draft ExecPlan for roadmap item 9.2.1. +- [x] (2026-02-12 00:25Z) Finalize `FragmentAdapter` API contract and error + taxonomy updates. 
+- [x] (2026-02-12 00:37Z) Make fragmentation opt-in on `WireframeApp` builder + defaults. +- [x] (2026-02-12 00:42Z) Expose public purge API and wire it through adapter + implementation. +- [x] (2026-02-12 01:03Z) Implement duplicate suppression and out-of-order + handling policy. +- [x] (2026-02-12 01:11Z) Define zero-length fragment behaviour and + index-overflow handling. +- [x] (2026-02-12 01:35Z) Add/upgrade unit, integration, and behavioural + tests. +- [x] (2026-02-12 02:00Z) Update design docs, user guide, and roadmap + checkboxes. +- [x] (2026-02-12 02:20Z) Run formatting, lint, and full test gates. + +## Surprises & Discoveries + +- Observation: the fragmentation design doc still states default-enabled + behaviour, while roadmap 9.2.1 now requires explicit opt-in. Evidence: + `docs/generic-message-fragmentation-and-re-assembly-design.md` section 3.4 + versus `docs/roadmap.md` section 9.2. Impact: this milestone must include + design-document corrections, not only code/test edits. + +- Observation: behavioural testing guidance for this repository now lives in + `docs/rstest-bdd-users-guide.md`. Evidence: current testing policy and + roadmap requirements for `rstest-bdd`. Impact: behavioural testing updates + for this feature should follow that guide's conventions and version updates. + +- Observation: the first version of the interleaved transport integration test + assumed deterministic response ordering and intermittently timed out. + Evidence: `tests/fragment_transport.rs` failures during local `make test` + runs. Impact: the assertion strategy was changed to drain both responses and + compare an order-independent payload set, which matches scheduler reality. + +## Decision Log + +- Decision: introduce a public `FragmentAdapter` trait plus one default + implementation backed by existing `Fragmenter` and `Reassembler` primitives. + Rationale: satisfies roadmap wording while minimizing rewrite risk by reusing + proven internals. 
Date/Author: 2026-02-12 / Codex. + +- Decision: change app-level fragmentation from implicit default to explicit + opt-in at builder time. Rationale: aligns with roadmap hardening goal and + prevents hidden performance and behavioural costs for protocols that do not + need fragmentation. Date/Author: 2026-02-12 / Codex. + +- Decision: codify duplicate suppression and out-of-order rejection as separate + outcomes. Rationale: duplicate retransmissions can be tolerated safely, while + true out-of-order delivery should fail deterministically and clear partial + state. Date/Author: 2026-02-12 / Codex. + +- Decision: make `with_codec(...)` clear fragmentation state and require a + fresh explicit `enable_fragmentation()` call. Rationale: fragmentation + settings depend on codec max-frame details; retaining old settings after + codec replacement can silently produce mismatched thresholds. Date/Author: + 2026-02-12 / Codex. + +## Outcomes & Retrospective + +Implemented outcomes: + +- Added a public `FragmentAdapter` trait and `DefaultFragmentAdapter` + implementation in `src/fragment/adapter.rs`, and exported them through + `src/fragment/mod.rs` and `src/lib.rs`. +- Shifted runtime integration to the new adapter contract while preserving app + call-site behaviour through the `src/app/fragmentation_state.rs` alias layer. +- Made fragmentation opt-in by default in `WireframeApp` builder construction + and introduced `enable_fragmentation()` for explicit activation. +- Added caller-driven purge access through the adapter API and documented purge + ownership in design and user docs. +- Defined duplicate suppression as non-fatal (`Duplicate`) and preserved + out-of-order rejection semantics with deterministic state cleanup. +- Added/updated coverage for opt-in defaults, interleaved reassembly, duplicate + suppression, out-of-order fragments, zero-length fragments, and index + overflow across unit, integration, and behavioural test suites. 
+- Updated roadmap 9.2.1 and companion design/user documents to reflect final + behaviour and composition order. + +Retrospective: + +- Separating duplicate from out-of-order outcomes reduced ambiguity in both code + and tests and made policy documentation straightforward. +- Explicit opt-in defaults prevent hidden transport costs for users that do not + need fragmentation, but this increases migration burden for existing builder + call sites; the new builder tests now guard this contract. + +## Context and orientation + +Current transport fragmentation internals live in: + +- `src/fragment/fragmenter.rs` (outbound splitting). +- `src/fragment/reassembler.rs` (inbound assembly and timeout purge). +- `src/fragment/series.rs` (index ordering rules). +- `src/app/fragmentation_state.rs` (connection-scoped wrapper currently used by + app frame handling). + +Current app integration points are: + +- `src/app/builder/core.rs` and `src/app/builder/codec.rs`, where + `default_fragmentation(...)` currently auto-enables fragmentation. +- `src/app/connection.rs`, which instantiates optional fragmentation state and + calls purge on read-timeout ticks. +- `src/app/frame_handling/response.rs` and + `src/app/frame_handling/reassembly.rs`, which apply fragmentation and + reassembly around handler processing. + +Current tests and fixtures already cover part of the domain: + +- `src/fragment/tests.rs` for primitive/unit coverage. +- `tests/fragment_transport.rs` and `tests/fragment_transport/*` for transport + integration coverage. +- `tests/features/fragment.feature`, `tests/steps/fragment_steps.rs`, and + `tests/scenarios/fragment_scenarios.rs` for behavioural coverage. + +Documentation currently needing alignment: + +- `docs/generic-message-fragmentation-and-re-assembly-design.md` (adapter + contract, duplicate/out-of-order policy, purge ownership, opt-in semantics). +- `docs/multi-packet-and-streaming-responses-design.md` (layer composition + order references). 
+- `docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md` + (hardening narrative alignment). +- `docs/hardening-wireframe-a-guide-to-production-resilience.md` (hardening + narrative alignment). +- `docs/users-guide.md` (public configuration surface and behaviour changes). +- `docs/roadmap.md` (mark 9.2.1 sub-items done on completion). + +## Plan of work + +### Stage A: contract and policy definition (no behavioural changes yet) + +Define the `FragmentAdapter` public contract in `src/fragment`, including purge +methods and explicit result/error shapes for duplicate suppression, +out-of-order fragments, zero-length fragments, and overflow paths. Update the +fragmentation design document first, so code follows an agreed contract. + +Go/no-go: + +- Go when design docs and trait signatures agree on ownership of purge + scheduling and policy terms. +- No-go if naming or ownership remains ambiguous after doc updates. + +### Stage B: wire adapter into app path and enforce opt-in + +Implement the default adapter using existing `Fragmenter` + `Reassembler` +logic, then switch app frame handling to use the adapter contract. Change +builder defaults, so fragmentation is disabled unless explicitly configured. +Update builder docs/comments and any helper defaults that currently turn +fragmentation on implicitly. + +Go/no-go: + +- Go when `WireframeApp::new()` has no fragmentation state by default and + explicit config paths still work. +- No-go if disabling defaults causes unbounded regressions outside + fragmentation-related tests. + +### Stage C: policy enforcement and test expansion + +Implement duplicate suppression versus out-of-order rejection behaviour in the +fragment series/reassembler path. Add explicit zero-length and overflow +handling tests. Expand integration tests for interleaved reassembly and opt-in +semantics. Upgrade behavioural tests to `rstest-bdd` v0.5.0 and add scenarios +that prove the new policies. 
+ +Go/no-go: + +- Go when new tests fail before implementation and pass after changes. +- No-go if policy cannot be expressed without broad unrelated API changes. + +### Stage D: documentation, roadmap completion, and hardening gates + +Update user-facing and design docs to match implemented behaviour and +composition order. Mark roadmap 9.2.1 checklist items as done. Run full +formatting, lint, and test gates. + +Go/no-go: + +- Go when docs and code behaviour match and all quality gates pass. +- No-go if any gate fails; fix root causes before finalizing. + +## Concrete steps + +1. Add/adjust fragmentation adapter API surface: + `src/fragment/adapter.rs` (new), `src/fragment/mod.rs`, `src/lib.rs`. + +2. Refactor connection-facing adapter implementation: + `src/app/fragmentation_state.rs`, `src/app/frame_handling/core.rs`, + `src/app/frame_handling/reassembly.rs`, + `src/app/frame_handling/response.rs`, `src/app/connection.rs`. + +3. Enforce opt-in builder behaviour: + `src/app/builder_defaults.rs`, `src/app/builder/core.rs`, + `src/app/builder/codec.rs`, `src/app/builder/config.rs`, plus affected + doctests and call sites. + +4. Implement policy-specific fragment logic and tests: + `src/fragment/series.rs`, `src/fragment/reassembler.rs`, + `src/fragment/error.rs`, `src/fragment/tests.rs`. + +5. Expand integration and behavioural suites: + `tests/fragment_transport.rs`, `tests/fragment_transport/rejection.rs`, + `tests/fragment_transport/eviction.rs`, `tests/common/fragment_helpers.rs`, + `tests/features/fragment.feature`, `tests/steps/fragment_steps.rs`, + `tests/scenarios/fragment_scenarios.rs`, `tests/fixtures/fragment/mod.rs`, + `tests/fixtures/fragment/reassembly.rs`. + +6. Upgrade behavioural-test dependencies to v0.5.0 and adapt harness if needed: + `Cargo.toml`, `Cargo.lock`, and any `rstest-bdd` API call sites. + +7. 
Update documentation and roadmap: + `docs/generic-message-fragmentation-and-re-assembly-design.md`, + `docs/multi-packet-and-streaming-responses-design.md`, + `docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md`, + `docs/hardening-wireframe-a-guide-to-production-resilience.md`, + `docs/users-guide.md`, and `docs/roadmap.md`. + +8. Run validation commands from repository root, capturing full logs: + + set -o pipefail + timeout 300 make fmt 2>&1 | tee /tmp/wireframe-fmt.log + + set -o pipefail + timeout 300 make markdownlint 2>&1 | tee /tmp/wireframe-markdownlint.log + + set -o pipefail + timeout 300 make check-fmt 2>&1 | tee /tmp/wireframe-check-fmt.log + + set -o pipefail + timeout 300 make lint 2>&1 | tee /tmp/wireframe-lint.log + + set -o pipefail + timeout 300 make test-bdd 2>&1 | tee /tmp/wireframe-test-bdd.log + + set -o pipefail + timeout 300 make test 2>&1 | tee /tmp/wireframe-test.log + + If Mermaid diagrams are edited, also run: + + set -o pipefail + timeout 300 make nixie 2>&1 | tee /tmp/wireframe-nixie.log + +## Validation and acceptance + +Acceptance is complete when all statements below are true: + +- Default app behaviour: creating `WireframeApp::new()` without explicit + fragmentation configuration does not fragment traffic. +- Opt-in behaviour: explicit builder configuration enables fragmentation and + preserves round-trip correctness for large payloads. +- Purge API: callers can invoke a documented public purge method on the + adapter/reassembly path and observe stale state eviction. +- Duplicate/out-of-order policy: duplicate fragments are handled per documented + suppression policy; out-of-order fragments trigger deterministic rejection + and cleanup. +- Edge cases: zero-length fragments and fragment index overflow semantics are + defined and covered by tests. +- Interleaving: integration tests verify interleaved fragment streams + reassemble correctly. 
+- Behavioural coverage: `rstest-bdd` scenarios validate fragment-series policy
+  and any new public behaviour, running under v0.5.0 dependencies.
+- Documentation parity: user guide and design docs describe the same behaviour
+  that tests verify.
+- Roadmap update: `docs/roadmap.md` item 9.2.1 and all requested sub-items are
+  checked as done.
+
+## Idempotence and recovery
+
+All changes are additive or local refactors and can be safely reapplied. If any
+stage fails:
+
+- revert only the files touched in that stage;
+- keep prior completed stages intact;
+- rerun the stage-local tests first, then full gates.
+
+If dependency upgrade causes widespread unrelated breakage, pin the updated
+version in branch commits, fix compatibility incrementally, and only then
+resume feature-specific edits.
+
+## Artifacts and notes
+
+Validation evidence logs:
+
+- `/tmp/wireframe-fmt.log` (`make fmt`)
+- `/tmp/wireframe-markdownlint.log` (`make markdownlint`)
+- `/tmp/wireframe-check-fmt.log` (`make check-fmt`)
+- `/tmp/wireframe-lint.log` (`make lint`)
+- `/tmp/wireframe-test-bdd.log` (`make test-bdd`)
+- `/tmp/wireframe-test.log` (`make test`)
+
+## Interfaces and dependencies
+
+Target public interface shape (exact naming may be adjusted during Stage A):
+
+    pub trait FragmentAdapter<E>: Send + Sync {
+        fn fragment(
+            &self,
+            packet: E,
+        ) -> Result<Vec<E>, crate::fragment::FragmentationError>;
+        fn reassemble(
+            &mut self,
+            packet: E,
+        ) -> Result<Option<E>, crate::fragment::FragmentAdapterError>;
+        fn purge_expired(&mut self) -> Vec<E>;
+    }
+
+Target behavioural dependency changes:
+
+- `rstest-bdd = "0.5.0"`
+- `rstest-bdd-macros = { version = "0.5.0", ... }`
+
+These remain `dev-dependencies` only.
+
+## Revision note (2026-02-12)
+
+Initial draft created for roadmap item 9.2.1 with explicit constraints,
+tolerances, staged implementation flow, and required testing/documentation
+updates.
+ +Completed update: plan status advanced to COMPLETE, required milestones were +checked off, outcomes and policy decisions were recorded, and artefacts were +captured for reproducibility. diff --git a/docs/generic-message-fragmentation-and-re-assembly-design.md b/docs/generic-message-fragmentation-and-re-assembly-design.md index 769ff22b..a3e8bf9d 100644 --- a/docs/generic-message-fragmentation-and-re-assembly-design.md +++ b/docs/generic-message-fragmentation-and-re-assembly-design.md @@ -40,9 +40,10 @@ The implementation must satisfy the following core requirements: ## 3. Core architecture: the `FragmentAdapter` -The feature will be implemented as a codec middleware called `FragmentAdapter`. -It is instantiated with a protocol-specific `FragmentStrategy` and wraps any -subsequent codecs in the chain. +The feature is exposed as a public `FragmentAdapter` trait plus a default +implementation (`DefaultFragmentAdapter`) that wraps `Fragmenter` and +`Reassembler`. Protocol-specific stacks can still layer additional strategy +codecs around this adapter. ```plaintext Socket I/O ↔ [Compression] ↔ FragmentAdapter ↔ Router/Handlers @@ -67,38 +68,24 @@ connection, and bytes buffered across in-flight assemblies. See model. ```rust -use dashmap::DashMap; -use std::sync::atomic::AtomicU64; -use std::time::{Duration, Instant}; - -pub struct FragmentAdapter { - strategy: S, - /// Hard cap on the size of a reassembled logical message. - max_message_size: usize, - /// Timeout for completing a partial message reassembly. - reassembly_timeout: Duration, - /// Concurrently accessible map for in-flight message reassembly. - reassembly_buffers: DashMap, - /// Atomic counter for generating unique outbound message IDs. 
-    next_outbound_msg_id: AtomicU64,
+pub trait FragmentAdapter<E>: Send + Sync {
+    fn fragment(&self, packet: E) -> Result<Vec<E>, FragmentationError>;
+    fn reassemble(
+        &mut self,
+        packet: E,
+    ) -> Result<Option<E>, FragmentAdapterError>;
+    fn purge_expired(&mut self) -> Vec<E>;
+}
 
-/// State for a single, in-progress message reassembly.
-struct PartialMessage {
-    /// The buffer holding the accumulating payload.
-    buffer: BytesMut,
-    /// The advertised total payload size, if known.
-    expected_total: Option<usize>,
-    /// The sequence number of the last fragment received.
-    last_sequence: u64,
-    /// The time the first fragment was received.
-    started_at: Instant,
+pub struct DefaultFragmentAdapter {
+    fragmenter: Fragmenter,
+    reassembler: Reassembler,
 }
 ```
 
-The use of `dashmap::DashMap` allows for lock-free reads and sharded writes,
-providing efficient and concurrent access to the reassembly buffers without
-blocking the entire connection task.
+`DefaultFragmentAdapter` delegates message-ID sequencing to `Fragmenter` and
+in-flight reassembly state management (including timeout expiry) to
+`Reassembler`.
 
 ### 3.2 Canonical fragment header (November 2025 update)
 
@@ -132,7 +119,7 @@ control how headers are encoded on the wire, but it now produces a
 
 ### 3.3 Fragmenter helper (17 November 2025 update)
 
-To simplify outbound slicing before the full adaptor arrives, the crate now
+To simplify outbound slicing before the full adapter arrives, the crate now
 ships a small `Fragmenter` helper. It accepts a `NonZeroUsize` payload cap and
 creates sequential fragments tagged with `MessageId`, `FragmentIndex`, and the
 `is_last_fragment` flag described above. The helper exposes three entry points:
@@ -164,16 +151,21 @@ configuration requires updating this section alongside the code.
 
 The length-delimited codec therefore observes one frame per fragment; the
 encoded body is bounded by `fragment_payload_cap + fragment_overhead`.
-`WireframeApp` and `ConnectionActor` enable this adaptor by default when a -frame budget is available. Defaults derive `fragment_payload_cap` from -`buffer_capacity`, cap reassembled messages at 16× that budget, and evict -partial assemblies after 30 seconds. +`WireframeApp` keeps fragmentation disabled by default and now requires +explicit configuration via `enable_fragmentation()` or +`fragmentation(Some(cfg))`. When enabled, defaults derive +`fragment_payload_cap` from `buffer_capacity`, cap reassembled messages at 16× +that budget, and use a 30-second expiry window. -## 4. Public API: the `FragmentStrategy` trait +## 4. Public API: `FragmentAdapter` and `FragmentStrategy` -The power and flexibility of this feature come from the `FragmentStrategy` -trait. Protocol implementers will provide a type that implements this trait to -inject their specific fragmentation rules into the generic `FragmentAdapter`. +The `FragmentAdapter` contract defines lifecycle ownership for fragmentation +and reassembly, including purge control. `WireframeApp` drives purge scheduling +on read-timeout ticks, while external callers can drive eviction manually +through `FragmentAdapter::purge_expired`. + +The `FragmentStrategy` trait remains the protocol-specific extension point for +parsing and encoding strategy headers when protocols need custom wire layouts. ### 4.1 Trait Definition @@ -279,10 +271,10 @@ async fn streamed() -> Response> { The reassembly logic is the most complex part of the feature and must be robust against errors and attacks. -1. **Header Decoding:** The adaptor reads from the socket buffer and calls +1. **Header Decoding:** The adapter reads from the socket buffer and calls `strategy.decode_header()`. If it returns `Ok(None)`, it waits for more data. -2. **Payload Extraction:** Once a header is decoded, the adaptor ensures the +2. 
**Payload Extraction:** Once a header is decoded, the adapter ensures the full payload for that fragment is available in the buffer before proceeding. 3. **Multiplexed State Management:** @@ -291,7 +283,7 @@ against errors and attacks. (if `is_final`) or an error (if not `is_final` and a non-multiplexed reassembly is already in progress). - - If `meta.msg_id` is `Some(id)`, the adaptor accesses the + - If `meta.msg_id` is `Some(id)`, the adapter accesses the `reassembly_buffers` map. - **New Message (**`.entry().or_insert_with(...)`**):** @@ -306,9 +298,10 @@ against errors and attacks. - **Continuing Message (**`.get_mut()`**):** - - The `last_sequence` is checked to ensure fragments are monotonic. An - out-of-order fragment results in an error and the `PartialMessage` being - dropped. + - The `last_sequence` is checked to ensure fragments are monotonic. + Duplicate fragments (repeated indices) are suppressed, while + out-of-order fragments (indices ahead of the expected position) result + in an error and the `PartialMessage` being dropped. - The buffer's potential new size is checked against `max_message_size`. Exceeding this limit results in an error. @@ -319,17 +312,16 @@ against errors and attacks. extracted from the `PartialMessage`, the entry is removed from the map, then pass the complete logical frame down the codec chain. -4. **Timeout handling:** Run a background task within the - `FragmentAdapter` that periodically iterates over the re‑assembly buffers, - checks each `PartialMessage`’s `started_at` timestamp, and removes any entry - that has exceeded the re‑assembly timeout, emitting a `WARN`‑level `tracing` - event. +4. **Timeout handling:** Purge ownership is explicit. Callers invoke + `FragmentAdapter::purge_expired()` according to their scheduling policy. + `WireframeApp` invokes this on read-timeout ticks; protocol harnesses and + tests can invoke it directly to drive deterministic eviction. 
### 5.2 Outbound path (fragmentation) The outbound path is simpler and purely procedural. -1. **Size Check:** When `write(frame)` is called, the adaptor checks +1. **Size Check:** When `write(frame)` is called, the adapter checks `frame.len()` against `strategy.max_fragment_payload(&frame)`. 2. **No Fragmentation:** If the frame is small enough, it is passed directly to @@ -340,7 +332,7 @@ The outbound path is simpler and purely procedural. - A new `msg_id` is generated via `next_outbound_msg_id.fetch_add(1, Ordering::Relaxed)`. - - The adaptor iterates through the frame's payload in chunks of + - The adapter iterates through the frame's payload in chunks of `max_fragment_payload`. - For each chunk, it calls `strategy.encode_header()` to write the fragment @@ -371,11 +363,11 @@ This feature is designed as a foundational layer that other features build upon. | --------------- | ------------------------------------------------------------------------------------------------------------------------------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | | API Correctness | The FragmentStrategy trait and FragmentAdapter are implemented exactly as specified in this document. | 100% of the public API surface is present and correctly typed. | | Functionality | A large logical frame is correctly split into N fragments, and a sequence of N fragments is correctly reassembled into the original frame. | An end-to-end test confirms byte-for-byte identity of a payload at the configured max_message_size after being fragmented and reassembled. | -| Multiplexing | The adaptor can correctly reassemble two messages whose fragments are interleaved. | A test sending fragments A1, B1, A2, B2, A3, B3 must result in two correctly reassembled messages, A and B. 
| -| Resilience | The adaptor protects against memory exhaustion from oversized messages. | A test sending fragments that exceed max_message_size must terminate the connection and not allocate beyond the configured cap (including allocator overhead). | -| Resilience | The adaptor protects against resource leaks from abandoned partial messages. | A test that sends an initial fragment but never the final one must result in the partial buffer being purged after the reassembly_timeout duration has passed. | -| Performance | The overhead for messages that do not require fragmentation is minimal. | A criterion benchmark passing a stream of small, non-fragmented frames through the FragmentAdapter must show < 5% throughput degradation compared to a build without the adaptor. | -| Resilience | The adaptor enforces the configured `max_message_size`, `fragment_payload_cap`, and `reassembly_timeout` used in production. | Benchmarks and regression tests assert the 16× message cap, per-fragment payload cap derived from buffer capacity, and a 30s timeout for purging stale assemblies (WireframeApp defaults). | +| Multiplexing | The adapter can correctly reassemble two messages whose fragments are interleaved. | A test sending fragments A1, B1, A2, B2, A3, B3 must result in two correctly reassembled messages, A and B. | +| Resilience | The adapter protects against memory exhaustion from oversized messages. | A test sending fragments that exceed max_message_size must terminate the connection and not allocate beyond the configured cap (including allocator overhead). | +| Resilience | The adapter protects against resource leaks from abandoned partial messages. | A test that sends an initial fragment but never the final one must result in the partial buffer being purged after the reassembly_timeout duration has passed. | +| Performance | The overhead for messages that do not require fragmentation is minimal. 
| A criterion benchmark passing a stream of small, non-fragmented frames through the FragmentAdapter must show < 5% throughput degradation compared to a build without the adapter. | +| Resilience | The adapter enforces the configured `max_message_size`, `fragment_payload_cap`, and `reassembly_timeout` used in production. | Benchmarks and regression tests assert the 16× message cap, per-fragment payload cap derived from buffer capacity, and a 30s timeout for purging stale assemblies (WireframeApp defaults). | ## 8. Design decisions (14 November 2025, updated 17 November 2025) @@ -401,6 +393,15 @@ This feature is designed as a foundational layer that other features build upon. drops the partial buffer when ordering breaks, enforces a configurable `max_message_size`, and exposes caller-driven timeout purging. This prevents abandoned assemblies from exhausting memory. +- Added an explicit duplicate-handling policy for active series: duplicate + fragment indices are suppressed and do not append payload bytes, while + out-of-order indices still fail reassembly and clear partial state. +- Changed app-level fragmentation configuration to explicit opt-in. Builders no + longer auto-enable fragmentation; callers must use `enable_fragmentation()` + or `fragmentation(Some(cfg))`. +- Assigned purge scheduling ownership to the adapter caller via + `FragmentAdapter::purge_expired()`. `WireframeApp` drives this on timeout + ticks, and external callers can schedule purges directly. ## 9. Composition with streaming requests and MessageAssembler diff --git a/docs/hardening-wireframe-a-guide-to-production-resilience.md b/docs/hardening-wireframe-a-guide-to-production-resilience.md index 2485b579..8d0960ed 100644 --- a/docs/hardening-wireframe-a-guide-to-production-resilience.md +++ b/docs/hardening-wireframe-a-guide-to-production-resilience.md @@ -312,12 +312,13 @@ fragmentation layer must be hardened. 
assemblies, which are purged if they are not completed within the time limit (e.g., 30 seconds). -`WireframeApp` enables these guards by default via -`default_fragmentation(buffer_capacity)`, which builds a `FragmentationConfig` -from the connection's `buffer_capacity`. The fragment payload cap matches the -usable frame budget, `max_message_size` defaults to 16× `buffer_capacity`, and -partial assemblies are purged after 30 seconds. Applications can override or -disable these defaults via `fragmentation(...)`. +`WireframeApp` keeps fragmentation disabled by default. Applications opt in via +`enable_fragmentation()` (codec-derived defaults) or `fragmentation(Some(cfg))` +(explicit values). Defaults derive `fragment_payload_cap` from +`buffer_capacity`, set `max_message_size` to 16× `buffer_capacity`, and use a +30-second timeout. Purge scheduling is caller-owned through +`FragmentAdapter::purge_expired()`; `WireframeApp` invokes it on read-timeout +ticks, while custom runtimes can drive eviction explicitly. ## 5. Advanced Resilience Patterns diff --git a/docs/multi-packet-and-streaming-responses-design.md b/docs/multi-packet-and-streaming-responses-design.md index 04617951..cb7c6329 100644 --- a/docs/multi-packet-and-streaming-responses-design.md +++ b/docs/multi-packet-and-streaming-responses-design.md @@ -427,6 +427,8 @@ not hang. `FragmentAdapter` will operate at a lower layer, transparently splitting any large frames yielded by the stream before they are written to the socket. The handler and streaming logic remain completely unaware of fragmentation. + Outbound order is serializer → fragmentation → codec framing; inbound order + is codec decode → fragment reassembly → deserialization. - **Streaming Request Bodies:** [ADR 0002][adr-0002] introduces first-class streaming request bodies as the inbound counterpart to streaming responses. 
diff --git a/docs/roadmap.md b/docs/roadmap.md index a0d641ec..f8516227 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -370,20 +370,20 @@ integration boundaries. ### 9.2. Fragment adaptor alignment -- [ ] 9.2.1. Introduce a `FragmentAdapter` trait as described in the +- [x] 9.2.1. Introduce a `FragmentAdapter` trait as described in the fragmentation design.[^fragmentation-design] Fragmentation behaviour must explicitly define duplicate handling, out-of-order policies, and ownership of purge scheduling. - - [ ] Make fragmentation opt-in by requiring explicit configuration on the + - [x] Make fragmentation opt-in by requiring explicit configuration on the `WireframeApp` builder. - - [ ] Expose a public purge API, so callers can drive timeout eviction. - - [ ] Document the composition order for codec, fragmentation, and + - [x] Expose a public purge API, so callers can drive timeout eviction. + - [x] Document the composition order for codec, fragmentation, and serialization layers. - - [ ] Define and implement duplicate suppression and out-of-order handling + - [x] Define and implement duplicate suppression and out-of-order handling for fragment series. - - [ ] Define and test zero-length fragment behaviour and fragment index + - [x] Define and test zero-length fragment behaviour and fragment index overflow handling. - - [ ] Add unit and integration tests for opt-in behaviour, interleaved + - [x] Add unit and integration tests for opt-in behaviour, interleaved reassembly, and duplicate and out-of-order fragments. ### 9.3. 
Unified codec handling diff --git a/docs/rstest-bdd-users-guide.md b/docs/rstest-bdd-users-guide.md index b2e0baed..74c8cf17 100644 --- a/docs/rstest-bdd-users-guide.md +++ b/docs/rstest-bdd-users-guide.md @@ -838,14 +838,14 @@ To enable validation, pin a feature in the project's `dev-dependencies`: ```toml [dev-dependencies] -rstest-bdd-macros = { version = "0.4.0", features = ["compile-time-validation"] } +rstest-bdd-macros = { version = "0.5.0", features = ["compile-time-validation"] } ``` For strict checking use: ```toml [dev-dependencies] -rstest-bdd-macros = { version = "0.4.0", features = ["strict-compile-time-validation"] } +rstest-bdd-macros = { version = "0.5.0", features = ["strict-compile-time-validation"] } ``` Steps are only validated when one of these features is enabled. @@ -1238,7 +1238,7 @@ Localization tooling can be added to `Cargo.toml` as follows: ```toml [dependencies] -rstest-bdd = "0.4.0" +rstest-bdd = "0.5.0" i18n-embed = { version = "0.16", features = ["fluent-system", "desktop-requester"] } unic-langid = "0.9" ``` diff --git a/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md b/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md index ee3be6ee..d6d56654 100644 --- a/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md +++ b/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md @@ -292,6 +292,9 @@ Rust's ownership model and `Drop` trait are the foundation of resource safety. Budgets are configured on the `WireframeApp` instance via a builder method (for example `memory_budgets(...)`). If budgets are not set explicitly, defaults are derived from `buffer_capacity`. + Transport fragmentation itself is opt-in on the builder (`enable_fragmentation()` + or `fragmentation(Some(cfg))`) so protocols that do not require + fragmentation keep zero additional buffering overhead. 
- **Back-pressure and Hard Caps:** When budgets are approached, Wireframe applies back-pressure by pausing further reads and assembly work (soft diff --git a/docs/users-guide.md b/docs/users-guide.md index 8874f372..37abada1 100644 --- a/docs/users-guide.md +++ b/docs/users-guide.md @@ -106,7 +106,10 @@ and a 100 ms read timeout. Clamp the length-delimited limit with `buffer_capacity` (length-delimited only), swap codecs with `with_codec`, and override the serializer with `with_serializer` when a different encoding strategy is required.[^3][^4] Custom protocols implement `FrameCodec` to -describe their framing rules. +describe their framing rules. Changing frame budgets with `buffer_capacity` or +swapping codecs with `with_codec` clears fragmentation settings, so call +`enable_fragmentation()` (or `fragmentation(Some(cfg))`) again when transport +fragmentation is required. Once a stream is accepted—either from a manual accept loop or via `WireframeServer`—`handle_connection(stream)` builds (or reuses) the middleware @@ -128,19 +131,20 @@ A codec implementation must: - Define a `Frame` type and paired decoder/encoder implementations that return `std::io::Error` on failure. - Return only the logical payload bytes from `frame_payload` so metadata parsing - and deserialisation run against the right buffer. + and deserialization run against the right buffer. - Wrap outbound payloads with `wrap_payload(&self, Bytes)`, adding any protocol headers or metadata required by the wire format. - Provide `correlation_id` when the protocol stores it outside the payload; Wireframe only uses this hook when the deserialized envelope is missing a correlation identifier. -- Report `max_frame_length`, which clamps inbound frames and seeds default - fragmentation limits. - -Install a custom codec with `with_codec`. The builder resets fragmentation to -the codec-derived defaults, so override fragmentation afterwards if the -protocol uses a different budget. 
Wireframe clones the codec per connection, so -stateful codecs should ensure `Clone` produces an independent state (for +- Report `max_frame_length`, which clamps inbound frames and determines the + budget used by `enable_fragmentation`. + +Install a custom codec with `with_codec`. The builder disables fragmentation +when codecs or the length-delimited frame budget change, so explicitly call +`enable_fragmentation()` (or `fragmentation(Some(cfg))`) afterwards when +transport fragmentation is required. Wireframe clones the codec per connection, +so stateful codecs should ensure `Clone` produces an independent state (for example, reset sequence counters) when per-connection isolation is required. When a framed stream is already available, use `send_response_framed_with_codec`, so responses pass through @@ -360,7 +364,7 @@ The standalone `Fragmenter` helper now slices oversized payloads into capped fragments while stamping the shared `MessageId` and sequential `FragmentIndex`. Each call returns a `FragmentBatch` that reports whether the message required fragmentation and yields individual `FragmentFrame` values for serialization or -logging. This keeps transport experiments lightweight while the full adaptor +logging. This keeps transport experiments lightweight while the full adapter layer evolves. The helper is fallible—`FragmentationError` surfaces encoding failures or index overflows—so production code should bubble the error up or log it rather than unwrapping. @@ -671,7 +675,7 @@ async fn handle_upload(parts: RequestParts, body: RequestBodyStream) { } ``` -The `RequestBodyReader` adaptor implements `AsyncRead`, allowing protocol +The `RequestBodyReader` adapter implements `AsyncRead`, allowing protocol crates to reuse existing parsers. 
For raw stream access, use the `RequestBodyStream` directly with `StreamExt` methods: @@ -827,14 +831,18 @@ async fn main() -> Result<(), SendError> { ## Message fragmentation -`WireframeApp` now fragments oversized payloads automatically. The builder -derives a `FragmentationConfig` from the active frame codec's maximum frame -length (the default length-delimited codec uses `buffer_capacity`): any payload -that will not fit into a single frame is split into fragments carrying a +`WireframeApp` keeps transport fragmentation disabled by default. Enable it +explicitly with `enable_fragmentation()` or provide a bespoke +`FragmentationConfig` using `fragmentation(Some(cfg))`. When enabled, payloads +that exceed the frame budget are split into fragments carrying a `FragmentHeader` (`message_id`, `fragment_index`, `is_last_fragment`) wrapped with the `FRAG` marker. The connection reassembles fragments before invoking handlers, so handlers continue to work with complete `Envelope` values.[^6] +Layering order is fixed. Outbound processing runs serializer → fragmentation → +codec wrapping. Inbound processing runs codec decode → fragment reassembly → +deserialization. + Fragmented messages enforce two guards: `max_message_size` caps the total reassembled payload, and `reassembly_timeout` evicts stale partial messages. Customize or disable fragmentation via the builder: @@ -858,11 +866,11 @@ let app = WireframeApp::new()? .route(42, handler)?; ``` -Set `fragmentation(None)` when the transport already supports large frames, or -when fragmentation should be deferred to an upstream gateway. The -`ConnectionActor` mirrors the same behaviour for push traffic and streaming -responses through `enable_fragmentation`, ensuring client-visible frames follow -the same format. +Call `fragmentation(None)` to keep fragmentation disabled after explicit +configuration (for example, when the transport already supports large frames or +fragmentation is delegated to an upstream gateway). 
The `ConnectionActor` +mirrors the same behaviour for push traffic and streaming responses through +`enable_fragmentation`, ensuring client-visible frames follow the same format. ## Protocol hooks @@ -1277,7 +1285,7 @@ Phase out older message versions without breaking clients: - Emit version N on egress so clients observe a single schema. - Publish metrics and logs describing legacy usage to support operator dashboards.[^33][^8] -- Remove adaptors once the sunset window ends. +- Remove adapters once the sunset window ends. ```rust use std::sync::Arc; diff --git a/src/app/builder/codec.rs b/src/app/builder/codec.rs index 934c65dd..3dd07ec2 100644 --- a/src/app/builder/codec.rs +++ b/src/app/builder/codec.rs @@ -2,7 +2,7 @@ use super::WireframeApp; use crate::{ - app::{Packet, builder_defaults::default_fragmentation}, + app::Packet, codec::{FrameCodec, LengthDelimitedFrameCodec, clamp_frame_length}, serializer::Serializer, }; @@ -17,17 +17,16 @@ where /// Replace the frame codec used for framing I/O. /// /// This resets any installed protocol hooks because the frame type may - /// change across codecs. Fragmentation configuration is reset to the - /// codec-derived default. + /// change across codecs. Fragmentation is disabled so callers can + /// reconfigure explicitly for the new frame budget. #[must_use] pub fn with_codec(mut self, codec: F2) -> WireframeApp where S: Default, { - let fragmentation = default_fragmentation(codec.max_frame_length()); let serializer = std::mem::take(&mut self.serializer); let message_assembler = self.message_assembler.take(); - self.rebuild_with_params(serializer, codec, None, fragmentation, message_assembler) + self.rebuild_with_params(serializer, codec, None, None, message_assembler) } /// Replace the serializer used for messages. @@ -59,11 +58,15 @@ where { /// Set the initial buffer capacity for framed reads. /// Clamped between 64 bytes and 16 MiB. + /// + /// This also clears any previously configured fragmentation settings. 
+ /// Re-enable fragmentation explicitly with [`WireframeApp::enable_fragmentation`] + /// (or [`WireframeApp::fragmentation`]) after changing the frame budget. #[must_use] pub fn buffer_capacity(mut self, capacity: usize) -> Self { let capacity = clamp_frame_length(capacity); self.codec = LengthDelimitedFrameCodec::new(capacity); - self.fragmentation = default_fragmentation(capacity); + self.fragmentation = None; self } } diff --git a/src/app/builder/config.rs b/src/app/builder/config.rs index dac658d9..28ba8aff 100644 --- a/src/app/builder/config.rs +++ b/src/app/builder/config.rs @@ -6,7 +6,7 @@ use super::WireframeApp; use crate::{ app::{ Packet, - builder_defaults::{MAX_READ_TIMEOUT_MS, MIN_READ_TIMEOUT_MS}, + builder_defaults::{MAX_READ_TIMEOUT_MS, MIN_READ_TIMEOUT_MS, default_fragmentation}, }, codec::FrameCodec, fragment::FragmentationConfig, @@ -37,6 +37,17 @@ where self } + /// Enable transport fragmentation using codec-derived defaults. + /// + /// The derived settings are bounded by the current frame codec budget. + /// Call this after `with_codec` or `buffer_capacity` so defaults align with + /// the final frame length. + #[must_use] + pub fn enable_fragmentation(mut self) -> Self { + self.fragmentation = default_fragmentation(self.codec.max_frame_length()); + self + } + /// Configure a Dead Letter Queue for dropped push frames. /// /// ```rust,no_run diff --git a/src/app/builder/core.rs b/src/app/builder/core.rs index 8ebc6795..29e7eb3b 100644 --- a/src/app/builder/core.rs +++ b/src/app/builder/core.rs @@ -10,7 +10,7 @@ use tokio::sync::{OnceCell, mpsc}; use crate::{ app::{ - builder_defaults::{DEFAULT_READ_TIMEOUT_MS, default_fragmentation}, + builder_defaults::DEFAULT_READ_TIMEOUT_MS, envelope::{Envelope, Packet}, error::Result, lifecycle::{ConnectionSetup, ConnectionTeardown}, @@ -61,7 +61,6 @@ where /// default serializer and no lifecycle hooks. 
fn default() -> Self { let codec = F::default(); - let max_frame_length = codec.max_frame_length(); Self { handlers: HashMap::new(), routes: OnceCell::new(), @@ -74,7 +73,7 @@ where push_dlq: None, codec, read_timeout_ms: DEFAULT_READ_TIMEOUT_MS, - fragmentation: default_fragmentation(max_frame_length), + fragmentation: None, message_assembler: None, } } @@ -160,3 +159,49 @@ where } } } + +#[cfg(test)] +mod tests { + use rstest::{fixture, rstest}; + + use super::WireframeApp; + use crate::{app::Envelope, codec::LengthDelimitedFrameCodec, serializer::BincodeSerializer}; + + type TestApp = WireframeApp; + + #[fixture] + fn app_builder() -> TestApp { + let app = TestApp::new().expect("build app"); + assert!( + app.fragmentation.is_none(), + "fixture expects default fragmentation disabled" + ); + app + } + + #[rstest] + fn builder_defaults_fragmentation_to_disabled(app_builder: TestApp) { + let app = app_builder; + assert!(app.fragmentation.is_none()); + } + + #[rstest] + fn enable_fragmentation_requires_explicit_opt_in(app_builder: TestApp) { + let app = app_builder.enable_fragmentation(); + assert!(app.fragmentation.is_some()); + } + + #[rstest] + fn with_codec_clears_fragmentation_to_require_reconfiguration(app_builder: TestApp) { + let app = app_builder + .enable_fragmentation() + .with_codec(LengthDelimitedFrameCodec::new(2048)); + assert!(app.fragmentation.is_none()); + } + + #[rstest] + fn buffer_capacity_clears_fragmentation_to_require_reconfiguration(app_builder: TestApp) { + let app = app_builder.enable_fragmentation().buffer_capacity(2048); + assert!(app.fragmentation.is_none()); + } +} diff --git a/src/app/fragmentation_state.rs b/src/app/fragmentation_state.rs index fb15779d..83c07e84 100644 --- a/src/app/fragmentation_state.rs +++ b/src/app/fragmentation_state.rs @@ -1,80 +1,7 @@ -//! Connection-scoped helpers for outbound fragmentation and inbound reassembly. -//! -//! This module encapsulates the fragmentation state used by `ConnectionActor` -//! 
and `WireframeApp` to keep the main connection module concise. +//! Connection-scoped fragmentation type aliases used by app frame handling. -use bincode::error::DecodeError; -use thiserror::Error; +/// Concrete adapter state used by `WireframeApp` connection processing. +pub(crate) type FragmentationState = crate::fragment::DefaultFragmentAdapter; -use super::{Packet, PacketParts}; -use crate::fragment::{ - Fragmentable, - FragmentationError, - Fragmenter, - MessageId, - Reassembler, - ReassemblyError, - decode_fragment_payload, -}; - -/// Bundles outbound fragmentation and inbound reassembly state for a connection. -pub(crate) struct FragmentationState { - fragmenter: Fragmenter, - reassembler: Reassembler, -} - -/// Decode or reassembly failures encountered while handling fragments. -#[derive(Debug, Error)] -pub(crate) enum FragmentProcessError { - #[error("decode error: {0}")] - Decode(DecodeError), - #[error("reassembly error: {0}")] - Reassembly(ReassemblyError), -} - -impl FragmentationState { - pub(crate) fn new(config: crate::fragment::FragmentationConfig) -> Self { - Self { - fragmenter: Fragmenter::new(config.fragment_payload_cap), - reassembler: Reassembler::new(config.max_message_size, config.reassembly_timeout), - } - } - - pub(crate) fn fragment( - &self, - packet: E, - ) -> Result, FragmentationError> { - crate::fragment::fragment_packet(&self.fragmenter, packet) - } - - pub(crate) fn reassemble( - &mut self, - packet: E, - ) -> Result, FragmentProcessError> { - let parts = packet.into_parts(); - let id = parts.id(); - let correlation = parts.correlation_id(); - let payload = parts.into_payload(); - - match decode_fragment_payload(&payload) { - Ok(Some((header, fragment_payload))) => { - match self.reassembler.push(header, fragment_payload) { - Ok(Some(message)) => { - let rebuilt = PacketParts::new(id, correlation, message.into_payload()); - Ok(Some(E::from_parts(rebuilt))) - } - Ok(None) => Ok(None), - Err(err) => 
Err(FragmentProcessError::Reassembly(err)), - } - } - Ok(None) => Ok(Some(E::from_parts(PacketParts::new( - id, - correlation, - payload, - )))), - Err(err) => Err(FragmentProcessError::Decode(err)), - } - } - - pub(crate) fn purge_expired(&mut self) -> Vec { self.reassembler.purge_expired() } -} +/// Decode and reassembly errors surfaced by adapter processing. +pub(crate) type FragmentProcessError = crate::fragment::FragmentAdapterError; diff --git a/src/fragment/adapter.rs b/src/fragment/adapter.rs new file mode 100644 index 00000000..a6a0137d --- /dev/null +++ b/src/fragment/adapter.rs @@ -0,0 +1,134 @@ +//! Public adapter contract for transport-level fragmentation and reassembly. +//! +//! [`FragmentAdapter`] captures the minimal behaviour required to split outbound +//! packets into transport fragments, rebuild inbound fragments into full +//! packets, and purge stale partial assemblies. + +use bincode::error::DecodeError; +use thiserror::Error; + +use super::{ + Fragmentable, + FragmentationConfig, + FragmentationError, + Fragmenter, + MessageId, + Reassembler, + ReassemblyError, + decode_fragment_payload, + fragment_packet, + packet::FragmentParts, +}; + +/// Error returned by [`FragmentAdapter::reassemble`]. +#[derive(Debug, Error)] +pub enum FragmentAdapterError { + /// Fragment payload marker/header decoding failed. + #[error("decode error: {0}")] + Decode(#[from] DecodeError), + /// Reassembly ordering or size validation failed. + #[error("reassembly error: {0}")] + Reassembly(#[from] ReassemblyError), +} + +/// Adapter contract for transport-level fragmentation and reassembly. +pub trait FragmentAdapter: Send + Sync { + /// Attempt to fragment a packet for outbound transport. + /// + /// # Errors + /// + /// Returns [`FragmentationError`] when payload chunking or header encoding + /// fails. + fn fragment(&self, packet: E) -> Result, FragmentationError>; + + /// Attempt to reassemble an inbound packet. 
+ /// + /// Returns `Ok(Some(packet))` when a complete packet is available, + /// `Ok(None)` when more fragments are required, and an error when decoding + /// or reassembly invariants fail. + /// + /// # Errors + /// + /// Returns [`FragmentAdapterError`] when fragment decoding fails or when + /// ordering and size guarantees are violated. + fn reassemble(&mut self, packet: E) + -> Result, FragmentAdapterError>; + + /// Purge stale partial reassembly state. + /// + /// Returns the identifiers that were evicted. + fn purge_expired(&mut self) -> Vec; +} + +/// Default adapter backed by [`Fragmenter`] and [`Reassembler`]. +#[derive(Debug)] +pub struct DefaultFragmentAdapter { + fragmenter: Fragmenter, + reassembler: Reassembler, +} + +impl DefaultFragmentAdapter { + /// Create a default adapter from fragmentation configuration. + #[must_use] + pub fn new(config: FragmentationConfig) -> Self { + Self { + fragmenter: Fragmenter::new(config.fragment_payload_cap), + reassembler: Reassembler::new(config.max_message_size, config.reassembly_timeout), + } + } + + /// Fragment outbound packet data. + /// + /// # Errors + /// + /// Returns [`FragmentationError`] when fragment emission fails. + pub fn fragment(&self, packet: E) -> Result, FragmentationError> { + fragment_packet(&self.fragmenter, packet) + } + + /// Reassemble inbound packet data. + /// + /// # Errors + /// + /// Returns [`FragmentAdapterError`] when decoding or reassembly fails. + pub fn reassemble( + &mut self, + packet: E, + ) -> Result, FragmentAdapterError> { + let parts = packet.into_fragment_parts(); + let id = parts.id(); + let correlation_id = parts.correlation_id(); + let payload = parts.into_payload(); + + if let Some((header, fragment_payload)) = decode_fragment_payload(&payload)? { + match self.reassembler.push(header, fragment_payload)? 
{ + Some(message) => { + let rebuilt = FragmentParts::new(id, correlation_id, message.into_payload()); + Ok(Some(E::from_fragment_parts(rebuilt))) + } + None => Ok(None), + } + } else { + let passthrough = FragmentParts::new(id, correlation_id, payload); + Ok(Some(E::from_fragment_parts(passthrough))) + } + } + + /// Purge stale reassembly entries and return evicted identifiers. + pub fn purge_expired(&mut self) -> Vec { self.reassembler.purge_expired() } +} + +impl FragmentAdapter for DefaultFragmentAdapter { + fn fragment(&self, packet: E) -> Result, FragmentationError> { + DefaultFragmentAdapter::fragment(self, packet) + } + + fn reassemble( + &mut self, + packet: E, + ) -> Result, FragmentAdapterError> { + DefaultFragmentAdapter::reassemble(self, packet) + } + + fn purge_expired(&mut self) -> Vec { DefaultFragmentAdapter::purge_expired(self) } +} diff --git a/src/fragment/error.rs b/src/fragment/error.rs index 55d0a4d8..122eec59 100644 --- a/src/fragment/error.rs +++ b/src/fragment/error.rs @@ -16,6 +16,8 @@ use super::{FragmentIndex, MessageId}; pub enum FragmentStatus { /// The logical message still expects more fragments. Incomplete, + /// The fragment duplicated data that was already accepted and was ignored. + Duplicate, /// The fragment completed the logical message. Complete, } diff --git a/src/fragment/mod.rs b/src/fragment/mod.rs index 881f3061..87a424d4 100644 --- a/src/fragment/mod.rs +++ b/src/fragment/mod.rs @@ -5,6 +5,7 @@ //! code small and easy to audit while still providing a cohesive API at the //! crate root. 
+pub mod adapter; pub mod config; pub mod error; pub mod fragmenter; @@ -16,6 +17,7 @@ pub mod payload; pub mod reassembler; pub mod series; +pub use adapter::{DefaultFragmentAdapter, FragmentAdapter, FragmentAdapterError}; pub use config::FragmentationConfig; pub use error::{FragmentError, FragmentStatus, FragmentationError, ReassemblyError}; pub use fragmenter::{FragmentBatch, FragmentFrame, Fragmenter}; diff --git a/src/fragment/reassembler.rs b/src/fragment/reassembler.rs index 1162114b..5eca40cc 100644 --- a/src/fragment/reassembler.rs +++ b/src/fragment/reassembler.rs @@ -157,6 +157,7 @@ impl Reassembler { payload, false, ), + Ok(FragmentStatus::Duplicate) => Ok(None), Ok(FragmentStatus::Complete) => Self::append_and_maybe_complete( self.max_message_size, occupied, @@ -180,6 +181,14 @@ impl Reassembler { vacant.insert(PartialMessage::new(series, payload, now)); Ok(None) } + FragmentStatus::Duplicate => { + debug_assert!( + false, + "newly created FragmentSeries starts at index 0; a first fragment \ + cannot be duplicate" + ); + Ok(None) + } FragmentStatus::Complete => Ok(Some(ReassembledMessage::new( header.message_id(), payload.to_vec(), diff --git a/src/fragment/series.rs b/src/fragment/series.rs index f370b440..58f6b6b7 100644 --- a/src/fragment/series.rs +++ b/src/fragment/series.rs @@ -60,9 +60,13 @@ impl FragmentSeries { /// /// Returns [`FragmentError::MessageMismatch`] when the fragment belongs to /// a different message, [`FragmentError::IndexMismatch`] when the fragment - /// arrives out of order, [`FragmentError::SeriesComplete`] when the series - /// already consumed a final fragment, and [`FragmentError::IndexOverflow`] - /// when the fragment index cannot advance further. + /// arrives ahead of the expected index, [`FragmentError::SeriesComplete`] + /// when a non-duplicate fragment arrives after completion, and + /// [`FragmentError::IndexOverflow`] when the fragment index cannot advance + /// further. 
+ /// + /// When a fragment repeats an already accepted index, this method returns + /// [`FragmentStatus::Duplicate`] and leaves the series position unchanged. pub fn accept(&mut self, fragment: FragmentHeader) -> Result { if fragment.message_id() != self.message_id { return Err(FragmentError::MessageMismatch { @@ -71,11 +75,15 @@ impl FragmentSeries { }); } + if fragment.fragment_index() < self.next_index { + return Ok(FragmentStatus::Duplicate); + } + if self.complete { return Err(FragmentError::SeriesComplete); } - if fragment.fragment_index() != self.next_index { + if fragment.fragment_index() > self.next_index { return Err(FragmentError::IndexMismatch { expected: self.next_index, found: fragment.fragment_index(), diff --git a/src/fragment/tests.rs b/src/fragment/tests.rs index 1338aff1..9394dc59 100644 --- a/src/fragment/tests.rs +++ b/src/fragment/tests.rs @@ -1,405 +1,9 @@ //! Unit tests for the fragmentation and reassembly subsystem. //! -//! Covers `FragmentHeader` field access, `FragmentSeries` ordering and -//! validation, `Fragmenter` splitting and message ID management, and -//! `Reassembler` assembly with size limits and expiry handling. +//! Tests are split into focused submodules to keep each file short and easy +//! to navigate. 
-use std::{ - num::NonZeroUsize, - time::{Duration, Instant}, -}; - -use bincode::{BorrowDecode, Encode}; -use rstest::rstest; - -use super::*; -use crate::fragment::fragmenter::FragmentCursor; - -fn setup_reassembler_with_first_fragment( - message_id: u64, - first_payload: impl AsRef<[u8]>, -) -> Reassembler { - let mut reassembler = Reassembler::new( - NonZeroUsize::new(8).expect("non-zero"), - Duration::from_secs(30), - ); - let first = FragmentHeader::new(MessageId::new(message_id), FragmentIndex::zero(), false); - assert!( - reassembler - .push(first, first_payload) - .expect("first fragment accepted") - .is_none() - ); - reassembler -} - -#[test] -fn fragment_header_exposes_fields() { - let header = FragmentHeader::new(MessageId::new(9), FragmentIndex::new(2), true); - assert_eq!(header.message_id(), MessageId::new(9)); - assert_eq!(header.fragment_index(), FragmentIndex::new(2)); - assert!(header.is_last_fragment()); -} - -#[rstest] -#[case(1)] -#[case(5)] -fn series_accepts_sequential_fragments(#[case] message: u64) { - let mut series = FragmentSeries::new(MessageId::new(message)); - let first = FragmentHeader::new(MessageId::new(message), FragmentIndex::zero(), false); - let second = FragmentHeader::new(MessageId::new(message), FragmentIndex::new(1), true); - - assert_eq!(series.accept(first), Ok(FragmentStatus::Incomplete)); - assert_eq!(series.accept(second), Ok(FragmentStatus::Complete)); - assert!(series.is_complete()); -} - -#[test] -fn series_rejects_other_message() { - let mut series = FragmentSeries::new(MessageId::new(7)); - let header = FragmentHeader::new(MessageId::new(8), FragmentIndex::zero(), false); - let err = series - .accept(header) - .expect_err("fragment from another message must be rejected"); - assert!(matches!(err, FragmentError::MessageMismatch { .. 
})); -} - -#[test] -fn series_rejects_out_of_order_fragment() { - let mut series = FragmentSeries::new(MessageId::new(7)); - let header = FragmentHeader::new(MessageId::new(7), FragmentIndex::new(2), false); - let err = series - .accept(header) - .expect_err("out-of-order fragment must be rejected"); - assert!(matches!(err, FragmentError::IndexMismatch { .. })); -} - -#[test] -fn series_rejects_after_completion() { - let mut series = FragmentSeries::new(MessageId::new(1)); - let first = FragmentHeader::new(MessageId::new(1), FragmentIndex::zero(), true); - assert_eq!(series.accept(first), Ok(FragmentStatus::Complete)); - let err = series - .accept(FragmentHeader::new( - MessageId::new(1), - FragmentIndex::new(1), - true, - )) - .expect_err("series must reject fragments after completion"); - assert!(matches!(err, FragmentError::SeriesComplete)); -} - -#[test] -fn series_detects_index_overflow() { - let mut series = FragmentSeries::new(MessageId::new(1)); - series.force_next_index_for_tests(FragmentIndex::new(u32::MAX)); - let header = FragmentHeader::new(MessageId::new(1), FragmentIndex::new(u32::MAX), false); - let err = series - .accept(header) - .expect_err("overflow must raise an error"); - assert_eq!( - err, - FragmentError::IndexOverflow { - last: FragmentIndex::new(u32::MAX) - } - ); -} - -#[test] -fn series_accepts_final_fragment_at_max_index() { - let mut series = FragmentSeries::new(MessageId::new(2)); - series.force_next_index_for_tests(FragmentIndex::new(u32::MAX)); - let header = FragmentHeader::new(MessageId::new(2), FragmentIndex::new(u32::MAX), true); - assert_eq!(series.accept(header), Ok(FragmentStatus::Complete)); - assert!(series.is_complete()); -} - -#[test] -fn fragmenter_splits_payload_into_multiple_frames() { - let fragmenter = Fragmenter::new(NonZeroUsize::new(3).expect("non-zero")); - let payload: Vec = (0..8).collect(); - let batch = fragmenter - .fragment_bytes(payload) - .expect("fragment payload"); - - assert_eq!(batch.len(), 3); - 
assert!(batch.is_fragmented()); - assert_eq!(batch.message_id(), MessageId::new(0)); - - assert_fragment(&batch, 0, &[0, 1, 2], false); - assert_fragment(&batch, 1, &[3, 4, 5], false); - assert_fragment(&batch, 2, &[6, 7], true); -} - -#[test] -fn fragmenter_handles_empty_payload() { - let fragmenter = Fragmenter::new(NonZeroUsize::new(8).expect("non-zero")); - let batch = fragmenter.fragment_bytes([]).expect("fragment empty"); - - assert_eq!(batch.len(), 1); - assert!(!batch.is_fragmented()); - let fragment = batch - .fragments() - .first() - .expect("batch should contain at least one fragment"); - assert!(fragment.payload().is_empty()); - assert!(fragment.header().is_last_fragment()); - assert_eq!(fragment.header().fragment_index(), FragmentIndex::zero()); -} - -#[derive(Debug, Encode, BorrowDecode)] -struct DummyMessage(Vec); - -fn assert_fragment(batch: &FragmentBatch, index: usize, payload: &[u8], is_last: bool) { - let fragment = batch - .fragments() - .get(index) - .expect("fragment missing at requested index"); - assert_eq!(fragment.payload(), payload); - assert_eq!(fragment.header().is_last_fragment(), is_last); -} - -#[test] -fn fragmenter_fragments_messages_and_increments_ids() { - let fragmenter = - Fragmenter::with_starting_id(NonZeroUsize::new(4).expect("non-zero"), MessageId::new(7)); - - let batch = fragmenter - .fragment_message(&DummyMessage(vec![1, 2, 3, 4, 5])) - .expect("fragment message"); - assert_eq!(batch.message_id(), MessageId::new(7)); - assert_eq!(batch.len(), 2); - assert!(batch.is_fragmented()); - - let next_payload = vec![9, 9, 9]; - let next = fragmenter - .fragment_bytes(next_payload) - .expect("fragment bytes"); - assert_eq!(next.message_id(), MessageId::new(8)); - assert_eq!(next.len(), 1); - assert!(!next.is_fragmented()); -} - -#[test] -fn fragment_batch_into_iterator_yields_all_fragments() { - let fragmenter = Fragmenter::new(NonZeroUsize::new(2).expect("non-zero")); - let payload = [1_u8, 2, 3]; - let batch = fragmenter - 
.fragment_bytes(payload) - .expect("split into fragments"); - - let payloads: Vec> = batch - .into_iter() - .map(|fragment| fragment.payload().to_vec()) - .collect(); - assert_eq!(payloads, vec![vec![1, 2], vec![3]]); -} - -#[test] -fn fragmenter_respects_explicit_message_ids() { - let fragmenter = - Fragmenter::with_starting_id(NonZeroUsize::new(2).expect("non-zero"), MessageId::new(10)); - let payload = [7_u8, 8, 9]; - let batch = fragmenter - .fragment_with_id(MessageId::new(500), payload) - .expect("fragment with explicit id"); - assert_eq!(batch.message_id(), MessageId::new(500)); - assert_eq!(batch.len(), 2); - - let next = fragmenter.fragment_bytes([1_u8]).expect("next fragment"); - assert_eq!(next.message_id(), MessageId::new(10)); -} - -#[test] -fn fragmenter_returns_error_for_out_of_bounds_slice() { - let fragmenter = Fragmenter::new(NonZeroUsize::new(4).expect("non-zero")); - let payload = [1_u8, 2, 3, 4]; - - let err = fragmenter - .build_fragments_from_for_tests( - MessageId::new(1), - &payload, - FragmentCursor::new(payload.len() + 1, FragmentIndex::zero()), - ) - .expect_err("invalid slice should produce an error"); - match err { - FragmentationError::SliceBounds { offset, end, total } => { - assert_eq!(offset, payload.len() + 1); - assert_eq!(end, payload.len() + 1); - assert_eq!(total, payload.len()); - } - other => panic!("expected SliceBounds, got {other:?}"), - } -} - -#[test] -fn reassembler_allows_single_fragment_at_max_message_size() { - let max_message_size = NonZeroUsize::new(16).expect("non-zero"); - let mut reassembler = Reassembler::new(max_message_size, Duration::from_secs(5)); - - let header = FragmentHeader::new(MessageId::new(20), FragmentIndex::zero(), true); - let payload = vec![0_u8; max_message_size.get()]; - - let result = reassembler - .push(header, payload) - .expect("fragment within limit should be accepted"); - - let assembled = result.expect("single fragment should complete reassembly"); - 
assert_eq!(assembled.payload().len(), max_message_size.get()); - assert_eq!(reassembler.buffered_len(), 0); -} - -#[test] -fn reassembler_allows_multi_fragment_at_max_message_size() { - let max_message_size = NonZeroUsize::new(16).expect("non-zero"); - let mut reassembler = Reassembler::new(max_message_size, Duration::from_secs(5)); - - let first_header = FragmentHeader::new(MessageId::new(21), FragmentIndex::zero(), false); - let second_header = FragmentHeader::new(MessageId::new(21), FragmentIndex::new(1), true); - - let first_payload = vec![0_u8; 8]; - let second_payload = vec![1_u8; max_message_size.get() - first_payload.len()]; - - assert!( - reassembler - .push(first_header, first_payload) - .expect("first fragment within limit") - .is_none(), - "first fragment should not complete the message", - ); - - let result = reassembler - .push(second_header, second_payload) - .expect("second fragment keeps total at limit"); - - let assembled = result.expect("fragments should complete reassembly at exact limit"); - assert_eq!(assembled.payload().len(), max_message_size.get()); - assert_eq!(reassembler.buffered_len(), 0); -} - -#[test] -fn reassembler_returns_single_fragment_immediately() { - let mut reassembler = Reassembler::new( - NonZeroUsize::new(16).expect("non-zero"), - Duration::from_secs(5), - ); - let header = FragmentHeader::new(MessageId::new(1), FragmentIndex::zero(), true); - let payload = vec![1_u8, 2, 3, 4]; - - let complete = reassembler - .push(header, payload.clone()) - .expect("reassembly must succeed") - .expect("single fragment should complete message"); - - assert_eq!(complete.message_id(), MessageId::new(1)); - assert_eq!(complete.payload(), payload.as_slice()); - assert_eq!(reassembler.buffered_len(), 0); -} - -#[test] -fn reassembler_accumulates_ordered_fragments() { - let mut reassembler = setup_reassembler_with_first_fragment(2, [5_u8, 6, 7]); - let final_fragment = FragmentHeader::new(MessageId::new(2), FragmentIndex::new(1), true); - - let 
complete = reassembler - .push(final_fragment, [8_u8, 9]) - .expect("final fragment accepted") - .expect("message should complete"); - - assert_eq!(complete.payload(), &[5, 6, 7, 8, 9]); - assert_eq!(reassembler.buffered_len(), 0); -} - -#[test] -fn reassembler_rejects_out_of_order_and_drops_partial() { - let mut reassembler = setup_reassembler_with_first_fragment(3, [1_u8, 2]); - let skipped = FragmentHeader::new(MessageId::new(3), FragmentIndex::new(2), true); - - let err = reassembler - .push(skipped, [3_u8]) - .expect_err("out-of-order fragment must be rejected"); - assert!(matches!( - err, - ReassemblyError::Fragment(FragmentError::IndexMismatch { .. }) - )); - assert_eq!(reassembler.buffered_len(), 0); -} - -#[test] -fn reassembler_enforces_maximum_payload_size() { - let mut reassembler = Reassembler::new( - NonZeroUsize::new(4).expect("non-zero"), - Duration::from_secs(30), - ); - let first = FragmentHeader::new(MessageId::new(4), FragmentIndex::zero(), false); - let final_fragment = FragmentHeader::new(MessageId::new(4), FragmentIndex::new(1), true); - - assert!( - reassembler - .push(first, [1_u8, 2, 3]) - .expect("first fragment accepted") - .is_none() - ); - - let err = reassembler - .push(final_fragment, [4_u8, 5]) - .expect_err("payload growth beyond cap must be rejected"); - assert_eq!( - err, - ReassemblyError::MessageTooLarge { - message_id: MessageId::new(4), - attempted: 5, - limit: NonZeroUsize::new(4).expect("non-zero"), - } - ); - assert_eq!(reassembler.buffered_len(), 0); -} - -#[test] -fn reassembler_purges_expired_messages() { - let mut reassembler = Reassembler::new( - NonZeroUsize::new(8).expect("non-zero"), - Duration::from_secs(2), - ); - let now = Instant::now(); - let header = FragmentHeader::new(MessageId::new(5), FragmentIndex::zero(), false); - - assert!( - reassembler - .push_at(header, [0_u8, 1], now) - .expect("first fragment accepted") - .is_none() - ); - assert_eq!(reassembler.buffered_len(), 1); - - let evicted = 
reassembler.purge_expired_at(now + Duration::from_secs(3)); - assert_eq!(evicted, vec![MessageId::new(5)]); - assert_eq!(reassembler.buffered_len(), 0); -} - -#[derive(Clone, Debug, Encode, BorrowDecode, PartialEq, Eq)] -struct ExampleMessage(u8); - -#[test] -fn reassembler_decodes_reconstructed_message() { - let fragmenter = Fragmenter::new(NonZeroUsize::new(2).expect("non-zero")); - let batch = fragmenter - .fragment_message(&ExampleMessage(11)) - .expect("fragment message"); - let mut reassembler = Reassembler::new( - NonZeroUsize::new(4).expect("non-zero"), - Duration::from_secs(10), - ); - - let mut output = None; - for fragment in batch { - let (header, payload) = fragment.into_parts(); - output = reassembler - .push(header, payload) - .expect("fragment accepted"); - } - - let assembled = output.expect("message should complete"); - let decoded: ExampleMessage = assembled.decode().expect("decode message"); - assert_eq!(decoded, ExampleMessage(11)); -} +mod adapter_tests; +mod fragmenter_tests; +mod header_series_tests; +mod reassembler_tests; diff --git a/src/fragment/tests/adapter_tests.rs b/src/fragment/tests/adapter_tests.rs new file mode 100644 index 00000000..e0d5b23c --- /dev/null +++ b/src/fragment/tests/adapter_tests.rs @@ -0,0 +1,152 @@ +//! Tests for the public default fragment adapter wrapper. 
+ +use std::{num::NonZeroUsize, time::Duration}; + +use crate::fragment::{ + DefaultFragmentAdapter, + FragmentHeader, + FragmentIndex, + FragmentParts, + Fragmentable, + FragmentationConfig, + MessageId, + encode_fragment_payload, +}; + +#[derive(Clone, Debug, PartialEq, Eq)] +struct TestPacket { + id: u32, + correlation_id: Option, + payload: Vec, +} + +impl Fragmentable for TestPacket { + fn into_fragment_parts(self) -> FragmentParts { + FragmentParts::new(self.id, self.correlation_id, self.payload) + } + + fn from_fragment_parts(parts: FragmentParts) -> Self { + Self { + id: parts.id(), + correlation_id: parts.correlation_id(), + payload: parts.into_payload(), + } + } +} + +fn adapter_config() -> FragmentationConfig { + FragmentationConfig { + fragment_payload_cap: NonZeroUsize::new(4).expect("non-zero"), + max_message_size: NonZeroUsize::new(64).expect("non-zero"), + reassembly_timeout: Duration::from_secs(30), + } +} + +fn build_test_packet() -> TestPacket { + TestPacket { + id: 42, + correlation_id: Some(777), + payload: (0_u8..10).collect(), + } +} + +fn assert_fragment_metadata(fragments: &[TestPacket], packet: &TestPacket) { + assert!( + fragments.len() > 1, + "payload beyond fragment cap should produce multiple fragments" + ); + for fragment in fragments { + assert_eq!(fragment.id, packet.id); + assert_eq!(fragment.correlation_id, packet.correlation_id); + } +} + +fn reassemble_fragment_sequence( + adapter: &mut DefaultFragmentAdapter, + fragments: &[TestPacket], +) -> Vec { + let first = fragments + .first() + .cloned() + .expect("fragment list must contain at least one fragment"); + assert!( + adapter + .reassemble(first.clone()) + .expect("first fragment should be accepted") + .is_none() + ); + assert!( + adapter + .reassemble(first) + .expect("duplicate fragment should be suppressed") + .is_none() + ); + + fragments + .iter() + .skip(1) + .cloned() + .filter_map(|fragment| { + adapter + .reassemble(fragment) + .expect("reassembly should not fail") + 
}) + .collect() +} + +#[test] +fn default_fragment_adapter_fragments_and_reassembles_test_packets() { + let mut adapter = DefaultFragmentAdapter::new(adapter_config()); + let packet = build_test_packet(); + + let fragments = adapter + .fragment(packet.clone()) + .expect("fragmenting packet should succeed"); + assert_fragment_metadata(&fragments, &packet); + + let reconstructed = reassemble_fragment_sequence(&mut adapter, &fragments); + assert_eq!(reconstructed.len(), 1); + assert_eq!( + reconstructed.first(), + Some(&packet), + "expected exactly one reconstructed packet matching input" + ); +} + +#[test] +fn default_fragment_adapter_passes_through_non_fragment_payloads() { + let mut adapter = DefaultFragmentAdapter::new(adapter_config()); + let packet = TestPacket { + id: 12, + correlation_id: Some(9), + payload: b"not encoded as fragment payload".to_vec(), + }; + + let result = adapter + .reassemble(packet.clone()) + .expect("non-fragment payload should pass through"); + + assert_eq!(result, Some(packet)); +} + +#[test] +fn default_fragment_adapter_exposes_purge_api() { + let mut config = adapter_config(); + config.reassembly_timeout = Duration::ZERO; + let mut adapter = DefaultFragmentAdapter::new(config); + let header = FragmentHeader::new(MessageId::new(81), FragmentIndex::zero(), false); + let encoded_payload = encode_fragment_payload(header, &[1_u8, 2]).expect("encode fragment"); + let packet = TestPacket { + id: 42, + correlation_id: Some(7), + payload: encoded_payload, + }; + + assert!( + adapter + .reassemble(packet) + .expect("adapter should accept first fragment") + .is_none() + ); + assert_eq!(adapter.purge_expired(), vec![MessageId::new(81)]); +} diff --git a/src/fragment/tests/fragmenter_tests.rs b/src/fragment/tests/fragmenter_tests.rs new file mode 100644 index 00000000..de29f893 --- /dev/null +++ b/src/fragment/tests/fragmenter_tests.rs @@ -0,0 +1,132 @@ +//! Tests for outbound fragmentation and fragment batch helpers. 
+ +use std::num::NonZeroUsize; + +use bincode::{BorrowDecode, Encode}; + +use crate::fragment::{ + FragmentBatch, + FragmentIndex, + FragmentationError, + Fragmenter, + MessageId, + fragmenter::FragmentCursor, +}; + +#[derive(Debug, Encode, BorrowDecode)] +struct DummyMessage(Vec); + +fn assert_fragment(batch: &FragmentBatch, index: usize, payload: &[u8], is_last: bool) { + let fragment = batch + .fragments() + .get(index) + .expect("fragment missing at requested index"); + assert_eq!(fragment.payload(), payload); + assert_eq!(fragment.header().is_last_fragment(), is_last); +} + +#[test] +fn fragmenter_splits_payload_into_multiple_frames() { + let fragmenter = Fragmenter::new(NonZeroUsize::new(3).expect("non-zero")); + let payload: Vec = (0..8).collect(); + let batch = fragmenter + .fragment_bytes(payload) + .expect("fragment payload"); + + assert_eq!(batch.len(), 3); + assert!(batch.is_fragmented()); + assert_eq!(batch.message_id(), MessageId::new(0)); + + assert_fragment(&batch, 0, &[0, 1, 2], false); + assert_fragment(&batch, 1, &[3, 4, 5], false); + assert_fragment(&batch, 2, &[6, 7], true); +} + +#[test] +fn fragmenter_handles_empty_payload() { + let fragmenter = Fragmenter::new(NonZeroUsize::new(8).expect("non-zero")); + let batch = fragmenter.fragment_bytes([]).expect("fragment empty"); + + assert_eq!(batch.len(), 1); + assert!(!batch.is_fragmented()); + let fragment = batch + .fragments() + .first() + .expect("batch should contain at least one fragment"); + assert!(fragment.payload().is_empty()); + assert!(fragment.header().is_last_fragment()); + assert_eq!(fragment.header().fragment_index(), FragmentIndex::zero()); +} + +#[test] +fn fragmenter_fragments_messages_and_increments_ids() { + let fragmenter = + Fragmenter::with_starting_id(NonZeroUsize::new(4).expect("non-zero"), MessageId::new(7)); + + let batch = fragmenter + .fragment_message(&DummyMessage(vec![1, 2, 3, 4, 5])) + .expect("fragment message"); + assert_eq!(batch.message_id(), 
MessageId::new(7)); + assert_eq!(batch.len(), 2); + assert!(batch.is_fragmented()); + + let next_payload = vec![9, 9, 9]; + let next = fragmenter + .fragment_bytes(next_payload) + .expect("fragment bytes"); + assert_eq!(next.message_id(), MessageId::new(8)); + assert_eq!(next.len(), 1); + assert!(!next.is_fragmented()); +} + +#[test] +fn fragment_batch_into_iterator_yields_all_fragments() { + let fragmenter = Fragmenter::new(NonZeroUsize::new(2).expect("non-zero")); + let payload = [1_u8, 2, 3]; + let batch = fragmenter + .fragment_bytes(payload) + .expect("split into fragments"); + + let payloads: Vec> = batch + .into_iter() + .map(|fragment| fragment.payload().to_vec()) + .collect(); + assert_eq!(payloads, vec![vec![1, 2], vec![3]]); +} + +#[test] +fn fragmenter_respects_explicit_message_ids() { + let fragmenter = + Fragmenter::with_starting_id(NonZeroUsize::new(2).expect("non-zero"), MessageId::new(10)); + let payload = [7_u8, 8, 9]; + let batch = fragmenter + .fragment_with_id(MessageId::new(500), payload) + .expect("fragment with explicit id"); + assert_eq!(batch.message_id(), MessageId::new(500)); + assert_eq!(batch.len(), 2); + + let next = fragmenter.fragment_bytes([1_u8]).expect("next fragment"); + assert_eq!(next.message_id(), MessageId::new(10)); +} + +#[test] +fn fragmenter_returns_error_for_out_of_bounds_slice() { + let fragmenter = Fragmenter::new(NonZeroUsize::new(4).expect("non-zero")); + let payload = [1_u8, 2, 3, 4]; + + let err = fragmenter + .build_fragments_from_for_tests( + MessageId::new(1), + &payload, + FragmentCursor::new(payload.len() + 1, FragmentIndex::zero()), + ) + .expect_err("invalid slice should produce an error"); + match err { + FragmentationError::SliceBounds { offset, end, total } => { + assert_eq!(offset, payload.len() + 1); + assert_eq!(end, payload.len() + 1); + assert_eq!(total, payload.len()); + } + other => panic!("expected SliceBounds, got {other:?}"), + } +} diff --git a/src/fragment/tests/header_series_tests.rs 
b/src/fragment/tests/header_series_tests.rs new file mode 100644 index 00000000..bce8fa83 --- /dev/null +++ b/src/fragment/tests/header_series_tests.rs @@ -0,0 +1,109 @@ +//! Tests for fragment header accessors and fragment-series sequencing rules. + +use rstest::rstest; + +use crate::fragment::*; + +#[test] +fn fragment_header_exposes_fields() { + let header = FragmentHeader::new(MessageId::new(9), FragmentIndex::new(2), true); + assert_eq!(header.message_id(), MessageId::new(9)); + assert_eq!(header.fragment_index(), FragmentIndex::new(2)); + assert!(header.is_last_fragment()); +} + +#[rstest] +#[case(1)] +#[case(5)] +fn series_accepts_sequential_fragments(#[case] message: u64) { + let mut series = FragmentSeries::new(MessageId::new(message)); + let first = FragmentHeader::new(MessageId::new(message), FragmentIndex::zero(), false); + let second = FragmentHeader::new(MessageId::new(message), FragmentIndex::new(1), true); + + assert_eq!(series.accept(first), Ok(FragmentStatus::Incomplete)); + assert_eq!(series.accept(second), Ok(FragmentStatus::Complete)); + assert!(series.is_complete()); +} + +#[test] +fn series_rejects_other_message() { + let mut series = FragmentSeries::new(MessageId::new(7)); + let header = FragmentHeader::new(MessageId::new(8), FragmentIndex::zero(), false); + let err = series + .accept(header) + .expect_err("fragment from another message must be rejected"); + assert!(matches!(err, FragmentError::MessageMismatch { .. })); +} + +#[test] +fn series_rejects_out_of_order_fragment() { + let mut series = FragmentSeries::new(MessageId::new(7)); + let header = FragmentHeader::new(MessageId::new(7), FragmentIndex::new(2), false); + let err = series + .accept(header) + .expect_err("out-of-order fragment must be rejected"); + assert!(matches!(err, FragmentError::IndexMismatch { .. 
})); +} + +#[test] +fn series_suppresses_duplicate_fragment() { + let mut series = FragmentSeries::new(MessageId::new(7)); + let first = FragmentHeader::new(MessageId::new(7), FragmentIndex::zero(), false); + let duplicate = FragmentHeader::new(MessageId::new(7), FragmentIndex::zero(), false); + let final_fragment = FragmentHeader::new(MessageId::new(7), FragmentIndex::new(1), true); + + assert_eq!(series.accept(first), Ok(FragmentStatus::Incomplete)); + assert_eq!(series.accept(duplicate), Ok(FragmentStatus::Duplicate)); + assert_eq!(series.accept(final_fragment), Ok(FragmentStatus::Complete)); +} + +#[test] +fn series_rejects_after_completion() { + let mut series = FragmentSeries::new(MessageId::new(1)); + let first = FragmentHeader::new(MessageId::new(1), FragmentIndex::zero(), true); + assert_eq!(series.accept(first), Ok(FragmentStatus::Complete)); + let err = series + .accept(FragmentHeader::new( + MessageId::new(1), + FragmentIndex::new(1), + true, + )) + .expect_err("series must reject fragments after completion"); + assert!(matches!(err, FragmentError::SeriesComplete)); +} + +#[test] +fn series_reports_duplicate_final_fragment_after_completion() { + let mut series = FragmentSeries::new(MessageId::new(12)); + let first = FragmentHeader::new(MessageId::new(12), FragmentIndex::zero(), true); + let duplicate = FragmentHeader::new(MessageId::new(12), FragmentIndex::zero(), true); + + assert_eq!(series.accept(first), Ok(FragmentStatus::Complete)); + assert_eq!(series.accept(duplicate), Ok(FragmentStatus::Duplicate)); + assert!(series.is_complete()); +} + +#[test] +fn series_detects_index_overflow() { + let mut series = FragmentSeries::new(MessageId::new(1)); + series.force_next_index_for_tests(FragmentIndex::new(u32::MAX)); + let header = FragmentHeader::new(MessageId::new(1), FragmentIndex::new(u32::MAX), false); + let err = series + .accept(header) + .expect_err("overflow must raise an error"); + assert_eq!( + err, + FragmentError::IndexOverflow { + last: 
FragmentIndex::new(u32::MAX) + } + ); +} + +#[test] +fn series_accepts_final_fragment_at_max_index() { + let mut series = FragmentSeries::new(MessageId::new(2)); + series.force_next_index_for_tests(FragmentIndex::new(u32::MAX)); + let header = FragmentHeader::new(MessageId::new(2), FragmentIndex::new(u32::MAX), true); + assert_eq!(series.accept(header), Ok(FragmentStatus::Complete)); + assert!(series.is_complete()); +} diff --git a/src/fragment/tests/reassembler_tests.rs b/src/fragment/tests/reassembler_tests.rs new file mode 100644 index 00000000..ff096283 --- /dev/null +++ b/src/fragment/tests/reassembler_tests.rs @@ -0,0 +1,258 @@ +//! Tests for inbound reassembly ordering, limits, and decoding. + +use std::{ + num::NonZeroUsize, + time::{Duration, Instant}, +}; + +use bincode::{BorrowDecode, Encode}; +use rstest::{fixture, rstest}; + +use crate::fragment::{ + FragmentError, + FragmentHeader, + FragmentIndex, + Fragmenter, + MessageId, + ReassembledMessage, + Reassembler, + ReassemblyError, +}; + +#[fixture] +fn reassembler_with_first_fragment( + #[default(1)] message_id: u64, + #[default(&[])] first_payload: &'static [u8], +) -> Reassembler { + let mut reassembler = Reassembler::new( + NonZeroUsize::new(8).expect("non-zero"), + Duration::from_secs(30), + ); + let first = FragmentHeader::new(MessageId::new(message_id), FragmentIndex::zero(), false); + assert!( + reassembler + .push(first, first_payload) + .expect("first fragment accepted") + .is_none() + ); + reassembler +} + +#[test] +fn reassembler_allows_single_fragment_at_max_message_size() { + let max_message_size = NonZeroUsize::new(16).expect("non-zero"); + let mut reassembler = Reassembler::new(max_message_size, Duration::from_secs(5)); + + let header = FragmentHeader::new(MessageId::new(20), FragmentIndex::zero(), true); + let payload = vec![0_u8; max_message_size.get()]; + + let result = reassembler + .push(header, payload) + .expect("fragment within limit should be accepted"); + + let assembled = 
result.expect("single fragment should complete reassembly"); + assert_eq!(assembled.payload().len(), max_message_size.get()); + assert_eq!(reassembler.buffered_len(), 0); +} + +#[test] +fn reassembler_allows_multi_fragment_at_max_message_size() { + let max_message_size = NonZeroUsize::new(16).expect("non-zero"); + let mut reassembler = Reassembler::new(max_message_size, Duration::from_secs(5)); + + let first_header = FragmentHeader::new(MessageId::new(21), FragmentIndex::zero(), false); + let second_header = FragmentHeader::new(MessageId::new(21), FragmentIndex::new(1), true); + + let first_payload = vec![0_u8; 8]; + let second_payload = vec![1_u8; max_message_size.get() - first_payload.len()]; + + assert!( + reassembler + .push(first_header, first_payload) + .expect("first fragment within limit") + .is_none(), + "first fragment should not complete the message", + ); + + let result = reassembler + .push(second_header, second_payload) + .expect("second fragment keeps total at limit"); + + let assembled = result.expect("fragments should complete reassembly at exact limit"); + assert_eq!(assembled.payload().len(), max_message_size.get()); + assert_eq!(reassembler.buffered_len(), 0); +} + +#[test] +fn reassembler_returns_single_fragment_immediately() { + let mut reassembler = Reassembler::new( + NonZeroUsize::new(16).expect("non-zero"), + Duration::from_secs(5), + ); + let header = FragmentHeader::new(MessageId::new(1), FragmentIndex::zero(), true); + let payload = vec![1_u8, 2, 3, 4]; + + let complete = reassembler + .push(header, payload.clone()) + .expect("reassembly must succeed") + .expect("single fragment should complete message"); + + assert_eq!(complete.message_id(), MessageId::new(1)); + assert_eq!(complete.payload(), payload.as_slice()); + assert_eq!(reassembler.buffered_len(), 0); +} + +#[rstest] +fn reassembler_accumulates_ordered_fragments( + #[with(2, &[5_u8, 6, 7])] mut reassembler_with_first_fragment: Reassembler, +) { + let final_fragment = 
FragmentHeader::new(MessageId::new(2), FragmentIndex::new(1), true); + + let complete = reassembler_with_first_fragment + .push(final_fragment, [8_u8, 9]) + .expect("final fragment accepted") + .expect("message should complete"); + + assert_eq!(complete.payload(), &[5, 6, 7, 8, 9]); + assert_eq!(reassembler_with_first_fragment.buffered_len(), 0); +} + +#[rstest] +fn reassembler_rejects_out_of_order_and_drops_partial( + #[with(3, &[1_u8, 2])] mut reassembler_with_first_fragment: Reassembler, +) { + let skipped = FragmentHeader::new(MessageId::new(3), FragmentIndex::new(2), true); + + let err = reassembler_with_first_fragment + .push(skipped, [3_u8]) + .expect_err("out-of-order fragment must be rejected"); + assert!(matches!( + err, + ReassemblyError::Fragment(FragmentError::IndexMismatch { .. }) + )); + assert_eq!(reassembler_with_first_fragment.buffered_len(), 0); +} + +#[rstest] +fn reassembler_suppresses_duplicate_fragment( + #[with(31, &[1_u8, 2])] mut reassembler_with_first_fragment: Reassembler, +) { + let duplicate = FragmentHeader::new(MessageId::new(31), FragmentIndex::zero(), false); + let final_fragment = FragmentHeader::new(MessageId::new(31), FragmentIndex::new(1), true); + + assert!( + reassembler_with_first_fragment + .push(duplicate, [9_u8, 9]) + .expect("duplicate fragment should be suppressed") + .is_none() + ); + assert_eq!(reassembler_with_first_fragment.buffered_len(), 1); + + let complete = reassembler_with_first_fragment + .push(final_fragment, [3_u8]) + .expect("final fragment should complete message") + .expect("message should be complete"); + assert_eq!(complete.payload(), &[1, 2, 3]); +} + +#[test] +fn reassembler_accepts_zero_length_fragments() { + let mut reassembler = Reassembler::new( + NonZeroUsize::new(8).expect("non-zero"), + Duration::from_secs(10), + ); + let first = FragmentHeader::new(MessageId::new(44), FragmentIndex::zero(), false); + let second = FragmentHeader::new(MessageId::new(44), FragmentIndex::new(1), true); + + 
assert!( + reassembler + .push(first, []) + .expect("empty fragment should be accepted") + .is_none() + ); + + let complete = reassembler + .push(second, [7_u8, 8]) + .expect("final fragment should complete message") + .expect("message should be complete"); + assert_eq!(complete.payload(), &[7, 8]); +} + +#[test] +fn reassembler_enforces_maximum_payload_size() { + let mut reassembler = Reassembler::new( + NonZeroUsize::new(4).expect("non-zero"), + Duration::from_secs(30), + ); + let first = FragmentHeader::new(MessageId::new(4), FragmentIndex::zero(), false); + let final_fragment = FragmentHeader::new(MessageId::new(4), FragmentIndex::new(1), true); + + assert!( + reassembler + .push(first, [1_u8, 2, 3]) + .expect("first fragment accepted") + .is_none() + ); + + let err = reassembler + .push(final_fragment, [4_u8, 5]) + .expect_err("payload growth beyond cap must be rejected"); + assert_eq!( + err, + ReassemblyError::MessageTooLarge { + message_id: MessageId::new(4), + attempted: 5, + limit: NonZeroUsize::new(4).expect("non-zero"), + } + ); + assert_eq!(reassembler.buffered_len(), 0); +} + +#[test] +fn reassembler_purges_expired_messages() { + let mut reassembler = Reassembler::new( + NonZeroUsize::new(8).expect("non-zero"), + Duration::from_secs(2), + ); + let now = Instant::now(); + let header = FragmentHeader::new(MessageId::new(5), FragmentIndex::zero(), false); + + assert!( + reassembler + .push_at(header, [0_u8, 1], now) + .expect("first fragment accepted") + .is_none() + ); + assert_eq!(reassembler.buffered_len(), 1); + + let evicted = reassembler.purge_expired_at(now + Duration::from_secs(3)); + assert_eq!(evicted, vec![MessageId::new(5)]); + assert_eq!(reassembler.buffered_len(), 0); +} + +#[derive(Clone, Debug, Encode, BorrowDecode, PartialEq, Eq)] +struct ExampleMessage(u8); + +#[test] +fn reassembler_decodes_reconstructed_message() { + let fragmenter = Fragmenter::new(NonZeroUsize::new(2).expect("non-zero")); + let batch = fragmenter + 
.fragment_message(&ExampleMessage(11)) + .expect("fragment message"); + let mut reassembler = Reassembler::new( + NonZeroUsize::new(4).expect("non-zero"), + Duration::from_secs(10), + ); + + let mut output: Option = None; + for fragment in batch { + let (header, payload) = fragment.into_parts(); + output = reassembler + .push(header, payload) + .expect("fragment accepted"); + } + + let assembled = output.expect("message should complete"); + let decoded: ExampleMessage = assembled.decode().expect("decode message"); + assert_eq!(decoded, ExampleMessage(11)); +} diff --git a/src/lib.rs b/src/lib.rs index a6e8d431..f07ebea0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -61,7 +61,10 @@ pub use client::{ClientCodecConfig, ClientError, SocketOptions, WireframeClient} pub use connection::ConnectionActor; pub use correlation::CorrelatableFrame; pub use fragment::{ + DefaultFragmentAdapter, FRAGMENT_MAGIC, + FragmentAdapter, + FragmentAdapterError, FragmentBatch, FragmentError, FragmentFrame, diff --git a/tests/features/fragment.feature b/tests/features/fragment.feature index aa7df5f2..80756b9c 100644 --- a/tests/features/fragment.feature +++ b/tests/features/fragment.feature @@ -75,3 +75,30 @@ Feature: Fragment metadata enforcement When fragment 1 for message 24 with 2 bytes arrives marked non-final Then the reassembler reports an out-of-order fragment error And the reassembler is buffering 0 messages + + Scenario: Reassembler suppresses duplicate fragments + Given a reassembler allowing 8 bytes with a 30-second reassembly timeout + When fragment 0 for message 25 with 2 bytes arrives marked non-final + And fragment 0 for message 25 with 2 bytes arrives marked non-final + Then no message has been reassembled yet + And the reassembler is buffering 1 message + When fragment 1 for message 25 with 1 byte arrives marked final + Then the reassembler outputs a payload of 3 bytes + And the reassembler is buffering 0 messages + + Scenario: Reassembler handles zero-length fragments + 
Given a reassembler allowing 8 bytes with a 30-second reassembly timeout + When fragment 0 for message 26 with 0 bytes arrives marked final + Then the reassembler outputs a payload of 0 bytes + And the reassembler is buffering 0 messages + + Scenario: Reassembler rebuilds interleaved messages + Given a reassembler allowing 12 bytes with a 30-second reassembly timeout + When fragment 0 for message 27 with 3 bytes arrives marked non-final + And fragment 0 for message 28 with 4 bytes arrives marked non-final + And fragment 1 for message 27 with 2 bytes arrives marked final + Then the reassembler outputs a payload of 5 bytes + And the reassembler is buffering 1 message + When fragment 1 for message 28 with 1 byte arrives marked final + Then the reassembler outputs a payload of 5 bytes + And the reassembler is buffering 0 messages diff --git a/tests/fragment_transport.rs b/tests/fragment_transport.rs index 380b363f..9ba363c2 100644 --- a/tests/fragment_transport.rs +++ b/tests/fragment_transport.rs @@ -8,12 +8,12 @@ use std::time::Duration; -use futures::SinkExt; +use futures::{SinkExt, StreamExt}; use tokio::{io::AsyncWriteExt, sync::mpsc, time::timeout}; use wireframe::{ Serializer, - app::{Envelope, WireframeApp}, - fragment::decode_fragment_payload, + app::{Envelope, Packet, WireframeApp}, + fragment::{Fragmenter, decode_fragment_payload}, serializer::BincodeSerializer, }; @@ -30,6 +30,7 @@ use crate::fragment_helpers::{ TestResult, assert_handler_observed, build_envelopes, + fragment_envelope, fragmentation_config, make_app, make_handler, @@ -147,3 +148,148 @@ async fn fragmentation_can_be_disabled_via_public_api() -> TestResult { Ok(()) } + +#[tokio::test] +async fn fragmentation_is_opt_in_by_default() -> TestResult { + let capacity = 512; + let payload = vec![b'd'; 128]; + let (tx, mut rx) = mpsc::unbounded_channel(); + let handler = make_handler(&tx); + + let app: WireframeApp = WireframeApp::new()? 
+ .buffer_capacity(capacity) + .route(ROUTE_ID, handler)?; + let (mut client, server) = spawn_app(app); + + let request = Envelope::new(ROUTE_ID, CORRELATION, payload.clone()); + let serializer = BincodeSerializer; + let bytes = serializer.serialize(&request)?; + client.send(bytes.into()).await?; + client.get_mut().shutdown().await?; + + assert_handler_observed(&mut rx, &payload).await?; + let response_bytes = timeout(Duration::from_secs(1), client.next()) + .await? + .ok_or(TestError::Setup("response frame missing"))??; + let (response_env, _) = serializer.deserialize::(&response_bytes)?; + let response_payload = response_env.into_parts().into_payload(); + if decode_fragment_payload(&response_payload)?.is_some() { + return Err(TestError::Assertion( + "default app should keep fragmentation disabled".to_string(), + )); + } + + server.await??; + + Ok(()) +} + +#[tokio::test] +async fn duplicate_fragment_is_suppressed_and_reassembles() -> TestResult { + let buffer_capacity = 512; + let config = fragmentation_config(buffer_capacity)?; + let (tx, mut rx) = mpsc::unbounded_channel(); + let app = make_app(buffer_capacity, config, &tx)?; + let (mut client, server) = spawn_app(app); + let fragmenter = Fragmenter::new(config.fragment_payload_cap); + + let payload = vec![b'D'; 800]; + let request = Envelope::new(ROUTE_ID, CORRELATION, payload.clone()); + let mut fragments = fragment_envelope(&request, &fragmenter)?; + let duplicate = fragments + .first() + .cloned() + .ok_or(TestError::Setup("fragmenter produced no fragments"))?; + fragments.insert(1, duplicate); + + send_envelopes(&mut client, &fragments).await?; + client.flush().await?; + client.get_mut().shutdown().await?; + + assert_handler_observed(&mut rx, &payload).await?; + if let Ok(Some(_)) = timeout(Duration::from_millis(200), rx.recv()).await { + return Err(TestError::Assertion( + "duplicate suppression should prevent duplicate handler delivery".to_string(), + )); + } + + let response = 
read_reassembled_response(&mut client, &config).await?; + if response != payload { + return Err(TestError::Assertion( + "reassembled payload mismatch after duplicate suppression".to_string(), + )); + } + + server.await??; + Ok(()) +} + +#[tokio::test] +async fn interleaved_fragment_streams_reassemble_independently() -> TestResult { + let buffer_capacity = 512; + let config = fragmentation_config(buffer_capacity)?; + let (tx, mut rx) = mpsc::unbounded_channel(); + let app = make_app(buffer_capacity, config, &tx)?; + let (mut client, server) = spawn_app(app); + let fragmenter = Fragmenter::new(config.fragment_payload_cap); + + let payload_a = vec![b'A'; 800]; + let payload_b = vec![b'B'; 900]; + let request_a = Envelope::new(ROUTE_ID, Some(101), payload_a.clone()); + let request_b = Envelope::new(ROUTE_ID, Some(202), payload_b.clone()); + let fragments_a = fragment_envelope(&request_a, &fragmenter)?; + let fragments_b = fragment_envelope(&request_b, &fragmenter)?; + + let mut interleaved = Vec::with_capacity(fragments_a.len() + fragments_b.len()); + let mut idx = 0usize; + while idx < fragments_a.len() || idx < fragments_b.len() { + if let Some(fragment) = fragments_a.get(idx) { + interleaved.push(fragment.clone()); + } + if let Some(fragment) = fragments_b.get(idx) { + interleaved.push(fragment.clone()); + } + idx += 1; + } + + send_envelopes(&mut client, &interleaved).await?; + client.flush().await?; + client.get_mut().shutdown().await?; + + let response_a = timeout( + Duration::from_secs(1), + read_reassembled_response(&mut client, &config), + ) + .await??; + let response_b = timeout( + Duration::from_secs(1), + read_reassembled_response(&mut client, &config), + ) + .await??; + let mut observed_responses = vec![response_a, response_b]; + observed_responses.sort(); + let mut expected_payloads = vec![payload_a.clone(), payload_b.clone()]; + expected_payloads.sort(); + if observed_responses != expected_payloads { + return Err(TestError::Assertion( + "interleaved 
responses were not reassembled correctly".to_string(), + )); + } + + let first = timeout(Duration::from_secs(1), rx.recv()) + .await? + .ok_or(TestError::Setup("first handler payload missing"))?; + let second = timeout(Duration::from_secs(1), rx.recv()) + .await? + .ok_or(TestError::Setup("second handler payload missing"))?; + let mut observed_requests = vec![first, second]; + observed_requests.sort(); + if observed_requests != expected_payloads { + return Err(TestError::Assertion( + "interleaved reassembly delivered unexpected handler payloads".to_string(), + )); + } + + server.await??; + Ok(()) +} diff --git a/tests/fragment_transport/mod.rs b/tests/fragment_transport/mod.rs index 106e639c..8c88c0d3 100644 --- a/tests/fragment_transport/mod.rs +++ b/tests/fragment_transport/mod.rs @@ -1,7 +1,7 @@ //! Module organization for fragment transport integration tests. //! //! Splits tests by concern: -//! - `rejection`: Tests for malformed, duplicate, and out-of-order fragments +//! - `rejection`: Tests for malformed and out-of-order fragments //! - `eviction`: Tests for reassembly timeout and eviction behaviour pub mod eviction; diff --git a/tests/fragment_transport/rejection.rs b/tests/fragment_transport/rejection.rs index 61b30036..a9548614 100644 --- a/tests/fragment_transport/rejection.rs +++ b/tests/fragment_transport/rejection.rs @@ -1,4 +1,4 @@ -//! Fragment rejection tests for malformed, out-of-order, and duplicate fragments. +//! Fragment rejection tests for malformed and out-of-order fragments. //! //! Verifies that the fragment transport layer correctly rejects invalid fragment //! sequences and prevents them from reaching the application handler. @@ -98,16 +98,6 @@ fn mutate_out_of_order(mut fragments: Vec) -> TestResult Ok(fragments) } -/// Mutate fragments by duplicating the first fragment. 
-fn mutate_duplicate(mut fragments: Vec) -> TestResult> { - let duplicate = fragments - .first() - .cloned() - .ok_or(TestError::Setup("fragmenter produced no fragments"))?; - fragments.insert(1, duplicate); - Ok(fragments) -} - /// Mutate fragments by truncating the header of the first fragment. #[expect( clippy::panic_in_result_fn, @@ -152,10 +142,6 @@ fn mutate_malformed_header(mut fragments: Vec) -> TestResult TestResul Ok(()) } -#[then("the reassembler is buffering {expected:usize} messages")] -fn then_buffered_messages(fragment_world: &mut FragmentWorld, expected: usize) -> TestResult { +fn assert_buffered_messages(fragment_world: &mut FragmentWorld, expected: usize) -> TestResult { fragment_world.assert_buffered_messages(expected)?; Ok(()) } +#[then("the reassembler is buffering {expected:usize} message")] +fn then_buffered_message(fragment_world: &mut FragmentWorld, expected: usize) -> TestResult { + assert_buffered_messages(fragment_world, expected) +} + +#[then("the reassembler is buffering {expected:usize} messages")] +fn then_buffered_messages(fragment_world: &mut FragmentWorld, expected: usize) -> TestResult { + assert_buffered_messages(fragment_world, expected) +} + #[then("message {message:u64} is evicted")] fn then_message_evicted(fragment_world: &mut FragmentWorld, message: u64) -> TestResult { fragment_world.assert_evicted_message(message)?;