diff --git a/Cargo.lock b/Cargo.lock index df99e98a..0c67811e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -121,6 +121,12 @@ version = "2.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b8e56985ec62d17e9c1001dc89c88ecd7dc08e47eba5ec7c29c7b5eeecde967" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.10.1" @@ -1207,6 +1213,7 @@ dependencies = [ "async-stream", "async-trait", "bincode", + "byteorder", "bytes", "dashmap", "futures", diff --git a/Cargo.toml b/Cargo.toml index 8cb4cd46..d8900275 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ tokio-util = { version = "0.7", features = ["rt"] } futures = "0.3" async-trait = "0.1" bytes = "1" +byteorder = "1" log = "0.4" dashmap = "5" diff --git a/docs/rust-binary-router-library-design.md b/docs/rust-binary-router-library-design.md index fd515843..3599733d 100644 --- a/docs/rust-binary-router-library-design.md +++ b/docs/rust-binary-router-library-design.md @@ -1123,50 +1123,57 @@ examples are invaluable. They make the abstract design tangible and showcase how 2. **Frame Processor Implementation** (Simple Length-Prefixed Framing using `tokio-util`): - ```rust - // Crate: my_frame_processor.rs - use bytes::{BytesMut, Buf, BufMut}; - use tokio_util::codec::{Encoder, Decoder}; - use std::io; + ```rust + // Crate: my_frame_processor.rs + use bytes::{BytesMut, Buf, BufMut}; + use tokio_util::codec::{Decoder, Encoder}; + use byteorder::{BigEndian, ReadBytesExt}; + use std::io; - pub struct LengthPrefixedCodec; + const MAX_FRAME_LEN: usize = 16 * 1024 * 1024; // 16 MiB upper limit - impl Decoder for LengthPrefixedCodec { - type Item = BytesMut; // Raw frame payload - type Error = io::Error; + pub struct LengthPrefixedCodec; - fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { - if src.len() < 4 { return Ok(None); } // Not enough data for length prefix + impl Decoder for LengthPrefixedCodec { + type Item = BytesMut; // Raw frame payload + type Error = io::Error; - let mut length_bytes = [0u8; 4]; - length_bytes.copy_from_slice(&src[..4]); - let length = u32::from_be_bytes(length_bytes) as usize; + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + if src.len() < 4 { return Ok(None); } // Not enough data for length prefix - if src.len() < 4 + length { - src.reserve(4 + length - src.len()); - return Ok(None); // Not enough data for full frame - } + let length = (&src[..4]) + .read_u32::() + .expect("slice length checked") as usize; - src.advance(4); // Consume length prefix - Ok(Some(src.split_to(length))) - } - } + if length > MAX_FRAME_LEN { + return Err(io::Error::new(io::ErrorKind::InvalidInput, "frame too large")); + } - impl> Encoder for LengthPrefixedCodec { - type Error = io::Error; + if src.len() < 4 + length { + src.reserve(4 + length - src.len()); + return Ok(None); // Not enough data for full frame + } - fn encode(&mut self, item: T, dst: &mut BytesMut) -> Result<(), Self::Error> { - let data = item.as_ref(); - dst.reserve(4 + data.len()); - dst.put_u32(data.len() as u32); - dst.put_slice(data); - Ok(()) - } - } - ``` + src.advance(4); // Consume length prefix + Ok(Some(src.split_to(length))) + } + } + + impl> Encoder for LengthPrefixedCodec { + type Error = io::Error; + + fn encode(&mut self, item: T, dst: &mut BytesMut) -> Result<(), Self::Error> { + let data = item.as_ref(); + dst.reserve(4 + data.len()); + dst.put_u32(data.len() as u32); + dst.put_slice(data); + Ok(()) + } + } + ``` - (Note: "wireframe" would abstract the direct use of `Encoder`/`Decoder` - behind its own `FrameProcessor` trait or provide helpers.) + (Note: "wireframe" would abstract the direct use of `Encoder`/`Decoder` behind + its own `FrameProcessor` trait or provide helpers.) 1. **Server Setup and Handler**: diff --git a/src/frame.rs b/src/frame.rs index 65148f00..0fadf4f1 100644 --- a/src/frame.rs +++ b/src/frame.rs @@ -6,8 +6,132 @@ use std::io; +use byteorder::{BigEndian, LittleEndian, ReadBytesExt}; + +const ERR_UNSUPPORTED_PREFIX: &str = "unsupported length prefix size"; +const ERR_FRAME_TOO_LARGE: &str = "frame too large"; +const ERR_INCOMPLETE_PREFIX: &str = "incomplete length prefix"; + +#[derive(Copy, Clone)] +enum PrefixErr { + UnsupportedSize, + Incomplete, +} + +fn prefix_err(kind: PrefixErr) -> io::Error { + match kind { + PrefixErr::UnsupportedSize => { + io::Error::new(io::ErrorKind::InvalidInput, ERR_UNSUPPORTED_PREFIX) + } + PrefixErr::Incomplete => { + io::Error::new(io::ErrorKind::UnexpectedEof, ERR_INCOMPLETE_PREFIX) + } + } +} + +/// Checked conversion from `usize` to a specific prefix integer type. +/// +/// Returns `ERR_FRAME_TOO_LARGE` if the value does not fit in `T`. +fn checked_prefix_cast>(len: usize) -> io::Result { + T::try_from(len).map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, ERR_FRAME_TOO_LARGE)) +} + use bytes::{Buf, BufMut, BytesMut}; +/// Converts a byte slice into a `u64` according to `size` and `endianness`. +/// +/// Only prefix sizes of `1`, `2`, `4`, or `8` bytes are supported. `bytes` must +/// contain at least `size` bytes. +/// +/// # Errors +/// Returns [`io::ErrorKind::InvalidInput`] if `size` is unsupported or +/// [`io::ErrorKind::UnexpectedEof`] if `bytes` is too short. +/// +/// # Examples +/// +/// ```rust,no_run +/// use wireframe::frame::{Endianness, bytes_to_u64}; +/// let buf = [0x00, 0x10, 0x20, 0x30]; +/// assert_eq!(bytes_to_u64(&buf, 2, Endianness::Big).unwrap(), 0x0010); +/// ``` +pub fn bytes_to_u64(bytes: &[u8], size: usize, endianness: Endianness) -> io::Result { + if !matches!(size, 1 | 2 | 4 | 8) { + return Err(prefix_err(PrefixErr::UnsupportedSize)); + } + if bytes.len() < size { + return Err(prefix_err(PrefixErr::Incomplete)); + } + + let mut cur = io::Cursor::new(&bytes[..size]); + let val = match (size, endianness) { + (1, _) => cur.read_u8().map(u64::from), + (2, Endianness::Big) => cur.read_u16::().map(u64::from), + (2, Endianness::Little) => cur.read_u16::().map(u64::from), + (4, Endianness::Big) => cur.read_u32::().map(u64::from), + (4, Endianness::Little) => cur.read_u32::().map(u64::from), + (8, Endianness::Big) => cur.read_u64::(), + (8, Endianness::Little) => cur.read_u64::(), + // size is validated above so this branch is unreachable + _ => unreachable!(), + }?; + Ok(val) +} + +/// Encodes an integer directly into `out` according to `size` and `endianness`. +/// +/// The function supports prefix sizes of `1`, `2`, `4`, or `8` bytes. +/// +/// # Errors +/// Returns [`io::ErrorKind::InvalidInput`] if the size is unsupported or if +/// `len` does not fit into the prefix. +/// +/// # Examples +/// +/// ```rust,no_run +/// use wireframe::frame::{Endianness, u64_to_bytes}; +/// let mut buf = [0u8; 8]; +/// let written = u64_to_bytes(0x1234, 2, Endianness::Big, &mut buf).unwrap(); +/// assert_eq!(&buf[..written], [0x12, 0x34]); +/// ``` +#[must_use = "length prefix byte count must be used"] +pub fn u64_to_bytes( + len: usize, + size: usize, + endianness: Endianness, + out: &mut [u8; 8], +) -> io::Result { + match (size, endianness) { + (1, _) => { + out[0] = checked_prefix_cast::(len)?; + } + (2, Endianness::Big) => { + out[..2].copy_from_slice(&checked_prefix_cast::(len)?.to_be_bytes()); + } + (2, Endianness::Little) => { + out[..2].copy_from_slice(&checked_prefix_cast::(len)?.to_le_bytes()); + } + (4, Endianness::Big) => { + out[..4].copy_from_slice(&checked_prefix_cast::(len)?.to_be_bytes()); + } + (4, Endianness::Little) => { + out[..4].copy_from_slice(&checked_prefix_cast::(len)?.to_le_bytes()); + } + (8, Endianness::Big) => { + out[..8].copy_from_slice(&checked_prefix_cast::(len)?.to_be_bytes()); + } + (8, Endianness::Little) => { + out[..8].copy_from_slice(&checked_prefix_cast::(len)?.to_le_bytes()); + } + _ => { + return Err(prefix_err(PrefixErr::UnsupportedSize)); + } + } + + out[size..].fill(0); + + Ok(size) +} + /// Byte order used for encoding and decoding length prefixes. #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum Endianness { @@ -67,30 +191,9 @@ impl LengthFormat { /// Returns an error if the prefix size is unsupported or if the decoded length does not fit in /// a `usize`. fn read_len(&self, bytes: &[u8]) -> io::Result { - let len = match (self.bytes, self.endianness) { - (1, _) => u64::from(u8::from_ne_bytes([bytes[0]])), - (2, Endianness::Big) => u64::from(u16::from_be_bytes([bytes[0], bytes[1]])), - (2, Endianness::Little) => u64::from(u16::from_le_bytes([bytes[0], bytes[1]])), - (4, Endianness::Big) => { - u64::from(u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]])) - } - (4, Endianness::Little) => { - u64::from(u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]])) - } - (8, Endianness::Big) => u64::from_be_bytes([ - bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7], - ]), - (8, Endianness::Little) => u64::from_le_bytes([ - bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7], - ]), - _ => { - return Err(io::Error::new( - io::ErrorKind::InvalidInput, - "unsupported length prefix size", - )); - } - }; - usize::try_from(len).map_err(|_| io::Error::other("frame too large")) + let len = bytes_to_u64(bytes, self.bytes, self.endianness)?; + usize::try_from(len) + .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, ERR_FRAME_TOO_LARGE)) } /// Writes a length prefix to the destination buffer using the configured size and endianness. @@ -106,48 +209,9 @@ impl LengthFormat { /// Returns an error if `len` exceeds the maximum value for the configured prefix size or if the /// prefix size is not supported. fn write_len(&self, len: usize, dst: &mut BytesMut) -> io::Result<()> { - match (self.bytes, self.endianness) { - (1, _) => dst.put_u8( - u8::try_from(len) - .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "frame too large"))?, - ), - (2, Endianness::Big) => dst.put_slice( - &u16::try_from(len) - .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "frame too large"))? - .to_be_bytes(), - ), - (2, Endianness::Little) => dst.put_slice( - &u16::try_from(len) - .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "frame too large"))? - .to_le_bytes(), - ), - (4, Endianness::Big) => dst.put_slice( - &u32::try_from(len) - .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "frame too large"))? - .to_be_bytes(), - ), - (4, Endianness::Little) => dst.put_slice( - &u32::try_from(len) - .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "frame too large"))? - .to_le_bytes(), - ), - (8, Endianness::Big) => dst.put_slice( - &u64::try_from(len) - .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "frame too large"))? - .to_be_bytes(), - ), - (8, Endianness::Little) => dst.put_slice( - &u64::try_from(len) - .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "frame too large"))? - .to_le_bytes(), - ), - _ => { - return Err(io::Error::new( - io::ErrorKind::InvalidInput, - "unsupported length prefix size", - )); - } - } + let mut buf = [0u8; 8]; + let written = u64_to_bytes(len, self.bytes, self.endianness, &mut buf)?; + dst.put_slice(&buf[..written]); Ok(()) } } @@ -255,7 +319,12 @@ impl FrameProcessor for LengthPrefixedProcessor { return Ok(None); } let len = self.format.read_len(&src[..self.format.bytes])?; - if src.len() < self.format.bytes + len { + let needed = self + .format + .bytes + .checked_add(len) + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, ERR_FRAME_TOO_LARGE))?; + if src.len() < needed { return Ok(None); } src.advance(self.format.bytes); @@ -273,3 +342,99 @@ impl FrameProcessor for LengthPrefixedProcessor { Ok(()) } } + +#[cfg(test)] +mod tests { + use rstest::rstest; + + use super::*; + + #[rstest] + #[case(vec![0x12], 1, Endianness::Big, 0x12)] + #[case(vec![0x12, 0x34], 2, Endianness::Big, 0x1234)] + #[case(vec![0x34, 0x12], 2, Endianness::Little, 0x1234)] + #[case(vec![0, 0, 0, 1], 4, Endianness::Big, 1)] + #[case(vec![1, 0, 0, 0], 4, Endianness::Little, 1)] + #[case(vec![0, 0, 0, 0, 0, 0, 0, 1], 8, Endianness::Big, 1)] + #[case(vec![1, 0, 0, 0, 0, 0, 0, 0], 8, Endianness::Little, 1)] + fn bytes_to_u64_ok( + #[case] bytes: Vec, + #[case] size: usize, + #[case] endianness: Endianness, + #[case] expected: u64, + ) { + assert_eq!( + bytes_to_u64(&bytes, size, endianness).expect("bytes_to_u64 should succeed"), + expected + ); + } + + #[rstest] + #[case(0x12usize, 1, Endianness::Big, vec![0x12])] + #[case(0x1234usize, 2, Endianness::Big, vec![0x12, 0x34])] + #[case(0x1234usize, 2, Endianness::Little, vec![0x34, 0x12])] + #[case(1usize, 4, Endianness::Big, vec![0, 0, 0, 1])] + #[case(1usize, 4, Endianness::Little, vec![1, 0, 0, 0])] + #[case(1usize, 8, Endianness::Big, vec![0, 0, 0, 0, 0, 0, 0, 1])] + #[case(1usize, 8, Endianness::Little, vec![1, 0, 0, 0, 0, 0, 0, 0])] + fn u64_to_bytes_ok( + #[case] value: usize, + #[case] size: usize, + #[case] endianness: Endianness, + #[case] expected: Vec, + ) { + let mut buf = [0u8; 8]; + let written = + u64_to_bytes(value, size, endianness, &mut buf).expect("u64_to_bytes should succeed"); + assert_eq!(written, size); + assert_eq!(&buf[..written], expected.as_slice()); + } + + #[rstest] + #[case(vec![0x01], 2, Endianness::Big)] + #[case(vec![0x02, 0x03], 4, Endianness::Little)] + fn bytes_to_u64_short( + #[case] bytes: Vec, + #[case] size: usize, + #[case] endianness: Endianness, + ) { + let err = bytes_to_u64(&bytes, size, endianness) + .expect_err("bytes_to_u64 must error for truncated prefix slice"); + assert_eq!(err.kind(), io::ErrorKind::UnexpectedEof); + } + + #[rstest] + #[case(vec![0x01, 0x02, 0x03], 3, Endianness::Big)] + #[case(vec![0x01, 0x02, 0x03], 3, Endianness::Little)] + fn bytes_to_u64_unsupported( + #[case] bytes: Vec, + #[case] size: usize, + #[case] endianness: Endianness, + ) { + let err = bytes_to_u64(&bytes, size, endianness) + .expect_err("bytes_to_u64 must fail for unsupported size"); + assert_eq!(err.kind(), io::ErrorKind::InvalidInput); + } + + #[rstest] + fn u64_to_bytes_large() { + let mut buf = [0u8; 8]; + let err = u64_to_bytes(300, 1, Endianness::Big, &mut buf) + .expect_err("u64_to_bytes must fail if value exceeds 1-byte prefix"); + assert_eq!(err.kind(), io::ErrorKind::InvalidInput); + } + + #[rstest] + #[case(1usize, 3, Endianness::Big)] + #[case(1usize, 3, Endianness::Little)] + fn u64_to_bytes_unsupported( + #[case] value: usize, + #[case] size: usize, + #[case] endianness: Endianness, + ) { + let mut buf = [0u8; 8]; + let err = u64_to_bytes(value, size, endianness, &mut buf) + .expect_err("u64_to_bytes must fail for unsupported size"); + assert_eq!(err.kind(), io::ErrorKind::InvalidInput); + } +} diff --git a/tests/response.rs b/tests/response.rs index d7417e64..2ad51051 100644 --- a/tests/response.rs +++ b/tests/response.rs @@ -131,7 +131,7 @@ async fn send_response_propagates_write_error() { let err = app .send_response(&mut writer, &TestResp(3)) .await - .expect_err("expected error"); + .expect_err("send_response should propagate write error"); assert!(matches!(err, wireframe::app::SendError::Io(_))); } @@ -145,7 +145,7 @@ fn encode_fails_for_invalid_prefix_size(#[case] bytes: usize, #[case] endian: En let mut buf = BytesMut::new(); let err = processor .encode(&vec![1, 2], &mut buf) - .expect_err("expected error"); + .expect_err("encode must fail for unsupported prefix size"); assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput); } @@ -157,7 +157,9 @@ fn decode_fails_for_invalid_prefix_size(#[case] bytes: usize, #[case] endian: En let fmt = LengthFormat::new(bytes, endian); let processor = LengthPrefixedProcessor::new(fmt); let mut buf = BytesMut::from(vec![0u8; bytes].as_slice()); - let err = processor.decode(&mut buf).expect_err("expected error"); + let err = processor + .decode(&mut buf) + .expect_err("decode must fail for unsupported prefix size"); assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput); } @@ -184,6 +186,6 @@ async fn send_response_returns_encode_error() { let err = app .send_response(&mut Vec::new(), &FailingResp) .await - .expect_err("expected error"); + .expect_err("send_response should fail when encode errors"); assert!(matches!(err, wireframe::app::SendError::Serialize(_))); }