Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ wireframe_testing = { path = "./wireframe_testing" }
logtest = "^2.0"
proptest = "^1.0"
loom = "^0.7"
async-stream = "0.3"

[features]
advanced-tests = []
Expand Down
4 changes: 2 additions & 2 deletions docs/asynchronous-outbound-messaging-roadmap.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ design documents.
[Resilience Guide §3.2][resilience-registry]).
- [x] **Document `async-stream`** for creating `Response::Stream` values
([Roadmap #2.4][roadmap-2-4]).
- [ ] **Example handler using `async-stream`** demonstrating `Response::Stream`
generation in the examples directory.
- [x] **Example handler using `async-stream`** demonstrating `Response::Stream`
generation in the examples directory (`examples/async_stream.rs`).
- [x] **Tests covering streams and push delivery** drawing on
[Testing Guide §4][testing-guide-advanced].

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,9 @@ server-initiated pushes and streaming responses.
#### The Unified `Response` Enum and Declarative Handler Model

To provide a clean, unified API, the handler return type will evolve. A more
ergonomic, declarative approach replaces the previous imperative model.
Handlers will return an enhanced

`Response` enum, giving developers clear and efficient ways to express their
intent.
ergonomic, declarative approach replaces the previous imperative model. Handlers
will return an enhanced `Response` enum, giving developers clear and efficient
ways to express their intent.

Rust

Expand Down Expand Up @@ -88,6 +86,8 @@ async fn handle_large_query(req: Request) -> io::Result<Response<MyFrame>> {
}
```

See `examples/async_stream.rs` for a runnable demonstration of this pattern.

#### The Connection Actor

The underlying engine for this duplex communication is the
Expand Down
39 changes: 39 additions & 0 deletions examples/async_stream.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Async Stream Example

This example demonstrates generating a `Response::Stream` using `async-stream`.
It returns five frames to illustrate the pattern.

```mermaid
sequenceDiagram
actor User
participant Main as main()
participant StreamResp as stream_response()
participant Stream as Stream<Frame>
User->>Main: Run example
Main->>StreamResp: Call stream_response()
StreamResp-->>Main: Return Response::Stream
Main->>Stream: Iterate stream.next() (5 times)
Stream-->>Main: Yield Frame(n)
Main->>User: Print received frame: Frame(n)
```

```mermaid
classDiagram
class Frame {
+u32 0
+Debug
+PartialEq
+bincode::Encode
+bincode::BorrowDecode
}
class Response {
<<generic>>
+Stream(Pin<Box<dyn Stream<Item=Result<T, E>>>>)
}
Frame <.. Response : used as generic
class stream_response {
+stream_response() Response<Frame>
}
stream_response ..> Response : returns
stream_response ..> Frame : yields
```
31 changes: 31 additions & 0 deletions examples/async_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
//! Demonstrates generating `Response::Stream` values using `async-stream`.
//!
//! The `stream_response` function yields five sequential frames using
//! `async_stream::try_stream`. It returns a `Response` that can be
//! consumed by a `ConnectionActor`.

use async_stream::try_stream;
use futures::StreamExt;
use wireframe::response::Response;
Comment thread
leynos marked this conversation as resolved.

#[derive(bincode::Encode, bincode::BorrowDecode, Debug, PartialEq)]
struct Frame(u32);

fn stream_response() -> Response<Frame> {
let frames = try_stream! {
for n in 0..5u32 {
yield Frame(n);
}
};
Response::Stream(Box::pin(frames))
}

#[tokio::main]
async fn main() {
let Response::Stream(mut stream) = stream_response() else {
return;
};
while let Some(Ok(frame)) = stream.next().await {
println!("received frame: {frame:?}");
}
}
6 changes: 3 additions & 3 deletions src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ use futures::stream::Stream;
///
/// Each yielded item is a `Result` containing either a frame `F` or a
/// [`WireframeError`] describing why the stream failed.
pub type FrameStream<F, E> =
pub type FrameStream<F, E = ()> =
Pin<Box<dyn Stream<Item = Result<F, WireframeError<E>>> + Send + 'static>>;

/// Represents the full response to a request.
pub enum Response<F, E> {
pub enum Response<F, E = ()> {
/// A single frame reply.
Single(F),
/// An optimised list of frames.
Expand Down Expand Up @@ -73,7 +73,7 @@ impl<F, E> From<Vec<F>> for Response<F, E> {

/// A generic error type for wireframe operations.
#[derive(Debug)]
pub enum WireframeError<E> {
pub enum WireframeError<E = ()> {
/// An error in the underlying transport (e.g., socket closed).
Io(std::io::Error),
/// A protocol-defined logical error.
Expand Down
34 changes: 34 additions & 0 deletions tests/async_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//! Tests for streams generated with the `async-stream` crate.
//!
//! These ensure that a `ConnectionActor` correctly drains frames from an
//! async-stream based `FrameStream`.

use async_stream::try_stream;
use rstest::rstest;
use tokio_util::sync::CancellationToken;
use wireframe::{
connection::ConnectionActor,
push::PushQueues,
response::{FrameStream, WireframeError},
};

fn frame_stream() -> impl futures::Stream<Item = Result<u8, WireframeError>> {
try_stream! {
for n in 0u8..3 {
yield n;
}
}
}

#[rstest]
#[tokio::test]
async fn async_stream_frames_processed_in_order() {
let (queues, handle) = PushQueues::<u8>::bounded(8, 8);
let shutdown = CancellationToken::new();
let stream: FrameStream<u8> = Box::pin(frame_stream());

let mut actor = ConnectionActor::new(queues, handle, Some(stream), shutdown);
let mut out = Vec::new();
actor.run(&mut out).await.unwrap();
assert_eq!(out, vec![0, 1, 2]);
}