Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

146 changes: 100 additions & 46 deletions tests/lifecycle.rs
Original file line number Diff line number Diff line change
@@ -1,82 +1,136 @@
use std::sync::{
Arc,
atomic::{AtomicUsize, Ordering},
use std::{
future::Future,
pin::Pin,
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
};

use tokio::io::duplex;
use wireframe::app::WireframeApp;
use bytes::BytesMut;
use wireframe::{
app::{Envelope, Packet, WireframeApp},
frame::{FrameProcessor, LengthPrefixedProcessor},
serializer::{BincodeSerializer, Serializer},
};
use wireframe_testing::{processor, run_app_with_frame, run_with_duplex_server};

fn call_counting_callback<R, A>(
counter: &Arc<AtomicUsize>,
result: R,
) -> impl Fn(A) -> Pin<Box<dyn Future<Output = R> + Send>> + Clone + 'static
where
A: Send + 'static,
R: Clone + Send + 'static,
{
let counter = counter.clone();
move |_| {
let counter = counter.clone();
let result = result.clone();
Box::pin(async move {
counter.fetch_add(1, Ordering::SeqCst);
result
})
}
}

fn wireframe_app_with_lifecycle_callbacks<E>(
setup: &Arc<AtomicUsize>,
teardown: &Arc<AtomicUsize>,
state: u32,
) -> WireframeApp<BincodeSerializer, u32, E>
where
E: Packet,
{
let setup_cb = call_counting_callback(setup, state);
let teardown_cb = call_counting_callback(teardown, ());

WireframeApp::<_, _, E>::new_with_envelope()
.unwrap()
.on_connection_setup(move || setup_cb(()))
.unwrap()
.on_connection_teardown(teardown_cb)
.unwrap()
}

#[tokio::test]
async fn setup_and_teardown_callbacks_run() {
let setup_count = Arc::new(AtomicUsize::new(0));
let teardown_count = Arc::new(AtomicUsize::new(0));

let setup_clone = setup_count.clone();
let teardown_clone = teardown_count.clone();

let app = WireframeApp::new()
.unwrap()
.on_connection_setup(move || {
let setup_clone = setup_clone.clone();
async move {
setup_clone.fetch_add(1, Ordering::SeqCst);
42u32
}
})
.unwrap()
.on_connection_teardown(move |state| {
let teardown_clone = teardown_clone.clone();
async move {
assert_eq!(state, 42u32);
teardown_clone.fetch_add(1, Ordering::SeqCst);
}
})
.unwrap();
let app = wireframe_app_with_lifecycle_callbacks::<Envelope>(&setup_count, &teardown_count, 42);

let (_client, server) = duplex(64);
app.handle_connection(server).await;
run_with_duplex_server(app).await;

assert_eq!(setup_count.load(Ordering::SeqCst), 1);
assert_eq!(teardown_count.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn setup_without_teardown_runs() {
let setup_count = Arc::new(AtomicUsize::new(0));
let setup_clone = setup_count.clone();
let cb = call_counting_callback(&setup_count, ());

let app = WireframeApp::new()
.unwrap()
.on_connection_setup(move || {
let setup_clone = setup_clone.clone();
async move {
setup_clone.fetch_add(1, Ordering::SeqCst);
}
})
.on_connection_setup(move || cb(()))
.unwrap();

let (_client, server) = duplex(64);
app.handle_connection(server).await;
run_with_duplex_server(app).await;

assert_eq!(setup_count.load(Ordering::SeqCst), 1);
}

#[tokio::test]
async fn teardown_without_setup_does_not_run() {
let teardown_count = Arc::new(AtomicUsize::new(0));
let teardown_clone = teardown_count.clone();
let cb = call_counting_callback(&teardown_count, ());

let app = WireframeApp::new()
.unwrap()
.on_connection_teardown(move |()| {
let teardown_clone = teardown_clone.clone();
async move {
teardown_clone.fetch_add(1, Ordering::SeqCst);
}
})
.on_connection_teardown(cb)
.unwrap();

let (_client, server) = duplex(64);
app.handle_connection(server).await;
run_with_duplex_server(app).await;

assert_eq!(teardown_count.load(Ordering::SeqCst), 0);
}

#[derive(bincode::Encode, bincode::BorrowDecode, PartialEq, Debug)]
struct StateEnvelope {
id: u32,
msg: Vec<u8>,
}

impl wireframe::app::Packet for StateEnvelope {
fn id(&self) -> u32 { self.id }

fn into_parts(self) -> (u32, Vec<u8>) { (self.id, self.msg) }

fn from_parts(id: u32, msg: Vec<u8>) -> Self { Self { id, msg } }
}

#[tokio::test]
async fn helpers_propagate_connection_state() {
let setup = Arc::new(AtomicUsize::new(0));
let teardown = Arc::new(AtomicUsize::new(0));

let app = wireframe_app_with_lifecycle_callbacks::<StateEnvelope>(&setup, &teardown, 7)
.frame_processor(processor())
.route(1, Arc::new(|_: &StateEnvelope| Box::pin(async {})))
.unwrap();

let env = StateEnvelope {
id: 1,
msg: vec![1],
};
let bytes = BincodeSerializer.serialize(&env).unwrap();
let mut frame = BytesMut::new();
LengthPrefixedProcessor::default()
.encode(&bytes, &mut frame)
.unwrap();

let out = run_app_with_frame(app, frame.to_vec()).await.unwrap();
assert!(!out.is_empty());
assert_eq!(setup.load(Ordering::SeqCst), 1);
assert_eq!(teardown.load(Ordering::SeqCst), 1);
}
2 changes: 0 additions & 2 deletions wireframe_testing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,4 @@ tokio = { version = "1", features = ["macros", "rt", "io-util"] }
wireframe = { path = ".." }
bincode = "^2.0"
bytes = "^1.0"

[dev-dependencies]
rstest = "0.18.2"
123 changes: 123 additions & 0 deletions wireframe_testing/src/helpers.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,26 @@
use bincode::config;
use bytes::BytesMut;
use rstest::fixture;
use tokio::io::{self, AsyncReadExt, AsyncWriteExt, duplex};
use wireframe::{
app::{Envelope, Packet, WireframeApp},
frame::{FrameMetadata, FrameProcessor, LengthPrefixedProcessor},
serializer::Serializer,
};

/// Create a default length-prefixed frame processor for tests.
#[fixture]
#[allow(
unused_braces,
reason = "Clippy is wrong here; this is not a redundant block"
)]
pub fn processor() -> LengthPrefixedProcessor { LengthPrefixedProcessor::default() }

pub trait TestSerializer:
Serializer + FrameMetadata<Frame = Envelope> + Send + Sync + 'static
{
}

impl<T> TestSerializer for T where
T: Serializer + FrameMetadata<Frame = Envelope> + Send + Sync + 'static
{
Expand Down Expand Up @@ -172,3 +182,116 @@ where
LengthPrefixedProcessor::default().encode(&bytes, &mut framed)?;
drive_with_frame(app, framed.to_vec()).await
}

/// Run `app` with a single input `frame` using the default buffer capacity.
///
/// # Errors
///
/// Returns any I/O errors encountered while interacting with the in-memory
/// duplex stream.
pub async fn run_app_with_frame<S, C, E>(
app: WireframeApp<S, C, E>,
frame: Vec<u8>,
) -> io::Result<Vec<u8>>
where
S: TestSerializer,
C: Send + 'static,
E: Packet,
{
run_app_with_frame_with_capacity(app, frame, DEFAULT_CAPACITY).await
}

/// Drive `app` with a single frame using a duplex buffer of `capacity` bytes.
///
/// # Errors
///
/// Propagates any I/O errors from the in-memory connection.
///
/// # Panics
///
/// Panics if the spawned task running the application panics.
pub async fn run_app_with_frame_with_capacity<S, C, E>(
app: WireframeApp<S, C, E>,
frame: Vec<u8>,
capacity: usize,
) -> io::Result<Vec<u8>>
where
S: TestSerializer,
C: Send + 'static,
E: Packet,
{
run_app_with_frames_with_capacity(app, vec![frame], capacity).await
}

/// Run `app` with multiple input `frames` using the default buffer capacity.
///
/// # Errors
///
/// Returns any I/O errors encountered while interacting with the in-memory
/// duplex stream.
#[allow(dead_code)]
pub async fn run_app_with_frames<S, C, E>(
app: WireframeApp<S, C, E>,
frames: Vec<Vec<u8>>,
) -> io::Result<Vec<u8>>
where
S: TestSerializer,
C: Send + 'static,
E: Packet,
{
run_app_with_frames_with_capacity(app, frames, DEFAULT_CAPACITY).await
}

/// Drive `app` with multiple frames using a duplex buffer of `capacity` bytes.
///
/// # Errors
///
/// Propagates any I/O errors from the in-memory connection.
///
/// # Panics
///
/// Panics if the spawned task running the application panics.
pub async fn run_app_with_frames_with_capacity<S, C, E>(
app: WireframeApp<S, C, E>,
frames: Vec<Vec<u8>>,
capacity: usize,
) -> io::Result<Vec<u8>>
where
S: TestSerializer,
C: Send + 'static,
E: Packet,
{
let (mut client, server) = duplex(capacity);
let server_task = tokio::spawn(async move {
app.handle_connection(server).await;
});

for frame in &frames {
client.write_all(frame).await?;
}
client.shutdown().await?;

let mut buf = Vec::new();
client.read_to_end(&mut buf).await?;

server_task.await.unwrap();
Ok(buf)
}

/// Run `app` against an empty duplex stream.
///
/// This helper drives the connection lifecycle without sending any frames,
/// ensuring setup and teardown callbacks execute.
///
/// # Panics
///
/// Panics if `handle_connection` fails.
pub async fn run_with_duplex_server<S, C, E>(app: WireframeApp<S, C, E>)
where
S: TestSerializer,
C: Send + 'static,
E: Packet,
{
let (_client, server) = duplex(64);
app.handle_connection(server).await;
}
6 changes: 6 additions & 0 deletions wireframe_testing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,10 @@ pub use helpers::{
drive_with_frames,
drive_with_frames_mut,
drive_with_frames_with_capacity,
processor,
run_app_with_frame,
run_app_with_frame_with_capacity,
run_app_with_frames,
run_app_with_frames_with_capacity,
run_with_duplex_server,
};