Merged
631 changes: 631 additions & 0 deletions docs/execplans/8-6-1-update-documentation-for-streaming-requests.md

Large diffs are not rendered by default.

58 changes: 50 additions & 8 deletions docs/generic-message-fragmentation-and-re-assembly-design.md
@@ -437,6 +437,20 @@ When both layers are enabled, Wireframe applies them in the following order:
- a streaming request body that the handler consumes incrementally.
4. Route and invoke handlers.

In operational terms, each layer owns a distinct hand-off:

- the transport codec and optional `FragmentAdapter` are responsible only for
delivering one complete protocol packet;
- `MessageAssembler::parse_frame_header` classifies that packet as either the
first frame or a continuation frame for one logical message;
- Wireframe strips the parsed protocol header, tracks assembly state by
message key, and decides whether the body should stay buffered or become a
stream; and
- handlers observe only the protocol-facing request shape:
- a buffered payload via the existing request path, or
- `RequestParts` plus `RequestBodyStream` / `StreamingBody` for incremental
consumption.
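
The hand-offs listed above can be illustrated with a minimal sketch. It is not Wireframe's implementation: `SketchHeader`, `SketchAssemblies`, and `accept` are hypothetical names, and the `is_last` flag is an assumed way for a protocol header to mark the final frame. The sketch only shows the idea of classifying each complete packet as a first or continuation frame and tracking partial state by message key.

```rust
use std::collections::HashMap;

/// Hypothetical header classification for one complete protocol packet.
/// Wireframe's real header type is not shown here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SketchHeader {
    First { key: u64, is_last: bool },
    Continuation { key: u64, is_last: bool },
}

/// Partial message bodies, tracked by message key.
#[derive(Default)]
struct SketchAssemblies {
    partial: HashMap<u64, Vec<u8>>,
}

impl SketchAssemblies {
    /// Accepts one packet body whose protocol header has already been stripped.
    /// Returns `Ok(Some(body))` when the logical message is complete,
    /// `Ok(None)` when more frames are expected, and `Err` on a continuity
    /// violation.
    fn accept(
        &mut self,
        header: SketchHeader,
        body: &[u8],
    ) -> Result<Option<Vec<u8>>, &'static str> {
        match header {
            SketchHeader::First { key, is_last } => {
                if self.partial.contains_key(&key) {
                    return Err("duplicate first frame for message key");
                }
                if is_last {
                    // Single-frame message: never enters assembly state.
                    return Ok(Some(body.to_vec()));
                }
                self.partial.insert(key, body.to_vec());
                Ok(None)
            }
            SketchHeader::Continuation { key, is_last } => {
                let buffered = self
                    .partial
                    .get_mut(&key)
                    .ok_or("continuation frame without a first frame")?;
                buffered.extend_from_slice(body);
                if is_last {
                    // Completed: release the partial state and hand over the body.
                    Ok(self.partial.remove(&key))
                } else {
                    Ok(None)
                }
            }
        }
    }
}
```

Keying the map by message key is what allows interleaved logical messages on one connection in this sketch. A real assembler would also apply ordering rules, timeouts, and the memory budgets described in this document.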

For screen readers: the following sequence diagram shows the decode → fragment
reassembly → message assembly → handler pipeline.

@@ -468,18 +482,38 @@ on the app instance and is applied in the inbound runtime pipeline in
`WireframeApp` connection handling, after transport reassembly and before
handler dispatch.

For protocol authors, the boundary rule is simple:

- use fragmentation when one protocol packet exceeds the transport frame
budget;
- use `MessageAssembler` when one logical request spans multiple protocol
packets; and
- enable both when a large logical request may itself contain transport
fragments.

Fragmentation is transport plumbing. `MessageAssembler` is protocol continuity.

### 9.3 Memory budget integration

Per-connection memory budgets apply across both layers. Budgets cover:

- bytes buffered per message (existing `max_message_size`);
- bytes buffered per message;
- bytes buffered per connection; and
- bytes buffered across in-flight assemblies.

When budgets are approached, Wireframe applies back-pressure by pausing further
reads and assembly work. When a hard cap is exceeded, Wireframe aborts early,
releases partial state, and surfaces `std::io::ErrorKind::InvalidData` from the
inbound processing path.
These limits are configured through `WireframeApp::memory_budgets(...)`. When
that builder is not called explicitly, Wireframe derives defaults from the
current `buffer_capacity`, so the same protection model still applies.

Budget enforcement is shared across the full inbound assembly pipeline:

1. **Per-frame enforcement** rejects any newly arrived frame that would exceed
the configured message or aggregate caps.
2. **Soft-pressure pacing** pauses inbound reads once buffered bytes reach 80%
of the smaller of the two aggregate budgets (`bytes_per_connection` or
`bytes_in_flight`).
3. **Hard-cap abort** terminates the connection if total buffered bytes
strictly exceed that aggregate cap.
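
The three tiers above can be sketched as plain arithmetic. This is an illustration under stated assumptions, not Wireframe's code: `SketchBudgets`, `admits_frame`, and `pressure` are hypothetical names, the field names mirror the budget names used in this document, and the integer rounding of the 80% threshold is an assumption.

```rust
/// Hypothetical mirror of the three budget fields named in this document.
/// This is not the real `MemoryBudgets` type.
#[derive(Debug, Clone, Copy)]
struct SketchBudgets {
    bytes_per_message: usize,
    bytes_per_connection: usize,
    bytes_in_flight: usize,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Pressure {
    Normal,
    Soft,
    Hard,
}

/// The aggregate cap is the smaller of the two aggregate budgets.
fn aggregate_cap(budgets: &SketchBudgets) -> usize {
    budgets.bytes_per_connection.min(budgets.bytes_in_flight)
}

/// Tier 1: a new frame is admitted only if it keeps both the per-message
/// total and the aggregate total within their caps.
fn admits_frame(
    budgets: &SketchBudgets,
    message_buffered: usize,
    total_buffered: usize,
    frame_len: usize,
) -> bool {
    message_buffered.saturating_add(frame_len) <= budgets.bytes_per_message
        && total_buffered.saturating_add(frame_len) <= aggregate_cap(budgets)
}

/// Tiers 2 and 3: soft pressure from 80% of the aggregate cap, hard abort
/// only when buffered bytes strictly exceed the cap.
fn pressure(budgets: &SketchBudgets, total_buffered: usize) -> Pressure {
    let cap = aggregate_cap(budgets);
    // 80% computed without overflow; the rounding direction is an assumption.
    let soft_threshold = cap - cap / 5;
    if total_buffered > cap {
        Pressure::Hard
    } else if total_buffered >= soft_threshold {
        Pressure::Soft
    } else {
        Pressure::Normal
    }
}
```

With `bytes_per_connection = 1000` and `bytes_in_flight = 2000`, the aggregate cap in this sketch is 1000, soft pressure begins at 800 buffered bytes, and exactly 1000 buffered bytes is still soft pressure rather than an abort, because the hard cap uses a strict comparison.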

The soft-limit implementation paces reads by inserting a short pause before
polling the next inbound frame once buffered bytes reach 80% of the smaller
@@ -490,9 +524,17 @@ aggregate cap (100%), the connection is terminated immediately with
`InvalidData` as a defence-in-depth safety net.

If both transport fragmentation and `MessageAssembler` are enabled, the
effective message cap is whichever guard triggers first. Operators should set
the fragmentation `max_message_size` and the message assembly per-message cap
to compatible values to avoid surprising early termination.
effective message cap is whichever guard triggers first:

- fragmentation's `max_message_size` for one reassembled transport packet; or
- message assembly's `bytes_per_message` budget for one logical request.

Operators should set those limits to compatible values. Otherwise a request may
be valid at the protocol layer but still terminate early at the transport
boundary, or vice versa.
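
As a small worked example of "whichever guard triggers first", the effective cap can be treated as the minimum of the limits that are actually configured. The helper below is hypothetical (`effective_message_cap` is not a Wireframe function), and it uses `Option` to model a layer that is not enabled, which is an assumption of this sketch.

```rust
/// Hypothetical helper: the effective per-message cap when either, both, or
/// neither of the two layers is enabled. `None` means "layer not configured".
fn effective_message_cap(
    max_message_size: Option<usize>,
    bytes_per_message: Option<usize>,
) -> Option<usize> {
    match (max_message_size, bytes_per_message) {
        // Both guards are active: the smaller one triggers first.
        (Some(fragment_cap), Some(assembly_cap)) => Some(fragment_cap.min(assembly_cap)),
        // Only one guard is active: it is the effective cap.
        (fragment_cap, None) => fragment_cap,
        (None, assembly_cap) => assembly_cap,
    }
}
```

For instance, a fragmentation limit of 64 KiB combined with a 48 KiB assembly budget gives an effective cap of 48 KiB in this sketch, so a 60 KiB logical request would be rejected even though each transport packet is within its own limit.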

Single-frame requests that complete immediately do not count against aggregate
buffer budgets because they do not remain buffered in assembly state.

See [ADR 0002][adr-0002] for the complete budget configuration surface and
failure mode semantics.
58 changes: 51 additions & 7 deletions docs/multi-packet-and-streaming-responses-design.md
@@ -577,18 +577,31 @@ Handlers that opt into streaming request bodies receive two components:
- `RequestBodyStream` yields body chunks incrementally, allowing handlers to
process large payloads without buffering the entire message in memory.

See [ADR 0002][adr-0002] for the canonical type definitions. The exact field
names and types may evolve; protocol authors should consult the published API
documentation for current signatures.
In the current API, `RequestParts` exposes `id()`, `correlation_id()`, and
`metadata()`. `RequestBodyStream` is a pinned boxed stream of
`Result<Bytes, std::io::Error>`, and `RequestBodyReader` adapts that stream to
`AsyncRead` for parser reuse. Handlers can also use the `StreamingBody`
extractor when they prefer an extractor-based signature over taking the stream
type directly.

The key design point is that routing metadata becomes available before the body
is complete. A handler can inspect `RequestParts` immediately, then decide
whether to consume the body lazily, stream it into a parser, or stop early.
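
The "metadata first, body later" shape can be illustrated with a synchronous, standard-library-only analogue. This is not the Wireframe API: the real types are asynchronous and yield `Bytes`, whereas `SketchParts`, `SketchBody`, `handle`, and `demo` below are hypothetical stand-ins built on `std::sync::mpsc::sync_channel`.

```rust
use std::io;
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Hypothetical stand-in for routing metadata; the real `RequestParts` differs.
struct SketchParts {
    id: u32,
    metadata: Vec<u8>,
}

/// Hypothetical stand-in for the body: a bounded channel of fallible chunks.
type SketchBody = Receiver<io::Result<Vec<u8>>>;

/// Inspects metadata before touching the body, then consumes chunks one at a
/// time, stopping early on a stream error or when `limit` bytes is exceeded.
fn handle(parts: &SketchParts, body: SketchBody, limit: usize) -> io::Result<usize> {
    if parts.metadata.is_empty() {
        // Rejected from metadata alone; no body chunk is read.
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("request {} carries no metadata", parts.id),
        ));
    }
    let mut total = 0;
    for chunk in body {
        let chunk = chunk?; // a stream error reaches the consumer here
        total += chunk.len();
        if total > limit {
            return Err(io::Error::new(io::ErrorKind::InvalidData, "body too large"));
        }
    }
    Ok(total)
}

/// Drives `handle` with a producer thread behind a channel of capacity 1.
fn demo() -> io::Result<usize> {
    let (tx, rx) = sync_channel::<io::Result<Vec<u8>>>(1);
    let producer = thread::spawn(move || {
        for chunk in [b"hello ".to_vec(), b"world".to_vec()] {
            // `send` blocks while the channel is full and fails once the
            // receiver has been dropped, so the producer stops promptly.
            if tx.send(Ok(chunk)).is_err() {
                break;
            }
        }
    });
    let parts = SketchParts { id: 1, metadata: vec![0x01] };
    let result = handle(&parts, rx, 1024);
    producer.join().expect("producer thread panicked");
    result
}
```

The capacity-1 channel is the synchronous analogue of back-pressure: the producer cannot run ahead of the handler by more than one chunk. Dropping the receiver on an early return is what lets the producer observe that the handler has stopped.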

### 11.2 Opt-in semantics

The default remains "buffered request" to preserve Wireframe's existing
transparent assembly ergonomics for small messages and simple protocols.
Handlers MAY opt into streaming by declaring a compatible extractor signature.
Handlers opt into streaming by declaring a compatible signature such as:

- `async fn handle(parts: RequestParts, body: RequestBodyStream)`;
- `async fn handle(parts: RequestParts, body: StreamingBody)`; or
- a handler that converts `StreamingBody` into `RequestBodyReader` for
`AsyncRead`-based parsing.

Wireframe MAY expose an `AsyncRead` adaptor for `RequestBodyStream` so protocol
crates can reuse existing parsers that expect `AsyncRead` rather than `Stream`.
If a handler does not opt in, Wireframe preserves the buffered request path.
This keeps the small-message path ergonomic and means protocol crates can adopt
streaming only where large inbound bodies justify the extra complexity.

### 11.3 Symmetry with response streaming

@@ -602,6 +615,15 @@ Streaming request bodies mirror `Response::Stream`:
Both directions apply the same per-connection memory budgets and back-pressure
semantics, ensuring consistent resource accounting across the duplex channel.

The symmetry is conceptual; the two directions are not identical in shape:

- outbound streaming emits protocol frames chosen by the handler; and
- inbound streaming emits request-body chunks after Wireframe has already
separated protocol metadata into `RequestParts`.

That separation is what lets request handlers start work before the full body
arrives without exposing lower transport concerns.

### 11.4 Composition with MessageAssembler

When a protocol uses the `MessageAssembler` abstraction for multi-frame message
@@ -612,7 +634,29 @@ assembly, the assembler produces either:

The assembler handles protocol-specific continuity rules (ordering, missing
frames, duplicate frames) while Wireframe provides shared buffering machinery
and limit enforcement. See [ADR 0002][adr-0002] for the complete specification.
and limit enforcement.

The inbound ordering is fixed:

1. transport framing decodes one packet;
2. optional transport fragmentation reassembles that packet if required;
3. `MessageAssembler` maps protocol packets into one logical request; and
4. the handler receives either a buffered body or `RequestParts` plus a
streaming body.

This means a protocol-specific assembler never needs to understand transport
fragments. It only sees complete protocol packets.

Per-connection memory budgets apply to the shared inbound assembly pipeline.
That includes buffered request assembly and any streaming-request state that
has not yet been consumed by the handler. Wireframe enforces:

- per-frame rejection when a new chunk would exceed the configured caps;
- soft-pressure read pacing at 80% of the smaller aggregate budget; and
- hard-cap abort with `InvalidData` if total buffered bytes strictly exceed the
aggregate cap.

See [ADR 0002][adr-0002] for the complete specification.

[adr-0002]: adr/0002-streaming-requests-and-shared-message-assembly.md

6 changes: 3 additions & 3 deletions docs/roadmap.md
@@ -322,11 +322,11 @@ and standardized per-connection memory budgets.

### 8.6. Documentation

- [ ] 8.6.1. Update `generic-message-fragmentation-and-re-assembly-design.md`
- [x] 8.6.1. Update `generic-message-fragmentation-and-re-assembly-design.md`
with composition guidance.
- [ ] 8.6.2. Update `multi-packet-and-streaming-responses-design.md` with a
- [x] 8.6.2. Update `multi-packet-and-streaming-responses-design.md` with a
streaming request body section.
- [ ] 8.6.3. Update
- [x] 8.6.3. Update
`the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md`
with MessageAssembler and budget details.

@@ -98,10 +98,12 @@ incremental processing of large inbound payloads. The default remains buffered
request handling to preserve existing ergonomics for small messages and simple
protocols.

At a high level, `RequestParts` separates routing metadata from the body, and
`RequestBodyStream` yields body chunks as a pinned, boxed stream. See [ADR
0002][adr-0002] for the canonical type definitions; the exact field names and
types shown there are illustrative and may evolve before stabilization.
This is now a concrete part of the public API rather than a future sketch.
`RequestParts` separates routing metadata from the body, while
`RequestBodyStream` yields body chunks as a pinned boxed stream. Protocol
crates can adapt that stream to `AsyncRead` through `RequestBodyReader`, and
extractor-based handlers can use `StreamingBody` for the same underlying
capability.

Completion of a streaming response is signalled by a protocol-defined
terminator frame. The new `stream_end_frame` hook allows implementations to
@@ -229,6 +231,15 @@ applies them in order: transport reassembly first, then message assembly. This
ensures protocol crates migrating to `MessageAssembler` do not need to
special-case transport fragments.

This capability is now implemented rather than only designed:

- applications register assemblers with
`WireframeApp::with_message_assembler(...)`;
- the inbound runtime path applies the assembler after transport reassembly and
before handler dispatch; and
- handlers receive either a buffered request or a streaming request body based
on the assembler's framing outcome.

## III. Capability Maturity: From Functional to Production-Grade

A feature-complete library is not necessarily a mature one. The road to
@@ -302,6 +313,14 @@ Rust's ownership model and `Drop` trait are the foundation of resource safety.
partial state, and surfaces `std::io::ErrorKind::InvalidData` from the
inbound processing path.

This now ships as a three-tier protection model:

- per-frame rejection when an incoming frame would exceed the configured
caps;
- soft-pressure read pacing at 80% of the smaller aggregate budget; and
- immediate connection abort when buffered bytes strictly exceed the hard
aggregate cap.

- **Timeouts:** The reassembly logic will include a non-optional,
configurable timeout to automatically purge partial messages that are
abandoned or sent too slowly.
24 changes: 20 additions & 4 deletions docs/users-guide.md
@@ -781,7 +781,10 @@ assert_eq!(parts.metadata(), &[0x01, 0x02]);
Unlike `PacketParts` (which carries the raw payload for envelope
reconstruction), `RequestParts` carries only protocol-defined metadata required
to interpret the streaming body. The body itself is consumed through a separate
stream, enabling back-pressure and incremental processing.[^46]
stream, enabling back-pressure and incremental processing.[^46] When
`MessageAssembler` is configured, this split happens after any transport
fragment reassembly, so handlers and protocol extractors see protocol-level
metadata rather than lower-level fragment state.

### Message assembler hook

@@ -833,6 +836,12 @@ When configured, this hook now runs on the inbound connection path after
transport fragmentation reassembly and before handler dispatch. Incomplete
assemblies remain buffered per message key until completion or timeout eviction.

The assembler's output drives the handler-facing request shape:

- fully assembled messages continue through the buffered request path; and
- streaming-capable messages surface as `RequestParts` plus
`RequestBodyStream` / `StreamingBody`.

Message-assembly parsing and continuity failures are treated as inbound
deserialization failures and follow the existing failure threshold policy.

@@ -875,6 +884,10 @@ is the minimum of the fragmentation `max_message_size` and the configured
`bytes_per_message`. Single-frame messages that complete immediately are never
counted against aggregate budgets, since they do not buffer.

If `memory_budgets(...)` is not configured explicitly, Wireframe derives the
same three fields from `buffer_capacity`, so the protection model remains
active even on the default path.

Wireframe provides a three-tier protection model for inbound memory budgets:

1. **Per-frame enforcement** — frames that would cause total buffered bytes to
@@ -1074,7 +1087,8 @@ Handlers can opt into streaming request bodies using the `StreamingBody`
extractor or by accepting a `RequestBodyStream` directly. The framework creates
a bounded channel and forwards body chunks as they arrive; back-pressure
propagates automatically when the handler consumes slower than the network
delivers.
delivers. Routing and correlation metadata stay available immediately through
`RequestParts`, so handlers can begin work before the full body is assembled.

```rust,no_run
use tokio::io::AsyncReadExt;
@@ -1131,8 +1145,10 @@ async fn with_extractor(parts: RequestParts, body: StreamingBody) {

Back-pressure is enforced via bounded channels: when the internal buffer fills,
the framework pauses reading from the socket until the handler drains pending
chunks. This prevents memory exhaustion under slow consumer conditions. The
`body_channel` helper creates channels with configurable capacity:
chunks. This prevents memory exhaustion under slow consumer conditions. When
the request arrived through `MessageAssembler`, the same per-connection memory
budgets continue to govern partial buffered state before chunks reach the
handler. The `body_channel` helper creates channels with configurable capacity:

```rust
use wireframe::request::body_channel;
5 changes: 5 additions & 0 deletions src/app/builder/config.rs
@@ -39,6 +39,11 @@ where
}

/// Configure per-connection memory budgets for inbound buffering.
///
/// These budgets apply across the shared inbound assembly pipeline,
/// including protocol-level message assembly and streaming request
/// hand-off state. Wireframe uses them for per-frame rejection,
/// soft-pressure read pacing, and hard-cap connection aborts.
#[must_use]
pub fn memory_budgets(mut self, budgets: MemoryBudgets) -> Self {
self.memory_budgets = Some(budgets);
7 changes: 5 additions & 2 deletions src/app/builder/protocol.rs
@@ -41,8 +41,11 @@ where

/// Install a [`MessageAssembler`] implementation.
///
/// The assembler parses protocol-specific frame headers to support
/// multi-frame request assembly once the inbound pipeline integrates it.
/// The assembler parses protocol-specific frame headers for multi-frame
/// request assembly on the inbound pipeline. Wireframe applies the hook
/// after any transport fragmentation reassembly and before handler
/// dispatch, producing either buffered requests or streaming request
/// bodies.
///
/// # Examples
///
24 changes: 24 additions & 0 deletions tests/features/streaming_request.feature
@@ -0,0 +1,24 @@
@streaming_request
Feature: Streaming request body consumption
Streaming request bodies let handlers consume inbound payloads
incrementally while preserving back-pressure and error propagation.

Scenario: StreamingBody exposes an AsyncRead adapter
Given a streaming request body channel with capacity 4
When body chunks "hello " and "world" are sent
And the streaming body is read through the AsyncRead adapter
Then the collected body is "hello world"

Scenario: Bounded request body channels apply back-pressure
Given a streaming request body channel with capacity 1
When one body chunk "first" is buffered without draining the stream
And another body chunk "second" is sent with a 50 millisecond timeout
Then the send is blocked by back-pressure

Scenario: Request body stream errors reach consumers
Given a streaming request body channel with capacity 4
When body chunk "ok" is sent
And a request body error of kind "invalid data" is sent
And the request body stream is drained directly
Then one chunk is received before the error
And the last stream error kind is "invalid data"
1 change: 1 addition & 0 deletions tests/fixtures/mod.rs
@@ -40,5 +40,6 @@ pub mod request_parts;
pub mod serializer_boundaries;
pub mod slow_io_backpressure;
pub mod stream_end;
pub mod streaming_request;
pub mod test_observability;
pub mod unified_codec;