From a7b803a367640921df45126a93a3beaf0e788f54 Mon Sep 17 00:00:00 2001
From: Miki Tebeka
Date: Mon, 13 Mar 2017 18:55:43 +0200
Subject: [PATCH] PARQUET-915: Support Arrow Time types in Schema

ARROW-601: Some logical types not supported when loading Parquet

Support for TimeType with TimeUnit::MILLI and TimeUnit::MICRO
---
 src/parquet/arrow/arrow-schema-test.cc |  8 ++++++++
 src/parquet/arrow/reader.cc            | 23 ++++++++++++++++++++++-
 src/parquet/arrow/schema.cc            | 25 ++++++++++++++++++++++---
 src/parquet/schema.cc                  |  1 +
 4 files changed, 53 insertions(+), 4 deletions(-)

diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc
index 8db792f2..7771cdf7 100644
--- a/src/parquet/arrow/arrow-schema-test.cc
+++ b/src/parquet/arrow/arrow-schema-test.cc
@@ -407,6 +407,14 @@ TEST_F(TestConvertArrowSchema, ParquetFlatPrimitives) {
   //     ParquetType::INT64, LogicalType::TIMESTAMP_MICROS));
   // arrow_fields.push_back(std::make_shared("timestamp", TIMESTAMP_US, false));
 
+  parquet_fields.push_back(PrimitiveNode::Make("time_ms", Repetition::REQUIRED,
+      ParquetType::INT32, LogicalType::TIME_MILLIS));
+  arrow_fields.push_back(std::make_shared("time_ms", ::arrow::time(::arrow::TimeUnit::MILLI), false));
+
+  parquet_fields.push_back(PrimitiveNode::Make("time_us", Repetition::REQUIRED,
+      ParquetType::INT64, LogicalType::TIME_MICROS));
+  arrow_fields.push_back(std::make_shared("time_us", ::arrow::time(::arrow::TimeUnit::MICRO), false));
+
   parquet_fields.push_back(
       PrimitiveNode::Make("float", Repetition::OPTIONAL, ParquetType::FLOAT));
   arrow_fields.push_back(std::make_shared("float", FLOAT));
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index 73f6d873..f4ba3dea 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -870,6 +870,27 @@ Status ColumnReader::Impl::TypedReadBatch<::arrow::StringType, ByteArrayType>(
     return TypedReadBatch(batch_size, out); \
     break;
 
+static inline Status UnsupportedTimeUnit(::arrow::TimeUnit unit) {
+  std::stringstream ss;  // assembles "TimeUnit::<NAME> is not supported"
+  ss << "TimeUnit::";
+  switch (unit) {  // no default: an unlisted unit appends no name
+    case ::arrow::TimeUnit::SECOND:
+      ss << "SECOND";
+      break;
+    case ::arrow::TimeUnit::MILLI:
+      ss << "MILLI";
+      break;
+    case ::arrow::TimeUnit::MICRO:
+      ss << "MICRO";
+      break;
+    case ::arrow::TimeUnit::NANO:
+      ss << "NANO";
+      break;
+  }
+  ss << " is not supported";
+  return Status::NotImplemented(ss.str());  // caller gets the text as a NotImplemented status
+}
+
 Status ColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr* out) {
   if (!column_reader_) {
     // Exhausted all row groups.
@@ -903,7 +924,7 @@ Status ColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr* out
           return TypedReadBatch<::arrow::TimestampType, Int96Type>(batch_size, out);
           break;
         default:
-          return Status::NotImplemented("TimeUnit not supported");
+          return UnsupportedTimeUnit(timestamp_type->unit);  // message names the unit
       }
       break;
     }
diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc
index 0c336d95..b41f297b 100644
--- a/src/parquet/arrow/schema.cc
+++ b/src/parquet/arrow/schema.cc
@@ -113,6 +113,12 @@ static Status FromInt32(const PrimitiveNode* node, TypePtr* out) {
     case LogicalType::DECIMAL:
       *out = MakeDecimalType(node);
       break;
+    case LogicalType::TIME_MILLIS:
+      *out = ::arrow::time(::arrow::TimeUnit::MILLI);
+      break;
+    case LogicalType::TIME_MICROS:  // NOTE(review): FieldToNode below uses INT64; confirm
+      *out = ::arrow::time(::arrow::TimeUnit::MICRO);
+      break;
     default:
       std::stringstream ss;
       ss << "Unhandled logical type " << LogicalTypeToString(node->logical_type())
@@ -391,9 +397,22 @@ Status FieldToNode(const std::shared_ptr& field,
       type = ParquetType::INT64;
       logical_type = LogicalType::TIMESTAMP_MILLIS;
     } break;
-    case ArrowType::TIME:
-      type = ParquetType::INT64;
-      logical_type = LogicalType::TIME_MILLIS;
+    case ArrowType::TIME: {  // physical type depends on the time unit
+      auto time_type = static_cast<::arrow::TimeType*>(field->type.get());
+      switch (time_type->unit) {
+        case ::arrow::TimeUnit::MILLI:
+          logical_type = LogicalType::TIME_MILLIS;
+          type = ParquetType::INT32;
+          break;
+        case ::arrow::TimeUnit::MICRO:
+          logical_type = LogicalType::TIME_MICROS;
+          type = ParquetType::INT64;
+          break;
+        default:  // any other unit (e.g. SECOND, NANO) is rejected
+          return Status::NotImplemented(
+              "Only MILLI and MICRO TimeUnit are supported");
+      }
+    }
       break;
     case ArrowType::STRUCT: {
       auto struct_type = std::static_pointer_cast<::arrow::StructType>(field->type);
diff --git a/src/parquet/schema.cc b/src/parquet/schema.cc
index e6380aea..be4f812b 100644
--- a/src/parquet/schema.cc
+++ b/src/parquet/schema.cc
@@ -143,6 +143,7 @@ PrimitiveNode::PrimitiveNode(const std::string& name, Repetition::type repetitio
       if (type != Type::INT32) {
         ss << LogicalTypeToString(logical_type);
         ss << " can only annotate INT32";
+        ss << " (was " << TypeToString(type) << ")";  // include the offending physical type
         throw ParquetException(ss.str());
       }
       break;