Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion libdd-crashtracker/src/crash_info/errors_intake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ impl ErrorsIntakeUploader {
self.send_payload(&payload).await
}

pub async fn upload_to_errors_intake(&self, crash_info: &CrashInfo) -> anyhow::Result<()> {
pub async fn upload_crash_info(&self, crash_info: &CrashInfo) -> anyhow::Result<()> {
let payload = ErrorsIntakePayload::from_crash_info(crash_info)?;
self.send_payload(&payload).await
}
Expand Down
4 changes: 2 additions & 2 deletions libdd-crashtracker/src/crash_info/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,14 +154,14 @@ impl CrashInfo {

async fn upload_to_telemetry(&self, endpoint: &Option<Endpoint>) -> anyhow::Result<()> {
let uploader = TelemetryCrashUploader::new(&self.metadata, endpoint)?;
uploader.upload_to_telemetry(self).await?;
uploader.upload_crash_info(self).await?;
Ok(())
}

async fn upload_to_errors_intake(&self, endpoint: &Option<Endpoint>) -> anyhow::Result<()> {
let uploader = ErrorsIntakeUploader::new(endpoint)?;
if uploader.is_enabled() {
uploader.upload_to_errors_intake(self).await?;
uploader.upload_crash_info(self).await?;
}
Ok(())
}
Expand Down
144 changes: 105 additions & 39 deletions libdd-crashtracker/src/crash_info/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,23 @@ impl TelemetryCrashUploader {
Ok(s)
}

/// Send a general (non crash report, non crash ping) log message to the telemetry log intake.
pub async fn upload_general_log(
&self,
message: String,
tags: String,
level: LogLevel,
) -> anyhow::Result<()> {
let tracer_time = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);

self.send_log_payload(message, tags, tracer_time, level, false, false)
.await
}

/// Send a crash ping telemetry log to indicate that crash processing has started
pub async fn upload_crash_ping(&self, crash_ping: &CrashPing) -> anyhow::Result<()> {
let tags = self.build_crash_ping_tags(crash_ping.crash_uuid(), crash_ping.siginfo());
let tracer_time = SystemTime::now()
Expand All @@ -255,44 +272,8 @@ impl TelemetryCrashUploader {
.await
}

fn build_crash_ping_tags(&self, crash_uuid: &str, sig_info: Option<&SigInfo>) -> String {
let metadata = &self.metadata;
let mut tags = format!(
"uuid:{},is_crash_ping:true,service:{},language_name:{},language_version:{},tracer_version:{}",
crash_uuid,
metadata.application.service_name,
metadata.application.language_name,
metadata.application.language_version,
metadata.application.tracer_version
);

if let Some(sig_info) = sig_info {
tags.push_str(&format!(
",si_code_human_readable:{:?},si_signo:{},si_signo_human_readable:{:?}",
sig_info.si_code_human_readable,
sig_info.si_signo,
sig_info.si_signo_human_readable
));
}

self.append_optional_tags(&mut tags);
tags
}

fn append_optional_tags(&self, tags: &mut String) {
let metadata = &self.metadata;
if let Some(env) = &metadata.application.env {
tags.push_str(&format!(",env:{env}"));
}
if let Some(runtime_name) = &metadata.application.runtime_name {
tags.push_str(&format!(",runtime_name:{runtime_name}"));
}
if let Some(runtime_version) = &metadata.application.runtime_version {
tags.push_str(&format!(",runtime_version:{runtime_version}"));
}
}

pub async fn upload_to_telemetry(&self, crash_info: &CrashInfo) -> anyhow::Result<()> {
/// Send a crash info telemetry log to indicate that crash processing has completed
pub async fn upload_crash_info(&self, crash_info: &CrashInfo) -> anyhow::Result<()> {
let message = serde_json::to_string(crash_info)?;
let tags = extract_crash_info_tags(crash_info).unwrap_or_default();
let tracer_time = crash_info.timestamp.parse::<DateTime<Utc>>().map_or_else(
Expand All @@ -316,6 +297,8 @@ impl TelemetryCrashUploader {
.await
}

/// Shared helper that builds `data::Telemetry` payload and calls `send_telemetry_payload`
/// to send the payload to the telemetry log intake.
async fn send_log_payload(
&self,
message: String,
Expand Down Expand Up @@ -347,6 +330,7 @@ impl TelemetryCrashUploader {
self.send_telemetry_payload(&payload).await
}

/// Helper to perform actual HTTP (or file) submission via configured telemetry client
async fn send_telemetry_payload(&self, payload: &data::Telemetry<'_>) -> anyhow::Result<()> {
let client = libdd_telemetry::worker::http_client::from_config(&self.cfg);
let req = request_builder(&self.cfg)?
Expand Down Expand Up @@ -379,6 +363,43 @@ impl TelemetryCrashUploader {

Ok(())
}

fn build_crash_ping_tags(&self, crash_uuid: &str, sig_info: Option<&SigInfo>) -> String {
let metadata = &self.metadata;
let mut tags = format!(
"uuid:{},is_crash_ping:true,service:{},language_name:{},language_version:{},tracer_version:{}",
crash_uuid,
metadata.application.service_name,
metadata.application.language_name,
metadata.application.language_version,
metadata.application.tracer_version
);

if let Some(sig_info) = sig_info {
tags.push_str(&format!(
",si_code_human_readable:{:?},si_signo:{},si_signo_human_readable:{:?}",
sig_info.si_code_human_readable,
sig_info.si_signo,
sig_info.si_signo_human_readable
));
}

self.append_optional_tags(&mut tags);
tags
}

fn append_optional_tags(&self, tags: &mut String) {
let metadata = &self.metadata;
if let Some(env) = &metadata.application.env {
tags.push_str(&format!(",env:{env}"));
}
if let Some(runtime_name) = &metadata.application.runtime_name {
tags.push_str(&format!(",runtime_name:{runtime_name}"));
}
if let Some(runtime_version) = &metadata.application.runtime_version {
tags.push_str(&format!(",runtime_version:{runtime_version}"));
}
}
}

fn extract_crash_info_tags(crash_info: &CrashInfo) -> anyhow::Result<String> {
Expand Down Expand Up @@ -423,6 +444,7 @@ mod tests {
use super::TelemetryCrashUploader;
use crate::crash_info::{test_utils::TestInstance, CrashInfo, CrashInfoBuilder, Metadata};
use libdd_common::Endpoint;
use libdd_telemetry::data::LogLevel;
use std::{collections::HashSet, fs};
use uuid::Uuid;

Expand Down Expand Up @@ -469,7 +491,7 @@ mod tests {
.unwrap();
let test_instance = super::CrashInfo::test_instance(seed);

t.upload_to_telemetry(&test_instance).await.unwrap();
t.upload_crash_info(&test_instance).await.unwrap();

let payload: serde_json::value::Value =
serde_json::de::from_str(&fs::read_to_string(&output_filename).unwrap()).unwrap();
Expand Down Expand Up @@ -877,4 +899,48 @@ mod tests {
.contains("crash processing started"));
Ok(())
}

#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn test_general_log_upload() -> anyhow::Result<()> {
let tmp = tempfile::tempdir().unwrap();
let output_filename = {
let mut p = tmp.keep();
p.push("general_log_upload");
p
};

let mut uploader = new_test_uploader(7);
uploader
.cfg
.set_host_from_url(&format!("file://{}", output_filename.to_str().unwrap()))?;

uploader
.upload_general_log(
"hello general log".to_string(),
"service:foo,env:bar,crash_uuid:1234567890".to_string(),
LogLevel::Warn,
)
.await?;

let payload: serde_json::value::Value =
serde_json::de::from_str(&fs::read_to_string(&output_filename).unwrap())?;
println!("payload: {:?}", payload.to_string());

assert_eq!(payload["api_version"], "v2");
assert_eq!(payload["request_type"], "logs");
assert_eq!(payload["origin"], "Crashtracker");

let log_entry = &payload["payload"][0];
assert_eq!(log_entry["level"], "WARN");
assert_eq!(log_entry["is_sensitive"], false);
assert_eq!(log_entry["is_crash"], false);
assert_eq!(log_entry["message"], "hello general log");
let tags = log_entry["tags"].as_str().unwrap();
assert!(tags.contains("service:foo"));
assert!(tags.contains("env:bar"));
assert!(tags.contains("crash_uuid:1234567890"));

Ok(())
}
}
Loading
Loading