From 10bff6c321938911e2bd6757c1a35953735aa2e7 Mon Sep 17 00:00:00 2001 From: Gyuheon Oh Date: Wed, 10 Dec 2025 20:59:15 +0000 Subject: [PATCH 1/4] First commit --- libdd-crashtracker/src/receiver/receive_report.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/libdd-crashtracker/src/receiver/receive_report.rs b/libdd-crashtracker/src/receiver/receive_report.rs index 0bac778d57..096e2e71f5 100644 --- a/libdd-crashtracker/src/receiver/receive_report.rs +++ b/libdd-crashtracker/src/receiver/receive_report.rs @@ -299,6 +299,7 @@ pub(crate) async fn receive_report_from_stream( let mut remaining_timeout = Duration::MAX; //TODO: This assumes that the input is valid UTF-8. + // TODO: We need to log issues here to datadog loop { // We need to wait until at least we receive config, metadata, and siginfo (on non-Windows // platforms) before sending the crash ping From 4b629e005231f308fea0f56f4889f81c5ee44766 Mon Sep 17 00:00:00 2001 From: Gyuheon Oh Date: Wed, 10 Dec 2025 22:08:35 +0000 Subject: [PATCH 2/4] First pass logger component --- .../src/crash_info/telemetry.rs | 22 ++- libdd-crashtracker/src/shared/log.rs | 135 ++++++++++++++++++ libdd-crashtracker/src/shared/mod.rs | 2 + 3 files changed, 156 insertions(+), 3 deletions(-) create mode 100644 libdd-crashtracker/src/shared/log.rs diff --git a/libdd-crashtracker/src/crash_info/telemetry.rs b/libdd-crashtracker/src/crash_info/telemetry.rs index ac1bee1a7a..3ac6b3a3c5 100644 --- a/libdd-crashtracker/src/crash_info/telemetry.rs +++ b/libdd-crashtracker/src/crash_info/telemetry.rs @@ -236,6 +236,22 @@ impl TelemetryCrashUploader { Ok(s) } + /// Send a general log message to the telemetry log intake. + pub async fn send_log_payload( + &self, + message: String, + tags: String, + level: LogLevel, + ) -> anyhow::Result<()> { + let tracer_time = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(0); + + self.send_crash_log_payload(message, tags, tracer_time, level, false, false) + .await + } + pub async fn upload_crash_ping(&self, crash_ping: &CrashPing) -> anyhow::Result<()> { let tags = self.build_crash_ping_tags(crash_ping.crash_uuid(), crash_ping.siginfo()); let tracer_time = SystemTime::now() @@ -244,7 +260,7 @@ impl TelemetryCrashUploader { .unwrap_or(0); let message = serde_json::to_string(crash_ping)?; - self.send_log_payload( + self.send_crash_log_payload( message, tags, tracer_time, @@ -305,7 +321,7 @@ impl TelemetryCrashUploader { |ts| ts.timestamp() as u64, ); - self.send_log_payload( + self.send_crash_log_payload( message, tags, tracer_time, @@ -316,7 +332,7 @@ impl TelemetryCrashUploader { .await } - async fn send_log_payload( + async fn send_crash_log_payload( &self, message: String, tags: String, diff --git a/libdd-crashtracker/src/shared/log.rs b/libdd-crashtracker/src/shared/log.rs new file mode 100644 index 0000000000..fa0d71f3b9 --- /dev/null +++ b/libdd-crashtracker/src/shared/log.rs @@ -0,0 +1,135 @@ +#![allow(dead_code)] +// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use crate::crash_info::{Metadata, TelemetryCrashUploader}; +use anyhow::Context; +use libdd_common::Endpoint; +use libdd_telemetry::data::LogLevel; + +#[allow(dead_code)] +/// Structured log entry that can be sent via the telemetry log intake. +#[derive(Debug, Clone)] +pub struct LogEntry { + pub level: LogLevel, + pub message: String, + /// Comma-separated tags string (e.g. "service:foo,env:bar"). + pub tags: String, +} + +impl LogEntry { + pub fn new(level: LogLevel, message: impl Into) -> Self { + Self { + level, + message: message.into(), + tags: String::new(), + } + } + + pub fn with_tags(mut self, tags: impl Into) -> Self { + self.tags = tags.into(); + self + } +} + +/// Minimal uploader that sends log events to the same telemetry log intake used for crash reports. +/// Crashtracking logs need to go to where telemetry uploads go; let's reuse the same uploader +pub struct LogUploader { + inner: TelemetryCrashUploader, +} + +impl LogUploader { + pub fn new(metadata: &Metadata, endpoint: &Option) -> anyhow::Result { + let inner = + TelemetryCrashUploader::new(metadata, endpoint).context("creating telemetry logger")?; + Ok(Self { inner }) + } + + pub async fn send_log(&self, entry: LogEntry) -> anyhow::Result<()> { + self.inner + .send_log_payload(entry.message, entry.tags, entry.level) + .await + } + + pub async fn send( + &self, + level: LogLevel, + message: impl Into, + tags: impl Into, + ) -> anyhow::Result<()> { + let entry = LogEntry::new(level, message).with_tags(tags); + self.send_log(entry).await + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::crash_info::Metadata; + use libdd_common::Endpoint; + use libdd_telemetry::data::LogLevel; + use std::fs; + + #[test] + fn log_entry_defaults() { + let entry = LogEntry::new(LogLevel::Debug, "hello"); + assert_eq!(entry.level, LogLevel::Debug); + assert_eq!(entry.message, "hello"); + assert_eq!(entry.tags, ""); + } + + #[test] + fn log_entry_with_tags() { + let entry = LogEntry::new(LogLevel::Error, "msg").with_tags("service:foo,env:bar"); + assert_eq!(entry.tags, "service:foo,env:bar"); + } + + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn send_log_writes_file_payload() -> anyhow::Result<()> { + let tmp = tempfile::tempdir().unwrap(); + let mut base_path = tmp.keep(); + base_path.push("log_payload"); + let telemetry_path = base_path.with_extension("telemetry"); + + let metadata = Metadata::new( + "libdatadog".to_string(), + "1.0.0".to_string(), + "native".to_string(), + vec![ + "service:foo".to_string(), + "service_version:bar".to_string(), + "runtime-id:xyz".to_string(), + "language:native".to_string(), + ], + ); + + let uploader = LogUploader::new( + &metadata, + &Some(Endpoint::from_slice(&format!( + "file://{}", + base_path.to_str().unwrap() + ))), + )?; + + uploader + .send(LogLevel::Warn, "hello log", "service:foo,env:bar") + .await?; + + let payload: serde_json::Value = + serde_json::from_str(&fs::read_to_string(&telemetry_path)?)?; + + assert_eq!(payload["api_version"], "v2"); + assert_eq!(payload["request_type"], "logs"); + assert_eq!(payload["origin"], "Crashtracker"); + + let log_entry = &payload["payload"][0]; + assert_eq!(log_entry["level"], "WARN"); + assert_eq!(log_entry["is_sensitive"], false); + assert_eq!(log_entry["is_crash"], false); + assert_eq!(log_entry["message"], "hello log"); + assert_eq!(log_entry["tags"], "service:foo,env:bar"); + + Ok(()) + } +} diff --git a/libdd-crashtracker/src/shared/mod.rs b/libdd-crashtracker/src/shared/mod.rs index 0628ecabf5..faa43e2d2d 100644 --- a/libdd-crashtracker/src/shared/mod.rs +++ b/libdd-crashtracker/src/shared/mod.rs @@ -8,5 +8,7 @@ pub(crate) mod configuration; #[cfg(not(feature = "benchmarking"))] pub(crate) mod constants; +pub(crate) mod log; + #[cfg(feature = "benchmarking")] pub mod constants; From 5acb827fcd8cf32e2daca7cb8b2b49813b247572 Mon Sep 17 00:00:00 2001 From: Gyuheon Oh Date: Thu, 11 Dec 2025 05:26:24 +0000 Subject: [PATCH 3/4] Lets just reuse telemetry crash uploader --- .../src/crash_info/errors_intake.rs | 2 +- libdd-crashtracker/src/crash_info/mod.rs | 4 +- .../src/crash_info/telemetry.rs | 132 +++++++++++------ .../src/receiver/receive_report.rs | 109 +++++++++++++- libdd-crashtracker/src/shared/log.rs | 135 ------------------ libdd-crashtracker/src/shared/mod.rs | 2 - 6 files changed, 201 insertions(+), 183 deletions(-) delete mode 100644 libdd-crashtracker/src/shared/log.rs diff --git a/libdd-crashtracker/src/crash_info/errors_intake.rs b/libdd-crashtracker/src/crash_info/errors_intake.rs index 8e10e321cf..f2962af00f 100644 --- a/libdd-crashtracker/src/crash_info/errors_intake.rs +++ b/libdd-crashtracker/src/crash_info/errors_intake.rs @@ -521,7 +521,7 @@ impl ErrorsIntakeUploader { self.send_payload(&payload).await } - pub async fn upload_to_errors_intake(&self, crash_info: &CrashInfo) -> anyhow::Result<()> { + pub async fn upload_crash_info(&self, crash_info: &CrashInfo) -> anyhow::Result<()> { let payload = ErrorsIntakePayload::from_crash_info(crash_info)?; self.send_payload(&payload).await } diff --git a/libdd-crashtracker/src/crash_info/mod.rs b/libdd-crashtracker/src/crash_info/mod.rs index acb5df93c3..ffe3cf4b1d 100644 --- a/libdd-crashtracker/src/crash_info/mod.rs +++ b/libdd-crashtracker/src/crash_info/mod.rs @@ -154,14 +154,14 @@ impl CrashInfo { async fn upload_to_telemetry(&self, endpoint: &Option) -> anyhow::Result<()> { let uploader = TelemetryCrashUploader::new(&self.metadata, endpoint)?; - uploader.upload_to_telemetry(self).await?; + uploader.upload_crash_info(self).await?; Ok(()) } async fn upload_to_errors_intake(&self, endpoint: &Option) -> anyhow::Result<()> { let uploader = ErrorsIntakeUploader::new(endpoint)?; if uploader.is_enabled() { - uploader.upload_to_errors_intake(self).await?; + uploader.upload_crash_info(self).await?; } Ok(()) } diff --git a/libdd-crashtracker/src/crash_info/telemetry.rs b/libdd-crashtracker/src/crash_info/telemetry.rs index 3ac6b3a3c5..f0ed53ea8e 100644 --- a/libdd-crashtracker/src/crash_info/telemetry.rs +++ b/libdd-crashtracker/src/crash_info/telemetry.rs @@ -236,8 +236,8 @@ impl TelemetryCrashUploader { Ok(s) } - /// Send a general log message to the telemetry log intake. - pub async fn send_log_payload( + /// Send a general (non crash report, non crash ping) log message to the telemetry log intake. + pub async fn upload_general_log( &self, message: String, tags: String, @@ -252,6 +252,7 @@ impl TelemetryCrashUploader { .await } + /// Send a crash ping telemetry log to indicate that crash processing has started pub async fn upload_crash_ping(&self, crash_ping: &CrashPing) -> anyhow::Result<()> { let tags = self.build_crash_ping_tags(crash_ping.crash_uuid(), crash_ping.siginfo()); let tracer_time = SystemTime::now() @@ -271,44 +272,8 @@ impl TelemetryCrashUploader { .await } - fn build_crash_ping_tags(&self, crash_uuid: &str, sig_info: Option<&SigInfo>) -> String { - let metadata = &self.metadata; - let mut tags = format!( - "uuid:{},is_crash_ping:true,service:{},language_name:{},language_version:{},tracer_version:{}", - crash_uuid, - metadata.application.service_name, - metadata.application.language_name, - metadata.application.language_version, - metadata.application.tracer_version - ); - - if let Some(sig_info) = sig_info { - tags.push_str(&format!( - ",si_code_human_readable:{:?},si_signo:{},si_signo_human_readable:{:?}", - sig_info.si_code_human_readable, - sig_info.si_signo, - sig_info.si_signo_human_readable - )); - } - - self.append_optional_tags(&mut tags); - tags - } - - fn append_optional_tags(&self, tags: &mut String) { - let metadata = &self.metadata; - if let Some(env) = &metadata.application.env { - tags.push_str(&format!(",env:{env}")); - } - if let Some(runtime_name) = &metadata.application.runtime_name { - tags.push_str(&format!(",runtime_name:{runtime_name}")); - } - if let Some(runtime_version) = &metadata.application.runtime_version { - tags.push_str(&format!(",runtime_version:{runtime_version}")); - } - } - - pub async fn upload_to_telemetry(&self, crash_info: &CrashInfo) -> anyhow::Result<()> { + /// Send a crash info telemetry log to indicate that crash processing has completed + pub async fn upload_crash_info(&self, crash_info: &CrashInfo) -> anyhow::Result<()> { let message = serde_json::to_string(crash_info)?; let tags = extract_crash_info_tags(crash_info).unwrap_or_default(); let tracer_time = crash_info.timestamp.parse::>().map_or_else( @@ -332,6 +297,8 @@ impl TelemetryCrashUploader { .await } + /// Shared helper that builds `data::Telemetry` payload and calls `send_telemetry_payload` + /// to send the payload to the telemetry log intake. async fn send_crash_log_payload( &self, message: String, @@ -363,6 +330,7 @@ impl TelemetryCrashUploader { self.send_telemetry_payload(&payload).await } + /// Helper to perform actual HTTP (or file) submission via configured telemetry client async fn send_telemetry_payload(&self, payload: &data::Telemetry<'_>) -> anyhow::Result<()> { let client = libdd_telemetry::worker::http_client::from_config(&self.cfg); let req = request_builder(&self.cfg)? @@ -395,6 +363,43 @@ impl TelemetryCrashUploader { Ok(()) } + + fn build_crash_ping_tags(&self, crash_uuid: &str, sig_info: Option<&SigInfo>) -> String { + let metadata = &self.metadata; + let mut tags = format!( + "uuid:{},is_crash_ping:true,service:{},language_name:{},language_version:{},tracer_version:{}", + crash_uuid, + metadata.application.service_name, + metadata.application.language_name, + metadata.application.language_version, + metadata.application.tracer_version + ); + + if let Some(sig_info) = sig_info { + tags.push_str(&format!( + ",si_code_human_readable:{:?},si_signo:{},si_signo_human_readable:{:?}", + sig_info.si_code_human_readable, + sig_info.si_signo, + sig_info.si_signo_human_readable + )); + } + + self.append_optional_tags(&mut tags); + tags + } + + fn append_optional_tags(&self, tags: &mut String) { + let metadata = &self.metadata; + if let Some(env) = &metadata.application.env { + tags.push_str(&format!(",env:{env}")); + } + if let Some(runtime_name) = &metadata.application.runtime_name { + tags.push_str(&format!(",runtime_name:{runtime_name}")); + } + if let Some(runtime_version) = &metadata.application.runtime_version { + tags.push_str(&format!(",runtime_version:{runtime_version}")); + } + } } fn extract_crash_info_tags(crash_info: &CrashInfo) -> anyhow::Result { @@ -439,6 +444,7 @@ mod tests { use super::TelemetryCrashUploader; use crate::crash_info::{test_utils::TestInstance, CrashInfo, CrashInfoBuilder, Metadata}; use libdd_common::Endpoint; + use libdd_telemetry::data::LogLevel; use std::{collections::HashSet, fs}; use uuid::Uuid; @@ -485,7 +491,7 @@ mod tests { .unwrap(); let test_instance = super::CrashInfo::test_instance(seed); - t.upload_to_telemetry(&test_instance).await.unwrap(); + t.upload_crash_info(&test_instance).await.unwrap(); let payload: serde_json::value::Value = serde_json::de::from_str(&fs::read_to_string(&output_filename).unwrap()).unwrap(); @@ -893,4 +899,48 @@ mod tests { .contains("crash processing started")); Ok(()) } + + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn test_general_log_upload() -> anyhow::Result<()> { + let tmp = tempfile::tempdir().unwrap(); + let output_filename = { + let mut p = tmp.keep(); + p.push("general_log_upload"); + p + }; + + let mut uploader = new_test_uploader(7); + uploader + .cfg + .set_host_from_url(&format!("file://{}", output_filename.to_str().unwrap()))?; + + uploader + .upload_general_log( + "hello general log".to_string(), + "service:foo,env:bar,crash_uuid:1234567890".to_string(), + LogLevel::Warn, + ) + .await?; + + let payload: serde_json::value::Value = + serde_json::de::from_str(&fs::read_to_string(&output_filename).unwrap())?; + + println!("payload: {:?}", payload.to_string()); + assert_eq!(payload["api_version"], "v2"); + assert_eq!(payload["request_type"], "logs"); + assert_eq!(payload["origin"], "Crashtracker"); + + let log_entry = &payload["payload"][0]; + assert_eq!(log_entry["level"], "WARN"); + assert_eq!(log_entry["is_sensitive"], false); + assert_eq!(log_entry["is_crash"], false); + assert_eq!(log_entry["message"], "hello general log"); + let tags = log_entry["tags"].as_str().unwrap(); + assert!(tags.contains("service:foo")); + assert!(tags.contains("env:bar")); + assert!(tags.contains("crash_uuid:1234567890")); + + Ok(()) + } } diff --git a/libdd-crashtracker/src/receiver/receive_report.rs b/libdd-crashtracker/src/receiver/receive_report.rs index 096e2e71f5..93dac04c77 100644 --- a/libdd-crashtracker/src/receiver/receive_report.rs +++ b/libdd-crashtracker/src/receiver/receive_report.rs @@ -2,16 +2,41 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - crash_info::{CrashInfo, CrashInfoBuilder, ErrorKind, SigInfo, Span, StackFrame}, + crash_info::{ + CrashInfo, CrashInfoBuilder, ErrorKind, SigInfo, Span, StackFrame, TelemetryCrashUploader, + }, runtime_callback::RuntimeStack, shared::constants::*, CrashtrackerConfiguration, }; + use anyhow::Context; +use libdd_telemetry::data::LogLevel; use serde::{Deserialize, Serialize}; use std::time::{Duration, Instant}; use tokio::io::AsyncBufReadExt; +#[derive(Debug)] +enum ReceiverIssue { + Timeout, + IoError, + ProcessLine, + AttachAdditionalFile, + IncompleteStacktrace, +} + +impl ReceiverIssue { + fn tag(&self) -> &'static str { + match self { + ReceiverIssue::Timeout => "receiver_issue:timeout", + ReceiverIssue::IoError => "receiver_issue:io_error", + ReceiverIssue::ProcessLine => "receiver_issue:process_line_error", + ReceiverIssue::AttachAdditionalFile => "receiver_issue:attach_additional_file_error", + ReceiverIssue::IncompleteStacktrace => "receiver_issue:incomplete_stacktrace", + } + } +} + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] struct RuntimeStackFrame { #[serde(default, skip_serializing_if = "Option::is_none")] @@ -290,6 +315,7 @@ pub(crate) async fn receive_report_from_stream( let mut builder = CrashInfoBuilder::new(); let mut stdin_state = StdinState::Waiting; let mut config: Option = None; + let mut telemetry_logger: Option = None; let mut crash_ping_sent = false; @@ -299,8 +325,16 @@ pub(crate) async fn receive_report_from_stream( let mut remaining_timeout = Duration::MAX; //TODO: This assumes that the input is valid UTF-8. - // TODO: We need to log issues here to datadog loop { + // Initialize telemetry logger once we have both config and metadata. + if telemetry_logger.is_none() { + if let (Some(cfg), Some(md)) = (&config, builder.metadata.clone()) { + if let Ok(logger) = TelemetryCrashUploader::new(&md, cfg.endpoint()) { + telemetry_logger = Some(logger); + } + } + } + // We need to wait until at least we receive config, metadata, and siginfo (on non-Windows // platforms) before sending the crash ping if !crash_ping_sent && builder.is_ping_ready() { @@ -325,10 +359,39 @@ pub(crate) async fn receive_report_from_stream( let next_line = tokio::time::timeout(remaining_timeout, lines.next_line()).await; let Ok(next_line) = next_line else { builder.with_log_message(format!("Timeout: {next_line:?}"), true)?; + if let Some(logger) = telemetry_logger.as_ref() { + let _ = logger + .upload_general_log( + format!("Timeout while waiting for crash report input: {next_line:?}"), + format!( + "{},crash_uuid:{}", + ReceiverIssue::Timeout.tag(), + builder.uuid + ), + LogLevel::Warn, + ) + .await; + } break; }; let Ok(next_line) = next_line else { builder.with_log_message(format!("IO Error: {next_line:?}"), true)?; + // We ignore error from uploading the log to telemetry, because what are we going to do? + // If upload is failing, its not worth the effort to retry the request so we should just + // continue on. At least we will get the log message in the crash info + if let Some(logger) = telemetry_logger.as_ref() { + let _ = logger + .upload_general_log( + format!("IO error while reading crash report input: {next_line:?}"), + format!( + "{},crash_uuid:{}", + ReceiverIssue::IoError.tag(), + builder.uuid + ), + LogLevel::Warn, + ) + .await; + } break; }; let Some(next_line) = next_line else { break }; @@ -346,6 +409,19 @@ pub(crate) async fn receive_report_from_stream( format!("Unable to process line: {next_line}. Error: {e}"), true, )?; + if let Some(logger) = telemetry_logger.as_ref() { + let _ = logger + .upload_general_log( + format!("Unable to process line: {next_line}. Error: {e}"), + format!( + "{},crash_uuid:{}", + ReceiverIssue::ProcessLine.tag(), + builder.uuid + ), + LogLevel::Warn, + ) + .await; + } break; } } @@ -373,10 +449,39 @@ pub(crate) async fn receive_report_from_stream( for filename in config.additional_files() { if let Err(e) = builder.with_file(filename.clone()) { builder.with_log_message(e.to_string(), true)?; + if let Some(logger) = telemetry_logger.as_ref() { + let _ = logger + .upload_general_log( + format!("Unable to attach additional file {filename:?}: {e}"), + format!( + "{},crash_uuid:{}", + ReceiverIssue::AttachAdditionalFile.tag(), + builder.uuid + ), + LogLevel::Warn, + ) + .await; + } } } let crash_info = builder.build()?; + if crash_info.incomplete { + if let Some(logger) = telemetry_logger.as_ref() { + let _ = logger + .upload_general_log( + "CrashInfo stacktrace incomplete".to_string(), + format!( + "{},crash_uuid:{}", + ReceiverIssue::IncompleteStacktrace.tag(), + crash_info.uuid + ), + LogLevel::Warn, + ) + .await; + } + } + Ok(Some((config, crash_info))) } diff --git a/libdd-crashtracker/src/shared/log.rs b/libdd-crashtracker/src/shared/log.rs deleted file mode 100644 index fa0d71f3b9..0000000000 --- a/libdd-crashtracker/src/shared/log.rs +++ /dev/null @@ -1,135 +0,0 @@ -#![allow(dead_code)] -// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ -// SPDX-License-Identifier: Apache-2.0 - -use crate::crash_info::{Metadata, TelemetryCrashUploader}; -use anyhow::Context; -use libdd_common::Endpoint; -use libdd_telemetry::data::LogLevel; - -#[allow(dead_code)] -/// Structured log entry that can be sent via the telemetry log intake. -#[derive(Debug, Clone)] -pub struct LogEntry { - pub level: LogLevel, - pub message: String, - /// Comma-separated tags string (e.g. "service:foo,env:bar"). - pub tags: String, -} - -impl LogEntry { - pub fn new(level: LogLevel, message: impl Into) -> Self { - Self { - level, - message: message.into(), - tags: String::new(), - } - } - - pub fn with_tags(mut self, tags: impl Into) -> Self { - self.tags = tags.into(); - self - } -} - -/// Minimal uploader that sends log events to the same telemetry log intake used for crash reports. -/// Crashtracking logs need to go to where telemetry uploads go; let's reuse the same uploader -pub struct LogUploader { - inner: TelemetryCrashUploader, -} - -impl LogUploader { - pub fn new(metadata: &Metadata, endpoint: &Option) -> anyhow::Result { - let inner = - TelemetryCrashUploader::new(metadata, endpoint).context("creating telemetry logger")?; - Ok(Self { inner }) - } - - pub async fn send_log(&self, entry: LogEntry) -> anyhow::Result<()> { - self.inner - .send_log_payload(entry.message, entry.tags, entry.level) - .await - } - - pub async fn send( - &self, - level: LogLevel, - message: impl Into, - tags: impl Into, - ) -> anyhow::Result<()> { - let entry = LogEntry::new(level, message).with_tags(tags); - self.send_log(entry).await - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::crash_info::Metadata; - use libdd_common::Endpoint; - use libdd_telemetry::data::LogLevel; - use std::fs; - - #[test] - fn log_entry_defaults() { - let entry = LogEntry::new(LogLevel::Debug, "hello"); - assert_eq!(entry.level, LogLevel::Debug); - assert_eq!(entry.message, "hello"); - assert_eq!(entry.tags, ""); - } - - #[test] - fn log_entry_with_tags() { - let entry = LogEntry::new(LogLevel::Error, "msg").with_tags("service:foo,env:bar"); - assert_eq!(entry.tags, "service:foo,env:bar"); - } - - #[tokio::test] - #[cfg_attr(miri, ignore)] - async fn send_log_writes_file_payload() -> anyhow::Result<()> { - let tmp = tempfile::tempdir().unwrap(); - let mut base_path = tmp.keep(); - base_path.push("log_payload"); - let telemetry_path = base_path.with_extension("telemetry"); - - let metadata = Metadata::new( - "libdatadog".to_string(), - "1.0.0".to_string(), - "native".to_string(), - vec![ - "service:foo".to_string(), - "service_version:bar".to_string(), - "runtime-id:xyz".to_string(), - "language:native".to_string(), - ], - ); - - let uploader = LogUploader::new( - &metadata, - &Some(Endpoint::from_slice(&format!( - "file://{}", - base_path.to_str().unwrap() - ))), - )?; - - uploader - .send(LogLevel::Warn, "hello log", "service:foo,env:bar") - .await?; - - let payload: serde_json::Value = - serde_json::from_str(&fs::read_to_string(&telemetry_path)?)?; - - assert_eq!(payload["api_version"], "v2"); - assert_eq!(payload["request_type"], "logs"); - assert_eq!(payload["origin"], "Crashtracker"); - - let log_entry = &payload["payload"][0]; - assert_eq!(log_entry["level"], "WARN"); - assert_eq!(log_entry["is_sensitive"], false); - assert_eq!(log_entry["is_crash"], false); - assert_eq!(log_entry["message"], "hello log"); - assert_eq!(log_entry["tags"], "service:foo,env:bar"); - - Ok(()) - } -} diff --git a/libdd-crashtracker/src/shared/mod.rs b/libdd-crashtracker/src/shared/mod.rs index faa43e2d2d..0628ecabf5 100644 --- a/libdd-crashtracker/src/shared/mod.rs +++ b/libdd-crashtracker/src/shared/mod.rs @@ -8,7 +8,5 @@ pub(crate) mod configuration; #[cfg(not(feature = "benchmarking"))] pub(crate) mod constants; -pub(crate) mod log; - #[cfg(feature = "benchmarking")] pub mod constants; From 5f39eb636e1e449d8cf8598f97442c6fdd9ee167 Mon Sep 17 00:00:00 2001 From: Gyuheon Oh Date: Thu, 11 Dec 2025 14:55:03 +0000 Subject: [PATCH 4/4] move logging into one helper --- .../src/crash_info/telemetry.rs | 10 +- .../src/receiver/receive_report.rs | 145 +++++++++--------- 2 files changed, 78 insertions(+), 77 deletions(-) diff --git a/libdd-crashtracker/src/crash_info/telemetry.rs b/libdd-crashtracker/src/crash_info/telemetry.rs index f0ed53ea8e..d54d8a8925 100644 --- a/libdd-crashtracker/src/crash_info/telemetry.rs +++ b/libdd-crashtracker/src/crash_info/telemetry.rs @@ -248,7 +248,7 @@ impl TelemetryCrashUploader { .map(|d| d.as_secs()) .unwrap_or(0); - self.send_crash_log_payload(message, tags, tracer_time, level, false, false) + self.send_log_payload(message, tags, tracer_time, level, false, false) .await } @@ -261,7 +261,7 @@ impl TelemetryCrashUploader { .unwrap_or(0); let message = serde_json::to_string(crash_ping)?; - self.send_crash_log_payload( + self.send_log_payload( message, tags, tracer_time, @@ -286,7 +286,7 @@ impl TelemetryCrashUploader { |ts| ts.timestamp() as u64, ); - self.send_crash_log_payload( + self.send_log_payload( message, tags, tracer_time, @@ -299,7 +299,7 @@ impl TelemetryCrashUploader { /// Shared helper that builds `data::Telemetry` payload and calls `send_telemetry_payload` /// to send the payload to the telemetry log intake. - async fn send_crash_log_payload( + async fn send_log_payload( &self, message: String, tags: String, @@ -925,8 +925,8 @@ mod tests { let payload: serde_json::value::Value = serde_json::de::from_str(&fs::read_to_string(&output_filename).unwrap())?; - println!("payload: {:?}", payload.to_string()); + assert_eq!(payload["api_version"], "v2"); assert_eq!(payload["request_type"], "logs"); assert_eq!(payload["origin"], "Crashtracker"); diff --git a/libdd-crashtracker/src/receiver/receive_report.rs b/libdd-crashtracker/src/receiver/receive_report.rs index 93dac04c77..5f44cb0e59 100644 --- a/libdd-crashtracker/src/receiver/receive_report.rs +++ b/libdd-crashtracker/src/receiver/receive_report.rs @@ -13,6 +13,7 @@ use crate::{ use anyhow::Context; use libdd_telemetry::data::LogLevel; use serde::{Deserialize, Serialize}; +use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::io::AsyncBufReadExt; @@ -23,6 +24,7 @@ enum ReceiverIssue { ProcessLine, AttachAdditionalFile, IncompleteStacktrace, + UnexpectedLine, } impl ReceiverIssue { @@ -33,10 +35,26 @@ impl ReceiverIssue { ReceiverIssue::ProcessLine => "receiver_issue:process_line_error", ReceiverIssue::AttachAdditionalFile => "receiver_issue:attach_additional_file_error", ReceiverIssue::IncompleteStacktrace => "receiver_issue:incomplete_stacktrace", + ReceiverIssue::UnexpectedLine => "receiver_issue:unexpected_line", } } } +fn emit_debug_log( + logger: &Option>, + issue: ReceiverIssue, + crash_uuid: &str, + message: String, + level: LogLevel, +) { + if let Some(logger) = logger.as_ref().map(Arc::clone) { + let tags = format!("{},crash_uuid:{}", issue.tag(), crash_uuid); + tokio::spawn(async move { + let _ = logger.upload_general_log(message, tags, level).await; + }); + } +} + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] struct RuntimeStackFrame { #[serde(default, skip_serializing_if = "Option::is_none")] @@ -109,6 +127,7 @@ fn process_line( config: &mut Option, line: &str, state: StdinState, + telemetry_logger: &Option>, ) -> anyhow::Result { let next = match state { StdinState::AdditionalTags if line.starts_with(DD_CRASHTRACK_END_ADDITIONAL_TAGS) => { @@ -293,10 +312,15 @@ fn process_line( StdinState::Done } StdinState::Waiting => { - builder.with_log_message( - format!("Unexpected line while receiving crashreport: {line}"), - true, - )?; + let msg = format!("Unexpected line while receiving crashreport: {line}"); + builder.with_log_message(msg.clone(), true)?; + emit_debug_log( + telemetry_logger, + ReceiverIssue::UnexpectedLine, + &builder.uuid.to_string(), + msg, + LogLevel::Warn, + ); StdinState::Waiting } }; @@ -315,7 +339,7 @@ pub(crate) async fn receive_report_from_stream( let mut builder = CrashInfoBuilder::new(); let mut stdin_state = StdinState::Waiting; let mut config: Option = None; - let mut telemetry_logger: Option = None; + let mut telemetry_logger: Option> = None; let mut crash_ping_sent = false; @@ -330,7 +354,7 @@ pub(crate) async fn receive_report_from_stream( if telemetry_logger.is_none() { if let (Some(cfg), Some(md)) = (&config, builder.metadata.clone()) { if let Ok(logger) = TelemetryCrashUploader::new(&md, cfg.endpoint()) { - telemetry_logger = Some(logger); + telemetry_logger = Some(Arc::new(logger)); } } } @@ -359,19 +383,13 @@ pub(crate) async fn receive_report_from_stream( let next_line = tokio::time::timeout(remaining_timeout, lines.next_line()).await; let Ok(next_line) = next_line else { builder.with_log_message(format!("Timeout: {next_line:?}"), true)?; - if let Some(logger) = telemetry_logger.as_ref() { - let _ = logger - .upload_general_log( - format!("Timeout while waiting for crash report input: {next_line:?}"), - format!( - "{},crash_uuid:{}", - ReceiverIssue::Timeout.tag(), - builder.uuid - ), - LogLevel::Warn, - ) - .await; - } + emit_debug_log( + &telemetry_logger, + ReceiverIssue::Timeout, + &builder.uuid.to_string(), + format!("Timeout while waiting for crash report input: {next_line:?}"), + LogLevel::Warn, + ); break; }; let Ok(next_line) = next_line else { @@ -379,24 +397,24 @@ pub(crate) async fn receive_report_from_stream( // We ignore error from uploading the log to telemetry, because what are we going to do? // If upload is failing, its not worth the effort to retry the request so we should just // continue on. At least we will get the log message in the crash info - if let Some(logger) = telemetry_logger.as_ref() { - let _ = logger - .upload_general_log( - format!("IO error while reading crash report input: {next_line:?}"), - format!( - "{},crash_uuid:{}", - ReceiverIssue::IoError.tag(), - builder.uuid - ), - LogLevel::Warn, - ) - .await; - } + emit_debug_log( + &telemetry_logger, + ReceiverIssue::IoError, + &builder.uuid.to_string(), + format!("IO error while reading crash report input: {next_line:?}"), + LogLevel::Warn, + ); break; }; let Some(next_line) = next_line else { break }; - match process_line(&mut builder, &mut config, &next_line, stdin_state) { + match process_line( + &mut builder, + &mut config, + &next_line, + stdin_state, + &telemetry_logger, + ) { Ok(next_state) => { stdin_state = next_state; if matches!(stdin_state, StdinState::Done) { @@ -409,19 +427,13 @@ pub(crate) async fn receive_report_from_stream( format!("Unable to process line: {next_line}. Error: {e}"), true, )?; - if let Some(logger) = telemetry_logger.as_ref() { - let _ = logger - .upload_general_log( - format!("Unable to process line: {next_line}. Error: {e}"), - format!( - "{},crash_uuid:{}", - ReceiverIssue::ProcessLine.tag(), - builder.uuid - ), - LogLevel::Warn, - ) - .await; - } + emit_debug_log( + &telemetry_logger, + ReceiverIssue::ProcessLine, + &builder.uuid.to_string(), + format!("Unable to process line: {next_line}. Error: {e}"), + LogLevel::Warn, + ); break; } } @@ -446,41 +458,30 @@ pub(crate) async fn receive_report_from_stream( // Without a config, we don't even know the endpoint to transmit to. Not much to do to recover. let config = config.context("Missing crashtracker configuration")?; + for filename in config.additional_files() { if let Err(e) = builder.with_file(filename.clone()) { builder.with_log_message(e.to_string(), true)?; - if let Some(logger) = telemetry_logger.as_ref() { - let _ = logger - .upload_general_log( - format!("Unable to attach additional file {filename:?}: {e}"), - format!( - "{},crash_uuid:{}", - ReceiverIssue::AttachAdditionalFile.tag(), - builder.uuid - ), - LogLevel::Warn, - ) - .await; - } + emit_debug_log( + &telemetry_logger, + ReceiverIssue::AttachAdditionalFile, + &builder.uuid.to_string(), + format!("Unable to attach additional file {filename:?}: {e}"), + LogLevel::Warn, + ); } } let crash_info = builder.build()?; if crash_info.incomplete { - if let Some(logger) = telemetry_logger.as_ref() { - let _ = logger - .upload_general_log( - "CrashInfo stacktrace incomplete".to_string(), - format!( - "{},crash_uuid:{}", - ReceiverIssue::IncompleteStacktrace.tag(), - crash_info.uuid - ), - LogLevel::Warn, - ) - .await; - } + emit_debug_log( + &telemetry_logger, + ReceiverIssue::IncompleteStacktrace, + &crash_info.uuid, + "CrashInfo stacktrace incomplete".to_string(), + LogLevel::Warn, + ); } Ok(Some((config, crash_info)))