Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions docs/adr-002-streaming-requests-and-shared-message-assembly.md
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,29 @@ Precedence is:
total buffered bytes from exceeding the limit. The hard cap catches the edge
case where this invariant is violated.

#### Implementation decisions (2026-02-27)

- Roadmap item `8.3.5` implements derived default memory budgets. When
`WireframeApp::memory_budgets(...)` is not called, sensible defaults are
derived at connection time from `codec.max_frame_length()` (the frame budget
set via `buffer_capacity()`).
- Multipliers applied to `frame_budget`:
- `bytes_per_message = frame_budget × 16` (aligned with fragmentation's
`DEFAULT_MESSAGE_SIZE_MULTIPLIER`).
- `bytes_per_connection = frame_budget × 64` (allows four concurrent
message assemblies at maximum message size).
- `bytes_in_flight = frame_budget × 64` (same as per-connection; the
aggregate cap is a single value).
- Derivation is lazy — budgets are computed at runtime in `process_stream()`
via `resolve_effective_budgets()`, not stored on the builder. Changing
`buffer_capacity` or swapping codecs adjusts derived defaults automatically.
- `default_memory_budgets()` lives in `src/app/builder_defaults.rs` alongside
`default_fragmentation()`, following the same multiplier-from-frame-budget
pattern. `resolve_effective_budgets()` lives in
`src/app/frame_handling/backpressure.rs`, co-located with budget evaluation.
- Explicit budgets always take precedence over derived defaults (ADR-002
precedence rule: explicit > global caps > derived defaults).

#### Budget enforcement

- Budgets MUST cover: bytes buffered per message, bytes buffered per
Expand Down

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/roadmap.md
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ and standardized per-connection memory budgets.
- [x] 8.3.3. Implement soft limit (back-pressure by pausing reads) behaviour.
- [x] 8.3.4. Implement hard cap (abort early, release partial state, surface
`InvalidData`) behaviour.
- [ ] 8.3.5. Define derived defaults based on `buffer_capacity` when budgets
- [x] 8.3.5. Define derived defaults based on `buffer_capacity` when budgets
are not set explicitly.
- [ ] 8.3.6. Write tests for budget enforcement, back-pressure, and cleanup
semantics.
Expand Down
18 changes: 18 additions & 0 deletions docs/users-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,24 @@ Wireframe provides a three-tier protection model for inbound memory budgets:
`InvalidData`. This is a defence-in-depth safety net; under normal
operation, per-frame enforcement prevents this state from being reached.

#### Derived budget defaults

When `memory_budgets(...)` is not called on the builder, Wireframe derives
sensible defaults automatically from `buffer_capacity` (the codec's
`max_frame_length()`). The derived values use the same multiplier pattern as
fragmentation defaults:

| Budget field | Multiplier | Default (1024-byte frame) |
| ---------------------- | -------------------- | ------------------------- |
| `bytes_per_message` | `frame_budget × 16` | 16 KiB |
| `bytes_per_connection` | `frame_budget × 64` | 64 KiB |
| `bytes_in_flight` | `frame_budget × 64` | 64 KiB |

All three protection tiers (per-frame enforcement, soft-limit read pacing, and
hard-cap connection abort) are active with derived defaults. Changing
`buffer_capacity` adjusts derived budgets proportionally. Calling
`.memory_budgets(...)` overrides the derived defaults entirely.

#### Message key multiplexing (8.2.3)

The `MessageAssemblyState` type manages multiple concurrent message assemblies
Expand Down
114 changes: 113 additions & 1 deletion src/app/builder_defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,21 @@

use std::{num::NonZeroUsize, time::Duration};

use crate::{codec::clamp_frame_length, fragment::FragmentationConfig};
use crate::{
app::memory_budgets::{BudgetBytes, MemoryBudgets},
codec::clamp_frame_length,
fragment::FragmentationConfig,
};

