Skip to content
118 changes: 117 additions & 1 deletion bottlecap/src/traces/trace_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ const DSM_ENDPOINT_PATH: &str = "/api/v0.1/pipeline_stats";
const DSM_AGENT_PATH: &str = "/v0.1/pipeline_stats";
const PROFILING_ENDPOINT_PATH: &str = "/profiling/v1/input";
const PROFILING_BACKEND_PATH: &str = "/api/v2/profile";
const LLM_OBS_SPANS_INTAKE_PATH: &str = "/api/v2/llmobs";
const LLM_OBS_EVAL_METRIC_INTAKE_PATH: &str = "/api/intake/llm-obs/v1/eval-metric";
const LLM_OBS_EVAL_METRIC_INTAKE_PATH_V2: &str = "/api/intake/llm-obs/v2/eval-metric";
const LLM_OBS_EVAL_METRIC_ENDPOINT_PATH: &str = "/evp_proxy/v2/api/intake/llm-obs/v1/eval-metric";
const LLM_OBS_EVAL_METRIC_ENDPOINT_PATH_V2: &str =
"/evp_proxy/v2/api/intake/llm-obs/v2/eval-metric";
const LLM_OBS_SPANS_ENDPOINT_PATH: &str = "/evp_proxy/v2/api/v2/llmobs";
const DD_ADDITIONAL_TAGS_HEADER: &str = "X-Datadog-Additional-Tags";
const INFO_ENDPOINT_PATH: &str = "/info";
const TRACER_PAYLOAD_CHANNEL_BUFFER_SIZE: usize = 10;
Expand Down Expand Up @@ -181,6 +188,7 @@ impl TraceAgent {
}

#[allow(clippy::too_many_arguments)]
#[allow(clippy::pedantic)]
async fn trace_endpoint_handler(
config: Arc<config::Config>,
req: Request<Body>,
Expand Down Expand Up @@ -257,6 +265,51 @@ impl TraceAgent {
),
}
}
(&Method::POST, LLM_OBS_EVAL_METRIC_ENDPOINT_PATH) => {
match Self::handle_llm_obs_eval_metric_proxy(
config,
tags_provider,
api_key,
client,
req,
)
.await
{
Ok(result) => Ok(result),
Err(err) => log_and_create_http_response(
&format!("LLM OBS Eval Metric endpoint error: {err}"),
StatusCode::INTERNAL_SERVER_ERROR,
),
}
}
(&Method::POST, LLM_OBS_EVAL_METRIC_ENDPOINT_PATH_V2) => {
match Self::handle_llm_obs_eval_metric_proxy_v2(
config,
tags_provider,
api_key,
client,
req,
)
.await
{
Ok(result) => Ok(result),
Err(err) => log_and_create_http_response(
&format!("LLM OBS Eval Metric endpoint error: {err}"),
StatusCode::INTERNAL_SERVER_ERROR,
),
}
}
(&Method::POST, LLM_OBS_SPANS_ENDPOINT_PATH) => {
match Self::handle_llm_obs_spans_proxy(config, tags_provider, api_key, client, req)
.await
{
Ok(result) => Ok(result),
Err(err) => log_and_create_http_response(
&format!("LLM OBS Spans endpoint error: {err}"),
StatusCode::INTERNAL_SERVER_ERROR,
),
}
}
(_, INFO_ENDPOINT_PATH) => match Self::info_handler() {
Ok(result) => Ok(result),
Err(err) => log_and_create_http_response(
Expand Down Expand Up @@ -354,7 +407,10 @@ impl TraceAgent {
STATS_ENDPOINT_PATH,
DSM_AGENT_PATH,
PROFILING_ENDPOINT_PATH,
INFO_ENDPOINT_PATH
INFO_ENDPOINT_PATH,
LLM_OBS_EVAL_METRIC_ENDPOINT_PATH,
LLM_OBS_EVAL_METRIC_ENDPOINT_PATH_V2,
LLM_OBS_SPANS_ENDPOINT_PATH,
],
"client_drop_p0s": true,
}
Expand Down Expand Up @@ -490,6 +546,66 @@ impl TraceAgent {
.await
}

async fn handle_llm_obs_eval_metric_proxy(
config: Arc<config::Config>,
tags_provider: Arc<provider::Provider>,
api_key: String,
client: reqwest::Client,
req: Request<Body>,
) -> http::Result<Response<Body>> {
Self::handle_proxy(
config,
client,
api_key,
tags_provider,
req,
"api",
LLM_OBS_EVAL_METRIC_INTAKE_PATH,
"llm_obs_eval_metric",
)
.await
}

async fn handle_llm_obs_eval_metric_proxy_v2(
config: Arc<config::Config>,
tags_provider: Arc<provider::Provider>,
api_key: String,
client: reqwest::Client,
req: Request<Body>,
) -> http::Result<Response<Body>> {
Self::handle_proxy(
config,
client,
api_key,
tags_provider,
req,
"api",
LLM_OBS_EVAL_METRIC_INTAKE_PATH_V2,
"llm_obs_eval_metric",
)
.await
}

async fn handle_llm_obs_spans_proxy(
config: Arc<config::Config>,
tags_provider: Arc<provider::Provider>,
api_key: String,
client: reqwest::Client,
req: Request<Body>,
) -> http::Result<Response<Body>> {
Self::handle_proxy(
config,
client,
api_key,
tags_provider,
req,
"llmobs-intake",
LLM_OBS_SPANS_INTAKE_PATH,
"llm_obs_spans",
)
.await
}

#[must_use]
pub fn get_sender_copy(&self) -> Sender<SendData> {
self.tx.clone()
Expand Down
Loading