From 9e293929713b12bc461c7f58c831b5aeed39dc71 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 6 Nov 2023 16:43:03 -0500 Subject: [PATCH 1/4] Minor: Improve HashJoinStream docstrings --- .../physical-plan/src/joins/hash_join.rs | 119 +++++++++++------- 1 file changed, 72 insertions(+), 47 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 0e2d552b543b..6072c1439c98 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -206,7 +206,7 @@ pub struct HashJoinExec { pub join_type: JoinType, /// The output schema for the join schema: SchemaRef, - /// Build-side data + /// Future that consumes left input and builds the hash table left_fut: OnceAsync, /// Shared the `RandomState` for the hashing algorithm random_state: RandomState, @@ -747,27 +747,36 @@ where Ok(()) } -/// A stream that issues [RecordBatch]es as they arrive from the right of the join. +/// [`Stream`] for [`HashJoinExec`] that does the actual join. +/// +/// This stream: +/// 1. Reads the entire left side input and constructs a hash table +/// 2. +/// 2. [RecordBatch]es as they arrive from the right of the join. struct HashJoinStream { /// Input schema schema: Arc, - /// columns from the left + /// equijoin columns from the left (build side) on_left: Vec, - /// columns from the right used to compute the hash + /// equijoin columns from the right (probe side) on_right: Vec, - /// join filter + /// optional join filter filter: Option, - /// type of the join + /// type of the join (left, right, semi, etc) join_type: JoinType, - /// future for data from left side + /// future which builds hash table from left side left_fut: OnceFut, - /// Keeps track of the left side rows whether they are visited + /// Which left (probe) side rows have been matches while creating output. + /// For some OUTER joins, we need to know which rows have not been matched + /// to produce the correct. visited_left_side: Option, - /// right + /// right (probe) input right: SendableRecordBatchStream, /// Random state used for hashing initialization random_state: RandomState, - /// There is nothing to process anymore and left side is processed in case of left join + /// The join output is complete. For outer joins, this is used to + /// distinguish when the input stream is exhausted and when any unmatched + /// rows are output. is_exhausted: bool, /// Metrics join_metrics: BuildProbeJoinMetrics, @@ -785,37 +794,51 @@ impl RecordBatchStream for HashJoinStream { } } -// Returns build/probe indices satisfying the equality condition. -// On LEFT.b1 = RIGHT.b2 -// LEFT Table: -// a1 b1 c1 -// 1 1 10 -// 3 3 30 -// 5 5 50 -// 7 7 70 -// 9 8 90 -// 11 8 110 -// 13 10 130 -// RIGHT Table: -// a2 b2 c2 -// 2 2 20 -// 4 4 40 -// 6 6 60 -// 8 8 80 -// 10 10 100 -// 12 10 120 -// The result is -// "+----+----+-----+----+----+-----+", -// "| a1 | b1 | c1 | a2 | b2 | c2 |", -// "+----+----+-----+----+----+-----+", -// "| 9 | 8 | 90 | 8 | 8 | 80 |", -// "| 11 | 8 | 110 | 8 | 8 | 80 |", -// "| 13 | 10 | 130 | 10 | 10 | 100 |", -// "| 13 | 10 | 130 | 12 | 10 | 120 |", -// "+----+----+-----+----+----+-----+" -// And the result of build and probe indices are: -// Build indices: 4, 5, 6, 6 -// Probe indices: 3, 3, 4, 5 +/// Returns build/probe indices satisfying the equality condition. +/// +/// # Example +/// +/// For `LEFT.b1 = RIGHT.b2`: +/// LEFT Table: +/// ```text +/// a1 b1 c1 +/// 1 1 10 +/// 3 3 30 +/// 5 5 50 +/// 7 7 70 +/// 9 8 90 +/// 11 8 110 +/// 13 10 130 +/// ``` +/// +/// RIGHT Table: +/// ```text +/// a2 b2 c2 +/// 2 2 20 +/// 4 4 40 +/// 6 6 60 +/// 8 8 80 +/// 10 10 100 +/// 12 10 120 +/// ``` +/// +/// The result is +/// ```text +/// "+----+----+-----+----+----+-----+", +/// "| a1 | b1 | c1 | a2 | b2 | c2 |", +/// "+----+----+-----+----+----+-----+", +/// "| 9 | 8 | 90 | 8 | 8 | 80 |", +/// "| 11 | 8 | 110 | 8 | 8 | 80 |", +/// "| 13 | 10 | 130 | 10 | 10 | 100 |", +/// "| 13 | 10 | 130 | 12 | 10 | 120 |", +/// "+----+----+-----+----+----+-----+" +/// ``` +/// +/// And the result of build and probe indices are: +/// ```text +/// Build indices: 4, 5, 6, 6 +/// Probe indices: 3, 3, 4, 5 +/// ``` #[allow(clippy::too_many_arguments)] pub fn build_equal_condition_join_indices( build_hashmap: &T, @@ -1003,13 +1026,14 @@ impl HashJoinStream { cx: &mut std::task::Context<'_>, ) -> Poll>> { let build_timer = self.join_metrics.build_time.timer(); + // build hash table from left (build) side, if not yet done let left_data = match ready!(self.left_fut.get(cx)) { Ok(left_data) => left_data, Err(e) => return Poll::Ready(Some(Err(e))), }; build_timer.done(); - // Reserving memory for visited_left_side bitmap in case it hasn't been initialied yet + // Reserving memory for visited_left_side bitmap in case it hasn't been initialized yet // and join_type requires to store it if self.visited_left_side.is_none() && need_produce_result_in_final(self.join_type) @@ -1024,11 +1048,11 @@ impl HashJoinStream { let visited_left_side = self.visited_left_side.get_or_insert_with(|| { let num_rows = left_data.1.num_rows(); if need_produce_result_in_final(self.join_type) { - // these join type need the bitmap to identify which row has be matched or unmatched. - // For the `left semi` join, need to use the bitmap to produce the matched row in the left side - // For the `left` join, need to use the bitmap to produce the unmatched row in the left side with null - // For the `left anti` join, need to use the bitmap to produce the unmatched row in the left side - // For the `full` join, need to use the bitmap to produce the unmatched row in the left side with null + // Some join types need to track which row has be matched or unmatched: + // `left semi` join: need to use the bitmap to produce the matched row in the left side + // `left` join: need to use the bitmap to produce the unmatched row in the left side with null + // `left anti` join: need to use the bitmap to produce the unmatched row in the left side + // `full` join: need to use the bitmap to produce the unmatched row in the left side with null let mut buffer = BooleanBufferBuilder::new(num_rows); buffer.append_n(num_rows, false); buffer @@ -1037,6 +1061,7 @@ impl HashJoinStream { } }); let mut hashes_buffer = vec![]; + // get next right (probe) input batch self.right .poll_next_unpin(cx) .map(|maybe_batch| match maybe_batch { From ad1767d213b99814bd8f75a3b8d335cfcdbf4e9d Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 6 Nov 2023 16:46:32 -0500 Subject: [PATCH 2/4] fix comments --- datafusion/physical-plan/src/joins/hash_join.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 6072c1439c98..f27d608c3ba4 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -750,9 +750,11 @@ where /// [`Stream`] for [`HashJoinExec`] that does the actual join. /// /// This stream: -/// 1. Reads the entire left side input and constructs a hash table -/// 2. -/// 2. [RecordBatch]es as they arrive from the right of the join. +/// +/// 1. Reads the entire left input and constructs a hash table +/// +/// 2. Streams [RecordBatch]es as they arrive from the right input and joins +/// them with the contents of the hash table struct HashJoinStream { /// Input schema schema: Arc, From 8588810953d696a176fb287f7a69e31ad76f1141 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 7 Nov 2023 19:10:31 +0100 Subject: [PATCH 3/4] Update datafusion/physical-plan/src/joins/hash_join.rs Co-authored-by: comphead --- datafusion/physical-plan/src/joins/hash_join.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index f27d608c3ba4..53c36dddb149 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -751,7 +751,7 @@ where /// /// This stream: /// -/// 1. Reads the entire left input and constructs a hash table +/// 1. Reads the entire left input (build) and constructs a hash table /// /// 2. Streams [RecordBatch]es as they arrive from the right input and joins /// them with the contents of the hash table From 2ec5f39b7f8bdcb46debe0dd452fda2d84eb7c2b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 7 Nov 2023 19:10:36 +0100 Subject: [PATCH 4/4] Update datafusion/physical-plan/src/joins/hash_join.rs Co-authored-by: comphead --- datafusion/physical-plan/src/joins/hash_join.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 53c36dddb149..1a2db87d98a2 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -753,7 +753,7 @@ where /// /// 1. Reads the entire left input (build) and constructs a hash table /// -/// 2. Streams [RecordBatch]es as they arrive from the right input and joins +/// 2. Streams [RecordBatch]es as they arrive from the right input (probe) and joins /// them with the contents of the hash table struct HashJoinStream { /// Input schema