From 696b68c99cef42ff0aee2b6dab81d27d2cebbaae Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Fri, 28 Mar 2025 15:18:16 +0800 Subject: [PATCH 1/3] fix assertion fail in external sort by refactoring --- datafusion/core/tests/memory_limit/mod.rs | 32 ++++++++ datafusion/physical-plan/src/sorts/sort.rs | 88 ++++++++++++---------- 2 files changed, 81 insertions(+), 39 deletions(-) diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index 6a0a797d4dedc..3bf2ac975e9f1 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -26,6 +26,7 @@ mod memory_limit_validation; use arrow::array::{ArrayRef, DictionaryArray, Int32Array, RecordBatch, StringViewArray}; use arrow::compute::SortOptions; use arrow::datatypes::{Int32Type, SchemaRef}; +use arrow::util::pretty; use arrow_schema::{DataType, Field, Schema}; use datafusion::assert_batches_eq; use datafusion::datasource::memory::MemorySourceConfig; @@ -49,6 +50,7 @@ use datafusion_expr::{Expr, TableType}; use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr}; use datafusion_physical_optimizer::join_selection::JoinSelection; use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_plan::common::collect; use datafusion_physical_plan::spill::get_record_batch_memory_size; use rand::Rng; use test_utils::AccessLogGenerator; @@ -493,6 +495,36 @@ async fn test_in_mem_buffer_almost_full() { let _ = df.collect().await.unwrap(); } +/// External sort should be able to run if there is very little pre-reserved memory +/// for merge (set configuration sort_spill_reservation_bytes to 0). +#[tokio::test] +async fn test_external_sort_zero_merge_reservation() { + let config = SessionConfig::new() + .with_sort_spill_reservation_bytes(0) + .with_target_partitions(14); + let runtime = RuntimeEnvBuilder::new() + .with_memory_pool(Arc::new(FairSpillPool::new(10 * 1024 * 1024))) + .build_arc() + .unwrap(); + + let ctx = SessionContext::new_with_config_rt(config, runtime); + + let query = "select * from generate_series(1,10000000) as t1(v1) order by v1;"; + let df = ctx.sql(query).await.unwrap(); + + let physical_plan = df.create_physical_plan().await.unwrap(); + let task_ctx = Arc::new(TaskContext::from(&ctx.state())); + let stream = physical_plan.execute(0, task_ctx).unwrap(); + + // Ensures execution succeed + let _result = collect(stream).await; + + // Ensures the query spilled during execution + let metrics = physical_plan.metrics().unwrap(); + let spill_count = metrics.spill_count().unwrap(); + assert!(spill_count > 0); +} + /// Run the query with the specified memory limit, /// and verifies the expected errors are returned #[derive(Clone, Debug)] diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index ed35492041be0..35203cb9e4194 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -216,10 +216,8 @@ struct ExternalSorter { // STATE BUFFERS: // Fields that hold intermediate data during sorting // ======================================================================== - /// Potentially unsorted in memory buffer + /// Unsorted input batches stored in the memory buffer in_mem_batches: Vec, - /// if `Self::in_mem_batches` are sorted - in_mem_batches_sorted: bool, /// During external sorting, in-memory intermediate data will be appended to /// this file incrementally. Once finished, this file will be moved to [`Self::finished_spill_files`]. @@ -284,7 +282,6 @@ impl ExternalSorter { Self { schema, in_mem_batches: vec![], - in_mem_batches_sorted: false, in_progress_spill_file: None, finished_spill_files: vec![], expr: expr.into(), @@ -320,7 +317,6 @@ impl ExternalSorter { } self.in_mem_batches.push(input); - self.in_mem_batches_sorted = false; Ok(()) } @@ -397,16 +393,13 @@ impl ExternalSorter { self.metrics.spill_metrics.spill_file_count.value() } - /// When calling, all `in_mem_batches` must be sorted (*), and then all of them will - /// be appended to the in-progress spill file. - /// - /// (*) 'Sorted' here means globally sorted for all buffered batches when the - /// memory limit is reached, instead of partially sorted within the batch. - async fn spill_append(&mut self) -> Result<()> { - assert!(self.in_mem_batches_sorted); - - // we could always get a chance to free some memory as long as we are holding some - if self.in_mem_batches.is_empty() { + /// Appending globally sorted batches to the in-progress spill file, and clears + /// the `globally_sorted_batches` (also its memory reservation) afterwards. + async fn consume_and_spill_append( + &mut self, + globally_sorted_batches: &mut Vec, + ) -> Result<()> { + if globally_sorted_batches.is_empty() { return Ok(()); } @@ -416,21 +409,23 @@ impl ExternalSorter { Some(self.spill_manager.create_in_progress_file("Sorting")?); } - self.organize_stringview_arrays()?; + Self::organize_stringview_arrays(globally_sorted_batches)?; debug!("Spilling sort data of ExternalSorter to disk whilst inserting"); - let batches = std::mem::take(&mut self.in_mem_batches); + let batches_to_spill = std::mem::take(globally_sorted_batches); self.reservation.free(); let in_progress_file = self.in_progress_spill_file.as_mut().ok_or_else(|| { internal_datafusion_err!("In-progress spill file should be initialized") })?; - for batch in batches { + for batch in batches_to_spill { in_progress_file.append_batch(&batch)?; } + assert!(globally_sorted_batches.is_empty()); + Ok(()) } @@ -449,7 +444,7 @@ impl ExternalSorter { Ok(()) } - /// Reconstruct `self.in_mem_batches` to organize the payload buffers of each + /// Reconstruct `globally_sorted_batches` to organize the payload buffers of each /// `StringViewArray` in sequential order by calling `gc()` on them. /// /// Note this is a workaround until is @@ -478,10 +473,12 @@ impl ExternalSorter { /// /// Then when spilling each batch, the writer has to write all referenced buffers /// repeatedly. - fn organize_stringview_arrays(&mut self) -> Result<()> { - let mut organized_batches = Vec::with_capacity(self.in_mem_batches.len()); + fn organize_stringview_arrays( + globally_sorted_batches: &mut Vec, + ) -> Result<()> { + let mut organized_batches = Vec::with_capacity(globally_sorted_batches.len()); - for batch in self.in_mem_batches.drain(..) { + for batch in globally_sorted_batches.drain(..) { let mut new_columns: Vec> = Vec::with_capacity(batch.num_columns()); @@ -507,20 +504,17 @@ impl ExternalSorter { organized_batches.push(organized_batch); } - self.in_mem_batches = organized_batches; + *globally_sorted_batches = organized_batches; Ok(()) } - /// Sorts the in_mem_batches in place + /// Sorts the in_mem_batches and potentially spill the sorted batches. /// - /// Sorting may have freed memory, especially if fetch is `Some`. If - /// the memory usage has dropped by a factor of 2, then we don't have - /// to spill. Otherwise, we spill to free up memory for inserting - /// more batches. - /// The factor of 2 aims to avoid a degenerate case where the - /// memory required for `fetch` is just under the memory available, - /// causing repeated re-sorting of data + /// If the memory usage has dropped by a factor of 2, it might be a sort with + /// fetch (e.g. sorting 1M rows but only keep the top 100), so we keep the + /// sorted entries inside `in_mem_batches` to be sorted in the next iteration. + /// Otherwise, we spill the sorted run to free up memory for inserting more batches. /// /// # Arguments /// @@ -539,10 +533,14 @@ impl ExternalSorter { let mut sorted_stream = self.in_mem_sort_stream(self.metrics.baseline.intermediate())?; + // After `in_mem_sort_stream()` is constructed, all `in_mem_batches` is taken + // to construct a globally sorted stream. + assert!(self.in_mem_batches.is_empty()); + // 'global' here refers to all buffered batches when the memory limit is + // reached. This variable will buffer the sorted batches after + // sort-preserving merge and incrementally append to spill files. + let mut globally_sorted_batches: Vec = vec![]; - // `self.in_mem_batches` is already taken away by the sort_stream, now it is empty. - // We'll gradually collect the sorted stream into self.in_mem_batches, or directly - // write sorted batches to disk when the memory is insufficient. let mut spilled = false; while let Some(batch) = sorted_stream.next().await { let batch = batch?; @@ -551,12 +549,12 @@ impl ExternalSorter { // Although the reservation is not enough, the batch is // already in memory, so it's okay to combine it with previously // sorted batches, and spill together. - self.in_mem_batches.push(batch); - self.spill_append().await?; // reservation is freed in spill() + globally_sorted_batches.push(batch); + self.consume_and_spill_append(&mut globally_sorted_batches) + .await?; // reservation is freed in spill() spilled = true; } else { - self.in_mem_batches.push(batch); - self.in_mem_batches_sorted = true; + globally_sorted_batches.push(batch); } } @@ -570,12 +568,24 @@ impl ExternalSorter { if (self.reservation.size() > before / 2) || force_spill { // We have not freed more than 50% of the memory, so we have to spill to // free up more memory - self.spill_append().await?; + self.consume_and_spill_append(&mut globally_sorted_batches) + .await?; spilled = true; } if spilled { + // There might be some buffered batches that haven't trigger a spill yet. + self.consume_and_spill_append(&mut globally_sorted_batches) + .await?; self.spill_finish().await?; + } else { + // If the memory limit has reached before calling this function, and it + // didn't spill anything, it means this is a sorting with fetch top K + // element: after sorting only the top K elements will be kept in memory. + // For simplicity, those sorted top K entries are put back to unsorted + // `in_mem_batches` to be consumed by the next sort/merge. + assert!(self.in_mem_batches.is_empty()); + self.in_mem_batches = std::mem::take(&mut globally_sorted_batches); } // Reserve headroom for next sort/merge From 7543bbf0894230ce126b25c5a081e7d3145b2713 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Fri, 28 Mar 2025 16:11:58 +0800 Subject: [PATCH 2/3] clippy --- datafusion/core/tests/memory_limit/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index 3bf2ac975e9f1..65cc08cfb2753 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -26,7 +26,6 @@ mod memory_limit_validation; use arrow::array::{ArrayRef, DictionaryArray, Int32Array, RecordBatch, StringViewArray}; use arrow::compute::SortOptions; use arrow::datatypes::{Int32Type, SchemaRef}; -use arrow::util::pretty; use arrow_schema::{DataType, Field, Schema}; use datafusion::assert_batches_eq; use datafusion::datasource::memory::MemorySourceConfig; From 1f4eabba38d20f5ab13f1458ee924342b0a55aac Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Sun, 30 Mar 2025 11:54:19 +0800 Subject: [PATCH 3/3] avoid assert --- datafusion/physical-plan/src/sorts/sort.rs | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 35203cb9e4194..adcf4cac43f17 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -424,7 +424,9 @@ impl ExternalSorter { in_progress_file.append_batch(&batch)?; } - assert!(globally_sorted_batches.is_empty()); + if !globally_sorted_batches.is_empty() { + return internal_err!("This function consumes globally_sorted_batches, so it should be empty after taking."); + } Ok(()) } @@ -535,7 +537,11 @@ impl ExternalSorter { self.in_mem_sort_stream(self.metrics.baseline.intermediate())?; // After `in_mem_sort_stream()` is constructed, all `in_mem_batches` is taken // to construct a globally sorted stream. - assert!(self.in_mem_batches.is_empty()); + if !self.in_mem_batches.is_empty() { + return internal_err!( + "in_mem_batches should be empty after constructing sorted stream" + ); + } // 'global' here refers to all buffered batches when the memory limit is // reached. This variable will buffer the sorted batches after // sort-preserving merge and incrementally append to spill files. @@ -584,7 +590,10 @@ impl ExternalSorter { // element: after sorting only the top K elements will be kept in memory. // For simplicity, those sorted top K entries are put back to unsorted // `in_mem_batches` to be consumed by the next sort/merge. - assert!(self.in_mem_batches.is_empty()); + if !self.in_mem_batches.is_empty() { + return internal_err!("in_mem_batches should be cleared before"); + } + self.in_mem_batches = std::mem::take(&mut globally_sorted_batches); }