diff --git a/docs/adr-002-streaming-requests-and-shared-message-assembly.md b/docs/adr-002-streaming-requests-and-shared-message-assembly.md index 9633c886..185c818b 100644 --- a/docs/adr-002-streaming-requests-and-shared-message-assembly.md +++ b/docs/adr-002-streaming-requests-and-shared-message-assembly.md @@ -219,10 +219,10 @@ Wireframe will make budget failures explicit and deterministic so protocol implementers can rely on the behaviour across crates: - **Soft budget pressure (back-pressure):** when inbound buffering approaches - the per-connection in-flight cap, Wireframe MUST stop reading further packets - from the socket until buffered bytes fall below the cap. For streaming bodies - backed by bounded queues, this naturally suspends the reader when the queue - is full. + the per-connection in-flight cap, Wireframe MUST pace further reads by + applying short per-iteration pauses before polling the next packet. For + streaming bodies backed by bounded queues, this naturally suspends the reader + when the queue is full. - **Hard cap exceeded during assembly (pre-handler):** if accepting a fragment or continuation packet would exceed the configured per-message or per- connection cap, Wireframe MUST: @@ -313,7 +313,23 @@ Precedence is: routed through the existing `DeserFailureTracker` as `InvalidData`. - Budget enforcement helpers are extracted to `src/message_assembler/budget.rs` to keep `state.rs` under the 400-line file limit. -- Back-pressure (`8.3.3`) and derived defaults (`8.3.5`) remain future work. +- At this point, back-pressure (`8.3.3`) and derived defaults (`8.3.5`) + remained future work. + +#### Implementation decisions (2026-02-23) + +- Roadmap item `8.3.3` now implements soft-limit back-pressure in the inbound + read loop (`src/app/inbound_handler.rs`) by pausing briefly before polling + the next frame when buffered assembly bytes are under pressure. +- Soft pressure is computed from assembly-state buffered bytes versus the + smaller aggregate cap (`min(bytes_per_connection, bytes_in_flight)`), with a + threshold of 80% to engage pacing before hard-cap rejection. +- The soft-limit policy is implemented in + `src/app/frame_handling/backpressure.rs` to keep `inbound_handler.rs` and + `state.rs` within the repository file-size constraint. +- Under sustained pressure, pacing is applied as short per-iteration pauses + rather than an indefinite read stop, so in-flight assemblies can continue to + make progress. #### Budget enforcement diff --git a/docs/execplans/8-3-3-soft-limit-behaviour.md b/docs/execplans/8-3-3-soft-limit-behaviour.md new file mode 100644 index 00000000..7143122b --- /dev/null +++ b/docs/execplans/8-3-3-soft-limit-behaviour.md @@ -0,0 +1,444 @@ +# Implement soft-limit inbound read pausing for memory budgets (8.3.3) + +This ExecPlan is a living document. The sections `Constraints`, `Tolerances`, +`Risks`, `Progress`, `Surprises & Discoveries`, `Decision Log`, and +`Outcomes & Retrospective` must be kept up to date as work proceeds. + +Status: COMPLETE + +No `PLANS.md` exists in this repository as of 2026-02-23. + +## Purpose / big picture + +Roadmap item `8.3.3` requires soft-limit behaviour for per-connection memory +budgets: when inbound buffered bytes approach configured aggregate caps, +Wireframe should apply back-pressure by pausing socket reads rather than +immediately consuming the next frame. + +After this change, operators who configure `WireframeApp::memory_budgets(...)` +will get two complementary protections on inbound assembly paths: + +- soft pressure: paced reads under high buffered-byte pressure; and +- hard cap: deterministic rejection and cleanup when a limit is exceeded + (already implemented in `8.3.2`). + +Success is observable when: + +- inbound reads are paused under soft pressure using the configured budgets; +- the pause path is covered by `rstest` unit tests; +- behaviour is covered by `rstest-bdd` (`0.5.0`) scenarios using a live inbound + runtime fixture; +- design decisions are recorded in + `docs/adr-002-streaming-requests-and-shared-message-assembly.md`; +- `docs/users-guide.md` explains the soft-limit runtime behaviour for library + consumers; and +- `docs/roadmap.md` marks `8.3.3` as done only after all quality gates pass. + +## Constraints + +- Scope is strictly roadmap item `8.3.3`: implement soft-limit read pausing for + inbound assembly pressure. +- Do not regress or re-scope hard-cap enforcement semantics from `8.3.2`. +- Preserve runtime behaviour when `memory_budgets` is not configured. +- Keep public API changes to zero unless unavoidable. If a public API change is + required, stop and escalate. +- Do not add new external dependencies. +- Keep all modified or new source files at or below the 400-line repository + guidance. +- Validate with `rstest` unit tests and `rstest-bdd` behavioural tests. +- Follow testing guidance from: + `docs/rust-testing-with-rstest-fixtures.md`, + `docs/reliable-testing-in-rust-via-dependency-injection.md`, and + `docs/rstest-bdd-users-guide.md`. +- Record implementation decisions in the relevant design document(s), primarily + ADR 0002. +- Update `docs/users-guide.md` for any consumer-visible behavioural change. +- Mark roadmap item `8.3.3` done only after all gates pass. + +## Tolerances (exception triggers) + +- Scope: if implementation requires changes to more than 14 files or more than + 700 net lines of code (LOC), stop and escalate. +- Interface: if implementing `8.3.3` requires a new public builder method, + changed public function signatures, or new public types, stop and escalate. +- Dependencies: if any new crate is required, stop and escalate. +- Semantics ambiguity: if the project requires a specific soft-limit threshold + or pause cadence not documented in ADR/design docs, stop and present options + with trade-offs before finalizing behaviour. +- Iterations: if the same failing gate persists after 3 focused attempts, stop + and escalate. +- Time: if any single stage exceeds 4 hours elapsed effort, stop and escalate. + +## Risks + +- Risk: `src/app/inbound_handler.rs` is currently 383 lines, so adding soft + limit logic may exceed the 400-line file cap. Severity: high Likelihood: high + Mitigation: place threshold calculation and pause-decision logic in a new + helper module under `src/app/frame_handling/`, keeping inbound loop changes + minimal. + +- Risk: `src/message_assembler/state.rs` is already 400 lines. + Severity: medium Likelihood: medium Mitigation: avoid adding logic there for + this item; consume existing `total_buffered_bytes()` query from the inbound + runtime instead. + +- Risk: full read suspension can deadlock in buffered multi-frame assembly if no + bytes can be reclaimed without reading additional frames. Severity: high + Likelihood: medium Mitigation: implement soft pressure as paced polling + (bounded read pauses per loop iteration), not indefinite suspension until + below threshold. + +- Risk: timing-sensitive behaviour tests can become flaky. + Severity: medium Likelihood: medium Mitigation: use Tokio paused time + (`tokio::time::pause`/`advance`) in a dedicated BDD world and assert + immediate-vs-delayed availability with deterministic checks (`try_recv` + before time advance, eventual receive after advance). + +## Progress + +- [x] (2026-02-23 19:10Z) Drafted ExecPlan for roadmap item `8.3.3`. +- [x] (2026-02-23) Stage A: added soft-limit policy helper and unit coverage. +- [x] (2026-02-23) Stage B: integrated paced read pausing into inbound loop. +- [x] (2026-02-23) Stage C: added behavioural coverage (`rstest-bdd` v0.5.0). +- [x] (2026-02-23) Stage D: updated ADR, design docs, user guide, and roadmap. +- [x] (2026-02-23) Stage E: reran all quality gates and finalized this + ExecPlan. + +## Surprises & Discoveries + +- Observation: hard-cap budget enforcement was already completed in `8.3.2`. + Evidence: `src/message_assembler/state.rs` and + `docs/execplans/8-3-2-budget-enforcement.md`. Impact: `8.3.3` should add + pacing/back-pressure only, not duplicate hard-cap rejection logic. + +- Observation: inbound read polling currently happens in + `WireframeApp::process_stream` (`src/app/inbound_handler.rs`), not in + `src/connection/`. Evidence: `src/server/connection_spawner.rs` invokes + `app.handle_connection_result`, and the read loop is in + `src/app/inbound_handler.rs`. Impact: soft-limit read pausing must be + implemented in the app inbound path. + +- Observation: current user guide memory-budget section documents hard-cap + rejection semantics but not explicit soft-limit pacing details. Evidence: + `docs/users-guide.md` section "Per-connection memory budgets". Impact: docs + must be updated so consumers understand runtime behaviour under near-cap + pressure. + +- Observation: new back-pressure step text initially conflicted with existing + message-assembly step definitions. Evidence: rstest-bdd ambiguity during + scenario expansion. Impact: step phrases were renamed with a `budgeted` + prefix to keep glue deterministic. + +- Observation: pre-existing `push_expect!` call sites failed strict + `unused_must_use` linting while running workspace gates. Evidence: + `make lint` output for `tests/interleaved_push_queues.rs` and + `tests/fixtures/interleaved_push_queues.rs`. Impact: the existing call sites + were fixed (`let _ = ...`) so the 8.3.3 gate run can complete cleanly. + +## Decision Log + +- Decision: implement soft pressure as paced read pausing before `framed.next` + in the inbound loop, driven by current buffered bytes and configured + aggregate budgets. Rationale: this satisfies "pause reads" semantics while + avoiding structural changes to assembly-state ownership. Date/Author: + 2026-02-23 / Codex. + +- Decision: derive soft pressure from the minimum active aggregate cap + (`bytes_per_connection`, `bytes_in_flight`) and compare without integer + division to satisfy strict clippy settings. Rationale: the smaller cap is the + earliest risk boundary, and avoiding integer division keeps lint policy + intact. Date/Author: 2026-02-23 / Codex. + +- Decision: keep soft-limit policy internal (no new public API) for `8.3.3`. + Rationale: roadmap scope calls for runtime behaviour, and existing + `memory_budgets(...)` configuration is sufficient. Date/Author: 2026-02-23 / + Codex. + +## Outcomes & Retrospective + +Implemented roadmap item `8.3.3` with no public API changes: + +- Added `src/app/frame_handling/backpressure.rs` and unit tests in + `src/app/frame_handling/backpressure_tests.rs`. +- Integrated soft-limit pacing into `WireframeApp::process_stream` via + `should_pause_inbound_reads(...)` and a short async pause before polling + additional frames. +- Added `rstest-bdd` coverage via: + `tests/features/memory_budget_backpressure.feature`, + `tests/fixtures/memory_budget_backpressure.rs`, + `tests/steps/memory_budget_backpressure_steps.rs`, and + `tests/scenarios/memory_budget_backpressure_scenarios.rs`. +- Updated documentation in: + `docs/adr-002-streaming-requests-and-shared-message-assembly.md`, + `docs/generic-message-fragmentation-and-re-assembly-design.md`, + `docs/users-guide.md`, and `docs/roadmap.md`. + +Validation completed with fresh logs: + +- `make fmt` (`/tmp/wireframe-8-3-3-fmt.log`) +- `make markdownlint` (`/tmp/wireframe-8-3-3-markdownlint.log`) +- `make check-fmt` (`/tmp/wireframe-8-3-3-check-fmt.log`) +- `make lint` (`/tmp/wireframe-8-3-3-lint.log`) +- `make test` (`/tmp/wireframe-8-3-3-test.log`) +- `make nixie` (`/tmp/wireframe-8-3-3-nixie.log`) + +All gates passed on 2026-02-23. + +## Context and orientation + +Current budget plumbing: + +- `src/app/memory_budgets.rs` defines `MemoryBudgets` and `BudgetBytes`. +- `src/app/builder/config.rs` exposes `WireframeApp::memory_budgets(...)`. +- `src/app/frame_handling/assembly.rs::new_message_assembly_state` threads + budgets into `MessageAssemblyState::with_budgets(...)`. +- `src/message_assembler/state.rs` enforces hard caps and exposes + `total_buffered_bytes()`. + +Current inbound read path: + +- `WireframeApp::process_stream` in `src/app/inbound_handler.rs` drives + `timeout(timeout_dur, framed.next()).await` in a loop. +- decoded envelopes flow through decode -> transport reassembly -> message + assembly -> handler dispatch. + +Relevant docs to keep aligned: + +- `docs/roadmap.md` (`8.3.3` target row). +- `docs/adr-002-streaming-requests-and-shared-message-assembly.md`. +- `docs/generic-message-fragmentation-and-re-assembly-design.md`. +- `docs/multi-packet-and-streaming-responses-design.md`. +- `docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md`. +- `docs/hardening-wireframe-a-guide-to-production-resilience.md`. +- `docs/users-guide.md`. +- `docs/rust-doctest-dry-guide.md` (for doc snippet safety expectations). + +Testing topology to use: + +- unit tests in crate modules using `rstest`; +- behavioural tests via `tests/features/`, `tests/fixtures/`, `tests/steps/`, + `tests/scenarios/` with `rstest-bdd` v0.5.0; +- quality gates through Makefile targets. + +## Plan of work + +### Stage A: implement a soft-limit policy helper with unit tests + +Create a new helper module for inbound soft-pressure decisions, keeping +`src/app/inbound_handler.rs` under the file-size limit. + +Expected edits: + +- Add `src/app/frame_handling/backpressure.rs` with: + - a function that computes whether soft pressure is active using: + `buffered_bytes` from `MessageAssemblyState` and configured aggregate caps; + - a private soft-threshold rule based on the smallest active aggregate cap; + - a pause-duration constant (internal) used by inbound loop pacing. +- Register helper exports in `src/app/frame_handling/mod.rs`. +- Add `src/app/frame_handling/backpressure_tests.rs` with `rstest` coverage. + +Unit tests must cover at least: + +- no budgets configured -> never pause; +- below threshold -> no pause; +- at/above threshold -> pause; +- dual-budget case uses the smallest cap as the governing threshold; +- helper remains pure and deterministic (no async/timing in policy unit tests). + +Go/no-go: if policy requires public API changes, stop and escalate. + +### Stage B: integrate paced pausing into inbound read loop + +Modify `src/app/inbound_handler.rs` to consult the new soft-pressure helper +before polling `framed.next()`. + +Integration behaviour: + +- if soft pressure is active, pause reads for the configured duration, + purge expired assembly/fragment state, and continue loop; +- otherwise proceed with normal `timeout(..., framed.next())` processing; +- preserve existing decode/reassemble/assemble/dispatch ordering and error + handling. + +Keep loop changes small and explicitly logged (debug-level) for observability. + +Go/no-go: if this pushes `src/app/inbound_handler.rs` past 400 lines, extract +additional local logic into `src/app/frame_handling/` helpers before continuing. + +### Stage C: add behavioural tests (`rstest-bdd` v0.5.0) + +Add a dedicated behavioural suite for soft-limit read pausing. + +Expected edits: + +- `tests/features/memory_budget_backpressure.feature` +- `tests/fixtures/memory_budget_backpressure.rs` +- `tests/steps/memory_budget_backpressure_steps.rs` +- `tests/scenarios/memory_budget_backpressure_scenarios.rs` +- register modules in: + - `tests/fixtures/mod.rs` + - `tests/steps/mod.rs` + - `tests/scenarios/mod.rs` + +Scenario coverage target: + +- under soft pressure, inbound payload completion is not immediately dispatched + before virtual time advances; +- after advancing virtual time, the delayed read resumes and the payload is + delivered; +- when buffered bytes are comfortably below threshold, dispatch proceeds without + pressure-induced delay. + +Go/no-go: if timing assertions are flaky under virtual time, refactor the world +fixture to use deterministic availability checks (`try_recv`) plus explicit +`advance` steps. + +### Stage D: documentation and roadmap updates + +Update documentation to reflect implemented behaviour and decisions. + +Required updates: + +- `docs/adr-002-streaming-requests-and-shared-message-assembly.md`: + add `8.3.3` implementation decisions and soft-limit semantics. +- `docs/users-guide.md`: + update "Per-connection memory budgets" to explain soft-limit read pausing and + its relation to hard-cap rejection. +- `docs/generic-message-fragmentation-and-re-assembly-design.md`: + align section 9.3 wording with concrete runtime behaviour (if needed). +- `docs/roadmap.md`: + mark `8.3.3` as done only after all gates pass. + +### Stage E: quality gates and final verification + +Run all required gates and capture logs with `tee`. + +No roadmap checkbox update until every gate is green. + +## Concrete steps + +Run from the repository root (`.`). + +1. Implement Stage A helper + unit tests. + +2. Run focused unit tests for helper and inbound assembly behaviour: + +```shell +set -o pipefail +cargo test --lib frame_handling 2>&1 | tee /tmp/wireframe-8-3-3-unit-a.log +cargo test --lib message_assembler 2>&1 | tee /tmp/wireframe-8-3-3-unit-b.log +``` + +1. Integrate Stage B inbound loop pausing and re-run focused unit tests: + +```shell +set -o pipefail +cargo test --lib inbound_handler 2>&1 | tee /tmp/wireframe-8-3-3-unit-c.log +``` + +1. Add Stage C behavioural suite and run targeted BDD scenarios: + +```shell +set -o pipefail +cargo test --test bdd --all-features memory_budget_backpressure 2>&1 | tee /tmp/wireframe-8-3-3-bdd.log +``` + +1. Update Stage D docs and roadmap. + +2. Run full quality gates: + +```shell +set -o pipefail +make fmt 2>&1 | tee /tmp/wireframe-8-3-3-fmt.log +make markdownlint 2>&1 | tee /tmp/wireframe-8-3-3-markdownlint.log +make check-fmt 2>&1 | tee /tmp/wireframe-8-3-3-check-fmt.log +make lint 2>&1 | tee /tmp/wireframe-8-3-3-lint.log +make test 2>&1 | tee /tmp/wireframe-8-3-3-test.log +make nixie 2>&1 | tee /tmp/wireframe-8-3-3-nixie.log +``` + +1. If any gate fails, fix only the failing area and rerun the failing command + until green, then rerun affected downstream gates. + +## Validation and acceptance + +Acceptance criteria: + +- Soft-limit behaviour: inbound reads are paced under memory pressure before + hard cap violation. +- Hard-cap behaviour from `8.3.2` remains intact (no regressions). +- Unit tests (`rstest`) validate policy and inbound integration points. +- Behavioural tests (`rstest-bdd` 0.5.0) validate delayed-vs-resumed read + behaviour through scenario steps. +- Design documentation records final soft-limit decisions. +- User guide reflects consumer-visible runtime behaviour. +- `docs/roadmap.md` marks `8.3.3` done. + +Quality criteria: + +- tests: `make test` passes; +- lint: `make lint` passes with no warnings; +- formatting: `make fmt` and `make check-fmt` pass; +- markdown: `make markdownlint` passes; +- mermaid validation: `make nixie` passes. + +## Idempotence and recovery + +All planned edits are additive and safe to rerun. + +If a step fails: + +- preserve local changes; +- inspect the relevant `/tmp/wireframe-8-3-3-*.log` file; +- apply the minimal fix; +- rerun only the failed command first, then downstream gates. + +Avoid destructive git commands. If rollback is required, revert only files +changed for `8.3.3`. + +## Artefacts and notes + +Expected artefacts after completion: + +- New: `src/app/frame_handling/backpressure.rs`. +- New: `src/app/frame_handling/backpressure_tests.rs`. +- Modified: `src/app/frame_handling/mod.rs`. +- Modified: `src/app/inbound_handler.rs`. +- New: `tests/features/memory_budget_backpressure.feature`. +- New: `tests/fixtures/memory_budget_backpressure.rs`. +- New: `tests/steps/memory_budget_backpressure_steps.rs`. +- New: `tests/scenarios/memory_budget_backpressure_scenarios.rs`. +- Modified: `tests/fixtures/mod.rs`. +- Modified: `tests/steps/mod.rs`. +- Modified: `tests/scenarios/mod.rs`. +- Modified: `docs/adr-002-streaming-requests-and-shared-message-assembly.md`. +- Modified: `docs/users-guide.md`. +- Modified: `docs/generic-message-fragmentation-and-re-assembly-design.md` + (if wording requires alignment). +- Modified: `docs/roadmap.md` (`8.3.3` checkbox). +- Gate logs: `/tmp/wireframe-8-3-3-*.log`. + +## Interfaces and dependencies + +No new external dependencies are required. + +Internal interfaces expected at the end of this milestone: + +In `src/app/frame_handling/backpressure.rs`: + +```rust +pub(crate) fn should_pause_inbound_reads( + state: Option<&crate::message_assembler::MessageAssemblyState>, + budgets: Option, +) -> bool +``` + +In `src/app/inbound_handler.rs`: + +- `process_stream` consults `should_pause_inbound_reads(...)` before polling + `framed.next()` and applies an async sleep when soft pressure is active. + +In behavioural tests: + +- a new rstest-bdd world validates soft-limit pause/resume semantics with + deterministic virtual-time control. diff --git a/docs/generic-message-fragmentation-and-re-assembly-design.md b/docs/generic-message-fragmentation-and-re-assembly-design.md index a8a615be..64160989 100644 --- a/docs/generic-message-fragmentation-and-re-assembly-design.md +++ b/docs/generic-message-fragmentation-and-re-assembly-design.md @@ -481,6 +481,12 @@ reads and assembly work. When a hard cap is exceeded, Wireframe aborts early, releases partial state, and surfaces `std::io::ErrorKind::InvalidData` from the inbound processing path. +The current soft-limit implementation paces reads by inserting a short pause +before polling the next inbound frame once buffered bytes reach 80% of the +smaller aggregate budget (`bytes_per_connection` and `bytes_in_flight`). This +keeps pressure visible without permanently stalling assemblies that still need +additional frames to complete. + If both transport fragmentation and `MessageAssembler` are enabled, the effective message cap is whichever guard triggers first. Operators should set the fragmentation `max_message_size` and the message assembly per-message cap diff --git a/docs/roadmap.md b/docs/roadmap.md index 3bb2d9f9..fde523e6 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -294,7 +294,7 @@ and standardized per-connection memory budgets. - [x] 8.3.1. Add `WireframeApp::memory_budgets(...)` builder method. - [x] 8.3.2. Implement budget enforcement covering bytes per message, bytes per connection, and bytes across in-flight assemblies. -- [ ] 8.3.3. Implement soft limit (back-pressure by pausing reads) behaviour. +- [x] 8.3.3. Implement soft limit (back-pressure by pausing reads) behaviour. - [ ] 8.3.4. Implement hard cap (abort early, release partial state, surface `InvalidData`) behaviour. - [ ] 8.3.5. Define derived defaults based on `buffer_capacity` when budgets diff --git a/docs/users-guide.md b/docs/users-guide.md index d142f2fc..80feeb83 100644 --- a/docs/users-guide.md +++ b/docs/users-guide.md @@ -609,6 +609,13 @@ is the minimum of the fragmentation `max_message_size` and the configured `bytes_per_message`. Single-frame messages that complete immediately are never counted against aggregate budgets, since they do not buffer. +Wireframe also applies soft-limit read pacing before hard-cap rejection. When +buffered assembly bytes reach the soft-pressure threshold (80% of the smaller +aggregate cap from `bytes_per_connection` and `bytes_in_flight`), the inbound +connection loop briefly pauses socket reads before polling the next frame. This +propagates back-pressure to senders while still allowing progress on in-flight +assemblies. + #### Message key multiplexing (8.2.3) The `MessageAssemblyState` type manages multiple concurrent message assemblies diff --git a/src/app/frame_handling/backpressure.rs b/src/app/frame_handling/backpressure.rs new file mode 100644 index 00000000..40c6a8ea --- /dev/null +++ b/src/app/frame_handling/backpressure.rs @@ -0,0 +1,48 @@ +//! Soft-limit back-pressure helpers for inbound read pacing. +//! +//! These helpers detect when buffered assembly bytes approach configured +//! aggregate memory budgets and provide a small pause duration that throttles +//! subsequent socket reads. + +use std::time::Duration; + +use crate::{app::MemoryBudgets, message_assembler::MessageAssemblyState}; + +/// Soft-pressure threshold numerator (4/5 == 80%). +const SOFT_LIMIT_NUMERATOR: u128 = 4; +/// Soft-pressure threshold denominator (4/5 == 80%). +const SOFT_LIMIT_DENOMINATOR: u128 = 5; +/// Read-pacing delay applied while under soft budget pressure. +const SOFT_LIMIT_PAUSE_DURATION: Duration = Duration::from_millis(5); + +/// Return `true` when inbound reads should be paced due to soft budget pressure. +#[must_use] +pub(crate) fn should_pause_inbound_reads( + state: Option<&MessageAssemblyState>, + budgets: Option, +) -> bool { + let (Some(state), Some(budgets)) = (state, budgets) else { + return false; + }; + + let buffered_bytes = state.total_buffered_bytes(); + let aggregate_limit = active_aggregate_limit_bytes(budgets); + is_at_or_above_soft_limit(buffered_bytes, aggregate_limit) +} + +/// Duration to pause between inbound reads while soft pressure is active. +#[must_use] +pub(crate) const fn soft_limit_pause_duration() -> Duration { SOFT_LIMIT_PAUSE_DURATION } + +fn active_aggregate_limit_bytes(budgets: MemoryBudgets) -> usize { + budgets + .bytes_per_connection() + .as_usize() + .min(budgets.bytes_in_flight().as_usize()) +} + +fn is_at_or_above_soft_limit(buffered_bytes: usize, aggregate_limit: usize) -> bool { + let lhs = (buffered_bytes as u128).saturating_mul(SOFT_LIMIT_DENOMINATOR); + let rhs = (aggregate_limit as u128).saturating_mul(SOFT_LIMIT_NUMERATOR); + lhs >= rhs +} diff --git a/src/app/frame_handling/backpressure_tests.rs b/src/app/frame_handling/backpressure_tests.rs new file mode 100644 index 00000000..97e2d702 --- /dev/null +++ b/src/app/frame_handling/backpressure_tests.rs @@ -0,0 +1,116 @@ +//! Unit tests for soft-limit back-pressure policy helpers. + +use std::{error::Error, io, num::NonZeroUsize, time::Duration}; + +use rstest::{fixture, rstest}; + +use super::should_pause_inbound_reads; +use crate::{ + app::{BudgetBytes, MemoryBudgets}, + message_assembler::{ + EnvelopeRouting, + FirstFrameHeader, + FirstFrameInput, + MessageAssemblyState, + MessageKey, + }, +}; + +type TestResult = Result>; + +#[fixture] +fn budgets() -> TestResult { + let per_message = + NonZeroUsize::new(1024).ok_or_else(|| io::Error::other("1024 is non-zero"))?; + let per_connection = + NonZeroUsize::new(100).ok_or_else(|| io::Error::other("100 is non-zero"))?; + let in_flight = NonZeroUsize::new(100).ok_or_else(|| io::Error::other("100 is non-zero"))?; + Ok(MemoryBudgets::new( + BudgetBytes::new(per_message), + BudgetBytes::new(per_connection), + BudgetBytes::new(in_flight), + )) +} + +fn state_with_buffered_bytes(buffered_bytes: usize) -> TestResult { + let max = NonZeroUsize::new(buffered_bytes.saturating_add(64)) + .ok_or_else(|| io::Error::other("buffer size plus padding should be non-zero"))?; + let mut state = MessageAssemblyState::new(max, Duration::from_secs(30)); + if buffered_bytes == 0 { + return Ok(state); + } + + let body = vec![0_u8; buffered_bytes]; + let header = FirstFrameHeader { + message_key: MessageKey(1), + metadata_len: 0, + body_len: buffered_bytes, + total_body_len: None, + is_last: false, + }; + let input = FirstFrameInput::new(&header, EnvelopeRouting::default(), vec![], &body)?; + let result = state.accept_first_frame(input)?; + if result.is_some() { + return Err(io::Error::other("first frame should start an in-flight assembly").into()); + } + Ok(state) +} + +#[rstest] +fn should_not_pause_when_budgets_are_not_configured() -> TestResult { + let state = state_with_buffered_bytes(95)?; + if should_pause_inbound_reads(Some(&state), None) { + return Err( + io::Error::other("did not expect pause when budgets are not configured").into(), + ); + } + Ok(()) +} + +#[rstest] +fn should_not_pause_when_state_is_not_available(budgets: TestResult) -> TestResult { + if should_pause_inbound_reads(None, Some(budgets?)) { + return Err(io::Error::other("did not expect pause when state is not available").into()); + } + Ok(()) +} + +#[rstest] +#[case(79, false)] +#[case(80, true)] +#[case(95, true)] +fn soft_limit_pause_threshold_behaviour( + #[case] buffered_bytes: usize, + #[case] should_pause: bool, + budgets: TestResult, +) -> TestResult { + let state = state_with_buffered_bytes(buffered_bytes)?; + let actual = should_pause_inbound_reads(Some(&state), Some(budgets?)); + if actual != should_pause { + return Err(io::Error::other(format!( + "soft limit mismatch: buffered_bytes={buffered_bytes}, expected={should_pause}, \ + actual={actual}" + )) + .into()); + } + Ok(()) +} + +#[rstest] +fn uses_smallest_aggregate_budget_dimension() -> TestResult { + let per_message = + NonZeroUsize::new(1024).ok_or_else(|| io::Error::other("1024 is non-zero"))?; + let per_connection = + NonZeroUsize::new(200).ok_or_else(|| io::Error::other("200 is non-zero"))?; + let in_flight = NonZeroUsize::new(100).ok_or_else(|| io::Error::other("100 is non-zero"))?; + let budgets = MemoryBudgets::new( + BudgetBytes::new(per_message), + BudgetBytes::new(per_connection), + BudgetBytes::new(in_flight), + ); + let state = state_with_buffered_bytes(80)?; + if !should_pause_inbound_reads(Some(&state), Some(budgets)) { + return Err(io::Error::other("expected pause at derived soft limit threshold").into()); + } + Ok(()) +} diff --git a/src/app/frame_handling/mod.rs b/src/app/frame_handling/mod.rs index f15062f3..f55917b8 100644 --- a/src/app/frame_handling/mod.rs +++ b/src/app/frame_handling/mod.rs @@ -3,6 +3,7 @@ //! Extracted from `connection.rs` to keep modules small and focused. mod assembly; +mod backpressure; mod core; mod decode; mod reassembly; @@ -16,6 +17,7 @@ pub(crate) use assembly::{ new_message_assembly_state, purge_expired_assemblies, }; +pub(crate) use backpressure::{should_pause_inbound_reads, soft_limit_pause_duration}; pub(crate) use decode::decode_envelope; pub(crate) use reassembly::reassemble_if_needed; pub(crate) use response::forward_response; @@ -23,4 +25,6 @@ pub(crate) use response::forward_response; #[cfg(all(test, not(loom)))] mod assembly_tests; #[cfg(all(test, not(loom)))] +mod backpressure_tests; +#[cfg(all(test, not(loom)))] mod tests; diff --git a/src/app/inbound_handler.rs b/src/app/inbound_handler.rs index 3f7aa992..aa9d6c95 100644 --- a/src/app/inbound_handler.rs +++ b/src/app/inbound_handler.rs @@ -7,7 +7,7 @@ use futures::{SinkExt, StreamExt}; use log::{debug, warn}; use tokio::{ io::{self, AsyncRead, AsyncWrite, AsyncWriteExt}, - time::{Duration, timeout}, + time::{Duration, sleep, timeout}, }; use tokio_util::codec::{Encoder, Framed, LengthDelimitedCodec}; @@ -278,6 +278,15 @@ where let timeout_dur = Duration::from_millis(self.read_timeout_ms); loop { + if frame_handling::should_pause_inbound_reads( + message_assembly.as_ref(), + self.memory_budgets, + ) { + debug!("soft memory budget pressure detected; pausing inbound reads briefly"); + sleep(frame_handling::soft_limit_pause_duration()).await; + purge_expired(&mut pipeline, &mut message_assembly); + } + match timeout(timeout_dur, framed.next()).await { Ok(Some(Ok(frame))) => { self.handle_frame( diff --git a/tests/features/memory_budget_backpressure.feature b/tests/features/memory_budget_backpressure.feature new file mode 100644 index 00000000..0a76a370 --- /dev/null +++ b/tests/features/memory_budget_backpressure.feature @@ -0,0 +1,20 @@ +@memory_budget_backpressure +Feature: Soft-limit memory budget back-pressure + Inbound reads are paced when buffered assembly bytes approach configured + aggregate memory budgets. + + Scenario: Soft pressure delays completion until virtual time advances + Given a back-pressure inbound app configured as 200/2048/10/10 + When a budgeted first frame for key 1 with body "aaaaaaaa" arrives + And a budgeted final continuation frame for key 1 sequence 1 with body "bb" arrives + Then no budgeted payload is available before virtual time advances + When budgeted virtual time advances by 5 milliseconds + Then budgeted payload "aaaaaaaabb" is eventually received + And no budgeted send error is recorded + + Scenario: Reads continue without delay when pressure is low + Given a back-pressure inbound app configured as 200/2048/100/100 + When a budgeted first frame for key 2 with body "aa" arrives + And a budgeted final continuation frame for key 2 sequence 1 with body "bb" arrives + Then budgeted payload "aabb" is eventually received + And no budgeted send error is recorded diff --git a/tests/fixtures/interleaved_push_queues.rs b/tests/fixtures/interleaved_push_queues.rs index 4f17e46b..03472438 100644 --- a/tests/fixtures/interleaved_push_queues.rs +++ b/tests/fixtures/interleaved_push_queues.rs @@ -64,11 +64,11 @@ impl InterleavedPushWorld { time_slice: None, }, |handle| async move { - push_expect!(handle.push_low_priority(101)); - push_expect!(handle.push_low_priority(102)); - push_expect!(handle.push_high_priority(1)); - push_expect!(handle.push_high_priority(2)); - push_expect!(handle.push_high_priority(3)); + let _ = push_expect!(handle.push_low_priority(101)); + let _ = push_expect!(handle.push_low_priority(102)); + let _ = push_expect!(handle.push_high_priority(1)); + let _ = push_expect!(handle.push_high_priority(2)); + let _ = push_expect!(handle.push_high_priority(3)); }, ) .await @@ -87,10 +87,10 @@ impl InterleavedPushWorld { }, |handle| async move { for n in 1..=6 { - push_expect!(handle.push_high_priority(n)); + let _ = push_expect!(handle.push_high_priority(n)); } - push_expect!(handle.push_low_priority(101)); - push_expect!(handle.push_low_priority(102)); + let _ = push_expect!(handle.push_low_priority(101)); + let _ = push_expect!(handle.push_low_priority(102)); }, ) .await @@ -110,7 +110,7 @@ impl InterleavedPushWorld { .rate(Some(1)) .build()?; - push_expect!(handle.push_high_priority(1)); + let _ = push_expect!(handle.push_high_priority(1)); // The low-priority push should be blocked: the single token was // consumed by the high-priority push above. @@ -139,10 +139,10 @@ impl InterleavedPushWorld { }, |handle| async move { for n in 1..=5 { - push_expect!(handle.push_high_priority(n)); + let _ = push_expect!(handle.push_high_priority(n)); } for n in 101..=105 { - push_expect!(handle.push_low_priority(n)); + let _ = push_expect!(handle.push_low_priority(n)); } }, ) diff --git a/tests/fixtures/memory_budget_backpressure.rs b/tests/fixtures/memory_budget_backpressure.rs new file mode 100644 index 00000000..c29e9667 --- /dev/null +++ b/tests/fixtures/memory_budget_backpressure.rs @@ -0,0 +1,324 @@ +//! Behavioural fixture for soft-limit memory-budget back-pressure scenarios. + +use std::{fmt, future::Future, num::NonZeroUsize, str::FromStr, time::Duration}; + +use futures::SinkExt; +use rstest::fixture; +use tokio::{io::DuplexStream, sync::mpsc, task::JoinHandle}; +use tokio_util::codec::{Framed, LengthDelimitedCodec}; +use wireframe::{ + app::{BudgetBytes, Envelope, Handler, MemoryBudgets, WireframeApp}, + fragment::FragmentationConfig, + serializer::{BincodeSerializer, Serializer}, + test_helpers::{self, TestAssembler}, +}; +pub use wireframe_testing::TestResult; + +const ROUTE_ID: u32 = 88; +const CORRELATION_ID: Option = Some(9); +const BUFFER_CAPACITY: usize = 512; +const SPIN_ATTEMPTS: usize = 64; + +/// Parsed as +/// "`timeout_ms` / `per_message` / `per_connection` / `in_flight`". +#[derive(Clone, Copy, Debug)] +pub struct BackpressureConfig { + pub timeout_ms: u64, + pub per_message: usize, + pub per_connection: usize, + pub in_flight: usize, +} + +impl FromStr for BackpressureConfig { + type Err = String; + + fn from_str(s: &str) -> Result { + let mut values = s.split('/').map(str::trim); + let timeout_ms = values + .next() + .filter(|value| !value.is_empty()) + .ok_or("missing timeout_ms")?; + let per_message = values + .next() + .filter(|value| !value.is_empty()) + .ok_or("missing per_message")?; + let per_connection = values + .next() + .filter(|value| !value.is_empty()) + .ok_or("missing per_connection")?; + let in_flight = values + .next() + .filter(|value| !value.is_empty()) + .ok_or("missing in_flight")?; + if values.next().is_some() { + return Err("unexpected trailing segments".to_string()); + } + Ok(Self { + timeout_ms: timeout_ms + .parse() + .map_err(|error| format!("timeout_ms: {error}"))?, + per_message: per_message + .parse() + .map_err(|error| format!("per_message: {error}"))?, + per_connection: per_connection + .parse() + .map_err(|error| format!("per_connection: {error}"))?, + in_flight: in_flight + .parse() + .map_err(|error| format!("in_flight: {error}"))?, + }) + } +} + +/// Runtime-backed fixture that drives inbound assembly with memory budgets. +pub struct MemoryBudgetBackpressureWorld { + runtime: Option, + runtime_error: Option, + client: Option>, + server: Option>>, + observed_rx: Option>>, + observed_payloads: Vec>, + last_send_error: Option, +} + +impl MemoryBudgetBackpressureWorld { + fn with_runtime( + runtime: Option, + runtime_error: Option, + ) -> Self { + Self { + runtime, + runtime_error, + client: None, + server: None, + observed_rx: None, + observed_payloads: Vec::new(), + last_send_error: None, + } + } +} + +impl Default for MemoryBudgetBackpressureWorld { + fn default() -> Self { + match tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + { + Ok(runtime) => Self::with_runtime(Some(runtime), None), + Err(error) => { + Self::with_runtime(None, Some(format!("failed to create runtime: {error}"))) + } + } + } +} + +impl fmt::Debug for MemoryBudgetBackpressureWorld { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("MemoryBudgetBackpressureWorld") + .field("client_initialized", &self.client.is_some()) + .field("server_initialized", &self.server.is_some()) + .field("observed_payloads", &self.observed_payloads.len()) + .field("last_send_error", &self.last_send_error) + .finish_non_exhaustive() + } +} + +/// Construct the default world used by memory-budget back-pressure BDD tests. +#[fixture] +pub fn memory_budget_backpressure_world() -> MemoryBudgetBackpressureWorld { + MemoryBudgetBackpressureWorld::default() +} + +impl MemoryBudgetBackpressureWorld { + fn runtime(&self) -> TestResult<&tokio::runtime::Runtime> { + self.runtime.as_ref().ok_or_else(|| { + self.runtime_error + .clone() + .unwrap_or_else(|| "runtime unavailable".to_string()) + .into() + }) + } + + fn block_on(&self, future: F) -> TestResult + where + F: Future, + { + if tokio::runtime::Handle::try_current().is_ok() { + return Err("nested Tokio runtime detected in memory-budget fixture".into()); + } + Ok(self.runtime()?.block_on(future)) + } + + /// Start the app under test using the supplied budget and timeout config. + pub fn start_app(&mut self, config: BackpressureConfig) -> TestResult { + let Some(fragment_limit) = NonZeroUsize::new(BUFFER_CAPACITY.saturating_mul(16)) else { + return Err("buffer-derived fragment limit should be non-zero".into()); + }; + let fragmentation = FragmentationConfig::for_frame_budget( + BUFFER_CAPACITY, + fragment_limit, + Duration::from_millis(config.timeout_ms), + ) + .ok_or("failed to derive fragmentation config for test fixture")?; + + let per_message = + NonZeroUsize::new(config.per_message).ok_or("per_message must be non-zero")?; + let per_connection = + NonZeroUsize::new(config.per_connection).ok_or("per_connection must be non-zero")?; + let in_flight = NonZeroUsize::new(config.in_flight).ok_or("in_flight must be non-zero")?; + let budgets = MemoryBudgets::new( + BudgetBytes::new(per_message), + BudgetBytes::new(per_connection), + BudgetBytes::new(in_flight), + ); + + let (tx, rx) = mpsc::unbounded_channel::>(); + let handler: Handler = std::sync::Arc::new(move |envelope: &Envelope| { + let tx = tx.clone(); + let payload = envelope.payload_bytes().to_vec(); + Box::pin(async move { + let _ = tx.send(payload); + }) + }); + + let app: WireframeApp = WireframeApp::new()? + .buffer_capacity(BUFFER_CAPACITY) + .fragmentation(Some(fragmentation)) + .with_message_assembler(TestAssembler) + .memory_budgets(budgets) + .route(ROUTE_ID, handler)?; + + let codec = app.length_codec(); + let (client_stream, server_stream) = tokio::io::duplex(2048); + let client = Framed::new(client_stream, codec); + + self.block_on(async { tokio::time::pause() })?; + let server = self + .runtime()? + .spawn(async move { app.handle_connection_result(server_stream).await }); + + self.client = Some(client); + self.server = Some(server); + self.observed_rx = Some(rx); + self.observed_payloads.clear(); + self.last_send_error = None; + Ok(()) + } + + /// Send a non-final first frame for the provided message key. + pub fn send_first_frame(&mut self, key: u64, body: &str) -> TestResult { + let payload = test_helpers::first_frame_payload(key, body.as_bytes(), false, None)?; + self.send_payload(payload) + } + + /// Send a final continuation frame for the provided message key. + pub fn send_final_continuation_frame( + &mut self, + key: u64, + sequence: u32, + body: &str, + ) -> TestResult { + let payload = + test_helpers::continuation_frame_payload(key, sequence, body.as_bytes(), true)?; + self.send_payload(payload) + } + + /// Assert that no payload has been emitted yet. + pub fn assert_no_payload_ready(&mut self) -> TestResult { + self.spin_runtime()?; + self.drain_ready_payloads()?; + if self.observed_payloads.is_empty() { + return Ok(()); + } + Err(format!( + "expected no payload to be ready, observed={:?}", + self.observed_payloads + ) + .into()) + } + + /// Advance Tokio virtual time by the supplied duration in milliseconds. + pub fn advance_millis(&mut self, millis: u64) -> TestResult { + self.block_on(async { tokio::time::advance(Duration::from_millis(millis)).await })?; + Ok(()) + } + + /// Assert that the expected payload is eventually observed. + pub fn assert_payload_received(&mut self, expected: &str) -> TestResult { + let expected = expected.as_bytes(); + for _ in 0..SPIN_ATTEMPTS { + self.drain_ready_payloads()?; + if self + .observed_payloads + .iter() + .any(|payload| payload.as_slice() == expected) + { + return Ok(()); + } + self.block_on(async { tokio::task::yield_now().await })?; + } + + Err(format!( + "expected payload {:?} not observed; observed={:?}", + expected, self.observed_payloads + ) + .into()) + } + + /// Assert that sending frames into the app did not record any send error. + pub fn assert_no_send_error(&self) -> TestResult { + if self.last_send_error.is_none() { + return Ok(()); + } + Err(format!("unexpected send error: {:?}", self.last_send_error).into()) + } + + fn send_payload(&mut self, payload: Vec) -> TestResult { + let envelope = Envelope::new(ROUTE_ID, CORRELATION_ID, payload); + let serializer = BincodeSerializer; + let frame = serializer.serialize(&envelope)?; + + // Temporarily move the framed client into the async block so send/flush + // can run without holding a mutable borrow of `self`; then restore it. + let mut client = self.client.take().ok_or("client not initialized")?; + let send_result = self.block_on(async { + client.send(frame.into()).await?; + client.flush().await?; + Ok::<(), std::io::Error>(()) + }); + self.client = Some(client); + + match send_result { + Ok(Ok(())) => { + self.last_send_error = None; + Ok(()) + } + Ok(Err(error)) => { + self.last_send_error = Some(error.to_string()); + Err(error.into()) + } + Err(error) => { + self.last_send_error = Some(error.to_string()); + Err(error) + } + } + } + + fn spin_runtime(&self) -> TestResult { + self.block_on(async { + for _ in 0..8 { + tokio::task::yield_now().await; + } + })?; + Ok(()) + } + + fn drain_ready_payloads(&mut self) -> TestResult { + let mut observed_rx = self.observed_rx.take().ok_or("receiver not initialized")?; + while let Ok(payload) = observed_rx.try_recv() { + self.observed_payloads.push(payload); + } + self.observed_rx = Some(observed_rx); + Ok(()) + } +} diff --git a/tests/fixtures/memory_budget_backpressure_tests.rs b/tests/fixtures/memory_budget_backpressure_tests.rs new file mode 100644 index 00000000..c68d2aea --- /dev/null +++ b/tests/fixtures/memory_budget_backpressure_tests.rs @@ -0,0 +1,68 @@ +//! Unit tests for `BackpressureConfig` parsing. + +use std::str::FromStr; + +use super::memory_budget_backpressure::BackpressureConfig; + +#[test] +fn parses_valid_backpressure_config() { + let config = BackpressureConfig::from_str("1000/10/20/30").expect("valid config should parse"); + + assert_eq!(config.timeout_ms, 1000); + assert_eq!(config.per_message, 10); + assert_eq!(config.per_connection, 20); + assert_eq!(config.in_flight, 30); +} + +#[test] +fn fails_when_missing_fields() { + let missing_in_flight = + BackpressureConfig::from_str("1000/10/20").expect_err("missing in_flight should fail"); + assert_eq!(missing_in_flight, "missing in_flight"); + + let missing_per_message = + BackpressureConfig::from_str("1000").expect_err("missing per_message should fail"); + assert_eq!(missing_per_message, "missing per_message"); + + let missing_timeout = + BackpressureConfig::from_str("").expect_err("missing timeout_ms should fail"); + assert_eq!(missing_timeout, "missing timeout_ms"); +} + +#[test] +fn fails_with_clear_error_on_non_numeric_segments() { + let timeout_error = BackpressureConfig::from_str("not-a-number/10/20/30") + .expect_err("non-numeric timeout_ms should fail"); + assert!( + timeout_error.starts_with("timeout_ms:"), + "unexpected error message: {timeout_error}" + ); + + let per_message_error = BackpressureConfig::from_str("1000/not-a-number/20/30") + .expect_err("non-numeric per_message should fail"); + assert!( + per_message_error.starts_with("per_message:"), + "unexpected error message: {per_message_error}" + ); + + let per_connection_error = BackpressureConfig::from_str("1000/10/not-a-number/30") + .expect_err("non-numeric per_connection should fail"); + assert!( + per_connection_error.starts_with("per_connection:"), + "unexpected error message: {per_connection_error}" + ); + + let in_flight_error = BackpressureConfig::from_str("1000/10/20/not-a-number") + .expect_err("non-numeric in_flight should fail"); + assert!( + in_flight_error.starts_with("in_flight:"), + "unexpected error message: {in_flight_error}" + ); +} + +#[test] +fn fails_when_segments_are_present_after_in_flight() { + let trailing_segments = BackpressureConfig::from_str("1000/10/20/30/40") + .expect_err("trailing segments should fail"); + assert_eq!(trailing_segments, "unexpected trailing segments"); +} diff --git a/tests/fixtures/mod.rs b/tests/fixtures/mod.rs index be7b89a3..c8d1e330 100644 --- a/tests/fixtures/mod.rs +++ b/tests/fixtures/mod.rs @@ -15,6 +15,8 @@ pub mod codec_stateful; pub mod correlation; pub mod fragment; pub mod interleaved_push_queues; +pub mod memory_budget_backpressure; +mod memory_budget_backpressure_tests; pub mod memory_budgets; pub mod message_assembler; pub mod message_assembly; diff --git a/tests/interleaved_push_queues.rs b/tests/interleaved_push_queues.rs index a0416940..05b98861 100644 --- a/tests/interleaved_push_queues.rs +++ b/tests/interleaved_push_queues.rs @@ -117,7 +117,7 @@ async fn rate_limit_symmetric_mixed() -> TestResult { .build()?; // High push consumes the single token. - push_expect!(handle.push_high_priority(1)); + let _ = push_expect!(handle.push_high_priority(1)); // Low push should now be blocked. let mut blocked = handle.push_low_priority(2).boxed(); @@ -128,7 +128,7 @@ async fn rate_limit_symmetric_mixed() -> TestResult { ); time::advance(Duration::from_secs(1)).await; - push_expect!(handle.push_low_priority(3)); + let _ = push_expect!(handle.push_low_priority(3)); let (_, a) = queues.recv().await.ok_or("recv 1 failed")?; let (_, b) = queues.recv().await.ok_or("recv 2 failed")?; @@ -153,10 +153,10 @@ async fn interleaved_fairness_yields_at_threshold() -> TestResult { // Preload: 6 high, 2 low. |handle| async move { for n in 1..=6 { - push_expect!(handle.push_high_priority(n)); + let _ = push_expect!(handle.push_high_priority(n)); } - push_expect!(handle.push_low_priority(101)); - push_expect!(handle.push_low_priority(102)); + let _ = push_expect!(handle.push_low_priority(101)); + let _ = push_expect!(handle.push_low_priority(102)); }, // Expected: H H H L H H H L |out| { @@ -183,10 +183,10 @@ async fn interleaved_all_frames_delivered() -> TestResult { }, |handle| async move { for n in 1..=5 { - push_expect!(handle.push_high_priority(n)); + let _ = push_expect!(handle.push_high_priority(n)); } for n in 101..=105 { - push_expect!(handle.push_low_priority(n)); + let _ = push_expect!(handle.push_low_priority(n)); } }, |out| { @@ -238,14 +238,14 @@ async fn interleaved_time_slice_fairness(shutdown_token: CancellationToken) -> T Ok::<_, String>(out) }); - push_expect!(handle.push_high_priority(1)); + let _ = push_expect!(handle.push_high_priority(1)); time::advance(Duration::from_millis(5)).await; - push_expect!(handle.push_high_priority(2)); + let _ = push_expect!(handle.push_high_priority(2)); // Advance past the time slice so the next high frame triggers yield. time::advance(Duration::from_millis(15)).await; - push_expect!(handle.push_low_priority(42)); + let _ = push_expect!(handle.push_low_priority(42)); for n in 3..=5 { - push_expect!(handle.push_high_priority(n)); + let _ = push_expect!(handle.push_high_priority(n)); } drop(handle); @@ -278,10 +278,10 @@ async fn rate_limit_interleaved_total_throughput() -> TestResult { .build()?; // First 4 pushes (mix of priorities) should succeed immediately. - push_expect!(handle.push_high_priority(1)); - push_expect!(handle.push_low_priority(2)); - push_expect!(handle.push_high_priority(3)); - push_expect!(handle.push_low_priority(4)); + let _ = push_expect!(handle.push_high_priority(1)); + let _ = push_expect!(handle.push_low_priority(2)); + let _ = push_expect!(handle.push_high_priority(3)); + let _ = push_expect!(handle.push_low_priority(4)); // The 5th push (either priority) should block. let mut blocked = handle.push_high_priority(5).boxed(); @@ -292,8 +292,8 @@ async fn rate_limit_interleaved_total_throughput() -> TestResult { ); time::advance(Duration::from_secs(1)).await; - push_expect!(handle.push_high_priority(6)); - push_expect!(handle.push_low_priority(7)); + let _ = push_expect!(handle.push_high_priority(6)); + let _ = push_expect!(handle.push_low_priority(7)); // Drain all frames to verify delivery. let mut out = Vec::new(); @@ -318,11 +318,11 @@ async fn fairness_disabled_strict_priority() -> TestResult { time_slice: None, }, |handle| async move { - push_expect!(handle.push_low_priority(101)); - push_expect!(handle.push_low_priority(102)); - push_expect!(handle.push_high_priority(1)); - push_expect!(handle.push_high_priority(2)); - push_expect!(handle.push_high_priority(3)); + let _ = push_expect!(handle.push_low_priority(101)); + let _ = push_expect!(handle.push_low_priority(102)); + let _ = push_expect!(handle.push_high_priority(1)); + let _ = push_expect!(handle.push_high_priority(2)); + let _ = push_expect!(handle.push_high_priority(3)); }, |out| { assert_eq!( diff --git a/tests/scenarios/memory_budget_backpressure_scenarios.rs b/tests/scenarios/memory_budget_backpressure_scenarios.rs new file mode 100644 index 00000000..5579fd69 --- /dev/null +++ b/tests/scenarios/memory_budget_backpressure_scenarios.rs @@ -0,0 +1,25 @@ +//! Scenario test functions for memory budget back-pressure. + +use rstest_bdd_macros::scenario; + +use crate::fixtures::memory_budget_backpressure::*; + +#[scenario( + path = "tests/features/memory_budget_backpressure.feature", + name = "Soft pressure delays completion until virtual time advances" +)] +fn soft_pressure_delays_completion( + memory_budget_backpressure_world: MemoryBudgetBackpressureWorld, +) { + drop(memory_budget_backpressure_world); +} + +#[scenario( + path = "tests/features/memory_budget_backpressure.feature", + name = "Reads continue without delay when pressure is low" +)] +fn low_pressure_continues_without_delay( + memory_budget_backpressure_world: MemoryBudgetBackpressureWorld, +) { + drop(memory_budget_backpressure_world); +} diff --git a/tests/scenarios/mod.rs b/tests/scenarios/mod.rs index 688c58d7..7d61087a 100644 --- a/tests/scenarios/mod.rs +++ b/tests/scenarios/mod.rs @@ -19,6 +19,7 @@ mod codec_stateful_scenarios; mod correlation_scenarios; mod fragment_scenarios; mod interleaved_push_queues_scenarios; +mod memory_budget_backpressure_scenarios; mod memory_budgets_scenarios; mod message_assembler_scenarios; mod message_assembly_inbound_scenarios; diff --git a/tests/steps/memory_budget_backpressure_steps.rs b/tests/steps/memory_budget_backpressure_steps.rs new file mode 100644 index 00000000..853bb6ab --- /dev/null +++ b/tests/steps/memory_budget_backpressure_steps.rs @@ -0,0 +1,69 @@ +//! Step definitions for soft-limit memory budget back-pressure scenarios. + +use rstest_bdd_macros::{given, then, when}; + +use crate::fixtures::memory_budget_backpressure::{ + BackpressureConfig, + MemoryBudgetBackpressureWorld, + TestResult, +}; + +#[given("a back-pressure inbound app configured as {config}")] +fn given_backpressure_app( + memory_budget_backpressure_world: &mut MemoryBudgetBackpressureWorld, + config: BackpressureConfig, +) -> TestResult { + memory_budget_backpressure_world.start_app(config) +} + +#[when("a budgeted first frame for key {key:u64} with body {body:string} arrives")] +fn when_first_frame_arrives( + memory_budget_backpressure_world: &mut MemoryBudgetBackpressureWorld, + key: u64, + body: String, +) -> TestResult { + memory_budget_backpressure_world.send_first_frame(key, &body) +} + +#[when( + "a budgeted final continuation frame for key {key:u64} sequence {sequence:u32} with body \ + {body:string} arrives" +)] +fn when_final_continuation_arrives( + memory_budget_backpressure_world: &mut MemoryBudgetBackpressureWorld, + key: u64, + sequence: u32, + body: String, +) -> TestResult { + memory_budget_backpressure_world.send_final_continuation_frame(key, sequence, &body) +} + +#[then("no budgeted payload is available before virtual time advances")] +fn then_no_payload_available( + memory_budget_backpressure_world: &mut MemoryBudgetBackpressureWorld, +) -> TestResult { + memory_budget_backpressure_world.assert_no_payload_ready() +} + +#[when("budgeted virtual time advances by {millis:u64} milliseconds")] +fn when_virtual_time_advances( + memory_budget_backpressure_world: &mut MemoryBudgetBackpressureWorld, + millis: u64, +) -> TestResult { + memory_budget_backpressure_world.advance_millis(millis) +} + +#[then("budgeted payload {expected:string} is eventually received")] +fn then_payload_eventually_received( + memory_budget_backpressure_world: &mut MemoryBudgetBackpressureWorld, + expected: String, +) -> TestResult { + memory_budget_backpressure_world.assert_payload_received(&expected) +} + +#[then("no budgeted send error is recorded")] +fn then_no_send_error( + memory_budget_backpressure_world: &mut MemoryBudgetBackpressureWorld, +) -> TestResult { + memory_budget_backpressure_world.assert_no_send_error() +} diff --git a/tests/steps/mod.rs b/tests/steps/mod.rs index 92c6868d..4c1b169c 100644 --- a/tests/steps/mod.rs +++ b/tests/steps/mod.rs @@ -15,6 +15,7 @@ mod codec_stateful_steps; mod correlation_steps; mod fragment_steps; mod interleaved_push_queues_steps; +mod memory_budget_backpressure_steps; mod memory_budgets_steps; mod message_assembler_steps; mod message_assembly_inbound_steps;