Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ edition = "2024"
[dependencies]
serde = { version = "1", features = ["derive"] }
bincode = "2"
tokio = { version = "1", default-features = false, features = ["net", "signal", "rt-multi-thread", "macros", "sync", "time"] }
tokio = { version = "1", default-features = false, features = ["net", "signal", "rt-multi-thread", "macros", "sync", "time", "io-util"] }
futures = "0.3"
async-trait = "0.1"
bytes = "1"
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ reduce this boilerplate through layered abstractions:

- **Transport adapter** built on Tokio I/O
- **Framing layer** for length‑prefixed or custom frames
- **Connection preamble** with customizable validation callbacks [[docs](docs/preamble-validator.md)]
- **Serialization engine** using `bincode` or a `wire-rs` wrapper
- **Routing engine** that dispatches messages by ID
- **Handler invocation** with extractor support
Expand Down
33 changes: 33 additions & 0 deletions docs/preamble-validator.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Connection Preamble Validation

`wireframe` supports an optional connection preamble that is read as soon as a
client connects. The server decodes the preamble with
[`read_preamble`](../src/preamble.rs) and can invoke user-supplied callbacks on
success or failure. The helper uses `bincode` to decode any type implementing
`bincode::Decode` and reads exactly the number of bytes required.

The flow is summarized below:

```mermaid
sequenceDiagram
participant Client
participant Server
participant PreambleDecoder
participant SuccessCallback
participant FailureCallback

Client->>Server: Connects and sends preamble bytes
Server->>PreambleDecoder: Reads and decodes preamble
alt Decode success
PreambleDecoder-->>Server: Decoded preamble (T)
Server->>SuccessCallback: Invoke with preamble data
else Decode failure
PreambleDecoder-->>Server: DecodeError
Server->>FailureCallback: Invoke with error
end
Server-->>Client: (Continues or closes connection)
```

In the tests, a `HotlinePreamble` struct illustrates the pattern, but any
preamble type may be used. Register callbacks via `on_preamble_decode_success`
and `on_preamble_decode_failure` on `WireframeServer`.
7 changes: 4 additions & 3 deletions docs/roadmap.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ after formatting. Line numbers below refer to that file.
}
```

- [ ] Add connection preamble support.
Provide built-in parsing of a handshake preamble (e.g., Hotline's "TRTP")
and invoke a user-configured handler on success or failure.
- [x] Add connection preamble support.
Provide generic parsing of connection preambles with a Hotline handshake
example in the tests. Invoke user-configured callbacks on decode success
or failure. See [preamble-validator](preamble-validator.md).
- [ ] Add response serialization and transmission.
Encode handler responses using the selected serialization format and write
them back through the framing layer.
Expand Down
12 changes: 12 additions & 0 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,16 @@ impl WireframeApp {
self.middleware.push(Box::new(mw));
Ok(self)
}

/// Handle an accepted connection.
///
/// This placeholder simply drops the stream. Future implementations
/// will decode frames and dispatch them to registered handlers.
pub async fn handle_connection<S>(&self, _stream: S)
where
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static,
{
// Connection handling will be implemented later.
tokio::task::yield_now().await;
}
}
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,6 @@ pub mod extractor;
pub mod frame;
pub mod message;
pub mod middleware;
pub mod preamble;
pub mod rewind_stream;
pub mod server;
79 changes: 79 additions & 0 deletions src/preamble.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use bincode::error::DecodeError;
use bincode::{Decode, config, decode_from_slice};
use tokio::io::{self, AsyncRead, AsyncReadExt};

const MAX_PREAMBLE_LEN: usize = 1024;

async fn read_more<R>(
reader: &mut R,
buf: &mut Vec<u8>,
additional: usize,
) -> Result<(), DecodeError>
where
R: AsyncRead + Unpin,
{
let start = buf.len();
if start + additional > MAX_PREAMBLE_LEN {
return Err(DecodeError::Other("preamble too long"));
}
buf.resize(start + additional, 0);
let mut read = 0;
while read < additional {
match reader
.read(&mut buf[start + read..start + additional])
.await
{
Ok(0) => {
return Err(DecodeError::Io {
inner: io::Error::from(io::ErrorKind::UnexpectedEof),
additional: additional - read,
});
}
Ok(n) => read += n,
Err(inner) => {
return Err(DecodeError::Io {
inner,
additional: additional - read,
});
}
}
}
Ok(())
}

/// Read and decode a connection preamble using bincode.
///
/// This helper reads the exact number of bytes required by `T`, as
/// indicated by [`DecodeError::UnexpectedEnd`]. Additional bytes are
/// requested from the reader until decoding succeeds or fails for some
/// other reason.
///
/// # Errors
///
/// Returns a [`DecodeError`] if decoding the preamble fails or an
/// underlying I/O error occurs while reading from `reader`.
pub async fn read_preamble<R, T>(reader: &mut R) -> Result<(T, Vec<u8>), DecodeError>
where
R: AsyncRead + Unpin,
// `Decode` expects a decoding context type, not a lifetime. Most callers
// use the unit type as the context, which requires no external state.
T: Decode<()>,
{
let mut buf = Vec::new();
// Build the decoder configuration once to avoid repeated allocations.
let config = config::standard()
.with_big_endian()
.with_fixed_int_encoding();
loop {
match decode_from_slice::<T, _>(&buf, config) {
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Ok((value, consumed)) => {
let leftover = buf.split_off(consumed);
return Ok((value, leftover));
}
Err(DecodeError::UnexpectedEnd { additional }) => {
read_more(reader, &mut buf, additional).await?;
}
Err(e) => return Err(e),
}
}
}
72 changes: 72 additions & 0 deletions src/rewind_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};

/// A stream adapter that replays buffered bytes before reading
/// from the underlying stream.
pub struct RewindStream<S> {
leftover: Vec<u8>,
pos: usize,
inner: S,
}

impl<S> RewindStream<S> {
/// Create a new `RewindStream` that will yield `leftover` before
/// delegating to `inner`.
pub fn new(leftover: Vec<u8>, inner: S) -> Self {
Self {
leftover,
pos: 0,
inner,
}
}
}

impl<S: AsyncRead + Unpin> AsyncRead for RewindStream<S> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
if self.pos < self.leftover.len() {
let remaining = self.leftover.len() - self.pos;
let to_copy = remaining.min(buf.remaining());
let start = self.pos;
let end = start + to_copy;
buf.put_slice(&self.leftover[start..end]);
self.pos += to_copy;
if self.pos < self.leftover.len() || to_copy > 0 {
return Poll::Ready(Ok(()));
}
}

if self.pos >= self.leftover.len() && !self.leftover.is_empty() {
self.leftover.clear();
self.pos = 0;
}

Pin::new(&mut self.inner).poll_read(cx, buf)
}
}

impl<S: AsyncWrite + Unpin> AsyncWrite for RewindStream<S> {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.inner).poll_write(cx, buf)
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.inner).poll_flush(cx)
}

fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.inner).poll_shutdown(cx)
}
}

impl<S: Unpin> Unpin for RewindStream<S> {}
Loading
Loading