From eecb846b9e9aa25c6614eaf33b2b4b8713170de1 Mon Sep 17 00:00:00 2001 From: Leynos Date: Wed, 18 Jun 2025 13:02:47 +0100 Subject: [PATCH 1/2] Add lifecycle unhappy path tests --- docs/roadmap.md | 2 +- docs/rust-binary-router-library-design.md | 4 +- src/app.rs | 76 +++++++++++++++++++-- tests/lifecycle.rs | 82 +++++++++++++++++++++++ 4 files changed, 156 insertions(+), 8 deletions(-) create mode 100644 tests/lifecycle.rs diff --git a/docs/roadmap.md b/docs/roadmap.md index 37d7c9e9..21c802e1 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -66,7 +66,7 @@ after formatting. Line numbers below refer to that file. using the selected serialization format and write them back through the framing layer. -- [ ] Add connection lifecycle hooks. Integrate setup and teardown stages, so +- [x] Add connection lifecycle hooks. Integrate setup and teardown stages, so sessions can hold state (such as a logged-in user ID) across messages. ## 2. Middleware and Extractors diff --git a/docs/rust-binary-router-library-design.md b/docs/rust-binary-router-library-design.md index bfd89f5c..c4fd582e 100644 --- a/docs/rust-binary-router-library-design.md +++ b/docs/rust-binary-router-library-design.md @@ -939,7 +939,9 @@ pipeline. - **Request/Response Manipulation**: Modifying message content before it reaches a handler or before a response is sent. - **Connection Lifecycle Hooks**: Performing actions when connections are - established or terminated. + established or terminated. `WireframeApp` exposes `on_connection_setup` and + `on_connection_teardown` for initializing and cleaning up per-connection + session state. The middleware system promotes a clean separation of concerns. Cross-cutting functionalities like logging, authentication, or metrics collection can be diff --git a/src/app.rs b/src/app.rs index 75ae240f..9c88806b 100644 --- a/src/app.rs +++ b/src/app.rs @@ -4,7 +4,7 @@ //! for a [`WireframeServer`]. Most builder methods return [`Result`] //! so callers can chain registrations ergonomically. -use std::{boxed::Box, collections::HashMap, future::Future, pin::Pin}; +use std::{boxed::Box, collections::HashMap, future::Future, pin::Pin, sync::Arc}; use bytes::BytesMut; use tokio::io::{self, AsyncWrite, AsyncWriteExt}; @@ -23,12 +23,15 @@ type BoxedFrameProcessor = /// The builder stores registered routes, services, and middleware /// without enforcing an ordering. Methods return [`Result`] so /// registrations can be chained ergonomically. -pub struct WireframeApp { +#[allow(clippy::type_complexity)] +pub struct WireframeApp { routes: HashMap, services: Vec, middleware: Vec>, frame_processor: BoxedFrameProcessor, serializer: S, + on_connect: Option Pin + Send>> + Send + Sync>>, + on_disconnect: Option Pin + Send>> + Send + Sync>>, } /// Alias for boxed asynchronous handlers. @@ -81,9 +84,10 @@ impl From for SendError { /// Result type used throughout the builder API. pub type Result = std::result::Result; -impl Default for WireframeApp +impl Default for WireframeApp where S: Serializer + Default, + C: Send + 'static, { fn default() -> Self { Self { @@ -92,11 +96,13 @@ where middleware: Vec::new(), frame_processor: Box::new(LengthPrefixedProcessor), serializer: S::default(), + on_connect: None, + on_disconnect: None, } } } -impl WireframeApp { +impl WireframeApp { /// Construct a new empty application builder. /// /// # Errors @@ -106,9 +112,10 @@ impl WireframeApp { pub fn new() -> Result { Ok(Self::default()) } } -impl WireframeApp +impl WireframeApp where S: Serializer, + C: Send + 'static, { /// Construct a new empty application builder. /// @@ -155,6 +162,51 @@ where Ok(self) } + /// Register a callback invoked when a new connection is established. + /// + /// The callback can perform authentication or other setup tasks and + /// returns connection-specific state stored for the connection's + /// lifetime. + /// + /// # Errors + /// + /// This function always succeeds currently but uses [`Result`] for + /// consistency with other builder methods. + pub fn on_connection_setup(self, f: F) -> Result> + where + F: Fn() -> Fut + Send + Sync + 'static, + Fut: Future + Send + 'static, + C2: Send + 'static, + { + Ok(WireframeApp { + routes: self.routes, + services: self.services, + middleware: self.middleware, + frame_processor: self.frame_processor, + serializer: self.serializer, + on_connect: Some(Arc::new(move || Box::pin(f()))), + on_disconnect: None, + }) + } + + /// Register a callback invoked when a connection is closed. + /// + /// The callback receives the connection state produced by + /// [`on_connection_setup`](Self::on_connection_setup). + /// + /// # Errors + /// + /// This function always succeeds currently but uses [`Result`] for + /// consistency with other builder methods. + pub fn on_connection_teardown(mut self, f: F) -> Result + where + F: Fn(C) -> Fut + Send + Sync + 'static, + Fut: Future + Send + 'static, + { + self.on_disconnect = Some(Arc::new(move |c| Box::pin(f(c)))); + Ok(self) + } + /// Set the frame processor used for encoding and decoding frames. #[must_use] pub fn frame_processor