pub(super) const MIN_READ_TIMEOUT_MS: u64 = 1;
pub(super) const MAX_READ_TIMEOUT_MS: u64 = 86_400_000;
/// Default preamble read timeout in milliseconds.
pub(super) const DEFAULT_READ_TIMEOUT_MS: u64 = 100;
const DEFAULT_FRAGMENT_TIMEOUT: Duration = Duration::from_secs(30);
const DEFAULT_MESSAGE_SIZE_MULTIPLIER: usize = 16;
const DEFAULT_MESSAGE_BUDGET_MULTIPLIER: usize = DEFAULT_MESSAGE_SIZE_MULTIPLIER;
const DEFAULT_CONNECTION_BUDGET_MULTIPLIER: usize = 64;
const DEFAULT_IN_FLIGHT_BUDGET_MULTIPLIER: usize = 64;

pub(super) fn default_fragmentation(frame_budget: usize) -> Option<FragmentationConfig> {
let frame_budget = clamp_frame_length(frame_budget);
Expand All @@ -20,3 +27,108 @@ pub(super) fn default_fragmentation(frame_budget: usize) -> Option<Fragmentation
FragmentationConfig::for_frame_budget(frame_budget, limit, DEFAULT_FRAGMENT_TIMEOUT)
})
}

/// Derive a single [`BudgetBytes`] value from a frame budget and multiplier.
///
/// The `unwrap_or(NonZeroUsize::MIN)` fallback is effectively unreachable
/// because [`clamp_frame_length`] guarantees `frame_budget >= 64`, but it
/// satisfies the `NonZeroUsize` invariant without using the forbidden
/// `.unwrap()` lint.
fn derive_budget(frame_budget: usize, multiplier: usize) -> BudgetBytes {
let bytes =
NonZeroUsize::new(frame_budget.saturating_mul(multiplier)).unwrap_or(NonZeroUsize::MIN);
BudgetBytes::new(bytes)
}

/// Derive sensible memory budgets from the codec frame budget.
///
/// Mirrors the pattern of [`default_fragmentation`], which derives
/// fragmentation settings from the same frame budget. The frame budget is
/// clamped to [`crate::codec::MIN_FRAME_LENGTH`]..=
/// [`crate::codec::MAX_FRAME_LENGTH`] before applying multipliers.
///
/// The per-message multiplier (16) is deliberately aligned with
/// `DEFAULT_MESSAGE_SIZE_MULTIPLIER` used by fragmentation defaults so
/// that the two guards agree on the maximum logical message size.
#[must_use]
pub(super) fn default_memory_budgets(frame_budget: usize) -> MemoryBudgets {
Comment thread
sourcery-ai[bot] marked this conversation as resolved.
let frame_budget = clamp_frame_length(frame_budget);
MemoryBudgets::new(
derive_budget(frame_budget, DEFAULT_MESSAGE_BUDGET_MULTIPLIER),
derive_budget(frame_budget, DEFAULT_CONNECTION_BUDGET_MULTIPLIER),
derive_budget(frame_budget, DEFAULT_IN_FLIGHT_BUDGET_MULTIPLIER),
)
}

