diff --git a/docs/roadmap.md b/docs/roadmap.md index 21c802e1..fd200523 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -39,14 +39,15 @@ after formatting. Line numbers below refer to that file. server::WireframeServer, }; - async fn handler() {} + use wireframe::app::Envelope; + async fn handler(_env: &Envelope) {} #[tokio::main] async fn main() -> std::io::Result<()> { let factory = || { WireframeApp::new() .unwrap() - .route(1, Box::new(|| Box::pin(handler()))) + .route(1, Box::new(|env| Box::pin(handler(env)))) .unwrap() }; diff --git a/src/app.rs b/src/app.rs index c529e14e..78ebc857 100644 --- a/src/app.rs +++ b/src/app.rs @@ -1,8 +1,8 @@ //! Application builder configuring routes and middleware. //! -//! `WireframeApp` stores registered routes, services, and middleware -//! for a [`WireframeServer`]. Most builder methods return [`Result`] -//! so callers can chain registrations ergonomically. +//! This module defines [`WireframeApp`], an Actix-inspired builder for +//! managing connection state, routing, and middleware in a `WireframeServer`. +//! It exposes convenience methods to register handlers and lifecycle hooks. use std::{boxed::Box, collections::HashMap, future::Future, pin::Pin, sync::Arc}; @@ -38,7 +38,7 @@ pub struct WireframeApp Pin + Send>> + Send + Sync>; +pub type Service = Box Pin + Send>> + Send + Sync>; /// Trait representing middleware components. pub trait Middleware: Send + Sync {} @@ -81,6 +81,21 @@ impl From for SendError { fn from(e: io::Error) -> Self { SendError::Io(e) } } +/// Basic envelope type used by [`handle_connection`]. +/// +/// Incoming frames are deserialized into an `Envelope` containing the +/// message identifier and raw payload bytes. +#[derive(bincode::Decode, bincode::Encode)] +pub struct Envelope { + id: u32, + msg: Vec, +} + +/// Number of idle polls before terminating a connection. +const MAX_IDLE_POLLS: u32 = 50; // ~5s with 100ms timeout +/// Maximum consecutive deserialization failures before closing a connection. +const MAX_DESER_FAILURES: u32 = 10; + /// Result type used throughout the builder API. pub type Result = std::result::Result; @@ -171,8 +186,8 @@ where /// # Type Parameters /// /// This method changes the connection state type parameter from `C` to `C2`. - /// This means that any subsequent builder methods will operate on the new connection state type `C2`. - /// Be aware of this type transition when chaining builder methods. + /// This means that any subsequent builder methods will operate on the new connection state type + /// `C2`. Be aware of this type transition when chaining builder methods. /// /// # Errors /// @@ -271,7 +286,7 @@ where /// This placeholder immediately closes the connection after the /// preamble phase. A warning is logged so tests and callers are /// aware of the current limitation. - pub async fn handle_connection(&self, _stream: W) + pub async fn handle_connection(&self, mut stream: W) where W: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static, { @@ -281,14 +296,133 @@ where None }; - log::warn!( - "`WireframeApp::handle_connection` called, but connection handling is not \ - implemented; closing stream" - ); - tokio::task::yield_now().await; + if let Err(e) = self.process_stream(&mut stream).await { + log::warn!("connection terminated with error: {e}"); + } if let (Some(teardown), Some(state)) = (&self.on_disconnect, state) { teardown(state).await; } } + + async fn process_stream(&self, stream: &mut W) -> io::Result<()> + where + W: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin, + { + let mut buf = BytesMut::with_capacity(1024); + let mut idle = 0u32; + let mut deser_failures = 0u32; + + loop { + if let Some(frame) = self.frame_processor.decode(&mut buf)? { + self.handle_frame(stream, &frame, &mut deser_failures) + .await?; + idle = 0; + continue; + } + + if self.read_and_update(stream, &mut buf, &mut idle).await? { + break; + } + } + + Ok(()) + } + + async fn read_and_update( + &self, + stream: &mut W, + buf: &mut BytesMut, + idle: &mut u32, + ) -> io::Result + where + W: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin, + { + match self.read_into(stream, buf).await { + Ok(Some(0)) => Ok(true), + Ok(Some(_)) => { + *idle = 0; + Ok(false) + } + Ok(None) => { + *idle += 1; + Ok(*idle >= MAX_IDLE_POLLS) + } + Err(e) if Self::is_transient_error(&e) => Ok(false), + Err(e) if Self::is_fatal_error(&e) => Ok(true), + Err(e) => Err(e), + } + } + + fn is_transient_error(e: &io::Error) -> bool { + matches!( + e.kind(), + io::ErrorKind::WouldBlock | io::ErrorKind::Interrupted + ) + } + + fn is_fatal_error(e: &io::Error) -> bool { + matches!( + e.kind(), + io::ErrorKind::UnexpectedEof + | io::ErrorKind::ConnectionReset + | io::ErrorKind::ConnectionAborted + | io::ErrorKind::BrokenPipe + ) + } + + async fn read_into(&self, stream: &mut W, buf: &mut BytesMut) -> io::Result> + where + W: tokio::io::AsyncRead + Unpin, + { + use tokio::{ + io::AsyncReadExt, + time::{Duration, timeout}, + }; + + const READ_TIMEOUT: Duration = Duration::from_millis(100); + + match timeout(READ_TIMEOUT, stream.read_buf(buf)).await { + Ok(Ok(n)) => Ok(Some(n)), + Ok(Err(e)) => Err(e), + Err(_) => Ok(None), + } + } + + async fn handle_frame( + &self, + stream: &mut W, + frame: &[u8], + deser_failures: &mut u32, + ) -> io::Result<()> + where + W: tokio::io::AsyncWrite + Unpin, + { + match self.serializer.deserialize::(frame) { + Ok((env, _)) => { + *deser_failures = 0; + if let Some(handler) = self.routes.get(&env.id) { + handler(&env).await; + } else { + log::warn!("no handler for message id {}", env.id); + } + + if let Err(e) = self.send_response(stream, &env).await { + log::warn!("failed to send response: {e}"); + } + } + Err(e) => { + *deser_failures += 1; + log::warn!("failed to deserialize message: {e}"); + if *deser_failures >= MAX_DESER_FAILURES { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "too many deserialization failures", + )); + } + } + } + + Ok(()) + } } diff --git a/tests/lifecycle.rs b/tests/lifecycle.rs index bcfc41b8..b02a552d 100644 --- a/tests/lifecycle.rs +++ b/tests/lifecycle.rs @@ -67,7 +67,7 @@ async fn teardown_without_setup_does_not_run() { let app = WireframeApp::new() .unwrap() - .on_connection_teardown(move |_| { + .on_connection_teardown(move |()| { let teardown_clone = teardown_clone.clone(); async move { teardown_clone.fetch_add(1, Ordering::SeqCst);