(mut self, processor: P) -> Self @@ -167,7 +219,7 @@ where /// Replace the serializer used for messages. #[must_use] - pub fn serializer(self, serializer: Ser) -> WireframeApp + pub fn serializer(self, serializer: Ser) -> WireframeApp where Ser: Serializer, { @@ -177,6 +229,8 @@ where middleware: self.middleware, frame_processor: self.frame_processor, serializer, + on_connect: self.on_connect, + on_disconnect: self.on_disconnect, } } @@ -215,10 +269,20 @@ where where W: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static, { + let state = if let Some(setup) = &self.on_connect { + Some((setup)().await) + } else { + None + }; + log::warn!( "`WireframeApp::handle_connection` called, but connection handling is not \ implemented; closing stream" ); tokio::task::yield_now().await; + + if let (Some(teardown), Some(state)) = (&self.on_disconnect, state) { + teardown(state).await; + } } } diff --git a/tests/lifecycle.rs b/tests/lifecycle.rs new file mode 100644 index 00000000..bcfc41b8 --- /dev/null +++ b/tests/lifecycle.rs @@ -0,0 +1,82 @@ +use std::sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, +}; + +use tokio::io::duplex; +use wireframe::app::WireframeApp; + +#[tokio::test] +async fn setup_and_teardown_callbacks_run() { + let setup_count = Arc::new(AtomicUsize::new(0)); + let teardown_count = Arc::new(AtomicUsize::new(0)); + + let setup_clone = setup_count.clone(); + let teardown_clone = teardown_count.clone(); + + let app = WireframeApp::new() + .unwrap() + .on_connection_setup(move || { + let setup_clone = setup_clone.clone(); + async move { + setup_clone.fetch_add(1, Ordering::SeqCst); + 42u32 + } + }) + .unwrap() + .on_connection_teardown(move |state| { + let teardown_clone = teardown_clone.clone(); + async move { + assert_eq!(state, 42u32); + teardown_clone.fetch_add(1, Ordering::SeqCst); + } + }) + .unwrap(); + + let (_client, server) = duplex(64); + app.handle_connection(server).await; + + assert_eq!(setup_count.load(Ordering::SeqCst), 1); + assert_eq!(teardown_count.load(Ordering::SeqCst), 1); +} +#[tokio::test] +async fn setup_without_teardown_runs() { + let setup_count = Arc::new(AtomicUsize::new(0)); + let setup_clone = setup_count.clone(); + + let app = WireframeApp::new() + .unwrap() + .on_connection_setup(move || { + let setup_clone = setup_clone.clone(); + async move { + setup_clone.fetch_add(1, Ordering::SeqCst); + } + }) + .unwrap(); + + let (_client, server) = duplex(64); + app.handle_connection(server).await; + + assert_eq!(setup_count.load(Ordering::SeqCst), 1); +} + +#[tokio::test] +async fn teardown_without_setup_does_not_run() { + let teardown_count = Arc::new(AtomicUsize::new(0)); + let teardown_clone = teardown_count.clone(); + + let app = WireframeApp::new() + .unwrap() + .on_connection_teardown(move |_| { + let teardown_clone = teardown_clone.clone(); + async move { + teardown_clone.fetch_add(1, Ordering::SeqCst); + } + }) + .unwrap(); + + let (_client, server) = duplex(64); + app.handle_connection(server).await; + + assert_eq!(teardown_count.load(Ordering::SeqCst), 0); +} From 14e931057262fafae16528f5b5014f35d14a7e91 Mon Sep 17 00:00:00 2001 From: Leynos Date: Wed, 18 Jun 2025 14:35:12 +0100 Subject: [PATCH 2/2] Add documentation Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com> --- src/app.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/app.rs b/src/app.rs index 9c88806b..c529e14e 100644 --- a/src/app.rs +++ b/src/app.rs @@ -168,6 +168,12 @@ where /// returns connection-specific state stored for the connection's /// lifetime. /// + /// # Type Parameters + /// + /// This method changes the connection state type parameter from `C` to `C2`. + /// This means that any subsequent builder methods will operate on the new connection state type `C2`. + /// Be aware of this type transition when chaining builder methods. + /// /// # Errors /// /// This function always succeeds currently but uses [`Result`] for