From 3aa64faac948ef022c9144f7372689b2b3d35cf1 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 24 Apr 2017 00:37:51 -0400 Subject: [PATCH 1/8] Add conversion for TIMESTAMP_MICROS --- src/parquet/arrow/schema.cc | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc index 25713a70..2721fc5f 100644 --- a/src/parquet/arrow/schema.cc +++ b/src/parquet/arrow/schema.cc @@ -455,12 +455,16 @@ Status FieldToNode(const std::shared_ptr& field, break; case ArrowType::TIMESTAMP: { auto timestamp_type = static_cast<::arrow::TimestampType*>(field->type().get()); - if (timestamp_type->unit() != ::arrow::TimestampType::Unit::MILLI) { + auto unit = timestamp_type->unit(); + type = ParquetType::INT64; + if (unit == ::arrow::TimeUnit::MILLI) { + logical_type = LogicalType::TIMESTAMP_MILLIS; + } else if (unit == ::arrow::TimeUnit::MICRO) { + logical_type = LogicalType::TIMESTAMP_MICROS; + } else { return Status::NotImplemented( - "Other timestamp units than millisecond are not yet support with parquet."); + "Only MILLI and MICRO units supported for Arrow timestamps with Parquet."); } - type = ParquetType::INT64; - logical_type = LogicalType::TIMESTAMP_MILLIS; } break; case ArrowType::TIME32: type = ParquetType::INT64; From 2ab7f12256a58a9b8fd3d6cea94b93f28aa97f6b Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 24 Apr 2017 18:30:54 -0400 Subject: [PATCH 2/8] Plumbing and expansions for rest of Arrow date/time types --- src/parquet/arrow/arrow-reader-writer-test.cc | 149 ++++++++++++++---- src/parquet/arrow/arrow-schema-test.cc | 26 ++- src/parquet/arrow/reader.cc | 16 +- src/parquet/arrow/schema.cc | 18 ++- src/parquet/arrow/writer.cc | 14 +- 5 files changed, 177 insertions(+), 46 deletions(-) diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc index 0bdc14d3..cd6f8ab0 100644 --- a/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/src/parquet/arrow/arrow-reader-writer-test.cc @@ -44,6 +44,7 @@ using arrow::PrimitiveArray; using arrow::Status; using arrow::Table; +using ArrowId = ::arrow::Type; using ParquetType = parquet::Type; using parquet::schema::GroupNode; using parquet::schema::NodePtr; @@ -58,13 +59,98 @@ const int LARGE_SIZE = 10000; constexpr uint32_t kDefaultSeed = 0; +LogicalType::type get_logical_type(const ::arrow::DataType& type) { + switch (type.id()) { + case ArrowId::UINT8: + return LogicalType::UINT_8; + case ArrowId::INT8: + return LogicalType::INT_8; + case ArrowId::UINT16: + return LogicalType::UINT_16; + case ArrowId::INT16: + return LogicalType::INT_16; + case ArrowId::UINT32: + return LogicalType::UINT_32; + case ArrowId::INT32: + return LogicalType::INT_32; + case ArrowId::UINT64: + return LogicalType::UINT_64; + case ArrowId::INT64: + return LogicalType::INT_64; + case ArrowId::STRING: + return LogicalType::UTF8; + case ArrowId::DATE32: + return LogicalType::DATE; + case ArrowId::DATE64: + return LogicalType::DATE; + case ArrowId::TIMESTAMP: { + const auto& ts_type = static_cast(type); + switch (ts_type.unit()) { + case ::arrow::TimeUnit::MILLI: + return LogicalType::TIMESTAMP_MILLIS; + case ::arrow::TimeUnit::MICRO: + return LogicalType::TIMESTAMP_MICROS; + default: + DCHECK(false) + << "Only MILLI and MICRO units supported for Arrow timestamps with Parquet."; + } + } + case ArrowId::TIME32: + return LogicalType::TIME_MILLIS; + case ArrowId::TIME64: + return LogicalType::TIME_MICROS; + default: + break; + } + return LogicalType::NONE; +} + +ParquetType::type get_physical_type(const ::arrow::DataType& type) { + switch (type.id()) { + case ArrowId::BOOL: + return ParquetType::BOOLEAN; + case ArrowId::UINT8: + case ArrowId::INT8: + case ArrowId::UINT16: + case ArrowId::INT16: + case ArrowId::UINT32: + case ArrowId::INT32: + return ParquetType::INT32; + case ArrowId::UINT64: + case ArrowId::INT64: + return ParquetType::INT64; + case ArrowId::FLOAT: + return ParquetType::FLOAT; + case ArrowId::DOUBLE: + return ParquetType::DOUBLE; + case ArrowId::BINARY: + return ParquetType::BYTE_ARRAY; + case ArrowId::STRING: + return ParquetType::BYTE_ARRAY; + case ArrowId::DATE32: + return ParquetType::INT32; + case ArrowId::DATE64: + // Convert to date32 internally + return ParquetType::INT32; + case ArrowId::TIME32: + return ParquetType::INT32; + case ArrowId::TIME64: + return ParquetType::INT64; + case ArrowId::TIMESTAMP: + return ParquetType::INT64; + default: + break; + } + DCHECK(false) << "cannot reach this code"; + return ParquetType::INT32; +} + template struct test_traits {}; template <> struct test_traits<::arrow::BooleanType> { static constexpr ParquetType::type parquet_enum = ParquetType::BOOLEAN; - static constexpr LogicalType::type logical_enum = LogicalType::NONE; static uint8_t const value; }; @@ -73,7 +159,6 @@ const uint8_t test_traits<::arrow::BooleanType>::value(1); template <> struct test_traits<::arrow::UInt8Type> { static constexpr ParquetType::type parquet_enum = ParquetType::INT32; - static constexpr LogicalType::type logical_enum = LogicalType::UINT_8; static uint8_t const value; }; @@ -82,7 +167,6 @@ const uint8_t test_traits<::arrow::UInt8Type>::value(64); template <> struct test_traits<::arrow::Int8Type> { static constexpr ParquetType::type parquet_enum = ParquetType::INT32; - static constexpr LogicalType::type logical_enum = LogicalType::INT_8; static int8_t const value; }; @@ -91,7 +175,6 @@ const int8_t test_traits<::arrow::Int8Type>::value(-64); template <> struct test_traits<::arrow::UInt16Type> { static constexpr ParquetType::type parquet_enum = ParquetType::INT32; - static constexpr LogicalType::type logical_enum = LogicalType::UINT_16; static uint16_t const value; }; @@ -100,7 +183,6 @@ const uint16_t test_traits<::arrow::UInt16Type>::value(1024); template <> struct test_traits<::arrow::Int16Type> { static constexpr ParquetType::type parquet_enum = ParquetType::INT32; - static constexpr LogicalType::type logical_enum = LogicalType::INT_16; static int16_t const value; }; @@ -109,7 +191,6 @@ const int16_t test_traits<::arrow::Int16Type>::value(-1024); template <> struct test_traits<::arrow::UInt32Type> { static constexpr ParquetType::type parquet_enum = ParquetType::INT32; - static constexpr LogicalType::type logical_enum = LogicalType::UINT_32; static uint32_t const value; }; @@ -118,7 +199,6 @@ const uint32_t test_traits<::arrow::UInt32Type>::value(1024); template <> struct test_traits<::arrow::Int32Type> { static constexpr ParquetType::type parquet_enum = ParquetType::INT32; - static constexpr LogicalType::type logical_enum = LogicalType::NONE; static int32_t const value; }; @@ -127,7 +207,6 @@ const int32_t test_traits<::arrow::Int32Type>::value(-1024); template <> struct test_traits<::arrow::UInt64Type> { static constexpr ParquetType::type parquet_enum = ParquetType::INT64; - static constexpr LogicalType::type logical_enum = LogicalType::UINT_64; static uint64_t const value; }; @@ -136,7 +215,6 @@ const uint64_t test_traits<::arrow::UInt64Type>::value(1024); template <> struct test_traits<::arrow::Int64Type> { static constexpr ParquetType::type parquet_enum = ParquetType::INT64; - static constexpr LogicalType::type logical_enum = LogicalType::NONE; static int64_t const value; }; @@ -145,25 +223,22 @@ const int64_t test_traits<::arrow::Int64Type>::value(-1024); template <> struct test_traits<::arrow::TimestampType> { static constexpr ParquetType::type parquet_enum = ParquetType::INT64; - static constexpr LogicalType::type logical_enum = LogicalType::TIMESTAMP_MILLIS; static int64_t const value; }; const int64_t test_traits<::arrow::TimestampType>::value(14695634030000); template <> -struct test_traits<::arrow::Date64Type> { +struct test_traits<::arrow::Date32Type> { static constexpr ParquetType::type parquet_enum = ParquetType::INT32; - static constexpr LogicalType::type logical_enum = LogicalType::DATE; - static int64_t const value; + static int32_t const value; }; -const int64_t test_traits<::arrow::Date64Type>::value(14688000000000); +const int32_t test_traits<::arrow::Date32Type>::value(170000); template <> struct test_traits<::arrow::FloatType> { static constexpr ParquetType::type parquet_enum = ParquetType::FLOAT; - static constexpr LogicalType::type logical_enum = LogicalType::NONE; static float const value; }; @@ -172,7 +247,6 @@ const float test_traits<::arrow::FloatType>::value(2.1f); template <> struct test_traits<::arrow::DoubleType> { static constexpr ParquetType::type parquet_enum = ParquetType::DOUBLE; - static constexpr LogicalType::type logical_enum = LogicalType::NONE; static double const value; }; @@ -181,14 +255,12 @@ const double test_traits<::arrow::DoubleType>::value(4.2); template <> struct test_traits<::arrow::StringType> { static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY; - static constexpr LogicalType::type logical_enum = LogicalType::UTF8; static std::string const value; }; template <> struct test_traits<::arrow::BinaryType> { static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY; - static constexpr LogicalType::type logical_enum = LogicalType::NONE; static std::string const value; }; @@ -231,19 +303,20 @@ void DoSimpleRoundtrip(const std::shared_ptr& table, int num_threads, } } +static std::shared_ptr MakeSimpleSchema( + const ::arrow::DataType& type, Repetition::type repetition) { + auto pnode = PrimitiveNode::Make("column1", repetition, + get_physical_type(type), get_logical_type(type)); + NodePtr node_ = + GroupNode::Make("schema", Repetition::REQUIRED, std::vector({pnode})); + return std::static_pointer_cast(node_); +} + template class TestParquetIO : public ::testing::Test { public: virtual void SetUp() {} - std::shared_ptr MakeSchema(Repetition::type repetition) { - auto pnode = PrimitiveNode::Make("column1", repetition, - test_traits::parquet_enum, test_traits::logical_enum); - NodePtr node_ = - GroupNode::Make("schema", Repetition::REQUIRED, std::vector({pnode})); - return std::static_pointer_cast(node_); - } - std::unique_ptr MakeWriter( const std::shared_ptr& schema) { sink_ = std::make_shared(); @@ -348,7 +421,7 @@ class TestParquetIO : public ::testing::Test { typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type, ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::Int32Type, ::arrow::UInt64Type, - ::arrow::Int64Type, ::arrow::TimestampType, ::arrow::Date64Type, ::arrow::FloatType, + ::arrow::Int64Type, ::arrow::TimestampType, ::arrow::Date32Type, ::arrow::FloatType, ::arrow::DoubleType, ::arrow::StringType, ::arrow::BinaryType> TestTypes; @@ -358,7 +431,8 @@ TYPED_TEST(TestParquetIO, SingleColumnRequiredWrite) { std::shared_ptr values; ASSERT_OK(NonNullArray(SMALL_SIZE, &values)); - std::shared_ptr schema = this->MakeSchema(Repetition::REQUIRED); + std::shared_ptr schema = MakeSimpleSchema( + *values->type(), Repetition::REQUIRED); this->WriteColumn(schema, values); this->ReadAndCheckSingleColumnFile(values.get()); @@ -390,7 +464,8 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) { ASSERT_OK(NullableArray(SMALL_SIZE, 10, kDefaultSeed, &values)); - std::shared_ptr schema = this->MakeSchema(Repetition::OPTIONAL); + std::shared_ptr schema = MakeSimpleSchema( + *values->type(), Repetition::OPTIONAL); this->WriteColumn(schema, values); this->ReadAndCheckSingleColumnFile(values.get()); @@ -399,7 +474,8 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) { TYPED_TEST(TestParquetIO, SingleColumnRequiredSliceWrite) { std::shared_ptr values; ASSERT_OK(NonNullArray(2 * SMALL_SIZE, &values)); - std::shared_ptr schema = this->MakeSchema(Repetition::REQUIRED); + std::shared_ptr schema = MakeSimpleSchema( + *values->type(), Repetition::REQUIRED); std::shared_ptr sliced_values = values->Slice(SMALL_SIZE / 2, SMALL_SIZE); this->WriteColumn(schema, sliced_values); @@ -414,7 +490,8 @@ TYPED_TEST(TestParquetIO, SingleColumnRequiredSliceWrite) { TYPED_TEST(TestParquetIO, SingleColumnOptionalSliceWrite) { std::shared_ptr values; ASSERT_OK(NullableArray(2 * SMALL_SIZE, SMALL_SIZE, kDefaultSeed, &values)); - std::shared_ptr schema = this->MakeSchema(Repetition::OPTIONAL); + std::shared_ptr schema = MakeSimpleSchema( + *values->type(), Repetition::OPTIONAL); std::shared_ptr sliced_values = values->Slice(SMALL_SIZE / 2, SMALL_SIZE); this->WriteColumn(schema, sliced_values); @@ -470,7 +547,8 @@ TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) { ASSERT_OK(NonNullArray(SMALL_SIZE, &values)); int64_t chunk_size = values->length() / 4; - std::shared_ptr schema = this->MakeSchema(Repetition::REQUIRED); + std::shared_ptr schema = MakeSimpleSchema( + *values->type(), Repetition::REQUIRED); FileWriter writer(default_memory_pool(), this->MakeWriter(schema)); for (int i = 0; i < 4; i++) { ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size)); @@ -531,7 +609,8 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) { ASSERT_OK(NullableArray(SMALL_SIZE, 10, kDefaultSeed, &values)); - std::shared_ptr schema = this->MakeSchema(Repetition::OPTIONAL); + std::shared_ptr schema = MakeSimpleSchema( + *values->type(), Repetition::OPTIONAL); FileWriter writer(::arrow::default_memory_pool(), this->MakeWriter(schema)); for (int i = 0; i < 4; i++) { ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size)); @@ -715,7 +794,9 @@ class TestPrimitiveParquetIO : public TestParquetIO { void MakeTestFile( std::vector& values, int num_chunks, std::unique_ptr* reader) { - std::shared_ptr schema = this->MakeSchema(Repetition::REQUIRED); + TestType dummy; + + std::shared_ptr schema = MakeSimpleSchema(dummy, Repetition::REQUIRED); std::unique_ptr file_writer = this->MakeWriter(schema); size_t chunk_size = values.size() / num_chunks; // Convert to Parquet's expected physical type diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc index 0f6b4556..a1e1164e 100644 --- a/src/parquet/arrow/arrow-schema-test.cc +++ b/src/parquet/arrow/arrow-schema-test.cc @@ -46,10 +46,8 @@ const auto FLOAT = ::arrow::float32(); const auto DOUBLE = ::arrow::float64(); const auto UTF8 = ::arrow::utf8(); const auto TIMESTAMP_MS = ::arrow::timestamp(::arrow::TimeUnit::MILLI); +const auto TIMESTAMP_US = ::arrow::timestamp(::arrow::TimeUnit::MICRO); const auto TIMESTAMP_NS = ::arrow::timestamp(::arrow::TimeUnit::NANO); - -// TODO: This requires parquet-cpp implementing the MICROS enum value -// const auto TIMESTAMP_US = std::make_shared(TimestampType::Unit::MICRO); const auto BINARY = ::arrow::binary(); const auto DECIMAL_8_4 = std::make_shared<::arrow::DecimalType>(8, 4); @@ -105,9 +103,23 @@ TEST_F(TestConvertParquetSchema, ParquetFlatPrimitives) { ParquetType::INT64, LogicalType::TIMESTAMP_MILLIS)); arrow_fields.push_back(std::make_shared("timestamp", TIMESTAMP_MS, false)); + parquet_fields.push_back(PrimitiveNode::Make("timestamp[us]", Repetition::REQUIRED, + ParquetType::INT64, LogicalType::TIMESTAMP_MICROS)); + arrow_fields.push_back(std::make_shared("timestamp[us]", TIMESTAMP_US, false)); + parquet_fields.push_back(PrimitiveNode::Make( "date", Repetition::REQUIRED, ParquetType::INT32, LogicalType::DATE)); - arrow_fields.push_back(std::make_shared("date", ::arrow::date64(), false)); + arrow_fields.push_back(std::make_shared("date", ::arrow::date32(), false)); + + parquet_fields.push_back(PrimitiveNode::Make( + "time32", Repetition::REQUIRED, ParquetType::INT32, LogicalType::TIME_MILLIS)); + arrow_fields.push_back(std::make_shared( + "time32", ::arrow::time32(::arrow::TimeUnit::MILLI), false)); + + parquet_fields.push_back(PrimitiveNode::Make( + "time64", Repetition::REQUIRED, ParquetType::INT64, LogicalType::TIME_MICROS)); + arrow_fields.push_back(std::make_shared( + "time64", ::arrow::time64(::arrow::TimeUnit::MICRO), false)); parquet_fields.push_back( PrimitiveNode::Make("timestamp96", Repetition::REQUIRED, ParquetType::INT96)); @@ -568,7 +580,11 @@ TEST_F(TestConvertArrowSchema, ParquetFlatPrimitives) { parquet_fields.push_back(PrimitiveNode::Make( "date", Repetition::REQUIRED, ParquetType::INT32, LogicalType::DATE)); - arrow_fields.push_back(std::make_shared("date", ::arrow::date64(), false)); + arrow_fields.push_back(std::make_shared("date", ::arrow::date32(), false)); + + parquet_fields.push_back(PrimitiveNode::Make( + "date64", Repetition::REQUIRED, ParquetType::INT32, LogicalType::DATE)); + arrow_fields.push_back(std::make_shared("date64", ::arrow::date64(), false)); parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED, ParquetType::INT64, LogicalType::TIMESTAMP_MILLIS)); diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc index 38d5583a..cc1e79fa 100644 --- a/src/parquet/arrow/reader.cc +++ b/src/parquet/arrow/reader.cc @@ -502,6 +502,10 @@ NONNULLABLE_BATCH_FAST_PATH(::arrow::Int32Type, Int32Type, int32_t) NONNULLABLE_BATCH_FAST_PATH(::arrow::Int64Type, Int64Type, int64_t) NONNULLABLE_BATCH_FAST_PATH(::arrow::FloatType, FloatType, float) NONNULLABLE_BATCH_FAST_PATH(::arrow::DoubleType, DoubleType, double) +NONNULLABLE_BATCH_FAST_PATH(::arrow::Date32Type, Int32Type, int32_t) +NONNULLABLE_BATCH_FAST_PATH(::arrow::TimestampType, Int64Type, int64_t) +NONNULLABLE_BATCH_FAST_PATH(::arrow::Time32Type, Int32Type, int32_t) +NONNULLABLE_BATCH_FAST_PATH(::arrow::Time64Type, Int64Type, int64_t) template <> Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::TimestampType, Int96Type>( @@ -607,6 +611,10 @@ NULLABLE_BATCH_FAST_PATH(::arrow::Int32Type, Int32Type, int32_t) NULLABLE_BATCH_FAST_PATH(::arrow::Int64Type, Int64Type, int64_t) NULLABLE_BATCH_FAST_PATH(::arrow::FloatType, FloatType, float) NULLABLE_BATCH_FAST_PATH(::arrow::DoubleType, DoubleType, double) +NULLABLE_BATCH_FAST_PATH(::arrow::Date32Type, Int32Type, int32_t) +NULLABLE_BATCH_FAST_PATH(::arrow::TimestampType, Int64Type, int64_t) +NULLABLE_BATCH_FAST_PATH(::arrow::Time32Type, Int32Type, int32_t) +NULLABLE_BATCH_FAST_PATH(::arrow::Time64Type, Int64Type, int64_t) template <> Status ColumnReader::Impl::ReadNullableBatch<::arrow::TimestampType, Int96Type>( @@ -1036,13 +1044,14 @@ Status ColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr* out TYPED_BATCH_CASE(INT16, ::arrow::Int16Type, Int32Type) TYPED_BATCH_CASE(UINT32, ::arrow::UInt32Type, Int32Type) TYPED_BATCH_CASE(INT32, ::arrow::Int32Type, Int32Type) - TYPED_BATCH_CASE(DATE64, ::arrow::Date64Type, Int32Type) TYPED_BATCH_CASE(UINT64, ::arrow::UInt64Type, Int64Type) TYPED_BATCH_CASE(INT64, ::arrow::Int64Type, Int64Type) TYPED_BATCH_CASE(FLOAT, ::arrow::FloatType, FloatType) TYPED_BATCH_CASE(DOUBLE, ::arrow::DoubleType, DoubleType) TYPED_BATCH_CASE(STRING, ::arrow::StringType, ByteArrayType) TYPED_BATCH_CASE(BINARY, ::arrow::BinaryType, ByteArrayType) + TYPED_BATCH_CASE(DATE32, ::arrow::Date32Type, Int32Type) + TYPED_BATCH_CASE(DATE64, ::arrow::Date64Type, Int32Type) case ::arrow::Type::TIMESTAMP: { ::arrow::TimestampType* timestamp_type = static_cast<::arrow::TimestampType*>(field_->type().get()); @@ -1050,6 +1059,9 @@ Status ColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr* out case ::arrow::TimeUnit::MILLI: return TypedReadBatch<::arrow::TimestampType, Int64Type>(batch_size, out); break; + case ::arrow::TimeUnit::MICRO: + return TypedReadBatch<::arrow::TimestampType, Int64Type>(batch_size, out); + break; case ::arrow::TimeUnit::NANO: return TypedReadBatch<::arrow::TimestampType, Int96Type>(batch_size, out); break; @@ -1058,6 +1070,8 @@ Status ColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr* out } break; } + TYPED_BATCH_CASE(TIME32, ::arrow::Time32Type, Int32Type) + TYPED_BATCH_CASE(TIME64, ::arrow::Time64Type, Int64Type) default: std::stringstream ss; ss << "No support for reading columns of type " << field_->type()->ToString(); diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc index 2721fc5f..7c5074b9 100644 --- a/src/parquet/arrow/schema.cc +++ b/src/parquet/arrow/schema.cc @@ -45,6 +45,7 @@ namespace parquet { namespace arrow { const auto TIMESTAMP_MS = ::arrow::timestamp(::arrow::TimeUnit::MILLI); +const auto TIMESTAMP_US = ::arrow::timestamp(::arrow::TimeUnit::MICRO); const auto TIMESTAMP_NS = ::arrow::timestamp(::arrow::TimeUnit::NANO); TypePtr MakeDecimalType(const PrimitiveNode* node) { @@ -105,18 +106,18 @@ static Status FromInt32(const PrimitiveNode* node, TypePtr* out) { case LogicalType::INT_16: *out = ::arrow::int16(); break; + case LogicalType::INT_32: + *out = ::arrow::int32(); + break; case LogicalType::UINT_32: *out = ::arrow::uint32(); break; case LogicalType::DATE: - *out = ::arrow::date64(); + *out = ::arrow::date32(); break; case LogicalType::TIME_MILLIS: *out = ::arrow::time32(::arrow::TimeUnit::MILLI); break; - case LogicalType::TIME_MICROS: - *out = ::arrow::time64(::arrow::TimeUnit::MICRO); - break; case LogicalType::DECIMAL: *out = MakeDecimalType(node); break; @@ -135,6 +136,9 @@ static Status FromInt64(const PrimitiveNode* node, TypePtr* out) { case LogicalType::NONE: *out = ::arrow::int64(); break; + case LogicalType::INT_64: + *out = ::arrow::int64(); + break; case LogicalType::UINT_64: *out = ::arrow::uint64(); break; @@ -144,6 +148,12 @@ static Status FromInt64(const PrimitiveNode* node, TypePtr* out) { case LogicalType::TIMESTAMP_MILLIS: *out = TIMESTAMP_MS; break; + case LogicalType::TIMESTAMP_MICROS: + *out = TIMESTAMP_US; + break; + case LogicalType::TIME_MICROS: + *out = ::arrow::time64(::arrow::TimeUnit::MICRO); + break; default: std::stringstream ss; ss << "Unhandled logical type " << LogicalTypeToString(node->logical_type()) diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc index 90cd1350..b52e2ae2 100644 --- a/src/parquet/arrow/writer.cc +++ b/src/parquet/arrow/writer.cc @@ -80,6 +80,7 @@ class LevelBuilder : public ::arrow::ArrayVisitor { PRIMITIVE_VISIT(Double) PRIMITIVE_VISIT(String) PRIMITIVE_VISIT(Binary) + PRIMITIVE_VISIT(Date32) PRIMITIVE_VISIT(Date64) PRIMITIVE_VISIT(Time32) PRIMITIVE_VISIT(Time64) @@ -357,8 +358,11 @@ Status FileWriter::Impl::WriteNonNullableBatch( } NONNULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t) +NONNULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Date32Type, int32_t) +NONNULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Time32Type, int32_t) NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Int64Type, int64_t) NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t) +NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Time64Type, int64_t) NONNULLABLE_BATCH_FAST_PATH(FloatType, ::arrow::FloatType, float) NONNULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double) @@ -417,8 +421,11 @@ Status FileWriter::Impl::WriteNullableBatch( } NULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t) +NULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Date32Type, int32_t) +NULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Time32Type, int32_t) NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Int64Type, int64_t) NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t) +NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Time64Type, int64_t) NULLABLE_BATCH_FAST_PATH(FloatType, ::arrow::FloatType, float) NULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double) @@ -553,14 +560,17 @@ Status FileWriter::Impl::WriteColumnChunk(const Array& data) { WRITE_BATCH_CASE(INT16, Int16Type, Int32Type) WRITE_BATCH_CASE(UINT16, UInt16Type, Int32Type) WRITE_BATCH_CASE(INT32, Int32Type, Int32Type) - WRITE_BATCH_CASE(DATE64, Date64Type, Int32Type) WRITE_BATCH_CASE(INT64, Int64Type, Int64Type) - WRITE_BATCH_CASE(TIMESTAMP, TimestampType, Int64Type) WRITE_BATCH_CASE(UINT64, UInt64Type, Int64Type) WRITE_BATCH_CASE(FLOAT, FloatType, FloatType) WRITE_BATCH_CASE(DOUBLE, DoubleType, DoubleType) WRITE_BATCH_CASE(BINARY, BinaryType, ByteArrayType) WRITE_BATCH_CASE(STRING, BinaryType, ByteArrayType) + WRITE_BATCH_CASE(DATE32, Date32Type, Int32Type) + WRITE_BATCH_CASE(DATE64, Date64Type, Int32Type) + WRITE_BATCH_CASE(TIMESTAMP, TimestampType, Int64Type) + WRITE_BATCH_CASE(TIME32, Time32Type, Int32Type) + WRITE_BATCH_CASE(TIME64, Time64Type, Int64Type) default: std::stringstream ss; ss << "Data type not supported as list value: " << values_array->type()->ToString(); From e626ebdeb82185daa004f7442290c99d3fd54de2 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 24 Apr 2017 18:37:18 -0400 Subject: [PATCH 3/8] Use inline visitor in LevelBuilder --- src/parquet/arrow/writer.cc | 69 +++++++++++++++++-------------------- 1 file changed, 32 insertions(+), 37 deletions(-) diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc index b52e2ae2..d8d47ee4 100644 --- a/src/parquet/arrow/writer.cc +++ b/src/parquet/arrow/writer.cc @@ -26,6 +26,7 @@ #include "parquet/arrow/schema.h" #include "arrow/api.h" +#include "arrow/visitor_inline.h" using arrow::Array; using arrow::BinaryArray; @@ -49,45 +50,34 @@ namespace arrow { namespace BitUtil = ::arrow::BitUtil; -class LevelBuilder : public ::arrow::ArrayVisitor { +class LevelBuilder { public: explicit LevelBuilder(MemoryPool* pool) : def_levels_(pool, ::arrow::int16()), rep_levels_(pool, ::arrow::int16()) { def_levels_buffer_ = std::make_shared(pool); } -#define PRIMITIVE_VISIT(ArrowTypePrefix) \ - Status Visit(const ::arrow::ArrowTypePrefix##Array& array) override { \ - array_offsets_.push_back(array.offset()); \ - valid_bitmaps_.push_back(array.null_bitmap_data()); \ - null_counts_.push_back(array.null_count()); \ - values_type_ = array.type_id(); \ - values_array_ = &array; \ - return Status::OK(); \ + Status VisitInline(const Array& array); + + Status Visit(const ::arrow::PrimitiveArray& array) { + array_offsets_.push_back(array.offset()); + valid_bitmaps_.push_back(array.null_bitmap_data()); + null_counts_.push_back(array.null_count()); + values_type_ = array.type_id(); + values_array_ = &array; + return Status::OK(); } - PRIMITIVE_VISIT(Boolean) - PRIMITIVE_VISIT(Int8) - PRIMITIVE_VISIT(Int16) - PRIMITIVE_VISIT(Int32) - PRIMITIVE_VISIT(Int64) - PRIMITIVE_VISIT(UInt8) - PRIMITIVE_VISIT(UInt16) - PRIMITIVE_VISIT(UInt32) - PRIMITIVE_VISIT(UInt64) - PRIMITIVE_VISIT(HalfFloat) - PRIMITIVE_VISIT(Float) - PRIMITIVE_VISIT(Double) - PRIMITIVE_VISIT(String) - PRIMITIVE_VISIT(Binary) - PRIMITIVE_VISIT(Date32) - PRIMITIVE_VISIT(Date64) - PRIMITIVE_VISIT(Time32) - PRIMITIVE_VISIT(Time64) - PRIMITIVE_VISIT(Timestamp) - PRIMITIVE_VISIT(Interval) - - Status Visit(const ListArray& array) override { + Status Visit(const ::arrow::BinaryArray& array) { + array_offsets_.push_back(array.offset()); + valid_bitmaps_.push_back(array.null_bitmap_data()); + null_counts_.push_back(array.null_count()); + values_type_ = array.type_id(); + values_array_ = &array; + return Status::OK(); + } + + Status Visit(const ListArray& array) { array_offsets_.push_back(array.offset()); valid_bitmaps_.push_back(array.null_bitmap_data()); null_counts_.push_back(array.null_count()); @@ -96,13 +86,13 @@ class LevelBuilder : public ::arrow::ArrayVisitor { min_offset_idx_ = array.value_offset(min_offset_idx_); max_offset_idx_ = array.value_offset(max_offset_idx_); - return array.values()->Accept(this); + return VisitInline(*array.values()); } -#define NOT_IMPLEMENTED_VIST(ArrowTypePrefix) \ - Status Visit(const ::arrow::ArrowTypePrefix##Array& array) override { \ - return Status::NotImplemented( \ - "Level generation for ArrowTypePrefix not supported yet"); \ +#define NOT_IMPLEMENTED_VIST(ArrowTypePrefix) \ + Status Visit(const ::arrow::ArrowTypePrefix##Array& array) { \ + return Status::NotImplemented( \ + "Level generation for ArrowTypePrefix not supported yet"); \ }; NOT_IMPLEMENTED_VIST(Null) @@ -110,6 +100,7 @@ class LevelBuilder : public ::arrow::ArrayVisitor { NOT_IMPLEMENTED_VIST(Union) NOT_IMPLEMENTED_VIST(Decimal) NOT_IMPLEMENTED_VIST(Dictionary) + NOT_IMPLEMENTED_VIST(Interval) Status GenerateLevels(const Array& array, const std::shared_ptr& field, int64_t* values_offset, ::arrow::Type::type* values_type, int64_t* num_values, @@ -118,7 +109,7 @@ class LevelBuilder : public ::arrow::ArrayVisitor { // Work downwards to extract bitmaps and offsets min_offset_idx_ = 0; max_offset_idx_ = array.length(); - RETURN_NOT_OK(array.Accept(this)); + RETURN_NOT_OK(VisitInline(array)); *num_values = max_offset_idx_ - min_offset_idx_; *values_offset = min_offset_idx_; *values_type = values_type_; @@ -248,6 +239,10 @@ class LevelBuilder : public ::arrow::ArrayVisitor { const Array* values_array_; }; +Status LevelBuilder::VisitInline(const Array& array) { + return VisitArrayInline(array, this); +} + class FileWriter::Impl { public: Impl(MemoryPool* pool, std::unique_ptr writer); From 440b40f4609b74bd03d3e406fcbc93191d8657ab Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 24 Apr 2017 19:23:43 -0400 Subject: [PATCH 4/8] Add unit test for date/time types that write without implicit casts --- src/parquet/arrow/arrow-reader-writer-test.cc | 87 ++++++++++++++----- src/parquet/arrow/arrow-schema-test.cc | 8 +- src/parquet/arrow/reader.cc | 4 +- src/parquet/arrow/schema.cc | 2 +- src/parquet/arrow/test-util.h | 1 + src/parquet/arrow/writer.cc | 8 +- src/parquet/file/printer.h | 1 + 7 files changed, 78 insertions(+), 33 deletions(-) diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc index cd6f8ab0..540b1e34 100644 --- a/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/src/parquet/arrow/arrow-reader-writer-test.cc @@ -43,6 +43,7 @@ using arrow::PoolBuffer; using arrow::PrimitiveArray; using arrow::Status; using arrow::Table; +using arrow::TimeUnit; using ArrowId = ::arrow::Type; using ParquetType = parquet::Type; @@ -86,13 +87,13 @@ LogicalType::type get_logical_type(const ::arrow::DataType& type) { case ArrowId::TIMESTAMP: { const auto& ts_type = static_cast(type); switch (ts_type.unit()) { - case ::arrow::TimeUnit::MILLI: + case TimeUnit::MILLI: return LogicalType::TIMESTAMP_MILLIS; - case ::arrow::TimeUnit::MICRO: + case TimeUnit::MICRO: return LogicalType::TIMESTAMP_MICROS; default: - DCHECK(false) - << "Only MILLI and MICRO units supported for Arrow timestamps with Parquet."; + DCHECK(false) << "Only MILLI and MICRO units supported for Arrow timestamps " + "with Parquet."; } } case ArrowId::TIME32: @@ -305,10 +306,10 @@ void DoSimpleRoundtrip(const std::shared_ptr
& table, int num_threads, static std::shared_ptr MakeSimpleSchema( const ::arrow::DataType& type, Repetition::type repetition) { - auto pnode = PrimitiveNode::Make("column1", repetition, - get_physical_type(type), get_logical_type(type)); + auto pnode = PrimitiveNode::Make( + "column1", repetition, get_physical_type(type), get_logical_type(type)); NodePtr node_ = - GroupNode::Make("schema", Repetition::REQUIRED, std::vector({pnode})); + GroupNode::Make("schema", Repetition::REQUIRED, std::vector({pnode})); return std::static_pointer_cast(node_); } @@ -421,8 +422,8 @@ class TestParquetIO : public ::testing::Test { typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type, ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::Int32Type, ::arrow::UInt64Type, - ::arrow::Int64Type, ::arrow::TimestampType, ::arrow::Date32Type, ::arrow::FloatType, - ::arrow::DoubleType, ::arrow::StringType, ::arrow::BinaryType> + ::arrow::Int64Type, ::arrow::Date32Type, ::arrow::FloatType, ::arrow::DoubleType, + ::arrow::StringType, ::arrow::BinaryType> TestTypes; TYPED_TEST_CASE(TestParquetIO, TestTypes); @@ -431,8 +432,8 @@ TYPED_TEST(TestParquetIO, SingleColumnRequiredWrite) { std::shared_ptr values; ASSERT_OK(NonNullArray(SMALL_SIZE, &values)); - std::shared_ptr schema = MakeSimpleSchema( - *values->type(), Repetition::REQUIRED); + std::shared_ptr schema = + MakeSimpleSchema(*values->type(), Repetition::REQUIRED); this->WriteColumn(schema, values); this->ReadAndCheckSingleColumnFile(values.get()); @@ -464,8 +465,8 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) { ASSERT_OK(NullableArray(SMALL_SIZE, 10, kDefaultSeed, &values)); - std::shared_ptr schema = MakeSimpleSchema( - *values->type(), Repetition::OPTIONAL); + std::shared_ptr schema = + MakeSimpleSchema(*values->type(), Repetition::OPTIONAL); this->WriteColumn(schema, values); this->ReadAndCheckSingleColumnFile(values.get()); @@ -474,8 +475,8 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) { TYPED_TEST(TestParquetIO, SingleColumnRequiredSliceWrite) { std::shared_ptr values; ASSERT_OK(NonNullArray(2 * SMALL_SIZE, &values)); - std::shared_ptr schema = MakeSimpleSchema( - *values->type(), Repetition::REQUIRED); + std::shared_ptr schema = + MakeSimpleSchema(*values->type(), Repetition::REQUIRED); std::shared_ptr sliced_values = values->Slice(SMALL_SIZE / 2, SMALL_SIZE); this->WriteColumn(schema, sliced_values); @@ -490,8 +491,8 @@ TYPED_TEST(TestParquetIO, SingleColumnRequiredSliceWrite) { TYPED_TEST(TestParquetIO, SingleColumnOptionalSliceWrite) { std::shared_ptr values; ASSERT_OK(NullableArray(2 * SMALL_SIZE, SMALL_SIZE, kDefaultSeed, &values)); - std::shared_ptr schema = MakeSimpleSchema( - *values->type(), Repetition::OPTIONAL); + std::shared_ptr schema = + MakeSimpleSchema(*values->type(), Repetition::OPTIONAL); std::shared_ptr sliced_values = values->Slice(SMALL_SIZE / 2, SMALL_SIZE); this->WriteColumn(schema, sliced_values); @@ -547,8 +548,8 @@ TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) { ASSERT_OK(NonNullArray(SMALL_SIZE, &values)); int64_t chunk_size = values->length() / 4; - std::shared_ptr schema = MakeSimpleSchema( - *values->type(), Repetition::REQUIRED); + std::shared_ptr schema = + MakeSimpleSchema(*values->type(), Repetition::REQUIRED); FileWriter writer(default_memory_pool(), this->MakeWriter(schema)); for (int i = 0; i < 4; i++) { ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size)); @@ -609,8 +610,8 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) { ASSERT_OK(NullableArray(SMALL_SIZE, 10, kDefaultSeed, &values)); - std::shared_ptr schema = MakeSimpleSchema( - *values->type(), Repetition::OPTIONAL); + std::shared_ptr schema = + MakeSimpleSchema(*values->type(), Repetition::OPTIONAL); FileWriter writer(::arrow::default_memory_pool(), this->MakeWriter(schema)); for (int i = 0; i < 4; i++) { ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size)); @@ -680,7 +681,7 @@ TEST_F(TestInt96ParquetIO, ReadIntoTimestamp) { writer->Close(); ::arrow::TimestampBuilder builder( - default_memory_pool(), ::arrow::timestamp(::arrow::TimeUnit::NANO)); + default_memory_pool(), ::arrow::timestamp(TimeUnit::NANO)); builder.Append(val); std::shared_ptr values; ASSERT_OK(builder.Finish(&values)); @@ -868,6 +869,48 @@ TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedTableRead) { this->CheckSingleColumnRequiredTableRead(4); } +void MakeDateTimeTypesTable(std::shared_ptr
* out) { + using ::arrow::ArrayFromVector; + + std::vector is_valid = {true, true, true, false, true, true}; + + // These are only types that roundtrip without modification + auto f0 = field("f0", ::arrow::date32()); + auto f1 = field("f1", ::arrow::timestamp(TimeUnit::MILLI)); + auto f2 = field("f2", ::arrow::timestamp(TimeUnit::MICRO)); + auto f3 = field("f3", ::arrow::time32(TimeUnit::MILLI)); + auto f4 = field("f4", ::arrow::time64(TimeUnit::MICRO)); + std::shared_ptr<::arrow::Schema> schema(new ::arrow::Schema({f0, f1, f2, f3, f4})); + + std::vector t32_values = { + 1489269000, 1489270000, 1489271000, 1489272000, 1489272000, 1489273000}; + std::vector t64_values = {1489269000000, 1489270000000, 1489271000000, + 1489272000000, 1489272000000, 1489273000000}; + + std::shared_ptr a0, a1, a2, a3, a4; + ArrayFromVector<::arrow::Date32Type, int32_t>(f0->type(), is_valid, t32_values, &a0); + ArrayFromVector<::arrow::TimestampType, int64_t>(f1->type(), is_valid, t64_values, &a1); + ArrayFromVector<::arrow::TimestampType, int64_t>(f2->type(), is_valid, t64_values, &a2); + ArrayFromVector<::arrow::Time32Type, int32_t>(f3->type(), is_valid, t32_values, &a3); + ArrayFromVector<::arrow::Time64Type, int64_t>(f4->type(), is_valid, t64_values, &a4); + + std::vector> columns = { + std::make_shared("f0", a0), std::make_shared("f1", a1), + std::make_shared("f2", a2), std::make_shared("f3", a3), + std::make_shared("f4", a4)}; + *out = std::make_shared<::arrow::Table>(schema, columns); +} + +TEST(TestArrowReadWrite, DateTimeTypes) { + std::shared_ptr
table; + MakeDateTimeTypesTable(&table); + + std::shared_ptr
result; + DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result); + + ASSERT_TRUE(table->Equals(*result)); +} + void MakeDoubleTable( int num_columns, int num_rows, int nchunks, std::shared_ptr
* out) { std::shared_ptr<::arrow::Column> column; diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc index a1e1164e..54044654 100644 --- a/src/parquet/arrow/arrow-schema-test.cc +++ b/src/parquet/arrow/arrow-schema-test.cc @@ -60,8 +60,8 @@ class TestConvertParquetSchema : public ::testing::Test { for (int i = 0; i < expected_schema->num_fields(); ++i) { auto lhs = result_schema_->field(i); auto rhs = expected_schema->field(i); - EXPECT_TRUE(lhs->Equals(rhs)) - << i << " " << lhs->ToString() << " != " << rhs->ToString(); + EXPECT_TRUE(lhs->Equals(rhs)) << i << " " << lhs->ToString() + << " != " << rhs->ToString(); } } @@ -114,12 +114,12 @@ TEST_F(TestConvertParquetSchema, ParquetFlatPrimitives) { parquet_fields.push_back(PrimitiveNode::Make( "time32", Repetition::REQUIRED, ParquetType::INT32, LogicalType::TIME_MILLIS)); arrow_fields.push_back(std::make_shared( - "time32", ::arrow::time32(::arrow::TimeUnit::MILLI), false)); + "time32", ::arrow::time32(::arrow::TimeUnit::MILLI), false)); parquet_fields.push_back(PrimitiveNode::Make( "time64", Repetition::REQUIRED, ParquetType::INT64, LogicalType::TIME_MICROS)); arrow_fields.push_back(std::make_shared( - "time64", ::arrow::time64(::arrow::TimeUnit::MICRO), false)); + "time64", ::arrow::time64(::arrow::TimeUnit::MICRO), false)); parquet_fields.push_back( PrimitiveNode::Make("timestamp96", Repetition::REQUIRED, ParquetType::INT96)); diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc index cc1e79fa..852649ae 100644 --- a/src/parquet/arrow/reader.cc +++ b/src/parquet/arrow/reader.cc @@ -1070,8 +1070,8 @@ Status ColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr* out } break; } - TYPED_BATCH_CASE(TIME32, ::arrow::Time32Type, Int32Type) - TYPED_BATCH_CASE(TIME64, ::arrow::Time64Type, Int64Type) + TYPED_BATCH_CASE(TIME32, ::arrow::Time32Type, Int32Type) + TYPED_BATCH_CASE(TIME64, ::arrow::Time64Type, Int64Type) default: std::stringstream ss; ss << "No support for reading columns of type " << field_->type()->ToString(); diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc index 7c5074b9..86e98bcb 100644 --- a/src/parquet/arrow/schema.cc +++ b/src/parquet/arrow/schema.cc @@ -477,7 +477,7 @@ Status FieldToNode(const std::shared_ptr& field, } } break; case ArrowType::TIME32: - type = ParquetType::INT64; + type = ParquetType::INT32; logical_type = LogicalType::TIME_MILLIS; break; case ArrowType::TIME64: diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h index bff952bb..8bcd3149 100644 --- a/src/parquet/arrow/test-util.h +++ b/src/parquet/arrow/test-util.h @@ -20,6 +20,7 @@ #include "arrow/api.h" #include "arrow/test-util.h" +#include "arrow/type_traits.h" namespace parquet { namespace arrow { diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc index d8d47ee4..ff150d40 100644 --- a/src/parquet/arrow/writer.cc +++ b/src/parquet/arrow/writer.cc @@ -89,10 +89,10 @@ class LevelBuilder { return VisitInline(*array.values()); } -#define NOT_IMPLEMENTED_VIST(ArrowTypePrefix) \ - Status Visit(const ::arrow::ArrowTypePrefix##Array& array) { \ - return Status::NotImplemented( \ - "Level generation for ArrowTypePrefix not supported yet"); \ +#define NOT_IMPLEMENTED_VIST(ArrowTypePrefix) \ + Status Visit(const ::arrow::ArrowTypePrefix##Array& array) { \ + return Status::NotImplemented( \ + "Level generation for ArrowTypePrefix not supported yet"); \ }; NOT_IMPLEMENTED_VIST(Null) diff --git a/src/parquet/file/printer.h b/src/parquet/file/printer.h index 433f9e8a..957c313a 100644 --- a/src/parquet/file/printer.h +++ b/src/parquet/file/printer.h @@ -32,6 +32,7 @@ namespace parquet { class PARQUET_EXPORT ParquetFilePrinter { private: ParquetFileReader* fileReader; + public: explicit ParquetFilePrinter(ParquetFileReader* reader) : fileReader(reader) {} ~ParquetFilePrinter() {} From 5167a7abfd9a2f1e03e63949efdcdc68d5dd31e6 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 24 Apr 2017 19:33:37 -0400 Subject: [PATCH 5/8] Add unit test for date64->date32 cast --- src/parquet/arrow/arrow-reader-writer-test.cc | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc index 540b1e34..aba6e58c 100644 --- a/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/src/parquet/arrow/arrow-reader-writer-test.cc @@ -911,6 +911,42 @@ TEST(TestArrowReadWrite, DateTimeTypes) { ASSERT_TRUE(table->Equals(*result)); } +TEST(TestArrowReadWrite, ConvertedDateTimeTypes) { + using ::arrow::ArrayFromVector; + + std::vector is_valid = {true, true, true, false, true, true}; + + auto f0 = field("f0", ::arrow::date64()); + std::shared_ptr<::arrow::Schema> schema(new ::arrow::Schema({f0})); + + std::vector t64_values = { + 1489190400000, 1489276800000, 1489363200000, 1489449600000, + 1489536000000, 1489622400000}; + + // Expected schema and values + auto f1 = field("f0", ::arrow::date32()); + std::shared_ptr<::arrow::Schema> ex_schema(new ::arrow::Schema({f1})); + std::vector ex_values = {17236, 17237, 17238, 17239, 17240, 17241}; + + std::shared_ptr a0, a1; + ArrayFromVector<::arrow::Date64Type, int64_t>(f0->type(), is_valid, t64_values, &a0); + ArrayFromVector<::arrow::Date32Type, int32_t>(f1->type(), is_valid, ex_values, &a1); + + std::vector> columns = { + std::make_shared("f0", a0)}; + + std::vector> ex_columns = { + std::make_shared("f0", a1)}; + + auto table = std::make_shared<::arrow::Table>(schema, columns); + auto ex_table = std::make_shared<::arrow::Table>(ex_schema, ex_columns); + + std::shared_ptr
result; + DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result); + + ASSERT_TRUE(result->Equals(*ex_table)); +} + void MakeDoubleTable( int num_columns, int num_rows, int nchunks, std::shared_ptr
* out) { std::shared_ptr<::arrow::Column> column; From 37c1b4209448b7cafd90ffdaaa0edf44db9b25c3 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 24 Apr 2017 19:49:17 -0400 Subject: [PATCH 6/8] cpplint --- src/parquet/arrow/writer.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc index ff150d40..d2d064cb 100644 --- a/src/parquet/arrow/writer.cc +++ b/src/parquet/arrow/writer.cc @@ -93,7 +93,7 @@ class LevelBuilder { Status Visit(const ::arrow::ArrowTypePrefix##Array& array) { \ return Status::NotImplemented( \ "Level generation for ArrowTypePrefix not supported yet"); \ - }; + } NOT_IMPLEMENTED_VIST(Null) NOT_IMPLEMENTED_VIST(Struct) From 6331d8c58c2ec6a306f31b5c2117381984b4bee4 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 24 Apr 2017 20:18:20 -0400 Subject: [PATCH 7/8] Cast time32[second] to time32[millisecond] --- src/parquet/arrow/arrow-reader-writer-test.cc | 37 ++--- src/parquet/arrow/writer.cc | 136 ++++++++++++------ 2 files changed, 117 insertions(+), 56 deletions(-) diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc index aba6e58c..7b63514d 100644 --- a/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/src/parquet/arrow/arrow-reader-writer-test.cc @@ -917,28 +917,33 @@ TEST(TestArrowReadWrite, ConvertedDateTimeTypes) { std::vector is_valid = {true, true, true, false, true, true}; auto f0 = field("f0", ::arrow::date64()); - std::shared_ptr<::arrow::Schema> schema(new ::arrow::Schema({f0})); + auto f1 = field("f1", ::arrow::time32(TimeUnit::SECOND)); + std::shared_ptr<::arrow::Schema> schema(new ::arrow::Schema({f0, f1})); - std::vector t64_values = { - 1489190400000, 1489276800000, 1489363200000, 1489449600000, - 1489536000000, 1489622400000}; + std::vector a0_values = {1489190400000, 1489276800000, 1489363200000, + 1489449600000, 1489536000000, 1489622400000}; + std::vector a1_values = {0, 1, 2, 3, 4, 5}; - // Expected schema and values - auto f1 = field("f0", ::arrow::date32()); - std::shared_ptr<::arrow::Schema> ex_schema(new ::arrow::Schema({f1})); - std::vector ex_values = {17236, 17237, 17238, 17239, 17240, 17241}; - - std::shared_ptr a0, a1; - ArrayFromVector<::arrow::Date64Type, int64_t>(f0->type(), is_valid, t64_values, &a0); - ArrayFromVector<::arrow::Date32Type, int32_t>(f1->type(), is_valid, ex_values, &a1); + std::shared_ptr a0, a1, x0, x1; + ArrayFromVector<::arrow::Date64Type, int64_t>(f0->type(), is_valid, a0_values, &a0); + ArrayFromVector<::arrow::Time32Type, int32_t>(f1->type(), is_valid, a1_values, &a1); std::vector> columns = { - std::make_shared("f0", a0)}; + std::make_shared("f0", a0), std::make_shared("f1", a1)}; + auto table = std::make_shared<::arrow::Table>(schema, columns); - std::vector> ex_columns = { - std::make_shared("f0", a1)}; + // Expected schema and values + auto e0 = field("f0", ::arrow::date32()); + auto e1 = field("f1", ::arrow::time32(TimeUnit::MILLI)); + std::shared_ptr<::arrow::Schema> ex_schema(new ::arrow::Schema({e0, e1})); - auto table = std::make_shared<::arrow::Table>(schema, columns); + std::vector x0_values = {17236, 17237, 17238, 17239, 17240, 17241}; + std::vector x1_values = {0, 1000, 2000, 3000, 4000, 5000}; + ArrayFromVector<::arrow::Date32Type, int32_t>(e0->type(), is_valid, x0_values, &x0); + ArrayFromVector<::arrow::Time32Type, int32_t>(e1->type(), is_valid, x1_values, &x1); + + std::vector> ex_columns = { + std::make_shared("f0", x0), std::make_shared("f1", x1)}; auto ex_table = std::make_shared<::arrow::Table>(ex_schema, ex_columns); std::shared_ptr
result; diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc index d2d064cb..6ac33b11 100644 --- a/src/parquet/arrow/writer.cc +++ b/src/parquet/arrow/writer.cc @@ -40,6 +40,7 @@ using arrow::PrimitiveArray; using arrow::ListArray; using arrow::Status; using arrow::Table; +using arrow::TimeUnit; using parquet::ParquetFileWriter; using parquet::ParquetVersion; @@ -89,18 +90,18 @@ class LevelBuilder { return VisitInline(*array.values()); } -#define NOT_IMPLEMENTED_VIST(ArrowTypePrefix) \ +#define NOT_IMPLEMENTED_VISIT(ArrowTypePrefix) \ Status Visit(const ::arrow::ArrowTypePrefix##Array& array) { \ return Status::NotImplemented( \ "Level generation for ArrowTypePrefix not supported yet"); \ } - NOT_IMPLEMENTED_VIST(Null) - NOT_IMPLEMENTED_VIST(Struct) - NOT_IMPLEMENTED_VIST(Union) - NOT_IMPLEMENTED_VIST(Decimal) - NOT_IMPLEMENTED_VIST(Dictionary) - NOT_IMPLEMENTED_VIST(Interval) + NOT_IMPLEMENTED_VISIT(Null) + NOT_IMPLEMENTED_VISIT(Struct) + NOT_IMPLEMENTED_VISIT(Union) + NOT_IMPLEMENTED_VISIT(Decimal) + NOT_IMPLEMENTED_VISIT(Dictionary) + NOT_IMPLEMENTED_VISIT(Interval) Status GenerateLevels(const Array& array, const std::shared_ptr& field, int64_t* values_offset, ::arrow::Type::type* values_type, int64_t* num_values, @@ -253,14 +254,15 @@ class FileWriter::Impl { int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels); template - Status WriteNonNullableBatch(TypedColumnWriter* writer, int64_t num_values, - int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels, + Status WriteNonNullableBatch(TypedColumnWriter* writer, + const ArrowType& type, int64_t num_values, int64_t num_levels, + const int16_t* def_levels, const int16_t* rep_levels, const typename ArrowType::c_type* data_ptr); template - Status WriteNullableBatch(TypedColumnWriter* writer, int64_t num_values, - int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels, - const uint8_t* valid_bits, int64_t valid_bits_offset, + Status WriteNullableBatch(TypedColumnWriter* writer, const ArrowType& type, + int64_t num_values, int64_t num_levels, const int16_t* def_levels, + const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset, const typename ArrowType::c_type* data_ptr); Status WriteColumnChunk(const Array& data); @@ -303,13 +305,14 @@ Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer, if (writer->descr()->schema_node()->is_required() || (data->null_count() == 0)) { // no nulls, just dump the data - RETURN_NOT_OK((WriteNonNullableBatch(writer, array->length(), - num_levels, def_levels, rep_levels, data_ptr + data->offset()))); + RETURN_NOT_OK((WriteNonNullableBatch(writer, + static_cast(*array->type()), array->length(), num_levels, + def_levels, rep_levels, data_ptr + data->offset()))); } else { const uint8_t* valid_bits = data->null_bitmap_data(); - RETURN_NOT_OK((WriteNullableBatch(writer, data->length(), - num_levels, def_levels, rep_levels, valid_bits, data->offset(), - data_ptr + data->offset()))); + RETURN_NOT_OK((WriteNullableBatch(writer, + static_cast(*array->type()), data->length(), num_levels, + def_levels, rep_levels, valid_bits, data->offset(), data_ptr + data->offset()))); } PARQUET_CATCH_NOT_OK(writer->Close()); return Status::OK(); @@ -317,8 +320,9 @@ Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer, template Status FileWriter::Impl::WriteNonNullableBatch(TypedColumnWriter* writer, - int64_t num_values, int64_t num_levels, const int16_t* def_levels, - const int16_t* rep_levels, const typename ArrowType::c_type* data_ptr) { + const ArrowType& type, int64_t num_values, int64_t num_levels, + const int16_t* def_levels, const int16_t* rep_levels, + const typename ArrowType::c_type* data_ptr) { using ParquetCType = typename ParquetType::c_type; RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(ParquetCType))); auto buffer_ptr = reinterpret_cast(data_buffer_.mutable_data()); @@ -330,8 +334,9 @@ Status FileWriter::Impl::WriteNonNullableBatch(TypedColumnWriter* w template <> Status FileWriter::Impl::WriteNonNullableBatch( - TypedColumnWriter* writer, int64_t num_values, int64_t num_levels, - const int16_t* def_levels, const int16_t* rep_levels, const int64_t* data_ptr) { + TypedColumnWriter* writer, const ::arrow::Date64Type& type, + int64_t num_values, int64_t num_levels, const int16_t* def_levels, + const int16_t* rep_levels, const int64_t* data_ptr) { RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(int32_t))); auto buffer_ptr = reinterpret_cast(data_buffer_.mutable_data()); for (int i = 0; i < num_values; i++) { @@ -342,19 +347,38 @@ Status FileWriter::Impl::WriteNonNullableBatch( return Status::OK(); } -#define NONNULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType) \ - template <> \ - Status FileWriter::Impl::WriteNonNullableBatch( \ - TypedColumnWriter * writer, int64_t num_values, int64_t num_levels, \ - const int16_t* def_levels, const int16_t* rep_levels, const CType* data_ptr) { \ - PARQUET_CATCH_NOT_OK( \ - writer->WriteBatch(num_levels, def_levels, rep_levels, data_ptr)); \ - return Status::OK(); \ +template <> +Status FileWriter::Impl::WriteNonNullableBatch( + TypedColumnWriter* writer, const ::arrow::Time32Type& type, + int64_t num_values, int64_t num_levels, const int16_t* def_levels, + const int16_t* rep_levels, const int32_t* data_ptr) { + RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(int32_t))); + auto buffer_ptr = reinterpret_cast(data_buffer_.mutable_data()); + if (type.unit() == TimeUnit::SECOND) { + for (int i = 0; i < num_values; i++) { + buffer_ptr[i] = data_ptr[i] * 1000; + } + } else { + std::copy(data_ptr, data_ptr + num_values, buffer_ptr); + } + PARQUET_CATCH_NOT_OK( + writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr)); + return Status::OK(); +} + +#define NONNULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType) \ + template <> \ + Status FileWriter::Impl::WriteNonNullableBatch( \ + TypedColumnWriter * writer, const ArrowType& type, \ + int64_t num_values, int64_t num_levels, const int16_t* def_levels, \ + const int16_t* rep_levels, const CType* data_ptr) { \ + PARQUET_CATCH_NOT_OK( \ + writer->WriteBatch(num_levels, def_levels, rep_levels, data_ptr)); \ + return Status::OK(); \ } NONNULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t) NONNULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Date32Type, int32_t) -NONNULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Time32Type, int32_t) NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Int64Type, int64_t) NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t) NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Time64Type, int64_t) @@ -363,9 +387,9 @@ NONNULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double) template Status FileWriter::Impl::WriteNullableBatch(TypedColumnWriter* writer, - int64_t num_values, int64_t num_levels, const int16_t* def_levels, - const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset, - const typename ArrowType::c_type* data_ptr) { + const ArrowType& type, int64_t num_values, int64_t num_levels, + const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits, + int64_t valid_bits_offset, const typename ArrowType::c_type* data_ptr) { using ParquetCType = typename ParquetType::c_type; RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(ParquetCType))); @@ -385,9 +409,10 @@ Status FileWriter::Impl::WriteNullableBatch(TypedColumnWriter* writ template <> Status FileWriter::Impl::WriteNullableBatch( - TypedColumnWriter* writer, int64_t num_values, int64_t num_levels, - const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits, - int64_t valid_bits_offset, const int64_t* data_ptr) { + TypedColumnWriter* writer, const ::arrow::Date64Type& type, + int64_t num_values, int64_t num_levels, const int16_t* def_levels, + const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset, + const int64_t* data_ptr) { RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(int32_t))); auto buffer_ptr = reinterpret_cast(data_buffer_.mutable_data()); INIT_BITSET(valid_bits, valid_bits_offset); @@ -404,12 +429,44 @@ Status FileWriter::Impl::WriteNullableBatch( return Status::OK(); } +template <> +Status FileWriter::Impl::WriteNullableBatch( + TypedColumnWriter* writer, const ::arrow::Time32Type& type, + int64_t num_values, int64_t num_levels, const int16_t* def_levels, + const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset, + const int32_t* data_ptr) { + RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(int32_t))); + auto buffer_ptr = reinterpret_cast(data_buffer_.mutable_data()); + INIT_BITSET(valid_bits, valid_bits_offset); + + if (type.unit() == TimeUnit::SECOND) { + for (int i = 0; i < num_values; i++) { + if (bitset_valid_bits & (1 << bit_offset_valid_bits)) { + buffer_ptr[i] = data_ptr[i] * 1000; + } + READ_NEXT_BITSET(valid_bits); + } + } else { + for (int i = 0; i < num_values; i++) { + if (bitset_valid_bits & (1 << bit_offset_valid_bits)) { + buffer_ptr[i] = data_ptr[i]; + } + READ_NEXT_BITSET(valid_bits); + } + } + PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced( + num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, buffer_ptr)); + + return Status::OK(); +} + #define NULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType) \ template <> \ Status FileWriter::Impl::WriteNullableBatch( \ - TypedColumnWriter * writer, int64_t num_values, int64_t num_levels, \ - const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits, \ - int64_t valid_bits_offset, const CType* data_ptr) { \ + TypedColumnWriter * writer, const ArrowType& type, \ + int64_t num_values, int64_t num_levels, const int16_t* def_levels, \ + const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset, \ + const CType* data_ptr) { \ PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced( \ num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, data_ptr)); \ return Status::OK(); \ @@ -417,7 +474,6 @@ Status FileWriter::Impl::WriteNullableBatch( NULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t) NULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Date32Type, int32_t) -NULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Time32Type, int32_t) NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Int64Type, int64_t) NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t) NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Time64Type, int64_t) From 0a896397111da68dfaf8ba85cb9cf5e14408eb88 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 24 Apr 2017 20:28:02 -0400 Subject: [PATCH 8/8] Add test for time64[ns] --- src/parquet/arrow/arrow-schema-test.cc | 24 ++++++++++++++++-------- src/parquet/arrow/schema.cc | 9 +++++++-- 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc index 54044654..20425665 100644 --- a/src/parquet/arrow/arrow-schema-test.cc +++ b/src/parquet/arrow/arrow-schema-test.cc @@ -26,6 +26,7 @@ #include "arrow/test-util.h" using arrow::Field; +using arrow::TimeUnit; using ParquetType = parquet::Type; using parquet::LogicalType; @@ -45,9 +46,9 @@ const auto INT64 = ::arrow::int64(); const auto FLOAT = ::arrow::float32(); const auto DOUBLE = ::arrow::float64(); const auto UTF8 = ::arrow::utf8(); -const auto TIMESTAMP_MS = ::arrow::timestamp(::arrow::TimeUnit::MILLI); -const auto TIMESTAMP_US = ::arrow::timestamp(::arrow::TimeUnit::MICRO); -const auto TIMESTAMP_NS = ::arrow::timestamp(::arrow::TimeUnit::NANO); +const auto TIMESTAMP_MS = ::arrow::timestamp(TimeUnit::MILLI); +const auto TIMESTAMP_US = ::arrow::timestamp(TimeUnit::MICRO); +const auto TIMESTAMP_NS = ::arrow::timestamp(TimeUnit::NANO); const auto BINARY = ::arrow::binary(); const auto DECIMAL_8_4 = std::make_shared<::arrow::DecimalType>(8, 4); @@ -114,12 +115,12 @@ TEST_F(TestConvertParquetSchema, ParquetFlatPrimitives) { parquet_fields.push_back(PrimitiveNode::Make( "time32", Repetition::REQUIRED, ParquetType::INT32, LogicalType::TIME_MILLIS)); arrow_fields.push_back(std::make_shared( - "time32", ::arrow::time32(::arrow::TimeUnit::MILLI), false)); + "time32", ::arrow::time32(TimeUnit::MILLI), false)); parquet_fields.push_back(PrimitiveNode::Make( "time64", Repetition::REQUIRED, ParquetType::INT64, LogicalType::TIME_MICROS)); arrow_fields.push_back(std::make_shared( - "time64", ::arrow::time64(::arrow::TimeUnit::MICRO), false)); + "time64", ::arrow::time64(TimeUnit::MICRO), false)); parquet_fields.push_back( PrimitiveNode::Make("timestamp96", Repetition::REQUIRED, ParquetType::INT96)); @@ -660,6 +661,16 @@ TEST_F(TestConvertArrowSchema, ParquetLists) { CheckFlatSchema(parquet_fields); } +TEST_F(TestConvertArrowSchema, UnsupportedTypes) { + std::vector> unsupported_fields = { + ::arrow::field("f0", ::arrow::time64(TimeUnit::NANO)) + }; + + for (const auto& field : unsupported_fields) { + ASSERT_RAISES(NotImplemented, ConvertSchema({field})); + } +} + TEST_F(TestConvertArrowSchema, ParquetFlatDecimals) { std::vector parquet_fields; std::vector> arrow_fields; @@ -671,8 +682,5 @@ TEST_F(TestConvertArrowSchema, ParquetFlatDecimals) { CheckFlatSchema(parquet_fields); } -TEST(TestNodeConversion, DateAndTime) {} - } // namespace arrow - } // namespace parquet diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc index 86e98bcb..31895cea 100644 --- a/src/parquet/arrow/schema.cc +++ b/src/parquet/arrow/schema.cc @@ -480,10 +480,15 @@ Status FieldToNode(const std::shared_ptr& field, type = ParquetType::INT32; logical_type = LogicalType::TIME_MILLIS; break; - case ArrowType::TIME64: + case ArrowType::TIME64: { + auto time_type = static_cast<::arrow::Time64Type*>(field->type().get()); + if (time_type->unit() == ::arrow::TimeUnit::NANO) { + return Status::NotImplemented( + "Nanosecond time not supported in Parquet."); + } type = ParquetType::INT64; logical_type = LogicalType::TIME_MICROS; - break; + } break; case ArrowType::STRUCT: { auto struct_type = std::static_pointer_cast<::arrow::StructType>(field->type()); return StructToNode(struct_type, field->name(), field->nullable(), properties, out);