From 3c87ee55b61d7c46fecd8af0cb19afb2a5e247e2 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 24 Sep 2024 10:03:19 -0600 Subject: [PATCH 01/14] save --- .../core/src/execution/datafusion/planner.rs | 42 +++++++-- native/core/src/execution/mod.rs | 91 +++++++++++++++++++ native/core/src/execution/operators/copy.rs | 14 ++- 3 files changed, 132 insertions(+), 15 deletions(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index d7c8d74592..08de5344d3 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -83,6 +83,7 @@ use datafusion::{ }; use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; +use crate::execution::DebugExec; use datafusion_comet_proto::{ spark_expression::{ self, agg_expr::ExprStruct as AggExprStruct, expr::ExprStruct, literal::Value, AggExpr, @@ -917,16 +918,20 @@ impl PhysicalPlanner { let fetch = sort.fetch.map(|num| num as usize); - let copy_exec = if can_reuse_input_batch(&child) { - Arc::new(CopyExec::new(child, CopyMode::UnpackOrDeepCopy)) + let child: Arc = Arc::new(DebugExec::new(child)); + + let child: Arc = if can_reuse_input_batch(&child) { + Arc::new(CopyExec::new(child, CopyMode::DeepCopy)) } else { - Arc::new(CopyExec::new(child, CopyMode::UnpackOrClone)) + child }; - Ok(( - scans, - Arc::new(SortExec::new(exprs?, copy_exec).with_fetch(fetch)), - )) + let child = Arc::new(DebugExec::new(child)); + + let sort_exec = Arc::new(SortExec::new(exprs?, child).with_fetch(fetch)); + let sort_exec = Arc::new(DebugExec::new(sort_exec)); + + Ok((scans, sort_exec)) } OpStruct::Scan(scan) => { let data_types = scan.fields.iter().map(to_arrow_datatype).collect_vec(); @@ -1030,6 +1035,7 @@ impl PhysicalPlanner { &join.right_join_keys, join.join_type, &join.condition, + true )?; let sort_options = join @@ -1058,6 +1064,8 @@ impl PhysicalPlanner { false, )?); + let join = Arc::new(DebugExec::new(join)); + Ok((scans, join)) } OpStruct::HashJoin(join) => { @@ -1068,6 +1076,7 @@ impl PhysicalPlanner { &join.right_join_keys, join.join_type, &join.condition, + false )?; let hash_join = Arc::new(HashJoinExec::try_new( join_params.left, @@ -1143,6 +1152,7 @@ impl PhysicalPlanner { right_join_keys: &[Expr], join_type: i32, condition: &Option, + is_sort_merge: bool, ) -> Result<(JoinParameters, Vec), ExecutionError> { assert!(children.len() == 2); let (mut left_scans, left) = self.create_plan(&children[0], inputs)?; @@ -1266,18 +1276,27 @@ impl PhysicalPlanner { // DataFusion Join operators keep the input batch internally. We need // to copy the input batch to avoid the data corruption from reusing the input // batch. - let left = if can_reuse_input_batch(&left) { + let left = if is_sort_merge { + // sortexec already unpacks + left + } else if can_reuse_input_batch(&left) { Arc::new(CopyExec::new(left, CopyMode::UnpackOrDeepCopy)) } else { Arc::new(CopyExec::new(left, CopyMode::UnpackOrClone)) }; - let right = if can_reuse_input_batch(&right) { + let right = if is_sort_merge { + // sortexec already unpacks + right + } else if can_reuse_input_batch(&right) { Arc::new(CopyExec::new(right, CopyMode::UnpackOrDeepCopy)) } else { Arc::new(CopyExec::new(right, CopyMode::UnpackOrClone)) }; + let left = Arc::new(DebugExec::new(left)); + let right = Arc::new(DebugExec::new(right)); + Ok(( JoinParameters { left, @@ -1918,7 +1937,10 @@ impl From for DataFusionError { /// modification. This is used to determine if we need to copy the input batch to avoid /// data corruption from reusing the input batch. fn can_reuse_input_batch(op: &Arc) -> bool { - if op.as_any().is::() || op.as_any().is::() { + if op.as_any().is::() + || op.as_any().is::() + || op.as_any().is::() + { can_reuse_input_batch(op.children()[0]) } else { op.as_any().is::() diff --git a/native/core/src/execution/mod.rs b/native/core/src/execution/mod.rs index f17935702c..afc6351bf9 100644 --- a/native/core/src/execution/mod.rs +++ b/native/core/src/execution/mod.rs @@ -26,12 +26,103 @@ pub mod operators; pub mod serde; pub mod shuffle; pub(crate) mod sort; + +use ::datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties}; +use arrow_array::RecordBatch; +use arrow_schema::SchemaRef; pub use datafusion_comet_spark_expr::timezone; +use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; +use futures::{Stream, StreamExt}; +use std::any::Any; +use std::fmt::Formatter; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; pub(crate) mod utils; mod memory_pool; pub use memory_pool::*; +#[derive(Debug)] +pub struct DebugExec { + child: Arc, +} + +impl DebugExec { + pub fn new(child: Arc) -> Self { + Self { child } + } +} + +impl DisplayAs for DebugExec { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + write!(f, "DebugExec") + } +} + +impl ExecutionPlan for DebugExec { + fn name(&self) -> &str { + "DebugExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + self.child.properties() + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.child] + } + + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> datafusion_common::Result> { + unreachable!() + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> datafusion_common::Result { + let stream = self.child.execute(partition, context)?; + Ok(Box::pin(DebugStream { + name: self.child.name().to_owned(), + child_stream: stream, + })) + } +} + +pub struct DebugStream { + name: String, + child_stream: SendableRecordBatchStream, +} + +impl Stream for DebugStream { + type Item = datafusion_common::Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.child_stream.poll_next_unpin(cx).map(|x| match x { + Some(Err(e)) => { + println!("{}.poll_next() returned an error: {}", self.name, e); + panic!("{}.poll_next() returned an error: {}", self.name, e); + // Some(Err(e)) + } + other => other, + }) + } +} + +impl RecordBatchStream for DebugStream { + fn schema(&self) -> SchemaRef { + self.child_stream.schema() + } +} + #[cfg(test)] mod tests { #[test] diff --git a/native/core/src/execution/operators/copy.rs b/native/core/src/execution/operators/copy.rs index d6c095a77b..0b5e788fc7 100644 --- a/native/core/src/execution/operators/copy.rs +++ b/native/core/src/execution/operators/copy.rs @@ -50,7 +50,11 @@ pub struct CopyExec { #[derive(Debug, PartialEq, Clone)] pub enum CopyMode { + /// Perform a deep copy but do not unpack dictionaries + DeepCopy, + /// Perform a deep copy and also unpack dictionaries UnpackOrDeepCopy, + /// Perform a clone and also unpack dictionaries UnpackOrClone, } @@ -64,7 +68,7 @@ impl CopyExec { .fields .iter() .map(|f: &FieldRef| match f.data_type() { - DataType::Dictionary(_, value_type) => { + DataType::Dictionary(_, value_type) if mode != CopyMode::DeepCopy=> { Field::new(f.name(), value_type.as_ref().clone(), f.is_nullable()) } _ => f.as_ref().clone(), @@ -258,15 +262,15 @@ fn copy_array(array: &dyn Array) -> ArrayRef { /// array is a primitive array, we simply copy the array. fn copy_or_unpack_array(array: &Arc, mode: &CopyMode) -> Result { match array.data_type() { - DataType::Dictionary(_, value_type) => { + DataType::Dictionary(_, value_type) if mode != &CopyMode::DeepCopy => { let options = CastOptions::default(); cast_with_options(array, value_type.as_ref(), &options) } _ => { - if mode == &CopyMode::UnpackOrDeepCopy { - Ok(copy_array(array)) - } else { + if mode == &CopyMode::UnpackOrClone { Ok(Arc::clone(array)) + } else { + Ok(copy_array(array)) } } } From aa8f29481e28730b5155f2facc9ab52870c4dec4 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 24 Sep 2024 10:11:54 -0600 Subject: [PATCH 02/14] remove debug code --- .../core/src/execution/datafusion/planner.rs | 23 ++--- native/core/src/execution/mod.rs | 91 ------------------- 2 files changed, 6 insertions(+), 108 deletions(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 08de5344d3..0c665338a7 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -83,7 +83,6 @@ use datafusion::{ }; use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; -use crate::execution::DebugExec; use datafusion_comet_proto::{ spark_expression::{ self, agg_expr::ExprStruct as AggExprStruct, expr::ExprStruct, literal::Value, AggExpr, @@ -918,18 +917,14 @@ impl PhysicalPlanner { let fetch = sort.fetch.map(|num| num as usize); - let child: Arc = Arc::new(DebugExec::new(child)); - let child: Arc = if can_reuse_input_batch(&child) { + // perform a deep copy but do not unpack dictionaries Arc::new(CopyExec::new(child, CopyMode::DeepCopy)) } else { child }; - let child = Arc::new(DebugExec::new(child)); - let sort_exec = Arc::new(SortExec::new(exprs?, child).with_fetch(fetch)); - let sort_exec = Arc::new(DebugExec::new(sort_exec)); Ok((scans, sort_exec)) } @@ -1064,8 +1059,6 @@ impl PhysicalPlanner { false, )?); - let join = Arc::new(DebugExec::new(join)); - Ok((scans, join)) } OpStruct::HashJoin(join) => { @@ -1277,7 +1270,8 @@ impl PhysicalPlanner { // to copy the input batch to avoid the data corruption from reusing the input // batch. let left = if is_sort_merge { - // sortexec already unpacks + // SortExec does not produce dictionary arrays and does not re-use batches, + // so no need for a CopyExec in this case left } else if can_reuse_input_batch(&left) { Arc::new(CopyExec::new(left, CopyMode::UnpackOrDeepCopy)) @@ -1286,7 +1280,8 @@ impl PhysicalPlanner { }; let right = if is_sort_merge { - // sortexec already unpacks + // SortExec does not produce dictionary arrays and does not re-use batches, + // so no need for a CopyExec in this case right } else if can_reuse_input_batch(&right) { Arc::new(CopyExec::new(right, CopyMode::UnpackOrDeepCopy)) @@ -1294,9 +1289,6 @@ impl PhysicalPlanner { Arc::new(CopyExec::new(right, CopyMode::UnpackOrClone)) }; - let left = Arc::new(DebugExec::new(left)); - let right = Arc::new(DebugExec::new(right)); - Ok(( JoinParameters { left, @@ -1937,10 +1929,7 @@ impl From for DataFusionError { /// modification. This is used to determine if we need to copy the input batch to avoid /// data corruption from reusing the input batch. fn can_reuse_input_batch(op: &Arc) -> bool { - if op.as_any().is::() - || op.as_any().is::() - || op.as_any().is::() - { + if op.as_any().is::() || op.as_any().is::() { can_reuse_input_batch(op.children()[0]) } else { op.as_any().is::() diff --git a/native/core/src/execution/mod.rs b/native/core/src/execution/mod.rs index afc6351bf9..f17935702c 100644 --- a/native/core/src/execution/mod.rs +++ b/native/core/src/execution/mod.rs @@ -26,103 +26,12 @@ pub mod operators; pub mod serde; pub mod shuffle; pub(crate) mod sort; - -use ::datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties}; -use arrow_array::RecordBatch; -use arrow_schema::SchemaRef; pub use datafusion_comet_spark_expr::timezone; -use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; -use futures::{Stream, StreamExt}; -use std::any::Any; -use std::fmt::Formatter; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; pub(crate) mod utils; mod memory_pool; pub use memory_pool::*; -#[derive(Debug)] -pub struct DebugExec { - child: Arc, -} - -impl DebugExec { - pub fn new(child: Arc) -> Self { - Self { child } - } -} - -impl DisplayAs for DebugExec { - fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { - write!(f, "DebugExec") - } -} - -impl ExecutionPlan for DebugExec { - fn name(&self) -> &str { - "DebugExec" - } - - fn as_any(&self) -> &dyn Any { - self - } - - fn properties(&self) -> &PlanProperties { - self.child.properties() - } - - fn children(&self) -> Vec<&Arc> { - vec![&self.child] - } - - fn with_new_children( - self: Arc, - _children: Vec>, - ) -> datafusion_common::Result> { - unreachable!() - } - - fn execute( - &self, - partition: usize, - context: Arc, - ) -> datafusion_common::Result { - let stream = self.child.execute(partition, context)?; - Ok(Box::pin(DebugStream { - name: self.child.name().to_owned(), - child_stream: stream, - })) - } -} - -pub struct DebugStream { - name: String, - child_stream: SendableRecordBatchStream, -} - -impl Stream for DebugStream { - type Item = datafusion_common::Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.child_stream.poll_next_unpin(cx).map(|x| match x { - Some(Err(e)) => { - println!("{}.poll_next() returned an error: {}", self.name, e); - panic!("{}.poll_next() returned an error: {}", self.name, e); - // Some(Err(e)) - } - other => other, - }) - } -} - -impl RecordBatchStream for DebugStream { - fn schema(&self) -> SchemaRef { - self.child_stream.schema() - } -} - #[cfg(test)] mod tests { #[test] From f401dd5c1854962d3a241cd14f03f1e5ce39314e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 24 Sep 2024 10:41:44 -0600 Subject: [PATCH 03/14] small refactor --- .../core/src/execution/datafusion/planner.rs | 37 ++++++++----------- native/core/src/execution/operators/copy.rs | 2 +- 2 files changed, 17 insertions(+), 22 deletions(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 0c665338a7..02c5ed06ce 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -1030,7 +1030,7 @@ impl PhysicalPlanner { &join.right_join_keys, join.join_type, &join.condition, - true + true, )?; let sort_options = join @@ -1069,7 +1069,7 @@ impl PhysicalPlanner { &join.right_join_keys, join.join_type, &join.condition, - false + false, )?; let hash_join = Arc::new(HashJoinExec::try_new( join_params.left, @@ -1269,25 +1269,8 @@ impl PhysicalPlanner { // DataFusion Join operators keep the input batch internally. We need // to copy the input batch to avoid the data corruption from reusing the input // batch. - let left = if is_sort_merge { - // SortExec does not produce dictionary arrays and does not re-use batches, - // so no need for a CopyExec in this case - left - } else if can_reuse_input_batch(&left) { - Arc::new(CopyExec::new(left, CopyMode::UnpackOrDeepCopy)) - } else { - Arc::new(CopyExec::new(left, CopyMode::UnpackOrClone)) - }; - - let right = if is_sort_merge { - // SortExec does not produce dictionary arrays and does not re-use batches, - // so no need for a CopyExec in this case - right - } else if can_reuse_input_batch(&right) { - Arc::new(CopyExec::new(right, CopyMode::UnpackOrDeepCopy)) - } else { - Arc::new(CopyExec::new(right, CopyMode::UnpackOrClone)) - }; + let left = Self::wrap_in_copy_exec(is_sort_merge, left); + let right = Self::wrap_in_copy_exec(is_sort_merge, right); Ok(( JoinParameters { @@ -1301,6 +1284,18 @@ impl PhysicalPlanner { )) } + fn wrap_in_copy_exec(is_sort_merge: bool, plan: Arc) -> Arc { + if is_sort_merge { + // SortExec does not produce dictionary arrays and does not re-use batches, + // so no need for a CopyExec in this case + plan + } else if can_reuse_input_batch(&plan) { + Arc::new(CopyExec::new(plan, CopyMode::UnpackOrDeepCopy)) + } else { + Arc::new(CopyExec::new(plan, CopyMode::UnpackOrClone)) + } + } + /// Create a DataFusion physical aggregate expression from Spark physical aggregate expression fn create_agg_expr( &self, diff --git a/native/core/src/execution/operators/copy.rs b/native/core/src/execution/operators/copy.rs index 0b5e788fc7..7127f8619c 100644 --- a/native/core/src/execution/operators/copy.rs +++ b/native/core/src/execution/operators/copy.rs @@ -68,7 +68,7 @@ impl CopyExec { .fields .iter() .map(|f: &FieldRef| match f.data_type() { - DataType::Dictionary(_, value_type) if mode != CopyMode::DeepCopy=> { + DataType::Dictionary(_, value_type) if mode != CopyMode::DeepCopy => { Field::new(f.name(), value_type.as_ref().clone(), f.is_nullable()) } _ => f.as_ref().clone(), From b14497a506205a2c26ef7d87a788a90967d337fb Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 24 Sep 2024 11:13:42 -0600 Subject: [PATCH 04/14] remove another CopyExec --- .../core/src/execution/datafusion/planner.rs | 23 ++++++++----------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 02c5ed06ce..4b122c5325 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -916,16 +916,7 @@ impl PhysicalPlanner { .collect(); let fetch = sort.fetch.map(|num| num as usize); - - let child: Arc = if can_reuse_input_batch(&child) { - // perform a deep copy but do not unpack dictionaries - Arc::new(CopyExec::new(child, CopyMode::DeepCopy)) - } else { - child - }; - let sort_exec = Arc::new(SortExec::new(exprs?, child).with_fetch(fetch)); - Ok((scans, sort_exec)) } OpStruct::Scan(scan) => { @@ -1269,8 +1260,8 @@ impl PhysicalPlanner { // DataFusion Join operators keep the input batch internally. We need // to copy the input batch to avoid the data corruption from reusing the input // batch. - let left = Self::wrap_in_copy_exec(is_sort_merge, left); - let right = Self::wrap_in_copy_exec(is_sort_merge, right); + let left = Self::wrap_join_input_in_copy_exec(is_sort_merge, left); + let right = Self::wrap_join_input_in_copy_exec(is_sort_merge, right); Ok(( JoinParameters { @@ -1284,10 +1275,14 @@ impl PhysicalPlanner { )) } - fn wrap_in_copy_exec(is_sort_merge: bool, plan: Arc) -> Arc { + fn wrap_join_input_in_copy_exec( + is_sort_merge: bool, + plan: Arc, + ) -> Arc { if is_sort_merge { - // SortExec does not produce dictionary arrays and does not re-use batches, - // so no need for a CopyExec in this case + // The input to a SortMergeJoin is always a SortExec, which does not produce + // dictionary-encoded arrays and does not re-use batches, so there is no need for + // a CopyExec in this case plan } else if can_reuse_input_batch(&plan) { Arc::new(CopyExec::new(plan, CopyMode::UnpackOrDeepCopy)) From b09ee5241d1dcb08234912c6a7294fe5a8698952 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 24 Sep 2024 11:14:49 -0600 Subject: [PATCH 05/14] revert changes to CopyExec --- native/core/src/execution/operators/copy.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/native/core/src/execution/operators/copy.rs b/native/core/src/execution/operators/copy.rs index 7127f8619c..d8e75c67f1 100644 --- a/native/core/src/execution/operators/copy.rs +++ b/native/core/src/execution/operators/copy.rs @@ -50,8 +50,6 @@ pub struct CopyExec { #[derive(Debug, PartialEq, Clone)] pub enum CopyMode { - /// Perform a deep copy but do not unpack dictionaries - DeepCopy, /// Perform a deep copy and also unpack dictionaries UnpackOrDeepCopy, /// Perform a clone and also unpack dictionaries @@ -68,7 +66,7 @@ impl CopyExec { .fields .iter() .map(|f: &FieldRef| match f.data_type() { - DataType::Dictionary(_, value_type) if mode != CopyMode::DeepCopy => { + DataType::Dictionary(_, value_type) => { Field::new(f.name(), value_type.as_ref().clone(), f.is_nullable()) } _ => f.as_ref().clone(), @@ -262,15 +260,15 @@ fn copy_array(array: &dyn Array) -> ArrayRef { /// array is a primitive array, we simply copy the array. fn copy_or_unpack_array(array: &Arc, mode: &CopyMode) -> Result { match array.data_type() { - DataType::Dictionary(_, value_type) if mode != &CopyMode::DeepCopy => { + DataType::Dictionary(_, value_type) => { let options = CastOptions::default(); cast_with_options(array, value_type.as_ref(), &options) } _ => { - if mode == &CopyMode::UnpackOrClone { - Ok(Arc::clone(array)) - } else { + if mode == &CopyMode::UnpackOrDeepCopy { Ok(copy_array(array)) + } else { + Ok(Arc::clone(array)) } } } From b252fc550f341b736f309b01e85bff3b50306ae7 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 24 Sep 2024 11:19:20 -0600 Subject: [PATCH 06/14] Revert "revert changes to CopyExec" This reverts commit b09ee5241d1dcb08234912c6a7294fe5a8698952. --- native/core/src/execution/operators/copy.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/native/core/src/execution/operators/copy.rs b/native/core/src/execution/operators/copy.rs index d8e75c67f1..7127f8619c 100644 --- a/native/core/src/execution/operators/copy.rs +++ b/native/core/src/execution/operators/copy.rs @@ -50,6 +50,8 @@ pub struct CopyExec { #[derive(Debug, PartialEq, Clone)] pub enum CopyMode { + /// Perform a deep copy but do not unpack dictionaries + DeepCopy, /// Perform a deep copy and also unpack dictionaries UnpackOrDeepCopy, /// Perform a clone and also unpack dictionaries @@ -66,7 +68,7 @@ impl CopyExec { .fields .iter() .map(|f: &FieldRef| match f.data_type() { - DataType::Dictionary(_, value_type) => { + DataType::Dictionary(_, value_type) if mode != CopyMode::DeepCopy => { Field::new(f.name(), value_type.as_ref().clone(), f.is_nullable()) } _ => f.as_ref().clone(), @@ -260,15 +262,15 @@ fn copy_array(array: &dyn Array) -> ArrayRef { /// array is a primitive array, we simply copy the array. fn copy_or_unpack_array(array: &Arc, mode: &CopyMode) -> Result { match array.data_type() { - DataType::Dictionary(_, value_type) => { + DataType::Dictionary(_, value_type) if mode != &CopyMode::DeepCopy => { let options = CastOptions::default(); cast_with_options(array, value_type.as_ref(), &options) } _ => { - if mode == &CopyMode::UnpackOrDeepCopy { - Ok(copy_array(array)) - } else { + if mode == &CopyMode::UnpackOrClone { Ok(Arc::clone(array)) + } else { + Ok(copy_array(array)) } } } From f9b0c49eb448aa39f76389663a5d6c6fca188d7f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 24 Sep 2024 11:19:35 -0600 Subject: [PATCH 07/14] Revert "remove another CopyExec" This reverts commit b14497a506205a2c26ef7d87a788a90967d337fb. --- .../core/src/execution/datafusion/planner.rs | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 4b122c5325..02c5ed06ce 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -916,7 +916,16 @@ impl PhysicalPlanner { .collect(); let fetch = sort.fetch.map(|num| num as usize); + + let child: Arc = if can_reuse_input_batch(&child) { + // perform a deep copy but do not unpack dictionaries + Arc::new(CopyExec::new(child, CopyMode::DeepCopy)) + } else { + child + }; + let sort_exec = Arc::new(SortExec::new(exprs?, child).with_fetch(fetch)); + Ok((scans, sort_exec)) } OpStruct::Scan(scan) => { @@ -1260,8 +1269,8 @@ impl PhysicalPlanner { // DataFusion Join operators keep the input batch internally. We need // to copy the input batch to avoid the data corruption from reusing the input // batch. - let left = Self::wrap_join_input_in_copy_exec(is_sort_merge, left); - let right = Self::wrap_join_input_in_copy_exec(is_sort_merge, right); + let left = Self::wrap_in_copy_exec(is_sort_merge, left); + let right = Self::wrap_in_copy_exec(is_sort_merge, right); Ok(( JoinParameters { @@ -1275,14 +1284,10 @@ impl PhysicalPlanner { )) } - fn wrap_join_input_in_copy_exec( - is_sort_merge: bool, - plan: Arc, - ) -> Arc { + fn wrap_in_copy_exec(is_sort_merge: bool, plan: Arc) -> Arc { if is_sort_merge { - // The input to a SortMergeJoin is always a SortExec, which does not produce - // dictionary-encoded arrays and does not re-use batches, so there is no need for - // a CopyExec in this case + // SortExec does not produce dictionary arrays and does not re-use batches, + // so no need for a CopyExec in this case plan } else if can_reuse_input_batch(&plan) { Arc::new(CopyExec::new(plan, CopyMode::UnpackOrDeepCopy)) From 609a0fbc95762a68d34ee30326b19e603f5e5658 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 24 Sep 2024 11:31:03 -0600 Subject: [PATCH 08/14] Revert some changes --- native/core/src/execution/datafusion/planner.rs | 2 +- native/core/src/execution/operators/copy.rs | 12 +++++------- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 02c5ed06ce..28381fbd96 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -919,7 +919,7 @@ impl PhysicalPlanner { let child: Arc = if can_reuse_input_batch(&child) { // perform a deep copy but do not unpack dictionaries - Arc::new(CopyExec::new(child, CopyMode::DeepCopy)) + Arc::new(CopyExec::new(child, CopyMode::UnpackOrDeepCopy)) } else { child }; diff --git a/native/core/src/execution/operators/copy.rs b/native/core/src/execution/operators/copy.rs index 7127f8619c..d8e75c67f1 100644 --- a/native/core/src/execution/operators/copy.rs +++ b/native/core/src/execution/operators/copy.rs @@ -50,8 +50,6 @@ pub struct CopyExec { #[derive(Debug, PartialEq, Clone)] pub enum CopyMode { - /// Perform a deep copy but do not unpack dictionaries - DeepCopy, /// Perform a deep copy and also unpack dictionaries UnpackOrDeepCopy, /// Perform a clone and also unpack dictionaries @@ -68,7 +66,7 @@ impl CopyExec { .fields .iter() .map(|f: &FieldRef| match f.data_type() { - DataType::Dictionary(_, value_type) if mode != CopyMode::DeepCopy => { + DataType::Dictionary(_, value_type) => { Field::new(f.name(), value_type.as_ref().clone(), f.is_nullable()) } _ => f.as_ref().clone(), @@ -262,15 +260,15 @@ fn copy_array(array: &dyn Array) -> ArrayRef { /// array is a primitive array, we simply copy the array. fn copy_or_unpack_array(array: &Arc, mode: &CopyMode) -> Result { match array.data_type() { - DataType::Dictionary(_, value_type) if mode != &CopyMode::DeepCopy => { + DataType::Dictionary(_, value_type) => { let options = CastOptions::default(); cast_with_options(array, value_type.as_ref(), &options) } _ => { - if mode == &CopyMode::UnpackOrClone { - Ok(Arc::clone(array)) - } else { + if mode == &CopyMode::UnpackOrDeepCopy { Ok(copy_array(array)) + } else { + Ok(Arc::clone(array)) } } } From 0e139e90aea0808b351e9cbba49fb60fd3bbd06e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 24 Sep 2024 11:38:12 -0600 Subject: [PATCH 09/14] remove comment --- native/core/src/execution/datafusion/planner.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 28381fbd96..19ab10345c 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -918,7 +918,6 @@ impl PhysicalPlanner { let fetch = sort.fetch.map(|num| num as usize); let child: Arc = if can_reuse_input_batch(&child) { - // perform a deep copy but do not unpack dictionaries Arc::new(CopyExec::new(child, CopyMode::UnpackOrDeepCopy)) } else { child From cae110006d1a3321984c1780c22190b6855eed4f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 24 Sep 2024 11:59:50 -0600 Subject: [PATCH 10/14] Revert a change --- native/core/src/execution/datafusion/planner.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 19ab10345c..69a7a9cf7b 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -917,15 +917,16 @@ impl PhysicalPlanner { let fetch = sort.fetch.map(|num| num as usize); - let child: Arc = if can_reuse_input_batch(&child) { + let copy_exec = if can_reuse_input_batch(&child) { Arc::new(CopyExec::new(child, CopyMode::UnpackOrDeepCopy)) } else { - child + Arc::new(CopyExec::new(child, CopyMode::UnpackOrClone)) }; - let sort_exec = Arc::new(SortExec::new(exprs?, child).with_fetch(fetch)); - - Ok((scans, sort_exec)) + Ok(( + scans, + Arc::new(SortExec::new(exprs?, copy_exec).with_fetch(fetch)), + )) } OpStruct::Scan(scan) => { let data_types = scan.fields.iter().map(to_arrow_datatype).collect_vec(); From 9837d2cdb9e27903d76d9b87418e347efdeb8481 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 24 Sep 2024 12:27:54 -0600 Subject: [PATCH 11/14] cargo fmt --- native/core/src/execution/datafusion/planner.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 69a7a9cf7b..ac92d5c882 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -1284,7 +1284,10 @@ impl PhysicalPlanner { )) } - fn wrap_in_copy_exec(is_sort_merge: bool, plan: Arc) -> Arc { + fn wrap_in_copy_exec( + is_sort_merge: bool, + plan: Arc, + ) -> Arc { if is_sort_merge { // SortExec does not produce dictionary arrays and does not re-use batches, // so no need for a CopyExec in this case From 31d0b0a38165e9315a2c0d981406dd48f7f7a3af Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 24 Sep 2024 12:47:41 -0600 Subject: [PATCH 12/14] clippy --- native/core/src/execution/datafusion/planner.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index ac92d5c882..a7067ab15f 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -1137,6 +1137,7 @@ impl PhysicalPlanner { } } + #[allow(clippy::too_many_arguments)] fn parse_join_parameters( &self, inputs: &mut Vec>, From 5c9dbb895c72f8fb9d8cd667ad0d62f0d11d2101 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 24 Sep 2024 15:26:23 -0600 Subject: [PATCH 13/14] refactor to avoid duplication --- .../core/src/execution/datafusion/planner.rs | 40 +++++++------------ 1 file changed, 14 insertions(+), 26 deletions(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index a7067ab15f..2a30f264bb 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -917,15 +917,11 @@ impl PhysicalPlanner { let fetch = sort.fetch.map(|num| num as usize); - let copy_exec = if can_reuse_input_batch(&child) { - Arc::new(CopyExec::new(child, CopyMode::UnpackOrDeepCopy)) - } else { - Arc::new(CopyExec::new(child, CopyMode::UnpackOrClone)) - }; + let child = Self::wrap_in_copy_exec(child); Ok(( scans, - Arc::new(SortExec::new(exprs?, copy_exec).with_fetch(fetch)), + Arc::new(SortExec::new(exprs?, child).with_fetch(fetch)), )) } OpStruct::Scan(scan) => { @@ -1030,7 +1026,6 @@ impl PhysicalPlanner { &join.right_join_keys, join.join_type, &join.condition, - true, )?; let sort_options = join @@ -1069,11 +1064,18 @@ impl PhysicalPlanner { &join.right_join_keys, join.join_type, &join.condition, - false, )?; + + // HashJoinExec may cache the input batch internally. We need + // to copy the input batch to avoid the data corruption from reusing the input + // batch. We also need to unpack dictionary arrays, because the join operators + // do not support them. + let left = Self::wrap_in_copy_exec(join_params.left); + let right = Self::wrap_in_copy_exec(join_params.right); + let hash_join = Arc::new(HashJoinExec::try_new( - join_params.left, - join_params.right, + left, + right, join_params.join_on, join_params.join_filter, &join_params.join_type, @@ -1146,7 +1148,6 @@ impl PhysicalPlanner { right_join_keys: &[Expr], join_type: i32, condition: &Option, - is_sort_merge: bool, ) -> Result<(JoinParameters, Vec), ExecutionError> { assert!(children.len() == 2); let (mut left_scans, left) = self.create_plan(&children[0], inputs)?; @@ -1267,12 +1268,6 @@ impl PhysicalPlanner { None }; - // DataFusion Join operators keep the input batch internally. We need - // to copy the input batch to avoid the data corruption from reusing the input - // batch. - let left = Self::wrap_in_copy_exec(is_sort_merge, left); - let right = Self::wrap_in_copy_exec(is_sort_merge, right); - Ok(( JoinParameters { left, @@ -1285,15 +1280,8 @@ impl PhysicalPlanner { )) } - fn wrap_in_copy_exec( - is_sort_merge: bool, - plan: Arc, - ) -> Arc { - if is_sort_merge { - // SortExec does not produce dictionary arrays and does not re-use batches, - // so no need for a CopyExec in this case - plan - } else if can_reuse_input_batch(&plan) { + fn wrap_in_copy_exec(plan: Arc) -> Arc { + if can_reuse_input_batch(&plan) { Arc::new(CopyExec::new(plan, CopyMode::UnpackOrDeepCopy)) } else { Arc::new(CopyExec::new(plan, CopyMode::UnpackOrClone)) From 8b7b468cd0d74629ece3325dd67b6b461a19b181 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 25 Sep 2024 09:08:40 -0600 Subject: [PATCH 14/14] address feedback --- native/core/src/execution/datafusion/planner.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 2a30f264bb..bf970a92b2 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -917,6 +917,10 @@ impl PhysicalPlanner { let fetch = sort.fetch.map(|num| num as usize); + // SortExec caches batches so we need to make a copy of incoming batches. Also, + // SortExec fails in some cases if we do not unpack dictionary-encoded arrays, and + // it would be more efficient if we could avoid that. + // https://github.com/apache/datafusion-comet/issues/963 let child = Self::wrap_in_copy_exec(child); Ok(( @@ -1280,6 +1284,8 @@ impl PhysicalPlanner { )) } + /// Wrap an ExecutionPlan in a CopyExec, which will unpack any dictionary-encoded arrays + /// and make a deep copy of other arrays if the plan re-uses batches. fn wrap_in_copy_exec(plan: Arc) -> Arc { if can_reuse_input_batch(&plan) { Arc::new(CopyExec::new(plan, CopyMode::UnpackOrDeepCopy))