Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions datafusion/core/tests/sql/explain_analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,36 +63,59 @@ async fn explain_analyze_baseline_metrics() {
"AggregateExec: mode=Partial, gby=[]",
"metrics=[output_rows=3, elapsed_compute="
);
assert_metrics!(
&formatted,
"AggregateExec: mode=Partial, gby=[]",
"output_bytes="
);
assert_metrics!(
&formatted,
"AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1]",
"metrics=[output_rows=5, elapsed_compute="
);
assert_metrics!(
&formatted,
"AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1]",
"output_bytes="
);
assert_metrics!(
&formatted,
"FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434",
"metrics=[output_rows=99, elapsed_compute="
);
assert_metrics!(
&formatted,
"FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434",
"output_bytes="
);
assert_metrics!(
&formatted,
"ProjectionExec: expr=[]",
"metrics=[output_rows=5, elapsed_compute="
);
assert_metrics!(&formatted, "ProjectionExec: expr=[]", "output_bytes=");
assert_metrics!(
&formatted,
"CoalesceBatchesExec: target_batch_size=4096",
"metrics=[output_rows=5, elapsed_compute"
);
assert_metrics!(
&formatted,
"CoalesceBatchesExec: target_batch_size=4096",
"output_bytes="
);
assert_metrics!(
&formatted,
"UnionExec",
"metrics=[output_rows=3, elapsed_compute="
);
assert_metrics!(&formatted, "UnionExec", "output_bytes=");
assert_metrics!(
&formatted,
"WindowAggExec",
"metrics=[output_rows=1, elapsed_compute="
);
assert_metrics!(&formatted, "WindowAggExec", "output_bytes=");

fn expected_to_have_metrics(plan: &dyn ExecutionPlan) -> bool {
use datafusion::physical_plan;
Expand Down
20 changes: 20 additions & 0 deletions datafusion/physical-plan/src/metrics/baseline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use std::task::Poll;

use arrow::record_batch::RecordBatch;

use crate::spill::get_record_batch_memory_size;

use super::{Count, ExecutionPlanMetricsSet, MetricBuilder, Time, Timestamp};
use datafusion_common::Result;

Expand Down Expand Up @@ -53,6 +55,16 @@ pub struct BaselineMetrics {

/// output rows: the total output rows
output_rows: Count,

/// Memory usage of all output batches.
///
/// Note: This value may be overestimated. If multiple output `RecordBatch`
/// instances share underlying memory buffers, their sizes will be counted
/// multiple times.
/// Issue: <https://github.com/apache/datafusion/issues/16841>
output_bytes: Count,
// Remember to update `docs/source/user-guide/metrics.md` when updating comments
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 hope people read it before making changes

// or adding new metrics
}

impl BaselineMetrics {
Expand All @@ -71,6 +83,9 @@ impl BaselineMetrics {
output_rows: MetricBuilder::new(metrics)
.with_type(super::MetricType::SUMMARY)
.output_rows(partition),
output_bytes: MetricBuilder::new(metrics)
.with_type(super::MetricType::SUMMARY)
.output_bytes(partition),
}
}

Expand All @@ -84,6 +99,7 @@ impl BaselineMetrics {
end_time: Default::default(),
elapsed_compute: self.elapsed_compute.clone(),
output_rows: Default::default(),
output_bytes: Default::default(),
}
}

