diff --git a/docs/hardening-wireframe-a-guide-to-production-resilience.md b/docs/hardening-wireframe-a-guide-to-production-resilience.md index 7f41c0f4..78a6ce68 100644 --- a/docs/hardening-wireframe-a-guide-to-production-resilience.md +++ b/docs/hardening-wireframe-a-guide-to-production-resilience.md @@ -322,6 +322,14 @@ This allows the protocol implementation to serialize a proper error frame (e.g., an SQL error code) to send to the client before terminating the current operation, rather than just abruptly closing the connection. +When a stream concludes successfully, the connection actor calls the +`stream_end_frame` hook to produce a terminator frame with no payload. This +explicit marker lets clients recognise that the logical stream has ended and +helps avoid lingering resources or stalled state machines. The terminator is +only appended if the protocol supplies one (that is, the hook returns +`Some(frame)`), and the frame passes through the `before_send` hook like any +other, allowing final mutation or metadata to be applied consistently. + ### 5.2 Dead Letter Queues (DLQ) for Guaranteed Pushing In some systems (e.g., financial transactions, critical audit logs), dropping a diff --git a/docs/roadmap.md b/docs/roadmap.md index 729a2c36..6d06e610 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -189,7 +189,7 @@ stream. is the unique request ID. For each message in a multi-packet response, this ID must match the original request's ID. - - [ ] Define a mechanism to signal the end of a multi-packet stream, such as + - [x] Define a mechanism to signal the end of a multi-packet stream, such as a frame with a specific flag and no payload. - [ ] **Core Library Implementation:** diff --git a/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md b/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md index e5d44ab2..7a70c955 100644 --- a/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md +++ b/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md @@ -89,6 +89,14 @@ async fn handle_large_query(req: Request) -> io::Result> { See `examples/async_stream.rs` for a runnable demonstration of this pattern. +Completion of a streaming response is signalled by a protocol-defined +terminator frame. The new `stream_end_frame` hook allows implementations to +emit a frame with an explicit end-of-stream flag and no payload, ensuring +clients can unambiguously detect when a logical stream has finished. + +If the hook returns `None`, no explicit terminator frame is sent and the stream +ends silently; clients detect completion by the normal end-of-stream condition. + #### The Connection Actor The underlying engine for this duplex communication is the diff --git a/src/connection.rs b/src/connection.rs index 9d7a11d8..ca5b50a4 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -342,10 +342,9 @@ where state: &mut ActorState, out: &mut Vec, ) -> Result<(), WireframeError> { - let processed = matches!(res, Some(Ok(_))); let is_none = res.is_none(); - self.handle_response(res, state, out)?; - if processed { + let produced = self.handle_response(res, state, out)?; + if produced { self.after_low(); } if is_none { @@ -396,33 +395,46 @@ where /// /// Protocol errors are passed to `handle_error` and do not terminate the /// actor. I/O errors propagate to the caller. + /// + /// Returns `true` if a frame was appended to `out`. fn handle_response( &mut self, res: Option>>, state: &mut ActorState, out: &mut Vec, - ) -> Result<(), WireframeError> { + ) -> Result> { + let mut produced = false; match res { Some(Ok(mut frame)) => { self.hooks.before_send(&mut frame, &mut self.ctx); out.push(frame); crate::metrics::inc_frames(crate::metrics::Direction::Outbound); + produced = true; } Some(Err(WireframeError::Protocol(e))) => { warn!(error = ?e, "protocol error"); self.hooks.handle_error(e, &mut self.ctx); state.mark_closed(); + // Stop polling the response after a protocol error to avoid + // double-closing and duplicate `on_command_end` signalling. + self.response = None; self.hooks.on_command_end(&mut self.ctx); crate::metrics::inc_handler_errors(); } Some(Err(e)) => return Err(e), None => { state.mark_closed(); + if let Some(mut frame) = self.hooks.stream_end_frame(&mut self.ctx) { + self.hooks.before_send(&mut frame, &mut self.ctx); + out.push(frame); + crate::metrics::inc_frames(crate::metrics::Direction::Outbound); + produced = true; + } self.hooks.on_command_end(&mut self.ctx); } } - Ok(()) + Ok(produced) } /// Await cancellation on the provided shutdown token. diff --git a/src/hooks.rs b/src/hooks.rs index f7343777..7c61d601 100644 --- a/src/hooks.rs +++ b/src/hooks.rs @@ -52,6 +52,14 @@ pub trait WireframeProtocol: Send + Sync + 'static { /// } /// ``` fn handle_error(&self, _error: Self::ProtocolError, _ctx: &mut ConnectionContext) {} + + /// Produce a frame signalling end-of-stream. + /// + /// Implementations should set any protocol-specific flag indicating that + /// no further frames will follow. Returning `None` omits the terminator + /// frame, and the stream ends silently. The `before_send` hook runs for + /// this frame if registered. + fn stream_end_frame(&self, _ctx: &mut ConnectionContext) -> Option { None } } /// Type alias for the `before_send` callback. @@ -67,6 +75,9 @@ type OnCommandEndHook = Box; /// Type alias for the `handle_error` callback. type HandleErrorHook = Box; +/// Type alias for the `stream_end_frame` callback. +type StreamEndHook = Box Option + Send + 'static>; + /// Callbacks used by the connection actor. pub struct ProtocolHooks { /// Invoked when a connection is established. @@ -77,6 +88,8 @@ pub struct ProtocolHooks { pub on_command_end: Option, /// Invoked when a handler returns a protocol error. pub handle_error: Option>, + /// Invoked to construct an end-of-stream frame. + pub stream_end: Option>, } impl Default for ProtocolHooks { @@ -86,6 +99,7 @@ impl Default for ProtocolHooks { before_send: None, on_command_end: None, handle_error: None, + stream_end: None, } } } @@ -118,6 +132,11 @@ impl ProtocolHooks { } } + /// Run the `stream_end_frame` hook if registered. + pub fn stream_end_frame(&mut self, ctx: &mut ConnectionContext) -> Option { + self.stream_end.as_mut().and_then(|hook| hook(ctx)) + } + /// Construct hooks from a [`WireframeProtocol`] implementation. pub fn from_protocol

