From ed55aa2d626f28b15380980286327046081c0586 Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Sat, 12 Jun 2021 17:15:12 +0800 Subject: [PATCH] parallelize window function calls --- datafusion/src/physical_plan/windows.rs | 30 ++++++++++++++++++++----- 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/datafusion/src/physical_plan/windows.rs b/datafusion/src/physical_plan/windows.rs index 2f539057c82f4..2cbb5687bfb08 100644 --- a/datafusion/src/physical_plan/windows.rs +++ b/datafusion/src/physical_plan/windows.rs @@ -46,6 +46,7 @@ use std::ops::Range; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use tokio::task; /// Window execution plan #[derive(Debug)] @@ -467,14 +468,29 @@ pin_project! { } /// Compute the window aggregate columns -fn compute_window_aggregates( +async fn compute_window_aggregates( window_expr: Vec>, - batch: &RecordBatch, + batch: Arc, ) -> Result> { - window_expr + let handles = window_expr .iter() - .map(|window_expr| window_expr.evaluate(batch)) - .collect() + .map(|window_expr| { + let batch = batch.clone(); + let window_expr = window_expr.clone(); + task::spawn_blocking(move || window_expr.evaluate(&batch)) + }) + .collect::>(); + let mut result = vec![]; + for handle in handles { + let arr = handle.await.map_err(|e| { + DataFusionError::Execution(format!( + "Failed to join window aggregation handle {}", + e + )) + })??; + result.push(arr); + } + Ok(result) } impl WindowAggStream { @@ -510,8 +526,10 @@ impl WindowAggStream { .map_err(DataFusionError::into_arrow_external_error)?; let batch = common::combine_batches(&batches, input_schema.clone())?; if let Some(batch) = batch { + let batch = Arc::new(batch); // calculate window cols - let mut columns = compute_window_aggregates(window_expr, &batch) + let mut columns = compute_window_aggregates(window_expr, batch.clone()) + .await .map_err(DataFusionError::into_arrow_external_error)?; // combine with the original cols // note the setup of window aggregates 
is that they newly calculated window