From 5d2f5d04ab2d50898ccdb59e29b197340d1d0ce5 Mon Sep 17 00:00:00 2001 From: Leynos Date: Thu, 3 Jul 2025 12:42:47 +0100 Subject: [PATCH 1/4] Add async-stream response example --- Cargo.lock | 23 +++++++++++++ Cargo.toml | 1 + ...asynchronous-outbound-messaging-roadmap.md | 4 +-- ...-set-philosophy-and-capability-maturity.md | 6 ++-- examples/async_stream.rs | 31 +++++++++++++++++ tests/async_stream.rs | 34 +++++++++++++++++++ 6 files changed, 95 insertions(+), 4 deletions(-) create mode 100644 examples/async_stream.rs create mode 100644 tests/async_stream.rs diff --git a/Cargo.lock b/Cargo.lock index 3905a0aa..eba1e08b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,6 +26,28 @@ dependencies = [ "memchr", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-trait" version = "0.1.88" @@ -1174,6 +1196,7 @@ checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" name = "wireframe" version = "0.1.0" dependencies = [ + "async-stream", "async-trait", "bincode", "bytes", diff --git a/Cargo.toml b/Cargo.toml index 7775b619..6e879628 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ wireframe_testing = { path = "./wireframe_testing" } logtest = "^2.0" proptest = "^1.0" loom = "^0.7" +async-stream = "0.3" [features] advanced-tests = [] diff --git a/docs/asynchronous-outbound-messaging-roadmap.md b/docs/asynchronous-outbound-messaging-roadmap.md index 15fef8c4..488b5437 100644 --- a/docs/asynchronous-outbound-messaging-roadmap.md +++ b/docs/asynchronous-outbound-messaging-roadmap.md @@ -31,8 +31,8 @@ design documents. [Resilience Guide §3.2][resilience-registry]). - [x] **Document `async-stream`** for creating `Response::Stream` values ([Roadmap #2.4][roadmap-2-4]). -- [ ] **Example handler using `async-stream`** demonstrating `Response::Stream` - generation in the examples directory. +- [x] **Example handler using `async-stream`** demonstrating `Response::Stream` + generation in the examples directory (`examples/async_stream.rs`). - [x] **Tests covering streams and push delivery** drawing on [Testing Guide §4][testing-guide-advanced]. diff --git a/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md b/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md index 7bc3b2e5..c2eb3a57 100644 --- a/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md +++ b/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md @@ -41,8 +41,8 @@ server-initiated pushes and streaming responses. #### The Unified `Response` Enum and Declarative Handler Model To provide a clean, unified API, the handler return type will evolve. A more -ergonomic, declarative approach replaces the previous imperative model. -Handlers will return an enhanced +ergonomic, declarative approach replaces the previous imperative model. Handlers +will return an enhanced `Response` enum, giving developers clear and efficient ways to express their intent. @@ -88,6 +88,8 @@ async fn handle_large_query(req: Request) -> io::Result> { } ``` +See `examples/async_stream.rs` for a runnable demonstration of this pattern. + #### The Connection Actor The underlying engine for this duplex communication is the diff --git a/examples/async_stream.rs b/examples/async_stream.rs new file mode 100644 index 00000000..93d15f6a --- /dev/null +++ b/examples/async_stream.rs @@ -0,0 +1,31 @@ +//! Demonstrates generating `Response::Stream` values using `async-stream`. +//! +//! The `stream_response` function yields five sequential frames using +//! `async_stream::try_stream`. It returns a `Response` that can be +//! consumed by a `ConnectionActor`. + +use async_stream::try_stream; +use futures::StreamExt; +use wireframe::response::Response; + +#[derive(bincode::Encode, bincode::BorrowDecode, Debug, PartialEq)] +struct Frame(u32); + +fn stream_response() -> Response { + let frames = try_stream! { + for n in 0..5u32 { + yield Frame(n); + } + }; + Response::Stream(Box::pin(frames)) +} + +#[tokio::main] +async fn main() { + let Response::Stream(mut stream) = stream_response() else { + return; + }; + while let Some(Ok(frame)) = stream.next().await { + println!("received frame: {frame:?}"); + } +} diff --git a/tests/async_stream.rs b/tests/async_stream.rs new file mode 100644 index 00000000..34449bb1 --- /dev/null +++ b/tests/async_stream.rs @@ -0,0 +1,34 @@ +//! Tests for streams generated with the `async-stream` crate. +//! +//! These ensure that a `ConnectionActor` correctly drains frames from an +//! async-stream based `FrameStream`. + +use async_stream::try_stream; +use rstest::rstest; +use tokio_util::sync::CancellationToken; +use wireframe::{ + connection::ConnectionActor, + push::PushQueues, + response::{FrameStream, WireframeError}, +}; + +fn frame_stream() -> impl futures::Stream>> { + try_stream! { + for n in 0u8..3 { + yield n; + } + } +} + +#[rstest] +#[tokio::test] +async fn async_stream_frames_processed_in_order() { + let (queues, handle) = PushQueues::::bounded(8, 8); + let shutdown = CancellationToken::new(); + let stream: FrameStream = Box::pin(frame_stream()); + + let mut actor = ConnectionActor::new(queues, handle, Some(stream), shutdown); + let mut out = Vec::new(); + actor.run(&mut out).await.unwrap(); + assert_eq!(out, vec![0, 1, 2]); +} From 7cba9cd103b6d490545c069b4563a1ed89e77013 Mon Sep 17 00:00:00 2001 From: Leynos Date: Thu, 3 Jul 2025 17:44:22 +0100 Subject: [PATCH 2/4] Make Response error generic optional --- ...me-1-0-feature-set-philosophy-and-capability-maturity.md | 6 ++---- examples/async_stream.rs | 2 +- src/response.rs | 6 +++--- tests/async_stream.rs | 4 ++-- 4 files changed, 8 insertions(+), 10 deletions(-) diff --git a/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md b/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md index c2eb3a57..16a730db 100644 --- a/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md +++ b/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md @@ -42,10 +42,8 @@ server-initiated pushes and streaming responses. To provide a clean, unified API, the handler return type will evolve. A more ergonomic, declarative approach replaces the previous imperative model. Handlers -will return an enhanced - -`Response` enum, giving developers clear and efficient ways to express their -intent. +will return an enhanced `Response` enum, giving developers clear and efficient +ways to express their intent. Rust diff --git a/examples/async_stream.rs b/examples/async_stream.rs index 93d15f6a..ae1fa78e 100644 --- a/examples/async_stream.rs +++ b/examples/async_stream.rs @@ -11,7 +11,7 @@ use wireframe::response::Response; #[derive(bincode::Encode, bincode::BorrowDecode, Debug, PartialEq)] struct Frame(u32); -fn stream_response() -> Response { +fn stream_response() -> Response { let frames = try_stream! { for n in 0..5u32 { yield Frame(n); diff --git a/src/response.rs b/src/response.rs index 0e975609..47f22bfc 100644 --- a/src/response.rs +++ b/src/response.rs @@ -37,11 +37,11 @@ use futures::stream::Stream; /// /// Each yielded item is a `Result` containing either a frame `F` or a /// [`WireframeError`] describing why the stream failed. -pub type FrameStream = +pub type FrameStream = Pin>> + Send + 'static>>; /// Represents the full response to a request. -pub enum Response { +pub enum Response { /// A single frame reply. Single(F), /// An optimised list of frames. @@ -73,7 +73,7 @@ impl From> for Response { /// A generic error type for wireframe operations. #[derive(Debug)] -pub enum WireframeError { +pub enum WireframeError { /// An error in the underlying transport (e.g., socket closed). Io(std::io::Error), /// A protocol-defined logical error. diff --git a/tests/async_stream.rs b/tests/async_stream.rs index 34449bb1..66b39427 100644 --- a/tests/async_stream.rs +++ b/tests/async_stream.rs @@ -12,7 +12,7 @@ use wireframe::{ response::{FrameStream, WireframeError}, }; -fn frame_stream() -> impl futures::Stream>> { +fn frame_stream() -> impl futures::Stream> { try_stream! { for n in 0u8..3 { yield n; @@ -25,7 +25,7 @@ fn frame_stream() -> impl futures::Stream>> async fn async_stream_frames_processed_in_order() { let (queues, handle) = PushQueues::::bounded(8, 8); let shutdown = CancellationToken::new(); - let stream: FrameStream = Box::pin(frame_stream()); + let stream: FrameStream = Box::pin(frame_stream()); let mut actor = ConnectionActor::new(queues, handle, Some(stream), shutdown); let mut out = Vec::new(); From 94ebb2a2ed0454bf1af7f48e127c616e842c2721 Mon Sep 17 00:00:00 2001 From: Leynos Date: Thu, 3 Jul 2025 23:21:06 +0100 Subject: [PATCH 3/4] Add async-stream diagrams --- examples/async_stream.md | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 examples/async_stream.md diff --git a/examples/async_stream.md b/examples/async_stream.md new file mode 100644 index 00000000..208c8341 --- /dev/null +++ b/examples/async_stream.md @@ -0,0 +1,39 @@ +# Async Stream Example + +This example demonstrates generating a `Response::Stream` using `async-stream`. +It returns five frames to illustrate the pattern. + +```mermaid +sequenceDiagram + actor User + participant Main as main() + participant StreamResp as stream_response() + participant Stream as Stream + User->>Main: Run example + Main->>StreamResp: Call stream_response() + StreamResp-->>Main: Return Response::Stream + Main->>Stream: Iterate stream.next() (5 times) + Stream-->>Main: Yield Frame(n) + Main->>User: Print received frame: Frame(n) +``` + +```mermaid +classDiagram + class Frame { + +u32 0 + +Debug + +PartialEq + +bincode::Encode + +bincode::BorrowDecode + } + class Response { + <> + +Stream(Pin>>>) + } + Frame <.. Response : used as generic + class stream_response { + +stream_response() Response + } + stream_response ..> Response : returns + stream_response ..> Frame : yields +``` From 395cf6abd72cace80d016de7dcc732d11f1c1fc1 Mon Sep 17 00:00:00 2001 From: Leynos Date: Thu, 3 Jul 2025 23:25:51 +0100 Subject: [PATCH 4/4] Fix Response type in async-stream diagram --- examples/async_stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/async_stream.md b/examples/async_stream.md index 208c8341..7c294142 100644 --- a/examples/async_stream.md +++ b/examples/async_stream.md @@ -32,7 +32,7 @@ classDiagram } Frame <.. Response : used as generic class stream_response { - +stream_response() Response + +stream_response() Response } stream_response ..> Response : returns stream_response ..> Frame : yields