Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion .github/workflows/cargo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,9 @@ jobs:
shell: bash
run: chmod +x ./scripts/install-protoc.sh && ./scripts/install-protoc.sh $HOME
- shell: bash
run: cargo nextest run --workspace
run: |
if [[ "${{ inputs.runner }}" == "windows-2022" ]]; then
cargo nextest run --workspace --features datadog-serverless-compat/windows-pipes
else
cargo nextest run --workspace
fi
9 changes: 8 additions & 1 deletion crates/datadog-serverless-compat/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,13 +226,20 @@ async fn start_dogstatsd(
// 2. Start the aggregator service in the background
tokio::spawn(service.run());

#[cfg(all(windows, feature = "windows-pipes"))]
let dogstatsd_config = DogStatsDConfig {
host: AGENT_HOST.to_string(),
port,
metric_namespace,
#[cfg(all(windows, feature = "windows-pipes"))]
windows_pipe_name,
};

#[cfg(not(all(windows, feature = "windows-pipes")))]
let dogstatsd_config = DogStatsDConfig {
host: AGENT_HOST.to_string(),
port,
metric_namespace,
};
Comment thread
duncanpharvey marked this conversation as resolved.
let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new();

// 3. Use handle in DogStatsD (cheap to clone)
Expand Down
36 changes: 36 additions & 0 deletions crates/datadog-trace-agent/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -560,3 +560,39 @@ mod tests {
assert_eq!(config.tags.function_tags(), None);
}
}

/// Test helpers for creating Config instances in tests
#[cfg(any(test, debug_assertions))]
pub mod test_helpers {
use super::*;

/// Create a test config with TCP transport
#[allow(clippy::unwrap_used)]
pub fn create_tcp_test_config(port: u16) -> Config {
Config {
dd_site: "mock-datadoghq.com".to_string(),
dd_apm_receiver_port: port,
#[cfg(any(all(windows, feature = "windows-pipes"), test))]
dd_apm_windows_pipe_name: None,
dd_dogstatsd_port: 8125,
#[cfg(any(all(windows, feature = "windows-pipes"), test))]
dd_dogstatsd_windows_pipe_name: None,
env_type: trace_utils::EnvironmentType::AzureFunction,
app_name: Some("test-app".to_string()),
max_request_content_length: 10_000_000,
obfuscation_config: obfuscation_config::ObfuscationConfig::new().unwrap(),
os: std::env::consts::OS.to_string(),
tags: Tags::new(),
stats_flush_interval_secs: 10,
trace_flush_interval_secs: 5,
trace_intake: Endpoint::default(),
trace_stats_intake: Endpoint::default(),
profiling_intake: Endpoint::default(),
proxy_request_timeout_secs: 30,
proxy_request_max_retries: 3,
proxy_request_retry_backoff_base_ms: 100,
verify_env_timeout_ms: 1000,
proxy_url: None,
}
}
}
149 changes: 149 additions & 0 deletions crates/datadog-trace-agent/tests/common/mock_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

//! Simple mock HTTP server for testing flushers

use http_body_util::BodyExt;
use hyper::{body::Incoming, Request, Response};
use hyper_util::rt::TokioIo;
use libdd_common::hyper_migration;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use tokio::net::TcpListener;

#[derive(Clone, Debug)]
pub struct ReceivedRequest {
pub method: String,
pub path: String,
pub headers: Vec<(String, String)>,
pub body: Vec<u8>,
}

pub struct MockServer {
pub addr: SocketAddr,
pub received_requests: Arc<Mutex<Vec<ReceivedRequest>>>,
shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
}

impl MockServer {
/// Start a mock HTTP server on a random port
pub async fn start() -> Self {
let listener = TcpListener::bind("127.0.0.1:0")
.await
.expect("Failed to bind mock server");
let addr = listener.local_addr().expect("Failed to get local addr");

let received_requests = Arc::new(Mutex::new(Vec::new()));
let requests_clone = received_requests.clone();

let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();

tokio::spawn(async move {
loop {
tokio::select! {
result = listener.accept() => {
let (stream, _) = match result {
Ok(conn) => conn,
Err(e) => {
eprintln!("Mock server accept error: {}", e);
break;
}
};

let io = TokioIo::new(stream);
let requests = requests_clone.clone();

tokio::spawn(async move {
let service = hyper::service::service_fn(move |req: Request<Incoming>| {
let requests = requests.clone();
async move {
// Capture the request
let method = req.method().to_string();
let path = req.uri().path().to_string();
let headers: Vec<(String, String)> = req
.headers()
.iter()
.map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
.collect();

// Read the body
let body_bytes = req
.into_body()
.collect()
.await
.map(|collected| collected.to_bytes().to_vec())
.unwrap_or_default();

// Store the request
requests.lock().unwrap().push(ReceivedRequest {
method,
path,
headers,
body: body_bytes,
});

// Return 200 OK
Ok::<_, hyper::http::Error>(
Response::builder()
.status(200)
.body(hyper_migration::Body::from(r#"{"ok":true}"#))
.unwrap(),
)
}
});

let _ = hyper::server::conn::http1::Builder::new()
.serve_connection(io, service)
.await;
});
}
_ = &mut shutdown_rx => {
break;
}
}
}
});

MockServer {
addr,
received_requests,
shutdown_tx: Some(shutdown_tx),
}
}

/// Get the base URL of the mock server
pub fn url(&self) -> String {
format!("http://{}", self.addr)
}

/// Get all received requests
#[allow(dead_code)]
pub fn get_requests(&self) -> Vec<ReceivedRequest> {
self.received_requests.lock().unwrap().clone()
}

/// Get requests matching a path
pub fn get_requests_for_path(&self, path: &str) -> Vec<ReceivedRequest> {
self.received_requests
.lock()
.unwrap()
.iter()
.filter(|req| req.path == path)
.cloned()
.collect()
}

/// Clear all received requests
#[allow(dead_code)]
pub fn clear_requests(&self) {
self.received_requests.lock().unwrap().clear();
}
}

impl Drop for MockServer {
fn drop(&mut self) {
if let Some(shutdown_tx) = self.shutdown_tx.take() {
let _ = shutdown_tx.send(());
}
}
}
1 change: 1 addition & 0 deletions crates/datadog-trace-agent/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
//! Common test utilities, mocks, and helpers for integration tests

pub mod helpers;
pub mod mock_server;
pub mod mocks;
Loading
Loading