Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 78 additions & 105 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! Parquet format abstractions

use parquet::arrow::arrow_writer::ArrowRowGroupWriter;
use parquet::column::writer::ColumnCloseResult;
use parquet::file::writer::SerializedFileWriter;
use rand::distributions::DistString;
Expand All @@ -26,7 +27,7 @@ use std::fmt::Debug;
use std::io::Write;
use std::sync::Arc;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::mpsc;
use tokio::task::{JoinHandle, JoinSet};

use crate::datasource::file_format::file_compression_type::FileCompressionType;
Expand All @@ -40,7 +41,9 @@ use datafusion_physical_expr::PhysicalExpr;
use futures::{StreamExt, TryStreamExt};
use hashbrown::HashMap;
use object_store::{ObjectMeta, ObjectStore};
use parquet::arrow::{parquet_to_arrow_schema, AsyncArrowWriter};
use parquet::arrow::{
arrow_to_parquet_schema, parquet_to_arrow_schema, AsyncArrowWriter,
};
use parquet::file::footer::{decode_footer, decode_metadata};
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
Expand All @@ -60,6 +63,8 @@ use crate::datasource::physical_plan::{
FileGroupDisplay, FileMeta, FileSinkConfig, ParquetExec, SchemaAdapter,
};

use parquet::arrow::arrow_writer::ArrowColumnChunk;

use crate::datasource::{create_max_min_accs, get_col_stats};
use crate::error::Result;
use crate::execution::context::SessionState;
Expand Down Expand Up @@ -828,113 +833,84 @@ impl DataSink for ParquetSink {
}
}

/// This is the return type when joining subtasks which are serializing parquet files
/// into memory buffers. The first part of the tuple is the parquet bytes and the
/// second is how many rows were written into the file.
type ParquetFileSerializedResult = Result<(Vec<u8>, usize), DataFusionError>;
/// This is the return type of ArrowRowGroupWriter.close(), i.e. the Vec of
/// encoded columns which can be appended to a SerializedRowGroupWriter
type RBStreamSerializeResult =
Result<(Vec<(ArrowColumnChunk, ColumnCloseResult)>, usize)>;

