diff --git a/libdd-data-pipeline/src/lib.rs b/libdd-data-pipeline/src/lib.rs index 1b80fd26e0..57572cd97a 100644 --- a/libdd-data-pipeline/src/lib.rs +++ b/libdd-data-pipeline/src/lib.rs @@ -15,6 +15,6 @@ mod health_metrics; mod pausable_worker; #[allow(missing_docs)] pub mod stats_exporter; -pub mod telemetry; +pub(crate) mod telemetry; #[allow(missing_docs)] pub mod trace_exporter; diff --git a/libdd-data-pipeline/src/telemetry/metrics.rs b/libdd-data-pipeline/src/telemetry/metrics.rs index ad1d22c119..feeb74e492 100644 --- a/libdd-data-pipeline/src/telemetry/metrics.rs +++ b/libdd-data-pipeline/src/telemetry/metrics.rs @@ -25,8 +25,12 @@ pub enum MetricKind { ApiResponses, /// trace_chunks_sent metric ChunksSent, - /// trace_chunks_dropped metric - ChunksDropped, + /// trace_chunks_dropped metric (reason: p0_drop) + ChunksDroppedP0, + /// trace_chunks_dropped metric (reason: serialization_error) + ChunksDroppedSerializationError, + /// trace_chunks_dropped metric (reason: send_failure) + ChunksDroppedSendFailure, } /// Constants for metric names @@ -96,13 +100,25 @@ const METRICS: &[Metric] = &[ name: CHUNKS_DROPPED_STR, metric_type: MetricType::Count, namespace: MetricNamespace::Tracers, - tags: &[tag!["src_library", "libdatadog"]], + tags: &[tag!["src_library", "libdatadog"], tag!["reason", "p0_drop"]], }, Metric { name: CHUNKS_DROPPED_STR, metric_type: MetricType::Count, namespace: MetricNamespace::Tracers, - tags: &[tag!["src_library", "libdatadog"]], + tags: &[ + tag!["src_library", "libdatadog"], + tag!["reason", "serialization_error"], + ], + }, + Metric { + name: CHUNKS_DROPPED_STR, + metric_type: MetricType::Count, + namespace: MetricNamespace::Tracers, + tags: &[ + tag!["src_library", "libdatadog"], + tag!["reason", "send_failure"], + ], }, ]; diff --git a/libdd-data-pipeline/src/telemetry/mod.rs b/libdd-data-pipeline/src/telemetry/mod.rs index 53bf212dbf..0181ae3680 100644 --- a/libdd-data-pipeline/src/telemetry/mod.rs +++ b/libdd-data-pipeline/src/telemetry/mod.rs @@ -147,7 +147,9 @@ pub struct SendPayloadTelemetry { errors_status_code: u64, bytes_sent: u64, chunks_sent: u64, - chunks_dropped: u64, + chunks_dropped_p0: u64, + chunks_dropped_serialization_error: u64, + chunks_dropped_send_failure: u64, responses_count_per_code: HashMap, } @@ -160,16 +162,31 @@ impl From<&SendDataResult> for SendPayloadTelemetry { errors_status_code: value.errors_status_code, bytes_sent: value.bytes_sent, chunks_sent: value.chunks_sent, - chunks_dropped: value.chunks_dropped, + chunks_dropped_send_failure: value.chunks_dropped, responses_count_per_code: value.responses_count_per_code.clone(), + ..Default::default() } } } impl SendPayloadTelemetry { /// Create a [`SendPayloadTelemetry`] from a [`SendWithRetryResult`]. - pub fn from_retry_result(value: &SendWithRetryResult, bytes_sent: u64, chunks: u64) -> Self { - let mut telemetry = Self::default(); + /// + /// # Arguments + /// * `value` - The result of sending traces with retry + /// * `bytes_sent` - The number of bytes in the payload + /// * `chunks` - The number of trace chunks in the payload + /// * `chunks_dropped_p0` - The number of P0 trace chunks dropped due to sampling + pub fn from_retry_result( + value: &SendWithRetryResult, + bytes_sent: u64, + chunks: u64, + chunks_dropped_p0: u64, + ) -> Self { + let mut telemetry = Self { + chunks_dropped_p0, + ..Default::default() + }; match value { Ok((response, attempts)) => { telemetry.chunks_sent = chunks; @@ -179,29 +196,30 @@ impl SendPayloadTelemetry { .insert(response.status().into(), 1); telemetry.requests_count = *attempts as u64; } - Err(err) => { - telemetry.chunks_dropped = chunks; - match err { - SendWithRetryError::Http(response, attempts) => { - telemetry.errors_status_code = 1; - telemetry - .responses_count_per_code - .insert(response.status().into(), 1); - telemetry.requests_count = *attempts as u64; - } - SendWithRetryError::Timeout(attempts) => { - telemetry.errors_timeout = 1; - telemetry.requests_count = *attempts as u64; - } - SendWithRetryError::Network(_, attempts) => { - telemetry.errors_network = 1; - telemetry.requests_count = *attempts as u64; - } - SendWithRetryError::Build(attempts) => { - telemetry.requests_count = *attempts as u64; - } + Err(err) => match err { + SendWithRetryError::Http(response, attempts) => { + telemetry.chunks_dropped_send_failure = chunks; + telemetry.errors_status_code = 1; + telemetry + .responses_count_per_code + .insert(response.status().into(), 1); + telemetry.requests_count = *attempts as u64; } - } + SendWithRetryError::Timeout(attempts) => { + telemetry.chunks_dropped_send_failure = chunks; + telemetry.errors_timeout = 1; + telemetry.requests_count = *attempts as u64; + } + SendWithRetryError::Network(_, attempts) => { + telemetry.chunks_dropped_send_failure = chunks; + telemetry.errors_network = 1; + telemetry.requests_count = *attempts as u64; + } + SendWithRetryError::Build(attempts) => { + telemetry.chunks_dropped_serialization_error = chunks; + telemetry.requests_count = *attempts as u64; + } + }, }; telemetry } @@ -243,10 +261,24 @@ impl TelemetryClient { self.worker .add_point(data.chunks_sent as f64, key, vec![])?; } - if data.chunks_dropped > 0 { - let key = self.metrics.get(metrics::MetricKind::ChunksDropped); + if data.chunks_dropped_p0 > 0 { + let key = self.metrics.get(metrics::MetricKind::ChunksDroppedP0); + self.worker + .add_point(data.chunks_dropped_p0 as f64, key, vec![])?; + } + if data.chunks_dropped_serialization_error > 0 { + let key = self + .metrics + .get(metrics::MetricKind::ChunksDroppedSerializationError); self.worker - .add_point(data.chunks_dropped as f64, key, vec![])?; + .add_point(data.chunks_dropped_serialization_error as f64, key, vec![])?; + } + if data.chunks_dropped_send_failure > 0 { + let key = self + .metrics + .get(metrics::MetricKind::ChunksDroppedSendFailure); + self.worker + .add_point(data.chunks_dropped_send_failure as f64, key, vec![])?; } if !data.responses_count_per_code.is_empty() { let key = self.metrics.get(metrics::MetricKind::ApiResponses); @@ -550,8 +582,8 @@ mod tests { #[cfg_attr(miri, ignore)] #[tokio::test] - async fn chunks_dropped_test() { - let payload = Regex::new(r#""metric":"trace_chunks_dropped","points":\[\[\d+,1\.0\]\],"tags":\["src_library:libdatadog"\],"common":true,"type":"count"#).unwrap(); + async fn chunks_dropped_send_failure_test() { + let payload = Regex::new(r#""metric":"trace_chunks_dropped","points":\[\[\d+,1\.0\]\],"tags":\["src_library:libdatadog","reason:send_failure"\],"common":true,"type":"count"#).unwrap(); let server = MockServer::start_async().await; let telemetry_srv = server @@ -562,7 +594,65 @@ mod tests { .await; let data = SendPayloadTelemetry { - chunks_dropped: 1, + chunks_dropped_send_failure: 1, + ..Default::default() + }; + + let client = get_test_client(&server.url("/")).await; + + client.start().await; + let _ = client.send(&data); + client.shutdown().await; + while telemetry_srv.calls_async().await == 0 { + sleep(Duration::from_millis(10)).await; + } + telemetry_srv.assert_calls_async(1).await; + } + + #[cfg_attr(miri, ignore)] + #[tokio::test] + async fn chunks_dropped_p0_test() { + let payload = Regex::new(r#""metric":"trace_chunks_dropped","points":\[\[\d+,1\.0\]\],"tags":\["src_library:libdatadog","reason:p0_drop"\],"common":true,"type":"count"#).unwrap(); + let server = MockServer::start_async().await; + + let telemetry_srv = server + .mock_async(|when, then| { + when.method(POST).body_matches(payload); + then.status(200).body(""); + }) + .await; + + let data = SendPayloadTelemetry { + chunks_dropped_p0: 1, + ..Default::default() + }; + + let client = get_test_client(&server.url("/")).await; + + client.start().await; + let _ = client.send(&data); + client.shutdown().await; + while telemetry_srv.calls_async().await == 0 { + sleep(Duration::from_millis(10)).await; + } + telemetry_srv.assert_calls_async(1).await; + } + + #[cfg_attr(miri, ignore)] + #[tokio::test] + async fn chunks_dropped_serialization_error_test() { + let payload = Regex::new(r#""metric":"trace_chunks_dropped","points":\[\[\d+,1\.0\]\],"tags":\["src_library:libdatadog","reason:serialization_error"\],"common":true,"type":"count"#).unwrap(); + let server = MockServer::start_async().await; + + let telemetry_srv = server + .mock_async(|when, then| { + when.method(POST).body_matches(payload); + then.status(200).body(""); + }) + .await; + + let data = SendPayloadTelemetry { + chunks_dropped_serialization_error: 1, ..Default::default() }; @@ -580,13 +670,30 @@ mod tests { #[test] fn telemetry_from_ok_response_test() { let result = Ok((Response::default(), 3)); - let telemetry = SendPayloadTelemetry::from_retry_result(&result, 4, 5); + let telemetry = SendPayloadTelemetry::from_retry_result(&result, 4, 5, 0); + assert_eq!( + telemetry, + SendPayloadTelemetry { + bytes_sent: 4, + chunks_sent: 5, + requests_count: 3, + responses_count_per_code: HashMap::from([(200, 1)]), + ..Default::default() + } + ) + } + + #[test] + fn telemetry_from_ok_response_with_p0_drops_test() { + let result = Ok((Response::default(), 3)); + let telemetry = SendPayloadTelemetry::from_retry_result(&result, 4, 5, 10); assert_eq!( telemetry, SendPayloadTelemetry { bytes_sent: 4, chunks_sent: 5, requests_count: 3, + chunks_dropped_p0: 10, responses_count_per_code: HashMap::from([(200, 1)]), ..Default::default() } @@ -598,11 +705,11 @@ mod tests { let mut error_response = Response::default(); *error_response.status_mut() = StatusCode::BAD_REQUEST; let result = Err(SendWithRetryError::Http(error_response, 5)); - let telemetry = SendPayloadTelemetry::from_retry_result(&result, 1, 2); + let telemetry = SendPayloadTelemetry::from_retry_result(&result, 1, 2, 0); assert_eq!( telemetry, SendPayloadTelemetry { - chunks_dropped: 2, + chunks_dropped_send_failure: 2, requests_count: 5, errors_status_code: 1, responses_count_per_code: HashMap::from([(400, 1)]), @@ -621,11 +728,11 @@ mod tests { .unwrap_err(); let result = Err(SendWithRetryError::Network(hyper_error, 5)); - let telemetry = SendPayloadTelemetry::from_retry_result(&result, 1, 2); + let telemetry = SendPayloadTelemetry::from_retry_result(&result, 1, 2, 0); assert_eq!( telemetry, SendPayloadTelemetry { - chunks_dropped: 2, + chunks_dropped_send_failure: 2, requests_count: 5, errors_network: 1, ..Default::default() @@ -636,11 +743,11 @@ mod tests { #[test] fn telemetry_from_timeout_error_test() { let result = Err(SendWithRetryError::Timeout(5)); - let telemetry = SendPayloadTelemetry::from_retry_result(&result, 1, 2); + let telemetry = SendPayloadTelemetry::from_retry_result(&result, 1, 2, 0); assert_eq!( telemetry, SendPayloadTelemetry { - chunks_dropped: 2, + chunks_dropped_send_failure: 2, requests_count: 5, errors_timeout: 1, ..Default::default() @@ -652,11 +759,11 @@ mod tests { #[tokio::test] async fn telemetry_from_build_error_test() { let result = Err(SendWithRetryError::Build(5)); - let telemetry = SendPayloadTelemetry::from_retry_result(&result, 1, 2); + let telemetry = SendPayloadTelemetry::from_retry_result(&result, 1, 2, 0); assert_eq!( telemetry, SendPayloadTelemetry { - chunks_dropped: 2, + chunks_dropped_serialization_error: 2, requests_count: 5, ..Default::default() } @@ -684,8 +791,9 @@ mod tests { errors_status_code: 3, bytes_sent: 4, chunks_sent: 5, - chunks_dropped: 6, + chunks_dropped_send_failure: 6, responses_count_per_code: HashMap::from([(200, 3)]), + ..Default::default() }; assert_eq!(SendPayloadTelemetry::from(&result), expected_telemetry) diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index 1c4bb22292..2b13150a45 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -650,6 +650,7 @@ impl TraceExporter { mp_payload: Vec, headers: HashMap<&'static str, String>, chunks: usize, + chunks_dropped_p0: usize, ) -> Result { let strategy = RetryStrategy::default(); let payload_len = mp_payload.len(); @@ -679,6 +680,7 @@ impl TraceExporter { &result, payload_len as u64, chunks as u64, + chunks_dropped_p0 as u64, )) { error!(?e, "Error sending telemetry"); } @@ -694,7 +696,7 @@ impl TraceExporter { let mut header_tags: TracerHeaderTags = self.metadata.borrow().into(); // Process stats computation - stats::process_traces_for_stats( + let dropped_p0_stats = stats::process_traces_for_stats( &mut traces, &mut header_tags, &self.client_side_stats, @@ -717,6 +719,7 @@ impl TraceExporter { prepared.data, prepared.headers, prepared.chunk_count, + dropped_p0_stats.dropped_p0_traces, ) .await } diff --git a/libdd-data-pipeline/src/trace_exporter/stats.rs b/libdd-data-pipeline/src/trace_exporter/stats.rs index 3f2654e2f5..2346ba687a 100644 --- a/libdd-data-pipeline/src/trace_exporter/stats.rs +++ b/libdd-data-pipeline/src/trace_exporter/stats.rs @@ -218,13 +218,14 @@ fn add_spans_to_stats( } } -/// Process traces for stats computation and update header tags accordingly +/// Process traces for stats computation and update header tags accordingly. +/// Returns the number of P0 traces and spans that were dropped. pub(crate) fn process_traces_for_stats( traces: &mut Vec>>, header_tags: &mut libdd_trace_utils::trace_utils::TracerHeaderTags, client_side_stats: &ArcSwap, client_computed_top_level: bool, -) { +) -> libdd_trace_utils::span::trace_utils::DroppedP0Stats { if let StatsComputationStatus::Enabled { stats_concentrator, .. } = &**client_side_stats.load() @@ -237,17 +238,21 @@ pub(crate) fn process_traces_for_stats( add_spans_to_stats(stats_concentrator, traces); // Once stats have been computed we can drop all chunks that are not going to be // sampled by the agent - let libdd_trace_utils::span::trace_utils::DroppedP0Stats { - dropped_p0_traces, - dropped_p0_spans, - } = libdd_trace_utils::span::trace_utils::drop_chunks(traces); + let dropped_p0_stats = libdd_trace_utils::span::trace_utils::drop_chunks(traces); // Update the headers to indicate that stats have been computed and forward dropped // traces counts header_tags.client_computed_top_level = true; header_tags.client_computed_stats = true; - header_tags.dropped_p0_traces = dropped_p0_traces; - header_tags.dropped_p0_spans = dropped_p0_spans; + header_tags.dropped_p0_traces = dropped_p0_stats.dropped_p0_traces; + header_tags.dropped_p0_spans = dropped_p0_stats.dropped_p0_spans; + + dropped_p0_stats + } else { + libdd_trace_utils::span::trace_utils::DroppedP0Stats { + dropped_p0_traces: 0, + dropped_p0_spans: 0, + } } }