#[cfg(test)]
mod tests {
use rstest::rstest;

use super::*;
use crate::codec::{MAX_FRAME_LENGTH, MIN_FRAME_LENGTH};

#[rstest]
#[case(1024)]
#[case(4096)]
fn default_budgets_scale_and_use_expected_multipliers(#[case] frame_budget: usize) {
let budgets = default_memory_budgets(frame_budget);
assert_eq!(
frame_budget * DEFAULT_MESSAGE_BUDGET_MULTIPLIER,
budgets.bytes_per_message().as_usize()
);
assert_eq!(
frame_budget * DEFAULT_CONNECTION_BUDGET_MULTIPLIER,
budgets.bytes_per_connection().as_usize()
);
assert_eq!(
frame_budget * DEFAULT_IN_FLIGHT_BUDGET_MULTIPLIER,
budgets.bytes_in_flight().as_usize()
);
}

#[test]
fn default_budgets_clamp_minimum_frame_budget() {
let budgets = default_memory_budgets(10);
assert_eq!(
MIN_FRAME_LENGTH * DEFAULT_MESSAGE_BUDGET_MULTIPLIER,
budgets.bytes_per_message().as_usize()
);
assert_eq!(
MIN_FRAME_LENGTH * DEFAULT_CONNECTION_BUDGET_MULTIPLIER,
budgets.bytes_per_connection().as_usize()
);
assert_eq!(
MIN_FRAME_LENGTH * DEFAULT_IN_FLIGHT_BUDGET_MULTIPLIER,
budgets.bytes_in_flight().as_usize()
);
}

#[test]
fn default_budgets_clamp_maximum_frame_budget() {
let budgets = default_memory_budgets(MAX_FRAME_LENGTH + 1);
assert_eq!(
MAX_FRAME_LENGTH * DEFAULT_MESSAGE_BUDGET_MULTIPLIER,
budgets.bytes_per_message().as_usize()
);
assert_eq!(
MAX_FRAME_LENGTH * DEFAULT_CONNECTION_BUDGET_MULTIPLIER,
budgets.bytes_per_connection().as_usize()
);
assert_eq!(
MAX_FRAME_LENGTH * DEFAULT_IN_FLIGHT_BUDGET_MULTIPLIER,
budgets.bytes_in_flight().as_usize()
);
}

#[test]
fn default_budgets_message_budget_aligns_with_fragmentation() {
let frame_budget = 2048_usize;
let budgets = default_memory_budgets(frame_budget);
let expected = frame_budget * DEFAULT_MESSAGE_SIZE_MULTIPLIER;
assert_eq!(
expected,
budgets.bytes_per_message().as_usize(),
"per-message budget does not match fragmentation multiplier"
);
}
}
18 changes: 17 additions & 1 deletion src/app/frame_handling/backpressure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ use std::time::Duration;
use log::{debug, warn};
use tokio::{io, time::sleep};

use crate::{app::MemoryBudgets, message_assembler::MessageAssemblyState};
use crate::{
app::{MemoryBudgets, builder_defaults::default_memory_budgets},
message_assembler::MessageAssemblyState,
};

/// Soft-pressure threshold numerator (4/5 == 80%).
const SOFT_LIMIT_NUMERATOR: u128 = 4;
Expand Down Expand Up @@ -124,6 +127,19 @@ fn active_aggregate_limit_bytes(budgets: MemoryBudgets) -> usize {
.min(budgets.bytes_in_flight().as_usize())
}

/// Resolve the effective memory budgets for one connection.
///
/// Returns the explicit budgets if configured, or derives sensible
/// defaults from `frame_budget` using the same multiplier pattern as
/// fragmentation defaults.
#[must_use]
pub(crate) fn resolve_effective_budgets(
explicit: Option<MemoryBudgets>,
frame_budget: usize,
) -> MemoryBudgets {
explicit.unwrap_or_else(|| default_memory_budgets(frame_budget))
}

fn is_at_or_above_soft_limit(buffered_bytes: usize, aggregate_limit: usize) -> bool {
let lhs = (buffered_bytes as u128).saturating_mul(SOFT_LIMIT_DENOMINATOR);
let rhs = (aggregate_limit as u128).saturating_mul(SOFT_LIMIT_NUMERATOR);
Expand Down
72 changes: 71 additions & 1 deletion src/app/frame_handling/backpressure_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@ use std::{error::Error, io, num::NonZeroUsize, time::Duration};
use rstest::{fixture, rstest};

use super::{
backpressure::{MemoryPressureAction, has_hard_cap_been_breached, should_pause_inbound_reads},
backpressure::{
MemoryPressureAction,
has_hard_cap_been_breached,
resolve_effective_budgets,
should_pause_inbound_reads,
},
evaluate_memory_pressure,
};
use crate::{
Expand Down Expand Up @@ -266,3 +271,68 @@ fn evaluate_pause_uses_expected_duration(budgets: TestResult<MemoryBudgets>) ->
other => Err(io::Error::other(format!("expected Pause, got {other:?}")).into()),
}
}

// ---------- resolve_effective_budgets tests ----------

#[rstest]
fn resolve_returns_explicit_budgets_when_configured() -> TestResult {
let explicit = custom_budgets(512, 2048, 4096)?;
let resolved = resolve_effective_budgets(Some(explicit), 1024);
if resolved != explicit {
return Err(io::Error::other(format!(
"expected explicit budgets {explicit:?}, got {resolved:?}"
))
.into());
}
Ok(())
}

