From 3e17407e74df6c6d70967a915c87bf1a5ee98a2b Mon Sep 17 00:00:00 2001 From: Leynos Date: Fri, 20 Jun 2025 17:38:16 +0100 Subject: [PATCH 1/2] Add multi-frame support for test helper * Added run_app_with_frames utilities to drive streaming tests.* Updated existing routes test and added new sequential frames test.* Removed Logger middleware from echo example for clarity. --- examples/echo.rs | 13 +---------- tests/routes.rs | 58 ++++++++++++++++++++++++++++++++++++++++++++---- tests/util.rs | 31 +++++++++++++++++++++++++- 3 files changed, 85 insertions(+), 17 deletions(-) diff --git a/examples/echo.rs b/examples/echo.rs index bcae69fc..c9eb5c74 100644 --- a/examples/echo.rs +++ b/examples/echo.rs @@ -1,22 +1,11 @@ use std::io; -use wireframe::{ - app::{Middleware, WireframeApp}, - server::WireframeServer, -}; - -/// Simple middleware demonstrating the `wrap` API. -/// -/// `Middleware` has no hooks yet, so this type is just a marker. -struct Logger; -impl Middleware for Logger {} +use wireframe::{app::WireframeApp, server::WireframeServer}; #[tokio::main] async fn main() -> io::Result<()> { let factory = || { WireframeApp::new() - .unwrap() - .wrap(Logger) .unwrap() .route( 1, diff --git a/tests/routes.rs b/tests/routes.rs index e8d45db7..9e7070a0 100644 --- a/tests/routes.rs +++ b/tests/routes.rs @@ -13,7 +13,7 @@ use wireframe::{ }; mod util; -use util::run_app_with_frame; +use util::{run_app_with_frame, run_app_with_frames}; #[derive(bincode::Encode, bincode::BorrowDecode, PartialEq, Debug)] struct TestEnvelope { @@ -30,7 +30,7 @@ async fn handler_receives_message_and_echoes_response() { let called_clone = called.clone(); let app = WireframeApp::new() .unwrap() - .frame_processor(LengthPrefixedProcessor) + .frame_processor(LengthPrefixedProcessor::default()) .route( 1, Box::new(move |_| { @@ -49,14 +49,17 @@ async fn handler_receives_message_and_echoes_response() { }; let env_bytes = BincodeSerializer.serialize(&env).unwrap(); let mut framed = BytesMut::new(); - LengthPrefixedProcessor + LengthPrefixedProcessor::default() .encode(&env_bytes, &mut framed) .unwrap(); let out = run_app_with_frame(app, framed.to_vec()).await.unwrap(); let mut buf = BytesMut::from(&out[..]); - let frame = LengthPrefixedProcessor.decode(&mut buf).unwrap().unwrap(); + let frame = LengthPrefixedProcessor::default() + .decode(&mut buf) + .unwrap() + .unwrap(); let (resp_env, _) = BincodeSerializer .deserialize::(&frame) .unwrap(); @@ -64,3 +67,50 @@ async fn handler_receives_message_and_echoes_response() { assert_eq!(echo, Echo(42)); assert_eq!(called.load(Ordering::SeqCst), 1); } + +#[tokio::test] +async fn multiple_frames_processed_in_sequence() { + let app = WireframeApp::new() + .unwrap() + .frame_processor(LengthPrefixedProcessor::default()) + .route(1, Box::new(|_| Box::pin(async {}))) + .unwrap(); + + let frames: Vec> = (1u8..=2) + .map(|id| { + let msg_bytes = Echo(id).to_bytes().unwrap(); + let env = TestEnvelope { + id: 1, + msg: msg_bytes, + }; + let env_bytes = BincodeSerializer.serialize(&env).unwrap(); + let mut framed = BytesMut::new(); + LengthPrefixedProcessor::default() + .encode(&env_bytes, &mut framed) + .unwrap(); + framed.to_vec() + }) + .collect(); + + let out = run_app_with_frames(app, frames).await.unwrap(); + + let mut buf = BytesMut::from(&out[..]); + let first = LengthPrefixedProcessor::default() + .decode(&mut buf) + .unwrap() + .unwrap(); + let (env1, _) = BincodeSerializer + .deserialize::(&first) + .unwrap(); + let (echo1, _) = Echo::from_bytes(&env1.msg).unwrap(); + let second = LengthPrefixedProcessor::default() + .decode(&mut buf) + .unwrap() + .unwrap(); + let (env2, _) = BincodeSerializer + .deserialize::(&second) + .unwrap(); + let (echo2, _) = Echo::from_bytes(&env2.msg).unwrap(); + assert_eq!(echo1, Echo(1)); + assert_eq!(echo2, Echo(2)); +} diff --git a/tests/util.rs b/tests/util.rs index 66d7e0b4..94bfa634 100644 --- a/tests/util.rs +++ b/tests/util.rs @@ -36,13 +36,42 @@ pub async fn run_app_with_frame_with_capacity( app: WireframeApp, frame: Vec, capacity: usize, +) -> io::Result> { + run_app_with_frames_with_capacity(app, vec![frame], capacity).await +} + +/// Run `app` with multiple input `frames` using the default buffer capacity. +/// +/// # Errors +/// +/// Returns any I/O errors encountered while interacting with the in-memory +/// duplex stream. +pub async fn run_app_with_frames(app: WireframeApp, frames: Vec>) -> io::Result> { + run_app_with_frames_with_capacity(app, frames, DEFAULT_CAPACITY).await +} + +/// Drive `app` with multiple frames using a duplex buffer of `capacity` bytes. +/// +/// # Errors +/// +/// Propagates any I/O errors from the in-memory connection. +/// +/// # Panics +/// +/// Panics if the spawned task running the application panics. +pub async fn run_app_with_frames_with_capacity( + app: WireframeApp, + frames: Vec>, + capacity: usize, ) -> io::Result> { let (mut client, server) = duplex(capacity); let server_task = tokio::spawn(async move { app.handle_connection(server).await; }); - client.write_all(&frame).await?; + for frame in &frames { + client.write_all(frame).await?; + } client.shutdown().await?; let mut buf = Vec::new(); From 2054e8e4d4942da3ff2f9ebab4eb8400deb60669 Mon Sep 17 00:00:00 2001 From: Leynos Date: Fri, 20 Jun 2025 23:46:31 +0100 Subject: [PATCH 2/2] Import LengthPrefixedProcessor in tests (#98) --- tests/routes.rs | 5 ++--- tests/util.rs | 6 ------ 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/tests/routes.rs b/tests/routes.rs index 1bfdcfd3..9e7070a0 100644 --- a/tests/routes.rs +++ b/tests/routes.rs @@ -7,13 +7,13 @@ use bytes::BytesMut; use wireframe::{ Serializer, app::WireframeApp, - frame::FrameProcessor, + frame::{FrameProcessor, LengthPrefixedProcessor}, message::Message, serializer::BincodeSerializer, }; mod util; -use util::{default_processor, run_app_with_frame, run_app_with_frames}; +use util::{run_app_with_frame, run_app_with_frames}; #[derive(bincode::Encode, bincode::BorrowDecode, PartialEq, Debug)] struct TestEnvelope { @@ -28,7 +28,6 @@ struct Echo(u8); async fn handler_receives_message_and_echoes_response() { let called = Arc::new(AtomicUsize::new(0)); let called_clone = called.clone(); - let processor = default_processor(); let app = WireframeApp::new() .unwrap() .frame_processor(LengthPrefixedProcessor::default()) diff --git a/tests/util.rs b/tests/util.rs index 59387962..94bfa634 100644 --- a/tests/util.rs +++ b/tests/util.rs @@ -80,9 +80,3 @@ pub async fn run_app_with_frames_with_capacity( server_task.await.unwrap(); Ok(buf) } - -/// Convenience for constructing a default length-prefixed processor. -#[must_use] -pub fn default_processor() -> wireframe::frame::LengthPrefixedProcessor { - wireframe::frame::LengthPrefixedProcessor::default() -}