37 changes: 27 additions & 10 deletions cpp/src/arrow/compute/kernels/vector_hash.cc
@@ -22,6 +22,7 @@
 #include "arrow/array/array_dict.h"
 #include "arrow/array/array_nested.h"
 #include "arrow/array/builder_primitive.h"
+#include "arrow/array/concatenate.h"
 #include "arrow/array/dict_internal.h"
 #include "arrow/array/util.h"
 #include "arrow/compute/api_vector.h"
@@ -440,19 +441,35 @@ class DictionaryHashKernel : public HashKernel {
 
   Status Reset() override { return indices_kernel_->Reset(); }
 
-  Status HandleDictionary(const std::shared_ptr<ArrayData>& dict) {
+  Status Append(const ArrayData& arr) override {
     if (!dictionary_) {
-      dictionary_ = dict;
-    } else if (!MakeArray(dictionary_)->Equals(*MakeArray(dict))) {
-      return Status::Invalid(
-          "Only hashing for data with equal dictionaries "
-          "currently supported");
+      dictionary_ = arr.dictionary;
+    } else if (!MakeArray(dictionary_)->Equals(*MakeArray(arr.dictionary))) {
+      // NOTE: This approach computes a new dictionary unification per chunk.
+      // This is in effect O(n*k) where n is the total chunked array length and
+      // k is the number of chunks (therefore O(n**2) if chunks have a fixed size).
+      //
+      // A better approach may be to run the kernel over each individual chunk,
+      // and then hash-aggregate all results (for example sum-group-by for
+      // the "value_counts" kernel).
+      auto out_dict_type = dictionary_->type;
+      std::shared_ptr<Buffer> transpose_map;
+      std::shared_ptr<Array> out_dict;
+      ARROW_ASSIGN_OR_RAISE(auto unifier, DictionaryUnifier::Make(out_dict_type));
+
+      ARROW_CHECK_OK(unifier->Unify(*MakeArray(dictionary_)));
+      ARROW_CHECK_OK(unifier->Unify(*MakeArray(arr.dictionary), &transpose_map));
+      ARROW_CHECK_OK(unifier->GetResult(&out_dict_type, &out_dict));
+
+      this->dictionary_ = out_dict->data();
+      auto transpose = reinterpret_cast<const int32_t*>(transpose_map->data());
+      auto in_dict_array = MakeArray(std::make_shared<ArrayData>(arr));
+      ARROW_ASSIGN_OR_RAISE(
+          auto tmp, arrow::internal::checked_cast<const DictionaryArray&>(*in_dict_array)
+                        .Transpose(arr.type, out_dict, transpose));
+      return indices_kernel_->Append(*tmp->data());
     }
-    return Status::OK();
-  }
 
-  Status Append(const ArrayData& arr) override {
-    RETURN_NOT_OK(HandleDictionary(arr.dictionary));
     return indices_kernel_->Append(arr);
   }

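The unification step in the new Append() can be exercised in isolation. The following standalone sketch is not part of this patch (the name UnifyExample and the sample values are illustrative); it shows how DictionaryUnifier merges two chunk dictionaries and how the resulting transpose map rewrites the second chunk's indices into the unified dictionary:

// Standalone sketch: unify two chunk dictionaries and inspect the transpose map.
// Assumes Arrow C++ is installed; error handling is abbreviated.
#include <iostream>
#include <memory>

#include "arrow/api.h"

arrow::Status UnifyExample() {
  // Two per-chunk dictionaries: [10, 20, 30] and [30, 40].
  arrow::Int64Builder b1, b2;
  ARROW_RETURN_NOT_OK(b1.AppendValues({10, 20, 30}));
  ARROW_RETURN_NOT_OK(b2.AppendValues({30, 40}));
  ARROW_ASSIGN_OR_RAISE(auto dict1, b1.Finish());
  ARROW_ASSIGN_OR_RAISE(auto dict2, b2.Finish());

  // Absorb the first dictionary, then the second one with a transpose map.
  ARROW_ASSIGN_OR_RAISE(auto unifier, arrow::DictionaryUnifier::Make(arrow::int64()));
  ARROW_RETURN_NOT_OK(unifier->Unify(*dict1));
  std::shared_ptr<arrow::Buffer> transpose_map;
  ARROW_RETURN_NOT_OK(unifier->Unify(*dict2, &transpose_map));

  std::shared_ptr<arrow::DataType> out_type;
  std::shared_ptr<arrow::Array> out_dict;
  ARROW_RETURN_NOT_OK(unifier->GetResult(&out_type, &out_dict));

  // Unified dictionary is [10, 20, 30, 40]; dict2's transpose map is [2, 3],
  // i.e. index 0 (value 30) maps to 2 and index 1 (value 40) maps to 3.
  std::cout << "unified dictionary: " << out_dict->ToString() << std::endl;
  const auto* transpose = reinterpret_cast<const int32_t*>(transpose_map->data());
  std::cout << "transpose: [" << transpose[0] << ", " << transpose[1] << "]" << std::endl;
  return arrow::Status::OK();
}

int main() {
  auto st = UnifyExample();
  if (!st.ok()) {
    std::cerr << st.ToString() << std::endl;
    return 1;
  }
  return 0;
}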
15 changes: 9 additions & 6 deletions cpp/src/arrow/compute/kernels/vector_hash_test.cc
@@ -591,12 +591,15 @@ TEST_F(TestHashKernel, DictionaryUniqueAndValueCounts) {
   CheckUnique(chunked, ex_uniques);
   CheckValueCounts(chunked, ex_uniques, ex_counts);
 
-  // Different dictionaries not supported
-  auto dict2 = ArrayFromJSON(int64(), "[30, 40, 50, 60]");
-  auto input2 = std::make_shared<DictionaryArray>(dict_ty, indices, dict2);
-  auto different_dictionaries = *ChunkedArray::Make({input, input2});
-  ASSERT_RAISES(Invalid, Unique(different_dictionaries));
-  ASSERT_RAISES(Invalid, ValueCounts(different_dictionaries));
+  // Different chunk dictionaries
+  auto input_2 = DictArrayFromJSON(dict_ty, "[1, null, 2, 3]", "[30, 40, 50, 60]");
+  auto ex_uniques_2 =
+      DictArrayFromJSON(dict_ty, "[3, 0, 1, null, 4, 5]", "[10, 20, 30, 40, 50, 60]");
+  auto ex_counts_2 = ArrayFromJSON(int64(), "[4, 5, 4, 1, 1, 1]");
+  auto different_dictionaries = *ChunkedArray::Make({input, input_2}, dict_ty);
+
+  CheckUnique(different_dictionaries, ex_uniques_2);
+  CheckValueCounts(different_dictionaries, ex_uniques_2, ex_counts_2);
 
   // Dictionary with encoded nulls
   auto dict_with_null = ArrayFromJSON(int64(), "[10, null, 30, 40]");
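For reference, a hypothetical end-to-end sketch (not taken from this patch; the builder setup, sample values, and function names are illustrative) of what the change enables at the user level: calling Unique and ValueCounts on a chunked dictionary array whose chunks carry different dictionaries, which previously returned Status::Invalid:

// Hypothetical end-to-end sketch: Unique/ValueCounts over a chunked dictionary
// array whose chunks carry different dictionaries. Error handling is abbreviated.
#include <iostream>

#include "arrow/api.h"
#include "arrow/compute/api_vector.h"

arrow::Status RunExample() {
  auto dict_ty = arrow::dictionary(arrow::int32(), arrow::int64());

  // Both chunks use indices [0, 1], but over different dictionaries.
  arrow::Int32Builder idx_builder;
  ARROW_RETURN_NOT_OK(idx_builder.AppendValues({0, 1}));
  ARROW_ASSIGN_OR_RAISE(auto indices, idx_builder.Finish());

  arrow::Int64Builder d1_builder, d2_builder;
  ARROW_RETURN_NOT_OK(d1_builder.AppendValues({10, 20}));
  ARROW_RETURN_NOT_OK(d2_builder.AppendValues({20, 30}));
  ARROW_ASSIGN_OR_RAISE(auto dict1, d1_builder.Finish());
  ARROW_ASSIGN_OR_RAISE(auto dict2, d2_builder.Finish());

  ARROW_ASSIGN_OR_RAISE(auto chunk1,
                        arrow::DictionaryArray::FromArrays(dict_ty, indices, dict1));
  ARROW_ASSIGN_OR_RAISE(auto chunk2,
                        arrow::DictionaryArray::FromArrays(dict_ty, indices, dict2));
  ARROW_ASSIGN_OR_RAISE(auto chunked, arrow::ChunkedArray::Make({chunk1, chunk2}));

  // Before this patch both calls raised Status::Invalid; now the per-chunk
  // dictionaries are unified internally.
  ARROW_ASSIGN_OR_RAISE(auto uniques, arrow::compute::Unique(chunked));
  ARROW_ASSIGN_OR_RAISE(auto counts, arrow::compute::ValueCounts(chunked));
  std::cout << uniques->ToString() << std::endl << counts->ToString() << std::endl;
  return arrow::Status::OK();
}

int main() { return RunExample().ok() ? 0 : 1; }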