diff --git a/src/app.rs b/src/app.rs index ce242f67..4dd31b83 100644 --- a/src/app.rs +++ b/src/app.rs @@ -436,10 +436,11 @@ where } } -impl WireframeApp +impl WireframeApp where - S: Serializer + crate::frame::FrameMetadata, + S: Serializer + crate::frame::FrameMetadata + Send + Sync, C: Send + 'static, + E: Packet, { /// Try parsing the frame using [`FrameMetadata::parse`], falling back to /// full deserialization on failure. @@ -589,26 +590,12 @@ where where W: tokio::io::AsyncWrite + Unpin, { + // Parse the frame first; routing is handled below to avoid duplicating + // logic on the success path. let (env, _) = match self.parse_envelope(frame) { - Ok((env, _)) => { + Ok(result) => { *deser_failures = 0; - let (id, bytes) = env.into_parts(); - if let Some(service) = routes.get(&id) { - let request = ServiceRequest::new(bytes); - match service.call(request).await { - Ok(resp) => { - let response = E::from_parts(id, resp.into_inner()); - if let Err(e) = self.send_response(stream, &response).await { - log::warn!("failed to send response: {e}"); - } - } - Err(e) => { - log::warn!("handler error for id {id}: {e}"); - } - } - } else { - log::warn!("no handler for message id {id}"); - } + result } Err(e) => { *deser_failures += 1; diff --git a/tests/util.rs b/tests/util.rs index ac27e08e..2ef136c8 100644 --- a/tests/util.rs +++ b/tests/util.rs @@ -42,9 +42,14 @@ pub fn processor() -> LengthPrefixedProcessor { /// /// Returns any I/O errors encountered while interacting with the in-memory /// duplex stream. -pub async fn run_app_with_frame(app: WireframeApp, frame: Vec) -> io::Result> +pub async fn run_app_with_frame( + app: WireframeApp, + frame: Vec, +) -> io::Result> where S: TestSerializer, + C: Send + 'static, + E: Packet, { run_app_with_frame_with_capacity(app, frame, DEFAULT_CAPACITY).await } @@ -58,13 +63,15 @@ where /// # Panics /// /// Panics if the spawned task running the application panics. -pub async fn run_app_with_frame_with_capacity( - app: WireframeApp, +pub async fn run_app_with_frame_with_capacity( + app: WireframeApp, frame: Vec, capacity: usize, ) -> io::Result> where S: TestSerializer, + C: Send + 'static, + E: Packet, { run_app_with_frames_with_capacity(app, vec![frame], capacity).await } @@ -76,12 +83,14 @@ where /// Returns any I/O errors encountered while interacting with the in-memory /// duplex stream. #[allow(dead_code)] -pub async fn run_app_with_frames( - app: WireframeApp, +pub async fn run_app_with_frames( + app: WireframeApp, frames: Vec>, ) -> io::Result> where S: TestSerializer, + C: Send + 'static, + E: Packet, { run_app_with_frames_with_capacity(app, frames, DEFAULT_CAPACITY).await } @@ -95,13 +104,15 @@ where /// # Panics /// /// Panics if the spawned task running the application panics. -pub async fn run_app_with_frames_with_capacity( - app: WireframeApp, +pub async fn run_app_with_frames_with_capacity( + app: WireframeApp, frames: Vec>, capacity: usize, ) -> io::Result> where S: TestSerializer, + C: Send + 'static, + E: Packet, { let (mut client, server) = duplex(capacity); let server_task = tokio::spawn(async move {