Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 55 additions & 6 deletions libdd-profiling-ffi/src/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,41 @@ use libdd_common_ffi::{
wrap_with_ffi_result, wrap_with_void_ffi_result, Handle, Result, ToInner, VoidResult,
};
use libdd_profiling::exporter;
use libdd_profiling::exporter::{ExporterManager, MimeType, ProfileExporter};
use libdd_profiling::exporter::{ExporterManager, ProfileExporter};
use libdd_profiling::internal::EncodedProfile;
use std::borrow::Cow;
use std::str::FromStr;

type TokioCancellationToken = tokio_util::sync::CancellationToken;

/// MIME type for file attachments sent alongside a profile export.
///
/// `Invalid` (0) is the default and will cause an error if used — the
/// `TryFrom` conversion to the internal exporter MIME type rejects it, so
/// FFI callers must explicitly choose a real type for each attachment.
#[repr(C)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Default)]
pub enum MimeType {
    /// Sentinel for "not set"; rejected when converted for export.
    #[default]
    Invalid = 0,
    /// Maps to `exporter::MimeType::ApplicationJson`.
    ApplicationJson,
    /// Maps to `exporter::MimeType::ApplicationOctetStream`.
    ApplicationOctetStream,
    /// Maps to `exporter::MimeType::TextCsv`.
    TextCsv,
    /// Maps to `exporter::MimeType::TextPlain`.
    TextPlain,
    /// Maps to `exporter::MimeType::TextXml`.
    TextXml,
}

/// Converts an FFI [`MimeType`] into the internal exporter MIME type.
///
/// `Invalid` (the zero default) is rejected with an error so callers who
/// forget to set a MIME type fail loudly instead of exporting a mislabeled
/// attachment.
///
/// NOTE(review): prior releases sent no MIME type at all, so an alternative
/// design would map `Invalid` to `None` (optional MIME type) and preserve the
/// old behavior — confirm which contract FFI callers expect. Also, because
/// `MimeType` is `#[repr(C)]`, C callers could pass an out-of-range
/// discriminant; constructing such a value is already undefined behavior on
/// the Rust side, so this match only covers the declared variants.
impl TryFrom<MimeType> for exporter::MimeType {
    type Error = anyhow::Error;

    fn try_from(mime: MimeType) -> std::result::Result<Self, Self::Error> {
        match mime {
            MimeType::Invalid => anyhow::bail!("Invalid MIME type"),
            MimeType::ApplicationJson => Ok(exporter::MimeType::ApplicationJson),
            MimeType::ApplicationOctetStream => Ok(exporter::MimeType::ApplicationOctetStream),
            MimeType::TextCsv => Ok(exporter::MimeType::TextCsv),
            MimeType::TextPlain => Ok(exporter::MimeType::TextPlain),
            MimeType::TextXml => Ok(exporter::MimeType::TextXml),
        }
    }
}

#[allow(dead_code)]
#[repr(C)]
pub enum ProfilingEndpoint<'a> {
Expand Down Expand Up @@ -180,15 +208,15 @@ pub unsafe extern "C" fn ddog_prof_Exporter_drop(mut exporter: *mut Handle<Profi
drop(exporter.take())
}

