Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/hardening-wireframe-a-guide-to-production-resilience.md
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,14 @@ This allows the protocol implementation to serialize a proper error frame
(e.g., an SQL error code) to send to the client before terminating the current
operation, rather than just abruptly closing the connection.

When a stream concludes successfully, the connection actor calls the
`stream_end_frame` hook to produce a terminator frame with no payload. This
explicit marker lets clients recognise that the logical stream has ended and
helps avoid lingering resources or stalled state machines. The terminator is
only appended if the protocol supplies one (that is, the hook returns
`Some(frame)`), and the frame passes through the `before_send` hook like any
other, allowing final mutation or metadata to be applied consistently.

### 5.2 Dead Letter Queues (DLQ) for Guaranteed Pushing

In some systems (e.g., financial transactions, critical audit logs), dropping a
Expand Down
2 changes: 1 addition & 1 deletion docs/roadmap.md
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ stream.
is the unique request ID. For each message in a multi-packet response, this
ID must match the original request's ID.

- [ ] Define a mechanism to signal the end of a multi-packet stream, such as
- [x] Define a mechanism to signal the end of a multi-packet stream, such as
a frame with a specific flag and no payload.

- [ ] **Core Library Implementation:**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ async fn handle_large_query(req: Request) -> io::Result<Response<MyFrame>> {

See `examples/async_stream.rs` for a runnable demonstration of this pattern.

Completion of a streaming response is signalled by a protocol-defined
terminator frame. The new `stream_end_frame` hook allows implementations to
emit a frame with an explicit end-of-stream flag and no payload, ensuring
clients can unambiguously detect when a logical stream has finished.

Comment thread
coderabbitai[bot] marked this conversation as resolved.
If the hook returns `None`, no explicit terminator frame is sent and the stream
ends silently; clients detect completion by the normal end-of-stream condition.

#### The Connection Actor

The underlying engine for this duplex communication is the
Expand Down
22 changes: 17 additions & 5 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,10 +342,9 @@ where
state: &mut ActorState,
out: &mut Vec<F>,
) -> Result<(), WireframeError<E>> {
let processed = matches!(res, Some(Ok(_)));
let is_none = res.is_none();
self.handle_response(res, state, out)?;
if processed {
let produced = self.handle_response(res, state, out)?;
if produced {
self.after_low();
}
if is_none {
Expand Down Expand Up @@ -396,33 +395,46 @@ where
///
/// Protocol errors are passed to `handle_error` and do not terminate the
/// actor. I/O errors propagate to the caller.
///
/// Returns `true` if a frame was appended to `out`.
fn handle_response(
&mut self,
res: Option<Result<F, WireframeError<E>>>,
state: &mut ActorState,
out: &mut Vec<F>,
) -> Result<(), WireframeError<E>> {
) -> Result<bool, WireframeError<E>> {
let mut produced = false;
match res {
Some(Ok(mut frame)) => {
self.hooks.before_send(&mut frame, &mut self.ctx);
out.push(frame);
crate::metrics::inc_frames(crate::metrics::Direction::Outbound);
produced = true;
}
Some(Err(WireframeError::Protocol(e))) => {
warn!(error = ?e, "protocol error");
self.hooks.handle_error(e, &mut self.ctx);
state.mark_closed();
// Stop polling the response after a protocol error to avoid
// double-closing and duplicate `on_command_end` signalling.
self.response = None;
self.hooks.on_command_end(&mut self.ctx);
crate::metrics::inc_handler_errors();
}
Some(Err(e)) => return Err(e),
None => {
state.mark_closed();
if let Some(mut frame) = self.hooks.stream_end_frame(&mut self.ctx) {
self.hooks.before_send(&mut frame, &mut self.ctx);
out.push(frame);
crate::metrics::inc_frames(crate::metrics::Direction::Outbound);
produced = true;
}
self.hooks.on_command_end(&mut self.ctx);
}
}

Ok(())
Ok(produced)
}

/// Await cancellation on the provided shutdown token.
Expand Down
25 changes: 25 additions & 0 deletions src/hooks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ pub trait WireframeProtocol: Send + Sync + 'static {
/// }
/// ```
fn handle_error(&self, _error: Self::ProtocolError, _ctx: &mut ConnectionContext) {}

/// Produce a frame signalling end-of-stream.
///
/// Implementations should set any protocol-specific flag indicating that
/// no further frames will follow. Returning `None` omits the terminator
/// frame, and the stream ends silently. The `before_send` hook runs for
/// this frame if registered.
fn stream_end_frame(&self, _ctx: &mut ConnectionContext) -> Option<Self::Frame> { None }
}

/// Type alias for the `before_send` callback.
Expand All @@ -67,6 +75,9 @@ type OnCommandEndHook = Box<dyn FnMut(&mut ConnectionContext) + Send + 'static>;
/// Type alias for the `handle_error` callback.
type HandleErrorHook<E> = Box<dyn FnMut(E, &mut ConnectionContext) + Send + 'static>;

/// Type alias for the `stream_end_frame` callback.
type StreamEndHook<F> = Box<dyn FnMut(&mut ConnectionContext) -> Option<F> + Send + 'static>;

/// Callbacks used by the connection actor.
pub struct ProtocolHooks<F, E> {
/// Invoked when a connection is established.
Expand All @@ -77,6 +88,8 @@ pub struct ProtocolHooks<F, E> {
pub on_command_end: Option<OnCommandEndHook>,
/// Invoked when a handler returns a protocol error.
pub handle_error: Option<HandleErrorHook<E>>,
/// Invoked to construct an end-of-stream frame.
pub stream_end: Option<StreamEndHook<F>>,
}

impl<F, E> Default for ProtocolHooks<F, E> {
Expand All @@ -86,6 +99,7 @@ impl<F, E> Default for ProtocolHooks<F, E> {
before_send: None,
on_command_end: None,
handle_error: None,
stream_end: None,
}
}
}
Expand Down Expand Up @@ -118,6 +132,11 @@ impl<F, E> ProtocolHooks<F, E> {
}
}

/// Run the `stream_end_frame` hook if registered.
pub fn stream_end_frame(&mut self, ctx: &mut ConnectionContext) -> Option<F> {
self.stream_end.as_mut().and_then(|hook| hook(ctx))
}

/// Construct hooks from a [`WireframeProtocol`] implementation.
pub fn from_protocol<P>(protocol: &Arc<P>) -> Self
where
Expand All @@ -143,11 +162,17 @@ impl<F, E> ProtocolHooks<F, E> {
protocol_error.handle_error(e, ctx);
}) as HandleErrorHook<P::ProtocolError>;

