Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 39 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/datadog-serverless-compat/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ description = "Binary to run trace-agent and dogstatsd servers in Serverless env

[dependencies]
datadog-trace-agent = { path = "../datadog-trace-agent" }
libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "660c550b6311a209d9cf7de762e54b6b7109bcdb" }
libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "d52ee90209cb12a28bdda0114535c1a985a29d95" }
dogstatsd = { path = "../dogstatsd", default-features = true }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
tokio-util = { version = "0.7", default-features = false }
Expand Down
10 changes: 5 additions & 5 deletions crates/datadog-trace-agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ async-trait = "0.1.64"
tracing = { version = "0.1", default-features = false }
serde = { version = "1.0.145", features = ["derive"] }
serde_json = "1.0"
libdd-common = { git = "https://github.com/DataDog/libdatadog", rev = "660c550b6311a209d9cf7de762e54b6b7109bcdb" }
libdd-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev = "660c550b6311a209d9cf7de762e54b6b7109bcdb" }
libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "660c550b6311a209d9cf7de762e54b6b7109bcdb", features = [
libdd-common = { git = "https://github.com/DataDog/libdatadog", rev = "d52ee90209cb12a28bdda0114535c1a985a29d95" }
libdd-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev = "d52ee90209cb12a28bdda0114535c1a985a29d95" }
libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "d52ee90209cb12a28bdda0114535c1a985a29d95", features = [
"mini_agent",
] }
libdd-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "660c550b6311a209d9cf7de762e54b6b7109bcdb" }
libdd-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "d52ee90209cb12a28bdda0114535c1a985a29d95" }
datadog-fips = { path = "../datadog-fips" }
reqwest = { version = "0.12.23", features = ["json", "http2"], default-features = false }
bytes = "1.10.1"
Expand All @@ -35,6 +35,6 @@ rmp-serde = "1.1.1"
serial_test = "2.0.0"
duplicate = "0.4.1"
tempfile = "3.3.0"
libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "660c550b6311a209d9cf7de762e54b6b7109bcdb", features = [
libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "d52ee90209cb12a28bdda0114535c1a985a29d95", features = [
"test-utils",
] }
70 changes: 20 additions & 50 deletions crates/datadog-trace-agent/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,8 @@ mod tests {

use super::*;

#[test]
fn test_add() {
let mut aggregator = TraceAggregator::default();
let tracer_header_tags = TracerHeaderTags {
fn create_test_send_data(size: usize) -> SendData {
let tracer_header_tags: TracerHeaderTags<'_> = TracerHeaderTags {
lang: "lang",
lang_version: "lang_version",
lang_interpreter: "lang_interpreter",
Expand All @@ -91,41 +89,31 @@ mod tests {
dropped_p0_traces: 0,
dropped_p0_spans: 0,
};
let payload = SendData::new(
1,
SendData::new(
size,
TracerPayloadCollection::V07(Vec::new()),
tracer_header_tags,
&Endpoint::from_slice("localhost"),
);
)
}

aggregator.add(payload.clone());
#[test]
fn test_add() {
let mut aggregator = TraceAggregator::default();
let payload = create_test_send_data(1);

aggregator.add(payload);
assert_eq!(aggregator.queue.len(), 1);
assert_eq!(aggregator.queue[0].is_empty(), payload.is_empty());
assert_eq!(aggregator.queue[0].is_empty(), false);
assert_eq!(aggregator.queue[0].len(), 1);
}

#[test]
fn test_get_batch() {
let mut aggregator = TraceAggregator::default();
let tracer_header_tags = TracerHeaderTags {
lang: "lang",
lang_version: "lang_version",
lang_interpreter: "lang_interpreter",
lang_vendor: "lang_vendor",
tracer_version: "tracer_version",
container_id: "container_id",
client_computed_top_level: true,
client_computed_stats: true,
dropped_p0_traces: 0,
dropped_p0_spans: 0,
};
let payload = SendData::new(
1,
TracerPayloadCollection::V07(Vec::new()),
tracer_header_tags,
&Endpoint::from_slice("localhost"),
);
let payload = create_test_send_data(1);

aggregator.add(payload.clone());
aggregator.add(payload);
assert_eq!(aggregator.queue.len(), 1);
let batch = aggregator.get_batch();
assert_eq!(batch.len(), 1);
Expand All @@ -134,29 +122,11 @@ mod tests {
#[test]
fn test_get_batch_full_entries() {
let mut aggregator = TraceAggregator::new(2);
let tracer_header_tags = TracerHeaderTags {
lang: "lang",
lang_version: "lang_version",
lang_interpreter: "lang_interpreter",
lang_vendor: "lang_vendor",
tracer_version: "tracer_version",
container_id: "container_id",
client_computed_top_level: true,
client_computed_stats: true,
dropped_p0_traces: 0,
dropped_p0_spans: 0,
};
let payload = SendData::new(
1,
TracerPayloadCollection::V07(Vec::new()),
tracer_header_tags,
&Endpoint::from_slice("localhost"),
);

// Add 3 payloads
aggregator.add(payload.clone());
aggregator.add(payload.clone());
aggregator.add(payload.clone());
// Add 3 payloads - create new instances since SendData doesn't implement Clone
aggregator.add(create_test_send_data(1));
aggregator.add(create_test_send_data(1));
aggregator.add(create_test_send_data(1));

// The batch should only contain the first 2 payloads
let first_batch = aggregator.get_batch();
Expand Down
54 changes: 12 additions & 42 deletions crates/datadog-trace-agent/src/trace_flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,9 @@ pub trait TraceFlusher {
/// implementing flushing logic that calls flush_traces.
async fn start_trace_flusher(&self, mut rx: Receiver<SendData>);
/// Given a `Vec<SendData>`, a tracer payload, send it to the Datadog intake endpoint.
/// Returns the traces back if there was an error sending them.
async fn send(&self, traces: Vec<SendData>) -> Option<Vec<SendData>>;

async fn send(&self, traces: Vec<SendData>);
/// Flushes traces by getting every available batch on the aggregator.
/// If `failed_traces` is provided, it will attempt to send those instead of fetching new traces.
/// Returns any traces that failed to send and should be retried.
async fn flush(&self, failed_traces: Option<Vec<SendData>>) -> Option<Vec<SendData>>;
async fn flush(&self);
}

#[derive(Clone)]
Expand Down Expand Up @@ -58,72 +54,46 @@ impl TraceFlusher for ServerlessTraceFlusher {
self.config.trace_flush_interval_secs,
))
.await;
self.flush(None).await;
self.flush().await;
}
}

async fn flush(&self, failed_traces: Option<Vec<SendData>>) -> Option<Vec<SendData>> {
let mut failed_batch: Option<Vec<SendData>> = None;

if let Some(traces) = failed_traces {
// If we have traces from a previous failed attempt, try to send those first
if !traces.is_empty() {
debug!("Retrying to send {} previously failed traces", traces.len());
let retry_result = self.send(traces).await;
if retry_result.is_some() {
// Still failed, return to retry later
return retry_result;
}
}
}

// Process new traces from the aggregator
async fn flush(&self) {
// Process traces from the aggregator
let mut guard = self.aggregator.lock().await;
let mut traces = guard.get_batch();

while !traces.is_empty() {
if let Some(failed) = self.send(traces).await {
// Keep track of the failed batch
failed_batch = Some(failed);
// Stop processing more batches if we have a failure
break;
}

self.send(traces).await;
traces = guard.get_batch();
}

failed_batch
}

async fn send(&self, traces: Vec<SendData>) -> Option<Vec<SendData>> {
async fn send(&self, traces: Vec<SendData>) {
if traces.is_empty() {
return None;
return;
}
debug!("Flushing {} traces", traces.len());

// Since we return the original traces on error, we need to clone them before coalescing
let traces_clone = traces.clone();

let http_client =
match ServerlessTraceFlusher::get_http_client(self.config.proxy_url.as_ref()) {
Ok(client) => client,
Err(e) => {
error!("Failed to create HTTP client: {e:?}");
return None;
return;
}
};

// Retries are handled internally by SendData::send()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How did you realize retries are now handled internally by SendData?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was reviewing the libdatadog PR that introduced the breaking change ("feat(sidecar)!: introduce TraceData to unify text and binary data") and noticed the `send_with_retry` method.

Digging further, I found that this retry logic was added some time ago, in May 2024, in the PR "Add retry logic to trace_utils::SendData". I added a link to that PR in the description for additional context.

for coalesced_traces in trace_utils::coalesce_send_data(traces) {
match coalesced_traces.send(&http_client).await.last_result {
let result = coalesced_traces.send(&http_client).await;
match result.last_result {
Ok(_) => debug!("Successfully flushed traces"),
Err(e) => {
error!("Error sending trace: {e:?}");
// Return the original traces for retry
return Some(traces_clone);
}
}
}
None
}
}

Expand Down
Loading
Loading