Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions data-pipeline/src/trace_exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,11 +229,11 @@ impl TraceExporter {

match self.input_format {
TraceExporterInputFormat::Proxy => self.send_proxy(data.as_ref(), trace_count),
TraceExporterInputFormat::V04 => match msgpack_decoder::v04::from_slice(data) {
TraceExporterInputFormat::V04 => match msgpack_decoder::v04::from_bytes(data) {
Comment thread
anais-raison marked this conversation as resolved.
Ok((traces, _)) => self.send_deser_ser(TraceCollection::TraceChunk(traces)),
Err(e) => Err(TraceExporterError::Deserialization(e)),
},
TraceExporterInputFormat::V05 => match msgpack_decoder::v05::from_slice(data) {
TraceExporterInputFormat::V05 => match msgpack_decoder::v05::from_bytes(data) {
Ok((traces, _)) => self.send_deser_ser(TraceCollection::TraceChunk(traces)),
Err(e) => Err(TraceExporterError::Deserialization(e)),
},
Expand Down
19 changes: 19 additions & 0 deletions tinybytes/src/bytes_string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,25 @@ impl BytesString {
}
}

/// Creates an `Option<BytesString>` from a string slice within the given buffer.
///
/// # Arguments
///
/// * `bytes` - The `tinybytes::Bytes` buffer that `slice` is expected to point into.
/// * `slice` - The string slice pointing into the given bytes that will form the `BytesString`.
Comment thread
anais-raison marked this conversation as resolved.
///
/// # Return
///
/// Returns `None` if `slice` is not pointing into `bytes`.
pub fn try_from_bytes_slice(bytes: &Bytes, slice: &str) -> Option<Self> {
    // `slice_ref` returns `None` when `slice` does not lie inside `bytes`; that outcome is
    // propagated unchanged. This lookup is safe code, so it stays outside the `unsafe` block.
    let subset = bytes.slice_ref(slice.as_bytes())?;
    // SAFETY: `subset` holds the bytes of `slice`, and a `&str` is always valid UTF-8, so the
    // skipped validation could not have failed.
    Some(unsafe { Self::from_bytes_unchecked(subset) })
}

/// Creates a `BytesString` from a `tinybytes::Bytes` instance without validating the bytes.
///
/// This function does not perform any validation on the provided bytes, and assumes that the
Expand Down
4 changes: 2 additions & 2 deletions tinybytes/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl Bytes {
///
/// let slice = bytes.slice(6..11);
/// assert_eq!(slice.as_ref(), b"world");
/// ```
/// ```
pub fn slice(&self, range: impl RangeBounds<usize>) -> Self {
use std::ops::Bound;

Expand Down Expand Up @@ -151,7 +151,7 @@ impl Bytes {
///
/// let invalid_subset = b"invalid";
/// assert!(bytes.slice_ref(invalid_subset).is_none());
/// ```
/// ```
pub fn slice_ref(&self, subset: &[u8]) -> Option<Bytes> {
// An empty slice can be a subset of any slice.
if subset.is_empty() {
Expand Down
8 changes: 7 additions & 1 deletion trace-utils/src/msgpack_decoder/decode/error.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,28 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

/// Represents an error that can occur while decoding a msgpack payload.
///
/// Variants carrying a `String` hold a detail message that is appended to the variant's
/// description when the error is formatted with `Display`.
#[derive(Debug, PartialEq)]
pub enum DecodeError {
/// Failed to convert a number to the expected type.
InvalidConversion(String),
/// Payload does not match the expected type for a trace payload.
InvalidType(String),
/// Payload is not a valid msgpack object.
InvalidFormat(String),
/// Failed to read the buffer. Carries no detail message.
IOError,
/// The payload contains non-UTF-8 strings.
Utf8Error(String),
}

impl std::fmt::Display for DecodeError {
/// Writes a human-readable description of the decode error.
///
/// Variants that carry a message print a fixed prefix followed by that message;
/// `IOError` has no payload and prints a fixed string.
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    // Arms follow the declaration order of `DecodeError`; each variant is matched exactly once
    // (a repeated `IOError` arm would be an unreachable pattern).
    match self {
        DecodeError::InvalidConversion(msg) => write!(f, "Failed to convert value: {}", msg),
        DecodeError::InvalidType(msg) => write!(f, "Invalid type encountered: {}", msg),
        DecodeError::InvalidFormat(msg) => write!(f, "Invalid format: {}", msg),
        DecodeError::IOError => write!(f, "Failed to read from buffer"),
        DecodeError::Utf8Error(msg) => write!(f, "Failed to read utf8 value: {}", msg),
    }
}
Expand Down
9 changes: 4 additions & 5 deletions trace-utils/src/msgpack_decoder/decode/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
use crate::msgpack_decoder::decode::error::DecodeError;
use rmp::{decode, decode::RmpRead, Marker};
use std::collections::HashMap;
use tinybytes::Bytes;

/// Reads a map from the buffer and returns it as a `HashMap`.
///
Expand All @@ -14,7 +13,7 @@ use tinybytes::Bytes;
/// # Arguments
///
/// * `len` - The number of key-value pairs to read from the buffer.
/// * `buf` - A reference to the Bytes containing the encoded map data.
/// * `buf` - A reference to the slice containing the encoded map data.
/// * `read_pair` - A function that reads a key-value pair from the buffer and returns it as a
/// `Result<(K, V), DecodeError>`.
///
Expand All @@ -34,14 +33,14 @@ use tinybytes::Bytes;
/// * `V` - The type of the values in the map.
/// * `F` - The type of the function used to read key-value pairs from the buffer.
#[inline]
pub fn read_map<K, V, F>(
pub fn read_map<'a, K, V, F>(
len: usize,
buf: &mut Bytes,
buf: &mut &'a [u8],
read_pair: F,
) -> Result<HashMap<K, V>, DecodeError>
where
K: std::hash::Hash + Eq,
F: Fn(&mut Bytes) -> Result<(K, V), DecodeError>,
F: Fn(&mut &'a [u8]) -> Result<(K, V), DecodeError>,
{
let mut map = HashMap::with_capacity(len);
for _ in 0..len {
Expand Down
45 changes: 24 additions & 21 deletions trace-utils/src/msgpack_decoder/decode/meta_struct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@

use crate::msgpack_decoder::decode::error::DecodeError;
use crate::msgpack_decoder::decode::map::{read_map, read_map_len};
use crate::msgpack_decoder::decode::string::{handle_null_marker, read_string_bytes};
use crate::msgpack_decoder::decode::string::{handle_null_marker, read_string_ref};
use rmp::decode;
use std::collections::HashMap;
use tinybytes::{Bytes, BytesString};
use tinybytes::Bytes;

fn read_byte_array_len(buf: &mut &[u8]) -> Result<u32, DecodeError> {
decode::read_bin_len(buf).map_err(|_| {
Expand All @@ -15,27 +15,28 @@ fn read_byte_array_len(buf: &mut &[u8]) -> Result<u32, DecodeError> {
}

#[inline]
pub fn read_meta_struct(buf: &mut Bytes) -> Result<HashMap<BytesString, Bytes>, DecodeError> {
if let Some(empty_map) = handle_null_marker(buf, HashMap::default) {
return Ok(empty_map);
pub fn read_meta_struct<'a>(buf: &mut &'a [u8]) -> Result<HashMap<&'a str, Bytes>, DecodeError> {
Comment thread
anais-raison marked this conversation as resolved.
if handle_null_marker(buf) {
return Ok(HashMap::default());
}

fn read_meta_struct_pair(buf: &mut Bytes) -> Result<(BytesString, Bytes), DecodeError> {
let key = read_string_bytes(buf)?;
let byte_array_len = read_byte_array_len(unsafe { buf.as_mut_slice() })? as usize;
fn read_meta_struct_pair<'a>(buf: &mut &'a [u8]) -> Result<(&'a str, Bytes), DecodeError> {
let key = read_string_ref(buf)?;
let byte_array_len = read_byte_array_len(buf)? as usize;

let data = buf
.slice_ref(&buf[0..byte_array_len])
.ok_or_else(|| DecodeError::InvalidFormat("Invalid data length".to_string()))?;
unsafe {
// SAFETY: forwarding the buffer requires that buf is borrowed from static.
*buf.as_mut_slice() = &buf.as_mut_slice()[byte_array_len..];
let slice = buf.get(0..byte_array_len);
if let Some(slice) = slice {
let data = Bytes::copy_from_slice(slice);
*buf = &buf[byte_array_len..];
Ok((key, data))
} else {
Err(DecodeError::InvalidFormat(
"Invalid data length".to_string(),
))
}

Ok((key, data))
}

let len = read_map_len(unsafe { buf.as_mut_slice() })?;
let len = read_map_len(buf)?;
read_map(len, buf, read_meta_struct_pair)
}

Expand All @@ -47,8 +48,9 @@ mod tests {
fn read_meta_test() {
let meta = HashMap::from([("key".to_string(), Bytes::from(vec![1, 2, 3, 4]))]);

let mut bytes = Bytes::from(rmp_serde::to_vec_named(&meta).unwrap());
let res = read_meta_struct(&mut bytes).unwrap();
let serialized = rmp_serde::to_vec_named(&meta).unwrap();
let mut slice = serialized.as_ref();
let res = read_meta_struct(&mut slice).unwrap();

assert_eq!(res.get("key").unwrap().to_vec(), vec![1, 2, 3, 4]);
}
Expand All @@ -57,8 +59,9 @@ mod tests {
fn read_meta_wrong_family_test() {
let meta = HashMap::from([("key".to_string(), vec![1, 2, 3, 4])]);

let mut bytes = Bytes::from(rmp_serde::to_vec_named(&meta).unwrap());
let res = read_meta_struct(&mut bytes);
let serialized = rmp_serde::to_vec_named(&meta).unwrap();
let mut slice = serialized.as_ref();
let res = read_meta_struct(&mut slice);

assert!(res.is_err());
matches!(res.unwrap_err(), DecodeError::InvalidFormat(_));
Expand Down
19 changes: 9 additions & 10 deletions trace-utils/src/msgpack_decoder/decode/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,24 @@

use crate::msgpack_decoder::decode::error::DecodeError;
use crate::msgpack_decoder::decode::map::{read_map, read_map_len};
use crate::msgpack_decoder::decode::number::read_number_bytes;
use crate::msgpack_decoder::decode::string::{handle_null_marker, read_string_bytes};
use crate::msgpack_decoder::decode::number::read_number_slice;
use crate::msgpack_decoder::decode::string::{handle_null_marker, read_string_ref};
use std::collections::HashMap;
use tinybytes::{Bytes, BytesString};

#[inline]
pub fn read_metric_pair(buf: &mut Bytes) -> Result<(BytesString, f64), DecodeError> {
let key = read_string_bytes(buf)?;
let v = read_number_bytes(buf)?;
pub fn read_metric_pair<'a>(buf: &mut &'a [u8]) -> Result<(&'a str, f64), DecodeError> {
let key = read_string_ref(buf)?;
let v = read_number_slice(buf)?;

Ok((key, v))
}
#[inline]
pub fn read_metrics(buf: &mut Bytes) -> Result<HashMap<BytesString, f64>, DecodeError> {
if let Some(empty_map) = handle_null_marker(buf, HashMap::default) {
return Ok(empty_map);
pub fn read_metrics<'a>(buf: &mut &'a [u8]) -> Result<HashMap<&'a str, f64>, DecodeError> {
if handle_null_marker(buf) {
return Ok(HashMap::default());
}

let len = read_map_len(unsafe { buf.as_mut_slice() })?;
let len = read_map_len(buf)?;

read_map(len, buf, read_metric_pair)
}
57 changes: 29 additions & 28 deletions trace-utils/src/msgpack_decoder/decode/number.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
use crate::msgpack_decoder::decode::error::DecodeError;
use rmp::{decode::RmpRead, Marker};
use std::fmt;
use tinybytes::Bytes;

#[derive(Debug, PartialEq)]
pub enum Number {
Expand Down Expand Up @@ -198,16 +197,18 @@ fn read_number(buf: &mut &[u8], allow_null: bool) -> Result<Number, DecodeError>
}
}

pub fn read_number_bytes<T: TryFrom<Number, Error = DecodeError>>(
buf: &mut Bytes,
/// Read a msgpack encoded number from `buf`.
pub fn read_number_slice<T: TryFrom<Number, Error = DecodeError>>(
buf: &mut &[u8],
) -> Result<T, DecodeError> {
read_number(unsafe { buf.as_mut_slice() }, false)?.try_into()
read_number(buf, false)?.try_into()
}

pub fn read_nullable_number_bytes<T: TryFrom<Number, Error = DecodeError>>(
buf: &mut Bytes,
/// Read a msgpack encoded number from `buf` and return 0 if null.
pub fn read_nullable_number_slice<T: TryFrom<Number, Error = DecodeError>>(
buf: &mut &[u8],
) -> Result<T, DecodeError> {
read_number(unsafe { buf.as_mut_slice() }, true)?.try_into()
read_number(buf, true)?.try_into()
}

#[cfg(test)]
Expand All @@ -217,45 +218,45 @@ mod tests {
use std::f64;

#[test]
fn test_decoding_not_nullable_bytes_to_unsigned() {
fn test_decoding_not_nullable_slice_to_unsigned() {
let mut buf = Vec::new();
let expected_value = 42;
let val = json!(expected_value);
rmp_serde::encode::write_named(&mut buf, &val).unwrap();
let mut bytes = Bytes::from(buf.clone());
let result: u8 = read_number_bytes(&mut bytes).unwrap();
let mut slice = buf.as_slice();
let result: u8 = read_number_slice(&mut slice).unwrap();
assert_eq!(result, expected_value);
}

#[test]
fn test_decoding_not_nullable_bytes_to_signed() {
fn test_decoding_not_nullable_slice_to_signed() {
let mut buf = Vec::new();
let expected_value = 42;
let val = json!(expected_value);
rmp_serde::encode::write_named(&mut buf, &val).unwrap();
let mut bytes = Bytes::from(buf.clone());
let result: i8 = read_number_bytes(&mut bytes).unwrap();
let mut slice = buf.as_slice();
let result: i8 = read_number_slice(&mut slice).unwrap();
assert_eq!(result, expected_value);
}

#[test]
fn test_decoding_not_nullable_bytes_to_float() {
fn test_decoding_not_nullable_slice_to_float() {
let mut buf = Vec::new();
let expected_value = 42.98;
let val = json!(expected_value);
rmp_serde::encode::write_named(&mut buf, &val).unwrap();
let mut bytes = Bytes::from(buf.clone());
let result: f64 = read_number_bytes(&mut bytes).unwrap();
let mut slice = buf.as_slice();
let result: f64 = read_number_slice(&mut slice).unwrap();
assert_eq!(result, expected_value);
}

#[test]
fn test_decoding_null_through_read_number_bytes_raises_exception() {
fn test_decoding_null_through_read_number_slice_raises_exception() {
let mut buf = Vec::new();
let val = json!(null);
rmp_serde::encode::write_named(&mut buf, &val).unwrap();
let mut bytes = Bytes::from(buf.clone());
let result: Result<u8, DecodeError> = read_number_bytes(&mut bytes);
let mut slice = buf.as_slice();
let result: Result<u8, DecodeError> = read_number_slice(&mut slice);
assert!(matches!(result, Err(DecodeError::InvalidType(_))));

assert_eq!(
Expand All @@ -265,32 +266,32 @@ mod tests {
}

#[test]
fn test_decoding_null_bytes_to_unsigned() {
fn test_decoding_null_slice_to_unsigned() {
let mut buf = Vec::new();
let val = json!(null);
rmp_serde::encode::write_named(&mut buf, &val).unwrap();
let mut bytes = Bytes::from(buf.clone());
let result: u8 = read_nullable_number_bytes(&mut bytes).unwrap();
let mut slice = buf.as_slice();
let result: u8 = read_nullable_number_slice(&mut slice).unwrap();
assert_eq!(result, 0);
}

#[test]
fn test_decoding_null_bytes_to_signed() {
fn test_decoding_null_slice_to_signed() {
let mut buf = Vec::new();
let val = json!(null);
rmp_serde::encode::write_named(&mut buf, &val).unwrap();
let mut bytes = Bytes::from(buf.clone());
let result: i8 = read_nullable_number_bytes(&mut bytes).unwrap();
let mut slice = buf.as_slice();
let result: i8 = read_nullable_number_slice(&mut slice).unwrap();
assert_eq!(result, 0);
}

#[test]
fn test_decoding_null_bytes_to_float() {
fn test_decoding_null_slice_to_float() {
let mut buf = Vec::new();
let val = json!(null);
rmp_serde::encode::write_named(&mut buf, &val).unwrap();
let mut bytes = Bytes::from(buf.clone());
let result: f64 = read_nullable_number_bytes(&mut bytes).unwrap();
let mut slice = buf.as_slice();
let result: f64 = read_nullable_number_slice(&mut slice).unwrap();
assert_eq!(result, 0.0);
}

Expand Down
Loading
Loading