Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions docs/asynchronous-outbound-messaging-design.md
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,15 @@ features of the 1.0 release.
the socket. The `PushHandle` and the application code that uses it remain
completely unaware of fragmentation.

- **Unified Codec Pipeline:** On the server side, the `FramePipeline`
(`src/app/codec_driver.rs`) applies outbound fragmentation and metrics to
every `Envelope` before it reaches the wire. Handler responses pass through
the pipeline before serialization and codec wrapping, ensuring the same
fragmentation logic applies to both handler responses and push traffic.
Protocol hooks are applied at the connection-actor level and are currently
deferred for the app-router path pending resolution of the `F::Frame` vs
`Envelope` type constraint.

Comment thread
coderabbitai[bot] marked this conversation as resolved.
```rust,ignore
// Codec stack with explicit frame-size limits and fragmentation.
use wireframe::app::WireframeApp;
Expand Down
731 changes: 731 additions & 0 deletions docs/execplans/9-3-1-fragment-adapter-trait.md

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions docs/execplans/vocabulary-normalization.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ architectural model match.
- [ ] Inventory API symbols and docs text against glossary.
- [ ] Propose targeted rename set and update map.
- [ ] Apply code and doc renames.
- [ ] Create/update developers guide conceptual model section.
- [ ] Create/update developers' guide conceptual model section.
- [ ] Update migration guide and run quality gates.

## Surprises & Discoveries

- Observation: `docs/developers-guide.md` is currently absent.
Evidence: repository file inventory under `docs/`. Impact: this plan must
include creating the developers guide.
include creating the developers' guide.

## Decision Log

Expand Down Expand Up @@ -122,7 +122,7 @@ Stage C proposes targeted renames. Limit changes to inconsistent or ambiguous
symbols; avoid blanket renaming where semantics are already clear.

Stage D applies updates with synchronized docs. Ensure code, user guide, and
new developers guide describe the same conceptual model.
new developers' guide describe the same conceptual model.

Stage E updates migration guidance and validates compile/lint/test/doc gates.

Expand All @@ -149,7 +149,7 @@ Run all commands from repository root (`/home/user/project`).

Expected success indicators:

- Conceptual glossary is explicit in users and developers guides.
- Conceptual glossary is explicit in users' and developers' guides.
- User-visible symbol names align to layer definitions.
- Migration guide captures all renamed public items.

Expand Down
9 changes: 9 additions & 0 deletions docs/multi-packet-and-streaming-responses-design.md
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,15 @@ not hang.
Outbound order is serializer → fragmentation → codec framing; inbound order
is codec decode → fragment reassembly → deserialization.

- **Unified Codec Pipeline (server path):** On the server side, handler
responses — including `Response::Stream` and `Response::MultiPacket` — pass
through the `FramePipeline` (`src/app/codec_driver.rs`) before reaching the
wire. The pipeline applies fragmentation and outbound metrics uniformly to
every `Envelope`, regardless of response variant. This ensures that large
streaming frames are fragmented by the same path that fragments single-frame
handler responses. Protocol hook integration for the server path is planned
for a follow-up stage.

- **Streaming Request Bodies:** [ADR 0002][adr-0002] introduces first-class
streaming request bodies as the inbound counterpart to streaming responses.
Handlers MAY receive `RequestParts` plus `RequestBodyStream` rather than a
Expand Down
20 changes: 11 additions & 9 deletions docs/roadmap.md
Original file line number Diff line number Diff line change
Expand Up @@ -388,16 +388,18 @@ integration boundaries.

### 9.3. Unified codec handling

- [ ] 9.3.1. Unify codec handling between the app router and the `Connection`
- [x] 9.3.1. Unify codec handling between the app router and the `Connection`
actor.[^outbound-design]
- [ ] Route app-level request and response handling through the actor codec
path so protocol hooks apply consistently.
- [ ] Remove duplicate codec construction in `src/app/connection.rs` once
the actor path owns framing.
- [ ] Add integration tests covering streaming responses and push traffic
through the unified path.
- [ ] Add back-pressure tests for `Response::Stream` routed through the
codec layer.[^streaming-design]
- [x] Route app-level request and response handling through the
`FramePipeline` so fragmentation and metrics apply consistently.
- [x] Remove duplicate codec construction in `src/app/connection.rs`; the
`FramePipeline` owns outbound fragmentation.
- [x] Add integration tests covering the unified pipeline (round-trip,
fragmentation, sequential requests, disabled fragmentation).
- [x] Add BDD behavioural tests exercising the unified codec path.
- [x] Note: protocol hooks (`before_send`) are deferred to a follow-up
stage because `F::Frame` and `Envelope` types may
differ.[^streaming-design]

### 9.4. Property-based codec tests

Expand Down
8 changes: 7 additions & 1 deletion docs/users-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ fn build_app() -> wireframe::Result<WireframeApp> {
```

