24 changes: 13 additions & 11 deletions ballista/rust/core/src/execution_plans/shuffle_reader.rs
@@ -28,9 +28,10 @@ use async_trait::async_trait;
 use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::arrow::error::Result as ArrowResult;
 use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::physical_plan::{
-    DisplayFormatType, ExecutionPlan, Partitioning, SQLMetric,
+use datafusion::physical_plan::metrics::{
+    ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
 };
+use datafusion::physical_plan::{DisplayFormatType, ExecutionPlan, Metric, Partitioning};
 use datafusion::{
     error::{DataFusionError, Result},
     physical_plan::RecordBatchStream,
@@ -47,8 +48,8 @@ pub struct ShuffleReaderExec {
     /// Each partition of a shuffle can read data from multiple locations
     pub(crate) partition: Vec<Vec<PartitionLocation>>,
     pub(crate) schema: SchemaRef,
-    /// Time to fetch data from executor
-    fetch_time: Arc<SQLMetric>,
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
 }

 impl ShuffleReaderExec {
@@ -60,7 +61,7 @@ impl ShuffleReaderExec {
         Ok(Self {
             partition,
             schema,
-            fetch_time: SQLMetric::time_nanos(),
+            metrics: ExecutionPlanMetricsSet::new(),
         })
     }
 }
@@ -100,13 +101,16 @@ impl ExecutionPlan for ShuffleReaderExec {
     ) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
         info!("ShuffleReaderExec::execute({})", partition);

-        let start = Instant::now();
+        let fetch_time =
+            MetricBuilder::new(&self.metrics).subset_time("fetch_time", partition);
+        let timer = fetch_time.timer();
+
         let partition_locations = &self.partition[partition];
         let result = future::join_all(partition_locations.iter().map(fetch_partition))
             .await
             .into_iter()
             .collect::<Result<Vec<_>>>()?;
-        self.fetch_time.add_elapsed(start);
+        timer.done();

         let result = WrappedStream::new(
             Box::pin(futures::stream::iter(result).flatten()),
@@ -149,10 +153,8 @@ impl ExecutionPlan for ShuffleReaderExec {
         }
     }

-    fn metrics(&self) -> HashMap<String, SQLMetric> {
-        let mut metrics = HashMap::new();
-        metrics.insert("fetchTime".to_owned(), (*self.fetch_time).clone());
-        metrics
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
     }
 }

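Taken together, the reader-side change replaces ad-hoc Instant bookkeeping with named, per-partition metrics. A minimal standalone sketch of that pattern, using only the API surface visible in this diff (the function name and the simulated work are illustrative, not part of the PR):

use datafusion::physical_plan::metrics::{
    ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
};

fn timed_fetch(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Option<MetricsSet> {
    // Register a named timer for this partition in the shared metrics set.
    let fetch_time = MetricBuilder::new(metrics).subset_time("fetch_time", partition);

    // The timer accumulates elapsed time into `fetch_time` once `done()` is
    // called, replacing the old Instant::now()/add_elapsed pair.
    let timer = fetch_time.timer();
    std::thread::sleep(std::time::Duration::from_millis(1)); // stand-in for the fetch
    timer.done();

    // Snapshot the accumulated metrics, as the new metrics() override does.
    Some(metrics.clone_inner())
}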
60 changes: 33 additions & 27 deletions ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -45,10 +45,13 @@ use datafusion::arrow::ipc::writer::FileWriter;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::physical_plan::hash_utils::create_hashes;
+use datafusion::physical_plan::metrics::{
+    self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
+};
 use datafusion::physical_plan::repartition::RepartitionExec;
 use datafusion::physical_plan::Partitioning::RoundRobinBatch;
 use datafusion::physical_plan::{
-    DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SQLMetric,
+    DisplayFormatType, ExecutionPlan, Metric, Partitioning, RecordBatchStream,
 };
 use futures::StreamExt;
 use hashbrown::HashMap;
@@ -71,24 +74,30 @@ pub struct ShuffleWriterExec {
     work_dir: String,
     /// Optional shuffle output partitioning
     shuffle_output_partitioning: Option<Partitioning>,
-    /// Shuffle write metrics
-    metrics: ShuffleWriteMetrics,
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
 }

 #[derive(Debug, Clone)]
 struct ShuffleWriteMetrics {
     /// Time spend writing batches to shuffle files
-    write_time: Arc<SQLMetric>,
-    input_rows: Arc<SQLMetric>,
-    output_rows: Arc<SQLMetric>,
+    write_time: metrics::Time,
+    input_rows: metrics::Count,
+    output_rows: metrics::Count,
 }

 impl ShuffleWriteMetrics {
-    fn new() -> Self {
+    fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
+        let write_time = MetricBuilder::new(metrics).subset_time("write_time", partition);
+
+        let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
+
+        let output_rows = MetricBuilder::new(metrics).output_rows(partition);
+
         Self {
-            write_time: SQLMetric::time_nanos(),
-            input_rows: SQLMetric::counter(),
-            output_rows: SQLMetric::counter(),
+            write_time,
+            input_rows,
+            output_rows,
         }
     }
 }
@@ -108,7 +117,7 @@ impl ShuffleWriterExec {
             plan,
             work_dir,
             shuffle_output_partitioning,
-            metrics: ShuffleWriteMetrics::new(),
+            metrics: ExecutionPlanMetricsSet::new(),
         })
     }

@@ -139,9 +148,11 @@ impl ShuffleWriterExec {
         path.push(&self.job_id);
         path.push(&format!("{}", self.stage_id));

+        let write_metrics = ShuffleWriteMetrics::new(input_partition, &self.metrics);
+
         match &self.shuffle_output_partitioning {
             None => {
-                let start = Instant::now();
+                let timer = write_metrics.write_time.timer();
                 path.push(&format!("{}", input_partition));
                 std::fs::create_dir_all(&path)?;
                 path.push("data.arrow");
@@ -152,18 +163,18 @@
                 let stats = utils::write_stream_to_disk(
                     &mut stream,
                     path,
-                    self.metrics.write_time.clone(),
+                    &write_metrics.write_time,
                 )
                 .await
                 .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;

-                self.metrics
+                write_metrics
                     .input_rows
                     .add(stats.num_rows.unwrap_or(0) as usize);
-                self.metrics
+                write_metrics
                     .output_rows
                     .add(stats.num_rows.unwrap_or(0) as usize);
-                self.metrics.write_time.add_elapsed(start);
+                timer.done();

                 info!(
                     "Executed partition {} in {} seconds. Statistics: {}",
@@ -197,7 +208,7 @@ impl ShuffleWriterExec {
                 while let Some(result) = stream.next().await {
                     let input_batch = result?;

-                    self.metrics.input_rows.add(input_batch.num_rows());
+                    write_metrics.input_rows.add(input_batch.num_rows());

                     let arrays = exprs
                         .iter()
@@ -239,7 +250,7 @@

                     //TODO optimize so we don't write or fetch empty partitions
                     //if output_batch.num_rows() > 0 {
-                    let start = Instant::now();
+                    let timer = write_metrics.write_time.timer();
                     match &mut writers[output_partition] {
                         Some(w) => {
                             w.write(&output_batch)?;
@@ -260,9 +271,8 @@
                             writers[output_partition] = Some(writer);
                         }
                     }
-                    self.metrics.output_rows.add(output_batch.num_rows());
-                    self.metrics.write_time.add_elapsed(start);
-                    //}
+                    write_metrics.output_rows.add(output_batch.num_rows());
+                    timer.done();
                 }
             }

@@ -388,12 +398,8 @@ impl ExecutionPlan for ShuffleWriterExec {
         Ok(Box::pin(MemoryStream::try_new(vec![batch], schema, None)?))
     }

-    fn metrics(&self) -> HashMap<String, SQLMetric> {
-        let mut metrics = HashMap::new();
-        metrics.insert("inputRows".to_owned(), (*self.metrics.input_rows).clone());
-        metrics.insert("outputRows".to_owned(), (*self.metrics.output_rows).clone());
-        metrics.insert("writeTime".to_owned(), (*self.metrics.write_time).clone());
-        metrics
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
     }

     fn fmt_as(
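The writer makes the same switch, but groups its three handles into a per-partition struct built once per call to execute. A sketch of why that shape is convenient (the struct here mirrors ShuffleWriteMetrics from the diff; the record helper and its batch handling are illustrative):

use datafusion::arrow::record_batch::RecordBatch;
use datafusion::physical_plan::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder};

struct WriteMetrics {
    write_time: metrics::Time,
    input_rows: metrics::Count,
    output_rows: metrics::Count,
}

impl WriteMetrics {
    fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
        Self {
            // Each builder call registers the metric against the shared set,
            // tagged with the input partition it describes.
            write_time: MetricBuilder::new(metrics).subset_time("write_time", partition),
            input_rows: MetricBuilder::new(metrics).counter("input_rows", partition),
            output_rows: MetricBuilder::new(metrics).output_rows(partition),
        }
    }

    fn record(&self, batch: &RecordBatch) {
        // Count rows in, time only the write itself, then count rows out.
        self.input_rows.add(batch.num_rows());
        let timer = self.write_time.timer();
        // ... write the batch here ...
        timer.done();
        self.output_rows.add(batch.num_rows());
    }
}

Because the handles are created inside execute() for a specific input_partition, one plan node can record distinct per-partition values while metrics() still exposes a single consolidated MetricsSet.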
12 changes: 6 additions & 6 deletions ballista/rust/core/src/utils.rs
@@ -61,7 +61,7 @@ use datafusion::physical_plan::parquet::ParquetExec;
 use datafusion::physical_plan::projection::ProjectionExec;
 use datafusion::physical_plan::sort::SortExec;
 use datafusion::physical_plan::{
-    AggregateExpr, ExecutionPlan, PhysicalExpr, RecordBatchStream, SQLMetric,
+    metrics, AggregateExpr, ExecutionPlan, Metric, PhysicalExpr, RecordBatchStream,
 };
 use futures::{future, Stream, StreamExt};
 use std::time::Instant;
@@ -71,7 +71,7 @@ use std::time::Instant;
 pub async fn write_stream_to_disk(
     stream: &mut Pin<Box<dyn RecordBatchStream + Send + Sync>>,
     path: &str,
-    disk_write_metric: Arc<SQLMetric>,
+    disk_write_metric: &metrics::Time,
 ) -> Result<PartitionStats> {
     let file = File::create(&path).map_err(|e| {
         BallistaError::General(format!(
@@ -97,13 +97,13 @@
         num_rows += batch.num_rows();
         num_bytes += batch_size_bytes;

-        let start = Instant::now();
+        let timer = disk_write_metric.timer();
         writer.write(&batch)?;
-        disk_write_metric.add_elapsed(start);
+        timer.done();
     }
-    let start = Instant::now();
+    let timer = disk_write_metric.timer();
     writer.finish()?;
-    disk_write_metric.add_elapsed(start);
+    timer.done();
     Ok(PartitionStats::new(
         Some(num_rows as u64),
         Some(num_batches),
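Changing the parameter from Arc<SQLMetric> to &metrics::Time also simplifies call sites: the caller keeps ownership and the helper only borrows a handle for its scoped timers. A sketch of the calling convention (the helper name is hypothetical; the signature mirrors write_stream_to_disk above):

use datafusion::physical_plan::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder};

// Hypothetical helper: it needs only a borrowed Time handle,
// not shared ownership of the metric.
fn finish_write(disk_write_metric: &metrics::Time) {
    // Scope each timed region with its own timer so untimed work
    // (e.g. pulling the next batch) is excluded from the metric.
    let timer = disk_write_metric.timer();
    // ... writer.finish()? would go here ...
    timer.done();
}

fn caller() {
    let metrics = ExecutionPlanMetricsSet::new();
    let write_time = MetricBuilder::new(&metrics).subset_time("write_time", 0);
    finish_write(&write_time); // no clone() needed, unlike Arc<SQLMetric>
}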
13 changes: 8 additions & 5 deletions datafusion/src/physical_optimizer/repartition.rs
@@ -110,24 +110,25 @@ mod tests {

     use super::*;
     use crate::datasource::datasource::Statistics;
-    use crate::physical_plan::parquet::{
-        ParquetExec, ParquetExecMetrics, ParquetPartition,
-    };
+    use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
+    use crate::physical_plan::parquet::{ParquetExec, ParquetPartition};
     use crate::physical_plan::projection::ProjectionExec;

     #[test]
     fn added_repartition_to_single_partition() -> Result<()> {
         let schema = Arc::new(Schema::empty());
+        let metrics = ExecutionPlanMetricsSet::new();
         let parquet_project = ProjectionExec::try_new(
             vec![],
             Arc::new(ParquetExec::new(
                 vec![ParquetPartition::new(
                     vec!["x".to_string()],
                     Statistics::default(),
+                    metrics.clone(),
                 )],
                 schema,
                 None,
-                ParquetExecMetrics::new(),
+                metrics,
                 None,
                 2048,
                 None,
@@ -154,6 +155,7 @@
     #[test]
     fn repartition_deepest_node() -> Result<()> {
         let schema = Arc::new(Schema::empty());
+        let metrics = ExecutionPlanMetricsSet::new();
         let parquet_project = ProjectionExec::try_new(
             vec![],
             Arc::new(ProjectionExec::try_new(
@@ -162,10 +164,11 @@
                 vec![ParquetPartition::new(
                     vec!["x".to_string()],
                     Statistics::default(),
+                    metrics.clone(),
                 )],
                 schema,
                 None,
-                ParquetExecMetrics::new(),
+                metrics,
                 None,
                 2048,
                 None,
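Note that the tests pass metrics.clone() to ParquetPartition::new while handing the original set to ParquetExec::new. A short sketch of the assumption that makes this work: clones of an ExecutionPlanMetricsSet appear to share one underlying registry, so metrics registered through either handle are visible from both (the assertion style below is illustrative):

use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder};

fn main() {
    let metrics = ExecutionPlanMetricsSet::new();
    let handle = metrics.clone();

    // Register and bump a counter through the clone...
    MetricBuilder::new(&handle).output_rows(0).add(42);

    // ...and observe it through the original set's snapshot.
    println!("{}", metrics.clone_inner());
}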
8 changes: 8 additions & 0 deletions datafusion/src/physical_plan/analyze.rs
@@ -164,6 +164,14 @@ impl ExecutionPlan for AnalyzeExec {
             // Verbose output
             // TODO make this more sophisticated
             if verbose {
+                type_builder.append_value("Plan with Full Metrics").unwrap();
+
+                let annotated_plan =
+                    DisplayableExecutionPlan::with_full_metrics(captured_input.as_ref())
+                        .indent()
+                        .to_string();
+                plan_builder.append_value(annotated_plan).unwrap();
+
                 type_builder.append_value("Output Rows").unwrap();
                 plan_builder.append_value(total_rows.to_string()).unwrap();

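With this change, EXPLAIN ANALYZE VERBOSE emits a second rendering of the plan annotated with every recorded metric rather than only the aggregated summary. A minimal sketch of the rendering call (DisplayableExecutionPlan, with_full_metrics, and indent come straight from the diff; the helper name and the import path are assumptions):

use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::ExecutionPlan;

// Render an executed plan with full per-partition metrics attached,
// as the verbose branch above embeds it in the ANALYZE output.
fn render_full_metrics(plan: &dyn ExecutionPlan) -> String {
    DisplayableExecutionPlan::with_full_metrics(plan)
        .indent()
        .to_string()
}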