Merged
5 changes: 5 additions & 0 deletions docs/generic-message-fragmentation-and-re-assembly-design.md
@@ -368,3 +368,8 @@ This feature is designed as a foundational layer that other features build upon.
  payloads into capped fragments, and return typed collections that are simple
  to wrap in protocol-specific envelopes or feed into behavioural tests before
  the full `FragmentAdapter` lands.
- Added a transport-agnostic `Reassembler` that mirrors the outbound helper on
  the inbound path. It buffers fragments per `MessageId` via `FragmentSeries`,
  drops the partial buffer when ordering breaks, enforces a configurable
  `max_message_size`, and exposes caller-driven timeout purging. This prevents
  abandoned assemblies from exhausting memory.
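The caller-driven timeout purge described in this note can be sketched in isolation. The sketch below is a simplified std-only illustration (the `Partial` struct and `u64` message ids are hypothetical stand-ins for the crate's `PartialMessage` and `MessageId` types), showing the `retain`-based eviction pattern:

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

// Simplified stand-in for a buffered partial message.
struct Partial {
    started_at: Instant,
}

// Evict entries whose age meets or exceeds `timeout`, returning the ids
// of the evicted messages so callers can log or account for them.
fn purge_expired_at(
    buffers: &mut HashMap<u64, Partial>,
    timeout: Duration,
    now: Instant,
) -> Vec<u64> {
    let mut evicted = Vec::new();
    buffers.retain(|id, partial| {
        let expired = now.saturating_duration_since(partial.started_at) >= timeout;
        if expired {
            evicted.push(*id);
        }
        !expired
    });
    evicted
}

fn main() {
    let start = Instant::now();
    let mut buffers = HashMap::new();
    buffers.insert(7_u64, Partial { started_at: start });
    buffers.insert(8_u64, Partial { started_at: start + Duration::from_secs(20) });
    // At start + 31s, entry 7 (age 31s) expires; entry 8 (age 11s) survives.
    let evicted = purge_expired_at(&mut buffers, Duration::from_secs(30), start + Duration::from_secs(31));
    assert_eq!(evicted, vec![7]);
    assert_eq!(buffers.len(), 1);
}
```

Driving the purge from each `push` keeps eviction caller-driven: no background task is required, and tests can supply a synthetic `now`.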
6 changes: 3 additions & 3 deletions docs/roadmap.md
@@ -261,10 +261,10 @@ logic.
- [x] Implement a `Fragmenter` to split a large `Message` into multiple
`Frame`s, each with a `Fragment` header.

- [ ] Implement a `Reassembler` on the receiving end to collect fragments and
- [x] Implement a `Reassembler` on the receiving end to collect fragments and
reconstruct the original `Message`.

- [ ] Manage a reassembly buffer with timeouts to prevent resource
- [x] Manage a reassembly buffer with timeouts to prevent resource
exhaustion from incomplete messages.

- [ ] **Integration with Core Library:**
@@ -276,7 +276,7 @@ logic.

- [ ] **Testing:**

- [ ] Create unit tests for the `Fragmenter` and `Reassembler`.
- [x] Create unit tests for the `Fragmenter` and `Reassembler`.

- [ ] Develop integration tests sending and receiving large messages that
require fragmentation.
28 changes: 28 additions & 0 deletions docs/users-guide.md
@@ -176,6 +176,34 @@ for fragment in batch.fragments() {
}
```

A companion `Reassembler` mirrors the helper on the inbound path. It buffers
fragments per `MessageId`, rejects out-of-order fragments, and enforces a
maximum assembled size while exposing `purge_expired` to clear stale partial
messages after a configurable timeout. When the final fragment arrives, it
returns a `ReassembledMessage` that can be decoded into the original type.

```rust
use std::{num::NonZeroUsize, time::Duration};
use wireframe::fragment::{
    FragmentHeader,
    FragmentIndex,
    MessageId,
    Reassembler,
};

let mut reassembler = Reassembler::new(
    NonZeroUsize::new(512).expect("non-zero capacity"),
    Duration::from_secs(30),
);

let header = FragmentHeader::new(MessageId::new(9), FragmentIndex::zero(), true);
let complete = reassembler
    .push(header, [0_u8; 12])
    .expect("fragments accepted")
    .expect("single fragment completes the message");

// Decode when ready:
// let message: MyType = complete.decode().expect("decode");
```

## Working with requests and middleware

Every inbound frame becomes a `ServiceRequest`. Middleware can inspect or
20 changes: 20 additions & 0 deletions src/fragment/error.rs
@@ -4,6 +4,8 @@
//! specific protocols while still surfacing precise diagnostics for
//! behavioural tests.

use std::num::NonZeroUsize;

use bincode::error::EncodeError;
use thiserror::Error;

@@ -51,3 +53,21 @@ pub enum FragmentationError {
    #[error("fragment index overflow after {last}")]
    IndexOverflow { last: FragmentIndex },
}

/// Errors produced while re-assembling inbound fragments.
#[derive(Clone, Copy, Debug, Error, PartialEq, Eq)]
pub enum ReassemblyError {
    /// The fragment broke ordering or message tracking guarantees.
    #[error("fragment rejected during reassembly: {0}")]
    Fragment(#[from] FragmentError),
    /// The combined fragment payloads would exceed the configured cap.
    #[error("message {message_id} exceeds reassembly cap: {attempted} bytes > {limit} byte limit")]
    MessageTooLarge {
        /// Identifier for the logical message being assembled.
        message_id: MessageId,
        /// Total size that triggered the guard.
        attempted: usize,
        /// Configured reassembly cap.
        limit: NonZeroUsize,
    },
}
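The guard behind `MessageTooLarge` can be sketched as a std-only helper (the `check_within_limit` name and `String` error here are illustrative, not the crate's API): the running total plus the incoming payload must neither overflow `usize` nor exceed the cap.

```rust
use std::num::NonZeroUsize;

// Reject when adding `incoming` bytes would overflow or exceed `limit`;
// otherwise return the new running total.
fn check_within_limit(
    current: usize,
    incoming: usize,
    limit: NonZeroUsize,
) -> Result<usize, String> {
    let attempted = current
        .checked_add(incoming)
        .ok_or_else(|| "reassembled size overflowed usize".to_string())?;
    if attempted > limit.get() {
        return Err(format!("{attempted} bytes > {limit} byte limit"));
    }
    Ok(attempted)
}

fn main() {
    let limit = NonZeroUsize::new(512).expect("non-zero");
    assert_eq!(check_within_limit(500, 12, limit), Ok(512));
    assert!(check_within_limit(500, 13, limit).is_err());
    // `checked_add` also catches arithmetic overflow, not just the cap.
    assert!(check_within_limit(usize::MAX, 1, limit).is_err());
}
```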
4 changes: 3 additions & 1 deletion src/fragment/mod.rs
@@ -10,13 +10,15 @@ pub mod fragmenter;
pub mod header;
pub mod id;
pub mod index;
pub mod reassembler;
pub mod series;

pub use error::{FragmentError, FragmentStatus, FragmentationError};
pub use error::{FragmentError, FragmentStatus, FragmentationError, ReassemblyError};
pub use fragmenter::{FragmentBatch, FragmentFrame, Fragmenter};
pub use header::FragmentHeader;
pub use id::MessageId;
pub use index::FragmentIndex;
pub use reassembler::{ReassembledMessage, Reassembler};
pub use series::FragmentSeries;

#[cfg(test)]
265 changes: 265 additions & 0 deletions src/fragment/reassembler.rs
@@ -0,0 +1,265 @@
//! Inbound helper that stitches fragments back into complete messages.
//!
//! [`Reassembler`] mirrors the outbound [`Fragmenter`](crate::fragment::Fragmenter) by
//! collecting fragment payloads keyed by [`MessageId`](crate::fragment::MessageId).
//! It enforces ordering via [`FragmentSeries`](crate::fragment::FragmentSeries), guards
//! against unbounded allocation with a configurable cap, and purges stale partial
//! assemblies after a fixed timeout. The helper is transport-agnostic so codecs and
//! behavioural tests can reuse it without depending on socket types.

use std::{
    collections::{
        HashMap,
        hash_map::{Entry, OccupiedEntry},
    },
    num::NonZeroUsize,
    time::{Duration, Instant},
};

use bincode::error::DecodeError;

use super::{FragmentHeader, FragmentSeries, FragmentStatus, MessageId, ReassemblyError};
use crate::message::Message;

#[derive(Debug)]
struct PartialMessage {
    series: FragmentSeries,
    buffer: Vec<u8>,
    started_at: Instant,
}

impl PartialMessage {
    fn new(series: FragmentSeries, payload: &[u8], started_at: Instant) -> Self {
        Self {
            series,
            buffer: payload.to_vec(),
            started_at,
        }
    }

    fn push(&mut self, payload: &[u8]) { self.buffer.extend_from_slice(payload); }

    fn len(&self) -> usize { self.buffer.len() }

    fn started_at(&self) -> Instant { self.started_at }

    fn into_buffer(self) -> Vec<u8> { self.buffer }
}

/// Container for a fully re-assembled message payload.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ReassembledMessage {
    message_id: MessageId,
    payload: Vec<u8>,
}

impl ReassembledMessage {
    /// Construct a new [`ReassembledMessage`].
    #[must_use]
    pub fn new(message_id: MessageId, payload: Vec<u8>) -> Self {
        Self {
            message_id,
            payload,
        }
    }

    /// Identifier shared by the fragments that formed this message.
    #[must_use]
    pub const fn message_id(&self) -> MessageId { self.message_id }

    /// Borrow the re-assembled payload.
    #[must_use]
    pub fn payload(&self) -> &[u8] { self.payload.as_slice() }

    /// Consume the message, returning the owned payload bytes.
    #[must_use]
    pub fn into_payload(self) -> Vec<u8> { self.payload }

    /// Decode the payload into a strongly typed message.
    ///
    /// # Errors
    ///
    /// Returns any [`DecodeError`] raised while deserialising the payload.
    pub fn decode<M: Message>(&self) -> Result<M, DecodeError> {
        let (message, _) = M::from_bytes(self.payload())?;
        Ok(message)
    }
}

/// Stateful fragment re-assembler with timeout-based eviction.
#[derive(Debug)]
pub struct Reassembler {
    max_message_size: NonZeroUsize,
    timeout: Duration,
    buffers: HashMap<MessageId, PartialMessage>,
}

impl Reassembler {
    /// Create a new re-assembler that enforces a maximum reconstructed payload size.
    #[must_use]
    pub fn new(max_message_size: NonZeroUsize, timeout: Duration) -> Self {
        Self {
            max_message_size,
            timeout,
            buffers: HashMap::new(),
        }
    }

    /// Process a fragment using the current time.
    ///
    /// Returns `Ok(Some(_))` when the fragment completes the message, `Ok(None)`
    /// while more fragments are required, or an error if ordering or size
    /// invariants are violated.
    ///
    /// # Errors
    ///
    /// Returns [`ReassemblyError`] when a fragment arrives out of order or would
    /// push the reconstructed payload beyond the configured limit.
    pub fn push(
        &mut self,
        header: FragmentHeader,
        payload: impl AsRef<[u8]>,
    ) -> Result<Option<ReassembledMessage>, ReassemblyError> {
        self.push_at(header, payload, Instant::now())
    }

    /// Process a fragment using an explicit clock reading.
    ///
    /// Accepting an explicit `now` simplifies deterministic testing and allows
    /// callers to co-ordinate eviction sweeps with their own timers.
    ///
    /// # Errors
    ///
    /// Returns [`ReassemblyError`] when the fragment violates ordering or
    /// exceeds the configured reassembly cap.
    pub fn push_at(
        &mut self,
        header: FragmentHeader,
        payload: impl AsRef<[u8]>,
        now: Instant,
    ) -> Result<Option<ReassembledMessage>, ReassemblyError> {
        self.purge_expired_at(now);

        let payload = payload.as_ref();

        match self.buffers.entry(header.message_id()) {
            Entry::Occupied(mut occupied) => {
                let status = occupied
                    .get_mut()
                    .series
                    .accept(header)
                    .map_err(ReassemblyError::from);

                match status {
                    Ok(FragmentStatus::Incomplete) => Self::append_and_maybe_complete(
                        self.max_message_size,
                        occupied,
                        header.message_id(),
                        payload,
                        false,
                    ),
                    Ok(FragmentStatus::Complete) => Self::append_and_maybe_complete(
                        self.max_message_size,
                        occupied,
                        header.message_id(),
                        payload,
                        true,
                    ),
                    Err(err) => {
                        occupied.remove();
                        Err(err)
                    }
                }
            }
            Entry::Vacant(vacant) => {
                let mut series = FragmentSeries::new(header.message_id());
                let status = series.accept(header).map_err(ReassemblyError::from)?;
                let attempted = payload.len();
                Self::assert_within_limit(self.max_message_size, header.message_id(), attempted)?;

                match status {
                    FragmentStatus::Incomplete => {
                        vacant.insert(PartialMessage::new(series, payload, now));
                        Ok(None)
                    }
                    FragmentStatus::Complete => Ok(Some(ReassembledMessage::new(
                        header.message_id(),
                        payload.to_vec(),
                    ))),
                }
            }
        }
    }

    /// Remove any partial messages that exceeded the configured timeout.
    ///
    /// Returns the identifiers of messages that were evicted.
    pub fn purge_expired(&mut self) -> Vec<MessageId> { self.purge_expired_at(Instant::now()) }

    /// Remove any partial messages that exceeded the configured timeout using
    /// an explicit clock reading.
    ///
    /// Returns the identifiers of messages that were evicted.
    pub fn purge_expired_at(&mut self, now: Instant) -> Vec<MessageId> {
        let mut evicted = Vec::new();
        let timeout = self.timeout;

        self.buffers.retain(|message_id, partial| {
            let expired = now.saturating_duration_since(partial.started_at()) >= timeout;
            if expired {
                evicted.push(*message_id);
            }
            !expired
        });

        evicted
    }

    /// Number of partial messages currently buffered.
    #[must_use]
    pub fn buffered_len(&self) -> usize { self.buffers.len() }

    fn assert_within_limit(
        limit: NonZeroUsize,
        message_id: MessageId,
        attempted: usize,
    ) -> Result<(), ReassemblyError> {
        if attempted > limit.get() {
            return Err(ReassemblyError::MessageTooLarge {
                message_id,
                attempted,
                limit,
            });
        }
        Ok(())
    }

    fn append_and_maybe_complete(
        limit: NonZeroUsize,
        mut occupied: OccupiedEntry<'_, MessageId, PartialMessage>,
        message_id: MessageId,
        payload: &[u8],
        completes: bool,
    ) -> Result<Option<ReassembledMessage>, ReassemblyError> {
        let Some(attempted) = occupied.get().len().checked_add(payload.len()) else {
            occupied.remove();
            return Err(ReassemblyError::MessageTooLarge {
                message_id,
                attempted: usize::MAX,
                limit,
            });
        };
        if let Err(err) = Self::assert_within_limit(limit, message_id, attempted) {
            occupied.remove();
            return Err(err);
        }

        occupied.get_mut().push(payload);
        if completes {
            let buffer = occupied.remove().into_buffer();
            Ok(Some(ReassembledMessage::new(message_id, buffer)))
        } else {
            Ok(None)
        }
    }
}
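The occupied/vacant flow in `push_at` can be reduced to a std-only sketch. The types below (`u64` ids, a `u32` fragment index, a `&'static str` error) are simplified stand-ins for the crate's types, but the shape is the same: an out-of-order fragment removes the whole partial buffer, and the final fragment yields the assembled bytes.

```rust
use std::collections::HashMap;
use std::collections::hash_map::Entry;

// Simplified per-message state: next expected fragment index and bytes so far.
struct Partial {
    next_index: u32,
    buffer: Vec<u8>,
}

// Accept a fragment for `id`. Out-of-order input drops the partial buffer,
// mirroring the reassembler's "discard on ordering break" policy.
fn push(
    buffers: &mut HashMap<u64, Partial>,
    id: u64,
    index: u32,
    payload: &[u8],
    is_last: bool,
) -> Result<Option<Vec<u8>>, &'static str> {
    match buffers.entry(id) {
        Entry::Occupied(mut occ) => {
            if index != occ.get().next_index {
                occ.remove(); // ordering broke: drop the partial assembly
                return Err("out-of-order fragment");
            }
            occ.get_mut().next_index += 1;
            occ.get_mut().buffer.extend_from_slice(payload);
            if is_last {
                Ok(Some(occ.remove().buffer))
            } else {
                Ok(None)
            }
        }
        Entry::Vacant(vac) => {
            if index != 0 {
                return Err("first fragment must have index zero");
            }
            if is_last {
                // Single-fragment message: complete immediately, buffer nothing.
                return Ok(Some(payload.to_vec()));
            }
            vac.insert(Partial { next_index: 1, buffer: payload.to_vec() });
            Ok(None)
        }
    }
}

fn main() {
    let mut buffers = HashMap::new();
    assert_eq!(push(&mut buffers, 1, 0, b"ab", false), Ok(None));
    assert_eq!(push(&mut buffers, 1, 1, b"cd", true), Ok(Some(b"abcd".to_vec())));
    assert!(push(&mut buffers, 2, 0, b"x", false).is_ok());
    assert!(push(&mut buffers, 2, 5, b"y", false).is_err());
    assert!(buffers.is_empty()); // both complete and failed series are removed
}
```

Dropping the buffer on the first ordering violation is a deliberate simplification: the receiver never holds out-of-order fragments, at the cost of the sender restarting the series after a violation.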