Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/roadmap.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ after formatting. Line numbers below refer to that file.
using the selected serialization format and write them back through the
framing layer.

- [ ] Add connection lifecycle hooks. Integrate setup and teardown stages, so
- [x] Add connection lifecycle hooks. Integrate setup and teardown stages, so
sessions can hold state (such as a logged-in user ID) across messages.

## 2. Middleware and Extractors
Expand Down
4 changes: 3 additions & 1 deletion docs/rust-binary-router-library-design.md
Original file line number Diff line number Diff line change
Expand Up @@ -939,7 +939,9 @@ pipeline.
- **Request/Response Manipulation**: Modifying message content before it
reaches a handler or before a response is sent.
- **Connection Lifecycle Hooks**: Performing actions when connections are
established or terminated.
established or terminated. `WireframeApp` exposes `on_connection_setup` and
`on_connection_teardown` for initializing and cleaning up per-connection
session state.

The middleware system promotes a clean separation of concerns. Cross-cutting
functionalities like logging, authentication, or metrics collection can be
Expand Down
82 changes: 76 additions & 6 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! for a [`WireframeServer`]. Most builder methods return [`Result<Self>`]
//! so callers can chain registrations ergonomically.

use std::{boxed::Box, collections::HashMap, future::Future, pin::Pin};
use std::{boxed::Box, collections::HashMap, future::Future, pin::Pin, sync::Arc};

