Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,8 @@ arrow_add_pkg_config("arrow")
add_arrow_test(allocator-test)

if(WIN32)
# XXX This bogus special case because of MinGW
# see https://github.com/apache/arrow/pull/3693
add_arrow_test(array-test
SOURCES
array-test.cc
Expand Down
31 changes: 28 additions & 3 deletions cpp/src/arrow/array-dict-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,27 +98,52 @@ TYPED_TEST(TestDictionaryBuilder, ArrayInit) {
}

TYPED_TEST(TestDictionaryBuilder, MakeBuilder) {
// Explicit dictionary values are provided
auto dict_array = ArrayFromJSON(std::make_shared<TypeParam>(), "[1, 2]");
auto dict_type = dictionary(int8(), dict_array);
std::unique_ptr<ArrayBuilder> boxed_builder;
ASSERT_OK(MakeBuilder(default_memory_pool(), dict_type, &boxed_builder));
auto& builder = checked_cast<DictionaryBuilder<TypeParam>&>(*boxed_builder);

ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(1)));
ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(2)));
ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(2)));
ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(1)));
ASSERT_OK(builder.AppendNull());

ASSERT_EQ(builder.length(), 4);
ASSERT_EQ(builder.null_count(), 1);
std::shared_ptr<Array> result;
ASSERT_OK(builder.Finish(&result));

// Build expected data
auto int_array = ArrayFromJSON(int8(), "[1, 1, 0, null]");
DictionaryArray expected(dict_type, int_array);

AssertArraysEqual(expected, *result);
}

TYPED_TEST(TestDictionaryBuilder, MakeBuilderFromIncompleteDictType) {
// Dictionary values are inferred as an IncompleteDictionaryType is passed
auto value_type = std::make_shared<TypeParam>();
auto dict_type = incomplete_dictionary(int8(), value_type);
std::unique_ptr<ArrayBuilder> boxed_builder;
ASSERT_OK(MakeBuilder(default_memory_pool(), dict_type, &boxed_builder));
auto& builder = checked_cast<DictionaryBuilder<TypeParam>&>(*boxed_builder);

ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(2)));
ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(2)));
ASSERT_OK(builder.Append(static_cast<typename TypeParam::c_type>(1)));
ASSERT_OK(builder.AppendNull());

ASSERT_EQ(builder.length(), 4);
ASSERT_EQ(builder.null_count(), 1);
std::shared_ptr<Array> result;
ASSERT_OK(builder.Finish(&result));

auto int_array = ArrayFromJSON(int8(), "[0, 1, 0, null]");
DictionaryArray expected(dict_type, int_array);
// Build expected data
auto int_array = ArrayFromJSON(int8(), "[0, 0, 1, null]");
auto actual_dict_type = dictionary(int8(), ArrayFromJSON(value_type, "[2, 1]"));
DictionaryArray expected(actual_dict_type, int_array);

AssertArraysEqual(expected, *result);
}
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/arrow/array.cc
Original file line number Diff line number Diff line change
Expand Up @@ -938,6 +938,10 @@ class ArrayDataWrapper {
return Status::OK();
}

Status Visit(const IncompleteDictionaryType& type) {
return Status::TypeError("Cannot create array of type '", type.ToString(), "'");
}

