From 19a514a4f4478c4f81e5490137b705065f4dfdee Mon Sep 17 00:00:00 2001 From: alexgallotta <5581237+alexgallotta@users.noreply.github.com> Date: Fri, 18 Oct 2024 18:28:14 -0400 Subject: [PATCH 1/2] refactor --- .../src/config/trace_propagation_style.rs | 24 + bottlecap/src/traces/propagation/mod.rs | 273 +++++----- .../traces/propagation/text_map_propagator.rs | 497 ++++-------------- .../src/traces/propagation/traceparent.rs | 259 +++++++++ 4 files changed, 517 insertions(+), 536 deletions(-) create mode 100644 bottlecap/src/traces/propagation/traceparent.rs diff --git a/bottlecap/src/config/trace_propagation_style.rs b/bottlecap/src/config/trace_propagation_style.rs index 6ebc9dc74..cc102157f 100644 --- a/bottlecap/src/config/trace_propagation_style.rs +++ b/bottlecap/src/config/trace_propagation_style.rs @@ -1,6 +1,9 @@ use std::{fmt::Display, str::FromStr}; +use crate::traces::context::SpanContext; +use crate::traces::propagation::carrier::{Extractor, Injector}; use serde::{Deserialize, Deserializer}; +use crate::traces::propagation::text_map_propagator::extract_datadog; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum TracePropagationStyle { @@ -11,6 +14,27 @@ pub enum TracePropagationStyle { None, } +pub trait Propagator { + fn extract(&self, carrier: &dyn Extractor) -> Option; + fn inject(&self, context: SpanContext, carrier: &mut dyn Injector); +} + +impl Propagator for TracePropagationStyle { + fn extract(&self, carrier: &dyn Extractor) -> Option { + match self { + TracePropagationStyle::Datadog => extract_datadog(carrier), + TracePropagationStyle::B3Multi => todo!(), + TracePropagationStyle::B3 => todo!(), + TracePropagationStyle::TraceContext => traceparent::extract_trace_context(carrier), + TracePropagationStyle::None => todo!(), + } + } + + fn inject(&self, _context: SpanContext, _carrier: &mut dyn Injector) { + todo!() + } +} + impl FromStr for TracePropagationStyle { type Err = String; diff --git a/bottlecap/src/traces/propagation/mod.rs b/bottlecap/src/traces/propagation/mod.rs index e93d81329..f880a709b 100644 --- a/bottlecap/src/traces/propagation/mod.rs +++ b/bottlecap/src/traces/propagation/mod.rs @@ -1,32 +1,28 @@ -use std::{collections::HashMap, sync::Arc}; - +use crate::config::trace_propagation_style::Propagator; +use crate::traces::propagation::traceparent::TRACESTATE_KEY; use crate::{ config::{self, trace_propagation_style::TracePropagationStyle}, traces::context::SpanContext, }; use carrier::{Extractor, Injector}; use datadog_trace_protobuf::pb::SpanLink; +use std::{collections::HashMap, sync::Arc}; use text_map_propagator::{ BAGGAGE_PREFIX, DATADOG_HIGHER_ORDER_TRACE_ID_BITS_KEY, DATADOG_LAST_PARENT_ID_KEY, - TRACESTATE_KEY, }; pub mod carrier; pub mod error; pub mod text_map_propagator; - -pub trait Propagator { - fn extract(&self, carrier: &dyn Extractor) -> Option; - fn inject(&self, context: SpanContext, carrier: &mut dyn Injector); -} +mod traceparent; pub struct DatadogCompositePropagator { - propagators: Vec>, + propagators: Vec, config: Arc, } #[allow(clippy::never_loop)] -impl Propagator for DatadogCompositePropagator { +impl DatadogCompositePropagator { fn extract(&self, carrier: &dyn Extractor) -> Option { if self.config.trace_propagation_extract_first { for propagator in &self.propagators { @@ -34,7 +30,7 @@ impl Propagator for DatadogCompositePropagator { if self.config.trace_propagation_http_baggage_enabled { if let Some(mut context) = context { - Self::attach_baggage(&mut context, carrier); + attach_baggage(&mut context, carrier); return Some(context); } } @@ -43,160 +39,141 @@ impl Propagator for DatadogCompositePropagator { } } - let (contexts, styles) = self.extract_available_contexts(carrier); + let (contexts, styles) = + extract_available_contexts(&self.config.trace_propagation_style, carrier); if contexts.is_empty() { return None; } - let mut context = Self::resolve_contexts(contexts, styles, carrier); + let mut context = resolve_contexts(contexts, styles); if self.config.trace_propagation_http_baggage_enabled { - Self::attach_baggage(&mut context, carrier); + attach_baggage(&mut context, carrier); } Some(context) } +} - fn inject(&self, _context: SpanContext, _carrier: &mut dyn Injector) { - todo!() +pub fn new(config: Arc) -> DatadogCompositePropagator { + let propagators: Vec = config + .trace_propagation_style_extract + .iter() + .filter_map(|style| match style { + TracePropagationStyle::Datadog => Some(TracePropagationStyle::Datadog), + TracePropagationStyle::TraceContext => Some(TracePropagationStyle::TraceContext), + _ => None, + }) + .collect(); + + DatadogCompositePropagator { + propagators, + config, } } -impl DatadogCompositePropagator { - #[must_use] - pub fn new(config: Arc) -> Self { - let propagators: Vec> = config - .trace_propagation_style_extract - .iter() - .filter_map(|style| match style { - TracePropagationStyle::Datadog => { - Some(Box::new(text_map_propagator::DatadogHeaderPropagator) - as Box) - } - TracePropagationStyle::TraceContext => { - Some(Box::new(text_map_propagator::TraceContextPropagator) - as Box) - } - _ => None, - }) - .collect(); - - Self { - propagators, - config, +fn extract_available_contexts( + propagator_style: &Vec, + carrier: &dyn Extractor, +) -> (Vec, Vec) { + let mut contexts = Vec::::new(); + let mut styles = Vec::::new(); + + for propagator in propagator_style { + if let Some(context) = propagator.extract(carrier) { + contexts.push(context); + styles.push(*propagator); } } + (contexts, styles) +} - fn extract_available_contexts( - &self, - carrier: &dyn Extractor, - ) -> (Vec, Vec) { - let mut contexts = Vec::::new(); - let mut styles = Vec::::new(); - - for (i, propagator) in self.propagators.iter().enumerate() { - if let Some(context) = propagator.extract(carrier) { - contexts.push(context); - styles.push(self.config.trace_propagation_style_extract[i]); +fn resolve_contexts(contexts: Vec, styles: Vec) -> SpanContext { + let mut primary_context = contexts[0].clone(); + let mut links = Vec::::new(); + + let mut i = 1; + for context in contexts.iter().skip(1) { + let style = styles[i]; + + if context.span_id != 0 + && context.trace_id != 0 + && context.trace_id != primary_context.trace_id + { + let sampling = context.sampling.unwrap_or_default().priority.unwrap_or(0); + let tracestate: Option = match style { + TracePropagationStyle::TraceContext => context.tags.get(TRACESTATE_KEY).cloned(), + _ => None, + }; + let attributes = HashMap::from([ + ("reason".to_string(), "terminated_context".to_string()), + ("context_headers".to_string(), style.to_string()), + ]); + let trace_id_high_str = context + .tags + .get(DATADOG_HIGHER_ORDER_TRACE_ID_BITS_KEY) + .cloned() + .unwrap_or_default(); + let trace_ig_high = u64::from_str_radix(&trace_id_high_str, 16).unwrap_or_default(); + + links.push(SpanLink { + trace_id: context.trace_id, + trace_id_high: trace_ig_high, + span_id: context.span_id, + flags: u32::from(sampling > 0), + tracestate: tracestate.unwrap_or_default(), + attributes, + }); + } else if style == TracePropagationStyle::TraceContext { + if let Some(tracestate) = context.tags.get(TRACESTATE_KEY) { + primary_context + .tags + .insert(TRACESTATE_KEY.to_string(), tracestate.clone()); } - } - (contexts, styles) - } - - fn resolve_contexts( - contexts: Vec, - styles: Vec, - _carrier: &dyn Extractor, - ) -> SpanContext { - let mut primary_context = contexts[0].clone(); - let mut links = Vec::::new(); - - let mut i = 1; - for context in contexts.iter().skip(1) { - let style = styles[i]; - - if context.span_id != 0 - && context.trace_id != 0 - && context.trace_id != primary_context.trace_id + if primary_context.trace_id == context.trace_id + && primary_context.span_id != context.span_id { - let sampling = context.sampling.unwrap_or_default().priority.unwrap_or(0); - let tracestate: Option = match style { - TracePropagationStyle::TraceContext => { - context.tags.get(TRACESTATE_KEY).cloned() - } - _ => None, - }; - let attributes = HashMap::from([ - ("reason".to_string(), "terminated_context".to_string()), - ("context_headers".to_string(), style.to_string()), - ]); - let trace_id_high_str = context - .tags - .get(DATADOG_HIGHER_ORDER_TRACE_ID_BITS_KEY) - .cloned() - .unwrap_or_default(); - let trace_ig_high = u64::from_str_radix(&trace_id_high_str, 16).unwrap_or_default(); - - links.push(SpanLink { - trace_id: context.trace_id, - trace_id_high: trace_ig_high, - span_id: context.span_id, - flags: u32::from(sampling > 0), - tracestate: tracestate.unwrap_or_default(), - attributes, - }); - } else if style == TracePropagationStyle::TraceContext { - if let Some(tracestate) = context.tags.get(TRACESTATE_KEY) { + let mut dd_context: Option = None; + if styles.contains(&TracePropagationStyle::Datadog) { + let position = styles + .iter() + .position(|&s| s == TracePropagationStyle::Datadog) + .unwrap_or_default(); + dd_context = contexts.get(position).cloned(); + } + + if let Some(parent_id) = context.tags.get(DATADOG_LAST_PARENT_ID_KEY) { primary_context .tags - .insert(TRACESTATE_KEY.to_string(), tracestate.clone()); + .insert(DATADOG_LAST_PARENT_ID_KEY.to_string(), parent_id.clone()); + } else if let Some(sc) = dd_context { + primary_context.tags.insert( + DATADOG_LAST_PARENT_ID_KEY.to_string(), + format!("{:016x}", sc.span_id), + ); } - if primary_context.trace_id == context.trace_id - && primary_context.span_id != context.span_id - { - let mut dd_context: Option = None; - if styles.contains(&TracePropagationStyle::Datadog) { - let position = styles - .iter() - .position(|&s| s == TracePropagationStyle::Datadog) - .unwrap_or_default(); - dd_context = contexts.get(position).cloned(); - } - - if let Some(parent_id) = context.tags.get(DATADOG_LAST_PARENT_ID_KEY) { - primary_context - .tags - .insert(DATADOG_LAST_PARENT_ID_KEY.to_string(), parent_id.clone()); - } else if let Some(sc) = dd_context { - primary_context.tags.insert( - DATADOG_LAST_PARENT_ID_KEY.to_string(), - format!("{:016x}", sc.span_id), - ); - } - - primary_context.span_id = context.span_id; - } + primary_context.span_id = context.span_id; } - - i += 1; } - primary_context.links = links; - - primary_context + i += 1; } - fn attach_baggage(context: &mut SpanContext, carrier: &dyn Extractor) { - let keys = carrier.keys(); + primary_context.links = links; - for key in keys { - if let Some(stripped) = key.strip_prefix(BAGGAGE_PREFIX) { - context.tags.insert( - stripped.to_string(), - carrier.get(key).unwrap_or_default().to_string(), - ); - } + primary_context +} + +fn attach_baggage(context: &mut SpanContext, carrier: &dyn Extractor) { + let keys = carrier.keys(); + + for key in keys { + if let Some(stripped) = key.strip_prefix(BAGGAGE_PREFIX) { + context.tags.insert( + stripped.to_string(), + carrier.get(key).unwrap_or_default().to_string(), + ); } } } @@ -349,7 +326,7 @@ pub mod tests { config.trace_propagation_style_extract.clone_from(&s); } - let propagator = DatadogCompositePropagator::new(Arc::new(config)); + let propagator = new(Arc::new(config)); let context = propagator.extract(&carrier).unwrap_or_default(); @@ -779,7 +756,7 @@ pub mod tests { TracePropagationStyle::B3Multi, ]; - let propagator = DatadogCompositePropagator::new(Arc::new(config)); + let propagator = new(Arc::new(config)); assert_eq!(propagator.propagators.len(), 2); } @@ -788,21 +765,18 @@ pub mod tests { fn test_new_no_propagators() { let mut config = config::Config::default(); config.trace_propagation_style_extract = vec![TracePropagationStyle::None]; - let propagator = DatadogCompositePropagator::new(Arc::new(config)); + let propagator = new(Arc::new(config)); assert_eq!(propagator.propagators.len(), 0); } #[test] fn test_extract_available_contexts() { - let mut config = config::Config::default(); - config.trace_propagation_style_extract = vec![ + let trace_propagation_style_extract = vec![ TracePropagationStyle::Datadog, TracePropagationStyle::TraceContext, ]; - let propagator = DatadogCompositePropagator::new(Arc::new(config)); - let carrier = HashMap::from([ ( "traceparent".to_string(), @@ -827,7 +801,8 @@ pub mod tests { "_dd.p.test=value,_dd.p.tid=9291375655657946024,any=tag".to_string(), ), ]); - let (contexts, styles) = propagator.extract_available_contexts(&carrier); + let (contexts, styles) = + extract_available_contexts(&trace_propagation_style_extract, &carrier); assert_eq!(contexts.len(), 2); assert_eq!(styles.len(), 2); @@ -835,10 +810,7 @@ pub mod tests { #[test] fn test_extract_available_contexts_no_contexts() { - let mut config = config::Config::default(); - config.trace_propagation_style_extract = vec![TracePropagationStyle::Datadog]; - - let propagator = DatadogCompositePropagator::new(Arc::new(config)); + let trace_propagation_style_extract = vec![TracePropagationStyle::Datadog]; let carrier = HashMap::from([ ( @@ -850,7 +822,8 @@ pub mod tests { "dd=p:00f067aa0ba902b7;s:2;o:rum".to_string(), ), ]); - let (contexts, styles) = propagator.extract_available_contexts(&carrier); + let (contexts, styles) = + extract_available_contexts(&trace_propagation_style_extract, &carrier); assert_eq!(contexts.len(), 0); assert_eq!(styles.len(), 0); @@ -865,7 +838,7 @@ pub mod tests { ("ot-baggage-key1".to_string(), "value1".to_string()), ]); - DatadogCompositePropagator::attach_baggage(&mut context, &carrier); + attach_baggage(&mut context, &carrier); assert_eq!(context.tags.len(), 1); assert_eq!(context.tags.get("key1").unwrap(), "value1"); diff --git a/bottlecap/src/traces/propagation/text_map_propagator.rs b/bottlecap/src/traces/propagation/text_map_propagator.rs index 42b4a17fe..e04f6a668 100644 --- a/bottlecap/src/traces/propagation/text_map_propagator.rs +++ b/bottlecap/src/traces/propagation/text_map_propagator.rs @@ -1,15 +1,14 @@ use std::collections::HashMap; -use lazy_static::lazy_static; -use regex::Regex; -use tracing::{debug, error, warn}; - +use crate::config::trace_propagation_style::{Propagator, TracePropagationStyle}; use crate::traces::context::{Sampling, SpanContext}; use crate::traces::propagation::{ carrier::{Extractor, Injector}, error::Error, - Propagator, }; +use lazy_static::lazy_static; +use regex::Regex; +use tracing::{debug, error, warn}; // Datadog Keys const DATADOG_TRACE_ID_KEY: &str = "x-datadog-trace-id"; @@ -23,16 +22,9 @@ const DATADOG_PROPAGATION_ERROR_KEY: &str = "_dd.propagation_error"; pub const DATADOG_LAST_PARENT_ID_KEY: &str = "_dd.parent_id"; const DATADOG_SAMPLING_DECISION_KEY: &str = "_dd.p.dm"; -// Traceparent Keys -const TRACEPARENT_KEY: &str = "traceparent"; -pub const TRACESTATE_KEY: &str = "tracestate"; - pub const BAGGAGE_PREFIX: &str = "ot-baggage-"; lazy_static! { - static ref TRACEPARENT_REGEX: Regex = - Regex::new(r"(?i)^([a-f0-9]{2})-([a-f0-9]{32})-([a-f0-9]{16})-([a-f0-9]{2})(-.*)?$") - .expect("failed creating regex"); static ref INVALID_SEGMENT_REGEX: Regex = Regex::new(r"^0+$").expect("failed creating regex"); static ref VALID_TAG_KEY_REGEX: Regex = Regex::new(r"^_dd\.p\.[\x21-\x2b\x2d-\x7e]+$").expect("failed creating regex"); @@ -43,415 +35,148 @@ lazy_static! { static ref VALID_SAMPLING_DECISION_REGEX: Regex = Regex::new(r"^-([0-9])$").expect("failed creating regex"); } - -#[derive(Clone, Copy)] -pub struct DatadogHeaderPropagator; - -impl Propagator for DatadogHeaderPropagator { - fn extract(&self, carrier: &dyn Extractor) -> Option { - Self::extract_context(carrier) - } - - fn inject(&self, _context: SpanContext, _carrier: &mut dyn Injector) { - todo!(); - } -} - -impl DatadogHeaderPropagator { - fn extract_context(carrier: &dyn Extractor) -> Option { - let trace_id = match Self::extract_trace_id(carrier) { - Ok(trace_id) => trace_id, - Err(e) => { - debug!("{e}"); - return None; - } - }; - - let parent_id = Self::extract_parent_id(carrier).unwrap_or(0); - let sampling_priority = match Self::extract_sampling_priority(carrier) { - Ok(sampling_priority) => sampling_priority, - Err(e) => { - debug!("{e}"); - return None; - } - }; - let origin = Self::extract_origin(carrier); - let mut tags = Self::extract_tags(carrier); - Self::validate_sampling_decision(&mut tags); - - Some(SpanContext { - trace_id, - span_id: parent_id, - sampling: Some(Sampling { - priority: Some(sampling_priority), - mechanism: None, - }), - origin, - tags, - links: Vec::new(), - }) - } - - fn validate_sampling_decision(tags: &mut HashMap) { - let should_remove = - tags.get(DATADOG_SAMPLING_DECISION_KEY) - .map_or(false, |sampling_decision| { - let is_invalid = !VALID_SAMPLING_DECISION_REGEX.is_match(sampling_decision); - if is_invalid { - warn!("Failed to decode `_dd.p.dm`: {}", sampling_decision); - } - is_invalid - }); - - if should_remove { - tags.remove(DATADOG_SAMPLING_DECISION_KEY); - tags.insert( - DATADOG_PROPAGATION_ERROR_KEY.to_string(), - "decoding_error".to_string(), - ); - } - - // todo: appsec standalone - } - - fn extract_trace_id(carrier: &dyn Extractor) -> Result { - let trace_id = carrier - .get(DATADOG_TRACE_ID_KEY) - .ok_or(Error::extract("`trace_id` not found", "datadog"))?; - - if INVALID_SEGMENT_REGEX.is_match(trace_id) { - return Err(Error::extract("Invalid `trace_id` found", "datadog")); +pub(crate) fn extract_datadog(carrier: &dyn Extractor) -> Option { + let trace_id = match extract_trace_id(carrier) { + Ok(trace_id) => trace_id, + Err(e) => { + debug!("{e}"); + return None; } + }; - trace_id - .parse::() - .map_err(|_| Error::extract("Failed to decode `trace_id`", "datadog")) - } - - fn extract_parent_id(carrier: &dyn Extractor) -> Option { - let parent_id = carrier.get(DATADOG_PARENT_ID_KEY)?; - - parent_id.parse::().ok() - } - - fn extract_sampling_priority(carrier: &dyn Extractor) -> Result { - // todo: enum? Default is USER_KEEP=2 - let sampling_priority = carrier.get(DATADOG_SAMPLING_PRIORITY_KEY).unwrap_or("2"); - - sampling_priority - .parse::() - .map_err(|_| Error::extract("Failed to decode `sampling_priority`", "datadog")) - } - - fn extract_origin(carrier: &dyn Extractor) -> Option { - let origin = carrier.get(DATADOG_ORIGIN_KEY)?; - Some(origin.to_string()) - } - - fn extract_tags(carrier: &dyn Extractor) -> HashMap { - let carrier_tags = carrier.get(DATADOG_TAGS_KEY).unwrap_or_default(); - let mut tags: HashMap = HashMap::new(); - - // todo: - // - trace propagation disabled - // - trace propagation max lenght - - let pairs = carrier_tags.split(','); - for pair in pairs { - if let Some((k, v)) = pair.split_once('=') { - // todo: reject key on tags extract reject - if k.starts_with("_dd.p.") { - tags.insert(k.to_string(), v.to_string()); - } - } + let parent_id = extract_parent_id(carrier).unwrap_or(0); + let sampling_priority = match extract_sampling_priority(carrier) { + Ok(sampling_priority) => sampling_priority, + Err(e) => { + debug!("{e}"); + return None; } + }; + let origin = extract_origin(carrier); + let mut tags = extract_tags(carrier); + validate_sampling_decision(&mut tags); + + Some(SpanContext { + trace_id, + span_id: parent_id, + sampling: Some(Sampling { + priority: Some(sampling_priority), + mechanism: None, + }), + origin, + tags, + links: Vec::new(), + }) +} - // Handle 128bit trace ID - if !tags.is_empty() { - if let Some(trace_id_higher_order_bits) = - carrier.get(DATADOG_HIGHER_ORDER_TRACE_ID_BITS_KEY) - { - if !Self::higher_order_bits_valid(trace_id_higher_order_bits) { - warn!("Malformed Trace ID: {trace_id_higher_order_bits} Failed to decode trace ID from carrier."); - tags.insert( - DATADOG_PROPAGATION_ERROR_KEY.to_string(), - format!("malformed tid {trace_id_higher_order_bits}"), - ); - tags.remove(DATADOG_HIGHER_ORDER_TRACE_ID_BITS_KEY); +fn validate_sampling_decision(tags: &mut HashMap) { + let should_remove = + tags.get(DATADOG_SAMPLING_DECISION_KEY) + .map_or(false, |sampling_decision| { + let is_invalid = !VALID_SAMPLING_DECISION_REGEX.is_match(sampling_decision); + if is_invalid { + warn!("Failed to decode `_dd.p.dm`: {}", sampling_decision); } - } - } - - if !tags.contains_key(DATADOG_SAMPLING_DECISION_KEY) { - tags.insert(DATADOG_SAMPLING_DECISION_KEY.to_string(), "-3".to_string()); - } - - tags + is_invalid + }); + + if should_remove { + tags.remove(DATADOG_SAMPLING_DECISION_KEY); + tags.insert( + DATADOG_PROPAGATION_ERROR_KEY.to_string(), + "decoding_error".to_string(), + ); } - fn higher_order_bits_valid(trace_id_higher_order_bits: &str) -> bool { - if trace_id_higher_order_bits.len() != 16 { - return false; - } + // todo: appsec standalone +} - match u64::from_str_radix(trace_id_higher_order_bits, 16) { - Ok(_) => {} - Err(_) => return false, - } +fn extract_trace_id(carrier: &dyn Extractor) -> Result { + let trace_id = carrier + .get(DATADOG_TRACE_ID_KEY) + .ok_or(Error::extract("`trace_id` not found", "datadog"))?; - true + if INVALID_SEGMENT_REGEX.is_match(trace_id) { + return Err(Error::extract("Invalid `trace_id` found", "datadog")); } -} -struct Traceparent { - sampling_priority: i8, - trace_id: u128, - span_id: u64, + trace_id + .parse::() + .map_err(|_| Error::extract("Failed to decode `trace_id`", "datadog")) } -struct Tracestate { - sampling_priority: Option, - origin: Option, - lower_order_trace_id: Option, +fn extract_parent_id(carrier: &dyn Extractor) -> Option { + let parent_id = carrier.get(DATADOG_PARENT_ID_KEY)?; + + parent_id.parse::().ok() } -#[derive(Clone, Copy)] -pub struct TraceContextPropagator; +fn extract_sampling_priority(carrier: &dyn Extractor) -> Result { + // todo: enum? Default is USER_KEEP=2 + let sampling_priority = carrier.get(DATADOG_SAMPLING_PRIORITY_KEY).unwrap_or("2"); -impl Propagator for TraceContextPropagator { - fn extract(&self, carrier: &dyn Extractor) -> Option { - Self::extract_context(carrier) - } + sampling_priority + .parse::() + .map_err(|_| Error::extract("Failed to decode `sampling_priority`", "datadog")) +} - fn inject(&self, _context: SpanContext, _carrier: &mut dyn Injector) { - todo!() - } +fn extract_origin(carrier: &dyn Extractor) -> Option { + let origin = carrier.get(DATADOG_ORIGIN_KEY)?; + Some(origin.to_string()) } -impl TraceContextPropagator { - fn extract_context(carrier: &dyn Extractor) -> Option { - let tp = carrier.get(TRACEPARENT_KEY)?.trim(); - - match Self::extract_traceparent(tp) { - Ok(traceparent) => { - let mut tags = HashMap::new(); - tags.insert(TRACEPARENT_KEY.to_string(), tp.to_string()); - - let mut origin = None; - let mut sampling_priority = traceparent.sampling_priority; - if let Some(ts) = carrier.get(TRACESTATE_KEY) { - if let Some(tracestate) = Self::extract_tracestate(ts, &mut tags) { - if let Some(lpid) = tracestate.lower_order_trace_id { - tags.insert(DATADOG_LAST_PARENT_ID_KEY.to_string(), lpid); - } - - origin = tracestate.origin; - - sampling_priority = Self::define_sampling_priority( - traceparent.sampling_priority, - tracestate.sampling_priority, - ); - } - } else { - debug!("No `dd` value found in tracestate"); - } +fn extract_tags(carrier: &dyn Extractor) -> HashMap { + let carrier_tags = carrier.get(DATADOG_TAGS_KEY).unwrap_or_default(); + let mut tags: HashMap = HashMap::new(); - let (trace_id_higher_order_bits, trace_id_lower_order_bits) = - Self::split_trace_id(traceparent.trace_id); - tags.insert( - DATADOG_HIGHER_ORDER_TRACE_ID_BITS_KEY.to_string(), - trace_id_higher_order_bits.to_string(), - ); + // todo: + // - trace propagation disabled + // - trace propagation max lenght - Some(SpanContext { - trace_id: trace_id_lower_order_bits, - span_id: traceparent.span_id, - sampling: Some(Sampling { - priority: Some(sampling_priority), - mechanism: None, - }), - origin, - tags, - links: Vec::new(), - }) - } - Err(e) => { - error!("Failed to extract traceparent: {e}"); - None + let pairs = carrier_tags.split(','); + for pair in pairs { + if let Some((k, v)) = pair.split_once('=') { + // todo: reject key on tags extract reject + if k.starts_with("_dd.p.") { + tags.insert(k.to_string(), v.to_string()); } } } - fn extract_tracestate( - tracestate: &str, - tags: &mut HashMap, - ) -> Option { - let ts_v = tracestate.split(',').map(str::trim); - let ts = ts_v.clone().collect::>().join(","); - - if INVALID_ASCII_CHARACTERS_REGEX.is_match(&ts) { - debug!("Received invalid tracestate header {tracestate}"); - return None; - } - - tags.insert(TRACESTATE_KEY.to_string(), ts.to_string()); - - let mut dd: Option> = None; - for v in ts_v.clone() { - if let Some(stripped) = v.strip_prefix("dd=") { - dd = Some( - stripped - .split(';') - .filter_map(|item| { - let mut parts = item.splitn(2, ':'); - Some((parts.next()?.to_string(), parts.next()?.to_string())) - }) - .collect(), + // Handle 128bit trace ID + if !tags.is_empty() { + if let Some(trace_id_higher_order_bits) = + carrier.get(DATADOG_HIGHER_ORDER_TRACE_ID_BITS_KEY) + { + if !higher_order_bits_valid(trace_id_higher_order_bits) { + warn!("Malformed Trace ID: {trace_id_higher_order_bits} Failed to decode trace ID from carrier."); + tags.insert( + DATADOG_PROPAGATION_ERROR_KEY.to_string(), + format!("malformed tid {trace_id_higher_order_bits}"), ); + tags.remove(DATADOG_HIGHER_ORDER_TRACE_ID_BITS_KEY); } } - - if let Some(dd) = dd { - let mut tracestate = Tracestate { - sampling_priority: None, - origin: None, - lower_order_trace_id: None, - }; - - if let Some(ts_sp) = dd.get("s") { - if let Ok(p_sp) = ts_sp.parse::() { - tracestate.sampling_priority = Some(p_sp); - } - } - - if let Some(o) = dd.get("o") { - tracestate.origin = Some(Self::decode_tag_value(o)); - } - - if let Some(lo_tid) = dd.get("p") { - tracestate.lower_order_trace_id = Some(lo_tid.to_string()); - } - - // Convert from `t.` to `_dd.p.` - for (k, v) in &dd { - if let Some(stripped) = k.strip_prefix("t.") { - let nk = format!("_dd.p.{stripped}"); - tags.insert(nk, Self::decode_tag_value(v)); - } - } - - return Some(tracestate); - } - - None - } - - fn decode_tag_value(value: &str) -> String { - value.replace('~', "=") - } - - fn define_sampling_priority( - traceparent_sampling_priority: i8, - tracestate_sampling_priority: Option, - ) -> i8 { - if let Some(ts_sp) = tracestate_sampling_priority { - if (traceparent_sampling_priority == 1 && ts_sp > 0) - || (traceparent_sampling_priority == 0 && ts_sp < 0) - { - return ts_sp; - } - } - - traceparent_sampling_priority } - fn extract_traceparent(traceparent: &str) -> Result { - let captures = TRACEPARENT_REGEX - .captures(traceparent) - .ok_or_else(|| Error::extract("invalid traceparent", "traceparent"))?; - - let version = &captures[1]; - let trace_id = &captures[2]; - let span_id = &captures[3]; - let flags = &captures[4]; - let tail = captures.get(5).map_or("", |m| m.as_str()); - - Self::extract_version(version, tail)?; - - let trace_id = Self::extract_trace_id(trace_id)?; - let span_id = Self::extract_span_id(span_id)?; - - let trace_flags = Self::extract_trace_flags(flags)?; - let sampling_priority = i8::from(trace_flags & 0x1 != 0); - - Ok(Traceparent { - sampling_priority, - trace_id, - span_id, - }) + if !tags.contains_key(DATADOG_SAMPLING_DECISION_KEY) { + tags.insert(DATADOG_SAMPLING_DECISION_KEY.to_string(), "-3".to_string()); } - fn extract_version(version: &str, tail: &str) -> Result<(), Error> { - match version { - "ff" => { - return Err(Error::extract( - "`ff` is an invalid traceparent version", - "traceparent", - )) - } - "00" => { - if !tail.is_empty() { - return Err(Error::extract("Traceparent with version `00` should contain only 4 values delimited by `-`", "traceparent")); - } - } - _ => { - warn!("Unsupported traceparent version {version}, still atempenting to parse"); - } - } - - Ok(()) - } - - fn extract_trace_id(trace_id: &str) -> Result { - if INVALID_SEGMENT_REGEX.is_match(trace_id) { - return Err(Error::extract( - "`0` value for trace_id is invalid", - "traceparent", - )); - } - - u128::from_str_radix(trace_id, 16) - .map_err(|_| Error::extract("Failed to decode trace_id", "traceparent")) - } - - #[allow(clippy::cast_possible_truncation)] - fn split_trace_id(trace_id: u128) -> (u64, u64) { - let trace_id_lower_order_bits = trace_id as u64; - let trace_id_higher_order_bits = (trace_id >> 64) as u64; + tags +} - (trace_id_higher_order_bits, trace_id_lower_order_bits) +fn higher_order_bits_valid(trace_id_higher_order_bits: &str) -> bool { + if trace_id_higher_order_bits.len() != 16 { + return false; } - fn extract_span_id(span_id: &str) -> Result { - if INVALID_SEGMENT_REGEX.is_match(span_id) { - return Err(Error::extract( - "`0` value for span_id is invalid", - "traceparent", - )); - } - - u64::from_str_radix(span_id, 16) - .map_err(|_| Error::extract("Failed to decode span_id", "traceparent")) + match u64::from_str_radix(trace_id_higher_order_bits, 16) { + Ok(_) => {} + Err(_) => return false, } - fn extract_trace_flags(flags: &str) -> Result { - if flags.len() != 2 { - return Err(Error::extract("Invalid trace flags length", "traceparent")); - } - - u8::from_str_radix(flags, 16) - .map_err(|_| Error::extract("Failed to decode trace_flags", "traceparent")) - } + true } #[cfg(test)] @@ -471,7 +196,7 @@ mod test { ), ]); - let propagator = DatadogHeaderPropagator; + let propagator = TracePropagationStyle::Datadog; let context = propagator .extract(&headers) @@ -500,7 +225,7 @@ mod test { ), ]); - let propagator = TraceContextPropagator; + let propagator = TracePropagationStyle::TraceContext; let context = propagator .extract(&headers) .expect("couldn't extract trace context"); diff --git a/bottlecap/src/traces/propagation/traceparent.rs b/bottlecap/src/traces/propagation/traceparent.rs new file mode 100644 index 000000000..341cba2a0 --- /dev/null +++ b/bottlecap/src/traces/propagation/traceparent.rs @@ -0,0 +1,259 @@ +use crate::traces::context::{Sampling, SpanContext}; +use crate::traces::propagation::carrier::Extractor; +use crate::traces::propagation::error::Error; + +// Traceparent Keys +const TRACEPARENT_KEY: &str = "traceparent"; +pub(crate) const TRACESTATE_KEY: &str = "tracestate"; + +use crate::traces::propagation::text_map_propagator::{ + DATADOG_HIGHER_ORDER_TRACE_ID_BITS_KEY, DATADOG_LAST_PARENT_ID_KEY, +}; +use lazy_static::lazy_static; +use regex::Regex; +use std::collections::HashMap; +use tracing::{debug, error, warn}; + +lazy_static! { + static ref INVALID_SEGMENT_REGEX: Regex = Regex::new(r"^0+$").expect("failed creating regex"); + static ref INVALID_ASCII_CHARACTERS_REGEX: Regex = + Regex::new(r"[^\x20-\x7E]+").expect("failed creating regex"); + static ref TRACEPARENT_REGEX: Regex = + Regex::new(r"(?i)^([a-f0-9]{2})-([a-f0-9]{32})-([a-f0-9]{16})-([a-f0-9]{2})(-.*)?$") + .expect("failed creating regex"); +} + +struct Traceparent { + sampling_priority: i8, + trace_id: u128, + span_id: u64, +} + +struct Tracestate { + sampling_priority: Option, + origin: Option, + lower_order_trace_id: Option, +} +fn extract(carrier: &dyn Extractor) -> Option { + let tp = carrier.get(TRACEPARENT_KEY)?.trim(); + + match extract_traceparent(tp) { + Ok(traceparent) => { + let mut tags = HashMap::new(); + tags.insert(TRACEPARENT_KEY.to_string(), tp.to_string()); + + let mut origin = None; + let mut sampling_priority = traceparent.sampling_priority; + if let Some(ts) = carrier.get(TRACESTATE_KEY) { + if let Some(tracestate) = extract_tracestate(ts, &mut tags) { + if let Some(lpid) = tracestate.lower_order_trace_id { + tags.insert(DATADOG_LAST_PARENT_ID_KEY.to_string(), lpid); + } + + origin = tracestate.origin; + + sampling_priority = define_sampling_priority( + traceparent.sampling_priority, + tracestate.sampling_priority, + ); + } + } else { + debug!("No `dd` value found in tracestate"); + } + + let (trace_id_higher_order_bits, trace_id_lower_order_bits) = + split_trace_id(traceparent.trace_id); + tags.insert( + DATADOG_HIGHER_ORDER_TRACE_ID_BITS_KEY.to_string(), + trace_id_higher_order_bits.to_string(), + ); + + Some(SpanContext { + trace_id: trace_id_lower_order_bits, + span_id: traceparent.span_id, + sampling: Some(Sampling { + priority: Some(sampling_priority), + mechanism: None, + }), + origin, + tags, + links: Vec::new(), + }) + } + Err(e) => { + error!("Failed to extract traceparent: {e}"); + None + } + } +} + +fn extract_tracestate(tracestate: &str, tags: &mut HashMap) -> Option { + let ts_v = tracestate.split(',').map(str::trim); + let ts = ts_v.clone().collect::>().join(","); + + if INVALID_ASCII_CHARACTERS_REGEX.is_match(&ts) { + debug!("Received invalid tracestate header {tracestate}"); + return None; + } + + tags.insert(TRACESTATE_KEY.to_string(), ts.to_string()); + + let mut dd: Option> = None; + for v in ts_v.clone() { + if let Some(stripped) = v.strip_prefix("dd=") { + dd = Some( + stripped + .split(';') + .filter_map(|item| { + let mut parts = item.splitn(2, ':'); + Some((parts.next()?.to_string(), parts.next()?.to_string())) + }) + .collect(), + ); + } + } + + if let Some(dd) = dd { + let mut tracestate = Tracestate { + sampling_priority: None, + origin: None, + lower_order_trace_id: None, + }; + + if let Some(ts_sp) = dd.get("s") { + if let Ok(p_sp) = ts_sp.parse::() { + tracestate.sampling_priority = Some(p_sp); + } + } + + if let Some(o) = dd.get("o") { + tracestate.origin = Some(decode_tag_value(o)); + } + + if let Some(lo_tid) = dd.get("p") { + tracestate.lower_order_trace_id = Some(lo_tid.to_string()); + } + + // Convert from `t.` to `_dd.p.` + for (k, v) in &dd { + if let Some(stripped) = k.strip_prefix("t.") { + let nk = format!("_dd.p.{stripped}"); + tags.insert(nk, decode_tag_value(v)); + } + } + + return Some(tracestate); + } + + None +} + +fn decode_tag_value(value: &str) -> String { + value.replace('~', "=") +} + +fn define_sampling_priority( + traceparent_sampling_priority: i8, + tracestate_sampling_priority: Option, +) -> i8 { + if let Some(ts_sp) = tracestate_sampling_priority { + if (traceparent_sampling_priority == 1 && ts_sp > 0) + || (traceparent_sampling_priority == 0 && ts_sp < 0) + { + return ts_sp; + } + } + + traceparent_sampling_priority +} + +fn extract_traceparent(traceparent: &str) -> Result { + let captures = TRACEPARENT_REGEX + .captures(traceparent) + .ok_or_else(|| Error::extract("invalid traceparent", "traceparent"))?; + + let version = &captures[1]; + let trace_id = &captures[2]; + let span_id = &captures[3]; + let flags = &captures[4]; + let tail = captures.get(5).map_or("", |m| m.as_str()); + + extract_version(version, tail)?; + + let trace_id = extract_trace_id(trace_id)?; + let span_id = extract_span_id(span_id)?; + + let trace_flags = extract_trace_flags(flags)?; + let sampling_priority = i8::from(trace_flags & 0x1 != 0); + + Ok(Traceparent { + sampling_priority, + trace_id, + span_id, + }) +} + +fn extract_version(version: &str, tail: &str) -> Result<(), Error> { + match version { + "ff" => { + return Err(Error::extract( + "`ff` is an invalid traceparent version", + "traceparent", + )) + } + "00" => { + if !tail.is_empty() { + return Err(Error::extract( + "Traceparent with version `00` should contain only 4 values delimited by `-`", + "traceparent", + )); + } + } + _ => { + warn!("Unsupported traceparent version {version}, still atempenting to parse"); + } + } + + Ok(()) +} + +fn extract_trace_id(trace_id: &str) -> Result { + if INVALID_SEGMENT_REGEX.is_match(trace_id) { + return Err(Error::extract( + "`0` value for trace_id is invalid", + "traceparent", + )); + } + + u128::from_str_radix(trace_id, 16) + .map_err(|_| Error::extract("Failed to decode trace_id", "traceparent")) +} + +#[allow(clippy::cast_possible_truncation)] +fn split_trace_id(trace_id: u128) -> (u64, u64) { + let trace_id_lower_order_bits = trace_id as u64; + let trace_id_higher_order_bits = (trace_id >> 64) as u64; + + (trace_id_higher_order_bits, trace_id_lower_order_bits) +} + +fn extract_span_id(span_id: &str) -> Result { + if INVALID_SEGMENT_REGEX.is_match(span_id) { + return Err(Error::extract( + "`0` value for span_id is invalid", + "traceparent", + )); + } + + u64::from_str_radix(span_id, 16) + .map_err(|_| Error::extract("Failed to decode span_id", "traceparent")) +} + +fn extract_trace_flags(flags: &str) -> Result { + if flags.len() != 2 { + return Err(Error::extract("Invalid trace flags length", "traceparent")); + } + + u8::from_str_radix(flags, 16) + .map_err(|_| Error::extract("Failed to decode trace_flags", "traceparent")) +} From 0f0564f18741759d2977714c9c775b6b23560582 Mon Sep 17 00:00:00 2001 From: alexgallotta <5581237+alexgallotta@users.noreply.github.com> Date: Sun, 20 Oct 2024 10:10:15 -0400 Subject: [PATCH 2/2] no need of propagator trait anymore --- .../src/config/trace_propagation_style.rs | 20 ++++++------------- bottlecap/src/traces/propagation/mod.rs | 5 ++--- .../traces/propagation/text_map_propagator.rs | 6 +++--- .../src/traces/propagation/traceparent.rs | 2 +- 4 files changed, 12 insertions(+), 21 deletions(-) diff --git a/bottlecap/src/config/trace_propagation_style.rs b/bottlecap/src/config/trace_propagation_style.rs index cc102157f..99b1d0a67 100644 --- a/bottlecap/src/config/trace_propagation_style.rs +++ b/bottlecap/src/config/trace_propagation_style.rs @@ -1,9 +1,10 @@ use std::{fmt::Display, str::FromStr}; use crate::traces::context::SpanContext; -use crate::traces::propagation::carrier::{Extractor, Injector}; -use serde::{Deserialize, Deserializer}; +use crate::traces::propagation::carrier::Extractor; use crate::traces::propagation::text_map_propagator::extract_datadog; +use serde::{Deserialize, Deserializer}; +use crate::traces::propagation::traceparent; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum TracePropagationStyle { @@ -14,25 +15,16 @@ pub enum TracePropagationStyle { None, } -pub trait Propagator { - fn extract(&self, carrier: &dyn Extractor) -> Option; - fn inject(&self, context: SpanContext, carrier: &mut dyn Injector); -} - -impl Propagator for TracePropagationStyle { - fn extract(&self, carrier: &dyn Extractor) -> Option { +impl TracePropagationStyle { + pub fn extract(&self, carrier: &dyn Extractor) -> Option { match self { TracePropagationStyle::Datadog => extract_datadog(carrier), TracePropagationStyle::B3Multi => todo!(), TracePropagationStyle::B3 => todo!(), - TracePropagationStyle::TraceContext => traceparent::extract_trace_context(carrier), + TracePropagationStyle::TraceContext => traceparent::extract(carrier), TracePropagationStyle::None => todo!(), } } - - fn inject(&self, _context: SpanContext, _carrier: &mut dyn Injector) { - todo!() - } } impl FromStr for TracePropagationStyle { diff --git a/bottlecap/src/traces/propagation/mod.rs b/bottlecap/src/traces/propagation/mod.rs index f880a709b..2e9026a0c 100644 --- a/bottlecap/src/traces/propagation/mod.rs +++ b/bottlecap/src/traces/propagation/mod.rs @@ -1,10 +1,9 @@ -use crate::config::trace_propagation_style::Propagator; use crate::traces::propagation::traceparent::TRACESTATE_KEY; use crate::{ config::{self, trace_propagation_style::TracePropagationStyle}, traces::context::SpanContext, }; -use carrier::{Extractor, Injector}; +use carrier::Extractor; use datadog_trace_protobuf::pb::SpanLink; use std::{collections::HashMap, sync::Arc}; use text_map_propagator::{ @@ -14,7 +13,7 @@ use text_map_propagator::{ pub mod carrier; pub mod error; pub mod text_map_propagator; -mod traceparent; +pub mod traceparent; pub struct DatadogCompositePropagator { propagators: Vec, diff --git a/bottlecap/src/traces/propagation/text_map_propagator.rs b/bottlecap/src/traces/propagation/text_map_propagator.rs index e04f6a668..2d963f5aa 100644 --- a/bottlecap/src/traces/propagation/text_map_propagator.rs +++ b/bottlecap/src/traces/propagation/text_map_propagator.rs @@ -1,14 +1,13 @@ use std::collections::HashMap; -use crate::config::trace_propagation_style::{Propagator, TracePropagationStyle}; use crate::traces::context::{Sampling, SpanContext}; use crate::traces::propagation::{ - carrier::{Extractor, Injector}, + carrier::Extractor, error::Error, }; use lazy_static::lazy_static; use regex::Regex; -use tracing::{debug, error, warn}; +use tracing::{debug, warn}; // Datadog Keys const DATADOG_TRACE_ID_KEY: &str = "x-datadog-trace-id"; @@ -181,6 +180,7 @@ fn higher_order_bits_valid(trace_id_higher_order_bits: &str) -> bool { #[cfg(test)] mod test { + use crate::config::trace_propagation_style::TracePropagationStyle; use super::*; #[test] diff --git a/bottlecap/src/traces/propagation/traceparent.rs b/bottlecap/src/traces/propagation/traceparent.rs index 341cba2a0..3e3f4fe1b 100644 --- a/bottlecap/src/traces/propagation/traceparent.rs +++ b/bottlecap/src/traces/propagation/traceparent.rs @@ -34,7 +34,7 @@ struct Tracestate { origin: Option, lower_order_trace_id: Option, } -fn extract(carrier: &dyn Extractor) -> Option { +pub(crate) fn extract(carrier: &dyn Extractor) -> Option { let tp = carrier.get(TRACEPARENT_KEY)?.trim(); match extract_traceparent(tp) {