use bytes::BytesMut;
use tokio::io::{self, AsyncWrite, AsyncWriteExt};
Expand All @@ -23,12 +23,15 @@
/// The builder stores registered routes, services, and middleware
/// without enforcing an ordering. Methods return [`Result<Self>`] so
/// registrations can be chained ergonomically.
pub struct WireframeApp<S: Serializer = BincodeSerializer> {
#[allow(clippy::type_complexity)]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (review_instructions): Function attribute is placed before the doc comment; attributes must come after the doc comment.

Please move the #[allow(clippy::type_complexity)] attribute below the doc comment for the struct, as per the style guide.

Review instructions:

Path patterns: **/*.rs

Instructions:
Ensure that function attributes are placed AFTER the function doc comment

pub struct WireframeApp<S: Serializer = BincodeSerializer, C: Send + 'static = ()> {
routes: HashMap<u32, Service>,
services: Vec<Service>,
middleware: Vec<Box<dyn Middleware>>,
frame_processor: BoxedFrameProcessor,
serializer: S,
on_connect: Option<Arc<dyn Fn() -> Pin<Box<dyn Future<Output = C> + Send>> + Send + Sync>>,
on_disconnect: Option<Arc<dyn Fn(C) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>>,
}

/// Alias for boxed asynchronous handlers.
Expand Down Expand Up @@ -81,9 +84,10 @@
/// Result type used throughout the builder API.
pub type Result<T> = std::result::Result<T, WireframeError>;

impl<S> Default for WireframeApp<S>
impl<S, C> Default for WireframeApp<S, C>
where
S: Serializer + Default,
C: Send + 'static,
{
fn default() -> Self {
Self {
Expand All @@ -92,11 +96,13 @@
middleware: Vec::new(),
frame_processor: Box::new(LengthPrefixedProcessor),
serializer: S::default(),
on_connect: None,
on_disconnect: None,
}
}
}

impl WireframeApp<BincodeSerializer> {
impl WireframeApp<BincodeSerializer, ()> {
/// Construct a new empty application builder.
///
/// # Errors
Expand All @@ -106,9 +112,10 @@
pub fn new() -> Result<Self> { Ok(Self::default()) }
}

impl<S> WireframeApp<S>
impl<S, C> WireframeApp<S, C>
where
S: Serializer,
C: Send + 'static,
{
/// Construct a new empty application builder.
///
Expand Down Expand Up @@ -155,6 +162,57 @@
Ok(self)
}

/// Register a callback invoked when a new connection is established.
///
/// The callback can perform authentication or other setup tasks and
/// returns connection-specific state stored for the connection's
/// lifetime.
///
/// # Type Parameters

Check warning on line 171 in src/app.rs

View workflow job for this annotation

GitHub Actions / build-test

Diff in /home/runner/work/wireframe/wireframe/src/app.rs
///
/// This method changes the connection state type parameter from `C` to `C2`.
/// This means that any subsequent builder methods will operate on the new connection state type `C2`.
/// Be aware of this type transition when chaining builder methods.
///
/// # Errors
///
/// This function always succeeds currently but uses [`Result`] for
/// consistency with other builder methods.
pub fn on_connection_setup<F, Fut, C2>(self, f: F) -> Result<WireframeApp<S, C2>>
where
F: Fn() -> Fut + Send + Sync + 'static,
Fut: Future<Output = C2> + Send + 'static,
C2: Send + 'static,
{
Ok(WireframeApp {
routes: self.routes,
services: self.services,
middleware: self.middleware,
frame_processor: self.frame_processor,
serializer: self.serializer,
on_connect: Some(Arc::new(move || Box::pin(f()))),
on_disconnect: None,
})
}

/// Register a callback invoked when a connection is closed.
///
/// The callback receives the connection state produced by
/// [`on_connection_setup`](Self::on_connection_setup).
///
/// # Errors
///
/// This function always succeeds currently but uses [`Result`] for
/// consistency with other builder methods.
pub fn on_connection_teardown<F, Fut>(mut self, f: F) -> Result<Self>
where
F: Fn(C) -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
self.on_disconnect = Some(Arc::new(move |c| Box::pin(f(c))));
Ok(self)
}

/// Set the frame processor used for encoding and decoding frames.
#[must_use]
pub fn frame_processor<P>(mut self, processor: P) -> Self
Expand All @@ -167,7 +225,7 @@

/// Replace the serializer used for messages.
#[must_use]
pub fn serializer<Ser>(self, serializer: Ser) -> WireframeApp<Ser>
pub fn serializer<Ser>(self, serializer: Ser) -> WireframeApp<Ser, C>
where
Ser: Serializer,
{
Expand All @@ -177,6 +235,8 @@
middleware: self.middleware,
frame_processor: self.frame_processor,
serializer,
on_connect: self.on_connect,
on_disconnect: self.on_disconnect,
}
}

Expand Down Expand Up @@ -215,10 +275,20 @@
where
W: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static,
{
let state = if let Some(setup) = &self.on_connect {
Some((setup)().await)
} else {
None
};

log::warn!(
"`WireframeApp::handle_connection` called, but connection handling is not \
implemented; closing stream"
);
tokio::task::yield_now().await;

if let (Some(teardown), Some(state)) = (&self.on_disconnect, state) {
teardown(state).await;
}
}
}
82 changes: 82 additions & 0 deletions tests/lifecycle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use std::sync::{
Arc,
atomic::{AtomicUsize, Ordering},
};

use tokio::io::duplex;
use wireframe::app::WireframeApp;

#[tokio::test]
async fn setup_and_teardown_callbacks_run() {
let setup_count = Arc::new(AtomicUsize::new(0));
let teardown_count = Arc::new(AtomicUsize::new(0));

let setup_clone = setup_count.clone();
let teardown_clone = teardown_count.clone();

let app = WireframeApp::new()
.unwrap()
.on_connection_setup(move || {
let setup_clone = setup_clone.clone();
async move {
setup_clone.fetch_add(1, Ordering::SeqCst);
42u32
}
})
.unwrap()
.on_connection_teardown(move |state| {
let teardown_clone = teardown_clone.clone();
async move {
assert_eq!(state, 42u32);
teardown_clone.fetch_add(1, Ordering::SeqCst);
}
})
.unwrap();

let (_client, server) = duplex(64);
app.handle_connection(server).await;

assert_eq!(setup_count.load(Ordering::SeqCst), 1);
assert_eq!(teardown_count.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn setup_without_teardown_runs() {
let setup_count = Arc::new(AtomicUsize::new(0));
let setup_clone = setup_count.clone();

let app = WireframeApp::new()
.unwrap()
.on_connection_setup(move || {
let setup_clone = setup_clone.clone();
async move {
setup_clone.fetch_add(1, Ordering::SeqCst);
}
})
.unwrap();

let (_client, server) = duplex(64);
app.handle_connection(server).await;

assert_eq!(setup_count.load(Ordering::SeqCst), 1);
}

#[tokio::test]
async fn teardown_without_setup_does_not_run() {
let teardown_count = Arc::new(AtomicUsize::new(0));
let teardown_clone = teardown_count.clone();

let app = WireframeApp::new()
.unwrap()
.on_connection_teardown(move |_| {
let teardown_clone = teardown_clone.clone();
async move {
teardown_clone.fetch_add(1, Ordering::SeqCst);
}
})
.unwrap();

let (_client, server) = duplex(64);
app.handle_connection(server).await;

assert_eq!(teardown_count.load(Ordering::SeqCst), 0);
}
Loading