Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 104 additions & 2 deletions datafusion-cli/src/object_storage/instrumented.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,25 @@ impl InstrumentedObjectStore {
Ok(ret)
}

/// Forwards `delete` to the wrapped store while recording request details.
///
/// On success a [`RequestDetails`] entry (operation, path, wall-clock
/// timestamp, and elapsed duration) is appended to `self.requests`.
/// Errors from the inner store are propagated unchanged and not recorded.
async fn instrumented_delete(&self, location: &Path) -> Result<()> {
    let started_at = Utc::now();
    let stopwatch = Instant::now();
    self.inner.delete(location).await?;
    let elapsed = stopwatch.elapsed();

    // Delete carries no payload, so size/range/extra_display stay empty.
    let details = RequestDetails {
        op: Operation::Delete,
        path: location.clone(),
        timestamp: started_at,
        duration: Some(elapsed),
        size: None,
        range: None,
        extra_display: None,
    };
    self.requests.lock().push(details);

    Ok(())
}

fn instrumented_list(
&self,
prefix: Option<&Path>,
Expand Down Expand Up @@ -235,6 +254,25 @@ impl InstrumentedObjectStore {

Ok(ret)
}

/// Forwards `head` to the wrapped store while recording request details.
///
/// Returns the [`ObjectMeta`] from the inner store; on success a
/// [`RequestDetails`] entry is appended to `self.requests`. Errors from
/// the inner store are propagated unchanged and not recorded.
async fn instrumented_head(&self, location: &Path) -> Result<ObjectMeta> {
    let started_at = Utc::now();
    let stopwatch = Instant::now();
    let meta = self.inner.head(location).await?;
    let elapsed = stopwatch.elapsed();

    // Head fetches metadata only, so size/range/extra_display stay empty.
    let details = RequestDetails {
        op: Operation::Head,
        path: location.clone(),
        timestamp: started_at,
        duration: Some(elapsed),
        size: None,
        range: None,
        extra_display: None,
    };
    self.requests.lock().push(details);

    Ok(meta)
}
}

impl fmt::Display for InstrumentedObjectStore {
Expand Down Expand Up @@ -285,6 +323,10 @@ impl ObjectStore for InstrumentedObjectStore {
}

async fn delete(&self, location: &Path) -> Result<()> {
    // Route through the recording path only while instrumentation is on;
    // otherwise delegate straight to the wrapped store.
    if self.enabled() {
        self.instrumented_delete(location).await
    } else {
        self.inner.delete(location).await
    }
}

Expand Down Expand Up @@ -313,6 +355,10 @@ impl ObjectStore for InstrumentedObjectStore {
}

async fn head(&self, location: &Path) -> Result<ObjectMeta> {
    // Route through the recording path only while instrumentation is on;
    // otherwise delegate straight to the wrapped store.
    if self.enabled() {
        self.instrumented_head(location).await
    } else {
        self.inner.head(location).await
    }
}
}
Expand All @@ -321,9 +367,9 @@ impl ObjectStore for InstrumentedObjectStore {
// NOTE(review): this span was captured from a diff view, so both the old
// underscore-prefixed placeholder variants (`_Delete`, `_Head`) and their
// activated replacements (`Delete`, `Head`) appear together below. In the
// merged file only one of each pair exists — verify against the repository.
/// Object store operations tracked by the instrumentation layer.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum Operation {
_Copy,
_Delete,
Delete,
Get,
_Head,
Head,
List,
Put,
}
Expand Down Expand Up @@ -753,6 +799,35 @@ mod tests {
assert!(request.extra_display.is_none());
}

#[tokio::test]
async fn instrumented_store_delete() {
    // With instrumentation disabled (the default), deletes pass through
    // without being recorded.
    let (store, path) = setup_test_store().await;
    assert!(store.requests.lock().is_empty());
    store.delete(&path).await.unwrap();
    assert!(store.requests.lock().is_empty());

    // The previous delete removed the object, so build a fresh store to
    // have something to delete again — this time with tracing enabled.
    let (store, path) = setup_test_store().await;
    store.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
    assert!(store.requests.lock().is_empty());
    store.delete(&path).await.unwrap();
    assert_eq!(store.requests.lock().len(), 1);

    // take_requests drains the recorded entries.
    let mut recorded = store.take_requests();
    assert_eq!(recorded.len(), 1);
    assert!(store.requests.lock().is_empty());

    // A delete carries no payload: only op/path/duration are populated.
    let detail = recorded.pop().unwrap();
    assert_eq!(detail.op, Operation::Delete);
    assert_eq!(detail.path, path);
    assert!(detail.duration.is_some());
    assert!(detail.size.is_none());
    assert!(detail.range.is_none());
    assert!(detail.extra_display.is_none());
}

