Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ edition = "2024"
serde = { version = "1", features = ["derive"] }
bincode = "2"
tokio = { version = "1", default-features = false, features = ["net", "signal", "rt-multi-thread", "macros", "sync", "time", "io-util"] }
tokio-util = "0.7"
Comment thread
leynos marked this conversation as resolved.
futures = "0.3"
async-trait = "0.1"
bytes = "1"
Expand Down
2 changes: 1 addition & 1 deletion docs/asynchronous-outbound-messaging-roadmap.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ design documents.
channels. See [Design §3.1][design-queues].
- [x] **Unified `Response<F, E>` and `WireframeError<E>` types** to capture
protocol errors and transport failures ([Roadmap #1.1][roadmap-1-1]).
- [ ] **Connection actor** with a biased `select!` loop that polls for shutdown,
- [x] **Connection actor** with a biased `select!` loop that polls for shutdown,
high/low queues and response streams as described in
[Design §3.2][design-write-loop].
- [ ] **Internal protocol hooks** `before_send` and `on_command_end` invoked
Expand Down
10 changes: 6 additions & 4 deletions docs/wireframe-1-0-detailed-development-roadmap.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ breaking the project down into four distinct phases, each with a set of
well-defined tasks. The dependencies between tasks are explicitly noted to
ensure a logical and stable development progression.

As of this roadmap's publication, only the push queue utilities exist in
`src/push.rs`. No connection actor or write loop has been implemented. Phase 1
therefore begins by introducing the connection actor with its biased `select!`
loop and integrating the push queues.
At the time of writing, the push queue utilities and the connection actor with
its biased `select!` write loop are implemented. The first phase therefore
focuses on integrating the remaining foundational pieces, preparing the API for
public consumption.

## Phase 1: Foundational Mechanics

Expand All @@ -26,6 +26,7 @@ all public-facing features will be built.*
| 1.4 | Initial FragmentStrategy Trait | Define the initial `FragmentStrategy` trait and the `FragmentMeta` struct. Focus on the core methods: `decode_header` and `encode_header`. | Medium | - |
| 1.5 | Basic FragmentAdapter | Implement the `FragmentAdapter` as a `FrameProcessor`. Build the inbound reassembly logic for a single, non-multiplexed stream of fragments and the outbound logic for splitting a single large frame. | Large | #1.4 |
| 1.6 | Internal Hook Plumbing | Add the invocation points for the protocol-specific hooks (`before_send`, `on_command_end`, etc.) within the connection actor, even if the public trait is not yet defined. | Small | #1.3 |

## Phase 2: Public APIs & Developer Ergonomics

*Focus: Exposing the new functionality to developers through a clean, ergonomic,
Expand All @@ -38,6 +39,7 @@ and intuitive.*
| 2.4 | async-stream Integration & Docs | Remove the proposed `FrameSink` from the design. Update the `Response::Stream` handling and write documentation recommending `async-stream` as the canonical way to create streams imperatively. | Small | #1.1 |
| 2.5 | Initial Test Suite | Write unit and integration tests for the new public APIs. Verify that `Response::Vec` and `Response::Stream` work, and that `PushHandle` can successfully send frames that are received by a client. | Large | #2.1, #2.3, #2.4 |
| 2.6 | Basic Fragmentation Example | Implement a simple `FragmentStrategy` (e.g. `LenFlag32K`) and an example showing the `FragmentAdapter` in use. This validates the adapter's basic functionality. | Medium | #1.5, #2.5 |

## Phase 3: Production Hardening & Resilience

*Focus: Adding the critical features required for robust, secure, and reliable
Expand Down
172 changes: 172 additions & 0 deletions src/connection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
//! Connection actor responsible for outbound frames.
//!
//! The actor polls a shutdown token, high- and low-priority push queues,
//! and an optional response stream using a `tokio::select!` loop. The
//! `biased` keyword ensures high-priority messages are processed before
//! low-priority ones, with streamed responses handled last.

use futures::StreamExt;
use tokio_util::sync::CancellationToken;

use crate::{
push::{FrameLike, PushQueues},
response::{FrameStream, WireframeError},
};

/// Actor driving outbound frame delivery for a connection.
pub struct ConnectionActor<F, E> {
queues: PushQueues<F>,
response: Option<FrameStream<F, E>>, // current streaming response
shutdown: CancellationToken,
}

impl<F, E> ConnectionActor<F, E>
where
F: FrameLike,
{
/// Create a new `ConnectionActor` from the provided components.
#[must_use]
pub fn new(
queues: PushQueues<F>,
response: Option<FrameStream<F, E>>,
shutdown: CancellationToken,
) -> Self {
Self {
queues,
response,
shutdown,
}
}

/// Access the underlying push queues.
///
/// This is mainly used in tests to close the queues when no actor is
/// draining them.
#[must_use]
pub fn queues_mut(&mut self) -> &mut PushQueues<F> { &mut self.queues }

/// Set or replace the current streaming response.
pub fn set_response(&mut self, stream: Option<FrameStream<F, E>>) { self.response = stream; }

/// Get a clone of the shutdown token used by the actor.
#[must_use]
pub fn shutdown_token(&self) -> CancellationToken { self.shutdown.clone() }

/// Drive the actor until all sources are exhausted or shutdown is triggered.
///
/// Frames are appended to `out` in the order they are processed.
///
/// # Errors
///
/// Returns a [`WireframeError`] if the response stream yields an error.
pub async fn run(&mut self, out: &mut Vec<F>) -> Result<(), WireframeError<E>> {
// If cancellation has already been requested, exit immediately. Nothing
// will be drained and any streaming response is abandoned. This mirrors
// a hard shutdown and is required for the tests.
if self.shutdown.is_cancelled() {
return Ok(());
}

let mut state = ActorState::new(self.response.is_none());

while !state.is_done() {
self.poll_sources(&mut state, out).await?;
}

Ok(())
}

async fn poll_sources(
&mut self,
state: &mut ActorState,
out: &mut Vec<F>,
) -> Result<(), WireframeError<E>> {
tokio::select! {
biased;

() = self.shutdown.cancelled(), if !state.shutting_down => {
state.shutting_down = true;
self.start_shutdown(&mut state.resp_closed);
}

res = self.queues.high_priority_rx.recv(), if !state.push.high => {
Self::handle_push(res, &mut state.push.high, out);
}

res = self.queues.low_priority_rx.recv(), if !state.push.low => {
Self::handle_push(res, &mut state.push.low, out);
}

res = async {
if let Some(stream) = &mut self.response {
stream.next().await
} else {
None
}
}, if !state.shutting_down && !state.resp_closed => {
Self::handle_response(res, &mut state.resp_closed, out)?;
}
}

Ok(())
}

fn start_shutdown(&mut self, resp_closed: &mut bool) {
self.queues.high_priority_rx.close();
self.queues.low_priority_rx.close();
// Drop any streaming response so shutdown is prompt. Queued frames are
// still drained, but streamed responses may be truncated.
self.response = None;
*resp_closed = true;
}

fn handle_push(res: Option<F>, closed: &mut bool, out: &mut Vec<F>) {
match res {
Some(frame) => out.push(frame),
None => *closed = true,
}
}

fn handle_response(
res: Option<Result<F, WireframeError<E>>>,
closed: &mut bool,
out: &mut Vec<F>,
) -> Result<(), WireframeError<E>> {
match res {
Some(Ok(frame)) => out.push(frame),
Some(Err(e)) => return Err(e),
None => *closed = true,
}

Ok(())
}
}

struct PushClosed {
high: bool,
low: bool,
}

struct ActorState {
push: PushClosed,
resp_closed: bool,
shutting_down: bool,
}

impl ActorState {
fn new(resp_closed: bool) -> Self {
Self {
push: PushClosed {
high: false,
low: false,
},
resp_closed,
shutting_down: false,
}
}

fn is_done(&self) -> bool {
let push_drained = self.push.high && self.push.low;
push_drained && (self.resp_closed || self.shutting_down)
}
}
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
pub mod app;
pub mod serializer;
pub use serializer::{BincodeSerializer, Serializer};
pub mod connection;
pub mod extractor;
pub mod frame;
pub mod message;
Expand All @@ -17,4 +18,5 @@ pub mod response;
pub mod rewind_stream;
pub mod server;

pub use connection::ConnectionActor;
pub use response::{FrameStream, Response, WireframeError};
13 changes: 11 additions & 2 deletions src/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ impl<F: FrameLike> PushHandle<F> {

/// Receiver ends of the push queues stored by the connection actor.
pub struct PushQueues<F> {
pub high_priority_rx: mpsc::Receiver<F>,
pub low_priority_rx: mpsc::Receiver<F>,
pub(crate) high_priority_rx: mpsc::Receiver<F>,
pub(crate) low_priority_rx: mpsc::Receiver<F>,
}

impl<F: FrameLike> PushQueues<F> {
Expand Down Expand Up @@ -154,4 +154,13 @@ impl<F: FrameLike> PushQueues<F> {
res = self.low_priority_rx.recv() => res.map(|f| (PushPriority::Low, f)),
}
}

/// Close both receivers to prevent further pushes from being accepted.
///
/// This is primarily used in tests to release resources when no actor is
/// draining the queues.
pub fn close(&mut self) {
self.high_priority_rx.close();
self.low_priority_rx.close();
}
}
Loading