Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions datafusion-expr/src/aggregate_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ pub enum AggregateFunction {
ApproxPercentileCont,
/// ApproxMedian
ApproxMedian,
/// BitMap count distinct function
BitMapCountDistinct,
}

impl fmt::Display for AggregateFunction {
Expand Down Expand Up @@ -87,6 +89,7 @@ impl FromStr for AggregateFunction {
"corr" => AggregateFunction::Correlation,
"approx_percentile_cont" => AggregateFunction::ApproxPercentileCont,
"approx_median" => AggregateFunction::ApproxMedian,
"bitmap_distinct" => AggregateFunction::BitMapCountDistinct,
_ => {
return Err(DataFusionError::Plan(format!(
"There is no built-in function named {}",
Expand Down
9 changes: 9 additions & 0 deletions datafusion-expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,15 @@ pub fn approx_percentile_cont(expr: Expr, percentile: Expr) -> Expr {
}
}

/// Returns the precise number of distinct input values using bitmap.
pub fn bitmap_count_distinct(expr: Expr) -> Expr {
Expr::AggregateFunction {
fun: aggregate_function::AggregateFunction::BitMapCountDistinct,
distinct: false,
args: vec![expr],
}
}

// TODO(kszucs): this seems buggy, unary_scalar_expr! is used for many
// varying arity functions
/// Create an convenience function representing a unary scalar function
Expand Down
4 changes: 3 additions & 1 deletion datafusion-physical-expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ name = "datafusion_physical_expr"
path = "src/lib.rs"

[features]
default = ["crypto_expressions", "regex_expressions", "unicode_expressions"]
default = ["crypto_expressions", "regex_expressions", "unicode_expressions", "roaring_bitmap"]
crypto_expressions = ["md-5", "sha2", "blake2", "blake3"]
regex_expressions = ["regex"]
unicode_expressions = ["unicode-segmentation"]
roaring_bitmap = ["roaring"]

[dependencies]
datafusion-common = { path = "../datafusion-common", version = "7.0.0" }
Expand All @@ -55,3 +56,4 @@ hashbrown = { version = "0.12", features = ["raw"] }
chrono = { version = "0.4", default-features = false }
regex = { version = "^1.4.3", optional = true }
unicode-segmentation = { version = "^1.7.1", optional = true }
roaring = {version = "0.9.0", optional = true }
19 changes: 19 additions & 0 deletions datafusion-physical-expr/src/coercion_rule/aggregate_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

//! Support the coercion rule for aggregate function.

#[cfg(feature = "roaring_bitmap")]
use crate::expressions::is_bitmap_count_distinct_supported_arg_type;
use crate::expressions::{
is_approx_percentile_cont_supported_arg_type, is_avg_support_arg_type,
is_correlation_support_arg_type, is_covariance_support_arg_type,
Expand Down Expand Up @@ -161,6 +163,23 @@ pub fn coerce_types(
}
Ok(input_types.to_vec())
}
#[cfg(feature = "roaring_bitmap")]
AggregateFunction::BitMapCountDistinct => {
if !is_bitmap_count_distinct_supported_arg_type(&input_types[0]) {
return Err(DataFusionError::Plan(format!(
"The function {:?} does not support inputs of type {:?}.",
agg_fun, input_types[0]
)));
}
Ok(input_types.to_vec())
}
#[cfg(not(feature = "roaring_bitmap"))]
AggregateFunction::BitMapCountDistinct => {
return Err(DataFusionError::Plan(format!(
"The function {:?} does not support inputs of type {:?}.",
agg_fun, input_types[0]
)));
}
}
}

Expand Down
229 changes: 229 additions & 0 deletions datafusion-physical-expr/src/expressions/bitmap_distinct.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
// Licensed to the Apache Software Foundation (ASF) under one
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[brainstorming] how about we combine this to ApproxDistinct and use BitmapDistinctCountAccumulator for int8, int16 and int32 if the feature is avilable?

And use NumericHLLAccumulator for int64 and other non-int types. This way, user just need declare approx_distinct and rely on Datafusion to auto select the best approximate algorithm

https://github.com/apache/arrow-datafusion/blob/81592947e8814327ebdbd1fbc3d4a090796e37a3/datafusion-physical-expr/src/expressions/approx_distinct.rs#L91-L98


unrelate notes: as a user, I do want to keep count(distinct) as exact count and approx_distinct as approximation

// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Defines physical expressions that can evaluated at runtime during query execution

use std::any::Any;

use std::fmt::Debug;
use std::ops::BitOrAssign;
use std::sync::Arc;

use arrow::array::{
Array, ArrayRef, BinaryArray, Int16Array, Int32Array, Int8Array, UInt16Array,
UInt32Array, UInt8Array,
};
use arrow::datatypes::{DataType, Field};
use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_expr::Accumulator;
use roaring::RoaringBitmap;

use crate::{AggregateExpr, PhysicalExpr};

use super::format_state_name;

/// BITMAP_DISTINCT aggregate expression
#[derive(Debug)]
pub struct BitMapDistinct {
name: String,
input_data_type: DataType,
expr: Arc<dyn PhysicalExpr>,
}

impl BitMapDistinct {
/// Create a new BitmapDistinct aggregate function.
pub fn new(
expr: Arc<dyn PhysicalExpr>,
name: impl Into<String>,
input_data_type: DataType,
) -> Self {
Self {
name: name.into(),
input_data_type,
expr,
}
}
}

impl AggregateExpr for BitMapDistinct {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
self
}

/// the field of the final result of this aggregation.
fn field(&self) -> Result<Field> {
Ok(Field::new(&self.name, DataType::UInt64, false))
}

fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
let accumulator: Box<dyn Accumulator> = match &self.input_data_type {
DataType::UInt8
| DataType::UInt16
| DataType::UInt32
| DataType::Int8
| DataType::Int16
| DataType::Int32 => Box::new(BitmapDistinctCountAccumulator::try_new()),
other => {
return Err(DataFusionError::NotImplemented(format!(
"Support for 'bitmap_distinct' for data type {} is not implemented",
other
)))
}
};
Ok(accumulator)
}

fn state_fields(&self) -> Result<Vec<Field>> {
Ok(vec![Field::new(
&format_state_name(&self.name, "bitmap_registers"),
DataType::Binary,
false,
)])
}

fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.expr.clone()]
}

