From 2727a8c12f05b853885e5137485e2d59a57edb1d Mon Sep 17 00:00:00 2001
From: Blake Orth
Date: Fri, 17 Oct 2025 12:49:42 -0600
Subject: [PATCH 1/2] Adds instrumentation to PUT ops in the CLI

- Adds instrumentation around put_opts
- Adds instrumentation around put_multipart
- Adds tests for newly instrumented methods
---
 .../src/object_storage/instrumented.rs | 130 +++++++++++++++++-
 1 file changed, 127 insertions(+), 3 deletions(-)

diff --git a/datafusion-cli/src/object_storage/instrumented.rs b/datafusion-cli/src/object_storage/instrumented.rs
index 94445ee64ef4c..c9e153b46d4bd 100644
--- a/datafusion-cli/src/object_storage/instrumented.rs
+++ b/datafusion-cli/src/object_storage/instrumented.rs
@@ -119,6 +119,54 @@ impl InstrumentedObjectStore {
             != InstrumentedObjectStoreMode::Disabled as u8
     }

+    async fn instrumented_put_opts(
+        &self,
+        location: &Path,
+        payload: PutPayload,
+        opts: PutOptions,
+    ) -> Result<PutResult> {
+        let timestamp = Utc::now();
+        let start = Instant::now();
+        let size = payload.content_length();
+        let ret = self.inner.put_opts(location, payload, opts).await?;
+        let elapsed = start.elapsed();
+
+        self.requests.lock().push(RequestDetails {
+            op: Operation::Put,
+            path: location.clone(),
+            timestamp,
+            duration: Some(elapsed),
+            size: Some(size),
+            range: None,
+            extra_display: None,
+        });
+
+        Ok(ret)
+    }
+
+    async fn instrumented_put_multipart(
+        &self,
+        location: &Path,
+        opts: PutMultipartOptions,
+    ) -> Result<Box<dyn MultipartUpload>> {
+        let timestamp = Utc::now();
+        let start = Instant::now();
+        let ret = self.inner.put_multipart_opts(location, opts).await?;
+        let elapsed = start.elapsed();
+
+        self.requests.lock().push(RequestDetails {
+            op: Operation::Put,
+            path: location.clone(),
+            timestamp,
+            duration: Some(elapsed),
+            size: None,
+            range: None,
+            extra_display: None,
+        });
+
+        Ok(ret)
+    }
+
     async fn instrumented_get_opts(
         &self,
         location: &Path,
@@ -207,6 +255,10 @@ impl ObjectStore for InstrumentedObjectStore {
         payload: PutPayload,
         opts: PutOptions,
     ) -> Result<PutResult> {
+        if self.enabled() {
+            return self.instrumented_put_opts(location, payload, opts).await;
+        }
+
         self.inner.put_opts(location, payload, opts).await
     }

@@ -215,6 +267,10 @@ impl ObjectStore for InstrumentedObjectStore {
         location: &Path,
         opts: PutMultipartOptions,
     ) -> Result<Box<dyn MultipartUpload>> {
+        if self.enabled() {
+            return self.instrumented_put_multipart(location, opts).await;
+        }
+
         self.inner.put_multipart_opts(location, opts).await
     }

@@ -267,7 +323,7 @@ pub enum Operation {
     Get,
     _Head,
     List,
-    _Put,
+    Put,
 }

 /// Holds profiling details about individual requests made through an [`InstrumentedObjectStore`]
@@ -477,6 +533,8 @@ impl ObjectStoreRegistry for InstrumentedObjectStoreRegistry {

 #[cfg(test)]
 mod tests {
+    use object_store::WriteMultipart;
+
     use super::*;

     #[test]
@@ -618,6 +676,72 @@ mod tests {
         assert!(request.extra_display.is_none());
     }

+    #[tokio::test]
+    async fn instrumented_store_put_opts() {
+        // The `setup_test_store()` method comes with data already `put` into it, so we'll setup
+        // manually for this test
+        let store = Arc::new(object_store::memory::InMemory::new());
+        let mode = AtomicU8::new(InstrumentedObjectStoreMode::default() as u8);
+        let instrumented = InstrumentedObjectStore::new(store, mode);
+
+        let path = Path::from("test/data");
+        let payload = PutPayload::from_static(b"test_data");
+        let size = payload.content_length();
+
+        // By default no requests should be instrumented/stored
+        assert!(instrumented.requests.lock().is_empty());
+        instrumented.put(&path, payload.clone()).await.unwrap();
+        assert!(instrumented.requests.lock().is_empty());
+
+        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
+        assert!(instrumented.requests.lock().is_empty());
+        instrumented.put(&path, payload).await.unwrap();
+        assert_eq!(instrumented.requests.lock().len(), 1);
+
+        let request = instrumented.take_requests().pop().unwrap();
+        assert_eq!(request.op, Operation::Put);
+        assert_eq!(request.path, path);
+        assert!(request.duration.is_some());
+        assert_eq!(request.size.unwrap(), size);
+        assert!(request.range.is_none());
+        assert!(request.extra_display.is_none());
+    }
+
+    #[tokio::test]
+    async fn instrumented_store_put_multipart() {
+        // The `setup_test_store()` method comes with data already `put` into it, so we'll setup
+        // manually for this test
+        let store = Arc::new(object_store::memory::InMemory::new());
+        let mode = AtomicU8::new(InstrumentedObjectStoreMode::default() as u8);
+        let instrumented = InstrumentedObjectStore::new(store, mode);
+
+        let path = Path::from("test/data");
+
+        // By default no requests should be instrumented/stored
+        assert!(instrumented.requests.lock().is_empty());
+        let mp = instrumented.put_multipart(&path).await.unwrap();
+        let mut write = WriteMultipart::new(mp);
+        write.write(b"test_data");
+        write.finish().await.unwrap();
+        assert!(instrumented.requests.lock().is_empty());
+
+        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
+        assert!(instrumented.requests.lock().is_empty());
+        let mp = instrumented.put_multipart(&path).await.unwrap();
+        let mut write = WriteMultipart::new(mp);
+        write.write(b"test_data");
+        write.finish().await.unwrap();
+        assert_eq!(instrumented.requests.lock().len(), 1);
+
+        let request = instrumented.take_requests().pop().unwrap();
+        assert_eq!(request.op, Operation::Put);
+        assert_eq!(request.path, path);
+        assert!(request.duration.is_some());
+        assert!(request.size.is_none());
+        assert!(request.range.is_none());
+        assert!(request.extra_display.is_none());
+    }
+
     #[test]
     fn request_details() {
         let rd = RequestDetails {
@@ -717,7 +841,7 @@ mod tests {

         // Add Put requests to test grouping
         requests.push(RequestDetails {
-            op: Operation::_Put,
+            op: Operation::Put,
             path: Path::from("test4"),
             timestamp: chrono::DateTime::from_timestamp(3, 0).unwrap(),
             duration: Some(Duration::from_millis(200)),
@@ -732,7 +856,7 @@ mod tests {
         let get_summary = summaries.get(&Operation::Get).unwrap();
         assert_eq!(get_summary.count, 3);

-        let put_summary = summaries.get(&Operation::_Put).unwrap();
+        let put_summary = summaries.get(&Operation::Put).unwrap();
         assert_eq!(put_summary.count, 1);
         assert_eq!(
             put_summary.duration_stats.as_ref().unwrap().min,

From a66d66e2feb735290329dccbe7fb5e91e24ef482 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Fri, 17 Oct 2025 17:13:14 -0400
Subject: [PATCH 2/2] fmt

---
 datafusion-cli/src/object_storage/instrumented.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/datafusion-cli/src/object_storage/instrumented.rs b/datafusion-cli/src/object_storage/instrumented.rs
index 5766768ff400f..722d4e1ce7a86 100644
--- a/datafusion-cli/src/object_storage/instrumented.rs
+++ b/datafusion-cli/src/object_storage/instrumented.rs
@@ -964,7 +964,6 @@ mod tests {
         ");
     }

-
     #[test]
     fn request_summary_only_duration() {
         // Test request with only duration (no size)