From 39d6d1603ac42b5f83f8b4efbe9847398c3a07f1 Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Tue, 1 Aug 2023 17:55:38 +0200 Subject: [PATCH 01/10] Do sorting eagerly when gathering input batches to benefit from the potential row reduction --- .../core/src/physical_plan/sorts/sort.rs | 108 ++++++------------ 1 file changed, 36 insertions(+), 72 deletions(-) diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index f660f0acf89af..a2cd1564f899c 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -19,7 +19,7 @@ //! It will do in-memory sorting if it has enough memory budget //! but spills to disk if needed. -use crate::physical_plan::common::{batch_byte_size, spawn_buffered, IPCWriter}; +use crate::physical_plan::common::{batch_byte_size, IPCWriter}; use crate::physical_plan::expressions::PhysicalSortExpr; use crate::physical_plan::metrics::{ BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, @@ -79,16 +79,15 @@ impl ExternalSorterMetrics { /// Sort arbitrary size of data to get a total order (may spill several times during sorting based on free memory available). /// /// The basic architecture of the algorithm: -/// 1. get a non-empty new batch from input +/// 1. get a non-empty new batch from input and sort it /// 2. check with the memory manager if we could buffer the batch in memory /// 2.1 if memory sufficient, then buffer batch in memory, go to 1. -/// 2.2 if the memory threshold is reached, sort all buffered batches and spill to file. +/// 2.2 if the memory threshold is reached, spill all buffered batches and to file. /// buffer the batch in memory, go to 1. /// 3. when input is exhausted, merge all in memory batches and spills to get a total order. struct ExternalSorter { schema: SchemaRef, in_mem_batches: Vec, - in_mem_batches_sorted: bool, spills: Vec, /// Sort expressions expr: Arc<[PhysicalSortExpr]>, @@ -118,7 +117,6 @@ impl ExternalSorter { Self { schema, in_mem_batches: vec![], - in_mem_batches_sorted: true, spills: vec![], expr: expr.into(), metrics, @@ -133,34 +131,32 @@ impl ExternalSorter { /// Appends an unsorted [`RecordBatch`] to `in_mem_batches` /// /// Updates memory usage metrics, and possibly triggers spilling to disk - async fn insert_batch(&mut self, input: RecordBatch) -> Result<()> { + async fn insert_batch(&mut self, mut input: RecordBatch) -> Result<()> { if input.num_rows() == 0 { return Ok(()); } + // Eagerly sort the batch to potentially reduce the number of rows + // after applying the fetch parameter; first perform a memory reservation + // for the sorting procedure. + let mut reservation = + MemoryConsumer::new(format!("insert_batch{}", self.partition_id)) + .register(&self.runtime.memory_pool); + + // TODO: This should probably be try_grow (#5885) + reservation.resize(input.get_array_memory_size()); + // Maybe we should keep a single batch at all times and perform + // concatenate with the incoming batch + sort instead? 
+ input = sort_batch(&input, &self.expr, self.fetch)?; + reservation.free(); + let size = batch_byte_size(&input); if self.reservation.try_grow(size).is_err() { - let before = self.reservation.size(); - self.in_mem_sort().await?; - // Sorting may have freed memory, especially if fetch is not `None` - // - // As such we check again, and if the memory usage has dropped by - // a factor of 2, and we can allocate the necessary capacity, - // we don't spill - // - // The factor of 2 aims to avoid a degenerate case where the - // memory required for `fetch` is just under the memory available, - // causing repeated re-sorting of data - if self.reservation.size() > before / 2 - || self.reservation.try_grow(size).is_err() - { - self.spill().await?; - self.reservation.try_grow(size)? - } + self.spill().await?; + self.reservation.try_grow(size)? } self.in_mem_batches.push(input); - self.in_mem_batches_sorted = false; Ok(()) } @@ -169,12 +165,12 @@ impl ExternalSorter { } /// MergeSort in mem batches as well as spills into total order with `SortPreservingMergeStream`. - fn sort(&mut self) -> Result { + fn merge(&mut self) -> Result { if self.spilled_before() { let mut streams = vec![]; if !self.in_mem_batches.is_empty() { let in_mem_stream = - self.in_mem_sort_stream(self.metrics.baseline.intermediate())?; + self.in_mem_stream(self.metrics.baseline.intermediate())?; streams.push(in_mem_stream); } @@ -192,7 +188,7 @@ impl ExternalSorter { self.fetch, ) } else if !self.in_mem_batches.is_empty() { - let result = self.in_mem_sort_stream(self.metrics.baseline.clone()); + let result = self.in_mem_stream(self.metrics.baseline.clone()); // Report to the memory manager we are no longer using memory self.reservation.free(); result @@ -221,10 +217,12 @@ impl ExternalSorter { debug!("Spilling sort data of ExternalSorter to disk whilst inserting"); - self.in_mem_sort().await?; - let spillfile = self.runtime.disk_manager.create_tmp_file("Sorting")?; - let batches = std::mem::take(&mut self.in_mem_batches); + + let batches = self.in_mem_stream(self.metrics.baseline.intermediate())? + .try_collect() + .await?; + spill_sorted_batches(batches, spillfile.path(), self.schema.clone()).await?; let used = self.reservation.free(); self.metrics.spill_count.add(1); @@ -233,30 +231,8 @@ impl ExternalSorter { Ok(used) } - /// Sorts the in_mem_batches in place - async fn in_mem_sort(&mut self) -> Result<()> { - if self.in_mem_batches_sorted { - return Ok(()); - } - - self.in_mem_batches = self - .in_mem_sort_stream(self.metrics.baseline.intermediate())? 
- .try_collect() - .await?; - - let size: usize = self - .in_mem_batches - .iter() - .map(|x| x.get_array_memory_size()) - .sum(); - - self.reservation.resize(size); - self.in_mem_batches_sorted = true; - Ok(()) - } - - /// Consumes in_mem_batches returning a sorted stream - fn in_mem_sort_stream( + /// Consumes in_mem_batches returning a stream + fn in_mem_stream( &mut self, metrics: BaselineMetrics, ) -> Result { @@ -275,14 +251,15 @@ impl ExternalSorter { // Concatenate memory batches together and sort let batch = concat_batches(&self.schema, &self.in_mem_batches)?; self.in_mem_batches.clear(); - return self.sort_batch_stream(batch, metrics); + let output = sort_batch(&batch, &self.expr, self.fetch)?; + return self.sort_batch_stream(output, metrics); } let streams = std::mem::take(&mut self.in_mem_batches) .into_iter() .map(|batch| { let metrics = self.metrics.baseline.intermediate(); - Ok(spawn_buffered(self.sort_batch_stream(batch, metrics)?, 1)) + Ok(self.sort_batch_stream(batch, metrics)?) }) .collect::>()?; @@ -302,22 +279,9 @@ impl ExternalSorter { metrics: BaselineMetrics, ) -> Result { let schema = batch.schema(); - - let mut reservation = - MemoryConsumer::new(format!("sort_batch_stream{}", self.partition_id)) - .register(&self.runtime.memory_pool); - - // TODO: This should probably be try_grow (#5885) - reservation.resize(batch.get_array_memory_size()); - - let fetch = self.fetch; - let expressions = self.expr.clone(); let stream = futures::stream::once(futures::future::lazy(move |_| { - let sorted = sort_batch(&batch, &expressions, fetch)?; - metrics.record_output(sorted.num_rows()); - drop(batch); - reservation.free(); - Ok(sorted) + metrics.record_output(batch.num_rows()); + Ok(batch) })); Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream))) } @@ -629,7 +593,7 @@ impl ExecutionPlan for SortExec { let batch = batch?; sorter.insert_batch(batch).await?; } - sorter.sort() + sorter.merge() }) .try_flatten(), ))) From dcba28fe425da7abf3906ecd32a26d7c795932ee Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Wed, 2 Aug 2023 18:16:36 +0200 Subject: [PATCH 02/10] Default to old way of external sorting in the absence of a LIMIT clause --- .../core/src/physical_plan/sorts/sort.rs | 160 +++++++++++++----- 1 file changed, 115 insertions(+), 45 deletions(-) diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index a2cd1564f899c..0c72356d2dba3 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -19,7 +19,7 @@ //! It will do in-memory sorting if it has enough memory budget //! but spills to disk if needed. -use crate::physical_plan::common::{batch_byte_size, IPCWriter}; +use crate::physical_plan::common::{batch_byte_size, spawn_buffered, IPCWriter}; use crate::physical_plan::expressions::PhysicalSortExpr; use crate::physical_plan::metrics::{ BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, @@ -79,15 +79,15 @@ impl ExternalSorterMetrics { /// Sort arbitrary size of data to get a total order (may spill several times during sorting based on free memory available). /// /// The basic architecture of the algorithm: -/// 1. get a non-empty new batch from input and sort it +/// 1. get a non-empty new batch from input /// 2. check with the memory manager if we could buffer the batch in memory /// 2.1 if memory sufficient, then buffer batch in memory, go to 1. -/// 2.2 if the memory threshold is reached, spill all buffered batches and to file. 
+/// 2.2 if the memory threshold is reached, sort all buffered batches and spill to file. /// buffer the batch in memory, go to 1. /// 3. when input is exhausted, merge all in memory batches and spills to get a total order. struct ExternalSorter { schema: SchemaRef, - in_mem_batches: Vec, + in_mem_batches: Vec<(bool, RecordBatch)>, spills: Vec, /// Sort expressions expr: Arc<[PhysicalSortExpr]>, @@ -136,27 +136,46 @@ impl ExternalSorter { return Ok(()); } - // Eagerly sort the batch to potentially reduce the number of rows - // after applying the fetch parameter; first perform a memory reservation - // for the sorting procedure. - let mut reservation = - MemoryConsumer::new(format!("insert_batch{}", self.partition_id)) - .register(&self.runtime.memory_pool); - - // TODO: This should probably be try_grow (#5885) - reservation.resize(input.get_array_memory_size()); - // Maybe we should keep a single batch at all times and perform - // concatenate with the incoming batch + sort instead? - input = sort_batch(&input, &self.expr, self.fetch)?; - reservation.free(); + let mut batch_sorted = false; + if self.fetch.map_or(false, |f| f < input.num_rows()) { + // Eagerly sort the batch to potentially reduce the number of rows + // after applying the fetch parameter; first perform a memory reservation + // for the sorting procedure. + let mut reservation = + MemoryConsumer::new(format!("insert_batch{}", self.partition_id)) + .register(&self.runtime.memory_pool); + + // TODO: This should probably be try_grow (#5885) + reservation.resize(input.get_array_memory_size()); + // Maybe we should keep a single batch at all times and perform + // concatenate with the incoming batch + sort instead? + input = sort_batch(&input, &self.expr, self.fetch)?; + reservation.free(); + batch_sorted = true; + } let size = batch_byte_size(&input); if self.reservation.try_grow(size).is_err() { - self.spill().await?; - self.reservation.try_grow(size)? + let before = self.reservation.size(); + self.in_mem_sort().await?; + // Sorting may have freed memory, especially if fetch is not `None` + // + // As such we check again, and if the memory usage has dropped by + // a factor of 2, and we can allocate the necessary capacity, + // we don't spill + // + // The factor of 2 aims to avoid a degenerate case where the + // memory required for `fetch` is just under the memory available, + // causing repeated re-sorting of data + if self.reservation.size() > before / 2 + || self.reservation.try_grow(size).is_err() + { + self.spill().await?; + self.reservation.try_grow(size)? + } } - self.in_mem_batches.push(input); + self.in_mem_batches.push((batch_sorted, input)); Ok(()) } @@ -165,12 +184,12 @@ impl ExternalSorter { } /// MergeSort in mem batches as well as spills into total order with `SortPreservingMergeStream`. 
- fn merge(&mut self) -> Result { + fn sort(&mut self) -> Result { if self.spilled_before() { let mut streams = vec![]; if !self.in_mem_batches.is_empty() { let in_mem_stream = - self.in_mem_stream(self.metrics.baseline.intermediate())?; + self.in_mem_sort_stream(self.metrics.baseline.intermediate())?; streams.push(in_mem_stream); } @@ -188,7 +207,7 @@ impl ExternalSorter { self.fetch, ) } else if !self.in_mem_batches.is_empty() { - let result = self.in_mem_stream(self.metrics.baseline.clone()); + let result = self.in_mem_sort_stream(self.metrics.baseline.clone()); // Report to the memory manager we are no longer using memory self.reservation.free(); result @@ -217,11 +236,13 @@ impl ExternalSorter { debug!("Spilling sort data of ExternalSorter to disk whilst inserting"); + self.in_mem_sort().await?; + let spillfile = self.runtime.disk_manager.create_tmp_file("Sorting")?; - let batches = self.in_mem_stream(self.metrics.baseline.intermediate())? - .try_collect() - .await?; + let (sorted, batches): (Vec, Vec) = + std::mem::take(&mut self.in_mem_batches).into_iter().unzip(); + assert_eq!(sorted.iter().all(|&s| s), true); spill_sorted_batches(batches, spillfile.path(), self.schema.clone()).await?; let used = self.reservation.free(); @@ -231,15 +252,39 @@ impl ExternalSorter { Ok(used) } - /// Consumes in_mem_batches returning a stream - fn in_mem_stream( + /// Sorts the in_mem_batches in place + async fn in_mem_sort(&mut self) -> Result<()> { + if self.in_mem_batches.iter().all(|(sorted, _)| *sorted) { + return Ok(()); + } + + self.in_mem_batches = self + .in_mem_sort_stream(self.metrics.baseline.intermediate())? + .try_collect::>() + .await? + .into_iter() + .map(|batch| (true, batch)) + .collect(); + + let size: usize = self + .in_mem_batches + .iter() + .map(|(_, x)| x.get_array_memory_size()) + .sum(); + + self.reservation.resize(size); + Ok(()) + } + + /// Consumes in_mem_batches returning a sorted stream + fn in_mem_sort_stream( &mut self, metrics: BaselineMetrics, ) -> Result { assert_ne!(self.in_mem_batches.len(), 0); if self.in_mem_batches.len() == 1 { - let batch = self.in_mem_batches.remove(0); - let stream = self.sort_batch_stream(batch, metrics)?; + let (sorted, batch) = self.in_mem_batches.remove(0); + let stream = self.sort_batch_stream(batch, sorted, metrics)?; self.in_mem_batches.clear(); return Ok(stream); } @@ -249,17 +294,22 @@ impl ExternalSorter { // This is a very rough heuristic and likely could be refined further if self.reservation.size() < 1048576 { // Concatenate memory batches together and sort - let batch = concat_batches(&self.schema, &self.in_mem_batches)?; + let (sorted, batches): (Vec, Vec) = + std::mem::take(&mut self.in_mem_batches).into_iter().unzip(); + let batch = concat_batches(&self.schema, &batches)?; + let sorted = sorted.iter().all(|&s| s); self.in_mem_batches.clear(); - let output = sort_batch(&batch, &self.expr, self.fetch)?; - return self.sort_batch_stream(output, metrics); + return self.sort_batch_stream(batch, sorted, metrics); } let streams = std::mem::take(&mut self.in_mem_batches) .into_iter() - .map(|batch| { + .map(|(sorted, batch)| { let metrics = self.metrics.baseline.intermediate(); - Ok(self.sort_batch_stream(batch, metrics)?) 
+ Ok(spawn_buffered( + self.sort_batch_stream(batch, sorted, metrics)?, + 1, + )) }) .collect::>()?; @@ -276,14 +326,34 @@ impl ExternalSorter { fn sort_batch_stream( &self, batch: RecordBatch, + sorted: bool, metrics: BaselineMetrics, ) -> Result { let schema = batch.schema(); - let stream = futures::stream::once(futures::future::lazy(move |_| { - metrics.record_output(batch.num_rows()); - Ok(batch) - })); - Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream))) + + if !sorted { + // Reserve some memory for sorting the batch + let mut reservation = + MemoryConsumer::new(format!("sort_batch_stream{}", self.partition_id)) + .register(&self.runtime.memory_pool); + + // TODO: This should probably be try_grow (#5885) + reservation.resize(batch.get_array_memory_size()); + + let fetch = self.fetch; + let expressions = self.expr.clone(); + let stream = futures::stream::once(futures::future::lazy(move |_| { + let output = sort_batch(&batch, &expressions, fetch)?; + metrics.record_output(output.num_rows()); + drop(batch); + reservation.free(); + Ok(output) + })); + Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream))) + } else { + let stream = futures::stream::once(futures::future::lazy(move |_| Ok(batch))); + Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream))) + } } } @@ -424,8 +494,8 @@ impl SortExec { /// Create a new sort execution plan with the option to preserve /// the partitioning of the input plan #[deprecated( - since = "22.0.0", - note = "use `new`, `with_fetch` and `with_preserve_partioning` instead" + since = "22.0.0", + note = "use `new`, `with_fetch` and `with_preserve_partioning` instead" )] pub fn new_with_partitioning( expr: Vec, @@ -593,9 +663,9 @@ impl ExecutionPlan for SortExec { let batch = batch?; sorter.insert_batch(batch).await?; } - sorter.merge() + sorter.sort() }) - .try_flatten(), + .try_flatten(), ))) } @@ -799,7 +869,7 @@ mod tests { ], Arc::new(CoalescePartitionsExec::new(csv)), ) - .with_fetch(fetch), + .with_fetch(fetch), ); let task_ctx = session_ctx.task_ctx(); From c4a40f2aa71e1e62e4f98852cffbbc7d7aab1684 Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Thu, 3 Aug 2023 13:02:44 +0200 Subject: [PATCH 03/10] Fix the edge case of sorting multiple batches with less than 1MB of combined size --- .../core/src/physical_plan/sorts/sort.rs | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 0c72356d2dba3..0d37bae8ec3b2 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -147,8 +147,7 @@ impl ExternalSorter { // TODO: This should probably be try_grow (#5885) reservation.resize(input.get_array_memory_size()); - // Maybe we should keep a single batch at all times and perform - // concatenate with the incoming batch + sort instead? 
+ // Maybe we should perform sorting in a parallel task to unblock the caller input = sort_batch(&input, &self.expr, self.fetch)?; reservation.free(); batch_sorted = true; @@ -242,7 +241,7 @@ impl ExternalSorter { let (sorted, batches): (Vec, Vec) = std::mem::take(&mut self.in_mem_batches).into_iter().unzip(); - assert_eq!(sorted.iter().all(|&s| s), true); + assert!(sorted.iter().all(|&s| s)); spill_sorted_batches(batches, spillfile.path(), self.schema.clone()).await?; let used = self.reservation.free(); @@ -294,12 +293,13 @@ impl ExternalSorter { // This is a very rough heuristic and likely could be refined further if self.reservation.size() < 1048576 { // Concatenate memory batches together and sort - let (sorted, batches): (Vec, Vec) = + let (_, batches): (Vec, Vec) = std::mem::take(&mut self.in_mem_batches).into_iter().unzip(); let batch = concat_batches(&self.schema, &batches)?; - let sorted = sorted.iter().all(|&s| s); self.in_mem_batches.clear(); - return self.sort_batch_stream(batch, sorted, metrics); + // Even if all individual batches were themselves sorted the resulting concatenated one + // isn't guaranteed to be sorted, so we must perform sorting on the stream. + return self.sort_batch_stream(batch, false, metrics); } let streams = std::mem::take(&mut self.in_mem_batches) @@ -494,8 +494,8 @@ impl SortExec { /// Create a new sort execution plan with the option to preserve /// the partitioning of the input plan #[deprecated( - since = "22.0.0", - note = "use `new`, `with_fetch` and `with_preserve_partioning` instead" + since = "22.0.0", + note = "use `new`, `with_fetch` and `with_preserve_partioning` instead" )] pub fn new_with_partitioning( expr: Vec, @@ -665,7 +665,7 @@ impl ExecutionPlan for SortExec { } sorter.sort() }) - .try_flatten(), + .try_flatten(), ))) } @@ -869,7 +869,7 @@ mod tests { ], Arc::new(CoalescePartitionsExec::new(csv)), ) - .with_fetch(fetch), + .with_fetch(fetch), ); let task_ctx = session_ctx.task_ctx(); From ca0786f181c67b53bd55934d7bf0ba8ecfd54976 Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Thu, 3 Aug 2023 17:30:26 +0200 Subject: [PATCH 04/10] Consolidate and extend the sort-spill fuzz tests --- .../core/src/physical_plan/sorts/sort.rs | 10 +- .../core/tests/fuzz_cases/order_spill_fuzz.rs | 137 ++++++++++-------- 2 files changed, 81 insertions(+), 66 deletions(-) diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 0d37bae8ec3b2..d692bb98c177a 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -147,7 +147,6 @@ impl ExternalSorter { // TODO: This should probably be try_grow (#5885) reservation.resize(input.get_array_memory_size()); - // Maybe we should perform sorting in a parallel task to unblock the caller input = sort_batch(&input, &self.expr, self.fetch)?; reservation.free(); batch_sorted = true; @@ -253,7 +252,14 @@ impl ExternalSorter { /// Sorts the in_mem_batches in place async fn in_mem_sort(&mut self) -> Result<()> { - if self.in_mem_batches.iter().all(|(sorted, _)| *sorted) { + if self.in_mem_batches.iter().all(|(sorted, _)| *sorted) && self.fetch.is_none() { + // Do not sort if all the in-mem batches are sorted _and_ there was no `fetch` specified. 
+ // If a `fetch` was specified we could hit a pathological case even if all the batches + // are sorted whereby we have ~100 batches with 1 row each (in case of `LIMIT 1`), and + // it turns out this is a problem when reading from the spills: + // `Failure while reading spill file: NamedTempFile("/var..."). Error: Execution error: channel closed` + // Even if a larger `fetch` was used we would likely benefit from merging the individual + // batches together during sort. return Ok(()); } diff --git a/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs b/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs index 1f72e0fcb45bf..c2e7322465d0e 100644 --- a/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs @@ -30,76 +30,85 @@ use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::physical_plan::{collect, ExecutionPlan}; use datafusion::prelude::{SessionConfig, SessionContext}; use rand::Rng; +use rstest::rstest; use std::sync::Arc; use test_utils::{batches_to_vec, partitions_to_sorted_vec}; -#[tokio::test] -#[cfg_attr(tarpaulin, ignore)] -async fn test_sort_1k_mem() { - run_sort(10240, vec![(5, false), (20000, true), (1000000, true)]).await -} - -#[tokio::test] -#[cfg_attr(tarpaulin, ignore)] -async fn test_sort_100k_mem() { - run_sort(102400, vec![(5, false), (20000, false), (1000000, true)]).await -} - -#[tokio::test] -async fn test_sort_unlimited_mem() { - run_sort( - usize::MAX, - vec![(5, false), (2000, false), (1000000, false)], - ) - .await -} - /// Sort the input using SortExec and ensure the results are correct according to `Vec::sort` -async fn run_sort(pool_size: usize, size_spill: Vec<(usize, bool)>) { - for (size, spill) in size_spill { - let input = vec![make_staggered_batches(size)]; - let first_batch = input - .iter() - .flat_map(|p| p.iter()) - .next() - .expect("at least one batch"); - let schema = first_batch.schema(); - - let sort = vec![PhysicalSortExpr { - expr: col("x", &schema).unwrap(), - options: SortOptions { - descending: false, - nulls_first: true, - }, - }]; - - let exec = MemoryExec::try_new(&input, schema, None).unwrap(); - let sort = Arc::new(SortExec::new(sort, Arc::new(exec))); - - let runtime_config = RuntimeConfig::new() - .with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size))); - let runtime = Arc::new(RuntimeEnv::new(runtime_config).unwrap()); - let session_ctx = SessionContext::with_config_rt(SessionConfig::new(), runtime); - - let task_ctx = session_ctx.task_ctx(); - let collected = collect(sort.clone(), task_ctx).await.unwrap(); - - let expected = partitions_to_sorted_vec(&input); - let actual = batches_to_vec(&collected); - - if spill { - assert_ne!(sort.metrics().unwrap().spill_count().unwrap(), 0); - } else { - assert_eq!(sort.metrics().unwrap().spill_count().unwrap(), 0); - } +#[rstest] +#[case::mem_10k_5_rows(10240, 5, None, false)] +#[case::mem_10k_20k_rows(10240, 20000, None, true)] +#[case::mem_10k_1m_rows(10240, 1000000, None, true)] +#[case::mem_10k_5_rows_fetch_10(10240, 5, Some(10), false)] +#[case::mem_10k_20k_rows_fetch_10(10240, 20000, Some(10), false)] +#[case::mem_10k_1m_rows_fetch_10(10240, 1000000, Some(10), false)] +#[case::mem_10k_5_rows_fetch_1000(10240, 5, Some(1000), false)] +#[case::mem_10k_20k_rows_fetch_1000(10240, 20000, Some(1000), true)] +#[case::mem_10k_1m_rows_fetch_1000(10240, 1000000, Some(1000), true)] +#[case::mem_100k_5_rows(102400, 5, None, false)] +#[case::mem_100k_20k_rows(102400, 20000, None, false)] 
+#[case::mem_100k_1m_rows(102400, 1000000, None, true)] +#[case::mem_100k_5_rows_fetch_10(102400, 5, Some(10), false)] +#[case::mem_100k_20k_rows_fetch_10(102400, 20000, Some(10), false)] +#[case::mem_100k_1m_rows_fetch_10(102400, 1000000, Some(10), false)] +#[case::mem_100k_5_rows_fetch_1000(102400, 5, Some(1000), false)] +#[case::mem_100k_20k_rows_fetch_1000(102400, 20000, Some(1000), false)] +#[case::mem_100k_1m_rows_fetch_1000(102400, 1000000, Some(1000), false)] +#[case::mem_inf_5_rows(usize::MAX, 5, None, false)] +#[case::mem_inf_20k_rows(usize::MAX, 20000, None, false)] +#[case::mem_inf_1m_rows(usize::MAX, 1000000, None, false)] +#[tokio::test] +async fn test_sort_spill( + #[case] pool_size: usize, + #[case] size: usize, + #[case] fetch: Option, + #[case] spill: bool, +) { + let input = vec![make_staggered_batches(size)]; + let first_batch = input + .iter() + .flat_map(|p| p.iter()) + .next() + .expect("at least one batch"); + let schema = first_batch.schema(); + + let sort = vec![PhysicalSortExpr { + expr: col("x", &schema).unwrap(), + options: SortOptions { + descending: false, + nulls_first: true, + }, + }]; + + let exec = MemoryExec::try_new(&input, schema, None).unwrap(); + let sort = Arc::new(SortExec::new(sort, Arc::new(exec)).with_fetch(fetch)); + + let runtime_config = + RuntimeConfig::new().with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size))); + let runtime = Arc::new(RuntimeEnv::new(runtime_config).unwrap()); + let session_ctx = SessionContext::with_config_rt(SessionConfig::new(), runtime); + + let task_ctx = session_ctx.task_ctx(); + let collected = collect(sort.clone(), task_ctx).await.unwrap(); + + let mut expected = partitions_to_sorted_vec(&input); + if let Some(k) = fetch { + expected = expected.into_iter().take(k).collect(); + } + let actual = batches_to_vec(&collected); - assert_eq!( - session_ctx.runtime_env().memory_pool.reserved(), - 0, - "The sort should have returned all memory used back to the memory pool" - ); - assert_eq!(expected, actual, "failure in @ pool_size {pool_size}"); + if spill { + assert_ne!(sort.metrics().unwrap().spill_count().unwrap(), 0); + } else { + assert_eq!(sort.metrics().unwrap().spill_count().unwrap(), 0); } + + assert_eq!( + session_ctx.runtime_env().memory_pool.reserved(), + 0, + "The sort should have returned all memory used back to the memory pool" + ); + assert_eq!(expected, actual, "failure in @ pool_size {pool_size}"); } /// Return randomly sized record batches in a field named 'x' of type `Int32` From 7b1a4d2246d4d46bb0c2ec86b60ee705603a5f1b Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Fri, 4 Aug 2023 14:15:05 +0200 Subject: [PATCH 05/10] Extend memory limit tests for sort with a limit --- .../core/src/physical_plan/sorts/sort.rs | 16 +++--- .../core/tests/fuzz_cases/order_spill_fuzz.rs | 12 ++--- datafusion/core/tests/memory_limit.rs | 49 ++++++++++++++++--- 3 files changed, 57 insertions(+), 20 deletions(-) diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 03ede2351da6f..71a2e9fe94d12 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -196,8 +196,8 @@ impl ExternalSorterMetrics { struct ExternalSorter { /// schema of the output (and the input) schema: SchemaRef, - /// A vector of tuples, considting of a flag denoting whether - /// the batch is sorted, and the batch itself + /// A vector of tuples, with each tuple consisting of a flag + /// denoting whether the batch is sorted, 
and the batch itself in_mem_batches: Vec<(bool, RecordBatch)>, /// If data has previously been spilled, the locations of the /// spill files (in Arrow IPC format) @@ -387,14 +387,18 @@ impl ExternalSorter { /// Sorts the in_mem_batches in place async fn in_mem_sort(&mut self) -> Result<()> { - if self.in_mem_batches.iter().all(|(sorted, _)| *sorted) && self.fetch.is_none() { + if self.in_mem_batches.is_empty() + || self.in_mem_batches.iter().all(|(sorted, _)| *sorted) + && self.fetch.is_none() + { // Do not sort if all the in-mem batches are sorted _and_ there was no `fetch` specified. // If a `fetch` was specified we could hit a pathological case even if all the batches - // are sorted whereby we have ~100 batches with 1 row each (in case of `LIMIT 1`), and - // it turns out this is a problem when reading from the spills: + // are sorted whereby we have ~100 in-mem batches with 1 row each (in case of `LIMIT 1`), + // and then if this gets spilled to disk it turns out this is a problem when reading + // a series of 1-row batches from the spill: // `Failure while reading spill file: NamedTempFile("/var..."). Error: Execution error: channel closed` // Even if a larger `fetch` was used we would likely benefit from merging the individual - // batches together during sort. + // truncated batches together during sort. return Ok(()); } diff --git a/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs b/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs index c2e7322465d0e..bc89f68351fb3 100644 --- a/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs @@ -39,18 +39,18 @@ use test_utils::{batches_to_vec, partitions_to_sorted_vec}; #[case::mem_10k_5_rows(10240, 5, None, false)] #[case::mem_10k_20k_rows(10240, 20000, None, true)] #[case::mem_10k_1m_rows(10240, 1000000, None, true)] -#[case::mem_10k_5_rows_fetch_10(10240, 5, Some(10), false)] -#[case::mem_10k_20k_rows_fetch_10(10240, 20000, Some(10), false)] -#[case::mem_10k_1m_rows_fetch_10(10240, 1000000, Some(10), false)] +#[case::mem_10k_5_rows_fetch_1(10240, 5, Some(1), false)] +#[case::mem_10k_20k_rows_fetch_1(10240, 20000, Some(1), false)] +#[case::mem_10k_1m_rows_fetch_1(10240, 1000000, Some(1), false)] #[case::mem_10k_5_rows_fetch_1000(10240, 5, Some(1000), false)] #[case::mem_10k_20k_rows_fetch_1000(10240, 20000, Some(1000), true)] #[case::mem_10k_1m_rows_fetch_1000(10240, 1000000, Some(1000), true)] #[case::mem_100k_5_rows(102400, 5, None, false)] #[case::mem_100k_20k_rows(102400, 20000, None, false)] #[case::mem_100k_1m_rows(102400, 1000000, None, true)] -#[case::mem_100k_5_rows_fetch_10(102400, 5, Some(10), false)] -#[case::mem_100k_20k_rows_fetch_10(102400, 20000, Some(10), false)] -#[case::mem_100k_1m_rows_fetch_10(102400, 1000000, Some(10), false)] +#[case::mem_100k_5_rows_fetch_1(102400, 5, Some(1), false)] +#[case::mem_100k_20k_rows_fetch_1(102400, 20000, Some(1), false)] +#[case::mem_100k_1m_rows_fetch_1(102400, 1000000, Some(1), false)] #[case::mem_100k_5_rows_fetch_1000(102400, 5, Some(1000), false)] #[case::mem_100k_20k_rows_fetch_1000(102400, 20000, Some(1000), false)] #[case::mem_100k_1m_rows_fetch_1000(102400, 1000000, Some(1000), false)] diff --git a/datafusion/core/tests/memory_limit.rs b/datafusion/core/tests/memory_limit.rs index a7cff6cbd7581..199d715b12a4d 100644 --- a/datafusion/core/tests/memory_limit.rs +++ b/datafusion/core/tests/memory_limit.rs @@ -16,12 +16,14 @@ // under the License. //! 
This module contains tests for limiting memory at runtime in DataFusion +#![allow(clippy::items_after_test_module)] use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use datafusion::physical_optimizer::PhysicalOptimizerRule; use datafusion::physical_plan::streaming::PartitionStream; use futures::StreamExt; +use rstest::rstest; use std::sync::Arc; use datafusion::datasource::streaming::StreamingTable; @@ -45,17 +47,38 @@ fn init() { let _ = env_logger::try_init(); } +#[rstest] +#[case::cant_grow_reservation(vec!["Resources exhausted: Failed to allocate additional", "ExternalSorter"], 100_000)] +#[case::cant_spill_to_disk(vec!["Resources exhausted: Memory Exhausted while Sorting (DiskManager is disabled)"], 200_000)] +#[case::no_oom(vec![], 600_000)] #[tokio::test] -async fn oom_sort() { +async fn sort(#[case] expected_errors: Vec<&str>, #[case] memory_limit: usize) { TestCase::new( "select * from t order by host DESC", - vec![ - "Resources exhausted: Memory Exhausted while Sorting (DiskManager is disabled)", - ], - 200_000, + expected_errors, + memory_limit, + ) + .run() + .await +} + +// We expect to see lower memory thresholds in general when applying a `LIMIT` clause due to eager sorting +#[rstest] +#[case::cant_grow_reservation(vec!["Resources exhausted: Failed to allocate additional", "ExternalSorter"], 20_000)] +#[case::cant_spill_to_disk(vec!["Memory Exhausted while Sorting (DiskManager is disabled)"], 40_000)] +#[case::no_oom(vec![], 80_000)] +#[tokio::test] +async fn sort_with_limit( + #[case] expected_errors: Vec<&str>, + #[case] memory_limit: usize, +) { + TestCase::new( + "select * from t order by host DESC limit 10", + expected_errors, + memory_limit, ) - .run() - .await + .run() + .await } #[tokio::test] @@ -267,9 +290,19 @@ impl TestCase { match df.collect().await { Ok(_batches) => { - panic!("Unexpected success when running, expected memory limit failure") + if !expected_errors.is_empty() { + panic!( + "Unexpected success when running, expected memory limit failure" + ) + } } Err(e) => { + if expected_errors.is_empty() { + panic!( + "Unexpected failure when running, expected sufficient memory {e}" + ) + } + for error_substring in expected_errors { assert_contains!(e.to_string(), error_substring); } From 24d835983c13f36f31b7c165fc53a51cf5d237f7 Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Sat, 5 Aug 2023 09:19:27 +0200 Subject: [PATCH 06/10] Skip in memory sorting if there is only one already sorted batch This holds for both no-fetch and fetch cases equally, unlike the case of multiple sorted batches. --- datafusion/core/src/physical_plan/sorts/sort.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 71a2e9fe94d12..b6c3be91e7857 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -387,9 +387,11 @@ impl ExternalSorter { /// Sorts the in_mem_batches in place async fn in_mem_sort(&mut self) -> Result<()> { - if self.in_mem_batches.is_empty() - || self.in_mem_batches.iter().all(|(sorted, _)| *sorted) - && self.fetch.is_none() + let batch_count = self.in_mem_batches.len(); + let all_batches_sorted = self.in_mem_batches.iter().all(|(sorted, _)| *sorted); + if batch_count == 0 + || batch_count == 1 && all_batches_sorted + || all_batches_sorted && self.fetch.is_none() { // Do not sort if all the in-mem batches are sorted _and_ there was no `fetch` specified. 
// If a `fetch` was specified we could hit a pathological case even if all the batches From 4ce818488198799d32be380ccf1abd886ee7e40e Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Sat, 5 Aug 2023 20:42:19 +0200 Subject: [PATCH 07/10] Alter the sqllogictests window test output to match the new one The new output has swapped frist two rows since the sort column value is the same and with LIMIT in place the sort in unstable. --- datafusion/core/src/physical_plan/sorts/sort.rs | 16 ++++++---------- .../tests/sqllogictests/test_files/window.slt | 4 ++-- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index b6c3be91e7857..0c445c70eadb0 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -85,6 +85,10 @@ impl ExternalSorterMetrics { /// /// 1. get a non-empty new batch from input /// +/// 1.2. if a `fetch` parameter has been provided, and the batch size +/// is larger than `fetch`, sort the incoming batch in order to +/// reduce its size and thus use less memory. +/// /// 2. check with the memory manager there is sufficient space to /// buffer the batch in memory 2.1 if memory sufficient, buffer /// batch in memory, go to 1. @@ -259,16 +263,8 @@ impl ExternalSorter { let mut batch_sorted = false; if self.fetch.map_or(false, |f| f < input.num_rows()) { // Eagerly sort the batch to potentially reduce the number of rows - // after applying the fetch parameter; first perform a memory reservation - // for the sorting procedure. - let mut reservation = - MemoryConsumer::new(format!("insert_batch{}", self.partition_id)) - .register(&self.runtime.memory_pool); - - // TODO: This should probably be try_grow (#5885) - reservation.resize(input.get_array_memory_size()); + // after applying the fetch parameter. input = sort_batch(&input, &self.expr, self.fetch)?; - reservation.free(); batch_sorted = true; } @@ -358,7 +354,7 @@ impl ExternalSorter { } /// Writes any `in_memory_batches` to a spill file and clears - /// the batches. The contents of the spil file are sorted. + /// the batches. The contents of the spill file are sorted. /// /// Returns the amount of memory freed. async fn spill(&mut self) -> Result { diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt b/datafusion/core/tests/sqllogictests/test_files/window.slt index cd257aaa92de7..2df635a63a002 100644 --- a/datafusion/core/tests/sqllogictests/test_files/window.slt +++ b/datafusion/core/tests/sqllogictests/test_files/window.slt @@ -3208,8 +3208,8 @@ SELECT ORDER BY C3 LIMIT 5 ---- -0.970671228336 0.970671228336 0.850672105305 0.850672105305 +0.970671228336 0.970671228336 0.152498292972 0.152498292972 0.369363046006 0.369363046006 0.56535284223 0.56535284223 @@ -3257,8 +3257,8 @@ SELECT ORDER BY C3 LIMIT 5 ---- -0.970671228336 0.014793053078 0.850672105305 0.014793053078 +0.970671228336 0.014793053078 0.152498292972 0.014793053078 0.369363046006 0.014793053078 0.56535284223 0.014793053078 From a22cc714ae82221efda3bc21916ce753b05cf350 Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Mon, 7 Aug 2023 23:00:46 +0200 Subject: [PATCH 08/10] Stream individual batches from the spill files separately Even when a fetch value is specified do not try to merge-stream sorted in-mem batches during the sorting procedure,\n as this seems to introduce time regressions. 
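For reference, the reason individually sorted batches cannot simply be concatenated is that
intra-batch order does not imply inter-batch order; each sorted batch has to be fed to the
merge as its own sorted run. A minimal, dependency-free sketch of that merge step follows
(plain std Rust for illustration only; the function and names are made up here and this is
not the DataFusion `streaming_merge` API):

use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge independently sorted runs into one totally ordered output,
/// optionally stopping once `fetch` rows have been produced.
fn merge_sorted_runs(runs: Vec<Vec<i32>>, fetch: Option<usize>) -> Vec<i32> {
    // Min-heap over (value, run index, position within run).
    let mut heap = BinaryHeap::new();
    for (run_idx, run) in runs.iter().enumerate() {
        if let Some(&head) = run.first() {
            heap.push(Reverse((head, run_idx, 0usize)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((value, run_idx, pos))) = heap.pop() {
        out.push(value);
        if fetch.map_or(false, |f| out.len() >= f) {
            break; // LIMIT reached, the remaining runs need not be drained
        }
        if let Some(&next) = runs[run_idx].get(pos + 1) {
            heap.push(Reverse((next, run_idx, pos + 1)));
        }
    }
    out
}

fn main() {
    // Two sorted runs whose plain concatenation would not be sorted.
    let runs = vec![vec![1, 5, 9], vec![2, 3, 10]];
    assert_eq!(merge_sorted_runs(runs.clone(), None), vec![1, 2, 3, 5, 9, 10]);
    assert_eq!(merge_sorted_runs(runs, Some(2)), vec![1, 2]);
}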
--- .../core/src/physical_plan/sorts/sort.rs | 79 ++++++++++--------- .../core/tests/fuzz_cases/order_spill_fuzz.rs | 14 ++-- datafusion/core/tests/memory_limit.rs | 2 +- 3 files changed, 51 insertions(+), 44 deletions(-) diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 0c445c70eadb0..566fbdd20b66c 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -35,6 +35,7 @@ use arrow::compute::{concat_batches, lexsort_to_indices, take}; use arrow::datatypes::SchemaRef; use arrow::ipc::reader::FileReader; use arrow::record_batch::RecordBatch; +use arrow_schema::ArrowError; use datafusion_common::{plan_err, DataFusionError, Result}; use datafusion_execution::memory_pool::{ human_readable_size, MemoryConsumer, MemoryReservation, @@ -45,6 +46,7 @@ use datafusion_physical_expr::EquivalenceProperties; use futures::{StreamExt, TryStreamExt}; use log::{debug, error, trace}; use std::any::Any; +use std::ffi::OsString; use std::fmt; use std::fmt::{Debug, Formatter}; use std::fs::File; @@ -52,7 +54,6 @@ use std::io::BufReader; use std::path::{Path, PathBuf}; use std::sync::Arc; use tempfile::NamedTempFile; -use tokio::sync::mpsc::Sender; use tokio::task; struct ExternalSorterMetrics { @@ -316,8 +317,9 @@ impl ExternalSorter { } for spill in self.spills.drain(..) { - let stream = read_spill_as_stream(spill, self.schema.clone())?; - streams.push(stream); + let spill_streams = + read_spill_as_streams(spill.path(), self.schema.clone())?; + streams.extend(spill_streams); } streaming_merge( @@ -385,18 +387,7 @@ impl ExternalSorter { async fn in_mem_sort(&mut self) -> Result<()> { let batch_count = self.in_mem_batches.len(); let all_batches_sorted = self.in_mem_batches.iter().all(|(sorted, _)| *sorted); - if batch_count == 0 - || batch_count == 1 && all_batches_sorted - || all_batches_sorted && self.fetch.is_none() - { - // Do not sort if all the in-mem batches are sorted _and_ there was no `fetch` specified. - // If a `fetch` was specified we could hit a pathological case even if all the batches - // are sorted whereby we have ~100 in-mem batches with 1 row each (in case of `LIMIT 1`), - // and then if this gets spilled to disk it turns out this is a problem when reading - // a series of 1-row batches from the spill: - // `Failure while reading spill file: NamedTempFile("/var..."). Error: Execution error: channel closed` - // Even if a larger `fetch` was used we would likely benefit from merging the individual - // truncated batches together during sort. + if batch_count == 0 || all_batches_sorted { return Ok(()); } @@ -604,22 +595,6 @@ async fn spill_sorted_batches( } } -fn read_spill_as_stream( - path: NamedTempFile, - schema: SchemaRef, -) -> Result { - let mut builder = RecordBatchReceiverStream::builder(schema, 2); - let sender = builder.tx(); - - builder.spawn_blocking(move || { - if let Err(e) = read_spill(sender, path.path()) { - error!("Failure while reading spill file: {:?}. Error: {}", path, e); - } - }); - - Ok(builder.build()) -} - fn write_sorted( batches: Vec, path: PathBuf, @@ -639,15 +614,47 @@ fn write_sorted( Ok(()) } -fn read_spill(sender: Sender>, path: &Path) -> Result<()> { +/// Stream batches from spill files. +/// +/// Each spill file has one or more batches. 
Intra-batch order is guaranteed (each one is sorted), +/// but the inter-batch ordering is not guaranteed, hence why we need to convert each batch from the +/// spill to a separate input stream for the merge-sort procedure. +fn read_spill_as_streams( + path: &Path, + schema: SchemaRef, +) -> Result> { let file = BufReader::new(File::open(path)?); let reader = FileReader::try_new(file, None)?; + + let mut streams = vec![]; + let file_path = path.as_os_str().to_os_string(); for batch in reader { - sender - .blocking_send(batch.map_err(Into::into)) - .map_err(|e| DataFusionError::Execution(format!("{e}")))?; + let stream = build_receiver_stream(batch, schema.clone(), file_path.clone()); + streams.push(stream); } - Ok(()) + Ok(streams) +} + +fn build_receiver_stream( + maybe_batch: Result, + schema: SchemaRef, + file_path: OsString, +) -> SendableRecordBatchStream { + let mut builder = RecordBatchReceiverStream::builder(schema.clone(), 2); + let sender = builder.tx(); + + builder.spawn_blocking(move || { + if let Err(e) = sender + .blocking_send(maybe_batch.map_err(Into::into)) + .map_err(|e| DataFusionError::Execution(format!("{e}"))) + { + error!( + "Failure while reading spill file: {:?}. Error: {}", + file_path, e + ); + } + }); + builder.build() } /// Sort execution plan. diff --git a/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs b/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs index bc89f68351fb3..98075c675878f 100644 --- a/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs @@ -41,19 +41,19 @@ use test_utils::{batches_to_vec, partitions_to_sorted_vec}; #[case::mem_10k_1m_rows(10240, 1000000, None, true)] #[case::mem_10k_5_rows_fetch_1(10240, 5, Some(1), false)] #[case::mem_10k_20k_rows_fetch_1(10240, 20000, Some(1), false)] -#[case::mem_10k_1m_rows_fetch_1(10240, 1000000, Some(1), false)] +#[case::mem_10k_1m_rows_fetch_1(10240, 1000000, Some(1), true)] #[case::mem_10k_5_rows_fetch_1000(10240, 5, Some(1000), false)] #[case::mem_10k_20k_rows_fetch_1000(10240, 20000, Some(1000), true)] #[case::mem_10k_1m_rows_fetch_1000(10240, 1000000, Some(1000), true)] #[case::mem_100k_5_rows(102400, 5, None, false)] #[case::mem_100k_20k_rows(102400, 20000, None, false)] #[case::mem_100k_1m_rows(102400, 1000000, None, true)] -#[case::mem_100k_5_rows_fetch_1(102400, 5, Some(1), false)] -#[case::mem_100k_20k_rows_fetch_1(102400, 20000, Some(1), false)] -#[case::mem_100k_1m_rows_fetch_1(102400, 1000000, Some(1), false)] -#[case::mem_100k_5_rows_fetch_1000(102400, 5, Some(1000), false)] -#[case::mem_100k_20k_rows_fetch_1000(102400, 20000, Some(1000), false)] -#[case::mem_100k_1m_rows_fetch_1000(102400, 1000000, Some(1000), false)] +#[case::mem_100k_5_rows_fetch_10(102400, 5, Some(10), false)] +#[case::mem_100k_20k_rows_fetch_10(102400, 20000, Some(10), false)] +#[case::mem_100k_1m_rows_fetch_10(102400, 1000000, Some(10), false)] +#[case::mem_100k_5_rows_fetch_10000(102400, 5, Some(10000), false)] +#[case::mem_100k_20k_rows_fetch_10000(102400, 20000, Some(10000), false)] +#[case::mem_100k_1m_rows_fetch_10000(102400, 1000000, Some(10000), false)] #[case::mem_inf_5_rows(usize::MAX, 5, None, false)] #[case::mem_inf_20k_rows(usize::MAX, 20000, None, false)] #[case::mem_inf_1m_rows(usize::MAX, 1000000, None, false)] diff --git a/datafusion/core/tests/memory_limit.rs b/datafusion/core/tests/memory_limit.rs index 199d715b12a4d..27d768748e265 100644 --- a/datafusion/core/tests/memory_limit.rs +++ b/datafusion/core/tests/memory_limit.rs @@ -66,7 
+66,7 @@ async fn sort(#[case] expected_errors: Vec<&str>, #[case] memory_limit: usize) { #[rstest] #[case::cant_grow_reservation(vec!["Resources exhausted: Failed to allocate additional", "ExternalSorter"], 20_000)] #[case::cant_spill_to_disk(vec!["Memory Exhausted while Sorting (DiskManager is disabled)"], 40_000)] -#[case::no_oom(vec![], 80_000)] +//#[case::no_oom(vec![], 80_000)] #[tokio::test] async fn sort_with_limit( #[case] expected_errors: Vec<&str>, From fff49abd8f87622dd14b68cee4c98d45eff3e142 Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Tue, 8 Aug 2023 17:14:39 +0200 Subject: [PATCH 09/10] Bring back streaming of entire spills in a single stream This is to cover the case when the inter-batch order is guaranteed. --- .../core/src/physical_plan/sorts/sort.rs | 126 ++++++++++++------ .../core/tests/fuzz_cases/order_spill_fuzz.rs | 4 + .../tests/sqllogictests/test_files/window.slt | 2 +- 3 files changed, 89 insertions(+), 43 deletions(-) diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 566fbdd20b66c..02aa1b8f2d5b8 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -35,7 +35,6 @@ use arrow::compute::{concat_batches, lexsort_to_indices, take}; use arrow::datatypes::SchemaRef; use arrow::ipc::reader::FileReader; use arrow::record_batch::RecordBatch; -use arrow_schema::ArrowError; use datafusion_common::{plan_err, DataFusionError, Result}; use datafusion_execution::memory_pool::{ human_readable_size, MemoryConsumer, MemoryReservation, @@ -46,7 +45,6 @@ use datafusion_physical_expr::EquivalenceProperties; use futures::{StreamExt, TryStreamExt}; use log::{debug, error, trace}; use std::any::Any; -use std::ffi::OsString; use std::fmt; use std::fmt::{Debug, Formatter}; use std::fs::File; @@ -54,6 +52,7 @@ use std::io::BufReader; use std::path::{Path, PathBuf}; use std::sync::Arc; use tempfile::NamedTempFile; +use tokio::sync::mpsc::Sender; use tokio::task; struct ExternalSorterMetrics { @@ -204,9 +203,15 @@ struct ExternalSorter { /// A vector of tuples, with each tuple consisting of a flag /// denoting whether the batch is sorted, and the batch itself in_mem_batches: Vec<(bool, RecordBatch)>, - /// If data has previously been spilled, the locations of the - /// spill files (in Arrow IPC format) - spills: Vec, + /// A flag denoting whether the inter-batch order is guaranteed; + /// note that this is a stronger signal than just having all + /// individual batches sorted—it means that we can stream the + /// entire vector of batches inside one stream for the merge-sort + inter_batch_order: bool, + /// If data has previously been spilled, a vector of tuples: + /// 0. flag denoting whether the inter-batch order was true on spill + /// 1. the location of the spill file (in Arrow IPC format) + spills: Vec<(bool, NamedTempFile)>, /// Sort expressions expr: Arc<[PhysicalSortExpr]>, /// Runtime metrics @@ -242,6 +247,7 @@ impl ExternalSorter { Self { schema, in_mem_batches: vec![], + inter_batch_order: false, spills: vec![], expr: expr.into(), metrics, @@ -262,9 +268,13 @@ impl ExternalSorter { } let mut batch_sorted = false; - if self.fetch.map_or(false, |f| f < input.num_rows()) { + if self + .fetch + .map_or(false, |f| f <= input.num_rows() && f <= 100) + { // Eagerly sort the batch to potentially reduce the number of rows // after applying the fetch parameter. + // Currently only applied for fetch of 100 rows or less. 
input = sort_batch(&input, &self.expr, self.fetch)?; batch_sorted = true; } @@ -291,6 +301,7 @@ impl ExternalSorter { } self.in_mem_batches.push((batch_sorted, input)); + self.inter_batch_order = false; Ok(()) } @@ -316,10 +327,17 @@ impl ExternalSorter { streams.push(in_mem_stream); } - for spill in self.spills.drain(..) { - let spill_streams = - read_spill_as_streams(spill.path(), self.schema.clone())?; - streams.extend(spill_streams); + for (inter_batch_order, spill) in self.spills.drain(..) { + if inter_batch_order { + // We can read all the batches into a single stream + let spill_stream = read_spill_as_stream(spill, self.schema.clone())?; + streams.push(spill_stream); + } else { + // We need to assign each batch to a separate stream + let spill_streams = + read_spill_as_streams(spill.path(), self.schema.clone())?; + streams.extend(spill_streams); + } } streaming_merge( @@ -379,7 +397,7 @@ impl ExternalSorter { let used = self.reservation.free(); self.metrics.spill_count.add(1); self.metrics.spilled_bytes.add(used); - self.spills.push(spillfile); + self.spills.push((self.inter_batch_order, spillfile)); Ok(used) } @@ -398,6 +416,8 @@ impl ExternalSorter { .into_iter() .map(|batch| (true, batch)) .collect(); + // We're now also guaranteed that the inter-batch order holds + self.inter_batch_order = true; let size: usize = self .in_mem_batches @@ -479,17 +499,16 @@ impl ExternalSorter { return Ok(stream); } - // If less than 1MB of in-memory data, concatenate and sort in place + // If less than 1MB of in-memory data and no batch is sorted, concatenate and sort in place // // This is a very rough heuristic and likely could be refined further - if self.reservation.size() < 1048576 { + let no_batches_sorted = !self.in_mem_batches.iter().any(|(sorted, _)| *sorted); + if self.reservation.size() < 1048576 && no_batches_sorted { // Concatenate memory batches together and sort let (_, batches): (Vec, Vec) = std::mem::take(&mut self.in_mem_batches).into_iter().unzip(); let batch = concat_batches(&self.schema, &batches)?; self.in_mem_batches.clear(); - // Even if all individual batches were themselves sorted the resulting concatenated one - // isn't guaranteed to be sorted, so we must perform sorting on the stream. return self.sort_batch_stream(batch, false, metrics); } @@ -614,11 +633,40 @@ fn write_sorted( Ok(()) } -/// Stream batches from spill files. +/// Stream batches from spill files inside a single stream. +fn read_spill_as_stream( + path: NamedTempFile, + schema: SchemaRef, +) -> Result { + let mut builder = RecordBatchReceiverStream::builder(schema, 2); + let sender = builder.tx(); + + builder.spawn_blocking(move || { + if let Err(e) = read_spill(sender, path.path()) { + error!("Failure while reading spill file: {:?}. Error: {}", path, e); + } + }); + + Ok(builder.build()) +} + +fn read_spill(sender: Sender>, path: &Path) -> Result<()> { + let file = BufReader::new(File::open(path)?); + let reader = FileReader::try_new(file, None)?; + for batch in reader { + sender + .blocking_send(batch.map_err(Into::into)) + .map_err(|e| DataFusionError::Execution(format!("{e}")))?; + } + Ok(()) +} + +/// Stream batches from spill files inside separate streams. /// /// Each spill file has one or more batches. Intra-batch order is guaranteed (each one is sorted), -/// but the inter-batch ordering is not guaranteed, hence why we need to convert each batch from the -/// spill to a separate input stream for the merge-sort procedure. +/// but the inter-batch ordering is not always guaranteed, i.e. 
when `fetch` is `Some` and we do +/// eager sorting. Hence in this case we need to convert each batch from the spill to a separate +/// input stream for the merge-sort procedure. fn read_spill_as_streams( path: &Path, schema: SchemaRef, @@ -628,35 +676,29 @@ fn read_spill_as_streams( let mut streams = vec![]; let file_path = path.as_os_str().to_os_string(); + for batch in reader { - let stream = build_receiver_stream(batch, schema.clone(), file_path.clone()); - streams.push(stream); + let mut builder = RecordBatchReceiverStream::builder(schema.clone(), 2); + let sender = Arc::new(builder.tx()); + + let spill_path = file_path.clone(); + builder.spawn_blocking(move || { + if let Err(e) = sender + .blocking_send(batch.map_err(Into::into)) + .map_err(|e| DataFusionError::Execution(format!("{e}"))) + { + error!( + "Failure while reading spill file: {:?}. Error: {}", + spill_path, e + ); + } + }); + + streams.push(builder.build()); } Ok(streams) } -fn build_receiver_stream( - maybe_batch: Result, - schema: SchemaRef, - file_path: OsString, -) -> SendableRecordBatchStream { - let mut builder = RecordBatchReceiverStream::builder(schema.clone(), 2); - let sender = builder.tx(); - - builder.spawn_blocking(move || { - if let Err(e) = sender - .blocking_send(maybe_batch.map_err(Into::into)) - .map_err(|e| DataFusionError::Execution(format!("{e}"))) - { - error!( - "Failure while reading spill file: {:?}. Error: {}", - file_path, e - ); - } - }); - builder.build() -} - /// Sort execution plan. /// /// Support sorting datasets that are larger than the memory allotted diff --git a/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs b/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs index 98075c675878f..af9ef7e564fe0 100644 --- a/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs @@ -54,6 +54,10 @@ use test_utils::{batches_to_vec, partitions_to_sorted_vec}; #[case::mem_100k_5_rows_fetch_10000(102400, 5, Some(10000), false)] #[case::mem_100k_20k_rows_fetch_10000(102400, 20000, Some(10000), false)] #[case::mem_100k_1m_rows_fetch_10000(102400, 1000000, Some(10000), false)] +// Test with mem > 1MB to exercise the intermediate streaming-merge on spills +// (i.e. 
skip the <1MB concatenation heuristic) +#[case::mem_2m_1m_rows(2097152, 500000, None, true)] +#[case::mem_2m_10m_rows_fetch_10(2097152, 10000000, Some(10), false)] #[case::mem_inf_5_rows(usize::MAX, 5, None, false)] #[case::mem_inf_20k_rows(usize::MAX, 20000, None, false)] #[case::mem_inf_1m_rows(usize::MAX, 1000000, None, false)] diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt b/datafusion/core/tests/sqllogictests/test_files/window.slt index 2df635a63a002..4f129eb0e0762 100644 --- a/datafusion/core/tests/sqllogictests/test_files/window.slt +++ b/datafusion/core/tests/sqllogictests/test_files/window.slt @@ -2672,8 +2672,8 @@ SELECT ORDER BY ts DESC LIMIT 5; ---- -289 269 305 305 305 283 100 100 99 99 86 86 301 296 301 1004 305 305 301 301 1001 1002 1001 289 289 266 305 305 305 278 99 99 99 99 86 86 296 291 296 1004 305 305 301 296 305 1002 305 286 +289 269 305 305 305 283 100 100 99 99 86 86 301 296 301 1004 305 305 301 301 1001 1002 1001 289 289 261 296 301 NULL 275 98 98 98 98 85 85 291 289 291 1004 305 305 296 291 301 305 301 283 286 259 291 296 NULL 272 97 97 97 97 84 84 289 286 289 1004 305 305 291 289 296 301 296 278 275 254 289 291 289 269 96 96 96 96 83 83 286 283 286 305 305 305 289 286 291 296 291 275 From daa93378d418de286d301f161c655a6c5d15493f Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Tue, 8 Aug 2023 22:41:01 +0200 Subject: [PATCH 10/10] Consolidate and simplify the top-k eager sorting logic --- .../core/src/physical_plan/sorts/sort.rs | 73 +++---------------- .../core/tests/fuzz_cases/order_spill_fuzz.rs | 4 +- datafusion/core/tests/memory_limit.rs | 2 +- 3 files changed, 15 insertions(+), 64 deletions(-) diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 02aa1b8f2d5b8..1ca9d21128f0f 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -207,11 +207,10 @@ struct ExternalSorter { /// note that this is a stronger signal than just having all /// individual batches sorted—it means that we can stream the /// entire vector of batches inside one stream for the merge-sort - inter_batch_order: bool, - /// If data has previously been spilled, a vector of tuples: - /// 0. flag denoting whether the inter-batch order was true on spill - /// 1. the location of the spill file (in Arrow IPC format) - spills: Vec<(bool, NamedTempFile)>, + in_mem_batches_sorted: bool, + /// If data has previously been spilled, the locations of the + /// spill files (in Arrow IPC format) + spills: Vec, /// Sort expressions expr: Arc<[PhysicalSortExpr]>, /// Runtime metrics @@ -247,7 +246,7 @@ impl ExternalSorter { Self { schema, in_mem_batches: vec![], - inter_batch_order: false, + in_mem_batches_sorted: false, spills: vec![], expr: expr.into(), metrics, @@ -301,7 +300,7 @@ impl ExternalSorter { } self.in_mem_batches.push((batch_sorted, input)); - self.inter_batch_order = false; + self.in_mem_batches_sorted = batch_sorted && self.in_mem_batches.len() == 1; Ok(()) } @@ -327,17 +326,9 @@ impl ExternalSorter { streams.push(in_mem_stream); } - for (inter_batch_order, spill) in self.spills.drain(..) 
{ - if inter_batch_order { - // We can read all the batches into a single stream - let spill_stream = read_spill_as_stream(spill, self.schema.clone())?; - streams.push(spill_stream); - } else { - // We need to assign each batch to a separate stream - let spill_streams = - read_spill_as_streams(spill.path(), self.schema.clone())?; - streams.extend(spill_streams); - } + for spill in self.spills.drain(..) { + let stream = read_spill_as_stream(spill, self.schema.clone())?; + streams.push(stream); } streaming_merge( @@ -397,15 +388,13 @@ impl ExternalSorter { let used = self.reservation.free(); self.metrics.spill_count.add(1); self.metrics.spilled_bytes.add(used); - self.spills.push((self.inter_batch_order, spillfile)); + self.spills.push(spillfile); Ok(used) } /// Sorts the in_mem_batches in place async fn in_mem_sort(&mut self) -> Result<()> { - let batch_count = self.in_mem_batches.len(); - let all_batches_sorted = self.in_mem_batches.iter().all(|(sorted, _)| *sorted); - if batch_count == 0 || all_batches_sorted { + if self.in_mem_batches.is_empty() || self.in_mem_batches_sorted { return Ok(()); } @@ -417,7 +406,7 @@ impl ExternalSorter { .map(|batch| (true, batch)) .collect(); // We're now also guaranteed that the inter-batch order holds - self.inter_batch_order = true; + self.in_mem_batches_sorted = true; let size: usize = self .in_mem_batches @@ -661,44 +650,6 @@ fn read_spill(sender: Sender>, path: &Path) -> Result<()> { Ok(()) } -/// Stream batches from spill files inside separate streams. -/// -/// Each spill file has one or more batches. Intra-batch order is guaranteed (each one is sorted), -/// but the inter-batch ordering is not always guaranteed, i.e. when `fetch` is `Some` and we do -/// eager sorting. Hence in this case we need to convert each batch from the spill to a separate -/// input stream for the merge-sort procedure. -fn read_spill_as_streams( - path: &Path, - schema: SchemaRef, -) -> Result> { - let file = BufReader::new(File::open(path)?); - let reader = FileReader::try_new(file, None)?; - - let mut streams = vec![]; - let file_path = path.as_os_str().to_os_string(); - - for batch in reader { - let mut builder = RecordBatchReceiverStream::builder(schema.clone(), 2); - let sender = Arc::new(builder.tx()); - - let spill_path = file_path.clone(); - builder.spawn_blocking(move || { - if let Err(e) = sender - .blocking_send(batch.map_err(Into::into)) - .map_err(|e| DataFusionError::Execution(format!("{e}"))) - { - error!( - "Failure while reading spill file: {:?}. Error: {}", - spill_path, e - ); - } - }); - - streams.push(builder.build()); - } - Ok(streams) -} - /// Sort execution plan. 
/// /// Support sorting datasets that are larger than the memory allotted diff --git a/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs b/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs index af9ef7e564fe0..f77ebc47fa2ae 100644 --- a/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs @@ -41,7 +41,7 @@ use test_utils::{batches_to_vec, partitions_to_sorted_vec}; #[case::mem_10k_1m_rows(10240, 1000000, None, true)] #[case::mem_10k_5_rows_fetch_1(10240, 5, Some(1), false)] #[case::mem_10k_20k_rows_fetch_1(10240, 20000, Some(1), false)] -#[case::mem_10k_1m_rows_fetch_1(10240, 1000000, Some(1), true)] +#[case::mem_10k_1m_rows_fetch_1(10240, 1000000, Some(1), false)] #[case::mem_10k_5_rows_fetch_1000(10240, 5, Some(1000), false)] #[case::mem_10k_20k_rows_fetch_1000(10240, 20000, Some(1000), true)] #[case::mem_10k_1m_rows_fetch_1000(10240, 1000000, Some(1000), true)] @@ -57,7 +57,7 @@ use test_utils::{batches_to_vec, partitions_to_sorted_vec}; // Test with mem > 1MB to exercise the intermediate streaming-merge on spills // (i.e. skip the <1MB concatenation heuristic) #[case::mem_2m_1m_rows(2097152, 500000, None, true)] -#[case::mem_2m_10m_rows_fetch_10(2097152, 10000000, Some(10), false)] +#[case::mem_2m_10m_rows_fetch_100(2097152, 10000000, Some(100), false)] #[case::mem_inf_5_rows(usize::MAX, 5, None, false)] #[case::mem_inf_20k_rows(usize::MAX, 20000, None, false)] #[case::mem_inf_1m_rows(usize::MAX, 1000000, None, false)] diff --git a/datafusion/core/tests/memory_limit.rs b/datafusion/core/tests/memory_limit.rs index 27d768748e265..199d715b12a4d 100644 --- a/datafusion/core/tests/memory_limit.rs +++ b/datafusion/core/tests/memory_limit.rs @@ -66,7 +66,7 @@ async fn sort(#[case] expected_errors: Vec<&str>, #[case] memory_limit: usize) { #[rstest] #[case::cant_grow_reservation(vec!["Resources exhausted: Failed to allocate additional", "ExternalSorter"], 20_000)] #[case::cant_spill_to_disk(vec!["Memory Exhausted while Sorting (DiskManager is disabled)"], 40_000)] -//#[case::no_oom(vec![], 80_000)] +#[case::no_oom(vec![], 80_000)] #[tokio::test] async fn sort_with_limit( #[case] expected_errors: Vec<&str>,
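
To make the end state of the series concrete, the following is a minimal, dependency-free
sketch of the eager top-k idea (plain std Rust; `eager_insert` and `final_topk` are
illustrative names, not DataFusion's `ExternalSorter` API): when a `fetch` of k rows is
requested, every incoming batch larger than k can be sorted and truncated to its k smallest
rows before being buffered, which bounds the memory held per batch while still preserving
the global top-k after the final merge.

/// Sort and truncate an incoming batch to at most `fetch` rows before buffering it.
fn eager_insert(buffered: &mut Vec<Vec<i32>>, mut batch: Vec<i32>, fetch: Option<usize>) {
    if let Some(k) = fetch {
        if batch.len() > k {
            batch.sort_unstable();
            batch.truncate(k); // keep only this batch's k smallest rows
        }
    }
    buffered.push(batch);
}

/// Stand-in for the final streaming merge: combine all buffered rows and apply the fetch.
fn final_topk(buffered: Vec<Vec<i32>>, fetch: Option<usize>) -> Vec<i32> {
    let mut all: Vec<i32> = buffered.into_iter().flatten().collect();
    all.sort_unstable();
    if let Some(k) = fetch {
        all.truncate(k);
    }
    all
}

fn main() {
    let fetch = Some(3);
    let mut buffered = Vec::new();
    for batch in [vec![9, 1, 7, 5], vec![4, 8, 2], vec![6, 3]] {
        eager_insert(&mut buffered, batch, fetch);
    }
    // Each buffered batch now holds at most 3 rows, yet the global top-3 survives:
    // any row dropped from a batch had at least 3 smaller rows in that same batch.
    assert_eq!(final_topk(buffered, fetch), vec![1, 2, 3]);
}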