From ab109a7af47986e299a15ab1bf55d191c7974e23 Mon Sep 17 00:00:00 2001 From: Leynos Date: Thu, 14 Aug 2025 20:59:56 +0100 Subject: [PATCH 1/5] Signal end of stream with terminator frame --- ...eframe-a-guide-to-production-resilience.md | 5 +++ docs/roadmap.md | 2 +- ...-set-philosophy-and-capability-maturity.md | 5 +++ src/connection.rs | 19 +++++--- src/hooks.rs | 24 ++++++++++ tests/cucumber.rs | 7 ++- tests/features/stream_end.feature | 4 ++ tests/steps/mod.rs | 1 + tests/steps/stream_end_steps.rs | 10 +++++ tests/stream_end.rs | 40 +++++++++++++++++ tests/world.rs | 44 ++++++++++++++++++- 11 files changed, 152 insertions(+), 9 deletions(-) create mode 100644 tests/features/stream_end.feature create mode 100644 tests/steps/stream_end_steps.rs create mode 100644 tests/stream_end.rs diff --git a/docs/hardening-wireframe-a-guide-to-production-resilience.md b/docs/hardening-wireframe-a-guide-to-production-resilience.md index 7f41c0f4..10162d6a 100644 --- a/docs/hardening-wireframe-a-guide-to-production-resilience.md +++ b/docs/hardening-wireframe-a-guide-to-production-resilience.md @@ -322,6 +322,11 @@ This allows the protocol implementation to serialize a proper error frame (e.g., an SQL error code) to send to the client before terminating the current operation, rather than just abruptly closing the connection. +When a stream concludes successfully, the connection actor calls the +`stream_end_frame` hook to produce a terminator frame with no payload. This +explicit marker lets clients recognise that the logical stream has ended and +helps avoid lingering resources or stalled state machines. + ### 5.2 Dead Letter Queues (DLQ) for Guaranteed Pushing In some systems (e.g., financial transactions, critical audit logs), dropping a diff --git a/docs/roadmap.md b/docs/roadmap.md index 729a2c36..6d06e610 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -189,7 +189,7 @@ stream. is the unique request ID. For each message in a multi-packet response, this ID must match the original request's ID. - - [ ] Define a mechanism to signal the end of a multi-packet stream, such as + - [x] Define a mechanism to signal the end of a multi-packet stream, such as a frame with a specific flag and no payload. - [ ] **Core Library Implementation:** diff --git a/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md b/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md index e5d44ab2..589186a2 100644 --- a/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md +++ b/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md @@ -89,6 +89,11 @@ async fn handle_large_query(req: Request) -> io::Result> { See `examples/async_stream.rs` for a runnable demonstration of this pattern. +Completion of a streaming response is signalled by a protocol-defined +terminator frame. The new `stream_end_frame` hook allows implementations to +emit a frame with an explicit end-of-stream flag and no payload, ensuring +clients can unambiguously detect when a logical stream has finished. + #### The Connection Actor The underlying engine for this duplex communication is the diff --git a/src/connection.rs b/src/connection.rs index 9d7a11d8..fa840863 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -342,10 +342,9 @@ where state: &mut ActorState, out: &mut Vec, ) -> Result<(), WireframeError> { - let processed = matches!(res, Some(Ok(_))); let is_none = res.is_none(); - self.handle_response(res, state, out)?; - if processed { + let produced = self.handle_response(res, state, out)?; + if produced { self.after_low(); } if is_none { @@ -396,17 +395,21 @@ where /// /// Protocol errors are passed to `handle_error` and do not terminate the /// actor. I/O errors propagate to the caller. + /// + /// Returns `true` if a frame was appended to `out`. fn handle_response( &mut self, res: Option>>, state: &mut ActorState, out: &mut Vec, - ) -> Result<(), WireframeError> { + ) -> Result> { + let mut produced = false; match res { Some(Ok(mut frame)) => { self.hooks.before_send(&mut frame, &mut self.ctx); out.push(frame); crate::metrics::inc_frames(crate::metrics::Direction::Outbound); + produced = true; } Some(Err(WireframeError::Protocol(e))) => { warn!(error = ?e, "protocol error"); @@ -418,11 +421,17 @@ where Some(Err(e)) => return Err(e), None => { state.mark_closed(); + if let Some(mut frame) = self.hooks.stream_end(&mut self.ctx) { + self.hooks.before_send(&mut frame, &mut self.ctx); + out.push(frame); + crate::metrics::inc_frames(crate::metrics::Direction::Outbound); + produced = true; + } self.hooks.on_command_end(&mut self.ctx); } } - Ok(()) + Ok(produced) } /// Await cancellation on the provided shutdown token. diff --git a/src/hooks.rs b/src/hooks.rs index f7343777..dbf47753 100644 --- a/src/hooks.rs +++ b/src/hooks.rs @@ -52,6 +52,13 @@ pub trait WireframeProtocol: Send + Sync + 'static { /// } /// ``` fn handle_error(&self, _error: Self::ProtocolError, _ctx: &mut ConnectionContext) {} + + /// Produce a frame signalling the end of a streaming response. + /// + /// Implementations should set any protocol-specific flag indicating that + /// no further frames will follow. Returning `None` omits the terminator + /// frame. + fn stream_end_frame(&self, _ctx: &mut ConnectionContext) -> Option { None } } /// Type alias for the `before_send` callback. @@ -67,6 +74,9 @@ type OnCommandEndHook = Box; /// Type alias for the `handle_error` callback. type HandleErrorHook = Box; +/// Type alias for the `stream_end_frame` callback. +type StreamEndHook = Box Option + Send + 'static>; + /// Callbacks used by the connection actor. pub struct ProtocolHooks { /// Invoked when a connection is established. @@ -77,6 +87,8 @@ pub struct ProtocolHooks { pub on_command_end: Option, /// Invoked when a handler returns a protocol error. pub handle_error: Option>, + /// Invoked to construct an end-of-stream frame. + pub stream_end: Option>, } impl Default for ProtocolHooks { @@ -86,6 +98,7 @@ impl Default for ProtocolHooks { before_send: None, on_command_end: None, handle_error: None, + stream_end: None, } } } @@ -118,6 +131,11 @@ impl ProtocolHooks { } } + /// Run the `stream_end` hook if registered. + pub fn stream_end(&mut self, ctx: &mut ConnectionContext) -> Option { + self.stream_end.as_mut().and_then(|hook| hook(ctx)) + } + /// Construct hooks from a [`WireframeProtocol`] implementation. pub fn from_protocol

