From 3846869908654108a9d4f871eb1094ad7c33afa4 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 28 Dec 2020 13:45:39 -0700 Subject: [PATCH 1/3] Implement join metrics --- .../datafusion/src/physical_plan/hash_join.rs | 70 ++++++++++++++++--- 1 file changed, 62 insertions(+), 8 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 1af388f0861..bb1d2cde0e9 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -21,6 +21,7 @@ use arrow::array::ArrayRef; use std::sync::Arc; use std::{any::Any, collections::HashSet}; +use std::time::Instant; use async_trait::async_trait; use futures::{Stream, StreamExt, TryStreamExt}; @@ -47,6 +48,7 @@ use crate::error::{DataFusionError, Result}; use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream}; use ahash::RandomState; +use log::debug; // An index of (batch, row) uniquely identifying a row in a part. type Index = (usize, usize); @@ -160,6 +162,8 @@ impl ExecutionPlan for HashJoinExec { match build_side.as_ref() { Some(stream) => stream.clone(), None => { + let start = Instant::now(); + // merge all left parts into a single stream let merge = MergeExec::new(self.left.clone()); let stream = merge.execute(0).await?; @@ -186,8 +190,18 @@ impl ExecutionPlan for HashJoinExec { }) .await?; + let num_rows: usize = + left_data.1.iter().map(|batch| batch.num_rows()).sum(); + let left_side = Arc::new((left_data.0, left_data.1)); *build_side = Some(left_side.clone()); + + debug!( + "Built build-side of hash join containing {} rows in {} ms", + num_rows, + start.elapsed().as_millis() + ); + left_side } } @@ -208,6 +222,11 @@ impl ExecutionPlan for HashJoinExec { join_type: self.join_type, left_data, right: stream, + num_input_batches: 0, + num_input_rows: 0, + num_output_batches: 0, + num_output_rows: 0, + join_time: 0, })) } } @@ -252,6 +271,16 @@ struct HashJoinStream { left_data: JoinLeftData, /// right right: SendableRecordBatchStream, + /// number of input batches + num_input_batches: usize, + /// number of input rows + num_input_rows: usize, + /// number of batches produced + num_output_batches: usize, + /// number of rows produced + num_output_rows: usize, + /// total time for joining stream-side batches to the build-side batches + join_time: usize, } impl RecordBatchStream for HashJoinStream { @@ -531,14 +560,39 @@ impl Stream for HashJoinStream { self.right .poll_next_unpin(cx) .map(|maybe_batch| match maybe_batch { - Some(Ok(batch)) => Some(build_batch( - &batch, - &self.left_data, - &self.on_right, - &self.join_type, - &self.schema, - )), - other => other, + Some(Ok(batch)) => { + let start = Instant::now(); + let result = build_batch( + &batch, + &self.left_data, + &self.on_right, + &self.join_type, + &self.schema, + ); + self.num_input_batches += 1; + self.num_input_rows += batch.num_rows(); + match result { + Ok(ref batch) => { + self.join_time += start.elapsed().as_millis() as usize; + self.num_output_batches += 1; + self.num_output_rows += batch.num_rows(); + } + _ => {} + } + Some(result) + } + other => { + debug!( + "Processed {} stream-side input batches containing {} rows and \ + produced {} output batches containing {} rows in {} ms", + self.num_input_batches, + self.num_input_rows, + self.num_output_batches, + self.num_output_rows, + self.join_time + ); + other + } }) } } From a6a5b58fdae3b9e4d07c62c61608208eaf3952c9 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 28 Dec 2020 13:49:20 -0700 Subject: [PATCH 2/3] change stream-side to probe-side --- rust/datafusion/src/physical_plan/hash_join.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index bb1d2cde0e9..5ff21d15a67 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -20,8 +20,8 @@ use arrow::array::ArrayRef; use std::sync::Arc; -use std::{any::Any, collections::HashSet}; use std::time::Instant; +use std::{any::Any, collections::HashSet}; use async_trait::async_trait; use futures::{Stream, StreamExt, TryStreamExt}; @@ -279,7 +279,7 @@ struct HashJoinStream { num_output_batches: usize, /// number of rows produced num_output_rows: usize, - /// total time for joining stream-side batches to the build-side batches + /// total time for joining probe-side batches to the build-side batches join_time: usize, } @@ -583,7 +583,7 @@ impl Stream for HashJoinStream { } other => { debug!( - "Processed {} stream-side input batches containing {} rows and \ + "Processed {} probe-side input batches containing {} rows and \ produced {} output batches containing {} rows in {} ms", self.num_input_batches, self.num_input_rows, From 2e1cadbb27e9e4a56b6f996c8bf263a48ddac893 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 28 Dec 2020 15:01:47 -0700 Subject: [PATCH 3/3] optimize capacity for joined batches --- rust/datafusion/src/physical_plan/hash_join.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 5ff21d15a67..04fc33489e8 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -300,6 +300,7 @@ fn build_batch_from_indices( right: &RecordBatch, join_type: &JoinType, indices: &[(JoinIndex, RightIndex)], + capacity: usize, ) -> ArrowResult { if left.is_empty() { todo!("Create empty record batch"); @@ -336,7 +337,6 @@ fn build_batch_from_indices( .iter() .map(|array| array.as_ref()) .collect::>(); - let capacity = arrays.iter().map(|array| array.len()).sum(); let mut mutable = MutableArrayData::new(arrays, true, capacity); let is_left = @@ -431,10 +431,11 @@ fn build_batch( on_right: &HashSet, join_type: &JoinType, schema: &Schema, + capacity: usize, ) -> ArrowResult { let indices = build_join_indexes(&left_data.0, &batch, join_type, on_right).unwrap(); - build_batch_from_indices(schema, &left_data.1, &batch, join_type, &indices) + build_batch_from_indices(schema, &left_data.1, &batch, join_type, &indices, capacity) } /// returns a vector with (index from left, index from right). @@ -562,12 +563,18 @@ impl Stream for HashJoinStream { .map(|maybe_batch| match maybe_batch { Some(Ok(batch)) => { let start = Instant::now(); + let capacity = if self.num_output_batches == 0 { + 1024 + } else { + self.num_output_rows / self.num_output_batches + 1024 + }; let result = build_batch( &batch, &self.left_data, &self.on_right, &self.join_type, &self.schema, + capacity, ); self.num_input_batches += 1; self.num_input_rows += batch.num_rows();