diff --git a/libdd-crashtracker/src/crash_info/errors_intake.rs b/libdd-crashtracker/src/crash_info/errors_intake.rs
index 8e10e321cf..f2962af00f 100644
--- a/libdd-crashtracker/src/crash_info/errors_intake.rs
+++ b/libdd-crashtracker/src/crash_info/errors_intake.rs
@@ -521,7 +521,7 @@ impl ErrorsIntakeUploader {
         self.send_payload(&payload).await
     }
 
-    pub async fn upload_to_errors_intake(&self, crash_info: &CrashInfo) -> anyhow::Result<()> {
+    pub async fn upload_crash_info(&self, crash_info: &CrashInfo) -> anyhow::Result<()> {
         let payload = ErrorsIntakePayload::from_crash_info(crash_info)?;
         self.send_payload(&payload).await
     }
diff --git a/libdd-crashtracker/src/crash_info/mod.rs b/libdd-crashtracker/src/crash_info/mod.rs
index acb5df93c3..ffe3cf4b1d 100644
--- a/libdd-crashtracker/src/crash_info/mod.rs
+++ b/libdd-crashtracker/src/crash_info/mod.rs
@@ -154,14 +154,14 @@ impl CrashInfo {
 
     async fn upload_to_telemetry(&self, endpoint: &Option<Endpoint>) -> anyhow::Result<()> {
         let uploader = TelemetryCrashUploader::new(&self.metadata, endpoint)?;
-        uploader.upload_to_telemetry(self).await?;
+        uploader.upload_crash_info(self).await?;
         Ok(())
     }
 
     async fn upload_to_errors_intake(&self, endpoint: &Option<Endpoint>) -> anyhow::Result<()> {
         let uploader = ErrorsIntakeUploader::new(endpoint)?;
         if uploader.is_enabled() {
-            uploader.upload_to_errors_intake(self).await?;
+            uploader.upload_crash_info(self).await?;
         }
         Ok(())
     }
diff --git a/libdd-crashtracker/src/crash_info/telemetry.rs b/libdd-crashtracker/src/crash_info/telemetry.rs
index ac1bee1a7a..d54d8a8925 100644
--- a/libdd-crashtracker/src/crash_info/telemetry.rs
+++ b/libdd-crashtracker/src/crash_info/telemetry.rs
@@ -236,6 +236,23 @@
         Ok(s)
     }
 
+    /// Send a general (non crash report, non crash ping) log message to the telemetry log intake.
+    pub async fn upload_general_log(
+        &self,
+        message: String,
+        tags: String,
+        level: LogLevel,
+    ) -> anyhow::Result<()> {
+        let tracer_time = SystemTime::now()
+            .duration_since(SystemTime::UNIX_EPOCH)
+            .map(|d| d.as_secs())
+            .unwrap_or(0);
+
+        self.send_log_payload(message, tags, tracer_time, level, false, false)
+            .await
+    }
+
+    /// Send a crash ping telemetry log to indicate that crash processing has started
     pub async fn upload_crash_ping(&self, crash_ping: &CrashPing) -> anyhow::Result<()> {
         let tags = self.build_crash_ping_tags(crash_ping.crash_uuid(), crash_ping.siginfo());
         let tracer_time = SystemTime::now()
@@ -255,44 +272,8 @@
             .await
     }
 
-    fn build_crash_ping_tags(&self, crash_uuid: &str, sig_info: Option<&SigInfo>) -> String {
-        let metadata = &self.metadata;
-        let mut tags = format!(
-            "uuid:{},is_crash_ping:true,service:{},language_name:{},language_version:{},tracer_version:{}",
-            crash_uuid,
-            metadata.application.service_name,
-            metadata.application.language_name,
-            metadata.application.language_version,
-            metadata.application.tracer_version
-        );
-
-        if let Some(sig_info) = sig_info {
-            tags.push_str(&format!(
-                ",si_code_human_readable:{:?},si_signo:{},si_signo_human_readable:{:?}",
-                sig_info.si_code_human_readable,
-                sig_info.si_signo,
-                sig_info.si_signo_human_readable
-            ));
-        }
-
-        self.append_optional_tags(&mut tags);
-        tags
-    }
-
-    fn append_optional_tags(&self, tags: &mut String) {
-        let metadata = &self.metadata;
-        if let Some(env) = &metadata.application.env {
-            tags.push_str(&format!(",env:{env}"));
-        }
-        if let Some(runtime_name) = &metadata.application.runtime_name {
-            tags.push_str(&format!(",runtime_name:{runtime_name}"));
-        }
-        if let Some(runtime_version) = &metadata.application.runtime_version {
-            tags.push_str(&format!(",runtime_version:{runtime_version}"));
-        }
-    }
-
-    pub async fn upload_to_telemetry(&self, crash_info: &CrashInfo) -> anyhow::Result<()> {
+    /// Send a crash info telemetry log to indicate that crash processing has completed
+    pub async fn upload_crash_info(&self, crash_info: &CrashInfo) -> anyhow::Result<()> {
         let message = serde_json::to_string(crash_info)?;
         let tags = extract_crash_info_tags(crash_info).unwrap_or_default();
         let tracer_time = crash_info.timestamp.parse::<DateTime<Utc>>().map_or_else(
@@ -316,6 +297,8 @@
             .await
     }
 
+    /// Shared helper that builds `data::Telemetry` payload and calls `send_telemetry_payload`
+    /// to send the payload to the telemetry log intake.
     async fn send_log_payload(
         &self,
         message: String,
@@ -347,6 +330,7 @@
         self.send_telemetry_payload(&payload).await
     }
 
+    /// Helper to perform actual HTTP (or file) submission via configured telemetry client
    async fn send_telemetry_payload(&self, payload: &data::Telemetry<'_>) -> anyhow::Result<()> {
         let client = libdd_telemetry::worker::http_client::from_config(&self.cfg);
         let req = request_builder(&self.cfg)?
@@ -379,6 +363,43 @@
 
         Ok(())
     }
+
+    fn build_crash_ping_tags(&self, crash_uuid: &str, sig_info: Option<&SigInfo>) -> String {
+        let metadata = &self.metadata;
+        let mut tags = format!(
+            "uuid:{},is_crash_ping:true,service:{},language_name:{},language_version:{},tracer_version:{}",
+            crash_uuid,
+            metadata.application.service_name,
+            metadata.application.language_name,
+            metadata.application.language_version,
+            metadata.application.tracer_version
+        );
+
+        if let Some(sig_info) = sig_info {
+            tags.push_str(&format!(
+                ",si_code_human_readable:{:?},si_signo:{},si_signo_human_readable:{:?}",
+                sig_info.si_code_human_readable,
+                sig_info.si_signo,
+                sig_info.si_signo_human_readable
+            ));
+        }
+
+        self.append_optional_tags(&mut tags);
+        tags
+    }
+
+    fn append_optional_tags(&self, tags: &mut String) {
+        let metadata = &self.metadata;
+        if let Some(env) = &metadata.application.env {
+            tags.push_str(&format!(",env:{env}"));
+        }
+        if let Some(runtime_name) = &metadata.application.runtime_name {
+            tags.push_str(&format!(",runtime_name:{runtime_name}"));
+        }
+        if let Some(runtime_version) = &metadata.application.runtime_version {
+            tags.push_str(&format!(",runtime_version:{runtime_version}"));
+        }
+    }
 }
 
 fn extract_crash_info_tags(crash_info: &CrashInfo) -> anyhow::Result<String> {
@@ -423,6 +444,7 @@ mod tests {
     use super::TelemetryCrashUploader;
     use crate::crash_info::{test_utils::TestInstance, CrashInfo, CrashInfoBuilder, Metadata};
     use libdd_common::Endpoint;
+    use libdd_telemetry::data::LogLevel;
     use std::{collections::HashSet, fs};
     use uuid::Uuid;
 
@@ -469,7 +491,7 @@
             .unwrap();
 
         let test_instance = super::CrashInfo::test_instance(seed);
-        t.upload_to_telemetry(&test_instance).await.unwrap();
+        t.upload_crash_info(&test_instance).await.unwrap();
 
         let payload: serde_json::value::Value =
             serde_json::de::from_str(&fs::read_to_string(&output_filename).unwrap()).unwrap();
@@ -877,4 +899,48 @@
             .contains("crash processing started"));
         Ok(())
     }
+
+    #[tokio::test]
+    #[cfg_attr(miri, ignore)]
+    async fn test_general_log_upload() -> anyhow::Result<()> {
+        let tmp = tempfile::tempdir().unwrap();
+        let output_filename = {
+            let mut p = tmp.keep();
+            p.push("general_log_upload");
+            p
+        };
+
+        let mut uploader = new_test_uploader(7);
+        uploader
+            .cfg
+            .set_host_from_url(&format!("file://{}", output_filename.to_str().unwrap()))?;
+
+        uploader
+            .upload_general_log(
+                "hello general log".to_string(),
+                "service:foo,env:bar,crash_uuid:1234567890".to_string(),
+                LogLevel::Warn,
+            )
+            .await?;
+
+        let payload: serde_json::value::Value =
+            serde_json::de::from_str(&fs::read_to_string(&output_filename).unwrap())?;
+        println!("payload: {:?}", payload.to_string());
+
+        assert_eq!(payload["api_version"], "v2");
+        assert_eq!(payload["request_type"], "logs");
+        assert_eq!(payload["origin"], "Crashtracker");
+
+        let log_entry = &payload["payload"][0];
+        assert_eq!(log_entry["level"], "WARN");
+        assert_eq!(log_entry["is_sensitive"], false);
+        assert_eq!(log_entry["is_crash"], false);
+        assert_eq!(log_entry["message"], "hello general log");
+        let tags = log_entry["tags"].as_str().unwrap();
+        assert!(tags.contains("service:foo"));
+        assert!(tags.contains("env:bar"));
+        assert!(tags.contains("crash_uuid:1234567890"));
+
+        Ok(())
+    }
 }
diff --git a/libdd-crashtracker/src/receiver/receive_report.rs b/libdd-crashtracker/src/receiver/receive_report.rs
index 0bac778d57..5f44cb0e59 100644
--- a/libdd-crashtracker/src/receiver/receive_report.rs
+++ b/libdd-crashtracker/src/receiver/receive_report.rs
@@ -2,16 +2,59 @@
 // SPDX-License-Identifier: Apache-2.0
 
 use crate::{
-    crash_info::{CrashInfo, CrashInfoBuilder, ErrorKind, SigInfo, Span, StackFrame},
+    crash_info::{
+        CrashInfo, CrashInfoBuilder, ErrorKind, SigInfo, Span, StackFrame, TelemetryCrashUploader,
+    },
     runtime_callback::RuntimeStack,
     shared::constants::*,
     CrashtrackerConfiguration,
 };
+
 use anyhow::Context;
+use libdd_telemetry::data::LogLevel;
 use serde::{Deserialize, Serialize};
+use std::sync::Arc;
 use std::time::{Duration, Instant};
 use tokio::io::AsyncBufReadExt;
 
+#[derive(Debug)]
+enum ReceiverIssue {
+    Timeout,
+    IoError,
+    ProcessLine,
+    AttachAdditionalFile,
+    IncompleteStacktrace,
+    UnexpectedLine,
+}
+
+impl ReceiverIssue {
+    fn tag(&self) -> &'static str {
+        match self {
+            ReceiverIssue::Timeout => "receiver_issue:timeout",
+            ReceiverIssue::IoError => "receiver_issue:io_error",
+            ReceiverIssue::ProcessLine => "receiver_issue:process_line_error",
+            ReceiverIssue::AttachAdditionalFile => "receiver_issue:attach_additional_file_error",
+            ReceiverIssue::IncompleteStacktrace => "receiver_issue:incomplete_stacktrace",
+            ReceiverIssue::UnexpectedLine => "receiver_issue:unexpected_line",
+        }
+    }
+}
+
+fn emit_debug_log(
+    logger: &Option<Arc<TelemetryCrashUploader>>,
+    issue: ReceiverIssue,
+    crash_uuid: &str,
+    message: String,
+    level: LogLevel,
+) {
+    if let Some(logger) = logger.as_ref().map(Arc::clone) {
+        let tags = format!("{},crash_uuid:{}", issue.tag(), crash_uuid);
+        tokio::spawn(async move {
+            let _ = logger.upload_general_log(message, tags, level).await;
+        });
+    }
+}
+
 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
 struct RuntimeStackFrame {
     #[serde(default, skip_serializing_if = "Option::is_none")]
@@ -84,6 +127,7 @@ fn process_line(
     config: &mut Option<CrashtrackerConfiguration>,
     line: &str,
     state: StdinState,
+    telemetry_logger: &Option<Arc<TelemetryCrashUploader>>,
 ) -> anyhow::Result<StdinState> {
     let next = match state {
         StdinState::AdditionalTags if line.starts_with(DD_CRASHTRACK_END_ADDITIONAL_TAGS) => {
@@ -268,10 +312,15 @@
             StdinState::Done
         }
         StdinState::Waiting => {
-            builder.with_log_message(
-                format!("Unexpected line while receiving crashreport: {line}"),
-                true,
-            )?;
+            let msg = format!("Unexpected line while receiving crashreport: {line}");
+            builder.with_log_message(msg.clone(), true)?;
+            emit_debug_log(
+                telemetry_logger,
+                ReceiverIssue::UnexpectedLine,
+                &builder.uuid.to_string(),
+                msg,
+                LogLevel::Warn,
+            );
             StdinState::Waiting
         }
     };
@@ -290,6 +339,7 @@ pub(crate) async fn receive_report_from_stream(
     let mut builder = CrashInfoBuilder::new();
     let mut stdin_state = StdinState::Waiting;
     let mut config: Option<CrashtrackerConfiguration> = None;
+    let mut telemetry_logger: Option<Arc<TelemetryCrashUploader>> = None;
 
     let mut crash_ping_sent = false;
 
@@ -300,6 +350,15 @@
     //TODO: This assumes that the input is valid UTF-8.
     loop {
+        // Initialize telemetry logger once we have both config and metadata.
+        if telemetry_logger.is_none() {
+            if let (Some(cfg), Some(md)) = (&config, builder.metadata.clone()) {
+                if let Ok(logger) = TelemetryCrashUploader::new(&md, cfg.endpoint()) {
+                    telemetry_logger = Some(Arc::new(logger));
+                }
+            }
+        }
+
         // We need to wait until at least we receive config, metadata, and siginfo (on non-Windows
         // platforms) before sending the crash ping
         if !crash_ping_sent && builder.is_ping_ready() {
@@ -324,15 +383,38 @@
         let next_line = tokio::time::timeout(remaining_timeout, lines.next_line()).await;
         let Ok(next_line) = next_line else {
             builder.with_log_message(format!("Timeout: {next_line:?}"), true)?;
+            emit_debug_log(
+                &telemetry_logger,
+                ReceiverIssue::Timeout,
+                &builder.uuid.to_string(),
+                format!("Timeout while waiting for crash report input: {next_line:?}"),
+                LogLevel::Warn,
+            );
             break;
         };
         let Ok(next_line) = next_line else {
             builder.with_log_message(format!("IO Error: {next_line:?}"), true)?;
+            // We ignore error from uploading the log to telemetry, because what are we going to do?
+            // If upload is failing, its not worth the effort to retry the request so we should just
+            // continue on. At least we will get the log message in the crash info
+            emit_debug_log(
+                &telemetry_logger,
+                ReceiverIssue::IoError,
+                &builder.uuid.to_string(),
+                format!("IO error while reading crash report input: {next_line:?}"),
+                LogLevel::Warn,
+            );
             break;
         };
 
         let Some(next_line) = next_line else { break };
-        match process_line(&mut builder, &mut config, &next_line, stdin_state) {
+        match process_line(
+            &mut builder,
+            &mut config,
+            &next_line,
+            stdin_state,
+            &telemetry_logger,
+        ) {
             Ok(next_state) => {
                 stdin_state = next_state;
                 if matches!(stdin_state, StdinState::Done) {
@@ -345,6 +427,13 @@
                     format!("Unable to process line: {next_line}. Error: {e}"),
                     true,
                 )?;
+                emit_debug_log(
+                    &telemetry_logger,
+                    ReceiverIssue::ProcessLine,
+                    &builder.uuid.to_string(),
+                    format!("Unable to process line: {next_line}. Error: {e}"),
+                    LogLevel::Warn,
+                );
                 break;
             }
         }
@@ -369,13 +458,31 @@
 
     // Without a config, we don't even know the endpoint to transmit to. Not much to do to recover.
     let config = config.context("Missing crashtracker configuration")?;
+
     for filename in config.additional_files() {
         if let Err(e) = builder.with_file(filename.clone()) {
             builder.with_log_message(e.to_string(), true)?;
+            emit_debug_log(
+                &telemetry_logger,
+                ReceiverIssue::AttachAdditionalFile,
+                &builder.uuid.to_string(),
+                format!("Unable to attach additional file {filename:?}: {e}"),
+                LogLevel::Warn,
+            );
         }
     }
 
     let crash_info = builder.build()?;
+
+    if crash_info.incomplete {
+        emit_debug_log(
+            &telemetry_logger,
+            ReceiverIssue::IncompleteStacktrace,
+            &crash_info.uuid,
+            "CrashInfo stacktrace incomplete".to_string(),
+            LogLevel::Warn,
+        );
+    }
+
     Ok(Some((config, crash_info)))
 }