Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions docs/asynchronous-outbound-messaging-design.md
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,51 @@ pub trait WireframeProtocol: Send + Sync + 'static {
WireframeApp::new().with_protocol(MySqlProtocolImpl);
```

```mermaid
classDiagram
class WireframeProtocol {
<<trait>>
+Frame: FrameLike
+ProtocolError
+on_connection_setup(PushHandle<Frame>, &mut ConnectionContext)
+before_send(&mut Frame, &mut ConnectionContext)
+on_command_end(&mut ConnectionContext)
}
class ProtocolHooks {
-before_send: Option<BeforeSendHook<F>>
-on_command_end: Option<OnCommandEndHook>
+before_send(&mut self, &mut F, &mut ConnectionContext)
+on_command_end(&mut self, &mut ConnectionContext)
+from_protocol(protocol: Arc<P>)
}
class ConnectionContext {
<<struct>>
}
class WireframeApp {
-protocol: Option<Arc<dyn WireframeProtocol<Frame=Vec<u8>, ProtocolError=()>>>
+with_protocol(protocol)
+protocol()
+protocol_hooks()
}
class ConnectionActor {
-hooks: ProtocolHooks<F>
-ctx: ConnectionContext
}
WireframeApp --> "1" WireframeProtocol : uses
WireframeApp --> "1" ProtocolHooks : creates
ProtocolHooks --> "1" WireframeProtocol : from_protocol
ConnectionActor --> "1" ProtocolHooks : uses
ConnectionActor --> "1" ConnectionContext : owns
ProtocolHooks --> "1" ConnectionContext : passes to hooks
WireframeProtocol --> "1" ConnectionContext : uses
WireframeProtocol --> "1" PushHandle : uses
WireframeProtocol <|.. ProtocolHooks : implemented by
```

`ConnectionContext` is intentionally empty today. It offers a stable extension
point for per-connection data without breaking existing protocol
implementations.

## 5. Error Handling & Resilience

### 5.1 `BrokenPipe` on Connection Loss
Expand Down
40 changes: 40 additions & 0 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use tokio::io::{self, AsyncWrite, AsyncWriteExt};

use crate::{
frame::{FrameProcessor, LengthFormat, LengthPrefixedProcessor},
hooks::{ProtocolHooks, WireframeProtocol},
message::Message,
middleware::{HandlerService, Service, ServiceRequest, Transform},
serializer::{BincodeSerializer, Serializer},
Expand Down Expand Up @@ -80,6 +81,7 @@ pub struct WireframeApp<
app_data: HashMap<TypeId, Arc<dyn Any + Send + Sync>>,
on_connect: Option<Arc<ConnectionSetup<C>>>,
on_disconnect: Option<Arc<ConnectionTeardown<C>>>,
protocol: Option<Arc<dyn WireframeProtocol<Frame = Vec<u8>, ProtocolError = ()>>>,
}

/// Alias for asynchronous route handlers.
Expand Down Expand Up @@ -235,6 +237,7 @@ where
app_data: HashMap::new(),
on_connect: None,
on_disconnect: None,
protocol: None,
}
}
}
Expand Down Expand Up @@ -360,6 +363,7 @@ where
app_data: self.app_data,
on_connect: Some(Arc::new(move || Box::pin(f()))),
on_disconnect: None,
protocol: self.protocol,
})
}

Expand All @@ -381,6 +385,41 @@ where
Ok(self)
}

/// Install a [`WireframeProtocol`] implementation.
///
/// The protocol defines hooks for connection setup, frame modification, and
/// command completion. It is wrapped in an [`Arc`] and stored for later use
/// by the connection actor.
#[must_use]
pub fn with_protocol<P>(mut self, protocol: P) -> Self
where
P: WireframeProtocol<Frame = Vec<u8>, ProtocolError = ()> + 'static,
{
self.protocol = Some(Arc::new(protocol));
self
}

/// Get a clone of the configured protocol, if any.
///
/// Returns `None` if no protocol was installed via [`with_protocol`](Self::with_protocol).
#[must_use]
pub fn protocol(
&self,
) -> Option<Arc<dyn WireframeProtocol<Frame = Vec<u8>, ProtocolError = ()>>> {
self.protocol.as_ref().map(Arc::clone)
}

/// Return protocol hooks derived from the installed protocol.
///
/// If no protocol is installed, returns default (no-op) hooks.
#[must_use]
pub fn protocol_hooks(&self) -> ProtocolHooks<Vec<u8>> {
self.protocol
.as_ref()
.map(|p| ProtocolHooks::from_protocol(&Arc::clone(p)))
.unwrap_or_default()
}

/// Set the frame processor used for encoding and decoding frames.
#[must_use]
pub fn frame_processor<P>(mut self, processor: P) -> Self
Expand All @@ -406,6 +445,7 @@ where
app_data: self.app_data,
on_connect: self.on_connect,
on_disconnect: self.on_disconnect,
protocol: self.protocol,
}
}

Expand Down
32 changes: 19 additions & 13 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use tokio::{
use tokio_util::sync::CancellationToken;

use crate::{
hooks::ProtocolHooks,
push::{FrameLike, PushQueues},
hooks::{ConnectionContext, ProtocolHooks},
push::{FrameLike, PushHandle, PushQueues},
response::{FrameStream, WireframeError},
};

Expand Down Expand Up @@ -48,9 +48,9 @@ impl Default for FairnessConfig {
/// use tokio_util::sync::CancellationToken;
/// use wireframe::{connection::ConnectionActor, push::PushQueues};
///
/// let (queues, _handle) = PushQueues::<u8>::bounded(8, 8);
/// let (queues, handle) = PushQueues::<u8>::bounded(8, 8);
/// let shutdown = CancellationToken::new();
/// let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, None, shutdown);
/// let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, handle, None, shutdown);
/// # drop(actor);
/// ```
pub struct ConnectionActor<F, E> {
Expand All @@ -59,6 +59,7 @@ pub struct ConnectionActor<F, E> {
response: Option<FrameStream<F, E>>, // current streaming response
shutdown: CancellationToken,
hooks: ProtocolHooks<F>,
ctx: ConnectionContext,
fairness: FairnessConfig,
high_counter: usize,
high_start: Option<Instant>,
Expand All @@ -76,34 +77,39 @@ where
/// use tokio_util::sync::CancellationToken;
/// use wireframe::{connection::ConnectionActor, push::PushQueues};
///
/// let (queues, _handle) = PushQueues::<u8>::bounded(4, 4);
/// let (queues, handle) = PushQueues::<u8>::bounded(4, 4);
/// let token = CancellationToken::new();
/// let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, None, token);
/// let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, handle, None, token);
/// # drop(actor);
/// ```
#[must_use]
pub fn new(
queues: PushQueues<F>,
handle: PushHandle<F>,
response: Option<FrameStream<F, E>>,
shutdown: CancellationToken,
) -> Self {
Self::with_hooks(queues, response, shutdown, ProtocolHooks::default())
Self::with_hooks(queues, handle, response, shutdown, ProtocolHooks::default())
}

/// Create a new `ConnectionActor` with custom protocol hooks.
#[must_use]
pub fn with_hooks(
queues: PushQueues<F>,
handle: PushHandle<F>,
response: Option<FrameStream<F, E>>,
shutdown: CancellationToken,
hooks: ProtocolHooks<F>,
mut hooks: ProtocolHooks<F>,
) -> Self {
let mut ctx = ConnectionContext;
hooks.on_connection_setup(handle, &mut ctx);
Self {
high_rx: Some(queues.high_priority_rx),
low_rx: Some(queues.low_priority_rx),
response,
shutdown,
hooks,
ctx,
fairness: FairnessConfig::default(),
high_counter: 0,
high_start: None,
Expand Down Expand Up @@ -208,7 +214,7 @@ where
/// Handle the result of polling the high-priority queue.
fn process_high(&mut self, res: Option<F>, state: &mut ActorState, out: &mut Vec<F>) {
if let Some(mut frame) = res {
self.hooks.before_send(&mut frame);
self.hooks.before_send(&mut frame, &mut self.ctx);
out.push(frame);
self.after_high(out, state);
} else {
Expand All @@ -221,7 +227,7 @@ where
/// Handle the result of polling the low-priority queue.
fn process_low(&mut self, res: Option<F>, state: &mut ActorState, out: &mut Vec<F>) {
if let Some(mut frame) = res {
self.hooks.before_send(&mut frame);
self.hooks.before_send(&mut frame, &mut self.ctx);
out.push(frame);
self.after_low();
} else {
Expand Down Expand Up @@ -274,7 +280,7 @@ where
{
match rx.try_recv() {
Ok(mut frame) => {
self.hooks.before_send(&mut frame);
self.hooks.before_send(&mut frame, &mut self.ctx);
out.push(frame);
self.after_low();
}
Expand Down Expand Up @@ -317,13 +323,13 @@ where
) -> Result<(), WireframeError<E>> {
match res {
Some(Ok(mut frame)) => {
self.hooks.before_send(&mut frame);
self.hooks.before_send(&mut frame, &mut self.ctx);
out.push(frame);
}
Some(Err(e)) => return Err(e),
None => {
state.mark_closed();
self.hooks.on_command_end();
self.hooks.on_command_end(&mut self.ctx);
}
}

Expand Down
89 changes: 80 additions & 9 deletions src/hooks.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,54 @@
//! Internal protocol hooks called by the connection actor.
//!
//! This module defines [`ProtocolHooks`], a container for optional callback
//! functions invoked during connection output. The hooks are placeholders for
//! the future `WireframeProtocol` trait described in the design documents.
//! This module defines [`ProtocolHooks`] along with the public
//! [`WireframeProtocol`] trait. `ProtocolHooks` stores optional callbacks
//! invoked during connection output. Applications configure these callbacks via
//! an implementation of [`WireframeProtocol`].

use std::sync::Arc;

use crate::push::{FrameLike, PushHandle};

/// Per-connection state passed to protocol callbacks.
///
/// This empty struct is intentionally extensible. Future protocol features may
/// require storing connection-local data without breaking existing APIs.
#[derive(Default)]
pub struct ConnectionContext;
Comment thread
coderabbitai[bot] marked this conversation as resolved.

/// Trait encapsulating protocol-specific logic and callbacks.
pub trait WireframeProtocol: Send + Sync + 'static {
/// Frame type written to the socket.
type Frame: FrameLike;
/// Custom error type for protocol operations.
type ProtocolError;

/// Called once when a new connection is established. The provided
/// [`PushHandle`] may be stored by the implementation to enable
/// asynchronous server pushes.
fn on_connection_setup(&self, _handle: PushHandle<Self::Frame>, _ctx: &mut ConnectionContext) {}

/// Invoked before any frame (push or response) is written to the socket.
fn before_send(&self, _frame: &mut Self::Frame, _ctx: &mut ConnectionContext) {}

/// Invoked when a request/response cycle completes.
fn on_command_end(&self, _ctx: &mut ConnectionContext) {}
}

/// Type alias for the `before_send` callback.
type BeforeSendHook<F> = Box<dyn FnMut(&mut F) + Send + 'static>;
type BeforeSendHook<F> = Box<dyn FnMut(&mut F, &mut ConnectionContext) + Send + 'static>;

/// Type alias for the `on_connection_setup` callback.
type OnConnectionSetupHook<F> =
Box<dyn FnOnce(PushHandle<F>, &mut ConnectionContext) + Send + 'static>;

/// Type alias for the `on_command_end` callback.
type OnCommandEndHook = Box<dyn FnMut() + Send + 'static>;
type OnCommandEndHook = Box<dyn FnMut(&mut ConnectionContext) + Send + 'static>;

/// Callbacks used by the connection actor.
pub struct ProtocolHooks<F> {
/// Invoked when a connection is established.
pub on_connection_setup: Option<OnConnectionSetupHook<F>>,
/// Invoked before a frame is written to the socket.
pub before_send: Option<BeforeSendHook<F>>,
/// Invoked once a command completes.
Expand All @@ -21,24 +58,58 @@ pub struct ProtocolHooks<F> {
impl<F> Default for ProtocolHooks<F> {
fn default() -> Self {
Self {
on_connection_setup: None,
before_send: None,
on_command_end: None,
}
}
}

impl<F> ProtocolHooks<F> {
/// Run the `on_connection_setup` hook if registered.
pub fn on_connection_setup(&mut self, handle: PushHandle<F>, ctx: &mut ConnectionContext) {
if let Some(hook) = self.on_connection_setup.take() {
hook(handle, ctx);
}
}
/// Run the `before_send` hook if registered.
pub fn before_send(&mut self, frame: &mut F) {
pub fn before_send(&mut self, frame: &mut F, ctx: &mut ConnectionContext) {
if let Some(hook) = &mut self.before_send {
hook(frame);
hook(frame, ctx);
}
}

/// Run the `on_command_end` hook if registered.
pub fn on_command_end(&mut self) {
pub fn on_command_end(&mut self, ctx: &mut ConnectionContext) {
if let Some(hook) = &mut self.on_command_end {
hook();
hook(ctx);
}
}

/// Construct hooks from a [`WireframeProtocol`] implementation.
pub fn from_protocol<P>(protocol: &Arc<P>) -> Self
where
P: WireframeProtocol<Frame = F> + ?Sized,
{
let protocol_before = Arc::clone(protocol);
let before = Box::new(move |frame: &mut F, ctx: &mut ConnectionContext| {
protocol_before.before_send(frame, ctx);
}) as BeforeSendHook<F>;

let protocol_end = Arc::clone(protocol);
let end = Box::new(move |ctx: &mut ConnectionContext| {
protocol_end.on_command_end(ctx);
}) as OnCommandEndHook;

let protocol_setup = Arc::clone(protocol);
let setup = Box::new(move |handle: PushHandle<F>, ctx: &mut ConnectionContext| {
protocol_setup.on_connection_setup(handle, ctx);
}) as OnConnectionSetupHook<F>;

Self {
on_connection_setup: Some(setup),
before_send: Some(before),
on_command_end: Some(end),
}
}
}
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ pub mod rewind_stream;
pub mod server;

pub use connection::ConnectionActor;
pub use hooks::ProtocolHooks;
pub use hooks::{ConnectionContext, ProtocolHooks, WireframeProtocol};
pub use response::{FrameStream, Response, WireframeError};
Loading