Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions docs/adr-002-streaming-requests-and-shared-message-assembly.md
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,25 @@ Precedence is:
back-pressure behaviour, and derived defaults remain in follow-on items
`8.3.2` through `8.3.5`.

#### Implementation decisions (2026-02-21)

- Roadmap item `8.3.2` implements hard-cap budget enforcement in
`MessageAssemblyState`. Budget tracking is embedded directly in the assembly
state rather than a separate wrapper, since the state already owns all
assembly lifecycle.
- A new `with_budgets()` constructor accepts optional `connection_budget`
and `in_flight_budget` limits. The integration layer (`assembly.rs`) threads
`MemoryBudgets` from `WireframeApp` and computes the effective per-message
limit as `min(max_message_size, bytes_per_message)`.
- Budget checks are applied after single-frame early return (single-frame
messages are never buffered and skip aggregate checks).
- On budget violation the offending partial assembly is freed, and the error
is surfaced as `ConnectionBudgetExceeded` or `InFlightBudgetExceeded`, both
routed through the existing `DeserFailureTracker` as `InvalidData`.
- Budget enforcement helpers are extracted to `src/message_assembler/budget.rs`
to keep `state.rs` under the 400-line file limit.
- Back-pressure (`8.3.3`) and derived defaults (`8.3.5`) remain future work.

#### Budget enforcement

- Budgets MUST cover: bytes buffered per message, bytes buffered per
Expand Down
616 changes: 616 additions & 0 deletions docs/execplans/8-3-2-budget-enforcement.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/roadmap.md
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ and standardized per-connection memory budgets.
### 8.3. Per-connection memory budgets

