From f3cd19974394334206ccc54137e06418eb0d2f2d Mon Sep 17 00:00:00 2001 From: yangjiang Date: Fri, 11 Feb 2022 17:39:20 +0800 Subject: [PATCH 1/5] use implement bitmap_distinct --- datafusion-expr/src/aggregate_function.rs | 3 + datafusion-expr/src/expr_fn.rs | 9 + datafusion-physical-expr/Cargo.toml | 1 + .../src/expressions/bitmap_distinct.rs | 211 ++++++++++++++++++ .../src/expressions/mod.rs | 2 + datafusion-proto/src/to_proto.rs | 8 +- datafusion/Cargo.toml | 1 - datafusion/src/physical_plan/aggregates.rs | 59 ++++- datafusion/tests/sql/aggregates.rs | 72 ++++++ 9 files changed, 356 insertions(+), 10 deletions(-) create mode 100644 datafusion-physical-expr/src/expressions/bitmap_distinct.rs diff --git a/datafusion-expr/src/aggregate_function.rs b/datafusion-expr/src/aggregate_function.rs index 87b666ffb3369..30c0e02a02b11 100644 --- a/datafusion-expr/src/aggregate_function.rs +++ b/datafusion-expr/src/aggregate_function.rs @@ -55,6 +55,8 @@ pub enum AggregateFunction { ApproxPercentileCont, /// ApproxMedian ApproxMedian, + /// BitMap count distinct function + BitMapCountDistinct, } impl fmt::Display for AggregateFunction { @@ -87,6 +89,7 @@ impl FromStr for AggregateFunction { "corr" => AggregateFunction::Correlation, "approx_percentile_cont" => AggregateFunction::ApproxPercentileCont, "approx_median" => AggregateFunction::ApproxMedian, + "bitmap_distinct" => AggregateFunction::BitMapCountDistinct, _ => { return Err(DataFusionError::Plan(format!( "There is no built-in function named {}", diff --git a/datafusion-expr/src/expr_fn.rs b/datafusion-expr/src/expr_fn.rs index a5c302c5f6bbb..ae2e71d8b19b6 100644 --- a/datafusion-expr/src/expr_fn.rs +++ b/datafusion-expr/src/expr_fn.rs @@ -166,6 +166,15 @@ pub fn approx_percentile_cont(expr: Expr, percentile: Expr) -> Expr { } } +/// Returns the precise number of distinct input values using bitmap. +pub fn bitmap_count_distinct(expr: Expr) -> Expr { + Expr::AggregateFunction { + fun: aggregate_function::AggregateFunction::BitMapCountDistinct, + distinct: false, + args: vec![expr], + } +} + // TODO(kszucs): this seems buggy, unary_scalar_expr! is used for many // varying arity functions /// Create an convenience function representing a unary scalar function diff --git a/datafusion-physical-expr/Cargo.toml b/datafusion-physical-expr/Cargo.toml index 90a560ef9a911..8cf6e3f1ffe36 100644 --- a/datafusion-physical-expr/Cargo.toml +++ b/datafusion-physical-expr/Cargo.toml @@ -55,3 +55,4 @@ hashbrown = { version = "0.12", features = ["raw"] } chrono = { version = "0.4", default-features = false } regex = { version = "^1.4.3", optional = true } unicode-segmentation = { version = "^1.7.1", optional = true } +roaring = "0.8.1" diff --git a/datafusion-physical-expr/src/expressions/bitmap_distinct.rs b/datafusion-physical-expr/src/expressions/bitmap_distinct.rs new file mode 100644 index 0000000000000..440d75d133b3b --- /dev/null +++ b/datafusion-physical-expr/src/expressions/bitmap_distinct.rs @@ -0,0 +1,211 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Defines physical expressions that can evaluated at runtime during query execution + +use std::any::Any; + +use std::fmt::Debug; +use std::ops::BitOrAssign; +use std::sync::Arc; + +use arrow::array::{ + Array, ArrayRef, BinaryArray, Int16Array, Int32Array, Int8Array, UInt16Array, + UInt32Array, UInt8Array, +}; +use arrow::datatypes::{DataType, Field}; +use datafusion_common::{DataFusionError, Result, ScalarValue}; +use datafusion_expr::Accumulator; +use roaring::RoaringBitmap; + +use crate::{AggregateExpr, PhysicalExpr}; + +use super::format_state_name; + +/// BITMAP_DISTINCT aggregate expression +#[derive(Debug)] +pub struct BitMapDistinct { + name: String, + input_data_type: DataType, + expr: Arc, +} + +impl BitMapDistinct { + /// Create a new BitmapDistinct aggregate function. + pub fn new( + expr: Arc, + name: impl Into, + input_data_type: DataType, + ) -> Self { + Self { + name: name.into(), + input_data_type, + expr, + } + } +} + +impl AggregateExpr for BitMapDistinct { + /// Return a reference to Any that can be used for downcasting + fn as_any(&self) -> &dyn Any { + self + } + + /// the field of the final result of this aggregation. + fn field(&self) -> Result { + Ok(Field::new(&self.name, DataType::UInt64, false)) + } + + fn create_accumulator(&self) -> Result> { + let accumulator: Box = match &self.input_data_type { + DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::Int8 + | DataType::Int16 + | DataType::Int32 => Box::new(BitmapDistinctCountAccumulator::try_new()), + other => { + return Err(DataFusionError::NotImplemented(format!( + "Support for 'bitmap_distinct' for data type {} is not implemented", + other + ))) + } + }; + Ok(accumulator) + } + + fn state_fields(&self) -> Result> { + Ok(vec![Field::new( + &format_state_name(&self.name, "bitmap_registers"), + DataType::Binary, + false, + )]) + } + + fn expressions(&self) -> Vec> { + vec![self.expr.clone()] + } + + fn name(&self) -> &str { + &self.name + } +} + +#[derive(Debug)] +struct BitmapDistinctCountAccumulator { + bitmap: roaring::bitmap::RoaringBitmap, +} + +impl BitmapDistinctCountAccumulator { + fn try_new() -> Self { + Self { + bitmap: RoaringBitmap::new(), + } + } +} + +impl Accumulator for BitmapDistinctCountAccumulator { + //state() can be used by physical nodes to aggregate states together and send them over the network/threads, to combine values. + fn state(&self) -> Result> { + let mut bytes = vec![]; + self.bitmap.serialize_into(&mut bytes).unwrap(); + Ok(vec![ScalarValue::Binary(Some(bytes))]) + } + + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + let value = &values[0]; + if value.is_empty() { + return Ok(()); + } + match value.data_type() { + DataType::Int8 => { + let array = value.as_any().downcast_ref::().unwrap(); + for i in 0..array.len() { + self.bitmap.insert(array.value(i) as u32); + } + } + DataType::Int16 => { + let array = value.as_any().downcast_ref::().unwrap(); + for i in 0..array.len() { + self.bitmap.insert(array.value(i) as u32); + } + } + DataType::Int32 => { + let array = value.as_any().downcast_ref::().unwrap(); + for i in 0..array.len() { + self.bitmap.insert(array.value(i) as u32); + } + } + DataType::UInt8 => { + let array = value.as_any().downcast_ref::().unwrap(); + for i in 0..array.len() { + self.bitmap.insert(array.value(i) as u32); + } + } + DataType::UInt16 => { + let array = value.as_any().downcast_ref::().unwrap(); + for i in 0..array.len() { + self.bitmap.insert(array.value(i) as u32); + } + } + DataType::UInt32 => { + let array = value.as_any().downcast_ref::().unwrap(); + for i in 0..array.len() { + self.bitmap.insert(array.value(i)); + } + } + e => { + return Err(DataFusionError::Internal(format!( + "BITMAP_COUNT_DISTINCT is not expected to receive the type {:?}", + e + ))); + } + } + Ok(()) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + let binary_array = states[0].as_any().downcast_ref::().unwrap(); + + for b in binary_array.iter() { + let v = b.ok_or_else(|| { + DataFusionError::Internal( + "Impossibly got empty binary array from states".into(), + ) + })?; + let bitmap = RoaringBitmap::deserialize_from(&v.to_vec()[..]).unwrap(); + self.bitmap.bitor_assign(bitmap); + } + Ok(()) + } + + fn evaluate(&self) -> Result { + Ok(ScalarValue::from(self.bitmap.len())) + } +} + +pub fn is_bitmap_count_distinct_supported_arg_type(arg_type: &DataType) -> bool { + matches!( + arg_type, + DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::Int8 + | DataType::Int16 + | DataType::Int32 + ) +} diff --git a/datafusion-physical-expr/src/expressions/mod.rs b/datafusion-physical-expr/src/expressions/mod.rs index dd0b01129e8e6..92c13bc024ee6 100644 --- a/datafusion-physical-expr/src/expressions/mod.rs +++ b/datafusion-physical-expr/src/expressions/mod.rs @@ -21,6 +21,7 @@ mod approx_distinct; mod approx_percentile_cont; mod array_agg; mod average; +mod bitmap_distinct; #[macro_use] mod binary; mod case; @@ -66,6 +67,7 @@ pub use array_agg::ArrayAgg; pub use average::is_avg_support_arg_type; pub use average::{avg_return_type, Avg, AvgAccumulator}; pub use binary::{binary, binary_operator_data_type, BinaryExpr}; +pub use bitmap_distinct::{is_bitmap_count_distinct_supported_arg_type, BitMapDistinct}; pub use case::{case, CaseExpr}; pub use cast::{ cast, cast_column, cast_with_options, CastExpr, DEFAULT_DATAFUSION_CAST_OPTIONS, diff --git a/datafusion-proto/src/to_proto.rs b/datafusion-proto/src/to_proto.rs index 29c533a560a92..a110201b1ec12 100644 --- a/datafusion-proto/src/to_proto.rs +++ b/datafusion-proto/src/to_proto.rs @@ -494,6 +494,9 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode { AggregateFunction::ApproxMedian => { protobuf::AggregateFunction::ApproxMedian } + AggregateFunction::BitMapCountDistinct => { + protobuf::AggregateFunction::BitmapDistinct + } }; let aggregate_expr = protobuf::AggregateExprNode { @@ -772,8 +775,9 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue { scalar, datatype, )) } - } - ( + AggregateFunction::BitMapCountDistinct => Self::BitmapDistinct, + } + ( scalar::ScalarValue::Boolean(_), DataType::Boolean, ) => scalar.try_into(), diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index e2523b2b93347..80842272d6135 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -81,7 +81,6 @@ num-traits = { version = "0.2", optional = true } pyo3 = { version = "0.16", optional = true } tempfile = "3" parking_lot = "0.12" -uuid = { version = "0.8", features = ["v4"] } [dev-dependencies] criterion = "0.3" diff --git a/datafusion/src/physical_plan/aggregates.rs b/datafusion/src/physical_plan/aggregates.rs index ab114643feb10..84041438ae7ac 100644 --- a/datafusion/src/physical_plan/aggregates.rs +++ b/datafusion/src/physical_plan/aggregates.rs @@ -54,9 +54,9 @@ pub fn return_type( match fun { // TODO If the datafusion is compatible with PostgreSQL, the returned data type should be INT64. - AggregateFunction::Count | AggregateFunction::ApproxDistinct => { - Ok(DataType::UInt64) - } + AggregateFunction::Count + | AggregateFunction::ApproxDistinct + | AggregateFunction::BitMapCountDistinct => Ok(DataType::UInt64), AggregateFunction::Max | AggregateFunction::Min => { // For min and max agg function, the returned type is same as input type. // The coerced_data_types is same with input_types. @@ -280,6 +280,13 @@ pub fn create_aggregate_expr( "MEDIAN(DISTINCT) aggregations are not available".to_string(), )); } + (AggregateFunction::BitMapCountDistinct, _) => { + Arc::new(expressions::BitMapDistinct::new( + coerced_phy_exprs[0].clone(), + name, + coerced_exprs_types[0].clone(), + )) + } }) } @@ -313,7 +320,10 @@ pub(super) fn signature(fun: &AggregateFunction) -> Signature { match fun { AggregateFunction::Count | AggregateFunction::ApproxDistinct - | AggregateFunction::ArrayAgg => Signature::any(1, Volatility::Immutable), + | AggregateFunction::ArrayAgg + | AggregateFunction::BitMapCountDistinct => { + Signature::any(1, Volatility::Immutable) + } AggregateFunction::Min | AggregateFunction::Max => { let valid = STRINGS .iter() @@ -354,11 +364,12 @@ pub(super) fn signature(fun: &AggregateFunction) -> Signature { mod tests { use super::*; use crate::physical_plan::expressions::{ - ApproxDistinct, ApproxMedian, ApproxPercentileCont, ArrayAgg, Avg, Correlation, - Count, Covariance, DistinctArrayAgg, DistinctCount, Max, Min, Stddev, Sum, - Variance, + ApproxDistinct, ApproxMedian, ApproxPercentileCont, ArrayAgg, Avg, + BitMapDistinct, Correlation, Count, Covariance, DistinctArrayAgg, DistinctCount, + Max, Min, Stddev, Sum, Variance, }; use crate::{error::Result, scalar::ScalarValue}; + use arrow::datatypes::DataType::UInt64; #[test] fn test_count_arragg_approx_expr() -> Result<()> { @@ -500,6 +511,40 @@ mod tests { ); } } + #[test] + fn test_bitmap_count_distinct_expr() { + let data_types = vec![ + DataType::Int8, + DataType::Int16, + DataType::Int32, + DataType::UInt8, + DataType::UInt16, + DataType::UInt32, + ]; + + for data_type in data_types { + let input_schema = + Schema::new(vec![Field::new("c1", data_type.clone(), true)]); + let input_phy_exprs: Vec> = vec![Arc::new( + expressions::Column::new_with_schema("c1", &input_schema).unwrap(), + )]; + let result_agg_phy_exprs = create_aggregate_expr( + &AggregateFunction::BitMapCountDistinct, + false, + &input_phy_exprs[..], + &input_schema, + "c1", + ) + .expect("failed to create aggregate expr"); + + assert!(result_agg_phy_exprs.as_any().is::()); + assert_eq!("c1", result_agg_phy_exprs.name()); + assert_eq!( + Field::new("c1", UInt64, false), + result_agg_phy_exprs.field().unwrap() + ); + } + } #[test] fn test_agg_approx_percentile_invalid_phy_expr() { diff --git a/datafusion/tests/sql/aggregates.rs b/datafusion/tests/sql/aggregates.rs index 8c5640c04344b..f83679ad390f2 100644 --- a/datafusion/tests/sql/aggregates.rs +++ b/datafusion/tests/sql/aggregates.rs @@ -737,3 +737,75 @@ async fn aggregate_timestamps_avg() -> Result<()> { assert_eq!(results.to_string(), "Error during planning: The function Avg does not support inputs of type Timestamp(Nanosecond, None)."); Ok(()) } + +#[tokio::test] +async fn csv_query_bitmap_i8() -> Result<()> { + let mut ctx = ExecutionContext::new(); + register_aggregate_csv(&mut ctx).await?; + let sql = "SELECT bitmap_distinct(c2) FROM aggregate_test_100"; + let results = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+--------------------------------------------+", + "| BITMAPCOUNTDISTINCT(aggregate_test_100.c2) |", + "+--------------------------------------------+", + "| 5 |", + "+--------------------------------------------+", + ]; + assert_batches_sorted_eq!(expected, &results); + Ok(()) +} + +#[tokio::test] +async fn csv_query_bitmap_u8() -> Result<()> { + let mut ctx = ExecutionContext::new(); + register_aggregate_csv(&mut ctx).await?; + let sql = "SELECT bitmap_distinct(c3) FROM aggregate_test_100"; + let results = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+--------------------------------------------+", + "| BITMAPCOUNTDISTINCT(aggregate_test_100.c3) |", + "+--------------------------------------------+", + "| 80 |", + "+--------------------------------------------+", + ]; + assert_batches_sorted_eq!(expected, &results); + Ok(()) +} + +#[tokio::test] +async fn csv_query_bitmap_u32() -> Result<()> { + let mut ctx = ExecutionContext::new(); + register_aggregate_csv(&mut ctx).await?; + let sql = "SELECT bitmap_distinct(c5) FROM aggregate_test_100"; + let results = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+--------------------------------------------+", + "| BITMAPCOUNTDISTINCT(aggregate_test_100.c5) |", + "+--------------------------------------------+", + "| 100 |", + "+--------------------------------------------+", + ]; + assert_batches_sorted_eq!(expected, &results); + Ok(()) +} + +#[tokio::test] +async fn csv_query_bitmap_group() -> Result<()> { + let mut ctx = ExecutionContext::new(); + register_aggregate_csv(&mut ctx).await?; + let sql1 = "SELECT bitmap_distinct(c5) as a FROM aggregate_test_100 group by c1 order by a limit 5 "; + let result1 = execute_to_batches(&mut ctx, sql1).await; + + let sql2 = "SELECT count(distinct c5) as a FROM aggregate_test_100 group by c1 order by a limit 5 "; + let result2 = execute_to_batches(&mut ctx, sql2).await; + + assert_eq!( + arrow::util::pretty::pretty_format_batches(&result1) + .unwrap() + .to_string(), + arrow::util::pretty::pretty_format_batches(&result2) + .unwrap() + .to_string() + ); + Ok(()) +} From c2b3e94ce19ae48753e4723e6c36af61f955712c Mon Sep 17 00:00:00 2001 From: yangjiang Date: Thu, 17 Mar 2022 15:10:11 +0800 Subject: [PATCH 2/5] fix conflict2 --- .../src/coercion_rule/aggregate_rule.rs | 15 ++++++++++++--- datafusion/Cargo.toml | 1 + 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/datafusion-physical-expr/src/coercion_rule/aggregate_rule.rs b/datafusion-physical-expr/src/coercion_rule/aggregate_rule.rs index 279fe7d31b7cb..81b7a5a3d1552 100644 --- a/datafusion-physical-expr/src/coercion_rule/aggregate_rule.rs +++ b/datafusion-physical-expr/src/coercion_rule/aggregate_rule.rs @@ -19,9 +19,9 @@ use crate::expressions::{ is_approx_percentile_cont_supported_arg_type, is_avg_support_arg_type, - is_correlation_support_arg_type, is_covariance_support_arg_type, - is_stddev_support_arg_type, is_sum_support_arg_type, is_variance_support_arg_type, - try_cast, + is_bitmap_count_distinct_supported_arg_type, is_correlation_support_arg_type, + is_covariance_support_arg_type, is_stddev_support_arg_type, is_sum_support_arg_type, + is_variance_support_arg_type, try_cast, }; use crate::PhysicalExpr; use arrow::datatypes::DataType; @@ -161,6 +161,15 @@ pub fn coerce_types( } Ok(input_types.to_vec()) } + AggregateFunction::BitMapCountDistinct => { + if !is_bitmap_count_distinct_supported_arg_type(&input_types[0]) { + return Err(DataFusionError::Plan(format!( + "The function {:?} does not support inputs of type {:?}.", + agg_fun, input_types[0] + ))); + } + Ok(input_types.to_vec()) + } } } diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index 80842272d6135..e2523b2b93347 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -81,6 +81,7 @@ num-traits = { version = "0.2", optional = true } pyo3 = { version = "0.16", optional = true } tempfile = "3" parking_lot = "0.12" +uuid = { version = "0.8", features = ["v4"] } [dev-dependencies] criterion = "0.3" From 10b53027ef70666205d599539a9e0c0d0b717a14 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Thu, 17 Mar 2022 15:15:11 +0800 Subject: [PATCH 3/5] fix null problem; --- .../src/expressions/bitmap_distinct.rs | 42 +++++++++++++------ 1 file changed, 30 insertions(+), 12 deletions(-) diff --git a/datafusion-physical-expr/src/expressions/bitmap_distinct.rs b/datafusion-physical-expr/src/expressions/bitmap_distinct.rs index 440d75d133b3b..88401014495d9 100644 --- a/datafusion-physical-expr/src/expressions/bitmap_distinct.rs +++ b/datafusion-physical-expr/src/expressions/bitmap_distinct.rs @@ -134,38 +134,56 @@ impl Accumulator for BitmapDistinctCountAccumulator { match value.data_type() { DataType::Int8 => { let array = value.as_any().downcast_ref::().unwrap(); - for i in 0..array.len() { - self.bitmap.insert(array.value(i) as u32); + for value in array.iter() { + match value { + Some(v) => self.bitmap.insert(v as u32), + None => false, + }; } } DataType::Int16 => { let array = value.as_any().downcast_ref::().unwrap(); - for i in 0..array.len() { - self.bitmap.insert(array.value(i) as u32); + for value in array.iter() { + match value { + Some(v) => self.bitmap.insert(v as u32), + None => false, + }; } } DataType::Int32 => { let array = value.as_any().downcast_ref::().unwrap(); - for i in 0..array.len() { - self.bitmap.insert(array.value(i) as u32); + for value in array.iter() { + match value { + Some(v) => self.bitmap.insert(v as u32), + None => false, + }; } } DataType::UInt8 => { let array = value.as_any().downcast_ref::().unwrap(); - for i in 0..array.len() { - self.bitmap.insert(array.value(i) as u32); + for value in array.iter() { + match value { + Some(v) => self.bitmap.insert(v as u32), + None => false, + }; } } DataType::UInt16 => { let array = value.as_any().downcast_ref::().unwrap(); - for i in 0..array.len() { - self.bitmap.insert(array.value(i) as u32); + for value in array.iter() { + match value { + Some(v) => self.bitmap.insert(v as u32), + None => false, + }; } } DataType::UInt32 => { let array = value.as_any().downcast_ref::().unwrap(); - for i in 0..array.len() { - self.bitmap.insert(array.value(i)); + for value in array.iter() { + match value { + Some(v) => self.bitmap.insert(v as u32), + None => false, + }; } } e => { From 5a5fbd4b72b4a5793730f1cb8d44910a835fa878 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Thu, 17 Mar 2022 15:54:25 +0800 Subject: [PATCH 4/5] add optional dependency name roaring_bitmap --- datafusion-physical-expr/Cargo.toml | 5 +++-- .../src/coercion_rule/aggregate_rule.rs | 16 +++++++++++++--- datafusion-physical-expr/src/expressions/mod.rs | 2 ++ datafusion-proto/proto/datafusion.proto | 1 + datafusion-proto/src/from_proto.rs | 1 + datafusion-proto/src/to_proto.rs | 6 +++--- datafusion/src/physical_plan/aggregates.rs | 6 ++++++ 7 files changed, 29 insertions(+), 8 deletions(-) diff --git a/datafusion-physical-expr/Cargo.toml b/datafusion-physical-expr/Cargo.toml index 8cf6e3f1ffe36..0f81055670811 100644 --- a/datafusion-physical-expr/Cargo.toml +++ b/datafusion-physical-expr/Cargo.toml @@ -33,10 +33,11 @@ name = "datafusion_physical_expr" path = "src/lib.rs" [features] -default = ["crypto_expressions", "regex_expressions", "unicode_expressions"] +default = ["crypto_expressions", "regex_expressions", "unicode_expressions", "roaring_bitmap"] crypto_expressions = ["md-5", "sha2", "blake2", "blake3"] regex_expressions = ["regex"] unicode_expressions = ["unicode-segmentation"] +roaring_bitmap = ["roaring"] [dependencies] datafusion-common = { path = "../datafusion-common", version = "7.0.0" } @@ -55,4 +56,4 @@ hashbrown = { version = "0.12", features = ["raw"] } chrono = { version = "0.4", default-features = false } regex = { version = "^1.4.3", optional = true } unicode-segmentation = { version = "^1.7.1", optional = true } -roaring = "0.8.1" +roaring = {version = "0.9.0", optional = true } diff --git a/datafusion-physical-expr/src/coercion_rule/aggregate_rule.rs b/datafusion-physical-expr/src/coercion_rule/aggregate_rule.rs index 81b7a5a3d1552..c1c904fabffeb 100644 --- a/datafusion-physical-expr/src/coercion_rule/aggregate_rule.rs +++ b/datafusion-physical-expr/src/coercion_rule/aggregate_rule.rs @@ -17,11 +17,13 @@ //! Support the coercion rule for aggregate function. +#[cfg(feature = "roaring_bitmap")] +use crate::expressions::is_bitmap_count_distinct_supported_arg_type; use crate::expressions::{ is_approx_percentile_cont_supported_arg_type, is_avg_support_arg_type, - is_bitmap_count_distinct_supported_arg_type, is_correlation_support_arg_type, - is_covariance_support_arg_type, is_stddev_support_arg_type, is_sum_support_arg_type, - is_variance_support_arg_type, try_cast, + is_correlation_support_arg_type, is_covariance_support_arg_type, + is_stddev_support_arg_type, is_sum_support_arg_type, is_variance_support_arg_type, + try_cast, }; use crate::PhysicalExpr; use arrow::datatypes::DataType; @@ -161,6 +163,7 @@ pub fn coerce_types( } Ok(input_types.to_vec()) } + #[cfg(feature = "roaring_bitmap")] AggregateFunction::BitMapCountDistinct => { if !is_bitmap_count_distinct_supported_arg_type(&input_types[0]) { return Err(DataFusionError::Plan(format!( @@ -170,6 +173,13 @@ pub fn coerce_types( } Ok(input_types.to_vec()) } + #[cfg(not(feature = "roaring_bitmap"))] + AggregateFunction::BitMapCountDistinct => { + return Err(DataFusionError::Plan(format!( + "The function {:?} does not support inputs of type {:?}.", + agg_fun, input_types[0] + ))); + } } } diff --git a/datafusion-physical-expr/src/expressions/mod.rs b/datafusion-physical-expr/src/expressions/mod.rs index 92c13bc024ee6..4e83304401411 100644 --- a/datafusion-physical-expr/src/expressions/mod.rs +++ b/datafusion-physical-expr/src/expressions/mod.rs @@ -21,6 +21,7 @@ mod approx_distinct; mod approx_percentile_cont; mod array_agg; mod average; +#[cfg(feature = "roaring_bitmap")] mod bitmap_distinct; #[macro_use] mod binary; @@ -67,6 +68,7 @@ pub use array_agg::ArrayAgg; pub use average::is_avg_support_arg_type; pub use average::{avg_return_type, Avg, AvgAccumulator}; pub use binary::{binary, binary_operator_data_type, BinaryExpr}; +#[cfg(feature = "roaring_bitmap")] pub use bitmap_distinct::{is_bitmap_count_distinct_supported_arg_type, BitMapDistinct}; pub use case::{case, CaseExpr}; pub use cast::{ diff --git a/datafusion-proto/proto/datafusion.proto b/datafusion-proto/proto/datafusion.proto index a0c5c2f1f996a..ade68b1ca4814 100644 --- a/datafusion-proto/proto/datafusion.proto +++ b/datafusion-proto/proto/datafusion.proto @@ -201,6 +201,7 @@ enum AggregateFunction { CORRELATION=13; APPROX_PERCENTILE_CONT = 14; APPROX_MEDIAN=15; + BITMAP_DISTINCT=16; } message AggregateExprNode { diff --git a/datafusion-proto/src/from_proto.rs b/datafusion-proto/src/from_proto.rs index e7a5def7b871b..a984b3f5a11d1 100644 --- a/datafusion-proto/src/from_proto.rs +++ b/datafusion-proto/src/from_proto.rs @@ -452,6 +452,7 @@ impl From for AggregateFunction { Self::ApproxPercentileCont } protobuf::AggregateFunction::ApproxMedian => Self::ApproxMedian, + protobuf::AggregateFunction::BitmapDistinct => Self::BitMapCountDistinct, } } } diff --git a/datafusion-proto/src/to_proto.rs b/datafusion-proto/src/to_proto.rs index a110201b1ec12..340f371efd39e 100644 --- a/datafusion-proto/src/to_proto.rs +++ b/datafusion-proto/src/to_proto.rs @@ -312,6 +312,7 @@ impl From<&AggregateFunction> for protobuf::AggregateFunction { AggregateFunction::Correlation => Self::Correlation, AggregateFunction::ApproxPercentileCont => Self::ApproxPercentileCont, AggregateFunction::ApproxMedian => Self::ApproxMedian, + AggregateFunction::BitMapCountDistinct => Self::BitmapDistinct, } } } @@ -775,9 +776,8 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue { scalar, datatype, )) } - AggregateFunction::BitMapCountDistinct => Self::BitmapDistinct, - } - ( + } + ( scalar::ScalarValue::Boolean(_), DataType::Boolean, ) => scalar.try_into(), diff --git a/datafusion/src/physical_plan/aggregates.rs b/datafusion/src/physical_plan/aggregates.rs index 84041438ae7ac..e1ddc03dae1a7 100644 --- a/datafusion/src/physical_plan/aggregates.rs +++ b/datafusion/src/physical_plan/aggregates.rs @@ -280,6 +280,7 @@ pub fn create_aggregate_expr( "MEDIAN(DISTINCT) aggregations are not available".to_string(), )); } + #[cfg(feature = "roaring_bitmap")] (AggregateFunction::BitMapCountDistinct, _) => { Arc::new(expressions::BitMapDistinct::new( coerced_phy_exprs[0].clone(), @@ -287,6 +288,11 @@ pub fn create_aggregate_expr( coerced_exprs_types[0].clone(), )) } + (AggregateFunction::BitMapCountDistinct, _) => { + return Err(DataFusionError::NotImplemented( + "BitMapCountDistinct aggregations are not available".to_string(), + )); + } }) } From 7794552aedbc2341eb8ece0e680e72c298169087 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Thu, 17 Mar 2022 18:13:59 +0800 Subject: [PATCH 5/5] add fix optional dependency --- datafusion/Cargo.toml | 3 ++- datafusion/src/physical_plan/aggregates.rs | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index e2523b2b93347..4335ff62930b8 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -38,11 +38,12 @@ name = "datafusion" path = "src/lib.rs" [features] -default = ["crypto_expressions", "regex_expressions", "unicode_expressions"] +default = ["crypto_expressions", "regex_expressions", "unicode_expressions", "roaring_bitmap"] simd = ["arrow/simd"] crypto_expressions = [ "datafusion-physical-expr/crypto_expressions" ] unicode_expressions = ["datafusion-physical-expr/regex_expressions"] regex_expressions = ["datafusion-physical-expr/regex_expressions"] +roaring_bitmap = ["datafusion-physical-expr/roaring_bitmap"] pyarrow = ["pyo3", "arrow/pyarrow", "datafusion-common/pyarrow"] # Used for testing ONLY: causes all values to hash to the same value (test for collisions) force_hash_collisions = [] diff --git a/datafusion/src/physical_plan/aggregates.rs b/datafusion/src/physical_plan/aggregates.rs index e1ddc03dae1a7..9b9aa9861c154 100644 --- a/datafusion/src/physical_plan/aggregates.rs +++ b/datafusion/src/physical_plan/aggregates.rs @@ -288,6 +288,7 @@ pub fn create_aggregate_expr( coerced_exprs_types[0].clone(), )) } + #[cfg(not(feature = "roaring_bitmap"))] (AggregateFunction::BitMapCountDistinct, _) => { return Err(DataFusionError::NotImplemented( "BitMapCountDistinct aggregations are not available".to_string(),