The snippet below wires the builder into a Tokio runtime, decodes inbound
payloads, and emits a serialised response. It showcases the typical `main`
payloads, and emits a serialized response. It showcases the typical `main`
function for a microservice that listens on localhost and responds to a `Ping`
message with a `Pong` payload.[^2][^10][^15]

Expand Down Expand Up @@ -879,6 +879,12 @@ fragmentation is delegated to an upstream gateway). The `ConnectionActor`
mirrors the same behaviour for push traffic and streaming responses through
`enable_fragmentation`, ensuring client-visible frames follow the same format.

On the server side, a unified `FramePipeline` applies the same fragmentation
logic to all outbound `Envelope` values — handler responses, streaming frames,
and multi-packet channels — before serialization and codec wrapping. This
guarantees that a single connection-scoped `FragmentationState` manages both
outbound fragmentation and inbound reassembly.
Comment thread
coderabbitai[bot] marked this conversation as resolved.

## Protocol hooks

Install a custom protocol with `with_protocol`. `protocol_hooks()` converts the
Expand Down
224 changes: 224 additions & 0 deletions src/app/codec_driver.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
//! Codec-aware connection driver that bridges the connection actor's frame
//! processing pipeline to a framed transport stream.
//!
//! The [`FramePipeline`] applies optional fragmentation and outbound metrics
//! to every [`Envelope`] before it reaches the wire. The [`send_envelope`]
//! and [`flush_pipeline_output`] helpers then serialize, wrap via
//! [`FrameCodec::wrap_payload`], and write the resulting frame to the
//! underlying [`Framed`] stream.
//!
//! This module ensures all outbound frames — handler responses, push
//! messages, streaming responses, and multi-packet channels — pass through
//! the same fragmentation and metrics pipeline before reaching the wire.
Comment thread
coderabbitai[bot] marked this conversation as resolved.

use bytes::Bytes;
use futures::SinkExt;
use log::warn;
use tokio::io::{self, AsyncRead, AsyncWrite};
use tokio_util::codec::Framed;

use super::{
combined_codec::ConnectionCodec,
envelope::Envelope,
fragmentation_state::FragmentationState,
};
use crate::{
codec::FrameCodec,
fragment::{FragmentationConfig, FragmentationError},
serializer::Serializer,
};

/// Outbound frame processing pipeline mirroring the connection actor's
/// `process_frame_with_hooks_and_metrics` logic.
///
/// Applies optional fragmentation and outbound metrics to each envelope.
/// Produces a buffer of processed envelopes ready for serialization and
/// transmission.
pub(crate) struct FramePipeline {
fragmentation: Option<FragmentationState>,
out: Vec<Envelope>,
}

impl FramePipeline {
/// Create a pipeline with the given optional fragmentation config.
pub(crate) fn new(fragmentation: Option<FragmentationConfig>) -> Self {
Self {
fragmentation: fragmentation.map(FragmentationState::new),
out: Vec::new(),
}
}

/// Process an envelope through the pipeline: fragment → metrics.
///
/// Processed envelopes are buffered internally. Call
/// [`drain_output`](Self::drain_output) to retrieve them.
pub(crate) fn process(&mut self, envelope: Envelope) -> io::Result<()> {
let id = envelope.id;
let correlation_id = envelope.correlation_id;
let frames = self.fragment_envelope(envelope).map_err(|err| {
warn!(
"failed to fragment outbound envelope: id={id}, \
correlation_id={correlation_id:?}, error={err:?}"
);
crate::metrics::inc_handler_errors();
io::Error::other(err)
})?;
for frame in frames {
self.push_frame(frame);
}
Ok(())
}

/// Fragment an envelope if fragmentation is enabled, otherwise return it
/// as a single-element vector.
fn fragment_envelope(
&mut self,
envelope: Envelope,
) -> Result<Vec<Envelope>, FragmentationError> {
match self.fragmentation.as_mut() {
Some(state) => state.fragment(envelope),
None => Ok(vec![envelope]),
}
}

/// Purge expired fragment reassembly state, if fragmentation is enabled.
pub(crate) fn purge_expired(&mut self) {
if let Some(state) = self.fragmentation.as_mut() {
state.purge_expired();
}
}

/// Drain all buffered output envelopes, returning them for transmission.
pub(crate) fn drain_output(&mut self) -> Vec<Envelope> { std::mem::take(&mut self.out) }

/// Returns a mutable reference to the inner fragmentation state, if
/// fragmentation is enabled.
///
/// Used by the inbound reassembly path which needs direct access to
/// [`FragmentationState::reassemble`].
pub(crate) fn fragmentation_mut(&mut self) -> Option<&mut FragmentationState> {
self.fragmentation.as_mut()
}

/// Returns `true` when fragmentation is enabled.
#[cfg(test)]
pub(crate) fn has_fragmentation(&self) -> bool { self.fragmentation.is_some() }

fn push_frame(&mut self, envelope: Envelope) {
self.out.push(envelope);
crate::metrics::inc_frames(crate::metrics::Direction::Outbound);
}
}

