diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h
index a86b6c63d36..4a0cd602efb 100644
--- a/cpp/src/arrow/compute/exec/options.h
+++ b/cpp/src/arrow/compute/exec/options.h
@@ -109,6 +109,10 @@ class ARROW_EXPORT ProjectNodeOptions : public ExecNodeOptions {
 };
 
 /// \brief Make a node which aggregates input batches, optionally grouped by keys.
+///
+/// If the keys attribute is a non-empty vector, then each aggregate in `aggregates` is
+/// expected to be a HashAggregate function. If the keys attribute is an empty vector,
+/// then each aggregate is assumed to be a ScalarAggregate function.
 class ARROW_EXPORT AggregateNodeOptions : public ExecNodeOptions {
  public:
  explicit AggregateNodeOptions(std::vector<internal::Aggregate> aggregates,
diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc
index f67d541e1ea..c29f3fa9d50 100644
--- a/cpp/src/arrow/compute/exec/plan_test.cc
+++ b/cpp/src/arrow/compute/exec/plan_test.cc
@@ -933,6 +933,36 @@ TEST(ExecPlanExecution, SourceGroupedSum) {
   }
 }
 
+TEST(ExecPlanExecution, SourceMinMaxScalar) {
+  // Regression test for ARROW-16904
+  for (bool parallel : {false, true}) {
+    SCOPED_TRACE(parallel ? "parallel/merged" : "serial");
+
+    auto input = MakeGroupableBatches(/*multiplicity=*/parallel ? 100 : 1);
+    auto minmax_opts = std::make_shared<ScalarAggregateOptions>();
+    auto expected_result = ExecBatch::Make(
+        {ScalarFromJSON(struct_({field("min", int32()), field("max", int32())}),
+                        R"({"min": -8, "max": 12})")});
+
+    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+    AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+    // NOTE: Test `ScalarAggregateNode` by omitting `keys` attribute
+    ASSERT_OK(Declaration::Sequence(
+                  {{"source",
+                    SourceNodeOptions{input.schema, input.gen(parallel, /*slow=*/false)}},
+                   {"aggregate",
+                    AggregateNodeOptions{
+                        /*aggregates=*/{{"min_max", std::move(minmax_opts),
+                                         "i32", "min_max"}},
+                        /*keys=*/{}}},
+                   {"sink", SinkNodeOptions{&sink_gen}}})
+                  .AddToPlan(plan.get()));
+
+    ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
+                Finishes(ResultWith(UnorderedElementsAreArray({*expected_result}))));
+  }
+}
+
 TEST(ExecPlanExecution, NestedSourceFilter) {
   for (bool parallel : {false, true}) {
     SCOPED_TRACE(parallel ? "parallel/merged" : "serial");
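
For context on what this regression test exercises: with an empty `keys` vector, the aggregate node routes every input batch through the same ScalarAggregate kernel state, so `Consume` runs once per batch. Before this patch, `MinMaxImpl` ended each consume step with `this->state = local;`, letting the last batch overwrite whatever earlier batches had accumulated. The sketch below is a toy model of that failure mode, not Arrow's actual `ScalarAggregator` interface; all names are illustrative.

```cpp
// Toy model of ARROW-16904: assigning per-batch state vs. merging it.
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <limits>
#include <vector>

struct MinMaxState {
  int64_t min = std::numeric_limits<int64_t>::max();
  int64_t max = std::numeric_limits<int64_t>::min();

  MinMaxState& operator+=(const MinMaxState& rhs) {
    min = std::min(min, rhs.min);
    max = std::max(max, rhs.max);
    return *this;
  }
};

struct ToyAggregator {
  MinMaxState state;

  // Called once per input batch on the same instance, as the scalar
  // aggregate node does when no grouping keys are given.
  void Consume(const std::vector<int64_t>& batch, bool merge) {
    MinMaxState local;
    for (int64_t v : batch) {
      local.min = std::min(local.min, v);
      local.max = std::max(local.max, v);
    }
    if (merge) {
      state += local;  // fixed behavior: accumulate across batches
    } else {
      state = local;   // buggy behavior: earlier batches are discarded
    }
  }
};

int main() {
  // Endpoints chosen to mirror the expected {"min": -8, "max": 12} above.
  std::vector<std::vector<int64_t>> batches = {{-8, 0, 3}, {5, 12, 7}};
  for (bool merge : {false, true}) {
    ToyAggregator agg;
    for (const auto& batch : batches) agg.Consume(batch, merge);
    std::cout << (merge ? "merge:     " : "overwrite: ")
              << "min=" << agg.state.min << " max=" << agg.state.max << "\n";
  }
  // overwrite: min=5 max=12   <- first batch lost, min is wrong
  // merge:     min=-8 max=12  <- correct
  return 0;
}
```

This is also why the test uses /*multiplicity=*/100 in the parallel case: the more batches each kernel state consumes, the more likely the lost-state behavior is to surface in the final result.
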
"parallel/merged" : "serial"); diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h b/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h index 6645e1a76bc..c945e7f27f3 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h +++ b/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h @@ -440,13 +440,11 @@ struct MinMaxImpl : public ScalarAggregator { local.has_nulls = !scalar.is_valid; this->count += scalar.is_valid; - if (local.has_nulls && !options.skip_nulls) { - this->state = local; - return Status::OK(); + if (!local.has_nulls || options.skip_nulls) { + local.MergeOne(internal::UnboxScalar::Unbox(scalar)); } - local.MergeOne(internal::UnboxScalar::Unbox(scalar)); - this->state = local; + this->state += local; return Status::OK(); } @@ -457,19 +455,15 @@ struct MinMaxImpl : public ScalarAggregator { local.has_nulls = null_count > 0; this->count += arr.length() - null_count; - if (local.has_nulls && !options.skip_nulls) { - this->state = local; - return Status::OK(); - } - - if (local.has_nulls) { - local += ConsumeWithNulls(arr); - } else { // All true values + if (!local.has_nulls) { for (int64_t i = 0; i < arr.length(); i++) { local.MergeOne(arr.GetView(i)); } + } else if (local.has_nulls && options.skip_nulls) { + local += ConsumeWithNulls(arr); } - this->state = local; + + this->state += local; return Status::OK(); } @@ -585,17 +579,14 @@ struct BooleanMinMaxImpl : public MinMaxImpl { local.has_nulls = null_count > 0; this->count += valid_count; - if (local.has_nulls && !options.skip_nulls) { - this->state = local; - return Status::OK(); + if (!local.has_nulls || options.skip_nulls) { + const auto true_count = arr.true_count(); + const auto false_count = valid_count - true_count; + local.max = true_count > 0; + local.min = false_count == 0; } - const auto true_count = arr.true_count(); - const auto false_count = valid_count - true_count; - local.max = true_count > 0; - local.min = false_count == 0; - - this->state = local; + this->state += local; return Status::OK(); } @@ -604,17 +595,14 @@ struct BooleanMinMaxImpl : public MinMaxImpl { local.has_nulls = !scalar.is_valid; this->count += scalar.is_valid; - if (local.has_nulls && !options.skip_nulls) { - this->state = local; - return Status::OK(); + if (!local.has_nulls || options.skip_nulls) { + const int true_count = scalar.is_valid && scalar.value; + const int false_count = scalar.is_valid && !scalar.value; + local.max = true_count > 0; + local.min = false_count == 0; } - const int true_count = scalar.is_valid && scalar.value; - const int false_count = scalar.is_valid && !scalar.value; - local.max = true_count > 0; - local.min = false_count == 0; - - this->state = local; + this->state += local; return Status::OK(); } }; diff --git a/cpp/src/arrow/compute/kernels/aggregate_test.cc b/cpp/src/arrow/compute/kernels/aggregate_test.cc index 79b01b477af..aa54fe5f3e2 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_test.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_test.cc @@ -1563,7 +1563,7 @@ TEST_F(TestBooleanMinMaxKernel, Basics) { TYPED_TEST_SUITE(TestIntegerMinMaxKernel, PhysicalIntegralArrowTypes); TYPED_TEST(TestIntegerMinMaxKernel, Basics) { ScalarAggregateOptions options; - std::vector chunked_input1 = {"[5, 1, 2, 3, 4]", "[9, 1, null, 3, 4]"}; + std::vector chunked_input1 = {"[5, 1, 2, 3, 4]", "[9, 8, null, 3, 4]"}; std::vector chunked_input2 = {"[5, null, 2, 3, 4]", "[9, 1, 2, 3, 4]"}; std::vector chunked_input3 = {"[5, 1, 2, 3, null]", "[9, 1, null, 3, 4]"}; auto item_ty = 
diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R
index 02880314c25..8cc41f34e25 100644
--- a/r/tests/testthat/test-dataset.R
+++ b/r/tests/testthat/test-dataset.R
@@ -618,6 +618,17 @@ test_that("UnionDataset handles InMemoryDatasets", {
   expect_equal(actual, expected)
 })
 
+test_that("scalar aggregates with many batches (ARROW-16904)", {
+  tf <- tempfile()
+  write_parquet(data.frame(x = 1:100), tf, chunk_size = 20)
+
+  ds <- open_dataset(tf)
+  replicate(100, ds %>% summarize(min(x)) %>% pull())
+
+  expect_true(all(replicate(100, ds %>% summarize(min(x)) %>% pull()) == 1))
+  expect_true(all(replicate(100, ds %>% summarize(max(x)) %>% pull()) == 100))
+})
+
 test_that("map_batches", {
   ds <- open_dataset(dataset_dir, partitioning = "part")