From bd9e4a84b79b4092f4c87be352354486420cc90f Mon Sep 17 00:00:00 2001 From: Daniel Heres Date: Tue, 1 Jun 2021 17:29:47 +0200 Subject: [PATCH 01/13] Support semi join --- ballista/rust/core/proto/ballista.proto | 1 + .../core/src/serde/logical_plan/from_proto.rs | 1 + .../core/src/serde/logical_plan/to_proto.rs | 1 + .../src/serde/physical_plan/from_proto.rs | 1 + .../core/src/serde/physical_plan/to_proto.rs | 1 + datafusion/src/logical_plan/builder.rs | 4 ++ datafusion/src/logical_plan/plan.rs | 4 +- .../src/optimizer/hash_build_probe_order.rs | 4 +- datafusion/src/physical_plan/hash_join.rs | 70 ++++++++++++++++--- datafusion/src/physical_plan/hash_utils.rs | 5 ++ datafusion/src/physical_plan/planner.rs | 1 + 11 files changed, 82 insertions(+), 11 deletions(-) diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto index da0c615e3b23e..03872147b797b 100644 --- a/ballista/rust/core/proto/ballista.proto +++ b/ballista/rust/core/proto/ballista.proto @@ -363,6 +363,7 @@ enum JoinType { LEFT = 1; RIGHT = 2; FULL = 3; + SEMI = 4; } message JoinNode { diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs index 10c4670e809aa..48471263885f6 100644 --- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs @@ -265,6 +265,7 @@ impl TryInto for &protobuf::LogicalPlanNode { protobuf::JoinType::Left => JoinType::Left, protobuf::JoinType::Right => JoinType::Right, protobuf::JoinType::Full => JoinType::Full, + protobuf::JoinType::Semi => JoinType::Semi, }; LogicalPlanBuilder::from(&convert_box_required!(join.left)?) .join( diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs index b630dfcc0d1b4..e1c0c5e44df64 100644 --- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs @@ -834,6 +834,7 @@ impl TryInto for &LogicalPlan { JoinType::Left => protobuf::JoinType::Left, JoinType::Right => protobuf::JoinType::Right, JoinType::Full => protobuf::JoinType::Full, + JoinType::Semi => protobuf::JoinType::Semi, }; let left_join_column = on.iter().map(|on| on.0.to_owned()).collect(); let right_join_column = on.iter().map(|on| on.1.to_owned()).collect(); diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs index 2039def908bc0..7f98a8378b0b2 100644 --- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs @@ -379,6 +379,7 @@ impl TryInto> for &protobuf::PhysicalPlanNode { protobuf::JoinType::Left => JoinType::Left, protobuf::JoinType::Right => JoinType::Right, protobuf::JoinType::Full => JoinType::Full, + protobuf::JoinType::Semi => JoinType::Semi, }; Ok(Arc::new(HashJoinExec::try_new( left, diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs index 9571f3de2e76b..c409f94749518 100644 --- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs @@ -133,6 +133,7 @@ impl TryInto for Arc { JoinType::Left => protobuf::JoinType::Left, JoinType::Right => protobuf::JoinType::Right, JoinType::Full => protobuf::JoinType::Full, + JoinType::Semi => protobuf::JoinType::Semi, }; Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::HashJoin(Box::new( diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index 9515ac2ff3739..5e44a3e097f34 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -410,6 +410,10 @@ fn build_join_schema( // left then right left_fields.chain(right_fields).cloned().collect() } + JoinType::Semi => { + // Only use the left side for the schema + left.fields().clone() + } JoinType::Right => { // remove left-side join keys if they have the same names as the right-side let duplicate_keys = &on diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs index 4027916c8a7cd..d10f8b573345c 100644 --- a/datafusion/src/logical_plan/plan.rs +++ b/datafusion/src/logical_plan/plan.rs @@ -34,7 +34,7 @@ use std::{ }; /// Join type -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum JoinType { /// Inner Join Inner, @@ -44,6 +44,8 @@ pub enum JoinType { Right, /// Full Join Full, + /// Semi Join + Semi, } /// A LogicalPlan represents the different types of relational diff --git a/datafusion/src/optimizer/hash_build_probe_order.rs b/datafusion/src/optimizer/hash_build_probe_order.rs index 100ae4fb09b73..d6abfcf5c8b93 100644 --- a/datafusion/src/optimizer/hash_build_probe_order.rs +++ b/datafusion/src/optimizer/hash_build_probe_order.rs @@ -125,7 +125,7 @@ impl OptimizerRule for HashBuildProbeOrder { on, join_type, schema, - } => { + } if *join_type != JoinType::Semi => { let left = self.optimize(left, execution_props)?; let right = self.optimize(right, execution_props)?; if should_swap_join_order(&left, &right) { @@ -187,6 +187,7 @@ impl OptimizerRule for HashBuildProbeOrder { | LogicalPlan::CreateExternalTable { .. } | LogicalPlan::Explain { .. } | LogicalPlan::Union { .. } + | LogicalPlan::Join { .. } | LogicalPlan::Extension { .. } => { let expr = plan.expressions(); @@ -216,6 +217,7 @@ fn swap_join_type(join_type: JoinType) -> JoinType { JoinType::Full => JoinType::Full, JoinType::Left => JoinType::Right, JoinType::Right => JoinType::Left, + _ => unreachable!() } } diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index 01551cd4daf4c..a6db0fd9506ba 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -184,7 +184,7 @@ impl HashJoinExec { /// Calculates column indices and left/right placement on input / output schemas and jointype fn column_indices_from_schema(&self) -> ArrowResult> { let (primary_is_left, primary_schema, secondary_schema) = match self.join_type { - JoinType::Inner | JoinType::Left | JoinType::Full => { + JoinType::Inner | JoinType::Left | JoinType::Full | JoinType::Semi => { (true, self.left.schema(), self.right.schema()) } JoinType::Right => (false, self.right.schema(), self.left.schema()), @@ -376,7 +376,7 @@ impl ExecutionPlan for HashJoinExec { let column_indices = self.column_indices_from_schema()?; let num_rows = left_data.1.num_rows(); let visited_left_side = match self.join_type { - JoinType::Left | JoinType::Full => vec![false; num_rows], + JoinType::Left | JoinType::Full | JoinType::Semi => vec![false; num_rows], JoinType::Inner | JoinType::Right => vec![], }; Ok(Box::pin(HashJoinStream { @@ -544,6 +544,10 @@ fn build_batch( ) .unwrap(); + if join_type == JoinType::Semi { + return Ok((RecordBatch::new_empty(Arc::new(schema.clone())), left_indices)); + } + build_batch_from_indices( schema, &left_data.1, @@ -606,7 +610,7 @@ fn build_join_indexes( let left = &left_data.0; match join_type { - JoinType::Inner => { + JoinType::Inner | JoinType::Semi => { // Using a buffer builder to avoid slower normal builder let mut left_indices = UInt64BufferBuilder::new(0); let mut right_indices = UInt32BufferBuilder::new(0); @@ -1109,19 +1113,30 @@ pub fn create_hashes<'a>( } // Produces a batch for left-side rows that are not marked as being visited during the whole join -fn produce_unmatched( +fn produce_from_matched( visited_left_side: &[bool], schema: &SchemaRef, column_indices: &[ColumnIndex], left_data: &JoinLeftData, + unmatched: bool ) -> ArrowResult { // Find indices which didn't match any right row (are false) - let unmatched_indices: Vec = visited_left_side + let unmatched_indices: Vec = if unmatched { + visited_left_side .iter() .enumerate() .filter(|&(_, &value)| !value) .map(|(index, _)| index as u64) - .collect(); + .collect() + } else { + // produce those that did match + visited_left_side + .iter() + .enumerate() + .filter(|&(_, &value)| value) + .map(|(index, _)| index as u64) + .collect() + }; // generate batches by taking values from the left side and generating columns filled with null on the right side let indices = UInt64Array::from_iter_values(unmatched_indices); @@ -1171,7 +1186,7 @@ impl Stream for HashJoinStream { self.num_output_rows += batch.num_rows(); match self.join_type { - JoinType::Left | JoinType::Full => { + JoinType::Left | JoinType::Full | JoinType::Semi => { left_side.iter().flatten().for_each(|x| { self.visited_left_side[x as usize] = true; }); @@ -1185,12 +1200,13 @@ impl Stream for HashJoinStream { let start = Instant::now(); // For the left join, produce rows for unmatched rows match self.join_type { - JoinType::Left | JoinType::Full if !self.is_exhausted => { - let result = produce_unmatched( + JoinType::Left | JoinType::Full | JoinType::Semi if !self.is_exhausted => { + let result = produce_from_matched( &self.visited_left_side, &self.schema, &self.column_indices, &self.left_data, + self.join_type != JoinType::Semi ); if let Ok(ref batch) = result { self.num_input_batches += 1; @@ -1207,6 +1223,7 @@ impl Stream for HashJoinStream { } JoinType::Left | JoinType::Full + | JoinType::Semi | JoinType::Inner | JoinType::Right => {} } @@ -1666,6 +1683,41 @@ mod tests { Ok(()) } + #[tokio::test] + async fn join_semi() -> Result<()> { + let left = build_table( + ("a1", &vec![1, 2, 3]), + ("b1", &vec![4, 5, 7]), // 7 does not exist on the right + ("c1", &vec![7, 8, 9]), + ); + let right = build_table( + ("a2", &vec![10, 20, 30, 40]), + ("b1", &vec![4, 5, 6, 5]), // 5 is double on the right + ("c2", &vec![70, 80, 90, 100]), + ); + let on = &[("b1", "b1")]; + + let join = join(left, right, on, &JoinType::Semi)?; + + let columns = columns(&join.schema()); + assert_eq!(columns, vec!["a1", "b1", "c1"]); + + let stream = join.execute(0).await?; + let batches = common::collect(stream).await?; + + let expected = vec![ + "+----+----+----+", + "| a1 | b1 | c1 |", + "+----+----+----+", + "| 1 | 4 | 7 |", + "| 2 | 5 | 8 |", + "+----+----+----+", + ]; + assert_batches_sorted_eq!(expected, &batches); + + Ok(()) + } + #[tokio::test] async fn join_right_one() -> Result<()> { let left = build_table( diff --git a/datafusion/src/physical_plan/hash_utils.rs b/datafusion/src/physical_plan/hash_utils.rs index 7e030af3a124c..af0badbb54b3d 100644 --- a/datafusion/src/physical_plan/hash_utils.rs +++ b/datafusion/src/physical_plan/hash_utils.rs @@ -32,6 +32,8 @@ pub enum JoinType { Right, /// Full Join Full, + /// Semi Join + Semi, } /// The on clause of the join, as vector of (left, right) columns. @@ -130,6 +132,9 @@ pub fn build_join_schema( // left then right left_fields.chain(right_fields).cloned().collect() } + JoinType::Semi => { + left.fields().clone() + } }; Schema::new(fields) } diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index 7ddfaf8f68972..4971a027ef1e4 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -367,6 +367,7 @@ impl DefaultPhysicalPlanner { JoinType::Left => hash_utils::JoinType::Left, JoinType::Right => hash_utils::JoinType::Right, JoinType::Full => hash_utils::JoinType::Full, + JoinType::Semi => hash_utils::JoinType::Semi, }; if ctx_state.config.concurrency > 1 && ctx_state.config.repartition_joins { From c9abbee46ee214bae41d321082e556117363ad75 Mon Sep 17 00:00:00 2001 From: Daniel Heres Date: Tue, 1 Jun 2021 17:42:34 +0200 Subject: [PATCH 02/13] Fmt --- .../src/optimizer/hash_build_probe_order.rs | 2 +- datafusion/src/physical_plan/hash_join.rs | 35 +++++++++++-------- datafusion/src/physical_plan/hash_utils.rs | 4 +-- 3 files changed, 22 insertions(+), 19 deletions(-) diff --git a/datafusion/src/optimizer/hash_build_probe_order.rs b/datafusion/src/optimizer/hash_build_probe_order.rs index d6abfcf5c8b93..4e7529cecb355 100644 --- a/datafusion/src/optimizer/hash_build_probe_order.rs +++ b/datafusion/src/optimizer/hash_build_probe_order.rs @@ -217,7 +217,7 @@ fn swap_join_type(join_type: JoinType) -> JoinType { JoinType::Full => JoinType::Full, JoinType::Left => JoinType::Right, JoinType::Right => JoinType::Left, - _ => unreachable!() + _ => unreachable!(), } } diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index a6db0fd9506ba..a28e1176dc926 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -545,7 +545,10 @@ fn build_batch( .unwrap(); if join_type == JoinType::Semi { - return Ok((RecordBatch::new_empty(Arc::new(schema.clone())), left_indices)); + return Ok(( + RecordBatch::new_empty(Arc::new(schema.clone())), + left_indices, + )); } build_batch_from_indices( @@ -1118,24 +1121,24 @@ fn produce_from_matched( schema: &SchemaRef, column_indices: &[ColumnIndex], left_data: &JoinLeftData, - unmatched: bool + unmatched: bool, ) -> ArrowResult { // Find indices which didn't match any right row (are false) let unmatched_indices: Vec = if unmatched { - visited_left_side - .iter() - .enumerate() - .filter(|&(_, &value)| !value) - .map(|(index, _)| index as u64) - .collect() + visited_left_side + .iter() + .enumerate() + .filter(|&(_, &value)| !value) + .map(|(index, _)| index as u64) + .collect() } else { // produce those that did match visited_left_side - .iter() - .enumerate() - .filter(|&(_, &value)| value) - .map(|(index, _)| index as u64) - .collect() + .iter() + .enumerate() + .filter(|&(_, &value)| value) + .map(|(index, _)| index as u64) + .collect() }; // generate batches by taking values from the left side and generating columns filled with null on the right side @@ -1200,13 +1203,15 @@ impl Stream for HashJoinStream { let start = Instant::now(); // For the left join, produce rows for unmatched rows match self.join_type { - JoinType::Left | JoinType::Full | JoinType::Semi if !self.is_exhausted => { + JoinType::Left | JoinType::Full | JoinType::Semi + if !self.is_exhausted => + { let result = produce_from_matched( &self.visited_left_side, &self.schema, &self.column_indices, &self.left_data, - self.join_type != JoinType::Semi + self.join_type != JoinType::Semi, ); if let Ok(ref batch) = result { self.num_input_batches += 1; diff --git a/datafusion/src/physical_plan/hash_utils.rs b/datafusion/src/physical_plan/hash_utils.rs index af0badbb54b3d..110319e4bb6b8 100644 --- a/datafusion/src/physical_plan/hash_utils.rs +++ b/datafusion/src/physical_plan/hash_utils.rs @@ -132,9 +132,7 @@ pub fn build_join_schema( // left then right left_fields.chain(right_fields).cloned().collect() } - JoinType::Semi => { - left.fields().clone() - } + JoinType::Semi => left.fields().clone(), }; Schema::new(fields) } From c91e74cc3836cf7bad6bed2f201254ae97d421a8 Mon Sep 17 00:00:00 2001 From: Daniel Heres Date: Tue, 1 Jun 2021 17:44:21 +0200 Subject: [PATCH 03/13] Match on Semi --- datafusion/src/optimizer/hash_build_probe_order.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/src/optimizer/hash_build_probe_order.rs b/datafusion/src/optimizer/hash_build_probe_order.rs index 4e7529cecb355..b93bcde46e4ed 100644 --- a/datafusion/src/optimizer/hash_build_probe_order.rs +++ b/datafusion/src/optimizer/hash_build_probe_order.rs @@ -187,7 +187,7 @@ impl OptimizerRule for HashBuildProbeOrder { | LogicalPlan::CreateExternalTable { .. } | LogicalPlan::Explain { .. } | LogicalPlan::Union { .. } - | LogicalPlan::Join { .. } + | LogicalPlan::Join { join_type: JoinType::Semi, .. } | LogicalPlan::Extension { .. } => { let expr = plan.expressions(); From b8e3706d3e2306add93088181a9bb196ea512180 Mon Sep 17 00:00:00 2001 From: Daniel Heres Date: Tue, 1 Jun 2021 17:47:03 +0200 Subject: [PATCH 04/13] Simplify --- datafusion/src/physical_plan/hash_join.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index a28e1176dc926..f4734ee2a15f8 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -1124,25 +1124,23 @@ fn produce_from_matched( unmatched: bool, ) -> ArrowResult { // Find indices which didn't match any right row (are false) - let unmatched_indices: Vec = if unmatched { - visited_left_side + let indices = if unmatched { + UInt64Array::from_iter_values(visited_left_side .iter() .enumerate() .filter(|&(_, &value)| !value) .map(|(index, _)| index as u64) - .collect() + ) } else { // produce those that did match - visited_left_side + UInt64Array::from_iter_values(visited_left_side .iter() .enumerate() .filter(|&(_, &value)| value) - .map(|(index, _)| index as u64) - .collect() + .map(|(index, _)| index as u64)) }; // generate batches by taking values from the left side and generating columns filled with null on the right side - let indices = UInt64Array::from_iter_values(unmatched_indices); let num_rows = indices.len(); let mut columns: Vec> = Vec::with_capacity(schema.fields().len()); for (idx, column_index) in column_indices.iter().enumerate() { From 8c8ecf79ede7f11be5456b37ec14a97552323bc2 Mon Sep 17 00:00:00 2001 From: Daniel Heres Date: Tue, 1 Jun 2021 17:47:19 +0200 Subject: [PATCH 05/13] Fmt --- .../src/optimizer/hash_build_probe_order.rs | 5 +++- datafusion/src/physical_plan/hash_join.rs | 23 +++++++++++-------- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/datafusion/src/optimizer/hash_build_probe_order.rs b/datafusion/src/optimizer/hash_build_probe_order.rs index b93bcde46e4ed..978c3b2a5f9eb 100644 --- a/datafusion/src/optimizer/hash_build_probe_order.rs +++ b/datafusion/src/optimizer/hash_build_probe_order.rs @@ -187,7 +187,10 @@ impl OptimizerRule for HashBuildProbeOrder { | LogicalPlan::CreateExternalTable { .. } | LogicalPlan::Explain { .. } | LogicalPlan::Union { .. } - | LogicalPlan::Join { join_type: JoinType::Semi, .. } + | LogicalPlan::Join { + join_type: JoinType::Semi, + .. + } | LogicalPlan::Extension { .. } => { let expr = plan.expressions(); diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index f4734ee2a15f8..74772ec6400c0 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -1125,19 +1125,22 @@ fn produce_from_matched( ) -> ArrowResult { // Find indices which didn't match any right row (are false) let indices = if unmatched { - UInt64Array::from_iter_values(visited_left_side - .iter() - .enumerate() - .filter(|&(_, &value)| !value) - .map(|(index, _)| index as u64) + UInt64Array::from_iter_values( + visited_left_side + .iter() + .enumerate() + .filter(|&(_, &value)| !value) + .map(|(index, _)| index as u64), ) } else { // produce those that did match - UInt64Array::from_iter_values(visited_left_side - .iter() - .enumerate() - .filter(|&(_, &value)| value) - .map(|(index, _)| index as u64)) + UInt64Array::from_iter_values( + visited_left_side + .iter() + .enumerate() + .filter(|&(_, &value)| value) + .map(|(index, _)| index as u64), + ) }; // generate batches by taking values from the left side and generating columns filled with null on the right side From 592fd83987d571fc1206c9dac415e9610911606b Mon Sep 17 00:00:00 2001 From: Daniel Heres Date: Tue, 1 Jun 2021 18:14:47 +0200 Subject: [PATCH 06/13] Undo match --- datafusion/src/optimizer/hash_build_probe_order.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/datafusion/src/optimizer/hash_build_probe_order.rs b/datafusion/src/optimizer/hash_build_probe_order.rs index 978c3b2a5f9eb..4e7529cecb355 100644 --- a/datafusion/src/optimizer/hash_build_probe_order.rs +++ b/datafusion/src/optimizer/hash_build_probe_order.rs @@ -187,10 +187,7 @@ impl OptimizerRule for HashBuildProbeOrder { | LogicalPlan::CreateExternalTable { .. } | LogicalPlan::Explain { .. } | LogicalPlan::Union { .. } - | LogicalPlan::Join { - join_type: JoinType::Semi, - .. - } + | LogicalPlan::Join { .. } | LogicalPlan::Extension { .. } => { let expr = plan.expressions(); From 56281e879bbacdfc334e999f0b91cffc9182b830 Mon Sep 17 00:00:00 2001 From: Daniel Heres Date: Wed, 2 Jun 2021 19:33:04 +0200 Subject: [PATCH 07/13] Support anti join --- ballista/rust/core/proto/ballista.proto | 1 + .../core/src/serde/logical_plan/from_proto.rs | 1 + .../core/src/serde/logical_plan/to_proto.rs | 1 + .../src/serde/physical_plan/from_proto.rs | 1 + .../core/src/serde/physical_plan/to_proto.rs | 1 + datafusion/src/logical_plan/builder.rs | 2 +- datafusion/src/logical_plan/plan.rs | 2 + datafusion/src/physical_plan/hash_join.rs | 61 ++++++++++++++++--- datafusion/src/physical_plan/hash_utils.rs | 4 +- datafusion/src/physical_plan/planner.rs | 1 + 10 files changed, 65 insertions(+), 10 deletions(-) diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto index 03872147b797b..8d5a9df0fc084 100644 --- a/ballista/rust/core/proto/ballista.proto +++ b/ballista/rust/core/proto/ballista.proto @@ -364,6 +364,7 @@ enum JoinType { RIGHT = 2; FULL = 3; SEMI = 4; + ANTI = 5; } message JoinNode { diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs index 48471263885f6..ca201a7db7b01 100644 --- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs @@ -266,6 +266,7 @@ impl TryInto for &protobuf::LogicalPlanNode { protobuf::JoinType::Right => JoinType::Right, protobuf::JoinType::Full => JoinType::Full, protobuf::JoinType::Semi => JoinType::Semi, + protobuf::JoinType::Anti => JoinType::Anti, }; LogicalPlanBuilder::from(&convert_box_required!(join.left)?) .join( diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs index e1c0c5e44df64..1cd886b175cf2 100644 --- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs @@ -835,6 +835,7 @@ impl TryInto for &LogicalPlan { JoinType::Right => protobuf::JoinType::Right, JoinType::Full => protobuf::JoinType::Full, JoinType::Semi => protobuf::JoinType::Semi, + JoinType::Anti => protobuf::JoinType::Anti, }; let left_join_column = on.iter().map(|on| on.0.to_owned()).collect(); let right_join_column = on.iter().map(|on| on.1.to_owned()).collect(); diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs index 7f98a8378b0b2..89307027d7012 100644 --- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs @@ -380,6 +380,7 @@ impl TryInto> for &protobuf::PhysicalPlanNode { protobuf::JoinType::Right => JoinType::Right, protobuf::JoinType::Full => JoinType::Full, protobuf::JoinType::Semi => JoinType::Semi, + protobuf::JoinType::Anti => JoinType::Anti, }; Ok(Arc::new(HashJoinExec::try_new( left, diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs index c409f94749518..26092e74a096a 100644 --- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs @@ -134,6 +134,7 @@ impl TryInto for Arc { JoinType::Right => protobuf::JoinType::Right, JoinType::Full => protobuf::JoinType::Full, JoinType::Semi => protobuf::JoinType::Semi, + JoinType::Anti => protobuf::JoinType::Anti, }; Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::HashJoin(Box::new( diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index 5e44a3e097f34..439ef7ba17c0c 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -410,7 +410,7 @@ fn build_join_schema( // left then right left_fields.chain(right_fields).cloned().collect() } - JoinType::Semi => { + JoinType::Semi | JoinType::Anti => { // Only use the left side for the schema left.fields().clone() } diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs index d10f8b573345c..e4e2a2859fce6 100644 --- a/datafusion/src/logical_plan/plan.rs +++ b/datafusion/src/logical_plan/plan.rs @@ -46,6 +46,8 @@ pub enum JoinType { Full, /// Semi Join Semi, + /// Anti Join + Anti, } /// A LogicalPlan represents the different types of relational diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index 74772ec6400c0..3c00fecc94e11 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -184,9 +184,11 @@ impl HashJoinExec { /// Calculates column indices and left/right placement on input / output schemas and jointype fn column_indices_from_schema(&self) -> ArrowResult> { let (primary_is_left, primary_schema, secondary_schema) = match self.join_type { - JoinType::Inner | JoinType::Left | JoinType::Full | JoinType::Semi => { - (true, self.left.schema(), self.right.schema()) - } + JoinType::Inner + | JoinType::Left + | JoinType::Full + | JoinType::Semi + | JoinType::Anti => (true, self.left.schema(), self.right.schema()), JoinType::Right => (false, self.right.schema(), self.left.schema()), }; let mut column_indices = Vec::with_capacity(self.schema.fields().len()); @@ -376,7 +378,9 @@ impl ExecutionPlan for HashJoinExec { let column_indices = self.column_indices_from_schema()?; let num_rows = left_data.1.num_rows(); let visited_left_side = match self.join_type { - JoinType::Left | JoinType::Full | JoinType::Semi => vec![false; num_rows], + JoinType::Left | JoinType::Full | JoinType::Semi | JoinType::Anti => { + vec![false; num_rows] + } JoinType::Inner | JoinType::Right => vec![], }; Ok(Box::pin(HashJoinStream { @@ -544,7 +548,7 @@ fn build_batch( ) .unwrap(); - if join_type == JoinType::Semi { + if matches!(join_type, JoinType::Semi | JoinType::Anti) { return Ok(( RecordBatch::new_empty(Arc::new(schema.clone())), left_indices, @@ -613,7 +617,7 @@ fn build_join_indexes( let left = &left_data.0; match join_type { - JoinType::Inner | JoinType::Semi => { + JoinType::Inner | JoinType::Semi | JoinType::Anti => { // Using a buffer builder to avoid slower normal builder let mut left_indices = UInt64BufferBuilder::new(0); let mut right_indices = UInt32BufferBuilder::new(0); @@ -1190,7 +1194,10 @@ impl Stream for HashJoinStream { self.num_output_rows += batch.num_rows(); match self.join_type { - JoinType::Left | JoinType::Full | JoinType::Semi => { + JoinType::Left + | JoinType::Full + | JoinType::Semi + | JoinType::Anti => { left_side.iter().flatten().for_each(|x| { self.visited_left_side[x as usize] = true; }); @@ -1204,7 +1211,10 @@ impl Stream for HashJoinStream { let start = Instant::now(); // For the left join, produce rows for unmatched rows match self.join_type { - JoinType::Left | JoinType::Full | JoinType::Semi + JoinType::Left + | JoinType::Full + | JoinType::Semi + | JoinType::Anti if !self.is_exhausted => { let result = produce_from_matched( @@ -1230,6 +1240,7 @@ impl Stream for HashJoinStream { JoinType::Left | JoinType::Full | JoinType::Semi + | JoinType::Anti | JoinType::Inner | JoinType::Right => {} } @@ -1724,6 +1735,40 @@ mod tests { Ok(()) } + #[tokio::test] + async fn join_anti() -> Result<()> { + let left = build_table( + ("a1", &vec![1, 2, 3]), + ("b1", &vec![4, 5, 7]), // 7 does not exist on the right + ("c1", &vec![7, 8, 9]), + ); + let right = build_table( + ("a2", &vec![10, 20, 30, 40]), + ("b1", &vec![4, 5, 6, 5]), // 5 is double on the right + ("c2", &vec![70, 80, 90, 100]), + ); + let on = &[("b1", "b1")]; + + let join = join(left, right, on, &JoinType::Anti)?; + + let columns = columns(&join.schema()); + assert_eq!(columns, vec!["a1", "b1", "c1"]); + + let stream = join.execute(0).await?; + let batches = common::collect(stream).await?; + + let expected = vec![ + "+----+----+----+", + "| a1 | b1 | c1 |", + "+----+----+----+", + "| 3 | 7 | 9 |", + "+----+----+----+", + ]; + assert_batches_sorted_eq!(expected, &batches); + + Ok(()) + } + #[tokio::test] async fn join_right_one() -> Result<()> { let left = build_table( diff --git a/datafusion/src/physical_plan/hash_utils.rs b/datafusion/src/physical_plan/hash_utils.rs index 110319e4bb6b8..a48710bfbfc35 100644 --- a/datafusion/src/physical_plan/hash_utils.rs +++ b/datafusion/src/physical_plan/hash_utils.rs @@ -34,6 +34,8 @@ pub enum JoinType { Full, /// Semi Join Semi, + /// Anti Join + Anti, } /// The on clause of the join, as vector of (left, right) columns. @@ -132,7 +134,7 @@ pub fn build_join_schema( // left then right left_fields.chain(right_fields).cloned().collect() } - JoinType::Semi => left.fields().clone(), + JoinType::Semi | JoinType::Anti => left.fields().clone(), }; Schema::new(fields) } diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index 4971a027ef1e4..9d86f67cb2e10 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -368,6 +368,7 @@ impl DefaultPhysicalPlanner { JoinType::Right => hash_utils::JoinType::Right, JoinType::Full => hash_utils::JoinType::Full, JoinType::Semi => hash_utils::JoinType::Semi, + JoinType::Anti => hash_utils::JoinType::Anti, }; if ctx_state.config.concurrency > 1 && ctx_state.config.repartition_joins { From 7491fcbe2b6e3ed1b0e988780e9d40186d32a1a9 Mon Sep 17 00:00:00 2001 From: Daniel Heres Date: Thu, 3 Jun 2021 08:27:13 +0200 Subject: [PATCH 08/13] Remove check --- datafusion/src/optimizer/hash_build_probe_order.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/src/optimizer/hash_build_probe_order.rs b/datafusion/src/optimizer/hash_build_probe_order.rs index af8f6bcd3d637..7cfa026f16bf9 100644 --- a/datafusion/src/optimizer/hash_build_probe_order.rs +++ b/datafusion/src/optimizer/hash_build_probe_order.rs @@ -132,7 +132,7 @@ impl OptimizerRule for HashBuildProbeOrder { on, join_type, schema, - } if *join_type != JoinType::Semi => { + } => { let left = self.optimize(left, execution_props)?; let right = self.optimize(right, execution_props)?; if should_swap_join_order(&left, &right) && supports_swap(*join_type) { From 3fc16153adbe2f94d5037e214625ff024f013594 Mon Sep 17 00:00:00 2001 From: Daniel Heres Date: Thu, 3 Jun 2021 08:28:09 +0200 Subject: [PATCH 09/13] Revert doc change --- datafusion/src/logical_plan/plan.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs index d041b9af931a2..5391e76e7576e 100644 --- a/datafusion/src/logical_plan/plan.rs +++ b/datafusion/src/logical_plan/plan.rs @@ -49,6 +49,10 @@ pub enum JoinType { /// Anti Join Anti, } + +/// A LogicalPlan represents the different types of relational +/// operators (such as Projection, Filter, etc) and can be created by +/// the SQL query planner and the DataFrame API. /// /// A LogicalPlan represents transforming an input relation (table) to /// an output relation (table) with a (potentially) different From de3f9a108803658c0c0a786425b26649b0959c2a Mon Sep 17 00:00:00 2001 From: Daniel Heres Date: Thu, 3 Jun 2021 13:04:46 +0200 Subject: [PATCH 10/13] Add back test --- datafusion/src/physical_plan/hash_join.rs | 35 +++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index 8d7c2e6f88b26..937f4a16eab03 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -1736,6 +1736,41 @@ mod tests { Ok(()) } + #[tokio::test] + async fn join_anti() -> Result<()> { + let left = build_table( + ("a1", &vec![1, 2, 2, 3]), + ("b1", &vec![4, 5, 5, 7]), // 7 does not exist on the right + ("c1", &vec![7, 8, 8, 9]), + ); + let right = build_table( + ("a2", &vec![10, 20, 30, 40]), + ("b1", &vec![4, 5, 6, 5]), // 5 is double on the right + ("c2", &vec![70, 80, 90, 100]), + ); + let on = &[("b1", "b1")]; + + let join = join(left, right, on, &JoinType::Anti)?; + + let columns = columns(&join.schema()); + assert_eq!(columns, vec!["a1", "b1", "c1"]); + + let stream = join.execute(0).await?; + let batches = common::collect(stream).await?; + + let expected = vec![ + "+----+----+----+", + "| a1 | b1 | c1 |", + "+----+----+----+", + "| 3 | 7 | 9 |", + "| 2 | 5 | 8 |", + "| 2 | 5 | 8 |", + "+----+----+----+", + ]; + assert_batches_sorted_eq!(expected, &batches); + Ok(()) + } + #[tokio::test] async fn join_right_one() -> Result<()> { let left = build_table( From 3f7d2411c98e30ad117951bbf5389cec7fc96107 Mon Sep 17 00:00:00 2001 From: Daniel Heres Date: Thu, 3 Jun 2021 13:06:42 +0200 Subject: [PATCH 11/13] Fix --- datafusion/src/physical_plan/hash_join.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index 937f4a16eab03..61cfb5c540ac3 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -1763,8 +1763,6 @@ mod tests { "| a1 | b1 | c1 |", "+----+----+----+", "| 3 | 7 | 9 |", - "| 2 | 5 | 8 |", - "| 2 | 5 | 8 |", "+----+----+----+", ]; assert_batches_sorted_eq!(expected, &batches); From 49019036f6a69f145a2c39393d189e3233e711e2 Mon Sep 17 00:00:00 2001 From: Daniel Heres Date: Thu, 3 Jun 2021 17:37:46 +0200 Subject: [PATCH 12/13] Linting --- datafusion/src/optimizer/hash_build_probe_order.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/src/optimizer/hash_build_probe_order.rs b/datafusion/src/optimizer/hash_build_probe_order.rs index 7cfa026f16bf9..74d2b00901942 100644 --- a/datafusion/src/optimizer/hash_build_probe_order.rs +++ b/datafusion/src/optimizer/hash_build_probe_order.rs @@ -194,7 +194,6 @@ impl OptimizerRule for HashBuildProbeOrder { | LogicalPlan::CreateExternalTable { .. } | LogicalPlan::Explain { .. } | LogicalPlan::Union { .. } - | LogicalPlan::Join { .. } | LogicalPlan::Extension { .. } => { let expr = plan.expressions(); From 9089bee47976a2a1b6633f0300e8719ebdd045b6 Mon Sep 17 00:00:00 2001 From: Daniel Heres Date: Thu, 3 Jun 2021 17:41:55 +0200 Subject: [PATCH 13/13] Make test case more complete --- datafusion/src/physical_plan/hash_join.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index 61cfb5c540ac3..d12e249cbe347 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -1739,9 +1739,9 @@ mod tests { #[tokio::test] async fn join_anti() -> Result<()> { let left = build_table( - ("a1", &vec![1, 2, 2, 3]), - ("b1", &vec![4, 5, 5, 7]), // 7 does not exist on the right - ("c1", &vec![7, 8, 8, 9]), + ("a1", &vec![1, 2, 2, 3, 5]), + ("b1", &vec![4, 5, 5, 7, 7]), // 7 does not exist on the right + ("c1", &vec![7, 8, 8, 9, 11]), ); let right = build_table( ("a2", &vec![10, 20, 30, 40]), @@ -1763,6 +1763,7 @@ mod tests { "| a1 | b1 | c1 |", "+----+----+----+", "| 3 | 7 | 9 |", + "| 5 | 7 | 11 |", "+----+----+----+", ]; assert_batches_sorted_eq!(expected, &batches);