diff --git a/docs/adr-002-streaming-requests-and-shared-message-assembly.md b/docs/adr-002-streaming-requests-and-shared-message-assembly.md index 0f3522d0..9633c886 100644 --- a/docs/adr-002-streaming-requests-and-shared-message-assembly.md +++ b/docs/adr-002-streaming-requests-and-shared-message-assembly.md @@ -296,6 +296,25 @@ Precedence is: back-pressure behaviour, and derived defaults remain in follow-on items `8.3.2` through `8.3.5`. +#### Implementation decisions (2026-02-21) + +- Roadmap item `8.3.2` implements hard-cap budget enforcement in + `MessageAssemblyState`. Budget tracking is embedded directly in the assembly + state rather than a separate wrapper, since the state already owns all + assembly lifecycle. +- A new `with_budgets()` constructor accepts optional `connection_budget` + and `in_flight_budget` limits. The integration layer (`assembly.rs`) threads + `MemoryBudgets` from `WireframeApp` and computes the effective per-message + limit as `min(max_message_size, bytes_per_message)`. +- Budget checks are applied after single-frame early return (single-frame + messages are never buffered and skip aggregate checks). +- On budget violation the offending partial assembly is freed, and the error + is surfaced as `ConnectionBudgetExceeded` or `InFlightBudgetExceeded`, both + routed through the existing `DeserFailureTracker` as `InvalidData`. +- Budget enforcement helpers are extracted to `src/message_assembler/budget.rs` + to keep `state.rs` under the 400-line file limit. +- Back-pressure (`8.3.3`) and derived defaults (`8.3.5`) remain future work. + #### Budget enforcement - Budgets MUST cover: bytes buffered per message, bytes buffered per diff --git a/docs/execplans/8-3-2-budget-enforcement.md b/docs/execplans/8-3-2-budget-enforcement.md new file mode 100644 index 00000000..f67b530c --- /dev/null +++ b/docs/execplans/8-3-2-budget-enforcement.md @@ -0,0 +1,616 @@ +# Implement budget enforcement for message assembly (8.3.2) + +This execution plan (ExecPlan) is a living document. The sections +`Constraints`, `Tolerances`, `Risks`, `Progress`, `Surprises & Discoveries`, +`Decision Log`, and `Outcomes & Retrospective` must be kept up to date as work +proceeds. + +Status: COMPLETE + +No `PLANS.md` exists in this repository as of 2026-02-21. + +## Purpose / big picture + +Roadmap item `8.3.2` adds runtime enforcement of the three memory budget +dimensions introduced by `8.3.1` (configuration-only). After this change, when +a library consumer configures `MemoryBudgets` on `WireframeApp`, the message +assembly subsystem will actively enforce: + +- **bytes per message** — no single assembled message may exceed the + configured cap; +- **bytes per connection** — the sum of all bytes buffered across every + in-flight assembly on one connection may not exceed the configured cap; and +- **bytes in flight** — a second aggregate guard on buffered assembly bytes, + today equivalent to the per-connection cap but expressed as a separate + dimension so future work (streaming body buffers, transport-layer buffering) + can differentiate them. + +When a budget is exceeded the framework aborts the offending assembly, frees +its partial buffers, and surfaces `std::io::ErrorKind::InvalidData` through the +existing deserialization-failure policy. No handler is invoked for the aborted +message. Other in-flight assemblies on the same connection are unaffected. + +Success is observable when: + +- Unit tests (`rstest`) prove that each budget dimension is enforced, that + partial buffers are freed on violation, and that completed or purged + assemblies reclaim budget headroom. +- Behavioural tests (behaviour-driven development, via the `rstest-bdd` v0.5.0 + crate) prove the enforcement behaviour in scenario form. +- `docs/users-guide.md` documents that configured budgets are now enforced. +- `docs/roadmap.md` marks `8.3.2` as done after all quality gates pass. +- Design decisions are recorded in the relevant design document. + +## Constraints + +- Scope is strictly roadmap item `8.3.2`: enforcement of the three budget + dimensions during message assembly. Soft-limit back-pressure (`8.3.3`), + hard-cap abort-early semantics (`8.3.4`), and derived defaults (`8.3.5`) are + out of scope. +- Preserve current runtime behaviour when `MemoryBudgets` is not configured. + The existing `max_message_size` guard derived from fragmentation config + remains the sole per-message limit when budgets are absent. +- When both `max_message_size` (from fragmentation) and `bytes_per_message` + (from `MemoryBudgets`) are present, use the minimum of the two so neither + guard is silently overridden (per architecture decision record (ADR) 0002 + guidance on compatible values). +- Single-frame messages (`is_last: true` on first frame) are returned + immediately and never buffered. They must pass the per-message size check but + must not count against connection or in-flight budgets because they are never + "in flight" in the assembly sense. +- Budget violations must abort the assembly for the offending message key, + free its partial buffers, and surface `InvalidData` through the existing + `DeserFailureTracker` path, consistent with ADR 0002 section "Failure modes + and cleanup semantics". +- Do not add new external dependencies. +- Keep source modules under the 400-line project guidance. +- Validate with `rstest` unit tests and `rstest-bdd` behavioural tests. +- Record the enforcement design in the ADR 0002 implementation decisions. + +## Tolerances (exception triggers) + +- Scope: if this work requires touching more than 15 files or exceeds 600 net + lines of code, stop and escalate. +- Interface: if implementing enforcement requires changing any existing public + API signatures (other than adding new constructors or methods), stop and + escalate. +- Dependencies: if any new crate is required, stop and escalate. +- Ambiguity: if the distinction between `bytes_per_connection` and + `bytes_in_flight` requires different accounting than the sum of buffered + assembly bytes, stop and present options with trade-offs. +- Iterations: if the same failing test or lint issue persists after 3 focused + attempts, stop and escalate. + +## Risks + +- Risk: `state.rs` (365 lines) and `state_tests.rs` (384 lines) are both + near the 400-line limit. Adding enforcement logic and tests may exceed it. + Severity: medium. Likelihood: high. Mitigation: extract budget-checking + functions into a new `src/message_assembler/budget.rs` module and budget + tests into `src/message_assembler/budget_tests.rs`. + +- Risk: `total_buffered_bytes()` is O(n) over all in-flight assemblies per + accepted frame. Severity: low. Likelihood: low. Mitigation: acceptable for + 8.3.2 because concurrent assemblies per connection are typically in single + digits. A cached running total can be added in a future optimization pass if + profiling warrants it. + +- Risk: the conceptual overlap between `bytes_per_connection` and + `bytes_in_flight` may confuse consumers. Severity: low. Likelihood: medium. + Mitigation: keep them as separate checks with separate error variants and + document the distinction clearly. Today they track the same accounting; + future items may diverge when streaming body buffers are introduced. + +- Risk: BDD step definitions may interact poorly with `Instant` for + timeout-purge scenarios. Severity: low. Likelihood: low. Mitigation: use the + existing `_at` method variants that accept an explicit `Instant`, matching + the pattern in `state_tests.rs`. + +## Progress + +- [x] (2026-02-21) Drafted ExecPlan for roadmap item `8.3.2`. +- [x] Stage A: add budget enforcement to `MessageAssemblyState`. +- [x] Stage B: wire budgets through the integration layer. +- [x] Stage C: add unit tests (`rstest`) — 20 tests covering all budget + dimensions. +- [x] Stage D: add behavioural tests (`rstest-bdd`) — 6 scenarios. +- [x] Stage E: documentation, roadmap, and quality gates. + +## Surprises & discoveries + +- `state.rs` hit 429 lines after adding budget fields and wiring. Extracted + `check_size_limit` to `budget.rs` as planned by the risk mitigation. +- Borrow checker conflict in `accept_continuation_frame_at`: calling + `self.total_buffered_bytes()` while holding a mutable `entry` borrow. + Resolved with a snapshot pattern — read all immutable state into locals + before the `self.assemblies.entry(key)` call. +- Clippy `too_many_arguments` triggered on `check_aggregate_budgets` (5 args). + Introduced `AggregateBudgets` struct to bundle the two optional limits. +- BDD step text "the buffered count is" collided with an existing step in + `message_assembly_steps.rs`. Renamed to "the active assembly count is" to + avoid ambiguity. + +## Decision log + +- Decision: add budget tracking directly to `MessageAssemblyState` rather + than creating a separate `BudgetEnforcer` wrapper. Rationale: + `MessageAssemblyState` already owns the assembly map and is the sole + authority over buffer lifecycle. A wrapper would duplicate or proxy all + state-mutation methods with no clear separation-of-concerns benefit. + Date/Author: 2026-02-21 / DevBoxer. + +- Decision: use the minimum of `max_message_size` and `bytes_per_message` + when both are present, rather than letting one override the other. Rationale: + ADR 0002 states "the effective message cap is whichever guard triggers first" + and recommends operators set compatible values. Using the minimum enforces + this automatically. Date/Author: 2026-02-21 / DevBoxer. + +- Decision: keep `bytes_per_connection` and `bytes_in_flight` as separate + checks with separate error variants even though they currently track the same + sum. Rationale: the ADR names them as distinct budget dimensions. The cost of + two comparisons per frame is negligible, and divergence is expected in future + items. Date/Author: 2026-02-21 / DevBoxer. + +- Decision: extract budget-checking functions into + `src/message_assembler/budget.rs` to keep `state.rs` under 400 lines and give + enforcement a clear module boundary for future 8.3.3/8.3.4 work. Date/Author: + 2026-02-21 / DevBoxer. + +- Decision: account for both `body_buffer.len()` and `metadata.len()` in + `buffered_bytes()` because metadata is also heap-allocated and contributes to + memory pressure. The existing `accumulated_len()` (body only) continues to + serve the per-message size check. Date/Author: 2026-02-21 / DevBoxer. + +## Outcomes & retrospective + +Implementation complete. All quality gates pass. + +**Files created:** + +- `src/message_assembler/budget.rs` — budget enforcement helpers + (`AggregateBudgets`, `check_aggregate_budgets`, `check_size_limit`) +- `src/message_assembler/budget_tests.rs` — 20 unit tests +- `tests/features/budget_enforcement.feature` — 6 BDD scenarios +- `tests/fixtures/budget_enforcement.rs` — `BudgetEnforcementWorld` +- `tests/steps/budget_enforcement_steps.rs` — step definitions +- `tests/scenarios/budget_enforcement_scenarios.rs` — scenario bindings + +**Files modified:** + +- `src/message_assembler/mod.rs` — registered `budget` module +- `src/message_assembler/state.rs` — added `AggregateBudgets` field, + `with_budgets()` constructor, `total_buffered_bytes()`, wired budget checks + (398 lines, within limit) +- `src/message_assembler/error.rs` — added `ConnectionBudgetExceeded` and + `InFlightBudgetExceeded` variants +- `src/message_assembler/tests.rs` — registered `budget_tests` module +- `src/app/frame_handling/assembly.rs` — added `Option` param, + effective per-message limit computation, budget wiring +- `src/app/connection.rs` — passes `self.memory_budgets` to assembly state +- `src/app/memory_budgets.rs` — updated doc comment +- `docs/roadmap.md` — marked 8.3.2 done +- `docs/users-guide.md` — documented enforcement semantics +- `docs/adr-002-streaming-requests-and-shared-message-assembly.md` — added + implementation decisions +- `tests/fixtures/mod.rs`, `tests/steps/mod.rs`, `tests/scenarios/mod.rs` — + registered budget enforcement modules + +## Context and orientation + +### Existing configuration surface (8.3.1 — complete) + +`src/app/memory_budgets.rs` defines `MemoryBudgets` and `BudgetBytes`. The type +is stored as `memory_budgets: Option` on `WireframeApp` (in +`src/app/builder/core.rs`) and set via the +`WireframeApp::memory_budgets(budgets)` builder method (in +`src/app/builder/config.rs`). The three accessors are: + +- `bytes_per_message() -> BudgetBytes` +- `bytes_per_connection() -> BudgetBytes` +- `bytes_in_flight() -> BudgetBytes` + +Each wraps a `NonZeroUsize`. + +### Message assembly subsystem (8.2 — complete) + +`src/message_assembler/state.rs` defines `MessageAssemblyState`, which manages +concurrent in-flight assemblies keyed by `MessageKey`. Key methods: + +- `new(max_message_size: NonZeroUsize, timeout: Duration)` — constructor. +- `accept_first_frame(input: FirstFrameInput)` — starts a new assembly; + checks `max_message_size` against declared total body length. +- `accept_first_frame_at(input, now: Instant)` — explicit-clock variant. +- `accept_continuation_frame(header, body)` — continues an assembly; checks + accumulated size via `check_size_limit()`. +- `accept_continuation_frame_at(header, body, now)` — explicit-clock variant. +- `purge_expired()` / `purge_expired_at(now)` — evicts stale assemblies. +- `buffered_count()` — number of in-flight assemblies. + +Internal `PartialAssembly` tracks `series`, `routing`, `metadata`, +`body_buffer`, and `started_at`. The method `accumulated_len()` returns +`body_buffer.len()` (body bytes only). + +`src/message_assembler/error.rs` defines `MessageAssemblyError` with variants +`Series(MessageSeriesError)`, `DuplicateFirstFrame { key }`, and +`MessageTooLarge { key, attempted, limit }`. + +### Integration layer (8.2.5 — complete) + +`src/app/frame_handling/assembly.rs` provides: + +- `new_message_assembly_state(fragmentation, frame_budget)` — builds a + `MessageAssemblyState` from fragmentation config or frame budget defaults. +- `assemble_if_needed(runtime, deser_failures, env, max_deser_failures)` — + applies message assembly to a complete post-fragment envelope, routing errors + through `DeserFailureTracker`. + +The call site is in `src/app/connection.rs` line 257: + + frame_handling::new_message_assembly_state(self.fragmentation, requested_frame_length) + +### Testing topology + +- Unit tests: `rstest` in crate-local modules. Budget-related state tests + currently live in `src/message_assembler/state_tests.rs` (384 lines). +- Behavioural tests: `rstest-bdd` fixtures and scenarios under + `tests/features/`, `tests/fixtures/`, `tests/steps/`, and `tests/scenarios/`. + The 8.3.1 configuration scenarios are in + `tests/features/memory_budgets.feature`. +- Feature files use Gherkin syntax. Fixtures provide a `World` struct with + helper methods. Steps map Gherkin phrases to Rust functions via procedural + macros. Scenarios bind feature + fixture via `#[scenario]`. + +### Relevant design documents + +- `docs/adr-002-streaming-requests-and-shared-message-assembly.md` — sections + on budget enforcement and failure modes. +- `docs/generic-message-fragmentation-and-re-assembly-design.md` — section + 9.3 on memory budget integration. +- `docs/hardening-wireframe-a-guide-to-production-resilience.md` — resource + cap guidance. + +## Plan of work + +### Stage A: add budget enforcement to `MessageAssemblyState` + +Create a new module `src/message_assembler/budget.rs` containing the budget +enforcement helpers. This module houses: + +1. A `buffered_bytes()` method on `PartialAssembly` returning + `body_buffer.len() + metadata.len()`. + +2. A `total_buffered_bytes()` free function (or method) that sums + `buffered_bytes()` across all assemblies. + +3. A `check_aggregate_budgets()` function that takes the current total, the + additional bytes about to be accepted, the message key, and optional + `connection_budget` and `in_flight_budget` limits. It returns + `Result<(), MessageAssemblyError>`. + +Then modify `src/message_assembler/state.rs`: + +1. Add two optional fields to `MessageAssemblyState`: + `connection_budget: Option` and + `in_flight_budget: Option`. + +2. Add a new constructor `with_budgets(max_message_size, timeout, + connection_budget, + in_flight_budget)`. Refactor the existing `new()` to delegate to ` + with_budgets` with `None` for both budget fields. + +3. Add a public `total_buffered_bytes(&self) -> usize` query method for + diagnostics and testing. + +4. Wire `check_aggregate_budgets()` into `accept_first_frame_at()` — after + the existing `MessageTooLarge` check, before inserting the partial assembly + — using `input.body.len() + input.metadata.len()` as the incoming bytes. + +5. Wire `check_aggregate_budgets()` into `accept_continuation_frame_at()` — + after the existing `check_size_limit` call, before `push_body` — using + `body.len()` as the incoming bytes. On budget violation, call + `entry.remove()` to free partial buffers before returning the error. + +Modify `src/message_assembler/error.rs` to add two new variants to +`MessageAssemblyError`: + +- `ConnectionBudgetExceeded { key, attempted, limit }` +- `InFlightBudgetExceeded { key, attempted, limit }` + +Register the new module in `src/message_assembler/mod.rs`. + +Go/no-go: if `state.rs` exceeds 400 lines after changes, extract more logic to +`budget.rs` before proceeding. + +### Stage B: wire budgets through the integration layer + +Modify `src/app/frame_handling/assembly.rs`: + +1. Add `memory_budgets: Option` as a third parameter to + `new_message_assembly_state`. + +2. Inside the function, after computing `max_message_size` from fragmentation + config, compute the effective per-message limit as the minimum of + `max_message_size` and `budgets.bytes_per_message().get()` when budgets are + configured. + +3. Extract `connection_budget` and `in_flight_budget` from the budgets and + pass all three to `MessageAssemblyState::with_budgets()`. + +4. Add the `MemoryBudgets` import. + +Modify `src/app/connection.rs`: + +1. Pass `self.memory_budgets` as the third argument at the call site + (line 257). `MemoryBudgets` is `Copy`, so no ownership issues. + +Go/no-go: `make lint` and `make test` must pass before proceeding. + +### Stage C: add unit tests (`rstest`) + +Create `src/message_assembler/budget_tests.rs` to keep `state_tests.rs` under +400 lines. Register it in `src/message_assembler/tests.rs` with +`#[path = "budget_tests.rs"] mod budget_tests;`. + +Tests to add (all using `rstest` fixtures): + +**`total_buffered_bytes` accounting:** + +- `state_reports_zero_buffered_bytes_when_empty` — fresh state returns 0. +- `state_reports_buffered_bytes_for_single_assembly` — after accepting one + multi-frame first frame, total equals body + metadata length. +- `state_reports_buffered_bytes_for_multiple_assemblies` — after starting two + assemblies, total is their sum. +- `state_reduces_buffered_bytes_after_completion` — completing an assembly + reduces the total. +- `state_reduces_buffered_bytes_after_timeout_purge` — purging an expired + assembly reduces the total. + +**Connection budget enforcement:** + +- `state_rejects_first_frame_exceeding_connection_budget` — single message + whose body + metadata exceeds `connection_budget`. +- `state_rejects_continuation_exceeding_connection_budget` — second assembly + pushes aggregate over `connection_budget`. +- `state_allows_frame_within_connection_budget` — multiple assemblies fitting + within budget. +- `state_frees_partial_assembly_on_connection_budget_violation` — + `buffered_count()` drops after rejection. + +**In-flight budget enforcement:** + +- `state_rejects_first_frame_exceeding_in_flight_budget` — analogous to + connection budget tests. +- `state_rejects_continuation_exceeding_in_flight_budget` — analogous. +- `state_frees_partial_assembly_on_in_flight_budget_violation` — analogous. + +**Per-message budget override:** + +- `state_uses_minimum_of_max_message_size_and_budget` — construct with + `max_message_size=1024` and `bytes_per_message=512`; verify 512 is the + effective limit. + +**Budget interaction with existing behaviour:** + +- `state_budget_not_enforced_when_none` — default constructor allows any + amount (backward compatibility). +- `state_budget_violation_does_not_affect_other_assemblies` — key 1 exceeds + budget; key 2 continues unaffected. +- `state_timeout_purge_reclaims_budget_headroom` — after purge, + previously-blocked frame now fits. + +Go/no-go: all new and existing unit tests pass. + +### Stage D: add behavioural tests (`rstest-bdd`) + +Create `tests/features/budget_enforcement.feature` with scenarios covering the +observable enforcement behaviours: + +1. Accept frames within all budget limits. +2. Reject frame exceeding per-message budget. +3. Reject continuation exceeding connection budget; verify partial freed. +4. Reject continuation exceeding in-flight budget; verify partial freed. +5. Reclaim budget headroom after assembly completes. +6. Reclaim budget headroom after timeout purge. + +Create supporting files: + +- `tests/fixtures/budget_enforcement.rs` — `BudgetEnforcementWorld` fixture. +- `tests/steps/budget_enforcement_steps.rs` — step definitions. +- `tests/scenarios/budget_enforcement_scenarios.rs` — `#[scenario]` bindings. + +Register the new files in `tests/fixtures/mod.rs`, `tests/steps/mod.rs`, and +`tests/scenarios/mod.rs`. + +Go/no-go: all BDD scenarios pass via `make test`. + +### Stage E: documentation, roadmap, and quality gates + +1. Update `src/app/memory_budgets.rs` module doc comment. +2. Update `docs/users-guide.md` "Per-connection memory budgets" section. +3. Update `docs/adr-002-streaming-requests-and-shared-message-assembly.md`. +4. Mark `8.3.2` as done in `docs/roadmap.md`. +5. Run full quality gates. + +Go/no-go: do not mark roadmap done until all gates pass. + +## Concrete steps + +Run from repository root (`/home/user/project`). + +1. Create `src/message_assembler/budget.rs` with budget enforcement helpers. + +2. Modify `src/message_assembler/mod.rs` to register the new module. + +3. Add error variants to `src/message_assembler/error.rs`. + +4. Modify `src/message_assembler/state.rs`: add fields, `with_budgets()` + constructor, `total_buffered_bytes()` query, and wire budget checks into + `accept_first_frame_at()` and `accept_continuation_frame_at()`. + +5. Verify state module compiles and existing tests pass: + + set -o pipefail + cargo test --lib message_assembler 2>&1 | tee /tmp/wireframe-8-3-2-unit-a.log + + Expected: all existing `state_tests` and `series_tests` pass. + +6. Modify `src/app/frame_handling/assembly.rs`: update + `new_message_assembly_state` to accept `Option` and thread + budget values into `MessageAssemblyState::with_budgets()`. + +7. Modify `src/app/connection.rs` line 257: pass `self.memory_budgets`. + +8. Verify integration compiles and existing tests pass: + + set -o pipefail + make lint 2>&1 | tee /tmp/wireframe-8-3-2-lint-b.log + make test 2>&1 | tee /tmp/wireframe-8-3-2-test-b.log + + Expected: no regressions. + +9. Create `src/message_assembler/budget_tests.rs` with the unit tests + described in Stage C. Register in `src/message_assembler/tests.rs`. + +10. Run unit tests: + + set -o pipefail + cargo test --lib message_assembler 2>&1 | tee /tmp/wireframe-8-3-2-unit-c.log + + Expected: all new budget tests pass alongside existing tests. + +11. Create the four BDD test files described in Stage D. Register in + `tests/fixtures/mod.rs`, `tests/steps/mod.rs`, `tests/scenarios/mod.rs`. + +12. Run BDD tests: + + set -o pipefail + cargo test --test bdd budget_enforcement 2>&1 | tee /tmp/wireframe-8-3-2-bdd.log + + Expected: all new scenarios pass. + +13. Update documentation files as described in Stage E. + +14. Run full quality gates: + + set -o pipefail + make fmt 2>&1 | tee /tmp/wireframe-8-3-2-fmt.log + make markdownlint 2>&1 | tee /tmp/wireframe-8-3-2-markdownlint.log + make check-fmt 2>&1 | tee /tmp/wireframe-8-3-2-check-fmt.log + make lint 2>&1 | tee /tmp/wireframe-8-3-2-lint.log + make test 2>&1 | tee /tmp/wireframe-8-3-2-test.log + make nixie 2>&1 | tee /tmp/wireframe-8-3-2-nixie.log + + Expected: all pass with zero warnings. + +15. If any gate fails, inspect the corresponding log, fix the issue, and + rerun that command before continuing. + +## Validation and acceptance + +Acceptance criteria: + +- Budget enforcement: unit tests prove each dimension is enforced, partial + buffers are freed on violation, and completed or purged assemblies reclaim + headroom. +- Backward compatibility: all existing tests pass unchanged when + `MemoryBudgets` is not configured. +- Behavioural coverage: `rstest-bdd` scenarios pass for enforcement behaviour. +- Documentation: design decisions recorded in ADR 0002; user guide updated; + roadmap `8.3.2` marked done. + +Quality criteria: + +- Tests: all existing and new unit + behavioural tests pass via `make test`. +- Lint: `make lint` passes with no warnings. +- Formatting: `make fmt` and `make check-fmt` pass. +- Markdown: `make markdownlint` passes. +- Mermaid: `make nixie` passes. + +## Idempotence and recovery + +All edits are additive and safe to rerun. If an intermediate step fails: + +- Keep local changes. +- Fix the specific failure. +- Rerun only the failed command, then continue the pipeline. + +Avoid destructive git commands. If rollback is needed, revert only files +changed for this roadmap item. + +## Artefacts and notes + +Expected artefacts after completion: + +- New: `src/message_assembler/budget.rs` (budget enforcement helpers). +- New: `src/message_assembler/budget_tests.rs` (budget enforcement unit + tests). +- New: `tests/features/budget_enforcement.feature`. +- New: `tests/fixtures/budget_enforcement.rs`. +- New: `tests/steps/budget_enforcement_steps.rs`. +- New: `tests/scenarios/budget_enforcement_scenarios.rs`. +- Modified: `src/message_assembler/mod.rs`, `state.rs`, `error.rs`, + `tests.rs`. +- Modified: `src/app/frame_handling/assembly.rs`, `src/app/connection.rs`. +- Modified: `src/app/memory_budgets.rs` (doc comment update). +- Modified: `tests/fixtures/mod.rs`, `tests/steps/mod.rs`, + `tests/scenarios/mod.rs`. +- Modified: `docs/adr-002-streaming-requests-and-shared-message-assembly.md`, + `docs/users-guide.md`, `docs/roadmap.md`. +- Gate logs: `/tmp/wireframe-8-3-2-*.log`. + +## Interfaces and dependencies + +At the end of this milestone, the following interfaces must exist. + +In `src/message_assembler/error.rs`: + + pub enum MessageAssemblyError { + // … existing variants … + + /// Accepting this frame would exceed the per-connection buffering budget. + #[error("connection budget exceeded for key {key}: {attempted} bytes > {limit} bytes")] + ConnectionBudgetExceeded { + key: MessageKey, + attempted: usize, + limit: NonZeroUsize, + }, + + /// Accepting this frame would exceed the in-flight assembly byte budget. + #[error("in-flight budget exceeded for key {key}: {attempted} bytes > {limit} bytes")] + InFlightBudgetExceeded { + key: MessageKey, + attempted: usize, + limit: NonZeroUsize, + }, + } + +In `src/message_assembler/state.rs`: + + impl MessageAssemblyState { + /// Create a new assembly state manager with optional aggregate budgets. + #[must_use] + pub fn with_budgets( + max_message_size: NonZeroUsize, + timeout: Duration, + connection_budget: Option, + in_flight_budget: Option, + ) -> Self { … } + + /// Total bytes buffered across all in-flight assemblies. + #[must_use] + pub fn total_buffered_bytes(&self) -> usize { … } + } + +In `src/app/frame_handling/assembly.rs`: + + pub(crate) fn new_message_assembly_state( + fragmentation: Option, + frame_budget: usize, + memory_budgets: Option, + ) -> MessageAssemblyState { … } + +Dependencies: + +- No new crates. +- Testing remains on `rstest` and `rstest-bdd` v0.5.0 already present in + `Cargo.toml`. diff --git a/docs/roadmap.md b/docs/roadmap.md index de385954..52752e73 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -292,7 +292,7 @@ and standardized per-connection memory budgets. ### 8.3. Per-connection memory budgets - [x] 8.3.1. Add `WireframeApp::memory_budgets(...)` builder method. -- [ ] 8.3.2. Implement budget enforcement covering bytes per message, bytes +- [x] 8.3.2. Implement budget enforcement covering bytes per message, bytes per connection, and bytes across in-flight assemblies. - [ ] 8.3.3. Implement soft limit (back-pressure by pausing reads) behaviour. - [ ] 8.3.4. Implement hard cap (abort early, release partial state, surface diff --git a/docs/users-guide.md b/docs/users-guide.md index 2742bc2e..d4dd209e 100644 --- a/docs/users-guide.md +++ b/docs/users-guide.md @@ -567,9 +567,14 @@ let _app = WireframeApp::new() .read_timeout_ms(250); ``` -This builder method establishes configuration for the current `WireframeApp` -instance. Runtime enforcement and derived defaults are implemented in the -subsequent roadmap items covering memory budget enforcement behaviour. +When budgets are configured, the message assembly subsystem enforces them at +frame acceptance time. Frames that would cause the total buffered bytes to +exceed the per-connection or in-flight budget are rejected, the offending +partial assembly is freed, and the failure is surfaced through the existing +deserialization-failure policy (`InvalidData`). The effective per-message limit +is the minimum of the fragmentation `max_message_size` and the configured +`bytes_per_message`. Single-frame messages that complete immediately are never +counted against aggregate budgets, since they do not buffer. #### Message key multiplexing (8.2.3) diff --git a/src/app/frame_handling/assembly.rs b/src/app/frame_handling/assembly.rs index 18222d3a..f7edfd91 100644 --- a/src/app/frame_handling/assembly.rs +++ b/src/app/frame_handling/assembly.rs @@ -6,7 +6,7 @@ use log::debug; use super::core::DeserFailureTracker; use crate::{ - app::{Envelope, builder_defaults::default_fragmentation}, + app::{Envelope, builder_defaults::default_fragmentation, memory_budgets::MemoryBudgets}, codec::clamp_frame_length, fragment::FragmentationConfig, message_assembler::{ @@ -42,13 +42,18 @@ impl<'a> AssemblyRuntime<'a> { } /// Build a connection-scoped message assembly state from known budgets. +/// +/// When `memory_budgets` is `Some`, the effective per-message limit is +/// `min(fragmentation_max, bytes_per_message)` and the connection/in-flight +/// budgets are forwarded to [`MessageAssemblyState::with_budgets`]. #[must_use] pub(crate) fn new_message_assembly_state( fragmentation: Option, frame_budget: usize, + memory_budgets: Option, ) -> MessageAssemblyState { let config = fragmentation.or_else(|| default_fragmentation(frame_budget)); - let max_message_size = config.map_or_else( + let frag_max = config.map_or_else( || NonZeroUsize::new(clamp_frame_length(frame_budget)).unwrap_or(NonZeroUsize::MIN), |cfg| cfg.max_message_size, ); @@ -56,7 +61,19 @@ pub(crate) fn new_message_assembly_state( cfg.reassembly_timeout }); - MessageAssemblyState::new(max_message_size, timeout) + match memory_budgets { + Some(budgets) => { + let per_message = budgets.bytes_per_message().get(); + let max_message_size = frag_max.min(per_message); + MessageAssemblyState::with_budgets( + max_message_size, + timeout, + Some(budgets.bytes_per_connection().get()), + Some(budgets.bytes_in_flight().get()), + ) + } + None => MessageAssemblyState::new(frag_max, timeout), + } } /// Purge stale in-flight assemblies. diff --git a/src/app/inbound_handler.rs b/src/app/inbound_handler.rs index 2952cf0e..eefd9b2e 100644 --- a/src/app/inbound_handler.rs +++ b/src/app/inbound_handler.rs @@ -254,7 +254,11 @@ where framed.read_buffer_mut().reserve(max_frame_length); let mut deser_failures = 0u32; let mut message_assembly = self.message_assembler.as_ref().map(|_| { - frame_handling::new_message_assembly_state(self.fragmentation, requested_frame_length) + frame_handling::new_message_assembly_state( + self.fragmentation, + requested_frame_length, + self.memory_budgets, + ) }); let mut pipeline = FramePipeline::new(self.fragmentation); let timeout_dur = Duration::from_millis(self.read_timeout_ms); diff --git a/src/app/memory_budgets.rs b/src/app/memory_budgets.rs index 4adc3812..8671612f 100644 --- a/src/app/memory_budgets.rs +++ b/src/app/memory_budgets.rs @@ -2,8 +2,8 @@ //! //! `MemoryBudgets` stores explicit byte caps configured on //! [`crate::app::WireframeApp`] via `WireframeApp::memory_budgets(...)`. -//! This module only exposes configuration; enforcement lands in a future -//! iteration. +//! Enforcement is wired through `MessageAssemblyState::with_budgets` so +//! that frames exceeding any budget dimension are rejected at assembly time. use std::num::NonZeroUsize; diff --git a/src/message_assembler/budget.rs b/src/message_assembler/budget.rs new file mode 100644 index 00000000..0d98e6a2 --- /dev/null +++ b/src/message_assembler/budget.rs @@ -0,0 +1,105 @@ +//! Budget enforcement helpers for message assembly. +//! +//! This module provides aggregate budget checks applied during frame +//! acceptance. Per-message size limits are handled by the existing +//! `check_size_limit` function in [`super::state`]; this module adds +//! per-connection and in-flight aggregate budget enforcement. + +use std::num::NonZeroUsize; + +use super::{MessageKey, error::MessageAssemblyError}; + +/// Paired connection and in-flight budget limits. +/// +/// Bundled into a struct so call-sites pass a single value instead of two +/// `Option` parameters. +#[derive(Clone, Copy, Debug)] +pub(super) struct AggregateBudgets { + pub(super) connection: Option, + pub(super) in_flight: Option, +} + +impl AggregateBudgets { + /// Returns `true` when at least one aggregate budget limit is configured. + pub(super) const fn is_enabled(&self) -> bool { + self.connection.is_some() || self.in_flight.is_some() + } +} + +/// Check whether accepting `additional_bytes` for `key` would exceed +/// the connection budget or in-flight budget. +/// +/// Both budgets are checked against the same `current_total` because, +/// at this layer, all buffered bytes are assembly bytes. The dimensions +/// are kept separate so future work (streaming body buffers, transport +/// buffering) can diverge them. +/// +/// # Errors +/// +/// Returns [`MessageAssemblyError::ConnectionBudgetExceeded`] or +/// [`MessageAssemblyError::InFlightBudgetExceeded`] when the respective +/// limit would be exceeded. +pub(super) fn check_aggregate_budgets( + key: MessageKey, + current_total: usize, + additional_bytes: usize, + budgets: &AggregateBudgets, +) -> Result<(), MessageAssemblyError> { + let new_total = current_total.saturating_add(additional_bytes); + + if let Some(limit) = budgets.connection + && new_total > limit.get() + { + return Err(MessageAssemblyError::ConnectionBudgetExceeded { + key, + attempted: new_total, + limit, + }); + } + + if let Some(limit) = budgets.in_flight + && new_total > limit.get() + { + return Err(MessageAssemblyError::InFlightBudgetExceeded { + key, + attempted: new_total, + limit, + }); + } + + Ok(()) +} + +/// Check if accumulated size plus new body would exceed the per-message +/// size limit. +/// +/// Returns the new total size on success. +/// +/// # Errors +/// +/// Returns [`MessageAssemblyError::MessageTooLarge`] when the new total +/// would exceed `max_message_size`. +pub(super) fn check_size_limit( + max_message_size: NonZeroUsize, + key: MessageKey, + accumulated: usize, + body_len: usize, +) -> Result { + let Some(new_len) = accumulated.checked_add(body_len) else { + return Err(MessageAssemblyError::MessageTooLarge { + key, + attempted: usize::MAX, + limit: max_message_size, + }); + }; + + if new_len > max_message_size.get() { + return Err(MessageAssemblyError::MessageTooLarge { + key, + attempted: new_len, + limit: max_message_size, + }); + } + + Ok(new_len) +} diff --git a/src/message_assembler/budget_enforcement_tests.rs b/src/message_assembler/budget_enforcement_tests.rs new file mode 100644 index 00000000..f04a785c --- /dev/null +++ b/src/message_assembler/budget_enforcement_tests.rs @@ -0,0 +1,305 @@ +//! Budget enforcement tests: connection, in-flight, dual, isolation, +//! headroom reclamation, and single-frame bypass. + +use std::time::{Duration, Instant}; + +use rstest::rstest; + +use super::{ + connection_budgeted_state, + dual_budgeted_state, + in_flight_budgeted_state, + nz, + submit_first, + submit_first_at, + unbounded_state, +}; +use crate::message_assembler::{ + MessageAssemblyError, + MessageAssemblyState, + MessageKey, + test_helpers::continuation_header, +}; + +// ============================================================================= +// Helpers for parameterised budget-dimension tests +// ============================================================================= + +/// Build a `MessageAssemblyState` with only the named budget dimension +/// set to 20 bytes. +fn state_for_dimension(dimension: &str) -> MessageAssemblyState { + match dimension { + "connection" => connection_budgeted_state(), + "in_flight" => in_flight_budgeted_state(), + _ => panic!("unknown budget dimension: {dimension}"), + } +} + +/// Assert that `err` is the expected budget-exceeded variant for +/// `dimension`, matching the given `key` and `attempted` values. +fn assert_budget_exceeded(err: &MessageAssemblyError, dimension: &str, key: u64, attempted: usize) { + match dimension { + "connection" => assert!( + matches!( + err, + MessageAssemblyError::ConnectionBudgetExceeded { + key: k, + attempted: a, + .. + } if k == &MessageKey(key) && *a == attempted + ), + "expected ConnectionBudgetExceeded(key={key}, attempted={attempted}), got: {err:?}" + ), + "in_flight" => assert!( + matches!( + err, + MessageAssemblyError::InFlightBudgetExceeded { + key: k, + attempted: a, + .. + } if k == &MessageKey(key) && *a == attempted + ), + "expected InFlightBudgetExceeded(key={key}, attempted={attempted}), got: {err:?}" + ), + _ => panic!("unknown budget dimension: {dimension}"), + } +} + +/// Assert that `err` matches the budget-exceeded variant for `dimension` +/// without inspecting field values. +fn assert_budget_exceeded_any(err: &MessageAssemblyError, dimension: &str) { + match dimension { + "connection" => assert!( + matches!(err, MessageAssemblyError::ConnectionBudgetExceeded { .. }), + "expected ConnectionBudgetExceeded, got: {err:?}" + ), + "in_flight" => assert!( + matches!(err, MessageAssemblyError::InFlightBudgetExceeded { .. }), + "expected InFlightBudgetExceeded, got: {err:?}" + ), + _ => panic!("unknown budget dimension: {dimension}"), + } +} + +// ============================================================================= +// Parameterised budget enforcement (connection + in-flight) +// ============================================================================= + +#[rstest] +#[case::connection("connection")] +#[case::in_flight("in_flight")] +fn budget_allows_frames_within_limit(#[case] dimension: &str) { + let mut state = state_for_dimension(dimension); + // 10 bytes fits within 20-byte budget + submit_first(&mut state, 1, &[0u8; 10], false).expect("within budget"); + assert_eq!(state.total_buffered_bytes(), 10); +} + +#[rstest] +#[case::connection("connection")] +#[case::in_flight("in_flight")] +fn budget_rejects_first_frame_exceeding_limit(#[case] dimension: &str) { + let mut state = state_for_dimension(dimension); + // 21 bytes exceeds 20-byte budget + let err = submit_first(&mut state, 1, &[0u8; 21], false).expect_err("should reject"); + assert_budget_exceeded(&err, dimension, 1, 21); + assert_eq!(state.buffered_count(), 0); +} + +#[rstest] +#[case::connection("connection")] +#[case::in_flight("in_flight")] +fn budget_rejects_continuation_exceeding_limit(#[case] dimension: &str) { + let mut state = state_for_dimension(dimension); + // First frame: 10 bytes (within 20-byte budget) + submit_first(&mut state, 1, &[0u8; 10], false).expect("first frame"); + + // Continuation: 11 bytes would bring total to 21, exceeding 20 + let cont = continuation_header(1, 1, 11, true); + let err = state + .accept_continuation_frame(&cont, &[0u8; 11]) + .expect_err("should reject"); + assert_budget_exceeded_any(&err, dimension); + // Partial assembly should be freed on budget violation + assert_eq!(state.buffered_count(), 0); +} + +// ============================================================================= +// Connection-specific: partial assembly freed on violation +// ============================================================================= + +#[rstest] +fn connection_budget_frees_partial_on_violation( + #[from(connection_budgeted_state)] mut state: MessageAssemblyState, +) { + // Start two assemblies: 8 + 8 = 16 (within 20) + submit_first(&mut state, 1, &[0u8; 8], false).expect("first"); + submit_first(&mut state, 2, &[0u8; 8], false).expect("second"); + assert_eq!(state.buffered_count(), 2); + + // Continuation on key 1: 5 bytes would bring total to 21 + let cont = continuation_header(1, 1, 5, false); + let err = state + .accept_continuation_frame(&cont, &[0u8; 5]) + .expect_err("should reject"); + assert!(matches!( + err, + MessageAssemblyError::ConnectionBudgetExceeded { .. } + )); + + // Key 1 is freed; key 2 survives + assert_eq!(state.buffered_count(), 1); + assert_eq!(state.total_buffered_bytes(), 8); +} + +// ============================================================================= +// Dual budget: tighter budget triggers first +// ============================================================================= + +#[rstest] +fn dual_budget_in_flight_triggers_before_connection( + #[from(dual_budgeted_state)] mut state: MessageAssemblyState, +) { + // In-flight budget is 20, connection budget is 30. + // 21 bytes should trigger in-flight first. + let err = submit_first(&mut state, 1, &[0u8; 21], false).expect_err("should reject"); + assert!( + matches!(err, MessageAssemblyError::InFlightBudgetExceeded { .. }), + "expected in-flight to trigger first, got: {err:?}" + ); +} + +#[test] +fn dual_budget_connection_triggers_when_in_flight_not_exceeded() { + // connection=15 < in_flight=20, so connection triggers first. + let mut state = MessageAssemblyState::with_budgets( + nz(1024), + Duration::from_secs(30), + Some(nz(15)), + Some(nz(20)), + ); + let err = submit_first(&mut state, 1, &[0u8; 16], false).expect_err("should reject"); + assert!( + matches!(err, MessageAssemblyError::ConnectionBudgetExceeded { .. }), + "expected connection to trigger first, got: {err:?}" + ); +} + +// ============================================================================= +// Backward compatibility: no budgets = no enforcement +// ============================================================================= + +#[rstest] +fn no_budgets_allows_large_frames(#[from(unbounded_state)] mut state: MessageAssemblyState) { + // 500 bytes is well within the 1024 per-message limit and has no + // aggregate cap. + submit_first(&mut state, 1, &[0u8; 500], false).expect("no budget enforcement"); + submit_first(&mut state, 2, &[0u8; 500], false).expect("no budget enforcement"); + assert_eq!(state.total_buffered_bytes(), 1000); +} + +// ============================================================================= +// Budget isolation: violation for one key does not affect others +// ============================================================================= + +#[rstest] +fn budget_violation_does_not_affect_other_assemblies( + #[from(connection_budgeted_state)] mut state: MessageAssemblyState, +) { + // Start key 1 with 10 bytes + submit_first(&mut state, 1, &[0u8; 10], false).expect("first"); + assert_eq!(state.buffered_count(), 1); + + // Try key 2 with 11 bytes — total would be 21, exceeding 20-byte + // budget + let err = submit_first(&mut state, 2, &[0u8; 11], false).expect_err("should reject"); + assert!(matches!( + err, + MessageAssemblyError::ConnectionBudgetExceeded { .. } + )); + + // Key 1 is still intact + assert_eq!(state.buffered_count(), 1); + assert_eq!(state.total_buffered_bytes(), 10); + + // Key 1 can still be completed + let cont = continuation_header(1, 1, 3, true); + let msg = state + .accept_continuation_frame(&cont, b"fin") + .expect("cont") + .expect("complete"); + assert_eq!(msg.body(), &[&[0u8; 10][..], b"fin"].concat()); +} + +// ============================================================================= +// Headroom reclamation after purge +// ============================================================================= + +#[test] +fn headroom_reclaimed_after_purge_allows_new_assembly() { + let mut state = + MessageAssemblyState::with_budgets(nz(1024), Duration::from_secs(30), Some(nz(20)), None); + + let now = Instant::now(); + submit_first_at(&mut state, 1, &[0u8; 15], now).expect("first"); + assert_eq!(state.total_buffered_bytes(), 15); + + // A 6-byte frame would exceed budget (15 + 6 = 21 > 20) + let err = submit_first(&mut state, 2, &[0u8; 6], false).expect_err("over budget"); + assert!(matches!( + err, + MessageAssemblyError::ConnectionBudgetExceeded { .. } + )); + + // Purge key 1 via timeout + let future = now + Duration::from_secs(31); + state.purge_expired_at(future); + assert_eq!(state.total_buffered_bytes(), 0); + + // Now 6 bytes is fine + submit_first(&mut state, 3, &[0u8; 6], false).expect("within reclaimed budget"); + assert_eq!(state.total_buffered_bytes(), 6); +} + +#[test] +fn headroom_reclaimed_after_completion_allows_new_frame() { + let mut state = + MessageAssemblyState::with_budgets(nz(1024), Duration::from_secs(30), None, Some(nz(20))); + + submit_first(&mut state, 1, &[0u8; 15], false).expect("first"); + + // Complete key 1 + let cont = continuation_header(1, 1, 3, true); + state + .accept_continuation_frame(&cont, b"end") + .expect("cont") + .expect("complete"); + assert_eq!(state.total_buffered_bytes(), 0); + + // Now we have full headroom again + submit_first(&mut state, 2, &[0u8; 20], false).expect("within reclaimed budget"); + assert_eq!(state.total_buffered_bytes(), 20); +} + +// ============================================================================= +// Single-frame messages bypass aggregate budgets +// ============================================================================= + +#[rstest] +fn single_frame_message_not_subject_to_aggregate_budgets( + #[from(connection_budgeted_state)] mut state: MessageAssemblyState, +) { + // Fill up to the budget edge + submit_first(&mut state, 1, &[0u8; 19], false).expect("first"); + assert_eq!(state.total_buffered_bytes(), 19); + + // A single-frame message should still succeed because it never + // buffers + let msg = submit_first(&mut state, 2, b"big-single-frame-payload", true) + .expect("single frame accepted") + .expect("should complete immediately"); + assert_eq!(msg.body(), b"big-single-frame-payload"); + + // Buffered bytes unchanged (single-frame was never buffered) + assert_eq!(state.total_buffered_bytes(), 19); +} diff --git a/src/message_assembler/budget_tests.rs b/src/message_assembler/budget_tests.rs new file mode 100644 index 00000000..90ae65eb --- /dev/null +++ b/src/message_assembler/budget_tests.rs @@ -0,0 +1,171 @@ +//! Unit tests for aggregate budget enforcement (8.3.2). +//! +//! Helpers, fixtures, and `total_buffered_bytes` accounting tests live here. +//! Enforcement tests (connection, in-flight, dual, isolation, headroom, +//! single-frame bypass) are in the `enforcement` sub-module. + +use std::{ + num::NonZeroUsize, + time::{Duration, Instant}, +}; + +use rstest::{fixture, rstest}; + +use crate::message_assembler::{ + EnvelopeRouting, + FirstFrameInput, + MessageAssemblyError, + MessageAssemblyState, + test_helpers::{continuation_header, first_header}, +}; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/// Non-zero shorthand. +fn nz(val: usize) -> NonZeroUsize { NonZeroUsize::new(val).expect("non-zero") } + +/// Build a [`FirstFrameHeader`] and [`FirstFrameInput`] in the caller's +/// scope from a key, body slice, and finality flag. +/// +/// Encapsulates the header construction and input validation common to all +/// first-frame submission helpers. The macro expands to two `let` bindings +/// (`$hdr` for the header, `$inp` for the input) so the header lives long +/// enough for the input's borrow. +macro_rules! create_first_frame_input { + ($hdr:ident, $inp:ident, $key:expr, $body:expr, $is_last:expr) => { + let $hdr = first_header($key, $body.len(), $is_last); + let $inp = + FirstFrameInput::new(&$hdr, EnvelopeRouting::default(), vec![], $body).expect("valid"); + }; +} + +/// Submit a first frame with `body_len` bytes of body data. +fn submit_first( + state: &mut MessageAssemblyState, + key: u64, + body: &[u8], + is_last: bool, +) -> Result, MessageAssemblyError> { + create_first_frame_input!(_header, input, key, body, is_last); + state.accept_first_frame(input) +} + +/// Submit a first frame at a specific timestamp. +fn submit_first_at( + state: &mut MessageAssemblyState, + key: u64, + body: &[u8], + now: Instant, +) -> Result, MessageAssemblyError> { + create_first_frame_input!(_header, input, key, body, false); + state.accept_first_frame_at(input, now) +} + +// --------------------------------------------------------------------------- +// Fixtures +// --------------------------------------------------------------------------- + +/// State with no budgets — backwards-compatibility baseline. +#[fixture] +fn unbounded_state() -> MessageAssemblyState { + MessageAssemblyState::new(nz(1024), Duration::from_secs(30)) +} + +/// State with a 20-byte connection budget and 1024-byte per-message limit. +#[fixture] +fn connection_budgeted_state() -> MessageAssemblyState { + MessageAssemblyState::with_budgets(nz(1024), Duration::from_secs(30), Some(nz(20)), None) +} + +/// State with a 20-byte in-flight budget and 1024-byte per-message limit. +#[fixture] +fn in_flight_budgeted_state() -> MessageAssemblyState { + MessageAssemblyState::with_budgets(nz(1024), Duration::from_secs(30), None, Some(nz(20))) +} + +/// State with both connection (30) and in-flight (20) budgets. +#[fixture] +fn dual_budgeted_state() -> MessageAssemblyState { + MessageAssemblyState::with_budgets( + nz(1024), + Duration::from_secs(30), + Some(nz(30)), + Some(nz(20)), + ) +} + +// ============================================================================= +// total_buffered_bytes accounting +// ============================================================================= + +#[rstest] +fn total_buffered_bytes_starts_at_zero(#[from(unbounded_state)] state: MessageAssemblyState) { + assert_eq!(state.total_buffered_bytes(), 0); +} + +#[rstest] +fn total_buffered_bytes_tracks_single_assembly( + #[from(unbounded_state)] mut state: MessageAssemblyState, +) { + submit_first(&mut state, 1, b"hello", false).expect("first frame"); + assert_eq!(state.total_buffered_bytes(), 5); +} + +#[rstest] +fn total_buffered_bytes_tracks_multiple_assemblies( + #[from(unbounded_state)] mut state: MessageAssemblyState, +) { + submit_first(&mut state, 1, b"aaa", false).expect("first"); + submit_first(&mut state, 2, b"bbbbb", false).expect("second"); + assert_eq!(state.total_buffered_bytes(), 8); // 3 + 5 +} + +#[rstest] +fn total_buffered_bytes_decreases_after_completion( + #[from(unbounded_state)] mut state: MessageAssemblyState, +) { + submit_first(&mut state, 1, b"aaa", false).expect("first"); + submit_first(&mut state, 2, b"bb", false).expect("second"); + assert_eq!(state.total_buffered_bytes(), 5); + + // Complete key 1 + let cont = continuation_header(1, 1, 2, true); + state + .accept_continuation_frame(&cont, b"xx") + .expect("cont") + .expect("complete"); + assert_eq!(state.total_buffered_bytes(), 2); // only key 2 remains +} + +#[rstest] +fn total_buffered_bytes_decreases_after_purge( + #[from(unbounded_state)] mut state: MessageAssemblyState, +) { + let now = Instant::now(); + submit_first_at(&mut state, 1, b"data", now).expect("first"); + assert_eq!(state.total_buffered_bytes(), 4); + + let future = now + Duration::from_secs(31); + state.purge_expired_at(future); + assert_eq!(state.total_buffered_bytes(), 0); +} + +#[rstest] +fn single_frame_message_not_counted_in_buffered_bytes( + #[from(unbounded_state)] mut state: MessageAssemblyState, +) { + submit_first(&mut state, 1, b"complete", true) + .expect("first") + .expect("complete"); + assert_eq!(state.total_buffered_bytes(), 0); + assert_eq!(state.buffered_count(), 0); +} + +// ============================================================================= +// Budget enforcement tests — see budget_enforcement_tests.rs +// ============================================================================= + +#[path = "budget_enforcement_tests.rs"] +mod enforcement; diff --git a/src/message_assembler/error.rs b/src/message_assembler/error.rs index 17915718..86b5c33d 100644 --- a/src/message_assembler/error.rs +++ b/src/message_assembler/error.rs @@ -107,4 +107,26 @@ pub enum MessageAssemblyError { /// Configured size cap. limit: NonZeroUsize, }, + + /// Accepting this frame would exceed the per-connection buffering budget. + #[error("connection budget exceeded for key {key}: {attempted} bytes > {limit} bytes")] + ConnectionBudgetExceeded { + /// Message key whose frame triggered the guard. + key: MessageKey, + /// Total bytes that would result from accepting the frame. + attempted: usize, + /// Configured per-connection byte cap. + limit: NonZeroUsize, + }, + + /// Accepting this frame would exceed the in-flight assembly byte budget. + #[error("in-flight budget exceeded for key {key}: {attempted} bytes > {limit} bytes")] + InFlightBudgetExceeded { + /// Message key whose frame triggered the guard. + key: MessageKey, + /// Total bytes that would result from accepting the frame. + attempted: usize, + /// Configured in-flight byte cap. + limit: NonZeroUsize, + }, } diff --git a/src/message_assembler/mod.rs b/src/message_assembler/mod.rs index 2a0e44a4..d6362f27 100644 --- a/src/message_assembler/mod.rs +++ b/src/message_assembler/mod.rs @@ -20,6 +20,7 @@ //! - Duplicate frames (already-processed sequences) //! - Frames arriving after the series is complete +mod budget; pub mod error; mod header; pub mod series; diff --git a/src/message_assembler/state.rs b/src/message_assembler/state.rs index 863579cc..a39a129a 100644 --- a/src/message_assembler/state.rs +++ b/src/message_assembler/state.rs @@ -13,6 +13,7 @@ use std::{ use super::{ ContinuationFrameHeader, MessageKey, + budget::{AggregateBudgets, check_aggregate_budgets, check_size_limit}, error::{MessageAssemblyError, MessageSeriesError, MessageSeriesStatus}, series::MessageSeries, types::{AssembledMessage, EnvelopeRouting, FirstFrameInput}, @@ -44,6 +45,9 @@ impl PartialAssembly { fn set_metadata(&mut self, data: Vec) { self.metadata = data; } fn accumulated_len(&self) -> usize { self.body_buffer.len() } + + /// Total heap bytes held by this partial assembly (body + metadata). + fn buffered_bytes(&self) -> usize { self.body_buffer.len().saturating_add(self.metadata.len()) } } /// Stateful manager for multiple concurrent message assemblies. @@ -109,6 +113,7 @@ pub struct MessageAssemblyState { max_message_size: NonZeroUsize, timeout: Duration, assemblies: HashMap, + budgets: AggregateBudgets, } impl MessageAssemblyState { @@ -120,10 +125,29 @@ impl MessageAssemblyState { /// * `timeout` - Duration after which partial assemblies are purged. #[must_use] pub fn new(max_message_size: NonZeroUsize, timeout: Duration) -> Self { + Self::with_budgets(max_message_size, timeout, None, None) + } + + /// Create a new assembly state manager with optional aggregate budgets. + /// + /// When `connection_budget` or `in_flight_budget` is `Some`, frames that + /// would cause the total buffered bytes across all in-flight assemblies + /// to exceed the respective limit are rejected. + #[must_use] + pub fn with_budgets( + max_message_size: NonZeroUsize, + timeout: Duration, + connection_budget: Option, + in_flight_budget: Option, + ) -> Self { Self { max_message_size, timeout, assemblies: HashMap::new(), + budgets: AggregateBudgets { + connection: connection_budget, + in_flight: in_flight_budget, + }, } } @@ -193,6 +217,19 @@ impl MessageAssemblyState { ))); } + // Check aggregate budgets before buffering. Single-frame messages + // are returned above and never counted. The O(n) scan is gated on + // is_enabled() so the hot path stays O(1) when budgets are absent. + if self.budgets.is_enabled() { + let incoming_bytes = input.body.len().saturating_add(input.metadata.len()); + check_aggregate_budgets( + key, + self.total_buffered_bytes(), + incoming_bytes, + &self.budgets, + )?; + } + // Start new assembly, preserving envelope routing metadata from the // first frame so the completed message is dispatched correctly. let mut partial = PartialAssembly::new(series, input.routing, now); @@ -237,34 +274,6 @@ impl MessageAssemblyState { ) } - /// Check if accumulated size plus new body would exceed the limit. - /// - /// Returns the new total size on success. - fn check_size_limit( - max_message_size: NonZeroUsize, - key: MessageKey, - accumulated: usize, - body_len: usize, - ) -> Result { - let Some(new_len) = accumulated.checked_add(body_len) else { - return Err(MessageAssemblyError::MessageTooLarge { - key, - attempted: usize::MAX, - limit: max_message_size, - }); - }; - - if new_len > max_message_size.get() { - return Err(MessageAssemblyError::MessageTooLarge { - key, - attempted: new_len, - limit: max_message_size, - }); - } - - Ok(new_len) - } - /// Process a continuation frame with an explicit timestamp. /// /// See [`accept_continuation_frame`](Self::accept_continuation_frame) for @@ -295,6 +304,16 @@ impl MessageAssemblyState { )); } + // Snapshot budget state before the mutable entry borrow. The O(n) + // scan is gated on is_enabled() so it's skipped when budgets are absent. + let max_message_size = self.max_message_size; + let budgets = self.budgets; + let buffered_total = if budgets.is_enabled() { + self.total_buffered_bytes() + } else { + 0 + }; + let Entry::Occupied(mut entry) = self.assemblies.entry(key) else { return Err(MessageAssemblyError::Series( MessageSeriesError::MissingFirstFrame { key }, @@ -314,8 +333,13 @@ impl MessageAssemblyState { // Check size limit let accumulated = entry.get().accumulated_len(); - if let Err(e) = Self::check_size_limit(self.max_message_size, key, accumulated, body.len()) - { + if let Err(e) = check_size_limit(max_message_size, key, accumulated, body.len()) { + entry.remove(); + return Err(e); + } + + // Check aggregate budgets + if let Err(e) = check_aggregate_budgets(key, buffered_total, body.len(), &budgets) { entry.remove(); return Err(e); } @@ -359,6 +383,17 @@ impl MessageAssemblyState { evicted } + /// Total bytes buffered across all in-flight assemblies. + /// + /// Includes both body and metadata bytes for each partial assembly. + #[must_use] + pub fn total_buffered_bytes(&self) -> usize { + self.assemblies + .values() + .map(PartialAssembly::buffered_bytes) + .sum() + } + /// Number of partial assemblies currently buffered. #[must_use] pub fn buffered_count(&self) -> usize { self.assemblies.len() } diff --git a/src/message_assembler/tests.rs b/src/message_assembler/tests.rs index 48508814..e8568e00 100644 --- a/src/message_assembler/tests.rs +++ b/src/message_assembler/tests.rs @@ -201,3 +201,10 @@ mod series_tests; #[path = "state_tests.rs"] mod state_tests; + +// ============================================================================= +// Budget enforcement tests (8.3.2) - see budget_tests.rs +// ============================================================================= + +#[path = "budget_tests.rs"] +mod budget_tests; diff --git a/tests/features/budget_enforcement.feature b/tests/features/budget_enforcement.feature new file mode 100644 index 00000000..7ea6e517 --- /dev/null +++ b/tests/features/budget_enforcement.feature @@ -0,0 +1,49 @@ +@budget_enforcement +Feature: Memory budget enforcement during message assembly + When memory budgets are configured on a Wireframe application, the message + assembly subsystem actively rejects frames that would exceed any of the + three budget dimensions — per-message, per-connection, and in-flight. + + Background: + Given a budgeted assembly state configured as 1024/30/50/40 + + Scenario: Accept frames within all budget limits + When a first frame for key 1 with 10 body bytes is accepted + Then the frame is accepted without error + And total buffered bytes is 10 + + Scenario: Reject first frame exceeding connection budget + When a first frame for key 1 with 51 body bytes is rejected + Then the error is "ConnectionBudgetExceeded" + And the active assembly count is 0 + + Scenario: Reject continuation exceeding in-flight budget + When a first frame for key 1 with 30 body bytes is accepted + Then the frame is accepted without error + When a continuation for key 1 with sequence 1 and 11 body bytes is rejected + Then the error is "InFlightBudgetExceeded" + And the active assembly count is 0 + + Scenario: Reclaim budget headroom after assembly completes + When a first frame for key 1 with 35 body bytes is accepted + Then total buffered bytes is 35 + When a final continuation for key 1 with sequence 1 and 5 body bytes completes the message + Then total buffered bytes is 0 + When a first frame for key 2 with 35 body bytes is accepted + Then the frame is accepted without error + + Scenario: Reclaim budget headroom after timeout purge + Given the clock is at time zero + When a first frame for key 1 with 35 body bytes is accepted at time zero + Then total buffered bytes is 35 + When expired assemblies are purged at 31 seconds + Then total buffered bytes is 0 + When a first frame for key 2 with 35 body bytes is accepted + Then the frame is accepted without error + + Scenario: Single-frame message bypasses aggregate budgets + When a first frame for key 1 with 35 body bytes is accepted + Then total buffered bytes is 35 + When a single-frame message for key 2 with 100 body bytes is accepted + Then the single-frame message completes immediately + And total buffered bytes is 35 diff --git a/tests/fixtures/budget_enforcement.rs b/tests/fixtures/budget_enforcement.rs new file mode 100644 index 00000000..9927c203 --- /dev/null +++ b/tests/fixtures/budget_enforcement.rs @@ -0,0 +1,288 @@ +//! Behavioural test world for budget enforcement (8.3.2). + +use std::{ + num::NonZeroUsize, + str::FromStr, + time::{Duration, Instant}, +}; + +use rstest::fixture; +use wireframe::message_assembler::{ + ContinuationFrameHeader, + EnvelopeRouting, + FirstFrameHeader, + FirstFrameInput, + FrameSequence, + MessageAssemblyError, + MessageAssemblyState, + MessageKey, +}; +pub use wireframe_testing::TestResult; + +/// Configuration bundle for `init_budgeted_state`. +/// +/// Parsed from the Gherkin step text as +/// `"max_message_size / timeout_secs / connection_budget / in_flight_budget"`. +#[derive(Clone, Copy)] +pub struct BudgetedStateConfig { + pub max_message_size: usize, + pub timeout_secs: u64, + pub connection_budget: usize, + pub in_flight_budget: usize, +} + +impl FromStr for BudgetedStateConfig { + type Err = String; + + fn from_str(s: &str) -> Result { + let mut parts = s.split('/').map(str::trim); + let msg = parts.next().ok_or("missing max_message_size")?; + let timeout = parts.next().ok_or("missing timeout_secs")?; + let conn = parts.next().ok_or("missing connection_budget")?; + let flight = parts.next().ok_or("missing in_flight_budget")?; + Ok(Self { + max_message_size: msg.parse().map_err(|e| format!("max_message_size: {e}"))?, + timeout_secs: timeout.parse().map_err(|e| format!("timeout_secs: {e}"))?, + connection_budget: conn + .parse() + .map_err(|e| format!("connection_budget: {e}"))?, + in_flight_budget: flight + .parse() + .map_err(|e| format!("in_flight_budget: {e}"))?, + }) + } +} + +/// Continuation frame descriptor. +#[derive(Clone, Copy)] +pub struct ContinuationInput { + pub key: u64, + pub sequence: u32, + pub body_len: usize, + pub is_last: bool, +} + +/// Behavioural test world for budget enforcement scenarios. +#[derive(Debug)] +pub struct BudgetEnforcementWorld { + state: MessageAssemblyState, + last_error: Option, + last_accepted: bool, + last_completed: bool, + epoch: Option, +} + +impl Default for BudgetEnforcementWorld { + fn default() -> Self { + // Use a forgiving default so that scenarios which forget to call + // init_budgeted_state fail with a clear budget error rather than a + // confusing MessageTooLarge on any multi-byte body. + let max = NonZeroUsize::new(64 * 1024).unwrap_or(NonZeroUsize::MIN); + Self { + state: MessageAssemblyState::new(max, Duration::from_secs(30)), + last_error: None, + last_accepted: false, + last_completed: false, + epoch: None, + } + } +} + +/// Fixture for `BudgetEnforcementWorld`. +#[fixture] +#[rustfmt::skip] +pub fn budget_enforcement_world() -> BudgetEnforcementWorld { + BudgetEnforcementWorld::default() +} + +impl BudgetEnforcementWorld { + /// Re-initialise state with explicit budgets. + pub fn init_budgeted_state(&mut self, cfg: BudgetedStateConfig) -> TestResult { + let max_message_size = + NonZeroUsize::new(cfg.max_message_size).ok_or("max_message_size must be non-zero")?; + let connection = + NonZeroUsize::new(cfg.connection_budget).ok_or("connection_budget must be non-zero")?; + let in_flight = + NonZeroUsize::new(cfg.in_flight_budget).ok_or("in_flight_budget must be non-zero")?; + self.state = MessageAssemblyState::with_budgets( + max_message_size, + Duration::from_secs(cfg.timeout_secs), + Some(connection), + Some(in_flight), + ); + Ok(()) + } + + /// Set the epoch for time-based tests. + pub fn set_epoch(&mut self) { self.epoch = Some(Instant::now()); } + + /// Record the result of an assembly operation. + fn record_result( + &mut self, + result: Result< + Option, + MessageAssemblyError, + >, + ) { + match result { + Ok(None) => { + self.last_accepted = true; + self.last_completed = false; + self.last_error = None; + } + Ok(Some(_)) => { + self.last_accepted = true; + self.last_completed = true; + self.last_error = None; + } + Err(e) => { + self.last_accepted = false; + self.last_completed = false; + self.last_error = Some(e); + } + } + } + + /// Build a first-frame header and body from key, body length, and finality. + fn build_first_input(key: u64, body_len: usize, is_last: bool) -> (Vec, FirstFrameHeader) { + ( + vec![0u8; body_len], + FirstFrameHeader { + message_key: MessageKey(key), + metadata_len: 0, + body_len, + total_body_len: None, + is_last, + }, + ) + } + + /// Submit a multi-frame first frame. + pub fn accept_first_frame(&mut self, key: u64, body_len: usize) -> TestResult { + let (body, header) = Self::build_first_input(key, body_len, false); + let input = FirstFrameInput::new(&header, EnvelopeRouting::default(), vec![], &body) + .map_err(|e| e.to_string())?; + let result = self.state.accept_first_frame(input); + self.record_result(result); + Ok(()) + } + + /// Submit a multi-frame first frame at the stored epoch. + pub fn accept_first_frame_at_epoch(&mut self, key: u64, body_len: usize) -> TestResult { + let now = self.epoch.ok_or("epoch not set")?; + let (body, header) = Self::build_first_input(key, body_len, false); + let input = FirstFrameInput::new(&header, EnvelopeRouting::default(), vec![], &body) + .map_err(|e| e.to_string())?; + let result = self.state.accept_first_frame_at(input, now); + self.record_result(result); + Ok(()) + } + + /// Submit a first frame that is expected to be rejected. + pub fn reject_first_frame(&mut self, key: u64, body_len: usize) -> TestResult { + self.accept_first_frame(key, body_len)?; + if self.last_accepted { + return Err("expected first frame to be rejected, but it was accepted".into()); + } + Ok(()) + } + + /// Submit a single-frame message. + pub fn accept_single_frame(&mut self, key: u64, body_len: usize) -> TestResult { + let (body, header) = Self::build_first_input(key, body_len, true); + let input = FirstFrameInput::new(&header, EnvelopeRouting::default(), vec![], &body) + .map_err(|e| e.to_string())?; + let result = self.state.accept_first_frame(input); + self.record_result(result); + Ok(()) + } + + /// Submit a continuation frame. + pub fn accept_continuation(&mut self, input: ContinuationInput) { + let body = vec![0u8; input.body_len]; + let header = ContinuationFrameHeader { + message_key: MessageKey(input.key), + sequence: Some(FrameSequence(input.sequence)), + body_len: input.body_len, + is_last: input.is_last, + }; + let result = self.state.accept_continuation_frame(&header, &body); + self.record_result(result); + } + + /// Submit a continuation frame that is expected to be rejected. + pub fn reject_continuation(&mut self, key: u64, sequence: u32, body_len: usize) -> TestResult { + self.accept_continuation(ContinuationInput { + key, + sequence, + body_len, + is_last: false, + }); + if self.last_accepted { + return Err("expected continuation to be rejected, but it was accepted".into()); + } + Ok(()) + } + + /// Purge expired assemblies at a given offset from epoch. + pub fn purge_at_offset(&mut self, offset_secs: u64) -> TestResult { + let epoch = self.epoch.ok_or("epoch not set")?; + let future = epoch + Duration::from_secs(offset_secs); + self.state.purge_expired_at(future); + Ok(()) + } + + // ---- Assertions ---- + + pub fn assert_accepted(&self) -> TestResult { + if !self.last_accepted { + let detail = self + .last_error + .as_ref() + .map_or("(no error captured)".to_string(), |e| format!("{e}")); + return Err(format!("expected frame to be accepted, got error: {detail}").into()); + } + Ok(()) + } + + pub fn assert_completed(&self) -> TestResult { + if !self.last_completed { + return Err("expected message to complete".into()); + } + Ok(()) + } + + pub fn assert_error_kind(&self, expected: &str) -> TestResult { + let err = self + .last_error + .as_ref() + .ok_or("expected an error, but none was captured")?; + let actual = match err { + MessageAssemblyError::ConnectionBudgetExceeded { .. } => "ConnectionBudgetExceeded", + MessageAssemblyError::InFlightBudgetExceeded { .. } => "InFlightBudgetExceeded", + MessageAssemblyError::MessageTooLarge { .. } => "MessageTooLarge", + MessageAssemblyError::DuplicateFirstFrame { .. } => "DuplicateFirstFrame", + MessageAssemblyError::Series(_) => "Series", + }; + if actual != expected { + return Err(format!("expected error kind '{expected}', got '{actual}'").into()); + } + Ok(()) + } + + pub fn assert_total_buffered_bytes(&self, expected: usize) -> TestResult { + let actual = self.state.total_buffered_bytes(); + if actual != expected { + return Err(format!("expected total_buffered_bytes={expected}, got {actual}").into()); + } + Ok(()) + } + + pub fn assert_buffered_count(&self, expected: usize) -> TestResult { + let actual = self.state.buffered_count(); + if actual != expected { + return Err(format!("expected buffered_count={expected}, got {actual}").into()); + } + Ok(()) + } +} diff --git a/tests/fixtures/mod.rs b/tests/fixtures/mod.rs index b05281d3..7344370c 100644 --- a/tests/fixtures/mod.rs +++ b/tests/fixtures/mod.rs @@ -3,6 +3,7 @@ //! Each world from the former Cucumber tests is converted to an rstest fixture //! here. +pub mod budget_enforcement; pub mod client_lifecycle; pub mod client_messaging; pub mod client_preamble; diff --git a/tests/scenarios/budget_enforcement_scenarios.rs b/tests/scenarios/budget_enforcement_scenarios.rs new file mode 100644 index 00000000..7b457a27 --- /dev/null +++ b/tests/scenarios/budget_enforcement_scenarios.rs @@ -0,0 +1,57 @@ +//! Scenario test functions for budget enforcement (8.3.2). + +use rstest_bdd_macros::scenario; + +use crate::fixtures::budget_enforcement::*; + +#[scenario( + path = "tests/features/budget_enforcement.feature", + name = "Accept frames within all budget limits" +)] +fn accept_frames_within_limits(budget_enforcement_world: BudgetEnforcementWorld) { + let _ = budget_enforcement_world; +} + +#[scenario( + path = "tests/features/budget_enforcement.feature", + name = "Reject first frame exceeding connection budget" +)] +fn reject_first_frame_exceeding_connection_budget( + budget_enforcement_world: BudgetEnforcementWorld, +) { + let _ = budget_enforcement_world; +} + +#[scenario( + path = "tests/features/budget_enforcement.feature", + name = "Reject continuation exceeding in-flight budget" +)] +fn reject_continuation_exceeding_in_flight_budget( + budget_enforcement_world: BudgetEnforcementWorld, +) { + let _ = budget_enforcement_world; +} + +#[scenario( + path = "tests/features/budget_enforcement.feature", + name = "Reclaim budget headroom after assembly completes" +)] +fn reclaim_headroom_after_completion(budget_enforcement_world: BudgetEnforcementWorld) { + let _ = budget_enforcement_world; +} + +#[scenario( + path = "tests/features/budget_enforcement.feature", + name = "Reclaim budget headroom after timeout purge" +)] +fn reclaim_headroom_after_purge(budget_enforcement_world: BudgetEnforcementWorld) { + let _ = budget_enforcement_world; +} + +#[scenario( + path = "tests/features/budget_enforcement.feature", + name = "Single-frame message bypasses aggregate budgets" +)] +fn single_frame_bypasses_budgets(budget_enforcement_world: BudgetEnforcementWorld) { + let _ = budget_enforcement_world; +} diff --git a/tests/scenarios/mod.rs b/tests/scenarios/mod.rs index d866accb..6c52a8c4 100644 --- a/tests/scenarios/mod.rs +++ b/tests/scenarios/mod.rs @@ -7,6 +7,7 @@ #[path = "../steps/mod.rs"] pub(crate) mod steps; +mod budget_enforcement_scenarios; mod client_lifecycle_scenarios; mod client_messaging_scenarios; mod client_preamble_scenarios; diff --git a/tests/steps/budget_enforcement_steps.rs b/tests/steps/budget_enforcement_steps.rs new file mode 100644 index 00000000..ff732816 --- /dev/null +++ b/tests/steps/budget_enforcement_steps.rs @@ -0,0 +1,144 @@ +//! Step definitions for budget enforcement BDD scenarios (8.3.2). + +use rstest_bdd_macros::{given, then, when}; + +use crate::fixtures::budget_enforcement::{ + BudgetEnforcementWorld, + BudgetedStateConfig, + ContinuationInput, + TestResult, +}; + +// --------------------------------------------------------------------------- +// Given +// --------------------------------------------------------------------------- + +#[given("a budgeted assembly state configured as {config}")] +fn given_budgeted_state( + budget_enforcement_world: &mut BudgetEnforcementWorld, + config: BudgetedStateConfig, +) -> TestResult { + budget_enforcement_world.init_budgeted_state(config) +} + +#[given("the clock is at time zero")] +fn given_clock_at_zero(budget_enforcement_world: &mut BudgetEnforcementWorld) { + budget_enforcement_world.set_epoch(); +} + +// --------------------------------------------------------------------------- +// When +// --------------------------------------------------------------------------- + +#[when("a first frame for key {key} with {body_len} body bytes is accepted")] +fn when_first_frame_accepted( + budget_enforcement_world: &mut BudgetEnforcementWorld, + key: u64, + body_len: usize, +) -> TestResult { + budget_enforcement_world.accept_first_frame(key, body_len) +} + +#[when("a first frame for key {key} with {body_len} body bytes is rejected")] +fn when_first_frame_rejected( + budget_enforcement_world: &mut BudgetEnforcementWorld, + key: u64, + body_len: usize, +) -> TestResult { + budget_enforcement_world.reject_first_frame(key, body_len) +} + +#[when("a first frame for key {key} with {body_len} body bytes is accepted at time zero")] +fn when_first_frame_accepted_at_epoch( + budget_enforcement_world: &mut BudgetEnforcementWorld, + key: u64, + body_len: usize, +) -> TestResult { + budget_enforcement_world.accept_first_frame_at_epoch(key, body_len) +} + +#[when("a continuation for key {key} with sequence {seq} and {body_len} body bytes is rejected")] +fn when_continuation_rejected( + budget_enforcement_world: &mut BudgetEnforcementWorld, + key: u64, + seq: u32, + body_len: usize, +) -> TestResult { + budget_enforcement_world.reject_continuation(key, seq, body_len) +} + +#[when( + "a final continuation for key {key} with sequence {seq} and {body_len} body bytes completes \ + the message" +)] +fn when_final_continuation_completes( + budget_enforcement_world: &mut BudgetEnforcementWorld, + key: u64, + seq: u32, + body_len: usize, +) -> TestResult { + budget_enforcement_world.accept_continuation(ContinuationInput { + key, + sequence: seq, + body_len, + is_last: true, + }); + budget_enforcement_world.assert_completed() +} + +#[when("expired assemblies are purged at {offset_secs} seconds")] +fn when_purged_at_offset( + budget_enforcement_world: &mut BudgetEnforcementWorld, + offset_secs: u64, +) -> TestResult { + budget_enforcement_world.purge_at_offset(offset_secs) +} + +#[when("a single-frame message for key {key} with {body_len} body bytes is accepted")] +fn when_single_frame_accepted( + budget_enforcement_world: &mut BudgetEnforcementWorld, + key: u64, + body_len: usize, +) -> TestResult { + budget_enforcement_world.accept_single_frame(key, body_len) +} + +// --------------------------------------------------------------------------- +// Then +// --------------------------------------------------------------------------- + +#[then("the frame is accepted without error")] +fn then_frame_accepted(budget_enforcement_world: &mut BudgetEnforcementWorld) -> TestResult { + budget_enforcement_world.assert_accepted() +} + +#[then("the error is \"{error_kind}\"")] +fn then_error_is( + budget_enforcement_world: &mut BudgetEnforcementWorld, + error_kind: String, +) -> TestResult { + budget_enforcement_world.assert_error_kind(&error_kind) +} + +#[then("total buffered bytes is {expected}")] +fn then_total_buffered_bytes( + budget_enforcement_world: &mut BudgetEnforcementWorld, + expected: usize, +) -> TestResult { + budget_enforcement_world.assert_total_buffered_bytes(expected) +} + +#[then("the active assembly count is {expected}")] +fn then_active_assembly_count( + budget_enforcement_world: &mut BudgetEnforcementWorld, + expected: usize, +) -> TestResult { + budget_enforcement_world.assert_buffered_count(expected) +} + +#[then("the single-frame message completes immediately")] +fn then_single_frame_completes( + budget_enforcement_world: &mut BudgetEnforcementWorld, +) -> TestResult { + budget_enforcement_world.assert_completed() +} diff --git a/tests/steps/mod.rs b/tests/steps/mod.rs index 292badfd..6de12dbd 100644 --- a/tests/steps/mod.rs +++ b/tests/steps/mod.rs @@ -3,6 +3,7 @@ //! Step functions are synchronous and call async world methods via //! `Runtime::new().block_on(...)`. +mod budget_enforcement_steps; mod client_lifecycle_steps; mod client_messaging_steps; mod client_preamble_steps;