Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion examples/ffi/trace_exporter.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,10 @@ int main(int argc, char** argv)
ddog_TelemetryClientConfig telemetry_config = {
.interval = 60000,
.runtime_id = DDOG_CHARSLICE_C("12345678-1234-1234-1234-123456789abc"),
.debug_enabled = true
.debug_enabled = true,
.session_id = DDOG_CHARSLICE_C("12345678-1234-1234-1234-123456789abc"),
.root_session_id = DDOG_CHARSLICE_C("87654321-1234-1234-1234-123456789abc"),
.parent_session_id = DDOG_CHARSLICE_C(""),
};

ret = ddog_trace_exporter_config_enable_telemetry(config, &telemetry_config);
Expand Down
72 changes: 69 additions & 3 deletions libdd-data-pipeline-ffi/src/trace_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use libdd_common_ffi::{
{slice::AsBytes, slice::ByteSlice},
};
use libdd_data_pipeline::trace_exporter::{
TelemetryConfig, TraceExporter as GenericTraceExporter, TraceExporterInputFormat,
TraceExporterOutputFormat,
TelemetryConfig, TelemetryInstrumentationSessions, TraceExporter as GenericTraceExporter,
TraceExporterInputFormat, TraceExporterOutputFormat,
};

type TraceExporter = GenericTraceExporter<NativeCapabilities>;
Expand Down Expand Up @@ -48,6 +48,13 @@ pub struct TelemetryClientConfig<'a> {
/// When enabled, sets the DD-Telemetry-Debug-Enabled header to true.
/// Defaults to false.
pub debug_enabled: bool,

/// HTTP header `dd-session-id` (empty = omitted).
pub session_id: CharSlice<'a>,
/// HTTP header `dd-root-session-id` (empty = omitted).
pub root_session_id: CharSlice<'a>,
/// HTTP header `dd-parent-session-id` (empty = omitted).
pub parent_session_id: CharSlice<'a>,
Comment thread
mabdinur marked this conversation as resolved.
}

/// The TraceExporterConfig object will hold the configuration properties for the TraceExporter.
Expand All @@ -69,6 +76,7 @@ pub struct TraceExporterConfig {
compute_stats: bool,
client_computed_stats: bool,
telemetry_cfg: Option<TelemetryConfig>,
telemetry_instrumentation_sessions: TelemetryInstrumentationSessions,
health_metrics_enabled: bool,
process_tags: Option<String>,
test_session_token: Option<String>,
Expand Down Expand Up @@ -308,8 +316,23 @@ pub unsafe extern "C" fn ddog_trace_exporter_config_enable_telemetry(
},
debug_enabled: telemetry_cfg.debug_enabled,
};
debug!(telemetry_cfg = ?cfg, "Configuring telemetry");
let sessions = TelemetryInstrumentationSessions {
session_id: match sanitize_string(telemetry_cfg.session_id) {
Ok(s) => Some(s),
Err(e) => return Some(e),
},
root_session_id: match sanitize_string(telemetry_cfg.root_session_id) {
Ok(s) => Some(s),
Err(e) => return Some(e),
},
parent_session_id: match sanitize_string(telemetry_cfg.parent_session_id) {
Ok(s) => Some(s),
Err(e) => return Some(e),
},
};
debug!(telemetry_cfg = ?cfg, telemetry_sessions = ?sessions, "Configuring telemetry");
config.telemetry_cfg = Some(cfg);
config.telemetry_instrumentation_sessions = sessions;
}
None
} else {
Expand Down Expand Up @@ -524,6 +547,9 @@ pub unsafe extern "C" fn ddog_trace_exporter_new(
if let Some(cfg) = &config.telemetry_cfg {
builder.enable_telemetry(cfg.clone());
}
builder.set_telemetry_instrumentation_sessions(
config.telemetry_instrumentation_sessions.clone(),
);

if let Some(token) = &config.test_session_token {
builder.set_test_session_token(token);
Expand Down Expand Up @@ -871,6 +897,9 @@ mod tests {
interval: 1000,
runtime_id: CharSlice::from("id"),
debug_enabled: false,
session_id: CharSlice::empty(),
root_session_id: CharSlice::empty(),
parent_session_id: CharSlice::empty(),
}),
);
assert_eq!(error.as_ref().unwrap().code, ErrorCode::InvalidArgument);
Expand All @@ -888,6 +917,9 @@ mod tests {
interval: 1000,
runtime_id: CharSlice::from("foo"),
debug_enabled: true,
session_id: CharSlice::empty(),
root_session_id: CharSlice::empty(),
parent_session_id: CharSlice::empty(),
}),
);
assert!(error.is_none());
Expand All @@ -903,6 +935,40 @@ mod tests {
"foo"
);
assert!(cfg.telemetry_cfg.as_ref().unwrap().debug_enabled);
assert_eq!(
cfg.telemetry_instrumentation_sessions.session_id.as_deref(),
Some("")
);
assert_eq!(
cfg.telemetry_instrumentation_sessions
.root_session_id
.as_deref(),
Some("")
);
assert_eq!(
cfg.telemetry_instrumentation_sessions
.parent_session_id
.as_deref(),
Some("")
);

