From 9f083233cf1e7061ba27b425f99b0b53bd9a2675 Mon Sep 17 00:00:00 2001 From: Leynos Date: Thu, 12 Feb 2026 09:02:00 +0000 Subject: [PATCH 1/7] docs(execplans): add detailed execplan for fragmentation adapter trait hardening - Introduces a comprehensive ExecPlan draft for roadmap item 9.2.1 focused on the FragmentAdapter trait and fragmentation opt-in hardening. - Covers purpose, constraints, tolerances, risks, progress, decisions, and plans. - Defines public API shape, behavioral policies, and testing strategy. - Guides staged implementation and documentation updates. - Serves as a reference point for fragmentation-related development and QA. Co-authored-by: devboxerhub[bot] --- .../execplans/9-1-3-fragment-adapter-trait.md | 363 ++++++++++++++++++ 1 file changed, 363 insertions(+) create mode 100644 docs/execplans/9-1-3-fragment-adapter-trait.md diff --git a/docs/execplans/9-1-3-fragment-adapter-trait.md b/docs/execplans/9-1-3-fragment-adapter-trait.md new file mode 100644 index 00000000..3be59eea --- /dev/null +++ b/docs/execplans/9-1-3-fragment-adapter-trait.md @@ -0,0 +1,363 @@ +# 9.2.1 FragmentAdapter trait and fragmentation opt-in hardening + +This ExecPlan is a living document. The sections `Constraints`, `Tolerances`, +`Risks`, `Progress`, `Surprises & Discoveries`, `Decision Log`, and +`Outcomes & Retrospective` must be kept up to date as work proceeds. + +Status: DRAFT + +No `PLANS.md` exists in this repository as of 2026-02-12. + +## Purpose / big picture + +Wireframe already ships transport fragmentation primitives, but current app +integration still has open hardening gaps called out by roadmap item 9.2.1: +fragmentation is enabled by default, duplicate versus out-of-order handling is +not explicit, purge scheduling ownership is implicit, and there is no public +fragment-adapter abstraction tying these rules together. This plan introduces a +public `FragmentAdapter` trait and aligns runtime behaviour so fragmentation is +explicitly opt-in, purge control is public, duplicate and out-of-order policies +are deterministic, and edge cases (zero-length fragments and index overflow) +are defined and tested. + +Success is observable when: + +- `WireframeApp::new()` no longer fragments unless the caller explicitly opts + in. +- a public fragment adapter API exists and exposes purge control. +- interleaved fragment streams reassemble correctly while duplicate and + out-of-order series follow documented policies. +- unit tests (`rstest`), integration tests, and behavioural tests + (`rstest-bdd` v0.5.0) cover the new rules. +- design docs, user-facing docs, and roadmap status are updated consistently. + +## Constraints + +- Keep existing routing, middleware, serializer, and codec public APIs stable + except for the intentional fragmentation opt-in behaviour change. +- Do not introduce `unsafe`. +- Preserve current default codec behaviour and frame-length guardrails. +- Keep module-level `//!` comments and rustdoc examples for new public items. +- Use `rstest` fixtures/parameterization for new unit and integration tests. +- Use `rstest-bdd` v0.5.0 for behavioural test coverage required by this + feature. +- Update `docs/users-guide.md` for any public API change made by this work. +- Record decisions in `docs/generic-message-fragmentation-and-re-assembly- + design.md` (and companion docs when composition guidance changes). + +## Tolerances (exception triggers) + +- Scope: if implementation exceeds 20 files or 1,200 net changed lines, pause + and confirm before continuing. +- Interface: if this work requires changing unrelated public APIs (client + runtime, push queue APIs, or serializer trait signatures), pause and confirm. +- Dependencies: only `rstest-bdd` and `rstest-bdd-macros` may be upgraded (to + 0.5.0) in this milestone; any other new dependency requires escalation. +- Iterations: if `make lint` or `make test` fails three consecutive times for + the same root cause, stop and reassess approach. +- Time: if any single stage exceeds one focused day of work without reaching + stage acceptance criteria, log the blocker and re-scope before proceeding. +- Ambiguity: if `FragmentAdapter` naming conflicts with existing design text + cannot be resolved by documentation updates alone, stop and request naming + direction. + +## Risks + +- Risk: behavioural change from default-enabled to opt-in fragmentation may + break existing tests/examples that relied on implicit defaults. Severity: + high. Likelihood: medium. Mitigation: migrate call sites to explicit builder + configuration and add dedicated regression tests for disabled/default and + enabled modes. + +- Risk: `rstest-bdd` 0.5.0 may introduce macro/runtime changes that affect the + current scenario harness. Severity: medium. Likelihood: medium. Mitigation: + upgrade dependency early in the branch, run `make test-bdd`, adapt + step/scenario signatures before feature-specific additions. + +- Risk: duplicate suppression policy can mask protocol bugs if duplicates are + not observably tracked. Severity: medium. Likelihood: medium. Mitigation: + define explicit duplicate semantics in types/errors/docs and add tests that + assert suppression versus rejection outcomes. + +- Risk: purge scheduling ownership can remain ambiguous between adapter and + connection loop. Severity: medium. Likelihood: high. Mitigation: codify + ownership in the `FragmentAdapter` contract and document exactly when + Wireframe calls purge versus when callers may drive it. + +## Progress + +- [x] (2026-02-12 00:00Z) Draft ExecPlan for roadmap item 9.2.1. +- [ ] Finalize `FragmentAdapter` API contract and error taxonomy updates. +- [ ] Make fragmentation opt-in on `WireframeApp` builder defaults. +- [ ] Expose public purge API and wire it through adapter implementation. +- [ ] Implement duplicate suppression and out-of-order handling policy. +- [ ] Define zero-length fragment behaviour and index-overflow handling. +- [ ] Add/upgrade unit, integration, and behavioural tests. +- [ ] Update design docs, user guide, and roadmap checkboxes. +- [ ] Run formatting, lint, and full test gates. + +## Surprises & Discoveries + +- Observation: the fragmentation design doc still states default-enabled + behaviour, while roadmap 9.2.1 now requires explicit opt-in. Evidence: + `docs/generic-message-fragmentation-and-re-assembly-design.md` section 3.4 + versus `docs/roadmap.md` section 9.2. Impact: this milestone must include + design-document corrections, not only code/test edits. + +- Observation: `docs/behavioural-testing-in-rust-with-cucumber.md` is + historical and explicitly points to `rstest-bdd` guidance. Evidence: banner + at the top of that document. Impact: behavioural testing updates for this + feature should follow `docs/rstest-bdd-users-guide.md` conventions and + version updates. + +## Decision Log + +- Decision: introduce a public `FragmentAdapter` trait plus one default + implementation backed by existing `Fragmenter` and `Reassembler` primitives. + Rationale: satisfies roadmap wording while minimizing rewrite risk by reusing + proven internals. Date/Author: 2026-02-12 / Codex. + +- Decision: change app-level fragmentation from implicit default to explicit + opt-in at builder time. Rationale: aligns with roadmap hardening goal and + prevents hidden performance and behavioural costs for protocols that do not + need fragmentation. Date/Author: 2026-02-12 / Codex. + +- Decision: codify duplicate suppression and out-of-order rejection as separate + outcomes. Rationale: duplicate retransmissions can be tolerated safely, while + true out-of-order delivery should fail deterministically and clear partial + state. Date/Author: 2026-02-12 / Codex. + +## Outcomes & Retrospective + +To be completed when implementation finishes. + +## Context and orientation + +Current transport fragmentation internals live in: + +- `src/fragment/fragmenter.rs` (outbound splitting). +- `src/fragment/reassembler.rs` (inbound assembly and timeout purge). +- `src/fragment/series.rs` (index ordering rules). +- `src/app/fragmentation_state.rs` (connection-scoped wrapper currently used by + app frame handling). + +Current app integration points are: + +- `src/app/builder/core.rs` and `src/app/builder/codec.rs`, where + `default_fragmentation(...)` currently auto-enables fragmentation. +- `src/app/connection.rs`, which instantiates optional fragmentation state and + calls purge on read-timeout ticks. +- `src/app/frame_handling/response.rs` and + `src/app/frame_handling/reassembly.rs`, which apply fragmentation and + reassembly around handler processing. + +Current tests and fixtures already cover part of the domain: + +- `src/fragment/tests.rs` for primitive/unit coverage. +- `tests/fragment_transport.rs` and `tests/fragment_transport/*` for transport + integration coverage. +- `tests/features/fragment.feature`, `tests/steps/fragment_steps.rs`, and + `tests/scenarios/fragment_scenarios.rs` for behavioural coverage. + +Documentation currently needing alignment: + +- `docs/generic-message-fragmentation-and-re-assembly-design.md` (adapter + contract, duplicate/out-of-order policy, purge ownership, opt-in semantics). +- `docs/multi-packet-and-streaming-responses-design.md` (layer composition + order references). +- `docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability- + maturity.md` (hardening narrative alignment). +- `docs/hardening-wireframe-a-guide-to-production-resilience.md` (hardening + narrative alignment). +- `docs/users-guide.md` (public configuration surface and behaviour changes). +- `docs/roadmap.md` (mark 9.2.1 sub-items done on completion). + +## Plan of work + +### Stage A: contract and policy definition (no behavioural changes yet) + +Define the `FragmentAdapter` public contract in `src/fragment`, including purge +methods and explicit result/error shapes for duplicate suppression, +out-of-order fragments, zero-length fragments, and overflow paths. Update the +fragmentation design document first so code follows an agreed contract. + +Go/no-go: + +- Go when design docs and trait signatures agree on ownership of purge + scheduling and policy terms. +- No-go if naming or ownership remains ambiguous after doc updates. + +### Stage B: wire adapter into app path and enforce opt-in + +Implement the default adapter using existing `Fragmenter` + `Reassembler` +logic, then switch app frame handling to use the adapter contract. Change +builder defaults so fragmentation is disabled unless explicitly configured. +Update builder docs/comments and any helper defaults that currently turn +fragmentation on implicitly. + +Go/no-go: + +- Go when `WireframeApp::new()` has no fragmentation state by default and + explicit config paths still work. +- No-go if disabling defaults causes unbounded regressions outside + fragmentation-related tests. + +### Stage C: policy enforcement and test expansion + +Implement duplicate suppression versus out-of-order rejection behaviour in the +fragment series/reassembler path. Add explicit zero-length and overflow +handling tests. Expand integration tests for interleaved reassembly and opt-in +semantics. Upgrade behavioural tests to `rstest-bdd` v0.5.0 and add scenarios +that prove the new policies. + +Go/no-go: + +- Go when new tests fail before implementation and pass after changes. +- No-go if policy cannot be expressed without broad unrelated API changes. + +### Stage D: documentation, roadmap completion, and hardening gates + +Update user-facing and design docs to match implemented behaviour and +composition order. Mark roadmap 9.2.1 checklist items as done. Run full +formatting, lint, and test gates. + +Go/no-go: + +- Go when docs and code behaviour match and all quality gates pass. +- No-go if any gate fails; fix root causes before finalizing. + +## Concrete steps + +1. Add/adjust fragmentation adapter API surface: + `src/fragment/adapter.rs` (new), `src/fragment/mod.rs`, `src/lib.rs`. + +2. Refactor connection-facing adapter implementation: + `src/app/fragmentation_state.rs`, `src/app/frame_handling/core.rs`, + `src/app/frame_handling/reassembly.rs`, + `src/app/frame_handling/response.rs`, `src/app/connection.rs`. + +3. Enforce opt-in builder behaviour: + `src/app/builder_defaults.rs`, `src/app/builder/core.rs`, + `src/app/builder/codec.rs`, `src/app/builder/config.rs`, plus affected + doctests and call sites. + +4. Implement policy-specific fragment logic and tests: + `src/fragment/series.rs`, `src/fragment/reassembler.rs`, + `src/fragment/error.rs`, `src/fragment/tests.rs`. + +5. Expand integration and behavioural suites: + `tests/fragment_transport.rs`, `tests/fragment_transport/rejection.rs`, + `tests/fragment_transport/eviction.rs`, `tests/common/fragment_helpers.rs`, + `tests/features/fragment.feature`, `tests/steps/fragment_steps.rs`, + `tests/scenarios/fragment_scenarios.rs`, `tests/fixtures/fragment/mod.rs`, + `tests/fixtures/fragment/reassembly.rs`. + +6. Upgrade behavioural-test dependencies to v0.5.0 and adapt harness if needed: + `Cargo.toml`, `Cargo.lock`, and any `rstest-bdd` API call sites. + +7. Update documentation and roadmap: + `docs/generic-message-fragmentation-and-re-assembly-design.md`, + `docs/multi-packet-and-streaming-responses-design.md`, + `docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability- maturity.md`, + `docs/hardening-wireframe-a-guide-to-production-resilience.md`, + `docs/users-guide.md`, and `docs/roadmap.md`. + +8. Run validation commands from repository root, capturing full logs: + + set -o pipefail + timeout 300 make fmt 2>&1 | tee /tmp/wireframe-fmt.log + + set -o pipefail + timeout 300 make markdownlint 2>&1 | tee /tmp/wireframe-markdownlint.log + + set -o pipefail + timeout 300 make check-fmt 2>&1 | tee /tmp/wireframe-check-fmt.log + + set -o pipefail + timeout 300 make lint 2>&1 | tee /tmp/wireframe-lint.log + + set -o pipefail + timeout 300 make test-bdd 2>&1 | tee /tmp/wireframe-test-bdd.log + + set -o pipefail + timeout 300 make test 2>&1 | tee /tmp/wireframe-test.log + + If Mermaid diagrams are edited, also run: + + set -o pipefail + timeout 300 make nixie 2>&1 | tee /tmp/wireframe-nixie.log + +## Validation and acceptance + +Acceptance is complete when all statements below are true: + +- Default app behaviour: creating `WireframeApp::new()` without explicit + fragmentation configuration does not fragment traffic. +- Opt-in behaviour: explicit builder configuration enables fragmentation and + preserves round-trip correctness for large payloads. +- Purge API: callers can invoke a documented public purge method on the + adapter/reassembly path and observe stale state eviction. +- Duplicate/out-of-order policy: duplicate fragments are handled per documented + suppression policy; out-of-order fragments trigger deterministic rejection + and cleanup. +- Edge cases: zero-length fragments and fragment index overflow semantics are + defined and covered by tests. +- Interleaving: integration tests verify interleaved fragment streams + reassemble correctly. +- Behavioural coverage: `rstest-bdd` scenarios validate fragment-series policy + and any new public behaviour, running under v0.5.0 dependencies. +- Documentation parity: users guide and design docs describe the same behaviour + that tests verify. +- Roadmap update: `docs/roadmap.md` item 9.2.1 and all requested sub-items are + checked as done. + +## Idempotence and recovery + +All changes are additive or local refactors and can be safely reapplied. If any +stage fails: + +- revert only the files touched in that stage; +- keep prior completed stages intact; +- rerun the stage-local tests first, then full gates. + +If dependency upgrade causes widespread unrelated breakage, pin the updated +version in branch commits, fix compatibility incrementally, and only then +resume feature-specific edits. + +## Artifacts and notes + +Populate this section during implementation with log paths and short evidence +snippets, for example: + +- `/tmp/wireframe-fmt.log` +- `/tmp/wireframe-markdownlint.log` +- `/tmp/wireframe-check-fmt.log` +- `/tmp/wireframe-lint.log` +- `/tmp/wireframe-test-bdd.log` +- `/tmp/wireframe-test.log` + +## Interfaces and dependencies + +Target public interface shape (exact naming may be adjusted during Stage A): + + pub trait FragmentAdapter: Send + Sync { + type Error: std::error::Error + Send + Sync + 'static; + + fn fragment(&self, packet: E) + -> Result, crate::fragment::FragmentationError>; + fn reassemble(&mut self, packet: E) -> Result, Self::Error>; + fn purge_expired(&mut self) -> Vec; + } + +Target behavioural dependency changes: + +- `rstest-bdd = "0.5.0"` +- `rstest-bdd-macros = { version = "0.5.0", ... }` + +These remain `dev-dependencies` only. + +## Revision note (2026-02-12) + +Initial draft created for roadmap item 9.2.1 with explicit constraints, +tolerances, staged implementation flow, and required testing/documentation +updates. From e08f28f42d39cdf55f0a7e0ca32003520a50e7da Mon Sep 17 00:00:00 2001 From: Leynos Date: Thu, 12 Feb 2026 09:35:35 +0000 Subject: [PATCH 2/7] feat(fragmentation): make transport fragmentation opt-in and introduce FragmentAdapter API - Introduce a public `FragmentAdapter` trait and `DefaultFragmentAdapter` implementation to unify fragmentation and reassembly behavior. - Shift fragmentation from default-enabled to explicit opt-in via `enable_fragmentation()` or `fragmentation(Some(cfg))` on `WireframeApp` builder. - Make `with_codec()` clear fragmentation config to require reconfiguration aligned with codec frame budget. - Add explicit duplicate fragment suppression (non-fatal) and preserve out-of-order rejection with deterministic cleanup. - Expose caller-driven purge API via `FragmentAdapter::purge_expired()` for manual timeout eviction control. - Update related docs, design ADRs, and user guides to reflect new fragmentation control and policies. - Add comprehensive unit, integration, and behavioral tests verifying opt-in semantics, duplicate suppression, zero-length fragment support, interleaved reassembly, and purge API usage. This change improves resource control, clarity, and flexibility of transport fragmentation handling, preventing unintended overhead and enabling tailored fragment handling strategies. Co-authored-by: devboxerhub[bot] --- Cargo.lock | 16 +- Cargo.toml | 4 +- docs/adr-004-pluggable-protocol-codecs.md | 3 +- .../execplans/9-1-3-fragment-adapter-trait.md | 83 +++++++--- ...ge-fragmentation-and-re-assembly-design.md | 64 +++++--- ...eframe-a-guide-to-production-resilience.md | 13 +- ...i-packet-and-streaming-responses-design.md | 2 + docs/roadmap.md | 14 +- docs/rstest-bdd-users-guide.md | 6 +- ...-set-philosophy-and-capability-maturity.md | 3 + docs/users-guide.md | 45 +++--- src/app/builder/codec.rs | 10 +- src/app/builder/config.rs | 13 +- src/app/builder/core.rs | 35 +++- src/app/fragmentation_state.rs | 83 +--------- src/fragment/adapter.rs | 149 ++++++++++++++++++ src/fragment/error.rs | 2 + src/fragment/mod.rs | 2 + src/fragment/reassembler.rs | 2 + src/fragment/series.rs | 16 +- src/fragment/tests.rs | 102 ++++++++++++ src/lib.rs | 3 + tests/features/fragment.feature | 27 ++++ tests/fragment_transport.rs | 144 ++++++++++++++++- tests/fragment_transport/mod.rs | 2 +- tests/fragment_transport/rejection.rs | 16 +- tests/scenarios/fragment_scenarios.rs | 21 +++ 27 files changed, 683 insertions(+), 197 deletions(-) create mode 100644 src/fragment/adapter.rs diff --git a/Cargo.lock b/Cargo.lock index 11a67702..a238a0bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1761,9 +1761,9 @@ dependencies = [ [[package]] name = "rstest-bdd" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e741d97bce6ea0a7d0f074716041e0d0eebe6ab9edcd243e7d12f9fd2b5e8b6" +checksum = "138d8e4f97e16906ebeb0bfb12d2c94f0b05913a96fc522111bec62ca8328522" dependencies = [ "ctor", "derive_more 0.99.20", @@ -1785,9 +1785,9 @@ dependencies = [ [[package]] name = "rstest-bdd-macros" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7e3b51c032a6174f82d87843a5e62cfd08ce4606005eafde07ed12561d73196" +checksum = "fe104196f61dc8911a8da1b10005e9401e7c2e14ad9302e2de6688311c0beec7" dependencies = [ "camino", "cap-std", @@ -1809,9 +1809,9 @@ dependencies = [ [[package]] name = "rstest-bdd-patterns" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75d730afd8727e5b18bd276ebf5ea4c881760138762d0cd7d415dc6b6662c6c6" +checksum = "39abaa69316cdbbad0512dc0f37b704b09e4cdb5018d80f197cbdb77a2269d06" dependencies = [ "gherkin", "regex", @@ -1820,9 +1820,9 @@ dependencies = [ [[package]] name = "rstest-bdd-policy" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d256efeb01f08e281cef9cd1e54dab33d8778e871090f5fa49b7a9a70f0caf81" +checksum = "88f2ca584f7d9d359a09f616900be015302c959d58c2c2da6c075573352dfa0d" [[package]] name = "rstest_macros" diff --git a/Cargo.toml b/Cargo.toml index 6b778522..9044cca4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,8 +49,8 @@ socket2 = "0.6.0" [dev-dependencies] rstest = "0.26.1" -rstest-bdd = "0.4.0" -rstest-bdd-macros = { version = "0.4.0", features = ["strict-compile-time-validation"] } +rstest-bdd = "0.5.0" +rstest-bdd-macros = { version = "0.5.0", features = ["strict-compile-time-validation"] } wireframe = { path = ".", features = ["test-helpers"] } wireframe_testing = { path = "./wireframe_testing" } logtest = "2.0.0" diff --git a/docs/adr-004-pluggable-protocol-codecs.md b/docs/adr-004-pluggable-protocol-codecs.md index 9a7d8abe..59b06b6a 100644 --- a/docs/adr-004-pluggable-protocol-codecs.md +++ b/docs/adr-004-pluggable-protocol-codecs.md @@ -170,7 +170,8 @@ pub trait FrameCodec: Send + Sync + Clone + 'static { - Parameterize `FrameHandlingContext` and `ResponseContext` over the codec. - Replace `LengthDelimitedCodec` usage with `FrameCodec` decoder/encoder calls. -- Use `max_frame_length()` for buffer sizing and fragmentation defaults. +- Use `max_frame_length()` for buffer sizing and explicit fragmentation + configuration helpers (`enable_fragmentation`). ### Phase 4: Update `WireframeServer` diff --git a/docs/execplans/9-1-3-fragment-adapter-trait.md b/docs/execplans/9-1-3-fragment-adapter-trait.md index 3be59eea..4e4444f7 100644 --- a/docs/execplans/9-1-3-fragment-adapter-trait.md +++ b/docs/execplans/9-1-3-fragment-adapter-trait.md @@ -4,7 +4,7 @@ This ExecPlan is a living document. The sections `Constraints`, `Tolerances`, `Risks`, `Progress`, `Surprises & Discoveries`, `Decision Log`, and `Outcomes & Retrospective` must be kept up to date as work proceeds. -Status: DRAFT +Status: COMPLETE No `PLANS.md` exists in this repository as of 2026-02-12. @@ -87,14 +87,21 @@ Success is observable when: ## Progress - [x] (2026-02-12 00:00Z) Draft ExecPlan for roadmap item 9.2.1. -- [ ] Finalize `FragmentAdapter` API contract and error taxonomy updates. -- [ ] Make fragmentation opt-in on `WireframeApp` builder defaults. -- [ ] Expose public purge API and wire it through adapter implementation. -- [ ] Implement duplicate suppression and out-of-order handling policy. -- [ ] Define zero-length fragment behaviour and index-overflow handling. -- [ ] Add/upgrade unit, integration, and behavioural tests. -- [ ] Update design docs, user guide, and roadmap checkboxes. -- [ ] Run formatting, lint, and full test gates. +- [x] (2026-02-12 00:25Z) Finalize `FragmentAdapter` API contract and error + taxonomy updates. +- [x] (2026-02-12 00:37Z) Make fragmentation opt-in on `WireframeApp` builder + defaults. +- [x] (2026-02-12 00:42Z) Expose public purge API and wire it through adapter + implementation. +- [x] (2026-02-12 01:03Z) Implement duplicate suppression and out-of-order + handling policy. +- [x] (2026-02-12 01:11Z) Define zero-length fragment behaviour and + index-overflow handling. +- [x] (2026-02-12 01:35Z) Add/upgrade unit, integration, and behavioural + tests. +- [x] (2026-02-12 02:00Z) Update design docs, user guide, and roadmap + checkboxes. +- [x] (2026-02-12 02:20Z) Run formatting, lint, and full test gates. ## Surprises & Discoveries @@ -110,6 +117,12 @@ Success is observable when: feature should follow `docs/rstest-bdd-users-guide.md` conventions and version updates. +- Observation: the first version of the interleaved transport integration test + assumed deterministic response ordering and intermittently timed out. + Evidence: `tests/fragment_transport.rs` failures during local `make test` + runs. Impact: the assertion strategy was changed to drain both responses and + compare an order-independent payload set, which matches scheduler reality. + ## Decision Log - Decision: introduce a public `FragmentAdapter` trait plus one default @@ -127,9 +140,40 @@ Success is observable when: true out-of-order delivery should fail deterministically and clear partial state. Date/Author: 2026-02-12 / Codex. +- Decision: make `with_codec(...)` clear fragmentation state and require a + fresh explicit `enable_fragmentation()` call. Rationale: fragmentation + settings depend on codec max-frame details; retaining old settings after + codec replacement can silently produce mismatched thresholds. Date/Author: + 2026-02-12 / Codex. + ## Outcomes & Retrospective -To be completed when implementation finishes. +Implemented outcomes: + +- Added a public `FragmentAdapter` trait and `DefaultFragmentAdapter` + implementation in `src/fragment/adapter.rs`, and exported them through + `src/fragment/mod.rs` and `src/lib.rs`. +- Shifted runtime integration to the new adapter contract while preserving app + call-site behaviour through the `src/app/fragmentation_state.rs` alias layer. +- Made fragmentation opt-in by default in `WireframeApp` builder construction + and introduced `enable_fragmentation()` for explicit activation. +- Added caller-driven purge access through the adapter API and documented purge + ownership in design and user docs. +- Defined duplicate suppression as non-fatal (`Duplicate`) and preserved + out-of-order rejection semantics with deterministic state cleanup. +- Added/updated coverage for opt-in defaults, interleaved reassembly, duplicate + suppression, out-of-order fragments, zero-length fragments, and index + overflow across unit, integration, and behavioural test suites. +- Updated roadmap 9.2.1 and companion design/user documents to reflect final + behaviour and composition order. + +Retrospective: + +- Separating duplicate from out-of-order outcomes reduced ambiguity in both code + and tests and made policy documentation straightforward. +- Explicit opt-in defaults prevent hidden transport costs for users that do not + need fragmentation, but this increases migration burden for existing builder + call sites; the new builder tests now guard this contract. ## Context and orientation @@ -326,15 +370,14 @@ resume feature-specific edits. ## Artifacts and notes -Populate this section during implementation with log paths and short evidence -snippets, for example: +Validation evidence logs: -- `/tmp/wireframe-fmt.log` -- `/tmp/wireframe-markdownlint.log` -- `/tmp/wireframe-check-fmt.log` -- `/tmp/wireframe-lint.log` -- `/tmp/wireframe-test-bdd.log` -- `/tmp/wireframe-test.log` +- `/tmp/wireframe-fmt.log` (`make fmt`) +- `/tmp/wireframe-markdownlint.log` (`make markdownlint`) +- `/tmp/wireframe-check-fmt.log` (`make check-fmt`) +- `/tmp/wireframe-lint.log` (`make lint`) +- `/tmp/wireframe-test-bdd.log` (`make test-bdd`) +- `/tmp/wireframe-test.log` (`make test`) ## Interfaces and dependencies @@ -361,3 +404,7 @@ These remain `dev-dependencies` only. Initial draft created for roadmap item 9.2.1 with explicit constraints, tolerances, staged implementation flow, and required testing/documentation updates. + +Completed update: plan status advanced to COMPLETE, required milestones were +checked off, outcomes and policy decisions were recorded, and artefacts were +captured for reproducibility. diff --git a/docs/generic-message-fragmentation-and-re-assembly-design.md b/docs/generic-message-fragmentation-and-re-assembly-design.md index 769ff22b..579c0901 100644 --- a/docs/generic-message-fragmentation-and-re-assembly-design.md +++ b/docs/generic-message-fragmentation-and-re-assembly-design.md @@ -40,9 +40,10 @@ The implementation must satisfy the following core requirements: ## 3. Core architecture: the `FragmentAdapter` -The feature will be implemented as a codec middleware called `FragmentAdapter`. -It is instantiated with a protocol-specific `FragmentStrategy` and wraps any -subsequent codecs in the chain. +The feature is exposed as a public `FragmentAdapter` trait plus a default +implementation (`DefaultFragmentAdapter`) that wraps `Fragmenter` and +`Reassembler`. Protocol-specific stacks can still layer additional strategy +codecs around this adaptor. ```plaintext Socket I/O ↔ [Compression] ↔ FragmentAdapter ↔ Router/Handlers @@ -71,8 +72,15 @@ use dashmap::DashMap; use std::sync::atomic::AtomicU64; use std::time::{Duration, Instant}; -pub struct FragmentAdapter { - strategy: S, +pub trait FragmentAdapter { + fn fragment(&self, packet: E) -> Result, FragmentationError>; + fn reassemble(&mut self, packet: E) -> Result, FragmentAdapterError>; + fn purge_expired(&mut self) -> Vec; +} + +pub struct DefaultFragmentAdapter { + fragmenter: Fragmenter, + reassembler: Reassembler, /// Hard cap on the size of a reassembled logical message. max_message_size: usize, /// Timeout for completing a partial message reassembly. @@ -164,16 +172,21 @@ configuration requires updating this section alongside the code. The length-delimited codec therefore observes one frame per fragment; the encoded body is bounded by `fragment_payload_cap + fragment_overhead`. -`WireframeApp` and `ConnectionActor` enable this adaptor by default when a -frame budget is available. Defaults derive `fragment_payload_cap` from -`buffer_capacity`, cap reassembled messages at 16× that budget, and evict -partial assemblies after 30 seconds. +`WireframeApp` keeps fragmentation disabled by default and now requires +explicit configuration via `enable_fragmentation()` or +`fragmentation(Some(cfg))`. When enabled, defaults derive +`fragment_payload_cap` from `buffer_capacity`, cap reassembled messages at 16× +that budget, and use a 30-second expiry window. + +## 4. Public API: `FragmentAdapter` and `FragmentStrategy` -## 4. Public API: the `FragmentStrategy` trait +The `FragmentAdapter` contract defines lifecycle ownership for fragmentation +and reassembly, including purge control. `WireframeApp` drives purge scheduling +on read-timeout ticks, while external callers can drive eviction manually +through `FragmentAdapter::purge_expired`. -The power and flexibility of this feature come from the `FragmentStrategy` -trait. Protocol implementers will provide a type that implements this trait to -inject their specific fragmentation rules into the generic `FragmentAdapter`. +The `FragmentStrategy` trait remains the protocol-specific extension point for +parsing and encoding strategy headers when protocols need custom wire layouts. ### 4.1 Trait Definition @@ -306,9 +319,10 @@ against errors and attacks. - **Continuing Message (**`.get_mut()`**):** - - The `last_sequence` is checked to ensure fragments are monotonic. An - out-of-order fragment results in an error and the `PartialMessage` being - dropped. + - The `last_sequence` is checked to ensure fragments are monotonic. + Duplicate fragments (repeated indices) are suppressed, while + out-of-order fragments (indices ahead of the expected position) result + in an error and the `PartialMessage` being dropped. - The buffer's potential new size is checked against `max_message_size`. Exceeding this limit results in an error. @@ -319,11 +333,10 @@ against errors and attacks. extracted from the `PartialMessage`, the entry is removed from the map, then pass the complete logical frame down the codec chain. -4. **Timeout handling:** Run a background task within the - `FragmentAdapter` that periodically iterates over the re‑assembly buffers, - checks each `PartialMessage`’s `started_at` timestamp, and removes any entry - that has exceeded the re‑assembly timeout, emitting a `WARN`‑level `tracing` - event. +4. **Timeout handling:** Purge ownership is explicit. Callers invoke + `FragmentAdapter::purge_expired()` according to their scheduling policy. + `WireframeApp` invokes this on read-timeout ticks; protocol harnesses and + tests can invoke it directly to drive deterministic eviction. ### 5.2 Outbound path (fragmentation) @@ -401,6 +414,15 @@ This feature is designed as a foundational layer that other features build upon. drops the partial buffer when ordering breaks, enforces a configurable `max_message_size`, and exposes caller-driven timeout purging. This prevents abandoned assemblies from exhausting memory. +- Added an explicit duplicate-handling policy for active series: duplicate + fragment indices are suppressed and do not append payload bytes, while + out-of-order indices still fail reassembly and clear partial state. +- Changed app-level fragmentation configuration to explicit opt-in. Builders no + longer auto-enable fragmentation; callers must use `enable_fragmentation()` + or `fragmentation(Some(cfg))`. +- Assigned purge scheduling ownership to the adaptor caller via + `FragmentAdapter::purge_expired()`. `WireframeApp` drives this on timeout + ticks, and external callers can schedule purges directly. ## 9. Composition with streaming requests and MessageAssembler diff --git a/docs/hardening-wireframe-a-guide-to-production-resilience.md b/docs/hardening-wireframe-a-guide-to-production-resilience.md index 2485b579..8d0960ed 100644 --- a/docs/hardening-wireframe-a-guide-to-production-resilience.md +++ b/docs/hardening-wireframe-a-guide-to-production-resilience.md @@ -312,12 +312,13 @@ fragmentation layer must be hardened. assemblies, which are purged if they are not completed within the time limit (e.g., 30 seconds). -`WireframeApp` enables these guards by default via -`default_fragmentation(buffer_capacity)`, which builds a `FragmentationConfig` -from the connection's `buffer_capacity`. The fragment payload cap matches the -usable frame budget, `max_message_size` defaults to 16× `buffer_capacity`, and -partial assemblies are purged after 30 seconds. Applications can override or -disable these defaults via `fragmentation(...)`. +`WireframeApp` keeps fragmentation disabled by default. Applications opt in via +`enable_fragmentation()` (codec-derived defaults) or `fragmentation(Some(cfg))` +(explicit values). Defaults derive `fragment_payload_cap` from +`buffer_capacity`, set `max_message_size` to 16× `buffer_capacity`, and use a +30-second timeout. Purge scheduling is caller-owned through +`FragmentAdapter::purge_expired()`; `WireframeApp` invokes it on read-timeout +ticks, while custom runtimes can drive eviction explicitly. ## 5. Advanced Resilience Patterns diff --git a/docs/multi-packet-and-streaming-responses-design.md b/docs/multi-packet-and-streaming-responses-design.md index 04617951..cb7c6329 100644 --- a/docs/multi-packet-and-streaming-responses-design.md +++ b/docs/multi-packet-and-streaming-responses-design.md @@ -427,6 +427,8 @@ not hang. `FragmentAdapter` will operate at a lower layer, transparently splitting any large frames yielded by the stream before they are written to the socket. The handler and streaming logic remain completely unaware of fragmentation. + Outbound order is serializer → fragmentation → codec framing; inbound order + is codec decode → fragment reassembly → deserialization. - **Streaming Request Bodies:** [ADR 0002][adr-0002] introduces first-class streaming request bodies as the inbound counterpart to streaming responses. diff --git a/docs/roadmap.md b/docs/roadmap.md index a0d641ec..f8516227 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -370,20 +370,20 @@ integration boundaries. ### 9.2. Fragment adaptor alignment -- [ ] 9.2.1. Introduce a `FragmentAdapter` trait as described in the +- [x] 9.2.1. Introduce a `FragmentAdapter` trait as described in the fragmentation design.[^fragmentation-design] Fragmentation behaviour must explicitly define duplicate handling, out-of-order policies, and ownership of purge scheduling. - - [ ] Make fragmentation opt-in by requiring explicit configuration on the + - [x] Make fragmentation opt-in by requiring explicit configuration on the `WireframeApp` builder. - - [ ] Expose a public purge API, so callers can drive timeout eviction. - - [ ] Document the composition order for codec, fragmentation, and + - [x] Expose a public purge API, so callers can drive timeout eviction. + - [x] Document the composition order for codec, fragmentation, and serialization layers. - - [ ] Define and implement duplicate suppression and out-of-order handling + - [x] Define and implement duplicate suppression and out-of-order handling for fragment series. - - [ ] Define and test zero-length fragment behaviour and fragment index + - [x] Define and test zero-length fragment behaviour and fragment index overflow handling. - - [ ] Add unit and integration tests for opt-in behaviour, interleaved + - [x] Add unit and integration tests for opt-in behaviour, interleaved reassembly, and duplicate and out-of-order fragments. ### 9.3. Unified codec handling diff --git a/docs/rstest-bdd-users-guide.md b/docs/rstest-bdd-users-guide.md index b2e0baed..74c8cf17 100644 --- a/docs/rstest-bdd-users-guide.md +++ b/docs/rstest-bdd-users-guide.md @@ -838,14 +838,14 @@ To enable validation, pin a feature in the project's `dev-dependencies`: ```toml [dev-dependencies] -rstest-bdd-macros = { version = "0.4.0", features = ["compile-time-validation"] } +rstest-bdd-macros = { version = "0.5.0", features = ["compile-time-validation"] } ``` For strict checking use: ```toml [dev-dependencies] -rstest-bdd-macros = { version = "0.4.0", features = ["strict-compile-time-validation"] } +rstest-bdd-macros = { version = "0.5.0", features = ["strict-compile-time-validation"] } ``` Steps are only validated when one of these features is enabled. @@ -1238,7 +1238,7 @@ Localization tooling can be added to `Cargo.toml` as follows: ```toml [dependencies] -rstest-bdd = "0.4.0" +rstest-bdd = "0.5.0" i18n-embed = { version = "0.16", features = ["fluent-system", "desktop-requester"] } unic-langid = "0.9" ``` diff --git a/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md b/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md index ee3be6ee..d6d56654 100644 --- a/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md +++ b/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md @@ -292,6 +292,9 @@ Rust's ownership model and `Drop` trait are the foundation of resource safety. Budgets are configured on the `WireframeApp` instance via a builder method (for example `memory_budgets(...)`). If budgets are not set explicitly, defaults are derived from `buffer_capacity`. + Transport fragmentation itself is opt-in on the builder (`enable_fragmentation()` + or `fragmentation(Some(cfg))`) so protocols that do not require + fragmentation keep zero additional buffering overhead. - **Back-pressure and Hard Caps:** When budgets are approached, Wireframe applies back-pressure by pausing further reads and assembly work (soft diff --git a/docs/users-guide.md b/docs/users-guide.md index 8874f372..d6d6acff 100644 --- a/docs/users-guide.md +++ b/docs/users-guide.md @@ -134,17 +134,17 @@ A codec implementation must: - Provide `correlation_id` when the protocol stores it outside the payload; Wireframe only uses this hook when the deserialized envelope is missing a correlation identifier. -- Report `max_frame_length`, which clamps inbound frames and seeds default - fragmentation limits. - -Install a custom codec with `with_codec`. The builder resets fragmentation to -the codec-derived defaults, so override fragmentation afterwards if the -protocol uses a different budget. Wireframe clones the codec per connection, so -stateful codecs should ensure `Clone` produces an independent state (for -example, reset sequence counters) when per-connection isolation is required. -When a framed stream is already available, use -`send_response_framed_with_codec`, so responses pass through -`FrameCodec::wrap_payload`. +- Report `max_frame_length`, which clamps inbound frames and determines the + budget used by `enable_fragmentation`. + +Install a custom codec with `with_codec`. The builder disables fragmentation +when codecs change, so explicitly call `enable_fragmentation()` (or +`fragmentation(Some(cfg))`) afterwards when transport fragmentation is +required. Wireframe clones the codec per connection, so stateful codecs should +ensure `Clone` produces an independent state (for example, reset sequence +counters) when per-connection isolation is required. When a framed stream is +already available, use `send_response_framed_with_codec`, so responses pass +through `FrameCodec::wrap_payload`. Assume `MyCodec` implements `FrameCodec`: @@ -827,14 +827,18 @@ async fn main() -> Result<(), SendError> { ## Message fragmentation -`WireframeApp` now fragments oversized payloads automatically. The builder -derives a `FragmentationConfig` from the active frame codec's maximum frame -length (the default length-delimited codec uses `buffer_capacity`): any payload -that will not fit into a single frame is split into fragments carrying a +`WireframeApp` keeps transport fragmentation disabled by default. Enable it +explicitly with `enable_fragmentation()` or provide a bespoke +`FragmentationConfig` using `fragmentation(Some(cfg))`. When enabled, payloads +that exceed the frame budget are split into fragments carrying a `FragmentHeader` (`message_id`, `fragment_index`, `is_last_fragment`) wrapped with the `FRAG` marker. The connection reassembles fragments before invoking handlers, so handlers continue to work with complete `Envelope` values.[^6] +Layering order is fixed. Outbound processing runs serializer → fragmentation → +codec wrapping. Inbound processing runs codec decode → fragment reassembly → +deserialization. + Fragmented messages enforce two guards: `max_message_size` caps the total reassembled payload, and `reassembly_timeout` evicts stale partial messages. Customize or disable fragmentation via the builder: @@ -854,15 +858,16 @@ let cfg = FragmentationConfig::for_frame_budget( ).expect("frame budget too small for fragments"); let app = WireframeApp::new()? + .enable_fragmentation() .fragmentation(Some(cfg)) .route(42, handler)?; ``` -Set `fragmentation(None)` when the transport already supports large frames, or -when fragmentation should be deferred to an upstream gateway. The -`ConnectionActor` mirrors the same behaviour for push traffic and streaming -responses through `enable_fragmentation`, ensuring client-visible frames follow -the same format. +Call `fragmentation(None)` to keep fragmentation disabled after explicit +configuration (for example, when the transport already supports large frames or +fragmentation is delegated to an upstream gateway). The `ConnectionActor` +mirrors the same behaviour for push traffic and streaming responses through +`enable_fragmentation`, ensuring client-visible frames follow the same format. ## Protocol hooks diff --git a/src/app/builder/codec.rs b/src/app/builder/codec.rs index 934c65dd..b967b1ae 100644 --- a/src/app/builder/codec.rs +++ b/src/app/builder/codec.rs @@ -2,7 +2,7 @@ use super::WireframeApp; use crate::{ - app::{Packet, builder_defaults::default_fragmentation}, + app::Packet, codec::{FrameCodec, LengthDelimitedFrameCodec, clamp_frame_length}, serializer::Serializer, }; @@ -17,17 +17,16 @@ where /// Replace the frame codec used for framing I/O. /// /// This resets any installed protocol hooks because the frame type may - /// change across codecs. Fragmentation configuration is reset to the - /// codec-derived default. + /// change across codecs. Fragmentation is disabled so callers can + /// reconfigure explicitly for the new frame budget. #[must_use] pub fn with_codec(mut self, codec: F2) -> WireframeApp where S: Default, { - let fragmentation = default_fragmentation(codec.max_frame_length()); let serializer = std::mem::take(&mut self.serializer); let message_assembler = self.message_assembler.take(); - self.rebuild_with_params(serializer, codec, None, fragmentation, message_assembler) + self.rebuild_with_params(serializer, codec, None, None, message_assembler) } /// Replace the serializer used for messages. @@ -63,7 +62,6 @@ where pub fn buffer_capacity(mut self, capacity: usize) -> Self { let capacity = clamp_frame_length(capacity); self.codec = LengthDelimitedFrameCodec::new(capacity); - self.fragmentation = default_fragmentation(capacity); self } } diff --git a/src/app/builder/config.rs b/src/app/builder/config.rs index dac658d9..28ba8aff 100644 --- a/src/app/builder/config.rs +++ b/src/app/builder/config.rs @@ -6,7 +6,7 @@ use super::WireframeApp; use crate::{ app::{ Packet, - builder_defaults::{MAX_READ_TIMEOUT_MS, MIN_READ_TIMEOUT_MS}, + builder_defaults::{MAX_READ_TIMEOUT_MS, MIN_READ_TIMEOUT_MS, default_fragmentation}, }, codec::FrameCodec, fragment::FragmentationConfig, @@ -37,6 +37,17 @@ where self } + /// Enable transport fragmentation using codec-derived defaults. + /// + /// The derived settings are bounded by the current frame codec budget. + /// Call this after `with_codec` or `buffer_capacity` so defaults align with + /// the final frame length. + #[must_use] + pub fn enable_fragmentation(mut self) -> Self { + self.fragmentation = default_fragmentation(self.codec.max_frame_length()); + self + } + /// Configure a Dead Letter Queue for dropped push frames. /// /// ```rust,no_run diff --git a/src/app/builder/core.rs b/src/app/builder/core.rs index 8ebc6795..c80523cc 100644 --- a/src/app/builder/core.rs +++ b/src/app/builder/core.rs @@ -10,7 +10,7 @@ use tokio::sync::{OnceCell, mpsc}; use crate::{ app::{ - builder_defaults::{DEFAULT_READ_TIMEOUT_MS, default_fragmentation}, + builder_defaults::DEFAULT_READ_TIMEOUT_MS, envelope::{Envelope, Packet}, error::Result, lifecycle::{ConnectionSetup, ConnectionTeardown}, @@ -61,7 +61,6 @@ where /// default serializer and no lifecycle hooks. fn default() -> Self { let codec = F::default(); - let max_frame_length = codec.max_frame_length(); Self { handlers: HashMap::new(), routes: OnceCell::new(), @@ -74,7 +73,7 @@ where push_dlq: None, codec, read_timeout_ms: DEFAULT_READ_TIMEOUT_MS, - fragmentation: default_fragmentation(max_frame_length), + fragmentation: None, message_assembler: None, } } @@ -160,3 +159,33 @@ where } } } + +#[cfg(test)] +mod tests { + use super::WireframeApp; + use crate::{app::Envelope, codec::LengthDelimitedFrameCodec, serializer::BincodeSerializer}; + + #[test] + fn builder_defaults_fragmentation_to_disabled() { + let app = WireframeApp::::new() + .expect("build app"); + assert!(app.fragmentation.is_none()); + } + + #[test] + fn enable_fragmentation_requires_explicit_opt_in() { + let app = WireframeApp::::new() + .expect("build app") + .enable_fragmentation(); + assert!(app.fragmentation.is_some()); + } + + #[test] + fn with_codec_clears_fragmentation_to_require_reconfiguration() { + let app = WireframeApp::::new() + .expect("build app") + .enable_fragmentation() + .with_codec(LengthDelimitedFrameCodec::new(2048)); + assert!(app.fragmentation.is_none()); + } +} diff --git a/src/app/fragmentation_state.rs b/src/app/fragmentation_state.rs index fb15779d..83c07e84 100644 --- a/src/app/fragmentation_state.rs +++ b/src/app/fragmentation_state.rs @@ -1,80 +1,7 @@ -//! Connection-scoped helpers for outbound fragmentation and inbound reassembly. -//! -//! This module encapsulates the fragmentation state used by `ConnectionActor` -//! and `WireframeApp` to keep the main connection module concise. +//! Connection-scoped fragmentation type aliases used by app frame handling. -use bincode::error::DecodeError; -use thiserror::Error; +/// Concrete adapter state used by `WireframeApp` connection processing. +pub(crate) type FragmentationState = crate::fragment::DefaultFragmentAdapter; -use super::{Packet, PacketParts}; -use crate::fragment::{ - Fragmentable, - FragmentationError, - Fragmenter, - MessageId, - Reassembler, - ReassemblyError, - decode_fragment_payload, -}; - -/// Bundles outbound fragmentation and inbound reassembly state for a connection. -pub(crate) struct FragmentationState { - fragmenter: Fragmenter, - reassembler: Reassembler, -} - -/// Decode or reassembly failures encountered while handling fragments. -#[derive(Debug, Error)] -pub(crate) enum FragmentProcessError { - #[error("decode error: {0}")] - Decode(DecodeError), - #[error("reassembly error: {0}")] - Reassembly(ReassemblyError), -} - -impl FragmentationState { - pub(crate) fn new(config: crate::fragment::FragmentationConfig) -> Self { - Self { - fragmenter: Fragmenter::new(config.fragment_payload_cap), - reassembler: Reassembler::new(config.max_message_size, config.reassembly_timeout), - } - } - - pub(crate) fn fragment( - &self, - packet: E, - ) -> Result, FragmentationError> { - crate::fragment::fragment_packet(&self.fragmenter, packet) - } - - pub(crate) fn reassemble( - &mut self, - packet: E, - ) -> Result, FragmentProcessError> { - let parts = packet.into_parts(); - let id = parts.id(); - let correlation = parts.correlation_id(); - let payload = parts.into_payload(); - - match decode_fragment_payload(&payload) { - Ok(Some((header, fragment_payload))) => { - match self.reassembler.push(header, fragment_payload) { - Ok(Some(message)) => { - let rebuilt = PacketParts::new(id, correlation, message.into_payload()); - Ok(Some(E::from_parts(rebuilt))) - } - Ok(None) => Ok(None), - Err(err) => Err(FragmentProcessError::Reassembly(err)), - } - } - Ok(None) => Ok(Some(E::from_parts(PacketParts::new( - id, - correlation, - payload, - )))), - Err(err) => Err(FragmentProcessError::Decode(err)), - } - } - - pub(crate) fn purge_expired(&mut self) -> Vec { self.reassembler.purge_expired() } -} +/// Decode and reassembly errors surfaced by adapter processing. +pub(crate) type FragmentProcessError = crate::fragment::FragmentAdapterError; diff --git a/src/fragment/adapter.rs b/src/fragment/adapter.rs new file mode 100644 index 00000000..b25fb4f1 --- /dev/null +++ b/src/fragment/adapter.rs @@ -0,0 +1,149 @@ +//! Public adapter contract for transport-level fragmentation and reassembly. +//! +//! [`FragmentAdapter`] captures the minimal behaviour required to split outbound +//! packets into transport fragments, rebuild inbound fragments into full +//! packets, and purge stale partial assemblies. + +use bincode::error::DecodeError; +use thiserror::Error; + +use super::{ + Fragmentable, + FragmentationConfig, + FragmentationError, + Fragmenter, + MessageId, + Reassembler, + ReassemblyError, + decode_fragment_payload, + fragment_packet, + packet::FragmentParts, +}; + +/// Error returned by [`FragmentAdapter::reassemble`]. +#[derive(Debug, Error)] +pub enum FragmentAdapterError { + /// Fragment payload marker/header decoding failed. + #[error("decode error: {0}")] + Decode(DecodeError), + /// Reassembly ordering or size validation failed. + #[error("reassembly error: {0}")] + Reassembly(ReassemblyError), +} + +/// Adapter contract for transport-level fragmentation and reassembly. +pub trait FragmentAdapter: Send + Sync { + /// Attempt to fragment a packet for outbound transport. + /// + /// # Errors + /// + /// Returns [`FragmentationError`] when payload chunking or header encoding + /// fails. + fn fragment(&self, packet: E) -> Result, FragmentationError>; + + /// Attempt to reassemble an inbound packet. + /// + /// Returns `Ok(Some(packet))` when a complete packet is available, + /// `Ok(None)` when more fragments are required, and an error when decoding + /// or reassembly invariants fail. + /// + /// # Errors + /// + /// Returns [`FragmentAdapterError`] when fragment decoding fails or when + /// ordering and size guarantees are violated. + fn reassemble(&mut self, packet: E) -> Result, FragmentAdapterError>; + + /// Purge stale partial reassembly state. + /// + /// Returns the identifiers that were evicted. + fn purge_expired(&mut self) -> Vec; +} + +/// Default adapter backed by [`Fragmenter`] and [`Reassembler`]. +#[derive(Debug)] +pub struct DefaultFragmentAdapter { + fragmenter: Fragmenter, + reassembler: Reassembler, +} + +impl DefaultFragmentAdapter { + /// Create a default adapter from fragmentation configuration. + #[must_use] + pub fn new(config: FragmentationConfig) -> Self { + Self { + fragmenter: Fragmenter::new(config.fragment_payload_cap), + reassembler: Reassembler::new(config.max_message_size, config.reassembly_timeout), + } + } + + fn fragment_inner(&self, packet: E) -> Result, FragmentationError> { + fragment_packet(&self.fragmenter, packet) + } + + fn reassemble_inner( + &mut self, + packet: E, + ) -> Result, FragmentAdapterError> { + let parts = packet.into_fragment_parts(); + let id = parts.id(); + let correlation_id = parts.correlation_id(); + let payload = parts.into_payload(); + + match decode_fragment_payload(&payload) { + Ok(Some((header, fragment_payload))) => { + match self.reassembler.push(header, fragment_payload) { + Ok(Some(message)) => { + let rebuilt = + FragmentParts::new(id, correlation_id, message.into_payload()); + Ok(Some(E::from_fragment_parts(rebuilt))) + } + Ok(None) => Ok(None), + Err(err) => Err(FragmentAdapterError::Reassembly(err)), + } + } + Ok(None) => { + let passthrough = FragmentParts::new(id, correlation_id, payload); + Ok(Some(E::from_fragment_parts(passthrough))) + } + Err(err) => Err(FragmentAdapterError::Decode(err)), + } + } + + fn purge_expired_inner(&mut self) -> Vec { self.reassembler.purge_expired() } + + /// Fragment outbound packet data. + /// + /// # Errors + /// + /// Returns [`FragmentationError`] when fragment emission fails. + pub fn fragment(&self, packet: E) -> Result, FragmentationError> { + self.fragment_inner(packet) + } + + /// Reassemble inbound packet data. + /// + /// # Errors + /// + /// Returns [`FragmentAdapterError`] when decoding or reassembly fails. + pub fn reassemble( + &mut self, + packet: E, + ) -> Result, FragmentAdapterError> { + self.reassemble_inner(packet) + } + + /// Purge stale reassembly entries and return evicted identifiers. + pub fn purge_expired(&mut self) -> Vec { self.purge_expired_inner() } +} + +impl FragmentAdapter for DefaultFragmentAdapter { + fn fragment(&self, packet: E) -> Result, FragmentationError> { + self.fragment_inner(packet) + } + + fn reassemble(&mut self, packet: E) -> Result, FragmentAdapterError> { + self.reassemble_inner(packet) + } + + fn purge_expired(&mut self) -> Vec { self.purge_expired_inner() } +} diff --git a/src/fragment/error.rs b/src/fragment/error.rs index 55d0a4d8..122eec59 100644 --- a/src/fragment/error.rs +++ b/src/fragment/error.rs @@ -16,6 +16,8 @@ use super::{FragmentIndex, MessageId}; pub enum FragmentStatus { /// The logical message still expects more fragments. Incomplete, + /// The fragment duplicated data that was already accepted and was ignored. + Duplicate, /// The fragment completed the logical message. Complete, } diff --git a/src/fragment/mod.rs b/src/fragment/mod.rs index 881f3061..87a424d4 100644 --- a/src/fragment/mod.rs +++ b/src/fragment/mod.rs @@ -5,6 +5,7 @@ //! code small and easy to audit while still providing a cohesive API at the //! crate root. +pub mod adapter; pub mod config; pub mod error; pub mod fragmenter; @@ -16,6 +17,7 @@ pub mod payload; pub mod reassembler; pub mod series; +pub use adapter::{DefaultFragmentAdapter, FragmentAdapter, FragmentAdapterError}; pub use config::FragmentationConfig; pub use error::{FragmentError, FragmentStatus, FragmentationError, ReassemblyError}; pub use fragmenter::{FragmentBatch, FragmentFrame, Fragmenter}; diff --git a/src/fragment/reassembler.rs b/src/fragment/reassembler.rs index 1162114b..b4304588 100644 --- a/src/fragment/reassembler.rs +++ b/src/fragment/reassembler.rs @@ -157,6 +157,7 @@ impl Reassembler { payload, false, ), + Ok(FragmentStatus::Duplicate) => Ok(None), Ok(FragmentStatus::Complete) => Self::append_and_maybe_complete( self.max_message_size, occupied, @@ -180,6 +181,7 @@ impl Reassembler { vacant.insert(PartialMessage::new(series, payload, now)); Ok(None) } + FragmentStatus::Duplicate => Ok(None), FragmentStatus::Complete => Ok(Some(ReassembledMessage::new( header.message_id(), payload.to_vec(), diff --git a/src/fragment/series.rs b/src/fragment/series.rs index f370b440..a358027a 100644 --- a/src/fragment/series.rs +++ b/src/fragment/series.rs @@ -60,9 +60,13 @@ impl FragmentSeries { /// /// Returns [`FragmentError::MessageMismatch`] when the fragment belongs to /// a different message, [`FragmentError::IndexMismatch`] when the fragment - /// arrives out of order, [`FragmentError::SeriesComplete`] when the series - /// already consumed a final fragment, and [`FragmentError::IndexOverflow`] - /// when the fragment index cannot advance further. + /// arrives ahead of the expected index, [`FragmentError::SeriesComplete`] + /// when the series already consumed a final fragment, and + /// [`FragmentError::IndexOverflow`] when the fragment index cannot advance + /// further. + /// + /// When a fragment repeats an already accepted index, this method returns + /// [`FragmentStatus::Duplicate`] and leaves the series position unchanged. pub fn accept(&mut self, fragment: FragmentHeader) -> Result { if fragment.message_id() != self.message_id { return Err(FragmentError::MessageMismatch { @@ -75,7 +79,11 @@ impl FragmentSeries { return Err(FragmentError::SeriesComplete); } - if fragment.fragment_index() != self.next_index { + if fragment.fragment_index() < self.next_index { + return Ok(FragmentStatus::Duplicate); + } + + if fragment.fragment_index() > self.next_index { return Err(FragmentError::IndexMismatch { expected: self.next_index, found: fragment.fragment_index(), diff --git a/src/fragment/tests.rs b/src/fragment/tests.rs index 1338aff1..c89dc434 100644 --- a/src/fragment/tests.rs +++ b/src/fragment/tests.rs @@ -74,6 +74,18 @@ fn series_rejects_out_of_order_fragment() { assert!(matches!(err, FragmentError::IndexMismatch { .. })); } +#[test] +fn series_suppresses_duplicate_fragment() { + let mut series = FragmentSeries::new(MessageId::new(7)); + let first = FragmentHeader::new(MessageId::new(7), FragmentIndex::zero(), false); + let duplicate = FragmentHeader::new(MessageId::new(7), FragmentIndex::zero(), false); + let final_fragment = FragmentHeader::new(MessageId::new(7), FragmentIndex::new(1), true); + + assert_eq!(series.accept(first), Ok(FragmentStatus::Incomplete)); + assert_eq!(series.accept(duplicate), Ok(FragmentStatus::Duplicate)); + assert_eq!(series.accept(final_fragment), Ok(FragmentStatus::Complete)); +} + #[test] fn series_rejects_after_completion() { let mut series = FragmentSeries::new(MessageId::new(1)); @@ -150,6 +162,27 @@ fn fragmenter_handles_empty_payload() { #[derive(Debug, Encode, BorrowDecode)] struct DummyMessage(Vec); +#[derive(Clone, Debug, PartialEq, Eq)] +struct TestPacket { + id: u32, + correlation_id: Option, + payload: Vec, +} + +impl Fragmentable for TestPacket { + fn into_fragment_parts(self) -> FragmentParts { + FragmentParts::new(self.id, self.correlation_id, self.payload) + } + + fn from_fragment_parts(parts: FragmentParts) -> Self { + Self { + id: parts.id(), + correlation_id: parts.correlation_id(), + payload: parts.into_payload(), + } + } +} + fn assert_fragment(batch: &FragmentBatch, index: usize, payload: &[u8], is_last: bool) { let fragment = batch .fragments() @@ -232,6 +265,31 @@ fn fragmenter_returns_error_for_out_of_bounds_slice() { } } +#[test] +fn default_fragment_adapter_exposes_purge_api() { + let config = FragmentationConfig { + fragment_payload_cap: NonZeroUsize::new(8).expect("non-zero"), + max_message_size: NonZeroUsize::new(16).expect("non-zero"), + reassembly_timeout: Duration::ZERO, + }; + let mut adapter = DefaultFragmentAdapter::new(config); + let header = FragmentHeader::new(MessageId::new(81), FragmentIndex::zero(), false); + let encoded_payload = encode_fragment_payload(header, &[1_u8, 2]).expect("encode fragment"); + let packet = TestPacket { + id: 42, + correlation_id: Some(7), + payload: encoded_payload, + }; + + assert!( + adapter + .reassemble(packet) + .expect("adapter should accept first fragment") + .is_none() + ); + assert_eq!(adapter.purge_expired(), vec![MessageId::new(81)]); +} + #[test] fn reassembler_allows_single_fragment_at_max_message_size() { let max_message_size = NonZeroUsize::new(16).expect("non-zero"); @@ -325,6 +383,50 @@ fn reassembler_rejects_out_of_order_and_drops_partial() { assert_eq!(reassembler.buffered_len(), 0); } +#[test] +fn reassembler_suppresses_duplicate_fragment() { + let mut reassembler = setup_reassembler_with_first_fragment(31, [1_u8, 2]); + let duplicate = FragmentHeader::new(MessageId::new(31), FragmentIndex::zero(), false); + let final_fragment = FragmentHeader::new(MessageId::new(31), FragmentIndex::new(1), true); + + assert!( + reassembler + .push(duplicate, [9_u8, 9]) + .expect("duplicate fragment should be suppressed") + .is_none() + ); + assert_eq!(reassembler.buffered_len(), 1); + + let complete = reassembler + .push(final_fragment, [3_u8]) + .expect("final fragment should complete message") + .expect("message should be complete"); + assert_eq!(complete.payload(), &[1, 2, 3]); +} + +#[test] +fn reassembler_accepts_zero_length_fragments() { + let mut reassembler = Reassembler::new( + NonZeroUsize::new(8).expect("non-zero"), + Duration::from_secs(10), + ); + let first = FragmentHeader::new(MessageId::new(44), FragmentIndex::zero(), false); + let second = FragmentHeader::new(MessageId::new(44), FragmentIndex::new(1), true); + + assert!( + reassembler + .push(first, []) + .expect("empty fragment should be accepted") + .is_none() + ); + + let complete = reassembler + .push(second, [7_u8, 8]) + .expect("final fragment should complete message") + .expect("message should be complete"); + assert_eq!(complete.payload(), &[7, 8]); +} + #[test] fn reassembler_enforces_maximum_payload_size() { let mut reassembler = Reassembler::new( diff --git a/src/lib.rs b/src/lib.rs index a6e8d431..f07ebea0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -61,7 +61,10 @@ pub use client::{ClientCodecConfig, ClientError, SocketOptions, WireframeClient} pub use connection::ConnectionActor; pub use correlation::CorrelatableFrame; pub use fragment::{ + DefaultFragmentAdapter, FRAGMENT_MAGIC, + FragmentAdapter, + FragmentAdapterError, FragmentBatch, FragmentError, FragmentFrame, diff --git a/tests/features/fragment.feature b/tests/features/fragment.feature index aa7df5f2..2620b812 100644 --- a/tests/features/fragment.feature +++ b/tests/features/fragment.feature @@ -75,3 +75,30 @@ Feature: Fragment metadata enforcement When fragment 1 for message 24 with 2 bytes arrives marked non-final Then the reassembler reports an out-of-order fragment error And the reassembler is buffering 0 messages + + Scenario: Reassembler suppresses duplicate fragments + Given a reassembler allowing 8 bytes with a 30-second reassembly timeout + When fragment 0 for message 25 with 2 bytes arrives marked non-final + And fragment 0 for message 25 with 2 bytes arrives marked non-final + Then no message has been reassembled yet + And the reassembler is buffering 1 messages + When fragment 1 for message 25 with 1 bytes arrives marked final + Then the reassembler outputs a payload of 3 bytes + And the reassembler is buffering 0 messages + + Scenario: Reassembler handles zero-length fragments + Given a reassembler allowing 8 bytes with a 30-second reassembly timeout + When fragment 0 for message 26 with 0 bytes arrives marked final + Then the reassembler outputs a payload of 0 bytes + And the reassembler is buffering 0 messages + + Scenario: Reassembler rebuilds interleaved messages + Given a reassembler allowing 12 bytes with a 30-second reassembly timeout + When fragment 0 for message 27 with 3 bytes arrives marked non-final + And fragment 0 for message 28 with 4 bytes arrives marked non-final + And fragment 1 for message 27 with 2 bytes arrives marked final + Then the reassembler outputs a payload of 5 bytes + And the reassembler is buffering 1 messages + When fragment 1 for message 28 with 1 bytes arrives marked final + Then the reassembler outputs a payload of 5 bytes + And the reassembler is buffering 0 messages diff --git a/tests/fragment_transport.rs b/tests/fragment_transport.rs index 380b363f..06b985fb 100644 --- a/tests/fragment_transport.rs +++ b/tests/fragment_transport.rs @@ -8,12 +8,12 @@ use std::time::Duration; -use futures::SinkExt; +use futures::{SinkExt, StreamExt}; use tokio::{io::AsyncWriteExt, sync::mpsc, time::timeout}; use wireframe::{ Serializer, - app::{Envelope, WireframeApp}, - fragment::decode_fragment_payload, + app::{Envelope, Packet, WireframeApp}, + fragment::{Fragmenter, decode_fragment_payload}, serializer::BincodeSerializer, }; @@ -30,6 +30,7 @@ use crate::fragment_helpers::{ TestResult, assert_handler_observed, build_envelopes, + fragment_envelope, fragmentation_config, make_app, make_handler, @@ -147,3 +148,140 @@ async fn fragmentation_can_be_disabled_via_public_api() -> TestResult { Ok(()) } + +#[tokio::test] +async fn fragmentation_is_opt_in_by_default() -> TestResult { + let capacity = 512; + let payload = vec![b'd'; 128]; + let (tx, mut rx) = mpsc::unbounded_channel(); + let handler = make_handler(&tx); + + let app: WireframeApp = WireframeApp::new()? + .buffer_capacity(capacity) + .route(ROUTE_ID, handler)?; + let (mut client, server) = spawn_app(app); + + let request = Envelope::new(ROUTE_ID, CORRELATION, payload.clone()); + let serializer = BincodeSerializer; + let bytes = serializer.serialize(&request)?; + client.send(bytes.into()).await?; + client.get_mut().shutdown().await?; + + assert_handler_observed(&mut rx, &payload).await?; + let response_bytes = timeout(Duration::from_secs(1), client.next()) + .await? + .ok_or(TestError::Setup("response frame missing"))??; + let (response_env, _) = serializer.deserialize::(&response_bytes)?; + let response_payload = response_env.into_parts().into_payload(); + if decode_fragment_payload(&response_payload)?.is_some() { + return Err(TestError::Assertion( + "default app should keep fragmentation disabled".to_string(), + )); + } + + server.await??; + + Ok(()) +} + +#[tokio::test] +async fn duplicate_fragment_is_suppressed_and_reassembles() -> TestResult { + let buffer_capacity = 512; + let config = fragmentation_config(buffer_capacity)?; + let (tx, mut rx) = mpsc::unbounded_channel(); + let app = make_app(buffer_capacity, config, &tx)?; + let (mut client, server) = spawn_app(app); + let fragmenter = Fragmenter::new(config.fragment_payload_cap); + + let payload = vec![b'D'; 800]; + let request = Envelope::new(ROUTE_ID, CORRELATION, payload.clone()); + let mut fragments = fragment_envelope(&request, &fragmenter)?; + let duplicate = fragments + .first() + .cloned() + .ok_or(TestError::Setup("fragmenter produced no fragments"))?; + fragments.insert(1, duplicate); + + send_envelopes(&mut client, &fragments).await?; + client.flush().await?; + client.get_mut().shutdown().await?; + + assert_handler_observed(&mut rx, &payload).await?; + if let Ok(Some(_)) = timeout(Duration::from_millis(200), rx.recv()).await { + return Err(TestError::Assertion( + "duplicate suppression should prevent duplicate handler delivery".to_string(), + )); + } + + let response = read_reassembled_response(&mut client, &config).await?; + if response != payload { + return Err(TestError::Assertion( + "reassembled payload mismatch after duplicate suppression".to_string(), + )); + } + + server.await??; + Ok(()) +} + +#[tokio::test] +async fn interleaved_fragment_streams_reassemble_independently() -> TestResult { + let buffer_capacity = 512; + let config = fragmentation_config(buffer_capacity)?; + let (tx, mut rx) = mpsc::unbounded_channel(); + let app = make_app(buffer_capacity, config, &tx)?; + let (mut client, server) = spawn_app(app); + let fragmenter = Fragmenter::new(config.fragment_payload_cap); + + let payload_a = vec![b'A'; 800]; + let payload_b = vec![b'B'; 900]; + let request_a = Envelope::new(ROUTE_ID, Some(101), payload_a.clone()); + let request_b = Envelope::new(ROUTE_ID, Some(202), payload_b.clone()); + let fragments_a = fragment_envelope(&request_a, &fragmenter)?; + let fragments_b = fragment_envelope(&request_b, &fragmenter)?; + + let mut interleaved = Vec::with_capacity(fragments_a.len() + fragments_b.len()); + let mut idx = 0usize; + while idx < fragments_a.len() || idx < fragments_b.len() { + if let Some(fragment) = fragments_a.get(idx) { + interleaved.push(fragment.clone()); + } + if let Some(fragment) = fragments_b.get(idx) { + interleaved.push(fragment.clone()); + } + idx += 1; + } + + send_envelopes(&mut client, &interleaved).await?; + client.flush().await?; + client.get_mut().shutdown().await?; + + let response_a = read_reassembled_response(&mut client, &config).await?; + let response_b = read_reassembled_response(&mut client, &config).await?; + let mut observed_responses = vec![response_a, response_b]; + observed_responses.sort(); + let mut expected_payloads = vec![payload_a.clone(), payload_b.clone()]; + expected_payloads.sort(); + if observed_responses != expected_payloads { + return Err(TestError::Assertion( + "interleaved responses were not reassembled correctly".to_string(), + )); + } + + let first = timeout(Duration::from_secs(1), rx.recv()) + .await? + .ok_or(TestError::Setup("first handler payload missing"))?; + let second = timeout(Duration::from_secs(1), rx.recv()) + .await? + .ok_or(TestError::Setup("second handler payload missing"))?; + let mut observed_requests = vec![first, second]; + observed_requests.sort(); + if observed_requests != expected_payloads { + return Err(TestError::Assertion( + "interleaved reassembly delivered unexpected handler payloads".to_string(), + )); + } + + server.await??; + Ok(()) +} diff --git a/tests/fragment_transport/mod.rs b/tests/fragment_transport/mod.rs index 106e639c..8c88c0d3 100644 --- a/tests/fragment_transport/mod.rs +++ b/tests/fragment_transport/mod.rs @@ -1,7 +1,7 @@ //! Module organization for fragment transport integration tests. //! //! Splits tests by concern: -//! - `rejection`: Tests for malformed, duplicate, and out-of-order fragments +//! - `rejection`: Tests for malformed and out-of-order fragments //! - `eviction`: Tests for reassembly timeout and eviction behaviour pub mod eviction; diff --git a/tests/fragment_transport/rejection.rs b/tests/fragment_transport/rejection.rs index 61b30036..a9548614 100644 --- a/tests/fragment_transport/rejection.rs +++ b/tests/fragment_transport/rejection.rs @@ -1,4 +1,4 @@ -//! Fragment rejection tests for malformed, out-of-order, and duplicate fragments. +//! Fragment rejection tests for malformed and out-of-order fragments. //! //! Verifies that the fragment transport layer correctly rejects invalid fragment //! sequences and prevents them from reaching the application handler. @@ -98,16 +98,6 @@ fn mutate_out_of_order(mut fragments: Vec) -> TestResult Ok(fragments) } -/// Mutate fragments by duplicating the first fragment. -fn mutate_duplicate(mut fragments: Vec) -> TestResult> { - let duplicate = fragments - .first() - .cloned() - .ok_or(TestError::Setup("fragmenter produced no fragments"))?; - fragments.insert(1, duplicate); - Ok(fragments) -} - /// Mutate fragments by truncating the header of the first fragment. #[expect( clippy::panic_in_result_fn, @@ -152,10 +142,6 @@ fn mutate_malformed_header(mut fragments: Vec) -> TestResult Date: Thu, 12 Feb 2026 18:39:13 +0000 Subject: [PATCH 3/7] feat(fragment): refactor FragmentAdapter trait to use generic fragment types - Updated FragmentAdapter trait to remove generic parameter on trait and instead use generic methods for fragment and reassemble. - Adjusted DefaultFragmentAdapter implementation accordingly. - Improved error handling with #[from] attributes on FragmentAdapterError variants. - Revised related documentation to use "adapter" instead of "adaptor" and align terminology. - Split large fragment tests module into focused submodules for clarity and maintainability. - Added new well-structured unit tests for fragment adapter, fragmenter, header series, and reassembler. - Updated builder and codec modules to require explicit opt-in for fragmentation when frame budgets or codec change. This refactor improves the fragment adapter API ergonomics and type safety, and enhances test coverage and documentation clarity. Co-authored-by: devboxerhub[bot] --- .../execplans/9-1-3-fragment-adapter-trait.md | 21 +- ...ge-fragmentation-and-re-assembly-design.md | 65 +-- docs/users-guide.md | 20 +- src/app/builder/codec.rs | 5 + src/app/builder/core.rs | 42 +- src/fragment/adapter.rs | 83 ++- src/fragment/reassembler.rs | 9 +- src/fragment/series.rs | 10 +- src/fragment/tests.rs | 510 +----------------- src/fragment/tests/adapter_tests.rs | 152 ++++++ src/fragment/tests/fragmenter_tests.rs | 132 +++++ src/fragment/tests/header_series_tests.rs | 109 ++++ src/fragment/tests/reassembler_tests.rs | 253 +++++++++ tests/features/fragment.feature | 4 +- tests/fragment_transport.rs | 12 +- tests/steps/fragment_steps.rs | 13 +- 16 files changed, 802 insertions(+), 638 deletions(-) create mode 100644 src/fragment/tests/adapter_tests.rs create mode 100644 src/fragment/tests/fragmenter_tests.rs create mode 100644 src/fragment/tests/header_series_tests.rs create mode 100644 src/fragment/tests/reassembler_tests.rs diff --git a/docs/execplans/9-1-3-fragment-adapter-trait.md b/docs/execplans/9-1-3-fragment-adapter-trait.md index 4e4444f7..1ef95494 100644 --- a/docs/execplans/9-1-3-fragment-adapter-trait.md +++ b/docs/execplans/9-1-3-fragment-adapter-trait.md @@ -209,8 +209,8 @@ Documentation currently needing alignment: contract, duplicate/out-of-order policy, purge ownership, opt-in semantics). - `docs/multi-packet-and-streaming-responses-design.md` (layer composition order references). -- `docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability- - maturity.md` (hardening narrative alignment). +- `docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md` + (hardening narrative alignment). - `docs/hardening-wireframe-a-guide-to-production-resilience.md` (hardening narrative alignment). - `docs/users-guide.md` (public configuration surface and behaviour changes). @@ -302,7 +302,7 @@ Go/no-go: 7. Update documentation and roadmap: `docs/generic-message-fragmentation-and-re-assembly-design.md`, `docs/multi-packet-and-streaming-responses-design.md`, - `docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability- maturity.md`, + `docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md`, `docs/hardening-wireframe-a-guide-to-production-resilience.md`, `docs/users-guide.md`, and `docs/roadmap.md`. @@ -383,12 +383,15 @@ Validation evidence logs: Target public interface shape (exact naming may be adjusted during Stage A): - pub trait FragmentAdapter: Send + Sync { - type Error: std::error::Error + Send + Sync + 'static; - - fn fragment(&self, packet: E) - -> Result, crate::fragment::FragmentationError>; - fn reassemble(&mut self, packet: E) -> Result, Self::Error>; + pub trait FragmentAdapter: Send + Sync { + fn fragment( + &self, + packet: E, + ) -> Result, crate::fragment::FragmentationError>; + fn reassemble( + &mut self, + packet: E, + ) -> Result, crate::fragment::FragmentAdapterError>; fn purge_expired(&mut self) -> Vec; } diff --git a/docs/generic-message-fragmentation-and-re-assembly-design.md b/docs/generic-message-fragmentation-and-re-assembly-design.md index 579c0901..a3e8bf9d 100644 --- a/docs/generic-message-fragmentation-and-re-assembly-design.md +++ b/docs/generic-message-fragmentation-and-re-assembly-design.md @@ -43,7 +43,7 @@ The implementation must satisfy the following core requirements: The feature is exposed as a public `FragmentAdapter` trait plus a default implementation (`DefaultFragmentAdapter`) that wraps `Fragmenter` and `Reassembler`. Protocol-specific stacks can still layer additional strategy -codecs around this adaptor. +codecs around this adapter. ```plaintext Socket I/O ↔ [Compression] ↔ FragmentAdapter ↔ Router/Handlers @@ -68,45 +68,24 @@ connection, and bytes buffered across in-flight assemblies. See model. ```rust -use dashmap::DashMap; -use std::sync::atomic::AtomicU64; -use std::time::{Duration, Instant}; - -pub trait FragmentAdapter { - fn fragment(&self, packet: E) -> Result, FragmentationError>; - fn reassemble(&mut self, packet: E) -> Result, FragmentAdapterError>; +pub trait FragmentAdapter: Send + Sync { + fn fragment(&self, packet: E) -> Result, FragmentationError>; + fn reassemble( + &mut self, + packet: E, + ) -> Result, FragmentAdapterError>; fn purge_expired(&mut self) -> Vec; } pub struct DefaultFragmentAdapter { fragmenter: Fragmenter, reassembler: Reassembler, - /// Hard cap on the size of a reassembled logical message. - max_message_size: usize, - /// Timeout for completing a partial message reassembly. - reassembly_timeout: Duration, - /// Concurrently accessible map for in-flight message reassembly. - reassembly_buffers: DashMap, - /// Atomic counter for generating unique outbound message IDs. - next_outbound_msg_id: AtomicU64, -} - -/// State for a single, in-progress message reassembly. -struct PartialMessage { - /// The buffer holding the accumulating payload. - buffer: BytesMut, - /// The advertised total payload size, if known. - expected_total: Option, - /// The sequence number of the last fragment received. - last_sequence: u64, - /// The time the first fragment was received. - started_at: Instant, } ``` -The use of `dashmap::DashMap` allows for lock-free reads and sharded writes, -providing efficient and concurrent access to the reassembly buffers without -blocking the entire connection task. +`DefaultFragmentAdapter` delegates message-ID sequencing to `Fragmenter` and +in-flight reassembly state management (including timeout expiry) to +`Reassembler`. ### 3.2 Canonical fragment header (November 2025 update) @@ -140,7 +119,7 @@ control how headers are encoded on the wire, but it now produces a ### 3.3 Fragmenter helper (17 November 2025 update) -To simplify outbound slicing before the full adaptor arrives, the crate now +To simplify outbound slicing before the full adapter arrives, the crate now ships a small `Fragmenter` helper. It accepts a `NonZeroUsize` payload cap and creates sequential fragments tagged with `MessageId`, `FragmentIndex`, and the `is_last_fragment` flag described above. The helper exposes three entry points: @@ -292,10 +271,10 @@ async fn streamed() -> Response> { The reassembly logic is the most complex part of the feature and must be robust against errors and attacks. -1. **Header Decoding:** The adaptor reads from the socket buffer and calls +1. **Header Decoding:** The adapter reads from the socket buffer and calls `strategy.decode_header()`. If it returns `Ok(None)`, it waits for more data. -2. **Payload Extraction:** Once a header is decoded, the adaptor ensures the +2. **Payload Extraction:** Once a header is decoded, the adapter ensures the full payload for that fragment is available in the buffer before proceeding. 3. **Multiplexed State Management:** @@ -304,7 +283,7 @@ against errors and attacks. (if `is_final`) or an error (if not `is_final` and a non-multiplexed reassembly is already in progress). - - If `meta.msg_id` is `Some(id)`, the adaptor accesses the + - If `meta.msg_id` is `Some(id)`, the adapter accesses the `reassembly_buffers` map. - **New Message (**`.entry().or_insert_with(...)`**):** @@ -342,7 +321,7 @@ against errors and attacks. The outbound path is simpler and purely procedural. -1. **Size Check:** When `write(frame)` is called, the adaptor checks +1. **Size Check:** When `write(frame)` is called, the adapter checks `frame.len()` against `strategy.max_fragment_payload(&frame)`. 2. **No Fragmentation:** If the frame is small enough, it is passed directly to @@ -353,7 +332,7 @@ The outbound path is simpler and purely procedural. - A new `msg_id` is generated via `next_outbound_msg_id.fetch_add(1, Ordering::Relaxed)`. - - The adaptor iterates through the frame's payload in chunks of + - The adapter iterates through the frame's payload in chunks of `max_fragment_payload`. - For each chunk, it calls `strategy.encode_header()` to write the fragment @@ -384,11 +363,11 @@ This feature is designed as a foundational layer that other features build upon. | --------------- | ------------------------------------------------------------------------------------------------------------------------------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | | API Correctness | The FragmentStrategy trait and FragmentAdapter are implemented exactly as specified in this document. | 100% of the public API surface is present and correctly typed. | | Functionality | A large logical frame is correctly split into N fragments, and a sequence of N fragments is correctly reassembled into the original frame. | An end-to-end test confirms byte-for-byte identity of a payload at the configured max_message_size after being fragmented and reassembled. | -| Multiplexing | The adaptor can correctly reassemble two messages whose fragments are interleaved. | A test sending fragments A1, B1, A2, B2, A3, B3 must result in two correctly reassembled messages, A and B. | -| Resilience | The adaptor protects against memory exhaustion from oversized messages. | A test sending fragments that exceed max_message_size must terminate the connection and not allocate beyond the configured cap (including allocator overhead). | -| Resilience | The adaptor protects against resource leaks from abandoned partial messages. | A test that sends an initial fragment but never the final one must result in the partial buffer being purged after the reassembly_timeout duration has passed. | -| Performance | The overhead for messages that do not require fragmentation is minimal. | A criterion benchmark passing a stream of small, non-fragmented frames through the FragmentAdapter must show < 5% throughput degradation compared to a build without the adaptor. | -| Resilience | The adaptor enforces the configured `max_message_size`, `fragment_payload_cap`, and `reassembly_timeout` used in production. | Benchmarks and regression tests assert the 16× message cap, per-fragment payload cap derived from buffer capacity, and a 30s timeout for purging stale assemblies (WireframeApp defaults). | +| Multiplexing | The adapter can correctly reassemble two messages whose fragments are interleaved. | A test sending fragments A1, B1, A2, B2, A3, B3 must result in two correctly reassembled messages, A and B. | +| Resilience | The adapter protects against memory exhaustion from oversized messages. | A test sending fragments that exceed max_message_size must terminate the connection and not allocate beyond the configured cap (including allocator overhead). | +| Resilience | The adapter protects against resource leaks from abandoned partial messages. | A test that sends an initial fragment but never the final one must result in the partial buffer being purged after the reassembly_timeout duration has passed. | +| Performance | The overhead for messages that do not require fragmentation is minimal. | A criterion benchmark passing a stream of small, non-fragmented frames through the FragmentAdapter must show < 5% throughput degradation compared to a build without the adapter. | +| Resilience | The adapter enforces the configured `max_message_size`, `fragment_payload_cap`, and `reassembly_timeout` used in production. | Benchmarks and regression tests assert the 16× message cap, per-fragment payload cap derived from buffer capacity, and a 30s timeout for purging stale assemblies (WireframeApp defaults). | ## 8. Design decisions (14 November 2025, updated 17 November 2025) @@ -420,7 +399,7 @@ This feature is designed as a foundational layer that other features build upon. - Changed app-level fragmentation configuration to explicit opt-in. Builders no longer auto-enable fragmentation; callers must use `enable_fragmentation()` or `fragmentation(Some(cfg))`. -- Assigned purge scheduling ownership to the adaptor caller via +- Assigned purge scheduling ownership to the adapter caller via `FragmentAdapter::purge_expired()`. `WireframeApp` drives this on timeout ticks, and external callers can schedule purges directly. diff --git a/docs/users-guide.md b/docs/users-guide.md index d6d6acff..4042aa7d 100644 --- a/docs/users-guide.md +++ b/docs/users-guide.md @@ -106,7 +106,10 @@ and a 100 ms read timeout. Clamp the length-delimited limit with `buffer_capacity` (length-delimited only), swap codecs with `with_codec`, and override the serializer with `with_serializer` when a different encoding strategy is required.[^3][^4] Custom protocols implement `FrameCodec` to -describe their framing rules. +describe their framing rules. Changing frame budgets with `buffer_capacity` or +swapping codecs with `with_codec` clears fragmentation settings, so call +`enable_fragmentation()` (or `fragmentation(Some(cfg))`) again when transport +fragmentation is required. Once a stream is accepted—either from a manual accept loop or via `WireframeServer`—`handle_connection(stream)` builds (or reuses) the middleware @@ -138,13 +141,14 @@ A codec implementation must: budget used by `enable_fragmentation`. Install a custom codec with `with_codec`. The builder disables fragmentation -when codecs change, so explicitly call `enable_fragmentation()` (or -`fragmentation(Some(cfg))`) afterwards when transport fragmentation is -required. Wireframe clones the codec per connection, so stateful codecs should -ensure `Clone` produces an independent state (for example, reset sequence -counters) when per-connection isolation is required. When a framed stream is -already available, use `send_response_framed_with_codec`, so responses pass -through `FrameCodec::wrap_payload`. +when codecs or the length-delimited frame budget change, so explicitly call +`enable_fragmentation()` (or `fragmentation(Some(cfg))`) afterwards when +transport fragmentation is required. Wireframe clones the codec per connection, +so stateful codecs should ensure `Clone` produces an independent state (for +example, reset sequence counters) when per-connection isolation is required. +When a framed stream is already available, use +`send_response_framed_with_codec`, so responses pass through +`FrameCodec::wrap_payload`. Assume `MyCodec` implements `FrameCodec`: diff --git a/src/app/builder/codec.rs b/src/app/builder/codec.rs index b967b1ae..3dd07ec2 100644 --- a/src/app/builder/codec.rs +++ b/src/app/builder/codec.rs @@ -58,10 +58,15 @@ where { /// Set the initial buffer capacity for framed reads. /// Clamped between 64 bytes and 16 MiB. + /// + /// This also clears any previously configured fragmentation settings. + /// Re-enable fragmentation explicitly with [`WireframeApp::enable_fragmentation`] + /// (or [`WireframeApp::fragmentation`]) after changing the frame budget. #[must_use] pub fn buffer_capacity(mut self, capacity: usize) -> Self { let capacity = clamp_frame_length(capacity); self.codec = LengthDelimitedFrameCodec::new(capacity); + self.fragmentation = None; self } } diff --git a/src/app/builder/core.rs b/src/app/builder/core.rs index c80523cc..29e7eb3b 100644 --- a/src/app/builder/core.rs +++ b/src/app/builder/core.rs @@ -162,30 +162,46 @@ where #[cfg(test)] mod tests { + use rstest::{fixture, rstest}; + use super::WireframeApp; use crate::{app::Envelope, codec::LengthDelimitedFrameCodec, serializer::BincodeSerializer}; - #[test] - fn builder_defaults_fragmentation_to_disabled() { - let app = WireframeApp::::new() - .expect("build app"); + type TestApp = WireframeApp; + + #[fixture] + fn app_builder() -> TestApp { + let app = TestApp::new().expect("build app"); + assert!( + app.fragmentation.is_none(), + "fixture expects default fragmentation disabled" + ); + app + } + + #[rstest] + fn builder_defaults_fragmentation_to_disabled(app_builder: TestApp) { + let app = app_builder; assert!(app.fragmentation.is_none()); } - #[test] - fn enable_fragmentation_requires_explicit_opt_in() { - let app = WireframeApp::::new() - .expect("build app") - .enable_fragmentation(); + #[rstest] + fn enable_fragmentation_requires_explicit_opt_in(app_builder: TestApp) { + let app = app_builder.enable_fragmentation(); assert!(app.fragmentation.is_some()); } - #[test] - fn with_codec_clears_fragmentation_to_require_reconfiguration() { - let app = WireframeApp::::new() - .expect("build app") + #[rstest] + fn with_codec_clears_fragmentation_to_require_reconfiguration(app_builder: TestApp) { + let app = app_builder .enable_fragmentation() .with_codec(LengthDelimitedFrameCodec::new(2048)); assert!(app.fragmentation.is_none()); } + + #[rstest] + fn buffer_capacity_clears_fragmentation_to_require_reconfiguration(app_builder: TestApp) { + let app = app_builder.enable_fragmentation().buffer_capacity(2048); + assert!(app.fragmentation.is_none()); + } } diff --git a/src/fragment/adapter.rs b/src/fragment/adapter.rs index b25fb4f1..a6a0137d 100644 --- a/src/fragment/adapter.rs +++ b/src/fragment/adapter.rs @@ -25,21 +25,21 @@ use super::{ pub enum FragmentAdapterError { /// Fragment payload marker/header decoding failed. #[error("decode error: {0}")] - Decode(DecodeError), + Decode(#[from] DecodeError), /// Reassembly ordering or size validation failed. #[error("reassembly error: {0}")] - Reassembly(ReassemblyError), + Reassembly(#[from] ReassemblyError), } /// Adapter contract for transport-level fragmentation and reassembly. -pub trait FragmentAdapter: Send + Sync { +pub trait FragmentAdapter: Send + Sync { /// Attempt to fragment a packet for outbound transport. /// /// # Errors /// /// Returns [`FragmentationError`] when payload chunking or header encoding /// fails. - fn fragment(&self, packet: E) -> Result, FragmentationError>; + fn fragment(&self, packet: E) -> Result, FragmentationError>; /// Attempt to reassemble an inbound packet. /// @@ -51,7 +51,8 @@ pub trait FragmentAdapter: Send + Sync { /// /// Returns [`FragmentAdapterError`] when fragment decoding fails or when /// ordering and size guarantees are violated. - fn reassemble(&mut self, packet: E) -> Result, FragmentAdapterError>; + fn reassemble(&mut self, packet: E) + -> Result, FragmentAdapterError>; /// Purge stale partial reassembly state. /// @@ -76,48 +77,13 @@ impl DefaultFragmentAdapter { } } - fn fragment_inner(&self, packet: E) -> Result, FragmentationError> { - fragment_packet(&self.fragmenter, packet) - } - - fn reassemble_inner( - &mut self, - packet: E, - ) -> Result, FragmentAdapterError> { - let parts = packet.into_fragment_parts(); - let id = parts.id(); - let correlation_id = parts.correlation_id(); - let payload = parts.into_payload(); - - match decode_fragment_payload(&payload) { - Ok(Some((header, fragment_payload))) => { - match self.reassembler.push(header, fragment_payload) { - Ok(Some(message)) => { - let rebuilt = - FragmentParts::new(id, correlation_id, message.into_payload()); - Ok(Some(E::from_fragment_parts(rebuilt))) - } - Ok(None) => Ok(None), - Err(err) => Err(FragmentAdapterError::Reassembly(err)), - } - } - Ok(None) => { - let passthrough = FragmentParts::new(id, correlation_id, payload); - Ok(Some(E::from_fragment_parts(passthrough))) - } - Err(err) => Err(FragmentAdapterError::Decode(err)), - } - } - - fn purge_expired_inner(&mut self) -> Vec { self.reassembler.purge_expired() } - /// Fragment outbound packet data. /// /// # Errors /// /// Returns [`FragmentationError`] when fragment emission fails. pub fn fragment(&self, packet: E) -> Result, FragmentationError> { - self.fragment_inner(packet) + fragment_packet(&self.fragmenter, packet) } /// Reassemble inbound packet data. @@ -129,21 +95,40 @@ impl DefaultFragmentAdapter { &mut self, packet: E, ) -> Result, FragmentAdapterError> { - self.reassemble_inner(packet) + let parts = packet.into_fragment_parts(); + let id = parts.id(); + let correlation_id = parts.correlation_id(); + let payload = parts.into_payload(); + + if let Some((header, fragment_payload)) = decode_fragment_payload(&payload)? { + match self.reassembler.push(header, fragment_payload)? { + Some(message) => { + let rebuilt = FragmentParts::new(id, correlation_id, message.into_payload()); + Ok(Some(E::from_fragment_parts(rebuilt))) + } + None => Ok(None), + } + } else { + let passthrough = FragmentParts::new(id, correlation_id, payload); + Ok(Some(E::from_fragment_parts(passthrough))) + } } /// Purge stale reassembly entries and return evicted identifiers. - pub fn purge_expired(&mut self) -> Vec { self.purge_expired_inner() } + pub fn purge_expired(&mut self) -> Vec { self.reassembler.purge_expired() } } -impl FragmentAdapter for DefaultFragmentAdapter { - fn fragment(&self, packet: E) -> Result, FragmentationError> { - self.fragment_inner(packet) +impl FragmentAdapter for DefaultFragmentAdapter { + fn fragment(&self, packet: E) -> Result, FragmentationError> { + DefaultFragmentAdapter::fragment(self, packet) } - fn reassemble(&mut self, packet: E) -> Result, FragmentAdapterError> { - self.reassemble_inner(packet) + fn reassemble( + &mut self, + packet: E, + ) -> Result, FragmentAdapterError> { + DefaultFragmentAdapter::reassemble(self, packet) } - fn purge_expired(&mut self) -> Vec { self.purge_expired_inner() } + fn purge_expired(&mut self) -> Vec { DefaultFragmentAdapter::purge_expired(self) } } diff --git a/src/fragment/reassembler.rs b/src/fragment/reassembler.rs index b4304588..5eca40cc 100644 --- a/src/fragment/reassembler.rs +++ b/src/fragment/reassembler.rs @@ -181,7 +181,14 @@ impl Reassembler { vacant.insert(PartialMessage::new(series, payload, now)); Ok(None) } - FragmentStatus::Duplicate => Ok(None), + FragmentStatus::Duplicate => { + debug_assert!( + false, + "newly created FragmentSeries starts at index 0; a first fragment \ + cannot be duplicate" + ); + Ok(None) + } FragmentStatus::Complete => Ok(Some(ReassembledMessage::new( header.message_id(), payload.to_vec(), diff --git a/src/fragment/series.rs b/src/fragment/series.rs index a358027a..58f6b6b7 100644 --- a/src/fragment/series.rs +++ b/src/fragment/series.rs @@ -61,7 +61,7 @@ impl FragmentSeries { /// Returns [`FragmentError::MessageMismatch`] when the fragment belongs to /// a different message, [`FragmentError::IndexMismatch`] when the fragment /// arrives ahead of the expected index, [`FragmentError::SeriesComplete`] - /// when the series already consumed a final fragment, and + /// when a non-duplicate fragment arrives after completion, and /// [`FragmentError::IndexOverflow`] when the fragment index cannot advance /// further. /// @@ -75,14 +75,14 @@ impl FragmentSeries { }); } - if self.complete { - return Err(FragmentError::SeriesComplete); - } - if fragment.fragment_index() < self.next_index { return Ok(FragmentStatus::Duplicate); } + if self.complete { + return Err(FragmentError::SeriesComplete); + } + if fragment.fragment_index() > self.next_index { return Err(FragmentError::IndexMismatch { expected: self.next_index, diff --git a/src/fragment/tests.rs b/src/fragment/tests.rs index c89dc434..9394dc59 100644 --- a/src/fragment/tests.rs +++ b/src/fragment/tests.rs @@ -1,507 +1,9 @@ //! Unit tests for the fragmentation and reassembly subsystem. //! -//! Covers `FragmentHeader` field access, `FragmentSeries` ordering and -//! validation, `Fragmenter` splitting and message ID management, and -//! `Reassembler` assembly with size limits and expiry handling. +//! Tests are split into focused submodules to keep each file short and easy +//! to navigate. -use std::{ - num::NonZeroUsize, - time::{Duration, Instant}, -}; - -use bincode::{BorrowDecode, Encode}; -use rstest::rstest; - -use super::*; -use crate::fragment::fragmenter::FragmentCursor; - -fn setup_reassembler_with_first_fragment( - message_id: u64, - first_payload: impl AsRef<[u8]>, -) -> Reassembler { - let mut reassembler = Reassembler::new( - NonZeroUsize::new(8).expect("non-zero"), - Duration::from_secs(30), - ); - let first = FragmentHeader::new(MessageId::new(message_id), FragmentIndex::zero(), false); - assert!( - reassembler - .push(first, first_payload) - .expect("first fragment accepted") - .is_none() - ); - reassembler -} - -#[test] -fn fragment_header_exposes_fields() { - let header = FragmentHeader::new(MessageId::new(9), FragmentIndex::new(2), true); - assert_eq!(header.message_id(), MessageId::new(9)); - assert_eq!(header.fragment_index(), FragmentIndex::new(2)); - assert!(header.is_last_fragment()); -} - -#[rstest] -#[case(1)] -#[case(5)] -fn series_accepts_sequential_fragments(#[case] message: u64) { - let mut series = FragmentSeries::new(MessageId::new(message)); - let first = FragmentHeader::new(MessageId::new(message), FragmentIndex::zero(), false); - let second = FragmentHeader::new(MessageId::new(message), FragmentIndex::new(1), true); - - assert_eq!(series.accept(first), Ok(FragmentStatus::Incomplete)); - assert_eq!(series.accept(second), Ok(FragmentStatus::Complete)); - assert!(series.is_complete()); -} - -#[test] -fn series_rejects_other_message() { - let mut series = FragmentSeries::new(MessageId::new(7)); - let header = FragmentHeader::new(MessageId::new(8), FragmentIndex::zero(), false); - let err = series - .accept(header) - .expect_err("fragment from another message must be rejected"); - assert!(matches!(err, FragmentError::MessageMismatch { .. })); -} - -#[test] -fn series_rejects_out_of_order_fragment() { - let mut series = FragmentSeries::new(MessageId::new(7)); - let header = FragmentHeader::new(MessageId::new(7), FragmentIndex::new(2), false); - let err = series - .accept(header) - .expect_err("out-of-order fragment must be rejected"); - assert!(matches!(err, FragmentError::IndexMismatch { .. })); -} - -#[test] -fn series_suppresses_duplicate_fragment() { - let mut series = FragmentSeries::new(MessageId::new(7)); - let first = FragmentHeader::new(MessageId::new(7), FragmentIndex::zero(), false); - let duplicate = FragmentHeader::new(MessageId::new(7), FragmentIndex::zero(), false); - let final_fragment = FragmentHeader::new(MessageId::new(7), FragmentIndex::new(1), true); - - assert_eq!(series.accept(first), Ok(FragmentStatus::Incomplete)); - assert_eq!(series.accept(duplicate), Ok(FragmentStatus::Duplicate)); - assert_eq!(series.accept(final_fragment), Ok(FragmentStatus::Complete)); -} - -#[test] -fn series_rejects_after_completion() { - let mut series = FragmentSeries::new(MessageId::new(1)); - let first = FragmentHeader::new(MessageId::new(1), FragmentIndex::zero(), true); - assert_eq!(series.accept(first), Ok(FragmentStatus::Complete)); - let err = series - .accept(FragmentHeader::new( - MessageId::new(1), - FragmentIndex::new(1), - true, - )) - .expect_err("series must reject fragments after completion"); - assert!(matches!(err, FragmentError::SeriesComplete)); -} - -#[test] -fn series_detects_index_overflow() { - let mut series = FragmentSeries::new(MessageId::new(1)); - series.force_next_index_for_tests(FragmentIndex::new(u32::MAX)); - let header = FragmentHeader::new(MessageId::new(1), FragmentIndex::new(u32::MAX), false); - let err = series - .accept(header) - .expect_err("overflow must raise an error"); - assert_eq!( - err, - FragmentError::IndexOverflow { - last: FragmentIndex::new(u32::MAX) - } - ); -} - -#[test] -fn series_accepts_final_fragment_at_max_index() { - let mut series = FragmentSeries::new(MessageId::new(2)); - series.force_next_index_for_tests(FragmentIndex::new(u32::MAX)); - let header = FragmentHeader::new(MessageId::new(2), FragmentIndex::new(u32::MAX), true); - assert_eq!(series.accept(header), Ok(FragmentStatus::Complete)); - assert!(series.is_complete()); -} - -#[test] -fn fragmenter_splits_payload_into_multiple_frames() { - let fragmenter = Fragmenter::new(NonZeroUsize::new(3).expect("non-zero")); - let payload: Vec = (0..8).collect(); - let batch = fragmenter - .fragment_bytes(payload) - .expect("fragment payload"); - - assert_eq!(batch.len(), 3); - assert!(batch.is_fragmented()); - assert_eq!(batch.message_id(), MessageId::new(0)); - - assert_fragment(&batch, 0, &[0, 1, 2], false); - assert_fragment(&batch, 1, &[3, 4, 5], false); - assert_fragment(&batch, 2, &[6, 7], true); -} - -#[test] -fn fragmenter_handles_empty_payload() { - let fragmenter = Fragmenter::new(NonZeroUsize::new(8).expect("non-zero")); - let batch = fragmenter.fragment_bytes([]).expect("fragment empty"); - - assert_eq!(batch.len(), 1); - assert!(!batch.is_fragmented()); - let fragment = batch - .fragments() - .first() - .expect("batch should contain at least one fragment"); - assert!(fragment.payload().is_empty()); - assert!(fragment.header().is_last_fragment()); - assert_eq!(fragment.header().fragment_index(), FragmentIndex::zero()); -} - -#[derive(Debug, Encode, BorrowDecode)] -struct DummyMessage(Vec); - -#[derive(Clone, Debug, PartialEq, Eq)] -struct TestPacket { - id: u32, - correlation_id: Option, - payload: Vec, -} - -impl Fragmentable for TestPacket { - fn into_fragment_parts(self) -> FragmentParts { - FragmentParts::new(self.id, self.correlation_id, self.payload) - } - - fn from_fragment_parts(parts: FragmentParts) -> Self { - Self { - id: parts.id(), - correlation_id: parts.correlation_id(), - payload: parts.into_payload(), - } - } -} - -fn assert_fragment(batch: &FragmentBatch, index: usize, payload: &[u8], is_last: bool) { - let fragment = batch - .fragments() - .get(index) - .expect("fragment missing at requested index"); - assert_eq!(fragment.payload(), payload); - assert_eq!(fragment.header().is_last_fragment(), is_last); -} - -#[test] -fn fragmenter_fragments_messages_and_increments_ids() { - let fragmenter = - Fragmenter::with_starting_id(NonZeroUsize::new(4).expect("non-zero"), MessageId::new(7)); - - let batch = fragmenter - .fragment_message(&DummyMessage(vec![1, 2, 3, 4, 5])) - .expect("fragment message"); - assert_eq!(batch.message_id(), MessageId::new(7)); - assert_eq!(batch.len(), 2); - assert!(batch.is_fragmented()); - - let next_payload = vec![9, 9, 9]; - let next = fragmenter - .fragment_bytes(next_payload) - .expect("fragment bytes"); - assert_eq!(next.message_id(), MessageId::new(8)); - assert_eq!(next.len(), 1); - assert!(!next.is_fragmented()); -} - -#[test] -fn fragment_batch_into_iterator_yields_all_fragments() { - let fragmenter = Fragmenter::new(NonZeroUsize::new(2).expect("non-zero")); - let payload = [1_u8, 2, 3]; - let batch = fragmenter - .fragment_bytes(payload) - .expect("split into fragments"); - - let payloads: Vec> = batch - .into_iter() - .map(|fragment| fragment.payload().to_vec()) - .collect(); - assert_eq!(payloads, vec![vec![1, 2], vec![3]]); -} - -#[test] -fn fragmenter_respects_explicit_message_ids() { - let fragmenter = - Fragmenter::with_starting_id(NonZeroUsize::new(2).expect("non-zero"), MessageId::new(10)); - let payload = [7_u8, 8, 9]; - let batch = fragmenter - .fragment_with_id(MessageId::new(500), payload) - .expect("fragment with explicit id"); - assert_eq!(batch.message_id(), MessageId::new(500)); - assert_eq!(batch.len(), 2); - - let next = fragmenter.fragment_bytes([1_u8]).expect("next fragment"); - assert_eq!(next.message_id(), MessageId::new(10)); -} - -#[test] -fn fragmenter_returns_error_for_out_of_bounds_slice() { - let fragmenter = Fragmenter::new(NonZeroUsize::new(4).expect("non-zero")); - let payload = [1_u8, 2, 3, 4]; - - let err = fragmenter - .build_fragments_from_for_tests( - MessageId::new(1), - &payload, - FragmentCursor::new(payload.len() + 1, FragmentIndex::zero()), - ) - .expect_err("invalid slice should produce an error"); - match err { - FragmentationError::SliceBounds { offset, end, total } => { - assert_eq!(offset, payload.len() + 1); - assert_eq!(end, payload.len() + 1); - assert_eq!(total, payload.len()); - } - other => panic!("expected SliceBounds, got {other:?}"), - } -} - -#[test] -fn default_fragment_adapter_exposes_purge_api() { - let config = FragmentationConfig { - fragment_payload_cap: NonZeroUsize::new(8).expect("non-zero"), - max_message_size: NonZeroUsize::new(16).expect("non-zero"), - reassembly_timeout: Duration::ZERO, - }; - let mut adapter = DefaultFragmentAdapter::new(config); - let header = FragmentHeader::new(MessageId::new(81), FragmentIndex::zero(), false); - let encoded_payload = encode_fragment_payload(header, &[1_u8, 2]).expect("encode fragment"); - let packet = TestPacket { - id: 42, - correlation_id: Some(7), - payload: encoded_payload, - }; - - assert!( - adapter - .reassemble(packet) - .expect("adapter should accept first fragment") - .is_none() - ); - assert_eq!(adapter.purge_expired(), vec![MessageId::new(81)]); -} - -#[test] -fn reassembler_allows_single_fragment_at_max_message_size() { - let max_message_size = NonZeroUsize::new(16).expect("non-zero"); - let mut reassembler = Reassembler::new(max_message_size, Duration::from_secs(5)); - - let header = FragmentHeader::new(MessageId::new(20), FragmentIndex::zero(), true); - let payload = vec![0_u8; max_message_size.get()]; - - let result = reassembler - .push(header, payload) - .expect("fragment within limit should be accepted"); - - let assembled = result.expect("single fragment should complete reassembly"); - assert_eq!(assembled.payload().len(), max_message_size.get()); - assert_eq!(reassembler.buffered_len(), 0); -} - -#[test] -fn reassembler_allows_multi_fragment_at_max_message_size() { - let max_message_size = NonZeroUsize::new(16).expect("non-zero"); - let mut reassembler = Reassembler::new(max_message_size, Duration::from_secs(5)); - - let first_header = FragmentHeader::new(MessageId::new(21), FragmentIndex::zero(), false); - let second_header = FragmentHeader::new(MessageId::new(21), FragmentIndex::new(1), true); - - let first_payload = vec![0_u8; 8]; - let second_payload = vec![1_u8; max_message_size.get() - first_payload.len()]; - - assert!( - reassembler - .push(first_header, first_payload) - .expect("first fragment within limit") - .is_none(), - "first fragment should not complete the message", - ); - - let result = reassembler - .push(second_header, second_payload) - .expect("second fragment keeps total at limit"); - - let assembled = result.expect("fragments should complete reassembly at exact limit"); - assert_eq!(assembled.payload().len(), max_message_size.get()); - assert_eq!(reassembler.buffered_len(), 0); -} - -#[test] -fn reassembler_returns_single_fragment_immediately() { - let mut reassembler = Reassembler::new( - NonZeroUsize::new(16).expect("non-zero"), - Duration::from_secs(5), - ); - let header = FragmentHeader::new(MessageId::new(1), FragmentIndex::zero(), true); - let payload = vec![1_u8, 2, 3, 4]; - - let complete = reassembler - .push(header, payload.clone()) - .expect("reassembly must succeed") - .expect("single fragment should complete message"); - - assert_eq!(complete.message_id(), MessageId::new(1)); - assert_eq!(complete.payload(), payload.as_slice()); - assert_eq!(reassembler.buffered_len(), 0); -} - -#[test] -fn reassembler_accumulates_ordered_fragments() { - let mut reassembler = setup_reassembler_with_first_fragment(2, [5_u8, 6, 7]); - let final_fragment = FragmentHeader::new(MessageId::new(2), FragmentIndex::new(1), true); - - let complete = reassembler - .push(final_fragment, [8_u8, 9]) - .expect("final fragment accepted") - .expect("message should complete"); - - assert_eq!(complete.payload(), &[5, 6, 7, 8, 9]); - assert_eq!(reassembler.buffered_len(), 0); -} - -#[test] -fn reassembler_rejects_out_of_order_and_drops_partial() { - let mut reassembler = setup_reassembler_with_first_fragment(3, [1_u8, 2]); - let skipped = FragmentHeader::new(MessageId::new(3), FragmentIndex::new(2), true); - - let err = reassembler - .push(skipped, [3_u8]) - .expect_err("out-of-order fragment must be rejected"); - assert!(matches!( - err, - ReassemblyError::Fragment(FragmentError::IndexMismatch { .. }) - )); - assert_eq!(reassembler.buffered_len(), 0); -} - -#[test] -fn reassembler_suppresses_duplicate_fragment() { - let mut reassembler = setup_reassembler_with_first_fragment(31, [1_u8, 2]); - let duplicate = FragmentHeader::new(MessageId::new(31), FragmentIndex::zero(), false); - let final_fragment = FragmentHeader::new(MessageId::new(31), FragmentIndex::new(1), true); - - assert!( - reassembler - .push(duplicate, [9_u8, 9]) - .expect("duplicate fragment should be suppressed") - .is_none() - ); - assert_eq!(reassembler.buffered_len(), 1); - - let complete = reassembler - .push(final_fragment, [3_u8]) - .expect("final fragment should complete message") - .expect("message should be complete"); - assert_eq!(complete.payload(), &[1, 2, 3]); -} - -#[test] -fn reassembler_accepts_zero_length_fragments() { - let mut reassembler = Reassembler::new( - NonZeroUsize::new(8).expect("non-zero"), - Duration::from_secs(10), - ); - let first = FragmentHeader::new(MessageId::new(44), FragmentIndex::zero(), false); - let second = FragmentHeader::new(MessageId::new(44), FragmentIndex::new(1), true); - - assert!( - reassembler - .push(first, []) - .expect("empty fragment should be accepted") - .is_none() - ); - - let complete = reassembler - .push(second, [7_u8, 8]) - .expect("final fragment should complete message") - .expect("message should be complete"); - assert_eq!(complete.payload(), &[7, 8]); -} - -#[test] -fn reassembler_enforces_maximum_payload_size() { - let mut reassembler = Reassembler::new( - NonZeroUsize::new(4).expect("non-zero"), - Duration::from_secs(30), - ); - let first = FragmentHeader::new(MessageId::new(4), FragmentIndex::zero(), false); - let final_fragment = FragmentHeader::new(MessageId::new(4), FragmentIndex::new(1), true); - - assert!( - reassembler - .push(first, [1_u8, 2, 3]) - .expect("first fragment accepted") - .is_none() - ); - - let err = reassembler - .push(final_fragment, [4_u8, 5]) - .expect_err("payload growth beyond cap must be rejected"); - assert_eq!( - err, - ReassemblyError::MessageTooLarge { - message_id: MessageId::new(4), - attempted: 5, - limit: NonZeroUsize::new(4).expect("non-zero"), - } - ); - assert_eq!(reassembler.buffered_len(), 0); -} - -#[test] -fn reassembler_purges_expired_messages() { - let mut reassembler = Reassembler::new( - NonZeroUsize::new(8).expect("non-zero"), - Duration::from_secs(2), - ); - let now = Instant::now(); - let header = FragmentHeader::new(MessageId::new(5), FragmentIndex::zero(), false); - - assert!( - reassembler - .push_at(header, [0_u8, 1], now) - .expect("first fragment accepted") - .is_none() - ); - assert_eq!(reassembler.buffered_len(), 1); - - let evicted = reassembler.purge_expired_at(now + Duration::from_secs(3)); - assert_eq!(evicted, vec![MessageId::new(5)]); - assert_eq!(reassembler.buffered_len(), 0); -} - -#[derive(Clone, Debug, Encode, BorrowDecode, PartialEq, Eq)] -struct ExampleMessage(u8); - -#[test] -fn reassembler_decodes_reconstructed_message() { - let fragmenter = Fragmenter::new(NonZeroUsize::new(2).expect("non-zero")); - let batch = fragmenter - .fragment_message(&ExampleMessage(11)) - .expect("fragment message"); - let mut reassembler = Reassembler::new( - NonZeroUsize::new(4).expect("non-zero"), - Duration::from_secs(10), - ); - - let mut output = None; - for fragment in batch { - let (header, payload) = fragment.into_parts(); - output = reassembler - .push(header, payload) - .expect("fragment accepted"); - } - - let assembled = output.expect("message should complete"); - let decoded: ExampleMessage = assembled.decode().expect("decode message"); - assert_eq!(decoded, ExampleMessage(11)); -} +mod adapter_tests; +mod fragmenter_tests; +mod header_series_tests; +mod reassembler_tests; diff --git a/src/fragment/tests/adapter_tests.rs b/src/fragment/tests/adapter_tests.rs new file mode 100644 index 00000000..e0d5b23c --- /dev/null +++ b/src/fragment/tests/adapter_tests.rs @@ -0,0 +1,152 @@ +//! Tests for the public default fragment adapter wrapper. + +use std::{num::NonZeroUsize, time::Duration}; + +use crate::fragment::{ + DefaultFragmentAdapter, + FragmentHeader, + FragmentIndex, + FragmentParts, + Fragmentable, + FragmentationConfig, + MessageId, + encode_fragment_payload, +}; + +#[derive(Clone, Debug, PartialEq, Eq)] +struct TestPacket { + id: u32, + correlation_id: Option, + payload: Vec, +} + +impl Fragmentable for TestPacket { + fn into_fragment_parts(self) -> FragmentParts { + FragmentParts::new(self.id, self.correlation_id, self.payload) + } + + fn from_fragment_parts(parts: FragmentParts) -> Self { + Self { + id: parts.id(), + correlation_id: parts.correlation_id(), + payload: parts.into_payload(), + } + } +} + +fn adapter_config() -> FragmentationConfig { + FragmentationConfig { + fragment_payload_cap: NonZeroUsize::new(4).expect("non-zero"), + max_message_size: NonZeroUsize::new(64).expect("non-zero"), + reassembly_timeout: Duration::from_secs(30), + } +} + +fn build_test_packet() -> TestPacket { + TestPacket { + id: 42, + correlation_id: Some(777), + payload: (0_u8..10).collect(), + } +} + +fn assert_fragment_metadata(fragments: &[TestPacket], packet: &TestPacket) { + assert!( + fragments.len() > 1, + "payload beyond fragment cap should produce multiple fragments" + ); + for fragment in fragments { + assert_eq!(fragment.id, packet.id); + assert_eq!(fragment.correlation_id, packet.correlation_id); + } +} + +fn reassemble_fragment_sequence( + adapter: &mut DefaultFragmentAdapter, + fragments: &[TestPacket], +) -> Vec { + let first = fragments + .first() + .cloned() + .expect("fragment list must contain at least one fragment"); + assert!( + adapter + .reassemble(first.clone()) + .expect("first fragment should be accepted") + .is_none() + ); + assert!( + adapter + .reassemble(first) + .expect("duplicate fragment should be suppressed") + .is_none() + ); + + fragments + .iter() + .skip(1) + .cloned() + .filter_map(|fragment| { + adapter + .reassemble(fragment) + .expect("reassembly should not fail") + }) + .collect() +} + +#[test] +fn default_fragment_adapter_fragments_and_reassembles_test_packets() { + let mut adapter = DefaultFragmentAdapter::new(adapter_config()); + let packet = build_test_packet(); + + let fragments = adapter + .fragment(packet.clone()) + .expect("fragmenting packet should succeed"); + assert_fragment_metadata(&fragments, &packet); + + let reconstructed = reassemble_fragment_sequence(&mut adapter, &fragments); + assert_eq!(reconstructed.len(), 1); + assert_eq!( + reconstructed.first(), + Some(&packet), + "expected exactly one reconstructed packet matching input" + ); +} + +#[test] +fn default_fragment_adapter_passes_through_non_fragment_payloads() { + let mut adapter = DefaultFragmentAdapter::new(adapter_config()); + let packet = TestPacket { + id: 12, + correlation_id: Some(9), + payload: b"not encoded as fragment payload".to_vec(), + }; + + let result = adapter + .reassemble(packet.clone()) + .expect("non-fragment payload should pass through"); + + assert_eq!(result, Some(packet)); +} + +#[test] +fn default_fragment_adapter_exposes_purge_api() { + let mut config = adapter_config(); + config.reassembly_timeout = Duration::ZERO; + let mut adapter = DefaultFragmentAdapter::new(config); + let header = FragmentHeader::new(MessageId::new(81), FragmentIndex::zero(), false); + let encoded_payload = encode_fragment_payload(header, &[1_u8, 2]).expect("encode fragment"); + let packet = TestPacket { + id: 42, + correlation_id: Some(7), + payload: encoded_payload, + }; + + assert!( + adapter + .reassemble(packet) + .expect("adapter should accept first fragment") + .is_none() + ); + assert_eq!(adapter.purge_expired(), vec![MessageId::new(81)]); +} diff --git a/src/fragment/tests/fragmenter_tests.rs b/src/fragment/tests/fragmenter_tests.rs new file mode 100644 index 00000000..de29f893 --- /dev/null +++ b/src/fragment/tests/fragmenter_tests.rs @@ -0,0 +1,132 @@ +//! Tests for outbound fragmentation and fragment batch helpers. + +use std::num::NonZeroUsize; + +use bincode::{BorrowDecode, Encode}; + +use crate::fragment::{ + FragmentBatch, + FragmentIndex, + FragmentationError, + Fragmenter, + MessageId, + fragmenter::FragmentCursor, +}; + +#[derive(Debug, Encode, BorrowDecode)] +struct DummyMessage(Vec); + +fn assert_fragment(batch: &FragmentBatch, index: usize, payload: &[u8], is_last: bool) { + let fragment = batch + .fragments() + .get(index) + .expect("fragment missing at requested index"); + assert_eq!(fragment.payload(), payload); + assert_eq!(fragment.header().is_last_fragment(), is_last); +} + +#[test] +fn fragmenter_splits_payload_into_multiple_frames() { + let fragmenter = Fragmenter::new(NonZeroUsize::new(3).expect("non-zero")); + let payload: Vec = (0..8).collect(); + let batch = fragmenter + .fragment_bytes(payload) + .expect("fragment payload"); + + assert_eq!(batch.len(), 3); + assert!(batch.is_fragmented()); + assert_eq!(batch.message_id(), MessageId::new(0)); + + assert_fragment(&batch, 0, &[0, 1, 2], false); + assert_fragment(&batch, 1, &[3, 4, 5], false); + assert_fragment(&batch, 2, &[6, 7], true); +} + +#[test] +fn fragmenter_handles_empty_payload() { + let fragmenter = Fragmenter::new(NonZeroUsize::new(8).expect("non-zero")); + let batch = fragmenter.fragment_bytes([]).expect("fragment empty"); + + assert_eq!(batch.len(), 1); + assert!(!batch.is_fragmented()); + let fragment = batch + .fragments() + .first() + .expect("batch should contain at least one fragment"); + assert!(fragment.payload().is_empty()); + assert!(fragment.header().is_last_fragment()); + assert_eq!(fragment.header().fragment_index(), FragmentIndex::zero()); +} + +#[test] +fn fragmenter_fragments_messages_and_increments_ids() { + let fragmenter = + Fragmenter::with_starting_id(NonZeroUsize::new(4).expect("non-zero"), MessageId::new(7)); + + let batch = fragmenter + .fragment_message(&DummyMessage(vec![1, 2, 3, 4, 5])) + .expect("fragment message"); + assert_eq!(batch.message_id(), MessageId::new(7)); + assert_eq!(batch.len(), 2); + assert!(batch.is_fragmented()); + + let next_payload = vec![9, 9, 9]; + let next = fragmenter + .fragment_bytes(next_payload) + .expect("fragment bytes"); + assert_eq!(next.message_id(), MessageId::new(8)); + assert_eq!(next.len(), 1); + assert!(!next.is_fragmented()); +} + +#[test] +fn fragment_batch_into_iterator_yields_all_fragments() { + let fragmenter = Fragmenter::new(NonZeroUsize::new(2).expect("non-zero")); + let payload = [1_u8, 2, 3]; + let batch = fragmenter + .fragment_bytes(payload) + .expect("split into fragments"); + + let payloads: Vec> = batch + .into_iter() + .map(|fragment| fragment.payload().to_vec()) + .collect(); + assert_eq!(payloads, vec![vec![1, 2], vec![3]]); +} + +#[test] +fn fragmenter_respects_explicit_message_ids() { + let fragmenter = + Fragmenter::with_starting_id(NonZeroUsize::new(2).expect("non-zero"), MessageId::new(10)); + let payload = [7_u8, 8, 9]; + let batch = fragmenter + .fragment_with_id(MessageId::new(500), payload) + .expect("fragment with explicit id"); + assert_eq!(batch.message_id(), MessageId::new(500)); + assert_eq!(batch.len(), 2); + + let next = fragmenter.fragment_bytes([1_u8]).expect("next fragment"); + assert_eq!(next.message_id(), MessageId::new(10)); +} + +#[test] +fn fragmenter_returns_error_for_out_of_bounds_slice() { + let fragmenter = Fragmenter::new(NonZeroUsize::new(4).expect("non-zero")); + let payload = [1_u8, 2, 3, 4]; + + let err = fragmenter + .build_fragments_from_for_tests( + MessageId::new(1), + &payload, + FragmentCursor::new(payload.len() + 1, FragmentIndex::zero()), + ) + .expect_err("invalid slice should produce an error"); + match err { + FragmentationError::SliceBounds { offset, end, total } => { + assert_eq!(offset, payload.len() + 1); + assert_eq!(end, payload.len() + 1); + assert_eq!(total, payload.len()); + } + other => panic!("expected SliceBounds, got {other:?}"), + } +} diff --git a/src/fragment/tests/header_series_tests.rs b/src/fragment/tests/header_series_tests.rs new file mode 100644 index 00000000..bce8fa83 --- /dev/null +++ b/src/fragment/tests/header_series_tests.rs @@ -0,0 +1,109 @@ +//! Tests for fragment header accessors and fragment-series sequencing rules. + +use rstest::rstest; + +use crate::fragment::*; + +#[test] +fn fragment_header_exposes_fields() { + let header = FragmentHeader::new(MessageId::new(9), FragmentIndex::new(2), true); + assert_eq!(header.message_id(), MessageId::new(9)); + assert_eq!(header.fragment_index(), FragmentIndex::new(2)); + assert!(header.is_last_fragment()); +} + +#[rstest] +#[case(1)] +#[case(5)] +fn series_accepts_sequential_fragments(#[case] message: u64) { + let mut series = FragmentSeries::new(MessageId::new(message)); + let first = FragmentHeader::new(MessageId::new(message), FragmentIndex::zero(), false); + let second = FragmentHeader::new(MessageId::new(message), FragmentIndex::new(1), true); + + assert_eq!(series.accept(first), Ok(FragmentStatus::Incomplete)); + assert_eq!(series.accept(second), Ok(FragmentStatus::Complete)); + assert!(series.is_complete()); +} + +#[test] +fn series_rejects_other_message() { + let mut series = FragmentSeries::new(MessageId::new(7)); + let header = FragmentHeader::new(MessageId::new(8), FragmentIndex::zero(), false); + let err = series + .accept(header) + .expect_err("fragment from another message must be rejected"); + assert!(matches!(err, FragmentError::MessageMismatch { .. })); +} + +#[test] +fn series_rejects_out_of_order_fragment() { + let mut series = FragmentSeries::new(MessageId::new(7)); + let header = FragmentHeader::new(MessageId::new(7), FragmentIndex::new(2), false); + let err = series + .accept(header) + .expect_err("out-of-order fragment must be rejected"); + assert!(matches!(err, FragmentError::IndexMismatch { .. })); +} + +#[test] +fn series_suppresses_duplicate_fragment() { + let mut series = FragmentSeries::new(MessageId::new(7)); + let first = FragmentHeader::new(MessageId::new(7), FragmentIndex::zero(), false); + let duplicate = FragmentHeader::new(MessageId::new(7), FragmentIndex::zero(), false); + let final_fragment = FragmentHeader::new(MessageId::new(7), FragmentIndex::new(1), true); + + assert_eq!(series.accept(first), Ok(FragmentStatus::Incomplete)); + assert_eq!(series.accept(duplicate), Ok(FragmentStatus::Duplicate)); + assert_eq!(series.accept(final_fragment), Ok(FragmentStatus::Complete)); +} + +#[test] +fn series_rejects_after_completion() { + let mut series = FragmentSeries::new(MessageId::new(1)); + let first = FragmentHeader::new(MessageId::new(1), FragmentIndex::zero(), true); + assert_eq!(series.accept(first), Ok(FragmentStatus::Complete)); + let err = series + .accept(FragmentHeader::new( + MessageId::new(1), + FragmentIndex::new(1), + true, + )) + .expect_err("series must reject fragments after completion"); + assert!(matches!(err, FragmentError::SeriesComplete)); +} + +#[test] +fn series_reports_duplicate_final_fragment_after_completion() { + let mut series = FragmentSeries::new(MessageId::new(12)); + let first = FragmentHeader::new(MessageId::new(12), FragmentIndex::zero(), true); + let duplicate = FragmentHeader::new(MessageId::new(12), FragmentIndex::zero(), true); + + assert_eq!(series.accept(first), Ok(FragmentStatus::Complete)); + assert_eq!(series.accept(duplicate), Ok(FragmentStatus::Duplicate)); + assert!(series.is_complete()); +} + +#[test] +fn series_detects_index_overflow() { + let mut series = FragmentSeries::new(MessageId::new(1)); + series.force_next_index_for_tests(FragmentIndex::new(u32::MAX)); + let header = FragmentHeader::new(MessageId::new(1), FragmentIndex::new(u32::MAX), false); + let err = series + .accept(header) + .expect_err("overflow must raise an error"); + assert_eq!( + err, + FragmentError::IndexOverflow { + last: FragmentIndex::new(u32::MAX) + } + ); +} + +#[test] +fn series_accepts_final_fragment_at_max_index() { + let mut series = FragmentSeries::new(MessageId::new(2)); + series.force_next_index_for_tests(FragmentIndex::new(u32::MAX)); + let header = FragmentHeader::new(MessageId::new(2), FragmentIndex::new(u32::MAX), true); + assert_eq!(series.accept(header), Ok(FragmentStatus::Complete)); + assert!(series.is_complete()); +} diff --git a/src/fragment/tests/reassembler_tests.rs b/src/fragment/tests/reassembler_tests.rs new file mode 100644 index 00000000..7a130412 --- /dev/null +++ b/src/fragment/tests/reassembler_tests.rs @@ -0,0 +1,253 @@ +//! Tests for inbound reassembly ordering, limits, and decoding. + +use std::{ + num::NonZeroUsize, + time::{Duration, Instant}, +}; + +use bincode::{BorrowDecode, Encode}; + +use crate::fragment::{ + FragmentError, + FragmentHeader, + FragmentIndex, + Fragmenter, + MessageId, + ReassembledMessage, + Reassembler, + ReassemblyError, +}; + +fn setup_reassembler_with_first_fragment( + message_id: u64, + first_payload: impl AsRef<[u8]>, +) -> Reassembler { + let mut reassembler = Reassembler::new( + NonZeroUsize::new(8).expect("non-zero"), + Duration::from_secs(30), + ); + let first = FragmentHeader::new(MessageId::new(message_id), FragmentIndex::zero(), false); + assert!( + reassembler + .push(first, first_payload) + .expect("first fragment accepted") + .is_none() + ); + reassembler +} + +#[test] +fn reassembler_allows_single_fragment_at_max_message_size() { + let max_message_size = NonZeroUsize::new(16).expect("non-zero"); + let mut reassembler = Reassembler::new(max_message_size, Duration::from_secs(5)); + + let header = FragmentHeader::new(MessageId::new(20), FragmentIndex::zero(), true); + let payload = vec![0_u8; max_message_size.get()]; + + let result = reassembler + .push(header, payload) + .expect("fragment within limit should be accepted"); + + let assembled = result.expect("single fragment should complete reassembly"); + assert_eq!(assembled.payload().len(), max_message_size.get()); + assert_eq!(reassembler.buffered_len(), 0); +} + +#[test] +fn reassembler_allows_multi_fragment_at_max_message_size() { + let max_message_size = NonZeroUsize::new(16).expect("non-zero"); + let mut reassembler = Reassembler::new(max_message_size, Duration::from_secs(5)); + + let first_header = FragmentHeader::new(MessageId::new(21), FragmentIndex::zero(), false); + let second_header = FragmentHeader::new(MessageId::new(21), FragmentIndex::new(1), true); + + let first_payload = vec![0_u8; 8]; + let second_payload = vec![1_u8; max_message_size.get() - first_payload.len()]; + + assert!( + reassembler + .push(first_header, first_payload) + .expect("first fragment within limit") + .is_none(), + "first fragment should not complete the message", + ); + + let result = reassembler + .push(second_header, second_payload) + .expect("second fragment keeps total at limit"); + + let assembled = result.expect("fragments should complete reassembly at exact limit"); + assert_eq!(assembled.payload().len(), max_message_size.get()); + assert_eq!(reassembler.buffered_len(), 0); +} + +#[test] +fn reassembler_returns_single_fragment_immediately() { + let mut reassembler = Reassembler::new( + NonZeroUsize::new(16).expect("non-zero"), + Duration::from_secs(5), + ); + let header = FragmentHeader::new(MessageId::new(1), FragmentIndex::zero(), true); + let payload = vec![1_u8, 2, 3, 4]; + + let complete = reassembler + .push(header, payload.clone()) + .expect("reassembly must succeed") + .expect("single fragment should complete message"); + + assert_eq!(complete.message_id(), MessageId::new(1)); + assert_eq!(complete.payload(), payload.as_slice()); + assert_eq!(reassembler.buffered_len(), 0); +} + +#[test] +fn reassembler_accumulates_ordered_fragments() { + let mut reassembler = setup_reassembler_with_first_fragment(2, [5_u8, 6, 7]); + let final_fragment = FragmentHeader::new(MessageId::new(2), FragmentIndex::new(1), true); + + let complete = reassembler + .push(final_fragment, [8_u8, 9]) + .expect("final fragment accepted") + .expect("message should complete"); + + assert_eq!(complete.payload(), &[5, 6, 7, 8, 9]); + assert_eq!(reassembler.buffered_len(), 0); +} + +#[test] +fn reassembler_rejects_out_of_order_and_drops_partial() { + let mut reassembler = setup_reassembler_with_first_fragment(3, [1_u8, 2]); + let skipped = FragmentHeader::new(MessageId::new(3), FragmentIndex::new(2), true); + + let err = reassembler + .push(skipped, [3_u8]) + .expect_err("out-of-order fragment must be rejected"); + assert!(matches!( + err, + ReassemblyError::Fragment(FragmentError::IndexMismatch { .. }) + )); + assert_eq!(reassembler.buffered_len(), 0); +} + +#[test] +fn reassembler_suppresses_duplicate_fragment() { + let mut reassembler = setup_reassembler_with_first_fragment(31, [1_u8, 2]); + let duplicate = FragmentHeader::new(MessageId::new(31), FragmentIndex::zero(), false); + let final_fragment = FragmentHeader::new(MessageId::new(31), FragmentIndex::new(1), true); + + assert!( + reassembler + .push(duplicate, [9_u8, 9]) + .expect("duplicate fragment should be suppressed") + .is_none() + ); + assert_eq!(reassembler.buffered_len(), 1); + + let complete = reassembler + .push(final_fragment, [3_u8]) + .expect("final fragment should complete message") + .expect("message should be complete"); + assert_eq!(complete.payload(), &[1, 2, 3]); +} + +#[test] +fn reassembler_accepts_zero_length_fragments() { + let mut reassembler = Reassembler::new( + NonZeroUsize::new(8).expect("non-zero"), + Duration::from_secs(10), + ); + let first = FragmentHeader::new(MessageId::new(44), FragmentIndex::zero(), false); + let second = FragmentHeader::new(MessageId::new(44), FragmentIndex::new(1), true); + + assert!( + reassembler + .push(first, []) + .expect("empty fragment should be accepted") + .is_none() + ); + + let complete = reassembler + .push(second, [7_u8, 8]) + .expect("final fragment should complete message") + .expect("message should be complete"); + assert_eq!(complete.payload(), &[7, 8]); +} + +#[test] +fn reassembler_enforces_maximum_payload_size() { + let mut reassembler = Reassembler::new( + NonZeroUsize::new(4).expect("non-zero"), + Duration::from_secs(30), + ); + let first = FragmentHeader::new(MessageId::new(4), FragmentIndex::zero(), false); + let final_fragment = FragmentHeader::new(MessageId::new(4), FragmentIndex::new(1), true); + + assert!( + reassembler + .push(first, [1_u8, 2, 3]) + .expect("first fragment accepted") + .is_none() + ); + + let err = reassembler + .push(final_fragment, [4_u8, 5]) + .expect_err("payload growth beyond cap must be rejected"); + assert_eq!( + err, + ReassemblyError::MessageTooLarge { + message_id: MessageId::new(4), + attempted: 5, + limit: NonZeroUsize::new(4).expect("non-zero"), + } + ); + assert_eq!(reassembler.buffered_len(), 0); +} + +#[test] +fn reassembler_purges_expired_messages() { + let mut reassembler = Reassembler::new( + NonZeroUsize::new(8).expect("non-zero"), + Duration::from_secs(2), + ); + let now = Instant::now(); + let header = FragmentHeader::new(MessageId::new(5), FragmentIndex::zero(), false); + + assert!( + reassembler + .push_at(header, [0_u8, 1], now) + .expect("first fragment accepted") + .is_none() + ); + assert_eq!(reassembler.buffered_len(), 1); + + let evicted = reassembler.purge_expired_at(now + Duration::from_secs(3)); + assert_eq!(evicted, vec![MessageId::new(5)]); + assert_eq!(reassembler.buffered_len(), 0); +} + +#[derive(Clone, Debug, Encode, BorrowDecode, PartialEq, Eq)] +struct ExampleMessage(u8); + +#[test] +fn reassembler_decodes_reconstructed_message() { + let fragmenter = Fragmenter::new(NonZeroUsize::new(2).expect("non-zero")); + let batch = fragmenter + .fragment_message(&ExampleMessage(11)) + .expect("fragment message"); + let mut reassembler = Reassembler::new( + NonZeroUsize::new(4).expect("non-zero"), + Duration::from_secs(10), + ); + + let mut output: Option = None; + for fragment in batch { + let (header, payload) = fragment.into_parts(); + output = reassembler + .push(header, payload) + .expect("fragment accepted"); + } + + let assembled = output.expect("message should complete"); + let decoded: ExampleMessage = assembled.decode().expect("decode message"); + assert_eq!(decoded, ExampleMessage(11)); +} diff --git a/tests/features/fragment.feature b/tests/features/fragment.feature index 2620b812..a76445d8 100644 --- a/tests/features/fragment.feature +++ b/tests/features/fragment.feature @@ -81,7 +81,7 @@ Feature: Fragment metadata enforcement When fragment 0 for message 25 with 2 bytes arrives marked non-final And fragment 0 for message 25 with 2 bytes arrives marked non-final Then no message has been reassembled yet - And the reassembler is buffering 1 messages + And the reassembler is buffering 1 message When fragment 1 for message 25 with 1 bytes arrives marked final Then the reassembler outputs a payload of 3 bytes And the reassembler is buffering 0 messages @@ -98,7 +98,7 @@ Feature: Fragment metadata enforcement And fragment 0 for message 28 with 4 bytes arrives marked non-final And fragment 1 for message 27 with 2 bytes arrives marked final Then the reassembler outputs a payload of 5 bytes - And the reassembler is buffering 1 messages + And the reassembler is buffering 1 message When fragment 1 for message 28 with 1 bytes arrives marked final Then the reassembler outputs a payload of 5 bytes And the reassembler is buffering 0 messages diff --git a/tests/fragment_transport.rs b/tests/fragment_transport.rs index 06b985fb..9ba363c2 100644 --- a/tests/fragment_transport.rs +++ b/tests/fragment_transport.rs @@ -256,8 +256,16 @@ async fn interleaved_fragment_streams_reassemble_independently() -> TestResult { client.flush().await?; client.get_mut().shutdown().await?; - let response_a = read_reassembled_response(&mut client, &config).await?; - let response_b = read_reassembled_response(&mut client, &config).await?; + let response_a = timeout( + Duration::from_secs(1), + read_reassembled_response(&mut client, &config), + ) + .await??; + let response_b = timeout( + Duration::from_secs(1), + read_reassembled_response(&mut client, &config), + ) + .await??; let mut observed_responses = vec![response_a, response_b]; observed_responses.sort(); let mut expected_payloads = vec![payload_a.clone(), payload_b.clone()]; diff --git a/tests/steps/fragment_steps.rs b/tests/steps/fragment_steps.rs index 33d45f76..41ddf64f 100644 --- a/tests/steps/fragment_steps.rs +++ b/tests/steps/fragment_steps.rs @@ -194,12 +194,21 @@ fn then_reassembly_out_of_order(fragment_world: &mut FragmentWorld) -> TestResul Ok(()) } -#[then("the reassembler is buffering {expected:usize} messages")] -fn then_buffered_messages(fragment_world: &mut FragmentWorld, expected: usize) -> TestResult { +fn assert_buffered_messages(fragment_world: &mut FragmentWorld, expected: usize) -> TestResult { fragment_world.assert_buffered_messages(expected)?; Ok(()) } +#[then("the reassembler is buffering {expected:usize} message")] +fn then_buffered_message(fragment_world: &mut FragmentWorld, expected: usize) -> TestResult { + assert_buffered_messages(fragment_world, expected) +} + +#[then("the reassembler is buffering {expected:usize} messages")] +fn then_buffered_messages(fragment_world: &mut FragmentWorld, expected: usize) -> TestResult { + assert_buffered_messages(fragment_world, expected) +} + #[then("message {message:u64} is evicted")] fn then_message_evicted(fragment_world: &mut FragmentWorld, message: u64) -> TestResult { fragment_world.assert_evicted_message(message)?; From ca4703f19d00139791afb439a9424ff0a04a8d67 Mon Sep 17 00:00:00 2001 From: Leynos Date: Fri, 13 Feb 2026 17:57:53 +0000 Subject: [PATCH 4/7] Update code Co-authored-by: devboxerhub[bot] --- .../execplans/9-1-3-fragment-adapter-trait.md | 9 ++-- docs/users-guide.md | 1 - src/fragment/tests/reassembler_tests.rs | 43 +++++++++++-------- tests/features/fragment.feature | 4 +- tests/steps/fragment_steps.rs | 1 + 5 files changed, 31 insertions(+), 27 deletions(-) diff --git a/docs/execplans/9-1-3-fragment-adapter-trait.md b/docs/execplans/9-1-3-fragment-adapter-trait.md index 1ef95494..7cffa81f 100644 --- a/docs/execplans/9-1-3-fragment-adapter-trait.md +++ b/docs/execplans/9-1-3-fragment-adapter-trait.md @@ -111,11 +111,10 @@ Success is observable when: versus `docs/roadmap.md` section 9.2. Impact: this milestone must include design-document corrections, not only code/test edits. -- Observation: `docs/behavioural-testing-in-rust-with-cucumber.md` is - historical and explicitly points to `rstest-bdd` guidance. Evidence: banner - at the top of that document. Impact: behavioural testing updates for this - feature should follow `docs/rstest-bdd-users-guide.md` conventions and - version updates. +- Observation: behavioural testing guidance for this repository now lives in + `docs/rstest-bdd-users-guide.md`. Evidence: current testing policy and + roadmap requirements for `rstest-bdd`. Impact: behavioural testing updates + for this feature should follow that guide's conventions and version updates. - Observation: the first version of the interleaved transport integration test assumed deterministic response ordering and intermittently timed out. diff --git a/docs/users-guide.md b/docs/users-guide.md index 4042aa7d..15db1e38 100644 --- a/docs/users-guide.md +++ b/docs/users-guide.md @@ -862,7 +862,6 @@ let cfg = FragmentationConfig::for_frame_budget( ).expect("frame budget too small for fragments"); let app = WireframeApp::new()? - .enable_fragmentation() .fragmentation(Some(cfg)) .route(42, handler)?; ``` diff --git a/src/fragment/tests/reassembler_tests.rs b/src/fragment/tests/reassembler_tests.rs index 7a130412..ff096283 100644 --- a/src/fragment/tests/reassembler_tests.rs +++ b/src/fragment/tests/reassembler_tests.rs @@ -6,6 +6,7 @@ use std::{ }; use bincode::{BorrowDecode, Encode}; +use rstest::{fixture, rstest}; use crate::fragment::{ FragmentError, @@ -18,9 +19,10 @@ use crate::fragment::{ ReassemblyError, }; -fn setup_reassembler_with_first_fragment( - message_id: u64, - first_payload: impl AsRef<[u8]>, +#[fixture] +fn reassembler_with_first_fragment( + #[default(1)] message_id: u64, + #[default(&[])] first_payload: &'static [u8], ) -> Reassembler { let mut reassembler = Reassembler::new( NonZeroUsize::new(8).expect("non-zero"), @@ -100,50 +102,53 @@ fn reassembler_returns_single_fragment_immediately() { assert_eq!(reassembler.buffered_len(), 0); } -#[test] -fn reassembler_accumulates_ordered_fragments() { - let mut reassembler = setup_reassembler_with_first_fragment(2, [5_u8, 6, 7]); +#[rstest] +fn reassembler_accumulates_ordered_fragments( + #[with(2, &[5_u8, 6, 7])] mut reassembler_with_first_fragment: Reassembler, +) { let final_fragment = FragmentHeader::new(MessageId::new(2), FragmentIndex::new(1), true); - let complete = reassembler + let complete = reassembler_with_first_fragment .push(final_fragment, [8_u8, 9]) .expect("final fragment accepted") .expect("message should complete"); assert_eq!(complete.payload(), &[5, 6, 7, 8, 9]); - assert_eq!(reassembler.buffered_len(), 0); + assert_eq!(reassembler_with_first_fragment.buffered_len(), 0); } -#[test] -fn reassembler_rejects_out_of_order_and_drops_partial() { - let mut reassembler = setup_reassembler_with_first_fragment(3, [1_u8, 2]); +#[rstest] +fn reassembler_rejects_out_of_order_and_drops_partial( + #[with(3, &[1_u8, 2])] mut reassembler_with_first_fragment: Reassembler, +) { let skipped = FragmentHeader::new(MessageId::new(3), FragmentIndex::new(2), true); - let err = reassembler + let err = reassembler_with_first_fragment .push(skipped, [3_u8]) .expect_err("out-of-order fragment must be rejected"); assert!(matches!( err, ReassemblyError::Fragment(FragmentError::IndexMismatch { .. }) )); - assert_eq!(reassembler.buffered_len(), 0); + assert_eq!(reassembler_with_first_fragment.buffered_len(), 0); } -#[test] -fn reassembler_suppresses_duplicate_fragment() { - let mut reassembler = setup_reassembler_with_first_fragment(31, [1_u8, 2]); +#[rstest] +fn reassembler_suppresses_duplicate_fragment( + #[with(31, &[1_u8, 2])] mut reassembler_with_first_fragment: Reassembler, +) { let duplicate = FragmentHeader::new(MessageId::new(31), FragmentIndex::zero(), false); let final_fragment = FragmentHeader::new(MessageId::new(31), FragmentIndex::new(1), true); assert!( - reassembler + reassembler_with_first_fragment .push(duplicate, [9_u8, 9]) .expect("duplicate fragment should be suppressed") .is_none() ); - assert_eq!(reassembler.buffered_len(), 1); + assert_eq!(reassembler_with_first_fragment.buffered_len(), 1); - let complete = reassembler + let complete = reassembler_with_first_fragment .push(final_fragment, [3_u8]) .expect("final fragment should complete message") .expect("message should be complete"); diff --git a/tests/features/fragment.feature b/tests/features/fragment.feature index a76445d8..80756b9c 100644 --- a/tests/features/fragment.feature +++ b/tests/features/fragment.feature @@ -82,7 +82,7 @@ Feature: Fragment metadata enforcement And fragment 0 for message 25 with 2 bytes arrives marked non-final Then no message has been reassembled yet And the reassembler is buffering 1 message - When fragment 1 for message 25 with 1 bytes arrives marked final + When fragment 1 for message 25 with 1 byte arrives marked final Then the reassembler outputs a payload of 3 bytes And the reassembler is buffering 0 messages @@ -99,6 +99,6 @@ Feature: Fragment metadata enforcement And fragment 1 for message 27 with 2 bytes arrives marked final Then the reassembler outputs a payload of 5 bytes And the reassembler is buffering 1 message - When fragment 1 for message 28 with 1 bytes arrives marked final + When fragment 1 for message 28 with 1 byte arrives marked final Then the reassembler outputs a payload of 5 bytes And the reassembler is buffering 0 messages diff --git a/tests/steps/fragment_steps.rs b/tests/steps/fragment_steps.rs index 41ddf64f..5a668b90 100644 --- a/tests/steps/fragment_steps.rs +++ b/tests/steps/fragment_steps.rs @@ -147,6 +147,7 @@ fn when_reassembler_fragment_non_final( #[when( "fragment {index:u32} for message {message:u64} with {len:usize} bytes arrives marked final" )] +#[when("fragment {index:u32} for message {message:u64} with {len:usize} byte arrives marked final")] fn when_reassembler_fragment_final( fragment_world: &mut FragmentWorld, index: u32, From 7829fe376cb68c8f5654d3decc0580b43698d82d Mon Sep 17 00:00:00 2001 From: Leynos Date: Sat, 14 Feb 2026 23:15:24 +0000 Subject: [PATCH 5/7] docs(documentation): fix UK English spelling in user guide and execplan docs Correct multiple instances of "deserialization" to "deserialisation" and "documentation" to "documentation" to align with UK English spelling conventions in users-guide.md and 9-1-3-fragment-adapter-trait.md. Co-authored-by: devboxerhub[bot] --- docs/execplans/9-1-3-fragment-adapter-trait.md | 14 +++++++------- docs/users-guide.md | 6 +++--- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/docs/execplans/9-1-3-fragment-adapter-trait.md b/docs/execplans/9-1-3-fragment-adapter-trait.md index 7cffa81f..b6437a05 100644 --- a/docs/execplans/9-1-3-fragment-adapter-trait.md +++ b/docs/execplans/9-1-3-fragment-adapter-trait.md @@ -15,10 +15,10 @@ integration still has open hardening gaps called out by roadmap item 9.2.1: fragmentation is enabled by default, duplicate versus out-of-order handling is not explicit, purge scheduling ownership is implicit, and there is no public fragment-adapter abstraction tying these rules together. This plan introduces a -public `FragmentAdapter` trait and aligns runtime behaviour so fragmentation is -explicitly opt-in, purge control is public, duplicate and out-of-order policies -are deterministic, and edge cases (zero-length fragments and index overflow) -are defined and tested. +public `FragmentAdapter` trait and aligns runtime behaviour, so fragmentation +is explicitly opt-in, purge control is public, duplicate and out-of-order +policies are deterministic, and edge cases (zero-length fragments and index +overflow) are defined and tested. Success is observable when: @@ -222,7 +222,7 @@ Documentation currently needing alignment: Define the `FragmentAdapter` public contract in `src/fragment`, including purge methods and explicit result/error shapes for duplicate suppression, out-of-order fragments, zero-length fragments, and overflow paths. Update the -fragmentation design document first so code follows an agreed contract. +fragmentation design document first, so code follows an agreed contract. Go/no-go: @@ -234,7 +234,7 @@ Go/no-go: Implement the default adapter using existing `Fragmenter` + `Reassembler` logic, then switch app frame handling to use the adapter contract. Change -builder defaults so fragmentation is disabled unless explicitly configured. +builder defaults, so fragmentation is disabled unless explicitly configured. Update builder docs/comments and any helper defaults that currently turn fragmentation on implicitly. @@ -349,7 +349,7 @@ Acceptance is complete when all statements below are true: reassemble correctly. - Behavioural coverage: `rstest-bdd` scenarios validate fragment-series policy and any new public behaviour, running under v0.5.0 dependencies. -- Documentation parity: users guide and design docs describe the same behaviour +- Documentation parity: user guide and design docs describe the same behaviour that tests verify. - Roadmap update: `docs/roadmap.md` item 9.2.1 and all requested sub-items are checked as done. diff --git a/docs/users-guide.md b/docs/users-guide.md index 15db1e38..6ed5cec4 100644 --- a/docs/users-guide.md +++ b/docs/users-guide.md @@ -118,7 +118,7 @@ default), enforces per-frame read timeouts, and writes responses. Serialization helpers `send_response` and `send_response_framed` (or `send_response_framed_with_codec` for custom codecs) return typed `SendError` variants when encoding or I/O fails, and the connection closes after ten -consecutive deserialization errors.[^6][^7] +consecutive deserialisation errors.[^6][^7] ### Custom frame codecs @@ -338,7 +338,7 @@ that meets the trait bounds.[^4] When `FrameMetadata::parse` succeeds, the framework extracts identifiers from metadata without deserializing the payload. If parsing fails, it falls back to -full deserialization.[^9][^6] Application messages implement the `Message` +full deserialisation.[^9][^6] Application messages implement the `Message` trait, gaining `to_bytes` and `from_bytes` helpers that use bincode with the standard configuration.[^10] @@ -841,7 +841,7 @@ handlers, so handlers continue to work with complete `Envelope` values.[^6] Layering order is fixed. Outbound processing runs serializer → fragmentation → codec wrapping. Inbound processing runs codec decode → fragment reassembly → -deserialization. +deserialisation. Fragmented messages enforce two guards: `max_message_size` caps the total reassembled payload, and `reassembly_timeout` evicts stale partial messages. From c90c50fcad5af122ab101d1b48487c08b97c8325 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sat, 14 Feb 2026 23:50:33 +0000 Subject: [PATCH 6/7] docs(users-guide): fix spelling of 'adapter' in users guide Corrected instances of "adaptor" to "adapter" for consistent terminology in the users guide documentation. Co-authored-by: devboxerhub[bot] --- docs/users-guide.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/users-guide.md b/docs/users-guide.md index 6ed5cec4..5d7e4235 100644 --- a/docs/users-guide.md +++ b/docs/users-guide.md @@ -364,7 +364,7 @@ The standalone `Fragmenter` helper now slices oversized payloads into capped fragments while stamping the shared `MessageId` and sequential `FragmentIndex`. Each call returns a `FragmentBatch` that reports whether the message required fragmentation and yields individual `FragmentFrame` values for serialization or -logging. This keeps transport experiments lightweight while the full adaptor +logging. This keeps transport experiments lightweight while the full adapter layer evolves. The helper is fallible—`FragmentationError` surfaces encoding failures or index overflows—so production code should bubble the error up or log it rather than unwrapping. @@ -675,7 +675,7 @@ async fn handle_upload(parts: RequestParts, body: RequestBodyStream) { } ``` -The `RequestBodyReader` adaptor implements `AsyncRead`, allowing protocol +The `RequestBodyReader` adapter implements `AsyncRead`, allowing protocol crates to reuse existing parsers. For raw stream access, use the `RequestBodyStream` directly with `StreamExt` methods: @@ -1285,7 +1285,7 @@ Phase out older message versions without breaking clients: - Emit version N on egress so clients observe a single schema. - Publish metrics and logs describing legacy usage to support operator dashboards.[^33][^8] -- Remove adaptors once the sunset window ends. +- Remove adapters once the sunset window ends. ```rust use std::sync::Arc; From 1dd23ea363e817e2e5f8ca05d15cd36dd4cd0961 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 15 Feb 2026 00:28:15 +0000 Subject: [PATCH 7/7] docs(docs): Fix spelling and punctuation in documentation - Corrected comma usage for clarity in fragmentation design doc. - Fixed spelling from 'deserialisation' to 'deserialization' in multiple places. - Improved wording and formatting in execution plans and user guide docs. Co-authored-by: devboxerhub[bot] --- docs/execplans/9-1-3-fragment-adapter-trait.md | 7 ++++--- docs/users-guide.md | 8 ++++---- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/docs/execplans/9-1-3-fragment-adapter-trait.md b/docs/execplans/9-1-3-fragment-adapter-trait.md index b6437a05..011d96b3 100644 --- a/docs/execplans/9-1-3-fragment-adapter-trait.md +++ b/docs/execplans/9-1-3-fragment-adapter-trait.md @@ -25,7 +25,7 @@ Success is observable when: - `WireframeApp::new()` no longer fragments unless the caller explicitly opts in. - a public fragment adapter API exists and exposes purge control. -- interleaved fragment streams reassemble correctly while duplicate and +- interleaved fragment streams reassemble correctly, while duplicate and out-of-order series follow documented policies. - unit tests (`rstest`), integration tests, and behavioural tests (`rstest-bdd` v0.5.0) cover the new rules. @@ -42,8 +42,9 @@ Success is observable when: - Use `rstest-bdd` v0.5.0 for behavioural test coverage required by this feature. - Update `docs/users-guide.md` for any public API change made by this work. -- Record decisions in `docs/generic-message-fragmentation-and-re-assembly- - design.md` (and companion docs when composition guidance changes). +- Record decisions in the fragmentation design document + `docs/generic-message-fragmentation-and-re-assembly-design.md` (and companion + docs when composition guidance changes). ## Tolerances (exception triggers) diff --git a/docs/users-guide.md b/docs/users-guide.md index 5d7e4235..37abada1 100644 --- a/docs/users-guide.md +++ b/docs/users-guide.md @@ -118,7 +118,7 @@ default), enforces per-frame read timeouts, and writes responses. Serialization helpers `send_response` and `send_response_framed` (or `send_response_framed_with_codec` for custom codecs) return typed `SendError` variants when encoding or I/O fails, and the connection closes after ten -consecutive deserialisation errors.[^6][^7] +consecutive deserialization errors.[^6][^7] ### Custom frame codecs @@ -131,7 +131,7 @@ A codec implementation must: - Define a `Frame` type and paired decoder/encoder implementations that return `std::io::Error` on failure. - Return only the logical payload bytes from `frame_payload` so metadata parsing - and deserialisation run against the right buffer. + and deserialization run against the right buffer. - Wrap outbound payloads with `wrap_payload(&self, Bytes)`, adding any protocol headers or metadata required by the wire format. - Provide `correlation_id` when the protocol stores it outside the payload; @@ -338,7 +338,7 @@ that meets the trait bounds.[^4] When `FrameMetadata::parse` succeeds, the framework extracts identifiers from metadata without deserializing the payload. If parsing fails, it falls back to -full deserialisation.[^9][^6] Application messages implement the `Message` +full deserialization.[^9][^6] Application messages implement the `Message` trait, gaining `to_bytes` and `from_bytes` helpers that use bincode with the standard configuration.[^10] @@ -841,7 +841,7 @@ handlers, so handlers continue to work with complete `Envelope` values.[^6] Layering order is fixed. Outbound processing runs serializer → fragmentation → codec wrapping. Inbound processing runs codec decode → fragment reassembly → -deserialisation. +deserialization. Fragmented messages enforce two guards: `max_message_size` caps the total reassembled payload, and `reassembly_timeout` evicts stale partial messages.