From 664806aba7626542a50bcb2bc36430d526656f93 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sat, 21 Feb 2026 19:24:35 +0000 Subject: [PATCH 1/5] feat(message_assembler): enforce per-message and aggregate memory budgets during assembly Implement runtime budget enforcement for message assembly covering bytes per message, bytes per connection, and bytes in flight. Frames exceeding configured budgets are rejected, partial assemblies freed, and errors surfaced via existing deserialization failure paths. - Add budget enforcement helpers in a new module `message_assembler/budget.rs`. - Extend `MessageAssemblyState` with optional connection and in-flight budget fields. - Introduce `with_budgets()` constructor to initialize budgets. - Wire budget checks into frame acceptance methods for first and continuation frames. - Add new error variants `ConnectionBudgetExceeded` and `InFlightBudgetExceeded`. - Update integration layer to pass `MemoryBudgets` to assembly state. - Include extensive unit and behavioral tests covering all budget enforcement scenarios. - Update user guide, design docs, and roadmap to reflect enforcement implementation. Closes roadmap item 8.3.2. Co-authored-by: devboxerhub[bot] --- ...ng-requests-and-shared-message-assembly.md | 19 + docs/execplans/8-3-2-budget-enforcement.md | 614 ++++++++++++++++++ docs/roadmap.md | 2 +- docs/users-guide.md | 11 +- src/app/frame_handling/assembly.rs | 23 +- src/app/inbound_handler.rs | 6 +- src/app/memory_budgets.rs | 4 +- src/message_assembler/budget.rs | 98 +++ src/message_assembler/budget_tests.rs | 424 ++++++++++++ src/message_assembler/error.rs | 22 + src/message_assembler/mod.rs | 1 + src/message_assembler/state.rs | 87 ++- src/message_assembler/tests.rs | 7 + tests/features/budget_enforcement.feature | 49 ++ tests/fixtures/budget_enforcement.rs | 258 ++++++++ tests/fixtures/mod.rs | 1 + .../scenarios/budget_enforcement_scenarios.rs | 57 ++ tests/scenarios/mod.rs | 1 + tests/steps/budget_enforcement_steps.rs | 156 +++++ tests/steps/mod.rs | 1 + 20 files changed, 1801 insertions(+), 40 deletions(-) create mode 100644 docs/execplans/8-3-2-budget-enforcement.md create mode 100644 src/message_assembler/budget.rs create mode 100644 src/message_assembler/budget_tests.rs create mode 100644 tests/features/budget_enforcement.feature create mode 100644 tests/fixtures/budget_enforcement.rs create mode 100644 tests/scenarios/budget_enforcement_scenarios.rs create mode 100644 tests/steps/budget_enforcement_steps.rs diff --git a/docs/adr-002-streaming-requests-and-shared-message-assembly.md b/docs/adr-002-streaming-requests-and-shared-message-assembly.md index 0f3522d0..44878658 100644 --- a/docs/adr-002-streaming-requests-and-shared-message-assembly.md +++ b/docs/adr-002-streaming-requests-and-shared-message-assembly.md @@ -296,6 +296,25 @@ Precedence is: back-pressure behaviour, and derived defaults remain in follow-on items `8.3.2` through `8.3.5`. +#### Implementation decisions (2026-02-21) + +- Roadmap item `8.3.2` implements hard-cap budget enforcement in + `MessageAssemblyState`. Budget tracking is embedded directly in the assembly + state rather than a separate wrapper, since the state already owns all + assembly lifecycle. +- A new `with_budgets()` constructor accepts optional `connection_budget` + and `in_flight_budget` limits. The integration layer (`assembly.rs`) threads + `MemoryBudgets` from `WireframeApp` and computes the effective per-message + limit as `min(max_message_size, bytes_per_message)`. +- Budget checks are applied after single-frame early return (single-frame + messages are never buffered and skip aggregate checks). +- On budget violation the offending partial assembly is freed and the error + is surfaced as `ConnectionBudgetExceeded` or `InFlightBudgetExceeded`, both + routed through the existing `DeserFailureTracker` as `InvalidData`. +- Budget enforcement helpers are extracted to `src/message_assembler/budget.rs` + to keep `state.rs` under the 400-line file limit. +- Back-pressure (`8.3.3`) and derived defaults (`8.3.5`) remain future work. + #### Budget enforcement - Budgets MUST cover: bytes buffered per message, bytes buffered per diff --git a/docs/execplans/8-3-2-budget-enforcement.md b/docs/execplans/8-3-2-budget-enforcement.md new file mode 100644 index 00000000..363fef90 --- /dev/null +++ b/docs/execplans/8-3-2-budget-enforcement.md @@ -0,0 +1,614 @@ +# Implement budget enforcement for message assembly (8.3.2) + +This ExecPlan is a living document. The sections `Constraints`, `Tolerances`, +`Risks`, `Progress`, `Surprises & Discoveries`, `Decision Log`, and +`Outcomes & Retrospective` must be kept up to date as work proceeds. + +Status: COMPLETE + +No `PLANS.md` exists in this repository as of 2026-02-21. + +## Purpose / big picture + +Roadmap item `8.3.2` adds runtime enforcement of the three memory budget +dimensions introduced by `8.3.1` (configuration-only). After this change, when +a library consumer configures `MemoryBudgets` on `WireframeApp`, the message +assembly subsystem will actively enforce: + +- **bytes per message** — no single assembled message may exceed the configured + cap; +- **bytes per connection** — the sum of all bytes buffered across every + in-flight assembly on one connection may not exceed the configured cap; and +- **bytes in flight** — a second aggregate guard on buffered assembly bytes, + today equivalent to the per-connection cap but expressed as a separate + dimension so future work (streaming body buffers, transport-layer buffering) + can differentiate them. + +When a budget is exceeded the framework aborts the offending assembly, frees +its partial buffers, and surfaces `std::io::ErrorKind::InvalidData` through the +existing deserialization-failure policy. No handler is invoked for the aborted +message. Other in-flight assemblies on the same connection are unaffected. + +Success is observable when: + +- Unit tests (`rstest`) prove that each budget dimension is enforced, that + partial buffers are freed on violation, and that completed or purged + assemblies reclaim budget headroom. +- Behavioural tests (`rstest-bdd` v0.5.0) prove the enforcement behaviour in + scenario form. +- `docs/users-guide.md` documents that configured budgets are now enforced. +- `docs/roadmap.md` marks `8.3.2` as done after all quality gates pass. +- Design decisions are recorded in the relevant design document. + +## Constraints + +- Scope is strictly roadmap item `8.3.2`: enforcement of the three budget + dimensions during message assembly. Soft-limit back-pressure (`8.3.3`), + hard-cap abort-early semantics (`8.3.4`), and derived defaults (`8.3.5`) are + out of scope. +- Preserve current runtime behaviour when `MemoryBudgets` is not configured. + The existing `max_message_size` guard derived from fragmentation config + remains the sole per-message limit when budgets are absent. +- When both `max_message_size` (from fragmentation) and `bytes_per_message` + (from `MemoryBudgets`) are present, use the minimum of the two so neither + guard is silently overridden (per ADR 0002 guidance on compatible values). +- Single-frame messages (`is_last: true` on first frame) are returned + immediately and never buffered. They must pass the per-message size check but + must not count against connection or in-flight budgets because they are never + "in flight" in the assembly sense. +- Budget violations must abort the assembly for the offending message key, + free its partial buffers, and surface `InvalidData` through the existing + `DeserFailureTracker` path, consistent with ADR 0002 section "Failure modes + and cleanup semantics". +- Do not add new external dependencies. +- Keep source modules under the 400-line project guidance. +- Validate with `rstest` unit tests and `rstest-bdd` behavioural tests. +- Record the enforcement design in the ADR 0002 implementation decisions. + +## Tolerances (exception triggers) + +- Scope: if this work requires touching more than 15 files or exceeds 600 net + lines of code, stop and escalate. +- Interface: if implementing enforcement requires changing any existing public + API signatures (other than adding new constructors or methods), stop and + escalate. +- Dependencies: if any new crate is required, stop and escalate. +- Ambiguity: if the distinction between `bytes_per_connection` and + `bytes_in_flight` requires different accounting than the sum of buffered + assembly bytes, stop and present options with trade-offs. +- Iterations: if the same failing test or lint issue persists after 3 focused + attempts, stop and escalate. + +## Risks + +- Risk: `state.rs` (365 lines) and `state_tests.rs` (384 lines) are both + near the 400-line limit. Adding enforcement logic and tests may exceed it. + Severity: medium. Likelihood: high. Mitigation: extract budget-checking + functions into a new `src/message_assembler/budget.rs` module and budget + tests into `src/message_assembler/budget_tests.rs`. + +- Risk: `total_buffered_bytes()` is O(n) over all in-flight assemblies per + accepted frame. Severity: low. Likelihood: low. Mitigation: acceptable for + 8.3.2 because concurrent assemblies per connection are typically in single + digits. A cached running total can be added in a future optimisation pass if + profiling warrants it. + +- Risk: the conceptual overlap between `bytes_per_connection` and + `bytes_in_flight` may confuse consumers. Severity: low. Likelihood: medium. + Mitigation: keep them as separate checks with separate error variants and + document the distinction clearly. Today they track the same accounting; + future items may diverge when streaming body buffers are introduced. + +- Risk: BDD step definitions may interact poorly with `Instant` for + timeout-purge scenarios. Severity: low. Likelihood: low. Mitigation: use the + existing `_at` method variants that accept an explicit `Instant`, matching + the pattern in `state_tests.rs`. + +## Progress + +- [x] (2026-02-21) Drafted ExecPlan for roadmap item `8.3.2`. +- [x] Stage A: add budget enforcement to `MessageAssemblyState`. +- [x] Stage B: wire budgets through the integration layer. +- [x] Stage C: add unit tests (`rstest`) — 20 tests covering all budget + dimensions. +- [x] Stage D: add behavioural tests (`rstest-bdd`) — 6 scenarios. +- [x] Stage E: documentation, roadmap, and quality gates. + +## Surprises & discoveries + +- `state.rs` hit 429 lines after adding budget fields and wiring. Extracted + `check_size_limit` to `budget.rs` as planned by the risk mitigation. +- Borrow checker conflict in `accept_continuation_frame_at`: calling + `self.total_buffered_bytes()` while holding a mutable `entry` borrow. + Resolved with a snapshot pattern — read all immutable state into locals + before the `self.assemblies.entry(key)` call. +- Clippy `too_many_arguments` triggered on `check_aggregate_budgets` (5 args). + Introduced `AggregateBudgets` struct to bundle the two optional limits. +- BDD step text "the buffered count is" collided with an existing step in + `message_assembly_steps.rs`. Renamed to "the active assembly count is" to + avoid ambiguity. + +## Decision log + +- Decision: add budget tracking directly to `MessageAssemblyState` rather + than creating a separate `BudgetEnforcer` wrapper. Rationale: + `MessageAssemblyState` already owns the assembly map and is the sole + authority over buffer lifecycle. A wrapper would duplicate or proxy all + state-mutation methods with no clear separation-of-concerns benefit. + Date/Author: 2026-02-21 / DevBoxer. + +- Decision: use the minimum of `max_message_size` and `bytes_per_message` + when both are present, rather than letting one override the other. Rationale: + ADR 0002 states "the effective message cap is whichever guard triggers first" + and recommends operators set compatible values. Using the minimum enforces + this automatically. Date/Author: 2026-02-21 / DevBoxer. + +- Decision: keep `bytes_per_connection` and `bytes_in_flight` as separate + checks with separate error variants even though they currently track the same + sum. Rationale: the ADR names them as distinct budget dimensions. The cost of + two comparisons per frame is negligible, and divergence is expected in future + items. Date/Author: 2026-02-21 / DevBoxer. + +- Decision: extract budget-checking functions into + `src/message_assembler/budget.rs` to keep `state.rs` under 400 lines and give + enforcement a clear module boundary for future 8.3.3/8.3.4 work. Date/Author: + 2026-02-21 / DevBoxer. + +- Decision: account for both `body_buffer.len()` and `metadata.len()` in + `buffered_bytes()` because metadata is also heap-allocated and contributes to + memory pressure. The existing `accumulated_len()` (body only) continues to + serve the per-message size check. Date/Author: 2026-02-21 / DevBoxer. + +## Outcomes & retrospective + +Implementation complete. All quality gates pass. + +**Files created:** + +- `src/message_assembler/budget.rs` — budget enforcement helpers + (`AggregateBudgets`, `check_aggregate_budgets`, `check_size_limit`) +- `src/message_assembler/budget_tests.rs` — 20 unit tests +- `tests/features/budget_enforcement.feature` — 6 BDD scenarios +- `tests/fixtures/budget_enforcement.rs` — `BudgetEnforcementWorld` +- `tests/steps/budget_enforcement_steps.rs` — step definitions +- `tests/scenarios/budget_enforcement_scenarios.rs` — scenario bindings + +**Files modified:** + +- `src/message_assembler/mod.rs` — registered `budget` module +- `src/message_assembler/state.rs` — added `AggregateBudgets` field, + `with_budgets()` constructor, `total_buffered_bytes()`, wired budget checks + (398 lines, within limit) +- `src/message_assembler/error.rs` — added `ConnectionBudgetExceeded` and + `InFlightBudgetExceeded` variants +- `src/message_assembler/tests.rs` — registered `budget_tests` module +- `src/app/frame_handling/assembly.rs` — added `Option` param, + effective per-message limit computation, budget wiring +- `src/app/connection.rs` — passes `self.memory_budgets` to assembly state +- `src/app/memory_budgets.rs` — updated doc comment +- `docs/roadmap.md` — marked 8.3.2 done +- `docs/users-guide.md` — documented enforcement semantics +- `docs/adr-002-streaming-requests-and-shared-message-assembly.md` — added + implementation decisions +- `tests/fixtures/mod.rs`, `tests/steps/mod.rs`, `tests/scenarios/mod.rs` — + registered budget enforcement modules + +## Context and orientation + +### Existing configuration surface (8.3.1 — complete) + +`src/app/memory_budgets.rs` defines `MemoryBudgets` and `BudgetBytes`. The type +is stored as `memory_budgets: Option` on `WireframeApp` (in +`src/app/builder/core.rs`) and set via the +`WireframeApp::memory_budgets(budgets)` builder method (in +`src/app/builder/config.rs`). The three accessors are: + +- `bytes_per_message() -> BudgetBytes` +- `bytes_per_connection() -> BudgetBytes` +- `bytes_in_flight() -> BudgetBytes` + +Each wraps a `NonZeroUsize`. + +### Message assembly subsystem (8.2 — complete) + +`src/message_assembler/state.rs` defines `MessageAssemblyState`, which manages +concurrent in-flight assemblies keyed by `MessageKey`. Key methods: + +- `new(max_message_size: NonZeroUsize, timeout: Duration)` — constructor. +- `accept_first_frame(input: FirstFrameInput)` — starts a new assembly; + checks `max_message_size` against declared total body length. +- `accept_first_frame_at(input, now: Instant)` — explicit-clock variant. +- `accept_continuation_frame(header, body)` — continues an assembly; checks + accumulated size via `check_size_limit()`. +- `accept_continuation_frame_at(header, body, now)` — explicit-clock variant. +- `purge_expired()` / `purge_expired_at(now)` — evicts stale assemblies. +- `buffered_count()` — number of in-flight assemblies. + +Internal `PartialAssembly` tracks `series`, `routing`, `metadata`, +`body_buffer`, and `started_at`. The method `accumulated_len()` returns +`body_buffer.len()` (body bytes only). + +`src/message_assembler/error.rs` defines `MessageAssemblyError` with variants +`Series(MessageSeriesError)`, `DuplicateFirstFrame { key }`, and +`MessageTooLarge { key, attempted, limit }`. + +### Integration layer (8.2.5 — complete) + +`src/app/frame_handling/assembly.rs` provides: + +- `new_message_assembly_state(fragmentation, frame_budget)` — builds a + `MessageAssemblyState` from fragmentation config or frame budget defaults. +- `assemble_if_needed(runtime, deser_failures, env, max_deser_failures)` — + applies message assembly to a complete post-fragment envelope, routing errors + through `DeserFailureTracker`. + +The call site is in `src/app/connection.rs` line 257: + + frame_handling::new_message_assembly_state(self.fragmentation, requested_frame_length) + +### Testing topology + +- Unit tests: `rstest` in crate-local modules. Budget-related state tests + currently live in `src/message_assembler/state_tests.rs` (384 lines). +- Behavioural tests: `rstest-bdd` fixtures and scenarios under + `tests/features/`, `tests/fixtures/`, `tests/steps/`, and `tests/scenarios/`. + The 8.3.1 configuration scenarios are in + `tests/features/memory_budgets.feature`. +- Feature files use Gherkin syntax. Fixtures provide a `World` struct with + helper methods. Steps map Gherkin phrases to Rust functions via procedural + macros. Scenarios bind feature + fixture via `#[scenario]`. + +### Relevant design documents + +- `docs/adr-002-streaming-requests-and-shared-message-assembly.md` — sections + on budget enforcement and failure modes. +- `docs/generic-message-fragmentation-and-re-assembly-design.md` — section + 9.3 on memory budget integration. +- `docs/hardening-wireframe-a-guide-to-production-resilience.md` — resource + cap guidance. + +## Plan of work + +### Stage A: add budget enforcement to `MessageAssemblyState` + +Create a new module `src/message_assembler/budget.rs` containing the budget +enforcement helpers. This module houses: + +1. A `buffered_bytes()` method on `PartialAssembly` returning + `body_buffer.len() + metadata.len()`. + +2. A `total_buffered_bytes()` free function (or method) that sums + `buffered_bytes()` across all assemblies. + +3. A `check_aggregate_budgets()` function that takes the current total, the + additional bytes about to be accepted, the message key, and optional + `connection_budget` and `in_flight_budget` limits. It returns + `Result<(), MessageAssemblyError>`. + +Then modify `src/message_assembler/state.rs`: + +1. Add two optional fields to `MessageAssemblyState`: + `connection_budget: Option` and + `in_flight_budget: Option`. + +2. Add a new constructor `with_budgets(max_message_size, timeout, + connection_budget, + in_flight_budget)`. Refactor the existing `new()` to delegate to ` + with_budgets` with `None` for both budget fields. + +3. Add a public `total_buffered_bytes(&self) -> usize` query method for + diagnostics and testing. + +4. Wire `check_aggregate_budgets()` into `accept_first_frame_at()` — after + the existing `MessageTooLarge` check, before inserting the partial assembly + — using `input.body.len() + input.metadata.len()` as the incoming bytes. + +5. Wire `check_aggregate_budgets()` into `accept_continuation_frame_at()` — + after the existing `check_size_limit` call, before `push_body` — using + `body.len()` as the incoming bytes. On budget violation, call + `entry.remove()` to free partial buffers before returning the error. + +Modify `src/message_assembler/error.rs` to add two new variants to +`MessageAssemblyError`: + +- `ConnectionBudgetExceeded { key, attempted, limit }` +- `InFlightBudgetExceeded { key, attempted, limit }` + +Register the new module in `src/message_assembler/mod.rs`. + +Go/no-go: if `state.rs` exceeds 400 lines after changes, extract more logic to +`budget.rs` before proceeding. + +### Stage B: wire budgets through the integration layer + +Modify `src/app/frame_handling/assembly.rs`: + +1. Add `memory_budgets: Option` as a third parameter to + `new_message_assembly_state`. + +2. Inside the function, after computing `max_message_size` from fragmentation + config, compute the effective per-message limit as the minimum of + `max_message_size` and `budgets.bytes_per_message().get()` when budgets are + configured. + +3. Extract `connection_budget` and `in_flight_budget` from the budgets and + pass all three to `MessageAssemblyState::with_budgets()`. + +4. Add the `MemoryBudgets` import. + +Modify `src/app/connection.rs`: + +1. Pass `self.memory_budgets` as the third argument at the call site + (line 257). `MemoryBudgets` is `Copy`, so no ownership issues. + +Go/no-go: `make lint` and `make test` must pass before proceeding. + +### Stage C: add unit tests (`rstest`) + +Create `src/message_assembler/budget_tests.rs` to keep `state_tests.rs` under +400 lines. Register it in `src/message_assembler/tests.rs` with +`#[path = "budget_tests.rs"] mod budget_tests;`. + +Tests to add (all using `rstest` fixtures): + +**`total_buffered_bytes` accounting:** + +- `state_reports_zero_buffered_bytes_when_empty` — fresh state returns 0. +- `state_reports_buffered_bytes_for_single_assembly` — after accepting one + multi-frame first frame, total equals body + metadata length. +- `state_reports_buffered_bytes_for_multiple_assemblies` — after starting two + assemblies, total is their sum. +- `state_reduces_buffered_bytes_after_completion` — completing an assembly + reduces the total. +- `state_reduces_buffered_bytes_after_timeout_purge` — purging an expired + assembly reduces the total. + +**Connection budget enforcement:** + +- `state_rejects_first_frame_exceeding_connection_budget` — single message + whose body + metadata exceeds `connection_budget`. +- `state_rejects_continuation_exceeding_connection_budget` — second assembly + pushes aggregate over `connection_budget`. +- `state_allows_frame_within_connection_budget` — multiple assemblies fitting + within budget. +- `state_frees_partial_assembly_on_connection_budget_violation` — + `buffered_count()` drops after rejection. + +**In-flight budget enforcement:** + +- `state_rejects_first_frame_exceeding_in_flight_budget` — analogous to + connection budget tests. +- `state_rejects_continuation_exceeding_in_flight_budget` — analogous. +- `state_frees_partial_assembly_on_in_flight_budget_violation` — analogous. + +**Per-message budget override:** + +- `state_uses_minimum_of_max_message_size_and_budget` — construct with + `max_message_size=1024` and `bytes_per_message=512`; verify 512 is the + effective limit. + +**Budget interaction with existing behaviour:** + +- `state_budget_not_enforced_when_none` — default constructor allows any + amount (backward compatibility). +- `state_budget_violation_does_not_affect_other_assemblies` — key 1 exceeds + budget; key 2 continues unaffected. +- `state_timeout_purge_reclaims_budget_headroom` — after purge, + previously-blocked frame now fits. + +Go/no-go: all new and existing unit tests pass. + +### Stage D: add behavioural tests (`rstest-bdd`) + +Create `tests/features/budget_enforcement.feature` with scenarios covering the +observable enforcement behaviours: + +1. Accept frames within all budget limits. +2. Reject frame exceeding per-message budget. +3. Reject continuation exceeding connection budget; verify partial freed. +4. Reject continuation exceeding in-flight budget; verify partial freed. +5. Reclaim budget headroom after assembly completes. +6. Reclaim budget headroom after timeout purge. + +Create supporting files: + +- `tests/fixtures/budget_enforcement.rs` — `BudgetEnforcementWorld` fixture. +- `tests/steps/budget_enforcement_steps.rs` — step definitions. +- `tests/scenarios/budget_enforcement_scenarios.rs` — `#[scenario]` bindings. + +Register the new files in `tests/fixtures/mod.rs`, `tests/steps/mod.rs`, and +`tests/scenarios/mod.rs`. + +Go/no-go: all BDD scenarios pass via `make test`. + +### Stage E: documentation, roadmap, and quality gates + +1. Update `src/app/memory_budgets.rs` module doc comment. +2. Update `docs/users-guide.md` "Per-connection memory budgets" section. +3. Update `docs/adr-002-streaming-requests-and-shared-message-assembly.md`. +4. Mark `8.3.2` as done in `docs/roadmap.md`. +5. Run full quality gates. + +Go/no-go: do not mark roadmap done until all gates pass. + +## Concrete steps + +Run from repository root (`/home/user/project`). + +1. Create `src/message_assembler/budget.rs` with budget enforcement helpers. + +2. Modify `src/message_assembler/mod.rs` to register the new module. + +3. Add error variants to `src/message_assembler/error.rs`. + +4. Modify `src/message_assembler/state.rs`: add fields, `with_budgets()` + constructor, `total_buffered_bytes()` query, and wire budget checks into + `accept_first_frame_at()` and `accept_continuation_frame_at()`. + +5. Verify state module compiles and existing tests pass: + + set -o pipefail + cargo test --lib message_assembler 2>&1 | tee /tmp/wireframe-8-3-2-unit-a.log + + Expected: all existing `state_tests` and `series_tests` pass. + +6. Modify `src/app/frame_handling/assembly.rs`: update + `new_message_assembly_state` to accept `Option` and thread + budget values into `MessageAssemblyState::with_budgets()`. + +7. Modify `src/app/connection.rs` line 257: pass `self.memory_budgets`. + +8. Verify integration compiles and existing tests pass: + + set -o pipefail + make lint 2>&1 | tee /tmp/wireframe-8-3-2-lint-b.log + make test 2>&1 | tee /tmp/wireframe-8-3-2-test-b.log + + Expected: no regressions. + +9. Create `src/message_assembler/budget_tests.rs` with the unit tests + described in Stage C. Register in `src/message_assembler/tests.rs`. + +10. Run unit tests: + + set -o pipefail + cargo test --lib message_assembler 2>&1 | tee /tmp/wireframe-8-3-2-unit-c.log + + Expected: all new budget tests pass alongside existing tests. + +11. Create the four BDD test files described in Stage D. Register in + `tests/fixtures/mod.rs`, `tests/steps/mod.rs`, `tests/scenarios/mod.rs`. + +12. Run BDD tests: + + set -o pipefail + cargo test --test bdd budget_enforcement 2>&1 | tee /tmp/wireframe-8-3-2-bdd.log + + Expected: all new scenarios pass. + +13. Update documentation files as described in Stage E. + +14. Run full quality gates: + + set -o pipefail + make fmt 2>&1 | tee /tmp/wireframe-8-3-2-fmt.log + make markdownlint 2>&1 | tee /tmp/wireframe-8-3-2-markdownlint.log + make check-fmt 2>&1 | tee /tmp/wireframe-8-3-2-check-fmt.log + make lint 2>&1 | tee /tmp/wireframe-8-3-2-lint.log + make test 2>&1 | tee /tmp/wireframe-8-3-2-test.log + make nixie 2>&1 | tee /tmp/wireframe-8-3-2-nixie.log + + Expected: all pass with zero warnings. + +15. If any gate fails, inspect the corresponding log, fix the issue, and + rerun that command before continuing. + +## Validation and acceptance + +Acceptance criteria: + +- Budget enforcement: unit tests prove each dimension is enforced, partial + buffers are freed on violation, and completed or purged assemblies reclaim + headroom. +- Backward compatibility: all existing tests pass unchanged when + `MemoryBudgets` is not configured. +- Behavioural coverage: `rstest-bdd` scenarios pass for enforcement behaviour. +- Documentation: design decisions recorded in ADR 0002; user guide updated; + roadmap `8.3.2` marked done. + +Quality criteria: + +- Tests: all existing and new unit + behavioural tests pass via `make test`. +- Lint: `make lint` passes with no warnings. +- Formatting: `make fmt` and `make check-fmt` pass. +- Markdown: `make markdownlint` passes. +- Mermaid: `make nixie` passes. + +## Idempotence and recovery + +All edits are additive and safe to rerun. If an intermediate step fails: + +- Keep local changes. +- Fix the specific failure. +- Rerun only the failed command, then continue the pipeline. + +Avoid destructive git commands. If rollback is needed, revert only files +changed for this roadmap item. + +## Artefacts and notes + +Expected artefacts after completion: + +- New: `src/message_assembler/budget.rs` (budget enforcement helpers). +- New: `src/message_assembler/budget_tests.rs` (budget enforcement unit + tests). +- New: `tests/features/budget_enforcement.feature`. +- New: `tests/fixtures/budget_enforcement.rs`. +- New: `tests/steps/budget_enforcement_steps.rs`. +- New: `tests/scenarios/budget_enforcement_scenarios.rs`. +- Modified: `src/message_assembler/mod.rs`, `state.rs`, `error.rs`, + `tests.rs`. +- Modified: `src/app/frame_handling/assembly.rs`, `src/app/connection.rs`. +- Modified: `src/app/memory_budgets.rs` (doc comment update). +- Modified: `tests/fixtures/mod.rs`, `tests/steps/mod.rs`, + `tests/scenarios/mod.rs`. +- Modified: `docs/adr-002-streaming-requests-and-shared-message-assembly.md`, + `docs/users-guide.md`, `docs/roadmap.md`. +- Gate logs: `/tmp/wireframe-8-3-2-*.log`. + +## Interfaces and dependencies + +At the end of this milestone, the following interfaces must exist. + +In `src/message_assembler/error.rs`: + + pub enum MessageAssemblyError { + // … existing variants … + + /// Accepting this frame would exceed the per-connection buffering budget. + #[error("connection budget exceeded for key {key}: {attempted} bytes > {limit} bytes")] + ConnectionBudgetExceeded { + key: MessageKey, + attempted: usize, + limit: NonZeroUsize, + }, + + /// Accepting this frame would exceed the in-flight assembly byte budget. + #[error("in-flight budget exceeded for key {key}: {attempted} bytes > {limit} bytes")] + InFlightBudgetExceeded { + key: MessageKey, + attempted: usize, + limit: NonZeroUsize, + }, + } + +In `src/message_assembler/state.rs`: + + impl MessageAssemblyState { + /// Create a new assembly state manager with optional aggregate budgets. + #[must_use] + pub fn with_budgets( + max_message_size: NonZeroUsize, + timeout: Duration, + connection_budget: Option, + in_flight_budget: Option, + ) -> Self { … } + + /// Total bytes buffered across all in-flight assemblies. + #[must_use] + pub fn total_buffered_bytes(&self) -> usize { … } + } + +In `src/app/frame_handling/assembly.rs`: + + pub(crate) fn new_message_assembly_state( + fragmentation: Option, + frame_budget: usize, + memory_budgets: Option, + ) -> MessageAssemblyState { … } + +Dependencies: + +- No new crates. +- Testing remains on `rstest` and `rstest-bdd` v0.5.0 already present in + `Cargo.toml`. diff --git a/docs/roadmap.md b/docs/roadmap.md index de385954..52752e73 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -292,7 +292,7 @@ and standardized per-connection memory budgets. ### 8.3. Per-connection memory budgets - [x] 8.3.1. Add `WireframeApp::memory_budgets(...)` builder method. -- [ ] 8.3.2. Implement budget enforcement covering bytes per message, bytes +- [x] 8.3.2. Implement budget enforcement covering bytes per message, bytes per connection, and bytes across in-flight assemblies. - [ ] 8.3.3. Implement soft limit (back-pressure by pausing reads) behaviour. - [ ] 8.3.4. Implement hard cap (abort early, release partial state, surface diff --git a/docs/users-guide.md b/docs/users-guide.md index 2742bc2e..c3903c1a 100644 --- a/docs/users-guide.md +++ b/docs/users-guide.md @@ -567,9 +567,14 @@ let _app = WireframeApp::new() .read_timeout_ms(250); ``` -This builder method establishes configuration for the current `WireframeApp` -instance. Runtime enforcement and derived defaults are implemented in the -subsequent roadmap items covering memory budget enforcement behaviour. +When budgets are configured, the message assembly subsystem enforces them at +frame acceptance time. Frames that would cause the total buffered bytes to +exceed the per-connection or in-flight budget are rejected, the offending +partial assembly is freed, and the failure is surfaced through the existing +deserialization-failure policy (`InvalidData`). The effective per-message limit +is the minimum of the fragmentation `max_message_size` and the configured +`bytes_per_message`. Single-frame messages that complete immediately are never +counted against aggregate budgets since they do not buffer. #### Message key multiplexing (8.2.3) diff --git a/src/app/frame_handling/assembly.rs b/src/app/frame_handling/assembly.rs index 18222d3a..f7edfd91 100644 --- a/src/app/frame_handling/assembly.rs +++ b/src/app/frame_handling/assembly.rs @@ -6,7 +6,7 @@ use log::debug; use super::core::DeserFailureTracker; use crate::{ - app::{Envelope, builder_defaults::default_fragmentation}, + app::{Envelope, builder_defaults::default_fragmentation, memory_budgets::MemoryBudgets}, codec::clamp_frame_length, fragment::FragmentationConfig, message_assembler::{ @@ -42,13 +42,18 @@ impl<'a> AssemblyRuntime<'a> { } /// Build a connection-scoped message assembly state from known budgets. +/// +/// When `memory_budgets` is `Some`, the effective per-message limit is +/// `min(fragmentation_max, bytes_per_message)` and the connection/in-flight +/// budgets are forwarded to [`MessageAssemblyState::with_budgets`]. #[must_use] pub(crate) fn new_message_assembly_state( fragmentation: Option, frame_budget: usize, + memory_budgets: Option, ) -> MessageAssemblyState { let config = fragmentation.or_else(|| default_fragmentation(frame_budget)); - let max_message_size = config.map_or_else( + let frag_max = config.map_or_else( || NonZeroUsize::new(clamp_frame_length(frame_budget)).unwrap_or(NonZeroUsize::MIN), |cfg| cfg.max_message_size, ); @@ -56,7 +61,19 @@ pub(crate) fn new_message_assembly_state( cfg.reassembly_timeout }); - MessageAssemblyState::new(max_message_size, timeout) + match memory_budgets { + Some(budgets) => { + let per_message = budgets.bytes_per_message().get(); + let max_message_size = frag_max.min(per_message); + MessageAssemblyState::with_budgets( + max_message_size, + timeout, + Some(budgets.bytes_per_connection().get()), + Some(budgets.bytes_in_flight().get()), + ) + } + None => MessageAssemblyState::new(frag_max, timeout), + } } /// Purge stale in-flight assemblies. diff --git a/src/app/inbound_handler.rs b/src/app/inbound_handler.rs index 2952cf0e..eefd9b2e 100644 --- a/src/app/inbound_handler.rs +++ b/src/app/inbound_handler.rs @@ -254,7 +254,11 @@ where framed.read_buffer_mut().reserve(max_frame_length); let mut deser_failures = 0u32; let mut message_assembly = self.message_assembler.as_ref().map(|_| { - frame_handling::new_message_assembly_state(self.fragmentation, requested_frame_length) + frame_handling::new_message_assembly_state( + self.fragmentation, + requested_frame_length, + self.memory_budgets, + ) }); let mut pipeline = FramePipeline::new(self.fragmentation); let timeout_dur = Duration::from_millis(self.read_timeout_ms); diff --git a/src/app/memory_budgets.rs b/src/app/memory_budgets.rs index 4adc3812..8671612f 100644 --- a/src/app/memory_budgets.rs +++ b/src/app/memory_budgets.rs @@ -2,8 +2,8 @@ //! //! `MemoryBudgets` stores explicit byte caps configured on //! [`crate::app::WireframeApp`] via `WireframeApp::memory_budgets(...)`. -//! This module only exposes configuration; enforcement lands in a future -//! iteration. +//! Enforcement is wired through `MessageAssemblyState::with_budgets` so +//! that frames exceeding any budget dimension are rejected at assembly time. use std::num::NonZeroUsize; diff --git a/src/message_assembler/budget.rs b/src/message_assembler/budget.rs new file mode 100644 index 00000000..eff94abe --- /dev/null +++ b/src/message_assembler/budget.rs @@ -0,0 +1,98 @@ +//! Budget enforcement helpers for message assembly. +//! +//! This module provides aggregate budget checks applied during frame +//! acceptance. Per-message size limits are handled by the existing +//! `check_size_limit` function in [`super::state`]; this module adds +//! per-connection and in-flight aggregate budget enforcement. + +use std::num::NonZeroUsize; + +use super::{MessageKey, error::MessageAssemblyError}; + +/// Paired connection and in-flight budget limits. +/// +/// Bundled into a struct so call-sites pass a single value instead of two +/// `Option` parameters. +#[derive(Clone, Copy, Debug)] +pub(super) struct AggregateBudgets { + pub(super) connection: Option, + pub(super) in_flight: Option, +} + +/// Check whether accepting `additional_bytes` for `key` would exceed +/// the connection budget or in-flight budget. +/// +/// Both budgets are checked against the same `current_total` because, +/// at this layer, all buffered bytes are assembly bytes. The dimensions +/// are kept separate so future work (streaming body buffers, transport +/// buffering) can diverge them. +/// +/// # Errors +/// +/// Returns [`MessageAssemblyError::ConnectionBudgetExceeded`] or +/// [`MessageAssemblyError::InFlightBudgetExceeded`] when the respective +/// limit would be exceeded. +pub(super) fn check_aggregate_budgets( + key: MessageKey, + current_total: usize, + additional_bytes: usize, + budgets: &AggregateBudgets, +) -> Result<(), MessageAssemblyError> { + let new_total = current_total.saturating_add(additional_bytes); + + if let Some(limit) = budgets.connection + && new_total > limit.get() + { + return Err(MessageAssemblyError::ConnectionBudgetExceeded { + key, + attempted: new_total, + limit, + }); + } + + if let Some(limit) = budgets.in_flight + && new_total > limit.get() + { + return Err(MessageAssemblyError::InFlightBudgetExceeded { + key, + attempted: new_total, + limit, + }); + } + + Ok(()) +} + +/// Check if accumulated size plus new body would exceed the per-message +/// size limit. +/// +/// Returns the new total size on success. +/// +/// # Errors +/// +/// Returns [`MessageAssemblyError::MessageTooLarge`] when the new total +/// would exceed `max_message_size`. +pub(super) fn check_size_limit( + max_message_size: NonZeroUsize, + key: MessageKey, + accumulated: usize, + body_len: usize, +) -> Result { + let Some(new_len) = accumulated.checked_add(body_len) else { + return Err(MessageAssemblyError::MessageTooLarge { + key, + attempted: usize::MAX, + limit: max_message_size, + }); + }; + + if new_len > max_message_size.get() { + return Err(MessageAssemblyError::MessageTooLarge { + key, + attempted: new_len, + limit: max_message_size, + }); + } + + Ok(new_len) +} diff --git a/src/message_assembler/budget_tests.rs b/src/message_assembler/budget_tests.rs new file mode 100644 index 00000000..71f71827 --- /dev/null +++ b/src/message_assembler/budget_tests.rs @@ -0,0 +1,424 @@ +//! Unit tests for aggregate budget enforcement (8.3.2). + +use std::{ + num::NonZeroUsize, + time::{Duration, Instant}, +}; + +use rstest::{fixture, rstest}; + +use crate::message_assembler::{ + EnvelopeRouting, + FirstFrameInput, + MessageAssemblyError, + MessageAssemblyState, + MessageKey, + test_helpers::{continuation_header, first_header}, +}; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/// Non-zero shorthand. +fn nz(val: usize) -> NonZeroUsize { NonZeroUsize::new(val).expect("non-zero") } + +/// Submit a first frame with `body_len` bytes of body data. +fn submit_first( + state: &mut MessageAssemblyState, + key: u64, + body: &[u8], + is_last: bool, +) -> Result, MessageAssemblyError> { + let header = first_header(key, body.len(), is_last); + let input = + FirstFrameInput::new(&header, EnvelopeRouting::default(), vec![], body).expect("valid"); + state.accept_first_frame(input) +} + +/// Submit a first frame at a specific timestamp. +fn submit_first_at( + state: &mut MessageAssemblyState, + key: u64, + body: &[u8], + now: Instant, +) -> Result, MessageAssemblyError> { + let header = first_header(key, body.len(), false); + let input = + FirstFrameInput::new(&header, EnvelopeRouting::default(), vec![], body).expect("valid"); + state.accept_first_frame_at(input, now) +} + +// --------------------------------------------------------------------------- +// Fixtures +// --------------------------------------------------------------------------- + +/// State with no budgets — backwards-compatibility baseline. +#[fixture] +fn unbounded_state() -> MessageAssemblyState { + MessageAssemblyState::new(nz(1024), Duration::from_secs(30)) +} + +/// State with a 20-byte connection budget and 1024-byte per-message limit. +#[fixture] +fn connection_budgeted_state() -> MessageAssemblyState { + MessageAssemblyState::with_budgets(nz(1024), Duration::from_secs(30), Some(nz(20)), None) +} + +/// State with a 20-byte in-flight budget and 1024-byte per-message limit. +#[fixture] +fn in_flight_budgeted_state() -> MessageAssemblyState { + MessageAssemblyState::with_budgets(nz(1024), Duration::from_secs(30), None, Some(nz(20))) +} + +/// State with both connection (30) and in-flight (20) budgets. +#[fixture] +fn dual_budgeted_state() -> MessageAssemblyState { + MessageAssemblyState::with_budgets( + nz(1024), + Duration::from_secs(30), + Some(nz(30)), + Some(nz(20)), + ) +} + +// ============================================================================= +// total_buffered_bytes accounting +// ============================================================================= + +#[rstest] +fn total_buffered_bytes_starts_at_zero(#[from(unbounded_state)] state: MessageAssemblyState) { + assert_eq!(state.total_buffered_bytes(), 0); +} + +#[rstest] +fn total_buffered_bytes_tracks_single_assembly( + #[from(unbounded_state)] mut state: MessageAssemblyState, +) { + submit_first(&mut state, 1, b"hello", false).expect("first frame"); + assert_eq!(state.total_buffered_bytes(), 5); +} + +#[rstest] +fn total_buffered_bytes_tracks_multiple_assemblies( + #[from(unbounded_state)] mut state: MessageAssemblyState, +) { + submit_first(&mut state, 1, b"aaa", false).expect("first"); + submit_first(&mut state, 2, b"bbbbb", false).expect("second"); + assert_eq!(state.total_buffered_bytes(), 8); // 3 + 5 +} + +#[rstest] +fn total_buffered_bytes_decreases_after_completion( + #[from(unbounded_state)] mut state: MessageAssemblyState, +) { + submit_first(&mut state, 1, b"aaa", false).expect("first"); + submit_first(&mut state, 2, b"bb", false).expect("second"); + assert_eq!(state.total_buffered_bytes(), 5); + + // Complete key 1 + let cont = continuation_header(1, 1, 2, true); + state + .accept_continuation_frame(&cont, b"xx") + .expect("cont") + .expect("complete"); + assert_eq!(state.total_buffered_bytes(), 2); // only key 2 remains +} + +#[rstest] +fn total_buffered_bytes_decreases_after_purge( + #[from(unbounded_state)] mut state: MessageAssemblyState, +) { + let now = Instant::now(); + submit_first_at(&mut state, 1, b"data", now).expect("first"); + assert_eq!(state.total_buffered_bytes(), 4); + + let future = now + Duration::from_secs(31); + state.purge_expired_at(future); + assert_eq!(state.total_buffered_bytes(), 0); +} + +#[rstest] +fn single_frame_message_not_counted_in_buffered_bytes( + #[from(unbounded_state)] mut state: MessageAssemblyState, +) { + submit_first(&mut state, 1, b"complete", true) + .expect("first") + .expect("complete"); + assert_eq!(state.total_buffered_bytes(), 0); + assert_eq!(state.buffered_count(), 0); +} + +// ============================================================================= +// Connection budget enforcement +// ============================================================================= + +#[rstest] +fn connection_budget_allows_frames_within_limit( + #[from(connection_budgeted_state)] mut state: MessageAssemblyState, +) { + // 10 bytes fits within 20-byte budget + submit_first(&mut state, 1, &[0u8; 10], false).expect("within budget"); + assert_eq!(state.total_buffered_bytes(), 10); +} + +#[rstest] +fn connection_budget_rejects_first_frame_exceeding_limit( + #[from(connection_budgeted_state)] mut state: MessageAssemblyState, +) { + // 21 bytes exceeds 20-byte budget + let err = submit_first(&mut state, 1, &[0u8; 21], false).expect_err("should reject"); + assert!( + matches!( + err, + MessageAssemblyError::ConnectionBudgetExceeded { + key: MessageKey(1), + attempted: 21, + .. + } + ), + "unexpected error: {err:?}" + ); + assert_eq!(state.buffered_count(), 0); +} + +#[rstest] +fn connection_budget_rejects_continuation_exceeding_limit( + #[from(connection_budgeted_state)] mut state: MessageAssemblyState, +) { + // First frame: 10 bytes (within 20-byte budget) + submit_first(&mut state, 1, &[0u8; 10], false).expect("first frame"); + + // Continuation: 11 bytes would bring total to 21, exceeding 20 + let cont = continuation_header(1, 1, 11, true); + let err = state + .accept_continuation_frame(&cont, &[0u8; 11]) + .expect_err("should reject"); + assert!( + matches!(err, MessageAssemblyError::ConnectionBudgetExceeded { .. }), + "unexpected error: {err:?}" + ); + // Partial assembly should be freed on budget violation + assert_eq!(state.buffered_count(), 0); +} + +#[rstest] +fn connection_budget_frees_partial_on_violation( + #[from(connection_budgeted_state)] mut state: MessageAssemblyState, +) { + // Start two assemblies: 8 + 8 = 16 (within 20) + submit_first(&mut state, 1, &[0u8; 8], false).expect("first"); + submit_first(&mut state, 2, &[0u8; 8], false).expect("second"); + assert_eq!(state.buffered_count(), 2); + + // Continuation on key 1: 5 bytes would bring total to 21 + let cont = continuation_header(1, 1, 5, false); + let err = state + .accept_continuation_frame(&cont, &[0u8; 5]) + .expect_err("should reject"); + assert!(matches!( + err, + MessageAssemblyError::ConnectionBudgetExceeded { .. } + )); + + // Key 1 is freed; key 2 survives + assert_eq!(state.buffered_count(), 1); + assert_eq!(state.total_buffered_bytes(), 8); +} + +// ============================================================================= +// In-flight budget enforcement +// ============================================================================= + +#[rstest] +fn in_flight_budget_allows_frames_within_limit( + #[from(in_flight_budgeted_state)] mut state: MessageAssemblyState, +) { + submit_first(&mut state, 1, &[0u8; 10], false).expect("within budget"); + assert_eq!(state.total_buffered_bytes(), 10); +} + +#[rstest] +fn in_flight_budget_rejects_first_frame_exceeding_limit( + #[from(in_flight_budgeted_state)] mut state: MessageAssemblyState, +) { + let err = submit_first(&mut state, 1, &[0u8; 21], false).expect_err("should reject"); + assert!( + matches!( + err, + MessageAssemblyError::InFlightBudgetExceeded { + key: MessageKey(1), + attempted: 21, + .. + } + ), + "unexpected error: {err:?}" + ); + assert_eq!(state.buffered_count(), 0); +} + +#[rstest] +fn in_flight_budget_rejects_continuation_exceeding_limit( + #[from(in_flight_budgeted_state)] mut state: MessageAssemblyState, +) { + submit_first(&mut state, 1, &[0u8; 10], false).expect("first frame"); + + let cont = continuation_header(1, 1, 11, true); + let err = state + .accept_continuation_frame(&cont, &[0u8; 11]) + .expect_err("should reject"); + assert!( + matches!(err, MessageAssemblyError::InFlightBudgetExceeded { .. }), + "unexpected error: {err:?}" + ); + assert_eq!(state.buffered_count(), 0); +} + +// ============================================================================= +// Dual budget: tighter budget triggers first +// ============================================================================= + +#[rstest] +fn dual_budget_in_flight_triggers_before_connection( + #[from(dual_budgeted_state)] mut state: MessageAssemblyState, +) { + // In-flight budget is 20, connection budget is 30. + // 21 bytes should trigger in-flight first. + let err = submit_first(&mut state, 1, &[0u8; 21], false).expect_err("should reject"); + assert!( + matches!(err, MessageAssemblyError::InFlightBudgetExceeded { .. }), + "expected in-flight to trigger first, got: {err:?}" + ); +} + +#[test] +fn dual_budget_connection_triggers_when_in_flight_not_exceeded() { + // connection=15 < in_flight=20, so connection triggers first. + let mut state = MessageAssemblyState::with_budgets( + nz(1024), + Duration::from_secs(30), + Some(nz(15)), + Some(nz(20)), + ); + let err = submit_first(&mut state, 1, &[0u8; 16], false).expect_err("should reject"); + assert!( + matches!(err, MessageAssemblyError::ConnectionBudgetExceeded { .. }), + "expected connection to trigger first, got: {err:?}" + ); +} + +// ============================================================================= +// Backward compatibility: no budgets = no enforcement +// ============================================================================= + +#[rstest] +fn no_budgets_allows_large_frames(#[from(unbounded_state)] mut state: MessageAssemblyState) { + // 500 bytes is well within the 1024 per-message limit and has no aggregate cap. + submit_first(&mut state, 1, &[0u8; 500], false).expect("no budget enforcement"); + submit_first(&mut state, 2, &[0u8; 500], false).expect("no budget enforcement"); + assert_eq!(state.total_buffered_bytes(), 1000); +} + +// ============================================================================= +// Budget isolation: violation for one key does not affect others +// ============================================================================= + +#[rstest] +fn budget_violation_does_not_affect_other_assemblies( + #[from(connection_budgeted_state)] mut state: MessageAssemblyState, +) { + // Start key 1 with 10 bytes + submit_first(&mut state, 1, &[0u8; 10], false).expect("first"); + assert_eq!(state.buffered_count(), 1); + + // Try key 2 with 11 bytes — total would be 21, exceeding 20-byte budget + let err = submit_first(&mut state, 2, &[0u8; 11], false).expect_err("should reject"); + assert!(matches!( + err, + MessageAssemblyError::ConnectionBudgetExceeded { .. } + )); + + // Key 1 is still intact + assert_eq!(state.buffered_count(), 1); + assert_eq!(state.total_buffered_bytes(), 10); + + // Key 1 can still be completed + let cont = continuation_header(1, 1, 3, true); + let msg = state + .accept_continuation_frame(&cont, b"fin") + .expect("cont") + .expect("complete"); + assert_eq!(msg.body(), &[&[0u8; 10][..], b"fin"].concat()); +} + +// ============================================================================= +// Headroom reclamation after purge +// ============================================================================= + +#[test] +fn headroom_reclaimed_after_purge_allows_new_assembly() { + let mut state = + MessageAssemblyState::with_budgets(nz(1024), Duration::from_secs(30), Some(nz(20)), None); + + let now = Instant::now(); + submit_first_at(&mut state, 1, &[0u8; 15], now).expect("first"); + assert_eq!(state.total_buffered_bytes(), 15); + + // A 6-byte frame would exceed budget (15 + 6 = 21 > 20) + let err = submit_first(&mut state, 2, &[0u8; 6], false).expect_err("over budget"); + assert!(matches!( + err, + MessageAssemblyError::ConnectionBudgetExceeded { .. } + )); + + // Purge key 1 via timeout + let future = now + Duration::from_secs(31); + state.purge_expired_at(future); + assert_eq!(state.total_buffered_bytes(), 0); + + // Now 6 bytes is fine + submit_first(&mut state, 3, &[0u8; 6], false).expect("within reclaimed budget"); + assert_eq!(state.total_buffered_bytes(), 6); +} + +#[test] +fn headroom_reclaimed_after_completion_allows_new_frame() { + let mut state = + MessageAssemblyState::with_budgets(nz(1024), Duration::from_secs(30), None, Some(nz(20))); + + submit_first(&mut state, 1, &[0u8; 15], false).expect("first"); + + // Complete key 1 + let cont = continuation_header(1, 1, 3, true); + state + .accept_continuation_frame(&cont, b"end") + .expect("cont") + .expect("complete"); + assert_eq!(state.total_buffered_bytes(), 0); + + // Now we have full headroom again + submit_first(&mut state, 2, &[0u8; 20], false).expect("within reclaimed budget"); + assert_eq!(state.total_buffered_bytes(), 20); +} + +// ============================================================================= +// Single-frame messages bypass aggregate budgets +// ============================================================================= + +#[rstest] +fn single_frame_message_not_subject_to_aggregate_budgets( + #[from(connection_budgeted_state)] mut state: MessageAssemblyState, +) { + // Fill up to the budget edge + submit_first(&mut state, 1, &[0u8; 19], false).expect("first"); + assert_eq!(state.total_buffered_bytes(), 19); + + // A single-frame message should still succeed because it never buffers + let msg = submit_first(&mut state, 2, b"big-single-frame-payload", true) + .expect("single frame accepted") + .expect("should complete immediately"); + assert_eq!(msg.body(), b"big-single-frame-payload"); + + // Buffered bytes unchanged (single-frame was never buffered) + assert_eq!(state.total_buffered_bytes(), 19); +} diff --git a/src/message_assembler/error.rs b/src/message_assembler/error.rs index 17915718..86b5c33d 100644 --- a/src/message_assembler/error.rs +++ b/src/message_assembler/error.rs @@ -107,4 +107,26 @@ pub enum MessageAssemblyError { /// Configured size cap. limit: NonZeroUsize, }, + + /// Accepting this frame would exceed the per-connection buffering budget. + #[error("connection budget exceeded for key {key}: {attempted} bytes > {limit} bytes")] + ConnectionBudgetExceeded { + /// Message key whose frame triggered the guard. + key: MessageKey, + /// Total bytes that would result from accepting the frame. + attempted: usize, + /// Configured per-connection byte cap. + limit: NonZeroUsize, + }, + + /// Accepting this frame would exceed the in-flight assembly byte budget. + #[error("in-flight budget exceeded for key {key}: {attempted} bytes > {limit} bytes")] + InFlightBudgetExceeded { + /// Message key whose frame triggered the guard. + key: MessageKey, + /// Total bytes that would result from accepting the frame. + attempted: usize, + /// Configured in-flight byte cap. + limit: NonZeroUsize, + }, } diff --git a/src/message_assembler/mod.rs b/src/message_assembler/mod.rs index 2a0e44a4..d6362f27 100644 --- a/src/message_assembler/mod.rs +++ b/src/message_assembler/mod.rs @@ -20,6 +20,7 @@ //! - Duplicate frames (already-processed sequences) //! - Frames arriving after the series is complete +mod budget; pub mod error; mod header; pub mod series; diff --git a/src/message_assembler/state.rs b/src/message_assembler/state.rs index 863579cc..03310ea0 100644 --- a/src/message_assembler/state.rs +++ b/src/message_assembler/state.rs @@ -13,6 +13,7 @@ use std::{ use super::{ ContinuationFrameHeader, MessageKey, + budget::{AggregateBudgets, check_aggregate_budgets, check_size_limit}, error::{MessageAssemblyError, MessageSeriesError, MessageSeriesStatus}, series::MessageSeries, types::{AssembledMessage, EnvelopeRouting, FirstFrameInput}, @@ -44,6 +45,9 @@ impl PartialAssembly { fn set_metadata(&mut self, data: Vec) { self.metadata = data; } fn accumulated_len(&self) -> usize { self.body_buffer.len() } + + /// Total heap bytes held by this partial assembly (body + metadata). + fn buffered_bytes(&self) -> usize { self.body_buffer.len().saturating_add(self.metadata.len()) } } /// Stateful manager for multiple concurrent message assemblies. @@ -109,6 +113,7 @@ pub struct MessageAssemblyState { max_message_size: NonZeroUsize, timeout: Duration, assemblies: HashMap, + budgets: AggregateBudgets, } impl MessageAssemblyState { @@ -120,10 +125,29 @@ impl MessageAssemblyState { /// * `timeout` - Duration after which partial assemblies are purged. #[must_use] pub fn new(max_message_size: NonZeroUsize, timeout: Duration) -> Self { + Self::with_budgets(max_message_size, timeout, None, None) + } + + /// Create a new assembly state manager with optional aggregate budgets. + /// + /// When `connection_budget` or `in_flight_budget` is `Some`, frames that + /// would cause the total buffered bytes across all in-flight assemblies + /// to exceed the respective limit are rejected. + #[must_use] + pub fn with_budgets( + max_message_size: NonZeroUsize, + timeout: Duration, + connection_budget: Option, + in_flight_budget: Option, + ) -> Self { Self { max_message_size, timeout, assemblies: HashMap::new(), + budgets: AggregateBudgets { + connection: connection_budget, + in_flight: in_flight_budget, + }, } } @@ -193,6 +217,16 @@ impl MessageAssemblyState { ))); } + // Check aggregate budgets before buffering (single-frame messages + // are returned above and never counted against aggregate budgets). + let incoming_bytes = input.body.len().saturating_add(input.metadata.len()); + check_aggregate_budgets( + key, + self.total_buffered_bytes(), + incoming_bytes, + &self.budgets, + )?; + // Start new assembly, preserving envelope routing metadata from the // first frame so the completed message is dispatched correctly. let mut partial = PartialAssembly::new(series, input.routing, now); @@ -237,34 +271,6 @@ impl MessageAssemblyState { ) } - /// Check if accumulated size plus new body would exceed the limit. - /// - /// Returns the new total size on success. - fn check_size_limit( - max_message_size: NonZeroUsize, - key: MessageKey, - accumulated: usize, - body_len: usize, - ) -> Result { - let Some(new_len) = accumulated.checked_add(body_len) else { - return Err(MessageAssemblyError::MessageTooLarge { - key, - attempted: usize::MAX, - limit: max_message_size, - }); - }; - - if new_len > max_message_size.get() { - return Err(MessageAssemblyError::MessageTooLarge { - key, - attempted: new_len, - limit: max_message_size, - }); - } - - Ok(new_len) - } - /// Process a continuation frame with an explicit timestamp. /// /// See [`accept_continuation_frame`](Self::accept_continuation_frame) for @@ -295,6 +301,11 @@ impl MessageAssemblyState { )); } + // Snapshot budget state before the mutable entry borrow. + let max_message_size = self.max_message_size; + let budgets = self.budgets; + let buffered_total = self.total_buffered_bytes(); + let Entry::Occupied(mut entry) = self.assemblies.entry(key) else { return Err(MessageAssemblyError::Series( MessageSeriesError::MissingFirstFrame { key }, @@ -314,8 +325,13 @@ impl MessageAssemblyState { // Check size limit let accumulated = entry.get().accumulated_len(); - if let Err(e) = Self::check_size_limit(self.max_message_size, key, accumulated, body.len()) - { + if let Err(e) = check_size_limit(max_message_size, key, accumulated, body.len()) { + entry.remove(); + return Err(e); + } + + // Check aggregate budgets + if let Err(e) = check_aggregate_budgets(key, buffered_total, body.len(), &budgets) { entry.remove(); return Err(e); } @@ -359,6 +375,17 @@ impl MessageAssemblyState { evicted } + /// Total bytes buffered across all in-flight assemblies. + /// + /// Includes both body and metadata bytes for each partial assembly. + #[must_use] + pub fn total_buffered_bytes(&self) -> usize { + self.assemblies + .values() + .map(PartialAssembly::buffered_bytes) + .sum() + } + /// Number of partial assemblies currently buffered. #[must_use] pub fn buffered_count(&self) -> usize { self.assemblies.len() } diff --git a/src/message_assembler/tests.rs b/src/message_assembler/tests.rs index 48508814..e8568e00 100644 --- a/src/message_assembler/tests.rs +++ b/src/message_assembler/tests.rs @@ -201,3 +201,10 @@ mod series_tests; #[path = "state_tests.rs"] mod state_tests; + +// ============================================================================= +// Budget enforcement tests (8.3.2) - see budget_tests.rs +// ============================================================================= + +#[path = "budget_tests.rs"] +mod budget_tests; diff --git a/tests/features/budget_enforcement.feature b/tests/features/budget_enforcement.feature new file mode 100644 index 00000000..94b8a3d9 --- /dev/null +++ b/tests/features/budget_enforcement.feature @@ -0,0 +1,49 @@ +@budget_enforcement +Feature: Memory budget enforcement during message assembly + When memory budgets are configured on a Wireframe application, the message + assembly subsystem actively rejects frames that would exceed any of the + three budget dimensions — per-message, per-connection, and in-flight. + + Background: + Given a budgeted assembly state with max message size 1024, timeout 30, connection budget 50, and in-flight budget 40 + + Scenario: Accept frames within all budget limits + When a first frame for key 1 with 10 body bytes is accepted + Then the frame is accepted without error + And total buffered bytes is 10 + + Scenario: Reject first frame exceeding connection budget + When a first frame for key 1 with 51 body bytes is rejected + Then the error is "ConnectionBudgetExceeded" + And the active assembly count is 0 + + Scenario: Reject continuation exceeding in-flight budget + When a first frame for key 1 with 30 body bytes is accepted + Then the frame is accepted without error + When a continuation for key 1 with sequence 1 and 11 body bytes is rejected + Then the error is "InFlightBudgetExceeded" + And the active assembly count is 0 + + Scenario: Reclaim budget headroom after assembly completes + When a first frame for key 1 with 35 body bytes is accepted + Then total buffered bytes is 35 + When a final continuation for key 1 with sequence 1 and 5 body bytes completes the message + Then total buffered bytes is 0 + When a first frame for key 2 with 35 body bytes is accepted + Then the frame is accepted without error + + Scenario: Reclaim budget headroom after timeout purge + Given the clock is at time zero + When a first frame for key 1 with 35 body bytes is accepted at time zero + Then total buffered bytes is 35 + When expired assemblies are purged at 31 seconds + Then total buffered bytes is 0 + When a first frame for key 2 with 35 body bytes is accepted + Then the frame is accepted without error + + Scenario: Single-frame message bypasses aggregate budgets + When a first frame for key 1 with 35 body bytes is accepted + Then total buffered bytes is 35 + When a single-frame message for key 2 with 100 body bytes is accepted + Then the single-frame message completes immediately + And total buffered bytes is 35 diff --git a/tests/fixtures/budget_enforcement.rs b/tests/fixtures/budget_enforcement.rs new file mode 100644 index 00000000..2bad83ce --- /dev/null +++ b/tests/fixtures/budget_enforcement.rs @@ -0,0 +1,258 @@ +//! Behavioural test world for budget enforcement (8.3.2). + +use std::{ + num::NonZeroUsize, + time::{Duration, Instant}, +}; + +use rstest::fixture; +use wireframe::message_assembler::{ + ContinuationFrameHeader, + EnvelopeRouting, + FirstFrameHeader, + FirstFrameInput, + FrameSequence, + MessageAssemblyError, + MessageAssemblyState, + MessageKey, +}; +pub use wireframe_testing::TestResult; + +/// Configuration bundle for `init_budgeted_state`. +#[derive(Clone, Copy)] +pub struct BudgetedStateConfig { + pub max_message_size: usize, + pub timeout_secs: u64, + pub connection_budget: usize, + pub in_flight_budget: usize, +} + +/// Continuation frame descriptor. +#[derive(Clone, Copy)] +pub struct ContinuationInput { + pub key: u64, + pub sequence: u32, + pub body_len: usize, + pub is_last: bool, +} + +/// Behavioural test world for budget enforcement scenarios. +#[derive(Debug)] +pub struct BudgetEnforcementWorld { + state: MessageAssemblyState, + last_error: Option, + last_accepted: bool, + last_completed: bool, + epoch: Option, +} + +impl Default for BudgetEnforcementWorld { + fn default() -> Self { + let max = NonZeroUsize::MIN; + Self { + state: MessageAssemblyState::new(max, Duration::from_secs(30)), + last_error: None, + last_accepted: false, + last_completed: false, + epoch: None, + } + } +} + +/// Fixture for `BudgetEnforcementWorld`. +#[fixture] +#[rustfmt::skip] +pub fn budget_enforcement_world() -> BudgetEnforcementWorld { + BudgetEnforcementWorld::default() +} + +impl BudgetEnforcementWorld { + /// Re-initialise state with explicit budgets. + pub fn init_budgeted_state(&mut self, cfg: BudgetedStateConfig) -> TestResult { + let max_message_size = + NonZeroUsize::new(cfg.max_message_size).ok_or("max_message_size must be non-zero")?; + let connection = + NonZeroUsize::new(cfg.connection_budget).ok_or("connection_budget must be non-zero")?; + let in_flight = + NonZeroUsize::new(cfg.in_flight_budget).ok_or("in_flight_budget must be non-zero")?; + self.state = MessageAssemblyState::with_budgets( + max_message_size, + Duration::from_secs(cfg.timeout_secs), + Some(connection), + Some(in_flight), + ); + Ok(()) + } + + /// Set the epoch for time-based tests. + pub fn set_epoch(&mut self) { self.epoch = Some(Instant::now()); } + + /// Record the result of an assembly operation. + fn record_result( + &mut self, + result: Result< + Option, + MessageAssemblyError, + >, + ) { + match result { + Ok(None) => { + self.last_accepted = true; + self.last_completed = false; + self.last_error = None; + } + Ok(Some(_)) => { + self.last_accepted = true; + self.last_completed = true; + self.last_error = None; + } + Err(e) => { + self.last_accepted = false; + self.last_error = Some(e); + } + } + } + + /// Build a first-frame header and body from key, body length, and finality. + fn build_first_input(key: u64, body_len: usize, is_last: bool) -> (Vec, FirstFrameHeader) { + ( + vec![0u8; body_len], + FirstFrameHeader { + message_key: MessageKey(key), + metadata_len: 0, + body_len, + total_body_len: None, + is_last, + }, + ) + } + + /// Submit a multi-frame first frame. + pub fn accept_first_frame(&mut self, key: u64, body_len: usize) -> TestResult { + let (body, header) = Self::build_first_input(key, body_len, false); + let input = FirstFrameInput::new(&header, EnvelopeRouting::default(), vec![], &body) + .map_err(|e| e.to_string())?; + let result = self.state.accept_first_frame(input); + self.record_result(result); + Ok(()) + } + + /// Submit a multi-frame first frame at the stored epoch. + pub fn accept_first_frame_at_epoch(&mut self, key: u64, body_len: usize) -> TestResult { + let now = self.epoch.ok_or("epoch not set")?; + let (body, header) = Self::build_first_input(key, body_len, false); + let input = FirstFrameInput::new(&header, EnvelopeRouting::default(), vec![], &body) + .map_err(|e| e.to_string())?; + let result = self.state.accept_first_frame_at(input, now); + self.record_result(result); + Ok(()) + } + + /// Submit a first frame that is expected to be rejected. + pub fn reject_first_frame(&mut self, key: u64, body_len: usize) -> TestResult { + self.accept_first_frame(key, body_len)?; + if self.last_accepted { + return Err("expected first frame to be rejected, but it was accepted".into()); + } + Ok(()) + } + + /// Submit a single-frame message. + pub fn accept_single_frame(&mut self, key: u64, body_len: usize) -> TestResult { + let (body, header) = Self::build_first_input(key, body_len, true); + let input = FirstFrameInput::new(&header, EnvelopeRouting::default(), vec![], &body) + .map_err(|e| e.to_string())?; + let result = self.state.accept_first_frame(input); + self.record_result(result); + Ok(()) + } + + /// Submit a continuation frame. + pub fn accept_continuation(&mut self, input: ContinuationInput) { + let body = vec![0u8; input.body_len]; + let header = ContinuationFrameHeader { + message_key: MessageKey(input.key), + sequence: Some(FrameSequence(input.sequence)), + body_len: input.body_len, + is_last: input.is_last, + }; + let result = self.state.accept_continuation_frame(&header, &body); + self.record_result(result); + } + + /// Submit a continuation frame that is expected to be rejected. + pub fn reject_continuation(&mut self, key: u64, sequence: u32, body_len: usize) -> TestResult { + self.accept_continuation(ContinuationInput { + key, + sequence, + body_len, + is_last: false, + }); + if self.last_accepted { + return Err("expected continuation to be rejected, but it was accepted".into()); + } + Ok(()) + } + + /// Purge expired assemblies at a given offset from epoch. + pub fn purge_at_offset(&mut self, offset_secs: u64) -> TestResult { + let epoch = self.epoch.ok_or("epoch not set")?; + let future = epoch + Duration::from_secs(offset_secs); + self.state.purge_expired_at(future); + Ok(()) + } + + // ---- Assertions ---- + + pub fn assert_accepted(&self) -> TestResult { + if !self.last_accepted { + let detail = self + .last_error + .as_ref() + .map_or("(no error captured)".to_string(), |e| format!("{e}")); + return Err(format!("expected frame to be accepted, got error: {detail}").into()); + } + Ok(()) + } + + pub fn assert_completed(&self) -> TestResult { + if !self.last_completed { + return Err("expected message to complete".into()); + } + Ok(()) + } + + pub fn assert_error_kind(&self, expected: &str) -> TestResult { + let err = self + .last_error + .as_ref() + .ok_or("expected an error, but none was captured")?; + let actual = match err { + MessageAssemblyError::ConnectionBudgetExceeded { .. } => "ConnectionBudgetExceeded", + MessageAssemblyError::InFlightBudgetExceeded { .. } => "InFlightBudgetExceeded", + MessageAssemblyError::MessageTooLarge { .. } => "MessageTooLarge", + MessageAssemblyError::DuplicateFirstFrame { .. } => "DuplicateFirstFrame", + MessageAssemblyError::Series(_) => "Series", + }; + if actual != expected { + return Err(format!("expected error kind '{expected}', got '{actual}'").into()); + } + Ok(()) + } + + pub fn assert_total_buffered_bytes(&self, expected: usize) -> TestResult { + let actual = self.state.total_buffered_bytes(); + if actual != expected { + return Err(format!("expected total_buffered_bytes={expected}, got {actual}").into()); + } + Ok(()) + } + + pub fn assert_buffered_count(&self, expected: usize) -> TestResult { + let actual = self.state.buffered_count(); + if actual != expected { + return Err(format!("expected buffered_count={expected}, got {actual}").into()); + } + Ok(()) + } +} diff --git a/tests/fixtures/mod.rs b/tests/fixtures/mod.rs index b05281d3..7344370c 100644 --- a/tests/fixtures/mod.rs +++ b/tests/fixtures/mod.rs @@ -3,6 +3,7 @@ //! Each world from the former Cucumber tests is converted to an rstest fixture //! here. +pub mod budget_enforcement; pub mod client_lifecycle; pub mod client_messaging; pub mod client_preamble; diff --git a/tests/scenarios/budget_enforcement_scenarios.rs b/tests/scenarios/budget_enforcement_scenarios.rs new file mode 100644 index 00000000..7b457a27 --- /dev/null +++ b/tests/scenarios/budget_enforcement_scenarios.rs @@ -0,0 +1,57 @@ +//! Scenario test functions for budget enforcement (8.3.2). + +use rstest_bdd_macros::scenario; + +use crate::fixtures::budget_enforcement::*; + +#[scenario( + path = "tests/features/budget_enforcement.feature", + name = "Accept frames within all budget limits" +)] +fn accept_frames_within_limits(budget_enforcement_world: BudgetEnforcementWorld) { + let _ = budget_enforcement_world; +} + +#[scenario( + path = "tests/features/budget_enforcement.feature", + name = "Reject first frame exceeding connection budget" +)] +fn reject_first_frame_exceeding_connection_budget( + budget_enforcement_world: BudgetEnforcementWorld, +) { + let _ = budget_enforcement_world; +} + +#[scenario( + path = "tests/features/budget_enforcement.feature", + name = "Reject continuation exceeding in-flight budget" +)] +fn reject_continuation_exceeding_in_flight_budget( + budget_enforcement_world: BudgetEnforcementWorld, +) { + let _ = budget_enforcement_world; +} + +#[scenario( + path = "tests/features/budget_enforcement.feature", + name = "Reclaim budget headroom after assembly completes" +)] +fn reclaim_headroom_after_completion(budget_enforcement_world: BudgetEnforcementWorld) { + let _ = budget_enforcement_world; +} + +#[scenario( + path = "tests/features/budget_enforcement.feature", + name = "Reclaim budget headroom after timeout purge" +)] +fn reclaim_headroom_after_purge(budget_enforcement_world: BudgetEnforcementWorld) { + let _ = budget_enforcement_world; +} + +#[scenario( + path = "tests/features/budget_enforcement.feature", + name = "Single-frame message bypasses aggregate budgets" +)] +fn single_frame_bypasses_budgets(budget_enforcement_world: BudgetEnforcementWorld) { + let _ = budget_enforcement_world; +} diff --git a/tests/scenarios/mod.rs b/tests/scenarios/mod.rs index d866accb..6c52a8c4 100644 --- a/tests/scenarios/mod.rs +++ b/tests/scenarios/mod.rs @@ -7,6 +7,7 @@ #[path = "../steps/mod.rs"] pub(crate) mod steps; +mod budget_enforcement_scenarios; mod client_lifecycle_scenarios; mod client_messaging_scenarios; mod client_preamble_scenarios; diff --git a/tests/steps/budget_enforcement_steps.rs b/tests/steps/budget_enforcement_steps.rs new file mode 100644 index 00000000..e1bc1709 --- /dev/null +++ b/tests/steps/budget_enforcement_steps.rs @@ -0,0 +1,156 @@ +//! Step definitions for budget enforcement BDD scenarios (8.3.2). + +use rstest_bdd_macros::{given, then, when}; + +use crate::fixtures::budget_enforcement::{ + BudgetEnforcementWorld, + BudgetedStateConfig, + ContinuationInput, + TestResult, +}; + +// --------------------------------------------------------------------------- +// Given +// --------------------------------------------------------------------------- + +#[given( + "a budgeted assembly state with max message size {max_msg}, timeout {timeout}, connection \ + budget {conn}, and in-flight budget {flight}" +)] +#[expect(clippy::too_many_arguments, reason = "step macro injects world param")] +fn given_budgeted_state( + budget_enforcement_world: &mut BudgetEnforcementWorld, + max_msg: usize, + timeout: u64, + conn: usize, + flight: usize, +) -> TestResult { + budget_enforcement_world.init_budgeted_state(BudgetedStateConfig { + max_message_size: max_msg, + timeout_secs: timeout, + connection_budget: conn, + in_flight_budget: flight, + }) +} + +#[given("the clock is at time zero")] +fn given_clock_at_zero(budget_enforcement_world: &mut BudgetEnforcementWorld) { + budget_enforcement_world.set_epoch(); +} + +// --------------------------------------------------------------------------- +// When +// --------------------------------------------------------------------------- + +#[when("a first frame for key {key} with {body_len} body bytes is accepted")] +fn when_first_frame_accepted( + budget_enforcement_world: &mut BudgetEnforcementWorld, + key: u64, + body_len: usize, +) -> TestResult { + budget_enforcement_world.accept_first_frame(key, body_len) +} + +#[when("a first frame for key {key} with {body_len} body bytes is rejected")] +fn when_first_frame_rejected( + budget_enforcement_world: &mut BudgetEnforcementWorld, + key: u64, + body_len: usize, +) -> TestResult { + budget_enforcement_world.reject_first_frame(key, body_len) +} + +#[when("a first frame for key {key} with {body_len} body bytes is accepted at time zero")] +fn when_first_frame_accepted_at_epoch( + budget_enforcement_world: &mut BudgetEnforcementWorld, + key: u64, + body_len: usize, +) -> TestResult { + budget_enforcement_world.accept_first_frame_at_epoch(key, body_len) +} + +#[when("a continuation for key {key} with sequence {seq} and {body_len} body bytes is rejected")] +fn when_continuation_rejected( + budget_enforcement_world: &mut BudgetEnforcementWorld, + key: u64, + seq: u32, + body_len: usize, +) -> TestResult { + budget_enforcement_world.reject_continuation(key, seq, body_len) +} + +#[when( + "a final continuation for key {key} with sequence {seq} and {body_len} body bytes completes \ + the message" +)] +fn when_final_continuation_completes( + budget_enforcement_world: &mut BudgetEnforcementWorld, + key: u64, + seq: u32, + body_len: usize, +) -> TestResult { + budget_enforcement_world.accept_continuation(ContinuationInput { + key, + sequence: seq, + body_len, + is_last: true, + }); + budget_enforcement_world.assert_completed() +} + +#[when("expired assemblies are purged at {offset_secs} seconds")] +fn when_purged_at_offset( + budget_enforcement_world: &mut BudgetEnforcementWorld, + offset_secs: u64, +) -> TestResult { + budget_enforcement_world.purge_at_offset(offset_secs) +} + +#[when("a single-frame message for key {key} with {body_len} body bytes is accepted")] +fn when_single_frame_accepted( + budget_enforcement_world: &mut BudgetEnforcementWorld, + key: u64, + body_len: usize, +) -> TestResult { + budget_enforcement_world.accept_single_frame(key, body_len) +} + +// --------------------------------------------------------------------------- +// Then +// --------------------------------------------------------------------------- + +#[then("the frame is accepted without error")] +fn then_frame_accepted(budget_enforcement_world: &mut BudgetEnforcementWorld) -> TestResult { + budget_enforcement_world.assert_accepted() +} + +#[then("the error is \"{error_kind}\"")] +fn then_error_is( + budget_enforcement_world: &mut BudgetEnforcementWorld, + error_kind: String, +) -> TestResult { + budget_enforcement_world.assert_error_kind(&error_kind) +} + +#[then("total buffered bytes is {expected}")] +fn then_total_buffered_bytes( + budget_enforcement_world: &mut BudgetEnforcementWorld, + expected: usize, +) -> TestResult { + budget_enforcement_world.assert_total_buffered_bytes(expected) +} + +#[then("the active assembly count is {expected}")] +fn then_active_assembly_count( + budget_enforcement_world: &mut BudgetEnforcementWorld, + expected: usize, +) -> TestResult { + budget_enforcement_world.assert_buffered_count(expected) +} + +#[then("the single-frame message completes immediately")] +fn then_single_frame_completes( + budget_enforcement_world: &mut BudgetEnforcementWorld, +) -> TestResult { + budget_enforcement_world.assert_completed() +} diff --git a/tests/steps/mod.rs b/tests/steps/mod.rs index 292badfd..6de12dbd 100644 --- a/tests/steps/mod.rs +++ b/tests/steps/mod.rs @@ -3,6 +3,7 @@ //! Step functions are synchronous and call async world methods via //! `Runtime::new().block_on(...)`. +mod budget_enforcement_steps; mod client_lifecycle_steps; mod client_messaging_steps; mod client_preamble_steps; From 7cc96dbfb8efe943b433119eb0d42a28c53dd3c7 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 22 Feb 2026 21:54:30 +0000 Subject: [PATCH 2/5] refactor(message_assembler): introduce macro to simplify first-frame input creation Encapsulate the construction and validation of FirstFrameHeader and FirstFrameInput into the `create_first_frame_input!` macro. This reduces code duplication and ensures consistent setup of first-frame submission helpers in budget_tests.rs. Co-authored-by: devboxerhub[bot] --- src/message_assembler/budget_tests.rs | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/src/message_assembler/budget_tests.rs b/src/message_assembler/budget_tests.rs index 71f71827..8c916f98 100644 --- a/src/message_assembler/budget_tests.rs +++ b/src/message_assembler/budget_tests.rs @@ -23,6 +23,21 @@ use crate::message_assembler::{ /// Non-zero shorthand. fn nz(val: usize) -> NonZeroUsize { NonZeroUsize::new(val).expect("non-zero") } +/// Build a [`FirstFrameHeader`] and [`FirstFrameInput`] in the caller's +/// scope from a key, body slice, and finality flag. +/// +/// Encapsulates the header construction and input validation common to all +/// first-frame submission helpers. The macro expands to two `let` bindings +/// (`$hdr` for the header, `$inp` for the input) so the header lives long +/// enough for the input's borrow. +macro_rules! create_first_frame_input { + ($hdr:ident, $inp:ident, $key:expr, $body:expr, $is_last:expr) => { + let $hdr = first_header($key, $body.len(), $is_last); + let $inp = + FirstFrameInput::new(&$hdr, EnvelopeRouting::default(), vec![], $body).expect("valid"); + }; +} + /// Submit a first frame with `body_len` bytes of body data. fn submit_first( state: &mut MessageAssemblyState, @@ -30,9 +45,7 @@ fn submit_first( body: &[u8], is_last: bool, ) -> Result, MessageAssemblyError> { - let header = first_header(key, body.len(), is_last); - let input = - FirstFrameInput::new(&header, EnvelopeRouting::default(), vec![], body).expect("valid"); + create_first_frame_input!(_header, input, key, body, is_last); state.accept_first_frame(input) } @@ -43,9 +56,7 @@ fn submit_first_at( body: &[u8], now: Instant, ) -> Result, MessageAssemblyError> { - let header = first_header(key, body.len(), false); - let input = - FirstFrameInput::new(&header, EnvelopeRouting::default(), vec![], body).expect("valid"); + create_first_frame_input!(_header, input, key, body, false); state.accept_first_frame_at(input, now) } From 577429e156b8acf18614a12a4c786df1cd65eb5b Mon Sep 17 00:00:00 2001 From: Leynos Date: Mon, 23 Feb 2026 12:04:40 +0000 Subject: [PATCH 3/5] feat(message_assembler): enforce aggregate budgets with comprehensive tests - Implement budget enforcement checks for connection and in-flight aggregate limits. - Add `is_enabled` method on `AggregateBudgets` to optimize budget enforcement gating. - Abort and free partial assemblies on budget violations, surfacing errors. - Add extensive behavioural tests covering budget enforcement scenarios: connection limits, in-flight limits, dual budgets, isolation, headroom reclamation, and single-frame message bypass. - Update documentation to reflect new budget enforcement behavior and terminology. - Refactor existing tests to move enforcement coverage into separate submodule. - Ensure backward compatibility where no budgets mean no enforcement. - Minor fixes to documentation for clarity and style. Co-authored-by: devboxerhub[bot] --- docs/execplans/8-3-2-budget-enforcement.md | 16 +- docs/users-guide.md | 2 +- src/message_assembler/budget.rs | 7 + .../budget_enforcement_tests.rs | 296 ++++++++++++++++++ src/message_assembler/budget_tests.rs | 278 +--------------- src/message_assembler/state.rs | 30 +- tests/fixtures/budget_enforcement.rs | 1 + 7 files changed, 340 insertions(+), 290 deletions(-) create mode 100644 src/message_assembler/budget_enforcement_tests.rs diff --git a/docs/execplans/8-3-2-budget-enforcement.md b/docs/execplans/8-3-2-budget-enforcement.md index 363fef90..2faffaf6 100644 --- a/docs/execplans/8-3-2-budget-enforcement.md +++ b/docs/execplans/8-3-2-budget-enforcement.md @@ -1,8 +1,9 @@ # Implement budget enforcement for message assembly (8.3.2) -This ExecPlan is a living document. The sections `Constraints`, `Tolerances`, -`Risks`, `Progress`, `Surprises & Discoveries`, `Decision Log`, and -`Outcomes & Retrospective` must be kept up to date as work proceeds. +This execution plan (ExecPlan) is a living document. The sections +`Constraints`, `Tolerances`, `Risks`, `Progress`, `Surprises & Discoveries`, +`Decision Log`, and `Outcomes & Retrospective` must be kept up to date as work +proceeds. Status: COMPLETE @@ -26,7 +27,7 @@ assembly subsystem will actively enforce: When a budget is exceeded the framework aborts the offending assembly, frees its partial buffers, and surfaces `std::io::ErrorKind::InvalidData` through the -existing deserialization-failure policy. No handler is invoked for the aborted +existing deserialisation-failure policy. No handler is invoked for the aborted message. Other in-flight assemblies on the same connection are unaffected. Success is observable when: @@ -34,8 +35,8 @@ Success is observable when: - Unit tests (`rstest`) prove that each budget dimension is enforced, that partial buffers are freed on violation, and that completed or purged assemblies reclaim budget headroom. -- Behavioural tests (`rstest-bdd` v0.5.0) prove the enforcement behaviour in - scenario form. +- Behavioural tests (behaviour-driven development, via the `rstest-bdd` v0.5.0 + crate) prove the enforcement behaviour in scenario form. - `docs/users-guide.md` documents that configured budgets are now enforced. - `docs/roadmap.md` marks `8.3.2` as done after all quality gates pass. - Design decisions are recorded in the relevant design document. @@ -51,7 +52,8 @@ Success is observable when: remains the sole per-message limit when budgets are absent. - When both `max_message_size` (from fragmentation) and `bytes_per_message` (from `MemoryBudgets`) are present, use the minimum of the two so neither - guard is silently overridden (per ADR 0002 guidance on compatible values). + guard is silently overridden (per architecture decision record (ADR) 0002 + guidance on compatible values). - Single-frame messages (`is_last: true` on first frame) are returned immediately and never buffered. They must pass the per-message size check but must not count against connection or in-flight budgets because they are never diff --git a/docs/users-guide.md b/docs/users-guide.md index c3903c1a..1e974f41 100644 --- a/docs/users-guide.md +++ b/docs/users-guide.md @@ -571,7 +571,7 @@ When budgets are configured, the message assembly subsystem enforces them at frame acceptance time. Frames that would cause the total buffered bytes to exceed the per-connection or in-flight budget are rejected, the offending partial assembly is freed, and the failure is surfaced through the existing -deserialization-failure policy (`InvalidData`). The effective per-message limit +deserialisation-failure policy (`InvalidData`). The effective per-message limit is the minimum of the fragmentation `max_message_size` and the configured `bytes_per_message`. Single-frame messages that complete immediately are never counted against aggregate budgets since they do not buffer. diff --git a/src/message_assembler/budget.rs b/src/message_assembler/budget.rs index eff94abe..0d98e6a2 100644 --- a/src/message_assembler/budget.rs +++ b/src/message_assembler/budget.rs @@ -19,6 +19,13 @@ pub(super) struct AggregateBudgets { pub(super) in_flight: Option, } +impl AggregateBudgets { + /// Returns `true` when at least one aggregate budget limit is configured. + pub(super) const fn is_enabled(&self) -> bool { + self.connection.is_some() || self.in_flight.is_some() + } +} + /// Check whether accepting `additional_bytes` for `key` would exceed /// the connection budget or in-flight budget. /// diff --git a/src/message_assembler/budget_enforcement_tests.rs b/src/message_assembler/budget_enforcement_tests.rs new file mode 100644 index 00000000..5059db5d --- /dev/null +++ b/src/message_assembler/budget_enforcement_tests.rs @@ -0,0 +1,296 @@ +//! Budget enforcement tests: connection, in-flight, dual, isolation, +//! headroom reclamation, and single-frame bypass. + +use std::time::{Duration, Instant}; + +use rstest::rstest; + +use super::{ + connection_budgeted_state, + dual_budgeted_state, + in_flight_budgeted_state, + nz, + submit_first, + submit_first_at, + unbounded_state, +}; +use crate::message_assembler::{ + MessageAssemblyError, + MessageAssemblyState, + MessageKey, + test_helpers::continuation_header, +}; + +// ============================================================================= +// Connection budget enforcement +// ============================================================================= + +#[rstest] +fn connection_budget_allows_frames_within_limit( + #[from(connection_budgeted_state)] mut state: MessageAssemblyState, +) { + // 10 bytes fits within 20-byte budget + submit_first(&mut state, 1, &[0u8; 10], false).expect("within budget"); + assert_eq!(state.total_buffered_bytes(), 10); +} + +#[rstest] +fn connection_budget_rejects_first_frame_exceeding_limit( + #[from(connection_budgeted_state)] mut state: MessageAssemblyState, +) { + // 21 bytes exceeds 20-byte budget + let err = submit_first(&mut state, 1, &[0u8; 21], false).expect_err("should reject"); + assert!( + matches!( + err, + MessageAssemblyError::ConnectionBudgetExceeded { + key: MessageKey(1), + attempted: 21, + .. + } + ), + "unexpected error: {err:?}" + ); + assert_eq!(state.buffered_count(), 0); +} + +#[rstest] +fn connection_budget_rejects_continuation_exceeding_limit( + #[from(connection_budgeted_state)] mut state: MessageAssemblyState, +) { + // First frame: 10 bytes (within 20-byte budget) + submit_first(&mut state, 1, &[0u8; 10], false).expect("first frame"); + + // Continuation: 11 bytes would bring total to 21, exceeding 20 + let cont = continuation_header(1, 1, 11, true); + let err = state + .accept_continuation_frame(&cont, &[0u8; 11]) + .expect_err("should reject"); + assert!( + matches!(err, MessageAssemblyError::ConnectionBudgetExceeded { .. }), + "unexpected error: {err:?}" + ); + // Partial assembly should be freed on budget violation + assert_eq!(state.buffered_count(), 0); +} + +#[rstest] +fn connection_budget_frees_partial_on_violation( + #[from(connection_budgeted_state)] mut state: MessageAssemblyState, +) { + // Start two assemblies: 8 + 8 = 16 (within 20) + submit_first(&mut state, 1, &[0u8; 8], false).expect("first"); + submit_first(&mut state, 2, &[0u8; 8], false).expect("second"); + assert_eq!(state.buffered_count(), 2); + + // Continuation on key 1: 5 bytes would bring total to 21 + let cont = continuation_header(1, 1, 5, false); + let err = state + .accept_continuation_frame(&cont, &[0u8; 5]) + .expect_err("should reject"); + assert!(matches!( + err, + MessageAssemblyError::ConnectionBudgetExceeded { .. } + )); + + // Key 1 is freed; key 2 survives + assert_eq!(state.buffered_count(), 1); + assert_eq!(state.total_buffered_bytes(), 8); +} + +// ============================================================================= +// In-flight budget enforcement +// ============================================================================= + +#[rstest] +fn in_flight_budget_allows_frames_within_limit( + #[from(in_flight_budgeted_state)] mut state: MessageAssemblyState, +) { + submit_first(&mut state, 1, &[0u8; 10], false).expect("within budget"); + assert_eq!(state.total_buffered_bytes(), 10); +} + +#[rstest] +fn in_flight_budget_rejects_first_frame_exceeding_limit( + #[from(in_flight_budgeted_state)] mut state: MessageAssemblyState, +) { + let err = submit_first(&mut state, 1, &[0u8; 21], false).expect_err("should reject"); + assert!( + matches!( + err, + MessageAssemblyError::InFlightBudgetExceeded { + key: MessageKey(1), + attempted: 21, + .. + } + ), + "unexpected error: {err:?}" + ); + assert_eq!(state.buffered_count(), 0); +} + +#[rstest] +fn in_flight_budget_rejects_continuation_exceeding_limit( + #[from(in_flight_budgeted_state)] mut state: MessageAssemblyState, +) { + submit_first(&mut state, 1, &[0u8; 10], false).expect("first frame"); + + let cont = continuation_header(1, 1, 11, true); + let err = state + .accept_continuation_frame(&cont, &[0u8; 11]) + .expect_err("should reject"); + assert!( + matches!(err, MessageAssemblyError::InFlightBudgetExceeded { .. }), + "unexpected error: {err:?}" + ); + assert_eq!(state.buffered_count(), 0); +} + +// ============================================================================= +// Dual budget: tighter budget triggers first +// ============================================================================= + +#[rstest] +fn dual_budget_in_flight_triggers_before_connection( + #[from(dual_budgeted_state)] mut state: MessageAssemblyState, +) { + // In-flight budget is 20, connection budget is 30. + // 21 bytes should trigger in-flight first. + let err = submit_first(&mut state, 1, &[0u8; 21], false).expect_err("should reject"); + assert!( + matches!(err, MessageAssemblyError::InFlightBudgetExceeded { .. }), + "expected in-flight to trigger first, got: {err:?}" + ); +} + +#[test] +fn dual_budget_connection_triggers_when_in_flight_not_exceeded() { + // connection=15 < in_flight=20, so connection triggers first. + let mut state = MessageAssemblyState::with_budgets( + nz(1024), + Duration::from_secs(30), + Some(nz(15)), + Some(nz(20)), + ); + let err = submit_first(&mut state, 1, &[0u8; 16], false).expect_err("should reject"); + assert!( + matches!(err, MessageAssemblyError::ConnectionBudgetExceeded { .. }), + "expected connection to trigger first, got: {err:?}" + ); +} + +// ============================================================================= +// Backward compatibility: no budgets = no enforcement +// ============================================================================= + +#[rstest] +fn no_budgets_allows_large_frames(#[from(unbounded_state)] mut state: MessageAssemblyState) { + // 500 bytes is well within the 1024 per-message limit and has no aggregate cap. + submit_first(&mut state, 1, &[0u8; 500], false).expect("no budget enforcement"); + submit_first(&mut state, 2, &[0u8; 500], false).expect("no budget enforcement"); + assert_eq!(state.total_buffered_bytes(), 1000); +} + +// ============================================================================= +// Budget isolation: violation for one key does not affect others +// ============================================================================= + +#[rstest] +fn budget_violation_does_not_affect_other_assemblies( + #[from(connection_budgeted_state)] mut state: MessageAssemblyState, +) { + // Start key 1 with 10 bytes + submit_first(&mut state, 1, &[0u8; 10], false).expect("first"); + assert_eq!(state.buffered_count(), 1); + + // Try key 2 with 11 bytes — total would be 21, exceeding 20-byte budget + let err = submit_first(&mut state, 2, &[0u8; 11], false).expect_err("should reject"); + assert!(matches!( + err, + MessageAssemblyError::ConnectionBudgetExceeded { .. } + )); + + // Key 1 is still intact + assert_eq!(state.buffered_count(), 1); + assert_eq!(state.total_buffered_bytes(), 10); + + // Key 1 can still be completed + let cont = continuation_header(1, 1, 3, true); + let msg = state + .accept_continuation_frame(&cont, b"fin") + .expect("cont") + .expect("complete"); + assert_eq!(msg.body(), &[&[0u8; 10][..], b"fin"].concat()); +} + +// ============================================================================= +// Headroom reclamation after purge +// ============================================================================= + +#[test] +fn headroom_reclaimed_after_purge_allows_new_assembly() { + let mut state = + MessageAssemblyState::with_budgets(nz(1024), Duration::from_secs(30), Some(nz(20)), None); + + let now = Instant::now(); + submit_first_at(&mut state, 1, &[0u8; 15], now).expect("first"); + assert_eq!(state.total_buffered_bytes(), 15); + + // A 6-byte frame would exceed budget (15 + 6 = 21 > 20) + let err = submit_first(&mut state, 2, &[0u8; 6], false).expect_err("over budget"); + assert!(matches!( + err, + MessageAssemblyError::ConnectionBudgetExceeded { .. } + )); + + // Purge key 1 via timeout + let future = now + Duration::from_secs(31); + state.purge_expired_at(future); + assert_eq!(state.total_buffered_bytes(), 0); + + // Now 6 bytes is fine + submit_first(&mut state, 3, &[0u8; 6], false).expect("within reclaimed budget"); + assert_eq!(state.total_buffered_bytes(), 6); +} + +#[test] +fn headroom_reclaimed_after_completion_allows_new_frame() { + let mut state = + MessageAssemblyState::with_budgets(nz(1024), Duration::from_secs(30), None, Some(nz(20))); + + submit_first(&mut state, 1, &[0u8; 15], false).expect("first"); + + // Complete key 1 + let cont = continuation_header(1, 1, 3, true); + state + .accept_continuation_frame(&cont, b"end") + .expect("cont") + .expect("complete"); + assert_eq!(state.total_buffered_bytes(), 0); + + // Now we have full headroom again + submit_first(&mut state, 2, &[0u8; 20], false).expect("within reclaimed budget"); + assert_eq!(state.total_buffered_bytes(), 20); +} + +// ============================================================================= +// Single-frame messages bypass aggregate budgets +// ============================================================================= + +#[rstest] +fn single_frame_message_not_subject_to_aggregate_budgets( + #[from(connection_budgeted_state)] mut state: MessageAssemblyState, +) { + // Fill up to the budget edge + submit_first(&mut state, 1, &[0u8; 19], false).expect("first"); + assert_eq!(state.total_buffered_bytes(), 19); + + // A single-frame message should still succeed because it never buffers + let msg = submit_first(&mut state, 2, b"big-single-frame-payload", true) + .expect("single frame accepted") + .expect("should complete immediately"); + assert_eq!(msg.body(), b"big-single-frame-payload"); + + // Buffered bytes unchanged (single-frame was never buffered) + assert_eq!(state.total_buffered_bytes(), 19); +} diff --git a/src/message_assembler/budget_tests.rs b/src/message_assembler/budget_tests.rs index 8c916f98..90ae65eb 100644 --- a/src/message_assembler/budget_tests.rs +++ b/src/message_assembler/budget_tests.rs @@ -1,4 +1,8 @@ //! Unit tests for aggregate budget enforcement (8.3.2). +//! +//! Helpers, fixtures, and `total_buffered_bytes` accounting tests live here. +//! Enforcement tests (connection, in-flight, dual, isolation, headroom, +//! single-frame bypass) are in the `enforcement` sub-module. use std::{ num::NonZeroUsize, @@ -12,7 +16,6 @@ use crate::message_assembler::{ FirstFrameInput, MessageAssemblyError, MessageAssemblyState, - MessageKey, test_helpers::{continuation_header, first_header}, }; @@ -161,275 +164,8 @@ fn single_frame_message_not_counted_in_buffered_bytes( } // ============================================================================= -// Connection budget enforcement +// Budget enforcement tests — see budget_enforcement_tests.rs // ============================================================================= -#[rstest] -fn connection_budget_allows_frames_within_limit( - #[from(connection_budgeted_state)] mut state: MessageAssemblyState, -) { - // 10 bytes fits within 20-byte budget - submit_first(&mut state, 1, &[0u8; 10], false).expect("within budget"); - assert_eq!(state.total_buffered_bytes(), 10); -} - -#[rstest] -fn connection_budget_rejects_first_frame_exceeding_limit( - #[from(connection_budgeted_state)] mut state: MessageAssemblyState, -) { - // 21 bytes exceeds 20-byte budget - let err = submit_first(&mut state, 1, &[0u8; 21], false).expect_err("should reject"); - assert!( - matches!( - err, - MessageAssemblyError::ConnectionBudgetExceeded { - key: MessageKey(1), - attempted: 21, - .. - } - ), - "unexpected error: {err:?}" - ); - assert_eq!(state.buffered_count(), 0); -} - -#[rstest] -fn connection_budget_rejects_continuation_exceeding_limit( - #[from(connection_budgeted_state)] mut state: MessageAssemblyState, -) { - // First frame: 10 bytes (within 20-byte budget) - submit_first(&mut state, 1, &[0u8; 10], false).expect("first frame"); - - // Continuation: 11 bytes would bring total to 21, exceeding 20 - let cont = continuation_header(1, 1, 11, true); - let err = state - .accept_continuation_frame(&cont, &[0u8; 11]) - .expect_err("should reject"); - assert!( - matches!(err, MessageAssemblyError::ConnectionBudgetExceeded { .. }), - "unexpected error: {err:?}" - ); - // Partial assembly should be freed on budget violation - assert_eq!(state.buffered_count(), 0); -} - -#[rstest] -fn connection_budget_frees_partial_on_violation( - #[from(connection_budgeted_state)] mut state: MessageAssemblyState, -) { - // Start two assemblies: 8 + 8 = 16 (within 20) - submit_first(&mut state, 1, &[0u8; 8], false).expect("first"); - submit_first(&mut state, 2, &[0u8; 8], false).expect("second"); - assert_eq!(state.buffered_count(), 2); - - // Continuation on key 1: 5 bytes would bring total to 21 - let cont = continuation_header(1, 1, 5, false); - let err = state - .accept_continuation_frame(&cont, &[0u8; 5]) - .expect_err("should reject"); - assert!(matches!( - err, - MessageAssemblyError::ConnectionBudgetExceeded { .. } - )); - - // Key 1 is freed; key 2 survives - assert_eq!(state.buffered_count(), 1); - assert_eq!(state.total_buffered_bytes(), 8); -} - -// ============================================================================= -// In-flight budget enforcement -// ============================================================================= - -#[rstest] -fn in_flight_budget_allows_frames_within_limit( - #[from(in_flight_budgeted_state)] mut state: MessageAssemblyState, -) { - submit_first(&mut state, 1, &[0u8; 10], false).expect("within budget"); - assert_eq!(state.total_buffered_bytes(), 10); -} - -#[rstest] -fn in_flight_budget_rejects_first_frame_exceeding_limit( - #[from(in_flight_budgeted_state)] mut state: MessageAssemblyState, -) { - let err = submit_first(&mut state, 1, &[0u8; 21], false).expect_err("should reject"); - assert!( - matches!( - err, - MessageAssemblyError::InFlightBudgetExceeded { - key: MessageKey(1), - attempted: 21, - .. - } - ), - "unexpected error: {err:?}" - ); - assert_eq!(state.buffered_count(), 0); -} - -#[rstest] -fn in_flight_budget_rejects_continuation_exceeding_limit( - #[from(in_flight_budgeted_state)] mut state: MessageAssemblyState, -) { - submit_first(&mut state, 1, &[0u8; 10], false).expect("first frame"); - - let cont = continuation_header(1, 1, 11, true); - let err = state - .accept_continuation_frame(&cont, &[0u8; 11]) - .expect_err("should reject"); - assert!( - matches!(err, MessageAssemblyError::InFlightBudgetExceeded { .. }), - "unexpected error: {err:?}" - ); - assert_eq!(state.buffered_count(), 0); -} - -// ============================================================================= -// Dual budget: tighter budget triggers first -// ============================================================================= - -#[rstest] -fn dual_budget_in_flight_triggers_before_connection( - #[from(dual_budgeted_state)] mut state: MessageAssemblyState, -) { - // In-flight budget is 20, connection budget is 30. - // 21 bytes should trigger in-flight first. - let err = submit_first(&mut state, 1, &[0u8; 21], false).expect_err("should reject"); - assert!( - matches!(err, MessageAssemblyError::InFlightBudgetExceeded { .. }), - "expected in-flight to trigger first, got: {err:?}" - ); -} - -#[test] -fn dual_budget_connection_triggers_when_in_flight_not_exceeded() { - // connection=15 < in_flight=20, so connection triggers first. - let mut state = MessageAssemblyState::with_budgets( - nz(1024), - Duration::from_secs(30), - Some(nz(15)), - Some(nz(20)), - ); - let err = submit_first(&mut state, 1, &[0u8; 16], false).expect_err("should reject"); - assert!( - matches!(err, MessageAssemblyError::ConnectionBudgetExceeded { .. }), - "expected connection to trigger first, got: {err:?}" - ); -} - -// ============================================================================= -// Backward compatibility: no budgets = no enforcement -// ============================================================================= - -#[rstest] -fn no_budgets_allows_large_frames(#[from(unbounded_state)] mut state: MessageAssemblyState) { - // 500 bytes is well within the 1024 per-message limit and has no aggregate cap. - submit_first(&mut state, 1, &[0u8; 500], false).expect("no budget enforcement"); - submit_first(&mut state, 2, &[0u8; 500], false).expect("no budget enforcement"); - assert_eq!(state.total_buffered_bytes(), 1000); -} - -// ============================================================================= -// Budget isolation: violation for one key does not affect others -// ============================================================================= - -#[rstest] -fn budget_violation_does_not_affect_other_assemblies( - #[from(connection_budgeted_state)] mut state: MessageAssemblyState, -) { - // Start key 1 with 10 bytes - submit_first(&mut state, 1, &[0u8; 10], false).expect("first"); - assert_eq!(state.buffered_count(), 1); - - // Try key 2 with 11 bytes — total would be 21, exceeding 20-byte budget - let err = submit_first(&mut state, 2, &[0u8; 11], false).expect_err("should reject"); - assert!(matches!( - err, - MessageAssemblyError::ConnectionBudgetExceeded { .. } - )); - - // Key 1 is still intact - assert_eq!(state.buffered_count(), 1); - assert_eq!(state.total_buffered_bytes(), 10); - - // Key 1 can still be completed - let cont = continuation_header(1, 1, 3, true); - let msg = state - .accept_continuation_frame(&cont, b"fin") - .expect("cont") - .expect("complete"); - assert_eq!(msg.body(), &[&[0u8; 10][..], b"fin"].concat()); -} - -// ============================================================================= -// Headroom reclamation after purge -// ============================================================================= - -#[test] -fn headroom_reclaimed_after_purge_allows_new_assembly() { - let mut state = - MessageAssemblyState::with_budgets(nz(1024), Duration::from_secs(30), Some(nz(20)), None); - - let now = Instant::now(); - submit_first_at(&mut state, 1, &[0u8; 15], now).expect("first"); - assert_eq!(state.total_buffered_bytes(), 15); - - // A 6-byte frame would exceed budget (15 + 6 = 21 > 20) - let err = submit_first(&mut state, 2, &[0u8; 6], false).expect_err("over budget"); - assert!(matches!( - err, - MessageAssemblyError::ConnectionBudgetExceeded { .. } - )); - - // Purge key 1 via timeout - let future = now + Duration::from_secs(31); - state.purge_expired_at(future); - assert_eq!(state.total_buffered_bytes(), 0); - - // Now 6 bytes is fine - submit_first(&mut state, 3, &[0u8; 6], false).expect("within reclaimed budget"); - assert_eq!(state.total_buffered_bytes(), 6); -} - -#[test] -fn headroom_reclaimed_after_completion_allows_new_frame() { - let mut state = - MessageAssemblyState::with_budgets(nz(1024), Duration::from_secs(30), None, Some(nz(20))); - - submit_first(&mut state, 1, &[0u8; 15], false).expect("first"); - - // Complete key 1 - let cont = continuation_header(1, 1, 3, true); - state - .accept_continuation_frame(&cont, b"end") - .expect("cont") - .expect("complete"); - assert_eq!(state.total_buffered_bytes(), 0); - - // Now we have full headroom again - submit_first(&mut state, 2, &[0u8; 20], false).expect("within reclaimed budget"); - assert_eq!(state.total_buffered_bytes(), 20); -} - -// ============================================================================= -// Single-frame messages bypass aggregate budgets -// ============================================================================= - -#[rstest] -fn single_frame_message_not_subject_to_aggregate_budgets( - #[from(connection_budgeted_state)] mut state: MessageAssemblyState, -) { - // Fill up to the budget edge - submit_first(&mut state, 1, &[0u8; 19], false).expect("first"); - assert_eq!(state.total_buffered_bytes(), 19); - - // A single-frame message should still succeed because it never buffers - let msg = submit_first(&mut state, 2, b"big-single-frame-payload", true) - .expect("single frame accepted") - .expect("should complete immediately"); - assert_eq!(msg.body(), b"big-single-frame-payload"); - - // Buffered bytes unchanged (single-frame was never buffered) - assert_eq!(state.total_buffered_bytes(), 19); -} +#[path = "budget_enforcement_tests.rs"] +mod enforcement; diff --git a/src/message_assembler/state.rs b/src/message_assembler/state.rs index 03310ea0..a39a129a 100644 --- a/src/message_assembler/state.rs +++ b/src/message_assembler/state.rs @@ -217,15 +217,18 @@ impl MessageAssemblyState { ))); } - // Check aggregate budgets before buffering (single-frame messages - // are returned above and never counted against aggregate budgets). - let incoming_bytes = input.body.len().saturating_add(input.metadata.len()); - check_aggregate_budgets( - key, - self.total_buffered_bytes(), - incoming_bytes, - &self.budgets, - )?; + // Check aggregate budgets before buffering. Single-frame messages + // are returned above and never counted. The O(n) scan is gated on + // is_enabled() so the hot path stays O(1) when budgets are absent. + if self.budgets.is_enabled() { + let incoming_bytes = input.body.len().saturating_add(input.metadata.len()); + check_aggregate_budgets( + key, + self.total_buffered_bytes(), + incoming_bytes, + &self.budgets, + )?; + } // Start new assembly, preserving envelope routing metadata from the // first frame so the completed message is dispatched correctly. @@ -301,10 +304,15 @@ impl MessageAssemblyState { )); } - // Snapshot budget state before the mutable entry borrow. + // Snapshot budget state before the mutable entry borrow. The O(n) + // scan is gated on is_enabled() so it's skipped when budgets are absent. let max_message_size = self.max_message_size; let budgets = self.budgets; - let buffered_total = self.total_buffered_bytes(); + let buffered_total = if budgets.is_enabled() { + self.total_buffered_bytes() + } else { + 0 + }; let Entry::Occupied(mut entry) = self.assemblies.entry(key) else { return Err(MessageAssemblyError::Series( diff --git a/tests/fixtures/budget_enforcement.rs b/tests/fixtures/budget_enforcement.rs index 2bad83ce..f94af4d2 100644 --- a/tests/fixtures/budget_enforcement.rs +++ b/tests/fixtures/budget_enforcement.rs @@ -108,6 +108,7 @@ impl BudgetEnforcementWorld { } Err(e) => { self.last_accepted = false; + self.last_completed = false; self.last_error = Some(e); } } From a26373f0f77357f686d7abdedbfc19b4e71456b1 Mon Sep 17 00:00:00 2001 From: Leynos Date: Mon, 23 Feb 2026 17:08:39 +0000 Subject: [PATCH 4/5] test(fixtures): improve default budget limit to yield clearer errors Changed the default budget limit in BudgetEnforcementWorld to 64 KB from the minimum possible value. This more forgiving default allows test scenarios that forget to initialize budgeted state to fail with a clear budget error instead of a confusing MessageTooLarge error on multi-byte bodies. Co-authored-by: devboxerhub[bot] --- tests/fixtures/budget_enforcement.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/fixtures/budget_enforcement.rs b/tests/fixtures/budget_enforcement.rs index f94af4d2..b8b4792c 100644 --- a/tests/fixtures/budget_enforcement.rs +++ b/tests/fixtures/budget_enforcement.rs @@ -48,7 +48,10 @@ pub struct BudgetEnforcementWorld { impl Default for BudgetEnforcementWorld { fn default() -> Self { - let max = NonZeroUsize::MIN; + // Use a forgiving default so that scenarios which forget to call + // init_budgeted_state fail with a clear budget error rather than a + // confusing MessageTooLarge on any multi-byte body. + let max = NonZeroUsize::new(64 * 1024).unwrap_or(NonZeroUsize::MIN); Self { state: MessageAssemblyState::new(max, Duration::from_secs(30)), last_error: None, From 001fd2256dfd3c53c8d88c27ffc1b8c59c9fa635 Mon Sep 17 00:00:00 2001 From: Leynos Date: Mon, 23 Feb 2026 17:36:01 +0000 Subject: [PATCH 5/5] test(message_assembler,budget_enforcement): parametrise tests for connection and in-flight budgets Refactor budget enforcement tests to use parameterized rstest cases covering both connection and in-flight budget dimensions. Introduce helper functions for constructing states and asserting budget exceeded errors uniformly. This reduces duplicated code and improves test coverage. Additional cleanup in test comments and feature steps to parse budget config strings. Co-authored-by: devboxerhub[bot] --- ...ng-requests-and-shared-message-assembly.md | 2 +- docs/execplans/8-3-2-budget-enforcement.md | 8 +- docs/users-guide.md | 4 +- .../budget_enforcement_tests.rs | 161 +++++++++--------- tests/features/budget_enforcement.feature | 2 +- tests/fixtures/budget_enforcement.rs | 26 +++ tests/steps/budget_enforcement_steps.rs | 18 +- 7 files changed, 122 insertions(+), 99 deletions(-) diff --git a/docs/adr-002-streaming-requests-and-shared-message-assembly.md b/docs/adr-002-streaming-requests-and-shared-message-assembly.md index 44878658..9633c886 100644 --- a/docs/adr-002-streaming-requests-and-shared-message-assembly.md +++ b/docs/adr-002-streaming-requests-and-shared-message-assembly.md @@ -308,7 +308,7 @@ Precedence is: limit as `min(max_message_size, bytes_per_message)`. - Budget checks are applied after single-frame early return (single-frame messages are never buffered and skip aggregate checks). -- On budget violation the offending partial assembly is freed and the error +- On budget violation the offending partial assembly is freed, and the error is surfaced as `ConnectionBudgetExceeded` or `InFlightBudgetExceeded`, both routed through the existing `DeserFailureTracker` as `InvalidData`. - Budget enforcement helpers are extracted to `src/message_assembler/budget.rs` diff --git a/docs/execplans/8-3-2-budget-enforcement.md b/docs/execplans/8-3-2-budget-enforcement.md index 2faffaf6..f67b530c 100644 --- a/docs/execplans/8-3-2-budget-enforcement.md +++ b/docs/execplans/8-3-2-budget-enforcement.md @@ -16,8 +16,8 @@ dimensions introduced by `8.3.1` (configuration-only). After this change, when a library consumer configures `MemoryBudgets` on `WireframeApp`, the message assembly subsystem will actively enforce: -- **bytes per message** — no single assembled message may exceed the configured - cap; +- **bytes per message** — no single assembled message may exceed the + configured cap; - **bytes per connection** — the sum of all bytes buffered across every in-flight assembly on one connection may not exceed the configured cap; and - **bytes in flight** — a second aggregate guard on buffered assembly bytes, @@ -27,7 +27,7 @@ assembly subsystem will actively enforce: When a budget is exceeded the framework aborts the offending assembly, frees its partial buffers, and surfaces `std::io::ErrorKind::InvalidData` through the -existing deserialisation-failure policy. No handler is invoked for the aborted +existing deserialization-failure policy. No handler is invoked for the aborted message. Other in-flight assemblies on the same connection are unaffected. Success is observable when: @@ -92,7 +92,7 @@ Success is observable when: - Risk: `total_buffered_bytes()` is O(n) over all in-flight assemblies per accepted frame. Severity: low. Likelihood: low. Mitigation: acceptable for 8.3.2 because concurrent assemblies per connection are typically in single - digits. A cached running total can be added in a future optimisation pass if + digits. A cached running total can be added in a future optimization pass if profiling warrants it. - Risk: the conceptual overlap between `bytes_per_connection` and diff --git a/docs/users-guide.md b/docs/users-guide.md index 1e974f41..d4dd209e 100644 --- a/docs/users-guide.md +++ b/docs/users-guide.md @@ -571,10 +571,10 @@ When budgets are configured, the message assembly subsystem enforces them at frame acceptance time. Frames that would cause the total buffered bytes to exceed the per-connection or in-flight budget are rejected, the offending partial assembly is freed, and the failure is surfaced through the existing -deserialisation-failure policy (`InvalidData`). The effective per-message limit +deserialization-failure policy (`InvalidData`). The effective per-message limit is the minimum of the fragmentation `max_message_size` and the configured `bytes_per_message`. Single-frame messages that complete immediately are never -counted against aggregate budgets since they do not buffer. +counted against aggregate budgets, since they do not buffer. #### Message key multiplexing (8.2.3) diff --git a/src/message_assembler/budget_enforcement_tests.rs b/src/message_assembler/budget_enforcement_tests.rs index 5059db5d..f04a785c 100644 --- a/src/message_assembler/budget_enforcement_tests.rs +++ b/src/message_assembler/budget_enforcement_tests.rs @@ -22,42 +22,95 @@ use crate::message_assembler::{ }; // ============================================================================= -// Connection budget enforcement +// Helpers for parameterised budget-dimension tests +// ============================================================================= + +/// Build a `MessageAssemblyState` with only the named budget dimension +/// set to 20 bytes. +fn state_for_dimension(dimension: &str) -> MessageAssemblyState { + match dimension { + "connection" => connection_budgeted_state(), + "in_flight" => in_flight_budgeted_state(), + _ => panic!("unknown budget dimension: {dimension}"), + } +} + +/// Assert that `err` is the expected budget-exceeded variant for +/// `dimension`, matching the given `key` and `attempted` values. +fn assert_budget_exceeded(err: &MessageAssemblyError, dimension: &str, key: u64, attempted: usize) { + match dimension { + "connection" => assert!( + matches!( + err, + MessageAssemblyError::ConnectionBudgetExceeded { + key: k, + attempted: a, + .. + } if k == &MessageKey(key) && *a == attempted + ), + "expected ConnectionBudgetExceeded(key={key}, attempted={attempted}), got: {err:?}" + ), + "in_flight" => assert!( + matches!( + err, + MessageAssemblyError::InFlightBudgetExceeded { + key: k, + attempted: a, + .. + } if k == &MessageKey(key) && *a == attempted + ), + "expected InFlightBudgetExceeded(key={key}, attempted={attempted}), got: {err:?}" + ), + _ => panic!("unknown budget dimension: {dimension}"), + } +} + +/// Assert that `err` matches the budget-exceeded variant for `dimension` +/// without inspecting field values. +fn assert_budget_exceeded_any(err: &MessageAssemblyError, dimension: &str) { + match dimension { + "connection" => assert!( + matches!(err, MessageAssemblyError::ConnectionBudgetExceeded { .. }), + "expected ConnectionBudgetExceeded, got: {err:?}" + ), + "in_flight" => assert!( + matches!(err, MessageAssemblyError::InFlightBudgetExceeded { .. }), + "expected InFlightBudgetExceeded, got: {err:?}" + ), + _ => panic!("unknown budget dimension: {dimension}"), + } +} + +// ============================================================================= +// Parameterised budget enforcement (connection + in-flight) // ============================================================================= #[rstest] -fn connection_budget_allows_frames_within_limit( - #[from(connection_budgeted_state)] mut state: MessageAssemblyState, -) { +#[case::connection("connection")] +#[case::in_flight("in_flight")] +fn budget_allows_frames_within_limit(#[case] dimension: &str) { + let mut state = state_for_dimension(dimension); // 10 bytes fits within 20-byte budget submit_first(&mut state, 1, &[0u8; 10], false).expect("within budget"); assert_eq!(state.total_buffered_bytes(), 10); } #[rstest] -fn connection_budget_rejects_first_frame_exceeding_limit( - #[from(connection_budgeted_state)] mut state: MessageAssemblyState, -) { +#[case::connection("connection")] +#[case::in_flight("in_flight")] +fn budget_rejects_first_frame_exceeding_limit(#[case] dimension: &str) { + let mut state = state_for_dimension(dimension); // 21 bytes exceeds 20-byte budget let err = submit_first(&mut state, 1, &[0u8; 21], false).expect_err("should reject"); - assert!( - matches!( - err, - MessageAssemblyError::ConnectionBudgetExceeded { - key: MessageKey(1), - attempted: 21, - .. - } - ), - "unexpected error: {err:?}" - ); + assert_budget_exceeded(&err, dimension, 1, 21); assert_eq!(state.buffered_count(), 0); } #[rstest] -fn connection_budget_rejects_continuation_exceeding_limit( - #[from(connection_budgeted_state)] mut state: MessageAssemblyState, -) { +#[case::connection("connection")] +#[case::in_flight("in_flight")] +fn budget_rejects_continuation_exceeding_limit(#[case] dimension: &str) { + let mut state = state_for_dimension(dimension); // First frame: 10 bytes (within 20-byte budget) submit_first(&mut state, 1, &[0u8; 10], false).expect("first frame"); @@ -66,14 +119,15 @@ fn connection_budget_rejects_continuation_exceeding_limit( let err = state .accept_continuation_frame(&cont, &[0u8; 11]) .expect_err("should reject"); - assert!( - matches!(err, MessageAssemblyError::ConnectionBudgetExceeded { .. }), - "unexpected error: {err:?}" - ); + assert_budget_exceeded_any(&err, dimension); // Partial assembly should be freed on budget violation assert_eq!(state.buffered_count(), 0); } +// ============================================================================= +// Connection-specific: partial assembly freed on violation +// ============================================================================= + #[rstest] fn connection_budget_frees_partial_on_violation( #[from(connection_budgeted_state)] mut state: MessageAssemblyState, @@ -98,54 +152,6 @@ fn connection_budget_frees_partial_on_violation( assert_eq!(state.total_buffered_bytes(), 8); } -// ============================================================================= -// In-flight budget enforcement -// ============================================================================= - -#[rstest] -fn in_flight_budget_allows_frames_within_limit( - #[from(in_flight_budgeted_state)] mut state: MessageAssemblyState, -) { - submit_first(&mut state, 1, &[0u8; 10], false).expect("within budget"); - assert_eq!(state.total_buffered_bytes(), 10); -} - -#[rstest] -fn in_flight_budget_rejects_first_frame_exceeding_limit( - #[from(in_flight_budgeted_state)] mut state: MessageAssemblyState, -) { - let err = submit_first(&mut state, 1, &[0u8; 21], false).expect_err("should reject"); - assert!( - matches!( - err, - MessageAssemblyError::InFlightBudgetExceeded { - key: MessageKey(1), - attempted: 21, - .. - } - ), - "unexpected error: {err:?}" - ); - assert_eq!(state.buffered_count(), 0); -} - -#[rstest] -fn in_flight_budget_rejects_continuation_exceeding_limit( - #[from(in_flight_budgeted_state)] mut state: MessageAssemblyState, -) { - submit_first(&mut state, 1, &[0u8; 10], false).expect("first frame"); - - let cont = continuation_header(1, 1, 11, true); - let err = state - .accept_continuation_frame(&cont, &[0u8; 11]) - .expect_err("should reject"); - assert!( - matches!(err, MessageAssemblyError::InFlightBudgetExceeded { .. }), - "unexpected error: {err:?}" - ); - assert_eq!(state.buffered_count(), 0); -} - // ============================================================================= // Dual budget: tighter budget triggers first // ============================================================================= @@ -185,7 +191,8 @@ fn dual_budget_connection_triggers_when_in_flight_not_exceeded() { #[rstest] fn no_budgets_allows_large_frames(#[from(unbounded_state)] mut state: MessageAssemblyState) { - // 500 bytes is well within the 1024 per-message limit and has no aggregate cap. + // 500 bytes is well within the 1024 per-message limit and has no + // aggregate cap. submit_first(&mut state, 1, &[0u8; 500], false).expect("no budget enforcement"); submit_first(&mut state, 2, &[0u8; 500], false).expect("no budget enforcement"); assert_eq!(state.total_buffered_bytes(), 1000); @@ -203,7 +210,8 @@ fn budget_violation_does_not_affect_other_assemblies( submit_first(&mut state, 1, &[0u8; 10], false).expect("first"); assert_eq!(state.buffered_count(), 1); - // Try key 2 with 11 bytes — total would be 21, exceeding 20-byte budget + // Try key 2 with 11 bytes — total would be 21, exceeding 20-byte + // budget let err = submit_first(&mut state, 2, &[0u8; 11], false).expect_err("should reject"); assert!(matches!( err, @@ -285,7 +293,8 @@ fn single_frame_message_not_subject_to_aggregate_budgets( submit_first(&mut state, 1, &[0u8; 19], false).expect("first"); assert_eq!(state.total_buffered_bytes(), 19); - // A single-frame message should still succeed because it never buffers + // A single-frame message should still succeed because it never + // buffers let msg = submit_first(&mut state, 2, b"big-single-frame-payload", true) .expect("single frame accepted") .expect("should complete immediately"); diff --git a/tests/features/budget_enforcement.feature b/tests/features/budget_enforcement.feature index 94b8a3d9..7ea6e517 100644 --- a/tests/features/budget_enforcement.feature +++ b/tests/features/budget_enforcement.feature @@ -5,7 +5,7 @@ Feature: Memory budget enforcement during message assembly three budget dimensions — per-message, per-connection, and in-flight. Background: - Given a budgeted assembly state with max message size 1024, timeout 30, connection budget 50, and in-flight budget 40 + Given a budgeted assembly state configured as 1024/30/50/40 Scenario: Accept frames within all budget limits When a first frame for key 1 with 10 body bytes is accepted diff --git a/tests/fixtures/budget_enforcement.rs b/tests/fixtures/budget_enforcement.rs index b8b4792c..9927c203 100644 --- a/tests/fixtures/budget_enforcement.rs +++ b/tests/fixtures/budget_enforcement.rs @@ -2,6 +2,7 @@ use std::{ num::NonZeroUsize, + str::FromStr, time::{Duration, Instant}, }; @@ -19,6 +20,9 @@ use wireframe::message_assembler::{ pub use wireframe_testing::TestResult; /// Configuration bundle for `init_budgeted_state`. +/// +/// Parsed from the Gherkin step text as +/// `"max_message_size / timeout_secs / connection_budget / in_flight_budget"`. #[derive(Clone, Copy)] pub struct BudgetedStateConfig { pub max_message_size: usize, @@ -27,6 +31,28 @@ pub struct BudgetedStateConfig { pub in_flight_budget: usize, } +impl FromStr for BudgetedStateConfig { + type Err = String; + + fn from_str(s: &str) -> Result { + let mut parts = s.split('/').map(str::trim); + let msg = parts.next().ok_or("missing max_message_size")?; + let timeout = parts.next().ok_or("missing timeout_secs")?; + let conn = parts.next().ok_or("missing connection_budget")?; + let flight = parts.next().ok_or("missing in_flight_budget")?; + Ok(Self { + max_message_size: msg.parse().map_err(|e| format!("max_message_size: {e}"))?, + timeout_secs: timeout.parse().map_err(|e| format!("timeout_secs: {e}"))?, + connection_budget: conn + .parse() + .map_err(|e| format!("connection_budget: {e}"))?, + in_flight_budget: flight + .parse() + .map_err(|e| format!("in_flight_budget: {e}"))?, + }) + } +} + /// Continuation frame descriptor. #[derive(Clone, Copy)] pub struct ContinuationInput { diff --git a/tests/steps/budget_enforcement_steps.rs b/tests/steps/budget_enforcement_steps.rs index e1bc1709..ff732816 100644 --- a/tests/steps/budget_enforcement_steps.rs +++ b/tests/steps/budget_enforcement_steps.rs @@ -13,24 +13,12 @@ use crate::fixtures::budget_enforcement::{ // Given // --------------------------------------------------------------------------- -#[given( - "a budgeted assembly state with max message size {max_msg}, timeout {timeout}, connection \ - budget {conn}, and in-flight budget {flight}" -)] -#[expect(clippy::too_many_arguments, reason = "step macro injects world param")] +#[given("a budgeted assembly state configured as {config}")] fn given_budgeted_state( budget_enforcement_world: &mut BudgetEnforcementWorld, - max_msg: usize, - timeout: u64, - conn: usize, - flight: usize, + config: BudgetedStateConfig, ) -> TestResult { - budget_enforcement_world.init_budgeted_state(BudgetedStateConfig { - max_message_size: max_msg, - timeout_secs: timeout, - connection_budget: conn, - in_flight_budget: flight, - }) + budget_enforcement_world.init_budgeted_state(config) } #[given("the clock is at time zero")]