(protocol: &Arc

) -> Self where @@ -143,11 +161,17 @@ impl ProtocolHooks { protocol_error.handle_error(e, ctx); }) as HandleErrorHook; + let protocol_stream_end = Arc::clone(protocol); + let stream_end = + Box::new(move |ctx: &mut ConnectionContext| protocol_stream_end.stream_end_frame(ctx)) + as StreamEndHook; + Self { on_connection_setup: Some(setup), before_send: Some(before), on_command_end: Some(end), handle_error: Some(err), + stream_end: Some(stream_end), } } } diff --git a/tests/cucumber.rs b/tests/cucumber.rs index b922302e..82ac70d2 100644 --- a/tests/cucumber.rs +++ b/tests/cucumber.rs @@ -1,8 +1,9 @@ //! Cucumber test runner for integration tests. //! -//! Orchestrates two distinct test suites: +//! Orchestrates three distinct test suites: //! - `PanicWorld`: Tests server resilience during connection panics //! - `CorrelationWorld`: Tests correlation ID propagation in multi-frame responses +//! - `StreamEndWorld`: Verifies end-of-stream signalling //! //! # Example //! @@ -10,6 +11,7 @@ //! ```text //! tests/features/connection_panic.feature -> PanicWorld context //! tests/features/correlation_id.feature -> CorrelationWorld context +//! tests/features/stream_end.feature -> StreamEndWorld context //! ``` //! //! Each context provides specialised step definitions and state management @@ -19,10 +21,11 @@ mod steps; mod world; use cucumber::World; -use world::{CorrelationWorld, PanicWorld}; +use world::{CorrelationWorld, PanicWorld, StreamEndWorld}; #[tokio::main] async fn main() { PanicWorld::run("tests/features/connection_panic.feature").await; CorrelationWorld::run("tests/features/correlation_id.feature").await; + StreamEndWorld::run("tests/features/stream_end.feature").await; } diff --git a/tests/features/stream_end.feature b/tests/features/stream_end.feature new file mode 100644 index 00000000..e41859d7 --- /dev/null +++ b/tests/features/stream_end.feature @@ -0,0 +1,4 @@ +Feature: Stream terminator frame + Scenario: Connection actor emits terminator after stream + When a streaming response completes + Then an end-of-stream frame is sent diff --git a/tests/steps/mod.rs b/tests/steps/mod.rs index cc3a25ca..f486d2ed 100644 --- a/tests/steps/mod.rs +++ b/tests/steps/mod.rs @@ -5,3 +5,4 @@ mod correlation_steps; mod panic_steps; +mod stream_end_steps; diff --git a/tests/steps/stream_end_steps.rs b/tests/steps/stream_end_steps.rs new file mode 100644 index 00000000..171f2827 --- /dev/null +++ b/tests/steps/stream_end_steps.rs @@ -0,0 +1,10 @@ +//! Steps for stream terminator behavioural tests. +use cucumber::{then, when}; + +use crate::world::StreamEndWorld; + +#[when("a streaming response completes")] +async fn when_stream(world: &mut StreamEndWorld) { world.process().await; } + +#[then("an end-of-stream frame is sent")] +fn then_end(world: &mut StreamEndWorld) { world.verify(); } diff --git a/tests/stream_end.rs b/tests/stream_end.rs new file mode 100644 index 00000000..44c1a9fe --- /dev/null +++ b/tests/stream_end.rs @@ -0,0 +1,40 @@ +//! Tests for explicit end-of-stream signalling. +use std::sync::Arc; + +use async_stream::try_stream; +use rstest::rstest; +use tokio_util::sync::CancellationToken; +use wireframe::{ + connection::ConnectionActor, + hooks::{ConnectionContext, ProtocolHooks, WireframeProtocol}, + push::PushQueues, + response::FrameStream, +}; + +struct TerminatorProto; + +impl WireframeProtocol for TerminatorProto { + type Frame = u8; + type ProtocolError = (); + + fn stream_end_frame(&self, _ctx: &mut ConnectionContext) -> Option { Some(0) } +} + +#[rstest] +#[tokio::test] +async fn emits_end_frame() { + let stream: FrameStream = Box::pin(try_stream! { + yield 1; + yield 2; + }); + + let (queues, handle) = PushQueues::bounded(1, 1); + let shutdown = CancellationToken::new(); + let hooks = ProtocolHooks::from_protocol(&Arc::new(TerminatorProto)); + let mut actor = ConnectionActor::with_hooks(queues, handle, Some(stream), shutdown, hooks); + + let mut out = Vec::new(); + actor.run(&mut out).await.expect("actor run failed"); + + assert_eq!(out, vec![1, 2, 0]); +} diff --git a/tests/world.rs b/tests/world.rs index 168b1fed..62aba49d 100644 --- a/tests/world.rs +++ b/tests/world.rs @@ -3,7 +3,7 @@ //! Provides shared state management for behavioural tests verifying //! server resilience against connection task panics. -use std::net::SocketAddr; +use std::{net::SocketAddr, sync::Arc}; use async_stream::try_stream; use cucumber::World; @@ -12,6 +12,7 @@ use tokio_util::sync::CancellationToken; use wireframe::{ app::{Envelope, Packet, WireframeApp}, connection::ConnectionActor, + hooks::{ConnectionContext, ProtocolHooks, WireframeProtocol}, push::PushQueues, response::FrameStream, server::WireframeServer, @@ -157,3 +158,44 @@ impl CorrelationWorld { ); } } + +#[derive(Debug, Default, World)] +pub struct StreamEndWorld { + frames: Vec, +} + +struct Terminator; + +impl WireframeProtocol for Terminator { + type Frame = u8; + type ProtocolError = (); + + fn stream_end_frame(&self, _ctx: &mut ConnectionContext) -> Option { + Some(0) + } +} + +impl StreamEndWorld { + /// Run the connection actor and record emitted frames. + /// + /// # Panics + /// Panics if the actor fails to run successfully. + pub async fn process(&mut self) { + let stream: FrameStream = Box::pin(try_stream! { + yield 1u8; + yield 2u8; + }); + + let (queues, handle) = PushQueues::bounded(1, 1); + let shutdown = CancellationToken::new(); + let hooks = ProtocolHooks::from_protocol(&Arc::new(Terminator)); + let mut actor = ConnectionActor::with_hooks(queues, handle, Some(stream), shutdown, hooks); + actor.run(&mut self.frames).await.expect("actor run failed"); + } + + /// Verify that a terminator frame was appended to the stream. + /// + /// # Panics + /// Panics if the expected terminator is missing. + pub fn verify(&self) { assert_eq!(self.frames, vec![1, 2, 0]); } +} From 63cb2d9d33b72bb19adc9ee9baf1d184d69ea2b0 Mon Sep 17 00:00:00 2001 From: Leynos Date: Fri, 15 Aug 2025 11:40:56 +0100 Subject: [PATCH 2/5] Format StreamEndWorld for rustfmt compliance --- tests/world.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/world.rs b/tests/world.rs index 62aba49d..c98db5b8 100644 --- a/tests/world.rs +++ b/tests/world.rs @@ -170,9 +170,7 @@ impl WireframeProtocol for Terminator { type Frame = u8; type ProtocolError = (); - fn stream_end_frame(&self, _ctx: &mut ConnectionContext) -> Option { - Some(0) - } + fn stream_end_frame(&self, _ctx: &mut ConnectionContext) -> Option { Some(0) } } impl StreamEndWorld { @@ -197,5 +195,7 @@ impl StreamEndWorld { /// /// # Panics /// Panics if the expected terminator is missing. - pub fn verify(&self) { assert_eq!(self.frames, vec![1, 2, 0]); } + pub fn verify(&self) { + assert_eq!(self.frames, vec![1, 2, 0]); + } } From 5abd37f15b13d704755c06b895face877adb2f49 Mon Sep 17 00:00:00 2001 From: Leynos Date: Fri, 15 Aug 2025 12:06:10 +0100 Subject: [PATCH 3/5] Clear response on protocol errors --- src/connection.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/connection.rs b/src/connection.rs index fa840863..f1d49eb1 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -415,6 +415,9 @@ where warn!(error = ?e, "protocol error"); self.hooks.handle_error(e, &mut self.ctx); state.mark_closed(); + // Stop polling the response after a protocol error to avoid + // double-closing and duplicate `on_command_end` signalling. + self.response = None; self.hooks.on_command_end(&mut self.ctx); crate::metrics::inc_handler_errors(); } From e1fdd0e109c42fc3ce57cb20e8d063e7e7a0131e Mon Sep 17 00:00:00 2001 From: Leynos Date: Fri, 15 Aug 2025 14:23:33 +0100 Subject: [PATCH 4/5] Clarify stream terminators and expand tests --- ...eframe-a-guide-to-production-resilience.md | 5 ++- ...-set-philosophy-and-capability-maturity.md | 3 ++ src/hooks.rs | 9 ++-- tests/common/terminator.rs | 15 +++++++ tests/stream_end.rs | 41 +++++++++++++++---- tests/world.rs | 16 +++----- 6 files changed, 65 insertions(+), 24 deletions(-) create mode 100644 tests/common/terminator.rs diff --git a/docs/hardening-wireframe-a-guide-to-production-resilience.md b/docs/hardening-wireframe-a-guide-to-production-resilience.md index 10162d6a..78a6ce68 100644 --- a/docs/hardening-wireframe-a-guide-to-production-resilience.md +++ b/docs/hardening-wireframe-a-guide-to-production-resilience.md @@ -325,7 +325,10 @@ operation, rather than just abruptly closing the connection. When a stream concludes successfully, the connection actor calls the `stream_end_frame` hook to produce a terminator frame with no payload. This explicit marker lets clients recognise that the logical stream has ended and -helps avoid lingering resources or stalled state machines. +helps avoid lingering resources or stalled state machines. The terminator is +only appended if the protocol supplies one (that is, the hook returns +`Some(frame)`), and the frame passes through the `before_send` hook like any +other, allowing final mutation or metadata to be applied consistently. ### 5.2 Dead Letter Queues (DLQ) for Guaranteed Pushing diff --git a/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md b/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md index 589186a2..7a70c955 100644 --- a/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md +++ b/docs/the-road-to-wireframe-1-0-feature-set-philosophy-and-capability-maturity.md @@ -94,6 +94,9 @@ terminator frame. The new `stream_end_frame` hook allows implementations to emit a frame with an explicit end-of-stream flag and no payload, ensuring clients can unambiguously detect when a logical stream has finished. +If the hook returns `None`, no explicit terminator frame is sent and the stream +ends silently; clients detect completion by the normal end-of-stream condition. + #### The Connection Actor The underlying engine for this duplex communication is the diff --git a/src/hooks.rs b/src/hooks.rs index dbf47753..ecfa3f61 100644 --- a/src/hooks.rs +++ b/src/hooks.rs @@ -53,11 +53,12 @@ pub trait WireframeProtocol: Send + Sync + 'static { /// ``` fn handle_error(&self, _error: Self::ProtocolError, _ctx: &mut ConnectionContext) {} - /// Produce a frame signalling the end of a streaming response. + /// Optionally produce a frame signalling the end of a streaming response. /// - /// Implementations should set any protocol-specific flag indicating that - /// no further frames will follow. Returning `None` omits the terminator - /// frame. + /// Implementations may set protocol-specific flags indicating that no + /// further frames will follow. Returning `None` omits the terminator frame, + /// and the stream ends silently. Any produced frame is passed through + /// [`before_send`][WireframeProtocol::before_send] prior to emission. fn stream_end_frame(&self, _ctx: &mut ConnectionContext) -> Option { None } } diff --git a/tests/common/terminator.rs b/tests/common/terminator.rs new file mode 100644 index 00000000..92dbf1fc --- /dev/null +++ b/tests/common/terminator.rs @@ -0,0 +1,15 @@ +//! Test protocol that appends a terminator frame at end-of-stream. +//! +//! Used across integration and behavioural tests verifying the stream +//! termination mechanism. +use wireframe::hooks::{ConnectionContext, WireframeProtocol}; + +/// Protocol that produces `0` as an explicit end-of-stream marker. +pub struct Terminator; + +impl WireframeProtocol for Terminator { + type Frame = u8; + type ProtocolError = (); + + fn stream_end_frame(&self, _ctx: &mut ConnectionContext) -> Option { Some(0) } +} diff --git a/tests/stream_end.rs b/tests/stream_end.rs index 44c1a9fe..383e2b72 100644 --- a/tests/stream_end.rs +++ b/tests/stream_end.rs @@ -11,14 +11,9 @@ use wireframe::{ response::FrameStream, }; -struct TerminatorProto; - -impl WireframeProtocol for TerminatorProto { - type Frame = u8; - type ProtocolError = (); - - fn stream_end_frame(&self, _ctx: &mut ConnectionContext) -> Option { Some(0) } -} +#[path = "common/terminator.rs"] +mod terminator; +use terminator::Terminator; #[rstest] #[tokio::test] @@ -30,7 +25,7 @@ async fn emits_end_frame() { let (queues, handle) = PushQueues::bounded(1, 1); let shutdown = CancellationToken::new(); - let hooks = ProtocolHooks::from_protocol(&Arc::new(TerminatorProto)); + let hooks = ProtocolHooks::from_protocol(&Arc::new(Terminator)); let mut actor = ConnectionActor::with_hooks(queues, handle, Some(stream), shutdown, hooks); let mut out = Vec::new(); @@ -38,3 +33,31 @@ async fn emits_end_frame() { assert_eq!(out, vec![1, 2, 0]); } + +#[rstest] +#[tokio::test] +async fn emits_no_end_frame_when_none() { + struct NoTerminator; + + impl WireframeProtocol for NoTerminator { + type Frame = u8; + type ProtocolError = (); + + fn stream_end_frame(&self, _ctx: &mut ConnectionContext) -> Option { None } + } + + let stream: FrameStream = Box::pin(try_stream! { + yield 7; + yield 8; + }); + + let (queues, handle) = PushQueues::bounded(1, 1); + let shutdown = CancellationToken::new(); + let hooks = ProtocolHooks::from_protocol(&Arc::new(NoTerminator)); + let mut actor = ConnectionActor::with_hooks(queues, handle, Some(stream), shutdown, hooks); + + let mut out = Vec::new(); + actor.run(&mut out).await.expect("actor run failed"); + + assert_eq!(out, vec![7, 8]); +} diff --git a/tests/world.rs b/tests/world.rs index c98db5b8..9162a876 100644 --- a/tests/world.rs +++ b/tests/world.rs @@ -12,7 +12,7 @@ use tokio_util::sync::CancellationToken; use wireframe::{ app::{Envelope, Packet, WireframeApp}, connection::ConnectionActor, - hooks::{ConnectionContext, ProtocolHooks, WireframeProtocol}, + hooks::ProtocolHooks, push::PushQueues, response::FrameStream, server::WireframeServer, @@ -21,6 +21,9 @@ use wireframe::{ #[path = "common/mod.rs"] mod common; use common::unused_listener; +#[path = "common/terminator.rs"] +mod terminator; +use terminator::Terminator; #[derive(Debug)] struct PanicServer { @@ -159,20 +162,13 @@ impl CorrelationWorld { } } +/// Cucumber world that captures frames from a streaming response and verifies +/// that a protocol-provided terminator frame is appended at end-of-stream. #[derive(Debug, Default, World)] pub struct StreamEndWorld { frames: Vec, } -struct Terminator; - -impl WireframeProtocol for Terminator { - type Frame = u8; - type ProtocolError = (); - - fn stream_end_frame(&self, _ctx: &mut ConnectionContext) -> Option { Some(0) } -} - impl StreamEndWorld { /// Run the connection actor and record emitted frames. /// From 6fc0b6cee6a9b3106eb007e745702eae8e714d5c Mon Sep 17 00:00:00 2001 From: Leynos Date: Fri, 15 Aug 2025 17:22:10 +0100 Subject: [PATCH 5/5] Rename stream_end hook method --- src/connection.rs | 2 +- src/hooks.rs | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index f1d49eb1..ca5b50a4 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -424,7 +424,7 @@ where Some(Err(e)) => return Err(e), None => { state.mark_closed(); - if let Some(mut frame) = self.hooks.stream_end(&mut self.ctx) { + if let Some(mut frame) = self.hooks.stream_end_frame(&mut self.ctx) { self.hooks.before_send(&mut frame, &mut self.ctx); out.push(frame); crate::metrics::inc_frames(crate::metrics::Direction::Outbound); diff --git a/src/hooks.rs b/src/hooks.rs index ecfa3f61..7c61d601 100644 --- a/src/hooks.rs +++ b/src/hooks.rs @@ -53,12 +53,12 @@ pub trait WireframeProtocol: Send + Sync + 'static { /// ``` fn handle_error(&self, _error: Self::ProtocolError, _ctx: &mut ConnectionContext) {} - /// Optionally produce a frame signalling the end of a streaming response. + /// Produce a frame signalling end-of-stream. /// - /// Implementations may set protocol-specific flags indicating that no - /// further frames will follow. Returning `None` omits the terminator frame, - /// and the stream ends silently. Any produced frame is passed through - /// [`before_send`][WireframeProtocol::before_send] prior to emission. + /// Implementations should set any protocol-specific flag indicating that + /// no further frames will follow. Returning `None` omits the terminator + /// frame, and the stream ends silently. The `before_send` hook runs for + /// this frame if registered. fn stream_end_frame(&self, _ctx: &mut ConnectionContext) -> Option { None } } @@ -132,8 +132,8 @@ impl ProtocolHooks { } } - /// Run the `stream_end` hook if registered. - pub fn stream_end(&mut self, ctx: &mut ConnectionContext) -> Option { + /// Run the `stream_end_frame` hook if registered. + pub fn stream_end_frame(&mut self, ctx: &mut ConnectionContext) -> Option { self.stream_end.as_mut().and_then(|hook| hook(ctx)) }