Expand Down Expand Up @@ -211,13 +227,17 @@ impl RecordOutput for usize {
impl RecordOutput for RecordBatch {
    /// Records this owned batch against `bm` and returns the batch unchanged.
    ///
    /// The batch's row count is passed to `bm.record_output`, and the size
    /// reported by `get_record_batch_memory_size` is added to the
    /// `output_bytes` counter.
    ///
    /// Note: the byte total may be overestimated when several output batches
    /// share underlying memory buffers, because each batch is sized on its own.
    /// Issue: <https://github.com/apache/datafusion/issues/16841>
    fn record_output(self, bm: &BaselineMetrics) -> Self {
        let row_count = self.num_rows();
        bm.record_output(row_count);
        bm.output_bytes.add(get_record_batch_memory_size(&self));
        self
    }
}

impl RecordOutput for &RecordBatch {
    /// Records this borrowed batch against `bm` and returns the same reference.
    ///
    /// The batch's row count is passed to `bm.record_output`, and the size
    /// reported by `get_record_batch_memory_size` is added to the
    /// `output_bytes` counter.
    ///
    /// Note: the byte total may be overestimated when several output batches
    /// share underlying memory buffers, because each batch is sized on its own.
    /// Issue: <https://github.com/apache/datafusion/issues/16841>
    fn record_output(self, bm: &BaselineMetrics) -> Self {
        let row_count = self.num_rows();
        bm.record_output(row_count);
        bm.output_bytes.add(get_record_batch_memory_size(self));
        self
    }
}
Expand Down
8 changes: 8 additions & 0 deletions datafusion/physical-plan/src/metrics/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,14 @@ impl<'a> MetricBuilder<'a> {
count
}

/// Consume self and create a new counter for recording total output bytes.
///
/// A clone of the new [`Count`] is wrapped in `MetricValue::OutputBytes` and
/// passed to `build` for the given `partition`; the counter itself is
/// returned to the caller.
pub fn output_bytes(self, partition: usize) -> Count {
    let output_bytes = Count::new();
    let metric_value = MetricValue::OutputBytes(output_bytes.clone());
    self.with_partition(partition).build(metric_value);
    output_bytes
}

/// Consume self and create a new gauge for reporting current memory usage
pub fn mem_used(self, partition: usize) -> Gauge {
let gauge = Gauge::new();
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ impl MetricsSet {
MetricValue::ElapsedCompute(_) => false,
MetricValue::SpillCount(_) => false,
MetricValue::SpilledBytes(_) => false,
MetricValue::OutputBytes(_) => false,
MetricValue::SpilledRows(_) => false,
MetricValue::CurrentMemoryUsage(_) => false,
MetricValue::Gauge { name, .. } => name == metric_name,
Expand Down
38 changes: 25 additions & 13 deletions datafusion/physical-plan/src/metrics/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,8 @@ pub enum MetricValue {
SpillCount(Count),
/// Total size of spilled bytes produced: "spilled_bytes" metric
SpilledBytes(Count),
/// Total size of output bytes produced: "output_bytes" metric
OutputBytes(Count),
/// Total size of spilled rows produced: "spilled_rows" metric
SpilledRows(Count),
/// Current memory used
Expand Down Expand Up @@ -449,6 +451,9 @@ impl PartialEq for MetricValue {
(MetricValue::SpilledBytes(count), MetricValue::SpilledBytes(other)) => {
count == other
}
(MetricValue::OutputBytes(count), MetricValue::OutputBytes(other)) => {
count == other
}
(MetricValue::SpilledRows(count), MetricValue::SpilledRows(other)) => {
count == other
}
Expand Down Expand Up @@ -505,6 +510,7 @@ impl MetricValue {
Self::OutputRows(_) => "output_rows",
Self::SpillCount(_) => "spill_count",
Self::SpilledBytes(_) => "spilled_bytes",
Self::OutputBytes(_) => "output_bytes",
Self::SpilledRows(_) => "spilled_rows",
Self::CurrentMemoryUsage(_) => "mem_used",
Self::ElapsedCompute(_) => "elapsed_compute",
Expand All @@ -523,6 +529,7 @@ impl MetricValue {
Self::OutputRows(count) => count.value(),
Self::SpillCount(count) => count.value(),
Self::SpilledBytes(bytes) => bytes.value(),
Self::OutputBytes(bytes) => bytes.value(),
Self::SpilledRows(count) => count.value(),
Self::CurrentMemoryUsage(used) => used.value(),
Self::ElapsedCompute(time) => time.value(),
Expand Down Expand Up @@ -550,6 +557,7 @@ impl MetricValue {
Self::OutputRows(_) => Self::OutputRows(Count::new()),
Self::SpillCount(_) => Self::SpillCount(Count::new()),
Self::SpilledBytes(_) => Self::SpilledBytes(Count::new()),
Self::OutputBytes(_) => Self::OutputBytes(Count::new()),
Self::SpilledRows(_) => Self::SpilledRows(Count::new()),
Self::CurrentMemoryUsage(_) => Self::CurrentMemoryUsage(Gauge::new()),
Self::ElapsedCompute(_) => Self::ElapsedCompute(Time::new()),
Expand Down Expand Up @@ -588,6 +596,7 @@ impl MetricValue {
(Self::OutputRows(count), Self::OutputRows(other_count))
| (Self::SpillCount(count), Self::SpillCount(other_count))
| (Self::SpilledBytes(count), Self::SpilledBytes(other_count))
| (Self::OutputBytes(count), Self::OutputBytes(other_count))
| (Self::SpilledRows(count), Self::SpilledRows(other_count))
| (
Self::Count { count, .. },
Expand Down Expand Up @@ -638,18 +647,21 @@ impl MetricValue {
/// numbers are "more useful" (and displayed first)
pub fn display_sort_key(&self) -> u8 {
    // Every variant maps to a distinct key; smaller keys are displayed first.
    // Each variant must appear exactly once: a duplicated arm further down
    // the `match` is unreachable and would silently keep a stale ordering.
    match self {
        // `BaselineMetrics` that is common for most operators
        Self::OutputRows(_) => 0,
        Self::ElapsedCompute(_) => 1,
        Self::OutputBytes(_) => 2,
        // Other metrics
        Self::SpillCount(_) => 3,
        Self::SpilledBytes(_) => 4,
        Self::SpilledRows(_) => 5,
        Self::CurrentMemoryUsage(_) => 6,
        Self::Count { .. } => 7,
        Self::Gauge { .. } => 8,
        Self::Time { .. } => 9,
        Self::StartTimestamp(_) => 10, // show timestamps last
        Self::EndTimestamp(_) => 11,
        Self::Custom { .. } => 12,
    }
}

Expand All @@ -669,7 +681,7 @@ impl Display for MetricValue {
| Self::Count { count, .. } => {
write!(f, "{count}")
}
Self::SpilledBytes(count) => {
Self::SpilledBytes(count) | Self::OutputBytes(count) => {
let readable_count = human_readable_size(count.value());
write!(f, "{readable_count}")
}
Expand Down
9 changes: 5 additions & 4 deletions docs/source/user-guide/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ DataFusion operators expose runtime metrics so you can understand where time is

`BaselineMetrics` are available in most physical operators to capture common measurements.

| Metric          | Description |
| --------------- | ----------- |
| elapsed_compute | CPU time the operator actively spends processing work. |
| output_rows     | Total number of rows the operator produces. |
| output_bytes    | Memory usage of all output batches. Note: This value may be overestimated. If multiple output `RecordBatch` instances share underlying memory buffers, their sizes will be counted multiple times. |

## Operator-specific Metrics

Expand Down