From f2006f7c4f82d386630b37c070d6ef0fd6b0bb7e Mon Sep 17 00:00:00 2001 From: Jeroen van Straten Date: Mon, 4 Jul 2022 16:10:50 +0200 Subject: [PATCH 01/13] Fix min/max discarding state for each partition --- .../compute/kernels/aggregate_basic_internal.h | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h b/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h index 6645e1a76bc..7cebc7c5bfb 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h +++ b/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h @@ -441,12 +441,12 @@ struct MinMaxImpl : public ScalarAggregator { this->count += scalar.is_valid; if (local.has_nulls && !options.skip_nulls) { - this->state = local; + this->state += local; return Status::OK(); } local.MergeOne(internal::UnboxScalar::Unbox(scalar)); - this->state = local; + this->state += local; return Status::OK(); } @@ -458,7 +458,7 @@ struct MinMaxImpl : public ScalarAggregator { this->count += arr.length() - null_count; if (local.has_nulls && !options.skip_nulls) { - this->state = local; + this->state += local; return Status::OK(); } @@ -469,7 +469,7 @@ struct MinMaxImpl : public ScalarAggregator { local.MergeOne(arr.GetView(i)); } } - this->state = local; + this->state += local; return Status::OK(); } @@ -586,7 +586,7 @@ struct BooleanMinMaxImpl : public MinMaxImpl { local.has_nulls = null_count > 0; this->count += valid_count; if (local.has_nulls && !options.skip_nulls) { - this->state = local; + this->state += local; return Status::OK(); } @@ -595,7 +595,7 @@ struct BooleanMinMaxImpl : public MinMaxImpl { local.max = true_count > 0; local.min = false_count == 0; - this->state = local; + this->state += local; return Status::OK(); } @@ -605,7 +605,7 @@ struct BooleanMinMaxImpl : public MinMaxImpl { local.has_nulls = !scalar.is_valid; this->count += scalar.is_valid; if (local.has_nulls && !options.skip_nulls) { - this->state = local; + this->state += local; return Status::OK(); } @@ -614,7 +614,7 @@ struct BooleanMinMaxImpl : public MinMaxImpl { local.max = true_count > 0; local.min = false_count == 0; - this->state = local; + this->state += local; return Status::OK(); } }; From 7cb37a7f5e7dff7191927a2a80472647d94e540b Mon Sep 17 00:00:00 2001 From: Aldrin M Date: Tue, 5 Jul 2022 15:11:18 -0700 Subject: [PATCH 02/13] [ARROW-16904]: inverted conditions for MinMax Based on multiple locations fixed in 85789a9, it seemed better to simplify a few conditions so that updating `this->state` from `local` is consolidated and more maintainable. This should make it easier to understand that there is only a difference in how the local state is updated when consuming input data and there is no difference in how the aggregate's state is updated from the local state. --- .../kernels/aggregate_basic_internal.h | 46 ++++++++----------- 1 file changed, 18 insertions(+), 28 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h b/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h index 7cebc7c5bfb..d03862c2749 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h +++ b/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h @@ -440,12 +440,10 @@ struct MinMaxImpl : public ScalarAggregator { local.has_nulls = !scalar.is_valid; this->count += scalar.is_valid; - if (local.has_nulls && !options.skip_nulls) { - this->state += local; - return Status::OK(); + if (!local.has_nulls || options.skip_nulls) { + local.MergeOne(internal::UnboxScalar::Unbox(scalar)); } - local.MergeOne(internal::UnboxScalar::Unbox(scalar)); this->state += local; return Status::OK(); } @@ -457,18 +455,16 @@ struct MinMaxImpl : public ScalarAggregator { local.has_nulls = null_count > 0; this->count += arr.length() - null_count; - if (local.has_nulls && !options.skip_nulls) { - this->state += local; - return Status::OK(); - } - - if (local.has_nulls) { - local += ConsumeWithNulls(arr); - } else { // All true values + if (!local.has_nulls) { for (int64_t i = 0; i < arr.length(); i++) { local.MergeOne(arr.GetView(i)); } } + + else if (local.has_nulls && options.skip_nulls) { + local += ConsumeWithNulls(arr); + } + this->state += local; return Status::OK(); } @@ -585,16 +581,13 @@ struct BooleanMinMaxImpl : public MinMaxImpl { local.has_nulls = null_count > 0; this->count += valid_count; - if (local.has_nulls && !options.skip_nulls) { - this->state += local; - return Status::OK(); + if (!local.has_nulls || options.skip_nulls) { + const auto true_count = arr.true_count(); + const auto false_count = valid_count - true_count; + local.max = true_count > 0; + local.min = false_count == 0; } - const auto true_count = arr.true_count(); - const auto false_count = valid_count - true_count; - local.max = true_count > 0; - local.min = false_count == 0; - this->state += local; return Status::OK(); } @@ -604,16 +597,13 @@ struct BooleanMinMaxImpl : public MinMaxImpl { local.has_nulls = !scalar.is_valid; this->count += scalar.is_valid; - if (local.has_nulls && !options.skip_nulls) { - this->state += local; - return Status::OK(); + if (!local.has_nulls || options.skip_nulls) { + const int true_count = scalar.is_valid && scalar.value; + const int false_count = scalar.is_valid && !scalar.value; + local.max = true_count > 0; + local.min = false_count == 0; } - const int true_count = scalar.is_valid && scalar.value; - const int false_count = scalar.is_valid && !scalar.value; - local.max = true_count > 0; - local.min = false_count == 0; - this->state += local; return Status::OK(); } From f48390ece1c632c9ba905c361e3d8fb41ced829e Mon Sep 17 00:00:00 2001 From: Aldrin M Date: Thu, 7 Jul 2022 14:16:14 -0700 Subject: [PATCH 03/13] [ARROW-16904]: added R test This R test captures the code from ARROW-16904 that reproduces this bug. It uses a scalar aggregate, min, on a dataset (1 column) that produces several exec batches. --- r/tests/testthat/test-dataset.R | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index 02880314c25..f639ffb4f04 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -618,6 +618,31 @@ test_that("UnionDataset handles InMemoryDatasets", { expect_equal(actual, expected) }) +test_that("scalar aggregates with many batches", { + test_data <- data.frame(val=1:1e7) + expected_result_distr <- ( + sapply(1:100, function (iter_ndx) { + test_data %>% + dplyr::summarise(min_val = min(val)) %>% + dplyr::collect() %>% + dplyr::pull(min_val) + }) %>% table() + ) + + ds_tmpfile <- tempfile('test-aggregate', fileext='.parquet') + arrow::write_parquet(test_data, ds_tmpfile) + actual_result_distr <- ( + sapply(1:100, function (iter_ndx) { + arrow::open_dataset(ds_tmpfile) %>% + dplyr::summarise(min_val = min(val)) %>% + dplyr::collect() %>% + dplyr::pull(min_val) + }) %>% table() + ) + + expect_equal(actual_result_distr, expected_result_distr) +}) + test_that("map_batches", { ds <- open_dataset(dataset_dir, partitioning = "part") From 431609aec1acaf33519846a7a26e569c6f5f5945 Mon Sep 17 00:00:00 2001 From: Aldrin M Date: Thu, 7 Jul 2022 14:32:29 -0700 Subject: [PATCH 04/13] [ARROW-16904]: added test to plan_test.cc This test exercises the bug found in ARROW-16904, by creating a ScalarAggregateNode for the "min_max" function. Previously, there was no unit test for scalar aggregate nodes. --- cpp/src/arrow/compute/exec/plan_test.cc | 51 +++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index f67d541e1ea..d1c423b50e7 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -933,6 +933,57 @@ TEST(ExecPlanExecution, SourceGroupedSum) { } } +TEST(ExecPlanExecution, SourceMinMaxScalar) { + for (bool parallel : { false, true }) { + SCOPED_TRACE(parallel ? "parallel/merged" : "serial"); + + auto input = MakeGroupableBatches(/*multiplicity=*/parallel ? 100 : 1); + auto minmax_opts = std::make_shared(); + auto expected_result = ExecBatch::Make({ + *StructScalar::Make( + ScalarVector { + ScalarFromJSON(int32(), R"(-8)") + ,ScalarFromJSON(int32(), R"(12)") + } + ,{ "min", "max" } + ) + }); + + ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); + AsyncGenerator> sink_gen; + + // NOTE: Test `ScalarAggregateNode` by omitting `keys` attribute + ASSERT_OK( + Declaration::Sequence({ + { + "source" + ,SourceNodeOptions { + input.schema + ,input.gen(parallel, /*slow=*/false) + } + } + ,{ + "aggregate" + ,AggregateNodeOptions { + /*aggregates=*/{ + { "min_max", std::move(minmax_opts), "i32", "min_max" } + } + ,/*keys=*/{} + } + } + ,{ "sink", SinkNodeOptions{ &sink_gen } } + }).AddToPlan(plan.get()) + ); + + ASSERT_THAT( + StartAndCollect(plan.get(), sink_gen) + ,Finishes(ResultWith( + UnorderedElementsAreArray({ *expected_result }) + )) + ); + } +} + TEST(ExecPlanExecution, NestedSourceFilter) { for (bool parallel : {false, true}) { SCOPED_TRACE(parallel ? "parallel/merged" : "serial"); From d5c7479f604e46a38c53e14863d5acf1516ac7ad Mon Sep 17 00:00:00 2001 From: Aldrin M Date: Thu, 7 Jul 2022 14:34:21 -0700 Subject: [PATCH 05/13] minor improvement to TestIntegerMinMaxKernel This minor improvement splits the min and max values into different chunks of chunked_input1. This doesn't improve coverage given how the scalar aggregate executes on a chunked array, but it seemed a nice extra thing to include --- cpp/src/arrow/compute/kernels/aggregate_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/kernels/aggregate_test.cc b/cpp/src/arrow/compute/kernels/aggregate_test.cc index 79b01b477af..aa54fe5f3e2 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_test.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_test.cc @@ -1563,7 +1563,7 @@ TEST_F(TestBooleanMinMaxKernel, Basics) { TYPED_TEST_SUITE(TestIntegerMinMaxKernel, PhysicalIntegralArrowTypes); TYPED_TEST(TestIntegerMinMaxKernel, Basics) { ScalarAggregateOptions options; - std::vector chunked_input1 = {"[5, 1, 2, 3, 4]", "[9, 1, null, 3, 4]"}; + std::vector chunked_input1 = {"[5, 1, 2, 3, 4]", "[9, 8, null, 3, 4]"}; std::vector chunked_input2 = {"[5, null, 2, 3, 4]", "[9, 1, 2, 3, 4]"}; std::vector chunked_input3 = {"[5, 1, 2, 3, null]", "[9, 1, null, 3, 4]"}; auto item_ty = default_type_instance(); From e60509ee2b75e4702aebba1191778c8f9efab156 Mon Sep 17 00:00:00 2001 From: Aldrin M Date: Thu, 7 Jul 2022 14:39:14 -0700 Subject: [PATCH 06/13] [ARROW-16904]: expanded AggregateNodeOptions doc Added to brief documentation for AggregateNodeOptions that provides insight to how the `keys` attribute affects how the `aggregates` attribute is used (or rather, how inputs are delegated to those aggregates). --- cpp/src/arrow/compute/exec/options.h | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h index a86b6c63d36..4a0cd602efb 100644 --- a/cpp/src/arrow/compute/exec/options.h +++ b/cpp/src/arrow/compute/exec/options.h @@ -109,6 +109,10 @@ class ARROW_EXPORT ProjectNodeOptions : public ExecNodeOptions { }; /// \brief Make a node which aggregates input batches, optionally grouped by keys. +/// +/// If the keys attribute is a non-empty vector, then each aggregate in `aggregates` is +/// expected to be a HashAggregate function. If the keys attribute is an empty vector, +/// then each aggregate is assumed to be a ScalarAggregate function. class ARROW_EXPORT AggregateNodeOptions : public ExecNodeOptions { public: explicit AggregateNodeOptions(std::vector aggregates, From fb16334e45d9fe9aec339643575b8df77b019683 Mon Sep 17 00:00:00 2001 From: Aldrin M Date: Thu, 7 Jul 2022 14:54:59 -0700 Subject: [PATCH 07/13] [ARROW-16904]: minor style change --- r/tests/testthat/test-dataset.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index f639ffb4f04..c37e1aa9c22 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -619,7 +619,7 @@ test_that("UnionDataset handles InMemoryDatasets", { }) test_that("scalar aggregates with many batches", { - test_data <- data.frame(val=1:1e7) + test_data <- data.frame(val=1:1e7) expected_result_distr <- ( sapply(1:100, function (iter_ndx) { test_data %>% From 695e29bf79745407059216d718d243848dc38072 Mon Sep 17 00:00:00 2001 From: Aldrin M Date: Thu, 7 Jul 2022 15:00:57 -0700 Subject: [PATCH 08/13] [ARROW-16904]: minor style change --- cpp/src/arrow/compute/exec/plan_test.cc | 53 ++++++++----------------- 1 file changed, 17 insertions(+), 36 deletions(-) diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index d1c423b50e7..922a6461660 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -937,17 +937,12 @@ TEST(ExecPlanExecution, SourceMinMaxScalar) { for (bool parallel : { false, true }) { SCOPED_TRACE(parallel ? "parallel/merged" : "serial"); - auto input = MakeGroupableBatches(/*multiplicity=*/parallel ? 100 : 1); - auto minmax_opts = std::make_shared(); - auto expected_result = ExecBatch::Make({ - *StructScalar::Make( - ScalarVector { - ScalarFromJSON(int32(), R"(-8)") - ,ScalarFromJSON(int32(), R"(12)") - } - ,{ "min", "max" } - ) - }); + auto input = MakeGroupableBatches(/*multiplicity=*/parallel ? 100 : 1); + auto minmax_opts = std::make_shared(); + auto expected_value = StructScalar::Make( + ScalarVector{ScalarFromJSON(int32(), R"(-8)"), ScalarFromJSON(int32(), R"(12)")}, + {"min","max"}); + auto expected_result = ExecBatch::Make({*expected_value}); ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); AsyncGenerator> sink_gen; @@ -955,32 +950,18 @@ TEST(ExecPlanExecution, SourceMinMaxScalar) { // NOTE: Test `ScalarAggregateNode` by omitting `keys` attribute ASSERT_OK( Declaration::Sequence({ - { - "source" - ,SourceNodeOptions { - input.schema - ,input.gen(parallel, /*slow=*/false) - } - } - ,{ - "aggregate" - ,AggregateNodeOptions { - /*aggregates=*/{ - { "min_max", std::move(minmax_opts), "i32", "min_max" } - } - ,/*keys=*/{} - } - } - ,{ "sink", SinkNodeOptions{ &sink_gen } } - }).AddToPlan(plan.get()) - ); + {"source", + SourceNodeOptions {input.schema, input.gen(parallel, /*slow=*/false)}}, + {"aggregate", AggregateNodeOptions{/*aggregates=*/{{"min_max", + std::move(minmax_opts), + "i32", "min_max"}}, + /*keys=*/{}}}, + {"sink", SinkNodeOptions{&sink_gen}} + }) + .AddToPlan(plan.get())); - ASSERT_THAT( - StartAndCollect(plan.get(), sink_gen) - ,Finishes(ResultWith( - UnorderedElementsAreArray({ *expected_result }) - )) - ); + ASSERT_THAT(StartAndCollect(plan.get(), sink_gen), + Finishes(ResultWith(UnorderedElementsAreArray({*expected_result})))); } } From 5a3641f709b4112ddc4e64e854b6d458c0bd4e03 Mon Sep 17 00:00:00 2001 From: octalene Date: Fri, 8 Jul 2022 11:15:02 -0700 Subject: [PATCH 09/13] Update cpp/src/arrow/compute/exec/plan_test.cc added comment Co-authored-by: David Li --- cpp/src/arrow/compute/exec/plan_test.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index 922a6461660..fa69fd20c12 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -934,6 +934,7 @@ TEST(ExecPlanExecution, SourceGroupedSum) { } TEST(ExecPlanExecution, SourceMinMaxScalar) { + // Regression test for ARROW-16904 for (bool parallel : { false, true }) { SCOPED_TRACE(parallel ? "parallel/merged" : "serial"); From fba6c8cf767a3352b254ee68b9b47024fcb15536 Mon Sep 17 00:00:00 2001 From: Aldrin M Date: Fri, 8 Jul 2022 12:17:14 -0700 Subject: [PATCH 10/13] [ARROW-16904]: fixed style for R test --- r/tests/testthat/test-dataset.R | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index c37e1aa9c22..aa0eedbad2b 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -619,25 +619,27 @@ test_that("UnionDataset handles InMemoryDatasets", { }) test_that("scalar aggregates with many batches", { - test_data <- data.frame(val=1:1e7) + test_data <- data.frame(val = 1:1e7) expected_result_distr <- ( - sapply(1:100, function (iter_ndx) { + sapply(1:100, function(iter_ndx) { test_data %>% dplyr::summarise(min_val = min(val)) %>% dplyr::collect() %>% dplyr::pull(min_val) - }) %>% table() + }) %>% + table() ) - ds_tmpfile <- tempfile('test-aggregate', fileext='.parquet') + ds_tmpfile <- tempfile("test-aggregate", fileext = ".parquet") arrow::write_parquet(test_data, ds_tmpfile) actual_result_distr <- ( - sapply(1:100, function (iter_ndx) { + sapply(1:100, function(iter_ndx) { arrow::open_dataset(ds_tmpfile) %>% dplyr::summarise(min_val = min(val)) %>% dplyr::collect() %>% dplyr::pull(min_val) - }) %>% table() + }) %>% + table() ) expect_equal(actual_result_distr, expected_result_distr) From 534691b439eb442c45c8be2ff1418188d6aa149a Mon Sep 17 00:00:00 2001 From: Aldrin M Date: Fri, 8 Jul 2022 12:28:58 -0700 Subject: [PATCH 11/13] [ARROW-16904]: simplified Scalar construction Used `ScalarFromJSON` to construct the StructScalar instead of `StructScalar::Make`. Also fixed style using clang-format --- cpp/src/arrow/compute/exec/plan_test.cc | 29 +++++++++++-------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index fa69fd20c12..c29f3fa9d50 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -935,31 +935,28 @@ TEST(ExecPlanExecution, SourceGroupedSum) { TEST(ExecPlanExecution, SourceMinMaxScalar) { // Regression test for ARROW-16904 - for (bool parallel : { false, true }) { + for (bool parallel : {false, true}) { SCOPED_TRACE(parallel ? "parallel/merged" : "serial"); auto input = MakeGroupableBatches(/*multiplicity=*/parallel ? 100 : 1); auto minmax_opts = std::make_shared(); - auto expected_value = StructScalar::Make( - ScalarVector{ScalarFromJSON(int32(), R"(-8)"), ScalarFromJSON(int32(), R"(12)")}, - {"min","max"}); - auto expected_result = ExecBatch::Make({*expected_value}); + auto expected_result = ExecBatch::Make( + {ScalarFromJSON(struct_({field("min", int32()), field("max", int32())}), + R"({"min": -8, "max": 12})")}); ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); AsyncGenerator> sink_gen; // NOTE: Test `ScalarAggregateNode` by omitting `keys` attribute - ASSERT_OK( - Declaration::Sequence({ - {"source", - SourceNodeOptions {input.schema, input.gen(parallel, /*slow=*/false)}}, - {"aggregate", AggregateNodeOptions{/*aggregates=*/{{"min_max", - std::move(minmax_opts), - "i32", "min_max"}}, - /*keys=*/{}}}, - {"sink", SinkNodeOptions{&sink_gen}} - }) - .AddToPlan(plan.get())); + ASSERT_OK(Declaration::Sequence( + {{"source", + SourceNodeOptions{input.schema, input.gen(parallel, /*slow=*/false)}}, + {"aggregate", AggregateNodeOptions{ + /*aggregates=*/{{"min_max", std::move(minmax_opts), + "i32", "min_max"}}, + /*keys=*/{}}}, + {"sink", SinkNodeOptions{&sink_gen}}}) + .AddToPlan(plan.get())); ASSERT_THAT(StartAndCollect(plan.get(), sink_gen), Finishes(ResultWith(UnorderedElementsAreArray({*expected_result})))); From 988b68b8a9cf8256e40e4c922547ba8ab538de8a Mon Sep 17 00:00:00 2001 From: octalene Date: Fri, 8 Jul 2022 12:50:39 -0700 Subject: [PATCH 12/13] Update r/tests/testthat/test-dataset.R simplifying test body by specifying `chunk_size` to `write_parquet` and using `replicate` instead of `sapply`. Adding other style changes for readability. Co-authored-by: Neal Richardson --- r/tests/testthat/test-dataset.R | 34 +++++++++------------------------ 1 file changed, 9 insertions(+), 25 deletions(-) diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index aa0eedbad2b..8cc41f34e25 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -618,31 +618,15 @@ test_that("UnionDataset handles InMemoryDatasets", { expect_equal(actual, expected) }) -test_that("scalar aggregates with many batches", { - test_data <- data.frame(val = 1:1e7) - expected_result_distr <- ( - sapply(1:100, function(iter_ndx) { - test_data %>% - dplyr::summarise(min_val = min(val)) %>% - dplyr::collect() %>% - dplyr::pull(min_val) - }) %>% - table() - ) - - ds_tmpfile <- tempfile("test-aggregate", fileext = ".parquet") - arrow::write_parquet(test_data, ds_tmpfile) - actual_result_distr <- ( - sapply(1:100, function(iter_ndx) { - arrow::open_dataset(ds_tmpfile) %>% - dplyr::summarise(min_val = min(val)) %>% - dplyr::collect() %>% - dplyr::pull(min_val) - }) %>% - table() - ) - - expect_equal(actual_result_distr, expected_result_distr) +test_that("scalar aggregates with many batches (ARROW-16904)", { + tf <- tempfile() + write_parquet(data.frame(x = 1:100), tf, chunk_size = 20) + + ds <- open_dataset(tf) + replicate(100, ds %>% summarize(min(x)) %>% pull()) + + expect_true(all(replicate(100, ds %>% summarize(min(x)) %>% pull()) == 1)) + expect_true(all(replicate(100, ds %>% summarize(max(x)) %>% pull()) == 100)) }) test_that("map_batches", { From 249af7b0f5b75c097a4252f8081e209a6070725c Mon Sep 17 00:00:00 2001 From: Aldrin M Date: Fri, 8 Jul 2022 14:06:42 -0700 Subject: [PATCH 13/13] [ARROW-16904]: minor style fix --- cpp/src/arrow/compute/kernels/aggregate_basic_internal.h | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h b/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h index d03862c2749..c945e7f27f3 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h +++ b/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h @@ -459,9 +459,7 @@ struct MinMaxImpl : public ScalarAggregator { for (int64_t i = 0; i < arr.length(); i++) { local.MergeOne(arr.GetView(i)); } - } - - else if (local.has_nulls && options.skip_nulls) { + } else if (local.has_nulls && options.skip_nulls) { local += ConsumeWithNulls(arr); }