Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 7 additions & 20 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,10 +436,11 @@ where
}
}

impl<S, C> WireframeApp<S, C>
impl<S, C, E> WireframeApp<S, C, E>
where
S: Serializer + crate::frame::FrameMetadata<Frame = Envelope>,
S: Serializer + crate::frame::FrameMetadata<Frame = Envelope> + Send + Sync,
C: Send + 'static,
E: Packet,
{
/// Try parsing the frame using [`FrameMetadata::parse`], falling back to
/// full deserialization on failure.
Expand Down Expand Up @@ -589,26 +590,12 @@ where
where
W: tokio::io::AsyncWrite + Unpin,
{
// Parse the frame first; routing is handled below to avoid duplicating
// logic on the success path.
let (env, _) = match self.parse_envelope(frame) {
Ok((env, _)) => {
Ok(result) => {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (review_instructions): Do not remove envelope handling logic without providing an equivalent replacement or justification.

The previous logic handled extracting the id and bytes from the envelope, routing the request, and sending a response or logging errors. The new code simply returns 'result' without any of this handling. This is a significant change in behavior and should be justified or replaced with equivalent logic to avoid breaking functionality.

Review instructions:

Path patterns: **/*

Instructions:
Create code-review comments for ALL issues. Avoid making general observations or non-specific feedback if at all possible.

*deser_failures = 0;
let (id, bytes) = env.into_parts();
if let Some(service) = routes.get(&id) {
let request = ServiceRequest::new(bytes);
match service.call(request).await {
Ok(resp) => {
let response = E::from_parts(id, resp.into_inner());
if let Err(e) = self.send_response(stream, &response).await {
log::warn!("failed to send response: {e}");
}
}
Err(e) => {
log::warn!("handler error for id {id}: {e}");
}
}
} else {
log::warn!("no handler for message id {id}");
}
result
}
Err(e) => {
*deser_failures += 1;
Expand Down
25 changes: 18 additions & 7 deletions tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,14 @@ pub fn processor() -> LengthPrefixedProcessor {
///
/// Returns any I/O errors encountered while interacting with the in-memory
/// duplex stream.
pub async fn run_app_with_frame<S>(app: WireframeApp<S>, frame: Vec<u8>) -> io::Result<Vec<u8>>
pub async fn run_app_with_frame<S, C, E>(
app: WireframeApp<S, C, E>,
frame: Vec<u8>,
Comment on lines +45 to +47
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Consider adding or updating tests that use these helpers to exercise the new generic parameters.

Ensure there are tests using the helpers with various C and E types to validate the generic logic and constraints.

) -> io::Result<Vec<u8>>
where
S: TestSerializer,
C: Send + 'static,
E: Packet,
{
run_app_with_frame_with_capacity(app, frame, DEFAULT_CAPACITY).await
}
Expand All @@ -58,13 +63,15 @@ where
/// # Panics
///
/// Panics if the spawned task running the application panics.
pub async fn run_app_with_frame_with_capacity<S>(
app: WireframeApp<S>,
pub async fn run_app_with_frame_with_capacity<S, C, E>(
app: WireframeApp<S, C, E>,
frame: Vec<u8>,
capacity: usize,
) -> io::Result<Vec<u8>>
where
S: TestSerializer,
C: Send + 'static,
E: Packet,
{
run_app_with_frames_with_capacity(app, vec![frame], capacity).await
}
Expand All @@ -76,12 +83,14 @@ where
/// Returns any I/O errors encountered while interacting with the in-memory
/// duplex stream.
#[allow(dead_code)]
pub async fn run_app_with_frames<S>(
app: WireframeApp<S>,
pub async fn run_app_with_frames<S, C, E>(
app: WireframeApp<S, C, E>,
frames: Vec<Vec<u8>>,
) -> io::Result<Vec<u8>>
where
S: TestSerializer,
C: Send + 'static,
E: Packet,
{
run_app_with_frames_with_capacity(app, frames, DEFAULT_CAPACITY).await
}
Expand All @@ -95,13 +104,15 @@ where
/// # Panics
///
/// Panics if the spawned task running the application panics.
pub async fn run_app_with_frames_with_capacity<S>(
app: WireframeApp<S>,
pub async fn run_app_with_frames_with_capacity<S, C, E>(
app: WireframeApp<S, C, E>,
frames: Vec<Vec<u8>>,
capacity: usize,
) -> io::Result<Vec<u8>>
where
S: TestSerializer,
C: Send + 'static,
E: Packet,
{
let (mut client, server) = duplex(capacity);
let server_task = tokio::spawn(async move {
Expand Down