From 2a9187e9ddf4c583aca1dab72d1ceb5a8526aa8f Mon Sep 17 00:00:00 2001 From: Leynos Date: Wed, 18 Jun 2025 19:03:01 +0100 Subject: [PATCH 1/6] Implement connection handling --- src/app.rs | 55 ++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 47 insertions(+), 8 deletions(-) diff --git a/src/app.rs b/src/app.rs index c529e14e..e8c5fa68 100644 --- a/src/app.rs +++ b/src/app.rs @@ -171,8 +171,8 @@ where /// # Type Parameters /// /// This method changes the connection state type parameter from `C` to `C2`. - /// This means that any subsequent builder methods will operate on the new connection state type `C2`. - /// Be aware of this type transition when chaining builder methods. + /// This means that any subsequent builder methods will operate on the new connection state type + /// `C2`. Be aware of this type transition when chaining builder methods. /// /// # Errors /// @@ -271,7 +271,7 @@ where /// This placeholder immediately closes the connection after the /// preamble phase. A warning is logged so tests and callers are /// aware of the current limitation. - pub async fn handle_connection(&self, _stream: W) + pub async fn handle_connection(&self, mut stream: W) where W: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static, { @@ -281,14 +281,53 @@ where None }; - log::warn!( - "`WireframeApp::handle_connection` called, but connection handling is not \ - implemented; closing stream" - ); - tokio::task::yield_now().await; + if let Err(e) = self.process_stream(&mut stream).await { + log::warn!("connection terminated with error: {e}"); + } if let (Some(teardown), Some(state)) = (&self.on_disconnect, state) { teardown(state).await; } } + + async fn process_stream(&self, stream: &mut W) -> io::Result<()> + where + W: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin, + { + use tokio::io::AsyncReadExt; + + #[derive(bincode::Decode, bincode::Encode)] + struct Envelope { + id: u32, + msg: Vec, + } + + use tokio::time::{Duration, timeout}; + + let mut buf = BytesMut::with_capacity(1024); + loop { + if let Some(frame) = self.frame_processor.decode(&mut buf)? { + match self.serializer.deserialize::(&frame) { + Ok((env, _)) => { + if let Some(handler) = self.routes.get(&env.id) { + handler().await; + } else { + log::warn!("no handler for message id {}", env.id); + } + + let _ = self.send_response(stream, &env).await; + } + Err(e) => log::warn!("failed to deserialize message: {e}"), + } + } else { + match timeout(Duration::from_millis(10), stream.read_buf(&mut buf)).await { + Ok(Ok(0)) | Err(_) => break, + Ok(Ok(_)) => {} + Ok(Err(e)) => return Err(e), + } + } + } + + Ok(()) + } } From e0c6b35ad31812b7817837aec6440c973ca6815d Mon Sep 17 00:00:00 2001 From: Leynos Date: Wed, 18 Jun 2025 22:13:06 +0100 Subject: [PATCH 2/6] Fix lint in lifecycle test --- tests/lifecycle.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/lifecycle.rs b/tests/lifecycle.rs index bcfc41b8..b02a552d 100644 --- a/tests/lifecycle.rs +++ b/tests/lifecycle.rs @@ -67,7 +67,7 @@ async fn teardown_without_setup_does_not_run() { let app = WireframeApp::new() .unwrap() - .on_connection_teardown(move |_| { + .on_connection_teardown(move |()| { let teardown_clone = teardown_clone.clone(); async move { teardown_clone.fetch_add(1, Ordering::SeqCst); From 3c42f796889c946dc8a9d9744c3604418457c2aa Mon Sep 17 00:00:00 2001 From: Leynos Date: Thu, 19 Jun 2025 01:26:12 +0100 Subject: [PATCH 3/6] Refine connection processing --- src/app.rs | 83 ++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 53 insertions(+), 30 deletions(-) diff --git a/src/app.rs b/src/app.rs index e8c5fa68..32dd9877 100644 --- a/src/app.rs +++ b/src/app.rs @@ -1,8 +1,8 @@ //! Application builder configuring routes and middleware. //! -//! `WireframeApp` stores registered routes, services, and middleware -//! for a [`WireframeServer`]. Most builder methods return [`Result`] -//! so callers can chain registrations ergonomically. +//! This module defines [`WireframeApp`], an Actix-inspired builder for +//! managing connection state, routing, and middleware in a `WireframeServer`. +//! It exposes convenience methods to register handlers and lifecycle hooks. use std::{boxed::Box, collections::HashMap, future::Future, pin::Pin, sync::Arc}; @@ -81,6 +81,16 @@ impl From for SendError { fn from(e: io::Error) -> Self { SendError::Io(e) } } +/// Basic envelope type used by [`handle_connection`]. +/// +/// Incoming frames are deserialized into an `Envelope` containing the +/// message identifier and raw payload bytes. +#[derive(bincode::Decode, bincode::Encode)] +struct Envelope { + id: u32, + msg: Vec, +} + /// Result type used throughout the builder API. pub type Result = std::result::Result; @@ -294,38 +304,51 @@ where where W: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin, { - use tokio::io::AsyncReadExt; - - #[derive(bincode::Decode, bincode::Encode)] - struct Envelope { - id: u32, - msg: Vec, - } - - use tokio::time::{Duration, timeout}; - let mut buf = BytesMut::with_capacity(1024); loop { if let Some(frame) = self.frame_processor.decode(&mut buf)? { - match self.serializer.deserialize::(&frame) { - Ok((env, _)) => { - if let Some(handler) = self.routes.get(&env.id) { - handler().await; - } else { - log::warn!("no handler for message id {}", env.id); - } - - let _ = self.send_response(stream, &env).await; - } - Err(e) => log::warn!("failed to deserialize message: {e}"), - } - } else { - match timeout(Duration::from_millis(10), stream.read_buf(&mut buf)).await { - Ok(Ok(0)) | Err(_) => break, - Ok(Ok(_)) => {} - Ok(Err(e)) => return Err(e), + self.handle_frame(stream, &frame).await?; + } else if !self.read_into(stream, &mut buf).await? { + break; + } + } + + Ok(()) + } + + async fn read_into(&self, stream: &mut W, buf: &mut BytesMut) -> io::Result + where + W: tokio::io::AsyncRead + Unpin, + { + use tokio::{ + io::AsyncReadExt, + time::{Duration, timeout}, + }; + + const READ_TIMEOUT: Duration = Duration::from_millis(100); + + match timeout(READ_TIMEOUT, stream.read_buf(buf)).await { + Ok(Ok(_)) => Ok(true), + Ok(Err(e)) => Err(e), + _ => Ok(false), + } + } + + async fn handle_frame(&self, stream: &mut W, frame: &[u8]) -> io::Result<()> + where + W: tokio::io::AsyncWrite + Unpin, + { + match self.serializer.deserialize::(frame) { + Ok((env, _)) => { + if let Some(handler) = self.routes.get(&env.id) { + handler().await; + } else { + log::warn!("no handler for message id {}", env.id); } + + let _ = self.send_response(stream, &env).await; } + Err(e) => log::warn!("failed to deserialize message: {e}"), } Ok(()) From 2aa568fab2cdc262bb2daede864281ea4ded733b Mon Sep 17 00:00:00 2001 From: Leynos Date: Thu, 19 Jun 2025 02:27:46 +0100 Subject: [PATCH 4/6] Pass envelopes to handlers --- docs/roadmap.md | 5 +++-- src/app.rs | 35 ++++++++++++++++++++++++++--------- 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/docs/roadmap.md b/docs/roadmap.md index 21c802e1..fd200523 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -39,14 +39,15 @@ after formatting. Line numbers below refer to that file. server::WireframeServer, }; - async fn handler() {} + use wireframe::app::Envelope; + async fn handler(_env: &Envelope) {} #[tokio::main] async fn main() -> std::io::Result<()> { let factory = || { WireframeApp::new() .unwrap() - .route(1, Box::new(|| Box::pin(handler()))) + .route(1, Box::new(|env| Box::pin(handler(env)))) .unwrap() }; diff --git a/src/app.rs b/src/app.rs index 32dd9877..60e6da45 100644 --- a/src/app.rs +++ b/src/app.rs @@ -38,7 +38,7 @@ pub struct WireframeApp Pin + Send>> + Send + Sync>; +pub type Service = Box Pin + Send>> + Send + Sync>; /// Trait representing middleware components. pub trait Middleware: Send + Sync {} @@ -86,11 +86,14 @@ impl From for SendError { /// Incoming frames are deserialized into an `Envelope` containing the /// message identifier and raw payload bytes. #[derive(bincode::Decode, bincode::Encode)] -struct Envelope { +pub struct Envelope { id: u32, msg: Vec, } +/// Number of idle polls before terminating a connection. +const MAX_IDLE_POLLS: u32 = 50; // ~5s with 100ms timeout + /// Result type used throughout the builder API. pub type Result = std::result::Result; @@ -305,18 +308,30 @@ where W: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin, { let mut buf = BytesMut::with_capacity(1024); + let mut idle = 0u32; + loop { if let Some(frame) = self.frame_processor.decode(&mut buf)? { self.handle_frame(stream, &frame).await?; - } else if !self.read_into(stream, &mut buf).await? { - break; + idle = 0; + } else { + match self.read_into(stream, &mut buf).await? { + Some(0) => break, + Some(_) => idle = 0, + None => { + idle += 1; + if idle >= MAX_IDLE_POLLS { + break; + } + } + } } } Ok(()) } - async fn read_into(&self, stream: &mut W, buf: &mut BytesMut) -> io::Result + async fn read_into(&self, stream: &mut W, buf: &mut BytesMut) -> io::Result> where W: tokio::io::AsyncRead + Unpin, { @@ -328,9 +343,9 @@ where const READ_TIMEOUT: Duration = Duration::from_millis(100); match timeout(READ_TIMEOUT, stream.read_buf(buf)).await { - Ok(Ok(_)) => Ok(true), + Ok(Ok(n)) => Ok(Some(n)), Ok(Err(e)) => Err(e), - _ => Ok(false), + Err(_) => Ok(None), } } @@ -341,12 +356,14 @@ where match self.serializer.deserialize::(frame) { Ok((env, _)) => { if let Some(handler) = self.routes.get(&env.id) { - handler().await; + handler(&env).await; } else { log::warn!("no handler for message id {}", env.id); } - let _ = self.send_response(stream, &env).await; + if let Err(e) = self.send_response(stream, &env).await { + log::warn!("failed to send response: {e}"); + } } Err(e) => log::warn!("failed to deserialize message: {e}"), } From b09af17182d1d98891b5aba0159df76eea0ae3bc Mon Sep 17 00:00:00 2001 From: Leynos Date: Thu, 19 Jun 2025 10:09:54 +0100 Subject: [PATCH 5/6] Handle transient read errors --- src/app.rs | 45 ++++++++++++++++++++++++++++++++++++--------- 1 file changed, 36 insertions(+), 9 deletions(-) diff --git a/src/app.rs b/src/app.rs index 60e6da45..0fb7d719 100644 --- a/src/app.rs +++ b/src/app.rs @@ -93,6 +93,8 @@ pub struct Envelope { /// Number of idle polls before terminating a connection. const MAX_IDLE_POLLS: u32 = 50; // ~5s with 100ms timeout +/// Maximum consecutive deserialization failures before closing a connection. +const MAX_DESER_FAILURES: u32 = 10; /// Result type used throughout the builder API. pub type Result = std::result::Result; @@ -309,21 +311,31 @@ where { let mut buf = BytesMut::with_capacity(1024); let mut idle = 0u32; + let mut deser_failures = 0u32; loop { if let Some(frame) = self.frame_processor.decode(&mut buf)? { - self.handle_frame(stream, &frame).await?; + self.handle_frame(stream, &frame, &mut deser_failures) + .await?; idle = 0; } else { - match self.read_into(stream, &mut buf).await? { - Some(0) => break, - Some(_) => idle = 0, - None => { + match self.read_into(stream, &mut buf).await { + Ok(Some(0)) => break, + Ok(Some(_)) => idle = 0, + Ok(None) => { idle += 1; if idle >= MAX_IDLE_POLLS { break; } } + Err(e) => match e.kind() { + io::ErrorKind::WouldBlock | io::ErrorKind::Interrupted => {} + io::ErrorKind::UnexpectedEof + | io::ErrorKind::ConnectionReset + | io::ErrorKind::ConnectionAborted + | io::ErrorKind::BrokenPipe => break, + _ => return Err(e), + }, } } } @@ -349,23 +361,38 @@ where } } - async fn handle_frame(&self, stream: &mut W, frame: &[u8]) -> io::Result<()> + async fn handle_frame( + &self, + stream: &mut W, + frame: &[u8], + deser_failures: &mut u32, + ) -> io::Result<()> where W: tokio::io::AsyncWrite + Unpin, { match self.serializer.deserialize::(frame) { Ok((env, _)) => { + *deser_failures = 0; if let Some(handler) = self.routes.get(&env.id) { handler(&env).await; } else { log::warn!("no handler for message id {}", env.id); } - if let Err(e) = self.send_response(stream, &env).await { - log::warn!("failed to send response: {e}"); + if let Err(e) = stream.write_all(frame).await { + log::warn!("failed to echo response: {e}"); + } + } + Err(e) => { + *deser_failures += 1; + log::warn!("failed to deserialize message: {e}"); + if *deser_failures >= MAX_DESER_FAILURES { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "too many deserialization failures", + )); } } - Err(e) => log::warn!("failed to deserialize message: {e}"), } Ok(()) From e2e3f2b800bfbc30bf4116c822be045bcae6544a Mon Sep 17 00:00:00 2001 From: Leynos Date: Thu, 19 Jun 2025 10:26:39 +0100 Subject: [PATCH 6/6] Refactor connection loop and send responses --- src/app.rs | 70 ++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 49 insertions(+), 21 deletions(-) diff --git a/src/app.rs b/src/app.rs index 0fb7d719..78ebc857 100644 --- a/src/app.rs +++ b/src/app.rs @@ -318,31 +318,59 @@ where self.handle_frame(stream, &frame, &mut deser_failures) .await?; idle = 0; - } else { - match self.read_into(stream, &mut buf).await { - Ok(Some(0)) => break, - Ok(Some(_)) => idle = 0, - Ok(None) => { - idle += 1; - if idle >= MAX_IDLE_POLLS { - break; - } - } - Err(e) => match e.kind() { - io::ErrorKind::WouldBlock | io::ErrorKind::Interrupted => {} - io::ErrorKind::UnexpectedEof - | io::ErrorKind::ConnectionReset - | io::ErrorKind::ConnectionAborted - | io::ErrorKind::BrokenPipe => break, - _ => return Err(e), - }, - } + continue; + } + + if self.read_and_update(stream, &mut buf, &mut idle).await? { + break; } } Ok(()) } + async fn read_and_update( + &self, + stream: &mut W, + buf: &mut BytesMut, + idle: &mut u32, + ) -> io::Result + where + W: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin, + { + match self.read_into(stream, buf).await { + Ok(Some(0)) => Ok(true), + Ok(Some(_)) => { + *idle = 0; + Ok(false) + } + Ok(None) => { + *idle += 1; + Ok(*idle >= MAX_IDLE_POLLS) + } + Err(e) if Self::is_transient_error(&e) => Ok(false), + Err(e) if Self::is_fatal_error(&e) => Ok(true), + Err(e) => Err(e), + } + } + + fn is_transient_error(e: &io::Error) -> bool { + matches!( + e.kind(), + io::ErrorKind::WouldBlock | io::ErrorKind::Interrupted + ) + } + + fn is_fatal_error(e: &io::Error) -> bool { + matches!( + e.kind(), + io::ErrorKind::UnexpectedEof + | io::ErrorKind::ConnectionReset + | io::ErrorKind::ConnectionAborted + | io::ErrorKind::BrokenPipe + ) + } + async fn read_into(&self, stream: &mut W, buf: &mut BytesMut) -> io::Result> where W: tokio::io::AsyncRead + Unpin, @@ -379,8 +407,8 @@ where log::warn!("no handler for message id {}", env.id); } - if let Err(e) = stream.write_all(frame).await { - log::warn!("failed to echo response: {e}"); + if let Err(e) = self.send_response(stream, &env).await { + log::warn!("failed to send response: {e}"); } } Err(e) => {