let mut cfg = TraceExporterConfig::default();
let error = ddog_trace_exporter_config_enable_telemetry(
Some(&mut cfg),
Some(&TelemetryClientConfig {
interval: 500,
runtime_id: CharSlice::from("rid"),
debug_enabled: false,
session_id: CharSlice::from("sess-z"),
root_session_id: CharSlice::from("root-z"),
parent_session_id: CharSlice::from("par-z"),
}),
);
assert!(error.is_none());
let s = &cfg.telemetry_instrumentation_sessions;
assert_eq!(s.session_id.as_deref(), Some("sess-z"));
assert_eq!(s.root_session_id.as_deref(), Some("root-z"));
assert_eq!(s.parent_session_id.as_deref(), Some("par-z"));
}
}

Expand Down
72 changes: 72 additions & 0 deletions libdd-data-pipeline/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,24 @@ impl TelemetryClientBuilder {
self
}

/// `dd-session-id` header (with non-empty session id).
pub fn set_session_id(mut self, id: &str) -> Self {
self.config.session_id = Some(id.to_string());
self
}

/// `dd-root-session-id` (omitted if equal to session id).
pub fn set_root_session_id(mut self, id: &str) -> Self {
self.config.root_session_id = Some(id.to_string());
self
}

/// `dd-parent-session-id` (omitted if equal to session id).
pub fn set_parent_session_id(mut self, id: &str) -> Self {
self.config.parent_session_id = Some(id.to_string());
self
}

/// Sets the debug enabled flag for the telemetry client.
pub fn set_debug_enabled(mut self, debug: bool) -> Self {
self.config.debug_enabled = debug;
Expand Down Expand Up @@ -878,4 +896,58 @@ mod tests {
})
.expect("Failed to get runtime");
}