#[tokio::test]
async fn instrumented_store_list() {
let (instrumented, path) = setup_test_store().await;
Expand Down Expand Up @@ -865,6 +940,33 @@ mod tests {
assert!(request.extra_display.is_none());
}

#[tokio::test]
async fn instrumented_store_head() {
    // With instrumentation disabled (the default), head calls pass
    // through without being recorded.
    let (store, path) = setup_test_store().await;
    assert!(store.requests.lock().is_empty());
    let _ = store.head(&path).await.unwrap();
    assert!(store.requests.lock().is_empty());

    // Head is non-destructive, so the same store can be reused once
    // tracing is enabled.
    store.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
    assert!(store.requests.lock().is_empty());
    let _ = store.head(&path).await.unwrap();
    assert_eq!(store.requests.lock().len(), 1);

    // take_requests drains the recorded entries.
    let mut recorded = store.take_requests();
    assert_eq!(recorded.len(), 1);
    assert!(store.requests.lock().is_empty());

    // A head fetches metadata only: op/path/duration are populated.
    let detail = recorded.pop().unwrap();
    assert_eq!(detail.op, Operation::Head);
    assert_eq!(detail.path, path);
    assert!(detail.duration.is_some());
    assert!(detail.size.is_none());
    assert!(detail.range.is_none());
    assert!(detail.extra_display.is_none());
}

#[test]
fn request_details() {
let rd = RequestDetails {
Expand Down
4 changes: 2 additions & 2 deletions datafusion-cli/tests/cli_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,8 +411,8 @@ async fn test_object_store_profiling() {
// Output:
// <TIMESTAMP> operation=Get duration=[DURATION] size=1006 path=cars.csv
settings.add_filter(
r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?[+-]\d{2}:\d{2} operation=(Get|Put|Delete|List|Head) duration=\d+\.\d{6}s size=(\d+) path=(.*)",
"<TIMESTAMP> operation=$1 duration=[DURATION] size=$2 path=$3",
r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?[+-]\d{2}:\d{2} operation=(Get|Put|Delete|List|Head) duration=\d+\.\d{6}s (size=\d+\s+)?path=(.*)",
"<TIMESTAMP> operation=$1 duration=[DURATION] ${2}path=$3",
);

// We also need to filter out the summary statistics (anything with an 's' at the end)
Expand Down
5 changes: 5 additions & 0 deletions datafusion-cli/tests/snapshots/object_store_profiling.snap
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ ObjectStore Profile mode set to Trace

Object Store Profiling
Instrumented Object Store: instrument_mode: Trace, inner: AmazonS3(data)
<TIMESTAMP> operation=Head duration=[DURATION] path=cars.csv
<TIMESTAMP> operation=Get duration=[DURATION] size=1006 path=cars.csv

Summaries:
Expand All @@ -45,6 +46,8 @@ Summaries:
+-----------+----------+-----------+-----------+-----------+-----------+-------+
| Get | duration | ...NORMALIZED...| 1 |
| Get | size | 1006 B | 1006 B | 1006 B | 1006 B | 1 |
| Head | duration | ...NORMALIZED...| 1 |
| Head | size | | | | | 1 |
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is interesting that there are (even more!) requests going on to read files

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb Yes! This feature continues to provide interesting data. That being said, I'm 99% sure the head requests on read in this case are due to the parquet metadata cache! Refer to my comment on its initial PR here: #16971 (comment)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess what I am saying is that if we tally up all the object store operations that are (currently) required to read a parquet file there is a lot of room for improvement. I think the next release of DataFusion is going to "magically get faster" for a bunch of people because we are optimizing these calls systematically.

I for one am very excited

+-----------+----------+-----------+-----------+-----------+-----------+-------+
ObjectStore Profile mode set to Summary
+-----+-------+---------------------+
Expand All @@ -63,6 +66,8 @@ Summaries:
+-----------+----------+-----------+-----------+-----------+-----------+-------+
| Get | duration | ...NORMALIZED...| 1 |
| Get | size | 1006 B | 1006 B | 1006 B | 1006 B | 1 |
| Head | duration | ...NORMALIZED...| 1 |
| Head | size | | | | | 1 |
+-----------+----------+-----------+-----------+-----------+-----------+-------+
ObjectStore Profile mode set to Disabled
+-----+-------+---------------------+
Expand Down