From b3f1621848ed0ea18277fdc04b1cfe11ddcb0bfb Mon Sep 17 00:00:00 2001 From: Leynos Date: Mon, 23 Jun 2025 01:14:41 +0100 Subject: [PATCH 1/5] Add helper test for connection state --- tests/lifecycle.rs | 70 +++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 69 insertions(+), 1 deletion(-) diff --git a/tests/lifecycle.rs b/tests/lifecycle.rs index b02a552d..0463d402 100644 --- a/tests/lifecycle.rs +++ b/tests/lifecycle.rs @@ -3,8 +3,16 @@ use std::sync::{ atomic::{AtomicUsize, Ordering}, }; +use bytes::BytesMut; use tokio::io::duplex; -use wireframe::app::WireframeApp; +use wireframe::{ + app::WireframeApp, + frame::{FrameProcessor, LengthPrefixedProcessor}, + serializer::{BincodeSerializer, Serializer}, +}; + +mod util; +use util::{processor, run_app_with_frame}; #[tokio::test] async fn setup_and_teardown_callbacks_run() { @@ -80,3 +88,63 @@ async fn teardown_without_setup_does_not_run() { assert_eq!(teardown_count.load(Ordering::SeqCst), 0); } + +#[derive(bincode::Encode, bincode::BorrowDecode, PartialEq, Debug)] +struct StateEnvelope { + id: u32, + msg: Vec, +} + +impl wireframe::app::Packet for StateEnvelope { + fn id(&self) -> u32 { self.id } + + fn into_parts(self) -> (u32, Vec) { (self.id, self.msg) } + + fn from_parts(id: u32, msg: Vec) -> Self { Self { id, msg } } +} + +#[tokio::test] +async fn helpers_propagate_connection_state() { + let setup = Arc::new(AtomicUsize::new(0)); + let teardown = Arc::new(AtomicUsize::new(0)); + + let setup_clone = setup.clone(); + let teardown_clone = teardown.clone(); + + let app = WireframeApp::<_, _, StateEnvelope>::new_with_envelope() + .unwrap() + .frame_processor(processor()) + .on_connection_setup(move || { + let setup_clone = setup_clone.clone(); + async move { + setup_clone.fetch_add(1, Ordering::SeqCst); + 7u32 + } + }) + .unwrap() + .on_connection_teardown(move |state| { + let teardown_clone = teardown_clone.clone(); + async move { + assert_eq!(state, 7u32); + teardown_clone.fetch_add(1, Ordering::SeqCst); + } + }) + .unwrap() + .route(1, Arc::new(|_: &StateEnvelope| Box::pin(async {}))) + .unwrap(); + + let env = StateEnvelope { + id: 1, + msg: vec![1], + }; + let bytes = BincodeSerializer.serialize(&env).unwrap(); + let mut frame = BytesMut::new(); + LengthPrefixedProcessor::default() + .encode(&bytes, &mut frame) + .unwrap(); + + let out = run_app_with_frame(app, frame.to_vec()).await.unwrap(); + assert!(!out.is_empty()); + assert_eq!(setup.load(Ordering::SeqCst), 1); + assert_eq!(teardown.load(Ordering::SeqCst), 1); +} From 55e1c0e3d262ecb8a8e778a0a34dbe5293aa6874 Mon Sep 17 00:00:00 2001 From: Leynos Date: Mon, 23 Jun 2025 02:18:07 +0100 Subject: [PATCH 2/5] Extract lifecycle app helper --- tests/lifecycle.rs | 59 ++++++++++++++++++++-------------------------- 1 file changed, 25 insertions(+), 34 deletions(-) diff --git a/tests/lifecycle.rs b/tests/lifecycle.rs index 0463d402..4ead8f9d 100644 --- a/tests/lifecycle.rs +++ b/tests/lifecycle.rs @@ -6,7 +6,7 @@ use std::sync::{ use bytes::BytesMut; use tokio::io::duplex; use wireframe::{ - app::WireframeApp, + app::{Envelope, Packet, WireframeApp}, frame::{FrameProcessor, LengthPrefixedProcessor}, serializer::{BincodeSerializer, Serializer}, }; @@ -14,32 +14,43 @@ use wireframe::{ mod util; use util::{processor, run_app_with_frame}; -#[tokio::test] -async fn setup_and_teardown_callbacks_run() { - let setup_count = Arc::new(AtomicUsize::new(0)); - let teardown_count = Arc::new(AtomicUsize::new(0)); - - let setup_clone = setup_count.clone(); - let teardown_clone = teardown_count.clone(); +fn wireframe_app_with_lifecycle_callbacks( + setup: &Arc, + teardown: &Arc, + state: u32, +) -> WireframeApp +where + E: Packet, +{ + let setup_clone = setup.clone(); + let teardown_clone = teardown.clone(); - let app = WireframeApp::new() + WireframeApp::<_, _, E>::new_with_envelope() .unwrap() .on_connection_setup(move || { let setup_clone = setup_clone.clone(); async move { setup_clone.fetch_add(1, Ordering::SeqCst); - 42u32 + state } }) .unwrap() - .on_connection_teardown(move |state| { + .on_connection_teardown(move |s| { let teardown_clone = teardown_clone.clone(); async move { - assert_eq!(state, 42u32); + assert_eq!(s, state); teardown_clone.fetch_add(1, Ordering::SeqCst); } }) - .unwrap(); + .unwrap() +} + +#[tokio::test] +async fn setup_and_teardown_callbacks_run() { + let setup_count = Arc::new(AtomicUsize::new(0)); + let teardown_count = Arc::new(AtomicUsize::new(0)); + + let app = wireframe_app_with_lifecycle_callbacks::(&setup_count, &teardown_count, 42); let (_client, server) = duplex(64); app.handle_connection(server).await; @@ -108,28 +119,8 @@ async fn helpers_propagate_connection_state() { let setup = Arc::new(AtomicUsize::new(0)); let teardown = Arc::new(AtomicUsize::new(0)); - let setup_clone = setup.clone(); - let teardown_clone = teardown.clone(); - - let app = WireframeApp::<_, _, StateEnvelope>::new_with_envelope() - .unwrap() + let app = wireframe_app_with_lifecycle_callbacks::(&setup, &teardown, 7) .frame_processor(processor()) - .on_connection_setup(move || { - let setup_clone = setup_clone.clone(); - async move { - setup_clone.fetch_add(1, Ordering::SeqCst); - 7u32 - } - }) - .unwrap() - .on_connection_teardown(move |state| { - let teardown_clone = teardown_clone.clone(); - async move { - assert_eq!(state, 7u32); - teardown_clone.fetch_add(1, Ordering::SeqCst); - } - }) - .unwrap() .route(1, Arc::new(|_: &StateEnvelope| Box::pin(async {}))) .unwrap(); From 0c140177854b3f034ae1050c0f3db134821405b8 Mon Sep 17 00:00:00 2001 From: Leynos Date: Mon, 23 Jun 2025 02:52:45 +0100 Subject: [PATCH 3/5] Refactor lifecycle tests with reusable callbacks --- tests/lifecycle.rs | 67 +++++++++++++++++++++++----------------------- 1 file changed, 34 insertions(+), 33 deletions(-) diff --git a/tests/lifecycle.rs b/tests/lifecycle.rs index 4ead8f9d..487dbe39 100644 --- a/tests/lifecycle.rs +++ b/tests/lifecycle.rs @@ -1,6 +1,10 @@ -use std::sync::{ - Arc, - atomic::{AtomicUsize, Ordering}, +use std::{ + future::Future, + pin::Pin, + sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, + }, }; use bytes::BytesMut; @@ -14,6 +18,25 @@ use wireframe::{ mod util; use util::{processor, run_app_with_frame}; +fn call_counting_callback( + counter: &Arc, + result: R, +) -> impl Fn(A) -> Pin + Send>> + Clone + 'static +where + A: Send + 'static, + R: Clone + Send + 'static, +{ + let counter = counter.clone(); + move |_| { + let counter = counter.clone(); + let result = result.clone(); + Box::pin(async move { + counter.fetch_add(1, Ordering::SeqCst); + result + }) + } +} + fn wireframe_app_with_lifecycle_callbacks( setup: &Arc, teardown: &Arc, @@ -22,26 +45,14 @@ fn wireframe_app_with_lifecycle_callbacks( where E: Packet, { - let setup_clone = setup.clone(); - let teardown_clone = teardown.clone(); + let setup_cb = call_counting_callback(setup, state); + let teardown_cb = call_counting_callback(teardown, ()); WireframeApp::<_, _, E>::new_with_envelope() .unwrap() - .on_connection_setup(move || { - let setup_clone = setup_clone.clone(); - async move { - setup_clone.fetch_add(1, Ordering::SeqCst); - state - } - }) + .on_connection_setup(move || setup_cb(())) .unwrap() - .on_connection_teardown(move |s| { - let teardown_clone = teardown_clone.clone(); - async move { - assert_eq!(s, state); - teardown_clone.fetch_add(1, Ordering::SeqCst); - } - }) + .on_connection_teardown(teardown_cb) .unwrap() } @@ -61,16 +72,11 @@ async fn setup_and_teardown_callbacks_run() { #[tokio::test] async fn setup_without_teardown_runs() { let setup_count = Arc::new(AtomicUsize::new(0)); - let setup_clone = setup_count.clone(); + let cb = call_counting_callback(&setup_count, ()); let app = WireframeApp::new() .unwrap() - .on_connection_setup(move || { - let setup_clone = setup_clone.clone(); - async move { - setup_clone.fetch_add(1, Ordering::SeqCst); - } - }) + .on_connection_setup(move || cb(())) .unwrap(); let (_client, server) = duplex(64); @@ -82,16 +88,11 @@ async fn setup_without_teardown_runs() { #[tokio::test] async fn teardown_without_setup_does_not_run() { let teardown_count = Arc::new(AtomicUsize::new(0)); - let teardown_clone = teardown_count.clone(); + let cb = call_counting_callback(&teardown_count, ()); let app = WireframeApp::new() .unwrap() - .on_connection_teardown(move |()| { - let teardown_clone = teardown_clone.clone(); - async move { - teardown_clone.fetch_add(1, Ordering::SeqCst); - } - }) + .on_connection_teardown(cb) .unwrap(); let (_client, server) = duplex(64); From d1296dd98e5d2de6a9b080abcc9dee65951c69a0 Mon Sep 17 00:00:00 2001 From: Leynos Date: Mon, 23 Jun 2025 03:12:12 +0100 Subject: [PATCH 4/5] Extract run_with_duplex_server helper --- tests/lifecycle.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/tests/lifecycle.rs b/tests/lifecycle.rs index 487dbe39..dac846f8 100644 --- a/tests/lifecycle.rs +++ b/tests/lifecycle.rs @@ -8,7 +8,6 @@ use std::{ }; use bytes::BytesMut; -use tokio::io::duplex; use wireframe::{ app::{Envelope, Packet, WireframeApp}, frame::{FrameProcessor, LengthPrefixedProcessor}, @@ -16,7 +15,7 @@ use wireframe::{ }; mod util; -use util::{processor, run_app_with_frame}; +use util::{processor, run_app_with_frame, run_with_duplex_server}; fn call_counting_callback( counter: &Arc, @@ -63,8 +62,7 @@ async fn setup_and_teardown_callbacks_run() { let app = wireframe_app_with_lifecycle_callbacks::(&setup_count, &teardown_count, 42); - let (_client, server) = duplex(64); - app.handle_connection(server).await; + run_with_duplex_server(app).await; assert_eq!(setup_count.load(Ordering::SeqCst), 1); assert_eq!(teardown_count.load(Ordering::SeqCst), 1); @@ -79,8 +77,7 @@ async fn setup_without_teardown_runs() { .on_connection_setup(move || cb(())) .unwrap(); - let (_client, server) = duplex(64); - app.handle_connection(server).await; + run_with_duplex_server(app).await; assert_eq!(setup_count.load(Ordering::SeqCst), 1); } @@ -95,8 +92,7 @@ async fn teardown_without_setup_does_not_run() { .on_connection_teardown(cb) .unwrap(); - let (_client, server) = duplex(64); - app.handle_connection(server).await; + run_with_duplex_server(app).await; assert_eq!(teardown_count.load(Ordering::SeqCst), 0); } From 69c5f41117cb1d5a3217471cc3e61b31ccb93e2a Mon Sep 17 00:00:00 2001 From: Payton McIntosh Date: Mon, 23 Jun 2025 03:46:30 +0100 Subject: [PATCH 5/5] Incorporate helper functions into `testing` --- Cargo.lock | 1 + tests/lifecycle.rs | 4 +- wireframe_testing/Cargo.toml | 2 - wireframe_testing/src/helpers.rs | 123 +++++++++++++++++++++++++++++++ wireframe_testing/src/lib.rs | 6 ++ 5 files changed, 131 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 12b03ee3..3cd970da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -573,6 +573,7 @@ version = "0.1.0" dependencies = [ "bincode", "bytes", + "rstest", "tokio", "wireframe", ] diff --git a/tests/lifecycle.rs b/tests/lifecycle.rs index dac846f8..b68d3566 100644 --- a/tests/lifecycle.rs +++ b/tests/lifecycle.rs @@ -13,9 +13,7 @@ use wireframe::{ frame::{FrameProcessor, LengthPrefixedProcessor}, serializer::{BincodeSerializer, Serializer}, }; - -mod util; -use util::{processor, run_app_with_frame, run_with_duplex_server}; +use wireframe_testing::{processor, run_app_with_frame, run_with_duplex_server}; fn call_counting_callback( counter: &Arc, diff --git a/wireframe_testing/Cargo.toml b/wireframe_testing/Cargo.toml index de9f5b66..062b4a05 100644 --- a/wireframe_testing/Cargo.toml +++ b/wireframe_testing/Cargo.toml @@ -8,6 +8,4 @@ tokio = { version = "1", features = ["macros", "rt", "io-util"] } wireframe = { path = ".." } bincode = "^2.0" bytes = "^1.0" - -[dev-dependencies] rstest = "0.18.2" diff --git a/wireframe_testing/src/helpers.rs b/wireframe_testing/src/helpers.rs index 8a754068..a4957233 100644 --- a/wireframe_testing/src/helpers.rs +++ b/wireframe_testing/src/helpers.rs @@ -1,5 +1,6 @@ use bincode::config; use bytes::BytesMut; +use rstest::fixture; use tokio::io::{self, AsyncReadExt, AsyncWriteExt, duplex}; use wireframe::{ app::{Envelope, Packet, WireframeApp}, @@ -7,10 +8,19 @@ use wireframe::{ serializer::Serializer, }; +/// Create a default length-prefixed frame processor for tests. +#[fixture] +#[allow( + unused_braces, + reason = "Clippy is wrong here; this is not a redundant block" +)] +pub fn processor() -> LengthPrefixedProcessor { LengthPrefixedProcessor::default() } + pub trait TestSerializer: Serializer + FrameMetadata + Send + Sync + 'static { } + impl TestSerializer for T where T: Serializer + FrameMetadata + Send + Sync + 'static { @@ -172,3 +182,116 @@ where LengthPrefixedProcessor::default().encode(&bytes, &mut framed)?; drive_with_frame(app, framed.to_vec()).await } + +/// Run `app` with a single input `frame` using the default buffer capacity. +/// +/// # Errors +/// +/// Returns any I/O errors encountered while interacting with the in-memory +/// duplex stream. +pub async fn run_app_with_frame( + app: WireframeApp, + frame: Vec, +) -> io::Result> +where + S: TestSerializer, + C: Send + 'static, + E: Packet, +{ + run_app_with_frame_with_capacity(app, frame, DEFAULT_CAPACITY).await +} + +/// Drive `app` with a single frame using a duplex buffer of `capacity` bytes. +/// +/// # Errors +/// +/// Propagates any I/O errors from the in-memory connection. +/// +/// # Panics +/// +/// Panics if the spawned task running the application panics. +pub async fn run_app_with_frame_with_capacity( + app: WireframeApp, + frame: Vec, + capacity: usize, +) -> io::Result> +where + S: TestSerializer, + C: Send + 'static, + E: Packet, +{ + run_app_with_frames_with_capacity(app, vec![frame], capacity).await +} + +/// Run `app` with multiple input `frames` using the default buffer capacity. +/// +/// # Errors +/// +/// Returns any I/O errors encountered while interacting with the in-memory +/// duplex stream. +#[allow(dead_code)] +pub async fn run_app_with_frames( + app: WireframeApp, + frames: Vec>, +) -> io::Result> +where + S: TestSerializer, + C: Send + 'static, + E: Packet, +{ + run_app_with_frames_with_capacity(app, frames, DEFAULT_CAPACITY).await +} + +/// Drive `app` with multiple frames using a duplex buffer of `capacity` bytes. +/// +/// # Errors +/// +/// Propagates any I/O errors from the in-memory connection. +/// +/// # Panics +/// +/// Panics if the spawned task running the application panics. +pub async fn run_app_with_frames_with_capacity( + app: WireframeApp, + frames: Vec>, + capacity: usize, +) -> io::Result> +where + S: TestSerializer, + C: Send + 'static, + E: Packet, +{ + let (mut client, server) = duplex(capacity); + let server_task = tokio::spawn(async move { + app.handle_connection(server).await; + }); + + for frame in &frames { + client.write_all(frame).await?; + } + client.shutdown().await?; + + let mut buf = Vec::new(); + client.read_to_end(&mut buf).await?; + + server_task.await.unwrap(); + Ok(buf) +} + +/// Run `app` against an empty duplex stream. +/// +/// This helper drives the connection lifecycle without sending any frames, +/// ensuring setup and teardown callbacks execute. +/// +/// # Panics +/// +/// Panics if `handle_connection` fails. +pub async fn run_with_duplex_server(app: WireframeApp) +where + S: TestSerializer, + C: Send + 'static, + E: Packet, +{ + let (_client, server) = duplex(64); + app.handle_connection(server).await; +} diff --git a/wireframe_testing/src/lib.rs b/wireframe_testing/src/lib.rs index bec3748b..183bbf12 100644 --- a/wireframe_testing/src/lib.rs +++ b/wireframe_testing/src/lib.rs @@ -24,4 +24,10 @@ pub use helpers::{ drive_with_frames, drive_with_frames_mut, drive_with_frames_with_capacity, + processor, + run_app_with_frame, + run_app_with_frame_with_capacity, + run_app_with_frames, + run_app_with_frames_with_capacity, + run_with_duplex_server, };