Integrate inbound MessageAssembler; add assembly helper and routing#458
Integrate inbound MessageAssembler; add assembly helper and routing#458
Conversation
Introduces a comprehensive execution plan document detailing the integration of the MessageAssembler trait into the inbound connection path. The plan covers constraints, tolerances, risks, progress stages, tests, and documentation updates corresponding to roadmap items 8.2.5 and 8.2.6. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
Reviewer's GuideIntegrates protocol-level MessageAssembler into the inbound connection path via a new frame-handling assembly helper, centralizes envelope decoding, wires per-connection assembly state and purge logic into WireframeApp, and adds focused unit/BDD tests plus documentation and roadmap updates for the new behaviour. Sequence diagram for inbound frame processing with message assemblysequenceDiagram
actor Client
participant Connection as WireframeApp_connection
participant FrameHandling as frame_handling
participant Assembler as MessageAssembler
participant AssemblyState as MessageAssemblyState
participant Handler as HandlerService
Client->>Connection: receive_frame()
loop per_frame
Connection->>FrameHandling: decode_envelope(frame)
FrameHandling-->>Connection: Option Envelope
alt envelope_decoded
Connection->>FrameHandling: reassemble_if_needed(env, fragmentation)
FrameHandling-->>Connection: Option Envelope
alt post_fragment_envelope
Connection->>FrameHandling: assemble_if_needed(AssemblyRuntime, deser_failures, env, MAX_DESER_FAILURES)
activate FrameHandling
FrameHandling->>Assembler: parse_frame_header(env.payload_bytes())
Assembler-->>FrameHandling: ParsedFrameHeader
FrameHandling->>AssemblyState: accept_first_frame_or_continuation(...)
AssemblyState-->>FrameHandling: Option AssembledMessage
deactivate FrameHandling
alt assembly_complete
FrameHandling-->>Connection: Some Envelope
Connection->>Handler: forward_response(env)
Handler-->>Connection: handle_result
else assembly_in_progress
FrameHandling-->>Connection: None
end
else no_post_fragment_envelope
FrameHandling-->>Connection: None
end
else decode_failed_or_skipped
FrameHandling-->>Connection: None
end
end
Sequence diagram for read-timeout purging of fragmentation and message assembliessequenceDiagram
actor Client
participant Connection as WireframeApp_connection
participant FrameHandling as frame_handling
participant FragState as FragmentationState
participant AssemblyState as MessageAssemblyState
Client->>Connection: idle
Connection->>Connection: wait_for_frame_with_timeout()
Connection-->>Connection: Err timeout
Connection->>Connection: purge_expired(fragmentation, message_assembly)
alt fragmentation_state_present
Connection->>FragState: purge_expired()
FragState-->>Connection: evicted_fragment_keys
end
Connection->>FrameHandling: purge_expired_assemblies(message_assembly)
FrameHandling->>AssemblyState: purge_expired()
AssemblyState-->>FrameHandling: evicted_message_keys
FrameHandling-->>Connection: return
Connection-->>Client: continue_waiting_for_next_frame
Updated class diagram for inbound message assembly helpers and related typesclassDiagram
class AssemblyRuntime {
+Option~&Arc_MessageAssembler~~ assembler
+Option~MessageAssemblyState~* state
+new(Option~&Arc_MessageAssembler~~ assembler, Option~MessageAssemblyState~* state) AssemblyRuntime
}
class MessageAssemblyState {
+new(NonZeroUsize max_message_size, Duration timeout) MessageAssemblyState
+purge_expired() Vec~AssemblyKey~
+accept_first_frame(FirstFrameInput input) Result~Option~AssembledMessage~, AssemblyError~
+accept_continuation_frame(ContinuationFrameHeader header, &[u8] frame_bytes) Result~Option~AssembledMessage~, AssemblyError~
}
class MessageAssembler {
<<trait>>
+parse_frame_header(&[u8] payload) Result~ParsedFrameHeader, AssemblyError~
}
class Envelope {
+u32 id
+Option~u64~ correlation_id
+Vec~u8~ payload
+new(u32 id, Option~u64~ correlation_id, Vec~u8~ payload) Envelope
+payload_bytes() &[u8]
}
class DeserFailureTracker {
+new(u32* failures, u32 max_failures) DeserFailureTracker
+record(Option~u64~ correlation_id, &str context, io::Error err) io::Result~()~
}
class FirstFrameHeader {
+usize metadata_len
+usize body_len
}
class ContinuationFrameHeader {
+usize body_len
}
class FirstFrameInput {
+new(FirstFrameHeader header, Vec~u8~ metadata, &[u8] body) Result~FirstFrameInput, AssemblyError~
}
class AssembledMessage {
+metadata() &[u8]
+body() &[u8]
}
class FrameHeader {
<<enum>>
First(FirstFrameHeader)
Continuation(ContinuationFrameHeader)
}
class AssemblyContext {
-MessageAssemblyState* state
-DeserFailureTracker* failures
-Option~u64~ correlation_id
}
%% Free helper functions in assembly.rs represented as a utility class
class AssemblyHelpers {
+new_message_assembly_state(Option~FragmentationConfig~ fragmentation, usize frame_budget) MessageAssemblyState
+purge_expired_assemblies(Option~MessageAssemblyState~* assembly) void
+assemble_if_needed(AssemblyRuntime runtime, u32* deser_failures, Envelope env, u32 max_deser_failures) io::Result~Option~Envelope~~
+process_first_frame(AssemblyContext* context, FirstFrameHeader* header, &[u8] frame_bytes) io::Result~Option~AssembledMessage~~
+process_continuation_frame(AssemblyContext* context, ContinuationFrameHeader* header, &[u8] frame_bytes) io::Result~Option~AssembledMessage~~
+envelope_from_assembled(u32 id, Option~u64~ correlation_id, AssembledMessage* assembled) Envelope
}
class FragmentationConfig {
+NonZeroUsize max_message_size
+Duration reassembly_timeout
}
AssemblyRuntime --> MessageAssembler : uses
AssemblyRuntime --> MessageAssemblyState : holds_mutably
AssemblyHelpers ..> AssemblyRuntime : constructs
AssemblyHelpers ..> MessageAssemblyState : creates_and_purges
AssemblyHelpers ..> DeserFailureTracker : uses
AssemblyHelpers ..> Envelope : creates
AssemblyHelpers ..> FirstFrameHeader : reads
AssemblyHelpers ..> ContinuationFrameHeader : reads
AssemblyHelpers ..> AssembledMessage : consumes
AssemblyHelpers ..> FragmentationConfig : reads
DeserFailureTracker --> Envelope : correlates_with
MessageAssembler --> FrameHeader : returns
FrameHeader --> FirstFrameHeader : variant
FrameHeader --> ContinuationFrameHeader : variant
MessageAssemblyState --> FirstFrameInput : consumes
MessageAssemblyState --> ContinuationFrameHeader : reads
MessageAssemblyState --> AssembledMessage : returns
AssemblyContext --> MessageAssemblyState : holds_mutably
AssemblyContext --> DeserFailureTracker : holds_mutably
AssemblyContext --> Envelope : correlates_via_id
AssembledMessage --> Envelope : converted_into_via_AssemblyHelpers
FragmentationConfig --> MessageAssemblyState : configures_new_state
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
WalkthroughIntegrate inbound message assembly into the connection path: add frame_handling decode and assembly helpers, wire MessageAssemblyState and MessageAssembler into connection runtime, propagate EnvelopeRouting through assembly types, centralise decode/assemble logic, add unit and BDD integration tests, and update docs and roadmap entries. Changes
Sequence Diagram(s)sequenceDiagram
participant Client as Inbound Stream
participant Conn as Connection Handler
participant Decode as Frame Handling (decode)
participant Assemble as Frame Handling (assembly)
participant Dispatch as Handler Dispatch
Client->>Conn: deliver frame
Conn->>Decode: decode_envelope(parse_result, frame, &mut deser_failures, MAX_DESER_FAILURES)
Decode->>Decode: parse envelope, derive correlation_id, track failures
Decode-->>Conn: Ok(Some(Envelope)) / Ok(None) / Err(close)
Conn->>Assemble: assemble_if_needed(AssemblyRuntime{assembler?, state?}, &mut deser_failures, env, MAX_DESER_FAILURES)
alt assembler configured and frame is fragmented
Assemble->>Assemble: parse headers, validate lengths
alt first frame
Assemble->>Assemble: create PartialAssembly (store routing)
Assemble-->>Conn: Ok(None)
else continuation
Assemble->>Assemble: append to partial, check ordering
alt complete
Assemble->>Assemble: build AssembledMessage (preserve routing)
Assemble-->>Conn: Ok(Some(Envelope))
else awaiting more
Assemble-->>Conn: Ok(None)
end
end
else no assembler or non-fragmented
Assemble-->>Conn: Ok(Some(Envelope)) (pass-through)
end
Conn->>Dispatch: dispatch assembled envelope to handler
Dispatch->>Dispatch: invoke handler
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes Poem
🚥 Pre-merge checks | ✅ 3 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
⚔️ Resolve merge conflicts (beta)
Comment |
|
@coderabbitai please create a new GitHub issue for the following: Connection module naming is ambiguous: |
|
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chain
🏁 Script executed: Length of output: 46
The issue documents the confusion between |
…ction path - Added protocol-level message assembly on the inbound path after transport fragmentation reassembly and before handler dispatch. - Implemented parsing and continuity validation for message assembly frames with unified failure handling using deserialization failure policy. - Introduced new modules for assembly logic (`assembly.rs`) and envelope decoding (`decode.rs`) to streamline connection code. - Updated `WireframeApp` connection handling to create and maintain message assembly state alongside fragmentation state. - Added detailed unit tests and behavioral tests for interleaving, ordering violations, timeout purging, and integration scenarios. - Upgraded `rstest-bdd` dev-dependencies to v0.5.0 to support new behavioral tests. - Updated documentation, exec plans, and roadmap to reflect completed message assembly integration (items 8.2.5 and 8.2.6). This enhancement completes the roadmap for streaming requests and shared message assembly, improving protocol multiplexing and robustness on the inbound connection path. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
This comment was marked as resolved.
This comment was marked as resolved.
|
@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix: tests/fixtures/message_assembly_inbound.rs Comment on lines +142 to +149 pub fn send_continuation_frame(&mut self, key: u64, sequence: u32, body: &str) -> TestResult {
self.send_payload(continuation_frame_payload(
key,
sequence,
body.as_bytes(),
false,
))
}❌ New issue: Code Duplication |
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
…er method Refactored the sending of continuation frames in message_assembly_inbound.rs by combining repeated code into a new helper method `send_continuation_frame_impl`. This reduces duplication and clarifies the distinction between final and non-final continuation frames while preserving existing functionality. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
|
@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix: Excess Number of Function Argumentstests/fixtures/message_assembly_inbound.rs: MessageAssemblyInboundWorld.send_continuation_frame_impl What lead to degradation?MessageAssemblyInboundWorld.send_continuation_frame_impl has 5 arguments, max arguments = 4 Why does this problem occur?Functions with many arguments indicate either a) low cohesion where the function has too many responsibilities, or b) a missing abstraction that encapsulates those arguments. How to fix it?Start by investigating the responsibilities of the function. Make sure it doesn't do too many things, in which case it should be split into smaller and more cohesive functions. Consider the refactoring INTRODUCE PARAMETER OBJECT to encapsulate arguments that refer to the same logical concept. |
This comment was marked as resolved.
This comment was marked as resolved.
Introduce MessageKey and FrameSequence newtype structs to enhance type safety in the message assembly inbound test fixture. Update related methods to accept these types via Into conversions, improving code clarity and preventing misuse of raw integers. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
|
@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix: tests/fixtures/message_assembly_inbound.rs Comment on file //! `MessageAssemblyInboundWorld` fixture for inbound assembly integration.
❌ New issue: Primitive Obsession |
This comment was marked as resolved.
This comment was marked as resolved.
…ly tests Refactor send_continuation_frame_impl method to accept a ContinuationFrameParams struct instead of multiple parameters, improving code clarity and maintainability in the message assembly inbound test fixture. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
|
@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix: Code Duplicationsrc/app/frame_handling/assembly_tests.rs: What lead to degradation?The module contains 2 functions with similar structure: inbound_assembly_handles_interleaved_sequences,inbound_assembly_rejects_ordering_violations Why does this problem occur?Duplicated code often leads to code that's harder to change since the same logical change has to be done in multiple functions. More duplication gives lower code health. How to fix it?A certain degree of duplicated code might be acceptable. The problems start when it is the same behavior that is duplicated across the functions in the module, ie. a violation of the Don't Repeat Yourself (DRY) principle. DRY violations lead to code that is changed together in predictable patterns, which is both expensive and risky. DRY violations can be identified using CodeScene's X-Ray analysis to detect clusters of change coupled functions with high code similarity. Read More |
This comment was marked as resolved.
This comment was marked as resolved.
… code Introduced `process_assembly_frame` helper function to encapsulate calls to `assemble_if_needed` with assembly state and parameters. Replaced repetitive assembly invocation patterns in integration tests with this helper to improve code readability and maintainability without changing test logic. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
This comment was marked as resolved.
This comment was marked as resolved.
…sembly timeout test Replaced the use of thread::sleep with advancing a synthetic clock (Instant) in the inbound assembly timeout test. This change makes the test deterministic and independent of real wall-clock scheduling, improving reliability of timeout purging behavior verification. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/app/frame_handling/assembly_tests.rs`:
- Around line 24-30: The test fixture message_assembly_state uses
NonZeroUsize::new(1024).unwrap_or(...); replace the redundant unwrap_or fallback
with .expect("1024 is non-zero") to make the invariant explicit and provide a
clear failure message; update the call inside MessageAssemblyState::new in the
fixture (function message_assembly_state) accordingly.
…bly tests Refactor MessageAssemblyInboundWorld tests to use a current-thread Tokio runtime with all features enabled. Pause the Tokio clock at test start and replace sleep calls with time advancements. This improves test determinism and makes time-dependent actions independent of wall-clock scheduling. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@tests/fixtures/message_assembly_inbound.rs`:
- Around line 239-264: Extract the hard-coded millisecond timeouts used in
assert_received_payload and assert_received_count into clearly named constants
(e.g., PAYLOAD_COLLECT_TIMEOUT_MS and COUNT_COLLECT_TIMEOUT_MS or a single
OBSERVED_COLLECT_TIMEOUT_MS if same) and replace the literal
Duration::from_millis(200) and Duration::from_millis(120) calls in those methods
with Duration::from_millis(CONSTANT). Update the constants' placement near the
top of the file or module so both functions (assert_received_payload and
assert_received_count) can reference them and ensure the intent and units
(milliseconds) are clear in the constant names.
…t frame - Introduce `EnvelopeRouting` struct to carry envelope_id and correlation_id. - Extend `FirstFrameInput` and `AssembledMessage` to include `EnvelopeRouting`. - Modify message assembly state to track and propagate routing metadata through assembly. - Update frame handling and assembly tests to preserve routing info on completed messages. - Ensure completed messages use envelope routing metadata from the initial first frame regardless of continuation frames. - Improves message dispatching and correlation by retaining original envelope identifiers throughout multi-frame message assembly. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
|
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chain
🏁 Script executed: Length of output: 2829
✅ All Resolved
All review comments have been successfully addressed. The codebase now has centralised test helpers, comprehensive test coverage, improved error handling ergonomics, better documentation, and deterministic time-based testing. |
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Fix all issues with AI agents
Verify each finding against the current code and only fix it if needed.
In `@src/message_assembler/types.rs`:
- Around line 11-23: Replace the primitive fields in EnvelopeRouting with
newtype wrappers: change pub envelope_id: u32 to pub envelope_id: EnvelopeId and
change pub correlation_id: Option<u64> to pub correlation_id:
Option<CorrelationId>; add or reuse simple newtypes EnvelopeId(u32) and
CorrelationId(u64) (derive/impl Clone, Copy, Debug, PartialEq, Eq, Default as
needed) and update any code that constructs or reads EnvelopeRouting (e.g.,
places that build from the transport frame or match on
envelope_id/correlation_id) to convert between the primitive and newtype
(From/Into or explicit constructors) so routing metadata is type-safe without
altering the external behavior of AssembledMessage dispatch or logging.
In `@tests/fixtures/message_assembly_inbound.rs`:
- Around line 18-38: Add Rustdoc comments for all public types and methods:
document the structs MessageKey and FrameSequence with /// explaining their
purpose and wrapped value, add docs for the function
message_assembly_inbound_world describing what world it constructs/returns, and
annotate the MessageAssemblyInboundWorld type and each of its public methods
with /// describing their behavior, inputs and return values; ensure comments
appear above the type/function/method signatures and follow project style so
cargo doc includes them.
- Around line 275-301: The send_payload function currently takes self.client
before calling serializer.serialize(&envelope) which can ?-return and leave
self.client as None; fix by either performing serialization before taking the
client or, if you must take the client first, ensure you always restore it on
any early return: move the call to serializer.serialize(&envelope) to occur
before self.client.take(), or wrap the serialization in a match/result and
reassign self.client = Some(client) before propagating any Err from serialize,
then proceed with the async send using the restored client (refer to
send_payload, serializer.serialize, self.client, and send_result to locate the
change).
…pes for routing Refactor routing metadata types by replacing raw u32 and u64 identifiers with strongly typed wrappers: EnvelopeId and CorrelationId. This improves type safety and clarity in message assembly and frame handling components. Update related code, tests, and documentation to use these newtypes accordingly. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
Improve the doc comment for the MessageAssemblyInboundWorld fixture used in BDD test scenarios. Clarifies its purpose for rstest injection and its role in exercising inbound message assembly integration within scenario tests. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
…458) * docs(execplans): add execplan for integrating MessageAssembler inbound Introduces a comprehensive execution plan document detailing the integration of the MessageAssembler trait into the inbound connection path. The plan covers constraints, tolerances, risks, progress stages, tests, and documentation updates corresponding to roadmap items 8.2.5 and 8.2.6. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> * feat(app/frame_handling): integrate inbound message assembly in connection path - Added protocol-level message assembly on the inbound path after transport fragmentation reassembly and before handler dispatch. - Implemented parsing and continuity validation for message assembly frames with unified failure handling using deserialization failure policy. - Introduced new modules for assembly logic (`assembly.rs`) and envelope decoding (`decode.rs`) to streamline connection code. - Updated `WireframeApp` connection handling to create and maintain message assembly state alongside fragmentation state. - Added detailed unit tests and behavioral tests for interleaving, ordering violations, timeout purging, and integration scenarios. - Upgraded `rstest-bdd` dev-dependencies to v0.5.0 to support new behavioral tests. - Updated documentation, exec plans, and roadmap to reflect completed message assembly integration (items 8.2.5 and 8.2.6). This enhancement completes the roadmap for streaming requests and shared message assembly, improving protocol multiplexing and robustness on the inbound connection path. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> * refactor(tests): refactor continuation frame sending into single helper method Refactored the sending of continuation frames in message_assembly_inbound.rs by combining repeated code into a new helper method `send_continuation_frame_impl`. This reduces duplication and clarifies the distinction between final and non-final continuation frames while preserving existing functionality. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> * refactor(tests): use newtype wrappers for message key and frame sequence Introduce MessageKey and FrameSequence newtype structs to enhance type safety in the message assembly inbound test fixture. Update related methods to accept these types via Into conversions, improving code clarity and preventing misuse of raw integers. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> * refactor(tests): use ContinuationFrameParams struct in message assembly tests Refactor send_continuation_frame_impl method to accept a ContinuationFrameParams struct instead of multiple parameters, improving code clarity and maintainability in the message assembly inbound test fixture. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> * refactor(frame_handling): extract helper to simplify assembly testing code Introduced `process_assembly_frame` helper function to encapsulate calls to `assemble_if_needed` with assembly state and parameters. Replaced repetitive assembly invocation patterns in integration tests with this helper to improve code readability and maintainability without changing test logic. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> * refactor(frame_handling): unify message assembly error handling and payload helpers - Introduced AssemblyContext::fail_invalid_none for streamlined error recording - Replaced envelope_from_assembled function with Envelope::from_assembled method - Moved test helpers for building first and continuation frame payloads to a common test_helpers module - Updated tests and fixtures to use shared payload builders instead of local copies - Adjusted failure counter reset in connection.rs to occur only after full inbound pipeline success - Minor docs wording improvements This refactor improves consistency in error handling, code reuse for test data generation, and overall maintainability of the message assembly logic. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> * test(frame_handling): replace thread sleep with synthetic clock in assembly timeout test Replaced the use of thread::sleep with advancing a synthetic clock (Instant) in the inbound assembly timeout test. This change makes the test deterministic and independent of real wall-clock scheduling, improving reliability of timeout purging behavior verification. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> * test(fixtures): deterministically control tokio time in message assembly tests Refactor MessageAssemblyInboundWorld tests to use a current-thread Tokio runtime with all features enabled. Pause the Tokio clock at test start and replace sleep calls with time advancements. This improves test determinism and makes time-dependent actions independent of wall-clock scheduling. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> * feat(message_assembler): preserve envelope routing metadata from first frame - Introduce `EnvelopeRouting` struct to carry envelope_id and correlation_id. - Extend `FirstFrameInput` and `AssembledMessage` to include `EnvelopeRouting`. - Modify message assembly state to track and propagate routing metadata through assembly. - Update frame handling and assembly tests to preserve routing info on completed messages. - Ensure completed messages use envelope routing metadata from the initial first frame regardless of continuation frames. - Improves message dispatching and correlation by retaining original envelope identifiers throughout multi-frame message assembly. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> * feat(message-assembler): introduce EnvelopeId and CorrelationId newtypes for routing Refactor routing metadata types by replacing raw u32 and u64 identifiers with strongly typed wrappers: EnvelopeId and CorrelationId. This improves type safety and clarity in message assembly and frame handling components. Update related code, tests, and documentation to use these newtypes accordingly. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> * docs(tests): enhance fixture documentation for inbound message assembly Improve the doc comment for the MessageAssemblyInboundWorld fixture used in BDD test scenarios. Clarifies its purpose for rstest injection and its role in exercising inbound message assembly integration within scenario tests. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> --------- Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
…458) * docs(execplans): add execplan for integrating MessageAssembler inbound Introduces a comprehensive execution plan document detailing the integration of the MessageAssembler trait into the inbound connection path. The plan covers constraints, tolerances, risks, progress stages, tests, and documentation updates corresponding to roadmap items 8.2.5 and 8.2.6. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> * feat(app/frame_handling): integrate inbound message assembly in connection path - Added protocol-level message assembly on the inbound path after transport fragmentation reassembly and before handler dispatch. - Implemented parsing and continuity validation for message assembly frames with unified failure handling using deserialization failure policy. - Introduced new modules for assembly logic (`assembly.rs`) and envelope decoding (`decode.rs`) to streamline connection code. - Updated `WireframeApp` connection handling to create and maintain message assembly state alongside fragmentation state. - Added detailed unit tests and behavioral tests for interleaving, ordering violations, timeout purging, and integration scenarios. - Upgraded `rstest-bdd` dev-dependencies to v0.5.0 to support new behavioral tests. - Updated documentation, exec plans, and roadmap to reflect completed message assembly integration (items 8.2.5 and 8.2.6). This enhancement completes the roadmap for streaming requests and shared message assembly, improving protocol multiplexing and robustness on the inbound connection path. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> * refactor(tests): refactor continuation frame sending into single helper method Refactored the sending of continuation frames in message_assembly_inbound.rs by combining repeated code into a new helper method `send_continuation_frame_impl`. This reduces duplication and clarifies the distinction between final and non-final continuation frames while preserving existing functionality. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> * refactor(tests): use newtype wrappers for message key and frame sequence Introduce MessageKey and FrameSequence newtype structs to enhance type safety in the message assembly inbound test fixture. Update related methods to accept these types via Into conversions, improving code clarity and preventing misuse of raw integers. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> * refactor(tests): use ContinuationFrameParams struct in message assembly tests Refactor send_continuation_frame_impl method to accept a ContinuationFrameParams struct instead of multiple parameters, improving code clarity and maintainability in the message assembly inbound test fixture. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> * refactor(frame_handling): extract helper to simplify assembly testing code Introduced `process_assembly_frame` helper function to encapsulate calls to `assemble_if_needed` with assembly state and parameters. Replaced repetitive assembly invocation patterns in integration tests with this helper to improve code readability and maintainability without changing test logic. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> * refactor(frame_handling): unify message assembly error handling and payload helpers - Introduced AssemblyContext::fail_invalid_none for streamlined error recording - Replaced envelope_from_assembled function with Envelope::from_assembled method - Moved test helpers for building first and continuation frame payloads to a common test_helpers module - Updated tests and fixtures to use shared payload builders instead of local copies - Adjusted failure counter reset in connection.rs to occur only after full inbound pipeline success - Minor docs wording improvements This refactor improves consistency in error handling, code reuse for test data generation, and overall maintainability of the message assembly logic. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> * test(frame_handling): replace thread sleep with synthetic clock in assembly timeout test Replaced the use of thread::sleep with advancing a synthetic clock (Instant) in the inbound assembly timeout test. This change makes the test deterministic and independent of real wall-clock scheduling, improving reliability of timeout purging behavior verification. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> * test(fixtures): deterministically control tokio time in message assembly tests Refactor MessageAssemblyInboundWorld tests to use a current-thread Tokio runtime with all features enabled. Pause the Tokio clock at test start and replace sleep calls with time advancements. This improves test determinism and makes time-dependent actions independent of wall-clock scheduling. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> * feat(message_assembler): preserve envelope routing metadata from first frame - Introduce `EnvelopeRouting` struct to carry envelope_id and correlation_id. - Extend `FirstFrameInput` and `AssembledMessage` to include `EnvelopeRouting`. - Modify message assembly state to track and propagate routing metadata through assembly. - Update frame handling and assembly tests to preserve routing info on completed messages. - Ensure completed messages use envelope routing metadata from the initial first frame regardless of continuation frames. - Improves message dispatching and correlation by retaining original envelope identifiers throughout multi-frame message assembly. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> * feat(message-assembler): introduce EnvelopeId and CorrelationId newtypes for routing Refactor routing metadata types by replacing raw u32 and u64 identifiers with strongly typed wrappers: EnvelopeId and CorrelationId. This improves type safety and clarity in message assembly and frame handling components. Update related code, tests, and documentation to use these newtypes accordingly. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> * docs(tests): enhance fixture documentation for inbound message assembly Improve the doc comment for the MessageAssemblyInboundWorld fixture used in BDD test scenarios. Clarifies its purpose for rstest injection and its role in exercising inbound message assembly integration within scenario tests. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> --------- Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
* docs(execplans): add detailed exec plan for unifying codec handling between app router and Connection actor Introduce a comprehensive 626-line ExecPlan document outlining the design, constraints, risks, progress, and testing strategies for unifying codec handling between the app router and Connection actor in the Wireframe server runtime. This living document elaborates on the existing dual-path architecture, the need for uniform protocol hook invocation, fragmentation, and metrics handling by routing all outbound payloads through a codec-aware ConnectionActor wrapper. The plan defines stages from initial design, scaffolding a codec driver, refactoring response handling, integration and BDD tests, to documentation updates, ensuring source compatibility and no public API breaks. This foundational strategy aims to harmonize streaming, push, and multi-packet responses and enhance robustness and maintainability. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> * Unify outbound frame processing through FramePipeline Introduce `FramePipeline` in `src/app/codec_driver.rs` to centralise outbound fragmentation and metrics for handler responses. All outbound envelopes now pass through a single pipeline that applies optional fragmentation before serialisation and codec wrapping. Key changes: - Add `codec_driver` module with `FramePipeline`, `send_envelope`, and `flush_pipeline_output` helpers - Update `frame_handling/core.rs` `ResponseContext` to hold a `FramePipeline` reference instead of raw `FragmentationState` - Simplify `frame_handling/response.rs` to route through the pipeline instead of ad-hoc fragment → serialize → send - Update `frame_handling/reassembly.rs` to access fragmentation state via `FramePipeline::fragmentation_mut()` - Remove duplicate `fragment_responses()`, `serialize_response()`, and `send_response_payload()` helpers - Update exec plan with progress and design decisions Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Add integration tests for the unified codec pipeline Add `tests/unified_codec.rs` with five integration tests exercising the `FramePipeline`-based outbound path end-to-end: - handler_response_round_trips_through_pipeline - fragmented_response_passes_through_pipeline - small_payload_passes_unfragmented - multiple_sequential_requests_through_pipeline - pipeline_with_no_fragmentation_passes_large_payload Tests use duplex streams with `handle_connection_result` to validate that handler responses flow correctly through the unified pipeline, including both fragmented and unfragmented payloads. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Add BDD behavioural tests for the unified codec pipeline Five rstest-bdd scenarios validate the FramePipeline end-to-end: handler round-trip, fragmented response, unfragmented small payload, multiple sequential requests, and disabled-fragmentation passthrough. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Update documentation for the unified codec pipeline Record the FramePipeline unification in the outbound messaging design, multi-packet/streaming design, and users guide. Mark roadmap item 9.3.1 and its sub-items as done. Update exec plan progress to stage H. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Mark exec plan 9.3.1 as done with outcomes and retrospective All stages complete. Quality gates pass. Protocol hooks deferred to follow-up stage due to F::Frame vs Envelope type constraint. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * refactor(tests/fixtures/unified_codec): deduplicate payload verification logic Extracted common verification logic from verify_handler_payloads and verify_response_payloads into a new verify_payloads_match helper method to reduce code duplication and improve maintainability. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> * Integrate inbound MessageAssembler; add assembly helper and routing (#458) * docs(execplans): add execplan for integrating MessageAssembler inbound Introduces a comprehensive execution plan document detailing the integration of the MessageAssembler trait into the inbound connection path. The plan covers constraints, tolerances, risks, progress stages, tests, and documentation updates corresponding to roadmap items 8.2.5 and 8.2.6. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> * feat(app/frame_handling): integrate inbound message assembly in connection path - Added protocol-level message assembly on the inbound path after transport fragmentation reassembly and before handler dispatch. - Implemented parsing and continuity validation for message assembly frames with unified failure handling using deserialization failure policy. - Introduced new modules for assembly logic (`assembly.rs`) and envelope decoding (`decode.rs`) to streamline connection code. - Updated `WireframeApp` connection handling to create and maintain message assembly state alongside fragmentation state. - Added detailed unit tests and behavioral tests for interleaving, ordering violations, timeout purging, and integration scenarios. - Upgraded `rstest-bdd` dev-dependencies to v0.5.0 to support new behavioral tests. - Updated documentation, exec plans, and roadmap to reflect completed message assembly integration (items 8.2.5 and 8.2.6). This enhancement completes the roadmap for streaming requests and shared message assembly, improving protocol multiplexing and robustness on the inbound connection path. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> * refactor(tests): refactor continuation frame sending into single helper method Refactored the sending of continuation frames in message_assembly_inbound.rs by combining repeated code into a new helper method `send_continuation_frame_impl`. This reduces duplication and clarifies the distinction between final and non-final continuation frames while preserving existing functionality. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> * refactor(tests): use newtype wrappers for message key and frame sequence Introduce MessageKey and FrameSequence newtype structs to enhance type safety in the message assembly inbound test fixture. Update related methods to accept these types via Into conversions, improving code clarity and preventing misuse of raw integers. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> * refactor(tests): use ContinuationFrameParams struct in message assembly tests Refactor send_continuation_frame_impl method to accept a ContinuationFrameParams struct instead of multiple parameters, improving code clarity and maintainability in the message assembly inbound test fixture. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> * refactor(frame_handling): extract helper to simplify assembly testing code Introduced `process_assembly_frame` helper function to encapsulate calls to `assemble_if_needed` with assembly state and parameters. Replaced repetitive assembly invocation patterns in integration tests with this helper to improve code readability and maintainability without changing test logic. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> * refactor(frame_handling): unify message assembly error handling and payload helpers - Introduced AssemblyContext::fail_invalid_none for streamlined error recording - Replaced envelope_from_assembled function with Envelope::from_assembled method - Moved test helpers for building first and continuation frame payloads to a common test_helpers module - Updated tests and fixtures to use shared payload builders instead of local copies - Adjusted failure counter reset in connection.rs to occur only after full inbound pipeline success - Minor docs wording improvements This refactor improves consistency in error handling, code reuse for test data generation, and overall maintainability of the message assembly logic. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> * test(frame_handling): replace thread sleep with synthetic clock in assembly timeout test Replaced the use of thread::sleep with advancing a synthetic clock (Instant) in the inbound assembly timeout test. This change makes the test deterministic and independent of real wall-clock scheduling, improving reliability of timeout purging behavior verification. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> * test(fixtures): deterministically control tokio time in message assembly tests Refactor MessageAssemblyInboundWorld tests to use a current-thread Tokio runtime with all features enabled. Pause the Tokio clock at test start and replace sleep calls with time advancements. This improves test determinism and makes time-dependent actions independent of wall-clock scheduling. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> * feat(message_assembler): preserve envelope routing metadata from first frame - Introduce `EnvelopeRouting` struct to carry envelope_id and correlation_id. - Extend `FirstFrameInput` and `AssembledMessage` to include `EnvelopeRouting`. - Modify message assembly state to track and propagate routing metadata through assembly. - Update frame handling and assembly tests to preserve routing info on completed messages. - Ensure completed messages use envelope routing metadata from the initial first frame regardless of continuation frames. - Improves message dispatching and correlation by retaining original envelope identifiers throughout multi-frame message assembly. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> * feat(message-assembler): introduce EnvelopeId and CorrelationId newtypes for routing Refactor routing metadata types by replacing raw u32 and u64 identifiers with strongly typed wrappers: EnvelopeId and CorrelationId. This improves type safety and clarity in message assembly and frame handling components. Update related code, tests, and documentation to use these newtypes accordingly. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> * docs(tests): enhance fixture documentation for inbound message assembly Improve the doc comment for the MessageAssemblyInboundWorld fixture used in BDD test scenarios. Clarifies its purpose for rstest injection and its role in exercising inbound message assembly integration within scenario tests. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> --------- Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> * Fix duplicated message assembly bindings Deduplicate the message assembly local and context field bindings in src/app/connection.rs after the rebase conflict merge. This keeps the merged FramePipeline + message assembly behaviour while restoring a compilable FrameHandlingContext construction/destructuring path. * refactor(unified-codec): unify test setup with shared runtime and helper modules - Introduced a shared Tokio runtime for synchronous BDD test steps to avoid repetitive runtime creation and blocking. - Extracted common send/receive helpers into a shared module for reuse across integration and behavioral tests. - Refactored test fixtures and integration test harness to use shared helpers and runtime. - Improved error handling and test code clarity by eliminating redundant async runtime creations. This refactor improves test code maintainability and performance by centralizing common logic and resource usage. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> * refactor(tests): consolidate payload verification in unified_codec steps Refactored the 'then_handler_receives_payload' and 'then_handler_receives_reassembled' test steps in `unified_codec_steps.rs` to use a new helper function `collect_and_verify_handler_payloads`. This reduces code duplication and improves test code clarity. Additionally, improved documentation clarity in `docs/execplans/9-3-1-fragment-adapter-trait.md` with expanded acronyms and consistent spelling, and added a doc comment in `unified_codec.rs` for the `setup_harness` function. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> * fix(test_helpers): make first and continuation frame payload builders return Results Refactored `first_frame_payload` and `continuation_frame_payload` functions to return `Result<Vec<u8>, io::Error>` instead of unconditionally returning the payload vector. Added error handling for body length exceeding `u32::MAX`. Updated call sites in tests and related code to handle the Result with `.expect("valid test payload")` or propagating the error as appropriate. This improves robustness by explicitly handling potential payload construction failures. Also includes minor documentation and style tweaks unrelated to the core fix. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> * refactor(tests): propagate errors from shared Tokio runtime initialization Changed runtime() to return a Result to handle potential initialization errors. Updated callers to propagate these errors properly instead of panicking. This improves error handling and robustness in the unified_codec_steps test code. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> --------- Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Summary
Changes
Core Functionality
src/app/frame_handling/assembly.rsthat:MessageAssembler, and updatesMessageAssemblyState;Envelopewhen assembly is ready for dispatch.src/app/frame_handling/mod.rs.src/app/connection.rsto:reassemble_if_neededand before route lookup;ContinuationFrameParamsand refactorsend_continuation_frame_implto accept a params object, simplifying frame construction and enabling clearer tests.EnvelopeRoutingto carry per-envelope routing metadata from the first frame.FirstFrameInputto include routing data;AssembledMessagenow carries routing information for correct dispatch.src/message_assembler/*to propagate routing through assembly.API Surface
rstest-bddupgrade.EnvelopeRoutingto routing data carried from first frame.FirstFrameInputwith routing;AssembledMessagecarries routing.Testing
rstest-bddv0.5.0 and adjust scaffolding.ContinuationFrameParams.Documentation & Design
docs/users-guide.mdanddocs/roadmap.mdto reflect the new inbound behaviour and mark roadmap items 8.2.5/8.2.6 as done after gates pass.Validation and acceptance
rstest-bddv0.5.0.Plan of work (high level)
src/app/connection.rsand implement purging behavior.Artifacts
src/app/frame_handling/assembly.rs(exported viasrc/app/frame_handling/mod.rs).src/app/connection.rsto support inbound assembly state and invocation.docs/execplans/8-2-5-integrate-with-the-connection-actor-inbound-path.md(present on branch).Notes
📎 Task: https://www.devboxer.com/task/4c43c2fe-e65f-4e59-a3a2-dc176706eff0