Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ impl PhysicalOptimizerRule for JoinSelection {
let right = cross_join.right();
if should_swap_join_order(&**left, &**right) {
let new_join =
CrossJoinExec::try_new(Arc::clone(right), Arc::clone(left))?;
CrossJoinExec::new(Arc::clone(right), Arc::clone(left));
// TODO avoid adding ProjectionExec again and again, only adding Final Projection
let proj = ProjectionExec::try_new(
swap_reverting_projection(&left.schema(), &right.schema()),
Expand Down
39 changes: 15 additions & 24 deletions datafusion/core/src/physical_plan/joins/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ use log::debug;
use std::time::Instant;

use super::utils::{
adjust_right_output_partitioning, check_join_is_valid,
cross_join_equivalence_properties, OnceAsync, OnceFut,
adjust_right_output_partitioning, cross_join_equivalence_properties, OnceAsync,
OnceFut,
};

/// Data of the left side
Expand All @@ -61,34 +61,25 @@ pub struct CrossJoinExec {
}

impl CrossJoinExec {
/// Tries to create a new [CrossJoinExec].
/// # Error
/// This function errors when left and right schema's can't be combined
pub fn try_new(
left: Arc<dyn ExecutionPlan>,
right: Arc<dyn ExecutionPlan>,
) -> Result<Self> {
let left_schema = left.schema();
let right_schema = right.schema();
check_join_is_valid(&left_schema, &right_schema, &[])?;

let left_schema = left.schema();
let left_fields = left_schema.fields().iter();
let right_schema = right.schema();

let right_fields = right_schema.fields().iter();

/// Create a new [CrossJoinExec].
pub fn new(left: Arc<dyn ExecutionPlan>, right: Arc<dyn ExecutionPlan>) -> Self {
// left then right
let all_columns = left_fields.chain(right_fields).cloned().collect();
let all_columns = {
let left_schema = left.schema();
let right_schema = right.schema();
let left_fields = left_schema.fields().iter();
let right_fields = right_schema.fields().iter();
left_fields.chain(right_fields).cloned().collect()
};

let schema = Arc::new(Schema::new(all_columns));
Comment on lines +67 to 75
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you use Schema::merge here instead?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could do something like this. This would preserve schema metadata as well.

        let input_schemas = vec![left.schema().as_ref().clone(), right.schema().as_ref().clone()];
        let schema = Arc::new(Schema::try_merge(input_schemas)?);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I'm afraid try_merge doesn't fit this context.
What we want here is to concatenate schema, but not merge schema. It could happen that a table cross joins with itself, or two tables have same named columns, in which cases, merge doesn't work.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the metadata, we should also do concatenation, instead of merging.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@andygrove, what's your suggestion?


Ok(CrossJoinExec {
CrossJoinExec {
left,
right,
schema,
left_fut: Default::default(),
})
}
}

/// left (build) side which gets loaded in memory
Expand Down Expand Up @@ -156,10 +147,10 @@ impl ExecutionPlan for CrossJoinExec {
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(CrossJoinExec::try_new(
Ok(Arc::new(CrossJoinExec::new(
children[0].clone(),
children[1].clone(),
)?))
)))
}

fn required_input_distribution(&self) -> Vec<Distribution> {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -991,7 +991,7 @@ impl DefaultPhysicalPlanner {
LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
let left = self.create_initial_plan(left, session_state).await?;
let right = self.create_initial_plan(right, session_state).await?;
Ok(Arc::new(CrossJoinExec::try_new(left, right)?))
Ok(Arc::new(CrossJoinExec::new(left, right)))
}
LogicalPlan::Subquery(_) => todo!(),
LogicalPlan::EmptyRelation(EmptyRelation {
Expand Down