Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,49 @@ All notable changes to this project will be documented in this file.
- Added a `Fragmenter` helper that slices oversized messages into sequential
fragments, stamping each piece with a `FragmentHeader` for transparent
transport-level reassembly.
- Breaking: Changed `FragmentError::IndexOverflow` and
`FragmentationError::IndexOverflow` from unit variants to struct variants
carrying a `last: FragmentIndex` field. This field records the final valid
index observed before the counter would overflow `u32::MAX`.

**Migration guide:**

Pattern matches against the old unit variant must be updated to destructure
or wildcard the new field:

```rust
// Before (0.1.x): unit variant
match err {
FragmentError::IndexOverflow => { /* ... */ }
// ...
}

// After (0.2+): struct variant with `last` field
match err {
FragmentError::IndexOverflow { last } => {
eprintln!("overflow after fragment index {last}");
}
// ...
}
```

The same change applies to `FragmentationError::IndexOverflow`:

```rust
// Before (0.1.x)
Err(FragmentationError::IndexOverflow) => { /* ... */ }

// After (0.2+)
Err(FragmentationError::IndexOverflow { last }) => {
log::warn!("cannot fragment: index overflow after {last}");
}
```

If the `last` value is not needed, use `{ .. }` to ignore it:

```rust
match err {
FragmentError::IndexOverflow { .. } => { /* handle overflow */ }
// ...
}
```
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ big_endian_bytes = "deny"
[lints.rust]
unknown_lints = "deny"
renamed_and_removed_lints = "deny"
missing_docs = "deny"
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(loom)'] }

[lints.rustdoc]
Expand Down
4 changes: 3 additions & 1 deletion docs/generic-message-fragmentation-and-re-assembly-design.md
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,9 @@ This feature is designed as a foundational layer that other features build upon.
talking to codec internals.
- Added `FragmentSeries` to enforce ordering invariants per logical message,
surfacing precise diagnostics (`MessageMismatch`, `IndexMismatch`,
`SeriesComplete`, `IndexOverflow`). This helper keeps the reassembly logic
`SeriesComplete`, `IndexOverflow { last }`). The `IndexOverflow` variant
carries a `last: FragmentIndex` field recording the final valid index before
the counter would overflow `u32::MAX`. This helper keeps the reassembly logic
deterministic and enables behavioural tests to assert transport-level
guarantees without standing up a full codec pipeline.
- Introduced `Fragmenter`, `FragmentBatch`, and `FragmentFrame` as reusable
Expand Down
30 changes: 30 additions & 0 deletions src/app/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,42 @@ impl PacketParts {
}
}

/// Return the message identifier used to route this frame.
///
/// # Examples
///
/// ```
/// use wireframe::app::PacketParts;
///
/// let parts = PacketParts::new(9, None, vec![1, 2, 3]);
/// assert_eq!(parts.id(), 9);
/// ```
#[must_use]
pub const fn id(&self) -> u32 { self.id }

/// Retrieve the correlation identifier, if present.
///
/// # Examples
///
/// ```
/// use wireframe::app::PacketParts;
///
/// let parts = PacketParts::new(1, Some(42), vec![]);
/// assert_eq!(parts.correlation_id(), Some(42));
/// ```
#[must_use]
pub const fn correlation_id(&self) -> Option<u64> { self.correlation_id }

/// Consume the parts and return the raw payload bytes.
///
/// # Examples
///
/// ```
/// use wireframe::app::PacketParts;
///
/// let parts = PacketParts::new(1, None, vec![7, 8]);
/// assert_eq!(parts.payload(), vec![7, 8]);
/// ```
#[must_use]
pub fn payload(self) -> Vec<u8> { self.payload }

Expand Down
2 changes: 2 additions & 0 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ pub struct FairnessConfig {

/// Bundles push queues with their shared handle for actor construction.
pub struct ConnectionChannels<F> {
/// Receivers for high- and low-priority frames consumed by the actor.
pub queues: PushQueues<F>,
/// Handle cloned by producers to enqueue frames into the shared queues.
pub handle: PushHandle<F>,
}

Expand Down
6 changes: 6 additions & 0 deletions src/connection/test_support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ pub fn create_test_actor_with_hooks(
pub struct ActorHarness {
actor: ConnectionActor<u8, ()>,
state: ActorState,
/// Frames emitted by the actor during tests, preserved for assertions.
pub out: Vec<u8>,
}

Expand Down Expand Up @@ -173,10 +174,15 @@ impl ActorHarness {
/// Snapshot of the actor lifecycle flags and counters.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ActorStateSnapshot {
/// `true` while the actor is still polling its sources.
pub is_active: bool,
/// `true` after shutdown has begun but before sources finish.
pub is_shutting_down: bool,
/// `true` once all sources have closed and the actor can exit.
pub is_done: bool,
/// Total number of sources being tracked for completion.
pub total_sources: usize,
/// Number of sources observed as closed so far.
pub closed_sources: usize,
}

Expand Down
14 changes: 12 additions & 2 deletions src/fragment/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,28 @@ pub enum FragmentError {
/// The fragment belongs to a different message.
#[error("fragment message mismatch: expected {expected}, found {found}")]
MessageMismatch {
/// Message identifier currently being assembled.
expected: MessageId,
/// Identifier carried by the incoming fragment.
found: MessageId,
},
/// A fragment arrived out of order.
#[error("fragment index mismatch: expected {expected}, found {found}")]
IndexMismatch {
/// Index the series expected next.
expected: FragmentIndex,
/// Index carried by the fragment that was received.
found: FragmentIndex,
},
/// The series already consumed a last fragment.
#[error("fragment series already complete")]
SeriesComplete,
/// The fragment index overflowed `u32::MAX`.
#[error("fragment index overflow after {last}")]
IndexOverflow { last: FragmentIndex },
IndexOverflow {
/// Last valid index observed before overflow occurred.
last: FragmentIndex,
},
}

/// Errors produced while fragmenting outbound messages.
Expand All @@ -51,7 +58,10 @@ pub enum FragmentationError {
Encode(#[from] EncodeError),
/// The fragment index cannot advance because it would overflow `u32`.
#[error("fragment index overflow after {last}")]
IndexOverflow { last: FragmentIndex },
IndexOverflow {
/// Final index emitted before the counter would overflow.
last: FragmentIndex,
},
Comment thread
coderabbitai[bot] marked this conversation as resolved.
/// Calculated fragment slice exceeded payload bounds.
#[error("fragment slice out of bounds: offset={offset}, end={end}, total={total}")]
SliceBounds {
Expand Down
7 changes: 6 additions & 1 deletion src/fragment/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,12 @@ fn series_detects_index_overflow() {
let err = series
.accept(header)
.expect_err("overflow must raise an error");
assert!(matches!(err, FragmentError::IndexOverflow { .. }));
assert_eq!(
err,
FragmentError::IndexOverflow {
last: FragmentIndex::new(u32::MAX)
}
);
}

#[test]
Expand Down
7 changes: 6 additions & 1 deletion src/push/queues/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,10 @@ pub enum PushConfigError {
InvalidRate(usize),
/// The provided capacities were zero.
#[error("invalid capacities; high={high}, low={low}; each must be >= 1")]
InvalidCapacity { high: usize, low: usize },
InvalidCapacity {
/// Capacity configured for the high-priority queue.
high: usize,
/// Capacity configured for the low-priority queue.
low: usize,
},
}
2 changes: 2 additions & 0 deletions src/push/queues/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ const_assert!(DEFAULT_PUSH_RATE <= MAX_PUSH_RATE);
/// Priority level for outbound messages.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PushPriority {
/// Time-sensitive frames processed ahead of low-priority traffic.
High,
/// Best-effort frames that yield to high-priority work when present.
Low,
}

Expand Down
24 changes: 24 additions & 0 deletions src/server/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ impl AcceptListener for TcpListener {
/// - `initial_delay` must be at least 1 millisecond
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct BackoffConfig {
/// Delay used for the first retry after an `accept()` failure.
pub initial_delay: Duration,
/// Maximum back-off delay once retries have increased exponentially.
pub max_delay: Duration,
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

Expand All @@ -71,6 +73,27 @@ impl Default for BackoffConfig {
}

impl BackoffConfig {
/// Clamp delays to sane bounds and ensure `initial_delay <= max_delay`.
///
/// This prevents accidental misconfiguration (for example, inverted or
/// zero durations) before the values are used in the accept loop.
///
/// # Examples
///
/// ```
/// use std::time::Duration;
///
/// use wireframe::server::runtime::BackoffConfig;
///
/// let cfg = BackoffConfig {
/// initial_delay: Duration::from_millis(5),
/// max_delay: Duration::from_millis(1),
/// };
///
/// let normalised = cfg.normalised();
/// assert_eq!(normalised.initial_delay, Duration::from_millis(1));
/// assert_eq!(normalised.max_delay, Duration::from_millis(5));
/// ```
#[must_use]
pub fn normalised(mut self) -> Self {
self.initial_delay = self.initial_delay.max(Duration::from_millis(1));
Expand Down Expand Up @@ -355,6 +378,7 @@ pub(super) async fn accept_loop<F, T, L>(
tracker,
backoff,
} = options;
let backoff = backoff.normalised();
debug_assert!(
backoff.initial_delay <= backoff.max_delay,
"BackoffConfig invariant violated: initial_delay > max_delay"
Expand Down
6 changes: 3 additions & 3 deletions tests/advanced/concurrency_loom.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#![cfg(all(feature = "advanced-tests", loom))]
//! Concurrency tests for push queues using loom.
//!
//! These tests exercise the `PushHandle` shared state without Tokio. `loom`
//! explores interleavings to ensure DLQ accounting and queue-full errors remain
//! deterministic under concurrent producers.
//! explores interleavings to ensure DLQ accounting and queue-full errors
//! remain deterministic under concurrent producers.
#![cfg(all(feature = "advanced-tests", loom))]

use loom::{model, thread};
use rstest::rstest;
Expand Down
2 changes: 2 additions & 0 deletions tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ pub fn unused_listener() -> StdTcpListener {
use rstest::fixture;
use wireframe::{app::Envelope, serializer::BincodeSerializer};

/// Default app type used by cucumber worlds during integration tests.
pub type TestApp = wireframe::app::WireframeApp<BincodeSerializer, (), Envelope>;
/// Shared result type for cucumber step implementations.
pub type TestResult<T = ()> = Result<T, Box<dyn std::error::Error + Send + Sync>>;

#[fixture]
Expand Down
16 changes: 16 additions & 0 deletions tests/worlds/correlation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,30 @@ use wireframe::{
use super::{TestResult, build_small_queues};

#[derive(Debug, Default, World)]
/// Test world capturing correlation expectations for frame emission.
pub struct CorrelationWorld {
expected: Option<u64>,
frames: Vec<Envelope>,
}

impl CorrelationWorld {
/// Record the correlation identifier expected on emitted frames.
///
/// # Examples
/// ```ignore
/// let mut world = CorrelationWorld::default();
/// world.set_expected(Some(99));
/// ```
pub fn set_expected(&mut self, expected: Option<u64>) { self.expected = expected; }

/// Return the correlation identifier configured for this scenario.
///
/// # Examples
/// ```ignore
/// let mut world = CorrelationWorld::default();
/// world.set_expected(None);
/// assert_eq!(world.expected(), None);
/// ```
#[must_use]
pub fn expected(&self) -> Option<u64> { self.expected }

Expand Down
1 change: 1 addition & 0 deletions tests/worlds/fragment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use wireframe::fragment::{
use super::TestResult;

#[derive(Debug, World)]
/// Test world tracking fragmentation state across behavioural scenarios.
pub struct FragmentWorld {
series: Option<FragmentSeries>,
last_result: Option<Result<FragmentStatus, FragmentError>>,
Expand Down
3 changes: 2 additions & 1 deletion tests/worlds/multi_packet.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
//! Test world for multi-packet channel scenarios.
#![cfg(not(loom))]
//!
//! Provides [`MultiPacketWorld`] to verify message ordering, back-pressure
//! handling, and channel lifecycle in cucumber-based behaviour tests.
#![cfg(not(loom))]

use std::{error::Error, fmt};

Expand All @@ -23,6 +23,7 @@ impl fmt::Display for WireframeRunError {
impl Error for WireframeRunError {}

#[derive(Debug, Default, World)]
/// Test world exercising multi-packet channel behaviours and back-pressure.
pub struct MultiPacketWorld {
messages: Vec<u8>,
is_overflow_error: bool,
Expand Down
1 change: 1 addition & 0 deletions tests/worlds/panic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ impl Drop for PanicServer {
}

#[derive(Debug, Default, World)]
/// Test world that drives a server which intentionally panics during setup.
pub struct PanicWorld {
server: Option<PanicServer>,
attempts: usize,
Expand Down
1 change: 1 addition & 0 deletions tests/worlds/stream_end.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use wireframe_testing::{LoggerHandle, logger};
use super::{Terminator, TestResult, build_small_queues};

#[derive(Debug, Default, World)]
/// Test world capturing frames and logs for stream termination scenarios.
pub struct StreamEndWorld {
frames: Vec<u8>,
logs: Vec<(Level, String)>,
Expand Down
Loading