/// Parallelizes the serialization of a single parquet file, by first serializing N
/// independent RecordBatch streams in parallel to parquet files in memory. Another
/// task then stitches these independent files back together and streams this large
/// single parquet file to an ObjectStore in multiple parts.
async fn output_single_parquet_file_parallelized(
mut object_store_writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
mut data: Vec<SendableRecordBatchStream>,
data: Vec<SendableRecordBatchStream>,
output_schema: Arc<Schema>,
parquet_props: &WriterProperties,
) -> Result<usize> {
let mut row_count = 0;
let parallelism = data.len();
let mut join_handles: Vec<JoinHandle<ParquetFileSerializedResult>> =
Vec::with_capacity(parallelism);
for _ in 0..parallelism {
let buffer: Vec<u8> = Vec::new();
let mut writer = parquet::arrow::arrow_writer::ArrowWriter::try_new(
buffer,
output_schema.clone(),
Some(parquet_props.clone()),
)?;
let mut data_stream = data.remove(0);
join_handles.push(tokio::spawn(async move {
let mut inner_row_count = 0;
while let Some(batch) = data_stream.next().await.transpose()? {
inner_row_count += batch.num_rows();
writer.write(&batch)?;
let (serialize_tx, mut serialize_rx) =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my reading of this suggests it allows up to 100 row groups to be created in parallel, which likely results in more buffering than necessary.

Rather than formulating this as a mspc::channel it would be really neat to see it formulated as a Stream<(ArrowColumnChunk, ColumnCloseResult)>.

then, in combination with StreamExt::buffered() we could control the parallelism at the row group level

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am using mpsc::channel even more extensively in the new implementation (#7655). I have experimented with StreamExt::buffered() in the past, but it did not seem to leverage multiple CPU cores, whereas spawning tokio tasks communicating across a channel did.

I can revisit this though as it could simplify the code, and I may have just messed something up last time I tried it 🤔 .

mpsc::channel::<JoinHandle<RBStreamSerializeResult>>(100);

// Create some Arc<> copies that we can move into launch_serialization task and still access later
let arc_props = Arc::new(parquet_props.clone());
let arc_props_clone = arc_props.clone();
let schema_clone = output_schema.clone();
let launch_serialization_task: JoinHandle<Result<(), DataFusionError>> =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It probably doesn't matter but if something goes wrong and output_single_parquet_file_parallelized returns an error, this code may well still launch tasks and try to buffer / serialize the streams.

I think this could be avoided if we put all the handles into a JoinSet so when they were dropped all the tasks would be canceled: https://docs.rs/tokio/1.32.0/tokio/task/struct.JoinSet.html

tokio::spawn(async move {
for mut stream in data {
let schema_desc = arrow_to_parquet_schema(&schema_clone)?;
let mut writer = ArrowRowGroupWriter::new(
&schema_desc,
&arc_props_clone,
&schema_clone,
)?;
serialize_tx
.send(tokio::spawn(async move {
let mut inner_row_count = 0;
while let Some(rb) = stream.next().await.transpose()? {
inner_row_count += rb.num_rows();
writer.write(&rb)?;
}
Ok((writer.close()?, inner_row_count))
}))
.await
.map_err(|_| {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the only way the send will fail is if the receiver was dropped -- either due to early plan cancel or some other error

Thus it might make sense to ignore the error and return (with a comment about rationale) rather than returning an error

DataFusionError::Internal(
"Error sending encoded row group to concat task!".into(),
)
})?;
}
let out = writer.into_inner()?;
Ok((out, inner_row_count))
}))
}
Ok(())
});

let mut writer = None;
let endpoints: (UnboundedSender<Vec<u8>>, UnboundedReceiver<Vec<u8>>) =
tokio::sync::mpsc::unbounded_channel();
let (tx, mut rx) = endpoints;
let writer_join_handle: JoinHandle<
Result<
AbortableWrite<Box<dyn tokio::io::AsyncWrite + std::marker::Send + Unpin>>,
DataFusionError,
>,
> = tokio::task::spawn(async move {
while let Some(data) = rx.recv().await {
object_store_writer.write_all(data.as_slice()).await?;
}
Ok(object_store_writer)
});
let merged_buff = SharedBuffer::new(1048576);
for handle in join_handles {

let schema_desc = arrow_to_parquet_schema(&output_schema)?;
let mut parquet_writer = SerializedFileWriter::new(
merged_buff.clone(),
schema_desc.root_schema_ptr(),
arc_props,
)?;

while let Some(handle) = serialize_rx.recv().await {
let join_result = handle.await;
match join_result {
Ok(result) => {
let (out, num_rows) = result?;
let reader = bytes::Bytes::from(out);
row_count += num_rows;
//let reader = File::open(buffer)?;
let metadata = parquet::file::footer::parse_metadata(&reader)?;
let schema = metadata.file_metadata().schema();
writer = match writer {
Some(writer) => Some(writer),
None => Some(SerializedFileWriter::new(
merged_buff.clone(),
Arc::new(schema.clone()),
Arc::new(parquet_props.clone()),
)?),
};

match &mut writer{
Some(w) => {
// Note: cannot use .await within this loop as RowGroupMetaData is not Send
// Instead, use a non-blocking channel to send bytes to separate worker
// which will write to ObjectStore.
for rg in metadata.row_groups() {
let mut rg_out = w.next_row_group()?;
for column in rg.columns() {
let result = ColumnCloseResult {
bytes_written: column.compressed_size() as _,
rows_written: rg.num_rows() as _,
metadata: column.clone(),
bloom_filter: None,
column_index: None,
offset_index: None,
};
rg_out.append_column(&reader, result)?;
let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap();
if buff_to_flush.len() > 1024000{
let bytes: Vec<u8> = buff_to_flush.drain(..).collect();
tx.send(bytes).map_err(|_| DataFusionError::Execution("Failed to send bytes to ObjectStore writer".into()))?;

}
}
rg_out.close()?;
let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap();
if buff_to_flush.len() > 1024000{
let bytes: Vec<u8> = buff_to_flush.drain(..).collect();
tx.send(bytes).map_err(|_| DataFusionError::Execution("Failed to send bytes to ObjectStore writer".into()))?;
}
}
},
None => unreachable!("Parquet writer should always be initialized in first iteration of loop!")
let mut rg_out = parquet_writer.next_row_group()?;
let (serialized_columns, cnt) = result?;
row_count += cnt;
for (chunk, close) in serialized_columns {
rg_out.append_column(&chunk, close)?;
let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap();
if buff_to_flush.len() > 1024000 {
object_store_writer
.write_all(buff_to_flush.as_slice())
.await?;
buff_to_flush.clear();
}
}
rg_out.close()?;
}
Err(e) => {
if e.is_panic() {
Expand All @@ -945,25 +921,22 @@ async fn output_single_parquet_file_parallelized(
}
}
}
let inner_writer = writer.unwrap().into_inner()?;

let inner_writer = parquet_writer.into_inner()?;
let final_buff = inner_writer.buffer.try_lock().unwrap();

// Explicitly drop tx to signal to rx we are done sending data
drop(tx);
object_store_writer.write_all(final_buff.as_slice()).await?;
object_store_writer.shutdown().await?;

let mut object_store_writer = match writer_join_handle.await {
Ok(r) => r?,
Err(e) => {
if e.is_panic() {
std::panic::resume_unwind(e.into_panic())
} else {
unreachable!()
}
match launch_serialization_task.await {
Ok(Ok(_)) => (),
Ok(Err(e)) => return Err(e),
Err(_) => {
return Err(DataFusionError::Internal(
"Unknown error writing to object store".into(),
))
}
};
object_store_writer.write_all(final_buff.as_slice()).await?;
object_store_writer.shutdown().await?;
println!("done!");

Ok(row_count)
}
Expand Down