diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs index f6490801126fc..7d875a5588700 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe.rs @@ -663,12 +663,7 @@ impl DataFrame { ) -> Result { let expr = on_exprs.into_iter().reduce(Expr::and); let plan = LogicalPlanBuilder::from(self.plan) - .join( - right.plan, - join_type, - (Vec::::new(), Vec::::new()), - expr, - )? + .join_on(right.plan, join_type, expr)? .build()?; Ok(DataFrame::new(self.session_state, plan)) } diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 3d62bcf55d6c1..539f6382adf3a 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -626,6 +626,54 @@ impl LogicalPlanBuilder { self.join_detailed(right, join_type, join_keys, filter, false) } + /// Apply a join with on constraint. + /// + /// The `ExtractEquijoinPredicate` optimizer pass has the ability to split join predicates into + /// equijoin predicates and (other) filter predicates. Therefore, if you prefer not to manually split the + /// join predicates, it is recommended to use the `join_on` method instead of the `join` method. + /// + /// ``` + /// # use datafusion_expr::{Expr, col, LogicalPlanBuilder, + /// # logical_plan::builder::LogicalTableSource, logical_plan::JoinType,}; + /// # use std::sync::Arc; + /// # use arrow::datatypes::{Schema, DataType, Field}; + /// # use datafusion_common::Result; + /// # fn main() -> Result<()> { + /// let example_schema = Arc::new(Schema::new(vec![ + /// Field::new("a", DataType::Int32, false), + /// Field::new("b", DataType::Int32, false), + /// Field::new("c", DataType::Int32, false), + /// ])); + /// let table_source = Arc::new(LogicalTableSource::new(example_schema)); + /// let left_table = table_source.clone(); + /// let right_table = table_source.clone(); + /// + /// let right_plan = LogicalPlanBuilder::scan("right", right_table, None)?.build()?; + /// + /// let exprs = vec![col("left.a").eq(col("right.a")), col("left.b").not_eq(col("right.b"))] + /// .into_iter() + /// .reduce(Expr::and); + /// let plan = LogicalPlanBuilder::scan("left", left_table, None)? + /// .join_on(right_plan, JoinType::Inner, exprs)? + /// .build()?; + /// # Ok(()) + /// # } + /// ``` + pub fn join_on( + self, + right: LogicalPlan, + join_type: JoinType, + on_exprs: Option, + ) -> Result { + self.join_detailed( + right, + join_type, + (Vec::::new(), Vec::::new()), + on_exprs, + false, + ) + } + pub(crate) fn normalize( plan: &LogicalPlan, column: impl Into + Clone,