Implement transport-level fragmentation and inbound reassembly#397
…e fragments Introduces a new Reassembler that mirrors the Fragmenter by collecting and reassembling inbound message fragments keyed by MessageId. It enforces fragment ordering, limits maximum assembled message size, and supports caller-driven timeout-based purging of stale partial messages to prevent resource exhaustion. Enhancements include comprehensive unit and integration tests, updates to documentation and usage guides, and behavioural test steps verifying reassembly correctness, capacity enforcement, and eviction of expired buffers. Co-authored-by: terragon-labs[bot] <terragon-labs[bot]@users.noreply.github.com>
Reviewer's Guide
Adds a transport-agnostic inbound Reassembler with size/time caps, integrates it into the fragment module’s public API, and wires comprehensive unit and Cucumber tests plus docs for reassembly behavior.
Sequence diagram for Reassembler push_at fragment processing
sequenceDiagram
actor Client
participant Reassembler
participant Buffers as Buffers_HashMap
participant Partial as PartialMessage
participant Series as FragmentSeries
Client->>Reassembler: push_at(header, payload, now)
Reassembler->>Reassembler: purge_expired_at(now)
Reassembler->>Buffers: entry(header.message_id())
alt Existing_partial_message
Buffers-->>Reassembler: Occupied(entry)
Reassembler->>Series: accept(header)
alt Ordering_ok_incomplete
Series-->>Reassembler: FragmentStatus::Incomplete
Reassembler->>Partial: len()
Reassembler->>Reassembler: assert_within_limit(max_message_size, message_id, attempted)
alt Within_size_cap
Reassembler->>Partial: push(payload)
Reassembler-->>Client: Ok(None)
else Exceeds_size_cap
Reassembler-->>Client: Err(ReassemblyError::MessageTooLarge)
end
else Ordering_ok_complete
Series-->>Reassembler: FragmentStatus::Complete
Reassembler->>Partial: len()
Reassembler->>Reassembler: assert_within_limit(max_message_size, message_id, attempted)
alt Within_size_cap
Reassembler->>Partial: push(payload)
Reassembler->>Partial: into_buffer()
Reassembler->>Reassembler: ReassembledMessage::new(message_id, buffer)
Reassembler-->>Client: Ok(Some(ReassembledMessage))
else Exceeds_size_cap
Reassembler-->>Client: Err(ReassemblyError::MessageTooLarge)
end
else Ordering_error
Series-->>Reassembler: Err(FragmentError)
Reassembler-->>Client: Err(ReassemblyError::Fragment)
end
else First_fragment_for_message
Buffers-->>Reassembler: Vacant(entry)
Reassembler->>Series: FragmentSeries::new(message_id)
Reassembler->>Series: accept(header)
alt Ordering_ok_incomplete
Series-->>Reassembler: FragmentStatus::Incomplete
Reassembler->>Reassembler: assert_within_limit(max_message_size, message_id, payload.len())
Reassembler->>Buffers: insert(message_id, PartialMessage::new(series, payload, now))
Reassembler-->>Client: Ok(None)
else Ordering_ok_complete
Series-->>Reassembler: FragmentStatus::Complete
Reassembler->>Reassembler: assert_within_limit(max_message_size, message_id, payload.len())
Reassembler->>Reassembler: ReassembledMessage::new(message_id, payload.to_vec())
Reassembler-->>Client: Ok(Some(ReassembledMessage))
else Ordering_error
Series-->>Reassembler: Err(FragmentError)
Reassembler-->>Client: Err(ReassemblyError::Fragment)
end
end
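The push_at flow in the diagram above can be sketched roughly as follows. This is a simplified illustration, not the PR's implementation: `Header`, `Partial`, and the `OutOfOrder` variant are hypothetical stand-ins for `FragmentHeader`, `PartialMessage`, and `ReassemblyError::Fragment(FragmentError)`, message ids are plain integers, and the sketch assumes the partial buffer is dropped on either error (the PR's test names only suggest this for the out-of-order case).

```rust
use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for the crate's `FragmentHeader`.
#[derive(Clone, Copy, Debug)]
pub struct Header {
    pub message_id: u64,
    pub index: u32,
    pub is_last: bool,
}

#[derive(Debug, PartialEq, Eq)]
pub enum ReassemblyError {
    /// Stand-in for `ReassemblyError::Fragment(FragmentError)`.
    OutOfOrder { expected: u32, found: u32 },
    MessageTooLarge { message_id: u64, attempted: usize, limit: NonZeroUsize },
}

/// Stand-in for `PartialMessage`: ordering state, bytes so far, start time.
struct Partial {
    next_index: u32,
    buffer: Vec<u8>,
    started_at: Instant,
}

pub struct Reassembler {
    max_message_size: NonZeroUsize,
    timeout: Duration,
    buffers: HashMap<u64, Partial>,
}

impl Reassembler {
    pub fn new(max_message_size: NonZeroUsize, timeout: Duration) -> Self {
        Self { max_message_size, timeout, buffers: HashMap::new() }
    }

    pub fn buffered_len(&self) -> usize {
        self.buffers.len()
    }

    pub fn push_at(
        &mut self,
        header: Header,
        payload: &[u8],
        now: Instant,
    ) -> Result<Option<Vec<u8>>, ReassemblyError> {
        // Step 1: evict partial messages older than the timeout.
        let timeout = self.timeout;
        self.buffers
            .retain(|_, p| now.saturating_duration_since(p.started_at) <= timeout);

        // Step 2: take the existing partial, or start a fresh one.
        let mut partial = self.buffers.remove(&header.message_id).unwrap_or(Partial {
            next_index: 0,
            buffer: Vec::new(),
            started_at: now,
        });

        // Step 3: enforce ordering. Returning here drops the partial.
        if header.index != partial.next_index {
            return Err(ReassemblyError::OutOfOrder {
                expected: partial.next_index,
                found: header.index,
            });
        }

        // Step 4: enforce the size cap before copying any bytes.
        let attempted = partial.buffer.len() + payload.len();
        if attempted > self.max_message_size.get() {
            return Err(ReassemblyError::MessageTooLarge {
                message_id: header.message_id,
                attempted,
                limit: self.max_message_size,
            });
        }

        // Step 5: append, then either complete or keep buffering.
        partial.buffer.extend_from_slice(payload);
        if header.is_last {
            return Ok(Some(partial.buffer));
        }
        partial.next_index += 1;
        self.buffers.insert(header.message_id, partial);
        Ok(None)
    }
}
```

Passing `now` explicitly, as `push_at` does in the diagram, lets tests drive the clock without sleeping.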
Class diagram for inbound Reassembler and related types
classDiagram
class Reassembler {
+NonZeroUsize max_message_size
+Duration timeout
+HashMap~MessageId, PartialMessage~ buffers
+Reassembler new(max_message_size: NonZeroUsize, timeout: Duration) Reassembler
+push(header: FragmentHeader, payload: impl AsRef~[u8]~) Result~Option~ReassembledMessage~, ReassemblyError~
+push_at(header: FragmentHeader, payload: impl AsRef~[u8]~, now: Instant) Result~Option~ReassembledMessage~, ReassemblyError~
+purge_expired() Vec~MessageId~
+purge_expired_at(now: Instant) Vec~MessageId~
+buffered_len() usize
-assert_within_limit(limit: NonZeroUsize, message_id: MessageId, attempted: usize) Result~(), ReassemblyError~
}
class PartialMessage {
-FragmentSeries series
-Vec~u8~ buffer
-Instant started_at
+new(series: FragmentSeries, payload: &[u8], started_at: Instant) PartialMessage
+push(payload: &[u8]) void
+len() usize
+started_at() Instant
+into_buffer() Vec~u8~
}
class ReassembledMessage {
+MessageId message_id
+Vec~u8~ payload
+new(message_id: MessageId, payload: Vec~u8~) ReassembledMessage
+message_id() MessageId
+payload() &[u8]
+into_payload() Vec~u8~
+decode~M: Message~() Result~M, DecodeError~
}
class ReassemblyError {
<<enum>>
+Fragment(FragmentError)
+MessageTooLarge
+message_id: MessageId
+attempted: usize
+limit: NonZeroUsize
}
class FragmentSeries {
+new(message_id: MessageId) FragmentSeries
+accept(header: FragmentHeader) Result~FragmentStatus, FragmentError~
}
class FragmentHeader {
+message_id() MessageId
}
class FragmentStatus {
<<enum>>
+Incomplete
+Complete
}
class FragmentError {
}
class MessageId {
}
class Message {
<<trait>>
+from_bytes(bytes: &[u8]) Result~Self, DecodeError~
}
class DecodeError {
}
Reassembler o--> PartialMessage : buffers
Reassembler --> ReassemblyError
Reassembler --> FragmentHeader
Reassembler --> FragmentSeries
Reassembler --> FragmentStatus
Reassembler --> MessageId
Reassembler --> ReassembledMessage
PartialMessage --> FragmentSeries
ReassembledMessage --> Message
ReassembledMessage --> MessageId
ReassembledMessage --> DecodeError
ReassemblyError --> FragmentError
ReassemblyError --> MessageId
FragmentSeries --> FragmentHeader
FragmentSeries --> FragmentStatus
FragmentHeader --> MessageId
Note: Other AI code review bot(s) detected
CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.
Summary by CodeRabbit
Release Notes
Walkthrough
Adds a transport-agnostic inbound Reassembler that buffers fragments per MessageId, enforces ordering and a maximum assembled size, exposes timeout-based purging, returns a ReassembledMessage on completion, and includes error handling and tests.
Sequence Diagram(s)
sequenceDiagram
actor Caller
participant Reassembler
participant Partial as PartialMessage (buffer)
participant Series as FragmentSeries
participant Output as ReassembledMessage
Caller->>Reassembler: push(header: Fragment idx=0..N, payload)
activate Reassembler
alt New message
Reassembler->>Series: create series for message_id
Reassembler->>Partial: insert new PartialMessage(start_time, buffer)
Reassembler-->>Caller: Ok(None)
else Existing partial
Reassembler->>Partial: verify next index == expected
alt Index mismatch
Reassembler-->>Caller: Err(ReassemblyError::Fragment(...))
else Within limit
Reassembler->>Partial: append payload
alt Final fragment received
Partial->>Output: build ReassembledMessage(message_id, buffer)
Reassembler->>Reassembler: remove partial
Reassembler-->>Caller: Ok(Some(Output))
else Not final
Reassembler-->>Caller: Ok(None)
end
end
end
deactivate Reassembler
sequenceDiagram
actor Caller
participant Reassembler
participant Clock as Now / Timeout
participant Partial as PartialMessage
Caller->>Reassembler: purge_expired()
activate Reassembler
loop for each buffered partial
Reassembler->>Clock: compute elapsed = now - partial.start
alt elapsed > timeout
Reassembler->>Partial: drop partial
Reassembler-->>Caller: record evicted id
else
Reassembler-->>Caller: keep partial
end
end
Reassembler-->>Caller: return Vec<MessageId> evicted
deactivate Reassembler
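The purge loop in the diagram above amounts to a filtered retain over the buffered partials. The sketch below is an assumption-laden illustration: it tracks only a start `Instant` per hypothetical numeric message id rather than the PR's `PartialMessage`, and it uses the diagram's `elapsed > timeout` comparison.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Removes every entry whose age exceeds `timeout` and returns the evicted
/// ids. `started` maps a (hypothetical) numeric message id to the instant
/// at which its first fragment arrived.
pub fn purge_expired_at(
    started: &mut HashMap<u64, Instant>,
    timeout: Duration,
    now: Instant,
) -> Vec<u64> {
    let mut evicted = Vec::new();
    started.retain(|id, started_at| {
        // saturating_duration_since yields zero if `now` precedes the start.
        let expired = now.saturating_duration_since(*started_at) > timeout;
        if expired {
            evicted.push(*id);
        }
        !expired
    });
    // HashMap iteration order is unspecified, so sort for stable output.
    evicted.sort_unstable();
    evicted
}
```

Returning the evicted ids, as the diagram's `Vec<MessageId>` does, lets the caller log or count dropped messages without the reassembler depending on any particular logging facility.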
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🔍 Remote MCP
Summary of Relevant Context for PR Review
Based on the provided PR context, here is the key factual information for reviewing the Reassembler PR (#397):
Core Feature Implementation
The PR adds a transport-agnostic Reassembler component that buffers inbound fragments and reconstructs complete messages. Key design considerations align with established fragment reassembly patterns.
API Surface and Public Exports
The following new public types are exposed:
Testing Coverage
Comprehensive test suite validates:
Cucumber-based behavioral tests and fixture scaffolding (tests/worlds/fragment/) support stateful, time-controlled testing scenarios.
Suggested Code Improvements from Comments
Three factual refactoring opportunities were identified:
The author (leynos) confirmed that implementation-level refactorings (
@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix:
Comment on lines +195 to +217
    fn reassembler_accumulates_ordered_fragments() {
        let mut reassembler = Reassembler::new(
            NonZeroUsize::new(8).expect("non-zero"),
            Duration::from_secs(30),
        );
        let first = FragmentHeader::new(MessageId::new(2), FragmentIndex::zero(), false);
        let final_fragment = FragmentHeader::new(MessageId::new(2), FragmentIndex::new(1), true);
        assert!(
            reassembler
                .push(first, [5_u8, 6, 7])
                .expect("first fragment accepted")
                .is_none()
        );
        let complete = reassembler
            .push(final_fragment, [8_u8, 9])
            .expect("final fragment accepted")
            .expect("message should complete");
        assert_eq!(complete.payload(), &[5, 6, 7, 8, 9]);
        assert_eq!(reassembler.buffered_len(), 0);
    }
❌ New issue: Code Duplication
|
@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix:
Comment on lines +293 to +316
    pub fn push_fragment(
        &mut self,
        message_id: u64,
        index: u32,
        is_last: bool,
        payload_len: usize,
    ) {
        let reassembler = self
            .reassembler
            .as_mut()
            .expect("reassembler not configured");
        let header = FragmentHeader::new(
            MessageId::new(message_id),
            FragmentIndex::new(index),
            is_last,
        );
        let payload = vec![0_u8; payload_len];
        self.last_reassembly_error = None;
        self.last_reassembled = None;
        match reassembler.push_at(header, payload, self.now) {
            Ok(output) => self.last_reassembled = output,
            Err(err) => self.last_reassembly_error = Some(err),
        }
    }
❌ New issue: Excess Number of Function Arguments
This comment was marked as resolved.
|
Refactor src/fragment/tests.rs to eliminate code duplication between reassembler_accumulates_ordered_fragments and reassembler_rejects_out_of_order_and_drops_partial tests.
Preserve all assertion logic and test behaviour. The refactored tests should be more concise whilst maintaining identical semantics.
Refactor the fragment tests and related test step implementations to use the FragmentHeader struct directly instead of separate message_id, index, and is_last parameters. Introduce a helper function to set up the reassembler with the first fragment to reduce duplication. This improves code clarity and consistency across the fragment tests and steps.
Co-authored-by: terragon-labs[bot] <terragon-labs[bot]@users.noreply.github.com>
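The argument-count fix described in this commit is the usual parameter-object refactor. A minimal sketch, assuming a simplified `FragmentHeader` built from raw integers and a hypothetical `World` that only records what it is asked to push (the real step implementation forwards to `Reassembler::push_at`):

```rust
/// Hypothetical stand-in for the crate's `FragmentHeader`; the real type
/// wraps `MessageId` and `FragmentIndex` rather than raw integers.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct FragmentHeader {
    pub message_id: u64,
    pub index: u32,
    pub is_last: bool,
}

impl FragmentHeader {
    pub fn new(message_id: u64, index: u32, is_last: bool) -> Self {
        Self { message_id, index, is_last }
    }
}

/// Hypothetical test world that records each pushed header and payload size.
#[derive(Default)]
pub struct World {
    pub pushed: Vec<(FragmentHeader, usize)>,
}

impl World {
    // Before: push_fragment(&mut self, message_id, index, is_last, payload_len)
    // After: the three header fields travel together as one value.
    pub fn push_fragment(&mut self, header: FragmentHeader, payload_len: usize) {
        let payload = vec![0_u8; payload_len];
        self.pushed.push((header, payload.len()));
    }
}
```

Grouping the header fields also keeps call sites from transposing `index` and `message_id`, which are both plain integers in the original signature.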
- Extracted common logic for appending fragments and checking max message size into `append_and_maybe_complete` method.
- This reduces code duplication when handling incomplete and complete fragment statuses.
- Added tests verifying reassembler accepts fragments up to max message size, including single and multiple fragment cases.
- Added support and tests for detection of out-of-order fragment errors in feature tests and step implementations.
Co-authored-by: terragon-labs[bot] <terragon-labs[bot]@users.noreply.github.com>
|
@coderabbitai review
|
This comment was marked as resolved.
…e test files
- Introduced a new `reassembly.rs` test module under `tests/worlds/fragment/` with comprehensive helpers and assertions for fragment reassembly scenarios.
- Renamed `tests/worlds/fragment.rs` to a directory `tests/worlds/fragment/mod.rs` to better modularize fragment-related tests.
- Updated imports and test structure to integrate the new module.
- Improved test clarity and coverage for reassembly error conditions and behavior.
Co-authored-by: terragon-labs[bot] <terragon-labs[bot]@users.noreply.github.com>
|
@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix:
tests/worlds/fragment/reassembly.rs
Comment on lines +103 to +112
    pub fn assert_reassembly_over_limit(&self) {
        let err = self
            .last_reassembly_error
            .as_ref()
            .expect("no reassembly error captured");
        assert!(
            matches!(err, ReassemblyError::MessageTooLarge { .. }),
            "expected message-too-large error, got {err}"
        );
    }
❌ New issue: Code Duplication
This comment was marked as resolved.
… into generic helper Introduce a generic helper method `assert_reassembly_error_matches` to assert on the latest captured reassembly error using a predicate. This reduces code duplication and improves clarity by consolidating common assertion logic previously repeated in the `assert_reassembly_over_limit` and `assert_reassembly_out_of_order` methods. Co-authored-by: terragon-labs[bot] <terragon-labs[bot]@users.noreply.github.com>
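A predicate-based helper of the kind this commit describes might look like the sketch below. Only the names `assert_reassembly_error_matches`, `assert_reassembly_over_limit`, `assert_reassembly_out_of_order`, and `last_reassembly_error` come from the PR; the helper's signature, the `expected` label parameter, the simplified `ReassemblyError`, and the `World` struct are assumptions made for illustration.

```rust
/// Simplified stand-in for the crate's `ReassemblyError`.
#[derive(Debug)]
pub enum ReassemblyError {
    /// Stand-in for `ReassemblyError::Fragment(FragmentError)`.
    OutOfOrder,
    MessageTooLarge { attempted: usize, limit: usize },
}

/// Hypothetical test world holding the most recent reassembly error.
#[derive(Default)]
pub struct World {
    pub last_reassembly_error: Option<ReassemblyError>,
}

impl World {
    /// Shared assertion: fetch the captured error and test it with `predicate`.
    fn assert_reassembly_error_matches(
        &self,
        expected: &str,
        predicate: impl FnOnce(&ReassemblyError) -> bool,
    ) {
        let err = self
            .last_reassembly_error
            .as_ref()
            .expect("no reassembly error captured");
        assert!(predicate(err), "expected {expected}, got {err:?}");
    }

    pub fn assert_reassembly_over_limit(&self) {
        self.assert_reassembly_error_matches("message-too-large error", |err| {
            matches!(err, ReassemblyError::MessageTooLarge { .. })
        });
    }

    pub fn assert_reassembly_out_of_order(&self) {
        self.assert_reassembly_error_matches("out-of-order error", |err| {
            matches!(err, ReassemblyError::OutOfOrder)
        });
    }
}
```

Each public assertion now differs only in its label and its `matches!` pattern, which is the duplication the review comment flagged.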
Summary
Introduce a transport-agnostic Reassembler to buffer inbound fragments, reassemble them into complete messages, and enforce resource safeguards (size cap and timeout-based eviction). This mirrors the outbound Fragmenter path and provides a robust inbound fragmentation workflow with precise error handling and test coverage.
Changes
Public API and usage
Example snippet (from docs/users-guide):
Testing and verification
Compatibility and breaking changes
Test plan
If you need any additional usage examples or want to adjust the default max size or timeout values in tests, I can add a short per-feature guide in the docs.
🌿 Generated by Terry
ℹ️ Tag @terragon-labs to ask questions and address PR feedback
📎 Task: https://www.terragonlabs.com/task/694b974b-8b76-40fb-ae28-7049f9401ef0
Summary by Sourcery
Introduce an inbound, transport-agnostic fragment reassembler with size and timeout safeguards, plus supporting errors, tests, and documentation updates.
New Features:
Enhancements:
Documentation:
Tests: