Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion src/app/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@
//! [`crate::app::builder::WireframeApp`] for how envelopes are used when
//! registering routes.

use crate::{correlation::CorrelatableFrame, message::Message};
use crate::{
correlation::CorrelatableFrame,
fragment::{FragmentParts, Fragmentable},
message::Message,
};

/// Envelope-like type used to wrap incoming and outgoing messages.
///
Expand Down Expand Up @@ -243,3 +247,19 @@ impl From<PacketParts> for Envelope {
Envelope::new(id, correlation_id, payload)
}
}

// Blanket implementation: any Packet is automatically Fragmentable.
impl<T: Packet> Fragmentable for T {
fn into_fragment_parts(self) -> FragmentParts {
let parts = self.into_parts();
FragmentParts::new(parts.id(), parts.correlation_id(), parts.payload())
}

fn from_fragment_parts(parts: FragmentParts) -> Self {
T::from_parts(PacketParts::new(
parts.id(),
parts.correlation_id(),
parts.payload(),
))
}
}
51 changes: 11 additions & 40 deletions src/app/fragment_utils.rs
Original file line number Diff line number Diff line change
@@ -1,41 +1,12 @@
//! Shared helpers for applying transport-level fragmentation to packets.

use crate::{
app::{Packet, PacketParts},
fragment::{FragmentationError, Fragmenter, encode_fragment_payload},
};

/// Fragment a packet using the provided fragmenter, returning one or more frames.
///
/// Small payloads that fit within the fragment cap are returned unchanged.
///
/// # Errors
///
/// Returns [`FragmentationError`] if fragmenting the payload fails or if
/// encoding the fragment header and payload into an on-wire frame fails.
pub fn fragment_packet<E: Packet>(
fragmenter: &Fragmenter,
packet: E,
) -> Result<Vec<E>, FragmentationError> {
let parts = packet.into_parts();
let id = parts.id();
let correlation = parts.correlation_id();
let payload = parts.payload();

let batch = fragmenter.fragment_bytes(&payload)?;
if !batch.is_fragmented() {
return Ok(vec![E::from_parts(PacketParts::new(
id,
correlation,
payload,
))]);
}

let mut frames = Vec::with_capacity(batch.len());
for fragment in batch {
let (header, payload) = fragment.into_parts();
let encoded = encode_fragment_payload(header, &payload)?;
frames.push(E::from_parts(PacketParts::new(id, correlation, encoded)));
}
Ok(frames)
}
//!
//! **Deprecated:** This module re-exports [`fragment_packet`] from its
//! canonical location in [`crate::fragment`]. New code should import directly
//! from [`crate::fragment::fragment_packet`] instead.

// Re-export from canonical location for backward compatibility.
#[deprecated(
since = "0.3.0",
note = "use `crate::fragment::fragment_packet` instead"
)]
pub use crate::fragment::fragment_packet;
8 changes: 6 additions & 2 deletions src/app/fragmentation_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use thiserror::Error;

use super::{Packet, PacketParts};
use crate::fragment::{
Fragmentable,
FragmentationError,
Fragmenter,
MessageId,
Expand Down Expand Up @@ -39,8 +40,11 @@ impl FragmentationState {
}
}

pub(crate) fn fragment<E: Packet>(&self, packet: E) -> Result<Vec<E>, FragmentationError> {
crate::app::fragment_utils::fragment_packet(&self.fragmenter, packet)
pub(crate) fn fragment<E: Fragmentable>(
&self,
packet: E,
) -> Result<Vec<E>, FragmentationError> {
crate::fragment::fragment_packet(&self.fragmenter, packet)
}

pub(crate) fn reassemble<E: Packet>(
Expand Down
2 changes: 2 additions & 0 deletions src/fragment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub mod fragmenter;
pub mod header;
pub mod id;
pub mod index;
pub mod packet;
pub mod payload;
pub mod reassembler;
pub mod series;
Expand All @@ -21,6 +22,7 @@ pub use fragmenter::{FragmentBatch, FragmentFrame, Fragmenter};
pub use header::FragmentHeader;
pub use id::MessageId;
pub use index::FragmentIndex;
pub use packet::{FragmentParts, Fragmentable, fragment_packet};
pub use payload::{
FRAGMENT_MAGIC,
decode_fragment_payload,
Expand Down
99 changes: 99 additions & 0 deletions src/fragment/packet.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
//! Shared helpers for applying transport-level fragmentation to packets.
//!
//! This module provides the [`Fragmentable`] trait and [`fragment_packet`]
//! function used by both the connection actor and the application layer to
//! split oversized payloads into transport-safe fragment frames.

use super::{FragmentationError, Fragmenter, encode_fragment_payload};

/// Component values extracted from or used to build a [`Fragmentable`] packet.
///
/// This type mirrors [`crate::app::PacketParts`] but lives in the fragment
/// module to avoid a layering violation where fragmentation depends on app
/// types.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FragmentParts {
id: u32,
correlation_id: Option<u64>,
payload: Vec<u8>,
}

impl FragmentParts {
/// Construct a new set of fragment parts.
#[must_use]
pub fn new(id: u32, correlation_id: Option<u64>, payload: Vec<u8>) -> Self {
Self {
id,
correlation_id,
payload,
}
}

/// Return the message identifier.
#[must_use]
pub const fn id(&self) -> u32 { self.id }

/// Retrieve the correlation identifier, if present.
#[must_use]
pub const fn correlation_id(&self) -> Option<u64> { self.correlation_id }

/// Consume the parts and return the raw payload bytes.
#[must_use]
pub fn payload(self) -> Vec<u8> { self.payload }
}

/// A packet that can be decomposed into parts and reconstructed for fragmentation.
///
/// This trait captures the minimal interface required by the fragmentation layer
/// to split oversized packets into smaller frames and reassemble them. It
/// intentionally avoids depending on application-layer types like
/// [`crate::app::Packet`].
///
/// Types implementing [`crate::app::Packet`] automatically implement this trait
/// via a blanket implementation in the `app` module.
pub trait Fragmentable: Send + Sync + 'static + Sized {
/// Consume the packet and return its identifier, correlation id and payload bytes.
fn into_fragment_parts(self) -> FragmentParts;

/// Construct a new packet from raw parts.
fn from_fragment_parts(parts: FragmentParts) -> Self;
}

/// Fragment a packet using the provided fragmenter, returning one or more frames.
///
/// Small payloads that fit within the fragment cap are returned unchanged.
///
/// # Errors
///
/// Returns [`FragmentationError`] if fragmenting the payload fails or if
/// encoding the fragment header and payload into an on-wire frame fails.
pub fn fragment_packet<E: Fragmentable>(
fragmenter: &Fragmenter,
packet: E,
) -> Result<Vec<E>, FragmentationError> {
let parts = packet.into_fragment_parts();
let id = parts.id();
let correlation = parts.correlation_id();
let payload = parts.payload();

let batch = fragmenter.fragment_bytes(&payload)?;
if !batch.is_fragmented() {
return Ok(vec![E::from_fragment_parts(FragmentParts::new(
id,
correlation,
payload,
))]);
}

let mut frames = Vec::with_capacity(batch.len());
for fragment in batch {
let (header, payload) = fragment.into_parts();
let encoded = encode_fragment_payload(header, &payload)?;
frames.push(E::from_fragment_parts(FragmentParts::new(
id,
correlation,
encoded,
)));
}
Ok(frames)
}
Loading