Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions datafusion/physical-plan/src/joins/hash_join/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,7 @@ impl HashJoinStream {
filter,
JoinSide::Left,
None,
self.join_type,
)?
} else {
(left_indices, right_indices)
Expand Down Expand Up @@ -707,6 +708,7 @@ impl HashJoinStream {
&right_indices,
&self.column_indices,
join_side,
self.join_type,
)?;

self.output_buffer.push_batch(batch)?;
Expand Down Expand Up @@ -770,6 +772,7 @@ impl HashJoinStream {
&right_side,
&self.column_indices,
JoinSide::Left,
self.join_type,
)?;
self.output_buffer.push_batch(batch)?;
}
Expand Down
3 changes: 3 additions & 0 deletions datafusion/physical-plan/src/joins/symmetric_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,7 @@ pub(crate) fn build_side_determined_results(
&probe_indices,
column_indices,
build_hash_joiner.build_side,
join_type,
)
.map(|batch| (batch.num_rows() > 0).then_some(batch))
} else {
Expand Down Expand Up @@ -993,6 +994,7 @@ pub(crate) fn join_with_probe_batch(
filter,
build_hash_joiner.build_side,
None,
join_type,
)?
} else {
(build_indices, probe_indices)
Expand Down Expand Up @@ -1031,6 +1033,7 @@ pub(crate) fn join_with_probe_batch(
&probe_indices,
column_indices,
build_hash_joiner.build_side,
join_type,
)
.map(|batch| (batch.num_rows() > 0).then_some(batch))
}
Expand Down
15 changes: 14 additions & 1 deletion datafusion/physical-plan/src/joins/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -910,6 +910,7 @@ pub(crate) fn get_final_indices_from_bit_map(
(left_indices, right_indices)
}

#[expect(clippy::too_many_arguments)]
pub(crate) fn apply_join_filter_to_indices(
build_input_buffer: &RecordBatch,
probe_batch: &RecordBatch,
Expand All @@ -918,6 +919,7 @@ pub(crate) fn apply_join_filter_to_indices(
filter: &JoinFilter,
build_side: JoinSide,
max_intermediate_size: Option<usize>,
join_type: JoinType,
) -> Result<(UInt64Array, UInt32Array)> {
if build_indices.is_empty() && probe_indices.is_empty() {
return Ok((build_indices, probe_indices));
Expand All @@ -938,6 +940,7 @@ pub(crate) fn apply_join_filter_to_indices(
&probe_indices.slice(i, len),
filter.column_indices(),
build_side,
join_type,
)?;
let filter_result = filter
.expression()
Expand All @@ -959,6 +962,7 @@ pub(crate) fn apply_join_filter_to_indices(
&probe_indices,
filter.column_indices(),
build_side,
join_type,
)?;

filter
Expand All @@ -979,6 +983,7 @@ pub(crate) fn apply_join_filter_to_indices(

/// Returns a new [RecordBatch] by combining the `left` and `right` according to `indices`.
/// The resulting batch has [Schema] `schema`.
#[expect(clippy::too_many_arguments)]
pub(crate) fn build_batch_from_indices(
schema: &Schema,
build_input_buffer: &RecordBatch,
Expand All @@ -987,11 +992,19 @@ pub(crate) fn build_batch_from_indices(
probe_indices: &UInt32Array,
column_indices: &[ColumnIndex],
build_side: JoinSide,
join_type: JoinType,
) -> Result<RecordBatch> {
if schema.fields().is_empty() {
// For RightAnti and RightSemi joins, after `adjust_indices_by_join_type`
// the build_indices were untouched so only probe_indices hold the actual
// row count.
let row_count = match join_type {
JoinType::RightAnti | JoinType::RightSemi => probe_indices.len(),
_ => build_indices.len(),
};
let options = RecordBatchOptions::new()
.with_match_field_names(true)
.with_row_count(Some(build_indices.len()));
.with_row_count(Some(row_count));

return Ok(RecordBatch::try_new_with_options(
Arc::new(schema.clone()),
Expand Down
43 changes: 43 additions & 0 deletions datafusion/sqllogictest/test_files/joins.slt
Original file line number Diff line number Diff line change
Expand Up @@ -5226,3 +5226,46 @@ DROP TABLE issue_20437_small;

statement count 0
DROP TABLE issue_20437_large;

# Test count(*) with right semi/anti joins returns correct row counts
# issue: https://github.com/apache/datafusion/issues/20669

statement ok
CREATE TABLE t1 (k INT, v INT);

statement ok
CREATE TABLE t2 (k INT, v INT);

statement ok
INSERT INTO t1 SELECT i AS k, i AS v FROM generate_series(1, 100) t(i);

statement ok
INSERT INTO t2 VALUES (1, 1);

query I
WITH t AS (
SELECT *
FROM t1
LEFT ANTI JOIN t2 ON t1.k = t2.k
)
SELECT count(*)
FROM t;
----
99

query I
WITH t AS (
SELECT *
FROM t1
LEFT SEMI JOIN t2 ON t1.k = t2.k
)
SELECT count(*)
FROM t;
----
1

statement count 0
DROP TABLE t1;

statement count 0
DROP TABLE t2;
Loading