diff --git a/docs/generic-message-fragmentation-and-re-assembly-design.md b/docs/generic-message-fragmentation-and-re-assembly-design.md index 706dd95e..7485267e 100644 --- a/docs/generic-message-fragmentation-and-re-assembly-design.md +++ b/docs/generic-message-fragmentation-and-re-assembly-design.md @@ -368,3 +368,8 @@ This feature is designed as a foundational layer that other features build upon. payloads into capped fragments, and return typed collections that are simple to wrap in protocol-specific envelopes or feed into behavioural tests before the full `FragmentAdapter` lands. +- Added a transport-agnostic `Reassembler` that mirrors the outbound helper on + the inbound path. It buffers fragments per `MessageId` via `FragmentSeries`, + drops the partial buffer when ordering breaks, enforces a configurable + `max_message_size`, and exposes caller-driven timeout purging. This prevents + abandoned assemblies from exhausting memory. diff --git a/docs/roadmap.md b/docs/roadmap.md index 26c64a99..2a1ec360 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -261,10 +261,10 @@ logic. - [x] Implement a `Fragmenter` to split a large `Message` into multiple `Frame`s, each with a `Fragment` header. - - [ ] Implement a `Reassembler` on the receiving end to collect fragments and + - [x] Implement a `Reassembler` on the receiving end to collect fragments and reconstruct the original `Message`. - - [ ] Manage a reassembly buffer with timeouts to prevent resource + - [x] Manage a reassembly buffer with timeouts to prevent resource exhaustion from incomplete messages. - [ ] **Integration with Core Library:** @@ -276,7 +276,7 @@ logic. - [ ] **Testing:** - - [ ] Create unit tests for the `Fragmenter` and `Reassembler`. + - [x] Create unit tests for the `Fragmenter` and `Reassembler`. - [ ] Develop integration tests sending and receiving large messages that require fragmentation. diff --git a/docs/users-guide.md b/docs/users-guide.md index 15bd7175..7a68f178 100644 --- a/docs/users-guide.md +++ b/docs/users-guide.md @@ -176,6 +176,34 @@ for fragment in batch.fragments() { } ``` +A companion `Reassembler` mirrors the helper on the inbound path. It buffers +fragments per `MessageId`, rejects out-of-order fragments, and enforces a +maximum assembled size while exposing `purge_expired` to clear stale partial +messages after a configurable timeout. When the final fragment arrives, it +returns a `ReassembledMessage` that can be decoded into the original type. + +```rust +use std::{num::NonZeroUsize, time::Duration}; +use wireframe::fragment::{ + FragmentHeader, + FragmentIndex, + MessageId, + Reassembler, +}; + +let mut reassembler = + Reassembler::new(NonZeroUsize::new(512).expect("non-zero capacity"), Duration::from_secs(30)); + +let header = FragmentHeader::new(MessageId::new(9), FragmentIndex::zero(), true); +let complete = reassembler + .push(header, [0_u8; 12]) + .expect("fragments accepted") + .expect("single fragment completes the message"); + +// Decode when ready: +// let message: MyType = complete.decode().expect("decode"); +``` + ## Working with requests and middleware Every inbound frame becomes a `ServiceRequest`. Middleware can inspect or diff --git a/src/fragment/error.rs b/src/fragment/error.rs index 9970b0cc..78c4d855 100644 --- a/src/fragment/error.rs +++ b/src/fragment/error.rs @@ -4,6 +4,8 @@ //! specific protocols while still surfacing precise diagnostics for //! behavioural tests. +use std::num::NonZeroUsize; + use bincode::error::EncodeError; use thiserror::Error; @@ -51,3 +53,21 @@ pub enum FragmentationError { #[error("fragment index overflow after {last}")] IndexOverflow { last: FragmentIndex }, } + +/// Errors produced while re-assembling inbound fragments. +#[derive(Clone, Copy, Debug, Error, PartialEq, Eq)] +pub enum ReassemblyError { + /// The fragment broke ordering or message tracking guarantees. + #[error("fragment rejected during reassembly: {0}")] + Fragment(#[from] FragmentError), + /// The combined fragment payloads would exceed the configured cap. + #[error("message {message_id} exceeds reassembly cap: {attempted} bytes > {limit} byte limit")] + MessageTooLarge { + /// Identifier for the logical message being assembled. + message_id: MessageId, + /// Total size that triggered the guard. + attempted: usize, + /// Configured reassembly cap. + limit: NonZeroUsize, + }, +} diff --git a/src/fragment/mod.rs b/src/fragment/mod.rs index c47c2af2..a87ed54d 100644 --- a/src/fragment/mod.rs +++ b/src/fragment/mod.rs @@ -10,13 +10,15 @@ pub mod fragmenter; pub mod header; pub mod id; pub mod index; +pub mod reassembler; pub mod series; -pub use error::{FragmentError, FragmentStatus, FragmentationError}; +pub use error::{FragmentError, FragmentStatus, FragmentationError, ReassemblyError}; pub use fragmenter::{FragmentBatch, FragmentFrame, Fragmenter}; pub use header::FragmentHeader; pub use id::MessageId; pub use index::FragmentIndex; +pub use reassembler::{ReassembledMessage, Reassembler}; pub use series::FragmentSeries; #[cfg(test)] diff --git a/src/fragment/reassembler.rs b/src/fragment/reassembler.rs new file mode 100644 index 00000000..f5a5b326 --- /dev/null +++ b/src/fragment/reassembler.rs @@ -0,0 +1,265 @@ +//! Inbound helper that stitches fragments back into complete messages. +//! +//! [`Reassembler`] mirrors the outbound [`Fragmenter`](crate::fragment::Fragmenter) by +//! collecting fragment payloads keyed by [`MessageId`](crate::fragment::MessageId). +//! It enforces ordering via [`FragmentSeries`](crate::fragment::FragmentSeries), guards +//! against unbounded allocation with a configurable cap, and purges stale partial +//! assemblies after a fixed timeout. The helper is transport-agnostic so codecs and +//! behavioural tests can reuse it without depending on socket types. + +use std::{ + collections::{ + HashMap, + hash_map::{Entry, OccupiedEntry}, + }, + num::NonZeroUsize, + time::{Duration, Instant}, +}; + +use bincode::error::DecodeError; + +use super::{FragmentHeader, FragmentSeries, FragmentStatus, MessageId, ReassemblyError}; +use crate::message::Message; + +#[derive(Debug)] +struct PartialMessage { + series: FragmentSeries, + buffer: Vec, + started_at: Instant, +} + +impl PartialMessage { + fn new(series: FragmentSeries, payload: &[u8], started_at: Instant) -> Self { + Self { + series, + buffer: payload.to_vec(), + started_at, + } + } + + fn push(&mut self, payload: &[u8]) { self.buffer.extend_from_slice(payload); } + + fn len(&self) -> usize { self.buffer.len() } + + fn started_at(&self) -> Instant { self.started_at } + + fn into_buffer(self) -> Vec { self.buffer } +} + +/// Container for a fully re-assembled message payload. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ReassembledMessage { + message_id: MessageId, + payload: Vec, +} + +impl ReassembledMessage { + /// Construct a new [`ReassembledMessage`]. + #[must_use] + pub fn new(message_id: MessageId, payload: Vec) -> Self { + Self { + message_id, + payload, + } + } + + /// Identifier shared by the fragments that formed this message. + #[must_use] + pub const fn message_id(&self) -> MessageId { self.message_id } + + /// Borrow the re-assembled payload. + #[must_use] + pub fn payload(&self) -> &[u8] { self.payload.as_slice() } + + /// Consume the message, returning the owned payload bytes. + #[must_use] + pub fn into_payload(self) -> Vec { self.payload } + + /// Decode the payload into a strongly typed message. + /// + /// # Errors + /// + /// Returns any [`DecodeError`] raised while deserialising the payload. + pub fn decode(&self) -> Result { + let (message, _) = M::from_bytes(self.payload())?; + Ok(message) + } +} + +/// Stateful fragment re-assembler with timeout-based eviction. +#[derive(Debug)] +pub struct Reassembler { + max_message_size: NonZeroUsize, + timeout: Duration, + buffers: HashMap, +} + +impl Reassembler { + /// Create a new re-assembler that enforces a maximum reconstructed payload size. + #[must_use] + pub fn new(max_message_size: NonZeroUsize, timeout: Duration) -> Self { + Self { + max_message_size, + timeout, + buffers: HashMap::new(), + } + } + + /// Process a fragment using the current time. + /// + /// Returns `Ok(Some(_))` when the fragment completes the message, `Ok(None)` + /// while more fragments are required, or an error if ordering or size + /// invariants are violated. + /// + /// # Errors + /// + /// Returns [`ReassemblyError`] when a fragment arrives out of order or would + /// push the reconstructed payload beyond the configured limit. + pub fn push( + &mut self, + header: FragmentHeader, + payload: impl AsRef<[u8]>, + ) -> Result, ReassemblyError> { + self.push_at(header, payload, Instant::now()) + } + + /// Process a fragment using an explicit clock reading. + /// + /// Accepting an explicit `now` simplifies deterministic testing and allows + /// callers to co-ordinate eviction sweeps with their own timers. + /// + /// # Errors + /// + /// Returns [`ReassemblyError`] when the fragment violates ordering or + /// exceeds the configured reassembly cap. + pub fn push_at( + &mut self, + header: FragmentHeader, + payload: impl AsRef<[u8]>, + now: Instant, + ) -> Result, ReassemblyError> { + self.purge_expired_at(now); + + let payload = payload.as_ref(); + + match self.buffers.entry(header.message_id()) { + Entry::Occupied(mut occupied) => { + let status = occupied + .get_mut() + .series + .accept(header) + .map_err(ReassemblyError::from); + + match status { + Ok(FragmentStatus::Incomplete) => Self::append_and_maybe_complete( + self.max_message_size, + occupied, + header.message_id(), + payload, + false, + ), + Ok(FragmentStatus::Complete) => Self::append_and_maybe_complete( + self.max_message_size, + occupied, + header.message_id(), + payload, + true, + ), + Err(err) => { + occupied.remove(); + Err(err) + } + } + } + Entry::Vacant(vacant) => { + let mut series = FragmentSeries::new(header.message_id()); + let status = series.accept(header).map_err(ReassemblyError::from)?; + let attempted = payload.len(); + Self::assert_within_limit(self.max_message_size, header.message_id(), attempted)?; + + match status { + FragmentStatus::Incomplete => { + vacant.insert(PartialMessage::new(series, payload, now)); + Ok(None) + } + FragmentStatus::Complete => Ok(Some(ReassembledMessage::new( + header.message_id(), + payload.to_vec(), + ))), + } + } + } + } + + /// Remove any partial messages that exceeded the configured timeout. + /// + /// Returns the identifiers of messages that were evicted. + pub fn purge_expired(&mut self) -> Vec { self.purge_expired_at(Instant::now()) } + + /// Remove any partial messages that exceeded the configured timeout using + /// an explicit clock reading. + /// + /// Returns the identifiers of messages that were evicted. + pub fn purge_expired_at(&mut self, now: Instant) -> Vec { + let mut evicted = Vec::new(); + let timeout = self.timeout; + + self.buffers.retain(|message_id, partial| { + let expired = now.saturating_duration_since(partial.started_at()) >= timeout; + if expired { + evicted.push(*message_id); + } + !expired + }); + + evicted + } + + /// Number of partial messages currently buffered. + #[must_use] + pub fn buffered_len(&self) -> usize { self.buffers.len() } + + fn assert_within_limit( + limit: NonZeroUsize, + message_id: MessageId, + attempted: usize, + ) -> Result<(), ReassemblyError> { + if attempted > limit.get() { + return Err(ReassemblyError::MessageTooLarge { + message_id, + attempted, + limit, + }); + } + Ok(()) + } + + fn append_and_maybe_complete( + limit: NonZeroUsize, + mut occupied: OccupiedEntry<'_, MessageId, PartialMessage>, + message_id: MessageId, + payload: &[u8], + completes: bool, + ) -> Result, ReassemblyError> { + let Some(attempted) = occupied.get().len().checked_add(payload.len()) else { + occupied.remove(); + return Err(ReassemblyError::MessageTooLarge { + message_id, + attempted: usize::MAX, + limit, + }); + }; + if let Err(err) = Self::assert_within_limit(limit, message_id, attempted) { + occupied.remove(); + return Err(err); + } + + occupied.get_mut().push(payload); + if completes { + let buffer = occupied.remove().into_buffer(); + Ok(Some(ReassembledMessage::new(message_id, buffer))) + } else { + Ok(None) + } + } +} diff --git a/src/fragment/tests.rs b/src/fragment/tests.rs index d942a227..c4e4b61a 100644 --- a/src/fragment/tests.rs +++ b/src/fragment/tests.rs @@ -1,10 +1,31 @@ -use std::num::NonZeroUsize; +use std::{ + num::NonZeroUsize, + time::{Duration, Instant}, +}; use bincode::{BorrowDecode, Encode}; use rstest::rstest; use super::*; +fn setup_reassembler_with_first_fragment( + message_id: u64, + first_payload: impl AsRef<[u8]>, +) -> Reassembler { + let mut reassembler = Reassembler::new( + NonZeroUsize::new(8).expect("non-zero"), + Duration::from_secs(30), + ); + let first = FragmentHeader::new(MessageId::new(message_id), FragmentIndex::zero(), false); + assert!( + reassembler + .push(first, first_payload) + .expect("first fragment accepted") + .is_none() + ); + reassembler +} + #[test] fn fragment_header_exposes_fields() { let header = FragmentHeader::new(MessageId::new(9), FragmentIndex::new(2), true); @@ -168,3 +189,175 @@ fn fragmenter_respects_explicit_message_ids() { let next = fragmenter.fragment_bytes([1_u8]).expect("next fragment"); assert_eq!(next.message_id(), MessageId::new(10)); } + +#[test] +fn reassembler_allows_single_fragment_at_max_message_size() { + let max_message_size = NonZeroUsize::new(16).expect("non-zero"); + let mut reassembler = Reassembler::new(max_message_size, Duration::from_secs(5)); + + let header = FragmentHeader::new(MessageId::new(20), FragmentIndex::zero(), true); + let payload = vec![0_u8; max_message_size.get()]; + + let result = reassembler + .push(header, payload) + .expect("fragment within limit should be accepted"); + + let assembled = result.expect("single fragment should complete reassembly"); + assert_eq!(assembled.payload().len(), max_message_size.get()); + assert_eq!(reassembler.buffered_len(), 0); +} + +#[test] +fn reassembler_allows_multi_fragment_at_max_message_size() { + let max_message_size = NonZeroUsize::new(16).expect("non-zero"); + let mut reassembler = Reassembler::new(max_message_size, Duration::from_secs(5)); + + let first_header = FragmentHeader::new(MessageId::new(21), FragmentIndex::zero(), false); + let second_header = FragmentHeader::new(MessageId::new(21), FragmentIndex::new(1), true); + + let first_payload = vec![0_u8; 8]; + let second_payload = vec![1_u8; max_message_size.get() - first_payload.len()]; + + assert!( + reassembler + .push(first_header, first_payload) + .expect("first fragment within limit") + .is_none(), + "first fragment should not complete the message", + ); + + let result = reassembler + .push(second_header, second_payload) + .expect("second fragment keeps total at limit"); + + let assembled = result.expect("fragments should complete reassembly at exact limit"); + assert_eq!(assembled.payload().len(), max_message_size.get()); + assert_eq!(reassembler.buffered_len(), 0); +} + +#[test] +fn reassembler_returns_single_fragment_immediately() { + let mut reassembler = Reassembler::new( + NonZeroUsize::new(16).expect("non-zero"), + Duration::from_secs(5), + ); + let header = FragmentHeader::new(MessageId::new(1), FragmentIndex::zero(), true); + let payload = vec![1_u8, 2, 3, 4]; + + let complete = reassembler + .push(header, payload.clone()) + .expect("reassembly must succeed") + .expect("single fragment should complete message"); + + assert_eq!(complete.message_id(), MessageId::new(1)); + assert_eq!(complete.payload(), payload.as_slice()); + assert_eq!(reassembler.buffered_len(), 0); +} + +#[test] +fn reassembler_accumulates_ordered_fragments() { + let mut reassembler = setup_reassembler_with_first_fragment(2, [5_u8, 6, 7]); + let final_fragment = FragmentHeader::new(MessageId::new(2), FragmentIndex::new(1), true); + + let complete = reassembler + .push(final_fragment, [8_u8, 9]) + .expect("final fragment accepted") + .expect("message should complete"); + + assert_eq!(complete.payload(), &[5, 6, 7, 8, 9]); + assert_eq!(reassembler.buffered_len(), 0); +} + +#[test] +fn reassembler_rejects_out_of_order_and_drops_partial() { + let mut reassembler = setup_reassembler_with_first_fragment(3, [1_u8, 2]); + let skipped = FragmentHeader::new(MessageId::new(3), FragmentIndex::new(2), true); + + let err = reassembler + .push(skipped, [3_u8]) + .expect_err("out-of-order fragment must be rejected"); + assert!(matches!( + err, + ReassemblyError::Fragment(FragmentError::IndexMismatch { .. }) + )); + assert_eq!(reassembler.buffered_len(), 0); +} + +#[test] +fn reassembler_enforces_maximum_payload_size() { + let mut reassembler = Reassembler::new( + NonZeroUsize::new(4).expect("non-zero"), + Duration::from_secs(30), + ); + let first = FragmentHeader::new(MessageId::new(4), FragmentIndex::zero(), false); + let final_fragment = FragmentHeader::new(MessageId::new(4), FragmentIndex::new(1), true); + + assert!( + reassembler + .push(first, [1_u8, 2, 3]) + .expect("first fragment accepted") + .is_none() + ); + + let err = reassembler + .push(final_fragment, [4_u8, 5]) + .expect_err("payload growth beyond cap must be rejected"); + assert_eq!( + err, + ReassemblyError::MessageTooLarge { + message_id: MessageId::new(4), + attempted: 5, + limit: NonZeroUsize::new(4).expect("non-zero"), + } + ); + assert_eq!(reassembler.buffered_len(), 0); +} + +#[test] +fn reassembler_purges_expired_messages() { + let mut reassembler = Reassembler::new( + NonZeroUsize::new(8).expect("non-zero"), + Duration::from_secs(2), + ); + let now = Instant::now(); + let header = FragmentHeader::new(MessageId::new(5), FragmentIndex::zero(), false); + + assert!( + reassembler + .push_at(header, [0_u8, 1], now) + .expect("first fragment accepted") + .is_none() + ); + assert_eq!(reassembler.buffered_len(), 1); + + let evicted = reassembler.purge_expired_at(now + Duration::from_secs(3)); + assert_eq!(evicted, vec![MessageId::new(5)]); + assert_eq!(reassembler.buffered_len(), 0); +} + +#[derive(Clone, Debug, Encode, BorrowDecode, PartialEq, Eq)] +struct ExampleMessage(u8); + +#[test] +fn reassembler_decodes_reconstructed_message() { + let fragmenter = Fragmenter::new(NonZeroUsize::new(2).expect("non-zero")); + let batch = fragmenter + .fragment_message(&ExampleMessage(11)) + .expect("fragment message"); + let mut reassembler = Reassembler::new( + NonZeroUsize::new(4).expect("non-zero"), + Duration::from_secs(10), + ); + + let mut output = None; + for fragment in batch { + let (header, payload) = fragment.into_parts(); + output = reassembler + .push(header, payload) + .expect("fragment accepted"); + } + + let assembled = output.expect("message should complete"); + let decoded: ExampleMessage = assembled.decode().expect("decode message"); + assert_eq!(decoded, ExampleMessage(11)); +} diff --git a/src/lib.rs b/src/lib.rs index 39c1c70a..cde3833f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -42,6 +42,9 @@ pub use fragment::{ FragmentationError, Fragmenter, MessageId, + ReassembledMessage, + Reassembler, + ReassemblyError, }; pub use hooks::{ConnectionContext, ProtocolHooks, WireframeProtocol}; pub use metrics::{CONNECTIONS_ACTIVE, Direction, ERRORS_TOTAL, FRAMES_PROCESSED}; diff --git a/tests/features/fragment.feature b/tests/features/fragment.feature index ef36d30d..99cd447a 100644 --- a/tests/features/fragment.feature +++ b/tests/features/fragment.feature @@ -45,3 +45,33 @@ Feature: Fragment metadata enforcement And fragment 2 carries 2 bytes And fragment 2 is marked final And the fragments use message id 0 + + Scenario: Reassembler rebuilds sequential fragments + Given a reassembler allowing 10 bytes with a 30-second reassembly timeout + When fragment 0 for message 21 with 4 bytes arrives marked non-final + And fragment 1 for message 21 with 3 bytes arrives marked final + Then the reassembler outputs a payload of 7 bytes + And the reassembler is buffering 0 messages + + Scenario: Reassembler rejects messages that exceed the cap + Given a reassembler allowing 4 bytes with a 30-second reassembly timeout + When fragment 0 for message 22 with 3 bytes arrives marked non-final + And fragment 1 for message 22 with 2 bytes arrives marked final + Then the reassembler reports a message-too-large error + And no message has been reassembled yet + And the reassembler is buffering 0 messages + + Scenario: Reassembler evicts stale partial messages + Given a reassembler allowing 8 bytes with a 1-second reassembly timeout + When fragment 0 for message 23 with 5 bytes arrives marked non-final + And time advances by 2 seconds + And expired reassembly buffers are purged + Then the reassembler is buffering 0 messages + And message 23 is evicted + And no message has been reassembled yet + + Scenario: Reassembler rejects out-of-order fragments + Given a reassembler allowing 8 bytes with a 30-second reassembly timeout + When fragment 1 for message 24 with 2 bytes arrives marked non-final + Then the reassembler reports an out-of-order fragment error + And the reassembler is buffering 0 messages diff --git a/tests/steps/fragment_steps.rs b/tests/steps/fragment_steps.rs index fbefa9e1..608e397f 100644 --- a/tests/steps/fragment_steps.rs +++ b/tests/steps/fragment_steps.rs @@ -1,5 +1,8 @@ //! Steps for fragment metadata behavioural tests. +use std::time::Duration; + use cucumber::{given, then, when}; +use wireframe::{FragmentHeader, FragmentIndex, MessageId}; use crate::world::FragmentWorld; @@ -73,3 +76,64 @@ fn then_fragment_non_final(world: &mut FragmentWorld, index: usize) { fn then_fragment_message_id(world: &mut FragmentWorld, message_id: u64) { world.assert_message_id(message_id); } + +#[given(expr = "a reassembler allowing {int} bytes with a {int}-second reassembly timeout")] +fn given_reassembler(world: &mut FragmentWorld, max_bytes: usize, timeout_secs: u64) { + world.configure_reassembler(max_bytes, timeout_secs); +} + +#[when(expr = "fragment {int} for message {int} with {int} bytes arrives marked non-final")] +fn when_reassembler_fragment_non_final( + world: &mut FragmentWorld, + index: u32, + message: u64, + len: usize, +) { + let header = FragmentHeader::new(MessageId::new(message), FragmentIndex::new(index), false); + world.push_fragment(header, len); +} + +#[when(expr = "fragment {int} for message {int} with {int} bytes arrives marked final")] +fn when_reassembler_fragment_final( + world: &mut FragmentWorld, + index: u32, + message: u64, + len: usize, +) { + let header = FragmentHeader::new(MessageId::new(message), FragmentIndex::new(index), true); + world.push_fragment(header, len); +} + +#[when(expr = "time advances by {int} seconds")] +fn when_time_advances(world: &mut FragmentWorld, seconds: u64) { + world.advance_time(Duration::from_secs(seconds)); +} + +#[when("expired reassembly buffers are purged")] +fn when_reassembly_purged(world: &mut FragmentWorld) { world.purge_reassembly(); } + +#[then(expr = "the reassembler outputs a payload of {int} bytes")] +fn then_reassembled_len(world: &mut FragmentWorld, expected: usize) { + world.assert_reassembled_len(expected); +} + +#[then("no message has been reassembled yet")] +fn then_no_reassembled_message(world: &mut FragmentWorld) { world.assert_no_reassembly(); } + +#[then("the reassembler reports a message-too-large error")] +fn then_reassembly_over_limit(world: &mut FragmentWorld) { world.assert_reassembly_over_limit(); } + +#[then("the reassembler reports an out-of-order fragment error")] +fn then_reassembly_out_of_order(world: &mut FragmentWorld) { + world.assert_reassembly_out_of_order(); +} + +#[then(expr = "the reassembler is buffering {int} messages")] +fn then_buffered_messages(world: &mut FragmentWorld, expected: usize) { + world.assert_buffered_messages(expected); +} + +#[then(expr = "message {int} is evicted")] +fn then_message_evicted(world: &mut FragmentWorld, message: u64) { + world.assert_evicted_message(message); +} diff --git a/tests/worlds/fragment.rs b/tests/worlds/fragment/mod.rs similarity index 91% rename from tests/worlds/fragment.rs rename to tests/worlds/fragment/mod.rs index 8de5c5cc..12542cc6 100644 --- a/tests/worlds/fragment.rs +++ b/tests/worlds/fragment/mod.rs @@ -6,7 +6,9 @@ //! `Fragmenter` and inspecting the resulting `FragmentBatch` state. #![cfg(not(loom))] -use std::num::NonZeroUsize; +mod reassembly; + +use std::{num::NonZeroUsize, time::Instant}; use cucumber::World; use wireframe::fragment::{ @@ -19,14 +21,38 @@ use wireframe::fragment::{ FragmentStatus, Fragmenter, MessageId, + ReassembledMessage, + Reassembler, + ReassemblyError, }; -#[derive(Debug, Default, World)] +#[derive(Debug, World)] pub struct FragmentWorld { series: Option, last_result: Option>, fragmenter: Option, last_batch: Option, + reassembler: Option, + last_reassembled: Option, + last_reassembly_error: Option, + now: Instant, + last_evicted: Vec, +} + +impl Default for FragmentWorld { + fn default() -> Self { + Self { + series: None, + last_result: None, + fragmenter: None, + last_batch: None, + reassembler: None, + last_reassembled: None, + last_reassembly_error: None, + now: Instant::now(), + last_evicted: Vec::new(), + } + } } impl FragmentWorld { diff --git a/tests/worlds/fragment/reassembly.rs b/tests/worlds/fragment/reassembly.rs new file mode 100644 index 00000000..20d9a4e5 --- /dev/null +++ b/tests/worlds/fragment/reassembly.rs @@ -0,0 +1,169 @@ +//! Reassembly-focused helpers for `FragmentWorld`. + +use std::{num::NonZeroUsize, time::Duration}; + +use super::{ + FragmentError, + FragmentHeader, + FragmentWorld, + MessageId, + Reassembler, + ReassemblyError, +}; + +impl FragmentWorld { + /// Configure a reassembler with size and timeout guards. + /// + /// # Panics + /// Panics if `max_message_size` is zero. + pub fn configure_reassembler(&mut self, max_message_size: usize, timeout_secs: u64) { + let size = NonZeroUsize::new(max_message_size).expect("reassembly cap must be non-zero"); + self.reassembler = Some(Reassembler::new(size, Duration::from_secs(timeout_secs))); + self.last_reassembled = None; + self.last_reassembly_error = None; + self.last_evicted.clear(); + } + + /// Submit a fragment to the configured reassembler. + /// + /// # Panics + /// Panics if the reassembler has not been configured. + pub fn push_fragment(&mut self, header: FragmentHeader, payload_len: usize) { + let reassembler = self + .reassembler + .as_mut() + .expect("reassembler not configured"); + let payload = vec![0_u8; payload_len]; + self.last_reassembly_error = None; + self.last_reassembled = None; + match reassembler.push_at(header, payload, self.now) { + Ok(output) => self.last_reassembled = output, + Err(err) => self.last_reassembly_error = Some(err), + } + } + + /// Advance the simulated clock. + /// + /// # Panics + /// + /// Panics if advancing the clock would overflow [`Instant`]. + pub fn advance_time(&mut self, delta: Duration) { + self.now = self + .now + .checked_add(delta) + .expect("time advance overflowed"); + } + + /// Purge expired partial messages based on the current clock reading. + /// + /// # Panics + /// + /// Panics if the reassembler has not been configured. + pub fn purge_reassembly(&mut self) { + let reassembler = self + .reassembler + .as_mut() + .expect("reassembler not configured"); + self.last_evicted = reassembler.purge_expired_at(self.now); + } + + /// Assert that a message has been reassembled with the expected payload length. + /// + /// # Panics + /// Panics if no message has been reassembled yet. + pub fn assert_reassembled_len(&self, expected_len: usize) { + let message = self + .last_reassembled + .as_ref() + .expect("no message reassembled"); + assert_eq!( + message.payload().len(), + expected_len, + "payload length mismatch" + ); + } + + /// Assert that no message has been fully reassembled. + /// + /// # Panics + /// + /// Panics if a message has already been reassembled. + pub fn assert_no_reassembly(&self) { + assert!( + self.last_reassembled.is_none(), + "unexpected reassembled message present" + ); + } + + /// Helper for asserting on the latest captured reassembly error. + /// + /// # Panics + /// Panics if no reassembly error was captured or if the predicate returns false. + fn assert_reassembly_error_matches(&self, predicate: F, expected_description: &str) + where + F: FnOnce(&ReassemblyError) -> bool, + { + let err = self + .last_reassembly_error + .as_ref() + .expect("no reassembly error captured"); + assert!(predicate(err), "expected {expected_description}, got {err}"); + } + + /// Assert the latest reassembly error signalled an over-limit message. + /// + /// # Panics + /// + /// Panics if no reassembly error was captured. + pub fn assert_reassembly_over_limit(&self) { + self.assert_reassembly_error_matches( + |err| matches!(err, ReassemblyError::MessageTooLarge { .. }), + "message-too-large error", + ); + } + + /// Assert that the latest reassembly error was triggered by an out-of-order fragment. + /// + /// # Panics + /// + /// Panics if no reassembly error was captured or if the error was not an index mismatch. + pub fn assert_reassembly_out_of_order(&self) { + self.assert_reassembly_error_matches( + |err| { + matches!( + err, + ReassemblyError::Fragment(FragmentError::IndexMismatch { .. }) + ) + }, + "out-of-order error", + ); + } + + /// Assert the number of buffered partial messages. + /// + /// # Panics + /// Panics if the reassembler has not been configured. + pub fn assert_buffered_messages(&self, expected: usize) { + let reassembler = self + .reassembler + .as_ref() + .expect("reassembler not configured"); + assert_eq!( + reassembler.buffered_len(), + expected, + "unexpected buffered message count" + ); + } + + /// Assert that the most recent purge evicted a specific message identifier. + /// + /// # Panics + /// + /// Panics if the purge record does not contain `message_id`. + pub fn assert_evicted_message(&self, message_id: u64) { + assert!( + self.last_evicted.contains(&MessageId::new(message_id)), + "message {message_id} was not evicted" + ); + } +}