diff --git a/cpp/src/arrow/array/array_dict.cc b/cpp/src/arrow/array/array_dict.cc index 614c81d8f40..dc9c23cdd80 100644 --- a/cpp/src/arrow/array/array_dict.cc +++ b/cpp/src/arrow/array/array_dict.cc @@ -29,6 +29,7 @@ #include "arrow/array/dict_internal.h" #include "arrow/array/util.h" #include "arrow/buffer.h" +#include "arrow/datum.h" #include "arrow/status.h" #include "arrow/type.h" #include "arrow/type_traits.h" @@ -205,6 +206,23 @@ class DictionaryUnifierImpl : public DictionaryUnifier { return Status::OK(); } + Status GetResultWithIndexType(std::shared_ptr index_type, + std::shared_ptr* out_dict) override { + int64_t dict_length = memo_table_.size(); + if (!internal::IntegersCanFit(Datum(dict_length), *index_type).ok()) { + return Status::Invalid( + "These dictionaries cannot be combined. The unified dictionary requires a " + "larger index type."); + } + + // Build unified dictionary array + std::shared_ptr data; + RETURN_NOT_OK(DictTraits::GetDictionaryArrayData(pool_, value_type_, memo_table_, + 0 /* start_offset */, &data)); + *out_dict = MakeArray(data); + return Status::OK(); + } + private: MemoryPool* pool_; std::shared_ptr value_type_; diff --git a/cpp/src/arrow/array/concatenate.cc b/cpp/src/arrow/array/concatenate.cc index a22b28be21a..1aa33c05f7c 100644 --- a/cpp/src/arrow/array/concatenate.cc +++ b/cpp/src/arrow/array/concatenate.cc @@ -36,6 +36,7 @@ #include "arrow/util/bit_util.h" #include "arrow/util/bitmap_ops.h" #include "arrow/util/checked_cast.h" +#include "arrow/util/int_util.h" #include "arrow/util/int_util_internal.h" #include "arrow/util/logging.h" #include "arrow/visitor_inline.h" @@ -44,6 +45,7 @@ namespace arrow { using internal::SafeSignedAdd; +namespace { /// offset, length pair for representing a Range of a buffer or array struct Range { int64_t offset = -1, length = 0; @@ -66,8 +68,8 @@ struct Bitmap { }; // Allocate a buffer and concatenate bitmaps into it. -static Status ConcatenateBitmaps(const std::vector& bitmaps, MemoryPool* pool, - std::shared_ptr* out) { +Status ConcatenateBitmaps(const std::vector& bitmaps, MemoryPool* pool, + std::shared_ptr* out) { int64_t out_length = 0; for (const auto& bitmap : bitmaps) { if (internal::AddWithOverflow(out_length, bitmap.range.length, &out_length)) { @@ -94,15 +96,15 @@ static Status ConcatenateBitmaps(const std::vector& bitmaps, MemoryPool* // Write offsets in src into dst, adjusting them such that first_offset // will be the first offset written. template -static Status PutOffsets(const std::shared_ptr& src, Offset first_offset, - Offset* dst, Range* values_range); +Status PutOffsets(const std::shared_ptr& src, Offset first_offset, Offset* dst, + Range* values_range); // Concatenate buffers holding offsets into a single buffer of offsets, // also computing the ranges of values spanned by each buffer of offsets. template -static Status ConcatenateOffsets(const BufferVector& buffers, MemoryPool* pool, - std::shared_ptr* out, - std::vector* values_ranges) { +Status ConcatenateOffsets(const BufferVector& buffers, MemoryPool* pool, + std::shared_ptr* out, + std::vector* values_ranges) { values_ranges->resize(buffers.size()); // allocate output buffer @@ -130,8 +132,8 @@ static Status ConcatenateOffsets(const BufferVector& buffers, MemoryPool* pool, } template -static Status PutOffsets(const std::shared_ptr& src, Offset first_offset, - Offset* dst, Range* values_range) { +Status PutOffsets(const std::shared_ptr& src, Offset first_offset, Offset* dst, + Range* values_range) { if (src->size() == 0) { // It's allowed to have an empty offsets buffer for a 0-length array // (see Array::Validate) @@ -163,6 +165,44 @@ static Status PutOffsets(const std::shared_ptr& src, Offset first_offset return Status::OK(); } +struct DictionaryConcatenate { + DictionaryConcatenate(BufferVector& index_buffers, BufferVector& index_lookup, + MemoryPool* pool) + : out_(nullptr), + index_buffers_(index_buffers), + index_lookup_(index_lookup), + pool_(pool) {} + + template + enable_if_t::value, Status> Visit(const T& t) { + return Status::Invalid("Dictionary indices must be integral types"); + } + + template + enable_if_integer Visit(const T& index_type) { + int64_t out_length = 0; + for (const auto& buffer : index_buffers_) { + out_length += buffer->size(); + } + ARROW_ASSIGN_OR_RAISE(out_, AllocateBuffer(out_length, pool_)); + CType* out_data = reinterpret_cast(out_->mutable_data()); + for (size_t i = 0; i < index_buffers_.size(); i++) { + const auto& buffer = index_buffers_[i]; + auto size = buffer->size() / sizeof(CType); + auto old_indices = reinterpret_cast(buffer->data()); + auto indices_map = reinterpret_cast(index_lookup_[i]->data()); + internal::TransposeInts(old_indices, out_data, size, indices_map); + out_data += size; + } + return Status::OK(); + } + + std::shared_ptr out_; + const BufferVector& index_buffers_; + const BufferVector& index_lookup_; + MemoryPool* pool_; +}; + class ConcatenateImpl { public: ConcatenateImpl(const std::vector>& in, @@ -255,6 +295,21 @@ class ConcatenateImpl { return Status::OK(); } + Result UnifyDictionaries(const DictionaryType& d) { + BufferVector new_index_lookup; + ARROW_ASSIGN_OR_RAISE(auto unifier, DictionaryUnifier::Make(d.value_type())); + new_index_lookup.resize(in_.size()); + for (size_t i = 0; i < in_.size(); i++) { + auto item = in_[i]; + auto dictionary_array = MakeArray(item->dictionary); + RETURN_NOT_OK(unifier->Unify(*dictionary_array, &new_index_lookup[i])); + } + std::shared_ptr out_dictionary; + RETURN_NOT_OK(unifier->GetResultWithIndexType(d.index_type(), &out_dictionary)); + out_->dictionary = out_dictionary->data(); + return new_index_lookup; + } + Status Visit(const DictionaryType& d) { auto fixed = internal::checked_cast(d.index_type().get()); @@ -269,12 +324,16 @@ class ConcatenateImpl { } } + ARROW_ASSIGN_OR_RAISE(auto index_buffers, Buffers(1, *fixed)); if (dictionaries_same) { out_->dictionary = in_[0]->dictionary; - ARROW_ASSIGN_OR_RAISE(auto index_buffers, Buffers(1, *fixed)); return ConcatenateBuffers(index_buffers, pool_).Value(&out_->buffers[1]); } else { - return Status::NotImplemented("Concat with dictionary unification NYI"); + ARROW_ASSIGN_OR_RAISE(auto index_lookup, UnifyDictionaries(d)); + DictionaryConcatenate concatenate(index_buffers, index_lookup, pool_); + RETURN_NOT_OK(VisitTypeInline(*d.index_type(), &concatenate)); + out_->buffers[1] = std::move(concatenate.out_); + return Status::OK(); } } @@ -416,6 +475,8 @@ class ConcatenateImpl { std::shared_ptr out_; }; +} // namespace + Result> Concatenate(const ArrayVector& arrays, MemoryPool* pool) { if (arrays.size() == 0) { return Status::Invalid("Must pass at least one array"); diff --git a/cpp/src/arrow/array/concatenate_test.cc b/cpp/src/arrow/array/concatenate_test.cc index 428967889ec..c6e6e5bcc57 100644 --- a/cpp/src/arrow/array/concatenate_test.cc +++ b/cpp/src/arrow/array/concatenate_test.cc @@ -225,6 +225,133 @@ TEST_F(ConcatenateTest, DictionaryType) { }); } +TEST_F(ConcatenateTest, DictionaryTypeDifferentDictionaries) { + { + auto dict_type = dictionary(uint8(), utf8()); + auto dict_one = DictArrayFromJSON(dict_type, "[1, 2, null, 3, 0]", + "[\"A0\", \"A1\", \"A2\", \"A3\"]"); + auto dict_two = DictArrayFromJSON(dict_type, "[null, 4, 2, 1]", + "[\"B0\", \"B1\", \"B2\", \"B3\", \"B4\"]"); + auto concat_expected = DictArrayFromJSON( + dict_type, "[1, 2, null, 3, 0, null, 8, 6, 5]", + "[\"A0\", \"A1\", \"A2\", \"A3\", \"B0\", \"B1\", \"B2\", \"B3\", \"B4\"]"); + ASSERT_OK_AND_ASSIGN(auto concat_actual, Concatenate({dict_one, dict_two})); + AssertArraysEqual(*concat_expected, *concat_actual); + } + { + const int SIZE = 500; + auto dict_type = dictionary(uint16(), utf8()); + + UInt16Builder index_builder; + UInt16Builder expected_index_builder; + ASSERT_OK(index_builder.Reserve(SIZE)); + ASSERT_OK(expected_index_builder.Reserve(SIZE * 2)); + for (auto i = 0; i < SIZE; i++) { + index_builder.UnsafeAppend(i); + expected_index_builder.UnsafeAppend(i); + } + for (auto i = SIZE; i < 2 * SIZE; i++) { + expected_index_builder.UnsafeAppend(i); + } + ASSERT_OK_AND_ASSIGN(auto indices, index_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto expected_indices, expected_index_builder.Finish()); + + // Creates three dictionaries. The first maps i->"{i}" the second maps i->"{500+i}", + // each for 500 values and the third maps i->"{i}" but for 1000 values. + // The first and second concatenated should end up equaling the third. All strings + // are padded to length 8 so we can know the size ahead of time. + StringBuilder values_one_builder; + StringBuilder values_two_builder; + ASSERT_OK(values_one_builder.Resize(SIZE)); + ASSERT_OK(values_two_builder.Resize(SIZE)); + ASSERT_OK(values_one_builder.ReserveData(8 * SIZE)); + ASSERT_OK(values_two_builder.ReserveData(8 * SIZE)); + for (auto i = 0; i < SIZE; i++) { + auto i_str = std::to_string(i); + auto padded = i_str.insert(0, 8 - i_str.length(), '0'); + values_one_builder.UnsafeAppend(padded); + auto upper_i_str = std::to_string(i + SIZE); + auto upper_padded = upper_i_str.insert(0, 8 - i_str.length(), '0'); + values_two_builder.UnsafeAppend(upper_padded); + } + ASSERT_OK_AND_ASSIGN(auto dictionary_one, values_one_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto dictionary_two, values_two_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto expected_dictionary, + Concatenate({dictionary_one, dictionary_two})) + + auto one = std::make_shared(dict_type, indices, dictionary_one); + auto two = std::make_shared(dict_type, indices, dictionary_two); + auto expected = std::make_shared(dict_type, expected_indices, + expected_dictionary); + ASSERT_OK_AND_ASSIGN(auto combined, Concatenate({one, two})); + AssertArraysEqual(*combined, *expected); + } +} + +TEST_F(ConcatenateTest, DictionaryTypePartialOverlapDictionaries) { + auto dict_type = dictionary(uint8(), utf8()); + auto dict_one = DictArrayFromJSON(dict_type, "[1, 2, null, 3, 0]", + "[\"A0\", \"A1\", \"C2\", \"C3\"]"); + auto dict_two = DictArrayFromJSON(dict_type, "[null, 4, 2, 1]", + "[\"B0\", \"B1\", \"C2\", \"C3\", \"B4\"]"); + auto concat_expected = + DictArrayFromJSON(dict_type, "[1, 2, null, 3, 0, null, 6, 2, 5]", + "[\"A0\", \"A1\", \"C2\", \"C3\", \"B0\", \"B1\", \"B4\"]"); + ASSERT_OK_AND_ASSIGN(auto concat_actual, Concatenate({dict_one, dict_two})); + AssertArraysEqual(*concat_expected, *concat_actual); +} + +TEST_F(ConcatenateTest, DictionaryTypeDifferentSizeIndex) { + auto dict_type = dictionary(uint8(), utf8()); + auto bigger_dict_type = dictionary(uint16(), utf8()); + auto dict_one = DictArrayFromJSON(dict_type, "[0]", "[\"A0\"]"); + auto dict_two = DictArrayFromJSON(bigger_dict_type, "[0]", "[\"B0\"]"); + ASSERT_RAISES(Invalid, Concatenate({dict_one, dict_two}).status()); +} + +TEST_F(ConcatenateTest, DictionaryTypeCantUnifyNullInDictionary) { + auto dict_type = dictionary(uint8(), utf8()); + auto dict_one = DictArrayFromJSON(dict_type, "[0, 1]", "[null, \"A\"]"); + auto dict_two = DictArrayFromJSON(dict_type, "[0, 1]", "[null, \"B\"]"); + ASSERT_RAISES(Invalid, Concatenate({dict_one, dict_two}).status()); +} + +TEST_F(ConcatenateTest, DictionaryTypeEnlargedIndices) { + auto size = std::numeric_limits::max() + 1; + auto dict_type = dictionary(uint8(), uint16()); + + UInt8Builder index_builder; + ASSERT_OK(index_builder.Reserve(size)); + for (auto i = 0; i < size; i++) { + index_builder.UnsafeAppend(i); + } + ASSERT_OK_AND_ASSIGN(auto indices, index_builder.Finish()); + + UInt16Builder values_builder; + ASSERT_OK(values_builder.Reserve(size)); + UInt16Builder values_builder_two; + ASSERT_OK(values_builder_two.Reserve(size)); + for (auto i = 0; i < size; i++) { + values_builder.UnsafeAppend(i); + values_builder_two.UnsafeAppend(i + size); + } + ASSERT_OK_AND_ASSIGN(auto dictionary_one, values_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto dictionary_two, values_builder_two.Finish()); + + auto dict_one = std::make_shared(dict_type, indices, dictionary_one); + auto dict_two = std::make_shared(dict_type, indices, dictionary_two); + ASSERT_RAISES(Invalid, Concatenate({dict_one, dict_two}).status()); + + auto bigger_dict_type = dictionary(uint16(), uint16()); + + auto bigger_one = + std::make_shared(bigger_dict_type, dictionary_one, dictionary_one); + auto bigger_two = + std::make_shared(bigger_dict_type, dictionary_one, dictionary_two); + ASSERT_OK_AND_ASSIGN(auto combined, Concatenate({bigger_one, bigger_two})); + ASSERT_EQ(size * 2, combined->length()); +} + TEST_F(ConcatenateTest, DISABLED_UnionType) { // sparse mode Check([this](int32_t size, double null_probability, std::shared_ptr* out) { diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h index a1071c4c86e..127ed598399 100644 --- a/cpp/src/arrow/type.h +++ b/cpp/src/arrow/type.h @@ -1367,6 +1367,12 @@ class ARROW_EXPORT DictionaryUnifier { /// after this is called virtual Status GetResult(std::shared_ptr* out_type, std::shared_ptr* out_dict) = 0; + + /// \brief Return a unified dictionary with the given index type. If + /// the index type is not large enough then an invalid status will be returned. + /// The unifier cannot be used after this is called + virtual Status GetResultWithIndexType(std::shared_ptr index_type, + std::shared_ptr* out_dict) = 0; }; // ----------------------------------------------------------------------