From 73201125b8b77126317e739eca99c637eee217c5 Mon Sep 17 00:00:00 2001 From: Jin Shang Date: Sat, 21 Oct 2023 07:34:35 +0800 Subject: [PATCH 1/6] optimize dictionary hash functions --- cpp/src/arrow/compute/kernels/vector_hash.cc | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/vector_hash.cc b/cpp/src/arrow/compute/kernels/vector_hash.cc index 65e59d1a2eb..9c15b3bd10e 100644 --- a/cpp/src/arrow/compute/kernels/vector_hash.cc +++ b/cpp/src/arrow/compute/kernels/vector_hash.cc @@ -458,14 +458,17 @@ class DictionaryHashKernel : public HashKernel { // A better approach may be to run the kernel over each individual chunk, // and then hash-aggregate all results (for example sum-group-by for // the "value_counts" kernel). + if (dictionary_unifier_ == nullptr) { + ARROW_ASSIGN_OR_RAISE(dictionary_unifier_, + DictionaryUnifier::Make(dictionary_->type())); + RETURN_NOT_OK(dictionary_unifier_->Unify(*dictionary_)); + } auto out_dict_type = dictionary_->type(); std::shared_ptr transpose_map; std::shared_ptr out_dict; - ARROW_ASSIGN_OR_RAISE(auto unifier, DictionaryUnifier::Make(out_dict_type)); - ARROW_CHECK_OK(unifier->Unify(*dictionary_)); - ARROW_CHECK_OK(unifier->Unify(*arr_dict, &transpose_map)); - ARROW_CHECK_OK(unifier->GetResult(&out_dict_type, &out_dict)); + RETURN_NOT_OK(dictionary_unifier_->Unify(*arr_dict, &transpose_map)); + RETURN_NOT_OK(dictionary_unifier_->GetResult(&out_dict_type, &out_dict)); dictionary_ = out_dict; auto transpose = reinterpret_cast(transpose_map->data()); @@ -501,6 +504,7 @@ class DictionaryHashKernel : public HashKernel { std::unique_ptr indices_kernel_; std::shared_ptr dictionary_; std::shared_ptr dictionary_value_type_; + std::unique_ptr dictionary_unifier_; }; // ---------------------------------------------------------------------- From f09ab5c6d26a56ab51621f199f32b19c475092b6 Mon Sep 17 00:00:00 2001 From: Jin Shang Date: Sat, 21 Oct 2023 10:20:51 +0800 Subject: [PATCH 2/6] add benchmark --- .../compute/kernels/vector_hash_benchmark.cc | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/cpp/src/arrow/compute/kernels/vector_hash_benchmark.cc b/cpp/src/arrow/compute/kernels/vector_hash_benchmark.cc index e9548e133aa..472f50db8cf 100644 --- a/cpp/src/arrow/compute/kernels/vector_hash_benchmark.cc +++ b/cpp/src/arrow/compute/kernels/vector_hash_benchmark.cc @@ -25,6 +25,7 @@ #include "arrow/testing/gtest_util.h" #include "arrow/testing/random.h" #include "arrow/testing/util.h" +#include "arrow/util/logging.h" #include "arrow/compute/api.h" @@ -226,6 +227,33 @@ static void UniqueString100bytes(benchmark::State& state) { BenchUnique(state, HashParams{general_bench_cases[state.range(0)], 100}); } +template +void BenchValueCountsDictionaryChunks(benchmark::State& state, const ParamType& params) { + std::shared_ptr arr; + params.GenerateTestData(&arr); + // chunk arr to 100 slices + std::vector> chunks; + const int64_t chunk_size = arr->length() / 100; + for (int64_t i = 0; i < 100; ++i) { + auto slice = arr->Slice(i * chunk_size, chunk_size); + auto datum = DictionaryEncode(slice).ValueOrDie(); + ARROW_CHECK(datum.is_array()); + chunks.push_back(datum.make_array()); + } + auto chunked_array = std::make_shared(chunks); + + while (state.KeepRunning()) { + ABORT_NOT_OK(ValueCounts(chunked_array).status()); + } + params.SetMetadata(state); +} + +static void ValueCountsDictionaryChunks(benchmark::State& state) { + // Dictionary of byte strings with 10 bytes each + BenchValueCountsDictionaryChunks( + state, HashParams{general_bench_cases[state.range(0)], 10}); +} + void HashSetArgs(benchmark::internal::Benchmark* bench) { for (int i = 0; i < static_cast(general_bench_cases.size()); ++i) { bench->Arg(i); @@ -239,6 +267,14 @@ BENCHMARK(UniqueInt64)->Apply(HashSetArgs); BENCHMARK(UniqueString10bytes)->Apply(HashSetArgs); BENCHMARK(UniqueString100bytes)->Apply(HashSetArgs); +void DictionaryChunksHashSetArgs(benchmark::internal::Benchmark* bench) { + for (int i = 0; i < static_cast(general_bench_cases.size()); ++i) { + bench->Arg(i); + } +} + +BENCHMARK(ValueCountsDictionaryChunks)->Apply(DictionaryChunksHashSetArgs); + void UInt8SetArgs(benchmark::internal::Benchmark* bench) { for (int i = 0; i < static_cast(uint8_bench_cases.size()); ++i) { bench->Arg(i); From a841030157bd115a2c1bfdae5f4f374e50e744ce Mon Sep 17 00:00:00 2001 From: Jin Shang Date: Tue, 19 Dec 2023 07:07:41 +0800 Subject: [PATCH 3/6] remove dictionary_ member --- cpp/src/arrow/compute/kernels/vector_hash.cc | 47 +++++++++++++------- 1 file changed, 31 insertions(+), 16 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/vector_hash.cc b/cpp/src/arrow/compute/kernels/vector_hash.cc index 9c15b3bd10e..b5c290717ec 100644 --- a/cpp/src/arrow/compute/kernels/vector_hash.cc +++ b/cpp/src/arrow/compute/kernels/vector_hash.cc @@ -26,17 +26,20 @@ #include "arrow/array/concatenate.h" #include "arrow/array/dict_internal.h" #include "arrow/array/util.h" +#include "arrow/buffer.h" #include "arrow/compute/api_vector.h" #include "arrow/compute/cast.h" #include "arrow/compute/kernels/common_internal.h" #include "arrow/result.h" #include "arrow/util/hashing.h" +#include "arrow/util/int_util.h" #include "arrow/util/unreachable.h" namespace arrow { using internal::DictionaryTraits; using internal::HashTraits; +using internal::TransposeInts; namespace compute { namespace internal { @@ -448,9 +451,9 @@ class DictionaryHashKernel : public HashKernel { Status Append(const ArraySpan& arr) override { auto arr_dict = arr.dictionary().ToArray(); - if (!dictionary_) { - dictionary_ = arr_dict; - } else if (!dictionary_->Equals(*arr_dict)) { + if (!first_dictionary_) { + first_dictionary_ = arr_dict; + } else if (!first_dictionary_->Equals(*arr_dict)) { // NOTE: This approach computes a new dictionary unification per chunk. // This is in effect O(n*k) where n is the total chunked array length and // k is the number of chunks (therefore O(n**2) if chunks have a fixed size). @@ -460,22 +463,21 @@ class DictionaryHashKernel : public HashKernel { // the "value_counts" kernel). if (dictionary_unifier_ == nullptr) { ARROW_ASSIGN_OR_RAISE(dictionary_unifier_, - DictionaryUnifier::Make(dictionary_->type())); - RETURN_NOT_OK(dictionary_unifier_->Unify(*dictionary_)); + DictionaryUnifier::Make(first_dictionary_->type())); + RETURN_NOT_OK(dictionary_unifier_->Unify(*first_dictionary_)); } - auto out_dict_type = dictionary_->type(); + auto out_dict_type = first_dictionary_->type(); std::shared_ptr transpose_map; - std::shared_ptr out_dict; RETURN_NOT_OK(dictionary_unifier_->Unify(*arr_dict, &transpose_map)); - RETURN_NOT_OK(dictionary_unifier_->GetResult(&out_dict_type, &out_dict)); - dictionary_ = out_dict; auto transpose = reinterpret_cast(transpose_map->data()); - auto in_dict_array = arr.ToArray(); + auto in_array = arr.ToArray(); + const auto& in_dict_array = + arrow::internal::checked_cast(*in_array); ARROW_ASSIGN_OR_RAISE( - auto tmp, arrow::internal::checked_cast(*in_dict_array) - .Transpose(arr.type->GetSharedPtr(), out_dict, transpose)); + auto tmp, in_dict_array.Transpose(arr.type->GetSharedPtr(), + in_dict_array.dictionary(), transpose)); return indices_kernel_->Append(*tmp->data()); } @@ -498,11 +500,23 @@ class DictionaryHashKernel : public HashKernel { return dictionary_value_type_; } - std::shared_ptr dictionary() const { return dictionary_; } + Result> dictionary() const { + if (!first_dictionary_) { // Append is never called + return nullptr; + } + if (!dictionary_unifier_) { // Append is called only once + return first_dictionary_; + } + + auto out_dict_type = first_dictionary_->type(); + std::shared_ptr out_dict; + RETURN_NOT_OK(dictionary_unifier_->GetResult(&out_dict_type, &out_dict)); + return out_dict; + } private: std::unique_ptr indices_kernel_; - std::shared_ptr dictionary_; + std::shared_ptr first_dictionary_; std::shared_ptr dictionary_value_type_; std::unique_ptr dictionary_unifier_; }; @@ -634,8 +648,9 @@ Status ValueCountsFinalize(KernelContext* ctx, std::vector* out) { // hence have no dictionary. Result> EnsureHashDictionary(KernelContext* ctx, DictionaryHashKernel* hash) { - if (hash->dictionary()) { - return hash->dictionary()->data(); + ARROW_ASSIGN_OR_RAISE(auto dict, hash->dictionary()); + if (dict) { + return dict->data(); } ARROW_ASSIGN_OR_RAISE(auto null, MakeArrayOfNull(hash->dictionary_value_type(), /*length=*/0, ctx->memory_pool())); From 87d7e1df696ea7c3ab65b08ea8b92055cba6961a Mon Sep 17 00:00:00 2001 From: Jin Shang Date: Sat, 23 Dec 2023 11:47:36 +0800 Subject: [PATCH 4/6] Update cpp/src/arrow/compute/kernels/vector_hash.cc Co-authored-by: Felipe Oliveira Carvalho --- cpp/src/arrow/compute/kernels/vector_hash.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/vector_hash.cc b/cpp/src/arrow/compute/kernels/vector_hash.cc index b5c290717ec..4f166510139 100644 --- a/cpp/src/arrow/compute/kernels/vector_hash.cc +++ b/cpp/src/arrow/compute/kernels/vector_hash.cc @@ -501,10 +501,10 @@ class DictionaryHashKernel : public HashKernel { } Result> dictionary() const { - if (!first_dictionary_) { // Append is never called + if (!first_dictionary_) { // Append was never called return nullptr; } - if (!dictionary_unifier_) { // Append is called only once + if (!dictionary_unifier_) { // Append was called only once return first_dictionary_; } From f14c783eb36aebe42862bf63e60afd048fb11a68 Mon Sep 17 00:00:00 2001 From: Jin Shang Date: Sat, 23 Dec 2023 11:47:49 +0800 Subject: [PATCH 5/6] Update cpp/src/arrow/compute/kernels/vector_hash.cc Co-authored-by: Felipe Oliveira Carvalho --- cpp/src/arrow/compute/kernels/vector_hash.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cpp/src/arrow/compute/kernels/vector_hash.cc b/cpp/src/arrow/compute/kernels/vector_hash.cc index 4f166510139..917f930ac5d 100644 --- a/cpp/src/arrow/compute/kernels/vector_hash.cc +++ b/cpp/src/arrow/compute/kernels/vector_hash.cc @@ -500,6 +500,8 @@ class DictionaryHashKernel : public HashKernel { return dictionary_value_type_; } + /// This can't be called more than once because DictionaryUnifier::GetResult() + /// can't be called more than once and produce the same output. Result> dictionary() const { if (!first_dictionary_) { // Append was never called return nullptr; From 8fbcbf33a14b3e2ccc7ffe69d8aaa85717d77b7a Mon Sep 17 00:00:00 2001 From: Jin Shang Date: Tue, 19 Dec 2023 15:46:18 +0800 Subject: [PATCH 6/6] lint --- cpp/src/arrow/compute/kernels/vector_hash.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/kernels/vector_hash.cc b/cpp/src/arrow/compute/kernels/vector_hash.cc index 917f930ac5d..800deba3a5e 100644 --- a/cpp/src/arrow/compute/kernels/vector_hash.cc +++ b/cpp/src/arrow/compute/kernels/vector_hash.cc @@ -501,7 +501,7 @@ class DictionaryHashKernel : public HashKernel { } /// This can't be called more than once because DictionaryUnifier::GetResult() - /// can't be called more than once and produce the same output. + /// can't be called more than once and produce the same output. Result> dictionary() const { if (!first_dictionary_) { // Append was never called return nullptr;