diff --git a/Cargo.lock b/Cargo.lock index 3905a0aa..eba1e08b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,6 +26,28 @@ dependencies = [ "memchr", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-trait" version = "0.1.88" @@ -1174,6 +1196,7 @@ checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" name = "wireframe" version = "0.1.0" dependencies = [ + "async-stream", "async-trait", "bincode", "bytes", diff --git a/Cargo.toml b/Cargo.toml index 7775b619..6e879628 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ wireframe_testing = { path = "./wireframe_testing" } logtest = "^2.0" proptest = "^1.0" loom = "^0.7" +async-stream = "0.3" [features] advanced-tests = [] diff --git a/docs/asynchronous-outbound-messaging-roadmap.md b/docs/asynchronous-outbound-messaging-roadmap.md index 15fef8c4..488b5437 100644 --- a/docs/asynchronous-outbound-messaging-roadmap.md +++ b/docs/asynchronous-outbound-messaging-roadmap.md @@ -31,8 +31,8 @@ design documents. [Resilience Guide §3.2][resilience-registry]). - [x] **Document `async-stream`** for creating `Response::Stream` values ([Roadmap #2.4][roadmap-2-4]). -- [ ] **Example handler using `async-stream`** demonstrating `Response::Stream` - generation in the examples directory. +- [x] **Example handler using `async-stream`** demonstrating `Response::Stream` + generation in the examples directory (`examples/async_stream.rs`). - [x] **Tests covering streams and push delivery** drawing on [Testing Guide §4][testing-guide-advanced]. diff --git a/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md b/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md index 7bc3b2e5..16a730db 100644 --- a/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md +++ b/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md @@ -41,11 +41,9 @@ server-initiated pushes and streaming responses. #### The Unified `Response` Enum and Declarative Handler Model To provide a clean, unified API, the handler return type will evolve. A more -ergonomic, declarative approach replaces the previous imperative model. -Handlers will return an enhanced - -`Response` enum, giving developers clear and efficient ways to express their -intent. +ergonomic, declarative approach replaces the previous imperative model. Handlers +will return an enhanced `Response` enum, giving developers clear and efficient +ways to express their intent. Rust @@ -88,6 +86,8 @@ async fn handle_large_query(req: Request) -> io::Result> { } ``` +See `examples/async_stream.rs` for a runnable demonstration of this pattern. + #### The Connection Actor The underlying engine for this duplex communication is the diff --git a/examples/async_stream.md b/examples/async_stream.md new file mode 100644 index 00000000..7c294142 --- /dev/null +++ b/examples/async_stream.md @@ -0,0 +1,39 @@ +# Async Stream Example + +This example demonstrates generating a `Response::Stream` using `async-stream`. +It returns five frames to illustrate the pattern. + +```mermaid +sequenceDiagram + actor User + participant Main as main() + participant StreamResp as stream_response() + participant Stream as Stream + User->>Main: Run example + Main->>StreamResp: Call stream_response() + StreamResp-->>Main: Return Response::Stream + Main->>Stream: Iterate stream.next() (5 times) + Stream-->>Main: Yield Frame(n) + Main->>User: Print received frame: Frame(n) +``` + +```mermaid +classDiagram + class Frame { + +u32 0 + +Debug + +PartialEq + +bincode::Encode + +bincode::BorrowDecode + } + class Response { + <> + +Stream(Pin>>>) + } + Frame <.. Response : used as generic + class stream_response { + +stream_response() Response + } + stream_response ..> Response : returns + stream_response ..> Frame : yields +``` diff --git a/examples/async_stream.rs b/examples/async_stream.rs new file mode 100644 index 00000000..ae1fa78e --- /dev/null +++ b/examples/async_stream.rs @@ -0,0 +1,31 @@ +//! Demonstrates generating `Response::Stream` values using `async-stream`. +//! +//! The `stream_response` function yields five sequential frames using +//! `async_stream::try_stream`. It returns a `Response` that can be +//! consumed by a `ConnectionActor`. + +use async_stream::try_stream; +use futures::StreamExt; +use wireframe::response::Response; + +#[derive(bincode::Encode, bincode::BorrowDecode, Debug, PartialEq)] +struct Frame(u32); + +fn stream_response() -> Response { + let frames = try_stream! { + for n in 0..5u32 { + yield Frame(n); + } + }; + Response::Stream(Box::pin(frames)) +} + +#[tokio::main] +async fn main() { + let Response::Stream(mut stream) = stream_response() else { + return; + }; + while let Some(Ok(frame)) = stream.next().await { + println!("received frame: {frame:?}"); + } +} diff --git a/src/response.rs b/src/response.rs index 0e975609..47f22bfc 100644 --- a/src/response.rs +++ b/src/response.rs @@ -37,11 +37,11 @@ use futures::stream::Stream; /// /// Each yielded item is a `Result` containing either a frame `F` or a /// [`WireframeError`] describing why the stream failed. -pub type FrameStream = +pub type FrameStream = Pin>> + Send + 'static>>; /// Represents the full response to a request. -pub enum Response { +pub enum Response { /// A single frame reply. Single(F), /// An optimised list of frames. @@ -73,7 +73,7 @@ impl From> for Response { /// A generic error type for wireframe operations. #[derive(Debug)] -pub enum WireframeError { +pub enum WireframeError { /// An error in the underlying transport (e.g., socket closed). Io(std::io::Error), /// A protocol-defined logical error. diff --git a/tests/async_stream.rs b/tests/async_stream.rs new file mode 100644 index 00000000..66b39427 --- /dev/null +++ b/tests/async_stream.rs @@ -0,0 +1,34 @@ +//! Tests for streams generated with the `async-stream` crate. +//! +//! These ensure that a `ConnectionActor` correctly drains frames from an +//! async-stream based `FrameStream`. + +use async_stream::try_stream; +use rstest::rstest; +use tokio_util::sync::CancellationToken; +use wireframe::{ + connection::ConnectionActor, + push::PushQueues, + response::{FrameStream, WireframeError}, +}; + +fn frame_stream() -> impl futures::Stream> { + try_stream! { + for n in 0u8..3 { + yield n; + } + } +} + +#[rstest] +#[tokio::test] +async fn async_stream_frames_processed_in_order() { + let (queues, handle) = PushQueues::::bounded(8, 8); + let shutdown = CancellationToken::new(); + let stream: FrameStream = Box::pin(frame_stream()); + + let mut actor = ConnectionActor::new(queues, handle, Some(stream), shutdown); + let mut out = Vec::new(); + actor.run(&mut out).await.unwrap(); + assert_eq!(out, vec![0, 1, 2]); +}