Implement Transport-Level Fragmentation And Reassembly#399
Conversation
…assembly

- Introduce FragmentationConfig to manage fragment sizes and reassembly limits.
- Implement Fragmenter and Reassembler for splitting and reconstructing messages.
- Add support to WireframeApp and ConnectionActor to enable automatic fragmentation for oversized payloads.
- Define a standard on-wire fragment encoding with "FRAG" marker and header length.
- Provide builder methods to configure or disable fragmentation.
- Add comprehensive tests for fragmentation and reassembly, including edge cases.
- Update documentation with usage examples and a configuration guide for fragmentation.

This enhances the resilience of the messaging layer by transparently handling large messages that exceed transport frame limits.

Co-authored-by: terragon-labs[bot] <terragon-labs[bot]@users.noreply.github.com>
Reviewer's Guide

Implements transport-level message fragmentation and reassembly that is transparently integrated into WireframeApp connections and the ConnectionActor, with configuration derived from buffer capacity, public fragmentation helpers, and comprehensive tests and docs coverage.

Sequence diagram for fragmented request/response handling in WireframeApp and ConnectionActor

sequenceDiagram
actor Client
participant WireframeApp
participant FragmentationState
participant HandlerService
participant ConnectionActor
participant FramedTransport
%% Outbound path: server sending response frames via ConnectionActor
Client->>WireframeApp: send request frame
WireframeApp->>WireframeApp: handle_connection(read loop)
WireframeApp->>WireframeApp: decode_envelope(frame)
WireframeApp->>FragmentationState: reassemble(env)
alt fragments incomplete
FragmentationState-->>WireframeApp: Ok(None)
WireframeApp-->>FramedTransport: wait for more frames
else complete or not fragmented
FragmentationState-->>WireframeApp: Ok(Some(env_complete))
WireframeApp->>HandlerService: call(request from env_complete)
HandlerService-->>WireframeApp: Ok(response_payload)
WireframeApp->>FragmentationState: fragment(Envelope)
alt fragmentation enabled and needed
FragmentationState-->>WireframeApp: Ok(Vec of fragmented Envelopes)
else not enabled or small payload
FragmentationState-->>WireframeApp: Ok(single Envelope)
end
loop for each response Envelope
WireframeApp->>FramedTransport: serialize and send frame
end
end
%% Outbound path for generic push/streaming via ConnectionActor
Client->>ConnectionActor: enqueue outbound frame F
ConnectionActor->>ConnectionActor: process_frame_with_hooks_and_metrics(F, out)
alt fragmentation configured
ConnectionActor->>ConnectionActor: call fragmentation(frame)
alt fragmentation succeeds
ConnectionActor-->>ConnectionActor: Vec of frames
loop for each fragmented frame
ConnectionActor->>ConnectionActor: push_frame(frame, out)
ConnectionActor->>FramedTransport: send frame
end
else fragmentation error
ConnectionActor->>ConnectionActor: log warning, inc_handler_errors
end
else no fragmentation
ConnectionActor->>ConnectionActor: push_frame(F, out)
ConnectionActor->>FramedTransport: send frame
end
FramedTransport-->>Client: deliver fragmented or whole frames
Class diagram for core fragmentation types and payload helpers

classDiagram
class FragmentationConfig {
+NonZeroUsize fragment_payload_cap
+NonZeroUsize max_message_size
+Duration reassembly_timeout
+for_frame_budget(frame_budget: usize, max_message_size: NonZeroUsize, reassembly_timeout: Duration) Option~FragmentationConfig~
+encoded_fragment_ceiling() usize
}
class FragmentationState {
-Fragmenter fragmenter
-Reassembler reassembler
+new(config: FragmentationConfig) FragmentationState
+fragment~E: Packet~(packet: E) Result~Vec~E~~, FragmentationError~
+reassemble~E: Packet~(packet: E) Result~Option~E~, FragmentProcessError~
+purge_expired() Vec~MessageId~
}
class FragmentProcessError {
<<enum>>
+Decode(DecodeError)
+Reassembly(ReassemblyError)
}
class Fragmenter {
+new(fragment_payload_cap: NonZeroUsize) Fragmenter
+max_fragment_size() NonZeroUsize
+fragment_bytes(payload: &[u8]) Result~FragmentBatch, FragmentationError~
}
class Reassembler {
+new(max_message_size: NonZeroUsize, reassembly_timeout: Duration) Reassembler
+push(header: FragmentHeader, fragment_payload: Vec~u8~) Result~Option~ReassembledMessage~, ReassemblyError~
+purge_expired() Vec~MessageId~
}
class FragmentHeader {
+new(message_id: MessageId, index: FragmentIndex, is_last_fragment: bool) FragmentHeader
}
class MessageId {
+new(raw: u64) MessageId
}
class FragmentIndex {
+zero() FragmentIndex
+new(raw: u32) FragmentIndex
}
class FragmentPayloadEncoding {
<<static>>
+FRAGMENT_MAGIC: &[u8;4]
+fragment_overhead() NonZeroUsize
+encode_fragment_payload(header: FragmentHeader, payload: &[u8]) Result~Vec~u8~~, EncodeError~
+decode_fragment_payload(payload: &[u8]) Result~Option~(FragmentHeader, &[u8])~, DecodeError~
}
FragmentationState --> Fragmenter : uses
FragmentationState --> Reassembler : uses
FragmentationState --> FragmentationConfig : constructed_from
FragmentationState --> FragmentPayloadEncoding : uses
FragmentationConfig --> FragmentPayloadEncoding : uses
Fragmenter --> FragmentHeader : produces
Fragmenter --> FragmentIndex : uses
Fragmenter --> MessageId : uses
Reassembler --> FragmentHeader : consumes
Reassembler --> MessageId : returns
FragmentHeader --> MessageId : has
FragmentHeader --> FragmentIndex : has
FragmentPayloadEncoding --> FragmentHeader : encodes_decodes
FragmentPayloadEncoding --> MessageId : via_header
FragmentPayloadEncoding --> FragmentIndex : via_header
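To make the `for_frame_budget` derivation in the diagram above concrete, here is a self-contained sketch. The 19-byte `FRAGMENT_OVERHEAD` constant and the struct layout are illustrative assumptions standing in for `FragmentPayloadEncoding::fragment_overhead()`; only the subtract-the-overhead-with-a-guard shape mirrors the API shown:

```rust
use std::num::NonZeroUsize;
use std::time::Duration;

// Assumed overhead per encoded fragment: 4-byte "FRAG" marker,
// 2-byte header length, 13-byte header. Illustrative only.
const FRAGMENT_OVERHEAD: usize = 4 + 2 + 13;

#[derive(Debug, Clone, Copy)]
struct FragmentationConfig {
    fragment_payload_cap: NonZeroUsize,
    max_message_size: NonZeroUsize,
    reassembly_timeout: Duration,
}

impl FragmentationConfig {
    /// Derive a config from the transport's frame budget, or return `None`
    /// when the budget cannot fit even a one-byte fragment payload.
    fn for_frame_budget(
        frame_budget: usize,
        max_message_size: NonZeroUsize,
        reassembly_timeout: Duration,
    ) -> Option<Self> {
        // checked_sub guards against budgets smaller than the overhead;
        // NonZeroUsize::new rejects a zero-byte payload cap.
        let cap = NonZeroUsize::new(frame_budget.checked_sub(FRAGMENT_OVERHEAD)?)?;
        Some(Self {
            fragment_payload_cap: cap,
            max_message_size,
            reassembly_timeout,
        })
    }

    /// Largest encoded fragment this config can emit.
    fn encoded_fragment_ceiling(&self) -> usize {
        self.fragment_payload_cap.get() + FRAGMENT_OVERHEAD
    }
}

fn main() {
    let max = NonZeroUsize::new(1 << 20).expect("non-zero");
    let cfg = FragmentationConfig::for_frame_budget(512, max, Duration::from_secs(30))
        .expect("budget large enough");
    println!(
        "cap={} ceiling={}",
        cfg.fragment_payload_cap,
        cfg.encoded_fragment_ceiling()
    );
    // A budget below the overhead yields no usable config.
    assert!(FragmentationConfig::for_frame_budget(10, max, Duration::from_secs(30)).is_none());
}
```

The round trip `encoded_fragment_ceiling() == frame_budget` is the invariant that keeps every emitted fragment inside the transport's frame limit.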
Class diagram for WireframeApp and ConnectionActor fragmentation integration

classDiagram
class WireframeApp {
+usize buffer_capacity
+u64 read_timeout_ms
+Option~FragmentationConfig~ fragmentation
+new() Result~WireframeApp, Error~
+buffer_capacity(capacity: usize) WireframeApp
+read_timeout_ms(timeout_ms: u64) WireframeApp
+fragmentation(config: Option~FragmentationConfig~) WireframeApp
+fragmentation_config() Option~FragmentationConfig~
+length_codec() LengthDelimitedCodec
+handle_connection~W: AsyncRead+AsyncWrite+Unpin~(stream: W) Result~(), io::Error~
+handle_frame~W: AsyncRead+AsyncWrite+Unpin~(framed: &mut Framed~W, LengthDelimitedCodec~, frame: &[u8], deser_failures: &mut u32, routes: &HashMap~u32, HandlerService~E~~, fragmentation: &mut Option~FragmentationState~) Result~(), io::Error~
+decode_envelope(frame: &[u8], deser_failures: &mut u32) Result~Option~Envelope~, io::Error~
+reassemble_if_needed(fragmentation: &mut Option~FragmentationState~, deser_failures: &mut u32, env: Envelope) Option~Envelope~
+forward_response~W: AsyncRead+AsyncWrite+Unpin~(env: Envelope, service: &HandlerService~E~, framed: &mut Framed~W, LengthDelimitedCodec~, fragmentation: &mut Option~FragmentationState~) Result~(), io::Error~
}
class WireframeAppBuilder {
+usize buffer_capacity
+u64 read_timeout_ms
+Option~FragmentationConfig~ fragmentation
+build() Result~WireframeApp, Error~
+buffer_capacity(capacity: usize) WireframeAppBuilder
+read_timeout_ms(timeout_ms: u64) WireframeAppBuilder
+fragmentation(config: Option~FragmentationConfig~) WireframeAppBuilder
}
class ConnectionActor {
++F frame_type
++E error_type
+ProtocolHooks~F, E~ hooks
+ConnectionContext ctx
+FairnessTracker fairness
+Option~OutboundFragmenter~F~~ fragmentation
+Option~ConnectionId~ connection_id
+Option~SocketAddr~ peer_addr
+new(push: PushQueues~F, E~, hooks: ProtocolHooks~F, E~, ctx: ConnectionContext) ConnectionActor~F, E~
+set_fairness(fairness: FairnessConfig) void
+enable_fragmentation(config: FragmentationConfig) void
+set_response(stream: Option~FrameStream~F, E~~) void
+process_frame_with_hooks_and_metrics(frame: F, out: &mut Vec~F~) void
+push_frame(frame: F, out: &mut Vec~F~) void
}
class OutboundFragmenterF {
<<typealias>>
+Fn(frame: F) -> Result~Vec~F~~, FragmentationError~
}
class default_fragmentation_fn {
<<static>>
+DEFAULT_FRAGMENT_TIMEOUT: Duration
+DEFAULT_MESSAGE_SIZE_MULTIPLIER: usize
+default_fragmentation(capacity: usize) Option~FragmentationConfig~
}
class HandlerServiceE {
+call(request: ServiceRequest~Envelope~) Future~Result~Response, Error~~
}
class Envelope {
+id: u32
+correlation_id: Option~u64~
+payload: Vec~u8~
+from_parts(parts: PacketParts) Envelope
+into_parts() PacketParts
}
WireframeAppBuilder --> WireframeApp : builds
WireframeAppBuilder --> FragmentationConfig : configures
WireframeApp --> WireframeAppBuilder : constructed_from
WireframeApp --> FragmentationConfig : holds
WireframeApp --> FragmentationState : uses
WireframeApp --> HandlerServiceE : routes_to
WireframeApp --> Envelope : processes
WireframeApp --> default_fragmentation_fn : uses
ConnectionActor --> OutboundFragmenterF : uses
ConnectionActor --> FragmentationConfig : configured_by
ConnectionActor --> Fragmenter : uses
ConnectionActor --> FragmentPayloadEncoding : uses
default_fragmentation_fn --> FragmentationConfig : constructs
OutboundFragmenterF <|-- ConnectionActor : field_fragmentation
Summary by CodeRabbit: Release Notes
Walkthrough

Adds transport-level message fragmentation and reassembly: a FragmentationConfig derived from frame budgets, wire-format encoding/decoding (FRAG marker + length + bincode header + payload), integration into builders and connection actors for outbound fragmentation and inbound reassembly, default DoS guards (30s reassembly timeout, memory cap), and tests plus docs updates.
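The wire format named in the walkthrough (FRAG marker, header length, header, payload) can be sketched in isolation. The field widths and the hand-rolled header encoding below are assumptions for illustration only; the PR serialises the header with bincode, and its real types live in `FragmentPayloadEncoding`:

```rust
// Illustrative on-wire layout: [FRAG magic][u16 header len][header][payload].
// The real header is bincode-encoded; a fixed-width big-endian encoding
// stands in for it here so the sketch has no dependencies.

const FRAGMENT_MAGIC: &[u8; 4] = b"FRAG";
const HEADER_LEN: usize = 8 + 4 + 1; // message_id + index + is_last flag

#[derive(Debug, PartialEq)]
struct FragmentHeader {
    message_id: u64,
    index: u32,
    is_last_fragment: bool,
}

fn encode_fragment_payload(header: &FragmentHeader, payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(4 + 2 + HEADER_LEN + payload.len());
    out.extend_from_slice(FRAGMENT_MAGIC);
    out.extend_from_slice(&(HEADER_LEN as u16).to_be_bytes());
    out.extend_from_slice(&header.message_id.to_be_bytes());
    out.extend_from_slice(&header.index.to_be_bytes());
    out.push(header.is_last_fragment as u8);
    out.extend_from_slice(payload);
    out
}

/// Returns `None` when the buffer does not start with the FRAG marker,
/// so unfragmented envelopes pass through untouched.
fn decode_fragment_payload(buf: &[u8]) -> Option<(FragmentHeader, &[u8])> {
    let rest = buf.strip_prefix(FRAGMENT_MAGIC.as_slice())?;
    if rest.len() < 2 {
        return None;
    }
    let hdr_len = u16::from_be_bytes([rest[0], rest[1]]) as usize;
    let rest = &rest[2..];
    if hdr_len != HEADER_LEN || rest.len() < hdr_len {
        return None; // truncated or unknown header: reject, do not panic
    }
    let (hdr, payload) = rest.split_at(hdr_len);
    let header = FragmentHeader {
        message_id: u64::from_be_bytes(hdr[0..8].try_into().ok()?),
        index: u32::from_be_bytes(hdr[8..12].try_into().ok()?),
        is_last_fragment: hdr[12] != 0,
    };
    Some((header, payload))
}

fn main() {
    let header = FragmentHeader { message_id: 7, index: 0, is_last_fragment: true };
    let frame = encode_fragment_payload(&header, b"hello");
    let (decoded, payload) = decode_fragment_payload(&frame).expect("valid fragment");
    println!("{decoded:?} payload={payload:?}");
    // Non-fragment frames fall through to normal envelope handling.
    assert!(decode_fragment_payload(b"plain envelope").is_none());
}
```

The `Option` return on decode is what lets fragmentation stay transparent: frames without the magic are delivered directly, matching the "no FRAG marker" branch in the sequence diagram below.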
Sequence Diagram(s)

sequenceDiagram
actor Client
participant Builder as WireframeApp\nBuilder
participant Conn as ConnectionActor
participant FragOut as Fragmenter\n(outbound)
participant Transport as Wire
participant FragIn as Reassembler\n(inbound)
participant Handler as Application\nHandler
Client->>Builder: fragmentation(Some(cfg))
Builder->>Conn: supply FragmentationConfig
Conn->>Conn: enable_fragmentation(cfg)
Note over Client,Conn: OUTBOUND
Client->>Conn: send(large_message)
Conn->>FragOut: fragment_packet(message)
alt fragmented
FragOut->>Transport: encode each fragment with FRAG + len + header + payload
Transport->>Transport: emit multiple frames
else not fragmented
Conn->>Transport: emit single frame
end
Note over Transport,Conn: INBOUND
Transport->>Conn: deliver frame(s)
alt frame has FRAG marker
Conn->>FragIn: decode_fragment_payload()
FragIn->>FragIn: buffer and reassemble fragments
alt complete
FragIn->>Handler: deliver reassembled envelope
else pending
FragIn->>Conn: await more fragments / apply timeout
end
else no FRAG marker
Conn->>Handler: deliver envelope directly
end
Note over Conn: DoS guards
Conn->>Conn: purge_expired() on timeout, enforce max reassembly size
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~70 minutes
@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix:

Comment on lines +156 to +185:

async fn out_of_order_fragments_are_rejected() {
    let buffer_capacity = 512;
    let config = fragmentation_config(buffer_capacity);
    let (tx, mut rx) = mpsc::unbounded_channel();
    let app = make_app(buffer_capacity, config, &tx);
    let codec = app.length_codec();
    let (client_stream, server_stream) = tokio::io::duplex(256);
    let mut client = Framed::new(client_stream, codec.clone());
    let fragmenter = Fragmenter::new(config.fragment_payload_cap);
    let payload = vec![1_u8; 800];
    let request = Envelope::new(ROUTE_ID, CORRELATION, payload);
    let mut fragments = fragment_envelope(&request, &fragmenter);
    fragments.swap(0, 1);
    let server = tokio::spawn(async move { app.handle_connection(server_stream).await });
    send_envelopes(&mut client, &fragments).await;
    client.get_mut().shutdown().await.expect("shutdown write");
    assert!(
        timeout(Duration::from_millis(200), rx.recv())
            .await
            .is_err(),
        "handler should not receive out-of-order fragments"
    );
    drop(client);
    server.await.expect("server task");
}

❌ New issue: Code Duplication
Refactored the `out_of_order_fragments_are_rejected` and `duplicate_fragments_clear_reassembly` tests into a single generic helper, `test_fragment_rejection`. This reduces duplication and improves clarity by passing mutation closures and rejection messages as parameters. Co-authored-by: terragon-labs[bot] <terragon-labs[bot]@users.noreply.github.com>
Hey there - I've reviewed your changes - here's some feedback:
- The outbound fragmentation logic in `FragmentationState::fragment` and the closure created in `ConnectionActor::enable_fragmentation` are nearly identical; consider extracting a shared helper that operates on `Packet`/`PacketParts` to avoid divergence and make future changes easier to maintain.
- In `reassemble_if_needed`, the warning logs always use `correlation_id={:?}` with `None::<u64>`; it would be more useful to pass through the actual correlation ID from the envelope or fragment header so that fragmentation/reassembly failures can be correlated with specific requests.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The outbound fragmentation logic in `FragmentationState::fragment` and the closure created in `ConnectionActor::enable_fragmentation` are nearly identical; consider extracting a shared helper that operates on `Packet`/`PacketParts` to avoid divergence and make future changes easier to maintain.
- In `reassemble_if_needed`, the warning logs always use `correlation_id={:?}` with `None::<u64>`; it would be more useful to pass through the actual correlation ID from the envelope or fragment header so that fragmentation/reassembly failures can be correlated with specific requests.
## Individual Comments
### Comment 1
<location> `src/app/connection.rs:378-387` </location>
<code_context>
+ fn reassemble_if_needed(
</code_context>
<issue_to_address>
**issue (bug_risk):** Fragmentation decode/reassembly errors increase `deser_failures` but never trigger the max-deserialization-failures cutoff.
`decode_envelope` enforces `self.max_deser_failures` by closing the connection with `ErrorKind::InvalidData` once the threshold is exceeded. In contrast, `reassemble_if_needed` increments `deser_failures` on `FragmentProcessError::Decode` / `Reassembly` but never checks the threshold and just returns `None`.
As a result, a peer sending malformed fragments can drive unbounded `deser_failures` and warnings without ever being disconnected, which is inconsistent with envelope deserialization handling and may weaken our defensive posture.
If fragment corruption is meant to be treated like envelope deserialization failure, consider either bubbling a "too many fragment errors" condition up to `handle_frame` so it can return an `io::Error`, or providing a callback/closure into `reassemble_if_needed` to perform the same cutoff check when incrementing `deser_failures`.
</issue_to_address>
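One possible shape for the cutoff this comment asks for is a small helper shared by `decode_envelope` and `reassemble_if_needed`, so fragment errors hit the same threshold as envelope deserialization failures. The sketch below is hypothetical, not the crate's API; `note_deser_failure` and its threshold semantics are assumptions:

```rust
use std::io;

/// Record one deserialization failure and return an error once the
/// configured limit is exceeded, mirroring the cutoff that
/// decode_envelope already enforces. Illustrative helper only.
fn note_deser_failure(deser_failures: &mut u32, max_deser_failures: u32) -> io::Result<()> {
    *deser_failures += 1;
    if *deser_failures > max_deser_failures {
        // Same error kind decode_envelope uses to close the connection,
        // so handle_frame can propagate it uniformly.
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "too many deserialization failures",
        ));
    }
    Ok(())
}

fn main() {
    let mut failures = 0;
    // The first `max` failures are tolerated; the next one disconnects.
    for _ in 0..3 {
        assert!(note_deser_failure(&mut failures, 3).is_ok());
    }
    assert!(note_deser_failure(&mut failures, 3).is_err());
    println!("cut off after {failures} failures");
}
```

Calling this from both error arms of `reassemble_if_needed` would let malformed fragments trip the same disconnect that malformed envelopes already do.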
### Comment 2
<location> `src/app/connection.rs:387-391` </location>
<code_context>
+ match state.reassemble(env) {
+ Ok(Some(env)) => Some(env),
+ Ok(None) => None,
+ Err(FragmentProcessError::Decode(err)) => {
+ *deser_failures += 1;
+ warn!(
+ "failed to decode fragment header: correlation_id={:?}, error={err:?}",
+ None::<u64>
+ );
+ crate::metrics::inc_deser_errors();
</code_context>
<issue_to_address>
**suggestion:** Fragmentation error logs hard-code `correlation_id=None` even though the envelope’s correlation ID is available.
In these error paths you log `correlation_id={:?}` with `None::<u64>` even though `reassemble_if_needed` receives an `Envelope` that already has a `correlation_id`.
Please pass the real correlation ID into these logs (e.g. capture `env.correlation_id` before `state.reassemble(env)` and use that value) so fragmentation errors can be traced to the correct envelope.
Suggested implementation:
```rust
fn reassemble_if_needed(
fragmentation: &mut Option<FragmentationState>,
deser_failures: &mut u32,
env: Envelope,
) -> Option<Envelope> {
if let Some(state) = fragmentation.as_mut() {
let correlation_id = env.correlation_id;
match state.reassemble(env) {
```
```rust
Err(FragmentProcessError::Decode(err)) => {
*deser_failures += 1;
warn!(
"failed to decode fragment header: correlation_id={:?}, error={err:?}",
correlation_id
);
crate::metrics::inc_deser_errors();
None
}
Err(FragmentProcessError::Reassembly(err)) => {
*deser_failures += 1;
warn!(
"fragment reassembly failed: correlation_id={:?}, error={err:?}",
correlation_id
);
crate::metrics::inc_deser_errors();
None
```
If `Envelope` does not expose a public `correlation_id` field, replace `env.correlation_id` with the appropriate accessor (for example, `env.correlation_id()` or `env.metadata.correlation_id`). Also scan the rest of `src/app/connection.rs` for any other fragment-related warnings that still log `correlation_id=None::<u64>` and update them to use the captured `correlation_id` in the same way.
</issue_to_address>
### Comment 3
<location> `src/app/connection.rs:46-55` </location>
<code_context>
Deserialize(Box<dyn std::error::Error + Send + Sync>),
}
+/// Bundles outbound fragmentation and inbound re-assembly state for a connection.
+struct FragmentationState {
+ fragmenter: Fragmenter,
+ reassembler: Reassembler,
+}
+
+enum FragmentProcessError {
+ Decode(DecodeError),
+ Reassembly(ReassemblyError),
+}
+
+impl FragmentationState {
+ fn new(config: FragmentationConfig) -> Self {
+ Self {
+ fragmenter: Fragmenter::new(config.fragment_payload_cap),
+ reassembler: Reassembler::new(config.max_message_size, config.reassembly_timeout),
+ }
+ }
+
+ fn fragment<E: Packet>(&self, packet: E) -> Result<Vec<E>, FragmentationError> {
+ let parts = packet.into_parts();
+ let id = parts.id();
</code_context>
<issue_to_address>
**suggestion:** The fragmentation logic here is duplicated in `ConnectionActor::enable_fragmentation`; consider a shared helper to avoid divergence.
The logic in `FragmentationState::fragment` closely matches the closure in `ConnectionActor::enable_fragmentation` (extracting `PacketParts`, enforcing `max_fragment_size`, calling `fragment_bytes`, and wrapping via `encode_fragment_payload`). This duplication risks outbound/inbound behaviour diverging over time. Consider extracting a shared helper (e.g. `fn fragment_packet<P: Packet>(fragmenter: &Fragmenter, packet: P) -> Result<Vec<P>, FragmentationError>`) and calling it from both sites so future changes to the fragmentation scheme only need to be made once.
Suggested implementation:
```rust
fn fragment_packet<E: Packet>(
fragmenter: &Fragmenter,
packet: E,
) -> Result<Vec<E>, FragmentationError> {
let parts = packet.into_parts();
let id = parts.id();
let correlation = parts.correlation_id();
let payload = parts.payload();
// Fast-path: payload fits into a single frame; no fragmentation needed.
if payload.len() <= fragmenter.max_fragment_size().get() {
return Ok(vec![E::from_parts(PacketParts::new(
id,
correlation,
payload,
))]);
}
// Fragment the payload into multiple frames and wrap each fragment payload.
//
// NOTE: Adjust the exact mapping logic to match the existing
// `ConnectionActor::enable_fragmentation` closure implementation so that
// we preserve wire compatibility (e.g. fragment index, "is_last" flag, etc.).
let fragments = fragmenter.fragment_bytes(payload)?;
Ok(fragments
.into_iter()
.enumerate()
.map(|(idx, fragment_bytes)| {
// The concrete signature of `encode_fragment_payload` should mirror the
// existing fragmentation logic used in `ConnectionActor::enable_fragmentation`.
let fragment_payload =
encode_fragment_payload(id, idx as u32, /* is_last = */ false, fragment_bytes);
E::from_parts(PacketParts::new(id, correlation, fragment_payload))
})
.collect())
}
impl FragmentationState {
fn new(config: FragmentationConfig) -> Self {
Self {
fragmenter: Fragmenter::new(config.fragment_payload_cap),
reassembler: Reassembler::new(config.max_message_size, config.reassembly_timeout),
}
}
fn fragment<E: Packet>(&self, packet: E) -> Result<Vec<E>, FragmentationError> {
fragment_packet(&self.fragmenter, packet)
}
```
To fully implement the deduplication and ensure behaviour stays aligned:
1. **Adjust the helper to match real signatures:**
- Replace the call to `fragmenter.fragment_bytes(payload)?` with the exact call currently used in `ConnectionActor::enable_fragmentation` (e.g. return type, error type).
- Update the `encode_fragment_payload(...)` call to match its actual signature — in many implementations this includes fragment sequence number, "is_last" flag, and the raw fragment bytes. Mirror whatever parameters and flags the current closure uses, including the correct calculation of `is_last`.
2. **Refactor `ConnectionActor::enable_fragmentation`:**
- Locate the closure / function inside `ConnectionActor::enable_fragmentation` that currently performs outbound fragmentation (the logic that:
- extracts `PacketParts`,
- enforces `max_fragment_size`,
- calls `fragment_bytes`,
- and wraps via `encode_fragment_payload`).
- Replace that inline logic with a call to the shared helper, for example:
```rust
// Pseudocode - adapt to the actual types and lifetimes:
let outbound = move |packet: E| fragment_packet(&fragmenter, packet);
```
- Ensure the `fragmenter` passed into `fragment_packet` is the same `Fragmenter` instance already used by `ConnectionActor::enable_fragmentation` (or use the one inside `FragmentationState` if that state is shared).
3. **Keep error handling consistent:**
- If the closure previously mapped `FragmentationError` into some connection-level error type, keep that mapping but have it act on the result of `fragment_packet(...)` instead of a locally duplicated implementation.
Once these adjustments are made, the fragmentation algorithm will be maintained in only one place (`fragment_packet`), and both `FragmentationState::fragment` and `ConnectionActor::enable_fragmentation` will share identical behaviour.
</issue_to_address>
### Comment 4
<location> `tests/fragment_transport.rs:155` </location>
<code_context>
+ server.await.expect("server task");
+}
+
+async fn test_fragment_rejection<F>(fragment_mutator: F, rejection_message: &str)
+where
+ F: FnOnce(&mut Vec<Envelope>),
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding a rejection test for malformed fragment headers (decode failure), not just reassembly failures
The existing uses of `test_fragment_rejection` cover `Reassembler` errors for out-of-order and duplicate fragments, but they don’t hit the `FragmentProcessError::Decode` branch (where `decode_fragment_payload` fails after seeing `FRAG`). To cover this, you could add a new `test_fragment_rejection` caller that:
- Builds a valid fragment sequence from `fragment_envelope`,
- Corrupts the first fragment so it still starts with `FRAG` but has an invalid/truncated header,
- Verifies, as in the other cases, that the handler never receives a payload.
This would exercise the decode-error path in the connection handling and confirm malformed headers are safely rejected without invoking handlers.
Suggested implementation:
```rust
server.await.expect("server task");
}
#[tokio::test]
async fn fragment_rejection_malformed_fragment_header() {
// This exercises the `FragmentProcessError::Decode` branch by ensuring we have:
// - A payload that will be fragmented
// - A first fragment that still starts with "FRAG" but whose header is truncated/corrupted
// The server should reject the fragments and never deliver a payload to the handler.
test_fragment_rejection(
|fragments| {
let first = fragments
.first_mut()
.expect("fragmenter must produce at least one fragment");
// Corrupt the first fragment so that:
// - It still starts with the "FRAG" marker
// - The header after "FRAG" is invalid/truncated and will fail to decode
//
// Adjust this pattern to match the actual `Envelope` representation in your codebase.
match first {
Envelope::Binary(bytes) => {
// Ensure the fragment still has the "FRAG" prefix
assert!(
bytes.starts_with(b"FRAG"),
"expected first fragment to begin with FRAG"
);
// Keep only "FRAG" plus 1 byte to truncate the rest of the header
let truncated_len = b"FRAG".len() + 1;
if bytes.len() > truncated_len {
bytes.truncate(truncated_len);
} else {
// If it's already very short, pad minimally and then truncate
// to ensure we still start with "FRAG" but have an incomplete header.
while bytes.len() < truncated_len {
bytes.push(0);
}
bytes.truncate(truncated_len);
}
}
// If your Envelope type uses a different variant name/shape for raw fragments,
// update this match arm accordingly.
other => panic!("unexpected fragment envelope variant: {:?}", other),
}
},
"malformed fragment header is rejected",
)
.await;
}
async fn test_fragment_rejection<F>(fragment_mutator: F, rejection_message: &str)
where
F: FnOnce(&mut Vec<Envelope>),
```
To integrate this test cleanly you may need to:
1. Adjust the `Envelope` pattern in the `match first { ... }` block:
- If your `Envelope` enum uses a different variant (e.g. `Envelope::Payload(Vec<u8>)`, `Envelope::Frame(Vec<u8>)`, or a struct with a `bytes`/`payload` field), update the `match` arm to:
- Match the correct variant.
- Get a mutable `Vec<u8>` reference for the raw fragment bytes.
2. Ensure `Envelope` implements `Debug` if it does not already, or remove `{:?}` from the panic to avoid a compile error.
3. If your fragment bytes do not literally start with the ASCII `"FRAG"` prefix, but use a different marker or structured header, adapt the `starts_with(b"FRAG")` assertion and the truncate logic to:
- Preserve whatever prefix your `decode_fragment_payload` uses to detect a fragment.
- Truncate the remainder of the header so that the decoder will fail when parsing it.
4. If there is an existing test naming convention (e.g. `fragment_rejects_malformed_header`), rename the test function to align with that convention.
5. Confirm that `test_fragment_rejection` is `pub(crate)` or in scope for the new test if the file is split into modules; adjust visibility or module paths as needed.
</issue_to_address>
### Comment 5
<location> `tests/fragment_transport.rs:124-125` </location>
<code_context>
+ push::PushQueues,
+};
+
+#[tokio::test]
+async fn connection_actor_fragments_outbound_frames() {
+ let (queues, handle) = PushQueues::<Envelope>::builder()
</code_context>
<issue_to_address>
**suggestion (testing):** Add a round-trip integration test where no fragmentation occurs (payload under fragment cap)
Since we already exercise the fragmented path, it’d be helpful to also cover the “pass-through” path when `payload.len() <= fragment_payload_cap`: keep fragmentation enabled, send a single `Envelope` with a payload below the cap (no manual fragmentation), and assert that the handler receives the original payload and the response is returned unfragmented. This ensures enabling fragmentation doesn’t change behaviour for small messages and exercises the non-fragmenting branch in `FragmentationState::fragment` / the connection actor logic.
</issue_to_address>
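The pass-through branch this comment wants covered can be illustrated standalone. `fragment_bytes` below is a hypothetical stand-in for the crate's `Fragmenter`, showing only the size check that leaves small payloads untouched:

```rust
/// Split a payload only when it exceeds the fragment cap; payloads at or
/// below the cap come back as a single, byte-identical frame. This is a
/// sketch of the branch under test, not the crate's actual Fragmenter.
fn fragment_bytes(payload: &[u8], cap: usize) -> Vec<Vec<u8>> {
    assert!(cap > 0, "fragment cap must be non-zero");
    if payload.len() <= cap {
        // Pass-through: enabling fragmentation must not change small frames.
        return vec![payload.to_vec()];
    }
    // Oversized: emit cap-sized chunks (the last one may be shorter).
    payload.chunks(cap).map(|chunk| chunk.to_vec()).collect()
}

fn main() {
    // Small payload: exactly one frame, unchanged.
    assert_eq!(fragment_bytes(b"tiny", 64), vec![b"tiny".to_vec()]);
    // Oversized payload: split, with no bytes lost.
    let frames = fragment_bytes(&[0u8; 150], 64);
    println!("{} frames", frames.len());
    assert_eq!(frames.iter().map(Vec::len).sum::<usize>(), 150);
}
```

A round-trip test along these lines (payload under the cap, fragmentation enabled) would pin down the non-fragmenting branch in `FragmentationState::fragment` and the connection actor.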
### Comment 6
<location> `tests/fragment_transport.rs:100` </location>
<code_context>
+ panic!("response stream ended before reassembly completed");
+}
+
+fn make_app(
+ capacity: usize,
+ config: FragmentationConfig,
</code_context>
<issue_to_address>
**suggestion (testing):** Consider a test that verifies behaviour when fragmentation is explicitly disabled via the public API
Right now `make_app` always sets `fragmentation(Some(config))`, so all integration tests only cover the "fragmentation enabled" path. Since the public API uses `WireframeApp::fragmentation(None)` to disable fragmentation, please add a test that builds an app with `fragmentation(None)` and a suitable `buffer_capacity`, sends a payload that would otherwise be fragmented, and asserts the resulting behaviour (e.g. handler receives the full payload or the connection fails as expected when exceeding the transport budget). This will exercise the opt‑out path and guard the new default/override logic against regressions.
Suggested implementation:
```rust
panic!("response stream ended before reassembly completed");
}
#[tokio::test]
async fn fragmentation_can_be_disabled_via_public_api() {
// Use a modest buffer capacity that comfortably fits the test payload.
let capacity: usize = 1024;
// Channel used by the handler to record the payload it sees.
let (tx, mut rx) = mpsc::unbounded_channel::<Vec<u8>>();
// Handler that mirrors the one used by `make_app`: it forwards the payload
// of the incoming `Envelope` into the `mpsc` channel.
let handler: Handler<Envelope> = {
let tx = tx.clone();
std::sync::Arc::new(move |env: &Envelope| {
let tx = tx.clone();
let payload = env.clone().into_parts().payload();
Box::pin(async move {
tx.send(payload).expect("record payload");
})
})
};
// Build an app via the public API with fragmentation explicitly disabled.
//
// This uses the same buffer capacity that our fragmentation-enabled tests use,
// but opts out of fragmentation by passing `None`. This exercises the
// default/override logic in `WireframeApp::fragmentation`.
let mut app = WireframeApp::builder()
.buffer_capacity(capacity)
.fragmentation(None)
.handler(handler)
.build()
.expect("app should build with fragmentation disabled");
// Construct a payload that would be fragmented when fragmentation is enabled
// (i.e. larger than a typical fragment size, but within the transport buffer).
let payload_size = capacity / 2;
let payload: Vec<u8> = vec![0xAB; payload_size];
// Drive the app with a single request carrying our large payload.
//
// The exact way to construct an `Envelope` and invoke the app should mirror
// the other tests in this module; we assume a simple "request" helper.
let request = Envelope::from_payload(payload.clone());
app.handle(request)
.await
.expect("request should be handled when fragmentation is disabled");
// The handler should have seen the full, unfragmented payload.
let recorded = rx
    .recv()
    .await
    .expect("handler should record exactly one payload");
assert_eq!(
recorded,
payload,
"when fragmentation is disabled, the handler should receive the full payload"
);
}
fn make_app(
capacity: usize,
config: FragmentationConfig,
sender: &mpsc::UnboundedSender<Vec<u8>>,
```
Because we only see part of `tests/fragment_transport.rs`, you will likely need to align a few details with the rest of the file:
1. **App builder API**
- Adjust the construction of the app if the actual API differs, e.g.:
- If the builder is created differently (`WireframeApp::new()` or `WireframeAppBuilder::new()`), update:
```rust
let mut app = WireframeApp::builder()
.buffer_capacity(capacity)
.fragmentation(None)
.handler(handler)
.build()
.expect("app should build with fragmentation disabled");
```
to match the existing pattern used elsewhere in this test module.
2. **Envelope construction**
- Replace:
```rust
let request = Envelope::from_payload(payload.clone());
```
with the actual helper or constructor used in your tests to build an `Envelope` from a payload (e.g. `Envelope::request(...)`, `Envelope::from_bytes(...)`, or a local helper).
3. **Driving the app**
- Replace:
```rust
app.handle(request)
.await
.expect("request should be handled when fragmentation is disabled");
```
with however you normally drive the `WireframeApp` in tests:
- It might be `app.call(request).await`, `app.handle_envelope(request).await`, or via a test harness/transport helper. Use the exact method from the other tests so the fragmentation/transport layer is exercised.
4. **Imports**
- Ensure the module has:
- `use tokio;` / `#[tokio::test]` support.
- `use tokio::sync::mpsc;`
- `use crate::...::{WireframeApp, FragmentationConfig, Envelope, Handler};`
or equivalent imports already present. If not, add them at the top of the file following the existing import style.
5. **Payload size vs transport budget**
- If your transport budget is tied to `buffer_capacity` differently, you may want to tweak:
```rust
let payload_size = capacity / 2;
```
to a value that you know:
- Would be fragmented under the default fragmentation config, but
- Still fits within the underlying transport budget when fragmentation is disabled.
</issue_to_address>
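To reason about point 5 above — choosing a payload that would fragment under the default config but still fit the transport budget — the fragment count is just ceiling division of the payload length by the per-fragment cap. A minimal standalone sketch (the helper name and the example sizes are illustrative, not the crate's API):

```rust
/// Number of fragments a payload of `len` bytes splits into when each
/// fragment carries at most `cap` payload bytes (ceiling division).
/// An empty payload still occupies one (empty) frame.
fn fragment_count(len: usize, cap: usize) -> usize {
    assert!(cap > 0, "fragment cap must be non-zero");
    if len == 0 { 1 } else { len.div_ceil(cap) }
}

fn main() {
    // A payload at half a 1024-byte buffer with a 96-byte fragment cap
    // would be split, so it exercises the fragmentation path when enabled.
    assert_eq!(fragment_count(512, 96), 6);
    // A payload at or below the cap stays in a single frame.
    assert_eq!(fragment_count(96, 96), 1);
    assert_eq!(fragment_count(97, 96), 2);
}
```

Picking `payload_size` so that `fragment_count(payload_size, cap) > 1` while `payload_size` stays within the raw transport budget satisfies both conditions in point 5.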
### Comment 7
<location> `tests/connection_fragmentation.rs:13-14` </location>
<code_context>
+ push::PushQueues,
+};
+
+#[tokio::test]
+async fn connection_actor_fragments_outbound_frames() {
+ let (queues, handle) = PushQueues::<Envelope>::builder()
+ .high_capacity(4)
</code_context>
<issue_to_address>
**suggestion (testing):** Add a complementary test to ensure small frames are passed through unfragmented by `ConnectionActor`
This test validates that enabling `ConnectionActor::enable_fragmentation` correctly fragments and reassembles outbound frames. To fully cover the behavior, please also add a test for the non-fragmenting path:
- Enable fragmentation as in this test.
- Use a payload whose length is below `fragment_payload_cap`.
- Run the actor and assert `out` contains exactly one frame and `decode_fragment_payload` for its payload returns `Ok(None)`.
This will exercise the `payload.len() <= fragmenter.max_fragment_size()` branch and confirm small frames are not unnecessarily wrapped when fragmentation is enabled.
Suggested implementation:
```rust
#[tokio::test]
async fn connection_actor_passes_through_small_outbound_frames_unfragmented() {
let (queues, handle) = PushQueues::<Envelope>::builder()
.high_capacity(4)
.low_capacity(4)
.build()
.expect("build queues");
let shutdown = CancellationToken::new();
let mut actor: ConnectionActor<_, ()> =
ConnectionActor::new(queues, handle.clone(), None, shutdown);
// Enable fragmentation with the same configuration used by the
// fragmenting test so we exercise the same code-path with a
// small-enough payload.
let cfg = FragmentationConfig::for_frame_budget(
96,
NonZeroUsize::new(256).expect("non-zero message cap"),
);
actor.enable_fragmentation(cfg);
// Payload that should be strictly below the fragment payload cap so it
// is sent as a single, unfragmented frame even when fragmentation is
// enabled.
let payload = b"small-payload".to_vec();
// Drive the actor with a single outbound payload.
//
// The exact mechanics of how an outbound frame is injected depend on the
// definitions of `Envelope`, `Packet`, and the shape of `PushQueues`.
// The core idea is:
//
// 1. Wrap `payload` into the appropriate `Packet`/`Envelope`.
// 2. Push it into the "outbound" side of the queues that the
// `ConnectionActor` reads from.
// 3. Run the actor until it has produced all outbound frames, collecting
// them into `out`.
//
// The pseudocode below follows the same pattern as the existing
// `connection_actor_fragments_outbound_frames` test and should be
// adapted to the actual queue / envelope APIs if they differ.
// Example outline — align with the existing test:
//
// let packet = Packet::new(payload.clone());
// let envelope = Envelope::from(packet);
//
// // Send the envelope to the actor so it becomes an outbound frame.
// handle.outbound().send(envelope).await.expect("send outbound");
//
// // Run the actor for long enough to process the frame.
// tokio::spawn(async move {
// actor.run().await;
// });
//
// // Collect all outbound frames produced by the actor.
// let mut out = Vec::new();
// while let Some(frame) = handle.next_outbound_frame().await {
// out.push(frame);
// // Stop once we know we received everything this test cares about.
// if out.len() == 1 {
// break;
// }
// }
// For the assertions below we assume we have collected the frames in
// a `Vec` named `out` whose items expose a `payload: Vec<u8>` (or
// equivalent) field. Adjust the destructuring if the frame type differs.
//
// let mut out_iter = out.into_iter();
// let only_frame = out_iter.next().expect("exactly one outbound frame");
// assert!(
// out_iter.next().is_none(),
// "expected exactly one outbound frame when payload is below fragment cap"
// );
//
// Decode the payload as a potential fragment; for an unfragmented
// payload this must return `Ok(None)`.
//
// match decode_fragment_payload(&only_frame.payload) {
// Ok(None) => {}
// other => panic!(
// "expected small outbound frame to be unfragmented, got {:?}",
// other
// ),
// }
}
#[tokio::test]
async fn connection_actor_fragments_outbound_frames() {
```
To fully implement and compile this test, you’ll need to:
1. **Mirror the wiring used in the existing fragmenting test**
- Look at how `connection_actor_fragments_outbound_frames`:
- Wraps a `Vec<u8>` into a `Packet` and then an `Envelope` (or equivalent).
- Sends that envelope into the queues/handle so `ConnectionActor` treats it as an outbound frame.
- Runs the actor (e.g. via `actor.run().await` or `tokio::spawn(actor.run())`).
- Collects produced outbound frames into a `Vec` (here referenced as `out`).
- Replace the commented “Example outline” section in the new test with the *same concrete code-path* you use in the existing test (only differing in the payload size and the assertions).
2. **Provide access to the outbound frames**
- If the existing test already gets “frames” from some `handle` API (e.g. `handle.outbound_frames().collect().await` or similar), reuse that exact mechanism in the new test to build `out: Vec<FrameType>`.
- Ensure each item in `out` exposes the raw payload as a `&[u8]` or `Vec<u8>`; if the field is not named `payload`, adjust the call to `decode_fragment_payload` accordingly, e.g.:
- `decode_fragment_payload(only_frame.data())`
- or `decode_fragment_payload(&only_frame.payload_bytes)`
3. **Ensure the small payload stays below the fragment cap**
- If `FragmentationConfig` exposes the effective maximum payload (e.g. `cfg.fragment_payload_cap()` or `cfg.max_fragment_size()`), prefer computing the payload size dynamically:
```rust
let max = cfg.fragment_payload_cap(); // or cfg.max_fragment_size()
let payload = vec![0u8; max.saturating_sub(1)];
```
- If not, keeping `b"small-payload".to_vec()` is fine as long as it’s smaller than the payload you use in the fragmenting test and definitely below the threshold that causes fragmentation.
4. **Keep both tests consistent**
- After wiring the new test, confirm that:
- The existing `connection_actor_fragments_outbound_frames` still asserts that multiple frames/fragments are produced and `decode_fragment_payload` reconstructs the original payload.
- The new `connection_actor_passes_through_small_outbound_frames_unfragmented` asserts exactly one frame and `decode_fragment_payload` returns `Ok(None)` for that frame’s payload.
</issue_to_address>
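The `Ok(None)` contract asserted above can be illustrated with a toy decoder. The `"FRAG"` marker comes from the PR's stated on-wire encoding, but the exact layout below (marker, big-endian `u16` header length, header, body) is an assumption for illustration, not the crate's actual wire format:

```rust
const FRAG_MARKER: &[u8] = b"FRAG";

/// Hypothetical decoder mirroring the contract described above: payloads
/// without the marker are reported as `Ok(None)` ("not a fragment"),
/// while marked payloads yield their header and body bytes.
fn decode_fragment_payload(payload: &[u8]) -> Result<Option<(&[u8], &[u8])>, String> {
    if !payload.starts_with(FRAG_MARKER) {
        return Ok(None); // plain, unfragmented payload passes through
    }
    let rest = &payload[FRAG_MARKER.len()..];
    if rest.len() < 2 {
        return Err("truncated fragment header length".into());
    }
    let header_len = u16::from_be_bytes([rest[0], rest[1]]) as usize;
    let rest = &rest[2..];
    if rest.len() < header_len {
        return Err("truncated fragment header".into());
    }
    Ok(Some(rest.split_at(header_len)))
}

fn main() {
    // Small frames pass through: no marker, so the decoder reports None.
    assert!(matches!(decode_fragment_payload(b"small-payload"), Ok(None)));
    // A marked frame splits into header and body.
    let mut framed = Vec::from(&b"FRAG"[..]);
    framed.extend_from_slice(&2u16.to_be_bytes());
    framed.extend_from_slice(b"hdbody");
    let decoded = decode_fragment_payload(&framed).expect("valid frame");
    assert_eq!(decoded, Some((&b"hd"[..], &b"body"[..])));
}
```

The pass-through test proposed above asserts exactly this first branch: a sub-cap payload is never wrapped, so decoding it as a fragment yields `Ok(None)`.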
### Comment 8
<location> `docs/users-guide.md:318-319` </location>
<code_context>
-```rust
-use std::sync::Arc;
+Fragmented messages enforce two guards: `max_message_size` caps the total
+reassembled payload and `reassembly_timeout` evicts stale partial messages.
+Customise or disable fragmentation via the builder:
</code_context>
<issue_to_address>
**nitpick (typo):** Consider standardizing the spelling of “reassembled” across the docs.
In this file you use “reassembled” (no hyphen), while related docs (e.g., fragmentation design and hardening) use “re-assembled”. Please pick one form—preferably “reassembled”—and apply it consistently across those docs.
Suggested implementation:
```
The connection reassembles fragments before invoking handlers, so
handlers continue to work with complete `Envelope` values.[^6]
```
To fully implement your suggestion, update all related docs (e.g., fragmentation design and hardening docs) to use the same spelling:
1. Search for `re-assembled` and replace it with `reassembled`.
2. Search for `re-assembles` and replace it with `reassembles`.
Apply these changes across:
- The fragmentation design document.
- Any hardening / security docs that describe fragmentation.
- Any other markdown/docs where fragmentation and message reassembly are discussed.
This will ensure consistent usage of “reassembled” throughout the documentation set.
</issue_to_address>
### Comment 9
<location> `docs/hardening-wireframe-a-guide-to-production-resilience.md:315-316` </location>
<code_context>
assemblies, which are purged if they are not completed within the time limit
(e.g., 30 seconds).
+`WireframeApp` enables these guards by default, deriving the fragment payload
+cap from `buffer_capacity`, capping re-assembled messages at 16× that budget,
+and evicting stale assemblies after 30 seconds. Applications can override or
+disable the defaults via `fragmentation(...)`.
</code_context>
<issue_to_address>
**nitpick (typo):** Minor spelling consistency: “re-assembled” vs “reassembled”.
Here you use “re-assembled”, while other docs (e.g., the user guide) use “reassembled”. Please pick one form and apply it consistently across the documentation.
Suggested implementation:
```
`WireframeApp` enables these guards by default, deriving the fragment payload
cap from `buffer_capacity`, capping reassembled messages at 16× that budget,
and evicting stale assemblies after 30 seconds. Applications can override or
disable the defaults via `fragmentation(...)`.
```
To fully apply the consistency suggestion, you should:
1. Search this file (and the rest of the documentation set) for variants like `re-assembled`, `re-assembly`, or `re-assembling`.
2. Normalize them to `reassembled`, `reassembly`, and `reassembling` respectively, matching the form already used in the user guide.
</issue_to_address>
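The defaults described in this passage — fragment cap derived from `buffer_capacity`, reassembled messages capped at 16× that budget, 30-second eviction — can be sketched as a standalone derivation. The struct and helper below are illustrative stand-ins, not the crate's real `FragmentationConfig`:

```rust
use std::num::NonZeroUsize;
use std::time::Duration;

/// Illustrative stand-in for the defaults described in the docs; the real
/// `FragmentationConfig` lives in the crate and may differ in shape.
#[derive(Debug, PartialEq)]
struct FragmentationDefaults {
    fragment_payload_cap: NonZeroUsize,
    max_message_size: NonZeroUsize,
    reassembly_timeout: Duration,
}

fn defaults_for_buffer(buffer_capacity: NonZeroUsize) -> FragmentationDefaults {
    FragmentationDefaults {
        // The per-fragment budget tracks the transport buffer.
        fragment_payload_cap: buffer_capacity,
        // Reassembled messages are capped at 16x that budget.
        max_message_size: NonZeroUsize::new(buffer_capacity.get() * 16)
            .expect("non-zero times 16 is non-zero"),
        // Stale partial assemblies are evicted after 30 seconds.
        reassembly_timeout: Duration::from_secs(30),
    }
}

fn main() {
    let cap = NonZeroUsize::new(1024).expect("non-zero");
    let cfg = defaults_for_buffer(cap);
    assert_eq!(cfg.max_message_size.get(), 16 * 1024);
    assert_eq!(cfg.reassembly_timeout, Duration::from_secs(30));
}
```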
### Comment 10
<location> `src/app/connection.rs:65` </location>
<code_context>
+ }
+ }
+
+ fn fragment<E: Packet>(&self, packet: E) -> Result<Vec<E>, FragmentationError> {
+ let parts = packet.into_parts();
+ let id = parts.id();
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying the new fragmentation and decoding flow by removing redundant non-fragment branches, merging envelope decoding and reassembly into one helper, and centralizing fragmentation logic reuse.
You can reduce the added complexity in a few focused places without changing behavior.
### 1. Remove the double “non-fragment” path in `FragmentationState::fragment`
You currently special‑case the “small enough” payload twice:
```rust
if payload.len() <= self.fragmenter.max_fragment_size().get() {
return Ok(vec![E::from_parts(PacketParts::new(
id,
correlation,
payload,
))]);
}
let batch = self.fragmenter.fragment_bytes(&payload)?;
if !batch.is_fragmented() {
return Ok(vec![E::from_parts(PacketParts::new(
id,
correlation,
payload,
))]);
}
```
You can rely on `fragment_bytes` returning a non‑fragmented batch and remove one branch:
```rust
fn fragment<E: Packet>(&self, packet: E) -> Result<Vec<E>, FragmentationError> {
let parts = packet.into_parts();
let id = parts.id();
let correlation = parts.correlation_id();
let payload = parts.payload();
let batch = self.fragmenter.fragment_bytes(&payload)?;
if !batch.is_fragmented() {
return Ok(vec![E::from_parts(PacketParts::new(
id,
correlation,
payload,
))]);
}
let mut frames = Vec::with_capacity(batch.len());
for fragment in batch {
let (header, payload) = fragment.into_parts();
let encoded = encode_fragment_payload(header, &payload)?;
frames.push(E::from_parts(PacketParts::new(id, correlation, encoded)));
}
Ok(frames)
}
```
This keeps semantics identical but removes a redundant condition and branch.
---
### 2. Collapse `decode_envelope` + `reassemble_if_needed` into a single helper
`handle_frame` currently has to orchestrate both helpers and track `deser_failures` across them:
```rust
let Some(env) = self.decode_envelope(frame, deser_failures)? else {
return Ok(());
};
let Some(env) = Self::reassemble_if_needed(fragmentation, deser_failures, env) else {
return Ok(());
};
```
You can encapsulate all decoding + reassembly + deserialization error bookkeeping in one linear helper. This reduces cross‑cutting concerns (metrics and `deser_failures`) across two functions and one enum.
Example restructuring:
```rust
fn decode_and_reassemble(
&self,
frame: &[u8],
deser_failures: &mut u32,
fragmentation: &mut Option<FragmentationState>,
) -> io::Result<Option<Envelope>> {
// parse
let env = match self.parse_envelope(frame) {
Ok((env, _)) => {
*deser_failures = 0;
env
}
Err(EnvelopeDecodeError::Parse(e)) | Err(EnvelopeDecodeError::Deserialize(e)) => {
*deser_failures += 1;
warn!(
"failed to {} message: correlation_id={:?}, error={e:?}",
match e {
EnvelopeDecodeError::Parse(_) => "parse",
EnvelopeDecodeError::Deserialize(_) => "deserialize",
},
None::<u64>,
);
crate::metrics::inc_deser_errors();
if *deser_failures >= MAX_DESER_FAILURES {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"too many deserialization failures",
));
}
return Ok(None);
}
};
// optional reassembly
if let Some(state) = fragmentation.as_mut() {
match state.reassemble(env) {
Ok(Some(env)) => Ok(Some(env)),
Ok(None) => Ok(None),
Err(FragmentProcessError::Decode(err)) => {
*deser_failures += 1;
warn!(
"failed to decode fragment header: correlation_id={:?}, error={err:?}",
None::<u64>
);
crate::metrics::inc_deser_errors();
Ok(None)
}
Err(FragmentProcessError::Reassembly(err)) => {
*deser_failures += 1;
warn!(
"fragment reassembly failed: correlation_id={:?}, error={err:?}",
None::<u64>
);
crate::metrics::inc_deser_errors();
Ok(None)
}
}
} else {
Ok(Some(env))
}
}
```
Then `handle_frame` becomes simpler and easier to read:
```rust
async fn handle_frame<W>(
&self,
framed: &mut Framed<W, LengthDelimitedCodec>,
frame: &[u8],
deser_failures: &mut u32,
routes: &HashMap<u32, HandlerService<E>>,
fragmentation: &mut Option<FragmentationState>,
) -> io::Result<()>
where
W: AsyncRead + AsyncWrite + Unpin,
{
crate::metrics::inc_frames(crate::metrics::Direction::Inbound);
let Some(env) = self.decode_and_reassemble(frame, deser_failures, fragmentation)? else {
return Ok(());
};
if let Some(service) = routes.get(&env.id) {
self.forward_response(env, service, framed, fragmentation).await?;
} else {
warn!(
"no handler for message id: id={}, correlation_id={:?}",
env.id, env.correlation_id
);
}
Ok(())
}
```
This keeps all current logging, metrics, and `deser_failures` semantics, but reduces nesting and the need to mentally track state across multiple helpers.
---
### 3. Reuse fragmentation logic across call sites
You already have `FragmentationState::fragment` returning `Vec<E>`. Anywhere else you’re doing:
- extract `PacketParts`
- check size / fragment
- encode via `encode_fragment_payload`
- rebuild `Envelope`s
can now just call `state.fragment(...)`. For example, in `forward_response` you already do this:
```rust
let responses = if let Some(state) = fragmentation.as_mut() {
match state.fragment(Envelope::from_parts(parts)) {
Ok(fragmented) => fragmented,
Err(err) => { /* ... */ }
}
} else {
vec![Envelope::from_parts(parts)]
};
```
If you have similar logic in `ConnectionActor::enable_fragmentation` or other places, replace it with the same call:
```rust
let responses = state.fragment(envelope)?;
```
This centralizes fragmentation semantics in one place (`FragmentationState::fragment`), so changes to framing, headers, or limits don’t need to be kept in sync across multiple, slightly diverging implementations.
</issue_to_address>
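Whichever helper ends up owning the logic, the invariant it must preserve is that splitting and reassembly round-trip losslessly. A toy sketch of that contract, omitting the real headers, limits, and timeouts:

```rust
/// Chunk a payload at a fixed cap; each chunk models one fragment's
/// payload. The real `Fragmenter` also attaches headers per fragment.
fn split(payload: &[u8], cap: usize) -> Vec<Vec<u8>> {
    payload.chunks(cap).map(|c| c.to_vec()).collect()
}

/// Concatenate fragments back in order. The real `Reassembler` also
/// enforces `max_message_size` and `reassembly_timeout`.
fn reassemble(fragments: &[Vec<u8>]) -> Vec<u8> {
    fragments.concat()
}

fn main() {
    let payload: Vec<u8> = (0..=255u8).cycle().take(1000).collect();
    let fragments = split(&payload, 96);
    // 1000 bytes at 96 per fragment: ten full fragments plus a 40-byte tail.
    assert_eq!(fragments.len(), 11);
    assert_eq!(fragments.last().map(|f| f.len()), Some(40));
    // No bytes lost, order preserved.
    assert_eq!(reassemble(&fragments), payload);
}
```

Centralising the split side in one `fragment_packet`-style helper keeps both ends of this round-trip defined in a single place.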
### Comment 11
<location> `src/connection.rs:63` </location>
<code_context>
session::ConnectionId,
};
+type OutboundFragmenter<F> = Arc<dyn Fn(F) -> Result<Vec<F>, FragmentationError> + Send + Sync>;
+
/// Events returned by [`next_event`].
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying the outbound fragmentation path by storing a concrete `Fragmenter`, sharing the fragmentation helper logic, and inlining the send pipeline into a single function.
You can keep the new functionality while reducing complexity and duplication by:
1. **Replacing `Option<Arc<dyn Fn>>` with a concrete fragmenter type**, and
2. **Extracting the duplicated fragmentation logic into a shared helper**, and
3. **Flattening `process_frame_with_hooks_and_metrics` into a single linear pipeline.**
### 1. Store a concrete fragmenter instead of `Arc<dyn Fn>`
You don’t need dynamic dispatch here; the actor can own a concrete fragmenter and call a helper function.
```rust
// Before
type OutboundFragmenter<F> = Arc<dyn Fn(F) -> Result<Vec<F>, FragmentationError> + Send + Sync>;
struct ConnectionActor<F, E> {
// ...
fragmentation: Option<OutboundFragmenter<F>>,
// ...
}
```
Change to something like:
```rust
use crate::fragment::Fragmenter;
struct ConnectionActor<F, E> {
// ...
outbound_fragmenter: Option<Fragmenter>,
// ...
}
```
And in `enable_fragmentation` you only construct and store the `Fragmenter`:
```rust
pub fn enable_fragmentation(&mut self, config: FragmentationConfig)
where
F: Packet,
{
self.outbound_fragmenter = Some(Fragmenter::new(config.fragment_payload_cap));
}
```
No `Arc`, no `dyn Fn`, no extra closure layer.
### 2. Share the fragmentation logic with `FragmentationState::fragment`
The closure in `enable_fragmentation` is almost a copy of the logic in `FragmentationState::fragment`. Extract that into a shared helper in `crate::fragment` (or wherever `Fragmenter` lives), generic over `Packet`:
```rust
// e.g. in crate::fragment (or a small shared module)
use crate::app::{Packet, PacketParts};
pub fn fragment_packet<F>(
fragmenter: &Fragmenter,
frame: F,
) -> Result<Vec<F>, FragmentationError>
where
F: Packet,
{
let parts = frame.into_parts();
let id = parts.id();
let correlation = parts.correlation_id();
let payload = parts.payload();
if payload.len() <= fragmenter.max_fragment_size().get() {
return Ok(vec![F::from_parts(PacketParts::new(id, correlation, payload))]);
}
let batch = fragmenter.fragment_bytes(&payload)?;
if !batch.is_fragmented() {
return Ok(vec![F::from_parts(PacketParts::new(id, correlation, payload))]);
}
let mut frames = Vec::with_capacity(batch.len());
for fragment in batch {
let (header, payload) = fragment.into_parts();
let encoded = encode_fragment_payload(header, &payload)?;
frames.push(F::from_parts(PacketParts::new(id, correlation, encoded)));
}
Ok(frames)
}
```
Then both `FragmentationState::fragment` and `ConnectionActor` can call this instead of duplicating the sequence.
In `ConnectionActor`:
```rust
use crate::fragment::fragment_packet;
fn process_frame_with_hooks_and_metrics(&mut self, frame: F, out: &mut Vec<F>)
where
F: Packet,
{
let frames_result = if let Some(fragmenter) = &self.outbound_fragmenter {
fragment_packet(fragmenter, frame)
} else {
Ok(vec![frame])
};
let frames = match frames_result {
Ok(frames) => frames,
Err(err) => {
warn!(
"failed to fragment frame: connection_id={:?}, peer={:?}, error={err:?}",
self.connection_id, self.peer_addr,
);
crate::metrics::inc_handler_errors();
return;
}
};
for mut frame in frames {
self.hooks.before_send(&mut frame, &mut self.ctx);
out.push(frame);
crate::metrics::inc_frames(crate::metrics::Direction::Outbound);
}
}
```
### 3. Remove `push_frame` and flatten the send pipeline
With the above, `process_frame_with_hooks_and_metrics` becomes a single, linear path:
- Decide which frames to send (fragmented or not),
- Then in one loop apply hooks and metrics and push.
`push_frame` becomes unnecessary and can be removed:
```rust
// Before
fn process_frame_with_hooks_and_metrics(&mut self, frame: F, out: &mut Vec<F>) {
if let Some(fragment) = &self.fragmentation {
match fragment(frame) {
Ok(frames) => {
for frame in frames {
self.push_frame(frame, out);
}
}
Err(err) => { /* ... */ }
}
} else {
self.push_frame(frame, out);
}
}
fn push_frame(&mut self, frame: F, out: &mut Vec<F>) {
let mut frame = frame;
self.hooks.before_send(&mut frame, &mut self.ctx);
out.push(frame);
crate::metrics::inc_frames(crate::metrics::Direction::Outbound);
}
```
```rust
// After (using fragment_packet and concrete Fragmenter)
fn process_frame_with_hooks_and_metrics(&mut self, frame: F, out: &mut Vec<F>)
where
F: Packet,
{
let frames_result = if let Some(fragmenter) = &self.outbound_fragmenter {
fragment_packet(fragmenter, frame)
} else {
Ok(vec![frame])
};
let frames = match frames_result {
Ok(frames) => frames,
Err(err) => {
warn!(
"failed to fragment frame: connection_id={:?}, peer={:?}, error={err:?}",
self.connection_id, self.peer_addr,
);
crate::metrics::inc_handler_errors();
return;
}
};
for mut frame in frames {
self.hooks.before_send(&mut frame, &mut self.ctx);
out.push(frame);
crate::metrics::inc_frames(crate::metrics::Direction::Outbound);
}
}
```
This keeps all existing behavior (including metrics and error handling) but:
- Removes the `Arc<dyn Fn>` indirection,
- Deduplicates the fragmentation algorithm,
- And keeps the main send path easy to follow in one function.
</issue_to_address>
### Comment 12
<location> `src/connection.rs:333` </location>
<code_context>
+ let id = parts.id();
+ let correlation = parts.correlation_id();
+ let payload = parts.payload();
+ if payload.len() <= fragmenter.max_fragment_size().get() {
+ return Ok(vec![F::from_parts(PacketParts::new(
+ id,
</code_context>
<issue_to_address>
**issue (review_instructions):** Refactor the outbound fragmentation closure to reuse the same fragmentation logic as `FragmentationState::fragment` instead of duplicating it.
Refactor this closure to avoid duplicating the fragmentation logic already implemented in `FragmentationState::fragment` in `src/app/connection.rs`. The two code paths are effectively identical (same `into_parts` extraction, small-payload fast path, `fragment_bytes` call, and loop encoding fragments), which violates the DRY requirement. Extract a shared helper (e.g., a generic function over `Packet`) or reuse `FragmentationState` for outbound traffic so that both inbound and outbound fragmentation rely on a single implementation while staying readable.
<details>
<summary>Review instructions:</summary>
**Path patterns:** `**/*`
**Instructions:**
Keep code DRY, but readable. Use refactoring approaches best suited for the language in question.
</details>
</issue_to_address>
…odule

The packet fragmentation code in `src/app/connection.rs` has been moved to a new helper function `fragment_packet` in a new module `src/app/fragment_utils.rs`. This consolidates fragmentation logic, improves modularity, and simplifies the `FragmentationState::fragment` method to delegate to the shared utility. Also updated imports and module declarations to reflect the new `fragment_utils` module.

Co-authored-by: terragon-labs[bot] <terragon-labs[bot]@users.noreply.github.com>
Actionable comments posted: 10
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/app/connection.rs (1)
5-31: Fix formatting: `cargo fmt` check failed. The pipeline reports a formatting issue. Run `cargo fmt` to resolve.

```shell
#!/bin/bash
# Verify formatting status
cargo fmt --check -- src/app/connection.rs 2>&1 | head -20
```
♻️ Duplicate comments (6)
docs/hardening-wireframe-a-guide-to-production-resilience.md (1)
315-318: Standardise "reassembled" spelling without a hyphen

Replace "re-assembled" with "reassembled" on Line 316 and normalise related forms ("re-assembly", "re-assembling") to "reassembly" / "reassembling" in this document and the wider docs set to keep terminology consistent with the user guide and previous feedback.
docs/users-guide.md (1)
311-321: Normalise hyphenation and -ize spellings in fragmentation text

Change "re-assembles fragments" on Line 315 to "reassembles fragments" to keep "reassembled" unhyphenated across the docs set. Replace "Customise or disable fragmentation" on Line 320 with "Customize or disable fragmentation" to align with the mandated Oxford `-ize` spellings.

tests/connection_fragmentation.rs (1)
13-60: Add complementary test for the non-fragmenting path

Keep this test for the fragmenting path, and add a second test that enables the same `FragmentationConfig` but uses a payload strictly below the effective fragment payload cap, then asserts:

- Exactly one outbound frame is produced.
- `decode_fragment_payload` on that frame's payload returns `Ok(None)`.

Reuse the same wiring as here (queues, actor, run, collection) so both fragmenting and pass-through behaviours are exercised under `ConnectionActor::enable_fragmentation`, matching earlier review feedback.

src/app/connection.rs (2)
351-382: Capture and log the actual `correlation_id` from the envelope.

The envelope's `correlation_id` is available before `state.reassemble(env)` consumes it. Capture it beforehand and use it in the warning logs instead of `None::<u64>`.

```diff
 fn reassemble_if_needed(
     fragmentation: &mut Option<FragmentationState>,
     deser_failures: &mut u32,
     env: Envelope,
 ) -> Option<Envelope> {
     if let Some(state) = fragmentation.as_mut() {
+        let correlation_id = env.correlation_id;
         match state.reassemble(env) {
             Ok(Some(env)) => Some(env),
             Ok(None) => None,
             Err(FragmentProcessError::Decode(err)) => {
                 *deser_failures += 1;
                 warn!(
                     "failed to decode fragment header: correlation_id={:?}, error={err:?}",
-                    None::<u64>
+                    correlation_id
                 );
                 crate::metrics::inc_deser_errors();
                 None
             }
             Err(FragmentProcessError::Reassembly(err)) => {
                 *deser_failures += 1;
                 warn!(
                     "fragment reassembly failed: correlation_id={:?}, error={err:?}",
-                    None::<u64>
+                    correlation_id
                 );
                 crate::metrics::inc_deser_errors();
                 None
             }
         }
     } else {
         Some(env)
     }
 }
```
360-377: Enforce `MAX_DESER_FAILURES` threshold for fragment errors.

Fragment decode/reassembly errors increment `deser_failures` but never check against `MAX_DESER_FAILURES`. This allows a peer to send unlimited malformed fragments without disconnection, inconsistent with envelope deserialization handling.

Return an error or propagate a signal when `*deser_failures >= MAX_DESER_FAILURES` after incrementing, mirroring `decode_envelope`'s behaviour.

src/connection.rs (1)
317-358: Reuse `fragment_packet` helper instead of duplicating fragmentation logic.

The closure in `enable_fragmentation` duplicates the logic already extracted into `fragment_packet` in `src/app/fragment_utils.rs`. This violates DRY and risks the two implementations diverging.

```diff
 pub fn enable_fragmentation(&mut self, config: FragmentationConfig)
 where
     F: Packet,
 {
     let fragmenter = Arc::new(Fragmenter::new(config.fragment_payload_cap));
-    self.fragmentation = Some(Arc::new(move |frame: F| {
-        let parts = frame.into_parts();
-        let id = parts.id();
-        let correlation = parts.correlation_id();
-        let payload = parts.payload();
-        if payload.len() <= fragmenter.max_fragment_size().get() {
-            return Ok(vec![F::from_parts(PacketParts::new(
-                id,
-                correlation,
-                payload,
-            ))]);
-        }
-
-        let batch = fragmenter.fragment_bytes(&payload)?;
-        if !batch.is_fragmented() {
-            return Ok(vec![F::from_parts(PacketParts::new(
-                id,
-                correlation,
-                payload,
-            ))]);
-        }
-
-        let mut frames = Vec::with_capacity(batch.len());
-        for fragment in batch {
-            let (header, payload) = fragment.into_parts();
-            let encoded = encode_fragment_payload(header, &payload)?;
-            frames.push(F::from_parts(PacketParts::new(id, correlation, encoded)));
-        }
-        Ok(frames)
-    }));
+    self.fragmentation = Some(Arc::new(move |frame: F| {
+        fragment_packet(&fragmenter, frame)
+    }));
 }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (15)
- `docs/generic-message-fragmentation-and-re-assembly-design.md` (1 hunks)
- `docs/hardening-wireframe-a-guide-to-production-resilience.md` (1 hunks)
- `docs/roadmap.md` (1 hunks)
- `docs/users-guide.md` (1 hunks)
- `src/app/builder.rs` (9 hunks)
- `src/app/connection.rs` (10 hunks)
- `src/app/fragment_utils.rs` (1 hunks)
- `src/app/mod.rs` (1 hunks)
- `src/connection.rs` (6 hunks)
- `src/fragment/config.rs` (1 hunks)
- `src/fragment/mod.rs` (1 hunks)
- `src/fragment/payload.rs` (1 hunks)
- `src/lib.rs` (1 hunks)
- `tests/connection_fragmentation.rs` (1 hunks)
- `tests/fragment_transport.rs` (1 hunks)
🧰 Additional context used
📓 Path-based instructions (5)
**/*.rs
📄 CodeRabbit inference engine (AGENTS.md)
**/*.rs: Run `make check-fmt`, `make lint`, and `make test` before committing. `make check-fmt` executes `cargo fmt --workspace -- --check`. `make lint` executes `cargo clippy --workspace --all-targets --all-features -- -D warnings`. `make test` executes `cargo test --workspace`.
Clippy warnings MUST be disallowed in Rust code.
Fix any warnings emitted during Rust tests in the code itself rather than silencing them.
Extract meaningfully named helper functions adhering to separation of concerns and CQRS when a Rust function is too long.
Group related parameters in meaningfully named structs when a Rust function has too many parameters.
Consider using `Arc` to reduce the amount of data returned when a Rust function is returning a large error.
Write unit and behavioural tests for new Rust functionality. Run both before and after making any change.
Every Rust module must begin with a module-level (`//!`) comment explaining the module's purpose and utility.
Document public APIs in Rust using Rustdoc comments (`///`) so documentation can be generated with `cargo doc`.
Prefer immutable data in Rust and avoid unnecessary `mut` bindings.
Handle errors with the `Result` type in Rust instead of panicking where feasible.
Avoid `unsafe` code in Rust unless absolutely necessary and document any usage clearly.
Place function attributes after doc comments in Rust.
Do not use `return` in single-line Rust functions.
Use predicate functions for Rust conditional criteria with more than two branches.
Lints must not be silenced in Rust except as a last resort. Lint rule suppressions must be tightly scoped and include a clear reason.
Prefer `expect` over `allow` in Rust.
Use `rstest` fixtures for shared test setup in Rust.
Replace duplicated Rust tests with `#[rstest(...)]` parameterised cases.
Prefer `mockall` for mocks/stubs in Rust.
Prefer `.expect()` over `.unwrap()` in Rust.
Use `concat!()` to combine long string literals in Rust rather than escaping newlines with a backslash.
Files:
`src/app/mod.rs`, `tests/connection_fragmentation.rs`, `src/app/fragment_utils.rs`, `src/fragment/mod.rs`, `src/fragment/config.rs`, `src/fragment/payload.rs`, `src/lib.rs`, `src/app/connection.rs`, `src/app/builder.rs`, `src/connection.rs`, `tests/fragment_transport.rs`
⚙️ CodeRabbit configuration file
**/*.rs: * Seek to keep the cognitive complexity of functions no more than 9.
- Adhere to single responsibility and CQRS
- Place function attributes after doc comments.
- Do not use `return` in single-line functions.
- Move conditionals with >2 branches into a predicate function.
- Avoid `unsafe` unless absolutely necessary.
- Every module must begin with a `//!` doc comment that explains the module's purpose and utility.
- Comments and docs must follow en-GB-oxendict (-ize / -yse / -our) spelling and grammar
- Lints must not be silenced except as a last resort. `#[allow]` is forbidden.
- Only narrowly scoped `#[expect(lint, reason = "...")]` is allowed.
- No lint groups, no blanket or file-wide suppression.
- Include `FIXME:` with link if a fix is expected.
- Where code is only used by specific features, it must be conditionally compiled or a conditional expectation for unused_code applied.
- Use `rstest` fixtures for shared setup and to avoid repetition between tests.
- Replace duplicated tests with `#[rstest(...)]` parameterised cases.
- Prefer `mockall` for mocks/stubs.
- Prefer `.expect()` over `.unwrap()` in tests. `.expect()` and `.unwrap()` are forbidden outside of tests. Errors must be propagated.
- Ensure that any API or behavioural changes are reflected in the documentation in `docs/`
- Ensure that any completed roadmap steps are recorded in the appropriate roadmap in `docs/`
- Files must not exceed 400 lines in length
- Large modules must be decomposed
- Long match statements or dispatch tables should be decomposed by domain and collocated with targets
- Large blocks of inline data (e.g., test fixtures, constants or templates) must be moved to external files and inlined at compile-time or loaded at run-time.
- Environment access (env::set_var and env::remove_var) are always unsafe in Rust 2024 and MUST be marked as such
- For testing of functionality depending upon environment variables, dependency injection and...
Files:
`src/app/mod.rs`, `tests/connection_fragmentation.rs`, `src/app/fragment_utils.rs`, `src/fragment/mod.rs`, `src/fragment/config.rs`, `src/fragment/payload.rs`, `src/lib.rs`, `src/app/connection.rs`, `src/app/builder.rs`, `src/connection.rs`, `tests/fragment_transport.rs`
docs/**/*.md
📄 CodeRabbit inference engine (AGENTS.md)
Documentation must use en-GB-oxendict ("-ize" / "-yse" / "-our") spelling and grammar, with the exception of the naming of the "LICENSE" file.
Follow the documentation style guide conventions when writing project documentation
docs/**/*.md: Use British English based on the Oxford English Dictionary (en-GB-oxendict), including: -ize suffix in words like 'realize' and 'organization', -lyse suffix in words like 'analyse', -our suffix in words like 'colour', -re suffix in words like 'centre', double 'l' in words like 'cancelled', maintain 'e' in words like 'likeable', -ogue suffix in words like 'catalogue'
The word 'outwith' is acceptable in British English documentation
Keep US spelling when used in an API (for example, 'color')
Use the Oxford comma in documentation: 'ships, planes, and hovercraft' where it aids comprehension
Treat company names as collective nouns: 'Lille Industries are expanding'
Write headings in sentence case in documentation
Use Markdown headings (#, ##, ###, and so on) in order without skipping levels
Follow markdownlint recommendations for Markdown files
Always provide a language identifier for fenced code blocks in documentation; use 'plaintext' for non-code text
Use '-' as the first level bullet and renumber lists when items change in documentation
Prefer inline links using 'text' or angle brackets around the URL in documentation
Ensure blank lines before and after bulleted lists and fenced blocks in documentation
Ensure tables have a delimiter line below the header row in documentation
Expand any uncommon acronym on first use in documentation (for example, Continuous Integration (CI))
Wrap paragraphs at 80 columns in documentation
Wrap code at 120 columns in documentation
Do not wrap tables in documentation
Use footnotes referenced with '[^label]' in documentation
Include Mermaid diagrams where they add clarity to documentation
When embedding figures in documentation, use '' and provide brief alt text desc...
Files:
`docs/hardening-wireframe-a-guide-to-production-resilience.md`, `docs/roadmap.md`, `docs/generic-message-fragmentation-and-re-assembly-design.md`, `docs/users-guide.md`
**/*.md
📄 CodeRabbit inference engine (AGENTS.md)
**/*.md: Validate Markdown files using `make markdownlint`.
Run `make fmt` after any documentation changes to format all Markdown files and fix table markup.
Validate Mermaid diagrams in Markdown files by running `make nixie`.
Markdown paragraphs and bullet points must be wrapped at 80 columns.
Code blocks in Markdown must be wrapped at 120 columns.
Tables and headings in Markdown must not be wrapped.
Use dashes (-) for list bullets in Markdown.
Use GitHub-flavoured Markdown footnotes ([^1]) for references and footnotes.
Files:
`docs/hardening-wireframe-a-guide-to-production-resilience.md`, `docs/roadmap.md`, `docs/generic-message-fragmentation-and-re-assembly-design.md`, `docs/users-guide.md`
⚙️ CodeRabbit configuration file
**/*.md: * Avoid 2nd person or 1st person pronouns ("I", "you", "we")
- Use en-GB-oxendict (-ize / -yse / -our) spelling and grammar
- Headings must not be wrapped.
- Documents must start with a level 1 heading
- Headings must correctly increase or decrease by no more than one level at a time
- Use GitHub-flavoured Markdown style for footnotes and endnotes.
- Numbered footnotes must be numbered by order of appearance in the document.
Files:
`docs/hardening-wireframe-a-guide-to-production-resilience.md`, `docs/roadmap.md`, `docs/generic-message-fragmentation-and-re-assembly-design.md`, `docs/users-guide.md`
docs/**/*.{rs,md}
📄 CodeRabbit inference engine (docs/rust-doctest-dry-guide.md)
Every doctest should validate the public API of a crate from the perspective of an external user, treating each documentation test as a separate temporary crate that imports the library as an external dependency
Files:
`docs/hardening-wireframe-a-guide-to-production-resilience.md`, `docs/roadmap.md`, `docs/generic-message-fragmentation-and-re-assembly-design.md`, `docs/users-guide.md`
docs/{**/*.md,**/*.rs}
📄 CodeRabbit inference engine (docs/wireframe-1-0-detailed-development-roadmap.md)
Remove the proposed `FrameSink` from the design and document `async-stream` as the canonical way to create streams imperatively for `Response::Stream` handling
Files:
`docs/hardening-wireframe-a-guide-to-production-resilience.md`, `docs/roadmap.md`, `docs/generic-message-fragmentation-and-re-assembly-design.md`, `docs/users-guide.md`
🧬 Code graph analysis (9)
src/app/fragment_utils.rs (5)
- src/app/connection.rs (1): `fragment` (66-68)
- src/fragment/payload.rs (1): `encode_fragment_payload` (49-67)
- tests/multi_packet_streaming.rs (1): `parts` (91-91)
- tests/worlds/fragment/mod.rs (1): `batch` (128-130)
- src/fragment/fragmenter.rs (1): `header` (157-157)

src/fragment/mod.rs (1)
- src/fragment/payload.rs (3): `decode_fragment_payload` (79-113), `encode_fragment_payload` (49-67), `fragment_overhead` (25-34)

src/fragment/config.rs (2)
- src/fragment/payload.rs (5): `std` (32-32), `std` (60-60), `std` (82-82), `std` (93-93), `fragment_overhead` (25-34)
- src/app/connection.rs (1): `new` (59-64)

src/fragment/payload.rs (1)
- src/fragment/index.rs (1): `zero` (34-34)

src/lib.rs (1)
- src/fragment/payload.rs (3): `decode_fragment_payload` (79-113), `encode_fragment_payload` (49-67), `fragment_overhead` (25-34)

src/app/connection.rs (4)
- src/app/builder.rs (3): `default_fragmentation` (38-44), `new` (159-159), `fragmentation` (385-388)
- src/app/fragment_utils.rs (1): `fragment_packet` (11-36)
- src/fragment/payload.rs (2): `decode_fragment_payload` (79-113), `encode_fragment_payload` (49-67)
- src/app/envelope.rs (12): `new` (83-89), `new` (115-121), `id` (49-49), `id` (94-94), `id` (124-124), `payload` (130-130), `from_parts` (58-58), `from_parts` (101-101), `correlation_id` (52-52), `correlation_id` (97-97), `correlation_id` (105-105), `correlation_id` (127-127)

src/app/builder.rs (1)
- src/fragment/config.rs (1): `for_frame_budget` (33-51)

src/connection.rs (4)
- src/app/fragment_utils.rs (1): `fragment_packet` (11-36)
- src/app/connection.rs (2): `fragment` (66-68), `new` (59-64)
- src/fragment/fragmenter.rs (1): `header` (157-157)
- src/fragment/payload.rs (1): `encode_fragment_payload` (49-67)

tests/fragment_transport.rs (3)
- src/fragment/payload.rs (2): `decode_fragment_payload` (79-113), `encode_fragment_payload` (49-67)
- src/fragment/config.rs (1): `for_frame_budget` (33-51)
- src/fragment/fragmenter.rs (2): `header` (157-157), `fragments` (190-190)
🪛 GitHub Actions: CI
src/app/mod.rs
[error] 11-11: cargo fmt check failed. Formatting differences detected (diff in module declarations). Run 'cargo fmt' to format.
src/app/connection.rs
[error] 13-13: cargo fmt check failed. Formatting differences detected (diff in code). Run 'cargo fmt' to format.
🪛 LanguageTool
docs/hardening-wireframe-a-guide-to-production-resilience.md
[style] ~318-~318: Consider using the typographical ellipsis character here instead.
Context: ...an override or disable the defaults via fragmentation(...). ## 5. Advanced Resilience Patterns ...
(ELLIPSIS)
docs/users-guide.md
[uncategorized] ~319-~319: Possible missing comma found.
Context: ...essage_size` caps the total reassembled payload and `reassembly_timeout` evicts stale p...
(AI_HYDRA_LEO_MISSING_COMMA)
[style] ~319-~319: Would you like to use the Oxford spelling “Customize”? The spelling ‘Customise’ is also correct.
Context: ...timeout` evicts stale partial messages. Customise or disable fragmentation via the builde...
(OXFORD_SPELLING_Z_NOT_S)
[uncategorized] ~340-~340: Possible missing comma found.
Context: ...en the transport already supports large frames or when fragmentation should be deferre...
(AI_HYDRA_LEO_MISSING_COMMA)
🔍 Remote MCP
Based on the context I've gathered, let me provide a comprehensive summary relevant to your PR review:
Summary of Additional Context for PR #399 Review
Architectural Foundation
The Wireframe framework implements transport-level fragmentation and reassembly through a pluggable FragmentStrategy trait and FragmentAdapter middleware to ensure application-level code remains unaware of fragmentation, handling only complete logical messages. This PR implements the concrete wire encoding layer for that design with a FRAG marker-based format.
Fragment Metadata Structure
The framework tracks fragment metadata including msg_id (message identifier for multiplexing), seq (sequence number for ordering), is_final (completion flag), payload_len (fragment size), and total_message_len (reassembly pre-allocation hint). Your PR's FragmentHeader struct with message_id, fragment_index, and is_last_fragment maps directly to these concepts.
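The mapping described above can be sketched as a plain struct. Only the field names (`message_id`, `fragment_index`, `is_last_fragment`) and their correspondence to the design document's `msg_id`, `seq`, and `is_final` come from the review; the derives and types here are illustrative assumptions.

```rust
/// Illustrative shape of the PR's `FragmentHeader`; each field is annotated
/// with the design document's terminology. Types and derives are assumed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct FragmentHeader {
    /// Message identifier used to multiplex interleaved messages (`msg_id`).
    message_id: u64,
    /// Ordering sequence number within one message (`seq`).
    fragment_index: u32,
    /// Completion flag marking the final fragment (`is_final`).
    is_last_fragment: bool,
}

fn main() {
    let first = FragmentHeader { message_id: 7, fragment_index: 0, is_last_fragment: false };
    let last = FragmentHeader { fragment_index: 1, is_last_fragment: true, ..first };
    // Fragments of one message share an identifier but advance the index.
    assert_eq!(first.message_id, last.message_id);
    assert!(last.is_last_fragment && !first.is_last_fragment);
}
```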
Test Coverage Requirements
The fragmentation test suite must cover byte-for-byte accuracy in reassembly, correctness of fragment generation, DoS protection against memory exhaustion, handling of out-of-order fragments, and multiplexed reassembly with interleaved fragments. Your tests in fragment_transport.rs appear to address these scenarios, though the reviewer noted duplication that should be refactored.
Safety Guardrails
The max_message_size is a strict limit triggering connection termination on overflow, and reassembly_timeout automatically purges incomplete messages to prevent attacks where incomplete fragments accumulate. Your PR's defaults of 16× buffer capacity cap and 30-second timeout align with this design.
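The stated defaults (a cap of 16× the buffer capacity and a 30-second timeout) reduce to simple checked arithmetic. The sketch below uses a hypothetical `default_limits` helper name and a tuple return; the real builder derives a full `FragmentationConfig` rather than a tuple.

```rust
use std::time::Duration;

// Illustrative recreation of the defaults described above: a 16x
// buffer-capacity message cap and a 30-second reassembly timeout.
// `default_limits` is a hypothetical name, not the crate's API.
fn default_limits(buffer_capacity: usize) -> Option<(usize, Duration)> {
    // Mirror the described guards: zero capacity or overflow disables fragmentation.
    let max_message_size = buffer_capacity.checked_mul(16).filter(|&n| n > 0)?;
    Some((max_message_size, Duration::from_secs(30)))
}

fn main() {
    assert_eq!(default_limits(8 * 1024), Some((131_072, Duration::from_secs(30))));
    assert_eq!(default_limits(0), None);
    assert_eq!(default_limits(usize::MAX), None);
}
```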
Design Trade-offs
Your implementation diverges from the documented FragmentStrategy trait by using a fixed wire format with FRAG markers and bincode encoding, rather than a pluggable strategy. The FragmentStrategy approach is designed to be protocol-agnostic and kept independent of any specific wire format, but your PR hardcodes the wire representation. This simplification is reasonable for MVP but may impact future extensibility to support different protocols.
⏰ Context from checks skipped due to timeout of 120000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Sourcery review
🔇 Additional comments (20)
docs/roadmap.md (1)
272-272: Roadmap progression accurately tracked for Phase 7. All four completed items in Phase 7 are correctly marked with [x] checkmarks and properly formatted within the 80-column wrapping limit. The changes accurately reflect the fragmentation and reassembly implementation delivered in this PR:
- Integration of F&R layer into the Connection actor's read/write paths is complete.
- Transparency to handler functions is ensured (handlers send/receive complete Message objects).
- Integration tests covering large message fragmentation are implemented.
- Edge cases (out-of-order, duplicates, timeouts) are tested.
The section structure, British English spelling, and Markdown conventions (dashes for bullets, blank lines, footnote formatting) are all consistent with the documentation style guide.
Also applies to: 274-275, 281-282, 284-285
src/fragment/mod.rs (1)
8-9: Expose fragmentation config and payload helpers via the fragment module
Keep `config` and `payload` as public submodules and re-export `FragmentationConfig`, `FRAGMENT_MAGIC`, `encode_fragment_payload`, `decode_fragment_payload`, and `fragment_overhead` here. This gives external users a single, coherent import surface at `wireframe::fragment::*` rather than reaching into submodules directly. Also applies to: 14-15, 18-29
src/lib.rs (1)
32-53: Keep top-level fragmentation re-exports as the primary public surface
Retain the new crate-level re-exports for `FRAGMENT_MAGIC`, `FragmentationConfig`, `encode_fragment_payload`, `decode_fragment_payload`, and `fragment_overhead` so external code can depend on `wireframe`'s stable public API without importing from internal modules.
src/app/mod.rs (1)
11-15: Module declaration formatting is already correct—no action needed.
The working tree is clean and the recent commit that added `pub mod fragment_utils;` has already been properly formatted. The CI workflow includes a format check step (`make check-fmt`) that validates all code, and there are no outstanding diffs. The module declarations at lines 11–15 are in their final, correctly formatted state.
src/app/fragment_utils.rs (1)
1-36: LGTM! Clean helper extraction that centralises the fragmentation logic. The module doc comment, Rustdoc on the public function, and immutable-first approach all align with guidelines.
src/app/builder.rs (2)
35-44: LGTM! The `default_fragmentation` helper correctly handles edge cases where capacity is zero or multiplication would overflow, returning `None` in both scenarios.
380-388: LGTM! The `fragmentation()` setter provides a clean override mechanism. Documentation clearly indicates that `None` disables fragmentation.
47-97: LGTM!
`FragmentationState` cleanly bundles fragmenter and reassembler. The `fragment()` method now delegates to the shared `fragment_packet` helper, addressing the previous duplication concern.
384-452: LGTM! The `forward_response` method correctly integrates outbound fragmentation. Error handling logs warnings, increments metrics, and gracefully continues without crashing the connection.
src/connection.rs (2)
697-702: LGTM! The `push_frame` helper cleanly encapsulates hook application and metrics emission, reducing duplication within `process_frame_with_hooks_and_metrics`.
676-695: LGTM! The fragmentation integration in `process_frame_with_hooks_and_metrics` correctly handles the optional fragmenter, logs errors with connection context, and increments metrics on failure.
tests/fragment_transport.rs (9)
25-35: LGTM! Constants and the `fragmentation_config` helper are well-defined with appropriate timeout values for testing.
37-57: LGTM! The `fragment_envelope` helper correctly handles the single-fragment optimisation and properly encodes multi-fragment payloads.
59-68: LGTM! The `send_envelopes` helper is straightforward and correctly serializes and transmits envelopes.
70-98: LGTM! The `read_reassembled_response` helper correctly handles both fragmented and non-fragmented responses, with appropriate use of the `Reassembler`.
100-122: LGTM! The `make_app` helper is well-structured, correctly configuring fragmentation and routing for tests.
124-153: LGTM! Solid round-trip test covering the complete fragmentation and reassembly flow.
155-187: LGTM! The `test_fragment_rejection` helper effectively eliminates duplication between rejection tests while maintaining clear, parameterised behaviour.
189-196: LGTM! Concise use of the helper to verify out-of-order fragment rejection.
198-208: LGTM! Correctly tests that duplicate fragments are detected and rejected.
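The helper-plus-closure pattern praised in these comments can be illustrated with a toy reassembler. Everything below (the index-only model, the function names, and the error messages) is a stand-in for the real `Reassembler`, kept minimal to show only the parameterisation.

```rust
// Toy stand-in: a reassembler that accepts fragments strictly in order,
// so both out-of-order and duplicate sequences are rejected.
fn reassemble(indices: &[u32]) -> Result<(), String> {
    let mut expected = 0u32;
    for &i in indices {
        if i != expected {
            return Err(format!("unexpected fragment index {i}, wanted {expected}"));
        }
        expected += 1;
    }
    Ok(())
}

// One helper owns setup and assertion; each test supplies only a closure
// that corrupts the fragment sequence, mirroring the reviewed pattern.
fn test_fragment_rejection(mutate: impl FnOnce(&mut Vec<u32>), expected_msg: &str) {
    let mut fragments: Vec<u32> = (0..4).collect();
    mutate(&mut fragments);
    let err = reassemble(&fragments).expect_err("mutated sequence must be rejected");
    assert!(err.contains(expected_msg), "got: {err}");
}

fn main() {
    // Out-of-order: swap the first two fragments.
    test_fragment_rejection(|f| f.swap(0, 1), "unexpected fragment index 1");
    // Duplicate: re-insert fragment 1 mid-sequence.
    test_fragment_rejection(|f| f.insert(2, 1), "unexpected fragment index 1");
}
```

The design win is that setup, transmission, and assertion live in one place, so each new rejection scenario costs only a closure.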
…ests - Refactor reassemble_if_needed to return io::Result<Option<Envelope>> - Emit detailed warnings on decoding and reassembly errors with correlation ID - Enforce maximum deserialization failure threshold to return Err - Simplify fragmentation closure with helper fragment_packet - Add tests for unfragmented outbound frames, malformed fragment rejection, and disabling fragmentation - Minor docs wording fix Co-authored-by: terragon-labs[bot] <terragon-labs[bot]@users.noreply.github.com>
There was a problem hiding this comment.
Actionable comments posted: 7
♻️ Duplicate comments (3)
src/connection.rs (1)
63-64: The `Arc<dyn Fn>` indirection is retained but now delegates to `fragment_packet`. Past reviewers suggested storing a concrete `Fragmenter` instead of `Arc<dyn Fn>`. The current implementation keeps the closure but addresses the DRY concern by calling `fragment_packet` (line 329). This is a reasonable trade-off: the closure approach avoids adding `Packet` bounds to `ConnectionActor`'s struct definition while centralising fragmentation logic in the shared helper. If strict adherence to the "no dynamic dispatch" suggestion is desired, refactor `ConnectionActor` to be generic over `Packet` and store `Option<Arc<Fragmenter>>` directly. Otherwise, the current approach is acceptable.
docs/users-guide.md (1)
340-344: Insert comma after "large frames". Add a comma to separate the two conditions clearly.

```diff
-Set `fragmentation(None)` when the transport already supports large frames or
-when fragmentation should be deferred to an upstream gateway. The
+Set `fragmentation(None)` when the transport already supports large frames, or
+when fragmentation should be deferred to an upstream gateway. The
```

This addresses the static analysis hint and a past review comment.
tests/fragment_transport.rs (1)
1-2: Add a module-level doc comment. Per coding guidelines, every module must begin with a `//!` doc comment explaining its purpose and utility.

```diff
 #![cfg(not(loom))]
+//! Integration tests for transport-level fragmentation and reassembly.
+//!
+//! These tests verify that large payloads are correctly fragmented, transmitted,
+//! and reassembled, and that malformed or out-of-order fragments are rejected.

 use std::{num::NonZeroUsize, time::Duration};
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (5)
- `docs/users-guide.md` (2 hunks)
- `src/app/connection.rs` (10 hunks)
- `src/connection.rs` (6 hunks)
- `tests/connection_fragmentation.rs` (1 hunks)
- `tests/fragment_transport.rs` (1 hunks)
🧰 Additional context used
📓 Path-based instructions (5)
**/*.rs
📄 CodeRabbit inference engine (AGENTS.md)
Files:
`src/app/connection.rs`, `tests/connection_fragmentation.rs`, `tests/fragment_transport.rs`, `src/connection.rs`
⚙️ CodeRabbit configuration file
Files:
`src/app/connection.rs`, `tests/connection_fragmentation.rs`, `tests/fragment_transport.rs`, `src/connection.rs`
docs/**/*.md
📄 CodeRabbit inference engine (AGENTS.md)
Files:
docs/users-guide.md
**/*.md
📄 CodeRabbit inference engine (AGENTS.md)
Files:
docs/users-guide.md
⚙️ CodeRabbit configuration file
Files:
docs/users-guide.md
docs/**/*.{rs,md}
📄 CodeRabbit inference engine (docs/rust-doctest-dry-guide.md)
Files:
docs/users-guide.md
docs/{**/*.md,**/*.rs}
📄 CodeRabbit inference engine (docs/wireframe-1-0-detailed-development-roadmap.md)
Remove the proposed `FrameSink` from the design and document `async-stream` as the canonical way to create streams imperatively for `Response::Stream` handling
Files:
docs/users-guide.md
🧬 Code graph analysis (4)
src/app/connection.rs (3)
- src/app/builder.rs (3): `default_fragmentation` (38-44), `new` (159-159), `fragmentation` (385-388)
- src/app/fragment_utils.rs (1): `fragment_packet` (11-36)
- src/fragment/payload.rs (2): `decode_fragment_payload` (79-113), `encode_fragment_payload` (49-67)

tests/connection_fragmentation.rs (2)
- src/fragment/payload.rs (5): `std` (32-32), `std` (60-60), `std` (82-82), `std` (93-93), `decode_fragment_payload` (79-113)
- src/fragment/config.rs (1): `for_frame_budget` (33-51)

tests/fragment_transport.rs (4)
- src/fragment/payload.rs (6): `std` (32-32), `std` (60-60), `std` (82-82), `std` (93-93), `decode_fragment_payload` (79-113), `encode_fragment_payload` (49-67)
- src/app/connection.rs (2): `fragment` (66-68), `new` (59-64)
- src/fragment/config.rs (1): `for_frame_budget` (33-51)
- src/fragment/fragmenter.rs (2): `header` (157-157), `fragments` (190-190)

src/connection.rs (2)
- src/app/fragment_utils.rs (1): `fragment_packet` (11-36)
- src/app/connection.rs (2): `fragment` (66-68), `new` (59-64)
🪛 GitHub Actions: CI
src/app/connection.rs
[warning] 13-13: Diff in import order: 'builder::{default_fragmentation, WireframeApp}' vs 'builder::{WireframeApp, default_fragmentation}'.
[warning] 288-288: Diff in conditional chain formatting around 'let Some(env) = ... else' branch.
tests/fragment_transport.rs
[warning] 13-13: Diff in test imports: FRAGMENT_MAGIC added.
[warning] 19-19: Diff in test imports: FRAGMENT_MAGIC moved/adjusted.
[warning] 230-230: Diff in fragment assembly: Envelope::from_parts call updated to multi-line formatting.
🪛 GitHub Check: build-test
src/app/connection.rs
[warning] 288-288:
Diff in /home/runner/work/wireframe/wireframe/src/app/connection.rs
tests/fragment_transport.rs
[warning] 230-230:
Diff in /home/runner/work/wireframe/wireframe/tests/fragment_transport.rs
[warning] 19-19:
Diff in /home/runner/work/wireframe/wireframe/tests/fragment_transport.rs
[warning] 13-13:
Diff in /home/runner/work/wireframe/wireframe/tests/fragment_transport.rs
🔍 Remote MCP Ref
Summary of additional, concrete facts useful for review
- Fragment wire format and decoder/encoder behavior
  - Payloads start with a fixed 4-byte marker FRAGMENT_MAGIC, then a big-endian u16 header length, a bincode-encoded FragmentHeader, then the fragment bytes. decode_fragment_payload detects FRAGMENT_MAGIC, reads the header length, bincode-decodes the header, and returns header + payload slice; encode_fragment_payload produces the reverse sequence.
- Fragment overhead & size computations
  - The module exposes fragment_overhead() (used to compute frame budget consumption) and FragmentationConfig::encoded_fragment_ceiling(), which returns fragment_payload_cap + fragment_overhead. The encoder/decoder assume the header length fits in u16.
- FragmentationConfig construction & guards
  - FragmentationConfig::for_frame_budget(frame_budget, max_message_size, reassembly_timeout) computes the fragment_payload_cap by subtracting fragment_overhead() and an envelope guard (32 bytes) from frame_budget; it returns None when the budget is insufficient. FragmentationConfig holds fragment_payload_cap (NonZeroUsize), max_message_size (NonZeroUsize), and reassembly_timeout (Duration).
- Defaults and hardening behavior (wiring in app/builder)
  - Builder code derives a default FragmentationConfig from buffer_capacity, uses a 30s reassembly timeout constant, and enforces a max reassembled message cap (16× frame budget) per the new defaults described in docs and builder changes. This is applied by default but can be overridden via WireframeApp::fragmentation(Some/None). (See docs + builder changes in PR context.) [PR diff docs/builder and docs/hardening additions in PR context]
- Tests and test duplication noted
  - New tests exercise encoding/decoding, full round-trip, out-of-order/duplicate fragment handling, eviction on timeout, and disabling fragmentation. Test files added: tests/connection_fragmentation.rs and tests/fragment_transport.rs. Reviewer-observed duplication: two tests share setup/teardown and differ only by pre-send mutation (out_of_order vs duplicate); suggested extraction of a helper test_fragment_rejection(fragment_mutator, message). (See PR comments / proposed refactor in PR context.) [PR test files referenced above]
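A dependency-free sketch of the described layout follows. Only the `FRAG` magic, the big-endian `u16` header length, and the header-then-payload order are taken from the notes above; the real crate bincode-encodes the header, whereas this sketch uses fixed-width big-endian fields so it stays self-contained.

```rust
// On-wire layout summarised above: "FRAG" magic, big-endian u16 header
// length, encoded header, then the fragment bytes. Header encoding here
// (fixed-width fields) is a stand-in for the crate's bincode encoding.
const FRAGMENT_MAGIC: &[u8; 4] = b"FRAG";

fn encode_fragment_payload(message_id: u64, index: u32, is_last: bool, payload: &[u8]) -> Vec<u8> {
    let mut header = Vec::new();
    header.extend_from_slice(&message_id.to_be_bytes());
    header.extend_from_slice(&index.to_be_bytes());
    header.push(u8::from(is_last));
    let mut out = Vec::with_capacity(4 + 2 + header.len() + payload.len());
    out.extend_from_slice(FRAGMENT_MAGIC);
    // The header length must fit in u16, matching the decoder's assumption.
    out.extend_from_slice(&(header.len() as u16).to_be_bytes());
    out.extend_from_slice(&header);
    out.extend_from_slice(payload);
    out
}

fn decode_fragment_payload(frame: &[u8]) -> Option<(u64, u32, bool, &[u8])> {
    // A frame without the magic is an ordinary, unfragmented payload.
    let rest = frame.strip_prefix(FRAGMENT_MAGIC.as_slice())?;
    if rest.len() < 2 {
        return None;
    }
    let header_len = u16::from_be_bytes([rest[0], rest[1]]) as usize;
    let rest = &rest[2..];
    if rest.len() < header_len || header_len < 13 {
        return None;
    }
    let (header, payload) = rest.split_at(header_len);
    let message_id = u64::from_be_bytes(header[..8].try_into().ok()?);
    let index = u32::from_be_bytes(header[8..12].try_into().ok()?);
    let is_last = header[12] != 0;
    Some((message_id, index, is_last, payload))
}

fn main() {
    let frame = encode_fragment_payload(42, 3, true, b"hello");
    let decoded = decode_fragment_payload(&frame).expect("round-trip");
    assert_eq!(decoded, (42, 3, true, b"hello".as_slice()));
    assert!(decode_fragment_payload(b"plain frame").is_none());
}
```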
⏰ Context from checks skipped due to timeout of 120000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Sourcery review
🔇 Additional comments (15)
tests/connection_fragmentation.rs (2)
13-60: LGTM! The test setup and fragmentation/reassembly logic are sound. The tests now cover both the fragmenting path (large payload) and the pass-through path (small payload), addressing the previous reviewer's suggestion.
62-98: LGTM! This test correctly exercises the non-fragmenting branch by using a payload just under `fragment_payload_cap`, verifying that `decode_fragment_payload` returns `Ok(None)` for unfragmented frames. This directly addresses the past review comment requesting a complementary test for small frames.
src/app/connection.rs (3)
47-97: LGTM! `FragmentationState` cleanly bundles the `Fragmenter` and `Reassembler`, and delegates fragmentation to `fragment_packet` (addressing the earlier duplication concern). The `reassemble` method correctly handles both fragmented and non-fragmented payloads, and `purge_expired` is available for timeout-based eviction.
353-397: LGTM! `reassemble_if_needed` now correctly captures `correlation_id` before calling `state.reassemble(env)` and uses it in warning logs. The `MAX_DESER_FAILURES` cutoff is now enforced for fragment decode and reassembly errors, addressing the earlier bug-risk concern.
399-467: LGTM! `forward_response` correctly fragments outbound responses when fragmentation is enabled, handles fragmentation errors gracefully with logging and metrics, and iterates over the resulting fragments to serialise and send each one.
docs/users-guide.md (2)
311-316: LGTM! The documentation clearly explains that fragmentation is automatic, derived from `buffer_capacity`, and that handlers receive complete `Envelope` values after reassembly.
322-338: LGTM! The code example correctly demonstrates constructing a `FragmentationConfig` via `for_frame_budget` and applying it with `.fragmentation(Some(cfg))`.
src/connection.rs (3)
317-331: LGTM! `enable_fragmentation` correctly constructs the `Fragmenter` and wraps it in a closure that calls `fragment_packet`. The doc comment accurately describes the behaviour.
649-675: LGTM! `process_frame_with_hooks_and_metrics` cleanly handles both the fragmented and non-fragmented paths. Fragmentation errors are logged with connection context and increment the handler error metric. The extracted `push_frame` helper applies hooks and metrics consistently.
301-301: Initialise `fragmentation` to `None` in constructor. Verified that `fragmentation: None` is set in `with_hooks`, ensuring the field defaults to disabled until `enable_fragmentation` is called.
tests/fragment_transport.rs (5)
156-188: LGTM! The `test_fragment_rejection` helper cleanly encapsulates shared setup and teardown. It accepts a closure to mutate fragments and a rejection message for the assertion, precisely addressing the PR objective to eliminate duplication between the out-of-order and duplicate fragment tests.
190-209: LGTM! Both tests now use the shared helper with concise, focused closures. `out_of_order_fragments_are_rejected` swaps fragments; `duplicate_fragments_clear_reassembly` inserts a duplicate. The duplication flagged in the PR comments has been resolved.
211-239: LGTM! This test exercises the `FragmentProcessError::Decode` branch by truncating the fragment header after the `FRAG` marker, ensuring malformed headers are safely rejected without invoking handlers. This addresses the past reviewer suggestion.
241-279: LGTM! The eviction test correctly sends the first fragment, sleeps beyond the reassembly timeout, then sends remaining fragments. The assertion confirms the handler never receives a payload after timeout eviction.
281-320: LGTM! This test verifies that `.fragmentation(None)` disables fragmentation, and the handler receives the full payload without fragmentation overhead. It addresses the past reviewer suggestion for an opt-out path test.
… protections

- Added support for concurrent reassembly of multiple message fragments using a DashMap keyed by message ID, allowing interleaved fragments from different logical messages.
- Harden FragmentAdapter with configurable timeout for partial message reassembly and strict max message size enforcement to prevent memory exhaustion DoS attacks.
- Updated documentation to refine terminology (removing hyphenations in 'reassembly') and clarify design goals and test benchmarks.
- Refactored fragment_utils and related tests for better error management and correctness.
- Improved logging messages for fragment decoding and reassembly failures.
- Adjusted WireframeApp builder to set sensible default fragmentation configuration including eviction of stale fragments.

This foundational enhancement improves protocol resilience and scalability by enabling safe and efficient handling of fragmented message streams.

Co-authored-by: terragon-labs[bot] <terragon-labs[bot]@users.noreply.github.com>
There was a problem hiding this comment.
Actionable comments posted: 10
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
docs/generic-message-fragmentation-and-re-assembly-design.md (1)
265-366: Ensure measurable objectives remain realistic as implementation evolves. Keep the objectives table (`max_message_size` enforcement, timeout-based purge, performance targets) aligned with the actual adapter behaviour and benchmarks; update these rows if caps, timeout semantics, or performance budgets change.
docs/multi-layered-testing-strategy.md (2)
97-129: Keep reassembly terminology consistent across testing docs. Retain the dehyphenated "reassembled"/"reassembly" spelling here and propagate the same form anywhere new fragmentation tests or benchmarks are documented to minimise confusion.
490-503: Match performance benchmark text to actual FragmentAdapter throughput tests. Ensure the stated "> 5 GiB/s" throughput goal for the `FragmentAdapter` micro-benchmark reflects realistic measurements on CI hardware and update this target if future changes alter performance characteristics.
♻️ Duplicate comments (6)
docs/generic-message-fragmentation-and-re-assembly-design.md (1)
147-159: Keep FRAG wire-layout section locked to `fragment::payload` implementation. Treat this "Wire encoding" section as the single source of truth for external readers and update it in lockstep with `encode_fragment_payload`/`decode_fragment_payload` whenever the marker, header length, or header encoding changes so the documented `FRAG` format always matches the code.
tests/connection_fragmentation.rs (2)
1-2: Add a module-level doc comment describing these tests. Document this test module with an inner `//!` comment explaining it exercises `ConnectionActor` outbound fragmentation (multi-fragment and unfragmented paths) so it complies with the project guideline that every module begins with a descriptive doc comment.

```diff
-#![cfg(not(loom))]
+#![cfg(not(loom))]
+//! Tests for `ConnectionActor` outbound fragmentation behaviour.
+//!
+//! Verify that frames exceeding the fragment payload cap are split into
+//! multiple fragments and that small frames pass through unfragmented.
```
100-100: Move `ROUTE_ID` to the top of the module. Place `ROUTE_ID` near the imports or immediately before the first test to follow idiomatic Rust layout and make shared constants easy to locate.

```diff
-#![cfg(not(loom))]
+#![cfg(not(loom))]
+
+const ROUTE_ID: u32 = 7;
@@
-const ROUTE_ID: u32 = 7;
```

tests/fragment_transport.rs (2)
1-2: Add a module-level doc comment. Per coding guidelines, every module must begin with a `//!` doc comment explaining its purpose and utility.

```diff
 #![cfg(not(loom))]
+//! Integration tests for transport-level fragmentation and reassembly.
+//!
+//! These tests verify that large payloads are correctly fragmented, transmitted,
+//! and reassembled, and that malformed or out-of-order fragments are rejected.
 use std::{num::NonZeroUsize, time::Duration};
```
245-283: Extract common setup into a shared fixture or extend `test_fragment_rejection`. This test duplicates setup code from `test_fragment_rejection` (lines 247–260 mirror lines 160–167). Per coding guidelines, use `rstest` fixtures for shared test setup and parameterise where feasible.
src/connection.rs (1)
63-64: Store a concrete `Fragmenter` instead of `Arc<dyn Fn>`. The dynamic dispatch is unnecessary. Store `Option<Arc<Fragmenter>>` directly and call `fragment_packet` in `process_frame_with_hooks_and_metrics`. This removes indirection and aligns with the helper-based approach already used in `FragmentationState`.
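A minimal sketch of the refactor this comment suggests, assuming a simplified `Fragmenter` with a single `fragment_packet` method; the real wireframe types carry more state and error handling than shown here.

```rust
use std::sync::Arc;

/// Stand-in for wireframe's `Fragmenter`; the real type's API is richer.
struct Fragmenter {
    max_fragment_size: usize,
}

impl Fragmenter {
    /// Split a payload into chunks no larger than `max_fragment_size`.
    fn fragment_packet(&self, payload: &[u8]) -> Vec<Vec<u8>> {
        payload
            .chunks(self.max_fragment_size)
            .map(|chunk| chunk.to_vec())
            .collect()
    }
}

/// After the suggested change: the actor stores the concrete type and calls
/// `fragment_packet` directly, replacing the previous
/// `Option<Arc<dyn Fn(..) -> ..>>` field and its dynamic dispatch.
struct ConnectionActor {
    fragmentation: Option<Arc<Fragmenter>>,
}

impl ConnectionActor {
    fn process_frame(&self, frame: &[u8]) -> Vec<Vec<u8>> {
        match &self.fragmentation {
            // Direct call on the concrete type: no trait-object indirection.
            Some(fragmenter) => fragmenter.fragment_packet(frame),
            None => vec![frame.to_vec()],
        }
    }
}

fn main() {
    let actor = ConnectionActor {
        fragmentation: Some(Arc::new(Fragmenter { max_fragment_size: 4 })),
    };
    assert_eq!(actor.process_frame(b"0123456789").len(), 3);

    let passthrough = ConnectionActor { fragmentation: None };
    assert_eq!(passthrough.process_frame(b"0123456789").len(), 1);
}
```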
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (12)
- docs/contents.md (1 hunks)
- docs/generic-message-fragmentation-and-re-assembly-design.md (11 hunks)
- docs/hardening-wireframe-a-guide-to-production-resilience.md (1 hunks)
- docs/multi-layered-testing-strategy.md (2 hunks)
- docs/observability-operability-and-maturity.md (1 hunks)
- docs/wireframe-1-0-detailed-development-roadmap.md (1 hunks)
- src/app/connection.rs (10 hunks)
- src/app/fragment_utils.rs (1 hunks)
- src/app/mod.rs (1 hunks)
- src/connection.rs (6 hunks)
- tests/connection_fragmentation.rs (1 hunks)
- tests/fragment_transport.rs (1 hunks)
🧰 Additional context used
📓 Path-based instructions (5)
docs/**/*.md
📄 CodeRabbit inference engine (AGENTS.md)
Documentation must use en-GB-oxendict ("-ize" / "-yse" / "-our") spelling and grammar, with the exception of the naming of the "LICENSE" file.
Follow the documentation style guide conventions when writing project documentation
docs/**/*.md: Use British English based on the Oxford English Dictionary (en-GB-oxendict), including: -ize suffix in words like 'realize' and 'organization', -lyse suffix in words like 'analyse', -our suffix in words like 'colour', -re suffix in words like 'centre', double 'l' in words like 'cancelled', maintain 'e' in words like 'likeable', -ogue suffix in words like 'catalogue'
The word 'outwith' is acceptable in British English documentation
Keep US spelling when used in an API (for example, 'color')
Use the Oxford comma in documentation: 'ships, planes, and hovercraft' where it aids comprehension
Treat company names as collective nouns: 'Lille Industries are expanding'
Write headings in sentence case in documentation
Use Markdown headings (#, ##, ###, and so on) in order without skipping levels
Follow markdownlint recommendations for Markdown files
Always provide a language identifier for fenced code blocks in documentation; use 'plaintext' for non-code text
Use '-' as the first level bullet and renumber lists when items change in documentation
Prefer inline links using 'text' or angle brackets around the URL in documentation
Ensure blank lines before and after bulleted lists and fenced blocks in documentation
Ensure tables have a delimiter line below the header row in documentation
Expand any uncommon acronym on first use in documentation (for example, Continuous Integration (CI))
Wrap paragraphs at 80 columns in documentation
Wrap code at 120 columns in documentation
Do not wrap tables in documentation
Use footnotes referenced with '[^label]' in documentation
Include Mermaid diagrams where they add clarity to documentation
When embedding figures in documentation, use '' and provide brief alt text desc...
Files:
docs/contents.md, docs/observability-operability-and-maturity.md, docs/multi-layered-testing-strategy.md, docs/hardening-wireframe-a-guide-to-production-resilience.md, docs/wireframe-1-0-detailed-development-roadmap.md, docs/generic-message-fragmentation-and-re-assembly-design.md
**/*.md
📄 CodeRabbit inference engine (AGENTS.md)
**/*.md: Validate Markdown files using `make markdownlint`.
Run `make fmt` after any documentation changes to format all Markdown files and fix table markup.
Validate Mermaid diagrams in Markdown files by running `make nixie`.
Markdown paragraphs and bullet points must be wrapped at 80 columns.
Code blocks in Markdown must be wrapped at 120 columns.
Tables and headings in Markdown must not be wrapped.
Use dashes (-) for list bullets in Markdown.
Use GitHub-flavoured Markdown footnotes ([^1]) for references and footnotes.
Files:
docs/contents.md, docs/observability-operability-and-maturity.md, docs/multi-layered-testing-strategy.md, docs/hardening-wireframe-a-guide-to-production-resilience.md, docs/wireframe-1-0-detailed-development-roadmap.md, docs/generic-message-fragmentation-and-re-assembly-design.md
⚙️ CodeRabbit configuration file
**/*.md: * Avoid 2nd person or 1st person pronouns ("I", "you", "we")
- Use en-GB-oxendict (-ize / -yse / -our) spelling and grammar
- Headings must not be wrapped.
- Documents must start with a level 1 heading
- Headings must correctly increase or decrease by no more than one level at a time
- Use GitHub-flavoured Markdown style for footnotes and endnotes.
- Numbered footnotes must be numbered by order of appearance in the document.
Files:
docs/contents.md, docs/observability-operability-and-maturity.md, docs/multi-layered-testing-strategy.md, docs/hardening-wireframe-a-guide-to-production-resilience.md, docs/wireframe-1-0-detailed-development-roadmap.md, docs/generic-message-fragmentation-and-re-assembly-design.md
docs/**/*.{rs,md}
📄 CodeRabbit inference engine (docs/rust-doctest-dry-guide.md)
Every doctest should validate the public API of a crate from the perspective of an external user, treating each documentation test as a separate temporary crate that imports the library as an external dependency
Files:
docs/contents.md, docs/observability-operability-and-maturity.md, docs/multi-layered-testing-strategy.md, docs/hardening-wireframe-a-guide-to-production-resilience.md, docs/wireframe-1-0-detailed-development-roadmap.md, docs/generic-message-fragmentation-and-re-assembly-design.md
docs/{**/*.md,**/*.rs}
📄 CodeRabbit inference engine (docs/wireframe-1-0-detailed-development-roadmap.md)
Remove the proposed `FrameSink` from the design and document `async-stream` as the canonical way to create streams imperatively for `Response::Stream` handling
Files:
docs/contents.md, docs/observability-operability-and-maturity.md, docs/multi-layered-testing-strategy.md, docs/hardening-wireframe-a-guide-to-production-resilience.md, docs/wireframe-1-0-detailed-development-roadmap.md, docs/generic-message-fragmentation-and-re-assembly-design.md
**/*.rs
📄 CodeRabbit inference engine (AGENTS.md)
**/*.rs: Run `make check-fmt`, `make lint`, and `make test` before committing. `make check-fmt` executes `cargo fmt --workspace -- --check`. `make lint` executes `cargo clippy --workspace --all-targets --all-features -- -D warnings`. `make test` executes `cargo test --workspace`.
Clippy warnings MUST be disallowed in Rust code.
Fix any warnings emitted during Rust tests in the code itself rather than silencing them.
Extract meaningfully named helper functions adhering to separation of concerns and CQRS when a Rust function is too long.
Group related parameters in meaningfully named structs when a Rust function has too many parameters.
Consider using `Arc` to reduce the amount of data returned when a Rust function is returning a large error.
Write unit and behavioural tests for new Rust functionality. Run both before and after making any change.
Every Rust module must begin with a module level (//!) comment explaining the module's purpose and utility.
Document public APIs in Rust using Rustdoc comments (///) so documentation can be generated with `cargo doc`.
Prefer immutable data in Rust and avoid unnecessary `mut` bindings.
Handle errors with the `Result` type in Rust instead of panicking where feasible.
Avoid `unsafe` code in Rust unless absolutely necessary and document any usage clearly.
Place function attributes after doc comments in Rust.
Do not use `return` in single-line Rust functions.
Use predicate functions for Rust conditional criteria with more than two branches.
Lints must not be silenced in Rust except as a last resort. Lint rule suppressions must be tightly scoped and include a clear reason.
Prefer `expect` over `allow` in Rust.
Use `rstest` fixtures for shared test setup in Rust.
Replace duplicated Rust tests with `#[rstest(...)]` parameterised cases.
Prefer `mockall` for mocks/stubs in Rust.
Prefer `.expect()` over `.unwrap()` in Rust.
Use `concat!()` to combine long string literals in Rust rather than escaping newlines with a bac...
Files:
src/app/mod.rs, src/app/fragment_utils.rs, tests/fragment_transport.rs, tests/connection_fragmentation.rs, src/app/connection.rs, src/connection.rs
⚙️ CodeRabbit configuration file
**/*.rs: * Seek to keep the cognitive complexity of functions no more than 9.
- Adhere to single responsibility and CQRS
- Place function attributes after doc comments.
- Do not use `return` in single-line functions.
- Move conditionals with >2 branches into a predicate function.
- Avoid `unsafe` unless absolutely necessary.
- Every module must begin with a `//!` doc comment that explains the module's purpose and utility.
- Comments and docs must follow en-GB-oxendict (-ize / -yse / -our) spelling and grammar
- Lints must not be silenced except as a last resort. `#[allow]` is forbidden.
- Only narrowly scoped `#[expect(lint, reason = "...")]` is allowed.
- No lint groups, no blanket or file-wide suppression.
- Include `FIXME:` with link if a fix is expected.
- Where code is only used by specific features, it must be conditionally compiled or a conditional expectation for unused_code applied.
- Use `rstest` fixtures for shared setup and to avoid repetition between tests.
- Replace duplicated tests with `#[rstest(...)]` parameterised cases.
- Prefer `mockall` for mocks/stubs.
- Prefer `.expect()` over `.unwrap()` in tests. `.expect()` and `.unwrap()` are forbidden outside of tests. Errors must be propagated.
- Ensure that any API or behavioural changes are reflected in the documentation in docs/
- Ensure that any completed roadmap steps are recorded in the appropriate roadmap in docs/
- Files must not exceed 400 lines in length
- Large modules must be decomposed
- Long match statements or dispatch tables should be decomposed by domain and collocated with targets
- Large blocks of inline data (e.g., test fixtures, constants or templates) must be moved to external files and inlined at compile-time or loaded at run-time.
- Environment access (env::set_var and env::remove_var) are always unsafe in Rust 2024 and MUST be marked as such
- For testing of functionality depending upon environment variables, dependency injection and...
Files:
src/app/mod.rssrc/app/fragment_utils.rstests/fragment_transport.rstests/connection_fragmentation.rssrc/app/connection.rssrc/connection.rs
🧬 Code graph analysis (5)
src/app/fragment_utils.rs (2)
- src/fragment/payload.rs (1): `encode_fragment_payload` (49-67)
- src/fragment/fragmenter.rs (1): `header` (157-157)

tests/fragment_transport.rs (2)
- src/fragment/payload.rs (2): `decode_fragment_payload` (79-113), `encode_fragment_payload` (49-67)
- src/fragment/config.rs (1): `for_frame_budget` (33-51)

tests/connection_fragmentation.rs (3)
- src/fragment/payload.rs (5): `std` (32-32, 60-60, 82-82, 93-93), `decode_fragment_payload` (79-113)
- src/app/connection.rs (2): `fragment` (65-67), `new` (58-63)
- src/fragment/config.rs (1): `for_frame_budget` (33-51)

src/app/connection.rs (5)
- src/app/builder.rs (3): `default_fragmentation` (38-44), `new` (159-159), `fragmentation` (385-388)
- src/app/fragment_utils.rs (1): `fragment_packet` (15-40)
- src/fragment/payload.rs (1): `decode_fragment_payload` (79-113)
- src/app/envelope.rs (12): `new` (83-89, 115-121), `id` (49-49, 94-94, 124-124), `payload` (130-130), `from_parts` (58-58, 101-101), `correlation_id` (52-52, 97-97, 105-105, 127-127)
- src/fragment/fragmenter.rs (1): `header` (157-157)

src/connection.rs (2)
- src/app/fragment_utils.rs (1): `fragment_packet` (15-40)
- src/app/connection.rs (2): `fragment` (65-67), `new` (58-63)
🪛 GitHub Actions: CI
src/app/connection.rs
[error] 359-359: Rust fmt check would reformat code: diff detected in fragment header formatting. Run 'cargo fmt' to apply standard formatting.
[error] 371-371: Rust fmt check would reformat code: diff detected in fragment reassembly formatting. Run 'cargo fmt' to apply standard formatting.
🪛 GitHub Check: build-test
src/app/connection.rs
[warning] 371-371:
Diff in /home/runner/work/wireframe/wireframe/src/app/connection.rs
[warning] 359-359:
Diff in /home/runner/work/wireframe/wireframe/src/app/connection.rs
🪛 LanguageTool
docs/observability-operability-and-maturity.md
[style] ~141-~141: To form a complete sentence, be sure to include a subject or ‘there’.
Context: ...rames dropped due to a full push queue. Should be zero in a healthy system. ...
(MISSING_IT_THERE)
docs/hardening-wireframe-a-guide-to-production-resilience.md
[style] ~318-~318: Consider using the typographical ellipsis character here instead.
Context: ...an override or disable the defaults via fragmentation(...). ## 5. Advanced Resilience Patterns ...
(ELLIPSIS)
🔇 Additional comments (19)
docs/observability-operability-and-maturity.md (1)
136-147: Table formatting and Measurable Objective reframing look correct. The metrics table is properly formatted with correct delimiter alignment and no wrapping. The reframed Measurable Objective paragraph (lines 145–147) is well wrapped at approximately 80 columns and clearly articulates the goal of constructing a comprehensive operational health dashboard. The reframing improves clarity without changing the semantic intent.
src/app/mod.rs (1)
11-18: Expose `fragment_utils` as intended app-level entry point. Leave `fragment_utils` as a public submodule so callers access fragmentation helpers via `app::fragment_utils` without enlarging the main `app` re-export surface.
tests/fragment_transport.rs (8)
29-36: LGTM! The `fragmentation_config` helper correctly derives a `FragmentationConfig` from a capacity-based frame budget with sensible defaults.
60-69: LGTM! The `send_envelopes` helper correctly serializes and sends envelopes over the framed stream.
71-99: LGTM! The `read_reassembled_response` helper correctly handles both fragmented and non-fragmented responses, using the `Reassembler` to reconstruct payloads.
101-123: LGTM! The `make_app` helper correctly constructs a `WireframeApp` with fragmentation enabled and a handler that records payloads for verification.
125-154: LGTM! The round-trip test comprehensively verifies fragmentation and reassembly for both request and response paths.
156-188: LGTM! The `test_fragment_rejection` helper consolidates shared test scaffolding per the PR objectives, accepting a closure to mutate fragments and a rejection message for the assertion.
190-209: LGTM! The `out_of_order_fragments_are_rejected` and `duplicate_fragments_clear_reassembly` tests correctly use the shared helper to verify rejection behaviour with minimal duplication.
211-243: LGTM! The malformed fragment header test correctly truncates the payload after the `FRAG` marker to exercise the decode-error path.
src/app/connection.rs (4)
46-96: LGTM! The `FragmentationState` struct correctly bundles outbound fragmentation and inbound reassembly, delegating to the shared `fragment_packet` helper and handling all reassembly cases with proper error propagation.
113-116: LGTM! The `fragmentation_config` method correctly falls back to `default_fragmentation` when no explicit config is set.
275-305: LGTM! The `handle_frame` method cleanly orchestrates decoding, reassembly, and response forwarding with proper error propagation and metrics.
390-458: LGTM! The `forward_response` method correctly fragments responses when configured and handles serialization/send errors gracefully with appropriate logging and metrics.
src/connection.rs (5)
12-16: LGTM! The expanded imports correctly bring in `Arc` and atomic types for connection tracking alongside fragmentation dependencies.
52-56: LGTM! The imports correctly bring in the fragmentation types and the shared `fragment_packet` helper from the app module.
138-141: LGTM! The `fragmentation` field is correctly added to `ConnectionActor` and initialised to `None` in the constructors.
317-331: LGTM! The `enable_fragmentation` method correctly creates a `Fragmenter` and wraps it in a closure that delegates to `fragment_packet`, avoiding duplication of fragmentation logic.
649-675: LGTM! The `process_frame_with_hooks_and_metrics` method correctly routes frames through fragmentation when configured, handling errors with appropriate logging and metrics. The extracted `push_frame` helper cleanly encapsulates hook application and emission.
…module

- Introduced `make_handler` helper to reduce code duplication in `fragment_transport.rs` tests.
- Improved payload handling to avoid unnecessary cloning.
- Updated the fragmentation error docs in `fragment_utils.rs` for clarity.
- Reformatted multi-line warnings in `connection.rs` for better readability.

Co-authored-by: terragon-labs[bot] <terragon-labs[bot]@users.noreply.github.com>
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/app/connection.rs (1)
1-464: Decompose the module to stay within the 400-line limit. The file is 464 lines, exceeding the 400-line limit specified in the coding guidelines. Large modules must be decomposed.

Suggested refactoring:

- Extract `FragmentationState` and `FragmentProcessError` (lines 46-96) into a separate module `src/app/fragmentation_state.rs`.
- Re-export the types from `src/app/mod.rs` if they need to be accessible within the app module.
- If the file still exceeds 400 lines, further extract helper functions (`decode_envelope`, `reassemble_if_needed`, `forward_response`) into a separate module such as `src/app/frame_handling.rs`.

This will bring the file into compliance with the coding guidelines whilst maintaining clear module boundaries.
Based on coding guidelines stating files must not exceed 400 lines.
♻️ Duplicate comments (5)
tests/connection_fragmentation.rs (3)
1-2: Add a module-level doc comment. Every module must begin with a `//!` doc comment explaining its purpose and utility per coding guidelines.

Apply this diff:

```diff
 #![cfg(not(loom))]
+//! Integration tests for `ConnectionActor` outbound fragmentation behaviour.
+//!
+//! Verifies that frames exceeding the fragment payload cap are split into
+//! multiple fragments and that small frames pass through unfragmented.
 use std::{num::NonZeroUsize, time::Duration};
```
101-101: Move `ROUTE_ID` to the top of the file. Place constants near the imports or before the first function for consistency with idiomatic Rust module layout.

Apply this diff:

```diff
+const ROUTE_ID: u32 = 7;
+
 #[tokio::test]
 async fn connection_actor_fragments_outbound_frames() {
```

Then remove the declaration at line 101.
92-93: Remove unnecessary clone when extracting payload. Consume `only` directly instead of cloning it, as it is not used afterwards.

Apply this diff:

```diff
 let only = out.into_iter().next().expect("frame present");
-let payload_out = only.clone().into_parts().payload();
+let payload_out = only.into_parts().payload();
```

tests/fragment_transport.rs (2)
1-2: Add a module-level doc comment. Every module must begin with a `//!` doc comment explaining its purpose and utility per coding guidelines.

Apply this diff:

```diff
 #![cfg(not(loom))]
+//! Integration tests for transport-level fragmentation and reassembly.
+//!
+//! These tests verify that large payloads are correctly fragmented, transmitted,
+//! and reassembled, and that malformed or out-of-order fragments are rejected.
 use std::{num::NonZeroUsize, time::Duration};
```
38-58: Avoid redundant clone when payload fits in a single fragment. The function clones `env` twice (lines 39 and 45). Restructure to avoid the double allocation.

Apply this diff:

```diff
 fn fragment_envelope(env: &Envelope, fragmenter: &Fragmenter) -> Vec<Envelope> {
     let parts = env.clone().into_parts();
     let id = parts.id();
     let correlation = parts.correlation_id();
     let payload = parts.payload();
     if payload.len() <= fragmenter.max_fragment_size().get() {
-        return vec![env.clone()];
+        return vec![Envelope::new(id, correlation, payload)];
     }
```

This ensures the payload is only extracted once and avoids cloning the entire envelope twice.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (4)
- src/app/connection.rs (10 hunks)
- src/app/fragment_utils.rs (1 hunks)
- tests/connection_fragmentation.rs (1 hunks)
- tests/fragment_transport.rs (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
src/app/fragment_utils.rs (3)
- src/app/connection.rs (1): `fragment` (65-67)
- src/fragment/payload.rs (1): `encode_fragment_payload` (49-67)
- src/fragment/fragmenter.rs (1): `header` (157-157)

tests/connection_fragmentation.rs (3)
- src/fragment/payload.rs (5): `std` (32-32, 60-60, 82-82, 93-93), `decode_fragment_payload` (79-113)
- src/app/connection.rs (2): `fragment` (65-67), `new` (58-63)
- src/fragment/config.rs (1): `for_frame_budget` (33-51)

tests/fragment_transport.rs (4)
- src/fragment/payload.rs (6): `std` (32-32, 60-60, 82-82, 93-93), `decode_fragment_payload` (79-113), `encode_fragment_payload` (49-67)
- src/app/connection.rs (2): `fragment` (65-67), `new` (58-63)
- src/fragment/config.rs (1): `for_frame_budget` (33-51)
- src/fragment/fragmenter.rs (2): `header` (157-157), `fragments` (190-190)
🔇 Additional comments (1)
src/app/fragment_utils.rs (1)
8-41: LGTM! Past review concern addressed. The "# Errors" section now explicitly mentions both fragmenting and encoding failures, and the implementation correctly preserves packet `id` and `correlation_id` across all emitted fragments.
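As a rough illustration of the invariant that review checks, the sketch below fragments a payload while copying the packet metadata onto every piece. The `Envelope` shape and the `fragment_packet` signature here are simplified stand-ins for the crate's actual types, not its API.

```rust
// Hypothetical sketch: split a payload into fragments, copying the
// packet `id` and `correlation_id` onto every emitted fragment.
#[derive(Debug, Clone, PartialEq)]
struct Envelope {
    id: u32,
    correlation_id: Option<u64>,
    payload: Vec<u8>,
}

/// Split `env.payload` into chunks of at most `max_fragment` bytes,
/// preserving the metadata on each fragment.
fn fragment_packet(env: &Envelope, max_fragment: usize) -> Vec<Envelope> {
    env.payload
        .chunks(max_fragment.max(1))
        .map(|chunk| Envelope {
            id: env.id,
            correlation_id: env.correlation_id,
            payload: chunk.to_vec(),
        })
        .collect()
}

fn main() {
    let env = Envelope { id: 7, correlation_id: Some(42), payload: vec![0u8; 10] };
    let frags = fragment_packet(&env, 4);
    assert_eq!(frags.len(), 3); // 4 + 4 + 2 bytes
    assert!(frags.iter().all(|f| f.id == 7 && f.correlation_id == Some(42)));
}
```

A property worth testing in any real implementation: concatenating the fragment payloads in order must reproduce the original payload exactly.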
Added a new test to verify handling of unfragmented requests and responses, ensuring small payloads pass through the fragmentation layer unaltered. This complements existing tests and improves coverage for fragmentation transport logic. Co-authored-by: terragon-labs[bot] <terragon-labs[bot]@users.noreply.github.com>
…' terms Corrected inconsistent use of hyphenation in the documentation for terms like 're-assembly' and 'reassembly' for consistency and clarity. Co-authored-by: terragon-labs[bot] <terragon-labs[bot]@users.noreply.github.com>
@coderabbitai pause

✅ Actions performed: Reviews paused.
Corrected comma usage and changed "Customise" to American English "Customize" in the fragmentation customization explanation. Co-authored-by: terragon-labs[bot] <terragon-labs[bot]@users.noreply.github.com>
…and fragment rejection behavior - Introduce tests verifying that frames exceeding payload caps are correctly fragmented. - Validate small frames pass through without fragmentation. - Test fragment rejection scenarios via mutated fragments. - Add FragmentRejectionSetup helper struct for test setup. - Minor import order fix in connection.rs. Co-authored-by: terragon-labs[bot] <terragon-labs[bot]@users.noreply.github.com>
Co-authored-by: terragon-labs[bot] <terragon-labs[bot]@users.noreply.github.com>
… for clarity Replaced the previous fragmentation field of type Option<OutboundFragmenter<F>> (an Arc<dyn Fn(F) -> Result<Vec<F>, FragmentationError> + Send + Sync>) with Option<Fragmenter>. This simplifies the fragmentation logic by directly storing a Fragmenter instance. Other cleanup includes: - Removal of OutboundFragmenter type alias. - Adapting ConnectionActor's methods to use Fragmenter directly for fragmentation. - Added Packet trait bounds where necessary for fragmentation. - Improvements in test setups including implementations of Packet for u8 and Vec<u8> for test support. - Introduction of helper spawn_app() in integration tests to avoid duplication. This refactor improves code clarity, reduces indirection, and improves type safety around fragmentation handling. Co-authored-by: terragon-labs[bot] <terragon-labs[bot]@users.noreply.github.com>
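The refactor described above can be sketched as follows. The `Fragmenter` body and the `ConnectionActor` shape are simplified stand-ins, shown only to illustrate the change from a boxed closure to a concrete, directly callable field; they are not the crate's implementation.

```rust
// Before: fragmentation was
//   Option<Arc<dyn Fn(F) -> Result<Vec<F>, FragmentationError> + Send + Sync>>
// After: a plain struct field with no dynamic dispatch.
#[derive(Clone)]
struct Fragmenter {
    max_fragment: usize,
}

impl Fragmenter {
    /// Stand-in fragmentation: chunk the payload at `max_fragment` bytes.
    fn fragment(&self, payload: &[u8]) -> Vec<Vec<u8>> {
        payload.chunks(self.max_fragment.max(1)).map(|c| c.to_vec()).collect()
    }
}

struct ConnectionActor {
    fragmentation: Option<Fragmenter>,
}

impl ConnectionActor {
    /// Fragment when configured; otherwise pass the payload through whole.
    fn send(&self, payload: &[u8]) -> Vec<Vec<u8>> {
        match &self.fragmentation {
            Some(f) => f.fragment(payload),
            None => vec![payload.to_vec()],
        }
    }
}

fn main() {
    let actor = ConnectionActor { fragmentation: Some(Fragmenter { max_fragment: 3 }) };
    assert_eq!(actor.send(b"abcdefg").len(), 3);
    let plain = ConnectionActor { fragmentation: None };
    assert_eq!(plain.send(b"abcdefg").len(), 1);
}
```

Storing the concrete type also lets the compiler inline the fragmentation path, which a `dyn Fn` behind an `Arc` prevents.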
…meterized cases Refactored the fragment rejection tests in `fragment_transport.rs` to use rstest parameterized test with multiple cases for out-of-order, duplicate, and malformed fragment headers. Updated test signatures to accept fragment mutators returning owned Vec<Envelope> and improved code clarity and reuse. Co-authored-by: terragon-labs[bot] <terragon-labs[bot]@users.noreply.github.com>
- Moved FragmentationState struct and related logic from connection.rs to a new module fragmentation_state.rs. - Moved frame decoding, reassembly, and response forwarding helpers to frame_handling.rs. - Updated connection.rs to use these new modules, reducing complexity and improving code organization. - Adjusted imports and usage accordingly, removing now redundant methods and types from connection.rs. Co-authored-by: terragon-labs[bot] <terragon-labs[bot]@users.noreply.github.com>
@coderabbitai review

✅ Actions performed: Review triggered.
Actionable comments posted: 6
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (16)
- `docs/generic-message-fragmentation-and-re-assembly-design.md` (11 hunks)
- `docs/multi-layered-testing-strategy.md` (4 hunks)
- `docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md` (4 hunks)
- `docs/users-guide.md` (2 hunks)
- `docs/wireframe-1-0-detailed-development-roadmap.md` (1 hunks)
- `src/app/connection.rs` (6 hunks)
- `src/app/fragmentation_state.rs` (1 hunks)
- `src/app/frame_handling.rs` (1 hunks)
- `src/app/mod.rs` (1 hunks)
- `src/connection.rs` (8 hunks)
- `src/connection/test_support.rs` (1 hunks)
- `src/fragment/config.rs` (1 hunks)
- `src/fragment/mod.rs` (1 hunks)
- `src/fragment/payload.rs` (1 hunks)
- `tests/connection_fragmentation.rs` (1 hunks)
- `tests/fragment_transport.rs` (1 hunks)
🧰 Additional context used
📓 Path-based instructions (5)
**/*.rs
📄 CodeRabbit inference engine (AGENTS.md)
`**/*.rs`: Run `make check-fmt`, `make lint`, and `make test` before committing. `make check-fmt` executes `cargo fmt --workspace -- --check`. `make lint` executes `cargo clippy --workspace --all-targets --all-features -- -D warnings`. `make test` executes `cargo test --workspace`.
Clippy warnings MUST be disallowed in Rust code.
Fix any warnings emitted during Rust tests in the code itself rather than silencing them.
Extract meaningfully named helper functions adhering to separation of concerns and CQRS when a Rust function is too long.
Group related parameters in meaningfully named structs when a Rust function has too many parameters.
Consider using `Arc` to reduce the amount of data returned when a Rust function is returning a large error.
Write unit and behavioural tests for new Rust functionality. Run both before and after making any change.
Every Rust module must begin with a module level (`//!`) comment explaining the module's purpose and utility.
Document public APIs in Rust using Rustdoc comments (`///`) so documentation can be generated with `cargo doc`.
Prefer immutable data in Rust and avoid unnecessary `mut` bindings.
Handle errors with the `Result` type in Rust instead of panicking where feasible.
Avoid `unsafe` code in Rust unless absolutely necessary and document any usage clearly.
Place function attributes after doc comments in Rust.
Do not use `return` in single-line Rust functions.
Use predicate functions for Rust conditional criteria with more than two branches.
Lints must not be silenced in Rust except as a last resort. Lint rule suppressions must be tightly scoped and include a clear reason.
Prefer `expect` over `allow` in Rust.
Use `rstest` fixtures for shared test setup in Rust.
Replace duplicated Rust tests with `#[rstest(...)]` parameterised cases.
Prefer `mockall` for mocks/stubs in Rust.
Prefer `.expect()` over `.unwrap()` in Rust.
Use `concat!()` to combine long string literals in Rust rather than escaping newlines with a backslash...
Files:
- `src/app/mod.rs`
- `tests/fragment_transport.rs`
- `tests/connection_fragmentation.rs`
- `src/fragment/config.rs`
- `src/app/connection.rs`
- `src/fragment/mod.rs`
- `src/app/frame_handling.rs`
- `src/fragment/payload.rs`
- `src/app/fragmentation_state.rs`
- `src/connection.rs`
- `src/connection/test_support.rs`
⚙️ CodeRabbit configuration file
**/*.rs: * Seek to keep the cognitive complexity of functions no more than 9.
- Adhere to single responsibility and CQRS
- Place function attributes after doc comments.
- Do not use `return` in single-line functions.
- Move conditionals with >2 branches into a predicate function.
- Avoid `unsafe` unless absolutely necessary.
- Every module must begin with a `//!` doc comment that explains the module's purpose and utility.
- Comments and docs must follow en-GB-oxendict (-ize / -yse / -our) spelling and grammar.
- Lints must not be silenced except as a last resort. `#[allow]` is forbidden.
- Only narrowly scoped `#[expect(lint, reason = "...")]` is allowed.
- No lint groups, no blanket or file-wide suppression.
- Include `FIXME:` with link if a fix is expected.
- Where code is only used by specific features, it must be conditionally compiled or a conditional expectation for unused_code applied.
- Use `rstest` fixtures for shared setup and to avoid repetition between tests.
- Replace duplicated tests with `#[rstest(...)]` parameterised cases.
- Prefer `mockall` for mocks/stubs.
- Prefer `.expect()` over `.unwrap()` in tests. `.expect()` and `.unwrap()` are forbidden outside of tests. Errors must be propagated.
- Ensure that any API or behavioural changes are reflected in the documentation in `docs/`.
- Ensure that any completed roadmap steps are recorded in the appropriate roadmap in `docs/`.
- Files must not exceed 400 lines in length.
- Large modules must be decomposed.
- Long match statements or dispatch tables should be decomposed by domain and collocated with targets.
- Large blocks of inline data (e.g., test fixtures, constants, or templates) must be moved to external files and inlined at compile-time or loaded at run-time.
- Environment access (`env::set_var` and `env::remove_var`) is always unsafe in Rust 2024 and MUST be marked as such.
- For testing of functionality depending upon environment variables, dependency injection and...
Files:
- `src/app/mod.rs`
- `tests/fragment_transport.rs`
- `tests/connection_fragmentation.rs`
- `src/fragment/config.rs`
- `src/app/connection.rs`
- `src/fragment/mod.rs`
- `src/app/frame_handling.rs`
- `src/fragment/payload.rs`
- `src/app/fragmentation_state.rs`
- `src/connection.rs`
- `src/connection/test_support.rs`
docs/**/*.md
📄 CodeRabbit inference engine (AGENTS.md)
Documentation must use en-GB-oxendict ("-ize" / "-yse" / "-our") spelling and grammar, with the exception of the naming of the "LICENSE" file.
Follow the documentation style guide conventions when writing project documentation
docs/**/*.md: Use British English based on the Oxford English Dictionary (en-GB-oxendict), including: -ize suffix in words like 'realize' and 'organization', -lyse suffix in words like 'analyse', -our suffix in words like 'colour', -re suffix in words like 'centre', double 'l' in words like 'cancelled', maintain 'e' in words like 'likeable', -ogue suffix in words like 'catalogue'
The word 'outwith' is acceptable in British English documentation
Keep US spelling when used in an API (for example, 'color')
Use the Oxford comma in documentation: 'ships, planes, and hovercraft' where it aids comprehension
Treat company names as collective nouns: 'Lille Industries are expanding'
Write headings in sentence case in documentation
Use Markdown headings (#, ##, ###, and so on) in order without skipping levels
Follow markdownlint recommendations for Markdown files
Always provide a language identifier for fenced code blocks in documentation; use 'plaintext' for non-code text
Use '-' as the first level bullet and renumber lists when items change in documentation
Prefer inline links using 'text' or angle brackets around the URL in documentation
Ensure blank lines before and after bulleted lists and fenced blocks in documentation
Ensure tables have a delimiter line below the header row in documentation
Expand any uncommon acronym on first use in documentation (for example, Continuous Integration (CI))
Wrap paragraphs at 80 columns in documentation
Wrap code at 120 columns in documentation
Do not wrap tables in documentation
Use footnotes referenced with '[^label]' in documentation
Include Mermaid diagrams where they add clarity to documentation
When embedding figures in documentation, use '' and provide brief alt text desc...
Files:
docs/multi-layered-testing-strategy.mddocs/generic-message-fragmentation-and-re-assembly-design.mddocs/users-guide.mddocs/wireframe-1-0-detailed-development-roadmap.mddocs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md
**/*.md
📄 CodeRabbit inference engine (AGENTS.md)
`**/*.md`: Validate Markdown files using `make markdownlint`.
Run `make fmt` after any documentation changes to format all Markdown files and fix table markup.
Validate Mermaid diagrams in Markdown files by running `make nixie`.
Markdown paragraphs and bullet points must be wrapped at 80 columns.
Code blocks in Markdown must be wrapped at 120 columns.
Tables and headings in Markdown must not be wrapped.
Use dashes (-) for list bullets in Markdown.
Use GitHub-flavoured Markdown footnotes ([^1]) for references and footnotes.
Files:
docs/multi-layered-testing-strategy.mddocs/generic-message-fragmentation-and-re-assembly-design.mddocs/users-guide.mddocs/wireframe-1-0-detailed-development-roadmap.mddocs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md
⚙️ CodeRabbit configuration file
**/*.md: * Avoid 2nd person or 1st person pronouns ("I", "you", "we")
- Use en-GB-oxendict (-ize / -yse / -our) spelling and grammar
- Headings must not be wrapped.
- Documents must start with a level 1 heading
- Headings must correctly increase or decrease by no more than one level at a time
- Use GitHub-flavoured Markdown style for footnotes and endnotes.
- Numbered footnotes must be numbered by order of appearance in the document.
Files:
docs/multi-layered-testing-strategy.mddocs/generic-message-fragmentation-and-re-assembly-design.mddocs/users-guide.mddocs/wireframe-1-0-detailed-development-roadmap.mddocs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md
docs/**/*.{rs,md}
📄 CodeRabbit inference engine (docs/rust-doctest-dry-guide.md)
Every doctest should validate the public API of a crate from the perspective of an external user, treating each documentation test as a separate temporary crate that imports the library as an external dependency
Files:
docs/multi-layered-testing-strategy.mddocs/generic-message-fragmentation-and-re-assembly-design.mddocs/users-guide.mddocs/wireframe-1-0-detailed-development-roadmap.mddocs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md
docs/{**/*.md,**/*.rs}
📄 CodeRabbit inference engine (docs/wireframe-1-0-detailed-development-roadmap.md)
Remove the proposed `FrameSink` from the design and document `async-stream` as the canonical way to create streams imperatively for `Response::Stream` handling
Files:
docs/multi-layered-testing-strategy.mddocs/generic-message-fragmentation-and-re-assembly-design.mddocs/users-guide.mddocs/wireframe-1-0-detailed-development-roadmap.mddocs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md
🧠 Learnings (2)
📚 Learning: 2025-11-26T19:40:40.601Z
Learnt from: CR
Repo: leynos/wireframe PR: 0
File: docs/wireframe-1-0-detailed-development-roadmap.md:0-0
Timestamp: 2025-11-26T19:40:40.601Z
Learning: Applies to docs/**/fragment/adapter.rs : Enhance the `FragmentAdapter`'s inbound logic to support concurrent re-assembly of multiple messages using `msg_id` from `FragmentMeta` as a key into a `dashmap::DashMap` of partial messages
Applied to files:
docs/generic-message-fragmentation-and-re-assembly-design.mddocs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md
📚 Learning: 2025-11-26T19:39:24.515Z
Learnt from: CR
Repo: leynos/wireframe PR: 0
File: docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md:0-0
Timestamp: 2025-11-26T19:39:24.515Z
Learning: Applies to docs/**/{tests,test}/**/*.rs : Use `proptest` for stateful property testing to validate complex protocol conversations like fragmentation and re-assembly, generating thousands of random-but-valid operation sequences
Applied to files:
docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md
🧬 Code graph analysis (7)
tests/fragment_transport.rs (4)

- src/fragment/payload.rs (7): `std` (33-33), `std` (61-61), `std` (83-83), `std` (94-94), `std` (147-147), `decode_fragment_payload` (80-114), `encode_fragment_payload` (50-68)
- src/fragment/config.rs (1): `for_frame_budget` (35-53)
- tests/multi_packet_streaming.rs (1): `parts` (91-91)
- src/fragment/fragmenter.rs (2): `header` (157-157), `fragments` (190-190)

tests/connection_fragmentation.rs (4)

- src/fragment/payload.rs (6): `std` (33-33), `std` (61-61), `std` (83-83), `std` (94-94), `std` (147-147), `decode_fragment_payload` (80-114)
- src/app/fragmentation_state.rs (1): `fragment` (38-40)
- src/fragment/config.rs (1): `for_frame_budget` (35-53)
- src/fragment/fragmenter.rs (1): `header` (157-157)

src/fragment/config.rs (1)

- src/fragment/payload.rs (6): `std` (33-33), `std` (61-61), `std` (83-83), `std` (94-94), `std` (147-147), `fragment_overhead` (25-35)

src/fragment/mod.rs (1)

- src/fragment/payload.rs (3): `decode_fragment_payload` (80-114), `encode_fragment_payload` (50-68), `fragment_overhead` (25-35)

src/fragment/payload.rs (2)

- src/fragment/fragmenter.rs (1): `header` (157-157)
- src/fragment/index.rs (1): `zero` (34-34)

src/connection.rs (4)

- src/app/fragment_utils.rs (1): `fragment_packet` (16-41)
- src/app/fragmentation_state.rs (1): `fragment` (38-40)
- src/correlation.rs (3): `correlation_id` (11-11), `correlation_id` (18-18), `correlation_id` (24-24)
- src/metrics.rs (2): `inc_handler_errors` (86-86), `inc_handler_errors` (89-89)

src/connection/test_support.rs (1)

- src/app/envelope.rs (11): `id` (49-49), `id` (94-94), `id` (124-124), `correlation_id` (52-52), `correlation_id` (97-97), `correlation_id` (105-105), `correlation_id` (127-127), `into_parts` (55-55), `into_parts` (99-99), `from_parts` (58-58), `from_parts` (101-101)
⏰ Context from checks skipped due to timeout of 120000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Sourcery review
🔇 Additional comments (46)
docs/wireframe-1-0-detailed-development-roadmap.md (1)
53-60: LGTM! The terminology normalisation ("reassembly" over "re-assembly") and the "(Done)" status marker align with the broader PR changes and accurately reflect the completed implementation.
docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md (1)
174-175: LGTM! Terminology normalisation ("reassembly" over "re-assembly") is consistent across the document and aligns with the broader PR changes.
src/fragment/mod.rs (2)
1-17: LGTM! The module structure is well-organised with clear public/private boundaries. The doc comment adequately explains the module's purpose.
18-34: LGTM! The public re-exports provide a clean, cohesive API surface for fragmentation functionality. The grouping of payload utilities is logical.
docs/multi-layered-testing-strategy.md (2)
97-129: LGTM! Terminology normalisation is consistent with other documentation in this PR.
502-504: LGTM! The revised throughput target (2 GiB/s) is more realistic for shared CI runners, and the guidance note ensures this target is revisited when appropriate.
src/app/mod.rs (1)
15-17: LGTM! The visibility choices are appropriate: `fragment_utils` is public for external use, whilst `fragmentation_state` and `frame_handling` remain private implementation details.
src/connection/test_support.rs (2)
17-20: LGTM! The additional imports support the new `Packet` trait implementations below.
22-42: LGTM! These minimal `Packet` implementations for primitive types are appropriate for test harness purposes. The single-line function style aligns with coding guidelines.
tests/connection_fragmentation.rs (2)
1-17: LGTM! Module documentation is present and the `ROUTE_ID` constant is correctly placed at the top. Past review concerns have been addressed.
19-67: LGTM! The test correctly derives payload size from the configured fragment cap and thoroughly verifies reassembly. The test logic is sound.
src/fragment/payload.rs (4)
1-16: LGTM! The module documentation clearly explains the fragment payload format and purpose. The `FRAGMENT_MAGIC` constant is appropriately defined.
18-35: LGTM! The function correctly computes fragment overhead with appropriate documentation of invariants and panic conditions. Past review concerns about encoding assumptions have been addressed.
37-68: LGTM! The encoding function is well-implemented with pre-allocated capacity and appropriate error handling. The panic condition is documented and guarded by `try_into().expect()`.
70-196: LGTM! The decode function handles all error cases robustly, and the test suite comprehensively covers round-trip encoding, non-fragment detection, and both error branches (`UnexpectedEnd` and length mismatch). Past review concerns about error branch coverage have been addressed.
tests/fragment_transport.rs (5)
1-3: LGTM! The module-level doc comment is present and correctly explains the module's purpose, satisfying the coding guidelines requirement.
31-138: LGTM! The helper functions are well-structured, follow separation of concerns, and the `make_handler` extraction addresses previous duplication concerns. The `fragment_envelope` function now avoids redundant cloning as previously requested.
140-194: LGTM! Both round-trip tests comprehensively verify the fragmentation and pass-through paths. The `unfragmented_request_and_response_round_trip` test addresses the previous review suggestion by exercising the non-fragmenting branch when fragmentation is enabled but the payload is below the cap.
196-308: LGTM! The `FragmentRejectionSetup` struct and `test_fragment_rejection` helper consolidate the shared scaffolding as requested in the PR objectives. The `rstest` parameterised approach correctly replaces duplicated tests per coding guidelines.
310-378: LGTM! The `expired_fragments_are_evicted` test verifies timeout-based eviction, and `fragmentation_can_be_disabled_via_public_api` exercises the opt-out path as requested in previous reviews.
src/connection.rs (4)
12-16: LGTM! The import reorganisation is clean and the `Arc` import is correctly placed within the `sync` block.
315-326: LGTM! The `enable_fragmentation` method correctly stores a concrete `Fragmenter` wrapped in `Arc`, addressing previous review feedback about removing dynamic dispatch. The `Packet` trait bound ensures the frame type supports fragmentation operations.
645-674: LGTM! The fragmentation logic correctly delegates to `fragment_packet` from `fragment_utils`, addressing the DRY concerns from previous reviews. The `push_frame` helper cleanly encapsulates hook application and metric recording.
397-420: Based on my verification, I have all the necessary information. The review comment correctly identifies a real concern. Key findings:

- `F` is bound to `FrameLike + CorrelatableFrame + Packet` (line 242 in src/connection.rs)
- Both `CorrelatableFrame` (src/correlation.rs) and `Packet` (src/app/envelope.rs) define `correlation_id(&self) -> Option<u64>`
- The code correctly uses explicit qualification: `CorrelatableFrame::correlation_id(frame)`
- This explicit qualification is necessary to avoid ambiguity, not merely stylistic

The explicit qualification already present in the code properly disambiguates between the two trait methods. The code will not compile if ambiguity arises as traits evolve, since Rust will error on method call ambiguity. No changes are needed.

Explicit trait qualification already prevents ambiguity. Both `CorrelatableFrame` and `Packet` traits define `correlation_id(&self) -> Option<u64>`, and `F` is bound to both (line 242). The calls on lines 402–406 and 412–414 correctly use `CorrelatableFrame::correlation_id(frame)` to disambiguate. This qualification is necessary and will catch any future trait evolution issues at compile time.
147-163: LGTM! Section 3.4 clearly documents the on-wire fragment encoding format, aligning with the implementation in `fragment::payload`. The defaults (16× cap, 30 s timeout) match the code.
13-18: LGTM! The terminology updates correctly standardise on "reassembly" and "reassembled" throughout the introduction, aligning with en-GB-oxendict conventions.
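The "FRAG"-prefixed on-wire layout referenced above can be sketched as below. The magic marker comes from the design, but the specific header fields (message id, fragment index, last-fragment flag) and their widths are illustrative assumptions, not the crate's actual wire format.

```rust
// Illustrative on-wire fragment layout: magic marker, assumed header
// fields in big-endian, then the fragment body.
const FRAGMENT_MAGIC: &[u8; 4] = b"FRAG";

fn encode_fragment_payload(msg_id: u32, index: u32, is_last: bool, body: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(4 + 4 + 4 + 1 + body.len());
    out.extend_from_slice(FRAGMENT_MAGIC);
    out.extend_from_slice(&msg_id.to_be_bytes());
    out.extend_from_slice(&index.to_be_bytes());
    out.push(u8::from(is_last));
    out.extend_from_slice(body);
    out
}

/// Returns `None` when the payload is not a fragment (no magic prefix)
/// or the header is truncated.
fn decode_fragment_payload(buf: &[u8]) -> Option<(u32, u32, bool, &[u8])> {
    let rest = buf.strip_prefix(FRAGMENT_MAGIC.as_slice())?;
    if rest.len() < 9 {
        return None;
    }
    let msg_id = u32::from_be_bytes(rest[0..4].try_into().ok()?);
    let index = u32::from_be_bytes(rest[4..8].try_into().ok()?);
    let is_last = rest[8] != 0;
    Some((msg_id, index, is_last, &rest[9..]))
}

fn main() {
    let wire = encode_fragment_payload(1, 0, true, b"hello");
    let (id, idx, last, body) = decode_fragment_payload(&wire).expect("valid fragment");
    assert_eq!((id, idx, last, body), (1, 0, true, b"hello".as_slice()));
    // A payload without the magic prefix passes through as a non-fragment.
    assert!(decode_fragment_payload(b"plain payload").is_none());
}
```

The magic-prefix check is what lets small, unfragmented payloads pass through the layer untouched, as the tests above verify.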
src/fragment/config.rs (2)
1-6: LGTM! The module-level documentation correctly uses "reassembly" spelling, and the imports are appropriate.
21-59: LGTM! The `ENVELOPE_GUARD_BYTES` constant is now documented as requested in previous reviews. The `for_frame_budget` logic correctly handles edge cases where the budget is insufficient, and `encoded_fragment_ceiling` provides a useful accessor.
src/app/connection.rs (5)
14-27: LGTM! The imports are correctly organised, bringing in the fragmentation-related types and utilities needed for the updated connection handling.
53-56: LGTM! The `fragmentation_config` method correctly falls back to `default_fragmentation` when no explicit configuration is set, ensuring fragmentation is enabled by default when the buffer capacity supports it.
174-213: LGTM! The `process_stream` method correctly initialises fragmentation state from the configuration and purges expired assemblies on read timeout, providing DoS protection against abandoned partial messages.
215-251: LGTM! The `handle_frame` method cleanly delegates to `frame_handling::reassemble_if_needed` and `frame_handling::forward_response`, addressing previous review feedback about centralising fragmentation logic. The deserialization failure threshold is correctly passed through.
253-294: LGTM! The `decode_envelope` extraction cleanly separates envelope parsing from reassembly, with appropriate error handling and deserialization failure tracking. The threshold check prevents unbounded failures before disconnection.
src/app/fragmentation_state.rs (4)
1-4: Module documentation is present and clear. The module-level doc comment explains the purpose well and references the consuming types.
30-36: Constructor correctly initialises both components from config. The delegation to `Fragmenter::new` and `Reassembler::new` is clean and keeps configuration centralised.
42-69: Reassembly logic is sound. The three-way dispatch correctly handles: complete reassembly (returns `Some`), partial fragment (returns `None`), and non-fragment payloads (passes through unchanged). Error propagation is explicit.
71-71: Single-line delegation is idiomatic.
src/app/frame_handling.rs (4)
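A minimal sketch of that three-way dispatch (complete returns `Some`, partial returns `None`, non-fragment passes through). The `Frame` and `Reassembler` types here are hypothetical stand-ins for illustration, not the crate's API, and the real implementation additionally tracks timeouts and fragment ordering.

```rust
use std::collections::HashMap;

// Stand-in frame type: either a fragment of a keyed message or a whole payload.
enum Frame {
    Fragment { msg_id: u32, is_last: bool, body: Vec<u8> },
    Whole(Vec<u8>),
}

#[derive(Default)]
struct Reassembler {
    partial: HashMap<u32, Vec<u8>>,
}

impl Reassembler {
    /// `Some(bytes)` when a message is complete or was never fragmented;
    /// `None` while more fragments are still expected.
    fn reassemble(&mut self, frame: Frame) -> Option<Vec<u8>> {
        match frame {
            Frame::Whole(bytes) => Some(bytes),
            Frame::Fragment { msg_id, is_last, body } => {
                let buf = self.partial.entry(msg_id).or_default();
                buf.extend_from_slice(&body);
                if is_last {
                    self.partial.remove(&msg_id) // complete: hand back the whole message
                } else {
                    None // still waiting for more fragments
                }
            }
        }
    }
}

fn main() {
    let mut r = Reassembler::default();
    assert_eq!(r.reassemble(Frame::Whole(b"hi".to_vec())), Some(b"hi".to_vec()));
    assert_eq!(
        r.reassemble(Frame::Fragment { msg_id: 1, is_last: false, body: b"he".to_vec() }),
        None
    );
    assert_eq!(
        r.reassemble(Frame::Fragment { msg_id: 1, is_last: true, body: b"llo".to_vec() }),
        Some(b"hello".to_vec())
    );
}
```

Keying partial buffers by message id is what allows concurrent reassembly of interleaved messages on one connection.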
1-3: Module documentation is present.
92-100: Verify that silently dropping fragmentation failures is intentional. When `state.fragment()` fails, the function logs and returns `Ok(())` without notifying the caller. The response is lost. Confirm this is the desired behaviour; if not, propagate an error or emit a specific metric for dropped responses.
106-127: Partial fragment delivery may leave peer reassembly pending until timeout. If serialization or send fails mid-sequence, remaining fragments are dropped. The peer's `Reassembler` will hold partial state until `reassembly_timeout` expires. This is likely acceptable (timeouts exist for this reason), but document this behaviour if not already covered.
70-81: Function signature and bounds are appropriate. The generic constraints correctly require `Serializer + Send + Sync`, `Packet`, and async I/O traits.
147-149: Spelling and grammar are correct. The text uses "reassembled" consistently, aligning with the resolved past review comment.
311-316: Introduction accurately describes the automatic fragmentation behaviour.
318-320: Guards description is accurate and uses correct Oxford spelling.
340-344: Guidance for disabling fragmentation is clear and correctly punctuated.
329-333: Use `.expect()` instead of `.unwrap()` in documentation examples. Per coding guidelines, prefer `.expect()` over `.unwrap()`. Documentation examples serve as templates; using `.expect()` reinforces good practice.

```diff
 let cfg = FragmentationConfig::for_frame_budget(
     1024,
-    NonZeroUsize::new(16 * 1024).unwrap(),
+    NonZeroUsize::new(16 * 1024).expect("16 KiB is non-zero"),
     Duration::from_secs(30),
 ).expect("frame budget too small for fragments");
```

Likely an incorrect or invalid review comment.
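The budget arithmetic behind `for_frame_budget` can be illustrated as below: the usable fragment payload is the frame budget minus the per-fragment header overhead, and a budget too small to carry any payload is rejected. The 13-byte overhead figure and the helper name are assumptions for the sketch, not values taken from the crate.

```rust
use std::num::NonZeroUsize;

// Assumed per-fragment overhead: 4-byte "FRAG" magic plus a 9-byte header.
const FRAGMENT_OVERHEAD: usize = 13;

/// `None` signals that the frame budget is too small to carry any
/// fragment payload at all, mirroring `for_frame_budget`'s error case.
fn fragment_cap_for_budget(frame_budget: usize) -> Option<NonZeroUsize> {
    frame_budget
        .checked_sub(FRAGMENT_OVERHEAD)
        .and_then(NonZeroUsize::new)
}

fn main() {
    let cap = fragment_cap_for_budget(1024).expect("budget large enough");
    assert_eq!(cap.get(), 1024 - 13);
    // A budget exactly consumed by the header leaves no room for payload.
    assert!(fragment_cap_for_budget(13).is_none());
    assert!(fragment_cap_for_budget(8).is_none());
}
```

Returning `Option<NonZeroUsize>` makes the "budget too small" case unrepresentable as a zero cap, which is why the documented example ends with `.expect("frame budget too small for fragments")`.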
Refactored error handling in `reassemble_if_needed` to use a shared `handle_fragment_error` function for decoding and reassembly failures, reducing code duplication and improving clarity. Added `thiserror`-based error messages in `FragmentProcessError` to support better error descriptions and logging consistency. Test code was updated to factor out common setup for fragmentation in `connection_fragmentation.rs`, enhancing test maintainability. Co-authored-by: terragon-labs[bot] <terragon-labs[bot]@users.noreply.github.com>
@coderabbitai review

✅ Actions performed: Review triggered.
Summary
Changes
Why
How to test
API usage notes
Migration & compatibility
Performance considerations
🌿 Generated by Terry
ℹ️ Tag @terragon-labs to ask questions and address PR feedback
📎 Task: https://www.terragonlabs.com/task/aff0ff40-67c7-46c3-a989-0fc7f61e7719
Summary by Sourcery
Add transport-level fragmentation and reassembly so large messages are split into framed fragments on write and transparently reassembled on read, with sensible defaults and configuration hooks.
New Features:
Enhancements:
Documentation:
Tests: