Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/preamble-validator.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
client connects. The server decodes the preamble with
[`read_preamble`](../src/preamble.rs) and can invoke user-supplied callbacks on
success or failure. The helper uses `bincode` to decode any type implementing
`bincode::Decode` and reads exactly the number of bytes required.
`bincode::BorrowDecode` and reads exactly the number of bytes required.

The flow is summarized below:

Expand Down
4 changes: 2 additions & 2 deletions src/extractor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl<T: Send + Sync> SharedState<T> {
///
/// # Examples
///
/// ```
/// ```no_run
/// use std::sync::Arc;
/// use wireframe::extractor::SharedState;
///
Expand Down Expand Up @@ -108,7 +108,7 @@ impl<T: Send + Sync> std::ops::Deref for SharedState<T> {
///
/// # Examples
///
/// ```
/// ```no_run
/// use std::sync::Arc;
/// use wireframe::extractor::SharedState;
///
Expand Down
23 changes: 17 additions & 6 deletions src/message.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use bincode::error::{DecodeError, EncodeError};
use bincode::{Decode, Encode, config, decode_from_slice, encode_to_vec};
use bincode::{BorrowDecode, Encode, borrow_decode_from_slice, config, encode_to_vec};

/// Wrapper trait for application message types.
///
/// Any type deriving [`Encode`] and [`Decode`] automatically implements
/// Any type deriving [`Encode`] and [`BorrowDecode`] automatically implements
/// this trait via a blanket implementation. The default methods provide
/// convenient helpers to serialize and deserialize using bincode's
/// standard configuration.
pub trait Message: Encode + Decode<()> {
pub trait Message: Encode + for<'de> BorrowDecode<'de, ()> {
/// Serialize the message into a byte vector.
///
/// # Errors
Expand All @@ -22,13 +22,24 @@ pub trait Message: Encode + Decode<()> {
///
/// # Errors
///
/// Returns a [`DecodeError`] if deserialization fails.
/// Deserialises a message instance from a byte slice using the standard configuration.
///
/// Returns the deserialised message and the number of bytes consumed, or a [`DecodeError`] if deserialisation fails.
///
/// # Examples
///
/// ```
/// use your_crate::Message;
/// let bytes = /* some valid serialised message bytes */;
/// let (msg, consumed) = MyMessageType::from_bytes(&bytes).unwrap();
/// assert!(consumed <= bytes.len());
/// ```
fn from_bytes(bytes: &[u8]) -> Result<(Self, usize), DecodeError>
where
Self: Sized,
{
decode_from_slice(bytes, config::standard())
borrow_decode_from_slice(bytes, config::standard())
}
}

impl<T> Message for T where T: Encode + Decode<()> {}
impl<T> Message for T where T: Encode + for<'de> BorrowDecode<'de, ()> {}
50 changes: 44 additions & 6 deletions src/preamble.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
use bincode::error::DecodeError;
use bincode::{Decode, config, decode_from_slice};
use bincode::{BorrowDecode, borrow_decode_from_slice, config};
use tokio::io::{self, AsyncRead, AsyncReadExt};

const MAX_PREAMBLE_LEN: usize = 1024;

/// Trait bound for types accepted as connection preambles.
///
/// The bound allows decoding borrowed data for any lifetime without
/// requiring an external decoding context.
pub trait Preamble: for<'de> BorrowDecode<'de, ()> + Send + 'static {}
impl<T> Preamble for T where for<'de> T: BorrowDecode<'de, ()> + Send + 'static {}

async fn read_more<R>(
reader: &mut R,
buf: &mut Vec<u8>,
Expand Down Expand Up @@ -51,21 +58,52 @@ where
/// # Errors
///
/// Returns a [`DecodeError`] if decoding the preamble fails or an
/// underlying I/O error occurs while reading from `reader`.
/// Asynchronously reads and decodes a preamble of type `T` from an async reader using bincode.
///
/// Attempts to decode a value of type `T` from the beginning of the byte stream, reading more bytes as needed until decoding succeeds or an error occurs. Any bytes remaining after the decoded value are returned as leftovers.
///
/// # Returns
///
/// A tuple containing the decoded value and a vector of leftover bytes following the decoded preamble.
///
/// # Errors
///
/// Returns a `DecodeError` if decoding fails or if an I/O error occurs while reading from the reader.
///
/// # Examples
///
/// ```
/// use tokio::io::BufReader;
/// use bincode::BorrowDecode;
///
/// #[derive(Debug, PartialEq, bincode::BorrowDecode)]
/// struct MyPreamble(u8);
///
/// #[tokio::main]
/// async fn main() {
/// let data = bincode::encode_to_vec(MyPreamble(42), bincode::config::standard()).unwrap();
/// let mut reader = BufReader::new(&data[..]);
/// let (preamble, leftover) = read_preamble::<_, MyPreamble>(&mut reader).await.unwrap();
/// assert_eq!(preamble.0, 42);
/// assert!(leftover.is_empty());
/// }
/// ```
pub async fn read_preamble<R, T>(reader: &mut R) -> Result<(T, Vec<u8>), DecodeError>
where
R: AsyncRead + Unpin,
// `Decode` expects a decoding context type, not a lifetime. Most callers
// use the unit type as the context, which requires no external state.
T: Decode<()>,
// Decode borrowed data for any lifetime without external context.
for<'de> T: BorrowDecode<'de, ()>,
{
// Read a small chunk upfront to avoid a guaranteed decode failure on the
// first iteration.
let mut buf = Vec::new();
read_more(reader, &mut buf, 8.min(MAX_PREAMBLE_LEN)).await?;
// Build the decoder configuration once to avoid repeated allocations.
let config = config::standard()
.with_big_endian()
.with_fixed_int_encoding();
loop {
match decode_from_slice::<T, _>(&buf, config) {
match borrow_decode_from_slice::<T, _>(&buf, config) {
Ok((value, consumed)) => {
let leftover = buf.split_off(consumed);
return Ok((value, leftover));
Expand Down
54 changes: 39 additions & 15 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tokio::time::{Duration, sleep};

use core::marker::PhantomData;

use crate::preamble::read_preamble;
use crate::preamble::{Preamble, read_preamble};
use crate::rewind_stream::RewindStream;
use bincode::error::DecodeError;

Expand All @@ -30,9 +30,9 @@ use crate::app::WireframeApp;
pub struct WireframeServer<F, T = ()>
where
F: Fn() -> WireframeApp + Send + Sync + Clone + 'static,
// `Decode`'s type parameter represents a decoding context.
// The unit type signals that no context is required.
T: bincode::Decode<()> + Send + 'static,
// `Preamble` covers types implementing `BorrowDecode` for any lifetime,
// enabling decoding of borrowed data without external context.
T: Preamble,
{
factory: F,
listener: Option<Arc<TcpListener>>,
Expand Down Expand Up @@ -91,17 +91,17 @@ where
///
/// # Examples
///
/// ```ignore
/// ```no_run
/// # use wireframe::server::WireframeServer;
/// # let factory = || todo!();
/// # let factory = || WireframeApp::new().unwrap();
/// # struct MyPreamble;
/// let server = WireframeServer::new(factory).with_preamble::<MyPreamble>();
/// ```
#[must_use]
pub fn with_preamble<T>(self) -> WireframeServer<F, T>
pub fn with_preamble<P>(self) -> WireframeServer<F, P>
where
// Unit context indicates no external state is required when decoding.
T: bincode::Decode<()> + Send + 'static,
// New preamble types must satisfy the `Preamble` bound.
P: Preamble,
{
WireframeServer {
factory: self.factory,
Expand All @@ -117,8 +117,8 @@ where
impl<F, T> WireframeServer<F, T>
where
F: Fn() -> WireframeApp + Send + Sync + Clone + 'static,
// `Decode` is generic over a context type; we use `()` here.
T: bincode::Decode<()> + Send + 'static,
// The preamble type must satisfy the `Preamble` bound.
T: Preamble,
{
/// Set the number of worker tasks to spawn for the server.
///
Expand Down Expand Up @@ -327,6 +327,9 @@ where
}

#[allow(clippy::type_complexity)]
/// Runs a worker task that accepts incoming TCP connections and processes them asynchronously.
///
/// Each accepted connection is handled in a separate task, with optional callbacks for preamble decode success or failure. The worker listens for shutdown signals to terminate gracefully. Accept errors are retried with exponential backoff.
async fn worker_task<F, T>(
listener: Arc<TcpListener>,
factory: F,
Expand All @@ -335,8 +338,8 @@ async fn worker_task<F, T>(
shutdown_rx: &mut broadcast::Receiver<()>,
) where
F: Fn() -> WireframeApp + Send + Sync + Clone + 'static,
// The unit context indicates no additional state is needed to decode `T`.
T: bincode::Decode<()> + Send + 'static,
// `Preamble` ensures `T` supports borrowed decoding.
T: Preamble,
{
let mut delay = Duration::from_millis(10);
loop {
Expand All @@ -361,15 +364,36 @@ async fn worker_task<F, T>(
}

#[allow(clippy::type_complexity)]
/// Processes an incoming TCP stream by decoding a preamble and dispatching the connection to a `WireframeApp`.
///
/// Attempts to asynchronously decode a preamble of type `T` from the provided stream. If decoding succeeds, invokes the optional success handler, wraps the stream to include any leftover bytes, and passes it to a new `WireframeApp` instance for connection handling. If decoding fails, invokes the optional failure handler and closes the connection.
///
/// # Type Parameters
///
/// - `F`: A factory closure that produces `WireframeApp` instances.
/// - `T`: The preamble type, which must support borrowed decoding via the `Preamble` trait.
///
/// # Examples
///
/// ```no_run
/// # use std::sync::Arc;
/// # use mycrate::{process_stream, WireframeApp, Preamble};
/// # use tokio::net::TcpStream;
/// # async fn example() {
/// let stream: TcpStream = /* ... */;
/// let factory = || WireframeApp::new();
/// process_stream::<_, ()>(stream, factory, None, None).await;
/// # }
/// ```
async fn process_stream<F, T>(
mut stream: tokio::net::TcpStream,
factory: F,
on_success: Option<Arc<dyn Fn(&T) + Send + Sync>>,
on_failure: Option<Arc<dyn Fn(&DecodeError) + Send + Sync>>,
) where
F: Fn() -> WireframeApp + Send + Sync + 'static,
// The decoding context parameter is `()`; no external state is needed.
T: bincode::Decode<()> + Send + 'static,
// `Preamble` ensures `T` supports borrowed decoding.
T: Preamble,
{
match read_preamble::<_, T>(&mut stream).await {
Ok((preamble, leftover)) => {
Expand Down
Loading