From 4eeca45d8b09a31d80c25fff7e3e516764f11456 Mon Sep 17 00:00:00 2001
From: wiedld
Date: Mon, 8 Jul 2024 17:42:53 -0700
Subject: [PATCH 01/10] feat(11344): track memory used for non-parallel writes

---
 datafusion/core/src/datasource/file_format/parquet.rs | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 27d783cd89b5f..b4a3f6e442261 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -48,6 +48,7 @@ use datafusion_common::{
     DEFAULT_PARQUET_EXTENSION,
 };
 use datafusion_common_runtime::SpawnedTask;
+use datafusion_execution::memory_pool::MemoryConsumer;
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::expressions::{MaxAccumulator, MinAccumulator};
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
@@ -749,14 +750,19 @@ impl DataSink for ParquetSink {
                         parquet_props.writer_options().clone(),
                     )
                     .await?;
+                let mut reservation =
+                    MemoryConsumer::new(format!("ParquetSink[{}]", path))
+                        .register(context.memory_pool());
                 file_write_tasks.spawn(async move {
                     while let Some(batch) = rx.recv().await {
                         writer.write(&batch).await?;
+                        reservation.try_resize(writer.memory_size())?;
                     }
                     let file_metadata = writer
                         .close()
                         .await
                         .map_err(DataFusionError::ParquetError)?;
+                    drop(reservation);
                     Ok((path, file_metadata))
                 });
             } else {
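
The reservation lifecycle this first patch leans on comes from DataFusion's memory_pool module. A minimal, self-contained sketch of that lifecycle follows; the pool type, cap, consumer name, and byte counts are illustrative stand-ins, not what ParquetSink registers at runtime:

    use std::sync::Arc;

    use datafusion_common::Result;
    use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};

    fn main() -> Result<()> {
        // A pool with a hard cap; at runtime the sink uses whatever pool the
        // TaskContext carries.
        let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024 * 1024));

        // One consumer per in-progress file, as in the patch above.
        let mut reservation =
            MemoryConsumer::new("ParquetSink[/tmp/out.parquet]").register(&pool);

        // Track the writer's current footprint; try_resize fails once the pool
        // is exhausted, so the query errors instead of exceeding the limit.
        reservation.try_resize(512 * 1024)?;
        assert_eq!(pool.reserved(), 512 * 1024);

        // Dropping the reservation returns all accounted bytes to the pool.
        drop(reservation);
        assert_eq!(pool.reserved(), 0);
        Ok(())
    }
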
From 07e45e4259a589697d85d5c4c2862e3be78f05d3 Mon Sep 17 00:00:00 2001
From: wiedld
Date: Mon, 8 Jul 2024 17:47:23 -0700
Subject: [PATCH 02/10] feat(11344): track memory usage during parallel writes

---
 .../src/datasource/file_format/parquet.rs     | 47 +++++++++++++++----
 1 file changed, 38 insertions(+), 9 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index b4a3f6e442261..57211fcf45f07 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -48,7 +48,7 @@ use datafusion_common::{
     DEFAULT_PARQUET_EXTENSION,
 };
 use datafusion_common_runtime::SpawnedTask;
-use datafusion_execution::memory_pool::MemoryConsumer;
+use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::expressions::{MaxAccumulator, MinAccumulator};
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
@@ -777,6 +777,7 @@ impl DataSink for ParquetSink {
                 let schema = self.get_writer_schema();
                 let props = parquet_props.clone();
                 let parallel_options_clone = parallel_options.clone();
+                let pool = Arc::clone(context.memory_pool());
                 file_write_tasks.spawn(async move {
                     let file_metadata = output_single_parquet_file_parallelized(
                         writer,
                         rx,
                         schema,
                         props.writer_options(),
                         parallel_options_clone,
+                        pool,
                     )
                     .await?;
                     Ok((path, file_metadata))
                 });
             } else {
@@ -824,14 +826,16 @@ impl DataSink for ParquetSink {
 async fn column_serializer_task(
     mut rx: Receiver<ArrowLeafColumn>,
     mut writer: ArrowColumnWriter,
-) -> Result<ArrowColumnWriter> {
+    mut reservation: MemoryReservation,
+) -> Result<(ArrowColumnWriter, MemoryReservation)> {
     while let Some(col) = rx.recv().await {
         writer.write(&col)?;
+        reservation.try_resize(writer.memory_size())?;
     }
-    Ok(writer)
+    Ok((writer, reservation))
 }
 
-type ColumnWriterTask = SpawnedTask<Result<ArrowColumnWriter>>;
+type ColumnWriterTask = SpawnedTask<Result<(ArrowColumnWriter, MemoryReservation)>>;
 type ColSender = Sender<ArrowLeafColumn>;
 
 /// Spawns a parallel serialization task for each column
@@ -841,6 +845,7 @@ fn spawn_column_parallel_row_group_writer(
     schema: Arc<Schema>,
     parquet_props: Arc<WriterProperties>,
     max_buffer_size: usize,
+    pool: &Arc<dyn MemoryPool>,
 ) -> Result<(Vec<ColumnWriterTask>, Vec<ColSender>)> {
     let schema_desc = arrow_to_parquet_schema(&schema)?;
     let col_writers = get_column_writers(&schema_desc, &parquet_props, &schema)?;
@@ -854,7 +859,13 @@ fn spawn_column_parallel_row_group_writer(
             mpsc::channel::<ArrowLeafColumn>(max_buffer_size);
         col_array_channels.push(send_array);
 
-        let task = SpawnedTask::spawn(column_serializer_task(recieve_array, writer));
+        let reservation =
+            MemoryConsumer::new("ParquetSink(ArrowColumnWriter)").register(pool);
+        let task = SpawnedTask::spawn(column_serializer_task(
+            recieve_array,
+            writer,
+            reservation,
+        ));
         col_writer_tasks.push(task);
     }
 
@@ -870,7 +881,8 @@ struct ParallelParquetWriterOptions {
 
 /// This is the return type of calling [ArrowColumnWriter].close() on each column
 /// i.e. the Vec of encoded columns which can be appended to a row group
-type RBStreamSerializeResult = Result<(Vec<ArrowColumnChunk>, usize)>;
+type RBStreamSerializeResult =
+    Result<(Vec<(ArrowColumnChunk, MemoryReservation)>, usize)>;
 
 /// Sends the ArrowArrays in passed [RecordBatch] through the channels to their respective
 /// parallel column serializers.
@@ -906,8 +918,11 @@ fn spawn_rg_join_and_finalize_task(
         let num_cols = column_writer_tasks.len();
         let mut finalized_rg = Vec::with_capacity(num_cols);
         for task in column_writer_tasks.into_iter() {
-            let writer = task.join_unwind().await?;
-            finalized_rg.push(writer.close()?);
+            let (writer, mut reservation) = task.join_unwind().await?;
+            let encoded_size = writer.get_estimated_total_bytes();
+            let data = writer.close()?;
+            reservation.try_resize(encoded_size)?;
+            finalized_rg.push((data, reservation));
         }
 
         Ok((finalized_rg, rg_rows))
@@ -928,6 +943,7 @@ fn spawn_parquet_parallel_serialization_task(
     schema: Arc<Schema>,
     writer_props: Arc<WriterProperties>,
     parallel_options: ParallelParquetWriterOptions,
+    pool: Arc<dyn MemoryPool>,
 ) -> SpawnedTask<Result<(), DataFusionError>> {
     SpawnedTask::spawn(async move {
         let max_buffer_rb = parallel_options.max_buffered_record_batches_per_stream;
@@ -937,6 +953,7 @@ fn spawn_parquet_parallel_serialization_task(
             schema.clone(),
             writer_props.clone(),
             max_buffer_rb,
+            &pool,
         )?;
 
         let mut current_rg_rows = 0;
@@ -979,6 +996,7 @@ fn spawn_parquet_parallel_serialization_task(
                         schema.clone(),
                         writer_props.clone(),
                         max_buffer_rb,
+                        &pool,
                     )?;
                 }
             }
@@ -1008,6 +1026,7 @@ async fn concatenate_parallel_row_groups(
     schema: Arc<Schema>,
     writer_props: Arc<WriterProperties>,
     mut object_store_writer: Box<dyn AsyncWrite + Unpin + Send>,
+    pool: Arc<dyn MemoryPool>,
 ) -> Result<FileMetaData> {
     let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);
 
@@ -1019,20 +1038,27 @@ async fn concatenate_parallel_row_groups(
     )?;
 
     while let Some(task) = serialize_rx.recv().await {
+        let mut rg_reservation =
+            MemoryConsumer::new("ParquetSink(SerializedRowGroupWriter)").register(&pool);
+
         let result = task.join_unwind().await;
         let mut rg_out = parquet_writer.next_row_group()?;
         let (serialized_columns, _cnt) = result?;
-        for chunk in serialized_columns {
+        for (chunk, col_reservation) in serialized_columns {
             chunk.append_to_row_group(&mut rg_out)?;
             let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap();
             if buff_to_flush.len() > BUFFER_FLUSH_BYTES {
                 object_store_writer
                     .write_all(buff_to_flush.as_slice())
                     .await?;
+                rg_reservation.shrink(buff_to_flush.len());
                 buff_to_flush.clear();
             }
+            rg_reservation.grow(col_reservation.size());
+            drop(col_reservation);
         }
         rg_out.close()?;
+        rg_reservation.free();
     }
 
     let file_metadata = parquet_writer.close()?;
@@ -1054,6 +1080,7 @@ async fn output_single_parquet_file_parallelized(
     output_schema: Arc<Schema>,
     parquet_props: &WriterProperties,
     parallel_options: ParallelParquetWriterOptions,
+    pool: Arc<dyn MemoryPool>,
 ) -> Result<FileMetaData> {
     let max_rowgroups = parallel_options.max_parallel_row_groups;
     // Buffer size of this channel limits maximum number of RowGroups being worked on in parallel
@@ -1067,12 +1094,14 @@ async fn output_single_parquet_file_parallelized(
         output_schema.clone(),
         arc_props.clone(),
         parallel_options,
+        Arc::clone(&pool),
     );
     let file_metadata = concatenate_parallel_row_groups(
         serialize_rx,
         output_schema.clone(),
         arc_props.clone(),
        object_store_writer,
+        pool,
     )
     .await?;
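
The key move in this patch is that each spawned column serializer owns its MemoryReservation and hands it back to the caller on join, so accounting follows the buffered data across task boundaries. A rough, self-contained sketch of that ownership pattern, assuming an unbounded pool and hypothetical worker names and sizes:

    use std::sync::Arc;

    use datafusion_common::{DataFusionError, Result};
    use datafusion_execution::memory_pool::{
        MemoryConsumer, MemoryPool, MemoryReservation, UnboundedMemoryPool,
    };
    use tokio::task::JoinSet;

    #[tokio::main]
    async fn main() -> Result<()> {
        let pool: Arc<dyn MemoryPool> = Arc::new(UnboundedMemoryPool::default());

        // Each worker owns its reservation, like the column serializer tasks
        // above, and returns it on completion so the consumer of its output
        // decides when the accounted bytes are released.
        let mut workers = JoinSet::new();
        for i in 0..4 {
            let mut reservation = MemoryConsumer::new(format!("worker-{i}")).register(&pool);
            workers.spawn(async move {
                reservation.try_resize(1024)?; // stand-in for buffered column data
                Ok::<MemoryReservation, DataFusionError>(reservation)
            });
        }
        while let Some(joined) = workers.join_next().await {
            let reservation = joined.map_err(|e| DataFusionError::External(Box::new(e)))??;
            drop(reservation); // release once the worker's output is consumed
        }
        assert_eq!(pool.reserved(), 0);
        Ok(())
    }
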
From 68beda4e5a81e9295a5e5078f48a4affae60360b Mon Sep 17 00:00:00 2001
From: wiedld
Date: Mon, 8 Jul 2024 19:09:11 -0700
Subject: [PATCH 03/10] test(11344): create bounded stream for testing

---
 datafusion/core/src/test_util/mod.rs | 36 ++++++++++++++++++++++++++++
 1 file changed, 36 insertions(+)

diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs
index 059fa8fc6da77..ba0509f3f51ac 100644
--- a/datafusion/core/src/test_util/mod.rs
+++ b/datafusion/core/src/test_util/mod.rs
@@ -366,3 +366,39 @@ pub fn register_unbounded_file_with_ordering(
     ctx.register_table(table_name, Arc::new(StreamTable::new(Arc::new(config))))?;
     Ok(())
 }
+
+struct BoundedStream {
+    limit: usize,
+    count: usize,
+    batch: RecordBatch,
+}
+
+impl Stream for BoundedStream {
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        if self.count >= self.limit {
+            return Poll::Ready(None);
+        }
+        self.count += 1;
+        Poll::Ready(Some(Ok(self.batch.clone())))
+    }
+}
+
+impl RecordBatchStream for BoundedStream {
+    fn schema(&self) -> SchemaRef {
+        self.batch.schema()
+    }
+}
+
+/// Creates a bounded stream for testing purposes.
+pub fn bounded_stream(batch: RecordBatch, limit: usize) -> SendableRecordBatchStream {
+    Box::pin(BoundedStream {
+        count: 0,
+        limit,
+        batch,
+    })
+}
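
A sketch of how the new helper might be exercised from a test inside the datafusion crate; the test name and column contents are hypothetical:

    use std::sync::Arc;

    use datafusion::arrow::array::{ArrayRef, Int32Array};
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::test_util::bounded_stream;
    use futures::StreamExt;

    #[tokio::test]
    async fn drains_exactly_limit_batches() {
        let col: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
        let batch = RecordBatch::try_from_iter(vec![("a", col)]).unwrap();

        // The stream yields clones of the same batch `limit` times, then ends.
        let mut stream = bounded_stream(batch, 10);
        let mut n = 0;
        while let Some(item) = stream.next().await {
            item.unwrap();
            n += 1;
        }
        assert_eq!(n, 10);
    }
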
From 073e4714a5e239672a38713418a48f8bdc161e5b Mon Sep 17 00:00:00 2001
From: wiedld
Date: Mon, 8 Jul 2024 19:10:03 -0700
Subject: [PATCH 04/10] test(11344): test ParquetSink memory reservation

---
 .../src/datasource/file_format/parquet.rs     | 103 ++++++++++++++++++
 1 file changed, 103 insertions(+)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 57211fcf45f07..e5fb7a95f12fb 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -1193,8 +1193,10 @@ mod tests {
     use super::super::test_util::scan_format;
     use crate::datasource::listing::{ListingTableUrl, PartitionedFile};
     use crate::physical_plan::collect;
+    use crate::test_util::bounded_stream;
     use std::fmt::{Display, Formatter};
     use std::sync::atomic::{AtomicUsize, Ordering};
+    use std::time::Duration;
 
     use super::*;
 
@@ -2212,4 +2214,105 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn parquet_sink_write_memory_reservation() -> Result<()> {
+        async fn test_memory_reservation(global: ParquetOptions) -> Result<()> {
+            let field_a = Field::new("a", DataType::Utf8, false);
+            let field_b = Field::new("b", DataType::Utf8, false);
+            let schema = Arc::new(Schema::new(vec![field_a, field_b]));
+            let object_store_url = ObjectStoreUrl::local_filesystem();
+
+            let file_sink_config = FileSinkConfig {
+                object_store_url: object_store_url.clone(),
+                file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
+                table_paths: vec![ListingTableUrl::parse("file:///")?],
+                output_schema: schema.clone(),
+                table_partition_cols: vec![],
+                overwrite: true,
+                keep_partition_by_columns: false,
+            };
+            let parquet_sink = Arc::new(ParquetSink::new(
+                file_sink_config,
+                TableParquetOptions {
+                    key_value_metadata: std::collections::HashMap::from([
+                        ("my-data".to_string(), Some("stuff".to_string())),
+                        ("my-data-bool-key".to_string(), None),
+                    ]),
+                    global,
+                    ..Default::default()
+                },
+            ));
+
+            // create data
+            let col_a: ArrayRef = Arc::new(StringArray::from(vec!["foo", "bar"]));
+            let col_b: ArrayRef = Arc::new(StringArray::from(vec!["baz", "baz"]));
+            let batch =
+                RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)]).unwrap();
+
+            // create task context
+            let task_context = build_ctx(object_store_url.as_ref());
+            assert_eq!(
+                task_context.memory_pool().reserved(),
+                0,
+                "no bytes are reserved yet"
+            );
+
+            let mut write_task = parquet_sink.write_all(
+                Box::pin(RecordBatchStreamAdapter::new(
+                    schema,
+                    bounded_stream(batch, 1000),
+                )),
+                &task_context,
+            );
+
+            // incrementally poll and check for memory reservation
+            let mut reserved_bytes = 0;
+            while futures::poll!(&mut write_task).is_pending() {
+                reserved_bytes += task_context.memory_pool().reserved();
+                tokio::time::sleep(Duration::from_micros(1)).await;
+            }
+            assert!(
+                reserved_bytes > 0,
+                "should have bytes reserved during write"
+            );
+            assert_eq!(
+                task_context.memory_pool().reserved(),
+                0,
+                "no leaking byte reservation"
+            );
+
+            Ok(())
+        }
+
+        let write_opts = ParquetOptions {
+            allow_single_file_parallelism: false,
+            ..Default::default()
+        };
+        test_memory_reservation(write_opts)
+            .await
+            .expect("should track for non-parallel writes");
+
+        let row_parallel_write_opts = ParquetOptions {
+            allow_single_file_parallelism: true,
+            maximum_parallel_row_group_writers: 10,
+            maximum_buffered_record_batches_per_stream: 1,
+            ..Default::default()
+        };
+        test_memory_reservation(row_parallel_write_opts)
+            .await
+            .expect("should track for row-parallel writes");
+
+        let col_parallel_write_opts = ParquetOptions {
+            allow_single_file_parallelism: true,
+            maximum_parallel_row_group_writers: 1,
+            maximum_buffered_record_batches_per_stream: 2,
+            ..Default::default()
+        };
+        test_memory_reservation(col_parallel_write_opts)
+            .await
+            .expect("should track for column-parallel writes");
+
+        Ok(())
+    }
 }
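
The test above drives write_all by hand instead of awaiting it, so it can sample the pool while the write is still in flight. futures::poll! is what makes that incremental driving possible; a tiny standalone illustration (the sleep durations are arbitrary):

    use std::time::Duration;

    #[tokio::main]
    async fn main() {
        // futures::poll! polls a pinned future exactly once and reports
        // Pending or Ready, which is what lets the test sample pool usage
        // between polls while the write is still in progress.
        let mut fut = Box::pin(tokio::time::sleep(Duration::from_millis(5)));
        let mut polls = 0;
        while futures::poll!(&mut fut).is_pending() {
            polls += 1;
            tokio::time::sleep(Duration::from_micros(100)).await;
        }
        println!("future completed after {polls} intermediate polls");
    }
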
From 3709d48f708237253513fce62ea0d922016d9926 Mon Sep 17 00:00:00 2001
From: wiedld
Date: Mon, 8 Jul 2024 20:26:59 -0700
Subject: [PATCH 05/10] feat(11344): track bytes in file writer

---
 datafusion/core/src/datasource/file_format/parquet.rs | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index e5fb7a95f12fb..d16320d8d4c35 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -1030,6 +1030,9 @@ async fn concatenate_parallel_row_groups(
 ) -> Result<FileMetaData> {
     let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);
 
+    let mut file_reservation =
+        MemoryConsumer::new("ParquetSink(SerializedFileWriter)").register(&pool);
+
     let schema_desc = arrow_to_parquet_schema(schema.as_ref())?;
     let mut parquet_writer = SerializedFileWriter::new(
         merged_buff.clone(),
@@ -1058,7 +1061,10 @@ async fn concatenate_parallel_row_groups(
         rg_out.close()?;
-        rg_reservation.free();
+
+        // bytes remaining. Could be unflushed data, or rg metadata passed to the file writer on rg_out.close()
+        let remaining_bytes = rg_reservation.free();
+        file_reservation.grow(remaining_bytes);
     }
 
     let file_metadata = parquet_writer.close()?;
@@ -1066,6 +1072,7 @@ async fn concatenate_parallel_row_groups(
 
     object_store_writer.write_all(final_buff.as_slice()).await?;
     object_store_writer.shutdown().await?;
+    file_reservation.free();
 
     Ok(file_metadata)
 }

From f880fae92e386927fdf11fb7278d91b6a42daa9d Mon Sep 17 00:00:00 2001
From: wiedld
Date: Mon, 8 Jul 2024 20:47:09 -0700
Subject: [PATCH 06/10] refactor(11344): tweak the ordering to add col bytes
 to rg_reservation, before selecting shrinking for data bytes flushed

---
 datafusion/core/src/datasource/file_format/parquet.rs | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index d16320d8d4c35..1c10f426d4e97 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -1049,6 +1049,9 @@ async fn concatenate_parallel_row_groups(
         let (serialized_columns, _cnt) = result?;
         for (chunk, col_reservation) in serialized_columns {
             chunk.append_to_row_group(&mut rg_out)?;
+            rg_reservation.grow(col_reservation.size());
+            drop(col_reservation);
+
             let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap();
             if buff_to_flush.len() > BUFFER_FLUSH_BYTES {
                 object_store_writer
                     .write_all(buff_to_flush.as_slice())
                     .await?;
                 rg_reservation.shrink(buff_to_flush.len());
                 buff_to_flush.clear();
             }
-            rg_reservation.grow(col_reservation.size());
-            drop(col_reservation);
         }
         rg_out.close()?;
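
Patches 05 and 06 hand accounted bytes from the row-group reservation to a file-level reservation when a row group closes. That handoff relies on free() returning the number of bytes it released; a sketch with two consumers on one pool (consumer names and sizes are illustrative):

    use std::sync::Arc;

    use datafusion_common::Result;
    use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};

    fn main() -> Result<()> {
        let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(64 * 1024));
        let mut rg = MemoryConsumer::new("row-group").register(&pool);
        let mut file = MemoryConsumer::new("file-writer").register(&pool);

        rg.grow(10 * 1024); // bytes buffered while the row group is encoded

        // Hand the unflushed remainder to the file-level reservation when the
        // row group closes: free() returns the size it released.
        let remaining = rg.free();
        file.grow(remaining);
        assert_eq!(pool.reserved(), 10 * 1024);

        file.free(); // everything released once the file reaches the object store
        assert_eq!(pool.reserved(), 0);
        Ok(())
    }
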
From 558348ea348eaa03a3ac3fcbfce668ab27a08dba Mon Sep 17 00:00:00 2001
From: wiedld
Date: Tue, 9 Jul 2024 13:02:07 -0700
Subject: [PATCH 07/10] refactor: move each col_reservation and rg_reservation
 to match the parallelized call stack for col vs rg

---
 .../src/datasource/file_format/parquet.rs     | 42 ++++++++++---------
 1 file changed, 22 insertions(+), 20 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 1c10f426d4e97..557b0d5f97c5b 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -881,8 +881,7 @@ struct ParallelParquetWriterOptions {
 
 /// This is the return type of calling [ArrowColumnWriter].close() on each column
 /// i.e. the Vec of encoded columns which can be appended to a row group
-type RBStreamSerializeResult =
-    Result<(Vec<(ArrowColumnChunk, MemoryReservation)>, usize)>;
+type RBStreamSerializeResult = Result<(Vec<ArrowColumnChunk>, MemoryReservation, usize)>;
 
 /// Sends the ArrowArrays in passed [RecordBatch] through the channels to their respective
 /// parallel column serializers.
@@ -913,19 +912,23 @@ fn spawn_rg_join_and_finalize_task(
     column_writer_tasks: Vec<ColumnWriterTask>,
     rg_rows: usize,
+    pool: &Arc<dyn MemoryPool>,
 ) -> SpawnedTask<RBStreamSerializeResult> {
+    let mut rg_reservation =
+        MemoryConsumer::new("ParquetSink(SerializedRowGroupWriter)").register(pool);
+
     SpawnedTask::spawn(async move {
         let num_cols = column_writer_tasks.len();
         let mut finalized_rg = Vec::with_capacity(num_cols);
         for task in column_writer_tasks.into_iter() {
-            let (writer, mut reservation) = task.join_unwind().await?;
+            let (writer, col_reservation) = task.join_unwind().await?;
             let encoded_size = writer.get_estimated_total_bytes();
-            let data = writer.close()?;
-            reservation.try_resize(encoded_size)?;
-            finalized_rg.push((data, reservation));
+            rg_reservation.grow(encoded_size);
+            drop(col_reservation);
+            finalized_rg.push(writer.close()?);
         }
 
-        Ok((finalized_rg, rg_rows))
+        Ok((finalized_rg, rg_reservation, rg_rows))
     })
 }
@@ -980,6 +983,7 @@ fn spawn_parquet_parallel_serialization_task(
                     let finalize_rg_task = spawn_rg_join_and_finalize_task(
                         column_writer_handles,
                         max_row_group_rows,
+                        &pool,
                     );
 
                     serialize_tx.send(finalize_rg_task).await.map_err(|_| {
@@ -1005,8 +1009,11 @@ fn spawn_parquet_parallel_serialization_task(
         drop(col_array_channels);
         // Handle leftover rows as final rowgroup, which may be smaller than max_row_group_rows
         if current_rg_rows > 0 {
-            let finalize_rg_task =
-                spawn_rg_join_and_finalize_task(column_writer_handles, current_rg_rows);
+            let finalize_rg_task = spawn_rg_join_and_finalize_task(
+                column_writer_handles,
+                current_rg_rows,
+                &pool,
+            );
 
             serialize_tx.send(finalize_rg_task).await.map_err(|_| {
                 DataFusionError::Internal(
@@ -1041,31 +1048,26 @@ async fn concatenate_parallel_row_groups(
     )?;
 
     while let Some(task) = serialize_rx.recv().await {
-        let mut rg_reservation =
-            MemoryConsumer::new("ParquetSink(SerializedRowGroupWriter)").register(&pool);
-
         let result = task.join_unwind().await;
         let mut rg_out = parquet_writer.next_row_group()?;
-        let (serialized_columns, _cnt) = result?;
-        for (chunk, col_reservation) in serialized_columns {
+        let (serialized_columns, mut rg_reservation, _cnt) = result?;
+        for chunk in serialized_columns {
             chunk.append_to_row_group(&mut rg_out)?;
-            rg_reservation.grow(col_reservation.size());
-            drop(col_reservation);
+            rg_reservation.free();
 
             let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap();
+            file_reservation.try_resize(buff_to_flush.len())?;
+
             if buff_to_flush.len() > BUFFER_FLUSH_BYTES {
                 object_store_writer
                     .write_all(buff_to_flush.as_slice())
                     .await?;
                 rg_reservation.shrink(buff_to_flush.len());
                 buff_to_flush.clear();
+                file_reservation.try_resize(buff_to_flush.len())?; // will set to zero
             }
         }
         rg_out.close()?;
-
-        // bytes remaining. Could be unflushed data, or rg metadata passed to the file writer on rg_out.close()
-        let remaining_bytes = rg_reservation.free();
-        file_reservation.grow(remaining_bytes);
     }
 
     let file_metadata = parquet_writer.close()?;
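
After this refactor, the file-level reservation simply mirrors the length of the shared, not-yet-flushed write buffer via try_resize after every mutation. That idiom looks roughly like the following in isolation (the buffer, pool, and sizes are stand-ins for the SharedBuffer the sink actually uses):

    use std::sync::Arc;

    use datafusion_common::Result;
    use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, UnboundedMemoryPool};

    fn main() -> Result<()> {
        let pool: Arc<dyn MemoryPool> = Arc::new(UnboundedMemoryPool::default());
        let mut file_reservation = MemoryConsumer::new("file-buffer").register(&pool);

        let mut buffer: Vec<u8> = Vec::new();

        // Resize the reservation to whatever is currently unflushed.
        buffer.extend_from_slice(&[0u8; 8192]);
        file_reservation.try_resize(buffer.len())?;
        assert_eq!(pool.reserved(), 8192);

        buffer.clear(); // "flushed" to the object store
        file_reservation.try_resize(buffer.len())?; // will set to zero
        assert_eq!(pool.reserved(), 0);
        Ok(())
    }
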
From eaa5925c2b250f1f2a95fbb3d3797e06e5e00444 Mon Sep 17 00:00:00 2001
From: wiedld
Date: Tue, 9 Jul 2024 16:19:42 -0700
Subject: [PATCH 08/10] test(11344): add memory_limit enforcement test for
 parquet sink

---
 datafusion/core/tests/memory_limit/mod.rs | 25 +++++++++++++++++++++++
 1 file changed, 25 insertions(+)

diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs
index f61ee5d9ab984..aff6d5b737e15 100644
--- a/datafusion/core/tests/memory_limit/mod.rs
+++ b/datafusion/core/tests/memory_limit/mod.rs
@@ -323,6 +323,31 @@ async fn oom_recursive_cte() {
         .await
 }
 
+#[tokio::test]
+async fn oom_parquet_sink() {
+    let file = tempfile::Builder::new()
+        .suffix(".parquet")
+        .tempfile()
+        .unwrap();
+
+    TestCase::new()
+        .with_query(format!(
+            "
+            COPY (select * from t)
+            TO '{}'
+            STORED AS PARQUET OPTIONS (compression 'uncompressed');
+        ",
+            file.path().to_string_lossy()
+        ))
+        .with_expected_errors(vec![
+            // TODO: update error handling in ParquetSink
+            "Unable to send array to writer!",
+        ])
+        .with_memory_limit(200_000)
+        .run()
+        .await
+}
+
 /// Run the query with the specified memory limit,
 /// and verifies the expected errors are returned
 #[derive(Clone, Debug)]

From 6ce964edd5847c679148afe7adddc7c71179669b Mon Sep 17 00:00:00 2001
From: wiedld
Date: Wed, 10 Jul 2024 08:46:46 -0700
Subject: [PATCH 09/10] chore: cleanup to remove unnecessary reservation
 management steps

---
 datafusion/core/src/datasource/file_format/parquet.rs | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 557b0d5f97c5b..694c949285374 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -762,7 +762,6 @@ impl DataSink for ParquetSink {
                     let file_metadata = writer
                         .close()
                         .await
                         .map_err(DataFusionError::ParquetError)?;
-                    drop(reservation);
                     Ok((path, file_metadata))
                 });
             } else {
@@ -921,10 +921,9 @@ fn spawn_rg_join_and_finalize_task(
         let num_cols = column_writer_tasks.len();
         let mut finalized_rg = Vec::with_capacity(num_cols);
         for task in column_writer_tasks.into_iter() {
-            let (writer, col_reservation) = task.join_unwind().await?;
+            let (writer, _col_reservation) = task.join_unwind().await?;
             let encoded_size = writer.get_estimated_total_bytes();
             rg_reservation.grow(encoded_size);
-            drop(col_reservation);
             finalized_rg.push(writer.close()?);
         }
 
@@ -1062,7 +1062,6 @@ async fn concatenate_parallel_row_groups(
                 object_store_writer
                     .write_all(buff_to_flush.as_slice())
                     .await?;
-                rg_reservation.shrink(buff_to_flush.len());
                 buff_to_flush.clear();
                 file_reservation.try_resize(buff_to_flush.len())?; // will set to zero
             }
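
The memory_limit test above depends on the harness capping the runtime's pool. Roughly, .with_memory_limit(200_000) amounts to building a session whose RuntimeEnv carries a bounded pool; a sketch of that wiring (the exact harness plumbing lives in the TestCase helper in this file, and the function name here is hypothetical):

    use std::sync::Arc;

    use datafusion::error::Result;
    use datafusion::execution::context::SessionContext;
    use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
    use datafusion::prelude::SessionConfig;

    // A session whose memory pool is capped near 200kB, mirroring
    // .with_memory_limit(200_000) in the test above.
    fn limited_ctx() -> Result<SessionContext> {
        let runtime =
            RuntimeEnv::new(RuntimeConfig::new().with_memory_limit(200_000, 1.0))?;
        Ok(SessionContext::new_with_config_rt(
            SessionConfig::new(),
            Arc::new(runtime),
        ))
    }
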
From fcf1476eec30b0ae6b58992e6a61aeb22a8b7ff9 Mon Sep 17 00:00:00 2001
From: wiedld
Date: Wed, 10 Jul 2024 09:17:49 -0700
Subject: [PATCH 10/10] fix: fix CI test failure due to file extension rename

---
 datafusion/core/tests/memory_limit/mod.rs | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs
index aff6d5b737e15..f7402357d1c76 100644
--- a/datafusion/core/tests/memory_limit/mod.rs
+++ b/datafusion/core/tests/memory_limit/mod.rs
@@ -31,6 +31,7 @@ use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
 use futures::StreamExt;
 use std::any::Any;
 use std::sync::{Arc, OnceLock};
+use tokio::fs::File;
 
 use datafusion::datasource::streaming::StreamingTable;
 use datafusion::datasource::{MemTable, TableProvider};
@@ -325,10 +326,9 @@ async fn oom_recursive_cte() {
 
 #[tokio::test]
 async fn oom_parquet_sink() {
-    let file = tempfile::Builder::new()
-        .suffix(".parquet")
-        .tempfile()
-        .unwrap();
+    let dir = tempfile::tempdir().unwrap();
+    let path = dir.into_path().join("test.parquet");
+    let _ = File::create(path.clone()).await.unwrap();
 
     TestCase::new()
         .with_query(format!(
             "
             COPY (select * from t)
             TO '{}'
             STORED AS PARQUET OPTIONS (compression 'uncompressed');
         ",
-            file.path().to_string_lossy()
+            path.to_string_lossy()
         ))
         .with_expected_errors(vec![
             // TODO: update error handling in ParquetSink