Implement memory budget enforcement across message assembly (8.3.2)#474
Implement memory budget enforcement across message assembly (8.3.2)#474
Conversation
WalkthroughImplements runtime budget enforcement for streaming message assembly, adding per-connection, in-flight, and per-message byte limits with dedicated budget checking logic. Violations are surfaced as ConnectionBudgetExceeded or InFlightBudgetExceeded errors, whilst single-frame messages bypass aggregate limits. Changes
Sequence DiagramsequenceDiagram
participant Client
participant InboundHandler
participant MessageAssemblyState
participant BudgetChecker
participant ErrorHandler
Client->>InboundHandler: submit frame (key, body_len)
InboundHandler->>MessageAssemblyState: accept_first_frame_at(timestamp, input)
alt Is Single Frame
MessageAssemblyState->>Client: return AssembledMessage immediately
Note over MessageAssemblyState: Bypass aggregate budgets
else Is Multi-Frame
MessageAssemblyState->>BudgetChecker: check_aggregate_budgets(key, current_total, additional_bytes)
alt Budget Exceeded
BudgetChecker->>MessageAssemblyState: return ConnectionBudgetExceeded or InFlightBudgetExceeded
MessageAssemblyState->>MessageAssemblyState: free partial assembly
MessageAssemblyState->>ErrorHandler: surface error as InvalidData
ErrorHandler->>Client: frame rejected
else Budget OK
MessageAssemblyState->>BudgetChecker: check_size_limit(max_message_size, key, accumulated, body_len)
alt Size Limit Exceeded
BudgetChecker->>MessageAssemblyState: return MessageTooLarge
MessageAssemblyState->>ErrorHandler: surface error
ErrorHandler->>Client: frame rejected
else Size OK
MessageAssemblyState->>MessageAssemblyState: buffer frame, track buffered_bytes
MessageAssemblyState->>Client: partial assembly stored
end
end
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested labels
Poem
🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Comment |
Reviewer's GuideImplements runtime memory budget enforcement in the message assembly pipeline, adding aggregate budget tracking to MessageAssemblyState, wiring it through the app’s assembly construction path, surfacing new budget-related errors, and covering the behavior with unit and BDD tests plus documentation updates. Sequence diagram for frame acceptance with budget enforcementsequenceDiagram
participant Conn as Connection
participant FH as frame_handling
participant MAS as MessageAssemblyState
participant BM as budget(check_aggregate_budgets,check_size_limit)
Conn->>FH: new_message_assembly_state(fragmentation, frame_budget, memory_budgets)
activate FH
FH->>FH: compute frag_max from fragmentation or frame_budget
alt memory_budgets is Some
FH->>memory_budgets: bytes_per_message()
FH->>FH: per_msg = min(frag_max, bytes_per_message)
FH->>memory_budgets: bytes_per_connection(), bytes_in_flight()
FH->>MAS: with_budgets(per_msg, timeout, connection_budget, in_flight_budget)
else no memory_budgets
FH->>MAS: new(frag_max, timeout)
end
deactivate FH
Conn->>MAS: accept_first_frame_at(input, now)
activate MAS
MAS->>BM: check_size_limit(max_message_size, key, 0, input.body.len())
BM-->>MAS: Ok(total_body)
MAS->>MAS: compute incoming_bytes = body.len + metadata.len
MAS->>MAS: total_before = total_buffered_bytes()
MAS->>BM: check_aggregate_budgets(key, total_before, incoming_bytes, budgets)
alt budgets exceeded
BM-->>MAS: Err(ConnectionBudgetExceeded or InFlightBudgetExceeded)
MAS-->>Conn: Err(MessageAssemblyError)
else within budgets
BM-->>MAS: Ok(())
MAS->>MAS: insert new PartialAssembly
MAS-->>Conn: Ok(assembly_started)
end
deactivate MAS
Conn->>MAS: accept_continuation_frame_at(header, body, now)
activate MAS
MAS->>MAS: buffered_total = total_buffered_bytes()
MAS->>MAS: lookup PartialAssembly by key
MAS->>BM: check_size_limit(max_message_size, key, accumulated_len, body.len)
alt size limit exceeded
BM-->>MAS: Err(MessageTooLarge)
MAS->>MAS: remove PartialAssembly
MAS-->>Conn: Err(MessageTooLarge)
else size ok
BM-->>MAS: Ok(new_len)
MAS->>BM: check_aggregate_budgets(key, buffered_total, body.len, budgets)
alt aggregate budget exceeded
BM-->>MAS: Err(ConnectionBudgetExceeded or InFlightBudgetExceeded)
MAS->>MAS: remove PartialAssembly
MAS-->>Conn: Err(MessageAssemblyError)
else aggregate ok
BM-->>MAS: Ok(())
MAS->>MAS: append body to PartialAssembly
MAS-->>Conn: Ok(maybe_completed_message)
end
end
deactivate MAS
Class diagram for message assembly budget enforcement typesclassDiagram
class MessageAssemblyState {
- NonZeroUsize max_message_size
- Duration timeout
- HashMap~MessageKey, PartialAssembly~ assemblies
- AggregateBudgets budgets
+ new(max_message_size: NonZeroUsize, timeout: Duration) MessageAssemblyState
+ with_budgets(max_message_size: NonZeroUsize, timeout: Duration, connection_budget: Option~NonZeroUsize~, in_flight_budget: Option~NonZeroUsize~) MessageAssemblyState
+ total_buffered_bytes() usize
+ buffered_count() usize
+ accept_first_frame_at(input: FirstFrameInput, now: Instant) Result~Option~CompletedMessage~~
+ accept_continuation_frame_at(header: ContinuationHeader, body: Vec~u8~, now: Instant) Result~Option~CompletedMessage~~
+ purge_expired_at(now: Instant) usize
}
class AggregateBudgets {
+ Option~NonZeroUsize~ connection
+ Option~NonZeroUsize~ in_flight
}
class PartialAssembly {
- Vec~u8~ metadata
- Vec~u8~ body_buffer
+ accumulated_len() usize
+ buffered_bytes() usize
}
class MessageAssemblyError {
<<enum>>
Series
DuplicateFirstFrame
MessageTooLarge
ConnectionBudgetExceeded
InFlightBudgetExceeded
}
class BudgetModule {
<<module>>
+ check_size_limit(max_message_size: NonZeroUsize, key: MessageKey, accumulated: usize, body_len: usize) Result~usize, MessageAssemblyError~
+ check_aggregate_budgets(key: MessageKey, total_buffered: usize, incoming_bytes: usize, budgets: AggregateBudgets) Result~(), MessageAssemblyError~
}
class MemoryBudgets {
+ bytes_per_message() BudgetBytes
+ bytes_per_connection() BudgetBytes
+ bytes_in_flight() BudgetBytes
}
class BudgetBytes {
+ get() NonZeroUsize
}
class AssemblyRuntimeModule {
<<module>>
+ new_message_assembly_state(fragmentation: Option~FragmentationConfig~, frame_budget: usize, memory_budgets: Option~MemoryBudgets~) MessageAssemblyState
}
class Connection {
- Option~FragmentationConfig~ fragmentation
- usize requested_frame_length
- Option~MemoryBudgets~ memory_budgets
+ run_inbound_loop()
}
MessageAssemblyState *-- PartialAssembly
MessageAssemblyState *-- AggregateBudgets
BudgetModule ..> MessageAssemblyError
BudgetModule ..> AggregateBudgets
PartialAssembly ..> MessageAssemblyState : used_by
AssemblyRuntimeModule ..> MessageAssemblyState
AssemblyRuntimeModule ..> MemoryBudgets
MemoryBudgets *-- BudgetBytes
Connection ..> AssemblyRuntimeModule
MessageAssemblyState ..> BudgetModule
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
|
@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix: tests/steps/budget_enforcement_steps.rs Comment on file fn given_budgeted_state(
budget_enforcement_world: &mut BudgetEnforcementWorld,
max_msg: usize,
timeout: u64,
conn: usize,
flight: usize,
) -> TestResult {
budget_enforcement_world.init_budgeted_state(BudgetedStateConfig {
max_message_size: max_msg,
timeout_secs: timeout,
connection_budget: conn,
in_flight_budget: flight,
})
}❌ New issue: Excess Number of Function Arguments |
|
@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix: src/message_assembler/budget_tests.rs Comment on file fn submit_first(
state: &mut MessageAssemblyState,
key: u64,
body: &[u8],
is_last: bool,
) -> Result<Option<crate::message_assembler::AssembledMessage>, MessageAssemblyError> {
let header = first_header(key, body.len(), is_last);
let input =
FirstFrameInput::new(&header, EnvelopeRouting::default(), vec![], body).expect("valid");
state.accept_first_frame(input)
}❌ New issue: Code Duplication |
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
|
@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix: src/message_assembler/budget_tests.rs Comment on file fn submit_first(
state: &mut MessageAssemblyState,
key: u64,
body: &[u8],
is_last: bool,
) -> Result<Option<crate::message_assembler::AssembledMessage>, MessageAssemblyError> {
let header = first_header(key, body.len(), is_last);
let input =
FirstFrameInput::new(&header, EnvelopeRouting::default(), vec![], body).expect("valid");
state.accept_first_frame(input)
}❌ New issue: Code Duplication |
|
@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix: tests/steps/budget_enforcement_steps.rs Comment on file fn given_budgeted_state(
budget_enforcement_world: &mut BudgetEnforcementWorld,
max_msg: usize,
timeout: u64,
conn: usize,
flight: usize,
) -> TestResult {
budget_enforcement_world.init_budgeted_state(BudgetedStateConfig {
max_message_size: max_msg,
timeout_secs: timeout,
connection_budget: conn,
in_flight_budget: flight,
})
}❌ New issue: Excess Number of Function Arguments |
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
…gets during assembly Implement runtime budget enforcement for message assembly covering bytes per message, bytes per connection, and bytes in flight. Frames exceeding configured budgets are rejected, partial assemblies freed, and errors surfaced via existing deserialization failure paths. - Add budget enforcement helpers in a new module `message_assembler/budget.rs`. - Extend `MessageAssemblyState` with optional connection and in-flight budget fields. - Introduce `with_budgets()` constructor to initialize budgets. - Wire budget checks into frame acceptance methods for first and continuation frames. - Add new error variants `ConnectionBudgetExceeded` and `InFlightBudgetExceeded`. - Update integration layer to pass `MemoryBudgets` to assembly state. - Include extensive unit and behavioral tests covering all budget enforcement scenarios. - Update user guide, design docs, and roadmap to reflect enforcement implementation. Closes roadmap item 8.3.2. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
…input creation Encapsulate the construction and validation of FirstFrameHeader and FirstFrameInput into the `create_first_frame_input!` macro. This reduces code duplication and ensures consistent setup of first-frame submission helpers in budget_tests.rs. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
695b8b7 to
7cc96db
Compare
… tests - Implement budget enforcement checks for connection and in-flight aggregate limits. - Add `is_enabled` method on `AggregateBudgets` to optimize budget enforcement gating. - Abort and free partial assemblies on budget violations, surfacing errors. - Add extensive behavioural tests covering budget enforcement scenarios: connection limits, in-flight limits, dual budgets, isolation, headroom reclamation, and single-frame message bypass. - Update documentation to reflect new budget enforcement behavior and terminology. - Refactor existing tests to move enforcement coverage into separate submodule. - Ensure backward compatibility where no budgets mean no enforcement. - Minor fixes to documentation for clarity and style. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
This comment was marked as resolved.
This comment was marked as resolved.
|
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chain🏁 Script executed: Length of output: 4463 🏁 Script executed: Length of output: 1353
✅ Resolved (5 out of 6)
❌ Not Resolved (1 remaining)
let max = NonZeroUsize::MIN;
Self {
state: MessageAssemblyState::new(max, Duration::from_secs(30)),This means any uninitialised world accepting a multi-byte body will fail with |
Changed the default budget limit in BudgetEnforcementWorld to 64 KB from the minimum possible value. This more forgiving default allows test scenarios that forget to initialize budgeted state to fail with a clear budget error instead of a confusing MessageTooLarge error on multi-byte bodies. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
There was a problem hiding this comment.
Actionable comments posted: 6
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@docs/adr-002-streaming-requests-and-shared-message-assembly.md`:
- Line 311: The sentence "On budget violation the offending partial assembly is
freed and the error" needs a comma before the coordinating conjunction; update
the text in docs/ADR to read "On budget violation the offending partial assembly
is freed, and the error" so the two independent clauses are clearly separated
(locate the sentence starting "On budget violation the offending partial
assembly..." and insert the comma before "and").
In `@docs/execplans/8-3-2-budget-enforcement.md`:
- Around line 14-26: The prose in docs/execplans/8-3-2-budget-enforcement.md
(notably the Purpose and Success sections) contains long lines that violate the
80-column wrap rule; reflow those paragraphs and any long bullet lines to 80
characters or less so markdownlint passes, preserving existing wording and
bullet structure and keeping headings and the
`8.3.2`/`MemoryBudgets`/`WireframeApp` references intact.
- Around line 28-31: Replace British -ise spellings with Oxford -ize forms in
the documentation: change every occurrence of "deserialisation" to
"deserialization" (including the phrase "deserialization-failure policy") and
"optimisation" to "optimization" (and any other -ise variants like
"optimise"/"optimize") throughout the file; update the affected sentences to use
the -ize spelling while preserving original punctuation and casing.
In `@docs/users-guide.md`:
- Around line 570-577: Reflow the paragraph about the message assembly subsystem
to wrap at ~80 characters per line, standardise spelling to the en-GB-oxendict
form using “-ize” variants (e.g., "standardize" style) throughout the paragraph,
and add a comma before "since" in the final sentence so it reads "...are never
counted against aggregate budgets, since they do not buffer."; locate the block
containing the text starting "When budgets are configured, the message assembly
subsystem..." and apply these edits to that paragraph.
In `@src/message_assembler/budget_enforcement_tests.rs`:
- Around line 28-147: Collapse the duplicated connection/in-flight budget tests
into parameterised rstest cases by replacing the near-identical blocks that use
the fixtures connection_budgeted_state and in_flight_budgeted_state with a
single #[rstest] that iterates over (state_fixture, expected_error_variant)
pairs; update the tests that call submit_first, accept_continuation_frame,
continuation_header and assert on
state.buffered_count()/state.total_buffered_bytes() to use the parameterised
state (MessageAssemblyState) and match expected
MessageAssemblyError::ConnectionBudgetExceeded vs
MessageAssemblyError::InFlightBudgetExceeded accordingly; ensure helper names
submit_first, accept_continuation_frame, continuation_header, buffered_count and
total_buffered_bytes are used unchanged and that match patterns reference
MessageAssemblyError::ConnectionBudgetExceeded and
MessageAssemblyError::InFlightBudgetExceeded so behavior and assertions remain
identical.
In `@tests/steps/budget_enforcement_steps.rs`:
- Around line 16-34: The test step function given_budgeted_state currently takes
four separate params (max_msg, timeout, conn, flight) and immediately builds a
BudgetedStateConfig, triggering the clippy too-many-arguments lint and requiring
an #[expect] suppression; change the step to accept a single BudgetedStateConfig
parameter (parsed from the Cucumber step or by updating the feature to supply a
structured input) and pass it straight into
BudgetEnforcementWorld::init_budgeted_state, then remove the
#[expect(clippy::too_many_arguments, ...)] annotation so the lint no longer
applies.
ℹ️ Review info
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (21)
docs/adr-002-streaming-requests-and-shared-message-assembly.mddocs/execplans/8-3-2-budget-enforcement.mddocs/roadmap.mddocs/users-guide.mdsrc/app/frame_handling/assembly.rssrc/app/inbound_handler.rssrc/app/memory_budgets.rssrc/message_assembler/budget.rssrc/message_assembler/budget_enforcement_tests.rssrc/message_assembler/budget_tests.rssrc/message_assembler/error.rssrc/message_assembler/mod.rssrc/message_assembler/state.rssrc/message_assembler/tests.rstests/features/budget_enforcement.featuretests/fixtures/budget_enforcement.rstests/fixtures/mod.rstests/scenarios/budget_enforcement_scenarios.rstests/scenarios/mod.rstests/steps/budget_enforcement_steps.rstests/steps/mod.rs
|
@coderabbitai Have the following now been resolved? ❌ Not Resolved (1 remaining)
let max = NonZeroUsize::MIN;
Self {
state: MessageAssemblyState::new(max, Duration::from_secs(30)),This means any uninitialised world accepting a multi-byte body will fail with |
|
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chain🏁 Script executed: Length of output: 817
The current implementation at lines 51-54 of // Use a forgiving default so that scenarios which forget to call
// init_budgeted_state fail with a clear budget error rather than a
// confusing MessageTooLarge on any multi-byte body.
let max = NonZeroUsize::new(64 * 1024).unwrap_or(NonZeroUsize::MIN);The default All review comments from the original code review have now been addressed. 🎉 🐇 |
…nection and in-flight budgets Refactor budget enforcement tests to use parameterized rstest cases covering both connection and in-flight budget dimensions. Introduce helper functions for constructing states and asserting budget exceeded errors uniformly. This reduces duplicated code and improves test coverage. Additional cleanup in test comments and feature steps to parse budget config strings. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
Summary
Changes
Core functionality
src/message_assembler/budget.rsAggregateBudgetsstruct to package connection and in-flight limitscheck_aggregate_budgetsto enforce budgets across all in-flight assembliescheck_size_limitto enforce per-message size limit (reused from existing logic)MessageAssemblyState::with_budgets(...)constructor (wrapping existing constructor)total_buffered_bytes(&self) -> usizeto report aggregate buffered bytes across all in-flight assembliesaccept_first_frame_at()andaccept_continuation_frame_at()(abort and free partial buffers on violation)min(max_message_size, bytes_per_message)when budgets are configuredAPI / Module wiring
ConnectionBudgetExceededandInFlightBudgetExceededvariants insrc/message_assembler/error.rssrc/message_assembler/mod.rssrc/app/frame_handling/assembly.rs: new signature fornew_message_assembly_stateto accept optionalMemoryBudgetsand apply budget-sourced limitssrc/app/connection.rs: passesself.memory_budgetsinto the assembly state creationTests
src/message_assembler/budget.rshelpers tested viasrc/message_assembler/budget_tests.rstests/features/budget_enforcement.featuretests/fixtures/budget_enforcement.rs,tests/steps/budget_enforcement_steps.rstests/scenarios/budget_enforcement_scenarios.rstests/fixtures/mod.rs,tests/steps/mod.rs, andtests/scenarios/mod.rsDocumentation
MessageAssemblyStateand interaction withMemoryBudgets(docs/adr-002-streaming-requests-and-shared-message-assembly.md)How to test locally
Validation and acceptance criteria
MemoryBudgetsis not configuredArtefacts
New files added:
Modified files:
📎 Task: https://www.devboxer.com/task/200e5f5a-08ad-4d8e-95be-2332833a7927