diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index c7ae09bb2e340..1ca9d21128f0f 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -85,6 +85,10 @@ impl ExternalSorterMetrics {
 ///
 /// 1. get a non-empty new batch from input
 ///
+/// 1.2. if a `fetch` parameter has been provided, and the batch size
+///      is larger than `fetch`, sort the incoming batch in order to
+///      reduce its size and thus use less memory.
+///
 /// 2. check with the memory manager there is sufficient space to
 ///    buffer the batch in memory 2.1 if memory sufficient, buffer
 ///    batch in memory, go to 1.
@@ -196,9 +200,13 @@ impl ExternalSorterMetrics {
 struct ExternalSorter {
     /// schema of the output (and the input)
     schema: SchemaRef,
-    /// Potentially unsorted in memory buffer
-    in_mem_batches: Vec<RecordBatch>,
-    /// if `Self::in_mem_batches` are sorted
+    /// A vector of tuples, with each tuple consisting of a flag
+    /// denoting whether the batch is sorted, and the batch itself
+    in_mem_batches: Vec<(bool, RecordBatch)>,
+    /// A flag denoting whether the inter-batch order is guaranteed;
+    /// note that this is a stronger signal than just having all
+    /// individual batches sorted, as it means that we can stream the
+    /// entire vector of batches inside one stream for the merge-sort
     in_mem_batches_sorted: bool,
     /// If data has previously been spilled, the locations of the
     /// spill files (in Arrow IPC format)
@@ -238,7 +246,7 @@ impl ExternalSorter {
         Self {
             schema,
             in_mem_batches: vec![],
-            in_mem_batches_sorted: true,
+            in_mem_batches_sorted: false,
             spills: vec![],
             expr: expr.into(),
             metrics,
@@ -253,11 +261,23 @@ impl ExternalSorter {
     /// Appends an unsorted [`RecordBatch`] to `in_mem_batches`
     ///
     /// Updates memory usage metrics, and possibly triggers spilling to disk
-    async fn insert_batch(&mut self, input: RecordBatch) -> Result<()> {
+    async fn insert_batch(&mut self, mut input: RecordBatch) -> Result<()> {
         if input.num_rows() == 0 {
             return Ok(());
         }
 
+        let mut batch_sorted = false;
+        if self
+            .fetch
+            .map_or(false, |f| f <= input.num_rows() && f <= 100)
+        {
+            // Eagerly sort the batch to potentially reduce the number of rows
+            // after applying the fetch parameter.
+            // Currently only applied for fetch of 100 rows or less.
+            input = sort_batch(&input, &self.expr, self.fetch)?;
+            batch_sorted = true;
+        }
+
         let size = batch_byte_size(&input);
         if self.reservation.try_grow(size).is_err() {
             let before = self.reservation.size();
@@ -279,8 +299,8 @@ impl ExternalSorter {
             }
         }
 
-        self.in_mem_batches.push(input);
-        self.in_mem_batches_sorted = false;
+        self.in_mem_batches.push((batch_sorted, input));
+        self.in_mem_batches_sorted = batch_sorted && self.in_mem_batches.len() == 1;
         Ok(())
     }
@@ -345,7 +365,7 @@ impl ExternalSorter {
     }
 
     /// Writes any `in_memory_batches` to a spill file and clears
-    /// the batches. The contents of the spil file are sorted.
+    /// the batches. The contents of the spill file are sorted.
     ///
     /// Returns the amount of memory freed.
     async fn spill(&mut self) -> Result<usize> {
@@ -359,7 +379,11 @@ impl ExternalSorter {
         self.in_mem_sort().await?;
 
         let spillfile = self.runtime.disk_manager.create_tmp_file("Sorting")?;
-        let batches = std::mem::take(&mut self.in_mem_batches);
+
+        let (sorted, batches): (Vec<bool>, Vec<RecordBatch>) =
+            std::mem::take(&mut self.in_mem_batches).into_iter().unzip();
+        assert!(sorted.iter().all(|&s| s));
+
         spill_sorted_batches(batches, spillfile.path(), self.schema.clone()).await?;
         let used = self.reservation.free();
         self.metrics.spill_count.add(1);
@@ -370,23 +394,27 @@ impl ExternalSorter {
 
     /// Sorts the in_mem_batches in place
     async fn in_mem_sort(&mut self) -> Result<()> {
-        if self.in_mem_batches_sorted {
+        if self.in_mem_batches.is_empty() || self.in_mem_batches_sorted {
             return Ok(());
         }
 
         self.in_mem_batches = self
            .in_mem_sort_stream(self.metrics.baseline.intermediate())?
-            .try_collect()
-            .await?;
+            .try_collect::<Vec<_>>()
+            .await?
+            .into_iter()
+            .map(|batch| (true, batch))
+            .collect();
+        // We're now also guaranteed that the inter-batch order holds
+        self.in_mem_batches_sorted = true;
 
         let size: usize = self
             .in_mem_batches
             .iter()
-            .map(|x| x.get_array_memory_size())
+            .map(|(_, x)| x.get_array_memory_size())
             .sum();
         self.reservation.resize(size);
-        self.in_mem_batches_sorted = true;
 
         Ok(())
     }
@@ -454,27 +482,33 @@ impl ExternalSorter {
     ) -> Result<SendableRecordBatchStream> {
         assert_ne!(self.in_mem_batches.len(), 0);
         if self.in_mem_batches.len() == 1 {
-            let batch = self.in_mem_batches.remove(0);
-            let stream = self.sort_batch_stream(batch, metrics)?;
+            let (sorted, batch) = self.in_mem_batches.remove(0);
+            let stream = self.sort_batch_stream(batch, sorted, metrics)?;
             self.in_mem_batches.clear();
             return Ok(stream);
         }
 
-        // If less than 1MB of in-memory data, concatenate and sort in place
+        // If less than 1MB of in-memory data and no batch is sorted, concatenate and sort in place
         //
         // This is a very rough heuristic and likely could be refined further
-        if self.reservation.size() < 1048576 {
+        let no_batches_sorted = !self.in_mem_batches.iter().any(|(sorted, _)| *sorted);
+        if self.reservation.size() < 1048576 && no_batches_sorted {
             // Concatenate memory batches together and sort
-            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
+            let (_, batches): (Vec<bool>, Vec<RecordBatch>) =
+                std::mem::take(&mut self.in_mem_batches).into_iter().unzip();
+            let batch = concat_batches(&self.schema, &batches)?;
             self.in_mem_batches.clear();
-            return self.sort_batch_stream(batch, metrics);
+            return self.sort_batch_stream(batch, false, metrics);
         }
 
         let streams = std::mem::take(&mut self.in_mem_batches)
             .into_iter()
-            .map(|batch| {
+            .map(|(sorted, batch)| {
                 let metrics = self.metrics.baseline.intermediate();
-                Ok(spawn_buffered(self.sort_batch_stream(batch, metrics)?, 1))
+                Ok(spawn_buffered(
+                    self.sort_batch_stream(batch, sorted, metrics)?,
+                    1,
+                ))
             })
             .collect::<Result<_>>()?;
@@ -492,27 +526,34 @@ impl ExternalSorter {
     fn sort_batch_stream(
         &self,
         batch: RecordBatch,
+        sorted: bool,
         metrics: BaselineMetrics,
     ) -> Result<SendableRecordBatchStream> {
         let schema = batch.schema();
 
-        let mut reservation =
-            MemoryConsumer::new(format!("sort_batch_stream{}", self.partition_id))
-                .register(&self.runtime.memory_pool);
-
-        // TODO: This should probably be try_grow (#5885)
-        reservation.resize(batch.get_array_memory_size());
-
-        let fetch = self.fetch;
-        let expressions = self.expr.clone();
-        let stream = futures::stream::once(futures::future::lazy(move |_| {
-            let sorted = sort_batch(&batch, &expressions, fetch)?;
-            metrics.record_output(sorted.num_rows());
-            drop(batch);
-            reservation.free();
-            Ok(sorted)
-        }));
-        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
+        if !sorted {
+            // Reserve some memory for sorting the batch
+            let mut reservation =
+                MemoryConsumer::new(format!("sort_batch_stream{}", self.partition_id))
+                    .register(&self.runtime.memory_pool);
+
+            // TODO: This should probably be try_grow (#5885)
+            reservation.resize(batch.get_array_memory_size());
+
+            let fetch = self.fetch;
+            let expressions = self.expr.clone();
+            let stream = futures::stream::once(futures::future::lazy(move |_| {
+                let output = sort_batch(&batch, &expressions, fetch)?;
+                metrics.record_output(output.num_rows());
+                drop(batch);
+                reservation.free();
+                Ok(output)
+            }));
+            Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
+        } else {
+            let stream = futures::stream::once(futures::future::lazy(move |_| Ok(batch)));
+            Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
+        }
     }
 }
@@ -562,22 +603,6 @@ async fn spill_sorted_batches(
     }
 }
 
-fn read_spill_as_stream(
-    path: NamedTempFile,
-    schema: SchemaRef,
-) -> Result<SendableRecordBatchStream> {
-    let mut builder = RecordBatchReceiverStream::builder(schema, 2);
-    let sender = builder.tx();
-
-    builder.spawn_blocking(move || {
-        if let Err(e) = read_spill(sender, path.path()) {
-            error!("Failure while reading spill file: {:?}. Error: {}", path, e);
-        }
-    });
-
-    Ok(builder.build())
-}
-
 fn write_sorted(
     batches: Vec<RecordBatch>,
     path: PathBuf,
@@ -597,6 +622,23 @@
     Ok(())
 }
 
+/// Stream batches from spill files inside a single stream.
+fn read_spill_as_stream(
+    path: NamedTempFile,
+    schema: SchemaRef,
+) -> Result<SendableRecordBatchStream> {
+    let mut builder = RecordBatchReceiverStream::builder(schema, 2);
+    let sender = builder.tx();
+
+    builder.spawn_blocking(move || {
+        if let Err(e) = read_spill(sender, path.path()) {
+            error!("Failure while reading spill file: {:?}. Error: {}", path, e);
+        }
+    });
+
+    Ok(builder.build())
+}
+
 fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
     let file = BufReader::new(File::open(path)?);
     let reader = FileReader::try_new(file, None)?;
diff --git a/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs b/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs
index 1f72e0fcb45bf..f77ebc47fa2ae 100644
--- a/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs
@@ -30,76 +30,89 @@ use datafusion::physical_plan::sorts::sort::SortExec;
 use datafusion::physical_plan::{collect, ExecutionPlan};
 use datafusion::prelude::{SessionConfig, SessionContext};
 use rand::Rng;
+use rstest::rstest;
 use std::sync::Arc;
 use test_utils::{batches_to_vec, partitions_to_sorted_vec};
 
-#[tokio::test]
-#[cfg_attr(tarpaulin, ignore)]
-async fn test_sort_1k_mem() {
-    run_sort(10240, vec![(5, false), (20000, true), (1000000, true)]).await
-}
-
-#[tokio::test]
-#[cfg_attr(tarpaulin, ignore)]
-async fn test_sort_100k_mem() {
-    run_sort(102400, vec![(5, false), (20000, false), (1000000, true)]).await
-}
-
-#[tokio::test]
-async fn test_sort_unlimited_mem() {
-    run_sort(
-        usize::MAX,
-        vec![(5, false), (2000, false), (1000000, false)],
-    )
-    .await
-}
-
 /// Sort the input using SortExec and ensure the results are correct according to `Vec::sort`
-async fn run_sort(pool_size: usize, size_spill: Vec<(usize, bool)>) {
-    for (size, spill) in size_spill {
-        let input = vec![make_staggered_batches(size)];
-        let first_batch = input
-            .iter()
-            .flat_map(|p| p.iter())
-            .next()
-            .expect("at least one batch");
-        let schema = first_batch.schema();
-
-        let sort = vec![PhysicalSortExpr {
-            expr: col("x", &schema).unwrap(),
-            options: SortOptions {
-                descending: false,
-                nulls_first: true,
-            },
-        }];
-
-        let exec = MemoryExec::try_new(&input, schema, None).unwrap();
-        let sort = Arc::new(SortExec::new(sort, Arc::new(exec)));
-
-        let runtime_config = RuntimeConfig::new()
-            .with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)));
-        let runtime = Arc::new(RuntimeEnv::new(runtime_config).unwrap());
-        let session_ctx = SessionContext::with_config_rt(SessionConfig::new(), runtime);
-
-        let task_ctx = session_ctx.task_ctx();
-        let collected = collect(sort.clone(), task_ctx).await.unwrap();
-
-        let expected = partitions_to_sorted_vec(&input);
-        let actual = batches_to_vec(&collected);
-
-        if spill {
-            assert_ne!(sort.metrics().unwrap().spill_count().unwrap(), 0);
-        } else {
-            assert_eq!(sort.metrics().unwrap().spill_count().unwrap(), 0);
-        }
+#[rstest]
+#[case::mem_10k_5_rows(10240, 5, None, false)]
+#[case::mem_10k_20k_rows(10240, 20000, None, true)]
+#[case::mem_10k_1m_rows(10240, 1000000, None, true)]
+#[case::mem_10k_5_rows_fetch_1(10240, 5, Some(1), false)]
+#[case::mem_10k_20k_rows_fetch_1(10240, 20000, Some(1), false)]
+#[case::mem_10k_1m_rows_fetch_1(10240, 1000000, Some(1), false)]
+#[case::mem_10k_5_rows_fetch_1000(10240, 5, Some(1000), false)]
+#[case::mem_10k_20k_rows_fetch_1000(10240, 20000, Some(1000), true)]
+#[case::mem_10k_1m_rows_fetch_1000(10240, 1000000, Some(1000), true)]
+#[case::mem_100k_5_rows(102400, 5, None, false)]
+#[case::mem_100k_20k_rows(102400, 20000, None, false)]
+#[case::mem_100k_1m_rows(102400, 1000000, None, true)]
+#[case::mem_100k_5_rows_fetch_10(102400, 5, Some(10), false)]
+#[case::mem_100k_20k_rows_fetch_10(102400, 20000, Some(10), false)]
+#[case::mem_100k_1m_rows_fetch_10(102400, 1000000, Some(10), false)]
+#[case::mem_100k_5_rows_fetch_10000(102400, 5, Some(10000), false)]
+#[case::mem_100k_20k_rows_fetch_10000(102400, 20000, Some(10000), false)]
+#[case::mem_100k_1m_rows_fetch_10000(102400, 1000000, Some(10000), false)]
+// Test with mem > 1MB to exercise the intermediate streaming-merge on spills
+// (i.e. skip the <1MB concatenation heuristic)
+#[case::mem_2m_1m_rows(2097152, 500000, None, true)]
+#[case::mem_2m_10m_rows_fetch_100(2097152, 10000000, Some(100), false)]
+#[case::mem_inf_5_rows(usize::MAX, 5, None, false)]
+#[case::mem_inf_20k_rows(usize::MAX, 20000, None, false)]
+#[case::mem_inf_1m_rows(usize::MAX, 1000000, None, false)]
+#[tokio::test]
+async fn test_sort_spill(
+    #[case] pool_size: usize,
+    #[case] size: usize,
+    #[case] fetch: Option<usize>,
+    #[case] spill: bool,
+) {
+    let input = vec![make_staggered_batches(size)];
+    let first_batch = input
+        .iter()
+        .flat_map(|p| p.iter())
+        .next()
+        .expect("at least one batch");
+    let schema = first_batch.schema();
+
+    let sort = vec![PhysicalSortExpr {
+        expr: col("x", &schema).unwrap(),
+        options: SortOptions {
+            descending: false,
+            nulls_first: true,
+        },
+    }];
+
+    let exec = MemoryExec::try_new(&input, schema, None).unwrap();
+    let sort = Arc::new(SortExec::new(sort, Arc::new(exec)).with_fetch(fetch));
+
+    let runtime_config =
+        RuntimeConfig::new().with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)));
+    let runtime = Arc::new(RuntimeEnv::new(runtime_config).unwrap());
+    let session_ctx = SessionContext::with_config_rt(SessionConfig::new(), runtime);
+
+    let task_ctx = session_ctx.task_ctx();
+    let collected = collect(sort.clone(), task_ctx).await.unwrap();
+
+    let mut expected = partitions_to_sorted_vec(&input);
+    if let Some(k) = fetch {
+        expected = expected.into_iter().take(k).collect();
+    }
+    let actual = batches_to_vec(&collected);
 
-        assert_eq!(
-            session_ctx.runtime_env().memory_pool.reserved(),
-            0,
-            "The sort should have returned all memory used back to the memory pool"
-        );
-        assert_eq!(expected, actual, "failure in @ pool_size {pool_size}");
+    if spill {
+        assert_ne!(sort.metrics().unwrap().spill_count().unwrap(), 0);
+    } else {
+        assert_eq!(sort.metrics().unwrap().spill_count().unwrap(), 0);
     }
+
+    assert_eq!(
+        session_ctx.runtime_env().memory_pool.reserved(),
+        0,
+        "The sort should have returned all memory used back to the memory pool"
+    );
+    assert_eq!(expected, actual, "failure in @ pool_size {pool_size}");
 }
 
 /// Return randomly sized record batches in a field named 'x' of type `Int32`
diff --git a/datafusion/core/tests/memory_limit.rs b/datafusion/core/tests/memory_limit.rs
index a7cff6cbd7581..199d715b12a4d 100644
--- a/datafusion/core/tests/memory_limit.rs
+++ b/datafusion/core/tests/memory_limit.rs
@@ -16,12 +16,14 @@
 // under the License.
 
 //! This module contains tests for limiting memory at runtime in DataFusion
+#![allow(clippy::items_after_test_module)]
 
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 use datafusion::physical_optimizer::PhysicalOptimizerRule;
 use datafusion::physical_plan::streaming::PartitionStream;
 use futures::StreamExt;
+use rstest::rstest;
 use std::sync::Arc;
 
 use datafusion::datasource::streaming::StreamingTable;
@@ -45,17 +47,38 @@ fn init() {
     let _ = env_logger::try_init();
 }
 
+#[rstest]
+#[case::cant_grow_reservation(vec!["Resources exhausted: Failed to allocate additional", "ExternalSorter"], 100_000)]
+#[case::cant_spill_to_disk(vec!["Resources exhausted: Memory Exhausted while Sorting (DiskManager is disabled)"], 200_000)]
+#[case::no_oom(vec![], 600_000)]
 #[tokio::test]
-async fn oom_sort() {
+async fn sort(#[case] expected_errors: Vec<&str>, #[case] memory_limit: usize) {
     TestCase::new(
         "select * from t order by host DESC",
-        vec![
-            "Resources exhausted: Memory Exhausted while Sorting (DiskManager is disabled)",
-        ],
-        200_000,
+        expected_errors,
+        memory_limit,
+    )
+    .run()
+    .await
+}
+
+// We expect to see lower memory thresholds in general when applying a `LIMIT` clause due to eager sorting
+#[rstest]
+#[case::cant_grow_reservation(vec!["Resources exhausted: Failed to allocate additional", "ExternalSorter"], 20_000)]
+#[case::cant_spill_to_disk(vec!["Memory Exhausted while Sorting (DiskManager is disabled)"], 40_000)]
+#[case::no_oom(vec![], 80_000)]
+#[tokio::test]
+async fn sort_with_limit(
+    #[case] expected_errors: Vec<&str>,
+    #[case] memory_limit: usize,
+) {
+    TestCase::new(
+        "select * from t order by host DESC limit 10",
+        expected_errors,
+        memory_limit,
     )
-    .run()
-    .await
+    .run()
+    .await
 }
 
 #[tokio::test]
@@ -267,9 +290,19 @@ impl TestCase {
 
         match df.collect().await {
             Ok(_batches) => {
-                panic!("Unexpected success when running, expected memory limit failure")
+                if !expected_errors.is_empty() {
+                    panic!(
+                        "Unexpected success when running, expected memory limit failure"
+                    )
+                }
             }
             Err(e) => {
+                if expected_errors.is_empty() {
+                    panic!(
+                        "Unexpected failure when running, expected sufficient memory {e}"
+                    )
+                }
+
                 for error_substring in expected_errors {
                     assert_contains!(e.to_string(), error_substring);
                 }
diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt b/datafusion/core/tests/sqllogictests/test_files/window.slt
index cd257aaa92de7..4f129eb0e0762 100644
--- a/datafusion/core/tests/sqllogictests/test_files/window.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/window.slt
@@ -2672,8 +2672,8 @@ SELECT
 ORDER BY ts DESC
 LIMIT 5;
 ----
-289 269 305 305 305 283 100 100 99 99 86 86 301 296 301 1004 305 305 301 301 1001 1002 1001 289
-289 266 305 305 305 278 99 99 99 99 86 86 296 291 296 1004 305 305 301 296 305 1002 305 286
+289 269 305 305 305 283 100 100 99 99 86 86 301 296 301 1004 305 305 301 301 1001 1002 1001 289
+289 261 296 301 NULL 275 98 98 98 98 85 85 291 289 291 1004 305 305 296 291 301 305 301 283
 286 259 291 296 NULL 272 97 97 97 97 84 84 289 286 289 1004 305 305 291 289 296 301 296 278
 275 254 289 291 289 269 96 96 96 96 83 83 286 283 286 305 305 305 289 286 291 296 291 275
@@ -3208,8 +3208,8 @@ SELECT
 ORDER BY C3
 LIMIT 5
 ----
-0.970671228336 0.970671228336
-0.850672105305 0.850672105305
+0.970671228336 0.970671228336
+0.152498292972 0.152498292972
 0.369363046006 0.369363046006
 0.56535284223 0.56535284223
@@ -3257,8 +3257,8 @@ SELECT
 ORDER BY C3
 LIMIT 5
 ----
-0.970671228336 0.014793053078
-0.850672105305 0.014793053078
+0.970671228336 0.014793053078
+0.152498292972 0.014793053078
 0.369363046006 0.014793053078
 0.56535284223 0.014793053078
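
For orientation, below is a minimal, self-contained sketch of the idea the patch applies inside `insert_batch`: eagerly sort an incoming batch and keep at most `fetch` rows, so the buffered data that the memory pool must account for shrinks. It is written directly against arrow's compute kernels (`lexsort_to_indices`, `take`); the helper name `sort_and_truncate`, the single `Int32` column, and the ascending sort options are assumptions for illustration only, and this is not the `sort_batch` helper the patch calls.

// A minimal sketch (not DataFusion code): eagerly sort a batch and keep at
// most `fetch` rows, mirroring the intent of the `insert_batch` change above.
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::compute::{lexsort_to_indices, take, SortColumn, SortOptions};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::Result;
use arrow::record_batch::RecordBatch;

/// Hypothetical helper: sort `batch` by its first column (ascending, nulls
/// first) and truncate the result to at most `fetch` rows.
fn sort_and_truncate(batch: &RecordBatch, fetch: Option<usize>) -> Result<RecordBatch> {
    let sort_column = SortColumn {
        values: batch.column(0).clone(),
        options: Some(SortOptions {
            descending: false,
            nulls_first: true,
        }),
    };
    // `lexsort_to_indices` accepts a limit, so the permutation is already
    // truncated to `fetch` entries before any row data is copied.
    let indices = lexsort_to_indices(&[sort_column], fetch)?;
    let columns = batch
        .columns()
        .iter()
        .map(|column| take(column.as_ref(), &indices, None))
        .collect::<Result<Vec<ArrayRef>>>()?;
    RecordBatch::try_new(batch.schema(), columns)
}

fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
    let column: ArrayRef = Arc::new(Int32Array::from(vec![5, 1, 4, 2, 3]));
    let batch = RecordBatch::try_new(schema, vec![column])?;

    // With fetch = Some(2) only the two smallest rows survive, so the batch an
    // external sorter would have to buffer (and reserve memory for) is smaller.
    let truncated = sort_and_truncate(&batch, Some(2))?;
    assert_eq!(truncated.num_rows(), 2);
    Ok(())
}

In the patch itself this work is done by `sort_batch(&input, &self.expr, self.fetch)`, and it is only attempted when `fetch` is at most 100 rows and no larger than the incoming batch.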