diff --git a/Cargo.lock b/Cargo.lock index 11a67702..fbfdc08e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -268,7 +268,7 @@ dependencies = [ "maybe-owned", "rustix 1.0.8", "rustix-linux-procfs", - "windows-sys 0.59.0", + "windows-sys 0.52.0", "winx", ] @@ -512,7 +512,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad" dependencies = [ "libc", - "windows-sys 0.60.2", + "windows-sys 0.52.0", ] [[package]] @@ -607,7 +607,7 @@ checksum = "94e7099f6313ecacbe1256e8ff9d617b75d1bcb16a6fddef94866d225a01a14a" dependencies = [ "io-lifetimes", "rustix 1.0.8", - "windows-sys 0.59.0", + "windows-sys 0.52.0", ] [[package]] @@ -1044,7 +1044,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2285ddfe3054097ef4b2fe909ef8c3bcd1ea52a8f0d274416caebeef39f04a65" dependencies = [ "io-lifetimes", - "windows-sys 0.59.0", + "windows-sys 0.52.0", ] [[package]] @@ -1761,9 +1761,9 @@ dependencies = [ [[package]] name = "rstest-bdd" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e741d97bce6ea0a7d0f074716041e0d0eebe6ab9edcd243e7d12f9fd2b5e8b6" +checksum = "138d8e4f97e16906ebeb0bfb12d2c94f0b05913a96fc522111bec62ca8328522" dependencies = [ "ctor", "derive_more 0.99.20", @@ -1785,9 +1785,9 @@ dependencies = [ [[package]] name = "rstest-bdd-macros" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7e3b51c032a6174f82d87843a5e62cfd08ce4606005eafde07ed12561d73196" +checksum = "fe104196f61dc8911a8da1b10005e9401e7c2e14ad9302e2de6688311c0beec7" dependencies = [ "camino", "cap-std", @@ -1809,9 +1809,9 @@ dependencies = [ [[package]] name = "rstest-bdd-patterns" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75d730afd8727e5b18bd276ebf5ea4c881760138762d0cd7d415dc6b6662c6c6" +checksum = "39abaa69316cdbbad0512dc0f37b704b09e4cdb5018d80f197cbdb77a2269d06" dependencies = [ "gherkin", "regex", @@ -1820,9 +1820,9 @@ dependencies = [ [[package]] name = "rstest-bdd-policy" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d256efeb01f08e281cef9cd1e54dab33d8778e871090f5fa49b7a9a70f0caf81" +checksum = "88f2ca584f7d9d359a09f616900be015302c959d58c2c2da6c075573352dfa0d" [[package]] name = "rstest_macros" @@ -1930,7 +1930,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.4.15", - "windows-sys 0.59.0", + "windows-sys 0.52.0", ] [[package]] @@ -1943,7 +1943,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.9.4", - "windows-sys 0.60.2", + "windows-sys 0.52.0", ] [[package]] @@ -2305,7 +2305,7 @@ dependencies = [ "getrandom 0.3.3", "once_cell", "rustix 1.0.8", - "windows-sys 0.59.0", + "windows-sys 0.52.0", ] [[package]] @@ -2866,7 +2866,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.52.0", ] [[package]] @@ -2995,15 +2995,6 @@ dependencies = [ "windows-targets 0.52.6", ] -[[package]] -name = "windows-sys" -version = "0.60.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb" -dependencies = [ - "windows-targets 0.53.2", -] - [[package]] name = "windows-targets" version = "0.52.6" @@ -3157,7 +3148,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f3fd376f71958b862e7afb20cfe5a22830e1963462f3a17f49d82a6c1d1f42d" dependencies = [ "bitflags", - "windows-sys 0.59.0", + "windows-sys 0.52.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 6b778522..9044cca4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,8 +49,8 @@ socket2 = "0.6.0" [dev-dependencies] rstest = "0.26.1" -rstest-bdd = "0.4.0" -rstest-bdd-macros = { version = "0.4.0", features = ["strict-compile-time-validation"] } +rstest-bdd = "0.5.0" +rstest-bdd-macros = { version = "0.5.0", features = ["strict-compile-time-validation"] } wireframe = { path = ".", features = ["test-helpers"] } wireframe_testing = { path = "./wireframe_testing" } logtest = "2.0.0" diff --git a/docs/adr-002-streaming-requests-and-shared-message-assembly.md b/docs/adr-002-streaming-requests-and-shared-message-assembly.md index 18f1f821..7184514d 100644 --- a/docs/adr-002-streaming-requests-and-shared-message-assembly.md +++ b/docs/adr-002-streaming-requests-and-shared-message-assembly.md @@ -135,6 +135,18 @@ sequenceDiagram end ``` +#### Implementation decisions (2026-02-12) + +- The inbound integration seam for this ADR is `src/app/connection.rs` and + `src/app/frame_handling/`, not `src/connection/`, because inbound decode and + dispatch already occur in the app-layer connection loop. +- Message-assembly parsing, length, and continuity violations are treated as + inbound deserialization failures and therefore use the same threshold-based + invalid-data policy as other malformed inbound input. +- Idle timeout ticks purge expired entries from both transport fragmentation + and message-assembly state, so stale partial assemblies do not linger across + long-lived connections. + At a minimum, the hook must allow a protocol to provide: - a per-frame header parser (including “first frame” versus “continuation diff --git a/docs/execplans/8-2-5-integrate-with-the-connection-actor-inbound-path.md b/docs/execplans/8-2-5-integrate-with-the-connection-actor-inbound-path.md new file mode 100644 index 00000000..f582035e --- /dev/null +++ b/docs/execplans/8-2-5-integrate-with-the-connection-actor-inbound-path.md @@ -0,0 +1,372 @@ +# Integrate MessageAssembler into the inbound connection path (8.2.5/8.2.6) + +This ExecPlan is a living document. The sections `Constraints`, `Tolerances`, +`Risks`, `Progress`, `Surprises & Discoveries`, `Decision Log`, and +`Outcomes & Retrospective` must be kept up to date as work proceeds. + +Status: COMPLETE + +No `PLANS.md` exists in this repository as of 2026-02-12. + +## Purpose / big picture + +Wireframe already exposes a `MessageAssembler` trait and a +`MessageAssemblyState`, but the inbound runtime path does not invoke them yet. +After this work, inbound packets will be processed in the required order: +transport decode, transport reassembly, protocol message assembly, then handler +routing. + +Success is observable when a configured assembler can combine interleaved +message-key streams on a live connection, reject continuity violations +predictably, purge stale partial assemblies on timeout, and dispatch only +completed payloads to handlers. Unit tests (`rstest`) and behavioural tests +(`rstest-bdd` v0.5.0) must prove these behaviours. + +## Constraints + +- Preserve the composition order from ADR 0002: + decode -> transport reassembly -> message assembly -> handler dispatch. +- Keep transport fragmentation logic in `src/app/fragmentation_state.rs` + separate from protocol message assembly logic in `src/message_assembler/` and + `src/app/frame_handling/`. +- Do not change `MessageAssembler` trait method signatures unless explicitly + approved. +- Do not add new external dependencies beyond updating existing + `rstest-bdd` crates to v0.5.0. +- Maintain current behaviour when no assembler is configured. +- Keep all new or modified source files under 400 lines. +- Use `rstest` for unit tests and `rstest-bdd` v0.5.0 for behavioural tests. +- Record design decisions in a relevant design document (`docs/adr-002-...` + and supporting design docs where needed). +- Update `docs/users-guide.md` for any library-consumer visible behaviour + change. +- Mark `docs/roadmap.md` items 8.2.5 and 8.2.6 as done only after all quality + gates pass. + +## Tolerances (exception triggers) + +- Scope: if implementation requires more than 18 files or more than 900 net + lines of code (LOC), stop and escalate. +- Interface: if public API changes beyond removing the existing + "not yet integrated" caveat are required, stop and escalate. +- Dependencies: if work requires adding any crate beyond the planned + `rstest-bdd` version bump, stop and escalate. +- Behavioural ambiguity: if metadata delivery semantics to handlers cannot be + completed without additional API changes, stop and present options. +- Iterations: if the same failing test/lint issue persists after 3 focused + fix attempts, stop and escalate. +- Time: if any single stage takes more than 4 hours elapsed effort, stop and + escalate with status and options. + +## Risks + +- Risk: The term "connection actor inbound path" is ambiguous because + `src/connection/` is outbound-focused, while inbound routing lives in + `src/app/connection.rs`. Severity: medium Likelihood: high Mitigation: + Implement in `src/app/connection.rs` and record this naming clarification in + the design docs. + +- Risk: Message assembly error handling may diverge from existing inbound + decode-failure policy. Severity: high Likelihood: medium Mitigation: Route + parse/continuity failures through the same deserialization failure tracker + semantics (`InvalidData` threshold handling). + +- Risk: Timeout behaviour can become flaky in behavioural tests if tied to wall + clock timing. Severity: medium Likelihood: medium Mitigation: Use + deterministic fixture-controlled time progression where possible; keep + network waits bounded and explicit. + +- Risk: Upgrading `rstest-bdd` from 0.4.0 to 0.5.0 may require macro or step + registration adjustments. Severity: medium Likelihood: medium Mitigation: + perform the version bump first and fix compile-time macro breakage before + changing runtime logic. + +## Progress + +- [x] (2026-02-12 00:00Z) Drafted ExecPlan for roadmap items 8.2.5 and 8.2.6. +- [x] Confirmed and documented the exact inbound integration seam in + `src/app/connection.rs` and `src/app/frame_handling/`. +- [x] Upgraded behavioural test dependencies to `rstest-bdd` v0.5.0 and + restored + green compilation. +- [x] Implemented message assembly application after transport reassembly. +- [x] Added `rstest` unit tests for interleaving, ordering violations, and + timeout purging in the inbound runtime path. +- [x] Added `rstest-bdd` v0.5.0 behavioural tests that exercise the same + runtime + behaviours through the test harness. +- [x] Updated design docs and `docs/users-guide.md` for the new integrated + behaviour. +- [x] Marked roadmap entries 8.2.5 and 8.2.6 as done. +- [x] Ran full quality gates (`fmt`, markdown lint, Rust format check, lint, + tests) and captured logs with `tee`. + +## Surprises & discoveries + +- Observation: `WireframeApp::with_message_assembler` is currently a stored + configuration only; inbound code in `src/app/connection.rs` does not yet call + the hook. Evidence: no `message_assembler` usage in inbound frame handling. + Impact: 8.2.5 requires runtime integration work, not only API exposure. + +- Observation: `src/connection/` is centred on outbound frame delivery and does + not own inbound decode/reassembly. Evidence: `src/connection/mod.rs` actor + loop handles push/response streams, while inbound decode happens in + `src/app/connection.rs`. Impact: roadmap wording must be interpreted as + inbound path in `WireframeApp` connection handling. + +- Observation: the workspace currently pins `rstest-bdd = "0.4.0"` in + `Cargo.toml`. Evidence: `Cargo.toml` dev-dependencies. Impact: this feature + must include the requested upgrade to v0.5.0. + +- Observation: adding the new assembly unit coverage to + `src/app/frame_handling/tests.rs` pushed the file past the repository's + 400-line guidance. Evidence: local line-count check during implementation. + Impact: extracted inbound-assembly tests to + `src/app/frame_handling/assembly_tests.rs` to keep files maintainable. + +## Decision log + +- Decision: Integrate message assembly in `src/app/connection.rs` using + `frame_handling` helpers, immediately after `reassemble_if_needed` and before + handler lookup. Rationale: this is the existing inbound choke point and + preserves ADR 0002 layer ordering. Date/Author: 2026-02-12 / Codex. + +- Decision: Keep route selection (`Envelope.id`) and correlation inheritance + semantics unchanged; message assembly changes only payload readiness and + buffering behaviour for this milestone. Rationale: avoids unplanned public + API expansion while satisfying 8.2.5/8.2.6 scope. Date/Author: 2026-02-12 / + Codex. + +- Decision: Treat message assembly parse/continuity failures as inbound + deserialization failures under the existing threshold policy. Rationale: + aligns with hardening guidance and keeps invalid inbound data handling + deterministic. Date/Author: 2026-02-12 / Codex. + +## Outcomes & retrospective + +Completed. + +- Inbound runtime now applies message assembly after transport reassembly in + `src/app/connection.rs` via `src/app/frame_handling/assembly.rs`. +- Failure handling is unified with the existing deserialization-failure policy + for parse, continuity, and declared-length errors. +- Unit coverage includes interleaving, ordering violations, and timeout purge + in `src/app/frame_handling/assembly_tests.rs`. +- Behavioural coverage includes the same scenarios through + `tests/features/message_assembly_inbound.feature` and associated steps and + fixtures. +- Documentation and roadmap were updated to remove stale caveats and mark + 8.2.5/8.2.6 complete. +- Quality gates passed with `tee` logs: + `/tmp/wireframe-make-fmt.log`, `/tmp/wireframe-make-markdownlint.log`, + `/tmp/wireframe-make-check-fmt.log`, `/tmp/wireframe-make-lint.log`, + `/tmp/wireframe-make-test.log`, `/tmp/wireframe-make-nixie.log`. + +## Context and orientation + +The inbound runtime pipeline in `src/app/connection.rs` is now: + +- `decode_envelope` converts codec frames into `Envelope` values. +- `frame_handling::reassemble_if_needed` applies transport-level fragment + reassembly (`src/app/frame_handling/reassembly.rs`). +- `frame_handling::assemble_if_needed` applies protocol-level assembly. +- Completed envelopes are dispatched to handlers via + `frame_handling::forward_response`. + +Design references that must stay aligned: + +- `docs/adr-002-streaming-requests-and-shared-message-assembly.md` +- `docs/generic-message-fragmentation-and-re-assembly-design.md` +- `docs/multi-packet-and-streaming-responses-design.md` +- `docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md` +- `docs/hardening-wireframe-a-guide-to-production-resilience.md` +- `docs/rust-testing-with-rstest-fixtures.md` +- `docs/reliable-testing-in-rust-via-dependency-injection.md` +- `docs/rstest-bdd-users-guide.md` +- `docs/rust-doctest-dry-guide.md` + +Testing topology to extend: + +- Unit tests under `src/app/` and `src/message_assembler/` use `rstest`. +- Behavioural tests live under `tests/features/`, `tests/fixtures/`, + `tests/steps/`, and `tests/scenarios/` and run through the `bdd` test target. + +## Plan of work + +Stage A focuses on test harness readiness and interface stability. Upgrade +`rstest-bdd` and `rstest-bdd-macros` to v0.5.0, resolve any compile-time macro +or fixture wiring updates, and ensure existing behavioural tests are green +before runtime changes. Go/no-go: do not start runtime integration until the +BDD harness compiles on v0.5.0. + +Stage B adds inbound message assembly plumbing. Introduce a dedicated helper in +`src/app/frame_handling/` (for example `assembly.rs`) that consumes complete +post-fragmentation envelopes, invokes the configured `MessageAssembler`, feeds +`MessageAssemblyState`, and returns either `None` (assembly still in progress) +or a completed envelope for dispatch. Keep this helper small and explicitly +route failures through existing deserialization failure accounting. + +Stage C integrates the helper into connection processing. Extend connection +state in `src/app/connection.rs` to hold optional message assembly runtime +state, invoke the new helper after transport reassembly, and purge expired +assemblies on idle read timeouts alongside existing fragmentation purging. +Go/no-go: if composition order or failure policy differs from ADR 0002, stop +and update the Decision Log before proceeding. + +Stage D adds tests. Unit tests (`rstest`) must cover: interleaved assembly +across keys in inbound flow, ordering-violation rejection in inbound flow, and +idle timeout purge behaviour. Behavioural tests (`rstest-bdd` v0.5.0) must +cover the same user-visible behaviours through feature scenarios and step +fixtures. + +Stage E updates documentation and roadmap status. Remove outdated caveats in +`docs/users-guide.md`, add implementation decisions to ADR/design docs, and +mark roadmap entries 8.2.5 and 8.2.6 as done only after all gates pass. + +## Concrete steps + +1. Baseline and dependency upgrade. + + - Edit `Cargo.toml` dev-dependencies: + - `rstest-bdd = "0.5.0"` + - `rstest-bdd-macros = { version = "0.5.0", features =` + `["strict-compile-time-validation"] }` + - Refresh lockfile: + + cargo update -p rstest-bdd --precise 0.5.0 + cargo update -p rstest-bdd-macros --precise 0.5.0 + +2. Add inbound assembly helper module. + + - Create or extend `src/app/frame_handling/assembly.rs` with: + - a function that accepts `Envelope`, optional assembler reference, + mutable `MessageAssemblyState`, and failure tracker context; + - strict payload slicing checks (`header_len`, `metadata_len`, `body_len`); + - first-frame and continuation handling through `FirstFrameInput` and + `accept_continuation_frame`; + - conversion of completed assembly output into an `Envelope` payload. + - Export helper from `src/app/frame_handling/mod.rs`. + +3. Wire runtime state in connection handling. + + - Update `src/app/connection.rs` to: + - hold optional inbound message assembly state per connection; + - call assembly helper after `reassemble_if_needed` and before route + lookup; + - purge expired message assemblies during read-timeout ticks; + - keep no-assembler path as pass-through. + +4. Add or extend unit tests with `rstest`. + + - Add targeted tests in `src/app/frame_handling/tests.rs` and/or + `src/app/connection/tests.rs`: + - interleaved keyed assembly dispatches only completed messages; + - out-of-order continuation is rejected and does not dispatch; + - timeout purge evicts stale partial state and later continuation fails as + missing-first-frame. + +5. Add behavioural tests with `rstest-bdd` v0.5.0. + + - Add feature file, fixture, steps, and scenario registrations under: + - `tests/features/` + - `tests/fixtures/` + - `tests/steps/` + - `tests/scenarios/` + - Ensure scenarios cover interleaving, ordering violations, and timeout + behaviour through an app-level inbound flow. + +6. Update docs and roadmap. + + - Update `docs/adr-002-streaming-requests-and-shared-message-assembly.md` + with the concrete inbound integration and failure-handling decisions. + - Update supporting design docs listed above to remove "planned" wording for + 8.2.5 and document the implemented composition. + - Update `docs/users-guide.md` to describe active inbound integration and + expected behaviour for configured assemblers. + - Update `docs/roadmap.md` to mark 8.2.5 and 8.2.6 as done. + +## Validation and acceptance + +Acceptance conditions: + +- Inbound runtime applies message assembly after transport fragmentation. +- Interleaved keyed assemblies work in inbound dispatch path. +- Ordering violations are rejected deterministically and follow existing + invalid-data handling policy. +- Timeout purging removes stale partial assemblies. +- Unit coverage uses `rstest`; behavioural coverage uses `rstest-bdd` v0.5.0. +- Design docs, user guide, and roadmap are updated and consistent. + +Run from repository root, capturing complete logs: + + set -o pipefail + make fmt 2>&1 | tee /tmp/wireframe-fmt.log + + set -o pipefail + make markdownlint 2>&1 | tee /tmp/wireframe-markdownlint.log + + set -o pipefail + make check-fmt 2>&1 | tee /tmp/wireframe-check-fmt.log + + set -o pipefail + make lint 2>&1 | tee /tmp/wireframe-lint.log + + set -o pipefail + make test 2>&1 | tee /tmp/wireframe-test.log + +Targeted checks before full suite: + + set -o pipefail + cargo test --test bdd --all-features message_assembly 2>&1 | tee /tmp/wireframe-bdd-message-assembly.log + + set -o pipefail + cargo test --all-features connection_fragmentation 2>&1 | tee /tmp/wireframe-connection-fragmentation.log + +If Mermaid diagrams are touched, also run: + + set -o pipefail + make nixie 2>&1 | tee /tmp/wireframe-nixie.log + +## Idempotence and recovery + +All plan steps are additive and can be re-run safely. If a stage fails: + +- fix the specific issue; +- re-run only the affected command group first; +- then re-run full gates. + +Avoid destructive Git operations. If rollback is needed, revert only the +specific files introduced by this feature branch. + +## Artifacts and notes + +Expected deliverables: + +- inbound message assembly integration in `src/app/connection.rs` and + `src/app/frame_handling/`; +- `rstest` unit tests for inbound interleaving, ordering violation, timeout; +- `rstest-bdd` v0.5.0 behavioural scenarios for the same behaviours; +- updated ADR/design docs and user guide; +- roadmap entries 8.2.5 and 8.2.6 checked as complete. + +## Interfaces and dependencies + +Public API expectations after completion: + +- `WireframeApp::with_message_assembler(...)` remains the configuration entry + point and is now operational in inbound runtime processing. +- `WireframeApp::message_assembler()` remains available for inspection. +- Existing message assembler public types remain stable: + `MessageAssembler`, `ParsedFrameHeader`, `FrameHeader`, `FirstFrameHeader`, + `ContinuationFrameHeader`, `MessageAssemblyState`, and related error types. + +Dependency expectation: + +- Behavioural test stack uses `rstest-bdd` v0.5.0 and matching + `rstest-bdd-macros` v0.5.0. + +## Revision note + +Initial draft created for roadmap items 8.2.5 and 8.2.6. This version defines +scope, constraints, staged execution, validation commands, documentation +updates, and the final roadmap completion step. diff --git a/docs/generic-message-fragmentation-and-re-assembly-design.md b/docs/generic-message-fragmentation-and-re-assembly-design.md index 769ff22b..ab4f9d6d 100644 --- a/docs/generic-message-fragmentation-and-re-assembly-design.md +++ b/docs/generic-message-fragmentation-and-re-assembly-design.md @@ -463,8 +463,9 @@ packet payloads and focuses on protocol continuity rules. Wireframe exposes the hook surface as `wireframe::message_assembler::MessageAssembler`, and applications register an implementation via `WireframeApp::with_message_assembler`. The hook is stored -on the builder today and integrated into the inbound pipeline in roadmap item -8.2.5. +on the app instance and is applied in the inbound runtime pipeline in +`WireframeApp` connection handling, after transport reassembly and before +handler dispatch. ### 9.3 Memory budget integration diff --git a/docs/roadmap.md b/docs/roadmap.md index a0d641ec..ea30ce16 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -284,9 +284,9 @@ and standardized per-connection memory budgets. - [x] 8.2.3. Add message key support for multiplexing interleaved assemblies. - [x] 8.2.4. Implement continuity validation (ordering, missing frames, and duplicate frames). -- [ ] 8.2.5. Integrate with the connection actor's inbound path, applying +- [x] 8.2.5. Integrate with the connection actor's inbound path, applying after transport fragmentation. -- [ ] 8.2.6. Write tests for interleaved assembly, ordering violations, and +- [x] 8.2.6. Write tests for interleaved assembly, ordering violations, and timeout behaviour. ### 8.3. Per-connection memory budgets diff --git a/docs/users-guide.md b/docs/users-guide.md index 8874f372..2c070250 100644 --- a/docs/users-guide.md +++ b/docs/users-guide.md @@ -485,12 +485,15 @@ let _app = WireframeApp::new() .with_message_assembler(DemoAssembler); ``` -Note: the hook is stored on the application today but is wired into the inbound -connection path in roadmap item 8.2.5. Until that integration lands, protocol -crates can use the trait for shared parsing logic and tests. +When configured, this hook now runs on the inbound connection path after +transport fragmentation reassembly and before handler dispatch. Incomplete +assemblies remain buffered per message key until completion or timeout eviction. + +Message-assembly parsing and continuity failures are treated as inbound +deserialization failures and follow the existing failure threshold policy. `WireframeApp::message_assembler` returns the configured hook as an -`Option<&Arc>` if you need to access it directly. +`Option<&Arc>` if direct access is required. #### Message key multiplexing (8.2.3) @@ -502,6 +505,8 @@ different logical messages arrive on the same connection: use std::{num::NonZeroUsize, time::Duration}; use wireframe::message_assembler::{ ContinuationFrameHeader, + EnvelopeId, + EnvelopeRouting, FirstFrameHeader, FirstFrameInput, FrameSequence, @@ -522,7 +527,8 @@ let first1 = FirstFrameHeader { total_body_len: Some(15), is_last: false, }; -let input1 = FirstFrameInput::new(&first1, vec![], b"hello") +let routing1 = EnvelopeRouting { envelope_id: EnvelopeId(1), correlation_id: None }; +let input1 = FirstFrameInput::new(&first1, routing1, vec![], b"hello") .expect("header lengths match"); state.accept_first_frame(input1)?; @@ -534,7 +540,8 @@ let first2 = FirstFrameHeader { total_body_len: None, is_last: false, }; -let input2 = FirstFrameInput::new(&first2, vec![], b"world") +let routing2 = EnvelopeRouting { envelope_id: EnvelopeId(2), correlation_id: None }; +let input2 = FirstFrameInput::new(&first2, routing2, vec![], b"world") .expect("header lengths match"); state.accept_first_frame(input2)?; diff --git a/src/app/connection.rs b/src/app/connection.rs index b8f2cb34..c9416ed2 100644 --- a/src/app/connection.rs +++ b/src/app/connection.rs @@ -24,14 +24,19 @@ use crate::{ fragment::FragmentationConfig, frame::FrameMetadata, message::Message, + message_assembler::MessageAssemblyState, middleware::HandlerService, serializer::Serializer, }; -fn purge_expired(fragmentation: &mut Option) { +fn purge_expired( + fragmentation: &mut Option, + message_assembly: &mut Option, +) { if let Some(frag) = fragmentation.as_mut() { frag.purge_expired(); } + frame_handling::purge_expired_assemblies(message_assembly); } /// Maximum consecutive deserialization failures before closing a connection. @@ -48,6 +53,7 @@ where deser_failures: &'a mut u32, routes: &'a HashMap>, fragmentation: &'a mut Option, + message_assembly: &'a mut Option, } impl WireframeApp @@ -254,6 +260,9 @@ where framed.read_buffer_mut().reserve(max_frame_length); let mut deser_failures = 0u32; let mut fragmentation = self.fragmentation_config().map(FragmentationState::new); + let mut message_assembly = self.message_assembler.as_ref().map(|_| { + frame_handling::new_message_assembly_state(self.fragmentation, requested_frame_length) + }); let timeout_dur = Duration::from_millis(self.read_timeout_ms); loop { @@ -266,6 +275,7 @@ where deser_failures: &mut deser_failures, routes, fragmentation: &mut fragmentation, + message_assembly: &mut message_assembly, }, &codec, ) @@ -275,7 +285,7 @@ where Ok(None) => break, Err(_) => { debug!("read timeout elapsed; continuing to wait for next frame"); - purge_expired(&mut fragmentation); + purge_expired(&mut fragmentation, &mut message_assembly); } } } @@ -297,10 +307,17 @@ where deser_failures, routes, fragmentation, + message_assembly, } = ctx; crate::metrics::inc_frames(crate::metrics::Direction::Inbound); - let Some(env) = self.decode_envelope(frame, deser_failures)? else { + let Some(env) = frame_handling::decode_envelope::( + self.parse_envelope(F::frame_payload(frame)), + frame, + deser_failures, + MAX_DESER_FAILURES, + )? + else { return Ok(()); }; let Some(env) = frame_handling::reassemble_if_needed( @@ -312,6 +329,20 @@ where else { return Ok(()); }; + let Some(env) = frame_handling::assemble_if_needed( + frame_handling::AssemblyRuntime::new(self.message_assembler.as_ref(), message_assembly), + deser_failures, + env, + MAX_DESER_FAILURES, + )? + else { + return Ok(()); + }; + + // Reset failure counter only after the entire inbound pipeline + // (decode, reassemble, assemble) succeeds, so that assembly-stage + // failures accumulate towards the threshold. + *deser_failures = 0; if let Some(service) = routes.get(&env.id) { frame_handling::forward_response( @@ -334,51 +365,6 @@ where Ok(()) } - - /// Increment deserialization failures and close the connection if the threshold is exceeded. - fn handle_decode_failure( - deser_failures: &mut u32, - correlation_id: Option, - context: &str, - err: impl std::fmt::Debug, - ) -> Result, io::Error> { - *deser_failures += 1; - warn!("{context}: correlation_id={correlation_id:?}, error={err:?}"); - crate::metrics::inc_deser_errors(); - if *deser_failures >= MAX_DESER_FAILURES { - warn!("closing connection after {deser_failures} deserialization failures: {context}"); - return Err(io::Error::new( - io::ErrorKind::InvalidData, - "too many deserialization failures", - )); - } - Ok(None) - } - - fn decode_envelope( - &self, - frame: &F::Frame, - deser_failures: &mut u32, - ) -> Result, io::Error> { - match self.parse_envelope(F::frame_payload(frame)) { - Ok((mut env, _)) => { - if env.correlation_id.is_none() { - env.correlation_id = F::correlation_id(frame); - } - *deser_failures = 0; - Ok(Some(env)) - } - Err(err) => { - let correlation_id = F::correlation_id(frame); - Self::handle_decode_failure( - deser_failures, - correlation_id, - "failed to decode message", - err, - ) - } - } - } } #[cfg(test)] diff --git a/src/app/connection/tests.rs b/src/app/connection/tests.rs index e7f8763d..d1f6cb63 100644 --- a/src/app/connection/tests.rs +++ b/src/app/connection/tests.rs @@ -75,14 +75,23 @@ fn decode_envelope_tracks_failures_and_logs_correlation_id() { let mut deser_failures = 0_u32; for _ in 1..MAX_DESER_FAILURES { - let result = app.decode_envelope(&frame, &mut deser_failures); + let result = frame_handling::decode_envelope::( + app.parse_envelope(BadCodec::frame_payload(&frame)), + &frame, + &mut deser_failures, + MAX_DESER_FAILURES, + ); assert!(result.is_ok(), "expected recoverable decode failure"); assert!(result.expect("decode result").is_none()); } - let err = app - .decode_envelope(&frame, &mut deser_failures) - .expect_err("expected decode failure to close connection"); + let err = frame_handling::decode_envelope::( + app.parse_envelope(BadCodec::frame_payload(&frame)), + &frame, + &mut deser_failures, + MAX_DESER_FAILURES, + ) + .expect_err("expected decode failure to close connection"); assert_eq!(err.kind(), io::ErrorKind::InvalidData); let mut found = false; diff --git a/src/app/frame_handling/assembly.rs b/src/app/frame_handling/assembly.rs new file mode 100644 index 00000000..18222d3a --- /dev/null +++ b/src/app/frame_handling/assembly.rs @@ -0,0 +1,272 @@ +//! Helpers for protocol-level message assembly on the inbound path. + +use std::{io, num::NonZeroUsize, sync::Arc, time::Duration}; + +use log::debug; + +use super::core::DeserFailureTracker; +use crate::{ + app::{Envelope, builder_defaults::default_fragmentation}, + codec::clamp_frame_length, + fragment::FragmentationConfig, + message_assembler::{ + AssembledMessage, + ContinuationFrameHeader, + EnvelopeRouting, + FirstFrameHeader, + FirstFrameInput, + FrameHeader, + MessageAssembler, + MessageAssemblyState, + }, +}; + +/// Default timeout used when no fragmentation-derived timeout is available. +const DEFAULT_MESSAGE_ASSEMBLY_TIMEOUT: Duration = Duration::from_secs(30); + +/// Borrowed inbound runtime state used for message assembly. +pub(crate) struct AssemblyRuntime<'a> { + pub(crate) assembler: Option<&'a Arc>, + pub(crate) state: &'a mut Option, +} + +impl<'a> AssemblyRuntime<'a> { + /// Create assembly runtime accessors for one inbound frame. + #[must_use] + pub(crate) fn new( + assembler: Option<&'a Arc>, + state: &'a mut Option, + ) -> Self { + Self { assembler, state } + } +} + +/// Build a connection-scoped message assembly state from known budgets. +#[must_use] +pub(crate) fn new_message_assembly_state( + fragmentation: Option, + frame_budget: usize, +) -> MessageAssemblyState { + let config = fragmentation.or_else(|| default_fragmentation(frame_budget)); + let max_message_size = config.map_or_else( + || NonZeroUsize::new(clamp_frame_length(frame_budget)).unwrap_or(NonZeroUsize::MIN), + |cfg| cfg.max_message_size, + ); + let timeout = config.map_or(DEFAULT_MESSAGE_ASSEMBLY_TIMEOUT, |cfg| { + cfg.reassembly_timeout + }); + + MessageAssemblyState::new(max_message_size, timeout) +} + +/// Purge stale in-flight assemblies. +pub(crate) fn purge_expired_assemblies(assembly: &mut Option) { + let Some(state) = assembly.as_mut() else { + return; + }; + + let evicted = state.purge_expired(); + if !evicted.is_empty() { + debug!( + "purged expired message assemblies: count={}, keys={evicted:?}", + evicted.len() + ); + } +} + +/// Apply protocol-level message assembly to a complete post-fragment envelope. +pub(crate) fn assemble_if_needed( + runtime: AssemblyRuntime<'_>, + deser_failures: &mut u32, + env: Envelope, + max_deser_failures: u32, +) -> io::Result> { + let AssemblyRuntime { + assembler, + state: assembly, + } = runtime; + let Some(assembler) = assembler else { + return Ok(Some(env)); + }; + let Some(state) = assembly.as_mut() else { + return Ok(Some(env)); + }; + + let mut failures = DeserFailureTracker::new(deser_failures, max_deser_failures); + let correlation_id = env.correlation_id; + + let parsed = match assembler.parse_frame_header(env.payload_bytes()) { + Ok(parsed) => parsed, + Err(err) => { + failures.record( + correlation_id, + "failed to parse message assembly frame header", + err, + )?; + return Ok(None); + } + }; + + let payload = env.payload_bytes(); + let Some(frame_bytes) = payload.get(parsed.header_len()..) else { + failures.record( + correlation_id, + "message assembly header length exceeds payload length", + io::Error::new( + io::ErrorKind::InvalidData, + "message assembly header length exceeds payload", + ), + )?; + return Ok(None); + }; + + let mut context = AssemblyContext { + state, + failures: &mut failures, + correlation_id, + }; + + let routing = EnvelopeRouting { + envelope_id: env.id.into(), + correlation_id: env.correlation_id.map(Into::into), + }; + + match parsed.into_header() { + FrameHeader::First(header) => { + let Some(result) = process_first_frame(&mut context, &header, frame_bytes, routing)? + else { + return Ok(None); + }; + Ok(Some(Envelope::from_assembled(&result))) + } + FrameHeader::Continuation(header) => { + let result = process_continuation_frame(&mut context, &header, frame_bytes)?; + Ok(result.map(|assembled| Envelope::from_assembled(&assembled))) + } + } +} + +struct AssemblyContext<'a, 'b> { + state: &'a mut MessageAssemblyState, + failures: &'a mut DeserFailureTracker<'b>, + correlation_id: Option, +} + +impl AssemblyContext<'_, '_> { + /// Record a failure and return `Ok(None)` to continue processing. + fn fail_invalid_none( + &mut self, + context: &str, + err: impl std::fmt::Debug, + ) -> io::Result> { + self.failures.record(self.correlation_id, context, err)?; + Ok(None) + } +} + +fn process_first_frame( + context: &mut AssemblyContext<'_, '_>, + header: &FirstFrameHeader, + frame_bytes: &[u8], + routing: EnvelopeRouting, +) -> io::Result> { + let Some(expected_len) = header.metadata_len.checked_add(header.body_len) else { + return context.fail_invalid_none( + "message assembly first frame length overflow", + io::Error::new( + io::ErrorKind::InvalidData, + "message assembly first-frame declared length overflow", + ), + ); + }; + + if frame_bytes.len() != expected_len { + return context.fail_invalid_none( + "message assembly first frame length mismatch", + io::Error::new( + io::ErrorKind::InvalidData, + format!( + "message assembly first-frame length mismatch: expected {expected_len}, got {}", + frame_bytes.len() + ), + ), + ); + } + + let Some((metadata, body)) = frame_bytes.split_at_checked(header.metadata_len) else { + return context.fail_invalid_none( + "message assembly first frame metadata split failed", + io::Error::new( + io::ErrorKind::InvalidData, + "message assembly first-frame metadata split failed", + ), + ); + }; + + let input = match FirstFrameInput::new(header, routing, metadata.to_vec(), body) { + Ok(input) => input, + Err(err) => { + return context.fail_invalid_none( + "message assembly first frame input validation failed", + io::Error::new(io::ErrorKind::InvalidData, err), + ); + } + }; + + match context.state.accept_first_frame(input) { + Ok(result) => Ok(result), + Err(err) => context.fail_invalid_none( + "message assembly first frame rejected", + io::Error::new(io::ErrorKind::InvalidData, err), + ), + } +} + +fn process_continuation_frame( + context: &mut AssemblyContext<'_, '_>, + header: &ContinuationFrameHeader, + frame_bytes: &[u8], +) -> io::Result> { + if frame_bytes.len() != header.body_len { + return context.fail_invalid_none( + "message assembly continuation frame length mismatch", + io::Error::new( + io::ErrorKind::InvalidData, + format!( + "message assembly continuation length mismatch: expected {}, got {}", + header.body_len, + frame_bytes.len() + ), + ), + ); + } + + match context.state.accept_continuation_frame(header, frame_bytes) { + Ok(result) => Ok(result), + Err(err) => context.fail_invalid_none( + "message assembly continuation frame rejected", + io::Error::new(io::ErrorKind::InvalidData, err), + ), + } +} + +impl Envelope { + /// Construct an envelope from a completed message assembly result. + /// + /// Uses the [`EnvelopeRouting`] stored in the assembled message, which + /// originates from the first frame. + fn from_assembled(assembled: &AssembledMessage) -> Self { + let routing = assembled.routing(); + let metadata = assembled.metadata(); + let body = assembled.body(); + let mut payload = Vec::with_capacity(metadata.len().saturating_add(body.len())); + payload.extend_from_slice(metadata); + payload.extend_from_slice(body); + + Self::new( + routing.envelope_id.into(), + routing.correlation_id.map(Into::into), + payload, + ) + } +} diff --git a/src/app/frame_handling/assembly_tests.rs b/src/app/frame_handling/assembly_tests.rs new file mode 100644 index 00000000..1fd135d8 --- /dev/null +++ b/src/app/frame_handling/assembly_tests.rs @@ -0,0 +1,382 @@ +//! Unit tests for inbound message assembly integration. + +use std::{ + io, + num::NonZeroUsize, + sync::Arc, + time::{Duration, Instant}, +}; + +use rstest::{fixture, rstest}; + +use super::{AssemblyRuntime, assemble_if_needed}; +use crate::{ + app::Envelope, + message_assembler::MessageAssemblyState, + test_helpers::{self, TestAssembler}, +}; + +#[fixture] +fn message_assembler() -> Arc { + Arc::new(TestAssembler) +} + +#[fixture] +fn message_assembly_state() -> Result { + let size = NonZeroUsize::new(1024).ok_or("failed to create NonZeroUsize")?; + Ok(MessageAssemblyState::new(size, Duration::from_millis(5))) +} + +fn inbound_envelope(id: u32, payload: Vec) -> Envelope { Envelope::new(id, Some(7), payload) } + +/// Test helper to process a frame through the assembly pipeline. +fn process_assembly_frame( + assembler: &Arc, + state: &mut Option, + deser_failures: &mut u32, + envelope: Envelope, +) -> io::Result> { + assemble_if_needed( + AssemblyRuntime::new(Some(assembler), state), + deser_failures, + envelope, + 10, + ) +} + +#[rstest] +#[expect( + clippy::panic_in_result_fn, + reason = "test assertions are the correct failure mechanism in unit tests" +)] +fn inbound_assembly_handles_interleaved_sequences( + message_assembler: Arc, + message_assembly_state: Result, +) -> Result<(), &'static str> { + let mut deser_failures = 0_u32; + let mut message_assembly_state = Some(message_assembly_state?); + + let key1_first = inbound_envelope( + 9, + test_helpers::first_frame_payload(1, b"A1", false, Some(4)), + ); + let key2_first = inbound_envelope( + 9, + test_helpers::first_frame_payload(2, b"B1", false, Some(4)), + ); + let key1_last = inbound_envelope( + 9, + test_helpers::continuation_frame_payload(1, 1, b"A2", true), + ); + let key2_last = inbound_envelope( + 9, + test_helpers::continuation_frame_payload(2, 1, b"B2", true), + ); + + assert!( + process_assembly_frame( + &message_assembler, + &mut message_assembly_state, + &mut deser_failures, + key1_first, + ) + .expect("first key1 should process") + .is_none() + ); + assert!( + process_assembly_frame( + &message_assembler, + &mut message_assembly_state, + &mut deser_failures, + key2_first, + ) + .expect("first key2 should process") + .is_none() + ); + + let completed_a = process_assembly_frame( + &message_assembler, + &mut message_assembly_state, + &mut deser_failures, + key1_last, + ) + .expect("key1 completion should process") + .expect("key1 should complete"); + assert_eq!(completed_a.payload_bytes(), b"A1A2"); + + let completed_b = process_assembly_frame( + &message_assembler, + &mut message_assembly_state, + &mut deser_failures, + key2_last, + ) + .expect("key2 completion should process") + .expect("key2 should complete"); + assert_eq!(completed_b.payload_bytes(), b"B1B2"); + + assert_eq!(deser_failures, 0, "no failures expected"); + Ok(()) +} + +#[rstest] +#[expect( + clippy::panic_in_result_fn, + reason = "test assertions are the correct failure mechanism in unit tests" +)] +fn inbound_assembly_rejects_ordering_violations( + message_assembler: Arc, + message_assembly_state: Result, +) -> Result<(), &'static str> { + let mut deser_failures = 0_u32; + let mut message_assembly_state = Some(message_assembly_state?); + + let first = inbound_envelope( + 3, + test_helpers::first_frame_payload(99, b"ab", false, Some(6)), + ); + let cont_seq1 = inbound_envelope( + 3, + test_helpers::continuation_frame_payload(99, 1, b"cd", false), + ); + let cont_seq3 = inbound_envelope( + 3, + test_helpers::continuation_frame_payload(99, 3, b"ef", false), + ); + let cont_seq2 = inbound_envelope( + 3, + test_helpers::continuation_frame_payload(99, 2, b"gh", true), + ); + + assert!( + process_assembly_frame( + &message_assembler, + &mut message_assembly_state, + &mut deser_failures, + first, + ) + .expect("first frame should process") + .is_none() + ); + assert!( + process_assembly_frame( + &message_assembler, + &mut message_assembly_state, + &mut deser_failures, + cont_seq1, + ) + .expect("first continuation should process") + .is_none() + ); + assert!( + process_assembly_frame( + &message_assembler, + &mut message_assembly_state, + &mut deser_failures, + cont_seq3, + ) + .expect("out-of-order continuation should be recoverable") + .is_none() + ); + assert_eq!( + deser_failures, 1, + "ordering violation should count as one failure" + ); + + let completed = process_assembly_frame( + &message_assembler, + &mut message_assembly_state, + &mut deser_failures, + cont_seq2, + ) + .expect("recovery continuation should process") + .expect("message should complete after recovery"); + assert_eq!(completed.payload_bytes(), b"abcdgh"); + Ok(()) +} + +#[rstest] +#[expect( + clippy::panic_in_result_fn, + reason = "test assertions are the correct failure mechanism in unit tests" +)] +fn inbound_assembly_timeout_purges_partial_state( + message_assembler: Arc, + message_assembly_state: Result, +) -> Result<(), &'static str> { + let mut deser_failures = 0_u32; + let mut message_assembly_state = Some(message_assembly_state?); + + let first = inbound_envelope( + 5, + test_helpers::first_frame_payload(7, b"ab", false, Some(4)), + ); + assert!( + process_assembly_frame( + &message_assembler, + &mut message_assembly_state, + &mut deser_failures, + first, + ) + .expect("first frame should process") + .is_none() + ); + + // Advance a synthetic clock well past the 5ms assembly timeout so the + // purge is deterministic and independent of real wall-clock scheduling. + let well_past_timeout = Instant::now() + Duration::from_secs(1); + message_assembly_state + .as_mut() + .expect("assembly state should exist") + .purge_expired_at(well_past_timeout); + + let continuation = inbound_envelope( + 5, + test_helpers::continuation_frame_payload(7, 1, b"cd", true), + ); + assert!( + process_assembly_frame( + &message_assembler, + &mut message_assembly_state, + &mut deser_failures, + continuation, + ) + .expect("continuation after purge should be recoverable") + .is_none() + ); + assert_eq!( + deser_failures, 1, + "continuation after timeout purge should count as missing first frame", + ); + Ok(()) +} + +#[rstest] +#[expect( + clippy::panic_in_result_fn, + reason = "test assertions are the correct failure mechanism in unit tests" +)] +fn assemble_if_needed_passes_through_when_assembler_is_none( + message_assembly_state: Result, +) -> Result<(), &'static str> { + let mut deser_failures = 0_u32; + let mut state = Some(message_assembly_state?); + + let envelope = inbound_envelope(9, test_helpers::first_frame_payload(1, b"A", true, Some(1))); + let result = assemble_if_needed( + AssemblyRuntime::new(None, &mut state), + &mut deser_failures, + envelope.clone(), + 10, + ) + .expect("assemble_if_needed should succeed"); + + assert_eq!(result.as_ref(), Some(&envelope)); + assert_eq!(deser_failures, 0, "no failures when assembler is absent"); + Ok(()) +} + +#[rstest] +fn assemble_if_needed_passes_through_when_state_is_none( + message_assembler: Arc, +) { + let mut deser_failures = 0_u32; + let mut state: Option = None; + + let envelope = inbound_envelope(9, test_helpers::first_frame_payload(1, b"A", true, Some(1))); + let result = assemble_if_needed( + AssemblyRuntime::new(Some(&message_assembler), &mut state), + &mut deser_failures, + envelope.clone(), + 10, + ) + .expect("assemble_if_needed should succeed"); + + assert_eq!(result.as_ref(), Some(&envelope)); + assert_eq!(deser_failures, 0, "no failures when state is absent"); +} + +#[rstest] +#[expect( + clippy::panic_in_result_fn, + reason = "test assertions are the correct failure mechanism in unit tests" +)] +fn interleaved_assemblies_preserve_first_frame_routing_metadata( + message_assembler: Arc, + message_assembly_state: Result, +) -> Result<(), &'static str> { + let mut deser_failures = 0_u32; + let mut message_assembly_state = Some(message_assembly_state?); + + // Key 1: envelope_id=10, correlation_id=100 + let key1_first = Envelope::new( + 10, + Some(100), + test_helpers::first_frame_payload(1, b"A1", false, Some(4)), + ); + // Key 2: envelope_id=20, correlation_id=200 + let key2_first = Envelope::new( + 20, + Some(200), + test_helpers::first_frame_payload(2, b"B1", false, Some(4)), + ); + + assert!( + process_assembly_frame( + &message_assembler, + &mut message_assembly_state, + &mut deser_failures, + key1_first, + ) + .expect("first key1 should process") + .is_none() + ); + assert!( + process_assembly_frame( + &message_assembler, + &mut message_assembly_state, + &mut deser_failures, + key2_first, + ) + .expect("first key2 should process") + .is_none() + ); + + // Complete key 2 first (with a different envelope id on the continuation) + let key2_last = Envelope::new( + 88, + Some(888), + test_helpers::continuation_frame_payload(2, 1, b"B2", true), + ); + let completed_b = process_assembly_frame( + &message_assembler, + &mut message_assembly_state, + &mut deser_failures, + key2_last, + ) + .expect("key2 completion should process") + .expect("key2 should complete"); + + assert_eq!(completed_b.id, 20, "key2 envelope id from first frame"); + assert_eq!(completed_b.correlation_id, Some(200)); + assert_eq!(completed_b.payload_bytes(), b"B1B2"); + + // Complete key 1 + let key1_last = Envelope::new( + 77, + Some(777), + test_helpers::continuation_frame_payload(1, 1, b"A2", true), + ); + let completed_a = process_assembly_frame( + &message_assembler, + &mut message_assembly_state, + &mut deser_failures, + key1_last, + ) + .expect("key1 completion should process") + .expect("key1 should complete"); + + assert_eq!(completed_a.id, 10, "key1 envelope id from first frame"); + assert_eq!(completed_a.correlation_id, Some(100)); + assert_eq!(completed_a.payload_bytes(), b"A1A2"); + Ok(()) +} diff --git a/src/app/frame_handling/decode.rs b/src/app/frame_handling/decode.rs new file mode 100644 index 00000000..dfa5b541 --- /dev/null +++ b/src/app/frame_handling/decode.rs @@ -0,0 +1,42 @@ +//! Helpers for decoding envelopes from inbound frames. + +use std::io; + +use crate::{app::Envelope, codec::FrameCodec}; + +/// Decode an envelope and apply connection-level deserialization failure policy. +pub(crate) fn decode_envelope( + parse_result: Result<(Envelope, usize), Box>, + frame: &F::Frame, + deser_failures: &mut u32, + max_deser_failures: u32, +) -> io::Result> +where + F: FrameCodec, +{ + match parse_result { + Ok((mut env, _)) => { + if env.correlation_id.is_none() { + env.correlation_id = F::correlation_id(frame); + } + Ok(Some(env)) + } + Err(err) => { + *deser_failures += 1; + let correlation_id = F::correlation_id(frame); + let context = "failed to decode message"; + log::warn!("{context}: correlation_id={correlation_id:?}, error={err:?}"); + crate::metrics::inc_deser_errors(); + if *deser_failures >= max_deser_failures { + log::warn!( + "closing connection after {deser_failures} deserialization failures: {context}" + ); + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "too many deserialization failures", + )); + } + Ok(None) + } + } +} diff --git a/src/app/frame_handling/mod.rs b/src/app/frame_handling/mod.rs index 9cce4b6f..f15062f3 100644 --- a/src/app/frame_handling/mod.rs +++ b/src/app/frame_handling/mod.rs @@ -2,14 +2,25 @@ //! //! Extracted from `connection.rs` to keep modules small and focused. +mod assembly; mod core; +mod decode; mod reassembly; mod response; pub(crate) use core::ResponseContext; +pub(crate) use assembly::{ + AssemblyRuntime, + assemble_if_needed, + new_message_assembly_state, + purge_expired_assemblies, +}; +pub(crate) use decode::decode_envelope; pub(crate) use reassembly::reassemble_if_needed; pub(crate) use response::forward_response; +#[cfg(all(test, not(loom)))] +mod assembly_tests; #[cfg(all(test, not(loom)))] mod tests; diff --git a/src/message_assembler/mod.rs b/src/message_assembler/mod.rs index bfd2d32b..2a0e44a4 100644 --- a/src/message_assembler/mod.rs +++ b/src/message_assembler/mod.rs @@ -39,7 +39,14 @@ pub use header::{ }; pub use series::MessageSeries; pub use state::MessageAssemblyState; -pub use types::{AssembledMessage, FirstFrameInput, FirstFrameInputError}; +pub use types::{ + AssembledMessage, + CorrelationId, + EnvelopeId, + EnvelopeRouting, + FirstFrameInput, + FirstFrameInputError, +}; /// Hook trait for protocol-specific multi-frame request parsing. /// diff --git a/src/message_assembler/state.rs b/src/message_assembler/state.rs index ff1e7085..863579cc 100644 --- a/src/message_assembler/state.rs +++ b/src/message_assembler/state.rs @@ -15,22 +15,24 @@ use super::{ MessageKey, error::{MessageAssemblyError, MessageSeriesError, MessageSeriesStatus}, series::MessageSeries, - types::{AssembledMessage, FirstFrameInput}, + types::{AssembledMessage, EnvelopeRouting, FirstFrameInput}, }; /// Partial message assembly in progress. #[derive(Debug)] struct PartialAssembly { series: MessageSeries, + routing: EnvelopeRouting, metadata: Vec, body_buffer: Vec, started_at: Instant, } impl PartialAssembly { - fn new(series: MessageSeries, started_at: Instant) -> Self { + fn new(series: MessageSeries, routing: EnvelopeRouting, started_at: Instant) -> Self { Self { series, + routing, metadata: Vec::new(), body_buffer: Vec::new(), started_at, @@ -56,6 +58,8 @@ impl PartialAssembly { /// /// use wireframe::message_assembler::{ /// ContinuationFrameHeader, +/// EnvelopeId, +/// EnvelopeRouting, /// FirstFrameHeader, /// FirstFrameInput, /// FrameSequence, @@ -76,8 +80,12 @@ impl PartialAssembly { /// total_body_len: Some(10), /// is_last: false, /// }; -/// let input = -/// FirstFrameInput::new(&first, vec![0x01, 0x02], b"hello").expect("header lengths match"); +/// let routing = EnvelopeRouting { +/// envelope_id: EnvelopeId(1), +/// correlation_id: None, +/// }; +/// let input = FirstFrameInput::new(&first, routing, vec![0x01, 0x02], b"hello") +/// .expect("header lengths match"); /// let msg = state /// .accept_first_frame(input) /// .expect("first frame accepted"); @@ -179,13 +187,15 @@ impl MessageAssemblyState { if input.header.is_last { return Ok(Some(AssembledMessage::new( key, + input.routing, input.metadata, input.body.to_vec(), ))); } - // Start new assembly - let mut partial = PartialAssembly::new(series, now); + // Start new assembly, preserving envelope routing metadata from the + // first frame so the completed message is dispatched correctly. + let mut partial = PartialAssembly::new(series, input.routing, now); partial.set_metadata(input.metadata); partial.push_body(input.body); self.assemblies.insert(key, partial); @@ -318,6 +328,7 @@ impl MessageAssemblyState { let partial = entry.remove(); Ok(Some(AssembledMessage::new( key, + partial.routing, partial.metadata, partial.body_buffer, ))) diff --git a/src/message_assembler/state_tests.rs b/src/message_assembler/state_tests.rs index ca9a7c24..4b7105f7 100644 --- a/src/message_assembler/state_tests.rs +++ b/src/message_assembler/state_tests.rs @@ -9,6 +9,9 @@ use rstest::{fixture, rstest}; use crate::message_assembler::{ AssembledMessage, + CorrelationId, + EnvelopeId, + EnvelopeRouting, FirstFrameHeader, FirstFrameInput, MessageAssemblyError, @@ -43,7 +46,13 @@ fn state_tracks_single_message_assembly( total_body_len: Some(10), is_last: false, }; - let input = FirstFrameInput::new(&first, vec![0x01, 0x02], b"hello").expect("valid input"); + let input = FirstFrameInput::new( + &first, + EnvelopeRouting::default(), + vec![0x01, 0x02], + b"hello", + ) + .expect("valid input"); let result = state.accept_first_frame(input).expect("accept first frame"); assert!(result.is_none()); assert_eq!(state.buffered_count(), 1); @@ -66,13 +75,19 @@ fn state_tracks_multiple_interleaved_messages( // Start message 1 let first1 = first_header(1, 2, false); state - .accept_first_frame(FirstFrameInput::new(&first1, vec![], b"A1").expect("valid input")) + .accept_first_frame( + FirstFrameInput::new(&first1, EnvelopeRouting::default(), vec![], b"A1") + .expect("valid input"), + ) .expect("first frame 1"); // Start message 2 let first2 = first_header(2, 2, false); state - .accept_first_frame(FirstFrameInput::new(&first2, vec![], b"B1").expect("valid input")) + .accept_first_frame( + FirstFrameInput::new(&first2, EnvelopeRouting::default(), vec![], b"B1") + .expect("valid input"), + ) .expect("first frame 2"); assert_eq!(state.buffered_count(), 2); @@ -118,12 +133,18 @@ fn state_rejects_duplicate_first_frame( ) { let first = first_header(1, 5, false); state - .accept_first_frame(FirstFrameInput::new(&first, vec![], b"hello").expect("valid input")) + .accept_first_frame( + FirstFrameInput::new(&first, EnvelopeRouting::default(), vec![], b"hello") + .expect("valid input"), + ) .expect("first frame"); // Try duplicate first frame let err = state - .accept_first_frame(FirstFrameInput::new(&first, vec![], b"again").expect("valid input")) + .accept_first_frame( + FirstFrameInput::new(&first, EnvelopeRouting::default(), vec![], b"again") + .expect("valid input"), + ) .expect_err("should reject duplicate"); assert!(matches!( err, @@ -173,7 +194,8 @@ fn state_enforces_size_limit(#[case] params: SizeLimitCase) { params.continuation_body_len.is_none(), ), }; - let input = FirstFrameInput::new(&first, vec![], &first_body).expect("valid input"); + let input = FirstFrameInput::new(&first, EnvelopeRouting::default(), vec![], &first_body) + .expect("valid input"); match params.continuation_body_len { None => { @@ -225,7 +247,8 @@ fn state_purges_expired_assemblies() { let first = first_header(1, 5, false); state .accept_first_frame_at( - FirstFrameInput::new(&first, vec![], b"hello").expect("valid input"), + FirstFrameInput::new(&first, EnvelopeRouting::default(), vec![], b"hello") + .expect("valid input"), now, ) .expect("accept first frame"); @@ -252,7 +275,8 @@ fn state_returns_single_frame_message_immediately( }; let msg = state .accept_first_frame( - FirstFrameInput::new(&first, vec![0xaa], b"hello").expect("valid input"), + FirstFrameInput::new(&first, EnvelopeRouting::default(), vec![0xaa], b"hello") + .expect("valid input"), ) .expect("accept first frame") .expect("single frame should complete"); @@ -263,9 +287,98 @@ fn state_returns_single_frame_message_immediately( assert_eq!(state.buffered_count(), 0); } +#[rstest] +fn completed_assembly_preserves_first_frame_routing_metadata( + #[from(state_with_defaults)] mut state: MessageAssemblyState, +) { + let routing = EnvelopeRouting { + envelope_id: EnvelopeId(42), + correlation_id: Some(CorrelationId(99)), + }; + let first = first_header(1, 5, false); + state + .accept_first_frame( + FirstFrameInput::new(&first, routing, vec![], b"hello").expect("valid input"), + ) + .expect("first frame"); + + let cont = continuation_header(1, 1, 5, true); + let msg = state + .accept_continuation_frame(&cont, b"world") + .expect("accept continuation") + .expect("message should complete"); + + assert_eq!( + msg.routing().envelope_id, + EnvelopeId(42), + "envelope_id should come from first frame" + ); + assert_eq!( + msg.routing().correlation_id, + Some(CorrelationId(99)), + "correlation_id should come from first frame" + ); +} + +#[rstest] +fn interleaved_assemblies_preserve_distinct_routing_metadata( + #[from(state_with_defaults)] mut state: MessageAssemblyState, +) { + // Start assembly for key 1 with envelope_id=10, correlation_id=100 + let routing1 = EnvelopeRouting { + envelope_id: EnvelopeId(10), + correlation_id: Some(CorrelationId(100)), + }; + let first1 = first_header(1, 2, false); + state + .accept_first_frame( + FirstFrameInput::new(&first1, routing1, vec![], b"A1").expect("valid input"), + ) + .expect("first frame 1"); + + // Start assembly for key 2 with envelope_id=20, correlation_id=200 + let routing2 = EnvelopeRouting { + envelope_id: EnvelopeId(20), + correlation_id: Some(CorrelationId(200)), + }; + let first2 = first_header(2, 2, false); + state + .accept_first_frame( + FirstFrameInput::new(&first2, routing2, vec![], b"B1").expect("valid input"), + ) + .expect("first frame 2"); + + // Complete key 1 + let cont1 = continuation_header(1, 1, 2, true); + let msg1 = state + .accept_continuation_frame(&cont1, b"A2") + .expect("continuation 1") + .expect("message 1 should complete"); + + assert_eq!(msg1.routing().envelope_id, EnvelopeId(10)); + assert_eq!(msg1.routing().correlation_id, Some(CorrelationId(100))); + assert_eq!(msg1.body(), b"A1A2"); + + // Complete key 2 + let cont2 = continuation_header(2, 1, 2, true); + let msg2 = state + .accept_continuation_frame(&cont2, b"B2") + .expect("continuation 2") + .expect("message 2 should complete"); + + assert_eq!(msg2.routing().envelope_id, EnvelopeId(20)); + assert_eq!(msg2.routing().correlation_id, Some(CorrelationId(200))); + assert_eq!(msg2.body(), b"B1B2"); +} + #[test] fn assembled_message_into_body() { - let msg = AssembledMessage::new(MessageKey(1), vec![0x01], vec![0x02, 0x03]); + let msg = AssembledMessage::new( + MessageKey(1), + EnvelopeRouting::default(), + vec![0x01], + vec![0x02, 0x03], + ); let body = msg.into_body(); assert_eq!(body, vec![0x02, 0x03]); } diff --git a/src/message_assembler/types.rs b/src/message_assembler/types.rs index 60ead24c..be013add 100644 --- a/src/message_assembler/types.rs +++ b/src/message_assembler/types.rs @@ -1,21 +1,68 @@ //! Public types for message assembly inputs and outputs. //! -//! This module contains `FirstFrameInput`, `FirstFrameInputError`, and -//! `AssembledMessage`, extracted from `state.rs` to meet the 400-line file -//! limit. +//! This module contains `FirstFrameInput`, `FirstFrameInputError`, +//! `EnvelopeRouting`, and `AssembledMessage`, extracted from `state.rs` +//! to meet the 400-line file limit. use thiserror::Error; use super::{FirstFrameHeader, MessageKey}; +/// Envelope identifier from the enclosing transport frame. +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)] +pub struct EnvelopeId(pub u32); + +impl From for EnvelopeId { + fn from(value: u32) -> Self { Self(value) } +} + +impl From for u32 { + fn from(value: EnvelopeId) -> Self { value.0 } +} + +/// Correlation identifier from the enclosing transport frame. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub struct CorrelationId(pub u64); + +impl From for CorrelationId { + fn from(value: u64) -> Self { Self(value) } +} + +impl From for u64 { + fn from(value: CorrelationId) -> Self { value.0 } +} + +/// Routing metadata from the transport envelope that carried a first frame. +/// +/// Captured at first-frame time so the completed [`AssembledMessage`] can +/// be dispatched to the correct handler and logged under the original +/// correlation identifier, regardless of which continuation frame +/// completed the assembly. +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +pub struct EnvelopeRouting { + /// Envelope identifier from the enclosing transport frame. + pub envelope_id: EnvelopeId, + /// Correlation identifier from the enclosing transport frame. + pub correlation_id: Option, +} + /// Input data for a first frame. /// -/// Groups the header and payload components that comprise a first frame. +/// Groups the header and payload components that comprise a first frame, +/// along with the [`EnvelopeRouting`] metadata so the assembler can +/// preserve it for the completed message. /// /// # Examples /// /// ``` -/// use wireframe::message_assembler::{FirstFrameHeader, FirstFrameInput, MessageKey}; +/// use wireframe::message_assembler::{ +/// CorrelationId, +/// EnvelopeId, +/// EnvelopeRouting, +/// FirstFrameHeader, +/// FirstFrameInput, +/// MessageKey, +/// }; /// /// let header = FirstFrameHeader { /// message_key: MessageKey(1), @@ -24,14 +71,21 @@ use super::{FirstFrameHeader, MessageKey}; /// total_body_len: None, /// is_last: false, /// }; -/// let input = FirstFrameInput::new(&header, vec![0x01, 0x02], b"hello") +/// let routing = EnvelopeRouting { +/// envelope_id: EnvelopeId(42), +/// correlation_id: Some(CorrelationId(7)), +/// }; +/// let input = FirstFrameInput::new(&header, routing, vec![0x01, 0x02], b"hello") /// .expect("header lengths match payload sizes"); /// assert_eq!(input.header.message_key, MessageKey(1)); +/// assert_eq!(input.routing.envelope_id, EnvelopeId(42)); /// ``` #[derive(Debug)] pub struct FirstFrameInput<'a> { /// The frame header. pub header: &'a FirstFrameHeader, + /// Envelope routing metadata from the enclosing transport frame. + pub routing: EnvelopeRouting, /// Protocol-specific metadata. pub metadata: Vec, /// Body payload slice. @@ -68,6 +122,7 @@ impl<'a> FirstFrameInput<'a> { /// or `header.body_len` does not match `body.len()`. pub fn new( header: &'a FirstFrameHeader, + routing: EnvelopeRouting, metadata: Vec, body: &'a [u8], ) -> Result { @@ -85,6 +140,7 @@ impl<'a> FirstFrameInput<'a> { } Ok(Self { header, + routing, metadata, body, }) @@ -93,21 +149,38 @@ impl<'a> FirstFrameInput<'a> { /// Container for a fully assembled message. /// +/// Preserves [`EnvelopeRouting`] from the first frame so that completed +/// messages are dispatched to the correct handler and logged under the +/// original correlation identifier. +/// /// # Examples /// /// ``` -/// use wireframe::message_assembler::{AssembledMessage, MessageKey}; +/// use wireframe::message_assembler::{ +/// AssembledMessage, +/// CorrelationId, +/// EnvelopeId, +/// EnvelopeRouting, +/// MessageKey, +/// }; /// /// // Normally obtained from MessageAssemblyState::accept_first_frame or /// // accept_continuation_frame when a message completes. -/// let msg = AssembledMessage::new(MessageKey(1), vec![0x01], vec![0x02, 0x03]); +/// let routing = EnvelopeRouting { +/// envelope_id: EnvelopeId(42), +/// correlation_id: Some(CorrelationId(7)), +/// }; +/// let msg = AssembledMessage::new(MessageKey(1), routing, vec![0x01], vec![0x02, 0x03]); /// assert_eq!(msg.message_key(), MessageKey(1)); +/// assert_eq!(msg.routing().envelope_id, EnvelopeId(42)); +/// assert_eq!(msg.routing().correlation_id, Some(CorrelationId(7))); /// assert_eq!(msg.metadata(), &[0x01]); /// assert_eq!(msg.body(), &[0x02, 0x03]); /// ``` #[derive(Clone, Debug, PartialEq, Eq)] pub struct AssembledMessage { message_key: MessageKey, + routing: EnvelopeRouting, metadata: Vec, body: Vec, } @@ -115,9 +188,15 @@ pub struct AssembledMessage { impl AssembledMessage { /// Create a new assembled message. #[must_use] - pub fn new(message_key: MessageKey, metadata: Vec, body: Vec) -> Self { + pub fn new( + message_key: MessageKey, + routing: EnvelopeRouting, + metadata: Vec, + body: Vec, + ) -> Self { Self { message_key, + routing, metadata, body, } @@ -127,6 +206,10 @@ impl AssembledMessage { #[must_use] pub const fn message_key(&self) -> MessageKey { self.message_key } + /// Envelope routing metadata from the first frame. + #[must_use] + pub const fn routing(&self) -> EnvelopeRouting { self.routing } + /// Protocol-specific metadata from the first frame. #[must_use] pub fn metadata(&self) -> &[u8] { &self.metadata } diff --git a/src/test_helpers.rs b/src/test_helpers.rs index 86a2223b..e02f1bd3 100644 --- a/src/test_helpers.rs +++ b/src/test_helpers.rs @@ -3,7 +3,7 @@ use std::io; -use bytes::Buf; +use bytes::{Buf, BufMut, BytesMut}; use crate::message_assembler::{ ContinuationFrameHeader, @@ -117,3 +117,43 @@ fn ensure_remaining(buf: &mut &[u8], needed: usize) -> Result<(), io::Error> { fn invalid_data(message: &'static str) -> io::Error { io::Error::new(io::ErrorKind::InvalidData, message) } + +/// Build a first-frame payload for the test protocol. +#[must_use] +pub fn first_frame_payload(key: u64, body: &[u8], is_last: bool, total: Option) -> Vec { + let mut payload = BytesMut::new(); + payload.put_u8(0x01); + let mut flags = 0u8; + if is_last { + flags |= 0b1; + } + if total.is_some() { + flags |= 0b10; + } + payload.put_u8(flags); + payload.put_u64(key); + payload.put_u16(0); + payload.put_u32(u32::try_from(body.len()).unwrap_or(u32::MAX)); + if let Some(total) = total { + payload.put_u32(total); + } + payload.extend_from_slice(body); + payload.to_vec() +} + +/// Build a continuation-frame payload for the test protocol. +#[must_use] +pub fn continuation_frame_payload(key: u64, sequence: u32, body: &[u8], is_last: bool) -> Vec { + let mut payload = BytesMut::new(); + payload.put_u8(0x02); + let mut flags = 0b10; + if is_last { + flags |= 0b1; + } + payload.put_u8(flags); + payload.put_u64(key); + payload.put_u32(u32::try_from(body.len()).unwrap_or(u32::MAX)); + payload.put_u32(sequence); + payload.extend_from_slice(body); + payload.to_vec() +} diff --git a/tests/features/message_assembly_inbound.feature b/tests/features/message_assembly_inbound.feature new file mode 100644 index 00000000..d841971c --- /dev/null +++ b/tests/features/message_assembly_inbound.feature @@ -0,0 +1,31 @@ +Feature: Inbound message assembly integration + Message assembly runs on the inbound connection path after transport + fragmentation and before handler dispatch. + + Scenario: Interleaved assembly dispatches completed payloads + Given an inbound app with message assembly timeout 200 milliseconds + When a first frame for key 1 with body "A1" arrives + And a first frame for key 2 with body "B1" arrives + And a final continuation frame for key 1 sequence 1 with body "A2" arrives + And a final continuation frame for key 2 sequence 1 with body "B2" arrives + Then the handler eventually receives payload "A1A2" + And the handler eventually receives payload "B1B2" + And no send error is recorded + + Scenario: Ordering violations are rejected and recovery can complete + Given an inbound app with message assembly timeout 200 milliseconds + When a first frame for key 9 with body "ab" arrives + And a continuation frame for key 9 sequence 1 with body "cd" arrives + And a continuation frame for key 9 sequence 3 with body "ef" arrives + And a final continuation frame for key 9 sequence 2 with body "gh" arrives + Then the handler eventually receives payload "abcdgh" + And the handler receives 1 payloads + And no send error is recorded + + Scenario: Timeout purges partial assembly before continuation arrives + Given an inbound app with message assembly timeout 10 milliseconds + When a first frame for key 7 with body "ab" arrives + And time advances by 30 milliseconds + And a final continuation frame for key 7 sequence 1 with body "cd" arrives + Then the handler receives 0 payloads + And no send error is recorded diff --git a/tests/fixtures/message_assembly.rs b/tests/fixtures/message_assembly.rs index 7e2fbace..d60a666b 100644 --- a/tests/fixtures/message_assembly.rs +++ b/tests/fixtures/message_assembly.rs @@ -17,6 +17,7 @@ use rstest::fixture; use wireframe::message_assembler::{ AssembledMessage, ContinuationFrameHeader, + EnvelopeRouting, FirstFrameHeader, FirstFrameInput, FrameSequence, @@ -142,8 +143,13 @@ impl MessageAssemblyWorld { let Some(now) = self.current_time else { return Err("time not set".into()); }; - let input = FirstFrameInput::new(&pending.header, pending.metadata, &pending.body) - .map_err(|e| format!("invalid input: {e}"))?; + let input = FirstFrameInput::new( + &pending.header, + EnvelopeRouting::default(), + pending.metadata, + &pending.body, + ) + .map_err(|e| format!("invalid input: {e}"))?; self.last_result = Some(state.accept_first_frame_at(input, now)); if let Some(Ok(Some(msg))) = &self.last_result { self.completed_messages.push(msg.clone()); @@ -165,8 +171,13 @@ impl MessageAssemblyWorld { }; while let Some(pending) = self.pending_first_frames.pop_front() { - let input = FirstFrameInput::new(&pending.header, pending.metadata, &pending.body) - .map_err(|e| format!("invalid input: {e}"))?; + let input = FirstFrameInput::new( + &pending.header, + EnvelopeRouting::default(), + pending.metadata, + &pending.body, + ) + .map_err(|e| format!("invalid input: {e}"))?; let result = state.accept_first_frame_at(input, now); if let Ok(Some(msg)) = &result { self.completed_messages.push(msg.clone()); diff --git a/tests/fixtures/message_assembly_inbound.rs b/tests/fixtures/message_assembly_inbound.rs new file mode 100644 index 00000000..2c41967f --- /dev/null +++ b/tests/fixtures/message_assembly_inbound.rs @@ -0,0 +1,380 @@ +//! `MessageAssemblyInboundWorld` fixture for inbound assembly integration. + +use std::{fmt, future::Future, num::NonZeroUsize, time::Duration}; + +use futures::SinkExt; +use rstest::fixture; +use tokio::{io::DuplexStream, sync::mpsc, task::JoinHandle, time::timeout}; +use tokio_util::codec::{Framed, LengthDelimitedCodec}; +use wireframe::{ + Serializer, + app::{Envelope, Handler, WireframeApp}, + fragment::FragmentationConfig, + serializer::BincodeSerializer, + test_helpers::{self, TestAssembler}, +}; +pub use wireframe_testing::TestResult; + +/// Protocol-level message key identifying a logical message stream. +/// +/// Wraps a `u64` so step definitions can parse it from feature-file parameters. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct MessageKey(pub u64); + +impl From for MessageKey { + fn from(value: u64) -> Self { Self(value) } +} + +impl From for u64 { + fn from(value: MessageKey) -> Self { value.0 } +} + +/// Continuation-frame sequence number for ordering validation. +/// +/// Wraps a `u32` so step definitions can parse it from feature-file parameters. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub struct FrameSequence(pub u32); + +impl From for FrameSequence { + fn from(value: u32) -> Self { Self(value) } +} + +impl From for u32 { + fn from(value: FrameSequence) -> Self { value.0 } +} + +const ROUTE_ID: u32 = 77; +const CORRELATION_ID: Option = Some(5); +const BUFFER_CAPACITY: usize = 512; +const PAYLOAD_COLLECT_TIMEOUT_MS: u64 = 200; +const COUNT_COLLECT_TIMEOUT_MS: u64 = 120; + +/// Runtime-backed fixture that drives inbound message assembly through +/// `WireframeApp::handle_connection_result`. +pub struct MessageAssemblyInboundWorld { + runtime: Option, + runtime_error: Option, + client: Option>, + server: Option>>, + observed_rx: Option>>, + observed_payloads: Vec>, + last_send_error: Option, +} + +impl fmt::Debug for MessageAssemblyInboundWorld { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("MessageAssemblyInboundWorld") + .field("client_initialized", &self.client.is_some()) + .field("server_initialized", &self.server.is_some()) + .field("observed_payloads", &self.observed_payloads.len()) + .field("last_send_error", &self.last_send_error) + .finish_non_exhaustive() + } +} + +#[derive(Debug, Clone, Copy)] +struct ContinuationFrameParams<'a> { + key: u64, + sequence: u32, + body: &'a str, + is_last: bool, +} + +impl<'a> ContinuationFrameParams<'a> { + fn new(key: u64, sequence: u32, body: &'a str, is_last: bool) -> Self { + Self { + key, + sequence, + body, + is_last, + } + } +} + +impl Default for MessageAssemblyInboundWorld { + fn default() -> Self { + match tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + { + Ok(runtime) => Self { + runtime: Some(runtime), + runtime_error: None, + client: None, + server: None, + observed_rx: None, + observed_payloads: Vec::new(), + last_send_error: None, + }, + Err(err) => Self { + runtime: None, + runtime_error: Some(format!("failed to create runtime: {err}")), + client: None, + server: None, + observed_rx: None, + observed_payloads: Vec::new(), + last_send_error: None, + }, + } + } +} + +/// Construct and return a default [`MessageAssemblyInboundWorld`] for BDD +/// test scenarios exercising inbound message assembly integration. +/// +/// Injected by rstest into each `#[scenario]` function so that step +/// definitions can drive `WireframeApp::handle_connection_result` through +/// the full assembly pipeline. +// rustfmt collapses simple fixtures into one line, which triggers unused_braces. +#[rustfmt::skip] +#[fixture] +pub fn message_assembly_inbound_world() -> MessageAssemblyInboundWorld { + MessageAssemblyInboundWorld::default() +} + +impl MessageAssemblyInboundWorld { + fn runtime(&self) -> TestResult<&tokio::runtime::Runtime> { + self.runtime.as_ref().ok_or_else(|| { + self.runtime_error + .clone() + .unwrap_or_else(|| "runtime unavailable".to_string()) + .into() + }) + } + + fn block_on(&self, future: F) -> TestResult + where + F: Future, + { + if tokio::runtime::Handle::try_current().is_ok() { + return Err("nested Tokio runtime detected in inbound assembly fixture".into()); + } + Ok(self.runtime()?.block_on(future)) + } + + /// Build and start a `WireframeApp` with message assembly enabled. + /// + /// Pauses the Tokio clock so that subsequent `wait_millis` calls advance + /// time deterministically. The handler forwards received payloads to an + /// internal channel for later assertion. + /// + /// # Errors + /// + /// Returns an error if the fragmentation config, app builder, or runtime + /// initialisation fails. + pub fn start_app(&mut self, timeout_ms: u64) -> TestResult { + let message_limit = + NonZeroUsize::new(BUFFER_CAPACITY.saturating_mul(16)).unwrap_or(NonZeroUsize::MIN); + let config = FragmentationConfig::for_frame_budget( + BUFFER_CAPACITY, + message_limit, + Duration::from_millis(timeout_ms), + ) + .ok_or("frame budget too small for fragmentation config")?; + + let (tx, rx) = mpsc::unbounded_channel::>(); + let handler: Handler = std::sync::Arc::new(move |env: &Envelope| { + let tx = tx.clone(); + let payload = env.payload_bytes().to_vec(); + Box::pin(async move { + let _ = tx.send(payload); + }) + }); + + let app: WireframeApp = WireframeApp::new()? + .buffer_capacity(BUFFER_CAPACITY) + .fragmentation(Some(config)) + .with_message_assembler(TestAssembler) + .route(ROUTE_ID, handler)?; + + let codec = app.length_codec(); + let (client_stream, server_stream) = tokio::io::duplex(2048); + let client = Framed::new(client_stream, codec); + + // Pause the tokio clock so that time-advance steps are deterministic + // and independent of real wall-clock scheduling. + self.block_on(async { tokio::time::pause() })?; + let server = self + .runtime()? + .spawn(async move { app.handle_connection_result(server_stream).await }); + + self.client = Some(client); + self.server = Some(server); + self.observed_rx = Some(rx); + self.observed_payloads.clear(); + self.last_send_error = None; + Ok(()) + } + + /// Serialize and send a first frame for the given message `key`. + /// + /// # Errors + /// + /// Returns an error if the client is not initialised or the send fails. + pub fn send_first_frame(&mut self, key: impl Into, body: &str) -> TestResult { + let key = key.into(); + self.send_payload(test_helpers::first_frame_payload( + key.0, + body.as_bytes(), + false, + None, + )) + } + + /// Serialize and send a non-final continuation frame. + /// + /// # Errors + /// + /// Returns an error if the client is not initialised or the send fails. + pub fn send_continuation_frame( + &mut self, + key: impl Into, + sequence: impl Into, + body: &str, + ) -> TestResult { + let key = key.into(); + let sequence = sequence.into(); + let params = ContinuationFrameParams::new(key.0, sequence.0, body, false); + self.send_continuation_frame_impl(params) + } + + /// Serialize and send a final continuation frame that completes assembly. + /// + /// # Errors + /// + /// Returns an error if the client is not initialised or the send fails. + pub fn send_final_continuation_frame( + &mut self, + key: impl Into, + sequence: impl Into, + body: &str, + ) -> TestResult { + let key = key.into(); + let sequence = sequence.into(); + let params = ContinuationFrameParams::new(key.0, sequence.0, body, true); + self.send_continuation_frame_impl(params) + } + + fn send_continuation_frame_impl(&mut self, params: ContinuationFrameParams<'_>) -> TestResult { + let ContinuationFrameParams { + key, + sequence, + body, + is_last, + } = params; + self.send_payload(test_helpers::continuation_frame_payload( + key, + sequence, + body.as_bytes(), + is_last, + )) + } + + /// Advance the paused Tokio clock by `millis` milliseconds. + /// + /// # Errors + /// + /// Returns an error if the runtime is unavailable. + pub fn wait_millis(&mut self, millis: u64) -> TestResult { + self.block_on(async { tokio::time::advance(Duration::from_millis(millis)).await })?; + Ok(()) + } + + /// Assert that the handler has received a payload matching `expected`. + /// + /// Collects observed payloads for up to + /// [`PAYLOAD_COLLECT_TIMEOUT_MS`] before checking. + /// + /// # Errors + /// + /// Returns an error if the expected payload was not observed. + pub fn assert_received_payload(&mut self, expected: &str) -> TestResult { + self.collect_observed_for(Duration::from_millis(PAYLOAD_COLLECT_TIMEOUT_MS))?; + let expected = expected.as_bytes(); + if self + .observed_payloads + .iter() + .any(|payload| payload.as_slice() == expected) + { + return Ok(()); + } + + Err(format!( + "expected payload {:?} not observed; observed={:?}", + expected, self.observed_payloads + ) + .into()) + } + + /// Assert that exactly `expected_count` payloads have been received. + /// + /// Collects observed payloads for up to [`COUNT_COLLECT_TIMEOUT_MS`] + /// before checking. + /// + /// # Errors + /// + /// Returns an error if the count does not match. + pub fn assert_received_count(&mut self, expected_count: usize) -> TestResult { + self.collect_observed_for(Duration::from_millis(COUNT_COLLECT_TIMEOUT_MS))?; + let actual = self.observed_payloads.len(); + if actual == expected_count { + return Ok(()); + } + Err(format!("expected {expected_count} payloads, got {actual}").into()) + } + + /// Assert that no send error has been recorded. + /// + /// # Errors + /// + /// Returns an error containing the recorded send error message. + pub fn assert_no_send_error(&self) -> TestResult { + if self.last_send_error.is_none() { + return Ok(()); + } + Err(format!("unexpected send error: {:?}", self.last_send_error).into()) + } + + fn send_payload(&mut self, payload: Vec) -> TestResult { + let envelope = Envelope::new(ROUTE_ID, CORRELATION_ID, payload); + let serializer = BincodeSerializer; + let frame = serializer.serialize(&envelope)?; + + let mut client = self.client.take().ok_or("client not initialized")?; + let send_result = self.block_on(async { + client.send(frame.into()).await?; + client.flush().await?; + Ok::<(), std::io::Error>(()) + }); + + self.client = Some(client); + match send_result { + Ok(Ok(())) => { + self.last_send_error = None; + Ok(()) + } + Ok(Err(err)) => { + self.last_send_error = Some(err.to_string()); + Err(err.into()) + } + Err(err) => { + self.last_send_error = Some(err.to_string()); + Err(err) + } + } + } + + fn collect_observed_for(&mut self, max_wait: Duration) -> TestResult { + let mut observed_rx = self.observed_rx.take().ok_or("receiver not initialized")?; + let result = self.block_on(async { + let mut collected = Vec::new(); + while let Ok(Some(payload)) = timeout(max_wait, observed_rx.recv()).await { + collected.push(payload); + } + collected + }); + self.observed_rx = Some(observed_rx); + self.observed_payloads.extend(result?); + Ok(()) + } +} diff --git a/tests/fixtures/mod.rs b/tests/fixtures/mod.rs index d69efa54..c9137778 100644 --- a/tests/fixtures/mod.rs +++ b/tests/fixtures/mod.rs @@ -13,6 +13,7 @@ pub mod correlation; pub mod fragment; pub mod message_assembler; pub mod message_assembly; +pub mod message_assembly_inbound; pub mod multi_packet; pub mod panic; pub mod request_parts; diff --git a/tests/scenarios/message_assembly_inbound_scenarios.rs b/tests/scenarios/message_assembly_inbound_scenarios.rs new file mode 100644 index 00000000..0a0c8e76 --- /dev/null +++ b/tests/scenarios/message_assembly_inbound_scenarios.rs @@ -0,0 +1,33 @@ +//! Scenario tests for inbound message assembly integration. + +use rstest_bdd_macros::scenario; + +use crate::fixtures::message_assembly_inbound::*; + +#[scenario( + path = "tests/features/message_assembly_inbound.feature", + name = "Interleaved assembly dispatches completed payloads" +)] +fn message_assembly_inbound_interleaved( + message_assembly_inbound_world: MessageAssemblyInboundWorld, +) { + drop(message_assembly_inbound_world); +} + +#[scenario( + path = "tests/features/message_assembly_inbound.feature", + name = "Ordering violations are rejected and recovery can complete" +)] +fn message_assembly_inbound_ordering_violation( + message_assembly_inbound_world: MessageAssemblyInboundWorld, +) { + drop(message_assembly_inbound_world); +} + +#[scenario( + path = "tests/features/message_assembly_inbound.feature", + name = "Timeout purges partial assembly before continuation arrives" +)] +fn message_assembly_inbound_timeout(message_assembly_inbound_world: MessageAssemblyInboundWorld) { + drop(message_assembly_inbound_world); +} diff --git a/tests/scenarios/mod.rs b/tests/scenarios/mod.rs index a66eb841..e92912a8 100644 --- a/tests/scenarios/mod.rs +++ b/tests/scenarios/mod.rs @@ -16,6 +16,7 @@ mod codec_stateful_scenarios; mod correlation_scenarios; mod fragment_scenarios; mod message_assembler_scenarios; +mod message_assembly_inbound_scenarios; mod message_assembly_scenarios; mod multi_packet_scenarios; mod panic_scenarios; diff --git a/tests/steps/message_assembly_inbound_steps.rs b/tests/steps/message_assembly_inbound_steps.rs new file mode 100644 index 00000000..0f2e9fc5 --- /dev/null +++ b/tests/steps/message_assembly_inbound_steps.rs @@ -0,0 +1,79 @@ +//! Step definitions for inbound message assembly integration scenarios. + +use rstest_bdd_macros::{given, then, when}; + +use crate::fixtures::message_assembly_inbound::{MessageAssemblyInboundWorld, TestResult}; + +#[given("an inbound app with message assembly timeout {timeout_ms:u64} milliseconds")] +fn given_inbound_app( + message_assembly_inbound_world: &mut MessageAssemblyInboundWorld, + timeout_ms: u64, +) -> TestResult { + message_assembly_inbound_world.start_app(timeout_ms) +} + +#[when("a first frame for key {key:u64} with body {body:string} arrives")] +fn when_first_frame_arrives( + message_assembly_inbound_world: &mut MessageAssemblyInboundWorld, + key: u64, + body: String, +) -> TestResult { + message_assembly_inbound_world.send_first_frame(key, &body) +} + +#[when( + "a continuation frame for key {key:u64} sequence {sequence:u32} with body {body:string} \ + arrives" +)] +fn when_continuation_frame_arrives( + message_assembly_inbound_world: &mut MessageAssemblyInboundWorld, + key: u64, + sequence: u32, + body: String, +) -> TestResult { + message_assembly_inbound_world.send_continuation_frame(key, sequence, &body) +} + +#[when( + "a final continuation frame for key {key:u64} sequence {sequence:u32} with body {body:string} \ + arrives" +)] +fn when_final_continuation_frame_arrives( + message_assembly_inbound_world: &mut MessageAssemblyInboundWorld, + key: u64, + sequence: u32, + body: String, +) -> TestResult { + message_assembly_inbound_world.send_final_continuation_frame(key, sequence, &body) +} + +#[when("time advances by {millis:u64} milliseconds")] +fn when_time_advances( + message_assembly_inbound_world: &mut MessageAssemblyInboundWorld, + millis: u64, +) -> TestResult { + message_assembly_inbound_world.wait_millis(millis) +} + +#[then("the handler eventually receives payload {payload:string}")] +fn then_handler_receives_payload( + message_assembly_inbound_world: &mut MessageAssemblyInboundWorld, + payload: String, +) -> TestResult { + message_assembly_inbound_world.assert_received_payload(&payload) +} + +#[then("the handler receives {count:usize} payloads")] +fn then_handler_receives_count( + message_assembly_inbound_world: &mut MessageAssemblyInboundWorld, + count: usize, +) -> TestResult { + message_assembly_inbound_world.assert_received_count(count) +} + +#[then("no send error is recorded")] +fn then_no_send_error( + message_assembly_inbound_world: &mut MessageAssemblyInboundWorld, +) -> TestResult { + message_assembly_inbound_world.assert_no_send_error() +} diff --git a/tests/steps/mod.rs b/tests/steps/mod.rs index a6f4f598..8b9a8f76 100644 --- a/tests/steps/mod.rs +++ b/tests/steps/mod.rs @@ -12,6 +12,7 @@ mod codec_stateful_steps; mod correlation_steps; mod fragment_steps; mod message_assembler_steps; +mod message_assembly_inbound_steps; mod message_assembly_steps; mod multi_packet_steps; mod panic_steps;