diff --git a/examples/echo.rs b/examples/echo.rs index bcae69fc..c9eb5c74 100644 --- a/examples/echo.rs +++ b/examples/echo.rs @@ -1,22 +1,11 @@ use std::io; -use wireframe::{ - app::{Middleware, WireframeApp}, - server::WireframeServer, -}; - -/// Simple middleware demonstrating the `wrap` API. -/// -/// `Middleware` has no hooks yet, so this type is just a marker. -struct Logger; -impl Middleware for Logger {} +use wireframe::{app::WireframeApp, server::WireframeServer}; #[tokio::main] async fn main() -> io::Result<()> { let factory = || { WireframeApp::new() - .unwrap() - .wrap(Logger) .unwrap() .route( 1, diff --git a/tests/routes.rs b/tests/routes.rs index 9e38fe2f..9e7070a0 100644 --- a/tests/routes.rs +++ b/tests/routes.rs @@ -7,13 +7,13 @@ use bytes::BytesMut; use wireframe::{ Serializer, app::WireframeApp, - frame::FrameProcessor, + frame::{FrameProcessor, LengthPrefixedProcessor}, message::Message, serializer::BincodeSerializer, }; mod util; -use util::{default_processor, run_app_with_frame}; +use util::{run_app_with_frame, run_app_with_frames}; #[derive(bincode::Encode, bincode::BorrowDecode, PartialEq, Debug)] struct TestEnvelope { @@ -28,10 +28,9 @@ struct Echo(u8); async fn handler_receives_message_and_echoes_response() { let called = Arc::new(AtomicUsize::new(0)); let called_clone = called.clone(); - let processor = default_processor(); let app = WireframeApp::new() .unwrap() - .frame_processor(processor) + .frame_processor(LengthPrefixedProcessor::default()) .route( 1, Box::new(move |_| { @@ -50,12 +49,17 @@ async fn handler_receives_message_and_echoes_response() { }; let env_bytes = BincodeSerializer.serialize(&env).unwrap(); let mut framed = BytesMut::new(); - processor.encode(&env_bytes, &mut framed).unwrap(); + LengthPrefixedProcessor::default() + .encode(&env_bytes, &mut framed) + .unwrap(); let out = run_app_with_frame(app, framed.to_vec()).await.unwrap(); let mut buf = BytesMut::from(&out[..]); - let frame = processor.decode(&mut buf).unwrap().unwrap(); + let frame = LengthPrefixedProcessor::default() + .decode(&mut buf) + .unwrap() + .unwrap(); let (resp_env, _) = BincodeSerializer .deserialize::(&frame) .unwrap(); @@ -63,3 +67,50 @@ async fn handler_receives_message_and_echoes_response() { assert_eq!(echo, Echo(42)); assert_eq!(called.load(Ordering::SeqCst), 1); } + +#[tokio::test] +async fn multiple_frames_processed_in_sequence() { + let app = WireframeApp::new() + .unwrap() + .frame_processor(LengthPrefixedProcessor::default()) + .route(1, Box::new(|_| Box::pin(async {}))) + .unwrap(); + + let frames: Vec> = (1u8..=2) + .map(|id| { + let msg_bytes = Echo(id).to_bytes().unwrap(); + let env = TestEnvelope { + id: 1, + msg: msg_bytes, + }; + let env_bytes = BincodeSerializer.serialize(&env).unwrap(); + let mut framed = BytesMut::new(); + LengthPrefixedProcessor::default() + .encode(&env_bytes, &mut framed) + .unwrap(); + framed.to_vec() + }) + .collect(); + + let out = run_app_with_frames(app, frames).await.unwrap(); + + let mut buf = BytesMut::from(&out[..]); + let first = LengthPrefixedProcessor::default() + .decode(&mut buf) + .unwrap() + .unwrap(); + let (env1, _) = BincodeSerializer + .deserialize::(&first) + .unwrap(); + let (echo1, _) = Echo::from_bytes(&env1.msg).unwrap(); + let second = LengthPrefixedProcessor::default() + .decode(&mut buf) + .unwrap() + .unwrap(); + let (env2, _) = BincodeSerializer + .deserialize::(&second) + .unwrap(); + let (echo2, _) = Echo::from_bytes(&env2.msg).unwrap(); + assert_eq!(echo1, Echo(1)); + assert_eq!(echo2, Echo(2)); +} diff --git a/tests/util.rs b/tests/util.rs index 16371c41..94bfa634 100644 --- a/tests/util.rs +++ b/tests/util.rs @@ -36,13 +36,42 @@ pub async fn run_app_with_frame_with_capacity( app: WireframeApp, frame: Vec, capacity: usize, +) -> io::Result> { + run_app_with_frames_with_capacity(app, vec![frame], capacity).await +} + +/// Run `app` with multiple input `frames` using the default buffer capacity. +/// +/// # Errors +/// +/// Returns any I/O errors encountered while interacting with the in-memory +/// duplex stream. +pub async fn run_app_with_frames(app: WireframeApp, frames: Vec>) -> io::Result> { + run_app_with_frames_with_capacity(app, frames, DEFAULT_CAPACITY).await +} + +/// Drive `app` with multiple frames using a duplex buffer of `capacity` bytes. +/// +/// # Errors +/// +/// Propagates any I/O errors from the in-memory connection. +/// +/// # Panics +/// +/// Panics if the spawned task running the application panics. +pub async fn run_app_with_frames_with_capacity( + app: WireframeApp, + frames: Vec>, + capacity: usize, ) -> io::Result> { let (mut client, server) = duplex(capacity); let server_task = tokio::spawn(async move { app.handle_connection(server).await; }); - client.write_all(&frame).await?; + for frame in &frames { + client.write_all(frame).await?; + } client.shutdown().await?; let mut buf = Vec::new(); @@ -51,9 +80,3 @@ pub async fn run_app_with_frame_with_capacity( server_task.await.unwrap(); Ok(buf) } - -/// Convenience for constructing a default length-prefixed processor. -#[must_use] -pub fn default_processor() -> wireframe::frame::LengthPrefixedProcessor { - wireframe::frame::LengthPrefixedProcessor::default() -}