diff --git a/datafusion/core/tests/fuzz_cases/mod.rs b/datafusion/core/tests/fuzz_cases/mod.rs index c49eb65988e02..140cf7e5c75b5 100644 --- a/datafusion/core/tests/fuzz_cases/mod.rs +++ b/datafusion/core/tests/fuzz_cases/mod.rs @@ -18,6 +18,6 @@ mod aggregate_fuzz; mod join_fuzz; mod merge_fuzz; -mod order_spill_fuzz; +mod sort_fuzz; mod sort_preserving_repartition_fuzz; mod window_fuzz; diff --git a/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_fuzz.rs similarity index 52% rename from datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs rename to datafusion/core/tests/fuzz_cases/sort_fuzz.rs index d927b2807d7be..6c427c7fb7b3d 100644 --- a/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/sort_fuzz.rs @@ -36,28 +36,112 @@ use test_utils::{batches_to_vec, partitions_to_sorted_vec}; #[tokio::test] #[cfg_attr(tarpaulin, ignore)] async fn test_sort_1k_mem() { - run_sort(10240, vec![(5, false), (20000, true), (1000000, true)]).await + SortTest::new() + .with_int32_batches(5) + .with_pool_size(10240) + .with_should_spill(false) + .run() + .await; + + SortTest::new() + .with_int32_batches(20000) + .with_pool_size(10240) + .with_should_spill(true) + .run() + .await; + + SortTest::new() + .with_int32_batches(1000000) + .with_pool_size(10240) + .with_should_spill(true) + .run() + .await; } #[tokio::test] #[cfg_attr(tarpaulin, ignore)] async fn test_sort_100k_mem() { - run_sort(102400, vec![(5, false), (20000, false), (1000000, true)]).await + SortTest::new() + .with_int32_batches(5) + .with_pool_size(102400) + .with_should_spill(false) + .run() + .await; + + SortTest::new() + .with_int32_batches(20000) + .with_pool_size(102400) + .with_should_spill(false) + .run() + .await; + + SortTest::new() + .with_int32_batches(1000000) + .with_pool_size(102400) + .with_should_spill(true) + .run() + .await; } #[tokio::test] async fn test_sort_unlimited_mem() { - run_sort( - usize::MAX, - vec![(5, false), (2000, false), (1000000, false)], - ) - .await + SortTest::new() + .with_int32_batches(5) + .with_pool_size(usize::MAX) + .with_should_spill(false) + .run() + .await; + + SortTest::new() + .with_int32_batches(20000) + .with_pool_size(usize::MAX) + .with_should_spill(false) + .run() + .await; + + SortTest::new() + .with_int32_batches(1000000) + .with_pool_size(usize::MAX) + .with_should_spill(false) + .run() + .await; } -/// Sort the input using SortExec and ensure the results are correct according to `Vec::sort` -async fn run_sort(pool_size: usize, size_spill: Vec<(usize, bool)>) { - for (size, spill) in size_spill { - let input = vec![make_staggered_batches(size)]; +#[derive(Debug, Default)] +struct SortTest { + input: Vec>, + /// GreedyMemoryPool size, if specified + pool_size: Option, + /// If true, expect the sort to spill + should_spill: bool, +} + +impl SortTest { + fn new() -> Self { + Default::default() + } + + /// Create batches of int32 values of rows + fn with_int32_batches(mut self, rows: usize) -> Self { + self.input = vec![make_staggered_i32_batches(rows)]; + self + } + + /// specify that this test should use a memory pool of the specifeid size + fn with_pool_size(mut self, pool_size: usize) -> Self { + self.pool_size = Some(pool_size); + self + } + + fn with_should_spill(mut self, should_spill: bool) -> Self { + self.should_spill = should_spill; + self + } + + /// Sort the input using SortExec and ensure the results are + /// correct according to `Vec::sort` both with and without spilling + async fn run(&self) { + let input = self.input.clone(); let first_batch = input .iter() .flat_map(|p| p.iter()) @@ -77,19 +161,23 @@ async fn run_sort(pool_size: usize, size_spill: Vec<(usize, bool)>) { let sort = Arc::new(SortExec::new(sort, Arc::new(exec))); let session_config = SessionConfig::new(); - // Make sure there is enough space for the initial spill - // reservation - let pool_size = pool_size.saturating_add( - session_config - .options() - .execution - .sort_spill_reservation_bytes, - ); + let session_ctx = if let Some(pool_size) = self.pool_size { + // Make sure there is enough space for the initial spill + // reservation + let pool_size = pool_size.saturating_add( + session_config + .options() + .execution + .sort_spill_reservation_bytes, + ); - let runtime_config = RuntimeConfig::new() - .with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size))); - let runtime = Arc::new(RuntimeEnv::new(runtime_config).unwrap()); - let session_ctx = SessionContext::with_config_rt(session_config, runtime); + let runtime_config = RuntimeConfig::new() + .with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size))); + let runtime = Arc::new(RuntimeEnv::new(runtime_config).unwrap()); + SessionContext::with_config_rt(session_config, runtime) + } else { + SessionContext::with_config(session_config) + }; let task_ctx = session_ctx.task_ctx(); let collected = collect(sort.clone(), task_ctx).await.unwrap(); @@ -97,17 +185,17 @@ async fn run_sort(pool_size: usize, size_spill: Vec<(usize, bool)>) { let expected = partitions_to_sorted_vec(&input); let actual = batches_to_vec(&collected); - if spill { + if self.should_spill { assert_ne!( sort.metrics().unwrap().spill_count().unwrap(), 0, - "{pool_size} {size}" + "Expected spill, but did not: {self:?}" ); } else { assert_eq!( sort.metrics().unwrap().spill_count().unwrap(), 0, - "{pool_size} {size}" + "Expected no spill, but did: {self:?}" ); } @@ -116,13 +204,13 @@ async fn run_sort(pool_size: usize, size_spill: Vec<(usize, bool)>) { 0, "The sort should have returned all memory used back to the memory pool" ); - assert_eq!(expected, actual, "failure in @ pool_size {pool_size}"); + assert_eq!(expected, actual, "failure in @ pool_size {self:?}"); } } /// Return randomly sized record batches in a field named 'x' of type `Int32` /// with randomized i32 content -fn make_staggered_batches(len: usize) -> Vec { +fn make_staggered_i32_batches(len: usize) -> Vec { let mut rng = rand::thread_rng(); let max_batch = 1024;