From b1724b72d36dfe15390ab776ef19c805e7eec2ae Mon Sep 17 00:00:00 2001 From: Leynos Date: Sat, 28 Jun 2025 22:27:16 +0100 Subject: [PATCH 1/4] Add WireframeProtocol trait and builder integration --- src/app.rs | 32 ++++++++++++++ src/connection.rs | 14 ++++--- src/hooks.rs | 67 ++++++++++++++++++++++++++---- src/lib.rs | 2 +- tests/connection_actor.rs | 6 +-- tests/wireframe_protocol.rs | 83 +++++++++++++++++++++++++++++++++++++ 6 files changed, 185 insertions(+), 19 deletions(-) create mode 100644 tests/wireframe_protocol.rs diff --git a/src/app.rs b/src/app.rs index 4dd31b83..9ee8fb5d 100644 --- a/src/app.rs +++ b/src/app.rs @@ -18,6 +18,7 @@ use tokio::io::{self, AsyncWrite, AsyncWriteExt}; use crate::{ frame::{FrameProcessor, LengthFormat, LengthPrefixedProcessor}, + hooks::{ProtocolHooks, WireframeProtocol}, message::Message, middleware::{HandlerService, Service, ServiceRequest, Transform}, serializer::{BincodeSerializer, Serializer}, @@ -80,6 +81,7 @@ pub struct WireframeApp< app_data: HashMap>, on_connect: Option>>, on_disconnect: Option>>, + protocol: Option, ProtocolError = ()>>>, } /// Alias for asynchronous route handlers. @@ -235,6 +237,7 @@ where app_data: HashMap::new(), on_connect: None, on_disconnect: None, + protocol: None, } } } @@ -360,6 +363,7 @@ where app_data: self.app_data, on_connect: Some(Arc::new(move || Box::pin(f()))), on_disconnect: None, + protocol: self.protocol, }) } @@ -381,6 +385,33 @@ where Ok(self) } + /// Install a [`WireframeProtocol`] implementation. + #[must_use] + pub fn with_protocol

(mut self, protocol: P) -> Self + where + P: WireframeProtocol, ProtocolError = ()> + 'static, + { + self.protocol = Some(Arc::new(protocol)); + self + } + + /// Get a clone of the configured protocol, if any. + #[must_use] + pub fn protocol( + &self, + ) -> Option, ProtocolError = ()>>> { + self.protocol.as_ref().map(Arc::clone) + } + + /// Return protocol hooks derived from the installed protocol. + #[must_use] + pub fn protocol_hooks(&self) -> ProtocolHooks> { + self.protocol + .as_ref() + .map(|p| ProtocolHooks::from_protocol(Arc::clone(p))) + .unwrap_or_default() + } + /// Set the frame processor used for encoding and decoding frames. #[must_use] pub fn frame_processor