unsafe fn into_vec_files<'a>(slice: Slice<'a, File>) -> Vec<exporter::File<'a>> {
unsafe fn into_vec_files<'a>(slice: Slice<'a, File>) -> anyhow::Result<Vec<exporter::File<'a>>> {
slice
.into_slice()
.iter()
.map(|file| {
let name = file.name.try_to_utf8().unwrap_or("{invalid utf-8}");
let bytes = file.file.as_slice();
let mime = file.mime;
exporter::File { name, bytes, mime }
let mime = file.mime.try_into()?;
Ok(exporter::File { name, bytes, mime })
})
.collect()
}
Expand Down Expand Up @@ -244,7 +272,7 @@ pub unsafe extern "C" fn ddog_prof_Exporter_send_blocking(
wrap_with_ffi_result!({
let exporter = exporter.to_inner_mut()?;
let profile = *profile.take()?;
let files_to_compress_and_export = into_vec_files(files_to_compress_and_export);
let files_to_compress_and_export = into_vec_files(files_to_compress_and_export)?;
let tags: Vec<Tag> = optional_additional_tags
.map(|tags| tags.iter().cloned().collect())
.unwrap_or_default();
Expand Down Expand Up @@ -398,7 +426,7 @@ pub unsafe extern "C" fn ddog_prof_ExporterManager_queue(
wrap_with_void_ffi_result!({
let manager = manager.to_inner_mut()?;
let profile = *profile.take()?;
let files_to_compress_and_export = into_vec_files(files_to_compress_and_export);
let files_to_compress_and_export = into_vec_files(files_to_compress_and_export)?;
let tags: Vec<Tag> = optional_additional_tags
.map(|tags| tags.iter().cloned().collect())
.unwrap_or_default();
Expand Down Expand Up @@ -728,4 +756,25 @@ mod tests {
}
}
}

#[test]
fn test_invalid_mime_type_returns_error() {
    // The default (0) MIME type must be rejected rather than silently
    // exported with a bogus label.
    let payload = b"test data";
    let file = File {
        name: CharSlice::from("test.bin"),
        file: ByteSlice::from(&payload[..]),
        mime: MimeType::Invalid,
    };

    // SAFETY: `file` outlives the slice and the length (1) is correct.
    let result = unsafe { into_vec_files(Slice::from_raw_parts(&file as *const File, 1)) };

    assert!(result.is_err(), "Invalid MIME type should return an error");
    let error_msg = result.unwrap_err().to_string();
    assert!(
        error_msg.contains("Invalid MIME type"),
        "Error message should mention invalid MIME type, got: {error_msg}"
    );
}
}
45 changes: 38 additions & 7 deletions libdd-profiling/src/exporter/profile_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use super::errors::SendError;
use super::file_exporter::spawn_dump_server;
use anyhow::Context;
use libdd_common::tag::Tag;
use libdd_common::{azure_app_services, tag, Endpoint};
use libdd_common::{azure_app_services, entity_id, header, tag, Endpoint};
use reqwest::RequestBuilder;
use serde_json::json;
use std::io::Write;
Expand All @@ -38,6 +38,16 @@ use tokio_util::sync::CancellationToken;
use crate::internal::{EncodedProfile, Profile};
use crate::profiles::{Compressor, DefaultProfileCodec};

/// Helper to create a header map containing `Content-Encoding: zstd`,
/// attached to multipart parts whose payload was zstd-compressed.
fn create_zstd_headers() -> reqwest::header::HeaderMap {
    use reqwest::header::{HeaderMap, HeaderValue, CONTENT_ENCODING};

    // Exactly one header is ever stored, so reserve for one entry.
    let mut map = HeaderMap::with_capacity(1);
    map.insert(CONTENT_ENCODING, HeaderValue::from_static("zstd"));
    map
}

#[derive(Debug)]
pub struct ProfileExporter {
client: reqwest::Client,
Expand All @@ -48,7 +58,6 @@ pub struct ProfileExporter {
runtime: Option<Runtime>,
}

#[repr(C)]
#[derive(Debug, Copy, Clone)]
pub enum MimeType {
ApplicationJson,
Expand Down Expand Up @@ -185,6 +194,22 @@ impl ProfileExporter {
);
}

// Add container ID header if available
if let Some(container_id) = entity_id::get_container_id() {
headers.insert(
header::DATADOG_CONTAINER_ID,
reqwest::header::HeaderValue::from_static(container_id),
);
}

// Add entity ID header if available
if let Some(entity_id_value) = entity_id::get_entity_id() {
headers.insert(
header::DATADOG_ENTITY_ID,
reqwest::header::HeaderValue::from_static(entity_id_value),
);
}

// Add Azure App Services tags if available
if let Some(aas) = &*azure_app_services::AAS_METADATA {
let aas_tags = [
Expand Down Expand Up @@ -413,10 +438,10 @@ impl ProfileExporter {
"event",
reqwest::multipart::Part::bytes(event_bytes)
.file_name("event.json")
.mime_str("application/json")?,
.mime_str(mime::APPLICATION_JSON.as_ref())?,
);

// Add additional files (compressed)
// Add additional files (compressed with zstd)
for file in additional_files {
let mut encoder = Compressor::<DefaultProfileCodec>::try_new(
(file.bytes.len() >> 3).next_power_of_two(),
Expand All @@ -428,14 +453,20 @@ impl ProfileExporter {

form = form.part(
file.name.to_string(),
reqwest::multipart::Part::bytes(encoder.finish()?).file_name(file.name.to_string()),
reqwest::multipart::Part::bytes(encoder.finish()?)
.file_name(file.name.to_string())
.mime_str(file.mime.as_str())?
.headers(create_zstd_headers()),
);
}

// Add profile
// Add profile (already compressed with zstd)
Ok(form.part(
"profile.pprof",
reqwest::multipart::Part::bytes(profile.buffer).file_name("profile.pprof"),
reqwest::multipart::Part::bytes(profile.buffer)
.file_name("profile.pprof")
.mime_str(mime::APPLICATION_OCTET_STREAM.as_ref())?
.headers(create_zstd_headers()),
))
}
}
2 changes: 1 addition & 1 deletion libdd-profiling/src/exporter/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ fn parse_multipart(content_type: &str, body: &[u8]) -> anyhow::Result<Vec<Multip
.context("No boundary parameter found in Content-Type")?
.as_str();

// Parse multipart body
// Parse multipart body using the library
let cursor = Cursor::new(body);
let mut multipart = Multipart::with_body(cursor, boundary);
let mut parts = Vec::new();
Expand Down
150 changes: 150 additions & 0 deletions libdd-profiling/tests/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

//! Common test utilities shared across exporter tests

#![allow(dead_code)] // Test utilities are used across test modules

use libdd_profiling::exporter::utils::{parse_http_request, HttpRequest, MultipartPart};
use libdd_profiling::exporter::{File, MimeType, ProfileExporter};
use std::path::PathBuf;

/// Test constants
// Library name/version reported by the exporter under test.
pub const TEST_LIB_NAME: &str = "dd-trace-foo";
pub const TEST_LIB_VERSION: &str = "1.2.3";
// Milliseconds to wait for the asynchronously-dumped request file to land on
// disk before reading it back.
pub const FILE_WRITE_DELAY_MS: u64 = 200;

/// RAII guard that deletes the wrapped file when dropped, ensuring test
/// artifacts are cleaned up even if the test panics.
pub struct TempFileGuard(PathBuf);

impl Drop for TempFileGuard {
    fn drop(&mut self) {
        // Best-effort removal: the file may never have been created.
        let _ = std::fs::remove_file(&self.0);
    }
}

impl std::ops::Deref for TempFileGuard {
    type Target = PathBuf;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl AsRef<std::path::Path> for TempFileGuard {
    fn as_ref(&self) -> &std::path::Path {
        self.0.as_path()
    }
}

/// Create a file-based exporter and return the temp file path with auto-cleanup
pub fn create_file_exporter(
    profiling_library_name: &str,
    profiling_library_version: &str,
    family: &str,
    tags: Vec<libdd_common::tag::Tag>,
    api_key: Option<&str>,
) -> anyhow::Result<(ProfileExporter, TempFileGuard)> {
    use libdd_profiling::exporter::config;

    // Unique temp path: pid + wall-clock nanos + random token avoids
    // collisions across concurrent test processes.
    let path = std::env::temp_dir().join(format!(
        "libdd_test_{}_{}_{:x}.http",
        std::process::id(),
        chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0),
        rand::random::<u64>()
    ));

    let mut endpoint = config::file(path.to_string_lossy().as_ref())?;
    // Only override the key when the caller supplied one; otherwise keep
    // whatever `config::file` set.
    if let Some(key) = api_key {
        endpoint.api_key = Some(key.to_string().into());
    }

    let exporter = ProfileExporter::new(
        profiling_library_name,
        profiling_library_version,
        family,
        tags,
        endpoint,
    )?;

    Ok((exporter, TempFileGuard(path)))
}

/// Read and parse the dumped HTTP request file
pub fn read_and_parse_request(file_path: &std::path::Path) -> anyhow::Result<HttpRequest> {
// Wait for file to be written
std::thread::sleep(std::time::Duration::from_millis(FILE_WRITE_DELAY_MS));
let request_bytes = std::fs::read(file_path)?;
parse_http_request(&request_bytes)
}

/// Extract and parse the event.json part from multipart request
pub fn extract_event_json(request: &HttpRequest) -> anyhow::Result<serde_json::Value> {
    // Scan for the first part whose filename is event.json and decode it.
    for part in &request.multipart_parts {
        if part.filename.as_deref() == Some("event.json") {
            return Ok(serde_json::from_slice(&part.content)?);
        }
    }
    Err(anyhow::anyhow!("event.json part not found"))
}

/// Create standard test additional files with different MIME types
pub fn create_test_additional_files() -> Vec<File<'static>> {
    // One binary attachment and one JSON attachment, exercising both
    // MIME-type paths.
    let jit = File {
        name: "jit.pprof",
        bytes: b"fake-jit-data",
        mime: MimeType::ApplicationOctetStream,
    };
    let metadata = File {
        name: "metadata.json",
        bytes: b"{\"test\": true}",
        mime: MimeType::ApplicationJson,
    };
    vec![jit, metadata]
}

/// Assert that a multipart part has the expected MIME type
pub fn assert_mime_type(parts: &[MultipartPart], part_name: &str, expected_mime: &str) {
    // Locate the part by name; a missing part is a test failure.
    let Some(part) = parts.iter().find(|p| p.name == part_name) else {
        panic!("{} part should exist", part_name);
    };
    assert_eq!(
        part.content_type.as_deref(),
        Some(expected_mime),
        "{} should have {} content type",
        part_name,
        expected_mime
    );
}

/// Assert all standard MIME types for a complete export
/// (event, profile.pprof, jit.pprof, metadata.json)
pub fn assert_all_standard_mime_types(parts: &[MultipartPart]) {
    // Table-driven: part name paired with the content type it must carry.
    const EXPECTED: [(&str, &str); 4] = [
        ("event", "application/json"),
        ("profile.pprof", "application/octet-stream"),
        ("jit.pprof", "application/octet-stream"),
        ("metadata.json", "application/json"),
    ];
    for (part_name, mime) in EXPECTED {
        assert_mime_type(parts, part_name, mime);
    }
}

/// Assert that the expected number of parts have Content-Encoding: zstd
///
/// Note: Due to limitations in the multipart parsing library, we verify this
/// by checking the raw HTTP request body contains the header string. HTTP
/// header names are case-insensitive, so the body is lowercased once before
/// counting — the previous version only matched two exact capitalizations
/// and would miss variants like "Content-encoding: zstd".
pub fn assert_compressed_parts_have_encoding(request: &HttpRequest, expected_count: usize) {
    // Lossy conversion is fine: compressed payload bytes won't form the
    // header string, and the part headers themselves are ASCII.
    let body_lower = String::from_utf8_lossy(&request.body).to_lowercase();
    let encoding_count = body_lower.matches("content-encoding: zstd").count();

    assert_eq!(
        encoding_count, expected_count,
        "Expected exactly {expected_count} Content-Encoding: zstd headers in multipart body, found {encoding_count}"
    );
}
Loading
Loading