diff --git a/docs/preamble-validator.md b/docs/preamble-validator.md index f6561504..94e4b324 100644 --- a/docs/preamble-validator.md +++ b/docs/preamble-validator.md @@ -4,7 +4,7 @@ client connects. The server decodes the preamble with [`read_preamble`](../src/preamble.rs) and can invoke user-supplied callbacks on success or failure. The helper uses `bincode` to decode any type implementing -`bincode::Decode` and reads exactly the number of bytes required. +`bincode::BorrowDecode` and reads exactly the number of bytes required. The flow is summarized below: diff --git a/src/extractor.rs b/src/extractor.rs index 51ffec53..a20b9d6c 100644 --- a/src/extractor.rs +++ b/src/extractor.rs @@ -73,7 +73,7 @@ impl SharedState { /// /// # Examples /// - /// ``` + /// ```no_run /// use std::sync::Arc; /// use wireframe::extractor::SharedState; /// @@ -108,7 +108,7 @@ impl std::ops::Deref for SharedState { /// /// # Examples /// - /// ``` + /// ```no_run /// use std::sync::Arc; /// use wireframe::extractor::SharedState; /// diff --git a/src/message.rs b/src/message.rs index dc3acdc6..c0933c33 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1,13 +1,13 @@ use bincode::error::{DecodeError, EncodeError}; -use bincode::{Decode, Encode, config, decode_from_slice, encode_to_vec}; +use bincode::{BorrowDecode, Encode, borrow_decode_from_slice, config, encode_to_vec}; /// Wrapper trait for application message types. /// -/// Any type deriving [`Encode`] and [`Decode`] automatically implements +/// Any type deriving [`Encode`] and [`BorrowDecode`] automatically implements /// this trait via a blanket implementation. The default methods provide /// convenient helpers to serialize and deserialize using bincode's /// standard configuration. -pub trait Message: Encode + Decode<()> { +pub trait Message: Encode + for<'de> BorrowDecode<'de, ()> { /// Serialize the message into a byte vector. /// /// # Errors @@ -22,13 +22,24 @@ pub trait Message: Encode + Decode<()> { /// /// # Errors /// - /// Returns a [`DecodeError`] if deserialization fails. + /// Deserialises a message instance from a byte slice using the standard configuration. + /// + /// Returns the deserialised message and the number of bytes consumed, or a [`DecodeError`] if deserialisation fails. + /// + /// # Examples + /// + /// ``` + /// use your_crate::Message; + /// let bytes = /* some valid serialised message bytes */; + /// let (msg, consumed) = MyMessageType::from_bytes(&bytes).unwrap(); + /// assert!(consumed <= bytes.len()); + /// ``` fn from_bytes(bytes: &[u8]) -> Result<(Self, usize), DecodeError> where Self: Sized, { - decode_from_slice(bytes, config::standard()) + borrow_decode_from_slice(bytes, config::standard()) } } -impl Message for T where T: Encode + Decode<()> {} +impl Message for T where T: Encode + for<'de> BorrowDecode<'de, ()> {} diff --git a/src/preamble.rs b/src/preamble.rs index 81c9bc0c..cd94093a 100644 --- a/src/preamble.rs +++ b/src/preamble.rs @@ -1,9 +1,16 @@ use bincode::error::DecodeError; -use bincode::{Decode, config, decode_from_slice}; +use bincode::{BorrowDecode, borrow_decode_from_slice, config}; use tokio::io::{self, AsyncRead, AsyncReadExt}; const MAX_PREAMBLE_LEN: usize = 1024; +/// Trait bound for types accepted as connection preambles. +/// +/// The bound allows decoding borrowed data for any lifetime without +/// requiring an external decoding context. +pub trait Preamble: for<'de> BorrowDecode<'de, ()> + Send + 'static {} +impl Preamble for T where for<'de> T: BorrowDecode<'de, ()> + Send + 'static {} + async fn read_more( reader: &mut R, buf: &mut Vec, @@ -51,21 +58,52 @@ where /// # Errors /// /// Returns a [`DecodeError`] if decoding the preamble fails or an -/// underlying I/O error occurs while reading from `reader`. +/// Asynchronously reads and decodes a preamble of type `T` from an async reader using bincode. +/// +/// Attempts to decode a value of type `T` from the beginning of the byte stream, reading more bytes as needed until decoding succeeds or an error occurs. Any bytes remaining after the decoded value are returned as leftovers. +/// +/// # Returns +/// +/// A tuple containing the decoded value and a vector of leftover bytes following the decoded preamble. +/// +/// # Errors +/// +/// Returns a `DecodeError` if decoding fails or if an I/O error occurs while reading from the reader. +/// +/// # Examples +/// +/// ``` +/// use tokio::io::BufReader; +/// use bincode::BorrowDecode; +/// +/// #[derive(Debug, PartialEq, bincode::BorrowDecode)] +/// struct MyPreamble(u8); +/// +/// #[tokio::main] +/// async fn main() { +/// let data = bincode::encode_to_vec(MyPreamble(42), bincode::config::standard()).unwrap(); +/// let mut reader = BufReader::new(&data[..]); +/// let (preamble, leftover) = read_preamble::<_, MyPreamble>(&mut reader).await.unwrap(); +/// assert_eq!(preamble.0, 42); +/// assert!(leftover.is_empty()); +/// } +/// ``` pub async fn read_preamble(reader: &mut R) -> Result<(T, Vec), DecodeError> where R: AsyncRead + Unpin, - // `Decode` expects a decoding context type, not a lifetime. Most callers - // use the unit type as the context, which requires no external state. - T: Decode<()>, + // Decode borrowed data for any lifetime without external context. + for<'de> T: BorrowDecode<'de, ()>, { + // Read a small chunk upfront to avoid a guaranteed decode failure on the + // first iteration. let mut buf = Vec::new(); + read_more(reader, &mut buf, 8.min(MAX_PREAMBLE_LEN)).await?; // Build the decoder configuration once to avoid repeated allocations. let config = config::standard() .with_big_endian() .with_fixed_int_encoding(); loop { - match decode_from_slice::(&buf, config) { + match borrow_decode_from_slice::(&buf, config) { Ok((value, consumed)) => { let leftover = buf.split_off(consumed); return Ok((value, leftover)); diff --git a/src/server.rs b/src/server.rs index 4ad55484..2415da89 100644 --- a/src/server.rs +++ b/src/server.rs @@ -13,7 +13,7 @@ use tokio::time::{Duration, sleep}; use core::marker::PhantomData; -use crate::preamble::read_preamble; +use crate::preamble::{Preamble, read_preamble}; use crate::rewind_stream::RewindStream; use bincode::error::DecodeError; @@ -30,9 +30,9 @@ use crate::app::WireframeApp; pub struct WireframeServer where F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, - // `Decode`'s type parameter represents a decoding context. - // The unit type signals that no context is required. - T: bincode::Decode<()> + Send + 'static, + // `Preamble` covers types implementing `BorrowDecode` for any lifetime, + // enabling decoding of borrowed data without external context. + T: Preamble, { factory: F, listener: Option>, @@ -91,17 +91,17 @@ where /// /// # Examples /// - /// ```ignore + /// ```no_run /// # use wireframe::server::WireframeServer; - /// # let factory = || todo!(); + /// # let factory = || WireframeApp::new().unwrap(); /// # struct MyPreamble; /// let server = WireframeServer::new(factory).with_preamble::(); /// ``` #[must_use] - pub fn with_preamble(self) -> WireframeServer + pub fn with_preamble

(self) -> WireframeServer where - // Unit context indicates no external state is required when decoding. - T: bincode::Decode<()> + Send + 'static, + // New preamble types must satisfy the `Preamble` bound. + P: Preamble, { WireframeServer { factory: self.factory, @@ -117,8 +117,8 @@ where impl WireframeServer where F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, - // `Decode` is generic over a context type; we use `()` here. - T: bincode::Decode<()> + Send + 'static, + // The preamble type must satisfy the `Preamble` bound. + T: Preamble, { /// Set the number of worker tasks to spawn for the server. /// @@ -327,6 +327,9 @@ where } #[allow(clippy::type_complexity)] +/// Runs a worker task that accepts incoming TCP connections and processes them asynchronously. +/// +/// Each accepted connection is handled in a separate task, with optional callbacks for preamble decode success or failure. The worker listens for shutdown signals to terminate gracefully. Accept errors are retried with exponential backoff. async fn worker_task( listener: Arc, factory: F, @@ -335,8 +338,8 @@ async fn worker_task( shutdown_rx: &mut broadcast::Receiver<()>, ) where F: Fn() -> WireframeApp + Send + Sync + Clone + 'static, - // The unit context indicates no additional state is needed to decode `T`. - T: bincode::Decode<()> + Send + 'static, + // `Preamble` ensures `T` supports borrowed decoding. + T: Preamble, { let mut delay = Duration::from_millis(10); loop { @@ -361,6 +364,27 @@ async fn worker_task( } #[allow(clippy::type_complexity)] +/// Processes an incoming TCP stream by decoding a preamble and dispatching the connection to a `WireframeApp`. +/// +/// Attempts to asynchronously decode a preamble of type `T` from the provided stream. If decoding succeeds, invokes the optional success handler, wraps the stream to include any leftover bytes, and passes it to a new `WireframeApp` instance for connection handling. If decoding fails, invokes the optional failure handler and closes the connection. +/// +/// # Type Parameters +/// +/// - `F`: A factory closure that produces `WireframeApp` instances. +/// - `T`: The preamble type, which must support borrowed decoding via the `Preamble` trait. +/// +/// # Examples +/// +/// ```no_run +/// # use std::sync::Arc; +/// # use mycrate::{process_stream, WireframeApp, Preamble}; +/// # use tokio::net::TcpStream; +/// # async fn example() { +/// let stream: TcpStream = /* ... */; +/// let factory = || WireframeApp::new(); +/// process_stream::<_, ()>(stream, factory, None, None).await; +/// # } +/// ``` async fn process_stream( mut stream: tokio::net::TcpStream, factory: F, @@ -368,8 +392,8 @@ async fn process_stream( on_failure: Option>, ) where F: Fn() -> WireframeApp + Send + Sync + 'static, - // The decoding context parameter is `()`; no external state is needed. - T: bincode::Decode<()> + Send + 'static, + // `Preamble` ensures `T` supports borrowed decoding. + T: Preamble, { match read_preamble::<_, T>(&mut stream).await { Ok((preamble, leftover)) => {