Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions crates/dogstatsd/src/aggregator_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,18 @@ use crate::metric::{Metric, SortedTags};
use datadog_protos::metrics::SketchPayload;
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, error, warn};
use ustr::Ustr;

#[derive(Debug)]
pub enum AggregatorCommand {
InsertBatch(Vec<Metric>),
Flush(oneshot::Sender<FlushResponse>),
GetEntryById {
name: Ustr,
tags: Option<SortedTags>,
timestamp: i64,
response_tx: oneshot::Sender<Option<Metric>>,
},
Shutdown,
}

Expand Down Expand Up @@ -45,6 +52,27 @@ impl AggregatorHandle {
.map_err(|e| format!("Failed to receive flush response: {}", e))
}

pub async fn get_entry_by_id(
&self,
name: Ustr,
tags: Option<SortedTags>,
timestamp: i64,
) -> Result<Option<Metric>, String> {
let (response_tx, response_rx) = oneshot::channel();
self.tx
.send(AggregatorCommand::GetEntryById {
name,
tags,
timestamp,
response_tx,
})
.map_err(|e| format!("Failed to send get_entry_by_id command: {}", e))?;

response_rx
.await
.map_err(|e| format!("Failed to receive get_entry_by_id response: {}", e))
}

pub fn shutdown(&self) -> Result<(), mpsc::error::SendError<AggregatorCommand>> {
self.tx.send(AggregatorCommand::Shutdown)
}
Expand Down Expand Up @@ -102,6 +130,19 @@ impl AggregatorService {
}
}

AggregatorCommand::GetEntryById {
name,
tags,
timestamp,
response_tx,
} => {
let entry = self.aggregator.get_entry_by_id(name, &tags, timestamp);
let response = entry.cloned();
if let Err(_) = response_tx.send(response) {
error!("Failed to send get_entry_by_id response - receiver dropped");
Comment on lines +141 to +142
Copy link

Copilot AI Aug 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The error value is being ignored with _. Consider logging the error or using a more descriptive variable name like _send_error to clarify the intentional discard.

Suggested change
if let Err(_) = response_tx.send(response) {
error!("Failed to send get_entry_by_id response - receiver dropped");
if let Err(send_error) = response_tx.send(response) {
error!("Failed to send get_entry_by_id response - receiver dropped: {:?}", send_error);

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope

}
}

AggregatorCommand::Shutdown => {
debug!("Aggregator service shutting down");
break;
Expand Down Expand Up @@ -174,4 +215,44 @@ mod tests {
handle.shutdown().expect("Failed to shutdown");
service_task.await.expect("Service task failed");
}

#[tokio::test]
async fn test_aggregator_service_get_entry_by_id() {
use ustr::ustr;

let (service, handle) =
AggregatorService::new(EMPTY_TAGS, 1000).expect("Failed to create aggregator service");

let service_task = tokio::spawn(service.run());

let metric = parse("test_metric:42|c|#env:prod").expect("metric parse failed");
let metric_name = metric.name;
let metric_tags = metric.tags.clone();
let metric_timestamp = metric.timestamp;

handle
.insert_batch(vec![metric.clone()])
.expect("Failed to insert metric");

let result = handle
.get_entry_by_id(metric_name, metric_tags.clone(), metric_timestamp)
.await
.expect("Failed to get entry");

assert!(result.is_some());
let retrieved_metric = result.unwrap();
assert_eq!(retrieved_metric.name, metric_name);
assert_eq!(retrieved_metric.timestamp, metric_timestamp);

let non_existent = handle
.get_entry_by_id(ustr("non_existent"), None, 0)
.await
.expect("Failed to get entry");

assert!(non_existent.is_none());

// Shutdown the service
handle.shutdown().expect("Failed to shutdown");
service_task.await.expect("Service task failed");
}
}
Loading