ARROW-11058: [Rust] [DataFusion] Implement coalesce batches operator #9043
New file: `coalesce_batches.rs` (@@ -0,0 +1,296 @@)

```rust
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! CoalesceBatchesExec combines small batches into larger batches for more efficient use of
//! vectorized processing by upstream operators.

use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use crate::error::{DataFusionError, Result};
use crate::physical_plan::{
    ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
};

use arrow::compute::kernels::concat::concat;
use arrow::datatypes::SchemaRef;
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use futures::stream::{Stream, StreamExt};
use log::debug;

/// CoalesceBatchesExec combines small batches into larger batches for more efficient use of
/// vectorized processing by upstream operators.
#[derive(Debug)]
pub struct CoalesceBatchesExec {
    /// The input plan
    input: Arc<dyn ExecutionPlan>,
    /// Minimum number of rows in each coalesced output batch
    target_batch_size: usize,
}

impl CoalesceBatchesExec {
    /// Create a new CoalesceBatchesExec
    pub fn new(input: Arc<dyn ExecutionPlan>, target_batch_size: usize) -> Self {
        Self {
            input,
            target_batch_size,
        }
    }
}

#[async_trait]
impl ExecutionPlan for CoalesceBatchesExec {
    /// Return a reference to Any that can be used for downcasting
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Get the schema for this execution plan
    fn schema(&self) -> SchemaRef {
        // The coalesce batches operator does not make any changes to the schema of its input
        self.input.schema()
    }

    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
        vec![self.input.clone()]
    }

    /// Get the output partitioning of this plan
    fn output_partitioning(&self) -> Partitioning {
        // The coalesce batches operator does not make any changes to the partitioning of its input
        self.input.output_partitioning()
    }

    fn with_new_children(
        &self,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        match children.len() {
            1 => Ok(Arc::new(CoalesceBatchesExec::new(
                children[0].clone(),
                self.target_batch_size,
            ))),
            _ => Err(DataFusionError::Internal(
                "CoalesceBatchesExec wrong number of children".to_string(),
            )),
        }
    }

    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
        Ok(Box::pin(CoalesceBatchesStream {
            input: self.input.execute(partition).await?,
            schema: self.input.schema(),
            target_batch_size: self.target_batch_size,
            buffer: Vec::new(),
            buffered_rows: 0,
        }))
    }
}

struct CoalesceBatchesStream {
    /// The input stream
    input: SendableRecordBatchStream,
    /// The input schema
    schema: SchemaRef,
    /// Minimum number of rows in each coalesced output batch
    target_batch_size: usize,
    /// Buffered batches
    buffer: Vec<RecordBatch>,
    /// Buffered row count
    buffered_rows: usize,
}

impl Stream for CoalesceBatchesStream {
    type Item = ArrowResult<RecordBatch>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        loop {
            let input_batch = self.input.poll_next_unpin(cx);
            match input_batch {
                Poll::Ready(x) => match x {
                    Some(Ok(ref batch)) => {
                        if batch.num_rows() >= self.target_batch_size
                            && self.buffer.is_empty()
                        {
                            // pass through batches that are already large enough,
                            // as long as nothing is buffered ahead of them
                            return Poll::Ready(Some(Ok(batch.clone())));
                        } else if batch.num_rows() == 0 {
                            // discard empty batches
                        } else {
                            // add to the buffered batches
                            self.buffer.push(batch.clone());
                            self.buffered_rows += batch.num_rows();
                            // check to see if we have enough rows yet
                            if self.buffered_rows >= self.target_batch_size {
```
**Contributor:** Would it make sense to also make batches smaller, or split them if they are bigger than the target batch size (e.g. for increased parallelism), or do we only want to grow them for now?

**Member (author):** Currently, we use partitioning as the unit of parallelism (which I think makes sense), and we recently added the repartition operator, which can increase or decrease parallelism. I'm not sure if we will need the ability to split batches. The only use case I can think of right now would be kernels that have limits on the size of batches they can process for some reason.
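For context on the author's point, here is a hedged sketch of adjusting parallelism via repartitioning rather than batch splitting. The `RepartitionExec` type, its module path, and the `try_new` signature are assumptions about the recently added repartition operator mentioned above; they are not part of this diff.

```rust
// Hypothetical sketch: parallelism is controlled by repartitioning, not by
// splitting batches. RepartitionExec and Partitioning::RoundRobinBatch are
// assumed APIs of the repartition operator referenced in the reply above.
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::{ExecutionPlan, Partitioning};

fn repartition_then_coalesce(
    input: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
    // fan out to 8 partitions; each partition is executed independently
    let repartitioned = Arc::new(RepartitionExec::try_new(
        input,
        Partitioning::RoundRobinBatch(8),
    )?);
    // then grow small batches within each partition to at least 4096 rows
    Ok(Arc::new(CoalesceBatchesExec::new(repartitioned, 4096)))
}
```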
The new file continues:

```rust
                                // combine the batches and return
                                let batch = concat_batches(
                                    &self.schema,
                                    &self.buffer,
                                    self.buffered_rows,
                                )?;
                                // reset buffer state
                                self.buffer.clear();
                                self.buffered_rows = 0;
                                // return batch
                                return Poll::Ready(Some(Ok(batch)));
                            }
                        }
                    }
                    None => {
                        // we have reached the end of the input stream but there could still
                        // be buffered batches
                        if self.buffer.is_empty() {
                            return Poll::Ready(None);
                        } else {
                            // combine the batches and return
                            let batch = concat_batches(
                                &self.schema,
                                &self.buffer,
                                self.buffered_rows,
                            )?;
                            // reset buffer state
                            self.buffer.clear();
                            self.buffered_rows = 0;
                            // return batch
                            return Poll::Ready(Some(Ok(batch)));
                        }
                    }
                    // pass errors (Some(Err(_))) through to the caller unchanged
                    other => return Poll::Ready(other),
                },
                Poll::Pending => return Poll::Pending,
            }
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // we can't predict the size of incoming batches so re-use the size hint from the input
        self.input.size_hint()
    }
}

impl RecordBatchStream for CoalesceBatchesStream {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

fn concat_batches(
    schema: &SchemaRef,
    batches: &[RecordBatch],
    row_count: usize,
) -> ArrowResult<RecordBatch> {
    // concatenate the corresponding arrays from each buffered batch, column by column
    let mut arrays = Vec::with_capacity(schema.fields().len());
    for i in 0..schema.fields().len() {
        let array = concat(
            &batches
                .iter()
                .map(|batch| batch.column(i).as_ref())
                .collect::<Vec<_>>(),
        )?;
        arrays.push(array);
    }
    debug!(
        "Combined {} batches containing {} rows",
        batches.len(),
        row_count
    );
    RecordBatch::try_new(schema.clone(), arrays)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::physical_plan::memory::MemoryExec;
    use arrow::array::UInt32Array;
    use arrow::datatypes::{DataType, Field, Schema};

    #[tokio::test(threaded_scheduler)]
    async fn test_concat_batches() -> Result<()> {
        let schema = test_schema();
        let partition = create_vec_batches(&schema, 10)?;
        let partitions = vec![partition];

        let output_partitions = coalesce_batches(&schema, partitions, 20).await?;
        assert_eq!(1, output_partitions.len());

        // input is 10 batches x 8 rows (80 rows)
        // expected output is batches of at least 20 rows (except for the final batch)
        let batches = &output_partitions[0];
        assert_eq!(4, batches.len());
        assert_eq!(24, batches[0].num_rows());
        assert_eq!(24, batches[1].num_rows());
        assert_eq!(24, batches[2].num_rows());
        assert_eq!(8, batches[3].num_rows());

        Ok(())
    }

    fn test_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]))
    }

    fn create_vec_batches(
        schema: &Arc<Schema>,
        num_batches: usize,
    ) -> Result<Vec<RecordBatch>> {
        let batch = create_batch(schema);
        let mut vec = Vec::with_capacity(num_batches);
        for _ in 0..num_batches {
            vec.push(batch.clone());
        }
        Ok(vec)
    }

    fn create_batch(schema: &Arc<Schema>) -> RecordBatch {
        RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(UInt32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8]))],
        )
        .unwrap()
    }

    async fn coalesce_batches(
        schema: &SchemaRef,
        input_partitions: Vec<Vec<RecordBatch>>,
        target_batch_size: usize,
    ) -> Result<Vec<Vec<RecordBatch>>> {
        // create physical plan
        let exec = MemoryExec::try_new(&input_partitions, schema.clone(), None)?;
        let exec: Arc<dyn ExecutionPlan> =
            Arc::new(CoalesceBatchesExec::new(Arc::new(exec), target_batch_size));

        // execute and collect results
        let output_partition_count = exec.output_partitioning().partition_count();
        let mut output_partitions = Vec::with_capacity(output_partition_count);
        for i in 0..output_partition_count {
            // execute this *output* partition and collect all batches
            let mut stream = exec.execute(i).await?;
            let mut batches = vec![];
            while let Some(result) = stream.next().await {
                batches.push(result?);
            }
            output_partitions.push(batches);
        }
        Ok(output_partitions)
    }
}
```
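Outside of the test module, usage is the same shape: wrap any plan that tends to emit small batches. A minimal sketch, assuming the existing `collect` helper from `physical_plan` (visible in the next hunk's context); the `input` plan is arbitrary:

```rust
// Minimal usage sketch: `input` stands in for any physical plan that emits
// small batches (e.g. a selective filter). Downstream consumers then see
// batches of at least 4096 rows, except possibly the final one.
use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::{collect, ExecutionPlan};

async fn coalesced_results(
    input: Arc<dyn ExecutionPlan>,
) -> Result<Vec<RecordBatch>> {
    let plan = Arc::new(CoalesceBatchesExec::new(input, 4096));
    collect(plan).await
}
```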
Second file, the `physical_plan` module root:

@@ -104,6 +104,26 @@ pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {

```rust
/// Execute the [ExecutionPlan] and collect the results in memory,
/// keeping the output of each partition separate
pub async fn collect_partitioned(
    plan: Arc<dyn ExecutionPlan>,
) -> Result<Vec<Vec<RecordBatch>>> {
    match plan.output_partitioning().partition_count() {
        0 => Ok(vec![]),
        1 => {
            let it = plan.execute(0).await?;
            Ok(vec![common::collect(it).await?])
        }
        _ => {
            let mut partitions = vec![];
            for i in 0..plan.output_partitioning().partition_count() {
```
**Contributor:** Could we bind this partition count to a variable and reuse it in the code?
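A sketch of the refactor the reviewer is suggesting, computing the count once and reusing it (the `Vec::with_capacity` pre-allocation is a small extra on top of the suggestion):

```rust
// Bind the partition count once instead of calling
// plan.output_partitioning().partition_count() twice.
let partition_count = plan.output_partitioning().partition_count();
match partition_count {
    0 => Ok(vec![]),
    1 => {
        let it = plan.execute(0).await?;
        Ok(vec![common::collect(it).await?])
    }
    _ => {
        let mut partitions = Vec::with_capacity(partition_count);
        for i in 0..partition_count {
            partitions.push(common::collect(plan.execute(i).await?).await?)
        }
        Ok(partitions)
    }
}
```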
The function continues as written in the diff:

```rust
                partitions.push(common::collect(plan.execute(i).await?).await?)
            }
            Ok(partitions)
        }
    }
}

/// Partitioning schemes supported by operators.
#[derive(Debug, Clone)]
pub enum Partitioning {
```

@@ -248,6 +268,7 @@ pub trait Accumulator: Send + Sync + Debug {

```rust
pub mod aggregates;
pub mod array_expressions;
pub mod coalesce_batches;
pub mod common;
pub mod csv;
pub mod datetime_expressions;
```
**Contributor:** This has the effect that it can reorder the output rows from this operator. I think that is fine, but it should probably be noted somewhere (so that when we get to optimizations related to sorting, we know this operation as written will not preserve the input order).

**Member (author):** Why would this change the ordering within a single partition? The intent was to produce the rows in the same order they are received.

**Contributor:** Whoops -- that was my mistake -- I didn't see the `self.buffer.is_empty()` condition. In that case I agree that the rows remain ordered. I guess I was thinking ahead to an operator that only copies data when needed rather than always. Too fancy. Sorry for the noise.