diff --git a/crates/dogstatsd/src/aggregator_service.rs b/crates/dogstatsd/src/aggregator_service.rs index 04222529..820351c4 100644 --- a/crates/dogstatsd/src/aggregator_service.rs +++ b/crates/dogstatsd/src/aggregator_service.rs @@ -7,11 +7,18 @@ use crate::metric::{Metric, SortedTags}; use datadog_protos::metrics::SketchPayload; use tokio::sync::{mpsc, oneshot}; use tracing::{debug, error, warn}; +use ustr::Ustr; #[derive(Debug)] pub enum AggregatorCommand { InsertBatch(Vec), Flush(oneshot::Sender), + GetEntryById { + name: Ustr, + tags: Option, + timestamp: i64, + response_tx: oneshot::Sender>, + }, Shutdown, } @@ -45,6 +52,27 @@ impl AggregatorHandle { .map_err(|e| format!("Failed to receive flush response: {}", e)) } + pub async fn get_entry_by_id( + &self, + name: Ustr, + tags: Option, + timestamp: i64, + ) -> Result, String> { + let (response_tx, response_rx) = oneshot::channel(); + self.tx + .send(AggregatorCommand::GetEntryById { + name, + tags, + timestamp, + response_tx, + }) + .map_err(|e| format!("Failed to send get_entry_by_id command: {}", e))?; + + response_rx + .await + .map_err(|e| format!("Failed to receive get_entry_by_id response: {}", e)) + } + pub fn shutdown(&self) -> Result<(), mpsc::error::SendError> { self.tx.send(AggregatorCommand::Shutdown) } @@ -102,6 +130,19 @@ impl AggregatorService { } } + AggregatorCommand::GetEntryById { + name, + tags, + timestamp, + response_tx, + } => { + let entry = self.aggregator.get_entry_by_id(name, &tags, timestamp); + let response = entry.cloned(); + if let Err(_) = response_tx.send(response) { + error!("Failed to send get_entry_by_id response - receiver dropped"); + } + } + AggregatorCommand::Shutdown => { debug!("Aggregator service shutting down"); break; @@ -174,4 +215,44 @@ mod tests { handle.shutdown().expect("Failed to shutdown"); service_task.await.expect("Service task failed"); } + + #[tokio::test] + async fn test_aggregator_service_get_entry_by_id() { + use ustr::ustr; + + let (service, handle) = + AggregatorService::new(EMPTY_TAGS, 1000).expect("Failed to create aggregator service"); + + let service_task = tokio::spawn(service.run()); + + let metric = parse("test_metric:42|c|#env:prod").expect("metric parse failed"); + let metric_name = metric.name; + let metric_tags = metric.tags.clone(); + let metric_timestamp = metric.timestamp; + + handle + .insert_batch(vec![metric.clone()]) + .expect("Failed to insert metric"); + + let result = handle + .get_entry_by_id(metric_name, metric_tags.clone(), metric_timestamp) + .await + .expect("Failed to get entry"); + + assert!(result.is_some()); + let retrieved_metric = result.unwrap(); + assert_eq!(retrieved_metric.name, metric_name); + assert_eq!(retrieved_metric.timestamp, metric_timestamp); + + let non_existent = handle + .get_entry_by_id(ustr("non_existent"), None, 0) + .await + .expect("Failed to get entry"); + + assert!(non_existent.is_none()); + + // Shutdown the service + handle.shutdown().expect("Failed to shutdown"); + service_task.await.expect("Service task failed"); + } }