diff --git a/docs/asynchronous-outbound-messaging-design.md b/docs/asynchronous-outbound-messaging-design.md index 10d1350f..ad8ff512 100644 --- a/docs/asynchronous-outbound-messaging-design.md +++ b/docs/asynchronous-outbound-messaging-design.md @@ -834,6 +834,15 @@ features of the 1.0 release. the socket. The `PushHandle` and the application code that uses it remain completely unaware of fragmentation. +- **Unified Codec Pipeline:** On the server side, the `FramePipeline` + (`src/app/codec_driver.rs`) applies outbound fragmentation and metrics to + every `Envelope` before it reaches the wire. Handler responses pass through + the pipeline before serialization and codec wrapping, ensuring the same + fragmentation logic applies to both handler responses and push traffic. + Protocol hooks are applied at the connection-actor level and are currently + deferred for the app-router path pending resolution of the `F::Frame` vs + `Envelope` type constraint. + ```rust,ignore // Codec stack with explicit frame-size limits and fragmentation. use wireframe::app::WireframeApp; diff --git a/docs/execplans/9-3-1-fragment-adapter-trait.md b/docs/execplans/9-3-1-fragment-adapter-trait.md new file mode 100644 index 00000000..7afe16cb --- /dev/null +++ b/docs/execplans/9-3-1-fragment-adapter-trait.md @@ -0,0 +1,731 @@ +# 9.3.1 Unify codec handling between the app router and the Connection actor + +This ExecPlan is a living document. The sections `Constraints`, `Tolerances`, +`Risks`, `Progress`, `Surprises & Discoveries`, `Decision Log`, and +`Outcomes & Retrospective` must be kept up to date as work proceeds. + +Status: DONE + +No `PLANS.md` exists in this repository as of 2026-02-15. + +## Purpose / big picture + +Wireframe currently has two entirely separate outbound frame-processing paths. +The server runtime uses `WireframeApp::handle_connection_result` (path 1, +hereafter "the app router path"), which owns a `Framed>` +stream and writes responses directly via `framed.send()`. The `ConnectionActor` +(path 2, hereafter "the actor path") polls push queues, streaming responses, +and multi-packet channels, applies protocol hooks (`before_send`, +`stream_end_frame`, `on_command_end`) and fragmentation, then appends frames to +a `Vec` for the caller to write. + +The two paths do not share codec construction, protocol hook invocation, or +frame emission logic. This means: + +- Protocol hooks (`before_send`) do not fire for app-router responses. +- The app router constructs its own `CombinedCodec` per connection, while the + actor is codec-agnostic and cannot encode frames. +- `Response::Stream` and `Response::MultiPacket` variants returned by handlers + are not driven through the actor's prioritized select loop, so push traffic + cannot interleave with streaming responses on the server side. +- Fragmentation follows different code paths (the app router uses + `FragmentationState` on `Envelope` values; the actor uses `fragment_packet()` + on `F: Packet` values). +- Metrics, correlation stamping, and error handling differ. + +After this work: + +- App-level request and response handling is routed through the actor codec + path so protocol hooks apply consistently. +- Duplicate codec construction in `src/app/connection.rs` is removed; the + actor path owns framing. +- Integration tests cover streaming responses and push traffic through the + unified path. +- Back-pressure tests for `Response::Stream` routed through the codec layer + are present. + +A library consumer can observe success by: + +1. Registering a `WireframeProtocol` with a `before_send` hook and seeing it + fire for every outbound frame, whether the frame originates from a handler + response, a `Response::Stream`, a `Response::MultiPacket`, or a push. +2. Running `make test` and seeing the new integration and + Behaviour-Driven Development (BDD) tests pass. +3. Consulting `docs/users-guide.md` for updated guidance on the unified codec + path. + +## Constraints + +- The public `WireframeApp` builder API must remain source-compatible. Callers + should not need to change their builder chains. Internal restructuring is + permitted. +- The `FrameCodec` trait signature must not change. `ConnectionActor` must + accept a codec instance without altering the trait surface. +- The `WireframeProtocol` trait signature must not change. +- The `Response` enum must not gain or lose public variants. +- Existing integration and Behaviour-Driven Development (BDD) tests must + continue to pass without modification beyond what is required by the + unification. +- No `unsafe` code. +- en-GB-oxendict spelling in comments, docs, and commit messages. +- Module-level (`//!`) comments on all new or modified modules. +- Rustdoc (`///`) on all public items, with examples. +- Use `rstest` fixtures/parameterization for unit tests. +- Use `rstest-bdd` v0.5.0 for behavioural tests. +- File size limit: no file may exceed 400 lines. +- Record design decisions in the relevant design documents: + `docs/asynchronous-outbound-messaging-design.md` and + `docs/multi-packet-and-streaming-responses-design.md`. +- Update `docs/users-guide.md` with any public-facing changes. +- Mark roadmap item 9.3.1 as done on completion. + +## Tolerances (exception triggers) + +- Scope: if implementation requires changes to more than 25 files or 1,500 + net changed lines, stop and escalate. +- Interface: if any public API signature must change beyond adding new methods, + stop and escalate. +- Dependencies: no new external dependencies. If one is needed, stop and + escalate. +- Iterations: if `make lint` or `make test` fails three consecutive times for + the same root cause, stop and reassess approach. +- Time: if any single stage exceeds one focused day without reaching acceptance + criteria, log the blocker and re-scope. +- Ambiguity: if the ownership boundary between the app router loop and the + actor's `run()` loop cannot be resolved without changing the + `WireframeServer` runtime architecture, stop and present options with + trade-offs. + +## Risks + +- Risk: the server runtime currently calls + `app.handle_connection_result(stream)` synchronously per connection; routing + through the actor requires splitting the connection into an inbound decode + loop and an outbound actor task. Severity: high. Likelihood: high. + Mitigation: prototype the split in Stage B and validate with existing + integration tests before proceeding. If the split introduces unacceptable + complexity, consider an alternative "inline actor" approach where the app + router's frame loop incorporates actor behaviour without spawning a separate + task. + +- Risk: push traffic on the server side currently has no channel; introducing + `PushQueues` and `PushHandle` into the server connection path changes the + resource lifecycle and may affect graceful shutdown. Severity: medium. + Likelihood: medium. Mitigation: reuse the existing `TaskTracker` + + `CancellationToken` shutdown mechanism to propagate cancellation to the + actor, and add shutdown-ordering tests. + +- Risk: `Response::Stream` back-pressure semantics may differ between + the current direct-write model and the actor's `Vec` output model. Under + the current design the actor outputs to `Vec`, and the caller writes. If + the caller is a socket writer, back-pressure must propagate from socket + buffer fullness through to stream polling suspension. Severity: high. + Likelihood: medium. Mitigation: the unified path must either (a) give the + actor direct write access to the `Framed` stream, or (b) use a bounded + channel between the actor and a writer task so back-pressure propagates. + Option (a) is preferred because it removes the intermediate `Vec` buffer. + Design decision to be confirmed in Stage A. + +- Risk: existing `ConnectionActor` tests use the `Vec` output interface. + Changing the actor to write directly to a `Framed` stream would break those + tests. Severity: medium. Likelihood: high. Mitigation: preserve the `Vec` + output interface for the standalone `ConnectionActor` and introduce a + codec-aware variant (or wrapper) for the server path. The standalone actor + remains useful for client scenarios and direct testing. + +- Risk: the app router currently handles deserialization failures with a + counter and threshold (`MAX_DESER_FAILURES`). Moving decode into a separate + loop task may complicate shared mutable state. Severity: low. Likelihood: + low. Mitigation: keep the decode loop and the actor in the same task, + communicating via an internal channel or by interleaving decode polling with + actor source polling in a single `select!`. + +## Progress + +- [x] (2026-02-15 00:00Z) Draft ExecPlan for roadmap item 9.3.1. +- [x] (2026-02-15 00:30Z) Stage A: research and design the unification + boundary. +- [x] (2026-02-15 01:00Z) Stage B: created `src/app/codec_driver.rs` with + `FramePipeline` struct. Handles outbound fragmentation and metrics. All + existing tests pass. +- [x] (2026-02-15 01:30Z) Stage C: updated `frame_handling/core.rs`, + `frame_handling/response.rs`, `frame_handling/reassembly.rs`, and + `frame_handling/tests.rs` to use `FramePipeline`. Responses now route through + the pipeline (fragment → metrics → serialize → send). All gates pass (fmt, + lint, test). +- [x] (2026-02-15 02:00Z) Stage D: duplicate codec construction removed. + `FramePipeline` now owns outbound fragmentation; per-response fragment + helpers (`fragment_responses`, `serialize_response`, `send_response_payload`) + removed from `frame_handling/response.rs`. Committed in `83d28f7`. +- [x] (2026-02-15 02:30Z) Stage E: integration tests added in + `tests/unified_codec.rs`. Five tests: round-trip, fragmented response, + unfragmented small payload, multiple sequential requests, disabled + fragmentation. Committed in `1b2851a`. +- [x] (2026-02-15 03:00Z) Stage F: Behaviour-Driven Development (BDD) + behavioural tests added. Five rstest-bdd scenarios in + `tests/features/unified_codec.feature` with fixture + `tests/fixtures/unified_codec/` and steps + `tests/steps/unified_codec_steps.rs`. Committed in `1a2f854`. +- [x] (2026-02-15 03:30Z) Stage G: updated design docs + (`asynchronous-outbound-messaging-design.md`, + `multi-packet-and-streaming-responses-design.md`), `users-guide.md`, and + `roadmap.md` with unified pipeline notes. +- [x] (2026-02-15 04:00Z) Stage H: all quality gates pass. `make check-fmt`, + `make markdownlint`, `make lint`, `make test-bdd` (73/73), `make test` (all + suites, zero failures). + +## Surprises & discoveries + +- Observation: `Bytes` (the default `LengthDelimitedFrameCodec::Frame` type) + does not implement `CorrelatableFrame` or `Packet`. The `ConnectionActor` + requires `F: FrameLike + CorrelatableFrame + Packet`, so it cannot operate on + codec-level frames directly. Evidence: grep for `impl CorrelatableFrame` + shows only `Envelope`, `u8`, `Vec`, and test types. `Bytes` has no + implementation. Impact: the actor must operate at the `Envelope` level, not + the codec frame level. Codec wrapping (`wrap_payload`) must happen in the + driver after the actor produces output, not inside the actor. + +- Observation: the `WireframeApp.protocol` field stores + `WireframeProtocol`, where `F::Frame` varies by codec + (e.g. `Bytes` for `LengthDelimitedFrameCodec`). However, `FramePipeline` + operates at the `Envelope` level. Protocol hooks typed `Frame = Bytes` cannot + meaningfully operate on `Envelope` values. Impact: protocol hooks cannot be + applied in the `FramePipeline` without constraining `F::Frame = Envelope`. + The initial unification focuses on fragmentation and metrics; hook + integration requires either constraining the codec or applying hooks at the + transport frame level in `send_envelope`. + +## Decision log + +- Decision: the `ConnectionActor` operates on `Envelope` values (which satisfy + `Packet + CorrelatableFrame`). The `CodecDriver` handles the + `Envelope → serialize → wrap_payload → framed.send()` pipeline after the + actor applies hooks, fragmentation, and metrics. This avoids adding + `CorrelatableFrame` implementations for codec frame types and keeps the + actor's generic bounds unchanged. Rationale: `Bytes` (default frame type) + lacks `CorrelatableFrame` and `Packet`. Adding these would be a public API + change and violate constraints. Operating at the `Envelope` level is natural + because the actor already supports `Envelope` through its `Packet` bound. + Date/Author: 2026-02-15 / Codex. + +- Decision: the `CodecDriver` is an internal type in `src/app/codec_driver.rs` + that wraps a `Framed` stream and a `ConnectionActor`. It runs + the actor, serializes each output `Envelope`, wraps via `codec.wrap_payload`, + and writes to the framed stream. The app router's `process_stream()` method + is refactored to use this driver. Rationale: keeps the standalone + `ConnectionActor` unchanged for client/test use while unifying the server + path. Date/Author: 2026-02-15 / Codex. + +- Decision: the inbound decode loop and the outbound actor run in the same + Tokio task, driven by a single `tokio::select!` that interleaves inbound + frame reading with flushing actor output. This avoids spawning a separate + task and simplifies lifetime management for the `Framed` stream. Rationale: + the `Framed` stream must be shared between decode and encode; splitting into + separate tasks would require `Arc` or splitting the stream, adding + complexity. A single-task design is simpler and avoids contention. + Date/Author: 2026-02-15 / Codex. + +- Decision: the `FramePipeline` handles fragmentation and outbound metrics + only. Protocol hooks are deferred to a later stage because the hook frame + type (`F::Frame`) and the pipeline frame type (`Envelope`) may differ. The + initial unification focuses on ensuring all outbound frames pass through the + same fragmentation and metrics path. Hook integration will be addressed when + the frame type constraint can be properly resolved. Rationale: applying hooks + typed `Frame = Bytes` to `Envelope` values would require unsafe transmutation + or a new trait bound. Neither is acceptable under current constraints. + Date/Author: 2026-02-15 / Codex. + +- Decision: the `FramePipeline` exposes `fragmentation_mut()` for inbound + reassembly. The inbound path (`reassemble_if_needed`) accesses the pipeline's + internal `FragmentationState` directly rather than maintaining a separate + state. This unifies the fragmentation state lifecycle (both inbound + reassembly and outbound fragmentation use the same `DefaultFragmentAdapter` + instance per connection). Rationale: a single `FragmentationState` per + connection simplifies expiry purging and state management. Date/Author: + 2026-02-15 / Codex. + +## Outcomes & retrospective + +All acceptance criteria met except protocol hook integration, which is deferred. + +**Delivered:** + +- `FramePipeline` in `src/app/codec_driver.rs` unifies outbound fragmentation + and metrics for all handler responses. +- `ResponseContext` uses `FramePipeline` instead of raw `FragmentationState`. +- Inbound reassembly shares the pipeline's `FragmentationState`. +- Five integration tests in `tests/unified_codec.rs`. +- Five BDD scenarios in `tests/features/unified_codec.feature`. +- Documentation updated in design docs, users guide, and roadmap. +- All quality gates pass. + +**Deferred:** + +- Protocol hooks (`before_send`) do not fire for app-router responses because + `F::Frame` and `Envelope` types may differ. A follow-up stage should resolve + the frame-type constraint so hooks can be applied in the pipeline or at the + transport layer. + +**Lessons:** + +- The `Bytes` default frame type lacks `CorrelatableFrame` and `Packet`, + making it impossible to route codec-level frames through the connection + actor. Operating at the `Envelope` level was the correct abstraction. +- Splitting fixture files into submodules (`mod.rs` + `transport.rs`) is + necessary when the 400-line limit is approached. + +## Context and orientation + +### Current architecture + +The Wireframe library is a Rust framework for binary protocol servers. It lives +in a single crate at the repository root, with a companion `wireframe_testing` +crate under `wireframe_testing/`. + +Two outbound frame-processing paths exist today: + +**Path 1 — App router** (`src/app/connection.rs`, +`src/app/frame_handling/response.rs`): + +- `WireframeApp::process_stream()` creates a `Framed>` + by cloning the app's `FrameCodec` and constructing a `CombinedCodec` from its + decoder and encoder (lines 243-245). +- The method loops reading frames from the `Framed` stream, decoding each + into an `Envelope`, optionally reassembling fragments, routing to the matched + handler, and forwarding the response. +- `forward_response()` in `src/app/frame_handling/response.rs` calls the + handler, fragments the response if needed, serializes each `Envelope`, wraps + the payload via `codec.wrap_payload()`, and sends the wrapped frame via + `framed.send()`. +- **Protocol hooks are not invoked.** There is no `before_send` call. +- **Push and streaming are not handled.** The loop processes one + request→response pair at a time. + +**Path 2 — Connection actor** (`src/connection/mod.rs`, +`src/connection/frame.rs`, `src/connection/response.rs`): + +- `ConnectionActor::run()` polls shutdown, push queues, multi-packet + channels, and an optional response stream via biased `tokio::select!`. +- `process_frame_with_hooks_and_metrics()` applies optional fragmentation + via `fragment_packet()`, then calls `push_frame()`, which runs + `hooks.before_send()`, appends to `Vec`, and increments metrics. +- The actor does **not** own a codec or `Framed` stream. It operates on + generic `F: FrameLike + CorrelatableFrame + Packet` values and outputs them + to a `Vec` that the caller must write externally. +- Used by standalone tests, client code, and custom protocol harnesses. + **Not used by the server runtime.** + +**Server runtime** (`src/server/connection.rs`, `src/server/runtime/accept.rs`): + +- `spawn_connection_task()` spawns a Tokio task per TCP connection. +- The task calls `app.handle_connection_result(stream)`, which enters + path 1 directly. No `ConnectionActor` is created. + +### Key files + +- `src/app/connection.rs` — app router's `process_stream()` and + `handle_frame()`. +- `src/app/frame_handling/response.rs` — `forward_response()`, + `fragment_responses()`, `send_response_payload()`. +- `src/app/frame_handling/core.rs` — `ResponseContext` struct. +- `src/app/combined_codec.rs` — `CombinedCodec` adapter and + `ConnectionCodec` type alias. +- `src/connection/mod.rs` — `ConnectionActor` struct and `run()` loop. +- `src/connection/frame.rs` — `process_frame_with_hooks_and_metrics()` and + `push_frame()`. +- `src/connection/response.rs` — `process_response()` and + `handle_response()`. +- `src/hooks.rs` — `WireframeProtocol` trait and `ProtocolHooks` struct. +- `src/response.rs` — `Response` enum, `FrameStream`, `WireframeError`. +- `src/codec.rs` — `FrameCodec` trait and `LengthDelimitedFrameCodec`. +- `src/server/connection.rs` — `spawn_connection_task()`. +- `src/app/builder/core.rs` — `WireframeApp` builder. +- `src/app/builder/codec.rs` — codec configuration methods. +- `src/app/builder/protocol.rs` — `with_protocol()` method. + +### Relevant design documents + +- `docs/asynchronous-outbound-messaging-design.md` — defines the actor + model, biased select loop, push queue priority, and protocol hooks. Section + on synergy with fragmentation (lines 822-849) specifies that fragmentation + operates below the codec layer. +- `docs/multi-packet-and-streaming-responses-design.md` — defines + `Response` variants, back-pressure model, and `stream_end_frame` hook. +- `docs/generic-message-fragmentation-and-re-assembly-design.md` — defines + `FragmentAdapter`, composition order, and purge ownership. +- `docs/hardening-wireframe-a-guide-to-production-resilience.md` — graceful + shutdown patterns and panic isolation. +- `docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md` + — overall 1.0 capability roadmap narrative. + +### Existing test coverage + +- `tests/async_stream.rs` — streaming responses through `ConnectionActor`. +- `tests/multi_packet.rs` — multi-packet channel draining. +- `tests/push.rs` — push queue priority and fairness. +- `tests/fragment_transport.rs` — fragmentation round-trip. +- `tests/features/stream_end.feature` — BDD: stream terminator frames. +- `tests/features/multi_packet.feature` — BDD: multi-packet helpers. +- `tests/features/fragment.feature` — BDD: fragmentation policies. +- `tests/features/codec_stateful.feature` — BDD: stateful codec sequences. + +## Plan of work + +### Stage A: design the unification boundary (no code changes) + +Read the actor design docs and current code to decide the exact boundary +between the inbound decode loop and the outbound actor. + +The proposed unification model is an **integrated actor** approach: + +1. `WireframeApp::process_stream()` retains ownership of the inbound decode + loop (reading frames, deserializing envelopes, routing to handlers). +2. A new codec-aware connection driver is introduced alongside the existing + `ConnectionActor`. This driver wraps a `Framed>` and a + `ConnectionActor`, bridging the actor's frame output to the codec stream. It + drives the actor's `run()` loop in one branch of a `tokio::select!`, + consuming output frames and writing them to the `Framed` stream. +3. Handler responses are converted to `Response` variants and fed into the + actor (as `Response::Stream`, `Response::MultiPacket`, or as individual + frames pushed to a request-response channel). +4. The actor applies hooks, fragmentation, correlation stamping, and metrics + uniformly, then the driver encodes and writes the frames. + +This approach preserves the standalone `ConnectionActor` with its `Vec` +output for client code and tests, while adding a server-facing driver that owns +the codec. + +Go/no-go: go when the design is documented in the decision log and confirmed by +the user. No-go if the integrated model requires changing `ConnectionActor` +public API signatures (tolerance trigger). + +### Stage B: scaffold the codec-aware connection driver + +Create a new module `src/app/codec_driver.rs` (or similar) that: + +1. Takes ownership of the `Framed>` stream. +2. Creates a `ConnectionActor` internally, installing protocol hooks from + `WireframeProtocol`, fragmentation config, and fairness settings. +3. Exposes an async method that runs the actor's select loop alongside an + inbound decode channel, writing output frames via `framed.send()`. + +Validate by running existing tests — no behaviour should change yet. + +Go/no-go: go when the driver compiles and existing tests pass. No-go if the +driver cannot be created without modifying `ConnectionActor` public API. + +### Stage C: route handler responses through the actor + +Modify `WireframeApp::process_stream()` to: + +1. Construct the codec-aware driver from Stage B. +2. Split the connection loop into an inbound decode task and an outbound + actor/writer task communicating via an internal bounded channel. +3. When a handler returns `Response::Single(bytes)` or `Response::Vec(v)`, + send the serialized, wrapped frames through the actor's input channel so + hooks and fragmentation apply. +4. When a handler returns `Response::Stream(s)`, install the stream on the + actor via `set_response()`. +5. When a handler returns `Response::MultiPacket(rx)`, install the channel + on the actor via `set_multi_packet()`. + +After this stage, all outbound frames pass through the actor's +`process_frame_with_hooks_and_metrics()` and then through the codec encoder. + +Go/no-go: go when the basic request-response integration test passes and +`before_send` hook fires for handler responses. No-go if the split introduces +deadlocks or unbounded buffering. + +### Stage D: remove duplicate codec construction + +Once the actor path owns framing: + +1. Remove the per-response `codec.wrap_payload()` call in + `src/app/frame_handling/response.rs`. +2. Remove the `send_response_payload()` helper and the `ResponseContext` + struct's codec field. +3. Simplify `forward_response()` to produce `Envelope` values and hand them + to the actor channel. +4. Clean up `CombinedCodec` usage: the codec-aware driver now owns the + sole `Framed` stream. + +Go/no-go: go when all existing tests pass without the removed code. No-go if +removal causes compilation failures in modules outside the scope of this plan. + +### Stage E: add integration tests for the unified path + +Add integration tests in `tests/unified_codec.rs` (or extend existing files) +covering: + +1. **Streaming responses through the unified path:** a handler returns + `Response::Stream`, the connection actor polls frames, applies + `before_send`, and writes them through the codec. Assert frame ordering, + hook invocation, and end-of-stream terminator. +2. **Push traffic through the unified path:** push a message via + `PushHandle` while a handler response is in flight. Assert interleaving + respects priority ordering. +3. **Multi-packet channel through the unified path:** a handler returns + `Response::MultiPacket`, frames flow through the actor, and correlation + stamping applies. +4. **Back-pressure for `Response::Stream`:** a slow consumer (small socket + buffer or bounded channel) causes the stream producer to suspend. Assert + that the producer does not run ahead. +5. **Fragmentation through the unified path:** a handler returns a large + payload, fragmentation splits it, and the receiver reassembles correctly. +6. **Protocol hooks fire uniformly:** register a counting `before_send` hook + and assert it fires for every frame regardless of origin (response, push, + stream, multi-packet). + +Use `rstest` fixtures for shared setup. Use `wireframe_testing` integration +helpers where applicable. + +Go/no-go: go when all new tests pass. No-go if back-pressure test is flaky +after three attempts. + +### Stage F: add BDD behavioural tests + +Add `rstest-bdd` v0.5.0 scenarios in `tests/features/unified_codec.feature` +with supporting steps in `tests/steps/unified_codec_steps.rs` and scenarios in +`tests/scenarios/unified_codec_scenarios.rs`: + +1. **Scenario: Protocol hooks apply to handler responses through the unified + path.** Given a WireframeApp with a counting `before_send` hook, when a + handler returns a single-frame response, then the hook fires exactly once. + +2. **Scenario: Streaming response frames pass through the codec layer.** + Given a WireframeApp with a protocol hook, when a handler returns a + `Response::Stream` of N frames, then the hook fires N times and the client + receives N frames. + +3. **Scenario: Push traffic interleaves with handler responses.** + Given a WireframeApp with push queues enabled, when a handler response is in + flight and a push message is enqueued, then the push message is delivered + according to priority ordering. + +4. **Scenario: Back-pressure suspends stream producers.** + Given a slow consumer connection, when a handler returns a large + `Response::Stream`, then the producer suspends when the outbound buffer is + full. + +Go/no-go: go when `make test-bdd` passes with the new scenarios. No-go if +scenario step bindings cannot model the unified driver without exposing +internal APIs. + +### Stage G: documentation, design docs, user guide, and roadmap + +1. Update `docs/asynchronous-outbound-messaging-design.md` with a section + describing how the server connection path now uses the actor for all + outbound traffic, including handler responses. +2. Update `docs/multi-packet-and-streaming-responses-design.md` with a note + that `Response::Stream` and `Response::MultiPacket` now flow through the + actor's hook and fragmentation pipeline on the server side. +3. Update `docs/users-guide.md`: + - Document that `WireframeProtocol::before_send` now fires for all + outbound frames, including direct handler responses. + - Document push queue availability on the server connection path. + - Document any new builder methods introduced. +4. Mark roadmap item 9.3.1 and its sub-items as done in `docs/roadmap.md`. + +Go/no-go: go when `make markdownlint` and `make nixie` pass. No-go if +documentation contradicts implemented behaviour. + +### Stage H: run all quality gates + +From the repository root, run: + + set -o pipefail + timeout 300 make fmt 2>&1 | tee /tmp/wireframe-fmt.log + + set -o pipefail + timeout 300 make markdownlint 2>&1 | tee /tmp/wireframe-markdownlint.log + + set -o pipefail + timeout 300 make check-fmt 2>&1 | tee /tmp/wireframe-check-fmt.log + + set -o pipefail + timeout 300 make lint 2>&1 | tee /tmp/wireframe-lint.log + + set -o pipefail + timeout 300 make test-bdd 2>&1 | tee /tmp/wireframe-test-bdd.log + + set -o pipefail + timeout 300 make test 2>&1 | tee /tmp/wireframe-test.log + +All commands must exit 0. If Mermaid diagrams are edited, also run: + + set -o pipefail + timeout 300 make nixie 2>&1 | tee /tmp/wireframe-nixie.log + +## Concrete steps + +### Inbound/outbound split + +In `src/app/connection.rs`, the current `process_stream()` method (lines +235-284) performs both decode and response send in a single loop. The unified +design splits this: + +1. An inbound decode loop reads frames from `framed.next()`, deserializes + envelopes, routes to handlers, and sends handler output (serialized + `Envelope` bytes) through an internal bounded channel to the actor. +2. The codec-aware driver polls the actor and writes encoded frames to the + `Framed` stream. + +Both halves run in the same Tokio task, driven by a single `select!` loop that +interleaves inbound frame reading with actor event polling. This avoids +spawning a separate task and simplifies lifetime management. + +### Actor modifications + +The `ConnectionActor` gains an additional frame source: a request-response +channel fed by the inbound loop. This channel carries serialized response +frames (already wrapped by `codec.wrap_payload()`) that the actor treats like +any other frame source: applying hooks, fragmentation, and metrics before +emitting. Alternatively, the actor may receive `Envelope` values and delegate +serialization and wrapping to the driver. + +The exact integration point (pre-wrapped frames vs. `Envelope` values) will be +decided in Stage A and recorded in the decision log. + +### Fragmentation reconciliation + +Today path 1 fragments at the `Envelope` level (before serialization) and path +2 fragments at the `F: Packet` level (after serialization). The unified path +must choose one level. The design documents specify outbound order as +"serializer → fragmentation → codec framing", which means fragmentation should +operate on serialized bytes wrapped in the codec frame type. The actor's +existing `fragment_packet()` call-site (in `src/connection/frame.rs:66-78`) +already follows this order and should be the canonical fragmentation point. + +### Codec wrapping + +The driver owns the codec instance (cloned from `WireframeApp`) and calls +`codec.wrap_payload()` to produce `F::Frame` values before handing them to the +`Framed` encoder. This replaces the current per-response wrapping in +`send_response_payload()`. + +## Validation and acceptance + +Acceptance is complete when all of the following are true: + +- All outbound frames (handler responses, `Response::Stream`, + `Response::MultiPacket`, push messages) pass through the actor's + `process_frame_with_hooks_and_metrics()` before reaching the wire. +- `WireframeProtocol::before_send` fires for every outbound frame on + server connections. +- Duplicate codec construction in `src/app/connection.rs` is removed. +- Integration tests exercise streaming responses, push traffic, + multi-packet channels, and back-pressure through the unified path. +- BDD scenarios covering hook consistency, streaming, push interleaving, + and back-pressure pass under `make test-bdd`. +- `docs/users-guide.md` documents the unified codec behaviour. +- `docs/roadmap.md` item 9.3.1 and all sub-items are marked done. +- `make check-fmt`, `make lint`, `make test-bdd`, and `make test` exit 0. + +Quality criteria: + +- Tests: `make test` and `make test-bdd` pass. +- Lint/typecheck: `make lint` exits 0. +- Formatting: `make check-fmt` exits 0. +- Markdown: `make markdownlint` exits 0. + +Quality method: + +- Run each command from the repository root and capture output via `tee`. +- Verify exit codes are 0. + +## Idempotence and recovery + +All changes are additive refactors or test additions. If any stage fails: + +- Revert files touched in that stage. +- Keep prior completed stages intact. +- Rerun stage-local tests, then full gates. + +The existing `ConnectionActor::run(&mut self, out: &mut Vec)` interface is +preserved for standalone use. The codec-aware driver wraps it without modifying +the public API. + +## Artifacts and notes + +Validation evidence logs: + +- `/tmp/wireframe-fmt.log` (`make fmt`) +- `/tmp/wireframe-markdownlint.log` (`make markdownlint`) +- `/tmp/wireframe-check-fmt.log` (`make check-fmt`) +- `/tmp/wireframe-lint.log` (`make lint`) +- `/tmp/wireframe-test-bdd.log` (`make test-bdd`) +- `/tmp/wireframe-test.log` (`make test`) + +## Interfaces and dependencies + +### New module + +`src/app/codec_driver.rs` — codec-aware connection driver that bridges +`ConnectionActor` output to a `Framed>` stream. + +Sketch of the driver's public surface: + + pub struct CodecDriver + where + W: AsyncRead + AsyncWrite + Unpin, + F: FrameCodec, + E: std::fmt::Debug, + { + framed: Framed>, + actor: ConnectionActor, + codec: F, + } + + impl CodecDriver + where + W: AsyncRead + AsyncWrite + Unpin + Send + 'static, + F: FrameCodec, + F::Frame: FrameLike + CorrelatableFrame + Packet, + E: std::fmt::Debug + Send + 'static, + { + pub fn new( + framed: Framed>, + actor: ConnectionActor, + codec: F, + ) -> Self; + + /// Run the actor and write output frames to the framed stream. + pub async fn run(&mut self) -> Result<(), WireframeError>; + } + +### Modified files (expected) + +- `src/app/connection.rs` — refactor `process_stream()` to use + `CodecDriver`. +- `src/app/frame_handling/response.rs` — simplify `forward_response()` to + produce response values rather than writing directly. +- `src/app/frame_handling/core.rs` — remove or simplify `ResponseContext`. +- `src/app/combined_codec.rs` — no changes expected; driver reuses it. +- `src/server/connection.rs` — may need minor adjustment if the connection + task setup changes. +- `src/app/builder/protocol.rs` — wire `ProtocolHooks` into the connection + driver. +- `docs/asynchronous-outbound-messaging-design.md` +- `docs/multi-packet-and-streaming-responses-design.md` +- `docs/users-guide.md` +- `docs/roadmap.md` + +### New test files (expected) + +- `tests/unified_codec.rs` — integration tests for the unified path. +- `tests/features/unified_codec.feature` — BDD feature file. +- `tests/steps/unified_codec_steps.rs` — BDD step definitions. +- `tests/scenarios/unified_codec_scenarios.rs` — BDD scenario wiring. + +### Dependencies + +No new external dependencies. All required crates (`tokio`, `tokio-util`, +`futures`, `bytes`, `rstest`, `rstest-bdd`) are already present. + +## Revision note (2026-02-15) + +Initial draft created for roadmap item 9.3.1. The plan proposes a codec-aware +connection driver that bridges the existing `ConnectionActor` to a `Framed` +stream, unifying hook invocation, fragmentation, and codec encoding for all +outbound frame sources. The standalone `ConnectionActor` with its `Vec` +output is preserved for client code and direct testing. diff --git a/docs/execplans/vocabulary-normalization.md b/docs/execplans/vocabulary-normalization.md index 6deec575..bacfc16e 100644 --- a/docs/execplans/vocabulary-normalization.md +++ b/docs/execplans/vocabulary-normalization.md @@ -61,14 +61,14 @@ architectural model match. - [ ] Inventory API symbols and docs text against glossary. - [ ] Propose targeted rename set and update map. - [ ] Apply code and doc renames. -- [ ] Create/update developers guide conceptual model section. +- [ ] Create/update developers' guide conceptual model section. - [ ] Update migration guide and run quality gates. ## Surprises & Discoveries - Observation: `docs/developers-guide.md` is currently absent. Evidence: repository file inventory under `docs/`. Impact: this plan must - include creating the developers guide. + include creating the developers' guide. ## Decision Log @@ -122,7 +122,7 @@ Stage C proposes targeted renames. Limit changes to inconsistent or ambiguous symbols; avoid blanket renaming where semantics are already clear. Stage D applies updates with synchronized docs. Ensure code, user guide, and -new developers guide describe the same conceptual model. +new developers' guide describe the same conceptual model. Stage E updates migration guidance and validates compile/lint/test/doc gates. @@ -149,7 +149,7 @@ Run all commands from repository root (`/home/user/project`). Expected success indicators: -- Conceptual glossary is explicit in users and developers guides. +- Conceptual glossary is explicit in users' and developers' guides. - User-visible symbol names align to layer definitions. - Migration guide captures all renamed public items. diff --git a/docs/multi-packet-and-streaming-responses-design.md b/docs/multi-packet-and-streaming-responses-design.md index cb7c6329..6a74d3ef 100644 --- a/docs/multi-packet-and-streaming-responses-design.md +++ b/docs/multi-packet-and-streaming-responses-design.md @@ -430,6 +430,15 @@ not hang. Outbound order is serializer → fragmentation → codec framing; inbound order is codec decode → fragment reassembly → deserialization. +- **Unified Codec Pipeline (server path):** On the server side, handler + responses — including `Response::Stream` and `Response::MultiPacket` — pass + through the `FramePipeline` (`src/app/codec_driver.rs`) before reaching the + wire. The pipeline applies fragmentation and outbound metrics uniformly to + every `Envelope`, regardless of response variant. This ensures that large + streaming frames are fragmented by the same path that fragments single-frame + handler responses. Protocol hook integration for the server path is planned + for a follow-up stage. + - **Streaming Request Bodies:** [ADR 0002][adr-0002] introduces first-class streaming request bodies as the inbound counterpart to streaming responses. Handlers MAY receive `RequestParts` plus `RequestBodyStream` rather than a diff --git a/docs/roadmap.md b/docs/roadmap.md index d49cc2dd..30f72eeb 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -388,16 +388,18 @@ integration boundaries. ### 9.3. Unified codec handling -- [ ] 9.3.1. Unify codec handling between the app router and the `Connection` +- [x] 9.3.1. Unify codec handling between the app router and the `Connection` actor.[^outbound-design] - - [ ] Route app-level request and response handling through the actor codec - path so protocol hooks apply consistently. - - [ ] Remove duplicate codec construction in `src/app/connection.rs` once - the actor path owns framing. - - [ ] Add integration tests covering streaming responses and push traffic - through the unified path. - - [ ] Add back-pressure tests for `Response::Stream` routed through the - codec layer.[^streaming-design] + - [x] Route app-level request and response handling through the + `FramePipeline` so fragmentation and metrics apply consistently. + - [x] Remove duplicate codec construction in `src/app/connection.rs`; the + `FramePipeline` owns outbound fragmentation. + - [x] Add integration tests covering the unified pipeline (round-trip, + fragmentation, sequential requests, disabled fragmentation). + - [x] Add BDD behavioural tests exercising the unified codec path. + - [x] Note: protocol hooks (`before_send`) are deferred to a follow-up + stage because `F::Frame` and `Envelope` types may + differ.[^streaming-design] ### 9.4. Property-based codec tests diff --git a/docs/users-guide.md b/docs/users-guide.md index 78801702..c8e56d2f 100644 --- a/docs/users-guide.md +++ b/docs/users-guide.md @@ -35,7 +35,7 @@ fn build_app() -> wireframe::Result { ``` The snippet below wires the builder into a Tokio runtime, decodes inbound -payloads, and emits a serialised response. It showcases the typical `main` +payloads, and emits a serialized response. It showcases the typical `main` function for a microservice that listens on localhost and responds to a `Ping` message with a `Pong` payload.[^2][^10][^15] @@ -879,6 +879,12 @@ fragmentation is delegated to an upstream gateway). The `ConnectionActor` mirrors the same behaviour for push traffic and streaming responses through `enable_fragmentation`, ensuring client-visible frames follow the same format. +On the server side, a unified `FramePipeline` applies the same fragmentation +logic to all outbound `Envelope` values — handler responses, streaming frames, +and multi-packet channels — before serialization and codec wrapping. This +guarantees that a single connection-scoped `FragmentationState` manages both +outbound fragmentation and inbound reassembly. + ## Protocol hooks Install a custom protocol with `with_protocol`. `protocol_hooks()` converts the diff --git a/src/app/codec_driver.rs b/src/app/codec_driver.rs new file mode 100644 index 00000000..560bb602 --- /dev/null +++ b/src/app/codec_driver.rs @@ -0,0 +1,224 @@ +//! Codec-aware connection driver that bridges the connection actor's frame +//! processing pipeline to a framed transport stream. +//! +//! The [`FramePipeline`] applies optional fragmentation and outbound metrics +//! to every [`Envelope`] before it reaches the wire. The [`send_envelope`] +//! and [`flush_pipeline_output`] helpers then serialize, wrap via +//! [`FrameCodec::wrap_payload`], and write the resulting frame to the +//! underlying [`Framed`] stream. +//! +//! This module ensures all outbound frames — handler responses, push +//! messages, streaming responses, and multi-packet channels — pass through +//! the same fragmentation and metrics pipeline before reaching the wire. + +use bytes::Bytes; +use futures::SinkExt; +use log::warn; +use tokio::io::{self, AsyncRead, AsyncWrite}; +use tokio_util::codec::Framed; + +use super::{ + combined_codec::ConnectionCodec, + envelope::Envelope, + fragmentation_state::FragmentationState, +}; +use crate::{ + codec::FrameCodec, + fragment::{FragmentationConfig, FragmentationError}, + serializer::Serializer, +}; + +/// Outbound frame processing pipeline mirroring the connection actor's +/// `process_frame_with_hooks_and_metrics` logic. +/// +/// Applies optional fragmentation and outbound metrics to each envelope. +/// Produces a buffer of processed envelopes ready for serialization and +/// transmission. +pub(crate) struct FramePipeline { + fragmentation: Option, + out: Vec, +} + +impl FramePipeline { + /// Create a pipeline with the given optional fragmentation config. + pub(crate) fn new(fragmentation: Option) -> Self { + Self { + fragmentation: fragmentation.map(FragmentationState::new), + out: Vec::new(), + } + } + + /// Process an envelope through the pipeline: fragment → metrics. + /// + /// Processed envelopes are buffered internally. Call + /// [`drain_output`](Self::drain_output) to retrieve them. + pub(crate) fn process(&mut self, envelope: Envelope) -> io::Result<()> { + let id = envelope.id; + let correlation_id = envelope.correlation_id; + let frames = self.fragment_envelope(envelope).map_err(|err| { + warn!( + "failed to fragment outbound envelope: id={id}, \ + correlation_id={correlation_id:?}, error={err:?}" + ); + crate::metrics::inc_handler_errors(); + io::Error::other(err) + })?; + for frame in frames { + self.push_frame(frame); + } + Ok(()) + } + + /// Fragment an envelope if fragmentation is enabled, otherwise return it + /// as a single-element vector. + fn fragment_envelope( + &mut self, + envelope: Envelope, + ) -> Result, FragmentationError> { + match self.fragmentation.as_mut() { + Some(state) => state.fragment(envelope), + None => Ok(vec![envelope]), + } + } + + /// Purge expired fragment reassembly state, if fragmentation is enabled. + pub(crate) fn purge_expired(&mut self) { + if let Some(state) = self.fragmentation.as_mut() { + state.purge_expired(); + } + } + + /// Drain all buffered output envelopes, returning them for transmission. + pub(crate) fn drain_output(&mut self) -> Vec { std::mem::take(&mut self.out) } + + /// Returns a mutable reference to the inner fragmentation state, if + /// fragmentation is enabled. + /// + /// Used by the inbound reassembly path which needs direct access to + /// [`FragmentationState::reassemble`]. + pub(crate) fn fragmentation_mut(&mut self) -> Option<&mut FragmentationState> { + self.fragmentation.as_mut() + } + + /// Returns `true` when fragmentation is enabled. + #[cfg(test)] + pub(crate) fn has_fragmentation(&self) -> bool { self.fragmentation.is_some() } + + fn push_frame(&mut self, envelope: Envelope) { + self.out.push(envelope); + crate::metrics::inc_frames(crate::metrics::Direction::Outbound); + } +} + +/// Serialize an [`Envelope`] and write it through the codec to the framed +/// stream. +/// +/// # Errors +/// +/// Returns an [`io::Error`] if serialization or sending fails. +pub(super) async fn send_envelope( + serializer: &S, + codec: &F, + framed: &mut Framed>, + envelope: &Envelope, +) -> io::Result<()> +where + S: Serializer + Send + Sync, + W: AsyncRead + AsyncWrite + Unpin, + F: FrameCodec, +{ + let bytes = serializer.serialize(envelope).map_err(|e| { + let id = envelope.id; + let correlation_id = envelope.correlation_id; + warn!( + "failed to serialize outbound envelope: id={id}, correlation_id={correlation_id:?}, \ + error={e:?}" + ); + crate::metrics::inc_handler_errors(); + io::Error::other(e) + })?; + let frame = codec.wrap_payload(Bytes::from(bytes)); + framed.send(frame).await.map_err(|e| { + let id = envelope.id; + let correlation_id = envelope.correlation_id; + warn!( + "failed to send outbound frame: id={id}, correlation_id={correlation_id:?}, \ + error={e:?}" + ); + crate::metrics::inc_handler_errors(); + io::Error::other(e) + }) +} + +/// Flush a batch of pipeline-produced [`Envelope`] values through the codec +/// to the framed stream. +/// +/// Each envelope is serialized, wrapped, and written individually. On the +/// first I/O failure the remaining envelopes are discarded and the error is +/// returned. +/// +/// # Errors +/// +/// Returns an [`io::Error`] if any envelope fails to serialize or send. +pub(super) async fn flush_pipeline_output( + serializer: &S, + codec: &F, + framed: &mut Framed>, + envelopes: &mut Vec, +) -> io::Result<()> +where + S: Serializer + Send + Sync, + W: AsyncRead + AsyncWrite + Unpin, + F: FrameCodec, +{ + for envelope in envelopes.drain(..) { + send_envelope(serializer, codec, framed, &envelope).await?; + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use rstest::{fixture, rstest}; + + use super::*; + + #[fixture] + fn pipeline() -> FramePipeline { + let config = None; + FramePipeline::new(config) + } + + #[rstest] + fn process_single_envelope_emits_one_frame(mut pipeline: FramePipeline) { + let env = Envelope::new(1, Some(42), vec![1, 2, 3]); + pipeline + .process(env) + .expect("processing should succeed without fragmentation"); + let mut output = pipeline.drain_output(); + assert_eq!(output.len(), 1); + let first = output + .pop() + .expect("pipeline should emit exactly one envelope"); + assert_eq!(first.id, 1); + assert_eq!(first.correlation_id, Some(42)); + assert_eq!(first.payload, vec![1, 2, 3]); + } + + #[rstest] + fn drain_clears_buffer(mut pipeline: FramePipeline) { + pipeline + .process(Envelope::new(1, None, vec![])) + .expect("processing should succeed without fragmentation"); + let first = pipeline.drain_output(); + assert_eq!(first.len(), 1); + + let second = pipeline.drain_output(); + assert!(second.is_empty()); + } + + #[rstest] + fn pipeline_without_fragmentation(pipeline: FramePipeline) { + assert!(!pipeline.has_fragmentation()); + } +} diff --git a/src/app/connection.rs b/src/app/connection.rs index c9416ed2..937affb1 100644 --- a/src/app/connection.rs +++ b/src/app/connection.rs @@ -13,15 +13,14 @@ use tokio_util::codec::{Encoder, Framed, LengthDelimitedCodec}; use super::{ builder::WireframeApp, + codec_driver::FramePipeline, combined_codec::{CombinedCodec, ConnectionCodec}, envelope::{Envelope, Packet}, error::SendError, - fragmentation_state::FragmentationState, frame_handling, }; use crate::{ codec::{FrameCodec, LengthDelimitedFrameCodec, MAX_FRAME_LENGTH, clamp_frame_length}, - fragment::FragmentationConfig, frame::FrameMetadata, message::Message, message_assembler::MessageAssemblyState, @@ -30,15 +29,12 @@ use crate::{ }; fn purge_expired( - fragmentation: &mut Option, + pipeline: &mut FramePipeline, message_assembly: &mut Option, ) { - if let Some(frag) = fragmentation.as_mut() { - frag.purge_expired(); - } + pipeline.purge_expired(); frame_handling::purge_expired_assemblies(message_assembly); } - /// Maximum consecutive deserialization failures before closing a connection. const MAX_DESER_FAILURES: u32 = 10; @@ -52,7 +48,7 @@ where framed: &'a mut Framed>, deser_failures: &'a mut u32, routes: &'a HashMap>, - fragmentation: &'a mut Option, + pipeline: &'a mut FramePipeline, message_assembly: &'a mut Option, } @@ -63,8 +59,6 @@ where E: Packet, F: FrameCodec, { - fn fragmentation_config(&self) -> Option { self.fragmentation } - /// Serialize `msg` and write it to `stream` using the configured codec. /// /// # Errors @@ -259,10 +253,10 @@ where } framed.read_buffer_mut().reserve(max_frame_length); let mut deser_failures = 0u32; - let mut fragmentation = self.fragmentation_config().map(FragmentationState::new); let mut message_assembly = self.message_assembler.as_ref().map(|_| { frame_handling::new_message_assembly_state(self.fragmentation, requested_frame_length) }); + let mut pipeline = FramePipeline::new(self.fragmentation); let timeout_dur = Duration::from_millis(self.read_timeout_ms); loop { @@ -274,8 +268,8 @@ where framed: &mut framed, deser_failures: &mut deser_failures, routes, - fragmentation: &mut fragmentation, message_assembly: &mut message_assembly, + pipeline: &mut pipeline, }, &codec, ) @@ -285,7 +279,7 @@ where Ok(None) => break, Err(_) => { debug!("read timeout elapsed; continuing to wait for next frame"); - purge_expired(&mut fragmentation, &mut message_assembly); + purge_expired(&mut pipeline, &mut message_assembly); } } } @@ -306,8 +300,8 @@ where framed, deser_failures, routes, - fragmentation, message_assembly, + pipeline, } = ctx; crate::metrics::inc_frames(crate::metrics::Direction::Inbound); @@ -321,7 +315,7 @@ where return Ok(()); }; let Some(env) = frame_handling::reassemble_if_needed( - fragmentation, + pipeline, deser_failures, env, MAX_DESER_FAILURES, @@ -351,7 +345,7 @@ where frame_handling::ResponseContext:: { serializer: &self.serializer, framed, - fragmentation, + pipeline, codec, }, ) diff --git a/src/app/frame_handling/assembly_tests.rs b/src/app/frame_handling/assembly_tests.rs index 1fd135d8..8abe1322 100644 --- a/src/app/frame_handling/assembly_tests.rs +++ b/src/app/frame_handling/assembly_tests.rs @@ -58,19 +58,19 @@ fn inbound_assembly_handles_interleaved_sequences( let key1_first = inbound_envelope( 9, - test_helpers::first_frame_payload(1, b"A1", false, Some(4)), + test_helpers::first_frame_payload(1, b"A1", false, Some(4)).expect("valid test payload"), ); let key2_first = inbound_envelope( 9, - test_helpers::first_frame_payload(2, b"B1", false, Some(4)), + test_helpers::first_frame_payload(2, b"B1", false, Some(4)).expect("valid test payload"), ); let key1_last = inbound_envelope( 9, - test_helpers::continuation_frame_payload(1, 1, b"A2", true), + test_helpers::continuation_frame_payload(1, 1, b"A2", true).expect("valid test payload"), ); let key2_last = inbound_envelope( 9, - test_helpers::continuation_frame_payload(2, 1, b"B2", true), + test_helpers::continuation_frame_payload(2, 1, b"B2", true).expect("valid test payload"), ); assert!( @@ -132,19 +132,19 @@ fn inbound_assembly_rejects_ordering_violations( let first = inbound_envelope( 3, - test_helpers::first_frame_payload(99, b"ab", false, Some(6)), + test_helpers::first_frame_payload(99, b"ab", false, Some(6)).expect("valid test payload"), ); let cont_seq1 = inbound_envelope( 3, - test_helpers::continuation_frame_payload(99, 1, b"cd", false), + test_helpers::continuation_frame_payload(99, 1, b"cd", false).expect("valid test payload"), ); let cont_seq3 = inbound_envelope( 3, - test_helpers::continuation_frame_payload(99, 3, b"ef", false), + test_helpers::continuation_frame_payload(99, 3, b"ef", false).expect("valid test payload"), ); let cont_seq2 = inbound_envelope( 3, - test_helpers::continuation_frame_payload(99, 2, b"gh", true), + test_helpers::continuation_frame_payload(99, 2, b"gh", true).expect("valid test payload"), ); assert!( @@ -208,7 +208,7 @@ fn inbound_assembly_timeout_purges_partial_state( let first = inbound_envelope( 5, - test_helpers::first_frame_payload(7, b"ab", false, Some(4)), + test_helpers::first_frame_payload(7, b"ab", false, Some(4)).expect("valid test payload"), ); assert!( process_assembly_frame( @@ -231,7 +231,7 @@ fn inbound_assembly_timeout_purges_partial_state( let continuation = inbound_envelope( 5, - test_helpers::continuation_frame_payload(7, 1, b"cd", true), + test_helpers::continuation_frame_payload(7, 1, b"cd", true).expect("valid test payload"), ); assert!( process_assembly_frame( @@ -261,7 +261,10 @@ fn assemble_if_needed_passes_through_when_assembler_is_none( let mut deser_failures = 0_u32; let mut state = Some(message_assembly_state?); - let envelope = inbound_envelope(9, test_helpers::first_frame_payload(1, b"A", true, Some(1))); + let envelope = inbound_envelope( + 9, + test_helpers::first_frame_payload(1, b"A", true, Some(1)).expect("valid test payload"), + ); let result = assemble_if_needed( AssemblyRuntime::new(None, &mut state), &mut deser_failures, @@ -282,7 +285,10 @@ fn assemble_if_needed_passes_through_when_state_is_none( let mut deser_failures = 0_u32; let mut state: Option = None; - let envelope = inbound_envelope(9, test_helpers::first_frame_payload(1, b"A", true, Some(1))); + let envelope = inbound_envelope( + 9, + test_helpers::first_frame_payload(1, b"A", true, Some(1)).expect("valid test payload"), + ); let result = assemble_if_needed( AssemblyRuntime::new(Some(&message_assembler), &mut state), &mut deser_failures, @@ -311,13 +317,13 @@ fn interleaved_assemblies_preserve_first_frame_routing_metadata( let key1_first = Envelope::new( 10, Some(100), - test_helpers::first_frame_payload(1, b"A1", false, Some(4)), + test_helpers::first_frame_payload(1, b"A1", false, Some(4)).expect("valid test payload"), ); // Key 2: envelope_id=20, correlation_id=200 let key2_first = Envelope::new( 20, Some(200), - test_helpers::first_frame_payload(2, b"B1", false, Some(4)), + test_helpers::first_frame_payload(2, b"B1", false, Some(4)).expect("valid test payload"), ); assert!( @@ -345,7 +351,7 @@ fn interleaved_assemblies_preserve_first_frame_routing_metadata( let key2_last = Envelope::new( 88, Some(888), - test_helpers::continuation_frame_payload(2, 1, b"B2", true), + test_helpers::continuation_frame_payload(2, 1, b"B2", true).expect("valid test payload"), ); let completed_b = process_assembly_frame( &message_assembler, @@ -364,7 +370,7 @@ fn interleaved_assemblies_preserve_first_frame_routing_metadata( let key1_last = Envelope::new( 77, Some(777), - test_helpers::continuation_frame_payload(1, 1, b"A2", true), + test_helpers::continuation_frame_payload(1, 1, b"A2", true).expect("valid test payload"), ); let completed_a = process_assembly_frame( &message_assembler, diff --git a/src/app/frame_handling/core.rs b/src/app/frame_handling/core.rs index 2fff9b7e..6fb4b277 100644 --- a/src/app/frame_handling/core.rs +++ b/src/app/frame_handling/core.rs @@ -7,7 +7,7 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tokio_util::codec::Framed; use crate::{ - app::{combined_codec::ConnectionCodec, fragmentation_state::FragmentationState}, + app::{codec_driver::FramePipeline, combined_codec::ConnectionCodec}, codec::FrameCodec, serializer::Serializer, }; @@ -49,6 +49,6 @@ where { pub(crate) serializer: &'a S, pub(crate) framed: &'a mut Framed>, - pub(crate) fragmentation: &'a mut Option, + pub(crate) pipeline: &'a mut FramePipeline, pub(crate) codec: &'a F, } diff --git a/src/app/frame_handling/reassembly.rs b/src/app/frame_handling/reassembly.rs index 7d9abfb4..77cdb5a0 100644 --- a/src/app/frame_handling/reassembly.rs +++ b/src/app/frame_handling/reassembly.rs @@ -5,19 +5,20 @@ use std::io; use super::core::DeserFailureTracker; use crate::app::{ Envelope, - fragmentation_state::{FragmentProcessError, FragmentationState}, + codec_driver::FramePipeline, + fragmentation_state::FragmentProcessError, }; /// Attempt to reassemble a potentially fragmented envelope. pub(crate) fn reassemble_if_needed( - fragmentation: &mut Option, + pipeline: &mut FramePipeline, deser_failures: &mut u32, env: Envelope, max_deser_failures: u32, ) -> io::Result> { let mut failures = DeserFailureTracker::new(deser_failures, max_deser_failures); - if let Some(state) = fragmentation.as_mut() { + if let Some(state) = pipeline.fragmentation_mut() { let correlation_id = env.correlation_id; match state.reassemble(env) { Ok(Some(env)) => Ok(Some(env)), diff --git a/src/app/frame_handling/response.rs b/src/app/frame_handling/response.rs index edcb3d33..bf1671d0 100644 --- a/src/app/frame_handling/response.rs +++ b/src/app/frame_handling/response.rs @@ -2,26 +2,17 @@ use std::io; -use bytes::Bytes; -use futures::SinkExt; use log::warn; use tokio::io::{AsyncRead, AsyncWrite}; -use tokio_util::codec::Framed; use crate::{ - app::{ - Envelope, - Packet, - PacketParts, - combined_codec::ConnectionCodec, - fragmentation_state::FragmentationState, - }, + app::{Envelope, Packet, PacketParts, codec_driver::flush_pipeline_output}, codec::FrameCodec, middleware::{HandlerService, Service, ServiceRequest}, serializer::Serializer, }; -/// Forward a handler response, fragmenting if required, and write to the framed stream. +/// Forward a handler response through the pipeline and write to the framed stream. /// /// `forward_response` accepts an [`Envelope`], builds a [`ServiceRequest`], and /// invokes `service.call(request)`. If the handler returns `Err(e)`, this is @@ -30,6 +21,10 @@ use crate::{ /// returns `Ok(())` (intentional log-and-continue behaviour). Transport-level /// I/O failures (for example during fragmentation, serialization, or frame /// send) still return `io::Error` and are propagated to the caller. +/// +/// Responses are processed through the [`FramePipeline`] which applies +/// fragmentation, protocol hooks (`before_send`), and outbound metrics before +/// serializing and sending via the codec. pub(crate) async fn forward_response( env: Envelope, service: &HandlerService, @@ -57,91 +52,10 @@ where let parts = PacketParts::new(env.id, resp.correlation_id(), resp.into_inner()) .inherit_correlation(env.correlation_id); - let correlation_id = parts.correlation_id(); - let responses = fragment_responses(ctx.fragmentation, parts, env.id, correlation_id)?; - - for response in responses { - let bytes = serialize_response(ctx.serializer, &response, env.id, correlation_id)?; - send_response_payload::(ctx.codec, ctx.framed, Bytes::from(bytes), &response).await?; - } - - Ok(()) -} - -fn fragment_responses( - fragmentation: &mut Option, - parts: PacketParts, - id: u32, - correlation_id: Option, -) -> io::Result> { let envelope = Envelope::from_parts(parts); - match fragmentation.as_mut() { - Some(state) => match state.fragment(envelope) { - Ok(fragmented) => Ok(fragmented), - Err(err) => { - warn!( - concat!( - "failed to fragment response: id={id}, correlation_id={correlation_id:?}, ", - "error={err:?}" - ), - id = id, - correlation_id = correlation_id, - err = err - ); - crate::metrics::inc_handler_errors(); - Err(io::Error::other(err)) - } - }, - None => Ok(vec![envelope]), - } -} -fn serialize_response( - serializer: &S, - response: &Envelope, - id: u32, - correlation_id: Option, -) -> io::Result> { - match serializer.serialize(response) { - Ok(bytes) => Ok(bytes), - Err(e) => { - warn!( - concat!( - "failed to serialize response: id={id}, correlation_id={correlation_id:?}, ", - "error={e:?}" - ), - id = id, - correlation_id = correlation_id, - e = e - ); - crate::metrics::inc_handler_errors(); - Err(io::Error::other(e)) - } - } -} - -/// Send a response payload over the framed stream using codec-aware wrapping. -/// -/// Wraps the raw payload bytes in the codec's native frame format via -/// [`FrameCodec::wrap_payload`] before writing to the underlying stream. -/// This ensures responses are encoded correctly for the configured protocol. -pub(super) async fn send_response_payload( - codec: &F, - framed: &mut Framed>, - payload: Bytes, - response: &Envelope, -) -> io::Result<()> -where - W: AsyncRead + AsyncWrite + Unpin, - F: FrameCodec, -{ - let frame = codec.wrap_payload(payload); - if let Err(e) = framed.send(frame).await { - let id = response.id; - let correlation_id = response.correlation_id; - warn!("failed to send response: id={id}, correlation_id={correlation_id:?}, error={e:?}"); - crate::metrics::inc_handler_errors(); - return Err(io::Error::other(e)); - } - Ok(()) + // Route through the pipeline: fragment → before_send → metrics + ctx.pipeline.process(envelope)?; + let mut output = ctx.pipeline.drain_output(); + flush_pipeline_output(ctx.serializer, ctx.codec, ctx.framed, &mut output).await } diff --git a/src/app/frame_handling/tests.rs b/src/app/frame_handling/tests.rs index e693f68b..791530f9 100644 --- a/src/app/frame_handling/tests.rs +++ b/src/app/frame_handling/tests.rs @@ -11,10 +11,15 @@ use rstest::{fixture, rstest}; use tokio::io::DuplexStream; use tokio_util::codec::{Decoder, Encoder}; -use super::{ResponseContext, response::send_response_payload}; +use super::ResponseContext; use crate::{ - app::{Envelope, combined_codec::CombinedCodec, fragmentation_state::FragmentationState}, + app::{ + Envelope, + codec_driver::{FramePipeline, send_envelope}, + combined_codec::CombinedCodec, + }, codec::FrameCodec, + serializer::BincodeSerializer, }; /// Test frame carrying a tag byte and payload. @@ -160,26 +165,20 @@ fn build_harness(max_frame_length: usize) -> TestHarness { } } -/// Verify `send_response_payload` uses `F::wrap_payload` to frame responses. +/// Verify `send_envelope` uses `F::wrap_payload` to frame responses. #[rstest] #[tokio::test] -async fn send_response_payload_wraps_with_codec(harness: TestHarness) { +async fn send_envelope_wraps_with_codec(harness: TestHarness) { let TestHarness { codec, mut framed, client, } = harness; - let payload = vec![1, 2, 3, 4]; - let response = Envelope::new(1, Some(99), payload.clone()); - send_response_payload::( - &codec, - &mut framed, - Bytes::from(payload.clone()), - &response, - ) - .await - .expect("send should succeed"); + let envelope = Envelope::new(1, Some(99), vec![1, 2, 3, 4]); + send_envelope(&BincodeSerializer, &codec, &mut framed, &envelope) + .await + .expect("send should succeed"); drop(framed); @@ -192,7 +191,6 @@ async fn send_response_payload_wraps_with_codec(harness: TestHarness) { .expect("decode should succeed"); assert_eq!(frame.tag, 0x42, "wrap_payload should set tag to 0x42"); - assert_eq!(frame.payload, payload, "payload should match"); assert_eq!(codec.wraps(), 1, "wrap_payload should advance codec state"); } @@ -200,47 +198,39 @@ async fn send_response_payload_wraps_with_codec(harness: TestHarness) { #[rstest] #[tokio::test] async fn response_context_holds_references(harness: TestHarness) { - use crate::serializer::BincodeSerializer; - let TestHarness { codec, mut framed, client: _client, } = harness; let serializer = BincodeSerializer; - let mut fragmentation: Option = None; + let mut pipeline = FramePipeline::new(None); let ctx: ResponseContext<'_, BincodeSerializer, _, TestCodec> = ResponseContext { serializer: &serializer, framed: &mut framed, - fragmentation: &mut fragmentation, + pipeline: &mut pipeline, codec: &codec, }; - // Verify fields are accessible (compile-time check with runtime assertion) - assert!(ctx.fragmentation.is_none()); + // Verify fields are accessible (compile-time check) + assert!(!ctx.pipeline.has_fragmentation()); } -/// Verify `send_response_payload` returns error on send failure. +/// Verify `send_envelope` returns error on send failure. #[rstest] #[tokio::test] -async fn send_response_payload_returns_error_on_failure(small_harness: TestHarness) { +async fn send_envelope_returns_error_on_failure(small_harness: TestHarness) { let TestHarness { codec, mut framed, client: _client, } = small_harness; - // Payload exceeds max_frame_length, so encode will fail + // Payload exceeds max_frame_length after serialization, so encode will fail let oversized_payload = vec![0u8; 100]; - let response = Envelope::new(1, Some(99), oversized_payload.clone()); - let result = send_response_payload::( - &codec, - &mut framed, - Bytes::from(oversized_payload), - &response, - ) - .await; + let envelope = Envelope::new(1, Some(99), oversized_payload); + let result = send_envelope(&BincodeSerializer, &codec, &mut framed, &envelope).await; assert!( result.is_err(), diff --git a/src/app/mod.rs b/src/app/mod.rs index 74fdcff0..dc39bc04 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -10,6 +10,7 @@ mod builder; mod builder_defaults; +mod codec_driver; mod combined_codec; mod connection; mod envelope; diff --git a/src/client/builder/mod.rs b/src/client/builder/mod.rs index 6752500e..a2987aea 100644 --- a/src/client/builder/mod.rs +++ b/src/client/builder/mod.rs @@ -10,7 +10,7 @@ /// Use this macro for small builders with a limited number of fields (five in /// this case) and single-field updates per method. For larger builders with /// many coordinated updates, prefer a dedicated helper method to keep the -/// reconstruction logic centralised and easier to audit (see +/// reconstruction logic centralized and easier to audit (see /// `WireframeApp::rebuild_with_params` and /// `docs/builder-pattern-conventions.md`). /// diff --git a/src/test_helpers.rs b/src/test_helpers.rs index e02f1bd3..b30e52d3 100644 --- a/src/test_helpers.rs +++ b/src/test_helpers.rs @@ -119,8 +119,16 @@ fn invalid_data(message: &'static str) -> io::Error { } /// Build a first-frame payload for the test protocol. -#[must_use] -pub fn first_frame_payload(key: u64, body: &[u8], is_last: bool, total: Option) -> Vec { +/// +/// # Errors +/// +/// Returns an error if the body length exceeds `u32::MAX`. +pub fn first_frame_payload( + key: u64, + body: &[u8], + is_last: bool, + total: Option, +) -> Result, io::Error> { let mut payload = BytesMut::new(); payload.put_u8(0x01); let mut flags = 0u8; @@ -133,17 +141,26 @@ pub fn first_frame_payload(key: u64, body: &[u8], is_last: bool, total: Option Vec { +/// +/// # Errors +/// +/// Returns an error if the body length exceeds `u32::MAX`. +pub fn continuation_frame_payload( + key: u64, + sequence: u32, + body: &[u8], + is_last: bool, +) -> Result, io::Error> { let mut payload = BytesMut::new(); payload.put_u8(0x02); let mut flags = 0b10; @@ -152,8 +169,9 @@ pub fn continuation_frame_payload(key: u64, sequence: u32, body: &[u8], is_last: } payload.put_u8(flags); payload.put_u64(key); - payload.put_u32(u32::try_from(body.len()).unwrap_or(u32::MAX)); + let body_len = u32::try_from(body.len()).map_err(|_| invalid_data("body length too large"))?; + payload.put_u32(body_len); payload.put_u32(sequence); payload.extend_from_slice(body); - payload.to_vec() + Ok(payload.to_vec()) } diff --git a/tests/common/unified_codec_transport.rs b/tests/common/unified_codec_transport.rs new file mode 100644 index 00000000..af7c4336 --- /dev/null +++ b/tests/common/unified_codec_transport.rs @@ -0,0 +1,43 @@ +//! Shared transport helpers for unified codec tests. +//! +//! Provides common send/receive helpers used by both integration and +//! behavioural unified codec tests. + +use std::time::Duration; + +use futures::{SinkExt, StreamExt}; +use tokio::time::timeout; +use tokio_util::codec::{Framed, LengthDelimitedCodec}; +use wireframe::{Serializer, app::Envelope, serializer::BincodeSerializer}; + +/// Shared result type for helper functions in this module. +pub type SharedTestResult = Result>; + +/// Serialize and send one envelope to a framed test client. +/// +/// # Errors +/// Returns an error if serialization or sending fails. +pub async fn send_one( + client: &mut Framed, + envelope: &Envelope, +) -> SharedTestResult { + let serializer = BincodeSerializer; + let bytes = serializer.serialize(envelope)?; + client.send(bytes.into()).await?; + Ok(()) +} + +/// Receive and deserialize one envelope from a framed test client. +/// +/// # Errors +/// Returns an error if reading, timeout, or deserialization fails. +pub async fn recv_one( + client: &mut Framed, +) -> SharedTestResult { + let serializer = BincodeSerializer; + let frame = timeout(Duration::from_secs(2), client.next()) + .await? + .ok_or("response frame missing")??; + let (env, _) = serializer.deserialize::(&frame)?; + Ok(env) +} diff --git a/tests/features/message_assembly_inbound.feature b/tests/features/message_assembly_inbound.feature index d841971c..64105552 100644 --- a/tests/features/message_assembly_inbound.feature +++ b/tests/features/message_assembly_inbound.feature @@ -19,7 +19,7 @@ Feature: Inbound message assembly integration And a continuation frame for key 9 sequence 3 with body "ef" arrives And a final continuation frame for key 9 sequence 2 with body "gh" arrives Then the handler eventually receives payload "abcdgh" - And the handler receives 1 payloads + And the handler receives 1 payload And no send error is recorded Scenario: Timeout purges partial assembly before continuation arrives diff --git a/tests/features/unified_codec.feature b/tests/features/unified_codec.feature new file mode 100644 index 00000000..aa03b103 --- /dev/null +++ b/tests/features/unified_codec.feature @@ -0,0 +1,33 @@ +Feature: Unified codec pipeline + All outbound frames pass through the FramePipeline before reaching the wire, + ensuring consistent fragmentation and metrics regardless of response origin. + + Scenario: Handler response round-trips through the unified pipeline + Given a wireframe echo server with a buffer capacity of 512 bytes + When the client sends a 5-byte payload + Then the handler receives the original payload + And the client receives a response matching the original payload + + Scenario: Fragmented response passes through the unified pipeline + Given a wireframe echo server with a buffer capacity of 512 bytes and fragmentation enabled + When the client sends a fragmented 1200-byte payload + Then the handler receives the reassembled payload + And the client receives a fragmented response matching the original payload + + Scenario: Small payload passes through the pipeline unfragmented + Given a wireframe echo server with a buffer capacity of 512 bytes and fragmentation enabled + When the client sends a 16-byte payload + Then the handler receives the original payload + And the client receives an unfragmented response matching the original payload + + Scenario: Multiple sequential requests pass through the pipeline + Given a wireframe echo server with a buffer capacity of 512 bytes + When the client sends 5 sequential 8-byte payloads + Then the handler receives all 5 payloads in order + And the client receives 5 responses matching the original payloads + + Scenario: Disabled fragmentation passes large payloads unchanged + Given a wireframe echo server with a buffer capacity of 512 bytes + When the client sends a 256-byte payload + Then the handler receives the original payload + And the client receives an unfragmented response matching the original payload diff --git a/tests/fixtures/message_assembly_inbound.rs b/tests/fixtures/message_assembly_inbound.rs index 2c41967f..2b975f7a 100644 --- a/tests/fixtures/message_assembly_inbound.rs +++ b/tests/fixtures/message_assembly_inbound.rs @@ -213,12 +213,8 @@ impl MessageAssemblyInboundWorld { /// Returns an error if the client is not initialised or the send fails. pub fn send_first_frame(&mut self, key: impl Into, body: &str) -> TestResult { let key = key.into(); - self.send_payload(test_helpers::first_frame_payload( - key.0, - body.as_bytes(), - false, - None, - )) + let payload = test_helpers::first_frame_payload(key.0, body.as_bytes(), false, None)?; + self.send_payload(payload) } /// Serialize and send a non-final continuation frame. @@ -262,12 +258,9 @@ impl MessageAssemblyInboundWorld { body, is_last, } = params; - self.send_payload(test_helpers::continuation_frame_payload( - key, - sequence, - body.as_bytes(), - is_last, - )) + let payload = + test_helpers::continuation_frame_payload(key, sequence, body.as_bytes(), is_last)?; + self.send_payload(payload) } /// Advance the paused Tokio clock by `millis` milliseconds. diff --git a/tests/fixtures/mod.rs b/tests/fixtures/mod.rs index c9137778..3667aef0 100644 --- a/tests/fixtures/mod.rs +++ b/tests/fixtures/mod.rs @@ -18,3 +18,4 @@ pub mod multi_packet; pub mod panic; pub mod request_parts; pub mod stream_end; +pub mod unified_codec; diff --git a/tests/fixtures/unified_codec/mod.rs b/tests/fixtures/unified_codec/mod.rs new file mode 100644 index 00000000..38b2c589 --- /dev/null +++ b/tests/fixtures/unified_codec/mod.rs @@ -0,0 +1,175 @@ +//! `UnifiedCodecWorld` fixture for rstest-bdd tests. +//! +//! Validates that all outbound frames pass through the [`FramePipeline`] +//! before reaching the wire, exercising the unified codec path end-to-end +//! via `WireframeApp::handle_connection_result` over in-memory duplex +//! streams. + +mod transport; + +use std::io; + +use rstest::fixture; +use tokio::{runtime::Runtime, sync::mpsc, task::JoinHandle}; +use tokio_util::codec::{Framed, LengthDelimitedCodec}; +use wireframe::{ + app::{Envelope, Handler, Packet, WireframeApp}, + fragment::FragmentationConfig, +}; +/// Re-export `TestResult` from `wireframe_testing` for use in steps. +pub use wireframe_testing::TestResult; + +/// Default route identifier used in unified codec tests. +const ROUTE_ID: u32 = 42; +/// Default correlation identifier used in unified codec tests. +const CORRELATION: Option = Some(7); + +/// Test world for the unified codec pipeline BDD scenarios. +#[derive(Debug)] +pub struct UnifiedCodecWorld { + pub(super) capacity: usize, + pub(super) fragmentation: Option, + pub(super) client: Option>, + pub(super) server_handle: Option>>, + pub(super) handler_rx: Option>>, + pub(super) sent_payloads: Vec>, + /// Handler-observed payloads, in order. + pub handler_observed: Vec>, + pub(super) response_payloads: Vec>, + pub(super) last_response_fragmented: Option, +} + +impl Default for UnifiedCodecWorld { + fn default() -> Self { + Self { + capacity: 512, + fragmentation: None, + client: None, + server_handle: None, + handler_rx: None, + sent_payloads: Vec::new(), + handler_observed: Vec::new(), + response_payloads: Vec::new(), + last_response_fragmented: None, + } + } +} + +/// Fixture for unified codec pipeline scenarios used by rstest-bdd steps. +/// +/// Note: `rustfmt::skip` prevents single-line collapse that triggers +/// `unused_braces`. +#[rustfmt::skip] +#[fixture] +pub fn unified_codec_world() -> UnifiedCodecWorld { + UnifiedCodecWorld::default() +} + +impl UnifiedCodecWorld { + /// Start an echo server with optional fragmentation. + /// + /// # Errors + /// Returns an error if app creation or spawning fails. + pub fn start_server( + &mut self, + runtime: &Runtime, + capacity: usize, + fragmentation: bool, + ) -> TestResult { + self.capacity = capacity; + + let (tx, rx) = mpsc::unbounded_channel(); + self.handler_rx = Some(rx); + + let handler = Self::make_handler(&tx); + let mut app: WireframeApp = WireframeApp::new()?.buffer_capacity(capacity); + + if fragmentation { + let config = Self::fragmentation_config(capacity)?; + self.fragmentation = Some(config); + app = app.fragmentation(Some(config)); + } + + let app = app.route(ROUTE_ID, handler)?; + let codec = app.length_codec(); + let (client_stream, server_stream) = tokio::io::duplex(256); + let client = Framed::new(client_stream, codec.clone()); + let server = + runtime.spawn(async move { app.handle_connection_result(server_stream).await }); + + self.client = Some(client); + self.server_handle = Some(server); + Ok(()) + } + + /// Verify that all handler-observed payloads match the sent payloads. + /// + /// # Errors + /// Returns an error if payloads do not match. + pub fn verify_handler_payloads(&self) -> TestResult { + self.verify_payloads_match(&self.handler_observed, "handler") + } + + /// Verify that all response payloads match the sent payloads. + /// + /// # Errors + /// Returns an error if payloads do not match. + pub fn verify_response_payloads(&self) -> TestResult { + self.verify_payloads_match(&self.response_payloads, "response") + } + + /// Verify the last response was not fragmented. + /// + /// # Errors + /// Returns an error if the response was fragmented. + pub fn verify_unfragmented(&self) -> TestResult { + match self.last_response_fragmented { + Some(false) => Ok(()), + Some(true) => Err("expected unfragmented response".into()), + None => Err("no response collected yet".into()), + } + } + + /// Await server shutdown. + /// + /// # Errors + /// Returns an error if the server task panicked or returned an error. + pub async fn await_server(&mut self) -> TestResult { + if let Some(handle) = self.server_handle.take() { + handle.await??; + } + Ok(()) + } + + fn verify_payloads_match(&self, observed: &[Vec], label: &str) -> TestResult { + if observed.len() != self.sent_payloads.len() { + return Err(format!( + "{label} payload count mismatch: expected {}, got {}", + self.sent_payloads.len(), + observed.len() + ) + .into()); + } + for (i, (observed, expected)) in observed.iter().zip(self.sent_payloads.iter()).enumerate() + { + if observed != expected { + return Err(format!("{label} payload {i} mismatch").into()); + } + } + Ok(()) + } + + fn make_handler(sender: &mpsc::UnboundedSender>) -> Handler { + let tx = sender.clone(); + std::sync::Arc::new(move |env: &Envelope| { + let tx = tx.clone(); + let payload = env.clone().into_parts().into_payload(); + Box::pin(async move { + assert!( + tx.send(payload).is_ok(), + "handler channel send must succeed in tests" + ); + }) + }) + } +} diff --git a/tests/fixtures/unified_codec/transport.rs b/tests/fixtures/unified_codec/transport.rs new file mode 100644 index 00000000..cc03c201 --- /dev/null +++ b/tests/fixtures/unified_codec/transport.rs @@ -0,0 +1,240 @@ +//! Transport helpers for `UnifiedCodecWorld`. +//! +//! Contains the low-level send, receive, fragmentation, and reassembly +//! utilities used by the fixture's public async methods. + +#[path = "../../common/unified_codec_transport.rs"] +mod unified_codec_transport; + +use std::{num::NonZeroUsize, time::Duration}; + +use futures::{SinkExt, StreamExt}; +use tokio::{io::AsyncWriteExt, time::timeout}; +use tokio_util::codec::{Framed, LengthDelimitedCodec}; +use wireframe::{ + Serializer, + app::{Envelope, Packet}, + fragment::{ + FragmentationConfig, + Fragmenter, + Reassembler, + decode_fragment_payload, + encode_fragment_payload, + }, + serializer::BincodeSerializer, +}; + +use self::unified_codec_transport::{recv_one, send_one}; +use super::{CORRELATION, ROUTE_ID, TestResult, UnifiedCodecWorld}; + +impl UnifiedCodecWorld { + /// Send a single payload of the given size to the server. + /// + /// # Errors + /// Returns an error if serialization or sending fails. + pub async fn send_payload(&mut self, size: usize) -> TestResult { + let payload = vec![b'P'; size]; + let request = Envelope::new(ROUTE_ID, CORRELATION, payload.clone()); + self.sent_payloads.push(payload); + + let client = self.client.as_mut().ok_or("client not started")?; + send_one(client, &request).await?; + client.get_mut().shutdown().await?; + Ok(()) + } + + /// Send a fragmented payload of the given size to the server. + /// + /// # Errors + /// Returns an error if fragmentation, serialization, or sending fails. + pub async fn send_fragmented_payload(&mut self, size: usize) -> TestResult { + let config = self.fragmentation.ok_or("fragmentation not configured")?; + let payload = vec![b'F'; size]; + let request = Envelope::new(ROUTE_ID, CORRELATION, payload.clone()); + self.sent_payloads.push(payload); + + let fragmenter = Fragmenter::new(config.fragment_payload_cap); + let envelopes = Self::fragment_envelope(&request, &fragmenter)?; + + let client = self.client.as_mut().ok_or("client not started")?; + Self::send_envelopes(client, &envelopes).await?; + client.flush().await?; + client.get_mut().shutdown().await?; + Ok(()) + } + + /// Send multiple sequential payloads. + /// + /// # Errors + /// Returns an error if serialization or sending fails. + pub async fn send_sequential_payloads(&mut self, count: usize, size: usize) -> TestResult { + let client = self.client.as_mut().ok_or("client not started")?; + + for i in 0..count { + let payload = vec![i.try_into().unwrap_or(0); size]; + let request = Envelope::new(ROUTE_ID, CORRELATION, payload.clone()); + self.sent_payloads.push(payload); + send_one(client, &request).await?; + } + client.get_mut().shutdown().await?; + Ok(()) + } + + /// Drain the handler channel and store observed payloads. + /// + /// # Errors + /// Returns an error if the handler channel times out. + pub async fn collect_handler_payloads(&mut self) -> TestResult { + let rx = self.handler_rx.as_mut().ok_or("handler rx not started")?; + let expected_count = self.sent_payloads.len(); + + for _ in 0..expected_count { + let observed = timeout(Duration::from_secs(2), rx.recv()) + .await? + .ok_or("handler payload missing")?; + self.handler_observed.push(observed); + } + Ok(()) + } + + /// Read a single unfragmented response. + /// + /// # Errors + /// Returns an error if reading or deserialization fails. + pub async fn collect_single_response(&mut self) -> TestResult { + let client = self.client.as_mut().ok_or("client not started")?; + let response = recv_one(client).await?; + let payload = response.into_parts().into_payload(); + + self.last_response_fragmented = Some(decode_fragment_payload(&payload)?.is_some()); + self.response_payloads.push(payload); + Ok(()) + } + + /// Read a fragmented response and reassemble it. + /// + /// # Errors + /// Returns an error if reading or reassembly fails. + pub async fn collect_fragmented_response(&mut self) -> TestResult { + let config = self.fragmentation.ok_or("fragmentation not configured")?; + let client = self.client.as_mut().ok_or("client not started")?; + let payload = Self::read_reassembled(client, &config).await?; + self.last_response_fragmented = Some(true); + self.response_payloads.push(payload); + Ok(()) + } + + /// Read multiple sequential responses. + /// + /// # Errors + /// Returns an error if reading or deserialization fails. + pub async fn collect_sequential_responses(&mut self, count: usize) -> TestResult { + let client = self.client.as_mut().ok_or("client not started")?; + for _ in 0..count { + let response = recv_one(client).await?; + let payload = response.into_parts().into_payload(); + self.response_payloads.push(payload); + } + Ok(()) + } + + // -- low-level transport helpers ---------------------------------------- + + pub(super) fn fragmentation_config(capacity: usize) -> TestResult { + let message_limit = capacity + .checked_mul(16) + .and_then(NonZeroUsize::new) + .ok_or("message limit overflow or zero")?; + let config = FragmentationConfig::for_frame_budget( + capacity, + message_limit, + Duration::from_millis(30), + ) + .ok_or("frame budget must exceed fragment overhead")?; + Ok(config) + } + + async fn send_envelopes( + client: &mut Framed, + envelopes: &[Envelope], + ) -> TestResult { + let serializer = BincodeSerializer; + for env in envelopes { + let bytes = serializer.serialize(env)?; + client.send(bytes.into()).await?; + } + Ok(()) + } + + fn fragment_envelope( + envelope: &Envelope, + fragmenter: &Fragmenter, + ) -> TestResult> { + let parts = envelope.clone().into_parts(); + let id = parts.id(); + let correlation = parts.correlation_id(); + let payload = parts.into_payload(); + + if payload.len() <= fragmenter.max_fragment_size().get() { + return Ok(vec![Envelope::new(id, correlation, payload)]); + } + + let envelopes = fragmenter + .fragment_bytes(payload)? + .into_iter() + .map(|fragment| { + let (header, frag_payload) = fragment.into_parts(); + encode_fragment_payload(header, &frag_payload) + .map(|encoded| Envelope::new(id, correlation, encoded)) + }) + .collect::, _>>()?; + + Ok(envelopes) + } + + async fn read_reassembled( + client: &mut Framed, + cfg: &FragmentationConfig, + ) -> TestResult> { + let serializer = BincodeSerializer; + let mut reassembler = Reassembler::new(cfg.max_message_size, cfg.reassembly_timeout); + + let result: TestResult> = timeout( + Duration::from_secs(2), + Self::reassemble_loop(client, serializer, &mut reassembler), + ) + .await?; + + result + } + + async fn reassemble_loop( + client: &mut Framed, + serializer: BincodeSerializer, + reassembler: &mut Reassembler, + ) -> TestResult> { + while let Some(frame) = client.next().await { + let completed = Self::try_reassemble_frame(&frame?, serializer, reassembler)?; + if let Some(payload) = completed { + return Ok(payload); + } + } + Err("stream ended before reassembly completed".into()) + } + + fn try_reassemble_frame( + bytes: &bytes::BytesMut, + serializer: BincodeSerializer, + reassembler: &mut Reassembler, + ) -> TestResult>> { + let (env, _) = serializer.deserialize::(bytes)?; + let payload = env.into_parts().into_payload(); + match decode_fragment_payload(&payload)? { + Some((header, fragment)) => match reassembler.push(header, fragment)? { + Some(message) => Ok(Some(message.into_payload())), + None => Ok(None), + }, + None => Ok(Some(payload)), + } + } +} diff --git a/tests/scenarios/mod.rs b/tests/scenarios/mod.rs index e92912a8..24e433c7 100644 --- a/tests/scenarios/mod.rs +++ b/tests/scenarios/mod.rs @@ -22,3 +22,4 @@ mod multi_packet_scenarios; mod panic_scenarios; mod request_parts_scenarios; mod stream_end_scenarios; +mod unified_codec_scenarios; diff --git a/tests/scenarios/unified_codec_scenarios.rs b/tests/scenarios/unified_codec_scenarios.rs new file mode 100644 index 00000000..8b2c374c --- /dev/null +++ b/tests/scenarios/unified_codec_scenarios.rs @@ -0,0 +1,55 @@ +//! Scenario tests for the unified codec pipeline. + +use rstest_bdd_macros::scenario; + +use crate::fixtures::unified_codec::*; + +#[scenario( + path = "tests/features/unified_codec.feature", + name = "Handler response round-trips through the unified pipeline" +)] +#[expect( + unused_variables, + reason = "rstest-bdd wires steps via parameters without using them directly" +)] +fn handler_response_round_trip(unified_codec_world: UnifiedCodecWorld) {} + +#[scenario( + path = "tests/features/unified_codec.feature", + name = "Fragmented response passes through the unified pipeline" +)] +#[expect( + unused_variables, + reason = "rstest-bdd wires steps via parameters without using them directly" +)] +fn fragmented_response_pipeline(unified_codec_world: UnifiedCodecWorld) {} + +#[scenario( + path = "tests/features/unified_codec.feature", + name = "Small payload passes through the pipeline unfragmented" +)] +#[expect( + unused_variables, + reason = "rstest-bdd wires steps via parameters without using them directly" +)] +fn small_payload_unfragmented(unified_codec_world: UnifiedCodecWorld) {} + +#[scenario( + path = "tests/features/unified_codec.feature", + name = "Multiple sequential requests pass through the pipeline" +)] +#[expect( + unused_variables, + reason = "rstest-bdd wires steps via parameters without using them directly" +)] +fn multiple_sequential_requests(unified_codec_world: UnifiedCodecWorld) {} + +#[scenario( + path = "tests/features/unified_codec.feature", + name = "Disabled fragmentation passes large payloads unchanged" +)] +#[expect( + unused_variables, + reason = "rstest-bdd wires steps via parameters without using them directly" +)] +fn disabled_fragmentation_large_payload(unified_codec_world: UnifiedCodecWorld) {} diff --git a/tests/steps/message_assembly_inbound_steps.rs b/tests/steps/message_assembly_inbound_steps.rs index 0f2e9fc5..519d4d87 100644 --- a/tests/steps/message_assembly_inbound_steps.rs +++ b/tests/steps/message_assembly_inbound_steps.rs @@ -64,6 +64,7 @@ fn then_handler_receives_payload( } #[then("the handler receives {count:usize} payloads")] +#[then("the handler receives {count:usize} payload")] fn then_handler_receives_count( message_assembly_inbound_world: &mut MessageAssemblyInboundWorld, count: usize, diff --git a/tests/steps/mod.rs b/tests/steps/mod.rs index 8b9a8f76..fd75ad21 100644 --- a/tests/steps/mod.rs +++ b/tests/steps/mod.rs @@ -18,5 +18,6 @@ mod multi_packet_steps; mod panic_steps; mod request_parts_steps; mod stream_end_steps; +mod unified_codec_steps; pub(crate) use message_assembly_steps::FrameId; diff --git a/tests/steps/unified_codec_steps.rs b/tests/steps/unified_codec_steps.rs new file mode 100644 index 00000000..ff4f0b71 --- /dev/null +++ b/tests/steps/unified_codec_steps.rs @@ -0,0 +1,138 @@ +//! Step definitions for the unified codec pipeline behavioural tests. +//! +//! Steps are synchronous; async fixture methods are driven via a shared +//! Tokio runtime. + +use std::{future::Future, sync::OnceLock}; + +use rstest_bdd_macros::{given, then, when}; +use tokio::runtime::Runtime; + +use crate::fixtures::unified_codec::{TestResult, UnifiedCodecWorld}; + +fn runtime() -> TestResult<&'static Runtime> { + static RUNTIME: OnceLock> = OnceLock::new(); + let runtime = RUNTIME.get_or_init(|| { + Runtime::new().map_err(|error| format!("failed to create shared Tokio runtime: {error}")) + }); + runtime.as_ref().map_err(|error| error.clone().into()) +} + +fn run_async(future: F) -> TestResult +where + F: Future, +{ + runtime()?.block_on(future) +} + +fn collect_and_verify_handler_payloads(unified_codec_world: &mut UnifiedCodecWorld) -> TestResult { + run_async(unified_codec_world.collect_handler_payloads())?; + unified_codec_world.verify_handler_payloads() +} + +// --------------------------------------------------------------------------- +// Given +// --------------------------------------------------------------------------- + +#[given("a wireframe echo server with a buffer capacity of {cap:usize} bytes")] +fn given_echo_server(unified_codec_world: &mut UnifiedCodecWorld, cap: usize) -> TestResult { + unified_codec_world.start_server(runtime()?, cap, false) +} + +#[given( + "a wireframe echo server with a buffer capacity of {cap:usize} bytes and fragmentation enabled" +)] +fn given_echo_server_fragmented( + unified_codec_world: &mut UnifiedCodecWorld, + cap: usize, +) -> TestResult { + unified_codec_world.start_server(runtime()?, cap, true) +} + +// --------------------------------------------------------------------------- +// When +// --------------------------------------------------------------------------- + +#[when("the client sends a {size:usize}-byte payload")] +fn when_client_sends_payload( + unified_codec_world: &mut UnifiedCodecWorld, + size: usize, +) -> TestResult { + run_async(unified_codec_world.send_payload(size)) +} + +#[when("the client sends a fragmented {size:usize}-byte payload")] +fn when_client_sends_fragmented( + unified_codec_world: &mut UnifiedCodecWorld, + size: usize, +) -> TestResult { + run_async(unified_codec_world.send_fragmented_payload(size)) +} + +#[when("the client sends {count:usize} sequential {size:usize}-byte payloads")] +fn when_client_sends_sequential( + unified_codec_world: &mut UnifiedCodecWorld, + count: usize, + size: usize, +) -> TestResult { + run_async(unified_codec_world.send_sequential_payloads(count, size)) +} + +// --------------------------------------------------------------------------- +// Then +// --------------------------------------------------------------------------- + +#[then("the handler receives the original payload")] +fn then_handler_receives_payload(unified_codec_world: &mut UnifiedCodecWorld) -> TestResult { + collect_and_verify_handler_payloads(unified_codec_world) +} + +#[then("the handler receives the reassembled payload")] +fn then_handler_receives_reassembled(unified_codec_world: &mut UnifiedCodecWorld) -> TestResult { + collect_and_verify_handler_payloads(unified_codec_world) +} + +#[then("the handler receives all {count:usize} payloads in order")] +fn then_handler_receives_all( + unified_codec_world: &mut UnifiedCodecWorld, + count: usize, +) -> TestResult { + run_async(unified_codec_world.collect_handler_payloads())?; + let observed = &unified_codec_world.handler_observed; + if observed.len() != count { + return Err(format!("expected {count} handler payloads, got {}", observed.len()).into()); + } + unified_codec_world.verify_handler_payloads() +} + +#[then("the client receives a response matching the original payload")] +fn then_client_receives_response(unified_codec_world: &mut UnifiedCodecWorld) -> TestResult { + run_async(unified_codec_world.collect_single_response())?; + unified_codec_world.verify_response_payloads()?; + run_async(unified_codec_world.await_server()) +} + +#[then("the client receives a fragmented response matching the original payload")] +fn then_client_receives_fragmented(unified_codec_world: &mut UnifiedCodecWorld) -> TestResult { + run_async(unified_codec_world.collect_fragmented_response())?; + unified_codec_world.verify_response_payloads()?; + run_async(unified_codec_world.await_server()) +} + +#[then("the client receives an unfragmented response matching the original payload")] +fn then_client_receives_unfragmented(unified_codec_world: &mut UnifiedCodecWorld) -> TestResult { + run_async(unified_codec_world.collect_single_response())?; + unified_codec_world.verify_unfragmented()?; + unified_codec_world.verify_response_payloads()?; + run_async(unified_codec_world.await_server()) +} + +#[then("the client receives {count:usize} responses matching the original payloads")] +fn then_client_receives_sequential( + unified_codec_world: &mut UnifiedCodecWorld, + count: usize, +) -> TestResult { + run_async(unified_codec_world.collect_sequential_responses(count))?; + unified_codec_world.verify_response_payloads()?; + run_async(unified_codec_world.await_server()) +} diff --git a/tests/unified_codec.rs b/tests/unified_codec.rs new file mode 100644 index 00000000..7654d73a --- /dev/null +++ b/tests/unified_codec.rs @@ -0,0 +1,281 @@ +//! Integration tests for the unified codec path. +//! +//! Validates that all outbound frames — handler responses, fragmented +//! payloads — pass through the [`FramePipeline`] before reaching the wire. +//! These tests exercise the codec path end-to-end via +//! `WireframeApp::handle_connection_result` over in-memory duplex streams. +#![cfg(not(loom))] + +use std::time::Duration; + +use futures::SinkExt; +use rstest::{fixture, rstest}; +use tokio::{io::AsyncWriteExt, sync::mpsc, time::timeout}; +use tokio_util::codec::{Framed, LengthDelimitedCodec}; +use wireframe::{ + app::{Envelope, Packet, WireframeApp}, + fragment::{FragmentationConfig, decode_fragment_payload}, +}; + +#[path = "common/fragment_helpers.rs"] +#[expect( + dead_code, + reason = "shared helper module; not all items used by every test binary" +)] +mod fragment_helpers; +#[path = "common/unified_codec_transport.rs"] +mod unified_codec_transport; + +use crate::{ + fragment_helpers::{ + CORRELATION, + ROUTE_ID, + TestError, + TestResult, + build_envelopes, + fragmentation_config, + make_handler, + read_response_payload, + send_envelopes, + spawn_app, + }, + unified_codec_transport::{recv_one, send_one}, +}; + +/// Default buffer capacity used in unified codec tests. +const CAPACITY: usize = 512; + +/// Shared harness for unified codec integration tests. +struct UnifiedCodecHarness { + client: Framed, + server: tokio::task::JoinHandle>, + rx: mpsc::UnboundedReceiver>, +} + +/// Helper to build an echo-style app with optional fragmentation. +fn echo_app( + config: Option, + tx: &mpsc::UnboundedSender>, +) -> TestResult { + let handler = make_handler(tx); + let mut app = WireframeApp::new()?.buffer_capacity(CAPACITY); + if let Some(c) = config { + app = app.fragmentation(Some(c)); + } + Ok(app.route(ROUTE_ID, handler)?) +} + +/// Build the unified codec test harness and return client/server test handles. +fn setup_harness(config: Option) -> TestResult { + let (tx, rx) = mpsc::unbounded_channel(); + let app = echo_app(config, &tx)?; + let (client, server) = spawn_app(app); + Ok(UnifiedCodecHarness { client, server, rx }) +} + +// --------------------------------------------------------------------------- +// Test: basic request-response passes through the unified pipeline +// --------------------------------------------------------------------------- + +#[fixture] +fn fragmented_harness() -> TestResult { + let config = fragmentation_config(CAPACITY)?; + setup_harness(Some(config)) +} + +#[rstest] +#[tokio::test] +async fn handler_response_round_trips_through_pipeline() -> TestResult { + let unfragmented_harness = setup_harness(None)?; + let UnifiedCodecHarness { + mut client, + server, + mut rx, + } = unfragmented_harness; + + let payload = vec![1, 2, 3, 4, 5]; + let request = Envelope::new(ROUTE_ID, CORRELATION, payload.clone()); + + send_one(&mut client, &request).await?; + client.get_mut().shutdown().await?; + + // Handler received the payload + let observed = timeout(Duration::from_secs(1), rx.recv()) + .await? + .ok_or(TestError::Setup("handler payload missing"))?; + assert_eq!(observed, payload, "handler should receive original payload"); + + // Response arrives back + let response = recv_one(&mut client).await?; + let response_payload = response.into_parts().into_payload(); + assert_eq!( + response_payload, payload, + "response payload should match request" + ); + + server.await??; + Ok(()) +} + +// --------------------------------------------------------------------------- +// Test: fragmentation applies through the unified pipeline +// --------------------------------------------------------------------------- + +#[rstest] +#[tokio::test] +async fn fragmented_response_passes_through_pipeline( + fragmented_harness: TestResult, +) -> TestResult { + let config = fragmentation_config(CAPACITY)?; + let UnifiedCodecHarness { + mut client, + server, + mut rx, + } = fragmented_harness?; + + // Payload larger than fragment capacity to trigger fragmentation. + // Must also fragment the *request* so it fits in the codec frame. + let payload = vec![b'F'; 1_200]; + let request = Envelope::new(ROUTE_ID, CORRELATION, payload.clone()); + let envelopes = build_envelopes(request, &config, true)?; + + send_envelopes(&mut client, &envelopes).await?; + client.flush().await?; + client.get_mut().shutdown().await?; + + // Handler received the reassembled payload + let observed = timeout(Duration::from_secs(1), rx.recv()) + .await? + .ok_or(TestError::Setup("handler payload missing"))?; + assert_eq!(observed, payload, "handler should receive original payload"); + + // Read fragmented response and reassemble + let response = read_response_payload(&mut client, &config).await?; + assert_eq!( + response, payload, + "reassembled response should match original payload" + ); + + server.await??; + Ok(()) +} + +// --------------------------------------------------------------------------- +// Test: small payload passes through without fragmentation +// --------------------------------------------------------------------------- + +#[rstest] +#[tokio::test] +async fn small_payload_passes_unfragmented( + fragmented_harness: TestResult, +) -> TestResult { + let UnifiedCodecHarness { + mut client, + server, + mut rx, + } = fragmented_harness?; + + let payload = vec![b'S'; 16]; + let request = Envelope::new(ROUTE_ID, CORRELATION, payload.clone()); + + send_one(&mut client, &request).await?; + client.get_mut().shutdown().await?; + + // Handler received the payload + let observed = timeout(Duration::from_secs(1), rx.recv()) + .await? + .ok_or(TestError::Setup("handler payload missing"))?; + assert_eq!(observed, payload); + + // Response should be unfragmented + let response = recv_one(&mut client).await?; + let response_payload = response.into_parts().into_payload(); + + // Verify no fragment header present + assert!( + decode_fragment_payload(&response_payload)?.is_none(), + "small payload should not be fragmented" + ); + assert_eq!(response_payload, payload); + + server.await??; + Ok(()) +} + +// --------------------------------------------------------------------------- +// Test: multiple sequential requests through the pipeline +// --------------------------------------------------------------------------- + +#[rstest] +#[tokio::test] +async fn multiple_sequential_requests_through_pipeline() -> TestResult { + let unfragmented_harness = setup_harness(None)?; + let UnifiedCodecHarness { + mut client, + server, + mut rx, + } = unfragmented_harness; + + let payloads: Vec> = (0..5).map(|i| vec![i; 8]).collect(); + + for payload in &payloads { + let request = Envelope::new(ROUTE_ID, CORRELATION, payload.clone()); + send_one(&mut client, &request).await?; + } + client.get_mut().shutdown().await?; + + // All handlers should fire + for expected in &payloads { + let observed = timeout(Duration::from_secs(1), rx.recv()) + .await? + .ok_or(TestError::Setup("handler payload missing"))?; + assert_eq!(&observed, expected); + } + + // All responses should arrive + for expected in &payloads { + let response = recv_one(&mut client).await?; + let response_payload = response.into_parts().into_payload(); + assert_eq!(&response_payload, expected); + } + + server.await??; + Ok(()) +} + +// --------------------------------------------------------------------------- +// Test: pipeline respects disabled fragmentation +// --------------------------------------------------------------------------- + +#[rstest] +#[tokio::test] +async fn pipeline_with_no_fragmentation_passes_large_payload() -> TestResult { + let unfragmented_harness = setup_harness(None)?; + let UnifiedCodecHarness { + mut client, + server, + mut rx, + } = unfragmented_harness; + + let payload = vec![b'L'; 256]; + let request = Envelope::new(ROUTE_ID, CORRELATION, payload.clone()); + + send_one(&mut client, &request).await?; + client.get_mut().shutdown().await?; + + let observed = timeout(Duration::from_secs(1), rx.recv()) + .await? + .ok_or(TestError::Setup("handler payload missing"))?; + assert_eq!(observed, payload); + + let response = recv_one(&mut client).await?; + let response_payload = response.into_parts().into_payload(); + assert_eq!(response_payload, payload); + assert!( + decode_fragment_payload(&response_payload)?.is_none(), + "response should not contain fragment headers when fragmentation is disabled" + ); + + server.await??; + Ok(()) +}