From 166d25c092009529f1a2dd444f66fb81546d62df Mon Sep 17 00:00:00 2001 From: Leynos Date: Mon, 24 Nov 2025 12:44:59 +0000 Subject: [PATCH 1/5] feat(fragment): add transport-agnostic Reassembler for inbound message fragments Introduces a new Reassembler that mirrors the Fragmenter by collecting and reassembling inbound message fragments keyed by MessageId. It enforces fragment ordering, limits maximum assembled message size, and supports caller-driven timeout-based purging of stale partial messages to prevent resource exhaustion. Enhancements include comprehensive unit and integration tests, updates to documentation and usage guides, and behavioural test steps verifying reassembly correctness, capacity enforcement, and eviction of expired buffers. Co-authored-by: terragon-labs[bot] --- ...ge-fragmentation-and-re-assembly-design.md | 5 + docs/roadmap.md | 6 +- docs/users-guide.md | 28 ++ src/fragment/error.rs | 20 ++ src/fragment/mod.rs | 4 +- src/fragment/reassembler.rs | 248 ++++++++++++++++++ src/fragment/tests.rs | 154 ++++++++++- src/lib.rs | 3 + tests/features/fragment.feature | 24 ++ tests/steps/fragment_steps.rs | 56 ++++ tests/worlds/fragment.rs | 168 +++++++++++- 11 files changed, 709 insertions(+), 7 deletions(-) create mode 100644 src/fragment/reassembler.rs diff --git a/docs/generic-message-fragmentation-and-re-assembly-design.md b/docs/generic-message-fragmentation-and-re-assembly-design.md index 706dd95e..7485267e 100644 --- a/docs/generic-message-fragmentation-and-re-assembly-design.md +++ b/docs/generic-message-fragmentation-and-re-assembly-design.md @@ -368,3 +368,8 @@ This feature is designed as a foundational layer that other features build upon. payloads into capped fragments, and return typed collections that are simple to wrap in protocol-specific envelopes or feed into behavioural tests before the full `FragmentAdapter` lands. +- Added a transport-agnostic `Reassembler` that mirrors the outbound helper on + the inbound path. It buffers fragments per `MessageId` via `FragmentSeries`, + drops the partial buffer when ordering breaks, enforces a configurable + `max_message_size`, and exposes caller-driven timeout purging. This prevents + abandoned assemblies from exhausting memory. diff --git a/docs/roadmap.md b/docs/roadmap.md index 26c64a99..2a1ec360 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -261,10 +261,10 @@ logic. - [x] Implement a `Fragmenter` to split a large `Message` into multiple `Frame`s, each with a `Fragment` header. - - [ ] Implement a `Reassembler` on the receiving end to collect fragments and + - [x] Implement a `Reassembler` on the receiving end to collect fragments and reconstruct the original `Message`. - - [ ] Manage a reassembly buffer with timeouts to prevent resource + - [x] Manage a reassembly buffer with timeouts to prevent resource exhaustion from incomplete messages. - [ ] **Integration with Core Library:** @@ -276,7 +276,7 @@ logic. - [ ] **Testing:** - - [ ] Create unit tests for the `Fragmenter` and `Reassembler`. + - [x] Create unit tests for the `Fragmenter` and `Reassembler`. - [ ] Develop integration tests sending and receiving large messages that require fragmentation. diff --git a/docs/users-guide.md b/docs/users-guide.md index 15bd7175..eca43aea 100644 --- a/docs/users-guide.md +++ b/docs/users-guide.md @@ -176,6 +176,34 @@ for fragment in batch.fragments() { } ``` +A companion `Reassembler` mirrors the helper on the inbound path. It buffers +fragments per `MessageId`, rejects out-of-order fragments, and enforces a +maximum assembled size while exposing `purge_expired` to clear stale partial +messages after a configurable timeout. When the final fragment arrives it +returns a `ReassembledMessage` that can be decoded into the original type. + +```rust +use std::{num::NonZeroUsize, time::Duration}; +use wireframe::fragment::{ + FragmentHeader, + FragmentIndex, + MessageId, + Reassembler, +}; + +let mut reassembler = + Reassembler::new(NonZeroUsize::new(512).unwrap(), Duration::from_secs(30)); + +let header = FragmentHeader::new(MessageId::new(9), FragmentIndex::zero(), true); +let complete = reassembler + .push(header, [0_u8; 12]) + .expect("fragments accepted") + .expect("single fragment completes the message"); + +// Decode when ready: +// let message: MyType = complete.decode().expect("decode"); +``` + ## Working with requests and middleware Every inbound frame becomes a `ServiceRequest`. Middleware can inspect or diff --git a/src/fragment/error.rs b/src/fragment/error.rs index 9970b0cc..78c4d855 100644 --- a/src/fragment/error.rs +++ b/src/fragment/error.rs @@ -4,6 +4,8 @@ //! specific protocols while still surfacing precise diagnostics for //! behavioural tests. +use std::num::NonZeroUsize; + use bincode::error::EncodeError; use thiserror::Error; @@ -51,3 +53,21 @@ pub enum FragmentationError { #[error("fragment index overflow after {last}")] IndexOverflow { last: FragmentIndex }, } + +/// Errors produced while re-assembling inbound fragments. +#[derive(Clone, Copy, Debug, Error, PartialEq, Eq)] +pub enum ReassemblyError { + /// The fragment broke ordering or message tracking guarantees. + #[error("fragment rejected during reassembly: {0}")] + Fragment(#[from] FragmentError), + /// The combined fragment payloads would exceed the configured cap. + #[error("message {message_id} exceeds reassembly cap: {attempted} bytes > {limit} byte limit")] + MessageTooLarge { + /// Identifier for the logical message being assembled. + message_id: MessageId, + /// Total size that triggered the guard. + attempted: usize, + /// Configured reassembly cap. + limit: NonZeroUsize, + }, +} diff --git a/src/fragment/mod.rs b/src/fragment/mod.rs index c47c2af2..a87ed54d 100644 --- a/src/fragment/mod.rs +++ b/src/fragment/mod.rs @@ -10,13 +10,15 @@ pub mod fragmenter; pub mod header; pub mod id; pub mod index; +pub mod reassembler; pub mod series; -pub use error::{FragmentError, FragmentStatus, FragmentationError}; +pub use error::{FragmentError, FragmentStatus, FragmentationError, ReassemblyError}; pub use fragmenter::{FragmentBatch, FragmentFrame, Fragmenter}; pub use header::FragmentHeader; pub use id::MessageId; pub use index::FragmentIndex; +pub use reassembler::{ReassembledMessage, Reassembler}; pub use series::FragmentSeries; #[cfg(test)] diff --git a/src/fragment/reassembler.rs b/src/fragment/reassembler.rs new file mode 100644 index 00000000..2ef16d91 --- /dev/null +++ b/src/fragment/reassembler.rs @@ -0,0 +1,248 @@ +//! Inbound helper that stitches fragments back into complete messages. +//! +//! [`Reassembler`] mirrors the outbound [`Fragmenter`](crate::fragment::Fragmenter) by +//! collecting fragment payloads keyed by [`MessageId`](crate::fragment::MessageId). +//! It enforces ordering via [`FragmentSeries`](crate::fragment::FragmentSeries), guards +//! against unbounded allocation with a configurable cap, and purges stale partial +//! assemblies after a fixed timeout. The helper is transport-agnostic so codecs and +//! behavioural tests can reuse it without depending on socket types. + +use std::{ + collections::{HashMap, hash_map::Entry}, + num::NonZeroUsize, + time::{Duration, Instant}, +}; + +use bincode::error::DecodeError; + +use super::{FragmentHeader, FragmentSeries, FragmentStatus, MessageId, ReassemblyError}; +use crate::message::Message; + +#[derive(Debug)] +struct PartialMessage { + series: FragmentSeries, + buffer: Vec, + started_at: Instant, +} + +impl PartialMessage { + fn new(series: FragmentSeries, payload: &[u8], started_at: Instant) -> Self { + let mut buffer = Vec::with_capacity(payload.len()); + buffer.extend_from_slice(payload); + Self { + series, + buffer, + started_at, + } + } + + fn push(&mut self, payload: &[u8]) { self.buffer.extend_from_slice(payload); } + + fn len(&self) -> usize { self.buffer.len() } + + fn started_at(&self) -> Instant { self.started_at } + + fn into_buffer(self) -> Vec { self.buffer } +} + +/// Container for a fully re-assembled message payload. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ReassembledMessage { + message_id: MessageId, + payload: Vec, +} + +impl ReassembledMessage { + /// Construct a new [`ReassembledMessage`]. + #[must_use] + pub fn new(message_id: MessageId, payload: Vec) -> Self { + Self { + message_id, + payload, + } + } + + /// Identifier shared by the fragments that formed this message. + #[must_use] + pub const fn message_id(&self) -> MessageId { self.message_id } + + /// Borrow the re-assembled payload. + #[must_use] + pub fn payload(&self) -> &[u8] { self.payload.as_slice() } + + /// Consume the message, returning the owned payload bytes. + #[must_use] + pub fn into_payload(self) -> Vec { self.payload } + + /// Decode the payload into a strongly typed message. + /// + /// # Errors + /// + /// Returns any [`DecodeError`] raised while deserialising the payload. + pub fn decode(&self) -> Result { + let (message, _) = M::from_bytes(self.payload())?; + Ok(message) + } +} + +/// Stateful fragment re-assembler with timeout-based eviction. +#[derive(Debug)] +pub struct Reassembler { + max_message_size: NonZeroUsize, + timeout: Duration, + buffers: HashMap, +} + +impl Reassembler { + /// Create a new re-assembler that enforces a maximum reconstructed payload size. + #[must_use] + pub fn new(max_message_size: NonZeroUsize, timeout: Duration) -> Self { + Self { + max_message_size, + timeout, + buffers: HashMap::new(), + } + } + + /// Process a fragment using the current time. + /// + /// Returns `Ok(Some(_))` when the fragment completes the message, `Ok(None)` + /// while more fragments are required, or an error if ordering or size + /// invariants are violated. + /// + /// # Errors + /// + /// Returns [`ReassemblyError`] when a fragment arrives out of order or would + /// push the reconstructed payload beyond the configured limit. + pub fn push( + &mut self, + header: FragmentHeader, + payload: impl AsRef<[u8]>, + ) -> Result, ReassemblyError> { + self.push_at(header, payload, Instant::now()) + } + + /// Process a fragment using an explicit clock reading. + /// + /// Accepting an explicit `now` simplifies deterministic testing and allows + /// callers to co-ordinate eviction sweeps with their own timers. + /// + /// # Errors + /// + /// Returns [`ReassemblyError`] when the fragment violates ordering or + /// exceeds the configured reassembly cap. + pub fn push_at( + &mut self, + header: FragmentHeader, + payload: impl AsRef<[u8]>, + now: Instant, + ) -> Result, ReassemblyError> { + self.purge_expired_at(now); + + let payload = payload.as_ref(); + + match self.buffers.entry(header.message_id()) { + Entry::Occupied(mut occupied) => { + let status = occupied + .get_mut() + .series + .accept(header) + .map_err(ReassemblyError::from); + + match status { + Ok(FragmentStatus::Incomplete) => { + let attempted = occupied.get().len() + payload.len(); + if let Err(err) = Self::assert_within_limit( + self.max_message_size, + header.message_id(), + attempted, + ) { + occupied.remove(); + return Err(err); + } + occupied.get_mut().push(payload); + Ok(None) + } + Ok(FragmentStatus::Complete) => { + let attempted = occupied.get().len() + payload.len(); + if let Err(err) = Self::assert_within_limit( + self.max_message_size, + header.message_id(), + attempted, + ) { + occupied.remove(); + return Err(err); + } + occupied.get_mut().push(payload); + let buffer = occupied.remove().into_buffer(); + Ok(Some(ReassembledMessage::new(header.message_id(), buffer))) + } + Err(err) => { + occupied.remove(); + Err(err) + } + } + } + Entry::Vacant(vacant) => { + let mut series = FragmentSeries::new(header.message_id()); + let status = series.accept(header).map_err(ReassemblyError::from)?; + let attempted = payload.len(); + Self::assert_within_limit(self.max_message_size, header.message_id(), attempted)?; + + match status { + FragmentStatus::Incomplete => { + vacant.insert(PartialMessage::new(series, payload, now)); + Ok(None) + } + FragmentStatus::Complete => Ok(Some(ReassembledMessage::new( + header.message_id(), + payload.to_vec(), + ))), + } + } + } + } + + /// Remove any partial messages that exceeded the configured timeout. + /// + /// Returns the identifiers of messages that were evicted. + pub fn purge_expired(&mut self) -> Vec { self.purge_expired_at(Instant::now()) } + + /// Remove any partial messages that exceeded the configured timeout using + /// an explicit clock reading. + /// + /// Returns the identifiers of messages that were evicted. + pub fn purge_expired_at(&mut self, now: Instant) -> Vec { + let mut evicted = Vec::new(); + let timeout = self.timeout; + + self.buffers.retain(|message_id, partial| { + let expired = now.saturating_duration_since(partial.started_at()) >= timeout; + if expired { + evicted.push(*message_id); + } + !expired + }); + + evicted + } + + /// Number of partial messages currently buffered. + #[must_use] + pub fn buffered_len(&self) -> usize { self.buffers.len() } + + fn assert_within_limit( + limit: NonZeroUsize, + message_id: MessageId, + attempted: usize, + ) -> Result<(), ReassemblyError> { + if attempted > limit.get() { + return Err(ReassemblyError::MessageTooLarge { + message_id, + attempted, + limit, + }); + } + Ok(()) + } +} diff --git a/src/fragment/tests.rs b/src/fragment/tests.rs index d942a227..ac982f9f 100644 --- a/src/fragment/tests.rs +++ b/src/fragment/tests.rs @@ -1,4 +1,7 @@ -use std::num::NonZeroUsize; +use std::{ + num::NonZeroUsize, + time::{Duration, Instant}, +}; use bincode::{BorrowDecode, Encode}; use rstest::rstest; @@ -168,3 +171,152 @@ fn fragmenter_respects_explicit_message_ids() { let next = fragmenter.fragment_bytes([1_u8]).expect("next fragment"); assert_eq!(next.message_id(), MessageId::new(10)); } + +#[test] +fn reassembler_returns_single_fragment_immediately() { + let mut reassembler = Reassembler::new( + NonZeroUsize::new(16).expect("non-zero"), + Duration::from_secs(5), + ); + let header = FragmentHeader::new(MessageId::new(1), FragmentIndex::zero(), true); + let payload = vec![1_u8, 2, 3, 4]; + + let complete = reassembler + .push(header, payload.clone()) + .expect("reassembly must succeed") + .expect("single fragment should complete message"); + + assert_eq!(complete.message_id(), MessageId::new(1)); + assert_eq!(complete.payload(), payload.as_slice()); + assert_eq!(reassembler.buffered_len(), 0); +} + +#[test] +fn reassembler_accumulates_ordered_fragments() { + let mut reassembler = Reassembler::new( + NonZeroUsize::new(8).expect("non-zero"), + Duration::from_secs(30), + ); + let first = FragmentHeader::new(MessageId::new(2), FragmentIndex::zero(), false); + let final_fragment = FragmentHeader::new(MessageId::new(2), FragmentIndex::new(1), true); + + assert!( + reassembler + .push(first, [5_u8, 6, 7]) + .expect("first fragment accepted") + .is_none() + ); + + let complete = reassembler + .push(final_fragment, [8_u8, 9]) + .expect("final fragment accepted") + .expect("message should complete"); + + assert_eq!(complete.payload(), &[5, 6, 7, 8, 9]); + assert_eq!(reassembler.buffered_len(), 0); +} + +#[test] +fn reassembler_rejects_out_of_order_and_drops_partial() { + let mut reassembler = Reassembler::new( + NonZeroUsize::new(8).expect("non-zero"), + Duration::from_secs(30), + ); + let first = FragmentHeader::new(MessageId::new(3), FragmentIndex::zero(), false); + let skipped = FragmentHeader::new(MessageId::new(3), FragmentIndex::new(2), true); + + assert!( + reassembler + .push(first, [1_u8, 2]) + .expect("first fragment accepted") + .is_none() + ); + + let err = reassembler + .push(skipped, [3_u8]) + .expect_err("out-of-order fragment must be rejected"); + assert!(matches!( + err, + ReassemblyError::Fragment(FragmentError::IndexMismatch { .. }) + )); + assert_eq!(reassembler.buffered_len(), 0); +} + +#[test] +fn reassembler_enforces_maximum_payload_size() { + let mut reassembler = Reassembler::new( + NonZeroUsize::new(4).expect("non-zero"), + Duration::from_secs(30), + ); + let first = FragmentHeader::new(MessageId::new(4), FragmentIndex::zero(), false); + let final_fragment = FragmentHeader::new(MessageId::new(4), FragmentIndex::new(1), true); + + assert!( + reassembler + .push(first, [1_u8, 2, 3]) + .expect("first fragment accepted") + .is_none() + ); + + let err = reassembler + .push(final_fragment, [4_u8, 5]) + .expect_err("payload growth beyond cap must be rejected"); + assert_eq!( + err, + ReassemblyError::MessageTooLarge { + message_id: MessageId::new(4), + attempted: 5, + limit: NonZeroUsize::new(4).expect("non-zero"), + } + ); + assert_eq!(reassembler.buffered_len(), 0); +} + +#[test] +fn reassembler_purges_expired_messages() { + let mut reassembler = Reassembler::new( + NonZeroUsize::new(8).expect("non-zero"), + Duration::from_secs(2), + ); + let now = Instant::now(); + let header = FragmentHeader::new(MessageId::new(5), FragmentIndex::zero(), false); + + assert!( + reassembler + .push_at(header, [0_u8, 1], now) + .expect("first fragment accepted") + .is_none() + ); + assert_eq!(reassembler.buffered_len(), 1); + + let evicted = reassembler.purge_expired_at(now + Duration::from_secs(3)); + assert_eq!(evicted, vec![MessageId::new(5)]); + assert_eq!(reassembler.buffered_len(), 0); +} + +#[derive(Clone, Debug, Encode, BorrowDecode, PartialEq, Eq)] +struct ExampleMessage(u8); + +#[test] +fn reassembler_decodes_reconstructed_message() { + let fragmenter = Fragmenter::new(NonZeroUsize::new(2).expect("non-zero")); + let batch = fragmenter + .fragment_message(&ExampleMessage(11)) + .expect("fragment message"); + let mut reassembler = Reassembler::new( + NonZeroUsize::new(4).expect("non-zero"), + Duration::from_secs(10), + ); + + let mut output = None; + for fragment in batch { + let (header, payload) = fragment.into_parts(); + output = reassembler + .push(header, payload) + .expect("fragment accepted"); + } + + let assembled = output.expect("message should complete"); + let decoded: ExampleMessage = assembled.decode().expect("decode message"); + assert_eq!(decoded, ExampleMessage(11)); +} diff --git a/src/lib.rs b/src/lib.rs index 39c1c70a..cde3833f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -42,6 +42,9 @@ pub use fragment::{ FragmentationError, Fragmenter, MessageId, + ReassembledMessage, + Reassembler, + ReassemblyError, }; pub use hooks::{ConnectionContext, ProtocolHooks, WireframeProtocol}; pub use metrics::{CONNECTIONS_ACTIVE, Direction, ERRORS_TOTAL, FRAMES_PROCESSED}; diff --git a/tests/features/fragment.feature b/tests/features/fragment.feature index ef36d30d..006a3aeb 100644 --- a/tests/features/fragment.feature +++ b/tests/features/fragment.feature @@ -45,3 +45,27 @@ Feature: Fragment metadata enforcement And fragment 2 carries 2 bytes And fragment 2 is marked final And the fragments use message id 0 + + Scenario: Reassembler rebuilds sequential fragments + Given a reassembler allowing 10 bytes with a 30-second reassembly timeout + When fragment 0 for message 21 with 4 bytes arrives marked non-final + And fragment 1 for message 21 with 3 bytes arrives marked final + Then the reassembler outputs a payload of 7 bytes + And the reassembler is buffering 0 messages + + Scenario: Reassembler rejects messages that exceed the cap + Given a reassembler allowing 4 bytes with a 30-second reassembly timeout + When fragment 0 for message 22 with 3 bytes arrives marked non-final + And fragment 1 for message 22 with 2 bytes arrives marked final + Then the reassembler reports a message-too-large error + And no message has been reassembled yet + And the reassembler is buffering 0 messages + + Scenario: Reassembler evicts stale partial messages + Given a reassembler allowing 8 bytes with a 1-second reassembly timeout + When fragment 0 for message 23 with 5 bytes arrives marked non-final + And time advances by 2 seconds + And expired reassembly buffers are purged + Then the reassembler is buffering 0 messages + And message 23 is evicted + And no message has been reassembled yet diff --git a/tests/steps/fragment_steps.rs b/tests/steps/fragment_steps.rs index fbefa9e1..e22db512 100644 --- a/tests/steps/fragment_steps.rs +++ b/tests/steps/fragment_steps.rs @@ -1,4 +1,6 @@ //! Steps for fragment metadata behavioural tests. +use std::time::Duration; + use cucumber::{given, then, when}; use crate::world::FragmentWorld; @@ -73,3 +75,57 @@ fn then_fragment_non_final(world: &mut FragmentWorld, index: usize) { fn then_fragment_message_id(world: &mut FragmentWorld, message_id: u64) { world.assert_message_id(message_id); } + +#[given(expr = "a reassembler allowing {int} bytes with a {int}-second reassembly timeout")] +fn given_reassembler(world: &mut FragmentWorld, max_bytes: usize, timeout_secs: u64) { + world.configure_reassembler(max_bytes, timeout_secs); +} + +#[when(expr = "fragment {int} for message {int} with {int} bytes arrives marked non-final")] +fn when_reassembler_fragment_non_final( + world: &mut FragmentWorld, + index: u32, + message: u64, + len: usize, +) { + world.push_fragment(message, index, false, len); +} + +#[when(expr = "fragment {int} for message {int} with {int} bytes arrives marked final")] +fn when_reassembler_fragment_final( + world: &mut FragmentWorld, + index: u32, + message: u64, + len: usize, +) { + world.push_fragment(message, index, true, len); +} + +#[when(expr = "time advances by {int} seconds")] +fn when_time_advances(world: &mut FragmentWorld, seconds: u64) { + world.advance_time(Duration::from_secs(seconds)); +} + +#[when("expired reassembly buffers are purged")] +fn when_reassembly_purged(world: &mut FragmentWorld) { world.purge_reassembly(); } + +#[then(expr = "the reassembler outputs a payload of {int} bytes")] +fn then_reassembled_len(world: &mut FragmentWorld, expected: usize) { + world.assert_reassembled_len(expected); +} + +#[then("no message has been reassembled yet")] +fn then_no_reassembled_message(world: &mut FragmentWorld) { world.assert_no_reassembly(); } + +#[then("the reassembler reports a message-too-large error")] +fn then_reassembly_over_limit(world: &mut FragmentWorld) { world.assert_reassembly_over_limit(); } + +#[then(expr = "the reassembler is buffering {int} messages")] +fn then_buffered_messages(world: &mut FragmentWorld, expected: usize) { + world.assert_buffered_messages(expected); +} + +#[then(expr = "message {int} is evicted")] +fn then_message_evicted(world: &mut FragmentWorld, message: u64) { + world.assert_evicted_message(message); +} diff --git a/tests/worlds/fragment.rs b/tests/worlds/fragment.rs index 8de5c5cc..a251299c 100644 --- a/tests/worlds/fragment.rs +++ b/tests/worlds/fragment.rs @@ -6,7 +6,10 @@ //! `Fragmenter` and inspecting the resulting `FragmentBatch` state. #![cfg(not(loom))] -use std::num::NonZeroUsize; +use std::{ + num::NonZeroUsize, + time::{Duration, Instant}, +}; use cucumber::World; use wireframe::fragment::{ @@ -19,14 +22,38 @@ use wireframe::fragment::{ FragmentStatus, Fragmenter, MessageId, + ReassembledMessage, + Reassembler, + ReassemblyError, }; -#[derive(Debug, Default, World)] +#[derive(Debug, World)] pub struct FragmentWorld { series: Option, last_result: Option>, fragmenter: Option, last_batch: Option, + reassembler: Option, + last_reassembled: Option, + last_reassembly_error: Option, + now: Instant, + last_evicted: Vec, +} + +impl Default for FragmentWorld { + fn default() -> Self { + Self { + series: None, + last_result: None, + fragmenter: None, + last_batch: None, + reassembler: None, + last_reassembled: None, + last_reassembly_error: None, + now: Instant::now(), + last_evicted: Vec::new(), + } + } } impl FragmentWorld { @@ -246,4 +273,141 @@ impl FragmentWorld { "unexpected message identifier", ); } + + /// Configure a reassembler with size and timeout guards. + /// + /// # Panics + /// Panics if `max_message_size` is zero. + pub fn configure_reassembler(&mut self, max_message_size: usize, timeout_secs: u64) { + let size = NonZeroUsize::new(max_message_size).expect("reassembly cap must be non-zero"); + self.reassembler = Some(Reassembler::new(size, Duration::from_secs(timeout_secs))); + self.last_reassembled = None; + self.last_reassembly_error = None; + self.last_evicted.clear(); + } + + /// Submit a fragment to the configured reassembler. + /// + /// # Panics + /// Panics if the reassembler has not been configured. + pub fn push_fragment( + &mut self, + message_id: u64, + index: u32, + is_last: bool, + payload_len: usize, + ) { + let reassembler = self + .reassembler + .as_mut() + .expect("reassembler not configured"); + let header = FragmentHeader::new( + MessageId::new(message_id), + FragmentIndex::new(index), + is_last, + ); + let payload = vec![0_u8; payload_len]; + self.last_reassembly_error = None; + self.last_reassembled = None; + match reassembler.push_at(header, payload, self.now) { + Ok(output) => self.last_reassembled = output, + Err(err) => self.last_reassembly_error = Some(err), + } + } + + /// Advance the simulated clock. + /// + /// # Panics + /// + /// Panics if advancing the clock would overflow [`Instant`]. + pub fn advance_time(&mut self, delta: Duration) { + self.now = self + .now + .checked_add(delta) + .expect("time advance overflowed"); + } + + /// Purge expired partial messages based on the current clock reading. + /// + /// # Panics + /// + /// Panics if the reassembler has not been configured. + pub fn purge_reassembly(&mut self) { + let reassembler = self + .reassembler + .as_mut() + .expect("reassembler not configured"); + self.last_evicted = reassembler.purge_expired_at(self.now); + } + + /// Assert that a message has been reassembled with the expected payload length. + /// + /// # Panics + /// Panics if no message has been reassembled yet. + pub fn assert_reassembled_len(&self, expected_len: usize) { + let message = self + .last_reassembled + .as_ref() + .expect("no message reassembled"); + assert_eq!( + message.payload().len(), + expected_len, + "payload length mismatch" + ); + } + + /// Assert that no message has been fully reassembled. + /// + /// # Panics + /// + /// Panics if a message has already been reassembled. + pub fn assert_no_reassembly(&self) { + assert!( + self.last_reassembled.is_none(), + "unexpected reassembled message present" + ); + } + + /// Assert the latest reassembly error signalled an over-limit message. + /// + /// # Panics + /// Panics if no reassembly error was captured. + pub fn assert_reassembly_over_limit(&self) { + let err = self + .last_reassembly_error + .as_ref() + .expect("no reassembly error captured"); + assert!( + matches!(err, ReassemblyError::MessageTooLarge { .. }), + "expected message-too-large error, got {err}" + ); + } + + /// Assert the number of buffered partial messages. + /// + /// # Panics + /// Panics if the reassembler has not been configured. + pub fn assert_buffered_messages(&self, expected: usize) { + let reassembler = self + .reassembler + .as_ref() + .expect("reassembler not configured"); + assert_eq!( + reassembler.buffered_len(), + expected, + "unexpected buffered message count" + ); + } + + /// Assert that the most recent purge evicted a specific message identifier. + /// + /// # Panics + /// + /// Panics if the purge record does not contain `message_id`. + pub fn assert_evicted_message(&self, message_id: u64) { + assert!( + self.last_evicted.contains(&MessageId::new(message_id)), + "message {message_id} was not evicted" + ); + } } From aad0d1a3e6582c6c4152210ffb8ced4467894db3 Mon Sep 17 00:00:00 2001 From: Leynos Date: Mon, 24 Nov 2025 13:57:48 +0000 Subject: [PATCH 2/5] refactor(fragment tests): unify fragment header usage in tests Refactor the fragment tests and related test step implementations to use the FragmentHeader struct directly instead of separate message_id, index, and is_last parameters. Introduce a helper function to setup the reassembler with the first fragment to reduce duplication. This improves code clarity and consistency across the fragment tests and steps. Co-authored-by: terragon-labs[bot] --- src/fragment/tests.rs | 44 ++++++++++++++++------------------- tests/steps/fragment_steps.rs | 7 ++++-- tests/worlds/fragment.rs | 13 +---------- 3 files changed, 26 insertions(+), 38 deletions(-) diff --git a/src/fragment/tests.rs b/src/fragment/tests.rs index ac982f9f..682fc344 100644 --- a/src/fragment/tests.rs +++ b/src/fragment/tests.rs @@ -8,6 +8,24 @@ use rstest::rstest; use super::*; +fn setup_reassembler_with_first_fragment( + message_id: u64, + first_payload: impl AsRef<[u8]>, +) -> Reassembler { + let mut reassembler = Reassembler::new( + NonZeroUsize::new(8).expect("non-zero"), + Duration::from_secs(30), + ); + let first = FragmentHeader::new(MessageId::new(message_id), FragmentIndex::zero(), false); + assert!( + reassembler + .push(first, first_payload) + .expect("first fragment accepted") + .is_none() + ); + reassembler +} + #[test] fn fragment_header_exposes_fields() { let header = FragmentHeader::new(MessageId::new(9), FragmentIndex::new(2), true); @@ -193,20 +211,9 @@ fn reassembler_returns_single_fragment_immediately() { #[test] fn reassembler_accumulates_ordered_fragments() { - let mut reassembler = Reassembler::new( - NonZeroUsize::new(8).expect("non-zero"), - Duration::from_secs(30), - ); - let first = FragmentHeader::new(MessageId::new(2), FragmentIndex::zero(), false); + let mut reassembler = setup_reassembler_with_first_fragment(2, [5_u8, 6, 7]); let final_fragment = FragmentHeader::new(MessageId::new(2), FragmentIndex::new(1), true); - assert!( - reassembler - .push(first, [5_u8, 6, 7]) - .expect("first fragment accepted") - .is_none() - ); - let complete = reassembler .push(final_fragment, [8_u8, 9]) .expect("final fragment accepted") @@ -218,20 +225,9 @@ fn reassembler_accumulates_ordered_fragments() { #[test] fn reassembler_rejects_out_of_order_and_drops_partial() { - let mut reassembler = Reassembler::new( - NonZeroUsize::new(8).expect("non-zero"), - Duration::from_secs(30), - ); - let first = FragmentHeader::new(MessageId::new(3), FragmentIndex::zero(), false); + let mut reassembler = setup_reassembler_with_first_fragment(3, [1_u8, 2]); let skipped = FragmentHeader::new(MessageId::new(3), FragmentIndex::new(2), true); - assert!( - reassembler - .push(first, [1_u8, 2]) - .expect("first fragment accepted") - .is_none() - ); - let err = reassembler .push(skipped, [3_u8]) .expect_err("out-of-order fragment must be rejected"); diff --git a/tests/steps/fragment_steps.rs b/tests/steps/fragment_steps.rs index e22db512..493c069d 100644 --- a/tests/steps/fragment_steps.rs +++ b/tests/steps/fragment_steps.rs @@ -2,6 +2,7 @@ use std::time::Duration; use cucumber::{given, then, when}; +use wireframe::{FragmentHeader, FragmentIndex, MessageId}; use crate::world::FragmentWorld; @@ -88,7 +89,8 @@ fn when_reassembler_fragment_non_final( message: u64, len: usize, ) { - world.push_fragment(message, index, false, len); + let header = FragmentHeader::new(MessageId::new(message), FragmentIndex::new(index), false); + world.push_fragment(header, len); } #[when(expr = "fragment {int} for message {int} with {int} bytes arrives marked final")] @@ -98,7 +100,8 @@ fn when_reassembler_fragment_final( message: u64, len: usize, ) { - world.push_fragment(message, index, true, len); + let header = FragmentHeader::new(MessageId::new(message), FragmentIndex::new(index), true); + world.push_fragment(header, len); } #[when(expr = "time advances by {int} seconds")] diff --git a/tests/worlds/fragment.rs b/tests/worlds/fragment.rs index a251299c..9be87d9e 100644 --- a/tests/worlds/fragment.rs +++ b/tests/worlds/fragment.rs @@ -290,22 +290,11 @@ impl FragmentWorld { /// /// # Panics /// Panics if the reassembler has not been configured. - pub fn push_fragment( - &mut self, - message_id: u64, - index: u32, - is_last: bool, - payload_len: usize, - ) { + pub fn push_fragment(&mut self, header: FragmentHeader, payload_len: usize) { let reassembler = self .reassembler .as_mut() .expect("reassembler not configured"); - let header = FragmentHeader::new( - MessageId::new(message_id), - FragmentIndex::new(index), - is_last, - ); let payload = vec![0_u8; payload_len]; self.last_reassembly_error = None; self.last_reassembled = None; From 3ed15cd915b7d6d7f695cad42c3f70b74ecd0141 Mon Sep 17 00:00:00 2001 From: Leynos Date: Mon, 24 Nov 2025 20:43:40 +0000 Subject: [PATCH 3/5] feat(fragment): refactor reassembler to simplify fragment handling - Extracted common logic for appending fragments and checking max message size into `append_and_maybe_complete` method. - This reduces code duplication when handling incomplete and complete fragment statuses. - Added tests verifying reassembler accepts fragments up to max message size, including single and multiple fragment cases. - Added support and tests for detection of out-of-order fragment errors in feature tests and step implementations. Co-authored-by: terragon-labs[bot] --- src/fragment/reassembler.rs | 72 +++++++++++++++++++-------------- src/fragment/tests.rs | 45 +++++++++++++++++++++ tests/features/fragment.feature | 6 +++ tests/steps/fragment_steps.rs | 5 +++ tests/worlds/fragment.rs | 19 +++++++++ 5 files changed, 116 insertions(+), 31 deletions(-) diff --git a/src/fragment/reassembler.rs b/src/fragment/reassembler.rs index 2ef16d91..f95f1dc3 100644 --- a/src/fragment/reassembler.rs +++ b/src/fragment/reassembler.rs @@ -8,7 +8,10 @@ //! behavioural tests can reuse it without depending on socket types. use std::{ - collections::{HashMap, hash_map::Entry}, + collections::{ + HashMap, + hash_map::{Entry, OccupiedEntry}, + }, num::NonZeroUsize, time::{Duration, Instant}, }; @@ -27,11 +30,9 @@ struct PartialMessage { impl PartialMessage { fn new(series: FragmentSeries, payload: &[u8], started_at: Instant) -> Self { - let mut buffer = Vec::with_capacity(payload.len()); - buffer.extend_from_slice(payload); Self { series, - buffer, + buffer: payload.to_vec(), started_at, } } @@ -150,33 +151,20 @@ impl Reassembler { .map_err(ReassemblyError::from); match status { - Ok(FragmentStatus::Incomplete) => { - let attempted = occupied.get().len() + payload.len(); - if let Err(err) = Self::assert_within_limit( - self.max_message_size, - header.message_id(), - attempted, - ) { - occupied.remove(); - return Err(err); - } - occupied.get_mut().push(payload); - Ok(None) - } - Ok(FragmentStatus::Complete) => { - let attempted = occupied.get().len() + payload.len(); - if let Err(err) = Self::assert_within_limit( - self.max_message_size, - header.message_id(), - attempted, - ) { - occupied.remove(); - return Err(err); - } - occupied.get_mut().push(payload); - let buffer = occupied.remove().into_buffer(); - Ok(Some(ReassembledMessage::new(header.message_id(), buffer))) - } + Ok(FragmentStatus::Incomplete) => Self::append_and_maybe_complete( + self.max_message_size, + occupied, + header.message_id(), + payload, + false, + ), + Ok(FragmentStatus::Complete) => Self::append_and_maybe_complete( + self.max_message_size, + occupied, + header.message_id(), + payload, + true, + ), Err(err) => { occupied.remove(); Err(err) @@ -245,4 +233,26 @@ impl Reassembler { } Ok(()) } + + fn append_and_maybe_complete( + limit: NonZeroUsize, + mut occupied: OccupiedEntry<'_, MessageId, PartialMessage>, + message_id: MessageId, + payload: &[u8], + completes: bool, + ) -> Result, ReassemblyError> { + let attempted = occupied.get().len() + payload.len(); + if let Err(err) = Self::assert_within_limit(limit, message_id, attempted) { + occupied.remove(); + return Err(err); + } + + occupied.get_mut().push(payload); + if completes { + let buffer = occupied.remove().into_buffer(); + Ok(Some(ReassembledMessage::new(message_id, buffer))) + } else { + Ok(None) + } + } } diff --git a/src/fragment/tests.rs b/src/fragment/tests.rs index 682fc344..c4e4b61a 100644 --- a/src/fragment/tests.rs +++ b/src/fragment/tests.rs @@ -190,6 +190,51 @@ fn fragmenter_respects_explicit_message_ids() { assert_eq!(next.message_id(), MessageId::new(10)); } +#[test] +fn reassembler_allows_single_fragment_at_max_message_size() { + let max_message_size = NonZeroUsize::new(16).expect("non-zero"); + let mut reassembler = Reassembler::new(max_message_size, Duration::from_secs(5)); + + let header = FragmentHeader::new(MessageId::new(20), FragmentIndex::zero(), true); + let payload = vec![0_u8; max_message_size.get()]; + + let result = reassembler + .push(header, payload) + .expect("fragment within limit should be accepted"); + + let assembled = result.expect("single fragment should complete reassembly"); + assert_eq!(assembled.payload().len(), max_message_size.get()); + assert_eq!(reassembler.buffered_len(), 0); +} + +#[test] +fn reassembler_allows_multi_fragment_at_max_message_size() { + let max_message_size = NonZeroUsize::new(16).expect("non-zero"); + let mut reassembler = Reassembler::new(max_message_size, Duration::from_secs(5)); + + let first_header = FragmentHeader::new(MessageId::new(21), FragmentIndex::zero(), false); + let second_header = FragmentHeader::new(MessageId::new(21), FragmentIndex::new(1), true); + + let first_payload = vec![0_u8; 8]; + let second_payload = vec![1_u8; max_message_size.get() - first_payload.len()]; + + assert!( + reassembler + .push(first_header, first_payload) + .expect("first fragment within limit") + .is_none(), + "first fragment should not complete the message", + ); + + let result = reassembler + .push(second_header, second_payload) + .expect("second fragment keeps total at limit"); + + let assembled = result.expect("fragments should complete reassembly at exact limit"); + assert_eq!(assembled.payload().len(), max_message_size.get()); + assert_eq!(reassembler.buffered_len(), 0); +} + #[test] fn reassembler_returns_single_fragment_immediately() { let mut reassembler = Reassembler::new( diff --git a/tests/features/fragment.feature b/tests/features/fragment.feature index 006a3aeb..99cd447a 100644 --- a/tests/features/fragment.feature +++ b/tests/features/fragment.feature @@ -69,3 +69,9 @@ Feature: Fragment metadata enforcement Then the reassembler is buffering 0 messages And message 23 is evicted And no message has been reassembled yet + + Scenario: Reassembler rejects out-of-order fragments + Given a reassembler allowing 8 bytes with a 30-second reassembly timeout + When fragment 1 for message 24 with 2 bytes arrives marked non-final + Then the reassembler reports an out-of-order fragment error + And the reassembler is buffering 0 messages diff --git a/tests/steps/fragment_steps.rs b/tests/steps/fragment_steps.rs index 493c069d..608e397f 100644 --- a/tests/steps/fragment_steps.rs +++ b/tests/steps/fragment_steps.rs @@ -123,6 +123,11 @@ fn then_no_reassembled_message(world: &mut FragmentWorld) { world.assert_no_reas #[then("the reassembler reports a message-too-large error")] fn then_reassembly_over_limit(world: &mut FragmentWorld) { world.assert_reassembly_over_limit(); } +#[then("the reassembler reports an out-of-order fragment error")] +fn then_reassembly_out_of_order(world: &mut FragmentWorld) { + world.assert_reassembly_out_of_order(); +} + #[then(expr = "the reassembler is buffering {int} messages")] fn then_buffered_messages(world: &mut FragmentWorld, expected: usize) { world.assert_buffered_messages(expected); diff --git a/tests/worlds/fragment.rs b/tests/worlds/fragment.rs index 9be87d9e..d21b28ac 100644 --- a/tests/worlds/fragment.rs +++ b/tests/worlds/fragment.rs @@ -399,4 +399,23 @@ impl FragmentWorld { "message {message_id} was not evicted" ); } + + /// Assert that the latest reassembly error was triggered by an out-of-order fragment. + /// + /// # Panics + /// + /// Panics if no reassembly error was captured or if the error was not an index mismatch. + pub fn assert_reassembly_out_of_order(&self) { + let err = self + .last_reassembly_error + .as_ref() + .expect("no reassembly error captured"); + assert!( + matches!( + err, + ReassemblyError::Fragment(FragmentError::IndexMismatch { .. }) + ), + "expected out-of-order error, got {err}" + ); + } } From da43b0d7c6acf7d8b048ea8e0810e52b76a202e0 Mon Sep 17 00:00:00 2001 From: Leynos Date: Tue, 25 Nov 2025 19:55:07 +0000 Subject: [PATCH 4/5] test(fragment/reassembly): add focused reassembly tests and reorganize test files - Introduced a new `reassembly.rs` test module under `tests/worlds/fragment/` with comprehensive helpers and assertions for fragment reassembly scenarios. - Renamed `tests/worlds/fragment.rs` to a directory `tests/worlds/fragment/mod.rs` to better modularize fragment-related tests. - Updated imports and test structure to integrate the new module. - Improved test clarity and coverage for reassembly error conditions and behavior. Co-authored-by: terragon-labs[bot] --- docs/users-guide.md | 4 +- src/fragment/reassembler.rs | 9 +- tests/worlds/{fragment.rs => fragment/mod.rs} | 152 +---------------- tests/worlds/fragment/reassembly.rs | 160 ++++++++++++++++++ 4 files changed, 173 insertions(+), 152 deletions(-) rename tests/worlds/{fragment.rs => fragment/mod.rs} (65%) create mode 100644 tests/worlds/fragment/reassembly.rs diff --git a/docs/users-guide.md b/docs/users-guide.md index eca43aea..7a68f178 100644 --- a/docs/users-guide.md +++ b/docs/users-guide.md @@ -179,7 +179,7 @@ for fragment in batch.fragments() { A companion `Reassembler` mirrors the helper on the inbound path. It buffers fragments per `MessageId`, rejects out-of-order fragments, and enforces a maximum assembled size while exposing `purge_expired` to clear stale partial -messages after a configurable timeout. When the final fragment arrives it +messages after a configurable timeout. When the final fragment arrives, it returns a `ReassembledMessage` that can be decoded into the original type. ```rust @@ -192,7 +192,7 @@ use wireframe::fragment::{ }; let mut reassembler = - Reassembler::new(NonZeroUsize::new(512).unwrap(), Duration::from_secs(30)); + Reassembler::new(NonZeroUsize::new(512).expect("non-zero capacity"), Duration::from_secs(30)); let header = FragmentHeader::new(MessageId::new(9), FragmentIndex::zero(), true); let complete = reassembler diff --git a/src/fragment/reassembler.rs b/src/fragment/reassembler.rs index f95f1dc3..f5a5b326 100644 --- a/src/fragment/reassembler.rs +++ b/src/fragment/reassembler.rs @@ -241,7 +241,14 @@ impl Reassembler { payload: &[u8], completes: bool, ) -> Result, ReassemblyError> { - let attempted = occupied.get().len() + payload.len(); + let Some(attempted) = occupied.get().len().checked_add(payload.len()) else { + occupied.remove(); + return Err(ReassemblyError::MessageTooLarge { + message_id, + attempted: usize::MAX, + limit, + }); + }; if let Err(err) = Self::assert_within_limit(limit, message_id, attempted) { occupied.remove(); return Err(err); diff --git a/tests/worlds/fragment.rs b/tests/worlds/fragment/mod.rs similarity index 65% rename from tests/worlds/fragment.rs rename to tests/worlds/fragment/mod.rs index d21b28ac..12542cc6 100644 --- a/tests/worlds/fragment.rs +++ b/tests/worlds/fragment/mod.rs @@ -6,10 +6,9 @@ //! `Fragmenter` and inspecting the resulting `FragmentBatch` state. #![cfg(not(loom))] -use std::{ - num::NonZeroUsize, - time::{Duration, Instant}, -}; +mod reassembly; + +use std::{num::NonZeroUsize, time::Instant}; use cucumber::World; use wireframe::fragment::{ @@ -273,149 +272,4 @@ impl FragmentWorld { "unexpected message identifier", ); } - - /// Configure a reassembler with size and timeout guards. - /// - /// # Panics - /// Panics if `max_message_size` is zero. - pub fn configure_reassembler(&mut self, max_message_size: usize, timeout_secs: u64) { - let size = NonZeroUsize::new(max_message_size).expect("reassembly cap must be non-zero"); - self.reassembler = Some(Reassembler::new(size, Duration::from_secs(timeout_secs))); - self.last_reassembled = None; - self.last_reassembly_error = None; - self.last_evicted.clear(); - } - - /// Submit a fragment to the configured reassembler. - /// - /// # Panics - /// Panics if the reassembler has not been configured. - pub fn push_fragment(&mut self, header: FragmentHeader, payload_len: usize) { - let reassembler = self - .reassembler - .as_mut() - .expect("reassembler not configured"); - let payload = vec![0_u8; payload_len]; - self.last_reassembly_error = None; - self.last_reassembled = None; - match reassembler.push_at(header, payload, self.now) { - Ok(output) => self.last_reassembled = output, - Err(err) => self.last_reassembly_error = Some(err), - } - } - - /// Advance the simulated clock. - /// - /// # Panics - /// - /// Panics if advancing the clock would overflow [`Instant`]. - pub fn advance_time(&mut self, delta: Duration) { - self.now = self - .now - .checked_add(delta) - .expect("time advance overflowed"); - } - - /// Purge expired partial messages based on the current clock reading. - /// - /// # Panics - /// - /// Panics if the reassembler has not been configured. - pub fn purge_reassembly(&mut self) { - let reassembler = self - .reassembler - .as_mut() - .expect("reassembler not configured"); - self.last_evicted = reassembler.purge_expired_at(self.now); - } - - /// Assert that a message has been reassembled with the expected payload length. - /// - /// # Panics - /// Panics if no message has been reassembled yet. - pub fn assert_reassembled_len(&self, expected_len: usize) { - let message = self - .last_reassembled - .as_ref() - .expect("no message reassembled"); - assert_eq!( - message.payload().len(), - expected_len, - "payload length mismatch" - ); - } - - /// Assert that no message has been fully reassembled. - /// - /// # Panics - /// - /// Panics if a message has already been reassembled. - pub fn assert_no_reassembly(&self) { - assert!( - self.last_reassembled.is_none(), - "unexpected reassembled message present" - ); - } - - /// Assert the latest reassembly error signalled an over-limit message. - /// - /// # Panics - /// Panics if no reassembly error was captured. - pub fn assert_reassembly_over_limit(&self) { - let err = self - .last_reassembly_error - .as_ref() - .expect("no reassembly error captured"); - assert!( - matches!(err, ReassemblyError::MessageTooLarge { .. }), - "expected message-too-large error, got {err}" - ); - } - - /// Assert the number of buffered partial messages. - /// - /// # Panics - /// Panics if the reassembler has not been configured. - pub fn assert_buffered_messages(&self, expected: usize) { - let reassembler = self - .reassembler - .as_ref() - .expect("reassembler not configured"); - assert_eq!( - reassembler.buffered_len(), - expected, - "unexpected buffered message count" - ); - } - - /// Assert that the most recent purge evicted a specific message identifier. - /// - /// # Panics - /// - /// Panics if the purge record does not contain `message_id`. - pub fn assert_evicted_message(&self, message_id: u64) { - assert!( - self.last_evicted.contains(&MessageId::new(message_id)), - "message {message_id} was not evicted" - ); - } - - /// Assert that the latest reassembly error was triggered by an out-of-order fragment. - /// - /// # Panics - /// - /// Panics if no reassembly error was captured or if the error was not an index mismatch. - pub fn assert_reassembly_out_of_order(&self) { - let err = self - .last_reassembly_error - .as_ref() - .expect("no reassembly error captured"); - assert!( - matches!( - err, - ReassemblyError::Fragment(FragmentError::IndexMismatch { .. }) - ), - "expected out-of-order error, got {err}" - ); - } } diff --git a/tests/worlds/fragment/reassembly.rs b/tests/worlds/fragment/reassembly.rs new file mode 100644 index 00000000..84ae67ec --- /dev/null +++ b/tests/worlds/fragment/reassembly.rs @@ -0,0 +1,160 @@ +//! Reassembly-focused helpers for `FragmentWorld`. + +use std::{num::NonZeroUsize, time::Duration}; + +use super::{ + FragmentError, + FragmentHeader, + FragmentWorld, + MessageId, + Reassembler, + ReassemblyError, +}; + +impl FragmentWorld { + /// Configure a reassembler with size and timeout guards. + /// + /// # Panics + /// Panics if `max_message_size` is zero. + pub fn configure_reassembler(&mut self, max_message_size: usize, timeout_secs: u64) { + let size = NonZeroUsize::new(max_message_size).expect("reassembly cap must be non-zero"); + self.reassembler = Some(Reassembler::new(size, Duration::from_secs(timeout_secs))); + self.last_reassembled = None; + self.last_reassembly_error = None; + self.last_evicted.clear(); + } + + /// Submit a fragment to the configured reassembler. + /// + /// # Panics + /// Panics if the reassembler has not been configured. + pub fn push_fragment(&mut self, header: FragmentHeader, payload_len: usize) { + let reassembler = self + .reassembler + .as_mut() + .expect("reassembler not configured"); + let payload = vec![0_u8; payload_len]; + self.last_reassembly_error = None; + self.last_reassembled = None; + match reassembler.push_at(header, payload, self.now) { + Ok(output) => self.last_reassembled = output, + Err(err) => self.last_reassembly_error = Some(err), + } + } + + /// Advance the simulated clock. + /// + /// # Panics + /// + /// Panics if advancing the clock would overflow [`Instant`]. + pub fn advance_time(&mut self, delta: Duration) { + self.now = self + .now + .checked_add(delta) + .expect("time advance overflowed"); + } + + /// Purge expired partial messages based on the current clock reading. + /// + /// # Panics + /// + /// Panics if the reassembler has not been configured. + pub fn purge_reassembly(&mut self) { + let reassembler = self + .reassembler + .as_mut() + .expect("reassembler not configured"); + self.last_evicted = reassembler.purge_expired_at(self.now); + } + + /// Assert that a message has been reassembled with the expected payload length. + /// + /// # Panics + /// Panics if no message has been reassembled yet. + pub fn assert_reassembled_len(&self, expected_len: usize) { + let message = self + .last_reassembled + .as_ref() + .expect("no message reassembled"); + assert_eq!( + message.payload().len(), + expected_len, + "payload length mismatch" + ); + } + + /// Assert that no message has been fully reassembled. + /// + /// # Panics + /// + /// Panics if a message has already been reassembled. + pub fn assert_no_reassembly(&self) { + assert!( + self.last_reassembled.is_none(), + "unexpected reassembled message present" + ); + } + + /// Assert the latest reassembly error signalled an over-limit message. + /// + /// # Panics + /// + /// Panics if no reassembly error was captured. + pub fn assert_reassembly_over_limit(&self) { + let err = self + .last_reassembly_error + .as_ref() + .expect("no reassembly error captured"); + assert!( + matches!(err, ReassemblyError::MessageTooLarge { .. }), + "expected message-too-large error, got {err}" + ); + } + + /// Assert that the latest reassembly error was triggered by an out-of-order fragment. + /// + /// # Panics + /// + /// Panics if no reassembly error was captured or if the error was not an index mismatch. + pub fn assert_reassembly_out_of_order(&self) { + let err = self + .last_reassembly_error + .as_ref() + .expect("no reassembly error captured"); + assert!( + matches!( + err, + ReassemblyError::Fragment(FragmentError::IndexMismatch { .. }) + ), + "expected out-of-order error, got {err}" + ); + } + + /// Assert the number of buffered partial messages. + /// + /// # Panics + /// Panics if the reassembler has not been configured. + pub fn assert_buffered_messages(&self, expected: usize) { + let reassembler = self + .reassembler + .as_ref() + .expect("reassembler not configured"); + assert_eq!( + reassembler.buffered_len(), + expected, + "unexpected buffered message count" + ); + } + + /// Assert that the most recent purge evicted a specific message identifier. + /// + /// # Panics + /// + /// Panics if the purge record does not contain `message_id`. + pub fn assert_evicted_message(&self, message_id: u64) { + assert!( + self.last_evicted.contains(&MessageId::new(message_id)), + "message {message_id} was not evicted" + ); + } +} From cb4c566c57ea705f5795768edc8a820feab5b4bb Mon Sep 17 00:00:00 2001 From: Leynos Date: Wed, 26 Nov 2025 21:36:38 +0000 Subject: [PATCH 5/5] refactor(tests/worlds/fragment): refactor reassembly error assertions into generic helper Introduce a generic helper method `assert_reassembly_error_matches` to assert on the latest captured reassembly error using a predicate. This reduces code duplication and improves clarity by consolidating common assertion logic previously repeated in the `assert_reassembly_over_limit` and `assert_reassembly_out_of_order` methods. Co-authored-by: terragon-labs[bot] --- tests/worlds/fragment/reassembly.rs | 43 +++++++++++++++++------------ 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/tests/worlds/fragment/reassembly.rs b/tests/worlds/fragment/reassembly.rs index 84ae67ec..20d9a4e5 100644 --- a/tests/worlds/fragment/reassembly.rs +++ b/tests/worlds/fragment/reassembly.rs @@ -95,19 +95,30 @@ impl FragmentWorld { ); } - /// Assert the latest reassembly error signalled an over-limit message. + /// Helper for asserting on the latest captured reassembly error. /// /// # Panics - /// - /// Panics if no reassembly error was captured. - pub fn assert_reassembly_over_limit(&self) { + /// Panics if no reassembly error was captured or if the predicate returns false. + fn assert_reassembly_error_matches(&self, predicate: F, expected_description: &str) + where + F: FnOnce(&ReassemblyError) -> bool, + { let err = self .last_reassembly_error .as_ref() .expect("no reassembly error captured"); - assert!( - matches!(err, ReassemblyError::MessageTooLarge { .. }), - "expected message-too-large error, got {err}" + assert!(predicate(err), "expected {expected_description}, got {err}"); + } + + /// Assert the latest reassembly error signalled an over-limit message. + /// + /// # Panics + /// + /// Panics if no reassembly error was captured. + pub fn assert_reassembly_over_limit(&self) { + self.assert_reassembly_error_matches( + |err| matches!(err, ReassemblyError::MessageTooLarge { .. }), + "message-too-large error", ); } @@ -117,16 +128,14 @@ impl FragmentWorld { /// /// Panics if no reassembly error was captured or if the error was not an index mismatch. pub fn assert_reassembly_out_of_order(&self) { - let err = self - .last_reassembly_error - .as_ref() - .expect("no reassembly error captured"); - assert!( - matches!( - err, - ReassemblyError::Fragment(FragmentError::IndexMismatch { .. }) - ), - "expected out-of-order error, got {err}" + self.assert_reassembly_error_matches( + |err| { + matches!( + err, + ReassemblyError::Fragment(FragmentError::IndexMismatch { .. }) + ) + }, + "out-of-order error", ); }