From d3ef287438f0ff432a92541909aef211043ae85e Mon Sep 17 00:00:00 2001 From: Jonathan Date: Sun, 18 May 2025 01:24:33 -0400 Subject: [PATCH 01/13] feat: Support RightMark join for NestedLoop and Hash join --- .../common/src/functional_dependencies.rs | 2 +- datafusion/common/src/join_type.rs | 12 +- datafusion/core/tests/dataframe/mod.rs | 4 +- .../enforce_distribution.rs | 5 +- datafusion/expr/src/logical_plan/builder.rs | 4 + .../expr/src/logical_plan/invariants.rs | 5 +- datafusion/expr/src/logical_plan/plan.rs | 8 +- .../optimizer/src/optimize_projections/mod.rs | 3 +- datafusion/optimizer/src/push_down_filter.rs | 5 +- .../physical-expr/src/equivalence/class.rs | 4 +- .../src/enforce_distribution.rs | 2 +- .../src/enforce_sorting/sort_pushdown.rs | 3 +- .../physical-plan/src/joins/hash_join.rs | 134 ++++++++++++++++-- .../src/joins/nested_loop_join.rs | 64 +++++++-- .../src/joins/sort_merge_join.rs | 13 +- .../src/joins/symmetric_hash_join.rs | 20 ++- datafusion/physical-plan/src/joins/utils.rs | 57 +++++++- .../proto/datafusion_common.proto | 1 + .../proto-common/src/generated/pbjson.rs | 3 + .../proto-common/src/generated/prost.rs | 3 + .../src/generated/datafusion_proto_common.rs | 3 + .../proto/src/logical_plan/from_proto.rs | 1 + datafusion/proto/src/logical_plan/to_proto.rs | 1 + datafusion/sql/src/unparser/plan.rs | 40 ++++-- .../src/logical_plan/consumer/rel/join_rel.rs | 1 + .../substrait/src/logical_plan/producer.rs | 1 + 26 files changed, 346 insertions(+), 53 deletions(-) diff --git a/datafusion/common/src/functional_dependencies.rs b/datafusion/common/src/functional_dependencies.rs index 77e00d6dcda23..ab72fe8f67da4 100644 --- a/datafusion/common/src/functional_dependencies.rs +++ b/datafusion/common/src/functional_dependencies.rs @@ -374,7 +374,7 @@ impl FunctionalDependencies { // These joins preserve functional dependencies of the left side: left_func_dependencies } - JoinType::RightSemi | JoinType::RightAnti => { + JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => { // These joins preserve functional dependencies of the right side: right_func_dependencies } diff --git a/datafusion/common/src/join_type.rs b/datafusion/common/src/join_type.rs index ac81d977b7296..05d6f70aa3e10 100644 --- a/datafusion/common/src/join_type.rs +++ b/datafusion/common/src/join_type.rs @@ -67,6 +67,11 @@ pub enum JoinType { /// /// [1]: http://btw2017.informatik.uni-stuttgart.de/slidesandpapers/F1-10-37/paper_web.pdf LeftMark, + /// Right Mark Join + /// + /// Same logic as the LeftMark Join above, however it returns a record for each record from the + /// right input. + RightMark, } impl JoinType { @@ -87,9 +92,8 @@ impl JoinType { JoinType::RightSemi => JoinType::LeftSemi, JoinType::LeftAnti => JoinType::RightAnti, JoinType::RightAnti => JoinType::LeftAnti, - JoinType::LeftMark => { - unreachable!("LeftMark join type does not support swapping") - } + JoinType::LeftMark => JoinType::RightMark, + JoinType::RightMark => JoinType::LeftMark, } } @@ -121,6 +125,7 @@ impl Display for JoinType { JoinType::LeftAnti => "LeftAnti", JoinType::RightAnti => "RightAnti", JoinType::LeftMark => "LeftMark", + JoinType::RightMark => "RightMark", }; write!(f, "{join_type}") } @@ -141,6 +146,7 @@ impl FromStr for JoinType { "LEFTANTI" => Ok(JoinType::LeftAnti), "RIGHTANTI" => Ok(JoinType::RightAnti), "LEFTMARK" => Ok(JoinType::LeftMark), + "RIGHTMARK" => Ok(JoinType::RightMark), _ => _not_impl_err!("The join type {s} does not exist or is not implemented"), } } diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 827808d923f59..072569aed877c 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -2094,6 +2094,7 @@ async fn verify_join_output_partitioning() -> Result<()> { JoinType::LeftAnti, JoinType::RightAnti, JoinType::LeftMark, + JoinType::RightMark, ]; let default_partition_count = SessionConfig::new().target_partitions(); @@ -2127,7 +2128,8 @@ async fn verify_join_output_partitioning() -> Result<()> { JoinType::Inner | JoinType::Right | JoinType::RightSemi - | JoinType::RightAnti => { + | JoinType::RightAnti + | JoinType::RightMark => { let right_exprs: Vec> = vec![ Arc::new(Column::new_with_schema("c2_c1", &join_schema)?), Arc::new(Column::new_with_schema("c2_c2", &join_schema)?), diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index 4034800c30cba..f5b9eef6e398c 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -649,7 +649,7 @@ fn multi_hash_joins() -> Result<()> { test_config.run(&expected, top_join.clone(), &DISTRIB_DISTRIB_SORT)?; test_config.run(&expected, top_join, &SORT_DISTRIB_DISTRIB)?; } - JoinType::RightSemi | JoinType::RightAnti => {} + JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {} } match join_type { @@ -658,7 +658,8 @@ fn multi_hash_joins() -> Result<()> { | JoinType::Right | JoinType::Full | JoinType::RightSemi - | JoinType::RightAnti => { + | JoinType::RightAnti + | JoinType::RightMark => { // This time we use (b1 == c) for top join // Join on (b1 == c) let top_join_on = vec![( diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index be2c236c529c7..8d10f3cd31ab4 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1620,6 +1620,10 @@ pub fn build_join_schema( .map(|(q, f)| (q.cloned(), Arc::clone(f))) .collect() } + JoinType::RightMark => right_fields + .map(|(q, f)| (q.cloned(), Arc::clone(f))) + .chain(once(mark_field(left))) + .collect(), }; let func_dependencies = left.functional_dependencies().join( right.functional_dependencies(), diff --git a/datafusion/expr/src/logical_plan/invariants.rs b/datafusion/expr/src/logical_plan/invariants.rs index 0c30c9785766b..d8d6739b0e8f0 100644 --- a/datafusion/expr/src/logical_plan/invariants.rs +++ b/datafusion/expr/src/logical_plan/invariants.rs @@ -310,7 +310,10 @@ fn check_inner_plan(inner_plan: &LogicalPlan) -> Result<()> { check_inner_plan(left)?; check_no_outer_references(right) } - JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => { + JoinType::Right + | JoinType::RightSemi + | JoinType::RightAnti + | JoinType::RightMark => { check_no_outer_references(left)?; check_inner_plan(right) } diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 30d5f43f3a360..700a238878801 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -556,7 +556,9 @@ impl LogicalPlan { JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => { left.head_output_expr() } - JoinType::RightSemi | JoinType::RightAnti => right.head_output_expr(), + JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => { + right.head_output_expr() + } }, LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => { static_term.head_output_expr() @@ -1343,7 +1345,9 @@ impl LogicalPlan { JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => { left.max_rows() } - JoinType::RightSemi | JoinType::RightAnti => right.max_rows(), + JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => { + right.max_rows() + } }, LogicalPlan::Repartition(Repartition { input, .. }) => input.max_rows(), LogicalPlan::Union(Union { inputs, .. }) => { diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 1093c0b3cf691..f405f5be83c5b 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -694,7 +694,8 @@ fn split_join_requirements( | JoinType::Left | JoinType::Right | JoinType::Full - | JoinType::LeftMark => { + | JoinType::LeftMark + | JoinType::RightMark => { // Decrease right side indices by `left_len` so that they point to valid // positions within the right child: indices.split_off(left_len) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index bbf0b0dd810e7..86c93d8004a3e 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -168,7 +168,7 @@ pub(crate) fn lr_is_preserved(join_type: JoinType) -> (bool, bool) { JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => (true, false), // No columns from the left side of the join can be referenced in output // predicates for semi/anti joins, so whether we specify t/f doesn't matter. - JoinType::RightSemi | JoinType::RightAnti => (false, true), + JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => (false, true), } } @@ -191,6 +191,7 @@ pub(crate) fn on_lr_is_preserved(join_type: JoinType) -> (bool, bool) { JoinType::LeftAnti => (false, true), JoinType::RightAnti => (true, false), JoinType::LeftMark => (false, true), + JoinType::RightMark => (true, false), } } @@ -691,7 +692,7 @@ fn infer_join_predicates_from_on_filters( inferred_predicates, ) } - JoinType::Right | JoinType::RightSemi => { + JoinType::Right | JoinType::RightSemi | JoinType::RightMark => { infer_join_predicates_impl::( join_col_keys, on_filters, diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs index 98b1299a2ec6a..0d7e803581820 100644 --- a/datafusion/physical-expr/src/equivalence/class.rs +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -670,7 +670,9 @@ impl EquivalenceGroup { result } JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => self.clone(), - JoinType::RightSemi | JoinType::RightAnti => right_equivalences.clone(), + JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => { + right_equivalences.clone() + } } } diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs index 700b00c19dd57..b558592b30fbf 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -334,7 +334,7 @@ pub fn adjust_input_keys_ordering( left.schema().fields().len(), ) .unwrap_or_default(), - JoinType::RightSemi | JoinType::RightAnti => { + JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => { requirements.data.clone() } JoinType::Left diff --git a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs index 6d2c014f9e7cc..57469305f181c 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs @@ -484,7 +484,8 @@ fn expr_source_side( | JoinType::Left | JoinType::Right | JoinType::Full - | JoinType::LeftMark => { + | JoinType::LeftMark + | JoinType::RightMark => { let all_column_sides = required_exprs .iter() .filter_map(|r| { diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index c7719907e3bf4..073ba02748477 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -475,6 +475,7 @@ impl HashJoinExec { | JoinType::Right | JoinType::RightAnti | JoinType::RightSemi + | JoinType::RightMark ), ] } @@ -556,7 +557,8 @@ impl HashJoinExec { | JoinType::LeftSemi | JoinType::RightSemi | JoinType::Right - | JoinType::RightAnti => EmissionType::Incremental, + | JoinType::RightAnti + | JoinType::RightMark => EmissionType::Incremental, // If we need to generate unmatched rows from the *build side*, // we need to emit them at the end. JoinType::Left @@ -1561,15 +1563,27 @@ impl HashJoinStream { self.right_side_ordered, )?; - let result = build_batch_from_indices( - &self.schema, - build_side.left_data.batch(), - &state.batch, - &left_indices, - &right_indices, - &self.column_indices, - JoinSide::Left, - )?; + let result = if self.join_type == JoinType::RightMark { + build_batch_from_indices( + &self.schema, + &state.batch, + build_side.left_data.batch(), + &left_indices, + &right_indices, + &self.column_indices, + JoinSide::Right, + )? + } else { + build_batch_from_indices( + &self.schema, + build_side.left_data.batch(), + &state.batch, + &left_indices, + &right_indices, + &self.column_indices, + JoinSide::Left, + )? + }; self.join_metrics.output_batches.add(1); self.join_metrics.output_rows.add(result.num_rows()); @@ -3331,6 +3345,95 @@ mod tests { Ok(()) } + #[apply(batch_sizes)] + #[tokio::test] + async fn join_right_mark(batch_size: usize) -> Result<()> { + let task_ctx = prepare_task_ctx(batch_size); + let left = build_table( + ("a1", &vec![1, 2, 3]), + ("b1", &vec![4, 5, 7]), // 7 does not exist on the right + ("c1", &vec![7, 8, 9]), + ); + let right = build_table( + ("a2", &vec![10, 20, 30]), + ("b1", &vec![4, 5, 6]), // 6 does not exist on the left + ("c2", &vec![70, 80, 90]), + ); + let on = vec![( + Arc::new(Column::new_with_schema("b1", &left.schema())?) as _, + Arc::new(Column::new_with_schema("b1", &right.schema())?) as _, + )]; + + let (columns, batches) = join_collect( + Arc::clone(&left), + Arc::clone(&right), + on.clone(), + &JoinType::RightMark, + false, + task_ctx, + ) + .await?; + assert_eq!(columns, vec!["a2", "b1", "c2", "mark"]); + + let expected = [ + "+----+----+----+-------+", + "| a2 | b1 | c2 | mark |", + "+----+----+----+-------+", + "| 10 | 4 | 70 | true |", + "| 20 | 5 | 80 | true |", + "| 30 | 6 | 90 | false |", + "+----+----+----+-------+", + ]; + assert_batches_sorted_eq!(expected, &batches); + + Ok(()) + } + + #[apply(batch_sizes)] + #[tokio::test] + async fn partitioned_join_right_mark(batch_size: usize) -> Result<()> { + let task_ctx = prepare_task_ctx(batch_size); + let left = build_table( + ("a1", &vec![1, 2, 3]), + ("b1", &vec![4, 5, 7]), // 7 does not exist on the right + ("c1", &vec![7, 8, 9]), + ); + let right = build_table( + ("a2", &vec![10, 20, 30, 40]), + ("b1", &vec![4, 4, 5, 6]), // 6 does not exist on the left + ("c2", &vec![60, 70, 80, 90]), + ); + let on = vec![( + Arc::new(Column::new_with_schema("b1", &left.schema())?) as _, + Arc::new(Column::new_with_schema("b1", &right.schema())?) as _, + )]; + + let (columns, batches) = partitioned_join_collect( + Arc::clone(&left), + Arc::clone(&right), + on.clone(), + &JoinType::RightMark, + false, + task_ctx, + ) + .await?; + assert_eq!(columns, vec!["a2", "b1", "c2", "mark"]); + + let expected = [ + "+----+----+----+-------+", + "| a2 | b1 | c2 | mark |", + "+----+----+----+-------+", + "| 10 | 4 | 60 | true |", + "| 20 | 4 | 70 | true |", + "| 30 | 5 | 80 | true |", + "| 40 | 6 | 90 | false |", + "+----+----+----+-------+", + ]; + assert_batches_sorted_eq!(expected, &batches); + + Ok(()) + } + #[test] fn join_with_hash_collision() -> Result<()> { let mut hashmap_left = HashTable::with_capacity(2); @@ -3753,6 +3856,15 @@ mod tests { "| 3 | 7 | 9 | false |", "+----+----+----+-------+", ]; + let expected_right_mark = vec![ + "+----+----+----+-------+", + "| a2 | b2 | c2 | mark |", + "+----+----+----+-------+", + "| 10 | 4 | 70 | true |", + "| 20 | 5 | 80 | true |", + "| 30 | 6 | 90 | false |", + "+----+----+----+-------+", + ]; let test_cases = vec![ (JoinType::Inner, expected_inner), @@ -3764,6 +3876,7 @@ mod tests { (JoinType::RightSemi, expected_right_semi), (JoinType::RightAnti, expected_right_anti), (JoinType::LeftMark, expected_left_mark), + (JoinType::RightMark, expected_right_mark), ]; for (join_type, expected) in test_cases { @@ -4043,6 +4156,7 @@ mod tests { JoinType::RightSemi, JoinType::RightAnti, JoinType::LeftMark, + JoinType::RightMark, ]; for join_type in join_types { diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index f87cf3d8864cf..d308e21d48695 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -274,7 +274,8 @@ impl NestedLoopJoinExec { | JoinType::LeftSemi | JoinType::RightSemi | JoinType::Right - | JoinType::RightAnti => EmissionType::Incremental, + | JoinType::RightAnti + | JoinType::RightMark => EmissionType::Incremental, // If we need to generate unmatched rows from the *build side*, // we need to emit them at the end. JoinType::Left @@ -1009,15 +1010,27 @@ fn join_left_and_right_batch( right_side_ordered, )?; - build_batch_from_indices( - schema, - left_batch, - right_batch, - &left_side, - &right_side, - column_indices, - JoinSide::Left, - ) + if join_type == JoinType::RightMark { + build_batch_from_indices( + schema, + right_batch, + left_batch, + &left_side, + &right_side, + column_indices, + JoinSide::Right, + ) + } else { + build_batch_from_indices( + schema, + left_batch, + right_batch, + &left_side, + &right_side, + column_indices, + JoinSide::Left, + ) + } } impl Stream for NestedLoopJoinStream { @@ -1464,6 +1477,36 @@ pub(crate) mod tests { Ok(()) } + #[tokio::test] + async fn join_right_mark_with_filter() -> Result<()> { + let task_ctx = Arc::new(TaskContext::default()); + let left = build_left_table(); + let right = build_right_table(); + + let filter = prepare_join_filter(); + let (columns, batches) = multi_partitioned_join_collect( + left, + right, + &JoinType::RightMark, + Some(filter), + task_ctx, + ) + .await?; + assert_eq!(columns, vec!["a2", "b2", "c2", "mark"]); + + assert_snapshot!(batches_to_sort_string(&batches), @r#" + +----+----+-----+-------+ + | a2 | b2 | c2 | mark | + +----+----+-----+-------+ + | 10 | 10 | 100 | false | + | 12 | 10 | 40 | false | + | 2 | 2 | 80 | true | + +----+----+-----+-------+ + "#); + + Ok(()) + } + #[tokio::test] async fn test_overallocation() -> Result<()> { let left = build_table( @@ -1492,6 +1535,7 @@ pub(crate) mod tests { JoinType::LeftMark, JoinType::RightSemi, JoinType::RightAnti, + JoinType::RightMark, ]; for join_type in join_types { diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index 07e48224af2ff..00b3adc3af7ce 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -221,9 +221,11 @@ impl SortMergeJoinExec { // When output schema contains only the right side, probe side is right. // Otherwise probe side is the left side. match join_type { - JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => { - JoinSide::Right - } + // TODO: sort merge support for right mark + JoinType::Right + | JoinType::RightSemi + | JoinType::RightAnti + | JoinType::RightMark => JoinSide::Right, JoinType::Inner | JoinType::Left | JoinType::Full @@ -241,7 +243,10 @@ impl SortMergeJoinExec { | JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => vec![true, false], - JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => { + JoinType::Right + | JoinType::RightSemi + | JoinType::RightAnti + | JoinType::RightMark => { vec![false, true] } _ => vec![false, false], diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index 819a3302b0626..7b46717f7e05d 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -777,7 +777,11 @@ fn need_to_produce_result_in_final(build_side: JoinSide, join_type: JoinType) -> } else { matches!( join_type, - JoinType::Right | JoinType::RightAnti | JoinType::Full | JoinType::RightSemi + JoinType::Right + | JoinType::RightAnti + | JoinType::Full + | JoinType::RightSemi + | JoinType::RightMark ) } } @@ -825,6 +829,20 @@ where .collect(); (build_indices, probe_indices) } + (JoinSide::Right, JoinType::RightMark) => { + let build_indices = (0..prune_length) + .map(L::Native::from_usize) + .collect::>(); + let probe_indices = (0..prune_length) + .map(|idx| { + // For mark join we output a dummy index 0 to indicate the row had a match + visited_rows + .contains(&(idx + deleted_offset)) + .then_some(R::Native::from_usize(0).unwrap()) + }) + .collect(); + (build_indices, probe_indices) + } // In the case of `Left` or `Right` join, or `Full` join, get the anti indices (JoinSide::Left, JoinType::Left | JoinType::LeftAnti) | (JoinSide::Right, JoinType::Right | JoinType::RightAnti) diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 4be14374249a6..f45e014297002 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -34,6 +34,7 @@ pub use super::join_filter::JoinFilter; pub use super::join_hash_map::{JoinHashMap, JoinHashMapType}; pub use crate::joins::{JoinOn, JoinOnRef}; +use arrow::array::PrimitiveBuilder; use arrow::array::{ builder::UInt64Builder, downcast_array, new_null_array, Array, ArrowPrimitiveType, BooleanBufferBuilder, NativeAdapter, PrimitiveArray, RecordBatch, RecordBatchOptions, @@ -246,6 +247,7 @@ fn output_join_field(old_field: &Field, join_type: &JoinType, is_left: bool) -> JoinType::LeftAnti => false, // doesn't introduce nulls (or can it??) JoinType::RightAnti => false, // doesn't introduce nulls (or can it??) JoinType::LeftMark => false, + JoinType::RightMark => false, }; if force_nullable { @@ -312,6 +314,16 @@ pub fn build_join_schema( left_fields().chain(right_field).unzip() } JoinType::RightSemi | JoinType::RightAnti => right_fields().unzip(), + JoinType::RightMark => { + let left_field = once(( + Field::new("mark", arrow_schema::DataType::Boolean, false), + ColumnIndex { + index: 0, + side: JoinSide::None, + }, + )); + right_fields().chain(left_field).unzip() + } }; let metadata = left @@ -533,6 +545,15 @@ fn estimate_join_cardinality( column_statistics, }) } + JoinType::RightMark => { + let num_rows = *right_stats.num_rows.get_value()?; + let mut column_statistics = right_stats.column_statistics; + column_statistics.push(ColumnStatistics::new_unknown()); + Some(PartialJoinStatistics { + num_rows, + column_statistics, + }) + } } } @@ -975,6 +996,12 @@ pub(crate) fn adjust_indices_by_join_type( // the left_indices will not be used later for the `right anti` join Ok((left_indices, right_indices)) } + JoinType::RightMark => { + let right_indices = get_mark_indices(&adjust_range, &right_indices); + let left_indices = + UInt64Array::from_iter_values(adjust_range.map(|i| i as u64)); + Ok((left_indices, right_indices)) + } JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => { // matched or unmatched left row will be produced in the end of loop // When visit the right batch, we can output the matched left row and don't need to wait the end of loop @@ -1126,6 +1153,28 @@ where .collect() } +pub(crate) fn get_mark_indices( + range: &Range, + input_indices: &PrimitiveArray, +) -> PrimitiveArray +where + NativeAdapter: From<::Native>, +{ + let mut builder = PrimitiveBuilder::::new(); + + for idx in range.clone() { + let matched = input_indices.iter().flatten().any(|v| v.as_usize() == idx); + + if matched { + builder.append_value(0); + } else { + builder.append_null(); + } + } + + builder.finish() +} + /// Appends probe indices in order by considering the given build indices. /// /// This function constructs new build and probe indices by iterating through @@ -1301,7 +1350,9 @@ pub(crate) fn symmetric_join_output_partitioning( JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => { left_partitioning.clone() } - JoinType::RightSemi | JoinType::RightAnti => right_partitioning.clone(), + JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => { + right_partitioning.clone() + } JoinType::Inner | JoinType::Right => { adjust_right_output_partitioning(right_partitioning, left_columns_len) } @@ -1322,7 +1373,9 @@ pub(crate) fn asymmetric_join_output_partitioning( right.output_partitioning(), left.schema().fields().len(), ), - JoinType::RightSemi | JoinType::RightAnti => right.output_partitioning().clone(), + JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => { + right.output_partitioning().clone() + } JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 82f1e91d9c9b4..4409825d5d0dd 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -85,6 +85,7 @@ enum JoinType { RIGHTSEMI = 6; RIGHTANTI = 7; LEFTMARK = 8; + RIGHTMARK = 9; } enum JoinConstraint { diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index b44b05e9ca296..95c5bfb265c10 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -3856,6 +3856,7 @@ impl serde::Serialize for JoinType { Self::Rightsemi => "RIGHTSEMI", Self::Rightanti => "RIGHTANTI", Self::Leftmark => "LEFTMARK", + Self::Rightmark => "RIGHTMARK", }; serializer.serialize_str(variant) } @@ -3876,6 +3877,7 @@ impl<'de> serde::Deserialize<'de> for JoinType { "RIGHTSEMI", "RIGHTANTI", "LEFTMARK", + "RIGHTMARK", ]; struct GeneratedVisitor; @@ -3925,6 +3927,7 @@ impl<'de> serde::Deserialize<'de> for JoinType { "RIGHTSEMI" => Ok(JoinType::Rightsemi), "RIGHTANTI" => Ok(JoinType::Rightanti), "LEFTMARK" => Ok(JoinType::Leftmark), + "RIGHTMARK" => Ok(JoinType::Rightmark), _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), } } diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index e029327d481d1..ada7109ae5d0c 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -906,6 +906,7 @@ pub enum JoinType { Rightsemi = 6, Rightanti = 7, Leftmark = 8, + Rightmark = 9, } impl JoinType { /// String value of the enum field names used in the ProtoBuf definition. @@ -923,6 +924,7 @@ impl JoinType { Self::Rightsemi => "RIGHTSEMI", Self::Rightanti => "RIGHTANTI", Self::Leftmark => "LEFTMARK", + Self::Rightmark => "RIGHTMARK", } } /// Creates an enum from field names used in the ProtoBuf definition. @@ -937,6 +939,7 @@ impl JoinType { "RIGHTSEMI" => Some(Self::Rightsemi), "RIGHTANTI" => Some(Self::Rightanti), "LEFTMARK" => Some(Self::Leftmark), + "RIGHTMARK" => Some(Self::Rightmark), _ => None, } } diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index e029327d481d1..ada7109ae5d0c 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -906,6 +906,7 @@ pub enum JoinType { Rightsemi = 6, Rightanti = 7, Leftmark = 8, + Rightmark = 9, } impl JoinType { /// String value of the enum field names used in the ProtoBuf definition. @@ -923,6 +924,7 @@ impl JoinType { Self::Rightsemi => "RIGHTSEMI", Self::Rightanti => "RIGHTANTI", Self::Leftmark => "LEFTMARK", + Self::Rightmark => "RIGHTMARK", } } /// Creates an enum from field names used in the ProtoBuf definition. @@ -937,6 +939,7 @@ impl JoinType { "RIGHTSEMI" => Some(Self::Rightsemi), "RIGHTANTI" => Some(Self::Rightanti), "LEFTMARK" => Some(Self::Leftmark), + "RIGHTMARK" => Some(Self::Rightmark), _ => None, } } diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index 9f0489d6b0ea4..2ffc4625fb415 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -205,6 +205,7 @@ impl From for JoinType { protobuf::JoinType::Leftanti => JoinType::LeftAnti, protobuf::JoinType::Rightanti => JoinType::RightAnti, protobuf::JoinType::Leftmark => JoinType::LeftMark, + protobuf::JoinType::Rightmark => JoinType::RightMark, } } } diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index 841c31fa035f4..abc81428abd32 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -687,6 +687,7 @@ impl From for protobuf::JoinType { JoinType::LeftAnti => protobuf::JoinType::Leftanti, JoinType::RightAnti => protobuf::JoinType::Rightanti, JoinType::LeftMark => protobuf::JoinType::Leftmark, + JoinType::RightMark => protobuf::JoinType::Rightmark, } } } diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index 4209734138f1f..2228e9aa00f26 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -714,7 +714,8 @@ impl Unparser<'_> { | JoinType::LeftAnti | JoinType::LeftMark | JoinType::RightSemi - | JoinType::RightAnti => { + | JoinType::RightAnti + | JoinType::RightMark => { let mut query_builder = QueryBuilder::default(); let mut from = TableWithJoinsBuilder::default(); let mut exists_select: SelectBuilder = SelectBuilder::default(); @@ -738,7 +739,8 @@ impl Unparser<'_> { let negated = match join.join_type { JoinType::LeftSemi | JoinType::RightSemi - | JoinType::LeftMark => false, + | JoinType::LeftMark + | JoinType::RightMark => false, JoinType::LeftAnti | JoinType::RightAnti => true, _ => unreachable!(), }; @@ -746,13 +748,29 @@ impl Unparser<'_> { subquery: Box::new(query_builder.build()?), negated, }; - if join.join_type == JoinType::LeftMark { - let (table_ref, _) = right_plan.schema().qualified_field(0); - let column = self - .col_to_sql(&Column::new(table_ref.cloned(), "mark"))?; - select.replace_mark(&column, &exists_expr); - } else { - select.selection(Some(exists_expr)); + + match join.join_type { + JoinType::LeftMark => { + let (table_ref, _) = + right_plan.schema().qualified_field(0); + let column = self.col_to_sql(&Column::new( + table_ref.cloned(), + "mark", + ))?; + select.replace_mark(&column, &exists_expr); + } + JoinType::RightMark => { + let (table_ref, _) = + left_plan.schema().qualified_field(0); + let column = self.col_to_sql(&Column::new( + table_ref.cloned(), + "mark", + ))?; + select.replace_mark(&column, &exists_expr); + } + _ => { + select.selection(Some(exists_expr)); + } } if let Some(projection) = left_projection { select.projection(projection); @@ -1243,7 +1261,9 @@ impl Unparser<'_> { JoinType::LeftSemi => ast::JoinOperator::LeftSemi(constraint), JoinType::RightAnti => ast::JoinOperator::RightAnti(constraint), JoinType::RightSemi => ast::JoinOperator::RightSemi(constraint), - JoinType::LeftMark => unimplemented!("Unparsing of Left Mark join type"), + JoinType::LeftMark | JoinType::RightMark => { + unimplemented!("Unparsing of Mark join type") + } }) } diff --git a/datafusion/substrait/src/logical_plan/consumer/rel/join_rel.rs b/datafusion/substrait/src/logical_plan/consumer/rel/join_rel.rs index 881157dcfa662..fab43a5ff411d 100644 --- a/datafusion/substrait/src/logical_plan/consumer/rel/join_rel.rs +++ b/datafusion/substrait/src/logical_plan/consumer/rel/join_rel.rs @@ -132,6 +132,7 @@ fn from_substrait_jointype(join_type: i32) -> datafusion::common::Result Ok(JoinType::LeftAnti), join_rel::JoinType::LeftSemi => Ok(JoinType::LeftSemi), join_rel::JoinType::LeftMark => Ok(JoinType::LeftMark), + join_rel::JoinType::RightMark => Ok(JoinType::RightMark), _ => plan_err!("unsupported join type {substrait_join_type:?}"), } } else { diff --git a/datafusion/substrait/src/logical_plan/producer.rs b/datafusion/substrait/src/logical_plan/producer.rs index 31304b73a0048..2c997b1fcbb9e 100644 --- a/datafusion/substrait/src/logical_plan/producer.rs +++ b/datafusion/substrait/src/logical_plan/producer.rs @@ -1091,6 +1091,7 @@ fn to_substrait_jointype(join_type: JoinType) -> join_rel::JoinType { JoinType::LeftAnti => join_rel::JoinType::LeftAnti, JoinType::LeftSemi => join_rel::JoinType::LeftSemi, JoinType::LeftMark => join_rel::JoinType::LeftMark, + JoinType::RightMark => join_rel::JoinType::RightMark, JoinType::RightAnti | JoinType::RightSemi => { unimplemented!() } From 87a058b1718b191b10fe93ab98e6ea8ee984f0b1 Mon Sep 17 00:00:00 2001 From: Jonathan Date: Sun, 1 Jun 2025 22:34:09 -0400 Subject: [PATCH 02/13] fixes --- datafusion/common/src/join_type.rs | 2 +- datafusion/core/tests/fuzz_cases/join_fuzz.rs | 25 ++++++++++++++ .../src/joins/nested_loop_join.rs | 3 ++ .../src/joins/sort_merge_join.rs | 2 +- datafusion/physical-plan/src/joins/utils.rs | 34 ++++++++++--------- datafusion/sql/src/unparser/plan.rs | 24 +++++-------- 6 files changed, 56 insertions(+), 34 deletions(-) diff --git a/datafusion/common/src/join_type.rs b/datafusion/common/src/join_type.rs index 05d6f70aa3e10..d9a1478f02387 100644 --- a/datafusion/common/src/join_type.rs +++ b/datafusion/common/src/join_type.rs @@ -97,7 +97,7 @@ impl JoinType { } } - /// Does the join type support swapping inputs? + /// Does the join type support swapping inputs? pub fn supports_swap(&self) -> bool { matches!( self, diff --git a/datafusion/core/tests/fuzz_cases/join_fuzz.rs b/datafusion/core/tests/fuzz_cases/join_fuzz.rs index 0c410c54ac3c8..f835281ac188a 100644 --- a/datafusion/core/tests/fuzz_cases/join_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/join_fuzz.rs @@ -305,6 +305,31 @@ async fn test_left_mark_join_1k_filtered() { .await } +// todo: add JoinTestType::HjSmj after Right mark SortMergeJoin support +#[tokio::test] +async fn test_right_mark_join_1k() { + JoinFuzzTestCase::new( + make_staggered_batches(1000), + make_staggered_batches(1000), + JoinType::RightMark, + None, + ) + .run_test(&[NljHj], false) + .await +} + +#[tokio::test] +async fn test_right_mark_join_1k_filtered() { + JoinFuzzTestCase::new( + make_staggered_batches(1000), + make_staggered_batches(1000), + JoinType::RightMark, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[NljHj], false) + .await +} + type JoinFilterBuilder = Box, Arc) -> JoinFilter>; struct JoinFuzzTestCase { diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index d308e21d48695..5b2ce904482c0 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -1010,6 +1010,9 @@ fn join_left_and_right_batch( right_side_ordered, )?; + // Switch around the build side and probe side for `JoinType::RightMark` + // because in a RightMark join, we want to mark rows on the right table + // by looking for matches in the left. if join_type == JoinType::RightMark { build_batch_from_indices( schema, diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index 00b3adc3af7ce..56d2b7f079ce2 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -221,7 +221,7 @@ impl SortMergeJoinExec { // When output schema contains only the right side, probe side is right. // Otherwise probe side is the left side. match join_type { - // TODO: sort merge support for right mark + // TODO: sort merge support for right mark (tracked here: https://github.com/apache/datafusion/issues/16226) JoinType::Right | JoinType::RightSemi | JoinType::RightAnti diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index f45e014297002..1debfe9ebcb93 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -34,12 +34,12 @@ pub use super::join_filter::JoinFilter; pub use super::join_hash_map::{JoinHashMap, JoinHashMapType}; pub use crate::joins::{JoinOn, JoinOnRef}; -use arrow::array::PrimitiveBuilder; use arrow::array::{ builder::UInt64Builder, downcast_array, new_null_array, Array, ArrowPrimitiveType, BooleanBufferBuilder, NativeAdapter, PrimitiveArray, RecordBatch, RecordBatchOptions, UInt32Array, UInt32Builder, UInt64Array, }; +use arrow::buffer::NullBuffer; use arrow::compute; use arrow::datatypes::{ ArrowNativeType, Field, Schema, SchemaBuilder, UInt32Type, UInt64Type, @@ -925,7 +925,7 @@ pub(crate) fn build_batch_from_indices( for column_index in column_indices { let array = if column_index.side == JoinSide::None { - // LeftMark join, the mark column is a true if the indices is not null, otherwise it will be false + // For marks joins, the mark column is a true if the indices is not null, otherwise it will be false Arc::new(compute::is_not_null(probe_indices)?) } else if column_index.side == build_side { let array = build_input_buffer.column(column_index.index); @@ -998,8 +998,8 @@ pub(crate) fn adjust_indices_by_join_type( } JoinType::RightMark => { let right_indices = get_mark_indices(&adjust_range, &right_indices); - let left_indices = - UInt64Array::from_iter_values(adjust_range.map(|i| i as u64)); + let left_indices_vec: Vec = adjust_range.map(|i| i as u64).collect(); + let left_indices = UInt64Array::from(left_indices_vec); Ok((left_indices, right_indices)) } JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => { @@ -1160,19 +1160,21 @@ pub(crate) fn get_mark_indices( where NativeAdapter: From<::Native>, { - let mut builder = PrimitiveBuilder::::new(); - - for idx in range.clone() { - let matched = input_indices.iter().flatten().any(|v| v.as_usize() == idx); - - if matched { - builder.append_value(0); - } else { - builder.append_null(); - } - } + let mut bitmap = BooleanBufferBuilder::new(range.len()); + bitmap.append_n(range.len(), false); + input_indices + .iter() + .flatten() + .map(|v| v.as_usize()) + .filter(|v| range.contains(v)) + .for_each(|v| { + bitmap.set_bit(v - range.start, true); + }); - builder.finish() + PrimitiveArray::new( + vec![0; range.len()].into(), + Some(NullBuffer::new(bitmap.finish())), + ) } /// Appends probe indices in order by considering the given build indices. diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index 2228e9aa00f26..e56384c61e754 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -750,22 +750,14 @@ impl Unparser<'_> { }; match join.join_type { - JoinType::LeftMark => { - let (table_ref, _) = - right_plan.schema().qualified_field(0); - let column = self.col_to_sql(&Column::new( - table_ref.cloned(), - "mark", - ))?; - select.replace_mark(&column, &exists_expr); - } - JoinType::RightMark => { - let (table_ref, _) = - left_plan.schema().qualified_field(0); - let column = self.col_to_sql(&Column::new( - table_ref.cloned(), - "mark", - ))?; + JoinType::LeftMark | JoinType::RightMark => { + let source_schema = if join.join_type == JoinType::LeftMark { + left_plan.schema() + } else { + right_plan.schema() + }; + let (table_ref, _) = source_schema.qualified_field(0); + let column = self.col_to_sql(&Column::new(table_ref.cloned(), "mark"))?; select.replace_mark(&column, &exists_expr); } _ => { From 69b9dfb7fa8fb98e4cb230c97ffcacfd66c76e2d Mon Sep 17 00:00:00 2001 From: Jonathan Date: Sun, 1 Jun 2025 22:49:24 -0400 Subject: [PATCH 03/13] producer fix --- datafusion/substrait/src/logical_plan/producer/rel/join.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/substrait/src/logical_plan/producer/rel/join.rs b/datafusion/substrait/src/logical_plan/producer/rel/join.rs index 79564ad5daf1e..65c3e426d2694 100644 --- a/datafusion/substrait/src/logical_plan/producer/rel/join.rs +++ b/datafusion/substrait/src/logical_plan/producer/rel/join.rs @@ -113,6 +113,7 @@ fn to_substrait_jointype(join_type: JoinType) -> join_rel::JoinType { JoinType::LeftAnti => join_rel::JoinType::LeftAnti, JoinType::LeftSemi => join_rel::JoinType::LeftSemi, JoinType::LeftMark => join_rel::JoinType::LeftMark, + JoinType::RightMark => join_rel::JoinType::RightMark, JoinType::RightAnti | JoinType::RightSemi => { unimplemented!() } From 8f7f429b3e0b9e2c41b2a2bd21fe455977b59c00 Mon Sep 17 00:00:00 2001 From: Jonathan Date: Sun, 1 Jun 2025 22:57:09 -0400 Subject: [PATCH 04/13] fmt --- .../physical-plan/src/joins/nested_loop_join.rs | 2 +- datafusion/sql/src/unparser/plan.rs | 16 ++++++++++------ 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 5b2ce904482c0..50f68b9d6e814 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -1010,7 +1010,7 @@ fn join_left_and_right_batch( right_side_ordered, )?; - // Switch around the build side and probe side for `JoinType::RightMark` + // Switch around the build side and probe side for `JoinType::RightMark` // because in a RightMark join, we want to mark rows on the right table // by looking for matches in the left. if join_type == JoinType::RightMark { diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index 42fd1dd084e1d..2779a2b5d3cb5 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -751,13 +751,17 @@ impl Unparser<'_> { match join.join_type { JoinType::LeftMark | JoinType::RightMark => { - let source_schema = if join.join_type == JoinType::LeftMark { - left_plan.schema() - } else { - right_plan.schema() - }; + let source_schema = + if join.join_type == JoinType::LeftMark { + left_plan.schema() + } else { + right_plan.schema() + }; let (table_ref, _) = source_schema.qualified_field(0); - let column = self.col_to_sql(&Column::new(table_ref.cloned(), "mark"))?; + let column = self.col_to_sql(&Column::new( + table_ref.cloned(), + "mark", + ))?; select.replace_mark(&column, &exists_expr); } _ => { From c4891393ae3ad6e3e8b1671e90d918a9f7f7ba13 Mon Sep 17 00:00:00 2001 From: Jonathan Date: Mon, 2 Jun 2025 13:13:10 -0400 Subject: [PATCH 05/13] update --- datafusion/sql/src/unparser/plan.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index 2779a2b5d3cb5..335c092a8bfc3 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -753,9 +753,9 @@ impl Unparser<'_> { JoinType::LeftMark | JoinType::RightMark => { let source_schema = if join.join_type == JoinType::LeftMark { - left_plan.schema() - } else { right_plan.schema() + } else { + left_plan.schema() }; let (table_ref, _) = source_schema.qualified_field(0); let column = self.col_to_sql(&Column::new( @@ -768,9 +768,6 @@ impl Unparser<'_> { select.selection(Some(exists_expr)); } } - if let Some(projection) = left_projection { - select.projection(projection); - } } JoinType::Inner | JoinType::Left From 17f4843d621a76478546531fd3cd6077fab8e93c Mon Sep 17 00:00:00 2001 From: Jonathan Date: Mon, 2 Jun 2025 14:30:23 -0400 Subject: [PATCH 06/13] fix --- datafusion/sql/src/unparser/plan.rs | 3 +++ datafusion/sql/tests/cases/.plan_to_sql.rs.pending-snap | 2 ++ 2 files changed, 5 insertions(+) create mode 100644 datafusion/sql/tests/cases/.plan_to_sql.rs.pending-snap diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index 335c092a8bfc3..1241c5330e7d2 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -768,6 +768,9 @@ impl Unparser<'_> { select.selection(Some(exists_expr)); } } + if let Some(projection) = left_projection { + select.projection(projection); + } } JoinType::Inner | JoinType::Left diff --git a/datafusion/sql/tests/cases/.plan_to_sql.rs.pending-snap b/datafusion/sql/tests/cases/.plan_to_sql.rs.pending-snap new file mode 100644 index 0000000000000..59578944ee213 --- /dev/null +++ b/datafusion/sql/tests/cases/.plan_to_sql.rs.pending-snap @@ -0,0 +1,2 @@ +{"run_id":"1748888647-232701000","line":2280,"new":{"module_name":"sql_integration__cases__plan_to_sql","snapshot_name":"unparse_left_semi_join","metadata":{"source":"datafusion/sql/tests/cases/plan_to_sql.rs","assertion_line":2280,"expression":"sql"},"snapshot":"SELECT * FROM \"t1\" WHERE EXISTS (SELECT 1 FROM \"t2\" AS \"__correlated_sq_1\" WHERE (\"t1\".\"c\" = \"__correlated_sq_1\".\"c\"))"},"old":{"module_name":"sql_integration__cases__plan_to_sql","metadata":{},"snapshot":"SELECT \"t1\".\"d\" FROM \"t1\" WHERE EXISTS (SELECT 1 FROM \"t2\" AS \"__correlated_sq_1\" WHERE (\"t1\".\"c\" = \"__correlated_sq_1\".\"c\"))"}} +{"run_id":"1748888999-160300000","line":2280,"new":null,"old":null} From 396a45ba0334e4613b7488062368cb4ac2fc63d5 Mon Sep 17 00:00:00 2001 From: Jonathan Date: Mon, 2 Jun 2025 15:30:28 -0400 Subject: [PATCH 07/13] rem file --- datafusion/sql/tests/cases/.plan_to_sql.rs.pending-snap | 2 -- 1 file changed, 2 deletions(-) delete mode 100644 datafusion/sql/tests/cases/.plan_to_sql.rs.pending-snap diff --git a/datafusion/sql/tests/cases/.plan_to_sql.rs.pending-snap b/datafusion/sql/tests/cases/.plan_to_sql.rs.pending-snap deleted file mode 100644 index 59578944ee213..0000000000000 --- a/datafusion/sql/tests/cases/.plan_to_sql.rs.pending-snap +++ /dev/null @@ -1,2 +0,0 @@ -{"run_id":"1748888647-232701000","line":2280,"new":{"module_name":"sql_integration__cases__plan_to_sql","snapshot_name":"unparse_left_semi_join","metadata":{"source":"datafusion/sql/tests/cases/plan_to_sql.rs","assertion_line":2280,"expression":"sql"},"snapshot":"SELECT * FROM \"t1\" WHERE EXISTS (SELECT 1 FROM \"t2\" AS \"__correlated_sq_1\" WHERE (\"t1\".\"c\" = \"__correlated_sq_1\".\"c\"))"},"old":{"module_name":"sql_integration__cases__plan_to_sql","metadata":{},"snapshot":"SELECT \"t1\".\"d\" FROM \"t1\" WHERE EXISTS (SELECT 1 FROM \"t2\" AS \"__correlated_sq_1\" WHERE (\"t1\".\"c\" = \"__correlated_sq_1\".\"c\"))"}} -{"run_id":"1748888999-160300000","line":2280,"new":null,"old":null} From b01e95d93fd844d24a5f17aa79753dd4410986e4 Mon Sep 17 00:00:00 2001 From: Jonathan Date: Mon, 9 Jun 2025 15:37:32 -0400 Subject: [PATCH 08/13] fix --- datafusion/physical-expr/src/equivalence/class.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs index cd5549549e751..8af6f3be0389a 100644 --- a/datafusion/physical-expr/src/equivalence/class.rs +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -794,7 +794,8 @@ impl EquivalenceGroup { JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => { right_equivalences.clone() } - } + }; + Ok(group) } /// Checks if two expressions are equal directly or through equivalence From 1bef6dcef4bc52a7b1c5907c2765c0ec02d0c31a Mon Sep 17 00:00:00 2001 From: Jonathan Date: Mon, 9 Jun 2025 15:49:48 -0400 Subject: [PATCH 09/13] fmt --- datafusion/physical-plan/src/joins/utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 2c2895aa50700..85f223eb49682 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -1351,7 +1351,7 @@ pub(crate) fn asymmetric_join_output_partitioning( JoinType::Inner | JoinType::Right => adjust_right_output_partitioning( right.output_partitioning(), left.schema().fields().len(), - ), + )?, JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => { right.output_partitioning().clone() } From ed8cd5625d72a773a3648759c0e7595a9ff656b6 Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Wed, 11 Jun 2025 17:50:21 -0400 Subject: [PATCH 10/13] Update datafusion/physical-plan/src/joins/utils.rs Co-authored-by: Christian <9384305+ctsk@users.noreply.github.com> --- datafusion/physical-plan/src/joins/utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 85f223eb49682..c69508e02ee45 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -901,7 +901,7 @@ pub(crate) fn build_batch_from_indices( for column_index in column_indices { let array = if column_index.side == JoinSide::None { - // For marks joins, the mark column is a true if the indices is not null, otherwise it will be false + // For mark joins, the mark column is a true if the indices is not null, otherwise it will be false Arc::new(compute::is_not_null(probe_indices)?) } else if column_index.side == build_side { let array = build_input_buffer.column(column_index.index); From e9978b8280416e68b4f20cf96a2bb61bf47cd6d1 Mon Sep 17 00:00:00 2001 From: Jonathan Date: Wed, 11 Jun 2025 18:41:33 -0400 Subject: [PATCH 11/13] fixes --- .../optimizer/src/optimize_projections/mod.rs | 8 +-- datafusion/physical-plan/src/joins/utils.rs | 54 ++++++++----------- 2 files changed, 25 insertions(+), 37 deletions(-) diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 572b23af91740..33af52824a292 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -672,10 +672,10 @@ fn outer_columns_helper_multi<'a, 'b>( /// Depending on the join type, it divides the requirement indices into those /// that apply to the left child and those that apply to the right child. /// -/// - For `INNER`, `LEFT`, `RIGHT` and `FULL` joins, the requirements are split -/// between left and right children. The right child indices are adjusted to -/// point to valid positions within the right child by subtracting the length -/// of the left child. +/// - For `INNER`, `LEFT`, `RIGHT`, `FULL`, `LEFTMARK`, and `RIGHTMARK` joins, +/// the requirements are split between left and right children. The right +/// child indices are adjusted to point to valid positions within the right +/// child by subtracting the length of the left child. /// /// - For `LEFT ANTI`, `LEFT SEMI`, `RIGHT SEMI` and `RIGHT ANTI` joins, all /// requirements are re-routed to either the left child or the right child diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index c69508e02ee45..c023c58aa5bae 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -1079,17 +1079,7 @@ pub(crate) fn get_anti_indices( where NativeAdapter: From<::Native>, { - let mut bitmap = BooleanBufferBuilder::new(range.len()); - bitmap.append_n(range.len(), false); - input_indices - .iter() - .flatten() - .map(|v| v.as_usize()) - .filter(|v| range.contains(v)) - .for_each(|v| { - bitmap.set_bit(v - range.start, true); - }); - + let bitmap = build_range_bitmap(&range, input_indices); let offset = range.start; // get the anti index @@ -1108,19 +1098,8 @@ pub(crate) fn get_semi_indices( where NativeAdapter: From<::Native>, { - let mut bitmap = BooleanBufferBuilder::new(range.len()); - bitmap.append_n(range.len(), false); - input_indices - .iter() - .flatten() - .map(|v| v.as_usize()) - .filter(|v| range.contains(v)) - .for_each(|v| { - bitmap.set_bit(v - range.start, true); - }); - + let bitmap = build_range_bitmap(&range, input_indices); let offset = range.start; - // get the semi index (range) .filter_map(|idx| { @@ -1136,21 +1115,30 @@ pub(crate) fn get_mark_indices( where NativeAdapter: From<::Native>, { - let mut bitmap = BooleanBufferBuilder::new(range.len()); - bitmap.append_n(range.len(), false); - input_indices + let mut bitmap = build_range_bitmap(&range, input_indices); + PrimitiveArray::new( + vec![0; range.len()].into(), + Some(NullBuffer::new(bitmap.finish())), + ) +} + +fn build_range_bitmap( + range: &Range, + input: &PrimitiveArray, +) -> BooleanBufferBuilder { + let mut builder = BooleanBufferBuilder::new(range.len()); + builder.append_n(range.len(), false); + + input .iter() .flatten() .map(|v| v.as_usize()) - .filter(|v| range.contains(v)) - .for_each(|v| { - bitmap.set_bit(v - range.start, true); + .filter(|&idx| range.contains(&idx)) + .for_each(|idx| { + builder.set_bit(idx - range.start, true); }); - PrimitiveArray::new( - vec![0; range.len()].into(), - Some(NullBuffer::new(bitmap.finish())), - ) + builder } /// Appends probe indices in order by considering the given build indices. From 92ecd2b6a6ebb224f14d3037695ba2fc8c8c1f59 Mon Sep 17 00:00:00 2001 From: Jonathan Date: Wed, 11 Jun 2025 19:49:07 -0400 Subject: [PATCH 12/13] clippy --- datafusion/physical-plan/src/joins/utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index c023c58aa5bae..2f612a7bfdcb1 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -1115,7 +1115,7 @@ pub(crate) fn get_mark_indices( where NativeAdapter: From<::Native>, { - let mut bitmap = build_range_bitmap(&range, input_indices); + let mut bitmap = build_range_bitmap(range, input_indices); PrimitiveArray::new( vec![0; range.len()].into(), Some(NullBuffer::new(bitmap.finish())), From c0a41855eea536b29f5e3c037c1bcbb860006efd Mon Sep 17 00:00:00 2001 From: Jonathan Date: Thu, 12 Jun 2025 04:30:08 -0400 Subject: [PATCH 13/13] refactor --- datafusion/physical-plan/src/joins/utils.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 2f612a7bfdcb1..c5f7087ac195f 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -1129,14 +1129,12 @@ fn build_range_bitmap( let mut builder = BooleanBufferBuilder::new(range.len()); builder.append_n(range.len(), false); - input - .iter() - .flatten() - .map(|v| v.as_usize()) - .filter(|&idx| range.contains(&idx)) - .for_each(|idx| { + input.iter().flatten().for_each(|v| { + let idx = v.as_usize(); + if range.contains(&idx) { builder.set_bit(idx - range.start, true); - }); + } + }); builder }