(protocol: &Arc

) -> Self where @@ -143,11 +162,17 @@ impl ProtocolHooks { protocol_error.handle_error(e, ctx); }) as HandleErrorHook; + let protocol_stream_end = Arc::clone(protocol); + let stream_end = + Box::new(move |ctx: &mut ConnectionContext| protocol_stream_end.stream_end_frame(ctx)) + as StreamEndHook; + Self { on_connection_setup: Some(setup), before_send: Some(before), on_command_end: Some(end), handle_error: Some(err), + stream_end: Some(stream_end), } } } diff --git a/tests/common/terminator.rs b/tests/common/terminator.rs new file mode 100644 index 00000000..92dbf1fc --- /dev/null +++ b/tests/common/terminator.rs @@ -0,0 +1,15 @@ +//! Test protocol that appends a terminator frame at end-of-stream. +//! +//! Used across integration and behavioural tests verifying the stream +//! termination mechanism. +use wireframe::hooks::{ConnectionContext, WireframeProtocol}; + +/// Protocol that produces `0` as an explicit end-of-stream marker. +pub struct Terminator; + +impl WireframeProtocol for Terminator { + type Frame = u8; + type ProtocolError = (); + + fn stream_end_frame(&self, _ctx: &mut ConnectionContext) -> Option { Some(0) } +} diff --git a/tests/cucumber.rs b/tests/cucumber.rs index b922302e..82ac70d2 100644 --- a/tests/cucumber.rs +++ b/tests/cucumber.rs @@ -1,8 +1,9 @@ //! Cucumber test runner for integration tests. //! -//! Orchestrates two distinct test suites: +//! Orchestrates three distinct test suites: //! - `PanicWorld`: Tests server resilience during connection panics //! - `CorrelationWorld`: Tests correlation ID propagation in multi-frame responses +//! - `StreamEndWorld`: Verifies end-of-stream signalling //! //! # Example //! @@ -10,6 +11,7 @@ //! ```text //! tests/features/connection_panic.feature -> PanicWorld context //! tests/features/correlation_id.feature -> CorrelationWorld context +//! tests/features/stream_end.feature -> StreamEndWorld context //! ``` //! //! Each context provides specialised step definitions and state management @@ -19,10 +21,11 @@ mod steps; mod world; use cucumber::World; -use world::{CorrelationWorld, PanicWorld}; +use world::{CorrelationWorld, PanicWorld, StreamEndWorld}; #[tokio::main] async fn main() { PanicWorld::run("tests/features/connection_panic.feature").await; CorrelationWorld::run("tests/features/correlation_id.feature").await; + StreamEndWorld::run("tests/features/stream_end.feature").await; } diff --git a/tests/features/stream_end.feature b/tests/features/stream_end.feature new file mode 100644 index 00000000..e41859d7 --- /dev/null +++ b/tests/features/stream_end.feature @@ -0,0 +1,4 @@ +Feature: Stream terminator frame + Scenario: Connection actor emits terminator after stream + When a streaming response completes + Then an end-of-stream frame is sent diff --git a/tests/steps/mod.rs b/tests/steps/mod.rs index cc3a25ca..f486d2ed 100644 --- a/tests/steps/mod.rs +++ b/tests/steps/mod.rs @@ -5,3 +5,4 @@ mod correlation_steps; mod panic_steps; +mod stream_end_steps; diff --git a/tests/steps/stream_end_steps.rs b/tests/steps/stream_end_steps.rs new file mode 100644 index 00000000..171f2827 --- /dev/null +++ b/tests/steps/stream_end_steps.rs @@ -0,0 +1,10 @@ +//! Steps for stream terminator behavioural tests. +use cucumber::{then, when}; + +use crate::world::StreamEndWorld; + +#[when("a streaming response completes")] +async fn when_stream(world: &mut StreamEndWorld) { world.process().await; } + +#[then("an end-of-stream frame is sent")] +fn then_end(world: &mut StreamEndWorld) { world.verify(); } diff --git a/tests/stream_end.rs b/tests/stream_end.rs new file mode 100644 index 00000000..383e2b72 --- /dev/null +++ b/tests/stream_end.rs @@ -0,0 +1,63 @@ +//! Tests for explicit end-of-stream signalling. +use std::sync::Arc; + +use async_stream::try_stream; +use rstest::rstest; +use tokio_util::sync::CancellationToken; +use wireframe::{ + connection::ConnectionActor, + hooks::{ConnectionContext, ProtocolHooks, WireframeProtocol}, + push::PushQueues, + response::FrameStream, +}; + +#[path = "common/terminator.rs"] +mod terminator; +use terminator::Terminator; + +#[rstest] +#[tokio::test] +async fn emits_end_frame() { + let stream: FrameStream = Box::pin(try_stream! { + yield 1; + yield 2; + }); + + let (queues, handle) = PushQueues::bounded(1, 1); + let shutdown = CancellationToken::new(); + let hooks = ProtocolHooks::from_protocol(&Arc::new(Terminator)); + let mut actor = ConnectionActor::with_hooks(queues, handle, Some(stream), shutdown, hooks); + + let mut out = Vec::new(); + actor.run(&mut out).await.expect("actor run failed"); + + assert_eq!(out, vec![1, 2, 0]); +} + +#[rstest] +#[tokio::test] +async fn emits_no_end_frame_when_none() { + struct NoTerminator; + + impl WireframeProtocol for NoTerminator { + type Frame = u8; + type ProtocolError = (); + + fn stream_end_frame(&self, _ctx: &mut ConnectionContext) -> Option { None } + } + + let stream: FrameStream = Box::pin(try_stream! { + yield 7; + yield 8; + }); + + let (queues, handle) = PushQueues::bounded(1, 1); + let shutdown = CancellationToken::new(); + let hooks = ProtocolHooks::from_protocol(&Arc::new(NoTerminator)); + let mut actor = ConnectionActor::with_hooks(queues, handle, Some(stream), shutdown, hooks); + + let mut out = Vec::new(); + actor.run(&mut out).await.expect("actor run failed"); + + assert_eq!(out, vec![7, 8]); +} diff --git a/tests/world.rs b/tests/world.rs index 168b1fed..9162a876 100644 --- a/tests/world.rs +++ b/tests/world.rs @@ -3,7 +3,7 @@ //! Provides shared state management for behavioural tests verifying //! server resilience against connection task panics. -use std::net::SocketAddr; +use std::{net::SocketAddr, sync::Arc}; use async_stream::try_stream; use cucumber::World; @@ -12,6 +12,7 @@ use tokio_util::sync::CancellationToken; use wireframe::{ app::{Envelope, Packet, WireframeApp}, connection::ConnectionActor, + hooks::ProtocolHooks, push::PushQueues, response::FrameStream, server::WireframeServer, @@ -20,6 +21,9 @@ use wireframe::{ #[path = "common/mod.rs"] mod common; use common::unused_listener; +#[path = "common/terminator.rs"] +mod terminator; +use terminator::Terminator; #[derive(Debug)] struct PanicServer { @@ -157,3 +161,37 @@ impl CorrelationWorld { ); } } + +/// Cucumber world that captures frames from a streaming response and verifies +/// that a protocol-provided terminator frame is appended at end-of-stream. +#[derive(Debug, Default, World)] +pub struct StreamEndWorld { + frames: Vec, +} + +impl StreamEndWorld { + /// Run the connection actor and record emitted frames. + /// + /// # Panics + /// Panics if the actor fails to run successfully. + pub async fn process(&mut self) { + let stream: FrameStream = Box::pin(try_stream! { + yield 1u8; + yield 2u8; + }); + + let (queues, handle) = PushQueues::bounded(1, 1); + let shutdown = CancellationToken::new(); + let hooks = ProtocolHooks::from_protocol(&Arc::new(Terminator)); + let mut actor = ConnectionActor::with_hooks(queues, handle, Some(stream), shutdown, hooks); + actor.run(&mut self.frames).await.expect("actor run failed"); + } + + /// Verify that a terminator frame was appended to the stream. + /// + /// # Panics + /// Panics if the expected terminator is missing. + pub fn verify(&self) { + assert_eq!(self.frames, vec![1, 2, 0]); + } +}