Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 32 additions & 6 deletions datafusion/src/physical_plan/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ use async_trait::async_trait;
use futures::stream::{Stream, StreamExt};
use log::debug;

use super::Statistics;
use super::metrics::{BaselineMetrics, MetricsSet};
use super::{metrics::ExecutionPlanMetricsSet, Statistics};

/// CoalesceBatchesExec combines small batches into larger batches for more efficient use of
/// vectorized processing by upstream operators.
Expand All @@ -47,6 +48,8 @@ pub struct CoalesceBatchesExec {
input: Arc<dyn ExecutionPlan>,
/// Minimum number of rows for coalesces batches
target_batch_size: usize,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
}

impl CoalesceBatchesExec {
Expand All @@ -55,6 +58,7 @@ impl CoalesceBatchesExec {
Self {
input,
target_batch_size,
metrics: ExecutionPlanMetricsSet::new(),
}
}

Expand Down Expand Up @@ -115,6 +119,7 @@ impl ExecutionPlan for CoalesceBatchesExec {
buffer: Vec::new(),
buffered_rows: 0,
is_closed: false,
baseline_metrics: BaselineMetrics::new(&self.metrics, partition),
}))
}

Expand All @@ -134,6 +139,10 @@ impl ExecutionPlan for CoalesceBatchesExec {
}
}

fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}

fn statistics(&self) -> Statistics {
self.input.statistics()
}
Expand All @@ -152,6 +161,8 @@ struct CoalesceBatchesStream {
buffered_rows: usize,
/// Whether the stream has finished returning all of its data or not
is_closed: bool,
/// Execution metrics
baseline_metrics: BaselineMetrics,
}

impl Stream for CoalesceBatchesStream {
Expand All @@ -161,6 +172,26 @@ impl Stream for CoalesceBatchesStream {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let poll = self.poll_next_inner(cx);
self.baseline_metrics.record_poll(poll)
}

fn size_hint(&self) -> (usize, Option<usize>) {
// we can't predict the size of incoming batches so re-use the size hint from the input
self.input.size_hint()
}
}

impl CoalesceBatchesStream {
fn poll_next_inner(
self: &mut Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<ArrowResult<RecordBatch>>> {
// Get a clone (uses same underlying atomic) as self gets borrowed below
let cloned_time = self.baseline_metrics.elapsed_compute().clone();
// records time on drop
let _timer = cloned_time.timer();

if self.is_closed {
return Poll::Ready(None);
}
Expand Down Expand Up @@ -221,11 +252,6 @@ impl Stream for CoalesceBatchesStream {
}
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
// we can't predict the size of incoming batches so re-use the size hint from the input
self.input.size_hint()
}
}

impl RecordBatchStream for CoalesceBatchesStream {
Expand Down
63 changes: 54 additions & 9 deletions datafusion/src/physical_plan/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ use arrow::datatypes::SchemaRef;
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;

use super::{RecordBatchStream, SendableRecordBatchStream, Statistics};
use super::{
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
RecordBatchStream, SendableRecordBatchStream, Statistics,
};

use async_trait::async_trait;

Expand All @@ -46,12 +49,18 @@ pub struct GlobalLimitExec {
input: Arc<dyn ExecutionPlan>,
/// Maximum number of rows to return
limit: usize,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
}

impl GlobalLimitExec {
/// Create a new GlobalLimitExec
pub fn new(input: Arc<dyn ExecutionPlan>, limit: usize) -> Self {
GlobalLimitExec { input, limit }
GlobalLimitExec {
input,
limit,
metrics: ExecutionPlanMetricsSet::new(),
}
}

/// Input execution plan
Expand Down Expand Up @@ -120,8 +129,13 @@ impl ExecutionPlan for GlobalLimitExec {
));
}

let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
let stream = self.input.execute(0).await?;
Ok(Box::pin(LimitStream::new(stream, self.limit)))
Ok(Box::pin(LimitStream::new(
stream,
self.limit,
baseline_metrics,
)))
}

fn fmt_as(
Expand All @@ -136,6 +150,10 @@ impl ExecutionPlan for GlobalLimitExec {
}
}

fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}

