diff --git a/cpp/src/arrow/compute/kernels/vector_hash.cc b/cpp/src/arrow/compute/kernels/vector_hash.cc
index de4d3ee3022..0ed15702832 100644
--- a/cpp/src/arrow/compute/kernels/vector_hash.cc
+++ b/cpp/src/arrow/compute/kernels/vector_hash.cc
@@ -22,6 +22,7 @@
 #include "arrow/array/array_dict.h"
 #include "arrow/array/array_nested.h"
 #include "arrow/array/builder_primitive.h"
+#include "arrow/array/concatenate.h"
 #include "arrow/array/dict_internal.h"
 #include "arrow/array/util.h"
 #include "arrow/compute/api_vector.h"
@@ -440,19 +441,35 @@ class DictionaryHashKernel : public HashKernel {
 
   Status Reset() override { return indices_kernel_->Reset(); }
 
-  Status HandleDictionary(const std::shared_ptr<ArrayData>& dict) {
+  Status Append(const ArrayData& arr) override {
     if (!dictionary_) {
-      dictionary_ = dict;
-    } else if (!MakeArray(dictionary_)->Equals(*MakeArray(dict))) {
-      return Status::Invalid(
-          "Only hashing for data with equal dictionaries "
-          "currently supported");
+      dictionary_ = arr.dictionary;
+    } else if (!MakeArray(dictionary_)->Equals(*MakeArray(arr.dictionary))) {
+      // NOTE: This approach computes a new dictionary unification per chunk.
+      // This is in effect O(n*k) where n is the total chunked array length and
+      // k is the number of chunks (therefore O(n**2) if chunks have a fixed size).
+      //
+      // A better approach may be to run the kernel over each individual chunk,
+      // and then hash-aggregate all results (for example sum-group-by for
+      // the "value_counts" kernel).
+      auto out_dict_type = dictionary_->type;
+      std::shared_ptr<Buffer> transpose_map;
+      std::shared_ptr<Array> out_dict;
+      ARROW_ASSIGN_OR_RAISE(auto unifier, DictionaryUnifier::Make(out_dict_type));
+
+      ARROW_CHECK_OK(unifier->Unify(*MakeArray(dictionary_)));
+      ARROW_CHECK_OK(unifier->Unify(*MakeArray(arr.dictionary), &transpose_map));
+      ARROW_CHECK_OK(unifier->GetResult(&out_dict_type, &out_dict));
+
+      this->dictionary_ = out_dict->data();
+      auto transpose = reinterpret_cast<const int32_t*>(transpose_map->data());
+      auto in_dict_array = MakeArray(std::make_shared<ArrayData>(arr));
+      ARROW_ASSIGN_OR_RAISE(
+          auto tmp, arrow::internal::checked_cast<const DictionaryArray&>(*in_dict_array)
+                        .Transpose(arr.type, out_dict, transpose));
+      return indices_kernel_->Append(*tmp->data());
     }
-    return Status::OK();
-  }
 
-  Status Append(const ArrayData& arr) override {
-    RETURN_NOT_OK(HandleDictionary(arr.dictionary));
     return indices_kernel_->Append(arr);
   }
 
diff --git a/cpp/src/arrow/compute/kernels/vector_hash_test.cc b/cpp/src/arrow/compute/kernels/vector_hash_test.cc
index 179792e2141..a3fa9314e60 100644
--- a/cpp/src/arrow/compute/kernels/vector_hash_test.cc
+++ b/cpp/src/arrow/compute/kernels/vector_hash_test.cc
@@ -591,12 +591,15 @@ TEST_F(TestHashKernel, DictionaryUniqueAndValueCounts) {
     CheckUnique(chunked, ex_uniques);
     CheckValueCounts(chunked, ex_uniques, ex_counts);
 
-    // Different dictionaries not supported
-    auto dict2 = ArrayFromJSON(int64(), "[30, 40, 50, 60]");
-    auto input2 = std::make_shared<DictionaryArray>(dict_ty, indices, dict2);
-    auto different_dictionaries = *ChunkedArray::Make({input, input2});
-    ASSERT_RAISES(Invalid, Unique(different_dictionaries));
-    ASSERT_RAISES(Invalid, ValueCounts(different_dictionaries));
+    // Different chunk dictionaries
+    auto input_2 = DictArrayFromJSON(dict_ty, "[1, null, 2, 3]", "[30, 40, 50, 60]");
+    auto ex_uniques_2 =
+        DictArrayFromJSON(dict_ty, "[3, 0, 1, null, 4, 5]", "[10, 20, 30, 40, 50, 60]");
+    auto ex_counts_2 = ArrayFromJSON(int64(), "[4, 5, 4, 1, 1, 1]");
+    auto different_dictionaries = *ChunkedArray::Make({input, input_2}, dict_ty);
+
+    CheckUnique(different_dictionaries, ex_uniques_2);
+    CheckValueCounts(different_dictionaries, ex_uniques_2, ex_counts_2);
 
     // Dictionary with encoded nulls
     auto dict_with_null = ArrayFromJSON(int64(), "[10, null, 30, 40]");
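
A minimal usage sketch (not part of the patch) of the behavior this diff enables: Unique and ValueCounts over a chunked dictionary array whose chunks carry different dictionaries, which previously returned Status::Invalid. The function name and chunk contents here are made up for illustration, and it assumes a build with Arrow's test utilities available, since DictArrayFromJSON lives in "arrow/testing/gtest_util.h".

    #include <iostream>

    #include "arrow/api.h"
    #include "arrow/compute/api.h"
    #include "arrow/testing/gtest_util.h"

    arrow::Status UniqueAcrossDifferingDictionaries() {
      // Both chunks share the dictionary *type*; only the dictionary values differ.
      auto dict_ty = arrow::dictionary(arrow::int8(), arrow::int64());

      auto chunk1 = arrow::DictArrayFromJSON(dict_ty, "[0, 1, 2, 3]", "[10, 20, 30, 40]");
      auto chunk2 = arrow::DictArrayFromJSON(dict_ty, "[1, null, 2, 3]", "[30, 40, 50, 60]");

      ARROW_ASSIGN_OR_RAISE(auto chunked,
                            arrow::ChunkedArray::Make({chunk1, chunk2}, dict_ty));

      // With this patch the hash kernels unify the per-chunk dictionaries internally
      // instead of raising Invalid.
      ARROW_ASSIGN_OR_RAISE(auto uniques, arrow::compute::Unique(chunked));
      ARROW_ASSIGN_OR_RAISE(auto counts, arrow::compute::ValueCounts(chunked));

      // uniques is a dictionary array over the unified dictionary; counts is a
      // struct array of {values, counts}.
      std::cout << uniques->ToString() << std::endl;
      std::cout << counts->ToString() << std::endl;
      return arrow::Status::OK();
    }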