fn name(&self) -> &str {
&self.name
}
}

#[derive(Debug)]
struct BitmapDistinctCountAccumulator {
bitmap: roaring::bitmap::RoaringBitmap,
}

impl BitmapDistinctCountAccumulator {
fn try_new() -> Self {
Self {
bitmap: RoaringBitmap::new(),
}
}
}

impl Accumulator for BitmapDistinctCountAccumulator {
//state() can be used by physical nodes to aggregate states together and send them over the network/threads, to combine values.
fn state(&self) -> Result<Vec<ScalarValue>> {
let mut bytes = vec![];
self.bitmap.serialize_into(&mut bytes).unwrap();
Ok(vec![ScalarValue::Binary(Some(bytes))])
}

fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
let value = &values[0];
if value.is_empty() {
return Ok(());
}
match value.data_type() {
DataType::Int8 => {
let array = value.as_any().downcast_ref::<Int8Array>().unwrap();
for value in array.iter() {
match value {
Some(v) => self.bitmap.insert(v as u32),
None => false,
};
}
}
DataType::Int16 => {
let array = value.as_any().downcast_ref::<Int16Array>().unwrap();
for value in array.iter() {
match value {
Some(v) => self.bitmap.insert(v as u32),
None => false,
};
}
}
DataType::Int32 => {
let array = value.as_any().downcast_ref::<Int32Array>().unwrap();
for value in array.iter() {
match value {
Some(v) => self.bitmap.insert(v as u32),
None => false,
};
}
}
DataType::UInt8 => {
let array = value.as_any().downcast_ref::<UInt8Array>().unwrap();
for value in array.iter() {
match value {
Some(v) => self.bitmap.insert(v as u32),
None => false,
};
}
}
DataType::UInt16 => {
let array = value.as_any().downcast_ref::<UInt16Array>().unwrap();
for value in array.iter() {
match value {
Some(v) => self.bitmap.insert(v as u32),
None => false,
};
}
}
DataType::UInt32 => {
let array = value.as_any().downcast_ref::<UInt32Array>().unwrap();
for value in array.iter() {
match value {
Some(v) => self.bitmap.insert(v as u32),
None => false,
};
}
}
e => {
return Err(DataFusionError::Internal(format!(
"BITMAP_COUNT_DISTINCT is not expected to receive the type {:?}",
e
)));
}
}
Ok(())
}

fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
let binary_array = states[0].as_any().downcast_ref::<BinaryArray>().unwrap();

for b in binary_array.iter() {
let v = b.ok_or_else(|| {
DataFusionError::Internal(
"Impossibly got empty binary array from states".into(),
)
})?;
let bitmap = RoaringBitmap::deserialize_from(&v.to_vec()[..]).unwrap();
self.bitmap.bitor_assign(bitmap);
}
Ok(())
}

fn evaluate(&self) -> Result<ScalarValue> {
Ok(ScalarValue::from(self.bitmap.len()))
}
}

pub fn is_bitmap_count_distinct_supported_arg_type(arg_type: &DataType) -> bool {
matches!(
arg_type,
DataType::UInt8
| DataType::UInt16
| DataType::UInt32
| DataType::Int8
| DataType::Int16
| DataType::Int32
)
}
4 changes: 4 additions & 0 deletions datafusion-physical-expr/src/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ mod approx_distinct;
mod approx_percentile_cont;
mod array_agg;
mod average;
#[cfg(feature = "roaring_bitmap")]
mod bitmap_distinct;
#[macro_use]
mod binary;
mod case;
Expand Down Expand Up @@ -66,6 +68,8 @@ pub use array_agg::ArrayAgg;
pub use average::is_avg_support_arg_type;
pub use average::{avg_return_type, Avg, AvgAccumulator};
pub use binary::{binary, binary_operator_data_type, BinaryExpr};
#[cfg(feature = "roaring_bitmap")]
pub use bitmap_distinct::{is_bitmap_count_distinct_supported_arg_type, BitMapDistinct};
pub use case::{case, CaseExpr};
pub use cast::{
cast, cast_column, cast_with_options, CastExpr, DEFAULT_DATAFUSION_CAST_OPTIONS,
Expand Down
1 change: 1 addition & 0 deletions datafusion-proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ enum AggregateFunction {
CORRELATION=13;
APPROX_PERCENTILE_CONT = 14;
APPROX_MEDIAN=15;
BITMAP_DISTINCT=16;
}

message AggregateExprNode {
Expand Down
1 change: 1 addition & 0 deletions datafusion-proto/src/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,7 @@ impl From<protobuf::AggregateFunction> for AggregateFunction {
Self::ApproxPercentileCont
}
protobuf::AggregateFunction::ApproxMedian => Self::ApproxMedian,
protobuf::AggregateFunction::BitmapDistinct => Self::BitMapCountDistinct,
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions datafusion-proto/src/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ impl From<&AggregateFunction> for protobuf::AggregateFunction {
AggregateFunction::Correlation => Self::Correlation,
AggregateFunction::ApproxPercentileCont => Self::ApproxPercentileCont,
AggregateFunction::ApproxMedian => Self::ApproxMedian,
AggregateFunction::BitMapCountDistinct => Self::BitmapDistinct,
}
}
}
Expand Down Expand Up @@ -494,6 +495,9 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
AggregateFunction::ApproxMedian => {
protobuf::AggregateFunction::ApproxMedian
}
AggregateFunction::BitMapCountDistinct => {
protobuf::AggregateFunction::BitmapDistinct
}
};

let aggregate_expr = protobuf::AggregateExprNode {
Expand Down
3 changes: 2 additions & 1 deletion datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@ name = "datafusion"
path = "src/lib.rs"

[features]
default = ["crypto_expressions", "regex_expressions", "unicode_expressions"]
default = ["crypto_expressions", "regex_expressions", "unicode_expressions", "roaring_bitmap"]
simd = ["arrow/simd"]
crypto_expressions = [ "datafusion-physical-expr/crypto_expressions" ]
unicode_expressions = ["datafusion-physical-expr/regex_expressions"]
regex_expressions = ["datafusion-physical-expr/regex_expressions"]
roaring_bitmap = ["datafusion-physical-expr/roaring_bitmap"]
pyarrow = ["pyo3", "arrow/pyarrow", "datafusion-common/pyarrow"]
# Used for testing ONLY: causes all values to hash to the same value (test for collisions)
force_hash_collisions = []
Expand Down
Loading