fn statistics(&self) -> Statistics {
let input_stats = self.input.statistics();
match input_stats {
Expand Down Expand Up @@ -165,12 +183,18 @@ pub struct LocalLimitExec {
input: Arc<dyn ExecutionPlan>,
/// Maximum number of rows to return
limit: usize,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
}

impl LocalLimitExec {
/// Create a new LocalLimitExec partition
pub fn new(input: Arc<dyn ExecutionPlan>, limit: usize) -> Self {
Self { input, limit }
Self {
input,
limit,
metrics: ExecutionPlanMetricsSet::new(),
}
}

/// Input execution plan
Expand Down Expand Up @@ -219,8 +243,13 @@ impl ExecutionPlan for LocalLimitExec {
}

async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
let stream = self.input.execute(partition).await?;
Ok(Box::pin(LimitStream::new(stream, self.limit)))
Ok(Box::pin(LimitStream::new(
stream,
self.limit,
baseline_metrics,
)))
}

fn fmt_as(
Expand All @@ -235,6 +264,10 @@ impl ExecutionPlan for LocalLimitExec {
}
}

fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}

fn statistics(&self) -> Statistics {
let input_stats = self.input.statistics();
match input_stats {
Expand Down Expand Up @@ -280,20 +313,29 @@ struct LimitStream {
schema: SchemaRef,
// the current number of rows which have been produced
current_len: usize,
/// Execution time metrics
baseline_metrics: BaselineMetrics,
}

impl LimitStream {
fn new(input: SendableRecordBatchStream, limit: usize) -> Self {
fn new(
input: SendableRecordBatchStream,
limit: usize,
baseline_metrics: BaselineMetrics,
) -> Self {
let schema = input.schema();
Self {
limit,
input: Some(input),
schema,
current_len: 0,
baseline_metrics,
}
}

fn stream_limit(&mut self, batch: RecordBatch) -> Option<RecordBatch> {
// records time on drop
let _timer = self.baseline_metrics.elapsed_compute().timer();
if self.current_len == self.limit {
self.input = None; // clear input so it can be dropped early
None
Expand All @@ -316,14 +358,16 @@ impl Stream for LimitStream {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
match &mut self.input {
let poll = match &mut self.input {
Some(input) => input.poll_next_unpin(cx).map(|x| match x {
Some(Ok(batch)) => Ok(self.stream_limit(batch)).transpose(),
other => other,
}),
// input has been cleared
None => Poll::Ready(None),
}
};

self.baseline_metrics.record_poll(poll)
}
}

Expand Down Expand Up @@ -394,7 +438,8 @@ mod tests {

// limit of six needs to consume the entire first record batch
// (5 rows) and 1 row from the second (1 row)
let limit_stream = LimitStream::new(Box::pin(input), 6);
let baseline_metrics = BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
let limit_stream = LimitStream::new(Box::pin(input), 6, baseline_metrics);
assert_eq!(index.value(), 0);

let results = collect(Box::pin(limit_stream)).await.unwrap();
Expand Down
46 changes: 29 additions & 17 deletions datafusion/src/physical_plan/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;

use super::expressions::Column;
use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use super::{RecordBatchStream, SendableRecordBatchStream, Statistics};
use async_trait::async_trait;

Expand All @@ -49,6 +50,8 @@ pub struct ProjectionExec {
schema: SchemaRef,
/// The input plan
input: Arc<dyn ExecutionPlan>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
}

impl ProjectionExec {
Expand Down Expand Up @@ -76,6 +79,7 @@ impl ProjectionExec {
expr,
schema,
input: input.clone(),
metrics: ExecutionPlanMetricsSet::new(),
})
}

Expand Down Expand Up @@ -131,6 +135,7 @@ impl ExecutionPlan for ProjectionExec {
schema: self.schema.clone(),
expr: self.expr.iter().map(|x| x.0.clone()).collect(),
input: self.input.execute(partition).await?,
baseline_metrics: BaselineMetrics::new(&self.metrics, partition),
}))
}

Expand Down Expand Up @@ -159,6 +164,10 @@ impl ExecutionPlan for ProjectionExec {
}
}

fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}

fn statistics(&self) -> Statistics {
stats_projection(
self.input.statistics(),
Expand Down Expand Up @@ -194,27 +203,28 @@ fn stats_projection(
}
}

fn batch_project(
batch: &RecordBatch,
expressions: &[Arc<dyn PhysicalExpr>],
schema: &SchemaRef,
) -> ArrowResult<RecordBatch> {
expressions
.iter()
.map(|expr| expr.evaluate(batch))
.map(|r| r.map(|v| v.into_array(batch.num_rows())))
.collect::<Result<Vec<_>>>()
.map_or_else(
|e| Err(DataFusionError::into_arrow_external_error(e)),
|arrays| RecordBatch::try_new(schema.clone(), arrays),
)
impl ProjectionStream {
fn batch_project(&self, batch: &RecordBatch) -> ArrowResult<RecordBatch> {
// records time on drop
let _timer = self.baseline_metrics.elapsed_compute().timer();
self.expr
.iter()
.map(|expr| expr.evaluate(batch))
.map(|r| r.map(|v| v.into_array(batch.num_rows())))
.collect::<Result<Vec<_>>>()
.map_or_else(
|e| Err(DataFusionError::into_arrow_external_error(e)),
|arrays| RecordBatch::try_new(self.schema.clone(), arrays),
)
}
}

/// Projection iterator
struct ProjectionStream {
schema: SchemaRef,
expr: Vec<Arc<dyn PhysicalExpr>>,
input: SendableRecordBatchStream,
baseline_metrics: BaselineMetrics,
}

impl Stream for ProjectionStream {
Expand All @@ -224,10 +234,12 @@ impl Stream for ProjectionStream {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
self.input.poll_next_unpin(cx).map(|x| match x {
Some(Ok(batch)) => Some(batch_project(&batch, &self.expr, &self.schema)),
let poll = self.input.poll_next_unpin(cx).map(|x| match x {
Some(Ok(batch)) => Some(self.batch_project(&batch)),
other => other,
})
});

self.baseline_metrics.record_poll(poll)
}

fn size_hint(&self) -> (usize, Option<usize>) {
Expand Down
Loading