From 50fda449c87ebde79fa4aaf84a3cdbe217159ddf Mon Sep 17 00:00:00 2001 From: Leynos Date: Mon, 23 Feb 2026 18:51:45 +0000 Subject: [PATCH 1/6] feat(inbound_handler): add soft-limit read pausing for memory budgets Implemented roadmap item 8.3.3 to apply back-pressure by pausing inbound socket reads under configured per-connection memory budgets before hitting hard caps. Introduced a new internal helper module in frame_handling to decide paused reads based on current buffered bytes and aggregate caps. Modified the inbound read loop to consult this policy and asynchronously pause when soft pressure is active, ensuring paced reads for memory pressure without regressions to existing hard-cap enforcement (8.3.2). Added rstest unit tests for policy and integration points, plus rstest-bdd behavioural scenarios to validate pause and resume semantics with deterministic virtual time. Updated documentation, decision logs, and roadmap to reflect implementation. No public API changes or new dependencies. Co-authored-by: devboxerhub[bot] --- docs/execplans/8-3-3-soft-limit-behaviour.md | 397 +++++++++++++++++++ 1 file changed, 397 insertions(+) create mode 100644 docs/execplans/8-3-3-soft-limit-behaviour.md diff --git a/docs/execplans/8-3-3-soft-limit-behaviour.md b/docs/execplans/8-3-3-soft-limit-behaviour.md new file mode 100644 index 00000000..3b5a59cd --- /dev/null +++ b/docs/execplans/8-3-3-soft-limit-behaviour.md @@ -0,0 +1,397 @@ +# Implement soft-limit inbound read pausing for memory budgets (8.3.3) + +This ExecPlan is a living document. The sections `Constraints`, `Tolerances`, +`Risks`, `Progress`, `Surprises & Discoveries`, `Decision Log`, and +`Outcomes & Retrospective` must be kept up to date as work proceeds. + +Status: DRAFT + +No `PLANS.md` exists in this repository as of 2026-02-23. + +## Purpose / big picture + +Roadmap item `8.3.3` requires soft-limit behaviour for per-connection memory +budgets: when inbound buffered bytes approach configured aggregate caps, +Wireframe should apply back-pressure by pausing socket reads rather than +immediately consuming the next frame. + +After this change, operators who configure `WireframeApp::memory_budgets(...)` +will get two complementary protections on inbound assembly paths: + +- soft pressure: paced reads under high buffered-byte pressure; and +- hard cap: deterministic rejection and cleanup when a limit is exceeded + (already implemented in `8.3.2`). + +Success is observable when: + +- inbound reads are paused under soft pressure using the configured budgets; +- the pause path is covered by `rstest` unit tests; +- behaviour is covered by `rstest-bdd` (`0.5.0`) scenarios using a live inbound + runtime fixture; +- design decisions are recorded in + `docs/adr-002-streaming-requests-and-shared-message-assembly.md`; +- `docs/users-guide.md` explains the soft-limit runtime behaviour for library + consumers; and +- `docs/roadmap.md` marks `8.3.3` as done only after all quality gates pass. + +## Constraints + +- Scope is strictly roadmap item `8.3.3`: implement soft-limit read pausing for + inbound assembly pressure. +- Do not regress or re-scope hard-cap enforcement semantics from `8.3.2`. +- Preserve runtime behaviour when `memory_budgets` is not configured. +- Keep public API changes to zero unless unavoidable. If a public API change is + required, stop and escalate. +- Do not add new external dependencies. +- Keep all modified or new source files at or below the 400-line repository + guidance. +- Validate with `rstest` unit tests and `rstest-bdd` behavioural tests. +- Follow testing guidance from: + `docs/rust-testing-with-rstest-fixtures.md`, + `docs/reliable-testing-in-rust-via-dependency-injection.md`, and + `docs/rstest-bdd-users-guide.md`. +- Record implementation decisions in the relevant design document(s), primarily + ADR 0002. +- Update `docs/users-guide.md` for any consumer-visible behavioural change. +- Mark roadmap item `8.3.3` done only after all gates pass. + +## Tolerances (exception triggers) + +- Scope: if implementation requires changes to more than 14 files or more than + 700 net LOC, stop and escalate. +- Interface: if implementing `8.3.3` requires a new public builder method, + changed public function signatures, or new public types, stop and escalate. +- Dependencies: if any new crate is required, stop and escalate. +- Semantics ambiguity: if the project requires a specific soft-limit threshold + or pause cadence not documented in ADR/design docs, stop and present options + with trade-offs before finalizing behaviour. +- Iterations: if the same failing gate persists after 3 focused attempts, stop + and escalate. +- Time: if any single stage exceeds 4 hours elapsed effort, stop and escalate. + +## Risks + +- Risk: `src/app/inbound_handler.rs` is currently 383 lines, so adding soft + limit logic may exceed the 400-line file cap. Severity: high Likelihood: high + Mitigation: place threshold calculation and pause-decision logic in a new + helper module under `src/app/frame_handling/`, keeping inbound loop changes + minimal. + +- Risk: `src/message_assembler/state.rs` is already 400 lines. + Severity: medium Likelihood: medium Mitigation: avoid adding logic there for + this item; consume existing `total_buffered_bytes()` query from the inbound + runtime instead. + +- Risk: full read suspension can deadlock in buffered multi-frame assembly if no + bytes can be reclaimed without reading additional frames. Severity: high + Likelihood: medium Mitigation: implement soft pressure as paced polling + (bounded read pauses per loop iteration), not indefinite suspension until + below threshold. + +- Risk: timing-sensitive behaviour tests can become flaky. + Severity: medium Likelihood: medium Mitigation: use Tokio paused time + (`tokio::time::pause`/`advance`) in a dedicated BDD world and assert + immediate-vs-delayed availability with deterministic checks (`try_recv` + before time advance, eventual receive after advance). + +## Progress + +- [x] (2026-02-23 19:10Z) Drafted ExecPlan for roadmap item `8.3.3`. +- [ ] Stage A: add soft-limit policy helper and unit coverage. +- [ ] Stage B: integrate paced read pausing into inbound loop. +- [ ] Stage C: add behavioural coverage (`rstest-bdd` v0.5.0). +- [ ] Stage D: documentation and roadmap updates. +- [ ] Stage E: run all quality gates and finalize. + +## Surprises & Discoveries + +- Observation: hard-cap budget enforcement was already completed in `8.3.2`. + Evidence: `src/message_assembler/state.rs` and + `docs/execplans/8-3-2-budget-enforcement.md`. Impact: `8.3.3` should add + pacing/back-pressure only, not duplicate hard-cap rejection logic. + +- Observation: inbound read polling currently happens in + `WireframeApp::process_stream` (`src/app/inbound_handler.rs`), not in + `src/connection/`. Evidence: `src/server/connection_spawner.rs` invokes + `app.handle_connection_result`, and the read loop is in + `src/app/inbound_handler.rs`. Impact: soft-limit read pausing must be + implemented in the app inbound path. + +- Observation: current user guide memory-budget section documents hard-cap + rejection semantics but not explicit soft-limit pacing details. Evidence: + `docs/users-guide.md` section "Per-connection memory budgets". Impact: docs + must be updated so consumers understand runtime behaviour under near-cap + pressure. + +## Decision Log + +- Decision: implement soft pressure as paced read pausing before `framed.next` + in the inbound loop, driven by current buffered bytes and configured + aggregate budgets. Rationale: this satisfies "pause reads" semantics while + avoiding structural changes to assembly-state ownership. Date/Author: + 2026-02-23 / Codex. + +- Decision: derive soft pressure from the minimum active aggregate cap + (`bytes_per_connection`, `bytes_in_flight`) and compare without integer + division to satisfy strict clippy settings. Rationale: the smaller cap is the + earliest risk boundary, and avoiding integer division keeps lint policy + intact. Date/Author: 2026-02-23 / Codex. + +- Decision: keep soft-limit policy internal (no new public API) for `8.3.3`. + Rationale: roadmap scope calls for runtime behaviour, and existing + `memory_budgets(...)` configuration is sufficient. Date/Author: 2026-02-23 / + Codex. + +## Outcomes & Retrospective + +Not started. This section will be completed after implementation and quality +validation. + +## Context and orientation + +Current budget plumbing: + +- `src/app/memory_budgets.rs` defines `MemoryBudgets` and `BudgetBytes`. +- `src/app/builder/config.rs` exposes `WireframeApp::memory_budgets(...)`. +- `src/app/frame_handling/assembly.rs::new_message_assembly_state` threads + budgets into `MessageAssemblyState::with_budgets(...)`. +- `src/message_assembler/state.rs` enforces hard caps and exposes + `total_buffered_bytes()`. + +Current inbound read path: + +- `WireframeApp::process_stream` in `src/app/inbound_handler.rs` drives + `timeout(timeout_dur, framed.next()).await` in a loop. +- decoded envelopes flow through decode -> transport reassembly -> message + assembly -> handler dispatch. + +Relevant docs to keep aligned: + +- `docs/roadmap.md` (`8.3.3` target row). +- `docs/adr-002-streaming-requests-and-shared-message-assembly.md`. +- `docs/generic-message-fragmentation-and-re-assembly-design.md`. +- `docs/multi-packet-and-streaming-responses-design.md`. +- `docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md`. +- `docs/hardening-wireframe-a-guide-to-production-resilience.md`. +- `docs/users-guide.md`. +- `docs/rust-doctest-dry-guide.md` (for doc snippet safety expectations). + +Testing topology to use: + +- unit tests in crate modules using `rstest`; +- behavioural tests via `tests/features/`, `tests/fixtures/`, `tests/steps/`, + `tests/scenarios/` with `rstest-bdd` v0.5.0; +- quality gates through Makefile targets. + +## Plan of work + +### Stage A: implement a soft-limit policy helper with unit tests + +Create a new helper module for inbound soft-pressure decisions, keeping +`src/app/inbound_handler.rs` under the file-size limit. + +Expected edits: + +- Add `src/app/frame_handling/backpressure.rs` with: + - a function that computes whether soft pressure is active using: + `buffered_bytes` from `MessageAssemblyState` and configured aggregate caps; + - a private soft-threshold rule based on the smallest active aggregate cap; + - a pause-duration constant (internal) used by inbound loop pacing. +- Register helper exports in `src/app/frame_handling/mod.rs`. +- Add `src/app/frame_handling/backpressure_tests.rs` with `rstest` coverage. + +Unit tests must cover at least: + +- no budgets configured -> never pause; +- below threshold -> no pause; +- at/above threshold -> pause; +- dual-budget case uses the smallest cap as the governing threshold; +- helper remains pure and deterministic (no async/timing in policy unit tests). + +Go/no-go: if policy requires public API changes, stop and escalate. + +### Stage B: integrate paced pausing into inbound read loop + +Modify `src/app/inbound_handler.rs` to consult the new soft-pressure helper +before polling `framed.next()`. + +Integration behaviour: + +- if soft pressure is active, pause reads for the configured duration, + purge expired assembly/fragment state, and continue loop; +- otherwise proceed with normal `timeout(..., framed.next())` processing; +- preserve existing decode/reassemble/assemble/dispatch ordering and error + handling. + +Keep loop changes small and explicitly logged (debug-level) for observability. + +Go/no-go: if this pushes `src/app/inbound_handler.rs` past 400 lines, extract +additional local logic into `src/app/frame_handling/` helpers before continuing. + +### Stage C: add behavioural tests (`rstest-bdd` v0.5.0) + +Add a dedicated behavioural suite for soft-limit read pausing. + +Expected edits: + +- `tests/features/memory_budget_backpressure.feature` +- `tests/fixtures/memory_budget_backpressure.rs` +- `tests/steps/memory_budget_backpressure_steps.rs` +- `tests/scenarios/memory_budget_backpressure_scenarios.rs` +- register modules in: + - `tests/fixtures/mod.rs` + - `tests/steps/mod.rs` + - `tests/scenarios/mod.rs` + +Scenario coverage target: + +- under soft pressure, inbound payload completion is not immediately dispatched + before virtual time advances; +- after advancing virtual time, the delayed read resumes and the payload is + delivered; +- when buffered bytes are comfortably below threshold, dispatch proceeds without + pressure-induced delay. + +Go/no-go: if timing assertions are flaky under virtual time, refactor the world +fixture to use deterministic availability checks (`try_recv`) plus explicit +`advance` steps. + +### Stage D: documentation and roadmap updates + +Update documentation to reflect implemented behaviour and decisions. + +Required updates: + +- `docs/adr-002-streaming-requests-and-shared-message-assembly.md`: + add `8.3.3` implementation decisions and soft-limit semantics. +- `docs/users-guide.md`: + update "Per-connection memory budgets" to explain soft-limit read pausing and + its relation to hard-cap rejection. +- `docs/generic-message-fragmentation-and-re-assembly-design.md`: + align section 9.3 wording with concrete runtime behaviour (if needed). +- `docs/roadmap.md`: + mark `8.3.3` as done only after all gates pass. + +### Stage E: quality gates and final verification + +Run all required gates and capture logs with `tee`. + +No roadmap checkbox update until every gate is green. + +## Concrete steps + +Run from repository root (`/home/user/project`). + +1. Implement Stage A helper + unit tests. + +2. Run focused unit tests for helper and inbound assembly behaviour: + + set -o pipefail + cargo test --lib frame_handling 2>&1 | tee /tmp/wireframe-8-3-3-unit-a.log + cargo test --lib message_assembler 2>&1 | tee /tmp/wireframe-8-3-3-unit-b.log + +3. Integrate Stage B inbound loop pausing and re-run focused unit tests: + + set -o pipefail + cargo test --lib inbound_handler 2>&1 | tee /tmp/wireframe-8-3-3-unit-c.log + +4. Add Stage C behavioural suite and run targeted BDD scenarios: + + set -o pipefail + cargo test --test bdd --all-features memory_budget_backpressure 2>&1 | tee /tmp/wireframe-8-3-3-bdd.log + +5. Update Stage D docs and roadmap. + +6. Run full quality gates: + + set -o pipefail + make fmt 2>&1 | tee /tmp/wireframe-8-3-3-fmt.log + make markdownlint 2>&1 | tee /tmp/wireframe-8-3-3-markdownlint.log + make check-fmt 2>&1 | tee /tmp/wireframe-8-3-3-check-fmt.log + make lint 2>&1 | tee /tmp/wireframe-8-3-3-lint.log + make test 2>&1 | tee /tmp/wireframe-8-3-3-test.log + make nixie 2>&1 | tee /tmp/wireframe-8-3-3-nixie.log + +7. If any gate fails, fix only the failing area and rerun the failing command + until green, then rerun affected downstream gates. + +## Validation and acceptance + +Acceptance criteria: + +- Soft-limit behaviour: inbound reads are paced under memory pressure before + hard cap violation. +- Hard-cap behaviour from `8.3.2` remains intact (no regressions). +- Unit tests (`rstest`) validate policy and inbound integration points. +- Behavioural tests (`rstest-bdd` 0.5.0) validate delayed-vs-resumed read + behaviour through scenario steps. +- Design documentation records final soft-limit decisions. +- User guide reflects consumer-visible runtime behaviour. +- `docs/roadmap.md` marks `8.3.3` done. + +Quality criteria: + +- tests: `make test` passes; +- lint: `make lint` passes with no warnings; +- formatting: `make fmt` and `make check-fmt` pass; +- markdown: `make markdownlint` passes; +- mermaid validation: `make nixie` passes. + +## Idempotence and recovery + +All planned edits are additive and safe to rerun. + +If a step fails: + +- preserve local changes; +- inspect the relevant `/tmp/wireframe-8-3-3-*.log` file; +- apply the minimal fix; +- rerun only the failed command first, then downstream gates. + +Avoid destructive git commands. If rollback is required, revert only files +changed for `8.3.3`. + +## Artefacts and notes + +Expected artefacts after completion: + +- New: `src/app/frame_handling/backpressure.rs`. +- New: `src/app/frame_handling/backpressure_tests.rs`. +- Modified: `src/app/frame_handling/mod.rs`. +- Modified: `src/app/inbound_handler.rs`. +- New: `tests/features/memory_budget_backpressure.feature`. +- New: `tests/fixtures/memory_budget_backpressure.rs`. +- New: `tests/steps/memory_budget_backpressure_steps.rs`. +- New: `tests/scenarios/memory_budget_backpressure_scenarios.rs`. +- Modified: `tests/fixtures/mod.rs`. +- Modified: `tests/steps/mod.rs`. +- Modified: `tests/scenarios/mod.rs`. +- Modified: `docs/adr-002-streaming-requests-and-shared-message-assembly.md`. +- Modified: `docs/users-guide.md`. +- Modified: `docs/generic-message-fragmentation-and-re-assembly-design.md` + (if wording requires alignment). +- Modified: `docs/roadmap.md` (`8.3.3` checkbox). +- Gate logs: `/tmp/wireframe-8-3-3-*.log`. + +## Interfaces and dependencies + +No new external dependencies are required. + +Internal interfaces expected at the end of this milestone: + +In `src/app/frame_handling/backpressure.rs`: + + pub(crate) fn should_pause_inbound_reads( + state: Option<&crate::message_assembler::MessageAssemblyState>, + budgets: Option, + ) -> bool + +In `src/app/inbound_handler.rs`: + +- `process_stream` consults `should_pause_inbound_reads(...)` before polling + `framed.next()` and applies an async sleep when soft pressure is active. + +In behavioural tests: + +- a new rstest-bdd world validates soft-limit pause/resume semantics with + deterministic virtual-time control. From d7cef9221d360940e7b1cc92945c08de39e5fe6f Mon Sep 17 00:00:00 2001 From: Leynos Date: Mon, 23 Feb 2026 19:20:00 +0000 Subject: [PATCH 2/6] feat(frame_handling): implement soft-limit back-pressure read pacing Implemented roadmap item 8.3.3 to apply soft-limit back-pressure by pacing inbound reads when buffered assembly bytes approach memory budget thresholds. Added back-pressure helpers in src/app/frame_handling/backpressure.rs, integrated paced pauses in the inbound read loop, and provided comprehensive tests and rstest-bdd coverage. Updated related documentation and completed all quality gates. - Added backpressure.rs with soft-limit detection and pacing duration - Integrated soft-limit pauses before polling additional frames - Added unit tests and behavioral tests for memory budget back-pressure - Updated ADR, exec plans, user guide, and roadmap docs - Fixed lint and unused_must_use issues in test fixtures This change allows the system to propagate back-pressure without stalling in-flight assemblies, improving flow control under memory pressure. Co-authored-by: devboxerhub[bot] --- ...ng-requests-and-shared-message-assembly.md | 18 +- docs/execplans/8-3-3-soft-limit-behaviour.md | 53 +++- ...ge-fragmentation-and-re-assembly-design.md | 6 + docs/roadmap.md | 2 +- docs/users-guide.md | 7 + src/app/frame_handling/backpressure.rs | 48 +++ src/app/frame_handling/backpressure_tests.rs | 96 ++++++ src/app/frame_handling/mod.rs | 4 + src/app/inbound_handler.rs | 11 +- .../memory_budget_backpressure.feature | 20 ++ tests/fixtures/interleaved_push_queues.rs | 22 +- tests/fixtures/memory_budget_backpressure.rs | 296 ++++++++++++++++++ tests/fixtures/mod.rs | 1 + tests/interleaved_push_queues.rs | 44 +-- .../memory_budget_backpressure_scenarios.rs | 25 ++ tests/scenarios/mod.rs | 1 + .../steps/memory_budget_backpressure_steps.rs | 69 ++++ tests/steps/mod.rs | 1 + 18 files changed, 680 insertions(+), 44 deletions(-) create mode 100644 src/app/frame_handling/backpressure.rs create mode 100644 src/app/frame_handling/backpressure_tests.rs create mode 100644 tests/features/memory_budget_backpressure.feature create mode 100644 tests/fixtures/memory_budget_backpressure.rs create mode 100644 tests/scenarios/memory_budget_backpressure_scenarios.rs create mode 100644 tests/steps/memory_budget_backpressure_steps.rs diff --git a/docs/adr-002-streaming-requests-and-shared-message-assembly.md b/docs/adr-002-streaming-requests-and-shared-message-assembly.md index 9633c886..25a0c6a0 100644 --- a/docs/adr-002-streaming-requests-and-shared-message-assembly.md +++ b/docs/adr-002-streaming-requests-and-shared-message-assembly.md @@ -313,7 +313,23 @@ Precedence is: routed through the existing `DeserFailureTracker` as `InvalidData`. - Budget enforcement helpers are extracted to `src/message_assembler/budget.rs` to keep `state.rs` under the 400-line file limit. -- Back-pressure (`8.3.3`) and derived defaults (`8.3.5`) remain future work. +- At this point, back-pressure (`8.3.3`) and derived defaults (`8.3.5`) + remained future work. + +#### Implementation decisions (2026-02-23) + +- Roadmap item `8.3.3` now implements soft-limit back-pressure in the inbound + read loop (`src/app/inbound_handler.rs`) by pausing briefly before polling + the next frame when buffered assembly bytes are under pressure. +- Soft pressure is computed from assembly-state buffered bytes versus the + smaller aggregate cap (`min(bytes_per_connection, bytes_in_flight)`), with a + threshold of 80% to engage pacing before hard-cap rejection. +- The soft-limit policy is implemented in + `src/app/frame_handling/backpressure.rs` to keep `inbound_handler.rs` and + `state.rs` within the repository file-size constraint. +- Under sustained pressure, pacing is applied as short per-iteration pauses + rather than an indefinite read stop, so in-flight assemblies can continue to + make progress. #### Budget enforcement diff --git a/docs/execplans/8-3-3-soft-limit-behaviour.md b/docs/execplans/8-3-3-soft-limit-behaviour.md index 3b5a59cd..e0d17ae1 100644 --- a/docs/execplans/8-3-3-soft-limit-behaviour.md +++ b/docs/execplans/8-3-3-soft-limit-behaviour.md @@ -4,7 +4,7 @@ This ExecPlan is a living document. The sections `Constraints`, `Tolerances`, `Risks`, `Progress`, `Surprises & Discoveries`, `Decision Log`, and `Outcomes & Retrospective` must be kept up to date as work proceeds. -Status: DRAFT +Status: COMPLETE No `PLANS.md` exists in this repository as of 2026-02-23. @@ -97,11 +97,12 @@ Success is observable when: ## Progress - [x] (2026-02-23 19:10Z) Drafted ExecPlan for roadmap item `8.3.3`. -- [ ] Stage A: add soft-limit policy helper and unit coverage. -- [ ] Stage B: integrate paced read pausing into inbound loop. -- [ ] Stage C: add behavioural coverage (`rstest-bdd` v0.5.0). -- [ ] Stage D: documentation and roadmap updates. -- [ ] Stage E: run all quality gates and finalize. +- [x] (2026-02-23) Stage A: added soft-limit policy helper and unit coverage. +- [x] (2026-02-23) Stage B: integrated paced read pausing into inbound loop. +- [x] (2026-02-23) Stage C: added behavioural coverage (`rstest-bdd` v0.5.0). +- [x] (2026-02-23) Stage D: updated ADR, design docs, user guide, and roadmap. +- [x] (2026-02-23) Stage E: reran all quality gates and finalized this + ExecPlan. ## Surprises & Discoveries @@ -123,6 +124,17 @@ Success is observable when: must be updated so consumers understand runtime behaviour under near-cap pressure. +- Observation: new back-pressure step text initially conflicted with existing + message-assembly step definitions. Evidence: rstest-bdd ambiguity during + scenario expansion. Impact: step phrases were renamed with a `budgeted` + prefix to keep glue deterministic. + +- Observation: pre-existing `push_expect!` call sites failed strict + `unused_must_use` linting while running workspace gates. Evidence: + `make lint` output for `tests/interleaved_push_queues.rs` and + `tests/fixtures/interleaved_push_queues.rs`. Impact: the existing call sites + were fixed (`let _ = ...`) so the 8.3.3 gate run can complete cleanly. + ## Decision Log - Decision: implement soft pressure as paced read pausing before `framed.next` @@ -144,8 +156,33 @@ Success is observable when: ## Outcomes & Retrospective -Not started. This section will be completed after implementation and quality -validation. +Implemented roadmap item `8.3.3` with no public API changes: + +- Added `src/app/frame_handling/backpressure.rs` and unit tests in + `src/app/frame_handling/backpressure_tests.rs`. +- Integrated soft-limit pacing into `WireframeApp::process_stream` via + `should_pause_inbound_reads(...)` and a short async pause before polling + additional frames. +- Added `rstest-bdd` coverage via: + `tests/features/memory_budget_backpressure.feature`, + `tests/fixtures/memory_budget_backpressure.rs`, + `tests/steps/memory_budget_backpressure_steps.rs`, and + `tests/scenarios/memory_budget_backpressure_scenarios.rs`. +- Updated documentation in: + `docs/adr-002-streaming-requests-and-shared-message-assembly.md`, + `docs/generic-message-fragmentation-and-re-assembly-design.md`, + `docs/users-guide.md`, and `docs/roadmap.md`. + +Validation completed with fresh logs: + +- `make fmt` (`/tmp/wireframe-8-3-3-fmt.log`) +- `make markdownlint` (`/tmp/wireframe-8-3-3-markdownlint.log`) +- `make check-fmt` (`/tmp/wireframe-8-3-3-check-fmt.log`) +- `make lint` (`/tmp/wireframe-8-3-3-lint.log`) +- `make test` (`/tmp/wireframe-8-3-3-test.log`) +- `make nixie` (`/tmp/wireframe-8-3-3-nixie.log`) + +All gates passed on 2026-02-23. ## Context and orientation diff --git a/docs/generic-message-fragmentation-and-re-assembly-design.md b/docs/generic-message-fragmentation-and-re-assembly-design.md index a8a615be..64160989 100644 --- a/docs/generic-message-fragmentation-and-re-assembly-design.md +++ b/docs/generic-message-fragmentation-and-re-assembly-design.md @@ -481,6 +481,12 @@ reads and assembly work. When a hard cap is exceeded, Wireframe aborts early, releases partial state, and surfaces `std::io::ErrorKind::InvalidData` from the inbound processing path. +The current soft-limit implementation paces reads by inserting a short pause +before polling the next inbound frame once buffered bytes reach 80% of the +smaller aggregate budget (`bytes_per_connection` and `bytes_in_flight`). This +keeps pressure visible without permanently stalling assemblies that still need +additional frames to complete. + If both transport fragmentation and `MessageAssembler` are enabled, the effective message cap is whichever guard triggers first. Operators should set the fragmentation `max_message_size` and the message assembly per-message cap diff --git a/docs/roadmap.md b/docs/roadmap.md index 3bb2d9f9..fde523e6 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -294,7 +294,7 @@ and standardized per-connection memory budgets. - [x] 8.3.1. Add `WireframeApp::memory_budgets(...)` builder method. - [x] 8.3.2. Implement budget enforcement covering bytes per message, bytes per connection, and bytes across in-flight assemblies. -- [ ] 8.3.3. Implement soft limit (back-pressure by pausing reads) behaviour. +- [x] 8.3.3. Implement soft limit (back-pressure by pausing reads) behaviour. - [ ] 8.3.4. Implement hard cap (abort early, release partial state, surface `InvalidData`) behaviour. - [ ] 8.3.5. Define derived defaults based on `buffer_capacity` when budgets diff --git a/docs/users-guide.md b/docs/users-guide.md index d142f2fc..80feeb83 100644 --- a/docs/users-guide.md +++ b/docs/users-guide.md @@ -609,6 +609,13 @@ is the minimum of the fragmentation `max_message_size` and the configured `bytes_per_message`. Single-frame messages that complete immediately are never counted against aggregate budgets, since they do not buffer. +Wireframe also applies soft-limit read pacing before hard-cap rejection. When +buffered assembly bytes reach the soft-pressure threshold (80% of the smaller +aggregate cap from `bytes_per_connection` and `bytes_in_flight`), the inbound +connection loop briefly pauses socket reads before polling the next frame. This +propagates back-pressure to senders while still allowing progress on in-flight +assemblies. + #### Message key multiplexing (8.2.3) The `MessageAssemblyState` type manages multiple concurrent message assemblies diff --git a/src/app/frame_handling/backpressure.rs b/src/app/frame_handling/backpressure.rs new file mode 100644 index 00000000..40c6a8ea --- /dev/null +++ b/src/app/frame_handling/backpressure.rs @@ -0,0 +1,48 @@ +//! Soft-limit back-pressure helpers for inbound read pacing. +//! +//! These helpers detect when buffered assembly bytes approach configured +//! aggregate memory budgets and provide a small pause duration that throttles +//! subsequent socket reads. + +use std::time::Duration; + +use crate::{app::MemoryBudgets, message_assembler::MessageAssemblyState}; + +/// Soft-pressure threshold numerator (4/5 == 80%). +const SOFT_LIMIT_NUMERATOR: u128 = 4; +/// Soft-pressure threshold denominator (4/5 == 80%). +const SOFT_LIMIT_DENOMINATOR: u128 = 5; +/// Read-pacing delay applied while under soft budget pressure. +const SOFT_LIMIT_PAUSE_DURATION: Duration = Duration::from_millis(5); + +/// Return `true` when inbound reads should be paced due to soft budget pressure. +#[must_use] +pub(crate) fn should_pause_inbound_reads( + state: Option<&MessageAssemblyState>, + budgets: Option, +) -> bool { + let (Some(state), Some(budgets)) = (state, budgets) else { + return false; + }; + + let buffered_bytes = state.total_buffered_bytes(); + let aggregate_limit = active_aggregate_limit_bytes(budgets); + is_at_or_above_soft_limit(buffered_bytes, aggregate_limit) +} + +/// Duration to pause between inbound reads while soft pressure is active. +#[must_use] +pub(crate) const fn soft_limit_pause_duration() -> Duration { SOFT_LIMIT_PAUSE_DURATION } + +fn active_aggregate_limit_bytes(budgets: MemoryBudgets) -> usize { + budgets + .bytes_per_connection() + .as_usize() + .min(budgets.bytes_in_flight().as_usize()) +} + +fn is_at_or_above_soft_limit(buffered_bytes: usize, aggregate_limit: usize) -> bool { + let lhs = (buffered_bytes as u128).saturating_mul(SOFT_LIMIT_DENOMINATOR); + let rhs = (aggregate_limit as u128).saturating_mul(SOFT_LIMIT_NUMERATOR); + lhs >= rhs +} diff --git a/src/app/frame_handling/backpressure_tests.rs b/src/app/frame_handling/backpressure_tests.rs new file mode 100644 index 00000000..de5377d3 --- /dev/null +++ b/src/app/frame_handling/backpressure_tests.rs @@ -0,0 +1,96 @@ +//! Unit tests for soft-limit back-pressure policy helpers. + +use std::{num::NonZeroUsize, time::Duration}; + +use rstest::{fixture, rstest}; + +use super::should_pause_inbound_reads; +use crate::{ + app::{BudgetBytes, MemoryBudgets}, + message_assembler::{ + EnvelopeRouting, + FirstFrameHeader, + FirstFrameInput, + MessageAssemblyState, + MessageKey, + }, +}; + +#[fixture] +fn budgets() -> MemoryBudgets { + MemoryBudgets::new( + BudgetBytes::new(NonZeroUsize::new(1024).unwrap_or(NonZeroUsize::MIN)), + BudgetBytes::new(NonZeroUsize::new(100).unwrap_or(NonZeroUsize::MIN)), + BudgetBytes::new(NonZeroUsize::new(100).unwrap_or(NonZeroUsize::MIN)), + ) +} + +fn state_with_buffered_bytes(buffered_bytes: usize) -> MessageAssemblyState { + let max = NonZeroUsize::new(buffered_bytes.saturating_add(64)).unwrap_or(NonZeroUsize::MIN); + let mut state = MessageAssemblyState::new(max, Duration::from_secs(30)); + if buffered_bytes == 0 { + return state; + } + + let body = vec![0_u8; buffered_bytes]; + let header = FirstFrameHeader { + message_key: MessageKey(1), + metadata_len: 0, + body_len: buffered_bytes, + total_body_len: None, + is_last: false, + }; + let input = match FirstFrameInput::new(&header, EnvelopeRouting::default(), vec![], &body) { + Ok(input) => input, + Err(error) => panic!("test first frame input should be valid: {error}"), + }; + let result = match state.accept_first_frame(input) { + Ok(result) => result, + Err(error) => panic!("first frame should be accepted: {error}"), + }; + assert!( + result.is_none(), + "first frame should start an in-flight assembly" + ); + state +} + +#[rstest] +fn should_not_pause_when_budgets_are_not_configured() { + let state = state_with_buffered_bytes(95); + assert!(!should_pause_inbound_reads(Some(&state), None)); +} + +#[rstest] +fn should_not_pause_when_state_is_not_available(budgets: MemoryBudgets) { + assert!(!should_pause_inbound_reads(None, Some(budgets))); +} + +#[rstest] +fn should_not_pause_below_soft_limit(budgets: MemoryBudgets) { + let state = state_with_buffered_bytes(79); + assert!(!should_pause_inbound_reads(Some(&state), Some(budgets))); +} + +#[rstest] +fn should_pause_at_soft_limit(budgets: MemoryBudgets) { + let state = state_with_buffered_bytes(80); + assert!(should_pause_inbound_reads(Some(&state), Some(budgets))); +} + +#[rstest] +fn should_pause_above_soft_limit(budgets: MemoryBudgets) { + let state = state_with_buffered_bytes(95); + assert!(should_pause_inbound_reads(Some(&state), Some(budgets))); +} + +#[rstest] +fn uses_smallest_aggregate_budget_dimension() { + let budgets = MemoryBudgets::new( + BudgetBytes::new(NonZeroUsize::new(1024).unwrap_or(NonZeroUsize::MIN)), + BudgetBytes::new(NonZeroUsize::new(200).unwrap_or(NonZeroUsize::MIN)), + BudgetBytes::new(NonZeroUsize::new(100).unwrap_or(NonZeroUsize::MIN)), + ); + let state = state_with_buffered_bytes(80); + assert!(should_pause_inbound_reads(Some(&state), Some(budgets))); +} diff --git a/src/app/frame_handling/mod.rs b/src/app/frame_handling/mod.rs index f15062f3..f55917b8 100644 --- a/src/app/frame_handling/mod.rs +++ b/src/app/frame_handling/mod.rs @@ -3,6 +3,7 @@ //! Extracted from `connection.rs` to keep modules small and focused. mod assembly; +mod backpressure; mod core; mod decode; mod reassembly; @@ -16,6 +17,7 @@ pub(crate) use assembly::{ new_message_assembly_state, purge_expired_assemblies, }; +pub(crate) use backpressure::{should_pause_inbound_reads, soft_limit_pause_duration}; pub(crate) use decode::decode_envelope; pub(crate) use reassembly::reassemble_if_needed; pub(crate) use response::forward_response; @@ -23,4 +25,6 @@ pub(crate) use response::forward_response; #[cfg(all(test, not(loom)))] mod assembly_tests; #[cfg(all(test, not(loom)))] +mod backpressure_tests; +#[cfg(all(test, not(loom)))] mod tests; diff --git a/src/app/inbound_handler.rs b/src/app/inbound_handler.rs index 3f7aa992..aa9d6c95 100644 --- a/src/app/inbound_handler.rs +++ b/src/app/inbound_handler.rs @@ -7,7 +7,7 @@ use futures::{SinkExt, StreamExt}; use log::{debug, warn}; use tokio::{ io::{self, AsyncRead, AsyncWrite, AsyncWriteExt}, - time::{Duration, timeout}, + time::{Duration, sleep, timeout}, }; use tokio_util::codec::{Encoder, Framed, LengthDelimitedCodec}; @@ -278,6 +278,15 @@ where let timeout_dur = Duration::from_millis(self.read_timeout_ms); loop { + if frame_handling::should_pause_inbound_reads( + message_assembly.as_ref(), + self.memory_budgets, + ) { + debug!("soft memory budget pressure detected; pausing inbound reads briefly"); + sleep(frame_handling::soft_limit_pause_duration()).await; + purge_expired(&mut pipeline, &mut message_assembly); + } + match timeout(timeout_dur, framed.next()).await { Ok(Some(Ok(frame))) => { self.handle_frame( diff --git a/tests/features/memory_budget_backpressure.feature b/tests/features/memory_budget_backpressure.feature new file mode 100644 index 00000000..0a76a370 --- /dev/null +++ b/tests/features/memory_budget_backpressure.feature @@ -0,0 +1,20 @@ +@memory_budget_backpressure +Feature: Soft-limit memory budget back-pressure + Inbound reads are paced when buffered assembly bytes approach configured + aggregate memory budgets. + + Scenario: Soft pressure delays completion until virtual time advances + Given a back-pressure inbound app configured as 200/2048/10/10 + When a budgeted first frame for key 1 with body "aaaaaaaa" arrives + And a budgeted final continuation frame for key 1 sequence 1 with body "bb" arrives + Then no budgeted payload is available before virtual time advances + When budgeted virtual time advances by 5 milliseconds + Then budgeted payload "aaaaaaaabb" is eventually received + And no budgeted send error is recorded + + Scenario: Reads continue without delay when pressure is low + Given a back-pressure inbound app configured as 200/2048/100/100 + When a budgeted first frame for key 2 with body "aa" arrives + And a budgeted final continuation frame for key 2 sequence 1 with body "bb" arrives + Then budgeted payload "aabb" is eventually received + And no budgeted send error is recorded diff --git a/tests/fixtures/interleaved_push_queues.rs b/tests/fixtures/interleaved_push_queues.rs index 4f17e46b..03472438 100644 --- a/tests/fixtures/interleaved_push_queues.rs +++ b/tests/fixtures/interleaved_push_queues.rs @@ -64,11 +64,11 @@ impl InterleavedPushWorld { time_slice: None, }, |handle| async move { - push_expect!(handle.push_low_priority(101)); - push_expect!(handle.push_low_priority(102)); - push_expect!(handle.push_high_priority(1)); - push_expect!(handle.push_high_priority(2)); - push_expect!(handle.push_high_priority(3)); + let _ = push_expect!(handle.push_low_priority(101)); + let _ = push_expect!(handle.push_low_priority(102)); + let _ = push_expect!(handle.push_high_priority(1)); + let _ = push_expect!(handle.push_high_priority(2)); + let _ = push_expect!(handle.push_high_priority(3)); }, ) .await @@ -87,10 +87,10 @@ impl InterleavedPushWorld { }, |handle| async move { for n in 1..=6 { - push_expect!(handle.push_high_priority(n)); + let _ = push_expect!(handle.push_high_priority(n)); } - push_expect!(handle.push_low_priority(101)); - push_expect!(handle.push_low_priority(102)); + let _ = push_expect!(handle.push_low_priority(101)); + let _ = push_expect!(handle.push_low_priority(102)); }, ) .await @@ -110,7 +110,7 @@ impl InterleavedPushWorld { .rate(Some(1)) .build()?; - push_expect!(handle.push_high_priority(1)); + let _ = push_expect!(handle.push_high_priority(1)); // The low-priority push should be blocked: the single token was // consumed by the high-priority push above. @@ -139,10 +139,10 @@ impl InterleavedPushWorld { }, |handle| async move { for n in 1..=5 { - push_expect!(handle.push_high_priority(n)); + let _ = push_expect!(handle.push_high_priority(n)); } for n in 101..=105 { - push_expect!(handle.push_low_priority(n)); + let _ = push_expect!(handle.push_low_priority(n)); } }, ) diff --git a/tests/fixtures/memory_budget_backpressure.rs b/tests/fixtures/memory_budget_backpressure.rs new file mode 100644 index 00000000..f42be0e3 --- /dev/null +++ b/tests/fixtures/memory_budget_backpressure.rs @@ -0,0 +1,296 @@ +//! Behavioural fixture for soft-limit memory-budget back-pressure scenarios. + +use std::{fmt, future::Future, num::NonZeroUsize, str::FromStr, time::Duration}; + +use futures::SinkExt; +use rstest::fixture; +use tokio::{io::DuplexStream, sync::mpsc, task::JoinHandle}; +use tokio_util::codec::{Framed, LengthDelimitedCodec}; +use wireframe::{ + app::{BudgetBytes, Envelope, Handler, MemoryBudgets, WireframeApp}, + fragment::FragmentationConfig, + serializer::{BincodeSerializer, Serializer}, + test_helpers::{self, TestAssembler}, +}; +pub use wireframe_testing::TestResult; + +const ROUTE_ID: u32 = 88; +const CORRELATION_ID: Option = Some(9); +const BUFFER_CAPACITY: usize = 512; +const SPIN_ATTEMPTS: usize = 64; + +/// Parsed as +/// "`timeout_ms` / `per_message` / `per_connection` / `in_flight`". +#[derive(Clone, Copy)] +pub struct BackpressureConfig { + pub timeout_ms: u64, + pub per_message: usize, + pub per_connection: usize, + pub in_flight: usize, +} + +impl FromStr for BackpressureConfig { + type Err = String; + + fn from_str(s: &str) -> Result { + let mut values = s.split('/').map(str::trim); + let timeout_ms = values.next().ok_or("missing timeout_ms")?; + let per_message = values.next().ok_or("missing per_message")?; + let per_connection = values.next().ok_or("missing per_connection")?; + let in_flight = values.next().ok_or("missing in_flight")?; + Ok(Self { + timeout_ms: timeout_ms + .parse() + .map_err(|error| format!("timeout_ms: {error}"))?, + per_message: per_message + .parse() + .map_err(|error| format!("per_message: {error}"))?, + per_connection: per_connection + .parse() + .map_err(|error| format!("per_connection: {error}"))?, + in_flight: in_flight + .parse() + .map_err(|error| format!("in_flight: {error}"))?, + }) + } +} + +/// Runtime-backed fixture that drives inbound assembly with memory budgets. +pub struct MemoryBudgetBackpressureWorld { + runtime: Option, + runtime_error: Option, + client: Option>, + server: Option>>, + observed_rx: Option>>, + observed_payloads: Vec>, + last_send_error: Option, +} + +impl Default for MemoryBudgetBackpressureWorld { + fn default() -> Self { + match tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + { + Ok(runtime) => Self { + runtime: Some(runtime), + runtime_error: None, + client: None, + server: None, + observed_rx: None, + observed_payloads: Vec::new(), + last_send_error: None, + }, + Err(error) => Self { + runtime: None, + runtime_error: Some(format!("failed to create runtime: {error}")), + client: None, + server: None, + observed_rx: None, + observed_payloads: Vec::new(), + last_send_error: None, + }, + } + } +} + +impl fmt::Debug for MemoryBudgetBackpressureWorld { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("MemoryBudgetBackpressureWorld") + .field("client_initialized", &self.client.is_some()) + .field("server_initialized", &self.server.is_some()) + .field("observed_payloads", &self.observed_payloads.len()) + .field("last_send_error", &self.last_send_error) + .finish_non_exhaustive() + } +} + +#[fixture] +#[rustfmt::skip] +pub fn memory_budget_backpressure_world() -> MemoryBudgetBackpressureWorld { + MemoryBudgetBackpressureWorld::default() +} + +impl MemoryBudgetBackpressureWorld { + fn runtime(&self) -> TestResult<&tokio::runtime::Runtime> { + self.runtime.as_ref().ok_or_else(|| { + self.runtime_error + .clone() + .unwrap_or_else(|| "runtime unavailable".to_string()) + .into() + }) + } + + fn block_on(&self, future: F) -> TestResult + where + F: Future, + { + if tokio::runtime::Handle::try_current().is_ok() { + return Err("nested Tokio runtime detected in memory-budget fixture".into()); + } + Ok(self.runtime()?.block_on(future)) + } + + pub fn start_app(&mut self, config: BackpressureConfig) -> TestResult { + let fragment_limit = + NonZeroUsize::new(BUFFER_CAPACITY.saturating_mul(16)).unwrap_or(NonZeroUsize::MIN); + let fragmentation = FragmentationConfig::for_frame_budget( + BUFFER_CAPACITY, + fragment_limit, + Duration::from_millis(config.timeout_ms), + ) + .ok_or("failed to derive fragmentation config for test fixture")?; + + let per_message = + NonZeroUsize::new(config.per_message).ok_or("per_message must be non-zero")?; + let per_connection = + NonZeroUsize::new(config.per_connection).ok_or("per_connection must be non-zero")?; + let in_flight = NonZeroUsize::new(config.in_flight).ok_or("in_flight must be non-zero")?; + let budgets = MemoryBudgets::new( + BudgetBytes::new(per_message), + BudgetBytes::new(per_connection), + BudgetBytes::new(in_flight), + ); + + let (tx, rx) = mpsc::unbounded_channel::>(); + let handler: Handler = std::sync::Arc::new(move |envelope: &Envelope| { + let tx = tx.clone(); + let payload = envelope.payload_bytes().to_vec(); + Box::pin(async move { + let _ = tx.send(payload); + }) + }); + + let app: WireframeApp = WireframeApp::new()? + .buffer_capacity(BUFFER_CAPACITY) + .fragmentation(Some(fragmentation)) + .with_message_assembler(TestAssembler) + .memory_budgets(budgets) + .route(ROUTE_ID, handler)?; + + let codec = app.length_codec(); + let (client_stream, server_stream) = tokio::io::duplex(2048); + let client = Framed::new(client_stream, codec); + + self.block_on(async { tokio::time::pause() })?; + let server = self + .runtime()? + .spawn(async move { app.handle_connection_result(server_stream).await }); + + self.client = Some(client); + self.server = Some(server); + self.observed_rx = Some(rx); + self.observed_payloads.clear(); + self.last_send_error = None; + Ok(()) + } + + pub fn send_first_frame(&mut self, key: u64, body: &str) -> TestResult { + let payload = test_helpers::first_frame_payload(key, body.as_bytes(), false, None)?; + self.send_payload(payload) + } + + pub fn send_final_continuation_frame( + &mut self, + key: u64, + sequence: u32, + body: &str, + ) -> TestResult { + let payload = + test_helpers::continuation_frame_payload(key, sequence, body.as_bytes(), true)?; + self.send_payload(payload) + } + + pub fn assert_no_payload_ready(&mut self) -> TestResult { + self.spin_runtime()?; + self.drain_ready_payloads()?; + if self.observed_payloads.is_empty() { + return Ok(()); + } + Err(format!( + "expected no payload to be ready, observed={:?}", + self.observed_payloads + ) + .into()) + } + + pub fn advance_millis(&mut self, millis: u64) -> TestResult { + self.block_on(async { tokio::time::advance(Duration::from_millis(millis)).await })?; + Ok(()) + } + + pub fn assert_payload_received(&mut self, expected: &str) -> TestResult { + let expected = expected.as_bytes(); + for _ in 0..SPIN_ATTEMPTS { + self.drain_ready_payloads()?; + if self + .observed_payloads + .iter() + .any(|payload| payload.as_slice() == expected) + { + return Ok(()); + } + self.block_on(async { tokio::task::yield_now().await })?; + } + + Err(format!( + "expected payload {:?} not observed; observed={:?}", + expected, self.observed_payloads + ) + .into()) + } + + pub fn assert_no_send_error(&self) -> TestResult { + if self.last_send_error.is_none() { + return Ok(()); + } + Err(format!("unexpected send error: {:?}", self.last_send_error).into()) + } + + fn send_payload(&mut self, payload: Vec) -> TestResult { + let envelope = Envelope::new(ROUTE_ID, CORRELATION_ID, payload); + let serializer = BincodeSerializer; + let frame = serializer.serialize(&envelope)?; + + let mut client = self.client.take().ok_or("client not initialized")?; + let send_result = self.block_on(async { + client.send(frame.into()).await?; + client.flush().await?; + Ok::<(), std::io::Error>(()) + }); + self.client = Some(client); + + match send_result { + Ok(Ok(())) => { + self.last_send_error = None; + Ok(()) + } + Ok(Err(error)) => { + self.last_send_error = Some(error.to_string()); + Err(error.into()) + } + Err(error) => { + self.last_send_error = Some(error.to_string()); + Err(error) + } + } + } + + fn spin_runtime(&self) -> TestResult { + self.block_on(async { + for _ in 0..8 { + tokio::task::yield_now().await; + } + })?; + Ok(()) + } + + fn drain_ready_payloads(&mut self) -> TestResult { + let mut observed_rx = self.observed_rx.take().ok_or("receiver not initialized")?; + while let Ok(payload) = observed_rx.try_recv() { + self.observed_payloads.push(payload); + } + self.observed_rx = Some(observed_rx); + Ok(()) + } +} diff --git a/tests/fixtures/mod.rs b/tests/fixtures/mod.rs index be7b89a3..5a5fe11a 100644 --- a/tests/fixtures/mod.rs +++ b/tests/fixtures/mod.rs @@ -15,6 +15,7 @@ pub mod codec_stateful; pub mod correlation; pub mod fragment; pub mod interleaved_push_queues; +pub mod memory_budget_backpressure; pub mod memory_budgets; pub mod message_assembler; pub mod message_assembly; diff --git a/tests/interleaved_push_queues.rs b/tests/interleaved_push_queues.rs index a0416940..05b98861 100644 --- a/tests/interleaved_push_queues.rs +++ b/tests/interleaved_push_queues.rs @@ -117,7 +117,7 @@ async fn rate_limit_symmetric_mixed() -> TestResult { .build()?; // High push consumes the single token. - push_expect!(handle.push_high_priority(1)); + let _ = push_expect!(handle.push_high_priority(1)); // Low push should now be blocked. let mut blocked = handle.push_low_priority(2).boxed(); @@ -128,7 +128,7 @@ async fn rate_limit_symmetric_mixed() -> TestResult { ); time::advance(Duration::from_secs(1)).await; - push_expect!(handle.push_low_priority(3)); + let _ = push_expect!(handle.push_low_priority(3)); let (_, a) = queues.recv().await.ok_or("recv 1 failed")?; let (_, b) = queues.recv().await.ok_or("recv 2 failed")?; @@ -153,10 +153,10 @@ async fn interleaved_fairness_yields_at_threshold() -> TestResult { // Preload: 6 high, 2 low. |handle| async move { for n in 1..=6 { - push_expect!(handle.push_high_priority(n)); + let _ = push_expect!(handle.push_high_priority(n)); } - push_expect!(handle.push_low_priority(101)); - push_expect!(handle.push_low_priority(102)); + let _ = push_expect!(handle.push_low_priority(101)); + let _ = push_expect!(handle.push_low_priority(102)); }, // Expected: H H H L H H H L |out| { @@ -183,10 +183,10 @@ async fn interleaved_all_frames_delivered() -> TestResult { }, |handle| async move { for n in 1..=5 { - push_expect!(handle.push_high_priority(n)); + let _ = push_expect!(handle.push_high_priority(n)); } for n in 101..=105 { - push_expect!(handle.push_low_priority(n)); + let _ = push_expect!(handle.push_low_priority(n)); } }, |out| { @@ -238,14 +238,14 @@ async fn interleaved_time_slice_fairness(shutdown_token: CancellationToken) -> T Ok::<_, String>(out) }); - push_expect!(handle.push_high_priority(1)); + let _ = push_expect!(handle.push_high_priority(1)); time::advance(Duration::from_millis(5)).await; - push_expect!(handle.push_high_priority(2)); + let _ = push_expect!(handle.push_high_priority(2)); // Advance past the time slice so the next high frame triggers yield. time::advance(Duration::from_millis(15)).await; - push_expect!(handle.push_low_priority(42)); + let _ = push_expect!(handle.push_low_priority(42)); for n in 3..=5 { - push_expect!(handle.push_high_priority(n)); + let _ = push_expect!(handle.push_high_priority(n)); } drop(handle); @@ -278,10 +278,10 @@ async fn rate_limit_interleaved_total_throughput() -> TestResult { .build()?; // First 4 pushes (mix of priorities) should succeed immediately. - push_expect!(handle.push_high_priority(1)); - push_expect!(handle.push_low_priority(2)); - push_expect!(handle.push_high_priority(3)); - push_expect!(handle.push_low_priority(4)); + let _ = push_expect!(handle.push_high_priority(1)); + let _ = push_expect!(handle.push_low_priority(2)); + let _ = push_expect!(handle.push_high_priority(3)); + let _ = push_expect!(handle.push_low_priority(4)); // The 5th push (either priority) should block. let mut blocked = handle.push_high_priority(5).boxed(); @@ -292,8 +292,8 @@ async fn rate_limit_interleaved_total_throughput() -> TestResult { ); time::advance(Duration::from_secs(1)).await; - push_expect!(handle.push_high_priority(6)); - push_expect!(handle.push_low_priority(7)); + let _ = push_expect!(handle.push_high_priority(6)); + let _ = push_expect!(handle.push_low_priority(7)); // Drain all frames to verify delivery. let mut out = Vec::new(); @@ -318,11 +318,11 @@ async fn fairness_disabled_strict_priority() -> TestResult { time_slice: None, }, |handle| async move { - push_expect!(handle.push_low_priority(101)); - push_expect!(handle.push_low_priority(102)); - push_expect!(handle.push_high_priority(1)); - push_expect!(handle.push_high_priority(2)); - push_expect!(handle.push_high_priority(3)); + let _ = push_expect!(handle.push_low_priority(101)); + let _ = push_expect!(handle.push_low_priority(102)); + let _ = push_expect!(handle.push_high_priority(1)); + let _ = push_expect!(handle.push_high_priority(2)); + let _ = push_expect!(handle.push_high_priority(3)); }, |out| { assert_eq!( diff --git a/tests/scenarios/memory_budget_backpressure_scenarios.rs b/tests/scenarios/memory_budget_backpressure_scenarios.rs new file mode 100644 index 00000000..5579fd69 --- /dev/null +++ b/tests/scenarios/memory_budget_backpressure_scenarios.rs @@ -0,0 +1,25 @@ +//! Scenario test functions for memory budget back-pressure. + +use rstest_bdd_macros::scenario; + +use crate::fixtures::memory_budget_backpressure::*; + +#[scenario( + path = "tests/features/memory_budget_backpressure.feature", + name = "Soft pressure delays completion until virtual time advances" +)] +fn soft_pressure_delays_completion( + memory_budget_backpressure_world: MemoryBudgetBackpressureWorld, +) { + drop(memory_budget_backpressure_world); +} + +#[scenario( + path = "tests/features/memory_budget_backpressure.feature", + name = "Reads continue without delay when pressure is low" +)] +fn low_pressure_continues_without_delay( + memory_budget_backpressure_world: MemoryBudgetBackpressureWorld, +) { + drop(memory_budget_backpressure_world); +} diff --git a/tests/scenarios/mod.rs b/tests/scenarios/mod.rs index 688c58d7..7d61087a 100644 --- a/tests/scenarios/mod.rs +++ b/tests/scenarios/mod.rs @@ -19,6 +19,7 @@ mod codec_stateful_scenarios; mod correlation_scenarios; mod fragment_scenarios; mod interleaved_push_queues_scenarios; +mod memory_budget_backpressure_scenarios; mod memory_budgets_scenarios; mod message_assembler_scenarios; mod message_assembly_inbound_scenarios; diff --git a/tests/steps/memory_budget_backpressure_steps.rs b/tests/steps/memory_budget_backpressure_steps.rs new file mode 100644 index 00000000..853bb6ab --- /dev/null +++ b/tests/steps/memory_budget_backpressure_steps.rs @@ -0,0 +1,69 @@ +//! Step definitions for soft-limit memory budget back-pressure scenarios. + +use rstest_bdd_macros::{given, then, when}; + +use crate::fixtures::memory_budget_backpressure::{ + BackpressureConfig, + MemoryBudgetBackpressureWorld, + TestResult, +}; + +#[given("a back-pressure inbound app configured as {config}")] +fn given_backpressure_app( + memory_budget_backpressure_world: &mut MemoryBudgetBackpressureWorld, + config: BackpressureConfig, +) -> TestResult { + memory_budget_backpressure_world.start_app(config) +} + +#[when("a budgeted first frame for key {key:u64} with body {body:string} arrives")] +fn when_first_frame_arrives( + memory_budget_backpressure_world: &mut MemoryBudgetBackpressureWorld, + key: u64, + body: String, +) -> TestResult { + memory_budget_backpressure_world.send_first_frame(key, &body) +} + +#[when( + "a budgeted final continuation frame for key {key:u64} sequence {sequence:u32} with body \ + {body:string} arrives" +)] +fn when_final_continuation_arrives( + memory_budget_backpressure_world: &mut MemoryBudgetBackpressureWorld, + key: u64, + sequence: u32, + body: String, +) -> TestResult { + memory_budget_backpressure_world.send_final_continuation_frame(key, sequence, &body) +} + +#[then("no budgeted payload is available before virtual time advances")] +fn then_no_payload_available( + memory_budget_backpressure_world: &mut MemoryBudgetBackpressureWorld, +) -> TestResult { + memory_budget_backpressure_world.assert_no_payload_ready() +} + +#[when("budgeted virtual time advances by {millis:u64} milliseconds")] +fn when_virtual_time_advances( + memory_budget_backpressure_world: &mut MemoryBudgetBackpressureWorld, + millis: u64, +) -> TestResult { + memory_budget_backpressure_world.advance_millis(millis) +} + +#[then("budgeted payload {expected:string} is eventually received")] +fn then_payload_eventually_received( + memory_budget_backpressure_world: &mut MemoryBudgetBackpressureWorld, + expected: String, +) -> TestResult { + memory_budget_backpressure_world.assert_payload_received(&expected) +} + +#[then("no budgeted send error is recorded")] +fn then_no_send_error( + memory_budget_backpressure_world: &mut MemoryBudgetBackpressureWorld, +) -> TestResult { + memory_budget_backpressure_world.assert_no_send_error() +} diff --git a/tests/steps/mod.rs b/tests/steps/mod.rs index 92c6868d..4c1b169c 100644 --- a/tests/steps/mod.rs +++ b/tests/steps/mod.rs @@ -15,6 +15,7 @@ mod codec_stateful_steps; mod correlation_steps; mod fragment_steps; mod interleaved_push_queues_steps; +mod memory_budget_backpressure_steps; mod memory_budgets_steps; mod message_assembler_steps; mod message_assembly_inbound_steps; From b31550adc5abfb73f48054b79237cb6384cf4bc6 Mon Sep 17 00:00:00 2001 From: Leynos Date: Tue, 24 Feb 2026 07:17:20 +0000 Subject: [PATCH 3/6] feat(backpressure): improve memory budget back-pressure config parsing and testing - Enhance BackpressureConfig::from_str to reject empty and trailing segments with clear errors. - Add extensive unit tests validating parsing errors and correct config processing. - Refactor test fixtures with clearer NonZeroUsize construction and panics. - Add new test helper methods and more descriptive documentation in backpressure BDD tests. - Minor doc fixes in soft limit behavior and back-pressure ADR. Co-authored-by: devboxerhub[bot] --- ...ng-requests-and-shared-message-assembly.md | 8 +- docs/execplans/8-3-3-soft-limit-behaviour.md | 2 +- src/app/frame_handling/backpressure_tests.rs | 34 +++-- tests/fixtures/memory_budget_backpressure.rs | 117 +++++++++++++++++- 4 files changed, 143 insertions(+), 18 deletions(-) diff --git a/docs/adr-002-streaming-requests-and-shared-message-assembly.md b/docs/adr-002-streaming-requests-and-shared-message-assembly.md index 25a0c6a0..185c818b 100644 --- a/docs/adr-002-streaming-requests-and-shared-message-assembly.md +++ b/docs/adr-002-streaming-requests-and-shared-message-assembly.md @@ -219,10 +219,10 @@ Wireframe will make budget failures explicit and deterministic so protocol implementers can rely on the behaviour across crates: - **Soft budget pressure (back-pressure):** when inbound buffering approaches - the per-connection in-flight cap, Wireframe MUST stop reading further packets - from the socket until buffered bytes fall below the cap. For streaming bodies - backed by bounded queues, this naturally suspends the reader when the queue - is full. + the per-connection in-flight cap, Wireframe MUST pace further reads by + applying short per-iteration pauses before polling the next packet. For + streaming bodies backed by bounded queues, this naturally suspends the reader + when the queue is full. - **Hard cap exceeded during assembly (pre-handler):** if accepting a fragment or continuation packet would exceed the configured per-message or per- connection cap, Wireframe MUST: diff --git a/docs/execplans/8-3-3-soft-limit-behaviour.md b/docs/execplans/8-3-3-soft-limit-behaviour.md index e0d17ae1..1dc388b0 100644 --- a/docs/execplans/8-3-3-soft-limit-behaviour.md +++ b/docs/execplans/8-3-3-soft-limit-behaviour.md @@ -58,7 +58,7 @@ Success is observable when: ## Tolerances (exception triggers) - Scope: if implementation requires changes to more than 14 files or more than - 700 net LOC, stop and escalate. + 700 net lines of code (LOC), stop and escalate. - Interface: if implementing `8.3.3` requires a new public builder method, changed public function signatures, or new public types, stop and escalate. - Dependencies: if any new crate is required, stop and escalate. diff --git a/src/app/frame_handling/backpressure_tests.rs b/src/app/frame_handling/backpressure_tests.rs index de5377d3..7c51cfdc 100644 --- a/src/app/frame_handling/backpressure_tests.rs +++ b/src/app/frame_handling/backpressure_tests.rs @@ -18,15 +18,26 @@ use crate::{ #[fixture] fn budgets() -> MemoryBudgets { + let Some(per_message) = NonZeroUsize::new(1024) else { + panic!("1024 is non-zero"); + }; + let Some(per_connection) = NonZeroUsize::new(100) else { + panic!("100 is non-zero"); + }; + let Some(in_flight) = NonZeroUsize::new(100) else { + panic!("100 is non-zero"); + }; MemoryBudgets::new( - BudgetBytes::new(NonZeroUsize::new(1024).unwrap_or(NonZeroUsize::MIN)), - BudgetBytes::new(NonZeroUsize::new(100).unwrap_or(NonZeroUsize::MIN)), - BudgetBytes::new(NonZeroUsize::new(100).unwrap_or(NonZeroUsize::MIN)), + BudgetBytes::new(per_message), + BudgetBytes::new(per_connection), + BudgetBytes::new(in_flight), ) } fn state_with_buffered_bytes(buffered_bytes: usize) -> MessageAssemblyState { - let max = NonZeroUsize::new(buffered_bytes.saturating_add(64)).unwrap_or(NonZeroUsize::MIN); + let Some(max) = NonZeroUsize::new(buffered_bytes.saturating_add(64)) else { + panic!("buffer size plus padding should be non-zero"); + }; let mut state = MessageAssemblyState::new(max, Duration::from_secs(30)); if buffered_bytes == 0 { return state; @@ -86,10 +97,19 @@ fn should_pause_above_soft_limit(budgets: MemoryBudgets) { #[rstest] fn uses_smallest_aggregate_budget_dimension() { + let Some(per_message) = NonZeroUsize::new(1024) else { + panic!("1024 is non-zero"); + }; + let Some(per_connection) = NonZeroUsize::new(200) else { + panic!("200 is non-zero"); + }; + let Some(in_flight) = NonZeroUsize::new(100) else { + panic!("100 is non-zero"); + }; let budgets = MemoryBudgets::new( - BudgetBytes::new(NonZeroUsize::new(1024).unwrap_or(NonZeroUsize::MIN)), - BudgetBytes::new(NonZeroUsize::new(200).unwrap_or(NonZeroUsize::MIN)), - BudgetBytes::new(NonZeroUsize::new(100).unwrap_or(NonZeroUsize::MIN)), + BudgetBytes::new(per_message), + BudgetBytes::new(per_connection), + BudgetBytes::new(in_flight), ); let state = state_with_buffered_bytes(80); assert!(should_pause_inbound_reads(Some(&state), Some(budgets))); diff --git a/tests/fixtures/memory_budget_backpressure.rs b/tests/fixtures/memory_budget_backpressure.rs index f42be0e3..c59fbc9a 100644 --- a/tests/fixtures/memory_budget_backpressure.rs +++ b/tests/fixtures/memory_budget_backpressure.rs @@ -34,10 +34,25 @@ impl FromStr for BackpressureConfig { fn from_str(s: &str) -> Result { let mut values = s.split('/').map(str::trim); - let timeout_ms = values.next().ok_or("missing timeout_ms")?; - let per_message = values.next().ok_or("missing per_message")?; - let per_connection = values.next().ok_or("missing per_connection")?; - let in_flight = values.next().ok_or("missing in_flight")?; + let timeout_ms = values + .next() + .filter(|value| !value.is_empty()) + .ok_or("missing timeout_ms")?; + let per_message = values + .next() + .filter(|value| !value.is_empty()) + .ok_or("missing per_message")?; + let per_connection = values + .next() + .filter(|value| !value.is_empty()) + .ok_or("missing per_connection")?; + let in_flight = values + .next() + .filter(|value| !value.is_empty()) + .ok_or("missing in_flight")?; + if values.next().is_some() { + return Err("unexpected trailing segments".to_string()); + } Ok(Self { timeout_ms: timeout_ms .parse() @@ -105,6 +120,7 @@ impl fmt::Debug for MemoryBudgetBackpressureWorld { } } +/// Construct the default world used by memory-budget back-pressure BDD tests. #[fixture] #[rustfmt::skip] pub fn memory_budget_backpressure_world() -> MemoryBudgetBackpressureWorld { @@ -131,9 +147,11 @@ impl MemoryBudgetBackpressureWorld { Ok(self.runtime()?.block_on(future)) } + /// Start the app under test using the supplied budget and timeout config. pub fn start_app(&mut self, config: BackpressureConfig) -> TestResult { - let fragment_limit = - NonZeroUsize::new(BUFFER_CAPACITY.saturating_mul(16)).unwrap_or(NonZeroUsize::MIN); + let Some(fragment_limit) = NonZeroUsize::new(BUFFER_CAPACITY.saturating_mul(16)) else { + return Err("buffer-derived fragment limit should be non-zero".into()); + }; let fragmentation = FragmentationConfig::for_frame_budget( BUFFER_CAPACITY, fragment_limit, @@ -185,11 +203,13 @@ impl MemoryBudgetBackpressureWorld { Ok(()) } + /// Send a non-final first frame for the provided message key. pub fn send_first_frame(&mut self, key: u64, body: &str) -> TestResult { let payload = test_helpers::first_frame_payload(key, body.as_bytes(), false, None)?; self.send_payload(payload) } + /// Send a final continuation frame for the provided message key. pub fn send_final_continuation_frame( &mut self, key: u64, @@ -201,6 +221,7 @@ impl MemoryBudgetBackpressureWorld { self.send_payload(payload) } + /// Assert that no payload has been emitted yet. pub fn assert_no_payload_ready(&mut self) -> TestResult { self.spin_runtime()?; self.drain_ready_payloads()?; @@ -214,11 +235,13 @@ impl MemoryBudgetBackpressureWorld { .into()) } + /// Advance Tokio virtual time by the supplied duration in milliseconds. pub fn advance_millis(&mut self, millis: u64) -> TestResult { self.block_on(async { tokio::time::advance(Duration::from_millis(millis)).await })?; Ok(()) } + /// Assert that the expected payload is eventually observed. pub fn assert_payload_received(&mut self, expected: &str) -> TestResult { let expected = expected.as_bytes(); for _ in 0..SPIN_ATTEMPTS { @@ -240,6 +263,7 @@ impl MemoryBudgetBackpressureWorld { .into()) } + /// Assert that sending frames into the app did not record any send error. pub fn assert_no_send_error(&self) -> TestResult { if self.last_send_error.is_none() { return Ok(()); @@ -294,3 +318,84 @@ impl MemoryBudgetBackpressureWorld { Ok(()) } } + +#[cfg(test)] +mod backpressure_config_from_str_tests { + use std::str::FromStr; + + use super::BackpressureConfig; + + #[test] + fn parses_valid_backpressure_config() { + let Ok(config) = BackpressureConfig::from_str("1000/10/20/30") else { + panic!("valid config should parse"); + }; + + assert_eq!(config.timeout_ms, 1000); + assert_eq!(config.per_message, 10); + assert_eq!(config.per_connection, 20); + assert_eq!(config.in_flight, 30); + } + + #[test] + fn fails_when_missing_fields() { + let Err(missing_in_flight) = BackpressureConfig::from_str("1000/10/20") else { + panic!("missing in_flight should fail"); + }; + assert_eq!(missing_in_flight, "missing in_flight"); + + let Err(missing_per_message) = BackpressureConfig::from_str("1000") else { + panic!("missing per_message should fail"); + }; + assert_eq!(missing_per_message, "missing per_message"); + + let Err(missing_timeout) = BackpressureConfig::from_str("") else { + panic!("missing timeout_ms should fail"); + }; + assert_eq!(missing_timeout, "missing timeout_ms"); + } + + #[test] + fn fails_with_clear_error_on_non_numeric_segments() { + let Err(timeout_error) = BackpressureConfig::from_str("not-a-number/10/20/30") else { + panic!("non-numeric timeout_ms should fail"); + }; + assert!( + timeout_error.starts_with("timeout_ms:"), + "unexpected error message: {timeout_error}" + ); + + let Err(per_message_error) = BackpressureConfig::from_str("1000/not-a-number/20/30") else { + panic!("non-numeric per_message should fail"); + }; + assert!( + per_message_error.starts_with("per_message:"), + "unexpected error message: {per_message_error}" + ); + + let Err(per_connection_error) = BackpressureConfig::from_str("1000/10/not-a-number/30") + else { + panic!("non-numeric per_connection should fail"); + }; + assert!( + per_connection_error.starts_with("per_connection:"), + "unexpected error message: {per_connection_error}" + ); + + let Err(in_flight_error) = BackpressureConfig::from_str("1000/10/20/not-a-number") else { + panic!("non-numeric in_flight should fail"); + }; + assert!( + in_flight_error.starts_with("in_flight:"), + "unexpected error message: {in_flight_error}" + ); + } + + #[test] + fn fails_when_segments_are_present_after_in_flight() { + let Err(trailing_segments) = BackpressureConfig::from_str("1000/10/20/30/40") else { + panic!("trailing segments should fail"); + }; + assert_eq!(trailing_segments, "unexpected trailing segments"); + } +} From b5911c0adf85643e92787c40d12f59b92a3f4d3e Mon Sep 17 00:00:00 2001 From: Leynos Date: Tue, 24 Feb 2026 11:04:53 +0000 Subject: [PATCH 4/6] test(memory_budget_backpressure): add comprehensive BackpressureConfig parsing tests - Added unit tests for BackpressureConfig::from_str covering valid input parsing. - Added tests for missing fields and non-numeric segment errors. - Added test for unexpected trailing segments ensuring error correctness. - Implemented tests as new file `memory_budget_backpressure_tests.rs` and integrated in fixtures mod. This improves coverage and validation of backpressure configuration parsing logic. Co-authored-by: devboxerhub[bot] --- docs/execplans/8-3-3-soft-limit-behaviour.md | 58 ++++---- src/app/frame_handling/backpressure_tests.rs | 28 ++-- tests/fixtures/memory_budget_backpressure.rs | 125 ++++-------------- .../memory_budget_backpressure_tests.rs | 68 ++++++++++ tests/fixtures/mod.rs | 1 + 5 files changed, 140 insertions(+), 140 deletions(-) create mode 100644 tests/fixtures/memory_budget_backpressure_tests.rs diff --git a/docs/execplans/8-3-3-soft-limit-behaviour.md b/docs/execplans/8-3-3-soft-limit-behaviour.md index 1dc388b0..7143122b 100644 --- a/docs/execplans/8-3-3-soft-limit-behaviour.md +++ b/docs/execplans/8-3-3-soft-limit-behaviour.md @@ -317,39 +317,47 @@ No roadmap checkbox update until every gate is green. ## Concrete steps -Run from repository root (`/home/user/project`). +Run from the repository root (`.`). 1. Implement Stage A helper + unit tests. 2. Run focused unit tests for helper and inbound assembly behaviour: - set -o pipefail - cargo test --lib frame_handling 2>&1 | tee /tmp/wireframe-8-3-3-unit-a.log - cargo test --lib message_assembler 2>&1 | tee /tmp/wireframe-8-3-3-unit-b.log +```shell +set -o pipefail +cargo test --lib frame_handling 2>&1 | tee /tmp/wireframe-8-3-3-unit-a.log +cargo test --lib message_assembler 2>&1 | tee /tmp/wireframe-8-3-3-unit-b.log +``` -3. Integrate Stage B inbound loop pausing and re-run focused unit tests: +1. Integrate Stage B inbound loop pausing and re-run focused unit tests: - set -o pipefail - cargo test --lib inbound_handler 2>&1 | tee /tmp/wireframe-8-3-3-unit-c.log +```shell +set -o pipefail +cargo test --lib inbound_handler 2>&1 | tee /tmp/wireframe-8-3-3-unit-c.log +``` -4. Add Stage C behavioural suite and run targeted BDD scenarios: +1. Add Stage C behavioural suite and run targeted BDD scenarios: - set -o pipefail - cargo test --test bdd --all-features memory_budget_backpressure 2>&1 | tee /tmp/wireframe-8-3-3-bdd.log +```shell +set -o pipefail +cargo test --test bdd --all-features memory_budget_backpressure 2>&1 | tee /tmp/wireframe-8-3-3-bdd.log +``` -5. Update Stage D docs and roadmap. +1. Update Stage D docs and roadmap. -6. Run full quality gates: +2. Run full quality gates: - set -o pipefail - make fmt 2>&1 | tee /tmp/wireframe-8-3-3-fmt.log - make markdownlint 2>&1 | tee /tmp/wireframe-8-3-3-markdownlint.log - make check-fmt 2>&1 | tee /tmp/wireframe-8-3-3-check-fmt.log - make lint 2>&1 | tee /tmp/wireframe-8-3-3-lint.log - make test 2>&1 | tee /tmp/wireframe-8-3-3-test.log - make nixie 2>&1 | tee /tmp/wireframe-8-3-3-nixie.log +```shell +set -o pipefail +make fmt 2>&1 | tee /tmp/wireframe-8-3-3-fmt.log +make markdownlint 2>&1 | tee /tmp/wireframe-8-3-3-markdownlint.log +make check-fmt 2>&1 | tee /tmp/wireframe-8-3-3-check-fmt.log +make lint 2>&1 | tee /tmp/wireframe-8-3-3-lint.log +make test 2>&1 | tee /tmp/wireframe-8-3-3-test.log +make nixie 2>&1 | tee /tmp/wireframe-8-3-3-nixie.log +``` -7. If any gate fails, fix only the failing area and rerun the failing command +1. If any gate fails, fix only the failing area and rerun the failing command until green, then rerun affected downstream gates. ## Validation and acceptance @@ -418,10 +426,12 @@ Internal interfaces expected at the end of this milestone: In `src/app/frame_handling/backpressure.rs`: - pub(crate) fn should_pause_inbound_reads( - state: Option<&crate::message_assembler::MessageAssemblyState>, - budgets: Option, - ) -> bool +```rust +pub(crate) fn should_pause_inbound_reads( + state: Option<&crate::message_assembler::MessageAssemblyState>, + budgets: Option, +) -> bool +``` In `src/app/inbound_handler.rs`: diff --git a/src/app/frame_handling/backpressure_tests.rs b/src/app/frame_handling/backpressure_tests.rs index 7c51cfdc..6f006aa0 100644 --- a/src/app/frame_handling/backpressure_tests.rs +++ b/src/app/frame_handling/backpressure_tests.rs @@ -78,21 +78,19 @@ fn should_not_pause_when_state_is_not_available(budgets: MemoryBudgets) { } #[rstest] -fn should_not_pause_below_soft_limit(budgets: MemoryBudgets) { - let state = state_with_buffered_bytes(79); - assert!(!should_pause_inbound_reads(Some(&state), Some(budgets))); -} - -#[rstest] -fn should_pause_at_soft_limit(budgets: MemoryBudgets) { - let state = state_with_buffered_bytes(80); - assert!(should_pause_inbound_reads(Some(&state), Some(budgets))); -} - -#[rstest] -fn should_pause_above_soft_limit(budgets: MemoryBudgets) { - let state = state_with_buffered_bytes(95); - assert!(should_pause_inbound_reads(Some(&state), Some(budgets))); +#[case(79, false)] +#[case(80, true)] +#[case(95, true)] +fn soft_limit_pause_threshold_behaviour( + #[case] buffered_bytes: usize, + #[case] should_pause: bool, + budgets: MemoryBudgets, +) { + let state = state_with_buffered_bytes(buffered_bytes); + assert_eq!( + should_pause_inbound_reads(Some(&state), Some(budgets)), + should_pause + ); } #[rstest] diff --git a/tests/fixtures/memory_budget_backpressure.rs b/tests/fixtures/memory_budget_backpressure.rs index c59fbc9a..c29e9667 100644 --- a/tests/fixtures/memory_budget_backpressure.rs +++ b/tests/fixtures/memory_budget_backpressure.rs @@ -21,7 +21,7 @@ const SPIN_ATTEMPTS: usize = 64; /// Parsed as /// "`timeout_ms` / `per_message` / `per_connection` / `in_flight`". -#[derive(Clone, Copy)] +#[derive(Clone, Copy, Debug)] pub struct BackpressureConfig { pub timeout_ms: u64, pub per_message: usize, @@ -81,30 +81,33 @@ pub struct MemoryBudgetBackpressureWorld { last_send_error: Option, } +impl MemoryBudgetBackpressureWorld { + fn with_runtime( + runtime: Option, + runtime_error: Option, + ) -> Self { + Self { + runtime, + runtime_error, + client: None, + server: None, + observed_rx: None, + observed_payloads: Vec::new(), + last_send_error: None, + } + } +} + impl Default for MemoryBudgetBackpressureWorld { fn default() -> Self { match tokio::runtime::Builder::new_current_thread() .enable_all() .build() { - Ok(runtime) => Self { - runtime: Some(runtime), - runtime_error: None, - client: None, - server: None, - observed_rx: None, - observed_payloads: Vec::new(), - last_send_error: None, - }, - Err(error) => Self { - runtime: None, - runtime_error: Some(format!("failed to create runtime: {error}")), - client: None, - server: None, - observed_rx: None, - observed_payloads: Vec::new(), - last_send_error: None, - }, + Ok(runtime) => Self::with_runtime(Some(runtime), None), + Err(error) => { + Self::with_runtime(None, Some(format!("failed to create runtime: {error}"))) + } } } } @@ -122,7 +125,6 @@ impl fmt::Debug for MemoryBudgetBackpressureWorld { /// Construct the default world used by memory-budget back-pressure BDD tests. #[fixture] -#[rustfmt::skip] pub fn memory_budget_backpressure_world() -> MemoryBudgetBackpressureWorld { MemoryBudgetBackpressureWorld::default() } @@ -276,6 +278,8 @@ impl MemoryBudgetBackpressureWorld { let serializer = BincodeSerializer; let frame = serializer.serialize(&envelope)?; + // Temporarily move the framed client into the async block so send/flush + // can run without holding a mutable borrow of `self`; then restore it. let mut client = self.client.take().ok_or("client not initialized")?; let send_result = self.block_on(async { client.send(frame.into()).await?; @@ -318,84 +322,3 @@ impl MemoryBudgetBackpressureWorld { Ok(()) } } - -#[cfg(test)] -mod backpressure_config_from_str_tests { - use std::str::FromStr; - - use super::BackpressureConfig; - - #[test] - fn parses_valid_backpressure_config() { - let Ok(config) = BackpressureConfig::from_str("1000/10/20/30") else { - panic!("valid config should parse"); - }; - - assert_eq!(config.timeout_ms, 1000); - assert_eq!(config.per_message, 10); - assert_eq!(config.per_connection, 20); - assert_eq!(config.in_flight, 30); - } - - #[test] - fn fails_when_missing_fields() { - let Err(missing_in_flight) = BackpressureConfig::from_str("1000/10/20") else { - panic!("missing in_flight should fail"); - }; - assert_eq!(missing_in_flight, "missing in_flight"); - - let Err(missing_per_message) = BackpressureConfig::from_str("1000") else { - panic!("missing per_message should fail"); - }; - assert_eq!(missing_per_message, "missing per_message"); - - let Err(missing_timeout) = BackpressureConfig::from_str("") else { - panic!("missing timeout_ms should fail"); - }; - assert_eq!(missing_timeout, "missing timeout_ms"); - } - - #[test] - fn fails_with_clear_error_on_non_numeric_segments() { - let Err(timeout_error) = BackpressureConfig::from_str("not-a-number/10/20/30") else { - panic!("non-numeric timeout_ms should fail"); - }; - assert!( - timeout_error.starts_with("timeout_ms:"), - "unexpected error message: {timeout_error}" - ); - - let Err(per_message_error) = BackpressureConfig::from_str("1000/not-a-number/20/30") else { - panic!("non-numeric per_message should fail"); - }; - assert!( - per_message_error.starts_with("per_message:"), - "unexpected error message: {per_message_error}" - ); - - let Err(per_connection_error) = BackpressureConfig::from_str("1000/10/not-a-number/30") - else { - panic!("non-numeric per_connection should fail"); - }; - assert!( - per_connection_error.starts_with("per_connection:"), - "unexpected error message: {per_connection_error}" - ); - - let Err(in_flight_error) = BackpressureConfig::from_str("1000/10/20/not-a-number") else { - panic!("non-numeric in_flight should fail"); - }; - assert!( - in_flight_error.starts_with("in_flight:"), - "unexpected error message: {in_flight_error}" - ); - } - - #[test] - fn fails_when_segments_are_present_after_in_flight() { - let Err(trailing_segments) = BackpressureConfig::from_str("1000/10/20/30/40") else { - panic!("trailing segments should fail"); - }; - assert_eq!(trailing_segments, "unexpected trailing segments"); - } -} diff --git a/tests/fixtures/memory_budget_backpressure_tests.rs b/tests/fixtures/memory_budget_backpressure_tests.rs new file mode 100644 index 00000000..c68d2aea --- /dev/null +++ b/tests/fixtures/memory_budget_backpressure_tests.rs @@ -0,0 +1,68 @@ +//! Unit tests for `BackpressureConfig` parsing. + +use std::str::FromStr; + +use super::memory_budget_backpressure::BackpressureConfig; + +#[test] +fn parses_valid_backpressure_config() { + let config = BackpressureConfig::from_str("1000/10/20/30").expect("valid config should parse"); + + assert_eq!(config.timeout_ms, 1000); + assert_eq!(config.per_message, 10); + assert_eq!(config.per_connection, 20); + assert_eq!(config.in_flight, 30); +} + +#[test] +fn fails_when_missing_fields() { + let missing_in_flight = + BackpressureConfig::from_str("1000/10/20").expect_err("missing in_flight should fail"); + assert_eq!(missing_in_flight, "missing in_flight"); + + let missing_per_message = + BackpressureConfig::from_str("1000").expect_err("missing per_message should fail"); + assert_eq!(missing_per_message, "missing per_message"); + + let missing_timeout = + BackpressureConfig::from_str("").expect_err("missing timeout_ms should fail"); + assert_eq!(missing_timeout, "missing timeout_ms"); +} + +#[test] +fn fails_with_clear_error_on_non_numeric_segments() { + let timeout_error = BackpressureConfig::from_str("not-a-number/10/20/30") + .expect_err("non-numeric timeout_ms should fail"); + assert!( + timeout_error.starts_with("timeout_ms:"), + "unexpected error message: {timeout_error}" + ); + + let per_message_error = BackpressureConfig::from_str("1000/not-a-number/20/30") + .expect_err("non-numeric per_message should fail"); + assert!( + per_message_error.starts_with("per_message:"), + "unexpected error message: {per_message_error}" + ); + + let per_connection_error = BackpressureConfig::from_str("1000/10/not-a-number/30") + .expect_err("non-numeric per_connection should fail"); + assert!( + per_connection_error.starts_with("per_connection:"), + "unexpected error message: {per_connection_error}" + ); + + let in_flight_error = BackpressureConfig::from_str("1000/10/20/not-a-number") + .expect_err("non-numeric in_flight should fail"); + assert!( + in_flight_error.starts_with("in_flight:"), + "unexpected error message: {in_flight_error}" + ); +} + +#[test] +fn fails_when_segments_are_present_after_in_flight() { + let trailing_segments = BackpressureConfig::from_str("1000/10/20/30/40") + .expect_err("trailing segments should fail"); + assert_eq!(trailing_segments, "unexpected trailing segments"); +} diff --git a/tests/fixtures/mod.rs b/tests/fixtures/mod.rs index 5a5fe11a..c8d1e330 100644 --- a/tests/fixtures/mod.rs +++ b/tests/fixtures/mod.rs @@ -16,6 +16,7 @@ pub mod correlation; pub mod fragment; pub mod interleaved_push_queues; pub mod memory_budget_backpressure; +mod memory_budget_backpressure_tests; pub mod memory_budgets; pub mod message_assembler; pub mod message_assembly; From 078526fd2dd7a577c4ef55998ee0518a2c8d9ca1 Mon Sep 17 00:00:00 2001 From: Leynos Date: Tue, 24 Feb 2026 16:39:39 +0000 Subject: [PATCH 5/6] refactor(frame_handling): simplify backpressure_tests.rs using expect calls Replaced multiple uses of pattern matching with `let Some(..) else { panic! }` by concise `.expect(..)` calls in backpressure_tests.rs. This improves code readability and aligns with the added Clippy lint expectation for use of `expect` in test setup for concise failure diagnostics. Co-authored-by: devboxerhub[bot] --- src/app/frame_handling/backpressure_tests.rs | 46 ++++++++------------ 1 file changed, 17 insertions(+), 29 deletions(-) diff --git a/src/app/frame_handling/backpressure_tests.rs b/src/app/frame_handling/backpressure_tests.rs index 6f006aa0..80daf90f 100644 --- a/src/app/frame_handling/backpressure_tests.rs +++ b/src/app/frame_handling/backpressure_tests.rs @@ -1,4 +1,8 @@ //! Unit tests for soft-limit back-pressure policy helpers. +#![expect( + clippy::expect_used, + reason = "Test setup intentionally uses expect for concise failure diagnostics." +)] use std::{num::NonZeroUsize, time::Duration}; @@ -18,15 +22,9 @@ use crate::{ #[fixture] fn budgets() -> MemoryBudgets { - let Some(per_message) = NonZeroUsize::new(1024) else { - panic!("1024 is non-zero"); - }; - let Some(per_connection) = NonZeroUsize::new(100) else { - panic!("100 is non-zero"); - }; - let Some(in_flight) = NonZeroUsize::new(100) else { - panic!("100 is non-zero"); - }; + let per_message = NonZeroUsize::new(1024).expect("1024 is non-zero"); + let per_connection = NonZeroUsize::new(100).expect("100 is non-zero"); + let in_flight = NonZeroUsize::new(100).expect("100 is non-zero"); MemoryBudgets::new( BudgetBytes::new(per_message), BudgetBytes::new(per_connection), @@ -35,9 +33,8 @@ fn budgets() -> MemoryBudgets { } fn state_with_buffered_bytes(buffered_bytes: usize) -> MessageAssemblyState { - let Some(max) = NonZeroUsize::new(buffered_bytes.saturating_add(64)) else { - panic!("buffer size plus padding should be non-zero"); - }; + let max = NonZeroUsize::new(buffered_bytes.saturating_add(64)) + .expect("buffer size plus padding should be non-zero"); let mut state = MessageAssemblyState::new(max, Duration::from_secs(30)); if buffered_bytes == 0 { return state; @@ -51,14 +48,11 @@ fn state_with_buffered_bytes(buffered_bytes: usize) -> MessageAssemblyState { total_body_len: None, is_last: false, }; - let input = match FirstFrameInput::new(&header, EnvelopeRouting::default(), vec![], &body) { - Ok(input) => input, - Err(error) => panic!("test first frame input should be valid: {error}"), - }; - let result = match state.accept_first_frame(input) { - Ok(result) => result, - Err(error) => panic!("first frame should be accepted: {error}"), - }; + let input = FirstFrameInput::new(&header, EnvelopeRouting::default(), vec![], &body) + .expect("test first frame input should be valid"); + let result = state + .accept_first_frame(input) + .expect("first frame should be accepted"); assert!( result.is_none(), "first frame should start an in-flight assembly" @@ -95,15 +89,9 @@ fn soft_limit_pause_threshold_behaviour( #[rstest] fn uses_smallest_aggregate_budget_dimension() { - let Some(per_message) = NonZeroUsize::new(1024) else { - panic!("1024 is non-zero"); - }; - let Some(per_connection) = NonZeroUsize::new(200) else { - panic!("200 is non-zero"); - }; - let Some(in_flight) = NonZeroUsize::new(100) else { - panic!("100 is non-zero"); - }; + let per_message = NonZeroUsize::new(1024).expect("1024 is non-zero"); + let per_connection = NonZeroUsize::new(200).expect("200 is non-zero"); + let in_flight = NonZeroUsize::new(100).expect("100 is non-zero"); let budgets = MemoryBudgets::new( BudgetBytes::new(per_message), BudgetBytes::new(per_connection), From 23b6ee8281472a08aa52f1d393d226d9e4af4fce Mon Sep 17 00:00:00 2001 From: Leynos Date: Tue, 24 Feb 2026 18:05:23 +0000 Subject: [PATCH 6/6] test(frame_handling): refactor backpressure tests to use Result for error handling Refactored backpressure test functions to return Result and use error propagation instead of unwraps and expects. This improves error diagnostic clarity, avoiding panics and employing error handling idioms consistently throughout tests. Co-authored-by: devboxerhub[bot] --- src/app/frame_handling/backpressure_tests.rs | 98 +++++++++++--------- 1 file changed, 56 insertions(+), 42 deletions(-) diff --git a/src/app/frame_handling/backpressure_tests.rs b/src/app/frame_handling/backpressure_tests.rs index 80daf90f..97e2d702 100644 --- a/src/app/frame_handling/backpressure_tests.rs +++ b/src/app/frame_handling/backpressure_tests.rs @@ -1,10 +1,6 @@ //! Unit tests for soft-limit back-pressure policy helpers. -#![expect( - clippy::expect_used, - reason = "Test setup intentionally uses expect for concise failure diagnostics." -)] -use std::{num::NonZeroUsize, time::Duration}; +use std::{error::Error, io, num::NonZeroUsize, time::Duration}; use rstest::{fixture, rstest}; @@ -20,24 +16,28 @@ use crate::{ }, }; +type TestResult = Result>; + #[fixture] -fn budgets() -> MemoryBudgets { - let per_message = NonZeroUsize::new(1024).expect("1024 is non-zero"); - let per_connection = NonZeroUsize::new(100).expect("100 is non-zero"); - let in_flight = NonZeroUsize::new(100).expect("100 is non-zero"); - MemoryBudgets::new( +fn budgets() -> TestResult { + let per_message = + NonZeroUsize::new(1024).ok_or_else(|| io::Error::other("1024 is non-zero"))?; + let per_connection = + NonZeroUsize::new(100).ok_or_else(|| io::Error::other("100 is non-zero"))?; + let in_flight = NonZeroUsize::new(100).ok_or_else(|| io::Error::other("100 is non-zero"))?; + Ok(MemoryBudgets::new( BudgetBytes::new(per_message), BudgetBytes::new(per_connection), BudgetBytes::new(in_flight), - ) + )) } -fn state_with_buffered_bytes(buffered_bytes: usize) -> MessageAssemblyState { +fn state_with_buffered_bytes(buffered_bytes: usize) -> TestResult { let max = NonZeroUsize::new(buffered_bytes.saturating_add(64)) - .expect("buffer size plus padding should be non-zero"); + .ok_or_else(|| io::Error::other("buffer size plus padding should be non-zero"))?; let mut state = MessageAssemblyState::new(max, Duration::from_secs(30)); if buffered_bytes == 0 { - return state; + return Ok(state); } let body = vec![0_u8; buffered_bytes]; @@ -48,27 +48,31 @@ fn state_with_buffered_bytes(buffered_bytes: usize) -> MessageAssemblyState { total_body_len: None, is_last: false, }; - let input = FirstFrameInput::new(&header, EnvelopeRouting::default(), vec![], &body) - .expect("test first frame input should be valid"); - let result = state - .accept_first_frame(input) - .expect("first frame should be accepted"); - assert!( - result.is_none(), - "first frame should start an in-flight assembly" - ); - state + let input = FirstFrameInput::new(&header, EnvelopeRouting::default(), vec![], &body)?; + let result = state.accept_first_frame(input)?; + if result.is_some() { + return Err(io::Error::other("first frame should start an in-flight assembly").into()); + } + Ok(state) } #[rstest] -fn should_not_pause_when_budgets_are_not_configured() { - let state = state_with_buffered_bytes(95); - assert!(!should_pause_inbound_reads(Some(&state), None)); +fn should_not_pause_when_budgets_are_not_configured() -> TestResult { + let state = state_with_buffered_bytes(95)?; + if should_pause_inbound_reads(Some(&state), None) { + return Err( + io::Error::other("did not expect pause when budgets are not configured").into(), + ); + } + Ok(()) } #[rstest] -fn should_not_pause_when_state_is_not_available(budgets: MemoryBudgets) { - assert!(!should_pause_inbound_reads(None, Some(budgets))); +fn should_not_pause_when_state_is_not_available(budgets: TestResult) -> TestResult { + if should_pause_inbound_reads(None, Some(budgets?)) { + return Err(io::Error::other("did not expect pause when state is not available").into()); + } + Ok(()) } #[rstest] @@ -78,25 +82,35 @@ fn should_not_pause_when_state_is_not_available(budgets: MemoryBudgets) { fn soft_limit_pause_threshold_behaviour( #[case] buffered_bytes: usize, #[case] should_pause: bool, - budgets: MemoryBudgets, -) { - let state = state_with_buffered_bytes(buffered_bytes); - assert_eq!( - should_pause_inbound_reads(Some(&state), Some(budgets)), - should_pause - ); + budgets: TestResult, +) -> TestResult { + let state = state_with_buffered_bytes(buffered_bytes)?; + let actual = should_pause_inbound_reads(Some(&state), Some(budgets?)); + if actual != should_pause { + return Err(io::Error::other(format!( + "soft limit mismatch: buffered_bytes={buffered_bytes}, expected={should_pause}, \ + actual={actual}" + )) + .into()); + } + Ok(()) } #[rstest] -fn uses_smallest_aggregate_budget_dimension() { - let per_message = NonZeroUsize::new(1024).expect("1024 is non-zero"); - let per_connection = NonZeroUsize::new(200).expect("200 is non-zero"); - let in_flight = NonZeroUsize::new(100).expect("100 is non-zero"); +fn uses_smallest_aggregate_budget_dimension() -> TestResult { + let per_message = + NonZeroUsize::new(1024).ok_or_else(|| io::Error::other("1024 is non-zero"))?; + let per_connection = + NonZeroUsize::new(200).ok_or_else(|| io::Error::other("200 is non-zero"))?; + let in_flight = NonZeroUsize::new(100).ok_or_else(|| io::Error::other("100 is non-zero"))?; let budgets = MemoryBudgets::new( BudgetBytes::new(per_message), BudgetBytes::new(per_connection), BudgetBytes::new(in_flight), ); - let state = state_with_buffered_bytes(80); - assert!(should_pause_inbound_reads(Some(&state), Some(budgets))); + let state = state_with_buffered_bytes(80)?; + if !should_pause_inbound_reads(Some(&state), Some(budgets)) { + return Err(io::Error::other("expected pause at derived soft limit threshold").into()); + } + Ok(()) }