From 8fcce855599097b785f850cc5e6657ad561e1455 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sat, 14 Mar 2026 12:59:20 +0000 Subject: [PATCH 1/8] docs(execplans): add draft execplan for streaming requests documentation closure Add a comprehensive draft ExecPlan outlining the closing documentation phase for roadmap items 8.6.1 to 8.6.3. This new document audits existing materials, aligns multi-document claims, specifies constraints, risks, and test coverage gaps, and plans targeted edits and validation steps to finalize streaming request body documentation, MessageAssembler usage, and memory budgets guidance. It ensures consistency across design docs, user guide, tests, and enforcement of quality gates before marking roadmap items complete. Co-authored-by: devboxerhub[bot] --- ...te-documentation-for-streaming-requests.md | 614 ++++++++++++++++++ 1 file changed, 614 insertions(+) create mode 100644 docs/execplans/8-6-1-update-documentation-for-streaming-requests.md diff --git a/docs/execplans/8-6-1-update-documentation-for-streaming-requests.md b/docs/execplans/8-6-1-update-documentation-for-streaming-requests.md new file mode 100644 index 00000000..0eba0a41 --- /dev/null +++ b/docs/execplans/8-6-1-update-documentation-for-streaming-requests.md @@ -0,0 +1,614 @@ +# Close phase 8.6 documentation for streaming requests and shared message assembly + +This ExecPlan (execution plan) is a living document. The sections +`Constraints`, `Tolerances`, `Risks`, `Progress`, `Surprises & Discoveries`, +`Decision Log`, and `Outcomes & Retrospective` must be kept up to date as work +proceeds. + +Status: DRAFT + +## Purpose / big picture + +Roadmap items `8.6.1` through `8.6.3` close the documentation phase for +streaming request bodies, the generic `MessageAssembler` abstraction, and the +standardized per-connection memory-budget model introduced by +`docs/adr-002-streaming-requests-and-shared-message-assembly.md`. + +After this work, a protocol author or library consumer should be able to read +the design documents and understand: + +- where transport fragmentation stops and protocol-level message assembly + begins; +- how `RequestParts`, `RequestBodyStream`, and `StreamingBody` fit into the + inbound request path; +- how `MessageAssembler` composes with transport fragmentation and handler + dispatch; +- how per-connection budgets, soft pressure, and hard caps apply across the + shared inbound assembly pipeline; and +- which public APIs and behavioural guarantees are stable enough to rely on. + +Success is observable when: + +1. `docs/generic-message-fragmentation-and-re-assembly-design.md` contains + composition guidance that matches the implemented ordering and budget + semantics. +2. `docs/multi-packet-and-streaming-responses-design.md` contains a streaming + request-body section that explains the implemented inbound API rather than + only outbound symmetry. +3. `docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md` + describes `MessageAssembler` and the three-tier budget model as current + capability, not as a vague future idea. +4. `docs/users-guide.md` is updated anywhere the public interface or consumer + guidance would otherwise be stale or incomplete. +5. The feature is backed by both `rstest` and `rstest-bdd` coverage, including + closing the current behavioural-test gap for streaming request bodies. +6. `docs/roadmap.md` marks `8.6.1`, `8.6.2`, and `8.6.3` done only after all + relevant gates pass. + +## Constraints + +- Scope is the documentation closure for roadmap phase `8.6`, plus any tests + and public API examples needed to make the documented behaviour trustworthy. +- Do not introduce new product behaviour merely to make the documents read + better. If documentation work exposes an implementation bug, fix only the + smallest issue required to align the code with ADR 0002 and document the + decision. +- Preserve existing public API signatures unless a contradiction in the current + API surface makes the documentation impossible to write accurately. If that + happens, stop and escalate. +- Prefer editing the existing relevant sections in place instead of appending + duplicate sections that drift later. +- Keep all new or expanded source and test files at or below the repository's + 400-line guideline. +- Use `rstest` for unit or integration validation and `rstest-bdd` for + behavioural validation. +- Use the existing design docs as the source of truth to reconcile wording: + `docs/adr-002-streaming-requests-and-shared-message-assembly.md`, + `docs/hardening-wireframe-a-guide-to-production-resilience.md`, + `docs/generic-message-fragmentation-and-re-assembly-design.md`, + `docs/multi-packet-and-streaming-responses-design.md`, and + `docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md`. +- Keep documentation and comments in en-GB-oxendict spelling. +- Use Makefile targets for final quality gates, and run them with + `set -o pipefail` and `tee` so truncated terminal output does not hide + failures. +- Do not mark roadmap item `8.6` complete until documentation, tests, and the + user-facing guide are all in sync. + +## Tolerances (exception triggers) + +- Scope: if the work grows beyond 16 touched files or 1,000 net lines, stop + and re-evaluate the split between documentation closure and follow-up + implementation work. +- Interface: if any public API signature must change, stop and escalate. +- Behaviour: if accurate documentation would require a semantic redesign of the + inbound assembly pipeline rather than a narrow fix, stop and escalate. +- Dependencies: if any new external crate is required, stop and escalate. +- Test gap: if adding behavioural coverage for streaming request bodies + requires a second new BDD suite beyond one focused `streaming_request` + feature flow, stop and re-scope. +- Baseline quality gates: if repo-wide Markdown or doctest gates fail because + of unrelated pre-existing issues, document the exact failure, do not mark the + roadmap item done, and escalate with options. +- Iterations: if the same targeted gate still fails after 3 focused attempts, + stop and escalate with evidence. + +## Risks + +- Risk: the target documents already contain partial material for roadmap items + `8.6.1` through `8.6.3`, so careless edits could duplicate or contradict + existing sections instead of clarifying them. Severity: medium Likelihood: + high Mitigation: start with a claim-by-claim audit against ADR 0002, then + revise the existing sections in place. + +- Risk: the user guide already documents `RequestParts`, `MessageAssembler`, + and `MemoryBudgets`, so a narrow docs-only pass could miss smaller public API + examples in Rustdoc comments. Severity: medium Likelihood: medium Mitigation: + audit both `docs/users-guide.md` and the Rustdoc on `src/request/mod.rs`, + `src/extractor/streaming.rs`, `src/app/builder/protocol.rs`, and + `src/app/builder/config.rs`. + +- Risk: streaming request bodies currently have `rstest` coverage but no + matching `rstest-bdd` flow, which blocks the prompt's requirement for both + unit and behavioural validation of the feature. Severity: high Likelihood: + high Mitigation: add one focused BDD suite for streaming request bodies and + keep it small by reusing existing fixture patterns. + +- Risk: repo-wide Markdown validation may still fail on unrelated historical + docs, preventing clean roadmap closure. Severity: medium Likelihood: medium + Mitigation: run the full gates near the end, capture logs, and repair any + failures caused by this change immediately. Escalate only if the remaining + failure is unrelated baseline debt. + +## Progress + +- [x] (2026-03-14 00:00Z) Drafted this ExecPlan after auditing the roadmap, + ADR 0002, current design docs, user guide, and the existing test surface. +- [ ] Stage A: build a documentation-claim matrix and decide the minimum set of + in-place edits needed across the three roadmap documents. +- [ ] Stage B: revise the three target design documents and record any new + design decisions in the relevant design document. +- [ ] Stage C: update `docs/users-guide.md` and any public Rustdoc examples + that need to reflect the documented public interface. +- [ ] Stage D: close the missing behavioural-test coverage for streaming + request bodies and add any smaller focused `rstest` coverage required by new + claims. +- [ ] Stage E: run all relevant quality gates and mark roadmap items `8.6.1` + through `8.6.3` done. + +## Surprises & discoveries + +- Observation: all three target documents already contain partial material for + the roadmap item. The work is not blank-sheet authoring; it is a + harmonization pass. Evidence: section `9` in + `docs/generic-message-fragmentation-and-re-assembly-design.md`, section `11` + in `docs/multi-packet-and-streaming-responses-design.md`, and the + `MessageAssembler` and memory-cap sections in + `docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md`. + Impact: edit existing sections in place instead of adding new headings + unless structure is genuinely missing. + +- Observation: `docs/users-guide.md` already documents `RequestParts`, + `MessageAssembler`, and `MemoryBudgets`. Evidence: the guide sections around + lines `761`, `788`, `842`, and `1074`. Impact: user-guide work should be a + targeted sync pass, not a full new chapter. + +- Observation: streaming request bodies currently have strong `rstest` + coverage in `tests/streaming_request.rs`, but there is no matching + behavioural suite under `tests/features/`. Impact: the implementation cannot + honestly claim both unit and behavioural validation for the inbound streaming + half of ADR 0002 until that BDD gap is closed. + +- Observation: `MessageAssembler` and memory budgets already have substantial + behavioural coverage through existing feature files such as + `tests/features/message_assembler.feature`, + `tests/features/memory_budgets.feature`, + `tests/features/budget_enforcement.feature`, + `tests/features/budget_cleanup.feature`, and + `tests/features/budget_transitions.feature`. Impact: reuse those suites as + evidence first and add only narrowly scoped follow-up tests if new + documentation claims are not already covered. + +## Decision log + +- Decision: treat roadmap phase `8.6` as a documentation harmonization and + validation-closure task, not as a new architecture design pass. Rationale: + ADR 0002 and the implementation already define the feature; the missing work + is accurate narrative, cross-linking, and proof that the documented behaviour + is exercised. Date/Author: 2026-03-14 / Codex + +- Decision: add one focused `rstest-bdd` suite for streaming request bodies + unless the audit uncovers an existing hidden equivalent. Rationale: the + prompt requires both unit and behavioural validation, and the current inbound + streaming coverage is unit-only. Date/Author: 2026-03-14 / Codex + +- Decision: update `docs/users-guide.md` and public Rustdoc examples only where + public consumer guidance would otherwise be stale or incomplete. Rationale: + this keeps scope aligned with the roadmap item while still meeting the + prompt's requirement to reflect public interface changes. Date/Author: + 2026-03-14 / Codex + +- Decision: mark roadmap items `8.6.1` through `8.6.3` done only after the + docs, behavioural coverage, doctest gates, and repo quality gates all pass. + Rationale: a documentation checkbox should mean the docs are trustworthy and + demonstrably aligned with the code. Date/Author: 2026-03-14 / Codex + +## Outcomes & retrospective + +This plan is still in draft state. No implementation has started yet. + +Expected deliverables at completion: + +- revised design guidance in the three roadmap-target documents; +- an updated `docs/users-guide.md` section set wherever public consumer-facing + guidance is stale; +- one focused behavioural suite for streaming request bodies, plus any narrow + unit-test additions needed to support new claims; +- updated roadmap checkboxes for `8.6.1`, `8.6.2`, and `8.6.3`; and +- full captured gate logs proving the documentation and validation changes are + complete. + +## Context and orientation + +The relevant implementation and documentation surface already exists. The task +is to align and close it. + +### Existing documentation surface + +- `docs/adr-002-streaming-requests-and-shared-message-assembly.md` is the + normative decision record. It defines: + - `RequestParts` and `RequestBodyStream`; + - the `MessageAssembler` hook and its composition with transport + fragmentation; and + - standardized per-connection memory budgets, soft pressure, and hard caps. +- `docs/generic-message-fragmentation-and-re-assembly-design.md` already has + section `9`, "Composition with streaming requests and MessageAssembler". It + needs to become operational guidance for protocol implementers, not just a + short restatement of ADR 0002. +- `docs/multi-packet-and-streaming-responses-design.md` already has section + `11`, "Streaming request bodies", and subsection `11.4`, "Composition with + MessageAssembler". These sections currently lean toward outbound symmetry and + need a clearer inbound request-body story. +- `docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md` + already references both `MessageAssembler` and memory budgets, but parts of + the text still read as aspirational future design rather than implemented + maturity. +- `docs/hardening-wireframe-a-guide-to-production-resilience.md` is the + resilience baseline for the budget story. It is a consistency source even if + it does not end up being edited. +- `docs/users-guide.md` already exposes the relevant public API concepts: + `RequestParts`, `RequestBodyStream`, `StreamingBody`, `MessageAssembler`, and + `MemoryBudgets`. + +### Existing implementation surface + +- `src/request/mod.rs` defines `RequestParts`, `RequestBodyStream`, + `RequestBodyReader`, and `body_channel`. +- `src/extractor/streaming.rs` defines the `StreamingBody` extractor. +- `src/app/builder/protocol.rs` exposes + `WireframeApp::with_message_assembler` and `WireframeApp::message_assembler`. +- `src/app/builder/config.rs` exposes `WireframeApp::memory_budgets`. +- `src/app/frame_handling/assembly.rs` and + `src/app/frame_handling/backpressure.rs` apply message-assembly and budget + enforcement in the inbound runtime path. + +### Existing test surface + +- `tests/streaming_request.rs` provides `rstest` coverage for inbound request + streaming, body readers, error propagation, ordering, and back-pressure. +- `tests/features/message_assembler.feature` plus its fixture, steps, and + scenarios cover the public `MessageAssembler` contract behaviourally. +- `tests/features/memory_budgets.feature`, + `tests/features/budget_enforcement.feature`, + `tests/features/budget_cleanup.feature`, + `tests/features/budget_transitions.feature`, + `tests/features/memory_budget_backpressure.feature`, + `tests/features/memory_budget_hard_cap.feature`, and + `tests/features/derived_memory_budgets.feature` already cover the budget + model behaviourally. +- There is no `tests/features/streaming_request.feature` or equivalent BDD + flow today. That is the clearest validation gap for this roadmap closure. + +### Terms used in this plan + +- Transport fragmentation: splitting one transport packet into several bounded + fragments and reassembling them before routing. +- Protocol-level message assembly: reconstructing or streaming a request body + whose bytes arrive across multiple protocol packets. +- Soft pressure: short read pauses once buffered bytes reach 80% of the active + aggregate budget. +- Hard cap: immediate `InvalidData` termination once buffered bytes strictly + exceed the active aggregate budget. + +## Plan of work + +### Stage A: audit claims and decide what must change + +Build a simple matrix that maps ADR 0002 claims to the current docs and tests. +Do not change files yet. The matrix should answer: + +1. Which claims already exist in each roadmap-target document. +2. Which claims are missing, vague, duplicated, or contradictory. +3. Which public consumer-facing examples in `docs/users-guide.md` or Rustdoc + need refreshing. +4. Which claims are already backed by `rstest` and `rstest-bdd`, and which + still lack one side of that proof. + +The most likely output of this stage is a short edit list such as: + +- strengthen composition guidance in section `9` of the fragmentation design; +- rewrite section `11` of the streaming-responses design so the inbound API is + concrete and implementation-aligned; +- revise the capability-maturity document so it describes the current + `MessageAssembler` and three-tier budget model without future-tense drift; +- sync `docs/users-guide.md` and any public Rustdoc examples that would + otherwise contradict those documents; and +- add one behavioural flow for streaming request bodies. + +Go/no-go: if this audit shows that the current implementation contradicts ADR +0002 in a way that requires more than narrow fixes, stop and escalate before +editing. + +### Stage B: revise the three roadmap-target design documents + +Edit the target docs in place. + +In `docs/generic-message-fragmentation-and-re-assembly-design.md`, revise +section `9` so it explains composition as guidance for protocol implementers. +It should explicitly answer: + +1. what transport fragmentation is responsible for; +2. what `MessageAssembler` is responsible for; +3. the exact ordering through the inbound pipeline; +4. how budgets are shared across both layers; and +5. what operators or protocol authors must configure so the two layers do not + fight each other. + +In `docs/multi-packet-and-streaming-responses-design.md`, revise section `11` +so the inbound request-body model is specific and actionable. It should cover: + +1. when a handler receives `RequestParts` plus `RequestBodyStream`; +2. how `StreamingBody` and `RequestBodyReader` relate to direct stream + consumption; +3. how back-pressure works on inbound streaming requests; and +4. how `MessageAssembler` turns multi-packet request bodies into either buffered + or streaming delivery. + +In +`docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md`, + revise the relevant sections so they describe current capability maturity: + +1. `MessageAssembler` is part of the implemented feature set, not just a + concept; +2. per-connection budgets are standardized and derived from frame budgets when + left implicit; and +3. the soft-pressure and hard-cap behaviour is part of the production + hardening story. + +If any new normative decision is made while clarifying these documents, record +it in the most relevant design document. Use ADR 0002 only if the decision is +truly architectural rather than editorial. + +Validation at the end of Stage B: + +- the three target docs no longer disagree about pipeline order or budget + semantics; +- each doc links back to ADR 0002 where normative detail lives; and +- no duplicate sections were created unnecessarily. + +### Stage C: sync the public consumer-facing documentation + +Update `docs/users-guide.md` anywhere a library consumer would otherwise be +misled or left without enough guidance. The likely areas are: + +- the `RequestParts` and streaming request-body sections; +- the `MessageAssembler` registration and usage section; and +- the memory-budget section, especially the three-tier protection model and the + derived default story. + +Then audit the public Rustdoc examples in: + +- `src/request/mod.rs` +- `src/extractor/streaming.rs` +- `src/app/builder/protocol.rs` +- `src/app/builder/config.rs` + +Only edit those comments if the examples or wording would now be stale. The +goal is consistency between the manual and the API docs, not broad comment +rewriting. + +Validation at the end of Stage C: + +- a new user can follow the guide without needing to read the ADR first; and +- any Rustdoc examples touched by this stage are ready for `make test-doc`. + +### Stage D: close validation gaps before claiming the docs are complete + +Start by re-running the targeted existing tests that already back the feature. +If the doc changes introduce new claims that are not obviously covered, add the +smallest focused tests that prove them. + +The currently known gap is behavioural coverage for streaming request bodies. +Add one focused BDD flow: + +- `tests/features/streaming_request.feature` +- `tests/fixtures/streaming_request.rs` +- `tests/steps/streaming_request_steps.rs` +- `tests/scenarios/streaming_request_scenarios.rs` + +Register any new fixture, step, or scenario modules in the existing test module +tree. + +Keep the scenarios small and observable. Good candidates are: + +1. a handler consumes a streaming body incrementally and receives the chunks in + order; +2. back-pressure suspends the producer while the bounded body channel is full; + and +3. an inbound streaming-body error surfaces to the handler or reader as + `InvalidData` or the exact propagated I/O error, depending on the path under + test. + +Use the established `rstest-bdd` rules from this repository: + +- the fixture parameter name in every step must match the fixture function + name exactly; +- do not prefix unused parameters with `_`; bind them normally and discard them + inside the function body if needed; and +- prefer a focused world fixture over sprawling shared mutable state. + +If Stage B or Stage C added claims about pipeline composition that are not +already covered by existing tests, prefer adding focused `rstest` coverage in +existing files such as `tests/streaming_request.rs` or +`src/app/frame_handling/assembly_tests.rs` rather than building another large +BDD harness. + +Validation at the end of Stage D: + +- the streaming request body feature now has both `rstest` and `rstest-bdd` + coverage; +- all documentation claims added in Stages B and C are backed by tests or + existing implementation evidence; and +- no new test file exceeds the 400-line limit. + +### Stage E: run the full gates and close the roadmap item + +Run the relevant repository gates with log capture. Because this task may touch +manual docs, user-guide docs, and public Rustdoc comments, the final gate set +should include: + +- formatting and Markdown validation; +- Mermaid validation; +- Rust formatting, linting, and tests; +- doctests and the doctest-benchmark gate; and +- the roadmap checkbox update. + +Only after all gates pass should `docs/roadmap.md` change: + +- `8.6.1` to `[x]` +- `8.6.2` to `[x]` +- `8.6.3` to `[x]` + +Do not update those checkboxes earlier, or the roadmap will stop reflecting +reality. + +## Concrete steps + +Run all commands from `/home/user/project`. + +1. Audit the current docs and tests. + + ```bash + rg -n "MessageAssembler|RequestBodyStream|memory budget|soft-limit|hard-cap" \ + docs/generic-message-fragmentation-and-re-assembly-design.md \ + docs/multi-packet-and-streaming-responses-design.md \ + docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md \ + docs/users-guide.md \ + docs/adr-002-streaming-requests-and-shared-message-assembly.md \ + tests/streaming_request.rs \ + tests/features/message_assembler.feature \ + tests/features/memory_budgets.feature + ``` + + Expected evidence: matching hits in all four documentation files, `rstest` + coverage in `tests/streaming_request.rs`, and no existing + `tests/features/streaming_request.feature`. + +2. Run the targeted existing validation first. + + ```bash + set -o pipefail && cargo test --test streaming_request 2>&1 | tee /tmp/8-6-streaming-request.log + set -o pipefail && cargo test --test bdd --all-features -- message_assembler 2>&1 | tee /tmp/8-6-message-assembler-bdd.log + set -o pipefail && cargo test --test bdd --all-features -- memory_budgets 2>&1 | tee /tmp/8-6-memory-budgets-bdd.log + set -o pipefail && cargo test --test bdd --all-features -- budget_ 2>&1 | tee /tmp/8-6-budget-bdd.log + ``` + + Expected transcript fragments: + + ```plaintext + test result: ok. + ``` + +3. After editing docs and tests, run the final gates with logs. + + ```bash + set -o pipefail && timeout 300 make fmt 2>&1 | tee /tmp/8-6-fmt.log + set -o pipefail && timeout 300 make markdownlint MDLINT=/root/.bun/bin/markdownlint-cli2 2>&1 | tee /tmp/8-6-markdownlint.log + set -o pipefail && timeout 300 make nixie 2>&1 | tee /tmp/8-6-nixie.log + set -o pipefail && timeout 300 make check-fmt 2>&1 | tee /tmp/8-6-check-fmt.log + set -o pipefail && timeout 300 make lint 2>&1 | tee /tmp/8-6-lint.log + set -o pipefail && timeout 300 make test 2>&1 | tee /tmp/8-6-test.log + set -o pipefail && timeout 300 make test-doc 2>&1 | tee /tmp/8-6-test-doc.log + set -o pipefail && timeout 300 make doctest-benchmark 2>&1 | tee /tmp/8-6-doctest-benchmark.log + ``` + + Expected transcript fragments: + + ```plaintext + test result: ok. + Finished + ``` + +4. Update the roadmap checkboxes only after Step 3 succeeds. + +## Validation and acceptance + +Acceptance means all of the following are true: + +- Documentation: + - the three roadmap-target design documents describe the same composition + order and budget semantics; + - `docs/users-guide.md` does not leave library consumers with stale guidance + about streaming requests, `MessageAssembler`, or memory budgets; and + - any new design decisions are written in the relevant design document. + +- Tests: + - `tests/streaming_request.rs` passes; + - the `MessageAssembler` BDD suite passes; + - the memory-budget BDD suites pass; and + - the new streaming-request BDD suite passes. + +- Quality gates: + - `make fmt` passes; + - `make markdownlint MDLINT=/root/.bun/bin/markdownlint-cli2` passes; + - `make nixie` passes; + - `make check-fmt` passes; + - `make lint` passes; + - `make test` passes; + - `make test-doc` passes; and + - `make doctest-benchmark` passes. + +- Roadmap closure: + - `docs/roadmap.md` marks `8.6.1`, `8.6.2`, and `8.6.3` as done. + +## Idempotence and recovery + +All audit and test commands in this plan are safe to rerun. + +Documentation edits are also safe to redo, but `make fmt` may rewrite Markdown +outside the immediate paragraphs that were touched. After each formatting run: + +1. inspect `git diff --stat`; +2. confirm the changes are intentional; +3. if `make fmt` rewrote an unrelated document, either keep the clean + normalization if harmless or restore only that unrelated file before + continuing. + +If a gate fails halfway through: + +1. read the saved log under `/tmp/8-6-*.log`; +2. fix the smallest relevant issue; +3. rerun only the failed targeted gate first; and +4. rerun the full final gate set before touching the roadmap checkboxes. + +If repo-wide Markdown or doctest gates fail only because of unrelated baseline +debt, stop before marking the roadmap item done and record the exact blocking +files in `Decision Log`. + +## Artifacts and notes + +Key evidence gathered during planning: + +```plaintext +- Existing docs already contain partial 8.6 content. +- docs/users-guide.md already documents RequestParts, MessageAssembler, and MemoryBudgets. +- tests/streaming_request.rs exists and uses rstest. +- No tests/features/streaming_request.feature exists today. +``` + +Target new artefacts if the audit confirms the current gap: + +```plaintext +tests/features/streaming_request.feature +tests/fixtures/streaming_request.rs +tests/steps/streaming_request_steps.rs +tests/scenarios/streaming_request_scenarios.rs +``` + +## Interfaces and dependencies + +No new external dependencies are expected. + +The plan depends on these existing public interfaces remaining stable: + +- `wireframe::request::RequestParts` +- `wireframe::request::RequestBodyStream` +- `wireframe::request::RequestBodyReader` +- `wireframe::extractor::StreamingBody` +- `wireframe::message_assembler::MessageAssembler` +- `wireframe::app::MemoryBudgets` +- `WireframeApp::with_message_assembler(...)` +- `WireframeApp::message_assembler()` +- `WireframeApp::memory_budgets(...)` + +The plan depends on these existing validation frameworks: + +- `rstest` for focused Rust tests; +- `rstest-bdd` for behavioural tests; +- repo Makefile targets for formatting, linting, testing, doctests, and + documentation validation. + +Revision note (2026-03-14): Initial draft created after auditing the roadmap, +ADR 0002, current docs, user-guide coverage, and the existing tests. The +remaining work now explicitly includes a focused `rstest-bdd` streaming-request +suite because the current inbound streaming coverage is unit-only. From b95cdc4186ab20964899a7f8b2bb87933d163f18 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sat, 14 Mar 2026 14:57:16 +0000 Subject: [PATCH 2/8] feat(streaming request): add support for streaming request bodies with memory budgets - Completed documentation updates for streaming request body APIs and message assembler integration. - Implement inbound streaming request bodies exposing RequestParts and RequestBodyStream or StreamingBody. - Enforce shared per-connection memory budgets with per-frame rejection, soft read pacing, and hard abort. - Added builder support for configuring memory budgets and message assembler hook. - Introduced comprehensive behavioural tests using rstest-bdd for back-pressure, error propagation, and AsyncRead adapter correctness. - Enhanced public API guidance ensuring protocol authors can adopt streaming incrementally and preserve ergonomic buffered requests. This allows handlers to consume large inbound payloads incrementally while preserving back-pressure and reliable error handling. Co-authored-by: devboxerhub[bot] --- ...te-documentation-for-streaming-requests.md | 51 ++-- ...ge-fragmentation-and-re-assembly-design.md | 58 +++- ...i-packet-and-streaming-responses-design.md | 58 +++- docs/roadmap.md | 6 +- ...-set-philosophy-and-capability-maturity.md | 27 +- docs/users-guide.md | 24 +- src/app/builder/config.rs | 5 + src/app/builder/protocol.rs | 7 +- tests/features/streaming_request.feature | 24 ++ tests/fixtures/mod.rs | 1 + tests/fixtures/streaming_request.rs | 281 ++++++++++++++++++ tests/scenarios/mod.rs | 1 + .../scenarios/streaming_request_scenarios.rs | 29 ++ tests/steps/mod.rs | 1 + tests/steps/streaming_request_steps.rs | 104 +++++++ 15 files changed, 632 insertions(+), 45 deletions(-) create mode 100644 tests/features/streaming_request.feature create mode 100644 tests/fixtures/streaming_request.rs create mode 100644 tests/scenarios/streaming_request_scenarios.rs create mode 100644 tests/steps/streaming_request_steps.rs diff --git a/docs/execplans/8-6-1-update-documentation-for-streaming-requests.md b/docs/execplans/8-6-1-update-documentation-for-streaming-requests.md index 0eba0a41..9a459b97 100644 --- a/docs/execplans/8-6-1-update-documentation-for-streaming-requests.md +++ b/docs/execplans/8-6-1-update-documentation-for-streaming-requests.md @@ -5,7 +5,7 @@ This ExecPlan (execution plan) is a living document. The sections `Decision Log`, and `Outcomes & Retrospective` must be kept up to date as work proceeds. -Status: DRAFT +Status: COMPLETE ## Purpose / big picture @@ -124,16 +124,16 @@ Success is observable when: - [x] (2026-03-14 00:00Z) Drafted this ExecPlan after auditing the roadmap, ADR 0002, current design docs, user guide, and the existing test surface. -- [ ] Stage A: build a documentation-claim matrix and decide the minimum set of +- [x] Stage A: build a documentation-claim matrix and decide the minimum set of in-place edits needed across the three roadmap documents. -- [ ] Stage B: revise the three target design documents and record any new +- [x] Stage B: revise the three target design documents and record any new design decisions in the relevant design document. -- [ ] Stage C: update `docs/users-guide.md` and any public Rustdoc examples +- [x] Stage C: update `docs/users-guide.md` and any public Rustdoc examples that need to reflect the documented public interface. -- [ ] Stage D: close the missing behavioural-test coverage for streaming +- [x] Stage D: close the missing behavioural-test coverage for streaming request bodies and add any smaller focused `rstest` coverage required by new claims. -- [ ] Stage E: run all relevant quality gates and mark roadmap items `8.6.1` +- [x] Stage E: run all relevant quality gates and mark roadmap items `8.6.1` through `8.6.3` done. ## Surprises & discoveries @@ -169,6 +169,13 @@ Success is observable when: evidence first and add only narrowly scoped follow-up tests if new documentation claims are not already covered. +- Observation: the missing behavioural gap for inbound streaming requests could + be closed without introducing another runtime-heavy integration harness. + Evidence: a focused `rstest-bdd` fixture around `body_channel`, + `StreamingBody`, and `RequestBodyReader` covered the public API guarantees + that were previously unit-test-only. Impact: phase `8.6` stayed within scope + and avoided duplicating the heavier inbound message-assembly harness. + ## Decision log - Decision: treat roadmap phase `8.6` as a documentation harmonization and @@ -193,20 +200,30 @@ Success is observable when: Rationale: a documentation checkbox should mean the docs are trustworthy and demonstrably aligned with the code. Date/Author: 2026-03-14 / Codex -## Outcomes & retrospective - -This plan is still in draft state. No implementation has started yet. +- Decision: cover the streaming-request behavioural gap at the public request + API boundary instead of through a second full inbound-actor integration + fixture. Rationale: the missing guarantees were handler-facing stream + semantics (`AsyncRead` adaptation, back-pressure, and error propagation), and + those behaviours are exercised more directly through the request API than + through another transport harness. Date/Author: 2026-03-14 / Codex -Expected deliverables at completion: +## Outcomes & retrospective -- revised design guidance in the three roadmap-target documents; -- an updated `docs/users-guide.md` section set wherever public consumer-facing - guidance is stale; -- one focused behavioural suite for streaming request bodies, plus any narrow - unit-test additions needed to support new claims; +Completed deliverables: + +- revised composition and budget guidance in + `docs/generic-message-fragmentation-and-re-assembly-design.md`; +- revised inbound streaming-request guidance in + `docs/multi-packet-and-streaming-responses-design.md`; +- revised capability-maturity language in + `docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md`; +- synced public guidance in `docs/users-guide.md` and public Rustdoc on + `WireframeApp::with_message_assembler` and `WireframeApp::memory_budgets`; +- a focused `rstest-bdd` suite for `tests/features/streaming_request.feature`; - updated roadmap checkboxes for `8.6.1`, `8.6.2`, and `8.6.3`; and -- full captured gate logs proving the documentation and validation changes are - complete. +- captured gate logs for `make fmt`, `make markdownlint`, `make check-fmt`, + `make lint`, `make test`, `make test-doc`, `make doctest-benchmark`, and + `make nixie`. ## Context and orientation diff --git a/docs/generic-message-fragmentation-and-re-assembly-design.md b/docs/generic-message-fragmentation-and-re-assembly-design.md index 31a491df..eeaa01d8 100644 --- a/docs/generic-message-fragmentation-and-re-assembly-design.md +++ b/docs/generic-message-fragmentation-and-re-assembly-design.md @@ -437,6 +437,20 @@ When both layers are enabled, Wireframe applies them in the following order: - a streaming request body that the handler consumes incrementally. 4. Route and invoke handlers. +In operational terms, each layer owns a distinct hand-off: + +- the transport codec and optional `FragmentAdapter` are responsible only for + delivering one complete protocol packet; +- `MessageAssembler::parse_frame_header` classifies that packet as either the + first frame or a continuation frame for one logical message; +- Wireframe strips the parsed protocol header, tracks assembly state by + message key, and decides whether the body should stay buffered or become a + stream; and +- handlers observe only the protocol-facing request shape: + - a buffered payload via the existing request path, or + - `RequestParts` plus `RequestBodyStream` / `StreamingBody` for incremental + consumption. + For screen readers: the following sequence diagram shows the decode → fragment reassembly → message assembly → handler pipeline. @@ -468,18 +482,38 @@ on the app instance and is applied in the inbound runtime pipeline in `WireframeApp` connection handling, after transport reassembly and before handler dispatch. +For protocol authors, the boundary rule is simple: + +- use fragmentation when one protocol packet exceeds the transport frame + budget; +- use `MessageAssembler` when one logical request spans multiple protocol + packets; and +- enable both when a large logical request may itself contain transport + fragments. + +Fragmentation is transport plumbing. `MessageAssembler` is protocol continuity. + ### 9.3 Memory budget integration Per-connection memory budgets apply across both layers. Budgets cover: -- bytes buffered per message (existing `max_message_size`); +- bytes buffered per message; - bytes buffered per connection; and - bytes buffered across in-flight assemblies. -When budgets are approached, Wireframe applies back-pressure by pausing further -reads and assembly work. When a hard cap is exceeded, Wireframe aborts early, -releases partial state, and surfaces `std::io::ErrorKind::InvalidData` from the -inbound processing path. +These limits are configured through `WireframeApp::memory_budgets(...)`. When +that builder is not called explicitly, Wireframe derives defaults from the +current `buffer_capacity`, so the same protection model still applies. + +Budget enforcement is shared across the full inbound assembly pipeline: + +1. **Per-frame enforcement** rejects any newly arrived frame that would exceed + the configured message or aggregate caps. +2. **Soft-pressure pacing** pauses inbound reads once buffered bytes reach 80% + of the smaller aggregate budget (`bytes_per_connection` and + `bytes_in_flight`). +3. **Hard-cap abort** terminates the connection if total buffered bytes + strictly exceed that aggregate cap. The soft-limit implementation paces reads by inserting a short pause before polling the next inbound frame once buffered bytes reach 80% of the smaller @@ -490,9 +524,17 @@ aggregate cap (100%), the connection is terminated immediately with `InvalidData` as a defence-in-depth safety net. If both transport fragmentation and `MessageAssembler` are enabled, the -effective message cap is whichever guard triggers first. Operators should set -the fragmentation `max_message_size` and the message assembly per-message cap -to compatible values to avoid surprising early termination. +effective message cap is whichever guard triggers first: + +- fragmentation's `max_message_size` for one reassembled transport packet; or +- message assembly's `bytes_per_message` budget for one logical request. + +Operators should set those limits to compatible values. Otherwise a request may +be valid at the protocol layer but still terminate early at the transport +boundary, or vice versa. + +Single-frame requests that complete immediately do not count against aggregate +buffer budgets because they do not remain buffered in assembly state. See [ADR 0002][adr-0002] for the complete budget configuration surface and failure mode semantics. diff --git a/docs/multi-packet-and-streaming-responses-design.md b/docs/multi-packet-and-streaming-responses-design.md index dccf9ec5..febb7762 100644 --- a/docs/multi-packet-and-streaming-responses-design.md +++ b/docs/multi-packet-and-streaming-responses-design.md @@ -577,18 +577,31 @@ Handlers that opt into streaming request bodies receive two components: - `RequestBodyStream` yields body chunks incrementally, allowing handlers to process large payloads without buffering the entire message in memory. -See [ADR 0002][adr-0002] for the canonical type definitions. The exact field -names and types may evolve; protocol authors should consult the published API -documentation for current signatures. +In the current API, `RequestParts` exposes `id()`, `correlation_id()`, and +`metadata()`. `RequestBodyStream` is a pinned boxed stream of +`Result`, and `RequestBodyReader` adapts that stream to +`AsyncRead` for parser reuse. Handlers can also use the `StreamingBody` +extractor when they prefer an extractor-based signature over taking the stream +type directly. + +The key design point is that routing metadata becomes available before the body +is complete. A handler can inspect `RequestParts` immediately, then decide +whether to consume the body lazily, stream it into a parser, or stop early. ### 11.2 Opt-in semantics The default remains "buffered request" to preserve Wireframe's existing transparent assembly ergonomics for small messages and simple protocols. -Handlers MAY opt into streaming by declaring a compatible extractor signature. +Handlers opt into streaming by declaring a compatible signature such as: + +- `async fn handle(parts: RequestParts, body: RequestBodyStream)`; +- `async fn handle(parts: RequestParts, body: StreamingBody)`; or +- a handler that converts `StreamingBody` into `RequestBodyReader` for + `AsyncRead`-based parsing. -Wireframe MAY expose an `AsyncRead` adaptor for `RequestBodyStream` so protocol -crates can reuse existing parsers that expect `AsyncRead` rather than `Stream`. +If a handler does not opt in, Wireframe preserves the buffered request path. +This keeps the small-message path ergonomic and means protocol crates can adopt +streaming only where large inbound bodies justify the extra complexity. ### 11.3 Symmetry with response streaming @@ -602,6 +615,15 @@ Streaming request bodies mirror `Response::Stream`: Both directions apply the same per-connection memory budgets and back-pressure semantics, ensuring consistent resource accounting across the duplex channel. +The symmetry is conceptual, not identical in shape: + +- outbound streaming emits protocol frames chosen by the handler; and +- inbound streaming emits request-body chunks after Wireframe has already + separated protocol metadata into `RequestParts`. + +That separation is what lets request handlers start work before the full body +arrives without exposing lower transport concerns. + ### 11.4 Composition with MessageAssembler When a protocol uses the `MessageAssembler` abstraction for multi-frame message @@ -612,7 +634,29 @@ assembly, the assembler produces either: The assembler handles protocol-specific continuity rules (ordering, missing frames, duplicate frames) while Wireframe provides shared buffering machinery -and limit enforcement. See [ADR 0002][adr-0002] for the complete specification. +and limit enforcement. + +The inbound ordering is fixed: + +1. transport framing decodes one packet; +2. optional transport fragmentation reassembles that packet if required; +3. `MessageAssembler` maps protocol packets into one logical request; and +4. the handler receives either a buffered body or `RequestParts` plus a + streaming body. + +This means a protocol-specific assembler never needs to understand transport +fragments. It only sees complete protocol packets. + +Per-connection memory budgets apply to the shared inbound assembly pipeline. +That includes buffered request assembly and any streaming-request state that +has not yet been consumed by the handler. Wireframe enforces: + +- per-frame rejection when a new chunk would exceed the configured caps; +- soft-pressure read pacing at 80% of the smaller aggregate budget; and +- hard-cap abort with `InvalidData` if total buffered bytes strictly exceed the + aggregate cap. + +See [ADR 0002][adr-0002] for the complete specification. [adr-0002]: adr/0002-streaming-requests-and-shared-message-assembly.md diff --git a/docs/roadmap.md b/docs/roadmap.md index 738133ad..55f4e7c2 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -322,11 +322,11 @@ and standardized per-connection memory budgets. ### 8.6. Documentation -- [ ] 8.6.1. Update `generic-message-fragmentation-and-re-assembly-design.md` +- [x] 8.6.1. Update `generic-message-fragmentation-and-re-assembly-design.md` with composition guidance. -- [ ] 8.6.2. Update `multi-packet-and-streaming-responses-design.md` with a +- [x] 8.6.2. Update `multi-packet-and-streaming-responses-design.md` with a streaming request body section. -- [ ] 8.6.3. Update +- [x] 8.6.3. Update `the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md` with MessageAssembler and budget details. diff --git a/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md b/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md index d6d56654..fe416df2 100644 --- a/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md +++ b/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md @@ -98,10 +98,12 @@ incremental processing of large inbound payloads. The default remains buffered request handling to preserve existing ergonomics for small messages and simple protocols. -At a high level, `RequestParts` separates routing metadata from the body, and -`RequestBodyStream` yields body chunks as a pinned, boxed stream. See [ADR -0002][adr-0002] for the canonical type definitions; the exact field names and -types shown there are illustrative and may evolve before stabilization. +This is now a concrete part of the public API rather than a future sketch. +`RequestParts` separates routing metadata from the body, while +`RequestBodyStream` yields body chunks as a pinned boxed stream. Protocol +crates can adapt that stream to `AsyncRead` through `RequestBodyReader`, and +extractor-based handlers can use `StreamingBody` for the same underlying +capability. Completion of a streaming response is signalled by a protocol-defined terminator frame. The new `stream_end_frame` hook allows implementations to @@ -229,6 +231,15 @@ applies them in order: transport reassembly first, then message assembly. This ensures protocol crates migrating to `MessageAssembler` do not need to special-case transport fragments. +This capability has moved from design intent to implemented maturity: + +- applications register assemblers with + `WireframeApp::with_message_assembler(...)`; +- the inbound runtime path applies the assembler after transport reassembly and + before handler dispatch; and +- handlers receive either a buffered request or a streaming request body based + on the assembler's framing outcome. + ## III. Capability Maturity: From Functional to Production-Grade A feature-complete library is not necessarily a mature one. The road to @@ -302,6 +313,14 @@ Rust's ownership model and `Drop` trait are the foundation of resource safety. partial state, and surfaces `std::io::ErrorKind::InvalidData` from the inbound processing path. + This now ships as a three-tier protection model: + + - per-frame rejection when an incoming frame would exceed the configured + caps; + - soft-pressure read pacing at 80% of the smaller aggregate budget; and + - immediate connection abort when buffered bytes strictly exceed the hard + aggregate cap. + - **Timeouts:** The reassembly logic will include a non-optional, configurable timeout to automatically purge partial messages that are abandoned or sent too slowly. diff --git a/docs/users-guide.md b/docs/users-guide.md index cc83f885..2e751886 100644 --- a/docs/users-guide.md +++ b/docs/users-guide.md @@ -781,7 +781,10 @@ assert_eq!(parts.metadata(), &[0x01, 0x02]); Unlike `PacketParts` (which carries the raw payload for envelope reconstruction), `RequestParts` carries only protocol-defined metadata required to interpret the streaming body. The body itself is consumed through a separate -stream, enabling back-pressure and incremental processing.[^46] +stream, enabling back-pressure and incremental processing.[^46] When +`MessageAssembler` is configured, this split happens after any transport +fragment reassembly, so handlers and protocol extractors see protocol-level +metadata rather than lower-level fragment state. ### Message assembler hook @@ -833,6 +836,12 @@ When configured, this hook now runs on the inbound connection path after transport fragmentation reassembly and before handler dispatch. Incomplete assemblies remain buffered per message key until completion or timeout eviction. +The assembler's output drives the handler-facing request shape: + +- fully assembled messages continue through the buffered request path; and +- streaming-capable messages surface as `RequestParts` plus + `RequestBodyStream` / `StreamingBody`. + Message-assembly parsing and continuity failures are treated as inbound deserialization failures and follow the existing failure threshold policy. @@ -875,6 +884,10 @@ is the minimum of the fragmentation `max_message_size` and the configured `bytes_per_message`. Single-frame messages that complete immediately are never counted against aggregate budgets, since they do not buffer. +If `memory_budgets(...)` is not configured explicitly, Wireframe derives the +same three fields from `buffer_capacity`, so the protection model remains +active even on the default path. + Wireframe provides a three-tier protection model for inbound memory budgets: 1. **Per-frame enforcement** — frames that would cause total buffered bytes to @@ -1074,7 +1087,8 @@ Handlers can opt into streaming request bodies using the `StreamingBody` extractor or by accepting a `RequestBodyStream` directly. The framework creates a bounded channel and forwards body chunks as they arrive; back-pressure propagates automatically when the handler consumes slower than the network -delivers. +delivers. Routing and correlation metadata stay available immediately through +`RequestParts`, so handlers can begin work before the full body is assembled. ```rust,no_run use tokio::io::AsyncReadExt; @@ -1131,8 +1145,10 @@ async fn with_extractor(parts: RequestParts, body: StreamingBody) { Back-pressure is enforced via bounded channels: when the internal buffer fills, the framework pauses reading from the socket until the handler drains pending -chunks. This prevents memory exhaustion under slow consumer conditions. The -`body_channel` helper creates channels with configurable capacity: +chunks. This prevents memory exhaustion under slow consumer conditions. When +the request arrived through `MessageAssembler`, the same per-connection memory +budgets continue to govern partial buffered state before chunks reach the +handler. The `body_channel` helper creates channels with configurable capacity: ```rust use wireframe::request::body_channel; diff --git a/src/app/builder/config.rs b/src/app/builder/config.rs index e36aa09d..15a7db40 100644 --- a/src/app/builder/config.rs +++ b/src/app/builder/config.rs @@ -39,6 +39,11 @@ where } /// Configure per-connection memory budgets for inbound buffering. + /// + /// These budgets apply across the shared inbound assembly pipeline, + /// including protocol-level message assembly and streaming request + /// hand-off state. Wireframe uses them for per-frame rejection, + /// soft-pressure read pacing, and hard-cap connection aborts. #[must_use] pub fn memory_budgets(mut self, budgets: MemoryBudgets) -> Self { self.memory_budgets = Some(budgets); diff --git a/src/app/builder/protocol.rs b/src/app/builder/protocol.rs index f048f208..ca812c59 100644 --- a/src/app/builder/protocol.rs +++ b/src/app/builder/protocol.rs @@ -41,8 +41,11 @@ where /// Install a [`MessageAssembler`] implementation. /// - /// The assembler parses protocol-specific frame headers to support - /// multi-frame request assembly once the inbound pipeline integrates it. + /// The assembler parses protocol-specific frame headers for multi-frame + /// request assembly on the inbound pipeline. Wireframe applies the hook + /// after any transport fragmentation reassembly and before handler + /// dispatch, producing either buffered requests or streaming request + /// bodies. /// /// # Examples /// diff --git a/tests/features/streaming_request.feature b/tests/features/streaming_request.feature new file mode 100644 index 00000000..ede87585 --- /dev/null +++ b/tests/features/streaming_request.feature @@ -0,0 +1,24 @@ +@streaming_request +Feature: Streaming request body consumption + Streaming request bodies let handlers consume inbound payloads + incrementally while preserving back-pressure and error propagation. + + Scenario: StreamingBody exposes an AsyncRead adapter + Given a streaming request body channel with capacity 4 + When body chunks "hello " and "world" are sent + And the streaming body is read through the AsyncRead adapter + Then the collected body is "hello world" + + Scenario: Bounded request body channels apply back-pressure + Given a streaming request body channel with capacity 1 + When one body chunk "first" is buffered without draining the stream + And another body chunk "second" is sent with a 50 millisecond timeout + Then the send is blocked by back-pressure + + Scenario: Request body stream errors reach consumers + Given a streaming request body channel with capacity 4 + When body chunk "ok" is sent + And a request body error of kind "invalid data" is sent + And the request body stream is drained directly + Then one chunk is received before the error + And the last stream error kind is "invalid data" diff --git a/tests/fixtures/mod.rs b/tests/fixtures/mod.rs index 55f310ef..65fc0c8e 100644 --- a/tests/fixtures/mod.rs +++ b/tests/fixtures/mod.rs @@ -40,5 +40,6 @@ pub mod request_parts; pub mod serializer_boundaries; pub mod slow_io_backpressure; pub mod stream_end; +pub mod streaming_request; pub mod test_observability; pub mod unified_codec; diff --git a/tests/fixtures/streaming_request.rs b/tests/fixtures/streaming_request.rs new file mode 100644 index 00000000..9cf73b08 --- /dev/null +++ b/tests/fixtures/streaming_request.rs @@ -0,0 +1,281 @@ +//! Behavioural fixture for streaming request body scenarios. + +use std::{fmt, future::Future, io, time::Duration}; + +use bytes::Bytes; +use futures::StreamExt; +use rstest::fixture; +use tokio::{io::AsyncReadExt, sync::mpsc}; +use wireframe::{ + extractor::StreamingBody, + request::{RequestBodyReader, RequestBodyStream, body_channel}, +}; +pub use wireframe_testing::TestResult; + +/// Runtime-backed world for streaming request body behaviour. +pub struct StreamingRequestWorld { + runtime: Option, + runtime_error: Option, + sender: Option>>, + stream: Option, + collected_body: Vec, + collected_chunks: usize, + last_error_kind: Option, + send_blocked_by_backpressure: Option, +} + +impl Default for StreamingRequestWorld { + fn default() -> Self { + match tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + { + Ok(runtime) => Self { + runtime: Some(runtime), + runtime_error: None, + sender: None, + stream: None, + collected_body: Vec::new(), + collected_chunks: 0, + last_error_kind: None, + send_blocked_by_backpressure: None, + }, + Err(err) => Self { + runtime: None, + runtime_error: Some(format!("failed to create runtime: {err}")), + sender: None, + stream: None, + collected_body: Vec::new(), + collected_chunks: 0, + last_error_kind: None, + send_blocked_by_backpressure: None, + }, + } + } +} + +impl fmt::Debug for StreamingRequestWorld { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("StreamingRequestWorld") + .field("sender_initialized", &self.sender.is_some()) + .field("stream_initialized", &self.stream.is_some()) + .field("collected_body_len", &self.collected_body.len()) + .field("collected_chunks", &self.collected_chunks) + .field("last_error_kind", &self.last_error_kind) + .field( + "send_blocked_by_backpressure", + &self.send_blocked_by_backpressure, + ) + .finish_non_exhaustive() + } +} + +/// Fixture for streaming request body behavioural tests. +#[rustfmt::skip] +#[fixture] +pub fn streaming_request_world() -> StreamingRequestWorld { + StreamingRequestWorld::default() +} + +impl StreamingRequestWorld { + fn runtime(&self) -> TestResult<&tokio::runtime::Runtime> { + self.runtime.as_ref().ok_or_else(|| { + self.runtime_error + .clone() + .unwrap_or_else(|| "runtime unavailable".to_string()) + .into() + }) + } + + fn block_on(&self, future: F) -> TestResult + where + F: Future, + { + if tokio::runtime::Handle::try_current().is_ok() { + return Err("nested Tokio runtime detected in streaming request fixture".into()); + } + Ok(self.runtime()?.block_on(future)) + } + + /// Create a fresh request body channel. + pub fn create_channel(&mut self, capacity: usize) { + let (sender, stream) = body_channel(capacity); + self.sender = Some(sender); + self.stream = Some(stream); + self.collected_body.clear(); + self.collected_chunks = 0; + self.last_error_kind = None; + self.send_blocked_by_backpressure = None; + } + + /// Send a body chunk into the request stream. + /// + /// # Errors + /// + /// Returns an error if the channel is unavailable or the send fails. + pub fn send_chunk(&mut self, chunk: &str) -> TestResult { + let sender = self.sender.clone().ok_or("request body sender missing")?; + self.block_on(async move { sender.send(Ok(Bytes::from(chunk.to_owned()))).await })??; + Ok(()) + } + + /// Attempt to send a body chunk with a timeout. + /// + /// # Errors + /// + /// Returns an error if the channel is unavailable. + pub fn send_chunk_with_timeout(&mut self, chunk: &str, timeout_ms: u64) -> TestResult { + let sender = self.sender.clone().ok_or("request body sender missing")?; + let blocked = self.block_on(async move { + tokio::time::timeout( + Duration::from_millis(timeout_ms), + sender.send(Ok(Bytes::from(chunk.to_owned()))), + ) + .await + .is_err() + })?; + self.send_blocked_by_backpressure = Some(blocked); + Ok(()) + } + + /// Send an I/O error into the request stream. + /// + /// # Errors + /// + /// Returns an error if the channel is unavailable or the send fails. + pub fn send_error(&mut self, kind: io::ErrorKind) -> TestResult { + let sender = self.sender.clone().ok_or("request body sender missing")?; + self.block_on(async move { + sender + .send(Err(io::Error::new( + kind, + format!("request body error: {kind}"), + ))) + .await + })??; + Ok(()) + } + + /// Read the request body through `StreamingBody` and `RequestBodyReader`. + /// + /// # Errors + /// + /// Returns an error if the stream is unavailable or the read fails. + pub fn drain_with_reader(&mut self) -> TestResult { + self.sender = None; + let stream = self.stream.take().ok_or("request body stream missing")?; + let body = StreamingBody::new(stream); + let mut reader: RequestBodyReader = body.into_reader(); + let mut buffer = Vec::new(); + self.block_on(async { reader.read_to_end(&mut buffer).await })??; + self.collected_chunks = usize::from(!buffer.is_empty()); + self.collected_body = buffer; + Ok(()) + } + + /// Drain the request stream directly and retain any observed error. + /// + /// # Errors + /// + /// Returns an error if the stream is unavailable. + pub fn drain_stream(&mut self) -> TestResult { + let mut stream = self.stream.take().ok_or("request body stream missing")?; + let observed = self.block_on(async move { + let mut chunks = Vec::new(); + let mut error_kind = None; + loop { + let next_chunk = stream.next().await.transpose(); + match classify_next_chunk(next_chunk, &mut error_kind) { + NextChunk::Chunk(chunk) => chunks.push(chunk), + NextChunk::End => break, + } + } + (chunks, error_kind) + })?; + + let (chunks, error_kind) = observed; + self.collected_chunks = chunks.len(); + self.collected_body = chunks + .into_iter() + .flat_map(|chunk| chunk.to_vec()) + .collect(); + self.last_error_kind = error_kind; + Ok(()) + } + + /// Assert the collected body matches the expected bytes. + pub fn assert_collected_body(&self, expected: &str) -> TestResult { + if self.collected_body == expected.as_bytes() { + return Ok(()); + } + + Err(format!( + "expected body {:?}, observed {:?}", + expected.as_bytes(), + self.collected_body + ) + .into()) + } + + /// Assert that back-pressure blocked the timed send. + pub fn assert_send_blocked_by_backpressure(&self) -> TestResult { + match self.send_blocked_by_backpressure { + Some(true) => Ok(()), + Some(false) => Err("expected back-pressure to block the send".into()), + None => Err("back-pressure result was not recorded".into()), + } + } + + /// Assert the number of successful chunks seen before termination. + pub fn assert_collected_chunks(&self, expected: usize) -> TestResult { + if self.collected_chunks == expected { + return Ok(()); + } + + Err(format!( + "expected {expected} collected chunks, observed {}", + self.collected_chunks + ) + .into()) + } + + /// Assert the last observed error kind. + pub fn assert_last_error_kind(&self, expected: io::ErrorKind) -> TestResult { + match self.last_error_kind { + Some(kind) if kind == expected => Ok(()), + Some(kind) => { + Err(format!("expected error kind {expected:?}, observed {kind:?}").into()) + } + None => Err("no stream error was observed".into()), + } + } +} + +/// Parse a human-readable error kind used in feature files. +pub fn parse_error_kind(value: &str) -> Result { + match value { + "invalid data" => Ok(io::ErrorKind::InvalidData), + "broken pipe" => Ok(io::ErrorKind::BrokenPipe), + "timed out" => Ok(io::ErrorKind::TimedOut), + other => Err(format!("unsupported error kind: {other}")), + } +} + +enum NextChunk { + Chunk(Bytes), + End, +} + +fn classify_next_chunk( + next_chunk: Result, io::Error>, + error_kind: &mut Option, +) -> NextChunk { + match next_chunk { + Ok(Some(chunk)) => NextChunk::Chunk(chunk), + Ok(None) => NextChunk::End, + Err(err) => { + *error_kind = Some(err.kind()); + NextChunk::End + } + } +} diff --git a/tests/scenarios/mod.rs b/tests/scenarios/mod.rs index 3a888c53..f2c6346b 100644 --- a/tests/scenarios/mod.rs +++ b/tests/scenarios/mod.rs @@ -42,5 +42,6 @@ mod request_parts_scenarios; mod serializer_boundaries_scenarios; mod slow_io_backpressure_scenarios; mod stream_end_scenarios; +mod streaming_request_scenarios; mod test_observability_scenarios; mod unified_codec_scenarios; diff --git a/tests/scenarios/streaming_request_scenarios.rs b/tests/scenarios/streaming_request_scenarios.rs new file mode 100644 index 00000000..5f2f9842 --- /dev/null +++ b/tests/scenarios/streaming_request_scenarios.rs @@ -0,0 +1,29 @@ +//! Scenario tests for streaming request body behaviour. + +use rstest_bdd_macros::scenario; + +use crate::fixtures::streaming_request::*; + +#[scenario( + path = "tests/features/streaming_request.feature", + name = "StreamingBody exposes an AsyncRead adapter" +)] +fn streaming_body_async_read(streaming_request_world: StreamingRequestWorld) { + drop(streaming_request_world); +} + +#[scenario( + path = "tests/features/streaming_request.feature", + name = "Bounded request body channels apply back-pressure" +)] +fn streaming_request_backpressure(streaming_request_world: StreamingRequestWorld) { + drop(streaming_request_world); +} + +#[scenario( + path = "tests/features/streaming_request.feature", + name = "Request body stream errors reach consumers" +)] +fn streaming_request_stream_error(streaming_request_world: StreamingRequestWorld) { + drop(streaming_request_world); +} diff --git a/tests/steps/mod.rs b/tests/steps/mod.rs index 386fc7be..630d9c37 100644 --- a/tests/steps/mod.rs +++ b/tests/steps/mod.rs @@ -38,6 +38,7 @@ mod request_parts_steps; mod serializer_boundaries_steps; mod slow_io_backpressure_steps; mod stream_end_steps; +mod streaming_request_steps; mod test_observability_steps; mod unified_codec_steps; diff --git a/tests/steps/streaming_request_steps.rs b/tests/steps/streaming_request_steps.rs new file mode 100644 index 00000000..5f6f67fb --- /dev/null +++ b/tests/steps/streaming_request_steps.rs @@ -0,0 +1,104 @@ +//! Step definitions for streaming request body scenarios. + +use rstest_bdd_macros::{given, then, when}; + +use crate::fixtures::streaming_request::{StreamingRequestWorld, TestResult, parse_error_kind}; + +#[given("a streaming request body channel with capacity {capacity:usize}")] +fn given_request_body_channel( + streaming_request_world: &mut StreamingRequestWorld, + capacity: usize, +) { + streaming_request_world.create_channel(capacity); +} + +#[when("body chunks {first:string} and {second:string} are sent")] +fn when_body_chunks_are_sent( + streaming_request_world: &mut StreamingRequestWorld, + first: String, + second: String, +) -> TestResult { + streaming_request_world.send_chunk(&first)?; + streaming_request_world.send_chunk(&second) +} + +#[when("body chunk {chunk:string} is sent")] +fn when_body_chunk_is_sent( + streaming_request_world: &mut StreamingRequestWorld, + chunk: String, +) -> TestResult { + streaming_request_world.send_chunk(&chunk) +} + +#[when("one body chunk {chunk:string} is buffered without draining the stream")] +fn when_chunk_is_buffered_without_draining( + streaming_request_world: &mut StreamingRequestWorld, + chunk: String, +) -> TestResult { + streaming_request_world.send_chunk(&chunk) +} + +#[when("another body chunk {chunk:string} is sent with a {timeout_ms:u64} millisecond timeout")] +fn when_body_chunk_is_sent_with_timeout( + streaming_request_world: &mut StreamingRequestWorld, + chunk: String, + timeout_ms: u64, +) -> TestResult { + streaming_request_world.send_chunk_with_timeout(&chunk, timeout_ms) +} + +#[when("a request body error of kind {kind:string} is sent")] +fn when_request_body_error_is_sent( + streaming_request_world: &mut StreamingRequestWorld, + kind: String, +) -> TestResult { + let kind = match parse_error_kind(&kind) { + Ok(kind) => kind, + Err(err) => return Err(err.into()), + }; + streaming_request_world.send_error(kind) +} + +#[when("the streaming body is read through the AsyncRead adapter")] +fn when_streaming_body_is_read(streaming_request_world: &mut StreamingRequestWorld) -> TestResult { + streaming_request_world.drain_with_reader() +} + +#[when("the request body stream is drained directly")] +fn when_request_body_stream_is_drained( + streaming_request_world: &mut StreamingRequestWorld, +) -> TestResult { + streaming_request_world.drain_stream() +} + +#[then("the collected body is {expected:string}")] +fn then_collected_body_is( + streaming_request_world: &mut StreamingRequestWorld, + expected: String, +) -> TestResult { + streaming_request_world.assert_collected_body(&expected) +} + +#[then("the send is blocked by back-pressure")] +fn then_send_is_blocked_by_backpressure( + streaming_request_world: &mut StreamingRequestWorld, +) -> TestResult { + streaming_request_world.assert_send_blocked_by_backpressure() +} + +#[then("one chunk is received before the error")] +fn then_one_chunk_is_received(streaming_request_world: &mut StreamingRequestWorld) -> TestResult { + streaming_request_world.assert_collected_chunks(1) +} + +#[then("the last stream error kind is {kind:string}")] +fn then_last_stream_error_kind_is( + streaming_request_world: &mut StreamingRequestWorld, + kind: String, +) -> TestResult { + let kind = match parse_error_kind(&kind) { + Ok(kind) => kind, + Err(err) => return Err(err.into()), + }; + streaming_request_world.assert_last_error_kind(kind) +} From 938f27a161b9e80951a42072f0abc46d14e4b17f Mon Sep 17 00:00:00 2001 From: Leynos Date: Mon, 16 Mar 2026 20:04:03 +0000 Subject: [PATCH 3/8] refactor(tests): extract assert_field_eq and try_parse_error_kind helpers Refactored tests fixtures and steps by introducing a common helper function `assert_field_eq` to simplify equality assertions and reduce repetition in error handling assertions. Also added `try_parse_error_kind` helper in test steps to clean up error kind parsing logic, improving readability and maintainability. Co-authored-by: devboxerhub[bot] --- tests/fixtures/streaming_request.rs | 31 ++++++++++---------------- tests/steps/streaming_request_steps.rs | 14 +++++------- 2 files changed, 18 insertions(+), 27 deletions(-) diff --git a/tests/fixtures/streaming_request.rs b/tests/fixtures/streaming_request.rs index 9cf73b08..9b2c3ec1 100644 --- a/tests/fixtures/streaming_request.rs +++ b/tests/fixtures/streaming_request.rs @@ -205,16 +205,7 @@ impl StreamingRequestWorld { /// Assert the collected body matches the expected bytes. pub fn assert_collected_body(&self, expected: &str) -> TestResult { - if self.collected_body == expected.as_bytes() { - return Ok(()); - } - - Err(format!( - "expected body {:?}, observed {:?}", - expected.as_bytes(), - self.collected_body - ) - .into()) + assert_field_eq("body", expected.as_bytes(), self.collected_body.as_slice()) } /// Assert that back-pressure blocked the timed send. @@ -228,15 +219,7 @@ impl StreamingRequestWorld { /// Assert the number of successful chunks seen before termination. pub fn assert_collected_chunks(&self, expected: usize) -> TestResult { - if self.collected_chunks == expected { - return Ok(()); - } - - Err(format!( - "expected {expected} collected chunks, observed {}", - self.collected_chunks - ) - .into()) + assert_field_eq("collected chunks count", expected, self.collected_chunks) } /// Assert the last observed error kind. @@ -261,6 +244,16 @@ pub fn parse_error_kind(value: &str) -> Result { } } +fn assert_field_eq(label: &str, expected: T, observed: T) -> TestResult +where + T: PartialEq + fmt::Debug, +{ + if expected == observed { + return Ok(()); + } + Err(format!("expected {label} {expected:?}, observed {observed:?}").into()) +} + enum NextChunk { Chunk(Bytes), End, diff --git a/tests/steps/streaming_request_steps.rs b/tests/steps/streaming_request_steps.rs index 5f6f67fb..771848e8 100644 --- a/tests/steps/streaming_request_steps.rs +++ b/tests/steps/streaming_request_steps.rs @@ -4,6 +4,10 @@ use rstest_bdd_macros::{given, then, when}; use crate::fixtures::streaming_request::{StreamingRequestWorld, TestResult, parse_error_kind}; +fn try_parse_error_kind(kind: &str) -> TestResult { + parse_error_kind(kind).map_err(Into::into) +} + #[given("a streaming request body channel with capacity {capacity:usize}")] fn given_request_body_channel( streaming_request_world: &mut StreamingRequestWorld, @@ -52,10 +56,7 @@ fn when_request_body_error_is_sent( streaming_request_world: &mut StreamingRequestWorld, kind: String, ) -> TestResult { - let kind = match parse_error_kind(&kind) { - Ok(kind) => kind, - Err(err) => return Err(err.into()), - }; + let kind = try_parse_error_kind(&kind)?; streaming_request_world.send_error(kind) } @@ -96,9 +97,6 @@ fn then_last_stream_error_kind_is( streaming_request_world: &mut StreamingRequestWorld, kind: String, ) -> TestResult { - let kind = match parse_error_kind(&kind) { - Ok(kind) => kind, - Err(err) => return Err(err.into()), - }; + let kind = try_parse_error_kind(&kind)?; streaming_request_world.assert_last_error_kind(kind) } From 96553c4e2d6cdb8513e07e43800e948a2f798efb Mon Sep 17 00:00:00 2001 From: Leynos Date: Mon, 16 Mar 2026 23:19:38 +0000 Subject: [PATCH 4/8] refactor(tests): parse io::ErrorKind directly in BDD steps Introduce ErrorKindArg newtype that implements FromStr to allow concise capture of io::ErrorKind from Gherkin step strings via BDD macros. This removes manual parsing and unwrap logic from the streaming_request_steps test steps, improving clarity and type safety. Co-authored-by: devboxerhub[bot] --- tests/fixtures/streaming_request.rs | 18 ++++++++++++++++++ tests/steps/streaming_request_steps.rs | 16 +++++----------- 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/tests/fixtures/streaming_request.rs b/tests/fixtures/streaming_request.rs index 9b2c3ec1..bbf1d878 100644 --- a/tests/fixtures/streaming_request.rs +++ b/tests/fixtures/streaming_request.rs @@ -244,6 +244,24 @@ pub fn parse_error_kind(value: &str) -> Result { } } +/// Newtype that lets BDD macros capture an `io::ErrorKind` directly from a +/// Gherkin step string via `FromStr`. +pub struct ErrorKindArg(pub io::ErrorKind); + +impl std::str::FromStr for ErrorKindArg { + type Err = String; + + fn from_str(s: &str) -> Result { + parse_error_kind(s).map(ErrorKindArg) + } +} + +impl From for io::ErrorKind { + fn from(arg: ErrorKindArg) -> Self { + arg.0 + } +} + fn assert_field_eq(label: &str, expected: T, observed: T) -> TestResult where T: PartialEq + fmt::Debug, diff --git a/tests/steps/streaming_request_steps.rs b/tests/steps/streaming_request_steps.rs index 771848e8..522e4789 100644 --- a/tests/steps/streaming_request_steps.rs +++ b/tests/steps/streaming_request_steps.rs @@ -2,11 +2,7 @@ use rstest_bdd_macros::{given, then, when}; -use crate::fixtures::streaming_request::{StreamingRequestWorld, TestResult, parse_error_kind}; - -fn try_parse_error_kind(kind: &str) -> TestResult { - parse_error_kind(kind).map_err(Into::into) -} +use crate::fixtures::streaming_request::{ErrorKindArg, StreamingRequestWorld, TestResult}; #[given("a streaming request body channel with capacity {capacity:usize}")] fn given_request_body_channel( @@ -54,10 +50,9 @@ fn when_body_chunk_is_sent_with_timeout( #[when("a request body error of kind {kind:string} is sent")] fn when_request_body_error_is_sent( streaming_request_world: &mut StreamingRequestWorld, - kind: String, + kind: ErrorKindArg, ) -> TestResult { - let kind = try_parse_error_kind(&kind)?; - streaming_request_world.send_error(kind) + streaming_request_world.send_error(kind.into()) } #[when("the streaming body is read through the AsyncRead adapter")] @@ -95,8 +90,7 @@ fn then_one_chunk_is_received(streaming_request_world: &mut StreamingRequestWorl #[then("the last stream error kind is {kind:string}")] fn then_last_stream_error_kind_is( streaming_request_world: &mut StreamingRequestWorld, - kind: String, + kind: ErrorKindArg, ) -> TestResult { - let kind = try_parse_error_kind(&kind)?; - streaming_request_world.assert_last_error_kind(kind) + streaming_request_world.assert_last_error_kind(kind.into()) } From 5d9fcdb2281ff2a3ba9aed3f777bbcd9392e48c9 Mon Sep 17 00:00:00 2001 From: Leynos Date: Tue, 17 Mar 2026 10:52:22 +0000 Subject: [PATCH 5/8] refactor(tests): enhance assert_field_eq to accept references Changed assert_field_eq function to take referenced arguments and support ?Sized trait. This makes the function more flexible and prevents unnecessary cloning. Adapted calls accordingly in streaming_request.rs test fixture. Co-authored-by: devboxerhub[bot] --- tests/fixtures/streaming_request.rs | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/tests/fixtures/streaming_request.rs b/tests/fixtures/streaming_request.rs index bbf1d878..e514ed23 100644 --- a/tests/fixtures/streaming_request.rs +++ b/tests/fixtures/streaming_request.rs @@ -219,7 +219,7 @@ impl StreamingRequestWorld { /// Assert the number of successful chunks seen before termination. pub fn assert_collected_chunks(&self, expected: usize) -> TestResult { - assert_field_eq("collected chunks count", expected, self.collected_chunks) + assert_field_eq("collected chunks count", &expected, &self.collected_chunks) } /// Assert the last observed error kind. @@ -251,20 +251,16 @@ pub struct ErrorKindArg(pub io::ErrorKind); impl std::str::FromStr for ErrorKindArg { type Err = String; - fn from_str(s: &str) -> Result { - parse_error_kind(s).map(ErrorKindArg) - } + fn from_str(s: &str) -> Result { parse_error_kind(s).map(ErrorKindArg) } } impl From for io::ErrorKind { - fn from(arg: ErrorKindArg) -> Self { - arg.0 - } + fn from(arg: ErrorKindArg) -> Self { arg.0 } } -fn assert_field_eq(label: &str, expected: T, observed: T) -> TestResult +fn assert_field_eq(label: &str, expected: &T, observed: &T) -> TestResult where - T: PartialEq + fmt::Debug, + T: PartialEq + fmt::Debug + ?Sized, { if expected == observed { return Ok(()); From 00aac1c2489ec73fb58d90898c6869376a5b3483 Mon Sep 17 00:00:00 2001 From: Leynos Date: Tue, 17 Mar 2026 11:09:22 +0000 Subject: [PATCH 6/8] docs(documentation): fix typos and clarify terminology in streaming requests doc Corrected typographical errors ('Artefacts') and expanded abbreviations ('ADR', 'BDD') to improve clarity and consistency in the documentation for streaming requests. Co-authored-by: devboxerhub[bot] --- .../8-6-1-update-documentation-for-streaming-requests.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/execplans/8-6-1-update-documentation-for-streaming-requests.md b/docs/execplans/8-6-1-update-documentation-for-streaming-requests.md index 9a459b97..25296cd9 100644 --- a/docs/execplans/8-6-1-update-documentation-for-streaming-requests.md +++ b/docs/execplans/8-6-1-update-documentation-for-streaming-requests.md @@ -51,8 +51,8 @@ Success is observable when: and public API examples needed to make the documented behaviour trustworthy. - Do not introduce new product behaviour merely to make the documents read better. If documentation work exposes an implementation bug, fix only the - smallest issue required to align the code with ADR 0002 and document the - decision. + smallest issue required to align the code with Architecture Decision Record + (ADR) 0002 and document the decision. - Preserve existing public API signatures unless a contradiction in the current API surface makes the documentation impossible to write accurately. If that happens, stop and escalate. @@ -157,7 +157,7 @@ Success is observable when: coverage in `tests/streaming_request.rs`, but there is no matching behavioural suite under `tests/features/`. Impact: the implementation cannot honestly claim both unit and behavioural validation for the inbound streaming - half of ADR 0002 until that BDD gap is closed. + half of ADR 0002 until that behaviour-driven development (BDD) gap is closed. - Observation: `MessageAssembler` and memory budgets already have substantial behavioural coverage through existing feature files such as @@ -582,7 +582,7 @@ If repo-wide Markdown or doctest gates fail only because of unrelated baseline debt, stop before marking the roadmap item done and record the exact blocking files in `Decision Log`. -## Artifacts and notes +## Artefacts and notes Key evidence gathered during planning: From f866234add83bd8adf24357a335c3abbbf1c2819 Mon Sep 17 00:00:00 2001 From: Leynos Date: Wed, 18 Mar 2026 09:30:51 +0000 Subject: [PATCH 7/8] refactor(streaming_request): refine send_chunk_with_timeout to handle backpressure states accurately - Improved backpressure detection logic in `send_chunk_with_timeout` by differentiating the timeout, successful send, and channel error cases. - Enhanced handling ensures clearer indication of whether send is blocked due to backpressure or channel error. - Minor editorial fixes in documentation for clarity and style without changing meaning. Co-authored-by: devboxerhub[bot] --- ...ate-documentation-for-streaming-requests.md | 18 +++++++++--------- tests/fixtures/streaming_request.rs | 8 ++++++-- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/docs/execplans/8-6-1-update-documentation-for-streaming-requests.md b/docs/execplans/8-6-1-update-documentation-for-streaming-requests.md index 25296cd9..6651ed26 100644 --- a/docs/execplans/8-6-1-update-documentation-for-streaming-requests.md +++ b/docs/execplans/8-6-1-update-documentation-for-streaming-requests.md @@ -182,30 +182,30 @@ Success is observable when: validation-closure task, not as a new architecture design pass. Rationale: ADR 0002 and the implementation already define the feature; the missing work is accurate narrative, cross-linking, and proof that the documented behaviour - is exercised. Date/Author: 2026-03-14 / Codex + is exercised. Date/Author: 2026-03-14 / Codex. - Decision: add one focused `rstest-bdd` suite for streaming request bodies unless the audit uncovers an existing hidden equivalent. Rationale: the prompt requires both unit and behavioural validation, and the current inbound - streaming coverage is unit-only. Date/Author: 2026-03-14 / Codex + streaming coverage is unit-only. Date/Author: 2026-03-14 / Codex. - Decision: update `docs/users-guide.md` and public Rustdoc examples only where public consumer guidance would otherwise be stale or incomplete. Rationale: this keeps scope aligned with the roadmap item while still meeting the prompt's requirement to reflect public interface changes. Date/Author: - 2026-03-14 / Codex + 2026-03-14 / Codex. - Decision: mark roadmap items `8.6.1` through `8.6.3` done only after the docs, behavioural coverage, doctest gates, and repo quality gates all pass. Rationale: a documentation checkbox should mean the docs are trustworthy and - demonstrably aligned with the code. Date/Author: 2026-03-14 / Codex + demonstrably aligned with the code. Date/Author: 2026-03-14 / Codex. - Decision: cover the streaming-request behavioural gap at the public request API boundary instead of through a second full inbound-actor integration fixture. Rationale: the missing guarantees were handler-facing stream semantics (`AsyncRead` adaptation, back-pressure, and error propagation), and those behaviours are exercised more directly through the request API than - through another transport harness. Date/Author: 2026-03-14 / Codex + through another transport harness. Date/Author: 2026-03-14 / Codex. ## Outcomes & retrospective @@ -338,7 +338,7 @@ It should explicitly answer: 2. what `MessageAssembler` is responsible for; 3. the exact ordering through the inbound pipeline; 4. how budgets are shared across both layers; and -5. what operators or protocol authors must configure so the two layers do not +5. what operators or protocol authors must configure, so the two layers do not fight each other. In `docs/multi-packet-and-streaming-responses-design.md`, revise section `11` @@ -396,7 +396,7 @@ rewriting. Validation at the end of Stage C: -- a new user can follow the guide without needing to read the ADR first; and +- a new user can follow the guide without needing to read the ADR first, and - any Rustdoc examples touched by this stage are ready for `make test-doc`. ### Stage D: close validation gaps before claiming the docs are complete @@ -530,7 +530,7 @@ Run all commands from `/home/user/project`. ## Validation and acceptance -Acceptance means all of the following are true: +Acceptance means the following are true: - Documentation: - the three roadmap-target design documents describe the same composition @@ -575,7 +575,7 @@ If a gate fails halfway through: 1. read the saved log under `/tmp/8-6-*.log`; 2. fix the smallest relevant issue; -3. rerun only the failed targeted gate first; and +3. rerun only the failed targeted gate first, and 4. rerun the full final gate set before touching the roadmap checkboxes. If repo-wide Markdown or doctest gates fail only because of unrelated baseline diff --git a/tests/fixtures/streaming_request.rs b/tests/fixtures/streaming_request.rs index e514ed23..fd6c06fe 100644 --- a/tests/fixtures/streaming_request.rs +++ b/tests/fixtures/streaming_request.rs @@ -126,14 +126,18 @@ impl StreamingRequestWorld { /// Returns an error if the channel is unavailable. pub fn send_chunk_with_timeout(&mut self, chunk: &str, timeout_ms: u64) -> TestResult { let sender = self.sender.clone().ok_or("request body sender missing")?; - let blocked = self.block_on(async move { + let timeout_result = self.block_on(async move { tokio::time::timeout( Duration::from_millis(timeout_ms), sender.send(Ok(Bytes::from(chunk.to_owned()))), ) .await - .is_err() })?; + let blocked = match timeout_result { + Err(_) => true, // Timeout elapsed → blocked by backpressure + Ok(Ok(())) => false, // Send succeeded → not blocked + Ok(Err(err)) => return Err(err.into()), // Send failed → propagate channel error + }; self.send_blocked_by_backpressure = Some(blocked); Ok(()) } From 7c6826876d545d846d70870c5e417a381fd63703 Mon Sep 17 00:00:00 2001 From: Leynos Date: Wed, 18 Mar 2026 10:10:33 +0000 Subject: [PATCH 8/8] fix(streaming_request): return error if request body channel capacity is zero Added error handling to StreamingRequestWorld::create_channel to return an error when capacity is zero, preventing invalid channel creation. Updated related test step to propagate the error accordingly. Co-authored-by: devboxerhub[bot] --- tests/fixtures/streaming_request.rs | 11 ++++++++++- tests/steps/streaming_request_steps.rs | 4 ++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/tests/fixtures/streaming_request.rs b/tests/fixtures/streaming_request.rs index fd6c06fe..04b0b46c 100644 --- a/tests/fixtures/streaming_request.rs +++ b/tests/fixtures/streaming_request.rs @@ -98,7 +98,14 @@ impl StreamingRequestWorld { } /// Create a fresh request body channel. - pub fn create_channel(&mut self, capacity: usize) { + /// + /// # Errors + /// + /// Returns an error if capacity is zero. + pub fn create_channel(&mut self, capacity: usize) -> TestResult { + if capacity == 0 { + return Err("request body channel capacity must be greater than zero".into()); + } let (sender, stream) = body_channel(capacity); self.sender = Some(sender); self.stream = Some(stream); @@ -106,6 +113,7 @@ impl StreamingRequestWorld { self.collected_chunks = 0; self.last_error_kind = None; self.send_blocked_by_backpressure = None; + Ok(()) } /// Send a body chunk into the request stream. @@ -183,6 +191,7 @@ impl StreamingRequestWorld { /// /// Returns an error if the stream is unavailable. pub fn drain_stream(&mut self) -> TestResult { + self.sender = None; let mut stream = self.stream.take().ok_or("request body stream missing")?; let observed = self.block_on(async move { let mut chunks = Vec::new(); diff --git a/tests/steps/streaming_request_steps.rs b/tests/steps/streaming_request_steps.rs index 522e4789..b361585d 100644 --- a/tests/steps/streaming_request_steps.rs +++ b/tests/steps/streaming_request_steps.rs @@ -8,8 +8,8 @@ use crate::fixtures::streaming_request::{ErrorKindArg, StreamingRequestWorld, Te fn given_request_body_channel( streaming_request_world: &mut StreamingRequestWorld, capacity: usize, -) { - streaming_request_world.create_channel(capacity); +) -> TestResult { + streaming_request_world.create_channel(capacity) } #[when("body chunks {first:string} and {second:string} are sent")]