From 3505dbaaca7127c814da564258abf276f6145889 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 2 Aug 2023 11:17:32 -0400 Subject: [PATCH 1/3] Account for memory usage in SortPreservingMerge --- datafusion/common/src/config.rs | 17 + .../core/src/physical_plan/repartition/mod.rs | 11 +- .../core/src/physical_plan/sorts/builder.rs | 22 +- .../core/src/physical_plan/sorts/cursor.rs | 20 +- .../core/src/physical_plan/sorts/merge.rs | 30 +- .../core/src/physical_plan/sorts/sort.rs | 131 +++++-- .../sorts/sort_preserving_merge.rs | 12 +- .../core/src/physical_plan/sorts/stream.rs | 12 +- .../core/tests/fuzz_cases/order_spill_fuzz.rs | 26 +- datafusion/core/tests/memory_limit.rs | 345 +++++++++++++++++- .../test_files/information_schema.slt | 2 + datafusion/execution/src/config.rs | 26 ++ datafusion/execution/src/disk_manager.rs | 9 + docs/source/user-guide/configs.md | 2 + 14 files changed, 591 insertions(+), 74 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index f681ae57a3f1c..bf6fb450e3f8b 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -235,6 +235,23 @@ config_namespace! { /// /// Defaults to the number of CPU cores on the system pub planning_concurrency: usize, default = num_cpus::get() + + /// How much memory is set aside, for each spillable sort, to + /// ensure an in-memory merge can occur. This setting has no + /// if the sort can not spill (there is no `DiskManager` + /// configured) + /// + /// As part of spilling to disk, in memory data must be sorted + /// / merged before writing the file. This in-memory + /// sort/merge requires memory as well, so To avoid allocating + /// once memory is exhausted, DataFusion sets aside this + /// many bytes before. + pub sort_spill_reservation_bytes: usize, default = 10 * 1024 * 1024 + + /// When sorting, below what size should data be concatenated + /// and sorted in a single RecordBatch rather than sorted in + /// batches and merged. 
+ pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024 } } diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs b/datafusion/core/src/physical_plan/repartition/mod.rs index c47c9926819b3..c94d2f9b115fa 100644 --- a/datafusion/core/src/physical_plan/repartition/mod.rs +++ b/datafusion/core/src/physical_plan/repartition/mod.rs @@ -576,14 +576,21 @@ impl ExecutionPlan for RepartitionExec { // Get existing ordering: let sort_exprs = self.input.output_ordering().unwrap_or(&[]); - // Merge streams (while preserving ordering) coming from input partitions to this partition: + + // Merge streams (while preserving ordering) coming from + // input partitions to this partition: + let fetch = None; + let merge_reservation = + MemoryConsumer::new(format!("{}[Merge {partition}]", self.name())) + .register(context.memory_pool()); streaming_merge( input_streams, self.schema(), sort_exprs, BaselineMetrics::new(&self.metrics, partition), context.session_config().batch_size(), - None, + fetch, + merge_reservation, ) } else { Ok(Box::pin(RepartitionStream { diff --git a/datafusion/core/src/physical_plan/sorts/builder.rs b/datafusion/core/src/physical_plan/sorts/builder.rs index 1c5ec356eed9f..3527d57382230 100644 --- a/datafusion/core/src/physical_plan/sorts/builder.rs +++ b/datafusion/core/src/physical_plan/sorts/builder.rs @@ -19,6 +19,7 @@ use arrow::compute::interleave; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use datafusion_common::Result; +use datafusion_execution::memory_pool::MemoryReservation; #[derive(Debug, Copy, Clone, Default)] struct BatchCursor { @@ -37,6 +38,9 @@ pub struct BatchBuilder { /// Maintain a list of [`RecordBatch`] and their corresponding stream batches: Vec<(usize, RecordBatch)>, + /// Accounts for memory used by buffered batches + reservation: MemoryReservation, + /// The current [`BatchCursor`] for each stream cursors: Vec, @@ -47,23 +51,31 @@ pub struct BatchBuilder { impl BatchBuilder { /// Create a 
new [`BatchBuilder`] with the provided `stream_count` and `batch_size` - pub fn new(schema: SchemaRef, stream_count: usize, batch_size: usize) -> Self { + pub fn new( + schema: SchemaRef, + stream_count: usize, + batch_size: usize, + reservation: MemoryReservation, + ) -> Self { Self { schema, batches: Vec::with_capacity(stream_count * 2), cursors: vec![BatchCursor::default(); stream_count], indices: Vec::with_capacity(batch_size), + reservation, } } /// Append a new batch in `stream_idx` - pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) { + pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) -> Result<()> { + self.reservation.try_grow(batch.get_array_memory_size())?; let batch_idx = self.batches.len(); self.batches.push((stream_idx, batch)); self.cursors[stream_idx] = BatchCursor { batch_idx, row_idx: 0, - } + }; + Ok(()) } /// Append the next row from `stream_idx` @@ -119,7 +131,7 @@ impl BatchBuilder { // We can therefore drop all but the last batch for each stream let mut batch_idx = 0; let mut retained = 0; - self.batches.retain(|(stream_idx, _)| { + self.batches.retain(|(stream_idx, batch)| { let stream_cursor = &mut self.cursors[*stream_idx]; let retain = stream_cursor.batch_idx == batch_idx; batch_idx += 1; @@ -127,6 +139,8 @@ impl BatchBuilder { if retain { stream_cursor.batch_idx = retained; retained += 1; + } else { + self.reservation.shrink(batch.get_array_memory_size()); } retain }); diff --git a/datafusion/core/src/physical_plan/sorts/cursor.rs b/datafusion/core/src/physical_plan/sorts/cursor.rs index a9e5122130572..c0c791288644b 100644 --- a/datafusion/core/src/physical_plan/sorts/cursor.rs +++ b/datafusion/core/src/physical_plan/sorts/cursor.rs @@ -21,6 +21,7 @@ use arrow::datatypes::ArrowNativeTypeOp; use arrow::row::{Row, Rows}; use arrow_array::types::ByteArrayType; use arrow_array::{Array, ArrowPrimitiveType, GenericByteArray, PrimitiveArray}; +use datafusion_execution::memory_pool::MemoryReservation; use 
std::cmp::Ordering; /// A [`Cursor`] for [`Rows`] @@ -29,6 +30,11 @@ pub struct RowCursor { num_rows: usize, rows: Rows, + + /// Tracks the memory used by the `Rows` of this + /// cursor. Freed on drop + #[allow(dead_code)] + reservation: MemoryReservation, } impl std::fmt::Debug for RowCursor { @@ -41,12 +47,22 @@ impl std::fmt::Debug for RowCursor { } impl RowCursor { - /// Create a new SortKeyCursor - pub fn new(rows: Rows) -> Self { + /// Create a new SortKeyCursor from `rows` and a `reservation` + /// that tracks its memory. + /// + /// Panics if the reservation is not for exactly `rows.size()` + /// bytes + pub fn new(rows: Rows, reservation: MemoryReservation) -> Self { + assert_eq!( + rows.size(), + reservation.size(), + "memory reservation mismatch" + ); Self { cur_row: 0, num_rows: rows.num_rows(), rows, + reservation, } } diff --git a/datafusion/core/src/physical_plan/sorts/merge.rs b/datafusion/core/src/physical_plan/sorts/merge.rs index 736df7dbe81a1..f8a1457dd62a1 100644 --- a/datafusion/core/src/physical_plan/sorts/merge.rs +++ b/datafusion/core/src/physical_plan/sorts/merge.rs @@ -31,6 +31,7 @@ use arrow::datatypes::{DataType, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow_array::*; use datafusion_common::Result; +use datafusion_execution::memory_pool::MemoryReservation; use futures::Stream; use std::pin::Pin; use std::task::{ready, Context, Poll}; @@ -42,7 +43,7 @@ macro_rules! primitive_merge_helper { } macro_rules! merge_helper { - ($t:ty, $sort:ident, $streams:ident, $schema:ident, $tracking_metrics:ident, $batch_size:ident, $fetch:ident) => {{ + ($t:ty, $sort:ident, $streams:ident, $schema:ident, $tracking_metrics:ident, $batch_size:ident, $fetch:ident, $reservation:ident) => {{ let streams = FieldCursorStream::<$t>::new($sort, $streams); return Ok(Box::pin(SortPreservingMergeStream::new( Box::new(streams), @@ -50,6 +51,7 @@ macro_rules! 
merge_helper { $tracking_metrics, $batch_size, $fetch, + $reservation, ))); }}; } @@ -63,28 +65,36 @@ pub fn streaming_merge( metrics: BaselineMetrics, batch_size: usize, fetch: Option, + reservation: MemoryReservation, ) -> Result { // Special case single column comparisons with optimized cursor implementations if expressions.len() == 1 { let sort = expressions[0].clone(); let data_type = sort.expr.data_type(schema.as_ref())?; downcast_primitive! { - data_type => (primitive_merge_helper, sort, streams, schema, metrics, batch_size, fetch), - DataType::Utf8 => merge_helper!(StringArray, sort, streams, schema, metrics, batch_size, fetch) - DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, streams, schema, metrics, batch_size, fetch) - DataType::Binary => merge_helper!(BinaryArray, sort, streams, schema, metrics, batch_size, fetch) - DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, streams, schema, metrics, batch_size, fetch) + data_type => (primitive_merge_helper, sort, streams, schema, metrics, batch_size, fetch, reservation), + DataType::Utf8 => merge_helper!(StringArray, sort, streams, schema, metrics, batch_size, fetch, reservation) + DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, streams, schema, metrics, batch_size, fetch, reservation) + DataType::Binary => merge_helper!(BinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation) + DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation) _ => {} } } - let streams = RowCursorStream::try_new(schema.as_ref(), expressions, streams)?; + let streams = RowCursorStream::try_new( + schema.as_ref(), + expressions, + streams, + reservation.new_empty(), + )?; + Ok(Box::pin(SortPreservingMergeStream::new( Box::new(streams), schema, metrics, batch_size, fetch, + reservation, ))) } @@ -162,11 +172,12 @@ impl SortPreservingMergeStream { metrics: BaselineMetrics, batch_size: usize, fetch: Option, + 
reservation: MemoryReservation, ) -> Self { let stream_count = streams.partitions(); Self { - in_progress: BatchBuilder::new(schema, stream_count, batch_size), + in_progress: BatchBuilder::new(schema, stream_count, batch_size, reservation), streams, metrics, aborted: false, @@ -197,8 +208,7 @@ impl SortPreservingMergeStream { Some(Err(e)) => Poll::Ready(Err(e)), Some(Ok((cursor, batch))) => { self.cursors[idx] = Some(cursor); - self.in_progress.push_batch(idx, batch); - Poll::Ready(Ok(())) + Poll::Ready(self.in_progress.push_batch(idx, batch)) } } } diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index c7ae09bb2e340..5552215e47c98 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -210,23 +210,37 @@ struct ExternalSorter { /// If Some, the maximum number of output rows that will be /// produced. fetch: Option, - /// Memory usage tracking + /// Reservation for in_mem_batches reservation: MemoryReservation, - /// The partition id that this Sort is handling (for identification) - partition_id: usize, - /// A handle to the runtime to get Disk spill files + /// Reservation for the merging of in-memory batches. If the sort + /// might spill, `sort_spill_reservation_bytes` will be + /// pre-reserved to ensure there is some space for this sort/merge. + merge_reservation: MemoryReservation, + /// A handle to the runtime to get spill files runtime: Arc, /// The target number of rows for output batches batch_size: usize, + /// How much memory to reserve for performing in-memory sort/merges + /// prior to spilling. + sort_spill_reservation_bytes: usize, + /// If the size of buffered in-memory batches is below this size, + /// the data will be concatenated and sorted in place rather than + /// sort/merged. 
+ sort_in_place_threshold_bytes: usize, } impl ExternalSorter { + // TODO: make a builder or some other nicer API to avoid the + // clippy warning + #[allow(clippy::too_many_arguments)] pub fn new( partition_id: usize, schema: SchemaRef, expr: Vec, batch_size: usize, fetch: Option, + sort_spill_reservation_bytes: usize, + sort_in_place_threshold_bytes: usize, metrics: &ExecutionPlanMetricsSet, runtime: Arc, ) -> Self { @@ -235,6 +249,10 @@ impl ExternalSorter { .with_can_spill(true) .register(&runtime.memory_pool); + let merge_reservation = + MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]")) + .register(&runtime.memory_pool); + Self { schema, in_mem_batches: vec![], @@ -244,9 +262,11 @@ impl ExternalSorter { metrics, fetch, reservation, - partition_id, + merge_reservation, runtime, batch_size, + sort_spill_reservation_bytes, + sort_in_place_threshold_bytes, } } @@ -257,6 +277,7 @@ impl ExternalSorter { if input.num_rows() == 0 { return Ok(()); } + self.reserve_memory_for_merge()?; let size = batch_byte_size(&input); if self.reservation.try_grow(size).is_err() { @@ -318,12 +339,10 @@ impl ExternalSorter { self.metrics.baseline.clone(), self.batch_size, self.fetch, + self.reservation.new_empty(), ) } else if !self.in_mem_batches.is_empty() { - let result = self.in_mem_sort_stream(self.metrics.baseline.clone()); - // Report to the memory manager we are no longer using memory - self.reservation.free(); - result + self.in_mem_sort_stream(self.metrics.baseline.clone()) } else { Ok(Box::pin(EmptyRecordBatchStream::new(self.schema.clone()))) } @@ -374,6 +393,11 @@ impl ExternalSorter { return Ok(()); } + // Release the memory reserved for merge back to the pool so + // there is some left when `in_mem_sort_stream` requests an + // allocation. + self.merge_reservation.free(); + self.in_mem_batches = self .in_mem_sort_stream(self.metrics.baseline.intermediate())? 
.try_collect() @@ -385,7 +409,10 @@ impl ExternalSorter { .map(|x| x.get_array_memory_size()) .sum(); - self.reservation.resize(size); + // Reserve headroom for next sort/merge + self.reserve_memory_for_merge()?; + + self.reservation.try_resize(size)?; self.in_mem_batches_sorted = true; Ok(()) } @@ -455,26 +482,27 @@ impl ExternalSorter { assert_ne!(self.in_mem_batches.len(), 0); if self.in_mem_batches.len() == 1 { let batch = self.in_mem_batches.remove(0); - let stream = self.sort_batch_stream(batch, metrics)?; - self.in_mem_batches.clear(); - return Ok(stream); + let reservation = self.reservation.take(); + return self.sort_batch_stream(batch, metrics, reservation); } - // If less than 1MB of in-memory data, concatenate and sort in place - // - // This is a very rough heuristic and likely could be refined further - if self.reservation.size() < 1048576 { + // If less than sort_in_place_threshold_bytes, concatenate and sort in place + if self.reservation.size() < self.sort_in_place_threshold_bytes { // Concatenate memory batches together and sort let batch = concat_batches(&self.schema, &self.in_mem_batches)?; self.in_mem_batches.clear(); - return self.sort_batch_stream(batch, metrics); + self.reservation.try_resize(batch.get_array_memory_size())?; + let reservation = self.reservation.take(); + return self.sort_batch_stream(batch, metrics, reservation); } let streams = std::mem::take(&mut self.in_mem_batches) .into_iter() .map(|batch| { let metrics = self.metrics.baseline.intermediate(); - Ok(spawn_buffered(self.sort_batch_stream(batch, metrics)?, 1)) + let reservation = self.reservation.split(batch.get_array_memory_size()); + let input = self.sort_batch_stream(batch, metrics, reservation)?; + Ok(spawn_buffered(input, 1)) }) .collect::>()?; @@ -485,35 +513,49 @@ impl ExternalSorter { metrics, self.batch_size, self.fetch, + self.merge_reservation.new_empty(), ) } - /// Sorts a single `RecordBatch` into a single stream + /// Sorts a single `RecordBatch` into a single 
stream. + /// + /// `reservation` accounts for the memory used by this batch and + /// is released when the sort is complete fn sort_batch_stream( &self, batch: RecordBatch, metrics: BaselineMetrics, + reservation: MemoryReservation, ) -> Result { + assert_eq!(batch.get_array_memory_size(), reservation.size()); let schema = batch.schema(); - let mut reservation = - MemoryConsumer::new(format!("sort_batch_stream{}", self.partition_id)) - .register(&self.runtime.memory_pool); - - // TODO: This should probably be try_grow (#5885) - reservation.resize(batch.get_array_memory_size()); - let fetch = self.fetch; let expressions = self.expr.clone(); let stream = futures::stream::once(futures::future::lazy(move |_| { let sorted = sort_batch(&batch, &expressions, fetch)?; metrics.record_output(sorted.num_rows()); drop(batch); - reservation.free(); + drop(reservation); Ok(sorted) })); Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream))) } + + /// If this sort may spill, pre-allocates + /// `sort_spill_reservation_bytes` of memory to guarantee memory + /// left for the in memory sort/merge. 
+ fn reserve_memory_for_merge(&mut self) -> Result<()> { + // Reserve headroom for next merge sort + if self.runtime.disk_manager.tmp_files_enabled() { + let size = self.sort_spill_reservation_bytes; + if self.merge_reservation.size() != size { + self.merge_reservation.try_resize(size)?; + } + } + + Ok(()) + } } impl Debug for ExternalSorter { @@ -801,6 +843,8 @@ impl ExecutionPlan for SortExec { let mut input = self.input.execute(partition, context.clone())?; + let execution_options = &context.session_config().options().execution; + trace!("End SortExec's input.execute for partition: {}", partition); let mut sorter = ExternalSorter::new( @@ -809,6 +853,8 @@ impl ExecutionPlan for SortExec { self.expr.clone(), context.session_config().batch_size(), self.fetch, + execution_options.sort_spill_reservation_bytes, + execution_options.sort_in_place_threshold_bytes, &self.metrics_set, context.runtime_env(), ); @@ -914,9 +960,15 @@ mod tests { #[tokio::test] async fn test_sort_spill() -> Result<()> { // trigger spill there will be 4 batches with 5.5KB for each - let config = RuntimeConfig::new().with_memory_limit(12288, 1.0); - let runtime = Arc::new(RuntimeEnv::new(config)?); - let session_ctx = SessionContext::with_config_rt(SessionConfig::new(), runtime); + let session_config = SessionConfig::new(); + let sort_spill_reservation_bytes = session_config + .options() + .execution + .sort_spill_reservation_bytes; + let rt_config = RuntimeConfig::new() + .with_memory_limit(sort_spill_reservation_bytes + 12288, 1.0); + let runtime = Arc::new(RuntimeEnv::new(rt_config)?); + let session_ctx = SessionContext::with_config_rt(session_config, runtime); let partitions = 4; let csv = test::scan_partitioned_csv(partitions)?; @@ -996,11 +1048,18 @@ mod tests { ]; for (fetch, expect_spillage) in test_options { - let config = RuntimeConfig::new() - .with_memory_limit(avg_batch_size * (partitions - 1), 1.0); - let runtime = Arc::new(RuntimeEnv::new(config)?); - let session_ctx = - 
SessionContext::with_config_rt(SessionConfig::new(), runtime); + let session_config = SessionConfig::new(); + let sort_spill_reservation_bytes = session_config + .options() + .execution + .sort_spill_reservation_bytes; + + let rt_config = RuntimeConfig::new().with_memory_limit( + sort_spill_reservation_bytes + avg_batch_size * (partitions - 1), + 1.0, + ); + let runtime = Arc::new(RuntimeEnv::new(rt_config)?); + let session_ctx = SessionContext::with_config_rt(session_config, runtime); let csv = test::scan_partitioned_csv(partitions)?; let schema = csv.schema(); diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs index e8d571631bab9..6b978b5ee753d 100644 --- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs +++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs @@ -30,6 +30,7 @@ use crate::physical_plan::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }; +use datafusion_execution::memory_pool::MemoryConsumer; use arrow::datatypes::SchemaRef; use datafusion_common::{DataFusionError, Result}; @@ -213,6 +214,10 @@ impl ExecutionPlan for SortPreservingMergeExec { ); let schema = self.schema(); + let reservation = + MemoryConsumer::new(format!("SortPreservingMergeExec[{partition}]")) + .register(&context.runtime_env().memory_pool); + match input_partitions { 0 => Err(DataFusionError::Internal( "SortPreservingMergeExec requires at least one input partition" @@ -241,6 +246,7 @@ impl ExecutionPlan for SortPreservingMergeExec { BaselineMetrics::new(&self.metrics, partition), context.session_config().batch_size(), self.fetch, + reservation, )?; debug!("Got stream result from SortPreservingMergeStream::new_from_receivers"); @@ -843,14 +849,18 @@ mod tests { } let metrics = ExecutionPlanMetricsSet::new(); + let reservation = + 
MemoryConsumer::new("test").register(&task_ctx.runtime_env().memory_pool); + let fetch = None; let merge_stream = streaming_merge( streams, batches.schema(), sort.as_slice(), BaselineMetrics::new(&metrics, 0), task_ctx.session_config().batch_size(), - None, + fetch, + reservation, ) .unwrap(); diff --git a/datafusion/core/src/physical_plan/sorts/stream.rs b/datafusion/core/src/physical_plan/sorts/stream.rs index 97a3b85fa5353..9ef13b7eb25e4 100644 --- a/datafusion/core/src/physical_plan/sorts/stream.rs +++ b/datafusion/core/src/physical_plan/sorts/stream.rs @@ -23,6 +23,7 @@ use arrow::datatypes::Schema; use arrow::record_batch::RecordBatch; use arrow::row::{RowConverter, SortField}; use datafusion_common::Result; +use datafusion_execution::memory_pool::MemoryReservation; use futures::stream::{Fuse, StreamExt}; use std::marker::PhantomData; use std::sync::Arc; @@ -84,6 +85,8 @@ pub struct RowCursorStream { column_expressions: Vec>, /// Input streams streams: FusedStreams, + /// Tracks the memory used by `converter` + reservation: MemoryReservation, } impl RowCursorStream { @@ -91,6 +94,7 @@ impl RowCursorStream { schema: &Schema, expressions: &[PhysicalSortExpr], streams: Vec, + reservation: MemoryReservation, ) -> Result { let sort_fields = expressions .iter() @@ -104,6 +108,7 @@ impl RowCursorStream { let converter = RowConverter::new(sort_fields)?; Ok(Self { converter, + reservation, column_expressions: expressions.iter().map(|x| x.expr.clone()).collect(), streams: FusedStreams(streams), }) @@ -117,7 +122,12 @@ impl RowCursorStream { .collect::>>()?; let rows = self.converter.convert_columns(&cols)?; - Ok(RowCursor::new(rows)) + self.reservation.try_resize(self.converter.size())?; + + // track the memory in the newly created Rows. 
+ let mut rows_reservation = self.reservation.new_empty(); + rows_reservation.try_grow(rows.size())?; + Ok(RowCursor::new(rows, rows_reservation)) } } diff --git a/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs b/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs index 1f72e0fcb45bf..d927b2807d7be 100644 --- a/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs @@ -22,13 +22,13 @@ use arrow::{ compute::SortOptions, record_batch::RecordBatch, }; -use datafusion::execution::memory_pool::GreedyMemoryPool; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion::physical_plan::expressions::{col, PhysicalSortExpr}; use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::physical_plan::{collect, ExecutionPlan}; use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_execution::memory_pool::GreedyMemoryPool; use rand::Rng; use std::sync::Arc; use test_utils::{batches_to_vec, partitions_to_sorted_vec}; @@ -76,10 +76,20 @@ async fn run_sort(pool_size: usize, size_spill: Vec<(usize, bool)>) { let exec = MemoryExec::try_new(&input, schema, None).unwrap(); let sort = Arc::new(SortExec::new(sort, Arc::new(exec))); + let session_config = SessionConfig::new(); + // Make sure there is enough space for the initial spill + // reservation + let pool_size = pool_size.saturating_add( + session_config + .options() + .execution + .sort_spill_reservation_bytes, + ); + let runtime_config = RuntimeConfig::new() .with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size))); let runtime = Arc::new(RuntimeEnv::new(runtime_config).unwrap()); - let session_ctx = SessionContext::with_config_rt(SessionConfig::new(), runtime); + let session_ctx = SessionContext::with_config_rt(session_config, runtime); let task_ctx = session_ctx.task_ctx(); let collected = collect(sort.clone(), task_ctx).await.unwrap(); @@ -88,9 +98,17 @@ 
async fn run_sort(pool_size: usize, size_spill: Vec<(usize, bool)>) { let actual = batches_to_vec(&collected); if spill { - assert_ne!(sort.metrics().unwrap().spill_count().unwrap(), 0); + assert_ne!( + sort.metrics().unwrap().spill_count().unwrap(), + 0, + "{pool_size} {size}" + ); } else { - assert_eq!(sort.metrics().unwrap().spill_count().unwrap(), 0); + assert_eq!( + sort.metrics().unwrap().spill_count().unwrap(), + 0, + "{pool_size} {size}" + ); } assert_eq!( diff --git a/datafusion/core/tests/memory_limit.rs b/datafusion/core/tests/memory_limit.rs index a7cff6cbd7581..80bbbed8f07fc 100644 --- a/datafusion/core/tests/memory_limit.rs +++ b/datafusion/core/tests/memory_limit.rs @@ -17,12 +17,21 @@ //! This module contains tests for limiting memory at runtime in DataFusion -use arrow::datatypes::SchemaRef; +use arrow::datatypes::{Int32Type, SchemaRef}; use arrow::record_batch::RecordBatch; +use arrow_array::{ArrayRef, DictionaryArray}; +use arrow_schema::SortOptions; +use async_trait::async_trait; +use datafusion::assert_batches_eq; use datafusion::physical_optimizer::PhysicalOptimizerRule; +use datafusion::physical_plan::common::batch_byte_size; +use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::streaming::PartitionStream; +use datafusion_expr::{Expr, TableType}; +use datafusion_physical_expr::PhysicalSortExpr; use futures::StreamExt; -use std::sync::Arc; +use std::any::Any; +use std::sync::{Arc, OnceLock}; use datafusion::datasource::streaming::StreamingTable; use datafusion::datasource::{MemTable, TableProvider}; @@ -31,8 +40,8 @@ use datafusion::execution::disk_manager::DiskManagerConfig; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion::physical_optimizer::join_selection::JoinSelection; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; -use datafusion::physical_plan::SendableRecordBatchStream; -use datafusion_common::assert_contains; +use 
datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream}; +use datafusion_common::{assert_contains, Result}; use datafusion::prelude::{SessionConfig, SessionContext}; use datafusion_execution::TaskContext; @@ -196,6 +205,110 @@ async fn symmetric_hash_join() { .await } +#[tokio::test] +async fn sort_preserving_merge() { + let partition_size = batches_byte_size(&dict_batches()); + + TestCase::new( + // This query uses the exact same ordering as the input table + // so only a merge is needed + "select * from t ORDER BY a ASC NULLS LAST, b ASC NULLS LAST LIMIT 10", + vec![ + "Resources exhausted: Failed to allocate additional", + "SortPreservingMergeExec", + ], + // provide insufficient memory to merge + partition_size / 2, + ) + // two partitions of data, so a merge is required + .with_scenario(Scenario::DictionaryStrings(2)) + .with_expected_plan( + // It is important that this plan only has + // SortPreservingMergeExec (not a Sort which would compete + // with the SortPreservingMergeExec for memory) + &[ + "+---------------+-------------------------------------------------------------------------------------------------------------+", + "| plan_type | plan |", + "+---------------+-------------------------------------------------------------------------------------------------------------+", + "| logical_plan | Limit: skip=0, fetch=10 |", + "| | Sort: t.a ASC NULLS LAST, t.b ASC NULLS LAST, fetch=10 |", + "| | TableScan: t projection=[a, b] |", + "| physical_plan | GlobalLimitExec: skip=0, fetch=10 |", + "| | SortPreservingMergeExec: [a@0 ASC NULLS LAST,b@1 ASC NULLS LAST], fetch=10 |", + "| | MemoryExec: partitions=2, partition_sizes=[5, 5], output_ordering=a@0 ASC NULLS LAST,b@1 ASC NULLS LAST |", + "| | |", + "+---------------+-------------------------------------------------------------------------------------------------------------+", + ] + ) + .run() + .await +} + +#[tokio::test] +async fn sort_spill_reservation() { + let partition_size = 
batches_byte_size(&dict_batches()); + + let base_config = SessionConfig::new() + // do not allow the sort to use the 'concat in place' path + .with_sort_in_place_threshold_bytes(10); + + // This test case shows how sort_spill_reservation works by + // purposely sorting data that requires non trivial memory to + // sort/merge. + let test = TestCase::new( + // This query uses a different order than the input table to + // force a sort. It also needs to have multiple columns to + // force RowFormat / interner that makes merge require + // substantial memory + "select * from t ORDER BY a , b DESC", + vec![], // expected errors set below + // enough memory to sort if we don't try to merge it all at once + (partition_size * 5) / 2, + ) + // use a single partiton so only a sort is needed + .with_scenario(Scenario::DictionaryStrings(1)) + .with_disk_manager_config(DiskManagerConfig::NewOs) + .with_expected_plan( + // It is important that this plan only has a SortExec, not + // also merge, so we can ensure the sort could finish + // given enough merging memory + &[ + "+---------------+--------------------------------------------------------------------------------------------------------+", + "| plan_type | plan |", + "+---------------+--------------------------------------------------------------------------------------------------------+", + "| logical_plan | Sort: t.a ASC NULLS LAST, t.b DESC NULLS FIRST |", + "| | TableScan: t projection=[a, b] |", + "| physical_plan | SortExec: expr=[a@0 ASC NULLS LAST,b@1 DESC] |", + "| | MemoryExec: partitions=1, partition_sizes=[5], output_ordering=a@0 ASC NULLS LAST,b@1 ASC NULLS LAST |", + "| | |", + "+---------------+--------------------------------------------------------------------------------------------------------+", + ] + ); + + let config = base_config + .clone() + // provide insufficient reserved space for merging, + // the sort will fail while trying to merge + .with_sort_spill_reservation_bytes(1024); + + test.clone() 
+ .with_expected_errors(vec![ + "Resources exhausted: Failed to allocate additional", + "ExternalSorterMerge", // merging in sort fails + ]) + .with_config(config) + .run() + .await; + + let config = base_config + // reserve sufficient space up front for merge and this time, + // which will force the spills to happen with less buffered + // input and thus with enough to merge. + .with_sort_spill_reservation_bytes(2 * partition_size); + + test.with_config(config).with_expected_success().run().await; +} + /// Run the query with the specified memory limit, /// and verifies the expected errors are returned #[derive(Clone, Debug)] @@ -205,9 +318,17 @@ struct TestCase { memory_limit: usize, config: SessionConfig, scenario: Scenario, + /// How should the disk manager (that allows spilling) be + /// configured? Defaults to `Disabled` + disk_manager_config: DiskManagerConfig, + /// Expected explain plan, if non emptry + expected_plan: Vec, + /// Is the plan expected to pass? Defaults to false + expected_success: bool, } impl TestCase { + // TODO remove expected errors and memory limits and query from constructor fn new<'a>( query: impl Into, expected_errors: impl IntoIterator, @@ -222,21 +343,56 @@ impl TestCase { memory_limit, config: SessionConfig::new(), scenario: Scenario::AccessLog, + disk_manager_config: DiskManagerConfig::Disabled, + expected_plan: vec![], + expected_success: false, } } + /// Set a list of expected strings that must appear in any errors + fn with_expected_errors<'a>( + mut self, + expected_errors: impl IntoIterator, + ) -> Self { + self.expected_errors = + expected_errors.into_iter().map(|s| s.to_string()).collect(); + self + } + /// Specify the configuration to use pub fn with_config(mut self, config: SessionConfig) -> Self { self.config = config; self } + /// Mark that the test expects the query to run successfully + pub fn with_expected_success(mut self) -> Self { + self.expected_success = true; + self + } + /// Specify the scenario to run pub fn 
with_scenario(mut self, scenario: Scenario) -> Self { self.scenario = scenario; self } + /// Specify how the disk manager should be configured. If enabled, + /// operators that support it can spill + pub fn with_disk_manager_config( + mut self, + disk_manager_config: DiskManagerConfig, + ) -> Self { + self.disk_manager_config = disk_manager_config; + self + } + + /// Specify an expected plan to review + pub fn with_expected_plan(mut self, expected_plan: &[&str]) -> Self { + self.expected_plan = expected_plan.iter().map(|s| s.to_string()).collect(); + self + } + /// Run the test, panic'ing on error async fn run(self) { let Self { @@ -245,33 +401,62 @@ impl TestCase { memory_limit, config, scenario, + disk_manager_config, + expected_plan, + expected_success, } = self; let table = scenario.table(); let rt_config = RuntimeConfig::new() // do not allow spilling - .with_disk_manager(DiskManagerConfig::Disabled) + .with_disk_manager(disk_manager_config) .with_memory_limit(memory_limit, MEMORY_FRACTION); let runtime = RuntimeEnv::new(rt_config).unwrap(); // Configure execution - let state = SessionState::with_config_rt(config, Arc::new(runtime)) - .with_physical_optimizer_rules(scenario.rules()); + let state = SessionState::with_config_rt(config, Arc::new(runtime)); + let state = match scenario.rules() { + Some(rules) => state.with_physical_optimizer_rules(rules), + None => state, + }; let ctx = SessionContext::with_state(state); ctx.register_table("t", table).expect("registering table"); let df = ctx.sql(&query).await.expect("Planning query"); + if !expected_plan.is_empty() { + let expected_plan: Vec<_> = + expected_plan.iter().map(|s| s.as_str()).collect(); + let actual_plan = df + .clone() + .explain(false, false) + .unwrap() + .collect() + .await + .unwrap(); + assert_batches_eq!(expected_plan, &actual_plan); + } + match df.collect().await { Ok(_batches) => { - panic!("Unexpected success when running, expected memory limit failure") + if !expected_success { + panic!( +
"Unexpected success when running, expected memory limit failure" + ) + } } Err(e) => { - for error_substring in expected_errors { - assert_contains!(e.to_string(), error_substring); + if expected_success { + panic!( + "Unexpected failure when running, expected success but got: {e}" + ) + } else { + for error_substring in expected_errors { + assert_contains!(e.to_string(), error_substring); + } } } } @@ -290,6 +475,9 @@ enum Scenario { /// 1000 rows of access log data with batches of 50 rows in a /// [`StreamingTable`] AccessLogStreaming, + + /// N partitions of sorted, dictionary encoded strings + DictionaryStrings(usize), } impl Scenario { @@ -317,24 +505,53 @@ impl Scenario { .with_infinite_table(true); Arc::new(table) } + Self::DictionaryStrings(num_partitions) => { + use datafusion::physical_expr::expressions::col; + let batches: Vec> = std::iter::repeat(dict_batches()) + .take(*num_partitions) + .collect(); + + let schema = batches[0][0].schema(); + let options = SortOptions { + descending: false, + nulls_first: false, + }; + let sort_information = vec![ + PhysicalSortExpr { + expr: col("a", &schema).unwrap(), + options, + }, + PhysicalSortExpr { + expr: col("b", &schema).unwrap(), + options, + }, + ]; + + let table = SortedTableProvider::new(batches, sort_information); + Arc::new(table) + } } } - /// return the optimizer rules to use - fn rules(&self) -> Vec> { + /// return specific physical optimizer rules to use + fn rules(&self) -> Option>> { match self { Self::AccessLog => { // Disabling physical optimizer rules to avoid sorts / // repartitions (since RepartitionExec / SortExec also // has a memory budget which we'll likely hit first) - vec![] + Some(vec![]) } Self::AccessLogStreaming => { // Disable all physical optimizer rules except the // JoinSelection rule to avoid sorts or repartition, // as they also have memory budgets that may be hit // first - vec![Arc::new(JoinSelection::new())] + Some(vec![Arc::new(JoinSelection::new())]) + } +
Self::DictionaryStrings(_) => { + // Use default rules + None } } } @@ -347,6 +564,56 @@ fn access_log_batches() -> Vec { .collect() } +static DICT_BATCHES: OnceLock> = OnceLock::new(); + +/// Returns 5 sorted string dictionary batches each with 50 rows with +/// this schema. +/// +/// a: Dictionary, +/// b: Dictionary, +fn dict_batches() -> Vec { + DICT_BATCHES.get_or_init(make_dict_batches).clone() +} + +fn make_dict_batches() -> Vec { + let batch_size = 50; + + let mut i = 0; + let gen = std::iter::from_fn(move || { + // create values like + // 0000000001 + // 0000000002 + // ... + // 0000000002 + + let values: Vec<_> = (i..i + batch_size).map(|x| format!("{x:010}")).collect(); + //println!("values: \n{values:?}"); + let array: DictionaryArray = + values.iter().map(|s| s.as_str()).collect(); + let array = Arc::new(array) as ArrayRef; + let batch = + RecordBatch::try_from_iter(vec![("a", array.clone()), ("b", array)]).unwrap(); + + i += batch_size; + Some(batch) + }); + + let num_batches = 5; + + let batches: Vec<_> = gen.take(num_batches).collect(); + + batches.iter().enumerate().for_each(|(i, batch)| { + println!("Dict batch[{i}] size is: {}", batch_byte_size(batch)); + }); + + batches +} + +// How many bytes does the memory from dict_batches consume? 
+fn batches_byte_size(batches: &[RecordBatch]) -> usize { + batches.iter().map(batch_byte_size).sum() +} + struct DummyStreamPartition { schema: SchemaRef, batches: Vec, @@ -366,3 +633,53 @@ impl PartitionStream for DummyStreamPartition { )) } } + +/// Wrapper over a TableProvider that can provide ordering information +struct SortedTableProvider { + schema: SchemaRef, + batches: Vec>, + sort_information: Vec, +} + +impl SortedTableProvider { + fn new( + batches: Vec>, + sort_information: Vec, + ) -> Self { + let schema = batches[0][0].schema(); + Self { + schema, + batches, + sort_information, + } + } +} + +#[async_trait] +impl TableProvider for SortedTableProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + _state: &SessionState, + projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> Result> { + let mem_exec = + MemoryExec::try_new(&self.batches, self.schema(), projection.cloned())? 
+ .with_sort_information(self.sort_information.clone()); + + Ok(Arc::new(mem_exec)) + } +} diff --git a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt index fcb818d5fd481..162e208201911 100644 --- a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt +++ b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt @@ -153,6 +153,8 @@ datafusion.execution.parquet.pushdown_filters false datafusion.execution.parquet.reorder_filters false datafusion.execution.parquet.skip_metadata true datafusion.execution.planning_concurrency 13 +datafusion.execution.sort_in_place_threshold_bytes 1048576 +datafusion.execution.sort_spill_reservation_bytes 10485760 datafusion.execution.target_partitions 7 datafusion.execution.time_zone +00:00 datafusion.explain.logical_plan_only false diff --git a/datafusion/execution/src/config.rs b/datafusion/execution/src/config.rs index c8478499365ef..44fcc2ab49b47 100644 --- a/datafusion/execution/src/config.rs +++ b/datafusion/execution/src/config.rs @@ -287,6 +287,32 @@ impl SessionConfig { self.options.optimizer.enable_round_robin_repartition } + /// Set the size of [`sort_spill_reservation_bytes`] to control + /// memory pre-reservation + /// + /// [`sort_spill_reservation_bytes`]: datafusion_common::config::ExecutionOptions::sort_spill_reservation_bytes + pub fn with_sort_spill_reservation_bytes( + mut self, + sort_spill_reservation_bytes: usize, + ) -> Self { + self.options.execution.sort_spill_reservation_bytes = + sort_spill_reservation_bytes; + self + } + + /// Set the size of [`sort_in_place_threshold_bytes`] to control + /// below what size data is concatenated and sorted in a single RecordBatch.
+ /// + /// [`sort_in_place_threshold_bytes`]: datafusion_common::config::ExecutionOptions::sort_in_place_threshold_bytes + pub fn with_sort_in_place_threshold_bytes( + mut self, + sort_in_place_threshold_bytes: usize, + ) -> Self { + self.options.execution.sort_in_place_threshold_bytes = + sort_in_place_threshold_bytes; + self + } + /// Convert configuration options to name-value pairs with values /// converted to strings. /// diff --git a/datafusion/execution/src/disk_manager.rs b/datafusion/execution/src/disk_manager.rs index 107c58fbe327d..e8d2ed9cc0f51 100644 --- a/datafusion/execution/src/disk_manager.rs +++ b/datafusion/execution/src/disk_manager.rs @@ -102,6 +102,13 @@ impl DiskManager { } } + /// Return true if this disk manager supports creating temporary + /// files. If this returns false, any call to `create_tmp_file` + /// will error. + pub fn tmp_files_enabled(&self) -> bool { + self.local_dirs.lock().is_some() + } + /// Return a temporary file from a randomized choice in the configured locations /// /// If the file can not be created for some reason, returns an @@ -198,6 +205,7 @@ mod tests { ); let dm = DiskManager::try_new(config)?; + assert!(dm.tmp_files_enabled()); let actual = dm.create_tmp_file("Testing")?; // the file should be in one of the specified local directories @@ -210,6 +218,7 @@ mod tests { fn test_disabled_disk_manager() { let config = DiskManagerConfig::Disabled; let manager = DiskManager::try_new(config).unwrap(); + assert!(!manager.tmp_files_enabled()); assert_eq!( manager.create_tmp_file("Testing").unwrap_err().to_string(), "Resources exhausted: Memory Exhausted while Testing (DiskManager is disabled)", diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index bff7cb4da0125..abbc682e3d31f 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -57,6 +57,8 @@ Environment variables are read during `SessionConfig` initialisation so they mus | 
datafusion.execution.parquet.reorder_filters | false | If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query | | datafusion.execution.aggregate.scalar_update_factor | 10 | Specifies the threshold for using `ScalarValue`s to update accumulators during high-cardinality aggregations for each input batch. The aggregation is considered high-cardinality if the number of affected groups is greater than or equal to `batch_size / scalar_update_factor`. In such cases, `ScalarValue`s are utilized for updating accumulators, rather than the default batch-slice approach. This can lead to performance improvements. By adjusting the `scalar_update_factor`, you can balance the trade-off between more efficient accumulator updates and the number of groups affected. | | datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system | +| datafusion.execution.sort_spill_reservation_bytes | 10485760 | How much memory is set aside, for each spillable sort, to ensure an in-memory merge can occur. This setting has no if the sort can not spill (there is no `DiskManager` configured) As part of spilling to disk, in memory data must be sorted / merged before writing the file. This in-memory sort/merge requires memory as well, so To avoid allocating once memory is exhausted, DataFusion sets aside this many bytes before. | +| datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. 
| | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores | | datafusion.optimizer.filter_null_join_keys | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. | | datafusion.optimizer.repartition_aggregations | true | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level | From f87705e63061f040565eba5e5c2dc2b91782ddd6 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 9 Aug 2023 09:49:14 -0400 Subject: [PATCH 2/3] Review Comments: Improve documentation and comments --- datafusion/common/src/config.rs | 17 ++++--- .../core/src/physical_plan/sorts/sort.rs | 2 +- docs/source/user-guide/configs.md | 44 ------------------- 3 files changed, 9 insertions(+), 54 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index bf6fb450e3f8b..fe7fb955033fc 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -236,16 +236,15 @@ config_namespace! { /// Defaults to the number of CPU cores on the system pub planning_concurrency: usize, default = num_cpus::get() - /// How much memory is set aside, for each spillable sort, to - /// ensure an in-memory merge can occur. This setting has no - /// if the sort can not spill (there is no `DiskManager` - /// configured) + /// Specifies the reserved memory for each spillable sort operation to + /// facilitate an in-memory merge. /// - /// As part of spilling to disk, in memory data must be sorted - /// / merged before writing the file. 
This in-memory - /// sort/merge requires memory as well, so To avoid allocating - /// once memory is exhausted, DataFusion sets aside this - /// many bytes before. + /// When a sort operation spills to disk, the in-memory data must be + /// sorted and merged before being written to a file. This setting reserves + /// a specific amount of memory for that in-memory sort/merge process. + /// + /// Note: This setting is irrelevant if the sort operation cannot spill + /// (i.e., if there's no `DiskManager` configured). pub sort_spill_reservation_bytes: usize, default = 10 * 1024 * 1024 /// When sorting, below what size should data be concatenated diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 5552215e47c98..411a425b51db1 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -214,7 +214,7 @@ struct ExternalSorter { reservation: MemoryReservation, /// Reservation for the merging of in-memory batches. If the sort /// might spill, `sort_spill_reservation_bytes` will be - /// pre-reserved to ensure there is some space for this sort/merg. + /// pre-reserved to ensure there is some space for this sort/merge. merge_reservation: MemoryReservation, /// A handle to the runtime to get spill files runtime: Arc, diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index abbc682e3d31f..196603d2edca7 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -35,47 +35,3 @@ Values are parsed according to the [same rules used in casts from Utf8](https:// If the value in the environment variable cannot be cast to the type of the configuration option, the default value will be used instead and a warning emitted. Environment variables are read during `SessionConfig` initialisation so they must be set beforehand and will not affect running sessions. 
-| key | default | description | -| ---------------------------------------------------------- | ---------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| datafusion.catalog.create_default_catalog_and_schema | true | Whether the default catalog and schema should be created automatically. | -| datafusion.catalog.default_catalog | datafusion | The default catalog name - this impacts what SQL queries use if not specified | -| datafusion.catalog.default_schema | public | The default schema name - this impacts what SQL queries use if not specified | -| datafusion.catalog.information_schema | false | Should DataFusion provide access to `information_schema` virtual tables for displaying schema information | -| datafusion.catalog.location | NULL | Location scanned to load tables for `default` schema | -| datafusion.catalog.format | NULL | Type of `TableProvider` to use when loading `default` schema | -| datafusion.catalog.has_header | false | If the file has a header | -| datafusion.execution.batch_size | 8192 | Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption | -| datafusion.execution.coalesce_batches | true | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. 
This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting | -| datafusion.execution.collect_statistics | false | Should DataFusion collect statistics after listing files | -| datafusion.execution.target_partitions | 0 | Number of partitions for query execution. Increasing partitions can increase concurrency. Defaults to the number of CPU cores on the system | -| datafusion.execution.time_zone | +00:00 | The default time zone Some functions, e.g. `EXTRACT(HOUR from SOME_TIME)`, shift the underlying datetime according to this time zone, and then extract the hour | -| datafusion.execution.parquet.enable_page_index | true | If true, reads the Parquet data page level metadata (the Page Index), if present, to reduce the I/O and number of rows decoded. | -| datafusion.execution.parquet.pruning | true | If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file | -| datafusion.execution.parquet.skip_metadata | true | If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata | -| datafusion.execution.parquet.metadata_size_hint | NULL | If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. 
If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer | -| datafusion.execution.parquet.pushdown_filters | false | If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded | -| datafusion.execution.parquet.reorder_filters | false | If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query | -| datafusion.execution.aggregate.scalar_update_factor | 10 | Specifies the threshold for using `ScalarValue`s to update accumulators during high-cardinality aggregations for each input batch. The aggregation is considered high-cardinality if the number of affected groups is greater than or equal to `batch_size / scalar_update_factor`. In such cases, `ScalarValue`s are utilized for updating accumulators, rather than the default batch-slice approach. This can lead to performance improvements. By adjusting the `scalar_update_factor`, you can balance the trade-off between more efficient accumulator updates and the number of groups affected. | -| datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system | -| datafusion.execution.sort_spill_reservation_bytes | 10485760 | How much memory is set aside, for each spillable sort, to ensure an in-memory merge can occur. This setting has no if the sort can not spill (there is no `DiskManager` configured) As part of spilling to disk, in memory data must be sorted / merged before writing the file. This in-memory sort/merge requires memory as well, so To avoid allocating once memory is exhausted, DataFusion sets aside this many bytes before. 
| -| datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. | -| datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores | -| datafusion.optimizer.filter_null_join_keys | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. | -| datafusion.optimizer.repartition_aggregations | true | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level | -| datafusion.optimizer.repartition_file_min_size | 10485760 | Minimum total files size in bytes to perform file scan repartitioning. | -| datafusion.optimizer.repartition_joins | true | Should DataFusion repartition data using the join keys to execute joins in parallel using the provided `target_partitions` level | -| datafusion.optimizer.allow_symmetric_joins_without_pruning | true | Should DataFusion allow symmetric hash joins for unbounded data sources even when its inputs do not have any ordering or filtering If the flag is not enabled, the SymmetricHashJoin operator will be unable to prune its internal buffers, resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right, RightAnti, and RightSemi - being produced only at the end of the execution. This is not typical in stream processing. Additionally, without proper design for long runner execution, all types of joins may encounter out-of-memory errors. 
| -| datafusion.optimizer.repartition_file_scans | true | When set to `true`, file groups will be repartitioned to achieve maximum parallelism. Currently Parquet and CSV formats are supported. If set to `true`, all files will be repartitioned evenly (i.e., a single large file might be partitioned into smaller chunks) for parallel scanning. If set to `false`, different files will be read in parallel, but repartitioning won't happen within a single file. | -| datafusion.optimizer.repartition_windows | true | Should DataFusion repartition data using the partitions keys to execute window functions in parallel using the provided `target_partitions` level | -| datafusion.optimizer.repartition_sorts | true | Should DataFusion execute sorts in a per-partition fashion and merge afterwards instead of coalescing first and sorting globally. With this flag is enabled, plans in the form below `text "SortExec: [a@0 ASC]", " CoalescePartitionsExec", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", ` would turn into the plan below which performs better in multithreaded environments `text "SortPreservingMergeExec: [a@0 ASC]", " SortExec: [a@0 ASC]", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", ` | -| datafusion.optimizer.bounded_order_preserving_variants | false | When true, DataFusion will opportunistically remove sorts by replacing `RepartitionExec` with `SortPreservingRepartitionExec`, and `CoalescePartitionsExec` with `SortPreservingMergeExec`, even when the query is bounded. | -| datafusion.optimizer.skip_failed_rules | false | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. 
When set to false, any rules that produce errors will cause the query to fail | -| datafusion.optimizer.max_passes | 3 | Number of times that the optimizer will attempt to optimize the plan | -| datafusion.optimizer.top_down_join_key_reordering | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys | -| datafusion.optimizer.prefer_hash_join | true | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory | -| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition | -| datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans | -| datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans | -| datafusion.sql_parser.parse_float_as_decimal | false | When set to true, SQL parser will parse float as decimal type | -| datafusion.sql_parser.enable_ident_normalization | true | When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted) | -| datafusion.sql_parser.dialect | generic | Configure the SQL dialect used by DataFusion's parser; supported values include: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, and Ansi. 
| From 441bcbc5815d193aa486ac51688ca678f481e1bd Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 9 Aug 2023 09:49:47 -0400 Subject: [PATCH 3/3] Review Comments: Improve documentation and comments --- docs/source/user-guide/configs.md | 44 +++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 196603d2edca7..63c9c064bc525 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -35,3 +35,47 @@ Values are parsed according to the [same rules used in casts from Utf8](https:// If the value in the environment variable cannot be cast to the type of the configuration option, the default value will be used instead and a warning emitted. Environment variables are read during `SessionConfig` initialisation so they must be set beforehand and will not affect running sessions. +| key | default | description | +| ---------------------------------------------------------- | ---------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| datafusion.catalog.create_default_catalog_and_schema | true | Whether the default catalog and schema should be created automatically. 
| +| datafusion.catalog.default_catalog | datafusion | The default catalog name - this impacts what SQL queries use if not specified | +| datafusion.catalog.default_schema | public | The default schema name - this impacts what SQL queries use if not specified | +| datafusion.catalog.information_schema | false | Should DataFusion provide access to `information_schema` virtual tables for displaying schema information | +| datafusion.catalog.location | NULL | Location scanned to load tables for `default` schema | +| datafusion.catalog.format | NULL | Type of `TableProvider` to use when loading `default` schema | +| datafusion.catalog.has_header | false | If the file has a header | +| datafusion.execution.batch_size | 8192 | Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption | +| datafusion.execution.coalesce_batches | true | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting | +| datafusion.execution.collect_statistics | false | Should DataFusion collect statistics after listing files | +| datafusion.execution.target_partitions | 0 | Number of partitions for query execution. Increasing partitions can increase concurrency. Defaults to the number of CPU cores on the system | +| datafusion.execution.time_zone | +00:00 | The default time zone Some functions, e.g. `EXTRACT(HOUR from SOME_TIME)`, shift the underlying datetime according to this time zone, and then extract the hour | +| datafusion.execution.parquet.enable_page_index | true | If true, reads the Parquet data page level metadata (the Page Index), if present, to reduce the I/O and number of rows decoded. 
| +| datafusion.execution.parquet.pruning | true | If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file | +| datafusion.execution.parquet.skip_metadata | true | If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata | +| datafusion.execution.parquet.metadata_size_hint | NULL | If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer | +| datafusion.execution.parquet.pushdown_filters | false | If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded | +| datafusion.execution.parquet.reorder_filters | false | If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query | +| datafusion.execution.aggregate.scalar_update_factor | 10 | Specifies the threshold for using `ScalarValue`s to update accumulators during high-cardinality aggregations for each input batch. The aggregation is considered high-cardinality if the number of affected groups is greater than or equal to `batch_size / scalar_update_factor`. In such cases, `ScalarValue`s are utilized for updating accumulators, rather than the default batch-slice approach. This can lead to performance improvements. By adjusting the `scalar_update_factor`, you can balance the trade-off between more efficient accumulator updates and the number of groups affected. 
| +| datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system | +| datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). | +| datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. | +| datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores | +| datafusion.optimizer.filter_null_join_keys | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. | +| datafusion.optimizer.repartition_aggregations | true | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level | +| datafusion.optimizer.repartition_file_min_size | 10485760 | Minimum total files size in bytes to perform file scan repartitioning. 
| +| datafusion.optimizer.repartition_joins | true | Should DataFusion repartition data using the join keys to execute joins in parallel using the provided `target_partitions` level | +| datafusion.optimizer.allow_symmetric_joins_without_pruning | true | Should DataFusion allow symmetric hash joins for unbounded data sources even when its inputs do not have any ordering or filtering. If the flag is not enabled, the SymmetricHashJoin operator will be unable to prune its internal buffers, resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right, RightAnti, and RightSemi - being produced only at the end of the execution. This is not typical in stream processing. Additionally, without proper design for long-running execution, all types of joins may encounter out-of-memory errors. | +| datafusion.optimizer.repartition_file_scans | true | When set to `true`, file groups will be repartitioned to achieve maximum parallelism. Currently Parquet and CSV formats are supported. If set to `true`, all files will be repartitioned evenly (i.e., a single large file might be partitioned into smaller chunks) for parallel scanning. If set to `false`, different files will be read in parallel, but repartitioning won't happen within a single file. | +| datafusion.optimizer.repartition_windows | true | Should DataFusion repartition data using the partition keys to execute window functions in parallel using the provided `target_partitions` level | +| datafusion.optimizer.repartition_sorts | true | Should DataFusion execute sorts in a per-partition fashion and merge afterwards instead of coalescing first and sorting globally.
When this flag is enabled, plans in the form below `text "SortExec: [a@0 ASC]", " CoalescePartitionsExec", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", ` would turn into the plan below which performs better in multithreaded environments `text "SortPreservingMergeExec: [a@0 ASC]", " SortExec: [a@0 ASC]", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", ` | +| datafusion.optimizer.bounded_order_preserving_variants | false | When true, DataFusion will opportunistically remove sorts by replacing `RepartitionExec` with `SortPreservingRepartitionExec`, and `CoalescePartitionsExec` with `SortPreservingMergeExec`, even when the query is bounded. | +| datafusion.optimizer.skip_failed_rules | false | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail | +| datafusion.optimizer.max_passes | 3 | Number of times that the optimizer will attempt to optimize the plan | +| datafusion.optimizer.top_down_join_key_reordering | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys | +| datafusion.optimizer.prefer_hash_join | true | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin.
HashJoin can work more efficiently than SortMergeJoin but consumes more memory | +| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition | +| datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans | +| datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans | +| datafusion.sql_parser.parse_float_as_decimal | false | When set to true, SQL parser will parse float as decimal type | +| datafusion.sql_parser.enable_ident_normalization | true | When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted) | +| datafusion.sql_parser.dialect | generic | Configure the SQL dialect used by DataFusion's parser; supported values include: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, and Ansi. |