diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index 54a57ed901162..f86a20dd9c97a 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -63,36 +63,59 @@ async fn explain_analyze_baseline_metrics() { "AggregateExec: mode=Partial, gby=[]", "metrics=[output_rows=3, elapsed_compute=" ); + assert_metrics!( + &formatted, + "AggregateExec: mode=Partial, gby=[]", + "output_bytes=" + ); assert_metrics!( &formatted, "AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1]", "metrics=[output_rows=5, elapsed_compute=" ); + assert_metrics!( + &formatted, + "AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1]", + "output_bytes=" + ); assert_metrics!( &formatted, "FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434", "metrics=[output_rows=99, elapsed_compute=" ); + assert_metrics!( + &formatted, + "FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434", + "output_bytes=" + ); assert_metrics!( &formatted, "ProjectionExec: expr=[]", "metrics=[output_rows=5, elapsed_compute=" ); + assert_metrics!(&formatted, "ProjectionExec: expr=[]", "output_bytes="); assert_metrics!( &formatted, "CoalesceBatchesExec: target_batch_size=4096", "metrics=[output_rows=5, elapsed_compute" ); + assert_metrics!( + &formatted, + "CoalesceBatchesExec: target_batch_size=4096", + "output_bytes=" + ); assert_metrics!( &formatted, "UnionExec", "metrics=[output_rows=3, elapsed_compute=" ); + assert_metrics!(&formatted, "UnionExec", "output_bytes="); assert_metrics!( &formatted, "WindowAggExec", "metrics=[output_rows=1, elapsed_compute=" ); + assert_metrics!(&formatted, "WindowAggExec", "output_bytes="); fn expected_to_have_metrics(plan: &dyn ExecutionPlan) -> bool { use datafusion::physical_plan; diff --git a/datafusion/physical-plan/src/metrics/baseline.rs b/datafusion/physical-plan/src/metrics/baseline.rs index 45cef58b5dd8c..858773b94664d 100644 --- a/datafusion/physical-plan/src/metrics/baseline.rs +++ b/datafusion/physical-plan/src/metrics/baseline.rs @@ -21,6 +21,8 @@ use std::task::Poll; use arrow::record_batch::RecordBatch; +use crate::spill::get_record_batch_memory_size; + use super::{Count, ExecutionPlanMetricsSet, MetricBuilder, Time, Timestamp}; use datafusion_common::Result; @@ -53,6 +55,16 @@ pub struct BaselineMetrics { /// output rows: the total output rows output_rows: Count, + + /// Memory usage of all output batches. + /// + /// Note: This value may be overestimated. If multiple output `RecordBatch` + /// instances share underlying memory buffers, their sizes will be counted + /// multiple times. + /// Issue: + output_bytes: Count, + // Remember to update `docs/source/user-guide/metrics.md` when updating comments + // or adding new metrics } impl BaselineMetrics { @@ -71,6 +83,9 @@ impl BaselineMetrics { output_rows: MetricBuilder::new(metrics) .with_type(super::MetricType::SUMMARY) .output_rows(partition), + output_bytes: MetricBuilder::new(metrics) + .with_type(super::MetricType::SUMMARY) + .output_bytes(partition), } } @@ -84,6 +99,7 @@ impl BaselineMetrics { end_time: Default::default(), elapsed_compute: self.elapsed_compute.clone(), output_rows: Default::default(), + output_bytes: Default::default(), } } @@ -211,6 +227,8 @@ impl RecordOutput for usize { impl RecordOutput for RecordBatch { fn record_output(self, bm: &BaselineMetrics) -> Self { bm.record_output(self.num_rows()); + let n_bytes = get_record_batch_memory_size(&self); + bm.output_bytes.add(n_bytes); self } } @@ -218,6 +236,8 @@ impl RecordOutput for RecordBatch { impl RecordOutput for &RecordBatch { fn record_output(self, bm: &BaselineMetrics) -> Self { bm.record_output(self.num_rows()); + let n_bytes = get_record_batch_memory_size(self); + bm.output_bytes.add(n_bytes); self } } diff --git a/datafusion/physical-plan/src/metrics/builder.rs b/datafusion/physical-plan/src/metrics/builder.rs index 74ba5a2a18343..88ec1a3f67d12 100644 --- a/datafusion/physical-plan/src/metrics/builder.rs +++ b/datafusion/physical-plan/src/metrics/builder.rs @@ -151,6 +151,14 @@ impl<'a> MetricBuilder<'a> { count } + /// Consume self and create a new counter for recording total output bytes + pub fn output_bytes(self, partition: usize) -> Count { + let count = Count::new(); + self.with_partition(partition) + .build(MetricValue::OutputBytes(count.clone())); + count + } + /// Consume self and create a new gauge for reporting current memory usage pub fn mem_used(self, partition: usize) -> Gauge { let gauge = Gauge::new(); diff --git a/datafusion/physical-plan/src/metrics/mod.rs b/datafusion/physical-plan/src/metrics/mod.rs index 0fd7bfb8c812d..02aad6eb60ac3 100644 --- a/datafusion/physical-plan/src/metrics/mod.rs +++ b/datafusion/physical-plan/src/metrics/mod.rs @@ -296,6 +296,7 @@ impl MetricsSet { MetricValue::ElapsedCompute(_) => false, MetricValue::SpillCount(_) => false, MetricValue::SpilledBytes(_) => false, + MetricValue::OutputBytes(_) => false, MetricValue::SpilledRows(_) => false, MetricValue::CurrentMemoryUsage(_) => false, MetricValue::Gauge { name, .. } => name == metric_name, diff --git a/datafusion/physical-plan/src/metrics/value.rs b/datafusion/physical-plan/src/metrics/value.rs index 3149fca95ba84..fc947935503c5 100644 --- a/datafusion/physical-plan/src/metrics/value.rs +++ b/datafusion/physical-plan/src/metrics/value.rs @@ -395,6 +395,8 @@ pub enum MetricValue { SpillCount(Count), /// Total size of spilled bytes produced: "spilled_bytes" metric SpilledBytes(Count), + /// Total size of output bytes produced: "output_bytes" metric + OutputBytes(Count), /// Total size of spilled rows produced: "spilled_rows" metric SpilledRows(Count), /// Current memory used @@ -449,6 +451,9 @@ impl PartialEq for MetricValue { (MetricValue::SpilledBytes(count), MetricValue::SpilledBytes(other)) => { count == other } + (MetricValue::OutputBytes(count), MetricValue::OutputBytes(other)) => { + count == other + } (MetricValue::SpilledRows(count), MetricValue::SpilledRows(other)) => { count == other } @@ -505,6 +510,7 @@ impl MetricValue { Self::OutputRows(_) => "output_rows", Self::SpillCount(_) => "spill_count", Self::SpilledBytes(_) => "spilled_bytes", + Self::OutputBytes(_) => "output_bytes", Self::SpilledRows(_) => "spilled_rows", Self::CurrentMemoryUsage(_) => "mem_used", Self::ElapsedCompute(_) => "elapsed_compute", @@ -523,6 +529,7 @@ impl MetricValue { Self::OutputRows(count) => count.value(), Self::SpillCount(count) => count.value(), Self::SpilledBytes(bytes) => bytes.value(), + Self::OutputBytes(bytes) => bytes.value(), Self::SpilledRows(count) => count.value(), Self::CurrentMemoryUsage(used) => used.value(), Self::ElapsedCompute(time) => time.value(), @@ -550,6 +557,7 @@ impl MetricValue { Self::OutputRows(_) => Self::OutputRows(Count::new()), Self::SpillCount(_) => Self::SpillCount(Count::new()), Self::SpilledBytes(_) => Self::SpilledBytes(Count::new()), + Self::OutputBytes(_) => Self::OutputBytes(Count::new()), Self::SpilledRows(_) => Self::SpilledRows(Count::new()), Self::CurrentMemoryUsage(_) => Self::CurrentMemoryUsage(Gauge::new()), Self::ElapsedCompute(_) => Self::ElapsedCompute(Time::new()), @@ -588,6 +596,7 @@ impl MetricValue { (Self::OutputRows(count), Self::OutputRows(other_count)) | (Self::SpillCount(count), Self::SpillCount(other_count)) | (Self::SpilledBytes(count), Self::SpilledBytes(other_count)) + | (Self::OutputBytes(count), Self::OutputBytes(other_count)) | (Self::SpilledRows(count), Self::SpilledRows(other_count)) | ( Self::Count { count, .. }, @@ -638,18 +647,21 @@ impl MetricValue { /// numbers are "more useful" (and displayed first) pub fn display_sort_key(&self) -> u8 { match self { - Self::OutputRows(_) => 0, // show first - Self::ElapsedCompute(_) => 1, // show second - Self::SpillCount(_) => 2, - Self::SpilledBytes(_) => 3, - Self::SpilledRows(_) => 4, - Self::CurrentMemoryUsage(_) => 5, - Self::Count { .. } => 6, - Self::Gauge { .. } => 7, - Self::Time { .. } => 8, - Self::StartTimestamp(_) => 9, // show timestamps last - Self::EndTimestamp(_) => 10, - Self::Custom { .. } => 11, + // `BaselineMetrics` that is common for most operators + Self::OutputRows(_) => 0, + Self::ElapsedCompute(_) => 1, + Self::OutputBytes(_) => 2, + // Other metrics + Self::SpillCount(_) => 3, + Self::SpilledBytes(_) => 4, + Self::SpilledRows(_) => 5, + Self::CurrentMemoryUsage(_) => 6, + Self::Count { .. } => 7, + Self::Gauge { .. } => 8, + Self::Time { .. } => 9, + Self::StartTimestamp(_) => 10, // show timestamps last + Self::EndTimestamp(_) => 11, + Self::Custom { .. } => 12, } } @@ -669,7 +681,7 @@ impl Display for MetricValue { | Self::Count { count, .. } => { write!(f, "{count}") } - Self::SpilledBytes(count) => { + Self::SpilledBytes(count) | Self::OutputBytes(count) => { let readable_count = human_readable_size(count.value()); write!(f, "{readable_count}") } diff --git a/docs/source/user-guide/metrics.md b/docs/source/user-guide/metrics.md index f2634b901518b..1fb2f4a5c7700 100644 --- a/docs/source/user-guide/metrics.md +++ b/docs/source/user-guide/metrics.md @@ -27,10 +27,11 @@ DataFusion operators expose runtime metrics so you can understand where time is `BaselineMetrics` are available in most physical operators to capture common measurements. -| Metric | Description | -| --------------- | ------------------------------------------------------ | -| elapsed_compute | CPU time the operator actively spends processing work. | -| output_rows | Total number of rows the operator produces. | +| Metric | Description | +| --------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| elapsed_compute | CPU time the operator actively spends processing work. | +| output_rows | Total number of rows the operator produces. | +| output_bytes | Memory usage of all output batches. Note: This value may be overestimated. If multiple output `RecordBatch` instances share underlying memory buffers, their sizes will be counted multiple times. | ## Operator-specific Metrics