diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs index 7428c5ed671f2..c1596116eb197 100644 --- a/datafusion/core/src/physical_optimizer/join_selection.rs +++ b/datafusion/core/src/physical_optimizer/join_selection.rs @@ -258,7 +258,7 @@ impl PhysicalOptimizerRule for JoinSelection { let right = cross_join.right(); if should_swap_join_order(&**left, &**right) { let new_join = - CrossJoinExec::try_new(Arc::clone(right), Arc::clone(left))?; + CrossJoinExec::new(Arc::clone(right), Arc::clone(left)); // TODO avoid adding ProjectionExec again and again, only adding Final Projection let proj = ProjectionExec::try_new( swap_reverting_projection(&left.schema(), &right.schema()), diff --git a/datafusion/core/src/physical_plan/joins/cross_join.rs b/datafusion/core/src/physical_plan/joins/cross_join.rs index 170153e07b073..1b43b609620d4 100644 --- a/datafusion/core/src/physical_plan/joins/cross_join.rs +++ b/datafusion/core/src/physical_plan/joins/cross_join.rs @@ -39,8 +39,8 @@ use log::debug; use std::time::Instant; use super::utils::{ - adjust_right_output_partitioning, check_join_is_valid, - cross_join_equivalence_properties, OnceAsync, OnceFut, + adjust_right_output_partitioning, cross_join_equivalence_properties, OnceAsync, + OnceFut, }; /// Data of the left side @@ -61,34 +61,25 @@ pub struct CrossJoinExec { } impl CrossJoinExec { - /// Tries to create a new [CrossJoinExec]. - /// # Error - /// This function errors when left and right schema's can't be combined - pub fn try_new( - left: Arc, - right: Arc, - ) -> Result { - let left_schema = left.schema(); - let right_schema = right.schema(); - check_join_is_valid(&left_schema, &right_schema, &[])?; - - let left_schema = left.schema(); - let left_fields = left_schema.fields().iter(); - let right_schema = right.schema(); - - let right_fields = right_schema.fields().iter(); - + /// Create a new [CrossJoinExec]. + pub fn new(left: Arc, right: Arc) -> Self { // left then right - let all_columns = left_fields.chain(right_fields).cloned().collect(); + let all_columns = { + let left_schema = left.schema(); + let right_schema = right.schema(); + let left_fields = left_schema.fields().iter(); + let right_fields = right_schema.fields().iter(); + left_fields.chain(right_fields).cloned().collect() + }; let schema = Arc::new(Schema::new(all_columns)); - Ok(CrossJoinExec { + CrossJoinExec { left, right, schema, left_fut: Default::default(), - }) + } } /// left (build) side which gets loaded in memory @@ -156,10 +147,10 @@ impl ExecutionPlan for CrossJoinExec { self: Arc, children: Vec>, ) -> Result> { - Ok(Arc::new(CrossJoinExec::try_new( + Ok(Arc::new(CrossJoinExec::new( children[0].clone(), children[1].clone(), - )?)) + ))) } fn required_input_distribution(&self) -> Vec { diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 19d3c3e0867ce..23a36239a1099 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -991,7 +991,7 @@ impl DefaultPhysicalPlanner { LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => { let left = self.create_initial_plan(left, session_state).await?; let right = self.create_initial_plan(right, session_state).await?; - Ok(Arc::new(CrossJoinExec::try_new(left, right)?)) + Ok(Arc::new(CrossJoinExec::new(left, right))) } LogicalPlan::Subquery(_) => todo!(), LogicalPlan::EmptyRelation(EmptyRelation {