- [x] 8.3.1. Add `WireframeApp::memory_budgets(...)` builder method.
- [ ] 8.3.2. Implement budget enforcement covering bytes per message, bytes
- [x] 8.3.2. Implement budget enforcement covering bytes per message, bytes
per connection, and bytes across in-flight assemblies.
- [ ] 8.3.3. Implement soft limit (back-pressure by pausing reads) behaviour.
- [ ] 8.3.4. Implement hard cap (abort early, release partial state, surface
Expand Down
11 changes: 8 additions & 3 deletions docs/users-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -567,9 +567,14 @@ let _app = WireframeApp::new()
.read_timeout_ms(250);
```

This builder method establishes configuration for the current `WireframeApp`
instance. Runtime enforcement and derived defaults are implemented in the
subsequent roadmap items covering memory budget enforcement behaviour.
When budgets are configured, the message assembly subsystem enforces them at
frame acceptance time. Frames that would cause the total buffered bytes to
exceed the per-connection or in-flight budget are rejected, the offending
partial assembly is freed, and the failure is surfaced through the existing
Comment thread
sourcery-ai[bot] marked this conversation as resolved.
deserialization-failure policy (`InvalidData`). The effective per-message limit
is the minimum of the fragmentation `max_message_size` and the configured
`bytes_per_message`. Single-frame messages that complete immediately are never
counted against aggregate budgets, since they do not buffer.

#### Message key multiplexing (8.2.3)

Expand Down
23 changes: 20 additions & 3 deletions src/app/frame_handling/assembly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use log::debug;

use super::core::DeserFailureTracker;
use crate::{
app::{Envelope, builder_defaults::default_fragmentation},
app::{Envelope, builder_defaults::default_fragmentation, memory_budgets::MemoryBudgets},
codec::clamp_frame_length,
fragment::FragmentationConfig,
message_assembler::{
Expand Down Expand Up @@ -42,21 +42,38 @@ impl<'a> AssemblyRuntime<'a> {
}

/// Build a connection-scoped message assembly state from known budgets.
///
/// When `memory_budgets` is `Some`, the effective per-message limit is
/// `min(fragmentation_max, bytes_per_message)` and the connection/in-flight
/// budgets are forwarded to [`MessageAssemblyState::with_budgets`].
#[must_use]
pub(crate) fn new_message_assembly_state(
fragmentation: Option<FragmentationConfig>,
frame_budget: usize,
memory_budgets: Option<MemoryBudgets>,
) -> MessageAssemblyState {
let config = fragmentation.or_else(|| default_fragmentation(frame_budget));
let max_message_size = config.map_or_else(
let frag_max = config.map_or_else(
|| NonZeroUsize::new(clamp_frame_length(frame_budget)).unwrap_or(NonZeroUsize::MIN),
|cfg| cfg.max_message_size,
);
let timeout = config.map_or(DEFAULT_MESSAGE_ASSEMBLY_TIMEOUT, |cfg| {
cfg.reassembly_timeout
});

MessageAssemblyState::new(max_message_size, timeout)
match memory_budgets {
Some(budgets) => {
let per_message = budgets.bytes_per_message().get();
let max_message_size = frag_max.min(per_message);
MessageAssemblyState::with_budgets(
max_message_size,
timeout,
Some(budgets.bytes_per_connection().get()),
Some(budgets.bytes_in_flight().get()),
)
}
None => MessageAssemblyState::new(frag_max, timeout),
}
}

/// Purge stale in-flight assemblies.
Expand Down
6 changes: 5 additions & 1 deletion src/app/inbound_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,11 @@ where
framed.read_buffer_mut().reserve(max_frame_length);
let mut deser_failures = 0u32;
let mut message_assembly = self.message_assembler.as_ref().map(|_| {
frame_handling::new_message_assembly_state(self.fragmentation, requested_frame_length)
frame_handling::new_message_assembly_state(
self.fragmentation,
requested_frame_length,
self.memory_budgets,
)
});
let mut pipeline = FramePipeline::new(self.fragmentation);
let timeout_dur = Duration::from_millis(self.read_timeout_ms);
Expand Down
4 changes: 2 additions & 2 deletions src/app/memory_budgets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
//!
//! `MemoryBudgets` stores explicit byte caps configured on
//! [`crate::app::WireframeApp`] via `WireframeApp::memory_budgets(...)`.
//! This module only exposes configuration; enforcement lands in a future
//! iteration.
//! Enforcement is wired through `MessageAssemblyState::with_budgets` so
//! that frames exceeding any budget dimension are rejected at assembly time.

use std::num::NonZeroUsize;

Expand Down
105 changes: 105 additions & 0 deletions src/message_assembler/budget.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
//! Budget enforcement helpers for message assembly.
//!
//! This module provides aggregate budget checks applied during frame
//! acceptance. Per-message size limits are handled by the existing
//! `check_size_limit` function in [`super::state`]; this module adds
//! per-connection and in-flight aggregate budget enforcement.

use std::num::NonZeroUsize;

use super::{MessageKey, error::MessageAssemblyError};

/// Paired connection and in-flight budget limits.
///
/// Bundled into a struct so call-sites pass a single value instead of two
/// `Option<NonZeroUsize>` parameters.
#[derive(Clone, Copy, Debug)]
pub(super) struct AggregateBudgets {
pub(super) connection: Option<NonZeroUsize>,
pub(super) in_flight: Option<NonZeroUsize>,
}

impl AggregateBudgets {
/// Returns `true` when at least one aggregate budget limit is configured.
pub(super) const fn is_enabled(&self) -> bool {
self.connection.is_some() || self.in_flight.is_some()
}
}

/// Check whether accepting `additional_bytes` for `key` would exceed
/// the connection budget or in-flight budget.
///
/// Both budgets are checked against the same `current_total` because,
/// at this layer, all buffered bytes are assembly bytes. The dimensions
/// are kept separate so future work (streaming body buffers, transport
/// buffering) can diverge them.
///
/// # Errors
///
/// Returns [`MessageAssemblyError::ConnectionBudgetExceeded`] or
/// [`MessageAssemblyError::InFlightBudgetExceeded`] when the respective
/// limit would be exceeded.
pub(super) fn check_aggregate_budgets(
key: MessageKey,
current_total: usize,
additional_bytes: usize,
budgets: &AggregateBudgets,
) -> Result<(), MessageAssemblyError> {
let new_total = current_total.saturating_add(additional_bytes);

if let Some(limit) = budgets.connection
&& new_total > limit.get()
{
return Err(MessageAssemblyError::ConnectionBudgetExceeded {
key,
attempted: new_total,
limit,
});
}

if let Some(limit) = budgets.in_flight
&& new_total > limit.get()
{
return Err(MessageAssemblyError::InFlightBudgetExceeded {
key,
attempted: new_total,
limit,
});
}

Ok(())
}

/// Check if accumulated size plus new body would exceed the per-message
/// size limit.
///
/// Returns the new total size on success.
///
/// # Errors
///
/// Returns [`MessageAssemblyError::MessageTooLarge`] when the new total
/// would exceed `max_message_size`.
pub(super) fn check_size_limit(
max_message_size: NonZeroUsize,
key: MessageKey,
accumulated: usize,
body_len: usize,
) -> Result<usize, MessageAssemblyError> {
let Some(new_len) = accumulated.checked_add(body_len) else {
return Err(MessageAssemblyError::MessageTooLarge {
key,
attempted: usize::MAX,
limit: max_message_size,
});
};

if new_len > max_message_size.get() {
return Err(MessageAssemblyError::MessageTooLarge {
key,
attempted: new_len,
limit: max_message_size,
});
}

Ok(new_len)
}
Loading
Loading