Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions docs/wireframe-testing-crate.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,13 @@ where

These functions mirror the behaviour of `run_app_with_frame` and
`run_app_with_frames` found in the repository’s test utilities. They create a
`tokio::io::duplex` stream, spawn the application as a background task, and
write the provided frame(s) to the client side of the stream. After the
application finishes processing, the helpers collect the bytes written back and
return them for inspection.
`tokio::io::duplex` stream, run the application on the server half, and write
the provided frame(s) to the client side. All helpers delegate to a single
internal function that handles this I/O plumbing, ensuring consistent
behaviour. Should the application panic, the panic message is returned as an
`io::Error` beginning with `server task failed`, helping surface failures in
tests. After the application finishes processing the input frames, the bytes
written back are collected for inspection.

Any I/O errors surfaced by the duplex stream or failures while decoding a
length prefix propagate through the returned `IoResult`. Malformed or truncated
Expand Down
1 change: 1 addition & 0 deletions wireframe_testing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ rstest = "0.18.2"
logtest = "2"
log = "0.4"
metrics-util = "0.20"
futures = "0.3"
97 changes: 60 additions & 37 deletions wireframe_testing/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
use bincode::config;
use bytes::BytesMut;
use rstest::fixture;
use tokio::io::{self, AsyncReadExt, AsyncWriteExt, duplex};
use tokio::io::{self, AsyncReadExt, AsyncWriteExt, DuplexStream, duplex};
use wireframe::{
app::{Envelope, Packet, WireframeApp},
frame::{FrameMetadata, FrameProcessor, LengthPrefixedProcessor},
Expand All @@ -33,6 +33,53 @@ impl<T> TestSerializer for T where

const DEFAULT_CAPACITY: usize = 4096;

async fn drive_internal<F, Fut>(
server_fn: F,
frames: Vec<Vec<u8>>,
capacity: usize,
) -> io::Result<Vec<u8>>
where
F: FnOnce(DuplexStream) -> Fut,
Fut: std::future::Future<Output = ()> + Send,
{
let (mut client, server) = duplex(capacity);
Comment thread
leynos marked this conversation as resolved.

let server_fut = async {
use futures::FutureExt as _;
let result = std::panic::AssertUnwindSafe(server_fn(server))
.catch_unwind()
.await;
match result {
Ok(_) => Ok(()),
Err(panic) => {
let msg = panic
.downcast_ref::<&str>()
.copied()
.or_else(|| panic.downcast_ref::<String>().map(String::as_str))
.unwrap_or("<non-string panic>");
Err(io::Error::new(
io::ErrorKind::Other,
format!("server task failed: {msg}"),
))
}
}
};

let client_fut = async {
for frame in &frames {
client.write_all(frame).await?;
}
client.shutdown().await?;

let mut buf = Vec::new();
client.read_to_end(&mut buf).await?;
io::Result::Ok(buf)
};

let ((), buf) = tokio::try_join!(server_fut, client_fut)?;
Ok(buf)
}

/// Drive `app` with a single length-prefixed `frame` and return the bytes
/// produced by the server.
///
Expand Down Expand Up @@ -134,26 +181,12 @@ where
C: Send + 'static,
E: Packet,
{
let (mut client, server) = duplex(capacity);
let server_task = tokio::spawn(async move {
app.handle_connection(server).await;
});

for frame in &frames {
client.write_all(frame).await?;
}
client.shutdown().await?;

let mut buf = Vec::new();
client.read_to_end(&mut buf).await?;

match server_task.await {
Ok(_) => Ok(buf),
Err(e) => Err(io::Error::new(
io::ErrorKind::Other,
format!("server task failed: {e}"),
)),
}
drive_internal(
|server| async move { app.handle_connection(server).await },
frames,
capacity,
)
.await
Comment thread
leynos marked this conversation as resolved.
}

/// Feed a single frame into a mutable `app`, allowing the instance to be reused
Expand Down Expand Up @@ -248,22 +281,12 @@ where
C: Send + 'static,
E: Packet,
{
let (mut client, server) = duplex(capacity);

let server_fut = app.handle_connection(server);
let client_fut = async {
for frame in &frames {
client.write_all(frame).await?;
}
client.shutdown().await?;

let mut buf = Vec::new();
client.read_to_end(&mut buf).await?;
io::Result::Ok(buf)
};

let ((), buf) = tokio::join!(server_fut, client_fut);
buf
drive_internal(
|server| async { app.handle_connection(server).await },
frames,
capacity,
)
.await
}

/// Encode `msg` using bincode, frame it and drive `app`.
Expand Down