From 5ca9c5757d88380d62d39aa215ba4bedd60cd03e Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 2 Jan 2017 11:02:55 -0500 Subject: [PATCH 1/6] Implement IPC/JSON serializationf or unions. Test UnionMode::SPARSE example in IPC Change-Id: Icc5c3f9a5080d3853ed6382f714b8a1ee2595adf --- cpp/src/arrow/array.cc | 113 +++++++++++++++++++++++--- cpp/src/arrow/array.h | 82 +++++++++++++------ cpp/src/arrow/builder.h | 2 +- cpp/src/arrow/io/hdfs-internal.h | 2 +- cpp/src/arrow/ipc/adapter.cc | 56 ++++++++++--- cpp/src/arrow/ipc/ipc-adapter-test.cc | 2 +- cpp/src/arrow/ipc/json-internal.cc | 19 +++-- cpp/src/arrow/ipc/test-common.h | 46 +++++++++++ cpp/src/arrow/pretty_print.cc | 44 ++++++---- cpp/src/arrow/test-util.h | 12 +++ cpp/src/arrow/type.cc | 2 +- cpp/src/arrow/type.h | 8 +- 12 files changed, 311 insertions(+), 77 deletions(-) diff --git a/cpp/src/arrow/array.cc b/cpp/src/arrow/array.cc index d13fa1e1081..3027205c558 100644 --- a/cpp/src/arrow/array.cc +++ b/cpp/src/arrow/array.cc @@ -189,14 +189,14 @@ bool BooleanArray::EqualsExact(const BooleanArray& other) const { } } -bool BooleanArray::Equals(const ArrayPtr& arr) const { +bool BooleanArray::Equals(const std::shared_ptr& arr) const { if (this == arr.get()) return true; if (Type::BOOL != arr->type_enum()) { return false; } return EqualsExact(*static_cast(arr.get())); } bool BooleanArray::RangeEquals(int32_t start_idx, int32_t end_idx, - int32_t other_start_idx, const ArrayPtr& arr) const { + int32_t other_start_idx, const std::shared_ptr& arr) const { if (this == arr.get()) { return true; } if (!arr) { return false; } if (this->type_enum() != arr->type_enum()) { return false; } @@ -222,7 +222,7 @@ bool ListArray::EqualsExact(const ListArray& other) const { if (null_count_ != other.null_count_) { return false; } bool equal_offsets = - offset_buffer_->Equals(*other.offset_buffer_, (length_ + 1) * sizeof(int32_t)); + offsets_buffer_->Equals(*other.offsets_buffer_, (length_ + 1) * sizeof(int32_t)); if (!equal_offsets) { return false; } bool equal_null_bitmap = true; if (null_count_ > 0) { @@ -269,10 +269,10 @@ bool ListArray::RangeEquals(int32_t start_idx, int32_t end_idx, int32_t other_st Status ListArray::Validate() const { if (length_ < 0) { return Status::Invalid("Length was negative"); } - if (!offset_buffer_) { return Status::Invalid("offset_buffer_ was null"); } - if (offset_buffer_->size() / static_cast(sizeof(int32_t)) < length_) { + if (!offsets_buffer_) { return Status::Invalid("offsets_buffer_ was null"); } + if (offsets_buffer_->size() / static_cast(sizeof(int32_t)) < length_) { std::stringstream ss; - ss << "offset buffer size (bytes): " << offset_buffer_->size() + ss << "offset buffer size (bytes): " << offsets_buffer_->size() << " isn't large enough for length: " << length_; return Status::Invalid(ss.str()); } @@ -337,8 +337,8 @@ BinaryArray::BinaryArray(const TypePtr& type, int32_t length, const std::shared_ptr& offsets, const std::shared_ptr& data, int32_t null_count, const std::shared_ptr& null_bitmap) : Array(type, length, null_count, null_bitmap), - offset_buffer_(offsets), - offsets_(reinterpret_cast(offset_buffer_->data())), + offsets_buffer_(offsets), + offsets_(reinterpret_cast(offsets_buffer_->data())), data_buffer_(data), data_(nullptr) { if (data_buffer_ != nullptr) { data_ = data_buffer_->data(); } @@ -353,7 +353,7 @@ bool BinaryArray::EqualsExact(const BinaryArray& other) const { if (!Array::EqualsExact(other)) { return false; } bool equal_offsets = - offset_buffer_->Equals(*other.offset_buffer_, (length_ + 1) * sizeof(int32_t)); + offsets_buffer_->Equals(*other.offsets_buffer_, (length_ + 1) * sizeof(int32_t)); if (!equal_offsets) { return false; } if (!data_buffer_ && !(other.data_buffer_)) { return true; } @@ -433,7 +433,7 @@ bool StructArray::RangeEquals(int32_t start_idx, int32_t end_idx, int32_t other_ if (this == arr.get()) { return true; } if (!arr) { return false; } if (Type::STRUCT != arr->type_enum()) { return false; } - const auto other = static_cast(arr.get()); + const auto& other = static_cast(*arr.get()); bool equal_fields = true; for (int32_t i = start_idx, o_i = other_start_idx; i < end_idx; ++i, ++o_i) { @@ -442,7 +442,7 @@ bool StructArray::RangeEquals(int32_t start_idx, int32_t end_idx, int32_t other_ for (size_t j = 0; j < field_arrays_.size(); ++j) { // TODO: really we should be comparing stretches of non-null data rather // than looking at one value at a time. - equal_fields = field(j)->RangeEquals(i, i + 1, o_i, other->field(j)); + equal_fields = field(j)->RangeEquals(i, i + 1, o_i, other.field(j)); if (!equal_fields) { return false; } } } @@ -490,6 +490,95 @@ Status StructArray::Accept(ArrayVisitor* visitor) const { return visitor->Visit(*this); } +// ---------------------------------------------------------------------- +// UnionArray + +UnionArray::UnionArray(const TypePtr& type, int32_t length, + std::vector>& children, + const std::shared_ptr& type_ids, const std::shared_ptr& offsets, + int32_t null_count, std::shared_ptr null_bitmap) + : Array(type, length, null_count, null_bitmap) { + type_ = type; + children_ = children; + type_ids_buffer_ = type_ids; + type_ids_ = reinterpret_cast(type_ids->data()); + + offsets_buffer_ = offsets; + if (offsets) { offsets_ = reinterpret_cast(offsets->data()); } +} + +std::shared_ptr UnionArray::child(int32_t pos) const { + DCHECK_GT(children_.size(), 0); + return children_[pos]; +} + +bool UnionArray::Equals(const std::shared_ptr& arr) const { + if (this == arr.get()) { return true; } + if (!arr) { return false; } + if (!this->type_->Equals(arr->type())) { return false; } + if (null_count_ != arr->null_count()) { return false; } + return RangeEquals(0, length_, 0, arr); +} + +bool UnionArray::RangeEquals(int32_t start_idx, int32_t end_idx, int32_t other_start_idx, + const std::shared_ptr& arr) const { + if (this == arr.get()) { return true; } + if (!arr) { return false; } + if (Type::UNION != arr->type_enum()) { return false; } + const auto& other = static_cast(*arr.get()); + + if (mode() != other.mode()) { return false; } + DCHECK_EQ(mode(), UnionMode::SPARSE) << "Only SPARSE implemented"; + + // Define a mapping from the type id to child number + const auto& type_codes = static_cast(*arr->type().get()).type_ids; + uint8_t max_code = 0; + for (uint8_t code : type_codes) { + if (code > max_code) { max_code = code; } + } + + // Store mapping in a vector for constant time lookups + std::vector type_id_to_child_num(max_code); + for (uint8_t i = 0; i < static_cast(type_codes.size()); ++i) { + type_id_to_child_num[type_codes[i]] = i; + } + + const uint8_t* this_ids = raw_type_ids(); + const uint8_t* other_ids = other.raw_type_ids(); + + uint8_t id, child_num; + for (int32_t i = start_idx, o_i = other_start_idx; i < end_idx; ++i, ++o_i) { + if (IsNull(i) != other.IsNull(o_i)) { return false; } + if (IsNull(i)) continue; + if (this_ids[i] != other_ids[i]) { return false; } + + id = this_ids[i]; + child_num = type_id_to_child_num[id]; + + // TODO(wesm): really we should be comparing stretches of non-null data + // rather than looking at one value at a time. + if (!child(child_num)->RangeEquals(i, i + 1, o_i, other.child(child_num))) { + return false; + } + } + return true; +} + +Status UnionArray::Validate() const { + if (length_ < 0) { return Status::Invalid("Length was negative"); } + + if (null_count() > length_) { + return Status::Invalid("Null count exceeds the length of this struct"); + } + + DCHECK(false) << "Validate not yet implemented"; + return Status::OK(); +} + +Status UnionArray::Accept(ArrayVisitor* visitor) const { + return visitor->Visit(*this); +} + // ---------------------------------------------------------------------- #define MAKE_PRIMITIVE_ARRAY_CASE(ENUM, ArrayType) \ @@ -499,7 +588,7 @@ Status StructArray::Accept(ArrayVisitor* visitor) const { Status MakePrimitiveArray(const TypePtr& type, int32_t length, const std::shared_ptr& data, int32_t null_count, - const std::shared_ptr& null_bitmap, ArrayPtr* out) { + const std::shared_ptr& null_bitmap, std::shared_ptr* out) { switch (type->type) { MAKE_PRIMITIVE_ARRAY_CASE(BOOL, BooleanArray); MAKE_PRIMITIVE_ARRAY_CASE(UINT8, UInt8Array); diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h index 6239ccc576b..98b2340c481 100644 --- a/cpp/src/arrow/array.h +++ b/cpp/src/arrow/array.h @@ -152,7 +152,7 @@ class ARROW_EXPORT NumericArray : public PrimitiveArray { } bool RangeEquals(int32_t start_idx, int32_t end_idx, int32_t other_start_idx, - const ArrayPtr& arr) const override { + const std::shared_ptr& arr) const override { if (this == arr.get()) { return true; } if (!arr) { return false; } if (this->type_enum() != arr->type_enum()) { return false; } @@ -256,9 +256,9 @@ class ARROW_EXPORT BooleanArray : public PrimitiveArray { int32_t null_count = 0, const std::shared_ptr& null_bitmap = nullptr); bool EqualsExact(const BooleanArray& other) const; - bool Equals(const ArrayPtr& arr) const override; + bool Equals(const std::shared_ptr& arr) const override; bool RangeEquals(int32_t start_idx, int32_t end_idx, int32_t other_start_idx, - const ArrayPtr& arr) const override; + const std::shared_ptr& arr) const override; Status Accept(ArrayVisitor* visitor) const override; @@ -275,12 +275,12 @@ class ARROW_EXPORT ListArray : public Array { using TypeClass = ListType; ListArray(const TypePtr& type, int32_t length, std::shared_ptr offsets, - const ArrayPtr& values, int32_t null_count = 0, + const std::shared_ptr& values, int32_t null_count = 0, std::shared_ptr null_bitmap = nullptr) : Array(type, length, null_count, null_bitmap) { - offset_buffer_ = offsets; + offsets_buffer_ = offsets; offsets_ = offsets == nullptr ? nullptr : reinterpret_cast( - offset_buffer_->data()); + offsets_buffer_->data()); values_ = values; } @@ -291,9 +291,7 @@ class ARROW_EXPORT ListArray : public Array { // Return a shared pointer in case the requestor desires to share ownership // with this array. std::shared_ptr values() const { return values_; } - std::shared_ptr offsets() const { - return std::static_pointer_cast(offset_buffer_); - } + std::shared_ptr offsets() const { return offsets_buffer_; } std::shared_ptr value_type() const { return values_->type(); } @@ -309,14 +307,14 @@ class ARROW_EXPORT ListArray : public Array { bool Equals(const std::shared_ptr& arr) const override; bool RangeEquals(int32_t start_idx, int32_t end_idx, int32_t other_start_idx, - const ArrayPtr& arr) const override; + const std::shared_ptr& arr) const override; Status Accept(ArrayVisitor* visitor) const override; protected: - std::shared_ptr offset_buffer_; + std::shared_ptr offsets_buffer_; const int32_t* offsets_; - ArrayPtr values_; + std::shared_ptr values_; }; // ---------------------------------------------------------------------- @@ -346,7 +344,7 @@ class ARROW_EXPORT BinaryArray : public Array { } std::shared_ptr data() const { return data_buffer_; } - std::shared_ptr offsets() const { return offset_buffer_; } + std::shared_ptr offsets() const { return offsets_buffer_; } const int32_t* raw_offsets() const { return offsets_; } @@ -359,14 +357,14 @@ class ARROW_EXPORT BinaryArray : public Array { bool EqualsExact(const BinaryArray& other) const; bool Equals(const std::shared_ptr& arr) const override; bool RangeEquals(int32_t start_idx, int32_t end_idx, int32_t other_start_idx, - const ArrayPtr& arr) const override; + const std::shared_ptr& arr) const override; Status Validate() const override; Status Accept(ArrayVisitor* visitor) const override; private: - std::shared_ptr offset_buffer_; + std::shared_ptr offsets_buffer_; const int32_t* offsets_; std::shared_ptr data_buffer_; @@ -401,8 +399,9 @@ class ARROW_EXPORT StructArray : public Array { public: using TypeClass = StructType; - StructArray(const TypePtr& type, int32_t length, std::vector& field_arrays, - int32_t null_count = 0, std::shared_ptr null_bitmap = nullptr) + StructArray(const TypePtr& type, int32_t length, + std::vector>& field_arrays, int32_t null_count = 0, + std::shared_ptr null_bitmap = nullptr) : Array(type, length, null_count, null_bitmap) { type_ = type; field_arrays_ = field_arrays; @@ -416,7 +415,7 @@ class ARROW_EXPORT StructArray : public Array { // with this array. std::shared_ptr field(int32_t pos) const; - const std::vector& fields() const { return field_arrays_; } + const std::vector>& fields() const { return field_arrays_; } bool EqualsExact(const StructArray& other) const; bool Equals(const std::shared_ptr& arr) const override; @@ -427,25 +426,54 @@ class ARROW_EXPORT StructArray : public Array { protected: // The child arrays corresponding to each field of the struct data type. - std::vector field_arrays_; + std::vector> field_arrays_; }; // ---------------------------------------------------------------------- // Union class UnionArray : public Array { + public: + using TypeClass = UnionType; + + UnionArray(const TypePtr& type, int32_t length, + std::vector>& children, + const std::shared_ptr& type_ids, + const std::shared_ptr& offsets = nullptr, int32_t null_count = 0, + std::shared_ptr null_bitmap = nullptr); + + Status Validate() const override; + + virtual ~UnionArray() {} + + std::shared_ptr type_ids() const { return type_ids_buffer_; } + const uint8_t* raw_type_ids() const { return type_ids_; } + + std::shared_ptr offsets() const { return offsets_buffer_; } + const int32_t* raw_offsets() const { return offsets_; } + + UnionMode mode() const { return static_cast(*type_.get()).mode; } + + std::shared_ptr child(int32_t pos) const; + + const std::vector>& children() const { return children_; } + + bool EqualsExact(const UnionArray& other) const; + bool Equals(const std::shared_ptr& arr) const override; + bool RangeEquals(int32_t start_idx, int32_t end_idx, int32_t other_start_idx, + const std::shared_ptr& arr) const override; + + Status Accept(ArrayVisitor* visitor) const override; + protected: - // The data are types encoded as int16 - Buffer* types_; std::vector> children_; -}; -class DenseUnionArray : public UnionArray { - protected: - Buffer* offset_buf_; -}; + std::shared_ptr type_ids_buffer_; + const uint8_t* type_ids_; -class SparseUnionArray : public UnionArray {}; + std::shared_ptr offsets_buffer_; + const int32_t* offsets_; +}; // ---------------------------------------------------------------------- // extern templates and other details diff --git a/cpp/src/arrow/builder.h b/cpp/src/arrow/builder.h index 205139849b4..1837340cedc 100644 --- a/cpp/src/arrow/builder.h +++ b/cpp/src/arrow/builder.h @@ -228,7 +228,7 @@ using DoubleBuilder = NumericBuilder; class ARROW_EXPORT BooleanBuilder : public ArrayBuilder { public: - explicit BooleanBuilder(MemoryPool* pool, const TypePtr& type) + explicit BooleanBuilder(MemoryPool* pool, const TypePtr& type = boolean()) : ArrayBuilder(pool, type), data_(nullptr) {} virtual ~BooleanBuilder() {} diff --git a/cpp/src/arrow/io/hdfs-internal.h b/cpp/src/arrow/io/hdfs-internal.h index 8f9a06758cb..fb57fb938d8 100644 --- a/cpp/src/arrow/io/hdfs-internal.h +++ b/cpp/src/arrow/io/hdfs-internal.h @@ -26,8 +26,8 @@ #ifndef NOMINMAX #define NOMINMAX #endif -#include #include +#include // TODO(wesm): address when/if we add windows support // #include diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc index ac4054b376a..9bfd11fd01b 100644 --- a/cpp/src/arrow/ipc/adapter.cc +++ b/cpp/src/arrow/ipc/adapter.cc @@ -276,7 +276,16 @@ class RecordBatchWriter : public ArrayVisitor { } Status Visit(const UnionArray& array) override { - return Status::NotImplemented("union"); + buffers_.push_back(array.type_ids()); + + if (array.mode() == UnionMode::DENSE) { buffers_.push_back(array.offsets()); } + + --max_recursion_depth_; + for (const auto& field : array.children()) { + RETURN_NOT_OK(VisitArray(*field.get())); + } + ++max_recursion_depth_; + return Status::OK(); } // Do not copy this vector. Ownership must be retained elsewhere @@ -464,9 +473,10 @@ class ArrayLoader : public TypeVisitor { Status Visit(const ListType& type) override { FieldMetadata field_meta; std::shared_ptr null_bitmap; - std::shared_ptr offsets; RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap)); + + std::shared_ptr offsets; RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &offsets)); const int num_children = type.num_children(); @@ -484,20 +494,25 @@ class ArrayLoader : public TypeVisitor { return Status::OK(); } + Status LoadChildren(std::vector> child_fields, + std::vector>* arrays) { + arrays->reserve(static_cast(child_fields.size())); + + for (const auto& child_field : child_fields) { + std::shared_ptr field_array; + RETURN_NOT_OK(LoadChild(*child_field.get(), &field_array)); + arrays->emplace_back(field_array); + } + return Status::OK(); + } + Status Visit(const StructType& type) override { FieldMetadata field_meta; std::shared_ptr null_bitmap; RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap)); - const int num_children = type.num_children(); - std::vector fields; - fields.reserve(num_children); - - for (int child_idx = 0; child_idx < num_children; ++child_idx) { - std::shared_ptr field_array; - RETURN_NOT_OK(LoadChild(*type.child(child_idx).get(), &field_array)); - fields.emplace_back(field_array); - } + std::vector> fields; + RETURN_NOT_OK(LoadChildren(type.children(), &fields)); result_ = std::make_shared( field_.type, field_meta.length, fields, field_meta.null_count, null_bitmap); @@ -505,7 +520,24 @@ class ArrayLoader : public TypeVisitor { } Status Visit(const UnionType& type) override { - return Status::NotImplemented(type.ToString()); + FieldMetadata field_meta; + std::shared_ptr null_bitmap; + RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap)); + + std::shared_ptr type_ids; + std::shared_ptr offsets = nullptr; + RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &type_ids)); + + if (type.mode == UnionMode::DENSE) { + RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &offsets)); + } + + std::vector> fields; + RETURN_NOT_OK(LoadChildren(type.children(), &fields)); + + result_ = std::make_shared(field_.type, field_meta.length, fields, + type_ids, offsets, field_meta.null_count, null_bitmap); + return Status::OK(); } }; diff --git a/cpp/src/arrow/ipc/ipc-adapter-test.cc b/cpp/src/arrow/ipc/ipc-adapter-test.cc index f309b8562f7..c085c66de64 100644 --- a/cpp/src/arrow/ipc/ipc-adapter-test.cc +++ b/cpp/src/arrow/ipc/ipc-adapter-test.cc @@ -95,7 +95,7 @@ TEST_P(TestWriteRecordBatch, RoundTrip) { INSTANTIATE_TEST_CASE_P(RoundTripTests, TestWriteRecordBatch, ::testing::Values(&MakeIntRecordBatch, &MakeListRecordBatch, &MakeNonNullRecordBatch, &MakeZeroLengthRecordBatch, &MakeDeeplyNestedList, - &MakeStringTypesRecordBatch, &MakeStruct)); + &MakeStringTypesRecordBatch, &MakeStruct, &MakeSparseUnion)); void TestGetRecordBatchSize(std::shared_ptr batch) { ipc::MockOutputStream mock; diff --git a/cpp/src/arrow/ipc/json-internal.cc b/cpp/src/arrow/ipc/json-internal.cc index db11b7d0372..88412e6bed4 100644 --- a/cpp/src/arrow/ipc/json-internal.cc +++ b/cpp/src/arrow/ipc/json-internal.cc @@ -415,11 +415,11 @@ class JsonArrayWriter : public ArrayVisitor { } template - void WriteOffsetsField(const T* offsets, int32_t length) { - writer_->Key("OFFSET"); + void WriteIntegerField(const char* name, const T* values, int32_t length) { + writer_->Key(name); writer_->StartArray(); for (int i = 0; i < length; ++i) { - writer_->Int64(offsets[i]); + writer_->Int64(values[i]); } writer_->EndArray(); } @@ -456,7 +456,7 @@ class JsonArrayWriter : public ArrayVisitor { template Status WriteVarBytes(const T& array) { WriteValidityField(array); - WriteOffsetsField(array.raw_offsets(), array.length() + 1); + WriteIntegerField("OFFSET", array.raw_offsets(), array.length() + 1); WriteDataField(array); SetNoChildren(); return Status::OK(); @@ -524,7 +524,7 @@ class JsonArrayWriter : public ArrayVisitor { Status Visit(const ListArray& array) override { WriteValidityField(array); - WriteOffsetsField(array.raw_offsets(), array.length() + 1); + WriteIntegerField("OFFSET", array.raw_offsets(), array.length() + 1); auto type = static_cast(array.type().get()); return WriteChildren(type->children(), {array.values()}); } @@ -536,7 +536,14 @@ class JsonArrayWriter : public ArrayVisitor { } Status Visit(const UnionArray& array) override { - return Status::NotImplemented("union"); + WriteValidityField(array); + auto type = static_cast(array.type().get()); + + WriteIntegerField("TYPE_IDS", array.raw_type_ids(), array.length()); + if (type->mode == UnionMode::DENSE) { + WriteIntegerField("OFFSET", array.raw_offsets(), array.length()); + } + return WriteChildren(type->children(), array.children()); } private: diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h index 8416f0df573..0abda38738c 100644 --- a/cpp/src/arrow/ipc/test-common.h +++ b/cpp/src/arrow/ipc/test-common.h @@ -270,6 +270,52 @@ Status MakeStruct(std::shared_ptr* out) { return Status::OK(); } +Status MakeSparseUnion(std::shared_ptr* out) { + // Define schema + std::vector> union_types( + {std::make_shared("u0", int32()), std::make_shared("u1", boolean())}); + + std::vector type_codes = {0, 1}; + auto type = std::make_shared(union_types, type_codes, UnionMode::SPARSE); + + auto f0 = std::make_shared("non_null_union", type, false); + auto f1 = std::make_shared("null_union", type); + std::shared_ptr schema(new Schema({f0, f1})); + + // Create data + std::vector> children(2); + + const int32_t length = 6; + + std::vector f0_values = {0, 1, 2, 3, 4, 5, 6}; + ArrayFromVector(type->child(0)->type, f0_values, &children[0]); + + std::vector f1_values = {1, 0, 1, 1, 0, 1}; + BooleanBuilder bool_builder(default_memory_pool()); + RETURN_NOT_OK(bool_builder.Append(f1_values.data(), length)); + RETURN_NOT_OK(bool_builder.Finish(&children[1])); + + std::shared_ptr type_ids_buffer; + std::vector type_ids = {0, 1, 0, 0, 1, 1}; + RETURN_NOT_OK(test::CopyBufferFromVector(type_ids, &type_ids_buffer)); + + std::vector null_bytes(length, 1); + null_bytes[2] = 0; + std::shared_ptr null_bitmask; + RETURN_NOT_OK(BitUtil::BytesToBits(null_bytes, &null_bitmask)); + + // construct individual nullable/non-nullable struct arrays + auto no_nulls = std::make_shared(type, length, children, type_ids_buffer); + + auto with_nulls = std::make_shared( + type, length, children, type_ids_buffer, nullptr, 1, null_bitmask); + + // construct batch + std::vector arrays = {no_nulls, with_nulls}; + out->reset(new RecordBatch(schema, length, arrays)); + return Status::OK(); +} + } // namespace ipc } // namespace arrow diff --git a/cpp/src/arrow/pretty_print.cc b/cpp/src/arrow/pretty_print.cc index 9c439c47eb8..324f81bfbfd 100644 --- a/cpp/src/arrow/pretty_print.cc +++ b/cpp/src/arrow/pretty_print.cc @@ -161,44 +161,60 @@ class ArrayPrinter : public ArrayVisitor { return Status::NotImplemented("decimal"); } - Status Visit(const ListArray& array) override { + Status WriteValidityBitmap(const Array& array) { Newline(); Write("-- is_valid: "); BooleanArray is_valid(array.length(), array.null_bitmap()); - PrettyPrint(is_valid, indent_ + 2, sink_); + return PrettyPrint(is_valid, indent_ + 2, sink_); + } + + Status Visit(const ListArray& array) override { + RETURN_NOT_OK(WriteValidityBitmap(array)); Newline(); Write("-- offsets: "); Int32Array offsets(array.length() + 1, array.offsets()); - PrettyPrint(offsets, indent_ + 2, sink_); + RETURN_NOT_OK(PrettyPrint(offsets, indent_ + 2, sink_)); Newline(); Write("-- values: "); - PrettyPrint(*array.values().get(), indent_ + 2, sink_); + RETURN_NOT_OK(PrettyPrint(*array.values().get(), indent_ + 2, sink_)); return Status::OK(); } - Status Visit(const StructArray& array) override { - Newline(); - Write("-- is_valid: "); - BooleanArray is_valid(array.length(), array.null_bitmap()); - PrettyPrint(is_valid, indent_ + 2, sink_); - - const std::vector>& fields = array.fields(); + Status PrintChildren(const std::vector>& fields) { for (size_t i = 0; i < fields.size(); ++i) { Newline(); std::stringstream ss; ss << "-- child " << i << " type: " << fields[i]->type()->ToString() << " values: "; Write(ss.str()); - PrettyPrint(*fields[i].get(), indent_ + 2, sink_); + RETURN_NOT_OK(PrettyPrint(*fields[i].get(), indent_ + 2, sink_)); } - return Status::OK(); } + Status Visit(const StructArray& array) override { + RETURN_NOT_OK(WriteValidityBitmap(array)); + return PrintChildren(array.fields()); + } + Status Visit(const UnionArray& array) override { - return Status::NotImplemented("union"); + RETURN_NOT_OK(WriteValidityBitmap(array)); + + Newline(); + Write("-- type_ids: "); + UInt8Array type_ids(array.length(), array.type_ids()); + RETURN_NOT_OK(PrettyPrint(type_ids, indent_ + 2, sink_)); + + if (array.mode() == UnionMode::DENSE) { + Newline(); + Write("-- offsets: "); + Int32Array offsets(array.length(), array.offsets()); + RETURN_NOT_OK(PrettyPrint(offsets, indent_ + 2, sink_)); + } + + return PrintChildren(array.children()); } void Write(const char* data) { (*sink_) << data; } diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h index aa310b1a49e..ae397109dbc 100644 --- a/cpp/src/arrow/test-util.h +++ b/cpp/src/arrow/test-util.h @@ -274,6 +274,18 @@ void ArrayFromVector(const std::shared_ptr& type, values_buffer, null_count, values_bitmap); } +template +void ArrayFromVector(const std::shared_ptr& type, + const std::vector& values, std::shared_ptr* out) { + std::shared_ptr values_buffer; + + ASSERT_OK(test::CopyBufferFromVector(values, &values_buffer)); + + using ArrayType = typename TypeTraits::ArrayType; + *out = std::make_shared( + type, static_cast(values.size()), values_buffer); +} + class TestBuilder : public ::testing::Test { public: void SetUp() { diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc index 8ff9eea8705..9618af8fcdf 100644 --- a/cpp/src/arrow/type.cc +++ b/cpp/src/arrow/type.cc @@ -103,7 +103,7 @@ std::string UnionType::ToString() const { for (size_t i = 0; i < children_.size(); ++i) { if (i) { s << ", "; } - s << children_[i]->ToString(); + s << children_[i]->ToString() << "=" << type_ids[i]; } s << ">"; return s.str(); diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h index 73005707c9e..530c3235dc9 100644 --- a/cpp/src/arrow/type.h +++ b/cpp/src/arrow/type.h @@ -394,10 +394,10 @@ enum class UnionMode : char { SPARSE, DENSE }; struct ARROW_EXPORT UnionType : public DataType { static constexpr Type::type type_id = Type::UNION; - UnionType(const std::vector>& child_fields, + UnionType(const std::vector>& fields, const std::vector& type_ids, UnionMode mode = UnionMode::SPARSE) : DataType(Type::UNION), mode(mode), type_ids(type_ids) { - children_ = child_fields; + children_ = fields; } std::string ToString() const override; @@ -407,6 +407,10 @@ struct ARROW_EXPORT UnionType : public DataType { std::vector GetBufferLayout() const override; UnionMode mode; + + // The type id used in the data to indicate each data type in the union. For + // example, the first type in the union might be denoted by the id 5 (instead + // of 0). std::vector type_ids; }; From 4887fd2e5f27fda8cca3d4ba1cf66329bf769ce1 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 2 Jan 2017 11:22:26 -0500 Subject: [PATCH 2/6] Move Windows stuff into a compatibility header, exclude from clang-format because of include order sensitivity Change-Id: I5376a75ca2df817570b1e6cf929bfd64e0809333 --- cpp/CMakeLists.txt | 4 ++- cpp/src/arrow/io/hdfs-internal.h | 12 +------- cpp/src/arrow/io/windows_compatibility.h | 36 ++++++++++++++++++++++++ 3 files changed, 40 insertions(+), 12 deletions(-) create mode 100644 cpp/src/arrow/io/windows_compatibility.h diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index bf30543dc4d..13f0354a73b 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -668,7 +668,9 @@ endif (UNIX) if (${CLANG_FORMAT_FOUND}) # runs clang format and updates files in place. add_custom_target(format ${BUILD_SUPPORT_DIR}/run-clang-format.sh ${CMAKE_CURRENT_SOURCE_DIR} ${CLANG_FORMAT_BIN} 1 - `find ${CMAKE_CURRENT_SOURCE_DIR}/src -name \\*.cc -or -name \\*.h | sed -e '/_generated/g'` + `find ${CMAKE_CURRENT_SOURCE_DIR}/src -name \\*.cc -or -name \\*.h | + sed -e '/_generated/g' | + sed -e '/windows_compatibility.h/g'` `find ${CMAKE_CURRENT_SOURCE_DIR}/../python -name \\*.cc -or -name \\*.h`) # runs clang format and exits with a non-zero exit code if any files need to be reformatted diff --git a/cpp/src/arrow/io/hdfs-internal.h b/cpp/src/arrow/io/hdfs-internal.h index fb57fb938d8..01cf1499857 100644 --- a/cpp/src/arrow/io/hdfs-internal.h +++ b/cpp/src/arrow/io/hdfs-internal.h @@ -20,21 +20,11 @@ #ifndef _WIN32 #include -#else - -// Windows defines min and max macros that mess up std::min/maxa -#ifndef NOMINMAX -#define NOMINMAX -#endif -#include -#include - -// TODO(wesm): address when/if we add windows support -// #include #endif #include +#include "arrow/io/windows_compatibility.h" #include "arrow/util/visibility.h" namespace arrow { diff --git a/cpp/src/arrow/io/windows_compatibility.h b/cpp/src/arrow/io/windows_compatibility.h new file mode 100644 index 00000000000..ac8f6aeeb5c --- /dev/null +++ b/cpp/src/arrow/io/windows_compatibility.h @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef ARROW_IO_WINDOWS_COMPATIBILITY +#define ARROW_IO_WINDOWS_COMPATIBILITY + +#ifdef _WIN32 + +// Windows defines min and max macros that mess up std::min/max +#ifndef NOMINMAX +#define NOMINMAX +#endif + +#include +#include + +// TODO(wesm): address when/if we add windows support +// #include + +#endif // _WIN32 + +#endif // ARROW_IO_WINDOWS_COMPATIBILITY From 30b718828983bf98b5de6e13c86db76c2d4d02b7 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 2 Jan 2017 13:27:58 -0500 Subject: [PATCH 3/6] Add test case for dense union, implement RangeEquals for it Change-Id: I69146616629ba3adcee56f63133fe9bd59922c5e --- cpp/src/arrow/array-list-test.cc | 2 +- cpp/src/arrow/array-primitive-test.cc | 2 +- cpp/src/arrow/array-struct-test.cc | 5 +- cpp/src/arrow/array-test.cc | 6 ++- cpp/src/arrow/array.cc | 33 +++++++----- cpp/src/arrow/array.h | 12 ++--- cpp/src/arrow/ipc/ipc-adapter-test.cc | 4 +- cpp/src/arrow/ipc/test-common.h | 73 ++++++++++++++++++--------- cpp/src/arrow/test-util.h | 2 +- 9 files changed, 85 insertions(+), 54 deletions(-) diff --git a/cpp/src/arrow/array-list-test.cc b/cpp/src/arrow/array-list-test.cc index 8baaf06a7db..8e4d319f5dc 100644 --- a/cpp/src/arrow/array-list-test.cc +++ b/cpp/src/arrow/array-list-test.cc @@ -89,7 +89,7 @@ class TestListBuilder : public TestBuilder { TEST_F(TestListBuilder, Equality) { Int32Builder* vb = static_cast(builder_->value_builder().get()); - ArrayPtr array, equal_array, unequal_array; + std::shared_ptr array, equal_array, unequal_array; vector equal_offsets = {0, 1, 2, 5}; vector equal_values = {1, 2, 3, 4, 5, 2, 2, 2}; vector unequal_offsets = {0, 1, 4}; diff --git a/cpp/src/arrow/array-primitive-test.cc b/cpp/src/arrow/array-primitive-test.cc index a10e2404f29..443abac459d 100644 --- a/cpp/src/arrow/array-primitive-test.cc +++ b/cpp/src/arrow/array-primitive-test.cc @@ -318,7 +318,7 @@ TYPED_TEST(TestPrimitiveBuilder, Equality) { this->RandomData(size); vector& draws = this->draws_; vector& valid_bytes = this->valid_bytes_; - ArrayPtr array, equal_array, unequal_array; + std::shared_ptr array, equal_array, unequal_array; auto builder = this->builder_.get(); ASSERT_OK(MakeArray(valid_bytes, draws, size, builder, &array)); ASSERT_OK(MakeArray(valid_bytes, draws, size, builder, &equal_array)); diff --git a/cpp/src/arrow/array-struct-test.cc b/cpp/src/arrow/array-struct-test.cc index 58386fe028f..5827c399dda 100644 --- a/cpp/src/arrow/array-struct-test.cc +++ b/cpp/src/arrow/array-struct-test.cc @@ -261,8 +261,9 @@ TEST_F(TestStructBuilder, BulkAppendInvalid) { } TEST_F(TestStructBuilder, TestEquality) { - ArrayPtr array, equal_array; - ArrayPtr unequal_bitmap_array, unequal_offsets_array, unequal_values_array; + std::shared_ptr array, equal_array; + std::shared_ptr unequal_bitmap_array, unequal_offsets_array, + unequal_values_array; vector int_values = {1, 2, 3, 4}; vector list_values = {'j', 'o', 'e', 'b', 'o', 'b', 'm', 'a', 'r', 'k'}; diff --git a/cpp/src/arrow/array-test.cc b/cpp/src/arrow/array-test.cc index 783104e874b..a1d8fdfa91e 100644 --- a/cpp/src/arrow/array-test.cc +++ b/cpp/src/arrow/array-test.cc @@ -56,7 +56,8 @@ TEST_F(TestArray, TestLength) { ASSERT_EQ(arr->length(), 100); } -ArrayPtr MakeArrayFromValidBytes(const std::vector& v, MemoryPool* pool) { +std::shared_ptr MakeArrayFromValidBytes( + const std::vector& v, MemoryPool* pool) { int32_t null_count = v.size() - std::accumulate(v.begin(), v.end(), 0); std::shared_ptr null_buf = test::bytes_to_null_buffer(v); @@ -65,7 +66,8 @@ ArrayPtr MakeArrayFromValidBytes(const std::vector& v, MemoryPool* pool value_builder.Append(0); } - ArrayPtr arr(new Int32Array(v.size(), value_builder.Finish(), null_count, null_buf)); + std::shared_ptr arr( + new Int32Array(v.size(), value_builder.Finish(), null_count, null_buf)); return arr; } diff --git a/cpp/src/arrow/array.cc b/cpp/src/arrow/array.cc index 3027205c558..cbe8af4bbd7 100644 --- a/cpp/src/arrow/array.cc +++ b/cpp/src/arrow/array.cc @@ -494,16 +494,14 @@ Status StructArray::Accept(ArrayVisitor* visitor) const { // UnionArray UnionArray::UnionArray(const TypePtr& type, int32_t length, - std::vector>& children, + const std::vector>& children, const std::shared_ptr& type_ids, const std::shared_ptr& offsets, - int32_t null_count, std::shared_ptr null_bitmap) - : Array(type, length, null_count, null_bitmap) { - type_ = type; - children_ = children; - type_ids_buffer_ = type_ids; + int32_t null_count, const std::shared_ptr& null_bitmap) + : Array(type, length, null_count, null_bitmap), + children_(children), + type_ids_buffer_(type_ids), + offsets_buffer_(offsets) { type_ids_ = reinterpret_cast(type_ids->data()); - - offsets_buffer_ = offsets; if (offsets) { offsets_ = reinterpret_cast(offsets->data()); } } @@ -527,8 +525,8 @@ bool UnionArray::RangeEquals(int32_t start_idx, int32_t end_idx, int32_t other_s if (Type::UNION != arr->type_enum()) { return false; } const auto& other = static_cast(*arr.get()); - if (mode() != other.mode()) { return false; } - DCHECK_EQ(mode(), UnionMode::SPARSE) << "Only SPARSE implemented"; + const UnionMode union_mode = mode(); + if (union_mode != other.mode()) { return false; } // Define a mapping from the type id to child number const auto& type_codes = static_cast(*arr->type().get()).type_ids; @@ -550,15 +548,24 @@ bool UnionArray::RangeEquals(int32_t start_idx, int32_t end_idx, int32_t other_s for (int32_t i = start_idx, o_i = other_start_idx; i < end_idx; ++i, ++o_i) { if (IsNull(i) != other.IsNull(o_i)) { return false; } if (IsNull(i)) continue; - if (this_ids[i] != other_ids[i]) { return false; } + if (this_ids[i] != other_ids[o_i]) { return false; } id = this_ids[i]; child_num = type_id_to_child_num[id]; // TODO(wesm): really we should be comparing stretches of non-null data // rather than looking at one value at a time. - if (!child(child_num)->RangeEquals(i, i + 1, o_i, other.child(child_num))) { - return false; + if (union_mode == UnionMode::SPARSE) { + if (!child(child_num)->RangeEquals(i, i + 1, o_i, other.child(child_num))) { + return false; + } + } else { + const int32_t offset = offsets_[i]; + const int32_t o_offset = other.offsets_[i]; + if (!child(child_num)->RangeEquals( + offset, offset + 1, o_offset, other.child(child_num))) { + return false; + } } } return true; diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h index 98b2340c481..84fc70d18b2 100644 --- a/cpp/src/arrow/array.h +++ b/cpp/src/arrow/array.h @@ -108,8 +108,6 @@ class ARROW_EXPORT NullArray : public Array { Status Accept(ArrayVisitor* visitor) const override; }; -typedef std::shared_ptr ArrayPtr; - Status ARROW_EXPORT GetEmptyBitmap( MemoryPool* pool, int32_t length, std::shared_ptr* result); @@ -274,9 +272,9 @@ class ARROW_EXPORT ListArray : public Array { public: using TypeClass = ListType; - ListArray(const TypePtr& type, int32_t length, std::shared_ptr offsets, + ListArray(const TypePtr& type, int32_t length, const std::shared_ptr& offsets, const std::shared_ptr& values, int32_t null_count = 0, - std::shared_ptr null_bitmap = nullptr) + const std::shared_ptr& null_bitmap = nullptr) : Array(type, length, null_count, null_bitmap) { offsets_buffer_ = offsets; offsets_ = offsets == nullptr ? nullptr : reinterpret_cast( @@ -400,7 +398,7 @@ class ARROW_EXPORT StructArray : public Array { using TypeClass = StructType; StructArray(const TypePtr& type, int32_t length, - std::vector>& field_arrays, int32_t null_count = 0, + const std::vector>& field_arrays, int32_t null_count = 0, std::shared_ptr null_bitmap = nullptr) : Array(type, length, null_count, null_bitmap) { type_ = type; @@ -437,10 +435,10 @@ class UnionArray : public Array { using TypeClass = UnionType; UnionArray(const TypePtr& type, int32_t length, - std::vector>& children, + const std::vector>& children, const std::shared_ptr& type_ids, const std::shared_ptr& offsets = nullptr, int32_t null_count = 0, - std::shared_ptr null_bitmap = nullptr); + const std::shared_ptr& null_bitmap = nullptr); Status Validate() const override; diff --git a/cpp/src/arrow/ipc/ipc-adapter-test.cc b/cpp/src/arrow/ipc/ipc-adapter-test.cc index c085c66de64..90f16134c14 100644 --- a/cpp/src/arrow/ipc/ipc-adapter-test.cc +++ b/cpp/src/arrow/ipc/ipc-adapter-test.cc @@ -136,7 +136,7 @@ class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture { int64_t* body_length, std::shared_ptr* schema) { const int batch_length = 5; TypePtr type = int32(); - ArrayPtr array; + std::shared_ptr array; const bool include_nulls = true; RETURN_NOT_OK(MakeRandomInt32Array(1000, include_nulls, pool_, &array)); for (int i = 0; i < recursion_level; ++i) { @@ -149,7 +149,7 @@ class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture { *schema = std::shared_ptr(new Schema({f0})); - std::vector arrays = {array}; + std::vector> arrays = {array}; auto batch = std::make_shared(*schema, batch_length, arrays); std::string path = "test-write-past-max-recursion"; diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h index 0abda38738c..c309e53bd86 100644 --- a/cpp/src/arrow/ipc/test-common.h +++ b/cpp/src/arrow/ipc/test-common.h @@ -110,7 +110,7 @@ Status MakeIntRecordBatch(std::shared_ptr* out) { template Status MakeRandomBinaryArray( - const TypePtr& type, int32_t length, MemoryPool* pool, ArrayPtr* out) { + const TypePtr& type, int32_t length, MemoryPool* pool, std::shared_ptr* out) { const std::vector values = { "", "", "abc", "123", "efg", "456!@#!@#", "12312"}; Builder builder(pool, type); @@ -225,7 +225,7 @@ Status MakeDeeplyNestedList(std::shared_ptr* out) { TypePtr type = int32(); MemoryPool* pool = default_memory_pool(); - ArrayPtr array; + std::shared_ptr array; const bool include_nulls = true; RETURN_NOT_OK(MakeRandomInt32Array(1000, include_nulls, pool, &array)); for (int i = 0; i < 63; ++i) { @@ -235,7 +235,7 @@ Status MakeDeeplyNestedList(std::shared_ptr* out) { auto f0 = std::make_shared("f0", type); std::shared_ptr schema(new Schema({f0})); - std::vector arrays = {array}; + std::vector> arrays = {array}; out->reset(new RecordBatch(schema, batch_length, arrays)); return Status::OK(); } @@ -244,7 +244,7 @@ Status MakeStruct(std::shared_ptr* out) { // reuse constructed list columns std::shared_ptr list_batch; RETURN_NOT_OK(MakeListRecordBatch(&list_batch)); - std::vector columns = { + std::vector> columns = { list_batch->column(0), list_batch->column(1), list_batch->column(2)}; auto list_schema = list_batch->schema(); @@ -256,16 +256,16 @@ Status MakeStruct(std::shared_ptr* out) { std::shared_ptr schema(new Schema({f0, f1})); // construct individual nullable/non-nullable struct arrays - ArrayPtr no_nulls(new StructArray(type, list_batch->num_rows(), columns)); + std::shared_ptr no_nulls(new StructArray(type, list_batch->num_rows(), columns)); std::vector null_bytes(list_batch->num_rows(), 1); null_bytes[0] = 0; std::shared_ptr null_bitmask; RETURN_NOT_OK(BitUtil::BytesToBits(null_bytes, &null_bitmask)); - ArrayPtr with_nulls( + std::shared_ptr with_nulls( new StructArray(type, list_batch->num_rows(), columns, 1, null_bitmask)); // construct batch - std::vector arrays = {no_nulls, with_nulls}; + std::vector> arrays = {no_nulls, with_nulls}; out->reset(new RecordBatch(schema, list_batch->num_rows(), arrays)); return Status::OK(); } @@ -276,42 +276,65 @@ Status MakeSparseUnion(std::shared_ptr* out) { {std::make_shared("u0", int32()), std::make_shared("u1", boolean())}); std::vector type_codes = {0, 1}; - auto type = std::make_shared(union_types, type_codes, UnionMode::SPARSE); + auto sparse_type = + std::make_shared(union_types, type_codes, UnionMode::SPARSE); - auto f0 = std::make_shared("non_null_union", type, false); - auto f1 = std::make_shared("null_union", type); - std::shared_ptr schema(new Schema({f0, f1})); + auto dense_type = + std::make_shared(union_types, type_codes, UnionMode::DENSE); - // Create data - std::vector> children(2); + auto f0 = std::make_shared("sparse_nonnull", sparse_type, false); + auto f1 = std::make_shared("sparse", sparse_type); + auto f2 = std::make_shared("dense", dense_type); - const int32_t length = 6; + std::shared_ptr schema(new Schema({f0, f1, f2})); - std::vector f0_values = {0, 1, 2, 3, 4, 5, 6}; - ArrayFromVector(type->child(0)->type, f0_values, &children[0]); + // Create data + std::vector> sparse_children(2); + std::vector> dense_children(2); - std::vector f1_values = {1, 0, 1, 1, 0, 1}; - BooleanBuilder bool_builder(default_memory_pool()); - RETURN_NOT_OK(bool_builder.Append(f1_values.data(), length)); - RETURN_NOT_OK(bool_builder.Finish(&children[1])); + const int32_t length = 7; std::shared_ptr type_ids_buffer; - std::vector type_ids = {0, 1, 0, 0, 1, 1}; + std::vector type_ids = {0, 1, 0, 0, 1, 1, 0}; RETURN_NOT_OK(test::CopyBufferFromVector(type_ids, &type_ids_buffer)); + std::vector u0_values = {0, 1, 2, 3, 4, 5, 6}; + ArrayFromVector( + sparse_type->child(0)->type, u0_values, &sparse_children[0]); + + std::vector u1_values = {10, 11, 12, 13, 14, 15, 16}; + ArrayFromVector( + sparse_type->child(1)->type, u1_values, &sparse_children[1]); + + // dense children + u0_values = {0, 2, 3, 7}; + ArrayFromVector( + dense_type->child(0)->type, u0_values, &dense_children[0]); + + u1_values = {11, 14, 15}; + ArrayFromVector( + dense_type->child(1)->type, u1_values, &dense_children[1]); + + std::shared_ptr offsets_buffer; + std::vector offsets = {0, 0, 1, 2, 1, 2, 3}; + RETURN_NOT_OK(test::CopyBufferFromVector(offsets, &offsets_buffer)); + std::vector null_bytes(length, 1); null_bytes[2] = 0; std::shared_ptr null_bitmask; RETURN_NOT_OK(BitUtil::BytesToBits(null_bytes, &null_bitmask)); // construct individual nullable/non-nullable struct arrays - auto no_nulls = std::make_shared(type, length, children, type_ids_buffer); + auto sparse_no_nulls = + std::make_shared(sparse_type, length, sparse_children, type_ids_buffer); + auto sparse = std::make_shared( + sparse_type, length, sparse_children, type_ids_buffer, nullptr, 1, null_bitmask); - auto with_nulls = std::make_shared( - type, length, children, type_ids_buffer, nullptr, 1, null_bitmask); + auto dense = std::make_shared(dense_type, length, dense_children, + type_ids_buffer, offsets_buffer, 1, null_bitmask); // construct batch - std::vector arrays = {no_nulls, with_nulls}; + std::vector> arrays = {sparse_no_nulls, sparse, dense}; out->reset(new RecordBatch(schema, length, arrays)); return Status::OK(); } diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h index ae397109dbc..ce9327d9009 100644 --- a/cpp/src/arrow/test-util.h +++ b/cpp/src/arrow/test-util.h @@ -305,7 +305,7 @@ class TestBuilder : public ::testing::Test { template Status MakeArray(const std::vector& valid_bytes, const std::vector& values, - int size, Builder* builder, ArrayPtr* out) { + int size, Builder* builder, std::shared_ptr* out) { // Append the first 1000 for (int i = 0; i < size; ++i) { if (valid_bytes[i] > 0) { From 3edca1eff1a84bd65cf19105684e1a5a70206a92 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 2 Jan 2017 14:50:10 -0500 Subject: [PATCH 4/6] Implement basic JSON roundtrip for unions Change-Id: If418d5862fbe84e0623c82fc74ecab5f89f86eef --- cpp/src/arrow/ipc/ipc-adapter-test.cc | 2 +- cpp/src/arrow/ipc/ipc-json-test.cc | 18 ++++++- cpp/src/arrow/ipc/json-internal.cc | 73 +++++++++++++++++++++------ cpp/src/arrow/ipc/test-common.h | 4 +- cpp/src/arrow/type.cc | 2 +- 5 files changed, 79 insertions(+), 20 deletions(-) diff --git a/cpp/src/arrow/ipc/ipc-adapter-test.cc b/cpp/src/arrow/ipc/ipc-adapter-test.cc index 90f16134c14..6ba0a6e16be 100644 --- a/cpp/src/arrow/ipc/ipc-adapter-test.cc +++ b/cpp/src/arrow/ipc/ipc-adapter-test.cc @@ -95,7 +95,7 @@ TEST_P(TestWriteRecordBatch, RoundTrip) { INSTANTIATE_TEST_CASE_P(RoundTripTests, TestWriteRecordBatch, ::testing::Values(&MakeIntRecordBatch, &MakeListRecordBatch, &MakeNonNullRecordBatch, &MakeZeroLengthRecordBatch, &MakeDeeplyNestedList, - &MakeStringTypesRecordBatch, &MakeStruct, &MakeSparseUnion)); + &MakeStringTypesRecordBatch, &MakeStruct, &MakeUnion)); void TestGetRecordBatchSize(std::shared_ptr batch) { ipc::MockOutputStream mock; diff --git a/cpp/src/arrow/ipc/ipc-json-test.cc b/cpp/src/arrow/ipc/ipc-json-test.cc index f793a265957..07509890da3 100644 --- a/cpp/src/arrow/ipc/ipc-json-test.cc +++ b/cpp/src/arrow/ipc/ipc-json-test.cc @@ -29,6 +29,7 @@ #include "arrow/builder.h" #include "arrow/ipc/json-internal.h" #include "arrow/ipc/json.h" +#include "arrow/ipc/test-common.h" #include "arrow/memory_pool.h" #include "arrow/status.h" #include "arrow/table.h" @@ -142,11 +143,16 @@ TEST(TestJsonArrayWriter, NestedTypes) { auto value_type = int32(); std::vector values_is_valid = {true, false, true, true, false, true, true}; - std::vector values = {0, 1, 2, 3, 4, 5, 6}; + std::vector values = {0, 1, 2, 3, 4, 5, 6}; std::shared_ptr values_array; ArrayFromVector(int32(), values_is_valid, values, &values_array); + std::vector i16_values = {0, 1, 2, 3, 4, 5, 6}; + std::shared_ptr i16_values_array; + ArrayFromVector( + int16(), values_is_valid, i16_values, &i16_values_array); + // List std::vector list_is_valid = {true, false, true, true, true}; std::vector offsets = {0, 0, 0, 1, 4, 7}; @@ -173,6 +179,16 @@ TEST(TestJsonArrayWriter, NestedTypes) { TestArrayRoundTrip(struct_array); } +TEST(TestJsonArrayWriter, Unions) { + std::shared_ptr batch; + ASSERT_OK(MakeUnion(&batch)); + + for (int i = 0; i < batch->num_columns(); ++i) { + std::shared_ptr col = batch->column(i); + TestArrayRoundTrip(*col.get()); + } +} + // Data generation for test case below void MakeBatchArrays(const std::shared_ptr& schema, const int num_rows, std::vector>* arrays) { diff --git a/cpp/src/arrow/ipc/json-internal.cc b/cpp/src/arrow/ipc/json-internal.cc index 88412e6bed4..4f980d3e5d1 100644 --- a/cpp/src/arrow/ipc/json-internal.cc +++ b/cpp/src/arrow/ipc/json-internal.cc @@ -539,7 +539,7 @@ class JsonArrayWriter : public ArrayVisitor { WriteValidityField(array); auto type = static_cast(array.type().get()); - WriteIntegerField("TYPE_IDS", array.raw_type_ids(), array.length()); + WriteIntegerField("TYPE_ID", array.raw_type_ids(), array.length()); if (type->mode == UnionMode::DENSE) { WriteIntegerField("OFFSET", array.raw_offsets(), array.length()); } @@ -854,27 +854,35 @@ class JsonArrayReader { return builder.Finish(array); } + template + Status GetIntArray( + const RjArray& json_array, const int32_t length, std::shared_ptr* out) { + auto buffer = std::make_shared(pool_); + RETURN_NOT_OK(buffer->Resize(length * sizeof(T))); + T* values = reinterpret_cast(buffer->mutable_data()); + for (int i = 0; i < length; ++i) { + const rj::Value& val = json_array[i]; + DCHECK(val.IsInt()); + values[i] = static_cast(val.GetInt()); + } + + *out = buffer; + return Status::OK(); + } + template typename std::enable_if::value, Status>::type ReadArray( const RjObject& json_array, int32_t length, const std::vector& is_valid, const std::shared_ptr& type, std::shared_ptr* array) { - const auto& json_offsets = json_array.FindMember("OFFSET"); - RETURN_NOT_ARRAY("OFFSET", json_offsets, json_array); - const auto& json_offsets_arr = json_offsets->value.GetArray(); - int32_t null_count = 0; std::shared_ptr validity_buffer; RETURN_NOT_OK(GetValidityBuffer(is_valid, &null_count, &validity_buffer)); - auto offsets_buffer = std::make_shared(pool_); - RETURN_NOT_OK(offsets_buffer->Resize((length + 1) * sizeof(int32_t))); - int32_t* offsets = reinterpret_cast(offsets_buffer->mutable_data()); - - for (int i = 0; i < length + 1; ++i) { - const rj::Value& val = json_offsets_arr[i]; - DCHECK(val.IsInt()); - offsets[i] = val.GetInt(); - } + const auto& json_offsets = json_array.FindMember("OFFSET"); + RETURN_NOT_ARRAY("OFFSET", json_offsets, json_array); + std::shared_ptr offsets_buffer; + RETURN_NOT_OK(GetIntArray( + json_offsets->value.GetArray(), length + 1, &offsets_buffer)); std::vector> children; RETURN_NOT_OK(GetChildren(json_array, type, &children)); @@ -903,6 +911,41 @@ class JsonArrayReader { return Status::OK(); } + template + typename std::enable_if::value, Status>::type ReadArray( + const RjObject& json_array, int32_t length, const std::vector& is_valid, + const std::shared_ptr& type, std::shared_ptr* array) { + int32_t null_count = 0; + + const auto& union_type = static_cast(*type.get()); + + std::shared_ptr validity_buffer; + std::shared_ptr type_id_buffer; + std::shared_ptr offsets_buffer; + + RETURN_NOT_OK(GetValidityBuffer(is_valid, &null_count, &validity_buffer)); + + const auto& json_type_ids = json_array.FindMember("TYPE_ID"); + RETURN_NOT_ARRAY("TYPE_ID", json_type_ids, json_array); + RETURN_NOT_OK( + GetIntArray(json_type_ids->value.GetArray(), length, &type_id_buffer)); + + if (union_type.mode == UnionMode::DENSE) { + const auto& json_offsets = json_array.FindMember("OFFSET"); + RETURN_NOT_ARRAY("OFFSET", json_offsets, json_array); + RETURN_NOT_OK( + GetIntArray(json_offsets->value.GetArray(), length, &offsets_buffer)); + } + + std::vector> children; + RETURN_NOT_OK(GetChildren(json_array, type, &children)); + + *array = std::make_shared(type, length, children, type_id_buffer, + offsets_buffer, null_count, validity_buffer); + + return Status::OK(); + } + template typename std::enable_if::value, Status>::type ReadArray( const RjObject& json_array, int32_t length, const std::vector& is_valid, @@ -999,7 +1042,7 @@ class JsonArrayReader { NOT_IMPLEMENTED_CASE(INTERVAL); TYPE_CASE(ListType); TYPE_CASE(StructType); - NOT_IMPLEMENTED_CASE(UNION); + TYPE_CASE(UnionType); default: std::stringstream ss; ss << type->ToString(); diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h index c309e53bd86..d68372b8f79 100644 --- a/cpp/src/arrow/ipc/test-common.h +++ b/cpp/src/arrow/ipc/test-common.h @@ -270,10 +270,10 @@ Status MakeStruct(std::shared_ptr* out) { return Status::OK(); } -Status MakeSparseUnion(std::shared_ptr* out) { +Status MakeUnion(std::shared_ptr* out) { // Define schema std::vector> union_types( - {std::make_shared("u0", int32()), std::make_shared("u1", boolean())}); + {std::make_shared("u0", int32()), std::make_shared("u1", uint8())}); std::vector type_codes = {0, 1}; auto sparse_type = diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc index 9618af8fcdf..89faab6ec6a 100644 --- a/cpp/src/arrow/type.cc +++ b/cpp/src/arrow/type.cc @@ -103,7 +103,7 @@ std::string UnionType::ToString() const { for (size_t i = 0; i < children_.size(); ++i) { if (i) { s << ", "; } - s << children_[i]->ToString() << "=" << type_ids[i]; + s << children_[i]->ToString() << "=" << static_cast(type_ids[i]); } s << ">"; return s.str(); From cdfc61deff2415876d34d8a0ce66daad4380b7e3 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 2 Jan 2017 14:51:54 -0500 Subject: [PATCH 5/6] Export UnionArray Change-Id: I02a81e35f277fb08ad54ffafec878ff9e7b025e7 --- cpp/src/arrow/array.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h index 84fc70d18b2..cd42a28e251 100644 --- a/cpp/src/arrow/array.h +++ b/cpp/src/arrow/array.h @@ -430,7 +430,7 @@ class ARROW_EXPORT StructArray : public Array { // ---------------------------------------------------------------------- // Union -class UnionArray : public Array { +class ARROW_EXPORT UnionArray : public Array { public: using TypeClass = UnionType; From 86c4191ee173669d0c4393ca1287ea98d25118e2 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 2 Jan 2017 15:20:20 -0500 Subject: [PATCH 6/6] Fix valgrind error Change-Id: Ie49471df170207c6a9ce7be3601096e8f48d2204 --- cpp/src/arrow/array.cc | 2 +- cpp/src/arrow/ipc/test-common.h | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/array.cc b/cpp/src/arrow/array.cc index cbe8af4bbd7..3d309b8b92f 100644 --- a/cpp/src/arrow/array.cc +++ b/cpp/src/arrow/array.cc @@ -536,7 +536,7 @@ bool UnionArray::RangeEquals(int32_t start_idx, int32_t end_idx, int32_t other_s } // Store mapping in a vector for constant time lookups - std::vector type_id_to_child_num(max_code); + std::vector type_id_to_child_num(max_code + 1); for (uint8_t i = 0; i < static_cast(type_codes.size()); ++i) { type_id_to_child_num[type_codes[i]] = i; } diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h index d68372b8f79..3faeebf9569 100644 --- a/cpp/src/arrow/ipc/test-common.h +++ b/cpp/src/arrow/ipc/test-common.h @@ -275,7 +275,7 @@ Status MakeUnion(std::shared_ptr* out) { std::vector> union_types( {std::make_shared("u0", int32()), std::make_shared("u1", uint8())}); - std::vector type_codes = {0, 1}; + std::vector type_codes = {5, 10}; auto sparse_type = std::make_shared(union_types, type_codes, UnionMode::SPARSE); @@ -295,7 +295,7 @@ Status MakeUnion(std::shared_ptr* out) { const int32_t length = 7; std::shared_ptr type_ids_buffer; - std::vector type_ids = {0, 1, 0, 0, 1, 1, 0}; + std::vector type_ids = {5, 10, 5, 5, 10, 10, 5}; RETURN_NOT_OK(test::CopyBufferFromVector(type_ids, &type_ids_buffer)); std::vector u0_values = {0, 1, 2, 3, 4, 5, 6};