
Conversation


@devinjdangelo devinjdangelo commented Sep 23, 2023

This PR is a draft since it cannot compile without the arrow-rs changes in apache/arrow-rs#4850.

Which issue does this PR close?

Closes #7591
Closes #7589
Related to apache/arrow-rs#1718

Rationale for this change

The parallel parquet serialization process implemented in #7562 did not support all parquet metadata (indexes/bloom filters) and had no backpressure on serialization tasks. This PR addresses both deficiencies.

What changes are included in this PR?

  • Parallel parquet serialization tasks now use ArrowRowGroupWriter directly rather than ArrowWriter
  • Upstream arrow-rs changes filed to make ArrowRowGroupWriter public and Send so it can be used across an .await
  • Parquet serialization tasks can be throttled via a bounded channel mechanism (a sketch of the idea follows this list).
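
To illustrate the bounded-channel throttling idea, here is a minimal, hypothetical sketch: `serialize_row_group`, the payload type, and the sizes are stand-ins for illustration, not the PR's actual code.

```rust
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

// Hypothetical stand-in for CPU-heavy parquet encoding of one row group.
async fn serialize_row_group(id: usize) -> Vec<u8> {
    format!("row group {id}").into_bytes()
}

#[tokio::main]
async fn main() {
    // Bounded channel: once `channel_limit` task handles are queued, `send`
    // waits, throttling how far serialization can run ahead of the writer.
    let channel_limit = 4;
    let (tx, mut rx) = mpsc::channel::<JoinHandle<Vec<u8>>>(channel_limit);

    let producer = tokio::spawn(async move {
        for id in 0..16 {
            let handle = tokio::spawn(serialize_row_group(id));
            // Backpressure point: blocks while the channel is full.
            if tx.send(handle).await.is_err() {
                break; // receiver dropped; stop launching new work
            }
        }
    });

    // Drain results in order, as the writer task would.
    while let Some(handle) = rx.recv().await {
        let bytes = handle.await.expect("serialization task panicked");
        println!("wrote {} bytes", bytes.len());
    }
    producer.await.unwrap();
}
```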

Benchmarking Results

The results show the parallel parquet process in this PR is ~10% faster than the previous implementation, in addition to supporting statistics/bloom filters. channel_limit=N means that at most N parallel parquet serialization tasks run at one time. Surprisingly, setting this number low can actually increase peak memory usage. Perhaps RecordBatches accumulate in memory as some streams are consumed but not others, more than offsetting the benefit of limiting the amount of serialized parquet data held in memory.

See #7562 for the benchmarking script used.

Test 1, All 16 Columns, ~3.6GB Parquet File (release build)

Execution Time (s)

| Parallelism | Main (single_file_output=false) | Main (single_file_output=true) | This PR (single_file_output=true, channel_limit=100) | This PR (single_file_output=true, channel_limit=4) |
|---|---|---|---|---|
| 1 | 22.48 | 22.53 | 21.04 | 22.17 |
| 4 | 12.24 | 14.4 | 12.49 | 12.73 |
| 8 | 10.79 | 12.37 | 10.7 | 11.03 |
| 16 | 10.52 | 12.67 | 10.78 | 10.85 |
| 32 | 10.91 | 12.07 | 10.31 | 10.25 |
| 64 | 10.21 | 12.97 | 12.34 | 11.62 |

Peak Memory Usage (MB)

| Parallelism | Main (single_file_output=false) | Main (single_file_output=true) | This PR (single_file_output=true, channel_limit=100) | This PR (single_file_output=true, channel_limit=4) |
|---|---|---|---|---|
| 1 | 1753.3 | 1758 | 1757.1 | 1760 |
| 4 | 2445.4 | 7104 | 5690.7 | 5684.2 |
| 8 | 3387 | 7623.1 | 6642 | 7804 |
| 16 | 5047.6 | 8262.6 | 8151 | 10437.7 |
| 32 | 7683.6 | 7672.6 | 9358 | 11657.5 |
| 64 | 10961.1 | 10370.2 | 10388 | 12898 |

Test 2, Subset of 3 Columns, ~895MB Parquet File (release build)

Execution Time (s)

| Parallelism | Main (single_file_output=false) | Main (single_file_output=true) | This PR (single_file_output=true, channel_limit=100) | This PR (single_file_output=true, channel_limit=4) |
|---|---|---|---|---|
| 1 | 3.57 | 3.15 | 3.38 | 3.39 |
| 4 | 1.78 | 2.37 | 2.53 | 2.3 |
| 8 | 1.45 | 2.07 | 2.58 | 2.23 |
| 16 | 1.54 | 2.09 | 2.06 | 1.71 |
| 32 | 1.7 | 2.1 | 2.08 | 1.65 |
| 64 | 1.89 | 2.72 | 2.76 | 2.07 |

Peak Memory Usage (MB)

| Parallelism | Main (single_file_output=false) | Main (single_file_output=true) | This PR (single_file_output=true, channel_limit=100) | This PR (single_file_output=true, channel_limit=4) |
|---|---|---|---|---|
| 1 | 450.6 | 451.6 | 448.9 | 449.4 |
| 4 | 584.5 | 1659.1 | 1257.0 | 1284.4 |
| 8 | 759.4 | 1939.7 | 1225.7 | 1339.2 |
| 16 | 1045.8 | 2051.2 | 1359.8 | 1438.5 |
| 32 | 1564.6 | 1899.7 | 1445.5 | 1545.3 |
| 64 | 2318.8 | 1726.1 | 1732.0 | 1735.3 |

Are these changes tested?

Yes, by existing tests and ad hoc benchmarking.

Are there any user-facing changes?

No

Next Steps

So far, parquet serialization is only parallelized across RowGroups. This means parallelism is capped by the number of RowGroups we want in the file, which in general can be as low as 1. Parquet files typically have many columns, so we could additionally parallelize at the column level for a further speedup.

We could also break free of the Parallelism=RowGroupNumber limit if it were possible to concatenate (ArrowColumnChunk, ColumnCloseResult) tuples together before writing them into a RowGroup. This might not be possible to do efficiently, since ArrowColumnChunks are already compressed. Aggregating column min/max statistics would be trivial, but merging distinct-value counts and bloom filters would not be.

@alamb (Contributor) left a comment


This is very cool @devinjdangelo -- thank you for pushing it forward. I think it is a very nice improvement, and while there might be additional improvements possible, this is a nice step in the right direction.

```rust
let arc_props = Arc::new(parquet_props.clone());
let arc_props_clone = arc_props.clone();
let schema_clone = output_schema.clone();
let launch_serialization_task: JoinHandle<Result<(), DataFusionError>> =
```

It probably doesn't matter but if something goes wrong and output_single_parquet_file_parallelized returns an error, this code may well still launch tasks and try to buffer / serialize the streams.

I think this could be avoided if we put all the handles into a JoinSet so when they were dropped all the tasks would be canceled: https://docs.rs/tokio/1.32.0/tokio/task/struct.JoinSet.html
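
For illustration, a minimal sketch of the JoinSet pattern being suggested (the task body is a placeholder standing in for buffering/serializing one stream):

```rust
use tokio::task::JoinSet;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut set: JoinSet<usize> = JoinSet::new();
    for id in 0..8 {
        // Placeholder body standing in for serializing one stream.
        set.spawn(async move { id * 2 });
    }

    // If we return early with `?` here, `set` is dropped and every task
    // still running is aborted, so no serializers outlive the error.
    while let Some(res) = set.join_next().await {
        let n = res?;
        println!("task produced {n}");
    }
    Ok(())
}
```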

```rust
    Ok((writer.close()?, inner_row_count))
}))
.await
.map_err(|_| {
```

I think the only way the send will fail is if the receiver was dropped -- either due to an early plan cancellation or some other error.

Thus it might make sense to ignore the error and return (with a comment about the rationale) rather than returning an error.
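
A hedged sketch of what "ignore the error and return" could look like; `forward_batches` and the `Vec<u8>` payload are hypothetical names, not the PR's actual code:

```rust
use tokio::sync::mpsc::Sender;

// Hypothetical forwarding loop standing in for the serialization task.
async fn forward_batches(tx: Sender<Vec<u8>>, batches: Vec<Vec<u8>>) {
    for batch in batches {
        // A failed send only means the receiver hung up (plan canceled or
        // failed elsewhere), so stop quietly rather than surfacing an error.
        if tx.send(batch).await.is_err() {
            return;
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = tokio::sync::mpsc::channel(2);
    tokio::spawn(forward_batches(tx, vec![vec![1], vec![2, 3]]));
    while let Some(b) = rx.recv().await {
        println!("got {} bytes", b.len());
    }
}
```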

```rust
while let Some(batch) = data_stream.next().await.transpose()? {
    inner_row_count += batch.num_rows();
    writer.write(&batch)?;
let (serialize_tx, mut serialize_rx) =
```

My reading of this suggests it allows up to 100 row groups to be created in parallel, which likely results in more buffering than necessary.

Rather than formulating this as an mpsc::channel, it would be really neat to see it formulated as a Stream<(ArrowColumnChunk, ColumnCloseResult)>.

Then, in combination with StreamExt::buffered(), we could control the parallelism at the row group level.
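
For reference, a minimal sketch of the buffered() formulation, with placeholder types standing in for (ArrowColumnChunk, ColumnCloseResult):

```rust
use futures::stream::{self, StreamExt};

// Placeholders for (ArrowColumnChunk, ColumnCloseResult).
type ColumnChunk = Vec<u8>;
type CloseResult = usize;

// Hypothetical encoder standing in for row-group serialization.
async fn encode_row_group(id: usize) -> (ColumnChunk, CloseResult) {
    (format!("row group {id}").into_bytes(), id)
}

#[tokio::main]
async fn main() {
    // At most `parallelism` row-group encodings are in flight at once,
    // and results come back in order, ready to be appended to the file.
    let parallelism = 4;
    let mut encoded = stream::iter(0..16)
        .map(encode_row_group)
        .buffered(parallelism);

    while let Some((chunk, close)) = encoded.next().await {
        println!("row group {} closed: {} bytes", close, chunk.len());
    }
}
```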

@devinjdangelo (author) replied:

I am using mpsc::channel even more extensively in the new implementation (#7655). I have experimented with StreamExt::buffered() in the past, but it did not seem to leverage multiple CPU cores, whereas spawning tokio tasks communicating across a channel did.

I can revisit this, though, as it could simplify the code, and I may have just messed something up the last time I tried it 🤔.
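
One plausible explanation for that observation: buffered() polls all its futures on the single task driving the stream, so CPU-bound encoding stays on one core unless each item is first moved onto its own task. A hypothetical sketch combining the two approaches (encode_row_group and the limits are stand-ins):

```rust
use futures::stream::{self, StreamExt};

// Stand-in for CPU-heavy parquet encoding.
fn encode_row_group(id: usize) -> Vec<u8> {
    format!("row group {id}").into_bytes()
}

#[tokio::main(flavor = "multi_thread")]
async fn main() {
    let mut results = stream::iter(0..16)
        // spawn_blocking moves each encoding onto the runtime's thread
        // pool, so buffered() now drives genuinely parallel work while
        // still capping how many tasks run at once.
        .map(|id| tokio::task::spawn_blocking(move || encode_row_group(id)))
        .buffered(4);

    while let Some(res) = results.next().await {
        println!("{} bytes", res.expect("task panicked").len());
    }
}
```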

@devinjdangelo (author) commented:

@alamb Thank you for the review! I have significantly reworked this PR in a new PR, #7655, relying primarily on column-wise parallelization rather than row-group-wise. The new approach is considerably more complex in the code, but the performance advantage is huge: 20% faster and 90% lower memory overhead vs. this PR.

@alamb closed this in #7655 on Oct 25, 2023.

Labels

core (Core DataFusion crate)
