Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions rust/lance-datafusion/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,9 @@ pub struct ExecutionSummaryCounts {
/// Additional metrics for more detailed statistics. These are subject to change in the future
/// and should only be used for debugging purposes.
pub all_counts: HashMap<String, usize>,
/// Additional time metrics for more detailed statistics, stored in nanoseconds.
/// These are subject to change in the future and should only be used for debugging purposes.
pub all_times: HashMap<String, usize>,
}

pub fn collect_execution_metrics(node: &dyn ExecutionPlan, counts: &mut ExecutionSummaryCounts) {
Expand All @@ -510,6 +513,13 @@ pub fn collect_execution_metrics(node: &dyn ExecutionPlan, counts: &mut Executio
}
}
}
for (metric_name, time) in metrics.iter_times() {
let existing = counts
.all_times
.entry(metric_name.as_ref().to_string())
.or_insert(0);
*existing += time.value();
}
// Include gauge-based I/O metrics (some nodes record I/O as gauges)
for (metric_name, gauge) in metrics.iter_gauges() {
match metric_name.as_ref() {
Expand Down
9 changes: 9 additions & 0 deletions rust/lance-datafusion/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ pub fn reader_to_stream(batches: Box<dyn RecordBatchReader + Send>) -> SendableR
pub trait MetricsExt {
fn find_count(&self, name: &str) -> Option<Count>;
fn iter_counts(&self) -> impl Iterator<Item = (impl AsRef<str>, &Count)>;
fn iter_times(&self) -> impl Iterator<Item = (impl AsRef<str>, &Time)>;
fn iter_gauges(&self) -> impl Iterator<Item = (impl AsRef<str>, &Gauge)>;
}

Expand All @@ -179,6 +180,13 @@ impl MetricsExt for MetricsSet {
})
}

fn iter_times(&self) -> impl Iterator<Item = (impl AsRef<str>, &Time)> {
self.iter().filter_map(|m| match m.value() {
MetricValue::Time { name, time } => Some((name, time)),
_ => None,
})
}

fn iter_gauges(&self) -> impl Iterator<Item = (impl AsRef<str>, &Gauge)> {
self.iter().filter_map(|m| match m.value() {
MetricValue::Gauge { name, gauge } => Some((name, gauge)),
Expand Down Expand Up @@ -242,5 +250,6 @@ pub const ROWS_SCANNED_METRIC: &str = "rows_scanned";
pub const TASK_WAIT_TIME_METRIC: &str = "task_wait_time";
pub const DELTAS_SEARCHED_METRIC: &str = "deltas_searched";
pub const PARTITIONS_SEARCHED_METRIC: &str = "partitions_searched";
pub const FIND_PARTITIONS_ELAPSED_METRIC: &str = "find_partitions_elapsed";
pub const SCALAR_INDEX_SEARCH_TIME_METRIC: &str = "search_time";
pub const SCALAR_INDEX_SER_TIME_METRIC: &str = "ser_time";
4 changes: 4 additions & 0 deletions rust/lance/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ harness = false
name = "vector_throughput"
harness = false

[[bench]]
name = "distributed_vector_build"
harness = false

[[bench]]
name = "mem_wal_write"
harness = false
Expand Down
Loading
Loading