Refactor: modularize into submodules; rebase onto origin/main#448
Refactor: modularize into submodules; rebase onto origin/main#448
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
WalkthroughSummarise the refactor that splits several large modules into focused submodules, re-exports the original public API surfaces, and relocates tests into dedicated test files; introduces modularised WireframeApp and WireframeClient builders, reorganised frame-handling and codec recovery, and a new extractor subsystem. (≤50 words) Changes
Sequence Diagram(s)(Skipped — changes are primarily reorganisation/refactor rather than a single new multi-component control flow.) Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related issues
Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches🧪 Generate unit tests (beta)
Comment |
Reviewer's GuideRefactors several monolithic modules (app builder, frame handling, extractor, client builder, codec recovery, and server config tests) into structured submodules, introduces new extractor and codec recovery primitives, updates frame response handling to be codec-aware, and rewrites server backoff and related server-config tests to be parameterised with broader coverage while preserving the public API surface. Sequence diagram for codec-aware response forwardingsequenceDiagram
participant Conn as ConnectionTask
participant FH as frame_handling
participant HS as HandlerService
participant Frag as FragmentationState
participant Ser as Serializer
participant Codec as FrameCodec
participant FramedIO as Framed_ConnectionCodec
Conn->>FH: forward_response(env, service, ctx)
FH->>HS: call(ServiceRequest(payload, correlation_id))
HS-->>FH: Response
FH->>FH: PacketParts::new(env.id, resp.correlation_id, body)
FH->>Frag: fragment(envelope)
Frag-->>FH: Vec<Envelope> responses
loop for each response
FH->>Ser: serialize(response)
Ser-->>FH: Vec<u8> bytes
FH->>Codec: wrap_payload(Bytes::from(bytes))
Codec-->>FH: Frame
FH->>FramedIO: send(Frame)
FramedIO-->>FH: Result
end
FH-->>Conn: io::Result<()>
Class diagram for modularised WireframeApp builderclassDiagram
class WireframeApp~S,C,E,F~ {
+HashMap~u32,Handler~E~~ handlers
+OnceCell~Arc~HashMap~u32,HandlerService~E~~~~ routes
+Vec~Middleware~E~~ middleware
+S serializer
+HashMap~TypeId,Arc~Any+Send+Sync~~ app_data
+Option~Arc~ConnectionSetup~C~~~ on_connect
+Option~Arc~ConnectionTeardown~C~~~ on_disconnect
+Option~Arc~WireframeProtocol~Frame=F::Frame,ProtocolError=()~~ protocol
+Option~mpsc::Sender~Vec~u8~~~~ push_dlq
+F codec
+u64 read_timeout_ms
+Option~FragmentationConfig~ fragmentation
+Option~Arc~MessageAssembler~~ message_assembler
+new() Result~WireframeApp~S,C,E,F~~
+with_serializer(serializer S) Result~WireframeApp~S,C,E,F~~
+rebuild_with_params~S2,F2~(serializer S2, codec F2, protocol Option~Arc~WireframeProtocol~Frame=F2::Frame,ProtocolError=()~~, fragmentation Option~FragmentationConfig~, message_assembler Option~Arc~MessageAssembler~~) WireframeApp~S2,C,E,F2~
}
class Serializer {
<<interface>>
}
class FrameCodec {
<<interface>>
+wrap_payload(payload Bytes) Frame
+max_frame_length() usize
}
class WireframeProtocol {
<<interface>>
+Frame
+ProtocolError
}
class MessageAssembler {
<<interface>>
}
class Handler~E~ {
}
class HandlerService~E~ {
}
class Middleware~E~ {
<<interface>>
}
class Envelope {
}
class Packet {
<<interface>>
}
class ConnectionSetup~C~ {
<<type alias or trait>>
}
class ConnectionTeardown~C~ {
<<type alias or trait>>
}
class FragmentationConfig {
}
class BincodeSerializer {
}
class LengthDelimitedFrameCodec {
}
WireframeApp --> Serializer : uses S
WireframeApp --> FrameCodec : uses F
WireframeApp --> WireframeProtocol : optional protocol
WireframeApp --> MessageAssembler : optional
WireframeApp --> HandlerService : builds routes
WireframeApp --> Handler : registers
WireframeApp --> Middleware : pipeline
WireframeApp --> ConnectionSetup : on_connect
WireframeApp --> ConnectionTeardown : on_disconnect
Envelope ..|> Packet
BincodeSerializer ..|> Serializer
LengthDelimitedFrameCodec ..|> FrameCodec
Class diagram for new extractor module structureclassDiagram
class FromMessageRequest {
<<trait>>
+from_message_request(req MessageRequest, payload Payload) Result~Self,Error~
+Error
}
class MessageRequest {
+Option~SocketAddr~ peer_addr
+HashMap~TypeId,Arc~Any+Send+Sync~~ app_data
-Option~Mutex~Option~RequestBodyStream~~~ body_stream
+new() MessageRequest
+with_peer_addr(addr Option~SocketAddr~) MessageRequest
+state~T~() Option~SharedState~T~~
+insert_state~T~(state T) void
+set_body_stream(stream RequestBodyStream) void
+take_body_stream() Option~RequestBodyStream~
}
class Payload~'a~ {
+&'a [u8] data
+new(data &'a [u8]) Payload
+advance(count usize) void
+remaining() usize
}
class SharedState~T~ {
+Arc~T~ 0
+from(inner Arc~T~) SharedState~T~
+from(inner T) SharedState~T~
+deref() &T
}
class StreamingBody {
+RequestBodyStream stream
+new(stream RequestBodyStream) StreamingBody
+into_stream() RequestBodyStream
+into_reader() RequestBodyReader
}
class ConnectionInfo {
+Option~SocketAddr~ peer_addr
+peer_addr() Option~SocketAddr~
}
class Message~T~ {
+T 0
+into_inner() T
+deref() &T
}
class ExtractError {
<<enum>>
MissingState(&'static str)
InvalidPayload(DecodeError)
MissingBodyStream
}
class WireMessage {
<<trait>>
+from_bytes(bytes &[u8]) Result~(Self,usize),DecodeError~
}
class DecodeError {
}
class RequestBodyStream {
}
class RequestBodyReader {
+new(stream RequestBodyStream) RequestBodyReader
}
FromMessageRequest <|.. SharedState
FromMessageRequest <|.. StreamingBody
FromMessageRequest <|.. ConnectionInfo
FromMessageRequest <|.. Message
MessageRequest --> SharedState : returns
MessageRequest --> RequestBodyStream : manages
StreamingBody --> RequestBodyStream : wraps
StreamingBody --> RequestBodyReader : adapter
Message --> WireMessage : T bounds WireMessage
WireMessage --> DecodeError
ExtractError ..> DecodeError
SharedState --> "1" T : deref
Class diagram for codec recovery moduleclassDiagram
class RecoveryPolicy {
<<enum>>
Drop
Quarantine
Disconnect
+as_str() &str
}
class RecoveryConfig {
+u32 max_consecutive_drops
+Duration quarantine_duration
+bool log_dropped_frames
+max_consecutive_drops(count u32) RecoveryConfig
+quarantine_duration(duration Duration) RecoveryConfig
+log_dropped_frames(enabled bool) RecoveryConfig
}
class CodecErrorContext {
+Option~u64~ connection_id
+Option~SocketAddr~ peer_address
+Option~u64~ correlation_id
+Option~String~ codec_state
+new() CodecErrorContext
+with_connection_id(id u64) CodecErrorContext
+with_peer_address(addr SocketAddr) CodecErrorContext
+with_correlation_id(id u64) CodecErrorContext
+with_codec_state(state String) CodecErrorContext
}
class RecoveryPolicyHook {
<<trait>>
+recovery_policy(error CodecError, ctx CodecErrorContext) RecoveryPolicy
+quarantine_duration(error CodecError, ctx CodecErrorContext) Duration
+on_error(error CodecError, ctx CodecErrorContext, policy RecoveryPolicy) void
}
class DefaultRecoveryPolicy {
<<struct>>
}
class CodecError {
+default_recovery_policy() RecoveryPolicy
}
DefaultRecoveryPolicy ..|> RecoveryPolicyHook
RecoveryPolicyHook ..> RecoveryPolicy
RecoveryPolicyHook ..> CodecError
RecoveryPolicyHook ..> CodecErrorContext
RecoveryConfig ..> RecoveryPolicy : config for behaviour
CodecErrorContext ..> RecoveryConfig : may use defaults
Class diagram for modular WireframeClientBuilderclassDiagram
class WireframeClientBuilder~S,P,C~ {
+S serializer
+ClientCodecConfig codec_config
+SocketOptions socket_options
+Option~PreambleConfig~P~~ preamble_config
+LifecycleHooks~C~ lifecycle_hooks
+new() WireframeClientBuilder~BincodeSerializer,(),()~
}
class ClientCodecConfig {
+default() ClientCodecConfig
}
class SocketOptions {
+default() SocketOptions
}
class PreambleConfig~P~ {
}
class LifecycleHooks~C~ {
+default() LifecycleHooks~C~
}
class BincodeSerializer {
}
WireframeClientBuilder --> ClientCodecConfig
WireframeClientBuilder --> SocketOptions
WireframeClientBuilder --> PreambleConfig
WireframeClientBuilder --> LifecycleHooks
BincodeSerializer <.. WireframeClientBuilder : default S
File-Level Changes
Assessment against linked issues
Possibly linked issues
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
|
@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix: src/server/config/tests/tests_preamble.rs Comment on lines +52 to +125 async fn test_preamble_handler_registration(
factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static,
#[case] handler: PreambleHandlerKind,
) {
let counter = Arc::new(AtomicUsize::new(0));
let c = counter.clone();
let server = server_with_preamble(factory);
let server = match handler {
PreambleHandlerKind::Success => {
server.on_preamble_decode_success(move |_p: &TestPreamble, _| {
let c = c.clone();
Box::pin(async move {
c.fetch_add(1, Ordering::SeqCst);
Ok(())
})
})
}
PreambleHandlerKind::Failure => {
server.on_preamble_decode_failure(move |_err: &DecodeError, _stream| {
let c = c.clone();
Box::pin(async move {
c.fetch_add(1, Ordering::SeqCst);
Ok::<(), io::Error>(())
})
})
}
};
assert_eq!(counter.load(Ordering::SeqCst), 0);
match handler {
PreambleHandlerKind::Success => {
assert!(server.on_preamble_success.is_some());
let handler = server
.on_preamble_success
.as_ref()
.expect("success handler missing");
let listener = TcpListener::bind("127.0.0.1:0")
.await
.expect("bind listener");
let addr = listener.local_addr().expect("listener addr");
let _client = TcpStream::connect(addr)
.await
.expect("client connect failed");
let (mut stream, _) = listener.accept().await.expect("accept stream");
let preamble = TestPreamble {
id: 0,
message: String::new(),
};
handler(&preamble, &mut stream)
.await
.expect("handler failed");
}
PreambleHandlerKind::Failure => {
assert!(server.on_preamble_failure.is_some());
let handler = server
.on_preamble_failure
.as_ref()
.expect("failure handler missing");
let listener = TcpListener::bind("127.0.0.1:0")
.await
.expect("bind listener");
let addr = listener.local_addr().expect("listener addr");
let _client = TcpStream::connect(addr)
.await
.expect("client connect failed");
let (mut stream, _) = listener.accept().await.expect("accept stream");
handler(&DecodeError::UnexpectedEnd { additional: 0 }, &mut stream)
.await
.expect("handler failed");
}
}
assert_eq!(counter.load(Ordering::SeqCst), 1);
}❌ New issue: Large Method |
|
@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix: src/server/config/tests/tests_backoff.rs Comment on lines +144 to +166 fn test_accept_backoff_parameter_swapping(
factory: impl Fn() -> WireframeApp + Send + Sync + Clone + 'static,
) {
let server = WireframeServer::new(factory.clone()).accept_backoff(BackoffConfig {
initial_delay: Duration::from_millis(5),
max_delay: Duration::from_millis(1),
});
assert_eq!(
server.backoff_config.initial_delay,
Duration::from_millis(1)
);
assert_eq!(server.backoff_config.max_delay, Duration::from_millis(5));
let server = WireframeServer::new(factory).accept_backoff(BackoffConfig {
initial_delay: Duration::ZERO,
max_delay: Duration::ZERO,
});
assert_eq!(
server.backoff_config.initial_delay,
Duration::from_millis(1)
);
assert_eq!(server.backoff_config.max_delay, Duration::from_millis(1));
}❌ New issue: Code Duplication |
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
3e408b0 to
c2e5431
Compare
c2e5431 to
92faee4
Compare
|
@coderabbitai review |
This comment was marked as resolved.
This comment was marked as resolved.
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In `@src/app/builder_defaults.rs`:
- Around line 7-10: Add a Rustdoc comment for the constant
DEFAULT_READ_TIMEOUT_MS so it appears in generated docs: locate the constant
DEFAULT_READ_TIMEOUT_MS in builder_defaults.rs and add a /// comment above it
describing what the constant represents (the default read timeout in
milliseconds) and any important constraints or units (e.g., that values are in
milliseconds and bounded by MIN_READ_TIMEOUT_MS and MAX_READ_TIMEOUT_MS).
In `@src/codec/recovery/tests.rs`:
- Around line 10-20: The fixture functions default_hook and default_ctx include
unnecessary compiler_fence calls; remove the
std::sync::atomic::compiler_fence(std::sync::atomic::Ordering::SeqCst) lines so
each fixture simply returns its value (DefaultRecoveryPolicy for default_hook
and CodecErrorContext::new() for default_ctx). Ensure the fixtures remain
annotated with #[fixture] and keep the function signatures and return types
unchanged while deleting the fence calls.
In `@src/server/mod.rs`:
- Around line 20-22: Add a Rustdoc comment for the crate-visible function
default_worker_count describing its purpose and behavior: that it returns the
number of workers based on std::thread::available_parallelism() with a fallback
of 1, mention the return type usize and that it is crate-visible, and place the
doc comment (///) immediately above the fn default_worker_count() declaration so
it appears in generated cargo doc output.
|
@coderabbitai Have the following now been resolved? |
This comment was marked as resolved.
This comment was marked as resolved.
4085772 to
5fc52e0
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/app/frame_handling/response.rs`:
- Around line 24-48: Update the doc comment for forward_response to explicitly
state that application handler errors are absorbed: note that when
service.call(request) returns Err(e) (the Err(e) branch) the error is logged and
metrics are incremented (crate::metrics::inc_handler_errors()) and the function
returns Ok(()) instead of propagating a transport error, i.e., this is an
intentional "log-and-continue" behavior for application-level handler failures
distinct from transport (io::Error) failures which are still propagated; place
this brief note in the existing /// doc above the function so callers of
forward_response (which accepts an Envelope and creates a ServiceRequest)
understand the semantics.
…client builder Removed the monolithic src/app/builder.rs file and replaced it with a modular builder implementation split across multiple files (codec.rs, config.rs, core.rs, lifecycle.rs, mod.rs, protocol.rs, routing.rs, state.rs). This improves maintainability and readability by organizing related functionality into dedicated modules. Deleted the deprecated src/client/builder.rs and related client builder modules, cleaning up the codebase. Additionally, extracted frame handling helpers into a frame_handling module with submodules for core, reassembly, and response forwarding logic, keeping connection.rs smaller and more focused. Overall, this commit significantly improves project structure by organizing app builder and frame handling code into well-defined modules, and removing outdated client builder code. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
…s and add tcp connection setup helper - Replace multiple individual backoff config tests with parameterized test cases for better coverage and conciseness. - Add detailed cases including clamp, swap, and default value scenarios for backoff configuration validation. - Extract and reuse async TCP connection setup in preamble tests to reduce redundancy and improve readability. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
…ruct - Introduced BackoffScenario struct to encapsulate test parameters - Updated test cases to use BackoffScenario instances for clarity - Simplified assertions by using scenario description messages - Improved maintainability and readability of backoff config tests Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
Run cargo fmt to align parameterised test assertions with project formatting rules.
…client builder Removed the monolithic src/app/builder.rs file and replaced it with a modular builder implementation split across multiple files (codec.rs, config.rs, core.rs, lifecycle.rs, mod.rs, protocol.rs, routing.rs, state.rs). This improves maintainability and readability by organizing related functionality into dedicated modules. Deleted the deprecated src/client/builder.rs and related client builder modules, cleaning up the codebase. Additionally, extracted frame handling helpers into a frame_handling module with submodules for core, reassembly, and response forwarding logic, keeping connection.rs smaller and more focused. Overall, this commit significantly improves project structure by organizing app builder and frame handling code into well-defined modules, and removing outdated client builder code. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
…r and improve logging - Replace nested matches with `?` operator in response serialization and sending - Enhance logging messages with consistent formatting and variable interpolation - Add documentation comments for DeserFailureTracker and ResponseContext - Improve error propagation and metric increments - Add and enhance tests with rstest fixtures and parameterization - General code cleanup for readability and safety (e.g., saturating_add) Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
…, tests, and code clarity - Propagate underlying I/O errors in response handling instead of generic messages - Simplify recovery policy tests by removing rstest fixtures - Add Debug impl for SharedState with non-exhaustive output - Document default worker count computation in server module - Minor test assertion cleanup and added doc comment for default read timeout constant Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
Apply rustfmt output produced during post-rebase validation so working tree is clean and quality gates remain reproducible.
cbe37fe to
56b1e1f
Compare
Extended the documentation of `forward_response` to clarify its behavior when handling errors from the service call. The docs now explain the difference between application-level errors, which are logged and counted but do not propagate an error, and transport-level errors, which are propagated as `io::Error`. This improves code understandability and maintenance. Co-authored-by: devboxerhub[bot] <devboxerhub[bot]@users.noreply.github.com>
Summary
Changes
Why
How to test
Notes for reviewers
📎 Task: https://www.devboxer.com/task/56865d80-c8c0-4c00-9f2b-fa7dfeb7809d