From 4e52c73e1cdece77978d38ae480fceceddbfa13c Mon Sep 17 00:00:00 2001
From: Devin D'Angelo
Date: Sat, 23 Sep 2023 07:45:27 -0400
Subject: [PATCH] use ArrowRowGroupWriter directly

---
 .../src/datasource/file_format/parquet.rs | 183 ++++++++----
 1 file changed, 78 insertions(+), 105 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index d7234a5375f8b..01394fdb79e5b 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -17,6 +17,7 @@
 //! Parquet format abstractions
 
+use parquet::arrow::arrow_writer::ArrowRowGroupWriter;
 use parquet::column::writer::ColumnCloseResult;
 use parquet::file::writer::SerializedFileWriter;
 use rand::distributions::DistString;
@@ -26,7 +27,7 @@ use std::fmt::Debug;
 use std::io::Write;
 use std::sync::Arc;
 use tokio::io::{AsyncWrite, AsyncWriteExt};
-use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
+use tokio::sync::mpsc;
 use tokio::task::{JoinHandle, JoinSet};
 
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
@@ -40,7 +41,9 @@ use datafusion_physical_expr::PhysicalExpr;
 use futures::{StreamExt, TryStreamExt};
 use hashbrown::HashMap;
 use object_store::{ObjectMeta, ObjectStore};
-use parquet::arrow::{parquet_to_arrow_schema, AsyncArrowWriter};
+use parquet::arrow::{
+    arrow_to_parquet_schema, parquet_to_arrow_schema, AsyncArrowWriter,
+};
 use parquet::file::footer::{decode_footer, decode_metadata};
 use parquet::file::metadata::ParquetMetaData;
 use parquet::file::properties::WriterProperties;
@@ -60,6 +63,8 @@ use crate::datasource::physical_plan::{
     FileGroupDisplay, FileMeta, FileSinkConfig, ParquetExec, SchemaAdapter,
 };
 
+use parquet::arrow::arrow_writer::ArrowColumnChunk;
+
 use crate::datasource::{create_max_min_accs, get_col_stats};
 use crate::error::Result;
 use crate::execution::context::SessionState;
@@ -828,10 +833,10 @@ impl DataSink for ParquetSink {
     }
 }
 
-/// This is the return type when joining subtasks which are serializing parquet files
-/// into memory buffers. The first part of the tuple is the parquet bytes and the
-/// second is how many rows were written into the file.
-type ParquetFileSerializedResult = Result<(Vec<u8>, usize), DataFusionError>;
+/// This is the return type of ArrowRowGroupWriter.close(), i.e. the Vec of
+/// encoded columns which can be appended to a SerializedRowGroupWriter
+type RBStreamSerializeResult =
+    Result<(Vec<(ArrowColumnChunk, ColumnCloseResult)>, usize)>;
 
 /// Parallelizes the serialization of a single parquet file, by first serializing N
 /// independent RecordBatch streams in parallel to parquet files in memory. Another
@@ -839,102 +844,73 @@ type ParquetFileSerializedResult = Result<(Vec<u8>, usize), DataFusionError>;
 /// task then stitches these independent files back together and streams this large
 /// single parquet file to an ObjectStore in multiple parts.
 async fn output_single_parquet_file_parallelized(
     mut object_store_writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
-    mut data: Vec<SendableRecordBatchStream>,
+    data: Vec<SendableRecordBatchStream>,
     output_schema: Arc<Schema>,
     parquet_props: &WriterProperties,
 ) -> Result<usize> {
     let mut row_count = 0;
-    let parallelism = data.len();
-    let mut join_handles: Vec<JoinHandle<ParquetFileSerializedResult>> =
-        Vec::with_capacity(parallelism);
-    for _ in 0..parallelism {
-        let buffer: Vec<u8> = Vec::new();
-        let mut writer = parquet::arrow::arrow_writer::ArrowWriter::try_new(
-            buffer,
-            output_schema.clone(),
-            Some(parquet_props.clone()),
-        )?;
-        let mut data_stream = data.remove(0);
-        join_handles.push(tokio::spawn(async move {
-            let mut inner_row_count = 0;
-            while let Some(batch) = data_stream.next().await.transpose()? {
-                inner_row_count += batch.num_rows();
-                writer.write(&batch)?;
+    let (serialize_tx, mut serialize_rx) =
+        mpsc::channel::<JoinHandle<RBStreamSerializeResult>>(100);
+
+    // Create some Arc<> copies that we can move into launch_serialization task and still access later
+    let arc_props = Arc::new(parquet_props.clone());
+    let arc_props_clone = arc_props.clone();
+    let schema_clone = output_schema.clone();
+    let launch_serialization_task: JoinHandle<Result<(), DataFusionError>> =
+        tokio::spawn(async move {
+            for mut stream in data {
+                let schema_desc = arrow_to_parquet_schema(&schema_clone)?;
+                let mut writer = ArrowRowGroupWriter::new(
+                    &schema_desc,
+                    &arc_props_clone,
+                    &schema_clone,
+                )?;
+                serialize_tx
+                    .send(tokio::spawn(async move {
+                        let mut inner_row_count = 0;
+                        while let Some(rb) = stream.next().await.transpose()? {
+                            inner_row_count += rb.num_rows();
+                            writer.write(&rb)?;
+                        }
+                        Ok((writer.close()?, inner_row_count))
+                    }))
+                    .await
+                    .map_err(|_| {
+                        DataFusionError::Internal(
                            "Error sending encoded row group to concat task!".into(),
+                        )
+                    })?;
             }
-            let out = writer.into_inner()?;
-            Ok((out, inner_row_count))
-        }))
-    }
+            Ok(())
+        });
 
-    let mut writer = None;
-    let endpoints: (UnboundedSender<Vec<u8>>, UnboundedReceiver<Vec<u8>>) =
-        tokio::sync::mpsc::unbounded_channel();
-    let (tx, mut rx) = endpoints;
-    let writer_join_handle: JoinHandle<
-        Result<
-            AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
-            DataFusionError,
-        >,
-    > = tokio::task::spawn(async move {
-        while let Some(data) = rx.recv().await {
-            object_store_writer.write_all(data.as_slice()).await?;
-        }
-        Ok(object_store_writer)
-    });
     let merged_buff = SharedBuffer::new(1048576);
-    for handle in join_handles {
+
+    let schema_desc = arrow_to_parquet_schema(&output_schema)?;
+    let mut parquet_writer = SerializedFileWriter::new(
+        merged_buff.clone(),
+        schema_desc.root_schema_ptr(),
+        arc_props,
+    )?;
+
+    while let Some(handle) = serialize_rx.recv().await {
         let join_result = handle.await;
         match join_result {
             Ok(result) => {
-                let (out, num_rows) = result?;
-                let reader = bytes::Bytes::from(out);
-                row_count += num_rows;
-                //let reader = File::open(buffer)?;
-                let metadata = parquet::file::footer::parse_metadata(&reader)?;
-                let schema = metadata.file_metadata().schema();
-                writer = match writer {
-                    Some(writer) => Some(writer),
-                    None => Some(SerializedFileWriter::new(
-                        merged_buff.clone(),
-                        Arc::new(schema.clone()),
-                        Arc::new(parquet_props.clone()),
-                    )?),
-                };
-
-                match &mut writer{
-                    Some(w) => {
-                        // Note: cannot use .await within this loop as RowGroupMetaData is not Send
-                        // Instead, use a non-blocking channel to send bytes to separate worker
-                        // which will write to ObjectStore.
-                        for rg in metadata.row_groups() {
-                            let mut rg_out = w.next_row_group()?;
-                            for column in rg.columns() {
-                                let result = ColumnCloseResult {
-                                    bytes_written: column.compressed_size() as _,
-                                    rows_written: rg.num_rows() as _,
-                                    metadata: column.clone(),
-                                    bloom_filter: None,
-                                    column_index: None,
-                                    offset_index: None,
-                                };
-                                rg_out.append_column(&reader, result)?;
-                                let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap();
-                                if buff_to_flush.len() > 1024000{
-                                    let bytes: Vec<u8> = buff_to_flush.drain(..).collect();
-                                    tx.send(bytes).map_err(|_| DataFusionError::Execution("Failed to send bytes to ObjectStore writer".into()))?;
-
-                                }
-                            }
-                            rg_out.close()?;
-                            let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap();
-                            if buff_to_flush.len() > 1024000{
-                                let bytes: Vec<u8> = buff_to_flush.drain(..).collect();
-                                tx.send(bytes).map_err(|_| DataFusionError::Execution("Failed to send bytes to ObjectStore writer".into()))?;
-                            }
-                        }
-                    },
-                    None => unreachable!("Parquet writer should always be initialized in first iteration of loop!")
+                let mut rg_out = parquet_writer.next_row_group()?;
+                let (serialized_columns, cnt) = result?;
+                row_count += cnt;
+                for (chunk, close) in serialized_columns {
+                    rg_out.append_column(&chunk, close)?;
+                    let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap();
+                    if buff_to_flush.len() > 1024000 {
+                        object_store_writer
+                            .write_all(buff_to_flush.as_slice())
+                            .await?;
+                        buff_to_flush.clear();
+                    }
                 }
+                rg_out.close()?;
             }
             Err(e) => {
                 if e.is_panic() {
@@ -945,25 +921,22 @@ async fn output_single_parquet_file_parallelized(
             }
         }
     }
-    let inner_writer = writer.unwrap().into_inner()?;
+
+    let inner_writer = parquet_writer.into_inner()?;
     let final_buff = inner_writer.buffer.try_lock().unwrap();
-    // Explicitly drop tx to signal to rx we are done sending data
-    drop(tx);
+    object_store_writer.write_all(final_buff.as_slice()).await?;
+    object_store_writer.shutdown().await?;
 
-    let mut object_store_writer = match writer_join_handle.await {
-        Ok(r) => r?,
-        Err(e) => {
-            if e.is_panic() {
-                std::panic::resume_unwind(e.into_panic())
-            } else {
-                unreachable!()
-            }
+    match launch_serialization_task.await {
+        Ok(Ok(_)) => (),
+        Ok(Err(e)) => return Err(e),
+        Err(_) => {
+            return Err(DataFusionError::Internal(
+                "Unknown error writing to object store".into(),
+            ))
        }
     };
-    object_store_writer.write_all(final_buff.as_slice()).await?;
-    object_store_writer.shutdown().await?;
-    println!("done!");
 
     Ok(row_count)
 }
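
Note (not part of the patch): the new control flow is essentially a bounded channel of JoinHandles. A launcher task spawns one serialization task per input RecordBatch stream and forwards each JoinHandle over an mpsc channel, while the receiving side awaits the handles in launch order and appends each finished row group to the single SerializedFileWriter. Below is a minimal, self-contained sketch of that pattern using only tokio (assuming tokio 1.x with the "full" feature); the names WorkResult and launcher, the chunk count, and the placeholder 1024-row results are illustrative only and do not appear in the patch.

use tokio::sync::mpsc;
use tokio::task::JoinHandle;

type WorkResult = Result<(Vec<u8>, usize), String>;

#[tokio::main]
async fn main() {
    // Bounded channel of JoinHandles: the capacity limits how many serialized
    // results can be outstanding ahead of the consumer (backpressure).
    let (tx, mut rx) = mpsc::channel::<JoinHandle<WorkResult>>(100);

    // Launcher: spawn one worker per input chunk and forward its handle
    // immediately, so workers run concurrently while output order is preserved.
    let launcher: JoinHandle<Result<(), String>> = tokio::spawn(async move {
        for _ in 0..8 {
            let handle = tokio::spawn(async move {
                // Placeholder for per-stream work (e.g. encoding one row group).
                let encoded = vec![0u8; 16];
                Ok((encoded, 1024))
            });
            tx.send(handle)
                .await
                .map_err(|_| "receiver dropped".to_string())?;
        }
        Ok(())
    });

    // Consumer: await handles in launch order, mirroring how the patch appends
    // each finished row group to the single SerializedFileWriter.
    let mut total_rows = 0;
    while let Some(handle) = rx.recv().await {
        let (_encoded, rows) =
            handle.await.expect("worker panicked").expect("worker failed");
        total_rows += rows;
    }

    launcher.await.expect("launcher panicked").expect("launcher failed");
    println!("serialized {total_rows} rows");
}

The bounded capacity (100 in the patch) is what keeps roughly no more than a fixed number of encoded row groups buffered in memory when serialization outpaces the upload to the object store.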
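
Note (not part of the patch): both the removed code and the new code stitch the final file together with SerializedRowGroupWriter::append_column, which copies an already-encoded column chunk described by a ColumnCloseResult. For contrast with the new ArrowRowGroupWriter-based path, here is a standalone sketch of the pre-patch pattern of concatenating complete in-memory parquet files by copying their row groups. The helper name concat_parquet_files is hypothetical and default WriterProperties are assumed; treat it as an illustration of the API used in the removed lines, not as the patch's implementation.

use std::sync::Arc;

use bytes::Bytes;
use parquet::column::writer::ColumnCloseResult;
use parquet::file::footer::parse_metadata;
use parquet::file::properties::WriterProperties;
use parquet::file::writer::SerializedFileWriter;

/// Concatenate complete in-memory parquet files (all sharing one schema)
/// into a single file by copying their already-encoded column chunks.
fn concat_parquet_files(files: Vec<Vec<u8>>) -> parquet::errors::Result<Vec<u8>> {
    let mut writer: Option<SerializedFileWriter<Vec<u8>>> = None;
    for file in files {
        let bytes = Bytes::from(file);
        let metadata = parse_metadata(&bytes)?;
        let schema = metadata.file_metadata().schema();
        if writer.is_none() {
            writer = Some(SerializedFileWriter::new(
                Vec::new(),
                Arc::new(schema.clone()),
                Arc::new(WriterProperties::builder().build()),
            )?);
        }
        let w = writer.as_mut().expect("writer initialized above");
        for rg in metadata.row_groups() {
            let mut rg_out = w.next_row_group()?;
            for column in rg.columns() {
                // Describe the pre-encoded chunk so append_column can copy it verbatim.
                let close = ColumnCloseResult {
                    bytes_written: column.compressed_size() as _,
                    rows_written: rg.num_rows() as _,
                    metadata: column.clone(),
                    bloom_filter: None,
                    column_index: None,
                    offset_index: None,
                };
                rg_out.append_column(&bytes, close)?;
            }
            rg_out.close()?;
        }
    }
    writer.expect("at least one input file").into_inner()
}

The patch removes the round trip through complete in-memory parquet files (and the parse_metadata call) by having each task hand back its encoded column chunks directly, but the final append_column step is the same mechanism.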