18 changes: 18 additions & 0 deletions cpp/src/arrow/array/array_dict.cc
@@ -29,6 +29,7 @@
#include "arrow/array/dict_internal.h"
#include "arrow/array/util.h"
#include "arrow/buffer.h"
#include "arrow/datum.h"
#include "arrow/status.h"
#include "arrow/type.h"
#include "arrow/type_traits.h"
@@ -205,6 +206,23 @@ class DictionaryUnifierImpl : public DictionaryUnifier {
return Status::OK();
}

Status GetResultWithIndexType(std::shared_ptr<DataType> index_type,
std::shared_ptr<Array>* out_dict) override {
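// The unified dictionary can be larger than any single input dictionary,
// so check that its total length still fits in the requested index type.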
int64_t dict_length = memo_table_.size();
if (!internal::IntegersCanFit(Datum(dict_length), *index_type).ok()) {
return Status::Invalid(
"These dictionaries cannot be combined. The unified dictionary requires a "
"larger index type.");
}

// Build unified dictionary array
std::shared_ptr<ArrayData> data;
RETURN_NOT_OK(DictTraits::GetDictionaryArrayData(pool_, value_type_, memo_table_,
0 /* start_offset */, &data));
*out_dict = MakeArray(data);
return Status::OK();
}

private:
MemoryPool* pool_;
std::shared_ptr<DataType> value_type_;
83 changes: 72 additions & 11 deletions cpp/src/arrow/array/concatenate.cc
@@ -36,6 +36,7 @@
#include "arrow/util/bit_util.h"
#include "arrow/util/bitmap_ops.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/int_util.h"
#include "arrow/util/int_util_internal.h"
#include "arrow/util/logging.h"
#include "arrow/visitor_inline.h"
@@ -44,6 +45,7 @@ namespace arrow {

using internal::SafeSignedAdd;

namespace {
/// offset, length pair for representing a Range of a buffer or array
struct Range {
int64_t offset = -1, length = 0;
@@ -66,8 +68,8 @@ struct Bitmap {
};

// Allocate a buffer and concatenate bitmaps into it.
-static Status ConcatenateBitmaps(const std::vector<Bitmap>& bitmaps, MemoryPool* pool,
-                                 std::shared_ptr<Buffer>* out) {
+Status ConcatenateBitmaps(const std::vector<Bitmap>& bitmaps, MemoryPool* pool,
+                          std::shared_ptr<Buffer>* out) {
int64_t out_length = 0;
for (const auto& bitmap : bitmaps) {
if (internal::AddWithOverflow(out_length, bitmap.range.length, &out_length)) {
@@ -94,15 +96,15 @@ static Status ConcatenateBitmaps(const std::vector<Bitmap>& bitmaps, MemoryPool*
// Write offsets in src into dst, adjusting them such that first_offset
// will be the first offset written.
template <typename Offset>
-static Status PutOffsets(const std::shared_ptr<Buffer>& src, Offset first_offset,
-                         Offset* dst, Range* values_range);
+Status PutOffsets(const std::shared_ptr<Buffer>& src, Offset first_offset, Offset* dst,
+                  Range* values_range);

// Concatenate buffers holding offsets into a single buffer of offsets,
// also computing the ranges of values spanned by each buffer of offsets.
template <typename Offset>
-static Status ConcatenateOffsets(const BufferVector& buffers, MemoryPool* pool,
-                                 std::shared_ptr<Buffer>* out,
-                                 std::vector<Range>* values_ranges) {
+Status ConcatenateOffsets(const BufferVector& buffers, MemoryPool* pool,
+                          std::shared_ptr<Buffer>* out,
+                          std::vector<Range>* values_ranges) {
values_ranges->resize(buffers.size());

// allocate output buffer
@@ -130,8 +132,8 @@ static Status ConcatenateOffsets(const BufferVector& buffers, MemoryPool* pool,
}

template <typename Offset>
-static Status PutOffsets(const std::shared_ptr<Buffer>& src, Offset first_offset,
-                         Offset* dst, Range* values_range) {
+Status PutOffsets(const std::shared_ptr<Buffer>& src, Offset first_offset, Offset* dst,
+                  Range* values_range) {
if (src->size() == 0) {
// It's allowed to have an empty offsets buffer for a 0-length array
// (see Array::Validate)
@@ -163,6 +165,44 @@ static Status PutOffsets(const std::shared_ptr<Buffer>& src, Offset first_offset
return Status::OK();
}

Review comment (Member Author): The rest of the concatenation functions simply
memcpy'd the buffers. However, the dictionary concatenation needs to map
buffers to potentially new index values. As a result, this function needs to
know the type of the buffer for the reinterpret case on line 190. Also, the
fact that memo table indices are 32-bit and dictionary indices could be 64-bit
is a potential problem, but one that already existed; it seems unlikely that a
dictionary array would be used when there are 4B unique values.

Review comment (Member): While we are at it, can all non-public
functions/classes in this module be put in the anonymous namespace? This
reduces the number of exported symbols and can also open more optimization
opportunities for the compiler.

Review comment (Member Author): Done.

struct DictionaryConcatenate {

DictionaryConcatenate(const BufferVector& index_buffers,
const BufferVector& index_lookup, MemoryPool* pool)
: out_(nullptr),
index_buffers_(index_buffers),
index_lookup_(index_lookup),
pool_(pool) {}

template <typename T>
enable_if_t<!is_integer_type<T>::value, Status> Visit(const T& t) {
return Status::Invalid("Dictionary indices must be integral types");
}

template <typename T, typename CType = typename T::c_type>
enable_if_integer<T, Status> Visit(const T& index_type) {
int64_t out_length = 0;
for (const auto& buffer : index_buffers_) {
out_length += buffer->size();
}
ARROW_ASSIGN_OR_RAISE(out_, AllocateBuffer(out_length, pool_));
CType* out_data = reinterpret_cast<CType*>(out_->mutable_data());
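// Copy each input index buffer into the output, rewriting every index
// through that buffer's transpose map into the unified dictionary.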
for (size_t i = 0; i < index_buffers_.size(); i++) {
const auto& buffer = index_buffers_[i];
auto size = buffer->size() / sizeof(CType);
auto old_indices = reinterpret_cast<const CType*>(buffer->data());
auto indices_map = reinterpret_cast<const int32_t*>(index_lookup_[i]->data());
internal::TransposeInts(old_indices, out_data, size, indices_map);
out_data += size;
}
return Status::OK();
}

std::shared_ptr<Buffer> out_;
const BufferVector& index_buffers_;
const BufferVector& index_lookup_;
MemoryPool* pool_;
};
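
A minimal sketch of the remapping step that DictionaryConcatenate delegates to
internal::TransposeInts. The helper below is hypothetical and for illustration
only; the real routine lives in arrow/util/int_util.h. Note the transpose map
is int32_t regardless of the index width, which is the 32-bit memo-table
limitation mentioned in the review thread above.

template <typename IndexCType>
void TransposeIndicesSketch(const IndexCType* src, IndexCType* dest,
                            int64_t length, const int32_t* transpose_map) {
  for (int64_t i = 0; i < length; ++i) {
    // transpose_map[old_index] is the position of the same dictionary
    // value in the unified dictionary.
    dest[i] = static_cast<IndexCType>(transpose_map[src[i]]);
  }
}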

class ConcatenateImpl {
public:
ConcatenateImpl(const std::vector<std::shared_ptr<const ArrayData>>& in,
@@ -255,6 +295,21 @@ class ConcatenateImpl {
return Status::OK();
}

Result<BufferVector> UnifyDictionaries(const DictionaryType& d) {
BufferVector new_index_lookup;
ARROW_ASSIGN_OR_RAISE(auto unifier, DictionaryUnifier::Make(d.value_type()));
new_index_lookup.resize(in_.size());
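// Each Unify() call folds this child's dictionary into the unifier's memo
// table and yields a transpose buffer mapping old indices to unified ones.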
for (size_t i = 0; i < in_.size(); i++) {
auto item = in_[i];
auto dictionary_array = MakeArray(item->dictionary);
RETURN_NOT_OK(unifier->Unify(*dictionary_array, &new_index_lookup[i]));
}
std::shared_ptr<Array> out_dictionary;
RETURN_NOT_OK(unifier->GetResultWithIndexType(d.index_type(), &out_dictionary));
out_->dictionary = out_dictionary->data();
return new_index_lookup;
}

Status Visit(const DictionaryType& d) {
auto fixed = internal::checked_cast<const FixedWidthType*>(d.index_type().get());

@@ -269,12 +324,16 @@
}
}

+ARROW_ASSIGN_OR_RAISE(auto index_buffers, Buffers(1, *fixed));
if (dictionaries_same) {
out_->dictionary = in_[0]->dictionary;
-ARROW_ASSIGN_OR_RAISE(auto index_buffers, Buffers(1, *fixed));
return ConcatenateBuffers(index_buffers, pool_).Value(&out_->buffers[1]);
} else {
-return Status::NotImplemented("Concat with dictionary unification NYI");
+ARROW_ASSIGN_OR_RAISE(auto index_lookup, UnifyDictionaries(d));
+DictionaryConcatenate concatenate(index_buffers, index_lookup, pool_);
+RETURN_NOT_OK(VisitTypeInline(*d.index_type(), &concatenate));
+out_->buffers[1] = std::move(concatenate.out_);
+return Status::OK();
}
}
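
For reference, a short usage sketch of the new unification path. It is hedged:
arrays are built with the DictArrayFromJSON helper used in the tests below,
whereas application code would construct DictionaryArray instances directly.

auto type = dictionary(uint8(), utf8());
auto a = DictArrayFromJSON(type, "[0, 1]", "[\"foo\", \"bar\"]");
auto b = DictArrayFromJSON(type, "[0, 1]", "[\"bar\", \"baz\"]");
// Previously this returned Status::NotImplemented. Now b's indices are
// transposed into the unified dictionary ["foo", "bar", "baz"], so the
// concatenated indices become [0, 1, 1, 2].
ASSERT_OK_AND_ASSIGN(auto combined, Concatenate({a, b}));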

@@ -416,6 +475,8 @@ class ConcatenateImpl {
std::shared_ptr<ArrayData> out_;
};

} // namespace

Result<std::shared_ptr<Array>> Concatenate(const ArrayVector& arrays, MemoryPool* pool) {
if (arrays.size() == 0) {
return Status::Invalid("Must pass at least one array");
127 changes: 127 additions & 0 deletions cpp/src/arrow/array/concatenate_test.cc
@@ -225,6 +225,133 @@ TEST_F(ConcatenateTest, DictionaryType) {
});
}

TEST_F(ConcatenateTest, DictionaryTypeDifferentDictionaries) {
{
auto dict_type = dictionary(uint8(), utf8());
auto dict_one = DictArrayFromJSON(dict_type, "[1, 2, null, 3, 0]",
"[\"A0\", \"A1\", \"A2\", \"A3\"]");
auto dict_two = DictArrayFromJSON(dict_type, "[null, 4, 2, 1]",
"[\"B0\", \"B1\", \"B2\", \"B3\", \"B4\"]");
auto concat_expected = DictArrayFromJSON(
dict_type, "[1, 2, null, 3, 0, null, 8, 6, 5]",
"[\"A0\", \"A1\", \"A2\", \"A3\", \"B0\", \"B1\", \"B2\", \"B3\", \"B4\"]");
ASSERT_OK_AND_ASSIGN(auto concat_actual, Concatenate({dict_one, dict_two}));
AssertArraysEqual(*concat_expected, *concat_actual);
}
{
const int SIZE = 500;
auto dict_type = dictionary(uint16(), utf8());

UInt16Builder index_builder;
UInt16Builder expected_index_builder;
ASSERT_OK(index_builder.Reserve(SIZE));
ASSERT_OK(expected_index_builder.Reserve(SIZE * 2));
for (auto i = 0; i < SIZE; i++) {
index_builder.UnsafeAppend(i);
expected_index_builder.UnsafeAppend(i);
}
for (auto i = SIZE; i < 2 * SIZE; i++) {
expected_index_builder.UnsafeAppend(i);
}
ASSERT_OK_AND_ASSIGN(auto indices, index_builder.Finish());
ASSERT_OK_AND_ASSIGN(auto expected_indices, expected_index_builder.Finish());

// Creates three dictionaries. The first maps i->"{i}", the second maps
// i->"{500+i}", each for 500 values, and the third maps i->"{i}" for 1000
// values. The first and second concatenated should equal the third. All
// strings are padded to length 8 so we know the size ahead of time.
StringBuilder values_one_builder;
StringBuilder values_two_builder;
ASSERT_OK(values_one_builder.Resize(SIZE));
ASSERT_OK(values_two_builder.Resize(SIZE));
ASSERT_OK(values_one_builder.ReserveData(8 * SIZE));
ASSERT_OK(values_two_builder.ReserveData(8 * SIZE));
for (auto i = 0; i < SIZE; i++) {
auto i_str = std::to_string(i);
auto padded = i_str.insert(0, 8 - i_str.length(), '0');
values_one_builder.UnsafeAppend(padded);
auto upper_i_str = std::to_string(i + SIZE);
auto upper_padded = upper_i_str.insert(0, 8 - upper_i_str.length(), '0');
values_two_builder.UnsafeAppend(upper_padded);
}
ASSERT_OK_AND_ASSIGN(auto dictionary_one, values_one_builder.Finish());
ASSERT_OK_AND_ASSIGN(auto dictionary_two, values_two_builder.Finish());
ASSERT_OK_AND_ASSIGN(auto expected_dictionary,
Concatenate({dictionary_one, dictionary_two}));

auto one = std::make_shared<DictionaryArray>(dict_type, indices, dictionary_one);
auto two = std::make_shared<DictionaryArray>(dict_type, indices, dictionary_two);
auto expected = std::make_shared<DictionaryArray>(dict_type, expected_indices,
expected_dictionary);
ASSERT_OK_AND_ASSIGN(auto combined, Concatenate({one, two}));
AssertArraysEqual(*expected, *combined);
}
}

TEST_F(ConcatenateTest, DictionaryTypePartialOverlapDictionaries) {
auto dict_type = dictionary(uint8(), utf8());
auto dict_one = DictArrayFromJSON(dict_type, "[1, 2, null, 3, 0]",
"[\"A0\", \"A1\", \"C2\", \"C3\"]");
auto dict_two = DictArrayFromJSON(dict_type, "[null, 4, 2, 1]",
"[\"B0\", \"B1\", \"C2\", \"C3\", \"B4\"]");
auto concat_expected =
DictArrayFromJSON(dict_type, "[1, 2, null, 3, 0, null, 6, 2, 5]",
"[\"A0\", \"A1\", \"C2\", \"C3\", \"B0\", \"B1\", \"B4\"]");
ASSERT_OK_AND_ASSIGN(auto concat_actual, Concatenate({dict_one, dict_two}));
AssertArraysEqual(*concat_expected, *concat_actual);
}

TEST_F(ConcatenateTest, DictionaryTypeDifferentSizeIndex) {
auto dict_type = dictionary(uint8(), utf8());
auto bigger_dict_type = dictionary(uint16(), utf8());
auto dict_one = DictArrayFromJSON(dict_type, "[0]", "[\"A0\"]");
auto dict_two = DictArrayFromJSON(bigger_dict_type, "[0]", "[\"B0\"]");
ASSERT_RAISES(Invalid, Concatenate({dict_one, dict_two}).status());
}

TEST_F(ConcatenateTest, DictionaryTypeCantUnifyNullInDictionary) {
auto dict_type = dictionary(uint8(), utf8());
auto dict_one = DictArrayFromJSON(dict_type, "[0, 1]", "[null, \"A\"]");
auto dict_two = DictArrayFromJSON(dict_type, "[0, 1]", "[null, \"B\"]");
ASSERT_RAISES(Invalid, Concatenate({dict_one, dict_two}).status());
}

TEST_F(ConcatenateTest, DictionaryTypeEnlargedIndices) {
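// Each input dictionary holds 256 distinct values, so the unified dictionary
// would need 512 entries -- more than uint8 indices can address.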
auto size = std::numeric_limits<uint8_t>::max() + 1;
auto dict_type = dictionary(uint8(), uint16());

UInt8Builder index_builder;
ASSERT_OK(index_builder.Reserve(size));
for (auto i = 0; i < size; i++) {
index_builder.UnsafeAppend(i);
}
ASSERT_OK_AND_ASSIGN(auto indices, index_builder.Finish());

UInt16Builder values_builder;
ASSERT_OK(values_builder.Reserve(size));
UInt16Builder values_builder_two;
ASSERT_OK(values_builder_two.Reserve(size));
for (auto i = 0; i < size; i++) {
values_builder.UnsafeAppend(i);
values_builder_two.UnsafeAppend(i + size);
}
ASSERT_OK_AND_ASSIGN(auto dictionary_one, values_builder.Finish());
ASSERT_OK_AND_ASSIGN(auto dictionary_two, values_builder_two.Finish());

auto dict_one = std::make_shared<DictionaryArray>(dict_type, indices, dictionary_one);
auto dict_two = std::make_shared<DictionaryArray>(dict_type, indices, dictionary_two);
ASSERT_RAISES(Invalid, Concatenate({dict_one, dict_two}).status());

auto bigger_dict_type = dictionary(uint16(), uint16());

auto bigger_one =
std::make_shared<DictionaryArray>(bigger_dict_type, dictionary_one, dictionary_one);
auto bigger_two =
std::make_shared<DictionaryArray>(bigger_dict_type, dictionary_one, dictionary_two);
ASSERT_OK_AND_ASSIGN(auto combined, Concatenate({bigger_one, bigger_two}));
ASSERT_EQ(size * 2, combined->length());
}

TEST_F(ConcatenateTest, DISABLED_UnionType) {
// sparse mode
Check([this](int32_t size, double null_probability, std::shared_ptr<Array>* out) {
6 changes: 6 additions & 0 deletions cpp/src/arrow/type.h
@@ -1367,6 +1367,12 @@ class ARROW_EXPORT DictionaryUnifier {
/// after this is called
virtual Status GetResult(std::shared_ptr<DataType>* out_type,
std::shared_ptr<Array>* out_dict) = 0;

/// \brief Return a unified dictionary with the given index type. If
/// the index type is not large enough, an Invalid status is returned.
/// The unifier cannot be used after this is called.
virtual Status GetResultWithIndexType(std::shared_ptr<DataType> index_type,
std::shared_ptr<Array>* out_dict) = 0;
};

Review comment (Member Author): This could be an overload, but the behavior
was different enough that I felt it warranted its own name. GetResult is not
actually used anywhere in the code base, but DictionaryUnifier is an exported
type.

Review comment (Member): Isn't the docstring a bit off? It doesn't seem a
DictionaryType is returned.

Review comment (Member Author): Correct, fixed.
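
A short usage sketch of the new method, consistent with its use in
ConcatenateImpl::UnifyDictionaries; dict_a and dict_b stand in for
std::shared_ptr<Array> dictionary values and are assumptions of this example.

ARROW_ASSIGN_OR_RAISE(auto unifier, DictionaryUnifier::Make(utf8()));
std::shared_ptr<Buffer> transpose_a, transpose_b;
RETURN_NOT_OK(unifier->Unify(*dict_a, &transpose_a));
RETURN_NOT_OK(unifier->Unify(*dict_b, &transpose_b));
std::shared_ptr<Array> unified;
// Returns Status::Invalid if the unified dictionary cannot be indexed by
// int8; the unifier must not be used again after this call.
RETURN_NOT_OK(unifier->GetResultWithIndexType(int8(), &unified));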

// ----------------------------------------------------------------------