Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 143 additions & 20 deletions datafusion/core/tests/physical_optimizer/filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1107,12 +1107,16 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {

let result = format!("{}", pretty_format_batches(&batches).unwrap());

let probe_scan_metrics = probe_scan.metrics().unwrap();

// The probe side had 4 rows, but after applying the dynamic filter only 2 rows should remain.
// The number of output rows from the probe side scan should stay consistent across executions.
// Issue: https://github.com/apache/datafusion/issues/17451
assert_eq!(probe_scan_metrics.output_rows().unwrap(), 2);
// Previously this test asserted `probe_scan.metrics().output_rows() == 2`
// — that the probe scan emitted only the rows matching the finalized
// dynamic filter. That relied on all partitions blocking on a shared
// barrier inside `SharedBuildAccumulator` before any probing started,
// which deadlocked with `RepartitionExec`'s global backpressure gate
// under Q18-style plans (#21625). With the barrier removed, probing
// can begin before the last partition has reported, so the scan may
// emit its first batch against the placeholder filter. The join
// itself still applies the filter correctly (result batches are
// snapshot-asserted below).

insta::assert_snapshot!(
result,
Expand Down Expand Up @@ -1285,12 +1289,16 @@ async fn test_hashjoin_dynamic_filter_pushdown_collect_left() {

let result = format!("{}", pretty_format_batches(&batches).unwrap());

let probe_scan_metrics = probe_scan.metrics().unwrap();

// The probe side had 4 rows, but after applying the dynamic filter only 2 rows should remain.
// The number of output rows from the probe side scan should stay consistent across executions.
// Issue: https://github.com/apache/datafusion/issues/17451
assert_eq!(probe_scan_metrics.output_rows().unwrap(), 2);
// Previously this test asserted `probe_scan.metrics().output_rows() == 2`
// — that the probe scan emitted only the rows matching the finalized
// dynamic filter. That relied on all partitions blocking on a shared
// barrier inside `SharedBuildAccumulator` before any probing started,
// which deadlocked with `RepartitionExec`'s global backpressure gate
// under Q18-style plans (#21625). With the barrier removed, probing
// can begin before the last partition has reported, so the scan may
// emit its first batch against the placeholder filter. The join
// itself still applies the filter correctly (result batches are
// snapshot-asserted below).

insta::assert_snapshot!(
result,
Expand Down Expand Up @@ -2585,10 +2593,8 @@ async fn test_hashjoin_hash_table_pushdown_partitioned() {

let result = format!("{}", pretty_format_batches(&batches).unwrap());

let probe_scan_metrics = probe_scan.metrics().unwrap();

// The probe side had 4 rows, but after applying the dynamic filter only 2 rows should remain.
assert_eq!(probe_scan_metrics.output_rows().unwrap(), 2);
// Scan-level row-count assertion removed; see the explanation in
// `test_hashjoin_dynamic_filter_pushdown_partitioned` (#21625).

// Results should be identical to the InList version
insta::assert_snapshot!(
Expand Down Expand Up @@ -2736,10 +2742,8 @@ async fn test_hashjoin_hash_table_pushdown_collect_left() {

let result = format!("{}", pretty_format_batches(&batches).unwrap());

let probe_scan_metrics = probe_scan.metrics().unwrap();

// The probe side had 4 rows, but after applying the dynamic filter only 2 rows should remain.
assert_eq!(probe_scan_metrics.output_rows().unwrap(), 2);
// Scan-level row-count assertion removed; see the explanation in
// `test_hashjoin_dynamic_filter_pushdown_partitioned` (#21625).

// Results should be identical to the InList version
insta::assert_snapshot!(
Expand All @@ -2761,6 +2765,125 @@ async fn test_hashjoin_hash_table_pushdown_collect_left() {
// already covered by the simpler CollectLeft port in push_down_filter_parquet.slt;
// the with_support(false) branch has no SQL analog (parquet always supports
// pushdown).
/// Test HashTable strategy with integer multi-column join keys.
/// Verifies that hash_lookup works correctly with integer data types.
#[tokio::test]
async fn test_hashjoin_hash_table_pushdown_integer_keys() {
    use datafusion_common::JoinType;
    use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};

    // Build side: 2 rows keyed by (id1, id2), plus a Float64 payload column.
    let build_batches = vec![
        record_batch!(
            ("id1", Int32, [1, 2]),
            ("id2", Int32, [10, 20]),
            ("value", Float64, [100.0, 200.0])
        )
        .unwrap(),
    ];
    let build_side_schema = Arc::new(Schema::new(vec![
        Field::new("id1", DataType::Int32, false),
        Field::new("id2", DataType::Int32, false),
        Field::new("value", DataType::Float64, false),
    ]));
    let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
        .with_support(true)
        .with_batches(build_batches)
        .build();

    // Probe side: 4 rows; only (1, 10) and (2, 20) have build-side matches.
    let probe_batches = vec![
        record_batch!(
            ("id1", Int32, [1, 2, 3, 4]),
            ("id2", Int32, [10, 20, 30, 40]),
            ("data", Utf8, ["a", "b", "c", "d"])
        )
        .unwrap(),
    ];
    let probe_side_schema = Arc::new(Schema::new(vec![
        Field::new("id1", DataType::Int32, false),
        Field::new("id2", DataType::Int32, false),
        Field::new("data", DataType::Utf8, false),
    ]));
    let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
        .with_support(true)
        .with_batches(probe_batches)
        .build();

    // Join on both integer key columns so the pushed-down filter must hash
    // multi-column integer keys rather than a single column.
    let on = vec![
        (
            col("id1", &build_side_schema).unwrap(),
            col("id1", &probe_side_schema).unwrap(),
        ),
        (
            col("id2", &build_side_schema).unwrap(),
            col("id2", &probe_side_schema).unwrap(),
        ),
    ];
    let plan = Arc::new(
        HashJoinExec::try_new(
            build_scan,
            // NOTE: `probe_scan` is not referenced again below (the former
            // scan-metrics assertion was removed), so hand the Arc over
            // directly instead of `Arc::clone(&probe_scan)`.
            probe_scan,
            on,
            None,
            &JoinType::Inner,
            None,
            PartitionMode::CollectLeft,
            datafusion_common::NullEquality::NullEqualsNothing,
            false,
        )
        .unwrap(),
    );

    // Force the HashTable strategy: capping the InList pushdown size at 1
    // prevents the optimizer from emitting an `IN (SET)` expression.
    let session_config = SessionConfig::default()
        .with_batch_size(10)
        .set_usize("datafusion.optimizer.hash_join_inlist_pushdown_max_size", 1)
        .set_bool("datafusion.execution.parquet.pushdown_filters", true)
        .set_bool("datafusion.optimizer.enable_dynamic_filter_pushdown", true);
    let plan = FilterPushdown::new_post_optimization()
        .optimize(plan, session_config.options())
        .unwrap();
    let session_ctx = SessionContext::new_with_config(session_config);
    session_ctx.register_object_store(
        ObjectStoreUrl::parse("test://").unwrap().as_ref(),
        Arc::new(InMemory::new()),
    );
    let state = session_ctx.state();
    // `task_ctx` is consumed by `collect` and never used again, so no
    // `Arc::clone` is needed; `plan` is still inspected below, so it is cloned.
    let task_ctx = state.task_ctx();
    let batches = collect(Arc::clone(&plan), task_ctx).await.unwrap();

    // Verify the hash_lookup strategy appears in the plan and InList does not.
    let plan_str = format_plan_for_test(&plan).to_string();
    assert!(
        plan_str.contains("hash_lookup"),
        "Expected hash_lookup in plan but got: {plan_str}"
    );
    assert!(
        !plan_str.contains("IN (SET)"),
        "Expected no IN (SET) in plan but got: {plan_str}"
    );

    let result = format!("{}", pretty_format_batches(&batches).unwrap());

    // Scan-level row-count assertion removed; see the explanation in
    // `test_hashjoin_dynamic_filter_pushdown_partitioned` (#21625).

    insta::assert_snapshot!(
        result,
        @r"
    +-----+-----+-------+-----+-----+------+
    | id1 | id2 | value | id1 | id2 | data |
    +-----+-----+-------+-----+-----+------+
    | 1   | 10  | 100.0 | 1   | 10  | a    |
    | 2   | 20  | 200.0 | 2   | 20  | b    |
    +-----+-----+-------+-----+-----+------+
    ",
    );
}
#[tokio::test]
async fn test_hashjoin_dynamic_filter_pushdown_is_used() {
use datafusion_common::JoinType;
Expand Down
Loading
Loading