Implement hard-cap memory-budget protection (8.3.4)#483
Conversation
Implement hard-cap connection abort for inbound memory budgets (roadmap item 8.3.4) as a defence-in-depth safety net. When total buffered bytes strictly exceed the aggregate cap (min of bytes_per_connection and bytes_in_flight), the inbound read loop immediately aborts the connection with InvalidData, bypassing the DeserFailureTracker. A combined evaluate_memory_pressure() function is added that returns MemoryPressureAction (Continue, Pause, Abort) to integrate both soft-limit and hard-cap checks, keeping inbound_handler.rs within the 400-line limit. Add comprehensive unit tests and rstest-bdd behavioural tests validating abort behaviour and connection health within budget. Update relevant documentation, including the ADR, user guide, roadmap, and design notes, to reflect the three-tier memory budget protection model: per-frame enforcement, soft-limit read pacing, and hard-cap abort. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
Reviewer's GuideImplements a unified inbound memory-pressure policy with a new hard-cap connection abort path, refactors the inbound read loop to consult this policy, and adds unit/BDD tests and documentation updates to describe the three-tier memory-budget model and mark roadmap item 8.3.4 complete. Class diagram for backpressure policy and inbound handler integrationclassDiagram
class MemoryBudgets {
+bytes_per_message: BudgetBytes
+bytes_per_connection: BudgetBytes
+bytes_in_flight: BudgetBytes
}
class MessageAssemblyState {
+total_buffered_bytes() usize
+buffered_count() usize
}
class MemoryPressureAction {
<<enum>>
+Continue
+Pause
+Abort
}
class BackpressureModule {
+evaluate_memory_pressure(state: Option~&MessageAssemblyState~, budgets: Option~MemoryBudgets~) MemoryPressureAction
+has_hard_cap_been_breached(state: Option~&MessageAssemblyState~, budgets: Option~MemoryBudgets~) bool
+should_pause_inbound_reads(state: Option~&MessageAssemblyState~, budgets: Option~MemoryBudgets~) bool
-SOFT_LIMIT_NUMERATOR: u128
-SOFT_LIMIT_DENOMINATOR: u128
-SOFT_LIMIT_PAUSE_DURATION: Duration
-active_aggregate_limit_bytes(budgets: MemoryBudgets) usize
}
class InboundHandler {
+process_stream(stream: TStream, memory_budgets: Option~MemoryBudgets~) io::Result
-read_timeout_ms: u64
-message_assembly: Option~MessageAssemblyState~
-pipeline: Pipeline
}
class FrameHandlingMod {
+evaluate_memory_pressure(state: Option~&MessageAssemblyState~, budgets: Option~MemoryBudgets~) MemoryPressureAction
+decode_envelope(...)
+reassemble_if_needed(...)
+forward_response(...)
}
BackpressureModule ..> MemoryPressureAction : returns
BackpressureModule ..> MessageAssemblyState : reads_buffer_stats
BackpressureModule ..> MemoryBudgets : reads_limits
FrameHandlingMod ..> BackpressureModule : re_exports
InboundHandler ..> FrameHandlingMod : uses_evaluate_memory_pressure
InboundHandler ..> MessageAssemblyState : maintains
InboundHandler ..> MemoryBudgets : configured_by_app
InboundHandler ..> BackpressureModule : indirect_policy_dependency
Flow diagram for three-tier inbound memory protection modelflowchart TD
A[Start inbound frame processing] --> B{Per_frame_budget_enforcement}
B -->|Frame exceeds per_message or aggregate budgets| C[Reject frame and free offending assembly]
C --> D[Surface InvalidData via DeserFailureTracker]
B -->|Frame within budgets| E[Accept frame and update MessageAssemblyState]
E --> F{Evaluate_memory_pressure}
F -->|Hard cap breached
buffered_bytes > aggregate_limit| G[Abort connection with InvalidData]
G --> H[Drop MessageAssemblyState and free all partial assemblies]
F -->|Soft limit reached
buffered_bytes >= 80% of aggregate_limit| I[Pause reads for 5 ms]
I --> J[Purge expired assemblies]
J --> K[Continue processing next frame]
F -->|Below soft limit
normal pressure| K
K --> L[Read next frame from socket]
L --> B
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
WalkthroughIntroduces hard-cap memory pressure evaluation alongside soft-limit back-pressure for inbound message handling. Adds Changes
Sequence DiagramsequenceDiagram
participant Inbound as Inbound Read Loop
participant Eval as evaluate_memory<br/>_pressure()
participant Check as has_hard_cap_<br/>been_breached()
participant Pause as should_pause_<br/>inbound_reads()
participant Handler as Inbound Handler<br/>Error Path
Inbound->>Eval: evaluate_memory_pressure(state, budgets)
Eval->>Check: Check hard cap exceeded?
alt Hard Cap Breached
Check-->>Eval: true
Eval-->>Inbound: MemoryPressureAction::Abort
Inbound->>Handler: Return InvalidData IoError
Handler->>Handler: Terminate connection
else Hard Cap OK
Check-->>Eval: false
Eval->>Pause: Check soft-limit threshold
alt At/Above Soft Limit
Pause-->>Eval: true
Eval-->>Inbound: MemoryPressureAction::Pause(duration)
Inbound->>Inbound: Sleep for duration
Inbound->>Inbound: Purge expired assemblies
else Below Soft Limit
Pause-->>Eval: false
Eval-->>Inbound: MemoryPressureAction::Continue
Inbound->>Inbound: Proceed with read
end
end
Estimated Code Review Effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly Related PRs
Suggested Labels
Poem
🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Comment |
|
@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix: src/app/frame_handling/backpressure_tests.rs Comment on file fn hard_cap_uses_smallest_aggregate_budget_dimension() -> TestResult {
let per_message =
NonZeroUsize::new(1024).ok_or_else(|| io::Error::other("1024 is non-zero"))?;
let per_connection =
NonZeroUsize::new(200).ok_or_else(|| io::Error::other("200 is non-zero"))?;
let in_flight = NonZeroUsize::new(100).ok_or_else(|| io::Error::other("100 is non-zero"))?;
let budgets = MemoryBudgets::new(
BudgetBytes::new(per_message),
BudgetBytes::new(per_connection),
BudgetBytes::new(in_flight),
);
let state = state_with_buffered_bytes(101)?;
if !has_hard_cap_been_breached(Some(&state), Some(budgets)) {
return Err(io::Error::other("expected breach at smallest aggregate dimension").into());
}
Ok(())
}❌ New issue: Code Duplication |
This comment was marked as resolved.
This comment was marked as resolved.
…ure tests - Introduce `custom_budgets` helper to reduce test duplication - Simplify tests by replacing repeated budget setup with helper - Add `ExpectedAction` enum for expected results in parameterized test - Consolidate multiple evaluate_memory_pressure tests into one parameterized test - Add `action_matches` function to compare action variants ignoring Duration payload - Increase test clarity and maintainability through restructuring Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
There was a problem hiding this comment.
Hey - I've found 5 issues
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location path="src/app/frame_handling/backpressure_tests.rs" line_range="195-204" />
<code_context>
+
+/// Compare pressure actions by variant, ignoring the `Duration` payload in
+/// `Pause`.
+fn action_matches(actual: &MemoryPressureAction, expected: &MemoryPressureAction) -> bool {
+ matches!(
+ (actual, expected),
+ (
+ MemoryPressureAction::Continue,
+ MemoryPressureAction::Continue
+ ) | (
+ MemoryPressureAction::Pause(_),
+ MemoryPressureAction::Pause(_)
+ ) | (MemoryPressureAction::Abort, MemoryPressureAction::Abort)
+ )
+}
+
</code_context>
<issue_to_address>
**suggestion (testing):** Consider a targeted test that asserts the soft-limit pause duration is the expected value, not just the variant
Because `action_matches` ignores the `Duration` in `MemoryPressureAction::Pause`, the current parameterized test wouldn’t catch an accidental change to `SOFT_LIMIT_PAUSE_DURATION`. To cover this, add a small unit test that exercises `evaluate_memory_pressure` in a soft-limit scenario, matches on `MemoryPressureAction::Pause(d)`, and asserts `d == Duration::from_millis(5)` alongside the existing test.
Suggested implementation:
```rust
use super::{
MemoryPressureAction,
backpressure::{has_hard_cap_been_breached, should_pause_inbound_reads},
evaluate_memory_pressure,
};
#[test]
fn soft_limit_pause_uses_expected_duration() -> Result<(), Box<dyn Error>> {
// Arrange a scenario where the soft limit is breached but the hard cap is not.
//
// This should cause `evaluate_memory_pressure` to return a `Pause`
// with the configured soft-limit pause duration (currently 5ms).
let (state, budgets) = soft_limit_only_state_and_budgets()?;
let action = evaluate_memory_pressure(&state, &budgets)?;
match action {
MemoryPressureAction::Pause(d) => {
assert_eq!(d, Duration::from_millis(5));
}
other => {
panic!("expected MemoryPressureAction::Pause(_), got {other:?}");
}
}
Ok(())
}
```
To wire this test up to your existing code, you will need to:
1. Replace `soft_limit_only_state_and_budgets()` with concrete setup that produces a “soft limit breached, hard cap not breached” scenario. The easiest way is usually to:
- Reuse the same fixtures/builders used by your existing “combined evaluate_memory_pressure” tests, or
- Introduce a small helper returning `(state, budgets)` that configures usage just above the soft limit and below the hard cap.
2. Adjust the call to `evaluate_memory_pressure(&state, &budgets)?` if its signature differs (e.g., argument order, additional parameters, or if it does not return a `Result`).
3. If `evaluate_memory_pressure` does not use `Result`, remove the `Result` return type and `?` operators from the test accordingly.
4. Ensure that `MemoryPressureAction` implements `Debug` (or adapt the panic message) if `{other:?}` is not currently valid.
These adjustments should align the new test with the conventions and helpers already present in `backpressure_tests.rs`.
</issue_to_address>
### Comment 2
<location path="tests/fixtures/memory_budget_hard_cap.rs" line_range="24-33" />
<code_context>
+/// We cannot pass `MemoryPressureAction` directly in `#[case]` attributes
+/// because the `Pause` variant contains a `Duration`, so we use a simple tag
+/// that the test body maps to the real variant for comparison.
+#[derive(Clone, Copy, Debug)]
+enum ExpectedAction {
+ Continue,
</code_context>
<issue_to_address>
**suggestion (testing):** Add unit-style tests for `HardCapConfig::from_str` to cover invalid/edge-case parsing
Since `HardCapConfig` is the only way to configure this fixture, its failure paths should be covered directly rather than relying solely on BDD scenarios. Please add focused tests for `HardCapConfig::from_str` covering:
- A valid four-segment input.
- Each missing segment variant (e.g. `"/64/100/100"`, `"10//100/100"`).
- An extra segment (e.g. `"10/64/100/100/extra"`).
- Non-numeric segment values.
- Zero budgets/timeouts, asserting they either return `Err` or produce a config that fails early in `start_app`.
This will make regressions in the parsing logic much easier to detect and localize.
</issue_to_address>
### Comment 3
<location path="tests/fixtures/memory_budget_hard_cap.rs" line_range="244-253" />
<code_context>
+ /// Assert that the connection has terminated with an error.
</code_context>
<issue_to_address>
**suggestion (testing):** Consider asserting that no payloads are delivered in abort scenarios to verify in-flight messages are not surfaced
Right now the BDD step and `assert_connection_aborted` only check that an error occurs, not that no payloads were delivered. Please also assert that `observed_payloads` is empty at the time of abort (either in this helper or a new one) so a scenario like “Connection terminates after repeated budget violations” verifies that the hard-cap path drops in-flight data instead of delivering it.
</issue_to_address>
### Comment 4
<location path="src/app/inbound_handler.rs" line_range="281" />
<code_context>
loop {
- if frame_handling::should_pause_inbound_reads(
+ match frame_handling::evaluate_memory_pressure(
message_assembly.as_ref(),
self.memory_budgets,
</code_context>
<issue_to_address>
**issue (review_instructions):** This `match` introduces a 3-way branch and should be encapsulated in a predicate/helper function to comply with the conditional-branching guideline.
The review guideline asks to move conditionals with more than two branches into a predicate function. This `match` over `MemoryPressureAction` has three arms (`Abort`, `Pause`, `Continue`) and is now inlined in the loop body.
Consider extracting this branching into a small helper (e.g. a function that takes the action and performs the appropriate side-effect, or a function that maps the action to a simpler boolean/enum used here) so that the loop body does not directly contain a >2-branch conditional.
<details>
<summary>Review instructions:</summary>
**Path patterns:** `**/*.rs`
**Instructions:**
Move conditionals with >2 branches into a predicate function.
</details>
</issue_to_address>
### Comment 5
<location path="docs/execplans/8-3-4-hard-cap-behaviour.md" line_range="263" />
<code_context>
+
+### BDD test pattern
+
+The project uses a 4-file pattern for BDD tests with `rstest-bdd` v0.5.0:
+
+1. **Feature file** (`tests/features/<name>.feature`): Gherkin scenarios.
</code_context>
<issue_to_address>
**suggestion (review_instructions):** Define “BDD” (behaviour-driven development) at its first use in this document.
To follow the “define uncommon acronyms on first use” guidance, expand this sentence to something like:
"The project uses a 4-file pattern for behaviour-driven development (BDD) tests with `rstest-bdd` v0.5.0:"
<details>
<summary>Review instructions:</summary>
**Path patterns:** `**/*.md`
**Instructions:**
Define uncommon acronyms on first use.
</details>
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@docs/execplans/8-3-4-hard-cap-behaviour.md`:
- Around line 89-91: Replace the British "-ising" spelling in the sentence "stop
and present options with trade-offs before finalising behaviour" with the Oxford
English Dictionary preferred "-izing" form: change "finalising behaviour" to
"finalizing behaviour" (ensure the surrounding sentence remains unchanged) to
comply with the en-GB-oxendict -ize spelling guideline in
docs/execplans/8-3-4-hard-cap-behaviour.md; verify there are no other instances
of "-ising" in that file and update them to "-izing" where appropriate.
In `@src/app/frame_handling/backpressure_tests.rs`:
- Around line 229-251: The parameterized test evaluate_memory_pressure_behaviour
contains a duplicate case: remove the redundant
#[case::abort_takes_priority_over_pause(101, true, ExpectedAction::Abort)]
(which is identical to #[case::above_hard_cap(101, true,
ExpectedAction::Abort)]), or replace it with a distinct scenario (e.g.,
different buffered_bytes or use_budgets flag) so each #[case::...] in the
evaluate_memory_pressure_behaviour test is unique; update any references to the
removed case name if used elsewhere.
ℹ️ Review info
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (16)
docs/adr-002-streaming-requests-and-shared-message-assembly.mddocs/execplans/8-3-4-hard-cap-behaviour.mddocs/generic-message-fragmentation-and-re-assembly-design.mddocs/roadmap.mddocs/users-guide.mdsrc/app/frame_handling/backpressure.rssrc/app/frame_handling/backpressure_tests.rssrc/app/frame_handling/mod.rssrc/app/inbound_handler.rstests/features/memory_budget_hard_cap.featuretests/fixtures/memory_budget_hard_cap.rstests/fixtures/mod.rstests/scenarios/memory_budget_hard_cap_scenarios.rstests/scenarios/mod.rstests/steps/memory_budget_hard_cap_steps.rstests/steps/mod.rs
…cement - Introduce `apply_memory_pressure` async function to act on memory pressure actions: Abort logs a warning and returns an error to terminate the connection. Pause logs at debug level, awaits pause duration, then purges expired assemblies. Continue is a no-op. - Replace inline memory pressure handling in inbound_handler with `apply_memory_pressure` usage. - Add tests for pause duration and hard-cap behavior. - Expand memory budget hard cap fixture tests to cover error cases. - Update documentation for hard-cap behavior with clarifications and spelling fixes. - Refactor exports to include `apply_memory_pressure`. This change enforces a per-connection memory hard cap by aborting connections that exceed it and applies backpressure via pausing when soft limits are reached. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
|
@coderabbitai Have the following now been resolved? |
|
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chain🏁 Script executed: Length of output: 4491 🏁 Script executed: Length of output: 2758 🏁 Script executed: Length of output: 1196
✅ Resolved (4 of 5)Comment 1: Soft-limit pause duration testRESOLVED — Found
Comment 2: HardCapConfig::from_str unit testsRESOLVED — Comprehensive test module
Comment 3: Assert no payloads delivered in abort scenariosRESOLVED —
Comment 4: Extract 3-way match to helper functionRESOLVED — New
❌ Not Resolved (1 of 5)Comment 5: Define "BDD" acronym on first useNOT RESOLVED — Line 263 in
The acronym is not expanded at this location. While "behaviour-driven development" appears at line 114, it's in a different context (risk section) rather than at the first use of the acronym. Summary: Four of the five comments have been fully addressed. Only Comment 5 (defining the BDD acronym) remains outstanding. |
Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
Summary
Changes
Rationale
Testing plan
Public API impact
Artefacts
Validation notes
If reviewers have questions about edge cases (e.g., interaction with partial assemblies on hard cap), please reference the ADR update and the exec-plan for staging rationale.
📎 Task: https://www.devboxer.com/task/daf2c9a4-6b90-435a-b4fa-3ac698c8e8d1