Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/common/src/functional_dependencies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ impl FunctionalDependencies {
// These joins preserve functional dependencies of the left side:
left_func_dependencies
}
JoinType::RightSemi | JoinType::RightAnti => {
JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
// These joins preserve functional dependencies of the right side:
right_func_dependencies
}
Expand Down
14 changes: 10 additions & 4 deletions datafusion/common/src/join_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ pub enum JoinType {
///
/// [1]: http://btw2017.informatik.uni-stuttgart.de/slidesandpapers/F1-10-37/paper_web.pdf
LeftMark,
/// Right Mark Join
///
/// Same logic as the LeftMark Join above, except that it returns a record for each record
/// from the right input.
RightMark,
}

impl JoinType {
Expand All @@ -87,13 +92,12 @@ impl JoinType {
JoinType::RightSemi => JoinType::LeftSemi,
JoinType::LeftAnti => JoinType::RightAnti,
JoinType::RightAnti => JoinType::LeftAnti,
JoinType::LeftMark => {
unreachable!("LeftMark join type does not support swapping")
}
JoinType::LeftMark => JoinType::RightMark,
JoinType::RightMark => JoinType::LeftMark,
}
}

/// Does the join type support swapping inputs?
/// Does the join type support swapping inputs?
pub fn supports_swap(&self) -> bool {
matches!(
self,
Expand Down Expand Up @@ -121,6 +125,7 @@ impl Display for JoinType {
JoinType::LeftAnti => "LeftAnti",
JoinType::RightAnti => "RightAnti",
JoinType::LeftMark => "LeftMark",
JoinType::RightMark => "RightMark",
};
write!(f, "{join_type}")
}
Expand All @@ -141,6 +146,7 @@ impl FromStr for JoinType {
"LEFTANTI" => Ok(JoinType::LeftAnti),
"RIGHTANTI" => Ok(JoinType::RightAnti),
"LEFTMARK" => Ok(JoinType::LeftMark),
"RIGHTMARK" => Ok(JoinType::RightMark),
_ => _not_impl_err!("The join type {s} does not exist or is not implemented"),
}
}
Expand Down
4 changes: 3 additions & 1 deletion datafusion/core/tests/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2145,6 +2145,7 @@ async fn verify_join_output_partitioning() -> Result<()> {
JoinType::LeftAnti,
JoinType::RightAnti,
JoinType::LeftMark,
JoinType::RightMark,
];

