From dae50022a39c4242c39da815157387fc4749fa62 Mon Sep 17 00:00:00 2001 From: Blake Orth Date: Tue, 21 Oct 2025 11:28:16 -0600 Subject: [PATCH] Adds DELETE and HEAD instrumentation to CLI - Adds instrumentation to head requests in the instrumented object store - Adds instrumentation to delete requests in the instrumented object store - Adds tests for new code and updates existing tests --- .../src/object_storage/instrumented.rs | 106 +++++++++++++++++- datafusion-cli/tests/cli_integration.rs | 4 +- .../snapshots/object_store_profiling.snap | 5 + 3 files changed, 111 insertions(+), 4 deletions(-) diff --git a/datafusion-cli/src/object_storage/instrumented.rs b/datafusion-cli/src/object_storage/instrumented.rs index 722d4e1ce7a86..4f8626888ed91 100644 --- a/datafusion-cli/src/object_storage/instrumented.rs +++ b/datafusion-cli/src/object_storage/instrumented.rs @@ -194,6 +194,25 @@ impl InstrumentedObjectStore { Ok(ret) } + async fn instrumented_delete(&self, location: &Path) -> Result<()> { + let timestamp = Utc::now(); + let start = Instant::now(); + self.inner.delete(location).await?; + let elapsed = start.elapsed(); + + self.requests.lock().push(RequestDetails { + op: Operation::Delete, + path: location.clone(), + timestamp, + duration: Some(elapsed), + size: None, + range: None, + extra_display: None, + }); + + Ok(()) + } + fn instrumented_list( &self, prefix: Option<&Path>, @@ -235,6 +254,25 @@ impl InstrumentedObjectStore { Ok(ret) } + + async fn instrumented_head(&self, location: &Path) -> Result { + let timestamp = Utc::now(); + let start = Instant::now(); + let ret = self.inner.head(location).await?; + let elapsed = start.elapsed(); + + self.requests.lock().push(RequestDetails { + op: Operation::Head, + path: location.clone(), + timestamp, + duration: Some(elapsed), + size: None, + range: None, + extra_display: None, + }); + + Ok(ret) + } } impl fmt::Display for InstrumentedObjectStore { @@ -285,6 +323,10 @@ impl ObjectStore for InstrumentedObjectStore { }
async fn delete(&self, location: &Path) -> Result<()> { + if self.enabled() { + return self.instrumented_delete(location).await; + } + self.inner.delete(location).await } @@ -313,6 +355,10 @@ impl ObjectStore for InstrumentedObjectStore { } async fn head(&self, location: &Path) -> Result { + if self.enabled() { + return self.instrumented_head(location).await; + } + self.inner.head(location).await } } @@ -321,9 +367,9 @@ impl ObjectStore for InstrumentedObjectStore { #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)] pub enum Operation { _Copy, - _Delete, + Delete, Get, - _Head, + Head, List, Put, } @@ -753,6 +799,35 @@ mod tests { assert!(request.extra_display.is_none()); } + #[tokio::test] + async fn instrumented_store_delete() { + let (instrumented, path) = setup_test_store().await; + + // By default no requests should be instrumented/stored + assert!(instrumented.requests.lock().is_empty()); + instrumented.delete(&path).await.unwrap(); + assert!(instrumented.requests.lock().is_empty()); + + // We need a new store so we have data to delete again + let (instrumented, path) = setup_test_store().await; + instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace); + assert!(instrumented.requests.lock().is_empty()); + instrumented.delete(&path).await.unwrap(); + assert_eq!(instrumented.requests.lock().len(), 1); + + let mut requests = instrumented.take_requests(); + assert_eq!(requests.len(), 1); + assert!(instrumented.requests.lock().is_empty()); + + let request = requests.pop().unwrap(); + assert_eq!(request.op, Operation::Delete); + assert_eq!(request.path, path); + assert!(request.duration.is_some()); + assert!(request.size.is_none()); + assert!(request.range.is_none()); + assert!(request.extra_display.is_none()); + } + #[tokio::test] async fn instrumented_store_list() { let (instrumented, path) = setup_test_store().await; @@ -865,6 +940,33 @@ mod tests { assert!(request.extra_display.is_none()); } + #[tokio::test] + async fn 
instrumented_store_head() { + let (instrumented, path) = setup_test_store().await; + + // By default no requests should be instrumented/stored + assert!(instrumented.requests.lock().is_empty()); + let _ = instrumented.head(&path).await.unwrap(); + assert!(instrumented.requests.lock().is_empty()); + + instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace); + assert!(instrumented.requests.lock().is_empty()); + let _ = instrumented.head(&path).await.unwrap(); + assert_eq!(instrumented.requests.lock().len(), 1); + + let mut requests = instrumented.take_requests(); + assert_eq!(requests.len(), 1); + assert!(instrumented.requests.lock().is_empty()); + + let request = requests.pop().unwrap(); + assert_eq!(request.op, Operation::Head); + assert_eq!(request.path, path); + assert!(request.duration.is_some()); + assert!(request.size.is_none()); + assert!(request.range.is_none()); + assert!(request.extra_display.is_none()); + } + #[test] fn request_details() { let rd = RequestDetails { diff --git a/datafusion-cli/tests/cli_integration.rs b/datafusion-cli/tests/cli_integration.rs index 809e6fd32c4f0..c1395aa4f562c 100644 --- a/datafusion-cli/tests/cli_integration.rs +++ b/datafusion-cli/tests/cli_integration.rs @@ -411,8 +411,8 @@ async fn test_object_store_profiling() { // Output: // operation=Get duration=[DURATION] size=1006 path=cars.csv settings.add_filter( - r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?[+-]\d{2}:\d{2} operation=(Get|Put|Delete|List|Head) duration=\d+\.\d{6}s size=(\d+) path=(.*)", - " operation=$1 duration=[DURATION] size=$2 path=$3", + r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?[+-]\d{2}:\d{2} operation=(Get|Put|Delete|List|Head) duration=\d+\.\d{6}s (size=\d+\s+)?path=(.*)", + " operation=$1 duration=[DURATION] ${2}path=$3", ); // We also need to filter out the summary statistics (anything with an 's' at the end) diff --git a/datafusion-cli/tests/snapshots/object_store_profiling.snap 
b/datafusion-cli/tests/snapshots/object_store_profiling.snap index cff646f3b0e0c..029b07c324f5d 100644 --- a/datafusion-cli/tests/snapshots/object_store_profiling.snap +++ b/datafusion-cli/tests/snapshots/object_store_profiling.snap @@ -37,6 +37,7 @@ ObjectStore Profile mode set to Trace Object Store Profiling Instrumented Object Store: instrument_mode: Trace, inner: AmazonS3(data) + operation=Head duration=[DURATION] path=cars.csv operation=Get duration=[DURATION] size=1006 path=cars.csv Summaries: @@ -45,6 +46,8 @@ Summaries: +-----------+----------+-----------+-----------+-----------+-----------+-------+ | Get | duration | ...NORMALIZED...| 1 | | Get | size | 1006 B | 1006 B | 1006 B | 1006 B | 1 | +| Head | duration | ...NORMALIZED...| 1 | +| Head | size | | | | | 1 | +-----------+----------+-----------+-----------+-----------+-----------+-------+ ObjectStore Profile mode set to Summary +-----+-------+---------------------+ @@ -63,6 +66,8 @@ Summaries: +-----------+----------+-----------+-----------+-----------+-----------+-------+ | Get | duration | ...NORMALIZED...| 1 | | Get | size | 1006 B | 1006 B | 1006 B | 1006 B | 1 | +| Head | duration | ...NORMALIZED...| 1 | +| Head | size | | | | | 1 | +-----------+----------+-----------+-----------+-----------+-----------+-------+ ObjectStore Profile mode set to Disabled +-----+-------+---------------------+