/// Serialize an [`Envelope`] and write it through the codec to the framed
/// stream.
///
/// # Errors
///
/// Returns an [`io::Error`] if serialization or sending fails.
pub(super) async fn send_envelope<S, W, F>(
serializer: &S,
codec: &F,
framed: &mut Framed<W, ConnectionCodec<F>>,
envelope: &Envelope,
) -> io::Result<()>
where
S: Serializer + Send + Sync,
W: AsyncRead + AsyncWrite + Unpin,
F: FrameCodec,
{
let bytes = serializer.serialize(envelope).map_err(|e| {
let id = envelope.id;
let correlation_id = envelope.correlation_id;
warn!(
"failed to serialize outbound envelope: id={id}, correlation_id={correlation_id:?}, \
error={e:?}"
);
crate::metrics::inc_handler_errors();
io::Error::other(e)
})?;
let frame = codec.wrap_payload(Bytes::from(bytes));
framed.send(frame).await.map_err(|e| {
let id = envelope.id;
let correlation_id = envelope.correlation_id;
warn!(
"failed to send outbound frame: id={id}, correlation_id={correlation_id:?}, \
error={e:?}"
);
crate::metrics::inc_handler_errors();
io::Error::other(e)
})
}

/// Flush a batch of pipeline-produced [`Envelope`] values through the codec
/// to the framed stream.
///
/// Each envelope is serialized, wrapped, and written individually. On the
/// first I/O failure the remaining envelopes are discarded and the error is
/// returned.
///
/// # Errors
///
/// Returns an [`io::Error`] if any envelope fails to serialize or send.
pub(super) async fn flush_pipeline_output<S, W, F>(
serializer: &S,
codec: &F,
framed: &mut Framed<W, ConnectionCodec<F>>,
envelopes: &mut Vec<Envelope>,
) -> io::Result<()>
where
S: Serializer + Send + Sync,
W: AsyncRead + AsyncWrite + Unpin,
F: FrameCodec,
{
for envelope in envelopes.drain(..) {
send_envelope(serializer, codec, framed, &envelope).await?;
}
Ok(())
}

#[cfg(test)]
mod tests {
use rstest::{fixture, rstest};

use super::*;

#[fixture]
fn pipeline() -> FramePipeline {
let config = None;
FramePipeline::new(config)
}

#[rstest]
fn process_single_envelope_emits_one_frame(mut pipeline: FramePipeline) {
let env = Envelope::new(1, Some(42), vec![1, 2, 3]);
pipeline
.process(env)
.expect("processing should succeed without fragmentation");
let mut output = pipeline.drain_output();
assert_eq!(output.len(), 1);
let first = output
.pop()
.expect("pipeline should emit exactly one envelope");
assert_eq!(first.id, 1);
assert_eq!(first.correlation_id, Some(42));
assert_eq!(first.payload, vec![1, 2, 3]);
}

#[rstest]
fn drain_clears_buffer(mut pipeline: FramePipeline) {
pipeline
.process(Envelope::new(1, None, vec![]))
.expect("processing should succeed without fragmentation");
let first = pipeline.drain_output();
assert_eq!(first.len(), 1);

let second = pipeline.drain_output();
assert!(second.is_empty());
}

#[rstest]
fn pipeline_without_fragmentation(pipeline: FramePipeline) {
assert!(!pipeline.has_fragmentation());
}
}
Loading
Loading