diff --git a/CHANGELOG.md b/CHANGELOG.md index 23cfbf28..9dbf4892 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,3 +12,49 @@ All notable changes to this project will be documented in this file. - Added a `Fragmenter` helper that slices oversized messages into sequential fragments, stamping each piece with a `FragmentHeader` for transparent transport-level reassembly. +- Breaking: Changed `FragmentError::IndexOverflow` and + `FragmentationError::IndexOverflow` from unit variants to struct variants + carrying a `last: FragmentIndex` field. This field records the final valid + index observed before the counter would overflow `u32::MAX`. + + **Migration guide:** + + Pattern matches against the old unit variant must be updated to destructure + or wildcard the new field: + + ```rust + // Before (0.1.x): unit variant + match err { + FragmentError::IndexOverflow => { /* ... */ } + // ... + } + + // After (0.2+): struct variant with `last` field + match err { + FragmentError::IndexOverflow { last } => { + eprintln!("overflow after fragment index {last}"); + } + // ... + } + ``` + + The same change applies to `FragmentationError::IndexOverflow`: + + ```rust + // Before (0.1.x) + Err(FragmentationError::IndexOverflow) => { /* ... */ } + + // After (0.2+) + Err(FragmentationError::IndexOverflow { last }) => { + log::warn!("cannot fragment: index overflow after {last}"); + } + ``` + + If the `last` value is not needed, use `{ .. }` to ignore it: + + ```rust + match err { + FragmentError::IndexOverflow { .. } => { /* handle overflow */ } + // ... + } + ``` diff --git a/Cargo.toml b/Cargo.toml index ae01754b..4a0eaf5b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -118,6 +118,7 @@ big_endian_bytes = "deny" [lints.rust] unknown_lints = "deny" renamed_and_removed_lints = "deny" +missing_docs = "deny" unexpected_cfgs = { level = "warn", check-cfg = ['cfg(loom)'] } [lints.rustdoc] diff --git a/docs/generic-message-fragmentation-and-re-assembly-design.md b/docs/generic-message-fragmentation-and-re-assembly-design.md index b90442ba..dcdd121c 100644 --- a/docs/generic-message-fragmentation-and-re-assembly-design.md +++ b/docs/generic-message-fragmentation-and-re-assembly-design.md @@ -378,7 +378,9 @@ This feature is designed as a foundational layer that other features build upon. talking to codec internals. - Added `FragmentSeries` to enforce ordering invariants per logical message, surfacing precise diagnostics (`MessageMismatch`, `IndexMismatch`, - `SeriesComplete`, `IndexOverflow`). This helper keeps the reassembly logic + `SeriesComplete`, `IndexOverflow { last }`). The `IndexOverflow` variant + carries a `last: FragmentIndex` field recording the final valid index before + the counter would overflow `u32::MAX`. This helper keeps the reassembly logic deterministic and enables behavioural tests to assert transport-level guarantees without standing up a full codec pipeline. - Introduced `Fragmenter`, `FragmentBatch`, and `FragmentFrame` as reusable diff --git a/src/app/envelope.rs b/src/app/envelope.rs index 02ebb36e..eb428b41 100644 --- a/src/app/envelope.rs +++ b/src/app/envelope.rs @@ -122,12 +122,42 @@ impl PacketParts { } } + /// Return the message identifier used to route this frame. + /// + /// # Examples + /// + /// ``` + /// use wireframe::app::PacketParts; + /// + /// let parts = PacketParts::new(9, None, vec![1, 2, 3]); + /// assert_eq!(parts.id(), 9); + /// ``` #[must_use] pub const fn id(&self) -> u32 { self.id } + /// Retrieve the correlation identifier, if present. + /// + /// # Examples + /// + /// ``` + /// use wireframe::app::PacketParts; + /// + /// let parts = PacketParts::new(1, Some(42), vec![]); + /// assert_eq!(parts.correlation_id(), Some(42)); + /// ``` #[must_use] pub const fn correlation_id(&self) -> Option { self.correlation_id } + /// Consume the parts and return the raw payload bytes. + /// + /// # Examples + /// + /// ``` + /// use wireframe::app::PacketParts; + /// + /// let parts = PacketParts::new(1, None, vec![7, 8]); + /// assert_eq!(parts.payload(), vec![7, 8]); + /// ``` #[must_use] pub fn payload(self) -> Vec { self.payload } diff --git a/src/connection.rs b/src/connection.rs index c8dc9bc9..08ab7276 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -96,7 +96,9 @@ pub struct FairnessConfig { /// Bundles push queues with their shared handle for actor construction. pub struct ConnectionChannels { + /// Receivers for high- and low-priority frames consumed by the actor. pub queues: PushQueues, + /// Handle cloned by producers to enqueue frames into the shared queues. pub handle: PushHandle, } diff --git a/src/connection/test_support.rs b/src/connection/test_support.rs index deaa9bbb..cdfa6d85 100644 --- a/src/connection/test_support.rs +++ b/src/connection/test_support.rs @@ -66,6 +66,7 @@ pub fn create_test_actor_with_hooks( pub struct ActorHarness { actor: ConnectionActor, state: ActorState, + /// Frames emitted by the actor during tests, preserved for assertions. pub out: Vec, } @@ -173,10 +174,15 @@ impl ActorHarness { /// Snapshot of the actor lifecycle flags and counters. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct ActorStateSnapshot { + /// `true` while the actor is still polling its sources. pub is_active: bool, + /// `true` after shutdown has begun but before sources finish. pub is_shutting_down: bool, + /// `true` once all sources have closed and the actor can exit. pub is_done: bool, + /// Total number of sources being tracked for completion. pub total_sources: usize, + /// Number of sources observed as closed so far. pub closed_sources: usize, } diff --git a/src/fragment/error.rs b/src/fragment/error.rs index 9e3c04bb..55d0a4d8 100644 --- a/src/fragment/error.rs +++ b/src/fragment/error.rs @@ -26,13 +26,17 @@ pub enum FragmentError { /// The fragment belongs to a different message. #[error("fragment message mismatch: expected {expected}, found {found}")] MessageMismatch { + /// Message identifier currently being assembled. expected: MessageId, + /// Identifier carried by the incoming fragment. found: MessageId, }, /// A fragment arrived out of order. #[error("fragment index mismatch: expected {expected}, found {found}")] IndexMismatch { + /// Index the series expected next. expected: FragmentIndex, + /// Index carried by the fragment that was received. found: FragmentIndex, }, /// The series already consumed a last fragment. @@ -40,7 +44,10 @@ pub enum FragmentError { SeriesComplete, /// The fragment index overflowed `u32::MAX`. #[error("fragment index overflow after {last}")] - IndexOverflow { last: FragmentIndex }, + IndexOverflow { + /// Last valid index observed before overflow occurred. + last: FragmentIndex, + }, } /// Errors produced while fragmenting outbound messages. @@ -51,7 +58,10 @@ pub enum FragmentationError { Encode(#[from] EncodeError), /// The fragment index cannot advance because it would overflow `u32`. #[error("fragment index overflow after {last}")] - IndexOverflow { last: FragmentIndex }, + IndexOverflow { + /// Final index emitted before the counter would overflow. + last: FragmentIndex, + }, /// Calculated fragment slice exceeded payload bounds. #[error("fragment slice out of bounds: offset={offset}, end={end}, total={total}")] SliceBounds { diff --git a/src/fragment/tests.rs b/src/fragment/tests.rs index 2d20bbe3..b1139ada 100644 --- a/src/fragment/tests.rs +++ b/src/fragment/tests.rs @@ -91,7 +91,12 @@ fn series_detects_index_overflow() { let err = series .accept(header) .expect_err("overflow must raise an error"); - assert!(matches!(err, FragmentError::IndexOverflow { .. })); + assert_eq!( + err, + FragmentError::IndexOverflow { + last: FragmentIndex::new(u32::MAX) + } + ); } #[test] diff --git a/src/push/queues/errors.rs b/src/push/queues/errors.rs index 379e5280..bf54f56a 100644 --- a/src/push/queues/errors.rs +++ b/src/push/queues/errors.rs @@ -25,5 +25,10 @@ pub enum PushConfigError { InvalidRate(usize), /// The provided capacities were zero. #[error("invalid capacities; high={high}, low={low}; each must be >= 1")] - InvalidCapacity { high: usize, low: usize }, + InvalidCapacity { + /// Capacity configured for the high-priority queue. + high: usize, + /// Capacity configured for the low-priority queue. + low: usize, + }, } diff --git a/src/push/queues/mod.rs b/src/push/queues/mod.rs index eafff53a..36db7c53 100644 --- a/src/push/queues/mod.rs +++ b/src/push/queues/mod.rs @@ -50,7 +50,9 @@ const_assert!(DEFAULT_PUSH_RATE <= MAX_PUSH_RATE); /// Priority level for outbound messages. #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum PushPriority { + /// Time-sensitive frames processed ahead of low-priority traffic. High, + /// Best-effort frames that yield to high-priority work when present. Low, } diff --git a/src/server/runtime.rs b/src/server/runtime.rs index 5d613ca0..a99383bb 100644 --- a/src/server/runtime.rs +++ b/src/server/runtime.rs @@ -57,7 +57,9 @@ impl AcceptListener for TcpListener { /// - `initial_delay` must be at least 1 millisecond #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub struct BackoffConfig { + /// Delay used for the first retry after an `accept()` failure. pub initial_delay: Duration, + /// Maximum back-off delay once retries have increased exponentially. pub max_delay: Duration, } @@ -71,6 +73,27 @@ impl Default for BackoffConfig { } impl BackoffConfig { + /// Clamp delays to sane bounds and ensure `initial_delay <= max_delay`. + /// + /// This prevents accidental misconfiguration (for example, inverted or + /// zero durations) before the values are used in the accept loop. + /// + /// # Examples + /// + /// ``` + /// use std::time::Duration; + /// + /// use wireframe::server::runtime::BackoffConfig; + /// + /// let cfg = BackoffConfig { + /// initial_delay: Duration::from_millis(5), + /// max_delay: Duration::from_millis(1), + /// }; + /// + /// let normalised = cfg.normalised(); + /// assert_eq!(normalised.initial_delay, Duration::from_millis(1)); + /// assert_eq!(normalised.max_delay, Duration::from_millis(5)); + /// ``` #[must_use] pub fn normalised(mut self) -> Self { self.initial_delay = self.initial_delay.max(Duration::from_millis(1)); @@ -355,6 +378,7 @@ pub(super) async fn accept_loop( tracker, backoff, } = options; + let backoff = backoff.normalised(); debug_assert!( backoff.initial_delay <= backoff.max_delay, "BackoffConfig invariant violated: initial_delay > max_delay" diff --git a/tests/advanced/concurrency_loom.rs b/tests/advanced/concurrency_loom.rs index c1d3e358..9650d191 100644 --- a/tests/advanced/concurrency_loom.rs +++ b/tests/advanced/concurrency_loom.rs @@ -1,9 +1,9 @@ -#![cfg(all(feature = "advanced-tests", loom))] //! Concurrency tests for push queues using loom. //! //! These tests exercise the `PushHandle` shared state without Tokio. `loom` -//! explores interleavings to ensure DLQ accounting and queue-full errors remain -//! deterministic under concurrent producers. +//! explores interleavings to ensure DLQ accounting and queue-full errors +//! remain deterministic under concurrent producers. +#![cfg(all(feature = "advanced-tests", loom))] use loom::{model, thread}; use rstest::rstest; diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 2ad0fa0f..b01c0904 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -19,7 +19,9 @@ pub fn unused_listener() -> StdTcpListener { use rstest::fixture; use wireframe::{app::Envelope, serializer::BincodeSerializer}; +/// Default app type used by cucumber worlds during integration tests. pub type TestApp = wireframe::app::WireframeApp; +/// Shared result type for cucumber step implementations. pub type TestResult = Result>; #[fixture] diff --git a/tests/worlds/correlation.rs b/tests/worlds/correlation.rs index 41aceced..149c0b8f 100644 --- a/tests/worlds/correlation.rs +++ b/tests/worlds/correlation.rs @@ -17,14 +17,30 @@ use wireframe::{ use super::{TestResult, build_small_queues}; #[derive(Debug, Default, World)] +/// Test world capturing correlation expectations for frame emission. pub struct CorrelationWorld { expected: Option, frames: Vec, } impl CorrelationWorld { + /// Record the correlation identifier expected on emitted frames. + /// + /// # Examples + /// ```ignore + /// let mut world = CorrelationWorld::default(); + /// world.set_expected(Some(99)); + /// ``` pub fn set_expected(&mut self, expected: Option) { self.expected = expected; } + /// Return the correlation identifier configured for this scenario. + /// + /// # Examples + /// ```ignore + /// let mut world = CorrelationWorld::default(); + /// world.set_expected(None); + /// assert_eq!(world.expected(), None); + /// ``` #[must_use] pub fn expected(&self) -> Option { self.expected } diff --git a/tests/worlds/fragment/mod.rs b/tests/worlds/fragment/mod.rs index 203c295e..a0ed1a78 100644 --- a/tests/worlds/fragment/mod.rs +++ b/tests/worlds/fragment/mod.rs @@ -29,6 +29,7 @@ use wireframe::fragment::{ use super::TestResult; #[derive(Debug, World)] +/// Test world tracking fragmentation state across behavioural scenarios. pub struct FragmentWorld { series: Option, last_result: Option>, diff --git a/tests/worlds/multi_packet.rs b/tests/worlds/multi_packet.rs index 13a96648..3012fdea 100644 --- a/tests/worlds/multi_packet.rs +++ b/tests/worlds/multi_packet.rs @@ -1,8 +1,8 @@ //! Test world for multi-packet channel scenarios. -#![cfg(not(loom))] //! //! Provides [`MultiPacketWorld`] to verify message ordering, back-pressure //! handling, and channel lifecycle in cucumber-based behaviour tests. +#![cfg(not(loom))] use std::{error::Error, fmt}; @@ -23,6 +23,7 @@ impl fmt::Display for WireframeRunError { impl Error for WireframeRunError {} #[derive(Debug, Default, World)] +/// Test world exercising multi-packet channel behaviours and back-pressure. pub struct MultiPacketWorld { messages: Vec, is_overflow_error: bool, diff --git a/tests/worlds/panic.rs b/tests/worlds/panic.rs index 66e6eeb6..a03e3fba 100644 --- a/tests/worlds/panic.rs +++ b/tests/worlds/panic.rs @@ -76,6 +76,7 @@ impl Drop for PanicServer { } #[derive(Debug, Default, World)] +/// Test world that drives a server which intentionally panics during setup. pub struct PanicWorld { server: Option, attempts: usize, diff --git a/tests/worlds/stream_end.rs b/tests/worlds/stream_end.rs index d4d1e957..cae7dd27 100644 --- a/tests/worlds/stream_end.rs +++ b/tests/worlds/stream_end.rs @@ -21,6 +21,7 @@ use wireframe_testing::{LoggerHandle, logger}; use super::{Terminator, TestResult, build_small_queues}; #[derive(Debug, Default, World)] +/// Test world capturing frames and logs for stream termination scenarios. pub struct StreamEndWorld { frames: Vec, logs: Vec<(Level, String)>,