diff --git a/bin_tests/tests/crashtracker_bin_test.rs b/bin_tests/tests/crashtracker_bin_test.rs index e69a7d9be9..c8c8d793c1 100644 --- a/bin_tests/tests/crashtracker_bin_test.rs +++ b/bin_tests/tests/crashtracker_bin_test.rs @@ -1070,9 +1070,15 @@ fn assert_telemetry_message(crash_telemetry: &[u8], crash_typ: &str) { }), telemetry_payload["application"] ); - assert_eq!(telemetry_payload["payload"].as_array().unwrap().len(), 1); + assert_eq!( + telemetry_payload["payload"]["logs"] + .as_array() + .unwrap() + .len(), + 1 + ); - let log_entry = &telemetry_payload["payload"][0]; + let log_entry = &telemetry_payload["payload"]["logs"][0]; let tags_raw = log_entry["tags"].as_str().unwrap(); let is_crash_ping = tags_raw.contains("is_crash_ping:true"); @@ -1523,9 +1529,15 @@ fn assert_crash_ping_message(body: &str) { serde_json::from_str(body).expect("Crash ping should be valid JSON"); assert_eq!(telemetry_payload["request_type"], "logs"); - assert_eq!(telemetry_payload["payload"].as_array().unwrap().len(), 1); + assert_eq!( + telemetry_payload["payload"]["logs"] + .as_array() + .unwrap() + .len(), + 1 + ); - let log_entry = &telemetry_payload["payload"][0]; + let log_entry = &telemetry_payload["payload"]["logs"][0]; let tags = log_entry["tags"].as_str().unwrap(); assert!( diff --git a/libdd-crashtracker/src/crash_info/telemetry.rs b/libdd-crashtracker/src/crash_info/telemetry.rs index 6a859726b4..71a1fd3d0f 100644 --- a/libdd-crashtracker/src/crash_info/telemetry.rs +++ b/libdd-crashtracker/src/crash_info/telemetry.rs @@ -315,15 +315,17 @@ impl TelemetryCrashUploader { seq_id: 1, application: &self.metadata.application, host: &self.metadata.host, - payload: &data::Payload::Logs(vec![data::Log { - message, - level, - stack_trace: None, - tags, - is_sensitive, - count: 1, - is_crash, - }]), + payload: &data::Payload::Logs(data::Logs { + logs: vec![data::Log { + message, + level, + stack_trace: None, + tags, + is_sensitive, + count: 1, + is_crash, + }], + }), origin: Some("Crashtracker"), }; @@ -503,8 +505,8 @@ mod tests { assert_eq!(payload["tracer_time"], 1568898000); assert_eq!(payload["origin"], "Crashtracker"); - assert_eq!(payload["payload"].as_array().unwrap().len(), 1); - let tags = payload["payload"][0]["tags"] + assert_eq!(payload["payload"]["logs"].as_array().unwrap().len(), 1); + let tags = payload["payload"]["logs"][0]["tags"] .as_str() .unwrap() .split(',') @@ -525,12 +527,12 @@ mod tests { ]), tags ); - assert_eq!(payload["payload"][0]["is_sensitive"], true); - assert_eq!(payload["payload"][0]["level"], "ERROR"); + assert_eq!(payload["payload"]["logs"][0]["is_sensitive"], true); + assert_eq!(payload["payload"]["logs"][0]["level"], "ERROR"); let body: CrashInfo = - serde_json::from_str(payload["payload"][0]["message"].as_str().unwrap())?; + serde_json::from_str(payload["payload"]["logs"][0]["message"].as_str().unwrap())?; assert_eq!(body, test_instance); - assert_eq!(payload["payload"][0]["is_crash"], true); + assert_eq!(payload["payload"]["logs"][0]["is_crash"], true); Ok(()) } @@ -569,8 +571,8 @@ mod tests { assert_eq!(payload["request_type"], "logs"); assert_eq!(payload["origin"], "Crashtracker"); - assert_eq!(payload["payload"].as_array().unwrap().len(), 1); - let log_entry = &payload["payload"][0]; + assert_eq!(payload["payload"]["logs"].as_array().unwrap().len(), 1); + let log_entry = &payload["payload"]["logs"][0]; // Crash ping properties assert_eq!(log_entry["is_sensitive"], false); @@ -644,8 +646,8 @@ mod tests { assert_eq!(payload["request_type"], "logs"); assert_eq!(payload["origin"], "Crashtracker"); - assert_eq!(payload["payload"].as_array().unwrap().len(), 1); - let log_entry = &payload["payload"][0]; + assert_eq!(payload["payload"]["logs"].as_array().unwrap().len(), 1); + let log_entry = &payload["payload"]["logs"][0]; // Crash ping properties assert_eq!(log_entry["is_crash"], false); @@ -748,7 +750,7 @@ mod tests { assert_eq!(payload["request_type"], "logs"); assert_eq!(payload["origin"], "Crashtracker"); - let log_entry = &payload["payload"][0]; + let log_entry = &payload["payload"]["logs"][0]; assert_eq!(log_entry["level"], "DEBUG"); assert_eq!(log_entry["is_sensitive"], false); assert_eq!(log_entry["is_crash"], false); @@ -907,7 +909,7 @@ mod tests { assert_eq!(payload["request_type"], "logs"); assert_eq!(payload["origin"], "Crashtracker"); - let log_entry = &payload["payload"][0]; + let log_entry = &payload["payload"]["logs"][0]; assert_eq!(log_entry["level"], "DEBUG"); assert_eq!(log_entry["is_sensitive"], false); assert_eq!(log_entry["is_crash"], false); @@ -977,7 +979,7 @@ mod tests { assert_eq!(payload["request_type"], "logs"); assert_eq!(payload["origin"], "Crashtracker"); - let log_entry = &payload["payload"][0]; + let log_entry = &payload["payload"]["logs"][0]; assert_eq!(log_entry["level"], "WARN"); assert_eq!(log_entry["is_sensitive"], false); assert_eq!(log_entry["is_crash"], false); diff --git a/libdd-telemetry/examples/tm-ping.rs b/libdd-telemetry/examples/tm-ping.rs index e008ee04e8..4dad58e09c 100644 --- a/libdd-telemetry/examples/tm-ping.rs +++ b/libdd-telemetry/examples/tm-ping.rs @@ -31,7 +31,7 @@ fn build_request<'a>( payload: &'a data::Payload, ) -> data::Telemetry<'a> { data::Telemetry { - api_version: data::ApiVersion::V1, + api_version: data::ApiVersion::V2, tracer_time: SystemTime::UNIX_EPOCH.elapsed().map_or(0, |d| d.as_secs()), runtime_id: "runtime_id", seq_id: seq_id(), diff --git a/libdd-telemetry/examples/tm-send-sketch.rs b/libdd-telemetry/examples/tm-send-sketch.rs index 1eae121834..42ec3767e8 100644 --- a/libdd-telemetry/examples/tm-send-sketch.rs +++ b/libdd-telemetry/examples/tm-send-sketch.rs @@ -29,7 +29,7 @@ fn build_request<'a>( payload: &'a data::Payload, ) -> data::Telemetry<'a> { data::Telemetry { - api_version: data::ApiVersion::V1, + api_version: data::ApiVersion::V2, tracer_time: SystemTime::UNIX_EPOCH.elapsed().map_or(0, |d| d.as_secs()), runtime_id: "runtime_id", seq_id: seq_id(), diff --git a/libdd-telemetry/src/data/common.rs b/libdd-telemetry/src/data/common.rs index 032e4e1c48..5f68eabbca 100644 --- a/libdd-telemetry/src/data/common.rs +++ b/libdd-telemetry/src/data/common.rs @@ -7,8 +7,6 @@ use crate::data::*; #[derive(Serialize, Deserialize, Debug)] pub enum ApiVersion { - #[serde(rename = "v1")] - V1, #[serde(rename = "v2")] V2, } @@ -16,7 +14,6 @@ pub enum ApiVersion { impl ApiVersion { pub fn to_str(&self) -> &'static str { match self { - ApiVersion::V1 => "v1", ApiVersion::V2 => "v2", } } diff --git a/libdd-telemetry/src/data/payload.rs b/libdd-telemetry/src/data/payload.rs index 9c466c6618..e8503e5efb 100644 --- a/libdd-telemetry/src/data/payload.rs +++ b/libdd-telemetry/src/data/payload.rs @@ -17,7 +17,7 @@ pub enum Payload { AppClosing(#[serde(skip_serializing)] ()), GenerateMetrics(GenerateMetrics), Sketches(Distributions), - Logs(Vec), + Logs(Logs), MessageBatch(Vec), AppExtendedHeartbeat(AppStarted), } @@ -41,3 +41,474 @@ impl Payload { } } } + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn test_app_started_serialization() { + let payload = Payload::AppStarted(AppStarted { + configuration: vec![ + Configuration { + name: "sampling_rate".to_string(), + value: "0.5".to_string(), + origin: ConfigurationOrigin::EnvVar, + config_id: Some("config-123".to_string()), + seq_id: Some(42), + }, + Configuration { + name: "log_level".to_string(), + value: "debug".to_string(), + origin: ConfigurationOrigin::Code, + config_id: None, + seq_id: None, + }, + ], + }); + + let serialized = serde_json::to_value(&payload).unwrap(); + + let expected = json!({ + "request_type": "app-started", + "payload": { + "configuration": [ + { + "name": "sampling_rate", + "value": "0.5", + "origin": "env_var", + "config_id": "config-123", + "seq_id": 42 + }, + { + "name": "log_level", + "value": "debug", + "origin": "code", + "config_id": null, + "seq_id": null + } + ] + } + }); + + assert_eq!(serialized, expected); + } + + #[test] + fn test_app_dependencies_loaded_serialization() { + let payload = Payload::AppDependenciesLoaded(AppDependenciesLoaded { + dependencies: vec![ + Dependency { + name: "tokio".to_string(), + version: Some("1.32.0".to_string()), + }, + Dependency { + name: "serde".to_string(), + version: None, + }, + ], + }); + + let serialized = serde_json::to_value(&payload).unwrap(); + + let expected = json!({ + "request_type": "app-dependencies-loaded", + "payload": { + "dependencies": [ + { + "name": "tokio", + "version": "1.32.0" + }, + { + "name": "serde", + "version": null + } + ] + } + }); + + assert_eq!(serialized, expected); + } + + #[test] + fn test_app_integrations_change_serialization() { + let payload = Payload::AppIntegrationsChange(AppIntegrationsChange { + integrations: vec![ + Integration { + name: "postgres".to_string(), + enabled: true, + version: Some("0.19.0".to_string()), + compatible: Some(true), + auto_enabled: Some(false), + }, + Integration { + name: "redis".to_string(), + enabled: false, + version: None, + compatible: None, + auto_enabled: None, + }, + ], + }); + + let serialized = serde_json::to_value(&payload).unwrap(); + + let expected = json!({ + "request_type": "app-integrations-change", + "payload": { + "integrations": [ + { + "name": "postgres", + "enabled": true, + "version": "0.19.0", + "compatible": true, + "auto_enabled": false + }, + { + "name": "redis", + "enabled": false, + "version": null, + "compatible": null, + "auto_enabled": null + } + ] + } + }); + + assert_eq!(serialized, expected); + } + + #[test] + fn test_app_client_configuration_change_serialization() { + let payload = Payload::AppClientConfigurationChange(AppClientConfigurationChange { + configuration: vec![Configuration { + name: "timeout".to_string(), + value: "30s".to_string(), + origin: ConfigurationOrigin::RemoteConfig, + config_id: Some("remote-1".to_string()), + seq_id: Some(10), + }], + }); + + let serialized = serde_json::to_value(&payload).unwrap(); + + let expected = json!({ + "request_type": "app-client-configuration-change", + "payload": { + "configuration": [ + { + "name": "timeout", + "value": "30s", + "origin": "remote_config", + "config_id": "remote-1", + "seq_id": 10 + } + ] + } + }); + + assert_eq!(serialized, expected); + } + + #[test] + fn test_app_endpoints_serialization() { + let payload = Payload::AppEndpoints(AppEndpoints { + is_first: true, + endpoints: vec![ + json!({ + "method": "GET", + "path": "/api/users", + "operation_name": "get_users", + "resource_name": "users" + }), + json!({ + "method": "POST", + "path": "/api/users", + "operation_name": "create_user", + "resource_name": "users" + }), + ], + }); + + let serialized = serde_json::to_value(&payload).unwrap(); + + let expected = json!({ + "request_type": "app-endpoints", + "payload": { + "is_first": true, + "endpoints": [ + { + "method": "GET", + "path": "/api/users", + "operation_name": "get_users", + "resource_name": "users" + }, + { + "method": "POST", + "path": "/api/users", + "operation_name": "create_user", + "resource_name": "users" + } + ] + } + }); + + assert_eq!(serialized, expected); + } + + #[test] + fn test_app_heartbeat_serialization() { + let payload = Payload::AppHeartbeat(()); + + let serialized = serde_json::to_value(&payload).unwrap(); + + let expected = json!({ + "request_type": "app-heartbeat" + }); + + assert_eq!(serialized, expected); + } + + #[test] + fn test_app_closing_serialization() { + let payload = Payload::AppClosing(()); + + let serialized = serde_json::to_value(&payload).unwrap(); + + let expected = json!({ + "request_type": "app-closing" + }); + + assert_eq!(serialized, expected); + } + + #[test] + fn test_generate_metrics_serialization() { + let payload = Payload::GenerateMetrics(GenerateMetrics { + series: vec![ + metrics::Serie { + namespace: metrics::MetricNamespace::Tracers, + metric: "spans_created".to_string(), + points: vec![(1234567890, 42.0), (1234567900, 43.0)], + tags: vec![], + common: true, + _type: metrics::MetricType::Count, + interval: 10, + }, + metrics::Serie { + namespace: metrics::MetricNamespace::Profilers, + metric: "cpu_time".to_string(), + points: vec![(1234567890, 0.75)], + tags: vec![], + common: false, + _type: metrics::MetricType::Gauge, + interval: 60, + }, + ], + }); + + let serialized = serde_json::to_value(&payload).unwrap(); + + let expected = json!({ + "request_type": "generate-metrics", + "payload": { + "series": [ + { + "namespace": "tracers", + "metric": "spans_created", + "points": [[1234567890, 42.0], [1234567900, 43.0]], + "tags": [], + "common": true, + "type": "count", + "interval": 10 + }, + { + "namespace": "profilers", + "metric": "cpu_time", + "points": [[1234567890, 0.75]], + "tags": [], + "common": false, + "type": "gauge", + "interval": 60 + } + ] + } + }); + + assert_eq!(serialized, expected); + } + + #[test] + fn test_sketches_serialization() { + let payload = Payload::Sketches(Distributions { + series: vec![metrics::Distribution { + namespace: metrics::MetricNamespace::Tracers, + metric: "request_duration".to_string(), + tags: vec![], + sketch: metrics::SerializedSketch::B64 { + sketch_b64: "base64encodeddata".to_string(), + }, + common: true, + interval: 10, + _type: metrics::MetricType::Distribution, + }], + }); + + let serialized = serde_json::to_value(&payload).unwrap(); + + let expected = json!({ + "request_type": "sketches", + "payload": { + "series": [ + { + "namespace": "tracers", + "metric": "request_duration", + "tags": [], + "sketch_b64": "base64encodeddata", + "common": true, + "interval": 10, + "type": "distribution" + } + ] + } + }); + + assert_eq!(serialized, expected); + } + + #[test] + fn test_logs_serialization() { + let payload = Payload::Logs(Logs { + logs: vec![ + Log { + message: "Connection error".to_string(), + level: LogLevel::Error, + count: 1, + stack_trace: Some("at main.rs:42".to_string()), + tags: "env:prod".to_string(), + is_sensitive: false, + is_crash: false, + }, + Log { + message: "Deprecated function used".to_string(), + level: LogLevel::Warn, + count: 5, + stack_trace: None, + tags: String::new(), + is_sensitive: false, + is_crash: false, + }, + ], + }); + + let serialized = serde_json::to_value(&payload).unwrap(); + + let expected = json!({ + "request_type": "logs", + "payload": { + "logs": [ + { + "message": "Connection error", + "level": "ERROR", + "count": 1, + "stack_trace": "at main.rs:42", + "tags": "env:prod", + "is_sensitive": false, + "is_crash": false + }, + { + "message": "Deprecated function used", + "level": "WARN", + "count": 5, + "stack_trace": null, + "tags": "", + "is_sensitive": false, + "is_crash": false + } + ] + } + }); + + assert_eq!(serialized, expected); + } + + #[test] + fn test_message_batch_serialization() { + let payload = Payload::MessageBatch(vec![ + Payload::AppHeartbeat(()), + Payload::Logs(Logs { + logs: vec![Log { + message: "Test log".to_string(), + level: LogLevel::Debug, + count: 1, + stack_trace: None, + tags: String::new(), + is_sensitive: false, + is_crash: false, + }], + }), + ]); + + let serialized = serde_json::to_value(&payload).unwrap(); + + let expected = json!({ + "request_type": "message-batch", + "payload": [ + { + "request_type": "app-heartbeat" + }, + { + "request_type": "logs", + "payload": { + "logs": [ + { + "message": "Test log", + "level": "DEBUG", + "count": 1, + "stack_trace": null, + "tags": "", + "is_sensitive": false, + "is_crash": false + } + ] + } + } + ] + }); + + assert_eq!(serialized, expected); + } + + #[test] + fn test_app_extended_heartbeat_serialization() { + let payload = Payload::AppExtendedHeartbeat(AppStarted { + configuration: vec![Configuration { + name: "feature_flag".to_string(), + value: "enabled".to_string(), + origin: ConfigurationOrigin::Default, + config_id: None, + seq_id: None, + }], + }); + + let serialized = serde_json::to_value(&payload).unwrap(); + + let expected = json!({ + "request_type": "app-extended-heartbeat", + "payload": { + "configuration": [ + { + "name": "feature_flag", + "value": "enabled", + "origin": "default", + "config_id": null, + "seq_id": null + } + ] + } + }); + + assert_eq!(serialized, expected); + } +} diff --git a/libdd-telemetry/src/data/payloads.rs b/libdd-telemetry/src/data/payloads.rs index 48cef09758..4d67650255 100644 --- a/libdd-telemetry/src/data/payloads.rs +++ b/libdd-telemetry/src/data/payloads.rs @@ -106,6 +106,11 @@ pub enum LogLevel { Debug, } +#[derive(Serialize, Debug)] +pub struct Logs { + pub logs: Vec, +} + #[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Hash, Clone)] #[serde(rename_all = "UPPERCASE")] #[repr(C)] diff --git a/libdd-telemetry/src/worker/mod.rs b/libdd-telemetry/src/worker/mod.rs index b2f38ba07b..707f3b76cd 100644 --- a/libdd-telemetry/src/worker/mod.rs +++ b/libdd-telemetry/src/worker/mod.rs @@ -582,7 +582,7 @@ impl TelemetryWorker { let mut payloads = Vec::new(); let logs = self.build_logs(); - if !logs.is_empty() { + if !logs.logs.is_empty() { payloads.push(data::Payload::Logs(logs)); } let metrics = self.build_metrics_series(); @@ -683,7 +683,7 @@ impl TelemetryWorker { } } Logs(p) => { - for _ in p { + for _ in &p.logs { self.data.logs.pop_front(); } } @@ -692,10 +692,10 @@ impl TelemetryWorker { } } - fn build_logs(&self) -> Vec { + fn build_logs(&self) -> data::Logs { // TODO: change the data model to take a &[Log] so don't have to clone data here let logs = self.data.logs.iter().map(|(_, l)| l.clone()).collect(); - logs + data::Logs { logs } } fn next_seq_id(&self) -> u64 {