diff --git a/cpp/src/arrow/builder.h b/cpp/src/arrow/builder.h index cef17e5aaba..3babf4ba78e 100644 --- a/cpp/src/arrow/builder.h +++ b/cpp/src/arrow/builder.h @@ -130,6 +130,15 @@ class ARROW_EXPORT ArrayBuilder { DISALLOW_COPY_AND_ASSIGN(ArrayBuilder); }; +class ARROW_EXPORT NullArrayBuilder : public ArrayBuilder { + public: + explicit NullArrayBuilder(MemoryPool* pool, const TypePtr& type) : ArrayBuilder(pool, type) {} + virtual ~NullArrayBuilder() {}; + Status Finish(std::shared_ptr* out) override { + return Status::OK(); + } +}; + } // namespace arrow #endif // ARROW_BUILDER_H_ diff --git a/cpp/src/arrow/io/CMakeLists.txt b/cpp/src/arrow/io/CMakeLists.txt index 47bb0893863..2c562bc028d 100644 --- a/cpp/src/arrow/io/CMakeLists.txt +++ b/cpp/src/arrow/io/CMakeLists.txt @@ -80,9 +80,11 @@ if(ARROW_HDFS) ${ARROW_IO_TEST_LINK_LIBS}) endif() -add_library(arrow_io SHARED +add_library(arrow_io STATIC ${ARROW_IO_SRCS} ) +set_property(TARGET arrow_io PROPERTY POSITION_INDEPENDENT_CODE 1) + target_link_libraries(arrow_io LINK_PUBLIC ${ARROW_IO_LINK_LIBS} LINK_PRIVATE ${ARROW_IO_PRIVATE_LINK_LIBS}) diff --git a/cpp/src/arrow/ipc/CMakeLists.txt b/cpp/src/arrow/ipc/CMakeLists.txt index 6955bcb6c23..d0816c4a42b 100644 --- a/cpp/src/arrow/ipc/CMakeLists.txt +++ b/cpp/src/arrow/ipc/CMakeLists.txt @@ -41,9 +41,10 @@ set(ARROW_IPC_SRCS ) # TODO(wesm): SHARED and STATIC targets -add_library(arrow_ipc SHARED +add_library(arrow_ipc STATIC ${ARROW_IPC_SRCS} ) +set_property(TARGET arrow_ipc PROPERTY POSITION_INDEPENDENT_CODE 1) if(FLATBUFFERS_VENDORED) add_dependencies(arrow_ipc flatbuffers_ep) endif() diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc index da718c08d54..4628ace3abf 100644 --- a/cpp/src/arrow/ipc/adapter.cc +++ b/cpp/src/arrow/ipc/adapter.cc @@ -37,6 +37,7 @@ #include "arrow/types/primitive.h" #include "arrow/types/string.h" #include "arrow/types/struct.h" +#include "arrow/types/union.h" #include "arrow/util/bit-util.h" #include "arrow/util/buffer.h" #include "arrow/util/logging.h" @@ -115,6 +116,13 @@ Status VisitArray(const Array* arr, std::vector* field_nodes RETURN_NOT_OK( VisitArray(field.get(), field_nodes, buffers, max_recursion_depth - 1)); } + } else if (arr->type_enum() == Type::UNION) { + const auto union_arr = static_cast(arr); + buffers->push_back(union_arr->types()); + buffers->push_back(union_arr->offset_buf()); + for (auto& child_arr : union_arr->children()) { + RETURN_NOT_OK(VisitArray(child_arr.get(), field_nodes, buffers, max_recursion_depth - 1)); + } } else { return Status::NotImplemented("Unrecognized type"); } @@ -363,6 +371,22 @@ class RecordBatchReader::RecordBatchReaderImpl { out->reset(new StructArray( type, field_meta.length, fields, field_meta.null_count, null_bitmap)); return Status::OK(); + } else if (type->type == Type::UNION) { + std::shared_ptr types; + RETURN_NOT_OK(GetBuffer(buffer_index_++, &types)); + std::shared_ptr offset_buf; + RETURN_NOT_OK(GetBuffer(buffer_index_++, &offset_buf)); + auto union_type = std::dynamic_pointer_cast(type); + const int num_children = union_type->num_children(); + std::vector results; + for (int child_idx = 0; child_idx < num_children; ++child_idx) { + std::shared_ptr result; + RETURN_NOT_OK(NextArray(union_type->child(child_idx).get(), max_recursion_depth - 1, &result)); + results.push_back(result); + } + out->reset(new UnionArray( + type, field_meta.length, results, types, offset_buf, field_meta.null_count, null_bitmap)); + return Status::OK(); } return Status::NotImplemented("Non-primitive types not complete yet"); diff --git a/cpp/src/arrow/ipc/ipc-metadata-test.cc b/cpp/src/arrow/ipc/ipc-metadata-test.cc index 1dc39692332..8d101cf586a 100644 --- a/cpp/src/arrow/ipc/ipc-metadata-test.cc +++ b/cpp/src/arrow/ipc/ipc-metadata-test.cc @@ -26,6 +26,7 @@ #include "arrow/schema.h" #include "arrow/test-util.h" #include "arrow/type.h" +#include "arrow/types/union.h" #include "arrow/util/status.h" namespace arrow { @@ -97,6 +98,22 @@ TEST_F(TestSchemaMessage, NestedFields) { CheckRoundtrip(&schema); } +TEST_F(TestSchemaMessage, UnionType) { + auto f0 = std::make_shared("f0", TypePtr(new Int32Type())); + auto f1 = std::make_shared("f1", TypePtr(new Int64Type())); + std::vector type_ids = {}; // TODO(pcm): Implement typeIds + auto ud = TypePtr(new UnionType(std::vector>({f0, f1}), + type_ids, UnionMode::DENSE)); + auto fd = std::make_shared("f", ud); + Schema schema_dense({fd}); + CheckRoundtrip(&schema_dense); + auto us = TypePtr(new UnionType(std::vector>({f0, f1}), + type_ids, UnionMode::SPARSE)); + auto fs = std::make_shared("f", us); + Schema schema_sparse({fs}); + CheckRoundtrip(&schema_sparse); +} + class TestFileFooter : public ::testing::Test { public: void SetUp() {} diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc index 7102012c29a..19a810ac1c6 100644 --- a/cpp/src/arrow/ipc/metadata-internal.cc +++ b/cpp/src/arrow/ipc/metadata-internal.cc @@ -28,6 +28,7 @@ #include "arrow/ipc/Message_generated.h" #include "arrow/schema.h" #include "arrow/type.h" +#include "arrow/types/union.h" #include "arrow/util/buffer.h" #include "arrow/util/status.h" @@ -119,8 +120,20 @@ static Status TypeFromFlatbuffer(flatbuf::Type type, const void* type_data, case flatbuf::Type_Struct_: *out = std::make_shared(children); return Status::OK(); - case flatbuf::Type_Union: - return Status::NotImplemented("Type is not implemented"); + case flatbuf::Type_Union: { + std::vector type_ids = {}; // TODO(pcm): Implement typeIds + auto union_data = static_cast(type_data); + UnionMode mode; + if (union_data->mode() == flatbuf::UnionMode_Sparse) { + mode = UnionMode::SPARSE; + } else if (union_data->mode() == flatbuf::UnionMode_Dense) { + mode = UnionMode::DENSE; + } else { + return Status::Invalid("Unrecognized UnionMode"); + } + *out = std::make_shared(children, type_ids, mode); + } + return Status::OK(); default: return Status::Invalid("Unrecognized type"); } @@ -158,6 +171,18 @@ static Status StructToFlatbuffer(FBB& fbb, const std::shared_ptr& type return Status::OK(); } +static Status UnionToFlatbuffer(FBB& fbb, const std::shared_ptr& type, + std::vector* out_children, Offset* offset) { + auto union_type = std::dynamic_pointer_cast(type); + FieldOffset field; + for (int i = 0; i < union_type->num_children(); ++i) { + RETURN_NOT_OK(FieldToFlatbuffer(fbb, union_type->child(i), &field)); + out_children->push_back(field); + } + *offset = flatbuf::CreateUnion(fbb).Union(); + return Status::OK(); +} + #define INT_TO_FB_CASE(BIT_WIDTH, IS_SIGNED) \ *out_type = flatbuf::Type_Int; \ *offset = IntToFlatbuffer(fbb, BIT_WIDTH, IS_SIGNED); \ @@ -208,6 +233,9 @@ static Status TypeToFlatbuffer(FBB& fbb, const std::shared_ptr& type, case Type::STRUCT: *out_type = flatbuf::Type_Struct_; return StructToFlatbuffer(fbb, type, children, offset); + case Type::UNION: + *out_type = flatbuf::Type_Union; + return UnionToFlatbuffer(fbb, type, children, offset); default: *out_type = flatbuf::Type_NONE; // Make clang-tidy happy std::stringstream ss; diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc index 589bdadb77c..e82486d0ed2 100644 --- a/cpp/src/arrow/type.cc +++ b/cpp/src/arrow/type.cc @@ -105,6 +105,16 @@ std::string UnionType::ToString() const { return s.str(); } +bool UnionType::Equals(const DataType* other) const { + if (!DataType::Equals(other)) { + return false; + } + const UnionType *union_type = dynamic_cast(other); + return union_type && type_id == union_type->type_id + && std::equal(type_ids.begin(), type_ids.end(), + union_type->type_ids.begin()); +} + int NullType::bit_width() const { return 0; } diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h index 5b4d7bc42bd..e2f47f11b07 100644 --- a/cpp/src/arrow/type.h +++ b/cpp/src/arrow/type.h @@ -375,6 +375,11 @@ struct ARROW_EXPORT UnionType : public DataType { static std::string name() { return "union"; } Status Accept(TypeVisitor* visitor) const override; + bool Equals(const DataType* other) const override; + bool Equals(const std::shared_ptr& other) const { + return Equals(other.get()); + } + UnionMode mode; std::vector type_ids; }; diff --git a/cpp/src/arrow/types/union.cc b/cpp/src/arrow/types/union.cc index cc2934b2e4a..2a5be998694 100644 --- a/cpp/src/arrow/types/union.cc +++ b/cpp/src/arrow/types/union.cc @@ -23,5 +23,50 @@ #include #include "arrow/type.h" +#include "arrow/util/status.h" -namespace arrow {} // namespace arrow +namespace arrow { + +bool UnionArray::Equals(const std::shared_ptr& arr) const { + if (this == arr.get()) { return true; } + if (!arr) { return false; } + if (this->type_enum() != arr->type_enum()) { return false; } + if (null_count_ != arr->null_count()) { return false; } + return RangeEquals(0, length_, 0, arr); +} + +bool UnionArray::RangeEquals(int32_t start_idx, int32_t end_idx, int32_t other_start_idx, + const std::shared_ptr& arr) const { + if (this == arr.get()) { return true; } + if (Type::UNION != arr->type_enum()) { return false; } + const auto other = static_cast(arr.get()); + + // TODO(pcm): Handle sparse case here + + int32_t i = start_idx; + int32_t o_i = other_start_idx; + for (size_t c = 0; c < other->children().size(); ++c) { + for (int32_t e = 0; e < other->children()[c]->length(); ++e) { + if (!children()[c]->RangeEquals(e, e + 1, e, other->children()[c])) { // FIXME(pcm): fix this + return false; + } + i += 1; + o_i += 1; + if (i >= end_idx) { + return true; + } + } + } + return false; // to make the compiler happy +} + +Status UnionArray::Validate() const { + // TODO(pcm): what to do here? + return Status::OK(); +} + +Status UnionArray::Accept(ArrayVisitor* visitor) const { + return visitor->Visit(*this); +} + +} // namespace arrow diff --git a/cpp/src/arrow/types/union.h b/cpp/src/arrow/types/union.h index 44f39cc6994..2e56ecf2e99 100644 --- a/cpp/src/arrow/types/union.h +++ b/cpp/src/arrow/types/union.h @@ -24,25 +24,45 @@ #include "arrow/array.h" #include "arrow/type.h" +#include "arrow/types/primitive.h" namespace arrow { class Buffer; -class UnionArray : public Array { +class ARROW_EXPORT UnionArray : public Array { + public: + UnionArray(const TypePtr& type, int32_t length, std::vector& children, + std::shared_ptr types, std::shared_ptr offset_buf, + int32_t null_count = 0, std::shared_ptr null_bitmap = nullptr) + : Array(type, length, null_count, null_bitmap), types_(types) { + type_ = type; + children_ = children; + offset_buf_ = offset_buf; + } + + const std::shared_ptr& types() const { return types_; } + + const std::vector& children() const { return children_; } + + const std::shared_ptr& offset_buf() const { return offset_buf_; } + + Status Validate() const override; + + Status Accept(ArrayVisitor* visitor) const override; + + bool Equals(const std::shared_ptr& arr) const override; + bool RangeEquals(int32_t start_idx, int32_t end_idx, int32_t other_start_idx, + const std::shared_ptr& arr) const override; + + ArrayPtr child(int32_t index) const { return children_[index]; } protected: // The data are types encoded as int16 - Buffer* types_; + std::shared_ptr types_; std::vector> children_; + std::shared_ptr offset_buf_; }; -class DenseUnionArray : public UnionArray { - protected: - Buffer* offset_buf_; -}; - -class SparseUnionArray : public UnionArray {}; - } // namespace arrow #endif // ARROW_TYPES_UNION_H