From 27fbfcf82e542f311896fd18087dbcb1a664bbc6 Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Sat, 19 Feb 2022 17:06:15 +0800 Subject: [PATCH] move built in window expr and partition evaluator --- datafusion-physical-expr/src/lib.rs | 3 +- .../window/built_in_window_function_expr.rs | 53 +++++++++++ datafusion-physical-expr/src/window/mod.rs | 25 ++++++ .../src/window/partition_evaluator.rs | 84 ++++++++++++++++++ .../src/{ => window}/window_expr.rs | 0 .../physical_plan/expressions/cume_dist.rs | 5 +- .../src/physical_plan/expressions/lead_lag.rs | 5 +- .../physical_plan/expressions/nth_value.rs | 5 +- .../src/physical_plan/expressions/rank.rs | 5 +- .../physical_plan/expressions/row_number.rs | 5 +- datafusion/src/physical_plan/mod.rs | 3 +- .../src/physical_plan/window_functions.rs | 88 +------------------ .../src/physical_plan/windows/aggregate.rs | 2 +- .../src/physical_plan/windows/built_in.rs | 6 +- datafusion/src/physical_plan/windows/mod.rs | 24 +---- 15 files changed, 187 insertions(+), 126 deletions(-) create mode 100644 datafusion-physical-expr/src/window/built_in_window_function_expr.rs create mode 100644 datafusion-physical-expr/src/window/mod.rs create mode 100644 datafusion-physical-expr/src/window/partition_evaluator.rs rename datafusion-physical-expr/src/{ => window}/window_expr.rs (100%) diff --git a/datafusion-physical-expr/src/lib.rs b/datafusion-physical-expr/src/lib.rs index 63edaa5ac94b0..6beffd18d015f 100644 --- a/datafusion-physical-expr/src/lib.rs +++ b/datafusion-physical-expr/src/lib.rs @@ -18,9 +18,8 @@ mod aggregate_expr; mod physical_expr; mod sort_expr; -mod window_expr; +pub mod window; pub use aggregate_expr::AggregateExpr; pub use physical_expr::PhysicalExpr; pub use sort_expr::PhysicalSortExpr; -pub use window_expr::WindowExpr; diff --git a/datafusion-physical-expr/src/window/built_in_window_function_expr.rs b/datafusion-physical-expr/src/window/built_in_window_function_expr.rs new file mode 100644 index 0000000000000..43e1272bce18c --- /dev/null +++ b/datafusion-physical-expr/src/window/built_in_window_function_expr.rs @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::partition_evaluator::PartitionEvaluator; +use crate::PhysicalExpr; +use arrow::datatypes::Field; +use arrow::record_batch::RecordBatch; +use datafusion_common::Result; +use std::any::Any; +use std::sync::Arc; + +/// A window expression that is a built-in window function. +/// +/// Note that unlike aggregation based window functions, built-in window functions normally ignore +/// window frame spec, with the exception of first_value, last_value, and nth_value. +pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug { + /// Returns the aggregate expression as [`Any`](std::any::Any) so that it can be + /// downcast to a specific implementation. + fn as_any(&self) -> &dyn Any; + + /// the field of the final result of this aggregation. + fn field(&self) -> Result; + + /// expressions that are passed to the Accumulator. + /// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many. + fn expressions(&self) -> Vec>; + + /// Human readable name such as `"MIN(c2)"` or `"RANK()"`. The default + /// implementation returns placeholder text. + fn name(&self) -> &str { + "BuiltInWindowFunctionExpr: default name" + } + + /// Create built-in window evaluator with a batch + fn create_evaluator( + &self, + batch: &RecordBatch, + ) -> Result>; +} diff --git a/datafusion-physical-expr/src/window/mod.rs b/datafusion-physical-expr/src/window/mod.rs new file mode 100644 index 0000000000000..48a6e8b4f589d --- /dev/null +++ b/datafusion-physical-expr/src/window/mod.rs @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod built_in_window_function_expr; +mod partition_evaluator; +mod window_expr; + +pub use built_in_window_function_expr::BuiltInWindowFunctionExpr; +pub use partition_evaluator::find_ranges_in_range; +pub use partition_evaluator::PartitionEvaluator; +pub use window_expr::WindowExpr; diff --git a/datafusion-physical-expr/src/window/partition_evaluator.rs b/datafusion-physical-expr/src/window/partition_evaluator.rs new file mode 100644 index 0000000000000..9afdf3860d0ec --- /dev/null +++ b/datafusion-physical-expr/src/window/partition_evaluator.rs @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::ArrayRef; +use datafusion_common::DataFusionError; +use datafusion_common::Result; +use std::ops::Range; + +/// Given a partition range, and the full list of sort partition points, given that the sort +/// partition points are sorted using [partition columns..., order columns...], the split +/// boundaries would align (what's sorted on [partition columns...] would definitely be sorted +/// on finer columns), so this will use binary search to find ranges that are within the +/// partition range and return the valid slice. +pub fn find_ranges_in_range<'a>( + partition_range: &Range, + sort_partition_points: &'a [Range], +) -> &'a [Range] { + let start_idx = sort_partition_points + .partition_point(|sort_range| sort_range.start < partition_range.start); + let end_idx = start_idx + + sort_partition_points[start_idx..] + .partition_point(|sort_range| sort_range.end <= partition_range.end); + &sort_partition_points[start_idx..end_idx] +} + +/// Partition evaluator +pub trait PartitionEvaluator { + /// Whether the evaluator should be evaluated with rank + fn include_rank(&self) -> bool { + false + } + + /// evaluate the partition evaluator against the partitions + fn evaluate(&self, partition_points: Vec>) -> Result> { + partition_points + .into_iter() + .map(|partition| self.evaluate_partition(partition)) + .collect() + } + + /// evaluate the partition evaluator against the partitions with rank information + fn evaluate_with_rank( + &self, + partition_points: Vec>, + sort_partition_points: Vec>, + ) -> Result> { + partition_points + .into_iter() + .map(|partition| { + let ranks_in_partition = + find_ranges_in_range(&partition, &sort_partition_points); + self.evaluate_partition_with_rank(partition, ranks_in_partition) + }) + .collect() + } + + /// evaluate the partition evaluator against the partition + fn evaluate_partition(&self, _partition: Range) -> Result; + + /// evaluate the partition evaluator against the partition but with rank + fn evaluate_partition_with_rank( + &self, + _partition: Range, + _ranks_in_partition: &[Range], + ) -> Result { + Err(DataFusionError::NotImplemented( + "evaluate_partition_with_rank is not implemented by default".into(), + )) + } +} diff --git a/datafusion-physical-expr/src/window_expr.rs b/datafusion-physical-expr/src/window/window_expr.rs similarity index 100% rename from datafusion-physical-expr/src/window_expr.rs rename to datafusion-physical-expr/src/window/window_expr.rs diff --git a/datafusion/src/physical_plan/expressions/cume_dist.rs b/datafusion/src/physical_plan/expressions/cume_dist.rs index 7b0a45ac17b89..6f24d8226d502 100644 --- a/datafusion/src/physical_plan/expressions/cume_dist.rs +++ b/datafusion/src/physical_plan/expressions/cume_dist.rs @@ -19,12 +19,13 @@ //! at runtime during query execution use crate::error::Result; -use crate::physical_plan::window_functions::PartitionEvaluator; -use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, PhysicalExpr}; +use crate::physical_plan::PhysicalExpr; use arrow::array::ArrayRef; use arrow::array::Float64Array; use arrow::datatypes::{DataType, Field}; use arrow::record_batch::RecordBatch; +use datafusion_physical_expr::window::BuiltInWindowFunctionExpr; +use datafusion_physical_expr::window::PartitionEvaluator; use std::any::Any; use std::iter; use std::ops::Range; diff --git a/datafusion/src/physical_plan/expressions/lead_lag.rs b/datafusion/src/physical_plan/expressions/lead_lag.rs index 52f80c30b5305..fef5bad2a926d 100644 --- a/datafusion/src/physical_plan/expressions/lead_lag.rs +++ b/datafusion/src/physical_plan/expressions/lead_lag.rs @@ -19,13 +19,14 @@ //! at runtime during query execution use crate::error::{DataFusionError, Result}; -use crate::physical_plan::window_functions::PartitionEvaluator; -use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, PhysicalExpr}; +use crate::physical_plan::PhysicalExpr; use crate::scalar::ScalarValue; use arrow::array::ArrayRef; use arrow::compute::cast; use arrow::datatypes::{DataType, Field}; use arrow::record_batch::RecordBatch; +use datafusion_physical_expr::window::BuiltInWindowFunctionExpr; +use datafusion_physical_expr::window::PartitionEvaluator; use std::any::Any; use std::ops::Neg; use std::ops::Range; diff --git a/datafusion/src/physical_plan/expressions/nth_value.rs b/datafusion/src/physical_plan/expressions/nth_value.rs index 64a526a9c2f98..895a8b95a045a 100644 --- a/datafusion/src/physical_plan/expressions/nth_value.rs +++ b/datafusion/src/physical_plan/expressions/nth_value.rs @@ -19,13 +19,14 @@ //! that can evaluated at runtime during query execution use crate::error::{DataFusionError, Result}; -use crate::physical_plan::window_functions::PartitionEvaluator; -use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, PhysicalExpr}; +use crate::physical_plan::PhysicalExpr; use crate::scalar::ScalarValue; use arrow::array::{new_null_array, ArrayRef}; use arrow::compute::kernels::window::shift; use arrow::datatypes::{DataType, Field}; use arrow::record_batch::RecordBatch; +use datafusion_physical_expr::window::BuiltInWindowFunctionExpr; +use datafusion_physical_expr::window::PartitionEvaluator; use std::any::Any; use std::iter; use std::ops::Range; diff --git a/datafusion/src/physical_plan/expressions/rank.rs b/datafusion/src/physical_plan/expressions/rank.rs index 4facb8b555895..66697a052b3e2 100644 --- a/datafusion/src/physical_plan/expressions/rank.rs +++ b/datafusion/src/physical_plan/expressions/rank.rs @@ -19,12 +19,13 @@ //! at runtime during query execution use crate::error::Result; -use crate::physical_plan::window_functions::PartitionEvaluator; -use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, PhysicalExpr}; +use crate::physical_plan::PhysicalExpr; use arrow::array::ArrayRef; use arrow::array::{Float64Array, UInt64Array}; use arrow::datatypes::{DataType, Field}; use arrow::record_batch::RecordBatch; +use datafusion_physical_expr::window::BuiltInWindowFunctionExpr; +use datafusion_physical_expr::window::PartitionEvaluator; use std::any::Any; use std::iter; use std::ops::Range; diff --git a/datafusion/src/physical_plan/expressions/row_number.rs b/datafusion/src/physical_plan/expressions/row_number.rs index be87a350d9a3c..4c349c141fd87 100644 --- a/datafusion/src/physical_plan/expressions/row_number.rs +++ b/datafusion/src/physical_plan/expressions/row_number.rs @@ -18,11 +18,12 @@ //! Defines physical expression for `row_number` that can evaluated at runtime during query execution use crate::error::Result; -use crate::physical_plan::window_functions::PartitionEvaluator; -use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, PhysicalExpr}; +use crate::physical_plan::PhysicalExpr; use arrow::array::{ArrayRef, UInt64Array}; use arrow::datatypes::{DataType, Field}; use arrow::record_batch::RecordBatch; +use datafusion_physical_expr::window::BuiltInWindowFunctionExpr; +use datafusion_physical_expr::window::PartitionEvaluator; use std::any::Any; use std::ops::Range; use std::sync::Arc; diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs index 9949982d0f278..32f2d5b53087a 100644 --- a/datafusion/src/physical_plan/mod.rs +++ b/datafusion/src/physical_plan/mod.rs @@ -469,7 +469,8 @@ pub enum Distribution { HashPartitioned(Vec>), } -pub use datafusion_physical_expr::{AggregateExpr, PhysicalExpr, WindowExpr}; +pub use datafusion_physical_expr::window::WindowExpr; +pub use datafusion_physical_expr::{AggregateExpr, PhysicalExpr}; /// Applies an optional projection to a [`SchemaRef`], returning the /// projected schema diff --git a/datafusion/src/physical_plan/window_functions.rs b/datafusion/src/physical_plan/window_functions.rs index b5445ae248424..b667bfe86fc77 100644 --- a/datafusion/src/physical_plan/window_functions.rs +++ b/datafusion/src/physical_plan/window_functions.rs @@ -20,20 +20,11 @@ //! //! see also -use crate::error::{DataFusionError, Result}; +use crate::error::Result; use crate::physical_plan::functions::{TypeSignature, Volatility}; -use crate::physical_plan::{ - aggregates, functions::Signature, type_coercion::data_types, - windows::find_ranges_in_range, PhysicalExpr, -}; -use arrow::array::ArrayRef; +use crate::physical_plan::{aggregates, functions::Signature, type_coercion::data_types}; use arrow::datatypes::DataType; -use arrow::datatypes::Field; -use arrow::record_batch::RecordBatch; pub use datafusion_expr::{BuiltInWindowFunction, WindowFunction}; -use std::any::Any; -use std::ops::Range; -use std::sync::Arc; /// Returns the datatype of the window function pub fn return_type( @@ -112,81 +103,6 @@ pub(super) fn signature_for_built_in(fun: &BuiltInWindowFunction) -> Signature { } } -/// Partition evaluator -pub(crate) trait PartitionEvaluator { - /// Whether the evaluator should be evaluated with rank - fn include_rank(&self) -> bool { - false - } - - /// evaluate the partition evaluator against the partitions - fn evaluate(&self, partition_points: Vec>) -> Result> { - partition_points - .into_iter() - .map(|partition| self.evaluate_partition(partition)) - .collect() - } - - /// evaluate the partition evaluator against the partitions with rank information - fn evaluate_with_rank( - &self, - partition_points: Vec>, - sort_partition_points: Vec>, - ) -> Result> { - partition_points - .into_iter() - .map(|partition| { - let ranks_in_partition = - find_ranges_in_range(&partition, &sort_partition_points); - self.evaluate_partition_with_rank(partition, ranks_in_partition) - }) - .collect() - } - - /// evaluate the partition evaluator against the partition - fn evaluate_partition(&self, _partition: Range) -> Result; - - /// evaluate the partition evaluator against the partition but with rank - fn evaluate_partition_with_rank( - &self, - _partition: Range, - _ranks_in_partition: &[Range], - ) -> Result { - Err(DataFusionError::NotImplemented( - "evaluate_partition_with_rank is not implemented by default".into(), - )) - } -} - -/// A window expression that is a built-in window function. -/// -/// Note that unlike aggregation based window functions, built-in window functions normally ignore -/// window frame spec, with the exception of first_value, last_value, and nth_value. -pub(crate) trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug { - /// Returns the aggregate expression as [`Any`](std::any::Any) so that it can be - /// downcast to a specific implementation. - fn as_any(&self) -> &dyn Any; - - /// the field of the final result of this aggregation. - fn field(&self) -> Result; - - /// expressions that are passed to the Accumulator. - /// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many. - fn expressions(&self) -> Vec>; - - /// Human readable name such as `"MIN(c2)"` or `"RANK()"`. The default - /// implementation returns placeholder text. - fn name(&self) -> &str { - "BuiltInWindowFunctionExpr: default name" - } - - /// Create built-in window evaluator with a batch - fn create_evaluator( - &self, - batch: &RecordBatch, - ) -> Result>; -} - #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/src/physical_plan/windows/aggregate.rs b/datafusion/src/physical_plan/windows/aggregate.rs index 4c97e2b753314..30e0b2994f50b 100644 --- a/datafusion/src/physical_plan/windows/aggregate.rs +++ b/datafusion/src/physical_plan/windows/aggregate.rs @@ -18,7 +18,6 @@ //! Physical exec for aggregate window function expressions. use crate::error::{DataFusionError, Result}; -use crate::physical_plan::windows::find_ranges_in_range; use crate::physical_plan::{ expressions::PhysicalSortExpr, Accumulator, AggregateExpr, PhysicalExpr, WindowExpr, }; @@ -26,6 +25,7 @@ use arrow::compute::concat; use arrow::record_batch::RecordBatch; use arrow::{array::ArrayRef, datatypes::Field}; use datafusion_expr::{WindowFrame, WindowFrameUnits}; +use datafusion_physical_expr::window::find_ranges_in_range; use std::any::Any; use std::iter::IntoIterator; use std::ops::Range; diff --git a/datafusion/src/physical_plan/windows/built_in.rs b/datafusion/src/physical_plan/windows/built_in.rs index de627cbcd27c0..3ded850432bf2 100644 --- a/datafusion/src/physical_plan/windows/built_in.rs +++ b/datafusion/src/physical_plan/windows/built_in.rs @@ -18,13 +18,11 @@ //! Physical exec for built-in window function expressions. use crate::error::{DataFusionError, Result}; -use crate::physical_plan::{ - expressions::PhysicalSortExpr, window_functions::BuiltInWindowFunctionExpr, - PhysicalExpr, WindowExpr, -}; +use crate::physical_plan::{expressions::PhysicalSortExpr, PhysicalExpr, WindowExpr}; use arrow::compute::concat; use arrow::record_batch::RecordBatch; use arrow::{array::ArrayRef, datatypes::Field}; +use datafusion_physical_expr::window::BuiltInWindowFunctionExpr; use std::any::Any; use std::sync::Arc; diff --git a/datafusion/src/physical_plan/windows/mod.rs b/datafusion/src/physical_plan/windows/mod.rs index b3bf9ce83f571..34f9337b0036f 100644 --- a/datafusion/src/physical_plan/windows/mod.rs +++ b/datafusion/src/physical_plan/windows/mod.rs @@ -25,17 +25,14 @@ use crate::physical_plan::{ PhysicalSortExpr, RowNumber, }, type_coercion::coerce, - window_functions::{ - signature_for_built_in, BuiltInWindowFunction, BuiltInWindowFunctionExpr, - WindowFunction, - }, + window_functions::{signature_for_built_in, BuiltInWindowFunction, WindowFunction}, PhysicalExpr, WindowExpr, }; use crate::scalar::ScalarValue; use arrow::datatypes::Schema; use datafusion_expr::WindowFrame; +use datafusion_physical_expr::window::BuiltInWindowFunctionExpr; use std::convert::TryInto; -use std::ops::Range; use std::sync::Arc; mod aggregate; @@ -153,23 +150,6 @@ fn create_built_in_window_expr( }) } -/// Given a partition range, and the full list of sort partition points, given that the sort -/// partition points are sorted using [partition columns..., order columns...], the split -/// boundaries would align (what's sorted on [partition columns...] would definitely be sorted -/// on finer columns), so this will use binary search to find ranges that are within the -/// partition range and return the valid slice. -pub(crate) fn find_ranges_in_range<'a>( - partition_range: &Range, - sort_partition_points: &'a [Range], -) -> &'a [Range] { - let start_idx = sort_partition_points - .partition_point(|sort_range| sort_range.start < partition_range.start); - let end_idx = start_idx - + sort_partition_points[start_idx..] - .partition_point(|sort_range| sort_range.end <= partition_range.end); - &sort_partition_points[start_idx..end_idx] -} - #[cfg(test)] mod tests { use super::*;