diff --git a/bottlecap/Cargo.lock b/bottlecap/Cargo.lock index 49a569610..004bdbaf3 100644 --- a/bottlecap/Cargo.lock +++ b/bottlecap/Cargo.lock @@ -495,11 +495,11 @@ dependencies = [ "datadog-fips", "datadog-protos 0.1.0 (git+https://github.com/DataDog/saluki/?rev=c89b58e5784b985819baf11f13f7d35876741222)", "datadog-trace-agent", - "datadog-trace-normalization 19.0.1", - "datadog-trace-obfuscation 19.0.1", - "datadog-trace-protobuf 19.0.1", - "datadog-trace-utils 19.0.1", - "ddcommon 19.0.1", + "datadog-trace-normalization 19.1.0", + "datadog-trace-obfuscation 19.1.0", + "datadog-trace-protobuf 19.1.0", + "datadog-trace-utils 19.1.0", + "ddcommon 19.1.0", "ddsketch-agent 0.1.0 (git+https://github.com/DataDog/saluki/)", "dogstatsd", "figment", @@ -791,11 +791,11 @@ dependencies = [ [[package]] name = "datadog-trace-normalization" -version = "19.0.1" -source = "git+https://github.com/DataDog/libdatadog?rev=ca19adc5c782be316210b69510ebb6c8dff12d87#ca19adc5c782be316210b69510ebb6c8dff12d87" +version = "19.1.0" +source = "git+https://github.com/DataDog/libdatadog?rev=8a49c7df2d9cbf05118bfd5b85772676f71b34f2#8a49c7df2d9cbf05118bfd5b85772676f71b34f2" dependencies = [ "anyhow", - "datadog-trace-protobuf 19.0.1", + "datadog-trace-protobuf 19.1.0", ] [[package]] @@ -817,13 +817,13 @@ dependencies = [ [[package]] name = "datadog-trace-obfuscation" -version = "19.0.1" -source = "git+https://github.com/DataDog/libdatadog?rev=ca19adc5c782be316210b69510ebb6c8dff12d87#ca19adc5c782be316210b69510ebb6c8dff12d87" +version = "19.1.0" +source = "git+https://github.com/DataDog/libdatadog?rev=8a49c7df2d9cbf05118bfd5b85772676f71b34f2#8a49c7df2d9cbf05118bfd5b85772676f71b34f2" dependencies = [ "anyhow", - "datadog-trace-protobuf 19.0.1", - "datadog-trace-utils 19.0.1", - "ddcommon 19.0.1", + "datadog-trace-protobuf 19.1.0", + "datadog-trace-utils 19.1.0", + "ddcommon 19.1.0", "log", "percent-encoding", "regex", @@ -844,8 +844,8 @@ dependencies = [ [[package]] name = "datadog-trace-protobuf" -version = "19.0.1" -source = "git+https://github.com/DataDog/libdatadog?rev=ca19adc5c782be316210b69510ebb6c8dff12d87#ca19adc5c782be316210b69510ebb6c8dff12d87" +version = "19.1.0" +source = "git+https://github.com/DataDog/libdatadog?rev=8a49c7df2d9cbf05118bfd5b85772676f71b34f2#8a49c7df2d9cbf05118bfd5b85772676f71b34f2" dependencies = [ "prost", "serde", @@ -882,14 +882,14 @@ dependencies = [ [[package]] name = "datadog-trace-utils" -version = "19.0.1" -source = "git+https://github.com/DataDog/libdatadog?rev=ca19adc5c782be316210b69510ebb6c8dff12d87#ca19adc5c782be316210b69510ebb6c8dff12d87" +version = "19.1.0" +source = "git+https://github.com/DataDog/libdatadog?rev=8a49c7df2d9cbf05118bfd5b85772676f71b34f2#8a49c7df2d9cbf05118bfd5b85772676f71b34f2" dependencies = [ "anyhow", "bytes 1.10.1", - "datadog-trace-normalization 19.0.1", - "datadog-trace-protobuf 19.0.1", - "ddcommon 19.0.1", + "datadog-trace-normalization 19.1.0", + "datadog-trace-protobuf 19.1.0", + "ddcommon 19.1.0", "flate2", "futures 0.3.31", "http-body-util", @@ -902,7 +902,7 @@ dependencies = [ "rmpv", "serde", "serde_json", - "tinybytes 19.0.1", + "tinybytes 19.1.0", "tokio", "tracing", "zstd", @@ -947,8 +947,8 @@ dependencies = [ [[package]] name = "ddcommon" -version = "19.0.1" -source = "git+https://github.com/DataDog/libdatadog?rev=ca19adc5c782be316210b69510ebb6c8dff12d87#ca19adc5c782be316210b69510ebb6c8dff12d87" +version = "19.1.0" +source = "git+https://github.com/DataDog/libdatadog?rev=8a49c7df2d9cbf05118bfd5b85772676f71b34f2#8a49c7df2d9cbf05118bfd5b85772676f71b34f2" dependencies = [ "anyhow", "cc", @@ -3474,8 +3474,8 @@ dependencies = [ [[package]] name = "tinybytes" -version = "19.0.1" -source = "git+https://github.com/DataDog/libdatadog?rev=ca19adc5c782be316210b69510ebb6c8dff12d87#ca19adc5c782be316210b69510ebb6c8dff12d87" +version = "19.1.0" +source = "git+https://github.com/DataDog/libdatadog?rev=8a49c7df2d9cbf05118bfd5b85772676f71b34f2#8a49c7df2d9cbf05118bfd5b85772676f71b34f2" dependencies = [ "serde", ] diff --git a/bottlecap/Cargo.toml b/bottlecap/Cargo.toml index a43773c50..56ed5c85f 100644 --- a/bottlecap/Cargo.toml +++ b/bottlecap/Cargo.toml @@ -52,11 +52,11 @@ rustls-native-certs = { version = "0.8.1", optional = true } # be found in the clippy.toml file adjacent to this Cargo.toml. datadog-protos = { version = "0.1.0", default-features = false, git = "https://github.com/DataDog/saluki/", rev = "c89b58e5784b985819baf11f13f7d35876741222"} ddsketch-agent = { version = "0.1.0", default-features = false, git = "https://github.com/DataDog/saluki/" } -ddcommon = { git = "https://github.com/DataDog/libdatadog", rev = "ca19adc5c782be316210b69510ebb6c8dff12d87" } -datadog-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev = "ca19adc5c782be316210b69510ebb6c8dff12d87" } -datadog-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "ca19adc5c782be316210b69510ebb6c8dff12d87" , features = ["mini_agent"] } -datadog-trace-normalization = { git = "https://github.com/DataDog/libdatadog", rev = "ca19adc5c782be316210b69510ebb6c8dff12d87" } -datadog-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "ca19adc5c782be316210b69510ebb6c8dff12d87" } +ddcommon = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" } +datadog-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" } +datadog-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" , features = ["mini_agent"] } +datadog-trace-normalization = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" } +datadog-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "8a49c7df2d9cbf05118bfd5b85772676f71b34f2" } dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "c3d8ed4f90591c6958921145d485463860307f78", default-features = false } datadog-trace-agent = { git = "https://github.com/DataDog/serverless-components", rev = "c3d8ed4f90591c6958921145d485463860307f78" } datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "c3d8ed4f90591c6958921145d485463860307f78", default-features = false } diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index a55f0b471..11609f423 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -50,7 +50,8 @@ use bottlecap::{ proxy_flusher::Flusher as ProxyFlusher, stats_aggregator::StatsAggregator, stats_flusher::{self, StatsFlusher}, - stats_processor, trace_agent, trace_aggregator, + stats_processor, trace_agent, + trace_aggregator::{self, SendDataBuilderInfo}, trace_flusher::{self, ServerlessTraceFlusher, TraceFlusher}, trace_processor, }, @@ -802,7 +803,7 @@ async fn handle_event_bus_event( invocation_processor: Arc>, tags_provider: Arc, trace_processor: Arc, - trace_agent_channel: Sender, + trace_agent_channel: Sender, ) -> Option { match event { Event::Metric(event) => { @@ -1024,7 +1025,7 @@ fn start_trace_agent( invocation_processor: Arc>, trace_aggregator: Arc>, ) -> ( - Sender, + Sender, Arc, Arc, Arc, @@ -1146,7 +1147,7 @@ fn start_otlp_agent( config: &Arc, tags_provider: Arc, trace_processor: Arc, - trace_tx: Sender, + trace_tx: Sender, ) -> Option { if !should_enable_otlp_agent(config) { return None; diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index ae9f1eb64..959a74eed 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -6,7 +6,7 @@ use std::{ use chrono::{DateTime, Utc}; use datadog_trace_protobuf::pb::Span; -use datadog_trace_utils::{send_data::SendData, tracer_header_tags}; +use datadog_trace_utils::tracer_header_tags; use dogstatsd::aggregator::Aggregator as MetricsAggregator; use serde_json::{json, Value}; use tokio::sync::{mpsc::Sender, watch}; @@ -35,6 +35,7 @@ use crate::{ }, DatadogCompositePropagator, Propagator, }, + trace_aggregator::SendDataBuilderInfo, trace_processor::{self, TraceProcessor}, }, }; @@ -284,7 +285,7 @@ impl Processor { status: Status, tags_provider: Arc, trace_processor: Arc, - trace_agent_tx: Sender, + trace_agent_tx: Sender, timestamp: i64, ) { // Set the runtime duration metric @@ -328,7 +329,7 @@ impl Processor { status: Status, tags_provider: Arc, trace_processor: Arc, - trace_agent_tx: Sender, + trace_agent_tx: Sender, ) { let context = self.enrich_ctx_at_platform_done(request_id, status); @@ -415,7 +416,7 @@ impl Processor { &mut self, tags_provider: &Arc, trace_processor: &Arc, - trace_agent_tx: &Sender, + trace_agent_tx: &Sender, context: Context, ) { let mut body_size = std::mem::size_of_val(&context.invocation_span); @@ -467,7 +468,7 @@ impl Processor { &mut self, tags_provider: &Arc, trace_processor: &Arc, - trace_agent_tx: &Sender, + trace_agent_tx: &Sender, ) { if let Some(cold_start_context) = self.context_buffer.get_context_with_cold_start() { if let Some(cold_start_span) = &mut cold_start_context.cold_start_span { @@ -500,7 +501,7 @@ impl Processor { body_size: usize, tags_provider: &Arc, trace_processor: &Arc, - trace_agent_tx: &Sender, + trace_agent_tx: &Sender, ) { // todo: figure out what to do here let header_tags = tracer_header_tags::TracerHeaderTags { @@ -516,7 +517,7 @@ impl Processor { dropped_p0_spans: 0, }; - let send_data: SendData = trace_processor.process_traces( + let send_data_builder_info: SendDataBuilderInfo = trace_processor.process_traces( self.config.clone(), tags_provider.clone(), header_tags, @@ -525,7 +526,7 @@ impl Processor { self.inferrer.span_pointers.clone(), ); - if let Err(e) = trace_agent_tx.send(send_data).await { + if let Err(e) = trace_agent_tx.send(send_data_builder_info).await { debug!("Failed to send context spans to agent: {e}"); } } diff --git a/bottlecap/src/otlp/agent.rs b/bottlecap/src/otlp/agent.rs index 571128e01..31ddbd169 100644 --- a/bottlecap/src/otlp/agent.rs +++ b/bottlecap/src/otlp/agent.rs @@ -5,7 +5,6 @@ use axum::{ routing::post, Router, }; -use datadog_trace_utils::send_data::SendData; use datadog_trace_utils::trace_utils::TracerHeaderTags as DatadogTracerHeaderTags; use serde_json::json; use std::net::SocketAddr; @@ -19,7 +18,7 @@ use crate::{ http::{extract_request_body, handler_not_found}, otlp::processor::Processor as OtlpProcessor, tags::provider, - traces::trace_processor::TraceProcessor, + traces::{trace_aggregator::SendDataBuilderInfo, trace_processor::TraceProcessor}, }; const OTLP_AGENT_HTTP_PORT: u16 = 4318; @@ -29,7 +28,7 @@ type AgentState = ( Arc, OtlpProcessor, Arc, - Sender, + Sender, ); pub struct Agent { @@ -37,7 +36,7 @@ pub struct Agent { tags_provider: Arc, processor: OtlpProcessor, trace_processor: Arc, - trace_tx: Sender, + trace_tx: Sender, port: u16, cancel_token: CancellationToken, } @@ -47,7 +46,7 @@ impl Agent { config: Arc, tags_provider: Arc, trace_processor: Arc, - trace_tx: Sender, + trace_tx: Sender, ) -> Self { let port = Self::parse_port( &config.otlp_config_receiver_protocols_http_endpoint, @@ -157,16 +156,7 @@ impl Agent { let tracer_header_tags: DatadogTracerHeaderTags = (&parts.headers).into(); let body_size = size_of_val(&traces); - let send_data = trace_processor.process_traces( - config, - tags_provider, - tracer_header_tags, - traces, - body_size, - None, - ); - - if send_data.is_empty() { + if body_size == 0 { return ( StatusCode::INTERNAL_SERVER_ERROR, json!({ "message": "Not sending traces, processor returned empty data" }) @@ -175,7 +165,16 @@ impl Agent { .into_response(); } - match trace_tx.send(send_data).await { + let send_data_builder = trace_processor.process_traces( + config, + tags_provider, + tracer_header_tags, + traces, + body_size, + None, + ); + + match trace_tx.send(send_data_builder).await { Ok(()) => { debug!("OTLP | Successfully buffered traces to be flushed."); ( diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index 511598939..c9d5879e7 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -29,12 +29,13 @@ use crate::{ tags::provider, traces::{ proxy_aggregator::{self, ProxyRequest}, - stats_aggregator, stats_processor, trace_aggregator, trace_processor, - INVOCATION_SPAN_RESOURCE, + stats_aggregator, stats_processor, + trace_aggregator::{self, SendDataBuilderInfo}, + trace_processor, INVOCATION_SPAN_RESOURCE, }, }; use datadog_trace_protobuf::pb; -use datadog_trace_utils::trace_utils::{self, SendData}; +use datadog_trace_utils::trace_utils::{self}; use ddcommon::hyper_migration; const TRACE_AGENT_PORT: usize = 8126; @@ -69,7 +70,7 @@ const LAMBDA_LOAD_SPAN: &str = "aws.lambda.load"; pub struct TraceState { pub config: Arc, pub trace_processor: Arc, - pub trace_tx: Sender, + pub trace_tx: Sender, pub invocation_processor: Arc>, pub tags_provider: Arc, } @@ -94,7 +95,7 @@ pub struct TraceAgent { pub proxy_aggregator: Arc>, pub tags_provider: Arc, invocation_processor: Arc>, - tx: Sender, + tx: Sender, shutdown_token: CancellationToken, } @@ -120,16 +121,16 @@ impl TraceAgent { // setup a channel to send processed traces to our flusher. tx is passed through each // endpoint_handler to the trace processor, which uses it to send de-serialized // processed trace payloads to our trace flusher. - let (trace_tx, mut trace_rx): (Sender, Receiver) = + let (trace_tx, mut trace_rx): (Sender, Receiver) = mpsc::channel(TRACER_PAYLOAD_CHANNEL_BUFFER_SIZE); // start our trace flusher. receives trace payloads and handles buffering + deciding when to // flush to backend. tokio::spawn(async move { - while let Some(tracer_payload) = trace_rx.recv().await { + while let Some(tracer_payload_info) = trace_rx.recv().await { let mut aggregator = trace_aggregator.lock().await; - aggregator.add(tracer_payload); + aggregator.add(tracer_payload_info); } }); @@ -392,7 +393,7 @@ impl TraceAgent { config: Arc, request: Request, trace_processor: Arc, - trace_tx: Sender, + trace_tx: Sender, invocation_processor: Arc>, tags_provider: Arc, version: ApiVersion, @@ -537,7 +538,7 @@ impl TraceAgent { } #[must_use] - pub fn get_sender_copy(&self) -> Sender { + pub fn get_sender_copy(&self) -> Sender { self.tx.clone() } diff --git a/bottlecap/src/traces/trace_aggregator.rs b/bottlecap/src/traces/trace_aggregator.rs index 2120f7e21..4c3f23a2f 100644 --- a/bottlecap/src/traces/trace_aggregator.rs +++ b/bottlecap/src/traces/trace_aggregator.rs @@ -1,4 +1,4 @@ -use datadog_trace_utils::send_data::SendData; +use datadog_trace_utils::send_data::SendDataBuilder; use std::collections::VecDeque; /// Maximum content size per payload uncompressed in bytes, @@ -7,11 +7,24 @@ use std::collections::VecDeque; /// pub const MAX_CONTENT_SIZE_BYTES: usize = (32 * 1_024 * 1_024) / 10; +// Bundle SendDataBuilder with payload size because SendDataBuilder doesn't +// expose a getter for the size +pub struct SendDataBuilderInfo { + pub builder: SendDataBuilder, + pub size: usize, +} + +impl SendDataBuilderInfo { + pub fn new(builder: SendDataBuilder, size: usize) -> Self { + Self { builder, size } + } +} + #[allow(clippy::module_name_repetitions)] pub struct TraceAggregator { - queue: VecDeque, + queue: VecDeque, max_content_size_bytes: usize, - buffer: Vec, + buffer: Vec, } impl Default for TraceAggregator { @@ -35,26 +48,26 @@ impl TraceAggregator { } } - pub fn add(&mut self, p: SendData) { - self.queue.push_back(p); + pub fn add(&mut self, payload_info: SendDataBuilderInfo) { + self.queue.push_back(payload_info); } - pub fn get_batch(&mut self) -> Vec { + pub fn get_batch(&mut self) -> Vec { let mut batch_size = 0; // Fill the batch while batch_size < self.max_content_size_bytes { - if let Some(payload) = self.queue.pop_front() { + if let Some(payload_info) = self.queue.pop_front() { // TODO(duncanista): revisit if this is bigger than limit - let payload_size = payload.len(); + let payload_size = payload_info.size; // Put stats back in the queue if batch_size + payload_size > self.max_content_size_bytes { - self.queue.push_front(payload); + self.queue.push_front(payload_info); break; } batch_size += payload_size; - self.buffer.push(payload); + self.buffer.push(payload_info.builder); } else { break; } @@ -89,16 +102,16 @@ mod tests { dropped_p0_traces: 0, dropped_p0_spans: 0, }; - let payload = SendData::new( - 1, + let size = 1; + let payload = SendDataBuilder::new( + size, TracerPayloadCollection::V07(Vec::new()), tracer_header_tags, &Endpoint::from_slice("localhost"), ); - aggregator.add(payload.clone()); + aggregator.add(SendDataBuilderInfo::new(payload.clone(), size)); assert_eq!(aggregator.queue.len(), 1); - assert_eq!(aggregator.queue[0].is_empty(), payload.is_empty()); } #[test] @@ -116,14 +129,15 @@ mod tests { dropped_p0_traces: 0, dropped_p0_spans: 0, }; - let payload = SendData::new( - 1, + let size = 1; + let payload = SendDataBuilder::new( + size, TracerPayloadCollection::V07(Vec::new()), tracer_header_tags, &Endpoint::from_slice("localhost"), ); - aggregator.add(payload.clone()); + aggregator.add(SendDataBuilderInfo::new(payload.clone(), size)); assert_eq!(aggregator.queue.len(), 1); let batch = aggregator.get_batch(); assert_eq!(batch.len(), 1); @@ -144,17 +158,18 @@ mod tests { dropped_p0_traces: 0, dropped_p0_spans: 0, }; - let payload = SendData::new( - 1, + let size = 1; + let payload = SendDataBuilder::new( + size, TracerPayloadCollection::V07(Vec::new()), tracer_header_tags, &Endpoint::from_slice("localhost"), ); // Add 3 payloads - aggregator.add(payload.clone()); - aggregator.add(payload.clone()); - aggregator.add(payload.clone()); + aggregator.add(SendDataBuilderInfo::new(payload.clone(), size)); + aggregator.add(SendDataBuilderInfo::new(payload.clone(), size)); + aggregator.add(SendDataBuilderInfo::new(payload.clone(), size)); // The batch should only contain the first 2 payloads let first_batch = aggregator.get_batch(); diff --git a/bottlecap/src/traces/trace_flusher.rs b/bottlecap/src/traces/trace_flusher.rs index 48db23892..699c3c6cc 100644 --- a/bottlecap/src/traces/trace_flusher.rs +++ b/bottlecap/src/traces/trace_flusher.rs @@ -6,7 +6,10 @@ use std::sync::Arc; use tokio::sync::Mutex; use tracing::{debug, error}; -use datadog_trace_utils::trace_utils::{self, SendData}; +use datadog_trace_utils::{ + send_data::SendDataBuilder, + trace_utils::{self, SendData}, +}; use crate::config::Config; use crate::traces::trace_aggregator::TraceAggregator; @@ -56,9 +59,13 @@ impl TraceFlusher for ServerlessTraceFlusher { // Process new traces from the aggregator let mut guard = self.aggregator.lock().await; - let mut traces = guard.get_batch(); + let mut trace_builders = guard.get_batch(); - while !traces.is_empty() { + while !trace_builders.is_empty() { + let traces: Vec<_> = trace_builders + .into_iter() + .map(SendDataBuilder::build) + .collect(); if let Some(failed) = self.send(traces).await { // Keep track of the failed batch failed_batch = Some(failed); @@ -66,7 +73,7 @@ impl TraceFlusher for ServerlessTraceFlusher { break; } - traces = guard.get_batch(); + trace_builders = guard.get_batch(); } failed_batch diff --git a/bottlecap/src/traces/trace_processor.rs b/bottlecap/src/traces/trace_processor.rs index 55ef3ff09..58b87e7a4 100644 --- a/bottlecap/src/traces/trace_processor.rs +++ b/bottlecap/src/traces/trace_processor.rs @@ -13,7 +13,7 @@ use datadog_trace_obfuscation::obfuscate::obfuscate_span; use datadog_trace_obfuscation::obfuscation_config; use datadog_trace_protobuf::pb; use datadog_trace_protobuf::pb::Span; -use datadog_trace_utils::send_data::{Compression, SendData, SendDataBuilder}; +use datadog_trace_utils::send_data::{Compression, SendDataBuilder}; use datadog_trace_utils::send_with_retry::{RetryBackoffType, RetryStrategy}; use datadog_trace_utils::trace_utils::{self}; use datadog_trace_utils::tracer_header_tags; @@ -23,6 +23,8 @@ use std::str::FromStr; use std::sync::Arc; use tracing::error; +use super::trace_aggregator::SendDataBuilderInfo; + #[derive(Clone)] #[allow(clippy::module_name_repetitions)] pub struct ServerlessTraceProcessor { @@ -124,7 +126,7 @@ pub trait TraceProcessor { traces: Vec>, body_size: usize, span_pointers: Option>, - ) -> SendData; + ) -> SendDataBuilderInfo; } impl TraceProcessor for ServerlessTraceProcessor { @@ -136,7 +138,7 @@ impl TraceProcessor for ServerlessTraceProcessor { traces: Vec>, body_size: usize, span_pointers: Option>, - ) -> SendData { + ) -> SendDataBuilderInfo { let mut payload = trace_utils::collect_pb_trace_chunks( traces, &header_tags, @@ -166,17 +168,16 @@ impl TraceProcessor for ServerlessTraceProcessor { test_token: None, }; - let send_data_builder = SendDataBuilder::new(body_size, payload, header_tags, &endpoint); - let mut send_data = send_data_builder + let builder = SendDataBuilder::new(body_size, payload, header_tags, &endpoint) .with_compression(Compression::Zstd(6)) - .build(); - send_data.set_retry_strategy(RetryStrategy::new( - 1, - 100, - RetryBackoffType::Exponential, - None, - )); - send_data + .with_retry_strategy(RetryStrategy::new( + 1, + 100, + RetryBackoffType::Exponential, + None, + )); + + SendDataBuilderInfo::new(builder, body_size) } } @@ -325,12 +326,13 @@ mod tests { app_version: String::new(), }; - let received_payload = - if let TracerPayloadCollection::V07(payload) = tracer_payload.get_payloads() { - Some(payload[0].clone()) - } else { - None - }; + let received_payload = if let TracerPayloadCollection::V07(payload) = + tracer_payload.builder.build().get_payloads() + { + Some(payload[0].clone()) + } else { + None + }; assert_eq!( expected_tracer_payload,