(mut self, processor: P) -> Self @@ -406,6 +437,7 @@ where app_data: self.app_data, on_connect: self.on_connect, on_disconnect: self.on_disconnect, + protocol: self.protocol, } } diff --git a/src/connection.rs b/src/connection.rs index 20fd423a..6b9962a6 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -13,7 +13,7 @@ use tokio::{ use tokio_util::sync::CancellationToken; use crate::{ - hooks::ProtocolHooks, + hooks::{ConnectionContext, ProtocolHooks}, push::{FrameLike, PushQueues}, response::{FrameStream, WireframeError}, }; @@ -59,6 +59,7 @@ pub struct ConnectionActor { response: Option>, // current streaming response shutdown: CancellationToken, hooks: ProtocolHooks, + ctx: ConnectionContext, fairness: FairnessConfig, high_counter: usize, high_start: Option, @@ -104,6 +105,7 @@ where response, shutdown, hooks, + ctx: ConnectionContext, fairness: FairnessConfig::default(), high_counter: 0, high_start: None, @@ -208,7 +210,7 @@ where /// Handle the result of polling the high-priority queue. fn process_high(&mut self, res: Option, state: &mut ActorState, out: &mut Vec) { if let Some(mut frame) = res { - self.hooks.before_send(&mut frame); + self.hooks.before_send(&mut frame, &mut self.ctx); out.push(frame); self.after_high(out, state); } else { @@ -221,7 +223,7 @@ where /// Handle the result of polling the low-priority queue. fn process_low(&mut self, res: Option, state: &mut ActorState, out: &mut Vec) { if let Some(mut frame) = res { - self.hooks.before_send(&mut frame); + self.hooks.before_send(&mut frame, &mut self.ctx); out.push(frame); self.after_low(); } else { @@ -274,7 +276,7 @@ where { match rx.try_recv() { Ok(mut frame) => { - self.hooks.before_send(&mut frame); + self.hooks.before_send(&mut frame, &mut self.ctx); out.push(frame); self.after_low(); } @@ -317,13 +319,13 @@ where ) -> Result<(), WireframeError> { match res { Some(Ok(mut frame)) => { - self.hooks.before_send(&mut frame); + self.hooks.before_send(&mut frame, &mut self.ctx); out.push(frame); } Some(Err(e)) => return Err(e), None => { state.mark_closed(); - self.hooks.on_command_end(); + self.hooks.on_command_end(&mut self.ctx); } } diff --git a/src/hooks.rs b/src/hooks.rs index 825e1a92..4fd86c02 100644 --- a/src/hooks.rs +++ b/src/hooks.rs @@ -1,14 +1,42 @@ //! Internal protocol hooks called by the connection actor. //! -//! This module defines [`ProtocolHooks`], a container for optional callback -//! functions invoked during connection output. The hooks are placeholders for -//! the future `WireframeProtocol` trait described in the design documents. +//! This module defines [`ProtocolHooks`] along with the public +//! [`WireframeProtocol`] trait. `ProtocolHooks` stores optional callbacks +//! invoked during connection output. Applications configure these callbacks via +//! an implementation of [`WireframeProtocol`]. + +use std::sync::Arc; + +use crate::push::{FrameLike, PushHandle}; + +/// Per-connection state passed to protocol callbacks. +#[derive(Default)] +pub struct ConnectionContext; + +/// Trait encapsulating protocol-specific logic and callbacks. +pub trait WireframeProtocol: Send + Sync + 'static { + /// Frame type written to the socket. + type Frame: FrameLike; + /// Custom error type for protocol operations. + type ProtocolError; + + /// Called once when a new connection is established. The provided + /// [`PushHandle`] may be stored by the implementation to enable + /// asynchronous server pushes. + fn on_connection_setup(&self, _handle: PushHandle, _ctx: &mut ConnectionContext) {} + + /// Invoked before any frame (push or response) is written to the socket. + fn before_send(&self, _frame: &mut Self::Frame, _ctx: &mut ConnectionContext) {} + + /// Invoked when a request/response cycle completes. + fn on_command_end(&self, _ctx: &mut ConnectionContext) {} +} /// Type alias for the `before_send` callback. -type BeforeSendHook = Box; +type BeforeSendHook = Box; /// Type alias for the `on_command_end` callback. -type OnCommandEndHook = Box; +type OnCommandEndHook = Box; /// Callbacks used by the connection actor. pub struct ProtocolHooks { @@ -29,16 +57,37 @@ impl Default for ProtocolHooks { impl ProtocolHooks { /// Run the `before_send` hook if registered. - pub fn before_send(&mut self, frame: &mut F) { + pub fn before_send(&mut self, frame: &mut F, ctx: &mut ConnectionContext) { if let Some(hook) = &mut self.before_send { - hook(frame); + hook(frame, ctx); } } /// Run the `on_command_end` hook if registered. - pub fn on_command_end(&mut self) { + pub fn on_command_end(&mut self, ctx: &mut ConnectionContext) { if let Some(hook) = &mut self.on_command_end { - hook(); + hook(ctx); + } + } + + /// Construct hooks from a [`WireframeProtocol`] implementation. + pub fn from_protocol

(protocol: Arc

) -> Self + where + P: WireframeProtocol + ?Sized, + { + let before = { + let p = Arc::clone(&protocol); + Box::new(move |frame: &mut F, ctx: &mut ConnectionContext| { + p.before_send(frame, ctx); + }) as BeforeSendHook + }; + let end = Box::new(move |ctx: &mut ConnectionContext| { + protocol.on_command_end(ctx); + }) as OnCommandEndHook; + + Self { + before_send: Some(before), + on_command_end: Some(end), } } } diff --git a/src/lib.rs b/src/lib.rs index ddb652e0..a553add2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,5 +20,5 @@ pub mod rewind_stream; pub mod server; pub use connection::ConnectionActor; -pub use hooks::ProtocolHooks; +pub use hooks::{ConnectionContext, ProtocolHooks, WireframeProtocol}; pub use response::{FrameStream, Response, WireframeError}; diff --git a/tests/connection_actor.rs b/tests/connection_actor.rs index 7cbd0f17..8ee81ff3 100644 --- a/tests/connection_actor.rs +++ b/tests/connection_actor.rs @@ -178,7 +178,7 @@ use std::sync::{ atomic::{AtomicUsize, Ordering}, }; -use wireframe::ProtocolHooks; +use wireframe::{ConnectionContext, ProtocolHooks}; #[rstest] #[tokio::test] @@ -192,7 +192,7 @@ async fn before_send_hook_modifies_frames( let stream = stream::iter(vec![Ok(2u8)]); let hooks = ProtocolHooks { - before_send: Some(Box::new(|f: &mut u8| *f += 1)), + before_send: Some(Box::new(|f: &mut u8, _ctx: &mut ConnectionContext| *f += 1)), ..ProtocolHooks::default() }; @@ -216,7 +216,7 @@ async fn on_command_end_hook_runs( let counter = Arc::new(AtomicUsize::new(0)); let c = counter.clone(); let hooks = ProtocolHooks { - on_command_end: Some(Box::new(move || { + on_command_end: Some(Box::new(move |_ctx: &mut ConnectionContext| { c.fetch_add(1, Ordering::SeqCst); })), ..ProtocolHooks::default() diff --git a/tests/wireframe_protocol.rs b/tests/wireframe_protocol.rs new file mode 100644 index 00000000..95e6b84a --- /dev/null +++ b/tests/wireframe_protocol.rs @@ -0,0 +1,83 @@ +use std::sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, +}; + +use futures::stream; +use rstest::rstest; +use tokio_util::sync::CancellationToken; +use wireframe::{ + ConnectionContext, + WireframeProtocol, + app::WireframeApp, + connection::ConnectionActor, + push::PushQueues, +}; + +struct TestProtocol { + counter: Arc, +} + +impl WireframeProtocol for TestProtocol { + type Frame = Vec; + type ProtocolError = (); + + fn on_connection_setup( + &self, + _handle: wireframe::push::PushHandle, + _ctx: &mut ConnectionContext, + ) { + self.counter.fetch_add(1, Ordering::SeqCst); + } + + fn before_send(&self, frame: &mut Self::Frame, _ctx: &mut ConnectionContext) { frame.push(1); } + + fn on_command_end(&self, _ctx: &mut ConnectionContext) { + self.counter.fetch_add(1, Ordering::SeqCst); + } +} + +#[rstest] +#[tokio::test] +async fn builder_produces_protocol_hooks() { + let counter = Arc::new(AtomicUsize::new(0)); + let protocol = TestProtocol { + counter: counter.clone(), + }; + let app = WireframeApp::new().unwrap().with_protocol(protocol); + let mut hooks = app.protocol_hooks(); + + let mut frame = vec![1u8]; + hooks.before_send(&mut frame, &mut ConnectionContext); + hooks.on_command_end(&mut ConnectionContext); + + assert_eq!(frame, vec![1, 1]); + assert_eq!(counter.load(Ordering::SeqCst), 1); +} + +#[rstest] +#[tokio::test] +async fn connection_actor_uses_protocol_from_builder() { + let counter = Arc::new(AtomicUsize::new(0)); + let protocol = TestProtocol { + counter: counter.clone(), + }; + let app = WireframeApp::new().unwrap().with_protocol(protocol); + + let hooks = app.protocol_hooks(); + let (queues, handle) = PushQueues::bounded(8, 8); + handle.push_high_priority(vec![1]).await.unwrap(); + drop(handle); + let stream = stream::iter(vec![Ok(vec![2u8])]); + let mut actor: ConnectionActor<_, ()> = ConnectionActor::with_hooks( + queues, + Some(Box::pin(stream)), + CancellationToken::new(), + hooks, + ); + let mut out = Vec::new(); + actor.run(&mut out).await.unwrap(); + + assert_eq!(out, vec![vec![1, 1], vec![2, 1]]); + assert_eq!(counter.load(Ordering::SeqCst), 1); +} From 5bca6d6a0f96fdf93c5f188cd202b0df294213f2 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sat, 28 Jun 2025 23:08:50 +0100 Subject: [PATCH 2/4] Add protocol overview diagram --- .../asynchronous-outbound-messaging-design.md | 75 ++++++++++++++----- 1 file changed, 58 insertions(+), 17 deletions(-) diff --git a/docs/asynchronous-outbound-messaging-design.md b/docs/asynchronous-outbound-messaging-design.md index dab955b8..bec3b73a 100644 --- a/docs/asynchronous-outbound-messaging-design.md +++ b/docs/asynchronous-outbound-messaging-design.md @@ -39,13 +39,13 @@ sections describe how to build that actor from first principles using the biased The implementation must satisfy the following core requirements: -| ID | Requirement | +| ID | Requirement | | --- | ------------------------------------------------------------------------------------------------------------------------------------------------------ | -| G1 | Any async task must be able to push frames to a live connection. | -| G2 | Ordering-safety: Pushed frames must interleave correctly with normal request/response traffic and respect any per-message sequencing rules. | -| G3 | Back-pressure: Writers must block (or fail fast) when the peer cannot drain the socket, preventing unbounded memory consumption. | -| G4 | Generic—independent of any particular protocol; usable by both servers and clients built on wireframe. | -| G5 | Preserve the simple “return a reply” path for code that does not need pushes, ensuring backward compatibility and low friction for existing users. | +| G1 | Any async task must be able to push frames to a live connection. | +| G2 | Ordering-safety: Pushed frames must interleave correctly with normal request/response traffic and respect any per-message sequencing rules. | +| G3 | Back-pressure: Writers must block (or fail fast) when the peer cannot drain the socket, preventing unbounded memory consumption. | +| G4 | Generic—independent of any particular protocol; usable by both servers and clients built on wireframe. | +| G5 | Preserve the simple “return a reply” path for code that does not need pushes, ensuring backward compatibility and low friction for existing users. | ## 3. Core Architecture: The Connection Actor @@ -70,7 +70,7 @@ manage two distinct, bounded `tokio::mpsc` channels for pushed frames: messages like heartbeats, session control notifications, or protocol-level pings. -2. `low_priority_push_rx: mpsc::Receiver`: For standard, non-urgent +1. `low_priority_push_rx: mpsc::Receiver`: For standard, non-urgent background messages like log forwarding or secondary status updates. The bounded nature of these channels provides an inherent and robust @@ -90,13 +90,13 @@ The polling order will be: 1. **Graceful Shutdown Signal:** The `CancellationToken` will be checked first to ensure immediate reaction to a server-wide shutdown request. -2. **High-Priority Push Channel:** Messages from `high_priority_push_rx` will be +1. **High-Priority Push Channel:** Messages from `high_priority_push_rx` will be drained next. -3. **Low-Priority Push Channel:** Messages from `low_priority_push_rx` will be +1. **Low-Priority Push Channel:** Messages from `low_priority_push_rx` will be processed after all high-priority messages. -4. **Handler Response Stream:** Frames from the active request's +1. **Handler Response Stream:** Frames from the active request's `Response::Stream` will be processed last. ```rust @@ -466,6 +466,47 @@ pub trait WireframeProtocol: Send + Sync + 'static { WireframeApp::new().with_protocol(MySqlProtocolImpl); ``` +```mermaid +classDiagram + class WireframeProtocol { + <> + +Frame: FrameLike + +ProtocolError + +on_connection_setup(PushHandle, &mut ConnectionContext) + +before_send(&mut Frame, &mut ConnectionContext) + +on_command_end(&mut ConnectionContext) + } + class ProtocolHooks { + -before_send: Option> + -on_command_end: Option + +before_send(&mut self, &mut F, &mut ConnectionContext) + +on_command_end(&mut self, &mut ConnectionContext) + +from_protocol(protocol: Arc

) + } + class ConnectionContext { + <> + } + class WireframeApp { + -protocol: Option, ProtocolError=()>>> + +with_protocol(protocol) + +protocol() + +protocol_hooks() + } + class ConnectionActor { + -hooks: ProtocolHooks + -ctx: ConnectionContext + } + WireframeApp --> "1" WireframeProtocol : uses + WireframeApp --> "1" ProtocolHooks : creates + ProtocolHooks --> "1" WireframeProtocol : from_protocol + ConnectionActor --> "1" ProtocolHooks : uses + ConnectionActor --> "1" ConnectionContext : owns + ProtocolHooks --> "1" ConnectionContext : passes to hooks + WireframeProtocol --> "1" ConnectionContext : uses + WireframeProtocol --> "1" PushHandle : uses + WireframeProtocol <|.. ProtocolHooks : implemented by +``` + ## 5. Error Handling & Resilience ### 5.1 `BrokenPipe` on Connection Loss @@ -577,11 +618,11 @@ sequenceDiagram ## 8. Measurable Objectives & Success Criteria -| Category | Objective | Success Metric | +| Category | Objective | Success Metric | | --------------- | ------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| API Correctness | The PushHandle, SessionRegistry, and WireframeProtocol trait are implemented exactly as specified in this document. | 100% of the public API surface is present and correctly typed. | -| Functionality | Pushed frames are delivered reliably and in the correct order of priority. | A test with concurrent high-priority, low-priority, and streaming producers must show that all frames are delivered and that the final written sequence respects the strict priority order. | -| Back-pressure | A slow consumer must cause producer tasks to suspend without consuming unbounded memory. | A test with a slow consumer and a fast producer must show the producer's push().await call blocks, and the process memory usage remains stable. | -| Resilience | The SessionRegistry must not leak memory when connections are terminated. | A long-running test that creates and destroys thousands of connections must show no corresponding growth in the SessionRegistry's size or the process's overall memory footprint. | -| Performance | The overhead of the push mechanism should be minimal for connections that do not use it. | A benchmark of a simple request-response workload with the push feature enabled (but unused) should show < 2% performance degradation compared to a build without the feature. | -| Performance | The latency for a high-priority push under no contention should be negligible. | The time from push_high_priority().await returning to the frame being written to the socket buffer should be < 10µs. | +| API Correctness | The PushHandle, SessionRegistry, and WireframeProtocol trait are implemented exactly as specified in this document. | 100% of the public API surface is present and correctly typed. | +| Functionality | Pushed frames are delivered reliably and in the correct order of priority. | A test with concurrent high-priority, low-priority, and streaming producers must show that all frames are delivered and that the final written sequence respects the strict priority order. | +| Back-pressure | A slow consumer must cause producer tasks to suspend without consuming unbounded memory. | A test with a slow consumer and a fast producer must show the producer's push().await call blocks, and the process memory usage remains stable. | +| Resilience | The SessionRegistry must not leak memory when connections are terminated. | A long-running test that creates and destroys thousands of connections must show no corresponding growth in the SessionRegistry's size or the process's overall memory footprint. | +| Performance | The overhead of the push mechanism should be minimal for connections that do not use it. | A benchmark of a simple request-response workload with the push feature enabled (but unused) should show < 2% performance degradation compared to a build without the feature. | +| Performance | The latency for a high-priority push under no contention should be negligible. | The time from push_high_priority().await returning to the frame being written to the socket buffer should be < 10µs. | From c46f5218b28c86ad4e1d7ea466bb193e31886834 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 29 Jun 2025 00:45:46 +0100 Subject: [PATCH 3/4] Improve protocol docs and tests --- .../asynchronous-outbound-messaging-design.md | 38 ++++++++++--------- src/app.rs | 8 ++++ src/hooks.rs | 14 ++++--- tests/wireframe_protocol.rs | 7 ++++ 4 files changed, 44 insertions(+), 23 deletions(-) diff --git a/docs/asynchronous-outbound-messaging-design.md b/docs/asynchronous-outbound-messaging-design.md index bec3b73a..73f010c2 100644 --- a/docs/asynchronous-outbound-messaging-design.md +++ b/docs/asynchronous-outbound-messaging-design.md @@ -39,13 +39,13 @@ sections describe how to build that actor from first principles using the biased The implementation must satisfy the following core requirements: -| ID | Requirement | +| ID | Requirement | | --- | ------------------------------------------------------------------------------------------------------------------------------------------------------ | -| G1 | Any async task must be able to push frames to a live connection. | -| G2 | Ordering-safety: Pushed frames must interleave correctly with normal request/response traffic and respect any per-message sequencing rules. | -| G3 | Back-pressure: Writers must block (or fail fast) when the peer cannot drain the socket, preventing unbounded memory consumption. | -| G4 | Generic—independent of any particular protocol; usable by both servers and clients built on wireframe. | -| G5 | Preserve the simple “return a reply” path for code that does not need pushes, ensuring backward compatibility and low friction for existing users. | +| G1 | Any async task must be able to push frames to a live connection. | +| G2 | Ordering-safety: Pushed frames must interleave correctly with normal request/response traffic and respect any per-message sequencing rules. | +| G3 | Back-pressure: Writers must block (or fail fast) when the peer cannot drain the socket, preventing unbounded memory consumption. | +| G4 | Generic—independent of any particular protocol; usable by both servers and clients built on wireframe. | +| G5 | Preserve the simple “return a reply” path for code that does not need pushes, ensuring backward compatibility and low friction for existing users. | ## 3. Core Architecture: The Connection Actor @@ -70,7 +70,7 @@ manage two distinct, bounded `tokio::mpsc` channels for pushed frames: messages like heartbeats, session control notifications, or protocol-level pings. -1. `low_priority_push_rx: mpsc::Receiver`: For standard, non-urgent +2. `low_priority_push_rx: mpsc::Receiver`: For standard, non-urgent background messages like log forwarding or secondary status updates. The bounded nature of these channels provides an inherent and robust @@ -90,13 +90,13 @@ The polling order will be: 1. **Graceful Shutdown Signal:** The `CancellationToken` will be checked first to ensure immediate reaction to a server-wide shutdown request. -1. **High-Priority Push Channel:** Messages from `high_priority_push_rx` will be +2. **High-Priority Push Channel:** Messages from `high_priority_push_rx` will be drained next. -1. **Low-Priority Push Channel:** Messages from `low_priority_push_rx` will be +3. **Low-Priority Push Channel:** Messages from `low_priority_push_rx` will be processed after all high-priority messages. -1. **Handler Response Stream:** Frames from the active request's +4. **Handler Response Stream:** Frames from the active request's `Response::Stream` will be processed last. ```rust @@ -507,6 +507,10 @@ classDiagram WireframeProtocol <|.. ProtocolHooks : implemented by ``` +`ConnectionContext` is intentionally empty today. It offers a stable extension +point for per-connection data without breaking existing protocol +implementations. + ## 5. Error Handling & Resilience ### 5.1 `BrokenPipe` on Connection Loss @@ -618,11 +622,11 @@ sequenceDiagram ## 8. Measurable Objectives & Success Criteria -| Category | Objective | Success Metric | +| Category | Objective | Success Metric | | --------------- | ------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| API Correctness | The PushHandle, SessionRegistry, and WireframeProtocol trait are implemented exactly as specified in this document. | 100% of the public API surface is present and correctly typed. | -| Functionality | Pushed frames are delivered reliably and in the correct order of priority. | A test with concurrent high-priority, low-priority, and streaming producers must show that all frames are delivered and that the final written sequence respects the strict priority order. | -| Back-pressure | A slow consumer must cause producer tasks to suspend without consuming unbounded memory. | A test with a slow consumer and a fast producer must show the producer's push().await call blocks, and the process memory usage remains stable. | -| Resilience | The SessionRegistry must not leak memory when connections are terminated. | A long-running test that creates and destroys thousands of connections must show no corresponding growth in the SessionRegistry's size or the process's overall memory footprint. | -| Performance | The overhead of the push mechanism should be minimal for connections that do not use it. | A benchmark of a simple request-response workload with the push feature enabled (but unused) should show < 2% performance degradation compared to a build without the feature. | -| Performance | The latency for a high-priority push under no contention should be negligible. | The time from push_high_priority().await returning to the frame being written to the socket buffer should be < 10µs. | +| API Correctness | The PushHandle, SessionRegistry, and WireframeProtocol trait are implemented exactly as specified in this document. | 100% of the public API surface is present and correctly typed. | +| Functionality | Pushed frames are delivered reliably and in the correct order of priority. | A test with concurrent high-priority, low-priority, and streaming producers must show that all frames are delivered and that the final written sequence respects the strict priority order. | +| Back-pressure | A slow consumer must cause producer tasks to suspend without consuming unbounded memory. | A test with a slow consumer and a fast producer must show the producer's push().await call blocks, and the process memory usage remains stable. | +| Resilience | The SessionRegistry must not leak memory when connections are terminated. | A long-running test that creates and destroys thousands of connections must show no corresponding growth in the SessionRegistry's size or the process's overall memory footprint. | +| Performance | The overhead of the push mechanism should be minimal for connections that do not use it. | A benchmark of a simple request-response workload with the push feature enabled (but unused) should show < 2% performance degradation compared to a build without the feature. | +| Performance | The latency for a high-priority push under no contention should be negligible. | The time from push_high_priority().await returning to the frame being written to the socket buffer should be < 10µs. | diff --git a/src/app.rs b/src/app.rs index 9ee8fb5d..eea96537 100644 --- a/src/app.rs +++ b/src/app.rs @@ -386,6 +386,10 @@ where } /// Install a [`WireframeProtocol`] implementation. + /// + /// The protocol defines hooks for connection setup, frame modification, and + /// command completion. It is wrapped in an [`Arc`] and stored for later use + /// by the connection actor. #[must_use] pub fn with_protocol

(mut self, protocol: P) -> Self where @@ -396,6 +400,8 @@ where } /// Get a clone of the configured protocol, if any. + /// + /// Returns `None` if no protocol was installed via [`with_protocol`](Self::with_protocol). #[must_use] pub fn protocol( &self, @@ -404,6 +410,8 @@ where } /// Return protocol hooks derived from the installed protocol. + /// + /// If no protocol is installed, returns default (no-op) hooks. #[must_use] pub fn protocol_hooks(&self) -> ProtocolHooks> { self.protocol diff --git a/src/hooks.rs b/src/hooks.rs index 4fd86c02..f5e2cc23 100644 --- a/src/hooks.rs +++ b/src/hooks.rs @@ -10,6 +10,9 @@ use std::sync::Arc; use crate::push::{FrameLike, PushHandle}; /// Per-connection state passed to protocol callbacks. +/// +/// This empty struct is intentionally extensible. Future protocol features may +/// require storing connection-local data without breaking existing APIs. #[derive(Default)] pub struct ConnectionContext; @@ -75,12 +78,11 @@ impl ProtocolHooks { where P: WireframeProtocol + ?Sized, { - let before = { - let p = Arc::clone(&protocol); - Box::new(move |frame: &mut F, ctx: &mut ConnectionContext| { - p.before_send(frame, ctx); - }) as BeforeSendHook - }; + let protocol_before = Arc::clone(&protocol); + let before = Box::new(move |frame: &mut F, ctx: &mut ConnectionContext| { + protocol_before.before_send(frame, ctx); + }) as BeforeSendHook; + let end = Box::new(move |ctx: &mut ConnectionContext| { protocol.on_command_end(ctx); }) as OnCommandEndHook; diff --git a/tests/wireframe_protocol.rs b/tests/wireframe_protocol.rs index 95e6b84a..1d2b659e 100644 --- a/tests/wireframe_protocol.rs +++ b/tests/wireframe_protocol.rs @@ -1,3 +1,10 @@ +//! Integration tests for the `WireframeProtocol` trait. +//! +//! These tests ensure that protocol implementations integrate correctly with +//! [`WireframeApp`] and [`ConnectionActor`]. They verify that hooks are invoked +//! with the expected connection context and that frame mutations occur as +//! intended. + use std::sync::{ Arc, atomic::{AtomicUsize, Ordering}, From 06abbe8080b39200f78d74e1b838262eda1f37f8 Mon Sep 17 00:00:00 2001 From: Leynos Date: Sun, 29 Jun 2025 00:45:54 +0100 Subject: [PATCH 4/4] Add connection setup protocol hook --- src/app.rs | 2 +- src/connection.rs | 20 +++++++++++------- src/hooks.rs | 26 ++++++++++++++++++++--- tests/connection_actor.rs | 42 +++++++++++++++++++++---------------- tests/wireframe_protocol.rs | 10 ++++++--- 5 files changed, 67 insertions(+), 33 deletions(-) diff --git a/src/app.rs b/src/app.rs index eea96537..3905d0f6 100644 --- a/src/app.rs +++ b/src/app.rs @@ -416,7 +416,7 @@ where pub fn protocol_hooks(&self) -> ProtocolHooks> { self.protocol .as_ref() - .map(|p| ProtocolHooks::from_protocol(Arc::clone(p))) + .map(|p| ProtocolHooks::from_protocol(&Arc::clone(p))) .unwrap_or_default() } diff --git a/src/connection.rs b/src/connection.rs index 6b9962a6..f2460bdb 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -14,7 +14,7 @@ use tokio_util::sync::CancellationToken; use crate::{ hooks::{ConnectionContext, ProtocolHooks}, - push::{FrameLike, PushQueues}, + push::{FrameLike, PushHandle, PushQueues}, response::{FrameStream, WireframeError}, }; @@ -48,9 +48,9 @@ impl Default for FairnessConfig { /// use tokio_util::sync::CancellationToken; /// use wireframe::{connection::ConnectionActor, push::PushQueues}; /// -/// let (queues, _handle) = PushQueues::::bounded(8, 8); +/// let (queues, handle) = PushQueues::::bounded(8, 8); /// let shutdown = CancellationToken::new(); -/// let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, None, shutdown); +/// let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, handle, None, shutdown); /// # drop(actor); /// ``` pub struct ConnectionActor { @@ -77,35 +77,39 @@ where /// use tokio_util::sync::CancellationToken; /// use wireframe::{connection::ConnectionActor, push::PushQueues}; /// - /// let (queues, _handle) = PushQueues::::bounded(4, 4); + /// let (queues, handle) = PushQueues::::bounded(4, 4); /// let token = CancellationToken::new(); - /// let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, None, token); + /// let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, handle, None, token); /// # drop(actor); /// ``` #[must_use] pub fn new( queues: PushQueues, + handle: PushHandle, response: Option>, shutdown: CancellationToken, ) -> Self { - Self::with_hooks(queues, response, shutdown, ProtocolHooks::default()) + Self::with_hooks(queues, handle, response, shutdown, ProtocolHooks::default()) } /// Create a new `ConnectionActor` with custom protocol hooks. #[must_use] pub fn with_hooks( queues: PushQueues, + handle: PushHandle, response: Option>, shutdown: CancellationToken, - hooks: ProtocolHooks, + mut hooks: ProtocolHooks, ) -> Self { + let mut ctx = ConnectionContext; + hooks.on_connection_setup(handle, &mut ctx); Self { high_rx: Some(queues.high_priority_rx), low_rx: Some(queues.low_priority_rx), response, shutdown, hooks, - ctx: ConnectionContext, + ctx, fairness: FairnessConfig::default(), high_counter: 0, high_start: None, diff --git a/src/hooks.rs b/src/hooks.rs index f5e2cc23..25b71a18 100644 --- a/src/hooks.rs +++ b/src/hooks.rs @@ -38,11 +38,17 @@ pub trait WireframeProtocol: Send + Sync + 'static { /// Type alias for the `before_send` callback. type BeforeSendHook = Box; +/// Type alias for the `on_connection_setup` callback. +type OnConnectionSetupHook = + Box, &mut ConnectionContext) + Send + 'static>; + /// Type alias for the `on_command_end` callback. type OnCommandEndHook = Box; /// Callbacks used by the connection actor. pub struct ProtocolHooks { + /// Invoked when a connection is established. + pub on_connection_setup: Option>, /// Invoked before a frame is written to the socket. pub before_send: Option>, /// Invoked once a command completes. @@ -52,6 +58,7 @@ pub struct ProtocolHooks { impl Default for ProtocolHooks { fn default() -> Self { Self { + on_connection_setup: None, before_send: None, on_command_end: None, } @@ -59,6 +66,12 @@ impl Default for ProtocolHooks { } impl ProtocolHooks { + /// Run the `on_connection_setup` hook if registered. + pub fn on_connection_setup(&mut self, handle: PushHandle, ctx: &mut ConnectionContext) { + if let Some(hook) = self.on_connection_setup.take() { + hook(handle, ctx); + } + } /// Run the `before_send` hook if registered. pub fn before_send(&mut self, frame: &mut F, ctx: &mut ConnectionContext) { if let Some(hook) = &mut self.before_send { @@ -74,20 +87,27 @@ impl ProtocolHooks { } /// Construct hooks from a [`WireframeProtocol`] implementation. - pub fn from_protocol

(protocol: Arc

) -> Self + pub fn from_protocol

(protocol: &Arc

) -> Self where P: WireframeProtocol + ?Sized, { - let protocol_before = Arc::clone(&protocol); + let protocol_before = Arc::clone(protocol); let before = Box::new(move |frame: &mut F, ctx: &mut ConnectionContext| { protocol_before.before_send(frame, ctx); }) as BeforeSendHook; + let protocol_end = Arc::clone(protocol); let end = Box::new(move |ctx: &mut ConnectionContext| { - protocol.on_command_end(ctx); + protocol_end.on_command_end(ctx); }) as OnCommandEndHook; + let protocol_setup = Arc::clone(protocol); + let setup = Box::new(move |handle: PushHandle, ctx: &mut ConnectionContext| { + protocol_setup.on_connection_setup(handle, ctx); + }) as OnConnectionSetupHook; + Self { + on_connection_setup: Some(setup), before_send: Some(before), on_command_end: Some(end), } diff --git a/tests/connection_actor.rs b/tests/connection_actor.rs index 8ee81ff3..5e034f79 100644 --- a/tests/connection_actor.rs +++ b/tests/connection_actor.rs @@ -34,11 +34,10 @@ async fn strict_priority_order( let (queues, handle) = queues; handle.push_low_priority(2).await.unwrap(); handle.push_high_priority(1).await.unwrap(); - drop(handle); let stream = stream::iter(vec![Ok(3u8)]); let mut actor: ConnectionActor<_, ()> = - ConnectionActor::new(queues, Some(Box::pin(stream)), shutdown_token); + ConnectionActor::new(queues, handle, Some(Box::pin(stream)), shutdown_token); let mut out = Vec::new(); actor.run(&mut out).await.unwrap(); assert_eq!(out, vec![1, 2, 3]); @@ -60,9 +59,9 @@ async fn fairness_yields_low_after_burst( handle.push_high_priority(n).await.unwrap(); } handle.push_low_priority(99).await.unwrap(); - drop(handle); - let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, None, shutdown_token); + let mut actor: ConnectionActor<_, ()> = + ConnectionActor::new(queues, handle, None, shutdown_token); actor.set_fairness(fairness); let mut out = Vec::new(); actor.run(&mut out).await.unwrap(); @@ -76,9 +75,10 @@ async fn shutdown_signal_precedence( shutdown_token: CancellationToken, ) { let (queues, handle) = queues; - drop(handle); shutdown_token.cancel(); - let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, None, shutdown_token); + let mut actor: ConnectionActor<_, ()> = + ConnectionActor::new(queues, handle, None, shutdown_token); + // drop the handle after actor creation to mimic early disconnection let mut out = Vec::new(); actor.run(&mut out).await.unwrap(); assert!(out.is_empty()); @@ -92,11 +92,11 @@ async fn complete_draining_of_sources( ) { let (queues, handle) = queues; handle.push_high_priority(1).await.unwrap(); - drop(handle); let stream = stream::iter(vec![Ok(2u8), Ok(3u8)]); let mut actor: ConnectionActor<_, ()> = - ConnectionActor::new(queues, Some(Box::pin(stream)), shutdown_token); + ConnectionActor::new(queues, handle, Some(Box::pin(stream)), shutdown_token); + // drop handle after actor setup let mut out = Vec::new(); actor.run(&mut out).await.unwrap(); assert_eq!(out, vec![1, 2, 3]); @@ -114,14 +114,13 @@ async fn error_propagation_from_stream( shutdown_token: CancellationToken, ) { let (queues, handle) = queues; - drop(handle); let stream = stream::iter(vec![ Ok(1u8), Ok(2u8), Err(WireframeError::Protocol(TestError::Kaboom)), ]); let mut actor: ConnectionActor<_, TestError> = - ConnectionActor::new(queues, Some(Box::pin(stream)), shutdown_token); + ConnectionActor::new(queues, handle, Some(Box::pin(stream)), shutdown_token); let mut out = Vec::new(); let result = actor.run(&mut out).await; assert!(matches!( @@ -138,7 +137,6 @@ async fn interleaved_shutdown_during_stream( shutdown_token: CancellationToken, ) { let (queues, handle) = queues; - drop(handle); let token = shutdown_token.clone(); tokio::spawn(async move { sleep(Duration::from_millis(50)).await; @@ -154,7 +152,7 @@ async fn interleaved_shutdown_during_stream( } }); let mut actor: ConnectionActor<_, ()> = - ConnectionActor::new(queues, Some(Box::pin(stream)), shutdown_token); + ConnectionActor::new(queues, handle, Some(Box::pin(stream)), shutdown_token); let mut out = Vec::new(); actor.run(&mut out).await.unwrap(); assert!(!out.is_empty() && out.len() < 5); @@ -188,7 +186,6 @@ async fn before_send_hook_modifies_frames( ) { let (queues, handle) = queues; handle.push_high_priority(1).await.unwrap(); - drop(handle); let stream = stream::iter(vec![Ok(2u8)]); let hooks = ProtocolHooks { @@ -196,8 +193,13 @@ async fn before_send_hook_modifies_frames( ..ProtocolHooks::default() }; - let mut actor: ConnectionActor<_, ()> = - ConnectionActor::with_hooks(queues, Some(Box::pin(stream)), shutdown_token, hooks); + let mut actor: ConnectionActor<_, ()> = ConnectionActor::with_hooks( + queues, + handle, + Some(Box::pin(stream)), + shutdown_token, + hooks, + ); let mut out = Vec::new(); actor.run(&mut out).await.unwrap(); assert_eq!(out, vec![2, 3]); @@ -210,7 +212,6 @@ async fn on_command_end_hook_runs( shutdown_token: CancellationToken, ) { let (queues, handle) = queues; - drop(handle); let stream = stream::iter(vec![Ok(1u8)]); let counter = Arc::new(AtomicUsize::new(0)); @@ -222,8 +223,13 @@ async fn on_command_end_hook_runs( ..ProtocolHooks::default() }; - let mut actor: ConnectionActor<_, ()> = - ConnectionActor::with_hooks(queues, Some(Box::pin(stream)), shutdown_token, hooks); + let mut actor: ConnectionActor<_, ()> = ConnectionActor::with_hooks( + queues, + handle, + Some(Box::pin(stream)), + shutdown_token, + hooks, + ); let mut out = Vec::new(); actor.run(&mut out).await.unwrap(); assert_eq!(counter.load(Ordering::SeqCst), 1); diff --git a/tests/wireframe_protocol.rs b/tests/wireframe_protocol.rs index 1d2b659e..e13e7934 100644 --- a/tests/wireframe_protocol.rs +++ b/tests/wireframe_protocol.rs @@ -54,12 +54,16 @@ async fn builder_produces_protocol_hooks() { let app = WireframeApp::new().unwrap().with_protocol(protocol); let mut hooks = app.protocol_hooks(); + let (queues, handle) = PushQueues::bounded(1, 1); + hooks.on_connection_setup(handle, &mut ConnectionContext); + drop(queues); // silence unused warnings + let mut frame = vec![1u8]; hooks.before_send(&mut frame, &mut ConnectionContext); hooks.on_command_end(&mut ConnectionContext); assert_eq!(frame, vec![1, 1]); - assert_eq!(counter.load(Ordering::SeqCst), 1); + assert_eq!(counter.load(Ordering::SeqCst), 2); } #[rstest] @@ -74,10 +78,10 @@ async fn connection_actor_uses_protocol_from_builder() { let hooks = app.protocol_hooks(); let (queues, handle) = PushQueues::bounded(8, 8); handle.push_high_priority(vec![1]).await.unwrap(); - drop(handle); let stream = stream::iter(vec![Ok(vec![2u8])]); let mut actor: ConnectionActor<_, ()> = ConnectionActor::with_hooks( queues, + handle, Some(Box::pin(stream)), CancellationToken::new(), hooks, @@ -86,5 +90,5 @@ async fn connection_actor_uses_protocol_from_builder() { actor.run(&mut out).await.unwrap(); assert_eq!(out, vec![vec![1, 1], vec![2, 1]]); - assert_eq!(counter.load(Ordering::SeqCst), 1); + assert_eq!(counter.load(Ordering::SeqCst), 2); }