From d6659bc5d5093838dd22d58e423c99d515b3ea6b Mon Sep 17 00:00:00 2001
From: Yongting You <2010youy01@gmail.com>
Date: Tue, 12 Nov 2024 17:54:53 +0800
Subject: [PATCH 1/5] Fix record batch memory size double counting

---
 datafusion/core/tests/memory_limit/mod.rs     |  14 +-
 datafusion/physical-plan/src/sorts/builder.rs |   6 +-
 datafusion/physical-plan/src/sorts/sort.rs    |  24 ++-
 datafusion/physical-plan/src/spill.rs         | 174 +++++++++++++++++-
 4 files changed, 201 insertions(+), 17 deletions(-)

diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs
index 6817969580da0..256d41caf308e 100644
--- a/datafusion/core/tests/memory_limit/mod.rs
+++ b/datafusion/core/tests/memory_limit/mod.rs
@@ -31,6 +31,7 @@ use datafusion_execution::memory_pool::{
 };
 use datafusion_expr::{Expr, TableType};
 use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
+use datafusion_physical_plan::spill::get_record_batch_memory_size;
 use futures::StreamExt;
 use std::any::Any;
 use std::num::NonZeroUsize;
@@ -265,6 +266,10 @@ async fn sort_spill_reservation() {
     // This test case shows how sort_spill_reservation works by
     // purposely sorting data that requires non-trivial memory to
     // sort/merge.
+
+    // The merge operation needs extra memory to do row conversion, so make the
+    // memory limit larger.
+    let mem_limit = partition_size * 2;
     let test = TestCase::new()
         // This query uses a different order than the input table to
         // force a sort. It also needs to have multiple columns to
         // force RowFormat / interner that makes merge require
         // substantial memory
         .with_query("select * from t ORDER BY a , b DESC")
         // enough memory to sort if we don't try to merge it all at once
-        .with_memory_limit(partition_size)
+        .with_memory_limit(mem_limit)
         // use a single partition so only a sort is needed
         .with_scenario(scenario)
         .with_disk_manager_config(DiskManagerConfig::NewOs)
@@ -311,7 +316,7 @@ async fn sort_spill_reservation() {
         // reserve sufficient space up front for merge and this time,
         // which will force the spills to happen with less buffered
         // input and thus with enough to merge.
-        .with_sort_spill_reservation_bytes(partition_size / 2);
+        .with_sort_spill_reservation_bytes(mem_limit / 2);

     test.with_config(config).with_expected_success().run().await;
 }
@@ -774,7 +779,10 @@ fn make_dict_batches() -> Vec<RecordBatch> {

 // How many bytes does the memory from dict_batches consume?
 fn batches_byte_size(batches: &[RecordBatch]) -> usize {
-    batches.iter().map(|b| b.get_array_memory_size()).sum()
+    batches
+        .iter()
+        .map(|b| get_record_batch_memory_size(b))
+        .sum()
 }

 #[derive(Debug)]
diff --git a/datafusion/physical-plan/src/sorts/builder.rs b/datafusion/physical-plan/src/sorts/builder.rs
index d32c60697ec8c..9b2fa968222c4 100644
--- a/datafusion/physical-plan/src/sorts/builder.rs
+++ b/datafusion/physical-plan/src/sorts/builder.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
+use crate::spill::get_record_batch_memory_size;
 use arrow::compute::interleave;
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
@@ -69,7 +70,8 @@ impl BatchBuilder {

     /// Append a new batch in `stream_idx`
     pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) -> Result<()> {
-        self.reservation.try_grow(batch.get_array_memory_size())?;
+        self.reservation
+            .try_grow(get_record_batch_memory_size(&batch))?;
         let batch_idx = self.batches.len();
         self.batches.push((stream_idx, batch));
         self.cursors[stream_idx] = BatchCursor {
@@ -141,7 +143,7 @@ impl BatchBuilder {
                 stream_cursor.batch_idx = retained;
                 retained += 1;
             } else {
-                self.reservation.shrink(batch.get_array_memory_size());
+                self.reservation.shrink(get_record_batch_memory_size(batch));
             }
             retain
         });
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index ce7efce415779..9f7bd6b28a2e9 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -31,7 +31,9 @@ use crate::metrics::{
     BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
 };
 use crate::sorts::streaming_merge::StreamingMergeBuilder;
-use crate::spill::{read_spill_as_stream, spill_record_batches};
+use crate::spill::{
+    get_record_batch_memory_size, read_spill_as_stream, spill_record_batches,
+};
 use crate::stream::RecordBatchStreamAdapter;
 use crate::topk::TopK;
 use crate::{
@@ -286,10 +288,12 @@ impl ExternalSorter {
         }
         self.reserve_memory_for_merge()?;

-        let size = input.get_array_memory_size();
+        let size = get_record_batch_memory_size(&input);
+
         if self.reservation.try_grow(size).is_err() {
             let before = self.reservation.size();
             self.in_mem_sort().await?;
+
             // Sorting may have freed memory, especially if fetch is `Some`
             //
             // As such we check again, and if the memory usage has dropped by
@@ -426,7 +430,7 @@ impl ExternalSorter {
         let size: usize = self
             .in_mem_batches
             .iter()
-            .map(|x| x.get_array_memory_size())
+            .map(get_record_batch_memory_size)
             .sum();

         // Reserve headroom for next sort/merge
@@ -521,7 +525,8 @@ impl ExternalSorter {
             // Concatenate memory batches together and sort
             let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
             self.in_mem_batches.clear();
-            self.reservation.try_resize(batch.get_array_memory_size())?;
+            self.reservation
+                .try_resize(get_record_batch_memory_size(&batch))?;
             let reservation = self.reservation.take();
             return self.sort_batch_stream(batch, metrics, reservation);
         }
@@ -530,7 +535,8 @@
             .into_iter()
             .map(|batch| {
                 let metrics = self.metrics.baseline.intermediate();
-                let reservation = self.reservation.split(batch.get_array_memory_size());
+                let reservation =
+                    self.reservation.split(get_record_batch_memory_size(&batch));
                 let input = self.sort_batch_stream(batch, metrics, reservation)?;
                 Ok(spawn_buffered(input, 1))
             })
@@ -559,7 +565,7 @@ impl ExternalSorter {
         metrics: BaselineMetrics,
         reservation: MemoryReservation,
     ) -> Result<SendableRecordBatchStream> {
-        assert_eq!(batch.get_array_memory_size(), reservation.size());
+        assert_eq!(get_record_batch_memory_size(&batch), reservation.size());
         let schema = batch.schema();

         let fetch = self.fetch;
@@ -1185,9 +1191,9 @@ mod tests {
         assert_eq!(metrics.output_rows().unwrap(), 10000);
         assert!(metrics.elapsed_compute().unwrap() > 0);

-        assert_eq!(metrics.spill_count().unwrap(), 4);
-        assert_eq!(metrics.spilled_bytes().unwrap(), 38784);
-        assert_eq!(metrics.spilled_rows().unwrap(), 9600);
+        assert_eq!(metrics.spill_count().unwrap(), 3);
+        assert_eq!(metrics.spilled_bytes().unwrap(), 36000);
+        assert_eq!(metrics.spilled_rows().unwrap(), 9000);

         let columns = result[0].columns();
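Every `try_grow`/`shrink`/`split` call site in the two files above is switched in the same commit, so a reservation always grows and shrinks by the same deduplicated measure, and the `assert_eq!` in `sort_batch_stream` keeps holding. As a rough illustration of that invariant outside the sorter, here is a minimal sketch, assuming DataFusion's `GreedyMemoryPool` and a hypothetical consumer name; the snippet is not part of the patch:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};
use datafusion_physical_plan::spill::get_record_batch_memory_size;

fn main() -> Result<()> {
    // A pool with a 1 MiB hard limit; `try_grow` starts failing at the limit.
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024 * 1024));
    let mut reservation = MemoryConsumer::new("buffering_operator").register(&pool);

    let batch = RecordBatch::try_from_iter([(
        "a",
        Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
    )])?;

    // Measure once with the deduplicating function and reuse the number on
    // both sides of the reservation, exactly like `push_batch` above.
    let size = get_record_batch_memory_size(&batch);
    reservation.try_grow(size)?; // a real operator would spill on error

    // ... the batch stays buffered while the reservation is held ...

    reservation.shrink(size); // release the same amount when the batch goes away
    Ok(())
}
```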
diff --git a/datafusion/physical-plan/src/spill.rs b/datafusion/physical-plan/src/spill.rs
index de85a7c6f0989..353e9aaee02cf 100644
--- a/datafusion/physical-plan/src/spill.rs
+++ b/datafusion/physical-plan/src/spill.rs
@@ -20,14 +20,16 @@
 use std::fs::File;
 use std::io::BufReader;
 use std::path::{Path, PathBuf};
+use std::ptr::NonNull;

+use arrow::array::ArrayData;
 use arrow::datatypes::SchemaRef;
 use arrow::ipc::reader::FileReader;
 use arrow::record_batch::RecordBatch;
 use log::debug;
 use tokio::sync::mpsc::Sender;

-use datafusion_common::{exec_datafusion_err, Result};
+use datafusion_common::{exec_datafusion_err, HashSet, Result};
 use datafusion_execution::disk_manager::RefCountedTempFile;
 use datafusion_execution::memory_pool::human_readable_size;
 use datafusion_execution::SendableRecordBatchStream;
@@ -109,10 +111,80 @@ pub fn spill_record_batch_by_size(
     Ok(())
 }

+/// Calculate the total used memory of this batch.
+///
+/// This function is used to estimate the physical memory usage of the `RecordBatch`. The implementation will add up each unique `Buffer`'s memory
+/// size, because:
+/// - The data pointers inside `Buffer`s are memory regions returned by the global memory
+/// allocator; those regions can't overlap.
+/// - The actually used ranges of `ArrayRef`s inside a `RecordBatch` can overlap
+/// or reuse the same `Buffer`. For example: taking a slice from an `Array`.
+///
+/// Example:
+/// For a `RecordBatch` with two columns `col1` and `col2`, both columns point
+/// to a sub-region of the same buffer.
+///
+/// [xxxxxxxxxxxxxxxxxxx] <--- buffer
+///       ^    ^  ^    ^
+///       |    |  |    |
+/// col1->[    ]  |    |
+/// col2--------->[    ]
+///
+/// In the above case, `get_record_batch_memory_size` will return the size of
+/// the buffer, instead of the sum of `col1` and `col2`'s actual memory size.
+///
+/// Note: The current `RecordBatch::get_array_memory_size()` will double count the
+/// buffer memory size if multiple arrays within the batch are sharing the same
+/// `Buffer`. This method provides a temporary fix until the issue is resolved:
+/// https://github.com/apache/arrow-rs/issues/6439
+pub fn get_record_batch_memory_size(batch: &RecordBatch) -> usize {
+    // Store pointers to the `Buffer`s' start memory addresses (instead of
+    // pointers to the actually used data regions represented by each `Array`)
+    let mut counted_buffers: HashSet<NonNull<u8>> = HashSet::new();
+    let mut total_size = 0;
+
+    for array in batch.columns() {
+        let array_data = array.to_data();
+        count_array_data_memory_size(&array_data, &mut counted_buffers, &mut total_size);
+    }
+
+    total_size
+}
+
+/// Count the memory usage of `array_data` and its children recursively.
+fn count_array_data_memory_size(
+    array_data: &ArrayData,
+    counted_buffers: &mut HashSet<NonNull<u8>>,
+    total_size: &mut usize,
+) {
+    // Count memory usage for `array_data`
+    for buffer in array_data.buffers() {
+        if counted_buffers.insert(buffer.data_ptr()) {
+            *total_size += buffer.capacity();
+        } // Otherwise the buffer's memory is already counted
+    }
+
+    if let Some(null_buffer) = array_data.nulls() {
+        if counted_buffers.insert(null_buffer.inner().inner().data_ptr()) {
+            *total_size += null_buffer.inner().inner().capacity();
+        }
+    }
+
+    // Count all children `ArrayData` recursively
+    for child in array_data.child_data() {
+        count_array_data_memory_size(child, counted_buffers, total_size);
+    }
+}
+
 #[cfg(test)]
 mod tests {
+    use super::*;
     use crate::spill::{spill_record_batch_by_size, spill_record_batches};
     use crate::test::build_table_i32;
+    use arrow::array::{Float64Array, Int32Array};
+    use arrow::datatypes::{DataType, Field, Float64Type, Int32Type, Schema};
+    use arrow::record_batch::RecordBatch;
+    use arrow_array::ListArray;
     use datafusion_common::Result;
     use datafusion_execution::disk_manager::DiskManagerConfig;
     use datafusion_execution::DiskManager;
@@ -147,7 +219,7 @@ mod tests {
         assert_eq!(cnt.unwrap(), num_rows);

         let file = BufReader::new(File::open(spill_file.path())?);
-        let reader = arrow::ipc::reader::FileReader::try_new(file, None)?;
+        let reader = FileReader::try_new(file, None)?;

         assert_eq!(reader.num_batches(), 2);
         assert_eq!(reader.schema(), schema);
@@ -175,11 +247,107 @@ mod tests {
         )?;

         let file = BufReader::new(File::open(spill_file.path())?);
-        let reader = arrow::ipc::reader::FileReader::try_new(file, None)?;
+        let reader = FileReader::try_new(file, None)?;

         assert_eq!(reader.num_batches(), 4);
         assert_eq!(reader.schema(), schema);

         Ok(())
     }
+
+    #[test]
+    fn test_get_record_batch_memory_size() {
+        // Create a simple record batch with two columns
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("ints", DataType::Int32, true),
+            Field::new("float64", DataType::Float64, false),
+        ]));
+
+        let int_array =
+            Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), Some(5)]);
+        let float64_array = Float64Array::from(vec![1.0, 2.0, 3.0, 4.0, 5.0]);
+
+        let batch = RecordBatch::try_new(
+            schema,
+            vec![Arc::new(int_array), Arc::new(float64_array)],
+        )
+        .unwrap();
+
+        let size = get_record_batch_memory_size(&batch);
+        assert_eq!(size, 60);
+    }
+
+    #[test]
+    fn test_get_record_batch_memory_size_empty() {
+        // Test with an empty record batch
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "ints",
+            DataType::Int32,
+            false,
+        )]));
+
+        let int_array: Int32Array = Int32Array::from(vec![] as Vec<i32>);
+        let batch = RecordBatch::try_new(schema, vec![Arc::new(int_array)]).unwrap();
+
+        let size = get_record_batch_memory_size(&batch);
+        assert_eq!(size, 0, "Empty batch should have 0 memory size");
+    }
+
+    #[test]
+    fn test_get_record_batch_memory_size_shared_buffer() {
+        // Test with slices that share the same underlying buffer
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("slice1", DataType::Int32, false),
+            Field::new("slice2", DataType::Int32, false),
+        ]));
+
+        let original = Int32Array::from(vec![1, 2, 3, 4, 5]);
+        let slice1 = original.slice(0, 3);
+        let slice2 = original.slice(2, 3);
+
+        let batch =
+            RecordBatch::try_new(schema, vec![Arc::new(slice1), Arc::new(slice2)])
+                .unwrap();
+
+        let size = get_record_batch_memory_size(&batch);
+        // The size should only count the shared buffer once
+        assert_eq!(size, 20);
+    }
+
+    #[test]
+    fn test_get_record_batch_memory_size_nested_array() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new(
+                "nested_int",
+                DataType::List(Arc::new(
+                    Field::new("item", DataType::Int32, true).into(),
+                )),
+                false,
+            ),
+            Field::new(
+                "nested_int2",
+                DataType::List(Arc::new(
+                    Field::new("item", DataType::Int32, true).into(),
+                )),
+                false,
+            ),
+        ]));
+
+        let int_list_array = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
+            Some(vec![Some(1), Some(2), Some(3)]),
+        ]);
+
+        let int_list_array2 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
+            Some(vec![Some(4), Some(5), Some(6)]),
+        ]);
+
+        let batch = RecordBatch::try_new(
+            schema,
+            vec![Arc::new(int_list_array), Arc::new(int_list_array2)],
+        )
+        .unwrap();
+
+        let size = get_record_batch_memory_size(&batch);
+        assert_eq!(size, 8320);
+    }
 }
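Before the clippy follow-ups, the behavior difference is easy to demonstrate in isolation. Here is a standalone sketch (not part of the series) that mirrors the `shared_buffer` test above and contrasts the new function with arrow's `RecordBatch::get_array_memory_size`, which tallies the shared buffer once per column:

```rust
use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_physical_plan::spill::get_record_batch_memory_size;

fn main() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("slice1", DataType::Int32, false),
        Field::new("slice2", DataType::Int32, false),
    ]));

    // Two columns backed by slices of the same underlying data buffer.
    let original = Int32Array::from(vec![1, 2, 3, 4, 5]);
    let slice1 = original.slice(0, 3);
    let slice2 = original.slice(2, 3);
    let batch =
        RecordBatch::try_new(schema, vec![Arc::new(slice1), Arc::new(slice2)]).unwrap();

    // The arrow method sums buffer capacity per column, so the shared buffer
    // is counted twice; the new function deduplicates by buffer start address.
    let arrow_size = batch.get_array_memory_size();
    let dedup_size = get_record_batch_memory_size(&batch);
    assert!(arrow_size > dedup_size);
    println!("get_array_memory_size: {arrow_size}, deduplicated: {dedup_size}");
}
```

With the five-element `Int32Array` here, the deduplicated size is the single 20-byte data buffer, which is exactly what `test_get_record_batch_memory_size_shared_buffer` asserts.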
From 52785f59ca9ad5aa7603e497923d46ce1c6a1a03 Mon Sep 17 00:00:00 2001
From: Yongting You <2010youy01@gmail.com>
Date: Tue, 12 Nov 2024 19:19:54 +0800
Subject: [PATCH 2/5] clippy

---
 datafusion/physical-plan/src/spill.rs | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/datafusion/physical-plan/src/spill.rs b/datafusion/physical-plan/src/spill.rs
index 353e9aaee02cf..301459074510c 100644
--- a/datafusion/physical-plan/src/spill.rs
+++ b/datafusion/physical-plan/src/spill.rs
@@ -116,19 +116,19 @@ pub fn spill_record_batch_by_size(
 /// This function is used to estimate the physical memory usage of the `RecordBatch`. The implementation will add up each unique `Buffer`'s memory
 /// size, because:
 /// - The data pointers inside `Buffer`s are memory regions returned by the global memory
-/// allocator; those regions can't overlap.
+///   allocator; those regions can't overlap.
 /// - The actually used ranges of `ArrayRef`s inside a `RecordBatch` can overlap
-/// or reuse the same `Buffer`. For example: taking a slice from an `Array`.
+///   or reuse the same `Buffer`. For example: taking a slice from an `Array`.
 ///
 /// Example:
 /// For a `RecordBatch` with two columns `col1` and `col2`, both columns point
 /// to a sub-region of the same buffer.
 ///
-/// [xxxxxxxxxxxxxxxxxxx] <--- buffer
+/// {xxxxxxxxxxxxxxxxxxx} <--- buffer
 ///       ^    ^  ^    ^
 ///       |    |  |    |
-/// col1->[    ]  |    |
-/// col2--------->[    ]
+/// col1->{    }  |    |
+/// col2--------->{    }
 ///
 /// In the above case, `get_record_batch_memory_size` will return the size of
 /// the buffer, instead of the sum of `col1` and `col2`'s actual memory size.
@@ -136,7 +136,7 @@ pub fn spill_record_batch_by_size(
 /// Note: The current `RecordBatch::get_array_memory_size()` will double count the
 /// buffer memory size if multiple arrays within the batch are sharing the same
 /// `Buffer`. This method provides a temporary fix until the issue is resolved:
-/// https://github.com/apache/arrow-rs/issues/6439
+/// <https://github.com/apache/arrow-rs/issues/6439>
 pub fn get_record_batch_memory_size(batch: &RecordBatch) -> usize {
     // Store pointers to the `Buffer`s' start memory addresses (instead of
     // pointers to the actually used data regions represented by each `Array`)

From d185ae63aa2ef934bc61022c55660ad80f910780 Mon Sep 17 00:00:00 2001
From: Yongting You <2010youy01@gmail.com>
Date: Tue, 12 Nov 2024 19:46:03 +0800
Subject: [PATCH 3/5] clippy

---
 datafusion/physical-plan/src/spill.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/physical-plan/src/spill.rs b/datafusion/physical-plan/src/spill.rs
index 301459074510c..428aa63e0f4c9 100644
--- a/datafusion/physical-plan/src/spill.rs
+++ b/datafusion/physical-plan/src/spill.rs
@@ -126,7 +126,7 @@ pub fn spill_record_batch_by_size(
 ///
 /// {xxxxxxxxxxxxxxxxxxx} <--- buffer
 ///       ^    ^  ^    ^
-///       |    |  |    |
+///       |    |  |    |
 /// col1->{    }  |    |
 /// col2--------->{    }
 ///

From 84cee689792230ce9de64fb75c8048d14793d327 Mon Sep 17 00:00:00 2001
From: Yongting You <2010youy01@gmail.com>
Date: Tue, 12 Nov 2024 20:06:02 +0800
Subject: [PATCH 4/5] clippy

---
 datafusion/core/tests/memory_limit/mod.rs |  5 +----
 datafusion/physical-plan/src/spill.rs     | 10 +++-------
 2 files changed, 4 insertions(+), 11 deletions(-)

diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs
index 256d41caf308e..c431cd6303db1 100644
--- a/datafusion/core/tests/memory_limit/mod.rs
+++ b/datafusion/core/tests/memory_limit/mod.rs
@@ -779,10 +779,7 @@ fn make_dict_batches() -> Vec<RecordBatch> {

 // How many bytes does the memory from dict_batches consume?
 fn batches_byte_size(batches: &[RecordBatch]) -> usize {
-    batches
-        .iter()
-        .map(|b| get_record_batch_memory_size(b))
-        .sum()
+    batches.iter().map(get_record_batch_memory_size).sum()
 }

 #[derive(Debug)]
diff --git a/datafusion/physical-plan/src/spill.rs b/datafusion/physical-plan/src/spill.rs
index 428aa63e0f4c9..3b0b951b91eff 100644
--- a/datafusion/physical-plan/src/spill.rs
+++ b/datafusion/physical-plan/src/spill.rs
@@ -182,7 +182,7 @@ mod tests {
     use crate::spill::{spill_record_batch_by_size, spill_record_batches};
     use crate::test::build_table_i32;
     use arrow::array::{Float64Array, Int32Array};
-    use arrow::datatypes::{DataType, Field, Float64Type, Int32Type, Schema};
+    use arrow::datatypes::{DataType, Field, Int32Type, Schema};
     use arrow::record_batch::RecordBatch;
     use arrow_array::ListArray;
     use datafusion_common::Result;
@@ -319,16 +319,12 @@ mod tests {
         let schema = Arc::new(Schema::new(vec![
             Field::new(
                 "nested_int",
-                DataType::List(Arc::new(
-                    Field::new("item", DataType::Int32, true).into(),
-                )),
+                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                 false,
             ),
             Field::new(
                 "nested_int2",
-                DataType::List(Arc::new(
-                    Field::new("item", DataType::Int32, true).into(),
-                )),
+                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                 false,
             ),
         ]));
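The review round below adds a null-aware test and restructures the shared-buffer test. As background for the `nulls()` branch it exercises, here is a self-contained sketch of the recursive walk the function performs over `ArrayData` (simplified from the patch; it uses `std::collections::HashSet` instead of the `datafusion_common` re-export):

```rust
use std::collections::HashSet;
use std::ptr::NonNull;

use arrow::array::{Array, ArrayData, ListArray};
use arrow::datatypes::Int32Type;

/// Sum each distinct buffer's capacity exactly once, including validity (null)
/// bitmaps and the buffers of child arrays; a trimmed-down mirror of
/// `count_array_data_memory_size`.
fn unique_buffer_size(data: &ArrayData, seen: &mut HashSet<NonNull<u8>>) -> usize {
    let mut size = 0;
    for buffer in data.buffers() {
        if seen.insert(buffer.data_ptr()) {
            size += buffer.capacity();
        }
    }
    // The validity bitmap lives outside `buffers()`; reach its innermost
    // `Buffer` through `NullBuffer` -> `BooleanBuffer`.
    if let Some(nulls) = data.nulls() {
        if seen.insert(nulls.inner().inner().data_ptr()) {
            size += nulls.inner().inner().capacity();
        }
    }
    // Nested types (List, Struct, ...) keep their values in child `ArrayData`.
    for child in data.child_data() {
        size += unique_buffer_size(child, seen);
    }
    size
}

fn main() {
    // A nullable list array owns an offsets buffer and a validity bitmap, and
    // its child Int32 array adds a values buffer plus its own validity bitmap:
    // four distinct buffers reached by the recursive walk.
    let list = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
        Some(vec![Some(1), None, Some(3)]),
        None,
    ]);

    let mut seen = HashSet::new();
    let total = unique_buffer_size(&list.to_data(), &mut seen);
    println!("unique buffer bytes: {total}");
}
```

The `test_get_record_batch_memory_size_with_null` case added below hits the same branch: its nullable `ints` column carries a validity bitmap on top of the data buffer.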
From 951a5f48fa964687d3c730865c0425a431dcfd68 Mon Sep 17 00:00:00 2001
From: Yongting You <2010youy01@gmail.com>
Date: Fri, 15 Nov 2024 13:46:06 +0800
Subject: [PATCH 5/5] review

---
 datafusion/physical-plan/src/spill.rs | 56 ++++++++++++++++++++++-----
 1 file changed, 47 insertions(+), 9 deletions(-)

diff --git a/datafusion/physical-plan/src/spill.rs b/datafusion/physical-plan/src/spill.rs
index 3b0b951b91eff..dee2d09862d36 100644
--- a/datafusion/physical-plan/src/spill.rs
+++ b/datafusion/physical-plan/src/spill.rs
@@ -113,7 +113,10 @@ pub fn spill_record_batch_by_size(

 /// Calculate the total used memory of this batch.
 ///
-/// This function is used to estimate the physical memory usage of the `RecordBatch`. The implementation will add up each unique `Buffer`'s memory
+/// This function is used to estimate the physical memory usage of the `RecordBatch`.
+/// It only counts the memory of large data `Buffer`s, and ignores metadata like
+/// types and pointers.
+/// The implementation will add up each unique `Buffer`'s memory
 /// size, because:
 /// - The data pointers inside `Buffer`s are memory regions returned by the global memory
 ///   allocator; those regions can't overlap.
@@ -277,6 +280,27 @@ mod tests {
         assert_eq!(size, 60);
     }

+    #[test]
+    fn test_get_record_batch_memory_size_with_null() {
+        // Create a simple record batch with two columns
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("ints", DataType::Int32, true),
+            Field::new("float64", DataType::Float64, false),
+        ]));
+
+        let int_array = Int32Array::from(vec![None, Some(2), Some(3)]);
+        let float64_array = Float64Array::from(vec![1.0, 2.0, 3.0]);
+
+        let batch = RecordBatch::try_new(
+            schema,
+            vec![Arc::new(int_array), Arc::new(float64_array)],
+        )
+        .unwrap();
+
+        let size = get_record_batch_memory_size(&batch);
+        assert_eq!(size, 100);
+    }
+
     #[test]
     fn test_get_record_batch_memory_size_empty() {
@@ -296,22 +320,36 @@ mod tests {
     #[test]
     fn test_get_record_batch_memory_size_shared_buffer() {
         // Test with slices that share the same underlying buffer
+        let original = Int32Array::from(vec![1, 2, 3, 4, 5]);
+        let slice1 = original.slice(0, 3);
+        let slice2 = original.slice(2, 3);
+
+        // `RecordBatch` with the `original` array
+        // ----
+        let schema_origin = Arc::new(Schema::new(vec![Field::new(
+            "origin_col",
+            DataType::Int32,
+            false,
+        )]));
+        let batch_origin =
+            RecordBatch::try_new(schema_origin, vec![Arc::new(original)]).unwrap();
+
+        // `RecordBatch` whose columns are all references to the `original` array
+        // ----
         let schema = Arc::new(Schema::new(vec![
             Field::new("slice1", DataType::Int32, false),
             Field::new("slice2", DataType::Int32, false),
         ]));

-        let original = Int32Array::from(vec![1, 2, 3, 4, 5]);
-        let slice1 = original.slice(0, 3);
-        let slice2 = original.slice(2, 3);
-
-        let batch =
+        let batch_sliced =
             RecordBatch::try_new(schema, vec![Arc::new(slice1), Arc::new(slice2)])
                 .unwrap();

-        let size = get_record_batch_memory_size(&batch);
-        // The size should only count the shared buffer once
-        assert_eq!(size, 20);
+        // Both sizes should only count the buffer in the `original` array
+        let size_origin = get_record_batch_memory_size(&batch_origin);
+        let size_sliced = get_record_batch_memory_size(&batch_sliced);
+
+        assert_eq!(size_origin, size_sliced);
     }

     #[test]