let protocol_stream_end = Arc::clone(protocol);
let stream_end =
Box::new(move |ctx: &mut ConnectionContext| protocol_stream_end.stream_end_frame(ctx))
as StreamEndHook<F>;

Self {
on_connection_setup: Some(setup),
before_send: Some(before),
on_command_end: Some(end),
handle_error: Some(err),
stream_end: Some(stream_end),
}
}
}
15 changes: 15 additions & 0 deletions tests/common/terminator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
//! Test protocol that appends a terminator frame at end-of-stream.
//!
//! Used across integration and behavioural tests verifying the stream
//! termination mechanism.
use wireframe::hooks::{ConnectionContext, WireframeProtocol};

/// Protocol that produces `0` as an explicit end-of-stream marker.
pub struct Terminator;

impl WireframeProtocol for Terminator {
type Frame = u8;
type ProtocolError = ();

fn stream_end_frame(&self, _ctx: &mut ConnectionContext) -> Option<Self::Frame> { Some(0) }
}
7 changes: 5 additions & 2 deletions tests/cucumber.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
//! Cucumber test runner for integration tests.
//!
//! Orchestrates two distinct test suites:
//! Orchestrates three distinct test suites:
//! - `PanicWorld`: Tests server resilience during connection panics
//! - `CorrelationWorld`: Tests correlation ID propagation in multi-frame responses
//! - `StreamEndWorld`: Verifies end-of-stream signalling
//!
//! # Example
//!
//! The runner executes feature files sequentially:
//! ```text
//! tests/features/connection_panic.feature -> PanicWorld context
//! tests/features/correlation_id.feature -> CorrelationWorld context
//! tests/features/stream_end.feature -> StreamEndWorld context
//! ```
//!
//! Each context provides specialised step definitions and state management
Expand All @@ -19,10 +21,11 @@ mod steps;
mod world;

use cucumber::World;
use world::{CorrelationWorld, PanicWorld};
use world::{CorrelationWorld, PanicWorld, StreamEndWorld};

#[tokio::main]
async fn main() {
PanicWorld::run("tests/features/connection_panic.feature").await;
CorrelationWorld::run("tests/features/correlation_id.feature").await;
StreamEndWorld::run("tests/features/stream_end.feature").await;
}
4 changes: 4 additions & 0 deletions tests/features/stream_end.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Feature: Stream terminator frame
Scenario: Connection actor emits terminator after stream
When a streaming response completes
Then an end-of-stream frame is sent
1 change: 1 addition & 0 deletions tests/steps/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@

