diff --git a/docs/adr-002-streaming-requests-and-shared-message-assembly.md b/docs/adr-002-streaming-requests-and-shared-message-assembly.md index 4d917181..73d54db0 100644 --- a/docs/adr-002-streaming-requests-and-shared-message-assembly.md +++ b/docs/adr-002-streaming-requests-and-shared-message-assembly.md @@ -350,6 +350,29 @@ Precedence is: total buffered bytes from exceeding the limit. The hard cap catches the edge case where this invariant is violated. +#### Implementation decisions (2026-02-27) + +- Roadmap item `8.3.5` implements derived default memory budgets. When + `WireframeApp::memory_budgets(...)` is not called, sensible defaults are + derived at connection time from `codec.max_frame_length()` (the frame budget + set via `buffer_capacity()`). +- Multipliers applied to `frame_budget`: + - `bytes_per_message = frame_budget × 16` (aligned with fragmentation's + `DEFAULT_MESSAGE_SIZE_MULTIPLIER`). + - `bytes_per_connection = frame_budget × 64` (allows four concurrent + message assemblies at maximum message size). + - `bytes_in_flight = frame_budget × 64` (same as per-connection; the + aggregate cap is a single value). +- Derivation is lazy — budgets are computed at runtime in `process_stream()` + via `resolve_effective_budgets()`, not stored on the builder. Changing + `buffer_capacity` or swapping codecs adjusts derived defaults automatically. +- `default_memory_budgets()` lives in `src/app/builder_defaults.rs` alongside + `default_fragmentation()`, following the same multiplier-from-frame-budget + pattern. `resolve_effective_budgets()` lives in + `src/app/frame_handling/backpressure.rs`, co-located with budget evaluation. +- Explicit budgets always take precedence over derived defaults (ADR-002 + precedence rule: explicit > global caps > derived defaults). + #### Budget enforcement - Budgets MUST cover: bytes buffered per message, bytes buffered per diff --git a/docs/execplans/8-3-5-Define derived defaults based on buffer_capacity.md b/docs/execplans/8-3-5-Define derived defaults based on buffer_capacity.md new file mode 100644 index 00000000..ddd80ffd --- /dev/null +++ b/docs/execplans/8-3-5-Define derived defaults based on buffer_capacity.md @@ -0,0 +1,727 @@ +# Define derived memory budget defaults from `buffer_capacity` (8.3.5) + +This ExecPlan (execution plan) is a living document. The sections +`Constraints`, `Tolerances`, `Risks`, `Progress`, `Surprises & Discoveries`, +`Decision Log`, and `Outcomes & Retrospective` must be kept up to date as work +proceeds. + +Status: COMPLETE + +## Purpose / big picture + +Roadmap item `8.3.5` closes the last gap in the three-tier per-connection +memory budget protection model. Today, when a user does not call +`.memory_budgets(...)` on `WireframeApp`, all three budget tiers are completely +disabled: + +- **Per-frame enforcement** (8.3.2) -- no per-message, per-connection, or + in-flight budget checks. +- **Soft-limit pacing** (8.3.3) -- no read throttling under memory pressure. +- **Hard-cap abort** (8.3.4) -- no connection termination on budget breach. + +The `memory_budgets` field remains `None`, and every check short-circuits to a +no-op. An application using fragmentation and message assembly but without +explicit budgets has zero memory protection on inbound assembly paths. + +After this change, sensible memory budgets are derived automatically from +`buffer_capacity` (the codec's `max_frame_length()`) whenever the user has not +set explicit budgets. The derivation follows the same pattern already used for +fragmentation defaults in `src/app/builder_defaults.rs`. Derived budgets are +computed lazily at connection time in the inbound handler, not stored on the +builder. Changing `buffer_capacity` automatically adjusts derived budgets; +explicit `.memory_budgets(...)` always takes precedence. + +Success is observable when: + +- An application using `WireframeApp::new()` with default settings (no + `.memory_budgets(...)` call) has memory budget enforcement active, with + budgets derived from the 1024-byte default frame length. +- Changing `buffer_capacity` changes the derived budgets proportionally. +- Explicitly calling `.memory_budgets(...)` overrides the derived defaults. +- All three tiers (per-frame enforcement, soft-limit pacing, hard-cap abort) + function with derived budgets. +- `rstest` unit tests validate the `default_memory_budgets()` derivation + function and the `resolve_effective_budgets()` integration helper. +- `rstest-bdd` v0.5.0 scenarios validate that the three-tier protection model + activates with derived budgets and that explicit budgets override them. +- ADR-002 records the implementation decisions. +- The users guide explains that budgets are derived automatically. +- `docs/roadmap.md` marks `8.3.5` as complete. + +## Constraints + +- Scope is strictly roadmap item `8.3.5`: derived default budgets from + `buffer_capacity`. +- Do not regress budget enforcement (8.3.2), soft-limit pacing (8.3.3), or + hard-cap abort (8.3.4) semantics. +- Preserve runtime behaviour when `memory_budgets` IS explicitly configured. + Explicit budgets must always take precedence over derived defaults, per the + ADR-002 precedence rule (explicit > global caps > derived defaults). +- Keep public API changes to zero. The derivation is an internal mechanism. If + a public API change is required, stop and escalate. +- Do not add new external dependencies. +- Keep all modified or new source files at or below the 400-line repository + guidance. +- `src/app/inbound_handler.rs` is currently 392 lines. Changes must keep it at + or below 400. +- Use en-GB-oxendict spelling in all comments and documentation. +- Follow the behaviour-driven development (BDD) 4-file pattern (feature, + fixture, steps, scenarios) with globally unique step text using a + `derived-budget` prefix. +- Validate with `rstest` unit tests and `rstest-bdd` v0.5.0 behavioural tests. +- Follow testing guidance from `docs/rust-testing-with-rstest-fixtures.md`, + `docs/reliable-testing-in-rust-via-dependency-injection.md`, and + `docs/rstest-bdd-users-guide.md`. +- Record implementation decisions in ADR-002. +- Update `docs/users-guide.md` for any consumer-visible behavioural change. +- Mark roadmap item `8.3.5` done only after all quality gates pass. + +## Tolerances (exception triggers) + +- Scope: if implementation requires changes to more than 16 files or more than + 600 net lines of code, stop and escalate. +- Interface: if a new public builder method, changed public function signature, + or new public type is required, stop and escalate. +- Dependencies: if any new crate is required, stop and escalate. +- Semantics ambiguity: if the derived multipliers produce budgets that conflict + with fragmentation defaults in a way that breaks existing tests, stop and + present options with trade-offs. +- Iterations: if the same failing gate persists after 3 focused attempts, stop + and escalate. +- Time: if any single stage exceeds 4 hours elapsed effort, stop and escalate. +- File length: if `src/app/inbound_handler.rs` would exceed 400 lines, extract + additional logic into `src/app/frame_handling/` helpers before continuing. +- File length: if `src/app/builder_defaults.rs` would exceed 400 lines, stop + and restructure. + +## Risks + +- Risk: `src/app/inbound_handler.rs` is currently 392 lines (8 lines from the + 400-line cap). Wiring in derived-budget resolution must fit within this + headroom. Severity: high. Likelihood: high. Mitigation: resolve effective + budgets via a one-line call to a new `resolve_effective_budgets()` helper in + `backpressure.rs`, then replace two inline `self.memory_budgets` references + with `Some(effective_budgets)`. Net change is approximately +2 lines (392 to + 394). The helper lives in `backpressure.rs` (131 lines, ample room). + +- Risk: BDD step-text collisions with existing `memory_budgets_steps.rs`, + `memory_budget_backpressure_steps.rs`, or `memory_budget_hard_cap_steps.rs`. + Severity: medium. Likelihood: medium. Mitigation: all new step phrases use a + `derived-budget` prefix (e.g., "a derived-budget app with buffer capacity + 512"). + +- Risk: the multiplier choice for `bytes_per_connection` and `bytes_in_flight` + may be too generous or too restrictive for certain workloads. Severity: low. + Likelihood: low. Mitigation: choose multipliers that align with fragmentation + defaults (which use `16x` for max message size). Document the rationale and + note that explicit `.memory_budgets(...)` is available for tuning. + +- Risk: `pub(super)` visibility on `default_memory_budgets()` may not be + reachable from `frame_handling::backpressure`. Severity: low. Likelihood: + low. Mitigation: confirmed that the existing `default_fragmentation()` + function uses `pub(super)` and is already imported from + `frame_handling::assembly.rs` via `crate::app::builder_defaults::`. The same + access path works for the new function. + +## Progress + +- [x] Drafted ExecPlan for roadmap item `8.3.5`. +- [x] Stage A: add `default_memory_budgets()` and + `resolve_effective_budgets()` with unit tests. +- [x] Stage B: integrate derived defaults into inbound read path. +- [x] Stage C: add behavioural tests (`rstest-bdd` v0.5.0). +- [x] Stage D: documentation and roadmap updates. +- [x] Stage E: quality gates and final verification. + +## Surprises & discoveries + +- BDD scenario 1 (derived budgets enforce per-connection limit) required + careful frame-count tuning. With `buffer_capacity=512`, derived + `per_connection=32768`. Sending frames with 400-byte bodies means each frame + consumes ~400 bytes of budget. The connection budget is breached around frame + 82, but the `DeserFailureTracker` requires 10 consecutive failures before + closing the connection. This means 95 frames are needed (frames 82-91 produce + 10 failures, triggering abort). Initial attempts with 20 and 90 frames caused + test hangs. + +- `tokio::time::pause()` is required in the BDD fixture even though no + explicit time assertions are made. The soft-limit pacing tier inserts a + `sleep(5ms)` when pressure is detected. With paused time, tokio's + auto-advance fires instantly when all tasks await timers (during + `block_on(server)`). Without paused time, the real 5ms sleeps accumulate and + combine with the read-timeout loop to cause hangs. + +- `inbound_handler.rs` grew from 392 to 396 lines (plan estimated 394). + The `resolve_effective_budgets` call plus its import added 4 net lines rather + than 2, but the file remains within the 400-line cap. + +## Decision log + +- Decision: place `default_memory_budgets()` in `src/app/builder_defaults.rs` + alongside `default_fragmentation()`. Rationale: the two functions are + structural twins -- both derive configuration from `frame_budget` using a + multiplier pattern. Co-locating them makes the parallel intent visible. + `builder_defaults.rs` is currently 22 lines, leaving ample headroom. + Date/Author: 2026-02-26 / plan phase. + +- Decision: resolve effective budgets once in `process_stream` (Option B from + design analysis) rather than duplicating derivation across + `new_message_assembly_state()` and `evaluate_memory_pressure()`. Rationale: + avoids dual derivation, keeps resolution explicit in one place. The + `let effective_budgets = ...` binding replaces two raw `self.memory_budgets` + references. Net line impact on `inbound_handler.rs` is approximately +2 lines + (392 to 394). Date/Author: 2026-02-26 / plan phase. + +- Decision: introduce `resolve_effective_budgets()` as a thin helper in + `src/app/frame_handling/backpressure.rs` rather than inlining the + `Option::unwrap_or_else` in `process_stream`. Rationale: keeps + `inbound_handler.rs` under the line cap and co-locates budget resolution with + budget evaluation. `backpressure.rs` is at 131 lines with ample room. + Date/Author: 2026-02-26 / plan phase. + +- Decision: `default_memory_budgets()` returns `MemoryBudgets` (not + `Option`). Rationale: `clamp_frame_length()` guarantees + `frame_budget >= 64`, so multiplied values are always non-zero. Returning a + non-optional type makes the function's infallibility clear. Date/Author: + 2026-02-26 / plan phase. + +- Decision: use the following multipliers for derived defaults: + - `bytes_per_message` = `frame_budget * 16` (aligned with fragmentation's + `DEFAULT_MESSAGE_SIZE_MULTIPLIER`). + - `bytes_per_connection` = `frame_budget * 64` (allows 4 concurrent message + assemblies at max message size). + - `bytes_in_flight` = `frame_budget * 64` (same as per-connection; the + aggregate cap is a single value). + Rationale: `bytes_per_message` matches fragmentation's `max_message_size` so + the two guards agree. `bytes_per_connection` at 64x provides headroom for + multiple concurrent assemblies without being so large as to be meaningless. + Setting `bytes_in_flight` equal to `bytes_per_connection` simplifies the + mental model. For the default 1024-byte frame, this yields: per_message = 16 + KiB (kibibytes), per_connection = 64 KiB, in_flight = 64 KiB. Date/Author: + 2026-02-26 / plan phase. + +- Decision: `buffer_capacity()` and `with_codec()` do NOT need to clear + derived defaults because derivation is lazy (computed at runtime from + `codec.max_frame_length()`), not stored on the builder. Rationale: mirrors + how `default_fragmentation()` works -- it is called at runtime in + `new_message_assembly_state()` when the stored `fragmentation` is `None`. + Changing the codec automatically changes the input to the derivation + function. Date/Author: 2026-02-26 / plan phase. + +## Outcomes & retrospective + +All acceptance criteria met. All quality gates green. + +- `default_memory_budgets()` added to `src/app/builder_defaults.rs` (grew + from 22 to 129 lines including tests). +- `resolve_effective_budgets()` added to + `src/app/frame_handling/backpressure.rs` (grew from 131 to 145 lines). +- `inbound_handler.rs` wired with derived defaults (grew from 392 to 396 + lines, within 400-line cap). +- 3 BDD scenarios pass: derived enforcement, within-limits delivery, and + explicit override. +- 8 unit tests pass: 5 for `default_memory_budgets()`, 3 for + `resolve_effective_budgets()`. +- ADR-002 updated with 8.3.5 implementation decisions. +- Users guide expanded with "Derived budget defaults" section. +- Roadmap item 8.3.5 marked complete. +- No regressions: full test suite (371 unit + 135 BDD) passes. + +Retrospective: + +- The frame-count tuning for the BDD enforcement scenario was the main + implementation challenge. The interaction between derived budget thresholds, + the `DeserFailureTracker` 10-failure limit, and `tokio::time::pause()` + auto-advance required three iterations to get right. Documenting the math in + the surprises section will help future tests. +- The plan estimated +2 lines for `inbound_handler.rs` but actual was +4. + The discrepancy came from the `use` import line and a blank line for + readability. Still well within the 400-line cap. +- The lazy derivation decision proved correct: no builder changes were needed + for `buffer_capacity()` or `with_codec()`, matching the + `default_fragmentation()` pattern exactly. + +## Context and orientation + +### Memory budget types + +`src/app/memory_budgets.rs` (101 lines) defines `MemoryBudgets`, a `Copy` +struct with three `BudgetBytes(NonZeroUsize)` fields: + +- `message_budget` (accessor: `bytes_per_message()`) -- max bytes per + single logical message. +- `connection_window` (accessor: `bytes_per_connection()`) -- max bytes + buffered across all assemblies on one connection. +- `assembly_bytes` (accessor: `bytes_in_flight()`) -- max bytes across + in-flight assemblies. + +`BudgetBytes` wraps `NonZeroUsize` and provides `new()`, `get()`, `as_usize()`, +plus `From` conversions. + +### Builder field and explicit configuration + +`src/app/builder/core.rs` stores `memory_budgets: Option` on +`WireframeApp`. Default is `None`. The builder method +`WireframeApp::memory_budgets(budgets)` in `src/app/builder/config.rs` sets it +to `Some(budgets)`. + +Type-changing operations (`with_codec()` in `src/app/builder/codec.rs`, +`serializer()`) preserve the `memory_budgets` field via `RebuildParams`. + +### `buffer_capacity` and frame budget + +`buffer_capacity(capacity)` in `src/app/builder/codec.rs` (lines 76-81) clamps +the value to `[64, 16 MiB]` via `clamp_frame_length()`, creates a new +`LengthDelimitedFrameCodec`, and clears `fragmentation` to `None`. + +`LengthDelimitedFrameCodec::default()` uses `max_frame_length: 1024`. + +### Derivation template: `default_fragmentation()` + +`src/app/builder_defaults.rs` (22 lines) contains `default_fragmentation()`, +which takes `frame_budget: usize`, clamps it, multiplies by +`DEFAULT_MESSAGE_SIZE_MULTIPLIER` (16) to get `max_message_size`, and builds a +`FragmentationConfig`. This is the exact pattern `8.3.5` follows. + +### Inbound read path + +`src/app/inbound_handler.rs` (392 lines) contains +`WireframeApp::process_stream()`. Two call sites use `self.memory_budgets`: + +1. Line 270-276: `frame_handling::new_message_assembly_state(self.fragmentation, + requested_frame_length, + self.memory_budgets)` -- creates `MessageAssemblyState`. When budgets is ` + None`, no aggregate enforcement occurs. + +2. Line 281-284: `frame_handling::evaluate_memory_pressure( + message_assembly.as_ref(), + self.memory_budgets)` -- evaluates pressure. When budgets is `None + `, returns `Continue` unconditionally. + +Both call sites pass the raw `Option`. Task 8.3.5 resolves this +to an always-`Some` value by deriving defaults when the field is `None`. + +### Assembly module + +`src/app/frame_handling/assembly.rs` (289 lines) contains +`new_message_assembly_state()` which threads budgets into +`MessageAssemblyState::with_budgets(...)`. When `memory_budgets` is `None`, it +falls through to `MessageAssemblyState::new()` which internally calls +`with_budgets(..., None, None)` -- disabling all aggregate budget enforcement. + +### Backpressure module + +`src/app/frame_handling/backpressure.rs` (131 lines) contains +`evaluate_memory_pressure()` and `apply_memory_pressure()`. Both short-circuit +on `None` budgets. The `active_aggregate_limit_bytes()` helper computes +`min(bytes_per_connection, bytes_in_flight)`. + +### Re-exports + +`src/app/frame_handling/mod.rs` (30 lines) re-exports `pub(crate)` items from +`backpressure` and `assembly`. Adding `resolve_effective_budgets` to the +re-export list requires one additional line. + +### BDD test infrastructure + +The project uses a 4-file BDD pattern with `rstest-bdd` v0.5.0: + +1. Feature file: `tests/features/.feature` (Gherkin syntax). +2. Fixture: `tests/fixtures/.rs` (world struct + helpers). +3. Steps: `tests/steps/_steps.rs` (`given`/`when`/`then` macros). +4. Scenarios: `tests/scenarios/_scenarios.rs` (bindings). + +Module registrations: `tests/fixtures/mod.rs`, `tests/steps/mod.rs`, +`tests/scenarios/mod.rs`. Step text must be globally unique across all step +files. The fixture parameter name in step functions must match the fixture +function name exactly. + +Reference implementation: `tests/fixtures/memory_budget_hard_cap.rs` (345 +lines) demonstrates the world struct pattern with `tokio::runtime::Runtime`, +`tokio::io::duplex`, frame sending helpers, and assertion methods. + +## Plan of work + +### Stage A: add `default_memory_budgets()` and `resolve_effective_budgets()` with unit tests + +**A1.** Add `default_memory_budgets()` to `src/app/builder_defaults.rs`. + +Below the existing `default_fragmentation()` function, add three constants and +one function: + +```rust +const DEFAULT_MESSAGE_BUDGET_MULTIPLIER: usize = 16; +const DEFAULT_CONNECTION_BUDGET_MULTIPLIER: usize = 64; +const DEFAULT_IN_FLIGHT_BUDGET_MULTIPLIER: usize = 64; + +pub(super) fn default_memory_budgets(frame_budget: usize) -> MemoryBudgets { + let frame_budget = clamp_frame_length(frame_budget); + let per_message = NonZeroUsize::new( + frame_budget.saturating_mul(DEFAULT_MESSAGE_BUDGET_MULTIPLIER), + ) + .unwrap_or(NonZeroUsize::MIN); + let per_connection = NonZeroUsize::new( + frame_budget.saturating_mul(DEFAULT_CONNECTION_BUDGET_MULTIPLIER), + ) + .unwrap_or(NonZeroUsize::MIN); + let in_flight = NonZeroUsize::new( + frame_budget.saturating_mul(DEFAULT_IN_FLIGHT_BUDGET_MULTIPLIER), + ) + .unwrap_or(NonZeroUsize::MIN); + MemoryBudgets::new( + BudgetBytes::new(per_message), + BudgetBytes::new(per_connection), + BudgetBytes::new(in_flight), + ) +} +``` + +Add imports for `MemoryBudgets` and `BudgetBytes` from +`crate::app::memory_budgets`. The `unwrap_or(NonZeroUsize::MIN)` fallback +avoids the forbidden `.unwrap()` lint while being effectively unreachable since +`clamp_frame_length` guarantees `>= 64`. + +The file grows from 22 to approximately 55 lines (including a +`#[cfg(test)] mod tests` block). + +**A2.** Add unit tests for `default_memory_budgets()` in a +`#[cfg(test)] mod tests` block at the bottom of `src/app/builder_defaults.rs`: + +1. `default_budgets_use_expected_multipliers` -- with the default 1024-byte + frame: asserts `bytes_per_message == 16_384`, + `bytes_per_connection == 65_536`, `bytes_in_flight == 65_536`. +2. `default_budgets_scale_with_frame_budget` -- with a 4096-byte frame: + asserts proportional scaling. +3. `default_budgets_clamp_minimum_frame_budget` -- with a 10-byte input + (below `MIN_FRAME_LENGTH` of 64): asserts values are based on clamped 64. +4. `default_budgets_clamp_maximum_frame_budget` -- with a value above + `MAX_FRAME_LENGTH`: asserts values are based on clamped 16 MiB. +5. `default_budgets_message_budget_aligns_with_fragmentation` -- confirms + that `bytes_per_message` equals + `frame_budget * DEFAULT_MESSAGE_SIZE_MULTIPLIER`, the same multiplier + fragmentation uses. + +**A3.** Add `resolve_effective_budgets()` to +`src/app/frame_handling/backpressure.rs`: + +```rust +/// Resolve the effective memory budgets for one connection. +/// +/// Returns the explicit budgets if configured, or derives sensible +/// defaults from `frame_budget` using the same multiplier pattern as +/// fragmentation defaults. +#[must_use] +pub(crate) fn resolve_effective_budgets( + explicit: Option, + frame_budget: usize, +) -> MemoryBudgets { + explicit.unwrap_or_else(|| default_memory_budgets(frame_budget)) +} +``` + +Add import for `default_memory_budgets` from `crate::app::builder_defaults`. +The file grows from 131 to approximately 145 lines. + +**A4.** Add `resolve_effective_budgets` to the `pub(crate) use` re-export in +`src/app/frame_handling/mod.rs`. + +**A5.** Add unit tests for `resolve_effective_budgets()` in +`src/app/frame_handling/backpressure_tests.rs`: + +1. `resolve_returns_explicit_budgets_when_configured` -- given + `Some(budgets)`, returns those exact budgets regardless of `frame_budget`. +2. `resolve_returns_derived_budgets_when_none` -- given `None` and + `frame_budget=1024`, returns the expected derived values. +3. `resolve_derived_budgets_change_with_frame_budget` -- given `None` and + different `frame_budget` values, returns proportionally different budgets. + +Go/no-go: run `cargo test --lib builder_defaults` and +`cargo test --lib frame_handling`. All tests pass. + +### Stage B: integrate derived defaults into inbound read path + +**B1.** In `src/app/inbound_handler.rs`, in `process_stream()`, after +`let requested_frame_length = codec.max_frame_length();` (line 260), add: + +```rust +let effective_budgets = frame_handling::resolve_effective_budgets( + self.memory_budgets, + requested_frame_length, +); +``` + +**B2.** Change line 274 from `self.memory_budgets,` to +`Some(effective_budgets),`. + +**B3.** Change line 283 from `self.memory_budgets,` to +`Some(effective_budgets),`. + +Net change: +2 lines (one `let` binding added, two in-place substitutions). +File grows from 392 to approximately 394 lines. + +Go/no-go: run `cargo test --lib` and `make lint`. All pass. Verify +`wc -l src/app/inbound_handler.rs` is 400 or less. + +### Stage C: add behavioural tests (`rstest-bdd` v0.5.0) + +**C1.** Feature file: `tests/features/derived_memory_budgets.feature` + +Three scenarios with `derived-budget` step prefix: + +1. **Derived budgets enforce per-frame limits** -- send enough frames to a + default-budget app (no explicit `.memory_budgets(...)`) to trigger the + hard-cap abort. Assert connection terminates with an error. +2. **Derived budgets allow frames within limits** -- send frames within the + derived budget to a default-budget app. Assert payload delivery and no + connection error. +3. **Explicit budgets override derived defaults** -- configure tight explicit + budgets on an app that also has a `buffer_capacity`. Send frames that exceed + the explicit budget (but would be within derived defaults). Assert + connection terminates. + +**C2.** Fixture: `tests/fixtures/derived_memory_budgets.rs` + +Follow the `memory_budget_hard_cap.rs` pattern. The world struct +(`DerivedMemoryBudgetsWorld`) supports two startup modes: + +- `start_app_derived(buffer_capacity)` -- creates `WireframeApp` with + `.buffer_capacity(capacity)` and `.enable_fragmentation()` but NO + `.memory_budgets(...)` call. Derived budgets activate at runtime. +- `start_app_explicit(buffer_capacity, per_message, per_connection, + in_flight)` -- same but adds `.memory_budgets(…)`. + +Reuse the frame-sending helpers and assertion methods from the hard-cap fixture +pattern. + +**C3.** Steps: `tests/steps/derived_memory_budgets_steps.rs` + +Step definitions with `derived-budget` prefix. The `given` step parses buffer +capacity and optional explicit budgets. The `when` and `then` steps delegate to +world methods. + +**C4.** Scenarios: `tests/scenarios/derived_memory_budgets_scenarios.rs` + +Bind each feature scenario to the fixture. + +**C5.** Register modules in `tests/fixtures/mod.rs`, `tests/steps/mod.rs`, +`tests/scenarios/mod.rs`. + +Go/no-go: run `cargo test --test bdd --all-features derived_memory_budgets`. +All scenarios pass. Then run `make lint` and `make test` for full regression. + +### Stage D: documentation and roadmap updates + +**D1.** ADR-002 +(`docs/adr-002-streaming-requests-and-shared-message-assembly.md`): add a new +implementation decisions block after the existing 8.3.4 entry: + +- Roadmap item `8.3.5` implements derived default memory budgets. +- When `WireframeApp::memory_budgets(...)` is not called, defaults are derived + at connection time from `codec.max_frame_length()`. +- Multipliers: `bytes_per_message = frame_budget * 16` (aligned with + fragmentation), `bytes_per_connection = frame_budget * 64`, + `bytes_in_flight = frame_budget * 64`. +- Derivation is lazy (not stored on the builder). Changing `buffer_capacity` + or swapping codecs adjusts derived defaults automatically. +- Explicit budgets always take precedence (ADR-002 precedence rule 1 > 3). + +**D2.** Users guide (`docs/users-guide.md`): expand the "Per-connection memory +budgets" section to explain: + +- Budgets are derived automatically from `buffer_capacity` when not set + explicitly. +- State the default multipliers and resulting values for the 1024-byte + default (per_message = 16 KiB, per_connection = 64 KiB, in_flight = 64 KiB). +- All three protection tiers are active with derived defaults. +- Calling `.memory_budgets(...)` overrides the derived defaults entirely. +- Changing `buffer_capacity` adjusts derived defaults proportionally. + +**D3.** Roadmap (`docs/roadmap.md`): change the `8.3.5` line from `- [ ]` to +`- [x]`. + +### Stage E: quality gates and final verification + +Run all required gates with `set -o pipefail` and `tee` to log files: + +```shell +set -o pipefail +make fmt 2>&1 | tee /tmp/wireframe-8-3-5-fmt.log +make markdownlint MDLINT=/root/.bun/bin/markdownlint-cli2 \ + 2>&1 | tee /tmp/wireframe-8-3-5-markdownlint.log +make check-fmt 2>&1 | tee /tmp/wireframe-8-3-5-check-fmt.log +make lint 2>&1 | tee /tmp/wireframe-8-3-5-lint.log +make test 2>&1 | tee /tmp/wireframe-8-3-5-test.log +make nixie 2>&1 | tee /tmp/wireframe-8-3-5-nixie.log +``` + +No roadmap checkbox update until every gate is green. + +## Concrete steps + +Run from the repository root (`/home/user/project`). + +1. Implement Stage A: `default_memory_budgets()` + + `resolve_effective_budgets()` + unit tests. +2. Run focused unit tests: + +```shell +set -o pipefail +cargo test --lib builder_defaults 2>&1 | tee /tmp/wireframe-8-3-5-unit-a1.log +cargo test --lib frame_handling 2>&1 | tee /tmp/wireframe-8-3-5-unit-a2.log +``` + +1. Implement Stage B: wire into `process_stream`. +2. Run focused tests and lint: + +```shell +set -o pipefail +cargo test --lib 2>&1 | tee /tmp/wireframe-8-3-5-unit-b.log +make lint 2>&1 | tee /tmp/wireframe-8-3-5-lint-b.log +``` + +1. Verify line count: + +```shell +wc -l src/app/inbound_handler.rs +``` + +1. Implement Stage C: BDD tests. +2. Run targeted BDD scenarios: + +```shell +set -o pipefail +cargo test --test bdd --all-features derived_memory_budgets \ + 2>&1 | tee /tmp/wireframe-8-3-5-bdd.log +``` + +1. Implement Stage D: documentation updates. +2. Run full quality gates (Stage E). + +## Validation and acceptance + +Acceptance criteria: + +- Derived-budget behaviour: when `WireframeApp::memory_budgets(...)` is not + called, memory budgets are derived from `buffer_capacity`. All three + protection tiers are active. +- Explicit override: when `.memory_budgets(...)` is called, the explicit values + are used regardless of `buffer_capacity`. +- Multiplier correctness: for the default 1024-byte frame, derived budgets are: + `bytes_per_message` = 16,384, `bytes_per_connection` = 65,536, + `bytes_in_flight` = 65,536. +- Proportional scaling: changing `buffer_capacity` changes derived budgets + proportionally. +- No regressions: all existing budget enforcement (8.3.2), soft-limit (8.3.3), + and hard-cap (8.3.4) tests continue to pass. +- Unit tests (`rstest`) validate `default_memory_budgets()` and + `resolve_effective_budgets()`. +- Behavioural tests (`rstest-bdd` v0.5.0) validate derived-budget activation + and explicit override. +- Design documentation records decisions in ADR-002. +- Users guide explains derived defaults. +- `docs/roadmap.md` marks `8.3.5` done. + +Quality criteria: + +- tests: `make test` passes. +- lint: `make lint` passes with no warnings. +- formatting: `make fmt` and `make check-fmt` pass. +- markdown: `make markdownlint` passes. +- mermaid validation: `make nixie` passes. + +## Idempotence and recovery + +All planned edits are additive and safe to rerun. If a step fails: + +- Preserve local changes. +- Inspect the relevant `/tmp/wireframe-8-3-5-*.log` file. +- Apply the minimal fix. +- Rerun only the failed command first, then downstream gates. + +Avoid destructive git commands. If rollback is required, revert only files +changed for `8.3.5`. + +## Artefacts and notes + +Expected artefacts after completion: + +- Modified: `src/app/builder_defaults.rs` (~55 lines, up from 22). +- Modified: `src/app/frame_handling/backpressure.rs` (~145 lines, up from 131). +- Modified: `src/app/frame_handling/backpressure_tests.rs` (~320 lines, up + from 268). +- Modified: `src/app/frame_handling/mod.rs` (~31 lines, up from 30). +- Modified: `src/app/inbound_handler.rs` (~394 lines, up from 392). +- New: `tests/features/derived_memory_budgets.feature`. +- New: `tests/fixtures/derived_memory_budgets.rs`. +- New: `tests/steps/derived_memory_budgets_steps.rs`. +- New: `tests/scenarios/derived_memory_budgets_scenarios.rs`. +- Modified: `tests/fixtures/mod.rs`. +- Modified: `tests/steps/mod.rs`. +- Modified: `tests/scenarios/mod.rs`. +- Modified: `docs/adr-002-streaming-requests-and-shared-message-assembly.md`. +- Modified: `docs/users-guide.md`. +- Modified: `docs/roadmap.md`. +- New: `docs/execplans/8-3-5-Define derived defaults based on + buffer_capacity.md`. +- Gate logs: `/tmp/wireframe-8-3-5-*.log`. + +Total: 5 new files (4 test + 1 execplan), 11 modified files = 16 artefacts (at +tolerance boundary). + +## Interfaces and dependencies + +No new external dependencies are required. + +Internal interfaces expected at the end of this milestone: + +In `src/app/builder_defaults.rs`: + +```rust +/// Multiplier applied to `frame_budget` for the per-message default budget. +/// Aligned with `DEFAULT_MESSAGE_SIZE_MULTIPLIER` used by +/// `default_fragmentation()`. +const DEFAULT_MESSAGE_BUDGET_MULTIPLIER: usize = 16; + +/// Multiplier applied to `frame_budget` for the per-connection default +/// budget. +const DEFAULT_CONNECTION_BUDGET_MULTIPLIER: usize = 64; + +/// Multiplier applied to `frame_budget` for the in-flight default budget. +const DEFAULT_IN_FLIGHT_BUDGET_MULTIPLIER: usize = 64; + +/// Derive sensible memory budgets from the codec frame budget. +/// +/// Mirrors the pattern of [`default_fragmentation`], which derives +/// fragmentation settings from the same frame budget. The frame budget is +/// clamped to [`crate::codec::MIN_FRAME_LENGTH`]..= +/// [`crate::codec::MAX_FRAME_LENGTH`] before applying multipliers. +#[must_use] +pub(super) fn default_memory_budgets(frame_budget: usize) -> MemoryBudgets +``` + +In `src/app/frame_handling/backpressure.rs`: + +```rust +/// Resolve the effective memory budgets for one connection. +/// +/// Returns the explicit budgets if configured, or derives sensible +/// defaults from `frame_budget` using the same multiplier pattern as +/// fragmentation defaults. +#[must_use] +pub(crate) fn resolve_effective_budgets( + explicit: Option, + frame_budget: usize, +) -> MemoryBudgets +``` + +In `src/app/inbound_handler.rs::process_stream`: + +- Calls `frame_handling::resolve_effective_budgets(self.memory_budgets, + requested_frame_length)` once and passes `Some(effective_budgets)` to both ` + new_message_assembly_state()` and `evaluate_memory_pressure()`. + +In behavioural tests: + +- A new `rstest-bdd` world (`DerivedMemoryBudgetsWorld`) validates that + derived budgets enable all three protection tiers and that explicit budgets + override derived defaults. diff --git a/docs/roadmap.md b/docs/roadmap.md index b29b9a7b..4d713f82 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -297,7 +297,7 @@ and standardized per-connection memory budgets. - [x] 8.3.3. Implement soft limit (back-pressure by pausing reads) behaviour. - [x] 8.3.4. Implement hard cap (abort early, release partial state, surface `InvalidData`) behaviour. -- [ ] 8.3.5. Define derived defaults based on `buffer_capacity` when budgets +- [x] 8.3.5. Define derived defaults based on `buffer_capacity` when budgets are not set explicitly. - [ ] 8.3.6. Write tests for budget enforcement, back-pressure, and cleanup semantics. diff --git a/docs/users-guide.md b/docs/users-guide.md index 6d3064c3..1a293fd3 100644 --- a/docs/users-guide.md +++ b/docs/users-guide.md @@ -626,6 +626,24 @@ Wireframe provides a three-tier protection model for inbound memory budgets: `InvalidData`. This is a defence-in-depth safety net; under normal operation, per-frame enforcement prevents this state from being reached. +#### Derived budget defaults + +When `memory_budgets(...)` is not called on the builder, Wireframe derives +sensible defaults automatically from `buffer_capacity` (the codec's +`max_frame_length()`). The derived values use the same multiplier pattern as +fragmentation defaults: + +| Budget field | Multiplier | Default (1024-byte frame) | +| ---------------------- | -------------------- | ------------------------- | +| `bytes_per_message` | `frame_budget × 16` | 16 KiB | +| `bytes_per_connection` | `frame_budget × 64` | 64 KiB | +| `bytes_in_flight` | `frame_budget × 64` | 64 KiB | + +All three protection tiers (per-frame enforcement, soft-limit read pacing, and +hard-cap connection abort) are active with derived defaults. Changing +`buffer_capacity` adjusts derived budgets proportionally. Calling +`.memory_budgets(...)` overrides the derived defaults entirely. + #### Message key multiplexing (8.2.3) The `MessageAssemblyState` type manages multiple concurrent message assemblies diff --git a/src/app/builder_defaults.rs b/src/app/builder_defaults.rs index b1c7c280..4a3681ed 100644 --- a/src/app/builder_defaults.rs +++ b/src/app/builder_defaults.rs @@ -2,7 +2,11 @@ use std::{num::NonZeroUsize, time::Duration}; -use crate::{codec::clamp_frame_length, fragment::FragmentationConfig}; +use crate::{ + app::memory_budgets::{BudgetBytes, MemoryBudgets}, + codec::clamp_frame_length, + fragment::FragmentationConfig, +}; pub(super) const MIN_READ_TIMEOUT_MS: u64 = 1; pub(super) const MAX_READ_TIMEOUT_MS: u64 = 86_400_000; @@ -10,6 +14,9 @@ pub(super) const MAX_READ_TIMEOUT_MS: u64 = 86_400_000; pub(super) const DEFAULT_READ_TIMEOUT_MS: u64 = 100; const DEFAULT_FRAGMENT_TIMEOUT: Duration = Duration::from_secs(30); const DEFAULT_MESSAGE_SIZE_MULTIPLIER: usize = 16; +const DEFAULT_MESSAGE_BUDGET_MULTIPLIER: usize = DEFAULT_MESSAGE_SIZE_MULTIPLIER; +const DEFAULT_CONNECTION_BUDGET_MULTIPLIER: usize = 64; +const DEFAULT_IN_FLIGHT_BUDGET_MULTIPLIER: usize = 64; pub(super) fn default_fragmentation(frame_budget: usize) -> Option { let frame_budget = clamp_frame_length(frame_budget); @@ -20,3 +27,108 @@ pub(super) fn default_fragmentation(frame_budget: usize) -> Option= 64`, but it +/// satisfies the `NonZeroUsize` invariant without using the forbidden +/// `.unwrap()` lint. +fn derive_budget(frame_budget: usize, multiplier: usize) -> BudgetBytes { + let bytes = + NonZeroUsize::new(frame_budget.saturating_mul(multiplier)).unwrap_or(NonZeroUsize::MIN); + BudgetBytes::new(bytes) +} + +/// Derive sensible memory budgets from the codec frame budget. +/// +/// Mirrors the pattern of [`default_fragmentation`], which derives +/// fragmentation settings from the same frame budget. The frame budget is +/// clamped to [`crate::codec::MIN_FRAME_LENGTH`]..= +/// [`crate::codec::MAX_FRAME_LENGTH`] before applying multipliers. +/// +/// The per-message multiplier (16) is deliberately aligned with +/// `DEFAULT_MESSAGE_SIZE_MULTIPLIER` used by fragmentation defaults so +/// that the two guards agree on the maximum logical message size. +#[must_use] +pub(super) fn default_memory_budgets(frame_budget: usize) -> MemoryBudgets { + let frame_budget = clamp_frame_length(frame_budget); + MemoryBudgets::new( + derive_budget(frame_budget, DEFAULT_MESSAGE_BUDGET_MULTIPLIER), + derive_budget(frame_budget, DEFAULT_CONNECTION_BUDGET_MULTIPLIER), + derive_budget(frame_budget, DEFAULT_IN_FLIGHT_BUDGET_MULTIPLIER), + ) +} + +#[cfg(test)] +mod tests { + use rstest::rstest; + + use super::*; + use crate::codec::{MAX_FRAME_LENGTH, MIN_FRAME_LENGTH}; + + #[rstest] + #[case(1024)] + #[case(4096)] + fn default_budgets_scale_and_use_expected_multipliers(#[case] frame_budget: usize) { + let budgets = default_memory_budgets(frame_budget); + assert_eq!( + frame_budget * DEFAULT_MESSAGE_BUDGET_MULTIPLIER, + budgets.bytes_per_message().as_usize() + ); + assert_eq!( + frame_budget * DEFAULT_CONNECTION_BUDGET_MULTIPLIER, + budgets.bytes_per_connection().as_usize() + ); + assert_eq!( + frame_budget * DEFAULT_IN_FLIGHT_BUDGET_MULTIPLIER, + budgets.bytes_in_flight().as_usize() + ); + } + + #[test] + fn default_budgets_clamp_minimum_frame_budget() { + let budgets = default_memory_budgets(10); + assert_eq!( + MIN_FRAME_LENGTH * DEFAULT_MESSAGE_BUDGET_MULTIPLIER, + budgets.bytes_per_message().as_usize() + ); + assert_eq!( + MIN_FRAME_LENGTH * DEFAULT_CONNECTION_BUDGET_MULTIPLIER, + budgets.bytes_per_connection().as_usize() + ); + assert_eq!( + MIN_FRAME_LENGTH * DEFAULT_IN_FLIGHT_BUDGET_MULTIPLIER, + budgets.bytes_in_flight().as_usize() + ); + } + + #[test] + fn default_budgets_clamp_maximum_frame_budget() { + let budgets = default_memory_budgets(MAX_FRAME_LENGTH + 1); + assert_eq!( + MAX_FRAME_LENGTH * DEFAULT_MESSAGE_BUDGET_MULTIPLIER, + budgets.bytes_per_message().as_usize() + ); + assert_eq!( + MAX_FRAME_LENGTH * DEFAULT_CONNECTION_BUDGET_MULTIPLIER, + budgets.bytes_per_connection().as_usize() + ); + assert_eq!( + MAX_FRAME_LENGTH * DEFAULT_IN_FLIGHT_BUDGET_MULTIPLIER, + budgets.bytes_in_flight().as_usize() + ); + } + + #[test] + fn default_budgets_message_budget_aligns_with_fragmentation() { + let frame_budget = 2048_usize; + let budgets = default_memory_budgets(frame_budget); + let expected = frame_budget * DEFAULT_MESSAGE_SIZE_MULTIPLIER; + assert_eq!( + expected, + budgets.bytes_per_message().as_usize(), + "per-message budget does not match fragmentation multiplier" + ); + } +} diff --git a/src/app/frame_handling/backpressure.rs b/src/app/frame_handling/backpressure.rs index 580a3bfc..6d570924 100644 --- a/src/app/frame_handling/backpressure.rs +++ b/src/app/frame_handling/backpressure.rs @@ -12,7 +12,10 @@ use std::time::Duration; use log::{debug, warn}; use tokio::{io, time::sleep}; -use crate::{app::MemoryBudgets, message_assembler::MessageAssemblyState}; +use crate::{ + app::{MemoryBudgets, builder_defaults::default_memory_budgets}, + message_assembler::MessageAssemblyState, +}; /// Soft-pressure threshold numerator (4/5 == 80%). const SOFT_LIMIT_NUMERATOR: u128 = 4; @@ -124,6 +127,19 @@ fn active_aggregate_limit_bytes(budgets: MemoryBudgets) -> usize { .min(budgets.bytes_in_flight().as_usize()) } +/// Resolve the effective memory budgets for one connection. +/// +/// Returns the explicit budgets if configured, or derives sensible +/// defaults from `frame_budget` using the same multiplier pattern as +/// fragmentation defaults. +#[must_use] +pub(crate) fn resolve_effective_budgets( + explicit: Option, + frame_budget: usize, +) -> MemoryBudgets { + explicit.unwrap_or_else(|| default_memory_budgets(frame_budget)) +} + fn is_at_or_above_soft_limit(buffered_bytes: usize, aggregate_limit: usize) -> bool { let lhs = (buffered_bytes as u128).saturating_mul(SOFT_LIMIT_DENOMINATOR); let rhs = (aggregate_limit as u128).saturating_mul(SOFT_LIMIT_NUMERATOR); diff --git a/src/app/frame_handling/backpressure_tests.rs b/src/app/frame_handling/backpressure_tests.rs index 3bcaac65..9748d461 100644 --- a/src/app/frame_handling/backpressure_tests.rs +++ b/src/app/frame_handling/backpressure_tests.rs @@ -5,7 +5,12 @@ use std::{error::Error, io, num::NonZeroUsize, time::Duration}; use rstest::{fixture, rstest}; use super::{ - backpressure::{MemoryPressureAction, has_hard_cap_been_breached, should_pause_inbound_reads}, + backpressure::{ + MemoryPressureAction, + has_hard_cap_been_breached, + resolve_effective_budgets, + should_pause_inbound_reads, + }, evaluate_memory_pressure, }; use crate::{ @@ -266,3 +271,68 @@ fn evaluate_pause_uses_expected_duration(budgets: TestResult) -> other => Err(io::Error::other(format!("expected Pause, got {other:?}")).into()), } } + +// ---------- resolve_effective_budgets tests ---------- + +#[rstest] +fn resolve_returns_explicit_budgets_when_configured() -> TestResult { + let explicit = custom_budgets(512, 2048, 4096)?; + let resolved = resolve_effective_budgets(Some(explicit), 1024); + if resolved != explicit { + return Err(io::Error::other(format!( + "expected explicit budgets {explicit:?}, got {resolved:?}" + )) + .into()); + } + Ok(()) +} + +#[rstest] +fn resolve_returns_derived_budgets_when_none() -> TestResult { + let resolved = resolve_effective_budgets(None, 1024); + if resolved.bytes_per_message().as_usize() != 16_384 { + return Err(io::Error::other(format!( + "expected derived bytes_per_message=16384, got {}", + resolved.bytes_per_message().as_usize() + )) + .into()); + } + if resolved.bytes_per_connection().as_usize() != 65_536 { + return Err(io::Error::other(format!( + "expected derived bytes_per_connection=65536, got {}", + resolved.bytes_per_connection().as_usize() + )) + .into()); + } + if resolved.bytes_in_flight().as_usize() != 65_536 { + return Err(io::Error::other(format!( + "expected derived bytes_in_flight=65536, got {}", + resolved.bytes_in_flight().as_usize() + )) + .into()); + } + Ok(()) +} + +#[rstest] +fn resolve_derived_budgets_change_with_frame_budget() -> TestResult { + let small = resolve_effective_budgets(None, 512); + let large = resolve_effective_budgets(None, 2048); + if small.bytes_per_message().as_usize() >= large.bytes_per_message().as_usize() { + return Err(io::Error::other(format!( + "expected smaller frame budget to yield smaller per_message: {} vs {}", + small.bytes_per_message().as_usize(), + large.bytes_per_message().as_usize() + )) + .into()); + } + if small.bytes_per_connection().as_usize() >= large.bytes_per_connection().as_usize() { + return Err(io::Error::other(format!( + "expected smaller frame budget to yield smaller per_connection: {} vs {}", + small.bytes_per_connection().as_usize(), + large.bytes_per_connection().as_usize() + )) + .into()); + } + Ok(()) +} diff --git a/src/app/frame_handling/mod.rs b/src/app/frame_handling/mod.rs index 558f3a9b..40d4092f 100644 --- a/src/app/frame_handling/mod.rs +++ b/src/app/frame_handling/mod.rs @@ -17,7 +17,11 @@ pub(crate) use assembly::{ new_message_assembly_state, purge_expired_assemblies, }; -pub(crate) use backpressure::{apply_memory_pressure, evaluate_memory_pressure}; +pub(crate) use backpressure::{ + apply_memory_pressure, + evaluate_memory_pressure, + resolve_effective_budgets, +}; pub(crate) use decode::decode_envelope; pub(crate) use reassembly::reassemble_if_needed; pub(crate) use response::forward_response; diff --git a/src/app/inbound_handler.rs b/src/app/inbound_handler.rs index a8b9c1d1..87659336 100644 --- a/src/app/inbound_handler.rs +++ b/src/app/inbound_handler.rs @@ -266,12 +266,14 @@ where ); } framed.read_buffer_mut().reserve(max_frame_length); + let effective_budgets = + frame_handling::resolve_effective_budgets(self.memory_budgets, requested_frame_length); let mut deser_failures = 0u32; let mut message_assembly = self.message_assembler.as_ref().map(|_| { frame_handling::new_message_assembly_state( self.fragmentation, requested_frame_length, - self.memory_budgets, + Some(effective_budgets), ) }); let mut pipeline = FramePipeline::new(self.fragmentation); @@ -280,7 +282,7 @@ where loop { let pressure = frame_handling::evaluate_memory_pressure( message_assembly.as_ref(), - self.memory_budgets, + Some(effective_budgets), ); frame_handling::apply_memory_pressure(pressure, || { purge_expired(&mut pipeline, &mut message_assembly); diff --git a/tests/features/derived_memory_budgets.feature b/tests/features/derived_memory_budgets.feature new file mode 100644 index 00000000..e207fb76 --- /dev/null +++ b/tests/features/derived_memory_budgets.feature @@ -0,0 +1,21 @@ +@derived_memory_budgets +Feature: Derived memory budget defaults from buffer capacity + When memory budgets are not set explicitly, sensible defaults are derived + from the codec buffer capacity, enabling all three tiers of protection. + + Scenario: Derived budgets enforce per-connection limit + Given a derived-budget app with buffer capacity 512 + When derived-budget first frames for keys 1 to 95 each with body size 400 arrive + Then the derived-budget connection terminates with an error + + Scenario: Derived budgets allow frames within limits + Given a derived-budget app with buffer capacity 512 + When a derived-budget first frame for key 1 with body "bb" arrives + And a derived-budget final continuation for key 1 sequence 1 with body "cc" arrives + Then derived-budget payload "bbcc" is eventually received + And no derived-budget connection error is recorded + + Scenario: Explicit budgets override derived defaults + Given a derived-budget app with buffer capacity 512 and explicit budgets 2048/8/8 + When derived-budget first frames for keys 1 to 11 each with body "aaaaaaaa" arrive + Then the derived-budget connection terminates with an error diff --git a/tests/fixtures/derived_memory_budgets.rs b/tests/fixtures/derived_memory_budgets.rs new file mode 100644 index 00000000..aeecd1dc --- /dev/null +++ b/tests/fixtures/derived_memory_budgets.rs @@ -0,0 +1,398 @@ +//! Behavioural fixture for derived memory budget default scenarios. + +use std::{fmt, future::Future, num::NonZeroUsize, time::Duration}; + +use futures::SinkExt; +use rstest::fixture; +use tokio::{io::DuplexStream, sync::mpsc, task::JoinHandle}; +use tokio_util::codec::{Framed, LengthDelimitedCodec}; +use wireframe::{ + app::{BudgetBytes, Envelope, Handler, MemoryBudgets, WireframeApp}, + fragment::FragmentationConfig, + serializer::{BincodeSerializer, Serializer}, + test_helpers::{self, TestAssembler}, +}; +pub use wireframe_testing::TestResult; + +/// Parsed as "`per_message` / `per_connection` / `in_flight`". +#[derive(Clone, Copy, Debug)] +pub struct ExplicitBudgetConfig { + /// Maximum bytes buffered for a single logical message. + pub per_message: usize, + /// Maximum bytes buffered across all assemblies on one connection. + pub per_connection: usize, + /// Maximum bytes buffered across all in-flight assemblies globally. + pub in_flight: usize, +} + +impl std::str::FromStr for ExplicitBudgetConfig { + type Err = String; + + fn from_str(s: &str) -> Result { + let mut values = s.split('/').map(str::trim); + let per_message = values + .next() + .filter(|value| !value.is_empty()) + .ok_or("missing per_message")?; + let per_connection = values + .next() + .filter(|value| !value.is_empty()) + .ok_or("missing per_connection")?; + let in_flight = values + .next() + .filter(|value| !value.is_empty()) + .ok_or("missing in_flight")?; + if values.next().is_some() { + return Err("unexpected trailing segments".to_string()); + } + Ok(Self { + per_message: per_message + .parse() + .map_err(|error| format!("per_message: {error}"))?, + per_connection: per_connection + .parse() + .map_err(|error| format!("per_connection: {error}"))?, + in_flight: in_flight + .parse() + .map_err(|error| format!("in_flight: {error}"))?, + }) + } +} + +const ROUTE_ID: u32 = 91; +const CORRELATION_ID: Option = Some(20); +const SPIN_ATTEMPTS: usize = 64; +const SERVER_JOIN_TIMEOUT: Duration = Duration::from_secs(2); + +/// Runtime-backed fixture that drives inbound assembly with derived (or +/// explicit) memory budgets and validates protection tier behaviour. +pub struct DerivedMemoryBudgetsWorld { + runtime: Option, + runtime_error: Option, + client: Option>, + server: Option>>, + observed_rx: Option>>, + observed_payloads: Vec>, + last_send_error: Option, + connection_error: Option, +} + +impl DerivedMemoryBudgetsWorld { + fn with_runtime( + runtime: Option, + runtime_error: Option, + ) -> Self { + Self { + runtime, + runtime_error, + client: None, + server: None, + observed_rx: None, + observed_payloads: Vec::new(), + last_send_error: None, + connection_error: None, + } + } +} + +impl Default for DerivedMemoryBudgetsWorld { + fn default() -> Self { + match tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + { + Ok(runtime) => Self::with_runtime(Some(runtime), None), + Err(error) => { + Self::with_runtime(None, Some(format!("failed to create runtime: {error}"))) + } + } + } +} + +impl fmt::Debug for DerivedMemoryBudgetsWorld { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("DerivedMemoryBudgetsWorld") + .field("client_initialized", &self.client.is_some()) + .field("server_initialized", &self.server.is_some()) + .field("observed_payloads", &self.observed_payloads.len()) + .field("last_send_error", &self.last_send_error) + .field("connection_error", &self.connection_error) + .finish_non_exhaustive() + } +} + +/// Construct the default world used by derived memory budget BDD tests. +#[fixture] +pub fn derived_memory_budgets_world() -> DerivedMemoryBudgetsWorld { + DerivedMemoryBudgetsWorld::default() +} + +impl DerivedMemoryBudgetsWorld { + fn runtime(&self) -> TestResult<&tokio::runtime::Runtime> { + self.runtime.as_ref().ok_or_else(|| { + self.runtime_error + .clone() + .unwrap_or_else(|| "runtime unavailable".to_string()) + .into() + }) + } + + fn block_on(&self, future: F) -> TestResult + where + F: Future, + { + if tokio::runtime::Handle::try_current().is_ok() { + return Err("nested Tokio runtime detected in derived-budget fixture".into()); + } + Ok(self.runtime()?.block_on(future)) + } + + /// Derive a fragmentation config from the app's clamped frame length. + /// + /// Uses the codec's actual `max_frame_length()` (post-clamping) to keep + /// the test fixture aligned with production derivation behaviour. + fn fragmentation_from_app(app: &WireframeApp) -> TestResult { + let clamped = app.length_codec().max_frame_length(); + // The multiplier 16 mirrors `DEFAULT_MESSAGE_SIZE_MULTIPLIER` in + // `builder_defaults` (module-private). If that constant changes, + // update this value in `fragmentation_from_app` to match. + let fragment_limit = NonZeroUsize::new(clamped.saturating_mul(16)) + .ok_or("buffer-derived fragment limit should be non-zero")?; + FragmentationConfig::for_frame_budget(clamped, fragment_limit, Duration::from_secs(30)) + .ok_or_else(|| "failed to derive fragmentation config for test fixture".into()) + } + + /// Build the handler and payload channel shared by both startup modes. + fn build_handler() -> (Handler, mpsc::UnboundedReceiver>) { + let (tx, rx) = mpsc::unbounded_channel::>(); + let handler: Handler = std::sync::Arc::new(move |envelope: &Envelope| { + let tx = tx.clone(); + let payload = envelope.payload_bytes().to_vec(); + Box::pin(async move { + let _ = tx.send(payload); + }) + }); + (handler, rx) + } + + /// Start the app under test with derived budgets (no explicit + /// `.memory_budgets(...)` call). + pub fn start_app_derived(&mut self, buffer_capacity: usize) -> TestResult { + let partial = WireframeApp::new()?.buffer_capacity(buffer_capacity); + let fragmentation = Self::fragmentation_from_app(&partial)?; + let (handler, rx) = Self::build_handler(); + + let app = partial + .fragmentation(Some(fragmentation)) + .with_message_assembler(TestAssembler) + .route(ROUTE_ID, handler)?; + + self.start_with_app(app, rx) + } + + /// Start the app under test with explicit budgets (overriding derived + /// defaults). + pub fn start_app_explicit( + &mut self, + buffer_capacity: usize, + config: ExplicitBudgetConfig, + ) -> TestResult { + let partial = WireframeApp::new()?.buffer_capacity(buffer_capacity); + let fragmentation = Self::fragmentation_from_app(&partial)?; + + let per_message = + NonZeroUsize::new(config.per_message).ok_or("per_message must be non-zero")?; + let per_connection = + NonZeroUsize::new(config.per_connection).ok_or("per_connection must be non-zero")?; + let in_flight = NonZeroUsize::new(config.in_flight).ok_or("in_flight must be non-zero")?; + let budgets = MemoryBudgets::new( + BudgetBytes::new(per_message), + BudgetBytes::new(per_connection), + BudgetBytes::new(in_flight), + ); + + let (handler, rx) = Self::build_handler(); + + let app = partial + .fragmentation(Some(fragmentation)) + .with_message_assembler(TestAssembler) + .memory_budgets(budgets) + .route(ROUTE_ID, handler)?; + + self.start_with_app(app, rx) + } + + fn start_with_app( + &mut self, + app: WireframeApp, + rx: mpsc::UnboundedReceiver>, + ) -> TestResult { + let codec = app.length_codec(); + let (client_stream, server_stream) = tokio::io::duplex(64 * 1024); + let client = Framed::new(client_stream, codec); + + self.block_on(async { tokio::time::pause() })?; + let server = self + .runtime()? + .spawn(async move { app.handle_connection_result(server_stream).await }); + + self.client = Some(client); + self.server = Some(server); + self.observed_rx = Some(rx); + self.observed_payloads.clear(); + self.last_send_error = None; + self.connection_error = None; + Ok(()) + } + + /// Send multiple non-final first frames for keys in a range. + /// + /// The first frame is expected to succeed; a failure there indicates a + /// genuine fixture problem and is propagated immediately. Subsequent + /// send failures are tolerated because budget enforcement may close the + /// connection mid-range — the caller observes the outcome via + /// `assert_connection_aborted`. + pub fn send_first_frames_for_range(&mut self, start: u64, end: u64, body: &str) -> TestResult { + for key in start..=end { + let result = self.send_first_frame(key, body); + if key == start { + result?; + } else if result.is_err() { + break; + } + self.spin_runtime()?; + } + Ok(()) + } + + /// Send a non-final first frame for the provided message key. + pub fn send_first_frame(&mut self, key: u64, body: &str) -> TestResult { + let payload = test_helpers::first_frame_payload(key, body.as_bytes(), false, None)?; + self.send_payload(payload) + } + + /// Send a final continuation frame for the provided message key. + pub fn send_final_continuation_frame( + &mut self, + key: u64, + sequence: u32, + body: &str, + ) -> TestResult { + let payload = + test_helpers::continuation_frame_payload(key, sequence, body.as_bytes(), true)?; + self.send_payload(payload) + } + + /// Assert that the connection has terminated with an error. + pub fn assert_connection_aborted(&mut self) -> TestResult { + self.spin_runtime()?; + self.drain_ready_payloads()?; + match self.join_server()? { + Ok(()) => Err("expected connection to abort, but it completed successfully".into()), + Err(error) => { + self.connection_error = Some(error.to_string()); + Ok(()) + } + } + } + + /// Assert that the expected payload is eventually observed. + pub fn assert_payload_received(&mut self, expected: &str) -> TestResult { + let expected = expected.as_bytes(); + for _ in 0..SPIN_ATTEMPTS { + self.drain_ready_payloads()?; + if self + .observed_payloads + .iter() + .any(|payload| payload.as_slice() == expected) + { + return Ok(()); + } + self.block_on(async { tokio::task::yield_now().await })?; + } + + Err(format!( + "expected payload {:?} not observed; observed={:?}", + expected, self.observed_payloads + ) + .into()) + } + + /// Assert that no connection error has occurred. + /// + /// Checks the server task directly rather than relying solely on the + /// cached `connection_error` field, which is only populated by + /// `assert_connection_aborted`. This prevents false negatives when the + /// server errors after processing frames in non-abort scenarios. + pub fn assert_no_connection_error(&mut self) -> TestResult { + if let Some(ref error) = self.connection_error { + return Err(format!("unexpected connection error: {error}").into()); + } + // Drop the client so the server sees EOF and can finish cleanly. + self.client.take(); + match self.join_server()? { + Ok(()) => Ok(()), + Err(error) => Err(format!("server task returned error: {error}").into()), + } + } + + fn send_payload(&mut self, payload: Vec) -> TestResult { + let envelope = Envelope::new(ROUTE_ID, CORRELATION_ID, payload); + let serializer = BincodeSerializer; + let frame = serializer.serialize(&envelope)?; + + let mut client = self.client.take().ok_or("client not initialized")?; + let send_result = self.block_on(async { + client.send(frame.into()).await?; + client.flush().await?; + Ok::<(), std::io::Error>(()) + }); + self.client = Some(client); + + match send_result { + Ok(Ok(())) => { + self.last_send_error = None; + Ok(()) + } + Ok(Err(error)) => { + self.last_send_error = Some(error.to_string()); + Err(error.into()) + } + Err(error) => { + self.last_send_error = Some(error.to_string()); + Err(error) + } + } + } + + /// Join the server task with a bounded timeout to prevent indefinite + /// hangs. Returns the inner `io::Result` on success, or a timeout error. + fn join_server(&mut self) -> TestResult> { + let server = self.server.take().ok_or("server not initialized")?; + let join_result = + self.block_on(async { tokio::time::timeout(SERVER_JOIN_TIMEOUT, server).await })?; + match join_result { + Ok(Ok(io_result)) => Ok(io_result), + Ok(Err(join_error)) => Err(format!("server task panicked: {join_error}").into()), + Err(_elapsed) => Err("server task did not complete within timeout".into()), + } + } + + fn spin_runtime(&self) -> TestResult { + self.block_on(async { + for _ in 0..8 { + tokio::task::yield_now().await; + } + })?; + Ok(()) + } + + fn drain_ready_payloads(&mut self) -> TestResult { + let mut observed_rx = self.observed_rx.take().ok_or("receiver not initialized")?; + while let Ok(payload) = observed_rx.try_recv() { + self.observed_payloads.push(payload); + } + self.observed_rx = Some(observed_rx); + Ok(()) + } +} diff --git a/tests/fixtures/mod.rs b/tests/fixtures/mod.rs index 101b8a1e..abfd25d9 100644 --- a/tests/fixtures/mod.rs +++ b/tests/fixtures/mod.rs @@ -14,6 +14,7 @@ pub mod codec_performance_benchmarks; pub mod codec_property_roundtrip; pub mod codec_stateful; pub mod correlation; +pub mod derived_memory_budgets; pub mod fragment; pub mod interleaved_push_queues; pub mod memory_budget_backpressure; diff --git a/tests/scenarios/derived_memory_budgets_scenarios.rs b/tests/scenarios/derived_memory_budgets_scenarios.rs new file mode 100644 index 00000000..60c0d290 --- /dev/null +++ b/tests/scenarios/derived_memory_budgets_scenarios.rs @@ -0,0 +1,35 @@ +//! Scenario test functions for derived memory budget defaults. + +use rstest_bdd_macros::scenario; + +use crate::fixtures::derived_memory_budgets::*; + +#[scenario( + path = "tests/features/derived_memory_budgets.feature", + name = "Derived budgets enforce per-connection limit" +)] +fn derived_budgets_enforce_per_connection_limit( + derived_memory_budgets_world: DerivedMemoryBudgetsWorld, +) { + drop(derived_memory_budgets_world); +} + +#[scenario( + path = "tests/features/derived_memory_budgets.feature", + name = "Derived budgets allow frames within limits" +)] +fn derived_budgets_allow_frames_within_limits( + derived_memory_budgets_world: DerivedMemoryBudgetsWorld, +) { + drop(derived_memory_budgets_world); +} + +#[scenario( + path = "tests/features/derived_memory_budgets.feature", + name = "Explicit budgets override derived defaults" +)] +fn explicit_budgets_override_derived_defaults( + derived_memory_budgets_world: DerivedMemoryBudgetsWorld, +) { + drop(derived_memory_budgets_world); +} diff --git a/tests/scenarios/mod.rs b/tests/scenarios/mod.rs index 18c8bbfa..6998c8bd 100644 --- a/tests/scenarios/mod.rs +++ b/tests/scenarios/mod.rs @@ -18,6 +18,7 @@ mod codec_performance_benchmarks_scenarios; mod codec_property_roundtrip_scenarios; mod codec_stateful_scenarios; mod correlation_scenarios; +mod derived_memory_budgets_scenarios; mod fragment_scenarios; mod interleaved_push_queues_scenarios; mod memory_budget_backpressure_scenarios; diff --git a/tests/steps/derived_memory_budgets_steps.rs b/tests/steps/derived_memory_budgets_steps.rs new file mode 100644 index 00000000..9f449c22 --- /dev/null +++ b/tests/steps/derived_memory_budgets_steps.rs @@ -0,0 +1,99 @@ +//! Step definitions for derived memory budget default scenarios. + +use rstest_bdd_macros::{given, then, when}; + +use crate::fixtures::derived_memory_budgets::{ + DerivedMemoryBudgetsWorld, + ExplicitBudgetConfig, + TestResult, +}; + +#[given("a derived-budget app with buffer capacity {capacity:u64}")] +fn given_derived_budget_app( + derived_memory_budgets_world: &mut DerivedMemoryBudgetsWorld, + capacity: u64, +) -> TestResult { + let capacity = usize::try_from(capacity)?; + derived_memory_budgets_world.start_app_derived(capacity) +} + +#[given("a derived-budget app with buffer capacity {capacity:u64} and explicit budgets {config}")] +fn given_derived_budget_app_explicit( + derived_memory_budgets_world: &mut DerivedMemoryBudgetsWorld, + capacity: u64, + config: ExplicitBudgetConfig, +) -> TestResult { + let capacity = usize::try_from(capacity)?; + derived_memory_budgets_world.start_app_explicit(capacity, config) +} + +#[when( + "derived-budget first frames for keys {start:u64} to {end:u64} each with body {body:string} \ + arrive" +)] +fn when_derived_budget_first_frames_for_range( + derived_memory_budgets_world: &mut DerivedMemoryBudgetsWorld, + start: u64, + end: u64, + body: String, +) -> TestResult { + derived_memory_budgets_world.send_first_frames_for_range(start, end, &body) +} + +#[when( + "derived-budget first frames for keys {start:u64} to {end:u64} each with body size {size:u64} \ + arrive" +)] +fn when_derived_budget_first_frames_for_range_sized( + derived_memory_budgets_world: &mut DerivedMemoryBudgetsWorld, + start: u64, + end: u64, + size: u64, +) -> TestResult { + let body = "a".repeat(usize::try_from(size)?); + derived_memory_budgets_world.send_first_frames_for_range(start, end, &body) +} + +#[when("a derived-budget first frame for key {key:u64} with body {body:string} arrives")] +fn when_derived_budget_first_frame( + derived_memory_budgets_world: &mut DerivedMemoryBudgetsWorld, + key: u64, + body: String, +) -> TestResult { + derived_memory_budgets_world.send_first_frame(key, &body) +} + +#[when( + "a derived-budget final continuation for key {key:u64} sequence {sequence:u32} with body \ + {body:string} arrives" +)] +fn when_derived_budget_final_continuation( + derived_memory_budgets_world: &mut DerivedMemoryBudgetsWorld, + key: u64, + sequence: u32, + body: String, +) -> TestResult { + derived_memory_budgets_world.send_final_continuation_frame(key, sequence, &body) +} + +#[then("the derived-budget connection terminates with an error")] +fn then_derived_budget_connection_terminates( + derived_memory_budgets_world: &mut DerivedMemoryBudgetsWorld, +) -> TestResult { + derived_memory_budgets_world.assert_connection_aborted() +} + +#[then("derived-budget payload {expected:string} is eventually received")] +fn then_derived_budget_payload_received( + derived_memory_budgets_world: &mut DerivedMemoryBudgetsWorld, + expected: String, +) -> TestResult { + derived_memory_budgets_world.assert_payload_received(&expected) +} + +#[then("no derived-budget connection error is recorded")] +fn then_no_derived_budget_connection_error( + derived_memory_budgets_world: &mut DerivedMemoryBudgetsWorld, +) -> TestResult { + derived_memory_budgets_world.assert_no_connection_error() +} diff --git a/tests/steps/mod.rs b/tests/steps/mod.rs index 4acbc725..e99bc492 100644 --- a/tests/steps/mod.rs +++ b/tests/steps/mod.rs @@ -14,6 +14,7 @@ mod codec_performance_benchmarks_steps; mod codec_property_roundtrip_steps; mod codec_stateful_steps; mod correlation_steps; +mod derived_memory_budgets_steps; mod fragment_steps; mod interleaved_push_queues_steps; mod memory_budget_backpressure_steps;