From cab6d4f9da16b714273cbb8c52562a83de19696b Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sat, 11 Mar 2017 22:28:36 -0500 Subject: [PATCH] Add functions to make record batches for date, date32, timestamp, time. Fix bugs Change-Id: I31644429fdb93f1a9736f8dd9d4d7b322842eb81 --- cpp/src/arrow/ipc/adapter.cc | 75 ++++++--------------- cpp/src/arrow/ipc/feather-test.cc | 38 ----------- cpp/src/arrow/ipc/ipc-adapter-test.cc | 3 +- cpp/src/arrow/ipc/test-common.h | 97 +++++++++++++++++++++++++++ cpp/src/arrow/test-util.h | 18 +++++ cpp/src/arrow/type.h | 2 +- cpp/src/arrow/type_traits.h | 6 +- 7 files changed, 141 insertions(+), 98 deletions(-) diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc index 78d58101963..a4eff7214aa 100644 --- a/cpp/src/arrow/ipc/adapter.cc +++ b/cpp/src/arrow/ipc/adapter.cc @@ -309,66 +309,31 @@ class RecordBatchWriter : public ArrayVisitor { return Status::OK(); } - Status Visit(const Int8Array& array) override { - return VisitFixedWidth(array); - } - - Status Visit(const Int16Array& array) override { - return VisitFixedWidth(array); - } - - Status Visit(const Int32Array& array) override { - return VisitFixedWidth(array); - } - - Status Visit(const Int64Array& array) override { - return VisitFixedWidth(array); - } - - Status Visit(const UInt8Array& array) override { - return VisitFixedWidth(array); - } - - Status Visit(const UInt16Array& array) override { - return VisitFixedWidth(array); - } - - Status Visit(const UInt32Array& array) override { - return VisitFixedWidth(array); - } - - Status Visit(const UInt64Array& array) override { - return VisitFixedWidth(array); - } - - Status Visit(const HalfFloatArray& array) override { - return VisitFixedWidth(array); - } - - Status Visit(const FloatArray& array) override { - return VisitFixedWidth(array); - } - - Status Visit(const DoubleArray& array) override { - return VisitFixedWidth(array); - } +#define VISIT_FIXED_WIDTH(TYPE) \ + Status Visit(const TYPE& array) override { return VisitFixedWidth(array); } + + VISIT_FIXED_WIDTH(Int8Array); + VISIT_FIXED_WIDTH(Int16Array); + VISIT_FIXED_WIDTH(Int32Array); + VISIT_FIXED_WIDTH(Int64Array); + VISIT_FIXED_WIDTH(UInt8Array); + VISIT_FIXED_WIDTH(UInt16Array); + VISIT_FIXED_WIDTH(UInt32Array); + VISIT_FIXED_WIDTH(UInt64Array); + VISIT_FIXED_WIDTH(HalfFloatArray); + VISIT_FIXED_WIDTH(FloatArray); + VISIT_FIXED_WIDTH(DoubleArray); + VISIT_FIXED_WIDTH(DateArray); + VISIT_FIXED_WIDTH(Date32Array); + VISIT_FIXED_WIDTH(TimeArray); + VISIT_FIXED_WIDTH(TimestampArray); + +#undef VISIT_FIXED_WIDTH Status Visit(const StringArray& array) override { return VisitBinary(array); } Status Visit(const BinaryArray& array) override { return VisitBinary(array); } - Status Visit(const DateArray& array) override { - return VisitFixedWidth(array); - } - - Status Visit(const TimeArray& array) override { - return VisitFixedWidth(array); - } - - Status Visit(const TimestampArray& array) override { - return VisitFixedWidth(array); - } - Status Visit(const ListArray& array) override { std::shared_ptr value_offsets; RETURN_NOT_OK(GetZeroBasedValueOffsets(array, &value_offsets)); diff --git a/cpp/src/arrow/ipc/feather-test.cc b/cpp/src/arrow/ipc/feather-test.cc index b73246b6722..078c3e10aff 100644 --- a/cpp/src/arrow/ipc/feather-test.cc +++ b/cpp/src/arrow/ipc/feather-test.cc @@ -344,44 +344,6 @@ TEST_F(TestTableWriter, PrimitiveRoundTrip) { ASSERT_EQ("f1", col->name()); } -Status MakeDictionaryFlat(std::shared_ptr* out) { - const int64_t length = 6; - - std::vector is_valid = {true, true, false, true, true, true}; - std::shared_ptr dict1, dict2; - - std::vector dict1_values = {"foo", "bar", "baz"}; - std::vector dict2_values = {"foo", "bar", "baz", "qux"}; - - ArrayFromVector(dict1_values, &dict1); - ArrayFromVector(dict2_values, &dict2); - - auto f0_type = arrow::dictionary(arrow::int32(), dict1); - auto f1_type = arrow::dictionary(arrow::int8(), dict1); - auto f2_type = arrow::dictionary(arrow::int32(), dict2); - - std::shared_ptr indices0, indices1, indices2; - std::vector indices0_values = {1, 2, -1, 0, 2, 0}; - std::vector indices1_values = {0, 0, 2, 2, 1, 1}; - std::vector indices2_values = {3, 0, 2, 1, 0, 2}; - - ArrayFromVector(is_valid, indices0_values, &indices0); - ArrayFromVector(is_valid, indices1_values, &indices1); - ArrayFromVector(is_valid, indices2_values, &indices2); - - auto a0 = std::make_shared(f0_type, indices0); - auto a1 = std::make_shared(f1_type, indices1); - auto a2 = std::make_shared(f2_type, indices2); - - // construct batch - std::shared_ptr schema(new Schema( - {field("dict1", f0_type), field("sparse", f1_type), field("dense", f2_type)})); - - std::vector> arrays = {a0, a1, a2}; - out->reset(new RecordBatch(schema, length, arrays)); - return Status::OK(); -} - TEST_F(TestTableWriter, CategoryRoundtrip) { std::shared_ptr batch; ASSERT_OK(MakeDictionaryFlat(&batch)); diff --git a/cpp/src/arrow/ipc/ipc-adapter-test.cc b/cpp/src/arrow/ipc/ipc-adapter-test.cc index 6678fd522a8..b60b8a9ba68 100644 --- a/cpp/src/arrow/ipc/ipc-adapter-test.cc +++ b/cpp/src/arrow/ipc/ipc-adapter-test.cc @@ -175,7 +175,8 @@ INSTANTIATE_TEST_CASE_P( RoundTripTests, TestRecordBatchParam, ::testing::Values(&MakeIntRecordBatch, &MakeStringTypesRecordBatch, &MakeNonNullRecordBatch, &MakeZeroLengthRecordBatch, &MakeListRecordBatch, - &MakeDeeplyNestedList, &MakeStruct, &MakeUnion, &MakeDictionary)); + &MakeDeeplyNestedList, &MakeStruct, &MakeUnion, &MakeDictionary, &MakeDates, + &MakeTimestamps, &MakeTimes)); void TestGetRecordBatchSize(std::shared_ptr batch) { ipc::MockOutputStream mock; diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h index dc823662ee1..7f33aba812e 100644 --- a/cpp/src/arrow/ipc/test-common.h +++ b/cpp/src/arrow/ipc/test-common.h @@ -425,6 +425,103 @@ Status MakeDictionary(std::shared_ptr* out) { return Status::OK(); } +Status MakeDictionaryFlat(std::shared_ptr* out) { + const int64_t length = 6; + + std::vector is_valid = {true, true, false, true, true, true}; + std::shared_ptr dict1, dict2; + + std::vector dict1_values = {"foo", "bar", "baz"}; + std::vector dict2_values = {"foo", "bar", "baz", "qux"}; + + ArrayFromVector(dict1_values, &dict1); + ArrayFromVector(dict2_values, &dict2); + + auto f0_type = arrow::dictionary(arrow::int32(), dict1); + auto f1_type = arrow::dictionary(arrow::int8(), dict1); + auto f2_type = arrow::dictionary(arrow::int32(), dict2); + + std::shared_ptr indices0, indices1, indices2; + std::vector indices0_values = {1, 2, -1, 0, 2, 0}; + std::vector indices1_values = {0, 0, 2, 2, 1, 1}; + std::vector indices2_values = {3, 0, 2, 1, 0, 2}; + + ArrayFromVector(is_valid, indices0_values, &indices0); + ArrayFromVector(is_valid, indices1_values, &indices1); + ArrayFromVector(is_valid, indices2_values, &indices2); + + auto a0 = std::make_shared(f0_type, indices0); + auto a1 = std::make_shared(f1_type, indices1); + auto a2 = std::make_shared(f2_type, indices2); + + // construct batch + std::shared_ptr schema(new Schema( + {field("dict1", f0_type), field("sparse", f1_type), field("dense", f2_type)})); + + std::vector> arrays = {a0, a1, a2}; + out->reset(new RecordBatch(schema, length, arrays)); + return Status::OK(); +} + +Status MakeDates(std::shared_ptr* out) { + std::vector is_valid = {true, true, true, false, true, true, true}; + auto f0 = field("f0", date32()); + auto f1 = field("f1", date()); + std::shared_ptr schema(new Schema({f0, f1})); + + std::vector date_values = {1489269000000, 1489270000000, 1489271000000, + 1489272000000, 1489272000000, 1489273000000}; + std::vector date32_values = {0, 1, 2, 3, 4, 5, 6}; + + std::shared_ptr date_array, date32_array; + ArrayFromVector(is_valid, date_values, &date_array); + ArrayFromVector(is_valid, date32_values, &date32_array); + + std::vector> arrays = {date32_array, date_array}; + *out = std::make_shared(schema, date_array->length(), arrays); + return Status::OK(); +} + +Status MakeTimestamps(std::shared_ptr* out) { + std::vector is_valid = {true, true, true, false, true, true, true}; + auto f0 = field("f0", timestamp(TimeUnit::MILLI)); + auto f1 = field("f1", timestamp(TimeUnit::NANO)); + auto f2 = field("f2", timestamp("US/Los_Angeles", TimeUnit::SECOND)); + std::shared_ptr schema(new Schema({f0, f1, f2})); + + std::vector ts_values = {1489269000000, 1489270000000, 1489271000000, + 1489272000000, 1489272000000, 1489273000000}; + + std::shared_ptr a0, a1, a2; + ArrayFromVector(f0->type, is_valid, ts_values, &a0); + ArrayFromVector(f1->type, is_valid, ts_values, &a1); + ArrayFromVector(f2->type, is_valid, ts_values, &a2); + + ArrayVector arrays = {a0, a1, a2}; + *out = std::make_shared(schema, a0->length(), arrays); + return Status::OK(); +} + +Status MakeTimes(std::shared_ptr* out) { + std::vector is_valid = {true, true, true, false, true, true, true}; + auto f0 = field("f0", time(TimeUnit::MILLI)); + auto f1 = field("f1", time(TimeUnit::NANO)); + auto f2 = field("f2", time(TimeUnit::SECOND)); + std::shared_ptr schema(new Schema({f0, f1, f2})); + + std::vector ts_values = {1489269000000, 1489270000000, 1489271000000, + 1489272000000, 1489272000000, 1489273000000}; + + std::shared_ptr a0, a1, a2; + ArrayFromVector(f0->type, is_valid, ts_values, &a0); + ArrayFromVector(f1->type, is_valid, ts_values, &a1); + ArrayFromVector(f2->type, is_valid, ts_values, &a2); + + ArrayVector arrays = {a0, a1, a2}; + *out = std::make_shared(schema, a0->length(), arrays); + return Status::OK(); +} + } // namespace ipc } // namespace arrow diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h index 11ce50a76a5..f05a54168b6 100644 --- a/cpp/src/arrow/test-util.h +++ b/cpp/src/arrow/test-util.h @@ -63,6 +63,8 @@ namespace arrow { +using ArrayVector = std::vector>; + namespace test { template @@ -232,6 +234,22 @@ class TestBase : public ::testing::Test { MemoryPool* pool_; }; +template +void ArrayFromVector(const std::shared_ptr& type, + const std::vector& is_valid, const std::vector& values, + std::shared_ptr* out) { + MemoryPool* pool = default_memory_pool(); + typename TypeTraits::BuilderType builder(pool, type); + for (size_t i = 0; i < values.size(); ++i) { + if (is_valid[i]) { + ASSERT_OK(builder.Append(values[i])); + } else { + ASSERT_OK(builder.AppendNull()); + } + } + ASSERT_OK(builder.Finish(out)); +} + template void ArrayFromVector(const std::vector& is_valid, const std::vector& values, std::shared_ptr* out) { diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h index aa0d70e5505..a838082d7e7 100644 --- a/cpp/src/arrow/type.h +++ b/cpp/src/arrow/type.h @@ -452,7 +452,7 @@ struct ARROW_EXPORT Date32Type : public FixedWidthType { Date32Type() : FixedWidthType(Type::DATE32) {} - int bit_width() const override { return static_cast(sizeof(c_type) * 8); } + int bit_width() const override { return static_cast(sizeof(c_type) * 4); } Status Accept(TypeVisitor* visitor) const override; std::string ToString() const override; diff --git a/cpp/src/arrow/type_traits.h b/cpp/src/arrow/type_traits.h index 2cd14203cdb..91461da8c42 100644 --- a/cpp/src/arrow/type_traits.h +++ b/cpp/src/arrow/type_traits.h @@ -121,7 +121,7 @@ struct TypeTraits { template <> struct TypeTraits { using ArrayType = DateArray; - // using BuilderType = DateBuilder; + using BuilderType = DateBuilder; static inline int64_t bytes_required(int64_t elements) { return elements * sizeof(int64_t); @@ -145,7 +145,7 @@ struct TypeTraits { template <> struct TypeTraits { using ArrayType = TimestampArray; - // using BuilderType = TimestampBuilder; + using BuilderType = TimestampBuilder; static inline int64_t bytes_required(int64_t elements) { return elements * sizeof(int64_t); @@ -156,7 +156,7 @@ struct TypeTraits { template <> struct TypeTraits { using ArrayType = TimeArray; - // using BuilderType = TimestampBuilder; + using BuilderType = TimeBuilder; static inline int64_t bytes_required(int64_t elements) { return elements * sizeof(int64_t);