Status Visit(const ExtensionType& type) {
*out_ = type.MakeArray(data_);
return Status::OK();
Expand Down
41 changes: 31 additions & 10 deletions cpp/src/arrow/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include "arrow/status.h"
#include "arrow/type.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/hashing.h"
#include "arrow/visitor_inline.h"

namespace arrow {
Expand All @@ -35,11 +34,21 @@ class MemoryPool;
// ----------------------------------------------------------------------
// Helper functions

#define BUILDER_CASE(ENUM, BuilderType) \
case Type::ENUM: \
out->reset(new BuilderType(type, pool)); \
return Status::OK();
template <typename BuilderType>
static Status CreateDictBuilder(const DictionaryType& dict_type, MemoryPool* pool,
std::unique_ptr<ArrayBuilder>* out) {
out->reset(new BuilderType(dict_type.dictionary(), pool));
return Status::OK();
}

template <typename BuilderType>
static Status CreateDictBuilder(const IncompleteDictionaryType& dict_type,
MemoryPool* pool, std::unique_ptr<ArrayBuilder>* out) {
out->reset(new BuilderType(dict_type.value_type(), pool));
return Status::OK();
}

template <typename DICT_TYPE>
struct DictionaryBuilderCase {
template <typename ValueType>
Status Visit(const ValueType&, typename ValueType::c_type* = nullptr) {
Expand All @@ -65,12 +74,11 @@ struct DictionaryBuilderCase {

template <typename BuilderType>
Status Create() {
out->reset(new BuilderType(dict_type.dictionary(), pool));
return Status::OK();
return CreateDictBuilder<BuilderType>(dict_type, pool, out);
}

MemoryPool* pool;
const DictionaryType& dict_type;
const DICT_TYPE& dict_type;
std::unique_ptr<ArrayBuilder>* out;
};

Expand All @@ -80,6 +88,11 @@ struct DictionaryBuilderCase {
// TODO(wesm): come up with a less monolithic strategy
Status MakeBuilder(MemoryPool* pool, const std::shared_ptr<DataType>& type,
std::unique_ptr<ArrayBuilder>* out) {
#define BUILDER_CASE(ENUM, BuilderType) \
case Type::ENUM: \
out->reset(new BuilderType(type, pool)); \
return Status::OK();

switch (type->id()) {
case Type::NA: {
out->reset(new NullBuilder(pool));
Expand Down Expand Up @@ -107,10 +120,16 @@ Status MakeBuilder(MemoryPool* pool, const std::shared_ptr<DataType>& type,
BUILDER_CASE(FIXED_SIZE_BINARY, FixedSizeBinaryBuilder);
BUILDER_CASE(DECIMAL, Decimal128Builder);
case Type::DICTIONARY: {
const auto& dict_type = static_cast<const DictionaryType&>(*type);
DictionaryBuilderCase visitor = {pool, dict_type, out};
const auto& dict_type = internal::checked_cast<const DictionaryType&>(*type);
DictionaryBuilderCase<DictionaryType> visitor = {pool, dict_type, out};
return VisitTypeInline(*dict_type.dictionary()->type(), &visitor);
}
case Type::INCOMPLETE_DICTIONARY: {
const auto& dict_type =
internal::checked_cast<const IncompleteDictionaryType&>(*type);
DictionaryBuilderCase<IncompleteDictionaryType> visitor = {pool, dict_type, out};
return VisitTypeInline(*dict_type.value_type(), &visitor);
}
case Type::LIST: {
std::unique_ptr<ArrayBuilder> value_builder;
std::shared_ptr<DataType> value_type =
Expand Down Expand Up @@ -138,6 +157,8 @@ Status MakeBuilder(MemoryPool* pool, const std::shared_ptr<DataType>& type,
type->ToString());
}
}

#undef BUILDER_CASE
}

} // namespace arrow
20 changes: 11 additions & 9 deletions cpp/src/arrow/compare.cc
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,15 @@ class TypeEqualsVisitor {
return Status::OK();
}

Status Visit(const IncompleteDictionaryType& left) {
const auto& right = checked_cast<const IncompleteDictionaryType&>(right_);
result_ = left.index_type()->Equals(right.index_type()) &&
left.value_type()->Equals(right.value_type()) &&
(left.dictionary_id() == right.dictionary_id()) &&
(left.ordered() == right.ordered());
return Status::OK();
}

Status Visit(const ExtensionType& left) {
result_ = left.ExtensionEquals(static_cast<const ExtensionType&>(right_));
return Status::OK();
Expand Down Expand Up @@ -843,15 +852,8 @@ class ScalarEqualsVisitor {
return Status::OK();
}

Status Visit(const UnionScalar& left) { return Status::NotImplemented("union"); }

Status Visit(const DictionaryScalar& left) {
return Status::NotImplemented("dictionary");
}

Status Visit(const ExtensionScalar& left) {
return Status::NotImplemented("extension");
}
// Default case
Status Visit(const Scalar& left) { return Status::NotImplemented(left.type->name()); }

bool result() const { return result_; }

Expand Down
4 changes: 4 additions & 0 deletions cpp/src/arrow/compute/kernels/take.cc
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ struct UnpackValues {
return Status::NotImplemented("gathering values of type ", t);
}

Status Visit(const IncompleteDictionaryType& t) {
return Status::NotImplemented("gathering values of type ", t);
}

const TakeParameters& params_;
};

Expand Down
56 changes: 38 additions & 18 deletions cpp/src/arrow/ipc/metadata-internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,10 @@ class FieldToFlatbufferVisitor {
return Status::OK();
}

Status Visit(const IncompleteDictionaryType& type) {
return Status::NotImplemented("incomplete dictionary");
}

Status GetResult(const Field& field, FieldOffset* offset) {
auto fb_name = fbb_.CreateString(field.name());
RETURN_NOT_OK(VisitType(*field.type()));
Expand Down Expand Up @@ -644,6 +648,24 @@ static Status GetFieldMetadata(const flatbuf::Field* field,
return Status::OK();
}

static Status FieldFromFlatbuffer(const flatbuf::Field* field,
const DictionaryMemo& dictionary_memo,
std::shared_ptr<Field>* out);

// Reconstruct the data type of a flatbuffer-encoded field
static Status ReconstructFieldType(const flatbuf::Field* field,
const KeyValueMetadata* metadata,
const DictionaryMemo& dictionary_memo,
std::shared_ptr<DataType>* out) {
auto children = field->children();
std::vector<std::shared_ptr<Field>> child_fields(children->size());
for (int i = 0; i < static_cast<int>(children->size()); ++i) {
RETURN_NOT_OK(
FieldFromFlatbuffer(children->Get(i), dictionary_memo, &child_fields[i]));
}
return TypeFromFlatbuffer(field, child_fields, metadata, out);
}

static Status FieldFromFlatbuffer(const flatbuf::Field* field,
const DictionaryMemo& dictionary_memo,
std::shared_ptr<Field>* out) {
Expand All @@ -657,24 +679,25 @@ static Status FieldFromFlatbuffer(const flatbuf::Field* field,
if (encoding == nullptr) {
// The field is not dictionary encoded. We must potentially visit its
// children to fully reconstruct the data type
auto children = field->children();
std::vector<std::shared_ptr<Field>> child_fields(children->size());
for (int i = 0; i < static_cast<int>(children->size()); ++i) {
RETURN_NOT_OK(
FieldFromFlatbuffer(children->Get(i), dictionary_memo, &child_fields[i]));
}
RETURN_NOT_OK(TypeFromFlatbuffer(field, child_fields, metadata.get(), &type));
RETURN_NOT_OK(ReconstructFieldType(field, metadata.get(), dictionary_memo, &type));
} else {
// The field is dictionary encoded. The type of the dictionary values has
// been determined elsewhere, and is stored in the DictionaryMemo. Here we
// construct the logical DictionaryType object

std::shared_ptr<Array> dictionary;
RETURN_NOT_OK(dictionary_memo.GetDictionary(encoding->id(), &dictionary));
// The field is dictionary encoded. Here we
// construct the logical dictionary type.

std::shared_ptr<DataType> index_type;
RETURN_NOT_OK(IntFromFlatbuffer(encoding->indexType(), &index_type));
type = ::arrow::dictionary(index_type, dictionary, encoding->isOrdered());
std::shared_ptr<Array> dictionary;
auto status = dictionary_memo.GetDictionary(encoding->id(), &dictionary);

if (status.IsKeyError()) {
// Dictionary array not found, need to reconstruct value type
RETURN_NOT_OK(ReconstructFieldType(field, metadata.get(), dictionary_memo, &type));
type = ::arrow::incomplete_dictionary(index_type, type, encoding->isOrdered(),
encoding->id());
} else {
RETURN_NOT_OK(status); // Handle other errors
type = ::arrow::dictionary(index_type, dictionary, encoding->isOrdered());
}
}

*out = std::make_shared<Field>(field->name()->str(), type, field->nullable(), metadata);
Expand All @@ -684,16 +707,13 @@ static Status FieldFromFlatbuffer(const flatbuf::Field* field,

static Status FieldFromFlatbufferDictionary(const flatbuf::Field* field,
std::shared_ptr<Field>* out) {
// Need an empty memo to pass down for constructing children
DictionaryMemo dummy_memo;

// Any DictionaryEncoding set is ignored here

std::shared_ptr<DataType> type;
auto children = field->children();
std::vector<std::shared_ptr<Field>> child_fields(children->size());
for (int i = 0; i < static_cast<int>(children->size()); ++i) {
RETURN_NOT_OK(FieldFromFlatbuffer(children->Get(i), dummy_memo, &child_fields[i]));
RETURN_NOT_OK(FieldFromFlatbufferDictionary(children->Get(i), &child_fields[i]));
}

std::shared_ptr<KeyValueMetadata> metadata;
Expand Down
25 changes: 25 additions & 0 deletions cpp/src/arrow/ipc/read-write-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,31 @@ TEST_F(TestSchemaMetadata, DictionaryFields) {
}
}

TEST_F(TestSchemaMetadata, IncompleteDictionaryFields) {
auto dict_type = dictionary(int8(), ArrayFromJSON(int32(), "[6, 5, 4]"));
auto f0 = field("f0", dict_type);
auto f1 = field("f1", list(dict_type));
Schema schema({f0, f1});

std::shared_ptr<Buffer> buffer;
ASSERT_OK(SerializeSchema(schema, default_memory_pool(), &buffer));

// Only read one message. It will not contain the schema dictionaries.
std::unique_ptr<Message> message;
std::shared_ptr<Schema> result;
io::BufferReader reader(buffer);
ASSERT_OK(ReadMessage(&reader, &message));
ASSERT_OK(ReadSchema(*message, &result));

// Decoding the schema should give us incomplete dictionary types.
auto incomplete_dict_type =
incomplete_dictionary(int8(), int32(), false /* ordered */, 0 /* dictionary_id */);
f0 = field("f0", incomplete_dict_type);
f1 = field("f1", list(incomplete_dict_type));
Schema expected({f0, f1});
AssertSchemaEqual(expected, *result);
}

TEST_F(TestSchemaMetadata, KeyValueMetadata) {
auto field_metadata = key_value_metadata({{"key", "value"}});
auto schema_metadata = key_value_metadata({{"foo", "bar"}, {"bizz", "buzz"}});
Expand Down
18 changes: 10 additions & 8 deletions cpp/src/arrow/ipc/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -232,11 +232,8 @@ class ArrayLoader {
}

template <typename T>
typename std::enable_if<std::is_base_of<FixedWidthType, T>::value &&
!std::is_base_of<FixedSizeBinaryType, T>::value &&
!std::is_base_of<DictionaryType, T>::value,
Status>::type
Visit(const T& type) {
typename std::enable_if<std::is_base_of<FixedWidthType, T>::value, Status>::type Visit(
const T& type) {
return LoadPrimitive<T>();
}

Expand Down Expand Up @@ -292,6 +289,10 @@ class ArrayLoader {
return Status::OK();
}

Status Visit(const IncompleteDictionaryType& type) {
return Status::TypeError("Incomplete dictionary encountered, bad IPC stream?");
}

Status Visit(const ExtensionType& type) {
RETURN_NOT_OK(LoadArray(type.storage_type(), context_, out_));
out_->type = type_;
Expand Down Expand Up @@ -373,9 +374,10 @@ Status ReadRecordBatch(const Buffer& metadata, const std::shared_ptr<Schema>& sc
return ReadRecordBatch(batch, schema, max_recursion_depth, file, out);
}

Status ReadDictionary(const Buffer& metadata, const DictionaryTypeMap& dictionary_types,
io::RandomAccessFile* file, int64_t* dictionary_id,
std::shared_ptr<Array>* out) {
static Status ReadDictionary(const Buffer& metadata,
const DictionaryTypeMap& dictionary_types,
io::RandomAccessFile* file, int64_t* dictionary_id,
std::shared_ptr<Array>* out) {
auto message = flatbuf::GetMessage(metadata.data());
auto dictionary_batch =
reinterpret_cast<const flatbuf::DictionaryBatch*>(message->header());
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/ipc/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#define ARROW_IPC_READER_H

#include <cstdint>
#include <functional>
#include <memory>

#include "arrow/ipc/message.h"
Expand Down
Loading