Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,22 @@ where

/// Function that computes the output
bool_fn: F,

/// The identity element for the boolean operation.
/// Any value combined with this returns the original value.
identity: bool,
}

impl<F> BooleanGroupsAccumulator<F>
where
F: Fn(bool, bool) -> bool + Send + Sync,
{
pub fn new(bitop_fn: F) -> Self {
pub fn new(bool_fn: F, identity: bool) -> Self {
Self {
values: BooleanBufferBuilder::new(0),
null_state: NullState::new(),
bool_fn: bitop_fn,
bool_fn,
identity,
}
}
}
Expand All @@ -77,7 +82,9 @@ where

if self.values.len() < total_num_groups {
let new_groups = total_num_groups - self.values.len();
self.values.append_n(new_groups, Default::default());
// Fill with the identity element, so that when the first non-null value is encountered,
// it will combine with the identity and the result will be the first non-null value itself.
self.values.append_n(new_groups, self.identity);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice -- this is similar to the default value for the PrimitiveOp accumulator too

}

// NullState dispatches / handles tracking nulls and groups that saw no values
Expand Down
9 changes: 5 additions & 4 deletions datafusion/functions-aggregate/src/bool_and_or.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ impl AggregateUDFImpl for BoolAnd {
) -> Result<Box<dyn GroupsAccumulator>> {
match args.return_type {
DataType::Boolean => {
Ok(Box::new(BooleanGroupsAccumulator::new(|x, y| x && y)))
Ok(Box::new(BooleanGroupsAccumulator::new(|x, y| x && y, true)))
}
_ => not_impl_err!(
"GroupsAccumulator not supported for {} with {}",
Expand Down Expand Up @@ -270,9 +270,10 @@ impl AggregateUDFImpl for BoolOr {
args: AccumulatorArgs,
) -> Result<Box<dyn GroupsAccumulator>> {
match args.return_type {
DataType::Boolean => {
Ok(Box::new(BooleanGroupsAccumulator::new(|x, y| x || y)))
}
DataType::Boolean => Ok(Box::new(BooleanGroupsAccumulator::new(
|x, y| x || y,
false,
))),
_ => not_impl_err!(
"GroupsAccumulator not supported for {} with {}",
args.name,
Expand Down
45 changes: 45 additions & 0 deletions datafusion/sqllogictest/test_files/aggregate.slt
Original file line number Diff line number Diff line change
Expand Up @@ -3724,6 +3724,51 @@ SELECT bool_or(distinct c1), bool_or(distinct c2), bool_or(distinct c3), bool_or
----
true true true false true true false NULL

# Test issue: https://github.com/apache/datafusion/issues/11846
statement ok
create table t1(v1 int, v2 boolean);

statement ok
insert into t1 values (1, true), (1, true);

statement ok
insert into t1 values (3, null), (3, true);

statement ok
insert into t1 values (2, false), (2, true);

statement ok
insert into t1 values (6, false), (6, false);

statement ok
insert into t1 values (4, null), (4, null);

statement ok
insert into t1 values (5, false), (5, null);

query IB
select v1, bool_and(v2) from t1 group by v1 order by v1;
----
1 true
2 false
3 true
4 NULL
5 false
6 false

query IB
select v1, bool_or(v2) from t1 group by v1 order by v1;
----
1 true
2 true
3 true
4 NULL
5 false
6 false

statement ok
drop table t1;

# All supported timestamp types

# "nanos" --> TimestampNanosecondArray
Expand Down