/// Instrumentation session headers on telemetry requests match trace exporter configuration.
#[cfg_attr(miri, ignore)]
#[test]
fn session_headers_telemetry_test() {
let shared_runtime = SharedRuntime::new().expect("Failed to create runtime");
let server = MockServer::start();
let mut telemetry_srv = server.mock(|when, then| {
when.method(POST)
.body_includes(r#""runtime_id":"foo""#)
.body_includes(
r#""application":{"service_name":"test_service","service_version":"test_version","env":"test_env","language_name":"test_language","language_version":"test_language_version","tracer_version":"test_tracer_version"}"#,
)
.header("dd-session-id", "sess-e2e")
.header("dd-root-session-id", "root-e2e")
.header("dd-parent-session-id", "parent-e2e");
then.status(200).body("");
});
let (client, worker) = TelemetryClientBuilder::default()
.set_service_name("test_service")
.set_service_version("test_version")
.set_env("test_env")
.set_language("test_language")
.set_language_version("test_language_version")
.set_tracer_version("test_tracer_version")
.set_runtime_id("foo")
.set_url(&server.url("/"))
.set_heartbeat(100)
.set_debug_enabled(true)
.set_session_id("sess-e2e")
.set_root_session_id("root-e2e")
.set_parent_session_id("parent-e2e")
.build();
let handle = shared_runtime
.spawn_worker(worker, true)
.expect("Failed to spawn worker");
shared_runtime
.block_on(async {
client.start().await;
client
.send(&SendPayloadTelemetry {
requests_count: 1,
..Default::default()
})
.unwrap();
sleep(Duration::from_millis(100)).await;
handle.stop().await.expect("Failed to stop worker");
assert!(
poll_for_mock_hits(&mut telemetry_srv, 1000, 10, 1).await,
"telemetry server did not receive calls within timeout"
);
})
.expect("Failed to get runtime");
}
}
25 changes: 23 additions & 2 deletions libdd-data-pipeline/src/trace_exporter/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ use crate::trace_exporter::TelemetryConfig;
#[cfg(not(target_arch = "wasm32"))]
use crate::trace_exporter::TraceExporterWorkers;
use crate::trace_exporter::{
add_path, StatsComputationStatus, TraceExporter, TraceExporterError, TraceExporterInputFormat,
TraceExporterOutputFormat, TracerMetadata, INFO_ENDPOINT,
add_path, StatsComputationStatus, TelemetryInstrumentationSessions, TraceExporter,
TraceExporterError, TraceExporterInputFormat, TraceExporterOutputFormat, TracerMetadata,
INFO_ENDPOINT,
};
use arc_swap::ArcSwap;
use libdd_capabilities::{HttpClientTrait, MaybeSend};
Expand Down Expand Up @@ -54,6 +55,7 @@ pub struct TraceExporterBuilder {
peer_tags: Vec<String>,
#[cfg(feature = "telemetry")]
telemetry: Option<TelemetryConfig>,
telemetry_instrumentation_sessions: TelemetryInstrumentationSessions,
shared_runtime: Option<Arc<SharedRuntime>>,
health_metrics_enabled: bool,
test_session_token: Option<String>,
Expand Down Expand Up @@ -216,6 +218,15 @@ impl TraceExporterBuilder {
self
}

/// Sets optional instrumentation session headers on telemetry requests (`dd-session-id`, etc.).
pub fn set_telemetry_instrumentation_sessions(
&mut self,
sessions: TelemetryInstrumentationSessions,
) -> &mut Self {
self.telemetry_instrumentation_sessions = sessions;
self
}

/// Set a shared runtime used by the exporter for background workers.
pub fn set_shared_runtime(&mut self, shared_runtime: Arc<SharedRuntime>) -> &mut Self {
self.shared_runtime = Some(shared_runtime);
Expand Down Expand Up @@ -317,6 +328,7 @@ impl TraceExporterBuilder {

#[cfg(feature = "telemetry")]
let (telemetry_client, telemetry_handle) = {
let sessions = self.telemetry_instrumentation_sessions;
let telemetry = self.telemetry.map(|telemetry_config| {
let mut builder = TelemetryClientBuilder::default()
.set_language(&self.language)
Expand All @@ -331,6 +343,15 @@ impl TraceExporterBuilder {
if let Some(id) = telemetry_config.runtime_id {
builder = builder.set_runtime_id(&id);
}
if let Some(ref id) = sessions.session_id {
builder = builder.set_session_id(id);
}
if let Some(ref id) = sessions.root_session_id {
builder = builder.set_root_session_id(id);
}
if let Some(ref id) = sessions.parent_session_id {
builder = builder.set_parent_session_id(id);
}
Ok(builder.build())
});
match telemetry {
Expand Down
8 changes: 8 additions & 0 deletions libdd-data-pipeline/src/trace_exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ use tracing::{debug, error, warn};

const INFO_ENDPOINT: &str = "/info";

/// Values for optional telemetry HTTP session headers (`dd-session-id`, root/parent).
#[derive(Debug, Default, Clone)]
pub struct TelemetryInstrumentationSessions {
pub session_id: Option<String>,
pub root_session_id: Option<String>,
pub parent_session_id: Option<String>,
}

/// TraceExporterInputFormat represents the format of the input traces.
/// The input format can be either Proxy or V0.4, where V0.4 is the default.
#[derive(Copy, Clone, Debug, Default, PartialEq)]
Expand Down
2 changes: 1 addition & 1 deletion libdd-telemetry-ffi/src/builder/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ crate::c_setters!(
runtime_id,
config.session_id,
config.parent_session_id,
config.root_session_id
config.root_session_id,
}
);

Expand Down
Loading