let default_partition_count = SessionConfig::new().target_partitions();
Expand Down Expand Up @@ -2178,7 +2179,8 @@ async fn verify_join_output_partitioning() -> Result<()> {
JoinType::Inner
| JoinType::Right
| JoinType::RightSemi
| JoinType::RightAnti => {
| JoinType::RightAnti
| JoinType::RightMark => {
let right_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
Arc::new(Column::new_with_schema("c2_c1", &join_schema)?),
Arc::new(Column::new_with_schema("c2_c2", &join_schema)?),
Expand Down
25 changes: 25 additions & 0 deletions datafusion/core/tests/fuzz_cases/join_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,31 @@ async fn test_left_mark_join_1k_filtered() {
.await
}

// TODO: add JoinTestType::HjSmj once SortMergeJoin supports RightMark joins
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RightMark join support also needs to be added to the SortMergeJoin execution path.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I will close #16226, as it is a duplicate.

/// Fuzz test for an unfiltered `JoinType::RightMark` join.
///
/// Both inputs come from `make_staggered_batches(1000)`, no join filter is
/// supplied, and only the `NljHj` comparison is requested from `run_test`.
#[tokio::test]
async fn test_right_mark_join_1k() {
    // Build the inputs in the same order in which they are passed to `new`.
    let first_input = make_staggered_batches(1000);
    let second_input = make_staggered_batches(1000);

    JoinFuzzTestCase::new(first_input, second_input, JoinType::RightMark, None)
        .run_test(&[NljHj], false)
        .await
}

/// Fuzz test for a `JoinType::RightMark` join with a join filter.
///
/// Both inputs come from `make_staggered_batches(1000)`, the filter is built
/// by `col_lt_col_filter`, and only the `NljHj` comparison is requested from
/// `run_test`.
#[tokio::test]
async fn test_right_mark_join_1k_filtered() {
    // Build the inputs in the same order in which they are passed to `new`.
    let first_input = make_staggered_batches(1000);
    let second_input = make_staggered_batches(1000);

    JoinFuzzTestCase::new(
        first_input,
        second_input,
        JoinType::RightMark,
        // Kept inline so the boxed function coerces to the expected parameter type.
        Some(Box::new(col_lt_col_filter)),
    )
    .run_test(&[NljHj], false)
    .await
}

/// Boxed closure that takes two `Arc<Schema>` values and returns a `JoinFilter`.
// NOTE(review): presumably the two schemas are those of the left and right join inputs — confirm.
type JoinFilterBuilder = Box<dyn Fn(Arc<Schema>, Arc<Schema>) -> JoinFilter>;

struct JoinFuzzTestCase {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ fn multi_hash_joins() -> Result<()> {
test_config.run(&expected, top_join.clone(), &DISTRIB_DISTRIB_SORT)?;
test_config.run(&expected, top_join, &SORT_DISTRIB_DISTRIB)?;
}
JoinType::RightSemi | JoinType::RightAnti => {}
JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {}
}

match join_type {
Expand All @@ -636,7 +636,8 @@ fn multi_hash_joins() -> Result<()> {
| JoinType::Right
| JoinType::Full
| JoinType::RightSemi
| JoinType::RightAnti => {
| JoinType::RightAnti
| JoinType::RightMark => {
// This time we use (b1 == c) for top join
// Join on (b1 == c)
let top_join_on = vec![(
Expand Down
4 changes: 4 additions & 0 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1623,6 +1623,10 @@ pub fn build_join_schema(
.map(|(q, f)| (q.cloned(), Arc::clone(f)))
.collect()
}
JoinType::RightMark => right_fields
.map(|(q, f)| (q.cloned(), Arc::clone(f)))
.chain(once(mark_field(left)))
.collect(),
};
let func_dependencies = left.functional_dependencies().join(
right.functional_dependencies(),
Expand Down
5 changes: 4 additions & 1 deletion datafusion/expr/src/logical_plan/invariants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,10 @@ fn check_inner_plan(inner_plan: &LogicalPlan) -> Result<()> {
check_inner_plan(left)?;
check_no_outer_references(right)
}
JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => {
JoinType::Right
| JoinType::RightSemi
| JoinType::RightAnti
| JoinType::RightMark => {
check_no_outer_references(left)?;
check_inner_plan(right)
}
Expand Down
8 changes: 6 additions & 2 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,9 @@ impl LogicalPlan {
JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
left.head_output_expr()
}
JoinType::RightSemi | JoinType::RightAnti => right.head_output_expr(),
JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
right.head_output_expr()
}
},
LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
static_term.head_output_expr()
Expand Down Expand Up @@ -1340,7 +1342,9 @@ impl LogicalPlan {
JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
left.max_rows()
}
JoinType::RightSemi | JoinType::RightAnti => right.max_rows(),
JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
right.max_rows()
}
},
LogicalPlan::Repartition(Repartition { input, .. }) => input.max_rows(),
LogicalPlan::Union(Union { inputs, .. }) => {
Expand Down
11 changes: 6 additions & 5 deletions datafusion/optimizer/src/optimize_projections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -672,10 +672,10 @@ fn outer_columns_helper_multi<'a, 'b>(
/// Depending on the join type, it divides the requirement indices into those
/// that apply to the left child and those that apply to the right child.
///
/// - For `INNER`, `LEFT`, `RIGHT` and `FULL` joins, the requirements are split
/// between left and right children. The right child indices are adjusted to
/// point to valid positions within the right child by subtracting the length
/// of the left child.
/// - For `INNER`, `LEFT`, `RIGHT`, `FULL`, `LEFTMARK`, and `RIGHTMARK` joins,
/// the requirements are split between left and right children. The right
/// child indices are adjusted to point to valid positions within the right
/// child by subtracting the length of the left child.
///
/// - For `LEFT ANTI`, `LEFT SEMI`, `RIGHT SEMI` and `RIGHT ANTI` joins, all
/// requirements are re-routed to either the left child or the right child
Expand Down Expand Up @@ -704,7 +704,8 @@ fn split_join_requirements(
| JoinType::Left
| JoinType::Right
| JoinType::Full
| JoinType::LeftMark => {
| JoinType::LeftMark
| JoinType::RightMark => {
Comment on lines -707 to +708
Copy link
Contributor

@ctsk ctsk Jun 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I admit that I don't understand why LeftMark/RightMark are treated differently from their Semi/Anti counterparts. It seems like the documentation of this method is missing LeftMark/RightMark, too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the doc! I'm not quite sure either; I think it has to do with the extra boolean column crossing the boundary into the other side, which results in the mark column being missing from the requirements.

// Decrease right side indices by `left_len` so that they point to valid
// positions within the right child:
indices.split_off(left_len)
Expand Down
5 changes: 3 additions & 2 deletions datafusion/optimizer/src/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ pub(crate) fn lr_is_preserved(join_type: JoinType) -> (bool, bool) {
JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => (true, false),
// No columns from the left side of the join can be referenced in output
// predicates for semi/anti joins, so whether we specify t/f doesn't matter.
JoinType::RightSemi | JoinType::RightAnti => (false, true),
JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => (false, true),
}
}

Expand All @@ -191,6 +191,7 @@ pub(crate) fn on_lr_is_preserved(join_type: JoinType) -> (bool, bool) {
JoinType::LeftAnti => (false, true),
JoinType::RightAnti => (true, false),
JoinType::LeftMark => (false, true),
JoinType::RightMark => (true, false),
}
}

Expand Down Expand Up @@ -691,7 +692,7 @@ fn infer_join_predicates_from_on_filters(
inferred_predicates,
)
}
JoinType::Right | JoinType::RightSemi => {
JoinType::Right | JoinType::RightSemi | JoinType::RightMark => {
infer_join_predicates_impl::<false, true>(
join_col_keys,
on_filters,
Expand Down
4 changes: 3 additions & 1 deletion datafusion/physical-expr/src/equivalence/class.rs
Original file line number Diff line number Diff line change
Expand Up @@ -791,7 +791,9 @@ impl EquivalenceGroup {
result
}
JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => self.clone(),
JoinType::RightSemi | JoinType::RightAnti => right_equivalences.clone(),
JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
right_equivalences.clone()
}
};
Ok(group)
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-optimizer/src/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ pub fn adjust_input_keys_ordering(
left.schema().fields().len(),
)
.unwrap_or_default(),
JoinType::RightSemi | JoinType::RightAnti => {
JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
requirements.data.clone()
}
JoinType::Left
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,8 @@ fn expr_source_side(
| JoinType::Left
| JoinType::Right
| JoinType::Full
| JoinType::LeftMark => {
| JoinType::LeftMark
| JoinType::RightMark => {
let eq_group = eqp.eq_group();
let mut right_ordering = ordering.clone();
let (mut valid_left, mut valid_right) = (true, true);
Expand Down
Loading