diff --git a/Cargo.lock b/Cargo.lock index a20aad4ec5aae..9f9263e529034 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2458,6 +2458,7 @@ dependencies = [ "half", "hashbrown 0.14.5", "indexmap 2.8.0", + "insta", "itertools 0.14.0", "log", "parking_lot", diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 4a10398e5a9ef..1f38e2ed31263 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -68,6 +68,7 @@ tokio = { workspace = true } criterion = { workspace = true, features = ["async_futures"] } datafusion-functions-aggregate = { workspace = true } datafusion-functions-window = { workspace = true } +insta = { workspace = true } rand = { workspace = true } rstest = { workspace = true } rstest_reuse = "0.7.0" diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 84ba0fe3b630b..8906468f68db2 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -1425,10 +1425,8 @@ mod tests { }; use arrow::compute::{concat_batches, SortOptions}; use arrow::datatypes::{DataType, Int32Type}; - use datafusion_common::{ - assert_batches_eq, assert_batches_sorted_eq, internal_err, DataFusionError, - ScalarValue, - }; + use datafusion_common::test_util::{batches_to_sort_string, batches_to_string}; + use datafusion_common::{internal_err, DataFusionError, ScalarValue}; use datafusion_execution::config::SessionConfig; use datafusion_execution::memory_pool::FairSpillPool; use datafusion_execution::runtime_env::RuntimeEnvBuilder; @@ -1445,6 +1443,7 @@ mod tests { use datafusion_physical_expr::PhysicalSortExpr; use futures::{FutureExt, Stream}; + use insta::{allow_duplicates, assert_snapshot}; // Generate a schema which consists of 5 columns (a, b, c, d, e) fn create_test_schema() -> Result { @@ -1601,56 +1600,63 @@ mod tests { let result = collect(partial_aggregate.execute(0, Arc::clone(&task_ctx))?).await?; - let expected = if spill { + if spill { // In spill mode, we test with the limited memory, if the mem usage exceeds, // we trigger the early emit rule, which turns out the partial aggregate result. - vec![ - "+---+-----+---------------+-----------------+", - "| a | b | __grouping_id | COUNT(1)[count] |", - "+---+-----+---------------+-----------------+", - "| | 1.0 | 2 | 1 |", - "| | 1.0 | 2 | 1 |", - "| | 2.0 | 2 | 1 |", - "| | 2.0 | 2 | 1 |", - "| | 3.0 | 2 | 1 |", - "| | 3.0 | 2 | 1 |", - "| | 4.0 | 2 | 1 |", - "| | 4.0 | 2 | 1 |", - "| 2 | | 1 | 1 |", - "| 2 | | 1 | 1 |", - "| 2 | 1.0 | 0 | 1 |", - "| 2 | 1.0 | 0 | 1 |", - "| 3 | | 1 | 1 |", - "| 3 | | 1 | 2 |", - "| 3 | 2.0 | 0 | 2 |", - "| 3 | 3.0 | 0 | 1 |", - "| 4 | | 1 | 1 |", - "| 4 | | 1 | 2 |", - "| 4 | 3.0 | 0 | 1 |", - "| 4 | 4.0 | 0 | 2 |", - "+---+-----+---------------+-----------------+", - ] + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&result), + @r" ++---+-----+---------------+-----------------+ +| a | b | __grouping_id | COUNT(1)[count] | ++---+-----+---------------+-----------------+ +| | 1.0 | 2 | 1 | +| | 1.0 | 2 | 1 | +| | 2.0 | 2 | 1 | +| | 2.0 | 2 | 1 | +| | 3.0 | 2 | 1 | +| | 3.0 | 2 | 1 | +| | 4.0 | 2 | 1 | +| | 4.0 | 2 | 1 | +| 2 | | 1 | 1 | +| 2 | | 1 | 1 | +| 2 | 1.0 | 0 | 1 | +| 2 | 1.0 | 0 | 1 | +| 3 | | 1 | 1 | +| 3 | | 1 | 2 | +| 3 | 2.0 | 0 | 2 | +| 3 | 3.0 | 0 | 1 | +| 4 | | 1 | 1 | +| 4 | | 1 | 2 | +| 4 | 3.0 | 0 | 1 | +| 4 | 4.0 | 0 | 2 | ++---+-----+---------------+-----------------+ + " + ); + } } else { - vec![ - "+---+-----+---------------+-----------------+", - "| a | b | __grouping_id | COUNT(1)[count] |", - "+---+-----+---------------+-----------------+", - "| | 1.0 | 2 | 2 |", - "| | 2.0 | 2 | 2 |", - "| | 3.0 | 2 | 2 |", - "| | 4.0 | 2 | 2 |", - "| 2 | | 1 | 2 |", - "| 2 | 1.0 | 0 | 2 |", - "| 3 | | 1 | 3 |", - "| 3 | 2.0 | 0 | 2 |", - "| 3 | 3.0 | 0 | 1 |", - "| 4 | | 1 | 3 |", - "| 4 | 3.0 | 0 | 1 |", - "| 4 | 4.0 | 0 | 2 |", - "+---+-----+---------------+-----------------+", - ] + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&result), + @r" ++---+-----+---------------+-----------------+ +| a | b | __grouping_id | COUNT(1)[count] | ++---+-----+---------------+-----------------+ +| | 1.0 | 2 | 2 | +| | 2.0 | 2 | 2 | +| | 3.0 | 2 | 2 | +| | 4.0 | 2 | 2 | +| 2 | | 1 | 2 | +| 2 | 1.0 | 0 | 2 | +| 3 | | 1 | 3 | +| 3 | 2.0 | 0 | 2 | +| 3 | 3.0 | 0 | 1 | +| 4 | | 1 | 3 | +| 4 | 3.0 | 0 | 1 | +| 4 | 4.0 | 0 | 2 | ++---+-----+---------------+-----------------+ + " + ); + } }; - assert_batches_sorted_eq!(expected, &result); let merge = Arc::new(CoalescePartitionsExec::new(partial_aggregate)); @@ -1676,26 +1682,29 @@ mod tests { assert_eq!(batch.num_columns(), 4); assert_eq!(batch.num_rows(), 12); - let expected = vec![ - "+---+-----+---------------+----------+", - "| a | b | __grouping_id | COUNT(1) |", - "+---+-----+---------------+----------+", - "| | 1.0 | 2 | 2 |", - "| | 2.0 | 2 | 2 |", - "| | 3.0 | 2 | 2 |", - "| | 4.0 | 2 | 2 |", - "| 2 | | 1 | 2 |", - "| 2 | 1.0 | 0 | 2 |", - "| 3 | | 1 | 3 |", - "| 3 | 2.0 | 0 | 2 |", - "| 3 | 3.0 | 0 | 1 |", - "| 4 | | 1 | 3 |", - "| 4 | 3.0 | 0 | 1 |", - "| 4 | 4.0 | 0 | 2 |", - "+---+-----+---------------+----------+", - ]; - - assert_batches_sorted_eq!(&expected, &result); + allow_duplicates! { + assert_snapshot!( + batches_to_sort_string(&result), + @r" + +---+-----+---------------+----------+ + | a | b | __grouping_id | COUNT(1) | + +---+-----+---------------+----------+ + | | 1.0 | 2 | 2 | + | | 2.0 | 2 | 2 | + | | 3.0 | 2 | 2 | + | | 4.0 | 2 | 2 | + | 2 | | 1 | 2 | + | 2 | 1.0 | 0 | 2 | + | 3 | | 1 | 3 | + | 3 | 2.0 | 0 | 2 | + | 3 | 3.0 | 0 | 1 | + | 4 | | 1 | 3 | + | 4 | 3.0 | 0 | 1 | + | 4 | 4.0 | 0 | 2 | + +---+-----+---------------+----------+ + " + ); + } let metrics = merged_aggregate.metrics().unwrap(); let output_rows = metrics.output_rows().unwrap(); @@ -1740,30 +1749,33 @@ mod tests { let result = collect(partial_aggregate.execute(0, Arc::clone(&task_ctx))?).await?; - let expected = if spill { - vec![ - "+---+---------------+-------------+", - "| a | AVG(b)[count] | AVG(b)[sum] |", - "+---+---------------+-------------+", - "| 2 | 1 | 1.0 |", - "| 2 | 1 | 1.0 |", - "| 3 | 1 | 2.0 |", - "| 3 | 2 | 5.0 |", - "| 4 | 3 | 11.0 |", - "+---+---------------+-------------+", - ] + if spill { + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&result), @r" + +---+---------------+-------------+ + | a | AVG(b)[count] | AVG(b)[sum] | + +---+---------------+-------------+ + | 2 | 1 | 1.0 | + | 2 | 1 | 1.0 | + | 3 | 1 | 2.0 | + | 3 | 2 | 5.0 | + | 4 | 3 | 11.0 | + +---+---------------+-------------+ + "); + } } else { - vec![ - "+---+---------------+-------------+", - "| a | AVG(b)[count] | AVG(b)[sum] |", - "+---+---------------+-------------+", - "| 2 | 2 | 2.0 |", - "| 3 | 3 | 7.0 |", - "| 4 | 3 | 11.0 |", - "+---+---------------+-------------+", - ] + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&result), @r" + +---+---------------+-------------+ + | a | AVG(b)[count] | AVG(b)[sum] | + +---+---------------+-------------+ + | 2 | 2 | 2.0 | + | 3 | 3 | 7.0 | + | 4 | 3 | 11.0 | + +---+---------------+-------------+ + "); + } }; - assert_batches_sorted_eq!(expected, &result); let merge = Arc::new(CoalescePartitionsExec::new(partial_aggregate)); @@ -1789,17 +1801,19 @@ mod tests { assert_eq!(batch.num_columns(), 2); assert_eq!(batch.num_rows(), 3); - let expected = vec![ - "+---+--------------------+", - "| a | AVG(b) |", - "+---+--------------------+", - "| 2 | 1.0 |", - "| 3 | 2.3333333333333335 |", // 3, (2 + 3 + 2) / 3 - "| 4 | 3.6666666666666665 |", // 4, (3 + 4 + 4) / 3 - "+---+--------------------+", - ]; - - assert_batches_sorted_eq!(&expected, &result); + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&result), @r" + +---+--------------------+ + | a | AVG(b) | + +---+--------------------+ + | 2 | 1.0 | + | 3 | 2.3333333333333335 | + | 4 | 3.6666666666666665 | + +---+--------------------+ + "); + // For row 2: 3, (2 + 3 + 2) / 3 + // For row 3: 4, (3 + 4 + 4) / 3 + } let metrics = merged_aggregate.metrics().unwrap(); let output_rows = metrics.output_rows().unwrap(); @@ -2313,27 +2327,29 @@ mod tests { let result = crate::collect(aggregate_final, task_ctx).await?; if is_first_acc { - let expected = [ - "+---+--------------------------------------------+", - "| a | first_value(b) ORDER BY [b ASC NULLS LAST] |", - "+---+--------------------------------------------+", - "| 2 | 0.0 |", - "| 3 | 1.0 |", - "| 4 | 3.0 |", - "+---+--------------------------------------------+", - ]; - assert_batches_eq!(expected, &result); + allow_duplicates! { + assert_snapshot!(batches_to_string(&result), @r" + +---+--------------------------------------------+ + | a | first_value(b) ORDER BY [b ASC NULLS LAST] | + +---+--------------------------------------------+ + | 2 | 0.0 | + | 3 | 1.0 | + | 4 | 3.0 | + +---+--------------------------------------------+ + "); + } } else { - let expected = [ - "+---+-------------------------------------------+", - "| a | last_value(b) ORDER BY [b ASC NULLS LAST] |", - "+---+-------------------------------------------+", - "| 2 | 3.0 |", - "| 3 | 5.0 |", - "| 4 | 6.0 |", - "+---+-------------------------------------------+", - ]; - assert_batches_eq!(expected, &result); + allow_duplicates! { + assert_snapshot!(batches_to_string(&result), @r" + +---+-------------------------------------------+ + | a | last_value(b) ORDER BY [b ASC NULLS LAST] | + +---+-------------------------------------------+ + | 2 | 3.0 | + | 3 | 5.0 | + | 4 | 6.0 | + +---+-------------------------------------------+ + "); + } }; Ok(()) } @@ -2527,16 +2543,17 @@ mod tests { let output = collect(aggregate_exec.execute(0, Arc::new(TaskContext::default()))?).await?; - let expected = [ - "+-----+-----+-------+---------------+-------+", - "| a | b | const | __grouping_id | 1 |", - "+-----+-----+-------+---------------+-------+", - "| | | 1 | 6 | 32768 |", - "| | 0.0 | | 5 | 32768 |", - "| 0.0 | | | 3 | 32768 |", - "+-----+-----+-------+---------------+-------+", - ]; - assert_batches_sorted_eq!(expected, &output); + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&output), @r" + +-----+-----+-------+---------------+-------+ + | a | b | const | __grouping_id | 1 | + +-----+-----+-------+---------------+-------+ + | | | 1 | 6 | 32768 | + | | 0.0 | | 5 | 32768 | + | 0.0 | | | 3 | 32768 | + +-----+-----+-------+---------------+-------+ + "); + } Ok(()) } @@ -2642,15 +2659,16 @@ mod tests { let ctx = TaskContext::default().with_session_config(session_config); let output = collect(aggregate_exec.execute(0, Arc::new(ctx))?).await?; - let expected = [ - "+--------------+------------+", - "| labels | SUM(value) |", - "+--------------+------------+", - "| {a: a, b: b} | 2 |", - "| {a: , b: c} | 1 |", - "+--------------+------------+", - ]; - assert_batches_eq!(expected, &output); + allow_duplicates! { + assert_snapshot!(batches_to_string(&output), @r" + +--------------+------------+ + | labels | SUM(value) | + +--------------+------------+ + | {a: a, b: b} | 2 | + | {a: , b: c} | 1 | + +--------------+------------+ + "); + } Ok(()) } @@ -2717,19 +2735,20 @@ mod tests { let ctx = TaskContext::default().with_session_config(session_config); let output = collect(aggregate_exec.execute(0, Arc::new(ctx))?).await?; - let expected = [ - "+-----+-------------------+", - "| key | COUNT(val)[count] |", - "+-----+-------------------+", - "| 1 | 1 |", - "| 2 | 1 |", - "| 3 | 1 |", - "| 2 | 1 |", - "| 3 | 1 |", - "| 4 | 1 |", - "+-----+-------------------+", - ]; - assert_batches_eq!(expected, &output); + allow_duplicates! { + assert_snapshot!(batches_to_string(&output), @r" + +-----+-------------------+ + | key | COUNT(val)[count] | + +-----+-------------------+ + | 1 | 1 | + | 2 | 1 | + | 3 | 1 | + | 2 | 1 | + | 3 | 1 | + | 4 | 1 | + +-----+-------------------+ + "); + } Ok(()) } @@ -2804,20 +2823,21 @@ mod tests { let ctx = TaskContext::default().with_session_config(session_config); let output = collect(aggregate_exec.execute(0, Arc::new(ctx))?).await?; - let expected = [ - "+-----+-------------------+", - "| key | COUNT(val)[count] |", - "+-----+-------------------+", - "| 1 | 1 |", - "| 2 | 2 |", - "| 3 | 2 |", - "| 4 | 1 |", - "| 2 | 1 |", - "| 3 | 1 |", - "| 4 | 1 |", - "+-----+-------------------+", - ]; - assert_batches_eq!(expected, &output); + allow_duplicates! { + assert_snapshot!(batches_to_string(&output), @r" + +-----+-------------------+ + | key | COUNT(val)[count] | + +-----+-------------------+ + | 1 | 1 | + | 2 | 2 | + | 3 | 2 | + | 4 | 1 | + | 2 | 1 | + | 3 | 1 | + | 4 | 1 | + +-----+-------------------+ + "); + } Ok(()) } @@ -2948,19 +2968,17 @@ mod tests { assert_spill_count_metric(expect_spill, single_aggregate); - #[rustfmt::skip] - assert_batches_sorted_eq!( - [ - "+---+--------+--------+", - "| a | MIN(b) | AVG(b) |", - "+---+--------+--------+", - "| 2 | 1.0 | 1.0 |", - "| 3 | 2.0 | 2.0 |", - "| 4 | 3.0 | 3.5 |", - "+---+--------+--------+", - ], - &result - ); + allow_duplicates! { + assert_snapshot!(batches_to_string(&result), @r" + +---+--------+--------+ + | a | MIN(b) | AVG(b) | + +---+--------+--------+ + | 2 | 1.0 | 1.0 | + | 3 | 2.0 | 2.0 | + | 4 | 3.0 | 3.5 | + +---+--------+--------+ + "); + } Ok(()) } diff --git a/datafusion/physical-plan/src/aggregates/topk/priority_map.rs b/datafusion/physical-plan/src/aggregates/topk/priority_map.rs index 25cf4251888df..a09d70f7471f3 100644 --- a/datafusion/physical-plan/src/aggregates/topk/priority_map.rs +++ b/datafusion/physical-plan/src/aggregates/topk/priority_map.rs @@ -113,6 +113,7 @@ mod tests { }; use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::util::pretty::pretty_format_batches; + use insta::assert_snapshot; use std::sync::Arc; #[test] @@ -180,15 +181,15 @@ mod tests { let cols = agg.emit()?; let batch = RecordBatch::try_new(test_schema(), cols)?; let actual = format!("{}", pretty_format_batches(&[batch])?); - let expected = r#" + + assert_snapshot!(actual, @r#" +----------+--------------+ | trace_id | timestamp_ms | +----------+--------------+ | 1 | 1 | +----------+--------------+ "# - .trim(); - assert_eq!(actual, expected); + ); Ok(()) } @@ -205,15 +206,15 @@ mod tests { let cols = agg.emit()?; let batch = RecordBatch::try_new(test_schema(), cols)?; let actual = format!("{}", pretty_format_batches(&[batch])?); - let expected = r#" + + assert_snapshot!(actual, @r#" +----------+--------------+ | trace_id | timestamp_ms | +----------+--------------+ | 1 | 1 | +----------+--------------+ "# - .trim(); - assert_eq!(actual, expected); + ); Ok(()) } @@ -230,15 +231,14 @@ mod tests { let cols = agg.emit()?; let batch = RecordBatch::try_new(test_schema(), cols)?; let actual = format!("{}", pretty_format_batches(&[batch])?); - let expected = r#" + assert_snapshot!(actual, @r#" +----------+--------------+ | trace_id | timestamp_ms | +----------+--------------+ | 2 | 2 | +----------+--------------+ "# - .trim(); - assert_eq!(actual, expected); + ); Ok(()) } @@ -255,15 +255,14 @@ mod tests { let cols = agg.emit()?; let batch = RecordBatch::try_new(test_schema(), cols)?; let actual = format!("{}", pretty_format_batches(&[batch])?); - let expected = r#" + assert_snapshot!(actual, @r#" +----------+--------------+ | trace_id | timestamp_ms | +----------+--------------+ | 1 | 1 | +----------+--------------+ "# - .trim(); - assert_eq!(actual, expected); + ); Ok(()) } @@ -280,15 +279,14 @@ mod tests { let cols = agg.emit()?; let batch = RecordBatch::try_new(test_schema(), cols)?; let actual = format!("{}", pretty_format_batches(&[batch])?); - let expected = r#" + assert_snapshot!(actual, @r#" +----------+--------------+ | trace_id | timestamp_ms | +----------+--------------+ | 1 | 2 | +----------+--------------+ "# - .trim(); - assert_eq!(actual, expected); + ); Ok(()) } @@ -305,15 +303,14 @@ mod tests { let cols = agg.emit()?; let batch = RecordBatch::try_new(test_schema(), cols)?; let actual = format!("{}", pretty_format_batches(&[batch])?); - let expected = r#" + assert_snapshot!(actual, @r#" +----------+--------------+ | trace_id | timestamp_ms | +----------+--------------+ | 1 | 1 | +----------+--------------+ "# - .trim(); - assert_eq!(actual, expected); + ); Ok(()) } @@ -330,15 +327,14 @@ mod tests { let cols = agg.emit()?; let batch = RecordBatch::try_new(test_schema(), cols)?; let actual = format!("{}", pretty_format_batches(&[batch])?); - let expected = r#" + assert_snapshot!(actual, @r#" +----------+--------------+ | trace_id | timestamp_ms | +----------+--------------+ | 2 | 2 | +----------+--------------+ "# - .trim(); - assert_eq!(actual, expected); + ); Ok(()) } @@ -355,15 +351,14 @@ mod tests { let cols = agg.emit()?; let batch = RecordBatch::try_new(test_schema(), cols)?; let actual = format!("{}", pretty_format_batches(&[batch])?); - let expected = r#" + assert_snapshot!(actual, @r#" +----------+--------------+ | trace_id | timestamp_ms | +----------+--------------+ | 1 | 1 | +----------+--------------+ "# - .trim(); - assert_eq!(actual, expected); + ); Ok(()) } @@ -380,15 +375,14 @@ mod tests { let cols = agg.emit()?; let batch = RecordBatch::try_new(test_schema(), cols)?; let actual = format!("{}", pretty_format_batches(&[batch])?); - let expected = r#" + assert_snapshot!(actual, @r#" +----------+--------------+ | trace_id | timestamp_ms | +----------+--------------+ | 1 | 2 | +----------+--------------+ "# - .trim(); - assert_eq!(actual, expected); + ); Ok(()) } @@ -406,7 +400,7 @@ mod tests { let cols = agg.emit()?; let batch = RecordBatch::try_new(test_schema(), cols)?; let actual = format!("{}", pretty_format_batches(&[batch])?); - let expected = r#" + assert_snapshot!(actual, @r#" +----------+--------------+ | trace_id | timestamp_ms | +----------+--------------+ @@ -414,8 +408,7 @@ mod tests { | 1 | 1 | +----------+--------------+ "# - .trim(); - assert_eq!(actual, expected); + ); Ok(()) } diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 35c8961065a5b..639fae7615af0 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -645,8 +645,9 @@ mod tests { use crate::common; use crate::test::build_table_scan_i32; - use datafusion_common::{assert_batches_sorted_eq, assert_contains}; + use datafusion_common::{assert_contains, test_util::batches_to_sort_string}; use datafusion_execution::runtime_env::RuntimeEnvBuilder; + use insta::assert_snapshot; async fn join_collect( left: Arc, @@ -829,20 +830,19 @@ mod tests { let (columns, batches) = join_collect(left, right, task_ctx).await?; assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]); - let expected = [ - "+----+----+----+----+----+----+", - "| a1 | b1 | c1 | a2 | b2 | c2 |", - "+----+----+----+----+----+----+", - "| 1 | 4 | 7 | 10 | 12 | 14 |", - "| 1 | 4 | 7 | 11 | 13 | 15 |", - "| 2 | 5 | 8 | 10 | 12 | 14 |", - "| 2 | 5 | 8 | 11 | 13 | 15 |", - "| 3 | 6 | 9 | 10 | 12 | 14 |", - "| 3 | 6 | 9 | 11 | 13 | 15 |", - "+----+----+----+----+----+----+", - ]; - - assert_batches_sorted_eq!(expected, &batches); + + assert_snapshot!(batches_to_sort_string(&batches), @r#" + +----+----+----+----+----+----+ + | a1 | b1 | c1 | a2 | b2 | c2 | + +----+----+----+----+----+----+ + | 1 | 4 | 7 | 10 | 12 | 14 | + | 1 | 4 | 7 | 11 | 13 | 15 | + | 2 | 5 | 8 | 10 | 12 | 14 | + | 2 | 5 | 8 | 11 | 13 | 15 | + | 3 | 6 | 9 | 10 | 12 | 14 | + | 3 | 6 | 9 | 11 | 13 | 15 | + +----+----+----+----+----+----+ + "#); Ok(()) } diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index e9d6354e21d71..376c3590b88f0 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -1663,6 +1663,7 @@ mod tests { use arrow::array::{Date32Array, Int32Array, StructArray}; use arrow::buffer::NullBuffer; use arrow::datatypes::{DataType, Field}; + use datafusion_common::test_util::{batches_to_sort_string, batches_to_string}; use datafusion_common::{ assert_batches_eq, assert_batches_sorted_eq, assert_contains, exec_err, ScalarValue, @@ -1673,6 +1674,7 @@ mod tests { use datafusion_physical_expr::expressions::{BinaryExpr, Literal}; use datafusion_physical_expr::PhysicalExpr; use hashbrown::HashTable; + use insta::{allow_duplicates, assert_snapshot}; use rstest::*; use rstest_reuse::*; @@ -1883,18 +1885,18 @@ mod tests { assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]); - let expected = [ - "+----+----+----+----+----+----+", - "| a1 | b1 | c1 | a2 | b1 | c2 |", - "+----+----+----+----+----+----+", - "| 1 | 4 | 7 | 10 | 4 | 70 |", - "| 2 | 5 | 8 | 20 | 5 | 80 |", - "| 3 | 5 | 9 | 20 | 5 | 80 |", - "+----+----+----+----+----+----+", - ]; - - // Inner join output is expected to preserve both inputs order - assert_batches_eq!(expected, &batches); + allow_duplicates! { + // Inner join output is expected to preserve both inputs order + assert_snapshot!(batches_to_string(&batches), @r#" + +----+----+----+----+----+----+ + | a1 | b1 | c1 | a2 | b1 | c2 | + +----+----+----+----+----+----+ + | 1 | 4 | 7 | 10 | 4 | 70 | + | 2 | 5 | 8 | 20 | 5 | 80 | + | 3 | 5 | 9 | 20 | 5 | 80 | + +----+----+----+----+----+----+ + "#); + } Ok(()) } @@ -1930,16 +1932,17 @@ mod tests { assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]); - let expected = [ - "+----+----+----+----+----+----+", - "| a1 | b1 | c1 | a2 | b1 | c2 |", - "+----+----+----+----+----+----+", - "| 1 | 4 | 7 | 10 | 4 | 70 |", - "| 2 | 5 | 8 | 20 | 5 | 80 |", - "| 3 | 5 | 9 | 20 | 5 | 80 |", - "+----+----+----+----+----+----+", - ]; - assert_batches_sorted_eq!(expected, &batches); + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&batches), @r#" + +----+----+----+----+----+----+ + | a1 | b1 | c1 | a2 | b1 | c2 | + +----+----+----+----+----+----+ + | 1 | 4 | 7 | 10 | 4 | 70 | + | 2 | 5 | 8 | 20 | 5 | 80 | + | 3 | 5 | 9 | 20 | 5 | 80 | + +----+----+----+----+----+----+ + "#); + } Ok(()) } @@ -1967,18 +1970,18 @@ mod tests { assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]); - let expected = [ - "+----+----+----+----+----+----+", - "| a1 | b1 | c1 | a2 | b2 | c2 |", - "+----+----+----+----+----+----+", - "| 1 | 4 | 7 | 10 | 4 | 70 |", - "| 2 | 5 | 8 | 20 | 5 | 80 |", - "| 3 | 5 | 9 | 20 | 5 | 80 |", - "+----+----+----+----+----+----+", - ]; - // Inner join output is expected to preserve both inputs order - assert_batches_eq!(expected, &batches); + allow_duplicates! { + assert_snapshot!(batches_to_string(&batches), @r#" + +----+----+----+----+----+----+ + | a1 | b1 | c1 | a2 | b2 | c2 | + +----+----+----+----+----+----+ + | 1 | 4 | 7 | 10 | 4 | 70 | + | 2 | 5 | 8 | 20 | 5 | 80 | + | 3 | 5 | 9 | 20 | 5 | 80 | + +----+----+----+----+----+----+ + "#); + } Ok(()) } @@ -2006,19 +2009,19 @@ mod tests { assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]); - let expected = [ - "+----+----+----+----+----+----+", - "| a1 | b1 | c1 | a2 | b2 | c2 |", - "+----+----+----+----+----+----+", - "| 3 | 5 | 9 | 20 | 5 | 80 |", - "| 2 | 5 | 8 | 20 | 5 | 80 |", - "| 0 | 4 | 6 | 10 | 4 | 70 |", - "| 1 | 4 | 7 | 10 | 4 | 70 |", - "+----+----+----+----+----+----+", - ]; - // Inner join output is expected to preserve both inputs order - assert_batches_eq!(expected, &batches); + allow_duplicates! { + assert_snapshot!(batches_to_string(&batches), @r#" + +----+----+----+----+----+----+ + | a1 | b1 | c1 | a2 | b2 | c2 | + +----+----+----+----+----+----+ + | 3 | 5 | 9 | 20 | 5 | 80 | + | 2 | 5 | 8 | 20 | 5 | 80 | + | 0 | 4 | 6 | 10 | 4 | 70 | + | 1 | 4 | 7 | 10 | 4 | 70 | + +----+----+----+----+----+----+ + "#); + } Ok(()) } @@ -2069,18 +2072,18 @@ mod tests { assert_eq!(batches.len(), expected_batch_count); - let expected = [ - "+----+----+----+----+----+----+", - "| a1 | b2 | c1 | a1 | b2 | c2 |", - "+----+----+----+----+----+----+", - "| 1 | 1 | 7 | 1 | 1 | 70 |", - "| 2 | 2 | 8 | 2 | 2 | 80 |", - "| 2 | 2 | 9 | 2 | 2 | 80 |", - "+----+----+----+----+----+----+", - ]; - // Inner join output is expected to preserve both inputs order - assert_batches_eq!(expected, &batches); + allow_duplicates! { + assert_snapshot!(batches_to_string(&batches), @r#" + +----+----+----+----+----+----+ + | a1 | b2 | c1 | a1 | b2 | c2 | + +----+----+----+----+----+----+ + | 1 | 1 | 7 | 1 | 1 | 70 | + | 2 | 2 | 8 | 2 | 2 | 80 | + | 2 | 2 | 9 | 2 | 2 | 80 | + +----+----+----+----+----+----+ + "#); + } Ok(()) } @@ -2139,18 +2142,18 @@ mod tests { assert_eq!(batches.len(), expected_batch_count); - let expected = [ - "+----+----+----+----+----+----+", - "| a1 | b2 | c1 | a1 | b2 | c2 |", - "+----+----+----+----+----+----+", - "| 1 | 1 | 7 | 1 | 1 | 70 |", - "| 2 | 2 | 8 | 2 | 2 | 80 |", - "| 2 | 2 | 9 | 2 | 2 | 80 |", - "+----+----+----+----+----+----+", - ]; - // Inner join output is expected to preserve both inputs order - assert_batches_eq!(expected, &batches); + allow_duplicates! { + assert_snapshot!(batches_to_string(&batches), @r#" + +----+----+----+----+----+----+ + | a1 | b2 | c1 | a1 | b2 | c2 | + +----+----+----+----+----+----+ + | 1 | 1 | 7 | 1 | 1 | 70 | + | 2 | 2 | 8 | 2 | 2 | 80 | + | 2 | 2 | 9 | 2 | 2 | 80 | + +----+----+----+----+----+----+ + "#); + } Ok(()) } @@ -2188,19 +2191,19 @@ mod tests { assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]); - let expected = [ - "+----+----+----+----+----+----+", - "| a1 | b1 | c1 | a2 | b2 | c2 |", - "+----+----+----+----+----+----+", - "| 3 | 5 | 9 | 20 | 5 | 80 |", - "| 2 | 5 | 8 | 20 | 5 | 80 |", - "| 0 | 4 | 6 | 10 | 4 | 70 |", - "| 1 | 4 | 7 | 10 | 4 | 70 |", - "+----+----+----+----+----+----+", - ]; - // Inner join output is expected to preserve both inputs order - assert_batches_eq!(expected, &batches); + allow_duplicates! { + assert_snapshot!(batches_to_string(&batches), @r#" + +----+----+----+----+----+----+ + | a1 | b1 | c1 | a2 | b2 | c2 | + +----+----+----+----+----+----+ + | 3 | 5 | 9 | 20 | 5 | 80 | + | 2 | 5 | 8 | 20 | 5 | 80 | + | 0 | 4 | 6 | 10 | 4 | 70 | + | 1 | 4 | 7 | 10 | 4 | 70 | + +----+----+----+----+----+----+ + "#); + } Ok(()) } @@ -2257,16 +2260,16 @@ mod tests { }; assert_eq!(batches.len(), expected_batch_count); - let expected = [ - "+----+----+----+----+----+----+", - "| a1 | b1 | c1 | a2 | b1 | c2 |", - "+----+----+----+----+----+----+", - "| 1 | 4 | 7 | 10 | 4 | 70 |", - "+----+----+----+----+----+----+", - ]; - // Inner join output is expected to preserve both inputs order - assert_batches_eq!(expected, &batches); + allow_duplicates! { + assert_snapshot!(batches_to_string(&batches), @r#" + +----+----+----+----+----+----+ + | a1 | b1 | c1 | a2 | b1 | c2 | + +----+----+----+----+----+----+ + | 1 | 4 | 7 | 10 | 4 | 70 | + +----+----+----+----+----+----+ + "#); + } // second part let stream = join.execute(1, Arc::clone(&task_ctx))?; @@ -2282,17 +2285,17 @@ mod tests { }; assert_eq!(batches.len(), expected_batch_count); - let expected = [ - "+----+----+----+----+----+----+", - "| a1 | b1 | c1 | a2 | b1 | c2 |", - "+----+----+----+----+----+----+", - "| 2 | 5 | 8 | 30 | 5 | 90 |", - "| 3 | 5 | 9 | 30 | 5 | 90 |", - "+----+----+----+----+----+----+", - ]; - // Inner join output is expected to preserve both inputs order - assert_batches_eq!(expected, &batches); + allow_duplicates! { + assert_snapshot!(batches_to_string(&batches), @r#" + +----+----+----+----+----+----+ + | a1 | b1 | c1 | a2 | b1 | c2 | + +----+----+----+----+----+----+ + | 2 | 5 | 8 | 30 | 5 | 90 | + | 3 | 5 | 9 | 30 | 5 | 90 | + +----+----+----+----+----+----+ + "#); + } Ok(()) } @@ -2334,19 +2337,19 @@ mod tests { let stream = join.execute(0, task_ctx).unwrap(); let batches = common::collect(stream).await.unwrap(); - let expected = [ - "+----+----+----+----+----+----+", - "| a1 | b1 | c1 | a2 | b1 | c2 |", - "+----+----+----+----+----+----+", - "| 1 | 4 | 7 | 10 | 4 | 70 |", - "| 1 | 4 | 7 | 10 | 4 | 70 |", - "| 2 | 5 | 8 | 20 | 5 | 80 |", - "| 2 | 5 | 8 | 20 | 5 | 80 |", - "| 3 | 7 | 9 | | | |", - "+----+----+----+----+----+----+", - ]; - - assert_batches_sorted_eq!(expected, &batches); + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&batches), @r#" + +----+----+----+----+----+----+ + | a1 | b1 | c1 | a2 | b1 | c2 | + +----+----+----+----+----+----+ + | 1 | 4 | 7 | 10 | 4 | 70 | + | 1 | 4 | 7 | 10 | 4 | 70 | + | 2 | 5 | 8 | 20 | 5 | 80 | + | 2 | 5 | 8 | 20 | 5 | 80 | + | 3 | 7 | 9 | | | | + +----+----+----+----+----+----+ + "#); + } } #[apply(batch_sizes)] @@ -2377,21 +2380,21 @@ mod tests { let stream = join.execute(0, task_ctx).unwrap(); let batches = common::collect(stream).await.unwrap(); - let expected = [ - "+----+----+----+----+----+----+", - "| a1 | b1 | c1 | a2 | b2 | c2 |", - "+----+----+----+----+----+----+", - "| | | | 30 | 6 | 90 |", - "| | | | 30 | 6 | 90 |", - "| 1 | 4 | 7 | 10 | 4 | 70 |", - "| 1 | 4 | 7 | 10 | 4 | 70 |", - "| 2 | 5 | 8 | 20 | 5 | 80 |", - "| 2 | 5 | 8 | 20 | 5 | 80 |", - "| 3 | 7 | 9 | | | |", - "+----+----+----+----+----+----+", - ]; - - assert_batches_sorted_eq!(expected, &batches); + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&batches), @r#" + +----+----+----+----+----+----+ + | a1 | b1 | c1 | a2 | b2 | c2 | + +----+----+----+----+----+----+ + | | | | 30 | 6 | 90 | + | | | | 30 | 6 | 90 | + | 1 | 4 | 7 | 10 | 4 | 70 | + | 1 | 4 | 7 | 10 | 4 | 70 | + | 2 | 5 | 8 | 20 | 5 | 80 | + | 2 | 5 | 8 | 20 | 5 | 80 | + | 3 | 7 | 9 | | | | + +----+----+----+----+----+----+ + "#); + } } #[apply(batch_sizes)] @@ -2418,17 +2421,17 @@ mod tests { let stream = join.execute(0, task_ctx).unwrap(); let batches = common::collect(stream).await.unwrap(); - let expected = [ - "+----+----+----+----+----+----+", - "| a1 | b1 | c1 | a2 | b1 | c2 |", - "+----+----+----+----+----+----+", - "| 1 | 4 | 7 | | | |", - "| 2 | 5 | 8 | | | |", - "| 3 | 7 | 9 | | | |", - "+----+----+----+----+----+----+", - ]; - - assert_batches_sorted_eq!(expected, &batches); + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&batches), @r#" + +----+----+----+----+----+----+ + | a1 | b1 | c1 | a2 | b1 | c2 | + +----+----+----+----+----+----+ + | 1 | 4 | 7 | | | | + | 2 | 5 | 8 | | | | + | 3 | 7 | 9 | | | | + +----+----+----+----+----+----+ + "#); + } } #[apply(batch_sizes)] @@ -2455,17 +2458,17 @@ mod tests { let stream = join.execute(0, task_ctx).unwrap(); let batches = common::collect(stream).await.unwrap(); - let expected = [ - "+----+----+----+----+----+----+", - "| a1 | b1 | c1 | a2 | b2 | c2 |", - "+----+----+----+----+----+----+", - "| 1 | 4 | 7 | | | |", - "| 2 | 5 | 8 | | | |", - "| 3 | 7 | 9 | | | |", - "+----+----+----+----+----+----+", - ]; - - assert_batches_sorted_eq!(expected, &batches); + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&batches), @r#" + +----+----+----+----+----+----+ + | a1 | b1 | c1 | a2 | b2 | c2 | + +----+----+----+----+----+----+ + | 1 | 4 | 7 | | | | + | 2 | 5 | 8 | | | | + | 3 | 7 | 9 | | | | + +----+----+----+----+----+----+ + "#); + } } #[apply(batch_sizes)] @@ -2498,16 +2501,17 @@ mod tests { .await?; assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]); - let expected = [ - "+----+----+----+----+----+----+", - "| a1 | b1 | c1 | a2 | b1 | c2 |", - "+----+----+----+----+----+----+", - "| 1 | 4 | 7 | 10 | 4 | 70 |", - "| 2 | 5 | 8 | 20 | 5 | 80 |", - "| 3 | 7 | 9 | | | |", - "+----+----+----+----+----+----+", - ]; - assert_batches_sorted_eq!(expected, &batches); + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&batches), @r#" + +----+----+----+----+----+----+ + | a1 | b1 | c1 | a2 | b1 | c2 | + +----+----+----+----+----+----+ + | 1 | 4 | 7 | 10 | 4 | 70 | + | 2 | 5 | 8 | 20 | 5 | 80 | + | 3 | 7 | 9 | | | | + +----+----+----+----+----+----+ + "#); + } Ok(()) } @@ -2542,16 +2546,17 @@ mod tests { .await?; assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]); - let expected = [ - "+----+----+----+----+----+----+", - "| a1 | b1 | c1 | a2 | b1 | c2 |", - "+----+----+----+----+----+----+", - "| 1 | 4 | 7 | 10 | 4 | 70 |", - "| 2 | 5 | 8 | 20 | 5 | 80 |", - "| 3 | 7 | 9 | | | |", - "+----+----+----+----+----+----+", - ]; - assert_batches_sorted_eq!(expected, &batches); + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&batches), @r#" + +----+----+----+----+----+----+ + | a1 | b1 | c1 | a2 | b1 | c2 | + +----+----+----+----+----+----+ + | 1 | 4 | 7 | 10 | 4 | 70 | + | 2 | 5 | 8 | 20 | 5 | 80 | + | 3 | 7 | 9 | | | | + +----+----+----+----+----+----+ + "#); + } Ok(()) } @@ -2597,16 +2602,17 @@ mod tests { let batches = common::collect(stream).await?; // ignore the order - let expected = [ - "+----+----+-----+", - "| a1 | b1 | c1 |", - "+----+----+-----+", - "| 11 | 8 | 110 |", - "| 13 | 10 | 130 |", - "| 9 | 8 | 90 |", - "+----+----+-----+", - ]; - assert_batches_sorted_eq!(expected, &batches); + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&batches), @r#" + +----+----+-----+ + | a1 | b1 | c1 | + +----+----+-----+ + | 11 | 8 | 110 | + | 13 | 10 | 130 | + | 9 | 8 | 90 | + +----+----+-----+ + "#); + } Ok(()) } @@ -2658,16 +2664,17 @@ mod tests { let stream = join.execute(0, Arc::clone(&task_ctx))?; let batches = common::collect(stream).await?; - let expected = [ - "+----+----+-----+", - "| a1 | b1 | c1 |", - "+----+----+-----+", - "| 11 | 8 | 110 |", - "| 13 | 10 | 130 |", - "| 9 | 8 | 90 |", - "+----+----+-----+", - ]; - assert_batches_sorted_eq!(expected, &batches); + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&batches), @r" + +----+----+-----+ + | a1 | b1 | c1 | + +----+----+-----+ + | 11 | 8 | 110 | + | 13 | 10 | 130 | + | 9 | 8 | 90 | + +----+----+-----+ + "); + } // left_table left semi join right_table on left_table.b1 = right_table.b2 and right_table.a2 > 10 let filter_expression = Arc::new(BinaryExpr::new( @@ -2689,14 +2696,15 @@ mod tests { let stream = join.execute(0, task_ctx)?; let batches = common::collect(stream).await?; - let expected = [ - "+----+----+-----+", - "| a1 | b1 | c1 |", - "+----+----+-----+", - "| 13 | 10 | 130 |", - "+----+----+-----+", - ]; - assert_batches_sorted_eq!(expected, &batches); + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&batches), @r#" + +----+----+-----+ + | a1 | b1 | c1 | + +----+----+-----+ + | 13 | 10 | 130 | + +----+----+-----+ + "#); + } Ok(()) } @@ -2722,18 +2730,18 @@ mod tests { let stream = join.execute(0, task_ctx)?; let batches = common::collect(stream).await?; - let expected = [ - "+----+----+-----+", - "| a2 | b2 | c2 |", - "+----+----+-----+", - "| 8 | 8 | 20 |", - "| 12 | 10 | 40 |", - "| 10 | 10 | 100 |", - "+----+----+-----+", - ]; - // RightSemi join output is expected to preserve right input order - assert_batches_eq!(expected, &batches); + allow_duplicates! { + assert_snapshot!(batches_to_string(&batches), @r#" + +----+----+-----+ + | a2 | b2 | c2 | + +----+----+-----+ + | 8 | 8 | 20 | + | 12 | 10 | 40 | + | 10 | 10 | 100 | + +----+----+-----+ + "#); + } Ok(()) } @@ -2785,18 +2793,18 @@ mod tests { let stream = join.execute(0, Arc::clone(&task_ctx))?; let batches = common::collect(stream).await?; - let expected = [ - "+----+----+-----+", - "| a2 | b2 | c2 |", - "+----+----+-----+", - "| 8 | 8 | 20 |", - "| 12 | 10 | 40 |", - "| 10 | 10 | 100 |", - "+----+----+-----+", - ]; - // RightSemi join output is expected to preserve right input order - assert_batches_eq!(expected, &batches); + allow_duplicates! { + assert_snapshot!(batches_to_string(&batches), @r#" + +----+----+-----+ + | a2 | b2 | c2 | + +----+----+-----+ + | 8 | 8 | 20 | + | 12 | 10 | 40 | + | 10 | 10 | 100 | + +----+----+-----+ + "#); + } // left_table right semi join right_table on left_table.b1 = right_table.b2 on left_table.a1!=9 let filter_expression = Arc::new(BinaryExpr::new( @@ -2816,17 +2824,17 @@ mod tests { let stream = join.execute(0, task_ctx)?; let batches = common::collect(stream).await?; - let expected = [ - "+----+----+-----+", - "| a2 | b2 | c2 |", - "+----+----+-----+", - "| 12 | 10 | 40 |", - "| 10 | 10 | 100 |", - "+----+----+-----+", - ]; - // RightSemi join output is expected to preserve right input order - assert_batches_eq!(expected, &batches); + allow_duplicates! { + assert_snapshot!(batches_to_string(&batches), @r#" + +----+----+-----+ + | a2 | b2 | c2 | + +----+----+-----+ + | 12 | 10 | 40 | + | 10 | 10 | 100 | + +----+----+-----+ + "#); + } Ok(()) } @@ -2851,17 +2859,18 @@ mod tests { let stream = join.execute(0, task_ctx)?; let batches = common::collect(stream).await?; - let expected = [ - "+----+----+----+", - "| a1 | b1 | c1 |", - "+----+----+----+", - "| 1 | 1 | 10 |", - "| 3 | 3 | 30 |", - "| 5 | 5 | 50 |", - "| 7 | 7 | 70 |", - "+----+----+----+", - ]; - assert_batches_sorted_eq!(expected, &batches); + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&batches), @r#" + +----+----+----+ + | a1 | b1 | c1 | + +----+----+----+ + | 1 | 1 | 10 | + | 3 | 3 | 30 | + | 5 | 5 | 50 | + | 7 | 7 | 70 | + +----+----+----+ + "#); + } Ok(()) } @@ -2910,19 +2919,20 @@ mod tests { let stream = join.execute(0, Arc::clone(&task_ctx))?; let batches = common::collect(stream).await?; - let expected = [ - "+----+----+-----+", - "| a1 | b1 | c1 |", - "+----+----+-----+", - "| 1 | 1 | 10 |", - "| 11 | 8 | 110 |", - "| 3 | 3 | 30 |", - "| 5 | 5 | 50 |", - "| 7 | 7 | 70 |", - "| 9 | 8 | 90 |", - "+----+----+-----+", - ]; - assert_batches_sorted_eq!(expected, &batches); + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&batches), @r#" + +----+----+-----+ + | a1 | b1 | c1 | + +----+----+-----+ + | 1 | 1 | 10 | + | 11 | 8 | 110 | + | 3 | 3 | 30 | + | 5 | 5 | 50 | + | 7 | 7 | 70 | + | 9 | 8 | 90 | + +----+----+-----+ + "#); + } // left_table left anti join right_table on left_table.b1 = right_table.b2 and right_table.a2 != 13 let filter_expression = Arc::new(BinaryExpr::new( @@ -2945,19 +2955,20 @@ mod tests { let stream = join.execute(0, task_ctx)?; let batches = common::collect(stream).await?; - let expected = [ - "+----+----+-----+", - "| a1 | b1 | c1 |", - "+----+----+-----+", - "| 1 | 1 | 10 |", - "| 11 | 8 | 110 |", - "| 3 | 3 | 30 |", - "| 5 | 5 | 50 |", - "| 7 | 7 | 70 |", - "| 9 | 8 | 90 |", - "+----+----+-----+", - ]; - assert_batches_sorted_eq!(expected, &batches); + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&batches), @r#" + +----+----+-----+ + | a1 | b1 | c1 | + +----+----+-----+ + | 1 | 1 | 10 | + | 11 | 8 | 110 | + | 3 | 3 | 30 | + | 5 | 5 | 50 | + | 7 | 7 | 70 | + | 9 | 8 | 90 | + +----+----+-----+ + "#); + } Ok(()) } @@ -2981,18 +2992,18 @@ mod tests { let stream = join.execute(0, task_ctx)?; let batches = common::collect(stream).await?; - let expected = [ - "+----+----+-----+", - "| a2 | b2 | c2 |", - "+----+----+-----+", - "| 6 | 6 | 60 |", - "| 2 | 2 | 80 |", - "| 4 | 4 | 120 |", - "+----+----+-----+", - ]; - // RightAnti join output is expected to preserve right input order - assert_batches_eq!(expected, &batches); + allow_duplicates! { + assert_snapshot!(batches_to_string(&batches), @r#" + +----+----+-----+ + | a2 | b2 | c2 | + +----+----+-----+ + | 6 | 6 | 60 | + | 2 | 2 | 80 | + | 4 | 4 | 120 | + +----+----+-----+ + "#); + } Ok(()) } @@ -3042,20 +3053,20 @@ mod tests { let stream = join.execute(0, Arc::clone(&task_ctx))?; let batches = common::collect(stream).await?; - let expected = [ - "+----+----+-----+", - "| a2 | b2 | c2 |", - "+----+----+-----+", - "| 12 | 10 | 40 |", - "| 6 | 6 | 60 |", - "| 2 | 2 | 80 |", - "| 10 | 10 | 100 |", - "| 4 | 4 | 120 |", - "+----+----+-----+", - ]; - // RightAnti join output is expected to preserve right input order - assert_batches_eq!(expected, &batches); + allow_duplicates! { + assert_snapshot!(batches_to_string(&batches), @r#" + +----+----+-----+ + | a2 | b2 | c2 | + +----+----+-----+ + | 12 | 10 | 40 | + | 6 | 6 | 60 | + | 2 | 2 | 80 | + | 10 | 10 | 100 | + | 4 | 4 | 120 | + +----+----+-----+ + "#); + } // left_table right anti join right_table on left_table.b1 = right_table.b2 and right_table.b2!=8 let column_indices = vec![ColumnIndex { @@ -3083,19 +3094,19 @@ mod tests { let stream = join.execute(0, task_ctx)?; let batches = common::collect(stream).await?; - let expected = [ - "+----+----+-----+", - "| a2 | b2 | c2 |", - "+----+----+-----+", - "| 8 | 8 | 20 |", - "| 6 | 6 | 60 |", - "| 2 | 2 | 80 |", - "| 4 | 4 | 120 |", - "+----+----+-----+", - ]; - // RightAnti join output is expected to preserve right input order - assert_batches_eq!(expected, &batches); + allow_duplicates! { + assert_snapshot!(batches_to_string(&batches), @r#" + +----+----+-----+ + | a2 | b2 | c2 | + +----+----+-----+ + | 8 | 8 | 20 | + | 6 | 6 | 60 | + | 2 | 2 | 80 | + | 4 | 4 | 120 | + +----+----+-----+ + "#); + } Ok(()) } @@ -3124,17 +3135,17 @@ mod tests { assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]); - let expected = [ - "+----+----+----+----+----+----+", - "| a1 | b1 | c1 | a2 | b1 | c2 |", - "+----+----+----+----+----+----+", - "| | | | 30 | 6 | 90 |", - "| 1 | 4 | 7 | 10 | 4 | 70 |", - "| 2 | 5 | 8 | 20 | 5 | 80 |", - "+----+----+----+----+----+----+", - ]; - - assert_batches_sorted_eq!(expected, &batches); + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&batches), @r#" + +----+----+----+----+----+----+ + | a1 | b1 | c1 | a2 | b1 | c2 | + +----+----+----+----+----+----+ + | | | | 30 | 6 | 90 | + | 1 | 4 | 7 | 10 | 4 | 70 | + | 2 | 5 | 8 | 20 | 5 | 80 | + +----+----+----+----+----+----+ + "#); + } Ok(()) } @@ -3164,17 +3175,17 @@ mod tests { assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]); - let expected = [ - "+----+----+----+----+----+----+", - "| a1 | b1 | c1 | a2 | b1 | c2 |", - "+----+----+----+----+----+----+", - "| | | | 30 | 6 | 90 |", - "| 1 | 4 | 7 | 10 | 4 | 70 |", - "| 2 | 5 | 8 | 20 | 5 | 80 |", - "+----+----+----+----+----+----+", - ]; - - assert_batches_sorted_eq!(expected, &batches); + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&batches), @r#" + +----+----+----+----+----+----+ + | a1 | b1 | c1 | a2 | b1 | c2 | + +----+----+----+----+----+----+ + | | | | 30 | 6 | 90 | + | 1 | 4 | 7 | 10 | 4 | 70 | + | 2 | 5 | 8 | 20 | 5 | 80 | + +----+----+----+----+----+----+ + "#); + } Ok(()) } @@ -3206,17 +3217,18 @@ mod tests { let stream = join.execute(0, task_ctx)?; let batches = common::collect(stream).await?; - let expected = [ - "+----+----+----+----+----+----+", - "| a1 | b1 | c1 | a2 | b2 | c2 |", - "+----+----+----+----+----+----+", - "| | | | 30 | 6 | 90 |", - "| 1 | 4 | 7 | 10 | 4 | 70 |", - "| 2 | 5 | 8 | 20 | 5 | 80 |", - "| 3 | 7 | 9 | | | |", - "+----+----+----+----+----+----+", - ]; - assert_batches_sorted_eq!(expected, &batches); + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&batches), @r#" + +----+----+----+----+----+----+ + | a1 | b1 | c1 | a2 | b2 | c2 | + +----+----+----+----+----+----+ + | | | | 30 | 6 | 90 | + | 1 | 4 | 7 | 10 | 4 | 70 | + | 2 | 5 | 8 | 20 | 5 | 80 | + | 3 | 7 | 9 | | | | + +----+----+----+----+----+----+ + "#); + } Ok(()) } @@ -3251,16 +3263,17 @@ mod tests { .await?; assert_eq!(columns, vec!["a1", "b1", "c1", "mark"]); - let expected = [ - "+----+----+----+-------+", - "| a1 | b1 | c1 | mark |", - "+----+----+----+-------+", - "| 1 | 4 | 7 | true |", - "| 2 | 5 | 8 | true |", - "| 3 | 7 | 9 | false |", - "+----+----+----+-------+", - ]; - assert_batches_sorted_eq!(expected, &batches); + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&batches), @r#" + +----+----+----+-------+ + | a1 | b1 | c1 | mark | + +----+----+----+-------+ + | 1 | 4 | 7 | true | + | 2 | 5 | 8 | true | + | 3 | 7 | 9 | false | + +----+----+----+-------+ + "#); + } Ok(()) } @@ -3295,16 +3308,17 @@ mod tests { .await?; assert_eq!(columns, vec!["a1", "b1", "c1", "mark"]); - let expected = [ - "+----+----+----+-------+", - "| a1 | b1 | c1 | mark |", - "+----+----+----+-------+", - "| 1 | 4 | 7 | true |", - "| 2 | 5 | 8 | true |", - "| 3 | 7 | 9 | false |", - "+----+----+----+-------+", - ]; - assert_batches_sorted_eq!(expected, &batches); + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&batches), @r#" + +----+----+----+-------+ + | a1 | b1 | c1 | mark | + +----+----+----+-------+ + | 1 | 4 | 7 | true | + | 2 | 5 | 8 | true | + | 3 | 7 | 9 | false | + +----+----+----+-------+ + "#); + } Ok(()) } @@ -3401,15 +3415,16 @@ mod tests { let stream = join.execute(0, task_ctx)?; let batches = common::collect(stream).await?; - let expected = [ - "+---+---+---+----+---+----+", - "| a | b | c | a | b | c |", - "+---+---+---+----+---+----+", - "| 1 | 4 | 7 | 10 | 1 | 70 |", - "| 2 | 5 | 8 | 20 | 2 | 80 |", - "+---+---+---+----+---+----+", - ]; - assert_batches_sorted_eq!(expected, &batches); + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&batches), @r#" + +---+---+---+----+---+----+ + | a | b | c | a | b | c | + +---+---+---+----+---+----+ + | 1 | 4 | 7 | 10 | 1 | 70 | + | 2 | 5 | 8 | 20 | 2 | 80 | + +---+---+---+----+---+----+ + "#); + } Ok(()) } @@ -3470,15 +3485,16 @@ mod tests { let stream = join.execute(0, task_ctx)?; let batches = common::collect(stream).await?; - let expected = [ - "+---+---+---+----+---+---+", - "| a | b | c | a | b | c |", - "+---+---+---+----+---+---+", - "| 2 | 7 | 9 | 10 | 2 | 7 |", - "| 2 | 7 | 9 | 20 | 2 | 5 |", - "+---+---+---+----+---+---+", - ]; - assert_batches_sorted_eq!(expected, &batches); + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&batches), @r#" + +---+---+---+----+---+---+ + | a | b | c | a | b | c | + +---+---+---+----+---+---+ + | 2 | 7 | 9 | 10 | 2 | 7 | + | 2 | 7 | 9 | 20 | 2 | 5 | + +---+---+---+----+---+---+ + "#); + } Ok(()) } @@ -3511,18 +3527,19 @@ mod tests { let stream = join.execute(0, task_ctx)?; let batches = common::collect(stream).await?; - let expected = [ - "+---+---+---+----+---+---+", - "| a | b | c | a | b | c |", - "+---+---+---+----+---+---+", - "| 0 | 4 | 7 | | | |", - "| 1 | 5 | 8 | | | |", - "| 2 | 7 | 9 | 10 | 2 | 7 |", - "| 2 | 7 | 9 | 20 | 2 | 5 |", - "| 2 | 8 | 1 | | | |", - "+---+---+---+----+---+---+", - ]; - assert_batches_sorted_eq!(expected, &batches); + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&batches), @r#" + +---+---+---+----+---+---+ + | a | b | c | a | b | c | + +---+---+---+----+---+---+ + | 0 | 4 | 7 | | | | + | 1 | 5 | 8 | | | | + | 2 | 7 | 9 | 10 | 2 | 7 | + | 2 | 7 | 9 | 20 | 2 | 5 | + | 2 | 8 | 1 | | | | + +---+---+---+----+---+---+ + "#); + } Ok(()) } @@ -3555,17 +3572,18 @@ mod tests { let stream = join.execute(0, task_ctx)?; let batches = common::collect(stream).await?; - let expected = [ - "+---+---+---+----+---+---+", - "| a | b | c | a | b | c |", - "+---+---+---+----+---+---+", - "| | | | 30 | 3 | 6 |", - "| | | | 40 | 4 | 4 |", - "| 2 | 7 | 9 | 10 | 2 | 7 |", - "| 2 | 7 | 9 | 20 | 2 | 5 |", - "+---+---+---+----+---+---+", - ]; - assert_batches_sorted_eq!(expected, &batches); + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&batches), @r#" + +---+---+---+----+---+---+ + | a | b | c | a | b | c | + +---+---+---+----+---+---+ + | | | | 30 | 3 | 6 | + | | | | 40 | 4 | 4 | + | 2 | 7 | 9 | 10 | 2 | 7 | + | 2 | 7 | 9 | 20 | 2 | 5 | + +---+---+---+----+---+---+ + "#); + } Ok(()) } @@ -3613,6 +3631,23 @@ mod tests { ]; assert_batches_sorted_eq!(expected, &batches); + // THIS MIGRATION HAULTED DUE TO ISSUE #15312 + //allow_duplicates! { + // assert_snapshot!(batches_to_sort_string(&batches), @r#" + // +---+---+---+----+---+---+ + // | a | b | c | a | b | c | + // +---+---+---+----+---+---+ + // | | | | 30 | 3 | 6 | + // | | | | 40 | 4 | 4 | + // | 2 | 7 | 9 | 10 | 2 | 7 | + // | 2 | 7 | 9 | 20 | 2 | 5 | + // | 0 | 4 | 7 | | | | + // | 1 | 5 | 8 | | | | + // | 2 | 8 | 1 | | | | + // +---+---+---+----+---+---+ + // "#) + //} + Ok(()) } @@ -3768,16 +3803,17 @@ mod tests { let stream = join.execute(0, task_ctx)?; let batches = common::collect(stream).await?; - let expected = [ - "+------------+---+------------+---+", - "| date | n | date | n |", - "+------------+---+------------+---+", - "| 2022-04-26 | 2 | 2022-04-26 | 4 |", - "| 2022-04-26 | 2 | 2022-04-26 | 5 |", - "| 2022-04-27 | 3 | 2022-04-27 | 6 |", - "+------------+---+------------+---+", - ]; - assert_batches_sorted_eq!(expected, &batches); + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&batches), @r#" + +------------+---+------------+---+ + | date | n | date | n | + +------------+---+------------+---+ + | 2022-04-26 | 2 | 2022-04-26 | 4 | + | 2022-04-26 | 2 | 2022-04-26 | 5 | + | 2022-04-27 | 3 | 2022-04-27 | 6 | + +------------+---+------------+---+ + "#); + } Ok(()) } @@ -4162,16 +4198,17 @@ mod tests { assert_eq!(columns, vec!["n1", "n2"]); - let expected = [ - "+--------+--------+", - "| n1 | n2 |", - "+--------+--------+", - "| {a: } | {a: } |", - "| {a: 1} | {a: 1} |", - "| {a: 2} | {a: 2} |", - "+--------+--------+", - ]; - assert_batches_eq!(expected, &batches); + allow_duplicates! { + assert_snapshot!(batches_to_string(&batches), @r#" + +--------+--------+ + | n1 | n2 | + +--------+--------+ + | {a: } | {a: } | + | {a: 1} | {a: 1} | + | {a: 2} | {a: 2} | + +--------+--------+ + "#); + } Ok(()) } @@ -4198,14 +4235,15 @@ mod tests { ) .await?; - let expected_null_eq = [ - "+----+----+", - "| n1 | n2 |", - "+----+----+", - "| | |", - "+----+----+", - ]; - assert_batches_eq!(expected_null_eq, &batches_null_eq); + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&batches_null_eq), @r#" + +----+----+ + | n1 | n2 | + +----+----+ + | | | + +----+----+ + "#); + } let (_, batches_null_neq) = join_collect(left, right, on, &JoinType::Inner, false, task_ctx).await?; diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index 4e70d2dc4ee59..c06b09f2fecd5 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -958,7 +958,8 @@ mod tests { }; use arrow::buffer::{NullBuffer, OffsetBuffer}; use arrow::datatypes::{Field, Int32Type}; - use datafusion_common::assert_batches_eq; + use datafusion_common::test_util::batches_to_string; + use insta::assert_snapshot; // Create a GenericListArray with the following list values: // [A, B, C], [], NULL, [D], NULL, [NULL, F] @@ -1145,33 +1146,33 @@ mod tests { )? .unwrap(); - let expected = &[ -"+---------------------------------+---------------------------------+---------------------------------+", -"| col1_unnest_placeholder_depth_1 | col1_unnest_placeholder_depth_2 | col2_unnest_placeholder_depth_1 |", -"+---------------------------------+---------------------------------+---------------------------------+", -"| [1, 2, 3] | 1 | a |", -"| | 2 | b |", -"| [4, 5] | 3 | |", -"| [1, 2, 3] | | a |", -"| | | b |", -"| [4, 5] | | |", -"| [1, 2, 3] | 4 | a |", -"| | 5 | b |", -"| [4, 5] | | |", -"| [7, 8, 9, 10] | 7 | c |", -"| | 8 | d |", -"| [11, 12, 13] | 9 | |", -"| | 10 | |", -"| [7, 8, 9, 10] | | c |", -"| | | d |", -"| [11, 12, 13] | | |", -"| [7, 8, 9, 10] | 11 | c |", -"| | 12 | d |", -"| [11, 12, 13] | 13 | |", -"| | | e |", -"+---------------------------------+---------------------------------+---------------------------------+", - ]; - assert_batches_eq!(expected, &[ret]); + assert_snapshot!(batches_to_string(&[ret]), + @r###" ++---------------------------------+---------------------------------+---------------------------------+ +| col1_unnest_placeholder_depth_1 | col1_unnest_placeholder_depth_2 | col2_unnest_placeholder_depth_1 | ++---------------------------------+---------------------------------+---------------------------------+ +| [1, 2, 3] | 1 | a | +| | 2 | b | +| [4, 5] | 3 | | +| [1, 2, 3] | | a | +| | | b | +| [4, 5] | | | +| [1, 2, 3] | 4 | a | +| | 5 | b | +| [4, 5] | | | +| [7, 8, 9, 10] | 7 | c | +| | 8 | d | +| [11, 12, 13] | 9 | | +| | 10 | | +| [7, 8, 9, 10] | | c | +| | | d | +| [11, 12, 13] | | | +| [7, 8, 9, 10] | 11 | c | +| | 12 | d | +| [11, 12, 13] | 13 | | +| | | e | ++---------------------------------+---------------------------------+---------------------------------+ + "###); Ok(()) }