diff --git a/data-pipeline/src/trace_exporter/mod.rs b/data-pipeline/src/trace_exporter/mod.rs index d5ce8013d5..f5f5c6a8b7 100644 --- a/data-pipeline/src/trace_exporter/mod.rs +++ b/data-pipeline/src/trace_exporter/mod.rs @@ -229,11 +229,11 @@ impl TraceExporter { match self.input_format { TraceExporterInputFormat::Proxy => self.send_proxy(data.as_ref(), trace_count), - TraceExporterInputFormat::V04 => match msgpack_decoder::v04::from_slice(data) { + TraceExporterInputFormat::V04 => match msgpack_decoder::v04::from_bytes(data) { Ok((traces, _)) => self.send_deser_ser(TraceCollection::TraceChunk(traces)), Err(e) => Err(TraceExporterError::Deserialization(e)), }, - TraceExporterInputFormat::V05 => match msgpack_decoder::v05::from_slice(data) { + TraceExporterInputFormat::V05 => match msgpack_decoder::v05::from_bytes(data) { Ok((traces, _)) => self.send_deser_ser(TraceCollection::TraceChunk(traces)), Err(e) => Err(TraceExporterError::Deserialization(e)), }, diff --git a/tinybytes/src/bytes_string.rs b/tinybytes/src/bytes_string.rs index 6a46060b0e..a4a0b1edaf 100644 --- a/tinybytes/src/bytes_string.rs +++ b/tinybytes/src/bytes_string.rs @@ -83,6 +83,25 @@ impl BytesString { } } + /// Creates a `Option` from a string slice within the given buffer. + /// + /// # Arguments + /// + /// * `bytes` - A `tinybytes::Bytes` instance that will be converted into a `BytesString`. + /// * `slice` - The string slice pointing into the given bytes that will form the `BytesString`. + /// + /// # Return + /// + /// Returns `None` if `slice` is not pointing into `bytes`. + pub fn try_from_bytes_slice(bytes: &Bytes, slice: &str) -> Option { + // SAFETY: This is safe as a str slice is definitely a valid UTF-8 slice. + unsafe { + Some(Self::from_bytes_unchecked( + bytes.slice_ref(slice.as_bytes())?, + )) + } + } + /// Creates a `BytesString` from a `tinybytes::Bytes` instance without validating the bytes. /// /// This function does not perform any validation on the provided bytes, and assumes that the diff --git a/tinybytes/src/lib.rs b/tinybytes/src/lib.rs index ec287d8a15..fa5a9add35 100644 --- a/tinybytes/src/lib.rs +++ b/tinybytes/src/lib.rs @@ -86,7 +86,7 @@ impl Bytes { /// /// let slice = bytes.slice(6..11); /// assert_eq!(slice.as_ref(), b"world"); - /// ``` + /// ``` pub fn slice(&self, range: impl RangeBounds) -> Self { use std::ops::Bound; @@ -151,7 +151,7 @@ impl Bytes { /// /// let invalid_subset = b"invalid"; /// assert!(bytes.slice_ref(invalid_subset).is_none()); - /// ``` + /// ``` pub fn slice_ref(&self, subset: &[u8]) -> Option { // An empty slice can be a subset of any slice. if subset.is_empty() { diff --git a/trace-utils/src/msgpack_decoder/decode/error.rs b/trace-utils/src/msgpack_decoder/decode/error.rs index ff74819f57..46171471de 100644 --- a/trace-utils/src/msgpack_decoder/decode/error.rs +++ b/trace-utils/src/msgpack_decoder/decode/error.rs @@ -1,12 +1,18 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +/// Represent error that can happen while decoding msgpack. #[derive(Debug, PartialEq)] pub enum DecodeError { + /// Failed to convert a number to the expected type. InvalidConversion(String), + /// Payload does not match the expected type for a trace payload. InvalidType(String), + /// Payload is not a valid msgpack object. InvalidFormat(String), + /// Failed to read the buffer. IOError, + /// The payload contains non-utf8 strings. Utf8Error(String), } @@ -14,9 +20,9 @@ impl std::fmt::Display for DecodeError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { DecodeError::InvalidConversion(msg) => write!(f, "Failed to convert value: {}", msg), - DecodeError::IOError => write!(f, "Failed to read from buffer"), DecodeError::InvalidType(msg) => write!(f, "Invalid type encountered: {}", msg), DecodeError::InvalidFormat(msg) => write!(f, "Invalid format: {}", msg), + DecodeError::IOError => write!(f, "Failed to read from buffer"), DecodeError::Utf8Error(msg) => write!(f, "Failed to read utf8 value: {}", msg), } } diff --git a/trace-utils/src/msgpack_decoder/decode/map.rs b/trace-utils/src/msgpack_decoder/decode/map.rs index ed6874ca2e..c6dd22fb1d 100644 --- a/trace-utils/src/msgpack_decoder/decode/map.rs +++ b/trace-utils/src/msgpack_decoder/decode/map.rs @@ -4,7 +4,6 @@ use crate::msgpack_decoder::decode::error::DecodeError; use rmp::{decode, decode::RmpRead, Marker}; use std::collections::HashMap; -use tinybytes::Bytes; /// Reads a map from the buffer and returns it as a `HashMap`. /// @@ -14,7 +13,7 @@ use tinybytes::Bytes; /// # Arguments /// /// * `len` - The number of key-value pairs to read from the buffer. -/// * `buf` - A reference to the Bytes containing the encoded map data. +/// * `buf` - A reference to the slice containing the encoded map data. /// * `read_pair` - A function that reads a key-value pair from the buffer and returns it as a /// `Result<(K, V), DecodeError>`. /// @@ -34,14 +33,14 @@ use tinybytes::Bytes; /// * `V` - The type of the values in the map. /// * `F` - The type of the function used to read key-value pairs from the buffer. #[inline] -pub fn read_map( +pub fn read_map<'a, K, V, F>( len: usize, - buf: &mut Bytes, + buf: &mut &'a [u8], read_pair: F, ) -> Result, DecodeError> where K: std::hash::Hash + Eq, - F: Fn(&mut Bytes) -> Result<(K, V), DecodeError>, + F: Fn(&mut &'a [u8]) -> Result<(K, V), DecodeError>, { let mut map = HashMap::with_capacity(len); for _ in 0..len { diff --git a/trace-utils/src/msgpack_decoder/decode/meta_struct.rs b/trace-utils/src/msgpack_decoder/decode/meta_struct.rs index fb0f4fbd10..145200877c 100644 --- a/trace-utils/src/msgpack_decoder/decode/meta_struct.rs +++ b/trace-utils/src/msgpack_decoder/decode/meta_struct.rs @@ -3,10 +3,10 @@ use crate::msgpack_decoder::decode::error::DecodeError; use crate::msgpack_decoder::decode::map::{read_map, read_map_len}; -use crate::msgpack_decoder::decode::string::{handle_null_marker, read_string_bytes}; +use crate::msgpack_decoder::decode::string::{handle_null_marker, read_string_ref}; use rmp::decode; use std::collections::HashMap; -use tinybytes::{Bytes, BytesString}; +use tinybytes::Bytes; fn read_byte_array_len(buf: &mut &[u8]) -> Result { decode::read_bin_len(buf).map_err(|_| { @@ -15,27 +15,28 @@ fn read_byte_array_len(buf: &mut &[u8]) -> Result { } #[inline] -pub fn read_meta_struct(buf: &mut Bytes) -> Result, DecodeError> { - if let Some(empty_map) = handle_null_marker(buf, HashMap::default) { - return Ok(empty_map); +pub fn read_meta_struct<'a>(buf: &mut &'a [u8]) -> Result, DecodeError> { + if handle_null_marker(buf) { + return Ok(HashMap::default()); } - fn read_meta_struct_pair(buf: &mut Bytes) -> Result<(BytesString, Bytes), DecodeError> { - let key = read_string_bytes(buf)?; - let byte_array_len = read_byte_array_len(unsafe { buf.as_mut_slice() })? as usize; + fn read_meta_struct_pair<'a>(buf: &mut &'a [u8]) -> Result<(&'a str, Bytes), DecodeError> { + let key = read_string_ref(buf)?; + let byte_array_len = read_byte_array_len(buf)? as usize; - let data = buf - .slice_ref(&buf[0..byte_array_len]) - .ok_or_else(|| DecodeError::InvalidFormat("Invalid data length".to_string()))?; - unsafe { - // SAFETY: forwarding the buffer requires that buf is borrowed from static. - *buf.as_mut_slice() = &buf.as_mut_slice()[byte_array_len..]; + let slice = buf.get(0..byte_array_len); + if let Some(slice) = slice { + let data = Bytes::copy_from_slice(slice); + *buf = &buf[byte_array_len..]; + Ok((key, data)) + } else { + Err(DecodeError::InvalidFormat( + "Invalid data length".to_string(), + )) } - - Ok((key, data)) } - let len = read_map_len(unsafe { buf.as_mut_slice() })?; + let len = read_map_len(buf)?; read_map(len, buf, read_meta_struct_pair) } @@ -47,8 +48,9 @@ mod tests { fn read_meta_test() { let meta = HashMap::from([("key".to_string(), Bytes::from(vec![1, 2, 3, 4]))]); - let mut bytes = Bytes::from(rmp_serde::to_vec_named(&meta).unwrap()); - let res = read_meta_struct(&mut bytes).unwrap(); + let serialized = rmp_serde::to_vec_named(&meta).unwrap(); + let mut slice = serialized.as_ref(); + let res = read_meta_struct(&mut slice).unwrap(); assert_eq!(res.get("key").unwrap().to_vec(), vec![1, 2, 3, 4]); } @@ -57,8 +59,9 @@ mod tests { fn read_meta_wrong_family_test() { let meta = HashMap::from([("key".to_string(), vec![1, 2, 3, 4])]); - let mut bytes = Bytes::from(rmp_serde::to_vec_named(&meta).unwrap()); - let res = read_meta_struct(&mut bytes); + let serialized = rmp_serde::to_vec_named(&meta).unwrap(); + let mut slice = serialized.as_ref(); + let res = read_meta_struct(&mut slice); assert!(res.is_err()); matches!(res.unwrap_err(), DecodeError::InvalidFormat(_)); diff --git a/trace-utils/src/msgpack_decoder/decode/metrics.rs b/trace-utils/src/msgpack_decoder/decode/metrics.rs index 477d1bd394..5b84ea16be 100644 --- a/trace-utils/src/msgpack_decoder/decode/metrics.rs +++ b/trace-utils/src/msgpack_decoder/decode/metrics.rs @@ -3,25 +3,24 @@ use crate::msgpack_decoder::decode::error::DecodeError; use crate::msgpack_decoder::decode::map::{read_map, read_map_len}; -use crate::msgpack_decoder::decode::number::read_number_bytes; -use crate::msgpack_decoder::decode::string::{handle_null_marker, read_string_bytes}; +use crate::msgpack_decoder::decode::number::read_number_slice; +use crate::msgpack_decoder::decode::string::{handle_null_marker, read_string_ref}; use std::collections::HashMap; -use tinybytes::{Bytes, BytesString}; #[inline] -pub fn read_metric_pair(buf: &mut Bytes) -> Result<(BytesString, f64), DecodeError> { - let key = read_string_bytes(buf)?; - let v = read_number_bytes(buf)?; +pub fn read_metric_pair<'a>(buf: &mut &'a [u8]) -> Result<(&'a str, f64), DecodeError> { + let key = read_string_ref(buf)?; + let v = read_number_slice(buf)?; Ok((key, v)) } #[inline] -pub fn read_metrics(buf: &mut Bytes) -> Result, DecodeError> { - if let Some(empty_map) = handle_null_marker(buf, HashMap::default) { - return Ok(empty_map); +pub fn read_metrics<'a>(buf: &mut &'a [u8]) -> Result, DecodeError> { + if handle_null_marker(buf) { + return Ok(HashMap::default()); } - let len = read_map_len(unsafe { buf.as_mut_slice() })?; + let len = read_map_len(buf)?; read_map(len, buf, read_metric_pair) } diff --git a/trace-utils/src/msgpack_decoder/decode/number.rs b/trace-utils/src/msgpack_decoder/decode/number.rs index e33f47d6c7..84e30ac8b5 100644 --- a/trace-utils/src/msgpack_decoder/decode/number.rs +++ b/trace-utils/src/msgpack_decoder/decode/number.rs @@ -4,7 +4,6 @@ use crate::msgpack_decoder::decode::error::DecodeError; use rmp::{decode::RmpRead, Marker}; use std::fmt; -use tinybytes::Bytes; #[derive(Debug, PartialEq)] pub enum Number { @@ -198,16 +197,18 @@ fn read_number(buf: &mut &[u8], allow_null: bool) -> Result } } -pub fn read_number_bytes>( - buf: &mut Bytes, +/// Read a msgpack encoded number from `buf`. +pub fn read_number_slice>( + buf: &mut &[u8], ) -> Result { - read_number(unsafe { buf.as_mut_slice() }, false)?.try_into() + read_number(buf, false)?.try_into() } -pub fn read_nullable_number_bytes>( - buf: &mut Bytes, +/// Read a msgpack encoded number from `buf` and return 0 if null. +pub fn read_nullable_number_slice>( + buf: &mut &[u8], ) -> Result { - read_number(unsafe { buf.as_mut_slice() }, true)?.try_into() + read_number(buf, true)?.try_into() } #[cfg(test)] @@ -217,45 +218,45 @@ mod tests { use std::f64; #[test] - fn test_decoding_not_nullable_bytes_to_unsigned() { + fn test_decoding_not_nullable_slice_to_unsigned() { let mut buf = Vec::new(); let expected_value = 42; let val = json!(expected_value); rmp_serde::encode::write_named(&mut buf, &val).unwrap(); - let mut bytes = Bytes::from(buf.clone()); - let result: u8 = read_number_bytes(&mut bytes).unwrap(); + let mut slice = buf.as_slice(); + let result: u8 = read_number_slice(&mut slice).unwrap(); assert_eq!(result, expected_value); } #[test] - fn test_decoding_not_nullable_bytes_to_signed() { + fn test_decoding_not_nullable_slice_to_signed() { let mut buf = Vec::new(); let expected_value = 42; let val = json!(expected_value); rmp_serde::encode::write_named(&mut buf, &val).unwrap(); - let mut bytes = Bytes::from(buf.clone()); - let result: i8 = read_number_bytes(&mut bytes).unwrap(); + let mut slice = buf.as_slice(); + let result: i8 = read_number_slice(&mut slice).unwrap(); assert_eq!(result, expected_value); } #[test] - fn test_decoding_not_nullable_bytes_to_float() { + fn test_decoding_not_nullable_slice_to_float() { let mut buf = Vec::new(); let expected_value = 42.98; let val = json!(expected_value); rmp_serde::encode::write_named(&mut buf, &val).unwrap(); - let mut bytes = Bytes::from(buf.clone()); - let result: f64 = read_number_bytes(&mut bytes).unwrap(); + let mut slice = buf.as_slice(); + let result: f64 = read_number_slice(&mut slice).unwrap(); assert_eq!(result, expected_value); } #[test] - fn test_decoding_null_through_read_number_bytes_raises_exception() { + fn test_decoding_null_through_read_number_slice_raises_exception() { let mut buf = Vec::new(); let val = json!(null); rmp_serde::encode::write_named(&mut buf, &val).unwrap(); - let mut bytes = Bytes::from(buf.clone()); - let result: Result = read_number_bytes(&mut bytes); + let mut slice = buf.as_slice(); + let result: Result = read_number_slice(&mut slice); assert!(matches!(result, Err(DecodeError::InvalidType(_)))); assert_eq!( @@ -265,32 +266,32 @@ mod tests { } #[test] - fn test_decoding_null_bytes_to_unsigned() { + fn test_decoding_null_slice_to_unsigned() { let mut buf = Vec::new(); let val = json!(null); rmp_serde::encode::write_named(&mut buf, &val).unwrap(); - let mut bytes = Bytes::from(buf.clone()); - let result: u8 = read_nullable_number_bytes(&mut bytes).unwrap(); + let mut slice = buf.as_slice(); + let result: u8 = read_nullable_number_slice(&mut slice).unwrap(); assert_eq!(result, 0); } #[test] - fn test_decoding_null_bytes_to_signed() { + fn test_decoding_null_slice_to_signed() { let mut buf = Vec::new(); let val = json!(null); rmp_serde::encode::write_named(&mut buf, &val).unwrap(); - let mut bytes = Bytes::from(buf.clone()); - let result: i8 = read_nullable_number_bytes(&mut bytes).unwrap(); + let mut slice = buf.as_slice(); + let result: i8 = read_nullable_number_slice(&mut slice).unwrap(); assert_eq!(result, 0); } #[test] - fn test_decoding_null_bytes_to_float() { + fn test_decoding_null_slice_to_float() { let mut buf = Vec::new(); let val = json!(null); rmp_serde::encode::write_named(&mut buf, &val).unwrap(); - let mut bytes = Bytes::from(buf.clone()); - let result: f64 = read_nullable_number_bytes(&mut bytes).unwrap(); + let mut slice = buf.as_slice(); + let result: f64 = read_nullable_number_slice(&mut slice).unwrap(); assert_eq!(result, 0.0); } diff --git a/trace-utils/src/msgpack_decoder/decode/span_event.rs b/trace-utils/src/msgpack_decoder/decode/span_event.rs index 33d0f5c6cf..7f2882282e 100644 --- a/trace-utils/src/msgpack_decoder/decode/span_event.rs +++ b/trace-utils/src/msgpack_decoder/decode/span_event.rs @@ -2,15 +2,11 @@ // SPDX-License-Identifier: Apache-2.0 use crate::msgpack_decoder::decode::error::DecodeError; -use crate::msgpack_decoder::decode::number::read_number_bytes; -use crate::msgpack_decoder::decode::string::{ - handle_null_marker, read_string_bytes, read_string_ref, -}; -use crate::span::{AttributeAnyValueBytes, AttributeArrayValueBytes, SpanEventBytes}; -use rmp::decode::ValueReadError; +use crate::msgpack_decoder::decode::number::read_number_slice; +use crate::msgpack_decoder::decode::string::{handle_null_marker, read_string_ref}; +use crate::span::{AttributeAnyValueSlice, AttributeArrayValueSlice, SpanEventSlice}; use std::collections::HashMap; use std::str::FromStr; -use tinybytes::{Bytes, BytesString}; /// Reads a slice of bytes and decodes it into a vector of `SpanEvent` objects. /// @@ -28,16 +24,18 @@ use tinybytes::{Bytes, BytesString}; /// This function will return an error if: /// - The marker for the array length cannot be read. /// - Any `SpanEvent` cannot be decoded. -pub(crate) fn read_span_events(buf: &mut Bytes) -> Result, DecodeError> { - if let Some(empty_vec) = handle_null_marker(buf, Vec::default) { - return Ok(empty_vec); +pub(crate) fn read_span_events<'a>( + buf: &mut &'a [u8], +) -> Result>, DecodeError> { + if handle_null_marker(buf) { + return Ok(Vec::default()); } - let len = rmp::decode::read_array_len(unsafe { buf.as_mut_slice() }).map_err(|_| { + let len = rmp::decode::read_array_len(buf).map_err(|_| { DecodeError::InvalidType("Unable to get array len for span events".to_owned()) })?; - let mut vec: Vec = Vec::with_capacity(len as usize); + let mut vec: Vec = Vec::with_capacity(len as usize); for _ in 0..len { vec.push(decode_span_event(buf)?); } @@ -65,15 +63,15 @@ impl FromStr for SpanEventKey { } } -fn decode_span_event(buf: &mut Bytes) -> Result { - let mut event = SpanEventBytes::default(); - let event_size = rmp::decode::read_map_len(unsafe { buf.as_mut_slice() }) +fn decode_span_event<'a>(buf: &mut &'a [u8]) -> Result, DecodeError> { + let mut event = SpanEventSlice::default(); + let event_size = rmp::decode::read_map_len(buf) .map_err(|_| DecodeError::InvalidType("Unable to get map len for event size".to_owned()))?; for _ in 0..event_size { - match read_string_ref(unsafe { buf.as_mut_slice() })?.parse::()? { - SpanEventKey::TimeUnixNano => event.time_unix_nano = read_number_bytes(buf)?, - SpanEventKey::Name => event.name = read_string_bytes(buf)?, + match read_string_ref(buf)?.parse::()? { + SpanEventKey::TimeUnixNano => event.time_unix_nano = read_number_slice(buf)?, + SpanEventKey::Name => event.name = read_string_ref(buf)?, SpanEventKey::Attributes => event.attributes = read_attributes_map(buf)?, } } @@ -81,16 +79,16 @@ fn decode_span_event(buf: &mut Bytes) -> Result { Ok(event) } -fn read_attributes_map( - buf: &mut Bytes, -) -> Result, DecodeError> { - let len = rmp::decode::read_map_len(unsafe { buf.as_mut_slice() }) +fn read_attributes_map<'a>( + buf: &mut &'a [u8], +) -> Result>, DecodeError> { + let len = rmp::decode::read_map_len(buf) .map_err(|_| DecodeError::InvalidType("Unable to get map len for attributes".to_owned()))?; #[allow(clippy::expect_used)] let mut map = HashMap::with_capacity(len.try_into().expect("Unable to cast map len to usize")); for _ in 0..len { - let key = read_string_bytes(buf)?; + let key = read_string_ref(buf)?; let value = decode_attribute_any(buf)?; map.insert(key, value); } @@ -123,16 +121,11 @@ impl FromStr for AttributeAnyKey { } } -pub fn read_boolean_bytes(buf: &mut Bytes) -> Result { - rmp::decode::read_bool(unsafe { buf.as_mut_slice() }) -} - -fn decode_attribute_any(buf: &mut Bytes) -> Result { - let mut attribute: Option = None; - let attribute_size = - rmp::decode::read_map_len(unsafe { buf.as_mut_slice() }).map_err(|_| { - DecodeError::InvalidType("Unable to get map len for attribute size".to_owned()) - })?; +fn decode_attribute_any<'a>(buf: &mut &'a [u8]) -> Result, DecodeError> { + let mut attribute: Option = None; + let attribute_size = rmp::decode::read_map_len(buf).map_err(|_| { + DecodeError::InvalidType("Unable to get map len for attribute size".to_owned()) + })?; if attribute_size != 2 { return Err(DecodeError::InvalidFormat( @@ -142,15 +135,15 @@ fn decode_attribute_any(buf: &mut Bytes) -> Result = None; for _ in 0..attribute_size { - match read_string_ref(unsafe { buf.as_mut_slice() })?.parse::()? { - AttributeAnyKey::Type => attribute_type = Some(read_number_bytes(buf)?), + match read_string_ref(buf)?.parse::()? { + AttributeAnyKey::Type => attribute_type = Some(read_number_slice(buf)?), AttributeAnyKey::SingleValue(key) => { - attribute = Some(AttributeAnyValueBytes::SingleValue(get_attribute_from_key( + attribute = Some(AttributeAnyValueSlice::SingleValue(get_attribute_from_key( buf, key, )?)) } AttributeAnyKey::ArrayValue => { - attribute = Some(AttributeAnyValueBytes::Array(read_attributes_array(buf)?)) + attribute = Some(AttributeAnyValueSlice::Array(read_attributes_array(buf)?)) } } } @@ -175,16 +168,18 @@ fn decode_attribute_any(buf: &mut Bytes) -> Result Result, DecodeError> { - if let Some(empty_vec) = handle_null_marker(buf, Vec::default) { - return Ok(empty_vec); +fn read_attributes_array<'a>( + buf: &mut &'a [u8], +) -> Result>, DecodeError> { + if handle_null_marker(buf) { + return Ok(Vec::default()); } - let len = rmp::decode::read_array_len(unsafe { buf.as_mut_slice() }).map_err(|_| { + let len = rmp::decode::read_array_len(buf).map_err(|_| { DecodeError::InvalidType("Unable to get array len for event attributes".to_owned()) })?; - let mut vec: Vec = Vec::with_capacity(len as usize); + let mut vec: Vec = Vec::with_capacity(len as usize); if len > 0 { let first = decode_attribute_array(buf, None)?; let array_type = (&first).into(); @@ -222,44 +217,43 @@ impl FromStr for AttributeArrayKey { } } -fn get_attribute_from_key( - buf: &mut Bytes, +fn get_attribute_from_key<'a>( + buf: &mut &'a [u8], key: AttributeArrayKey, -) -> Result { +) -> Result, DecodeError> { match key { AttributeArrayKey::StringValue => { - Ok(AttributeArrayValueBytes::String(read_string_bytes(buf)?)) + Ok(AttributeArrayValueSlice::String(read_string_ref(buf)?)) } AttributeArrayKey::BoolValue => { - let boolean = read_boolean_bytes(buf); + let boolean = rmp::decode::read_bool(buf); if let Ok(value) = boolean { match value { - true => Ok(AttributeArrayValueBytes::Boolean(true)), - false => Ok(AttributeArrayValueBytes::Boolean(false)), + true => Ok(AttributeArrayValueSlice::Boolean(true)), + false => Ok(AttributeArrayValueSlice::Boolean(false)), } } else { Err(DecodeError::InvalidType("Invalid boolean field".to_owned())) } } AttributeArrayKey::IntValue => { - Ok(AttributeArrayValueBytes::Integer(read_number_bytes(buf)?)) + Ok(AttributeArrayValueSlice::Integer(read_number_slice(buf)?)) } AttributeArrayKey::DoubleValue => { - Ok(AttributeArrayValueBytes::Double(read_number_bytes(buf)?)) + Ok(AttributeArrayValueSlice::Double(read_number_slice(buf)?)) } _ => Err(DecodeError::InvalidFormat("Invalid attribute".to_owned())), } } -fn decode_attribute_array( - buf: &mut Bytes, +fn decode_attribute_array<'a>( + buf: &mut &'a [u8], array_type: Option, -) -> Result { - let mut attribute: Option = None; - let attribute_size = - rmp::decode::read_map_len(unsafe { buf.as_mut_slice() }).map_err(|_| { - DecodeError::InvalidType("Unable to get map len for attribute size".to_owned()) - })?; +) -> Result, DecodeError> { + let mut attribute: Option = None; + let attribute_size = rmp::decode::read_map_len(buf).map_err(|_| { + DecodeError::InvalidType("Unable to get map len for attribute size".to_owned()) + })?; if attribute_size != 2 { return Err(DecodeError::InvalidFormat( @@ -269,8 +263,8 @@ fn decode_attribute_array( let mut attribute_type: Option = None; for _ in 0..attribute_size { - match read_string_ref(unsafe { buf.as_mut_slice() })?.parse::()? { - AttributeArrayKey::Type => attribute_type = Some(read_number_bytes(buf)?), + match read_string_ref(buf)?.parse::()? { + AttributeArrayKey::Type => attribute_type = Some(read_number_slice(buf)?), key => attribute = Some(get_attribute_from_key(buf, key)?), } } diff --git a/trace-utils/src/msgpack_decoder/decode/span_link.rs b/trace-utils/src/msgpack_decoder/decode/span_link.rs index 51a7b8f6c8..95901fced8 100644 --- a/trace-utils/src/msgpack_decoder/decode/span_link.rs +++ b/trace-utils/src/msgpack_decoder/decode/span_link.rs @@ -2,13 +2,12 @@ // SPDX-License-Identifier: Apache-2.0 use crate::msgpack_decoder::decode::error::DecodeError; -use crate::msgpack_decoder::decode::number::read_number_bytes; +use crate::msgpack_decoder::decode::number::read_number_slice; use crate::msgpack_decoder::decode::string::{ - handle_null_marker, read_str_map_to_bytes_strings, read_string_bytes, read_string_ref, + handle_null_marker, read_str_map_to_strings, read_string_ref, }; -use crate::span::SpanLinkBytes; +use crate::span::SpanLinkSlice; use std::str::FromStr; -use tinybytes::Bytes; /// Reads a slice of bytes and decodes it into a vector of `SpanLink` objects. /// @@ -27,16 +26,18 @@ use tinybytes::Bytes; /// - The marker for the array length cannot be read. /// - Any `SpanLink` cannot be decoded. /// ``` -pub(crate) fn read_span_links(buf: &mut Bytes) -> Result, DecodeError> { - if let Some(empty_vec) = handle_null_marker(buf, Vec::default) { - return Ok(empty_vec); +pub(crate) fn read_span_links<'a>( + buf: &mut &'a [u8], +) -> Result>, DecodeError> { + if handle_null_marker(buf) { + return Ok(Vec::default()); } - let len = rmp::decode::read_array_len(unsafe { buf.as_mut_slice() }).map_err(|_| { + let len = rmp::decode::read_array_len(buf).map_err(|_| { DecodeError::InvalidType("Unable to get array len for span links".to_owned()) })?; - let mut vec: Vec = Vec::with_capacity(len as usize); + let mut vec: Vec = Vec::with_capacity(len as usize); for _ in 0..len { vec.push(decode_span_link(buf)?); } @@ -70,19 +71,19 @@ impl FromStr for SpanLinkKey { } } -fn decode_span_link(buf: &mut Bytes) -> Result { - let mut span = SpanLinkBytes::default(); - let span_size = rmp::decode::read_map_len(unsafe { buf.as_mut_slice() }) +fn decode_span_link<'a>(buf: &mut &'a [u8]) -> Result, DecodeError> { + let mut span = SpanLinkSlice::default(); + let span_size = rmp::decode::read_map_len(buf) .map_err(|_| DecodeError::InvalidType("Unable to get map len for span size".to_owned()))?; for _ in 0..span_size { - match read_string_ref(unsafe { buf.as_mut_slice() })?.parse::()? { - SpanLinkKey::TraceId => span.trace_id = read_number_bytes(buf)?, - SpanLinkKey::TraceIdHigh => span.trace_id_high = read_number_bytes(buf)?, - SpanLinkKey::SpanId => span.span_id = read_number_bytes(buf)?, - SpanLinkKey::Attributes => span.attributes = read_str_map_to_bytes_strings(buf)?, - SpanLinkKey::Tracestate => span.tracestate = read_string_bytes(buf)?, - SpanLinkKey::Flags => span.flags = read_number_bytes(buf)?, + match read_string_ref(buf)?.parse::()? { + SpanLinkKey::TraceId => span.trace_id = read_number_slice(buf)?, + SpanLinkKey::TraceIdHigh => span.trace_id_high = read_number_slice(buf)?, + SpanLinkKey::SpanId => span.span_id = read_number_slice(buf)?, + SpanLinkKey::Attributes => span.attributes = read_str_map_to_strings(buf)?, + SpanLinkKey::Tracestate => span.tracestate = read_string_ref(buf)?, + SpanLinkKey::Flags => span.flags = read_number_slice(buf)?, } } diff --git a/trace-utils/src/msgpack_decoder/decode/string.rs b/trace-utils/src/msgpack_decoder/decode/string.rs index 5a08cd20bd..80ff193fea 100644 --- a/trace-utils/src/msgpack_decoder/decode/string.rs +++ b/trace-utils/src/msgpack_decoder/decode/string.rs @@ -5,11 +5,14 @@ use crate::msgpack_decoder::decode::error::DecodeError; use rmp::decode; use rmp::decode::DecodeStringError; use std::collections::HashMap; -use tinybytes::{Bytes, BytesString}; // https://docs.rs/rmp/latest/rmp/enum.Marker.html#variant.Null (0xc0 == 192) const NULL_MARKER: &u8 = &0xc0; +/// Read a string from `buf`. +/// +/// # Errors +/// Fails if the buffer doesn't contain a valid utf8 msgpack string. #[inline] pub fn read_string_ref_nomut(buf: &[u8]) -> Result<(&str, &[u8]), DecodeError> { decode::read_str_from_slice(buf).map_err(|e| match e { @@ -23,6 +26,10 @@ pub fn read_string_ref_nomut(buf: &[u8]) -> Result<(&str, &[u8]), DecodeError> { }) } +/// Read a string from the slices `buf`. +/// +/// # Errors +/// Fails if the buffer doesn't contain a valid utf8 msgpack string. #[inline] pub fn read_string_ref<'a>(buf: &mut &'a [u8]) -> Result<&'a str, DecodeError> { read_string_ref_nomut(buf).map(|(str, newbuf)| { @@ -31,68 +38,68 @@ pub fn read_string_ref<'a>(buf: &mut &'a [u8]) -> Result<&'a str, DecodeError> { }) } +/// Read a nullable string from the slices `buf`. +/// +/// # Errors +/// Fails if the buffer doesn't contain a valid utf8 msgpack string or a null marker. #[inline] -pub fn read_string_bytes(buf: &mut Bytes) -> Result { - // Note: we need to pass a &'static lifetime here, otherwise it'll complain - read_string_ref_nomut(unsafe { buf.as_mut_slice() }).map(|(str, newbuf)| { - let string = BytesString::from_bytes_slice(buf, str); - *unsafe { buf.as_mut_slice() } = newbuf; - string - }) -} - -#[inline] -pub fn read_nullable_string_bytes(buf: &mut Bytes) -> Result { - if let Some(empty_string) = handle_null_marker(buf, BytesString::default) { - Ok(empty_string) +pub fn read_nullable_string<'a>(buf: &mut &'a [u8]) -> Result<&'a str, DecodeError> { + if handle_null_marker(buf) { + Ok("") } else { - read_string_bytes(buf) + read_string_ref(buf) } } +/// Read a hashmap of (string, string) from the slices `buf`. +/// +/// # Errors +/// Fails if the buffer does not contain a valid map length prefix, +/// or if any key or value is not a valid utf8 msgpack string. #[inline] -// Safety: read_string_ref checks utf8 validity, so we don't do it again when creating the -// BytesStrings. -pub fn read_str_map_to_bytes_strings( - buf: &mut Bytes, -) -> Result, DecodeError> { - let len = decode::read_map_len(unsafe { buf.as_mut_slice() }) +pub fn read_str_map_to_strings<'a>( + buf: &mut &'a [u8], +) -> Result, DecodeError> { + let len = decode::read_map_len(buf) .map_err(|_| DecodeError::InvalidFormat("Unable to get map len for str map".to_owned()))?; #[allow(clippy::expect_used)] let mut map = HashMap::with_capacity(len.try_into().expect("Unable to cast map len to usize")); for _ in 0..len { - let key = read_string_bytes(buf)?; - let value = read_string_bytes(buf)?; + let key = read_string_ref(buf)?; + let value = read_string_ref(buf)?; map.insert(key, value); } Ok(map) } +/// Read a nullable hashmap of (string, string) from the slices `buf`. +/// +/// # Errors +/// Fails if the buffer does not contain a valid map length prefix, +/// or if any key or value is not a valid utf8 msgpack string. #[inline] -pub fn read_nullable_str_map_to_bytes_strings( - buf: &mut Bytes, -) -> Result, DecodeError> { - if let Some(empty_map) = handle_null_marker(buf, HashMap::default) { - return Ok(empty_map); +pub fn read_nullable_str_map_to_strings<'a>( + buf: &mut &'a [u8], +) -> Result, DecodeError> { + if handle_null_marker(buf) { + return Ok(HashMap::default()); } - read_str_map_to_bytes_strings(buf) + read_str_map_to_strings(buf) } -/// When you want to "peek" if the next value is a null marker, and only advance the buffer if it is -/// null and return the default value. If it is not null, you can continue to decode as expected. +/// Handle the null value by peeking if the next value is a null marker, and will only advance the +/// buffer if it is null. If it is not null, you can continue to decode as expected. +/// +/// # Returns +/// A boolean indicating whether the next value is null or not. #[inline] -pub fn handle_null_marker(buf: &mut Bytes, default: F) -> Option -where - F: FnOnce() -> T, -{ - let slice = unsafe { buf.as_mut_slice() }; - - if slice.first() == Some(NULL_MARKER) { - *slice = &slice[1..]; - Some(default()) +pub fn handle_null_marker(buf: &mut &[u8]) -> bool { + if buf.first() == Some(NULL_MARKER) { + *buf = &buf[1..]; + true } else { - None + false } } diff --git a/trace-utils/src/msgpack_decoder/v04/mod.rs b/trace-utils/src/msgpack_decoder/v04/mod.rs index 3aa19c5013..5687385b2c 100644 --- a/trace-utils/src/msgpack_decoder/v04/mod.rs +++ b/trace-utils/src/msgpack_decoder/v04/mod.rs @@ -5,11 +5,10 @@ pub(crate) mod span; use self::span::decode_span; use crate::msgpack_decoder::decode::error::DecodeError; -use crate::span::SpanBytes; +use crate::span::{SpanBytes, SpanSlice}; -/// Decodes a slice of bytes into a vector of `TracerPayloadV04` objects. -/// -/// +/// Decodes a Bytes buffer into a `Vec>` object, also represented as a vector of +/// `TracerPayloadV04` objects. /// /// # Arguments /// @@ -31,7 +30,7 @@ use crate::span::SpanBytes; /// /// ``` /// use datadog_trace_protobuf::pb::Span; -/// use datadog_trace_utils::msgpack_decoder::v04::from_slice; +/// use datadog_trace_utils::msgpack_decoder::v04::from_bytes; /// use rmp_serde::to_vec_named; /// use tinybytes; /// @@ -42,18 +41,76 @@ use crate::span::SpanBytes; /// let encoded_data = to_vec_named(&vec![vec![span]]).unwrap(); /// let encoded_data_as_tinybytes = tinybytes::Bytes::from(encoded_data); /// let (decoded_traces, _payload_size) = -/// from_slice(encoded_data_as_tinybytes).expect("Decoding failed"); +/// from_bytes(encoded_data_as_tinybytes).expect("Decoding failed"); /// /// assert_eq!(1, decoded_traces.len()); /// assert_eq!(1, decoded_traces[0].len()); /// let decoded_span = &decoded_traces[0][0]; /// assert_eq!("test-span", decoded_span.name.as_str()); /// ``` -pub fn from_slice(mut data: tinybytes::Bytes) -> Result<(Vec>, usize), DecodeError> { - let trace_count = - rmp::decode::read_array_len(unsafe { data.as_mut_slice() }).map_err(|_| { - DecodeError::InvalidFormat("Unable to read array len for trace count".to_owned()) - })?; +pub fn from_bytes(data: tinybytes::Bytes) -> Result<(Vec>, usize), DecodeError> { + let mut parsed_data = data.clone(); + let (traces_ref, size) = from_slice(unsafe { parsed_data.as_mut_slice() })?; + + #[allow(clippy::unwrap_used)] + let traces_owned = traces_ref + .iter() + .map(|trace| { + trace + .iter() + // Safe to unwrap since the spans use subslices of the `data` slice + .map(|span| span.try_to_bytes(&data).unwrap()) + .collect() + }) + .collect(); + Ok((traces_owned, size)) +} + +/// Decodes a slice of bytes into a `Vec>` object. +/// The resulting spans have the same lifetime as the initial buffer. +/// +/// # Arguments +/// +/// * `data` - A slice of bytes containing the encoded data. Bytes are expected to be encoded +/// msgpack data containing a list of a list of v04 spans. +/// +/// # Returns +/// +/// * `Ok(Vec)` - A vector of decoded `Vec` objects if successful. +/// * `Err(DecodeError)` - An error if the decoding process fails. +/// +/// # Errors +/// +/// This function will return an error if: +/// - The array length for trace count or span count cannot be read. +/// - Any span cannot be decoded. +/// +/// # Examples +/// +/// ``` +/// use datadog_trace_protobuf::pb::Span; +/// use datadog_trace_utils::msgpack_decoder::v04::from_slice; +/// use rmp_serde::to_vec_named; +/// use tinybytes; +/// +/// let span = Span { +/// name: "test-span".to_owned(), +/// ..Default::default() +/// }; +/// let encoded_data = to_vec_named(&vec![vec![span]]).unwrap(); +/// let encoded_data_as_tinybytes = tinybytes::Bytes::from(encoded_data); +/// let (decoded_traces, _payload_size) = +/// from_slice(&encoded_data_as_tinybytes).expect("Decoding failed"); +/// +/// assert_eq!(1, decoded_traces.len()); +/// assert_eq!(1, decoded_traces[0].len()); +/// let decoded_span = &decoded_traces[0][0]; +/// assert_eq!("test-span", decoded_span.name); +/// ``` +pub fn from_slice(mut data: &[u8]) -> Result<(Vec>, usize), DecodeError> { + let trace_count = rmp::decode::read_array_len(&mut data).map_err(|_| { + DecodeError::InvalidFormat("Unable to read array len for trace count".to_owned()) + })?; let start_len = data.len(); @@ -66,12 +123,9 @@ pub fn from_slice(mut data: tinybytes::Bytes) -> Result<(Vec>, us .expect("Unable to cast trace_count to usize"), ), |mut traces, _| { - let span_count = rmp::decode::read_array_len(unsafe { data.as_mut_slice() }) - .map_err(|_| { - DecodeError::InvalidFormat( - "Unable to read array len for span count".to_owned(), - ) - })?; + let span_count = rmp::decode::read_array_len(&mut data).map_err(|_| { + DecodeError::InvalidFormat("Unable to read array len for span count".to_owned()) + })?; let trace = (0..span_count).try_fold( Vec::with_capacity( @@ -109,10 +163,8 @@ mod tests { #[test] fn test_empty_array() { let encoded_data = vec![0x90]; - let encoded_data = - unsafe { std::mem::transmute::<&'_ [u8], &'static [u8]>(encoded_data.as_ref()) }; - let bytes = tinybytes::Bytes::from_static(encoded_data); - let (_decoded_traces, decoded_size) = from_slice(bytes).expect("Decoding failed"); + let slice = encoded_data.as_ref(); + let (_decoded_traces, decoded_size) = from_slice(slice).expect("Decoding failed"); assert_eq!(0, decoded_size); } @@ -127,7 +179,7 @@ mod tests { let expected_size = encoded_data.len() - 1; // rmp_serde adds additional 0 byte encoded_data.extend_from_slice(&[0, 0, 0, 0]); // some garbage, to be ignored let (_decoded_traces, decoded_size) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(expected_size, decoded_size); } @@ -142,7 +194,7 @@ mod tests { let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); encoded_data.extend_from_slice(&[0, 0, 0, 0]); // some garbage, to be ignored let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -157,7 +209,7 @@ mod tests { let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); encoded_data.extend_from_slice(&[0, 0, 0, 0]); // some garbage, to be ignored let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -171,7 +223,7 @@ mod tests { let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); encoded_data.extend_from_slice(&[0, 0, 0, 0]); // some garbage, to be ignored let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -186,7 +238,7 @@ mod tests { let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); encoded_data.extend_from_slice(&[0, 0, 0, 0]); // some garbage, to be ignored let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -201,7 +253,7 @@ mod tests { let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -219,7 +271,7 @@ mod tests { let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -243,7 +295,7 @@ mod tests { let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -264,7 +316,7 @@ mod tests { let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -289,7 +341,7 @@ mod tests { let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -311,7 +363,7 @@ mod tests { span["metrics"] = json!(expected_metrics.clone()); let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -335,7 +387,7 @@ mod tests { span["metrics"] = json!(expected_metrics.clone()); let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -355,7 +407,7 @@ mod tests { span["metrics"] = json!(null); let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -382,7 +434,7 @@ mod tests { let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -429,7 +481,7 @@ mod tests { let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); let (decoded_traces, _) = - from_slice(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); + from_bytes(tinybytes::Bytes::from(encoded_data)).expect("Decoding failed"); assert_eq!(1, decoded_traces.len()); assert_eq!(1, decoded_traces[0].len()); @@ -447,11 +499,9 @@ mod tests { let mut encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); // This changes the map size from 11 to 12 to trigger an InvalidMarkerRead error. encoded_data[2] = 0x8c; - let encoded_data = - unsafe { std::mem::transmute::<&'_ [u8], &'static [u8]>(encoded_data.as_ref()) }; - let bytes = tinybytes::Bytes::from_static(encoded_data); + let slice = encoded_data.as_ref(); - let result = from_slice(bytes); + let result = from_slice(slice); assert_eq!( Err(DecodeError::InvalidFormat( "Expected at least bytes 1, but only got 0 (pos 0)".to_owned() @@ -470,11 +520,9 @@ mod tests { ..Default::default() }; let encoded_data = rmp_serde::to_vec_named(&vec![vec![span]]).unwrap(); - let encoded_data = - unsafe { std::mem::transmute::<&'_ [u8], &'static [u8]>(encoded_data.as_ref()) }; - let bytes = tinybytes::Bytes::from_static(encoded_data); + let slice = encoded_data.as_ref(); - let result = from_slice(bytes); + let result = from_slice(slice); assert_eq!( Err(DecodeError::Utf8Error( "invalid utf-8 sequence of 1 bytes from index 1".to_owned() @@ -490,12 +538,9 @@ mod tests { // This changes the entire payload to a map with 12 keys in order to trigger an error when // reading the array len of traces encoded_data[0] = 0x8c; - let encoded_data = - unsafe { std::mem::transmute::<&'_ [u8], &'static [u8]>(encoded_data.as_ref()) }; - let bytes = tinybytes::Bytes::from_static(encoded_data); - - let result = from_slice(bytes); + let slice = encoded_data.as_ref(); + let result = from_slice(slice); assert_eq!( Err(DecodeError::InvalidFormat( "Unable to read array len for trace count".to_string() @@ -511,13 +556,9 @@ mod tests { // This changes the entire payload to a map with 12 keys in order to trigger an error when // reading the array len of spans encoded_data[1] = 0x8c; + let slice = encoded_data.as_ref(); - let encoded_data = - unsafe { std::mem::transmute::<&'_ [u8], &'static [u8]>(encoded_data.as_ref()) }; - let bytes = tinybytes::Bytes::from_static(encoded_data); - - let result = from_slice(bytes); - + let result = from_slice(slice); assert_eq!( Err(DecodeError::InvalidFormat( "Unable to read array len for span count".to_owned() @@ -533,12 +574,9 @@ mod tests { // Modify the encoded data to cause a type mismatch by changing the marker for the `name` // field to an integer marker encoded_data[3] = 0x01; - let encoded_data = - unsafe { std::mem::transmute::<&'_ [u8], &'static [u8]>(encoded_data.as_ref()) }; - let bytes = tinybytes::Bytes::from_static(encoded_data); - - let result = from_slice(bytes); + let slice = encoded_data.as_ref(); + let result = from_slice(slice); assert_eq!( Err(DecodeError::InvalidType( "Type mismatch at marker FixPos(1)".to_owned() @@ -549,7 +587,7 @@ mod tests { #[test] #[cfg_attr(miri, ignore)] - fn fuzz_from_slice() { + fn fuzz_from_bytes() { check!() .with_type::<( String, @@ -601,7 +639,7 @@ mod tests { ..Default::default() }; let encoded_data = to_vec_named(&vec![vec![span]]).unwrap(); - let result = from_slice(tinybytes::Bytes::from(encoded_data)); + let result = from_bytes(tinybytes::Bytes::from(encoded_data)); assert!(result.is_ok()); }, diff --git a/trace-utils/src/msgpack_decoder/v04/span.rs b/trace-utils/src/msgpack_decoder/v04/span.rs index 36b911f49e..d7f4e1feb6 100644 --- a/trace-utils/src/msgpack_decoder/v04/span.rs +++ b/trace-utils/src/msgpack_decoder/v04/span.rs @@ -2,15 +2,14 @@ // SPDX-License-Identifier: Apache-2.0 use crate::msgpack_decoder::decode::error::DecodeError; -use crate::msgpack_decoder::decode::number::read_nullable_number_bytes; +use crate::msgpack_decoder::decode::number::read_nullable_number_slice; use crate::msgpack_decoder::decode::span_event::read_span_events; use crate::msgpack_decoder::decode::span_link::read_span_links; use crate::msgpack_decoder::decode::string::{ - read_nullable_str_map_to_bytes_strings, read_nullable_string_bytes, read_string_ref, + read_nullable_str_map_to_strings, read_nullable_string, read_string_ref, }; use crate::msgpack_decoder::decode::{meta_struct::read_meta_struct, metrics::read_metrics}; -use crate::span::{SpanBytes, SpanKey}; -use tinybytes::Bytes; +use crate::span::{SpanKey, SpanSlice}; /// Decodes a slice of bytes into a `Span` object. /// @@ -28,10 +27,10 @@ use tinybytes::Bytes; /// This function will return an error if: /// - The map length cannot be read. /// - Any key or value cannot be decoded. -pub fn decode_span(buffer: &mut Bytes) -> Result { - let mut span = SpanBytes::default(); +pub fn decode_span<'a>(buffer: &mut &'a [u8]) -> Result, DecodeError> { + let mut span = SpanSlice::default(); - let span_size = rmp::decode::read_map_len(unsafe { buffer.as_mut_slice() }).map_err(|_| { + let span_size = rmp::decode::read_map_len(buffer).map_err(|_| { DecodeError::InvalidFormat("Unable to get map len for span size".to_owned()) })?; @@ -44,23 +43,23 @@ pub fn decode_span(buffer: &mut Bytes) -> Result { // Safety: read_string_ref checks utf8 validity, so we don't do it again when creating the // BytesStrings -fn fill_span(span: &mut SpanBytes, buf: &mut Bytes) -> Result<(), DecodeError> { - let key = read_string_ref(unsafe { buf.as_mut_slice() })? +fn fill_span<'a>(span: &mut SpanSlice<'a>, buf: &mut &'a [u8]) -> Result<(), DecodeError> { + let key = read_string_ref(buf)? .parse::() .map_err(|e| DecodeError::InvalidFormat(e.message))?; match key { - SpanKey::Service => span.service = read_nullable_string_bytes(buf)?, - SpanKey::Name => span.name = read_nullable_string_bytes(buf)?, - SpanKey::Resource => span.resource = read_nullable_string_bytes(buf)?, - SpanKey::TraceId => span.trace_id = read_nullable_number_bytes(buf)?, - SpanKey::SpanId => span.span_id = read_nullable_number_bytes(buf)?, - SpanKey::ParentId => span.parent_id = read_nullable_number_bytes(buf)?, - SpanKey::Start => span.start = read_nullable_number_bytes(buf)?, - SpanKey::Duration => span.duration = read_nullable_number_bytes(buf)?, - SpanKey::Error => span.error = read_nullable_number_bytes(buf)?, - SpanKey::Type => span.r#type = read_nullable_string_bytes(buf)?, - SpanKey::Meta => span.meta = read_nullable_str_map_to_bytes_strings(buf)?, + SpanKey::Service => span.service = read_nullable_string(buf)?, + SpanKey::Name => span.name = read_nullable_string(buf)?, + SpanKey::Resource => span.resource = read_nullable_string(buf)?, + SpanKey::TraceId => span.trace_id = read_nullable_number_slice(buf)?, + SpanKey::SpanId => span.span_id = read_nullable_number_slice(buf)?, + SpanKey::ParentId => span.parent_id = read_nullable_number_slice(buf)?, + SpanKey::Start => span.start = read_nullable_number_slice(buf)?, + SpanKey::Duration => span.duration = read_nullable_number_slice(buf)?, + SpanKey::Error => span.error = read_nullable_number_slice(buf)?, + SpanKey::Type => span.r#type = read_nullable_string(buf)?, + SpanKey::Meta => span.meta = read_nullable_str_map_to_strings(buf)?, SpanKey::Metrics => span.metrics = read_metrics(buf)?, SpanKey::MetaStruct => span.meta_struct = read_meta_struct(buf)?, SpanKey::SpanLinks => span.span_links = read_span_links(buf)?, diff --git a/trace-utils/src/msgpack_decoder/v05/mod.rs b/trace-utils/src/msgpack_decoder/v05/mod.rs index f6f18f8e37..5a32e3e0e8 100644 --- a/trace-utils/src/msgpack_decoder/v05/mod.rs +++ b/trace-utils/src/msgpack_decoder/v05/mod.rs @@ -4,18 +4,17 @@ use crate::msgpack_decoder::decode::error::DecodeError; use crate::msgpack_decoder::decode::{ map::read_map_len, - number::read_number_bytes, - string::{handle_null_marker, read_string_bytes}, + number::read_number_slice, + string::{handle_null_marker, read_string_ref}, }; -use crate::span::SpanBytes; +use crate::span::{SpanBytes, SpanSlice}; use std::collections::HashMap; const PAYLOAD_LEN: u32 = 2; const SPAN_ELEM_COUNT: u32 = 12; -/// Decodes a slice of bytes into a vector of `TracerPayloadV05` objects. -/// -/// +/// Decodes a Bytes buffer into a `Vec>` object, also represented as a vector of +/// `TracerPayloadV05` objects. /// /// # Arguments /// @@ -36,7 +35,7 @@ const SPAN_ELEM_COUNT: u32 = 12; /// # Examples /// /// ``` -/// use datadog_trace_utils::msgpack_decoder::v05::from_slice; +/// use datadog_trace_utils::msgpack_decoder::v05::from_bytes; /// use rmp_serde::to_vec; /// use std::collections::HashMap; /// use tinybytes; @@ -61,15 +60,87 @@ const SPAN_ELEM_COUNT: u32 = 12; /// let encoded_data = to_vec(&data).unwrap(); /// let encoded_data_as_tinybytes = tinybytes::Bytes::from(encoded_data); /// let (decoded_traces, _payload_size) = -/// from_slice(encoded_data_as_tinybytes).expect("Decoding failed"); +/// from_bytes(encoded_data_as_tinybytes).expect("Decoding failed"); /// /// assert_eq!(1, decoded_traces.len()); /// assert_eq!(1, decoded_traces[0].len()); /// let decoded_span = &decoded_traces[0][0]; /// assert_eq!("", decoded_span.name.as_str()); /// ``` -pub fn from_slice(mut data: tinybytes::Bytes) -> Result<(Vec>, usize), DecodeError> { - let data_elem = rmp::decode::read_array_len(unsafe { data.as_mut_slice() }) +pub fn from_bytes(data: tinybytes::Bytes) -> Result<(Vec>, usize), DecodeError> { + let mut parsed_data = data.clone(); + let (traces_ref, size) = from_slice(unsafe { parsed_data.as_mut_slice() })?; + + #[allow(clippy::unwrap_used)] + let traces_owned = traces_ref + .iter() + .map(|trace| { + trace + .iter() + // Safe to unwrap since the spans use subslices of the `data` slice + .map(|span| span.try_to_bytes(&data).unwrap()) + .collect() + }) + .collect(); + Ok((traces_owned, size)) +} + +/// Decodes a slice of bytes into a `Vec>` object. +/// The resulting spans have the same lifetime as the initial buffer. +/// +/// # Arguments +/// +/// * `data` - A slice of bytes containing the encoded data. Bytes are expected to be encoded +/// msgpack data containing a list of a list of v05 spans. +/// +/// # Returns +/// +/// * `Ok(Vec>)` - A vector of decoded `Vec` objects if successful. +/// * `Err(DecodeError)` - An error if the decoding process fails. +/// +/// # Errors +/// +/// This function will return an error if: +/// - The array length for trace count or span count cannot be read. +/// - Any span cannot be decoded. +/// +/// # Examples +/// +/// ``` +/// use datadog_trace_utils::msgpack_decoder::v05::from_slice; +/// use rmp_serde::to_vec; +/// use std::collections::HashMap; +/// use tinybytes; +/// +/// let data = ( +/// vec!["".to_string()], +/// vec![vec![( +/// 0, +/// 0, +/// 0, +/// 1, +/// 2, +/// 3, +/// 4, +/// 5, +/// 6, +/// HashMap::::new(), +/// HashMap::::new(), +/// 0, +/// )]], +/// ); +/// let encoded_data = to_vec(&data).unwrap(); +/// let encoded_data_as_tinybytes = tinybytes::Bytes::from(encoded_data); +/// let (decoded_traces, _payload_size) = +/// from_slice(&encoded_data_as_tinybytes).expect("Decoding failed"); +/// +/// assert_eq!(1, decoded_traces.len()); +/// assert_eq!(1, decoded_traces[0].len()); +/// let decoded_span = &decoded_traces[0][0]; +/// assert_eq!("", decoded_span.name); +/// ``` +pub fn from_slice(mut data: &[u8]) -> Result<(Vec>, usize), DecodeError> { + let data_elem = rmp::decode::read_array_len(&mut data) .map_err(|_| DecodeError::InvalidFormat("Unable to read payload len".to_string()))?; if data_elem != PAYLOAD_LEN { @@ -80,16 +151,16 @@ pub fn from_slice(mut data: tinybytes::Bytes) -> Result<(Vec>, us let dict = deserialize_dict(&mut data)?; - let trace_count = rmp::decode::read_array_len(unsafe { data.as_mut_slice() }) + let trace_count = rmp::decode::read_array_len(&mut data) .map_err(|_| DecodeError::InvalidFormat("Unable to read trace len".to_string()))?; - let mut traces: Vec> = Vec::with_capacity(trace_count as usize); + let mut traces: Vec> = Vec::with_capacity(trace_count as usize); let start_len = data.len(); for _ in 0..trace_count { - let span_count = rmp::decode::read_array_len(unsafe { data.as_mut_slice() }) + let span_count = rmp::decode::read_array_len(&mut data) .map_err(|_| DecodeError::InvalidFormat("Unable to read span len".to_string()))?; - let mut trace: Vec = Vec::with_capacity(span_count as usize); + let mut trace: Vec = Vec::with_capacity(span_count as usize); for _ in 0..span_count { let span = deserialize_span(&mut data, &dict)?; @@ -100,26 +171,21 @@ pub fn from_slice(mut data: tinybytes::Bytes) -> Result<(Vec>, us Ok((traces, start_len - data.len())) } -fn deserialize_dict( - data: &mut tinybytes::Bytes, -) -> Result, DecodeError> { - let dict_len = rmp::decode::read_array_len(unsafe { data.as_mut_slice() }) +fn deserialize_dict<'a>(data: &mut &'a [u8]) -> Result, DecodeError> { + let dict_len = rmp::decode::read_array_len(data) .map_err(|_| DecodeError::InvalidFormat("Unable to read dictionary len".to_string()))?; - let mut dict: Vec = Vec::with_capacity(dict_len as usize); + let mut dict: Vec<&'a str> = Vec::with_capacity(dict_len as usize); for _ in 0..dict_len { - let str = read_string_bytes(data)?; + let str = read_string_ref(data)?; dict.push(str); } Ok(dict) } -fn deserialize_span( - data: &mut tinybytes::Bytes, - dict: &[tinybytes::BytesString], -) -> Result { - let mut span = SpanBytes::default(); - let span_len = rmp::decode::read_array_len(unsafe { data.as_mut_slice() }) +fn deserialize_span<'a>(data: &mut &[u8], dict: &[&'a str]) -> Result, DecodeError> { + let mut span = SpanSlice::default(); + let span_len = rmp::decode::read_array_len(data) .map_err(|_| DecodeError::InvalidFormat("Unable to read dictionary len".to_string()))?; if span_len != SPAN_ELEM_COUNT { @@ -131,12 +197,12 @@ fn deserialize_span( span.service = get_from_dict(data, dict)?; span.name = get_from_dict(data, dict)?; span.resource = get_from_dict(data, dict)?; - span.trace_id = read_number_bytes(data)?; - span.span_id = read_number_bytes(data)?; - span.parent_id = read_number_bytes(data)?; - span.start = read_number_bytes(data)?; - span.duration = read_number_bytes(data)?; - span.error = read_number_bytes(data)?; + span.trace_id = read_number_slice(data)?; + span.span_id = read_number_slice(data)?; + span.parent_id = read_number_slice(data)?; + span.start = read_number_slice(data)?; + span.duration = read_number_slice(data)?; + span.error = read_number_slice(data)?; span.meta = read_indexed_map_to_bytes_strings(data, dict)?; span.metrics = read_metrics(data, dict)?; span.r#type = get_from_dict(data, dict)?; @@ -144,24 +210,21 @@ fn deserialize_span( Ok(span) } -fn get_from_dict( - data: &mut tinybytes::Bytes, - dict: &[tinybytes::BytesString], -) -> Result { - let index: u32 = read_number_bytes(data)?; +fn get_from_dict<'a>(data: &mut &[u8], dict: &[&'a str]) -> Result<&'a str, DecodeError> { + let index: u32 = read_number_slice(data)?; match dict.get(index as usize) { - Some(value) => Ok(value.clone()), + Some(value) => Ok(value), None => Err(DecodeError::InvalidFormat( "Unable to locate string in the dictionary".to_string(), )), } } -fn read_indexed_map_to_bytes_strings( - buf: &mut tinybytes::Bytes, - dict: &[tinybytes::BytesString], -) -> Result, DecodeError> { - let len = rmp::decode::read_map_len(unsafe { buf.as_mut_slice() }) +fn read_indexed_map_to_bytes_strings<'a>( + buf: &mut &[u8], + dict: &[&'a str], +) -> Result, DecodeError> { + let len = rmp::decode::read_map_len(buf) .map_err(|_| DecodeError::InvalidFormat("Unable to get map len for str map".to_owned()))?; #[allow(clippy::expect_used)] @@ -174,20 +237,20 @@ fn read_indexed_map_to_bytes_strings( Ok(map) } -fn read_metrics( - buf: &mut tinybytes::Bytes, - dict: &[tinybytes::BytesString], -) -> Result, DecodeError> { - if let Some(empty_map) = handle_null_marker(buf, HashMap::default) { - return Ok(empty_map); +fn read_metrics<'a>( + buf: &mut &[u8], + dict: &[&'a str], +) -> Result, DecodeError> { + if handle_null_marker(buf) { + return Ok(HashMap::default()); } - let len = read_map_len(unsafe { buf.as_mut_slice() })?; + let len = read_map_len(buf)?; let mut map = HashMap::with_capacity(len); for _ in 0..len { let k = get_from_dict(buf, dict)?; - let v = read_number_bytes(buf)?; + let v = read_number_slice(buf)?; map.insert(k, v); } Ok(map) @@ -233,20 +296,19 @@ mod tests { fn deserialize_dict_test() { let dict = vec!["foo", "bar", "baz"]; let mpack = rmp_serde::to_vec(&dict).unwrap(); - let mut payload = tinybytes::Bytes::from(mpack); + let mut payload = mpack.as_ref(); let result = deserialize_dict(&mut payload).unwrap(); - let result: Vec<_> = result.iter().map(|e| e.as_str()).collect(); assert_eq!(dict, result); } #[test] - fn from_slice_invalid_size_test() { + fn from_bytes_invalid_size_test() { // 3 empty array. let empty_three: [u8; 3] = [0x93, 0x90, 0x90]; let payload = unsafe { std::mem::transmute::<&'_ [u8], &'static [u8]>(&empty_three) }; let bytes = tinybytes::Bytes::from_static(payload); - let result = from_slice(bytes); + let result = from_bytes(bytes); assert!(result.is_err()); matches!(result.err().unwrap(), DecodeError::InvalidFormat(_)); @@ -255,14 +317,14 @@ mod tests { let empty_one: [u8; 2] = [0x91, 0x90]; let payload = unsafe { std::mem::transmute::<&'_ [u8], &'static [u8]>(&empty_one) }; let bytes = tinybytes::Bytes::from_static(payload); - let result = from_slice(bytes); + let result = from_bytes(bytes); assert!(result.is_err()); matches!(result.err().unwrap(), DecodeError::InvalidFormat(_)); } #[test] - fn from_slice_test() { + fn from_bytes_test() { let data: V05Payload = ( vec![ "".to_string(), @@ -293,7 +355,7 @@ mod tests { )]], ); let msgpack = rmp_serde::to_vec(&data).unwrap(); - let (traces, _) = from_slice(tinybytes::Bytes::from(msgpack)).unwrap(); + let (traces, _) = from_bytes(tinybytes::Bytes::from(msgpack)).unwrap(); let span = &traces[0][0]; assert_eq!(span.service.as_str(), "my-service"); @@ -352,7 +414,7 @@ mod tests { ); let payload = rmp_serde::to_vec(&data).unwrap(); let payload = unsafe { std::mem::transmute::<&'_ [u8], &'static [u8]>(&payload) }; - let result = from_slice(tinybytes::Bytes::from_static(payload)); + let result = from_bytes(tinybytes::Bytes::from_static(payload)); assert!(result.is_err()); @@ -392,7 +454,7 @@ mod tests { let payload = rmp_serde::to_vec(&data).unwrap(); let payload = unsafe { std::mem::transmute::<&'_ [u8], &'static [u8]>(&payload) }; - let result = from_slice(tinybytes::Bytes::from_static(payload)); + let result = from_bytes(tinybytes::Bytes::from_static(payload)); assert!(result.is_err()); diff --git a/trace-utils/src/span/mod.rs b/trace-utils/src/span/mod.rs index 856ddab061..3e8f144a53 100644 --- a/trace-utils/src/span/mod.rs +++ b/trace-utils/src/span/mod.rs @@ -103,6 +103,8 @@ where pub meta: HashMap, #[serde(skip_serializing_if = "HashMap::is_empty")] pub metrics: HashMap, + // TODO: APMSP-1941 - Replace `Bytes` with a wrapper that borrows the underlying + // slice and serializes to bytes in MessagePack. #[serde(skip_serializing_if = "HashMap::is_empty")] pub meta_struct: HashMap, #[serde(skip_serializing_if = "Vec::is_empty")] @@ -251,6 +253,144 @@ pub type SpanEventBytes = SpanEvent; pub type AttributeAnyValueBytes = AttributeAnyValue; pub type AttributeArrayValueBytes = AttributeArrayValue; +pub type SpanSlice<'a> = Span<&'a str>; +pub type SpanLinkSlice<'a> = SpanLink<&'a str>; +pub type SpanEventSlice<'a> = SpanEvent<&'a str>; +pub type AttributeAnyValueSlice<'a> = AttributeAnyValue<&'a str>; +pub type AttributeArrayValueSlice<'a> = AttributeArrayValue<&'a str>; + +impl SpanSlice<'_> { + /// Converts a borrowed `SpanSlice` into an owned `SpanBytes`, by resolving all internal + /// references into slices of the provided `Bytes` buffer. Returns `None` if any slice is + /// out of bounds or invalid. + pub fn try_to_bytes(&self, bytes: &Bytes) -> Option { + Some(SpanBytes { + service: BytesString::try_from_bytes_slice(bytes, self.service)?, + name: BytesString::try_from_bytes_slice(bytes, self.name)?, + resource: BytesString::try_from_bytes_slice(bytes, self.resource)?, + r#type: BytesString::try_from_bytes_slice(bytes, self.r#type)?, + trace_id: self.trace_id, + span_id: self.span_id, + parent_id: self.parent_id, + start: self.start, + duration: self.duration, + error: self.error, + meta: self + .meta + .iter() + .map(|(k, v)| { + Some(( + BytesString::try_from_bytes_slice(bytes, k)?, + BytesString::try_from_bytes_slice(bytes, v)?, + )) + }) + .collect::>>()?, + metrics: self + .metrics + .iter() + .map(|(k, v)| Some((BytesString::try_from_bytes_slice(bytes, k)?, *v))) + .collect::>>()?, + meta_struct: self + .meta_struct + .iter() + .map(|(k, v)| Some((BytesString::try_from_bytes_slice(bytes, k)?, v.clone()))) + .collect::>>()?, + span_links: self + .span_links + .iter() + .map(|link| link.try_to_bytes(bytes)) + .collect::>>()?, + span_events: self + .span_events + .iter() + .map(|event| event.try_to_bytes(bytes)) + .collect::>>()?, + }) + } +} + +impl SpanLinkSlice<'_> { + /// Converts a borrowed `SpanLinkSlice` into an owned `SpanLinkBytes`, using the provided + /// `Bytes` buffer to resolve all referenced strings. Returns `None` if conversion fails due + /// to invalid slice ranges. + pub fn try_to_bytes(&self, bytes: &Bytes) -> Option { + Some(SpanLinkBytes { + trace_id: self.trace_id, + trace_id_high: self.trace_id_high, + span_id: self.span_id, + attributes: self + .attributes + .iter() + .map(|(k, v)| { + Some(( + BytesString::try_from_bytes_slice(bytes, k)?, + BytesString::try_from_bytes_slice(bytes, v)?, + )) + }) + .collect::>>()?, + tracestate: BytesString::try_from_bytes_slice(bytes, self.tracestate)?, + flags: self.flags, + }) + } +} + +impl SpanEventSlice<'_> { + /// Converts a borrowed `SpanEventSlice` into an owned `SpanEventBytes`, resolving references + /// into the provided `Bytes` buffer. Fails with `None` if any slice is invalid or cannot be + /// converted. + pub fn try_to_bytes(&self, bytes: &Bytes) -> Option { + Some(SpanEventBytes { + time_unix_nano: self.time_unix_nano, + name: BytesString::try_from_bytes_slice(bytes, self.name)?, + attributes: self + .attributes + .iter() + .map(|(k, v)| { + Some(( + BytesString::try_from_bytes_slice(bytes, k)?, + v.try_to_bytes(bytes)?, + )) + }) + .collect::>>()?, + }) + } +} + +impl AttributeAnyValueSlice<'_> { + /// Converts a borrowed `AttributeAnyValueSlice` into its owned `AttributeAnyValueBytes` + /// representation, using the provided `Bytes` buffer. Recursively processes inner values if + /// it's an array. + pub fn try_to_bytes(&self, bytes: &Bytes) -> Option { + match self { + AttributeAnyValue::SingleValue(value) => { + Some(AttributeAnyValue::SingleValue(value.try_to_bytes(bytes)?)) + } + AttributeAnyValue::Array(value) => Some(AttributeAnyValue::Array( + value + .iter() + .map(|attribute| attribute.try_to_bytes(bytes)) + .collect::>>()?, + )), + } + } +} + +impl AttributeArrayValueSlice<'_> { + /// Converts a single `AttributeArrayValueSlice` item into its owned form + /// (`AttributeArrayValueBytes`), borrowing data from the provided `Bytes` buffer when + /// necessary. + pub fn try_to_bytes(&self, bytes: &Bytes) -> Option { + match self { + AttributeArrayValue::String(value) => Some(AttributeArrayValue::String( + BytesString::try_from_bytes_slice(bytes, value)?, + )), + AttributeArrayValue::Boolean(value) => Some(AttributeArrayValue::Boolean(*value)), + AttributeArrayValue::Integer(value) => Some(AttributeArrayValue::Integer(*value)), + AttributeArrayValue::Double(value) => Some(AttributeArrayValue::Double(*value)), + } + } +} + #[derive(Debug)] pub struct SpanKeyParseError { pub message: String, @@ -335,31 +475,26 @@ mod tests { }; let serialized = rmp_serde::encode::to_vec_named(&span).unwrap(); - let mut serialized_bytes = tinybytes::Bytes::from(serialized); - let deserialized = decode_span(&mut serialized_bytes).unwrap(); + let mut serialized_slice = serialized.as_ref(); + let deserialized = decode_span(&mut serialized_slice).unwrap(); - assert_eq!(span.name, deserialized.name.as_str()); - assert_eq!(span.resource, deserialized.resource.as_str()); + assert_eq!(span.name, deserialized.name); + assert_eq!(span.resource, deserialized.resource); assert_eq!( span.span_links[0].trace_id, deserialized.span_links[0].trace_id ); assert_eq!( span.span_links[0].tracestate, - deserialized.span_links[0].tracestate.as_str() - ); - assert_eq!( - span.span_events[0].name, - deserialized.span_events[0].name.as_str() + deserialized.span_links[0].tracestate ); + assert_eq!(span.span_events[0].name, deserialized.span_events[0].name); assert_eq!( span.span_events[0].time_unix_nano, deserialized.span_events[0].time_unix_nano ); for attribut in &deserialized.span_events[0].attributes { - assert!(span.span_events[0] - .attributes - .contains_key(attribut.0.as_str())) + assert!(span.span_events[0].attributes.contains_key(attribut.0)) } } diff --git a/trace-utils/src/tracer_payload.rs b/trace-utils/src/tracer_payload.rs index fc3474a8ba..8274d07c81 100644 --- a/trace-utils/src/tracer_payload.rs +++ b/trace-utils/src/tracer_payload.rs @@ -379,7 +379,7 @@ impl<'a, T: TraceChunkProcessor + 'a> TryInto fn try_into(self) -> Result { match self.encoding_type { TraceEncoding::V04 => { - let (traces, size) = match msgpack_decoder::v04::from_slice(self.data) { + let (traces, size) = match msgpack_decoder::v04::from_bytes(self.data) { Ok(res) => res, Err(e) => { anyhow::bail!("Error deserializing trace from request body: {e}") @@ -399,7 +399,7 @@ impl<'a, T: TraceChunkProcessor + 'a> TryInto )?) }, TraceEncoding::V05 => { - let (traces, size) = match msgpack_decoder::v05::from_slice(self.data) { + let (traces, size) = match msgpack_decoder::v05::from_bytes(self.data) { Ok(res) => res, Err(e) => { anyhow::bail!("Error deserializing trace from request body: {e}")