#[rstest]
fn resolve_returns_derived_budgets_when_none() -> TestResult {
let resolved = resolve_effective_budgets(None, 1024);
if resolved.bytes_per_message().as_usize() != 16_384 {
return Err(io::Error::other(format!(
"expected derived bytes_per_message=16384, got {}",
resolved.bytes_per_message().as_usize()
))
.into());
}
if resolved.bytes_per_connection().as_usize() != 65_536 {
return Err(io::Error::other(format!(
"expected derived bytes_per_connection=65536, got {}",
resolved.bytes_per_connection().as_usize()
))
.into());
}
if resolved.bytes_in_flight().as_usize() != 65_536 {
return Err(io::Error::other(format!(
"expected derived bytes_in_flight=65536, got {}",
resolved.bytes_in_flight().as_usize()
))
.into());
}
Ok(())
}

#[rstest]
fn resolve_derived_budgets_change_with_frame_budget() -> TestResult {
let small = resolve_effective_budgets(None, 512);
let large = resolve_effective_budgets(None, 2048);
if small.bytes_per_message().as_usize() >= large.bytes_per_message().as_usize() {
return Err(io::Error::other(format!(
"expected smaller frame budget to yield smaller per_message: {} vs {}",
small.bytes_per_message().as_usize(),
large.bytes_per_message().as_usize()
))
.into());
}
if small.bytes_per_connection().as_usize() >= large.bytes_per_connection().as_usize() {
return Err(io::Error::other(format!(
"expected smaller frame budget to yield smaller per_connection: {} vs {}",
small.bytes_per_connection().as_usize(),
large.bytes_per_connection().as_usize()
))
.into());
}
Ok(())
}
6 changes: 5 additions & 1 deletion src/app/frame_handling/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ pub(crate) use assembly::{
new_message_assembly_state,
purge_expired_assemblies,
};
pub(crate) use backpressure::{apply_memory_pressure, evaluate_memory_pressure};
pub(crate) use backpressure::{
apply_memory_pressure,
evaluate_memory_pressure,
resolve_effective_budgets,
};
pub(crate) use decode::decode_envelope;
pub(crate) use reassembly::reassemble_if_needed;
pub(crate) use response::forward_response;
Expand Down
6 changes: 4 additions & 2 deletions src/app/inbound_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,12 +266,14 @@ where
);
}
framed.read_buffer_mut().reserve(max_frame_length);
let effective_budgets =
frame_handling::resolve_effective_budgets(self.memory_budgets, requested_frame_length);
let mut deser_failures = 0u32;
let mut message_assembly = self.message_assembler.as_ref().map(|_| {
frame_handling::new_message_assembly_state(
self.fragmentation,
requested_frame_length,
self.memory_budgets,
Some(effective_budgets),
)
});
let mut pipeline = FramePipeline::new(self.fragmentation);
Expand All @@ -280,7 +282,7 @@ where
loop {
let pressure = frame_handling::evaluate_memory_pressure(
message_assembly.as_ref(),
self.memory_budgets,
Some(effective_budgets),
);
frame_handling::apply_memory_pressure(pressure, || {
purge_expired(&mut pipeline, &mut message_assembly);
Expand Down
21 changes: 21 additions & 0 deletions tests/features/derived_memory_budgets.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
@derived_memory_budgets
Feature: Derived memory budget defaults from buffer capacity
When memory budgets are not set explicitly, sensible defaults are derived
from the codec buffer capacity, enabling all three tiers of protection.

Scenario: Derived budgets enforce per-connection limit
Given a derived-budget app with buffer capacity 512
When derived-budget first frames for keys 1 to 95 each with body size 400 arrive
Then the derived-budget connection terminates with an error

Scenario: Derived budgets allow frames within limits
Given a derived-budget app with buffer capacity 512
When a derived-budget first frame for key 1 with body "bb" arrives
And a derived-budget final continuation for key 1 sequence 1 with body "cc" arrives
Then derived-budget payload "bbcc" is eventually received
And no derived-budget connection error is recorded

Scenario: Explicit budgets override derived defaults
Given a derived-budget app with buffer capacity 512 and explicit budgets 2048/8/8
When derived-budget first frames for keys 1 to 11 each with body "aaaaaaaa" arrive
Then the derived-budget connection terminates with an error
Loading
Loading