1 change: 1 addition & 0 deletions datafusion/functions-aggregate/Cargo.toml
@@ -49,6 +49,7 @@
datafusion-macros = { workspace = true }
datafusion-physical-expr = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
half = { workspace = true }
hashbrown = { workspace = true }
log = { workspace = true }
paste = "1.0.14"

202 changes: 197 additions & 5 deletions datafusion/functions-aggregate/src/count.rs
@@ -16,6 +16,8 @@
// under the License.

use ahash::RandomState;
use arrow::array::ListArray;
use datafusion_common::hash_utils::combine_hashes;
use datafusion_common::stats::Precision;
use datafusion_expr::expr::WindowFunction;
use datafusion_functions_aggregate_common::aggregate::count_distinct::BytesViewDistinctCountAccumulator;
@@ -346,18 +348,22 @@ impl AggregateUDFImpl for Count {
fn groups_accumulator_supported(&self, args: AccumulatorArgs) -> bool {
        // The groups accumulator supports `COUNT(c1)` and
        // `COUNT(DISTINCT c1)`, but not multiple arguments such as
        // `COUNT(c1, c2)`
args.exprs.len() == 1
}

fn create_groups_accumulator(
&self,
_args: AccumulatorArgs,
args: AccumulatorArgs,
) -> Result<Box<dyn GroupsAccumulator>> {
        // instantiate a specialized accumulator, depending on distinctness
if args.is_distinct {
Ok(Box::new(DistinctCountGroupsAccumulator::new()))
} else {
Ok(Box::new(CountGroupsAccumulator::new()))
}
}

fn reverse_expr(&self) -> ReversedUDAF {
@@ -623,6 +629,192 @@ impl GroupsAccumulator for CountGroupsAccumulator {
}
}

/// An accumulator computing `COUNT(DISTINCT column)` per group for
/// primitive values, deduplicating `(group index, value)` pairs in a
/// hash table.
///
/// Unlike most other accumulators, COUNT never produces NULLs. If no
/// non-null values are seen in any group the output is 0. Thus, this
/// accumulator has no additional null or seen filter tracking.
#[derive(Debug)]
struct DistinctCountGroupsAccumulator {
    /// Distinct values seen so far for each group, in arrival order.
    /// Used on the partial aggregation side: `state` ships these values
    /// downstream as one `ListArray` row per group.
    counts: Vec<Option<Vec<Option<i64>>>>,
    /// Distinct count per group, maintained by `merge_batch` on the
    /// final aggregation side.
    ///
    /// Note this is an i64 and not a u64 (or usize) because the
    /// output type of count is `DataType::Int64`. Thus by using `i64`
    /// for the counts, the output [`Int64Array`] can be created
    /// without copy.
    final_count: Vec<i64>,

    /// Hash table over `(group index, value)` pairs; each entry is an
    /// index into the parallel `values` / `group_indices` vectors.
    map: hashbrown::HashTable<usize>,
    values: Vec<i64>,
    group_indices: Vec<usize>,
    random_state: RandomState,
}

impl DistinctCountGroupsAccumulator {
pub fn new() -> Self {
Self {
counts: vec![],
final_count: vec![],
random_state: Default::default(),
map: hashbrown::HashTable::with_capacity(128),
values: Vec::with_capacity(128),
group_indices: Vec::with_capacity(128),
}
}
}
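
// To illustrate the idea behind the `map` / `values` / `group_indices`
// trio: conceptually the accumulator maintains a set of
// `(group index, value)` pairs and counts each pair at most once. A
// minimal sketch of that model using a plain `HashSet` (this helper is
// illustrative only and is not used by the accumulator, which instead
// keeps indices into the parallel `Vec`s so each pair is stored once):
#[allow(dead_code)]
fn distinct_counts_sketch(
    values: &[i64],
    group_indices: &[usize],
    num_groups: usize,
) -> Vec<i64> {
    use std::collections::HashSet;

    let mut seen: HashSet<(usize, i64)> = HashSet::new();
    let mut counts = vec![0i64; num_groups];
    for (&v, &g) in values.iter().zip(group_indices) {
        // Only the first occurrence of a (group, value) pair increments
        // that group's distinct count
        if seen.insert((g, v)) {
            counts[g] += 1;
        }
    }
    counts
}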

impl GroupsAccumulator for DistinctCountGroupsAccumulator {
fn update_batch(
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
assert_eq!(values.len(), 1, "single argument to update_batch");
let values = &values[0];

        // Record each non-null, non-filtered value at most once per group
        self.counts.resize(total_num_groups, None);

        // NB: this prototype only handles `Int64` input columns
        let arr = values.as_primitive::<Int64Type>();
        for (i, v) in arr.iter().enumerate() {
            // Skip rows excluded (or nulled out) by the filter
            if let Some(filter) = opt_filter {
                if !filter.is_valid(i) || !filter.value(i) {
                    continue;
                }
            }
            if let Some(key) = v {
                let group_index = group_indices[i];
let state = &self.random_state;
let hash = state.hash_one(key);
let hash = combine_hashes(hash, state.hash_one(group_index));

let insert = self.map.entry(
hash,
|g| unsafe {
self.group_indices.get_unchecked(*g) == &group_index
&& self.values.get_unchecked(*g) == &key
},
|g| unsafe {
let v = self.values.get_unchecked(*g);
let g = self.group_indices.get_unchecked(*g);
combine_hashes(state.hash_one(v), state.hash_one(g))
},
);

match insert {
                    hashbrown::hash_table::Entry::Occupied(_) => {}
hashbrown::hash_table::Entry::Vacant(v) => {
let g = self.values.len();
v.insert(g);
self.values.push(key);
self.group_indices.push(group_index);

                        // First sighting of this (group, value) pair:
                        // record the value in the group's distinct list
                        if let Some(existing_keys) = &mut self.counts[group_index] {
                            existing_keys.push(Some(key));
                        } else {
                            self.counts[group_index] = Some(vec![Some(key)]);
                        }
}
}
}
}

Ok(())
}

fn merge_batch(
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
        // filter is not applied when merging intermediate aggregate states
        _opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
debug_assert_eq!(values.len(), 1);

self.final_count.resize(total_num_groups, 0);

let list_arr = values[0].as_list::<i32>();
for (i, counts) in list_arr.iter().enumerate() {
let group_index = group_indices[i];
            if let Some(counts) = counts {
                let counts_in_row = counts.as_primitive::<Int64Type>();
                for key in counts_in_row.iter().flatten() {
let state = &self.random_state;
let hash = state.hash_one(key);
let hash = combine_hashes(hash, state.hash_one(group_index));

let insert = self.map.entry(
hash,
|g| unsafe {
self.group_indices.get_unchecked(*g) == &group_index
&& self.values.get_unchecked(*g) == &key
},
|g| unsafe {
let v = self.values.get_unchecked(*g);
let g = self.group_indices.get_unchecked(*g);
combine_hashes(state.hash_one(v), state.hash_one(g))
},
);

match insert {
                        hashbrown::hash_table::Entry::Occupied(_) => {}
hashbrown::hash_table::Entry::Vacant(v) => {
let g = self.values.len();
v.insert(g);
self.values.push(key);
self.group_indices.push(group_index);

self.final_count[group_index] += 1;
}
                    }
                }
}
}

Ok(())
}

    fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
        // Note: this assumes multi-phase aggregation; `final_count` is
        // only populated by `merge_batch`
        let counts = emit_to.take_needed(&mut self.final_count);

        // COUNT never produces nulls, so no null buffer is needed
        let array = PrimitiveArray::<Int64Type>::new(counts.into(), None);

        Ok(Arc::new(array))
    }

fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
let counts = emit_to.take_needed(&mut self.counts);
let list_array = ListArray::from_iter_primitive::<Int64Type, _, _>(counts);
Ok(vec![Arc::new(list_array)])
}
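
    // For illustration (assumed values): if group 0 has seen the distinct
    // values {1, 5}, group 1 has seen {7}, and group 2 has seen only
    // nulls, the state emitted above is a single `ListArray` column:
    //
    //     [ [1, 5], [7], null ]
    //
    // `merge_batch` consumes exactly this layout, re-inserting each value
    // into `map` so that duplicates arriving from different partitions
    // are not double counted.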

    fn size(&self) -> usize {
        // NB: does not account for the heap allocations of the per-group
        // `Vec`s inside `counts`
        self.counts.capacity() * size_of::<Option<Vec<Option<i64>>>>()
            + self.final_count.capacity() * size_of::<i64>()
            + self.map.capacity() * size_of::<usize>()
            + self.values.capacity() * size_of::<i64>()
            + self.group_indices.capacity() * size_of::<usize>()
    }
}
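
// End-to-end sketch of the intended two-phase flow (illustrative only;
// the engine drives this through `AggregateExec`). The partial side uses
// `update_batch` + `state`, the final side `merge_batch` + `evaluate`:
//
//     let mut partial = DistinctCountGroupsAccumulator::new();
//     let input: ArrayRef = Arc::new(Int64Array::from(vec![1, 1, 5, 7]));
//     partial.update_batch(&[input], &[0, 0, 0, 1], None, 2)?;
//     let state = partial.state(EmitTo::All)?; // [[1, 5], [7]]
//
//     let mut final_acc = DistinctCountGroupsAccumulator::new();
//     final_acc.merge_batch(&state, &[0, 1], None, 2)?;
//     let counts = final_acc.evaluate(EmitTo::All)?; // [2, 1]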

/// Count null values over multiple columns: for each row, if any column's
/// value is null, the null count is incremented by one
fn null_count_for_multiple_cols(values: &[ArrayRef]) -> usize {
49 changes: 25 additions & 24 deletions datafusion/optimizer/src/single_distinct_to_groupby.rs
@@ -66,6 +66,8 @@ impl SingleDistinctToGroupBy {
fn is_single_distinct_agg(aggr_expr: &[Expr]) -> Result<bool> {
let mut fields_set = HashSet::new();
let mut aggregate_count = 0;
let mut distinct_count = 0;
let mut distinct_func: Option<&str> = None;
for expr in aggr_expr {
if let Expr::AggregateFunction(AggregateFunction {
func,
@@ -84,9 +86,11 @@ fn is_single_distinct_agg(aggr_expr: &[Expr]) -> Result<bool> {
}
aggregate_count += 1;
if *distinct {
distinct_count += 1;
for e in args {
fields_set.insert(e);
}
distinct_func = Some(func.name());
} else if func.name() != "sum"
&& func.name().to_lowercase() != "min"
&& func.name().to_lowercase() != "max"
@@ -97,6 +101,15 @@ fn is_single_distinct_agg(aggr_expr: &[Expr]) -> Result<bool> {
return Ok(false);
}
}

    // A single `count(DISTINCT ...)` no longer needs this rewrite: it is
    // executed natively by the grouped distinct-count accumulator
    if distinct_count == 1 && fields_set.len() == 1 {
        if let Some(distinct_func) = distinct_func {
            if distinct_func == "count" {
                return Ok(false);
            }
        }
    }

Ok(aggregate_count == aggr_expr.len() && fields_set.len() == 1)
}
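
// With this change, a query such as `SELECT count(DISTINCT b) FROM test`
// keeps `count(DISTINCT test.b)` in the plan rather than being rewritten
// into two stacked aggregates over an `alias1` projection; execution then
// dispatches to the grouped distinct-count accumulator (see the updated
// test expectations below).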

@@ -334,10 +347,8 @@ mod tests {
.build()?;

// Should work
let expected = "Projection: count(alias1) AS count(DISTINCT test.b) [count(DISTINCT test.b):Int64]\
\n Aggregate: groupBy=[[]], aggr=[[count(alias1)]] [count(alias1):Int64]\
\n Aggregate: groupBy=[[test.b AS alias1]], aggr=[[]] [alias1:UInt32]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
let expected = "Aggregate: groupBy=[[]], aggr=[[count(DISTINCT test.b)]] [count(DISTINCT test.b):Int64]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";

assert_optimized_plan_equal(plan, expected)
}
@@ -408,10 +419,8 @@ mod tests {
.aggregate(Vec::<Expr>::new(), vec![count_distinct(lit(2) * col("b"))])?
.build()?;

let expected = "Projection: count(alias1) AS count(DISTINCT Int32(2) * test.b) [count(DISTINCT Int32(2) * test.b):Int64]\
\n Aggregate: groupBy=[[]], aggr=[[count(alias1)]] [count(alias1):Int64]\
\n Aggregate: groupBy=[[Int32(2) * test.b AS alias1]], aggr=[[]] [alias1:Int64]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
let expected = "Aggregate: groupBy=[[]], aggr=[[count(DISTINCT Int32(2) * test.b)]] [count(DISTINCT Int32(2) * test.b):Int64]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";

assert_optimized_plan_equal(plan, expected)
}
@@ -425,10 +434,8 @@ mod tests {
.build()?;

// Should work
let expected = "Projection: test.a, count(alias1) AS count(DISTINCT test.b) [a:UInt32, count(DISTINCT test.b):Int64]\
\n Aggregate: groupBy=[[test.a]], aggr=[[count(alias1)]] [a:UInt32, count(alias1):Int64]\
\n Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[]] [a:UInt32, alias1:UInt32]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
let expected = "Aggregate: groupBy=[[test.a]], aggr=[[count(DISTINCT test.b)]] [a:UInt32, count(DISTINCT test.b):Int64]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";

assert_optimized_plan_equal(plan, expected)
}
@@ -497,10 +504,8 @@ mod tests {
.build()?;

// Should work
let expected = "Projection: group_alias_0 AS test.a + Int32(1), count(alias1) AS count(DISTINCT test.c) [test.a + Int32(1):Int64, count(DISTINCT test.c):Int64]\
\n Aggregate: groupBy=[[group_alias_0]], aggr=[[count(alias1)]] [group_alias_0:Int64, count(alias1):Int64]\
\n Aggregate: groupBy=[[test.a + Int32(1) AS group_alias_0, test.c AS alias1]], aggr=[[]] [group_alias_0:Int64, alias1:UInt32]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
let expected = "Aggregate: groupBy=[[test.a + Int32(1)]], aggr=[[count(DISTINCT test.c)]] [test.a + Int32(1):Int64, count(DISTINCT test.c):Int64]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";

assert_optimized_plan_equal(plan, expected)
}
@@ -539,10 +544,8 @@ mod tests {
)?
.build()?;
// Should work
let expected = "Projection: test.a, sum(alias2) AS sum(test.c), max(alias3) AS max(test.c), count(alias1) AS count(DISTINCT test.b) [a:UInt32, sum(test.c):UInt64;N, max(test.c):UInt32;N, count(DISTINCT test.b):Int64]\
\n Aggregate: groupBy=[[test.a]], aggr=[[sum(alias2), max(alias3), count(alias1)]] [a:UInt32, sum(alias2):UInt64;N, max(alias3):UInt32;N, count(alias1):Int64]\
\n Aggregate: groupBy=[[test.a, test.b AS alias1]], aggr=[[sum(test.c) AS alias2, max(test.c) AS alias3]] [a:UInt32, alias1:UInt32, alias2:UInt64;N, alias3:UInt32;N]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
let expected = "Aggregate: groupBy=[[test.a]], aggr=[[sum(test.c), max(test.c), count(DISTINCT test.b)]] [a:UInt32, sum(test.c):UInt64;N, max(test.c):UInt32;N, count(DISTINCT test.b):Int64]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";

assert_optimized_plan_equal(plan, expected)
}
@@ -558,10 +561,8 @@ mod tests {
)?
.build()?;
// Should work
let expected = "Projection: test.c, min(alias2) AS min(test.a), count(alias1) AS count(DISTINCT test.b) [c:UInt32, min(test.a):UInt32;N, count(DISTINCT test.b):Int64]\
\n Aggregate: groupBy=[[test.c]], aggr=[[min(alias2), count(alias1)]] [c:UInt32, min(alias2):UInt32;N, count(alias1):Int64]\
\n Aggregate: groupBy=[[test.c, test.b AS alias1]], aggr=[[min(test.a) AS alias2]] [c:UInt32, alias1:UInt32, alias2:UInt32;N]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
let expected = "Aggregate: groupBy=[[test.c]], aggr=[[min(test.a), count(DISTINCT test.b)]] [c:UInt32, min(test.a):UInt32;N, count(DISTINCT test.b):Int64]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";

assert_optimized_plan_equal(plan, expected)
}
33 changes: 13 additions & 20 deletions datafusion/sqllogictest/test_files/joins.slt
@@ -1413,27 +1413,20 @@ from join_t1
inner join join_t2 on join_t1.t1_id = join_t2.t2_id
----
logical_plan
01)Projection: count(alias1) AS count(DISTINCT join_t1.t1_id)
02)--Aggregate: groupBy=[[]], aggr=[[count(alias1)]]
03)----Aggregate: groupBy=[[join_t1.t1_id AS alias1]], aggr=[[]]
04)------Projection: join_t1.t1_id
05)--------Inner Join: join_t1.t1_id = join_t2.t2_id
06)----------TableScan: join_t1 projection=[t1_id]
07)----------TableScan: join_t2 projection=[t2_id]
01)Aggregate: groupBy=[[]], aggr=[[count(DISTINCT join_t1.t1_id)]]
02)--Projection: join_t1.t1_id
03)----Inner Join: join_t1.t1_id = join_t2.t2_id
04)------TableScan: join_t1 projection=[t1_id]
05)------TableScan: join_t2 projection=[t2_id]
physical_plan
01)ProjectionExec: expr=[count(alias1)@0 as count(DISTINCT join_t1.t1_id)]
02)--AggregateExec: mode=Final, gby=[], aggr=[count(alias1)]
03)----CoalescePartitionsExec
04)------AggregateExec: mode=Partial, gby=[], aggr=[count(alias1)]
05)--------AggregateExec: mode=FinalPartitioned, gby=[alias1@0 as alias1], aggr=[]
06)----------CoalesceBatchesExec: target_batch_size=2
07)------------RepartitionExec: partitioning=Hash([alias1@0], 2), input_partitions=2
08)--------------AggregateExec: mode=Partial, gby=[t1_id@0 as alias1], aggr=[]
09)----------------CoalesceBatchesExec: target_batch_size=2
10)------------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(t1_id@0, t2_id@0)], projection=[t1_id@0]
11)--------------------DataSourceExec: partitions=1, partition_sizes=[1]
12)--------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
13)----------------------DataSourceExec: partitions=1, partition_sizes=[1]
01)AggregateExec: mode=Final, gby=[], aggr=[count(DISTINCT join_t1.t1_id)]
02)--CoalescePartitionsExec
03)----AggregateExec: mode=Partial, gby=[], aggr=[count(DISTINCT join_t1.t1_id)]
04)------CoalesceBatchesExec: target_batch_size=2
05)--------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(t1_id@0, t2_id@0)], projection=[t1_id@0]
06)----------DataSourceExec: partitions=1, partition_sizes=[1]
07)----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
08)------------DataSourceExec: partitions=1, partition_sizes=[1]
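
# Note: the single count(DISTINCT ...) is no longer rewritten into two
# stacked aggregations over alias1; the distinct count is now computed
# directly by the grouped distinct-count accumulator.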

statement ok
set datafusion.explain.logical_plan_only = true;