2 changes: 1 addition & 1 deletion rust/datafusion/Cargo.toml
@@ -52,14 +52,14 @@ parquet = { path = "../parquet", version = "4.0.0-SNAPSHOT", features = ["arrow"
sqlparser = "0.8.0"
clap = "2.33"
rustyline = {version = "7.0", optional = true}
crossbeam = "0.8"
paste = "^1.0"
num_cpus = "1.13.0"
chrono = "0.4"
async-trait = "0.1.41"
futures = "0.3"
pin-project-lite= "^0.2.0"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
tokio-stream = "0.1"
log = "^0.4"
md-5 = "^0.9.1"
sha2 = "^0.9.1"
50 changes: 24 additions & 26 deletions rust/datafusion/src/physical_plan/parquet.rs
@@ -30,8 +30,7 @@ use super::{
planner::DefaultPhysicalPlanner, ColumnarValue, PhysicalExpr, RecordBatchStream,
SendableRecordBatchStream,
};
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::{common, Partitioning};
use crate::physical_plan::{common, ExecutionPlan, Partitioning};
use crate::{
error::{DataFusionError, Result},
execution::context::ExecutionContextState,
@@ -55,14 +54,17 @@ use parquet::file::{
statistics::Statistics as ParquetStatistics,
};

use crossbeam::channel::{bounded, Receiver, RecvError, Sender};
use fmt::Debug;
use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
use tokio::task;
use tokio::{
sync::mpsc::{channel, Receiver, Sender},
task,
};
use tokio_stream::wrappers::ReceiverStream;

use crate::datasource::datasource::Statistics;
use async_trait::async_trait;
use futures::stream::Stream;
use futures::stream::{Stream, StreamExt};

/// Execution plan for scanning one or more Parquet partitions
#[derive(Debug, Clone)]
@@ -773,9 +775,9 @@ impl ExecutionPlan for ParquetExec {
// because the parquet implementation is not thread-safe, it is necessary to execute
// on a thread and communicate with channels
let (response_tx, response_rx): (
Sender<Option<ArrowResult<RecordBatch>>>,
Receiver<Option<ArrowResult<RecordBatch>>>,
) = bounded(2);
Sender<ArrowResult<RecordBatch>>,
Receiver<ArrowResult<RecordBatch>>,
) = channel(2);

let filenames = self.partitions[partition].filenames.clone();
let projection = self.projection.clone();
@@ -796,17 +798,18 @@

Ok(Box::pin(ParquetStream {
schema: self.schema.clone(),
response_rx,
inner: ReceiverStream::new(response_rx),
}))
}
}

fn send_result(
response_tx: &Sender<Option<ArrowResult<RecordBatch>>>,
result: Option<ArrowResult<RecordBatch>>,
response_tx: &Sender<ArrowResult<RecordBatch>>,
result: ArrowResult<RecordBatch>,
) -> Result<()> {
// Note this function is running on its own blocking tokio thread so blocking here is ok.
response_tx
.send(result)
.blocking_send(result)
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
Ok(())
}
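
The diff above swaps the crossbeam bounded channel for a tokio mpsc channel: the parquet reader still runs on its own blocking thread (the parquet crate is not async), pushes batches with `blocking_send`, and the async side wraps the `Receiver` in a `ReceiverStream` so `poll_next` never blocks; dropping the sender now signals end-of-stream instead of an explicit `None`. A minimal, self-contained sketch of that pattern (illustrative `i32` items and channel capacity, not the DataFusion code):

```rust
use futures::StreamExt;
use tokio::sync::mpsc::channel;
use tokio::task;
use tokio_stream::wrappers::ReceiverStream;

#[tokio::main]
async fn main() {
    // bounded channel, capacity 2, mirroring `channel(2)` in the diff
    let (tx, rx) = channel::<i32>(2);

    // the producer runs on a blocking thread; blocking_send applies
    // backpressure without stalling the async executor
    task::spawn_blocking(move || {
        for i in 0..5 {
            if tx.blocking_send(i).is_err() {
                break; // receiver was dropped
            }
        }
        // dropping tx closes the channel, which ends the stream below,
        // so no explicit end-of-stream marker is needed
    });

    // ReceiverStream adapts the Receiver into a Stream that can be
    // polled without blocking
    let mut stream = ReceiverStream::new(rx);
    while let Some(v) = stream.next().await {
        println!("got {}", v);
    }
}
```

With the stream adapter in place, `ParquetStream::poll_next` can simply delegate to the inner `ReceiverStream`, which is what the diff does via `poll_next_unpin`.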
@@ -816,7 +819,7 @@ fn read_files(
projection: &[usize],
predicate_builder: &Option<RowGroupPredicateBuilder>,
batch_size: usize,
response_tx: Sender<Option<ArrowResult<RecordBatch>>>,
response_tx: Sender<ArrowResult<RecordBatch>>,
) -> Result<()> {
for filename in filenames {
let file = File::open(&filename)?;
@@ -833,7 +836,7 @@
match batch_reader.next() {
Some(Ok(batch)) => {
//println!("ParquetExec got new batch from {}", filename);
send_result(&response_tx, Some(Ok(batch)))?
send_result(&response_tx, Ok(batch))?
}
None => {
break;
@@ -847,7 +850,7 @@
// send error to operator
send_result(
&response_tx,
Some(Err(ArrowError::ParquetError(err_msg.clone()))),
Err(ArrowError::ParquetError(err_msg.clone())),
)?;
// terminate thread with error
return Err(DataFusionError::Execution(err_msg));
@@ -856,9 +859,8 @@
}
}

// finished reading files
send_result(&response_tx, None)?;

// finished reading files (dropping response_tx will close
// channel)
Ok(())
}

@@ -872,21 +874,17 @@ fn split_files(filenames: &[String], n: usize) -> Vec<&[String]> {

struct ParquetStream {
schema: SchemaRef,
response_rx: Receiver<Option<ArrowResult<RecordBatch>>>,
inner: ReceiverStream<ArrowResult<RecordBatch>>,
}

impl Stream for ParquetStream {
type Item = ArrowResult<RecordBatch>;

fn poll_next(
self: std::pin::Pin<&mut Self>,
_: &mut Context<'_>,
mut self: std::pin::Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
match self.response_rx.recv() {
Ok(batch) => Poll::Ready(batch),
// RecvError means receiver has exited and closed the channel
Err(RecvError) => Poll::Ready(None),
}
self.inner.poll_next_unpin(cx)
}
}

95 changes: 66 additions & 29 deletions rust/datafusion/src/physical_plan/repartition.rs
@@ -21,21 +21,24 @@
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::{any::Any, vec};
use std::{any::Any, collections::HashMap, vec};

use crate::error::{DataFusionError, Result};
use crate::physical_plan::{ExecutionPlan, Partitioning};
use arrow::record_batch::RecordBatch;
use arrow::{array::Array, error::Result as ArrowResult};
use arrow::{compute::take, datatypes::SchemaRef};
use tokio_stream::wrappers::UnboundedReceiverStream;

use super::{hash_join::create_hashes, RecordBatchStream, SendableRecordBatchStream};
use async_trait::async_trait;

use crossbeam::channel::{unbounded, Receiver, Sender};
use futures::stream::Stream;
use futures::StreamExt;
use tokio::sync::Mutex;
use tokio::sync::{
mpsc::{UnboundedReceiver, UnboundedSender},
Mutex,
};
use tokio::task::JoinHandle;

type MaybeBatch = Option<ArrowResult<RecordBatch>>;
@@ -48,9 +51,13 @@ pub struct RepartitionExec {
input: Arc<dyn ExecutionPlan>,
/// Partitioning scheme to use
partitioning: Partitioning,
/// Channels for sending batches from input partitions to output partitions
/// there is one entry in this Vec for each output partition
channels: Arc<Mutex<Vec<(Sender<MaybeBatch>, Receiver<MaybeBatch>)>>>,
/// Channels for sending batches from input partitions to output partitions.
/// Key is the partition number
channels: Arc<
Mutex<
HashMap<usize, (UnboundedSender<MaybeBatch>, UnboundedReceiver<MaybeBatch>)>,
>,
>,
}

Review thread on this line:

Contributor Author:
Note @Dandandan asked about using a HashMap vs some other structure. It is a HashMap for reasons explained by @edrevo here: edrevo@97c256c#r47626396

Contributor:
I didn't follow the answer completely; why do we need to remove it? I think UnboundedSender is cheap to clone and cheap to keep in memory for the duration of RepartitionStream? But maybe I would need to play with the code more to see why it is needed per se.

Contributor:
So ok if it's needed, but I just wasn't sure 👍

Contributor Author:
Thanks @Dandandan

The use is here: https://github.com/apache/arrow/pull/9605/files#diff-b9b79e3b35bc8bfb43040ada3a4382bd0a0017ca1b1e8135be8fb310ff095674R229

Basically this code sets up all the input and output channels for all of the partitions and then hands out one receiver at a time in some arbitrary order (depending on the partition argument).

UnboundedReceiver (https://docs.rs/tokio/1.2.0/tokio/sync/mpsc/struct.UnboundedReceiver.html) doesn't implement Clone (as it is multiple producer, single consumer).

I suspect with some more thought a different structure could be used, but I couldn't convince myself it was a valuable use of time.
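
As the review thread above notes, the senders can be cloned cheaply (one set of clones per input task) while each `UnboundedReceiver` must be moved out exactly once because it does not implement Clone; keying the map by output partition number makes that hand-off explicit. A minimal sketch of that ownership pattern (illustrative `i32` items and helper names, not the DataFusion code):

```rust
use std::collections::HashMap;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio_stream::wrappers::UnboundedReceiverStream;

type Channels = HashMap<usize, (UnboundedSender<i32>, UnboundedReceiver<i32>)>;

// one unbounded channel per output partition, keyed by partition number
fn create_channels(num_output_partitions: usize) -> Channels {
    (0..num_output_partitions)
        .map(|partition| (partition, unbounded_channel::<i32>()))
        .collect()
}

// every input task gets its own clone of each sender
fn clone_senders(channels: &Channels) -> HashMap<usize, UnboundedSender<i32>> {
    channels
        .iter()
        .map(|(partition, (tx, _rx))| (*partition, tx.clone()))
        .collect()
}

// an output partition takes its receiver exactly once: UnboundedReceiver is
// not Clone, so it is moved out of the map rather than copied. Dropping the
// map's original sender here is fine as long as the input tasks hold clones.
fn take_output_stream(
    channels: &mut Channels,
    partition: usize,
) -> UnboundedReceiverStream<i32> {
    let (_tx, rx) = channels
        .remove(&partition)
        .expect("each output partition is taken at most once");
    UnboundedReceiverStream::new(rx)
}
```

This also reflects the design note in the execute() hunk below: the channels are unbounded because output partitions may be consumed in an arbitrary order, so a bounded channel could block an input task on an output partition that nobody is reading yet.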

impl RepartitionExec {
@@ -110,23 +117,28 @@ impl ExecutionPlan for RepartitionExec {
// if this is the first partition to be invoked then we need to set up initial state
if channels.is_empty() {
// create one channel per *output* partition
for _ in 0..num_output_partitions {
for partition in 0..num_output_partitions {
// Note that this operator uses unbounded channels to avoid deadlocks because
// the output partitions can be read in any order and this could cause input
// partitions to be blocked when sending data to output receivers that are not
// partitions to be blocked when sending data to output UnboundedReceivers that are not
// being read yet. This may cause high memory usage if the next operator is
// reading output partitions in order rather than concurrently. One workaround
// for this would be to add spill-to-disk capabilities.
let (sender, receiver) = unbounded::<Option<ArrowResult<RecordBatch>>>();
channels.push((sender, receiver));
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<
Option<ArrowResult<RecordBatch>>,
>();
channels.insert(partition, (sender, receiver));
}
let random = ahash::RandomState::new();

// launch one async task per *input* partition
for i in 0..num_input_partitions {
let random_state = random.clone();
let input = self.input.clone();
let mut channels = channels.clone();
let mut txs: HashMap<_, _> = channels
.iter()
.map(|(partition, (tx, _rx))| (*partition, tx.clone()))
.collect();
let partitioning = self.partitioning.clone();
let _: JoinHandle<Result<()>> = tokio::spawn(async move {
let mut stream = input.execute(i).await?;
@@ -135,7 +147,7 @@
match &partitioning {
Partitioning::RoundRobinBatch(_) => {
let output_partition = counter % num_output_partitions;
let tx = &mut channels[output_partition].0;
let tx = txs.get_mut(&output_partition).unwrap();
tx.send(Some(result)).map_err(|e| {
DataFusionError::Execution(e.to_string())
})?;
@@ -180,7 +192,7 @@ impl ExecutionPlan for RepartitionExec {
input_batch.schema(),
columns,
);
let tx = &mut channels[num_output_partition].0;
let tx = txs.get_mut(&num_output_partition).unwrap();
tx.send(Some(output_batch)).map_err(|e| {
DataFusionError::Execution(e.to_string())
})?;
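
In the hash-partitioning arm above, the rows of each input batch are bucketed by hash modulo the number of output partitions and then gathered into a new batch (DataFusion uses create_hashes plus arrow's `take` kernel) before being sent to the matching output channel. The bucketing step in isolation, as a rough sketch using the standard library hasher and plain index vectors rather than arrow arrays:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// For each output partition, collect the indices of the rows that hash into
/// it; the caller can then gather those rows (DataFusion uses arrow's `take`).
fn partition_indices<T: Hash>(rows: &[T], num_partitions: usize) -> Vec<Vec<usize>> {
    let mut indices = vec![Vec::new(); num_partitions];
    for (i, row) in rows.iter().enumerate() {
        let mut hasher = DefaultHasher::new();
        row.hash(&mut hasher);
        indices[(hasher.finish() as usize) % num_partitions].push(i);
    }
    indices
}

fn main() {
    let rows = vec!["a", "b", "c", "d", "e", "f"];
    for (partition, idx) in partition_indices(&rows, 3).iter().enumerate() {
        println!("partition {}: rows {:?}", partition, idx);
    }
}
```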
@@ -199,14 +211,12 @@
}

// notify each output partition that this input partition has no more data
for channel in channels.iter_mut().take(num_output_partitions) {
let tx = &mut channel.0;
for (_, tx) in txs {
tx.send(None)
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
}
Ok(())
});
tokio::task::yield_now().await;
Review comment on this line:

Contributor Author:
this is the workaround added in #9580 and it is now removed in favor of what we think is the fix of the root cause

}
}

@@ -216,7 +226,7 @@ impl RepartitionExec {
num_input_partitions,
num_input_partitions_processed: 0,
schema: self.input.schema(),
input: channels[partition].1.clone(),
input: UnboundedReceiverStream::new(channels.remove(&partition).unwrap().1),
}))
}
}
@@ -230,7 +240,7 @@ impl RepartitionExec {
Ok(RepartitionExec {
input,
partitioning,
channels: Arc::new(Mutex::new(vec![])),
channels: Arc::new(Mutex::new(HashMap::new())),
})
}
}
@@ -243,7 +253,7 @@ struct RepartitionStream {
/// Schema
schema: SchemaRef,
/// channel containing the repartitioned batches
input: Receiver<Option<ArrowResult<RecordBatch>>>,
input: UnboundedReceiverStream<Option<ArrowResult<RecordBatch>>>,
}

impl Stream for RepartitionStream {
@@ -253,10 +263,9 @@ impl Stream for RepartitionStream {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
match self.input.recv() {
Ok(Some(batch)) => Poll::Ready(Some(batch)),
// End of results from one input partition
Ok(None) => {
match self.input.poll_next_unpin(cx) {
Poll::Ready(Some(Some(v))) => Poll::Ready(Some(v)),
Poll::Ready(Some(None)) => {
self.num_input_partitions_processed += 1;
if self.num_input_partitions == self.num_input_partitions_processed {
// all input partitions have finished sending batches
Expand All @@ -266,8 +275,8 @@ impl Stream for RepartitionStream {
self.poll_next(cx)
}
}
// RecvError means receiver has exited and closed the channel
Err(_) => Poll::Ready(None),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}
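
RepartitionStream above layers an end-of-input protocol on top of the channel: every input task sends `Some(batch)` values followed by one `None` marker, and the merged output stream finishes only after it has seen one marker per input partition (or after the channel closes because all senders were dropped). A stripped-down version of that state machine, with an illustrative `i32` item type and field names rather than RecordBatch:

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::{Stream, StreamExt};
use tokio_stream::wrappers::UnboundedReceiverStream;

// merges N producers over one unbounded channel; each producer sends
// Some(item) values followed by a single None "finished" marker
struct MergedStream {
    remaining_producers: usize,
    inner: UnboundedReceiverStream<Option<i32>>,
}

impl Stream for MergedStream {
    type Item = i32;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        match self.inner.poll_next_unpin(cx) {
            // a regular value from some producer
            Poll::Ready(Some(Some(v))) => Poll::Ready(Some(v)),
            // one producer signalled that it is finished
            Poll::Ready(Some(None)) => {
                self.remaining_producers -= 1;
                if self.remaining_producers == 0 {
                    Poll::Ready(None)
                } else {
                    // poll the channel again for the next value
                    self.poll_next(cx)
                }
            }
            // channel closed: all senders dropped
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}
```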
@@ -287,7 +296,7 @@ mod tests {
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

#[tokio::test(flavor = "multi_thread")]
#[tokio::test]
async fn one_to_many_round_robin() -> Result<()> {
// define input partitions
let schema = test_schema();
@@ -307,7 +316,7 @@
Ok(())
}

#[tokio::test(flavor = "multi_thread")]
#[tokio::test]
async fn many_to_one_round_robin() -> Result<()> {
// define input partitions
let schema = test_schema();
@@ -324,7 +333,7 @@
Ok(())
}

#[tokio::test(flavor = "multi_thread")]
#[tokio::test]
async fn many_to_many_round_robin() -> Result<()> {
// define input partitions
let schema = test_schema();
@@ -345,7 +354,7 @@
Ok(())
}

#[tokio::test(flavor = "multi_thread")]
#[tokio::test]
async fn many_to_many_hash_partition() -> Result<()> {
// define input partitions
let schema = test_schema();
@@ -415,4 +424,32 @@
}
Ok(output_partitions)
}

#[tokio::test]
async fn many_to_many_round_robin_within_tokio_task() -> Result<()> {
let join_handle: JoinHandle<Result<Vec<Vec<RecordBatch>>>> =
tokio::spawn(async move {
// define input partitions
let schema = test_schema();
let partition = create_vec_batches(&schema, 50);
let partitions =
vec![partition.clone(), partition.clone(), partition.clone()];

// repartition from 3 input to 5 output
repartition(&schema, partitions, Partitioning::RoundRobinBatch(5)).await
});

let output_partitions = join_handle
.await
.map_err(|e| DataFusionError::Internal(e.to_string()))??;

assert_eq!(5, output_partitions.len());
assert_eq!(30, output_partitions[0].len());
assert_eq!(30, output_partitions[1].len());
assert_eq!(30, output_partitions[2].len());
assert_eq!(30, output_partitions[3].len());
assert_eq!(30, output_partitions[4].len());

Ok(())
}
}