mod correlation_steps;
mod panic_steps;
mod stream_end_steps;
10 changes: 10 additions & 0 deletions tests/steps/stream_end_steps.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
//! Steps for stream terminator behavioural tests.
use cucumber::{then, when};

use crate::world::StreamEndWorld;

#[when("a streaming response completes")]
async fn when_stream(world: &mut StreamEndWorld) { world.process().await; }

#[then("an end-of-stream frame is sent")]
fn then_end(world: &mut StreamEndWorld) { world.verify(); }
63 changes: 63 additions & 0 deletions tests/stream_end.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
//! Tests for explicit end-of-stream signalling.
use std::sync::Arc;

use async_stream::try_stream;
use rstest::rstest;
use tokio_util::sync::CancellationToken;
use wireframe::{
connection::ConnectionActor,
hooks::{ConnectionContext, ProtocolHooks, WireframeProtocol},
push::PushQueues,
response::FrameStream,
};

#[path = "common/terminator.rs"]
mod terminator;
use terminator::Terminator;

#[rstest]
#[tokio::test]
async fn emits_end_frame() {
let stream: FrameStream<u8> = Box::pin(try_stream! {
yield 1;
yield 2;
});

let (queues, handle) = PushQueues::bounded(1, 1);
let shutdown = CancellationToken::new();
let hooks = ProtocolHooks::from_protocol(&Arc::new(Terminator));
let mut actor = ConnectionActor::with_hooks(queues, handle, Some(stream), shutdown, hooks);

let mut out = Vec::new();
actor.run(&mut out).await.expect("actor run failed");

assert_eq!(out, vec![1, 2, 0]);
}

#[rstest]
#[tokio::test]
async fn emits_no_end_frame_when_none() {
struct NoTerminator;

impl WireframeProtocol for NoTerminator {
type Frame = u8;
type ProtocolError = ();

fn stream_end_frame(&self, _ctx: &mut ConnectionContext) -> Option<Self::Frame> { None }
}

let stream: FrameStream<u8> = Box::pin(try_stream! {
yield 7;
yield 8;
});

let (queues, handle) = PushQueues::bounded(1, 1);
let shutdown = CancellationToken::new();
let hooks = ProtocolHooks::from_protocol(&Arc::new(NoTerminator));
let mut actor = ConnectionActor::with_hooks(queues, handle, Some(stream), shutdown, hooks);

let mut out = Vec::new();
actor.run(&mut out).await.expect("actor run failed");

assert_eq!(out, vec![7, 8]);
}
40 changes: 39 additions & 1 deletion tests/world.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! Provides shared state management for behavioural tests verifying
//! server resilience against connection task panics.

use std::net::SocketAddr;
use std::{net::SocketAddr, sync::Arc};

use async_stream::try_stream;
use cucumber::World;
Expand All @@ -12,6 +12,7 @@ use tokio_util::sync::CancellationToken;
use wireframe::{
app::{Envelope, Packet, WireframeApp},
connection::ConnectionActor,
hooks::ProtocolHooks,
push::PushQueues,
response::FrameStream,
server::WireframeServer,
Expand All @@ -20,6 +21,9 @@ use wireframe::{
#[path = "common/mod.rs"]
mod common;
use common::unused_listener;
#[path = "common/terminator.rs"]
mod terminator;
use terminator::Terminator;

#[derive(Debug)]
struct PanicServer {
Expand Down Expand Up @@ -157,3 +161,37 @@ impl CorrelationWorld {
);
}
}

/// Cucumber world that captures frames from a streaming response and verifies
/// that a protocol-provided terminator frame is appended at end-of-stream.
#[derive(Debug, Default, World)]
pub struct StreamEndWorld {
frames: Vec<u8>,
}

Comment thread
coderabbitai[bot] marked this conversation as resolved.
impl StreamEndWorld {
/// Run the connection actor and record emitted frames.
///
/// # Panics
/// Panics if the actor fails to run successfully.
pub async fn process(&mut self) {
let stream: FrameStream<u8> = Box::pin(try_stream! {
yield 1u8;
yield 2u8;
});

let (queues, handle) = PushQueues::bounded(1, 1);
let shutdown = CancellationToken::new();
let hooks = ProtocolHooks::from_protocol(&Arc::new(Terminator));
let mut actor = ConnectionActor::with_hooks(queues, handle, Some(stream), shutdown, hooks);
actor.run(&mut self.frames).await.expect("actor run failed");
}

/// Verify that a terminator frame was appended to the stream.
///
/// # Panics
/// Panics if the expected terminator is missing.
pub fn verify(&self) {
assert_eq!(self.frames, vec![1, 2, 0]);
}
}
Loading